diff --git a/pyathena/aio/s3fs/cursor.py b/pyathena/aio/s3fs/cursor.py index 568ee0aa..40ebcd78 100644 --- a/pyathena/aio/s3fs/cursor.py +++ b/pyathena/aio/s3fs/cursor.py @@ -8,6 +8,7 @@ from pyathena.aio.common import WithAsyncFetch from pyathena.common import CursorIterator from pyathena.error import OperationalError, ProgrammingError +from pyathena.filesystem.s3_async import AioS3FileSystem from pyathena.model import AthenaQueryExecution from pyathena.s3fs.converter import DefaultS3FSTypeConverter from pyathena.s3fs.result_set import AthenaS3FSResultSet, CSVReaderType @@ -16,11 +17,12 @@ class AioS3FSCursor(WithAsyncFetch): - """Native asyncio cursor that reads CSV results via S3FileSystem. + """Native asyncio cursor that reads CSV results via AioS3FileSystem. - Uses ``asyncio.to_thread()`` for result set creation and fetch operations - because ``AthenaS3FSResultSet`` lazily streams rows from S3 via a CSV - reader, making fetch calls blocking I/O. + Uses ``AioS3FileSystem`` for S3 operations, which replaces + ``ThreadPoolExecutor`` parallelism with ``asyncio.gather`` + + ``asyncio.to_thread``. Fetch operations are wrapped in + ``asyncio.to_thread()`` because CSV reading is blocking I/O. Example: >>> async with await pyathena.aio_connect(...) 
as conn: @@ -127,6 +129,7 @@ async def execute( # type: ignore[override] arraysize=self.arraysize, retry_config=self._retry_config, csv_reader=self._csv_reader, + filesystem_class=AioS3FileSystem, **kwargs, ) else: diff --git a/pyathena/filesystem/s3.py b/pyathena/filesystem/s3.py index 4d5eda34..57f4bed8 100644 --- a/pyathena/filesystem/s3.py +++ b/pyathena/filesystem/s3.py @@ -1,13 +1,11 @@ # -*- coding: utf-8 -*- from __future__ import annotations -import itertools import logging import mimetypes import os.path import re from concurrent.futures import Future, as_completed -from concurrent.futures.thread import ThreadPoolExecutor from copy import deepcopy from datetime import datetime from multiprocessing import cpu_count @@ -23,6 +21,7 @@ from fsspec.utils import tokenize import pyathena +from pyathena.filesystem.s3_executor import S3Executor, S3ThreadPoolExecutor from pyathena.filesystem.s3_object import ( S3CompleteMultipartUpload, S3MultipartUpload, @@ -686,6 +685,20 @@ def _delete_object( **request, ) + def _create_executor(self, max_workers: int) -> S3Executor: + """Create an executor strategy for parallel operations. + + Subclasses can override to provide alternative execution strategies + (e.g., asyncio-based execution). + + Args: + max_workers: Maximum number of parallel workers. + + Returns: + An S3Executor instance. 
+ """ + return S3ThreadPoolExecutor(max_workers=max_workers) + def _delete_objects( self, bucket: str, paths: List[str], max_workers: Optional[int] = None, **kwargs ) -> None: @@ -703,7 +716,7 @@ def _delete_objects( object_.update({"VersionId": version_id}) delete_objects.append(object_) - with ThreadPoolExecutor(max_workers=max_workers) as executor: + with self._create_executor(max_workers=max_workers) as executor: fs = [] for delete in [ delete_objects[i : i + self.DELETE_OBJECTS_MAX_KEYS] @@ -861,7 +874,7 @@ def _copy_object_with_multipart_upload( **kwargs, ) parts = [] - with ThreadPoolExecutor(max_workers=max_workers) as executor: + with self._create_executor(max_workers=max_workers) as executor: fs = [ executor.submit( self._upload_part_copy, @@ -1106,6 +1119,7 @@ def _open( mode, version_id=None, max_workers=max_workers, + executor=self._create_executor(max_workers=max_workers), block_size=block_size, cache_type=cache_type, autocommit=autocommit, @@ -1256,6 +1270,7 @@ def __init__( mode: str = "rb", version_id: Optional[str] = None, max_workers: int = (cpu_count() or 1) * 5, + executor: Optional[S3Executor] = None, block_size: int = S3FileSystem.DEFAULT_BLOCK_SIZE, cache_type: str = "bytes", autocommit: bool = True, @@ -1265,7 +1280,7 @@ def __init__( **kwargs, ) -> None: self.max_workers = max_workers - self._executor = ThreadPoolExecutor(max_workers=max_workers) + self._executor: S3Executor = executor or S3ThreadPoolExecutor(max_workers=max_workers) self.s3_additional_kwargs = s3_additional_kwargs if s3_additional_kwargs else {} super().__init__( @@ -1481,24 +1496,18 @@ def _fetch_range(self, start: int, end: int) -> bytes: start, end, max_workers=self.max_workers, worker_block_size=self.blocksize ) if len(ranges) > 1: - object_ = self._merge_objects( - list( - self._executor.map( - lambda bucket, key, ranges, version_id, kwargs: self.fs._get_object( - bucket=bucket, - key=key, - ranges=ranges, - version_id=version_id, - **kwargs, - ), - 
itertools.repeat(self.bucket), - itertools.repeat(self.key), - ranges, - itertools.repeat(self.version_id), - itertools.repeat(self.s3_additional_kwargs), - ) + futures = [ + self._executor.submit( + self.fs._get_object, + bucket=self.bucket, + key=self.key, + ranges=r, + version_id=self.version_id, + **self.s3_additional_kwargs, ) - ) + for r in ranges + ] + object_ = self._merge_objects([f.result() for f in as_completed(futures)]) else: object_ = self.fs._get_object( self.bucket, diff --git a/pyathena/filesystem/s3_async.py b/pyathena/filesystem/s3_async.py new file mode 100644 index 00000000..44aeb834 --- /dev/null +++ b/pyathena/filesystem/s3_async.py @@ -0,0 +1,357 @@ +# -*- coding: utf-8 -*- +from __future__ import annotations + +import asyncio +import logging +from multiprocessing import cpu_count +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast + +from fsspec.asyn import AsyncFileSystem +from fsspec.callbacks import _DEFAULT_CALLBACK + +from pyathena.filesystem.s3 import S3File, S3FileSystem +from pyathena.filesystem.s3_executor import S3AioExecutor +from pyathena.filesystem.s3_object import S3Object + +if TYPE_CHECKING: + from datetime import datetime + + from pyathena.connection import Connection + +_logger = logging.getLogger(__name__) + + +class AioS3FileSystem(AsyncFileSystem): + """An async filesystem interface for Amazon S3 using fsspec's AsyncFileSystem. + + This class wraps ``S3FileSystem`` to provide native asyncio support. Instead of + using ``ThreadPoolExecutor`` for parallel operations, it uses ``asyncio.gather`` + with ``asyncio.to_thread`` for natural integration with the asyncio event loop. + + The implementation uses composition: an internal ``S3FileSystem`` instance handles + all boto3 calls, while this class delegates to it via ``asyncio.to_thread()``. + This avoids diamond inheritance issues and keeps all boto3 logic in one place. 
+ + File handles created by ``_open`` use ``S3AioExecutor`` so that parallel + operations (range reads, multipart uploads) are dispatched via the event loop + instead of spawning additional threads. + + Attributes: + _sync_fs: The internal synchronous S3FileSystem instance. + + Example: + >>> from pyathena.filesystem.s3_async import AioS3FileSystem + >>> fs = AioS3FileSystem(asynchronous=True) + >>> + >>> # Use in async context + >>> files = await fs._ls('s3://my-bucket/data/') + >>> + >>> # Sync wrappers also available (auto-generated by fsspec) + >>> files = fs.ls('s3://my-bucket/data/') + """ + + # https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html + DELETE_OBJECTS_MAX_KEYS: int = 1000 + + protocol = ("s3", "s3a") + mirror_sync_methods = True + async_impl = True + _extra_tokenize_attributes = ("default_block_size",) + + def __init__( + self, + connection: Optional["Connection[Any]"] = None, + default_block_size: Optional[int] = None, + default_cache_type: Optional[str] = None, + max_workers: int = (cpu_count() or 1) * 5, + s3_additional_kwargs: Optional[Dict[str, Any]] = None, + asynchronous: bool = False, + loop: Optional[Any] = None, + batch_size: Optional[int] = None, + **kwargs, + ) -> None: + super().__init__( + asynchronous=asynchronous, + loop=loop, + batch_size=batch_size, + **kwargs, + ) + self._sync_fs = S3FileSystem( + connection=connection, + default_block_size=default_block_size, + default_cache_type=default_cache_type, + max_workers=max_workers, + s3_additional_kwargs=s3_additional_kwargs, + **kwargs, + ) + # Share dircache for cache coherence between async and sync instances + self.dircache = self._sync_fs.dircache + + @staticmethod + def parse_path(path: str) -> Tuple[str, Optional[str], Optional[str]]: + return S3FileSystem.parse_path(path) + + async def _info(self, path: str, **kwargs) -> S3Object: + return await asyncio.to_thread(self._sync_fs.info, path, **kwargs) + + async def _ls( + self, path: str, detail: bool = False, 
**kwargs + ) -> Union[List[S3Object], List[str]]: + return await asyncio.to_thread(self._sync_fs.ls, path, detail=detail, **kwargs) + + async def _cat_file( + self, path: str, start: Optional[int] = None, end: Optional[int] = None, **kwargs + ) -> bytes: + return await asyncio.to_thread(self._sync_fs.cat_file, path, start=start, end=end, **kwargs) + + async def _exists(self, path: str, **kwargs) -> bool: + return await asyncio.to_thread(self._sync_fs.exists, path, **kwargs) + + async def _rm_file(self, path: str, **kwargs) -> None: + await asyncio.to_thread(self._sync_fs.rm_file, path, **kwargs) + + async def _pipe_file(self, path: str, value: bytes, **kwargs) -> None: + await asyncio.to_thread(self._sync_fs.pipe_file, path, value, **kwargs) + + async def _put_file(self, lpath: str, rpath: str, callback=_DEFAULT_CALLBACK, **kwargs) -> None: + await asyncio.to_thread(self._sync_fs.put_file, lpath, rpath, callback=callback, **kwargs) + + async def _get_file(self, rpath: str, lpath: str, callback=_DEFAULT_CALLBACK, **kwargs) -> None: + await asyncio.to_thread(self._sync_fs.get_file, rpath, lpath, callback=callback, **kwargs) + + async def _mkdir(self, path: str, create_parents: bool = True, **kwargs) -> None: + await asyncio.to_thread(self._sync_fs.mkdir, path, create_parents=create_parents, **kwargs) + + async def _makedirs(self, path: str, exist_ok: bool = False) -> None: + await asyncio.to_thread(self._sync_fs.makedirs, path, exist_ok=exist_ok) + + async def _rm(self, path: Union[str, List[str]], recursive: bool = False, **kwargs) -> None: + """Remove files or directories using async parallel batch deletion. + + For multiple paths, chunks into batches of 1000 (S3 API limit) and uses + ``asyncio.gather`` with ``asyncio.to_thread`` instead of ThreadPoolExecutor. 
+ """ + if isinstance(path, str): + path = [path] + + bucket, _, _ = self.parse_path(path[0]) + + expand_paths: List[str] = [] + for p in path: + expanded = await asyncio.to_thread(self._sync_fs.expand_path, p, recursive=recursive) + expand_paths.extend(expanded) + + if not expand_paths: + return + + quiet = kwargs.pop("Quiet", True) + delete_objects: List[Dict[str, Any]] = [] + for p in expand_paths: + _, key, version_id = self.parse_path(p) + if key: + object_: Dict[str, Any] = {"Key": key} + if version_id: + object_["VersionId"] = version_id + delete_objects.append(object_) + + if not delete_objects: + return + + chunks = [ + delete_objects[i : i + self.DELETE_OBJECTS_MAX_KEYS] + for i in range(0, len(delete_objects), self.DELETE_OBJECTS_MAX_KEYS) + ] + + async def _delete_chunk(chunk: List[Dict[str, Any]]) -> None: + request = { + "Bucket": bucket, + "Delete": { + "Objects": chunk, + "Quiet": quiet, + }, + } + await asyncio.to_thread( + self._sync_fs._call, self._sync_fs._client.delete_objects, **request + ) + + await asyncio.gather(*[_delete_chunk(chunk) for chunk in chunks]) + + for p in expand_paths: + self._sync_fs.invalidate_cache(p) + + async def _cp_file(self, path1: str, path2: str, **kwargs) -> None: + """Copy an S3 object, using async parallel multipart upload for large files.""" + kwargs.pop("onerror", None) + bucket1, key1, version_id1 = self.parse_path(path1) + bucket2, key2, version_id2 = self.parse_path(path2) + if version_id2: + raise ValueError("Cannot copy to a versioned file.") + if not key1 or not key2: + raise ValueError("Cannot copy buckets.") + + info1 = await self._info(path1) + size1 = info1.get("size", 0) + if size1 <= S3FileSystem.MULTIPART_UPLOAD_MAX_PART_SIZE: + await asyncio.to_thread( + self._sync_fs._copy_object, + bucket1=bucket1, + key1=key1, + version_id1=version_id1, + bucket2=bucket2, + key2=key2, + **kwargs, + ) + else: + await self._copy_object_with_multipart_upload( + bucket1=bucket1, + key1=key1, + 
version_id1=version_id1, + size1=size1, + bucket2=bucket2, + key2=key2, + **kwargs, + ) + self._sync_fs.invalidate_cache(path2) + + async def _copy_object_with_multipart_upload( + self, + bucket1: str, + key1: str, + size1: int, + bucket2: str, + key2: str, + block_size: Optional[int] = None, + version_id1: Optional[str] = None, + **kwargs, + ) -> None: + block_size = block_size if block_size else S3FileSystem.MULTIPART_UPLOAD_MAX_PART_SIZE + if ( + block_size < S3FileSystem.MULTIPART_UPLOAD_MIN_PART_SIZE + or block_size > S3FileSystem.MULTIPART_UPLOAD_MAX_PART_SIZE + ): + raise ValueError("Block size must be greater than 5MiB and less than 5GiB.") + + copy_source: Dict[str, Any] = { + "Bucket": bucket1, + "Key": key1, + } + if version_id1: + copy_source["VersionId"] = version_id1 + + ranges = S3File._get_ranges( + 0, + size1, + self._sync_fs.max_workers, + block_size, + ) + multipart_upload = await asyncio.to_thread( + self._sync_fs._create_multipart_upload, + bucket=bucket2, + key=key2, + **kwargs, + ) + + async def _upload_part(i: int, range_: Tuple[int, int]) -> Dict[str, Any]: + result = await asyncio.to_thread( + self._sync_fs._upload_part_copy, + bucket=bucket2, + key=key2, + copy_source=copy_source, + upload_id=cast(str, multipart_upload.upload_id), + part_number=i + 1, + copy_source_ranges=range_, + ) + return { + "ETag": result.etag, + "PartNumber": result.part_number, + } + + parts = await asyncio.gather(*[_upload_part(i, r) for i, r in enumerate(ranges)]) + parts_list = sorted(parts, key=lambda x: x["PartNumber"]) + + await asyncio.to_thread( + self._sync_fs._complete_multipart_upload, + bucket=bucket2, + key=key2, + upload_id=cast(str, multipart_upload.upload_id), + parts=parts_list, + ) + + async def _find( + self, + path: str, + maxdepth: Optional[int] = None, + withdirs: bool = False, + **kwargs, + ) -> Union[Dict[str, S3Object], List[str]]: + detail = kwargs.pop("detail", False) + files = await asyncio.to_thread( + self._sync_fs._find, path, 
maxdepth=maxdepth, withdirs=withdirs, **kwargs + ) + if detail: + return {f.name: f for f in files} + return [f.name for f in files] + + def _open( + self, + path: str, + mode: str = "rb", + block_size: Optional[int] = None, + cache_type: Optional[str] = None, + autocommit: bool = True, + cache_options: Optional[Dict[Any, Any]] = None, + **kwargs, + ) -> "AioS3File": + if block_size is None: + block_size = self._sync_fs.default_block_size + if cache_type is None: + cache_type = self._sync_fs.default_cache_type + max_workers = kwargs.pop("max_worker", self._sync_fs.max_workers) + s3_additional_kwargs = kwargs.pop("s3_additional_kwargs", {}) + s3_additional_kwargs.update(self._sync_fs.s3_additional_kwargs) + + return AioS3File( + self._sync_fs, + path, + mode, + version_id=None, + max_workers=max_workers, + executor=S3AioExecutor(loop=self._loop), + block_size=block_size, + cache_type=cache_type, + autocommit=autocommit, + cache_options=cache_options, + s3_additional_kwargs=s3_additional_kwargs, + **kwargs, + ) + + def sign(self, path: str, expiration: int = 3600, **kwargs) -> str: + return cast(str, self._sync_fs.sign(path, expiration=expiration, **kwargs)) + + def checksum(self, path: str, **kwargs) -> int: + return cast(int, self._sync_fs.checksum(path, **kwargs)) + + def created(self, path: str) -> "datetime": + return self._sync_fs.created(path) + + def modified(self, path: str) -> "datetime": + return self._sync_fs.modified(path) + + def invalidate_cache(self, path: Optional[str] = None) -> None: + self._sync_fs.invalidate_cache(path) + + async def _touch(self, path: str, truncate: bool = True, **kwargs) -> None: + await asyncio.to_thread(self._sync_fs.touch, path, truncate=truncate, **kwargs) + + +class AioS3File(S3File): + """Async-aware S3 file handle using ``S3AioExecutor``. + + Functionally identical to ``S3File``; exists as a distinct type for + ``isinstance`` checks and to document the async execution model. 
+ All parallel operations (range reads, multipart uploads) are dispatched + through the ``S3Executor`` interface — the ``S3AioExecutor`` + provided by ``AioS3FileSystem`` uses the event loop instead of threads. + """ + + pass diff --git a/pyathena/filesystem/s3_executor.py b/pyathena/filesystem/s3_executor.py new file mode 100644 index 00000000..7c0bcee0 --- /dev/null +++ b/pyathena/filesystem/s3_executor.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +from __future__ import annotations + +import asyncio +from abc import ABCMeta, abstractmethod +from concurrent.futures import Future +from concurrent.futures.thread import ThreadPoolExecutor +from typing import Any, Callable, Optional, TypeVar + +T = TypeVar("T") + + +class S3Executor(metaclass=ABCMeta): + """Abstract executor for parallel S3 operations. + + Defines the interface used by ``S3File`` and ``S3FileSystem`` for submitting + work to run in parallel and for shutting down the executor when done. + Both ``submit`` and ``shutdown`` mirror the ``concurrent.futures.Executor`` + interface so that ``as_completed()`` and ``Future.cancel()`` work unchanged. + """ + + @abstractmethod + def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]: + """Submit a callable for execution and return a Future.""" + ... + + @abstractmethod + def shutdown(self, wait: bool = True) -> None: + """Shut down the executor, freeing any resources.""" + ... + + def __enter__(self) -> "S3Executor": + return self + + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.shutdown(wait=True) + + +class S3ThreadPoolExecutor(S3Executor): + """Executor that delegates to a ``ThreadPoolExecutor``. + + This is the default executor used by ``S3File`` and ``S3FileSystem`` + for synchronous parallel operations. 
+ """ + + def __init__(self, max_workers: int) -> None: + self._executor = ThreadPoolExecutor(max_workers=max_workers) + + def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]: + return self._executor.submit(fn, *args, **kwargs) + + def shutdown(self, wait: bool = True) -> None: + self._executor.shutdown(wait=wait) + + +class S3AioExecutor(S3Executor): + """Executor that schedules work on an asyncio event loop. + + Uses ``asyncio.run_coroutine_threadsafe(asyncio.to_thread(fn), loop)`` to + dispatch blocking functions onto the event loop's thread pool, returning + ``concurrent.futures.Future`` objects that are compatible with + ``as_completed()`` and ``Future.cancel()``. + + This avoids thread-in-thread nesting when ``S3File`` is used from within + ``asyncio.to_thread()`` calls (the pattern used by ``AioS3FileSystem``). + + Args: + loop: A running asyncio event loop. + + Raises: + RuntimeError: If the event loop is not running when ``submit`` is called. + """ + + def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None: + self._loop = loop + + def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]: + if self._loop is not None and self._loop.is_running(): + return asyncio.run_coroutine_threadsafe( + asyncio.to_thread(fn, *args, **kwargs), self._loop + ) + raise RuntimeError( + "S3AioExecutor requires a running event loop. " + "Use S3ThreadPoolExecutor for synchronous usage." + ) + + def shutdown(self, wait: bool = True) -> None: + # No resources to release — work is dispatched to the event loop. 
+ pass diff --git a/pyathena/s3fs/result_set.py b/pyathena/s3fs/result_set.py index 602de09f..1068aaa0 100644 --- a/pyathena/s3fs/result_set.py +++ b/pyathena/s3fs/result_set.py @@ -5,6 +5,8 @@ from io import TextIOWrapper from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union +from fsspec import AbstractFileSystem + from pyathena.converter import Converter from pyathena.error import OperationalError, ProgrammingError from pyathena.filesystem.s3 import S3FileSystem @@ -62,6 +64,7 @@ def __init__( retry_config: RetryConfig, block_size: Optional[int] = None, csv_reader: Optional[CSVReaderType] = None, + filesystem_class: Optional[Type[AbstractFileSystem]] = None, **kwargs, ) -> None: super().__init__( @@ -77,6 +80,7 @@ def __init__( self._arraysize = arraysize self._block_size = block_size if block_size else self.DEFAULT_BLOCK_SIZE self._csv_reader_class: CSVReaderType = csv_reader or AthenaCSVReader + self._filesystem_class: Type[AbstractFileSystem] = filesystem_class or S3FileSystem self._fs = self._create_s3_file_system() self._csv_reader: Optional[Any] = None @@ -92,9 +96,9 @@ def __init__( if not self._csv_reader and not self._rows and pre_fetched_rows: self._rows.extend(pre_fetched_rows) - def _create_s3_file_system(self) -> S3FileSystem: + def _create_s3_file_system(self) -> AbstractFileSystem: """Create S3FileSystem using connection settings.""" - return S3FileSystem( + return self._filesystem_class( connection=self.connection, default_block_size=self._block_size, ) diff --git a/tests/pyathena/filesystem/test_s3_async.py b/tests/pyathena/filesystem/test_s3_async.py new file mode 100644 index 00000000..5bd312bd --- /dev/null +++ b/tests/pyathena/filesystem/test_s3_async.py @@ -0,0 +1,864 @@ +# -*- coding: utf-8 -*- +import os +import tempfile +import time +import urllib.parse +import urllib.request +import uuid +from datetime import datetime, timezone +from itertools import chain +from pathlib import Path + +import fsspec +import 
pytest +from fsspec import Callback + +from pyathena.filesystem.s3 import S3File, S3FileSystem +from pyathena.filesystem.s3_async import AioS3File, AioS3FileSystem +from pyathena.filesystem.s3_object import S3ObjectType, S3StorageClass +from tests import ENV +from tests.pyathena.conftest import connect + + +@pytest.fixture(scope="class") +def register_async_filesystem(): + fsspec.register_implementation( + "s3", "pyathena.filesystem.s3_async.AioS3FileSystem", clobber=True + ) + fsspec.register_implementation( + "s3a", "pyathena.filesystem.s3_async.AioS3FileSystem", clobber=True + ) + + +@pytest.mark.usefixtures("register_async_filesystem") +class TestAioS3FileSystem: + def test_parse_path(self): + actual = AioS3FileSystem.parse_path("s3://bucket") + assert actual[0] == "bucket" + assert actual[1] is None + assert actual[2] is None + + actual = AioS3FileSystem.parse_path("s3://bucket/") + assert actual[0] == "bucket" + assert actual[1] is None + assert actual[2] is None + + actual = AioS3FileSystem.parse_path("s3://bucket/path/to/obj") + assert actual[0] == "bucket" + assert actual[1] == "path/to/obj" + assert actual[2] is None + + actual = AioS3FileSystem.parse_path("s3://bucket/path/to/obj?versionId=12345abcde") + assert actual[0] == "bucket" + assert actual[1] == "path/to/obj" + assert actual[2] == "12345abcde" + + actual = AioS3FileSystem.parse_path("s3a://bucket") + assert actual[0] == "bucket" + assert actual[1] is None + assert actual[2] is None + + actual = AioS3FileSystem.parse_path("s3a://bucket/") + assert actual[0] == "bucket" + assert actual[1] is None + assert actual[2] is None + + actual = AioS3FileSystem.parse_path("s3a://bucket/path/to/obj") + assert actual[0] == "bucket" + assert actual[1] == "path/to/obj" + assert actual[2] is None + + actual = AioS3FileSystem.parse_path("s3a://bucket/path/to/obj?versionId=12345abcde") + assert actual[0] == "bucket" + assert actual[1] == "path/to/obj" + assert actual[2] == "12345abcde" + + actual = 
# NOTE(review): This chunk arrived as a whitespace-mangled unified diff (every
# logical line prefixed with "+ " and newlines collapsed). The code below
# reconstructs the added test module as valid, conventionally formatted Python.
# The ``class TestAioS3FileSystem:`` header and the opening assertions of
# ``test_parse_path`` (presumably the ``s3://``/``s3a://`` URL variants, per
# the pattern of the sibling sync tests) lie *before* the visible chunk; the
# minimal scaffold below is a reconstruction — TODO confirm against the
# original patch. All runtime strings, test names, and assertions are
# preserved byte-for-byte from the visible diff content.
class TestAioS3FileSystem:
    """Integration tests for AioS3FileSystem against a live S3 staging bucket.

    The ``fs`` fixture builds one filesystem per test class from a real
    Athena connection; most tests exercise the async ``_``-prefixed API
    (``_ls``, ``_info``, ``_find``, ...) plus the generated sync wrappers.
    """

    def test_parse_path(self):
        # NOTE(review): earlier assertions of this method are outside the
        # visible chunk; the first visible call was truncated mid-statement
        # and is reconstructed as an ``actual = ...`` assignment.
        actual = AioS3FileSystem.parse_path("bucket")
        assert actual[0] == "bucket"
        assert actual[1] is None
        assert actual[2] is None

        actual = AioS3FileSystem.parse_path("bucket/")
        assert actual[0] == "bucket"
        assert actual[1] is None
        assert actual[2] is None

        actual = AioS3FileSystem.parse_path("bucket/path/to/obj")
        assert actual[0] == "bucket"
        assert actual[1] == "path/to/obj"
        assert actual[2] is None

        # The version-id query parameter is accepted in several spellings.
        actual = AioS3FileSystem.parse_path("bucket/path/to/obj?versionId=12345abcde")
        assert actual[0] == "bucket"
        assert actual[1] == "path/to/obj"
        assert actual[2] == "12345abcde"

        actual = AioS3FileSystem.parse_path("bucket/path/to/obj?versionID=12345abcde")
        assert actual[0] == "bucket"
        assert actual[1] == "path/to/obj"
        assert actual[2] == "12345abcde"

        actual = AioS3FileSystem.parse_path("bucket/path/to/obj?versionid=12345abcde")
        assert actual[0] == "bucket"
        assert actual[1] == "path/to/obj"
        assert actual[2] == "12345abcde"

        actual = AioS3FileSystem.parse_path("bucket/path/to/obj?version_id=12345abcde")
        assert actual[0] == "bucket"
        assert actual[1] == "path/to/obj"
        assert actual[2] == "12345abcde"

    def test_parse_path_invalid(self):
        # Non-S3 schemes and unknown query parameters must be rejected.
        with pytest.raises(ValueError):
            AioS3FileSystem.parse_path("http://bucket")

        with pytest.raises(ValueError):
            AioS3FileSystem.parse_path("s3://bucket?")

        with pytest.raises(ValueError):
            AioS3FileSystem.parse_path("s3://bucket?foo=bar")

        with pytest.raises(ValueError):
            AioS3FileSystem.parse_path("s3://bucket/path/to/obj?foo=bar")

        with pytest.raises(ValueError):
            AioS3FileSystem.parse_path("s3a://bucket?")

        with pytest.raises(ValueError):
            AioS3FileSystem.parse_path("s3a://bucket?foo=bar")

        with pytest.raises(ValueError):
            AioS3FileSystem.parse_path("s3a://bucket/path/to/obj?foo=bar")

    @pytest.fixture(scope="class")
    def fs(self, request):
        # Indirect parametrization may inject constructor kwargs via
        # ``request.param``; default to no extra kwargs.
        if not hasattr(request, "param"):
            setattr(request, "param", {})  # noqa: B010
        return AioS3FileSystem(connection=connect(), **request.param)

    @pytest.mark.parametrize(
        ["fs", "start", "end", "target_data"],
        list(
            chain(
                *[
                    [
                        ({"default_block_size": x}, 0, 5, b"01234"),
                        ({"default_block_size": x}, 2, 7, b"23456"),
                        ({"default_block_size": x}, 0, 10, b"0123456789"),
                    ]
                    for x in (S3FileSystem.DEFAULT_BLOCK_SIZE, 3)
                ]
            )
        ),
        indirect=["fs"],
    )
    def test_read(self, fs, start, end, target_data):
        # lowest level access: use _get_object
        data = fs._sync_fs._get_object(
            ENV.s3_staging_bucket, ENV.s3_filesystem_test_file_key, ranges=(start, end)
        )
        assert data == (start, target_data), data
        with fs.open(
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_filesystem_test_file_key}", "rb"
        ) as file:
            # mid-level access: use _fetch_range
            data = file._fetch_range(start, end)
            assert data == target_data, data
            # high-level: use fileobj seek and read
            file.seek(start)
            data = file.read(end - start)
            assert data == target_data, data

    @pytest.mark.parametrize(
        ["base", "exp"],
        [
            (1, 2**10),
            (1, 2**20),
        ],
    )
    def test_write(self, fs, base, exp):
        # Round-trip write/read at 1 KiB and 1 MiB payload sizes.
        data = b"a" * (base * exp)
        path = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_write/{uuid.uuid4()}"
        )
        with fs.open(path, "wb") as f:
            f.write(data)
        with fs.open(path, "rb") as f:
            actual = f.read()
        assert len(actual) == len(data)
        assert actual == data

    @pytest.mark.parametrize(
        ["base", "exp"],
        [
            (1, 2**10),
            (1, 2**20),
        ],
    )
    def test_append(self, fs, base, exp):
        # First "ab" open creates the object; second appends to it.
        data = b"a" * (base * exp)
        path = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_append/{uuid.uuid4()}"
        )
        with fs.open(path, "ab") as f:
            f.write(data)
        extra = b"extra"
        with fs.open(path, "ab") as f:
            f.write(extra)
        with fs.open(path, "rb") as f:
            actual = f.read()
        assert len(actual) == len(data + extra)
        assert actual == data + extra

    @pytest.mark.asyncio
    async def test_ls_buckets(self, fs):
        # Both "s3://" and "s3:///" roots must list buckets.
        fs.invalidate_cache()
        actual = await fs._ls("s3://")
        assert ENV.s3_staging_bucket in actual, actual

        fs.invalidate_cache()
        actual = await fs._ls("s3:///")
        assert ENV.s3_staging_bucket in actual, actual

        fs.invalidate_cache()
        actual = await fs._ls("s3://", detail=True)
        found = next(filter(lambda x: x.name == ENV.s3_staging_bucket, actual), None)
        assert found
        assert found.name == ENV.s3_staging_bucket

        fs.invalidate_cache()
        actual = await fs._ls("s3:///", detail=True)
        found = next(filter(lambda x: x.name == ENV.s3_staging_bucket, actual), None)
        assert found
        assert found.name == ENV.s3_staging_bucket

    @pytest.mark.asyncio
    async def test_ls_dirs(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_ls_dirs"
        )
        for i in range(5):
            # bytes(i) yields i zero-bytes, so test_1 has size 1 below.
            await fs._pipe_file(f"{dir_}/prefix/test_{i}", bytes(i))
        await fs._touch(f"{dir_}/prefix2")

        assert len(await fs._ls(f"{dir_}/prefix")) == 5
        assert len(await fs._ls(f"{dir_}/prefix/")) == 5
        assert len(await fs._ls(f"{dir_}/prefix/test_")) == 0
        assert len(await fs._ls(f"{dir_}/prefix2")) == 1

        test_1 = await fs._ls(f"{dir_}/prefix/test_1")
        assert len(test_1) == 1
        assert test_1[0] == fs._strip_protocol(f"{dir_}/prefix/test_1")

        test_1_detail = await fs._ls(f"{dir_}/prefix/test_1", detail=True)
        assert len(test_1_detail) == 1
        assert test_1_detail[0].name == fs._strip_protocol(f"{dir_}/prefix/test_1")
        assert test_1_detail[0].size == 1

    @pytest.mark.asyncio
    async def test_info_bucket(self, fs):
        # A bare bucket (with and without trailing slash) reports as a
        # directory with bucket storage class and no object metadata.
        dir_ = f"s3://{ENV.s3_staging_bucket}"
        bucket, key, version_id = fs.parse_path(dir_)
        info = await fs._info(dir_)

        assert info.name == fs._strip_protocol(dir_)
        assert info.bucket == bucket
        assert info.key is None
        assert info.last_modified is None
        assert info.size == 0
        assert info.etag is None
        assert info.type == S3ObjectType.S3_OBJECT_TYPE_DIRECTORY
        assert info.storage_class == S3StorageClass.S3_STORAGE_CLASS_BUCKET
        assert info.version_id == version_id

        dir_ = f"s3://{ENV.s3_staging_bucket}/"
        bucket, key, version_id = fs.parse_path(dir_)
        info = await fs._info(dir_)

        assert info.name == fs._strip_protocol(dir_)
        assert info.bucket == bucket
        assert info.key is None
        assert info.last_modified is None
        assert info.size == 0
        assert info.etag is None
        assert info.type == S3ObjectType.S3_OBJECT_TYPE_DIRECTORY
        assert info.storage_class == S3StorageClass.S3_STORAGE_CLASS_BUCKET
        assert info.version_id == version_id

    @pytest.mark.asyncio
    async def test_info_dir(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_info_dir"
        )
        file = f"{dir_}/{uuid.uuid4()}"

        fs.invalidate_cache()
        with pytest.raises(FileNotFoundError):
            await fs._info(f"s3://{uuid.uuid4()}")

        await fs._pipe_file(file, b"a")
        bucket, key, version_id = fs.parse_path(dir_)
        fs.invalidate_cache()
        info = await fs._info(dir_)
        fs.invalidate_cache()

        assert info.name == fs._strip_protocol(dir_)
        assert info.bucket == bucket
        assert info.key == key.rstrip("/")
        assert info.last_modified is None
        assert info.size == 0
        assert info.etag is None
        assert info.type == S3ObjectType.S3_OBJECT_TYPE_DIRECTORY
        assert info.storage_class == S3StorageClass.S3_STORAGE_CLASS_DIRECTORY
        assert info.version_id == version_id

    @pytest.mark.asyncio
    async def test_info_file(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_info_file"
        )
        file = f"{dir_}/{uuid.uuid4()}"

        fs.invalidate_cache()
        with pytest.raises(FileNotFoundError):
            await fs._info(file)

        now = datetime.now(timezone.utc)
        await fs._pipe_file(file, b"a")
        bucket, key, version_id = fs.parse_path(file)
        fs.invalidate_cache()
        info = await fs._info(file)
        fs.invalidate_cache()
        ls_info = (await fs._ls(file, detail=True))[0]

        assert info == ls_info
        assert info.name == fs._strip_protocol(file)
        assert info.bucket == bucket
        assert info.key == key
        assert info.last_modified >= now
        assert info.size == 1
        assert info.etag is not None
        assert info.type == S3ObjectType.S3_OBJECT_TYPE_FILE
        assert info.storage_class == S3StorageClass.S3_STORAGE_CLASS_STANDARD
        assert info.version_id == version_id

    @pytest.mark.asyncio
    async def test_find(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_find"
        )
        for i in range(5):
            await fs._pipe_file(f"{dir_}/prefix/test_{i}", bytes(i))
        await fs._touch(f"{dir_}/prefix2")

        result = await fs._find(f"{dir_}/prefix")
        assert len(result) == 5

        result = await fs._find(f"{dir_}/prefix/")
        assert len(result) == 5

        # prefix= matches both "prefix/..." (5 files) and "prefix2".
        result = await fs._find(dir_, prefix="prefix")
        assert len(result) == 6

        result = await fs._find(f"{dir_}/prefix/test_")
        assert len(result) == 0

        result = await fs._find(f"{dir_}/prefix", prefix="test_")
        assert len(result) == 5

        result = await fs._find(f"{dir_}/prefix/", prefix="test_")
        assert len(result) == 5

        test_1 = await fs._find(f"{dir_}/prefix/test_1")
        assert len(test_1) == 1
        assert test_1[0] == fs._strip_protocol(f"{dir_}/prefix/test_1")

        # detail=True returns a dict keyed by stripped path.
        test_1_detail = await fs._find(f"{dir_}/prefix/test_1", detail=True)
        assert len(test_1_detail) == 1
        assert test_1_detail[
            fs._strip_protocol(f"{dir_}/prefix/test_1")
        ].name == fs._strip_protocol(f"{dir_}/prefix/test_1")
        assert test_1_detail[fs._strip_protocol(f"{dir_}/prefix/test_1")].size == 1

    @pytest.mark.asyncio
    async def test_find_maxdepth(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_find_maxdepth"
        )
        # Create files at different depths
        await fs._touch(f"{dir_}/file0.txt")
        await fs._touch(f"{dir_}/level1/file1.txt")
        await fs._touch(f"{dir_}/level1/level2/file2.txt")
        await fs._touch(f"{dir_}/level1/level2/level3/file3.txt")

        # Test maxdepth=0 (only files in the root)
        result = await fs._find(dir_, maxdepth=0)
        assert len(result) == 1
        assert fs._strip_protocol(f"{dir_}/file0.txt") in result

        # Test maxdepth=1 (files in root and level1)
        result = await fs._find(dir_, maxdepth=1)
        assert len(result) == 2
        assert fs._strip_protocol(f"{dir_}/file0.txt") in result
        assert fs._strip_protocol(f"{dir_}/level1/file1.txt") in result

        # Test maxdepth=2 (files in root, level1, and level2)
        result = await fs._find(dir_, maxdepth=2)
        assert len(result) == 3
        assert fs._strip_protocol(f"{dir_}/level1/level2/file2.txt") in result

        # Test no maxdepth (all files)
        result = await fs._find(dir_)
        assert len(result) == 4

    @pytest.mark.asyncio
    async def test_find_withdirs(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_find_withdirs"
        )
        # Create directory structure with files
        await fs._touch(f"{dir_}/file1.txt")
        await fs._touch(f"{dir_}/subdir1/file2.txt")
        await fs._touch(f"{dir_}/subdir1/subdir2/file3.txt")
        await fs._touch(f"{dir_}/subdir3/file4.txt")

        # Test default behavior (withdirs=False)
        result = await fs._find(dir_)
        assert len(result) == 4  # Only files
        for r in result:
            assert r.endswith(".txt")

        # Test withdirs=True
        result = await fs._find(dir_, withdirs=True)
        assert len(result) > 4  # Files and directories

        # Verify directories are included
        dirs = [r for r in result if not r.endswith(".txt")]
        assert len(dirs) > 0
        assert any("subdir1" in d for d in dirs)
        assert any("subdir2" in d for d in dirs)
        assert any("subdir3" in d for d in dirs)

        # Test withdirs=False explicitly
        result = await fs._find(dir_, withdirs=False)
        assert len(result) == 4  # Only files

    def test_du(self):
        # TODO
        pass

    @pytest.mark.asyncio
    async def test_glob(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_glob"
        )
        path = f"{dir_}/nested/test_{uuid.uuid4()}"
        await fs._touch(path)

        assert fs._strip_protocol(path) not in fs.glob(f"{dir_}/")
        assert fs._strip_protocol(path) not in fs.glob(f"{dir_}/*")
        assert fs._strip_protocol(path) not in fs.glob(f"{dir_}/nested")
        assert fs._strip_protocol(path) not in fs.glob(f"{dir_}/nested/")
        assert fs._strip_protocol(path) in fs.glob(f"{dir_}/nested/*")
        assert fs._strip_protocol(path) in fs.glob(f"{dir_}/nested/test_*")
        assert fs._strip_protocol(path) in fs.glob(f"{dir_}/*/*")

        # Globbing without a bucket is invalid.
        with pytest.raises(ValueError):
            fs.glob("*")

    @pytest.mark.asyncio
    async def test_exists_bucket(self, fs):
        assert await fs._exists("s3://")
        assert await fs._exists("s3:///")

        path = f"s3://{ENV.s3_staging_bucket}"
        assert await fs._exists(path)

        not_exists_path = f"s3://{uuid.uuid4()}"
        assert not await fs._exists(not_exists_path)

    @pytest.mark.asyncio
    async def test_exists_object(self, fs):
        path = f"s3://{ENV.s3_staging_bucket}/{ENV.s3_filesystem_test_file_key}"
        assert await fs._exists(path)

        not_exists_path = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_exists/{uuid.uuid4()}"
        )
        assert not await fs._exists(not_exists_path)

    @pytest.mark.asyncio
    async def test_rm_file(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_rm_file"
        )
        file = f"{dir_}/{uuid.uuid4()}"
        await fs._touch(file)
        await fs._rm_file(file)

        # Removing the only object also removes the implicit directory.
        assert not await fs._exists(file)
        assert not await fs._exists(dir_)

    @pytest.mark.asyncio
    async def test_rm(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_rm"
        )
        file = f"{dir_}/{uuid.uuid4()}"
        await fs._touch(file)
        await fs._rm(file)

        assert not await fs._exists(file)
        assert not await fs._exists(dir_)

    @pytest.mark.asyncio
    async def test_rm_recursive(self, fs):
        dir_ = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_rm_recursive"
        )

        files = [f"{dir_}/{uuid.uuid4()}" for _ in range(10)]
        for f in files:
            await fs._touch(f)

        # Non-recursive rm on a directory is a no-op here.
        await fs._rm(dir_)
        for f in files:
            assert await fs._exists(f)
        assert await fs._exists(dir_)

        await fs._rm(dir_, recursive=True)
        for f in files:
            assert not await fs._exists(f)
        assert not await fs._exists(dir_)

    @pytest.mark.asyncio
    async def test_touch(self, fs):
        path = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_touch/{uuid.uuid4()}"
        )
        assert not await fs._exists(path)
        await fs._touch(path)
        assert await fs._exists(path)
        info = await fs._info(path)
        assert info.size == 0

        with fs.open(path, "wb") as f:
            f.write(b"data")
        info = await fs._info(path, refresh=True)
        assert info.size == 4
        await fs._touch(path, truncate=True)
        info = await fs._info(path, refresh=True)
        assert info.size == 0

        with fs.open(path, "wb") as f:
            f.write(b"data")
        info = await fs._info(path, refresh=True)
        assert info.size == 4
        # Touching an existing object with truncate=False must fail and
        # leave the object untouched.
        with pytest.raises(ValueError):
            await fs._touch(path, truncate=False)
        info = await fs._info(path, refresh=True)
        assert info.size == 4

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ["base", "exp"],
        [
            (1, 2**10),
            (1, 2**20),
        ],
    )
    async def test_pipe_cat(self, fs, base, exp):
        data = b"a" * (base * exp)
        path = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_pipe_cat/{uuid.uuid4()}"
        )
        await fs._pipe_file(path, data)
        assert await fs._cat_file(path) == data

    @pytest.mark.asyncio
    async def test_cat_ranges(self, fs):
        # start/end follow Python slice semantics, including negatives.
        data = b"1234567890abcdefghijklmnopqrstuvwxyz"
        path = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_cat_ranges/{uuid.uuid4()}"
        )
        await fs._pipe_file(path, data)

        assert await fs._cat_file(path) == data
        assert await fs._cat_file(path, start=5) == data[5:]
        assert await fs._cat_file(path, end=5) == data[:5]
        assert await fs._cat_file(path, start=1, end=-1) == data[1:-1]
        assert await fs._cat_file(path, start=-5) == data[-5:]

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ["base", "exp"],
        [
            (1, 2**10),
            (1, 2**20),
        ],
    )
    async def test_put(self, fs, base, exp):
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            data = b"a" * (base * exp)
            tmp.write(data)
            tmp.flush()

            rpath = (
                f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
                f"filesystem/test_async_put/{uuid.uuid4()}"
            )
            await fs._put_file(lpath=tmp.name, rpath=rpath)
            tmp.seek(0)
            assert await fs._cat_file(rpath) == tmp.read()

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ["base", "exp"],
        [
            (1, 2**10),
            (1, 2**20),
        ],
    )
    async def test_put_with_callback(self, fs, base, exp):
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            data = b"a" * (base * exp)
            tmp.write(data)
            tmp.flush()

            rpath = (
                f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
                f"filesystem/test_async_put_with_callback/{uuid.uuid4()}"
            )
            callback = Callback()
            await fs._put_file(lpath=tmp.name, rpath=rpath, callback=callback)
            tmp.seek(0)
            assert await fs._cat_file(rpath) == tmp.read()
            # The fsspec callback must see the full upload size.
            assert callback.size == os.stat(tmp.name).st_size
            assert callback.value == callback.size

    @pytest.mark.asyncio
    @pytest.mark.parametrize(
        ["base", "exp"],
        [
            (1, 2**10),
            (1, 2**20),
        ],
    )
    async def test_upload_cp_file(self, fs, base, exp):
        with tempfile.NamedTemporaryFile(delete=False) as tmp:
            data = b"a" * (base * exp)
            tmp.write(data)
            tmp.flush()

            rpath = (
                f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
                f"filesystem/test_async_upload_cp_file/{uuid.uuid4()}"
            )
            await fs._put_file(lpath=tmp.name, rpath=rpath)

            rpath_copy = (
                f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
                f"filesystem/test_async_upload_cp_file_copy/{uuid.uuid4()}"
            )
            await fs._cp_file(path1=rpath, path2=rpath_copy)
            tmp.seek(0)
            assert await fs._cat_file(rpath_copy) == tmp.read()
            assert await fs._cat_file(rpath_copy) == await fs._cat_file(rpath)

    @pytest.mark.asyncio
    async def test_move(self, fs):
        path1 = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_move/{uuid.uuid4()}"
        )
        path2 = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_move/{uuid.uuid4()}"
        )
        data = b"a"
        await fs._pipe_file(path1, data)
        # mv is exercised through the generated sync wrapper.
        fs.mv(path1, path2)
        assert await fs._cat_file(path2) == data
        assert not await fs._exists(path1)

    @pytest.mark.asyncio
    async def test_get_file(self, fs):
        with tempfile.TemporaryDirectory() as tmp:
            rpath = f"s3://{ENV.s3_staging_bucket}/{ENV.s3_filesystem_test_file_key}"
            lpath = Path(f"{tmp}/{uuid.uuid4()}")
            callback = Callback()
            await fs._get_file(rpath=rpath, lpath=str(lpath), callback=callback)

            assert lpath.open("rb").read() == await fs._cat_file(rpath)
            assert callback.size == os.stat(lpath).st_size
            assert callback.value == callback.size

    def test_open_returns_aio_s3_file(self, fs):
        path = f"s3://{ENV.s3_staging_bucket}/{ENV.s3_filesystem_test_file_key}"
        with fs.open(path, "rb") as f:
            assert isinstance(f, AioS3File)
            data = f.read()
            assert data == b"0123456789"

    def test_checksum(self, fs):
        path = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_checksum/{uuid.uuid4()}"
        )
        bucket, key, _ = fs.parse_path(path)

        # A cached checksum is stale until refresh=True forces a re-read.
        fs.pipe_file(path, b"foo")
        checksum = fs.checksum(path)
        fs.ls(path)  # caching
        fs._sync_fs._put_object(bucket=bucket, key=key, body=b"bar")
        assert checksum == fs.checksum(path)
        assert checksum != fs.checksum(path, refresh=True)

        fs.pipe_file(path, b"foo")
        checksum = fs.checksum(path)
        fs.ls(path)  # caching
        fs._sync_fs._delete_object(bucket, key)
        assert checksum == fs.checksum(path)
        with pytest.raises(FileNotFoundError):
            fs.checksum(path, refresh=True)

    def test_sign(self, fs):
        path = f"s3://{ENV.s3_staging_bucket}/{ENV.s3_filesystem_test_file_key}"
        requested = time.time()
        # Sleep so the signed URL's expiry is strictly after requested+100.
        time.sleep(1)
        url = fs.sign(path, expiration=100)
        parsed = urllib.parse.urlparse(url)
        query = urllib.parse.parse_qs(parsed.query)
        expires = int(query["Expires"][0])
        with urllib.request.urlopen(url) as r:
            data = r.read()

        assert "https" in url
        assert requested + 100 < expires
        assert data == b"0123456789"

    def test_pandas_read_csv(self):
        import pandas

        df = pandas.read_csv(
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_filesystem_test_file_key}",
            header=None,
            names=["col"],
        )
        assert [(row["col"],) for _, row in df.iterrows()] == [(123456789,)]

    @pytest.mark.parametrize(
        ["line_count"],
        [
            (1 * (2**20),),
        ],
    )
    def test_pandas_write_csv(self, line_count):
        import pandas

        with tempfile.NamedTemporaryFile("w+t") as tmp:
            tmp.write("col1")
            tmp.write("\n")
            for _ in range(0, line_count):
                tmp.write("a")
                tmp.write("\n")
            tmp.flush()

            tmp.seek(0)
            df = pandas.read_csv(tmp.name)
            path = (
                f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
                f"filesystem/test_async_pandas_write_csv/{uuid.uuid4()}.csv"
            )
            df.to_csv(path, index=False)

            actual = pandas.read_csv(path)
            pandas.testing.assert_frame_equal(actual, df)

    def test_sync_wrappers(self, fs):
        """Verify that mirror_sync_methods generates working sync wrappers."""
        actual = fs.ls(f"s3://{ENV.s3_staging_bucket}")
        assert isinstance(actual, list)
        assert len(actual) > 0

        assert fs.exists(f"s3://{ENV.s3_staging_bucket}")

    def test_invalidate_cache(self, fs):
        path = (
            f"s3://{ENV.s3_staging_bucket}/{ENV.s3_staging_key}{ENV.schema}/"
            f"filesystem/test_async_invalidate_cache/{uuid.uuid4()}"
        )
        fs.pipe_file(path, b"data")
        fs.info(path)

        # Cache should be populated
        fs.invalidate_cache(path)
        # Should not raise after cache invalidation
        info = fs.info(path)
        assert info.size == 4


class TestAioS3File:
    """Unit tests for the static range/merge helpers shared with S3File."""

    @pytest.mark.parametrize(
        ["objects", "target"],
        [
            ([(0, b"")], b""),
            ([(0, b"foo")], b"foo"),
            ([(0, b"foo"), (1, b"bar")], b"foobar"),
            ([(1, b"foo"), (0, b"bar")], b"barfoo"),
            ([(1, b""), (0, b"bar")], b"bar"),
            ([(1, b"foo"), (0, b"")], b"foo"),
            ([(2, b"foo"), (1, b"bar"), (3, b"baz")], b"barfoobaz"),
        ],
    )
    def test_merge_objects(self, objects, target):
        # Chunks are (index, payload) pairs merged in index order.
        assert S3File._merge_objects(objects) == target

    @pytest.mark.parametrize(
        ["start", "end", "max_workers", "worker_block_size", "ranges"],
        [
            (42, 1337, 1, 999, [(42, 1337)]),  # single worker
            (42, 1337, 2, 999, [(42, 42 + 999), (42 + 999, 1337)]),  # more workers
            (
                42,
                1337,
                2,
                333,
                [
                    (42, 42 + 333),
                    (42 + 333, 42 + 666),
                    (42 + 666, 42 + 999),
                    (42 + 999, 1337),
                ],
            ),
            (42, 1337, 2, 1295, [(42, 1337)]),  # single block
            (42, 1337, 2, 1296, [(42, 1337)]),  # single block
            (42, 1337, 2, 1294, [(42, 1336), (1336, 1337)]),  # single block too small
        ],
    )
    def test_get_ranges(self, start, end, max_workers, worker_block_size, ranges):
        assert (
            S3File._get_ranges(
                start, end, max_workers=max_workers, worker_block_size=worker_block_size
            )
            == ranges
        )

    def test_format_ranges(self):
        # HTTP Range headers are inclusive, hence end - 1.
        assert S3File._format_ranges((0, 100)) == "bytes=0-99"