A framework for implementing child processes performing file updates, SQL queries, and the passing of results to the parent using Parallel::ForkManager

by atcroft (Monsignor)
on Jan 04, 2003 at 19:11 UTC (#224291, snippet)

Description:

Because of time constraints, a recent project required me to rewrite a script so that it ran its jobs in parallel. The original retrieved a list of items from a database, then processed each one, keeping a count of certain actions performed and occasionally updating a file.

Because the number of jobs running simultaneously had to be kept below a fixed maximum, I implemented the revised code using Parallel::ForkManager. I had difficulty finding, in any one place, information on how children forked by this module can hold their own database connections, update a common file, and pass data back to the parent. The assistance and answers I received in the CB (ChatterBox) proved instrumental in getting a parallel version of the original working, so I have stripped my implementation down to what I hope is a basic framework usable by someone else, and offer it here.

Your feedback and comments are appreciated.

#!/usr/bin/perl -w
#
# Acknowledgements:
#    - Sections of the code below were based on or, in some cases, 
#    taken from examples provided in one or more of the following 
#    places:
#      - Christiansen, Tom, and Nathan Torkington. Perl Cookbook. 
#      1st ed. O'Reilly and Associates, 1998.
#      - Descartes, Alligator, and Tim Bunce. Programming the Perl 
#      DBI. O'Reilly and Associates, 2000.
#      - Wall, Larry, Tom Christiansen, and Randal L. Schwartz. 
#      Programming Perl. 2nd ed. O'Reilly and Associates, 1996.
#      - Szabo, Balazs. Documentation for 
#      Parallel::ForkManager-0.7.5.
#      http://search.cpan.org/author/DLUX/Parallel-ForkManager-0.7.5/ForkManager.pm .
#      03 Jan 2003.
#  Faults or incorrect implementations of code based upon code 
#  from the above sources should not be attributed to original 
#  source, but to my implementation.
#  - My thanks to those individuals visiting the Perl Monks 
#  (http://www.perlmonks.org/) "ChatterBox" between approximately 
#  15:00 and 22:00 GMT on Friday, 03 Jan 2003, for answering 
#  questions, especially the user known as Tye for his assistance 
#  with questions while working out the child-to-parent messaging 
#  methodology.
#  - If I have made an inappropriate attribution, or failed to 
#  make an attribution, it was not intentional. Please contact me, 
#  and I will be happy to attempt to correct the situation.
#

use DBI;
use Fcntl qw(:DEFAULT :flock);
use IO::Handle;
use Parallel::ForkManager;
use strict;

$| = 1;

my $table_name = "tablename";
my $cfg_data   = "data.file";

my $db_type  = "Oracle";
my $db_sid   = "databasename";
my $username = "username";
my $password = "password";

#
# The process reads information from a database to determine the 
#   items to handle, then handles each in a child process, as 
#   processing times can be lengthy.
# 

my $dbh_parent =
  DBI->connect( join ( ':', ( 'dbi', $db_type, $db_sid ) ),
    $username, $password, { AutoCommit => 0 } )
  or die (
    join (
        "\n",
        (
            'Database connection failed:',
            'Error returned was:',
            $DBI::errstr    # package variable; "$DBI->errstr" fails under strict
        )
      )
      . "\n"
  );
$dbh_parent->{LongReadLen} = 64000;

my @targetlist = ();
{

    #
    # Get items to handle, and load into an array, so we do not need 
    #   the parent db handle when we start creating child processes.
    # 
    my ($item);

    my $sql_statement = "SELECT UNIQUE items FROM " . $table_name;
    my $statement_handle = $dbh_parent->prepare($sql_statement);
    $statement_handle->execute
      or die (
        join (
            "\n",
            (
                'Failure while executing SQL statement:',
                $sql_statement,
                'Error returned was:',
                $DBI::errstr
            )
          )
          . "\n"
      );
    my $result_handle =
      $statement_handle->bind_columns( \$item );

    #
    # Because child processes inherit the environment of their 
    #   parent when spawned, they also inherit the parent's DB 
    #   handles. We therefore read the list of items into an 
    #   array, after which the parent handle is no longer needed 
    #   (this also prevents spurious errors about the parent DB 
    #   handle not being closed when a child exits).
    #
    while ( $statement_handle->fetch ) {
        push ( @targetlist, $item );
    }
}

#
# Loop is done, 
#   we no longer need the db connection from the parent process
#
$dbh_parent->disconnect;

#
# Code for returning a total count of items handled at the end.
#   The pipe created should allow the children to send a value back
#   to the parent.
#
my $grandtotal = 0;
my $childcount = scalar(@targetlist);

pipe( READER, WRITER ) or die "Pipe creation failed: $!\n";
WRITER->autoflush(1);

my $MAX_PROCESSES = 7;
my $pm            = Parallel::ForkManager->new($MAX_PROCESSES);

foreach my $item (@targetlist) {

    # 
    # Fork off the next child process using Parallel::ForkManager to 
    #   keep the number of children to no more than $MAX_PROCESSES.
    #

    $pm->start and next;

    # Close the read end of the pipe for the child processes.
    close(READER);

    #
    # Each child needs its own handle to the database, 
    #   as handles shared between processes do not play well together
    #
    my $dbh =
      DBI->connect( join ( ':', ( 'dbi', $db_type, $db_sid ) ),
        $username, $password, { AutoCommit => 0 } )
      or die (
        join (
            "\n",
            (
                'Database connection failed:',
                'Error returned was:',
                $DBI::errstr
            )
          )
          . "\n"
      );
    $dbh->{LongReadLen} = 64000;
    {
        my $loop = 0;    # start at zero so the parent always receives a number

        #
        # Child processing occurs herein, including count of whatever 
        #   is being done (stored in $loop).
        # 

        #
        # ### CHILD PROCESS CODE HERE ####
        # 

        #
        # Child processes in original script had to occasionally 
        #   append to a file, thus the routine below for handling 
        #   file locking.
        #   
        {

            #
            # Implemented using file locking to prevent data 
            # corruption.
            #
            open( OUTFILE, ">>" . $cfg_data )
              or die "Cannot open $cfg_data for appending: $!\n";
            {
                flock( OUTFILE, LOCK_EX )
                  or die "Cannot lock $cfg_data: $!\n";

                #
                # ### CODE TO ADD APPROPRIATE DATA TO FILE HERE ###
                #


            }
            close(OUTFILE);
        }

        #
        # Child process clean-up:
        #   - send item and count to parent via pipe
        #   - close DB handle
        #   - close writing end of the pipe for the child
        #   - exit child process
        #
        print( WRITER join ( ' ', ( $item, $loop ) ), "\n" );
        $dbh->disconnect;
    }
    close(WRITER);
    $pm->finish;
}

#
# Close the writing end of the pipe for the parent, 
#   loop thru
#     read a number of lines equal to the number of children 
#     expected, increment $grandtotal by the value read,
#   then close the read end of the pipe
#
# Data on a line sent from child to parent is in the form:
#   <line>      = <item><separator><count>
#   <item>      = [^\s]+
#   <separator> = \s+
#   <count>     = <numeric-value>
# 
{
    close(WRITER);
    for ( my $i = 0 ; $i < $childcount ; $i++ ) {
        my $line = <READER>;

        # A child that died early sends nothing; stop at end-of-file.
        last unless defined $line;

        chomp($line);
        my @parts = split ( /\s+/, $line, 2 );
        $grandtotal += $parts[1];
    }
    close(READER);
}

# Wait for any remaining children (should be none).
$pm->wait_all_children;

print <<GRAND_TOTAL;

Total items processed: $grandtotal

GRAND_TOTAL
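
The child-to-parent messaging used above can also be tried in isolation. What follows is a minimal sketch, not the code from the original project: it uses the core fork() instead of Parallel::ForkManager so that it runs without any CPAN modules, which means it does not cap the number of simultaneous children. The helper name collect_counts and the sample items are made up for illustration. It sends the same "<item> <count>" lines over a pipe, but the parent reads until end-of-file rather than expecting a fixed number of lines, so a child that dies early does not leave the parent reading an undefined value.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use IO::Handle;

# Hypothetical helper (not part of the snippet above): forks one child per
# item, has each child report "<item> <count>" on a shared pipe, and returns
# a hash reference of the counts once every child has closed its write end.
sub collect_counts {
    my ( $items, $work ) = @_;

    pipe( my $reader, my $writer ) or die "pipe failed: $!\n";
    $writer->autoflush(1);

    my @pids;
    foreach my $item (@$items) {
        my $pid = fork();
        die "fork failed: $!\n" unless defined $pid;

        if ( $pid == 0 ) {
            # Child: it only writes, so close the read end first.
            close($reader);
            my $count = $work->($item);

            # One short line per child, in the snippet's line format.
            print {$writer} join( ' ', $item, $count ), "\n";
            close($writer);
            exit 0;
        }
        push @pids, $pid;
    }

    # Parent: close its own copy of the write end, otherwise the read
    # loop below would never see end-of-file.
    close($writer);

    my %count_for;
    while ( my $line = <$reader> ) {
        chomp $line;
        my ( $item, $count ) = split /\s+/, $line, 2;
        $count_for{$item} = $count;
    }
    close($reader);

    # Reap the children so none are left as zombies.
    waitpid( $_, 0 ) for @pids;

    return \%count_for;
}

# Usage: the "work" here is just the length of each made-up item name.
my $counts = collect_counts( [qw(alpha beta gamma)], sub { length $_[0] } );

my $grandtotal = 0;
$grandtotal += $_ for values %$counts;
print "Total items processed: $grandtotal\n";
```

Reading to end-of-file only works if every process holding the write end closes it, which is why the parent's close of its own copy matters as much as the children's. With many children or long messages, it may also be worth having the parent drain the pipe while children are still being started, since a full pipe blocks the writers.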
•Re: A framework for implementing child processes performing file updates, SQL queries, and the passing of results to the parent using Parallel::ForkManager
by merlyn (Sage) on Jan 04, 2003 at 19:23 UTC