PerlMonks
Re: Proper undefine queue with multithreads

by BrowserUk (Patriarch)
on Jun 03, 2014 at 13:15 UTC


in reply to Proper undefine queue with multithreads

Your queue handling is fatally flawed.

Why are you exiting your queue reading loop if there is nothing pending?

do { ... } while ($worker->pending());

If (for example) some other process decides to hammer the disk drive, your queue-population code might stall waiting for access to the disk. At that point your queue will empty and all your worker threads will terminate, even though there are still files to be read from disk.
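To make that failure mode concrete, here is a minimal self-contained sketch (names and timings invented, not from the OP's script): a worker that exits when pending() hits zero dies the moment the producer stalls, and everything queued afterwards is never processed.

```perl
#!/usr/bin/perl
# Sketch of the failure mode: a do/while(pending()) worker loop exits
# as soon as the queue *looks* empty, even though the producer has
# merely stalled and more work is on the way.
use strict;
use warnings;
use threads;
use Thread::Queue;

my $Q = Thread::Queue->new();

sub worker {
    do {
        my $item = $Q->dequeue();
        ## pretend to do something useful with $item here
    } while( $Q->pending() );          # exits as soon as the queue looks empty
}

my $t = threads->create( \&worker );

$Q->enqueue( 'first-batch' );          # producer emits one item...
sleep 1;                               # ...then stalls (eg. waiting on the disk)
$t->join();                            # the worker has already given up and died

$Q->enqueue( "late$_" ) for 1 .. 9;    # the rest of the work arrives too late
my $leftover = $Q->pending();          # 9 items that nobody will ever process
print "left unprocessed: $leftover\n";
```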

Also, why are you calling pullDataFromDbWithDirectory() both in your main thread and in all your worker threads? That's very confused.


The normal way to do this is:

  1. Have your worker threads' queue-reading loops terminate when they dequeue undef:

        sub doOperation () {
            my $ithread = threads->tid();
            while( my $folder = $worker->dequeue() ) {
                print "Read $folder from queue with thread $ithread\n" if $debug;
                ## Do something useful here...
            }
        }
  2. Start your threads before you populate the queue:

    They will all block on the dequeue() until something is made available.

  3. Populate your queue from your main thread (*ONLY*):

    The threads will start doing work as soon as your main thread gives them something to work on.

  4. Once the main thread has finished populating the queue, it then queues one undef per worker thread to cause the worker loops to terminate and thus the worker threads to end.

    You are queueing undefs, but you're not doing it until you've already seen that your threads have ended, at which point it serves no purpose.

  5. Finally, loop over the thread handles calling join() to ensure all the threads have finished before you exit the program.

This way, the whole process becomes self-managing and you don't have to poll to count threads.
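Pulling the five steps together, a minimal self-contained sketch (work items invented) might look like this:

```perl
#!/usr/bin/perl
# A self-contained sketch of the five steps above: workers terminate on
# undef, the main thread is the only producer, and join() replaces any
# polling or thread-counting.
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $Q       = Thread::Queue->new();
my @results :shared;

sub worker {
    # Step 1: the loop ends when an undef is dequeued.
    while( defined( my $item = $Q->dequeue() ) ) {
        ## Do something useful here...
        push @results, "processed:$item";
    }
}

# Step 2: start the workers first; they block in dequeue() until work arrives.
my $N       = 4;
my @threads = map { threads->create( \&worker ) } 1 .. $N;

# Step 3: populate the queue from the main thread only.
$Q->enqueue( "folder$_" ) for 1 .. 10;

# Step 4: one undef per worker makes every worker loop terminate.
$Q->enqueue( (undef) x $N );

# Step 5: join the workers; when this returns, all the work is done.
$_->join() for @threads;

print "$_\n" for sort @results;
```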


With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.

Re^2: Proper undefine queue with multithreads
by sanc (Initiate) on Jun 03, 2014 at 17:00 UTC
    Thank you for your valuable input. I am not reading a filesystem but issuing an omnidb command, which queries the HP Data Protector database for backed-up files. The problem here is that it does not support recursive listing, which means I have to run the omnidb command for each directory found in the previous output. That said, I cannot populate the queue completely up front. As querying the db takes a lot of time, I try to do that in parallel. Would that still be doable with the normal way you just explained? Would you mind showing a small example? Thanks a lot
      Would that still be doable with the normal way you just explained?

      Yes. But you're going to have to explain what output you get from this "omnidb" command; because I've never heard of it.

      Would you mind showing a small example?

      Given that your example code is incomplete, leaving us none the wiser as to how a whole bunch of global variables used by pullDataFromDbWithDirectory() (eg. $itemCount, $maxNumberOfItems, $omnidb, $filesystem, $label, @data) are initialised and used, it would require gobs of time and guesswork to try and construct a working example.

      Post your full code.

      Post a sample output from the command.

      Explain what you are going to do with the list of files you are building in @data.

      Then maybe we stand a chance of helping you.


      BTW: Ignore/downvote both existing and any further replies you get from sundialsvc4. He has no working knowledge of using Perl's threading and has a history of posting long, rambling, always useless, often dangerous "suggestions".

      He even knows this: "Clearly, none of my suggestions would apply to this particular case.", but he continues to waste everyone's time by posting these useless, over-generic replies on subjects of which he has been proven, time and time again, to have no first-hand knowledge.

      Why? I think the poor ol' thing is getting so senile that he genuinely forgets that he's only regurgitating things he's read rather than his own experiences. Sadly, whilst he seems to be able to retrieve odd snippets of generally good advice; he always seems to forget the correct context, rendering them useless.


        Hi

        The output of omnidb looks as follows:

        # omnidb -filesystem server:/SHARED '/SHARED (test)' -listdir '/SHARED/tmp/test'
        Type Filename
        ===============================================================================
        File file9
        File file8
        File file7
        File file6
        File file5
        File file4
        File file3
        File file2
        File file1
        Dir  dir9
        Dir  dir8
        Dir  dir7
        Dir  dir6
        Dir  dir5
        Dir  dir4
        Dir  dir3
        Dir  dir2
        Dir  dir1
        Explain what you are going to do with the list of files you are building in @data.

        Right now I am just printing out the output. Nothing fancy.

        Here is the full code of my script. I am not really a Perl programmer, so it might not be developed in the most efficient way.

#!/usr/bin/perl -w
BEGIN {
    our( $_pathname, $_filename ) = ( $0 =~ m#(.*)/([^/]+)$# ) ? ( $1, $2 ) : ( ".", $0 );
    push @INC, $_pathname;
};

sub usage {
    ################################################################
    #
    # Title       : dpfilesearch.pl
    # Author      : Christian Sandrini
    # Description :
    #
    print STDERR "\nERROR: $_[0]\nUsage:\n", <<"EndOfDescription";
    $_filename
    Required Parameters:
        --filesystem 'host:dir'   Filesystem with format host:fs ex. host:/SHARED
        --label 'label'           Label
        --dir 'directory'         Directory to search
    Optional Parameters:
        --recursive               Recursive search
        --maxCount 10000          Maximum allowed item count
        --threads 10              Maximum parallel jobs
        --exclude dir             Can be specified multiple times
EndOfDescription
    exit 2
}

# -------------------------
# Required libraries
# -------------------------
use strict;
use Data::Dumper;
use Getopt::Long;
use Term::ANSIColor;
use threads;
use Thread::Queue;

# -------------------------
# Global Variables
# -------------------------
my $omnidb = '/opt/omni/bin/omnidb';
my @data :shared;
my $maxNumberOfParallelJobs = 10;
my $maxNumberOfItems = 10000;
my $itemCount = 0;
my $worker = Thread::Queue->new();
my @IDLE_THREADS :shared;

# -------------------------
# Argument handling
# -------------------------
my( $filesystem, $label, $directory, $recursive, $debug, @exclude );
Getopt::Long::Configure("pass_through");
GetOptions(
    q{filesystem=s} => \$filesystem,
    q{label=s}      => \$label,
    q{dir=s}        => \$directory,
    q{recursive!}   => \$recursive,
    q{maxCount=i}   => \$maxNumberOfItems,
    q{threads=i}    => \$maxNumberOfParallelJobs,
    q{debug!}       => \$debug,
    q{exclude=s}    => \@exclude
);
usage "Invalid argument(s)." if (grep {/^-/o } @ARGV);
my( @args ) = @ARGV;
if ( !($filesystem || $label || $directory) ) {
    usage "Not enough arguments." if (! @args );
}

# -------------------------
# Methods
# -------------------------
sub pullDataFromDbWithDirectory {
    my $_dir = $_[0];
    if ($itemCount <= $maxNumberOfItems) {
        my @retval = grep { /^Dir|^File/ } qx($omnidb -filesystem $filesystem '$label' -listdir '$_dir');
        foreach my $item (@retval) {
            $itemCount++;
            (my $filename = $item) =~ s/^File\s+|^Dir\s+|\n//g;
            my $file = "$_dir/$filename";
            if (!($file ~~ @exclude)) {
                push(@data,$file);
                if ($item =~ /^Dir/) {
                    $worker->enqueue($file);
                    print "Add $file to queue\n" if $debug;
                }
            }
        }
    }
}

sub doOperation () {
    my $ithread = threads->tid();
    do {
        my $folder = $worker->dequeue();
        print "Read $folder from queue with thread $ithread\n" if $debug;
        pullDataFromDbWithDirectory($folder);
    } while ($worker->pending());
    push(@IDLE_THREADS,$ithread);
}

sub printData {
    foreach my $file (sort @data) {
        print "$file\n";
    }
    if ($itemCount > $maxNumberOfItems) {
        print colored ['red on_black'], "\nWARNING: Maximum item count of $itemCount / $maxNumberOfItems has been reached. Please adjust your filter\n";
    }
}

# -------------------------
# Main
# -------------------------
print "Exclude: " . Dumper(\@exclude) if $debug;
my @threads = map threads->create(\&doOperation), 1 .. $maxNumberOfParallelJobs;
pullDataFromDbWithDirectory($directory);
sleep 0.01 while (scalar @IDLE_THREADS < $maxNumberOfParallelJobs);
$worker->enqueue((undef) x $maxNumberOfParallelJobs);
$_->join for @threads;
printData();
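For what it's worth, here is one hypothetical way (not from this thread) to make the undef-terminator pattern self-managing even when the workers themselves discover and enqueue new directories, as the non-recursive omnidb listing requires: keep a shared count of outstanding items, and let whichever worker drops it to zero queue the terminators. Everything here is illustrative; fake_listdir() is a stand-in for the omnidb call, not real code from the script.

```perl
#!/usr/bin/perl
# Hypothetical sketch: self-terminating queue when workers enqueue new
# work. A shared counter tracks outstanding items; it is incremented on
# every enqueue and decremented when an item finishes, and the worker
# that drops it to zero enqueues one undef per worker.
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $Q           = Thread::Queue->new();
my $outstanding :shared = 0;
my @found       :shared;
my $N           = 4;

# Stand-in for `omnidb ... -listdir`: two files and two subdirs per
# directory, with the recursion stopping two levels down.
sub fake_listdir {
    my $dir   = shift;
    my $depth = () = $dir =~ m{/}g;
    return if $depth >= 2;
    return ( map( "File $dir/file$_", 1 .. 2 ),
             map( "Dir $dir/dir$_",   1 .. 2 ) );
}

sub add_work {
    my $item = shift;
    { lock $outstanding; ++$outstanding; }
    $Q->enqueue( $item );
}

sub worker {
    while( defined( my $dir = $Q->dequeue() ) ) {
        for my $line ( fake_listdir( $dir ) ) {
            my( $type, $name ) = split ' ', $line;
            push @found, $name;
            add_work( $name ) if $type eq 'Dir';   # newly found dirs are new work
        }
        lock $outstanding;
        # Last outstanding item done: wake every worker so it can exit.
        $Q->enqueue( (undef) x $N ) if --$outstanding == 0;
    }
}

my @threads = map { threads->create( \&worker ) } 1 .. $N;
add_work( '/SHARED' );      # seed the queue; no polling or counting after this
$_->join() for @threads;
print "$_\n" for sort @found;
```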
