PerlMonks  

any module with streaming pipe implementation for use with threads

by Rohan_Bhatia (Initiate)
on Dec 20, 2011 at 18:56 UTC (#944447, perlquestion)
Rohan_Bhatia has asked for the wisdom of the Perl Monks concerning the following question:

Respected Monks,

I picked up Perl almost a year ago and now use it for almost all my scripting and GUI applications.

Lately I have been working on a project that requires me to stream data between threads, and I was wondering whether any modules implement this. (I am looking for something similar to the get/put calls in the SCE-MI 2 standard.)

Either way, I put together some code to implement these and would really appreciate any recommendations.

The code is below. The put and get APIs are the important part. (I am posting the rest of the code for reference; please bear with me :))

    #  _________________________  _________________________  _________________________  _________________________
    # /                         \/                         \/                         \/                         \
    # \_________________________/\_________________________/\_________________________/\_________________________/
    # Author : Rohan Bhatia - (rohan.dd.bhatia@gmail.com|rohan_bhatia@mentor.com)
    package communicate;

    #---------------------------------------------------------------------------#
    # Other STD packages ..
    #---------------------------------------------------------------------------#
    use strict;
    use base 'Exporter';
    use mUtils;
    use threads;
    use threads::shared;

    #---------------------------------------------------------------------------#
    # VERSION & ISA
    #---------------------------------------------------------------------------#
    use vars qw($VERSION @ISA);
    $VERSION = 1.1;
    #@ISA = qw();   # Not derived from any class.

    #---------------------------------------------------------------------------#
    # Global Vars..
    #---------------------------------------------------------------------------#
    my $objId = 1010;   # Variable.. Assign ID to new objects automatically
    my %objTable;

    #---------------------------------------------------------------------------#
    # Defaults.. for object
    #---------------------------------------------------------------------------#
    my %Defaults = (
        '_debug'    => 0,
        'pipeDepth' => -1,   # -1 for infinite, 0 & 1 not supported, will be defaulted to 2.
        'pid'       => undef,
    );

    ## KeyPat -> Make a pattern to search for matching keys..
    my $keyPat = join('\b|\b' , sort keys %Defaults);
    my %pass;

    #---------------------------------------------------------------------------#
    # BEGIN block, Import function
    #---------------------------------------------------------------------------#
    BEGIN {
        #eval "use ";
        #die "Problem loading IO::Socket : \n$@\nx---------x\n" if $@;
    }

    # The import block allows to overwrite defaults for entire package.
    # Parameters can still be overwritten, when a new object is created.
    sub import {
        my $package = shift;
        my @args = @_;
        while (my ($k,$v) = splice(@args, 0, 2)) {
            if(exists $Defaults{$k}) {
                $Defaults{$k} = $v if(defined $v and $v ne '');
            } else {
                $pass{$k} = $v;
            }
        }
    }

    #---------------------------------------------------------------------------#
    # Constructor..
    #---------------------------------------------------------------------------#
    sub new {
        my $that = shift;
        my @args = @_;
        my $class = ref($that) || $that;
        my $self;
        my @_dataArray = undef;

        #-------------------------#
        # Some more internal vars..
        #-------------------------#
        my (%parms, $k, $v);   #params.. assign these, otherwise defaults.. k,v internal processing
        while (($k,$v) = splice(@args, 0, 2)) {
            if ($k =~ /^$keyPat$/) {
                $parms{$k} = $v;
            } else {
                $pass{$k} = $v;
            }
        }

        # Assign the params ..
        foreach (keys %parms, keys %Defaults) {
            my $key = $_;
            $self->{$key} = exists $parms{$key} && defined $parms{$key} ? $parms{$key} : $Defaults{$key};
        }

        # Check pipe/stack depth. If less than 2 than it is 2
        if($self->{'pipeDepth'} == 0 or $self->{'pipeDepth'} == 1) {
            $self->{'pipeDepth'} = 2;
            printMsg "Unsupported pipe depth, Pipe depth changed to: $self->{'pipeDepth'}","Warning";
        }

        #Set self id..
        $self->{'pid'}            = $objId if(!defined $self->{'pid'});
        $self->{'_phase'}         = 1;
        $self->{'_putInProgress'} = 0;
        $self->{'_getInProgress'} = 0;
        $self->{'_stackEmpty'}    = 1;
        $self->{'_stackFull'}     = 0;
        $self->{'_arrPtr'}        = \@_dataArray;

        share($self->{'_putInProgress'});
        share($self->{'_getInProgress'});
        share($self->{'_stackEmpty'}   );
        share($self->{'_stackFull'}    );
        share($self->{'_arrPtr'}       );
        share(@_dataArray              );

        bless $self, $class;
        $self->_reset;
        $self->_importPackages(%pass);
        $objTable{$objId} = $self;   # push object in object table
        $objId++;
        return $self;
    }

    #---------------------------------------------------------------------------#
    # put - (blocking, return 0 on success)
    #---------------------------------------------------------------------------#
    sub put {
        my $self = shift;
        my $data = shift;

        #1. Check phase compliancy..
        if($self->{'_phase'} < 1) {
            printVerb("Operation denied. Object phase incorrect!","Warning");
            return -1;
        }

        #2. lock put call. This blocks recursive calls to put and streamlines them
        lock($self->{'_putInProgress'});
        $self->{'_putInProgress'} = 1;

        #3. wait for any get calls to finish.
        while($self->{'_getInProgress'} and $self->{'_putInProgress'} < 2 ) {
            $self->{'_getInProgress'} = 2;     # Set this to prevent deadlock in put & get
            last if($self->{'_stackEmpty'});   # Should not be required..
        }

        #4. check if stack has reached its size limit
        if(($#{$self->{'_arrPtr'}} >= eval($self->{'pipeDepth'} -1)) and ($self->{'pipeDepth'} != -1)) {
            lock($self->{'_stackFull'});
            $self->{'_stackFull'} = 1;
        }

        #5. wait till stack has some space. This makes the call blocking
        while($self->{'_stackFull'}) {
            lock($self->{'_stackFull'});
            last if($self->{'_stackEmpty'});
        }

        #6. lock the appropriate vars and put the data.
        lock($self->{'_arrPtr'});
        lock($self->{'_stackEmpty'});
        push(@{$self->{'_arrPtr'}},$data);
        $self->{'_putInProgress'} = 0;
        $self->{'_stackEmpty'} = 0;

        #7. return the current size of stack.
        return eval($#{$self->{'arrPtr'}} + 1);
    }

    #---------------------------------------------------------------------------#
    # get blocking
    #---------------------------------------------------------------------------#
    sub get {
        my $self = shift;

        #1. Check phase compliancy..
        if($self->{'_phase'} < 1) {
            printVerb("Operation denied. Object phase incorrect!","Warning");
            return -1;
        }

        #2. lock get call. This blocks recursive calls to get and streamlines them
        lock($self->{'_getInProgress'});
        $self->{'_getInProgress'} = 1;

        #3. wait for any put calls to finish.
        while($self->{'_putInProgress'} and $self->{'_getInProgress'} < 2 ) {
            $self->{'_putInProgress'} = 2;    # Set this to prevent deadlock in put & get
            last if($self->{'_stackFull'});   # Should not be required..
        }

        #4. check if stack is empty
        if($#{$self->{'_arrPtr'}} < 0 ) {
            lock($self->{'_stackEmpty'});
            $self->{'_stackEmpty'} = 1;
        }

        #5. wait till stack has some more data. This makes the call blocking
        while($self->{'_stackEmpty'}) {
            lock($self->{'_stackEmpty'});
            last if($self->{'_stackFull'});
        }

        #6. lock the appropriate vars and get the data.
        lock($self->{'_arrPtr'});
        lock($self->{'_stackFull'});
        my $data = pop(@{$self->{'_arrPtr'}});
        $self->{'_getInProgress'} = 0;
        $self->{'_stackFull'} = 0;

        #7. return the data.
        return $data;
    }

    1;

Noticeable problems:

1. The while loops keep the CPU busy. I tried to implement this using locks, but was unable to asynchronously unblock a lock. Also, is there a way to check whether a variable is locked?

2. If I create try_put/try_get, then maybe the parent script using these APIs can yield threads.

Test application(parent script)..

    use base 'Exporter';
    use strict;
    use vars qw($VERSION @ISA @EXPORT);
    use threads;
    use threads::shared;
    use mUtils qw/callerDepth -1/;
    use communicate;

    #my @arr;
    #my @arr3;
    #share(@arr3);
    #share(@arr);

    my $pipe1 = communicate->new(pipeDepth => -1, callerDepth => 1, pid => 'clientQueue');
    my $key = $pipe1->getCid();
    printMsg "Got key: $key","Info";

    my $tPut1 = threads->create(\&keepPut1);
    my $tPut2 = threads->create(\&keepPut2);
    sleep 60;
    my $tGet1 = threads->create(\&keepGet1);
    my $tGet2 = threads->create(\&keepGet2);

    $tPut1->join();
    $tPut2->join();
    sleep 2;
    print "\n======================\n";
    #print "Array size : $#arr3\n";
    #print "Array size : $#arr\n";
    print "\n======================\n";
    $tGet2->join();
    $tGet1->join();

    sub keepPut1 {
        printMsg "In Put1";
        my $count = 0;
        while($count<500) {
            printMsg "1. Putting Count => $count";
            $pipe1->put($count);
            #lock(@arr3);
            #push(@arr3,$count);
            if($count%10==0) { sleep .1;}
            $count++;
            #sleep 0.1;
        }
        return 0;
    }

    sub keepPut2 {
        printMsg "In Put2";
        my $count = 500;
        while($count<500) {
            printMsg "2. Putting Count => $count";
            $pipe1->put($count);
            #lock(@arr3);
            #push(@arr3,$count);
            $count++;
            #sleep 0.1;
        }
        return 0;
    }

    sub keepGet1 {
        printMsg "In Get 1";
        my $pipe = communicate::getObject($key);
        while(1) {
            my $countGet = $pipe1->get();
            #printMsg "Got count: $countGet";
            if(!defined($countGet)) {
                print "Vola\n";
                sleep 10;
            } else {
                printMsg "1. Got from pipe => $countGet";
                #lock(@arr);
                #push(@arr,$countGet);
            }
            #sleep 0.1;
        }
        print "Get1 ends.. ";
    }

    sub keepGet2 {
        printMsg "In Get 2";
        my $pipe = communicate::getObject($key);
        while(1) {
            my $countGet = $pipe1->get();
            #printMsg "Got count: $countGet";
            if(!defined($countGet)) {
                print "Vola\n";
                sleep 10;
            } else {
                printMsg "2. Got from pipe => $countGet";
                #lock(@arr);
                #push(@arr,$countGet);
            }
            #sleep 0.1;
        }
        print "Get2 ends.. ";
    }

    printErr "---------------------------------";

Re: any module with streaming pipe implementation for use with threads
by mr.nick (Chaplain) on Dec 20, 2011 at 19:38 UTC
    Hi Rohan,

    I haven't used threads in quite a while now, but after reading through your code I do have a couple comments/observations to make. Take them with a grain of salt.

    First of all, are you absolutely certain that you need to roll your own shared stack? Other than controlling the size of the stack, and your 'phases', it looks to me like you could do the same using threads::shared. In fact, it looks like you started that approach. It may be that you are overengineering the project.

    Secondly, I don't believe your communication object, $pipe1, is going to be shared between the two threads. If I recall the Perl threading model correctly, each thread receives a copy of the interpreter, with data as of the creation of the thread. Once that split has been done, the copies have no relationship with each other.
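To illustrate the copy semantics described above, here is a minimal sketch (the variable and sub names are made up for the illustration, not taken from the posted module). An ordinary variable is cloned into each new thread, while a variable marked :shared is a single value visible to all of them:

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my $private = 0;            # ordinary variable: every thread works on its own copy
my $shared :shared = 0;     # shared variable: one value seen by all threads

# Hypothetical worker: increments both variables and reports its private copy.
sub bump {
    $private++;                        # changes only this thread's copy
    { lock($shared); $shared++; }      # changes the single shared value
    return $private;
}

my @threads = map { threads->create(\&bump) } 1 .. 3;
my @seen    = map { $_->join() } @threads;

print "private in main: $private\n";                      # still 0
print "shared in main:  $shared\n";                       # 3
print "private as seen inside each thread: @seen\n";      # 1 1 1
```

This is also why an object that is itself copied can still appear to work across threads: its ordinary fields are per-thread copies, but any members that were explicitly shared before the threads were created refer to the same data.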

    Here's something that I whipped up real fast. No doubt the locking mechanism isn't correct, but maybe it'll point you towards the correct location:

        use strict;
        use warnings;
        use threads;
        use threads::shared;

        my @stack: shared;

        my $thr = threads->create(\&thr2);
        threads->create(\&thr1)->detach;
        threads->create(\&thr1)->detach;
        $thr->join;

        sub get {
            threads->yield while scalar @stack == 0;
            lock(@stack);
            pop @stack;
        }

        sub put {
            my $val = shift;
            lock(@stack);
            push @stack,$val;
        }

        sub thr1 {
            put($_) for 1..500;
        }

        sub thr2 {
            my $cnt;
            my $val;
            while ($val = get()) {
                $cnt++;
                print "$val ($cnt)\n";
            }
        }

    I can see a few issues with my code, especially if there are multiple getters and putters running at the same time. But you can serialize those operations, in a very brute-force kind of way, by creating some shared scalars (e.g. $get_in_progress) and locking those.
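An alternative to spinning or to extra "in progress" flags is the cond_wait/cond_broadcast pair from threads::shared, which puts a waiting thread to sleep while releasing the lock. What follows is only a sketch under stated assumptions: the names $DEPTH, put, get, try_get and run_demo are invented here, the buffer is a FIFO (shift) rather than a stack (pop), and one shared array doubles as both lock and condition variable.

```perl
use strict;
use warnings;
use threads;
use threads::shared;

my $DEPTH = 4;        # hypothetical capacity; set before any thread is created
my @buf :shared;      # the buffer, also used as lock and condition variable

# Blocking put: sleeps inside cond_wait (no busy loop) while the buffer is full.
sub put {
    my ($item) = @_;
    lock(@buf);
    cond_wait(@buf) while @buf >= $DEPTH;   # releases the lock while waiting
    push @buf, $item;
    cond_broadcast(@buf);                   # wake every waiting getter and putter
    return;
}

# Blocking get: sleeps while the buffer is empty.
sub get {
    lock(@buf);
    cond_wait(@buf) while !@buf;
    my $item = shift @buf;
    cond_broadcast(@buf);
    return $item;
}

# Non-blocking get: returns undef at once if nothing is available.
sub try_get {
    lock(@buf);
    return unless @buf;
    my $item = shift @buf;
    cond_broadcast(@buf);
    return $item;
}

# Hypothetical demo: push 1..$n through the buffer and sum what arrives.
sub run_demo {
    my ($n) = @_;
    my $producer = threads->create(sub { put($_) for 1 .. $n; return; });
    my $consumer = threads->create(sub {
        my $sum = 0;
        $sum += get() for 1 .. $n;
        return $sum;
    });
    $producer->join();
    my $sum = $consumer->join();
    return $sum;
}

print "sum of 1..20 received through the buffer: ", run_demo(20), "\n";
```

The emptiness and fullness checks happen while the lock is held, so two getters can no longer both see one remaining item and then race for it. A single condition variable is used for simplicity, which is why cond_broadcast rather than cond_signal is called.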

    Edit: changed code to do a little more work.
    Editx2: do what BrowserUK said.

    mr.nick ...

      Thanks nick, for the suggestions. I started to implement my class the way you have described :), but it fell short when multiple get/put calls are made at the same time. Also, I need the calls to be available in both blocking and non-blocking forms.

      I tested the code over 5M iterations and all the data is popped out. I believe the threads make a copy of $pipe1, but the store_array & lock vars are part of a shared location.

Re: any module with streaming pipe implementation for use with threads
by BrowserUk (Pope) on Dec 20, 2011 at 20:14 UTC

    Ditch your code and use Thread::Queue. Your put() method is called $Q->enqueue() and your get() method is called $Q->dequeue().

    It is simple, efficient and won't burn a hole in your desk/server room floor by thrashing your cpu to death.
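As a rough illustration of the suggestion above, here is a small sketch with two producers and two consumers sharing one Thread::Queue. The sub names and the use of undef as an end-of-stream marker are choices made for this example only:

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $Q = Thread::Queue->new();

# Hypothetical producer: enqueue the integers $from .. $to.
sub producer {
    my ($from, $to) = @_;
    $Q->enqueue($_) for $from .. $to;
    return;
}

# Hypothetical consumer: dequeue() blocks (without spinning) until an item
# arrives; an undef item is treated as "no more data".
sub consumer {
    my $count = 0;
    while (defined(my $item = $Q->dequeue())) {
        $count++;
    }
    return $count;
}

my $p1 = threads->create(\&producer, 0, 499);
my $p2 = threads->create(\&producer, 500, 999);
my $c1 = threads->create(\&consumer);
my $c2 = threads->create(\&consumer);

$_->join() for $p1, $p2;
$Q->enqueue(undef, undef);          # one end marker per consumer

my $total = 0;
for my $c ($c1, $c2) {
    my $n = $c->join();
    $total += $n;
}
print "items consumed: $total\n";
```

How the 1000 items are split between the two consumers varies from run to run; only the total is fixed.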


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

    The start of some sanity?

      Thanks BrowserUk, this looks like the thing I was looking for :). I was trying to implement the same set of functions. I will try this out.

      Another thing: can we use them with datatypes that are not shared?

        can we use them with datatypes that are not shared?

        No. Only non-shared scalars which get copied.

        It isn't logical to queue shared datatypes between threads, as they can already be seen by all threads. At least by all threads that were spawned after they came into existence. (*)

        If I want to pass an array or hash between threads, then I join it to form a scalar and split it at the receiver:

            use threads;
            use Thread::Queue;

            my $Q = new Thread::Queue;

            async {
                # dequeue() does not set $_, so capture the item explicitly
                while( defined( my $item = $Q->dequeue ) ) {
                    my @array = split $;, $item;
                    ...
                }
            }->detach;

            my @array = 1 .. 10;
            $Q->enqueue( join $;, @array );
            ...

        And:

            use threads;
            use Thread::Queue;

            my $Q = new Thread::Queue;

            async {
                # dequeue() does not set $_, so capture the item explicitly
                while( defined( my $item = $Q->dequeue ) ) {
                    my %hash = split $;, $item;
                    ...
                }
            }->detach;

            my %hash = 'a' .. 'z';
            $Q->enqueue( join $;, %hash );
            ...

        (*) Subject to visibility.
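One caveat with the join/split round trip is worth a small sketch: split without a limit silently drops trailing empty fields, so a list ending in empty strings comes back shorter. The helper names pack_list and unpack_list are invented for this illustration, and the sketch assumes that no element contains the separator character itself.

```perl
use strict;
use warnings;

my $SEP = "\034";    # the default value of $; (the subscript separator)

# Hypothetical helpers for flattening a list into one scalar and back.
sub pack_list {
    return join $SEP, @_;
}

sub unpack_list {
    my ($packed) = @_;
    # The -1 limit keeps trailing empty fields that split would otherwise drop.
    return split /\Q$SEP\E/, $packed, -1;
}

my @in    = ('x', '', '');
my @naive = split /\Q$SEP\E/, pack_list(@in);    # trailing empties are lost
my @kept  = unpack_list(pack_list(@in));         # all three elements survive

print scalar(@naive), " element(s) without a limit, ", scalar(@kept), " with -1\n";
```

For hashes this matters when the last value is an empty string, since the receiver would then see an odd number of elements.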



        can we use them with datatypes that are not shared?

        In my first reply, I said you can only queue scalars. I was wrong. After a fashion.

        The original Thread::Queue module, which is unfortunately no longer available on CPAN for some reason, was a wonderfully simple module that consisted of just the constructor and four methods: enqueue(), dequeue(), pending() and the little used but sometimes useful dequeue_nb().

        The perfect example of a module that did just what it needed to do to satisfy its name, and did it quietly and efficiently. With an interface so simple that you never have to look it up (*see later). It looked like this:

        Simple and beautiful. And it only allowed scalars to be shared.
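For reference, the non-blocking side of that small interface can be exercised in a few lines. This is only a usage sketch written against the current Thread::Queue, not the original module source:

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $Q = Thread::Queue->new();
$Q->enqueue(qw(a b c));

my $before = $Q->pending();          # number of items currently waiting

# dequeue_nb() never blocks: it returns undef when the queue is empty.
my @got;
while (defined(my $item = $Q->dequeue_nb())) {
    push @got, $item;
}

my $extra = $Q->dequeue_nb();        # queue is empty now, so this is undef
my $after = $Q->pending();

print "pending before: $before, got: @got, pending after: $after\n";
```

A caller that wants try_get-style behaviour can poll with dequeue_nb() and do other work in between, while dequeue() remains the blocking form.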

        But you remember I mentioned never having to look up the docs? Well, long after I posted my first reply, I was watching a movie when I remembered that the module had been upgraded a while back and some new (mis)features were added. I did play with them once, but since I never need them, I'd forgotten about them.

        In addition to sprouting a bunch of unnecessary and definitively un-queue-like methods: peek(), insert() & extract(), it also gained the ability to convey references to arrays and hashes. I remember thinking this was a cute idea when I first saw it. Until I ran a few tests that is.
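A minimal sketch of that reference-passing feature, assuming a Thread::Queue release recent enough to include it (the same releases that provide peek(), insert() and extract()). The worker logic is invented purely for illustration:

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $Q = Thread::Queue->new();

# Hypothetical worker: sums the numbers found in each array or hash it receives.
my $worker = threads->create(sub {
    my $sum = 0;
    while (defined(my $item = $Q->dequeue())) {
        if (ref $item eq 'ARRAY') {
            $sum += $_ for @$item;
        }
        elsif (ref $item eq 'HASH') {
            $sum += $_ for values %$item;
        }
    }
    return $sum;
});

$Q->enqueue([1, 2, 3]);                 # array reference
$Q->enqueue({ a => 10, b => 20 });      # hash reference
$Q->enqueue(undef);                     # end marker for this example

my $total = $worker->join();
print "total: $total\n";
```

The queue hands the worker a shared copy of each structure, so changes made to the original array or hash after enqueueing are not seen by the receiver.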

        This conveys arrays using my join/split method:

        This does the same thing using references:

        And here are a couple of comparative runs of both:

            c:\test>junk51 -T=10 -N=100 -I=1e4
            Took 2.544000 seconds
            4.742 2.137 0 0

            c:\test>junk52 -T=10 -N=100 -I=1e4
            Took 22.028000 seconds
            16.832 29.296 0 0

            c:\test>junk51 -T=10 -N=1000 -I=1e3
            Took 2.148000 seconds
            3.541 1.809 0 0

            c:\test>junk52 -T=10 -N=1000 -I=1e3
            Took 21.426000 seconds
            15.303 29.156 0 0

        It really is hard to justify why queueing array references is 10 times slower than joining and splitting them. Especially if you look at the processor usage rather than wall clock and realise that it takes 55 seconds cpu for the former and only 5 for the latter.

        So, after all that. Yes, you can share other datatypes, but you probably don't want to. At least I don't :)



Re: any module with streaming pipe implementation for use with threads
by cavac (Chaplain) on Dec 20, 2011 at 20:31 UTC

    I can't really comment on your code (don't use threads but preforking in my code, haven't done threads for years), just let me say this:

    While there are some minor indentation problems in your code, I find the code layout and the comprehensive information you provided most enjoyable. Thank you for caring :-)

    As I said, I use forking/preforking in my bigger projects. For interprocess communication I use two systems in combination: when race conditions don't really matter (either because only one process writes the data or because the data integrity isn't that important), I use memcache because it's quite fast.

    When data integrity really matters and speed has a lower priority than integrity, I generally use a database. ACID-compliant databases are a modern marvel when it comes to data integrity.

    It of course all depends what your final goal is and what your application is trying to achieve if you can go one of these routes (or a combination thereof).

    BREW /very/strong/coffee HTTP/1.1
    Host: goodmorning.example.com
    
    418 I'm a teapot
