I have this problem. I need to take 70G~ of JSON data from a MySQL database and migrate it over to S3. To do so, I am attempting to fork off $n workers using Parallel::Runner, grab $n (most likely 5k at a time) records from the database in staggered fashion (such that child 1 grabs 1-5000, child 2 grabs 5001-10000, and so on.), and enqueues the data to SQS. This all runs fine except when I make the call to SQS.
As you can see here, without the SQS call, the script runs swimmingly (as best as I can tell, at least):
[6516] ID: 81523, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81525, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81529, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81536, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81537, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81538, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81541, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81545, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81546, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81547, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81548, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81550, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81554, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81560, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81564, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81582, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81583, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81585, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81592, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81593, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81597, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81600, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81601, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81604, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81605, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81606, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81622, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81623, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81629, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81631, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81636, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81637, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81638, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81641, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81642, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81643, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6514] ID: 80565, last seen 15001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81750, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6514] ID: 80580, last seen 15001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6517] ID: 37056, last seen 15001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81758, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6514] ID: 80604, last seen 15001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81644, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81759, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81647, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6517] ID: 37060, last seen 15001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81771, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81653, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6514] ID: 80614, last seen 15001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81775, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6514] ID: 80627, last seen 15001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6517] ID: 37081, last seen 15001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81780, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6514] ID: 80629, last seen 15001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81654, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81781, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81655, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81788, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81664, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81789, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6516] ID: 81665, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81792, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81793, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81794, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81816, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81817, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81826, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81835, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81836, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81840, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81842, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81844, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81846, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
[6515] ID: 81867, last seen 10001 at sbin/1off/enqueue-client-info.pl
+line 84.
When I add the SQS call however, each child processes the same item. It's almost like SQS is taking too long and the next child is created before the $last_seen variable is updated.
#!/usr/bin/env perl
use strict;
use warnings 'all';
use Amazon::SQS::Simple;
use Parallel::Runner;
use Getopt::Long;
use JSON::XS;
use Try::Tiny;
use Data::Dump qw(ddx);
use Time::HiRes 'gettimeofday';
use Time::Duration;
use DBIx::Connector;
GetOptions(
'queue=s' => \my $queue_name,
'max-workers=i' => \my $max_workers,
'chunk-size=i' => \my $page_size,
);
die "Usage: $0 --queue=<queue-name> --max-workers=<number> --chunk-siz
+e=<items> --json=file\n"
unless $queue_name && $max_workers; # && $json; # && $chunk_size &&
+$mime_type;
my $aqs = Amazon::SQS::Simple->new(
);
my $queue = $aqs->CreateQueue( $queue_name );
my $runner = Parallel::Runner->new( $max_workers );
my $started_chunks = 0;
my $json_xs = JSON::XS->new->utf8;
my $start_time = gettimeofday();
my $started_items = 0;
my $last_seen = 1;
my $dsn = $ENV{'DB_DSN'} || 'dbi:mysql:bp';
my $username = $ENV{'DB_USER'} || 'root';
my $password = $ENV{'DB_PASS'} || '*';
$page_size ||= 5000;
my $conn = DBIx::Connector->new($dsn, $username, $password, {
RaiseError => 1,
AutoCommit => 1,
});
my $rows;
for my $workerid (1..$max_workers) {
$runner->run( sub {
$started_items++;
my $diff = gettimeofday() - $start_time;
my $rate = $started_items / $diff;
warn "[$$] $started_items started in ", concise(duration($diff)),
+" = $rate/sec\n";
# warn "LAST SEEN $last_seen";
#warn "OFFSET $offset";
while ( $rows = $conn->run( sub {
$_->selectall_arrayref(qq{
SELECT s.standard_image_id, s.signature, s.client_info, b.
+name
FROM standard_images s
LEFT JOIN buckets b on s.bucket_id=b.bucket_id
WHERE s.has_client_info = 1
AND s.standard_image_id > ?
GROUP BY s.signature
ORDER BY s.standard_image_id
LIMIT ?
},{} ,
$last_seen,
$page_size
);
} )
){
try {
foreach my $image ( @{ $rows } ) {
my $line = encode_json {
id => $image->[0],
mime_type => 'application/json',
client_info => decode_json $image->[2],
signature => $image->[1],
bucket_name => $image->[3]
};
warn "[$$] ID: " . $image->[0] . ", last seen $last_seen";
sqs_do( $queue, 'SendMessage', $line );
}
} catch {
warn "ERROR\t$_\t$@";
};
$last_seen += $page_size;
}
});
}
$runner->finish;
sub sqs_do {
my ($queue, $action, @params) = @_;
my @rv;
my $attempts = 5;
ACTION: {
@rv = eval {
$queue->$action( @params );
}; if ( $@ ) {
if ( --$attempts > 0 ) {
select undef, undef, undef, 0.25;
redo ACTION;
}
else {
warn "Cannot $action( @params ) after 5 attempts. Giving up!\n
+$@";
}
}
}
return @rv;
}
Thanks in advance.