PerlMonks  

Fast provider feeding slow consumer

by leostereo (Sexton)
on Apr 24, 2016 at 16:11 UTC
leostereo has asked for the wisdom of the Perl Monks concerning the following question:

Hi friends, a week ago I had to deal with a situation where a very fast provider is feeding a slow consumer.
I posted my solution using forks, which was working fine on a test server, but when I put it on a production server it crashed.
The test server is a very old box running CentOS 6, and the production server is a virtual machine on a VMware platform running Oracle RH ver 7.
Instead of figuring out why it ran fine on one machine and crashed on the other, I decided to go for a parallel solution.
Some users suggested that I read about parallel preforking, so I came up with these piped scripts:
./lines_dispacher.pl | ./lines_consumer_parallel2.pl

I want to say that I'm running both scripts using pipes for two reasons:
First: I can not merge them ... I don't know how to do it, so some help on this would be great.
Second: I realized that this way I can use the tee command and analyze both outputs.
I would like to share both scripts so you can help me improve them or maybe suggest other alternatives for this task. Thanks


################################## lines_dispacher.pl:
#!/usr/bin/perl
use IO::Socket::INET::Daemon;
use Proc::Daemon;
use Proc::PID::File;
use IO::Handle;

STDOUT->autoflush(1);

my $host = new IO::Socket::INET::Daemon(
    host     => '172.24.3.208',
    port     => 7777,
    timeout  => 20,
    callback => {
        data => \&data,
    },
);
$host->run;

sub data {
    my ($io, $host) = @_;
    my $line = $io->getline;
    chomp($line);
    return 0 unless $line;
    print "$line\n";
    return !0;
}

########################################### lines_consumer_parallel2.pl:
#!/usr/bin/perl
use DBI;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(10);
$forks = 1;
while (<>) {
    $pm->start() and next;    # Parent nexts
    ###
    my ($type, $ip, $mac, $bsid, $datecode) = split(',', $_);
    $cpe = $ip;
    $mac =~ s/-//g;
    $community = 'public';
    $snmp_rssi = '.1.3.6.1.4.1.9885.9885.1.2.0';
    $output = qx(snmpwalk -v2c -t1 -c $community $cpe $snmp_rssi 2>&1);    # this is the task that delays the consumer process.
    if ( $output eq "Timeout: No Response from $ip" ) {
        $rssi  = 0;
        $error = 'SNMP not responding. Upgrade firmware';
    }
    else {
        @result = split(/:/, $output);
        $rssi = $result[3];
        $rssi =~ s/ //g;
        $rssi =~ s/\n//g;
        if ($rssi < -100) {
            $rssi = $rssi / 100;
        }
        $rssi = int($rssi);
    }
    $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207;port=3306","account_process","neting.!");
    $query = "INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUES".
             "('$mac','$ip','$bsid','$rssi')".
             "ON DUPLICATE KEY UPDATE ip='$ip',bsid='$bsid',rssi='$rssi'";
    $sth = $dbh->prepare($query);
    $sth->execute();
    $dbh->disconnect();
    print "we are on fork number $forks\n";
    $forks++;
    ###
    $pm->finish();
}

Last comment: I was also trying to print the fork number at the end of the consumer script. I did not get the expected output, since every line prints "1", but then I realized that this is the correct output, since each line is handled in a different process. So another goal for me is to learn how I can get the fork number. Regards.
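
On the fork-number question: each child works on its own copy of `$forks`, taken at the moment it is forked, so an increment made inside the child never reaches the parent or the next child. Here is a minimal sketch of one way around this, using only core `fork` and `pipe` rather than Parallel::ForkManager (the sub name `run_jobs` and the sample input are illustrative, not taken from the scripts above). The counter is incremented in the parent before forking, so each child inherits its own number. With Parallel::ForkManager the same idea would mean moving `$forks++` above the `$pm->start` line.

```perl
#!/usr/bin/perl
use strict;
use warnings;

$| = 1;    # flush STDOUT so children do not inherit buffered output

# Fork one child per line; each child reports the job number it inherited.
# Returns the children's reports in spawn order.
sub run_jobs {
    my @lines = @_;
    my $job   = 0;
    my @readers;
    for my $line (@lines) {
        $job++;    # incremented in the parent, before the fork
        pipe( my $r, my $w ) or die "pipe: $!";
        my $pid = fork();
        die "fork: $!" unless defined $pid;
        if ( $pid == 0 ) {    # child: sees the parent's current $job
            close $r;
            print {$w} "job $job handled '$line'\n";
            close $w;
            exit 0;
        }
        close $w;             # parent keeps only the read end
        push @readers, [ $pid, $r ];
    }
    my @reports;
    for my $pair (@readers) {
        my ( $pid, $r ) = @$pair;
        my $report = <$r>;
        close $r;
        waitpid( $pid, 0 );
        chomp $report;
        push @reports, $report;
    }
    return @reports;
}

print "$_\n" for run_jobs(qw(a b c));
```

Each input gets one report line, numbered 1, 2, 3 in the order the children were started.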

Replies are listed 'Best First'.
Re: Fast provider feeding slow consumer
by NetWallah (Canon) on Apr 24, 2016 at 19:37 UTC
    I believe BrowserUk originally wrote this code. I could not find his original post, so I'm sharing it again.

    This uses threads/queues, and provides the functionality you are seeking, aside from being an excellent reference implementation:

    #! perl -slw
    use strict;
    use threads;
    use Thread::Queue;

    $| = 1;    #$OUTPUT_AUTOFLUSH

    our $KIDS  ||= 10;
    our $WORK  ||= 500;
    our $SLEEP ||= 5;

    sub kid {
        my( $Q ) = shift;
        my $tid = threads->tid;
        my $count = 0;
        printf "Kid: %02d started\n", $tid;

        ## Pick a work item off the queue and process it
        while( my $work = $Q->dequeue ) {
            printf "Kid: %02d processing work item '%s'\n", $tid, $work;
            $count++;
            ## Replace the sleep with the code to process the work items
            rand > 0.7 and sleep rand( $SLEEP );
        }
        print "kid: $tid ending after processing $count items.\n";
    }

    ## A queue for communications
    my $Q = new Thread::Queue;

    ## Start the kids
    my @kids = map{ threads->create( \&kid, $Q ) } 1 .. $KIDS;

    ## Wait till they're all up and running
    sleep 1 until @{[ threads->list ]} == $KIDS;

    ## Feed the queue with work
    ## The limit just ensures we don't fill lots of memory
    for my $workitem ( 1 .. $WORK ) {
        sleep 1 while $Q->pending > $KIDS *10;
        print "Queueing work item $workitem";
        $Q->enqueue( $workitem );
    }

    ## Tell them to stop
    $Q->enqueue( (undef) x $KIDS );

    ## And wait for them to do so.
    $_->join for @kids;

            This is not an optical illusion, it just looks like one.

      Excellent note my friend, I will study it.
Re: Fast provider feeding slow consumer
by BrowserUk (Pope) on Apr 25, 2016 at 00:20 UTC

    Here's how I do it with threads. One thread runs the server; 10 do the SNMP walking; the main thread populates the database.

    (No clean up code because it wasn't clear from your code what caused things to stop? Compiles clean; but untested.)

    #! perl -slw
    use strict;
    use threads;
    use Thread::Queue;

    our $T //= 10;    ## 10 walkers; adjust to suit.

    sub listener {
        my( $Qout ) = @_;
        require 'IO::Socket::INET::Daemon';    ## Requiring here means other threads don't carry the redundant weight
        my $host = new IO::Socket::INET::Daemon(
            host => '172.24.3.208',
            port => 7777,
            timeout => 20,
            callback => {
                data => sub {
                    my ($io, $host) = @_;
                    chomp( my $line = <$io> );
                    return 0 unless $line;
                    $Qout->enqueue( $line );    ## send work to listener
                    return !0;
                }
            },
        );
        $host->run;
        return;
    }

    sub walker {
        my( $Qin, $Qout ) = @_;
        while( $Qin->dequeue ) {    ## receive work from listener
            my( $type, $ip, $mac, $bsid, $datecode ) = split( ',', $_ );
            $mac =~ tr[-][]d;
            my $output = qx( snmpwalk -v2c -t1 -c public $ip .1.3.6.1.4.1.9885.9885.1.2.0 2>&1 );
            my( $rssi, $error ) = 0;
            if( $output eq "Timeout: No Response from $ip" ) {
                $error = 'SNMP not responding. Upgrade firmware';
            }
            else {
                my @result = split( /:/, $output );
                $rssi = $result[ 3 ];
                $rssi =~ tr[ \n][]d;
                if( $rssi < -100 ) {
                    $rssi = $rssi / 100;
                }
                $rssi = int( $rssi );
            }
            $Qout->enqueue( join $;, $mac, $ip, $bsid, $rssi );    ## Send data items to DBI as one scalar
        }
    }

    use enum qw[ IN DBI ];
    my @Qs = map Thread::Queue->new(), 1 .. 2;    ## set up two Qs

    ## start the listener thread
    my $tListener = threads->create( \&listener, $Qs[ IN ] );    ## One for the listener to send work to the walkers

    ## start 10 walkers
    my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI ] ) } 1 .. $T;    ## And one for the walkers to forward data for adding to the db

    require DBI;    ## Avoid loading DBI into threads
    my $dbh = DBI->connect( "DBI:mysql:database=cpe_info;host=172.24.3.207;port=3306","account_process","neting.!" );

    my $sth = $dbh->prepare( "INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUES( ?, ?, ?, ? ) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi = ?" );

    ## process data produced by walkers
    while( $Qs[DBI]->dequeue ) {
        my( $mac, $ip, $bsid, $rssi ) = split $;;    ## Retrieve individual data
        $sth->execute( $mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi );    ## bind and execute
    }

    $dbh->disconnect();
    __END__

    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority". I knew I was on the right track :)
    In the absence of evidence, opinion is indistinguishable from prejudice.

      Update: Renamed an enum constant from DBI to DB. Thank you NetWallah.

      When Perl lacks threads support, e.g. Perl on Solaris, AIX, one may use MCE::Hobo in place of threads for BrowserUk's demonstration.

      A Hobo is a migratory worker inside the machine that carries the asynchronous gene. Hobos are equipped with threads-like capabilities for running code asynchronously. Unlike threads, each hobo is a unique process to the underlying OS. The IPC is managed by MCE::Shared, which runs on all the major platforms including Windows and Cygwin.

      #! perl -slw
      use strict;
      use MCE::Hobo;
      use MCE::Shared;

      our $T //= 10;    ## 10 walkers; adjust to suit.

      sub listener {
          my( $Qout ) = @_;
          require IO::Socket::INET::Daemon;    ## Requiring here means other threads don't carry the redundant weight
          my $host = new IO::Socket::INET::Daemon(
              host => '172.24.3.208',
              port => 7777,
              timeout => 20,
              callback => {
                  data => sub {
                      my ($io, $host) = @_;
                      chomp( my $line = <$io> );
                      return 0 unless $line;
                      $Qout->enqueue( $line );    ## send work to listener
                      return !0;
                  }
              },
          );
          $host->run;
          return;
      }

      sub walker {
          my( $Qin, $Qout ) = @_;
          while( $Qin->dequeue ) {    ## receive work from listener
              my( $type, $ip, $mac, $bsid, $datecode ) = split( ',', $_ );
              $mac =~ tr[-][]d;
              my $output = qx( snmpwalk -v2c -t1 -c public $ip .1.3.6.1.4.1.9885.9885.1.2.0 2>&1 );
              my( $rssi, $error ) = 0;
              if( $output eq "Timeout: No Response from $ip" ) {
                  $error = 'SNMP not responding. Upgrade firmware';
              }
              else {
                  my @result = split( /:/, $output );
                  $rssi = $result[ 3 ];
                  $rssi =~ tr[ \n][]d;
                  if( $rssi < -100 ) {
                      $rssi = $rssi / 100;
                  }
                  $rssi = int( $rssi );
              }
              $Qout->enqueue( join $;, $mac, $ip, $bsid, $rssi );    ## Send data items to DBI as one scalar
          }
      }

      use enum qw[ IN DB ];
      my @Qs = map MCE::Shared->queue(), 1 .. 2;    ## set up two Qs

      ## start the listener thread
      my $tListener = MCE::Hobo->create( \&listener, $Qs[ IN ] );    ## One for the listener to send work to the walkers

      ## start 10 walkers
      my @walkers = map{ MCE::Hobo->create( \&walker, @Qs[ IN, DB ] ) } 1 .. $T;    ## And one for the walkers to forward data for adding to the db

      require DBI;    ## Avoid loading DBI into threads
      my $dbh = DBI->connect( "DBI:mysql:database=cpe_info;host=172.24.3.207;port=3306","account_process","neting.!" );

      my $sth = $dbh->prepare( "INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUES( ?, ?, ?, ? ) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi = ?" );

      ## process data produced by walkers
      while( $Qs[ DB ]->dequeue ) {
          my( $mac, $ip, $bsid, $rssi ) = split $;;    ## Retrieve individual data
          $sth->execute( $mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi );    ## bind and execute
      }

      $dbh->disconnect();
      __END__

      BrowserUk's cool demonstration is working after removing the quotes for the require statement on line 10.

      sub listener {
          my( $Qout ) = @_;
          require IO::Socket::INET::Daemon;    ## Requiring here means other threads don't carry the redundant weight
          ...
      }
      Dear BrowserUk; I can not believe you wrote this script for me. I really appreciate your effort on this, and of course I will try it and get back to you with the result.
      Regarding your question about what was wrong with my script, let me explain.
      The current script I posted on this node is working well. I wanted to share it since it is the first time I have used those modules, and I wanted to get some comments and advice from you.
      The problem I had with my previous script using fork on the production server is that the CPU goes to 100%.
      It was very weird, since on the testing server it was working well. Perhaps it was due to a difference in the OS architecture, signal handling ... I don't know, but I decided to try other ways.
      Anyway, thanks for your time and patience.
      Regards.
      Leandro.

      Dear BrowserUk, the script is almost done. After applying both changes it began working, but there is something wrong when the walker receives the line from the listener: it receives only a number instead of the complete line.
      I made them print as follows:

      sub listener {
          my( $Qout ) = @_;
          require IO::Socket::INET::Daemon;    ## Requiring here means other threads don't carry the redundant weight
          my $host = new IO::Socket::INET::Daemon(
              host => '172.24.3.208',
              port => 7777,
              timeout => 20,
              callback => {
                  data => sub {
                      my ($io, $host) = @_;
                      chomp( my $line = <$io> );
                      return 0 unless $line;
                      print "here line is : $line";
                      $Qout->enqueue( $line );    ## send work to listener
                      return !0;
                  }
              },
          );
          $host->run;
          return;
      }

      sub walker {
          my( $Qin, $Qout ) = @_;
          while( $Qin->dequeue ) {    ## receive work from listener
              print "here \$\_ is : $_";
              my( $type, $ip, $mac, $bsid, $datecode ) = split( ',', $_ );
              $mac =~ tr[-][]d;
      and got the following:
      here line is : 2,190.115.32.180,00-21-07-2E-DD-6A,000044000070,20160426105156,D
      here $_ is : 7
      Use of uninitialized value in transliteration (tr///) at ./lines_dispacher_and_consumer_threads.pl line 34.
      Use of uninitialized value $ip
      ....
      ...
      ..
      .

      I'm reading the Thread::Queue documentation and the example provided looks very similar to your method ... I will keep investigating it. Thanks

        it receives only a number instead of the complete line.

        I have really no idea how that could possibly happen; nor do I see anything in the code you posted that would explain it.

        If you haven't already solved it; please post the full code that demonstrates the problem and I'll try and re-create it here.
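
One possible explanation, offered as a guess rather than a diagnosis: Perl only assigns to `$_` implicitly for a few constructs such as `while (<$fh>)`. A method call in the loop condition, as in `while( $Qin->dequeue )`, returns the item but does not store it anywhere, so `$_` keeps whatever value it already had. The sketch below shows the difference using a tiny hypothetical `TinyQueue` class standing in for Thread::Queue (so that it runs without threads). The second input line is made up for illustration. With the real queue, the fix would be the same `defined( my $line = ... )` pattern.

```perl
use strict;
use warnings;

package TinyQueue;    # hypothetical stand-in for Thread::Queue

sub new     { my ( $class, @items ) = @_; return bless { items => [@items] }, $class }
sub dequeue { my ($self) = @_; return shift @{ $self->{items} } }

package main;

# Buggy pattern: the dequeued item is tested for truth and then discarded.
sub drain_implicit {
    my ($q) = @_;
    my @seen;
    local $_ = 'stale';
    while ( $q->dequeue ) {
        push @seen, $_;    # still 'stale'; dequeue never touched $_
    }
    return @seen;
}

# Fixed pattern: keep the item in a lexical and stop when the queue yields undef.
sub drain_explicit {
    my ($q) = @_;
    my @seen;
    while ( defined( my $line = $q->dequeue ) ) {
        push @seen, $line;
    }
    return @seen;
}

my @lines = (
    '2,190.115.32.180,00-21-07-2E-DD-6A,000044000070',
    '2,10.0.0.9,00-21-07-2E-DD-6B,000044000071',    # illustrative only
);
print join( ' | ', drain_implicit( TinyQueue->new(@lines) ) ), "\n";
print join( ' | ', drain_explicit( TinyQueue->new(@lines) ) ), "\n";
```

The first loop never sees the queued lines; the second receives each one intact.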



        Ok, the script is working. There is good news and bad news. The bad news is that approximately 1 in 10 SNMP queries returns an empty rssi value.
        (I manually checked that the device is alive and can process SNMP queries.)
        I tried increasing the number of workers to 20 but it has no effect.
        I wonder if it is possible to put those empty lines back on the walkers' queue.
        The good news is that the processor load is 8 times lower than with the script at the beginning of this post.
        I need to read/learn about why threads are so much more efficient than fork/prefork.
        Oh ... let's try.
        And ...
        Thanks for everything!
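
On putting empty results back on the walkers' queue: a minimal sketch of a bounded retry, assuming each work item carries an attempt counter so that a device that never answers cannot circulate forever. The names `process_with_retry`, `$probe` and `$max_tries` are hypothetical, and a plain array stands in for the queue so the example runs without threads. In the threaded script, the `push` would become an `enqueue` on the walkers' input queue.

```perl
use strict;
use warnings;

# $probe is a code ref that takes a line and returns an RSSI string,
# or an empty string/undef when the query produced nothing.
sub process_with_retry {
    my ( $lines, $probe, $max_tries ) = @_;
    my @queue = map { [ $_, 1 ] } @$lines;    # [ line, attempt number ]
    my ( %result, @gave_up );
    while ( my $item = shift @queue ) {
        my ( $line, $attempt ) = @$item;
        my $rssi = $probe->($line);
        if ( defined $rssi && length $rssi ) {
            $result{$line} = $rssi;
        }
        elsif ( $attempt < $max_tries ) {
            push @queue, [ $line, $attempt + 1 ];    # back of the queue
        }
        else {
            push @gave_up, $line;                    # retry budget exhausted
        }
    }
    return ( \%result, \@gave_up );
}

# Fake probe for demonstration: 'flaky' answers only on its second call.
my %calls;
my $probe = sub {
    my ($line) = @_;
    $calls{$line}++;
    return '-65' if $line eq 'good';
    return '-70' if $line eq 'flaky' && $calls{$line} >= 2;
    return '';
};

my ( $result, $gave_up ) = process_with_retry( [qw(good flaky dead)], $probe, 3 );
print "$_ => $result->{$_}\n" for sort keys %$result;
print "gave up on: @$gave_up\n";
```

Capping the attempts keeps the queue from growing without bound; the abandoned lines could be logged or stored with an error flag instead of an rssi value.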

      Dear BrowserUk; I made some minor modifications to test it but can not make it run yet.
      I wrote:

      #!/usr/bin/perl -slw
      use strict;
      use threads;
      use Thread::Queue;

      our $T = 10;    ## 10 walkers; adjust to suit.

      sub listener {
          my( $Qout ) = @_;
          require 'IO::Socket::INET::Daemon';    ## Requiring here means other threads don't carry the redundant weight
          my $host = new IO::Socket::INET::Daemon(
              host => '172.24.3.208',
              port => 7777,
              timeout => 20,
              callback => {
                  data => sub {
                      my ($io, $host) = @_;
                      chomp( my $line = <$io> );
                      return 0 unless $line;
                      $Qout->enqueue( $line );    ## send work to listener ##-> aren't going to the walker ?
                      return !0;
                  }
              },
          );
          $host->run;
          return;
      }

      sub walker {
          my( $Qin, $Qout ) = @_;
          while( $Qin->dequeue ) {    ## receive work from listener
              my( $type, $ip, $mac, $bsid, $datecode ) = split( ',', $_ );
              $mac =~ tr[-][]d;
              my $output = qx( snmpwalk -v2c -t1 -c public $ip .1.3.6.1.4.1.9885.9885.1.2.0 2>&1 );
              my( $rssi, $error ) = 0;
              if( $output eq "Timeout: No Response from $ip" ) {
                  $error = 'SNMP not responding. Upgrade firmware';
              }
              else {
                  my @result = split( /:/, $output );
                  $rssi = $result[ 3 ];
                  $rssi =~ tr[ \n][]d;
                  if( $rssi < -100 ) {
                      $rssi = $rssi / 100;
                  }
                  $rssi = int( $rssi );
              }
              $Qout->enqueue( join $;, $mac, $ip, $bsid, $rssi );    ## Send data items to DBI as one scalar
          }
      }

      use enum qw[ IN DBI ];
      my @Qs = map Thread::Queue->new(), 1 .. 2;    # set up two Qs

      ## start the listener thread
      my $tListener = threads->create( \&listener, $Qs[ IN ] );    ## One for the listener to send work to the walkers

      ## start 10 walkers
      my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI ] ) } 1 .. $T;    ## And one for the walkers to forward data for adding to the db

      require DBI;    ## Avoid loading DBI into threads
      my $dbh = DBI->connect("DBI:mysql:database=cpe_info;host=172.24.3.207;port=3306","account_process","neting.!" );

      my $sth = $dbh->prepare("INSERT INTO cpe_info(mac,ip,bsid,rssi) VALUES( ?, ?, ?, ? ) ON DUPLICATE KEY UPDATE ip = ?, bsid = ?, rssi = ?");

      ## process data produced by walkers
      while( $Qs[DBI]->dequeue ) {
          my( $mac, $ip, $bsid, $rssi ) = split $;;    ## Retrieve individual data
          $sth->execute( $mac, $ip, $bsid, $rssi, $ip, $bsid, $rssi );    ## bind and execute
      }

      $dbh->disconnect();

      It yields:

      ./lines_dispacher_and_consumer_threads.pl
      Thread 1 terminated abnormally: Can't locate IO::Socket::INET::Daemon in @INC (@INC contains: /root/perl5/lib/perl5/5.16.3/x86_64-linux-thread-multi /root/perl5/lib/perl5/5.16.3 /root/perl5/lib/perl5/x86_64-linux-thread-multi /root/perl5/lib/perl5 /usr/local/lib64/perl5 /usr/local/share/perl5 /usr/lib64/perl5/vendor_perl /usr/share/perl5/vendor_perl /usr/lib64/perl5 /usr/share/perl5 .) at ./lines_dispacher_and_consumer_threads.pl line 10.
      Can't call method "connect" without a package or object reference at ./lines_dispacher_and_consumer_threads.pl line 65.
      Perl exited with active threads:
              10 running and unjoined
              1 finished and unjoined
              0 running and detached

      Very strange: IO::Socket::INET::Daemon is in the /usr/local/share/perl5 folder, and I have used it with no problem. The other error is about the $dbh = DBI->connect line ... also very strange. I have been playing around with both module declaration statements but still can not make it work ... I will keep trying.
      Regards.
      Leandro.

        The error appears to occur due to the fact that the code uses "DBI" both as an ENUM and a module name. Try changing these 2 lines:
        use enum qw[ IN DBI_ENUM ];    # *CHANGED*
        my @Qs = map Thread::Queue->new(), 1 .. 2;    # set up two Qs
        ...
        ## start 10 walkers
        # *CHANGED* next line
        my @walkers = map{ threads->create( \&walker, @Qs[ IN, DBI_ENUM ] ) } 1 .. $T;    ##
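
Why the name clash bites, as a minimal sketch: when a subroutine with the same name as a class exists in the current package, Perl parses `Name->method` as `Name()->method`. The example uses the core `constant` pragma in place of the `enum` module and a hypothetical `Backend` package in place of DBI, so it runs without either. The exact error text varies between Perl versions, so the sketch only checks that the call fails.

```perl
use strict;
use warnings;

package Backend;    # hypothetical stand-in for DBI

sub connect { my ( $class, $name ) = @_; return "$class connected to $name" }

package main;

# Same shape as `use enum qw[ IN DBI ]`: the second constant shadows the class name.
use constant { IN => 0, Backend => 1 };

# Parsed as Backend()->connect(...), i.e. a method call on the number 1.
my $clash = eval { Backend->connect('cpe_info') };
my $error = $@;

# A trailing :: tells the parser this is a package name, not a sub call.
my $forced = Backend::->connect('cpe_info');

print defined $clash ? "clash call worked\n" : "clash call failed: $error";
print "$forced\n";
```

Renaming the constant, as suggested above, is the simpler fix; the `Backend::->` form is only shown to make the parsing rule visible.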


        marioroy seems to have identified the first error, and netwallah the second.


Re: Fast provider feeding slow consumer
by graff (Chancellor) on Apr 24, 2016 at 18:34 UTC
    I don't use snmpwalk or Parallel::ForkManager, so I apologize in advance if my comments are of no use to you, but...

    First, the only thing you've said (so far) about the problem on the production server is "it crashed." Can you be more specific? (What error messages do you get?)

    Second, I wonder why you don't include the (slow) snmpwalk operation as part of the first script -- that is, do the fork management right there in the &data callback as you read from the socket, and print a line to STDOUT with four "insertable" values for each socket input, instead of just echoing it.

    Then, your downstream process would be doing just the database connection and insertion without fork management being involved, which seems to me like a worthwhile way to keep things simple.

    (updated to be slightly less wordy)
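
To make the suggested split concrete, here is a minimal sketch of the per-line work the first script would do before printing, with the RSSI arithmetic copied from the original consumer. The sub names and the sample snmpwalk output line are assumptions for illustration (real device output may differ). The timeout test uses a prefix match rather than `eq`, on the assumption that captured command output ends with a newline and so would never compare equal to the bare message.

```perl
use strict;
use warnings;

# Turn raw snmpwalk output into an integer RSSI, or undef when unusable.
sub parse_rssi {
    my ($output) = @_;
    return if !defined $output || $output =~ /^Timeout: No Response/;
    my @field = split /:/, $output;
    my $rssi  = $field[3];
    return unless defined $rssi;
    $rssi =~ tr/ \n//d;
    return unless $rssi =~ /^-?\d+$/;
    $rssi /= 100 if $rssi < -100;    # same scaling rule as the original script
    return int $rssi;
}

# Build the "mac,ip,bsid,rssi" line that the downstream inserter would read.
sub format_record {
    my ( $line, $snmp_output ) = @_;
    chomp $line;
    my ( $type, $ip, $mac, $bsid, $datecode ) = split /,/, $line;
    $mac =~ tr/-//d;
    my $rssi = parse_rssi($snmp_output);
    return join ',', $mac, $ip, $bsid, $rssi // 0;    # 0 when no usable reading
}

my $line   = '2,190.115.32.180,00-21-07-2E-DD-6A,000044000070,20160426105156,D';
my $sample = "SNMPv2-SMI::enterprises.9885.9885.1.2.0 = INTEGER: -6500\n";    # assumed format
print format_record( $line, $sample ), "\n";
```

The downstream script then only needs to split each line on commas and pass the four values to a prepared INSERT statement.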
