Beefy Boxes and Bandwidth Generously Provided by pair Networks
Come for the quick hacks, stay for the epiphanies.
 
PerlMonks  

Re: Solving the Long List is Long challenge - Tokyo Cabinet

by marioroy (Prior)
on Jul 14, 2023 at 00:17 UTC ( [id://11153407]=note: print w/replies, xml ) Need Help??


in reply to Solving the Long List is Long challenge, finally?

Hi, I'm adding another to the mix.

I also tried Tokyo Cabinet with Perl. The demonstration creates 32 hash databases (by default) in a temp directory. Crypt::xxHash determines which database each key is inserted into or updated in. Sorting is handled by Sort::Packed. As with the DB_File variant, this is best run on a Unix OS.

Usage: KEYSIZE=N NUM_THREADS=N NUM_MAPS=N perl lliltch.pl file...
       perl lliltch.pl --keysize=N --threads=N --maps=N file...
       perl lliltch.pl --keysize=N --threads=max --maps=max file...

Running:

$ perl lliltch.pl big{1,2,3}.txt | cksum
Tokyo Cabinet hash database - start
fixed string length=12, threads=8, maps=32
get properties : 3.238 secs
pack properties : 1.624 secs
sort packed data : 0.963 secs
write stdout : 0.725 secs
total time : 6.554 secs
count lines : 10545600
count unique : 10367603
2956888413 93308427

$ perl lliltch.pl --threads=16 --maps=64 big{1,2,3}.txt | cksum
Tokyo Cabinet hash database - start
fixed string length=12, threads=16, maps=64
get properties : 1.814 secs
pack properties : 0.891 secs
sort packed data : 0.971 secs
write stdout : 0.384 secs
total time : 4.064 secs
count lines : 10545600
count unique : 10367603
2956888413 93308427

$ perl lliltch.pl --threads=24 --maps=64 big{1,2,3}.txt | cksum
Tokyo Cabinet hash database - start
fixed string length=12, threads=24, maps=64
get properties : 1.353 secs
pack properties : 0.695 secs
sort packed data : 0.963 secs
write stdout : 0.276 secs
total time : 3.296 secs
count lines : 10545600
count unique : 10367603
2956888413 93308427

$ perl lliltch.pl --threads=48 --maps=max big{1,2,3}.txt | cksum
Tokyo Cabinet hash database - start
fixed string length=12, threads=48, maps=128
get properties : 0.817 secs
pack properties : 0.450 secs
sort packed data : 0.961 secs
write stdout : 0.210 secs
total time : 2.446 secs
count lines : 10545600
count unique : 10367603
2956888413 93308427

lliltch.pl

##
# This demonstration requires Tokyo Cabinet.
#   homepage       http://fallabs.com/tokyocabinet/
#   documentation  http://fallabs.com/tokyocabinet/perldoc/
#
# Installation:
#   wget http://fallabs.com/tokyocabinet/tokyocabinet-1.4.48.tar.gz
#   wget http://fallabs.com/tokyocabinet/perlpkg/tokyocabinet-perl-1.34.tar.gz
#
#   tar xzf tokyocabinet-1.4.48.tar.gz && cd tokyocabinet-1.4.48
#   ./configure --disable-bzip    # enabling requires bzip dev pkg
#   make
#   make install                  # Note: you may need to use "sudo"
#   cd ..
#
#   tar xzf tokyocabinet-perl-1.34.tar.gz && cd tokyocabinet-perl-1.34
#   perl Makefile.PL
#   make
#   make install                  # Note: you may need to use "sudo"
#   cd ..
##

use strict;
use warnings;
no warnings 'uninitialized';

use TokyoCabinet;
use Crypt::xxHash qw(xxhash64);
use Sort::Packed qw(sort_packed);
use Time::HiRes qw(time);

use MCE::Signal qw($tmp_dir -use_dev_shm);
use MCE;

# Die with the command-line synopsis.
sub usage {
    die "Usage: [KEYSIZE=N] [NUM_THREADS=N] [NUM_MAPS=N] perl $0 file...\n".
        "       perl $0 [--keysize=N] [--threads=N] [--maps=N] file...\n".
        "       perl $0 [--keysize=N] [--threads=max] [--maps=max] file...\n";
}

@ARGV or usage();

my $NUM_CPUS = MCE::Util->get_ncpu();
my $KEY_SIZE = $ENV{KEYSIZE}     || 12;
my $NUM_THDS = $ENV{NUM_THREADS} || 8;
my $NUM_MAPS = $ENV{NUM_MAPS}    || 32;

# Leading arguments beginning with "-" are options (one or two dashes,
# matched against the whole argument). "--" ends option processing; any
# unrecognized option prints the usage.
while (@ARGV && $ARGV[0] =~ /^-/) {
    my $arg = shift @ARGV;
    last if $arg eq '--';

    if    ($arg =~ /^--?keysize=(\d+)\z/) { $KEY_SIZE = $1 }
    elsif ($arg =~ /^--?threads=(\d+)\z/) { $NUM_THDS = $1 }
    elsif ($arg =~ /^--?threads=max\z/)   { $NUM_THDS = $NUM_CPUS }
    elsif ($arg =~ /^--?maps=(\d+)\z/)    { $NUM_MAPS = $1 }
    elsif ($arg =~ /^--?maps=max\z/)      { $NUM_MAPS = 128 }
    else                                  { usage() }
}

# Options alone are not enough: at least one input file is required.
@ARGV or usage();

# Every setting must be a positive integer. A key size of 0 would truncate
# every key to an empty string, 0 threads is not a usable worker count,
# and 0 maps would make the "% $NUM_MAPS" bucket selection die (illegal
# modulus zero). Non-numeric environment values would numify to 0.
for my $setting ($KEY_SIZE, $NUM_THDS, $NUM_MAPS) {
    usage() unless $setting =~ /^\d+\z/ && $setting > 0;
}

$NUM_THDS = $NUM_CPUS if $NUM_THDS > $NUM_CPUS;
$NUM_MAPS = 128       if $NUM_MAPS > 128;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Setup.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

print {*STDERR} "Tokyo Cabinet hash database - start\n";
print {*STDERR} "fixed string length=${KEY_SIZE}, threads=${NUM_THDS}, maps=${NUM_MAPS}\n";

# Let Tokyo Cabinet handle locking because we're not doing FETCH/STORE.
# Instead, workers call "addint" to increment a value in a single call.
# A get/put variant would need an exclusive per-DB lock (e.g. MCE::Mutex,
# impl "Channel" on Cygwin/MSWin32, "Flock" elsewhere) held from opening
# the DB file until closing it.

# Open hash DB number $idx under $tmp_dir with open mode $omode and return
# the handle. Dies with Tokyo Cabinet's own error message on failure.
# Each child must open the DB file separately.
sub open_db {
    my ($idx, $omode) = @_;

    # hash (*.tch db) is faster than tree (*.tcb db) for this demonstration
    my $hdb  = TokyoCabinet::HDB->new();
    my $path = "$tmp_dir/$idx.tch";

    $hdb->tune(500000);    # tune bnum before opening the db

    # Tokyo Cabinet does not report through $!; ask the handle instead.
    $hdb->open($path, $omode)
        or die "Open error '$path': ", $hdb->errmsg($hdb->ecode);

    return $hdb;
}

# Close a handle returned by open_db for DB number $idx, dying with the
# Tokyo Cabinet error message if the close fails.
sub close_db {
    my ($hdb, $idx) = @_;
    $hdb->close
        or die "Close error '$tmp_dir/$idx.tch': ", $hdb->errmsg($hdb->ecode);
    return;
}

# Create the databases.
for my $idx (0 .. $NUM_MAPS - 1) {
    my $db = open_db($idx, TokyoCabinet::HDB::OWRITER | TokyoCabinet::HDB::OCREAT);
    close_db($db, $idx);
}

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get properties.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

my ($start1, $process_count, $num_lines) = (time, 0, 0);
our ($NUM_LINES, %TEMP) = (0);    # child vars

my $mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 65536,
    gather      => sub { $num_lines += $_[0] },
    posix_exit  => 1,
    use_slurpio => 1,

    # Tally each "key<TAB>count" line of the slurped chunk into the
    # worker-local %TEMP, bucketed by xxHash of the key modulo $NUM_MAPS.
    user_func => sub {
        my ($mce, $slurp_ref, $chunk_id) = @_;
        open my $input_fh, '<', $slurp_ref
            or die "Cannot open chunk $chunk_id in memory: $!";
        while (my $line = <$input_fh>) {
            my ($key, $count) = split /\t/, $line;
            my $idx = xxhash64($key, 0) % $NUM_MAPS;
            $TEMP{$idx}{$key} += $count;
            $NUM_LINES++;
        }
        close $input_fh;
    },

    # Add this worker's tallies into the DBs, opening each touched DB once,
    # then gather the worker's line count and reset the child vars.
    user_end => sub {
        my $omode = TokyoCabinet::HDB::OREADER | TokyoCabinet::HDB::OWRITER;
        for my $idx (keys %TEMP) {
            my $db = open_db($idx, $omode);
            while (my ($key, $count) = each %{ $TEMP{$idx} }) {
                defined $db->addint($key, $count)
                    or die "addint error '$key': ", $db->errmsg($db->ecode);
            }
            close_db($db, $idx);
        }
        MCE->gather($NUM_LINES);
        $NUM_LINES = 0;
        %TEMP      = ();
    },
);

for my $fname (@ARGV) {
    if (-d $fname) {
        warn "'$fname': Is a directory, skipping\n";
        next;
    }
    unless (-f $fname) {
        warn "'$fname': No such file, skipping\n";
        next;
    }
    unless (-r $fname) {
        warn "'$fname': Permission denied, skipping\n";
        next;
    }
    if (-s $fname) {
        ++$process_count;
        $mce->process($fname);
    }
}

$mce->shutdown;    # reap workers

printf {*STDERR} "get properties : %9.3f secs\n", time - $start1;

exit unless $process_count;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Pack data for sorting.
# Each worker handles a unique DB.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Record layout for sort_packed "C$PACK_SIZE", which orders whole records
# by their unsigned bytes. The first field is a 4-byte big-endian value of
# $VAL_BIAS - count. It is followed by the key, NUL-padded to $STR_SIZE
# bytes. Ascending byte order is therefore count descending, then key
# ascending, on any host byte order. A native 'l' of -count sorts wrongly
# on little-endian hosts, e.g. 256 ahead of 257, because the low byte is
# compared first.
my $VAL_BIAS   = 0x7FFFFFFF;    # maps any signed 32-bit count into 0 .. 2**32-1
my $VAL_SIZE   = length pack('N', 0);
my $STR_SIZE   = $KEY_SIZE + 1;    # null-terminated
my $PACK_SIZE  = $STR_SIZE + $VAL_SIZE;
my $FETCH_SIZE = $PACK_SIZE * 12000;

# Worker task: pack every record of DB $seq_id and gather the row count
# together with the packed records.
sub pack_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my $db = open_db($seq_id, TokyoCabinet::HDB::OREADER);
    my ($num_rows, $kv_pairs) = (0, '');

    # Calling addint above? "Because records are stored in binary format,
    # they should be processed with the 'unpack' function with the 'i'
    # operator after retrieval."
    if ($db->iterinit) {
        while (defined(my $key = $db->iternext)) {
            my $val = unpack('i', $db->get($key));
            $num_rows++;
            $kv_pairs .= pack("NZ${STR_SIZE}", $VAL_BIAS - $val, $key);
        }
    }
    close_db($db, $seq_id);

    $mce->gather($num_rows, $kv_pairs);
}

my ($start2, $unique, $data) = (time, 0, '');

# Tasks the workers below may run, keyed by the name passed in user_args.
my %TASK_FOR = (
    pack_task => \&pack_task,
    disp_task => \&disp_task,
);

# Spin up MCE workers to handle packing and output.
$mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 1,
    init_relay  => 1,
    posix_exit  => 1,
    user_func   => sub {
        my $name = MCE->user_args->[0];
        my $task = $TASK_FOR{$name} or die "Unknown task '$name'";
        $task->(@_);
    },
);

# Pack data for sorting.
$mce->process({
    user_args  => [ 'pack_task' ],
    sequence   => [ 0, $NUM_MAPS - 1 ],
    chunk_size => 1,
    gather     => sub {
        $unique += $_[0];
        $data   .= $_[1];
    },
});

printf {*STDERR} "pack properties : %9.3f secs\n", time - $start2;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Output data by value descending, word ascending.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Get the next down value for integer division: the last zero-based chunk
# index when $dividend bytes are split into $divisor-byte chunks.
sub divide_down {
    my ($dividend, $divisor) = @_;
    return int($dividend / $divisor) if $dividend % $divisor;
    return int($dividend / $divisor) - 1;
}

# Return a chunk of $data: manager responding to worker request.
sub fetch_chunk {
    my ($seq_id) = @_;
    return substr($data, $seq_id * $FETCH_SIZE, $FETCH_SIZE);
}

# Worker task: decode chunk $seq_id back into "key<TAB>count" lines and
# print them in sequence order via MCE::relay.
sub disp_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my $chunk  = $mce->do('fetch_chunk', $seq_id);
    my $output = '';

    while (length $chunk) {
        my ($biased, $key) = unpack(
            "NZ$STR_SIZE", substr($chunk, 0, $PACK_SIZE, '')
        );
        $output .= $key . "\t" . ($VAL_BIAS - $biased) . "\n";
    }

    MCE::relay { print $output; };
}

if (length $data) {
    my $start3 = time;
    sort_packed "C$PACK_SIZE", $data;
    printf {*STDERR} "sort packed data : %9.3f secs\n", time - $start3;

    my $start4 = time;
    $mce->process({
        user_args  => [ 'disp_task' ],
        sequence   => [ 0, divide_down(length($data), $FETCH_SIZE) ],
        chunk_size => 1,
    });
    printf {*STDERR} "write stdout : %9.3f secs\n", time - $start4;
}

$mce->shutdown;    # reap workers

printf {*STDERR} "total time : %9.3f secs\n", time - $start1;
printf {*STDERR} " count lines : %lu\n", $num_lines;
printf {*STDERR} " count unique : %lu\n", $unique;

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://11153407]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others having an uproarious good time at the Monastery: (1)
As of 2025-02-09 14:27 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    Which URL do you most often use to access this site?












    Results (96 votes). Check out past polls.