Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,14 +630,25 @@ def readable_io_bytes(self):
def readable_cql_frame_bytes(self):
return self.cql_frame_buffer.tell()

@staticmethod
def _reset_buffer(buf):
"""
Reset a BytesIO buffer by discarding consumed data.
Avoid an intermediate bytes copy from .read(); slice the existing buffer.
BytesIO will still materialize its own backing store, but this reduces
one full-buffer allocation on the hot receive path.
"""
pos = buf.tell()
new_buf = io.BytesIO(buf.getbuffer()[pos:])
new_buf.seek(0, 2) # 2 == SEEK_END
return new_buf

def reset_io_buffer(self):
self._io_buffer = io.BytesIO(self._io_buffer.read())
self._io_buffer.seek(0, 2) # 2 == SEEK_END
self._io_buffer = self._reset_buffer(self._io_buffer)

def reset_cql_frame_buffer(self):
if self.is_checksumming_enabled:
self._cql_frame_buffer = io.BytesIO(self._cql_frame_buffer.read())
self._cql_frame_buffer.seek(0, 2) # 2 == SEEK_END
self._cql_frame_buffer = self._reset_buffer(self._cql_frame_buffer)
else:
self.reset_io_buffer()

Expand Down Expand Up @@ -1182,9 +1193,10 @@ def control_conn_disposed(self):

@defunct_on_error
def _read_frame_header(self):
buf = self._io_buffer.cql_frame_buffer.getvalue()
pos = len(buf)
cql_buf = self._io_buffer.cql_frame_buffer
pos = cql_buf.tell()
if pos:
buf = cql_buf.getbuffer()
version = buf[0] & PROTOCOL_VERSION_MASK
if version not in ProtocolVersion.SUPPORTED_VERSIONS:
raise ProtocolError("This version of the driver does not support protocol version %d" % version)
Expand Down Expand Up @@ -1248,8 +1260,12 @@ def process_io_buffer(self):
return
else:
frame = self._current_frame
self._io_buffer.cql_frame_buffer.seek(frame.body_offset)
msg = self._io_buffer.cql_frame_buffer.read(frame.end_pos - frame.body_offset)
# Use memoryview to avoid intermediate allocation, then convert to bytes
# Explicitly scope the buffer to ensure memoryview is released before reset
cql_buf = self._io_buffer.cql_frame_buffer
msg = bytes(cql_buf.getbuffer()[frame.body_offset:frame.end_pos])
# Advance buffer position to end of frame before reset
cql_buf.seek(frame.end_pos)
self.process_msg(frame, msg)
self._io_buffer.reset_cql_frame_buffer()
self._current_frame = None
Expand Down
31 changes: 30 additions & 1 deletion cassandra/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,34 @@ class NotSupportedError(Exception):
class InternalError(Exception):
pass


class BytesReader:
"""
Lightweight reader for bytes data without BytesIO overhead.
Provides the same read() interface but operates directly on a
bytes or memoryview object, avoiding internal buffer copies.
"""
__slots__ = ('_data', '_pos', '_size')

def __init__(self, data):
self._data = data
self._pos = 0
self._size = len(data)

def read(self, n=-1):
if n < 0:
result = self._data[self._pos:]
self._pos = self._size
else:
end = self._pos + n
if end > self._size:
raise EOFError("Cannot read past the end of the buffer")
result = self._data[self._pos:end]
self._pos = end
# Return bytes to maintain compatibility with unpack functions
return bytes(result) if isinstance(result, memoryview) else result


ColumnMetadata = namedtuple("ColumnMetadata", ['keyspace_name', 'table_name', 'name', 'type'])

HEADER_DIRECTION_TO_CLIENT = 0x80
Expand Down Expand Up @@ -1142,7 +1170,8 @@ def decode_message(cls, protocol_version, protocol_features, user_type_map, stre
body = decompressor(body)
flags ^= COMPRESSED_FLAG

body = io.BytesIO(body)
# Use lightweight BytesReader instead of io.BytesIO to avoid buffer copy
body = BytesReader(body)
if flags & TRACING_FLAG:
trace_id = UUID(bytes=body.read(16))
flags ^= TRACING_FLAG
Expand Down
Loading