http://www.perlmonks.org?node_id=859377


in reply to Re^4: Thread Design help
in thread Thread Design help

The following code works for me.

use strict; use threads; use Data::Dumper; use Thread::Queue; warn "Using threads $threads::VERSION"; warn "Using Thread::Queue $Thread::Queue::VERSION"; my $THREADS = 2; my %dataEntity; while(<>){ chomp; next if !length($_); my ($dsName,$passwd) = split /\|/, $_; $dataEntity{$dsName} = $passwd; } my $request = Thread::Queue->new; my $response = Thread::Queue->new; # Submit all requests for my $dbname (keys %dataEntity) { $request->enqueue([$dbname,$dataEntity{$dbname}]); }; sub getConn {}; # Tell each thread that we're done for (1..$THREADS) { $request->enqueue(undef); }; # Launch our threads for (1..$THREADS) { async(\&getData); }; sub getData { my $idx = 1; while (my $job = $request->dequeue()) { my ($dbname, $credentials) = @$job; #connect to DB, retrieve information my $dbh = getConn($dbname,$credentials); my %results; #my $resArrRef = $dbh->selectall_arrayref("select srvname,dbna +me from syslogins",{ Slice => {} }); # package some dummy results my $resArrRef = [ { srvname => "server:$dbname:".$idx++, dbname => $dbname, +}, { srvname => "server:$dbname:".$idx++, dbname => $dbname, +}, { srvname => "server:$dbname:".$idx++, dbname => $dbname, +}, { srvname => "server:$dbname:".$idx++, dbname => $dbname, +}, ]; foreach my $row ( @$resArrRef ) { $results{$row->{srvname}} = $row->{dbname}; } $response->enqueue(\%results); }; # tell our main thread we're done $response->enqueue( undef ); }; while ($THREADS) { while (my $payload = $response->dequeue()) { print Dumper $payload; }; $THREADS-- };

Output

Using threads 1.73 at tmp.pl line 6. Using Thread::Queue 2.11 at tmp.pl line 7. qwe asd yxc zui jjj ^Z $VAR1 = { 'server:asd:4' => 'asd', 'server:asd:2' => 'asd', 'server:asd:1' => 'asd', 'server:asd:3' => 'asd' }; $VAR1 = { 'server:zui:5' => 'zui', 'server:zui:7' => 'zui', 'server:zui:8' => 'zui', 'server:zui:6' => 'zui' }; $VAR1 = { 'server:jjj:9' => 'jjj', 'server:jjj:11' => 'jjj', 'server:jjj:10' => 'jjj', 'server:jjj:12' => 'jjj' }; $VAR1 = { 'server:yxc:14' => 'yxc', 'server:yxc:15' => 'yxc', 'server:yxc:13' => 'yxc', 'server:yxc:16' => 'yxc' }; $VAR1 = { 'server:qwe:18' => 'qwe', 'server:qwe:19' => 'qwe', 'server:qwe:20' => 'qwe', 'server:qwe:17' => 'qwe' }; Perl exited with active threads: 0 running and unjoined 2 finished and unjoined 0 running and detached

Replies are listed 'Best First'.
Re^6: Thread Design help
by perlCrazy (Monk) on Sep 08, 2010 at 18:18 UTC
    do you think, this is causing problem ?
    do i need to get latest version of modules:
    Using threads 1.05 at ./thread2.pl line 9. Using Thread::Queue 2.00 at ./thread2.pl line 10. Invalid value for shared scalar at /opt/perl-5.8.6_1/lib/5.8.6/Thread/Queue.pm line 90, <> line 10.
    which is the most stable version of perl for thread purpose ?

      The combination of this line:

      $request->enqueue( [$dbname,$dataEntity{$dbname}] );

      And the very down-level version of threads you are using is almost certainly the source of your problem.

      You might be able to fix it without upgrading by changing the code to be:

      use threads::shared; ... for my $dbname (keys %dataEntity) { my @args :shared = ( $dbname,$dataEntity{$dbname} ); $request->enqueue( \@args ); };

      But I would still suggest upgrading threads, threads::shared and Thread::Queue ASAP.


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.
        Thanks for reply and help Browser UK.
        Need you help and opinion for disigning this collector.
        Requirement: I will have many dataservers and host that will be divided in multiple groups( wil contain dataservers). We need to connect to each of dataserver/host and execute some commands/utility and process the data, bring back the data and write into a file.
        ex: DSA1,DSA2...DSAN
        1. conenct to DSA1, process and get the data and write into a file
        2. once we get the data back from DSA1, may be after 1/2 hour will need to conenct again and get back data...this will continue depending on interval time.
        above step 1 and 2 should run for many servers ( may be 1000s). Do you think thread queue is best way for this requirement ?
        is below code is the best way to move forward ? how will I run the job again for particular group or DSA on demand basis?
        Thanks a lot for your help in advance. any suggestions/directions will help.
        #!/usr/bin/perl use strict; use threads; use Data::Dumper; use Thread::Queue; warn "Using threads $threads::VERSION"; warn "Using Thread::Queue $Thread::Queue::VERSION"; my $THREADS = 3; my %dataEntity; while(<>){ chomp; next if !length($_); my ($dsName,$passwd) = split /\|/, $_; $dataEntity{$dsName} = $passwd; } my $request = Thread::Queue->new; my $response = Thread::Queue->new; # Submit all requests for my $dbname (keys %dataEntity) { $request->enqueue([$dbname,$dataEntity{$dbname}]); }; # Tell each thread that we're done for (1..$THREADS) { $request->enqueue(undef); }; # Launch our threads for (1..$THREADS) { async(\&getData); }; sub getData { ## my $idx = 1; while (my $job = $request->dequeue()) { my ($dbname, $credentials) = @$job; #connect to DB, retrieve information #my $dbh = getConn($dbname,$credentials); my %results; #my $resArrRef = $dbh->selectall_arrayref("select srvname,dbna +me from syslogins",{ Slice => {} }); # package some dummy results my $resArrRef = [ { srvname => "server:$dbname:".$idx++, dbname => $dbname, +}, { srvname => "server:$dbname:".$idx++, dbname => $dbname, +}, { srvname => "server:$dbname:".$idx++, dbname => $dbname, +}, { srvname => "server:$dbname:".$idx++, dbname => $dbname, +}, ]; foreach my $row ( @$resArrRef ) { $results{$row->{srvname}} = $row->{dbname}; } $response->enqueue(\%results); } # tell our main thread we're done $response->enqueue( undef ); ## } while ($THREADS) { while (my $payload = $response->dequeue()) { print Dumper $payload; }; $THREADS-- } sub getConn { my ($DB,$pwd) = @_; return $dbh; }

      Well, I use the following versions:

      Using threads 1.73 at tmp.pl line 6. Using Thread::Queue 2.11 at tmp.pl line 7.

      So maybe just upgrading Thread::Queue or upgrading threads, or upgrading both might help. The posted code works for me, that's all I can say.