Beefy Boxes and Bandwidth Generously Provided by pair Networks
The stupid question is the question not asked
 
PerlMonks  

Re: Solving the Long List is Long challenge - DB_File

by marioroy (Prior)
on Jul 13, 2023 at 10:07 UTC ( [id://11153403]=note: print w/replies, xml ) Need Help??


in reply to Solving the Long List is Long challenge, finally?

Greetings,

After trying C++, I thought I would give Perl another try using DB_File. The demonstration creates 32 databases (by default) in a temp directory. Crypt::xxHash is used to determine which database a key is inserted into or updated in. Sorting is handled by Sort::Packed. Like with the SQLite variant, this is best run on a Unix OS.

Usage: KEYSIZE=N NUM_THREADS=N NUM_MAPS=N perl llildbf.pl file...
       perl llildbf.pl --keysize=N --threads=N --maps=N file...
       perl llildbf.pl --keysize=N --threads=max --maps=max file...

Running:

$ perl llildbf.pl big{1,2,3}.txt | cksum
DB_File B-tree database - start
fixed string length=12, threads=8, maps=32
get properties : 6.918 secs
pack properties : 0.915 secs
sort packed data : 0.948 secs
write stdout : 0.763 secs
total time : 9.548 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

$ perl llildbf.pl --threads=16 --maps=64 big{1,2,3}.txt | cksum
DB_File B-tree database - start
fixed string length=12, threads=16, maps=64
get properties : 3.418 secs
pack properties : 0.530 secs
sort packed data : 0.933 secs
write stdout : 0.390 secs
total time : 5.274 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

$ perl llildbf.pl --threads=24 --maps=64 big{1,2,3}.txt | cksum
DB_File B-tree database - start
fixed string length=12, threads=24, maps=64
get properties : 2.400 secs
pack properties : 0.438 secs
sort packed data : 0.935 secs
write stdout : 0.297 secs
total time : 4.076 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

$ perl llildbf.pl --threads=48 --maps=max big{1,2,3}.txt | cksum
DB_File B-tree database - start
fixed string length=12, threads=48, maps=128
get properties : 1.387 secs
pack properties : 0.318 secs
sort packed data : 0.934 secs
write stdout : 0.242 secs
total time : 2.891 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

llildbf.pl

# llildbf.pl
#
# Sums the counts of tab-separated "key<TAB>count" lines read from one or
# more input files and prints the totals to standard output ordered by
# count descending, then key ascending. Progress and timings go to STDERR.
#
# Keys are sharded across NUM_MAPS DB_File B-tree databases created under
# MCE's temp directory; the shard is chosen by xxhash64(key) % NUM_MAPS.
# MCE workers aggregate in memory and merge into the shards under a
# per-shard mutex. The shards are then packed into fixed-width records,
# sorted in place by Sort::Packed and written out in chunk order.

use strict;
use warnings;
no warnings 'uninitialized';

use Fcntl qw(O_CREAT O_RDWR O_RDONLY);
use DB_File;
use Crypt::xxHash qw(xxhash64);
use Sort::Packed qw(sort_packed);
use Time::HiRes qw(time);

use MCE::Signal qw($tmp_dir -use_dev_shm);
use MCE;

# Print the usage text and terminate the program.
sub usage {
    die "Usage: [KEYSIZE=N] [NUM_THREADS=N] [NUM_MAPS=N] perl $0 file...\n".
        "       perl $0 [--keysize=N] [--threads=N] [--maps=N] file...\n".
        "       perl $0 [--keysize=N] [--threads=max] [--maps=max] file...\n";
}

@ARGV or usage();

my $NUM_CPUS = MCE::Util->get_ncpu();
my $KEY_SIZE = $ENV{KEYSIZE} || 12;
my $NUM_THDS = $ENV{NUM_THREADS} || 8;
my $NUM_MAPS = $ENV{NUM_MAPS} || 32;

# Command-line options override the environment defaults.
while ($ARGV[0] =~ /^--?/) {
    my $arg = shift;
    $KEY_SIZE = $1, next if $arg =~ /-keysize=(\d+)$/;
    $NUM_THDS = $1, next if $arg =~ /-threads=(\d+)$/;
    $NUM_THDS = $NUM_CPUS, next if $arg =~ /-threads=max$/;
    $NUM_MAPS = $1, next if $arg =~ /-maps=(\d+)$/;
    $NUM_MAPS = 128, next if $arg =~ /-maps=max$/;
    usage();
}

# Options alone are not enough; at least one input file is required.
@ARGV or usage();

# Every setting must be a positive integer. A zero NUM_MAPS would
# otherwise reach the modulus in the worker, and a zero or non-numeric
# KEYSIZE would yield a meaningless pack template.
for my $setting ($KEY_SIZE, $NUM_THDS, $NUM_MAPS) {
    usage() unless $setting =~ /\A\d+\z/ && $setting > 0;
}

$NUM_THDS = $NUM_CPUS if $NUM_THDS > $NUM_CPUS;
$NUM_MAPS = 128 if ($NUM_MAPS > 128);

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Setup.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

print {*STDERR} "DB_File B-tree database - start\n";
print {*STDERR} "fixed string length=${KEY_SIZE}, threads=${NUM_THDS}, maps=${NUM_MAPS}\n";

# One mutex per database, indexed by shard number.
our @MM;

if ($^O =~ /cygwin|MSWin32/) {
    # On Cygwin, use Channel instead for better performance.
    $MM[$_] = MCE::Mutex->new(impl => "Channel") for (0 .. $NUM_MAPS - 1);
}
else {
    $MM[$_] = MCE::Mutex->new(impl => "Flock", path => "$tmp_dir/$_.sem")
        for (0 .. $NUM_MAPS - 1);
}

# Open DB function.
# Each child must open the DB file separately.
#
#   $idx   - shard number; selects "$tmp_dir/$idx.db"
#   $flags - Fcntl open flags (O_CREAT|O_RDWR, O_RDWR or O_RDONLY)
#
# Returns the DB_File object. Dies if the database cannot be opened, so
# that callers never invoke a method on an undefined handle.
sub open_db {
    my ($idx, $flags) = @_;
    my $path = "$tmp_dir/$idx.db";

    # $DB_BTREE is faster than $DB_HASH for this demonstration
    my $db = DB_File->TIEHASH($path, $flags, 0600, $DB_BTREE)
        or die "Cannot open '$path': $!\n";

    return $db;
}

# Create the databases.
{
    open_db($_, O_CREAT|O_RDWR) for (0 .. $NUM_MAPS - 1);
}

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get properties.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

my ($start1, $process_count, $num_lines) = (time(), 0, 0);

our ($NUM_LINES, %TEMP) = (0);  # child vars

my $mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 65536,
    gather      => sub { $num_lines += $_[0] },
    posix_exit  => 1,
    use_slurpio => 1,

    # Aggregate one slurped chunk into the worker-local %TEMP, keyed by
    # shard number and then by key.
    user_func => sub {
        my ($mce, $slurp_ref, $chunk_id) = @_;

        # Read the chunk line by line through an in-memory file handle.
        open my $input_fh, '<', $slurp_ref
            or die "Cannot open in-memory chunk: $!\n";

        while (<$input_fh>) {
            my ($key, $count) = split /\t/;
            my $idx = xxhash64($key, 0) % $NUM_MAPS;
            $TEMP{$idx}{$key} += $count;
            $NUM_LINES++;
        }

        close $input_fh;
    },

    # Merge the worker-local totals into the shared databases and report
    # the number of lines this worker handled.
    user_end => sub {
        for my $idx (keys %TEMP) {
            # Acquire the lock before opening the DB file. Must also sync.
            $MM[$idx]->lock_exclusive;
            my $db = open_db($idx, O_RDWR);

            while (my ($key, $count) = each %{ $TEMP{$idx} }) {
                my $val = $db->FETCH($key);
                $db->STORE($key, $val + $count);
            }

            $db->sync;
            $MM[$idx]->unlock;
        }

        MCE->gather($NUM_LINES);

        # Reset the worker state so a later run starts from zero.
        $NUM_LINES = 0;
        %TEMP = ();
    },
);

for my $fname (@ARGV) {
    warn("'$fname': Is a directory, skipping\n"), next if (-d $fname);
    warn("'$fname': No such file, skipping\n"), next unless (-f $fname);
    warn("'$fname': Permission denied, skipping\n"), next unless (-r $fname);
    ++$process_count, $mce->process($fname) if (-s $fname);
}

$mce->shutdown;  # reap workers

printf {*STDERR} "get properties : %9.3f secs\n", time() - $start1;

exit unless $process_count;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Pack data for sorting.
# Each worker handles a unique DB.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Records are sorted byte by byte ("C$PACK_SIZE"), so the count field has
# to be laid out such that byte order equals the wanted order: it is
# stored as the big-endian ('N') value of $MAX_U32 - count. Larger counts
# then compare lower and sort first. A native-endian field would missort
# counts above 256 on little-endian hosts.
my $MAX_U32    = 0xFFFFFFFF;
my $VAL_SIZE   = length pack('N', 0);
my $STR_SIZE   = $KEY_SIZE + 1;  # null-terminated
my $PACK_SIZE  = $STR_SIZE + $VAL_SIZE;
my $FETCH_SIZE = $PACK_SIZE * 12000;

# Worker task: pack every key/value pair of database $seq_id into
# fixed-width records and gather (row count, packed records) to the
# manager. Gathers nothing when the database is empty.
sub pack_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my $db = open_db($seq_id, O_RDONLY);
    my ($num_rows, $kv_pairs) = (0, '');

    # Iterate through the database using seq.
    return if $db->seq(my $key, my $val, R_FIRST);

    do {
        $num_rows += 1;
        $kv_pairs .= pack("NZ${STR_SIZE}", $MAX_U32 - $val, $key);
    } until ($db->seq($key, $val, R_NEXT));

    $mce->gather($num_rows, $kv_pairs);
}

my ($start2, $unique, $data) = (time(), 0, '');

# Tasks are requested by name through user_args; resolve the name with a
# lookup table rather than a symbolic reference.
my %TASK_FOR = (
    pack_task => \&pack_task,
    disp_task => \&disp_task,
);

# Spin up MCE workers to handle packing and output.
$mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 1,
    init_relay  => 1,
    posix_exit  => 1,
    user_func   => sub {
        my $task_name = MCE->user_args->[0];
        my $task = $TASK_FOR{$task_name}
            or die "Unknown task '$task_name'\n";
        $task->(@_);
    },
);

# Pack data for sorting.
$mce->process({
    user_args  => [ 'pack_task' ],
    sequence   => [ 0, $NUM_MAPS - 1 ],
    chunk_size => 1,
    gather     => sub {
        $unique += $_[0];
        $data   .= $_[1];
    },
});

printf {*STDERR} "pack properties : %9.3f secs\n", time() - $start2;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Output data by value descending, word ascending.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Get the next down value for integer division: the index of the last
# $divisor-sized chunk needed to cover $dividend items (0-based). An exact
# multiple needs one chunk fewer than the plain quotient suggests.
sub divide_down {
    my ($dividend, $divisor) = @_;
    return int($dividend / $divisor) if $dividend % $divisor;
    return int($dividend / $divisor) - 1;
}

# Return a chunk of $data: manager responding to worker request.
sub fetch_chunk {
    my ($seq_id) = @_;
    return substr($data, $seq_id * $FETCH_SIZE, $FETCH_SIZE);
}

# Worker task: unpack chunk and write directly to standard output.
sub disp_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my ($output, $chunk) = ('', $mce->do('fetch_chunk', $seq_id));

    # Consume the chunk one fixed-width record at a time.
    while (length $chunk) {
        my ($inverted, $key) = unpack(
            "NZ$STR_SIZE", substr($chunk, 0, $PACK_SIZE, '')
        );
        $output .= $key. "\t". ($MAX_U32 - $inverted). "\n";
    }

    # NOTE(review): the relay block presumably makes workers print in
    # chunk order so the sorted order is preserved - confirm in MCE docs.
    MCE::relay {
        print $output;
    };
}

if (length $data) {
    my $start3 = time();
    sort_packed "C$PACK_SIZE", $data;

    printf {*STDERR} "sort packed data : %9.3f secs\n", time() - $start3;

    my $start4 = time();
    $mce->process({
        user_args  => [ 'disp_task' ],
        sequence   => [ 0, divide_down(length($data), $FETCH_SIZE) ],
        chunk_size => 1,
    });

    printf {*STDERR} "write stdout : %9.3f secs\n", time() - $start4;
}

$mce->shutdown;  # reap workers
@MM = ();

printf {*STDERR} "total time : %9.3f secs\n", time() - $start1;
printf {*STDERR} " count lines : %lu\n", $num_lines;
printf {*STDERR} " count unique : %lu\n", $unique;

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://11153403]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others rifling through the Monastery: (5)
As of 2025-02-14 15:29 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found