<?xml version="1.0" encoding="windows-1252"?>
<node id="358977" title="dhoss's scratchpad" created="2004-06-01 21:49:34" updated="2005-08-15 15:11:39">
<type id="182711">
scratchpad</type>
<author id="203787">
dhoss</author>
<data>
<field name="doctext">
&lt;h2&gt;program:&lt;/h2&gt;
&lt;c&gt;
#!/usr/bin/env perl

use strict;
use warnings 'all';
use Amazon::SQS::Simple;
use Parallel::Runner;
use Getopt::Long;
use JSON::XS;
use Try::Tiny;
use Data::Dump qw(ddx);
use Time::HiRes 'gettimeofday';
use Time::Duration;
use DBIx::Connector;

GetOptions(
  'queue=s'       =&gt; \my $queue_name,
  'max-workers=i' =&gt; \my $max_workers,
  'chunk-size=i'  =&gt; \my $page_size,
);

die "Usage: $0 --queue=&lt;queue-name&gt; --max-workers=&lt;number&gt; --chunk-size=&lt;items&gt; --json=file\n"
  unless $queue_name &amp;&amp; $max_workers; # &amp;&amp; $json; # &amp;&amp; $chunk_size &amp;&amp; $mime_type;

my $aqs = Amazon::SQS::Simple-&gt;new(

  );

my $queue = $aqs-&gt;CreateQueue( $queue_name );
my $runner    = Parallel::Runner-&gt;new( $max_workers );
my $started_chunks = 0;
my $json_xs = JSON::XS-&gt;new-&gt;utf8;
my $start_time = gettimeofday();
my $started_items = 0;
my $last_seen = 1;
my $dsn = $ENV{'DB_DSN'}       || 'dbi:mysql:bp';
my $username = $ENV{'DB_USER'} || 'root';
my $password = $ENV{'DB_PASS'} || '';
$page_size ||= 5000;
my $conn = DBIx::Connector-&gt;new($dsn, $username, $password, {
    RaiseError =&gt; 1,
    AutoCommit =&gt; 1,
});
my $rows;

for my $workerid (1..$max_workers) {
  
  my $offset = $last_seen + $workerid;
  warn "[$$] OFFSET $offset";
  $runner-&gt;run( sub {
    warn "[$$] OFFSET IN CHILD $offset";
    $started_items++;
    my $diff = gettimeofday() - $start_time;
    my $rate = $started_items / $diff;
    warn "[$$] $started_items started in ", concise(duration($diff)), " = $rate/sec\n";
    try {
     # warn "LAST SEEN $last_seen";
      #warn "OFFSET $offset";
        $rows = $conn-&gt;run( sub {
          $_-&gt;selectall_arrayref(qq{
              SELECT s.standard_image_id, s.signature, s.client_info, b.name
              FROM standard_images s
              LEFT JOIN buckets b on s.bucket_id=b.bucket_id
              WHERE s.has_client_info = 1
              AND s.standard_image_id &gt; ?
              GROUP BY s.signature
              ORDER BY s.standard_image_id
              LIMIT ?
            },{} ,
            $offset,
            $page_size
          );
          
        });
        foreach my $image ( @{ $rows }  ) {
            my $line = encode_json {
              id          =&gt; $image-&gt;[0], 
              mime_type   =&gt; 'application/json',
              client_info =&gt; decode_json $image-&gt;[2],
              signature   =&gt; $image-&gt;[1],
              bucket_name =&gt; $image-&gt;[3]
            };
            warn "[$$] ID: " . $image-&gt;[0];
            sqs_do( $queue, 'SendMessage', $line );
        
        }
    } catch {
      warn "ERROR\t$_\t$@";
    };
  });
  $last_seen += $page_size;
}
$runner-&gt;finish;

sub sqs_do {
  my ($queue, $action, @params) = @_;

  my @rv;

  my $attempts = 5;
  ACTION: {
    @rv = eval {
      $queue-&gt;$action( @params );
    }; if ( $@ ) {
      if ( --$attempts &gt; 0 ) {
        select undef, undef, undef, 0.25;
        redo ACTION;
      }
      else {
        warn "Cannot $action( @params ) after 5 attempts. Giving up!\n$@";
      }
    }
  }

  return @rv;
}

&lt;/c&gt;
&lt;h2&gt;Sample output&lt;/h2&gt;
&lt;c&gt;perl -I../common/lib sbin/1off/enqueue-client-info.pl --queue=ac-client-info --max-workers=4 
[18656] OFFSET 2 at sbin/1off/enqueue-client-info.pl line 50.
[18657] OFFSET IN CHILD 2 at sbin/1off/enqueue-client-info.pl line 52.
[18656] OFFSET 5003 at sbin/1off/enqueue-client-info.pl line 50.
[18657] 1 started in just now = 945.088778729157/sec
[18656] OFFSET 10004 at sbin/1off/enqueue-client-info.pl line 50.
[18656] OFFSET 15005 at sbin/1off/enqueue-client-info.pl line 50.
[18659] OFFSET IN CHILD 10004 at sbin/1off/enqueue-client-info.pl line 52.
[18659] 1 started in just now = 227.321229201669/sec
[18658] OFFSET IN CHILD 5003 at sbin/1off/enqueue-client-info.pl line 52.
[18658] 1 started in just now = 96.9557096625058/sec
[18660] OFFSET IN CHILD 15005 at sbin/1off/enqueue-client-info.pl line 52.
[18660] 1 started in just now = 91.8957100915823/sec
[18657] ID: 1185 at sbin/1off/enqueue-client-info.pl line 84.
[18658] ID: 37056 at sbin/1off/enqueue-client-info.pl line 84.
[18659] ID: 37056 at sbin/1off/enqueue-client-info.pl line 84.
[18660] ID: 37056 at sbin/1off/enqueue-client-info.pl line 84.
[18658] ID: 37060 at sbin/1off/enqueue-client-info.pl line 84.
[18659] ID: 37060 at sbin/1off/enqueue-client-info.pl line 84.
[18660] ID: 37060 at sbin/1off/enqueue-client-info.pl line 84.
[18657] ID: 1197 at sbin/1off/enqueue-client-info.pl line 84.
&lt;/c&gt;</field>
</data>
</node>
