#!/usr/bin/perl -w

# Small PSGI job-dispatch service.
#
#   mode=get_jobs           hands out up to 40 pending (or stale) jobs of a
#                           job list and marks them as started.
#   mode=mark_as_completed  queues job ids; a background thread marks them
#                           as completed in batches.

use strict;
use warnings 'all';
use forks;
use forks::shared;
use Plack::Request;
use JSON::XS;
use ASP4::API;

# Instantiated at compile time, ahead of the BP::db::* classes loaded below.
# NOTE(review): presumably those classes rely on the API being set up first -
# confirm before reordering.
my $api;
BEGIN { $api = ASP4::API->new }

use BP::db::job;
use BP::db::job_list;

# Job ids reported as finished but not yet written to the database.
my @finished_jobs : shared = ( );

# Cleared by the signal handler to ask the updater thread to stop.
my $RUNNING : shared = 1;

# Seconds the updater thread sleeps between batch updates.
my $FLUSH_INTERVAL = 4;

# XXX: This subref right here is what I want to run - safely though -
# without clobbering any existing INT/TERM signal handlers (and
# without getting mine clobbered as well)!
$SIG{INT} = $SIG{TERM} = sub {
  SCOPE: {
    lock($RUNNING);
    $RUNNING = 0;
  };

  # A worker thread that receives the signal must not join or exit here:
  # returning lets its loop see the cleared flag and do its final flush.
  return if threads->tid != 0;

  # Wait until any remaining completed jobs are marked as updated. Join every
  # thread, not just the already-joinable ones: the updater is normally still
  # running (or sleeping) at this point.
  $_->join for threads->list;
  exit(0);
};

# Mark finished records as completed in batches -
# makes things more efficient:
threads->create(sub {
  while( $RUNNING )
  {
    _flush_finished_jobs();
    sleep($FLUSH_INTERVAL);
  }# end while()

  # Pick up anything that was queued after the last pass, so a shutdown
  # does not drop completed jobs.
  _flush_finished_jobs();
});

# Modes a request is allowed to dispatch to (mode name => method name).
my %HANDLER_FOR = map { $_ => $_ } qw( get_jobs mark_as_completed );

# The main app itself.
# Answers 404 for a missing/unknown mode and 500 when the handler returns
# a false value; otherwise returns the handler's PSGI response.
my $app = sub {
  my $env = shift;
  my $req = Plack::Request->new( $env );

  # A missing "mode" parameter is simply an unknown mode - avoid using
  # undef as a hash key.
  my $requested = $req->param('mode');
  my $mode = defined $requested ? $HANDLER_FOR{$requested} : undef;
  return [ 404, [ ], [ ] ] unless $mode;

  my $s = bless { }, __PACKAGE__;
  my $result = $s->$mode( $req )
    or return [ 500, [ ], [ ] ];

  return $result;
};


# Drains the shared queue of finished job ids and marks those jobs as
# completed with a single UPDATE. Does nothing when the queue is empty.
sub _flush_finished_jobs
{
  my @to_update;
  SCOPE: {
    # Check and drain under the same lock so no id is missed or taken twice.
    lock(@finished_jobs);
    return unless @finished_jobs;
    @to_update = splice(@finished_jobs, 0, scalar(@finished_jobs));
  };

  warn "About to update @{[ scalar(@to_update) ]} finished jobs...\n";

  # Do the update in one fell swoop:
  my $placeholders = join ", ", ('?') x @to_update;
  my $sth = BP::db::job_list->db_Main->prepare(<<"SQL");
    update jobs set
      is_completed = 1,
      completed_on = now()
    where job_id in ( $placeholders )
SQL
  $sth->execute( @to_update );
  $sth->finish();

  return;
}# end _flush_finished_jobs()


# Returns (as a JSON array in a 200 text/plain response) up to 40 jobs of the
# requested "job_list_id" that are either not started, or were started more
# than 30 minutes ago and never completed. Every job returned is marked as
# started.
sub get_jobs
{
  my ($s, $req) = @_;

  my $dbh = BP::db::job->db_Main;
  my $job_sth = $dbh->prepare(<<"SQL");
    select *
    from jobs
    where job_list_id = ?
    and (
      (
        is_started = 0
      )
      or
      (
        is_started = 1
        and is_completed = 0
        and started_on < date_add(now(), interval -30 minute)
      )
    )
    limit 0, 40
SQL
  $job_sth->execute( $req->param('job_list_id') );
  my @jobs = map { $_->as_hashref } BP::db::job->sth_to_objects( $job_sth );

  # Mark as 'is_started' any jobs we've found:
  if( @jobs )
  {
    # Bind the ids rather than interpolating them into the statement.
    my @job_ids = map { $_->{job_id} } @jobs;
    my $placeholders = join ", ", ('?') x @job_ids;
    $dbh->do(<<"SQL", undef, @job_ids);
      update jobs set
        is_started = 1,
        started_on = now()
      where job_id in ( $placeholders )
SQL
  }# end if()

  # Finally:
  return [ 200, ['content-type' => 'text/plain'], [ encode_json( \@jobs ) ] ];
}# end get_jobs()


# Queues the job ids given in the "job_ids" parameter (a JSON array) for the
# updater thread. Always answers 200 "OK"; input that is missing, is not
# valid JSON, or is not a JSON array queues nothing.
sub mark_as_completed
{
  my ($s, $req) = @_;

  my $decoded = eval { decode_json( $req->param('job_ids') ) };

  # Only plain scalar ids can be bound to the UPDATE later on, so drop
  # undef and nested structures.
  my @job_ids = ref($decoded) eq 'ARRAY'
    ? grep { defined($_) && !ref($_) } @$decoded
    : ( );

  if( @job_ids )
  {
    lock(@finished_jobs);
    push @finished_jobs, @job_ids;
  }# end if()

  return [ 200, ['content-type' => 'text/plain'], ['OK'] ];
}# end mark_as_completed()