PerlMonks  

Threaded Application Sequencing/Rendezvous

by learnedbyerror (Beadle)
on Jul 10, 2012 at 22:39 UTC (#980970, perlquestion)
learnedbyerror has asked for the wisdom of the Perl Monks concerning the following question:

Oh learned monks,

Yet again, I need your assistance. I am looking for something similar to the IBM JCL of my youth. I have a moderately complex application sequence containing 11 steps that I am currently running serially. I have written 4 of these steps to be multi-threaded and have greatly reduced my run time. I am currently looking for an elegant solution to allow me to run some of the other steps in parallel, to further reduce run time, and to allow me to test the steps for success and either restart or abend.

The step sequence is shown in the following ASCII diagram:

==ACTIONS= ===============TIME==============
           0   1   2   3   4   5   6   7   8
Action  1  |-->|
Action  1a     |-->|
Action  2  |-->|
Action  2a     |-->|
Action  3          |-->|
Action  4              |-->|
Action  5                  |-->|
Action  6                      |-->|
Action  7                  |------>|
Action  8                          |-->|
Action  9                              |-->|
Action 10          |-->|
Action 11                              |-->|

I can currently achieve my functional goals by writing a hard-coded script to carry this out; however, this is not elegant and would contain quite a bit of code itself.

I have spent many hours over the last couple of months looking through CPAN trying to find a solution that would let me model the dependencies and have the orchestration of the steps handled via a job management system. While I have found some possible solutions such as Gearman and BPM::Engine, they are either overkill or more complex to set up and maintain than a hand-tooled script.

My ideal solution would contain the following characteristics:

  • Configured via a markup language file
  • Provide a way to query current status and runtime statistics from the steps being executed
  • Be a turnkey solution that does not require a lot of work to install on a new system

Thank You in advance for sharing your wisdom with me

lbe


Update:

After a couple of cups of coffee, a couple of sheets of scratch paper, and an hour in vi, I have an approach which I am going to code up. It is a combination of the recommendations of BrowserUk and roboticus.

I am going to store the configuration using perl reference syntax. This is a little more verbose than other alternatives; however, it will eliminate some coding. I may change this in the future if it proves hard to support.

I have roughed out the beginnings of a base object that will contain all of the logic to run the job (i.e. ready, running, complete, abend, ...). I will sub-class this object to handle updating a global reference with the current state/running information on each step to support the reporting.

And lastly, I will use a main loop to iterate over the list of objects, calling the ready and done methods for each object. When ready is called, the object will return immediately if it is in the running or done state. Otherwise, it will check the status of its predecessors in the global reference, and will run the step if the predecessors are complete, or set its status to cancelled if any predecessor has abended or been cancelled. Subsequent calls to the complete method will update its status when the step ends, successfully or unsuccessfully.

I am going to hold off on the reporting side until I get the basics above solid. Then I'll add a small RPC server that will dump the global reference to the caller. I can then build a small separate program to support its monitoring.
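A minimal sketch of the kind of status dump described above, assuming the "global reference" is a plain hash of step name to a small record. The function name status_snapshot and the state/started/finished keys are hypothetical, chosen only for illustration; the RPC transport itself is left out.

```perl
use strict;
use warnings;
use JSON::PP ();

# Build a JSON snapshot of all steps.
# $steps: hashref of name => { state => ..., started => ..., finished => ... }
#         (a hypothetical layout, not the author's actual structure)
# $now:   current epoch time, passed in so the result is reproducible
sub status_snapshot {
    my ( $steps, $now ) = @_;
    my %report;
    for my $name ( keys %$steps ) {
        my $s = $steps->{$name};

        # Elapsed time: finished steps report their total run time,
        # running steps report time so far, unstarted steps report nothing.
        my $elapsed =
              defined $s->{finished} ? $s->{finished} - $s->{started}
            : defined $s->{started}  ? $now - $s->{started}
            :                          undef;

        $report{$name} = { state => $s->{state}, elapsed => $elapsed };
    }

    # canonical() sorts keys so successive dumps are easy to compare
    return JSON::PP->new->canonical->encode( \%report );
}
```

A listener thread could send the returned string to whoever connects, which keeps the monitoring program independent of the scheduler's internals.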

Thanks for the ideas!! lbe


Update 2:

After letting the recommendations sink in, I crafted/hacked a solution combining the input from BrowserUk and roboticus. I created an object that contains the minimal information needed to sequence the job steps. My intent is to sub-class this as needed to address any needs specific to a particular job step, such as incremental run-time statistics for some of the heavily threaded, longer-running steps. I also created a script that uses my object; it contains a main loop, sub-routines to load the sequence data, and a do-nothing routine to use in the test.

Please take a look and provide any feedback that you may have.

Thanks, lbe

The Module (Step::Base)

package Step::Base;

use v5.10.1;
use threads;
use threads::shared;
use namespace::autoclean;
use Moose;

has 'name'          => ( is => 'ro', isa => 'Str', required => 1 );
has 'code_to_run'   => ( is => 'ro', isa => 'Str', required => 1 );
has 'args'          => ( is => 'ro', isa => 'ArrayRef' );
has 'return_code'   => ( is => 'rw', isa => 'ArrayRef' );
has 'state'         => ( is => 'rw', isa => 'Str', default => 'Created' );
has 'success'       => ( is => 'rw', isa => 'Int' );
has 'error'         => ( is => 'rw', isa => 'Str' );
has 'time_started'  => ( is => 'rw', isa => 'Int' );
has 'time_finished' => ( is => 'rw', isa => 'Int' );
has 'predecessors'  => ( is => 'ro', isa => 'ArrayRef' );
has 'thr'           => ( is => 'rw', isa => 'Object' );
has 'steps'         => ( is => 'ro' );    # hashref of all steps, keyed by name

# Start the step in its own thread. Returns 1 if it started, 0 otherwise.
sub run {
    my ($self) = @_;
    $self->time_started(time);

    # threads->create returns undef if the thread could not be started
    my $thr = threads->create( $self->code_to_run, $self->args );
    if ( !defined $thr ) {
        $self->state('Abended');
        $self->error('threads->create failed');
        $self->time_finished(time);
        $self->success(0);
        return 0;
    }
    $self->thr($thr);
    $self->success(1);
    $self->state('Running');
    return 1;
}    ## ---------- end sub run

# True if the step has not started yet and all predecessors have completed.
sub ready {
    my ($self) = @_;
    return 0 unless $self->state eq 'Created';
    return $self->_predecessors_complete( $self->predecessors ) ? 1 : 0;
}    ## ---------- end sub ready

sub running {
    my ($self) = @_;
    return $self->state eq 'Running' ? 1 : 0;
}    ## ---------- end sub running

# True once the step has reached a terminal state. Joins the thread if it
# has finished and records the outcome.
sub done {
    my ($self) = @_;
    my $state = $self->state;
    return 1 if $state eq 'Completed' || $state eq 'Abended' || $state eq 'Cancelled';
    return 0 unless $state eq 'Running';

    # check to see if thread is ready to join and join if it is
    foreach my $thr ( threads->list(threads::joinable) ) {
        next unless $thr->tid == $self->thr->tid;
        my $rc = $thr->join;
        if ( ref $rc eq 'HASH' && $rc->{success} ) {
            $self->state('Completed');
        }
        else {
            $self->state('Abended');
            my $err_msg = '';
            if ( my $thread_error = $thr->error ) {
                $err_msg = "$thread_error\n";
            }
            $err_msg .= $rc->{err_msg}
              if ref $rc eq 'HASH' && defined $rc->{err_msg};
            $self->error($err_msg);
        }
        $self->time_finished(time);
        return 1;
    }
    return 0;
}    ## ---------- end sub done

# Returns 1 if every predecessor is Completed. A failed or cancelled
# predecessor cancels this step as well.
sub _predecessors_complete {
    my ( $self, $predecessors ) = @_;
    foreach my $p ( @{ $predecessors || [] } ) {
        my $state = $self->steps->{$p}->state;
        next if $state eq 'Completed';    # was "Complete", which never matched
        $self->state('Cancelled')
          if $state eq 'Abended' || $state eq 'Cancelled';
        return 0;
    }
    return 1;
}    ## ---------- end sub _predecessors_complete

sub time_elapsed {
    my ($self) = @_;
    my $state = $self->state;
    return time() - $self->time_started               if $state eq 'Running';
    return $self->time_finished - $self->time_started if $state eq 'Completed';
    return undef;
}    ## ---------- end sub time_elapsed

__PACKAGE__->meta->make_immutable;
1;

The control script

use strict;
use warnings;
use v5.10.1;
use Step::Base;

my ( $steps, $step_names ) = load_steps();

while (1) {
    my $cnt_not_ready = 0;
    my $cnt_running   = 0;
    my $cnt_done      = 0;

    foreach my $name ( @{$step_names} ) {
        my $step = $steps->{$name};
        if ( $step->done ) {
            $cnt_done++;
        }
        elsif ( $step->state eq 'Created' ) {
            if ( $step->ready ) {
                $step->run ? $cnt_running++ : $cnt_done++;
            }
            else {
                $cnt_not_ready++;
            }
        }
        elsif ( $step->running ) {
            $cnt_running++;
        }
        else {
            die "Something ain't right!\n"
              . "$name\->state = " . $step->state . "\n"
              . "\tRunning: " . $step->running . "\n"
              . "\t   Done: " . $step->done . "\n";
        }
        printf( "%-15s %s\n", $step->state, $name );
    }
    printf( "Not Ready: %d   Running: %d   Done: %d\n",
        $cnt_not_ready, $cnt_running, $cnt_done );
    last if ( $cnt_done >= scalar( @{$step_names} ) );
    sleep(5);
}

sub load_steps {
    my $steps      = {};    # must exist before the first step stores a reference to it
    my $step_names = [];
    my $conf       = define_steps_conf();
    foreach my $step ( @{$conf} ) {
        my ( $name, $args ) = %{$step};    # each entry holds exactly one pair
        $args->{name}  = $name;
        $args->{steps} = $steps;
        $steps->{$name} = Step::Base->new($args);
        push( @{$step_names}, $name );
    }
    return ( $steps, $step_names );
}    ## ---------- end sub load_steps

sub define_steps_conf {
    my $steps = [
        { 'Action01'  => { code_to_run => 'main::do_nothing', predecessors => [] } },
        { 'Action01a' => { code_to_run => 'main::do_nothing', predecessors => ['Action01'] } },
        { 'Action02'  => { code_to_run => 'main::do_nothing', predecessors => [] } },
        { 'Action02a' => { code_to_run => 'main::do_nothing', predecessors => ['Action02'] } },
        { 'Action03'  => { code_to_run => 'main::do_nothing', predecessors => [ 'Action01a', 'Action02a' ] } },
        { 'Action04'  => { code_to_run => 'main::do_nothing', predecessors => ['Action03'] } },
        { 'Action05'  => { code_to_run => 'main::do_nothing', predecessors => ['Action04'] } },
        { 'Action06'  => { code_to_run => 'main::do_nothing', predecessors => ['Action05'] } },
        { 'Action07'  => { code_to_run => 'main::do_nothing', predecessors => ['Action05'] } },
        { 'Action08'  => { code_to_run => 'main::do_nothing', predecessors => [ 'Action06', 'Action07' ] } },
        { 'Action09'  => { code_to_run => 'main::do_nothing', predecessors => ['Action08'] } },
        { 'Action10'  => { code_to_run => 'main::do_nothing', predecessors => [ 'Action01a', 'Action02a' ] } },
        { 'Action11'  => { code_to_run => 'main::do_nothing', predecessors => ['Action08'] } },
    ];
    return ($steps);
}    ## ---------- end sub define_steps_conf

sub do_nothing {
    sleep int( rand(5) + 5.5 );
    if ( int( rand() + 0.9 ) ) {    # fail 10% of the time
        return ( { success => 1 } );
    }
    else {
        return ( { success => 0, err_msg => "I screw up" } );
    }
}    ## ---------- end sub do_nothing

Re: Threaded Application Sequencing/Rendezvous
by onelesd (Pilgrim) on Jul 10, 2012 at 23:14 UTC

      I did consider Parallel::ForkManager, about a half dozen other fork management modules on CPAN, and doing it by forking manually. The need for the individual threads to access a shared corpus of data requires a lot more work with forks than with the threads::shared module. So after a lot of testing with both forks and threads, I decided to go the threads route.

      Thanks, lbe

Re: Threaded Application Sequencing/Rendezvous
by BrowserUk (Pope) on Jul 11, 2012 at 00:23 UTC
    • Configured via a markup language file
    • Provide a way to query current status and runtime statistics from the steps being executed
    • Be a turnkey solution that does not require a lot of work to install on a new system

    Two out of three ain't bad :) And the third could be added fairly simply using a thread and a listener:

    #! perl -slw
    use strict;

    my( %scripts, %deps, %sched, @starts );

    while( <DATA> ) {
        chomp;
        my( $name, $script, @deps ) = split '[ \t:,]+';
        $scripts{ $name } = $script;
        if( @deps ) {
            $deps{ $name } = { map{ $_ => 1 } @deps };
            push @{ $sched{ $_ } }, $name for @deps;
        }
        else {
            push @starts, $name;
        }
    }

    my %running = map{ system( 1, $scripts{ $_ } ) => $_ } @starts;

    while( keys %running ) {
        my $donePid = wait;
        my $done = delete $running{ $donePid } or next;
        for my $action ( @{ $sched{ $done } } ) {
            delete $deps{ $action }{ $done };
            next if keys %{ $deps{ $action } };
            $running{ system( 1, $scripts{ $action } ) } = $action;
        }
    }
    __DATA__
    Action1: action1.pl
    Action1a: action1a.pl Action1
    Action2: action2.pl
    Action2a: action2a.pl Action2
    Action3: action3.pl Action1a, Action2a
    Action4: action4.pl Action3
    Action5: action5.pl Action4
    Action6: action6.pl Action5
    Action7: action7.pl Action5
    Action8: action8.pl Action6, Action7
    Action9: action9.pl Action8
    Action10: action10.pl Action1a, Action2a
    Action11: action11.pl Action8

    And a run against action scripts that print "starting"; sleep 10; print "Ending":

    [ 1:14:45.39] C:\test>980970
    Action1 starting
    Action2 starting
    Action1 ending
    Action2 ending
    Action1a starting
    Action2a starting
    Action1a ending
    Action2a ending
    Action3 starting
    Action10 starting
    Action3 ending
    Action10 ending
    Action4 starting
    Action4 ending
    Action5 starting
    Action5 ending
    Action6 starting
    Action7 starting
    Action6 ending
    Action7 ending
    Action8 starting
    Action8 ending
    Action9 starting
    Action11 starting
    Action9 ending
    Action11 ending

    Of course, it needs some error checking and system(1, ... ) only works on Windows, so that would need changing for other OSs, but as a starting point it shows that you don't need to get complicated for something like this.
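    For non-Windows systems, one possible stand-in for system( 1, ... ) is a plain fork/exec. The sketch below is not BrowserUk's code: the spawn and run_jobs names and the { cmd => [...], deps => [...] } job layout are assumptions made for illustration. It also adds one piece of the missing error checking, in that dependants of a job that exits non-zero are never started.

```perl
use strict;
use warnings;
use POSIX ();

# Start a command in the background and return the child's PID.
sub spawn {
    my (@cmd) = @_;
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ( $pid == 0 ) {
        # Child: replace this process with the command
        exec { $cmd[0] } @cmd;
        warn "exec @cmd failed: $!\n";
        POSIX::_exit(127);
    }
    return $pid;
}

# $jobs: hashref of name => { cmd => [...], deps => [...] } (hypothetical layout)
# Returns a hashref of name => wait status ($?) for every job that ran.
sub run_jobs {
    my ($jobs) = @_;
    my ( %waiting, %sched, %running, %status );

    for my $name ( sort keys %$jobs ) {
        my @deps = @{ $jobs->{$name}{deps} || [] };
        if (@deps) {
            $waiting{$name} = { map { $_ => 1 } @deps };
            push @{ $sched{$_} }, $name for @deps;
        }
        else {
            $running{ spawn( @{ $jobs->{$name}{cmd} } ) } = $name;
        }
    }

    while (%running) {
        my $pid = wait();
        last if $pid == -1;    # no children left
        my $done = delete $running{$pid} or next;
        $status{$done} = $?;
        next if $? != 0;       # a failed job releases none of its dependants

        for my $next ( @{ $sched{$done} || [] } ) {
            delete $waiting{$next}{$done};
            next if keys %{ $waiting{$next} };
            $running{ spawn( @{ $jobs->{$next}{cmd} } ) } = $next;
        }
    }
    return \%status;
}
```

    Calling run_jobs with an entry such as Action1a => { cmd => [ 'perl', 'action1a.pl' ], deps => ['Action1'] } mirrors one line of the __DATA__ table above. Any job missing from the returned hash was never started because something upstream of it failed.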


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

    The start of some sanity?

      This is interesting. While it doesn't provide a reporting framework, I can roll my own using an additional monitoring thread. I have mocked this up but didn't want to write it if there was some framework already available.

      I'll play around with this.

      Thanks, lbe

Re: Threaded Application Sequencing/Rendezvous
by roboticus (Canon) on Jul 11, 2012 at 00:33 UTC

    learnedbyerror:

    The last time I did a system like that it worked pretty much like this:

    #!/usr/bin/perl
    use strict;
    use warnings;

    my %Jobs = (
        Action1 => { status=>'READY', },
        Action1a=> { status=>'READY', prereqs => [ qw(Action1) ], },
        Action2 => { status=>'READY', },
        Action2a=> { status=>'READY', prereqs => [ qw(Action2) ], },
        Action3 => { status=>'READY', prereqs => [ qw(Action1a Action2a) ], },
        Action4 => { status=>'READY', prereqs => [ qw(Action3) ], },
    );

    my @task_list = sort keys %Jobs;
    my $cnt=0;
    while (@task_list) {
        ++$cnt;
        print "\nTIME: $cnt\n";
        for (sort keys %Jobs) {
            printf "%-16.16s %-10.10s %u\n", $_, $Jobs{$_}{status}, is_ready($_);
        }

        # Clean out the items that are already done
        @task_list = grep { $Jobs{$_}{status} ne 'DONE' } @task_list;

        # Each time we wake up, get list of tasks ready to go and fire
        # them off
        my @ready = grep { is_ready($_) } @task_list;
        $Jobs{$_}{status}="RUNNING" for @ready;

        # Fake code to pretend to monitor tasks & update job list
        for (@task_list) {
            $Jobs{$_}{status}="DONE"
                if $Jobs{$_}{status} eq "RUNNING" and rand() < 0.25;
        }
        sleep 1;
    }

    sub is_ready {
        my $task=shift;
        return 0 if $Jobs{$task}{status} ne 'READY';
        return 1 if ! exists $Jobs{$task}{prereqs};
        for (@{$Jobs{$task}{prereqs}}) {
            return 0 if $Jobs{$_}{status} ne 'DONE';
        }
        return 1;
    }

    Basically, each task knows what its prerequisite tasks are, and will fire off only when all its prerequisites are satisfied. The original also handled error cases by simply restarting the failed jobs after a delay. (On that system, most failures were due to missing files, so re-running was sufficient to fix most problems.) Also, I used a database as the scratchpad, so that the system could recover properly if the system went down and then recovered.
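    The restart-after-a-delay behaviour is not in the listing above. A rough sketch of how it might be bolted on, assuming two extra per-job fields (failures and retry_at) plus a FAILED status, none of which come from the original C# system; the delay and retry limit are arbitrary illustrative values:

```perl
use strict;
use warnings;
use v5.10;

# Hypothetical settings, chosen only for illustration
my $RETRY_DELAY = 30;    # seconds to wait before re-running a failed job
my $MAX_RETRIES = 3;     # give up once a job has failed more than this

# Record a failure: schedule another attempt, or mark the job FAILED.
# Returns the job's new status.
sub note_failure {
    my ( $job, $now ) = @_;
    $job->{failures}++;
    if ( $job->{failures} > $MAX_RETRIES ) {
        $job->{status} = 'FAILED';
    }
    else {
        $job->{status}   = 'READY';
        $job->{retry_at} = $now + $RETRY_DELAY;
    }
    return $job->{status};
}

# A job may start when it is READY, any retry delay has elapsed,
# and every prerequisite is DONE.
sub is_ready {
    my ( $jobs, $name, $now ) = @_;
    my $job = $jobs->{$name};
    return 0 if $job->{status} ne 'READY';
    return 0 if ( $job->{retry_at} // 0 ) > $now;
    for my $pre ( @{ $job->{prereqs} || [] } ) {
        return 0 if $jobs->{$pre}{status} ne 'DONE';
    }
    return 1;
}
```

    Passing the current time in as an argument, rather than calling time inside the functions, makes the delay logic easy to exercise without actually waiting.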

    I'd post the code, but it was C# rather than perl.

    ...roboticus

    When your only tool is a hammer, all problems look like your thumb.

      I like the approach regarding the autonomy of the threads. I scratched out something similar while thinking about this yesterday; however, I felt like I was making the solution more complicated than the problem.

      I'm going to get a cup of coffee and think through your model.

      Thanks, lbe

Re: Threaded Application Sequencing/Rendezvous
by Anonymous Monk on Jul 11, 2012 at 01:59 UTC
    //REMEMBER JOB (123,456),'BLAST FROM YOUR PAST'
    //STEP1 EXEC PGM=PERL
    //SYSIN DD *

      Anon,

      You've got it :) Another grey head around here lol!

      Throw in a couple of IEBGENER and some IEF***** and I would be there :)

      lbe

Re: Threaded Application Sequencing/Rendezvous
by RichardK (Priest) on Jul 11, 2012 at 10:33 UTC

    I'd use make or one of its equivalents (there are lots), possibly ant, which is easier to use but needs Java.

    make has been around forever and knows how to run jobs in parallel, among lots of other things. The dry-run mode can be really useful.

    I don't know if there's a perl only alternative, but I'm sure someone else here will.

      I have also thought about using make. It is on my list of fallbacks if I can't find a way I like better.

      Thanks, lbe

Re: Threaded Application Sequencing/Rendezvous
by DrHyde (Prior) on Jul 11, 2012 at 10:51 UTC
    It's not entirely clear what you want - for example, does job 1a only depend on job 1, or must job 2 also finish before 1a starts? and does job 3 only depend on job 2a, or must 1a also finish before job 3 starts?

    But that's a minor matter. My weapon of choice for this would be your shell's job control system. It's really good at that sort of thing and you'll have to write less code than you would if you were to do something perlish.

    update: ignore me, I didn't read the bit about wanting reports etc

Re: Threaded Application Sequencing/Rendezvous
by moritz (Cardinal) on Jul 11, 2012 at 16:13 UTC

    Such a system happens to be available in GNU make. You can control the number of commands executed in parallel with the -j option, so make -j4 would start at most 4 processes in parallel.

    If you happen to have a good way to factor everything so that each step produces a file (and reads the files it depends on), this might be an elegant solution.

      Moritz, Thanks for the suggestion. I do like the simplicity of this. I am still trying to keep it all in perl; however, if I don't succeed, make is my first fallback.

      Thanks, lbe

Node Type: perlquestion [id://980970]
Approved by BrowserUk
Front-paged by BrowserUk