PerlMonks

Threads slurping a directory and processing before conclusion

by TRoderic (Novice)
on Aug 21, 2011 at 20:38 UTC ( #921552=perlquestion )
TRoderic has asked for the wisdom of the Perl Monks concerning the following question:

Your eminences, exalteds and other worthies, I request your wisdom. The situation is as follows:

1. I have a very large quantity (1.2 million) of images that will need to have mappings generated for them, based on an as-yet-to-be-written process that determines which ones to promote to the processor-intensive stage.

2. The determination process itself is expected to take up to 0.2 seconds per file (with the file wholly in memory) on a 3.4 GHz processor, based on some early prototypes; at that rate, 1.2 million files works out to roughly 67 hours of single-core CPU time.

3. Previous attempts have hit major stability and time snags, even at the prototyping stage, due to the sheer volume of files that make up a comprehensive sample.

4. I believe this is a valid application of a threaded program (though I'll freely admit I could be wrong), allowing e.g. 4 evaluation processes to run at once on a 4-core system (and possibly distributed beyond one machine, but that's way beyond me right now).

5. The first performance snag I hit was the disk I/O wall, which I'm trying to work around by using RAM as much as possible.

6. Because the full evaluation process to apply to all of these is not yet complete, I need a way of quickly running prototypes on large samples of the data. I see this being done by having one thread consume the contents of a test directory directly into memory, while n other threads process the files whenever there is something to process.

7. Unfortunately, solving this through threaded directory consumption has not been successful: it either exceeds the physical limits of the hardware or, as the following code shows (at least on my system), halts at a limit which is not being handled properly.

use strict;
use warnings;
use threads;
use threads::shared;
use File::Slurp;
use Digest::MD5;
use Thread::Queue;
use Thread::Semaphore;

main(@ARGV);

sub main {
    my ($dir,$limit) = @_;
    my $q = Thread::Queue->new();
    unless ($dir) {$dir = 'F:/';}
    unless($limit){$limit = 3145728; } #example
    my $done:shared;
    my %mem:shared;
    $done = 'n';
    %mem = (
        limit   => $limit,
        total   => 0,
        t_sem   => Thread::Semaphore->new(),
        f_count => 0,
    );
    my $thread  = threads->create(\&slurp_directory_as_bin,$dir,\$q,\%mem,\$done);
    my $thread2 = threads->create(\&procqueue,\$q,\$done,\%mem);
    #my $thread3 = threads->create(\&procqueue,\$q,\$done,\%mem);
    #my $thread4 = threads->create(\&procqueue,\$q,\$done,\%mem);
    $thread->join();
    print("concluded directory listing\n");
    $thread2->join();
    #$thread3->join();
    #$thread4->join();
    print("\nit is done, move on.")
}

sub procqueue{
    my ($q,$done,$mem) = @_;
    while ($$done eq 'n'){
        while ( my $bl = $$q->dequeue_nb()){
            #to be replaced with darker magic once this works
            print("$$bl{name} ($$bl{fpath}) is $$bl{size} and it's md5 is : " . Digest::MD5::md5_hex($$bl{bin}) . "\n");
            $$mem{t_sem}->down();
            $$mem{total} = $$mem{total} - $$bl{size};
            print("popped to $$mem{total} with ". $$q->pending()."left\n");
            $$mem{t_sem}->up();
            if($$mem{total} < 0){print("HAAALT!"); exit;}
        }
        sleep 1;
        print("thread waiting...\n");
    }
}

sub slurp_directory_as_bin{
    my ($dir,$queue,$mem,$r) = @_;
    unless($dir and (-d $dir)){die("first parameter to slurp_directory_as_bin must be a valid directory [$!]"); }
    unless($dir =~ m|/$|){$dir .= '/';} # add on trailing slash for good measure
    my @subdirs = (); #list of subdirectories found in the directory, only used in [r]ecursive mode
    opendir(DIR,$dir);
    while (my $file = readdir(DIR)){
        next if ($file eq '.'|| $file eq '..' ); #skip self and parent dir
        next if ($file eq 'RECYCLER' || $file eq 'System Volume Information'); #skip problem directories on windows root paths
        my $fpath = $dir.$file;
        if (-d $fpath){ #if the current specimen is a directory
            if ($r){ #if in recursive mode
                push (@subdirs,$fpath); #put the reference to the subdirectory in storage for later
            }
            next; #dont try to process (yet)
        }
        my $bina = File::Slurp::read_file( $fpath, binmode => ':raw' ); #consume the file as it's component parts into memory
        my %data = (
            bin   => $bina,
            name  => $file,
            fpath => $fpath,
            size  => -s $fpath,
        );
        my $nqd = 1;
        while ($nqd == 1){ #wait here until we can add the file into the queue
            unless($data{size} < $$mem{limit}){die("$data{fpath} is larger than the total memory limit"); }
            my $new_total = $$mem{total} += $data{size};
            if ($new_total <= $$mem{limit} ){
                $$mem{t_sem}->down();
                $$mem{total} += $data{size};
                $$queue->enqueue(\%data);
                $$mem{t_sem}->up();
                $$mem{f_count}++;
                $nqd = 0;
            }else{
                print("waiting for space in the queue for $data{fpath}, trying to add $data{size} to $$mem{total} to make $new_total with a limit of $$mem{limit} \n");
                print($$queue->pending() . "items left \n");
                #sleep 1;
                $new_total = $$mem{total} + $data{size}; # recheck in case it's changed since then...
            }
        }
    } # end of files in directory
    foreach my $sdir (@subdirs){
        slurp_directory_as_bin($sdir,$queue,$mem,$r);
    }
}

I'm sure I've missed some thread-handling documentation somewhere, as pretty much every other problem (e.g. 1 + 0 = 72) I've encountered while making this was solved through perldocs or here.

Re: Threads slurping a directory and processing before conclusion
by Corion (Pope) on Aug 21, 2011 at 20:44 UTC
    while ( my $bl = $$q->dequeue_nb()){

    There is almost no reason to ever use ->dequeue_nb() in a worker thread. Enqueue undef to tell a worker thread to quit, and enqueue a value (or an arrayref of values for bulk processing) when the worker should do something.

    sub slurp_directory_as_bin{ ...

    Personally, if you have that many files, I would consider either storing all the file names in a database, or spawning an external process (like ls -lR or dir /b /s) to read in all the filenames. With the database you get more flexibility to slice and dice your dataset, and if it doesn't change (that often), having convenient access makes all the other things more convenient too. With the external process, you can start processing the first files in the "directory reader thread" while it is still reading in more files.
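
    For the database route, a minimal sketch of the idea (assuming DBD::SQLite is available; the files.db name and the two-column schema are purely illustrative) might look like this:

    use strict;
    use warnings;
    use DBI;
    use File::Find;

    # Walk the tree once and stash every file path (plus its size) in SQLite,
    # so later prototypes can slice the list with plain SQL instead of re-scanning the disk.
    my $dbh = DBI->connect('dbi:SQLite:dbname=files.db', '', '',
                           { RaiseError => 1, AutoCommit => 0 });
    $dbh->do('CREATE TABLE IF NOT EXISTS files (path TEXT PRIMARY KEY, size INTEGER)');

    my $ins = $dbh->prepare('INSERT OR IGNORE INTO files (path, size) VALUES (?, ?)');
    File::Find::find(sub {
        return unless -f $_;
        $ins->execute($File::Find::name, -s _);
    }, 'F:/');

    $dbh->commit;
    $dbh->disconnect;

    The workers can then SELECT whatever slice they need (by size, extension, whatever) without re-walking the directory tree each time.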

      I'd never seen any documentation mention that enqueueing undef did that; thanks for highlighting it.

        There is no documentation mentioning that, because you have to do all of this yourself. You will need to make the receiving threads quit once they read an undef from their input queue.
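
        A bare-bones version of that pattern, with a hypothetical process_item() standing in for the real per-file work, might look like this:

        use strict;
        use warnings;
        use threads;
        use Thread::Queue;

        my $WORKERS = 4;
        my $q = Thread::Queue->new();

        # Each worker loops until it dequeues undef, then falls out and returns.
        my @workers = map {
            threads->create( sub {
                while ( defined( my $item = $q->dequeue ) ) {
                    process_item($item);
                }
            } );
        } 1 .. $WORKERS;

        $q->enqueue($_) for 1 .. 100;          # the real work items go here
        $q->enqueue( (undef) x $WORKERS );     # one undef per worker tells them to stop
        $_->join for @workers;

        sub process_item { my ($item) = @_; print "worker ", threads->tid, " got $item\n"; }

        Because each worker consumes exactly one undef and then exits, the joins return cleanly once the real items are exhausted.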

Re: Threads slurping a directory and processing before conclusion
by BrowserUk (Pope) on Aug 21, 2011 at 21:29 UTC

    Your design means that you are attempting to store a huge amount of data, thousands of files, in shared memory. This design doesn't make sense. Shared memory is comparatively slow because of locking considerations, and it has high memory overhead so that the data can be accessed from multiple threads. And all of that is entirely unnecessary.

    Rather than loading the contents of the files in your directory traversal thread and then sharing them with all the other threads (so that just one of them can process each of them), the smarter and more efficient design would be to share the names of the files, and have each processing thread: read a path from the queue; slurp the file; process; and then discard it. This will save huge amounts of shared memory contention and speed your throughput, without exhausting memory.

    Here's a quick example that gets the paths of all the .pl files in the sub-tree below the CWD and processes them using 4 threads, counting the 'my's they find, before finally totalling the sub-counts and printing out the final tally:

    #! perl -slw
    use strict;
    use threads;
    use Thread::Queue;

    sub worker {
        my $Q = shift;
        my $count = 0;
        while( my $path = $Q->dequeue ) {
            chomp $path;
            my $file = do{ local( @ARGV, $/ ) = $path; <> };
            $count += () = $file =~ m[my]g;
        }
        return $count;
    }

    our $THREADS //= 4;

    my $Q = new Thread::Queue;
    my @workers = map{ threads->create( \&worker, $Q ); } 1 .. $THREADS;

    open DIR, '-|', q[ dir /s /b *.pl ] or die $!;
    $Q->enqueue( <DIR> );
    close DIR;

    $Q->enqueue( (undef) x $THREADS );

    my $count = 0;
    $count += $_->join for @workers;

    print "Found $count 'my's";

    __END__

    C:\test>921522-2.pl
    Found 20825 'my's

    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
Re: Threads slurping a directory and processing before conclusion
by Clarendon4 (Acolyte) on Aug 22, 2011 at 10:26 UTC
     > 3. previous attempts have hit major stability and
     > time snags, even at the prototyping stage due to the
     > sheer volume of files that make up a comprehensive sample
    

    I notice (based on the "F:/" pathname) that you're on Win32.

    You have a File::Find::find-like recursive file processing part in your code. This is always going to be slower than necessary on Win32 when coded in Perl.

    Consider using/writing some C/XS that generates the file list and avoids all the unnecessary stat (-d !) calls by using FindNextFile().

    Also consider using forks over threads. They're easier on Win32 than you might think.

    Take a look at qfind.c and peg in my CPAN directory for ideas:

    http://cpan.mirrors.uk2.net/authors/id/A/AD/ADAVIES/

    Try comparing the time qfind takes to generate a file list with the time a pure Perl solution takes, e.g.

    
    c:\> perl -e "${^WIN32_SLOPPY_STAT}=1; use Time::HiRes; $start = Time::HiRes::time; open Q, 'qfind.exe |'; while (<Q>) {}; close Q; print 'Took ', (Time::HiRes::time - $start)"
    
    c:\> perl -e "${^WIN32_SLOPPY_STAT}=1; use Time::HiRes; use File::Find; $start = Time::HiRes::time; File::Find::find(sub { }, '.'); print 'Took ', (Time::HiRes::time - $start)"
    
    

    On my Perl source directory of ~10_000 files this is <0.3 sec vs 1.7 sec. I suspect on your 1.2 million files this gives a *considerable* speed up.

    Oh, and make sure you put  BEGIN { ${^WIN32_SLOPPY_STAT} = 1 };  at the top of your code!

    Good luck.

      qfind is interesting and quite fast, but given that the OP is talking about slurping the contents of millions of image files, the time taken to produce the list of those files is likely to be completely insignificant.


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.
Re: Threads slurping a directory and processing before conclusion
by Anonymous Monk on Aug 22, 2011 at 11:53 UTC
    That "disk I/O wall" is the only wall you need to be concerned about. It IS "the ruling constraint," such that nothing else matters.
Re: Threads slurping a directory and processing before conclusion
by Voronich (Hermit) on Aug 22, 2011 at 13:40 UTC
    If you're (as has been conjectured) on Win32, I'd suggest pulling out the threading and then running it while watching Task Manager to see what the distribution of load over the cores is.

    I think it's probably moot though, since you're almost certainly going to be I/O bound, not CPU bound.

    Make sure you're solving the right problem before adding infrastructure :)

    Me
Reaped: Re: Threads slurping a directory and processing before conclusion
by NodeReaper (Curate) on Aug 22, 2011 at 13:46 UTC
Re: Threads slurping a directory and processing before conclusion
by jdrago999 (Pilgrim) on Aug 22, 2011 at 21:55 UTC

    I am just wrapping up a project *very* similar to this right now. Except that the environment is Ubuntu Linux.

    I used forks and forks::shared, which mimic the threads interface.

    The whole thing ran in the Amazon cloud, copying files from one S3 bucket to another S3 bucket. This meant that I was also able to scale this thing out across multiple machines, copying files together.

    I wanted to be able to stop/restart/reset the thing at will, so I created a database with a "jobs" table that looked something like this:

    create table job_lists (
      job_list_id   bigint unsigned not null primary key auto_increment,
      name          varchar(100) not null,
      unique(name)
    ) engine=innodb charset=utf8;

    create table jobs (
      job_id        bigint unsigned not null primary key auto_increment,
      job_list_id   bigint unsigned not null,
      reference_key varchar(100) not null,
      data          text,
      is_started    tinyint not null default 0,
      started_on    timestamp null,
      is_completed  tinyint not null default 0,
      completed_on  timestamp null,
      unique(job_list_id, reference_key),
      foreign key fk_jobs_to_job_lists (job_list_id)
        references job_lists (job_list_id)
        on delete cascade
    ) engine=innodb charset=utf8;

    The first step was to get a list of all the files to be copied and insert them as records in the "jobs" table. I call this the "Enqueue" step.

    The next step was to create a webservice running under Plack which has 2 functions: get_jobs (returns a list of new job data as JSON) and update_jobs (given a JSON array of job ids, marks them all as is_completed=1, completed_on=now()).
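
    A minimal sketch of such a Plack app (the fetch_unstarted_jobs() and mark_completed() stubs below are hypothetical stand-ins for the real queries against the jobs table) might look like this:

    # jobs.psgi -- run with: plackup jobs.psgi
    use strict;
    use warnings;
    use JSON::PP qw(encode_json decode_json);
    use Plack::Request;

    # Hypothetical stubs -- replace with real DBI queries against the jobs table.
    sub fetch_unstarted_jobs { return ( { job_id => 1, reference_key => 'example' } ) }
    sub mark_completed       { my @ids = @_; return scalar @ids }

    my $app = sub {
        my $req = Plack::Request->new(shift);

        if ( $req->path_info eq '/get_jobs' ) {
            # Hand back a chunk of not-yet-started jobs as JSON.
            my @jobs = fetch_unstarted_jobs();
            return [ 200, [ 'Content-Type' => 'application/json' ],
                     [ encode_json(\@jobs) ] ];
        }
        elsif ( $req->path_info eq '/update_jobs' ) {
            # Body is a JSON array of job ids; mark them is_completed=1.
            my $ids = decode_json( $req->content );
            mark_completed(@$ids);
            return [ 200, [ 'Content-Type' => 'application/json' ],
                     [ encode_json( { updated => scalar @$ids } ) ] ];
        }

        return [ 404, [ 'Content-Type' => 'text/plain' ], ['not found'] ];
    };

    $app;

    The dequeue script below then only needs those two endpoints.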

    Finally the "Dequeue" step. A simple perl script which loops through the following:

    • Fetch new jobs from the webservice if we don't have enough to distribute to our worker threads.
    • Spawn threads until we have enough.
    • If there are finished jobs, ask the webservice to mark them as completed.
    #!/usr/bin/perl -w
    # Basically:
    use strict;
    use warnings 'all';
    use forks;
    use forks::shared;
    use Time::HiRes 'gettimeofday';

    my $max_workers = 10;
    my $IS_RUNNING : shared = 1;
    my @new_jobs : shared = ( );
    my @finished_jobs : shared = ( );
    my $finished_count : shared = 0;
    my $job_chunk_size = 10;
    my $start_time;

    # Handle signals:
    $SIG{INT} = $SIG{TERM} = sub {
      SCOPE: {
        lock($IS_RUNNING);
        $IS_RUNNING = 0;
      };
    };

    # Main loop:
    while( $IS_RUNNING ) {
      # Create some workers if we don't have enough:
      my $running_procs = scalar( grep { eval { $_->is_running } } threads->list );    # count workers still running
      for( $running_procs..$max_workers ) {
        threads->create( \&worker );
      }# end for()

      # Get some jobs if we don't have enough:
      if( scalar(@new_jobs) < $max_workers * $job_chunk_size ) {
        if( my @unfiled_jobs = get_next_chunk_of_jobs() ) {
          SCOPE: {
            lock(@new_jobs);
            push @new_jobs, @unfiled_jobs;
          };
        }# end if()
      }# end if()

      $start_time ||= gettimeofday();

      # Mark-as-completed any jobs which are finished:
      if( @finished_jobs ) {
        my @out = ( );
        SCOPE: {
          lock(@finished_jobs);
          @out = splice(@finished_jobs, 0, scalar(@finished_jobs));
        };
        mark_as_completed( @out );
      }# end if()

      # Print the rate at which we are finishing work:
      my $diff = gettimeofday() - $start_time;
      my $rate = $finished_count / $diff;
      warn "[Main] $finished_count jobs done in $diff sec: $rate/sec\n";

      # And wait a moment...:
      sleep(5);
    }# end while()

    # We are shutting down...:
    warn "\nShutting down...\n";
    map { eval { $_->join } } threads->list;

    # This is what the worker processes do:
    sub worker {
      my $tid = threads->tid;

      CHUNK: while( $IS_RUNNING ) {
        my @jobs = (sub{
          SCOPE: {
            lock(@new_jobs);
            return splice(@new_jobs, 0, $job_chunk_size);
          };
        })->();

        # Did we get any jobs?:
        unless( @jobs ) {
          sleep(4);
          next CHUNK;
        }# end unless()

        # Process the jobs:
        map { process_job( $_ ) } @jobs;

        # Finished with this chunk of jobs:
        SCOPE: {
          lock(@finished_jobs);
          push @finished_jobs, map { $_->{job_id} } @jobs;
          lock($finished_count);
          $finished_count += scalar(@jobs);
        };
      }# end while()
    }# end worker()

    # Your code here:
    sub process_job {
      my ($job) = @_;
      # Do your thing here...
    }# end process_job()

    Over the weekend, I processed some 4 million images (several terabytes) in about 4 hours using a script very similar to this.

      That is a work of thunderous beauty and I thank you tremendously for sharing it. Admittedly I only understand about two-thirds of it right now, but it's about time I moved on from beginner-level syntax, and I believe you've saved me several weekends of effort. Thanks again.
Reaped: Re: Threads slurping a directory and processing before conclusion
by NodeReaper (Curate) on Aug 23, 2011 at 02:47 UTC
Re: Threads slurping a directory and processing before conclusion
by TRoderic (Novice) on Sep 06, 2011 at 21:00 UTC
    Thanks for all the input. I've made a start on combining what I've learned from this into a reusable program, replicated here in case it proves useful to somebody.
    The module
    package MTD;
    use strict;
    use warnings;

    our $VERSION = "0.1";

    use threads;
    use Thread::Queue;

    sub weave_dir{ # MTD::weave_dir($directory,$limit,$queueobjectref,\&function,$functionvar);
        my ($directory,$limit,$q,$function,$var) = @_;
        unless(-d $directory){die("$directory is not a directory; weave_dir gave up on [$!]");}
        #what happens if this process is called by another process which has its own collection of threads?
        if(scalar(threads->list()) != 0 ){ print("WARNING: weave_dir is starting with threads already in existence, no guarantee what will happen\n");}
        #unless we have an existing queue variable being passed to us, make one.
        unless($q){
            $q = Thread::Queue->new(); #create shared queue
        }
        my $lister = threads->create(\&enqueue_directorylist,$directory,\$q,1); # create directory listing thread.
        while( scalar(threads->list()) < $limit){threads->create(\&qnanny,\$q,$function,$var);} # create a queue watching process so long as we're under the limit for threads.
        #close the listing process and assign its opportunity-cost thread to another queue processing function.
        $lister->join();
        undef $lister;
        threads->create(\&qnanny,\$q,$function,$var);
        my @threads = threads->list(); #get listing of thread objects currently in existence (potential hazard when other threads from the calling process are running?)
        #dump undef into the queue for each thread running; a nanny thread stops when it receives undef.
        foreach my $t (@threads){
            $q->enqueue(undef);
        }
        #actually join all the weave_dir threads.
        foreach my $z (@threads){
            #print("joining $z\n");
            $z->join();
        }
        #print("done");
    }

    sub qnanny{ # qnanny($queueobject,$functiontouse,$singlescalarvariabletopasstothefunctiontouse);
        my ($q,$func,$var) = @_;
        #while there are objects in this queue and each is an actual value as opposed to undef
        while (my $r = $$q->dequeue()){
            #pass a reference to the object in the queue to the function specified, along with a single scalar variable.
            $func->(\$r,$var);
        }
        #print(threads->tid() . " is terminating\n");
        #we're done
        threads->exit();
    }

    sub enqueue_directorylist{ # enqueue_directorylist($directory,$queuereference,$recursivemode);
        my ($dir,$queue,$r) = @_;
        my @subdirs = (); # array for list of subdirectories found in the directory, only used in [r]ecursive mode
        opendir(DIR,$dir) or die "could not open $dir: $!"; # create the directory sequence
        while (my $file = readdir(DIR)){ #while we have a file to evaluate
            next if ($file eq '.'|| $file eq '..' ); #skip self and parent dir
            next if ($file eq 'RECYCLER' || $file eq 'System Volume Information'); #skip problem directories on windows root paths
            #if the current specimen is a directory
            if (-d $dir.$file){
                #if in recursive mode
                if ($r){
                    #put the reference to the subdirectory in storage for later
                    push (@subdirs, $dir.$file.'/');
                }
                # otherwise
                next; #dont try to process (yet)
            }
            #if we got this far, add the full file path to the queue
            $$queue->enqueue("$dir$file");
        }
        #repeat the above until....
        foreach my $sdir (@subdirs){
            enqueue_directorylist($sdir,$queue,$r);
        }
    }

    return 1;

    a simple directory listing process
    use MTD;
    use strict;
    use warnings;
    use threads;
    use threads::shared;
    use Thread::Semaphore;

    main();

    sub main{
        my $directory = q#F:/#;
        my $limit = 4;
        my $queueobjectref = undef;
        my %fvar :shared;
        my @ar :shared;
        %fvar = (
            t_sem => Thread::Semaphore->new(),
            array => \@ar,
        );
        MTD::weave_dir($directory,$limit,$queueobjectref,\&push_fname,\%fvar);
        sleep(1);
        foreach my $i (@{$fvar{array}}){
            print("$i\n");
        }
    }

    sub push_fname{
        my($filename,$var) = @_;
        $$var{t_sem}->down();
        push(@{$$var{array}}, $$filename); # reference to an array within a hash reference as an array
        $$var{t_sem}->up();
    }
