Beefy Boxes and Bandwidth Generously Provided by pair Networks
No such thing as a small change
 
PerlMonks  

Re: File Loader (load the content of a file and insert into DB)

by r34d0nl1 (Pilgrim)
on Jun 14, 2005 at 18:10 UTC ( #466662=note: print w/replies, xml ) Need Help??


in reply to File Loader (load the content of a file and insert into DB)

Well, I created a little monster that monitors a directory and records each file it finds
in a hash; it starts to load a file only if the file does not change in 3 seconds
- the way I found to tell whether the transfer of the file has finished.
Then it creates a child process that actually loads the file and reports OK or FAIL.
The parent loader then just checks that result and moves the file.
The status of the file (ready to load, loading or loaded) is stored in a hash until the file is loaded and moved.
Basically the code (which has been running without problems for more than 3 weeks - and I can't believe it :d) is below. Please comment on it:
#!/usr/bin/perl
#
# Directory-watching loader daemon.
#
# Watches $sourceFile for new files. A file whose size has stopped changing
# between two passes is handed to $loaderDcProgram in a forked child whose
# STDOUT is captured in "$logsDir/<child pid>.lock". The first line of that
# lock file ("OK" or "FAIL") tells the parent whether to move the data file
# to the archive directory or to the error directory.
#
# Per-file STATUS values kept in %files:
#   A  active  - seen, waiting for the size to stop changing
#   R  ready   - size is stable, can be handed to a loader child
#   L  loading - a loader child is processing the file
#   X  loaded  - child finished successfully, file can be archived
#   F  failed  - child reported FAIL or was killed after the timeout
use strict;
use warnings;
use Data::Dumper;
use POSIX 'setsid';
use File::Copy;

$| = 1;    # unbuffered print

my $basedir           = '/home/emf003/loader';    # base loader dir
my $sourceFile        = "$basedir/raw";           # dir to check
my $logsDir           = "$basedir/logs";          # where lock and log files live
my $loaderDcProgram   = "$basedir/t.pl";          # program to run
my $moveFile2Dir      = "$basedir/arch";          # where files go after being processed
my $errorFilesDir     = "$basedir/errorFiles";    # where files that could not be loaded go
my $sleep             = 5;                        # seconds to sleep between passes
my $maxProcess        = 10;                       # max number of concurrent loader children
my $processCount      = 0;                        # loader children currently accounted for
my $timeToKill        = 180;                      # seconds (3 minutes) before a child is killed
my $maxInternalErrors = 100;                      # errors logged before the daemon stops itself
my $internalErrors    = 0;                        # internal error counter, avoids endless loops
my %files             = ();

my $logFh;        # append handle on $logsDir/LOADERSTATUS, opened by check_env()
my $daemonPid;    # PID of the daemonized loader; END only logs for this process

$SIG{INT}  = \&catchKiller;    # deal with ctrl+c
$SIG{TERM} = \&catchKiller;    # deal with kill process
# SIGKILL cannot be trapped, so no handler is installed for it.
$SIG{CHLD} = 'IGNORE';         # finished children are reaped automatically (no zombies)

# Validate the environment while STDERR is still attached to the caller, so
# that start-up errors are visible instead of disappearing into /dev/null.
check_env();
daemonize();
$daemonPid = $$;

logLine( "\nloader started at " . localtime() . "\n" );

while (1) {
    createInfoFile();
    registerNewFiles();

    # Main idea: record each file and its size, wait some seconds and check
    # whether the size has changed. If not, the transfer is assumed complete.
    sleep $sleep;
    mark2Load( \%files );

    # keys() returns a snapshot, so handlers may delete entries while we loop.
    for my $file ( keys %files ) {
        handleFile($file);
    }
}

#------------------------------------------------------------
# Detach from the terminal: chdir to /, point the standard handles at
# /dev/null, fork (the parent exits) and start a new session.
sub daemonize {
    chdir '/' or die "Can't chdir to /: $!";
    open STDIN,  '<', '/dev/null' or die "Can't read /dev/null: $!";
    open STDOUT, '>', '/dev/null' or die "Can't write to /dev/null: $!";
    defined( my $pid = fork ) or die "Can't fork: $!";
    exit if $pid;
    setsid or die "Can't start a new session: $!";
    open STDERR, '>&', \*STDOUT or die "Can't dup stdout: $!";
    return;
}

#------------------------------------------------------------
# Append $text to the loader log, if the log has been opened.
sub logLine {
    my ($text) = @_;
    print {$logFh} $text if $logFh;
    return;
}

#------------------------------------------------------------
# Return the size in bytes of $name inside $sourceFile (0 for an empty
# file), or undef when the file does not exist.
sub currentSize {
    my ($name) = @_;
    return unless -e "$sourceFile/$name";
    my $size = -s _;
    return $size ? $size : 0;
}

#------------------------------------------------------------
# Scan $sourceFile and add every not-yet-known plain file to %files with
# status 'A'. A failed opendir is logged and the pass is skipped.
sub registerNewFiles {
    my $dh;
    unless ( opendir $dh, $sourceFile ) {
        makeLog("\n Cant open dir $sourceFile: $!\n");
        return;
    }

    # any plain file; minus . and .. and sub-directories
    my @dirContent = grep { !/^\.{1,2}$/ && -f "$sourceFile/$_" } readdir $dh;
    closedir $dh;

    for my $name (@dirContent) {
        next if exists $files{$name};
        my $size = currentSize($name);
        next unless defined $size;    # vanished between readdir and stat
        $files{$name} = {
            FILESIZE  => $size,
            TIMESTAMP => time,
            STATUS    => 'A',
        };
    }
    return;
}

#------------------------------------------------------------
# Promote 'A' entries to 'R' once their size has been stable since the
# recorded timestamp.
#
# Receives the files hash as a reference. When the size has changed, the
# stored size and timestamp are refreshed so that a file first seen in the
# middle of a transfer can still become ready later. Entries whose file has
# disappeared are dropped.
sub mark2Load {
    my ($files) = @_;

    for my $file ( keys %{$files} ) {
        my $entry = $files->{$file};
        next unless $entry->{STATUS} eq 'A';

        my $fileSize = currentSize($file);
        if ( !defined $fileSize ) {
            delete $files->{$file};
            next;
        }

        if ( $entry->{FILESIZE} != $fileSize ) {
            $entry->{FILESIZE}  = $fileSize;
            $entry->{TIMESTAMP} = time;
            next;
        }

        $entry->{STATUS} = 'R' if ( time - $entry->{TIMESTAMP} ) > 0;
    }
    return;
}

#------------------------------------------------------------
# Advance the state machine of one file by a single step.
sub handleFile {
    my ($file) = @_;
    my $entry = $files{$file} or return;
    my $status = $entry->{STATUS};

    if ( $status eq 'R' ) {
        startLoader($file) if $processCount < $maxProcess;
    }
    elsif ( $status eq 'L' ) {
        if ( ( time - $entry->{STARTPROCESS} ) <= $timeToKill ) {
            checkLoader($file);
        }
        else {
            # Timed out. kill() takes the signal first; the job is then
            # marked failed so its slot is released and the file is moved
            # to the error directory instead of staying 'L' forever.
            kill 'KILL', $entry->{PROCESS};
            makeLog("\n PID $entry->{PROCESS} killed after $timeToKill seconds processing file $file");
            finishLoad( $file, 'F' );
        }
    }
    elsif ( $status eq 'X' ) {
        archiveFile($file);
    }
    elsif ( $status eq 'F' ) {
        quarantineFile($file);
    }
    return;
}

#------------------------------------------------------------
# Fork a child that runs the loader program on $file with STDOUT captured
# in "$logsDir/<child pid>.lock". The parent records the child in %files.
sub startLoader {
    my ($file) = @_;

    my $pid = fork;
    if ( !defined $pid ) {
        # A failed fork must not fall through to exec in the daemon itself.
        makeLog("\n Can't fork loader for file $file: $!");
        return;
    }

    if ($pid) {
        $files{$file}{STATUS}       = 'L';
        $files{$file}{PROCESS}      = $pid;
        $files{$file}{STARTPROCESS} = time;
        logLine("\n PID $pid processing file $file");
        $processCount++;
        return;
    }

    # Child. _exit is used on failure so the daemon's END block does not run
    # in the child.
    open STDOUT, '>', "$logsDir/$$.lock" or POSIX::_exit(1);
    {
        no warnings 'exec';

        # List form: the file name is passed as a plain argument and is
        # never interpreted by a shell.
        exec {$loaderDcProgram} $loaderDcProgram, "$sourceFile/$file";
    }
    POSIX::_exit(1);
}

#------------------------------------------------------------
# Record that the loader child of $file is done: set the final status and
# release its slot in the process count.
sub finishLoad {
    my ( $file, $status ) = @_;
    $files{$file}{STATUS} = $status;
    $processCount-- if $processCount > 0;
    return;
}

#------------------------------------------------------------
# Inspect the lock file of a running loader child.
#   lock file cannot be opened -> 'X' (treated as loaded, as before)
#   first line is OK           -> lock file removed, 'X'
#   first line is FAIL         -> 'F'
#   anything else              -> still loading, check again next pass
# NOTE(review): a child that never managed to create its lock file is also
# treated as loaded; confirm this matches what the loader program expects.
sub checkLoader {
    my ($file) = @_;
    my $lock = "$logsDir/$files{$file}{PROCESS}.lock";

    my $fh;
    unless ( open $fh, '<', $lock ) {
        finishLoad( $file, 'X' );
        return;
    }
    my $firstLine = <$fh>;
    close $fh;

    return unless defined $firstLine;    # nothing written yet

    if ( $firstLine =~ /^OK$/ ) {
        unlink($lock)
            or makeLog( "\n Could not unlink file $lock" . "$!" );
        finishLoad( $file, 'X' );
    }
    elsif ( $firstLine =~ /^FAIL$/ ) {
        # If the file has not been loaded correctly (OK) it should have a
        # FAIL in the first line. More statuses could be added here.
        finishLoad( $file, 'F' );
    }
    return;
}

#------------------------------------------------------------
# Move a successfully loaded file to the archive directory and forget it.
# On a failed move the entry is kept so the move is retried next pass,
# rather than the file being picked up and loaded a second time.
sub archiveFile {
    my ($file) = @_;
    my $from = "$sourceFile/$file";

    if ( -e $from && !move( $from, "$moveFile2Dir/$file" ) ) {
        makeLog("\n Could not move $from to $moveFile2Dir/$file: $!");
        return;
    }
    delete $files{$file};
    return;
}

#------------------------------------------------------------
# Move a failed file, and the captured output of its loader child, to the
# error directory and forget it.
sub quarantineFile {
    my ($file) = @_;
    my $from = "$sourceFile/$file";
    my $lock = "$logsDir/$files{$file}{PROCESS}.lock";

    if ( -e $from && !move( $from, "$errorFilesDir/$file" ) ) {
        makeLog("\n Could not move $from to $errorFilesDir/$file: $!");
        return;
    }
    if ( -e $lock && !move( $lock, "$errorFilesDir/$file.err" ) ) {
        makeLog("\n Could not move $lock to $errorFilesDir/$file.err: $!");
    }
    delete $files{$file};
    return;
}

#---------------------------------------------------------
# Signal handler for INT and TERM: log the signal and terminate.
sub catchKiller {
    my ($sigName) = @_;
    makeLog( "\nRecebendo sinal de kill $sigName as " . localtime() );
    die "\n I've been killed $sigName";
}

#--------------------------------------------------------
# Very basic environment check: every directory must exist, the loader
# program must be executable, and the log file must be appendable.
# Dies on the first problem; on success the log handle is left open.
sub check_env {
    for my $dir ( $basedir, $sourceFile, $logsDir, $moveFile2Dir, $errorFilesDir ) {
        -d $dir or die "\n Can't open $dir: missing or not a directory\n";
    }

    -e $loaderDcProgram
        or die "\n Can't open $loaderDcProgram $!";
    die " $loaderDcProgram has no exec permission"
        unless -x $loaderDcProgram;

    open $logFh, '>>', "$logsDir/LOADERSTATUS"
        or die "\n Can't open $logsDir/LOADERSTATUS $!";

    # Autoflush the log so entries appear immediately and no buffered text
    # is duplicated into forked children.
    select( ( select($logFh), $| = 1 )[0] );
    return;
}

#---------------------------------------------------------
# Log an error message and count it. After $maxInternalErrors errors the
# daemon dumps its state to the log and dies, to avoid an endless loop.
sub makeLog {
    my ($message) = @_;

    logLine($message);
    $internalErrors++;
    logLine("'\nErrors $internalErrors process: $processCount");

    if ( $internalErrors >= $maxInternalErrors ) {
        logLine( "\nkilling myself ($$) . too many errors."
                . " DEBUG info: "
                . Dumper( \%files ) );
        die;
    }
    return;
}

# Write the closing line to the log, but only in the daemon process itself:
# not in the parent that exits inside daemonize() and not in a child.
END {
    if ( $logFh && defined $daemonPid && $$ == $daemonPid ) {
        print {$logFh} "\n closing loader " . localtime();
        close $logFh;
    }
}

#--------------------------------------------------------
# Rewrite "$logsDir/STATUS" with a snapshot of the configuration and of
# %files. Uses its own handle so the loader log stays open.
sub createInfoFile {
    my $infoFh;
    unless ( open $infoFh, '>', "$logsDir/STATUS" ) {
        makeLog("\n Can't write $logsDir/STATUS: $!");
        return;
    }

    print {$infoFh} <<EOF;
Last Update: @{[scalar localtime()]}
process: $$
basedir: $basedir
sourcefile: $sourceFile
logs: $logsDir
program: $loaderDcProgram
move good: $moveFile2Dir
move error: $errorFilesDir
sleep time: $sleep
max process: $maxProcess
process count: $processCount
time to kill child: $timeToKill
Max errors: $maxInternalErrors
internal errors: $internalErrors
@{[Dumper(\%files)]}
EOF
    close $infoFh;
    return;
}

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: note [id://466662]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others exploiting the Monastery: (3)
As of 2019-10-19 04:10 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    Notices?