Hi 1nickt,
Nice example! MCE has helpful user_begin and user_end options, which are a great place for workers to connect and clean up, respectively. Basically, each worker connects to the DB once inside user_begin and prepares all of its statements there. The user_end block is for calling finish on every statement handle before calling disconnect on the DBI handle.
Update: Added the MCE interval option to stagger workers connecting to the DB.
Schema:
-- Sample table for the Perl script: one integer column followed by
-- four 24-character text columns.
CREATE TABLE mytable (
    field1 INTEGER,
    field2 VARCHAR(24),
    field3 VARCHAR(24),
    field4 VARCHAR(24),
    field5 VARCHAR(24)
);
Perl:
use strict;
use warnings;
use Data::GUID;
use DBD::Pg;
use SQL::Abstract;
use Tie::Cycle;
use MCE::Loop;
# Build the SQL text once, up front. Each generator call is assigned to a
# scalar, and that statement text is what gets handed to prepare_cached
# later on. The '' values are dummies: the real values are bound when the
# statements are executed.
# See https://www.cattlegrid.info/2006/06/13/write-no-more-sql-abstract-it.html
my $sqla = SQL::Abstract->new;
my @cols = map {"field$_"} 1..5;

my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
my $sel_sql = $sqla->select('mytable', 'count(*)', { field2 => '' });
my $upd_sql = $sqla->update('mytable', { field2 => '' }, { field2 => '' });
#--------------------------------------------------------------------#
# Connection string and the per-worker DBI handles. The handles are
# file-scoped so that the user_begin, user_end and mce_loop blocks can all
# reach them.
my $dsn = 'DBI:Pg:dbname=test_db;host=localhost;port=5432';
my ($dbh, $ins_sth, $sel_sth, $upd_sth);

MCE::Loop->init(
    max_workers => 4,
    interval    => 0.125,   # delay period for MCE->yield

    # Worker start-up: connect to the DB and prepare every statement.
    user_begin => sub {
        MCE->yield;         # stagger workers connecting to the DB

        $dbh = DBI->connect($dsn, $ENV{USER}, undef, {
            AutoCommit => 1,
            RaiseError => 1,
            PrintError => 1
        }) or die "Connection failed!\n" . $DBI::errstr;

        $ins_sth = $dbh->prepare_cached($ins_sql);
        $sel_sth = $dbh->prepare_cached($sel_sql);
        $upd_sth = $dbh->prepare_cached($upd_sql);
    },

    # Worker shutdown: finish every statement handle (including the update
    # handle), then disconnect. The defined checks keep the clean-up from
    # dying on an undef handle if user_begin did not complete.
    user_end => sub {
        for my $sth ($ins_sth, $sel_sth, $upd_sth) {
            $sth->finish if defined $sth;
        }
        $dbh->disconnect if defined $dbh;
    },
);
# For every record in the chunk: insert it, change field2 twice (first via
# the prepared update statement, then via an ad-hoc $dbh->do), count the
# rows carrying the final field2 value, and report the result.
mce_loop {
    my ($mce, $chunk, $chunk_id) = @_;

    foreach my $row (@$chunk) {
        $ins_sth->execute(@$row);

        my $guid_before = $row->[1];
        my ($guid_first, $guid_second)
            = map { Data::GUID->new->as_base64 } 1 .. 2;

        # update using a prepared statement
        $upd_sth->execute($guid_first, $guid_before);

        # update using the dbh handle, with SQL generated for this call
        my ($stmt, @params) = $sqla->update(
            'mytable',
            { field2 => $guid_second },
            { field2 => $guid_first },
        );
        $dbh->do($stmt, undef, @params);

        # select records; count is 1 due to selecting field2 = $guid_second
        $sel_sth->execute($guid_second);
        my ($matches) = $sel_sth->fetchrow_array;

        MCE->say(sprintf 'wid %s; chnk %s; ins %s; cnt %s',
            $mce->wid, $chunk_id, $row->[0], $matches);
    }
} get_sample_data();

# Pass the array reference itself, not @{ get_sample_data() }: per the
# author's note, the dereferenced list does not work when it holds a single
# element, whereas MCE accepts an array ref.

MCE::Loop->finish;
#--------------------------------------------------------------------#
# Builds the sample input: a reference to an array of 1000 records. Each
# record is an array ref of five values -- an integer taken from a repeating
# 40..49 cycle, followed by four freshly generated base64 GUID strings.
sub get_sample_data {
    tie my $next_id, 'Tie::Cycle', [ 40 .. 49 ];

    my @records;
    for ( 1 .. 1000 ) {
        my @guids = map { Data::GUID->new->as_base64 } 0 .. 3;

        # Reading the tied scalar yields the next value of the cycle.
        push @records, [ $next_id, @guids ];
    }

    return \@records;
}
See also this post for a version using a shared DBI handle.
Regards, Mario