Reduce intermediate copies on the receive path (cassandra/connection.py) and
decode message bodies with a lightweight reader instead of io.BytesIO
(cassandra/protocol.py).

Review notes:
  * The "index" lines carry the blob ids of the originally submitted patch;
    the post-image ids no longer match after the review changes below.
  * _read_frame_header keeps the getbuffer() view in a local for the rest of
    the function, which lies outside this patch's context -- TODO confirm
    nothing writes to the buffer while that view is alive.

diff --git a/cassandra/connection.py b/cassandra/connection.py
index 9ac02c9776..c527b10b6d 100644
--- a/cassandra/connection.py
+++ b/cassandra/connection.py
@@ -630,14 +630,25 @@ def readable_io_bytes(self):
     def readable_cql_frame_bytes(self):
         return self.cql_frame_buffer.tell()
 
+    @staticmethod
+    def _reset_buffer(buf):
+        """
+        Reset a BytesIO buffer by discarding consumed data.
+        Avoid an intermediate bytes copy from .read(); slice the existing buffer.
+        BytesIO will still materialize its own backing store, but this reduces
+        one full-buffer allocation on the hot receive path.
+        """
+        pos = buf.tell()
+        new_buf = io.BytesIO(buf.getbuffer()[pos:])
+        new_buf.seek(0, 2)  # 2 == SEEK_END
+        return new_buf
+
     def reset_io_buffer(self):
-        self._io_buffer = io.BytesIO(self._io_buffer.read())
-        self._io_buffer.seek(0, 2)  # 2 == SEEK_END
+        self._io_buffer = self._reset_buffer(self._io_buffer)
 
     def reset_cql_frame_buffer(self):
         if self.is_checksumming_enabled:
-            self._cql_frame_buffer = io.BytesIO(self._cql_frame_buffer.read())
-            self._cql_frame_buffer.seek(0, 2)  # 2 == SEEK_END
+            self._cql_frame_buffer = self._reset_buffer(self._cql_frame_buffer)
         else:
             self.reset_io_buffer()
 
@@ -1182,9 +1193,10 @@ def control_conn_disposed(self):
 
     @defunct_on_error
     def _read_frame_header(self):
-        buf = self._io_buffer.cql_frame_buffer.getvalue()
-        pos = len(buf)
+        cql_buf = self._io_buffer.cql_frame_buffer
+        pos = cql_buf.tell()
         if pos:
+            buf = cql_buf.getbuffer()
             version = buf[0] & PROTOCOL_VERSION_MASK
             if version not in ProtocolVersion.SUPPORTED_VERSIONS:
                 raise ProtocolError("This version of the driver does not support protocol version %d" % version)
@@ -1248,8 +1260,13 @@ def process_io_buffer(self):
                 return
             else:
                 frame = self._current_frame
-                self._io_buffer.cql_frame_buffer.seek(frame.body_offset)
-                msg = self._io_buffer.cql_frame_buffer.read(frame.end_pos - frame.body_offset)
+                # Copy the frame body out through memoryviews that are released
+                # explicitly, so no buffer export is still held at reset time.
+                cql_buf = self._io_buffer.cql_frame_buffer
+                with cql_buf.getbuffer() as view, view[frame.body_offset:frame.end_pos] as body:
+                    msg = bytes(body)
+                # Advance buffer position to end of frame before reset
+                cql_buf.seek(frame.end_pos)
                 self.process_msg(frame, msg)
                 self._io_buffer.reset_cql_frame_buffer()
                 self._current_frame = None
diff --git a/cassandra/protocol.py b/cassandra/protocol.py
index e574965de8..48d891cda2 100644
--- a/cassandra/protocol.py
+++ b/cassandra/protocol.py
@@ -53,6 +53,41 @@ class NotSupportedError(Exception):
 class InternalError(Exception):
     pass
 
+
+class BytesReader:
+    """
+    Lightweight reader for bytes data without BytesIO overhead.
+    Provides the same read() interface but operates directly on a
+    bytes or memoryview object, avoiding internal buffer copies.
+    """
+    __slots__ = ('_data', '_pos', '_size')
+
+    def __init__(self, data):
+        self._data = data
+        self._pos = 0
+        self._size = len(data)
+
+    def read(self, n=-1):
+        """
+        Return the next ``n`` bytes and advance the read position.
+
+        ``n`` of None or a negative value reads everything that remains,
+        mirroring io.BytesIO.read(). Unlike io.BytesIO, a request for more
+        bytes than remain raises EOFError instead of returning short data.
+        """
+        if n is None or n < 0:
+            result = self._data[self._pos:]
+            self._pos = self._size
+        else:
+            end = self._pos + n
+            if end > self._size:
+                raise EOFError("Cannot read past the end of the buffer")
+            result = self._data[self._pos:end]
+            self._pos = end
+        # Return bytes to maintain compatibility with unpack functions
+        return bytes(result) if isinstance(result, memoryview) else result
+
+
 ColumnMetadata = namedtuple("ColumnMetadata", ['keyspace_name', 'table_name', 'name', 'type'])
 
 HEADER_DIRECTION_TO_CLIENT = 0x80
@@ -1142,7 +1177,8 @@ def decode_message(cls, protocol_version, protocol_features, user_type_map, stre
             body = decompressor(body)
             flags ^= COMPRESSED_FLAG
 
-        body = io.BytesIO(body)
+        # Use lightweight BytesReader instead of io.BytesIO to avoid buffer copy
+        body = BytesReader(body)
         if flags & TRACING_FLAG:
             trace_id = UUID(bytes=body.read(16))
             flags ^= TRACING_FLAG