#!/usr/bin/perl -lw
#
# Demonstration of a fixed-size worker-thread pool.
#
# The main thread pushes job ids onto a Thread::Queue; a Thread::Semaphore
# limits how many jobs may be pending at once so the producer blocks
# instead of letting the queue grow without bound. Each worker is stopped
# by an undef sentinel placed on the queue.

use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;
use Time::HiRes ('usleep');

my $screenaccess :shared;   # Lock for printing to screen (not used below)
my $multi        :shared;   # Number of simultaneous threads
my $logline;                # Log line content (not used below)
my $logsize;                # Log line size, not counting attributes (not used below)
my @log          :shared;   # For debugging, in case a thread errors out (not used below)
my $dispatcher   :shared;   # Job queue
my $queuelen;               # Max queue length
my $throttle     :shared;   # Queue semaphore: counts free queue slots
my @thread;                 # Worker thread objects, joined at shutdown
my $semStdout    :shared;   # Lock variable serialising tprint output

$|++;                       # Unbuffered STDOUT

# tprint($str)
# Print $str prefixed with the calling thread's id. Holds the lock on
# $semStdout for the duration of the print so that lines from different
# threads are not interleaved.
sub tprint {
    my $str = shift;
    my $tid = threads->tid();
    lock $semStdout;
    print "[$tid] $str";
}

# initThreadedOperation($count)
# Create the job queue and its throttle semaphore, then start $count
# worker threads running processrefsthread. The maximum number of pending
# jobs is set to 1 * $count.
sub initThreadedOperation {
    $multi      = shift;
    $queuelen   = 1 * $multi;
    $dispatcher = Thread::Queue->new();
    $throttle   = Thread::Semaphore->new($queuelen);

    for ( 1 .. $multi ) {
        push( @thread, threads->create( \&processrefsthread, $_ ) );
    }

    # Note: the -l switch already appends a newline to every print, so the
    # explicit "\n" here leaves a blank line after the message.
    print STDERR scalar(@thread), ' threads created', "\n";
}

# endThreadedOperation()
# Enqueue one undef sentinel per worker so that each worker's loop ends,
# then join every worker thread.
sub endThreadedOperation {
    $dispatcher->enqueue( (undef) x scalar(@thread) );
    foreach (@thread) {
        $_->join();
    }
}

# processrefsthread()
# Worker body: take jobs off the queue until an undef sentinel arrives.
# For each job, release one throttle slot and then burn CPU for a random
# amount of time to simulate work.
sub processrefsthread {
    tprint 'Starting';

    # Test definedness, not truth: a job whose value is 0 or '' must not
    # be mistaken for the undef shutdown sentinel.
    while ( defined( my $job = $dispatcher->dequeue() ) ) {
        tprint "processing job $job";
        $throttle->up();    # Job removed from queue

        # Simulated workload of random length.
        my $d = int( rand(5) );
        for my $i ( 0 .. $d * 500000 ) {
            my $j = $i * $d;
            $j = int( rand($j) );
        }
    }

    tprint 'Ending';
}

# queueProcessRequest($job)
# Hand $job to the worker pool. Blocks while $queuelen jobs are already
# pending. Returns nothing.
sub queueProcessRequest {
    my ($job) = @_;

    # Reserve a queue slot *before* enqueueing so that the number of
    # pending jobs never exceeds $queuelen.
    $throttle->down();
    $dispatcher->enqueue($job);

    return;
}

initThreadedOperation(4);

for my $i ( 1 .. 50 ) {
    tprint "Qing job $i";
    queueProcessRequest($i);
}

endThreadedOperation();