Skip to content

Add native asyncio S3FS and Spark cursors with boilerplate deduplication (Phase 3)#668

Merged
laughingman7743 merged 5 commits intomasterfrom
feature/native-asyncio-cursor-phase3
Feb 21, 2026
Merged

Add native asyncio S3FS and Spark cursors with boilerplate deduplication (Phase 3)#668
laughingman7743 merged 5 commits intomasterfrom
feature/native-asyncio-cursor-phase3

Conversation

@laughingman7743
Copy link
Member

@laughingman7743 laughingman7743 commented Feb 20, 2026

Summary

Phase 3 of native asyncio cursor implementation (#662), building on Phase 2 (PR #667).

Part 1: Boilerplate Deduplication — WithAsyncFetch mixin

Extract shared boilerplate from 4 existing aio SQL cursors into a WithAsyncFetch mixin in pyathena/aio/common.py:

  • Properties: arraysize (getter/setter), result_set (getter/setter), query_id (getter/setter), rownumber, rowcount
  • Lifecycle: close(), executemany(), cancel()
  • Default sync fetch: fetchone(), fetchmany(), fetchall() — for cursors that load data eagerly in __init__
  • Async protocol: __aiter__, __anext__, __aenter__, __aexit__

Net reduction of ~520 lines of duplicated code across AioCursor, AioPandasCursor, AioArrowCursor, AioPolarsCursor.

Part 2: AioS3FSCursor

New pyathena/aio/s3fs/cursor.py — async CSV cursor using S3FileSystem.

Unlike other aio cursors (Arrow/Pandas/Polars) that load data eagerly, AthenaS3FSResultSet lazily streams rows from S3 via a CSV reader. Therefore:

  • Result set creation: asyncio.to_thread(AthenaS3FSResultSet, ...)
  • Fetch methods: async via asyncio.to_thread(result_set.fetch*, ...) — reads from S3 on each call
  • No UNLOAD support (consistent with sync S3FSCursor)

Part 3: AioSparkCursor

New pyathena/aio/spark/cursor.py — async PySpark code execution.

  • Inherits from SparkBaseCursor and WithCalculationExecution directly (no intermediate base class — aio only has one Spark cursor variant)
  • Post-init I/O methods (_poll, _cancel, _terminate_session, _read_s3_file_as_text, _calculate) overridden with async equivalents
  • Session management stays synchronous in __init__ (runs inside asyncio.to_thread at cursor creation)
  • _read_s3_file_as_text uses async_retry_api_call for retry consistency with sync version

Additional changes

  • Removed on_start_query_execution callback from all aio cursors — unnecessary in async context where await provides direct control flow. Consistent with AsyncCursor which also omits this callback.

Class hierarchy

BaseCursor
  └─ AioBaseCursor (async I/O: _execute, _poll, _cancel, metadata)
      └─ WithAsyncFetch (properties, close, cancel, sync fetch, async protocol)
          ├─ AioCursor / AioDictCursor     — async fetch (paginated API)
          ├─ AioPandasCursor               — eager load, as_pandas()
          ├─ AioArrowCursor                — eager load, as_arrow(), as_polars()
          ├─ AioPolarsCursor               — eager load, as_polars(), as_arrow()
          └─ AioS3FSCursor                 — lazy streaming, async fetch

SparkBaseCursor + WithCalculationExecution
  └─ AioSparkCursor (async calculation I/O, session, stdout/stderr)

Files changed

Action File Description
Modify pyathena/aio/common.py Add WithAsyncFetch mixin
Modify pyathena/aio/cursor.py Extend WithAsyncFetch, remove boilerplate
Modify pyathena/aio/pandas/cursor.py Extend WithAsyncFetch, remove boilerplate
Modify pyathena/aio/arrow/cursor.py Extend WithAsyncFetch, remove boilerplate
Modify pyathena/aio/polars/cursor.py Extend WithAsyncFetch, remove boilerplate
Create pyathena/aio/s3fs/__init__.py Package marker
Create pyathena/aio/s3fs/cursor.py AioS3FSCursor
Create pyathena/aio/spark/__init__.py Package marker
Create pyathena/aio/spark/cursor.py AioSparkCursor
Modify tests/pyathena/aio/conftest.py Add s3fs + spark fixtures
Create tests/pyathena/aio/s3fs/ S3FS cursor tests
Create tests/pyathena/aio/spark/ Spark cursor tests
Modify tests/pyathena/aio/test_cursor.py Remove callback test

Test plan

  • make fmt — formatting clean
  • make chk — lint + mypy clean
  • All existing aio tests pass (no regressions from Phase 2)
  • New AioS3FSCursor tests: fetchone/many/all, async iterator, description, cancel, executemany, arraysize, context manager
  • New AioSparkCursor tests: spark_dataframe, spark_sql, failed, cancel, executemany, context manager

Related issues

🤖 Generated with Claude Code

laughingman7743 and others added 2 commits February 21, 2026 00:53
Phase 3 of native asyncio cursor implementation:

Part 1 - Boilerplate deduplication:
- Extract shared properties, lifecycle methods, sync fetch, and async
  protocol into AioCursorBase (aio/base.py)
- Refactor AioCursor, AioPandasCursor, AioArrowCursor, AioPolarsCursor
  to extend AioCursorBase, reducing ~520 lines of duplicated code

Part 2 - AioS3FSCursor:
- Lightweight async CSV cursor using S3FileSystem
- Async fetch methods (via asyncio.to_thread) since S3FS uses lazy
  streaming from S3

Part 3 - AioSparkCursor:
- AioSparkBaseCursor overrides post-init I/O with async equivalents
  (poll, cancel, terminate_session, read_s3_file)
- AioSparkCursor for executing PySpark code asynchronously
- Session init stays sync (wrapped in asyncio.to_thread at creation)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move the shared SQL cursor mixin from aio/base.py into aio/common.py
and rename from AioCursorBase to WithAsyncFetch to follow the existing
WithXXX naming convention (WithResultSet, WithCalculationExecution)
where XXX describes the functionality provided.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
laughingman7743 and others added 3 commits February 21, 2026 10:05
Unlike sync Spark (SparkCursor + AsyncSparkCursor sharing SparkBaseCursor),
the aio side has only AioSparkCursor, so a separate base class is unnecessary.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Native asyncio cursors provide direct control flow via await, making
the on_start_query_execution callback unnecessary. This aligns with
AsyncCursor which also omits this callback.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Change _calculate exception from OperationalError to DatabaseError
  to match sync SparkBaseCursor behavior
- Add retry logic to _read_s3_file_as_text via async_retry_api_call
  to match sync version's retry_api_call usage
- Remove incorrect # type: ignore[override] on name-mangled __poll
- Add test_async_iterator for AioS3FSCursor
- Add test_executemany and test_context_manager for AioSparkCursor
- Move runtime import to top-level in S3FS test file

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@laughingman7743 laughingman7743 marked this pull request as ready for review February 21, 2026 01:41
@laughingman7743 laughingman7743 merged commit eede1e4 into master Feb 21, 2026
5 checks passed
@laughingman7743 laughingman7743 deleted the feature/native-asyncio-cursor-phase3 branch February 21, 2026 01:42
laughingman7743 added a commit that referenced this pull request Feb 21, 2026
Add comprehensive documentation for the native asyncio cursor
implementations added in PRs #666, #667, #668. This includes a new
docs/aio.md overview page, AioCursor sections in each specialized
cursor page, API reference for the aio module, and an async example
in the README.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add native asyncio cursor support (AioCursor)

1 participant