Skip to content

Commit 5f3efe3

Browse files
committed
PythonParser uses a separate method for filling the IO buffer
1 parent 858b440 commit 5f3efe3

File tree

1 file changed

+43
-56
lines changed

1 file changed

+43
-56
lines changed

redis/asyncio/connection.py

Lines changed: 43 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -208,19 +208,14 @@ async def read_response(
208208
class PythonParser(BaseParser):
209209
"""Plain Python parsing class"""
210210

211-
__slots__ = BaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks")
211+
__slots__ = BaseParser.__slots__ + ("encoder", "_buffer", "_pos")
212212

213213
def __init__(self, socket_read_size: int):
214214
super().__init__(socket_read_size)
215215
self.encoder: Optional[Encoder] = None
216216
self._buffer = b""
217-
self._chunks = []
218217
self._pos = 0
219218

220-
def _clear(self):
221-
self._buffer = b""
222-
self._chunks.clear()
223-
224219
def on_connect(self, connection: "Connection"):
225220
"""Called when the stream connects"""
226221
self._stream = connection._reader
@@ -234,6 +229,7 @@ def on_disconnect(self):
234229
if self._stream is not None:
235230
self._stream = None
236231
self.encoder = None
232+
self._buffer = b""
237233

238234
async def can_read_destructive(self) -> bool:
239235
if self._buffer:
@@ -247,30 +243,37 @@ async def can_read_destructive(self) -> bool:
247243
return False
248244

249245
async def read_response(self, disable_decoding: bool = False):
250-
if self._stream is None:
251-
raise RedisError("Buffer is closed.")
252-
if self._chunks:
253-
# augment parsing buffer with previously read data
254-
self._buffer += b"".join(self._chunks)
255-
self._chunks.clear()
256-
try:
246+
if not self._stream or not self.encoder:
247+
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
248+
249+
if not self._buffer:
250+
await self._fill_buffer()
251+
while True:
257252
self._pos = 0
258-
response = await self._read_response(disable_decoding=disable_decoding)
259-
except (ConnectionError, InvalidResponse):
260-
# We don't want these errors to be resumable
261-
self._clear()
262-
raise
263-
else:
264-
# Successfully parsing a response allows us to clear our parsing buffer
265-
self._clear()
266-
return response
253+
try:
254+
response = self._read_response(disable_decoding=disable_decoding)
267255

268-
async def _read_response(
256+
except EOFError:
257+
await self._fill_buffer()
258+
else:
259+
break
260+
# Successfully parsing a response allows us to clear our parsing buffer
261+
self._buffer = self._buffer[self._pos :]
262+
return response
263+
264+
async def _fill_buffer(self):
265+
"""
266+
IO is performed here
267+
"""
268+
buffer = await self._stream.read(self._read_size)
269+
if not buffer or not isinstance(buffer, bytes):
270+
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
271+
self._buffer += buffer
272+
273+
def _read_response(
269274
self, disable_decoding: bool = False
270275
) -> Union[EncodableT, ResponseError, None]:
271-
if not self._stream or not self.encoder:
272-
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
273-
raw = await self._readline()
276+
raw = self._readline()
274277
if not raw:
275278
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
276279
response: Any
@@ -286,7 +289,7 @@ async def _read_response(
286289
# if the error is a ConnectionError, raise immediately so the user
287290
# is notified
288291
if isinstance(error, ConnectionError):
289-
self._clear() # Successful parse
292+
self._buffer = self._buffer[self._pos :] # Successful parse
290293
raise error
291294
# otherwise, we're dealing with a ResponseError that might belong
292295
# inside a pipeline response. the connection's read_response()
@@ -304,55 +307,39 @@ async def _read_response(
304307
length = int(response)
305308
if length == -1:
306309
return None
307-
response = await self._read(length)
310+
response = self._read(length)
308311
# multi-bulk response
309312
elif byte == b"*":
310313
length = int(response)
311314
if length == -1:
312315
return None
313-
response = [
314-
(await self._read_response(disable_decoding)) for _ in range(length)
315-
]
316+
response = [(self._read_response(disable_decoding)) for _ in range(length)]
316317
if isinstance(response, bytes) and disable_decoding is False:
317318
response = self.encoder.decode(response)
318319
return response
319320

320-
async def _read(self, length: int) -> bytes:
321+
def _read(self, length: int) -> bytes:
321322
"""
322323
Read `length` bytes of data. These are assumed to be followed
323324
by a '\r\n' terminator which is subsequently discarded.
324325
"""
325-
want = length + 2
326-
end = self._pos + want
327-
if len(self._buffer) >= end:
328-
result = self._buffer[self._pos : end - 2]
329-
else:
330-
tail = self._buffer[self._pos :]
331-
try:
332-
data = await self._stream.readexactly(want - len(tail))
333-
except asyncio.IncompleteReadError as error:
334-
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
335-
result = (tail + data)[:-2]
336-
self._chunks.append(data)
337-
self._pos += want
326+
end = self._pos + length + 2
327+
if len(self._buffer) < end:
328+
raise EOFError() # Signal that we need more data
329+
result = self._buffer[self._pos : end - 2]
330+
self._pos = end
338331
return result
339332

340-
async def _readline(self) -> bytes:
333+
def _readline(self) -> bytes:
341334
"""
342335
read an unknown number of bytes up to the next '\r\n'
343336
line separator, which is discarded.
344337
"""
345338
found = self._buffer.find(b"\r\n", self._pos)
346-
if found >= 0:
347-
result = self._buffer[self._pos : found]
348-
else:
349-
tail = self._buffer[self._pos :]
350-
data = await self._stream.readline()
351-
if not data.endswith(b"\r\n"):
352-
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
353-
result = (tail + data)[:-2]
354-
self._chunks.append(data)
355-
self._pos += len(result) + 2
339+
if found < 0:
340+
raise EOFError() # signal that we need more data
341+
result = self._buffer[self._pos : found]
342+
self._pos = found + 2
356343
return result
357344

358345

0 commit comments

Comments
 (0)