in reply to Re^3: Proper undefine queue with multithreads in thread Proper undefine queue with multithreads
Hi
The output of omnidb looks as follows:
# omnidb -filesystem server:/SHARED '/SHARED (test)' -listdir '/SHARED/tmp/test'
Type Filename
===============================================================================
File file9
File file8
File file7
File file6
File file5
File file4
File file3
File file2
File file1
Dir dir9
Dir dir8
Dir dir7
Dir dir6
Dir dir5
Dir dir4
Dir dir3
Dir dir2
Dir dir1
Explain what you are going to do with the list of files you are building in @data.
Right now I am just printing out the output. Nothing fancy.
Here is the full code of my script. I am not really a Perl programmer, so it might not be developed in the most efficient way.
#!/usr/bin/perl -w
BEGIN { our($_pathname,$_filename)=($0=~m#(.*)/([^/]+)$#)?($1,$2):(".",$0); push @INC,$_pathname; };
sub usage {
################################################################
#
# Title : dpfilesearch.pl
#
# Author : Christian Sandrini
#
# Description :
print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
$_filename
Required Parameters:
--filesystem 'host:dir' Filesystem with format host:fs
ex. host:/SHARED
--label 'label' Label
--dir 'directory' Directory to search
Optional Parameters:
--recursive Recursive search
--maxCount 10000 Maximum allowed item count
--threads 10 Maximum parallel jobs
--exclude dir Can be specified multiple times
EndOfDescription
exit 2
}
# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use Thread::Queue;
# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';
my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;
my $worker = Thread::Queue->new();
my @IDLE_THREADS :shared;
# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
Getopt::Long::Configure("pass_through");
GetOptions(
q{filesystem=s} => \$filesystem,
q{label=s} => \$label,
q{dir=s} => \$directory,
q{recursive!} => \$recursive,
q{maxCount=i} => \$maxNumberOfItems,
        q{threads=i} => \$maxNumberOfParallelJobs,
q{debug!} => \$debug,
q{exclude=s} => \@exclude
);
usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);
my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
usage "Not enough arguments." if (! @args );
}
# -------------------------
# Methods
# -------------------------
sub pullDataFromDbWithDirectory {
my $_dir = $_[0];
if ($itemCount <= $maxNumberOfItems) {
                my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
foreach my $item (@retval) {
$itemCount++;
                        (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
my $file = "$_dir/$filename";
if (!($file ~~ @exclude)) {
push(@data,$file);
if ($item =~ /^Dir/) {
$worker->enqueue($file);
                                        print "Add $file to queue\n" if $debug;
}
}
}
}
}
sub doOperation () {
my $ithread = threads->tid();
do {
my $folder = $worker->dequeue();
                print "Read $folder from queue with thread $ithread\n" if $debug;
pullDataFromDbWithDirectory($folder);
} while ($worker->pending());
push(@IDLE_THREADS,$ithread);
}
sub printData {
foreach my $file (sort @data) {
print "$file\n";
}
if ($itemCount > $maxNumberOfItems) {
                print colored ['red on_black'], "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
}
}
# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper(\@exclude) if $debug;
my @threads = map threads->create(\&doOperation), 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory($directory);
sleep 0.01 while (scalar @IDLE_THREADS < $maxNumberOfParallelJobs);
$worker->enqueue((undef) x $maxNumberOfParallelJobs);
$_->join for @threads;
printData();
Re^5: Proper undefine queue with multithreads
by BrowserUk (Patriarch) on Jun 05, 2014 at 12:55 UTC
Untested, and without trying to understand the rest of your code, try the following minimal changes to your posted code:
#!/usr/bin/perl -w
BEGIN {
our($_pathname,$_filename) =
( $0 =~ m[(.*)/([^/]+)$] ) ? ( $1, $2 ) : ( ".", $0 );
push @INC,$_pathname;
};
sub usage {
print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
$_filename
Required Parameters:
--filesystem 'host:dir' Filesystem with format host:fs
ex. host:/SHARED
--label 'label' Label
--dir 'directory' Directory to search
Optional Parameters:
--recursive Recursive search
--maxCount 10000 Maximum allowed item count
--threads 10 Maximum parallel jobs
--exclude dir Can be specified multiple times
EndOfDescription
exit 2
}
# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use Thread::Queue;
# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';
my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;
my $worker = Thread::Queue->new();
my @IDLE_THREADS :shared;
# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
Getopt::Long::Configure("pass_through");
GetOptions(
q{filesystem=s} => \$filesystem,
q{label=s} => \$label,
q{dir=s} => \$directory,
q{recursive!} => \$recursive,
q{maxCount=i} => \$maxNumberOfItems,
q{threads=i} => \$maxNumberOfParallelJobs,
q{debug!} => \$debug,
q{exclude=s} => \@exclude
);
usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);
my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
usage "Not enough arguments." if (! @args );
}
sub pullDataFromDbWithDirectory {
my $_dir = $_[0];
if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
foreach my $item (@retval) {
$itemCount++;
(my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
my $file = "$_dir/$filename";
if (!($file ~~ @exclude)) {
push(@data,$file);
if ($item =~ /^Dir/) {
$worker->enqueue( $file );
print "Add $file to queue\n" if $debug;
}
}
}
}
}
sub doOperation () {
my $ithread = threads->tid();
while( my $folder = $worker->dequeue() ) {
        print "Read $folder from queue with thread $ithread\n" if $debug;
pullDataFromDbWithDirectory($folder);
};
}
sub printData {
foreach my $file (sort @data) {
print "$file\n";
}
if ($itemCount > $maxNumberOfItems) {
        print colored ['red on_black'], "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
}
}
# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper( \@exclude ) if $debug;
my @threads = map{
threads->create( \&doOperation )
} 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory( $directory );
$worker->enqueue( (undef) x $maxNumberOfParallelJobs );
$_->join for @threads;
printData();
With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
Hi
Thanks for your answer. This is exactly how I had it in the beginning. The problem is that it does not parse all the subdirectories. I think that as soon as the queue becomes empty, the threads hit the undefined entries; after that, new entries are pushed into the queue but are ignored, because the threads have already been joined by that time. Or maybe I misunderstood the logic behind adding undef to the queue.
Here is an example:
# ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test --recursive --threads 1
/SHARED/tmp/test/dir7
/SHARED/tmp/test/dir7/dir1
/SHARED/tmp/test/dir7/dir2
/SHARED/tmp/test/dir7/dir3
/SHARED/tmp/test/dir7/dir4
/SHARED/tmp/test/dir7/dir5
/SHARED/tmp/test/dir7/dir6
/SHARED/tmp/test/dir7/dir7
/SHARED/tmp/test/dir7/dir8
/SHARED/tmp/test/dir7/dir9
Still pending in queue: 18
Whereas if I search directly within dir7, more subfolders are revealed.
# ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test/dir7 --recursive --threads 1
/SHARED/tmp/test/dir7/dir1
/SHARED/tmp/test/dir7/dir2
/SHARED/tmp/test/dir7/dir3
/SHARED/tmp/test/dir7/dir4
/SHARED/tmp/test/dir7/dir5
/SHARED/tmp/test/dir7/dir6
/SHARED/tmp/test/dir7/dir7
/SHARED/tmp/test/dir7/dir8
/SHARED/tmp/test/dir7/dir9
/SHARED/tmp/test/dir7/dir9/dir1
/SHARED/tmp/test/dir7/dir9/dir2
/SHARED/tmp/test/dir7/dir9/dir3
/SHARED/tmp/test/dir7/dir9/dir4
/SHARED/tmp/test/dir7/dir9/dir5
/SHARED/tmp/test/dir7/dir9/dir6
/SHARED/tmp/test/dir7/dir9/dir7
/SHARED/tmp/test/dir7/dir9/dir8
/SHARED/tmp/test/dir7/dir9/dir9
Still pending in queue: 9
It is the same if I specify more than one thread. I added the $worker->pending() count just after printing the output.
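The behaviour described above can be reproduced with a minimal, deterministic sketch (no omnidb involved; the directory names are purely illustrative): a worker exits at the first undef sentinel it dequeues, even though more work sits behind that sentinel in the queue.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $q = Thread::Queue->new();
$q->enqueue('dir1');        # initial work
$q->enqueue(undef);         # sentinel posted too early
$q->enqueue('dir1/sub');    # work added after the sentinel

my @seen :shared;
my $t = threads->create( sub {
    # Standard sentinel loop: stop at the first undef.
    while ( defined( my $item = $q->dequeue() ) ) {
        push @seen, $item;
    }
} );
$t->join;

print "processed: @seen\n";                   # only dir1
print "left behind: ", $q->pending(), "\n";   # 1 item never processed
```

This matches the "Still pending in queue" counts above: anything enqueued after a worker has consumed its sentinel is simply stranded.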
Try this (again untested) code.
I've yet to convince myself that there isn't a race condition here, but if there is, you'll probably find it by running the code for real quicker than I will by running it mentally:
#!/usr/bin/perl -w
BEGIN {
our($_pathname,$_filename) =
( $0 =~ m[(.*)/([^/]+)$] ) ? ( $1, $2 ) : ( ".", $0 );
push @INC,$_pathname;
};
sub usage {
print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
$_filename
Required Parameters:
--filesystem 'host:dir' Filesystem with format host:fs
ex. host:/SHARED
--label 'label' Label
--dir 'directory' Directory to search
Optional Parameters:
--recursive Recursive search
--maxCount 10000 Maximum allowed item count
--threads 10 Maximum parallel jobs
--exclude dir Can be specified multiple times
EndOfDescription
exit 2
}
# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use Thread::Queue;
# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';
my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;
my $worker = Thread::Queue->new();
my $idle :shared = $maxNumberOfParallelJobs;
# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
Getopt::Long::Configure("pass_through");
GetOptions(
q{filesystem=s} => \$filesystem,
q{label=s} => \$label,
q{dir=s} => \$directory,
q{recursive!} => \$recursive,
q{maxCount=i} => \$maxNumberOfItems,
q{threads=i} => \$maxNumberOfParallelJobs,
q{debug!} => \$debug,
q{exclude=s} => \@exclude
);
usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);
my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
usage "Not enough arguments." if (! @args );
}
sub pullDataFromDbWithDirectory {
my $_dir = $_[0];
if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
foreach my $item (@retval) {
$itemCount++;
(my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
my $file = "$_dir/$filename";
if (!($file ~~ @exclude)) {
push(@data,$file);
if ($item =~ /^Dir/) {
$worker->enqueue( $file );
print "Add $file to queue\n" if $debug;
}
}
}
}
}
sub doOperation () {
my $ithread = threads->tid();
while( my $folder = $worker->dequeue() ) {
{ lock $idle; --$idle }
        print "Read $folder from queue with thread $ithread\n" if $debug;
pullDataFromDbWithDirectory($folder);
{ lock $idle; ++$idle }
};
}
sub printData {
foreach my $file (sort @data) {
print "$file\n";
}
if ($itemCount > $maxNumberOfItems) {
        print colored ['red on_black'], "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
}
}
# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper( \@exclude ) if $debug;
my @threads = map{
threads->create( \&doOperation )
} 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory( $directory );
sleep 1 until $idle < $maxNumberOfParallelJobs;   ## Wait for (some) threads to get started
sleep 1 until $idle == $maxNumberOfParallelJobs;  ## wait until they are all done
$worker->enqueue( (undef) x $maxNumberOfParallelJobs );  ## Tell'em to stop
$_->join for @threads;
printData();
The problem here I think is that as soon as the queue gets empty it hits the undefined entries. But after that, new entries are being pushed into the queue but get ignored because the threads were joined already at that time.
Okay. So now we get to the crux of the real question you should have asked: How to determine when there are no more directories to be processed? (And thus, the threads can safely terminate.)
You cannot have the main thread post the undefs once it has finished the initial population of the queue, because the worker threads will be adding to the queue as they process that initial population.
And you cannot use the absence of queue entries (Q->pending == 0), because another thread may add another directory to the queue a millisecond later.
This is the classic chicken & egg scenario. How can any of the worker threads know when they are finished, when at any given moment one of the other worker threads might queue a new directory to be processed?
I don't have an answer for you yet. I'm not sure that there is an answer given the current methodology of your code.
I'm sure that there is a better way, and I'll get back to you once I think of it.
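For what it's worth, one pattern commonly used for exactly this termination problem (a sketch only, not tested against omnidb; the fake one-level "recursion" stands in for the real directory listing) is to keep a shared count of directories that have been enqueued but not yet fully processed. A worker increments the count *before* enqueuing a child and decrements it only *after* it has finished a directory; whichever worker drops the count to zero knows the tree is exhausted and posts the undef sentinels itself.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $q           = Thread::Queue->new();
my $outstanding :shared = 0;   # dirs enqueued but not yet fully processed
my $nworkers    = 4;
my @seen        :shared;

# Enqueue a directory, bumping the in-flight counter first.
sub add_dir {
    my ($dir) = @_;
    { lock $outstanding; ++$outstanding }
    $q->enqueue($dir);
}

sub worker {
    while ( defined( my $dir = $q->dequeue() ) ) {
        push @seen, $dir;
        # Fake one level of recursion instead of calling omnidb:
        # children are enqueued (and counted) before the parent is retired.
        add_dir("$dir/sub") unless $dir =~ m{/sub\z};
        lock $outstanding;
        if ( --$outstanding == 0 ) {
            # Nothing in flight and nothing queued: release every worker.
            $q->enqueue( (undef) x $nworkers );
        }
    }
}

add_dir($_) for qw(a b c);
my @threads = map threads->create(\&worker), 1 .. $nworkers;
$_->join for @threads;
print scalar(@seen), " dirs processed\n";   # 6: a, b, c and their subs
```

Because a child is counted before its parent is retired, the counter can only reach zero when no directory remains anywhere, so entries added "a millisecond later" can no longer slip past the sentinels.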