#!/usr/bin/perl
##
## Worker-pool demonstration built on ithreads.
##
## A fixed number of worker threads pull job names from a shared list,
## "work" on each one (a random sleep followed by an external /bin/sleep),
## and report progress through a print queue that a single print-manager
## thread drains to STDOUT.  The main thread waits for the first shutdown
## sentinel (a negative value on the idle queue), then joins the workers,
## lets the print manager drain its queue, and exits.
##
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw( usleep time );
use IPC::Open3;

my $DEFAULTS = {
    'exit_success' => 0,     ## success exit.
    'exit_failure' => 1,     ## failure exit.
    'ret_success'  => 1,     ## success return.
    'ret_failure'  => 0,     ## failure return.
    'random'       => 10,    ## Random amount of time to sleep.
    'max_threads'  => 10,    ## Max # of threads to maintain.
    'test_jobs'    => 20,    ## # of test jobs to run.
};

my @q_JOBS :shared;               ## shared jobs queue.
my $f_TERM :shared = 0;           ## main term flag.
my $f_TERM_PRINT :shared = 0;     ## print term flag.
my $l_PRINT :shared;              ## print lock
my $q_IDLE :shared;               ## idle queue.
$q_IDLE = Thread::Queue->new();
my $q_PRINT :shared;              ## print queue.
$q_PRINT = Thread::Queue->new();

## Print-manager thread object.  Only the main thread uses it, so that
## shutdown_engine() can join the workers first and the printer last.
my $thr_PRINT;

## INT/TERM: ask every loop to stop and wake the main thread by putting
## the negative sentinel at the head of the idle queue.
$SIG{'INT'} = $SIG{'TERM'} = sub {
    $q_PRINT->enqueue( 'Signaled TERMINATION' );
    $f_TERM = 1;
    $q_IDLE->insert( 0, -1 );
};

## NOTE(review): a 'KILL' entry in %SIG is the handler the threads module
## invokes for $thr->kill('KILL'); nothing in this file sends that signal,
## and a real OS-level SIGKILL cannot be caught.
$SIG{'KILL'} = sub { threads->exit(); };

## main()
##   Populates the job list, starts the print manager and the workers,
##   then blocks until a negative sentinel shows up on the idle queue
##   (posted by a finished worker or by the signal handler) and hands
##   over to shutdown_engine(), which does not return.
sub main {
    ## populate a dummy list of jobs to work.
    ## this could be some server list, or whatever.
    @q_JOBS = map { 'JOB-' . $_ } ( 1 .. $DEFAULTS->{'test_jobs'} );

    ## The printer is kept joinable (not detached) so that shutdown can
    ## wait for it to finish writing before the process exits.
    $thr_PRINT = threads->create( \&print_manager )
        or die "Unable to create the print manager thread\n";

    for ( 1 .. $DEFAULTS->{'max_threads'} ) {
        threads->create( \&thread_worker )
            or die "Unable to create a worker thread\n";
    } ## end for max_threads.

    while ( !$f_TERM ) {
        usleep( 250_000 );
        my $tid = $q_IDLE->dequeue();
        last if ( $tid < 0 );
    }

    $q_PRINT->enqueue( 'Shutdown on main started' );
    shutdown_engine();
} ## end main.

## shutdown_engine()
##   Centralized cleanup and shutdown.  Joins every worker, then tells
##   the print manager to drain its queue and stop, joins it, and exits
##   the process with the success exit code.
sub shutdown_engine {
    my $printer_tid = defined $thr_PRINT ? $thr_PRINT->tid() : -1;

    ## Workers first: they may still enqueue messages until they return.
    for my $thr ( threads->list() ) {
        next if $thr->tid() == $printer_tid;
        $thr->join();
    }

    ## Every producer is done; the printer exits once the queue is empty.
    $f_TERM_PRINT = 1;
    $thr_PRINT->join() if defined $thr_PRINT;

    exit $DEFAULTS->{'exit_success'};
}

## get_next_job()
##   Removes and returns the next job name from the shared list while
##   holding its lock.  Returns undef when the list is empty.
sub get_next_job {
    lock @q_JOBS;
    return shift @q_JOBS;
}

## thread_worker()
##   Worker thread body.  Repeatedly announces itself idle, takes the
##   next job and works on it, until the job list is empty or the
##   termination flag is set.  Posts the negative sentinel on the idle
##   queue before returning so the main thread starts the shutdown.
sub thread_worker {
    my $tid = threads->tid();

    while ( !$f_TERM ) {
        $q_PRINT->enqueue( "Thread $tid is IDLE" );
        $q_IDLE->enqueue( $tid, 0 );

        ## Taking the job is the emptiness test: checking the list size
        ## separately would let two workers race for the last entry.
        my $job = get_next_job();
        last if !defined $job;

        do_work( $job, $tid );    ## do whatever work...
    } ## end

    $q_PRINT->enqueue( "Thread $tid is FINISHED" );

    ## insert() takes (INDEX, ITEMS): put the sentinel at the head of the
    ## queue, exactly as the signal handler does.
    $q_IDLE->insert( 0, -1 );

    return $DEFAULTS->{'ret_success'};
} ## end thread_worker.

## run_external_command( $job, $tid )
##   serves as a place for extensive external command where output and
##   secondary processing may take place.  Currently runs /bin/sleep for
##   a random whole number of seconds below $DEFAULTS->{'random'}.
##   Returns ret_success when the child exits with status 0, ret_failure
##   when it could not be started or exited non-zero.
sub run_external_command {
    my ( $job, $tid ) = @_;

    $q_PRINT->enqueue( "Thread $tid is running exteral job $job" );

    local ( *w_IN, *w_OUT, *w_ERR );

    ## open3() raises an exception on failure; trap it so the worker
    ## survives and still reaches its shutdown sentinel.
    my $pid = eval {
        open3( \*w_IN, \*w_OUT, \*w_ERR,
            '/bin/sleep', int( rand( $DEFAULTS->{'random'} ) ) );
    };
    my $error = $@;

    if ( !$pid ) {
        chomp $error;
        $q_PRINT->enqueue(
            "Thread $tid failed to start external job $job: $error" );
        return $DEFAULTS->{'ret_failure'};
    }

    close *w_IN;    ## nothing is written to the child.
    waitpid( $pid, 0 );
    my $status = $?;
    close *w_OUT;
    close *w_ERR;

    return $status == 0
        ? $DEFAULTS->{'ret_success'}
        : $DEFAULTS->{'ret_failure'};
}

## do_work( $job, $tid )
##   do_work could be a wrapper to whatever type of work needs to be
##   accomplished.  For now it sleeps for a random 0-4 seconds and then
##   runs the external command.  Returns that command's result.
sub do_work {
    my ( $job, $tid ) = @_;

    $q_IDLE->enqueue( $tid, 1 );
    $q_PRINT->enqueue( "Thread $tid is WORKING [$job]" );

    ## pretend it takes some time before running the
    ## external command -- i.e. vary the timing a bit.
    sleep( int( rand( 5 ) ) );

    my $result = run_external_command( $job, $tid );

    $q_PRINT->enqueue( "Thread $tid COMPLETED [$job]" );
    return $result;
} ## end do_work.

## timestamp()
##   Returns the current local time as a human-readable string.
sub timestamp {
    ## scalar forces the string form even if called in list context.
    return scalar localtime time;
} ## end timestamp.

## print_manager()
##   Print thread body.  Writes each queued message to STDOUT prefixed
##   with a timestamp.  Once the print term flag is set it keeps going
##   until the queue is empty and then returns so it can be joined.
sub print_manager {
    local $| = 1;    ## write each message out immediately.

    while (1) {
        ## Sample the flag BEFORE polling: if it was already set, no
        ## producer is left, so an empty queue really is the end.
        my $draining = $f_TERM_PRINT;

        my $message = $q_PRINT->dequeue_nb();
        if ( defined $message ) {
            lock $l_PRINT;
            print STDOUT '[' . timestamp() . ']: ' . $message . "\n";
            next;
        }

        last if $draining;
        usleep( 100_000 );
    }

    return;
} ## end print_manager.

main();
exit $DEFAULTS->{'exit_success'};

__END__