Sure, this is the multi-threaded code:
use strict;
use threads;
use Thread::Queue;
use Time::HiRes 'time';
use constant MAXTHREADS => 2;
my $workQueue = Thread::Queue->new();
my $outQueue  = Thread::Queue->new();

# Each filter takes the line as its argument and returns true on a match.
my @filters;
push @filters, sub { $_[0] =~ /blahdeblah/    ? 1 : undef } for 1 .. 83;
push @filters, sub { $_[0] =~ /active/        ? 1 : undef };
push @filters, sub { $_[0] =~ /anotherfilter/ ? 1 : undef };

my @threads = map { threads->new( \&worker ) } 1 .. MAXTHREADS;

my $file_name = 'test.txt';
open my $DATF, '<', $file_name or die "Cannot open $file_name: $!";
while ( <$DATF> ) {
    $workQueue->enqueue($_);
}
close $DATF;

$workQueue->end();    # once drained, dequeue() returns undef and workers exit
$_->join for @threads;
$outQueue->end();

my @dat;
while ( my $line = $outQueue->dequeue() ) {
    push @dat, $line;
}

print( time - $^T, "\n" );

sub worker {
    while ( my $line = $workQueue->dequeue() ) {
        chomp $line;
        foreach my $filter (@filters) {
            $filter->($line) or next;
            $outQueue->enqueue($line);
            last;
        }
    }
}
This is the single-threaded code:
use strict;
use Time::HiRes 'time';
my @dat;
my @filters;
push @filters, sub { $_[0] =~ /blahdeblah/    ? 1 : undef } for 1 .. 83;
push @filters, sub { $_[0] =~ /active/        ? 1 : undef };
push @filters, sub { $_[0] =~ /anotherfilter/ ? 1 : undef };

my $file_name = 'test.txt';
open my $DATF, '<', $file_name or die "Cannot open $file_name: $!";
while ( my $line = <$DATF> ) {
    chomp $line;
    foreach my $filter (@filters) {
        $filter->($line) or next;
        push @dat, $line;
        last;
    }
}
close $DATF;

print( time - $^T, "\n" );
print( time - $^T, "\n" );
And the data file I used was made up of the following:
active=sync|foo=bar=bam|sync=53|foo=bar=bam|sync=53|foo=bar=bam|sync=53|
foo=bar=bam|sync=53||foo=bar=bam|sync=53
anotherfilter=forest|foo=bar=bam|sync=53|foo=bar=bam|sync=53|foo=bar=bam|sync=53|foo=bar=bam|sync=53|foo=bar=bam|sync=53
That three-line block was repeated ~100,000 times, generating a file of ~300,000 lines at roughly 23MB.
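If you want to reproduce a comparable test file, a small generator along these lines will do it (the file name, repeat count, and exact template lines are assumptions based on the description above):

```perl
use strict;
use warnings;

# Three-line template repeated to build the test input.
my @template = (
    'active=sync|foo=bar=bam|sync=53|foo=bar=bam|sync=53|foo=bar=bam|sync=53|',
    'foo=bar=bam|sync=53||foo=bar=bam|sync=53',
    'anotherfilter=forest|foo=bar=bam|sync=53|foo=bar=bam|sync=53|foo=bar=bam|sync=53|foo=bar=bam|sync=53|foo=bar=bam|sync=53',
);

open my $OUT, '>', 'test.txt' or die "Cannot open test.txt: $!";
# List repetition gives 3 x 100_000 = 300_000 lines.
print {$OUT} "$_\n" for ( @template ) x 100_000;
close $OUT;
```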
I initially came up with this threading model to parse I/B/E/S monthly financial data files, which are pretty hefty (roughly 30GB all up) and require a lot of processing for each line (anywhere between 40 and 200 lines of code), ultimately winding up with a massively CPU-bound operation. Given the ultimate dataset size, though (and the fact the data is already broken up into multiple files), the final model I went with, which provides the best speed enhancement for this scenario, is a multi-threaded model that divides the work based on files rather than lines. Splitting on lines was good for individual file processing, but not good enough for overall processing, and ultimately not as scalable, due to the natural limits on performance enhancement when splitting the work into such tiny units.
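The file-per-unit split can be sketched roughly like this; the directory glob, thread count, and worker body are placeholders, not the actual I/B/E/S processing code:

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

use constant MAXTHREADS => 4;    # assumption: tune to core count

# Queue whole file names instead of individual lines: each work unit is
# large enough that queue and locking overhead becomes negligible.
my $fileQueue = Thread::Queue->new();
$fileQueue->enqueue( glob 'data/*.txt' );    # assumption: input layout
$fileQueue->end();    # no more files coming; dequeue() returns undef when empty

my @threads = map { threads->new( \&worker ) } 1 .. MAXTHREADS;
$_->join for @threads;

sub worker {
    # Each thread claims a whole file at a time and processes it end to end.
    while ( my $file = $fileQueue->dequeue() ) {
        open my $FH, '<', $file or die "Cannot open $file: $!";
        while ( my $line = <$FH> ) {
            chomp $line;
            # ... per-line processing for this file ...
        }
        close $FH;
    }
}
```

Because each thread owns an entire file, there is no shared output queue to contend on; results can be written per-file and merged afterwards if needed.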
Ultimately, you need to know the underlying environment and task very well to make a good decision about what can and needs to be multi-threaded, how the work should be split up, and where the sweet spot is for performance enhancement.
edit: Removed use warnings, as I didn't actually run this code with use warnings enabled |