From 84cd32b3852eaebafc0f28cb6630142a55f45eae Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 21 Feb 2026 13:34:39 +0900 Subject: [PATCH 1/5] Add documentation for native asyncio cursors (aio module) Add comprehensive documentation for the native asyncio cursor implementations added in PRs #666, #667, #668. This includes a new docs/aio.md overview page, AioCursor sections in each specialized cursor page, API reference for the aio module, and an async example in the README. Co-Authored-By: Claude Opus 4.6 --- README.md | 16 ++++ docs/aio.md | 201 +++++++++++++++++++++++++++++++++++++++++++++++ docs/api.md | 2 + docs/api/aio.rst | 80 +++++++++++++++++++ docs/arrow.md | 90 +++++++++++++++++++++ docs/cursor.md | 28 +++++++ docs/index.md | 1 + docs/pandas.md | 64 +++++++++++++++ docs/polars.md | 77 ++++++++++++++++++ docs/s3fs.md | 54 +++++++++++++ docs/spark.md | 66 ++++++++++++++++ 11 files changed, 679 insertions(+) create mode 100644 docs/aio.md create mode 100644 docs/api/aio.rst diff --git a/README.md b/README.md index 27b9e186..8ff964fa 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,22 @@ print(cursor.description) print(cursor.fetchall()) ``` +### Async + +```python +import asyncio +from pyathena import aconnect + +async def main(): + async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor() + await cursor.execute("SELECT 1") + print(await cursor.fetchone()) + +asyncio.run(main()) +``` + ## License [MIT license](LICENSE) diff --git a/docs/aio.md b/docs/aio.md new file mode 100644 index 00000000..7e3d8388 --- /dev/null +++ b/docs/aio.md @@ -0,0 +1,201 @@ +(aio)= + +# Native Asyncio Cursors + +PyAthena provides native asyncio cursor implementations under `pyathena.aio`. +These cursors use `asyncio.sleep` for polling and `asyncio.to_thread` for boto3 calls, +keeping the event loop free without relying on thread pools for concurrency. + +## Why native asyncio? + +PyAthena has two families of async cursors: + +| | AsyncCursor | AioCursor | +|---|---|---| +| **Concurrency model** | `concurrent.futures.ThreadPoolExecutor` | Native `asyncio` (`await` / `async for`) | +| **Event loop** | Blocks a thread per query | Non-blocking | +| **Connection** | `connect()` (sync) | `aconnect()` (async) | +| **execute()** returns | `(query_id, Future)` | Awaitable cursor (self) | +| **Fetch methods** | Sync (via `Future.result()`) | `await cursor.fetchone()` for streaming cursors | +| **Iteration** | `for row in result_set` | `async for row in cursor` | +| **Context manager** | `with conn.cursor() as cursor` | `async with conn.cursor() as cursor` | +| **Best for** | Adding concurrency to sync code | Async frameworks (FastAPI, aiohttp, etc.) | + +Choose `AioCursor` when your application already uses `asyncio` (e.g., web frameworks, +async pipelines). Choose `AsyncCursor` when you want simple parallel query execution +from synchronous code. + +(aio-connection)= + +## Connection + +Use the `aconnect()` function to create an async connection. +It returns an `AioConnection` that produces `AioCursor` instances by default. + +```python +from pyathena import aconnect + +conn = await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") +``` + +The connection supports the async context manager protocol: + +```python +from pyathena import aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor() + await cursor.execute("SELECT 1") + print(await cursor.fetchone()) +``` + +(aio-cursor)= + +## AioCursor + +AioCursor is a native asyncio cursor that uses `await` for query execution and result fetching. +It follows the DB API 2.0 interface adapted for async usage. + +```python +from pyathena import aconnect +from pyathena.aio.cursor import AioCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor() + await cursor.execute("SELECT * FROM many_rows") + print(await cursor.fetchone()) + print(await cursor.fetchmany(10)) + print(await cursor.fetchall()) +``` + +The cursor supports the `async with` context manager: + +```python +from pyathena import aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM many_rows") + rows = await cursor.fetchall() +``` + +You can iterate over results with `async for`: + +```python +from pyathena import aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +Execution information of the query can also be retrieved: + +```python +from pyathena import aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM many_rows") + print(cursor.state) + print(cursor.state_change_reason) + print(cursor.completion_date_time) + print(cursor.submission_date_time) + print(cursor.data_scanned_in_bytes) + print(cursor.engine_execution_time_in_millis) + print(cursor.query_queue_time_in_millis) + print(cursor.total_execution_time_in_millis) + print(cursor.query_planning_time_in_millis) + print(cursor.service_processing_time_in_millis) + print(cursor.output_location) +``` + +To cancel a running query: + +```python +from pyathena import aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM many_rows") + await cursor.cancel() +``` + +(aio-dict-cursor)= + +## AioDictCursor + +AioDictCursor is an AioCursor that returns rows as dictionaries with column names as keys. + +```python +from pyathena import aconnect +from pyathena.aio.cursor import AioDictCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioDictCursor) + await cursor.execute("SELECT * FROM many_rows LIMIT 10") + async for row in cursor: + print(row["a"]) +``` + +If you want to change the dictionary type (e.g., use OrderedDict): + +```python +from collections import OrderedDict +from pyathena import aconnect +from pyathena.aio.cursor import AioDictCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioDictCursor, dict_type=OrderedDict) + await cursor.execute("SELECT * FROM many_rows LIMIT 10") + async for row in cursor: + print(row) +``` + +## Specialized Aio Cursors + +Native asyncio versions are available for all cursor types: + +| Cursor | Module | Result format | +|--------|--------|---------------| +| {ref}`AioPandasCursor ` | `pyathena.aio.pandas.cursor` | pandas DataFrame | +| {ref}`AioArrowCursor ` | `pyathena.aio.arrow.cursor` | pyarrow Table | +| {ref}`AioPolarsCursor ` | `pyathena.aio.polars.cursor` | polars DataFrame | +| {ref}`AioS3FSCursor ` | `pyathena.aio.s3fs.cursor` | Row tuples (lightweight) | +| {ref}`AioSparkCursor ` | `pyathena.aio.spark.cursor` | PySpark execution | + +### Fetch behavior + +Most aio cursors load all result data eagerly during `execute()` (via `asyncio.to_thread`), +so `fetchone()`, `fetchmany()`, and `fetchall()` are synchronous (in-memory only): + +```python +# Pandas, Arrow, Polars — fetch is sync (data already loaded) +await cursor.execute("SELECT * FROM many_rows") +row = cursor.fetchone() # No await needed +rows = cursor.fetchall() # No await needed +df = cursor.as_pandas() # No await needed +``` + +The exceptions are **AioCursor** and **AioS3FSCursor**, which stream rows lazily from S3. +Their fetch methods require `await`: + +```python +# AioCursor, AioS3FSCursor — fetch is async (reads from S3) +await cursor.execute("SELECT * FROM many_rows") +row = await cursor.fetchone() # Await required +rows = await cursor.fetchall() # Await required +``` + +See each cursor's documentation page for detailed usage examples. diff --git a/docs/api.md b/docs/api.md index 6478d7ab..f2f11c3e 100644 --- a/docs/api.md +++ b/docs/api.md @@ -20,6 +20,7 @@ api/arrow api/polars api/s3fs api/spark +api/aio ``` ## Quick reference @@ -44,3 +45,4 @@ api/spark - {ref}`api_polars` - Polars DataFrame integration (no pyarrow required) - {ref}`api_s3fs` - Lightweight S3FS-based cursor (no pandas/pyarrow required) - {ref}`api_spark` - Apache Spark integration for big data processing +- {ref}`api_aio` - Native asyncio cursor implementations diff --git a/docs/api/aio.rst b/docs/api/aio.rst new file mode 100644 index 00000000..4bf5a569 --- /dev/null +++ b/docs/api/aio.rst @@ -0,0 +1,80 @@ +.. _api_aio: + +Native Asyncio +============== + +This section covers the native asyncio connection, cursors, and base classes. + +Connection +---------- + +.. automodule:: pyathena + :members: aconnect + +.. autoclass:: pyathena.aio.connection.AioConnection + :members: + :inherited-members: + +Aio Cursors +----------- + +.. autoclass:: pyathena.aio.cursor.AioCursor + :members: + :inherited-members: + +.. autoclass:: pyathena.aio.cursor.AioDictCursor + :members: + :inherited-members: + +Aio Result Set +-------------- + +.. autoclass:: pyathena.aio.result_set.AthenaAioResultSet + :members: + :inherited-members: + +Aio Base Classes +---------------- + +.. autoclass:: pyathena.aio.common.AioBaseCursor + :members: + :inherited-members: + +.. autoclass:: pyathena.aio.common.WithAsyncFetch + :members: + :inherited-members: + +Aio Pandas Cursor +----------------- + +.. autoclass:: pyathena.aio.pandas.cursor.AioPandasCursor + :members: + :inherited-members: + +Aio Arrow Cursor +---------------- + +.. autoclass:: pyathena.aio.arrow.cursor.AioArrowCursor + :members: + :inherited-members: + +Aio Polars Cursor +----------------- + +.. autoclass:: pyathena.aio.polars.cursor.AioPolarsCursor + :members: + :inherited-members: + +Aio S3FS Cursor +--------------- + +.. autoclass:: pyathena.aio.s3fs.cursor.AioS3FSCursor + :members: + :inherited-members: + +Aio Spark Cursor +---------------- + +.. autoclass:: pyathena.aio.spark.cursor.AioSparkCursor + :members: + :inherited-members: diff --git a/docs/arrow.md b/docs/arrow.md index 1917d408..fe063215 100644 --- a/docs/arrow.md +++ b/docs/arrow.md @@ -480,3 +480,93 @@ cursor = connect( region_name="us-west-2" ).cursor(AsyncArrowCursor, connect_timeout=10.0, request_timeout=30.0) ``` + +(aio-arrow-cursor)= + +## AioArrowCursor + +AioArrowCursor is a native asyncio cursor that returns results as Apache Arrow Tables. +Unlike AsyncArrowCursor which uses `concurrent.futures`, this cursor uses +`asyncio.to_thread()` for result set creation, keeping the event loop free. + +Since the result set is loaded eagerly during `execute()`, fetch methods, `as_arrow()`, +and `as_polars()` are synchronous (in-memory only) and do not need `await`. + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor) + table = (await cursor.execute("SELECT * FROM many_rows")).as_arrow() + print(table) + print(table.column_names) + print(table.num_rows) + print(table.schema) +``` + +Support fetch and iterate query results: + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor) + await cursor.execute("SELECT * FROM many_rows") + print(cursor.fetchone()) + print(cursor.fetchmany()) + print(cursor.fetchall()) +``` + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor) + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +The `as_polars()` method converts the result to a Polars DataFrame: + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_polars() +``` + +The unload option is also available: + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor, unload=True) + await cursor.execute("SELECT * FROM many_rows") + table = cursor.as_arrow() +``` + +AioArrowCursor also supports S3 timeout configuration: + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor, connect_timeout=10.0, request_timeout=30.0) + await cursor.execute("SELECT * FROM many_rows") +``` diff --git a/docs/cursor.md b/docs/cursor.md index aaa8966b..9e780def 100644 --- a/docs/cursor.md +++ b/docs/cursor.md @@ -294,6 +294,14 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", ``` +## AioCursor + +See {ref}`aio-cursor`. + +## AioDictCursor + +See {ref}`aio-dict-cursor`. + ## PandasCursor See {ref}`pandas-cursor`. @@ -302,6 +310,10 @@ See {ref}`pandas-cursor`. See {ref}`async-pandas-cursor`. +## AioPandasCursor + +See {ref}`aio-pandas-cursor`. + ## ArrowCursor See {ref}`arrow-cursor`. @@ -310,6 +322,10 @@ See {ref}`arrow-cursor`. See {ref}`async-arrow-cursor`. +## AioArrowCursor + +See {ref}`aio-arrow-cursor`. + ## PolarsCursor See {ref}`polars-cursor`. @@ -318,6 +334,10 @@ See {ref}`polars-cursor`. See {ref}`async-polars-cursor`. +## AioPolarsCursor + +See {ref}`aio-polars-cursor`. + ## S3FSCursor See {ref}`s3fs-cursor`. @@ -326,6 +346,10 @@ See {ref}`s3fs-cursor`. See {ref}`async-s3fs-cursor`. +## AioS3FSCursor + +See {ref}`aio-s3fs-cursor`. + ## SparkCursor See {ref}`spark-cursor`. @@ -334,5 +358,9 @@ See {ref}`spark-cursor`. See {ref}`async-spark-cursor`. +## AioSparkCursor + +See {ref}`aio-spark-cursor`. + For detailed API documentation of all cursor classes and their methods, see the {ref}`api` section. diff --git a/docs/index.md b/docs/index.md index 68761d03..42adab15 100644 --- a/docs/index.md +++ b/docs/index.md @@ -52,6 +52,7 @@ arrow polars s3fs spark +aio ``` ::: diff --git a/docs/pandas.md b/docs/pandas.md index 61c37ad6..e0e5364a 100644 --- a/docs/pandas.md +++ b/docs/pandas.md @@ -767,3 +767,67 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", region_name="us-west-2", cursor_class=AsyncPandasCursor).cursor(unload=True) ``` + +(aio-pandas-cursor)= + +## AioPandasCursor + +AioPandasCursor is a native asyncio cursor that returns results as pandas DataFrames. +Unlike AsyncPandasCursor which uses `concurrent.futures`, this cursor uses +`asyncio.to_thread()` for result set creation, keeping the event loop free. + +Since the result set is loaded eagerly during `execute()`, fetch methods and `as_pandas()` +are synchronous (in-memory only) and do not need `await`. + +```python +from pyathena import aconnect +from pyathena.aio.pandas.cursor import AioPandasCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPandasCursor) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_pandas() + print(df.describe()) + print(df.head()) +``` + +Support fetch and iterate query results: + +```python +from pyathena import aconnect +from pyathena.aio.pandas.cursor import AioPandasCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPandasCursor) + await cursor.execute("SELECT * FROM many_rows") + print(cursor.fetchone()) + print(cursor.fetchmany()) + print(cursor.fetchall()) +``` + +```python +from pyathena import aconnect +from pyathena.aio.pandas.cursor import AioPandasCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPandasCursor) + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +The unload option is also available: + +```python +from pyathena import aconnect +from pyathena.aio.pandas.cursor import AioPandasCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPandasCursor, unload=True) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_pandas() +``` diff --git a/docs/polars.md b/docs/polars.md index ffc78485..cf593d69 100644 --- a/docs/polars.md +++ b/docs/polars.md @@ -574,3 +574,80 @@ result_set = future.result() for chunk in result_set.iter_chunks(): process_chunk(chunk) ``` + +(aio-polars-cursor)= + +## AioPolarsCursor + +AioPolarsCursor is a native asyncio cursor that returns results as Polars DataFrames. +Unlike AsyncPolarsCursor which uses `concurrent.futures`, this cursor uses +`asyncio.to_thread()` for result set creation, keeping the event loop free. + +Since the result set is loaded eagerly during `execute()`, fetch methods, `as_polars()`, +and `as_arrow()` are synchronous (in-memory only) and do not need `await`. + +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_polars() + print(df.describe()) + print(df.head()) +``` + +Support fetch and iterate query results: + +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor) + await cursor.execute("SELECT * FROM many_rows") + print(cursor.fetchone()) + print(cursor.fetchmany()) + print(cursor.fetchall()) +``` + +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor) + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +The `as_arrow()` method converts the result to an Apache Arrow Table: + +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor) + await cursor.execute("SELECT * FROM many_rows") + table = cursor.as_arrow() +``` + +The unload option is also available: + +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor, unload=True) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_polars() +``` diff --git a/docs/s3fs.md b/docs/s3fs.md index d25d1a69..6d0ed511 100644 --- a/docs/s3fs.md +++ b/docs/s3fs.md @@ -366,3 +366,57 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", query_id, future = cursor.execute("SELECT * FROM many_rows") cursor.cancel(query_id) ``` + +(aio-s3fs-cursor)= + +## AioS3FSCursor + +AioS3FSCursor is a native asyncio cursor that uses the same lightweight CSV parsing as S3FSCursor. +Unlike AsyncS3FSCursor which uses `concurrent.futures`, this cursor uses +`asyncio.to_thread()` for both result set creation and fetch operations, +keeping the event loop free. + +Since `AthenaS3FSResultSet` lazily streams rows from S3 via a CSV reader, +fetch methods are async and require `await`. + +```python +from pyathena import aconnect +from pyathena.aio.s3fs.cursor import AioS3FSCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioS3FSCursor) + await cursor.execute("SELECT * FROM many_rows") + print(await cursor.fetchone()) + print(await cursor.fetchmany(10)) + print(await cursor.fetchall()) +``` + +Async iteration is supported: + +```python +from pyathena import aconnect +from pyathena.aio.s3fs.cursor import AioS3FSCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioS3FSCursor) + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +Execution information of the query can also be retrieved: + +```python +from pyathena import aconnect +from pyathena.aio.s3fs.cursor import AioS3FSCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioS3FSCursor) + await cursor.execute("SELECT * FROM many_rows") + print(cursor.state) + print(cursor.data_scanned_in_bytes) + print(cursor.output_location) +``` diff --git a/docs/spark.md b/docs/spark.md index f9cdce64..d2cc7e1e 100644 --- a/docs/spark.md +++ b/docs/spark.md @@ -333,3 +333,69 @@ with conn.cursor() as cursor: ``` NOTE: Currently it appears that the calculation is not canceled unless the session is terminated. + +(aio-spark-cursor)= + +## AioSparkCursor + +AioSparkCursor is a native asyncio cursor for executing PySpark code on Athena. +Unlike AsyncSparkCursor which uses `concurrent.futures`, this cursor uses +native `asyncio` for polling and API calls, keeping the event loop free. + +Since `SparkBaseCursor.__init__` performs I/O (session management), cursor creation +must be wrapped in `asyncio.to_thread`: + +```python +import asyncio +from pyathena import aconnect +from pyathena.aio.spark.cursor import AioSparkCursor + +async with await aconnect(work_group="YOUR_SPARK_WORKGROUP", + cursor_class=AioSparkCursor) as conn: + cursor = await asyncio.to_thread(conn.cursor) + await cursor.execute("""spark.sql("SELECT 1").show()""") + print(await cursor.get_std_out()) +``` + +The cursor supports the async context manager for automatic session termination: + +```python +import asyncio +import textwrap +from pyathena import aconnect +from pyathena.aio.spark.cursor import AioSparkCursor + +async with await aconnect(work_group="YOUR_SPARK_WORKGROUP", + cursor_class=AioSparkCursor) as conn: + cursor = await asyncio.to_thread(conn.cursor) + async with cursor: + await cursor.execute( + textwrap.dedent( + """ + file_name = "s3://athena-examples-us-east-1/notebooks/yellow_tripdata_2016-01.parquet" + + taxi_df = (spark.read.format("parquet") + .option("header", "true") + .option("inferSchema", "true") + .load(file_name)) + taxi1_df=taxi_df.groupBy("VendorID", "passenger_count").count() + taxi1_df.show() + """ + ) + ) + print(await cursor.get_std_out()) + print(await cursor.get_std_error()) +``` + +To cancel a running calculation: + +```python +import asyncio +from pyathena import aconnect +from pyathena.aio.spark.cursor import AioSparkCursor + +async with await aconnect(work_group="YOUR_SPARK_WORKGROUP", + cursor_class=AioSparkCursor) as conn: + cursor = await asyncio.to_thread(conn.cursor) + await cursor.cancel() +``` From f0adaeebfb8804bba1d5bbc08c699633206558ff Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 21 Feb 2026 13:39:27 +0900 Subject: [PATCH 2/5] Remove Async subsection heading from README Replace the ### Async heading with a simple inline lead-in sentence to avoid an unbalanced section structure under Usage. Co-Authored-By: Claude Opus 4.6 --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 8ff964fa..015109ea 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ print(cursor.description) print(cursor.fetchall()) ``` -### Async +Native asyncio is also supported: ```python import asyncio From ea55fbcb25f25fc929d56e024d30c29082cb8bae Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 21 Feb 2026 13:43:35 +0900 Subject: [PATCH 3/5] Clarify that S3 download happens during execute(), not during fetch Explain why as_pandas/as_arrow/as_polars don't need await: the S3 download is wrapped in asyncio.to_thread inside execute(), so data is already in memory by the time fetch/as_* methods are called. Co-Authored-By: Claude Opus 4.6 --- docs/aio.md | 23 +++++++++++++---------- docs/arrow.md | 5 +++-- docs/pandas.md | 5 +++-- docs/polars.md | 5 +++-- 4 files changed, 22 insertions(+), 16 deletions(-) diff --git a/docs/aio.md b/docs/aio.md index 7e3d8388..2876dd55 100644 --- a/docs/aio.md +++ b/docs/aio.md @@ -177,24 +177,27 @@ Native asyncio versions are available for all cursor types: ### Fetch behavior -Most aio cursors load all result data eagerly during `execute()` (via `asyncio.to_thread`), -so `fetchone()`, `fetchmany()`, and `fetchall()` are synchronous (in-memory only): +For **AioPandasCursor**, **AioArrowCursor**, and **AioPolarsCursor**, the S3 download +(CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. +By the time `execute()` returns, all data is already loaded into memory. +Therefore `fetchone()`, `fetchall()`, `as_pandas()`, `as_arrow()`, and `as_polars()` +are synchronous (in-memory only) and do not need `await`: ```python -# Pandas, Arrow, Polars — fetch is sync (data already loaded) -await cursor.execute("SELECT * FROM many_rows") -row = cursor.fetchone() # No await needed -rows = cursor.fetchall() # No await needed -df = cursor.as_pandas() # No await needed +# Pandas, Arrow, Polars — S3 download completes during execute() +await cursor.execute("SELECT * FROM many_rows") # Downloads data here +row = cursor.fetchone() # No await — data already in memory +rows = cursor.fetchall() # No await +df = cursor.as_pandas() # No await ``` The exceptions are **AioCursor** and **AioS3FSCursor**, which stream rows lazily from S3. -Their fetch methods require `await`: +Their fetch methods perform I/O and require `await`: ```python -# AioCursor, AioS3FSCursor — fetch is async (reads from S3) +# AioCursor, AioS3FSCursor — fetch reads from S3 lazily await cursor.execute("SELECT * FROM many_rows") -row = await cursor.fetchone() # Await required +row = await cursor.fetchone() # Await required — reads from S3 rows = await cursor.fetchall() # Await required ``` diff --git a/docs/arrow.md b/docs/arrow.md index fe063215..49121bbc 100644 --- a/docs/arrow.md +++ b/docs/arrow.md @@ -489,8 +489,9 @@ AioArrowCursor is a native asyncio cursor that returns results as Apache Arrow T Unlike AsyncArrowCursor which uses `concurrent.futures`, this cursor uses `asyncio.to_thread()` for result set creation, keeping the event loop free. -Since the result set is loaded eagerly during `execute()`, fetch methods, `as_arrow()`, -and `as_polars()` are synchronous (in-memory only) and do not need `await`. +The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. +By the time `execute()` returns, all data is already loaded into memory. +Therefore fetch methods, `as_arrow()`, and `as_polars()` are synchronous and do not need `await`. ```python from pyathena import aconnect diff --git a/docs/pandas.md b/docs/pandas.md index e0e5364a..0bb67232 100644 --- a/docs/pandas.md +++ b/docs/pandas.md @@ -776,8 +776,9 @@ AioPandasCursor is a native asyncio cursor that returns results as pandas DataFr Unlike AsyncPandasCursor which uses `concurrent.futures`, this cursor uses `asyncio.to_thread()` for result set creation, keeping the event loop free. -Since the result set is loaded eagerly during `execute()`, fetch methods and `as_pandas()` -are synchronous (in-memory only) and do not need `await`. +The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. +By the time `execute()` returns, all data is already loaded into memory. +Therefore fetch methods and `as_pandas()` are synchronous and do not need `await`. ```python from pyathena import aconnect diff --git a/docs/polars.md b/docs/polars.md index cf593d69..968973dc 100644 --- a/docs/polars.md +++ b/docs/polars.md @@ -583,8 +583,9 @@ AioPolarsCursor is a native asyncio cursor that returns results as Polars DataFr Unlike AsyncPolarsCursor which uses `concurrent.futures`, this cursor uses `asyncio.to_thread()` for result set creation, keeping the event loop free. -Since the result set is loaded eagerly during `execute()`, fetch methods, `as_polars()`, -and `as_arrow()` are synchronous (in-memory only) and do not need `await`. +The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. +By the time `execute()` returns, all data is already loaded into memory. +Therefore fetch methods, `as_polars()`, and `as_arrow()` are synchronous and do not need `await`. ```python from pyathena import aconnect From ece3ddaca32e03e327e60a65a8a38a4854d3bd6c Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 21 Feb 2026 13:47:35 +0900 Subject: [PATCH 4/5] Add note about chunksize blocking the event loop in aio cursors When chunksize is set, execute() creates a lazy reader instead of loading all data. Subsequent iteration triggers blocking S3 reads not wrapped in asyncio.to_thread(). Document this limitation. Co-Authored-By: Claude Opus 4.6 --- docs/aio.md | 10 ++++++++++ docs/pandas.md | 7 +++++++ docs/polars.md | 7 +++++++ 3 files changed, 24 insertions(+) diff --git a/docs/aio.md b/docs/aio.md index 2876dd55..681f2e81 100644 --- a/docs/aio.md +++ b/docs/aio.md @@ -201,4 +201,14 @@ row = await cursor.fetchone() # Await required — reads from S3 rows = await cursor.fetchall() # Await required ``` +```{note} +When using AioPandasCursor or AioPolarsCursor with the `chunksize` option, +`execute()` creates a lazy reader (e.g., pandas `TextFileReader`) instead of +loading all data at once. Subsequent iteration via `as_pandas()`, `fetchone()`, +or `async for` triggers chunk-by-chunk S3 reads that are **not** wrapped in +`asyncio.to_thread()` and will block the event loop. If you need chunked +processing in an async application, consider wrapping the iteration in +`asyncio.to_thread()` yourself, or use the default non-chunked mode. +``` + See each cursor's documentation page for detailed usage examples. diff --git a/docs/pandas.md b/docs/pandas.md index 0bb67232..a26c4bde 100644 --- a/docs/pandas.md +++ b/docs/pandas.md @@ -832,3 +832,10 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", await cursor.execute("SELECT * FROM many_rows") df = cursor.as_pandas() ``` + +```{note} +When using AioPandasCursor with the `chunksize` option, `execute()` creates a lazy +`TextFileReader` instead of loading all data at once. Subsequent iteration via +`as_pandas()`, `fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that +are not wrapped in `asyncio.to_thread()` and will block the event loop. +``` diff --git a/docs/polars.md b/docs/polars.md index 968973dc..fed8887f 100644 --- a/docs/polars.md +++ b/docs/polars.md @@ -652,3 +652,10 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", await cursor.execute("SELECT * FROM many_rows") df = cursor.as_polars() ``` + +```{note} +When using AioPolarsCursor with the `chunksize` option, `execute()` creates a lazy +reader instead of loading all data at once. Subsequent iteration via `as_polars()`, +`fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that are not wrapped +in `asyncio.to_thread()` and will block the event loop. +``` From 43239872acd06849de8358f94637a8e97b192ce8 Mon Sep 17 00:00:00 2001 From: laughingman7743 Date: Sat, 21 Feb 2026 13:54:11 +0900 Subject: [PATCH 5/5] Add AthenaAioDictResultSet to API reference Co-Authored-By: Claude Opus 4.6 --- docs/api/aio.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/api/aio.rst b/docs/api/aio.rst index 4bf5a569..4124a255 100644 --- a/docs/api/aio.rst +++ b/docs/api/aio.rst @@ -33,6 +33,10 @@ Aio Result Set :members: :inherited-members: +.. autoclass:: pyathena.aio.result_set.AthenaAioDictResultSet + :members: + :inherited-members: + Aio Base Classes ----------------