Re: Resource pools and concurrency

by BrowserUk (Pope)
on Jun 27, 2007 at 12:03 UTC

in reply to Resource pools and concurrency

Dare I suggest threads? The following demos a simple architecture that might work for you.

It starts one thread per registry that makes a permanent, blocking connection to that registry. Each thread then enters a blocking loop reading from a dedicated (work) queue of requests.

Once the agent threads are running, the main thread creates a local, non-blocking, listening socket and loops over that and another (replies) queue.

When an inbound connection is made, it reads the request and enqueues it to the appropriate registry agent thread via its dedicated work queue.

When a reply is received from the replies queue, it is sent back to the requestor and the connection is dropped.
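
The tag-and-route step can be tried on its own, without sockets or threads. This is only a rough sketch: it assumes the same 'N/A* N/A*' framing and "Agent:request" line format that the sample code further down uses, and the helper names tag_request and untag are made up for illustration. The tag 7 stands in for the client's file number.

```perl
use strict;
use warnings;

# Hypothetical helpers (not part of the sample code): wrap a payload with
# the tag of the client it belongs to, and unwrap it again.
sub tag_request {
    my( $tag, $payload ) = @_;
    # Each 'N/A*' packs a 32-bit big-endian length, then that many bytes.
    return pack 'N/A* N/A*', $tag, $payload;
}

sub untag {
    my( $item ) = @_;
    return unpack 'N/A* N/A*', $item;
}

# A request line of the form "Agent:rest" selects the work queue.
my $line = "B:lookup example.com";
my( $agent, $rest ) = split ':', $line, 2;

my $workItem = tag_request( 7, $rest );   # 7 stands in for fileno( $client )
my( $tag, $request ) = untag( $workItem );

print "agent=$agent tag=$tag request=$request\n";
```

Because the tag travels with the work item and comes back with the reply, the main thread never has to remember which agent is working for which client; it just looks the tag up when a reply arrives.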

The only shared state or locking is inside the queues and these are unidirectional so there is no possibility of deadlocking, priority inversions or other nasties. Each registry agent object and connection are used only with a single thread.
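
A stripped-down sketch of that one-way flow, assuming a perl built with ithreads. The queue names and the uc call (standing in for a real registry request) are illustrative only. The worker only ever reads from the work queue and only ever writes to the replies queue, so neither side waits on the other while holding anything.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $Qwork    = Thread::Queue->new;
my $Qreplies = Thread::Queue->new;

# The worker dequeues from $Qwork and enqueues to $Qreplies, never the reverse.
my $worker = threads->create( sub {
    while( defined( my $item = $Qwork->dequeue ) ) {
        $Qreplies->enqueue( uc $item );   # stand-in for a registry call
    }
} );

$Qwork->enqueue( $_ ) for qw[ alpha beta ];
$Qwork->enqueue( undef );   # undef tells the worker to leave its loop
$worker->join;

# Drain whatever the worker produced, without blocking.
my @replies;
while( defined( my $reply = $Qreplies->dequeue_nb ) ) {
    push @replies, $reply;
}
print "@replies\n";
```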

There is one thread per registry, each using ~1.5 MB (less if you tailor the stack requirements). The threads run for the life of the connection and spend their time blocked, so no great resource is consumed. In my tests, memory came to around 1 MB/agent and CPU usage never got off zero no matter how hard I drove it.
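
For the stack tailoring, the threads module accepts a stack_size option, either globally or per thread. A minimal sketch follows; the 256 KB figure is an arbitrary assumption rather than a recommendation, and the platform may round it up or enforce its own minimum.

```perl
use strict;
use warnings;
use threads;

my $wanted = 64 * 4096;   # 256 KB: an assumed figure, tune for your agents

# Per-thread override: a leading hash ref carries the thread options.
my $thr = threads->create(
    { stack_size => $wanted },
    sub { return "agent thread finished" },
);

my $actual = $thr->get_stack_size;   # may be rounded up by the platform
my $result = $thr->join;

print "requested $wanted bytes, got $actual; $result\n";
```

Whether a smaller stack is safe depends on what the agent modules do (deep recursion or big regexes need more), so it is worth measuring rather than guessing.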

Some sample code with dummied up agents that just sleep a bit and give back a stock answer:

#! perl -slw
use strict;

package Registry::Agent;

sub new {
    my( $class, %config ) = @_;
    return bless \%config, $class;
}

## Abstract method; each concrete agent class must override it.
sub makeRequest{
    die "makeRequest for class @{[ ref $_[0] ]} not reified";
}

package Registry::Agent::A;
use base 'Registry::Agent';

sub makeRequest{
    sleep rand 30;
    return "The Answer from A";
}

package Registry::Agent::B;
use base 'Registry::Agent';

sub makeRequest{
    sleep rand 30;
    return "The Answer from B";
}

package Registry::Agent::C;
use base 'Registry::Agent';

sub makeRequest{
    sleep rand 30;
    return "The Answer from C";
}

package main;
use threads;
use Thread::Queue;

## One of these runs per registry: block on the work queue,
## make the request, post the tagged result to the replies queue.
sub registryAgent {
    my( $class, $Qin, $Qout, %otherConfig ) = @_;

    #require $class; ## Disabled for mocked up agents.
    my $agent = $class->new( %otherConfig )
        or die "Couldn't create '$class' object\n";

    while( my $workItem = $Qin->dequeue ) {
        my( $tag, $request ) = unpack 'N/A* N/A*', $workItem;
        warn "$class: Got request for $tag:$request\n";
        my $result = $agent->makeRequest( $request );
        warn "$class: returning $result for $tag\n";
        $Qout->enqueue( pack 'N/A* N/A*', $tag, $result );
    }
}

my $Qresults = new Thread::Queue;
my %workQs;

my @agents = map{
    my $Qwork = new Thread::Queue;
    $workQs{ $_ } = $Qwork;
    threads->create(
        \&registryAgent, $_, $Qwork, $Qresults, configA => 1, configB => 2
    );
} qw[ Registry::Agent::A Registry::Agent::B Registry::Agent::C ];

require IO::Socket::INET;
my $listener = IO::Socket::INET->new(
    LocalAddr => 'localhost:33333',
    Listen    => 100,
    Blocking  => 0,
) or die "Couldn't listen on localhost:33333; $!";

ioctl( $listener, 0x8004667e, \1 ); ## non-blocking on Win32

my $done = 0;
$SIG{ INT } = sub{
    shutdown( $listener, 0 );                ## Stop listening
    $_->enqueue( undef ) for values %workQs; ## Signal termination to threads
    $_->join for @agents;                    ## Perform last rites.
    $done = 1;                               ## Tell main thread we're done.
};

my %clients;
while( !$done ) {
    if( my $client = $listener->accept ) {
        warn "main: Client:$client connected\n";
        my $fno = fileno $client;
        $clients{ $fno } = $client;   ## The fileno doubles as the request tag
        defined( my $request = <$client> )
            or warn "Read failed: $!\n" and next;
        my( $agent, $rest ) = split ':', $request, 2;
        warn "main: Client: $client requests Agent:$agent Request:$rest\n";
        $workQs{ "Registry::Agent::$agent" }->enqueue(
            pack "N/A* N/A*", $fno, $rest
        );
    }
    elsif( my $result = $Qresults->dequeue_nb ) {
        my( $fno, $reply ) = unpack 'N/A* N/A*', $result;
        warn "Got reply for request tag:$fno; sending...\n";
        print { $clients{ $fno } } $reply;
        close delete $clients{ $fno };
    }
    else {
        sleep 1;   ## Nothing to accept and nothing to send; idle briefly
    }
}
close $listener;

Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.

Re^2: Resource pools and concurrency
by mattk (Pilgrim) on Jun 28, 2007 at 06:55 UTC
    This is exactly what I needed :) After hacking on it for most of the day I've got a thread pool for interacting with the clients via RPC, and another for interacting with the registries. Works beautifully!
