Skip to content

Commit 760ca69

Browse files
author
Jan Henning Thorsen
committed
Fix IO::Pty and write()
1 parent 6d63380 commit 760ca69

File tree

4 files changed

+80
-59
lines changed

4 files changed

+80
-59
lines changed

Changes

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ Revision history for Mojo-IOLoop-ReadWriteFork
55
* Add support for write() before child process has started
66
* Add support for "drain" callback to write()
77
* Add run(). a simpler version start()
8-
* Add support for closing STDIN
8+
* Add close() for closing STDIN
99

1010
0.05 Wed Feb 19 13:29:54 2014
1111
* Fix "read" event cannot change ERRNO from sysread()

lib/Mojo/IOLoop/ReadWriteFork.pm

Lines changed: 35 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,9 @@ See L<https://github.com/jhthorsen/mojo-ioloop-readwritefork/tree/master/example
5050
=cut
5151

5252
use Mojo::Base 'Mojo::EventEmitter';
53-
use IO::Pty;
5453
use Mojo::Util 'url_escape';
54+
use Errno qw( EAGAIN ECONNRESET EINTR EPIPE EWOULDBLOCK );
55+
use IO::Pty;
5556
use POSIX ':sys_wait_h';
5657
use Scalar::Util ();
5758
use constant CHUNK_SIZE => $ENV{MOJO_CHUNK_SIZE} || 131072;
@@ -109,22 +110,10 @@ Close STDIN stream to the child process immediately.
109110
=cut
110111

111112
sub close {
112-
my ($self, $what) = @_;
113-
$self->{stream}{$what}->close if ref $self->{stream}{$what};
114-
$self;
115-
}
116-
117-
=head2 close_gracefully
118-
119-
$self = $self->close_gracefully("stdin");
120-
121-
Close STDIN to the child process stream gracefully.
122-
123-
=cut
124-
125-
sub close_gracefully {
126-
my ($self, $what) = @_;
127-
$self->{stream}{$what}->close_gracefully if ref $self->{stream}{$what};
113+
my $self = shift;
114+
my $what = $_[0] eq 'stdout' ? 'stdout_read' : 'stdin_write'; # stdout_read is EXPERIMENTAL
115+
my $fh = delete $self->{$what} or return $self;
116+
CORE::close($fh) or $self->emit(error => $!);
128117
$self;
129118
}
130119

@@ -211,10 +200,9 @@ sub _start {
211200
warn "[$pid] Child starting ($args->{program} @{$args->{program_args}})\n" if DEBUG;
212201
$self->{pid} = $pid;
213202
$self->{stdout_read} = $stdout_read;
203+
$self->{stdin_write} = $stdin_write;
214204
$stdout_read->close_slave if defined $stdout_read and UNIVERSAL::isa($stdout_read, 'IO::Pty');
215205

216-
$self->_stdin($stdin_write);
217-
218206
Scalar::Util::weaken($self);
219207
$self->reactor->io($stdout_read => sub {
220208
$self->{stop} and return;
@@ -236,6 +224,7 @@ sub _start {
236224
});
237225
$self->reactor->watch($stdout_read, 1, 0);
238226
$self->_setup_recurring_child_alive_check($pid);
227+
$self->_write;
239228
}
240229
else { # child ===========================================================
241230
if($args->{conduit} eq 'pty') {
@@ -308,22 +297,17 @@ Example:
308297
309298
$self->write("some data\n", sub {
310299
my ($self) = @_;
311-
$self->close_gracefully;
300+
$self->close;
312301
});
313302
314303
=cut
315304

316305
sub write {
317-
my $self = shift;
318-
319-
if ($self->{stream}{stdin}) {
320-
$self->{stream}{stdin}->write(@_);
321-
warn "[${ \$self->pid }] Wrote buffer (" .url_escape($_[0]) .")\n" if DEBUG;
322-
}
323-
else {
324-
push @{ $self->{stdin_buffer} }, [@_];
325-
}
306+
my ($self, $chunk, $cb) = @_;
326307

308+
$self->once(drain => $cb) if $cb;
309+
$self->{stdin_buffer} .= $chunk;
310+
$self->_write if $self->{stdin_write};
327311
$self;
328312
}
329313

@@ -345,6 +329,11 @@ sub kill {
345329
kill $signal, $pid;
346330
}
347331

332+
sub _error {
333+
return if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;
334+
return $_[0]->kill if $! == ECONNRESET || $! == EPIPE;
335+
return $_[0]->emit(error => $!)->kill;
336+
}
348337

349338
sub _cleanup {
350339
my $self = shift;
@@ -353,8 +342,6 @@ sub _cleanup {
353342
$reactor->watch($self->{stdout_read}, 0, 0) if $self->{stdout_read};
354343
$reactor->remove(delete $self->{stdout_read}) if $self->{stdout_read};
355344
$reactor->remove(delete $self->{delay}) if $self->{delay};
356-
$reactor->remove(delete $self->{delay}) if $self->{delay};
357-
delete($self->{stream}{stdin})->close if $self->{stream}{stdin};
358345
}
359346

360347
sub _read {
@@ -370,18 +357,24 @@ sub _read {
370357
$self->emit_safe(read => $buffer);
371358
}
372359

373-
sub _stdin {
374-
my ($self, $handle) = @_;
375-
my $stream = Mojo::IOLoop::Stream->new($handle);
376-
377-
Scalar::Util::weaken($self);
378-
$stream->on(error => sub { $! == 9 ? shift->close : return $self->emit(error => $_[1]); });
379-
$stream->reactor($self->reactor);
380-
$stream->timeout(0);
381-
$stream->start;
360+
sub _write {
361+
my $self = shift;
382362

383-
$self->{stream}{stdin} = $stream;
384-
$self->write(@$_) for @{ delete $self->{stdin_buffer} || [] };
363+
return unless length $self->{stdin_buffer};
364+
my $stdin_write = $self->{stdin_write};
365+
my $written = $stdin_write->syswrite($self->{stdin_buffer});
366+
return $self->_error unless defined $written;
367+
my $chunk = substr $self->{stdin_buffer}, 0, $written, '';
368+
warn "[${ \$self->pid }] Wrote buffer (" .url_escape($chunk) .")\n" if DEBUG;
369+
370+
if (length $self->{stdin_buffer}) {
371+
# This is one ugly hack because it does not seem like IO::Pty play
372+
# nice with Mojo::Reactor(::EV) ->io(...) and ->watch(...)
373+
$self->reactor->timer(0.01 => sub { $self and $self->_write });
374+
}
375+
else {
376+
$self->emit_safe('drain');
377+
}
385378
}
386379

387380
sub DESTROY { shift->_cleanup }

t/close.t

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,4 @@ use Test::Memory::Cycle;
1919
like $output, qr/line one\nline two\nFORCE\n/, 'close' or diag $output;
2020
}
2121

22-
{
23-
my $run = Mojo::IOLoop::ReadWriteFork->new;
24-
my $output = '';
25-
26-
$run->on(close => sub { Mojo::IOLoop->stop; });
27-
$run->on(error => sub { diag "error: @_" });
28-
$run->on(read => sub { $output .= $_[1]; shift->close_gracefully('stdin'); });
29-
$run->write("line one\nline two\n");
30-
$run->run(sub { print while <>; print "GRACEFUL\n" });
31-
32-
Mojo::IOLoop->timer(3 => sub { Mojo::IOLoop->stop }); # guard
33-
Mojo::IOLoop->start;
34-
35-
like $output, qr/line one\nline two\nGRACEFUL\n/, 'close_gracefully' or diag $output;
36-
}
37-
3822
done_testing;

t/telnet.t

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
use Mojo::Base -strict;
2+
use Mojo::IOLoop;
3+
use Mojo::IOLoop::ReadWriteFork;
4+
use Test::More;
5+
use Test::Memory::Cycle;
6+
7+
$ENV{PATH} ||= '';
8+
plan skip_all => 'telnet is missing' unless grep { -x "$_/telnet" } split /:/, $ENV{PATH};
9+
10+
my $port = Mojo::IOLoop::Server->generate_port;
11+
12+
# echo server
13+
Mojo::IOLoop->server({ port => $port }, sub {
14+
my ($ioloop, $stream) = @_;
15+
$stream->on(read => sub { $_[0]->write("I heard you say: $_[1]"); });
16+
});
17+
18+
{
19+
my $run = Mojo::IOLoop::ReadWriteFork->new;
20+
my $output = '';
21+
my $drain = 0;
22+
23+
$run->on(close => sub { Mojo::IOLoop->stop; });
24+
$run->on(read => sub {
25+
my ($run, $chunk) = @_;
26+
$run->write("hey\n", sub { $drain++; }) if $chunk =~ /Connected/;
27+
$run->kill if $chunk =~ /I heard you say: hey/;
28+
$output .= $chunk;
29+
});
30+
31+
$run->start(
32+
program => 'telnet',
33+
program_args => [localhost => $port],
34+
conduit => 'pty',
35+
);
36+
37+
#Mojo::IOLoop->timer(2 => sub { Mojo::IOLoop->stop }); # guard
38+
Mojo::IOLoop->start;
39+
like $output, qr{Connected}, 'Connected';
40+
like $output, qr{I heard you say: hey}, 'got echo';
41+
is $drain, 1, 'got drain event';
42+
}
43+
44+
done_testing;

0 commit comments

Comments
 (0)