#!/usr/bin/perl -l
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Time::HiRes qw( usleep time );
use IPC::Open3;
use POSIX qw(:errno_h :sys_wait_h);
use File::Basename;
use FindBin qw( $RealBin $RealScript );
use FileHandle;

# Resolve the script name and its base directory at compile time: the
# "use constant" block below is also evaluated at compile time and reads
# $main::SCRIPT_DIR while building SSH_KEY.
BEGIN {
    our $SCRIPT_NAME = $RealScript;

    # Parent of the directory that holds the script.
    our $SCRIPT_DIR = File::Basename::dirname( $RealBin );

    # if you have a custom lib, add it.
    push( @INC, ( q{.}, qq{$SCRIPT_DIR/lib} ) );
}

use constant {
    RET_SUCCESS  => 1,           ## sub return value: success
    RET_FAILURE  => 0,           ## sub return value: failure
    EXIT_SUCCESS => 0,           ## process exit code: success
    EXIT_FAILURE => 1,           ## process exit code: failure
    TIMEOUT      => 4,           ## SET YOUR TIMEOUT in seconds
    THREADS      => 2,           ## SET the number of threads you want to run.
    SSH_USER     => q{root},     ## Remote user
    SSH_KEY      => qq{$main::SCRIPT_DIR/keys/YOUR_DSS_KEY},    ## key file handed to "ssh -i"
    SSH_CMD      => q{/usr/bin/ssh},
    SSH_OPT      => q{-q -o UserKnownHostsFile=/dev/null }
        . q{-o StrictHostKeyChecking=no }
        . q{-o BatchMode=yes }
        . q{-o ConnectTimeout=10 }
        . q{-o NoHostAuthenticationForLocalhost=yes }
        . q{-o PreferredAuthentications=publickey }
        . q{-o ServerAliveInterval=15 }
        . q{-o ServerAliveCountMax=4 }
        . q{-o TCPKeepAlive=no},
};

# Simple example of collected result counters, shared by all threads.
my %RESULTS :shared;

# External processes being watched: pid => start time.
my %PROCESS_WATCH :shared;

# pretend like we have obtained some list of servers.
my @SERVERS = qw(
    server01.your.domain.com
    server02.your.domain.com
);

# Lock target that serialises writes to STDOUT across threads.
my $semSTD :shared;

# tprint( LIST )
# Prints LIST to STDOUT prefixed with "[timestamp][thread id]: ".
# The shared lock keeps lines from different threads from interleaving.
# Always returns RET_SUCCESS.
sub tprint {
    my $tid = threads->tid;

    lock $semSTD;
    print STDOUT q{[} . timestamp() . q{][} . $tid . q{]: }, @_;

    return RET_SUCCESS;
}

# timestamp()
# Returns the current local time as a single human-readable string.
# scalar() is forced so a list-context caller gets the same string rather
# than localtime's broken-down list of fields.
sub timestamp {
    return scalar localtime( time );
} ## end timestamp.
# Set to 1 by the SIGINT handler; workers and the process watcher poll it
# and wind down once it is true.
my $die_early :shared = 0;
$SIG{ INT } = sub {
    tprint( q{Early termination requested} );
    $die_early = 1;
};

# check_process_signal( $status )
# Classifies a wait status as delivered by waitpid.
# Returns RET_SUCCESS only when the process exited normally with exit code 0;
# a non-zero exit code, death by signal, a stopped process or a status that
# cannot be classified all yield RET_FAILURE. A short message is logged in
# every case.
sub check_process_signal {
    my $sig = shift;

    if ( WIFEXITED($sig) ) {
        my $code = WEXITSTATUS($sig);
        if ( $code == 0 ) {
            tprint( q{process normal exit} );
            return RET_SUCCESS;
        }

        # A clean exit with a non-zero code (e.g. ssh could not connect)
        # is still a failed command and must not be tallied as a success.
        tprint( qq{process exited with non-zero status: $code} );
        return RET_FAILURE;
    }
    elsif ( WIFSIGNALED($sig) ) {
        tprint( q{process terminated because of signal} );
        return RET_FAILURE;
    }
    elsif ( WIFSTOPPED($sig) ) {
        tprint( q{process is stopped} );
        return RET_FAILURE;
    }

    tprint( q{process status could not be determined} );
    return RET_FAILURE;
}

# add_to_process_watch( $pid )
# Records $pid with the current time so process_watcher can enforce TIMEOUT.
sub add_to_process_watch {
    my $pid = shift;

    lock %PROCESS_WATCH;
    $PROCESS_WATCH{$pid} = time;

    return RET_SUCCESS;
}

# remove_from_process_watch( $pid )
# Forgets $pid. The entry is deleted even when its value is undef, which is
# how process_watcher marks a pid it has already killed.
sub remove_from_process_watch {
    my $pid = shift;

    lock %PROCESS_WATCH;
    delete $PROCESS_WATCH{$pid};

    return RET_SUCCESS;
}

# set_results( $ok )
# Increments the shared 'success' counter when $ok is true, otherwise the
# 'failure' counter.
sub set_results {
    my ( $ok ) = @_;

    lock %RESULTS;
    if ( $ok ) {
        $RESULTS{'success'}++;
    }
    else {
        $RESULTS{'failure'}++;
    }

    return RET_SUCCESS;
}

# is_pid_alive( $pid )
# Probes $pid with signal 0. Returns RET_SUCCESS when the process exists
# (including when we merely lack permission to signal it), RET_FAILURE
# otherwise.
sub is_pid_alive {
    my $pid = shift;

    return RET_SUCCESS if kill( 0, $pid );    ## Still alive.
    return RET_SUCCESS if $! == EPERM;        ## Exists, but changed UID.

    return RET_FAILURE;                       ## ESRCH or anything else.
} ## end is_pid_alive.

# _slurp_child_output( $out_fh, $err_fh )
# Reads both pipes until EOF, multiplexing with select so that neither pipe
# can fill up and stall the child while we are blocked on the other one.
# Returns two array refs of lines (line terminators kept): stdout, stderr.
sub _slurp_child_output {
    my ( $out_fh, $err_fh ) = @_;

    require IO::Select;

    my %text_for = ( fileno($out_fh) => q{}, fileno($err_fh) => q{} );
    my $pending  = IO::Select->new( $out_fh, $err_fh );

    while ( $pending->count ) {
        for my $fh ( $pending->can_read ) {
            my $got = sysread( $fh, my $chunk, 65_536 );
            if ( !defined $got ) {
                next if $! == EINTR;        # interrupted; retry next pass
                $pending->remove( $fh );    # hard read error; give up on it
            }
            elsif ( $got == 0 ) {
                $pending->remove( $fh );    # EOF
            }
            else {
                $text_for{ fileno $fh } .= $chunk;
            }
        }
    }

    # Split after each newline so every element keeps its terminator.
    my @out_lines = split m{(?<=\n)}, $text_for{ fileno $out_fh };
    my @err_lines = split m{(?<=\n)}, $text_for{ fileno $err_fh };

    return ( \@out_lines, \@err_lines );
}

# run_command( %args )
# Runs a command locally, or through ssh when 'host' is non-empty.
#   host     - remote host; empty means run locally
#   ssh_user - remote user (default SSH_USER)
#   ssh_key  - key file for "ssh -i" (default SSH_KEY)
#   cmd, opt - command and its option string
#   debug    - accepted, currently unused
# Returns a hash ref: 'stdout' and 'stderr' (array refs of lines) and
# 'status' (RET_SUCCESS / RET_FAILURE).
sub run_command {
    my $o = {
        'debug'    => q{},
        'host'     => q{},
        'ssh_user' => q{},
        'ssh_key'  => q{},
        'cmd'      => q{},
        'opt'      => q{},
        @_,
    };

    # NOTE(review): the command is a single string, so open3 may hand it to
    # the shell; host/cmd/opt must come from trusted configuration only.
    my $cmd = q{};
    if ( ( defined $o->{'host'} ) and ( $o->{'host'} ne q{} ) ) {
        # Remote command.
        my $ssh_user = ( $o->{'ssh_user'} ne q{} ) ? $o->{'ssh_user'} : SSH_USER;
        my $ssh_key  = ( $o->{'ssh_key'} ne q{} )  ? $o->{'ssh_key'}  : SSH_KEY;
        $cmd = SSH_CMD
            . qq{ -i $ssh_key }
            . SSH_OPT
            . qq{ $ssh_user\@$o->{'host'} '$o->{'cmd'} $o->{'opt'}'};
    }
    else {
        # Local command.
        $cmd = qq{$o->{'cmd'} $o->{'opt'}};
    }

    my $hdl = {
        'stdin'  => FileHandle->new,
        'stdout' => FileHandle->new,
        'stderr' => FileHandle->new,
    };

    # open3 reports failure by throwing; without a pid there is nothing to
    # watch or wait for, so report the failure to the caller right away.
    my $pid = eval {
        open3( $hdl->{'stdin'}, $hdl->{'stdout'}, $hdl->{'stderr'}, $cmd );
    };
    if ( !$pid ) {
        my $why = $@ || q{open3 returned no pid};
        tprint( qq{unable to start external process: $why} );
        return {
            'stdout' => [],
            'stderr' => [$why],
            'status' => RET_FAILURE,
        };
    }

    add_to_process_watch( $pid );
    tprint( qq{waiting for external process: $pid} );

    # Nothing is ever written to the child; close its stdin so a command
    # that reads input sees EOF instead of waiting for the timeout.
    close $hdl->{'stdin'};

    # Drain the pipes BEFORE waiting: a child that writes more than a pipe
    # buffer would otherwise block forever and be killed as a timeout.
    my ( $out_lines, $err_lines )
        = _slurp_child_output( $hdl->{'stdout'}, $hdl->{'stderr'} );

    my $reaped = waitpid( $pid, 0 );
    my $native = ${^CHILD_ERROR_NATIVE};

    # Once reaped, the pid can be recycled by the OS; stop watching it now
    # so the watcher can never kill an unrelated process.
    remove_from_process_watch( $pid );

    my $exit_status
        = ( $reaped == $pid ) ? check_process_signal( $native ) : RET_FAILURE;

    return {
        'stdout' => $out_lines,
        'stderr' => $err_lines,
        'status' => $exit_status,
    };
}

# get_disk_information( %args )
# Runs /bin/df (options in 'df_opt', default "-Pk") locally or on 'host'
# and returns run_command's result hash ref unchanged.
sub get_disk_information {
    my $o = {
        'debug'  => 0,
        'host'   => q{},
        'df_opt' => q{-Pk},
        @_,
    };

    my $ref_cmd = run_command(
        'debug' => $o->{'debug'},
        'cmd'   => q{/bin/df},
        'opt'   => $o->{'df_opt'},
        'host'  => $o->{'host'},
    );

    # one could parse the data as needed
    # but I will just return the info...
    return $ref_cmd;
}

# get_uname_information( %args )
# Runs /bin/uname (options in 'uname_opt') locally or on 'host' and returns
# run_command's result hash ref unchanged.
sub get_uname_information {
    my $o = {
        'debug'     => 0,
        'host'      => q{},
        'uname_opt' => q{},
        @_,
    };

    my $ref_cmd = run_command(
        'debug' => $o->{'debug'},
        'cmd'   => q{/bin/uname},
        'opt'   => $o->{'uname_opt'},
        'host'  => $o->{'host'},
    );

    # one could parse the data as needed
    # but I will just return the info...
    return $ref_cmd;
}

# worker( $queue )
# Thread body: takes host names off $queue until it receives undef or early
# termination is requested, collects disk information for each host, prints
# whatever the command produced and records the overall status.
sub worker {
    my ( $Q ) = @_;

    tprint( q{worker started} );

    while ( !$die_early and defined( my $job = $Q->dequeue ) ) {
        tprint( qq{processing job: $job} );

        my $ref_di = get_disk_information(
            'debug'  => 0,
            'host'   => $job,
            'df_opt' => q{-Pk /opt},
        );

        # process the return
        if ( $ref_di->{'status'} ) {
            tprint( q{this thread might continue} );
        }
        else {
            tprint( q{this thread might move on to the next job} );
        }

        # one might just print the data; print whenever there is at least
        # one line, so single-line output (typical for errors) is not lost.
        tprint( qq{stdout: $job\n}, @{ $ref_di->{'stdout'} } )
            if @{ $ref_di->{'stdout'} };
        tprint( qq{stderr: $job\n}, @{ $ref_di->{'stderr'} } )
            if @{ $ref_di->{'stderr'} };

        # one might just want to know overall status.
        set_results( $ref_di->{'status'} );

        # OBVIOUSLY I could call get_uname_information
        # and collect and report on that information as
        # well. Just an example.
    } ## end while

    tprint( q{Worker ending} );
    return RET_SUCCESS;
} ## end worker.

# Held by process_watcher for its whole lifetime.
my $semPW :shared;

# process_watcher()
# Thread body: every 250 ms scans %PROCESS_WATCH and sends SIGKILL to any
# watched process that is still alive after more than TIMEOUT seconds.
# A killed pid is marked by setting its entry to undef. Runs until early
# termination is requested.
sub process_watcher {
    lock $semPW;

    while ( !$die_early ) {
        usleep( 250_000 );

        {
            lock %PROCESS_WATCH;

            for my $pid ( keys %PROCESS_WATCH ) {
                my $started = $PROCESS_WATCH{$pid};
                next if !defined $started;    # already killed

                my $elapsed = time() - $started;
                next if $elapsed <= TIMEOUT;
                next if !is_pid_alive( $pid );

                tprint( qq{process $pid exceeded timeout } . $elapsed );
                kill( 9, $pid );
                $PROCESS_WATCH{$pid} = undef;
            } ## end for.
        } ## end lock.
    } ## end while.

    tprint( q{process_watcher is terminating} );
    return RET_SUCCESS;
}

# main()
# Fills the job queue with the servers plus one undef terminator per worker,
# starts the detached watcher and THREADS workers, waits for the workers and
# prints the success/failure summary.
sub main {
    %RESULTS = ( 'success' => 0, 'failure' => 0 );

    my $Q = Thread::Queue->new;
    $Q->enqueue( @SERVERS );
    $Q->enqueue( (undef) x THREADS );    # one stop marker per worker
    tprint( q{queue populated} );

    threads->create( \&process_watcher )->detach;

    my @threads = map { threads->new( \&worker, $Q ) } 1 .. THREADS;
    tprint( q{workers started; waiting...} );

    $_->join for @threads;

    print STDOUT q{*} x 60;
    print STDOUT q{ Success: }, $RESULTS{'success'};
    print STDOUT q{ Failure: }, $RESULTS{'failure'};
    print STDOUT q{*} x 60;
    print STDOUT q{Program complete};

    return RET_SUCCESS;
}

main();
exit EXIT_SUCCESS;
__END__