Summary
The current S3FileSystem uses ThreadPoolExecutor for parallel S3 operations (range reads, batch deletes, multipart uploads). Add an AioS3FileSystem variant that uses asyncio.gather + asyncio.to_thread instead, providing better integration with the asyncio event loop for aio cursors.
Motivation
When aio cursors use the current S3FileSystem, operations are double-wrapped:
- The cursor wraps the result set creation in `asyncio.to_thread()`
- Inside that thread, `S3FileSystem` spawns more threads via `ThreadPoolExecutor`
An AioS3FileSystem would allow aio cursors to use async S3 operations directly, eliminating the thread-in-thread pattern and integrating naturally with the event loop.
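The two shapes can be contrasted with a minimal sketch. Everything here is a stand-in: `blocking_get`, `build_result_set_nested`, `nested`, and `flat` are hypothetical names, and no real S3 call is made. Each fake call reports which thread ran it, so the extra inner pool in the current pattern is visible.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def blocking_get(part: int) -> str:
    # Hypothetical stand-in for a synchronous boto3 call.
    # Returns the name of the worker thread that executed it.
    return threading.current_thread().name


def build_result_set_nested(parts: int) -> list[str]:
    # Current shape: this function already runs in a worker thread,
    # and then fans out to a second, inner thread pool.
    with ThreadPoolExecutor(max_workers=4, thread_name_prefix="inner") as pool:
        return list(pool.map(blocking_get, range(parts)))


async def nested(parts: int) -> list[str]:
    # Outer hop: the whole result set build is pushed into a thread.
    return await asyncio.to_thread(build_result_set_nested, parts)


async def flat(parts: int) -> list[str]:
    # Proposed shape: one hop per blocking call, scheduled by the event loop.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_get, p) for p in range(parts))
    )
```

In `nested`, the calls run on `inner_*` threads owned by a pool the event loop knows nothing about; in `flat`, they run on the loop's default executor.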
Design
- Keep `S3FileSystem` as-is: no breaking changes to the existing synchronous implementation
- Add `AioS3FileSystem`: uses `asyncio.gather` + `asyncio.to_thread` for individual boto3 calls instead of `ThreadPoolExecutor`
- Follow fsspec's `_async_impl` pattern: implement async methods (`_cat_file`, `_ls`, etc.) so fsspec provides sync wrappers automatically
- No new dependencies: uses `asyncio.to_thread` to wrap synchronous boto3 calls (no `aiobotocore` needed)
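As a rough illustration of the `asyncio.gather` + `asyncio.to_thread` approach applied to parallel range reads, the sketch below splits a byte range into blocks and fetches them concurrently. It is not the proposed implementation: `DATA`, `get_range`, `split_range`, and `fetch_range` are hypothetical, and `get_range` slices an in-memory buffer where the real code would issue a blocking ranged GET through boto3.

```python
import asyncio

# Hypothetical object body standing in for data stored in S3.
DATA = bytes(range(256)) * 4


def get_range(start: int, end: int) -> bytes:
    # Stand-in for a blocking ranged GET; `end` is exclusive.
    return DATA[start:end]


def split_range(start: int, end: int, block_size: int) -> list[tuple[int, int]]:
    # Cut [start, end) into consecutive blocks of at most block_size bytes.
    return [(s, min(s + block_size, end)) for s in range(start, end, block_size)]


async def fetch_range(start: int, end: int, block_size: int = 100) -> bytes:
    # Run each blocking call in a worker thread and await them together.
    # asyncio.gather returns results in argument order, so the parts
    # can be joined directly without re-sorting.
    parts = await asyncio.gather(
        *(asyncio.to_thread(get_range, s, e)
          for s, e in split_range(start, end, block_size))
    )
    return b"".join(parts)
```

Because `asyncio.to_thread` submits to the event loop's default executor, the number of simultaneous calls is bounded by that executor's worker count rather than by a pool the filesystem creates itself.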
ThreadPoolExecutor usage to replace
| Location | Current usage | Async replacement |
|---|---|---|
| `S3File._fetch_range()` | Parallel range GETs | `asyncio.gather(*[asyncio.to_thread(...)])` |
| `S3FileSystem._delete_objects()` | Batch deletes | `asyncio.gather(*[asyncio.to_thread(...)])` |
| `S3File._upload_chunk()` | Multipart uploads | `asyncio.gather(*[asyncio.to_thread(...)])` |
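For the batch delete row, one possible shape is sketched below. It assumes keys are grouped into requests of at most 1000 (the per-request limit of the S3 DeleteObjects API); `delete_batch` and `delete_objects` are hypothetical names, and `delete_batch` only counts keys where the real code would call boto3.

```python
import asyncio

# S3 DeleteObjects accepts at most 1000 keys per request.
MAX_KEYS_PER_DELETE = 1000


def delete_batch(bucket: str, keys: list[str]) -> int:
    # Hypothetical stand-in for a blocking delete request.
    # Returns the number of keys it was asked to delete.
    return len(keys)


async def delete_objects(bucket: str, keys: list[str]) -> int:
    # Group keys into request-sized batches.
    batches = [
        keys[i:i + MAX_KEYS_PER_DELETE]
        for i in range(0, len(keys), MAX_KEYS_PER_DELETE)
    ]
    # Let every batch finish, then surface the first failure (if any),
    # so one failed request does not leave the others unawaited.
    results = await asyncio.gather(
        *(asyncio.to_thread(delete_batch, bucket, batch) for batch in batches),
        return_exceptions=True,
    )
    errors = [r for r in results if isinstance(r, BaseException)]
    if errors:
        raise errors[0]
    return sum(results)
```

Whether to collect errors with `return_exceptions=True` or fail fast is a design choice this sketch does not settle; the same pattern would apply to uploading multipart chunks.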
User-facing API
Users can choose which filesystem to use. Aio cursors could default to AioS3FileSystem when available:
```python
from pyathena.filesystem.s3 import AioS3FileSystem

# Direct usage
fs = AioS3FileSystem(connection=connection)
async with fs.open("s3://bucket/key", "rb") as f:
    data = await f.read()
```

Future consideration
If benchmarks show the aio version performs better, it could become the default implementation for aio cursors or even replace the ThreadPoolExecutor-based implementation entirely.
Related
- #672 (AioPandasCursor/AioPolarsCursor: chunksize fetch blocks the event loop): fixed in #674 with `asyncio.to_thread` wrapping
- #674: wraps fetch methods in `asyncio.to_thread` for aio pandas/arrow/polars cursors