Hi, I'm adding another to the mix.
##
# This demonstration requires Tokyo Cabinet.
# homepage http://fallabs.com/tokyocabinet/
# documentation http://fallabs.com/tokyocabinet/perldoc/
#
# Installation:
# wget http://fallabs.com/tokyocabinet/tokyocabinet-1.4.48.tar.gz
# wget http://fallabs.com/tokyocabinet/perlpkg/tokyocabinet-perl-1.34.tar.gz
#
# tar xzf tokyocabinet-1.4.48.tar.gz && cd tokyocabinet-1.4.48
# ./configure --disable-bzip # enabling requires bzip dev pkg
# make
# make install # Note: you may need to use "sudo"
# cd ..
#
# tar xzf tokyocabinet-perl-1.34.tar.gz && cd tokyocabinet-perl-1.34
# perl Makefile.PL
# make
# make install # Note: you may need to use "sudo"
# cd ..
##
use strict;
use warnings;
no warnings 'uninitialized';
use TokyoCabinet;
use Crypt::xxHash qw(xxhash64);
use Sort::Packed qw(sort_packed);
use Time::HiRes qw(time);
use MCE::Signal qw($tmp_dir -use_dev_shm);
use MCE;
# Print the command-line synopsis and terminate.
# Takes no arguments and never returns: the synopsis is raised with die(),
# so it reaches STDERR when nobody catches it.
# Settings may come from the environment or from --name=value switches.
sub usage {
    die "Usage: [KEYSIZE=N] [NUM_THREADS=N] [NUM_MAPS=N] perl $0 file...\n"
      . " perl $0 [--keysize=N] [--threads=N] [--maps=N] file...\n"
      . " perl $0 [--keysize=N] [--threads=max] [--maps=max] file...\n";
}
# Nothing on the command line at all: show the synopsis.
@ARGV or usage();

# Defaults come from the environment; command-line switches override them.
my $NUM_CPUS = MCE::Util->get_ncpu();
my $KEY_SIZE = $ENV{KEYSIZE}     || 12;
my $NUM_THDS = $ENV{NUM_THREADS} || 8;
my $NUM_MAPS = $ENV{NUM_MAPS}    || 32;

# Consume the leading switches (one or two dashes are accepted).
# The @ARGV guard stops the loop once every argument has been shifted off.
# An unrecognised switch ends in usage().
while (@ARGV && $ARGV[0] =~ /^--?/) {
    my $arg = shift;
    $KEY_SIZE = $1,        next if $arg =~ /^--?keysize=(\d+)$/;
    $NUM_THDS = $1,        next if $arg =~ /^--?threads=(\d+)$/;
    $NUM_THDS = $NUM_CPUS, next if $arg =~ /^--?threads=max$/;
    $NUM_MAPS = $1,        next if $arg =~ /^--?maps=(\d+)$/;
    $NUM_MAPS = 128,       next if $arg =~ /^--?maps=max$/;
    usage();
}

# Switches alone are not enough: at least one input file must remain.
@ARGV or usage();

# Keep both settings within 1 .. upper limit.  A map count of zero would be
# used as a modulus when keys are assigned to maps, and a thread count of
# zero would leave no workers.
$NUM_THDS = $NUM_CPUS if $NUM_THDS > $NUM_CPUS;
$NUM_THDS = 1         if $NUM_THDS < 1;
$NUM_MAPS = 128       if $NUM_MAPS > 128;
$NUM_MAPS = 1         if $NUM_MAPS < 1;
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~
# Setup.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+~~
# Announce the run configuration on STDERR.
print {*STDERR} "Tokyo Cabinet hash database - start\n";
print {*STDERR} "fixed string length=${KEY_SIZE}, threads=${NUM_THDS}, maps=${NUM_MAPS}\n";

# Slots for one mutex per map.  Nothing in this script fills the array while
# the MCE::Mutex setup stays commented out.
our @MM;
# Let's have Tokyo Cabinet handle locking because we're not doing FETCH/STORE.
# Instead, we're calling "addint" to increment the value (single call).
# if ($^O =~ /cygwin|MSWin32/) {
# # On Cygwin, use Channel instead for better performance.
# $MM[$_] = MCE::Mutex->new(impl => "Channel")
# for (0 .. $NUM_MAPS - 1);
# } else {
# $MM[$_] = MCE::Mutex->new(impl => "Flock", path => "$tmp_dir/$_.sem")
# for (0 .. $NUM_MAPS - 1);
# }
# Open one Tokyo Cabinet hash database and return its handle.
#   $idx   - database number; the file used is "$tmp_dir/$idx.tch"
#   $omode - open mode, a bitwise OR of TokyoCabinet::HDB::O* flags
# Returns the opened TokyoCabinet::HDB object.
# Dies with the Tokyo Cabinet error text when the file cannot be opened.
# Each child must open the DB file separately.
sub open_db {
    my ($idx, $omode) = @_;

    # hash (*.tch db) is faster than tree (*.tcb db) for this demonstration
    my $hdb  = TokyoCabinet::HDB->new();
    my $path = "$tmp_dir/$idx.tch";

    $hdb->tune(500000); # tune bnum before opening the db

    # Failures are reported through the handle's own error code; $! is not
    # set by the library call and would print an unrelated message.
    $hdb->open($path, $omode)
        or die "Open error '$path': " . $hdb->errmsg($hdb->ecode());

    return $hdb;
}
# Create the databases.
# One file per map is created up front and closed again straight away; the
# workers open the files themselves when they need them.
for my $idx (0 .. $NUM_MAPS - 1) {
    my $db = open_db($idx, TokyoCabinet::HDB::OWRITER | TokyoCabinet::HDB::OCREAT);
    $db->close
        or die "Close error for map $idx: " . $db->errmsg($db->ecode());
}
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get properties.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Manager-side bookkeeping: start time of the run, number of files handed to
# MCE, and the number of input lines reported back by the workers.
my ($start1, $process_count, $num_lines) = (time, 0, 0);

# Worker-side accumulators: lines seen so far, and the per-map tallies in the
# form $TEMP{map index}{key} = summed count.
our ($NUM_LINES, %TEMP) = (0); # child vars

# MCE instance for the counting phase.
#   user_func - tallies one slurped chunk of input into %TEMP
#   user_end  - adds the tallies to the databases, then reports the line count
#   gather    - manager side: accumulates the reported line counts
my $mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 65536,
    gather      => sub { $num_lines += $_[0] },
    posix_exit  => 1,
    use_slurpio => 1,
    user_func   => sub {
        my ($mce, $slurp_ref, $chunk_id) = @_;

        # The chunk arrives as a scalar reference; read it line by line
        # through an in-memory filehandle.
        open my $input_fh, '<', $slurp_ref
            or die "Cannot read chunk $chunk_id: $!";
        while (<$input_fh>) {
            # Each line is "key<TAB>count"; the key's hash picks the map.
            my ($key, $count) = split /\t/;
            my $idx = xxhash64($key, 0) % $NUM_MAPS;
            $TEMP{$idx}{$key} += $count;
            $NUM_LINES++;
        }
        close $input_fh;
    },
    user_end => sub {
        my $omode = TokyoCabinet::HDB::OREADER | TokyoCabinet::HDB::OWRITER;
        for my $idx (keys %TEMP) {
            # Alternative kept for reference: explicit locking with get/put.
            # Acquire the lock before opening the DB file. Must also close.
            # $MM[$idx]->lock_exclusive;
            # my $db = open_db($idx, $omode);
            # while (my ($key, $count) = each %{ $TEMP{$idx} }) {
            #     my $val = $db->get($key);
            #     $db->put($key, $val + $count);
            # }
            # $db->close;
            # $MM[$idx]->unlock;
            my $db = open_db($idx, $omode);
            while (my ($key, $count) = each %{ $TEMP{$idx} }) {
                # addint returns undef on failure; do not lose counts silently.
                defined($db->addint($key, $count))
                    or die "addint error in map $idx: " . $db->errmsg($db->ecode());
            }
            $db->close;
        }
        MCE->gather($NUM_LINES);

        # Reset the accumulators for the next run.
        $NUM_LINES = 0;
        %TEMP = ();
    },
);
# Hand every usable input file to the workers.  Directories, missing or
# non-regular files and unreadable files are skipped with a warning; empty
# files are skipped silently.
for my $fname (@ARGV) {
    if (-d $fname) {
        warn("'$fname': Is a directory, skipping\n");
    }
    elsif (!-f $fname) {
        warn("'$fname': No such file, skipping\n");
    }
    elsif (!-r $fname) {
        warn("'$fname': Permission denied, skipping\n");
    }
    elsif (-s $fname) {
        ++$process_count;
        $mce->process($fname);
    }
}

$mce->shutdown; # reap workers
printf {*STDERR} "get properties : %9.3f secs\n", time - $start1;

# Nothing was processed, so there is nothing to pack, sort or print.
exit unless $process_count;
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Pack data for sorting.
# Each worker handles a unique DB.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Layout of one packed record: a 4-byte count field followed by the key as a
# null-padded string of $STR_SIZE bytes.
#
# The records are sorted as plain byte strings (sort_packed "C$PACK_SIZE"
# below), so the count field must compare correctly byte by byte.  A native
# 'l' field does not do that on little-endian hosts.  The count is therefore
# stored big-endian ('N') as $VAL_BIAS - count: larger counts give smaller
# field values and sort first, for the whole signed 32-bit range.
my $VAL_BIAS = 0x7FFFFFFF;
my $VAL_SIZE = length pack('N', 0);
my $STR_SIZE = $KEY_SIZE + 1; # null-terminated
my $PACK_SIZE = $STR_SIZE + $VAL_SIZE;
my $FETCH_SIZE = $PACK_SIZE * 12000;

# Worker task: read every record of one database and gather it in packed form.
#   $mce      - the MCE instance running the task
#   $seq_id   - sequence number, used as the database index
#   $chunk_id - chunk number (unused)
# Gathers two values: the number of records read and the packed records.
sub pack_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my $db = open_db($seq_id, TokyoCabinet::HDB::OREADER);
    my ($num_rows, $kv_pairs) = (0, '');

    # Calling addint above? "Because records are stored in binary format,
    # they should be processed with the 'unpack' function with the 'i'
    # operator after retrieval."
    if ($db->iterinit) {
        while (defined(my $key = $db->iternext)) {
            my $val = unpack('i', $db->get($key));
            $num_rows += 1;
            $kv_pairs .= pack("NZ${STR_SIZE}", $VAL_BIAS - $val, $key);
        }
    }
    $db->close;

    $mce->gather($num_rows, $kv_pairs);
}

my ($start2, $unique, $data) = (time, 0, '');

# Spin up MCE workers to handle packing and output.
# The task to run is named in user_args and called through a symbolic
# reference, hence the relaxed 'refs' stricture.
$mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 1,
    init_relay  => 1,
    posix_exit  => 1,
    user_func   => sub {
        my $task = MCE->user_args->[0];
        no strict 'refs';
        $task->(@_);
    },
);

# Pack data for sorting: one sequence number per database.
$mce->process({
    user_args  => [ 'pack_task' ],
    sequence   => [ 0, $NUM_MAPS - 1 ],
    chunk_size => 1,
    gather     => sub {
        $unique += $_[0];
        $data   .= $_[1];
    },
});
printf {*STDERR} "pack properties : %9.3f secs\n", time - $start2;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Output data by value descending, word ascending.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Get the next down value for integer division: the zero-based index of the
# last chunk when $dividend bytes are cut into chunks of $divisor bytes.
sub divide_down {
    my ($dividend, $divisor) = @_;
    return int($dividend / $divisor) if $dividend % $divisor;
    return int($dividend / $divisor) - 1;
}

# Return a chunk of $data: manager responding to worker request.
#   $seq_id - zero-based chunk number
sub fetch_chunk {
    my ($seq_id) = @_;
    return substr($data, $seq_id * $FETCH_SIZE, $FETCH_SIZE);
}

# Worker task: unpack chunk and write directly to standard output.
#   $mce      - the MCE instance running the task
#   $seq_id   - number of the chunk to request from the manager
#   $chunk_id - chunk number (unused)
sub disp_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my ($output, $chunk) = ('', $mce->do('fetch_chunk', $seq_id));

    # Peel one record at a time off the front of the chunk and undo the
    # bias applied in pack_task to recover the count.
    while (length $chunk) {
        my ($biased, $key) = unpack(
            "NZ$STR_SIZE",
            substr($chunk, 0, $PACK_SIZE, '')
        );
        $output .= $key . "\t" . ($VAL_BIAS - $biased) . "\n";
    }

    MCE::relay { print $output; };
}

if (length $data) {
    my $start3 = time;
    sort_packed "C$PACK_SIZE", $data;
    printf {*STDERR} "sort packed data : %9.3f secs\n", time - $start3;

    my $start4 = time;
    $mce->process({
        user_args  => [ 'disp_task' ],
        sequence   => [ 0, divide_down(length($data), $FETCH_SIZE) ],
        chunk_size => 1,
    });
    printf {*STDERR} "write stdout : %9.3f secs\n", time - $start4;
}
# Stop the workers, empty the mutex list, then print the run totals on STDERR.
$mce->shutdown; # reap workers
@MM = ();

printf {*STDERR} "total time : %9.3f secs\n", time - $start1;

# Each entry is a printf format followed by its single argument.
for my $stat (
    [ " count lines : %lu\n",  $num_lines ],
    [ " count unique : %lu\n", $unique    ],
) {
    printf {*STDERR} @{$stat};
}