Skip to content

Commit 9737f79

Browse files
Fix PHP warning "Socket disconnected unexpectedly in EventLoop.php:296 in Worker.php on line 32". Also made some function names a bit clearer.
1 parent 9b4321f commit 9737f79

File tree

6 files changed

+81
-35
lines changed

6 files changed

+81
-35
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
.DS_Store
2+
composer.lock
23
vendor/
4+
php_errors.log

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "crusse/job-server",
33
"description": "A library for computing the results of multiple PHP scripts in parallel",
4-
"version": "1.0.2",
4+
"version": "1.0.3",
55
"require": {
66
"php": ">=5.3.3",
77
"symfony/process": "^2.6"

src/Crusse/JobServer/EventLoop.php

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ function __destruct() {
3939
function connect() {
4040

4141
if ( ( $socket = socket_create( AF_UNIX, SOCK_STREAM, 0 ) ) === false )
42-
throw new \Exception( socket_strerror( socket_last_error() ) );
42+
throw new \Exception( 'Could not create socket: '. socket_strerror( socket_last_error() ) );
4343

4444
$this->addClientSocket( $socket );
4545

4646
if ( socket_connect( $socket, $this->serverSocketAddr ) === false )
47-
throw new \Exception( socket_strerror( socket_last_error() ) );
47+
throw new \Exception( 'Could not connect to socket. Peer is probably not listen()ing. '. socket_strerror( socket_last_error() ) );
4848

4949
return $socket;
5050
}
@@ -106,7 +106,7 @@ function listen( $acceptTimeout = 60 ) {
106106
}
107107

108108
function subscribe( $callable ) {
109-
109+
110110
if ( !is_callable( $callable ) )
111111
throw new \InvalidArgumentException( '$callable is not callable' );
112112

@@ -119,9 +119,9 @@ function send( $socket, Message $message ) {
119119
$this->log( 'Error: called send() after stop()' );
120120
throw new \LogicException( 'Calling send() after stop() is redundant' );
121121
}
122-
122+
123123
$socketIndex = array_search( $socket, $this->sockets, true );
124-
124+
125125
if ( $socketIndex === false ) {
126126
$this->log( 'Error: $socket was not found in list of clients' );
127127
throw new \InvalidArgumentException( 'No valid socket given' );
@@ -130,24 +130,27 @@ function send( $socket, Message $message ) {
130130
$this->writeBuffer[ $socketIndex ] .= (string) $message;
131131
}
132132

133-
function run() {
133+
function receive() {
134134

135135
$this->log( 'Using select() timeout of '. $this->acceptTimeout .' s' );
136136

137137
while ( true ) {
138138

139139
// We have no more sockets to poll, all have disconnected
140140
if ( !$this->sockets && !$this->serverSocket ) {
141-
$this->log( 'No more sockets to poll, exiting run() loop' );
141+
$this->log( 'No more sockets to poll, exiting receive() loop' );
142142
break;
143143
}
144144

145145
$readables = $this->sockets;
146146
if ( $this->serverSocket )
147147
$readables[] = $this->serverSocket;
148148

149-
$writableSocketKeys = array_keys( array_filter( $this->writeBuffer ) );
150-
149+
// We're only interested in the sockets for which we have any buffered
150+
// data to send.
151+
// Note: array_filter() preserves keys.
152+
$writableSocketKeys = array_keys( array_filter( $this->writeBuffer, 'strlen' ) );
153+
151154
if ( $writableSocketKeys ) {
152155
$writables = array();
153156
foreach ( $writableSocketKeys as $key )
@@ -176,21 +179,23 @@ function run() {
176179
$this->handleWritableSockets( $writables );
177180

178181
if ( $this->stop ) {
179-
$this->log( 'stop() was called, exiting run() loop' );
182+
$this->log( 'stop() was called, exiting receive() loop' );
180183
break;
181184
}
182185
}
183186

184187
foreach ( $this->sockets as $socket )
185188
$this->disconnect( $socket );
189+
186190
$this->sockets = array();
187191

188-
if ( $this->serverSocket )
192+
if ( $this->serverSocket ) {
189193
$this->disconnect( $this->serverSocket );
194+
$this->serverSocket = null;
195+
}
190196
}
191197

192198
function stop() {
193-
194199
$this->stop = true;
195200
}
196201

@@ -218,7 +223,7 @@ private function handleReadableSockets( $sockets ) {
218223
}
219224

220225
if ( $this->stop ) {
221-
$this->log( 'stop() was called, so will not read from other sockets' );
226+
$this->log( 'stop() was called, so will skip reading from remaining readable sockets' );
222227
break;
223228
}
224229
}
@@ -232,6 +237,7 @@ private function handleWritableSockets( $sockets ) {
232237
$buffer = $this->writeBuffer[ $socketIndex ];
233238
$bufferLen = strlen( $buffer );
234239

240+
// Nothing to write
235241
if ( !$bufferLen )
236242
continue;
237243

@@ -242,7 +248,7 @@ private function handleWritableSockets( $sockets ) {
242248
throw new \Exception( 'Could not write to socket' );
243249
}
244250

245-
$this->log( 'Sent '. $sentBytes .' b to '. $socketIndex );
251+
$this->log( 'Sent '. $sentBytes .' bytes to '. $socketIndex );
246252
$this->writeBuffer[ $socketIndex ] = substr( $buffer, $sentBytes );
247253
}
248254
}
@@ -274,9 +280,13 @@ private function getMessagesFromSocket( $socket ) {
274280
$socketIndex = array_search( $socket, $this->sockets, true );
275281
$buffer = $this->readBuffer[ $socketIndex ];
276282

283+
// --------------------------------------------------------------------
277284
// Populate the MessageBuffer from the socket
285+
// --------------------------------------------------------------------
278286

279287
$data = '';
288+
289+
// "socket_recv() returns the number of bytes received, or FALSE if there was an error"
280290
$dataLen = socket_recv( $socket, $data, 64 * 1024, MSG_DONTWAIT );
281291

282292
// There was an error
@@ -285,15 +295,21 @@ private function getMessagesFromSocket( $socket ) {
285295
throw new \Exception( socket_strerror( socket_last_error() ) );
286296
}
287297

288-
// Connection was dropped by peer. We expect peers to be dropped only after
289-
// $this->stop() has been called, so this is unexpected.
298+
// Connection was dropped by peer. Calling code can decide if this is
299+
// an error or not by catching the exception (for the Server this is
300+
// probably an error, as Workers shouldn't disconnect before the Server,
301+
// but for Workers the Server disconnecting is a signal that there's no
302+
// more jobs; maybe do this more cleanly later, by sending a "close" message
303+
// to the Workers from the Server like HTTP does...).
290304
if ( $dataLen === 0 )
291-
throw new \Exception( 'Socket disconnected unexpectedly' );
305+
throw new SocketDisconnectedException( 'Socket disconnected' );
292306

293-
$this->log( 'Recvd '. $dataLen .' b from '. $socketIndex );
307+
$this->log( 'Recvd '. $dataLen .' bytes from '. $socketIndex );
294308
$this->populateMessageBuffer( $data, $buffer );
295309

310+
// --------------------------------------------------------------------
296311
// Get finished Message objects from the MessageBuffer
312+
// --------------------------------------------------------------------
297313

298314
$messages = array();
299315

@@ -302,7 +318,7 @@ private function getMessagesFromSocket( $socket ) {
302318
$messages[] = $buffer->message;
303319
// Check if we received multiple messages' data from the socket
304320
$overflowBytes = $buffer->bodyLen - $buffer->message->headers[ 'body-len' ];
305-
321+
306322
// We got more bytes than the message consists of, so we got (possibly
307323
// partially) other messages' data
308324
if ( $overflowBytes > 0 ) {
@@ -333,11 +349,11 @@ private function populateMessageBuffer( $data, MessageBuffer &$buffer ) {
333349

334350
// We already have the header. Add further data to body.
335351
if ( $buffer->headerEnd !== false ) {
336-
352+
337353
$dataLen = strlen( $data );
338354
$buffer->bodyLen += $dataLen;
339355
$buffer->message->body .= $data;
340-
356+
341357
if ( $buffer->bodyLen >= $buffer->message->headers[ 'body-len' ] )
342358
$buffer->hasMessage = true;
343359
}
@@ -370,11 +386,20 @@ private function populateMessageBuffer( $data, MessageBuffer &$buffer ) {
370386

371387
private function disconnect( $socket ) {
372388

389+
$clientName = array_search( $socket, $this->sockets, true );
390+
391+
if ( $clientName === false ) {
392+
if ( $socket === $this->serverSocket )
393+
$clientName = 'SERVER';
394+
else
395+
$clientName = '[unknown]';
396+
}
397+
373398
// Close the connection until the worker client sends us a new result. We
374399
// silence any errors so that we don't have to test the connection status
375400
// before we try to close the socket.
376401
if ( @socket_shutdown( $socket, 2 ) )
377-
$this->log( 'Closed connection' );
402+
$this->log( 'Closed connection to socket '. $clientName );
378403
}
379404

380405
private function log( $msg, $socketIndex = 0 ) {
@@ -387,8 +412,7 @@ private function log( $msg, $socketIndex = 0 ) {
387412
$id = uniqid();
388413

389414
$prefix = ( $this->serverSocket ) ? '[SERVER] ' : '[worker] ';
390-
file_put_contents( '/tmp/crusse-job-server.log',
391-
$id .' '. $prefix . $msg . PHP_EOL, FILE_APPEND );
415+
file_put_contents( '/tmp/crusse-job-server.log', number_format( microtime( true ), 4, '.', '' ) .' '. $id .' '. $prefix . $msg . PHP_EOL, FILE_APPEND );
392416
}
393417
}
394418

src/Crusse/JobServer/Server.php

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ function __construct( $workerCount ) {
2020

2121
if ( $workerCount < 1 )
2222
throw new \InvalidArgumentException( '$workerCount must be >= 1' );
23-
23+
2424
$this->workerCount = $workerCount;
2525

2626
$tmpDir = sys_get_temp_dir();
2727
if ( !$tmpDir )
2828
throw new \Exception( 'Could not find the system temporary files directory' );
29+
2930
$this->serverSocketAddr = $tmpDir .'/php_job_server_'. md5( uniqid( true ) ) .'.sock';
3031
}
3132

@@ -62,12 +63,12 @@ function getOrderedResults() {
6263

6364
$loop = new EventLoop( $this->serverSocketAddr );
6465
$loop->listen( $this->workerTimeout );
65-
$loop->subscribe( array( $this, '_messageCallback' ) );
66+
$loop->subscribe( array( $this, '_handleMessageFromWorker' ) );
6667

6768
if ( !$this->workerProcs )
6869
$this->createWorkerProcs( $this->workerCount );
6970

70-
$loop->run();
71+
$loop->receive();
7172

7273
$results = $this->results;
7374
ksort( $results );
@@ -85,12 +86,12 @@ function getResults( $jobCallback ) {
8586

8687
$loop = new EventLoop( $this->serverSocketAddr );
8788
$loop->listen( $this->workerTimeout );
88-
$loop->subscribe( array( $this, '_messageCallback' ) );
89+
$loop->subscribe( array( $this, '_handleMessageFromWorker' ) );
8990

9091
if ( !$this->workerProcs )
9192
$this->createWorkerProcs( $this->workerCount );
9293

93-
$loop->run();
94+
$loop->receive();
9495
}
9596

9697
function setWorkerTimeout( $timeout ) {
@@ -107,16 +108,18 @@ private function createWorkerProcs( $count ) {
107108
// worker's don't bring down the web server so easily
108109
$process = new Process( 'exec nice -n 5 php '. dirname( __FILE__ ) .
109110
'/worker_process.php \''. $this->serverSocketAddr .'\'' );
111+
110112
// We don't need stdout/stderr as we're communicating via sockets
111113
$process->disableOutput();
112114
$process->start();
115+
113116
$workers[] = $process;
114117
}
115118

116119
$this->workerProcs = $workers;
117120
}
118121

119-
function _messageCallback( Message $message, EventLoop $loop, $socket ) {
122+
function _handleMessageFromWorker( Message $message, EventLoop $loop, $socket ) {
120123

121124
$headers = $message->headers;
122125

@@ -134,16 +137,18 @@ function _messageCallback( Message $message, EventLoop $loop, $socket ) {
134137
$this->results[ $jobNumber ] = ( $this->jobCallback )
135138
? true
136139
: $message->body;
140+
137141
// ...otherwise return the result immediately in the callback
138142
if ( $this->jobCallback ) {
139143
call_user_func( $this->jobCallback, $message->body, count( $this->results ),
140144
count( $this->jobQueue ) );
141145
}
142146
}
143147

144-
// We have all the results; stop the event loop
148+
// We have all the results; stop the server (which causes workers to stop
149+
// as well)
145150
if ( count( $this->results ) >= count( $this->jobQueue ) ) {
146-
151+
147152
$loop->stop();
148153
}
149154
// Send a job to the worker
@@ -159,7 +164,7 @@ function _messageCallback( Message $message, EventLoop $loop, $socket ) {
159164
$message->headers[ 'function' ] = $job[ 0 ];
160165
$message->body = $job[ 1 ];
161166

162-
// Job was sent to worker, free memory
167+
// Job was sent to worker. Free memory.
163168
$this->jobQueue[ $this->sentJobCount ] = '';
164169
$this->sentJobCount++;
165170

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Crusse\JobServer;
4+
5+
class SocketDisconnectedException extends \Exception {
6+
7+
}
8+

src/Crusse/JobServer/Worker.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require_once dirname( __FILE__ ) .'/EventLoop.php';
88
require_once dirname( __FILE__ ) .'/Message.php';
99
require_once dirname( __FILE__ ) .'/MessageBuffer.php';
10+
require_once dirname( __FILE__ ) .'/SocketDisconnectedException.php';
1011

1112
class Worker {
1213

@@ -26,7 +27,13 @@ function run() {
2627
$loop->subscribe( array( $this, '_messageCallback' ) );
2728
$socket = $loop->connect();
2829
$this->sendMessage( $loop, $socket, 'new-worker' );
29-
$loop->run();
30+
$loop->receive();
31+
}
32+
catch ( SocketDisconnectedException $e ) {
33+
// This is expected. Currently, if the Server disconnects the socket that
34+
// the Worker is communicating on, it's a signal to the Worker that there
35+
// are not more jobs to handle. Maybe do this more cleanly later, by
36+
// sending a "close" message to the Workers from the Server like HTTP does...
3037
}
3138
catch ( \Exception $e ) {
3239
trigger_error( $e->getMessage() .' in '. $e->getFile() .':'. $e->getLine(), E_USER_WARNING );

0 commit comments

Comments
 (0)