Skip to content

Commit a1f5338

Browse files
Comments
1 parent e4f7d99 commit a1f5338

File tree

2 files changed

+97
-0
lines changed

2 files changed

+97
-0
lines changed

src/Crusse/JobServer/EventLoop.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
<?php
22

3+
//
4+
// TODO
5+
//
6+
// http://docs.libuv.org/en/v1.x/design.html
7+
//
8+
// "The event loop follows the rather usual single threaded asynchronous I/O approach: all (network) I/O is performed on non-blocking sockets. As part of a loop iteration **the loop will block waiting for I/O activity on sockets which have been added to the poller** and callbacks will be fired indicating socket conditions (readable, writable hangup) so handles can read, write or perform the desired I/O operation."
9+
//
10+
// - create a watchSocket() method for adding server and client sockets
11+
// - emit events (run eventCallback with a 'writable' event) when a socket has sent a _full_ Request, and becomes writable
12+
// - pass the callback the Request, and maybe a SocketWriter (or make the callback return a Response|null)
13+
// - implement BlockingEventLoop and NonBlockingEventLoop (with stream_select()) to test performance diff (if any)
14+
//
15+
316
namespace Crusse\JobServer;
417

518
class EventLoop {

src/Crusse/JobServer/Server.php

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,90 @@
7474
worker3: r++r++r++w++w++w rrrww+w
7575
server: ++++++
7676
77+
-----------------------------------------------------------
78+
79+
80+
Blocking i/o:
81+
-------------
82+
83+
Server:
84+
85+
results = []
86+
stream_socket_server
87+
while ( unfinished jobs )
88+
stream_socket_accept
89+
while ( got data )
90+
stream_socket_recvfrom
91+
results[] = data
92+
stream_socket_sendto
93+
fclose
94+
95+
Worker:
96+
97+
stream_socket_client
98+
stream_socket_sendto [register worker with server]
99+
stream_socket_recvfrom [get first job from server]
100+
while ( got job )
101+
stream_socket_sendto [send result]
102+
stream_socket_recvfrom [get another job]
103+
104+
105+
Non-blocking i/o:
106+
-----------------
107+
108+
Server:
109+
110+
clients = []
111+
writeBufferPerClient = []
112+
readBufferPerClient = []
113+
stream_socket_server
114+
stream_set_blocking( server socket not blocking )
115+
while ( unfinished jobs )
116+
stream_select( readable: clients + server, writable: clients )
117+
if ( server is readable )
118+
clients[] = stream_socket_accept
119+
stream_set_blocking( client socket not blocking )
120+
foreach ( clients[] )
121+
if ( is readable )
122+
while ( got data, which means client socket is open )
123+
stream_socket_recvfrom
124+
readBufferPerClient[] .= data
125+
if ( no received data )
126+
results[] = Request(readBuffer)->getBody/Headers()
127+
if ( is writable )
128+
while ( didn't write all bytes of writeBufferPerClient[current] yet )
129+
stream_socket_sendto
130+
if ( writeBufferPerClient[current] is empty )
131+
fclose
132+
unset( clients[current] )
133+
134+
Worker:
135+
136+
initialized = false
137+
jobQueue = []
138+
resultsQueue = []
139+
readBuffer = ''
140+
writeBuffer = ''
141+
stream_socket_client
142+
stream_set_blocking( client socket not blocking )
143+
while ( true )
144+
stream_select( readable: server, writable: server )
145+
if ( server is writable )
146+
if ( !initialized )
147+
writeBuffer = I'm a new worker, register me
148+
else
149+
writeBuffer = here's a job result resultsQueue.pop(), gimme another job
150+
while ( didn't write all bytes of writeBuffer yet )
151+
stream_socket_sendto
152+
if ( writeBuffer is empty )
153+
fclose
154+
if ( server is readable )
155+
while ( got data, which means client socket is open )
156+
stream_socket_recvfrom
157+
readBuffer .= data
158+
if ( no received data )
159+
jobQueue[] = Request(readBuffer)->getBody/Headers()
160+
77161
78162
*/
79163

0 commit comments

Comments
 (0)