Beefy Boxes and Bandwidth Generously Provided by pair Networks
Problems? Is your data what you think it is?

Callbacks in async code

by Adamba (Sexton)
on Jun 19, 2013 at 10:02 UTC ( #1039754=perlquestion: print w/replies, xml ) Need Help??
Adamba has asked for the wisdom of the Perl Monks concerning the following question:

I'm trying to make a Queue Manager that get jobs when files are created in a specific folder. I've created my code using AnyEvent so it's async.

My problem is, i'm trying to deliver a return value from the subroutines add_route, and del_route, using callbacks, but the AE::timer won't stop, and the value that the callback gets, won't saved in the variable $return_code. Where have I gone wrong?

#!/usr/bin/perl use strict; use warnings; use AnyEvent; use AnyEvent::Filesys::Notify; use Const::Fast; use DDP; use File::Basename; use File::Copy; use File::Slurp; use FindBin '$Bin'; use List::Util qw(first); use Regexp::Common qw(net); use v5.10.1; const my $true => 1; const my $false => 0; my $cv = AE::cv; my $jobs_folder_path = $Bin . '/jobs'; my $interval = 5; my $after = 10; my %jobs_folders = ( "new" => "$jobs_folder_path/new", "progress" => "$jobs_folder_path/progress", "failed" => "$jobs_folder_path/failed", ); my $notifier = AnyEvent::Filesys::Notify->new( dirs => [ $jobs_folders{'new'} ], interval => $interval, cb => sub { my (@events) = @_; for my $event (@events) { if ($event->is_created) { process_new_job($event->path); } } } ); my $timer = AE::timer $after, $interval, sub { my @files = read_dir($jobs_folders{'progress'}, prefix => $true); if (@files) { foreach my $file (@files) { my $file_name = basename($file); my $line = read_file($file); for ($file_name) { when (/add/) { my ($ip_address, $next_hop) = split(/ /, $line); my $return_code; my $cb = sub { my $ret_val = shift; $return_code = $ret_val; }; add_route($ip_address, $next_hop, $cb); print $return_code; #post_job_process($return_code, $file_name); } when (/del/) { my ($ip_address) = $line; my $return_code; my $cb = sub { my $ret_val = shift; $return_code = $ret_val; }; del_route($ip_address, $cb); print $return_code, "\n"; #post_job_process($return_code, $file_name); } } } } }; $cv->recv; sub process_new_job { my ($new_job) = shift; my $file_name = basename($new_job); move("$jobs_folders{'new'}/$file_name", "$jobs_folders{'progress'} +/$file_name"); } sub post_job_process { my ($return_code, $file_name) = @_; if ($return_code == $false) { move("$jobs_folders{'progress'}/$file_name", "$jobs_folders{'f +ailed'}/$file_name"); send_email(); } } sub send_email { print "Sending Email...\n"; } sub add_route { my ($ip_address, $next_hop, $cb) = @_; my $attempt = 0; my $sleep = 10; my $add_timer; $add_timer = AE::timer 0, $sleep, sub { if ($attempt++ >= 3) { undef $add_timer; $cb->($false); } print "$attempt. Adding Route $ip_address via $next_hop\n"; my @addresses = get_routing_table(); my ($comparable_ip) = $ip_address =~ /($RE{net}{IPv4})\/32$/; my $is_in_routing_table = first { $_->{'ip_address'} eq $compa +rable_ip } @addresses; if ($is_in_routing_table) { undef $add_timer; $cb->($true); } }; } sub del_route { my ($ip_address, $cb) = @_; my $attempt = 0; my $sleep = 10; my $delete_timer; $delete_timer = AE::timer 0, $sleep, sub { if ($attempt++ >= 3) { undef $delete_timer; $cb->($false); } print "$attempt. Deleting Route $ip_address\n"; my @addresses = get_routing_table(); my ($comparable_ip) = $ip_address =~ /^($RE{net}{IPv4})\/32/; my $is_in_routing_table = first { $_->{'ip_address'} eq $compa +rable_ip } @addresses; if (not $is_in_routing_table) { undef $delete_timer; $cb->($true); } }; } sub get_routing_table { #my @routing_table = `ip ro`; my @routing_table = ( ' dev lo proto kernel scope link src', ' via dev eth0 proto baba', ); my @ret_val; foreach my $line (@routing_table) { my ($ip_address, $next_hop) = $line =~ /^($RE{net}{IPv4}) via +($RE{net}{IPv4}) .*proto baba$/; if (defined ($ip_address) and defined ($next_hop)) { push @ret_val, { ip_address => $ip_address, next_hop => $n +ext_hop }; } } return @ret_val; }

Replies are listed 'Best First'.
Re: Callbacks in async code
by Corion (Pope) on Jun 19, 2013 at 12:11 UTC

    You seem to be confused about the order of execution here:

    ... my $return_code; my $cb = sub { my $ret_val = shift; $return_code = $ret_val; }; add_route($ip_address, $next_hop, $cb); print $return_code; #post_job_process($return_code, $file_name); ...

    add_route() will basically return immediately, and you print out $return_code immediately. What you seem to want is to print out $return_code in your callback instead.

    As an alternative to printing in the callback, you might want to ->send the return code using a condvar, and ->recv that somewhere up the call stack where you actually need it.

Re: Callbacks in async code
by choroba (Chancellor) on Jun 19, 2013 at 12:03 UTC
    Crossposted at StackOverflow. It is considered polite to inform about crossposting, so that people not attending both sites do not waste their efforts solving a problem already solved at the other end of the wires.
    لսႽ ᥲᥒ⚪⟊Ⴙᘓᖇ Ꮅᘓᖇ⎱ Ⴙᥲ𝇋ƙᘓᖇ

Log In?

What's my password?
Create A New User
Node Status?
node history
Node Type: perlquestion [id://1039754]
Approved by Happy-the-monk
and the monks are chillaxin'...

How do I use this? | Other CB clients
Other Users?
Others drinking their drinks and smoking their pipes about the Monastery: (4)
As of 2016-12-03 14:23 GMT
Find Nodes?
    Voting Booth?
    On a regular basis, I'm most likely to spy upon:

    Results (56 votes). Check out past polls.