Skip to content

Commit d2896b4

Browse files
author
Jan Henning Thorsen
committed
Add "drain" callback and allow to write before started
1 parent 0adb132 commit d2896b4

File tree

3 files changed

+68
-9
lines changed

3 files changed

+68
-9
lines changed

Changes

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ Revision history for Mojo-IOLoop-ReadWriteFork
22

33
0.06
44
* Fix "Use of uninitialized value in numeric eq (==) at ReadWriteFork.pm line 182."
5+
* Add support for write() before child process has started
6+
* Add support for "drain" callback to write()
57

68
0.05 Wed Feb 19 13:29:54 2014
79
* Fix "read" event cannot change ERRNO from sysread()

lib/Mojo/IOLoop/ReadWriteFork.pm

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,11 @@ sub _start {
167167
elsif($pid) { # parent ===================================================
168168
warn "[$pid] Child starting ($args->{program} @{$args->{program_args}})\n" if DEBUG;
169169
$self->{pid} = $pid;
170-
$self->{stdin_write} = $stdin_write;
171170
$self->{stdout_read} = $stdout_read;
172171
$stdout_read->close_slave if defined $stdout_read and UNIVERSAL::isa($stdout_read, 'IO::Pty');
173172

173+
$self->_stdin($stdin_write);
174+
174175
Scalar::Util::weaken($self);
175176
$self->reactor->io($stdout_read => sub {
176177
$self->{stop} and return;
@@ -254,19 +255,33 @@ sub _setup_recurring_child_alive_check {
254255

255256
=head2 write
256257
257-
$self->write($buffer);
258+
$self = $self->write($chunk);
259+
$self = $self->write($chunk, $cb);
260+
261+
Used to write data to the child process STDIN. An optional callback will be
262+
called once STDIN is drained.
258263
259-
Used to write data to the child process.
264+
Example:
265+
266+
$self->write("some data\n", sub {
267+
my ($self) = @_;
268+
$self->close_gracefully;
269+
});
260270
261271
=cut
262272

263273
sub write {
264-
my($self, $buffer) = @_;
274+
my $self = shift;
275+
276+
if ($self->{stdin}) {
277+
$self->{stdin}->write(@_);
278+
warn "[${ \$self->pid }] Wrote buffer (" .url_escape($_[0]) .")\n" if DEBUG;
279+
}
280+
else {
281+
push @{ $self->{stdin_buffer} }, [@_];
282+
}
265283

266-
$self->{stdin_write} or return;
267-
print { $self->{stdin_write} } $buffer;
268-
$self->{stdin_write}->flush or die "Write buffer (" .url_escape($buffer) .") failed: $!";
269-
warn "[${ \$self->pid }] Wrote buffer (" .url_escape($buffer) .")\n" if DEBUG;
284+
$self;
270285
}
271286

272287
=head2 kill
@@ -295,7 +310,8 @@ sub _cleanup {
295310
$reactor->watch($self->{stdout_read}, 0, 0) if $self->{stdout_read};
296311
$reactor->remove(delete $self->{stdout_read}) if $self->{stdout_read};
297312
$reactor->remove(delete $self->{delay}) if $self->{delay};
298-
$reactor->remove(delete $self->{stdin_write}) if $self->{stdin_write};
313+
$reactor->remove(delete $self->{delay}) if $self->{delay};
314+
$self->{stdin}->close if $self->{stdin};
299315
}
300316

301317
sub _read {
@@ -311,6 +327,19 @@ sub _read {
311327
$self->emit_safe(read => $buffer);
312328
}
313329

330+
sub _stdin {
331+
my ($self, $handle) = @_;
332+
my $stream = Mojo::IOLoop::Stream->new($handle);
333+
334+
Scalar::Util::weaken($self);
335+
$stream->reactor($self->reactor);
336+
$stream->timeout(0);
337+
$stream->start;
338+
339+
$self->{stdin} = $stream;
340+
$self->write(@$_) for @{ delete $self->{stdin_buffer} || [] };
341+
}
342+
314343
sub DESTROY { shift->_cleanup }
315344

316345
=head1 AUTHOR

t/drain.t

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
use Mojo::Base -strict;
2+
use Mojo::IOLoop::ReadWriteFork;
3+
use Test::More;
4+
use Test::Memory::Cycle;
5+
6+
{
7+
my $run = Mojo::IOLoop::ReadWriteFork->new;
8+
my $drain = 0;
9+
my $output = '';
10+
11+
$run->on(close => sub { Mojo::IOLoop->stop; });
12+
$run->on(read => sub { $output .= $_[1]; });
13+
$run->write("line one\n", sub { $drain++; });
14+
$run->start(program => sub {
15+
print sysread STDIN, my $buf, 1024;
16+
print "\n$buf";
17+
print "line two\n";
18+
});
19+
20+
Mojo::IOLoop->timer(3 => sub { Mojo::IOLoop->stop }); # guard
21+
Mojo::IOLoop->start;
22+
memory_cycle_ok $run, 'no cycle after run';
23+
24+
like $output, qr/^9\nline one\nline two\n/, 'can write() before start()' or diag $output;
25+
is $drain, 1, 'drain callback was called';
26+
}
27+
28+
done_testing;

0 commit comments

Comments
 (0)