Hi, again... :)
use strict;
use warnings;
no warnings 'uninitialized';
# Compile-time platform guard: abort before anything else is loaded unless
# the OS name reported by $^O contains "linux".
BEGIN {
    if ($^O !~ /linux/) {
        die "this script was not validated on this platform";
    }
}
use IPC::MMA qw(:basic :hash);
use Crypt::xxHash qw(xxhash64);
use Sort::Packed qw(sort_packed);
use Time::HiRes qw(time);
use MCE::Signal qw($tmp_dir -use_dev_shm);
use MCE;
# Print the command-line synopsis and terminate the program.
# Never returns: the text is raised via die(). Because the message ends in
# a newline, Perl does not append "at FILE line N" to it.
sub usage {
    die "Usage: [KEYSIZE=N] [NUM_THREADS=N] [NUM_MAPS=N] perl $0 file...\n"
      . " perl $0 [--keysize=N] [--threads=N] [--maps=N] file...\n"
      . " perl $0 [--keysize=N] [--threads=max] [--maps=max] file...\n";
}
# Command-line handling.
# Defaults come from the KEYSIZE / NUM_THREADS / NUM_MAPS environment
# variables (falling back to 12 / 8 / 256) and may be overridden by leading
# --keysize / --threads / --maps options. Whatever remains in @ARGV after
# the options is the list of input files.
@ARGV or usage();

my $NUM_CPUS = MCE::Util->get_ncpu();
my $KEY_SIZE = $ENV{KEYSIZE}     || 12;
my $NUM_THDS = $ENV{NUM_THREADS} || 8;
my $NUM_MAPS = $ENV{NUM_MAPS}    || 256;

# Consume leading options; the first argument not starting with "-" ends
# option processing. Patterns are anchored so that only the documented
# spellings (one or two leading dashes) are accepted.
while (@ARGV && $ARGV[0] =~ /^--?/) {
    my $arg = shift;
    if    ($arg =~ /\A--?keysize=(\d+)\z/) { $KEY_SIZE = $1 }
    elsif ($arg =~ /\A--?threads=(\d+)\z/) { $NUM_THDS = $1 }
    elsif ($arg =~ /\A--?threads=max\z/)   { $NUM_THDS = $NUM_CPUS }
    elsif ($arg =~ /\A--?maps=(\d+)\z/)    { $NUM_MAPS = $1 }
    elsif ($arg =~ /\A--?maps=max\z/)      { $NUM_MAPS = 1024 }
    else                                   { usage() }
}

# Options alone are not enough: at least one input file is required.
@ARGV or usage();

# Every setting must be a positive integer. In particular a map count of 0
# would later be used as a modulus (hash % $NUM_MAPS), which is fatal.
for my $setting ($KEY_SIZE, $NUM_THDS, $NUM_MAPS) {
    usage() unless $setting =~ /\A\d+\z/ && $setting > 0;
}

# Clamp to the supported maxima.
$NUM_THDS = $NUM_CPUS if $NUM_THDS > $NUM_CPUS;
$NUM_MAPS = 1024      if $NUM_MAPS > 1024;
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Setup.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Announce the run parameters on standard error.
print {*STDERR} "IPC::MMA shared memory hash - start\n";
print {*STDERR}
    "fixed string length=${KEY_SIZE}, threads=${NUM_THDS}, maps=${NUM_MAPS}\n";

# Create $NUM_MAPS shared-memory segments, each holding one shared hash.
# @MM keeps the segment handles (used for locking), @HA the hash handles.
# Lock files live under $tmp_dir, which MCE::Signal provides.
# NOTE(review): a size of 0 presumably asks IPC::MMA for its default or
# maximum segment size - confirm against the IPC::MMA documentation.
our (@MM, @HA);
for my $idx (0 .. $NUM_MAPS - 1) {
    # Fail early with a clear message instead of passing a false handle
    # to later mm_* / mma_* calls.
    $MM[$idx] = mm_create(0, "$tmp_dir/$idx")
        or die "mm_create failed for map $idx ($tmp_dir/$idx)\n";
    $HA[$idx] = mm_make_hash($MM[$idx])
        or die "mm_make_hash failed for map $idx\n";
}
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get properties.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Manager-side bookkeeping: start time, number of files actually processed,
# and the total number of input lines reported by the workers.
my ($start1, $process_count, $num_lines) = (time, 0, 0);

# Worker-side state: lines seen so far and partial counts, bucketed as
# $TEMP{map index}{key} = count.
our ($NUM_LINES, %TEMP) = (0); # child vars

my $mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 65536,
    # Manager: accumulate the per-worker line counts.
    gather      => sub { $num_lines += $_[0] },
    posix_exit  => 1,
    use_slurpio => 1,
    # Worker: parse one chunk of "key<TAB>count" lines into %TEMP.
    user_func   => sub {
        my ($mce, $slurp_ref, $chunk_id) = @_;
        # The chunk arrives as a scalar reference; read it line by line
        # through an in-memory filehandle.
        open my $input_fh, '<', $slurp_ref
            or die "cannot open chunk $chunk_id as in-memory file: $!\n";
        # A lexical line variable keeps the caller's $_ untouched.
        while (my $line = <$input_fh>) {
            my ($key, $count) = split /\t/, $line;
            # The key's hash selects which shared map will own it.
            my $idx = xxhash64($key, 0) % $NUM_MAPS;
            $TEMP{$idx}{$key} += $count;
            $NUM_LINES++;
        }
        close $input_fh;
    },
    # Worker: merge the partial counts into the shared hashes, one map at
    # a time under that map's read-write lock, then report the line count
    # and reset the local state.
    user_end => sub {
        for my $idx (keys %TEMP) {
            mm_lock($MM[$idx], MM_LOCK_RW);
            while (my ($key, $count) = each %{ $TEMP{$idx} }) {
                my $val = mma_hash_fetch($HA[$idx], $key);
                mma_hash_store($HA[$idx], $key, $val + $count);
            }
            mm_unlock($MM[$idx]);
        }
        MCE->gather($NUM_LINES);
        $NUM_LINES = 0;
        %TEMP      = ();
    },
);
# Feed every usable input file to the workers. Directories, missing files
# and unreadable files are reported and skipped; empty files are skipped
# silently and do not count as processed.
for my $fname (@ARGV) {
    if (-d $fname) {
        warn "'$fname': Is a directory, skipping\n";
        next;
    }
    if (!-f $fname) {
        warn "'$fname': No such file, skipping\n";
        next;
    }
    if (!-r $fname) {
        warn "'$fname': Permission denied, skipping\n";
        next;
    }
    next unless -s $fname;
    ++$process_count;
    $mce->process($fname);
}

$mce->shutdown; # reap workers

printf {*STDERR} "get properties : %9.3f secs\n", time - $start1;

# Nothing was read, so there is nothing to pack, sort or print.
exit unless $process_count;
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Pack data for sorting.
# Each worker handles a unique hashmap.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Layout of one packed record: a pack('l') count followed by the key.
my $VAL_SIZE   = length pack 'l', 0;        # bytes taken by the count
my $STR_SIZE   = 1 + $KEY_SIZE;             # key plus terminating null
my $PACK_SIZE  = $VAL_SIZE + $STR_SIZE;     # bytes per record
my $FETCH_SIZE = 12000 * $PACK_SIZE;        # bytes per output chunk (12000 records)
# Worker task: serialize one shared hashmap into fixed-width packed records.
#   $mce      - MCE object used to send the result to the manager
#   $seq_id   - index into @HA of the hashmap handled by this call
#   $chunk_id - MCE chunk id (unused)
# Each record is pack("lZ${STR_SIZE}", -$val, $key): the negated count
# followed by the key in a null-terminated field of $STR_SIZE bytes.
# Gathers two values: the key count reported by mma_hash_scalar and the
# concatenated records. Nothing is gathered for an empty map.
sub pack_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my $map   = $HA[$seq_id];
    my $total = mma_hash_scalar($map);
    return unless $total > 0;

    my $template = "lZ${STR_SIZE}";
    my $records  = '';
    my $pos      = 0;
    while ($pos < $total) {
        my ($word, $tally) = mma_hash_get_entry($map, $pos++);
        # Stop at the first entry that comes back without a key.
        last unless length($word);
        $records .= pack($template, -$tally, $word);
    }
    $mce->gather($total, $records);
}
# Manager-side accumulators for the pack phase: start time, total number
# of keys reported by the workers, and the concatenated packed records.
my $start2 = time;
my $unique = 0;
my $data   = '';

# Spin up MCE workers to handle packing and output.
# The sub to run is named by the first element of user_args and is called
# by name, which is why strict refs is relaxed inside user_func.
$mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 1,
    init_relay  => 1,
    posix_exit  => 1,
    user_func   => sub {
        no strict 'refs';
        my $task_name = MCE->user_args->[0];
        $task_name->(@_);
    },
);

# Pack data for sorting.
# Manager callback: add up the key counts and append each worker's records.
my $collect_packed = sub {
    $unique += $_[0];
    $data   .= $_[1];
};
$mce->process({
    gather     => $collect_packed,
    chunk_size => 1,
    sequence   => [ 0, $NUM_MAPS - 1 ],
    user_args  => [ 'pack_task' ],
});

printf {*STDERR} "pack properties : %9.3f secs\n", time - $start2;
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Output data by value descending, word ascending.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Integer division that rounds an exact result down by one more step.
#   $dividend, $divisor - numbers to divide
# Returns int($dividend / $divisor) when there is a remainder, and that
# quotient minus one when the division is exact. For a positive length and
# chunk size this is the zero-based index of the last chunk.
sub divide_down {
    my ($dividend, $divisor) = @_;
    my $has_remainder = $dividend % $divisor;
    my $quotient      = int($dividend / $divisor);
    return $has_remainder ? $quotient : $quotient - 1;
}
# Manager callback answering a worker's request for one chunk of $data.
#   $seq_id - zero-based chunk number; chunk N starts at byte N * $FETCH_SIZE
# Returns up to $FETCH_SIZE bytes of $data starting at that offset.
sub fetch_chunk {
    my $chunk_no = shift;
    my $offset   = $chunk_no * $FETCH_SIZE;
    return substr $data, $offset, $FETCH_SIZE;
}
# Worker task: decode one chunk of packed records and print it.
#   $mce      - MCE object, used to request the chunk from the manager
#   $seq_id   - zero-based chunk number passed on to fetch_chunk
#   $chunk_id - MCE chunk id (unused)
# Every $PACK_SIZE-byte record is unpacked with "lZ$STR_SIZE" and rendered
# as "key<TAB>count<NEWLINE>"; the stored value is negated before printing.
# The finished text is printed inside MCE::relay.
# NOTE(review): the relay presumably serializes the prints in chunk order -
# confirm against the MCE::Relay documentation.
sub disp_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my ($packed) = $mce->do('fetch_chunk', $seq_id);
    my $template = "lZ$STR_SIZE";
    my @lines;
    for (my $pos = 0; $pos < length($packed); $pos += $PACK_SIZE) {
        my ($neg_count, $word) =
            unpack $template, substr($packed, $pos, $PACK_SIZE);
        push @lines, $word . "\t" . -$neg_count . "\n";
    }
    my $output = join '', @lines;
    MCE::relay { print $output; };
}
# Sort the packed records in place, then let the workers print them chunk
# by chunk. Skipped entirely when no records were packed.
if (length $data) {
    my $start3 = time;
    # Records are sorted as $PACK_SIZE unsigned bytes each.
    # NOTE(review): pack_task stores the count with native-endian 'l';
    # confirm that a byte-wise comparison yields the intended numeric
    # order on little-endian hosts (a big-endian 'l>' may be required in
    # both pack_task and disp_task).
    sort_packed "C$PACK_SIZE", $data;
    printf {*STDERR} "sort packed data : %9.3f secs\n", time - $start3;

    my $start4 = time;
    # One sequence number per $FETCH_SIZE-byte chunk of $data;
    # divide_down yields the index of the last chunk.
    $mce->process({
        user_args  => [ 'disp_task' ],
        sequence   => [ 0, divide_down(length($data), $FETCH_SIZE) ],
        chunk_size => 1,
    });
    printf {*STDERR} "write stdout : %9.3f secs\n", time - $start4;
}

$mce->shutdown; # reap workers

# Drop the handles to the shared-memory segments and hashes.
@MM = @HA = ();

# Final summary on standard error.
printf {*STDERR} "total time : %9.3f secs\n", time - $start1;
printf {*STDERR} " count lines : %lu\n", $num_lines;
printf {*STDERR} " count unique : %lu\n", $unique;