Skip to content
Merged
90 changes: 90 additions & 0 deletions gcsfs/extended_gcsfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,96 @@ async def _mv(self, path1, path2, **kwargs):

mv = asyn.sync_wrapper(_mv)

async def _mkdir(
    self, path, create_parents=False, enable_hierarchial_namespace=False, **kwargs
):
    """
    Create a bucket or (for HNS buckets) a folder at ``path``.

    If ``path`` contains no object key, it names a bucket and creation is
    delegated to the parent class. With ``enable_hierarchial_namespace=True``
    the bucket is created with Hierarchical Namespace (HNS) enabled, which
    also forces uniform bucket-level access and disables ACLs.

    If ``path`` contains a key and the bucket is HNS-enabled, a real folder
    object is created via the Storage Control API. With
    ``create_parents=True`` any missing parent folders are created as well
    (and, when combined with ``enable_hierarchial_namespace=True``, a missing
    bucket is first created HNS-enabled). With ``create_parents=False`` a
    missing parent folder surfaces as ``FileNotFoundError``.

    For non-HNS buckets this falls back to the parent implementation, since
    flat GCS buckets have no true empty directories.

    Parameters
    ----------
    path : str
        Bucket name, optionally followed by a folder key.
    create_parents : bool
        Create missing ancestors (bucket and/or parent folders).
    enable_hierarchial_namespace : bool
        Request HNS on any bucket created by this call.
        NOTE(review): parameter name misspells "hierarchical"; kept as-is
        for caller compatibility.
    **kwargs
        Extra bucket-creation options forwarded to the parent ``_mkdir``.

    Raises
    ------
    FileNotFoundError
        When ``create_parents=False`` and a parent folder does not exist
        (translated from the API's FailedPrecondition error).
    """
    path = self._strip_protocol(path)
    if enable_hierarchial_namespace:
        # These kwargs only take effect on the bucket-creation paths below
        # (the Storage Control folder call ignores them).
        kwargs["hierarchicalNamespace"] = {"enabled": True}
        # HNS buckets require uniform bucket-level access.
        kwargs["iamConfiguration"] = {"uniformBucketLevelAccess": {"enabled": True}}
        # When uniformBucketLevelAccess is enabled, ACLs cannot be used.
        # We must explicitly set them to None to prevent the parent
        # method from using default ACLs.
        kwargs["acl"] = None
        kwargs["default_acl"] = None

    bucket, key, _ = self.split_path(path)
    # If the key is empty, the path refers to a bucket, not an object.
    # Defer to the parent method to handle bucket creation.
    if not key:
        return await super()._mkdir(path, create_parents=create_parents, **kwargs)

    is_hns = False
    # If asked to create parents of an HNS folder and the bucket itself is
    # missing, create the bucket first (HNS kwargs were set above).
    if create_parents and enable_hierarchial_namespace:
        if not await self._exists(bucket):
            await super()._mkdir(bucket, create_parents=True, **kwargs)
            is_hns = True  # Skip HNS check since we just created it.

    if not is_hns:
        # If the bucket was not created above, we need to check its type.
        # Best-effort: on any probe failure we log and fall through to the
        # flat-namespace mkdir rather than failing the whole call.
        try:
            is_hns = await self._is_bucket_hns_enabled(bucket)
        except Exception as e:
            logger.warning(
                f"Could not determine if bucket '{bucket}' is HNS-enabled, falling back to default mkdir: {e}"
            )

    if not is_hns:
        return await super()._mkdir(path, create_parents=create_parents, **kwargs)

    logger.info(f"Using HNS-aware mkdir for '{path}'.")
    # Storage Control API resource name for the bucket; folder ids must not
    # carry a trailing slash.
    parent = f"projects/_/buckets/{bucket}"
    folder_id = key.rstrip("/")
    request = storage_control_v2.CreateFolderRequest(
        parent=parent,
        folder_id=folder_id,
        recursive=create_parents,
    )
    try:
        logger.debug(f"create_folder request: {request}")
        await self._storage_control_client.create_folder(request=request)
        # Instead of invalidating the parent cache, update it to add the new entry.
        # NOTE(review): no duplicate check before append — a repeated
        # successful mkdir of the same path could add the entry twice;
        # confirm against dircache consumers.
        parent_path = self._parent(path)
        if parent_path in self.dircache:
            new_entry = {
                "Key": key.rstrip("/"),
                "Size": 0,
                "name": path,
                "size": 0,
                "type": "directory",
                "storageClass": "DIRECTORY",
            }
            self.dircache[parent_path].append(new_entry)
    except api_exceptions.Conflict as e:
        # Folder already exists — mkdir is treated as idempotent here.
        logger.debug(f"Directory already exists: {path}: {e}")
    except api_exceptions.FailedPrecondition as e:
        # This error can occur if create_parents=False and the parent dir doesn't exist.
        # Translate it to FileNotFoundError for fsspec compatibility.
        raise FileNotFoundError(
            f"mkdir for '{path}' failed due to a precondition error: {e}"
        ) from e

# Synchronous facade over the async implementation, matching the file's
# pattern for other methods (e.g. `mv` above).
mkdir = asyn.sync_wrapper(_mkdir)

async def _get_directory_info(self, path, bucket, key, generation):
"""
Override to use Storage Control API's get_folder for HNS buckets.
Expand Down
3 changes: 1 addition & 2 deletions gcsfs/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,7 @@ def gcs_hns(gcs_factory, buckets_to_delete):
try:
if not gcs.exists(TEST_HNS_BUCKET):
# Note: Emulators may not fully support HNS features like real GCS.
# TODO: Update to create HNS bucket once mkdir supports creating HNS buckets.
gcs.mkdir(TEST_HNS_BUCKET)
gcs.mkdir(TEST_HNS_BUCKET, enable_hierarchial_namespace=True)
buckets_to_delete.add(TEST_HNS_BUCKET)
else:
_cleanup_gcs(gcs, bucket=TEST_HNS_BUCKET)
Expand Down
Loading
Loading