Skip to content

Commit 6a0574f

Browse files
author
Jan Henning Thorsen
committed
Add Mojo::IOLoop::ReadWriteFork
1 parent 469b4f4 commit 6a0574f

File tree

6 files changed

+483
-0
lines changed

6 files changed

+483
-0
lines changed

Makefile.PL

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ WriteMakefile(
1010
'IO::Pty' => '1.10',
1111
},
1212
BUILD_REQUIRES => {
13+
'Test::Memory::Cycle' => '1.04',
1314
'Test::More' => '0.90',
1415
},
1516
META_MERGE => {

README

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
NAME
2+
Mojo::IOLoop::ReadWriteFork - Fork a process and read/write from it
3+
4+
DESCRIPTION
5+
This class enable you to fork children which you can write data to and
6+
emit events when the child prints to STDERR or STDOUT.
7+
8+
Patches that enable the "read" event to see the difference between
9+
STDERR and STDOUT are more than welcome.
10+
11+
SYNOPSIS
12+
my $fork = Mojo::IOLoop::ReadWriteFork->new;
13+
my $cat_result = '';
14+
15+
$fork->on(error => sub {
16+
my($fork, $error) = @_;
17+
warn $error;
18+
});
19+
$fork->on(close => sub {
20+
my($fork, $exit_value, $signal) = @_;
21+
warn "got close event";
22+
Mojo::IOLoop->stop;
23+
});
24+
$fork->on(read => sub {
25+
my($fork, $buffer) = @_; # $buffer = both STDERR and STDOUT
26+
$cat_result .= $buffer;
27+
});
28+
29+
$fork->start(
30+
program => 'cat',
31+
program_args => [ '-' ],
32+
conduit => 'pty',
33+
);
34+
35+
EVENTS
36+
close
37+
Emitted when the child process exit.
38+
39+
error
40+
Emitted when when the there is an issue with creating, writing or
41+
reading from the child process.
42+
43+
read
44+
Emitted when the child has written a chunk of data to STDOUT or STDERR.
45+
46+
ATTRIBUTES
47+
pid
48+
Holds the child process ID.
49+
50+
reactor
51+
Holds a Mojo::Reactor object. Default is:
52+
53+
Mojo::IOLoop->singleton->reactor;
54+
55+
METHODS
56+
start
57+
$self->start(
58+
program => $str,
59+
program_args => [@str],
60+
conduit => $str, # pipe or pty
61+
raw => $bool,
62+
clone_winsize_from => \*STDIN,
63+
);
64+
65+
Used to fork and exec a child process.
66+
67+
raw and "clone_winsize_from|IO::Pty" only makes sense if "conduit" is
68+
"pty".
69+
70+
write
71+
$self->write($buffer);
72+
73+
Used to write data to the child process.
74+
75+
kill
76+
$bool = $self->kill;
77+
$bool = $self->kill(15); # default
78+
79+
Used to signal the child.
80+
81+
AUTHOR
82+
Jan Henning Thorsen - "[email protected]"
83+

README.pod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
lib/Mojo/IOLoop/ReadWriteFork.pm

lib/Mojo/IOLoop/ReadWriteFork.pm

Lines changed: 304 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,304 @@
1+
package Mojo::IOLoop::ReadWriteFork;
2+
3+
=head1 NAME
4+
5+
Mojo::IOLoop::ReadWriteFork - Fork a process and read/write from it
6+
7+
=head1 DESCRIPTION
8+
9+
This class enable you to fork children which you can write data to
10+
and emit events when the child prints to STDERR or STDOUT.
11+
12+
Patches that enable the L</read> event to see the difference between STDERR
13+
and STDOUT are more than welcome.
14+
15+
=head1 SYNOPSIS
16+
17+
my $fork = Mojo::IOLoop::ReadWriteFork->new;
18+
my $cat_result = '';
19+
20+
$fork->on(error => sub {
21+
my($fork, $error) = @_;
22+
warn $error;
23+
});
24+
$fork->on(close => sub {
25+
my($fork, $exit_value, $signal) = @_;
26+
warn "got close event";
27+
Mojo::IOLoop->stop;
28+
});
29+
$fork->on(read => sub {
30+
my($fork, $buffer) = @_; # $buffer = both STDERR and STDOUT
31+
$cat_result .= $buffer;
32+
});
33+
34+
$fork->start(
35+
program => 'cat',
36+
program_args => [ '-' ],
37+
conduit => 'pty',
38+
);
39+
40+
=cut
41+
42+
use Mojo::Base 'Mojo::EventEmitter';
43+
use IO::Pty;
44+
use Mojo::Util 'url_escape';
45+
use POSIX ':sys_wait_h';
46+
use Scalar::Util ();
47+
use constant CHUNK_SIZE => $ENV{MOJO_CHUNK_SIZE} || 131072;
48+
use constant DEBUG => $ENV{MOJO_READWRITE_FORK_DEBUG} || 0;
49+
use constant WAIT_PID_INTERVAL => $ENV{WAIT_PID_INTERVAL} || 0.01;
50+
51+
=head1 EVENTS
52+
53+
=head2 close
54+
55+
Emitted when the child process exit.
56+
57+
=head2 error
58+
59+
Emitted when when the there is an issue with creating, writing or reading
60+
from the child process.
61+
62+
=head2 read
63+
64+
Emitted when the child has written a chunk of data to STDOUT or STDERR.
65+
66+
=head1 ATTRIBUTES
67+
68+
=head2 pid
69+
70+
Holds the child process ID.
71+
72+
=cut
73+
74+
has pid => 0;
75+
76+
=head2 reactor
77+
78+
Holds a L<Mojo::Reactor> object. Default is:
79+
80+
Mojo::IOLoop->singleton->reactor;
81+
82+
=cut
83+
84+
has reactor => sub {
85+
require Mojo::IOLoop;
86+
Mojo::IOLoop->singleton->reactor;
87+
};
88+
89+
=head1 METHODS
90+
91+
=head2 start
92+
93+
$self->start(
94+
program => $str,
95+
program_args => [@str],
96+
conduit => $str, # pipe or pty
97+
raw => $bool,
98+
clone_winsize_from => \*STDIN,
99+
);
100+
101+
Used to fork and exec a child process.
102+
103+
L<raw|IO::Pty> and C<clone_winsize_from|IO::Pty> only makes sense if
104+
C<conduit> is "pty".
105+
106+
=cut
107+
108+
sub start {
109+
my $self = shift;
110+
my $args = ref $_[0] ? $_[0] : {@_};
111+
112+
$args->{env} = { %ENV };
113+
$args->{program} or die 'program is required input';
114+
$args->{conduit} ||= 'pipe';
115+
$args->{program_args} ||= [];
116+
ref $args->{program_args} eq 'ARRAY' or die 'program_args need to be an array';
117+
Scalar::Util::weaken($self);
118+
$self->{delay} = $self->reactor->timer(0 => sub { $self->_start($args) });
119+
$self;
120+
}
121+
122+
sub _start {
123+
local($?, $!);
124+
my($self, $args) = @_;
125+
my($stdout_read, $stdout_write);
126+
my($stdin_read, $stdin_write);
127+
my $pid;
128+
129+
if($args->{conduit} eq 'pipe') {
130+
pipe $stdout_read, $stdout_write or return $self->emit_safe(error => "pipe: $!");
131+
pipe $stdin_read, $stdin_write or return $self->emit_safe(error => "pipe: $!");
132+
select +(select($stdout_write), $| = 1)[0];
133+
select +(select($stdin_write), $| = 1)[0];
134+
}
135+
elsif($args->{conduit} eq 'pty') {
136+
$stdin_write = $stdout_read = IO::Pty->new
137+
}
138+
else {
139+
warn "Invalid conduit ($args->{conduit})\n" if DEBUG;
140+
return $self->emit_safe(error => "Invalid conduit ($args->{conduit})");
141+
}
142+
143+
$pid = fork;
144+
145+
if(!defined $pid) {
146+
warn "Could not fork $!\n" if DEBUG;
147+
$self->emit_safe(error => "Couldn't fork ($!)");
148+
}
149+
elsif($pid) { # parent ===================================================
150+
warn "[$pid] Child starting ($args->{program} @{$args->{program_args}})\n" if DEBUG;
151+
$self->{pid} = $pid;
152+
$self->{stdin_write} = $stdin_write;
153+
$self->{stdout_read} = $stdout_read;
154+
$stdout_read->close_slave if defined $stdout_read and UNIVERSAL::isa($stdout_read, 'IO::Pty');
155+
156+
Scalar::Util::weaken($self);
157+
$self->reactor->io($stdout_read => sub {
158+
$self->{stop} and return;
159+
local($?, $!);
160+
my $reactor = shift;
161+
162+
$self->_read;
163+
164+
# 5 = Input/output error
165+
if($! == 5) {
166+
warn "[$pid] Ignoring child after $!\n" if DEBUG;
167+
$self->kill;
168+
$self->{stop}++;
169+
}
170+
elsif($!) {
171+
warn "[$pid] Child $!\n" if DEBUG;
172+
$self->emit_safe(error => "Read error: $!");
173+
}
174+
});
175+
$self->reactor->watch($stdout_read, 1, 0);
176+
$self->_setup_recurring_child_alive_check($pid);
177+
}
178+
else { # child ===========================================================
179+
if($args->{conduit} eq 'pty') {
180+
$stdin_write->make_slave_controlling_terminal;
181+
$stdin_read = $stdout_write = $stdin_write->slave;
182+
$stdin_read->set_raw if $args->{raw};
183+
$stdin_read->clone_winsize_from($args->{clone_winsize_from}) if $args->{clone_winsize_from};
184+
}
185+
186+
warn "[$$] Starting $args->{program} @{ $args->{program_args} }\n" if DEBUG;
187+
close $stdin_write;
188+
close $stdout_read;
189+
close STDIN;
190+
close STDOUT;
191+
close STDERR;
192+
open STDIN, '<&' . fileno $stdin_read or die $!;
193+
open STDOUT, '>&' . fileno $stdout_write or die $!;
194+
open STDERR, '>&' . fileno $stdout_write or die $!;
195+
select STDERR; $| = 1;
196+
select STDOUT; $| = 1;
197+
198+
$ENV{$_} = $args->{env}{$_} for keys %{ $args->{env} };
199+
200+
if(ref $args->{program} eq 'CODE') {
201+
$args->{program}->(@{ $args->{program_args} });
202+
}
203+
else {
204+
exec $args->{program}, @{ $args->{program_args} };
205+
}
206+
}
207+
}
208+
209+
sub _setup_recurring_child_alive_check {
210+
my($self, $pid) = @_;
211+
my $reactor = $self->reactor;
212+
213+
$reactor->{forks}{$pid} = $self;
214+
Scalar::Util::weaken($reactor->{forks}{$pid});
215+
$reactor->{fork_watcher} ||= $reactor->recurring(WAIT_PID_INTERVAL, sub {
216+
my $reactor = shift;
217+
218+
for my $pid (keys %{ $reactor->{forks} }) {
219+
local($?, $!);
220+
local $SIG{CHLD} = 'DEFAULT'; # no idea why i need to do this, but it seems like waitpid() below return -1 if not
221+
my $obj = $reactor->{forks}{$pid} || {};
222+
223+
if(waitpid($pid, WNOHANG) <= 0) {
224+
$obj->{stop} or next;
225+
# NOTE: cannot use kill() to check if the process is alive, since
226+
# the process might be owned by another user. (super tiadm)
227+
-d "/proc/$pid" and next;
228+
}
229+
230+
my($exit_value, $signal) = ($? >> 8, $? & 127);
231+
warn "[$pid] Child is dead $exit_value/$signal\n" if DEBUG;
232+
delete $reactor->{forks}{$pid} or next; # SUPER DUPER IMPORTANT THAT THIS next; IS NOT BEFORE waitpid; ABOVE!
233+
$obj->_read; # flush the rest
234+
$obj->emit_safe(close => $exit_value, $signal);
235+
$obj->_cleanup;
236+
}
237+
});
238+
}
239+
240+
=head2 write
241+
242+
$self->write($buffer);
243+
244+
Used to write data to the child process.
245+
246+
=cut
247+
248+
sub write {
249+
my($self, $buffer) = @_;
250+
251+
$self->{stdin_write} or return;
252+
warn "[${ \$self->pid }] Write buffer (" .url_escape($buffer) .")\n" if DEBUG;
253+
print { $self->{stdin_write} } $buffer;
254+
}
255+
256+
=head2 kill
257+
258+
$bool = $self->kill;
259+
$bool = $self->kill(15); # default
260+
261+
Used to signal the child.
262+
263+
=cut
264+
265+
sub kill {
266+
my $self = shift;
267+
my $signal = shift // 15;
268+
my $pid = $self->{pid} or return;
269+
270+
warn "[$pid] Kill $signal\n" if DEBUG;
271+
kill $signal, $pid;
272+
}
273+
274+
275+
sub _cleanup {
276+
my $self = shift;
277+
my $reactor = $self->{reactor} or return;
278+
279+
$reactor->watch($self->{stdout_read}, 0, 0) if $self->{stdout_read};
280+
$reactor->remove(delete $self->{stdout_read}) if $self->{stdout_read};
281+
$reactor->remove(delete $self->{delay}) if $self->{delay};
282+
$reactor->remove(delete $self->{stdin_write}) if $self->{stdin_write};
283+
}
284+
285+
sub _read {
286+
my $self = shift;
287+
my $stdout_read = $self->{stdout_read} or return;
288+
my $read = $stdout_read->sysread(my $buffer, CHUNK_SIZE, 0);
289+
290+
return unless defined $read;
291+
return unless $read;
292+
warn "[$self->{pid}] Got buffer (" .url_escape($buffer) .")\n" if DEBUG;
293+
$self->emit_safe(read => $buffer);
294+
}
295+
296+
sub DESTROY { shift->_cleanup }
297+
298+
=head1 AUTHOR
299+
300+
Jan Henning Thorsen - C<[email protected]>
301+
302+
=cut
303+
304+
1;

0 commit comments

Comments
 (0)