#!/usr/bin/perl
#
# Processes every *.xml file in $dir on a pool of worker threads, writing the
# triple returned by procXml() for each file to $outFile.  Once all workers
# have finished, every ordered pair of distinct keys of the shared %similar
# hash is compared by the Dice coefficient of their two-character chunk sets,
# and one line is written to STDOUT and $outFile for each pair scoring >= 0.9.
#
# NOTE(review): %similar and $recordCount are never written in the visible
# code; presumably procXml() fills them in -- confirm.

use strict;
use warnings;
use Carp qw(carp cluck croak confess);
use XML::Hash;
use File::Slurp;
use Date::Parse;
use IO::Handle;
binmode STDOUT, ":utf8";
use threads;
use threads::shared;
use Thread::Queue;
use Sys::CPU;
use Devel::Size qw(size total_size);
use List::MoreUtils qw(uniq);
#use Data::Dumper;

local $| = 1;

print `/bin/date`."\n";

# Two worker threads per reported CPU.
our $THREADS = Sys::CPU::cpu_count()*2;

my $dir='/xmlFeeds';
my ($DIR,@files);

# Collect the names of the XML feeds.  The pattern is anchored so that only
# names that END in ".xml" qualify (e.g. "feed.xml.bak" is skipped).
# NOTE(review): entries are bare file names, not paths; presumably procXml()
# prefixes $dir itself -- confirm.
opendir($DIR,$dir) or die "Cannot open directory '$dir': $!";
foreach my $entry (readdir($DIR)) {
    push @files, $entry if $entry =~ m/\.xml\z/;
}
closedir($DIR);

my $outFile='./out.nt';
my $OUTFILE;
open($OUTFILE,'>:utf8',$outFile) or die "Cannot open '$outFile' for writing: $!";

my %similar :shared;
my $recordCount :shared;
$recordCount=1;

# Serialises worker writes to $OUTFILE.  Each thread holds its own buffered
# clone of the handle, so a record is printed and flushed while holding this
# lock to keep records from different threads from interleaving.
my $outLock :shared;

my $Qwork = Thread::Queue->new;

## Create the pool of workers
my @pool = map {
    threads->create( \&worker, $Qwork )
} 1 .. $THREADS;

$Qwork->enqueue(@files);

## Tell the workers there are no more work items
$Qwork->enqueue( (undef) x $THREADS );

## Clean up the threads
$_->join for @pool;

my @doms = keys %similar; ## get keys into non-shared space for speed

# For every domain, build the set of its distinct two-character chunks
# (unpack '(A2)*' splits the string into consecutive, non-overlapping pieces)
# and remember the set size so it is not recounted for every pair below.
my %bigrams;
my %bigramCount;
for my $dom ( @doms ) {
    my %chunkSet;
    @chunkSet{ uniq( unpack '(A2)*', $dom ) } = ();
    $bigrams{ $dom }     = \%chunkSet;
    $bigramCount{ $dom } = keys %chunkSet;
}

for my $dom1 ( @doms ) {
    # NOTE(review): $type and $innerType are not used in the visible code;
    # they may belong in the emitted triple -- confirm before removing.
    my $type  = $similar{ $dom1 };
    my $set1  = $bigrams{ $dom1 };
    my $cDom1 = $bigramCount{ $dom1 };

    for my $dom2 ( @doms ) {
        next if $dom1 eq $dom2;
        my $innerType = $similar{ $dom2 };
        my $cDom2     = $bigramCount{ $dom2 };

        # Number of chunks the two domains have in common.
        my $counter = grep { exists $set1->{ $_ } } keys %{ $bigrams{ $dom2 } };

        # Dice coefficient: 2 * |A n B| / ( |A| + |B| ).  The denominator is
        # never zero because only the empty string has an empty chunk set and
        # $dom1 and $dom2 are distinct.
        my $value = ( $counter * 2 ) / ( $cDom1 + $cDom2 );

        if( $value >= 0.9 ) {
            # NOTE(review): only the statement terminator is emitted; the
            # subject/predicate/object terms appear to be missing from this
            # listing -- confirm against the original script.
            my $triple = qq| .\n|;
            print $triple;
            print $OUTFILE $triple;
        }
    }
}

close($OUTFILE) or die "Cannot close '$outFile': $!";

print `/bin/date`."\n";

# worker( $queue )
#
# Thread entry point.  Dequeues file names from $queue until it receives the
# undef end-of-work marker, passes each name to procXml() and writes any
# defined result to $OUTFILE.
sub worker {
    my( $queue ) = @_;

    # Test definedness rather than truth so that only the undef marker
    # terminates the loop.
    while( defined( my $file = $queue->dequeue ) ) {
        my $triple = procXml($file);
        next unless defined $triple;

        # Print and flush under the lock so the whole record reaches the
        # file before another thread can write.
        lock($outLock);
        print {$OUTFILE} $triple;
        $OUTFILE->flush;
    }
    return;
}

# procXml( $file )
#
# NOTE(review): the body is a placeholder in this listing.  From its only call
# site it receives one file name and returns either a string to be written to
# the output file or undef.
sub procXml { [code here] }