
Re: Threads slurping a directory and processing before conclusion

by TRoderic (Novice)
on Sep 06, 2011 at 21:00 UTC (#924463)


in reply to Threads slurping a directory and processing before conclusion

Thanks for all the input. I've made a start on combining what I've learned from this into a reusable program, replicated here in case it proves useful to somebody.
The module

package MTD;
use strict;
use warnings;
our $VERSION = "0.1";
use threads;
use Thread::Queue;

sub weave_dir {
    # MTD::weave_dir($directory,$limit,$queueobjectref,\&function,$functionvar);
    my ($directory,$limit,$q,$function,$var) = @_;
    unless (-d $directory) { die("$directory is not a directory; weave_dir gave up on [$!]"); }
    # what happens if this process is called by another process which has its own collection of threads?
    if (scalar(threads->list()) != 0) {
        print("WARNING: weave_dir is starting with threads already in existence, no guarantee what will happen\n");
    }
    # unless we have an existing queue variable being passed to us, make one.
    unless ($q) {
        $q = Thread::Queue->new(); # create shared queue
    }
    my $lister = threads->create(\&enqueue_directorylist,$directory,\$q,1); # create directory listing thread.
    # create queue watching threads so long as we're under the limit for threads.
    while (scalar(threads->list()) < $limit) {
        threads->create(\&qnanny,\$q,$function,$var);
    }
    # close the listing thread and assign its opportunity cost thread to another queue processing function.
    $lister->join();
    undef $lister;
    threads->create(\&qnanny,\$q,$function,$var);
    # get listing of thread objects currently in existence
    # (potential hazard when other threads from the calling process are running?)
    my @threads = threads->list();
    # dump undef into the queue for each thread running; a nanny thread stops when it receives undef.
    foreach my $t (@threads) {
        $q->enqueue(undef);
    }
    # actually join all the weave_dir threads.
    foreach my $z (@threads) {
        #print("joining $z\n");
        $z->join();
    }
    #print("done");
}

sub qnanny {
    # qnanny($queueobject,$functiontouse,$singlescalarvariabletopasstothefunctiontouse);
    my ($q,$func,$var) = @_;
    # while there are objects in this queue and the object is an actual value as opposed to undef
    while (my $r = $$q->dequeue()) {
        # pass a reference to the object in the queue to the function specified, along with a single scalar variable.
        $func->(\$r,$var);
    }
    #print(threads->tid() . " is terminating\n");
    # we're done
    threads->exit();
}

sub enqueue_directorylist {
    # enqueue_directorylist($directory,$queuereference,$recursivemode);
    # note: $directory must end with a path separator, since file names are appended to it directly
    my ($dir,$queue,$r) = @_;
    my @subdirs = (); # array for list of subdirectories found in the directory, only used in [r]ecursive mode
    opendir(DIR,$dir) or die "could not open $dir: $!"; # create the directory sequence
    while (my $file = readdir(DIR)) { # while we have a file to evaluate
        next if ($file eq '.' || $file eq '..'); # skip self and parent dir
        next if ($file eq 'RECYCLER' || $file eq 'System Volume Information'); # skip problem directories on windows root paths
        # if the current specimen is a directory
        if (-d $dir.$file) {
            # if in recursive mode
            if ($r) {
                # put the path of the subdirectory in storage for later
                push (@subdirs, $dir.$file.'/');
            }
            # otherwise
            next; # don't try to process (yet)
        }
        # if we got this far, add the full file path to the queue
        $$queue->enqueue("$dir$file");
    }
    # repeat the above for each subdirectory found
    foreach my $sdir (@subdirs) {
        enqueue_directorylist($sdir,$queue,$r);
    }
}
return 1;

A simple directory listing process

use MTD;
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

main();

sub main {
    my $directory = q#F:/#;
    my $limit = 4;
    my $queueobjectref = undef;
    my %fvar :shared;
    my @ar :shared;
    %fvar = (
        t_sem => Thread::Semaphore->new(),
        array => \@ar,
    );
    MTD::weave_dir($directory,$limit,$queueobjectref,\&push_fname,\%fvar);
    sleep(1);
    foreach my $i (@{$fvar{array}}) {
        print("$i\n");
    }
}

sub push_fname {
    my ($filename,$var) = @_;
    $$var{t_sem}->down();
    push(@{$$var{array}}, $$filename); # reference to an array within a hash reference as an array
    $$var{t_sem}->up();
}
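
The module stops each worker by enqueueing one undef per running thread. For comparison, here is a minimal self-contained sketch of the same worker-pool idea that ends the queue instead. It assumes Thread::Queue 3.01 or later, which provides end(); process_items and its parameters are hypothetical names made up for this illustration and are not part of MTD.

```perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

# Hypothetical helper (not part of MTD): run $worker_count threads over @items,
# apply $callback to each item, and return the results sorted.
sub process_items {
    my ($worker_count, $callback, @items) = @_;
    my $q = Thread::Queue->new();
    my @results :shared;

    my @workers;
    for (1 .. $worker_count) {
        push @workers, threads->create(sub {
            # dequeue() blocks while the queue is empty and returns undef
            # once the queue has been ended and fully drained.
            while (defined(my $item = $q->dequeue())) {
                my $out = $callback->($item);
                lock(@results);          # released at the end of this block
                push @results, $out;
            }
        });
    }

    $q->enqueue(@items);
    $q->end();    # assumption: Thread::Queue 3.01+; no per-thread undef needed
    $_->join() for @workers;

    my @sorted = sort @results;
    return @sorted;
}

my @names = process_items(4, sub { uc $_[0] }, qw(a.txt b.txt c.txt));
print "$_\n" for @names;
```

Ending the queue means the shutdown step no longer depends on threads->list(), so threads started elsewhere in the calling program would not be sent a stop signal or joined by mistake. Testing with defined() also keeps a worker running when an item happens to be a false value such as "0".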

