Skip to content

Commit 3f45331

Browse files
Use non-blocking IO
1 parent 5b53532 commit 3f45331

File tree

7 files changed

+344
-191
lines changed

7 files changed

+344
-191
lines changed

src/Crusse/JobServer/EventLoop.php

Lines changed: 229 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
// "The event loop follows the rather usual single threaded asynchronous I/O approach: all (network) I/O is performed on non-blocking sockets. As part of a loop iteration **the loop will block waiting for I/O activity on sockets which have been added to the poller** and callbacks will be fired indicating socket conditions (readable, writable hangup) so handles can read, write or perform the desired I/O operation."
99
//
1010
// - create a watchSocket() method for adding server and client sockets
11-
// - emit events (run eventCallback with a 'writable' event) when a socket has sent a _full_ Request, and becomes writable
12-
// - pass the callback the Request, and maybe a SocketWriter (or make the callback return a Response|null)
11+
// - emit events (run eventCallback with a 'writable' event) when a socket has sent a _full_ Message, and becomes writable
12+
// - pass the callback the Message, and maybe a SocketWriter (or make the callback return a Response|null)
1313
// - implement BlockingEventLoop and NonBlockingEventLoop (with stream_select()) to test performance diff (if any)
1414
//
1515
// https://www.reddit.com/r/programming/comments/3vzepv/the_difference_between_asynchronous_and/
@@ -21,118 +21,280 @@
2121

2222
class EventLoop {
2323

24-
private $listenStream;
25-
private $eventCallback;
26-
private $timeout = 60;
27-
28-
function __construct( $listenStream ) {
24+
private $serverStream;
25+
private $acceptTimeout = 60;
26+
private $callbacks = array();
27+
private $streams = array();
28+
private $blocking = true;
29+
private $stop = false;
30+
31+
function __construct( $blockingIo ) {
32+
33+
$this->blocking = (bool) $blockingIo;
34+
}
35+
36+
function addClientStream( $stream, $readTimeout = 2, $writeTimeout = 2 ) {
37+
38+
if ( !is_resource( $stream ) )
39+
throw new \InvalidArgumentException( '$stream is not a resource' );
2940

30-
$this->listenStream = $listenStream;
41+
stream_set_blocking( $stream, $this->blocking );
42+
$socket = socket_import_stream( $stream );
43+
socket_set_option( $socket, SOL_SOCKET, SO_RCVTIMEO, array( 'sec' => $readTimeout, 'usec' => 0 ) );
44+
socket_set_option( $socket, SOL_SOCKET, SO_SNDTIMEO, array( 'sec' => $writeTimeout, 'usec' => 0 ) );
45+
46+
$foundSpot = false;
47+
$streamCount = count( $this->streams );
48+
49+
// Reuse array keys, instead of always pushing to the array with an
50+
// incrementing key, so that we don't have large integer keys. We don't use
51+
// array_unshift when removing streams, as it's slow, so we use unset()
52+
// instead.
53+
for ( $i = 0; $i < $streamCount; $i++ ) {
54+
if ( !isset( $this->streams[ $i ] ) ) {
55+
$foundSpot = true;
56+
$this->streams[ $i ] = $stream;
57+
break;
58+
}
59+
}
60+
61+
if ( !$foundSpot )
62+
$this->streams[] = $stream;
63+
64+
$this->readBuffer[ $i ] = new MessageBuffer();
65+
$this->writeBuffer[ $i ] = '';
3166
}
3267

33-
private function tick() {
68+
function addServerStream( $stream, $acceptTimeout = 60 ) {
69+
70+
if ( !is_resource( $stream ) )
71+
throw new \InvalidArgumentException( '$stream is not a resource' );
3472

35-
$client = stream_socket_accept( $this->listenStream, $this->timeout );
73+
stream_set_blocking( $stream, $this->blocking );
3674

37-
if ( !$client )
38-
throw new \Exception( 'Reached idle timeout ('. $this->timeout .')' );
75+
$this->serverStream = $stream;
76+
$this->acceptTimeout = (int) $acceptTimeout;
77+
}
3978

40-
$request = $this->getRequestFromClientStream( $client );
79+
function subscribe( $callable ) {
80+
81+
if ( !is_callable( $callable ) )
82+
throw new \InvalidArgumentException( '$callable is not callable' );
4183

42-
return call_user_func( $this->eventCallback, $request );
84+
$this->callbacks[] = $callable;
4385
}
4486

45-
function run( $eventCallback ) {
87+
function send( $stream, Message $message ) {
4688

47-
$this->eventCallback = $eventCallback;
89+
if ( $this->stop )
90+
throw new \LogicException( 'Calling send() after stop() is redundant' );
91+
92+
$streamIndex = array_search( $stream, $this->streams, true );
4893

94+
if ( $streamIndex === false )
95+
throw new \InvalidArgumentException( 'No valid socket stream given' );
96+
97+
$this->writeBuffer[ $streamIndex ] .= (string) $message;
98+
}
99+
100+
function run() {
101+
49102
while ( true ) {
50-
if ( !$this->tick() )
103+
104+
$readables = $this->streams;
105+
if ( $this->serverStream )
106+
$readables[] = $this->serverStream;
107+
$writables = $this->streams;
108+
$nullVar = null;
109+
110+
$changedStreams = stream_select( $readables, $writables, $nullVar, $this->acceptTimeout );
111+
112+
// TODO: can it happen that $changedStreams === 0 and $readables is not empty?
113+
if ( !$changedStreams )
114+
throw new \Exception( 'select() timed out' );
115+
116+
// When we're stopping the loop, write all remaining buffer out, but
117+
// don't receive anything in anymore
118+
if ( $readables && !$this->stop )
119+
$this->handleReadableStreams( $readables );
120+
121+
if ( $writables )
122+
$this->handleWritableStreams( $writables );
123+
124+
// Exit the loop when all writes have been done
125+
if ( $this->stop && !array_filter( $this->writeBuffer ) )
51126
break;
52127
}
128+
129+
foreach ( $this->streams as $stream )
130+
$this->closeConnection( $stream );
131+
$this->streams = array();
132+
133+
if ( $this->serverStream )
134+
$this->closeConnection( $this->serverStream );
135+
unset( $this->serverStream );
136+
}
137+
138+
function stop() {
139+
140+
$this->stop = true;
53141
}
54142

55-
private function getRequestFromClientStream( $stream ) {
143+
private function handleReadableStreams( $streams ) {
144+
145+
if ( in_array( $this->serverStream, $streams ) )
146+
$this->acceptClient();
56147

57-
$request = new Request();
58-
$request->stream = $stream;
148+
foreach ( $streams as $stream ) {
59149

60-
while ( $this->populateRequestFromStream( $stream, $request ) ) {}
150+
if ( $stream === $this->serverStream )
151+
continue;
61152

62-
// Free memory
63-
unset( $request->headerBuffer );
64-
unset( $request->headerEnd );
65-
unset( $request->bodyLen );
153+
$messages = $this->getMessagesFromStream( $stream );
66154

67-
// The connection was closed before we got the full response
68-
if ( !$request->valid )
69-
throw new \Exception( 'Stream was closed unexpectedly' );
155+
if ( !$messages )
156+
continue;
70157

71-
return $request;
158+
foreach ( $messages as $message ) {
159+
foreach ( $this->callbacks as $callback ) {
160+
call_user_func( $callback, $message, $this, $stream );
161+
}
162+
}
163+
}
72164
}
73165

74-
private function populateRequestFromStream( $stream, &$request ) {
166+
private function handleWritableStreams( $streams ) {
75167

76-
$data = stream_socket_recvfrom( $stream, 1500 );
77-
$dataLen = strlen( $data );
168+
foreach ( $streams as $stream ) {
78169

79-
// Connection was dropped by the receiver, or the socket receive timed out
80-
if ( !$dataLen )
81-
return false;
170+
$streamIndex = array_search( $stream, $this->streams, true );
171+
$buffer = $this->writeBuffer[ $streamIndex ];
172+
$bufferLen = strlen( $buffer );
173+
$sentBytes = stream_socket_sendto( $stream, $buffer );
82174

83-
$request->headerBuffer .= $data;
175+
if ( $sentBytes < 0 )
176+
throw new \Exception( 'Could not write to socket' );
177+
178+
$this->writeBuffer[ $streamIndex ] = substr( $buffer, $sentBytes );
179+
}
180+
}
181+
182+
private function acceptClient() {
183+
184+
$stream = stream_socket_accept( $this->serverStream, $this->acceptTimeout );
185+
186+
if ( !$stream )
187+
throw new \Exception( 'Reached listen timeout ('. $this->acceptTimeout .')' );
188+
189+
$this->addClientStream( $stream );
190+
191+
return $stream;
192+
}
193+
194+
/**
195+
* Returns one or more Messages from the stream. Reading from the stream
196+
* might return multiple messages, and in that case this function will
197+
* conserve message boundaries and return each message as a Message.
198+
*
199+
* @return array Array of Message objects. Can be empty.
200+
*/
201+
private function getMessagesFromStream( $stream ) {
202+
203+
$streamIndex = array_search( $stream, $this->streams, true );
204+
$buffer = $this->readBuffer[ $streamIndex ];
205+
206+
// Populate the MessageBuffer from the stream
207+
208+
if ( $this->blocking ) {
209+
do {
210+
$data = stream_socket_recvfrom( $stream, 1500 );
211+
$this->populateMessageBuffer( $data, $buffer );
212+
}
213+
while ( !$buffer->hasMessage );
214+
}
215+
else {
216+
$data = stream_socket_recvfrom( $stream, 1500 );
217+
$this->populateMessageBuffer( $data, $buffer );
218+
}
219+
220+
// Get finished Message objects from the MessageBuffer
221+
222+
$messages = array();
223+
224+
while ( $buffer->hasMessage ) {
225+
226+
$messages[] = $buffer->message;
227+
// Check if we received multiple messages' data from the stream
228+
$overflowBytes = $buffer->bodyLen - $buffer->message->headers[ 'body-len' ];
229+
230+
// We got more bytes than the message consists of, so we got (possibly
231+
// partially) other messages' data
232+
if ( $overflowBytes > 0 ) {
233+
234+
$overflow = substr( $buffer->message->body, -$overflowBytes );
235+
$buffer->message->body .= substr( $buffer->message->body, 0, -$overflowBytes );
236+
$messages[] = $buffer->message;
237+
238+
$buffer = new MessageBuffer();
239+
$this->readBuffer[ $streamIndex ] = $buffer;
240+
241+
$this->populateMessageBuffer( $overflow, $buffer );
242+
}
243+
// We got the whole message, and nothing more (no overflow to the next message)
244+
else {
245+
246+
$buffer = new MessageBuffer();
247+
$this->readBuffer[ $streamIndex ] = $buffer;
248+
}
249+
}
250+
251+
return $messages;
252+
}
253+
254+
private function populateMessageBuffer( $data, MessageBuffer &$buffer ) {
84255

85256
// We already have the header. Add further data to body.
86-
if ( $request->headerEnd !== false ) {
257+
if ( $buffer->headerEnd !== false ) {
87258

88-
$request->body .= $data;
89-
$request->bodyLen += $dataLen;
259+
$dataLen = strlen( $data );
260+
$buffer->bodyLen += $dataLen;
261+
$buffer->message->body .= $data;
90262

91-
if ( $request->bodyLen >= $request->headers[ 'body-len' ] ) {
92-
$request->valid = true;
93-
return false;
94-
}
263+
if ( $buffer->bodyLen >= $buffer->message->headers[ 'body-len' ] )
264+
$buffer->hasMessage = true;
95265
}
96-
// We're reading the header of the request
266+
// We're reading the header of the message
97267
else {
98268

99-
$request->headerEnd = strpos( $request->headerBuffer, "\n\n" );
269+
$buffer->headerBuffer .= $data;
270+
$buffer->headerEnd = strpos( $buffer->headerBuffer, "\n\n" );
100271

101-
if ( $request->headerEnd !== false ) {
272+
if ( $buffer->headerEnd !== false ) {
102273

103-
$headerLines = array_filter( explode( "\n", substr( $request->headerBuffer, 0, $request->headerEnd ) ) );
274+
$headerLines = array_filter( explode( "\n", substr( $buffer->headerBuffer, 0, $buffer->headerEnd ) ) );
104275

105276
foreach ( $headerLines as $line ) {
106277
$colonPos = strpos( $line, ':' );
107278
$key = substr( $line, 0, $colonPos );
108279
$val = substr( $line, $colonPos + 1 );
109-
$request->headers[ $key ] = $val;
280+
$buffer->message->headers[ $key ] = $val;
110281
}
111282

112-
$bodyPart = substr( $headerBuffer, $headerEnd + 2 );
113-
$request->body .= $bodyPart;
114-
$bodyLen += strlen( $bodyPart );
283+
$bodyPart = substr( $buffer->headerBuffer, $buffer->headerEnd + 2 );
284+
$buffer->message->body .= $bodyPart;
285+
$buffer->bodyLen += strlen( $bodyPart );
115286

116-
if ( $request->bodyLen >= $request->headers[ 'body-len' ] ) {
117-
$request->valid = true;
118-
return false;
119-
}
287+
if ( $buffer->bodyLen >= $buffer->message->headers[ 'body-len' ] )
288+
$buffer->hasMessage = true;
120289
}
121290
}
122-
123-
return true;
124291
}
125-
}
126292

127-
class Request {
293+
private function closeConnection( $stream ) {
128294

129-
public $stream;
130-
public $headers = array();
131-
public $body = '';
132-
public $valid = false;
133-
134-
public $headerBuffer = '';
135-
public $headerEnd = false;
136-
public $bodyLen = 0;
295+
// Close the connection until the worker client sends us a new result
296+
stream_socket_shutdown( $stream, STREAM_SHUT_RDWR );
297+
fclose( $stream );
298+
}
137299
}
138300

src/Crusse/JobServer/Message.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<?php
2+
3+
namespace Crusse\JobServer;
4+
5+
class Message {
6+
7+
public $headers = array();
8+
public $body = '';
9+
10+
function __toString() {
11+
12+
// Always set the body-len header after all headers have been added, so that
13+
// we override any body-len header set earlier
14+
$this->headers[ 'body-len' ] = strlen( $this->body );
15+
16+
$ret = '';
17+
18+
foreach ( $this->headers as $key => $val )
19+
$ret .= trim( $key ) .':'. $val ."\n";
20+
21+
return rtrim( $ret ) ."\n\n". $this->body;
22+
}
23+
}
24+

0 commit comments

Comments
 (0)