PerlMonks  

Re^3: Proper undefine queue with multithreads

by BrowserUk (Patriarch)
on Jun 03, 2014 at 20:08 UTC


in reply to Re^2: Proper undefine queue with multithreads
in thread Proper undefine queue with multithreads

Would that still be doable with the normal way you just explained?

Yes. But you're going to have to explain what output you get from this "omnidb" command, because I've never heard of it.

Would you mind showing a small example?

Your example code is incomplete, which leaves us none the wiser as to how a whole bunch of global variables used by pullDataFromDbWithDirectory() (e.g. $itemCount, $maxNumberOfItems, $omnidb, $filesystem, $label, @data) are initialised and used. Constructing a working example from it would require gobs of time and guesswork.

Post your full code.

Post a sample output from the command.

Explain what you are going to do with the list of files you are building in @data.

Then maybe we stand a chance of helping you.


BTW: Ignore/downvote both existing and any further replies you get from sundialsvc4. He has no working knowledge of using Perl's threading and has a history of posting long, rambling, always useless, often dangerous "suggestions".

He even knows this: "Clearly, none of my suggestions would apply to this particular case.", but he continues to waste everyone's time by posting these useless, over-generic replies on subjects of which he has been proven, time and time again, to have no first-hand knowledge.

Why? I think the poor ol' thing is getting so senile that he genuinely forgets that he's only regurgitating things he's read rather than his own experiences. Sadly, whilst he seems to be able to retrieve odd snippets of generally good advice, he always seems to forget the correct context, rendering them useless.


With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
Re^4: Proper undefine queue with multithreads
by sanc (Initiate) on Jun 04, 2014 at 05:56 UTC
    Hi

    The output of omnidb looks as follows:

        # omnidb -filesystem server:/SHARED '/SHARED (test)' -listdir '/SHARED/tmp/test'

        Type       Filename
        ===============================================================================
        File       file9
        File       file8
        File       file7
        File       file6
        File       file5
        File       file4
        File       file3
        File       file2
        File       file1
        Dir        dir9
        Dir        dir8
        Dir        dir7
        Dir        dir6
        Dir        dir5
        Dir        dir4
        Dir        dir3
        Dir        dir2
        Dir        dir1

    Explain what you are going to do with the list of files you are building in @data.

    Right now I am just printing out the output. Nothing fancy.

    Here is the full code of my script. I am not really a Perl programmer, so it might not be written in the most efficient way.

        #!/usr/bin/perl -w

        BEGIN {
            our($_pathname,$_filename)=($0=~m#(.*)/([^/]+)$#)?($1,$2):(".",$0);
            push @INC,$_pathname;
        };

        sub usage {
        ################################################################
        #
        # Title       : dpfilesearch.pl
        #
        # Autor       : Christian Sandrini
        #
        # Description :
            print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
        $_filename

            Required Parameters:
                --filesystem 'host:dir'  Filesystem with format host:fs ex. host:/SHARED
                --label 'label'          Label
                --dir 'directory'        Directory to search
            Optional Parameters:
                --recursive              Recursive search
                --maxCount 10000         Maximum allowed item count
                --threads 10             Maximul parallel jobs
                --exclude dir            Can be specified muliple times
        EndOfDescription
            exit 2
        }

        # -------------------------
        # Required libraries
        # -------------------------
        use strict;
        use Data::Dumper;
        use Getopt::Long;
        use Term::ANSIColor;
        use threads;
        use Thread::Queue;

        # -------------------------
        # Global Variables
        # -------------------------
        my $omnidb = '/opt/omni/bin/omnidb';
        my @data :shared;
        my $maxNumberOfParallelJobs = 10;
        my $maxNumberOfItems = 10000;
        my $itemCount = 0;
        my $worker = Thread::Queue->new();
        my @IDLE_THREADS :shared;

        # -------------------------
        # Argument handling
        # -------------------------
        my( $filesystem, $label, $directory, $recursive, $debug, @exclude );

        Getopt::Long::Configure("pass_through");
        GetOptions(
            q{filesystem=s} => \$filesystem,
            q{label=s}      => \$label,
            q{dir=s}        => \$directory,
            q{recursive!}   => \$recursive,
            q{maxCount=i}   => \$maxNumberOfItems,
            q{threads=i}    => \$maxNumberOfParallelJobs,
            q{debug!}       => \$debug,
            q{exclude=s}    => \@exclude
        );

        usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);

        my( @args ) = @ARGV;
        if ( !($filesystem || $label || $directory) ) {
            usage "Not enough arguments." if (! @args );
        }

        # -------------------------
        # Methods
        # -------------------------
        sub pullDataFromDbWithDirectory {
            my $_dir = $_[0];

            if ($itemCount <= $maxNumberOfItems) {
                my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');

                foreach my $item (@retval) {
                    $itemCount++;
                    (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
                    my $file = "$_dir/$filename";
                    if (!($file ~~ @exclude)) {
                        push(@data,$file);
                        if ($item =~ /^Dir/) {
                            $worker->enqueue($file);
                            print "Add $file to queue\n" if $debug;
                        }
                    }
                }
            }
        }

        sub doOperation () {
            my $ithread = threads->tid();
            do {
                my $folder = $worker->dequeue();
                print "Read $folder from queue with thread $ithread\n" if $debug;
                pullDataFromDbWithDirectory($folder);
            } while ($worker->pending());
            push(@IDLE_THREADS,$ithread);
        }

        sub printData {
            foreach my $file (sort @data) {
                print "$file\n";
            }
            if ($itemCount > $maxNumberOfItems) {
                print colored ['red on_black'], "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
            }
        }

        # -------------------------
        # Main
        # -------------------------
        print "Exclude: " . Dumper(\@exclude) if $debug;

        my @threads = map threads->create(\&doOperation), 1 .. $maxNumberOfParallelJobs;
        pullDataFromDbWithDirectory($directory);

        sleep 0.01 while (scalar @IDLE_THREADS < $maxNumberOfParallelJobs);
        $worker->enqueue((undef) x $maxNumberOfParallelJobs);
        $_->join for @threads;

        printData();

      Untested, and without trying to understand the rest of your code, try the following minimal changes to your posted code:

          #!/usr/bin/perl -w

          BEGIN {
              our($_pathname,$_filename) = ( $0 =~ m[(.*)/([^/]+)$] ) ? ( $1, $2 ) : ( ".", $0 );
              push @INC,$_pathname;
          };

          sub usage {
              print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
          $_filename

              Required Parameters:
                  --filesystem 'host:dir'  Filesystem with format host:fs ex. host:/SHARED
                  --label 'label'          Label
                  --dir 'directory'        Directory to search
              Optional Parameters:
                  --recursive              Recursive search
                  --maxCount 10000         Maximum allowed item count
                  --threads 10             Maximul parallel jobs
                  --exclude dir            Can be specified muliple times
          EndOfDescription
              exit 2
          }

          # -------------------------
          # Required libraries
          # -------------------------
          use strict;
          use Data::Dumper;
          use Getopt::Long;
          use Term::ANSIColor;
          use threads;
          use Thread::Queue;

          # -------------------------
          # Global Variables
          # -------------------------
          my $omnidb = '/opt/omni/bin/omnidb';
          my @data :shared;
          my $maxNumberOfParallelJobs = 10;
          my $maxNumberOfItems = 10000;
          my $itemCount = 0;
          my $worker = Thread::Queue->new();
          my @IDLE_THREADS :shared;

          # -------------------------
          # Argument handling
          # -------------------------
          my( $filesystem, $label, $directory, $recursive, $debug, @exclude );

          Getopt::Long::Configure("pass_through");
          GetOptions(
              q{filesystem=s} => \$filesystem,
              q{label=s}      => \$label,
              q{dir=s}        => \$directory,
              q{recursive!}   => \$recursive,
              q{maxCount=i}   => \$maxNumberOfItems,
              q{threads=i}    => \$maxNumberOfParallelJobs,
              q{debug!}       => \$debug,
              q{exclude=s}    => \@exclude
          );

          usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);

          my( @args ) = @ARGV;
          if ( !($filesystem || $label || $directory) ) {
              usage "Not enough arguments." if (! @args );
          }

          sub pullDataFromDbWithDirectory {
              my $_dir = $_[0];

              if ($itemCount <= $maxNumberOfItems) {
                  my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');

                  foreach my $item (@retval) {
                      $itemCount++;
                      (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
                      my $file = "$_dir/$filename";
                      if (!($file ~~ @exclude)) {
                          push(@data,$file);
                          if ($item =~ /^Dir/) {
                              $worker->enqueue( $file );
                              print "Add $file to queue\n" if $debug;
                          }
                      }
                  }
              }
          }

          sub doOperation () {
              my $ithread = threads->tid();
              while( my $folder = $worker->dequeue() ) {
                  print "Read $folder from queue with thread $ithread\n" if $debug;
                  pullDataFromDbWithDirectory($folder);
              };
          }

          sub printData {
              foreach my $file (sort @data) {
                  print "$file\n";
              }
              if ($itemCount > $maxNumberOfItems) {
                  print colored ['red on_black'], "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
              }
          }

          # -------------------------
          # Main
          # -------------------------
          print "Exclude: " . Dumper( \@exclude ) if $debug;

          my @threads = map{ threads->create( \&doOperation ) } 1 .. $maxNumberOfParallelJobs;
          pullDataFromDbWithDirectory( $directory );

          $worker->enqueue( (undef) x $maxNumberOfParallelJobs );
          $_->join for @threads;

          printData();
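
The worker loop in the code above relies on a common Thread::Queue idiom: once all work has been queued, one undef per worker is enqueued, and each worker leaves its loop when it dequeues that undef. The following is only a minimal sketch of the idiom in isolation, assuming the main thread is the sole producer; the item names and the @done array are hypothetical and not taken from the posted script. It tests with defined() so that an item that happens to be false (such as "0") does not end the loop early.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $queue    = Thread::Queue->new();
my $nthreads = 2;
my @done :shared;    # hypothetical result list, for illustration only

sub worker {
    # dequeue() blocks until an item is available; an undef item is the
    # signal that no more work will arrive.
    while ( defined( my $item = $queue->dequeue() ) ) {
        lock(@done);
        push @done, $item;
    }
}

my @workers = map { threads->create( \&worker ) } 1 .. $nthreads;

# The main thread is the only producer in this sketch.
$queue->enqueue($_) for qw( dir1 dir2 dir3 dir4 );

# One undef per worker, queued after all real work.
$queue->enqueue( (undef) x $nthreads );

$_->join for @workers;
print "$_\n" for sort @done;
```

The idiom depends on nothing being enqueued after the undefs. When the workers themselves add items to the queue, that assumption may no longer hold.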

        Hi,

        Thanks for your answer. This is exactly how I had it in the beginning. The problem is that it does not parse all the subdirectories. I think that as soon as the queue becomes empty, the threads hit the undefined entries. After that, new entries are still being pushed onto the queue, but they are ignored because the threads had already been joined by that time. Or maybe I misunderstood the logic behind adding undef to the queue. Here is an example:

            # ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test --recursive --threads 1
            /SHARED/tmp/test/dir7
            /SHARED/tmp/test/dir7/dir1
            /SHARED/tmp/test/dir7/dir2
            /SHARED/tmp/test/dir7/dir3
            /SHARED/tmp/test/dir7/dir4
            /SHARED/tmp/test/dir7/dir5
            /SHARED/tmp/test/dir7/dir6
            /SHARED/tmp/test/dir7/dir7
            /SHARED/tmp/test/dir7/dir8
            /SHARED/tmp/test/dir7/dir9
            Still pending in queue: 18

        Whereas if I search directly within dir7, more subfolders are revealed:

            # ./dpfilesearch.pl --object server:/SHARED --label '/SHARED (test)' --dir /SHARED/tmp/test/dir7 --recursive --threads 1
            /SHARED/tmp/test/dir7/dir1
            /SHARED/tmp/test/dir7/dir2
            /SHARED/tmp/test/dir7/dir3
            /SHARED/tmp/test/dir7/dir4
            /SHARED/tmp/test/dir7/dir5
            /SHARED/tmp/test/dir7/dir6
            /SHARED/tmp/test/dir7/dir7
            /SHARED/tmp/test/dir7/dir8
            /SHARED/tmp/test/dir7/dir9
            /SHARED/tmp/test/dir7/dir9/dir1
            /SHARED/tmp/test/dir7/dir9/dir2
            /SHARED/tmp/test/dir7/dir9/dir3
            /SHARED/tmp/test/dir7/dir9/dir4
            /SHARED/tmp/test/dir7/dir9/dir5
            /SHARED/tmp/test/dir7/dir9/dir6
            /SHARED/tmp/test/dir7/dir9/dir7
            /SHARED/tmp/test/dir7/dir9/dir8
            /SHARED/tmp/test/dir7/dir9/dir9
            Still pending in queue: 9

        It is the same if I specify more than one thread. The "Still pending in queue" line comes from a call to $worker->pending() that I added just after printing the output.
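
One possible way around the problem described above (workers that both consume directories and enqueue new ones, so the undef markers can arrive before the last directory is queued) is to count outstanding work and send the undef markers only when that count reaches zero. This is a sketch under stated assumptions, not the solution given in the thread: the %tree hash is a hypothetical stand-in for the omnidb listing, and $pending, @seen and add_work() are illustrative names.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# Hypothetical stand-in for the directory listing: dir => subdirectories.
my %tree = (
    '/top'          => [ '/top/a', '/top/b' ],
    '/top/a'        => [ '/top/a/x', '/top/a/y' ],
    '/top/b'        => [],
    '/top/a/x'      => [],
    '/top/a/y'      => [ '/top/a/y/deep' ],
    '/top/a/y/deep' => [],
);

my $queue    = Thread::Queue->new();
my $nthreads = 3;
my $pending :shared = 0;    # directories queued or currently being processed
my @seen :shared;

sub add_work {
    my ($dir) = @_;
    { lock($pending); $pending++; }    # count the item before it is queued
    $queue->enqueue($dir);
}

sub worker {
    while ( defined( my $dir = $queue->dequeue() ) ) {
        { lock(@seen); push @seen, $dir; }

        # Queue the children first, so the counter cannot reach zero
        # while this directory still has unqueued subdirectories.
        add_work($_) for @{ $tree{$dir} || [] };

        lock($pending);
        if ( --$pending == 0 ) {
            # Nothing is queued or in progress: release every worker.
            $queue->enqueue( (undef) x $nthreads );
        }
    }
}

my @workers = map { threads->create( \&worker ) } 1 .. $nthreads;
add_work('/top');
$_->join for @workers;

print "$_\n" for sort @seen;
```

The design choice here is that the counter is incremented before an item is enqueued and decremented only after its children have been queued, so it can reach zero only when the whole tree has been visited. No polling of pending() or sleeping in the main thread is needed.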
