Merged
37 changes: 9 additions & 28 deletions docs/aio.md
@@ -177,38 +177,19 @@ Native asyncio versions are available for all cursor types:
 
 ### Fetch behavior
 
-For **AioPandasCursor**, **AioArrowCursor**, and **AioPolarsCursor**, the S3 download
-(CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`.
-By the time `execute()` returns, all data is already loaded into memory.
-Therefore `fetchone()`, `fetchall()`, `as_pandas()`, `as_arrow()`, and `as_polars()`
-are synchronous (in-memory only) and do not need `await`:
+All aio cursors use `await` for fetch operations. The S3 download (CSV or Parquet)
+happens inside `execute()`, wrapped in `asyncio.to_thread()`. Fetch methods are also
+wrapped in `asyncio.to_thread()` to ensure the event loop is never blocked — this is
+especially important when `chunksize` is set, as fetch calls trigger lazy S3 reads.
 
-```python
-# Pandas, Arrow, Polars — S3 download completes during execute()
-await cursor.execute("SELECT * FROM many_rows")  # Downloads data here
-row = cursor.fetchone()    # No await — data already in memory
-rows = cursor.fetchall()   # No await
-df = cursor.as_pandas()    # No await
-```
-
-The exceptions are **AioCursor** and **AioS3FSCursor**, which stream rows lazily from S3.
-Their fetch methods perform I/O and require `await`:
-
 ```python
-# AioCursor, AioS3FSCursor — fetch reads from S3 lazily
 await cursor.execute("SELECT * FROM many_rows")
-row = await cursor.fetchone()     # Await required — reads from S3
-rows = await cursor.fetchall()    # Await required
+row = await cursor.fetchone()
+rows = await cursor.fetchall()
 df = cursor.as_pandas()  # In-memory conversion, no await needed
 ```
 
-```{note}
-When using AioPandasCursor or AioPolarsCursor with the `chunksize` option,
-`execute()` creates a lazy reader (e.g., pandas `TextFileReader`) instead of
-loading all data at once. Subsequent iteration via `as_pandas()`, `fetchone()`,
-or `async for` triggers chunk-by-chunk S3 reads that are **not** wrapped in
-`asyncio.to_thread()` and will block the event loop. If you need chunked
-processing in an async application, consider wrapping the iteration in
-`asyncio.to_thread()` yourself, or use the default non-chunked mode.
-```
+The `as_pandas()`, `as_arrow()`, and `as_polars()` convenience methods operate on
+already-loaded data and remain synchronous.
 
 See each cursor's documentation page for detailed usage examples.
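The awaited-fetch pattern this page documents can be sketched without Athena at all. The toy classes below are hypothetical stand-ins (not pyathena's actual API): a result set whose `fetchone()` blocks on I/O, and a cursor that offloads each call with `asyncio.to_thread()` so the event loop stays responsive:

```python
import asyncio
import time


class ToyResultSet:
    """Stand-in for an Athena result set: fetchone() blocks on I/O."""

    def __init__(self, rows):
        self._rows = iter(rows)

    def fetchone(self):
        time.sleep(0.01)  # simulate a blocking S3 read
        return next(self._rows, None)


class ToyAioCursor:
    """Mimics the aio-cursor pattern: blocking fetch offloaded to a thread."""

    def __init__(self, result_set):
        self._result_set = result_set

    async def fetchone(self):
        # The blocking call runs in a worker thread; the event loop stays free.
        return await asyncio.to_thread(self._result_set.fetchone)

    async def fetchall(self):
        rows = []
        while (row := await self.fetchone()) is not None:
            rows.append(row)
        return rows


async def main():
    cursor = ToyAioCursor(ToyResultSet([(1, "a"), (2, "b")]))
    return await cursor.fetchall()


print(asyncio.run(main()))  # → [(1, 'a'), (2, 'b')]
```

While the sleep runs in the worker thread, other coroutines on the same loop continue to make progress, which is the whole point of the change described above.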
13 changes: 5 additions & 8 deletions docs/arrow.md
@@ -487,11 +487,8 @@ cursor = connect(
 
 AioArrowCursor is a native asyncio cursor that returns results as Apache Arrow Tables.
 Unlike AsyncArrowCursor which uses `concurrent.futures`, this cursor uses
-`asyncio.to_thread()` for result set creation, keeping the event loop free.
-
-The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`.
-By the time `execute()` returns, all data is already loaded into memory.
-Therefore fetch methods, `as_arrow()`, and `as_polars()` are synchronous and do not need `await`.
+`asyncio.to_thread()` for both result set creation and fetch operations,
+keeping the event loop free.
 
 ```python
 from pyathena import aconnect
@@ -517,9 +514,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                           region_name="us-west-2") as conn:
     cursor = conn.cursor(AioArrowCursor)
     await cursor.execute("SELECT * FROM many_rows")
-    print(cursor.fetchone())
-    print(cursor.fetchmany())
-    print(cursor.fetchall())
+    print(await cursor.fetchone())
+    print(await cursor.fetchmany())
+    print(await cursor.fetchall())
 ```

19 changes: 5 additions & 14 deletions docs/pandas.md
@@ -774,11 +774,8 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
 
 AioPandasCursor is a native asyncio cursor that returns results as pandas DataFrames.
 Unlike AsyncPandasCursor which uses `concurrent.futures`, this cursor uses
-`asyncio.to_thread()` for result set creation, keeping the event loop free.
-
-The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`.
-By the time `execute()` returns, all data is already loaded into memory.
-Therefore fetch methods and `as_pandas()` are synchronous and do not need `await`.
+`asyncio.to_thread()` for both result set creation and fetch operations,
+keeping the event loop free.
 
 ```python
 from pyathena import aconnect
@@ -803,9 +800,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                           region_name="us-west-2") as conn:
     cursor = conn.cursor(AioPandasCursor)
     await cursor.execute("SELECT * FROM many_rows")
-    print(cursor.fetchone())
-    print(cursor.fetchmany())
-    print(cursor.fetchall())
+    print(await cursor.fetchone())
+    print(await cursor.fetchmany())
+    print(await cursor.fetchall())
 ```

@@ -833,9 +830,3 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
     df = cursor.as_pandas()
 ```
 
-```{note}
-When using AioPandasCursor with the `chunksize` option, `execute()` creates a lazy
-`TextFileReader` instead of loading all data at once. Subsequent iteration via
-`as_pandas()`, `fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that
-are not wrapped in `asyncio.to_thread()` and will block the event loop.
-```
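The note removed here warned that chunked iteration used to block the event loop; with this change, lazy reads are offloaded too. A self-contained sketch of that idea, using a toy generator as a stand-in for pandas' lazy `TextFileReader` (not pyathena's actual code): each blocking `next()` call is pushed onto a worker thread with `asyncio.to_thread()`, and a sentinel default keeps `StopIteration` from crossing the thread boundary:

```python
import asyncio
import time


def lazy_chunks(n_chunks):
    """Stand-in for a lazy chunked reader: each chunk blocks on I/O."""
    for i in range(n_chunks):
        time.sleep(0.01)  # simulate a chunk-by-chunk S3 read
        yield [i * 10, i * 10 + 1]  # a toy "chunk" of two rows


async def consume(reader):
    # next() on the lazy reader blocks, so it runs in a worker thread;
    # the None sentinel is returned instead of raising StopIteration.
    rows = []
    while (chunk := await asyncio.to_thread(next, reader, None)) is not None:
        rows.extend(chunk)
    return rows


print(asyncio.run(consume(lazy_chunks(3))))  # → [0, 1, 10, 11, 20, 21]
```

The sentinel matters: `asyncio.to_thread(next, reader)` without a default would let `StopIteration` escape into the coroutine machinery, which asyncio does not handle gracefully.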
19 changes: 5 additions & 14 deletions docs/polars.md
@@ -581,11 +581,8 @@ for chunk in result_set.iter_chunks():
 
 AioPolarsCursor is a native asyncio cursor that returns results as Polars DataFrames.
 Unlike AsyncPolarsCursor which uses `concurrent.futures`, this cursor uses
-`asyncio.to_thread()` for result set creation, keeping the event loop free.
-
-The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`.
-By the time `execute()` returns, all data is already loaded into memory.
-Therefore fetch methods, `as_polars()`, and `as_arrow()` are synchronous and do not need `await`.
+`asyncio.to_thread()` for both result set creation and fetch operations,
+keeping the event loop free.
 
 ```python
 from pyathena import aconnect
@@ -610,9 +607,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                           region_name="us-west-2") as conn:
     cursor = conn.cursor(AioPolarsCursor)
     await cursor.execute("SELECT * FROM many_rows")
-    print(cursor.fetchone())
-    print(cursor.fetchmany())
-    print(cursor.fetchall())
+    print(await cursor.fetchone())
+    print(await cursor.fetchmany())
+    print(await cursor.fetchall())
 ```

@@ -653,9 +650,3 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
     df = cursor.as_polars()
 ```
 
-```{note}
-When using AioPolarsCursor with the `chunksize` option, `execute()` creates a lazy
-reader instead of loading all data at once. Subsequent iteration via `as_polars()`,
-`fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that are not wrapped
-in `asyncio.to_thread()` and will block the event loop.
-```
73 changes: 69 additions & 4 deletions pyathena/aio/arrow/cursor.py
@@ -3,7 +3,7 @@
 
 import asyncio
 import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
 
 from pyathena.aio.common import WithAsyncFetch
 from pyathena.arrow.converter import (
@@ -25,9 +25,8 @@
 class AioArrowCursor(WithAsyncFetch):
     """Native asyncio cursor that returns results as Apache Arrow Tables.
 
-    Uses ``asyncio.to_thread()`` to create the result set off the event loop.
-    Since ``AthenaArrowResultSet`` loads all data in ``__init__`` (via S3),
-    fetch methods are synchronous (in-memory only) and do not need to be async.
+    Uses ``asyncio.to_thread()`` for both result set creation and fetch
+    operations, keeping the event loop free.
 
     Example:
         >>> async with await pyathena.aconnect(...) as conn:
@@ -153,6 +152,72 @@ async def execute(  # type: ignore[override]
             raise OperationalError(query_execution.state_change_reason)
         return self
 
+    async def fetchone(  # type: ignore[override]
+        self,
+    ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch the next row of the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop.
+
+        Returns:
+            A tuple representing the next row, or None if no more rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaArrowResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchone)
+
+    async def fetchmany(  # type: ignore[override]
+        self, size: Optional[int] = None
+    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch multiple rows from the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop.
+
+        Args:
+            size: Maximum number of rows to fetch. Defaults to arraysize.
+
+        Returns:
+            List of tuples representing the fetched rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaArrowResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchmany, size)
+
+    async def fetchall(  # type: ignore[override]
+        self,
+    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch all remaining rows from the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop.
+
+        Returns:
+            List of tuples representing all remaining rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaArrowResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchall)
+
+    async def __anext__(self):
+        row = await self.fetchone()
+        if row is None:
+            raise StopAsyncIteration
+        return row
+
     def as_arrow(self) -> "Table":
         """Return query results as an Apache Arrow Table.
 
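The `__anext__` added in this file is what makes `async for row in cursor` work. A minimal self-contained sketch of the same protocol with a toy cursor (hypothetical names, no Athena connection required):

```python
import asyncio


class ToyAsyncCursor:
    """Minimal sketch of the async-iteration protocol used by the aio cursors."""

    def __init__(self, rows):
        self._rows = list(rows)

    def _fetchone_blocking(self):
        # Stand-in for a synchronous, potentially I/O-bound fetch.
        return self._rows.pop(0) if self._rows else None

    async def fetchone(self):
        return await asyncio.to_thread(self._fetchone_blocking)

    def __aiter__(self):
        return self

    async def __anext__(self):
        row = await self.fetchone()
        if row is None:
            raise StopAsyncIteration  # ends the async for loop
        return row


async def main():
    collected = []
    async for row in ToyAsyncCursor([("a",), ("b",)]):
        collected.append(row)
    return collected


print(asyncio.run(main()))  # → [('a',), ('b',)]
```

Raising `StopAsyncIteration` when the underlying fetch returns `None` is the async counterpart of a DB API cursor exhausting its rows.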
73 changes: 70 additions & 3 deletions pyathena/aio/pandas/cursor.py
@@ -11,6 +11,7 @@
     Iterable,
     List,
     Optional,
+    Tuple,
     Union,
     cast,
 )
@@ -34,9 +35,9 @@
 class AioPandasCursor(WithAsyncFetch):
     """Native asyncio cursor that returns results as pandas DataFrames.
 
-    Uses ``asyncio.to_thread()`` to create the result set off the event loop.
-    Since ``AthenaPandasResultSet`` loads all data in ``__init__`` (via S3),
-    fetch methods are synchronous (in-memory only) and do not need to be async.
+    Uses ``asyncio.to_thread()`` for both result set creation and fetch
+    operations, keeping the event loop free. This is especially important
+    when ``chunksize`` is set, as fetch calls trigger lazy S3 reads.
 
     Example:
         >>> async with await pyathena.aconnect(...) as conn:
@@ -183,6 +184,72 @@ async def execute(  # type: ignore[override]
             raise OperationalError(query_execution.state_change_reason)
         return self
 
+    async def fetchone(  # type: ignore[override]
+        self,
+    ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch the next row of the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop when ``chunksize`` triggers lazy S3 reads.
+
+        Returns:
+            A tuple representing the next row, or None if no more rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaPandasResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchone)
+
+    async def fetchmany(  # type: ignore[override]
+        self, size: Optional[int] = None
+    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch multiple rows from the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop when ``chunksize`` triggers lazy S3 reads.
+
+        Args:
+            size: Maximum number of rows to fetch. Defaults to arraysize.
+
+        Returns:
+            List of tuples representing the fetched rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaPandasResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchmany, size)
+
+    async def fetchall(  # type: ignore[override]
+        self,
+    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch all remaining rows from the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop when ``chunksize`` triggers lazy S3 reads.
+
+        Returns:
+            List of tuples representing all remaining rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaPandasResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchall)
+
+    async def __anext__(self):
+        row = await self.fetchone()
+        if row is None:
+            raise StopAsyncIteration
+        return row
+
     def as_pandas(self) -> Union["DataFrame", PandasDataFrameIterator]:
         """Return DataFrame or PandasDataFrameIterator based on chunksize setting.
 
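Each new fetch method follows the same guard-then-offload shape: raise if `execute()` has not yet produced a result set, otherwise run the blocking fetch in a worker thread. A toy reduction of that shape (the `ProgrammingError` below is a stand-in for pyathena's exception class, and `GuardedCursor` is hypothetical):

```python
import asyncio


class ProgrammingError(Exception):
    """Stand-in for pyathena.error.ProgrammingError."""


class GuardedCursor:
    """Toy cursor showing the guard-then-offload shape of each fetch method."""

    def __init__(self, result_set=None):
        self.result_set = result_set  # set by execute() in the real cursor

    @property
    def has_result_set(self):
        return self.result_set is not None

    async def fetchall(self):
        if not self.has_result_set:
            raise ProgrammingError("No result set.")
        # list() drains the blocking iterator inside a worker thread.
        return await asyncio.to_thread(list, self.result_set)


async def main():
    try:
        await GuardedCursor().fetchall()  # fetch before execute()
    except ProgrammingError as exc:
        print(exc)  # → No result set.
    rows = await GuardedCursor(iter([(1,), (2,)])).fetchall()
    print(rows)  # → [(1,), (2,)]


asyncio.run(main())
```

Checking `has_result_set` synchronously before offloading keeps the error path cheap: no thread is spawned just to raise.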