diff --git a/README.md b/README.md index 27b9e186..015109ea 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,22 @@ print(cursor.description) print(cursor.fetchall()) ``` +Native asyncio is also supported: + +```python +import asyncio +from pyathena import aconnect + +async def main(): + async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor() + await cursor.execute("SELECT 1") + print(await cursor.fetchone()) + +asyncio.run(main()) +``` + ## License [MIT license](LICENSE) diff --git a/docs/aio.md b/docs/aio.md new file mode 100644 index 00000000..681f2e81 --- /dev/null +++ b/docs/aio.md @@ -0,0 +1,214 @@ +(aio)= + +# Native Asyncio Cursors + +PyAthena provides native asyncio cursor implementations under `pyathena.aio`. +These cursors use `asyncio.sleep` for polling and `asyncio.to_thread` for boto3 calls, +so no worker thread is held for the lifetime of a query and the event loop stays free. + +## Why native asyncio? + +PyAthena has two families of async cursors: + +| | AsyncCursor | AioCursor | +|---|---|---| +| **Concurrency model** | `concurrent.futures.ThreadPoolExecutor` | Native `asyncio` (`await` / `async for`) | +| **Event loop** | Blocks a thread per query | Non-blocking | +| **Connection** | `connect()` (sync) | `aconnect()` (async) | +| **execute()** returns | `(query_id, Future)` | Awaitable cursor (self) | +| **Fetch methods** | Sync (via `Future.result()`) | `await cursor.fetchone()` for streaming cursors | +| **Iteration** | `for row in result_set` | `async for row in cursor` | +| **Context manager** | `with conn.cursor() as cursor` | `async with conn.cursor() as cursor` | +| **Best for** | Adding concurrency to sync code | Async frameworks (FastAPI, aiohttp, etc.) | + +Choose `AioCursor` when your application already uses `asyncio` (e.g., web frameworks, +async pipelines). Choose `AsyncCursor` when you want simple parallel query execution +from synchronous code. 
+ +(aio-connection)= + +## Connection + +Use the `aconnect()` function to create an async connection. +It returns an `AioConnection` that produces `AioCursor` instances by default. + +```python +from pyathena import aconnect + +conn = await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") +``` + +The connection supports the async context manager protocol: + +```python +from pyathena import aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor() + await cursor.execute("SELECT 1") + print(await cursor.fetchone()) +``` + +(aio-cursor)= + +## AioCursor + +AioCursor is a native asyncio cursor that uses `await` for query execution and result fetching. +It follows the DB API 2.0 interface adapted for async usage. + +```python +from pyathena import aconnect +from pyathena.aio.cursor import AioCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor() + await cursor.execute("SELECT * FROM many_rows") + print(await cursor.fetchone()) + print(await cursor.fetchmany(10)) + print(await cursor.fetchall()) +``` + +The cursor supports the `async with` context manager: + +```python +from pyathena import aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM many_rows") + rows = await cursor.fetchall() +``` + +You can iterate over results with `async for`: + +```python +from pyathena import aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +Execution information of the query can also be retrieved: + +```python +from pyathena import 
aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM many_rows") + print(cursor.state) + print(cursor.state_change_reason) + print(cursor.completion_date_time) + print(cursor.submission_date_time) + print(cursor.data_scanned_in_bytes) + print(cursor.engine_execution_time_in_millis) + print(cursor.query_queue_time_in_millis) + print(cursor.total_execution_time_in_millis) + print(cursor.query_planning_time_in_millis) + print(cursor.service_processing_time_in_millis) + print(cursor.output_location) +``` + +To cancel a running query: + +```python +from pyathena import aconnect + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + async with conn.cursor() as cursor: + await cursor.execute("SELECT * FROM many_rows") + await cursor.cancel() +``` + +(aio-dict-cursor)= + +## AioDictCursor + +AioDictCursor is an AioCursor that returns rows as dictionaries with column names as keys. 
+ +```python +from pyathena import aconnect +from pyathena.aio.cursor import AioDictCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioDictCursor) + await cursor.execute("SELECT * FROM many_rows LIMIT 10") + async for row in cursor: + print(row["a"]) +``` + +If you want to change the dictionary type (e.g., use OrderedDict): + +```python +from collections import OrderedDict +from pyathena import aconnect +from pyathena.aio.cursor import AioDictCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioDictCursor, dict_type=OrderedDict) + await cursor.execute("SELECT * FROM many_rows LIMIT 10") + async for row in cursor: + print(row) +``` + +## Specialized Aio Cursors + +Native asyncio versions are available for all cursor types: + +| Cursor | Module | Result format | +|--------|--------|---------------| +| {ref}`AioPandasCursor ` | `pyathena.aio.pandas.cursor` | pandas DataFrame | +| {ref}`AioArrowCursor ` | `pyathena.aio.arrow.cursor` | pyarrow Table | +| {ref}`AioPolarsCursor ` | `pyathena.aio.polars.cursor` | polars DataFrame | +| {ref}`AioS3FSCursor ` | `pyathena.aio.s3fs.cursor` | Row tuples (lightweight) | +| {ref}`AioSparkCursor ` | `pyathena.aio.spark.cursor` | PySpark execution | + +### Fetch behavior + +For **AioPandasCursor**, **AioArrowCursor**, and **AioPolarsCursor**, the S3 download +(CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. +By the time `execute()` returns, all data is already loaded into memory. 
+Therefore `fetchone()`, `fetchall()`, `as_pandas()`, `as_arrow()`, and `as_polars()` +are synchronous (in-memory only) and do not need `await`: + +```python +# Pandas, Arrow, Polars — S3 download completes during execute() +await cursor.execute("SELECT * FROM many_rows") # Downloads data here +row = cursor.fetchone() # No await — data already in memory +rows = cursor.fetchall() # No await +df = cursor.as_pandas() # No await +``` + +The exceptions are **AioCursor** and **AioS3FSCursor**, which stream rows lazily from S3. +Their fetch methods perform I/O and require `await`: + +```python +# AioCursor, AioS3FSCursor — fetch reads from S3 lazily +await cursor.execute("SELECT * FROM many_rows") +row = await cursor.fetchone() # Await required — reads from S3 +rows = await cursor.fetchall() # Await required +``` + +```{note} +When using AioPandasCursor or AioPolarsCursor with the `chunksize` option, +`execute()` creates a lazy reader (e.g., pandas `TextFileReader`) instead of +loading all data at once. Subsequent iteration via `as_pandas()`, `fetchone()`, +or `async for` triggers chunk-by-chunk S3 reads that are **not** wrapped in +`asyncio.to_thread()` and will block the event loop. If you need chunked +processing in an async application, consider wrapping the iteration in +`asyncio.to_thread()` yourself, or use the default non-chunked mode. +``` + +See each cursor's documentation page for detailed usage examples. 
diff --git a/docs/api.md b/docs/api.md index 6478d7ab..f2f11c3e 100644 --- a/docs/api.md +++ b/docs/api.md @@ -20,6 +20,7 @@ api/arrow api/polars api/s3fs api/spark +api/aio ``` ## Quick reference @@ -44,3 +45,4 @@ api/spark - {ref}`api_polars` - Polars DataFrame integration (no pyarrow required) - {ref}`api_s3fs` - Lightweight S3FS-based cursor (no pandas/pyarrow required) - {ref}`api_spark` - Apache Spark integration for big data processing +- {ref}`api_aio` - Native asyncio cursor implementations diff --git a/docs/api/aio.rst b/docs/api/aio.rst new file mode 100644 index 00000000..4124a255 --- /dev/null +++ b/docs/api/aio.rst @@ -0,0 +1,84 @@ +.. _api_aio: + +Native Asyncio +============== + +This section covers the native asyncio connection, cursors, and base classes. + +Connection +---------- + +.. automodule:: pyathena + :members: aconnect + +.. autoclass:: pyathena.aio.connection.AioConnection + :members: + :inherited-members: + +Aio Cursors +----------- + +.. autoclass:: pyathena.aio.cursor.AioCursor + :members: + :inherited-members: + +.. autoclass:: pyathena.aio.cursor.AioDictCursor + :members: + :inherited-members: + +Aio Result Set +-------------- + +.. autoclass:: pyathena.aio.result_set.AthenaAioResultSet + :members: + :inherited-members: + +.. autoclass:: pyathena.aio.result_set.AthenaAioDictResultSet + :members: + :inherited-members: + +Aio Base Classes +---------------- + +.. autoclass:: pyathena.aio.common.AioBaseCursor + :members: + :inherited-members: + +.. autoclass:: pyathena.aio.common.WithAsyncFetch + :members: + :inherited-members: + +Aio Pandas Cursor +----------------- + +.. autoclass:: pyathena.aio.pandas.cursor.AioPandasCursor + :members: + :inherited-members: + +Aio Arrow Cursor +---------------- + +.. autoclass:: pyathena.aio.arrow.cursor.AioArrowCursor + :members: + :inherited-members: + +Aio Polars Cursor +----------------- + +.. 
autoclass:: pyathena.aio.polars.cursor.AioPolarsCursor + :members: + :inherited-members: + +Aio S3FS Cursor +--------------- + +.. autoclass:: pyathena.aio.s3fs.cursor.AioS3FSCursor + :members: + :inherited-members: + +Aio Spark Cursor +---------------- + +.. autoclass:: pyathena.aio.spark.cursor.AioSparkCursor + :members: + :inherited-members: diff --git a/docs/arrow.md b/docs/arrow.md index 1917d408..49121bbc 100644 --- a/docs/arrow.md +++ b/docs/arrow.md @@ -480,3 +480,94 @@ cursor = connect( region_name="us-west-2" ).cursor(AsyncArrowCursor, connect_timeout=10.0, request_timeout=30.0) ``` + +(aio-arrow-cursor)= + +## AioArrowCursor + +AioArrowCursor is a native asyncio cursor that returns results as Apache Arrow Tables. +Unlike AsyncArrowCursor which uses `concurrent.futures`, this cursor uses +`asyncio.to_thread()` for result set creation, keeping the event loop free. + +The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. +By the time `execute()` returns, all data is already loaded into memory. +Therefore fetch methods, `as_arrow()`, and `as_polars()` are synchronous and do not need `await`. 
+ +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor) + table = (await cursor.execute("SELECT * FROM many_rows")).as_arrow() + print(table) + print(table.column_names) + print(table.num_rows) + print(table.schema) +``` + +Support fetch and iterate query results: + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor) + await cursor.execute("SELECT * FROM many_rows") + print(cursor.fetchone()) + print(cursor.fetchmany()) + print(cursor.fetchall()) +``` + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor) + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +The `as_polars()` method converts the result to a Polars DataFrame: + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_polars() +``` + +The unload option is also available: + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor, unload=True) + await cursor.execute("SELECT * FROM many_rows") + table = cursor.as_arrow() +``` + +AioArrowCursor also 
supports S3 timeout configuration: + +```python +from pyathena import aconnect +from pyathena.aio.arrow.cursor import AioArrowCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioArrowCursor, connect_timeout=10.0, request_timeout=30.0) + await cursor.execute("SELECT * FROM many_rows") +``` diff --git a/docs/cursor.md b/docs/cursor.md index aaa8966b..9e780def 100644 --- a/docs/cursor.md +++ b/docs/cursor.md @@ -294,6 +294,14 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", ``` +## AioCursor + +See {ref}`aio-cursor`. + +## AioDictCursor + +See {ref}`aio-dict-cursor`. + ## PandasCursor See {ref}`pandas-cursor`. @@ -302,6 +310,10 @@ See {ref}`pandas-cursor`. See {ref}`async-pandas-cursor`. +## AioPandasCursor + +See {ref}`aio-pandas-cursor`. + ## ArrowCursor See {ref}`arrow-cursor`. @@ -310,6 +322,10 @@ See {ref}`arrow-cursor`. See {ref}`async-arrow-cursor`. +## AioArrowCursor + +See {ref}`aio-arrow-cursor`. + ## PolarsCursor See {ref}`polars-cursor`. @@ -318,6 +334,10 @@ See {ref}`polars-cursor`. See {ref}`async-polars-cursor`. +## AioPolarsCursor + +See {ref}`aio-polars-cursor`. + ## S3FSCursor See {ref}`s3fs-cursor`. @@ -326,6 +346,10 @@ See {ref}`s3fs-cursor`. See {ref}`async-s3fs-cursor`. +## AioS3FSCursor + +See {ref}`aio-s3fs-cursor`. + ## SparkCursor See {ref}`spark-cursor`. @@ -334,5 +358,9 @@ See {ref}`spark-cursor`. See {ref}`async-spark-cursor`. +## AioSparkCursor + +See {ref}`aio-spark-cursor`. + For detailed API documentation of all cursor classes and their methods, see the {ref}`api` section. 
diff --git a/docs/index.md b/docs/index.md index 68761d03..42adab15 100644 --- a/docs/index.md +++ b/docs/index.md @@ -52,6 +52,7 @@ arrow polars s3fs spark +aio ``` ::: diff --git a/docs/pandas.md b/docs/pandas.md index 61c37ad6..a26c4bde 100644 --- a/docs/pandas.md +++ b/docs/pandas.md @@ -767,3 +767,75 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", region_name="us-west-2", cursor_class=AsyncPandasCursor).cursor(unload=True) ``` + +(aio-pandas-cursor)= + +## AioPandasCursor + +AioPandasCursor is a native asyncio cursor that returns results as pandas DataFrames. +Unlike AsyncPandasCursor which uses `concurrent.futures`, this cursor uses +`asyncio.to_thread()` for result set creation, keeping the event loop free. + +The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. +By the time `execute()` returns, all data is already loaded into memory. +Therefore fetch methods and `as_pandas()` are synchronous and do not need `await`. + +```python +from pyathena import aconnect +from pyathena.aio.pandas.cursor import AioPandasCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPandasCursor) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_pandas() + print(df.describe()) + print(df.head()) +``` + +Support fetch and iterate query results: + +```python +from pyathena import aconnect +from pyathena.aio.pandas.cursor import AioPandasCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPandasCursor) + await cursor.execute("SELECT * FROM many_rows") + print(cursor.fetchone()) + print(cursor.fetchmany()) + print(cursor.fetchall()) +``` + +```python +from pyathena import aconnect +from pyathena.aio.pandas.cursor import AioPandasCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + 
region_name="us-west-2") as conn: + cursor = conn.cursor(AioPandasCursor) + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +The unload option is also available: + +```python +from pyathena import aconnect +from pyathena.aio.pandas.cursor import AioPandasCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPandasCursor, unload=True) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_pandas() +``` + +```{note} +When using AioPandasCursor with the `chunksize` option, `execute()` creates a lazy +`TextFileReader` instead of loading all data at once. Subsequent iteration via +`as_pandas()`, `fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that +are not wrapped in `asyncio.to_thread()` and will block the event loop. +``` diff --git a/docs/polars.md b/docs/polars.md index ffc78485..fed8887f 100644 --- a/docs/polars.md +++ b/docs/polars.md @@ -574,3 +574,88 @@ result_set = future.result() for chunk in result_set.iter_chunks(): process_chunk(chunk) ``` + +(aio-polars-cursor)= + +## AioPolarsCursor + +AioPolarsCursor is a native asyncio cursor that returns results as Polars DataFrames. +Unlike AsyncPolarsCursor which uses `concurrent.futures`, this cursor uses +`asyncio.to_thread()` for result set creation, keeping the event loop free. + +The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`. +By the time `execute()` returns, all data is already loaded into memory. +Therefore fetch methods, `as_polars()`, and `as_arrow()` are synchronous and do not need `await`. 
+ +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_polars() + print(df.describe()) + print(df.head()) +``` + +Support fetch and iterate query results: + +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor) + await cursor.execute("SELECT * FROM many_rows") + print(cursor.fetchone()) + print(cursor.fetchmany()) + print(cursor.fetchall()) +``` + +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor) + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +The `as_arrow()` method converts the result to an Apache Arrow Table: + +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor) + await cursor.execute("SELECT * FROM many_rows") + table = cursor.as_arrow() +``` + +The unload option is also available: + +```python +from pyathena import aconnect +from pyathena.aio.polars.cursor import AioPolarsCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioPolarsCursor, unload=True) + await cursor.execute("SELECT * FROM many_rows") + df = cursor.as_polars() +``` + +```{note} +When using AioPolarsCursor with the 
`chunksize` option, `execute()` creates a lazy +reader instead of loading all data at once. Subsequent iteration via `as_polars()`, +`fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that are not wrapped +in `asyncio.to_thread()` and will block the event loop. +``` diff --git a/docs/s3fs.md b/docs/s3fs.md index d25d1a69..6d0ed511 100644 --- a/docs/s3fs.md +++ b/docs/s3fs.md @@ -366,3 +366,57 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", query_id, future = cursor.execute("SELECT * FROM many_rows") cursor.cancel(query_id) ``` + +(aio-s3fs-cursor)= + +## AioS3FSCursor + +AioS3FSCursor is a native asyncio cursor that uses the same lightweight CSV parsing as S3FSCursor. +Unlike AsyncS3FSCursor which uses `concurrent.futures`, this cursor uses +`asyncio.to_thread()` for both result set creation and fetch operations, +keeping the event loop free. + +Since `AthenaS3FSResultSet` lazily streams rows from S3 via a CSV reader, +fetch methods are async and require `await`. 
+ +```python +from pyathena import aconnect +from pyathena.aio.s3fs.cursor import AioS3FSCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioS3FSCursor) + await cursor.execute("SELECT * FROM many_rows") + print(await cursor.fetchone()) + print(await cursor.fetchmany(10)) + print(await cursor.fetchall()) +``` + +Async iteration is supported: + +```python +from pyathena import aconnect +from pyathena.aio.s3fs.cursor import AioS3FSCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioS3FSCursor) + await cursor.execute("SELECT * FROM many_rows") + async for row in cursor: + print(row) +``` + +Execution information of the query can also be retrieved: + +```python +from pyathena import aconnect +from pyathena.aio.s3fs.cursor import AioS3FSCursor + +async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/", + region_name="us-west-2") as conn: + cursor = conn.cursor(AioS3FSCursor) + await cursor.execute("SELECT * FROM many_rows") + print(cursor.state) + print(cursor.data_scanned_in_bytes) + print(cursor.output_location) +``` diff --git a/docs/spark.md b/docs/spark.md index f9cdce64..d2cc7e1e 100644 --- a/docs/spark.md +++ b/docs/spark.md @@ -333,3 +333,69 @@ with conn.cursor() as cursor: ``` NOTE: Currently it appears that the calculation is not canceled unless the session is terminated. + +(aio-spark-cursor)= + +## AioSparkCursor + +AioSparkCursor is a native asyncio cursor for executing PySpark code on Athena. +Unlike AsyncSparkCursor which uses `concurrent.futures`, this cursor uses +native `asyncio` for polling and API calls, keeping the event loop free. 
+ +Since `SparkBaseCursor.__init__` performs I/O (session management), cursor creation +must be wrapped in `asyncio.to_thread`: + +```python +import asyncio +from pyathena import aconnect +from pyathena.aio.spark.cursor import AioSparkCursor + +async with await aconnect(work_group="YOUR_SPARK_WORKGROUP", + cursor_class=AioSparkCursor) as conn: + cursor = await asyncio.to_thread(conn.cursor) + await cursor.execute("""spark.sql("SELECT 1").show()""") + print(await cursor.get_std_out()) +``` + +The cursor supports the async context manager for automatic session termination: + +```python +import asyncio +import textwrap +from pyathena import aconnect +from pyathena.aio.spark.cursor import AioSparkCursor + +async with await aconnect(work_group="YOUR_SPARK_WORKGROUP", + cursor_class=AioSparkCursor) as conn: + cursor = await asyncio.to_thread(conn.cursor) + async with cursor: + await cursor.execute( + textwrap.dedent( + """ + file_name = "s3://athena-examples-us-east-1/notebooks/yellow_tripdata_2016-01.parquet" + + taxi_df = (spark.read.format("parquet") + .option("header", "true") + .option("inferSchema", "true") + .load(file_name)) + taxi1_df=taxi_df.groupBy("VendorID", "passenger_count").count() + taxi1_df.show() + """ + ) + ) + print(await cursor.get_std_out()) + print(await cursor.get_std_error()) +``` + +To cancel a running calculation: + +```python +import asyncio +from pyathena import aconnect +from pyathena.aio.spark.cursor import AioSparkCursor + +async with await aconnect(work_group="YOUR_SPARK_WORKGROUP", + cursor_class=AioSparkCursor) as conn: + cursor = await asyncio.to_thread(conn.cursor) + await cursor.cancel() +```