Skip to content

Commit 4125dcd

Browse files
committed
Revert "uasyncio: WIP lots of fixes and changes needs splitting and documenting"
This reverts commit 08ccfa8.
1 parent 25fb042 commit 4125dcd

File tree

2 files changed

+48
-81
lines changed

2 files changed

+48
-81
lines changed

uasyncio.core/uasyncio/core.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def call_at(self, time, callback, *args, exc=None):
3636
# Including self.cnt is a workaround per heapq docs
3737
if __debug__:
3838
log.debug("Scheduling %s", (time, self.cnt, callback, args, exc))
39-
heapq.heappush(self.q, (time, self.cnt, callback, args, exc, False))
39+
heapq.heappush(self.q, (time, self.cnt, callback, args, exc))
4040
# print(self.q)
4141
self.cnt += 1
4242

@@ -50,30 +50,24 @@ def wait(self, delay):
5050
def cancel(self, callback, exc = CancelledError):
5151
_id = id(callback)
5252
for idx, item in enumerate(self.q):
53-
t, cnt, cb, args, _exc = item
53+
t, cnt, cb, args, exc = item
5454
if id(cb) != _id:
5555
continue
56-
if __debug__:
57-
log.debug("Setting discard flag on: %s at index %d", (t, cnt, cb, args, _exc), idx)
58-
self.q[idx] = t, cnt, cb, args, _exc, True
56+
del self.q[idx]
57+
heapq.heapify(self.q)
5958
self.call_at(0, cb, *args, exc=exc)
59+
return
6060
self.remove_polled_cb(callback)
6161

6262
def run_forever(self):
6363
while True:
6464
if self.q:
65-
tnow = self.time()
66-
if __debug__:
67-
log.debug('*'*20+' sched step start at %s, num tasks in queue %d', tnow, len(self.q))
68-
t, cnt, cb, args, exc, discard = heapq.heappop(self.q)
69-
delay = t - tnow
65+
t, cnt, cb, args, exc = heapq.heappop(self.q)
7066
if __debug__:
71-
log.debug("Next coroutine to run in %s: %s", delay, (t, cnt, cb, args, exc))
72-
if discard:
73-
if __debug__:
74-
log.debug("Discarding: %s", (t, cnt, cb, args, exc, discard))
75-
continue
67+
log.debug("Next coroutine to run: %s", (t, cnt, cb, args, exc))
7668
# __main__.mem_info()
69+
tnow = self.time()
70+
delay = t - tnow
7771
if delay > 0 and not exc:
7872
self.call_at(t, cb, *args)
7973
self.wait(delay)
@@ -83,9 +77,8 @@ def run_forever(self):
8377
# Assuming IO completion scheduled some tasks
8478
continue
8579
# cancelled callbacks aren't called and nor rescheduled
86-
if callable(cb):
87-
if not exc:
88-
cb(*args)
80+
if callable(cb) and not exc:
81+
cb(*args)
8982
else:
9083
delay = 0
9184
try:
@@ -118,9 +111,9 @@ def run_forever(self):
118111
self.add_writer(arg.fileno(), cb)
119112
continue
120113
elif isinstance(ret, IOReadDone):
121-
self.remove_reader(arg.fileno(), cb)
114+
self.remove_reader(arg.fileno())
122115
elif isinstance(ret, IOWriteDone):
123-
self.remove_writer(arg.fileno(), cb)
116+
self.remove_writer(arg.fileno())
124117
elif isinstance(ret, StopLoop):
125118
return arg
126119
elif isinstance(ret, type_gen):

uasyncio/uasyncio/__init__.py

Lines changed: 35 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -11,95 +11,69 @@ def __init__(self):
1111
self.poller = select.poll()
1212
self.objmap = {}
1313

14-
def _unregister_fd(self, fd):
15-
self.objmap.pop(fd, None)
16-
try:
17-
self.poller.unregister(fd)
18-
except OSError as e:
19-
if e.args[0] != errno.ENOENT:
20-
raise
21-
22-
def remove_polled_cb(self, cb):
23-
_id = id(cb)
24-
for fd, cbs in self.objmap.items():
25-
cbs.pop(id(cb), None)
26-
if not cbs:
27-
self._unregister_fd(fd)
14+
def remove_polled_cb(self, _id):
15+
for fd, cb in self.objmap.items():
16+
if id(cb) == _id:
17+
self.poller.unregister(fd)
18+
break
2819

2920
def add_reader(self, fd, cb, *args):
3021
if __debug__:
3122
log.debug("add_reader%s", (fd, cb, args))
32-
cbs = self.objmap.setdefault(fd, {})
33-
self.poller.register(fd, select.POLLIN)
3423
if args:
35-
cbs[id(cb)] = (cb, args)
24+
self.poller.register(fd, select.POLLIN)
25+
self.objmap[fd] = (cb, args)
3626
else:
37-
cbs[id(cb)] = (cb, None)
27+
self.poller.register(fd, select.POLLIN)
28+
self.objmap[fd] = cb
3829

39-
def remove_reader(self, fd, cb):
30+
def remove_reader(self, fd):
4031
if __debug__:
41-
log.debug("remove_reader(%s)", (fd, cb))
42-
cbs = self.objmap.get(fd, {})
43-
cbs.pop(id(cb), None)
44-
if not cbs:
45-
self._unregister_fd(fd)
32+
log.debug("remove_reader(%s)", fd)
33+
self.poller.unregister(fd)
34+
del self.objmap[fd]
4635

4736
def add_writer(self, fd, cb, *args):
4837
if __debug__:
4938
log.debug("add_writer%s", (fd, cb, args))
50-
cbs = self.objmap.setdefault(fd, {})
51-
self.poller.register(fd, select.POLLOUT)
5239
if args:
53-
cbs[id(cb)] = (cb, args)
40+
self.poller.register(fd, select.POLLOUT)
41+
self.objmap[fd] = (cb, args)
5442
else:
55-
cbs[id(cb)] = (cb, None)
43+
self.poller.register(fd, select.POLLOUT)
44+
self.objmap[fd] = cb
5645

57-
def remove_writer(self, fd, cb):
46+
def remove_writer(self, fd):
5847
if __debug__:
5948
log.debug("remove_writer(%s)", fd)
60-
cbs = self.objmap.get(fd, {})
61-
cbs.pop(id(cb), None)
62-
if not cbs:
63-
self._unregister_fd(fd)
49+
try:
50+
self.poller.unregister(fd)
51+
self.objmap.pop(fd, None)
52+
except OSError as e:
53+
# StreamWriter.awrite() first tries to write to an fd,
54+
# and if that succeeds, yield IOWrite may never be called
55+
# for that fd, and it will never be added to poller. So,
56+
# ignore such error.
57+
if e.args[0] != errno.ENOENT:
58+
raise
6459

6560
def wait(self, delay):
6661
if __debug__:
67-
log.debug("epoll.wait(%s)", delay)
68-
for fd, cbs in self.objmap.items():
69-
for cb, args in cbs.values():
70-
log.debug("epoll.registered(%d) %s", fd, (cb, args))
71-
62+
log.debug("epoll.wait(%d)", delay)
7263
# We need one-shot behavior (second arg of 1 to .poll())
7364
if delay == -1:
7465
res = self.poller.poll(-1, 1)
7566
else:
7667
res = self.poller.poll(int(delay * 1000), 1)
7768
#log.debug("epoll result: %s", res)
7869
for fd, ev in res:
79-
# Remove the registered callbacks dictionary from its parent
80-
# so when callbacks are invoked they can add their registrations
81-
# to a fresh dictionary.
82-
cbs = self.objmap.pop(fd, {})
83-
if not cbs:
84-
log.error("Event %d on fd %r but no callback registered", ev, fd)
85-
continue
70+
cb = self.objmap[fd]
8671
if __debug__:
87-
s = '\n'.join(str(v) for v in cbs.values())
88-
log.debug("Matching IO callbacks for %r:\n%s", (fd, ev), s)
89-
while cbs:
90-
_id, data = cbs.popitem()
91-
cb, args = data
92-
if args is None:
93-
if __debug__:
94-
log.debug("Scheduling IO coro: %r", (fd, ev, cb))
95-
self.call_soon(cb)
96-
else:
97-
if __debug__:
98-
log.debug("Calling IO callback: %r", (fd, ev, cb, args))
99-
cb(*args)
100-
# If no callback registered an event for this fd unregister it
101-
if not self.objmap.get(fd, None):
102-
self._unregister_fd(fd)
72+
log.debug("Calling IO callback: %r", cb)
73+
if isinstance(cb, tuple):
74+
cb[0](*cb[1])
75+
else:
76+
self.call_soon(cb)
10377

10478

10579
class StreamReader:

0 commit comments

Comments
 (0)