#!/usr/bin/env perl use strict; use warnings 'all'; use Amazon::SQS::Simple; use Parallel::Runner; use Getopt::Long; use JSON::XS; use Try::Tiny; use Data::Dump qw(ddx); use Time::HiRes 'gettimeofday'; use Time::Duration; use DBIx::Connector; GetOptions( 'queue=s' => \my $queue_name, 'max-workers=i' => \my $max_workers, 'chunk-size=i' => \my $page_size, ); die "Usage: $0 --queue= --max-workers= --chunk-size= --json=file\n" unless $queue_name && $max_workers; # && $json; # && $chunk_size && $mime_type; my $aqs = Amazon::SQS::Simple->new( ); my $queue = $aqs->CreateQueue( $queue_name ); my $runner = Parallel::Runner->new( $max_workers ); my $started_chunks = 0; my $json_xs = JSON::XS->new->utf8; my $start_time = gettimeofday(); my $started_items = 0; my $last_seen = 1; my $dsn = $ENV{'DB_DSN'} || 'dbi:mysql:bp'; my $username = $ENV{'DB_USER'} || 'root'; my $password = $ENV{'DB_PASS'} || '*'; $page_size ||= 5000; my $conn = DBIx::Connector->new($dsn, $username, $password, { RaiseError => 1, AutoCommit => 1, }); my $rows; for my $workerid (1..$max_workers) { $runner->run( sub { $started_items++; my $diff = gettimeofday() - $start_time; my $rate = $started_items / $diff; warn "[$$] $started_items started in ", concise(duration($diff)), " = $rate/sec\n"; # warn "LAST SEEN $last_seen"; #warn "OFFSET $offset"; while ( $rows = $conn->run( sub { $_->selectall_arrayref(qq{ SELECT s.standard_image_id, s.signature, s.client_info, b.name FROM standard_images s LEFT JOIN buckets b on s.bucket_id=b.bucket_id WHERE s.has_client_info = 1 AND s.standard_image_id > ? GROUP BY s.signature ORDER BY s.standard_image_id LIMIT ? },{} , $last_seen, $page_size ); } ) ){ try { foreach my $image ( @{ $rows } ) { my $line = encode_json { id => $image->[0], mime_type => 'application/json', client_info => decode_json $image->[2], signature => $image->[1], bucket_name => $image->[3] }; warn "[$$] ID: " . $image->[0] . ", last seen $last_seen"; sqs_do( $queue, 'SendMessage', $line ); } } catch { warn "ERROR\t$_\t$@"; }; $last_seen += $page_size; } }); } $runner->finish; sub sqs_do { my ($queue, $action, @params) = @_; my @rv; my $attempts = 5; ACTION: { @rv = eval { $queue->$action( @params ); }; if ( $@ ) { if ( --$attempts > 0 ) { select undef, undef, undef, 0.25; redo ACTION; } else { warn "Cannot $action( @params ) after 5 attempts. Giving up!\n$@"; } } } return @rv; }