diff --git a/docs/aio.md b/docs/aio.md index 681f2e81..f33415a5 100644 --- a/docs/aio.md +++ b/docs/aio.md @@ -177,38 +177,19 @@ Native asyncio versions are available for all cursor types: ### Fetch behavior -For **AioPandasCursor**, **AioArrowCursor**, and **AioPolarsCursor**, the S3 download -(CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. -By the time `execute()` returns, all data is already loaded into memory. -Therefore `fetchone()`, `fetchall()`, `as_pandas()`, `as_arrow()`, and `as_polars()` -are synchronous (in-memory only) and do not need `await`: +All aio cursors use `await` for fetch operations. The S3 download (CSV or Parquet) +happens inside `execute()`, wrapped in `asyncio.to_thread()`. Fetch methods are also +wrapped in `asyncio.to_thread()` to ensure the event loop is never blocked — this is +especially important when `chunksize` is set, as fetch calls trigger lazy S3 reads. ```python -# Pandas, Arrow, Polars — S3 download completes during execute() -await cursor.execute("SELECT * FROM many_rows") # Downloads data here -row = cursor.fetchone() # No await — data already in memory -rows = cursor.fetchall() # No await -df = cursor.as_pandas() # No await -``` - -The exceptions are **AioCursor** and **AioS3FSCursor**, which stream rows lazily from S3. -Their fetch methods perform I/O and require `await`: - -```python -# AioCursor, AioS3FSCursor — fetch reads from S3 lazily await cursor.execute("SELECT * FROM many_rows") -row = await cursor.fetchone() # Await required — reads from S3 -rows = await cursor.fetchall() # Await required +row = await cursor.fetchone() +rows = await cursor.fetchall() +df = cursor.as_pandas() # In-memory conversion, no await needed ``` -```{note} -When using AioPandasCursor or AioPolarsCursor with the `chunksize` option, -`execute()` creates a lazy reader (e.g., pandas `TextFileReader`) instead of -loading all data at once. 
Subsequent iteration via `as_pandas()`, `fetchone()`, -or `async for` triggers chunk-by-chunk S3 reads that are **not** wrapped in -`asyncio.to_thread()` and will block the event loop. If you need chunked -processing in an async application, consider wrapping the iteration in -`asyncio.to_thread()` yourself, or use the default non-chunked mode. -``` +The `as_pandas()`, `as_arrow()`, and `as_polars()` convenience methods operate on +already-loaded data and remain synchronous. See each cursor's documentation page for detailed usage examples. diff --git a/docs/arrow.md b/docs/arrow.md index 49121bbc..419808fc 100644 --- a/docs/arrow.md +++ b/docs/arrow.md @@ -487,11 +487,8 @@ cursor = connect( AioArrowCursor is a native asyncio cursor that returns results as Apache Arrow Tables. Unlike AsyncArrowCursor which uses `concurrent.futures`, this cursor uses -`asyncio.to_thread()` for result set creation, keeping the event loop free. - -The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. -By the time `execute()` returns, all data is already loaded into memory. -Therefore fetch methods, `as_arrow()`, and `as_polars()` are synchronous and do not need `await`. +`asyncio.to_thread()` for both result set creation and fetch operations, +keeping the event loop free. 
```python from pyathena import aconnect @@ -517,9 +514,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", region_name="us-west-2") as conn: cursor = conn.cursor(AioArrowCursor) await cursor.execute("SELECT * FROM many_rows") - print(cursor.fetchone()) - print(cursor.fetchmany()) - print(cursor.fetchall()) + print(await cursor.fetchone()) + print(await cursor.fetchmany()) + print(await cursor.fetchall()) ``` ```python diff --git a/docs/pandas.md b/docs/pandas.md index a26c4bde..4e8886cb 100644 --- a/docs/pandas.md +++ b/docs/pandas.md @@ -774,11 +774,8 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", AioPandasCursor is a native asyncio cursor that returns results as pandas DataFrames. Unlike AsyncPandasCursor which uses `concurrent.futures`, this cursor uses -`asyncio.to_thread()` for result set creation, keeping the event loop free. - -The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. -By the time `execute()` returns, all data is already loaded into memory. -Therefore fetch methods and `as_pandas()` are synchronous and do not need `await`. +`asyncio.to_thread()` for both result set creation and fetch operations, +keeping the event loop free. ```python from pyathena import aconnect @@ -803,9 +800,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", region_name="us-west-2") as conn: cursor = conn.cursor(AioPandasCursor) await cursor.execute("SELECT * FROM many_rows") - print(cursor.fetchone()) - print(cursor.fetchmany()) - print(cursor.fetchall()) + print(await cursor.fetchone()) + print(await cursor.fetchmany()) + print(await cursor.fetchall()) ``` ```python @@ -833,9 +830,3 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", df = cursor.as_pandas() ``` -```{note} -When using AioPandasCursor with the `chunksize` option, `execute()` creates a lazy -`TextFileReader` instead of loading all data at once. 
Subsequent iteration via -`as_pandas()`, `fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that -are not wrapped in `asyncio.to_thread()` and will block the event loop. -``` diff --git a/docs/polars.md b/docs/polars.md index fed8887f..6c50cab7 100644 --- a/docs/polars.md +++ b/docs/polars.md @@ -581,11 +581,8 @@ for chunk in result_set.iter_chunks(): AioPolarsCursor is a native asyncio cursor that returns results as Polars DataFrames. Unlike AsyncPolarsCursor which uses `concurrent.futures`, this cursor uses -`asyncio.to_thread()` for result set creation, keeping the event loop free. - -The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. -By the time `execute()` returns, all data is already loaded into memory. -Therefore fetch methods, `as_polars()`, and `as_arrow()` are synchronous and do not need `await`. +`asyncio.to_thread()` for both result set creation and fetch operations, +keeping the event loop free. ```python from pyathena import aconnect @@ -610,9 +607,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", region_name="us-west-2") as conn: cursor = conn.cursor(AioPolarsCursor) await cursor.execute("SELECT * FROM many_rows") - print(cursor.fetchone()) - print(cursor.fetchmany()) - print(cursor.fetchall()) + print(await cursor.fetchone()) + print(await cursor.fetchmany()) + print(await cursor.fetchall()) ``` ```python @@ -653,9 +650,3 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", df = cursor.as_polars() ``` -```{note} -When using AioPolarsCursor with the `chunksize` option, `execute()` creates a lazy -reader instead of loading all data at once. Subsequent iteration via `as_polars()`, -`fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that are not wrapped -in `asyncio.to_thread()` and will block the event loop. 
-``` diff --git a/pyathena/aio/arrow/cursor.py b/pyathena/aio/arrow/cursor.py index cb7b9c2f..4ed51845 100644 --- a/pyathena/aio/arrow/cursor.py +++ b/pyathena/aio/arrow/cursor.py @@ -3,7 +3,7 @@ import asyncio import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast from pyathena.aio.common import WithAsyncFetch from pyathena.arrow.converter import ( @@ -25,9 +25,8 @@ class AioArrowCursor(WithAsyncFetch): """Native asyncio cursor that returns results as Apache Arrow Tables. - Uses ``asyncio.to_thread()`` to create the result set off the event loop. - Since ``AthenaArrowResultSet`` loads all data in ``__init__`` (via S3), - fetch methods are synchronous (in-memory only) and do not need to be async. + Uses ``asyncio.to_thread()`` for both result set creation and fetch + operations, keeping the event loop free. Example: >>> async with await pyathena.aconnect(...) as conn: @@ -153,6 +152,72 @@ async def execute( # type: ignore[override] raise OperationalError(query_execution.state_change_reason) return self + async def fetchone( # type: ignore[override] + self, + ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch the next row of the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop. + + Returns: + A tuple representing the next row, or None if no more rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaArrowResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchone) + + async def fetchmany( # type: ignore[override] + self, size: Optional[int] = None + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch multiple rows from the result set. 
+ + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop. + + Args: + size: Maximum number of rows to fetch. Defaults to arraysize. + + Returns: + List of tuples representing the fetched rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaArrowResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchmany, size) + + async def fetchall( # type: ignore[override] + self, + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch all remaining rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop. + + Returns: + List of tuples representing all remaining rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaArrowResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchall) + + async def __anext__(self): + row = await self.fetchone() + if row is None: + raise StopAsyncIteration + return row + def as_arrow(self) -> "Table": """Return query results as an Apache Arrow Table. diff --git a/pyathena/aio/pandas/cursor.py b/pyathena/aio/pandas/cursor.py index fbee2625..9d0e0ca3 100644 --- a/pyathena/aio/pandas/cursor.py +++ b/pyathena/aio/pandas/cursor.py @@ -11,6 +11,7 @@ Iterable, List, Optional, + Tuple, Union, cast, ) @@ -34,9 +35,9 @@ class AioPandasCursor(WithAsyncFetch): """Native asyncio cursor that returns results as pandas DataFrames. - Uses ``asyncio.to_thread()`` to create the result set off the event loop. - Since ``AthenaPandasResultSet`` loads all data in ``__init__`` (via S3), - fetch methods are synchronous (in-memory only) and do not need to be async. + Uses ``asyncio.to_thread()`` for both result set creation and fetch + operations, keeping the event loop free. 
This is especially important + when ``chunksize`` is set, as fetch calls trigger lazy S3 reads. Example: >>> async with await pyathena.aconnect(...) as conn: @@ -183,6 +184,72 @@ async def execute( # type: ignore[override] raise OperationalError(query_execution.state_change_reason) return self + async def fetchone( # type: ignore[override] + self, + ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch the next row of the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Returns: + A tuple representing the next row, or None if no more rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPandasResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchone) + + async def fetchmany( # type: ignore[override] + self, size: Optional[int] = None + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch multiple rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Args: + size: Maximum number of rows to fetch. Defaults to arraysize. + + Returns: + List of tuples representing the fetched rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPandasResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchmany, size) + + async def fetchall( # type: ignore[override] + self, + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch all remaining rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. 
+ + Returns: + List of tuples representing all remaining rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPandasResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchall) + + async def __anext__(self): + row = await self.fetchone() + if row is None: + raise StopAsyncIteration + return row + def as_pandas(self) -> Union["DataFrame", PandasDataFrameIterator]: """Return DataFrame or PandasDataFrameIterator based on chunksize setting. diff --git a/pyathena/aio/polars/cursor.py b/pyathena/aio/polars/cursor.py index 1f897f9d..f12000eb 100644 --- a/pyathena/aio/polars/cursor.py +++ b/pyathena/aio/polars/cursor.py @@ -4,7 +4,7 @@ import asyncio import logging from multiprocessing import cpu_count -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast from pyathena.aio.common import WithAsyncFetch from pyathena.common import CursorIterator @@ -26,9 +26,9 @@ class AioPolarsCursor(WithAsyncFetch): """Native asyncio cursor that returns results as Polars DataFrames. - Uses ``asyncio.to_thread()`` to create the result set off the event loop. - Since ``AthenaPolarsResultSet`` loads all data in ``__init__`` (via S3), - fetch methods are synchronous (in-memory only) and do not need to be async. + Uses ``asyncio.to_thread()`` for both result set creation and fetch + operations, keeping the event loop free. This is especially important + when ``chunksize`` is set, as fetch calls trigger lazy S3 reads. Example: >>> async with await pyathena.aconnect(...) 
as conn: @@ -160,6 +160,72 @@ async def execute( # type: ignore[override] raise OperationalError(query_execution.state_change_reason) return self + async def fetchone( # type: ignore[override] + self, + ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch the next row of the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Returns: + A tuple representing the next row, or None if no more rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPolarsResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchone) + + async def fetchmany( # type: ignore[override] + self, size: Optional[int] = None + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch multiple rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Args: + size: Maximum number of rows to fetch. Defaults to arraysize. + + Returns: + List of tuples representing the fetched rows. + + Raises: + ProgrammingError: If no result set is available. + """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPolarsResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchmany, size) + + async def fetchall( # type: ignore[override] + self, + ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: + """Fetch all remaining rows from the result set. + + Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid + blocking the event loop when ``chunksize`` triggers lazy S3 reads. + + Returns: + List of tuples representing all remaining rows. + + Raises: + ProgrammingError: If no result set is available. 
+ """ + if not self.has_result_set: + raise ProgrammingError("No result set.") + result_set = cast(AthenaPolarsResultSet, self.result_set) + return await asyncio.to_thread(result_set.fetchall) + + async def __anext__(self): + row = await self.fetchone() + if row is None: + raise StopAsyncIteration + return row + def as_polars(self) -> "pl.DataFrame": """Return query results as a Polars DataFrame. diff --git a/pyproject.toml b/pyproject.toml index 0bf7f15e..6106c255 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,6 +110,7 @@ version-file = "pyathena/_version.py" [tool.pytest.ini_options] asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" norecursedirs = [ "benchmarks", ".venv", diff --git a/tests/pyathena/aio/arrow/test_cursor.py b/tests/pyathena/aio/arrow/test_cursor.py index de5d13a5..efc8f6e6 100644 --- a/tests/pyathena/aio/arrow/test_cursor.py +++ b/tests/pyathena/aio/arrow/test_cursor.py @@ -11,20 +11,20 @@ class TestAioArrowCursor: async def test_fetchone(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM one_row") assert aio_arrow_cursor.rownumber == 0 - assert aio_arrow_cursor.fetchone() == (1,) + assert await aio_arrow_cursor.fetchone() == (1,) assert aio_arrow_cursor.rownumber == 1 - assert aio_arrow_cursor.fetchone() is None + assert await aio_arrow_cursor.fetchone() is None async def test_fetchmany(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM many_rows LIMIT 15") - assert len(aio_arrow_cursor.fetchmany(10)) == 10 - assert len(aio_arrow_cursor.fetchmany(10)) == 5 + assert len(await aio_arrow_cursor.fetchmany(10)) == 10 + assert len(await aio_arrow_cursor.fetchmany(10)) == 5 async def test_fetchall(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM one_row") - assert aio_arrow_cursor.fetchall() == [(1,)] + assert await aio_arrow_cursor.fetchall() == [(1,)] await aio_arrow_cursor.execute("SELECT a FROM many_rows ORDER BY a") - assert aio_arrow_cursor.fetchall() == 
[(i,) for i in range(10000)] + assert await aio_arrow_cursor.fetchall() == [(i,) for i in range(10000)] async def test_as_arrow(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM one_row") @@ -45,11 +45,11 @@ async def test_execute_returns_self(self, aio_arrow_cursor): async def test_no_result_set_raises(self, aio_arrow_cursor): with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchone() + await aio_arrow_cursor.fetchone() with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchmany() + await aio_arrow_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchall() + await aio_arrow_cursor.fetchall() with pytest.raises(ProgrammingError): aio_arrow_cursor.as_arrow() with pytest.raises(ProgrammingError): @@ -62,7 +62,7 @@ async def test_context_manager(self): try: async with conn.cursor() as cursor: await cursor.execute("SELECT * FROM one_row") - assert cursor.fetchone() == (1,) + assert await cursor.fetchone() == (1,) finally: conn.close() @@ -77,7 +77,7 @@ async def test_invalid_arraysize(self, aio_arrow_cursor): async def test_description(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT CAST(1 AS INT) AS foobar FROM one_row") - assert aio_arrow_cursor.fetchall() == [(1,)] + assert await aio_arrow_cursor.fetchall() == [(1,)] assert aio_arrow_cursor.description == [("foobar", "integer", None, None, 10, 0, "UNKNOWN")] async def test_description_initial(self, aio_arrow_cursor): @@ -92,11 +92,11 @@ async def test_executemany_fetch(self, aio_arrow_cursor): "SELECT %(x)d FROM one_row", [{"x": i} for i in range(1, 2)] ) with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchall() + await aio_arrow_cursor.fetchall() with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchmany() + await aio_arrow_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_arrow_cursor.fetchone() + await aio_arrow_cursor.fetchone() with pytest.raises(ProgrammingError): aio_arrow_cursor.as_arrow() with 
pytest.raises(ProgrammingError): @@ -109,8 +109,8 @@ async def test_executemany_fetch(self, aio_arrow_cursor): ) async def test_fetchone_unload(self, aio_arrow_cursor): await aio_arrow_cursor.execute("SELECT * FROM one_row") - assert aio_arrow_cursor.fetchone() == (1,) - assert aio_arrow_cursor.fetchone() is None + assert await aio_arrow_cursor.fetchone() == (1,) + assert await aio_arrow_cursor.fetchone() is None @pytest.mark.parametrize( "aio_arrow_cursor", diff --git a/tests/pyathena/aio/pandas/test_cursor.py b/tests/pyathena/aio/pandas/test_cursor.py index 0b615c04..6e3794e7 100644 --- a/tests/pyathena/aio/pandas/test_cursor.py +++ b/tests/pyathena/aio/pandas/test_cursor.py @@ -11,20 +11,20 @@ class TestAioPandasCursor: async def test_fetchone(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM one_row") assert aio_pandas_cursor.rownumber == 0 - assert aio_pandas_cursor.fetchone() == (1,) + assert await aio_pandas_cursor.fetchone() == (1,) assert aio_pandas_cursor.rownumber == 1 - assert aio_pandas_cursor.fetchone() is None + assert await aio_pandas_cursor.fetchone() is None async def test_fetchmany(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM many_rows LIMIT 15") - assert len(aio_pandas_cursor.fetchmany(10)) == 10 - assert len(aio_pandas_cursor.fetchmany(10)) == 5 + assert len(await aio_pandas_cursor.fetchmany(10)) == 10 + assert len(await aio_pandas_cursor.fetchmany(10)) == 5 async def test_fetchall(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM one_row") - assert aio_pandas_cursor.fetchall() == [(1,)] + assert await aio_pandas_cursor.fetchall() == [(1,)] await aio_pandas_cursor.execute("SELECT a FROM many_rows ORDER BY a") - assert aio_pandas_cursor.fetchall() == [(i,) for i in range(10000)] + assert await aio_pandas_cursor.fetchall() == [(i,) for i in range(10000)] async def test_as_pandas(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM one_row") @@ -39,11 
+39,11 @@ async def test_execute_returns_self(self, aio_pandas_cursor): async def test_no_result_set_raises(self, aio_pandas_cursor): with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchone() + await aio_pandas_cursor.fetchone() with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchmany() + await aio_pandas_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchall() + await aio_pandas_cursor.fetchall() with pytest.raises(ProgrammingError): aio_pandas_cursor.as_pandas() @@ -54,7 +54,7 @@ async def test_context_manager(self): try: async with conn.cursor() as cursor: await cursor.execute("SELECT * FROM one_row") - assert cursor.fetchone() == (1,) + assert await cursor.fetchone() == (1,) finally: conn.close() @@ -69,7 +69,7 @@ async def test_invalid_arraysize(self, aio_pandas_cursor): async def test_description(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT CAST(1 AS INT) AS foobar FROM one_row") - assert aio_pandas_cursor.fetchall() == [(1,)] + assert await aio_pandas_cursor.fetchall() == [(1,)] assert aio_pandas_cursor.description == [ ("foobar", "integer", None, None, 10, 0, "UNKNOWN") ] @@ -86,11 +86,11 @@ async def test_executemany_fetch(self, aio_pandas_cursor): "SELECT %(x)d FROM one_row", [{"x": i} for i in range(1, 2)] ) with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchall() + await aio_pandas_cursor.fetchall() with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchmany() + await aio_pandas_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_pandas_cursor.fetchone() + await aio_pandas_cursor.fetchone() with pytest.raises(ProgrammingError): aio_pandas_cursor.as_pandas() @@ -101,8 +101,8 @@ async def test_executemany_fetch(self, aio_pandas_cursor): ) async def test_fetchone_unload(self, aio_pandas_cursor): await aio_pandas_cursor.execute("SELECT * FROM one_row") - assert aio_pandas_cursor.fetchone() == (1,) - assert aio_pandas_cursor.fetchone() is None + assert 
await aio_pandas_cursor.fetchone() == (1,) + assert await aio_pandas_cursor.fetchone() is None @pytest.mark.parametrize( "aio_pandas_cursor", diff --git a/tests/pyathena/aio/polars/test_cursor.py b/tests/pyathena/aio/polars/test_cursor.py index 17242a4f..81309fe5 100644 --- a/tests/pyathena/aio/polars/test_cursor.py +++ b/tests/pyathena/aio/polars/test_cursor.py @@ -11,20 +11,20 @@ class TestAioPolarsCursor: async def test_fetchone(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM one_row") assert aio_polars_cursor.rownumber == 0 - assert aio_polars_cursor.fetchone() == (1,) + assert await aio_polars_cursor.fetchone() == (1,) assert aio_polars_cursor.rownumber == 1 - assert aio_polars_cursor.fetchone() is None + assert await aio_polars_cursor.fetchone() is None async def test_fetchmany(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM many_rows LIMIT 15") - assert len(aio_polars_cursor.fetchmany(10)) == 10 - assert len(aio_polars_cursor.fetchmany(10)) == 5 + assert len(await aio_polars_cursor.fetchmany(10)) == 10 + assert len(await aio_polars_cursor.fetchmany(10)) == 5 async def test_fetchall(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM one_row") - assert aio_polars_cursor.fetchall() == [(1,)] + assert await aio_polars_cursor.fetchall() == [(1,)] await aio_polars_cursor.execute("SELECT a FROM many_rows ORDER BY a") - assert aio_polars_cursor.fetchall() == [(i,) for i in range(10000)] + assert await aio_polars_cursor.fetchall() == [(i,) for i in range(10000)] async def test_as_polars(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM one_row") @@ -44,11 +44,11 @@ async def test_execute_returns_self(self, aio_polars_cursor): async def test_no_result_set_raises(self, aio_polars_cursor): with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchone() + await aio_polars_cursor.fetchone() with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchmany() + await 
aio_polars_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchall() + await aio_polars_cursor.fetchall() with pytest.raises(ProgrammingError): aio_polars_cursor.as_polars() with pytest.raises(ProgrammingError): @@ -61,7 +61,7 @@ async def test_context_manager(self): try: async with conn.cursor() as cursor: await cursor.execute("SELECT * FROM one_row") - assert cursor.fetchone() == (1,) + assert await cursor.fetchone() == (1,) finally: conn.close() @@ -76,7 +76,7 @@ async def test_invalid_arraysize(self, aio_polars_cursor): async def test_description(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT CAST(1 AS INT) AS foobar FROM one_row") - assert aio_polars_cursor.fetchall() == [(1,)] + assert await aio_polars_cursor.fetchall() == [(1,)] assert aio_polars_cursor.description == [ ("foobar", "integer", None, None, 10, 0, "UNKNOWN") ] @@ -93,11 +93,11 @@ async def test_executemany_fetch(self, aio_polars_cursor): "SELECT %(x)d FROM one_row", [{"x": i} for i in range(1, 2)] ) with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchall() + await aio_polars_cursor.fetchall() with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchmany() + await aio_polars_cursor.fetchmany() with pytest.raises(ProgrammingError): - aio_polars_cursor.fetchone() + await aio_polars_cursor.fetchone() with pytest.raises(ProgrammingError): aio_polars_cursor.as_polars() with pytest.raises(ProgrammingError): @@ -110,8 +110,8 @@ async def test_executemany_fetch(self, aio_polars_cursor): ) async def test_fetchone_unload(self, aio_polars_cursor): await aio_polars_cursor.execute("SELECT * FROM one_row") - assert aio_polars_cursor.fetchone() == (1,) - assert aio_polars_cursor.fetchone() is None + assert await aio_polars_cursor.fetchone() == (1,) + assert await aio_polars_cursor.fetchone() is None @pytest.mark.parametrize( "aio_polars_cursor",
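The core pattern this change introduces — delegating a potentially blocking fetch to a worker thread via `asyncio.to_thread()` so the event loop stays free, plus an `__anext__` that terminates on `None` — can be sketched in isolation. The names below (`FakeResultSet`, `AioFetchMixin`) are illustrative stand-ins, not PyAthena's actual classes:

```python
import asyncio
from typing import Any, List, Optional, Tuple


class FakeResultSet:
    """Stand-in for a result set whose fetches may block on S3 I/O."""

    def __init__(self, rows: List[Tuple[Any, ...]]) -> None:
        self._rows = iter(rows)

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        # Synchronous; in the real cursor this may trigger a lazy S3 read
        # when chunksize is set.
        return next(self._rows, None)


class AioFetchMixin:
    """Mimics the diff's pattern: wrap each sync fetch in asyncio.to_thread()."""

    def __init__(self, result_set: FakeResultSet) -> None:
        self._result_set = result_set

    async def fetchone(self) -> Optional[Tuple[Any, ...]]:
        # Runs the blocking call in a thread so the event loop is never blocked.
        return await asyncio.to_thread(self._result_set.fetchone)

    def __aiter__(self) -> "AioFetchMixin":
        return self

    async def __anext__(self) -> Tuple[Any, ...]:
        row = await self.fetchone()
        if row is None:
            raise StopAsyncIteration
        return row


async def main() -> List[Tuple[int, ...]]:
    cursor = AioFetchMixin(FakeResultSet([(1,), (2,), (3,)]))
    return [row async for row in cursor]


rows = asyncio.run(main())
print(rows)  # [(1,), (2,), (3,)]
```

This is the same shape as the `fetchone`/`fetchmany`/`fetchall` overrides added to each aio cursor above: the synchronous result-set method does the work, and the async wrapper only moves it off the event loop. Note that `asyncio.to_thread()` requires Python 3.9+.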