http://www.perlmonks.org?node_id=664360


in reply to RFC: A module for SNMP polling of thousands of nodes simultaneously

I've re-worked some things to make the module handle ridiculously large numbers of hosts, and somehow, I got a small speed-boost out of that change. Also, I've added some features like having user-specified (pre|post-)query callbacks and a 'master' timeout, just like SNMP::Effective. Still, this module is about 4x as fast in a 1000 node test.

I *did* just run a test across 6900+ nodes using my module in its current state, and I got some interesting results. First off, the program ran out of memory when passing the results back after the execute() call (or maybe during the Dumper call). Even so, the program completed in 3 minutes and 59 seconds!

Without further ado, here's the module code:
#!/usr/bin/perl
package SNMP::Query::AsynchMulti;

# Asynchronously issue large numbers of SNMP queries, keeping at most a
# configurable number of operations "in flight" at any moment.  Queries
# are queued with add() and launched by execute(); each completion
# callback starts the next queued query so the pipeline stays full.

# Pragmas
use strict;
use warnings;

# Standard
use Carp;

# Cpan
use SNMP;

# This comes in handy so we don't pass bogus
# parameters to SNMP::Session->new()
our @valid_sess_params = qw(
    DestHost
    Community
    Version
    RemotePort
    Timeout
    Retries
    RetryNoSuch
    SecName
    SecLevel
    SecEngineId
    ContextEngineId
    Context
    AuthProto
    AuthPass
    PrivProto
    PrivPass
    AuthMasterKey
    PrivMasterKey
    AuthLocalizedKey
    PrivLocalizedKey
    VarFormats
    TypeFormats
    UseLongNames
    UseSprintValue
    UseEnums
    UseNumeric
    BestGuess
    NonIncreasing
);

#----------------------------------------------------------
# Constructor.  Takes no arguments; all tunables have defaults and may
# be overridden per-call in execute().
sub new {
    my $class = shift;
    my $self  = bless {}, $class;

    $self->{query_stack}       = [];    # queued query closures
    $self->{results}           = [];    # ($query_info, $results) pairs
    $self->{max_in_flight}     = 10;    # default concurrency limit
    $self->{current_in_flight} = 0;
    $self->{total_this_run}    = 0;
    $self->{grand_total}       = 0;

    return $self;
}

# Verifies that the named parameter, if present, is a subref.
# Croaks on a non-CODE value; returns 1 otherwise.
sub _check_param_callback {
    my $self       = shift;
    my $param_name = shift;    # string, hash key
    my $params     = shift;    # hashref

    return 1 unless exists $params->{$param_name};

    croak "Bad parameter for [$param_name] - not a CODE ref"
        if ref $params->{$param_name} ne 'CODE';

    return 1;
}

# TODO Fill in the code and use in the add() or _make_getbulk_query()
# methods.  Verifies that the named parameter is something the SNMP
# module can use as a VarBind or VarBindList.
sub _check_param_varbinds {
    my $self       = shift;
    my $param_name = shift;    # string, hash key
    my $params     = shift;    # hashref
}

# Queue one query.  Takes a hashref mixing SNMP::Session parameters
# (see @valid_sess_params), the query description (QueryType, VarBinds,
# plus type-specific options) and optional PreCallback/PostCallback
# subrefs.  Croaks on missing/invalid parameters; returns 1.
sub add {
    my $self   = shift;
    my $params = shift;

    # These are required for all query ops so make sure they're present.
    my $varbinds = $params->{VarBinds}
        || croak "Bad or missing parameter [VarBinds]";
    my $query_type = $params->{QueryType}
        || croak "Bad or missing parameter [QueryType]";

    # Make sure our callback params are valid.
    $self->_check_param_callback( 'PreCallback',  $params );
    $self->_check_param_callback( 'PostCallback', $params );

    if ( $query_type eq 'getbulk' ) {
        push @{ $self->{query_stack} }, $self->_make_getbulk_query($params);
    }
    else {
        croak "Attempt to add using unknown query type: $query_type\n";
    }

    return 1;
}

# Builds and returns a closure that, when invoked, opens a session and
# starts an asynchronous bulkwalk.  The bulkwalk completion callback
# stores the results, then launches the next queued query — or stops
# the event loop once the last in-flight operation has landed.
sub _make_getbulk_query {
    my $self = shift;
    my ($query_info) = @_;

    # These params are required for a getbulk query op.
    my $non_repeaters =
        exists $query_info->{NonRepeaters}
        ? $query_info->{NonRepeaters}
        : croak "Bad or missing parameter [NonRepeaters]";
    my $max_repeaters =
        exists $query_info->{MaxRepeaters}
        ? $query_info->{MaxRepeaters}
        : croak "Bad or missing parameter [MaxRepeaters]";

    # Currently, these are validated in the add() method, so no need here.
    my $preop_callback  = $query_info->{PreCallback};
    my $postop_callback = $query_info->{PostCallback};

    # TODO I may want to add a method to validate
    # and/or transform the VarBinds parameter.
    my $varbinds = $query_info->{VarBinds};

    # Parse out the parameters for creating the session.
    # TODO validate these more strictly in a dedicated routine.
    my %sess_params;
    $sess_params{$_} = $query_info->{$_}
        for grep { exists $query_info->{$_} } @valid_sess_params;

    return sub {

        my $callback = sub {
            my $bulkwalk_results = shift;

            # Store the results and info about the query for later...
            push @{ $self->{results} }, $query_info, $bulkwalk_results;

            $postop_callback->() if $postop_callback;

            $self->{current_in_flight}--;

            # Keep the pipeline full: launch the next queued query.
            if ( scalar @{ $self->{query_stack} } ) {
                my $next_query = pop @{ $self->{query_stack} };
                return $next_query->();
            }

            # No more queued work; once the last in-flight op finishes,
            # stop the event loop so execute() can return.
            SNMP::finish() if $self->{current_in_flight} <= 0;
            return 1;
        };

        my $sess = SNMP::Session->new(%sess_params);

        $preop_callback->() if $preop_callback;

        $self->{current_in_flight}++;
        $self->{total_this_run}++;
        $self->{grand_total}++;

        return $sess->bulkwalk( $non_repeaters, $max_repeaters, $varbinds,
            [$callback] );
    };
}

# Read-only counter accessors.
sub current_in_flight { shift->{current_in_flight} }
sub total_this_run    { shift->{total_this_run} }
sub grand_total       { shift->{grand_total} }

# Randomize the order of the queued queries (in-place Fisher-Yates).
# Useful to spread load when the input list is sorted by subnet/device.
# Previously an empty stub; now implemented.  Returns 1.
sub shuffle {
    my $self  = shift;
    my $stack = $self->{query_stack};
    for ( my $i = $#{$stack} ; $i > 0 ; $i-- ) {
        my $j = int rand( $i + 1 );
        @{$stack}[ $i, $j ] = @{$stack}[ $j, $i ];
    }
    return 1;
}

# Run all queued queries.  Optional params (hashref):
#   InFlight      - max concurrent operations (default: object setting)
#   MasterTimeout - seconds before the whole run is aborted (0/undef: none)
#   KeepLast      - keep results from the previous run (default: object's
#                   KeepLast setting)
# Returns a reference to the results array (see get_results).
sub execute {
    my $self   = shift;
    my $params = shift;

    # The KeepLast option can come in handy if, for example, another
    # thread or process is working on the contents of the results
    # array from a previous execution and may not finish before the
    # next execution.  Note we install a *fresh* arrayref rather than
    # clearing the old array in place (@{...} = () empties the very
    # array a previously returned results ref points at, which would
    # defeat the purpose of KeepLast).
    $self->{results} = []
        unless (
        # Explicitly set KeepLast, or fall back to the object default.
        defined $params->{KeepLast} ? $params->{KeepLast} : $self->{KeepLast}
        );

    # Determine our maximum concurrency level for this run.
    my $max_in_flight = $params->{InFlight} || $self->{max_in_flight};

    # Make a copy of the stack in case we want to run the same query again.
    my $query_stack_ref  = $self->{query_stack};
    my @query_stack_copy = @{ $self->{query_stack} };

    # Set some counters.
    $self->{current_in_flight} = 0;
    $self->{total_this_run}    = 0;

    # Begin issuing operations, up to the concurrency limit; each
    # completion callback launches the next queued query itself.
    while ( scalar @$query_stack_ref ) {
        my $query = pop @$query_stack_ref;
        $query->();
        last if $self->{current_in_flight} >= $max_in_flight;
    }

    # Wait for the ops to complete, or time-out (if specified).
    # NB: MainLoop must be given a *code reference*; the original code
    # wrote &SNMP::finish(), which called finish() immediately and
    # would have shut the loop down before it even started.
    $params->{MasterTimeout}
        ? SNMP::MainLoop( $params->{MasterTimeout}, \&SNMP::finish )
        : SNMP::MainLoop();

    # Reset the stack for the next run.
    $self->{query_stack} = \@query_stack_copy;

    return $self->get_results();
}

# Returns a reference to the results array from the last execution.
# Each completed query contributes a ($query_info, $bulkwalk_results)
# pair.  (This is the live array, not a copy; execute() installs a
# fresh array each run, so refs handed out here stay intact.)
sub get_results {
    return shift->{results};
}

1;

__END__

And here's the test program's code:
#!/usr/bin/perl

# Driver script: read host definitions from a CSV file, queue one
# getbulk query per host via SNMP::Query::AsynchMulti, then run the
# whole batch asynchronously and dump the results.
#
# Usage: poll.pl <hosts.csv> [max_inflight] [num_cycles]

use strict;
use warnings;

use Data::Dumper;
use Carp;
use Parse::CSV;
use SNMP;
use SNMP::Query::AsynchMulti;

#---------------------------------------------------------

my $csv_file = shift || die "Please specify a CSV file with SNMP host info!";
my $max_inflight = shift || 50;
my $num_cycles   = shift || 1;

my $master_timeout = 0;    # Set to number of seconds before
                           # all queries are terminated.
                           # 0 means no master timeout.

my $batch_size = 50;       # Run a callback whenever this many
                           # results have been returned.

my @varbinds = qw(
    ifDescr  ifInOctets  ifOutOctets ifAlias ifType
    ifName   ifInErrors  ifOutErrors ifSpeed
    ifAdminStatus ifOperStatus
);

# Fixed typo: was @reqired_fields.
my @required_fields = qw(HOSTIP COMMUNITY SNMPVER SNMPPORT);

my @hosts = read_hosts_csv( $csv_file, @required_fields );
@hosts = clean_hosts_data( \@hosts );

# This object encapsulates the desired queries to run.
my $query = SNMP::Query::AsynchMulti->new();

# This probably isn't necessary, but it's the Right Thing To Do.
my $varlist = SNMP::VarList->new( map { [$_] } @varbinds );

# Progress reporting: in-flight / this-run / grand-total counters.
my $preop_callback = sub {
    warn "+ IF/CT/GT: "
        . $query->current_in_flight() . "/"
        . $query->total_this_run() . "/"
        . $query->grand_total() . "\n";
};

my $postop_callback = sub {
    warn "- IF/CT/GT: "
        . $query->current_in_flight() . "/"
        . $query->total_this_run() . "/"
        . $query->grand_total() . "\n";
};

foreach my $host (@hosts) {
    $query->add(
        {
            # Params concerning the SNMP Session
            DestHost   => $host->{HOSTIP},
            Community  => $host->{COMMUNITY},
            Version    => $host->{SNMPVER},
            RemotePort => $host->{SNMPPORT},

            #Timeout   => $host->{SNMP_TIMEOUT},
            #Retries   => $host->{SNMP_RETRIES},

            # Params concerning the type of query operation
            QueryType    => 'getbulk',    # See POD for explanation...
            MaxRepeaters => 20,           # Additional options depend on
            NonRepeaters => 0,            # the QueryType...

            # The varbinds to be operated on
            VarBinds => $varlist,

            # Callbacks before and after this query op.
            PreCallback  => $preop_callback,     # Do this before the op
            PostCallback => $postop_callback,    # Do this after the op
        }
    );
    warn "Added query to: $host->{HOSTIP}\n";
}

warn "Shuffling queries\n";
$query->shuffle();    # Randomize order of queries (no-op if unimplemented)

# Run all the added queries with up to $max_inflight
# asynchronous operations in-flight at any time.
# Lather, rinse, repeat for $num_cycles.
warn "Beginning polling cycle\n";

#use DDS;
#warn Dump $query; exit;

foreach my $iter ( 1 .. $num_cycles ) {
    sleep 30 unless $iter == 1;
    my $results = $query->execute(
        {
            InFlight      => $max_inflight,
            MasterTimeout => $master_timeout,

            # TODO Implement the batching functionality.
            # Was a hard-coded 50; now uses the $batch_size setting.
            BatchSize     => $batch_size,   # Results that make up a 'batch'
            BatchCallback => sub { 1 },     # Called when a batch is done
                                            # (or all results are in)
        }
    );
    print Dumper $results;
}

exit;

#---------------------------------------------------------

# Read in the CSV file; returns a list of row hashrefs keyed by the
# header line.  Carps once per missing required field, then croaks if
# any field was missing on that line.  Croaks on parser errors.
sub read_hosts_csv {
    my $file            = shift;
    my @required_fields = @_;

    # Parse entries from a CSV file into hashes.
    my $csv_parser = Parse::CSV->new(
        file   => $file,
        fields => 'auto',    # Use the first line as column headers,
                             # which become the hash keys.
    );

    my @node_cfg;
    my $line_num = 0;

    while ( my $line = $csv_parser->fetch() ) {
        $line_num++;
        my $error_flag = 0;
        foreach my $field (@required_fields) {
            if ( !exists $line->{$field} ) {
                $error_flag = 1;
                carp "Missing field [$field] on line [$line_num]"
                    . " in CSV file [$file]";
            }
        }
        croak "Terminating due to errors on line [$line_num]"
            . " in CSV file [$file]"
            if $error_flag;
        push @node_cfg, $line;
    }

    if ( $csv_parser->errstr() ) {
        croak "Fatal error parsing [$file]: " . $csv_parser->errstr();
    }

    return @node_cfg;
}

# Scrub leading/trailing whitespace from every field, then keep only
# the hosts whose fields pass basic sanity checks; warns about (and
# skips) the rest.  Takes an arrayref of host hashrefs, returns a list.
sub clean_hosts_data {
    my $hosts_data = shift;

    my @clean_hosts;
    foreach my $host (@$hosts_data) {

        # Strip leading and trailing whitespace from each field.
        # (Postfix for aliases each value — no map-in-void-context.)
        s/^\s+|\s+$//g for values %$host;

        if (   $host->{SNMPVER} == 2    #=~ /^1|2c?|3$/
            && $host->{SNMPPORT} =~ /^\d+$/
            && $host->{HOSTIP} =~ /^(?:\d{1,3}\.){3}\d{1,3}$/
                                        # Flawed, but Good Enough.
            && $host->{COMMUNITY} )
        {
            push @clean_hosts, $host;
        }
        else {
            warn "Invalid host data - skipping:\n" . "    "
                . Dumper($host) . "\n";
        }
    }

    return @clean_hosts;
}

1;

__END__

And finally, the example dummy node info CSV file
NODE_NAME,NODE_TYPE,COMMUNITY,HOSTIP,SNMPPORT,SNMPVER
camel.mystuff.com,CPE-3810,foobar,10.1.15.62,161,2
somename.mystuff.com,CPE-2420,wheebaz,10.2.7.45,161,2
buhziuhuh.mystuff.com,CPE-2420,nipnop,10.2.7.40,161,2
salma-hayek.mystuff.com,CPE-2420,hummahumma,10.1.14.16,161,2
zippy.mystuff.com,CPE-2420,woo!woo,10.2.6.41,161,2
napoleon.mystuff.com,CPE-2420,vote4pedro,10.2.7.35,161,2
wjul.mystuff.com,CPE-3810,hoosurdaddy,10.1.15.72,161,2
telephone.mystuff.com,CPE-3660,yipyipyip,10.1.15.42,161,2
brrrrrring.mystuff.com,CPE-3660,uuuuuh-huh,10.1.15.73,161,2