diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSession.java index eb424fcde0..598da94606 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSession.java @@ -56,6 +56,22 @@ public interface BlobReadSession extends AutoCloseable, Closeable { @TransportCompatibility({Transport.GRPC}) Projection readAs(ReadProjectionConfig config); + /** + * Read all {@code configs} from this session as a specific {@code Projection} as dictated by the + * provided {@code configs}. + * + *

This allows for batching multiple reads into a single request, which can be more efficient + * than calling {@link #readAs(ReadProjectionConfig)} in a loop. + * + * @see ReadProjectionConfig + * @see ReadProjectionConfigs + * @since 2.51.0 This new api is in preview and is subject to breaking changes. + */ + @BetaApi + @TransportCompatibility({Transport.GRPC}) + java.util.List readAllAs( + java.util.List> configs); + /** * Close this session and any {@code Projection}s produced by {@link * #readAs(ReadProjectionConfig)}. diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSessionAdapter.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSessionAdapter.java index 1823a13283..c849757eb2 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSessionAdapter.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSessionAdapter.java @@ -48,6 +48,23 @@ public Projection readAs(ReadProjectionConfig config) { return projection; } + @SuppressWarnings({"rawtypes", "unchecked"}) + @Override + public java.util.List readAllAs( + java.util.List> configs) { + java.util.List projections = session.readAllAs(configs); + java.util.List wrapped = new java.util.ArrayList<>(projections.size()); + for (Projection projection : projections) { + if (projection instanceof ApiFuture) { + ApiFuture apiFuture = (ApiFuture) projection; + wrapped.add((Projection) StorageException.coalesceAsync(apiFuture)); + } else { + wrapped.add(projection); + } + } + return wrapped; + } + @Override public void close() throws IOException { session.close(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSession.java index 655c64dda8..562e50deb8 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSession.java @@ -27,4 +27,7 @@ interface ObjectReadSession extends IOAutoCloseable { Object getResource(); Projection readAs(ReadProjectionConfig config); + + java.util.List readAllAs( + java.util.List> configs); } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java index 2998f4d8ff..7f9cc7524c 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java @@ -97,6 +97,46 @@ public Projection readAs(ReadProjectionConfig config) { } } + @Override + public java.util.List readAllAs( + java.util.List> configs) { + checkState(open, "Session already closed"); + java.util.List results = new ArrayList<>(configs.size()); + + // Filter for STREAM_READ configs to batch + List> batchedReads = new ArrayList<>(); + List batchedReadIds = new ArrayList<>(); + + for (ReadProjectionConfig config : configs) { + switch (config.getType()) { + case STREAM_READ: + long readId = state.newReadId(); + ObjectReadSessionStreamRead read = + config.cast().newRead(readId, retryContextProvider.create()); + batchedReads.add(read); + batchedReadIds.add(readId); + results.add(read.project()); + break; + case SESSION_USER: + results.add(config.project(this, IOAutoCloseable.noOp())); + break; + default: + throw new IllegalStateException( + String.format( + Locale.US, + "Broken java enum %s value=%s", + ProjectionType.class.getName(), + config.getType().name())); + } + } + + if (!batchedReads.isEmpty()) { + registerBatchReadsInState(batchedReadIds, batchedReads); + } + + return results; + } + @Override public void close() throws IOException { try { @@ -137,6 +177,61 @@ private void registerReadInState(long readId, ObjectReadSessionStreamRead rea } } + private void registerBatchReadsInState( + List readIds, List> reads) { + // 1. Check if the current state can handle ALL new reads + // We assume if it can handle one valid read of the batch, it can handle all, + // BUT we must ensure transactionality or consistency if they differ. + // However, usually they differ only by offset/length. + // Let's check all or check if they are compatible with each other + state. + // For now, we check if state can handle the first one? + // Correctness: we should check all or rely on the fact they come from same session/context. + // If state implies "same generation/metadata", then all reads should be fine if one is fine. + // Let's verify strict correctness: + boolean allCompatible = true; + for (ObjectReadSessionStreamRead read : reads) { + if (!state.canHandleNewRead(read)) { + allCompatible = false; + break; + } + } + + // Prepare Request Builder + BidiReadObjectRequest.Builder requestBuilder = BidiReadObjectRequest.newBuilder(); + for (ObjectReadSessionStreamRead read : reads) { + requestBuilder.addReadRanges(read.makeReadRange()); + } + BidiReadObjectRequest request = requestBuilder.build(); + + if (allCompatible) { + for (int i = 0; i < readIds.size(); i++) { + state.putOutstandingRead(readIds.get(i), reads.get(i)); + } + stream.send(request); + } else { + // Fork a new child for this BATCH + ObjectReadSessionState child = state.forkChild(); + ObjectReadSessionStream newStream = + ObjectReadSessionStream.create(executor, callable, child, retryContextProvider.create()); + children.put(newStream, child); + + // We only need one close callback for the stream closure, so we can attach it to the first read + // or all? If we attach to all, we might try to close multiple times, which is fine (concurrent map remove). + IOAutoCloseable closeCallback = + () -> { + children.remove(newStream); + newStream.close(); + }; + + for (int i = 0; i < readIds.size(); i++) { + ObjectReadSessionStreamRead read = reads.get(i); + read.setOnCloseCallback(closeCallback); + child.putOutstandingRead(readIds.get(i), read); + } + newStream.send(request); + } + } + @VisibleForTesting static final class ConcurrentIdentityMap { private final ReentrantLock lock; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java index e418e5e106..c0ff78f57a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java @@ -56,6 +56,7 @@ import java.nio.ByteBuffer; import java.nio.channels.WritableByteChannel; import java.nio.file.Path; +import java.util.ArrayList; import java.util.List; import java.util.Locale; import java.util.concurrent.TimeUnit; @@ -1989,6 +1990,36 @@ public Projection readAs(ReadProjectionConfig config) { } } + @Override + public java.util.List readAllAs( + java.util.List> configs) { + java.util.List spans = new ArrayList<>(configs.size()); + java.util.List> otrConfigs = + new ArrayList<>(configs.size()); + + for (ReadProjectionConfig config : configs) { + Span readRangeSpan = + tracer + .spanBuilder(BLOB_READ_SESSION + "/readAs") + .setAttribute("gsutil.uri", id.toGsUtilUriWithGeneration()) + .setParent(blobReadSessionContext) + .startSpan(); + spans.add(readRangeSpan); + otrConfigs.add(new OtelReadProjectionConfig<>(config, readRangeSpan)); + } + + try { + return delegate.readAllAs(otrConfigs); + } catch (Throwable t) { + for (Span span : spans) { + span.recordException(t); + span.setStatus(StatusCode.ERROR, t.getClass().getSimpleName()); + span.end(); + } + throw t; + } + } + @Override public void close() throws IOException { try { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionLifeCycleTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionLifeCycleTest.java index 2889ebfe51..8396285c07 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionLifeCycleTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ObjectReadSessionLifeCycleTest.java @@ -44,6 +44,12 @@ public Projection readAs(ReadProjectionConfig config) { return null; } + @Override + public java.util.List readAllAs( + java.util.List> configs) { + throw new UnsupportedOperationException(); + } + @Override public void close() throws IOException { sessionCloseCount.getAndIncrement(); diff --git a/renovate.json b/renovate.json index 2ae8e23301..4c31c58e88 100644 --- a/renovate.json +++ b/renovate.json @@ -128,6 +128,16 @@ "^ch.qos.logback:logback-classic" ], "allowedVersions": "<1.4.0" + }, + { + "description": "Disable updates for templatized GitHub Actions", + "matchManagers": [ + "github-actions" + ], + "matchFileNames": [ + ".github/workflows/**" + ], + "enabled": false } ], "semanticCommits": true,