PerlMonks  

Re^2: shared scalar freed early

by chris212 (Scribe)
on Feb 22, 2017 at 18:52 UTC ( [id://1182549] )


in reply to Re: shared scalar freed early
in thread shared scalar freed early

Yeah, is that a problem? Did you miss the "using a semaphore to limit the number of concurrent threads" part of that sentence? Millions of threads will be started throughout the execution of the script, but not all at the same time! I can't pass too much data to the thread's sub, right?

UPDATE:

I just did the math, and the test data I am using to replicate the crash would create 766,747 worker threads, but only 32 at a time. I tried making the threads persistent and read data from a queue, but that was MUCH slower.

Replies are listed 'Best First'.
Re^3: shared scalar freed early
by ikegami (Patriarch) on Feb 22, 2017 at 20:16 UTC

    Starting threads in Perl is rather expensive. It's far faster to reuse threads (e.g. using a worker model). You've indicated otherwise, but that simply points to a problem with the implementation you used.

    use strict;
    use warnings;
    use feature qw( say );

    use threads;

    use Thread::Queue     qw( );
    use Thread::Semaphore qw( );
    use Time::HiRes       qw( time );

    use constant MAX_WORKERS => 32;
    use constant NUM_JOBS    => 1_000;

    {
        my $sem = Thread::Semaphore->new(MAX_WORKERS);
        my $s = time;
        for my $job (1..NUM_JOBS) {
            $sem->down();
            $_->join() for threads->list(threads::joinable);
            async {
                # ...
                $sem->up();
            };
        }
        $_->join() for threads->list();
        my $e = time;
        say($e-$s);  # 5.88567113876343
    }

    {
        my $q = Thread::Queue->new();
        for (1..MAX_WORKERS) {
            async {
                while (defined( my $job = $q->dequeue() )) {
                    # ...
                }
            };
        }
        my $s = time;
        $q->enqueue($_) for 1..NUM_JOBS;
        $q->end();
        $_->join() for threads->list();
        my $e = time;
        say($e-$s);  # 0.248196125030518
    }

    Things couldn't be any more ideal for creating threads (minimal amount of variables to clone), yet creating all those threads was 25x slower than reusing a few threads. (The factor will grow as the number of jobs increases.)

      Can you make "testb" faster than "testa" while still preserving the order? If you help me do that, then I can rewrite my script using queues and hope that it resolves my intermittent crashes.
      #!/opt/perl/bin/perl
      use strict;
      use threads;
      use Thread::Queue;
      use Thread::Semaphore;
      use threads::shared;
      use Time::HiRes qw( time );

      my $iterations = 100;
      my $chunksize  = 50;
      my $threads    = 5;
      my $aoutput    = "a.txt";
      my $boutput    = "b.txt";
      my $q;
      my $s;
      my $outq;

      my %data = ();
      foreach ('a'..'z') {
          $data{$_} = $_ x 200;
      }

      testa();
      testb();
      system($^O eq 'MSWin32' ? 'fc' : 'diff', $aoutput, $boutput);

      sub testa {
          my $t0 = time;
          $q = Thread::Queue->new;
          $s = Thread::Semaphore->new($threads);
          my ($outth) = threads->create(\&outputa);
          inputa();
          $q->end;
          $outth->join();
          printf STDERR "testa done in %0.02f seconds\n", time - $t0;
      }

      sub worka {
          my ($data) = @_;
          my @ret = ();
          foreach my $chunk (@$data) {
              my %output = ();
              foreach my $key (keys %$chunk) {
                  if ($key eq '.') {
                      $output{$key} = $$chunk{$key};
                      next;
                  }
                  my $val = $$chunk{$key};
                  my $uc = uc($key);
                  $val =~ s/$key/$uc/g;
                  $output{$key} = $val;
              }
              push(@ret, \%output);
          }
          return(\@ret);
      }

      sub inputa {
          foreach my $a (1..$iterations) {
              my @chunk = ();
              foreach my $b (1..$chunksize) {
                  my %adata = %data;
                  $adata{'.'} = $a * $b;
                  push(@chunk, \%adata);
              }
              $s->down();
              my ($th) = threads->create(\&worka, \@chunk);
              $q->enqueue($th);
          }
      }

      sub outputa {
          open(my $fh, '>', $aoutput);
          while (1) {
              my $th = $q->dequeue();
              last unless (defined $th);
              my ($output) = $th->join();
              $s->up();
              foreach my $data (@$output) {
                  foreach my $key (sort keys %$data) {
                      print {$fh} $$data{$key};
                  }
                  print {$fh} "\n";
              }
          }
          close($fh);
      }

      sub testb {
          my $t0 = time;
          $q    = Thread::Queue->new;
          $outq = Thread::Queue->new;
          $s    = Thread::Semaphore->new($threads);
          my @threads = ();
          foreach (1..$threads) {
              my $th = threads->create(\&workb);
              push(@threads, $th);
          }
          my ($outth) = threads->create(\&outputb);
          inputb();
          $q->end;
          $outq->end;
          $_->join() foreach (@threads, $outth);
          printf STDERR "testb done in %0.02f seconds\n", time - $t0;
      }

      sub workb {
          while (1) {
              my $arr = $q->dequeue;
              last unless (defined $arr);
              $s->up();
              my ($input, $output, $sem) = @$arr;
              foreach my $data (@$input) {
                  my %ret :shared = ();
                  foreach my $key (keys %$data) {
                      if ($key eq '.') {
                          $ret{$key} = $$data{$key};
                          next;
                      }
                      my $val = $$data{$key};
                      my $uc = uc($key);
                      $val =~ s/$key/$uc/g;
                      $ret{$key} = $val;
                  }
                  push(@$output, \%ret);
              }
              $sem->up();
          }
      }

      sub inputb {
          foreach my $a (1..$iterations) {
              my @input = ();
              foreach my $b (1..$chunksize) {
                  my %bdata = %data;
                  $bdata{'.'} = $a * $b;
                  push(@input, \%bdata);
              }
              my @output :shared = ();
              my $sem = Thread::Semaphore->new(0);
              $s->down();
              $outq->enqueue([\@output, $sem]);
              $q->enqueue([\@input, \@output, $sem]);
          }
      }

      sub outputb {
          open(my $fh, '>', $boutput);
          while (1) {
              my $arr = $outq->dequeue;
              last unless (defined $arr);
              my ($output, $sem) = @$arr;
              $sem->down();
              foreach my $data (@$output) {
                  foreach my $key (sort keys %$data) {
                      print {$fh} $$data{$key};
                  }
                  print {$fh} "\n";
              }
          }
          close($fh);
      }

        Hello, chris212. Welcome to the monastery.

        Hello, fellow monks. I've been away for some time, but I saw this thread and wanted to pass on some knowledge. This is an interesting problem. The following is a demonstration one might write using the MCE module. Notice the lack of semaphores and locking at the application level. The run time is similar to Chris's testa example, and the output file (m.txt) generated here matches (a.txt).

        The script runs on all major platforms, including Windows. The workers persist between chunks, and output order is preserved as required. Both the input and output iterators are handled by the MCE manager (the main process). Workers request the next input chunk, compute the data, then submit their results via MCE->gather. This works reasonably well.

        #!/opt/perl/bin/perl
        ## http://www.perlmonks.org/?node_id=1182580

        use strict;
        use warnings;

        use MCE;
        use Time::HiRes 'time';

        my $iterations = 100;
        my $chunksize  = 50;
        my $threads    = 5;
        my $output     = "m.txt";

        my %data = ();
        foreach ('a'..'z') {
            $data{$_} = $_ x 200;
        }

        test_mce();

        sub test_mce {
            my $start = time;
            my $mce = MCE->new(
                max_workers => $threads,
                chunk_size  => $chunksize,
                input_data  => make_iter_input($chunksize, $iterations),
                gather      => make_iter_output($output),
                user_func   => \&work,
            )->run();
            printf STDERR "testa done in %0.02f seconds\n", time - $start;
        }

        # MCE task to run in parallel
        sub work {
            my ($mce, $chunk_ref, $chunk_id) = @_;
            my $data = $chunk_ref->[0];
            my @ret = ();
            foreach my $chunk (@$data) {
                my %output = ();
                foreach my $key (keys %$chunk) {
                    if ($key eq '.') {
                        $output{$key} = $$chunk{$key};
                        next;
                    }
                    my $val = $$chunk{$key};
                    my $uc = uc($key);
                    $val =~ s/$key/$uc/g;
                    $output{$key} = $val;
                }
                push(@ret, \%output);
            }
            MCE->gather($chunk_id, \@ret);
        }

        # make an input closure, returns an iterator
        sub make_iter_input {
            my ($chunk_size, $iterations) = @_;
            my $seq_a = 1;
            return sub {
                return if $seq_a > $iterations;
                my ($chunk_size) = @_;
                my @chunk = ();
                foreach my $seq_b ( 1 .. $chunk_size ) {
                    my %retdata = %data;
                    $retdata{'.'} = $seq_a * $seq_b;
                    push @chunk, \%retdata;
                }
                $seq_a += 1;
                return \@chunk;
            };
        }

        # make an output closure, returns an iterator
        sub make_iter_output {
            my ($path) = @_;
            my %hold;
            my $order_id = 1;
            open my $fh, '>', $path or die "open error: $!";
            return sub {
                my $chunk_id = shift;
                # hold temporarily, until orderly
                $hold{$chunk_id} = shift;  # \@ret
                while (1) {
                    last unless exists $hold{$order_id};
                    foreach my $data (@{ delete $hold{$order_id} }) {
                        foreach my $key (sort keys %$data) {
                            print {$fh} $$data{$key};
                        }
                        print {$fh} "\n";
                    }
                    $order_id++;
                }
            };
        }

        Regards, Mario.

        Greetings, fellow monks.

        This is a continuation of my previous post. Here I try the same thing with MCE::Shared. MCE::Hobo spawns threads on Windows and processes on other platforms. The two iterators return a shared object during construction. With this, the OP may be able to run without spawning a million threads, which is the reason for writing this and the prior post.

        #!/opt/perl/bin/perl
        ## http://www.perlmonks.org/?node_id=1182580

        use strict;
        use warnings;

        use MCE::Hobo;
        use MCE::Shared;
        use Time::HiRes 'time';

        my %data = ();
        foreach ('a'..'z') {
            $data{$_} = $_ x 200;
        }

        my $chunk_size  = 50;
        my $iterations  = 100;
        my $num_workers = 5;
        my $output_path = "h.txt";

        my $iter_i = Iter::Input->new($chunk_size, $iterations);
        my $iter_o = Iter::Output->new($output_path);

        test_mce_hobo();

        sub test_mce_hobo {
            my $start = time;
            MCE::Hobo->create('work') for (1..$num_workers);
            MCE::Hobo->waitall;
            $iter_o->close();
            printf STDERR "testa done in %0.02f seconds\n", time - $start;
        }

        # Hobo task to run in parallel
        sub work {
            while ( my ($chunk_id, $data) = $iter_i->recv() ) {
                my @ret = ();
                foreach my $chunk (@$data) {
                    my %output = ();
                    foreach my $key (keys %$chunk) {
                        if ($key eq '.') {
                            $output{$key} = $$chunk{$key};
                            next;
                        }
                        my $val = $$chunk{$key};
                        my $uc = uc($key);
                        $val =~ s/$key/$uc/g;
                        $output{$key} = $val;
                    }
                    push(@ret, \%output);
                }
                my $buf = '';
                foreach my $data (@ret) {
                    foreach my $key (sort keys %$data) {
                        $buf .= $$data{$key};
                    }
                    $buf .= "\n";
                }
                $iter_o->send($chunk_id, $buf);
            }
        }

        #####################################################################

        package Iter::Input;

        sub new {
            my ( $class, $chunk_size, $iterations ) = @_;
            my ( $chunk_id, $seq_a ) = ( 0, 1 );
            MCE::Shared->share( bless [
                $iterations, $chunk_size, \$chunk_id, \$seq_a
            ], $class );
        }

        sub recv {
            my ( $self ) = @_;
            my ( $iters, $chunk_size, $chunk_id, $seq_a ) = @{ $self };
            return if ( ${$seq_a} > $iters );
            my @chunk;
            foreach my $seq_b ( 1 .. $chunk_size ) {
                my %retdata = %data;
                $retdata{'.'} = ${$seq_a} * $seq_b;
                push @chunk, \%retdata;
            }
            # These were made references on purpose, during construction,
            # to minimize array access: e.g. $self->[1]++, $self->[3]++
            ${$chunk_id}++, ${$seq_a}++;
            return ${$chunk_id}, \@chunk;
        }

        1;

        #####################################################################

        package Iter::Output;

        sub new {
            my ( $class, $path ) = @_;
            my ( $order_id, $fh, %hold ) = ( 1 );
            # Note: Do not open the file handle here, during construction.
            # The reason is that sharing will fail (cannot serialize $fh).
            MCE::Shared->share( bless [
                $path, $fh, \$order_id, \%hold
            ], $class );
        }

        sub send {
            my ( $self, $chunk_id, $output ) = @_;
            my ( $path, $fh, $order_id, $hold ) = @{ $self };
            if ( !defined $fh ) {
                open $fh, '>', $path or die "open error: $!";
                $self->[1] = $fh;
            }
            # hold temporarily, until orderly
            $hold->{$chunk_id} = $output;
            while (1) {
                last unless exists( $hold->{ ${$order_id} } );
                print {$fh} delete( $hold->{ ${$order_id} } );
                ${$order_id}++;
            }
            return;
        }

        sub close {
            my ( $self ) = @_;
            if ( defined $self->[1] ) {
                CORE::close $self->[1];
                $self->[1] = undef;
            }
            return;
        }

        1;

        Regards, Mario.

        my $request_q;
        my $response_q;

        sub testc {
            my $t0 = time;
            $request_q  = Thread::Queue->new();
            $response_q = Thread::Queue->new();
            my $merge_thread = threads->create(\&outputc);
            my @worker_threads;
            for (1..$threads) {
                push @worker_threads, async {
                    while ( my $job = $request_q->dequeue() ) {
                        $response_q->enqueue([ $job->[0], worka($job->[1]) ]);
                    }
                };
            }
            inputc();
            $request_q->end();
            $_->join() for @worker_threads;
            $response_q->end();
            $merge_thread->join();
            printf STDERR "testc done in %0.02f seconds\n", time - $t0;
        }

        sub inputc {
            my $id = -1;
            foreach my $a (1..$iterations) {
                my @chunk = ();
                foreach my $b (1..$chunksize) {
                    my %adata = %data;
                    $adata{'.'} = $a * $b;
                    push(@chunk, \%adata);
                }
                $request_q->enqueue([ ++$id, \@chunk ]);
            }
        }

        sub outputc {
            my $next_id = 0;
            my %cache;
            open(my $fh, '>', $aoutput);
            while ( my $job = $response_q->dequeue() ) {
                if ($job->[0] != $next_id) {
                    $cache{$job->[0]} = $job;
                    next;
                }
                do {
                    foreach my $data (@{ $job->[1] }) {
                        foreach my $key (sort keys %$data) {
                            print {$fh} $$data{$key};
                        }
                        print {$fh} "\n";
                    }
                    $job = delete($cache{++$next_id});
                } while $job;
            }
            close($fh);
        }

        Untested. Untimed.

        By the way, notice how the worker didn't have to change at all (testc calls worka)? That's good. That means the work is independent of the communication.

        Hello,

        Below is another variation. Here, workers write directly to the output handle, similarly to testa. The init_relay option, when defined, loads MCE::Relay and enables relay capabilities. Relay is useful in places where workers must run serially and in order: only a single worker can run inside the relay block below, and workers enter it in order by chunk_id. In other words, workers wait their turn; the worker with chunk_id 1 goes first, then the worker with chunk_id 2, and so forth.

        I forgot to mention that MCE can also spawn threads. Simply add "use threads" at the top of the script, prior to loading MCE. That allows the use of Thread::Queue, Thread::Semaphore, and friends. If curious, compare the memory consumption of testa against this one; I increased $iterations to 1000 to be able to monitor the process in another window. Typically, running without threads is faster on Unix. Either way, the choice is yours if threads are a better fit, e.g. wanting to use Thread::Queue.

        use strict;
        use warnings;

        use MCE;
        use Time::HiRes 'time';

        my $iterations = 100;
        my $chunksize  = 50;
        my $threads    = 5;
        my $output     = "m.txt";

        my %data = ();
        foreach ('a'..'z') {
            $data{$_} = $_ x 200;
        }

        open my $fh, '>', $output or die "open error: $!";
        $fh->autoflush(1);

        test_mce();

        close $fh;

        sub test_mce {
            my $start = time;
            my $mce = MCE->new(
                max_workers => $threads,
                chunk_size  => $chunksize,
                input_data  => input_iter($chunksize, $iterations),
                user_func   => \&work,
                init_relay  => 0,
            )->run();
            printf STDERR "testa done in %0.02f seconds\n", time - $start;
        }

        # make an input closure, return iterator
        sub input_iter {
            my ($chunk_size, $iterations) = @_;
            my $seq_a = 1;
            return sub {
                return if $seq_a > $iterations;
                my ($chunk_size) = @_;
                my @chunk = ();
                foreach my $seq_b ( 1 .. $chunk_size ) {
                    my %retdata = %data;
                    $retdata{'.'} = $seq_a * $seq_b;
                    push @chunk, \%retdata;
                }
                $seq_a += 1;
                return \@chunk;
            };
        }

        # MCE task to run in parallel
        sub work {
            my ($mce, $chunk_ref, $chunk_id) = @_;
            my $data = $chunk_ref->[0];
            my @ret = ();
            foreach my $chunk (@$data) {
                my %output = ();
                foreach my $key (keys %$chunk) {
                    if ($key eq '.') {
                        $output{$key} = $$chunk{$key};
                        next;
                    }
                    my $val = $$chunk{$key};
                    my $uc = uc($key);
                    $val =~ s/$key/$uc/g;
                    $output{$key} = $val;
                }
                push(@ret, \%output);
            }
            my $buf = '';
            foreach my $data (@ret) {
                foreach my $key (sort keys %$data) {
                    $buf .= $$data{$key};
                }
                $buf .= "\n";
            }
            MCE::relay { print {$fh} $buf };
        }

        Regards, Mario.

        Hi, chris212.

        The following is a simpler form of testa. The main process that creates the worker threads is also the one that joins them. What I've done is merge the input and output routines into a single sub named manager.

        use strict;
        use warnings;

        use threads;
        use Thread::Semaphore;
        use Time::HiRes qw(time);

        my $iterations = 100;
        my $chunksize  = 50;
        my $threads    = 5;
        my $results    = "s.txt";

        my $s = Thread::Semaphore->new($threads);

        my %data = ();
        foreach ('a'..'z') {
            $data{$_} = $_ x 200;
        }

        test();

        sub test {
            my $t = time;
            manager();
            printf "duration: %0.02f secs.\n", time - $t;
        }

        sub work {
            my ($data) = @_;
            my @ret = ();
            foreach my $chunk (@$data) {
                my %output = ();
                foreach my $key (keys %$chunk) {
                    if ($key eq '.') {
                        $output{$key} = $$chunk{$key};
                        next;
                    }
                    my $val = $$chunk{$key};
                    my $uc = uc($key);
                    $val =~ s/$key/$uc/g;
                    $output{$key} = $val;
                }
                push @ret, \%output;
            }
            my $buf = '';
            foreach my $data (@ret) {
                foreach my $key (sort keys %$data) {
                    $buf .= $$data{$key};
                }
                $buf .= "\n";
            }
            $s->up();
            return $buf;
        }

        sub manager {
            my @q;
            open my $fh, '>', $results or die "open error: $!\n";
            foreach my $a (1..$iterations) {
                my @chunk = ();
                foreach my $b (1..$chunksize) {
                    my %_data = %data;
                    $_data{'.'} = $a * $b;
                    push @chunk, \%_data;
                }
                $s->down();
                push @q, threads->create('work', \@chunk);
                while (@q && $q[0]->is_joinable) {
                    my $output = shift(@q)->join();
                    print {$fh} $output;
                }
            }
            while (@q) {
                my $output = shift(@q)->join();
                print {$fh} $output;
            }
            close $fh;
        }

        Regards, Mario.

      I'll see if I can create a test script comparing the 2 methods in a way that better represents what I am trying to accomplish. The tricky part is keeping the output in the same order as input, and I don't remember how I accomplished that in my previous testing.
Re^3: shared scalar freed early (queue)
by tye (Sage) on Feb 22, 2017 at 20:17 UTC
    I tried making the threads persistent and read data from a queue, but that was MUCH slower.

    Sounds to me like you are doing something very strange. Creating a Perl "thread" is rather expensive. Much more expensive than pulling an item from a queue. Usually much, much more expensive.
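    A rough way to sanity-check this is to time thread creation against queue dequeues directly. This is a sketch, not a definitive benchmark; absolute numbers depend on the machine and the perl build.

```perl
#!/usr/bin/perl
# Rough comparison: cost of spawning a thread vs. pulling an item
# from a Thread::Queue. Numbers vary widely by machine.
use strict;
use warnings;
use threads;
use Thread::Queue;
use Time::HiRes qw(time);

my $n = 100;

# Time $n spawn-and-join cycles of an empty thread.
my $t0 = time;
for (1 .. $n) {
    threads->create(sub { 1 })->join();
}
my $spawn = time - $t0;

# Time $n dequeues from a pre-filled queue.
my $q = Thread::Queue->new();
$q->enqueue($_) for 1 .. $n;
$t0 = time;
$q->dequeue() for 1 .. $n;
my $pull = time - $t0;
$pull = 1e-9 if $pull <= 0;    # guard against coarse timer resolution

printf "spawn: %.4fs  dequeue: %.4fs  ratio: %.0fx\n",
    $spawn, $pull, $spawn / $pull;
```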

    - tye        

      Please see my reply to ikegami. Those are not the results I get with my test script. Thanks.

        Oh, so it is "slower" because you are doing all kinds of blocking between threads (not because it is more expensive). Don't do that.

        If you need to preserve order, then create slots for the data and let the threads fill in the slots at their leisure. Way, way simpler code. I failed at trying to understand the point of your overly complicated sample code. Sorry.

        Don't pass thread handles through queues. Pass work items. If you only have as many threads as you need, then you don't need the semaphore at all.
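        For the curious, a minimal sketch of the "slots" approach described above: pre-size a shared array, pass (index, work item) pairs through one queue, and let each worker write its result into its own slot. The names and the toy workload are illustrative, not from the original script.

```perl
#!/usr/bin/perl
# Sketch: preserve output order with per-item "slots" instead of
# semaphores. Each job carries its slot index; workers fill slots
# in any order, and the main thread reads them back in input order.
use strict;
use warnings;
use threads;
use threads::shared;
use Thread::Queue;

my $num_workers = 4;
my @jobs        = map { $_ * 10 } 1 .. 20;   # stand-in work items
my @slots : shared;
$#slots = $#jobs;                            # pre-size: one slot per job

my $q = Thread::Queue->new();

my @workers = map {
    threads->create(sub {
        while ( defined( my $item = $q->dequeue() ) ) {
            my ( $i, $job ) = @$item;
            $slots[$i] = $job + 1;           # toy "work": fill our slot
        }
    });
} 1 .. $num_workers;

# Pass work items (not thread handles) through the queue.
$q->enqueue( [ $_, $jobs[$_] ] ) for 0 .. $#jobs;
$q->end();
$_->join() for @workers;

# Output is naturally in input order: just walk the slots.
print "$_\n" for @slots;
```

        The queue carries only work; ordering falls out of the slot indices, so no semaphore or per-chunk handshake is needed.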

        - tye        

Re^3: shared scalar freed early
by Laurent_R (Canon) on Feb 22, 2017 at 21:56 UTC
    Yeah, is that a problem?
    Oh, yes it is. Spawning a very large number of threads will almost certainly take a heavy toll on the machine's resources and at best slow everything down (or, more likely, bring your machine down), even if many of them are idle at any point in time.

    Other monks have already explained while I was off-line that it is much better to have a relatively limited number of threads picking work from a job queue (or something similar), and I absolutely agree with ikegami and tye on that.

    (And this is why I candidly asked the question in the first place, as well as to make sure I understood you correctly, because the idea seemed so extravagant to me.)

      So even after a thread is joined, it still consumes resources? I don't see memory usage growing or the number of threads growing as new threads are created to replace the finished ones. The CPU is utilized heavily the entire time it runs, but that is the point. Are you saying my CPU cycles are being wasted on threads that have already joined/returned?
      Please see reply to ikegami. I'm still interested in how using so many threads can be causing my crash, but if I can't get queues to work anywhere near as well, I don't see a practical alternative in Perl. Thanks.
