#!/usr/bin/perl

# http://perlmonks.org/?node_id=1167900
#
# Demo: two independent counters drawn with Curses, each driven by its own
# Async::Tiny repeat timer.  The first data read from STDIN ends the program.

use Async::Tiny; # http://72.211.166.141:8080/async-tiny.tgz
use Curses;
use strict;
use warnings;

my $t = Async::Tiny->new;

# The read callback simply dies; the eval around eventloop() below catches
# that, so endwin() still runs and the terminal is restored.
$t->addReadCallback( *STDIN, sub { die } );
$t->changeReadMode( *STDIN, 'character' );

# Each timer gets a screen row and its own one-element state array.
$t->addRepeatCallback( 0.100, \&counter, 5, [0] );
$t->addRepeatCallback( 0.110, \&counter, 15, [1000] );

# Timer callback: increment the counter kept in $statearrayref->[0] and draw
# it on row $pos, column 5.
sub counter {
    my ($pos, $statearrayref) = @_;
    move($pos, 5);
    addstr(++$statearrayref->[0]);
    refresh();
}

initscr();
clear();
curs_set(0);
box( ACS_VLINE, ACS_HLINE );
eval { $t->eventloop };
endwin();

####
# NOTE(review): the "####" above looks like the boundary between two files
# (the demo script above, the Async::Tiny module below, which the script
# loads with "use Async::Tiny") -- confirm before treating this as one file.

package Async::Tiny; ##################################################

use Time::HiRes qw(time);
use IO::Ppoll qw( POLLIN POLLOUT );
#use IO::Poll qw( POLLIN POLLOUT ); # if you don't have IO::Ppoll
use IO::Socket;
use warnings;
use strict;

# All loop state lives in these file-scoped lexicals, so every object
# returned by new() shares the same timers, handles and poll object.
#   @timerqueue - events [ time, callback, args... ], sorted by time
#   %reads      - handle => { callback, args, mode, data | packets }
#   %writes     - handle => { data | packets, shutdown, error, errorargs }
#   %listens    - handle => { callback, args }
#   $poll       - the poll object created by new()
#   $wait       - optional callback run just before the loop blocks
my (@timerqueue, %reads, %writes, %listens, $poll, $wait);
my $udp = 17; # or not :)

# True when the handle stringifies like an IO::Socket object and its protocol
# number equals $udp.  "// 0" keeps a socket that reports no protocol from
# triggering an "uninitialized value" warning in the numeric comparison.
sub isudp { "$_[0]" =~ /IO::Socket/ && ($_[0]->protocol // 0) == $udp }

# Add the event bits $bits to whatever $handle is already polled for.
# A handle that is not registered yet has no mask, hence the "// 0".
sub _mask_add {
    my ($handle, $bits) = @_;
    $poll->mask($handle => $bits | ($poll->mask($handle) // 0));
}

# Remove the event bits $bits from the mask $handle is polled for.
sub _mask_del {
    my ($handle, $bits) = @_;
    $poll->mask($handle => ~(0 + $bits) & ($poll->mask($handle) // 0));
}

# Constructor.  Creates a fresh poll object; the returned object itself
# carries no state (see the note on the file-scoped lexicals above).
sub new {
    $poll = IO::Ppoll->new;
    #$poll = IO::Poll->new;
    my $self = shift;
    bless { }, ref $self || $self;
}

# Return a one-line summary of how many timers, read, listen and write
# entries and polled handles currently exist.
sub status {
    my $t = scalar @timerqueue;
    my $r = scalar keys %reads;
    my $l = scalar keys %listens;
    my $w = scalar keys %writes;
    my $h = $poll->handles;
    return "timers: $t reads: $r listens: $l writes: $w handles: $h\n";
}

# Set (or, with undef, clear) the callback that eventloop() runs just before
# it blocks in poll.
sub addWaitCallback { (undef, $wait) = @_ }

# Register CALLBACK(DATA, @args) for input on $handle.  The read mode starts
# out as '' (line by line); see changeReadMode.
sub addReadCallback {
    my ($self, $handle, $callback, @args) = @_;
    _mask_add($handle, POLLIN); # add read
    $reads{$handle} = {
        callback => $callback,
        args     => \@args,
        mode     => '',
        ( isudp($handle) ? (packets => []) : (data => '') ),
    };
}

# Change how input on $handle is delivered: a mode starting with "char"
# delivers every chunk as read, one starting with "full" delivers the whole
# buffer once at EOF, anything else delivers "\n"-terminated lines.
sub changeReadMode {
    my ($self, $handle, $mode) = @_;
    $reads{$handle}{mode} = $mode;
}

# With a CALLBACK: start listening and call CALLBACK(NEWCONNECTION, @args)
# for every incoming connection.  $socket may be
#   - a plain scalar: "addr:port" (LocalAddr) or a port (LocalPort),
#   - a hash ref of IO::Socket::INET arguments (Listen/Reuse defaulted),
#   - an already created listen socket.
# Without a CALLBACK: stop polling $socket and forget its listen callback.
# Dies with $@ if the listen socket cannot be created.
sub addListenCallback {
    my ($self, $socket, $callback, @args) = @_;
    if( defined $callback ) {
        ref $socket
            or $socket = { ($socket =~ /:/ ? 'LocalAddr' : 'LocalPort') => $socket }; # scalar is port
        if( ref $socket eq 'HASH' ) {
            my %opts = ( Listen => 10, Reuse => 1, %$socket ); # add defaults
            $socket = IO::Socket::INET->new(%opts) or die $@;
        }
        _mask_add($socket, POLLIN); # add read
        $listens{$socket} = { callback => $callback, args => \@args };
    }
    else {
        $poll->remove($socket);
        delete $listens{$socket};
    }
}

# Binary insertion into the timer queue.  Each event is
# [ time, callback, args... ]; an event is placed after any existing events
# with the same time, so equal-time events keep their insertion order.
sub queue {
    for my $event ( @_ ) {
        my $time = $event->[0];
        my $low  = 0;
        my $high = @timerqueue;
        while( $low < $high ) {
            my $mid = ($low + $high) >> 1;
            if( $timerqueue[$mid][0] <= $time ) {
                $low = $mid + 1;
            }
            else {
                $high = $mid;
            }
        }
        splice @timerqueue, $low, 0, $event;
    }
}

# Timer trampoline for addRepeatCallback: run the callback and queue it
# again $delay seconds from now, unless it returned a string matching
# /end *repeat/i.
sub repeater {
    my ($delay, $callback, @args) = @_;
    my $result = $callback->(@args);
    ($result // '') =~ /end *repeat/i
        or queue([ $delay + time(), \&repeater, $delay, $callback, @args ]);
}

# Call CALLBACK(@args) every $delay seconds, first after $delay seconds.
sub addRepeatCallback {
    my ($self, $delay, $callback, @args) = @_;
    queue([ $delay + time(), \&repeater, $delay, $callback, @args ]);
}

# Call CALLBACK(@args) once, $delay seconds from now.
sub addDelayCallback {
    my ($self, $delay, $callback, @args) = @_;
    queue([ $delay + time(), $callback, @args ]);
}

# Queue @data for writing to $handle (or to every handle in an array ref).
# UDP sockets get one packet per element; anything else gets the elements
# joined into one byte buffer.  Calling it with no data on an IO::Socket
# requests shutdown(1) once the buffer has drained.
sub write {
    my ($self, $handle, @data) = @_;
    for my $fh (ref $handle eq 'ARRAY' ? @$handle : $handle) {
        if( isudp($fh) ) {
            $writes{$fh} ||= { packets => [] };
            push @{ $writes{$fh}{packets} }, @data;
            @{ $writes{$fh}{packets} } and _mask_add($fh, POLLOUT); # add out
        }
        else {
            $writes{$fh} ||= { data => '' };
            $writes{$fh}{data} .= join '', @data;
            $writes{$fh}{shutdown} = @data == 0 && "$fh" =~ /IO::Socket/;
            # Also poll for writability when only a shutdown is pending;
            # otherwise a shutdown requested on an idle socket would never be
            # carried out and its %writes entry would keep the loop alive.
            (length $writes{$fh}{data} or $writes{$fh}{shutdown})
                and _mask_add($fh, POLLOUT); # add out
        }
    }
}

# Install CALLBACK(ERRORTEXT, @args) for write errors on $handle.  Only has
# an effect while $handle has a pending write entry, because the callback is
# stored in that entry.
sub addErrorCallback {
    my ($self, $handle, $callback, @args) = @_;
    if( exists $writes{$handle} ) {
        $writes{$handle}{error}     = $callback;
        $writes{$handle}{errorargs} = \@args;
    }
}

# Timer trampoline for addMtimeCallback: call the callback when $file's
# mtime has advanced past $oldtime, then queue the next check unless the
# callback returned a string matching /end *repeat/i.  A file that cannot be
# stat'ed counts as mtime 0 and never triggers the callback.
sub mtimecheck {
    my ($interval, $file, $oldtime, $callback, @args) = @_;
    my $mtime  = (stat $file)[9] || 0;
    my $return = $mtime && $oldtime < $mtime && $callback->(@args);
    (defined $return && $return =~ /end *repeat/i)
        or queue([ $interval + time(), \&mtimecheck,
                   $interval, $file, $mtime, $callback, @args ]);
}

# Check $file every $interval seconds and call CALLBACK(@args) whenever its
# mtime has increased since the previous check.
sub addMtimeCallback {
    my ($self, $interval, $file, $callback, @args) = @_;
    # "|| 0" matches mtimecheck: a file that does not exist yet starts at 0
    # instead of undef, which would warn in the numeric comparison later.
    my $mtime = (stat $file)[9] || 0;
    queue([ $interval + time(), \&mtimecheck,
            $interval, $file, $mtime, $callback, @args ]);
}

# Everything queued for $fh has been handled: perform the requested
# shutdown, stop polling for writability and drop the write entry.
sub _finish_write {
    my ($fh) = @_;
    $writes{$fh}{shutdown} and $fh->shutdown(1);
    _mask_del($fh, POLLOUT); # del write
    delete $writes{$fh};
}

# Handle one handle that poll reported as writable.
sub _service_writable {
    my ($fh) = @_;

    if( !defined $fh or not exists $writes{$fh} ) {
        warn "leftover write handle";
        defined $fh and _mask_del($fh, POLLOUT); # del write
        return;
    }

    if( isudp($fh) ) {
        if( @{ $writes{$fh}{packets} } ) {
            # one packet per writable event
            $fh->send( shift @{ $writes{$fh}{packets} } ) or die "send error $!";
        }
        else {
            _mask_del($fh, POLLOUT); # del write
            delete $writes{$fh}; # no packet to write
        }
        return;
    }

    # tcp or other
    my $have = length $writes{$fh}{data};
    if( defined $have and $have > 0 ) {
        my $len = syswrite $fh, $writes{$fh}{data};
        if( not defined $len ) {
            if( $writes{$fh}{error} ) {
                $writes{$fh}{error}->("$!", @{ $writes{$fh}{errorargs} });
            }
            else {
                warn "write error $!";
            }
        }
        elsif( $len == $have ) {
            _finish_write($fh); # all data has been written
        }
        elsif( $len > 0 ) {
            substr $writes{$fh}{data}, 0, $len, ''; # keep the unwritten tail
        }
        else {
            die "zero length write";
        }
    }
    else {
        _finish_write($fh); # no data to write
        #die "had no data to write";
    }
}

# Handle one handle that poll reported as readable.
sub _service_readable {
    my ($fh) = @_;

    if( not (exists $reads{$fh} || exists $listens{$fh}) ) {
        warn "leftover read handle";
        $poll->mask($fh => 0);
        return;
    }

    if( isudp($fh) ) {
        $fh->recv(my $packet, 1500);
        $reads{$fh}{callback}->( $packet, @{ $reads{$fh}{args} } );
        return;
    }

    if( exists $listens{$fh} ) { # new tcp connection
        $listens{$fh}{callback}->( scalar($fh->accept), @{ $listens{$fh}{args} } );
        return;
    }

    # Append to the handle's buffer.
    my $got = sysread $fh, $reads{$fh}{data}, 8192, length $reads{$fh}{data};

    if( $got ) {
        if( $reads{$fh}{mode} =~ /^char/i ) {
            $reads{$fh}{callback}->( $reads{$fh}{data}, @{ $reads{$fh}{args} } );
            $reads{$fh}{data} = '';
        }
        elsif( $reads{$fh}{mode} =~ /^full/i ) {
            # no callbacks until eof
        }
        else {
            while( $reads{$fh}{data} =~ s/(.*\n)// ) {
                # Copy $1 first: passing it directly would alias a variable
                # that any regex inside the callback overwrites.
                my $line = $1;
                $reads{$fh}{callback}->( $line, @{ $reads{$fh}{args} } );
            }
        }
    }
    elsif( !defined $got and ($!{EINTR} or $!{EAGAIN}) ) {
        # Transient failure, not end of file: keep the handle registered and
        # let the next poll report it again.
    }
    else { # end of file (or a hard read error)
        # 'full' mode delivers the whole buffer now; the other modes signal
        # EOF with an empty string (an unterminated last line is dropped).
        $reads{$fh}{callback}->(
            $reads{$fh}{mode} =~ /^full/i ? $reads{$fh}{data} : '',
            @{ $reads{$fh}{args} } );
        $poll->remove($fh);
        delete $reads{$fh};
        delete $writes{$fh}; # dump output if EOF
    }
}

# Run until no timers, reads, writes or listens are left.  Each pass waits in
# poll until the earliest timer is due, services writable and then readable
# handles, and finally runs at most one expired timer.
sub eventloop #########################################################
{
    while( @timerqueue || %reads || %writes || %listens ) {
        my $waitfor = @timerqueue ? $timerqueue[0][0] - time() : 1e6;
        $waitfor < 0 and $waitfor = 0;
        defined $wait and $waitfor > 0 and $wait->();

        ############################################################
        my $cnt = $poll->poll($waitfor); # wait for next queued time
        ############################################################

        if( defined $cnt and $cnt > 0 ) {
            _service_writable($_) for $poll->handles(POLLOUT); # ready writes
            _service_readable($_) for $poll->handles(POLLIN);  # ready reads
        }

        $waitfor = @timerqueue ? $timerqueue[0][0] - time() : 1e6;
        if( $waitfor <= 0 ) { # a timer has expired
            my (undef, $callback, @args) = @{ shift @timerqueue };
            $callback->(@args);
        }
    }
}

1; # return true for this module

__END__

=head1 NAME

Async::Tiny - Tiny? async eventloop module mostly for example purposes

=head1 SYNOPSIS

    use Async::Tiny;
    my $tiny = Async::Tiny->new;
    $tiny->addReadCallback( *STDIN, sub { print "input: @_" } );
    $tiny->addDelayCallback( 10, sub { print "stop\n" } );
    $tiny->addDelayCallback( 0, sub { print "start\n" } );
    $tiny->eventloop;

=head1 DESCRIPTION

Async::Tiny implements a simple "select"-style (poll based) async/callback
kernel (sort of like POE or AnyEvent, maybe) that handles the messy work and
lets the user just define callbacks to handle his special needs.

=head1 METHODS

=over 4

=item new

Constructor.

=item addDelayCallback ( DELAY, CALLBACK, ARGS )

Add a timer to expire after DELAY seconds.
CALLBACK(ARGS)

=item addRepeatCallback ( INTERVAL, CALLBACK, ARGS )

Add a timer that does callback every INTERVAL seconds.
The repeat stops when CALLBACK returns a string matching /end *repeat/i.
CALLBACK(ARGS)

=item addReadCallback ( HANDLE, CALLBACK, ARGS )

Add a callback for each line of text (but see below) input on HANDLE.
CALLBACK(LINE, ARGS)

=item changeReadMode ( HANDLE, MODE )

There are three read modes:

    'character'  - return all new characters
    'linebyline' - return each "\n" terminated line (default)
    'full'       - only return once at EOF, with complete buffer

=item addListenCallback ( HANDLEorPORT, CALLBACK, ARGS )

Add a callback for new connections on a listen socket.
CALLBACK(NEWCONNECTION, ARGS)

=item write ( HANDLE, DATA )

Queues DATA for writing to HANDLE.

=item write ( [HANDLES], DATA )

Queues DATA for writing to all HANDLES in []. (Think IRCD)

=item write ( HANDLE )

With no DATA, perform HANDLE->shutdown(1) after all queued data is written.
A [HANDLES] list is accepted here as well.

=item addWaitCallback ( CALLBACK )

Specify a callback to be called just before the event loop blocks in its
poll call.
This can be used to update the display in a GUI.
CALLBACK()

=item eventloop

Start the event loop processing.
The event loop ends when no callbacks are left.

=back

=head1 EXAMPLE tinyirc.pl

    #!/usr/bin/perl

    # tinyirc.pl - simple tiny irc client Ppoll

    use Async::Tiny;
    use IO::Socket;
    use strict;

    my $t = Async::Tiny->new;

    sub sendline # works for both directions :)
      {
      my ($line, $to) = @_;
      $line eq '' ? exit : $t->write( $to, $line );
      }

    my $s = IO::Socket::INET->new(shift // 'localhost:6667') or die $@;
    $t->addReadCallback( *STDIN, \&sendline, $s );
    $t->addReadCallback( $s, \&sendline, *STDOUT );
    $t->eventloop;

=head1 EXAMPLE tinyircd.pl

    #!/usr/bin/perl

    # tinyircd.pl - minimal tiny ircd with Async::Tiny with Ppoll
    # multiple telnets or tinyirc.pl to it, echoes to the others (*very* basic ircd)

    use Async::Tiny;
    use strict;

    my %clients;
    my $t = Async::Tiny->new;

    for my $port (@ARGV ? @ARGV : 6667)
      {
      $t->addListenCallback($port, sub
        {
        my ($socket) = @_;
        $clients{$socket} = $socket;
        $t->addReadCallback($socket, \&clientline, $socket );
        });
      $t->addDelayCallback(0, sub{print "listening on $port...\n"} );
      }

    $t->addRepeatCallback( 600, sub { print localtime() .
      " clients: @{[ scalar keys %clients, times ]}\n"});

    $t->eventloop;

    sub clientline
      {
      my ($line, $me) = @_;
      if( $line eq '' )
        {
        delete $clients{$me}; # was closed
        }
      else
        {
        $t->write( [grep $me != $_, values %clients], $line);
        }
      }

=head1 EXAMPLE math.pl

    #!/usr/bin/perl
    #
    # math.pl - a math server using Async::Tiny Ppoll

    use Async::Tiny;
    use strict;

    my $t = Async::Tiny->new;

    $t->addListenCallback( shift // 8081, sub
      {
      my ($socket) = @_;
      $t->addReadCallback( $socket, sub
        {
        my ($line) = @_;
        tr#0-9()*/+-# #c for $line; # strip non-math for safety :)
        $t->write( $socket,
          eval $line // "$line is an invalid expression $@", "\n");
        });
      });

    $t->eventloop;

=head1 EXAMPLE key.pl

    #!/usr/bin/perl
    #
    # key.pl - read single key-at-a-time with Async::Tiny Ppoll

    use Term::ReadKey;
    use Async::Tiny;
    use Data::Dump;
    use strict;
    $| = 1;

    my $t = Async::Tiny->new;

    $t->addReadCallback( *STDIN, sub
      {
      my ($string) = @_;
      dd $string;
      #printf "%vd\n", $string;
      $string =~ /^[q\e]$/i and die "quit character entered\n";
      });
    $t->changeReadMode( *STDIN, 'character' );

    $t->addListenCallback( 8081, sub
      {
      $t->addReadCallback( $_[0], sub { print @_ });
      });

    my $count = 0;
    $t->addRepeatCallback( 60, sub { print ++$count, " tick\n" });

    ReadMode 'raw';
    eval { $t->eventloop };
    my $answer = $@;
    ReadMode 'restore';
    print $answer;

=head1 EXAMPLE curses.pl

    #!/usr/bin/perl
    #
    # curses.pl - client using Async::Tiny Ppoll

    use Curses;
    use Term::ReadKey;
    use Async::Tiny;
    use IO::Socket;
    use Data::Dump qw(pp dd);
    use strict;
    $| = 1;

    my @lines;
    my $input = '';
    my ($width, $height) = GetTerminalSize;
    my $t = Async::Tiny->new;

    $t->addWaitCallback( sub
      {
      ($width, $height) = GetTerminalSize;
      my $row = 0;
      for (
        @lines = ( ('') x $height, @lines )[2 - $height .. -1], # top part
        '#' x ($width - 25) . ' ' . localtime(), # divider line
        $input) # bottom line
        {
        addstr($row++, 0, substr $_, 0, $width);
        clrtoeol;
        }
      refresh;
      });

    my $s = IO::Socket::INET->new(shift // 'arch:8081') or die $@;
    $t->addReadCallback( $s, sub
      {
      my ($line) = @_;
      $line eq '' and die "server closed connection\n";
      push @lines, "< $line" =~ s/\n//r;
      });

    $t->addReadCallback( *STDIN, sub
      {
      my ($string) = @_;
      #dd $string;
      for ($string =~ /\e(?:\[M...|[O\[][0-9;]*[A-~])|./gs ) # keep esc seq together
        {
        /^[ -~]\z/ ? $input .= $_ :
        /^[\cc\cd\e]\z/ ? die "quit character @{[ pp $_ ]} entered\n" :
        /^[\010\177]\z/ ? chop $input :
        /^[\r\n]\z/ ? do
          {
          if( length $input )
            {
            push @lines, " > $input";
            $t->write( $s, $input, "\n" );
            $input = '';
            }
          } :
        do { $input .= pp $_ };
        $input &= "\xff" x ($width - 3); # trim to size
        }
      });
    $t->changeReadMode( *STDIN, 'character' );

    $t->addRepeatCallback( 1, sub { }); # for incrementing clock
    $t->addRepeatCallback( 3600, sub { push @lines, ' ' . localtime() } );

    initscr();
    clear;
    ReadMode 'cbreak';
    mousemask(BUTTON1_CLICKED, my $oldmask);
    eval { $t->eventloop };
    my $errormsg = $@;
    ReadMode 'restore';
    endwin();
    print $errormsg;

=head1 EXAMPLE animate.pl

    #!/usr/bin/perl
    #
    # animate.pl - spin things Ppoll

    use Async::Tiny;
    use strict;
    $| = 1;

    my $t = Async::Tiny->new;
    my $test = shift // 5;
    my $count = 0;

    if($test == 1 || $test > 4)
      {
      print "test 1 - auto repeat\n";
      $t->addRepeatCallback( 0.05, sub
        {
        print "\r", qw(- \\ | /)[$count % 4];
        ++$count < 60 or 'endrepeat';
        });
      }

    if($test == 2 || $test > 4)
      {
      print "test 2 - chained delays\n";
      sub tick
        {
        print "\r", qw(- \\ | /)[$count % 4];
        ++$count < 60 and $t->addDelayCallback( .05, \&tick );
        }
      $t->addDelayCallback( 0, \&tick );
      }

    if($test == 3 || $test > 4)
      {
      print "test 3 - queue all up beforehand\n";
      for (0..59)
        {
        my $char = qw(- \\ | /)[$_ % 4];
        $t->addDelayCallback( $_ / 20, sub { print "\r$char" });
        }
      }

    if($test == 4 || $test > 4)
      {
      print "test 4 - dual auto repeat\n";
      my $secondcount = 0;
      $t->addRepeatCallback( 0.12, sub
        {
        print "\r", map qw(- \\ | /)[$_ % 4], $count, $secondcount;
        ++$count < 3/.12 or 'endrepeat';
        });
      $t->addRepeatCallback( 0.05, sub
        {
        print "\b", qw(- \\ | /)[$secondcount % 4];
        ++$secondcount < 3/.05 or 'endrepeat';
        });
      }

    $t->eventloop;
    print "\n";

=head1 EXAMPLE bars.pl

    #!/usr/bin/perl
    #
    # bars.pl - multiple independent timers using Async::Tiny Ppoll

    use Curses;
    use Term::ReadKey;
    use Async::Tiny;
    use strict;
    $| = 1;

    my ($width, $height) = GetTerminalSize;
    my @lines = ( '-' x $width ) x $height;
    my $t = Async::Tiny->new;

    $t->addWaitCallback( sub
      {
      my $row = 0;
      for ( @lines )
        {
        addstr($row++, 0, substr $_, 0, $width);
        clrtoeol;
        }
      refresh;
      });

    for my $line (@lines)
      {
      $t->addRepeatCallback( (3 + rand 20) / 50 , sub
        {
        s/-/#/ or s/#(?!#)/=/ or tr/=/-/ for $line;
        (times)[0] + rand() > 2 and 'end repeat';
        } );
      }

    initscr();
    clear;
    $t->eventloop;
    endwin();

=head1 EXAMPLE echoall.pl

    #!/usr/bin/perl

    # Async::Tiny with Ppoll version of
    # https://blog.afoolishmanifesto.com/posts/concurrency-and-async-in-perl/

    use experimental 'signatures';
    use Async::Tiny;
    use strict;

    my %clients;
    my $t = Async::Tiny->new;

    $t->addListenCallback( 9935, sub ($socket)
      {
      $t->addReadCallback( $clients{$socket} = $socket, sub ($line, $me)
        {
        $t->write( $me, $line);
        $line eq '' and delete $clients{$me}; # was closed
        }, $socket );
      });

    $t->addRepeatCallback(5, sub {$t->write([ values %clients ], "ping!\n" )});
    $t->addDelayCallback(0, sub { warn "ready on port 9935\n" });
    #$t->addRepeatCallback(60, sub { warn $t->status });

    $t->eventloop;

=cut