use strict; use threads; use Thread::Queue; my $threadcount = 4; =head2 C Launches C<$threadcount> threads that process the items in C in parallel. Returns the input queue, output queue and an array reference to the threads. The aliasing effect of C on C<$_> is not preserved. =cut sub qmap(&;@) { my $cb = shift; my $in = Thread::Queue->new(@_); my $out = Thread::Queue->new(); my $handler = sub { while (defined(my $args = $in->dequeue())) { local $_ = $args; $out->enqueue($cb->()); }; }; my @threads = map { threads->new($handler) } 1..$threadcount; $in,$out,\@threads }; =head2 C Processes a list in parallel and returns the results in the order they were finished. The aliasing effect of C on C<$_> is not preserved. =cut sub pmap(&;@) { my ($in,$out,$threads) = &qmap(@_); $in->enqueue((undef) x scalar @$threads); $_->join for @$threads; return $out->dequeue( scalar @$out ) }; =head2 C Processes a list in parallel and returns the results in the order they were input. This is slightly more processing intensive, as the results are sorted after having been processed. The aliasing effect of C on C<$_> is not preserved. =cut sub smap(&;@) { my $user_cb = shift; my $cb = sub { my $args = $_; local $_ = $args->[1]; [ $args->[0], $user_cb->() ]; }; my $pos = 0; my ($in,$out,$threads) = &qmap($cb, map {[ $pos++, $_ ]} @_); $in->enqueue((undef) x scalar @$threads); $_->join for @$threads; return map { shift @$_; @$_ } sort { $a->[0] <=> $b->[0] } $out->dequeue( scalar @$out ) }; print "Got $_\n" for smap { sleep rand 10; printf "%d %d\n", threads->tid, $_; $_ } (1..10);