Respected Monks.
I picked up Perl almost a year ago and now use it for almost all my scripting and GUI applications.
Lately I have been working on a project that requires me to stream data between threads, and I was wondering whether there are any modules that implement this. (I am looking for something similar to the get/put calls in the SCE-MI 2 standard.)
Either way, I put together some code to implement these and would really appreciate any recommendations.
The code is below; the put and get APIs are the important part. (I am posting the rest of the code for reference, so please bear with me :))
#  _________________________  _________________________  _________________________  _________________________
#/                         \/                         \/                         \/                         \
#\_________________________/\_________________________/\_________________________/\_________________________/
# Author : Rohan Bhatia - (rohan.dd.bhatia@gmail.com|rohan_bhatia@mentor.com)
package communicate;
#---------------------------------------------------------------------------#
# Other STD packages ..
#---------------------------------------------------------------------------#
use strict;
use base 'Exporter';
use mUtils;
use threads;
use threads::shared;
#---------------------------------------------------------------------------#
# VERSION & ISA
#---------------------------------------------------------------------------#
# Package globals. @ISA is only declared here (nothing is assigned to it in
# this section); `our` replaces the obsolete `use vars` pragma.
our ($VERSION, @ISA);
$VERSION = 1.1;
#@ISA = qw(); # Not derived from any class.
#---------------------------------------------------------------------------#
# Global Vars..
#---------------------------------------------------------------------------#
my $objId = 1010;    # Next object ID; assigned to new objects automatically.
my %objTable;        # Created objects, keyed by the ID they were given.
#---------------------------------------------------------------------------#
# Defaults.. for object
#---------------------------------------------------------------------------#
my %Defaults = (
    '_debug'    => 0,
    'pipeDepth' => -1,    # -1 for infinite; 0 & 1 not supported, will be defaulted to 2.
    'pid'       => undef,
);
## KeyPat -> Make a pattern to search for matching keys..
## The alternation is wrapped in a group so that the word boundaries here,
## and any ^...$ anchors a caller adds around it, apply to every key. The
## ungrouped form ("a\b|\bb\b|\bc") anchored only the first and last
## alternatives, so /^$keyPat$/ accepted strings that merely contained a key.
my $keyPat = '\b(?:' . join('|', map { quotemeta } sort keys %Defaults) . ')\b';
my %pass;    # Options that are not in %Defaults; collected for later use.
#---------------------------------------------------------------------------#
# BEGIN block, Import function
#---------------------------------------------------------------------------#
BEGIN {
    #eval "use ";
    #die "Problem loading IO::Socket : \n$@\nx---------x\n" if $@;
}

# import - lets `use communicate KEY => VALUE, ...` overwrite the defaults
# for the entire package. Parameters can still be overwritten when a new
# object is created.
#
#   KEY in %Defaults : replaces the default, but only when VALUE is defined
#                      and not the empty string.
#   any other KEY    : stored in %pass.
sub import {
    my ($package, @args) = @_;

    # Consume the argument list two items at a time as key/value pairs.
    while (my ($k, $v) = splice(@args, 0, 2)) {
        if (exists $Defaults{$k}) {
            $Defaults{$k} = $v if defined $v and $v ne '';
        }
        else {
            $pass{$k} = $v;
        }
    }
    return;
}
#---------------------------------------------------------------------------#
# Constructor..
#---------------------------------------------------------------------------#
# new(KEY => VALUE, ...)
#   Creates a pipe object.
#
#   Keys present in %Defaults (_debug, pipeDepth, pid) configure the object;
#   an undefined value falls back to the package default. Every other key is
#   stored in %pass, which is handed to _importPackages.
#
#   pipeDepth: -1 means unlimited. Any other value below 2 is unsupported
#   and is replaced by 2 with a warning.
#
#   Returns the blessed object, which is also recorded in %objTable under
#   the automatically assigned object ID.
sub new {
    my $that  = shift;
    my @args  = @_;
    my $class = ref($that) || $that;
    my $self;

    # Backing store for the queued data. It is declared empty: share() below
    # discards whatever an array holds, so seeding it would have no effect.
    my @_dataArray;

    #-------------------------#
    # Sort the arguments into known parameters and pass-through options.
    #-------------------------#
    my %parms;
    while (my ($k, $v) = splice(@args, 0, 2)) {
        # Exact key lookup (the same test import() uses) instead of a regex
        # match, so only real option names are accepted as parameters.
        if (exists $Defaults{$k}) {
            $parms{$k} = $v;
        }
        else {
            $pass{$k} = $v;
        }
    }

    # Assign the params: a defined argument wins, otherwise the default.
    foreach my $key (keys %Defaults) {
        $self->{$key} = defined $parms{$key} ? $parms{$key} : $Defaults{$key};
    }

    # Check pipe/stack depth. Anything below 2 (other than -1 = unlimited)
    # is forced to 2; this now also covers negative depths such as -5.
    if ($self->{'pipeDepth'} != -1 and $self->{'pipeDepth'} < 2) {
        $self->{'pipeDepth'} = 2;
        printMsg("Unsupported pipe depth, Pipe depth changed to: $self->{'pipeDepth'}", "Warning");
    }

    # Set self id..
    $self->{'pid'} = $objId if !defined $self->{'pid'};

    $self->{'_phase'}         = 1;
    $self->{'_putInProgress'} = 0;
    $self->{'_getInProgress'} = 0;
    $self->{'_stackEmpty'}    = 1;
    $self->{'_stackFull'}     = 0;
    $self->{'_arrPtr'}        = \@_dataArray;

    # Mark the state flags and the data array as shared between threads.
    # NOTE(review): per the threads::shared docs share() follows one level
    # of reference, so share($self->{'_arrPtr'}) presumably shares the array
    # itself rather than the hash slot - confirm.
    share($self->{'_putInProgress'});
    share($self->{'_getInProgress'});
    share($self->{'_stackEmpty'});
    share($self->{'_stackFull'});
    share($self->{'_arrPtr'});
    share(@_dataArray);

    bless $self, $class;
    $self->_reset;
    $self->_importPackages(%pass);

    $objTable{$objId} = $self;    # push object in object table
    $objId++;
    return $self;
}
#---------------------------------------------------------------------------#
# put - blocking
#---------------------------------------------------------------------------#
# put($data)
#   Pushes $data onto the object's data array.
#
#   Returns -1 (after a warning via printVerb) when the object's phase is
#   below 1. Otherwise returns the number of elements held after the push.
#   The previous code read the misspelled key 'arrPtr' here, so it always
#   returned 0 and created a stray 'arrPtr' entry in the object.
#
#   NOTE(review): the wait loops in steps 3 and 5 spin; the lock taken inside
#   a loop body is released at the end of each iteration. cond_wait /
#   cond_signal from threads::shared, or the core Thread::Queue module, would
#   block without burning CPU.
sub put {
    my $self = shift;
    my $data = shift;

    #1. Check phase compliancy..
    if ($self->{'_phase'} < 1) {
        printVerb("Operation denied. Object phase incorrect!", "Warning");
        return -1;
    }

    #2. lock put call. This blocks recursive calls to put and streamlines
    #   them. The lock is held until this sub returns.
    lock($self->{'_putInProgress'});
    $self->{'_putInProgress'} = 1;

    #3. wait for any get calls to finish.
    while ($self->{'_getInProgress'} and $self->{'_putInProgress'} < 2) {
        $self->{'_getInProgress'} = 2;      # Set this to prevent deadlock in put & get
        last if ($self->{'_stackEmpty'});   # Should not be required..
    }

    #4. check if stack has reached its size limit (-1 means no limit).
    if (($#{ $self->{'_arrPtr'} } >= $self->{'pipeDepth'} - 1) and ($self->{'pipeDepth'} != -1)) {
        lock($self->{'_stackFull'});
        $self->{'_stackFull'} = 1;
    }

    #5. wait till stack has some space. This makes the call blocking.
    while ($self->{'_stackFull'}) {
        lock($self->{'_stackFull'});
        last if ($self->{'_stackEmpty'});
    }

    #6. lock the appropriate vars and put the data.
    lock($self->{'_arrPtr'});
    lock($self->{'_stackEmpty'});
    push(@{ $self->{'_arrPtr'} }, $data);
    $self->{'_putInProgress'} = 0;
    $self->{'_stackEmpty'}    = 0;

    #7. return the current size of stack.
    return scalar @{ $self->{'_arrPtr'} };
}
#---------------------------------------------------------------------------#
# get blocking
#---------------------------------------------------------------------------#
# get()
#   Removes and returns one element from the object's data array.
#
#   Returns -1 (after a warning via printVerb) when the object's phase is
#   below 1. Otherwise returns the element taken with pop(); since put() adds
#   with push(), the most recently added element comes out first.
#
#   NOTE(review): the wait loops in steps 3 and 5 spin; the lock taken inside
#   a loop body is released at the end of each iteration. cond_wait /
#   cond_signal from threads::shared, or the core Thread::Queue module, would
#   block without burning CPU.
sub get {
    my $self = shift;

    #1. Check phase compliancy..
    if ($self->{'_phase'} < 1) {
        printVerb("Operation denied. Object phase incorrect!", "Warning");
        return -1;
    }

    #2. lock get call. This blocks recursive calls to get and streamlines
    #   them. The lock is held until this sub returns.
    lock($self->{'_getInProgress'});
    $self->{'_getInProgress'} = 1;

    #3. wait for any put calls to finish.
    while ($self->{'_putInProgress'} and $self->{'_getInProgress'} < 2) {
        $self->{'_putInProgress'} = 2;     # Set this to prevent deadlock in put & get
        last if ($self->{'_stackFull'});   # Should not be required..
    }

    #4. check if stack is empty
    if ($#{ $self->{'_arrPtr'} } < 0) {
        lock($self->{'_stackEmpty'});
        $self->{'_stackEmpty'} = 1;
    }

    #5. wait till stack has some more data. This makes the call blocking.
    while ($self->{'_stackEmpty'}) {
        lock($self->{'_stackEmpty'});
        last if ($self->{'_stackFull'});
    }

    #6. lock the appropriate vars and get the data.
    lock($self->{'_arrPtr'});
    lock($self->{'_stackFull'});
    my $data = pop(@{ $self->{'_arrPtr'} });
    $self->{'_getInProgress'} = 0;
    $self->{'_stackFull'}     = 0;

    #7. return the data.
    return $data;
}
1;
Noticeable problems..
1. The while loops keep the CPU busy. I tried to implement this using locks, but was not able to unblock a lock asynchronously. Also, is there a way to check whether a variable is locked?
2. If I create try_put/try_get, then maybe the parent script using these APIs can yield threads.
Test application(parent script)..
use base 'Exporter';
use strict;
use vars qw($VERSION @ISA @EXPORT);
use threads;
use threads::shared;
use mUtils qw/callerDepth -1/;
use communicate;
#my @arr;
#my @arr3;
#share(@arr3);
#share(@arr);
# Create the pipe first; the worker threads below are started afterwards.
my $pipe1 = communicate->new(
    pipeDepth   => -1,
    callerDepth => 1,
    pid         => 'clientQueue',
);
my $key = $pipe1->getCid();
printMsg("Got key: $key", "Info");

# Start both producers, give them a minute, then start both consumers.
my $tPut1 = threads->create(\&keepPut1);
my $tPut2 = threads->create(\&keepPut2);
sleep 60;
my $tGet1 = threads->create(\&keepGet1);
my $tGet2 = threads->create(\&keepGet2);

# Wait for the producers, in creation order.
foreach my $producer ($tPut1, $tPut2) {
    $producer->join();
}
sleep 2;
print "\n======================\n";
#print "Array size : $#arr3\n";
#print "Array size : $#arr\n";
print "\n======================\n";

# Wait for the consumers: second one first, then the first.
foreach my $consumer ($tGet2, $tGet1) {
    $consumer->join();
}
# keepPut1 - producer thread body.
#   Puts the integers 0..499 into $pipe1, pausing for a tenth of a second
#   whenever the value is a multiple of 10. Returns 0.
sub keepPut1 {
    printMsg("In Put1");
    my $count = 0;
    while ($count < 500) {
        printMsg("1. Putting Count => $count");
        $pipe1->put($count);
        #lock(@arr3);
        #push(@arr3,$count);
        # The built-in sleep() only counts whole seconds, so `sleep .1`
        # did not pause at all. 4-argument select() with undef handles is
        # the built-in way to wait for a fraction of a second.
        if ($count % 10 == 0) { select(undef, undef, undef, 0.1); }
        $count++;
        #sleep 0.1;
    }
    return 0;
}
# keepPut2 - second producer thread body.
#   Puts the integers 500..999 into $pipe1. Returns 0.
#
#   NOTE(review): the loop used to start at 500 with the bound `< 500`, so
#   its body never ran. The bound is raised to 1000 on the assumption that
#   this producer should add as many values as keepPut1 does (500), in a
#   range that does not overlap it - confirm that the loop was not being
#   disabled on purpose.
sub keepPut2 {
    printMsg("In Put2");
    my $count = 500;
    while ($count < 1000) {
        printMsg("2. Putting Count => $count");
        $pipe1->put($count);
        #lock(@arr3);
        #push(@arr3,$count);
        $count++;
        #sleep 0.1;
    }
    return 0;
}
# keepGet1 - consumer thread body.
#   Loops forever calling get() on $pipe1. A defined value is reported via
#   printMsg; an undefined one prints "Vola" and waits 10 seconds before the
#   next attempt.
sub keepGet1 {
    printMsg("In Get 1");
    # Looked up through the module, but not used by the loop below, which
    # reads from $pipe1 directly.
    my $lookedUp = communicate::getObject($key);
    for (;;) {
        my $value = $pipe1->get();
        #printMsg "Got count: $value";
        if (defined $value) {
            printMsg("1. Got from pipe => $value");
            #lock(@arr);
            #push(@arr,$value);
            next;
        }
        print "Vola\n";
        sleep 10;
        #sleep 0.1;
    }
    # Not reached: the loop above has no exit.
    print "Get1 ends.. ";
}
# keepGet2 - second consumer thread body.
#   Loops forever calling get() on $pipe1. A defined value is reported via
#   printMsg; an undefined one prints "Vola" and waits 10 seconds before the
#   next attempt.
sub keepGet2 {
    printMsg("In Get 2");
    # Looked up through the module, but not used by the loop below, which
    # reads from $pipe1 directly.
    my $lookedUp = communicate::getObject($key);
    for (;;) {
        my $value = $pipe1->get();
        #printMsg "Got count: $value";
        if (defined $value) {
            printMsg("2. Got from pipe => $value");
            #lock(@arr);
            #push(@arr,$value);
            next;
        }
        print "Vola\n";
        sleep 10;
        #sleep 0.1;
    }
    # Not reached: the loop above has no exit.
    print "Get2 ends.. ";
}
# Closing separator, emitted through printErr.
printErr("---------------------------------");