From 0b2ad28cb6f288c82524ad8664ed41c49d9ad660 Mon Sep 17 00:00:00 2001
From: Yaniv Michael Kaul
Date: Fri, 9 Jan 2026 21:00:27 +0200
Subject: [PATCH 1/3] Reduce data copies in connection receive path

Optimize buffer management in `_ConnectionIOBuffer` to avoid unnecessary byte allocations during high-throughput reads.

1. **Buffer Compaction**: Replaced `io.BytesIO(buffer.read())` with a `getbuffer()` slice. The previous method `read()` allocated a new `bytes` object for the remaining content before creating the new generic `BytesIO`. The new approach uses a zero-copy memoryview slice for initialization.

2. **Header Peeking**: Replaced `getvalue()` in `_read_frame_header` with `getbuffer()`. This allows inspecting the protocol version and frame length without materializing the entire buffer contents into a new `bytes` string.

Signed-off-by: Yaniv Kaul
---
 cassandra/connection.py | 24 ++++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

diff --git a/cassandra/connection.py b/cassandra/connection.py
index 9ac02c9776..7a1f134c4e 100644
--- a/cassandra/connection.py
+++ b/cassandra/connection.py
@@ -630,14 +630,25 @@ def readable_io_bytes(self):
     def readable_cql_frame_bytes(self):
         return self.cql_frame_buffer.tell()
 
+    @staticmethod
+    def _reset_buffer(buf):
+        """
+        Reset a BytesIO buffer by discarding consumed data.
+        Avoid an intermediate bytes copy from .read(); slice the existing buffer.
+        BytesIO will still materialize its own backing store, but this reduces
+        one full-buffer allocation on the hot receive path.
+        """
+        pos = buf.tell()
+        new_buf = io.BytesIO(buf.getbuffer()[pos:])
+        new_buf.seek(0, 2)  # 2 == SEEK_END
+        return new_buf
+
     def reset_io_buffer(self):
-        self._io_buffer = io.BytesIO(self._io_buffer.read())
-        self._io_buffer.seek(0, 2)  # 2 == SEEK_END
+        self._io_buffer = self._reset_buffer(self._io_buffer)
 
     def reset_cql_frame_buffer(self):
         if self.is_checksumming_enabled:
-            self._cql_frame_buffer = io.BytesIO(self._cql_frame_buffer.read())
-            self._cql_frame_buffer.seek(0, 2)  # 2 == SEEK_END
+            self._cql_frame_buffer = self._reset_buffer(self._cql_frame_buffer)
         else:
             self.reset_io_buffer()
 
@@ -1182,9 +1193,10 @@ def control_conn_disposed(self):
 
     @defunct_on_error
     def _read_frame_header(self):
-        buf = self._io_buffer.cql_frame_buffer.getvalue()
-        pos = len(buf)
+        cql_buf = self._io_buffer.cql_frame_buffer
+        pos = cql_buf.tell()
         if pos:
+            buf = cql_buf.getbuffer()
             version = buf[0] & PROTOCOL_VERSION_MASK
             if version not in ProtocolVersion.SUPPORTED_VERSIONS:
                 raise ProtocolError("This version of the driver does not support protocol version %d" % version)

From c59934c3c084bd2d8c7b06ae2e34ae41e3393a81 Mon Sep 17 00:00:00 2001
From: Yaniv Michael Kaul
Date: Fri, 9 Jan 2026 22:26:15 +0200
Subject: [PATCH 2/3] Optimize frame body extraction to reduce memory copies

Replace BytesIO.read() with direct buffer slicing to eliminate one intermediate bytes allocation per received message frame.

Changes:
- Use getbuffer() to get memoryview of underlying buffer
- Slice directly at [body_offset:end_pos] instead of seek+read
- Convert memoryview slice to bytes in single operation
- Maintain buffer position tracking for proper reset behavior

Benefits:
- Eliminates one full-frame allocation on hot receive path
- Maintains compatibility with existing protocol decoder

The memoryview is immediately converted to bytes and released, preventing buffer resize issues while still gaining the allocation savings.

Signed-off-by: Yaniv Kaul
---
 cassandra/connection.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/cassandra/connection.py b/cassandra/connection.py
index 7a1f134c4e..c527b10b6d 100644
--- a/cassandra/connection.py
+++ b/cassandra/connection.py
@@ -1260,8 +1260,12 @@ def process_io_buffer(self):
                 return
             else:
                 frame = self._current_frame
-                self._io_buffer.cql_frame_buffer.seek(frame.body_offset)
-                msg = self._io_buffer.cql_frame_buffer.read(frame.end_pos - frame.body_offset)
+                # Use memoryview to avoid intermediate allocation, then convert to bytes
+                # Explicitly scope the buffer to ensure memoryview is released before reset
+                cql_buf = self._io_buffer.cql_frame_buffer
+                msg = bytes(cql_buf.getbuffer()[frame.body_offset:frame.end_pos])
+                # Advance buffer position to end of frame before reset
+                cql_buf.seek(frame.end_pos)
                 self.process_msg(frame, msg)
                 self._io_buffer.reset_cql_frame_buffer()
                 self._current_frame = None

From b1b0b3ddeb4fba6f7677991918e1770d834ae250 Mon Sep 17 00:00:00 2001
From: Yaniv Michael Kaul
Date: Fri, 9 Jan 2026 22:56:04 +0200
Subject: [PATCH 3/3] Add BytesReader to replace BytesIO in decode_message()

Introduce a lightweight BytesReader class that provides the same read() interface as io.BytesIO but operates directly on the input bytes/memoryview without internal buffering overhead.

Changes:
- Add BytesReader class with __slots__ for memory efficiency
- Replace io.BytesIO(body) with BytesReader(body) in decode_message()
- BytesReader.read() returns slices directly, converting memoryview to bytes only when necessary for compatibility

Benefits:
- Eliminates BytesIO's internal buffer allocation and management
- Reduces memory overhead for protocol message decoding
- Works seamlessly with both bytes and memoryview inputs
- Maintains full API compatibility with existing read_* functions

The BytesReader is a minimal implementation focused on the read() method needed by the protocol decoder.
It avoids the overhead of io.BytesIO's full file-like interface.

Signed-off-by: Yaniv Kaul
---
 cassandra/protocol.py | 31 ++++++++++++++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git a/cassandra/protocol.py b/cassandra/protocol.py
index e574965de8..48d891cda2 100644
--- a/cassandra/protocol.py
+++ b/cassandra/protocol.py
@@ -53,6 +53,34 @@ class NotSupportedError(Exception):
 class InternalError(Exception):
     pass
 
+
+class BytesReader:
+    """
+    Lightweight reader for bytes data without BytesIO overhead.
+    Provides the same read() interface but operates directly on a
+    bytes or memoryview object, avoiding internal buffer copies.
+    """
+    __slots__ = ('_data', '_pos', '_size')
+
+    def __init__(self, data):
+        self._data = data
+        self._pos = 0
+        self._size = len(data)
+
+    def read(self, n=-1):
+        if n < 0:
+            result = self._data[self._pos:]
+            self._pos = self._size
+        else:
+            end = self._pos + n
+            if end > self._size:
+                raise EOFError("Cannot read past the end of the buffer")
+            result = self._data[self._pos:end]
+            self._pos = end
+        # Return bytes to maintain compatibility with unpack functions
+        return bytes(result) if isinstance(result, memoryview) else result
+
+
 ColumnMetadata = namedtuple("ColumnMetadata", ['keyspace_name', 'table_name', 'name', 'type'])
 
 HEADER_DIRECTION_TO_CLIENT = 0x80
@@ -1142,7 +1170,8 @@ def decode_message(cls, protocol_version, protocol_features, user_type_map, stre
             body = decompressor(body)
             flags ^= COMPRESSED_FLAG
 
-        body = io.BytesIO(body)
+        # Use lightweight BytesReader instead of io.BytesIO to avoid buffer copy
+        body = BytesReader(body)
         if flags & TRACING_FLAG:
             trace_id = UUID(bytes=body.read(16))
             flags ^= TRACING_FLAG