Beefy Boxes and Bandwidth Generously Provided by pair Networks
good chemistry is complicated,
and a little bit messy -LW
 
PerlMonks  

Re^2: Weird Output with Threads and NCurses

by Anonymous Monk
on Jul 18, 2016 at 15:19 UTC ( #1167978=note: print w/replies, xml ) Need Help??


in reply to Re: Weird Output with Threads and NCurses
in thread Weird Output with Threads and NCurses

By popular (at least one) request:

#!/usr/bin/perl

# http://perlmonks.org/?node_id=1167900

use strict;
use warnings;
use Async::Tiny;    # http://72.211.166.141:8080/async-tiny.tgz
use Curses;

# Draw the next value of one counter at column 5 of its screen row.
#
# Arguments: the screen row to draw on, and an array ref whose first
# element holds the running count for this particular counter.
sub counter {
    my ( $row, $count ) = @_;
    move( $row, 5 );
    addstr( ++$count->[0] );
    return refresh();
}

my $loop = Async::Tiny->new;

# Any input on STDIN leaves the event loop by dying out of it.
$loop->addReadCallback( *STDIN, sub { die } );
$loop->changeReadMode( *STDIN, 'character' );

# Two independent counters sharing one callback; each carries its own state.
$loop->addRepeatCallback( 0.100, \&counter, 5,  [0] );
$loop->addRepeatCallback( 0.110, \&counter, 15, [1000] );

initscr();
clear();
curs_set(0);
box( ACS_VLINE, ACS_HLINE );
eval { $loop->eventloop };    # the die from the STDIN callback lands here
endwin();

And here's Async::Tiny

package Async::Tiny;

# Async::Tiny - a small single-threaded event loop built on poll().
#
# All state (timer queue, handle tables, poll object) lives in file-scoped
# lexicals, so every Async::Tiny object drives the same loop, and calling
# new() again replaces the poll object.

use strict;
use warnings;

use Time::HiRes qw(time);
use IO::Socket;

# Prefer IO::Ppoll; fall back to the core IO::Poll when it is not installed.
# The original listing named IO::Poll as the replacement to use "if you
# don't have IO::Ppoll"; only mask/poll/handles/remove are called below.
my $poll_class;

BEGIN {
    if ( eval { require IO::Ppoll; 1 } ) {
        $poll_class = 'IO::Ppoll';
    }
    else {
        require IO::Poll;
        $poll_class = 'IO::Poll';
    }
    $poll_class->import(qw( POLLIN POLLOUT ));
}

# @timerqueue : [ due_time, callback, @args ] entries, sorted by due_time
# %reads      : stringified handle => { callback, args, mode, data|packets }
# %writes     : stringified handle => { data|packets, shutdown, error, errorargs }
# %listens    : stringified handle => { callback, args }
# $poll       : the poll object created by new()
# $wait       : optional callback run just before blocking in poll
my ( @timerqueue, %reads, %writes, %listens, $poll, $wait );
my $udp = 17;    # IP protocol number for UDP (or not :)

# Add event bits to a handle's poll mask.  A handle that is not registered
# yet has no current mask, hence the "// 0" (avoids an uninitialized warning).
sub _add_mask {
    my ( $fh, $bits ) = @_;
    $poll->mask( $fh => $bits | ( $poll->mask($fh) // 0 ) );
    return;
}

# Clear event bits from a handle's poll mask.
sub _del_mask {
    my ( $fh, $bits ) = @_;
    $poll->mask( $fh => ~$bits & ( $poll->mask($fh) // 0 ) );
    return;
}

# True when the handle is an IO::Socket whose protocol number is UDP.
# A socket that reports no protocol is treated as "not UDP".
sub isudp {
    my ($fh) = @_;
    return "$fh" =~ /IO::Socket/ && ( $fh->protocol // 0 ) == $udp;
}

# Constructor.  Creates a fresh poll object shared by the whole module.
sub new {
    my $self = shift;
    $poll = $poll_class->new;
    #$poll = IO::Poll->new;
    return bless {}, ref($self) || $self;
}

# One-line summary of how many timers and handles are currently registered.
sub status {
    my $t = @timerqueue;
    my $r = keys %reads;
    my $l = keys %listens;
    my $w = keys %writes;
    my $h = $poll->handles;
    return "timers: $t reads: $r listens: $l writes: $w handles: $h\n";
}

# Register the callback that eventloop runs just before it blocks.
sub addWaitCallback { ( undef, $wait ) = @_ }

# Register a read callback on a handle.  UDP sockets get a packet list,
# everything else gets a string buffer.  The default mode ('') is line mode.
sub addReadCallback {
    my ( $self, $handle, $callback, @args ) = @_;
    _add_mask( $handle, POLLIN );    # add read
    $reads{$handle} = {
        callback => $callback,
        args     => \@args,
        mode     => '',
        ( isudp($handle) ? ( packets => [] ) : ( data => '' ) ),
    };
}

# Switch how buffered input is handed to the read callback; eventloop
# matches the mode against /^char/i and /^full/i, anything else is line mode.
sub changeReadMode {
    my ( $self, $handle, $mode ) = @_;
    $reads{$handle}{mode} = $mode;
}

# Register (CALLBACK defined) or remove (CALLBACK undefined) a listen socket.
# SOCKET may be an existing socket, a hash of IO::Socket::INET arguments, or
# a plain scalar: "host:port" becomes LocalAddr, anything else LocalPort.
sub addListenCallback {
    my ( $self, $socket, $callback, @args ) = @_;
    if ( defined $callback ) {
        ref $socket
            or $socket =
            { ( $socket =~ /:/ ? 'LocalAddr' : 'LocalPort' ) => $socket }
            ;    # scalar is port
        if ( ref $socket eq 'HASH' ) {
            my %args = ( Listen => 10, Reuse => 1, %$socket );  # add defaults
            $socket = IO::Socket::INET->new(%args) or die $@;
        }
        _add_mask( $socket, POLLIN );    # add read
        $listens{$socket} = { callback => $callback, args => \@args };
    }
    else {
        $poll->remove($socket);
        delete $listens{$socket};
    }
}

# Binary insertion of events into the timer queue, keeping it sorted by due
# time.  An event due at the same time as existing ones goes after them.
sub queue {
    for my $event (@_) {
        my $time = $event->[0];
        my $low  = 0;
        my $high = @timerqueue;
        while ( $low < $high ) {
            my $mid = ( $low + $high ) >> 1;
            if ( $timerqueue[$mid][0] <= $time ) {
                $low = $mid + 1;
            }
            else {
                $high = $mid;
            }
        }
        splice @timerqueue, $low, 0, $event;
    }
    return;
}

# Timer target for repeating callbacks: run the callback and re-queue
# ourselves unless it returned a string matching /end *repeat/i.
sub repeater {
    my ( $delay, $callback, @args ) = @_;
    ( $callback->(@args) // '' ) =~ /end *repeat/i
        or queue( [ $delay + time(), \&repeater, $delay, $callback, @args ] );
}

# Run CALLBACK(ARGS) every DELAY seconds, starting DELAY seconds from now.
sub addRepeatCallback {
    my ( $self, $delay, $callback, @args ) = @_;
    queue( [ $delay + time(), \&repeater, $delay, $callback, @args ] );
}

# Run CALLBACK(ARGS) once, DELAY seconds from now.
sub addDelayCallback {
    my ( $self, $delay, $callback, @args ) = @_;
    queue( [ $delay + time(), $callback, @args ] );
}

# Queue DATA for writing to one handle or to every handle in an array ref.
# Calling it with no DATA on an IO::Socket requests shutdown(1) once the
# queue for that handle is empty.
sub write {
    my ( $self, $handle, @data ) = @_;
    for my $fh ( ref $handle eq 'ARRAY' ? @$handle : $handle ) {
        if ( isudp($fh) ) {
            $writes{$fh} ||= { packets => [] };
            push @{ $writes{$fh}{packets} }, @data;
            @{ $writes{$fh}{packets} }
                and _add_mask( $fh, POLLOUT );    # add out
        }
        else {
            $writes{$fh} ||= { data => '' };
            $writes{$fh}{data} .= join '', @data;
            $writes{$fh}{shutdown} = @data == 0 && "$fh" =~ /IO::Socket/;

            # Also poll for writability when only a shutdown is pending;
            # otherwise a shutdown requested on an idle socket would never
            # be carried out and its %writes entry would keep the loop alive.
            ( length $writes{$fh}{data} or $writes{$fh}{shutdown} )
                and _add_mask( $fh, POLLOUT );    # add out
        }
    }
}

# Attach a write-error callback to a handle.  Does nothing unless the handle
# currently has a %writes entry (i.e. write() was called and not yet drained).
sub addErrorCallback {
    my ( $self, $handle, $callback, @args ) = @_;
    if ( exists $writes{$handle} ) {
        $writes{$handle}{error}     = $callback;
        $writes{$handle}{errorargs} = \@args;
    }
}

# Timer target for mtime watching: call the callback when the file's mtime
# is newer than last time, then re-queue unless it returned /end *repeat/i.
sub mtimecheck {
    my ( $interval, $file, $oldtime, $callback, @args ) = @_;
    my $mtime  = ( stat $file )[9] || 0;
    my $return = $mtime && $oldtime < $mtime && $callback->(@args);
    defined $return && $return =~ /end *repeat/i
        or queue(
        [
            $interval + time(), \&mtimecheck,
            $interval, $file, $mtime, $callback, @args
        ]
        );
}

# Watch FILE's modification time every INTERVAL seconds.
sub addMtimeCallback {
    my ( $self, $interval, $file, $callback, @args ) = @_;

    # "|| 0" matches mtimecheck and keeps a not-yet-existing file from
    # feeding undef into the numeric comparison there.
    my $mtime = ( stat $file )[9] || 0;
    queue(
        [
            $interval + time(), \&mtimecheck,
            $interval, $file, $mtime, $callback, @args
        ]
    );
}

# Service every handle that poll reported as writable.
sub _flush_writes {
    for my $fh ( $poll->handles(POLLOUT) ) {    # ready writes
        if ( !defined $fh or not exists $writes{$fh} ) {
            warn "leftover write handle";
            defined $fh and _del_mask( $fh, POLLOUT );    # del write
        }
        elsif ( isudp($fh) ) {
            if ( @{ $writes{$fh}{packets} } ) {
                $fh->send( shift @{ $writes{$fh}{packets} } )
                    or die "send error $!";
            }
            else {
                _del_mask( $fh, POLLOUT );                # del write
                delete $writes{$fh};                      # no packet to write
            }
        }
        else {                                            # tcp or other
            my $have = length $writes{$fh}{data};
            if ( defined $have and $have > 0 ) {
                my $len = syswrite $fh, $writes{$fh}{data};
                if ( not defined $len ) {
                    if ( $writes{$fh}{error} ) {
                        $writes{$fh}{error}
                            ->( "$!", @{ $writes{$fh}{errorargs} } );
                    }
                    else {
                        warn "write error $!";
                    }
                }
                elsif ( $len == $have ) {
                    $writes{$fh}{shutdown} and $fh->shutdown(1);
                    _del_mask( $fh, POLLOUT );            # del write
                    delete $writes{$fh};    # all data has been written
                }
                elsif ( $len > 0 ) {
                    substr $writes{$fh}{data}, 0, $len, '';   # keep the rest
                }
                else {
                    die "zero length write";
                }
            }
            else {
                $writes{$fh}{shutdown} and $fh->shutdown(1);
                _del_mask( $fh, POLLOUT );                # del write
                delete $writes{$fh};                      # no data to write
                #die "had no data to write";
            }
        }
    }
    return;
}

# Service every handle that poll reported as readable.
sub _dispatch_reads {
    for my $fh ( $poll->handles(POLLIN) ) {    # ready reads
        if ( not( exists $reads{$fh} || exists $listens{$fh} ) ) {
            warn "leftover read handle";
            $poll->mask( $fh => 0 );
        }
        elsif ( isudp($fh) ) {
            $fh->recv( my $packet, 1500 );
            $reads{$fh}{callback}->( $packet, @{ $reads{$fh}{args} } );
        }
        elsif ( exists $listens{$fh} ) {       # new tcp connection
            $listens{$fh}{callback}
                ->( scalar( $fh->accept ), @{ $listens{$fh}{args} } );
        }
        elsif (
            sysread( $fh, $reads{$fh}{data}, 8192, length $reads{$fh}{data} ) )
        {
            if ( $reads{$fh}{mode} =~ /^char/i ) {
                $reads{$fh}{callback}
                    ->( $reads{$fh}{data}, @{ $reads{$fh}{args} } );
                $reads{$fh}{data} = '';
            }
            elsif ( $reads{$fh}{mode} =~ /^full/i ) {
                # no callbacks until eof
            }
            else {
                # line mode: hand over each complete "\n"-terminated line
                $reads{$fh}{callback}->( $1, @{ $reads{$fh}{args} } )
                    while $reads{$fh}{data} =~ s/(.*\n)//;
            }
        }
        else {    # end of file (any false return from sysread lands here)
            $reads{$fh}{callback}->(
                $reads{$fh}{mode} =~ /^full/i ? $reads{$fh}{data} : '',
                @{ $reads{$fh}{args} }
            );
            $poll->remove($fh);
            delete $reads{$fh};
            delete $writes{$fh};    # dump output if EOF
        }
    }
    return;
}

# Run until no timers, read handles, write handles or listen sockets remain.
# Each pass waits in poll until the next timer is due (or 1e6 seconds when
# there are no timers), services ready handles, then fires at most one
# expired timer.
sub eventloop {
    while ( @timerqueue || %reads || %writes || %listens ) {
        my $waitfor = @timerqueue ? $timerqueue[0][0] - time() : 1e6;
        $waitfor < 0 and $waitfor = 0;
        defined $wait and $waitfor > 0 and $wait->();

        ############################################################
        my $cnt = $poll->poll($waitfor);    # wait for next queued time
        ############################################################

        if ( $cnt > 0 ) {
            _flush_writes();
            _dispatch_reads();
        }

        $waitfor = @timerqueue ? $timerqueue[0][0] - time() : 1e6;
        if ( $waitfor <= 0 ) {              # a timer has expired
            my ( undef, $callback, @args ) = @{ shift @timerqueue };
            $callback->(@args);
        }
    }
}

1;    # return true for this module

__END__

=head1 NAME

Async::Tiny - Tiny? async eventloop module mostly for example purposes

=head1 SYNOPSIS

    use Async::Tiny;
    my $tiny = Async::Tiny->new;
    $tiny->addReadCallback( *STDIN, sub { print "input: @_" } );
    $tiny->addDelayCallback( 10, sub { print "stop\n" } );
    $tiny->addDelayCallback( 0, sub { print "start\n" } );
    $tiny->eventloop;

=head1 DESCRIPTION

Async::Tiny implements a simple "select" based async/callback style
kernel (sort of like POE or AnyEvent, maybe) that handles the messy work
and lets the user just define callbacks to handle his special needs.

IO::Ppoll is used when it is installed; otherwise the module falls back
to the core IO::Poll.

=head1 METHODS

=over 4

=item new

Constructor.

=item addDelayCallback ( DELAY, CALLBACK, ARGS )

Add a timer to expire after DELAY seconds.

    CALLBACK(ARGS)

=item addRepeatCallback ( INTERVAL, CALLBACK, ARGS )

Add a timer that does callback every INTERVAL seconds.
The repeat stops when the callback returns a string matching
C</end *repeat/i>.

    CALLBACK(ARGS)

=item addMtimeCallback ( INTERVAL, FILE, CALLBACK, ARGS )

Check the modification time of FILE every INTERVAL seconds and do callback
whenever it is newer than at the previous check.
The checks stop when the callback returns a string matching
C</end *repeat/i>.

    CALLBACK(ARGS)

=item addReadCallback ( HANDLE, CALLBACK, ARGS )

Add a callback for each line of text (but see below) input on HANDLE.

    CALLBACK(LINE, ARGS)

=item changeReadMode ( HANDLE, MODE )

There are three read modes:

    'character'  - return all new characters
    'linebyline' - return each "\n" terminated line (default)
    'full'       - only return once at EOF, with complete buffer

=item addListenCallback ( HANDLEorPORT, CALLBACK, ARGS )

Add a callback for new connections on a listen socket.

    CALLBACK(NEWCONNECTION, ARGS)

=item write ( HANDLE, DATA )

Queues DATA for writing to HANDLE.

=item write ( [HANDLES], DATA )

Queues DATA for writing to all HANDLES in []. (Think IRCD)

=item write ( HANDLES )

Perform HANDLE->shutdown(1) after all queued data is written.

=item addErrorCallback ( HANDLE, CALLBACK, ARGS )

Set a callback for write errors on HANDLE. It only takes effect while
data is queued for HANDLE.

    CALLBACK(ERRORSTRING, ARGS)

=item addWaitCallback ( CALLBACK )

Specify a callback to be called just before the select call. This can
be used to update the display in a GUI.

    CALLBACK()

=item status

Returns a one-line string with the number of timers, read handles, listen
sockets, write handles and polled handles.

=item eventloop

Start the event loop processing. The event loop ends when no callbacks
are left.

=back

=head1 EXAMPLE tinyirc.pl

    #!/usr/bin/perl
    # tinyirc.pl - simple tiny irc client Ppoll

    use Async::Tiny;
    use IO::Socket;
    use strict;

    my $t = Async::Tiny->new;

    sub sendline # works for both directions :)
    {
        my ($line, $to) = @_;
        $line eq '' ? exit : $t->write( $to, $line );
    }

    my $s = IO::Socket::INET->new(shift // 'localhost:6667') or die $@;
    $t->addReadCallback( *STDIN, \&sendline, $s );
    $t->addReadCallback( $s, \&sendline, *STDOUT );
    $t->eventloop;

=head1 EXAMPLE tinyircd.pl

    #!/usr/bin/perl
    # tinyircd.pl - minimal tiny ircd with Async::Tiny with Ppoll
    # multiple telnets or tinyirc.pl to it, echos to the others (*very* basic ircd)

    use Async::Tiny;
    use strict;

    my %clients;
    my $t = Async::Tiny->new;

    for my $port (@ARGV ? @ARGV : 6667)
    {
        $t->addListenCallback($port, sub
        {
            my ($socket) = @_;
            $clients{$socket} = $socket;
            $t->addReadCallback($socket, \&clientline, $socket );
        });
        $t->addDelayCallback(0, sub{print "listening on $port...\n"} );
    }

    $t->addRepeatCallback( 600, sub {
        print localtime() . " clients: @{[ scalar keys %clients, times ]}\n"});
    $t->eventloop;

    sub clientline
    {
        my ($line, $me) = @_;
        if( $line eq '' )
        {
            delete $clients{$me}; # was closed
        }
        else
        {
            $t->write( [grep $me != $_, values %clients], $line);
        }
    }

=head1 EXAMPLE math.pl

    #!/usr/bin/perl
    #
    # math.pl - a math server using Async::Tiny Ppoll

    use Async::Tiny;
    use strict;

    my $t = Async::Tiny->new;
    $t->addListenCallback( shift // 8081, sub
    {
        my ($socket) = @_;
        $t->addReadCallback( $socket, sub
        {
            my ($line) = @_;
            tr#0-9()*/+-# #c for $line; # strip non-math for safety :)
            $t->write( $socket,
                eval $line // "$line is an invalid expression $@", "\n");
        });
    });
    $t->eventloop;

=head1 EXAMPLE key.pl

    #!/usr/bin/perl
    #
    # key.pl - read single key-at-a-time with Async::Tiny Ppoll

    use Term::ReadKey;
    use Async::Tiny;
    use Data::Dump;
    use strict;
    $| = 1;

    my $t = Async::Tiny->new;

    $t->addReadCallback( *STDIN, sub
    {
        my ($string) = @_;
        dd $string;
        #printf "%vd\n", $string;
        $string =~ /^[q\e]$/i and die "quit character entered\n";
    });
    $t->changeReadMode( *STDIN, 'character' );

    $t->addListenCallback( 8081, sub
    {
        $t->addReadCallback( $_[0], sub { print @_ });
    });

    my $count = 0;
    $t->addRepeatCallback( 60, sub { print ++$count, " tick\n" });

    ReadMode 'raw';
    eval { $t->eventloop };
    my $answer = $@;
    ReadMode 'restore';
    print $answer;

=head1 EXAMPLE curses.pl

    #!/usr/bin/perl
    #
    # curses.pl - client using Async::Tiny Ppoll

    use Curses;
    use Term::ReadKey;
    use Async::Tiny;
    use IO::Socket;
    use Data::Dump qw(pp dd);
    use strict;
    $| = 1;

    my @lines;
    my $input = '';
    my ($width, $height) = GetTerminalSize;
    my $t = Async::Tiny->new;

    $t->addWaitCallback( sub
    {
        ($width, $height) = GetTerminalSize;
        my $row = 0;
        for (
            @lines = ( ('') x $height, @lines )[2 - $height .. -1], # top part
            '#' x ($width - 25) . ' ' . localtime(),                # divider line
            $input)                                                 # bottom line
        {
            addstr($row++, 0, substr $_, 0, $width);
            clrtoeol;
        }
        refresh;
    });

    my $s = IO::Socket::INET->new(shift // 'arch:8081') or die $@;
    $t->addReadCallback( $s, sub
    {
        my ($line) = @_;
        $line eq '' and die "server closed connection\n";
        push @lines, "< $line" =~ s/\n//r;
    });

    $t->addReadCallback( *STDIN, sub
    {
        my ($string) = @_;
        #dd $string;
        for ($string =~ /\e(?:\[M...|[O\[][0-9;]*[A-~])|./gs ) # keep esc seq together
        {
            /^[ -~]\z/ ? $input .= $_ :
            /^[\cc\cd\e]\z/ ? die "quit character @{[ pp $_ ]} entered\n" :
            /^[\010\177]\z/ ? chop $input :
            /^[\r\n]\z/ ? do
            {
                if( length $input )
                {
                    push @lines, " > $input";
                    $t->write( $s, $input, "\n" );
                    $input = '';
                }
            } :
            do { $input .= pp $_ };
            $input &= "\xff" x ($width - 3); # trim to size
        }
    });
    $t->changeReadMode( *STDIN, 'character' );

    $t->addRepeatCallback( 1, sub { }); # for incrementing clock
    $t->addRepeatCallback( 3600, sub { push @lines, ' ' . localtime() } );

    initscr();
    clear;
    ReadMode 'cbreak';
    mousemask(BUTTON1_CLICKED, my $oldmask);
    eval { $t->eventloop };
    my $errormsg = $@;
    ReadMode 'restore';
    endwin();
    print $errormsg;

=head1 EXAMPLE animate.pl

    #!/usr/bin/perl
    #
    # animate.pl - spin things Ppoll

    use Async::Tiny;
    use strict;
    $| = 1;

    my $t = Async::Tiny->new;
    my $test = shift // 5;
    my $count = 0;

    if($test == 1 || $test > 4)
    {
        print "test 1 - auto repeat\n";
        $t->addRepeatCallback( 0.05, sub
        {
            print "\r", qw(- \\ | /)[$count % 4];
            ++$count < 60 or 'endrepeat';
        });
    }

    if($test == 2 || $test > 4)
    {
        print "test 2 - chained delays\n";
        sub tick
        {
            print "\r", qw(- \\ | /)[$count % 4];
            ++$count < 60 and $t->addDelayCallback( .05, \&tick );
        }
        $t->addDelayCallback( 0, \&tick );
    }

    if($test == 3 || $test > 4)
    {
        print "test 3 - queue all up beforehand\n";
        for (0..59)
        {
            my $char = qw(- \\ | /)[$_ % 4];
            $t->addDelayCallback( $_ / 20, sub { print "\r$char" });
        }
    }

    if($test == 4 || $test > 4)
    {
        print "test 4 - dual auto repeat\n";
        my $secondcount = 0;
        $t->addRepeatCallback( 0.12, sub
        {
            print "\r", map qw(- \\ | /)[$_ % 4], $count, $secondcount;
            ++$count < 3/.12 or 'endrepeat';
        });
        $t->addRepeatCallback( 0.05, sub
        {
            print "\b", qw(- \\ | /)[$secondcount % 4];
            ++$secondcount < 3/.05 or 'endrepeat';
        });
    }

    $t->eventloop;
    print "\n";

=head1 EXAMPLE bars.pl

    #!/usr/bin/perl
    #
    # bars.pl - multiple independent timers using Async::Tiny Ppoll

    use Curses;
    use Term::ReadKey;
    use Async::Tiny;
    use strict;
    $| = 1;

    my ($width, $height) = GetTerminalSize;
    my @lines = ( '-' x $width ) x $height;
    my $t = Async::Tiny->new;

    $t->addWaitCallback( sub
    {
        my $row = 0;
        for ( @lines )
        {
            addstr($row++, 0, substr $_, 0, $width);
            clrtoeol;
        }
        refresh;
    });

    for my $line (@lines)
    {
        $t->addRepeatCallback( (3 + rand 20) / 50 , sub
        {
            s/-/#/ or s/#(?!#)/=/ or tr/=/-/ for $line;
            (times)[0] + rand() > 2 and 'end repeat';
        } );
    }

    initscr();
    clear;
    $t->eventloop;
    endwin();

=head1 EXAMPLE echoall.pl

    #!/usr/bin/perl

    # Async::Tiny with Ppoll version of
    # https://blog.afoolishmanifesto.com/posts/concurrency-and-async-in-perl/

    use experimental 'signatures';
    use Async::Tiny;
    use strict;

    my %clients;
    my $t = Async::Tiny->new;

    $t->addListenCallback( 9935, sub ($socket)
    {
        $t->addReadCallback( $clients{$socket} = $socket, sub ($line, $me)
        {
            $t->write( $me, $line);
            $line eq '' and delete $clients{$me}; # was closed
        }, $socket );
    });

    $t->addRepeatCallback(5, sub {$t->write([ values %clients ], "ping!\n" )});
    $t->addDelayCallback(0, sub { warn "ready on port 9935\n" });
    #$t->addRepeatCallback(60, sub { warn $t->status });

    $t->eventloop;

=cut

Replies are listed 'Best First'.
Re^3: Weird Output with Threads and NCurses
by BrowserUk (Pope) on Jul 18, 2016 at 16:06 UTC

    By way of contrast, here's what a threaded solution might look like:

    #! perl -slw
    use strict;
    use threads;
    use Thread::Queue;
    use Win32::Console;

    # Console object for standard output, cleared to light blue on white.
    # (Class->new replaces the original indirect-object "new Class ..." form.)
    my $con = Win32::Console->new( STD_OUTPUT_HANDLE );
    $con->Cls( $FG_LIGHTBLUE | $BG_WHITE );

    # Writers post "text $; row" messages here; one thread owns the console.
    my $Qcon = Thread::Queue->new;

    # Console thread: take each message apart and draw it at column 5.
    async {
        while ( my ( $what, $where ) = split $;, $Qcon->dequeue ) {
            $con->WriteChar( $what, 5, $where );
        }
    }->detach;

    # Worker: count upwards forever, posting each value for row $pos, then
    # pause for a random time below 0.2 seconds (4-arg select as a sleep).
    sub writer {
        my $pos     = shift;
        my $counter = 0;
        while ( 1 ) {
            $Qcon->enqueue( join $;, ++$counter, $pos );
            select '', '', '', rand( 0.2 );
        }
    }

    # One writer thread for row 5 and one for row 15.
    my @writers = map threads->create( \&writer, $_ ), 5, 15;

    sleep 100;

    It is Windows-only, but adapting it to *nix shouldn't be too hard.

    The real benefit shows up when you start to want to do something useful in the threads.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority". I knew I was on the right track :)
    In the absence of evidence, opinion is indistinguishable from prejudice.
Re^3: Weird Output with Threads and NCurses
by choroba (Bishop) on Jul 18, 2016 at 16:07 UTC
    > And here's Async::Tiny

    It shouldn't be here, it should be there!

    ($q=q:Sq=~/;[c](.)(.)/;chr(-||-|5+lengthSq)`"S|oS2"`map{chr |+ord }map{substrSq`S_+|`|}3E|-|`7**2-3:)=~y+S|`+$1,++print+eval$q,q,a,
Re^3: Weird Output with Threads and NCurses
by var121 (Novice) on Jul 19, 2016 at 14:24 UTC
    Thanks for your post; I will go through the concept of asynchronous programming. What is the last argument in the addRepeatCallback call? Does the first argument define the time? And are we assuming the first call will be completed in 0.010 time units? For writing games, where we will have to do many things simultaneously, like keeping score and updating the screen, is that possible with the async technique? I have this doubt because I have seen some simple game code, written using threads, and it works absolutely fine.

      The calls are documented in POD at the end of Async::Tiny. Just run "perldoc Async/Tiny.pm" to see the formatted version.

      The first argument to addRepeatCallback is the interval in seconds, and the second is the callback subref. Any extra arguments are provided to the callback sub when it is called.

      In this case they are the row number at which to display the count, and an array ref which holds as its first element the actual count. This array ref holds the "state" for this particular "thread" of repeated callbacks, so I can have two (or more, if desired) independent counts running at the "same" time using the same counter sub.

      ( see the "counter" sub for how those two extra arguments are handled.)

      "simultaneously" ? yes and no :)
      My example program shows two counters incrementing "simultaneously" at two different rates.

      Here's another example program (a newer version of the "bars.pl" in the POD) where each row sweeps at its own rate independent of the other rows (and "simultaneously").

      (This is one of my test programs for the timerqueue.)

      #!/usr/bin/perl
      #
      # bars.pl - multiple independent timers using Async::Tiny Ppoll

      use strict;
      use warnings;
      use Curses;
      use Term::ReadKey;
      use Async::Tiny;

      my ( $width, $height ) = GetTerminalSize;
      my @lines = ( '-' x $width ) x $height;    # one bar of dashes per row
      my $endcode;
      my $loop = Async::Tiny->new;

      # After 100 seconds every sweeper starts returning 'endrepeat'.
      $loop->addDelayCallback( 100, sub { $endcode = 'endrepeat' } );

      # One repeating timer per screen row, each with its own random interval.
      for my $row ( 0 .. $#lines ) {
          my $interval = ( 3 + rand 20 ) / 50;

          # Advance this row by one step and redraw it: turn the first '-'
          # into '#'; failing that, turn the last '#' of the first run into
          # '='; failing that, reset every '=' to '-'.
          my $sweep = sub {
              for ( $lines[$row] ) {
                  s/-/#/ or s/#*\K#/=/ or tr/=/-/;
              }
              addstr( $row, 0, $lines[$row] );
              refresh();
              return $endcode;
          };

          $loop->addRepeatCallback( $interval, $sweep );
      }

      initscr();
      curs_set(0);
      addstr( 0, 0, '-' x $width x $height );
      $loop->eventloop;
      endwin();

      I'll see if I can do a simple game with action, interaction, and score keeping as "proof of concept" sometime soon.

        Simple game? (note: I've never played the original or any variation thereof).

        It's just a proof of concept :)

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: note [id://1167978]
help
Chatterbox?
and all is quiet...

How do I use this? | Other CB clients
Other Users?
Others meditating upon the Monastery: (2)
As of 2018-04-19 23:52 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    Notices?