PerlIO file handle dup

by chris212 (Scribe)
on Mar 06, 2017 at 17:12 UTC
chris212 has asked for the wisdom of the Perl Monks concerning the following question:

I'm trying to have multiple threads read from the same file handle. I know they can't read concurrently; that is what the semaphore is for. I understand that file handles cannot be "shared", but threads can inherit a dup'ed copy of a file handle. It seems that with PerlIO, each copy tracks its own position, causing the data to be read out-of-sequence. This isn't the case if I use the :unix layer, but the lack of buffering kills performance. As a workaround, I can have my script keep track of the correct position and have each thread seek to it before reading data. That doesn't work if the file handle is piped output from a command such as gzip, though, since you cannot seek on a pipe.

#!/opt/perl/bin/perl
use strict;
use threads;
use Thread::Semaphore;

my $fh;
open($fh,'|-','gzip > test.txt');
foreach (1..1000) {
    print {$fh} sprintf('%04d',$_).('abc123' x 10)."\n";
}
close($fh);

open($fh,'-|','gzip','-cd','test.txt');
$| = 1;
my @threads = ();
my $sem = Thread::Semaphore->new();
foreach (1..3) {
    push(@threads,threads->create(\&test));
}
$_->join() foreach (@threads);
close($fh);
print "\n";

sub test {
    my $tid = threads->tid();
    my $line;
    while (1) {
        threads->yield();
        $sem->down();
        $line = <$fh> or last;
        print "Thread $tid ".$line;
        $sem->up();
    }
    $sem->up();
}
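A minimal sketch of that seek workaround (a hypothetical illustration, not code from the original post; it assumes an uncompressed, seekable test.txt, with each thread opening its own handle):

#!/opt/perl/bin/perl
# Sketch only: works for regular files, not for piped gzip output.
use strict;
use threads;
use threads::shared;
use Thread::Semaphore;

my $pos :shared = 0;                 # next offset to read from
my $sem = Thread::Semaphore->new();

sub test {
    my $tid = threads->tid();
    open(my $fh, '<', 'test.txt') or die $!;   # per-thread handle, own buffer
    while (1) {
        $sem->down();
        seek($fh, $pos, 0);          # jump to the shared position
        my $line = <$fh>;
        $pos = tell($fh);            # record where the next thread resumes
        print "Thread $tid ".$line if defined $line;
        $sem->up();
        last unless defined $line;
    }
}

my @threads = map { threads->create(\&test) } 1..3;
$_->join() foreach @threads;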

I can't have the piped output read by a single thread, because I need the data to be processed concurrently in threads. It is too slow to queue/dequeue the data between threads (too much data). Creating a new thread for each chunk of data and passing the data to the new thread did work, but caused intermittent crashes. Apparently this was due to nearly a million threads being created over the course of execution, although only 34 would run concurrently. The MCE module has been suggested, but I don't understand it well enough to use it yet.

Apparently with stdio, these dup'ed file handles would share a position? Is there any way to get a shared position without sacrificing buffering (even if each thread has its own buffer)? Does using the :stdio layer use the pre-5.8 I/O rather than PerlIO, or would that require re-compiling Perl?

UPDATE

I did some more reading about MCE, and apparently MCE's shared file handles are compatible with Perl threading, so I don't even need to replace Perl threading. It is really simple! Just replace open with mce_open, and the file handle is shared. It just works. For uncompressed files, it is still faster to have each thread seek rather than use MCE's IPC, though. I guess the buffering more than makes up for the IPC overhead for compressed files.

#!/opt/perl/bin/perl
use strict;
use threads;
use Thread::Semaphore;
use MCE::Shared;

my $fh;
open($fh,'|-','gzip > test.txt');
foreach (1..1000) {
    print {$fh} sprintf('%04d',$_).('abc123' x 10)."\n";
}
close($fh);

mce_open($fh,'-|','gzip -cd test.txt') or die("Failed to uncompress: $!\n");
$| = 1;
my @threads = ();
my $sem = Thread::Semaphore->new();
foreach (1..3) {
    push(@threads,threads->create(\&test));
}
$_->join() foreach (@threads);
close($fh);
print "\n";

sub test {
    my $tid = threads->tid();
    my $line;
    while (1) {
        threads->yield();
        $sem->down();
        $line = <$fh> or last;
        print "Thread $tid ".$line;
        $sem->up();
    }
    $sem->up();
}

Re: PerlIO file handle dup
by BrowserUk (Pope) on Mar 07, 2017 at 07:31 UTC

    How does this compare with what you are doing?

#! perl -slw
use strict;
use threads;
use threads::shared;

our $T //= 8;

my @buffers;
share( $buffers[ $_ ] ) for 0 .. $T-1;

my $stdoutSem :shared;

sub reader {
    my $fname = shift;
    open my $fh, '<', $fname or die $!;
    my $next = 0;
    while( <$fh> ) {
        chomp;
        lock $buffers[ $next ];
        $buffers[ $next ] = $_;
        $next = ++$next % $T;
    }
    close $fh;
    for( 0 .. $T-1 ) {
        lock $buffers[ $_ ];
        $buffers[ $_ ] = undef;
    }
}

sub worker {
    my $tid = threads->tid;
    my $bufn = shift;
    my $localbuf;
    while( 1 ) {
        {
            lock $buffers[ $bufn ];
            last unless defined( $buffers[ $bufn ] );
            $localbuf = $buffers[ $bufn ];
        }
        ## process localbuf here.
        {
            lock $stdoutSem;
            print "[$tid] processed record: '", $localbuf, "'";
        }
    }
}

my $reader  = threads->new( \&reader, $ARGV[ 0 ] );
my @workers = map threads->new( \&worker, $_ ), 0 .. $T-1;
$reader->join;
$_->join for @workers;

      I'm not sure I tried passing data between threads with a shared array, but I think performance would be similar to a queue, which was too slow.
        but I think performance would be similar to a queue,

        The problem with a queue is that all locking is applied to the entire shared array, thus every lock blocks all contenders, even if they are after different elements of the array.

        If you look carefully at my example, you'll see that the @buffers array itself isn't (explicitly) shared and is never locked; only the per-thread scalar elements are.

        And as only the reader thread and one worker thread per buffer are competing for any given lock, all worker threads are free to continue independently of each other.

        And finally, the lock on any given buffer is only held for the brief time it takes to copy its contents to a local buffer, thus the reader thread can be repopulating it with the next record whilst the worker thread is processing the previous one.

        The upshot is that in my use of this technique, it beats Thread::Queue by a wide margin for applications where processing a record takes 3x or more the time required to read it.
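        A stripped-down sketch of that contrast (a hypothetical illustration; the element count and names are not taken from the code above):

use strict;
use threads;
use threads::shared;

# Fully shared array: one lock covers the whole aggregate, so every
# contender blocks on it, whichever element they are after.
my @queue :shared;
{ lock @queue; push @queue, 'item'; }          # blocks ALL users of @queue

# Per-element sharing: the container itself stays private; only the
# scalar slots are shared, so their locks are independent.
my @buffers;
share( $buffers[$_] ) for 0 .. 7;
{ lock $buffers[3]; $buffers[3] = 'item'; }    # blocks only users of slot 3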

        Test it. You might be pleasantly surprised.


      You lose sequence. I had to autoflush output to keep it from getting jumbled. I didn't realize you can share elements of an array without sharing the whole array. Can you do the same with hashes?
        You lose sequence.

        Sorry, I hadn't realised that was a requirement. I'll think about that.

        Can you do the same with hashes?

        Yes. But be aware that if you don't lock the hash while insertions and deletions are going on, things can get a little strange. (E.g. iterating the hash keys might return a key, but by the time you try to access the associated value, the key might have been deleted.)

        If the hash is essentially static -- only the values changing -- then only sharing the value scalars and applying locking to them individually can be much more efficient than locking the entire hash for every update.
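        A minimal sketch of that per-value approach, assuming a hash whose key set never changes (the names are illustrative):

use strict;
use threads;
use threads::shared;

# Key set is fixed up front; only the value slots are shared.
my %stats = map { $_ => 0 } qw(read written skipped);
share( $stats{$_} ) for keys %stats;

# A worker bumps one counter without blocking updates to the others:
{ lock $stats{read}; $stats{read}++; }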


Re: PerlIO file handle dup
by marioroy (Priest) on Mar 07, 2017 at 08:45 UTC

    Greetings,

    Welcome to the world of multiple threads, or cores for that matter. It's one thing to make this possible on the UNIX platform and altogether something else on the Windows platform. And let's not forget Cygwin.

    For me, Perl is a box of crayons. MCE and MCE::Shared are my paintings. I imagine folks around the world joined together in a relay race. I have no idea what we're aiming for. It just happens to be my turn at this moment in time. The tools given me are a box of crayons named Perl, a laptop, and an agent named Perseverance. Perseverance brought along a long-time friend named Grace. Grace invited Randomness, an Uncle.

    The thing is that MCE and MCE::Shared may not be perfect. They are paintings, after all. Paintings take time to paint.

    In regards to any slowness, IMHO, let the script fly. Oh, please do. For this use-case, a semaphore is not necessary. Nor is yield. Upon starting a thread, that thread will begin interacting with the shared-manager immediately. That is why the same thread ID is shown repeatedly for many lines in the output. Eventually, the 2nd thread has completed spawning and joins the 1st thread. Thread 3 joins later due to being spawned last.

    I upped the count to 100k. The OP's script with semaphore + yield takes 3.6 seconds to run on a laptop (2.6 GHz Core i7 Haswell). Removing the semaphore + yield allows the script to complete in 1.1 seconds. The latter includes threads writing to a shared output handle. In case it was missed, I removed the line that autoflushes STDOUT (i.e. $| = 1). There's no reason to slow down IO. Let Perl fly. Ditto for MCE::Shared and workers.

use strict;
use threads;
use MCE::Shared;

{
    open my $fh, '|-', 'gzip > test.txt.gz';
    foreach (1..100000) {
        print {$fh} sprintf('%04d',$_).('abc123' x 10)."\n";
    }
    close $fh;
}

{
    mce_open my $fh, '-|', 'gzip -cd test.txt.gz' or die "open error: $!\n";
    mce_open my $out, '>', \*STDOUT or die "open error: $!\n";

    my @thrs;
    foreach (1..3) {
        push @thrs, threads->create('test');
    }
    $_->join() foreach @thrs;
    close($fh);

    sub test {
        my $tid = threads->tid();
        # using shared output to not garble among threads
        while ( my $line = <$fh> ) {
            print {$out} "thread: $tid, line: $., ".$line;
        }
    }
}

    It can run faster. To be continued in the next post.

    Regards, Mario.

      Greetings,

      To decrease the number of trips to and from the shared-manager, one can provide a suffix ('k' for * 1024, 'm' for * 1024 * 1024) in the 3rd argument to read. That enables chunk IO. Not to worry: the shared-manager keeps reading until it reaches the end of a line or record. Notice $.: it holds the chunk_id, not the actual line number. The chunk_id value is important when output order is desired.

      OP's script involving semaphore + yield: 3.6 seconds. Shared handle (non-chunking): 1.1 seconds.

      Below, chunking completes in 0.240 seconds, which is the total running time including the initial gzip.

use strict;
use threads;
use MCE::Shared;

{
    open my $fh, '|-', 'gzip > test.txt.gz';
    foreach (1..100000) {
        print {$fh} sprintf('%04d',$_).('abc123' x 10)."\n";
    }
    close $fh;
}

{
    mce_open my $fh, '-|', 'gzip -cd test.txt.gz' or die "open error: $!\n";
    mce_open my $out, '>', \*STDOUT or die "open error: $!\n";

    my @thrs;
    foreach (1..3) {
        push @thrs, threads->create('test');
    }
    $_->join() foreach @thrs;
    close($fh);

    sub test {
        my $tid = threads->tid();
        # using shared output to not garble among threads
        while (1) {
            my $n_chars = read $fh, my($buf), '4k';
            last if (!defined $n_chars || $n_chars <= 0);
            print {$out} "## thread: $tid, chunkid: $.\n".$buf;
        }
    }
}

      Regards, Mario.

      If we read one record at a time, the input semaphore isn't needed. However, I'm reading 500 records at a time, and they need to be in sequence. I suppose if I read in and processed one record at a time, I could eliminate the input semaphore when MCE::Shared is being used (probably not for regular file handles). However, I think that would make output slower since each thread needs to block until its processed data is the next to be written.

      I only put the yield in there because the first thread seemed to be hogging all the input before the other threads even started. In my actual script I'm not using MCE::Shared for the output file, and autoflush is needed to keep the output in order.

      So this

      read $fh, my($buf), '4k';

      is the same but faster than this?

      my $buf = <$fh>;

      If it always reads exactly one entire record regardless of "chunk size", what does the chunk size do exactly? Or is the chunk size a minimum, then it continues reading until EOL? It is confusing that MCE's read works fundamentally differently from Perl's read.

      I don't suppose there is a "readlines" function for MCE file handles? I assume if I could read all 500 lines at a time, that would minimize overhead related to MCE. For delimited input, I'm currently letting Text::CSV_XS read from the file handle, though.

        It is confusing that MCE's read works fundamentally differently from Perl's read.

        It's not clear what you mean by "MCE's read" but the snippet which you quoted as

        read $fh, my($buf), '4k';

        is most definitely Perl's read. HTH.

        In this context, a record is one line; e.g. $/ = "\n". When the 3rd argument to read contains the suffix 'k' or 'm', it slurps up that many bytes (e.g. '4k') plus whatever remains of the current line, so a record is never split. This read behavior applies to MCE::Shared::Handle only. Without the 'k' or 'm' suffix, read behaves exactly like the native read.
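        Side by side (a sketch; $fh is assumed to come from mce_open):

# Native semantics: read exactly 4096 bytes, possibly splitting a line.
my $n = read $fh, my($buf1), 4096;

# MCE::Shared::Handle extension: read about 4 KiB, then keep reading to
# the end of the current line so no record is split; $. holds the chunk_id.
my $m = read $fh, my($buf2), '4k';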

        Yes, I had thought about adding readlines at the time. But I decided against it after writing the following.

        my @lines = tied(*{$fh})->readlines(10);

        In the end, I settled on having the file-handle specifics feel like native Perl, and it does. The 'k' or 'm' suffix (extra behavior) provides chunk IO. Likewise, $. gives you the chunk_id. One can get an estimate with "head -500 file.csv | wc -c". Take that, divide by 1024, and append the 'k' suffix to use with read. IMHO, there's no reason for workers to receive the same number of lines. Some will get a little less, some a little more.
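        For example (made-up numbers): if head -500 file.csv | wc -c reports roughly 30720 bytes, then 30720 / 1024 = 30, so:

# about 500 lines per chunk for this particular file
my $n_chars = read $fh, my($buf), '30k';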

        A possibility that comes to mind is having MCE::Shared export "mce_read" to provide full MCE-like chunk IO capabilities. A value greater than 8192 would mean to read that number of bytes plus up to the end of line. If done that way, the following would only work for handles constructed with mce_open.

# same as chunk_size => 1 in MCE
$n_lines = mce_read $fh, \@lines, 1;

# read max 500 lines
$n_lines = mce_read $fh, \@lines, 500;

# read 1m, including till the end of line
$n_lines = mce_read $fh, \@lines, '1m';

# read 16k, ditto regarding till the end of line
$n_lines = mce_read $fh, \@lines, '16k';

# same thing as above, but slurp into $buf
$n_chars = mce_read $fh, $buf, 500;
$n_chars = mce_read $fh, $buf, '1m';
$n_chars = mce_read $fh, $buf, '16k';

# $. gives chunk_id

        Regards, Mario.

Re: PerlIO file handle dup
by marioroy (Priest) on Sep 03, 2018 at 00:16 UTC

    Hi chris212,

    At the time of my last reply, I didn't realize you had made an update. Disclaimer: Oh, btw, I'm not here to push MCE. Please use the module of your liking. Depending on the OS and/or number of workers, MCE::Mutex may run faster than Thread::Semaphore.

    Thread::Semaphore

use strict;
use warnings;
use threads;
use Thread::Semaphore;
use MCE::Shared;
use Time::HiRes 'time';

my $condvar = MCE::Shared->condvar;
my $sem     = Thread::Semaphore->new;

# Start the shared server. Not necessary if Perl has IO::FDPass.
MCE::Shared->start;

sub test {
    $condvar->wait;
    for (1..10000) {
        threads->yield;
        $sem->down;
        $sem->up;
    }
}

threads->create('test') for 1..3;
$condvar->broadcast(0.5);

my $start = time;
$_->join for threads->list;
printf "duration: %0.03f secs\n", time - $start;

    MCE::Mutex::Channel

use strict;
use warnings;
use threads;
use MCE::Mutex;
use MCE::Shared;
use Time::HiRes 'time';

my $condvar = MCE::Shared->condvar;
my $mutex   = MCE::Mutex->new;

# Start the shared server. Not necessary if Perl has IO::FDPass.
MCE::Shared->start;

sub test {
    $condvar->wait;
    for (1..10000) {
        threads->yield;
        $mutex->lock;
        $mutex->unlock;
    }
}

threads->create('test') for 1..3;
$condvar->broadcast(0.5);

my $start = time;
$_->join for threads->list;
printf "duration: %0.03f secs\n", time - $start;

    MCE::Mutex::Flock

use strict;
use warnings;
use threads;
use MCE::Mutex;
use MCE::Shared;
use Time::HiRes 'time';

my $condvar = MCE::Shared->condvar;
my $mutex   = MCE::Mutex->new( impl => 'Flock' );

# Start the shared server. Not necessary if Perl has IO::FDPass.
MCE::Shared->start;

sub test {
    $condvar->wait;
    for (1..10000) {
        threads->yield;
        $mutex->lock;
        $mutex->unlock;
    }
}

threads->create('test') for 1..3;
$condvar->broadcast(0.5);

my $start = time;
$_->join for threads->list;
printf "duration: %0.03f secs\n", time - $start;

    Results

    My laptop is a late 2013 MacBook Pro, 2.6 GHz i7 quad CPU. Each virtual machine is configured with 4 cores.

    * 3 threads: CentOS Linux 7.3

    Thread::Semaphore     0.386 secs
    MCE::Mutex::Channel   0.162 secs
    MCE::Mutex::Flock     0.144 secs

    * 3 threads: Windows 7

    Thread::Semaphore     0.293 secs
    MCE::Mutex::Channel   0.499 secs
    MCE::Mutex::Flock     0.498 secs

    * 20 threads: CentOS Linux 7.3

    Thread::Semaphore    41.897 secs
    MCE::Mutex::Channel   0.980 secs
    MCE::Mutex::Flock     0.702 secs

    * 20 threads: Windows 7

    Thread::Semaphore    35.521 secs
    MCE::Mutex::Channel   2.994 secs
    MCE::Mutex::Flock     3.322 secs

    Regards, Mario

      Using the OP's example, I modified it to do locking via MCE::Mutex. Afterwards, I toyed with a couple of MCE demonstrations. All demonstrations produce ordered output.

      OP's example, locking via MCE::Mutex

#!/opt/perl/bin/perl
use strict;
use threads;
use MCE::Mutex;
use MCE::Shared;

open my $fh, '|-', 'gzip > test.txt.gz';
foreach (1..10000) {
    print {$fh} sprintf("%05d%s\n", $_, ('abc123' x 10));
}
close $fh;

mce_open $fh, '-|', 'gzip -cd test.txt.gz'
    or die "Failed to uncompress: $!\n";

$| = 1;
my @threads = ();
my $mutex = MCE::Mutex->new();
foreach (1..3) {
    push @threads, threads->create(\&test);
}
$_->join() foreach @threads;
close $fh;
print "\n";

sub test {
    my $tid = threads->tid();
    my $line;
    while (1) {
        threads->yield();
        $mutex->lock();
        $line = <$fh> or last;
        print "Thread $tid ".$line;
        $mutex->unlock();
    }
    $mutex->unlock;
}

      MCE, no chunking

#!/opt/perl/bin/perl
use strict;
use threads;
use MCE;

open my $fh, '|-', 'gzip > test.txt.gz';
foreach (1..10000) {
    print {$fh} sprintf("%05d%s\n", $_, ('abc123' x 10));
}
close $fh;

open $fh, '-|', 'gzip -cd test.txt.gz'
    or die "Failed to uncompress: $!\n";

$| = 1;

# MCE spawns threads when threads is present
MCE->new(
    chunk_size  => 1,
    max_workers => 3,
    input_data  => $fh,
    init_relay  => 1,
    user_func   => sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my $tid = threads->tid();
        MCE::relay sub {
            print "Thread $tid ".$chunk_ref->[0];
        };
    }
)->run;

close $fh;
print "\n";

      MCE, chunking enabled

#!/opt/perl/bin/perl
use strict;
use threads;
use MCE;

open my $fh, '|-', 'gzip > test.txt.gz';
foreach (1..10000) {
    print {$fh} sprintf("%05d%s\n", $_, ('abc123' x 10));
}
close $fh;

open $fh, '-|', 'gzip -cd test.txt.gz'
    or die "Failed to uncompress: $!\n";

$| = 1;

# MCE spawns threads when threads is present
MCE->new(
    chunk_size  => 500,
    max_workers => 3,
    input_data  => $fh,
    init_relay  => 1,
    user_func   => sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        my $tid = threads->tid();
        my $buf = '';
        foreach my $line ( @{ $chunk_ref } ) {
            $buf .= "Thread $tid ".$line;
        }
        MCE::relay sub {
            print $buf;
        };
    }
)->run;

close $fh;
print "\n";

      Results taken from a Linux CentOS 7.3 VM

      My laptop is a late 2013 MacBook Pro, 2.6 GHz i7 quad CPU. The Linux virtual machine is configured with 4 cores.

      * 3 workers

      time perl script_sem.pl   | wc -l   # 0.543s  OP's example
      time perl script_mutex.pl | wc -l   # 0.663s  Ditto, MCE::Mutex
      time perl script_mce.pl   | wc -l   # 0.225s  MCE, no chunking
      time perl script_chunk.pl | wc -l   # 0.077s  MCE, chunking enabled

      * 10 workers

      time perl script_sem.pl   | wc -l   # 1.072s  OP's example
      time perl script_mutex.pl | wc -l   # 0.726s  Ditto, MCE::Mutex
      time perl script_mce.pl   | wc -l   # 0.255s  MCE, no chunking
      time perl script_chunk.pl | wc -l   # 0.112s  MCE, chunking enabled

      * 20 workers

      time perl script_sem.pl   | wc -l   # 1.849s  OP's example
      time perl script_mutex.pl | wc -l   # 0.803s  Ditto, MCE::Mutex
      time perl script_mce.pl   | wc -l   # 0.339s  MCE, no chunking
      time perl script_chunk.pl | wc -l   # 0.179s  MCE, chunking enabled

      Regards, Mario
