Beefy Boxes and Bandwidth Generously Provided by pair Networks
Your skill will accomplish
what the force of many cannot
 
PerlMonks  

Re: Solving the Long List is Long challenge - SQLite DB

by marioroy (Prior)
on Jul 13, 2023 at 09:53 UTC ( [id://11153401]=note: print w/replies, xml ) Need Help??


in reply to Solving the Long List is Long challenge, finally?

Aloha,

I circled back to Perl and tried something using Tie::Hash::DBD. The demonstration creates 32 SQLite databases (by default) in a temp directory. Crypt::xxHash is used to determine which database each key is inserted into or updated in. Sorting is handled by Sort::Packed. Similar to the DB_File variant, this is best run on a Unix OS.

Usage: KEYSIZE=N NUM_THREADS=N NUM_MAPS=N perl llilsql.pl file...
       perl llilsql.pl --keysize=N --threads=N --maps=N file...
       perl llilsql.pl --keysize=N --threads=max --maps=max file...

Running:

$ perl llilsql.pl big{1,2,3}.txt | cksum
Tie::Hash::DBD SQLite database - start
fixed string length=12, threads=8, maps=32
get properties :     5.123 secs
pack properties :     0.893 secs
sort packed data :     0.949 secs
write stdout :     0.745 secs
total time :     7.713 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

$ perl llilsql.pl --threads=16 --maps=64 big{1,2,3}.txt | cksum
Tie::Hash::DBD SQLite database - start
fixed string length=12, threads=16, maps=64
get properties :     2.927 secs
pack properties :     0.518 secs
sort packed data :     0.944 secs
write stdout :     0.396 secs
total time :     4.792 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

$ perl llilsql.pl --threads=24 --maps=64 big{1,2,3}.txt | cksum
Tie::Hash::DBD SQLite database - start
fixed string length=12, threads=24, maps=64
get properties :     2.074 secs
pack properties :     0.450 secs
sort packed data :     0.940 secs
write stdout :     0.275 secs
total time :     3.747 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

$ perl llilsql.pl --threads=48 --maps=max big{1,2,3}.txt | cksum
Tie::Hash::DBD SQLite database - start
fixed string length=12, threads=48, maps=128
get properties :     1.432 secs
pack properties :     0.297 secs
sort packed data :     0.939 secs
write stdout :     0.255 secs
total time :     2.934 secs
 count lines : 10545600
 count unique : 10367603
2956888413 93308427

llilsql.pl

use strict;
use warnings;
no warnings 'uninitialized';

use DBD::SQLite::Constants qw(SQLITE_OPEN_NOMUTEX SQLITE_OPEN_READONLY);
use Tie::Hash::DBD;

# Do not clone the DB handle, passed to Tie::Hash::DBD->TIEHASH.
# The PRAGMA statements are not copied/preserved when cloned.
{
    no warnings 'redefine';
    sub DBI::db::clone { return $_[0]; }
}

use Crypt::xxHash qw(xxhash64);
use Sort::Packed qw(sort_packed);
use Time::HiRes qw(time);

use MCE::Signal qw($tmp_dir -use_dev_shm);
use MCE;

# Print the command-line synopsis and terminate the program.
sub usage {
    die "Usage: [KEYSIZE=N] [NUM_THREADS=N] [NUM_MAPS=N] perl $0 file...\n".
        "       perl $0 [--keysize=N] [--threads=N] [--maps=N] file...\n".
        "       perl $0 [--keysize=N] [--threads=max] [--maps=max] file...\n";
}

@ARGV or usage();

# Defaults come from the environment; command-line options override them.
my $NUM_CPUS = MCE::Util->get_ncpu();
my $KEY_SIZE = $ENV{KEYSIZE} || 12;
my $NUM_THDS = $ENV{NUM_THREADS} || 8;
my $NUM_MAPS = $ENV{NUM_MAPS} || 32;

# Consume leading options. The @ARGV guard stops the loop cleanly once
# every argument has been shifted off; anything unrecognised is an error.
while (@ARGV && $ARGV[0] =~ /^--?/) {
    my $arg = shift;
    $KEY_SIZE = $1,        next if $arg =~ /^--?keysize=(\d+)$/;
    $NUM_THDS = $1,        next if $arg =~ /^--?threads=(\d+)$/;
    $NUM_THDS = $NUM_CPUS, next if $arg =~ /^--?threads=max$/;
    $NUM_MAPS = $1,        next if $arg =~ /^--?maps=(\d+)$/;
    $NUM_MAPS = 128,       next if $arg =~ /^--?maps=max$/;
    usage();
}

# Options without any input file leave nothing to process.
@ARGV or usage();

# Keep both settings within 1 .. limit. A map count of zero would be used
# as a modulus when keys are assigned to databases.
$NUM_THDS = $NUM_CPUS if $NUM_THDS > $NUM_CPUS;
$NUM_THDS = 1         if $NUM_THDS < 1;
$NUM_MAPS = 128       if $NUM_MAPS > 128;
$NUM_MAPS = 1         if $NUM_MAPS < 1;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Setup.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

print {*STDERR} "Tie::Hash::DBD SQLite database - start\n";
print {*STDERR} "fixed string length=${KEY_SIZE}, threads=${NUM_THDS}, maps=${NUM_MAPS}\n";

# One mutex per database, indexed by database number.
our @MM;

if ($^O =~ /cygwin|MSWin32/) {
    # On Cygwin, use Channel instead for better performance.
    $MM[$_] = MCE::Mutex->new(impl => "Channel")
        for (0 .. $NUM_MAPS - 1);
}
else {
    $MM[$_] = MCE::Mutex->new(impl => "Flock", path => "$tmp_dir/$_.sem")
        for (0 .. $NUM_MAPS - 1);
}

# Open DB function.
# Each child must open the DB file separately.
# See https://metacpan.org/pod/DBD::SQLite#DBD::SQLite-and-fork()
#
# open_db($idx, $flags)
#   $idx   - database number; selects the file "$tmp_dir/$idx.db".
#   $flags - optional value for sqlite_open_flags; 0 is passed when omitted.
# Returns the Tie::Hash::DBD object constructed on the new handle, with its
# insert statement replaced by an UPSERT that adds to the stored value.
# Connection errors are fatal (RaiseError is enabled).

sub open_db {
    my ($idx, $flags) = @_;

    my $dbh = DBI->connect(
        "dbi:SQLite:dbname=$tmp_dir/$idx.db", undef, undef, {
            sqlite_open_flags => defined($flags) ? $flags : 0,
            PrintError        => 0,
            RaiseError        => 1,
            PrintWarn         => 0,
            FetchHashKeyName  => "NAME_lc",
        }
    ) or Carp::croak(DBI->errstr);

    # Apply PRAGMA statements before constructing Tie::Hash::DBD.
    $dbh->do("PRAGMA journal_mode = OFF");
    $dbh->do("PRAGMA synchronous = OFF");
    $dbh->do("PRAGMA temp_store = MEMORY");

    my $db = Tie::Hash::DBD->TIEHASH($dbh, {
        tbl => 'kv_store',
        key => 'f_key',
        fld => 'f_val',
        trh => 0,
        str => undef,
    });

    # Change the insert statement to the special UPSERT syntax.
    # This form is supported since SQLite version 3.24.0 (2018-06-04).
    # Refer to https://sqlite.org/lang_UPSERT.html
    $db->{ins} = $dbh->prepare("
        INSERT INTO kv_store (f_key, f_val) VALUES (:1, :2)
        ON CONFLICT(f_key) DO UPDATE SET f_val = kv_store.f_val + :2
    ");

    return $db;
}

# Create the databases.
{
    open_db($_) for (0 .. $NUM_MAPS - 1);
}

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Get properties.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

my ($start1, $process_count, $num_lines) = (time, 0, 0);

our ($NUM_LINES, %TEMP) = (0);  # child vars

my $mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 65536,
    gather      => sub { $num_lines += $_[0] },
    posix_exit  => 1,
    use_slurpio => 1,

    # Tally each chunk into %TEMP, bucketed by target database number.
    user_func => sub {
        my ($mce, $slurp_ref, $chunk_id) = @_;
        open my $input_fh, '<', $slurp_ref
            or die "cannot read chunk $chunk_id from memory: $!\n";
        while (my $line = <$input_fh>) {
            my ($key, $count) = split /\t/, $line;
            my $idx = xxhash64($key, 0) % $NUM_MAPS;
            $TEMP{$idx}{$key} += $count;
            $NUM_LINES++;
        }
        close $input_fh;
    },

    # Flush the worker's tallies, one database at a time, holding that
    # database's mutex for the duration of the transaction.
    user_end => sub {
        for my $idx (keys %TEMP) {
            $MM[$idx]->lock_exclusive;
            my $db = open_db($idx, SQLITE_OPEN_NOMUTEX);
            $db->{dbh}->begin_work;

            # while (my ($key, $count) = each %{ $TEMP{$idx} }) {
            #     my $val = $db->FETCH($key);
            #     defined($val)
            #         ? ( $db->{upd}->execute($val + $count, $key) )
            #         : ( $db->{ins}->execute($key, $count) );
            # }

            # Faster, with UPSERT statement.
            while (my ($key, $count) = each %{ $TEMP{$idx} }) {
                $db->{ins}->execute($key, $count);
            }

            $db->{dbh}->commit;
            $MM[$idx]->unlock;
            undef $db->{dbh};
        }

        MCE->gather($NUM_LINES);
        $NUM_LINES = 0;
        %TEMP = ();
    },
);

for my $fname (@ARGV) {
    warn("'$fname': Is a directory, skipping\n"), next if (-d $fname);
    warn("'$fname': No such file, skipping\n"), next unless (-f $fname);
    warn("'$fname': Permission denied, skipping\n"), next unless (-r $fname);
    ++$process_count, $mce->process($fname) if (-s $fname);
}

$mce->shutdown;  # reap workers

printf {*STDERR} "get properties : %9.3f secs\n", time - $start1;

exit unless $process_count;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Pack data for sorting.
# Each worker handles a unique DB.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Record layout: negated count (32-bit, big-endian) followed by the key as
# a null-terminated string of fixed width. The records are later ordered
# byte by byte ("C$PACK_SIZE"), so the integer must be stored most
# significant byte first for byte order to equal numeric order; a
# native-endian 'l' on a little-endian host misorders counts above 256.
my $VAL_SIZE   = length pack('l>', 0);
my $STR_SIZE   = $KEY_SIZE + 1;  # null-terminated
my $PACK_SIZE  = $STR_SIZE + $VAL_SIZE;
my $FETCH_SIZE = $PACK_SIZE * 12000;

# Worker task: read every row of database $seq_id, pack the rows into one
# string, and gather (row count, packed string) to the manager.
sub pack_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my $db = open_db($seq_id, SQLITE_OPEN_NOMUTEX | SQLITE_OPEN_READONLY);
    my $kv_pairs = '';

    if ((my $num_rows = $db->SCALAR) > 0) {
        my $dbh = $db->{dbh};
        my $sql = "select f_key, f_val from kv_store";
        my $sth = $dbh->prepare($sql);
        $sth->execute;
        # Fetch in batches of 4000 rows.
        while (my $aref = $sth->fetchall_arrayref(undef, 4000)) {
            foreach my $row (@{ $aref }) {
                $kv_pairs .= pack("l>Z${STR_SIZE}", -$row->[1], $row->[0]);
            }
        }
        $mce->gather($num_rows, $kv_pairs);
    }
}

my ($start2, $unique, $data) = (time, 0, '');

# Spin up MCE workers to handle packing and output.
# The task to run is named by the first element of user_args.
$mce = MCE->new(
    max_workers => $NUM_THDS,
    chunk_size  => 1,
    init_relay  => 1,
    posix_exit  => 1,
    user_func   => sub {
        my $task = MCE->user_args->[0];
        no strict 'refs';
        $task->(@_);
    },
);

# Pack data for sorting.
$mce->process({
    user_args  => [ 'pack_task' ],
    sequence   => [ 0, $NUM_MAPS - 1 ],
    chunk_size => 1,
    gather     => sub {
        $unique += $_[0];
        $data   .= $_[1];
    },
});

printf {*STDERR} "pack properties : %9.3f secs\n", time - $start2;

#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Output data by value descending, word ascending.
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

# Get the next down value for integer division.
# Returns the quotient when there is a remainder, otherwise quotient - 1;
# used below as the last zero-based chunk number of $data.
sub divide_down {
    my ($dividend, $divisor) = @_;
    return int($dividend / $divisor) if $dividend % $divisor;
    return int($dividend / $divisor) - 1;
}

# Return a chunk of $data: manager responding to worker request.
sub fetch_chunk {
    my ($seq_id) = @_;
    return substr($data, $seq_id * $FETCH_SIZE, $FETCH_SIZE);
}

# Worker task: unpack chunk and write directly to standard output.
sub disp_task {
    my ($mce, $seq_id, $chunk_id) = @_;
    my ($output, $chunk) = ('', $mce->do('fetch_chunk', $seq_id));

    # Peel one fixed-size record off the front of the chunk per pass.
    while (length $chunk) {
        my ($val, $key) = unpack(
            "l>Z$STR_SIZE", substr($chunk, 0, $PACK_SIZE, '')
        );
        $output .= $key. "\t". -($val). "\n";
    }

    MCE::relay {
        print $output;
    };
}

if (length $data) {
    my $start3 = time;
    sort_packed "C$PACK_SIZE", $data;

    printf {*STDERR} "sort packed data : %9.3f secs\n", time - $start3;

    my $start4 = time;
    $mce->process({
        user_args  => [ 'disp_task' ],
        sequence   => [ 0, divide_down(length($data), $FETCH_SIZE) ],
        chunk_size => 1,
    });

    printf {*STDERR} "write stdout : %9.3f secs\n", time - $start4;
}

$mce->shutdown;  # reap workers
@MM = ();

printf {*STDERR} "total time : %9.3f secs\n", time - $start1;
printf {*STDERR} " count lines : %lu\n", $num_lines;
printf {*STDERR} " count unique : %lu\n", $unique;

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://11153401]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others surveying the Monastery: (1)
As of 2025-02-09 14:15 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    Which URL do you most often use to access this site?












    Results (96 votes). Check out past polls.