kennethk has asked for the wisdom of the Perl Monks concerning the following question:

Oh sagacious and munificent monks;

In my eternal attempt to better myself, I've been trying to learn threads under Perl. I wrote the following code to play with concepts in message dispatching. Ideally, it should just echo input back at me until I 'quit'. It behaves nicely on my Linux box (5.8.8 64-bit multithread under Debian) but hangs on my Windows box (ActiveState perl v5.8.8 MSWin32-x86-multi-thread on a dual-core) at line 26 (the async call inside MAIN_LOOP) on the first invocation. Any ideas on routes I could pursue to fix this?

#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

# Create terminal watcher
print "Create terminal watcher...\n";
my $Q_stdin = Thread::Queue->new;
async {
    $Q_stdin->enqueue( $_ ) while defined( $_ = <STDIN> );
}->detach;

my $Q_found = Thread::Queue->new;
my $cmd;
print "Awaiting commands...\n";
MAIN_LOOP: while (not defined $cmd or $cmd !~ /^q/i) {
    sleep(1); # Reduce load
    # Process commands
    $cmd = $Q_stdin->dequeue_nb;
    if (defined $cmd) {
        chomp $cmd;
        if ($cmd =~ /^q/i) {
            print "Resolving open threads\n";
        } else {
            async {
                $Q_found->enqueue( $cmd ) ;
            }->detach;
            #$Q_found->enqueue( $cmd ) ; # using this in place of async works
        }
    }

    # Print announcements
    while (defined(my $output = $Q_found->dequeue_nb)) {
        print $output, "\n";
    }
}

Update: The issue can be resolved by ensuring STDIN is closed before entering MAIN_LOOP (thanks oshalla). The reason for it is still not entirely clear in my mind, but I have some clear avenues of research to pursue - there seem to be both OS and Perl version variations impacting the behaviors of this code. In any case, a more verbose version that behaves as expected follows:

#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use Thread::Semaphore;

# Create terminal watcher
print "Create terminal watcher...\n";
my $Q_stdin = Thread::Queue->new ;
my $S_stdin = Thread::Semaphore->new(0) ;
print "fileno(STDIN) = ", fileno(STDIN), "\n" ;
async {
    $S_stdin->down() ;
    print "Reading STDIN, fileno=", fileno(STDIN), "\n" ;
    while (defined( $_ = <STDIN> )) {
        chomp ;
        print "\n+IN : '$_'\n" ;
        $Q_stdin->enqueue( $_ ) ;
    };
}->detach;

close STDIN ; ### no longer our business <<<<
print "STDIN is: ", defined(fileno(STDIN)) ? 'open' : 'closed', "\n" ;
$S_stdin->up() ;

my $Q_found = Thread::Queue->new;
my $cmd;
my $quit_flag = 1;

print "Awaiting commands...\n";
MAIN_LOOP: while ($quit_flag) {
    sleep(1); # Reduce load

    # Process commands
    while (defined ($cmd = $Q_stdin->dequeue_nb)) {
        chomp $cmd;
        if ($cmd =~ /^q/i) {
            print "Resolving open threads\n";
            $quit_flag = 0;
        } else {
            async {
                $Q_found->enqueue( $cmd ) ;
            }->detach;
            #$Q_found->enqueue( $cmd ) ; # using this in place of async works
        }
    }

    # Print announcements
    while (defined(my $output = $Q_found->dequeue_nb)) {
        print "+OUT: '$output'\n";
    }
}

Re: threads on Windows
by gone2015 (Deacon) on Feb 12, 2009 at 10:27 UTC

    I scattered print statements around to see if that told me any more about what is happening here -- as below. I tried this on Windows XP, ActiveState 5.10.0 (1004).

    What I saw was roughly what you described: the main thread seemed to hang trying to create the new thread (i.e. on the async in the MAIN_LOOP) -- but only until the input thread got something from STDIN.

    I tried adding a sleep(2) after enqueuing the input (where shown in the code below). Now things worked more as expected -- the main thread no longer hangs.

    Under Windows, what appears to be going on is: the main thread is hanging trying to create a new thread, while the input thread is waiting on STDIN.

    Now... we know that when a new thread is created the entire state of the creator thread is duplicated. That includes filehandles. My guess (and this is only a guess) is that the duplication of STDIN is blocked while another thread is waiting on it (or otherwise "owns it").
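A minimal sketch of the duplication described above, assuming a perl built with ithreads. The variable names and the run_child helper are hypothetical and do not come from this thread. It only illustrates that ordinary variables are copied into a new thread, while variables marked :shared are not; it does not test the guess about STDIN being blocked during the copy.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;

# Ordinary (non-shared) variable: each new thread gets its own copy.
my $private = 'parent value';

# Shared variable: every thread sees the same storage.
my $shared :shared = 'parent value';

# Hypothetical helper: start a child, let it modify both variables,
# and return the value of $private as the child saw it.
sub run_child {
    my $thr = threads->create(sub {
        $private = 'changed in child';   # changes the child's copy only
        $shared  = 'changed in child';   # visible to the parent as well
        return $private;
    });
    return $thr->join;
}

my $seen_by_child = run_child();
print "child saw:      $seen_by_child\n";
print "parent private: $private\n";
print "parent shared:  $shared\n";
```

The parent's $private is untouched after the join, which is the copy-on-create behaviour the paragraph above relies on.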

    If the main thread closes STDIN after creating the input thread, then all is well.

    Trying to share an input across a number of threads is obviously tricky. I can see the logic of having an exclusive lock on a file handle -- who knows, it may even support multiple threads reading a line at a time from a shared filehandle. I don't know how *n*x deals with this.

    So, I guess the lesson here is that tidiness is next to working code. When you no longer have a legitimate interest in a file handle, close the sucker.

      yikes! If <> blocks thread creation, you gotta wonder 1) what else <> blocks, and 2) what else can block thread creation.

      Confirmed that adding sleep to my text input thread resolves the issue. I don't get any significant behavioral change w/ closing STDIN, so I conclude that the issue is that <> blocks (waiting on input) while the main thread is blocked at async (waiting for a free control), resulting in a race condition whenever the IO thread receives a line. The *NIX box has an unfair advantage here since it's running off a quad core, thus it has enough control to allow three threads at once. Since my ultimate plan was replacing the STDIN reading thread with a GUI thread, I'll have to see how it behaves with regard to control issues (I'm seeing run-time thread safety determination in my future...).

      Thanks so much for your help.

        Hmmm... that's odd. I found that the close STDIN did the job perfectly. I note you are using ActiveState perl v5.8.8 on a dual core machine -- while I have 5.10.0, single core... but have no way of knowing whether either accounts for the difference.

        Just in case: it's not necessary to have the sleep in the input thread and the close together... the close on its own does the trick (here).

        I wondered whether it made a difference if the input thread was already blocked on the <STDIN> at the time the main thread did the close STDIN, perhaps causing the close to block or fail... which would be more likely on a dual core machine. However, I tried a threads->yield() before the close, and that didn't make any difference. (The input loop was definitely waiting for <STDIN> but the close completed successfully.) I suppose this could be different on 5.8.8?

        If you wanted to play with semaphores you could do:

        # Create terminal watcher
        print "Create terminal watcher...\n";
        my $Q_stdin = Thread::Queue->new ;
        my $S_stdin = Thread::Semaphore->new(0) ;
        print "fileno(STDIN) = ", fileno(STDIN), "\n" ;
        async {
            $S_stdin->down() ;
            print "Reading STDIN, fileno=", fileno(STDIN), "\n" ;
            while (defined( $_ = <STDIN> )) {
                chomp ;
                print "\n+IN : '$_' " ;
                $Q_stdin->enqueue( $_ ) ;
                print "*" ;
            } ;
        }->detach;

        close STDIN ; ### no longer our business <<<<
        print "STDIN is: ", defined(fileno(STDIN)) ? 'open' : 'closed', "\n" ;
        $S_stdin->up() ;

        which guarantees that the main thread has closed STDIN before the input thread touches it.
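The ordering guarantee can be seen in isolation with a small sketch, assuming a threaded perl. The @events log and its messages are hypothetical; the line that records 'main: setup finished' merely stands in for the close STDIN above. A semaphore created with a count of 0 makes down() block until some other thread calls up().

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

my $gate = Thread::Semaphore->new(0);   # count 0: down() blocks until up()
my @events :shared;                     # hypothetical log of what happened, in order

my $thr = async {
    $gate->down;                        # wait until the main thread opens the gate
    push @events, 'child: proceeding';
};

push @events, 'main: setup finished';   # stands in for "close STDIN"
$gate->up;                              # now let the child continue

$thr->join;
print "$_\n" for @events;
```

Because the child cannot get past down() before the main thread calls up(), the main thread's entry is always logged first, however the scheduler interleaves the two threads.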

Re: threads on Windows
by BrowserUk (Pope) on Feb 12, 2009 at 16:43 UTC

    Ignoring for a moment all the other "advice" in this thread, I have a question.

    Given that, as your comment suggests, using enqueue() directly:

    #$Q_found->enqueue( $cmd ) ; # using this in place of async works

    rather than starting a whole new thread for the sole purpose of doing something that takes approximately 1/1000 of the time it takes to start that thread, the program runs perfectly:

    c:\test>junk9
    Create terminal watcher...
    Awaiting commands...
    fred
    fred
    bill
    bill
    john
    john
    quit
    Resolving open threads

    Why are you starting that thread?


    But then, why are you queuing to a second queue from your main loop, when the only thing you do with what you queue, is to dequeue it later from that same main loop?

    Which also means that you have to use dequeue_nb() rather than dequeue(), which in turn forces you to use sleep to stop it from running away with the cpu. Using non-blocking dequeue makes a complete nonsense of a message passing architecture.
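A minimal sketch of the blocking alternative, assuming a threaded perl. The producer here is a hypothetical stand-in for the STDIN reader (it enqueues three fixed strings so the example runs without a terminal), and using undef as an end-of-input sentinel is one common convention, not something the original code does.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $queue = Thread::Queue->new;

# Hypothetical producer standing in for the thread that reads STDIN.
my $producer = async {
    $queue->enqueue($_) for qw(fred bill john);
    $queue->enqueue(undef);             # sentinel: tells the consumer to stop
};

# dequeue() blocks until an item is available, so the loop needs
# neither dequeue_nb() nor sleep() to avoid spinning.
my @received;
while (defined(my $item = $queue->dequeue)) {
    push @received, $item;
    print ">$item\n";
}

$producer->join;
```

The consumer wakes only when there is something to do and ends when it dequeues the sentinel.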


    Even your OP code works--as you've designed it, though possibly not as you expect--on Win32. The ONLY change I've made is to prefix the output with '>' to differentiate it from the input:

    #!/usr/bin/perl
    use strict;
    use warnings;
    use threads;
    use Thread::Queue;

    # Create terminal watcher
    print "Create terminal watcher...\n";
    my $Q_stdin = Thread::Queue->new;
    async {
        $Q_stdin->enqueue( $_ ) while defined( $_ = <STDIN> );
    }->detach;

    my $Q_found = Thread::Queue->new;
    my $cmd;
    print "Awaiting commands...\n";
    MAIN_LOOP: while (not defined $cmd or $cmd !~ /^q/i) {
        sleep(1); # Reduce load
        # Process commands
        $cmd = $Q_stdin->dequeue_nb;
        if (defined $cmd) {
            chomp $cmd;
            if ($cmd =~ /^q/i) {
                print "Resolving open threads\n";
            } else {
                async {
                    $Q_found->enqueue( $cmd ) ;
                }->detach;
                #$Q_found->enqueue( $cmd ) ; # using this in place of async works
            }
        }

        # Print announcements
        while (defined(my $output = $Q_found->dequeue_nb)) {
            print ">$output\n";
        }
    }
    __END__
    c:\test>junk9
    Create terminal watcher...
    Awaiting commands...
    fred
    bill
    john
    >fred
    mary
    >bill
    quit
    >john
    Resolving open threads
    >mary

    Why do you not get the first output until after the third input? Because that's exactly the way you've designed it to work.

    Why does it work differently on Win32 to *nix? Because they are different systems. Specifically, the OS serialises read and write accesses to console devices.

    Can I prove this is the case? Yes. If you supply the input via a pipe, rather than from the keyboard, then you'll see the behaviour you are (wrongly) expecting:

    c:\test>perl -wle" printf qq[%08d\n], $_ for 1 .. 10; print 'quit' " | perl junk9.pl
    Create terminal watcher...
    Awaiting commands...
    >00000001
    >00000002
    >00000003
    >00000004
    >00000005
    >00000006
    >00000007
    >00000008
    >00000009
    Resolving open threads
    >00000010

    If you would describe what you are actually trying to do, I can probably suggest a simpler and more consistent architecture.


    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

      First, thank you for your detailed response.

      As you have noted, this code is extremely impractical. It represents the smallest version of a more complex but impractical code that repeated the issue, which is why I posted it. In addition to being a generic learning exercise, my ultimate goal is the design of a main processing engine with worker threads that respond to pause/resume/kill requests. The more complex version follows:

      .

      It mostly comes down to me being unwilling to relinquish control over the scheduling to a user interface. Ultimately, I plan on replacing the CLI with a GUI thread, though this is probably all an abuse of queues. I also note I am abusing the FPE signal, which came down to random selection of a signal on both my Linux and Windows boxes that Windows did not treat as fatal.

        I do not understand how you arrived at this architecture.

        Why start a thread to accept the commands from STDIN and then post them to a queue, and then have your main thread read that queue and action them?

        The only reason I can see for doing this is so that your main thread/loop can make busy work polling two queues, in non-blocking mode, and sleeping.

        Ignoring that the second queue's only purpose seems to be to print messages from the workers to the console (which could just as easily have been directed there from within the worker threads), the actions essentially consist entirely of sending signals, so why not perform those actions in the STDIN thread?

        I spent ages thinking about your statement of intent to move to using a gui, and whether this conflation of loops was some attempt to bring all the interaction, input and output, together into a single thread in preparation for that. But that doesn't make any sense either. You aren't going to use a command line for input and a gui for output (are you!?), and if the interaction is all going to be via the gui, whether button or keyboard based, it's all going to be event driven. So conflating the two now, would just need to be undone when you make the switch.

        Anyway, on the basis of what you've posted, here's how I would do what you are attempting. It all works, though using Thread::Semaphore and those dratted per-thread signals together is awfully clumsy, given that a signal won't even wake a sleep, never mind a semaphore wait.

        This code is somewhat stylistically challenged, because it was a case of tacking things in here and there to see what works. I might have tried to rewrite it a little, but given the total disparity between our coding styles, you would have just rewritten it yourself anyway--so you might as well expend the effort :)

        The sample run at the bottom shows that all the commands: (w)orker; (s)tatus; (p)ause/resume; (k)ill; and (q)uit all work. To some degree at least:

        #! perl -slw
        use strict;
        use threads;
        use Thread::Queue;
        use Thread::Semaphore;

        my $Q = new Thread::Queue;

        my $boss = async {
            my %workers;
            my %sems;

            while( <STDIN> ) {
                ( local( $_ ), my $name ) = m[([a-zA-Z])\s*(\d+)?] or next;

                warn "Non-integer thread '$name'\n" and next
                    if defined $name and $name =~m[\D];

                if( m[^w]i ) {
                    warn "You must give a name for the worker\n" and next
                        unless defined $name;

                    warn "Worker $name already exists\n" and next
                        if exists $workers{ $name };

                    $sems{ $name } = new Thread::Semaphore;
                    $workers{ $name } = threads->create(
                        \&worker, $name, $Q, $sems{ $name }
                    );
                    next;
                }

                warn "Worker $name does not exist\n" and next
                    unless m[^q] or exists $workers{ $name };

                m[^k]
                    and $sems{ $name }->up, ## Ensure the worker can respond
                        delete( $workers{ $name } )->kill( 'KILL' )->join
                    and next;

                if( m[^p] ) {
                    ${ $sems{ $name } }
                        ? $sems{ $name }->down
                        : $sems{ $name }->up;
                    $workers{ $name }->kill( 'STOP' );
                    next;
                }

                m[^s]
                    and $sems{ $name }->up, ## Ensure a paused thread is able to respond
                        $workers{ $name }->kill( 'FPE' ),
                        $sems{ $name }->down ## and set it back
                    and next;

                m[^q] or next;

                for( keys %workers ) {
                    $sems{ $_ }->up;
                    $workers{ $_ }->kill( 'KILL' )->join;
                }
                $Q->enqueue( undef ); ## Terminate main loop
                last;
            }
        };

        print ">$_" while defined( $_ = $Q->dequeue );

        $boss->join;
        exit;

        sub worker {
            my( $name, $queue, $sem ) = @_;
            my $cooldown = 0;
            my $state = 0;

            $SIG{KILL} = sub {
                warn "Worker $name dying\n";
                threads->exit();
            };
            $SIG{STOP} = sub {
                printf STDERR "Worker $name is %s\n",
                    ( $state ^= 1 ) ? 'paused' : 'resumed';
                $sem->down();
                $sem->up();
            };
            $SIG{FPE } = sub {
                $queue->enqueue("Worker $name has cooldown $cooldown.");
            };

            $queue->enqueue("Worker $name starting");
            my %nodes = ();

            while (1) {
                $queue->enqueue("$name step $cooldown");
                sleep 1;
                $cooldown++;
            }
        }

        __END__
        C:\test>743398.pl
        w1
        >Worker 1 starting
        >1 step 0
        >1 step 1
        p1>1 step 2
        Worker 1 is paused
        s1
        >Worker 1 has cooldown 2.
        w2
        >Worker 2 starting
        >2 step 0
        >2 step 1
        >2 step 2
        p>2 step 3
        2
        Worker 2 is paused
        s2
        >Worker 2 has cooldown 3.
        k1
        >1 step 3
        Worker 1 dying
        q
        >2 step 4
        Worker 2 dying

Re: threads on Windows
by weismat (Friar) on Feb 12, 2009 at 05:46 UTC
    I think that Windows might allow you to access STDIN only from the main thread.
    This is at least the normal convention in Windows GUI programming.
    I am not sure if this is the only issue in your code, but you may consider changing your code accordingly and checking whether it works then.
      I got the code for a non-blocking read from Re^3: Non Blocking input on Win32 platforms. My original idea was ultimately to replace the usage of STDIN CLI with a GUI thread passing messages to a boss/worker model, since I don't want to surrender execution control to the GUI.
Re: threads on Windows
by imrags (Monk) on Feb 12, 2009 at 07:08 UTC
    Ok here's something
    Is your CPU multi-processor???
    If your answer is yes, try this
    C:\Windows\System32\cmd.exe /C start /AFFINITY 0x1 perl.exe <SCRIPT.pl>
    If this works, let us know.
    Also, you can try printing something every time a thread is executed (maybe the tid)
    Update:
    You can also try to control the number of threads (the queue
    also helps, as in your code). Even semaphores will help.
    Raghu
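One way to read the suggestion about controlling the number of threads with a semaphore is sketched below, assuming a threaded perl. The limit of 2, the worker sub, and the $active/$peak counters are all hypothetical and exist only to make the effect observable; nothing here comes from the original script.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Semaphore;

my $MAX_ACTIVE = 2;                               # assumed limit, for illustration
my $slots = Thread::Semaphore->new($MAX_ACTIVE);  # one token per allowed worker

my $active :shared = 0;   # workers currently running
my $peak   :shared = 0;   # highest value $active ever reached

sub worker {
    my ($id) = @_;
    { lock($active); $active++; $peak = $active if $active > $peak; }
    select(undef, undef, undef, 0.05);            # simulate a little work
    { lock($active); $active--; }
    $slots->up;                                   # hand the slot back
    return $id;
}

my @threads;
for my $id (1 .. 6) {
    $slots->down;         # blocks while $MAX_ACTIVE workers hold slots
    push @threads, threads->create(\&worker, $id);
}

my @done = map { $_->join } @threads;
print "finished: @done, peak concurrency: $peak\n";
```

The main loop takes a slot before each create and the worker returns it when done, so no more than $MAX_ACTIVE workers are ever running at once.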
      I'm running off a dual-core, but cannot execute the command you've suggested - I get an invalid switch /AFFINITY error. Printing a loop index confirms hang on new thread creation. Thankfully oshalla found where my threads were blocking. I appreciate your help.