From b7534951f9f25da80c51da675f0cdef7c71adcc4 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 21 Feb 2026 15:01:01 +0900 Subject: [PATCH] Extract shared boilerplate from sync cursors into WithFetch mixin Add WithFetch class to result_set.py as the sync counterpart to WithAsyncFetch, consolidating ~500 lines of duplicated properties, lifecycle methods, and fetch methods from 5 sync cursor types. Closes #669 Co-Authored-By: Claude Opus 4.6 --- pyathena/arrow/cursor.py | 89 +------------- pyathena/async_cursor.py | 3 +- pyathena/cursor.py | 241 +++----------------------------------- pyathena/pandas/cursor.py | 87 +------------- pyathena/polars/cursor.py | 142 +--------------------- pyathena/result_set.py | 138 +++++++++++++++++++++- pyathena/s3fs/cursor.py | 145 +---------------------- 7 files changed, 166 insertions(+), 679 deletions(-) diff --git a/pyathena/arrow/cursor.py b/pyathena/arrow/cursor.py index c13820cc..514333be 100644 --- a/pyathena/arrow/cursor.py +++ b/pyathena/arrow/cursor.py @@ -2,17 +2,17 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union, cast +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union, cast from pyathena.arrow.converter import ( DefaultArrowTypeConverter, DefaultArrowUnloadTypeConverter, ) from pyathena.arrow.result_set import AthenaArrowResultSet -from pyathena.common import BaseCursor, CursorIterator +from pyathena.common import CursorIterator from pyathena.error import OperationalError, ProgrammingError from pyathena.model import AthenaCompression, AthenaFileFormat, AthenaQueryExecution -from pyathena.result_set import WithResultSet +from pyathena.result_set import WithFetch if TYPE_CHECKING: import polars as pl @@ -21,7 +21,7 @@ _logger = logging.getLogger(__name__) # type: ignore -class ArrowCursor(BaseCursor, CursorIterator, WithResultSet): +class ArrowCursor(WithFetch): """Cursor for handling Apache Arrow Table results from Athena queries. This cursor returns query results as Apache Arrow Tables, which provide @@ -116,8 +116,6 @@ def __init__( self._on_start_query_execution = on_start_query_execution self._connect_timeout = connect_timeout self._request_timeout = request_timeout - self._query_id: Optional[str] = None - self._result_set: Optional[AthenaArrowResultSet] = None @staticmethod def get_default_converter( @@ -127,45 +125,6 @@ def get_default_converter( return DefaultArrowUnloadTypeConverter() return DefaultArrowTypeConverter() - @property - def arraysize(self) -> int: - return self._arraysize - - @arraysize.setter - def arraysize(self, value: int) -> None: - if value <= 0: - raise ProgrammingError("arraysize must be a positive integer value.") - self._arraysize = value - - @property # type: ignore - def result_set(self) -> Optional[AthenaArrowResultSet]: - return self._result_set - - @result_set.setter - def result_set(self, val) -> None: - self._result_set = val - - @property - def query_id(self) -> Optional[str]: - return self._query_id - - @query_id.setter - def query_id(self, val) -> None: - self._query_id = val - - @property - def rownumber(self) -> Optional[int]: - return self.result_set.rownumber if self.result_set else None - - @property - def rowcount(self) -> int: - """Get the number of rows affected by the last operation.""" - return self.result_set.rowcount if self.result_set else -1 - - def close(self) -> None: - if self.result_set and not self.result_set.is_closed: - self.result_set.close() - def execute( self, operation: str, @@ -255,46 +214,6 @@ def execute( raise OperationalError(query_execution.state_change_reason) return self - def executemany( - self, - operation: str, - seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], - **kwargs, - ) -> None: - for parameters in seq_of_parameters: - self.execute(operation, parameters, **kwargs) - # Operations that have result sets are not allowed with executemany. - self._reset_state() - - def cancel(self) -> None: - if not self.query_id: - raise ProgrammingError("QueryExecutionId is none or empty.") - self._cancel(self.query_id) - - def fetchone( - self, - ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaArrowResultSet, self.result_set) - return result_set.fetchone() - - def fetchmany( - self, size: Optional[int] = None - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaArrowResultSet, self.result_set) - return result_set.fetchmany(size) - - def fetchall( - self, - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaArrowResultSet, self.result_set) - return result_set.fetchall() - def as_arrow(self) -> "Table": """Return query results as an Apache Arrow Table. diff --git a/pyathena/async_cursor.py b/pyathena/async_cursor.py index c97750d3..4e8a28ea 100644 --- a/pyathena/async_cursor.py +++ b/pyathena/async_cursor.py @@ -7,8 +7,7 @@ from multiprocessing import cpu_count from typing import Any, Dict, List, Optional, Tuple, Union, cast -from pyathena.common import CursorIterator -from pyathena.cursor import BaseCursor +from pyathena.common import BaseCursor, CursorIterator from pyathena.error import NotSupportedError, ProgrammingError from pyathena.model import AthenaQueryExecution from pyathena.result_set import AthenaDictResultSet, AthenaResultSet diff --git a/pyathena/cursor.py b/pyathena/cursor.py index b1edd0af..46bf967b 100644 --- a/pyathena/cursor.py +++ b/pyathena/cursor.py @@ -2,17 +2,17 @@ from __future__ import annotations import logging -from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast +from typing import Any, Callable, Dict, List, Optional, Union, cast -from pyathena.common import BaseCursor, CursorIterator +from pyathena.common import CursorIterator from pyathena.error import OperationalError, ProgrammingError from pyathena.model import AthenaQueryExecution -from pyathena.result_set import AthenaDictResultSet, AthenaResultSet, WithResultSet +from pyathena.result_set import AthenaDictResultSet, AthenaResultSet, WithFetch _logger = logging.getLogger(__name__) # type: ignore -class Cursor(BaseCursor, CursorIterator, WithResultSet): +class Cursor(WithFetch): """A DB API 2.0 compliant cursor for executing SQL queries on Amazon Athena. The Cursor class provides methods for executing SQL queries against Amazon Athena @@ -68,84 +68,20 @@ def __init__( result_reuse_minutes=result_reuse_minutes, **kwargs, ) - self._query_id: Optional[str] = None - self._result_set: Optional[AthenaResultSet] = None self._result_set_class = AthenaResultSet self._on_start_query_execution = on_start_query_execution @property - def result_set(self) -> Optional[AthenaResultSet]: - """Get the result set from the last executed query. - - Returns: - The result set object containing query results, or None if no - query has been executed or the query didn't return results. - """ - return self._result_set - - @result_set.setter - def result_set(self, val) -> None: - """Set the result set for the cursor. - - Args: - val: The result set object to assign. - """ - self._result_set = val - - @property - def query_id(self) -> Optional[str]: - """Get the Athena query execution ID of the last executed query. - - Returns: - The query execution ID assigned by Athena, or None if no query - has been executed. - """ - return self._query_id - - @query_id.setter - def query_id(self, val) -> None: - """Set the Athena query execution ID. - - Args: - val: The query execution ID to set. - """ - self._query_id = val - - @property - def rownumber(self) -> Optional[int]: - """Get the current row number within the result set. - - Returns: - The zero-based index of the current row, or None if no result set - is available or no rows have been fetched. - """ - return self.result_set.rownumber if self.result_set else None - - @property - def rowcount(self) -> int: - """Get the number of rows affected by the last operation. - - For SELECT statements, this returns the total number of rows in the - result set. For other operations, behavior follows DB API 2.0 specification. - - Returns: - The number of rows, or -1 if not applicable or unknown. - """ - return self.result_set.rowcount if self.result_set else -1 - - def close(self) -> None: - """Close the cursor and free any associated resources. - - Closes the cursor and any associated result sets. This method is provided - for DB API 2.0 compatibility and should be called when the cursor is no - longer needed. - - Note: - After calling this method, the cursor should not be used for - further database operations. - """ - if self.result_set and not self.result_set.is_closed: - self.result_set.close() + def arraysize(self) -> int: + return self._arraysize + + @arraysize.setter + def arraysize(self, value: int) -> None: + if value <= 0 or value > self.DEFAULT_FETCH_SIZE: + raise ProgrammingError( + f"MaxResults is more than maximum allowed length {self.DEFAULT_FETCH_SIZE}." + ) + self._arraysize = value def execute( self, @@ -219,155 +155,6 @@ def on_execution_started(query_id): raise OperationalError(query_execution.state_change_reason) return self - def executemany( - self, - operation: str, - seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], - **kwargs, - ) -> None: - """Execute a SQL query multiple times with different parameters. - - This method executes the same SQL operation multiple times, once for each - parameter set in the sequence. This is useful for bulk operations like - inserting multiple rows. - - Args: - operation: SQL query string to execute. - seq_of_parameters: Sequence of parameter dictionaries or lists, one for each execution. - **kwargs: Additional keyword arguments passed to each execute() call. - - Note: - This method executes each query sequentially. For better performance - with bulk operations, consider using batch operations where supported. - Operations that return result sets are not allowed with executemany. - - Example: - >>> cursor.executemany( - ... "INSERT INTO users (id, name) VALUES (%(id)s, %(name)s)", - ... [ - ... {"id": 1, "name": "Alice"}, - ... {"id": 2, "name": "Bob"}, - ... {"id": 3, "name": "Charlie"} - ... ] - ... ) - """ - for parameters in seq_of_parameters: - self.execute(operation, parameters, **kwargs) - # Operations that have result sets are not allowed with executemany. - self._reset_state() - - def cancel(self) -> None: - """Cancel the currently executing query. - - Cancels the query execution on Amazon Athena. This method can be called - from a different thread to interrupt a long-running query. - - Raises: - ProgrammingError: If no query is currently executing (query_id is None). - - Example: - >>> import threading - >>> import time - >>> - >>> def cancel_after_delay(): - ... time.sleep(5) # Wait 5 seconds - ... cursor.cancel() - >>> - >>> # Start cancellation in separate thread - >>> threading.Thread(target=cancel_after_delay).start() - >>> cursor.execute("SELECT * FROM very_large_table") - """ - if not self.query_id: - raise ProgrammingError("QueryExecutionId is none or empty.") - self._cancel(self.query_id) - - def fetchone( - self, - ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - """Fetch the next row of a query result set. - - Returns the next row of the query result as a tuple, or None when - no more data is available. Column values are converted to appropriate - Python types based on the Athena data types. - - Returns: - A tuple representing the next row, or None if no more rows. - - Raises: - ProgrammingError: If called before executing a query that returns results. - - Example: - >>> cursor.execute("SELECT id, name FROM users LIMIT 3") - >>> while True: - ... row = cursor.fetchone() - ... if not row: - ... break - ... print(f"ID: {row[0]}, Name: {row[1]}") - """ - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaResultSet, self.result_set) - return result_set.fetchone() - - def fetchmany( - self, size: Optional[int] = None - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - """Fetch multiple rows from a query result set. - - Returns up to 'size' rows from the query result as a list of tuples. - If size is not specified, uses the cursor's arraysize attribute. - - Args: - size: Maximum number of rows to fetch. If None, uses arraysize. - - Returns: - List of tuples representing the fetched rows. May contain fewer - rows than requested if fewer are available. - - Raises: - ProgrammingError: If called before executing a query that returns results. - - Example: - >>> cursor.execute("SELECT id, name FROM users") - >>> rows = cursor.fetchmany(5) # Fetch up to 5 rows - >>> for row in rows: - ... print(f"ID: {row[0]}, Name: {row[1]}") - """ - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaResultSet, self.result_set) - return result_set.fetchmany(size) - - def fetchall( - self, - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - """Fetch all remaining rows from a query result set. - - Returns all remaining rows from the query result as a list of tuples. - For large result sets, consider using fetchmany() or iterating with - fetchone() to avoid memory issues. - - Returns: - List of tuples representing all remaining rows in the result set. - - Raises: - ProgrammingError: If called before executing a query that returns results. - - Example: - >>> cursor.execute("SELECT id, name FROM users WHERE active = true") - >>> all_rows = cursor.fetchall() - >>> print(f"Found {len(all_rows)} active users") - >>> for row in all_rows: - ... print(f"ID: {row[0]}, Name: {row[1]}") - - Warning: - Be cautious with large result sets as this loads all data into memory. - """ - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaResultSet, self.result_set) - return result_set.fetchall() - class DictCursor(Cursor): """A cursor that returns query results as dictionaries instead of tuples. diff --git a/pyathena/pandas/cursor.py b/pyathena/pandas/cursor.py index 46e8d864..e7bcc455 100644 --- a/pyathena/pandas/cursor.py +++ b/pyathena/pandas/cursor.py @@ -12,13 +12,11 @@ Iterable, List, Optional, - Tuple, Union, cast, ) from pyathena.common import CursorIterator -from pyathena.cursor import BaseCursor from pyathena.error import OperationalError, ProgrammingError from pyathena.model import AthenaCompression, AthenaFileFormat, AthenaQueryExecution from pyathena.pandas.converter import ( @@ -26,7 +24,7 @@ DefaultPandasUnloadTypeConverter, ) from pyathena.pandas.result_set import AthenaPandasResultSet, PandasDataFrameIterator -from pyathena.result_set import WithResultSet +from pyathena.result_set import WithFetch if TYPE_CHECKING: from pandas import DataFrame @@ -34,7 +32,7 @@ _logger = logging.getLogger(__name__) # type: ignore -class PandasCursor(BaseCursor, CursorIterator, WithResultSet): +class PandasCursor(WithFetch): """Cursor for handling pandas DataFrame results from Athena queries. This cursor returns query results as pandas DataFrames with memory-efficient @@ -138,8 +136,6 @@ def __init__( self._max_workers = max_workers self._auto_optimize_chunksize = auto_optimize_chunksize self._on_start_query_execution = on_start_query_execution - self._query_id: Optional[str] = None - self._result_set: Optional[AthenaPandasResultSet] = None @staticmethod def get_default_converter( @@ -149,45 +145,6 @@ def get_default_converter( return DefaultPandasUnloadTypeConverter() return DefaultPandasTypeConverter() - @property - def arraysize(self) -> int: - return self._arraysize - - @arraysize.setter - def arraysize(self, value: int) -> None: - if value <= 0: - raise ProgrammingError("arraysize must be a positive integer value.") - self._arraysize = value - - @property # type: ignore - def result_set(self) -> Optional[AthenaPandasResultSet]: - return self._result_set - - @result_set.setter - def result_set(self, val) -> None: - self._result_set = val - - @property - def query_id(self) -> Optional[str]: - return self._query_id - - @query_id.setter - def query_id(self, val) -> None: - self._query_id = val - - @property - def rownumber(self) -> Optional[int]: - return self.result_set.rownumber if self.result_set else None - - @property - def rowcount(self) -> int: - """Get the number of rows affected by the last operation.""" - return self.result_set.rowcount if self.result_set else -1 - - def close(self) -> None: - if self.result_set and not self.result_set.is_closed: - self.result_set.close() - def execute( self, operation: str, @@ -292,46 +249,6 @@ def execute( return self - def executemany( - self, - operation: str, - seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], - **kwargs, - ) -> None: - for parameters in seq_of_parameters: - self.execute(operation, parameters, **kwargs) - # Operations that have result sets are not allowed with executemany. - self._reset_state() - - def cancel(self) -> None: - if not self.query_id: - raise ProgrammingError("QueryExecutionId is none or empty.") - self._cancel(self.query_id) - - def fetchone( - self, - ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaPandasResultSet, self.result_set) - return result_set.fetchone() - - def fetchmany( - self, size: Optional[int] = None - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaPandasResultSet, self.result_set) - return result_set.fetchmany(size) - - def fetchall( - self, - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaPandasResultSet, self.result_set) - return result_set.fetchall() - def as_pandas(self) -> Union["DataFrame", PandasDataFrameIterator]: """Return DataFrame or PandasDataFrameIterator based on chunksize setting. diff --git a/pyathena/polars/cursor.py b/pyathena/polars/cursor.py index 9cec74a9..5ad08010 100644 --- a/pyathena/polars/cursor.py +++ b/pyathena/polars/cursor.py @@ -11,12 +11,11 @@ Iterator, List, Optional, - Tuple, Union, cast, ) -from pyathena.common import BaseCursor, CursorIterator +from pyathena.common import CursorIterator from pyathena.error import OperationalError, ProgrammingError from pyathena.model import AthenaCompression, AthenaFileFormat, AthenaQueryExecution from pyathena.polars.converter import ( @@ -24,7 +23,7 @@ DefaultPolarsUnloadTypeConverter, ) from pyathena.polars.result_set import AthenaPolarsResultSet -from pyathena.result_set import WithResultSet +from pyathena.result_set import WithFetch if TYPE_CHECKING: import polars as pl @@ -33,7 +32,7 @@ _logger = logging.getLogger(__name__) -class PolarsCursor(BaseCursor, CursorIterator, WithResultSet): +class PolarsCursor(WithFetch): """Cursor for handling Polars DataFrame results from Athena queries. This cursor returns query results as Polars DataFrames using Polars' native @@ -135,8 +134,6 @@ def __init__( self._cache_type = cache_type self._max_workers = max_workers self._chunksize = chunksize - self._query_id: Optional[str] = None - self._result_set: Optional[AthenaPolarsResultSet] = None @staticmethod def get_default_converter( @@ -154,60 +151,6 @@ def get_default_converter( return DefaultPolarsUnloadTypeConverter() return DefaultPolarsTypeConverter() - @property - def arraysize(self) -> int: - """Get the number of rows to fetch per batch.""" - return self._arraysize - - @arraysize.setter - def arraysize(self, value: int) -> None: - """Set the number of rows to fetch per batch. - - Args: - value: Number of rows to fetch. Must be positive. - - Raises: - ProgrammingError: If value is not positive. - """ - if value <= 0: - raise ProgrammingError("arraysize must be a positive integer value.") - self._arraysize = value - - @property # type: ignore - def result_set(self) -> Optional[AthenaPolarsResultSet]: - """Get the current result set.""" - return self._result_set - - @result_set.setter - def result_set(self, val) -> None: - """Set the current result set.""" - self._result_set = val - - @property - def query_id(self) -> Optional[str]: - """Get the current query execution ID.""" - return self._query_id - - @query_id.setter - def query_id(self, val) -> None: - """Set the current query execution ID.""" - self._query_id = val - - @property - def rownumber(self) -> Optional[int]: - """Get the current row number in the result set.""" - return self.result_set.rownumber if self.result_set else None - - @property - def rowcount(self) -> int: - """Get the number of rows affected by the last operation.""" - return self.result_set.rowcount if self.result_set else -1 - - def close(self) -> None: - """Close the cursor and release resources.""" - if self.result_set and not self.result_set.is_closed: - self.result_set.close() - def execute( self, operation: str, @@ -298,85 +241,6 @@ def execute( raise OperationalError(query_execution.state_change_reason) return self - def executemany( - self, - operation: str, - seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], - **kwargs, - ) -> None: - """Execute a SQL query multiple times with different parameters. - - Args: - operation: SQL query string to execute. - seq_of_parameters: Sequence of parameter sets. - **kwargs: Additional execution parameters. - """ - for parameters in seq_of_parameters: - self.execute(operation, parameters, **kwargs) - # Operations that have result sets are not allowed with executemany. - self._reset_state() - - def cancel(self) -> None: - """Cancel the currently running query. - - Raises: - ProgrammingError: If no query is currently running. - """ - if not self.query_id: - raise ProgrammingError("QueryExecutionId is none or empty.") - self._cancel(self.query_id) - - def fetchone( - self, - ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - """Fetch the next row of the query result. - - Returns: - A single row as a tuple, or None if no more rows are available. - - Raises: - ProgrammingError: If no result set is available. - """ - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaPolarsResultSet, self.result_set) - return result_set.fetchone() - - def fetchmany( - self, size: Optional[int] = None - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - """Fetch the next set of rows of the query result. - - Args: - size: Number of rows to fetch. Defaults to arraysize. - - Returns: - A list of rows as tuples. - - Raises: - ProgrammingError: If no result set is available. - """ - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaPolarsResultSet, self.result_set) - return result_set.fetchmany(size) - - def fetchall( - self, - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - """Fetch all remaining rows of the query result. - - Returns: - A list of all remaining rows as tuples. - - Raises: - ProgrammingError: If no result set is available. - """ - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaPolarsResultSet, self.result_set) - return result_set.fetchall() - def as_polars(self) -> "pl.DataFrame": """Return query results as a Polars DataFrame. diff --git a/pyathena/result_set.py b/pyathena/result_set.py index 3793aa7a..103a8a22 100644 --- a/pyathena/result_set.py +++ b/pyathena/result_set.py @@ -18,7 +18,7 @@ cast, ) -from pyathena.common import CursorIterator +from pyathena.common import BaseCursor, CursorIterator from pyathena.converter import Converter, DefaultTypeConverter from pyathena.error import DataError, OperationalError, ProgrammingError from pyathena.model import AthenaQueryExecution @@ -898,3 +898,139 @@ def rowcount(self) -> int: The number of rows, or -1 if not applicable or unknown. """ return self.result_set.rowcount if self.result_set else -1 + + +class WithFetch(BaseCursor, CursorIterator, WithResultSet): + """Mixin providing shared properties, fetch, lifecycle, and sync iteration for SQL cursors. + + Provides properties (``arraysize``, ``result_set``, ``query_id``, + ``rownumber``, ``rowcount``), lifecycle methods (``close``, ``executemany``, + ``cancel``), default sync fetch (for cursors whose result sets load all + data eagerly in ``__init__``), and the sync iteration protocol. + + Subclasses override ``execute()`` and optionally ``__init__`` and + format-specific helpers. + """ + + def __init__(self, **kwargs) -> None: + super().__init__(**kwargs) + self._query_id: Optional[str] = None + self._result_set: Optional[AthenaResultSet] = None + + @property + def arraysize(self) -> int: + return self._arraysize + + @arraysize.setter + def arraysize(self, value: int) -> None: + if value <= 0: + raise ProgrammingError("arraysize must be a positive integer value.") + self._arraysize = value + + @property # type: ignore + def result_set(self) -> Optional[AthenaResultSet]: + return self._result_set + + @result_set.setter + def result_set(self, val) -> None: + self._result_set = val + + @property + def query_id(self) -> Optional[str]: + return self._query_id + + @query_id.setter + def query_id(self, val) -> None: + self._query_id = val + + @property + def rownumber(self) -> Optional[int]: + return self.result_set.rownumber if self.result_set else None + + @property + def rowcount(self) -> int: + return self.result_set.rowcount if self.result_set else -1 + + def close(self) -> None: + """Close the cursor and release associated resources.""" + if self.result_set and not self.result_set.is_closed: + self.result_set.close() + + def executemany( + self, + operation: str, + seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], + **kwargs, + ) -> None: + """Execute a SQL query multiple times with different parameters. + + Args: + operation: SQL query string to execute. + seq_of_parameters: Sequence of parameter sets, one per execution. + **kwargs: Additional keyword arguments passed to each ``execute()``. + """ + for parameters in seq_of_parameters: + self.execute(operation, parameters, **kwargs) + # Operations that have result sets are not allowed with executemany. + self._reset_state() + + def cancel(self) -> None: + """Cancel the currently executing query. + + Raises: + ProgrammingError: If no query is currently executing. + """ + if not self.query_id: + raise ProgrammingError("QueryExecutionId is none or empty.") + self._cancel(self.query_id) + + def fetchone( + self, + ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch the next row of the result set. + + Returns: + A tuple representing the next row, or None if no more rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaResultSet, self.result_set) + return result_set.fetchone() + + def fetchmany( + self, size: Optional[int] = None + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch multiple rows from the result set. + + Args: + size: Maximum number of rows to fetch. Defaults to arraysize. + + Returns: + List of tuples representing the fetched rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaResultSet, self.result_set) + return result_set.fetchmany(size) + + def fetchall( + self, + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch all remaining rows from the result set. + + Returns: + List of tuples representing all remaining rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaResultSet, self.result_set) + return result_set.fetchall() diff --git a/pyathena/s3fs/cursor.py b/pyathena/s3fs/cursor.py index 4fefe7e2..138e7b71 100644 --- a/pyathena/s3fs/cursor.py +++ b/pyathena/s3fs/cursor.py @@ -2,19 +2,19 @@ from __future__ import annotations import logging -from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast +from typing import Any, Callable, Dict, List, Optional, Union, cast -from pyathena.common import BaseCursor, CursorIterator -from pyathena.error import OperationalError, ProgrammingError +from pyathena.common import CursorIterator +from pyathena.error import OperationalError from pyathena.model import AthenaQueryExecution -from pyathena.result_set import WithResultSet +from pyathena.result_set import WithFetch from pyathena.s3fs.converter import DefaultS3FSTypeConverter from pyathena.s3fs.result_set import AthenaS3FSResultSet, CSVReaderType _logger = logging.getLogger(__name__) -class S3FSCursor(BaseCursor, CursorIterator, WithResultSet): +class S3FSCursor(WithFetch): """Cursor for reading CSV results via S3FileSystem without pandas/pyarrow. This cursor uses Python's standard csv module and PyAthena's S3FileSystem @@ -106,8 +106,6 @@ def __init__( ) self._on_start_query_execution = on_start_query_execution self._csv_reader = csv_reader - self._query_id: Optional[str] = None - self._result_set: Optional[AthenaS3FSResultSet] = None @staticmethod def get_default_converter( @@ -123,60 +121,6 @@ def get_default_converter( """ return DefaultS3FSTypeConverter() - @property - def arraysize(self) -> int: - """Get the number of rows to fetch at a time with fetchmany().""" - return self._arraysize - - @arraysize.setter - def arraysize(self, value: int) -> None: - """Set the number of rows to fetch at a time with fetchmany(). - - Args: - value: Number of rows (must be positive). - - Raises: - ProgrammingError: If value is not positive. - """ - if value <= 0: - raise ProgrammingError("arraysize must be a positive integer value.") - self._arraysize = value - - @property # type: ignore - def result_set(self) -> Optional[AthenaS3FSResultSet]: - """Get the current result set.""" - return self._result_set - - @result_set.setter - def result_set(self, val) -> None: - """Set the current result set.""" - self._result_set = val - - @property - def query_id(self) -> Optional[str]: - """Get the ID of the last executed query.""" - return self._query_id - - @query_id.setter - def query_id(self, val) -> None: - """Set the query ID.""" - self._query_id = val - - @property - def rownumber(self) -> Optional[int]: - """Get the current row number (0-indexed).""" - return self.result_set.rownumber if self.result_set else None - - @property - def rowcount(self) -> int: - """Get the number of rows affected by the last operation.""" - return self.result_set.rowcount if self.result_set else -1 - - def close(self) -> None: - """Close the cursor and release resources.""" - if self.result_set and not self.result_set.is_closed: - self.result_set.close() - def execute( self, operation: str, @@ -249,82 +193,3 @@ def execute( else: raise OperationalError(query_execution.state_change_reason) return self - - def executemany( - self, - operation: str, - seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], - **kwargs, - ) -> None: - """Execute a SQL query with multiple parameter sets. - - Args: - operation: SQL query string to execute. - seq_of_parameters: Sequence of parameter sets. - **kwargs: Additional execution parameters. - """ - for parameters in seq_of_parameters: - self.execute(operation, parameters, **kwargs) - # Operations that have result sets are not allowed with executemany. - self._reset_state() - - def cancel(self) -> None: - """Cancel the currently running query. - - Raises: - ProgrammingError: If no query is running. - """ - if not self.query_id: - raise ProgrammingError("QueryExecutionId is none or empty.") - self._cancel(self.query_id) - - def fetchone( - self, - ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - """Fetch the next row of the result set. - - Returns: - A tuple representing the next row, or None if no more rows. - - Raises: - ProgrammingError: If no query has been executed. - """ - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaS3FSResultSet, self.result_set) - return result_set.fetchone() - - def fetchmany( - self, size: Optional[int] = None - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - """Fetch the next set of rows of the result set. - - Args: - size: Maximum number of rows to fetch. Defaults to arraysize. - - Returns: - A list of tuples representing the rows. - - Raises: - ProgrammingError: If no query has been executed. - """ - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaS3FSResultSet, self.result_set) - return result_set.fetchmany(size) - - def fetchall( - self, - ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: - """Fetch all remaining rows of the result set. - - Returns: - A list of tuples representing all remaining rows. - - Raises: - ProgrammingError: If no query has been executed. - """ - if not self.has_result_set: - raise ProgrammingError("No result set.") - result_set = cast(AthenaS3FSResultSet, self.result_set) - return result_set.fetchall()