## Mutex::Flock - Fcntl-based advisory locking.

package Mutex::Flock;

use strict;
use warnings;

no warnings qw( threads recursion uninitialized once );

our $VERSION = '0.007';

use Fcntl ':flock';
use Carp ();

# Lock state is tracked per worker inside the object under a key built from
# the process id, plus the thread id when threads.pm was loaded before this
# module. A worker that inherits the object (fork or thread clone) has no
# key of its own yet and therefore opens its own file handle on first use.
my $has_threads = $INC{'threads.pm'} ? 1 : 0;
my $tid = $has_threads ? threads->tid() : 0;

# Refresh the cached thread id inside a newly cloned interpreter.
sub CLONE {
    $tid = threads->tid() if $has_threads;
}

# Release the lock if this worker still holds it and remove the lock file
# when this worker is the one that generated the file name in new().
sub DESTROY {
    my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);

    if ($obj->{ $pid }) {
        $obj->unlock();
        close(delete $obj->{_fh});
    }

    unlink $obj->{path} if ($obj->{_init} && $obj->{_init} eq $pid);

    return;
}

# Open the lock file for the calling worker unless it already has an entry
# in the object. Croaks when the file cannot be opened.
sub _open {
    my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);
    return if exists $obj->{ $pid };

    open $obj->{_fh}, '+>>:raw:stdio', $obj->{path}
        or Carp::croak("Could not create temp file $obj->{path}: $!");

    return;
}

## Public methods.

# $id makes generated file names unique within one worker; $prog_name is the
# script's base name ('perl' for one-liners and scripts read from STDIN).
my ($id, $prog_name) = (0);

$prog_name = $0;
$prog_name =~ s{^.*[\\/]}{}g;
$prog_name = 'perl' if ($prog_name eq '-e' || $prog_name eq '-');

# Mutex::Flock->new( [ path => $file ] )
#
# Constructs a mutex backed by $file. Without a path, a lock file is created
# in the first writable directory among $ENV{TEMP}, $ENV{TMPDIR} and /tmp,
# restricted to mode 0600, and unlinked again by DESTROY in the creating
# worker. Croaks when no writable directory exists or the file cannot be
# opened.
sub new {
    my ($class, %obj) = (@_);

    if (! defined $obj{path}) {
        my ($pid, $tmp_dir, $tmp_file) = ( abs($$) );

        if ($ENV{TEMP} && -d $ENV{TEMP} && -w _) {
            $tmp_dir = $ENV{TEMP};
        }
        elsif ($ENV{TMPDIR} && -d $ENV{TMPDIR} && -w _) {
            $tmp_dir = $ENV{TMPDIR};
        }
        elsif (-d '/tmp' && -w _) {
            $tmp_dir = '/tmp';
        }
        else {
            Carp::croak("no writable dir found for temp file");
        }

        $id++;
        $tmp_dir =~ s{/$}{};

        # remove tainted'ness from $tmp_dir
        if ($^O eq 'MSWin32') {
            ($tmp_file) = "$tmp_dir\\$prog_name.$pid.$tid.$id" =~ /(.*)/;
        }
        else {
            ($tmp_file) = "$tmp_dir/$prog_name.$pid.$tid.$id" =~ /(.*)/;
        }

        # remember which worker owns the generated file (see DESTROY)
        $obj{_init} = $has_threads ? $$ .'.'. $tid : $$;
        $obj{ path} = $tmp_file.'.lock';
    }

    # test open
    open my $fh, '+>>:raw:stdio', $obj{path}
        or Carp::croak("Could not create temp file $obj{path}: $!");

    close $fh;

    # update permission
    chmod 0600, $obj{path} if $obj{_init};

    return bless(\%obj, $class);
}

# $mutex->lock / $mutex->lock_exclusive
#
# Blocks until an exclusive lock is obtained. A no-op when this worker
# already holds the lock. The lock is recorded as held only when flock()
# reports success, so an interrupted call (e.g. by the alarm in timedwait)
# does not leave the object claiming a lock it never got.
sub lock {
    my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);
    $obj->_open() unless exists $obj->{ $pid };

    $obj->{ $pid } = flock($obj->{_fh}, LOCK_EX) ? 1 : 0
        unless $obj->{ $pid };

    return;
}

*lock_exclusive = \&lock;

# $mutex->lock_shared
#
# Like lock(), but requests a shared lock. A no-op when this worker already
# holds a lock on the mutex.
sub lock_shared {
    my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);
    $obj->_open() unless exists $obj->{ $pid };

    $obj->{ $pid } = flock($obj->{_fh}, LOCK_SH) ? 1 : 0
        unless $obj->{ $pid };

    return;
}

# $mutex->unlock
#
# Releases the lock when this worker holds it; otherwise does nothing.
sub unlock {
    my ($pid, $obj) = ($has_threads ? $$ .'.'. $tid : $$, @_);

    if ($obj->{ $pid }) {
        flock($obj->{_fh}, LOCK_UN);
        $obj->{ $pid } = 0;
    }

    return;
}

# $mutex->synchronize( $code, @args ) / $mutex->enter( $code, @args )
#
# Runs $code->(@args) while holding an exclusive lock and releases the lock
# afterwards, also when $code dies (the exception is re-thrown after the
# unlock). Returns the list produced by $code in list context and its last
# element in scalar context. Returns nothing when $code is not a CODE ref.
sub synchronize {
    my ($pid, $obj, $code) = (
        $has_threads ? $$ .'.'. $tid : $$, shift, shift
    );
    return if ref($code) ne 'CODE';

    # wantarray inside eval {} reports the eval's context, so capture the
    # caller's context up front.
    my $want = wantarray;
    my @ret;

    $obj->_open() unless exists $obj->{ $pid };

    # lock, run, unlock - inlined for performance
    $obj->{ $pid } = flock($obj->{_fh}, LOCK_EX) ? 1 : 0
        unless $obj->{ $pid };

    my $ok = eval {
        if (defined $want) { @ret = $code->(@_) } else { $code->(@_) }
        1;
    };
    my $err = $@;

    flock($obj->{_fh}, LOCK_UN);
    $obj->{ $pid } = 0;

    die $err unless $ok;

    return $want ? @ret : $ret[-1];
}

*enter = \&synchronize;

# $mutex->timedwait( $timeout )
#
# Tries to obtain an exclusive lock, giving up after $timeout seconds.
# Returns '' when the timeout fired and 1 otherwise.
sub timedwait {
    my ($obj, $timeout) = @_;

    # Core alarm() works in whole seconds. A value below one would truncate
    # to 0, which cancels the alarm and lets the lock call block forever, so
    # wait at least one second.
    my $secs = int($timeout || 0);
    $secs = 1 if $secs < 1;

    local $@;
    local $SIG{'ALRM'} = sub { alarm 0; die "timed out\n" };

    eval { alarm $secs; $obj->lock_exclusive };
    alarm 0;

    ( $@ && $@ eq "timed out\n" ) ? '' : 1;
}

1;

__END__

=head1 NAME

Mutex::Flock - Fcntl advisory locking

=head1 SYNOPSIS

 {
     use Mutex::Flock;

     ( my $mutex = Mutex::Flock->new( path => $0 ) )->lock_exclusive;

     ...
 }

 {
     my $mutex = Mutex::Flock->new( path => $0 );

     # terminate script if a previous instance is still running
     exit unless $mutex->timedwait(2);

     ...
 }

 {
     use threads;
     use Mutex::Flock;

     my $mutex = Mutex::Flock->new;

     threads->create('task', $_) for 1..4;

     $_->join for ( threads->list );
 }

 {
     use MCE::Hobo;
     use Mutex::Flock;

     my $mutex = Mutex::Flock->new;

     MCE::Hobo->create('task', $_) for 5..8;

     MCE::Hobo->waitall;
 }

 sub task {
     my ($id) = @_;
     $mutex->lock;

     # access shared resource
     print $id, "\n";
     sleep 1;

     $mutex->unlock;
 }

=head1 DESCRIPTION

This module implements locking methods that can be used to coordinate access
to shared data from multiple workers spawned as processes or threads.

=head1 API DOCUMENTATION

=head2 Mutex::Flock->new ( [ path => "/tmp/file.lock" ] )

Creates a new mutex.

When path is given, it is the responsibility of the caller to remove the file.
Otherwise, it establishes a C<tempfile> internally including removal on scope
exit.

=head2 $mutex->lock ( void )

=head2 $mutex->lock_exclusive ( void )

Attempts to grab an exclusive lock and waits if not available. Multiple calls
to C<< $mutex->lock >> by the same process or thread are safe. The mutex will
remain locked until C<< $mutex->unlock >> is called.

The method C<lock_exclusive> is an alias for C<lock>.

=head2 $mutex->lock_shared ( void )

Like C<lock>, but attempts to grab a shared lock instead.

=head2 $mutex->unlock ( void )

Releases the lock. A held lock by an exiting process or thread is released
automatically.

=head2 $mutex->synchronize ( sub { ... }, @_ )

=head2 $mutex->enter ( sub { ... }, @_ )

Obtains a lock, runs the code block, and releases the lock after the block
completes. Optionally, the method is C<wantarray> aware.

 my $val = $mutex->synchronize( sub {
     # access shared resource
     return 'scalar';
 });

 my @ret = $mutex->enter( sub {
     # access shared resource
     return @list;
 });

The method C<enter> is an alias for C<synchronize>.

=head2 $mutex->timedwait ( timeout )

Blocks until obtaining an exclusive lock. A false value is returned
if the timeout is reached, and a true value otherwise.

 my $mutex = Mutex::Flock->new( path => $0 );

 # terminate script if a previous instance is still running
 exit unless $mutex->timedwait(2);

 ...

=head1 AUTHOR

Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>

=cut