PerlMonks  

Re^2: Thread Design help

by perlCrazy (Monk)
on Sep 08, 2010 at 16:38 UTC ( [id://859361] )


in reply to Re: Thread Design help
in thread Thread Design help

Thanks for the response.
When I run the code, I get this error: Invalid value for shared scalar at /opt/perl-5.8.6_1/lib/5.8.6/Thread/Queue.pm line 90, <> line 10.
Here is my code.
use strict;
use threads;
use Data::Dumper;
use Thread::Queue;

my $THREADS = 5;
my %dataEntity;

while (<>) {
    chomp;
    next if !length($_);
    my ($dsName, $passwd) = split /\|/, $_;
    $dataEntity{$dsName} = $passwd;
}

my $request  = Thread::Queue->new;
my $response = Thread::Queue->new;

# Submit all requests
for my $dbname (keys %dataEntity) {
    $request->enqueue([$dbname, $dataEntity{$dbname}]);
};

# Tell each thread that we're done
for (1..$THREADS) {
    $request->enqueue(undef);
};

# Launch our threads
for (1..$THREADS) {
    async(\&getData);
};

sub getData {
    while (my $job = $request->dequeue()) {
        my ($dbname, $credentials) = @$job;
        # connect to DB, retrieve information
        my $dbh = getConn($dbname, $credentials);
        my %results;
        my $resArrRef = $dbh->selectall_arrayref("select srvname,dbname from syslogins", { Slice => {} });
        foreach my $row ( @$resArrRef ) {
            $results{$row->{srvname}} = $row->{dbname};
        }
        $response->enqueue(\%results);
    };
    # tell our main thread we're done
    $response->enqueue( undef );
};

while (my $payload = $response->dequeue or $THREADS--) {
    print Dumper $payload;
}

Replies are listed 'Best First'.
Re^3: Thread Design help
by Corion (Patriarch) on Sep 08, 2010 at 17:24 UTC

    You don't show &getConn. How am I supposed to debug your code?

    On the off-chance that I'm psychic, I'll still venture a guess. &getConn caches a database handle. Objects cannot be shared across threads, and &getConn tries to access shared data. You will need to connect to a database from within each thread that wants to access it.
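
A minimal sketch of the "connect inside each thread" advice, not taken from the posts above. `fake_connect` and `query_all` are hypothetical names; `fake_connect` stands in for a real `DBI->connect` call so the sketch runs without a database. It assumes Thread::Queue 2.x or later (the working run further down reports 2.11), which, as far as I know, copies array and hash references into shared storage when they are enqueued.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical stand-in for DBI->connect: builds a per-thread "handle"
# and records which thread created it.
sub fake_connect {
    my ($dbname, $password) = @_;
    return { db => $dbname, owner_tid => threads->tid };
}

# Hypothetical helper: query every database using $n_workers threads.
sub query_all {
    my ($n_workers, %credentials) = @_;
    my $request  = Thread::Queue->new;
    my $response = Thread::Queue->new;

    # Jobs first, then one undef per worker as an end-of-work marker.
    $request->enqueue([ $_, $credentials{$_} ]) for sort keys %credentials;
    $request->enqueue(undef) for 1 .. $n_workers;

    my @workers = map {
        threads->create(sub {
            while (defined(my $job = $request->dequeue)) {
                my ($dbname, $password) = @$job;
                # The handle is created in this thread and never leaves it.
                my $dbh = fake_connect($dbname, $password);
                my $same = $dbh->{owner_tid} == threads->tid ? 1 : 0;
                # Only plain data goes back through the queue.
                $response->enqueue({ db => $dbname, same_thread => $same });
            }
            return;
        });
    } 1 .. $n_workers;

    # Once every worker has been joined, the response queue is complete.
    $_->join for @workers;

    my %seen;
    while (defined(my $r = $response->dequeue_nb)) {
        $seen{ $r->{db} } = $r->{same_thread};
    }
    return %seen;
}

my %result = query_all(2, db1 => 'secret1', db2 => 'secret2');
print "$_: handle used in its own thread = $result{$_}\n" for sort keys %result;
```

The point of the design is that the worker returns hashes of results, never the handle itself, so nothing DBI-related has to cross a thread boundary.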

    If your process is simply collecting information in parallel, maybe just launch n instances of it instead and have them write to either a database or append to a file. Threads and DBI together are a recipe for disaster if you're not careful.
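
The "launch n instances and append to a file" alternative could look roughly like the sketch below. It is only an illustration under stated assumptions: a Unix-like system with a native `fork`, a hypothetical helper name `collect_in_processes`, and a placeholder line where the real connect-and-query code would go.

```perl
use strict;
use warnings;
use Fcntl qw(:flock);
use IO::Handle;
use POSIX ();

# Hypothetical helper: split @servers across $n_procs child processes,
# each appending one line per server to $outfile.
# Returns the number of children that exited with an error.
sub collect_in_processes {
    my ($n_procs, $outfile, @servers) = @_;
    my @pids;

    for my $slot (0 .. $n_procs - 1) {
        my $pid = fork();
        die "fork failed: $!" unless defined $pid;

        if ($pid == 0) {
            # Child: take every $n_procs-th server, starting at $slot.
            open my $out, '>>', $outfile or POSIX::_exit(1);
            $out->autoflush(1);
            for (my $i = $slot; $i < @servers; $i += $n_procs) {
                # Placeholder for "connect, query, format the result".
                my $line = "$servers[$i]\tcollected by pid $$\n";
                # Hold an exclusive lock while writing so lines do not interleave.
                flock($out, LOCK_EX) or POSIX::_exit(1);
                print {$out} $line;
                flock($out, LOCK_UN);
            }
            close $out;
            POSIX::_exit(0);    # exit without running the parent's END blocks
        }
        push @pids, $pid;
    }

    # Parent: wait for every child and count the failures.
    my $failed = 0;
    for my $pid (@pids) {
        waitpid($pid, 0);
        $failed++ if $? != 0;
    }
    return $failed;
}
```

A call such as `collect_in_processes(4, 'collected.txt', @server_names)` would start four children. Because each child opens its own connection and file handle after the fork, no database handle is shared between processes.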

      Here is the code for getConn.
      sub getConn {
          my ($DB, $pwd) = @_;
          my $count = 0;
          my $dbhParm;
          my $USER = 'sa';
          my $api = 'Sybase';
          while (!$dbhParm) {
              if (! ($dbhParm = DBI->connect("dbi:$api:${DB}", $USER, $pwd, {PrintError => 0}))) {
                  warn "Can't connect to ${DB} as \"$USER\"\n$DBI::errstr... Retry after 1 seconds\n";
                  sleep (1);
                  $count++;
                  if ($count > 2) {
                      print "Connection to the database, ${DB} could not be established";
                  }
              }
              last if ($count > 2);  # Give up after three tries
          }
          return $dbhParm;
      }

        The following code works for me.

        use strict;
        use threads;
        use Data::Dumper;
        use Thread::Queue;

        warn "Using threads $threads::VERSION";
        warn "Using Thread::Queue $Thread::Queue::VERSION";

        my $THREADS = 2;
        my %dataEntity;

        while (<>) {
            chomp;
            next if !length($_);
            my ($dsName, $passwd) = split /\|/, $_;
            $dataEntity{$dsName} = $passwd;
        }

        my $request  = Thread::Queue->new;
        my $response = Thread::Queue->new;

        # Submit all requests
        for my $dbname (keys %dataEntity) {
            $request->enqueue([$dbname, $dataEntity{$dbname}]);
        };

        sub getConn {};

        # Tell each thread that we're done
        for (1..$THREADS) {
            $request->enqueue(undef);
        };

        # Launch our threads
        for (1..$THREADS) {
            async(\&getData);
        };

        sub getData {
            my $idx = 1;
            while (my $job = $request->dequeue()) {
                my ($dbname, $credentials) = @$job;
                # connect to DB, retrieve information
                my $dbh = getConn($dbname, $credentials);
                my %results;
                #my $resArrRef = $dbh->selectall_arrayref("select srvname,dbname from syslogins",{ Slice => {} });
                # package some dummy results
                my $resArrRef = [
                    { srvname => "server:$dbname:".$idx++, dbname => $dbname, },
                    { srvname => "server:$dbname:".$idx++, dbname => $dbname, },
                    { srvname => "server:$dbname:".$idx++, dbname => $dbname, },
                    { srvname => "server:$dbname:".$idx++, dbname => $dbname, },
                ];
                foreach my $row ( @$resArrRef ) {
                    $results{$row->{srvname}} = $row->{dbname};
                }
                $response->enqueue(\%results);
            };
            # tell our main thread we're done
            $response->enqueue( undef );
        };

        while ($THREADS) {
            while (my $payload = $response->dequeue()) {
                print Dumper $payload;
            };
            $THREADS--
        };

        Output

        Using threads 1.73 at tmp.pl line 6.
        Using Thread::Queue 2.11 at tmp.pl line 7.
        qwe
        asd
        yxc
        zui
        jjj
        ^Z
        $VAR1 = {
                  'server:asd:4' => 'asd',
                  'server:asd:2' => 'asd',
                  'server:asd:1' => 'asd',
                  'server:asd:3' => 'asd'
                };
        $VAR1 = {
                  'server:zui:5' => 'zui',
                  'server:zui:7' => 'zui',
                  'server:zui:8' => 'zui',
                  'server:zui:6' => 'zui'
                };
        $VAR1 = {
                  'server:jjj:9' => 'jjj',
                  'server:jjj:11' => 'jjj',
                  'server:jjj:10' => 'jjj',
                  'server:jjj:12' => 'jjj'
                };
        $VAR1 = {
                  'server:yxc:14' => 'yxc',
                  'server:yxc:15' => 'yxc',
                  'server:yxc:13' => 'yxc',
                  'server:yxc:16' => 'yxc'
                };
        $VAR1 = {
                  'server:qwe:18' => 'qwe',
                  'server:qwe:19' => 'qwe',
                  'server:qwe:20' => 'qwe',
                  'server:qwe:17' => 'qwe'
                };
        Perl exited with active threads:
                0 running and unjoined
                2 finished and unjoined
                0 running and detached
      One small clarification: I don't need to share $dbh between databases. Each $dbh will be independent.
      This is a kind of collector that will keep running at some interval, fetch data from a remote server, and write it to a file.
      There may be thousands of data servers, and we would write the data into thousands of files.
      Thanks
Re^3: Thread Design help
by perlCrazy (Monk) on Sep 11, 2010 at 06:46 UTC
    How will the blocks below get executed? This is my first threaded program, and I want to understand it in detail.
    My requirement: I will have thousands of data servers, divided into different groups. If I start this program, will all of those data servers be processed by 10 or 20 threads?
    It also has to keep running at some interval. Once we enqueue all the servers, how does each thread pick up its data?
    Note: each data server will be on a different host and port, so the connections will be independent. Thanks for the help.
    # Tell each thread that we're done
    for (1..$THREADS) {    ##### what is this block doing
        $request->enqueue(undef);
    };

    # Launch our threads
    for (1..$THREADS) {
        async(\&getData);
    };

      As an aside, you should maybe consider actually replying to the person posting code instead of replying to yourself. If you reply to another person, that person will get notified of your post.

      The first part of the code is simply a loop that puts one undef for each thread into the queue. As all the thread code is written to stop when it encounters an undef, this serves to tell the threads that they are done. See Thread::Queue for the ->enqueue command.

      The second part simply launches the subroutine getData in a separate thread. See threads.
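
To see how a small pool of threads drains a queue of any length, here is a stripped-down sketch of the same pattern with no database involved. `run_workers` is a hypothetical helper, and the "work" is just tagging each job with a prefix.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Hypothetical helper: process @jobs with $n_workers threads and
# return the sorted list of results.
sub run_workers {
    my ($n_workers, @jobs) = @_;

    my $request  = Thread::Queue->new;
    my $response = Thread::Queue->new;

    # Real work first, then one undef per worker as an end-of-work marker.
    $request->enqueue($_) for @jobs;
    $request->enqueue(undef) for 1 .. $n_workers;

    my @workers = map {
        threads->create(sub {
            # dequeue() blocks until an item is available. Whichever thread
            # is free takes the next job; an undef ends the loop.
            while (defined(my $job = $request->dequeue)) {
                $response->enqueue("done:$job");
            }
            # Tell the main thread that this worker has finished.
            $response->enqueue(undef);
        });
    } 1 .. $n_workers;

    # Collect results until every worker has sent its undef.
    my @results;
    my $running = $n_workers;
    while ($running) {
        my $item = $response->dequeue;
        if (defined $item) { push @results, $item }
        else               { $running-- }
    }

    $_->join for @workers;
    my @sorted = sort @results;
    return @sorted;
}

print join(",", run_workers(3, qw(a b c d e))), "\n";
# prints: done:a,done:b,done:c,done:d,done:e
```

The number of jobs and the number of threads are independent: with thousands of servers and 10 or 20 workers, each worker keeps taking the next queued job until it reads its undef. Using `defined(...)` in the loop condition also keeps a job value such as `0` from ending a worker early.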
