From 62adae347f275ca8d181120563dda3bdd2c9f352 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 21 Feb 2026 14:04:39 +0900 Subject: [PATCH 1/3] Wrap fetch methods in asyncio.to_thread for aio pandas/arrow/polars cursors Override fetchone(), fetchmany(), fetchall(), and __anext__() in AioPandasCursor, AioArrowCursor, and AioPolarsCursor to use asyncio.to_thread(), preventing event loop blocking when chunksize triggers lazy S3 reads. This unifies the API so all aio cursors require await for fetch operations, consistent with AioCursor and AioS3FSCursor. Fixes #672 Co-Authored-By: Claude Opus 4.6 --- docs/aio.md | 37 +++++------------- docs/arrow.md | 13 +++--- docs/pandas.md | 19 +++------ docs/polars.md | 19 +++------ pyathena/aio/arrow/cursor.py | 73 ++++++++++++++++++++++++++++++++-- pyathena/aio/pandas/cursor.py | 73 ++++++++++++++++++++++++++++++++-- pyathena/aio/polars/cursor.py | 74 +++++++++++++++++++++++++++++++++-- 7 files changed, 233 insertions(+), 75 deletions(-) diff --git a/docs/aio.md b/docs/aio.md index 681f2e81..f33415a5 100644 --- a/docs/aio.md +++ b/docs/aio.md @@ -177,38 +177,19 @@ Native asyncio versions are available for all cursor types: ### Fetch behavior -For **AioPandasCursor**, **AioArrowCursor**, and **AioPolarsCursor**, the S3 download -(CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. -By the time `execute()` returns, all data is already loaded into memory. -Therefore `fetchone()`, `fetchall()`, `as_pandas()`, `as_arrow()`, and `as_polars()` -are synchronous (in-memory only) and do not need `await`: +All aio cursors use `await` for fetch operations. The S3 download (CSV or Parquet) +happens inside `execute()`, wrapped in `asyncio.to_thread()`. Fetch methods are also +wrapped in `asyncio.to_thread()` to ensure the event loop is never blocked — this is +especially important when `chunksize` is set, as fetch calls trigger lazy S3 reads. ```python -# Pandas, Arrow, Polars — S3 download completes during execute() -await cursor.execute("SELECT * FROM many_rows") # Downloads data here -row = cursor.fetchone() # No await — data already in memory -rows = cursor.fetchall() # No await -df = cursor.as_pandas() # No await -``` - -The exceptions are **AioCursor** and **AioS3FSCursor**, which stream rows lazily from S3. -Their fetch methods perform I/O and require `await`: - -```python -# AioCursor, AioS3FSCursor — fetch reads from S3 lazily await cursor.execute("SELECT * FROM many_rows") -row = await cursor.fetchone() # Await required — reads from S3 -rows = await cursor.fetchall() # Await required +row = await cursor.fetchone() +rows = await cursor.fetchall() +df = cursor.as_pandas() # In-memory conversion, no await needed ``` -```{note} -When using AioPandasCursor or AioPolarsCursor with the `chunksize` option, -`execute()` creates a lazy reader (e.g., pandas `TextFileReader`) instead of -loading all data at once. Subsequent iteration via `as_pandas()`, `fetchone()`, -or `async for` triggers chunk-by-chunk S3 reads that are **not** wrapped in -`asyncio.to_thread()` and will block the event loop. If you need chunked -processing in an async application, consider wrapping the iteration in -`asyncio.to_thread()` yourself, or use the default non-chunked mode. -``` +The `as_pandas()`, `as_arrow()`, and `as_polars()` convenience methods operate on +already-loaded data and remain synchronous. See each cursor's documentation page for detailed usage examples. diff --git a/docs/arrow.md b/docs/arrow.md index 49121bbc..419808fc 100644 --- a/docs/arrow.md +++ b/docs/arrow.md @@ -487,11 +487,8 @@ cursor = connect( AioArrowCursor is a native asyncio cursor that returns results as Apache Arrow Tables. Unlike AsyncArrowCursor which uses `concurrent.futures`, this cursor uses -`asyncio.to_thread()` for result set creation, keeping the event loop free. - -The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. -By the time `execute()` returns, all data is already loaded into memory. -Therefore fetch methods, `as_arrow()`, and `as_polars()` are synchronous and do not need `await`. +`asyncio.to_thread()` for both result set creation and fetch operations, +keeping the event loop free. ```python from pyathena import aconnect @@ -517,9 +514,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", region_name="us-west-2") as conn: cursor = conn.cursor(AioArrowCursor) await cursor.execute("SELECT * FROM many_rows") - print(cursor.fetchone()) - print(cursor.fetchmany()) - print(cursor.fetchall()) + print(await cursor.fetchone()) + print(await cursor.fetchmany()) + print(await cursor.fetchall()) ``` ```python diff --git a/docs/pandas.md b/docs/pandas.md index a26c4bde..4e8886cb 100644 --- a/docs/pandas.md +++ b/docs/pandas.md @@ -774,11 +774,8 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", AioPandasCursor is a native asyncio cursor that returns results as pandas DataFrames. Unlike AsyncPandasCursor which uses `concurrent.futures`, this cursor uses -`asyncio.to_thread()` for result set creation, keeping the event loop free. - -The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. -By the time `execute()` returns, all data is already loaded into memory. -Therefore fetch methods and `as_pandas()` are synchronous and do not need `await`. +`asyncio.to_thread()` for both result set creation and fetch operations, +keeping the event loop free. ```python from pyathena import aconnect @@ -803,9 +800,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", region_name="us-west-2") as conn: cursor = conn.cursor(AioPandasCursor) await cursor.execute("SELECT * FROM many_rows") - print(cursor.fetchone()) - print(cursor.fetchmany()) - print(cursor.fetchall()) + print(await cursor.fetchone()) + print(await cursor.fetchmany()) + print(await cursor.fetchall()) ``` ```python @@ -833,9 +830,3 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", df = cursor.as_pandas() ``` -```{note} -When using AioPandasCursor with the `chunksize` option, `execute()` creates a lazy -`TextFileReader` instead of loading all data at once. Subsequent iteration via -`as_pandas()`, `fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that -are not wrapped in `asyncio.to_thread()` and will block the event loop. -``` diff --git a/docs/polars.md b/docs/polars.md index fed8887f..6c50cab7 100644 --- a/docs/polars.md +++ b/docs/polars.md @@ -581,11 +581,8 @@ for chunk in result_set.iter_chunks(): AioPolarsCursor is a native asyncio cursor that returns results as Polars DataFrames. Unlike AsyncPolarsCursor which uses `concurrent.futures`, this cursor uses -`asyncio.to_thread()` for result set creation, keeping the event loop free. - -The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. -By the time `execute()` returns, all data is already loaded into memory. -Therefore fetch methods, `as_polars()`, and `as_arrow()` are synchronous and do not need `await`. +`asyncio.to_thread()` for both result set creation and fetch operations, +keeping the event loop free. ```python from pyathena import aconnect @@ -610,9 +607,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", region_name="us-west-2") as conn: cursor = conn.cursor(AioPolarsCursor) await cursor.execute("SELECT * FROM many_rows") - print(cursor.fetchone()) - print(cursor.fetchmany()) - print(cursor.fetchall()) + print(await cursor.fetchone()) + print(await cursor.fetchmany()) + print(await cursor.fetchall()) ``` ```python @@ -653,9 +650,3 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", df = cursor.as_polars() ``` -```{note} -When using AioPolarsCursor with the `chunksize` option, `execute()` creates a lazy -reader instead of loading all data at once. Subsequent iteration via `as_polars()`, -`fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that are not wrapped -in `asyncio.to_thread()` and will block the event loop. -``` diff --git a/pyathena/aio/arrow/cursor.py b/pyathena/aio/arrow/cursor.py index cb7b9c2f..4ed51845 100644 --- a/pyathena/aio/arrow/cursor.py +++ b/pyathena/aio/arrow/cursor.py @@ -3,7 +3,7 @@ import asyncio import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast from pyathena.aio.common import WithAsyncFetch from pyathena.arrow.converter import ( @@ -25,9 +25,8 @@ class AioArrowCursor(WithAsyncFetch): """Native asyncio cursor that returns results as Apache Arrow Tables. - Uses ``asyncio.to_thread()`` to create the result set off the event loop. - Since ``AthenaArrowResultSet`` loads all data in ``__init__`` (via S3), - fetch methods are synchronous (in-memory only) and do not need to be async. + Uses ``asyncio.to_thread()`` for both result set creation and fetch + operations, keeping the event loop free. Example: >>> async with await pyathena.aconnect(...) as conn: @@ -153,6 +152,72 @@ async def execute( # type: ignore[override] raise OperationalError(query_execution.state_change_reason) return self + async def fetchone( # type: ignore[override] + self, + ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch the next row of the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop. + + Returns: + A tuple representing the next row, or None if no more rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaArrowResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchone) + + async def fetchmany( # type: ignore[override] + self, size: Optional[int] = None + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch multiple rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop. + + Args: + size: Maximum number of rows to fetch. Defaults to arraysize. + + Returns: + List of tuples representing the fetched rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaArrowResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchmany, size) + + async def fetchall( # type: ignore[override] + self, + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch all remaining rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop. + + Returns: + List of tuples representing all remaining rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaArrowResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchall) + + async def __anext__(self): + row = await self.fetchone() + if row is None: + raise StopAsyncIteration + return row + def as_arrow(self) -> "Table": """Return query results as an Apache Arrow Table. diff --git a/pyathena/aio/pandas/cursor.py b/pyathena/aio/pandas/cursor.py index fbee2625..9d0e0ca3 100644 --- a/pyathena/aio/pandas/cursor.py +++ b/pyathena/aio/pandas/cursor.py @@ -11,6 +11,7 @@ Iterable, List, Optional, + Tuple, Union, cast, ) @@ -34,9 +35,9 @@ class AioPandasCursor(WithAsyncFetch): """Native asyncio cursor that returns results as pandas DataFrames. - Uses ``asyncio.to_thread()`` to create the result set off the event loop. - Since ``AthenaPandasResultSet`` loads all data in ``__init__`` (via S3), - fetch methods are synchronous (in-memory only) and do not need to be async. + Uses ``asyncio.to_thread()`` for both result set creation and fetch + operations, keeping the event loop free. This is especially important + when ``chunksize`` is set, as fetch calls trigger lazy S3 reads. Example: >>> async with await pyathena.aconnect(...) as conn: @@ -183,6 +184,72 @@ async def execute( # type: ignore[override] raise OperationalError(query_execution.state_change_reason) return self + async def fetchone( # type: ignore[override] + self, + ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch the next row of the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Returns: + A tuple representing the next row, or None if no more rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPandasResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchone) + + async def fetchmany( # type: ignore[override] + self, size: Optional[int] = None + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch multiple rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Args: + size: Maximum number of rows to fetch. Defaults to arraysize. + + Returns: + List of tuples representing the fetched rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPandasResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchmany, size) + + async def fetchall( # type: ignore[override] + self, + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch all remaining rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Returns: + List of tuples representing all remaining rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPandasResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchall) + + async def __anext__(self): + row = await self.fetchone() + if row is None: + raise StopAsyncIteration + return row + def as_pandas(self) -> Union["DataFrame", PandasDataFrameIterator]: """Return DataFrame or PandasDataFrameIterator based on chunksize setting. diff --git a/pyathena/aio/polars/cursor.py b/pyathena/aio/polars/cursor.py index 1f897f9d..f12000eb 100644 --- a/pyathena/aio/polars/cursor.py +++ b/pyathena/aio/polars/cursor.py @@ -4,7 +4,7 @@ import asyncio import logging from multiprocessing import cpu_count -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast from pyathena.aio.common import WithAsyncFetch from pyathena.common import CursorIterator @@ -26,9 +26,9 @@ class AioPolarsCursor(WithAsyncFetch): """Native asyncio cursor that returns results as Polars DataFrames. - Uses ``asyncio.to_thread()`` to create the result set off the event loop. - Since ``AthenaPolarsResultSet`` loads all data in ``__init__`` (via S3), - fetch methods are synchronous (in-memory only) and do not need to be async. + Uses ``asyncio.to_thread()`` for both result set creation and fetch + operations, keeping the event loop free. This is especially important + when ``chunksize`` is set, as fetch calls trigger lazy S3 reads. Example: >>> async with await pyathena.aconnect(...) as conn: @@ -160,6 +160,72 @@ async def execute( # type: ignore[override] raise OperationalError(query_execution.state_change_reason) return self + async def fetchone( # type: ignore[override] + self, + ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch the next row of the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Returns: + A tuple representing the next row, or None if no more rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPolarsResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchone) + + async def fetchmany( # type: ignore[override] + self, size: Optional[int] = None + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch multiple rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Args: + size: Maximum number of rows to fetch. Defaults to arraysize. + + Returns: + List of tuples representing the fetched rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPolarsResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchmany, size) + + async def fetchall( # type: ignore[override] + self, + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch all remaining rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Returns: + List of tuples representing all remaining rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPolarsResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchall) + + async def __anext__(self): + row = await self.fetchone() + if row is None: + raise StopAsyncIteration + return row + def as_polars(self) -> "pl.DataFrame": """Return query results as a Polars DataFrame. From c8f536224aed84b22172a53fb5cd4b7e3323c681 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 21 Feb 2026 14:34:03 +0900 Subject: [PATCH 2/3] Add await to fetch calls in aio cursor tests Tests were calling fetchone(), fetchmany(), fetchall() without await, producing "coroutine was never awaited" warnings and incorrect assertions (comparing coroutine objects instead of results). Co-Authored-By: Claude Opus 4.6 --- tests/pyathena/aio/arrow/test_cursor.py | 32 ++++++++++++------------ tests/pyathena/aio/pandas/test_cursor.py | 32 ++++++++++++------------ tests/pyathena/aio/polars/test_cursor.py | 32 ++++++++++++------------ 3 files changed, 48 insertions(+), 48 deletions(-) diff --git a/tests/pyathena/aio/arrow/test_cursor.py b/tests/pyathena/aio/arrow/test_cursor.py index de5d13a5..efc8f6e6 100644 --- a/tests/pyathena/aio/arrow/test_cursor.py +++ b/tests/pyathena/aio/arrow/test_cursor.py @@ -11,20 +11,20 @@ class TestAioArrowCursor: async def test_fetchone(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM one_row") assert aio_arrow_cursor.rownumber == 0 - assert aio_arrow_cursor.fetchone() == (1,) + assert await aio_arrow_cursor.fetchone() == (1,) assert aio_arrow_cursor.rownumber == 1 - assert aio_arrow_cursor.fetchone() is None + assert await aio_arrow_cursor.fetchone() is None async def test_fetchmany(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM many_rows LIMIT 15") - assert len(aio_arrow_cursor.fetchmany(10)) == 10 - assert len(aio_arrow_cursor.fetchmany(10)) == 5 + assert len(await aio_arrow_cursor.fetchmany(10)) == 10 + assert len(await aio_arrow_cursor.fetchmany(10)) == 5 async def test_fetchall(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM one_row") - assert aio_arrow_cursor.fetchall() == [(1,)] + assert await aio_arrow_cursor.fetchall() == [(1,)] await aio_arrow_cursor.execute("SELECT a FROM many_rows ORDER BY a") - assert aio_arrow_cursor.fetchall() == [(i,) for i in range(10000)] + assert await aio_arrow_cursor.fetchall() == [(i,) for i in range(10000)] async def test_as_arrow(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM one_row") @@ -45,11 +45,11 @@ async def test_execute_returns_self(self, aio_arrow_cursor): async def test_no_result_set_raises(self, aio_arrow_cursor): with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchone() + await aio_arrow_cursor.fetchone() with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchmany() + await aio_arrow_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchall() + await aio_arrow_cursor.fetchall() with pytest.raises(ProgrammingError): aio_arrow_cursor.as_arrow() with pytest.raises(ProgrammingError): @@ -62,7 +62,7 @@ async def test_context_manager(self): try: async with conn.cursor() as cursor: await cursor.execute("SELECT * FROM one_row") - assert cursor.fetchone() == (1,) + assert await cursor.fetchone() == (1,) finally: conn.close() @@ -77,7 +77,7 @@ async def test_invalid_arraysize(self, aio_arrow_cursor): async def test_description(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT CAST(1 AS INT) AS foobar FROM one_row") - assert aio_arrow_cursor.fetchall() == [(1,)] + assert await aio_arrow_cursor.fetchall() == [(1,)] assert aio_arrow_cursor.description == [("foobar", "integer", None, None, 10, 0, "UNKNOWN")] async def test_description_initial(self, aio_arrow_cursor): @@ -92,11 +92,11 @@ async def test_executemany_fetch(self, aio_arrow_cursor): "SELECT %(x)d FROM one_row", [{"x": i} for i in range(1, 2)] ) with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchall() + await aio_arrow_cursor.fetchall() with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchmany() + await aio_arrow_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchone() + await aio_arrow_cursor.fetchone() with pytest.raises(ProgrammingError): aio_arrow_cursor.as_arrow() with pytest.raises(ProgrammingError): @@ -109,8 +109,8 @@ async def test_executemany_fetch(self, aio_arrow_cursor): ) async def test_fetchone_unload(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM one_row") - assert aio_arrow_cursor.fetchone() == (1,) - assert aio_arrow_cursor.fetchone() is None + assert await aio_arrow_cursor.fetchone() == (1,) + assert await aio_arrow_cursor.fetchone() is None @pytest.mark.parametrize( "aio_arrow_cursor", diff --git a/tests/pyathena/aio/pandas/test_cursor.py b/tests/pyathena/aio/pandas/test_cursor.py index 0b615c04..6e3794e7 100644 --- a/tests/pyathena/aio/pandas/test_cursor.py +++ b/tests/pyathena/aio/pandas/test_cursor.py @@ -11,20 +11,20 @@ class TestAioPandasCursor: async def test_fetchone(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM one_row") assert aio_pandas_cursor.rownumber == 0 - assert aio_pandas_cursor.fetchone() == (1,) + assert await aio_pandas_cursor.fetchone() == (1,) assert aio_pandas_cursor.rownumber == 1 - assert aio_pandas_cursor.fetchone() is None + assert await aio_pandas_cursor.fetchone() is None async def test_fetchmany(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM many_rows LIMIT 15") - assert len(aio_pandas_cursor.fetchmany(10)) == 10 - assert len(aio_pandas_cursor.fetchmany(10)) == 5 + assert len(await aio_pandas_cursor.fetchmany(10)) == 10 + assert len(await aio_pandas_cursor.fetchmany(10)) == 5 async def test_fetchall(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM one_row") - assert aio_pandas_cursor.fetchall() == [(1,)] + assert await aio_pandas_cursor.fetchall() == [(1,)] await aio_pandas_cursor.execute("SELECT a FROM many_rows ORDER BY a") - assert aio_pandas_cursor.fetchall() == [(i,) for i in range(10000)] + assert await aio_pandas_cursor.fetchall() == [(i,) for i in range(10000)] async def test_as_pandas(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM one_row") @@ -39,11 +39,11 @@ async def test_execute_returns_self(self, aio_pandas_cursor): async def test_no_result_set_raises(self, aio_pandas_cursor): with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchone() + await aio_pandas_cursor.fetchone() with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchmany() + await aio_pandas_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchall() + await aio_pandas_cursor.fetchall() with pytest.raises(ProgrammingError): aio_pandas_cursor.as_pandas() @@ -54,7 +54,7 @@ async def test_context_manager(self): try: async with conn.cursor() as cursor: await cursor.execute("SELECT * FROM one_row") - assert cursor.fetchone() == (1,) + assert await cursor.fetchone() == (1,) finally: conn.close() @@ -69,7 +69,7 @@ async def test_invalid_arraysize(self, aio_pandas_cursor): async def test_description(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT CAST(1 AS INT) AS foobar FROM one_row") - assert aio_pandas_cursor.fetchall() == [(1,)] + assert await aio_pandas_cursor.fetchall() == [(1,)] assert aio_pandas_cursor.description == [ ("foobar", "integer", None, None, 10, 0, "UNKNOWN") ] @@ -86,11 +86,11 @@ async def test_executemany_fetch(self, aio_pandas_cursor): "SELECT %(x)d FROM one_row", [{"x": i} for i in range(1, 2)] ) with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchall() + await aio_pandas_cursor.fetchall() with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchmany() + await aio_pandas_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchone() + await aio_pandas_cursor.fetchone() with pytest.raises(ProgrammingError): aio_pandas_cursor.as_pandas() @@ -101,8 +101,8 @@ async def test_executemany_fetch(self, aio_pandas_cursor): ) async def test_fetchone_unload(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM one_row") - assert aio_pandas_cursor.fetchone() == (1,) - assert aio_pandas_cursor.fetchone() is None + assert await aio_pandas_cursor.fetchone() == (1,) + assert await aio_pandas_cursor.fetchone() is None @pytest.mark.parametrize( "aio_pandas_cursor", diff --git a/tests/pyathena/aio/polars/test_cursor.py b/tests/pyathena/aio/polars/test_cursor.py index 17242a4f..81309fe5 100644 --- a/tests/pyathena/aio/polars/test_cursor.py +++ b/tests/pyathena/aio/polars/test_cursor.py @@ -11,20 +11,20 @@ class TestAioPolarsCursor: async def test_fetchone(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM one_row") assert aio_polars_cursor.rownumber == 0 - assert aio_polars_cursor.fetchone() == (1,) + assert await aio_polars_cursor.fetchone() == (1,) assert aio_polars_cursor.rownumber == 1 - assert aio_polars_cursor.fetchone() is None + assert await aio_polars_cursor.fetchone() is None async def test_fetchmany(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM many_rows LIMIT 15") - assert len(aio_polars_cursor.fetchmany(10)) == 10 - assert len(aio_polars_cursor.fetchmany(10)) == 5 + assert len(await aio_polars_cursor.fetchmany(10)) == 10 + assert len(await aio_polars_cursor.fetchmany(10)) == 5 async def test_fetchall(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM one_row") - assert aio_polars_cursor.fetchall() == [(1,)] + assert await aio_polars_cursor.fetchall() == [(1,)] await aio_polars_cursor.execute("SELECT a FROM many_rows ORDER BY a") - assert aio_polars_cursor.fetchall() == [(i,) for i in range(10000)] + assert await aio_polars_cursor.fetchall() == [(i,) for i in range(10000)] async def test_as_polars(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM one_row") @@ -44,11 +44,11 @@ async def test_execute_returns_self(self, aio_polars_cursor): async def test_no_result_set_raises(self, aio_polars_cursor): with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchone() + await aio_polars_cursor.fetchone() with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchmany() + await aio_polars_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchall() + await aio_polars_cursor.fetchall() with pytest.raises(ProgrammingError): aio_polars_cursor.as_polars() with pytest.raises(ProgrammingError): @@ -61,7 +61,7 @@ async def test_context_manager(self): try: async with conn.cursor() as cursor: await cursor.execute("SELECT * FROM one_row") - assert cursor.fetchone() == (1,) + assert await cursor.fetchone() == (1,) finally: conn.close() @@ -76,7 +76,7 @@ async def test_invalid_arraysize(self, aio_polars_cursor): async def test_description(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT CAST(1 AS INT) AS foobar FROM one_row") - assert aio_polars_cursor.fetchall() == [(1,)] + assert await aio_polars_cursor.fetchall() == [(1,)] assert aio_polars_cursor.description == [ ("foobar", "integer", None, None, 10, 0, "UNKNOWN") ] @@ -93,11 +93,11 @@ async def test_executemany_fetch(self, aio_polars_cursor): "SELECT %(x)d FROM one_row", [{"x": i} for i in range(1, 2)] ) with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchall() + await aio_polars_cursor.fetchall() with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchmany() + await aio_polars_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchone() + await aio_polars_cursor.fetchone() with pytest.raises(ProgrammingError): aio_polars_cursor.as_polars() with pytest.raises(ProgrammingError): @@ -110,8 +110,8 @@ async def test_executemany_fetch(self, aio_polars_cursor): ) async def test_fetchone_unload(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM one_row") - assert aio_polars_cursor.fetchone() == (1,) - assert aio_polars_cursor.fetchone() is None + assert await aio_polars_cursor.fetchone() == (1,) + assert await aio_polars_cursor.fetchone() is None @pytest.mark.parametrize( "aio_polars_cursor", From ca788ba249bf1e896e9153ecadd81a86d1e330e7 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 21 Feb 2026 14:34:57 +0900 Subject: [PATCH 3/3] Set asyncio_default_fixture_loop_scope to suppress pytest-asyncio warning pytest-asyncio warns that future versions will default the loop scope for async fixtures to function scope. Set it explicitly to "function" to suppress the deprecation warning and ensure consistent behavior. Co-Authored-By: Claude Opus 4.6 --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 0bf7f15e..6106c255 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,7 @@ version-file = "pyathena/_version.py" [tool.pytest.ini_options] asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" norecursedirs = [ "benchmarks", ".venv",