http://www.perlmonks.org?node_id=999845

Anonymous Monk has asked for the wisdom of the Perl Monks concerning the following question:

I want to write an application that reads from a file and once it's read enough (filled up a buffer of some size) does some processing on what it's read. A simple example of the processing might be to count the frequency of words in a text file. To take advantage of multiple cores, I'd like to have several threads (each responsible for words starting with a different letter, say) processing the buffered data in parallel. Once they have all finished, reading from the input file would resume, refilling the buffer, etc. until the entire (large) file has been processed. Once the entire file has been read and words counted, I'd like to print out the frequency of each word found.

I've read the thread tutorial and it seems like this should be pretty straightforward, but I'm not sure if the pattern is best suited to a queueing model, or whether the word frequency hashes should be shared data, or exactly how to manage the flow from single file-reader thread to parallel processing threads, back to file-reader, etc. and finally to single output-writer thread.

Any suggestions (or pointers to previous examples) for this kind of pattern?

Re: to thread or fork or ?
by Tanktalus (Canon) on Oct 19, 2012 at 03:08 UTC

    Personally, I'd default to the same as mbethke: no threading. And forking might not be any better.

    If I were to even consider something like this, the file would have to be split. Across multiple physical disks. Your CPU is so much faster than disk access that there's no way for the disk to overwhelm a single CPU, never mind multiple CPUs. Only with multiple physical disks all spinning and sending back data simultaneously do you have a chance of even keeping a single CPU busy. And even then, I suspect the bus would still be a limiting factor.

    If, however, you were to be doing analysis of each word to group similar-sounding words together (see soundex), then you might have enough to do to make the disks start to wait for the CPU. But I doubt it.

    Assuming we completely change your requirements so that there would be a point to this, that your file was actually large files each on separate physical disks, and that you were doing some significant processing, even then, I'd probably avoid threading or forking if at all possible. To this end, I'd consider using IO::AIO (which uses threads under the covers, but not perl threads) to read the files.
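
    For illustration, here is a minimal sketch of reading with IO::AIO, assuming two hypothetical file paths and an arbitrary 4 MB read size; the callbacks all run in the single main Perl thread:

    use strict;
    use warnings;
    use Fcntl;
    use IO::AIO;

    for my $path (qw(/data/disk1/part1 /data/disk2/part2)) {   # hypothetical paths
        aio_open $path, O_RDONLY, 0, sub {
            my $fh = shift or die "open $path: $!";
            my $buf = '';
            aio_read $fh, 0, 4 * 1024 * 1024, $buf, 0, sub {
                my $bytes = shift;
                die "read $path: $!" if !defined $bytes || $bytes < 0;
                # process the first $bytes of $buf here (count words, etc.)
            };
        };
    }

    IO::AIO::flush;   # wait until all outstanding AIO requests have completed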

    If that ends up not being fast enough and you actually need to process on more than one CPU at a time, I would suggest as little sharing as possible. Having different threads responsible for words starting with different letters just doesn't scale easily (26 threads may not be optimal for your CPU configuration, and what if you start dealing with non-English languages, especially Russian or Greek or Chinese, among others?), nor uniformly (more words start with "s" in English than with any other letter, for example, while the thread dealing with words starting with "x" will be awfully idle). Instead, use one thread/process per file, with each thread/process doing exactly the same thing: populating a hash. Then the challenge is that the parent process needs to pull all those hashes together and add them up. That's not really too hard:

    for my $word (keys %child_results) { $total{$word} += $child_results{$word} }
    The challenge is getting that data back to the parent atomically. That is, the entire hash, not a partial result. And doing so easily. One way is to share the %total hash in each thread, and then use a mutual exclusion semaphore on it that blocks concurrent access. Another way is to use AnyEvent::Util::fork_call to transfer the data back from the subprocess (not thread) to the parent process. In this model, the parent is not using threads at all, and so will not be able to get partial results.
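
    A rough sketch of the fork_call approach, assuming a hypothetical list of per-disk files on the command line; each child counts one file, and its complete hash comes back, serialized, through the result callback:

    use strict;
    use warnings;
    use AnyEvent;
    use AnyEvent::Util qw(fork_call);

    my @files = @ARGV;            # hypothetical: one large file per physical disk
    my %total;
    my $cv = AE::cv;

    for my $file (@files) {
        $cv->begin;
        fork_call {
            # runs in a forked child: count the words in one file
            my %count;
            open my $fh, '<', $file or die "$file: $!";
            while (<$fh>) {
                $count{lc $_}++ for split ' ';
            }
            \%count;              # return value is serialized back to the parent
        } sub {
            my ($count) = @_;     # the child's entire hash, all at once
            if ($count) {
                $total{$_} += $count->{$_} for keys %$count;
            } else {
                warn "child for $file failed: $@";
            }
            $cv->end;
        };
    }

    $cv->recv;                    # wait for all children to finish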

    But, for your original requirements, there's just no way that threading, forking, or any of that will add anything other than complexity. :-)

      To add to what both mbethke and Tanktalus have already said, Perl can be pretty fast with just a single-CPU linear process. For fun I extracted all the music-related entries from the Wikipedia dump files (about 8 GB compressed) using a pull parser from CPAN, and it slurped them all out and dumped them to a file in a couple of hours on a 1.8 GHz MacBook Pro 1,1 (not even a Core Duo) with 2 GB of RAM. And I could do other stuff at the same time.

      So unless you're doing something like sorting through CERN data to check for the Higgs yourself, or something that requires a lot of real-time processing on a bunch of data, splitting a task across multiple cores is probably overkill. Unless it's for entertainment, and then you should post it as a CUFP when it's working.

      Thanks for your suggestions/comments. To be a bit more forthcoming, I should describe something closer to the real problem. The input data set can be hundreds of GB (perhaps up to TB) and the "words" are states of a system that can number in the billions (it's actually closer to the CERN problem someone suggested than the baby example I gave). The full frequency spectrum won't fit in memory on a single node, so up to this point I've been using a Perl solution where the "word-space" is roughly load-balanced across hundreds to thousands of cores, each reading the entire dataset and only keeping track of the frequencies of its portion of the full spectrum. The trouble with this is that the I/O bandwidth requirement is huge, and I'm trying to reduce it by having each node in my cluster read the data only once and share it among all of the threads running on that node, instead of the current solution where every process reads the entire dataset. Does this change things at all?

        An architecture:

        Split your big file, by size, across N machines.

        Have a process on each machine that processes a filesize/N chunk of the bigfile. (Say, 32 machines each reading a different 32GB chunk of your 1TB file.)

        Each reader accumulates word counts in a hash until the hash size approaches its memory limit.

        (Assume roughly 1.5 GB per 10 million words/keys on a 64-bit Perl; somewhat less on a 32-bit build.)

        When that limit is reached, it posts (probably via UDP) the word/count pairs to the appropriate accumulator machines, frees the hash, and continues reading the file from where it left off.

        • The file is only processed once.
        • It can be processed in parallel by as many reader processes/machines as your IO/Disk bandwidth will allow.
        • Each reader process only uses whatever amount of memory you decide is the limit.
        • You can split the accumulations across as many (or few) boxes as are required.
        • As only word/count pairs are exchanged, IO (other than reading the file) will be minimal.

        No threading, shared memory or locking required. Simple to set up and efficient to process.
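
        A sketch of the reader side under those assumptions (hypothetical accumulator hosts/ports, an arbitrary key limit, and a byte range passed on the command line):

        use strict;
        use warnings;
        use IO::Socket::INET;

        my ($file, $start, $end) = @ARGV;           # this reader's chunk of the big file
        my @accumulators = map {
            IO::Socket::INET->new(Proto => 'udp', PeerAddr => $_)
                or die "socket: $!";
        } qw(acc1:4000 acc2:4000 acc3:4000);        # hypothetical accumulator boxes

        my $MAX_KEYS = 10_000_000;                  # ~1.5GB on 64-bit perl, per the estimate above
        my %count;

        open my $fh, '<', $file or die "$file: $!";
        seek $fh, $start, 0;
        <$fh> if $start;                            # skip the partial line at the chunk boundary

        while (<$fh>) {
            $count{$_}++ for split ' ';
            flush_counts() if keys(%count) >= $MAX_KEYS;
            # finish the line that crosses $end; the next reader skips its partial first line
            last if tell($fh) >= $end;
        }
        flush_counts();

        sub flush_counts {
            for my $word (keys %count) {
                # a given word always goes to the same accumulator, so totals stay consistent
                my $sock = $accumulators[ unpack('%32C*', $word) % @accumulators ];
                $sock->send("$word $count{$word}\n");
            }
            %count = ();
        }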



Re: to thread or fork or ?
by mbethke (Hermit) on Oct 19, 2012 at 02:25 UTC

    Unless the work you do is A LOT more expensive than a simple frequency count, or your data set is extremely large and your disks very fast, you're best off using a single process. Synchronization is a lot of work with threads, so accessing, say, a shared hash can be orders of magnitude slower than a non-shared one; with processes you'd end up having to serialize the end result and somehow pipe it back to the master process, which is also very expensive.

    If you're sure you want this (maybe just as a learning experience), I'd suggest just using fixed-size chunks from the input stream per process/thread, to minimize shared data. Say, read a couple of megabytes plus a line into a single string (so as to read at maximum speed, plus the line so you don't split your work in the middle of a word), then start a thread to process it (split into words, optionally normalize, count) into a local hash that then goes into a queue read by the master thread that checks for results from worker threads in-between blocks.
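
    A rough sketch of that pattern with the core threads and Thread::Queue modules (the chunk size and output format are arbitrary choices, and a real version would also cap the number of live workers):

    use strict;
    use warnings;
    use threads;
    use Thread::Queue;

    my $CHUNK   = 4 * 1024 * 1024;     # read ~4MB per worker
    my $results = Thread::Queue->new;  # workers enqueue hashrefs of counts

    sub count_words {                  # runs inside each worker thread
        my ($text) = @_;
        my %count;
        $count{lc $_}++ for split ' ', $text;
        $results->enqueue(\%count);    # the queue shares a clone of the local hash
    }

    open my $fh, '<', $ARGV[0] or die "$ARGV[0]: $!";

    my (%total, @workers);
    while (read($fh, my $buf, $CHUNK)) {
        $buf .= <$fh> // '';           # read on to the next newline so no word is split
        push @workers, threads->create(\&count_words, $buf);

        # check for finished results in-between blocks
        while (defined(my $count = $results->dequeue_nb)) {
            $total{$_} += $count->{$_} for keys %$count;
        }
    }

    $_->join for @workers;
    while (defined(my $count = $results->dequeue_nb)) {
        $total{$_} += $count->{$_} for keys %$count;
    }

    printf "%8d %s\n", $total{$_}, $_ for sort { $total{$b} <=> $total{$a} } keys %total;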