Skip to content

Commit 28072af

Browse files
Add job callbacks to Server
1 parent 2363b3a commit 28072af

File tree

3 files changed

+179
-167
lines changed

3 files changed

+179
-167
lines changed

README.md

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,160 @@
1-
# php-job-server
1+
# php-job-server
2+
3+
Brainstorming:
4+
5+
Blocking i/o (perfectly synchronized):
6+
7+
worker1: 2ms to finish
8+
worker2: 2ms to finish
9+
worker3: 2ms to finish
10+
11+
server reads result: 0.5ms (r)
12+
server writes new job: 0.5ms (w)
13+
14+
time: 0 1 2 3 4 5 6 7
15+
worker1: rrrwww rrrwww
16+
worker2: rrrwww rrrwww
17+
worker3: rrrwww rrrwww
18+
19+
-----------------------------------------------------------
20+
21+
Blocking i/o (not synchronized):
22+
23+
worker1: 3ms to finish
24+
worker2: 2ms to finish
25+
worker3: 1ms to finish
26+
27+
server reads result: 0.5ms (r)
28+
server writes new job: 0.5ms (w)
29+
cpu time available for extra non-i/o work: (+)
30+
waiting for i/o: (.)
31+
32+
time: 0 1 2 3 4 5 6 7
33+
worker1: rrrwww rrrwww
34+
worker2: ......rrrwww ......rrrwww
35+
worker3: ............rrrwww ............rrrwww
36+
server: ......
37+
38+
-----------------------------------------------------------
39+
40+
Non-blocking i/o (perfectly synchronized):
41+
42+
worker1: 2ms to finish
43+
worker2: 2ms to finish
44+
worker3: 2ms to finish
45+
46+
server reads result: 0.5ms (r)
47+
server writes new job: 0.5ms (w)
48+
cpu time available for extra non-i/o work: (+)
49+
waiting for i/o: (.)
50+
51+
time: 0 1 2 3 4 5 6 7
52+
worker1: r++r++r++w++w++w r++r++r++w++w++w
53+
worker2: r++r++r++w++w++w r++r++r++w++w++w
54+
worker3: r++r++r++w++w++w r++r++r++w++w++w
55+
server: ++++++++++
56+
57+
-----------------------------------------------------------
58+
59+
Non-blocking i/o (not synchronized):
60+
61+
worker1: 3ms to finish
62+
worker2: 2ms to finish
63+
worker3: 1ms to finish
64+
65+
server reads result: 0.5ms (r)
66+
server writes new job: 0.5ms (w)
67+
cpu time available for extra non-i/o work: (+)
68+
waiting for i/o: (.)
69+
70+
time: 0 1 2 3 4 5 6 7
71+
worker1: r++r++r++w++w++w r+r+rwww
72+
worker2: r++r++r++w++w++w r+rrw+w+w
73+
worker3: r++r++r++w++w++w rrrww+w
74+
server: ++++++
75+
76+
-----------------------------------------------------------
77+
78+
79+
Blocking i/o:
80+
-------------
81+
82+
Server:
83+
84+
results = []
85+
stream_socket_server
86+
while ( unfinished jobs )
87+
stream_socket_accept
88+
while ( got data )
89+
stream_socket_recvfrom
90+
results[] = data
91+
stream_socket_sendto
92+
fclose
93+
94+
Worker:
95+
96+
stream_socket_client
97+
stream_socket_sendto [register worker with server]
98+
stream_socket_recvfrom [get first job from server]
99+
while ( got job )
100+
stream_socket_sendto [send result]
101+
stream_socket_recvfrom [get another job]
102+
103+
104+
Non-blocking i/o:
105+
-----------------
106+
107+
Server:
108+
109+
clients = []
110+
writeBufferPerClient = []
111+
readBufferPerClient = []
112+
stream_socket_server
113+
stream_set_blocking( server socket not blocking )
114+
while ( unfinished jobs )
115+
stream_select( readable: clients + server, writable: clients )
116+
if ( server is readable )
117+
clients[] = stream_socket_accept
118+
stream_set_blocking( client socket not blocking )
119+
foreach ( clients[] )
120+
if ( is readable )
121+
while ( got data, which means client socket is open )
122+
stream_socket_recvfrom
123+
readBufferPerClient[] .= data
124+
if ( no received data )
125+
results[] = Request(readBuffer)->getBody/Headers()
126+
if ( is writable )
127+
while ( didn't write all bytes of writeBufferPerClient[current] yet )
128+
stream_socket_sendto
129+
if ( writeBufferPerClient[current] is empty )
130+
fclose
131+
unset( clients[current] )
132+
133+
Worker:
134+
135+
initialized = false
136+
jobQueue = []
137+
resultsQueue = []
138+
readBuffer = ''
139+
writeBuffer = ''
140+
stream_socket_client
141+
stream_set_blocking( client socket not blocking )
142+
while ( true )
143+
stream_select( readable: server, writable: server )
144+
if ( server is writable )
145+
if ( !initialized )
146+
writeBuffer = I'm a new worker, register me
147+
else
148+
writeBuffer = here's a job result resultsQueue.pop(), gimme another job
149+
while ( didn't write all bytes of writeBuffer yet )
150+
stream_socket_sendto
151+
if ( writeBuffer is empty )
152+
fclose
153+
if ( server is readable )
154+
while ( got data, which means client socket is open )
155+
stream_socket_recvfrom
156+
readBuffer .= data
157+
if ( no received data )
158+
jobQueue[] = Request(readBuffer)->getBody/Headers()
159+
160+

src/Crusse/JobServer/Server.php

Lines changed: 14 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -1,166 +1,5 @@
11
<?php
22

3-
/*
4-
5-
6-
Blocking i/o (perfectly synchronized):
7-
8-
worker1: 2ms to finish
9-
worker2: 2ms to finish
10-
worker3: 2ms to finish
11-
12-
server reads result: 0.5ms (r)
13-
server writes new job: 0.5ms (w)
14-
15-
time: 0 1 2 3 4 5 6 7
16-
worker1: rrrwww rrrwww
17-
worker2: rrrwww rrrwww
18-
worker3: rrrwww rrrwww
19-
20-
-----------------------------------------------------------
21-
22-
Blocking i/o (not synchronized):
23-
24-
worker1: 3ms to finish
25-
worker2: 2ms to finish
26-
worker3: 1ms to finish
27-
28-
server reads result: 0.5ms (r)
29-
server writes new job: 0.5ms (w)
30-
cpu time available for extra non-i/o work: (+)
31-
waiting for i/o: (.)
32-
33-
time: 0 1 2 3 4 5 6 7
34-
worker1: rrrwww rrrwww
35-
worker2: ......rrrwww ......rrrwww
36-
worker3: ............rrrwww ............rrrwww
37-
server: ......
38-
39-
-----------------------------------------------------------
40-
41-
Non-blocking i/o (perfectly synchronized):
42-
43-
worker1: 2ms to finish
44-
worker2: 2ms to finish
45-
worker3: 2ms to finish
46-
47-
server reads result: 0.5ms (r)
48-
server writes new job: 0.5ms (w)
49-
cpu time available for extra non-i/o work: (+)
50-
waiting for i/o: (.)
51-
52-
time: 0 1 2 3 4 5 6 7
53-
worker1: r++r++r++w++w++w r++r++r++w++w++w
54-
worker2: r++r++r++w++w++w r++r++r++w++w++w
55-
worker3: r++r++r++w++w++w r++r++r++w++w++w
56-
server: ++++++++++
57-
58-
-----------------------------------------------------------
59-
60-
Non-blocking i/o (not synchronized):
61-
62-
worker1: 3ms to finish
63-
worker2: 2ms to finish
64-
worker3: 1ms to finish
65-
66-
server reads result: 0.5ms (r)
67-
server writes new job: 0.5ms (w)
68-
cpu time available for extra non-i/o work: (+)
69-
waiting for i/o: (.)
70-
71-
time: 0 1 2 3 4 5 6 7
72-
worker1: r++r++r++w++w++w r+r+rwww
73-
worker2: r++r++r++w++w++w r+rrw+w+w
74-
worker3: r++r++r++w++w++w rrrww+w
75-
server: ++++++
76-
77-
-----------------------------------------------------------
78-
79-
80-
Blocking i/o:
81-
-------------
82-
83-
Server:
84-
85-
results = []
86-
stream_socket_server
87-
while ( unfinished jobs )
88-
stream_socket_accept
89-
while ( got data )
90-
stream_socket_recvfrom
91-
results[] = data
92-
stream_socket_sendto
93-
fclose
94-
95-
Worker:
96-
97-
stream_socket_client
98-
stream_socket_sendto [register worker with server]
99-
stream_socket_recvfrom [get first job from server]
100-
while ( got job )
101-
stream_socket_sendto [send result]
102-
stream_socket_recvfrom [get another job]
103-
104-
105-
Non-blocking i/o:
106-
-----------------
107-
108-
Server:
109-
110-
clients = []
111-
writeBufferPerClient = []
112-
readBufferPerClient = []
113-
stream_socket_server
114-
stream_set_blocking( server socket not blocking )
115-
while ( unfinished jobs )
116-
stream_select( readable: clients + server, writable: clients )
117-
if ( server is readable )
118-
clients[] = stream_socket_accept
119-
stream_set_blocking( client socket not blocking )
120-
foreach ( clients[] )
121-
if ( is readable )
122-
while ( got data, which means client socket is open )
123-
stream_socket_recvfrom
124-
readBufferPerClient[] .= data
125-
if ( no received data )
126-
results[] = Request(readBuffer)->getBody/Headers()
127-
if ( is writable )
128-
while ( didn't write all bytes of writeBufferPerClient[current] yet )
129-
stream_socket_sendto
130-
if ( writeBufferPerClient[current] is empty )
131-
fclose
132-
unset( clients[current] )
133-
134-
Worker:
135-
136-
initialized = false
137-
jobQueue = []
138-
resultsQueue = []
139-
readBuffer = ''
140-
writeBuffer = ''
141-
stream_socket_client
142-
stream_set_blocking( client socket not blocking )
143-
while ( true )
144-
stream_select( readable: server, writable: server )
145-
if ( server is writable )
146-
if ( !initialized )
147-
writeBuffer = I'm a new worker, register me
148-
else
149-
writeBuffer = here's a job result resultsQueue.pop(), gimme another job
150-
while ( didn't write all bytes of writeBuffer yet )
151-
stream_socket_sendto
152-
if ( writeBuffer is empty )
153-
fclose
154-
if ( server is readable )
155-
while ( got data, which means client socket is open )
156-
stream_socket_recvfrom
157-
readBuffer .= data
158-
if ( no received data )
159-
jobQueue[] = Request(readBuffer)->getBody/Headers()
160-
161-
162-
*/
163-
1643
namespace Crusse\JobServer;
1654

1665
use Symfony\Component\Process\Process;
@@ -175,6 +14,7 @@ class Server {
17514
private $results = array();
17615
private $workerTimeout = 60;
17716
private $sentJobCount = 0;
17+
private $jobCallback;
17818

17919
function __construct( $workerCount ) {
18020

@@ -212,7 +52,12 @@ function addWorkerInclude( $phpFilePath ) {
21252
$this->workerIncludes[] = $phpFilePath;
21353
}
21454

215-
function getResults() {
55+
function getResults( $jobCallback = null ) {
56+
57+
if ( $jobCallback && !is_callable( $jobCallback ) )
58+
throw new \InvalidArgumentException( '$jobCallback is not callable' );
59+
60+
$this->jobCallback = $jobCallback;
21661

21762
$loop = new EventLoop( $this->serverSocketAddr );
21863
$loop->listen( $this->workerTimeout );
@@ -231,7 +76,7 @@ function getResults() {
23176
}
23277

23378
// FIXME: workerTimeout isn't working yet. A sleep()ing worker does _not_
234-
// cause a timeout.
79+
// cause a timeout. Check why.
23580
function setWorkerTimeout( $timeout ) {
23681
$this->workerTimeout = (int) $timeout;
23782
}
@@ -263,8 +108,14 @@ function _messageCallback( Message $message, EventLoop $loop, $socket ) {
263108
throw new \Exception( 'Missing header "cmd"' );
264109

265110
if ( $headers[ 'cmd' ] === 'job-result' ) {
111+
266112
$jobNumber = $headers[ 'job-num' ];
267113
$this->results[ $jobNumber ] = $message->body;
114+
115+
if ( $this->jobCallback ) {
116+
call_user_func( $this->jobCallback, $message->body, count( $this->results ),
117+
count( $this->jobQueue ) );
118+
}
268119
}
269120

270121
// We have all the results; stop the event loop

tests/test.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,13 @@ function generateString($length) {
2323
$server->addWorkerInclude( __DIR__ .'/functions.php' );
2424
$server->setWorkerTimeout( 2 );
2525

26-
for ( $i = 0; $i < 1000; $i++ )
27-
$server->addJob( 'job_test', 'Job '. $i .': '. generateString( 2000 ) );
26+
for ( $i = 0; $i < 50; $i++ )
27+
$server->addJob( 'job_test', 'Job '. $i .': '. generateString( 50 ) );
2828

2929
$time = microtime( true );
30-
$res = $server->getResults();
30+
$res = $server->getResults( function( $result, $jobNumber, $total ) {
31+
echo 'Job '. $jobNumber .'/'. $total .': '. $result . PHP_EOL;
32+
} );
3133
$elapsed = ( microtime( true ) - $time ) * 1000;
3234
$elapsedTotal = ( microtime( true ) - $timeTotal ) * 1000;
3335

0 commit comments

Comments
 (0)