PerlMonks  

Thread communication

by libvenus (Sexton)
on Aug 27, 2008 at 14:20 UTC ( [id://707168]=perlquestion )

libvenus has asked for the wisdom of the Perl Monks concerning the following question:

Hi Monks,

Is there a way I can let the compBoss know the contents of the globalQ? The globalQ is being enqueued to by the queryWorkers.

The application that I am building needs parallel processing; more can be read in the posts titled "best strategy" and "Problem in Inter Process Communication".

Thanks,
    use strict;
    use threads;
    use threads::shared;
    use Thread::Queue;

    my @queryList = qw(
        /dev/files_xls_tmpl_cfg_nonscripts/sql1.clmod.NEW.txt
        /dev/files_xls_tmpl_cfg_nonscripts/sql1.clmod.NEW14-78B6R.txt
        /dev/files_xls_tmpl_cfg_nonscripts/sql1.clmod.NEW14-78B7N.txt
        /dev/files_xls_tmpl_cfg_nonscripts/Interest_on_Cash_PATS_cmt_dt.clmod.txt
        /dev/files_xls_tmpl_cfg_nonscripts/Interest_on_Cash_PATS_cmt_dt.clmod14-78B6R.txt
        /dev/files_xls_tmpl_cfg_nonscripts/Interest_on_Cash_PATS_cmt_dt.clmod14-78B7N.txt
    );

    print $ARGV[0],"\n";
    my $maxnoofQueryThreads = $ARGV[0];
    my $maxnoofCompThreads = 2*$ARGV[0];

    my $globalQ : shared = new Thread::Queue;
    my $q = new Thread::Queue;
    my $defSig = undef;

    my $queryBossID = threads->new(\&queryBoss,$globalQ,$q);
    my $compBossID = threads->new(\&compBoss,$globalQ,);

    $queryBossID->join;
    $compBossID->join;

    sub queryBoss {
        print " query boss triggered\n";
        my $globalQ = shift;
        my $q = shift;
        my @queryworkers = map {threads->new( \&queryworker, $q,$globalQ,$_);} 1 .. $maxnoofQueryThreads;
        $q->enqueue(@queryList);
        $q->enqueue( ( undef ) x $maxnoofQueryThreads );
        $_->join for @queryworkers;
    }

    sub compBoss {
        print " comp boss triggered\n";
        my $globalQ = shift;
        print " i am comp boss and i can see the following items\n",$globalQ->pending;
        #while(my $workItem = $globalQ->dequeue) {
        my @compworkers = map {threads->new( \&compworker, $globalQ,$_);} 1 .. $maxnoofCompThreads;
        #$q->enqueue(@queryList);
        $globalQ->enqueue( ( undef ) x $maxnoofCompThreads );
        $_->join for @compworkers;
    }

    sub compworker {
        print " comp worker triggered\n";
        my $globalQ = shift;
        my $id = shift;
        my $tid = threads->self->tid;
        print " i am comp worker and i can see the following items\n",$globalQ->pending;
        while(my $workItem = $globalQ->dequeue){
            print " id $id the file is $workItem \n";
            print " i must compare the following file $workItem \n";
            #start comparison on the file and dump result in a new file and del the file
        }
    }

    sub queryworker {
        print " query worker triggered\n";
        my( $Q ) = shift;
        my $globalQ = shift;
        my $id = shift;
        my $tid = threads->self->tid;
        while( my $workItem = $Q->dequeue ){
            local $/ = undef;
            #system("client hqsas501 22415 200 \-f $workItem > $workItem\.prod");
            open(HAN,"client hqsas501 22415 200 \-f $workItem|") || die "$!";
            my $prod = <HAN>;
            #print "prod file slurped $prod\n";
            close HAN;
            #system("client hqsas501 22710 200 \-f $workItem > $workItem\.test");
            open(HAN,"client hqsas501 22710 200 \-f $workItem|") || die "$!";
            my $test = <HAN>;
            #print "test file slurped $test\n";
            close HAN;
            my $handleProd = $workItem.'prod';
            my $handleTest = $workItem.'test';
            unless($prod eq $test){
                open(HANPRD,">$handleProd") || die "$!";
                open(HANTST,">$handleTest") || die "$!";
                print " thread it $tid working on $workItem prod and test file differ \n";
                print HANPRD $prod;
                print HANTST $test;
                print " enqueing workitem $workItem\n";
                $globalQ->enqueue($workItem);
                print "$compBossID compbossid\n";
                #$compBossID->kill(ALRM);
                close HANPRD;
                close HANTST;
            }
            print " i am query worker and i can see the following items\n",$globalQ->pending;
        }
    }

Replies are listed 'Best First'.
Re: Thread communication
by jbert (Priest) on Aug 27, 2008 at 15:36 UTC
    Here's some working queuer/consumer code based on Thread::Queue. Hope it helps.

        #!/usr/bin/perl
        use threads;
        use threads::shared;
        use Thread::Queue;

        my $q : shared;
        use constant FINISH => 'finish';

        main();
        exit 0;

        sub main {
            $q = Thread::Queue->new();
            my $queuer = threads->create('queuer', 10);
            my $worker = threads->create('worker');
            $queuer->join;
            $worker->join;
            print "main thread finished\n";
        }

        sub queuer {
            my $numItems = shift;
            my $str = "foo";
            while ($numItems-- > 0) {
                print "queueing $str\n";
                $q->enqueue($str);
                $str++;
                sleep 1;
            }
            $q->enqueue(FINISH);
            print "queuer finished\n";
        }

        sub worker {
          DOING_WORK:
            while (1) {
                while (my $pending = $q->pending) {
                    my $str = $q->dequeue;
                    print "worker found $pending items, got $str\n";
                    last DOING_WORK if $str eq FINISH;
                }
                sleep 3;
            }
            print "worker finished\n";
        }
Re: Thread communication
by gone2015 (Deacon) on Aug 27, 2008 at 16:02 UTC
    Is there a way I can let the compBoss know the contents of the globalQ? The globalQ is being enqueued to by the queryWorkers.

    I don't think that's the problem here.

    The problem is that compBoss fills the globalQ with enough undef entries to kill off every compworker before the queryworkers have had time to process all the queries and queue their results on globalQ.

    It looks like a race that the queryworkers are unlikely to win.

    Since whatever the compworker does seems to follow on from what the queryworker does, I'm not sure why you need those pieces of work in separate threads. It would certainly simplify things if they weren't.
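
One way to avoid the race described above, if the two stages are kept in separate threads, is to queue the undef terminators on the results queue only after every query worker has been joined. The following is a minimal sketch of that ordering, not the original poster's program: the file names, the thread counts, and the stub worker bodies are placeholders assumed for illustration, and the real client commands and file comparison are left out.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical stand-ins for the real query files and thread counts.
my @queryList       = map { "query$_.txt" } 1 .. 6;
my $numQueryWorkers = 2;
my $numCompWorkers  = 4;

my $q       = Thread::Queue->new;   # work for the query workers
my $globalQ = Thread::Queue->new;   # items handed on to the comp workers

# Start both pools up front so comparing can overlap with querying.
my @queryworkers = map { threads->create(\&queryworker) } 1 .. $numQueryWorkers;
my @compworkers  = map { threads->create(\&compworker) } 1 .. $numCompWorkers;

# Feed the query workers, then one undef per worker to stop them.
$q->enqueue(@queryList);
$q->enqueue((undef) x $numQueryWorkers);
$_->join for @queryworkers;

# Every query worker has finished, so nothing more can arrive on
# $globalQ. Only now are the comp workers told to stop.
$globalQ->enqueue((undef) x $numCompWorkers);

my $compared = 0;
for my $thr (@compworkers) {
    my ($n) = $thr->join;    # each comp worker returns its item count
    $compared += $n;
}
print "compared $compared of ", scalar(@queryList), " files\n";

sub queryworker {
    while (defined(my $workItem = $q->dequeue)) {
        # Placeholder: the real code would run both client commands
        # here and enqueue only when their outputs differ.
        $globalQ->enqueue($workItem);
    }
}

sub compworker {
    my $count = 0;
    while (defined(my $workItem = $globalQ->dequeue)) {
        # Placeholder for the actual comparison work.
        $count++;
    }
    return $count;
}
```

Because dequeue blocks until an item is available, the comp workers simply wait while the queue is empty, so nothing needs to poll pending. Testing the dequeued value with defined also keeps an item such as "0" from ending a worker's loop early.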
