Skip to content

Commit 1326b4d

Browse files
committed
Update LibUV example.[ci skip]
1 parent 36de4e1 commit 1326b4d

File tree

1 file changed

+49
-18
lines changed

1 file changed

+49
-18
lines changed

examples/uv_actor.lua

Lines changed: 49 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,64 @@
11
-- Use LibUV event loop in actor thread
2-
--
3-
-- !NOTE! I can not figure out why this does not work
4-
-- with `inproc` transport so I use `tcp` to communicate
5-
-- with actor thread
62

73
local function thread(pipe)
84
local uv = require "lluv"
5+
local ut = require "lluv.utils"
96
local zmq = require "lzmq"
107

11-
local function uv_poll_zmq(sock, cb)
12-
uv.poll_socket(sock:fd()):start(function(handle, err, event)
13-
if err then cb(sock, err) else
14-
while handle:active() and not sock:closed() do
15-
local ok, err = sock:has_event(zmq.POLLIN)
16-
if ok == nil then cb(sock, err) break end
17-
if ok then cb(sock) else break end
18-
end
8+
local uv_poll_zmq = ut.class() do
9+
10+
function uv_poll_zmq:__init(s)
11+
self._s = s
12+
self._h = uv.poll_socket(s:fd())
13+
return self
14+
end
15+
16+
local function on_poll(self, err, cb)
17+
if err then cb(self, err, self._s) else
18+
while self._h:active() and not self._s:closed() do
19+
local ok, err = self._s:has_event(zmq.POLLIN)
20+
if ok == nil then cb(self, err, self._s) break end
21+
if ok then cb(self, nil, self._s) else break end
1922
end
20-
if sock:closed() then handle:close() end
21-
end)
23+
end
24+
if self._s:closed() then self._h:close() end
25+
end
26+
27+
function uv_poll_zmq:start(cb)
28+
self._h:start(function(handle, err) on_poll(self, err, cb) end)
29+
30+
-- For `inproc` socket without this call socket never get in signal state.
31+
local ok, err = self._s:has_event(zmq.POLLIN)
32+
if ok == nil then
33+
-- context already terminated
34+
uv.defer(on_poll, self, err, cb)
35+
elseif ok then
36+
-- socket already has events
37+
uv.defer(on_poll, self, nil, cb)
38+
end
39+
40+
return self
41+
end
42+
43+
function uv_poll_zmq:stop()
44+
self._h:stop()
45+
return self
46+
end
47+
48+
function uv_poll_zmq:close(...)
49+
self._h:close(...)
50+
return self
51+
end
52+
2253
end
2354

24-
uv_poll_zmq(pipe, function(pipe, err)
55+
uv_poll_zmq.new(pipe):start(function(handle, err, pipe)
2556
if err then
2657
print("Poll error:", err)
27-
return pipe:close()
58+
return handle:close()
2859
end
2960

30-
print(pipe:recv())
61+
print("Pipe recv:", pipe:recvx())
3162
end)
3263

3364
uv.timer():start(1000, function()
@@ -40,7 +71,7 @@ end
4071
local zth = require "lzmq.threads"
4172
local ztm = require "lzmq.timer"
4273

43-
local actor = zth.xactor{thread, pipe = 'tcp'}:start()
74+
local actor = zth.xactor(thread):start()
4475

4576
for i = 1, 5 do
4677
actor:send("Hello #" .. i)

0 commit comments

Comments
 (0)