#!/usr/bin/perl -w
use strict;
use warnings 'all';
use forks;
use forks::shared;
use Plack::Request;
use JSON::XS;
use ASP4::API;
my $api; BEGIN { $api = ASP4::API->new }
use BP::db::job;
use BP::db::job_list;
my @finished_jobs : shared = ( );
my $RUNNING : shared = 1;
# XXX: This subref right here is what I want to run - safely though -
# without clobbering any existing INT/TERM signal handlers (and
# without getting mine) clobbered as well!
$SIG{INT} = $SIG{TERM} = sub {
SCOPE: {
lock($RUNNING);
$RUNNING = 0;
};
# Wait until any remaining completed jobs are marked as updated:
map { $_->join } threads->list( threads::joinable );
exit(0);
};
# Mark finished records as completed in batches -
# makes things more efficient:
threads->create(sub {
while( $RUNNING )
{
if( @finished_jobs )
{
my @to_update;
SCOPE: {
lock(@finished_jobs);
@to_update = splice(@finished_jobs, 0, scalar(@finished_jobs))
+;
};
warn "About to update @{[ scalar(@to_update) ]} finished jobs...
+\n";
# Do the update in one fell swoop:
my $sth = BP::db::job_list->db_Main->prepare(<<"SQL");
update jobs set
is_completed = 1,
completed_on = now()
where job_id in ( @{[ join ", ", map { '?' } @to_update ]} )
SQL
$sth->execute( @to_update );
$sth->finish();
}# end if()
sleep(4);
}# end while()
});
# The main app itself:
my $app = sub {
my $env = shift;
my $req = Plack::Request->new( $env );
my %modes = map { $_ => $_ } qw(
get_jobs
mark_as_completed
);
my $mode = $modes{ $req->param('mode') }
or return [ 404, [ ], [ ] ];
my $s = bless { }, __PACKAGE__;
my $result = $s->$mode( $req )
or return [ 500, [ ], [ ] ];
return $result;
};
sub get_jobs
{
my ($s, $req) = @_;
my @jobs = ( );
my $dbh = BP::db::job->db_Main;
my $job_sth = $dbh->prepare(<<"SQL");
select *
from jobs
where job_list_id = ?
and (
(
is_started = 0
)
or
(
is_started = 1
and is_completed = 0
and started_on < date_add(now(), interval -30 minute)
)
)
limit 0, 40
SQL
$job_sth->execute(
$req->param('job_list_id')
);
@jobs = map { $_->as_hashref }
BP::db::job->sth_to_objects( $job_sth );
# Mark as 'is_started' any jobs we've found:
if( @jobs )
{
$dbh->do(<<"SQL");
update jobs set
is_started = 1,
started_on = now()
where job_id IN ( @{[ join ',', map { $_->{job_id} } @jobs ]} )
SQL
}# end if()
# Finally:
return [
200,
['content-type' => 'text/plain'],
[ encode_json( \@jobs ) ]
];
}# end get_jobs()
sub mark_as_completed
{
my ($s, $req) = @_;
my $job_ids = eval { decode_json( $req->param('job_ids') ) } || [ ];
SCOPE: {
lock(@finished_jobs);
push @finished_jobs, @$job_ids;
};
return [
200,
['content-type' => 'text/plain'],
['OK']
];
}# end mark_as_completed()