diff --git a/docs/aio.md b/docs/aio.md
index cd3bb22d..060bc190 100644
--- a/docs/aio.md
+++ b/docs/aio.md
@@ -193,3 +193,78 @@ The `as_pandas()`, `as_arrow()`, and `as_polars()` convenience methods operate on
 already-loaded data and remain synchronous.
 
 See each cursor's documentation page for detailed usage examples.
+
+(aio-s3-filesystem)=
+
+## AioS3FileSystem
+
+`AioS3FileSystem` is a native asyncio filesystem interface for Amazon S3, built on
+fsspec's `AsyncFileSystem`. It provides the same functionality as `S3FileSystem` but
+uses `asyncio.gather` with `asyncio.to_thread` for parallel operations instead of
+`ThreadPoolExecutor`.
+
+### Why AioS3FileSystem?
+
+The synchronous `S3FileSystem` uses `ThreadPoolExecutor` for parallel S3 operations
+(batch deletes, multipart uploads, range reads). When used from within an asyncio
+application via `AioS3FSCursor`, this creates a thread-in-thread pattern:
+the cursor wraps calls in `asyncio.to_thread()`, and inside that thread
+`S3FileSystem` spawns additional threads via `ThreadPoolExecutor`.
+
+`AioS3FileSystem` eliminates this inefficiency by dispatching all parallel
+operations through the asyncio event loop.
+
+| | S3FileSystem | AioS3FileSystem |
+|---|---|---|
+| **Parallelism** | `ThreadPoolExecutor` | `asyncio.gather` + `asyncio.to_thread` |
+| **File handles** | `S3File` with thread pool | `AioS3File` with `S3AioExecutor` |
+| **Bulk delete** | Thread pool per batch | `asyncio.gather` per batch |
+| **Multipart copy** | Thread pool per part | `asyncio.gather` per part |
+| **Best for** | Synchronous applications | Async frameworks (FastAPI, aiohttp, etc.) |
+
+### Executor strategy
+
+`S3FileSystem` and `S3File` use a pluggable executor abstraction (`S3Executor`) for
+parallel operations.
+Two implementations are provided:
+
+- `S3ThreadPoolExecutor` — wraps `ThreadPoolExecutor` (default for sync usage)
+- `S3AioExecutor` — dispatches work via `asyncio.run_coroutine_threadsafe` + `asyncio.to_thread`
+
+`AioS3FileSystem` automatically uses `S3AioExecutor` for file handles, so multipart
+uploads and parallel range reads are executed on the event loop without spawning
+additional threads.
+
+### Usage with AioS3FSCursor
+
+`AioS3FSCursor` automatically uses `AioS3FileSystem` internally. No additional
+configuration is needed:
+
+```python
+from pyathena import aio_connect
+from pyathena.aio.s3fs.cursor import AioS3FSCursor
+
+async with await aio_connect(
+    s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
+    region_name="us-west-2",
+) as conn:
+    cursor = conn.cursor(AioS3FSCursor)
+    await cursor.execute("SELECT * FROM many_rows")
+    async for row in cursor:
+        print(row)
+```
+
+### Standalone usage
+
+`AioS3FileSystem` can also be used directly for S3 operations:
+
+```python
+from pyathena.filesystem.s3_async import AioS3FileSystem
+
+# Async context
+fs = AioS3FileSystem(asynchronous=True)
+
+files = await fs._ls("s3://my-bucket/data/")
+data = await fs._cat_file("s3://my-bucket/data/file.csv")
+await fs._rm("s3://my-bucket/data/old/", recursive=True)
+
+# Sync wrappers are auto-generated by fsspec
+files = fs.ls("s3://my-bucket/data/")
+```
diff --git a/docs/api/filesystem.rst b/docs/api/filesystem.rst
index 827803d9..17d12908 100644
--- a/docs/api/filesystem.rst
+++ b/docs/api/filesystem.rst
@@ -14,6 +14,27 @@ S3 FileSystem
 .. autoclass:: pyathena.filesystem.s3.S3File
    :members:
 
+Async S3 FileSystem
+-------------------
+
+.. autoclass:: pyathena.filesystem.s3_async.AioS3FileSystem
+   :members:
+
+.. autoclass:: pyathena.filesystem.s3_async.AioS3File
+   :members:
+
+S3 Executor
+-----------
+
+.. autoclass:: pyathena.filesystem.s3_executor.S3Executor
+   :members:
+
+.. autoclass:: pyathena.filesystem.s3_executor.S3ThreadPoolExecutor
+   :members:
+
+.. autoclass:: pyathena.filesystem.s3_executor.S3AioExecutor
+   :members:
+
 S3 Objects
 ----------
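The parallelism pattern this change documents for `AioS3FileSystem` (`asyncio.gather` fanning out blocking calls wrapped in `asyncio.to_thread`, in place of a `ThreadPoolExecutor`) can be sketched in isolation. `delete_batch` below is a hypothetical stand-in for a blocking S3 call, not part of PyAthena:

```python
import asyncio
import time


def delete_batch(keys):
    # Hypothetical stand-in for a blocking S3 call (e.g. a batch delete).
    time.sleep(0.1)
    return len(keys)


async def delete_all(batches):
    # Fan the blocking calls out with asyncio.gather; each one runs in
    # the default thread pool via asyncio.to_thread, so the event loop
    # itself is never blocked and no extra ThreadPoolExecutor is spawned.
    results = await asyncio.gather(
        *(asyncio.to_thread(delete_batch, batch) for batch in batches)
    )
    return sum(results)


batches = [["a", "b"], ["c"], ["d", "e", "f"]]
start = time.perf_counter()
deleted = asyncio.run(delete_all(batches))
elapsed = time.perf_counter() - start
print(deleted)  # -> 6; the three 0.1 s batches overlap, so elapsed is ~0.1 s
```

This is the same shape the docs describe for bulk deletes and multipart copies: one `gather` per batch or per part, rather than one thread-pool task per batch or per part.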
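The `S3AioExecutor` dispatch route named in docs/aio.md (`asyncio.run_coroutine_threadsafe` + `asyncio.to_thread`) can likewise be sketched. `AioExecutorSketch` is a hypothetical illustration of the idea, not PyAthena's actual class:

```python
import asyncio
import threading


class AioExecutorSketch:
    """Hypothetical sketch of the S3AioExecutor idea: submit() may be
    called from any thread (e.g. a synchronous file handle); the blocking
    function is scheduled onto a running event loop, which offloads it
    via asyncio.to_thread rather than a private ThreadPoolExecutor."""

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop

    def submit(self, fn, *args):
        # run_coroutine_threadsafe returns a concurrent.futures.Future,
        # so synchronous callers can block on .result() exactly as they
        # would with a ThreadPoolExecutor future.
        return asyncio.run_coroutine_threadsafe(
            asyncio.to_thread(fn, *args), self._loop
        )


# Run an event loop in a background thread so sync code can submit to it.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

executor = AioExecutorSketch(loop)
future = executor.submit(lambda x: x * 2, 21)
print(future.result())  # -> 42
loop.call_soon_threadsafe(loop.stop)
```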