#!/usr/bin/perl

#######################################
# Watch a directory tree with inotify and, for every MP3 file that is
# written into it, convert its ID3v2 tags to ID3v1.1 with eyeD3.
#
# Usage: <script> -d <directory> [-t <max concurrent tasks>]
#
#   -d  directory tree to watch (required, must exist)
#   -t  maximum number of files processed in parallel (default 2)
#######################################

use strict;
use warnings;

use File::Basename;
use File::Find ();
use Getopt::Std;
use Linux::Inotify2;
use POE qw( Kernel Session Wheel::Run );

# Unbuffered STDOUT so progress messages appear immediately.
$| = 1;

#######################################
# Shared state
#######################################

# Directories discovered by find_wanted() before the session starts.
my @found_dirs;

# Upper bound on simultaneously running eyeD3 tasks (set from -t).
my $max_concurrent_tasks = 2;

#######################################
# Subroutines
#######################################

# Add an inotify watch on $dir_name.  Events are delivered to the
# "watch_hdlr" state of $session through a POE postback.
#
#   $heap_ref  - session heap; $heap_ref->{inotify} is the Linux::Inotify2 object
#   $session   - POE session that owns the "watch_hdlr" state
#   $dir_name  - directory to watch
#
# Returns true on success, nothing if the watch could not be created.
sub watch_add_dir {
    my ($heap_ref, $session, $dir_name) = @_;

    my $watch = $heap_ref->{inotify}->watch(
        $dir_name,
        IN_CREATE | IN_CLOSE_WRITE,
        $session->postback("watch_hdlr"),
    );

    # watch() returns undef (with $! set) on failure, e.g. when the
    # directory vanished or the per-user watch limit was reached.
    unless ($watch) {
        warn "Unable to watch directory $dir_name: $!\n";
        return;
    }

    print " Watching directory $dir_name\n";
    return 1;
}

# POE state: handle one inotify event.  Called through the postback
# created in watch_add_dir(), so the event object is in $_[ARG1][0].
sub watch_hdlr {
    my ($kernel, $heap, $session) = @_[ KERNEL, HEAP, SESSION ];
    my $event = $_[ARG1][0];
    my $name  = $event->fullname;

    # While possible, it is highly unlikely that we will overflow the
    # notification buffers within the Linux kernel.  If so, report it.
    print "events for $name have been lost\n" if $event->IN_Q_OVERFLOW;

    # We can receive many notifications for one file.  If we have
    # already handled the file, do nothing.
    return if $heap->{processed}{$name};

    if ($event->IN_CREATE && -d $name) {
        # A new directory was added: watch that directory too.
        # NOTE(review): anything created inside the directory before this
        # watch is established is not reported by inotify.
        print "New directory: $name\n";
        watch_add_dir($heap, $session, $name);
    }
    elsif ($event->IN_CLOSE_WRITE) {
        # A descriptor opened for writing was closed.  We assume the file
        # is complete at this point, as the operation is expected to be a
        # copy into the watched directory.
        #
        # Only the LAST dot-suffix counts as the extension, so names such
        # as "01. Song.mp3" are still recognised.
        my $ext = ( fileparse($name, qr/\.[^.]*/) )[2];

        if (lc($ext) eq '.mp3') {
            # Queue the file and ask the session to start work on it.
            push @{ $heap->{task_queue} }, $name;
            $kernel->yield("task_next_file");
        }

        # Mark the file as handled.  eyeD3 rewrites the file, which would
        # otherwise trigger another IN_CLOSE_WRITE and an endless loop.
        $heap->{processed}{$name} = 1;
    }

    return;
}

# POE state: start queued files until $max_concurrent_tasks are running.
# Any extras stay queued and are picked up when a running task completes.
sub task_next_file {
    my ($kernel, $heap) = @_[ KERNEL, HEAP ];

    # $heap->{task} holds ONLY running wheels (keyed by wheel ID); the
    # queue lives in $heap->{task_queue} so it does not count as a task.
    while ( scalar( keys %{ $heap->{task} } ) < $max_concurrent_tasks ) {
        my $next_task_file = shift @{ $heap->{task_queue} };

        # Nothing left in the queue.
        last unless defined $next_task_file;

        # Run process_file() in a sub process via POE::Wheel::Run.
        my $task = POE::Wheel::Run->new(
            Program     => sub { process_file($next_task_file) },
            StdoutEvent => "task_output",
            CloseEvent  => "task_done",
        );

        # Keep the wheel alive and register a SIGCHLD handler for its
        # process.  Both are necessary for the task to execute.
        $heap->{task}{ $task->ID } = $task;
        $kernel->sig_child( $task->PID, "sig_child" );
    }

    return;
}

# Run an external command without a shell, discarding its standard
# output.  Returns true if the command ran and exited with status 0,
# false otherwise.
sub run_quietly {
    my @command = @_;

    # List-form pipe open: arguments are passed verbatim to exec, so file
    # names containing quotes, "$" or backticks cannot be interpreted.
    my $pid = open my $output, '-|', @command;
    unless ($pid) {
        warn "Unable to run $command[0]: $!\n";
        return;
    }

    1 while defined readline $output;    # drain and ignore the output
    close $output;

    return $? == 0;
}

# Convert the ID3v2 tags of $file to ID3v1.1 and then strip the v2 tags.
# Executed inside the POE::Wheel::Run child process.  If eyeD3 fails we
# deliberately carry on: this is best effort.
sub process_file {
    my $file = shift;

    print " Processed \"$file\"\n";

    run_quietly('eyeD3', '--to-v1.1',   $file);
    run_quietly('eyeD3', '--remove-v2', $file);

    return;
}

# File::Find callback: remember every directory under the watch root.
# Tests $_ rather than $File::Find::name so that the check is correct
# whether or not File::Find has changed the working directory.
sub find_wanted {
    push @found_dirs, $File::Find::name if -d $_;
    return;
}

#######################################
# Main program
#######################################

my %arg_options;
my $options_ok = getopts('d:t:', \%arg_options);
my $watch_dir  = $arg_options{d};

unless ($options_ok && defined $watch_dir && -d $watch_dir) {
    print STDERR "Usage: $0 -d <directory> [-t <max concurrent tasks>]\n";
    exit 1;
}

if (defined $arg_options{t}) {
    # Must be a whole number greater than zero; zero would mean that no
    # queued file is ever started.
    if ($arg_options{t} =~ /\A\d+\z/ && $arg_options{t} > 0) {
        $max_concurrent_tasks = $arg_options{t};
    }
    else {
        warn "Ignoring invalid -t value '$arg_options{t}'; "
           . "using $max_concurrent_tasks\n";
    }
}

# Find all existing sub directories so they can be watched as soon as the
# inotify object exists.  no_chdir keeps relative paths valid.
File::Find::find(
    { wanted => \&find_wanted, no_chdir => 1 },
    $watch_dir,
);

POE::Session->create(
    inline_states => {
        _start => sub {
            my ($kernel, $heap, $session) = @_[ KERNEL, HEAP, SESSION ];

            # Alias this session to 'notify' so that it can easily be
            # referenced later if needed.
            $kernel->alias_set('notify');

            # Create the Linux::Inotify2 object.
            $heap->{inotify} = Linux::Inotify2->new()
                or die "Unable to create new inotify object: $!";

            # Per-session bookkeeping:
            #   processed  - full file names that were already handled
            #   task_queue - files waiting for a free task slot
            #   task       - running POE::Wheel::Run objects by wheel ID
            $heap->{processed}  = {};
            $heap->{task_queue} = [];
            $heap->{task}       = {};

            # Watch the directories that existed at start-up.
            foreach my $dir (@found_dirs) {
                watch_add_dir($heap, $session, $dir);
            }

            # Inotify notifications arrive on a file descriptor; wrap it
            # in a Perl handle so that POE can select on it.
            open my $inotify_fh, '<&=', $heap->{inotify}->fileno
                or die "Can't fdopen: $!\n";
            $heap->{inotify_fh} = $inotify_fh;

            # Ask POE to call "inotify_poll" when there is data to read.
            $kernel->select_read($inotify_fh, "inotify_poll");
        },

        # Read pending events; Linux::Inotify2 invokes the callbacks
        # (postbacks to "watch_hdlr") registered for each watch.
        inotify_poll => sub {
            $_[HEAP]{inotify}->poll;
        },

        watch_hdlr => \&watch_hdlr,

        # Process the next file in the queue.
        task_next_file => \&task_next_file,

        # Print the output of the job.
        task_output => sub {
            my $result = $_[ARG0];
            print "$result\n";
        },

        # When a task is done, release its slot and start the next
        # queued file if there is one waiting.
        task_done => sub {
            my ($kernel, $heap, $task_id) = @_[ KERNEL, HEAP, ARG0 ];
            delete $heap->{task}{$task_id};
            $kernel->yield("task_next_file");
        },

        # Target of the sig_child() registration in task_next_file().
        # Nothing is stored per PID, so there is nothing to clean up here.
        sig_child => sub {
            return;
        },
    },
);

POE::Kernel->run();

exit 0;