http://www.perlmonks.org?node_id=859302


in reply to Thread Design help

I'd use BrowserUk's approach of having a thread pool and feeding jobs to that pool through a queue. Each thread then returns its results to the main program again through a queue.

You can easily turn your serial program into such a queue-based program once you have a working serial program. You haven't made clear whether your existing program already works single-threaded, so I can't give you much working code, but given your vague outline above, I'd use something like this:

    #!perl -w
    use strict;
    use threads;
    use Thread::Queue;
    use Data::Dumper;

    my $THREADS = 20;
    my $request  = Thread::Queue->new;
    my $response = Thread::Queue->new;

    my %dataEntity = ( ... );

    # Submit all requests
    for my $dbname (keys %dataEntity) {
        $request->enqueue([ $dbname, $dataEntity{ $dbname } ]);
    };

    # Tell each thread that we're done
    for (1..$THREADS) {
        $request->enqueue( undef );
    };

    # Launch our threads
    for (1..$THREADS) {
        async(\&getData);
    };

    sub getData {
        while (my $job = $request->dequeue()) {
            my ($dbname, $credentials) = @$job;
            # connect to DB, retrieve information
            $response->enqueue( "Results from $dbname" );
        };
        # tell our main thread we're done
        $response->enqueue( undef );
    };

    # Collect results until every thread has sent its undef marker
    my $running = $THREADS;
    while ($running) {
        my $payload = $response->dequeue;
        if (defined $payload) { print Dumper $payload }
        else                  { $running-- }
    };

    # Reap the finished threads
    $_->join for threads->list;

Re^2: Thread Design help
by perlCrazy (Monk) on Sep 08, 2010 at 16:38 UTC
    Thanks for the response.
    When I run the code, I get this error: Invalid value for shared scalar at /opt/perl-5.8.6_1/lib/5.8.6/Thread/Queue.pm line 90, <> line 10.
    I'm posting my code here.
        use strict;
        use threads;
        use Data::Dumper;
        use Thread::Queue;

        my $THREADS = 5;
        my %dataEntity;
        while (<>) {
            chomp;
            next if !length($_);
            my ($dsName, $passwd) = split /\|/, $_;
            $dataEntity{$dsName} = $passwd;
        }

        my $request  = Thread::Queue->new;
        my $response = Thread::Queue->new;

        # Submit all requests
        for my $dbname (keys %dataEntity) {
            $request->enqueue([$dbname, $dataEntity{$dbname}]);
        };

        # Tell each thread that we're done
        for (1..$THREADS) {
            $request->enqueue(undef);
        };

        # Launch our threads
        for (1..$THREADS) {
            async(\&getData);
        };

        sub getData {
            while (my $job = $request->dequeue()) {
                my ($dbname, $credentials) = @$job;
                # connect to DB, retrieve information
                my $dbh = getConn($dbname, $credentials);
                my %results;
                my $resArrRef = $dbh->selectall_arrayref("select srvname,dbname from syslogins", { Slice => {} });
                foreach my $row ( @$resArrRef ) {
                    $results{$row->{srvname}} = $row->{dbname};
                }
                $response->enqueue(\%results);
            };
            # tell our main thread we're done
            $response->enqueue( undef );
        };

        while (my $payload = $response->dequeue or $THREADS--) {
            print Dumper $payload;
        }

      You don't show &getConn. How am I supposed to debug your code?

      On the off-chance that I'm psychic, I'll still venture a guess. &getConn caches a database handle. Objects cannot be shared across threads, and &getConn tries to access shared data. You will need to connect to a database from within each thread that wants to access it.
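      A minimal sketch of "connect from within each thread" follows. The names open_handle, encode_job, decode_job and run_jobs are hypothetical, and open_handle is only a stand-in for the real DBI connect call. The sketch also passes jobs and results as plain strings. That is an assumption on my part about a second possible cause of the error: as far as I know, the Thread::Queue shipped with older Perls such as 5.8.6 only accepts plain scalars or shared references, so enqueueing an ordinary array or hash reference could trigger "Invalid value for shared scalar" as well.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $WORKERS  = 3;
my $request  = Thread::Queue->new;
my $response = Thread::Queue->new;

# Hypothetical stand-in for DBI->connect; returns a handle owned by one thread.
sub open_handle {
    my ($dbname, $password) = @_;
    return { db => $dbname, owner => threads->tid };
}

# Jobs travel as plain "name|password" strings (assumes no '|' in the name).
sub encode_job { my ($dbname, $password) = @_; return join '|', $dbname, $password }
sub decode_job { my ($job) = @_; return split /\|/, $job, 2 }

sub worker {
    while (defined(my $job = $request->dequeue)) {
        my ($dbname, $password) = decode_job($job);
        my $handle = open_handle($dbname, $password);   # created inside this thread
        $response->enqueue("$handle->{db} handled by thread $handle->{owner}");
    }
    $response->enqueue(undef);                          # this worker is done
}

sub run_jobs {
    my (%credentials) = @_;
    $request->enqueue(encode_job($_, $credentials{$_})) for sort keys %credentials;
    $request->enqueue(undef) for 1 .. $WORKERS;         # one end marker per worker
    my @threads = map { threads->create(\&worker) } 1 .. $WORKERS;

    my ($running, @results) = ($WORKERS);
    while ($running) {
        my $line = $response->dequeue;
        if (defined $line) { push @results, $line } else { $running-- }
    }
    $_->join for @threads;
    return @results;
}

print "$_\n" for run_jobs(db1 => 'secret1', db2 => 'secret2');
```

      The handle never leaves the thread that created it; only strings cross the queues. With a Thread::Queue new enough to copy complex data, the string encoding would probably be unnecessary.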

      If your process is simply collecting information in parallel, maybe just launch n instances of it instead and have them write to either a database or append to a file. Threads and DBI together are a recipe for disaster if you're not careful.
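      A rough sketch of that multi-process alternative, assuming a Unix-like system with fork and flock. The names collect, append_line and run_instances and the file collector.out are made up for illustration, and collect is only a placeholder for the real per-server query.

```perl
use strict;
use warnings;
use Fcntl qw(:flock);

# Hypothetical stand-in for the real per-server query.
sub collect { my ($server) = @_; return "data from $server" }

# Append one line to a shared file, holding an exclusive lock while writing.
sub append_line {
    my ($file, $line) = @_;
    open my $fh, '>>', $file or die "Cannot open $file: $!";
    flock($fh, LOCK_EX)      or die "Cannot lock $file: $!";
    print {$fh} "$line\n";
    close $fh                or die "Cannot close $file: $!";   # close releases the lock
}

# Split the servers round-robin over $instances child processes.
sub run_instances {
    my ($file, $instances, @servers) = @_;
    my @pids;
    for my $n (0 .. $instances - 1) {
        my $pid = fork;
        die "fork failed: $!" unless defined $pid;
        if ($pid == 0) {                                  # child process
            for my $i (grep { $_ % $instances == $n } 0 .. $#servers) {
                append_line($file, collect($servers[$i]));
            }
            exit 0;
        }
        push @pids, $pid;
    }
    waitpid $_, 0 for @pids;                              # parent waits for all children
}

run_instances('collector.out', 3, map { "server$_" } 1 .. 7);
```

      Each child would open its own database connection after the fork, so no handle is ever shared between processes.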

        Here is the code for getConn.
            sub getConn {
                my ($DB, $pwd) = @_;
                my $count = 0;
                my $dbhParm;
                my $USER = 'sa';
                my $api  = 'Sybase';
                while (!$dbhParm) {
                    if (! ($dbhParm = DBI->connect("dbi:$api:${DB}", $USER, $pwd, {PrintError => 0}))) {
                        warn "Can't connect to ${DB} as \"$USER\"\n$DBI::errstr... Retry after 1 seconds\n";
                        sleep (1);
                        $count++;
                        if ($count > 2) {
                            print "Connection to the database, ${DB} could not be established";
                        }
                    }
                    last if ($count > 2); # Give up after three tries
                }
                return $dbhParm;
            }
        One small clarification: I don't need to share $dbh between databases; each $dbh will be independent.
        This is a kind of collector that will keep running at some interval, bring data from remote servers and write it to a file.
        There may be thousands of data servers, and we would write the data into thousands of files.
        Thanks
      How will the blocks below get executed? This is my first threaded program and I want to understand it in detail.
      My requirement: I will have thousands of data servers, divided into different groups. If I start this program, will all of those data servers be processed by 10 or 20 threads?
      Also, this has to keep running at some interval. Once we have put all the servers into the queue, how does each thread pick up its data?
      Note: each data server will be on a different host and port, so the connections will be independent. Thanks for the help.
          # Tell each thread that we're done
          for (1..$THREADS) {    ##### what is this block doing
              $request->enqueue(undef);
          };

          # Launch our threads
          for (1..$THREADS) {
              async(\&getData);
          };

        As an aside, you should maybe consider actually replying to the person posting code instead of replying to yourself. If you reply to another person, that person will get notified of your post.

        The first part of the code is simply a loop that puts one undef for each thread into the queue. As all the thread code is written to stop when it encounters an undef, this serves to tell the threads that they are done. See Thread::Queue for the ->enqueue method.

        The second part simply launches the subroutine getData in a separate thread. See threads.
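        To make the order of events concrete, here is a small self-contained sketch (the name drain and the job strings are invented for the example). All jobs go into the queue first, followed by one undef per thread. Because the queue is first-in, first-out, the threads work through every real job before any of them sees an undef, and each thread stops at the first undef it dequeues.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Run @jobs through $workers threads; returns how many jobs each thread handled.
sub drain {
    my ($workers, @jobs) = @_;
    my $queue = Thread::Queue->new;
    $queue->enqueue(@jobs);                       # real work first
    $queue->enqueue(undef) for 1 .. $workers;     # then one end marker per thread

    my @threads = map {
        threads->create(sub {
            my $handled = 0;
            # dequeue blocks until an item is available; undef ends the loop
            while (defined(my $job = $queue->dequeue)) {
                $handled++;                       # real code would process $job here
            }
            return $handled;                      # handed back to the caller by join
        });
    } 1 .. $workers;

    return map { my ($count) = $_->join; $count } @threads;
}

my @counts = drain(4, map { "server$_" } 1 .. 10);
print "jobs handled per thread: @counts\n";
```

        How the jobs are split between the threads varies from run to run, but the counts always add up to the number of jobs queued. The same holds for thousands of servers and 10 or 20 threads: each thread keeps pulling the next job until the queue hands it an undef.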