PerlMonks  

Converting a parallel-serial shell script

by Corion (Patriarch)
on Sep 18, 2008 at 11:20 UTC

Corion has asked for the wisdom of the Perl Monks concerning the following question:

Dear monks,

I have a process pipeline that consists of the following steps:

  1. Convert data to TSV (using a Perl script)
  2. Launch the database bulk loader to import the TSV into the DB

I have a machine with 4 CPUs and hence want to maximize CPU utilization, as the data takes a while to convert. Step 1 can easily be parallelized, but step 2 must be serialized because the database bulk loader fails if another bulk-loading process is already loading into the same table. I also want to keep disk space usage low, so I want to import each data file as soon as it has been written to disk instead of importing everything after all conversions have finished. So I need a semaphore on step 2.

I have written a set of shell scripts that nicely do this, using the runN utility by Dominus for convenient parallelization:

BASE=$(cd $(dirname $0); pwd)
echo $DAYS
THIS=$(basename $0)
DB_SEMAPHORE=/tmp/$THIS.$$.import
rm -f $DB_SEMAPHORE
touch $DB_SEMAPHORE
export DB_SEMAPHORE

echo "Launching reader in $DB_SEMAPHORE"
(cd ..; tail -f $DB_SEMAPHORE | xargs -i ./load.sh {} >>$BASE/import.log )&

# ./convert.sh echoes the appropriate parameters into $DB_SEMAPHORE
../runN -n 4 ./convert.sh $DAYS

# Signal EOF to xargs
echo "_" >> $DB_SEMAPHORE
wait
echo "Import done"
rm $DB_SEMAPHORE

This shell script works, but it has some ugly parts:

  1. In (cd ..; tail -f $DB_SEMAPHORE | xargs -i ./load.sh {} >>$BASE/import.log )&, the tail -f process stays alive even after the import is done, at least when I cancel the shell script prematurely.
  2. The parallelization logic in ../runN -n 4 ./convert-wp-for-import.sh $DAYS is topologically separated from the parameter passing logic between the converter and the importer. There is too much action at a distance happening here.

What I'd like is an easy way to write the whole parallelize-then-serialize logic in Perl. Simple forking doesn't work because I need to pass the data "back up" to the parent process, or downwards in a serial fashion, so that only one DB import runs at a time, preferably without blocking the overall progress, so that all 4 CPUs stay busy. Also, it would of course be much nicer to pass around Perl data structures instead of manually making sure that the number of columns the converter script writes matches the number of columns the importer script expects.

I envision as an imaginary API something like the following:

use strict;
use Magic::Parallel max_parallel => 4;

my $parallel_handle = parallel sub {
    my ($self, $payload) = @_;
    system("convert.sh $payload") == 0
        or warn "Couldn't launch: $!/$?";
}, @ARGV;

$parallel_handle->serial(sub {
    my ($self, $payload) = @_;
    system("load_db.sh $payload") == 0
        or warn "Couldn't launch: $!/$?";
});

In practice, the next step would be to eliminate the wrapping shell scripts and replace them with real Perl code.

Has anybody done something like this? Is there anything that shields me from serializing the data and then deserializing it like I'd have to do with Parallel::ForkManager?
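To make that concrete, here is a rough, untested sketch of how a Parallel::ForkManager version could look: the run_on_finish callback runs in the parent, one invocation at a time, so the imports stay serial while up to four conversions run in child processes (passing anything richer than a file name back to the parent would still need hand-rolled serialization):

use strict;
use warnings;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(4);          # at most 4 converters at once

# Runs in the parent each time a child is reaped, so the loads never overlap.
$pm->run_on_finish(sub {
    my ($pid, $exit_code, $payload) = @_;        # $payload is the ident given to start()
    return if $exit_code;                        # conversion failed, skip the load
    system("load_db.sh $payload") == 0
        or warn "Couldn't launch load_db.sh $payload: $!/$?";
});

for my $payload (@ARGV) {
    $pm->start($payload) and next;               # parent: move on to the next file
    my $rc = system("convert.sh $payload");      # child: do the conversion
    warn "Couldn't launch convert.sh $payload: $!/$?" if $rc != 0;
    $pm->finish($rc ? 1 : 0);
}
$pm->wait_all_children;

One wrinkle: while the parent is busy running the loader inside the callback it is not starting new children, so the pool can briefly drop below four converters.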

Update: Just after posting (not after previewing) this, I realize that this would be a prime application for threads, at least under Windows. The target machine runs HP-UX, but at least it's an ActiveState build so threads should be available there too. Is writing a smallish wrapper around threads and Thread::Queue the way to go then?

Replies are listed 'Best First'.
Re: Converting a parallel-serial shell script
by salva (Canon) on Sep 18, 2008 at 12:29 UTC
    A solution using Proc::Queue:

    One process is created to perform the data load while several (4) processes handle the data conversion. The converters tell the loader through a pipe when they have finished.

    Proc::Queue is used to ensure that only 4 converters plus the loader run at the same time.

    # untested:
    use Proc::Queue size => 4, qw(run_back);

    my $lpid = open my $loader, '|-';
    defined $lpid or die "unable to fork: $!";

    if ($lpid) {
        # parent: autoflush the pipe to the loader, then fork the converters
        select $loader; $| = 1; select STDOUT;
        for my $payload (@ARGV) {
            run_back {
                if (system("convert.sh $payload") == 0) {
                    print $loader "$payload\n";
                }
                else {
                    warn "Couldn't launch: $!/$?";
                }
            } or warn "unable to fork: $!/$?";
        }
        # the converters hold their own copies of the pipe, so the loader
        # only sees EOF once the parent closes its end and they all exit
        close $loader;
    }
    else {
        # child: the loader reads file names from the pipe and imports them
        while (<STDIN>) {
            chomp;
            system("load_db.sh $_") == 0
                or warn "Couldn't launch: $!/$?";
        }
        exit 0;
    }
    1 while wait != -1;
Re: Converting a parallel-serial shell script
by pjotrik (Friar) on Sep 18, 2008 at 11:48 UTC
    I don't see what the problem with forking is; maybe I'm just missing something. What I would do (a rough sketch follows the list):

    Create a private semaphore
    Fork to create N processes
    In every process
    - convert its part
    - lock the semaphore
    - do the db import
    - release the semaphore
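
    A rough, untested sketch of that approach, using flock on a scratch file as the semaphore (convert.sh and load_db.sh are the OP's scripts; the lock file path is made up):

    use strict;
    use warnings;
    use Fcntl qw(:flock);

    my $N        = 4;                          # number of worker processes
    my $lockfile = '/tmp/db_import.lock';      # "the semaphore"

    # Deal the input files out to the N workers.
    my @parts;
    push @{ $parts[ $_ % $N ] }, $ARGV[$_] for 0 .. $#ARGV;

    my @kids;
    for my $part (grep { defined } @parts) {
        defined( my $pid = fork ) or die "fork failed: $!";
        if ( $pid == 0 ) {                     # child
            for my $payload (@$part) {
                if ( system("convert.sh $payload") != 0 ) {
                    warn "convert failed for $payload: $!/$?";
                    next;
                }
                open my $lock, '>', $lockfile or die "$lockfile: $!";
                flock $lock, LOCK_EX;          # only one importer at a time
                system("load_db.sh $payload") == 0
                    or warn "import failed for $payload: $!/$?";
                close $lock;                   # releases the lock
            }
            exit 0;
        }
        push @kids, $pid;
    }
    waitpid $_, 0 for @kids;                   # wait for all workers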

      "The semaphore" goes into file locking territory. And on operating systems where file locks are only advisory, I want to avoid file locking or at least prefer a prepackaged solution. But your approach of forking the whole script would have the advantage of making the script simple. It would have the disadvantage that I couldn't aggregate the results of the "child" scripts, but I don't need that for the application at hand.

        What if you simply spawn children to do the conversions and then have the parent import the results into the database as each one completes its task? There is only one parent, so the imports will be nice and sequential.

        You don't need locked files if you use files creatively: have the children put their results into a results###.tmp file. After they have finished, they can rename the file to results###.tsv.

        The parent simply waits for the .tsv files to appear, and the files are complete as soon as they appear.

        Or, if the file rename is not atomic enough, you can instead create a flag file results###.tsv.done after the .tsv is finished. The .tsv will be complete, closed, and entirely ready to be imported before the flag file appears.
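
        A rough, untested sketch of the parent's side of this (the results directory and load_db.sh are placeholders):

        use strict;
        use warnings;

        # The children (forked elsewhere) write results<N>.tmp and, once done,
        # rename the file to results<N>.tsv; the rename is the "ready" signal.

        my $workdir  = '/tmp/import_work';  # made-up path
        my $expected = scalar @ARGV;        # one result file per input file

        my $loaded = 0;
        while ( $loaded < $expected ) {
            my @ready = glob "$workdir/results*.tsv";
            unless (@ready) {
                sleep 1;                    # nothing finished yet; don't busy-wait
                next;
            }
            for my $tsv (@ready) {
                system("load_db.sh $tsv") == 0
                    or warn "import of $tsv failed: $!/$?";
                unlink $tsv;                # so it is not picked up again
                $loaded++;
            }
        }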

Re: Converting a parallel-serial shell script
by BrowserUk (Patriarch) on Sep 18, 2008 at 12:29 UTC

    Does the data you're converting to TSV come from one file, many files, or some other source?

    Does the bulk loader require its input to be on disk in a file? (I.e., you cannot feed it from a pipe?)


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      Yes - I get about 20 files per month, and these need to be processed before they can be loaded into the DB.

      And yes, unfortunately, the loading process cannot read from a pipe, as the server process itself wants to read from the file. I want to avoid fancy stuff like trying to make it read from /proc/$$/fd/4, especially as on that machine there is no /proc filesystem.

        Updated: This modified version actually terminates. ++Corion

        Needs some error checking, but this should work:

        #! perl -slw
        use strict;
        use threads;
        use Thread::Queue;

        our $THREADS ||= 4;

        my $Qraw = new Thread::Queue;
        my $Qtsv = new Thread::Queue;

        sub convert{ $_[ 0 ] }

        sub toTSV {
            while( my $filename = $Qraw->dequeue ) {
                my $outFile = $filename . '.tsv';
                open my $fhIn,  '<', $filename or warn "$filename : $!" and next;
                open my $fhOut, '>', $outFile  or warn "$outFile : $!"  and next;

                while( <$fhIn> ) {
                    my $tsv = convert( $_ );
                    print $fhOut $tsv;
                }
                close $fhOut;
                close $fhIn;

                $Qtsv->enqueue( $outFile );
            }
            $Qtsv->enqueue( undef );
        }

        my @threads = map threads->create( \&toTSV ), 1 .. $THREADS;

        ## Filenames from the command line, plus 4 undefs so the threads terminate
        $Qraw->enqueue( @ARGV, (undef) x $THREADS );

        for( 1 .. $THREADS ) {
            while( my $tsvFile = $Qtsv->dequeue ) {
                system "echo $tsvFile";
                unlink $tsvFile;
            }
        }

        $_->join for @threads;

        Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
        "Science is about questioning the status quo. Questioning authority".
        In the absence of evidence, opinion is indistinguishable from prejudice.
Re: Converting a parallel-serial shell script
by talexb (Chancellor) on Sep 19, 2008 at 02:54 UTC

    I don't have complete chunks of code to pass along, but I did want to mention that I used IO::Socket and IPC::Run with great success to run a similar long-running process. I sent commands from the master down to the slaves over a pseudo-tty, checked for updates from the slaves over another pseudo-tty, ran a couple of processes in parallel, and it all worked great.
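
    A bare-bones, untested sketch of that kind of IPC::Run pseudo-tty plumbing (the worker script and its "DONE" reply are invented for illustration):

    use strict;
    use warnings;
    use IPC::Run qw(start pump finish);   # pty support needs IO::Pty installed

    my ( $to_slave, $from_slave ) = ( '', '' );

    # Start a slave process with both directions going through pseudo-ttys.
    my $h = start [ 'perl', 'worker.pl' ],
                  '<pty<', \$to_slave,
                  '>pty>', \$from_slave;

    # Send a command down to the slave ...
    $to_slave .= "convert file0001.raw\n";

    # ... and pump until the slave reports back.
    pump $h until $from_slave =~ /^DONE\b/m;

    finish $h;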

    And the process tree (ps axf under Linux anyways) looked awesome.

    Alex / talexb / Toronto

    "Groklaw is the open-source mentality applied to legal research" ~ Linus Torvalds

Re: Converting a parallel-serial shell script
by repellent (Priest) on Sep 19, 2008 at 18:30 UTC
    I'm not sure if you would find this useful, but I wrote a module to pipe processes together. It also gives you a choice of whether you'd like to background the pipeline, à la ( .. | .. | .. ) &, using bg(exe .. , exe .. , exe ..).

    Pipe processes and Perl subroutines together

    Trying somewhat to emulate your imaginary API example:
    use IPC::Exe;    ## exports bg() & exe()

    my $parallel_handle = bg sub {
        my ($payload) = @_;

        ## Run one after another, but parallel w.r.t. the parent process
        &{ exe 'convert.sh', $payload, @ARGV };
        die "Couldn't launch: $!/$?" if $! = $? >> 8;

        &{ exe 'load_db.sh', $payload };
        warn "Couldn't launch: $!/$?" if $! = $? >> 8;
    };

    my $payload = "ha ha!";
    print "Background PID: ", $parallel_handle->($payload), "\n";
Re: Converting a parallel-serial shell script
by kubrat (Scribe) on Sep 22, 2008 at 13:46 UTC

    I would advise against using threads to solve your problem. There's nothing to gain from using threads since the problem you describe is disk bound and computationally intensive. Forks, on the other hand, are a bit simpler to use and have fewer potential gotchas.

    You could use System V semaphores to serialize the bulk-loading part of the code; they should be available on any POSIX-compliant system, including Windows, although I have never tried System V semaphores on Windows before.
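
    For reference, a minimal, untested sketch of a System V binary semaphore guarding the bulk load (created before fork() so every child shares it; load_db.sh is the OP's loader script):

    use strict;
    use warnings;
    use IPC::SysV qw(IPC_PRIVATE IPC_CREAT S_IRUSR S_IWUSR SEM_UNDO);
    use IPC::Semaphore;

    # One binary semaphore; value 1 means "free".
    my $sem = IPC::Semaphore->new( IPC_PRIVATE, 1, S_IRUSR | S_IWUSR | IPC_CREAT )
        or die "semget failed: $!";
    $sem->setval( 0, 1 );

    # In each forked worker, wrap the bulk load like this:
    $sem->op( 0, -1, SEM_UNDO );       # P: acquire (SEM_UNDO releases it if we crash)
    system("load_db.sh $ARGV[0]") == 0
        or warn "bulk load failed: $!/$?";
    $sem->op( 0, 1, SEM_UNDO );        # V: release

    # In the parent, after all children have been reaped:
    $sem->remove;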

    To be honest, I am not convinced that you need semaphores at all. What if somebody or something else starts a bulk upload while your script is running? Instead, in every forked process I would try the bulk load and, if it fails, sleep for 5 seconds and try again, and keep trying until the bulk load succeeds.

      Perhaps you would enlighten us by posting a forking solution to the OP's problem that's simpler than this?

      And for bonus points, you could try making it scalable, portable and efficient as well?


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.

        I think you have failed to see the motivation behind my post. I just wanted to share my thoughts and experience in approaching this type of problem, which is why I haven't given a code example. It is probably my fault, because of the way I expressed myself, but Corion seems to have understood it correctly.

        Your solution proves me wrong. It is neat and elegant and I really like it. But I still think that I make good points when considering the problem of parallelization in more general terms.

        Finally, you could perhaps shed some light on how what I am talking about is not scalable; after all, you can fork as many processes as you need. Portable? I am not sure how portable fork and semaphores really are. fork() works for me on Windows with ActivePerl, though it appears to use threads behind the scenes, so does that mean you get the speed benefits of threads without the disadvantage of having to be careful with shared data? Efficient? I don't think there will be a noticeable difference between a forking and a threading implementation.

      Thanks for your advice. I know that my problem is not disk bound, as running four processes instead of one reduces the overall runtime to 25% of the serial runtime. I'm aware that forks are in principle simpler to use if you don't have to pass information around, but in my case I do have information to pass back to the master process.

      As I'm the only user with administrative privileges on the database, I will be the only one doing bulk uploads and hence I want the bulk uploads to be serialized in the fashion I proposed. Having the conversion retry at fixed (or even at random) intervals creates the risk of flooding the machine with (infinitely) retrying programs, which I dislike.

        If you don't mind me asking, what type of DB are you running? Is it MySQL, Oracle, ...?
