Skip to content

Commit b6a91d5

Browse files
committed
Send errors and exceptions from worker processes to the server process, so that it is easier to notice worker errors
1 parent 82a3477 commit b6a91d5

File tree

3 files changed

+55
-16
lines changed

3 files changed

+55
-16
lines changed

src/Crusse/JobServer/Server.php

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,15 @@ function _messageCallback( Message $message, EventLoop $loop, $socket ) {
128128
if ( $headers[ 'cmd' ] === 'job-result' ) {
129129

130130
$jobNumber = $headers[ 'job-num' ];
131-
// Only store the results if the client called getOrderedResults()
131+
132+
if ( strtolower( $headers[ 'job-status' ] ) != 'ok' )
133+
throw new \RuntimeException( 'Worker error: ['. $headers[ 'job-status' ] .'] '. $message->body );
134+
135+
// Only store the result if the client called getOrderedResults()
132136
$this->results[ $jobNumber ] = ( $this->jobCallback )
133137
? true
134138
: $message->body;
135-
139+
// ...otherwise return the result immediately in the callback
136140
if ( $this->jobCallback ) {
137141
call_user_func( $this->jobCallback, $message->body, count( $this->results ),
138142
count( $this->jobQueue ) );

src/Crusse/JobServer/Worker.php

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,34 +29,56 @@ function run() {
2929
$loop->run();
3030
}
3131
catch ( \Exception $e ) {
32-
trigger_error( $e->getMessage() );
32+
trigger_error( $e->getMessage() .' in '. $e->getFile() .':'. $e->getLine(), E_USER_WARNING );
3333
}
3434
}
3535

3636
function _messageCallback( Message $message, EventLoop $loop, $socket ) {
3737

3838
$headers = $message->headers;
3939

40-
if ( isset( $headers[ 'includes' ] ) ) {
41-
foreach ( array_filter( explode( ',', $headers[ 'includes' ] ) ) as $path )
42-
require_once $path;
40+
try {
41+
42+
if ( isset( $headers[ 'includes' ] ) ) {
43+
foreach ( array_filter( explode( ',', $headers[ 'includes' ] ) ) as $path )
44+
require_once $path;
45+
}
46+
47+
if ( !isset( $headers[ 'function' ] ) ) {
48+
$result = 'Request has no \'function\' header';
49+
$status = 'invalid_request';
50+
}
51+
else if ( !is_callable( $headers[ 'function' ] ) ) {
52+
$result = '\''. $headers[ 'function' ] .'\' is not callable';
53+
$status = 'invalid_request';
54+
}
55+
else {
56+
$result = call_user_func( $headers[ 'function' ], $message->body );
57+
$status = 'ok';
58+
}
4359
}
60+
catch ( \Exception $e ) {
4461

45-
if ( !isset( $headers[ 'function' ] ) )
46-
throw new \Exception( 'Request has no \'function\' header' );
47-
if ( !is_callable( $headers[ 'function' ] ) )
48-
throw new \Exception( '\''. $headers[ 'function' ] .'\' is not callable' );
62+
$result = $e->getMessage() .' in '. $e->getFile() .':'. $e->getLine();
63+
$status = 'exception';
64+
}
4965

50-
$result = call_user_func( $headers[ 'function' ], $message->body );
51-
$this->sendMessage( $loop, $socket, 'job-result', $headers[ 'job-num' ], $result );
66+
$this->sendMessage( $loop, $socket, 'job-result', $headers[ 'job-num' ], $status, $result );
5267
}
5368

54-
private function sendMessage( EventLoop $loop, $socket, $cmd, $jobNumber = null, $body = '' ) {
69+
private function sendMessage( EventLoop $loop, $socket, $cmd, $jobNumber = null, $status = 'ok', $body = '' ) {
70+
71+
if ( !is_string( $body ) )
72+
throw new \InvalidArgumentException( 'Worker result must be a string' );
5573

5674
$message = new Message();
5775
$message->headers[ 'cmd' ] = $cmd;
58-
if ( $jobNumber !== null )
76+
77+
if ( $jobNumber !== null ) {
78+
$message->headers[ 'job-status' ] = $status;
5979
$message->headers[ 'job-num' ] = $jobNumber;
80+
}
81+
6082
$message->body = $body;
6183

6284
$loop->send( $socket, $message );

tests/test.php

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ function generateString($length) {
2727
$server = new Crusse\JobServer\Server( 4 );
2828
$server->addWorkerInclude( __DIR__ .'/functions.php' );
2929
$server->setWorkerTimeout( 2 );
30-
for ( $i = 0; $i < 100; $i++ )
30+
for ( $i = 0; $i < 50; $i++ )
3131
$server->addJob( 'job_test', 'Job '. $i .': '. generateString( 100 * 250 ) );
3232

3333
echo 'Results with callback:'. PHP_EOL . PHP_EOL;
@@ -41,7 +41,7 @@ function generateString($length) {
4141
$server = new Crusse\JobServer\Server( 4 );
4242
$server->addWorkerInclude( __DIR__ .'/functions.php' );
4343
$server->setWorkerTimeout( 2 );
44-
for ( $i = 0; $i < 100; $i++ )
44+
for ( $i = 0; $i < 50; $i++ )
4545
$server->addJob( 'job_test', 'Job '. $i .': '. generateString( 100 * 250 ) );
4646

4747
$time = microtime( true );
@@ -52,6 +52,19 @@ function generateString($length) {
5252
$elapsedTotal = ( microtime( true ) - $timeTotal ) * 1000;
5353

5454
echo implode( PHP_EOL, array_keys( $res ) ) . PHP_EOL . PHP_EOL;
55+
56+
echo PHP_EOL .'Worker exception test:'. PHP_EOL . PHP_EOL;
57+
58+
$server = new Crusse\JobServer\Server( 4 );
59+
$server->setWorkerTimeout( 2 );
60+
$server->addJob( 'nonexistent_function_123', 'Job '. $i .': '. generateString( 100 * 250 ) );
61+
try {
62+
$res = $server->getOrderedResults();
63+
}
64+
catch ( \RuntimeException $e ) {
65+
echo 'Successfully caught exception: '. $e->getMessage() . PHP_EOL;
66+
}
67+
5568
echo 'Finished in '. $elapsed .' ms'. PHP_EOL;
5669
echo 'Total '. $elapsedTotal .' ms'. PHP_EOL;
5770

0 commit comments

Comments
 (0)