http://www.perlmonks.org?node_id=593482

opensourcer has asked for the wisdom of the Perl Monks concerning the following question:

hi,
down below is the code of thread tring to do some task, i did my best to remove the joined threads from the array, even thought there are few threads joined already. and when u try to get them i hit with the message Thread already joined at xxxx.pl line 98.. I have the eval trying to catch the culprit but it exits. i would be thankfull if some 1 could help me on this.
#!/usr/bin/perl use File::Find; use Thread qw/async/; my $source = "C:/DManager"; #"E:/Job_server/Helix_source"; $destination = "E:/Job_server/Helix_destination"; my @t; ######## my $workers = 1; #number of workers my $sequence = 0; my $threads = 0; #number of threads my $done = 0; my $resources = 5; # set how mant jobs u want to run my $count =0; ######## find(\&copydir, $source); ### find #### sub copydir { my ($parent, $child, $filename, $mpeg); # print "Now starting \n"; if (($_ ne ".") and ($_ ne " ")) { $mpeg = $_ if (($_ =~ /.*\..*/) && ($_ !~ /.*\.db/)); if ($_ !~ m/\.*\.\w+/) { $parent = "$File::Find::dir"; $child = "$File::Find::name"; print "Folder name : $_\n"; #print "Parent Folder : ". $parent."\n"; print "Child Folder : ". $child ."\n"; } } if ($mpeg) { print "filename : $mpeg\n"; my $rm; #my ($rm) = $mpeg =~ /((.*)\.)/; #$rm =~ s/\.//; #$rm .= ".rm"; my $path = $File::Find::name; $path =~ s/\//\\/g; $rm = $path; $rm =~s/\..*/\.rm/g; #print "\n". $cmd ."\n"; #system "$cmd"; &CheckQ(); StartQ($mpeg, $rm, $path); } } sub StartQ { #print "Now in StartQ \n"; my ($input, $output, $path) = @_; $threads++; print "pushing to the Thread ". $threads ."\n"; eval { push @t, async { Dummy_process(); }; }; if (!$@) { $workers++; #sleep 2; } else { $threads--; } #print "running process \n"; } sub CheckQ { print "Workers working : ". $workers. "\n"; if ($workers >= 5) { print "Wiating for one of the worker to finish the job\n"; print "Checking threads for safe exit \n"; SafeExit(); for (@t) { #$_->done() or sub {$workers--; return}; #if (!$@) { if (!$_->join) { print "Work in process checking other workers : Wo +rkers : $workers\n"; print $@ ."\n"; if ( @t <6 ) { $workers = @t; return; } } else { $workers--; print "Worker finished working : Workers : $worker +s\n"; return; } } } else { return; } } sub SafeExit { print "\n In Safe Exit\n"; for (@t) { eval{ if($_->join) { print "A thread already joined in exit status\n"; print "\n $count \n"; print "\n $_\n"; splice(@t, $count, 1); sleep 2; $count++; } }; if ($@) { print "I Don't want to c my programing deing\n"; } } } ###################################################################### +######################## #for checking the Threads only. sub Dummy_process { print "Thread: ". Thread->self->tid ."Started with working ". $wor +kers. "\n"; sleep 2; }

Replies are listed 'Best First'.
Re: catching Thread Exits - how ?
by Corion (Patriarch) on Jan 08, 2007 at 10:36 UTC

    Your program logic is wrong, or at least very weird. The only place where you try to remove elements from your @t array is in SafeExit, and there you remove the wrong elements. You use the global $count variable to remove the elements, but $count never gets reset to 0 so you will basically remove a random element from @t, or at least not a joined element. I recommend using a hash keyed by the thread id over using some array.

    Also, looking at the Thread documentation, it clearly recommends the ->eval method over the ->join method if you want to prevent dieing.

      bugs in my program, i guess i found what needs to be done.
Re: catching Thread Exits - how ?
by liz (Monsignor) on Jan 08, 2007 at 09:48 UTC
    It looks to me that you're using old 5.005 threads, rather than ithreads. Is there a particular reason for using this outdated, and deprecated since 5.8.0, form of threads?
      i have the same story even after using the "ithreads" by calling use threads , i have replaced all the occurance of thread with threads, but i hit the same message and comes out the program.
        for those who get it wrong, it's working now for me, if u find any bugs and errors, plz do update me.
        #!/usr/bin/perl use File::Find; use threads qw/async/; my $source = "C:/DManager"; $destination = "E:/Job_server/Helix_destination"; my @t; ######## my $workers = 1; #number of workers my $sequence = 0; my $threads = 0; #number of threads my $done = 0; my $resources = 5; # set how mant jobs u want to run my $count =0; my @fin_threads; ######## find(\&copydir, $source); ### find #### sub copydir { my ($parent, $child, $filename, $mpeg); # print "Now starting \n"; if (($_ ne ".") and ($_ ne " ")) { $mpeg = $_ if (($_ =~ /.*\..*/) && ($_ !~ /.*\.db/)); if ($_ !~ m/\.*\.\w+/) { $parent = "$File::Find::dir"; $child = "$File::Find::name"; print "Folder name : $_\n"; #print "Parent Folder : ". $parent."\n"; print "Child Folder : ". $child ."\n"; } } if ($mpeg) { print "filename : $mpeg\n"; my $rm; #my ($rm) = $mpeg =~ /((.*)\.)/; #$rm =~ s/\.//; #$rm .= ".rm"; my $path = $File::Find::name; $path =~ s/\//\\/g; $rm = $path; $rm =~s/\..*/\.rm/g; &CheckQ(); StartQ($mpeg, $rm, $path); } } sub StartQ { #print "Now in StartQ \n"; my ($input, $output, $path) = @_; $threads++; print "pushing to the Thread ". $threads ."\n"; eval { #push @t, async { push @t, async { Dummy_process(); }; }; if (!$@) { $fin_threads[$workers] = 0; $workers++; #sleep 2; } else { $threads--; } #print "running process \n"; } sub CheckQ { print "Workers working : ". $workers. "\n"; if ($workers >= 5) { print "Wiating for one of the worker to finish the job\n"; print "Checking threads for safe exit \n"; SafeExit(); for (@t) { #$_->done() or sub {$workers--; return}; #if (!$@) { #if (!$_->join) { if ($fin_threads[$workers] != 1) { print "Work in process checking other workers : Wo +rkers : $workers\n"; print $@ ."\n"; if ( @t <6 ) { $workers = @t; return; } } else { $workers--; print "Worker finished working : Workers : $worker +s\n"; return; } } } else { return; } } sub SafeExit { print "\n In Safe Exit\n"; for (@t) { eval{ #if($_->join) { if ($fin_threads != 1) { print "A thread already joined in exit status\n"; print "\n $count \n"; print "\n $_\n"; splice(@t, $count, 1); sleep 5; $count++; } #else # { # next; # } }; if ($@) { } } } ###################################################################### +######################## #for checking the Threads only. sub Dummy_process { print "Thread: ". threads->self->tid ."Started with working ". $wo +rkers. "\n"; sleep 2; $fin_threads[$workers] =1; }