Beefy Boxes and Bandwidth Generously Provided by pair Networks
Pathologically Eclectic Rubbish Lister
 
PerlMonks  

Re: Solving the Long List is Long challenge - IPC::MMA

by marioroy (Prior)
on Jul 13, 2023 at 10:52 UTC ( [id://11153404]=note: print w/replies, xml ) Need Help??


in reply to Solving the Long List is Long challenge, finally?

Hi, again... :)

Something else I tried is IPC::MMA, to complement the other two demonstrations here (using Tie::Hash::DBD) and here (using DB_File). This variant creates 256 hashes (by default) in shared memory. Crypt::xxHash is used to decide which hash each key is inserted into or updated in. Sorting is handled by Sort::Packed.

The author states, "IPC::MMA 'hashes' do not hash keys. They maintain their entries in sorted order on their keys, but they are not BTrees." So, to keep up with SQLite and DB_File, I configured 8x the number of maps. Be sure the open-file limit (ulimit -n) is high enough for that many maps.

Usage: KEYSIZE=N NUM_THREADS=N NUM_MAPS=N perl llilmma.pl file...
       perl llilmma.pl --keysize=N --threads=N --maps=N file...
       perl llilmma.pl --keysize=N --threads=max --maps=max file...

Running:

$ perl llilmma.pl big{1,2,3}.txt | cksum
IPC::MMA shared memory hash - start
fixed string length=12, threads=8, maps=256
get properties : 6.677 secs
pack properties : 0.781 secs
sort packed data : 0.935 secs
write stdout : 0.734 secs
total time : 9.131 secs
  count lines : 10545600
  count unique : 10367603
2956888413 93308427

$ perl llilmma.pl --threads=16 --maps=64 big{1,2,3}.txt | cksum
IPC::MMA shared memory hash - start
fixed string length=12, threads=16, maps=512
get properties : 3.189 secs
pack properties : 0.462 secs
sort packed data : 0.941 secs
write stdout : 0.383 secs
total time : 4.983 secs
  count lines : 10545600
  count unique : 10367603
2956888413 93308427

$ perl llilmma.pl --threads=24 --maps=64 big{1,2,3}.txt | cksum
IPC::MMA shared memory hash - start
fixed string length=12, threads=24, maps=512
get properties : 2.388 secs
pack properties : 0.332 secs
sort packed data : 0.942 secs
write stdout : 0.294 secs
total time : 3.962 secs
  count lines : 10545600
  count unique : 10367603
2956888413 93308427

$ perl llilmma.pl --threads=48 --maps=max big{1,2,3}.txt | cksum
IPC::MMA shared memory hash - start
fixed string length=12, threads=48, maps=1024
get properties : 1.441 secs
pack properties : 0.409 secs
sort packed data : 0.935 secs
write stdout : 0.223 secs
total time : 3.024 secs
  count lines : 10545600
  count unique : 10367603
2956888413 93308427

llilmma.pl

# llilmma.pl - "Long List is Long" solution backed by IPC::MMA
# shared-memory hashes.
#
# Reads "key<TAB>count" lines from one or more files, sums the counts per
# key across all files, and writes "key<TAB>total" lines to STDOUT ordered
# by total descending, then key ascending. Timings and counters go to STDERR.
#
# Pipeline:
#   1. MCE workers parse slurped chunks into a per-worker %TEMP, partitioned
#      by xxhash64(key) % NUM_MAPS, then merge each partition into the
#      matching IPC::MMA hash while holding only that map's lock.
#   2. One worker per map packs its entries into fixed-width records.
#   3. The manager sorts the packed records in place with Sort::Packed.
#   4. Workers fetch and unpack chunks of the sorted data and print them
#      in order via MCE::relay.
#
# Keys longer than KEYSIZE characters are truncated in the output.

use strict;
use warnings;
no warnings 'uninitialized';

BEGIN {
    die "this script was not validated on this platform"
        unless ($^O =~ /linux/);
}

use IPC::MMA qw(:basic :hash);
use Crypt::xxHash qw(xxhash64);
use Sort::Packed qw(sort_packed);
use Time::HiRes qw(time);
use MCE::Signal qw($tmp_dir -use_dev_shm);
use MCE;

# Print the usage text and terminate (via die).
sub usage {
    die "Usage: [KEYSIZE=N] [NUM_THREADS=N] [NUM_MAPS=N] perl $0 file...\n".
        "       perl $0 [--keysize=N] [--threads=N] [--maps=N] file...\n".
        "       perl $0 [--keysize=N] [--threads=max] [--maps=max] file...\n";
}

@ARGV or usage();

my $NUM_CPUS = MCE::Util->get_ncpu();
my $KEY_SIZE = $ENV{KEYSIZE}     || 12;
my $NUM_THDS = $ENV{NUM_THREADS} || 8;
my $NUM_MAPS = $ENV{NUM_MAPS}    || 256;

# Command-line options override the environment; anchored so that only
# well-formed "-opt=value" / "--opt=value" arguments are accepted.
while (@ARGV && $ARGV[0] =~ /^--?/) {
    my $arg = shift @ARGV;
    if    ($arg =~ /^--?keysize=(\d+)$/) { $KEY_SIZE = $1 }
    elsif ($arg =~ /^--?threads=(\d+)$/) { $NUM_THDS = $1 }
    elsif ($arg =~ /^--?threads=max$/)   { $NUM_THDS = $NUM_CPUS }
    elsif ($arg =~ /^--?maps=(\d+)$/)    { $NUM_MAPS = $1 }
    elsif ($arg =~ /^--?maps=max$/)      { $NUM_MAPS = 1024 }
    else                                 { usage() }
}
@ARGV or usage();    # options were given but no input files

$NUM_THDS = $NUM_CPUS if $NUM_THDS > $NUM_CPUS;
$NUM_THDS = 1         if $NUM_THDS < 1;
$NUM_MAPS = 1024      if $NUM_MAPS > 1024;
$NUM_MAPS = 1         if $NUM_MAPS < 1;    # NUM_MAPS is a modulus divisor
$KEY_SIZE = 1         if $KEY_SIZE < 1;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Setup.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

print {*STDERR} "IPC::MMA shared memory hash - start\n";
print {*STDERR} "fixed string length=${KEY_SIZE}, threads=${NUM_THDS}, maps=${NUM_MAPS}\n";

# One shared-memory segment plus one IPC::MMA hash per map. Each segment
# gets its own path under $tmp_dir, which is why a generous `ulimit -n`
# is needed for large NUM_MAPS.
our (@MM, @HA);
for my $idx (0 .. $NUM_MAPS - 1) {
    $MM[$idx] = mm_create(0, "$tmp_dir/$idx")
        or die "mm_create failed for map $idx (check ulimit -n)\n";
    $HA[$idx] = mm_make_hash($MM[$idx])
        or die "mm_make_hash failed for map $idx\n";
}

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get properties.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

my ($start1, $process_count, $num_lines) = (time, 0, 0);
our ($NUM_LINES, %TEMP) = (0);    # per-worker (child) state

my $mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 65536,
    gather      => sub { $num_lines += $_[0] },
    posix_exit  => 1,
    use_slurpio => 1,
    user_func   => sub {
        my ($mce, $slurp_ref, $chunk_id) = @_;
        open my $input_fh, '<', $slurp_ref
            or die "cannot open in-memory chunk $chunk_id: $!";
        while (<$input_fh>) {
            # The trailing newline stays on $count; numeric += ignores it.
            my ($key, $count) = split /\t/;
            my $idx = xxhash64($key, 0) % $NUM_MAPS;
            $TEMP{$idx}{$key} += $count;
            $NUM_LINES++;
        }
        close $input_fh;
    },
    user_end => sub {
        # Merge this worker's partial sums into shared memory, one map at
        # a time, holding only that map's read/write lock.
        for my $idx (keys %TEMP) {
            mm_lock($MM[$idx], MM_LOCK_RW);
            while (my ($key, $count) = each %{ $TEMP{$idx} }) {
                my $val = mma_hash_fetch($HA[$idx], $key);
                mma_hash_store($HA[$idx], $key, $val + $count);
            }
            mm_unlock($MM[$idx]);
        }
        MCE->gather($NUM_LINES);
        $NUM_LINES = 0;
        %TEMP      = ();
    },
);

for my $fname (@ARGV) {
    if (-d $fname) {
        warn "'$fname': Is a directory, skipping\n";
    }
    elsif (!-f $fname) {
        warn "'$fname': No such file, skipping\n";
    }
    elsif (!-r $fname) {
        warn "'$fname': Permission denied, skipping\n";
    }
    elsif (-s $fname) {
        ++$process_count;
        $mce->process($fname);
    }
}
$mce->shutdown;    # reap workers

printf {*STDERR} "get properties : %9.3f secs\n", time - $start1;

exit unless $process_count;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Pack data for sorting.
# Each worker handles a unique hashmap.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Record layout: a 4-byte big-endian unsigned "inverted count"
# (MAX_VAL - count) followed by the key as a NUL-padded string.
# Sort::Packed's "C<n>" format compares records byte by byte, so the count
# field must be big-endian. A native 'l' is little-endian on x86 and would
# be compared on its low-order byte first. Inverting an unsigned count
# gives count-descending order (including count 0), and ties fall back to
# the key bytes, giving key-ascending order.
# Assumes every total fits in 0 .. 4294967295.
my $MAX_VAL    = 0xFFFFFFFF;
my $VAL_SIZE   = length pack('N', 0);
my $STR_SIZE   = $KEY_SIZE + 1;    # null-terminated
my $PACK_SIZE  = $STR_SIZE + $VAL_SIZE;
my $FETCH_SIZE = $PACK_SIZE * 12000;

# Worker task: pack every entry of map $seq_id into fixed-width records and
# gather (number of records packed, packed bytes) to the manager.
sub pack_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my $ha       = $HA[$seq_id];
    my $num_keys = mma_hash_scalar($ha);
    return unless $num_keys > 0;

    my ($kv_pairs, $packed) = ('', 0);
    for my $i (0 .. $num_keys - 1) {
        my ($key, $val) = mma_hash_get_entry($ha, $i);
        last unless length($key);
        $kv_pairs .= pack("NZ${STR_SIZE}", $MAX_VAL - $val, $key);
        ++$packed;
    }
    $mce->gather($packed, $kv_pairs);
}

my ($start2, $unique, $data) = (time, 0, '');

# Spin up MCE workers to handle packing and output. The task is selected
# by name through user_args.
$mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 1,
    init_relay  => 1,
    posix_exit  => 1,
    user_func   => sub {
        my $task = MCE->user_args->[0];
        no strict 'refs';
        $task->(@_);
    },
);

# Pack data for sorting.
$mce->process({
    user_args  => [ 'pack_task' ],
    sequence   => [ 0, $NUM_MAPS - 1 ],
    chunk_size => 1,
    gather     => sub { $unique += $_[0]; $data .= $_[1]; },
});

printf {*STDERR} "pack properties : %9.3f secs\n", time - $start2;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Output data by value descending, word ascending.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Get the next down value for integer division: the index of the last
# FETCH_SIZE chunk needed to cover $dividend bytes.
sub divide_down {
    my ($dividend, $divisor) = @_;
    return int($dividend / $divisor) if $dividend % $divisor;
    return int($dividend / $divisor) - 1;
}

# Return a chunk of $data: manager responding to worker request.
sub fetch_chunk {
    my ($seq_id) = @_;
    return substr($data, $seq_id * $FETCH_SIZE, $FETCH_SIZE);
}

# Worker task: unpack chunk and write directly to standard output, in
# sequence order via MCE::relay.
sub disp_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my ($output, $chunk) = ('', $mce->do('fetch_chunk', $seq_id));
    while (length $chunk) {
        my ($inv, $key) = unpack(
            "NZ$STR_SIZE", substr($chunk, 0, $PACK_SIZE, '')
        );
        $output .= $key . "\t" . ($MAX_VAL - $inv) . "\n";
    }
    MCE::relay { print $output; };
}

if (length $data) {
    my $start3 = time;
    sort_packed "C$PACK_SIZE", $data;
    printf {*STDERR} "sort packed data : %9.3f secs\n", time - $start3;

    my $start4 = time;
    $mce->process({
        user_args  => [ 'disp_task' ],
        sequence   => [ 0, divide_down(length($data), $FETCH_SIZE) ],
        chunk_size => 1,
    });
    printf {*STDERR} "write stdout : %9.3f secs\n", time - $start4;
}

$mce->shutdown;    # reap workers
@MM = @HA = ();

printf {*STDERR} "total time : %9.3f secs\n", time - $start1;
printf {*STDERR} " count lines : %lu\n", $num_lines;
printf {*STDERR} " count unique : %lu\n", $unique;

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://11153404]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others meditating upon the Monastery: (2)
As of 2025-02-09 15:35 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    Which URL do you most often use to access this site?












    Results (96 votes). Check out past polls.