Beefy Boxes and Bandwidth Generously Provided by pair Networks
more useful options
 
PerlMonks  

Comment on

( #3333=superdoc: print w/ replies, xml ) Need Help??
package IGSP::GoogleAgent; use FindBin; use YAML::Any qw/LoadFile/; use Net::Google::Spreadsheets; use Net::SMTP::TLS; use IO::CaptureOutput qw/capture/; use Sys::Hostname; use Moose; use Carp; our $VERSION = '0.01'; sub BUILD { my $self = shift; my @required_key_fields = grep { $self->config->{key_fields}->{$_} +->{required} } keys %{$self->config->{key_fields}}; die ("Your configuration must have at least one required key_field +s key!\n") unless (@required_key_fields); foreach my $required_query_field (@required_key_fields) { croak ("You must provide a bind_key_fields ${required_query_fi +eld} key - value pair!\n") unless ($self->bind_key_fields->{$required_query_field}); } } has 'bind_key_fields' => ( is => 'ro', isa => 'HashRef', required => 1 ); has 'agent_name' => ( is => 'ro', isa => 'Str', required => 1 ); has 'page_name' => ( is => 'ro', required => 1 ); has 'prerequisites' => ( is => 'ro', isa => 'ArrayRef' ); has 'debug' => ( is => 'ro', isa => 'Bool' ); has 'max_selves' => ( is => 'ro', isa => 'Int' ); has 'subsumed_by' => ( is => 'ro', isa => 'HashRef' ); has 'config_file' => ( is => 'ro', isa => 'Str', ); has 'config' => ( is => 'ro', builder => '_build_config' ); has 'google_db' => ( is => 'ro', builder => '_build_google_db', lazy => 1, # depends on config init_arg => undef # google_db cannot be overridden ); #### BUILDERS sub _build_config { my $self = shift; my $config_file = $self->config_file || $FindBin::Bin.'/../config/ +agent.conf.yml'; croak "Config ${config_file} not found!\n" unless (-e $config_file +); return YAML::Any::LoadFile($config_file); } sub _build_google_db { my $self = shift; my $service = Net::Google::Spreadsheets->new( username => $self->co +nfig->{guser}, password => $self->co +nfig->{gpass}, ); return $service->spreadsheet({ title => $self->config->{spreadsheet_name} }); } #### METHODS around 'run_my' => sub { my ($orig, $self, @args) = @_; if ($self->debug) { return $self->$orig(@args); } else { my $capture_output; my $no_problems = capture { my $ret; eval { $ret = $self->$orig(@args); }; if ($@) { print STDERR $@; return; } return $ret; } \$capture_output, \$capture_output; $self->mail_error($capture_output) unless ($no_problems); return $no_problems; } }; sub run_my { my ($self, $agent_code) = @_; return 1 if ($self->is_subsumed); my $entry = $self->run_entry(); return unless ($entry); return 1 if ($entry->{'not_runnable'}); # this is one that is not +ready, already running, or already run my ($success, $update_entry) = $agent_code->($entry->content); if ($success) { $self->complete_entry($update_entry); return 1; } else { $self->fail_entry($update_entry); return; } } sub is_subsumed { my $self = shift; return unless ($self->max_selves || $self->subsumed_by); # nothing + to subsume here my $subsumed; my %running_subsumers; my $subsume_opened = open (my $subsuming_in, '-|', 'ps', '-eo', 'p +id,command'); unless ($subsume_opened) { print STDERR "Couldnt check subsumption $!\n"; return 1; # subsume to be safe } SUBIN: while (my $in = <$subsuming_in>) { next if ($in =~ m/emacs|vi|screen|SCREEN/); # skip editing and + screen next if ($in =~ m/\s*$$/); # skip this agent next if ($in =~ m/(\[|\])/); # skip daemons my $self_name = $self->agent_name; if ($self->max_selves && $in =~ m/$self_name/) { $running_subsumers{$self->agent_name}++; if ($running_subsumers{$self->agent_name} == $self->max_se +lves) { print STDERR "max_selves limit reached\n"; $subsumed = 1; last SUBIN; } } if ($self->subsumed_by) { foreach my $subsumer (keys %{$self->subsumed_by}) { if ($in =~ m/$subsumer/) { $running_subsumers{$subsumer}++; if ($running_subsumers{$subsumer} == $self->subsum +ed_by->{$subsumer}) { print STDERR "subsumed by ${subsumer}\n"; $subsumed = 1; last SUBIN; } } } } } close $subsuming_in; return $subsumed; } sub get_entry { my $self = shift; my $entry; my $worksheet = $self->google_db->worksheet({ title => $self->page_name }); # note, the Google Spreadsheet Data API does supply an sq query op +erator # which could be used here, but, as of 0.04 of Net::Google::Spread +sheets # this did not prove to be reliable during the tests. This may be + # a limitation of the Google API rather than Net::Google::Spreadsh +eets # as it appeared that Net::Google::Spreadsheets was submitting val +id, # url encoded queries that the Google system rejected. Instead thi +s software # conducts a full table scan to ensure the correct row is returned if ($worksheet) { my @rows = $worksheet->rows(); ROW: foreach my $row (@rows) { ARG: foreach my $arg (keys %{$self->config->{key_fields}}) + { next ARG if ( !($self->config->{key_fields}->{$arg}->{r +equired}) && !($self->bind_key_fields->{$arg}) ); # skip args that are not required and +not bound next ROW unless ($row->content->{$arg} eq $self->bind_ +key_fields->{$arg}); } $entry = $row; last ROW; } } return $entry; } # this call initiates a race resistant attempt to make sure that there + is only 1 clear 'winner' among N potential # agents attempting to run the same goal on the same spreadsheet agent +'s cell sub run_entry { my $self = shift; my $entry = $self->get_entry(); my $output = ''; foreach my $bound_arg (keys %{$self->bind_key_fields}) { next if (!($self->config->{key_fields}->{$bound_arg}) && !($se +lf->bind_key_fields->{$bound_arg})); $output .= join(' ', $bound_arg, $self->bind_key_fields->{$bou +nd_arg})." "; } unless ($entry) { print STDERR $output." is not supported on ".$self->page_name. +"\n"; return; } unless ($entry->content->{ready}) { print STDERR $output." is not ready to run ".$self->agent_name +."\n"; return {'not_runnable' => 1}; } if ($entry->content->{$self->agent_name}) { my ($status, $running_hostname) = split /\:/, $entry->content- +>{$self->agent_name}; if ($status eq 'r') { print STDERR $output." is already running ".$self->agent_n +ame." on ${running_hostname}\n"; return {'not_runnable' => 1}; } if ($status == 1) { print STDERR $output." has already run ".$self->agent_name +."\n"; return {'not_runnable' => 1}; } if ($status eq 'F') { print STDERR $output." has already Failed ".$self->agent_n +ame." on a previous run and must be investigated on ${running_hostnam +e}\n"; return {'not_runnable' => 1}; } } if ($self->prerequisites) { foreach my $prereq_field (@{$self->prerequisites}) { unless ($entry->content->{$prereq_field} == 1) { print STDERR $output." has not finished ${prereq_field +}\n"; return {'not_runnable' => 1}; } } } my $content = $entry->content; # first attempt to set the hostname of the machine as the value of + the agent my $hostname = Sys::Hostname::hostname; $content->{$self->agent_name} = 'r:'.$hostname; eval { $entry->content($content); }; if ($@) { # this is a collision, which is to be treated as if it is not +runnable print STDERR $output." lost ".$self->agent_name." on ${hostnam +e}\n"; return {'not_runnable' => 1}; } sleep 3; my $nentry; eval { $nentry = $self->get_entry(); }; if ($@) { # this is a collision, which is to be treated as if it is not +runnable print STDERR $output." lost ".$self->agent_name." on ${hostnam +e}\n"; return {'not_runnable' => 1}; } my $check = $nentry->content->{$self->agent_name}; my ($status, $running_hostname) = split /\:/, $check; return $nentry if ($hostname eq $running_hostname); print STDERR $output." lost ".$self->agent_name." on ${hostname}\n +"; return {'not_runnable' => 1}; } sub fail_entry { my $self = shift; my $update_entry = shift; my $entry = $self->get_entry(); my $hostname = Sys::Hostname::hostname; my $content = $entry->content; if ($update_entry) { print STDERR "Updating entry\n"; foreach my $key (keys %{$update_entry}) { $content->{$key} = $update_entry->{$key}; } } $content->{$self->agent_name} = 'F:'.$hostname; $entry->content($content); } sub complete_entry { my $self = shift; my $update_entry = shift; print STDERR "All Complete\n"; my $entry = $self->get_entry(); my $content = $entry->content; if ($update_entry) { print STDERR "Updating entry\n"; foreach my $key (keys %{$update_entry}) { $content->{$key} = $update_entry->{$key}; } } $content->{$self->agent_name} = 1; $entry->content($content); } sub mail_error { my ($self, $error) = @_; my $output = ''; foreach my $bound_arg (keys %{$self->bind_key_fields}) { $output .= join(' ', $bound_arg, $self->bind_key_fields->{$bou +nd_arg})." "; } my $prefix = join(' ', Sys::Hostname::hostname, $output, $self->ag +ent_name); eval { my $mailer = new Net::SMTP::TLS( 'smtp.gmail.com', Hello => 'smtp.gmail. +com', Port => 587, User => $self->confi +g->{guser}, Password => $self->conf +ig->{gpass}); $mailer->mail($self->config->{reply_email}); $mailer->to($self->config->{send_to}); $mailer->data; $mailer->datasend(join("\n", $prefix,$error)); $mailer->dataend; $mailer->quit; }; } 1; # End of IGSP::GoogleAgent __END__ =head1 NAME IGSP::GoogleAgent - A Distributed Agent System using Google Spreadshee +ts =head1 VERSION Version 0.01 =head1 SYNOPSIS use IGSP::GoogleAgent; my $google_agent = IGSP::GoogleAgent->new( agent_name => $goal, page_name => $google_page, debug => $debug, max_selves => $max, bind_key_fields => { 'foo' => 'this_particul +ar_foo' }, prerequisites => [ 'isitdone +', 'isthisone' ], subsumed_by => { 'someother_ +agent.pl' => 3, 'someother_ +process' => 1 } ); $google_agent->run_my(sub { print STDERR "THIS ONE PASSES!!!"; return 1; }); $google_agent->run_my(sub { print STDERR "THIS ONE FAILS AND EITHER + EMAILS OR PRINTS THIS ERROR TO STDERR (depending on debug)!!!"; return; }); $google_agent->run_my(sub { print STDERR "THIS ONE PASSES AND UPDAT +ES THE 'cool' field in the spreadsheet!!!"; return (1, {'cool' => 'really cool'}); }); =head1 DESCRIPTION IGSP::GoogleAgent is a framework for creating massively distributed +pipelines across many different servers, each using the same google spreadshee +t as a control panel. It is extensible, and flexible. It doesnt specify w +hat goals any pipeline should be working towards, or which goals are pre +requisites for other goals, but it does provide logic for easily defining these + relationships based on your own needs. It does this by providing a subsumption ar +chitecture, whereby many small, highly focused agents are written to perform spe +cific goals, and also know what resources they require to perform them. In addit +ion, it is designed from the beginning to support the creation of simple human- +computational workflows. =head1 CONFIGURATION Scripts which use IGSP::GoogleAgents must run in the 'agent_bin' dir +ectory within the same agent root as the 'config' directory where the agent +.conf.yaml file is contained. See that file for more details on what is config +ured. =head1 METHODS =head2 new This method constructs a new instance of an IGSP::GoogleAgent. An in +stance must specify its name, the name of the Worksheet within the spreadsheet th +at it is working off, and values for the required key_field(s) within the conf +iguration which will result in a single row being returned from the given sprea +dsheet. Optionally, you can specify an ArrayRef of prerequisite fields in the + spreadsheet which must be true before the agent can run, whether to print out deb +ug information to the terminal, or email the errors using the configured email only +on errors (default), the maximum number of agents of this name to allow to run on the give +n machine, and a HashRef of processes which, if a certain number are already run +ning on the machine, should cause the agent to exit without running. required: agent_name => Str page_name => Str bind_key_fields => HashRef { key_field_name => bound_value, ... } optional: prerequisites => [] debug => Bool max_selves => Int subsumed_by => { process_name => max_allowed, ... } This method will throw an exception if bind_key_fields are not supplied for required key_fields, as specified in the configuration. Also, there must be a field in the spreadsheet name for the agent_na +me. This field will be filled in with the status of the agent for a part +icular row, e.g. 1 for finished, r:hostname for running, or f:hostname for +failure. =head2 run_my This method takes a subroutine codeRef as an argument. It then chec +ks to determine if the agent needs to run for the given bind_key_field(s) specified +row (it must have a 1 in the 'ready' field for the row, and the agent_name field +must be empty), whether any prerequisite fields are true, whether the agent is subsu +med by something else running on the machine, and whether there are not already max_s +elves other instances of the agent running on the machine. If all of these are +true, it then attempts to fill its hostname into the field for the agent_name. If + it succeeds, it will then run the code_ref. If it does not succeed (such as if a +n instance running on another server already chose that job and won the field) +it exits. The coderef can do almost anything it wants to do, but it must retur +n one of the following: =over 3 =item return true This instructs IGSP::GoogleAgent to place a 1 (true) value in the fi +eld for the agent on the spreadsheet, signifying that it has been completed. =item return false This instructs IGSP::GoogleAgent to place F:hostname into the field +for the agent on the spreadsheet, signifying that it has failed. It will not run again f +or this job until the failure is cleared from the spreadsheet (by any other agent). =item return (true|false, HashRef) This does what returning true or false does, as well as allowing spe +cific fields in the spreadsheet to also be modified by the calling code. The HashRef sh +ould contain keys only for those fields to be updated (it should not attempt to update + the field for the agent_name itself, as this will be ignored). =back In addition, the coderef can print to STDOUT and STDERR. If the age +nt was instantiated in debug mode (true), it will print these to their normal destination. + If the agent was instantiated without debug mode (the default), STDOUT and STDERR are + captured, and, if the codeRef returned false, emailed to the address specified in the +configuration using the same google account that configures access to the google spreadsheet +. One thing the agent must try at all costs to avoid is dying during t +he subref (e.g. use eval for anything that you dont have control over). It should alway +s try to return one of the valid return states so that the spreadsheet status can be upd +ated correctly. =head2 agent_name This returns the name of the agent, in case it is needed by the calli +ng code for other reasons. =head2 debug This returns the debug state specified in the constructor. =head2 google_db This returns the actual Net::Google::Spreadsheet object used by the agent, in case other types of queries, or modifications need to be made that do not fit within this system. =head1 AUTHOR Darin London, C<< <darin.london at duke.edu> >> =head1 BUGS Please report any bugs or feature requests to C<bug-igsp-googleagent a +t rt.cpan.org>, or through the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue= +IGSP-GoogleAgent>. I will be notified, and then you'll automatically be notified of progress on your bug as I make changes. =head1 SUPPORT You can find documentation for this module with the perldoc command. perldoc IGSP::GoogleAgent You can also look for information at: =over 4 =item * RT: CPAN's request tracker L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=IGSP-GoogleAgent> =item * AnnoCPAN: Annotated CPAN documentation L<http://annocpan.org/dist/IGSP-GoogleAgent> =back =head1 SEE ALSO L<Net::Google::Spreadsheets> L<Moose> =head1 COPYRIGHT & LICENSE Copyright 2009 Darin London. This program is free software; you can redistribute it and/or modify i +t under the terms of either: the GNU General Public License as published by the Free Software Foundation; or the Artistic License. See http://dev.perl.org/licenses/ for more information. =head1 PerlMonks Nodes RFC: 798154 Code: 798311 =cut

In reply to Google Spreadsheet Distributed Agent System by dmlond

Title:
Use:  <p> text here (a paragraph) </p>
and:  <code> code here </code>
to format your post; it's "PerlMonks-approved HTML":



  • Posts are HTML formatted. Put <p> </p> tags around your paragraphs. Put <code> </code> tags around your code and data!
  • Read Where should I post X? if you're not absolutely sure you're posting in the right place.
  • Please read these before you post! —
  • Posts may use any of the Perl Monks Approved HTML tags:
    a, abbr, b, big, blockquote, br, caption, center, col, colgroup, dd, del, div, dl, dt, em, font, h1, h2, h3, h4, h5, h6, hr, i, ins, li, ol, p, pre, readmore, small, span, spoiler, strike, strong, sub, sup, table, tbody, td, tfoot, th, thead, tr, tt, u, ul, wbr
  • You may need to use entities for some characters, as follows. (Exception: Within code tags, you can put the characters literally.)
            For:     Use:
    & &amp;
    < &lt;
    > &gt;
    [ &#91;
    ] &#93;
  • Link using PerlMonks shortcuts! What shortcuts can I use for linking?
  • See Writeup Formatting Tips and other pages linked from there for more info.
  • Log In?
    Username:
    Password:

    What's my password?
    Create A New User
    Chatterbox?
    and the web crawler heard nothing...

    How do I use this? | Other CB clients
    Other Users?
    Others contemplating the Monastery: (6)
    As of 2015-07-30 04:08 GMT
    Sections?
    Information?
    Find Nodes?
    Leftovers?
      Voting Booth?

      The top three priorities of my open tasks are (in descending order of likelihood to be worked on) ...









      Results (269 votes), past polls