use strict;
use warnings;
no warnings 'uninitialized';
use Carp ();
use DBI;
use DBD::SQLite::Constants qw(SQLITE_OPEN_NOMUTEX SQLITE_OPEN_READONLY);
use Tie::Hash::DBD;
# Do not clone the DB handle, passed to Tie::Hash::DBD->TIEHASH.
# The PRAGMA statements are not copied/preserved when cloned.
{ no warnings 'redefine'; sub DBI::db::clone { return $_[0]; } }
use Crypt::xxHash qw(xxhash64);
use Sort::Packed qw(sort_packed);
use Time::HiRes qw(time);
use MCE::Signal qw($tmp_dir -use_dev_shm);
use MCE;
# Print the command-line synopsis and abort.
# Takes no arguments and never returns normally: the three-line synopsis is
# raised through die(), so an uncaught call terminates the script.
# Each line ends in "\n", which keeps die() from appending file/line info.
sub usage {
    die "Usage: [KEYSIZE=N] [NUM_THREADS=N] [NUM_MAPS=N] perl $0 file...\n".
        " perl $0 [--keysize=N] [--threads=N] [--maps=N] file...\n".
        " perl $0 [--keysize=N] [--threads=max] [--maps=max] file...\n";
}
# Resolve the run-time settings.
# Defaults come from the environment (KEYSIZE, NUM_THREADS, NUM_MAPS) and
# may be overridden by leading -name=value / --name=value arguments.
# Whatever remains in @ARGV afterwards is the list of input files.
@ARGV or usage();
my $NUM_CPUS = MCE::Util->get_ncpu();
my $KEY_SIZE = $ENV{KEYSIZE}     || 12;
my $NUM_THDS = $ENV{NUM_THREADS} || 8;
my $NUM_MAPS = $ENV{NUM_MAPS}    || 32;
while (@ARGV && $ARGV[0] =~ /\A--?/) {
    my $arg = shift @ARGV;
    last if $arg eq '--';  # explicit end of options
    # Patterns are anchored on both ends so that only the documented
    # spellings are accepted.
    if    ($arg =~ /\A--?keysize=(\d+)\z/) { $KEY_SIZE = $1 }
    elsif ($arg =~ /\A--?threads=(\d+)\z/) { $NUM_THDS = $1 }
    elsif ($arg =~ /\A--?threads=max\z/)   { $NUM_THDS = $NUM_CPUS }
    elsif ($arg =~ /\A--?maps=(\d+)\z/)    { $NUM_MAPS = $1 }
    elsif ($arg =~ /\A--?maps=max\z/)      { $NUM_MAPS = 128 }
    else                                   { usage() }
}
# The options may have consumed every argument; a file is still required.
@ARGV or usage();
# Every setting must be a positive integer. A zero map count would later be
# used as a modulus, and non-numeric environment values are rejected here
# instead of being silently numified.
for my $setting ($KEY_SIZE, $NUM_THDS, $NUM_MAPS) {
    usage() unless $setting =~ /\A\d+\z/ && $setting > 0;
}
# Clamp to the number of CPUs and to the upper limit of 128 maps.
$NUM_THDS = $NUM_CPUS if $NUM_THDS > $NUM_CPUS;
$NUM_MAPS = 128 if $NUM_MAPS > 128;
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Setup.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Report the effective configuration on STDERR.
print {*STDERR} "Tie::Hash::DBD SQLite database - start\n";
print {*STDERR} "fixed string length=${KEY_SIZE}, threads=${NUM_THDS}, maps=${NUM_MAPS}\n";

# One mutex per database, stored under the same index (0 .. NUM_MAPS - 1).
# Workers take $MM[$idx] before writing to database $idx.
our @MM;
if ($^O =~ /cygwin|MSWin32/) {
    # On Cygwin and MSWin32, use Channel instead for better performance.
    for my $idx (0 .. $NUM_MAPS - 1) {
        $MM[$idx] = MCE::Mutex->new(impl => "Channel");
    }
} else {
    # Elsewhere use a file lock on "<tmp_dir>/<idx>.sem".
    for my $idx (0 .. $NUM_MAPS - 1) {
        $MM[$idx] = MCE::Mutex->new(impl => "Flock", path => "$tmp_dir/$idx.sem");
    }
}
# Connect to the SQLite file of one map and wrap it in a Tie::Hash::DBD object.
# Each child must open the DB file separately, see
# https://metacpan.org/pod/DBD::SQLite#DBD::SQLite-and-fork()
#
#   $idx   - database number; selects the file "$tmp_dir/$idx.db"
#   $flags - optional value for sqlite_open_flags; 0 is used when undefined
#
# Returns the Tie::Hash::DBD object. Its {ins} statement handle is replaced
# by an UPSERT that adds the supplied value to f_val when f_key already exists.
sub open_db {
    my ($idx, $flags) = @_;

    my %connect_attr = (
        sqlite_open_flags => (defined($flags) ? $flags : 0),
        PrintError        => 0,
        RaiseError        => 1,
        PrintWarn         => 0,
        FetchHashKeyName  => "NAME_lc",
    );
    my $dbh = DBI->connect(
        "dbi:SQLite:dbname=$tmp_dir/$idx.db", undef, undef, \%connect_attr
    ) or Carp::croak (DBI->errstr);

    # Apply PRAGMA statements before constructing Tie::Hash::DBD.
    for my $pragma (
        "PRAGMA journal_mode = OFF",
        "PRAGMA synchronous = OFF",
        "PRAGMA temp_store = MEMORY",
    ) {
        $dbh->do($pragma);
    }

    my %tie_opts = (
        tbl => 'kv_store',
        key => 'f_key',
        fld => 'f_val',
        trh => 0,
        str => undef,
    );
    my $db = Tie::Hash::DBD->TIEHASH($dbh, \%tie_opts);

    # Change the insert statement to the special UPSERT syntax.
    # This form is supported since SQLite version 3.24.0 (2018-06-04).
    # Refer to https://sqlite.org/lang_UPSERT.html
    $db->{ins} = $dbh->prepare("
INSERT INTO kv_store (f_key, f_val) VALUES (:1, :2)
ON CONFLICT(f_key) DO UPDATE SET
f_val = kv_store.f_val + :2
");

    return $db;
}
# Create the databases: open every map index once in the parent.
# The object returned by open_db() is not kept.
for my $idx (0 .. $NUM_MAPS - 1) {
    open_db($idx);
}
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get properties.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Phase 1: read the input files in parallel chunks and accumulate the
# per-key totals into the SQLite databases.
#
#   $start1        - start time of the whole run (also used for "total time")
#   $process_count - number of input files actually handed to MCE
#   $num_lines     - total input lines, summed from the workers via gather
my ($start1, $process_count, $num_lines) = (time, 0, 0);
our ($NUM_LINES, %TEMP) = (0);  # child vars
my $mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 65536,
    # Manager side: add up the line counts reported by the workers.
    gather      => sub { $num_lines += $_[0] },
    posix_exit  => 1,
    use_slurpio => 1,
    # Worker side, per chunk: split each "key<TAB>count" line and add the
    # count to the worker-local total of the map chosen by the key's hash.
    user_func   => sub {
        my ($mce, $slurp_ref, $chunk_id) = @_;
        open my $input_fh, '<', $slurp_ref
            or die "cannot open chunk $chunk_id for reading: $!\n";
        # A named line variable is used so the caller's $_ is left alone.
        while (my $line = <$input_fh>) {
            my ($key, $count) = split /\t/, $line;
            my $idx = xxhash64($key, 0) % $NUM_MAPS;
            $TEMP{$idx}{$key} += $count;
            $NUM_LINES++;
        }
        close $input_fh;
    },
    # Worker side, after the last chunk: flush the worker-local totals into
    # the databases, one map at a time under that map's mutex.
    user_end    => sub {
        for my $idx (keys %TEMP) {
            $MM[$idx]->lock_exclusive;
            my $db = open_db($idx, SQLITE_OPEN_NOMUTEX);
            $db->{dbh}->begin_work;
            # Slower alternative without UPSERT, kept for reference:
            # while (my ($key, $count) = each %{ $TEMP{$idx} }) {
            #     my $val = $db->FETCH($key);
            #     defined($val)
            #         ? ( $db->{upd}->execute($val + $count, $key) )
            #         : ( $db->{ins}->execute($key, $count) );
            # }
            # Faster, with UPSERT statement.
            while (my ($key, $count) = each %{ $TEMP{$idx} }) {
                $db->{ins}->execute($key, $count);
            }
            $db->{dbh}->commit;
            $MM[$idx]->unlock;
            undef $db->{dbh};
        }
        MCE->gather($NUM_LINES);
        # Reset the worker-local state.
        $NUM_LINES = 0;
        %TEMP = ();
    },
);
# Feed every usable input file to the workers; unusable ones are reported
# and skipped, and empty files are skipped silently.
for my $fname (@ARGV) {
    if (-d $fname) {
        warn("'$fname': Is a directory, skipping\n");
        next;
    }
    unless (-f $fname) {
        warn("'$fname': No such file, skipping\n");
        next;
    }
    unless (-r $fname) {
        warn("'$fname': Permission denied, skipping\n");
        next;
    }
    if (-s $fname) {
        ++$process_count;
        $mce->process($fname);
    }
}
$mce->shutdown;  # reap workers
printf {*STDERR} "get properties : %9.3f secs\n", time - $start1;
# Nothing was processed, so there is nothing to pack, sort or print.
exit unless $process_count;
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Pack data for sorting.
# Each worker handles a unique DB.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Fixed-width record layout shared by the packing and the output stage.
#
# A record is the negated count as a 32-bit signed integer followed by the
# key as a NUL-padded string of $STR_SIZE bytes. The integer is stored
# big-endian ("l>") on purpose: the records are later sorted as plain byte
# strings, and only with the most significant byte first does byte order
# agree with numeric order. A native-endian "l" puts the least significant
# byte first on little-endian hosts, which misorders counts (2 before 257).
# pack_task and disp_task both use $REC_FORMAT so they cannot drift apart.
my $VAL_SIZE   = length pack('l>', 0);
my $STR_SIZE   = $KEY_SIZE + 1;  # null-terminated
my $PACK_SIZE  = $STR_SIZE + $VAL_SIZE;
my $FETCH_SIZE = $PACK_SIZE * 12000;  # bytes handed to a worker per request
my $REC_FORMAT = "l>Z${STR_SIZE}";

# Worker task: pack every row of one database into fixed-width records.
#
#   $mce      - the MCE instance, used to gather the result
#   $seq_id   - sequence number; doubles as the database index
#   $chunk_id - unused
#
# Gathers ($num_rows, $kv_pairs) to the manager when $db->SCALAR is positive;
# gathers nothing otherwise.
sub pack_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my $db = open_db($seq_id, SQLITE_OPEN_NOMUTEX | SQLITE_OPEN_READONLY);
    my $kv_pairs = '';
    if ((my $num_rows = $db->SCALAR) > 0) {
        my $dbh = $db->{dbh};
        my $sql = "select f_key, f_val from kv_store";
        my $sth = $dbh->prepare($sql);
        $sth->execute;
        # Fetch in batches of up to 4000 rows.
        while (my $aref = $sth->fetchall_arrayref(undef, 4000)) {
            for my $row (@{ $aref }) {
                # The count is negated so an ascending byte sort yields
                # descending counts.
                $kv_pairs .= pack($REC_FORMAT, -$row->[1], $row->[0]);
            }
        }
        $mce->gather($num_rows, $kv_pairs);
    }
}

#   $start2 - start time of the packing phase
#   $unique - number of rows reported by the pack workers
#   $data   - all packed records, concatenated
my ($start2, $unique, $data) = (time, 0, '');

# Spin up MCE workers to handle packing and output.
# The task to run is passed by name in user_args and called symbolically.
$mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 1,
    init_relay  => 1,
    posix_exit  => 1,
    user_func   => sub {
        my $task = MCE->user_args->[0];
        no strict 'refs';
        $task->(@_);
    },
);

# Pack data for sorting: one sequence number per database.
$mce->process({
    user_args  => [ 'pack_task' ],
    sequence   => [ 0, $NUM_MAPS - 1 ],
    chunk_size => 1,
    gather     => sub {
        $unique += $_[0];
        $data   .= $_[1];
    },
});
printf {*STDERR} "pack properties : %9.3f secs\n", time - $start2;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Output data by value descending, word ascending.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Get the next down value for integer division.
# Returns int($dividend / $divisor) when there is a remainder and one less
# when the division is exact. For a positive data length and chunk size
# this is the zero-based index of the last chunk.
sub divide_down {
    my ($dividend, $divisor) = @_;
    return int($dividend / $divisor) if $dividend % $divisor;
    return int($dividend / $divisor) - 1;
}

# Return a chunk of $data: manager responding to worker request.
#   $seq_id - zero-based chunk number; the chunk starts at
#             $seq_id * $FETCH_SIZE and is at most $FETCH_SIZE bytes long
sub fetch_chunk {
    my ($seq_id) = @_;
    return substr($data, $seq_id * $FETCH_SIZE, $FETCH_SIZE);
}

# Worker task: unpack chunk and write directly to standard output.
#
#   $mce      - the MCE instance, used to request the chunk from the manager
#   $seq_id   - zero-based chunk number to display
#   $chunk_id - unused
sub disp_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my ($output, $chunk) = ('', $mce->do('fetch_chunk', $seq_id));
    # Consume the chunk one record at a time; 4-arg substr removes the
    # record from the front of $chunk while returning it.
    while (length $chunk) {
        my ($val, $key) = unpack(
            $REC_FORMAT,
            substr($chunk, 0, $PACK_SIZE, '')
        );
        # Undo the negation applied when packing.
        $output .= $key. "\t". -($val). "\n";
    }
    # Printing happens inside MCE::relay; presumably this serialises the
    # workers' output in chunk order -- confirm against MCE::Relay.
    MCE::relay { print $output; };
}
# Sort the packed records, print them through the workers, then report the
# timings and totals on STDERR.
if (length $data) {
    my $start3 = time;
    # Sort in place, treating $data as records of $PACK_SIZE unsigned chars.
    sort_packed "C$PACK_SIZE", $data;
    printf {*STDERR} "sort packed data : %9.3f secs\n", time - $start3;

    my $start4 = time;
    # One sequence number per $FETCH_SIZE-byte chunk of the sorted data.
    $mce->process({
        user_args  => [ 'disp_task' ],
        sequence   => [ 0, divide_down(length($data), $FETCH_SIZE) ],
        chunk_size => 1,
    });
    printf {*STDERR} "write stdout : %9.3f secs\n", time - $start4;
}
$mce->shutdown;  # reap workers
@MM = ();        # drop the per-database mutex objects
printf {*STDERR} "total time : %9.3f secs\n", time - $start1;
printf {*STDERR} " count lines : %lu\n", $num_lines;
printf {*STDERR} " count unique : %lu\n", $unique;