Oh wise and gracious monks, I really need your help understanding what is happening and what I might be able to do.
I have successfully used IPC::MPS to create a parent <==> broker(1) <==> workers(many) forked message passing architecture. The broker is forked at INIT time, and the workers are forked when the broker decides they are needed. After a worker is created, the broker just forwards messages between the parent and the workers.
I am using forks instead of threads, because I am trying to isolate some non-thread-safe code which will successfully run within the forked workers.
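For readers who do not know IPC::MPS, the shape of this architecture can be sketched with core Perl only. This is not how IPC::MPS is implemented and not my production code: the helper names (make_pair, run_worker, run_broker, ask_broker) and the one-line-per-message protocol are assumptions made up for the illustration.

```perl
#!/usr/bin/perl
# Sketch only: parent <==> broker <==> worker using socketpair() and fork().
# All helper names and the line-based protocol are hypothetical.
use strict;
use warnings;
use Socket;
use IO::Handle;

# Create a connected pair of autoflushed stream sockets.
sub make_pair {
    socketpair( my $left, my $right, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
        or die "socketpair: $!";
    $_->autoflush(1) for $left, $right;
    return ( $left, $right );
}

# Worker: answer every request line, then exit when the broker goes away.
sub run_worker {
    my ($sock) = @_;
    while ( my $line = <$sock> ) {
        chomp $line;
        print {$sock} "worker $$ got '$line'\n";
    }
    exit 0;
}

# Broker: fork a worker on the first request, then relay lines both ways.
sub run_broker {
    my ($parent_sock) = @_;
    my $worker_sock;
    while ( my $line = <$parent_sock> ) {
        if ( !$worker_sock ) {
            ( $worker_sock, my $worker_end ) = make_pair();
            my $pid = fork() // die "fork: $!";
            if ( $pid == 0 ) {
                close $worker_sock;
                close $parent_sock;
                run_worker($worker_end);
            }
            close $worker_end;
        }
        print {$worker_sock} $line;             # forward the request
        my $reply = <$worker_sock>;             # wait for the worker
        print {$parent_sock} $reply;            # forward the answer
    }
    exit 0;
}

# Parent side: send one message and block until the reply arrives.
sub ask_broker {
    my ( $sock, $msg ) = @_;
    print {$sock} "$msg\n";
    my $reply = <$sock>;
    chomp $reply;
    return $reply;
}

my ( $to_broker, $broker_end ) = make_pair();
my $broker_pid = fork() // die "fork: $!";
if ( $broker_pid == 0 ) {
    close $to_broker;
    run_broker($broker_end);
}
close $broker_end;

my $reply = ask_broker( $to_broker, 'hi' );
print "$$ - $reply\n";

close $to_broker;            # EOF lets the broker, then the worker, exit
waitpid $broker_pid, 0;
```

In the real system the broker is forked inside an INIT block and decides on its own when workers are needed; the sketch forks one worker on the first message just to keep it short.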
All was GREAT and worked SURPRISINGLY WELL, until a couple of the users introduced the requirement to create threads in the parent script and pass messages to the broker from those threads. That is when I discovered that the parent's threads hang when they try to send a message to the broker. I have reduced the issue to the following example code:
#!/usr/bin/perl
#
# Create parent <==> child message relationship with socketpair()
# Then thread the parent and try to contact the child
use threads;
use IPC::MPS;
use warnings;
use v5.14;
my $child;
INIT {
    $child = spawn {
        receive {
            msg hi => sub {
                my ($from) = @_;
                say "$$ - Hi there!";
                snd( $from, 'hi', $$ );
            };
        };
    };
    receive { quit; };
    say "My child vpid: $child";
}
sub say_hi {
    my $ans = snd_wt( $child, 'hi');
    say "$$ - My child $ans said hi";
};
say 'Single-threaded execution';
say_hi;
say 'Threaded execution';
my $thr = threads->create( \&say_hi );
$thr->join;
__END__
My child vpid: 18275192
Single-threaded execution
7533 - Hi there!
7532 - My child 7533 said hi
Threaded execution
<_HANGS_HERE_>
Note that the communication works fine when not threaded and hangs when threaded. I suspect it may have something to do with the state of the dup'd socket file descriptors within the thread. I am open to patching IPC::MPS to support this scenario, or to using a similar fork module that will work.
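One way to test that suspicion without IPC::MPS in the picture is a bare socketpair() created before the thread is started. The following is a minimal sketch using only core modules on a threads-enabled perl; the variable names and the line-based echo protocol are assumptions for illustration, not anything IPC::MPS does. If the threaded call gets its reply here, the inherited descriptors themselves are probably usable from a thread, and the per-thread copy of the module's Perl-level state would be the next thing I would look at.

```perl
#!/usr/bin/perl
# Sketch only: is a socket that was opened before threads->create
# still usable from inside the new thread? Names are hypothetical.
use strict;
use warnings;
use threads;
use Socket;
use IO::Handle;

socketpair( my $parent_end, my $child_end, AF_UNIX, SOCK_STREAM, PF_UNSPEC )
    or die "socketpair: $!";
$_->autoflush(1) for $parent_end, $child_end;

my $pid = fork() // die "fork: $!";
if ( $pid == 0 ) {
    # Child process: echo every line back, tagged with its pid.
    close $parent_end;
    while ( my $line = <$child_end> ) {
        chomp $line;
        print {$child_end} "$$ heard $line\n";
    }
    exit 0;
}
close $child_end;

# Send one line to the child and block until it answers.
sub say_hi {
    my ($who) = @_;
    print {$parent_end} "$who\n";
    my $reply = <$parent_end>;
    chomp $reply;
    return $reply;
}

# Same call, first from the main thread, then from a new thread.
my $direct   = say_hi('main');
my $threaded = threads->create( { context => 'scalar' }, \&say_hi, 'thread' )->join;

print "direct:   $direct\n";
print "threaded: $threaded\n";

# Shut the child down explicitly instead of relying on EOF.
close $parent_end;
kill 'TERM', $pid;
waitpid $pid, 0;
```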
Any suggestions?
Here is the end of the strace output:
...
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7f0235ade9d0) = 8680
close(5) = 0
rt_sigprocmask(SIG_BLOCK, [PIPE], [], 8) = 0
rt_sigaction(SIGPIPE, {SIG_DFL, [], SA_RESTORER, 0x7f02351d2150}, {SIG_IGN, [], SA_RESTORER, 0x7f02351d2150}, 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
rt_sigprocmask(SIG_BLOCK, [CHLD], [], 8) = 0
rt_sigaction(SIGCHLD, {SIG_DFL, [], SA_RESTORER, 0x7f02351d2150}, {SIG_IGN, [], SA_RESTORER|SA_NOCLDWAIT, 0x7f02351d2150}, 8) = 0
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
write(1, "My child vpid: 20032376\n", 24My child vpid: 20032376
) = 24
write(1, "Single-threaded execution\n", 26Single-threaded execution
) = 26
brk(0x15f4000) = 0x15f4000
brk(0x15ea000) = 0x15ea000
select(8, [4], [4], [4], NULL) = 1 (out [4])
select(8, [4], [4], [4], NULL) = 1 (out [4])
write(4, "\0\0\0*\4\t\01012345678\4\10\10\10\2\4\0\0\0\10\200\n\0102003"..., 46) = 46
select(8, [4], NULL, [4], NULL8680 - Hi there!
) = 1 (in [4])
read(4, "\0\0\0000\4\t\01012345678\4\10\10\10\2\4\0\0\0\6x\2531\1\0\0\0"..., 16384) = 52
write(1, "8679 - My child 8680 said hi\n", 298679 - My child 8680 said hi
) = 29
write(1, "Threaded execution\n", 19Threaded execution
) = 19
rt_sigprocmask(SIG_BLOCK, ~[ILL BUS SEGV RTMIN RT_1], [], 8) = 0
ioctl(0, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
lseek(0, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
ioctl(1, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
lseek(1, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
ioctl(2, SNDCTL_TMR_TIMEBASE or TCGETS, {B38400 opost isig icanon echo ...}) = 0
lseek(2, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
ioctl(3, SNDCTL_TMR_TIMEBASE or TCGETS, 0x7fff55890df8) = -1 ENOTTY (Inappropriate ioctl for device)
lseek(3, 0, SEEK_CUR) = 700
ioctl(4, SNDCTL_TMR_TIMEBASE or TCGETS, 0x7fff55890df8) = -1 ENOTTY (Inappropriate ioctl for device)
lseek(4, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
ioctl(4, SNDCTL_TMR_TIMEBASE or TCGETS, 0x7fff55890df8) = -1 ENOTTY (Inappropriate ioctl for device)
lseek(4, 0, SEEK_CUR) = -1 ESPIPE (Illegal seek)
brk(0x160b000) = 0x160b000
brk(0x162c000) = 0x162c000
brk(0x164d000) = 0x164d000
brk(0x166e000) = 0x166e000
brk(0x168f000) = 0x168f000
brk(0x16b1000) = 0x16b1000
brk(0x16d2000) = 0x16d2000
mmap(NULL, 135168, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS, -1, 0) = 0x7f0235933000
brk(0x16f3000) = 0x16f3000
brk(0x1714000) = 0x1714000
brk(0x1735000) = 0x1735000
brk(0x1756000) = 0x1756000
brk(0x1777000) = 0x1777000
brk(0x1798000) = 0x1798000
munmap(0x7f0235933000, 135168) = 0
mmap(NULL, 8392704, PROT_READ|PROT_WRITE, MAP_PRIVATE|MAP_ANONYMOUS|MAP_STACK, -1, 0) = 0x7f0233404000
mprotect(0x7f0233404000, 4096, PROT_NONE) = 0
clone(child_stack=0x7f0233c03ff0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f0233c049d0, tls=0x7f0233c04700, child_tidptr=0x7f0233c049d0) = 8681
rt_sigprocmask(SIG_SETMASK, [], NULL, 8) = 0
futex(0x7f0233c049d0, FUTEX_WAIT, 8681, NULL<_HANGS_HERE_>
Please don't suggest converting the user code to use forks or a similar user-level rewrite. The requirement for the parent to be able to thread and then pass a message to the broker from a thread is non-negotiable.
I would greatly appreciate any wisdom or direction anyone can provide, including any details that might help explain what is going wrong.
Thank you,
-Shaun