Skip to content

Commit cd28a4e

Browse files
committed
Issue #26050: Add asyncio.StreamReader.readuntil() method.
Patch by Марк Коренберг.
1 parent b0f5401 commit cd28a4e

File tree

3 files changed

+317
-39
lines changed

3 files changed

+317
-39
lines changed

Lib/asyncio/streams.py

Lines changed: 187 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
__all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
44
'open_connection', 'start_server',
55
'IncompleteReadError',
6+
'LimitOverrunError',
67
]
78

89
import socket
@@ -27,15 +28,28 @@ class IncompleteReadError(EOFError):
2728
Incomplete read error. Attributes:
2829
2930
- partial: read bytes string before the end of stream was reached
30-
- expected: total number of expected bytes
31+
- expected: total number of expected bytes (or None if unknown)
3132
"""
3233
def __init__(self, partial, expected):
33-
EOFError.__init__(self, "%s bytes read on a total of %s expected bytes"
34-
% (len(partial), expected))
34+
super().__init__("%d bytes read on a total of %r expected bytes"
35+
% (len(partial), expected))
3536
self.partial = partial
3637
self.expected = expected
3738

3839

40+
class LimitOverrunError(Exception):
41+
"""Reached buffer limit while looking for the separator.
42+
43+
Attributes:
44+
- message: error message
45+
- consumed: total number of bytes that should be consumed
46+
"""
47+
def __init__(self, message, consumed):
48+
super().__init__(message)
49+
self.message = message
50+
self.consumed = consumed
51+
52+
3953
@coroutine
4054
def open_connection(host=None, port=None, *,
4155
loop=None, limit=_DEFAULT_LIMIT, **kwds):
@@ -318,6 +332,10 @@ class StreamReader:
318332
def __init__(self, limit=_DEFAULT_LIMIT, loop=None):
319333
# The line length limit is a security feature;
320334
# it also doubles as half the buffer limit.
335+
336+
if limit <= 0:
337+
raise ValueError('Limit cannot be <= 0')
338+
321339
self._limit = limit
322340
if loop is None:
323341
self._loop = events.get_event_loop()
@@ -361,7 +379,7 @@ def set_exception(self, exc):
361379
waiter.set_exception(exc)
362380

363381
def _wakeup_waiter(self):
364-
"""Wakeup read() or readline() function waiting for data or EOF."""
382+
"""Wakeup read*() functions waiting for data or EOF."""
365383
waiter = self._waiter
366384
if waiter is not None:
367385
self._waiter = None
@@ -409,7 +427,10 @@ def feed_data(self, data):
409427

410428
@coroutine
411429
def _wait_for_data(self, func_name):
412-
"""Wait until feed_data() or feed_eof() is called."""
430+
"""Wait until feed_data() or feed_eof() is called.
431+
432+
If stream was paused, automatically resume it.
433+
"""
413434
# StreamReader uses a future to link the protocol feed_data() method
414435
# to a read coroutine. Running two read coroutines at the same time
415436
# would have an unexpected behaviour. It would not possible to know
@@ -418,6 +439,13 @@ def _wait_for_data(self, func_name):
418439
raise RuntimeError('%s() called while another coroutine is '
419440
'already waiting for incoming data' % func_name)
420441

442+
assert not self._eof, '_wait_for_data after EOF'
443+
444+
# Waiting for data while paused will cause a deadlock, so prevent it.
445+
if self._paused:
446+
self._paused = False
447+
self._transport.resume_reading()
448+
421449
self._waiter = futures.Future(loop=self._loop)
422450
try:
423451
yield from self._waiter
@@ -426,43 +454,150 @@ def _wait_for_data(self, func_name):
426454

427455
@coroutine
428456
def readline(self):
457+
"""Read chunk of data from the stream until newline (b'\n') is found.
458+
459+
On success, return chunk that ends with newline. If only partial
460+
line can be read due to EOF, return incomplete line without
461+
terminating newline. When EOF is reached before any bytes were read, an empty
462+
bytes object is returned.
463+
464+
If limit is reached, ValueError will be raised. In that case, if
465+
newline was found, complete line including newline will be removed
466+
from internal buffer. Else, internal buffer will be cleared. Limit is
467+
compared against part of the line without newline.
468+
469+
If stream was paused, this function will automatically resume it if
470+
needed.
471+
"""
472+
sep = b'\n'
473+
seplen = len(sep)
474+
try:
475+
line = yield from self.readuntil(sep)
476+
except IncompleteReadError as e:
477+
return e.partial
478+
except LimitOverrunError as e:
479+
if self._buffer.startswith(sep, e.consumed):
480+
del self._buffer[:e.consumed + seplen]
481+
else:
482+
self._buffer.clear()
483+
self._maybe_resume_transport()
484+
raise ValueError(e.args[0])
485+
return line
486+
487+
@coroutine
488+
def readuntil(self, separator=b'\n'):
489+
"""Read chunk of data from the stream until `separator` is found.
490+
491+
On success, chunk and its separator will be removed from internal buffer
492+
(i.e. consumed). Returned chunk will include separator at the end.
493+
494+
Configured stream limit is used to check result. Limit means maximal
495+
length of chunk that can be returned, not counting the separator.
496+
497+
If EOF occurs and complete separator still not found,
498+
IncompleteReadError(<partial data>, None) will be raised and internal
499+
buffer becomes empty. This partial data may contain a partial separator.
500+
501+
If the chunk cannot be read because the limit was exceeded, LimitOverrunError will be raised
502+
and data will be left in internal buffer, so it can be read again, in
503+
some different way.
504+
505+
If stream was paused, this function will automatically resume it if
506+
needed.
507+
"""
508+
seplen = len(separator)
509+
if seplen == 0:
510+
raise ValueError('Separator should be at least one-byte string')
511+
429512
if self._exception is not None:
430513
raise self._exception
431514

432-
line = bytearray()
433-
not_enough = True
434-
435-
while not_enough:
436-
while self._buffer and not_enough:
437-
ichar = self._buffer.find(b'\n')
438-
if ichar < 0:
439-
line.extend(self._buffer)
440-
self._buffer.clear()
441-
else:
442-
ichar += 1
443-
line.extend(self._buffer[:ichar])
444-
del self._buffer[:ichar]
445-
not_enough = False
446-
447-
if len(line) > self._limit:
448-
self._maybe_resume_transport()
449-
raise ValueError('Line is too long')
515+
# Consume whole buffer except last bytes, which length is
516+
# one less than seplen. Let's check corner cases with
517+
# separator='SEPARATOR':
518+
# * we have received almost complete separator (without last
519+
# byte). i.e buffer='some textSEPARATO'. In this case we
520+
# can safely consume len(separator) - 1 bytes.
521+
# * last byte of buffer is first byte of separator, i.e.
522+
# buffer='abcdefghijklmnopqrS'. We may safely consume
523+
# everything except that last byte, but this requires us to
524+
# analyze bytes of buffer that match partial separator.
525+
# This is slow and/or require FSM. For this case our
526+
# implementation is not optimal, since it requires rescanning
527+
# of data that is known to not belong to separator. In
528+
# real world, separator will not be long enough to notice
529+
# performance problems. Even when reading MIME-encoded
530+
# messages :)
531+
532+
# `offset` is the number of bytes from the beginning of the buffer where
533+
# there is no occurrence of `separator`.
534+
offset = 0
535+
536+
# Loop until we find `separator` in the buffer, exceed the buffer size,
537+
# or an EOF has happened.
538+
while True:
539+
buflen = len(self._buffer)
540+
541+
# Check if we now have enough data in the buffer for `separator` to
542+
# fit.
543+
if buflen - offset >= seplen:
544+
isep = self._buffer.find(separator, offset)
545+
546+
if isep != -1:
547+
# `separator` is in the buffer. `isep` will be used later to
548+
# retrieve the data.
549+
break
550+
551+
# see upper comment for explanation.
552+
offset = buflen + 1 - seplen
553+
if offset > self._limit:
554+
raise LimitOverrunError('Separator is not found, and chunk exceed the limit', offset)
450555

556+
# Complete message (with full separator) may be present in buffer
557+
# even when EOF flag is set. This may happen when the last chunk
558+
# adds data which makes separator be found. That's why we check for
559+
# EOF *after* inspecting the buffer.
451560
if self._eof:
452-
break
561+
chunk = bytes(self._buffer)
562+
self._buffer.clear()
563+
raise IncompleteReadError(chunk, None)
564+
565+
# _wait_for_data() will resume reading if stream was paused.
566+
yield from self._wait_for_data('readuntil')
453567

454-
if not_enough:
455-
yield from self._wait_for_data('readline')
568+
if isep > self._limit:
569+
raise LimitOverrunError('Separator is found, but chunk is longer than limit', isep)
456570

571+
chunk = self._buffer[:isep + seplen]
572+
del self._buffer[:isep + seplen]
457573
self._maybe_resume_transport()
458-
return bytes(line)
574+
return bytes(chunk)
459575

460576
@coroutine
461577
def read(self, n=-1):
578+
"""Read up to `n` bytes from the stream.
579+
580+
If n is not provided, or set to -1, read until EOF and return all read
581+
bytes. If the EOF was received and the internal buffer is empty, return
582+
an empty bytes object.
583+
584+
If n is zero, return an empty bytes object immediately.
585+
586+
If n is positive, this function tries to read `n` bytes, and may return
587+
fewer bytes than requested, but at least one byte. If EOF was
588+
received before any byte is read, this function returns an empty bytes
589+
object.
590+
591+
The returned value is not limited by the limit configured at stream creation.
592+
593+
If stream was paused, this function will automatically resume it if
594+
needed.
595+
"""
596+
462597
if self._exception is not None:
463598
raise self._exception
464599

465-
if not n:
600+
if n == 0:
466601
return b''
467602

468603
if n < 0:
@@ -477,29 +612,41 @@ def read(self, n=-1):
477612
break
478613
blocks.append(block)
479614
return b''.join(blocks)
480-
else:
481-
if not self._buffer and not self._eof:
482-
yield from self._wait_for_data('read')
483615

484-
if n < 0 or len(self._buffer) <= n:
485-
data = bytes(self._buffer)
486-
self._buffer.clear()
487-
else:
488-
# n > 0 and len(self._buffer) > n
489-
data = bytes(self._buffer[:n])
490-
del self._buffer[:n]
616+
if not self._buffer and not self._eof:
617+
yield from self._wait_for_data('read')
618+
619+
# This works correctly even if the buffer contains fewer than n bytes
620+
data = bytes(self._buffer[:n])
621+
del self._buffer[:n]
491622

492623
self._maybe_resume_transport()
493624
return data
494625

495626
@coroutine
496627
def readexactly(self, n):
628+
"""Read exactly `n` bytes.
629+
630+
Raise an `IncompleteReadError` if EOF is reached before `n` bytes can be
631+
read. The `IncompleteReadError.partial` attribute of the exception will
632+
contain the partial read bytes.
633+
634+
If n is zero, return an empty bytes object.
635+
636+
The returned value is not limited by the limit configured at stream creation.
637+
638+
If stream was paused, this function will automatically resume it if
639+
needed.
640+
"""
497641
if n < 0:
498642
raise ValueError('readexactly size can not be less than zero')
499643

500644
if self._exception is not None:
501645
raise self._exception
502646

647+
if n == 0:
648+
return b''
649+
503650
# There used to be "optimized" code here. It created its own
504651
# Future and waited until self._buffer had at least the n
505652
# bytes, then called read(n). Unfortunately, this could pause
@@ -516,6 +663,8 @@ def readexactly(self, n):
516663
blocks.append(block)
517664
n -= len(block)
518665

666+
assert n == 0
667+
519668
return b''.join(blocks)
520669

521670
if compat.PY35:

0 commit comments

Comments
 (0)