
Re: worker threads - one does all the work

by marioroy (Prior)
on Jun 09, 2017 at 03:56 UTC ( [id://1192410] )


in reply to worker threads - one does all the work

Update: Previously, I noted the time after enqueuing 10k traps into the queue. Unfortunately, I didn't factor in the traps still pending in the queue, and meant to report the duration after the queue is depleted. I've gone through and corrected all my posts in this thread.

Hello jvuman and welcome to the amazing monastery. Thank you for introducing Net::SNMPTrapd, which I've not used before.

It is possible to run a powerful trap server using a single listener and many consumers. One might do so using threads and Thread::Queue, or similarly with MCE::Flow and MCE::Queue. A small sketch of the former follows this paragraph; the full MCE version is provided further below. Here, I have each consumer sleep for 4 milliseconds to simulate work. Awaiting on the queue is a safety measure to prevent the queue from consuming gigabytes of memory in the event of receiving millions of traps. Please adjust the threshold to your satisfaction.
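
A minimal sketch of the threads plus Thread::Queue approach, using placeholder work items rather than real traps (the worker count and loop bounds are illustrative only):

use strict;
use warnings;

use threads;
use Thread::Queue;

my $queue = Thread::Queue->new();

# consumers: dequeue and process items until told to stop
my @consumers = map {
    threads->create( sub {
        while ( defined ( my $item = $queue->dequeue() ) ) {
            # process the item here (a real consumer would call $item->process_trap())
        }
    } );
} 1 .. 30;

# listener (main thread): enqueue work, then one undef per consumer to stop them
$queue->enqueue($_) for 1 .. 100;               # placeholder items; a real listener enqueues traps
$queue->enqueue( (undef) x scalar @consumers );

$_->join() for @consumers;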

In my testing, the server process never entered the pending if statement. MCE and MCE::Shared (not used here) involve IPC behind the scenes. Fetching is faster on a BSD OS (e.g. FreeBSD, darwin) than on Linux. See this post for more info.

perl snmp_test.pl >/dev/null
duration: 1.013 seconds

That seems fast, considering that the 2 producers share CPU time with the listener process and the consumers.

use strict;
use warnings;

use feature 'say';

use Time::HiRes qw( sleep time );
use Net::SNMPTrapd;

use MCE::Flow;
use MCE::Queue;

my $queue = MCE::Queue->new( await => 1, fast => 1 );
my $start = time;   # floating seconds

# from left to right: 1 listener, 30 consumers, and 2 producers
mce_flow { max_workers => [ 1, 30, 2 ] }, \&listener, \&consumer, \&producer;

printf {*STDERR} "duration: %0.3f seconds\n", time() - $start;

exit(0);

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# listener and consumer roles
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

sub listener {
    my $snmptrapd = Net::SNMPTrapd->new( ReusePort => 1 );
    my $count = 0;

    while ( 1 ) {
        my $trap = $snmptrapd->get_trap();

        if ( !defined $trap ) {
            printf {*STDERR} "$0: %s\n", Net::SNMPTrapd->error();
            next;
        }
        elsif ( $trap == 0 ) {
            next;
        }

        # important, remove the file handle inside the object
        # so that serialization into the queue succeeds
        delete $trap->{_UDPSERVER_};

        # enqueue the trap for a consumer to process
        $queue->enqueue($trap);

        # await will block if the queue has more than 2000 pending
        # this prevents the queue from consuming memory out of control
        # simply increase the number of consumers to not block
        #
        # $queue->await(2000) if ( ++$count % 4000 == 0 );

        if ( ( ++$count % 4000 == 0 ) && $queue->pending() > 3000 ) {
            say {*STDERR} "$0: WARN: blocking temporarily";
            $queue->await(2000);
        }

        # reset the counter to not overflow
        $count = 0 if ( $count > 2e9 );

        # for benchmarking, leave the loop after 4000 traps
        last if ( $count == 4000 );
    }

    # notify the manager process to stop the producer and queue
    MCE->do('quit_producers');
}

sub consumer {
    while ( defined ( my $trap = $queue->dequeue() ) ) {
        $trap->process_trap();

        # printf "[$$] %s\t%i\t%i\t%s\n",
        #     $trap->remoteaddr, $trap->remoteport,
        #     $trap->version, $trap->community;

        say "[$$] ".$trap->varbinds->[5]->{'1.3.6.1.4.1.50000.1.6'};

        sleep 0.004;
    }
}

# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# producer role for generating traps
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

my @producer_pids;

sub set_producer_pid {
    push @producer_pids, $_[0];
}

sub quit_producers {
    kill 'QUIT', @producer_pids;
    $queue->end();
}

sub producer {
    require Net::SNMP;

    my ( $run, $i ) = ( 1, 0 );

    $SIG{QUIT} = sub { $run = 0 };

    # notify the manager process my pid
    MCE->do('set_producer_pid', $$);

    my ( $session, $error ) = Net::SNMP->session(
        -hostname  => 'localhost',
        -version   => 2,
        -community => 'public',
        -port      => 162
    );

    if ( !defined $session ) {
        printf "Error: Starting SNMP session (v2c trap) - %s\n", $error;
        return;
    }

    while ( $run ) {
        my $result = $session->snmpv2_trap(
            -varbindlist => [
                '1.3.6.1.2.1.1.3.0',      0x43, int( time() ),
                '1.3.6.1.6.3.1.1.4.1.0',  0x06, '1.3.6.1.4.1.50000',
                '1.3.6.1.4.1.50000.1.3',  0x02, 1,
                '1.3.6.1.4.1.50000.1.4',  0x04, 'String',
                '1.3.6.1.4.1.50000.1.5',  0x06, '1.2.3.4.5.6.7.8.9',
                '1.3.6.1.4.1.50000.1.6',  0x40, '10.10.10.'.((++$i % 100) + 1),
                '1.3.6.1.4.1.50000.1.7',  0x41, 32323232,
                '1.3.6.1.4.1.50000.1.8',  0x42, 42424242,
                '1.3.6.1.4.1.50000.1.9',  0x43, int( time() ),
                '1.3.6.1.4.1.50000.1.10', 0x44, 'opaque data'
            ]
        );
    }

    $session->close;
}

In reality, the server process can handle more than 4,000 traps per second by running only the listener and consumers.

mce_flow { max_workers => [ 1, 30 ] }, \&listener, \&consumer;

Regards, Mario.

Re^2: worker threads - one does all the work
by marioroy (Prior) on Jun 09, 2017 at 04:48 UTC

    Update: I had a chance to run this on a Linux machine. IPC-wise, fetching (dequeue) is slower on Linux than on a BSD variant such as FreeBSD or darwin. Running with threads also takes longer. See this post for more info. Below, I've updated the main script to show the enqueue time, the pending count, and finally the duration. A production environment might have a load balancer and 4 pizza-box (1-inch) servers. Together, the 4 servers can handle 1 million traps per minute if that's the scale needed. I reached 4.5k traps per second on my Linux machine.

    Update: Loading threads at the top of the script will have MCE spawn threads instead. Doing so may cause consumers to run slower for some reason on Linux. Maybe it's from running an older Perl 5.16.3 release; I'm not sure. For maximum performance, check whether Perl has Sereal installed; a quick check is shown after the snippet below. MCE defaults to Sereal::Encoder 3.015+ and Sereal::Decoder 3.015+ for serialization when available.

    use threads;
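
    One way to check whether Sereal is available (these one-liners print the installed versions, or fail if the modules are missing):

    perl -MSereal::Encoder -le 'print $Sereal::Encoder::VERSION'
    perl -MSereal::Decoder -le 'print $Sereal::Decoder::VERSION'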

    Update: I've updated the code to synchronize STDOUT and STDERR output to the manager process. Furthermore, I specified the user_output and user_error MCE options so that one can send output to a logging routine if need be. Omitting them will have the manager process write directly to STDOUT and STDERR, respectively.
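
    For example, a minimal sketch of pointing user_output at a log file instead of STDOUT; the /tmp/snmp_server.log path is a hypothetical placeholder:

    use IO::Handle;

    # hypothetical log destination; adjust the path as needed
    open my $log_fh, '>>', '/tmp/snmp_server.log' or die "open: $!";
    $log_fh->autoflush(1);

    MCE::Flow::init {
        user_output => sub { print {$log_fh} $_[0]; },    # worker STDOUT goes to the log
        user_error  => sub { print {*STDERR} $_[0]; },    # worker STDERR still goes to STDERR
    };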

    Hello again jvuman,

    Generating traps is handled by a separate script, which allowed me to move the trap generator (producer) to another machine. My laptop running the server script can process 6k traps per second.

    use strict;
    use warnings;

    # perl snmp_server.pl | wc -l

    use Time::HiRes qw( sleep time );
    use Net::SNMPTrapd;

    use MCE::Flow;
    use MCE::Queue;

    my $queue = MCE::Queue->new( await => 1, fast => 1 );
    my $max_consumers = 30;
    my $start = 0;

    MCE::Flow::init {
        user_output => sub { print {*STDOUT} $_[0]; },
        user_error  => sub { print {*STDERR} $_[0]; },
    };

    mce_flow { max_workers => [ 1, $max_consumers ] }, \&listener, \&consumer;

    printf {*STDERR} "duration : %0.03f seconds\n", time - $start;

    exit(0);

    sub set_start {
        $start = $_[0];
    }

    sub listener {
        my $snmptrapd = Net::SNMPTrapd->new( ReusePort => 1 );
        my $count = 0;
        my $start;

        while ( 1 ) {
            my $trap = $snmptrapd->get_trap();

            if ( !defined $trap ) {
                MCE->printf(\*STDERR, "$0: %s\n", Net::SNMPTrapd->error());
                next;
            }
            elsif ( $trap == 0 ) {
                next;
            }

            $start = time(), MCE->do('set_start', $start) unless $start;

            # important, remove the file handle inside the object
            delete $trap->{_UDPSERVER_};

            # enqueue the trap for a consumer to process
            $queue->enqueue($trap);

            # leave the loop after 10,000 traps
            last if ( ++$count >= 10000 );

            # safety to prevent the queue from consuming memory out of control
            # $queue->await(2000) if ( $count % 4000 == 0 );

            # if ( ( $count % 4000 == 0 ) && $queue->pending() > 3000 ) {
            #     MCE->say(\*STDERR, "Warn: blocking temporarily");
            #     $queue->await(2000);
            # }

            # reset the counter to not overflow
            $count = 0 if ( $count > 2e9 );
        }

        $queue->enqueue((undef) x $max_consumers);

        MCE->printf(\*STDERR, "enqueue : %0.03f seconds\n", time - $start);
        MCE->printf(\*STDERR, "pending : %d\n", $queue->pending());
    }

    sub consumer {
        while ( defined ( my $trap = $queue->dequeue() ) ) {
            $trap->process_trap();

            MCE->printf(
                "[$$] %s\t%i\t%i\t%s\n",
                $trap->remoteaddr, $trap->remoteport,
                $trap->version, $trap->community
            );

            sleep 0.004;
        }
    }

    Here is the producer script for generating traps in parallel. This is useful for load testing.

    use strict;
    use warnings;

    # perl snmp_producer.pl

    use Net::SNMP;

    use MCE::Flow;
    use MCE::Queue;

    mce_flow { max_workers => 2 }, \&producer;

    exit(0);

    sub producer {
        my ( $session, $error ) = Net::SNMP->session(
            -hostname  => '127.0.0.1',
            -version   => 2,
            -community => 'public',
            -port      => 162
        );

        if ( !defined $session ) {
            printf "Error: Starting SNMP session (v2c trap) - %s\n", $error;
            return;
        }

        for my $i ( 1 .. 5000 ) {
            my $result = $session->snmpv2_trap(
                -varbindlist => [
                    '1.3.6.1.2.1.1.3.0',      0x43, int( time() ),
                    '1.3.6.1.6.3.1.1.4.1.0',  0x06, '1.3.6.1.4.1.50000',
                    '1.3.6.1.4.1.50000.1.3',  0x02, 1,
                    '1.3.6.1.4.1.50000.1.4',  0x04, 'String',
                    '1.3.6.1.4.1.50000.1.5',  0x06, '1.2.3.4.5.6.7.8.9',
                    '1.3.6.1.4.1.50000.1.6',  0x40, '10.10.10.'.((++$i % 100) + 1),
                    '1.3.6.1.4.1.50000.1.7',  0x41, 32323232,
                    '1.3.6.1.4.1.50000.1.8',  0x42, 42424242,
                    '1.3.6.1.4.1.50000.1.9',  0x43, int( time() ),
                    '1.3.6.1.4.1.50000.1.10', 0x44, 'opaque data'
                ]
            );
        }

        $session->close;
    }

    To benchmark, run snmp_server.pl on machine A. Then, run snmp_producer.pl on machine B. Remember to change the IP address inside the producer script to point at host A when running it on another host.

    Host A : perl snmp_server.pl | wc -l
    Host B : perl snmp_producer.pl

    Regards, Mario.

      The following does something similar to what the OP described. I've experienced lost traps on Linux, causing the server script to never leave the loop. On the Mac, it takes 3.822 seconds to process 10,000 traps when successful.

      use strict;
      use warnings;

      use threads;
      use threads::shared;

      use Time::HiRes qw( sleep time );
      use Net::SNMPTrapd;

      use MCE::Flow;
      use MCE::Queue;

      my $tid = 0;

      sub CLONE { $tid = threads->tid(); }

      my $snmptrapd = Net::SNMPTrapd->new( ReusePort => 1 );
      my $max_workers = 30;

      my $count : shared = 0;
      my $start : shared;

      MCE::Flow::init {
          user_output => sub { print {*STDOUT} $_[0]; },
          user_error  => sub { print {*STDERR} $_[0]; },
      };

      mce_flow { max_workers => $max_workers }, \&server;

      printf {*STDERR} "duration: %0.03f seconds\n", time - $start;

      exit(0);

      sub server {
          my $done = 0;

          while ( 1 ) {
              my $trap;

              {
                  lock $count;
                  $start = time unless $start;
                  $trap  = $snmptrapd->get_trap();
                  $done  = 1 if ( ++$count > 10000 - $max_workers );
              }

              if ( !defined $trap ) {
                  MCE->printf(\*STDERR, "$0: %s\n", Net::SNMPTrapd->error());
                  next;
              }
              elsif ( $trap == 0 ) {
                  next;
              }

              $trap->process_trap();

              MCE->printf(
                  "[%02d] %s\t%i\t%i\t%s\n", $tid,
                  $trap->remoteaddr, $trap->remoteport,
                  $trap->version, $trap->community
              );

              last if $done;

              sleep 0.004;
          }
      }

      Regards, Mario.

Re^2: worker threads - one does all the work
by zentara (Archbishop) on Jun 09, 2017 at 13:52 UTC
    Wow, Mario, I'm so glad you are posting many workable templates for MCE. I'm saving them all. You have a Cookbook yet?

    I'm not really a human, but I play one on earth. ..... an animated JAPH

      Hi zentara. I started something on GitHub but am not liking the format and stopped temporarily. At some point I will resume the documentation.
