diff --git a/buildSrc/src/main/kotlin/tasks.base.gradle.kts b/buildSrc/src/main/kotlin/tasks.base.gradle.kts index 23388f1..68c83b3 100644 --- a/buildSrc/src/main/kotlin/tasks.base.gradle.kts +++ b/buildSrc/src/main/kotlin/tasks.base.gradle.kts @@ -28,7 +28,7 @@ mavenPublishing { licenses { license { name = "The Apache License, Version 2.0" - url = "http://www.apache.org/licenses/LICENSE-2.0.txt" + url = "https://www.apache.org/licenses/LICENSE-2.0.txt" } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java index 0063d0f..b072603 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java @@ -2,6 +2,7 @@ import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.Nullable; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -48,7 +49,7 @@ static Cancellable getEmpty() { */ @ApiStatus.Internal final class CancellableUtils { - static Cancellable EMPTY = () -> {}; + static final Cancellable EMPTY = () -> {}; } /** @@ -60,8 +61,7 @@ final class CancellableUtils { * breakage between minor version updates. */ @ApiStatus.Internal -@FunctionalInterface -interface CancellableForwardRef { +interface CancellableForwardRef extends Cancellable { void set(Cancellable cancellable); } @@ -74,41 +74,104 @@ interface CancellableForwardRef { */ @ApiStatus.Internal final class MutableCancellable implements Cancellable { - private final AtomicReference ref = - new AtomicReference<>(new State.Active(Cancellable.getEmpty(), 0)); + private final AtomicReference ref; + + MutableCancellable(final Cancellable initialRef) { + ref = new AtomicReference<>(new State.Active(initialRef, 0, null)); + } + + MutableCancellable() { + this(CancellableUtils.EMPTY); + } @Override public void cancel() { - final var prev = ref.getAndSet(State.Cancelled.INSTANCE); - if (prev instanceof State.Active active) { - active.token.cancel(); + @Nullable + var state = ref.getAndSet(State.Closed.INSTANCE); + while (state instanceof State.Active active) { + try { + active.token.cancel(); + } catch (Exception e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + state = active.rest; } } public CancellableForwardRef newCancellableRef() { final var current = ref.get(); - if (current instanceof State.Cancelled) { - return Cancellable::cancel; + if (current instanceof State.Closed) { + return new CancellableForwardRef() { + @Override + public void set(Cancellable cancellable) { + cancellable.cancel(); + } + @Override + public void cancel() {} + }; } else if (current instanceof State.Active active) { - return cancellable -> registerOrdered( - active.order, - cancellable, - active - ); + return new CancellableForwardRef() { + @Override + public void set(Cancellable cancellable) { + registerOrdered( + active.order, + cancellable, + active + ); + } + + @Override + public void cancel() { + unregister(active.order); + } + }; } else { throw new IllegalStateException("Invalid state: " + current); } } - public void register(Cancellable token) { + public @Nullable Cancellable register(Cancellable token) { Objects.requireNonNull(token, "token"); while (true) { final var current = ref.get(); if (current instanceof State.Active active) { - final var update = new State.Active(token, active.order + 1); - if (ref.compareAndSet(current, update)) { return; } - } else if (current instanceof State.Cancelled) { + final var newOrder = active.order + 1; + final var update = new State.Active(token, newOrder, active); + if (ref.compareAndSet(current, update)) { return () -> unregister(newOrder); } + } else if (current instanceof State.Closed) { token.cancel(); + return null; + } else { + throw new IllegalStateException("Invalid state: " + current); + } + } + } + + private void unregister(final long order) { + while (true) { + final var current = ref.get(); + if (current instanceof State.Active active) { + @Nullable var cursor = active; + @Nullable State.Active acc = null; + while (cursor != null) { + if (cursor.order != order) { + acc = new State.Active(cursor.token, cursor.order, acc); + } + cursor = cursor.rest; + } + // Reversing + @Nullable State.Active update = null; + while (acc != null) { + update = new State.Active(acc.token, acc.order, update); + acc = acc.rest; + } + if (update == null) { + update = new State.Active(Cancellable.getEmpty(), 0, null); + } + if (ref.compareAndSet(current, update)) { + return; + } + } else if (current instanceof State.Closed) { return; } else { throw new IllegalStateException("Invalid state: " + current); @@ -126,11 +189,11 @@ private void registerOrdered( // Double-check ordering if (active.order != order) { return; } // Try to update - final var update = new State.Active(newToken, order + 1); + final var update = new State.Active(newToken, order + 1, null); if (ref.compareAndSet(current, update)) { return; } // Retry current = ref.get(); - } else if (current instanceof State.Cancelled) { + } else if (current instanceof State.Closed) { newToken.cancel(); return; } else { @@ -142,11 +205,12 @@ private void registerOrdered( sealed interface State { record Active( Cancellable token, - long order + long order, + @Nullable Active rest ) implements State {} - record Cancelled() implements State { - static Cancelled INSTANCE = new Cancelled(); + record Closed() implements State { + static final Closed INSTANCE = new Closed(); } } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java index c2b199e..5040be3 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java @@ -4,8 +4,13 @@ import org.jspecify.annotations.Nullable; import java.io.Serializable; +import java.time.Duration; import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * Represents a callback that will be invoked when a task completes. @@ -61,11 +66,98 @@ default void onCancellation() { } @ApiStatus.Internal -final class ProtectedCompletionCallback - implements CompletionCallback, Runnable { +final class ManyCompletionCallback + implements CompletionCallback { + + private final ImmutableStack> listeners; + + @SafeVarargs + ManyCompletionCallback(final CompletionCallback... listeners) { + Objects.requireNonNull(listeners, "listeners"); + ImmutableStack> stack = ImmutableStack.empty(); + for (final CompletionCallback listener : listeners) { + Objects.requireNonNull(listener, "listener"); + stack = stack.prepend(listener); + } + this.listeners = stack; + } + + private ManyCompletionCallback( + final ImmutableStack> listeners + ) { + Objects.requireNonNull(listeners, "listeners"); + this.listeners = listeners; + } + + ManyCompletionCallback withExtraListener(CompletionCallback extraListener) { + Objects.requireNonNull(extraListener, "extraListener"); + final var newListeners = this.listeners.prepend(extraListener); + return new ManyCompletionCallback<>(newListeners); + } + + @Override + public void onOutcome(Outcome outcome) { + for (final CompletionCallback listener : listeners) { + try { + listener.onOutcome(outcome); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + } + } + + @Override + public void onSuccess(T value) { + for (final CompletionCallback listener : listeners) { + try { + listener.onSuccess(value); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + } + } + + @Override + public void onFailure(Throwable e) { + Objects.requireNonNull(e, "e"); + for (final CompletionCallback listener : listeners) { + try { + listener.onFailure(e); + } catch (Throwable ex) { + UncaughtExceptionHandler.logOrRethrow(ex); + } + } + } + + @Override + public void onCancellation() { + for (final CompletionCallback listener : listeners) { + try { + listener.onCancellation(); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + } + } +} + +@ApiStatus.Internal +interface ContinuationCallback + extends CompletionCallback, Serializable { + + /** + * Registers an extra callback to be invoked when the task completes. + * This is useful for chaining callbacks or adding additional listeners. + */ + void registerExtraCallback(CompletionCallback extraCallback); +} + +@ApiStatus.Internal +final class AsyncContinuationCallback + implements ContinuationCallback, Runnable { private final AtomicBoolean isWaiting = new AtomicBoolean(true); - private final CompletionCallback listener; + private final AtomicReference> listenerRef; private final TaskExecutor executor; private @Nullable Outcome outcome; @@ -73,26 +165,27 @@ final class ProtectedCompletionCallback private @Nullable Throwable failureCause; private boolean isCancelled = false; - private ProtectedCompletionCallback( + public AsyncContinuationCallback( final CompletionCallback listener, final TaskExecutor executor ) { - this.listener = listener; this.executor = executor; + this.listenerRef = new AtomicReference<>( + Objects.requireNonNull(listener, "listener") + ); } @Override + @SuppressWarnings("NullAway") public void run() { if (this.outcome != null) { - listener.onOutcome(this.outcome); + listenerRef.get().onOutcome(this.outcome); } else if (this.failureCause != null) { - listener.onFailure(this.failureCause); + listenerRef.get().onFailure(this.failureCause); } else if (this.isCancelled) { - listener.onCancellation(); - } else if (this.successValue != null) { - listener.onSuccess(this.successValue); + listenerRef.get().onCancellation(); } else { - throw new IllegalStateException("No outcome, success value, failure cause, or cancellation state set"); + listenerRef.get().onSuccess(this.successValue); } // For GC purposes; but it doesn't really matter if we nullify these or not this.outcome = null; @@ -139,14 +232,199 @@ public void onCancellation() { } } - public static CompletionCallback protect( + public static ContinuationCallback protect( final TaskExecutor executor, final CompletionCallback listener ) { Objects.requireNonNull(listener, "listener"); - return new ProtectedCompletionCallback<>( + return new AsyncContinuationCallback<>( listener, executor ); } + + @SuppressWarnings("NullAway") + public void registerExtraCallback(CompletionCallback extraCallback) { + while (true) { + final var current = listenerRef.get(); + if (current instanceof ManyCompletionCallback many) { + final var update = many.withExtraListener(extraCallback); + if (listenerRef.compareAndSet(current, update)) { + return; + } + } else if (listenerRef.compareAndSet(current, new ManyCompletionCallback<>(current, extraCallback))) { + return; + } + } + } +} + +/** + * INTERNAL API. + *

+ * INTERNAL API: Internal apis are subject to change or removal + * without any notice. When code depends on internal APIs, it is subject to + * breakage between minor version updates. + */ +@ApiStatus.Internal +final class BlockingCompletionCallback + extends AbstractQueuedSynchronizer implements ContinuationCallback { + + private final AtomicBoolean isDone = + new AtomicBoolean(false); + private final AtomicReference<@Nullable CompletionCallback> extraCallbackRef = + new AtomicReference<>(null); + + @Nullable + private T result = null; + @Nullable + private Throwable error = null; + @Nullable + private InterruptedException interrupted = null; + + @SuppressWarnings("NullAway") + private void notifyOutcome() { + final var extraCallback = extraCallbackRef.getAndSet(null); + if (extraCallback != null) + try { + if (error != null) + extraCallback.onFailure(error); + else if (interrupted != null) + extraCallback.onCancellation(); + else + extraCallback.onSuccess(result); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + releaseShared(1); + } + + @Override + public void onSuccess(final T value) { + if (!isDone.getAndSet(true)) { + result = value; + notifyOutcome(); + } + } + + @Override + public void onFailure(final Throwable e) { + UncaughtExceptionHandler.rethrowIfFatal(e); + if (!isDone.getAndSet(true)) { + error = e; + notifyOutcome(); + } else { + UncaughtExceptionHandler.logOrRethrow(e); + } + } + + @Override + public void onCancellation() { + if (!isDone.getAndSet(true)) { + interrupted = new InterruptedException("Task was cancelled"); + notifyOutcome(); + } + } + + @Override + public void onOutcome(Outcome outcome) { + if (outcome instanceof Outcome.Success success) { + onSuccess(success.value()); + } else if (outcome instanceof Outcome.Failure failure) { + onFailure(failure.exception()); + } else { + onCancellation(); + } + } + + @Override + protected int tryAcquireShared(final int arg) { + return getState() != 0 ? 1 : -1; + } + + @Override + protected boolean tryReleaseShared(final int arg) { + setState(1); + return true; + } + + @FunctionalInterface + interface AwaitFunction { + void apply(boolean isCancelled) throws InterruptedException, TimeoutException; + } + + @SuppressWarnings("NullAway") + private T awaitInline(final Cancellable cancelToken, final AwaitFunction await) + throws InterruptedException, ExecutionException, TimeoutException { + + TaskLocalContext.signalTheStartOfBlockingCall(); + var isCancelled = false; + TimeoutException timedOut = null; + while (true) { + try { + await.apply(isCancelled); + break; + } catch (final TimeoutException | InterruptedException e) { + if (!isCancelled) { + isCancelled = true; + if (e instanceof TimeoutException te) + timedOut = te; + cancelToken.cancel(); + } + } + // Clearing the interrupted flag may not be necessary, + // but doesn't hurt, and we should have a cleared flag before + // re-throwing the exception + // + // noinspection ResultOfMethodCallIgnored + Thread.interrupted(); + } + if (timedOut != null) throw timedOut; + if (interrupted != null) throw interrupted; + if (error != null) throw new ExecutionException(error); + return result; + } + + public T await(final Cancellable cancelToken) throws InterruptedException, ExecutionException { + try { + return awaitInline(cancelToken, isCancelled -> acquireSharedInterruptibly(1)); + } catch (final TimeoutException e) { + throw new IllegalStateException("Unexpected timeout", e); + } + } + + public T await(final Cancellable cancelToken, final Duration timeout) + throws ExecutionException, InterruptedException, TimeoutException { + + return awaitInline(cancelToken, isCancelled -> { + if (!isCancelled) { + if (!tryAcquireSharedNanos(1, timeout.toNanos())) { + throw new TimeoutException("Task timed-out after " + timeout); + } + } else { + // Waiting without a timeout, since at this point it's waiting + // on the cancelled task to finish + acquireSharedInterruptibly(1); + } + }); + } + + @Override + public void registerExtraCallback(CompletionCallback extraCallback) { + while (true) { + final var current = extraCallbackRef.get(); + if (current == null) { + if (extraCallbackRef.compareAndSet(null, extraCallback)) { + return; + } + } else if (current instanceof ManyCompletionCallback many) { + final var update = many.withExtraListener(extraCallback); + if (extraCallbackRef.compareAndSet(current, update)) { + return; + } + } else if (extraCallbackRef.compareAndSet(current, new ManyCompletionCallback<>(current, extraCallback))) { + return; + } + } + } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java index 9474f48..268a9a2 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java @@ -33,11 +33,13 @@ interface Continuation * @param cancellable is the reference to the cancellable object that this * continuation will register. */ - void registerCancellable(Cancellable cancellable); + @Nullable Cancellable registerCancellable(Cancellable cancellable); CancellableForwardRef registerForwardCancellable(); - CancellableContinuation withExecutorOverride(TaskExecutor executor); + Continuation withExecutorOverride(TaskExecutor executor); + + void registerExtraCallback(CompletionCallback extraCallback); } /** @@ -46,7 +48,7 @@ interface Continuation @ApiStatus.Internal @FunctionalInterface interface AsyncContinuationFun { - void invoke(Continuation continuation); + void invoke(Continuation continuation); } /** @@ -56,13 +58,13 @@ interface AsyncContinuationFun { final class CancellableContinuation implements Continuation, Cancellable { - private final CompletionCallback callback; - private final MutableCancellable cancellable; + private final ContinuationCallback callback; + private final MutableCancellable cancellableRef; private final TaskExecutor executor; public CancellableContinuation( final TaskExecutor executor, - final CompletionCallback callback + final ContinuationCallback callback ) { this( executor, @@ -71,14 +73,14 @@ public CancellableContinuation( ); } - private CancellableContinuation( + CancellableContinuation( final TaskExecutor executor, - final CompletionCallback callback, + final ContinuationCallback callback, final MutableCancellable cancellable ) { this.executor = executor; this.callback = callback; - this.cancellable = cancellable; + this.cancellableRef = cancellable; } @Override @@ -88,17 +90,17 @@ public TaskExecutor getExecutor() { @Override public void cancel() { - cancellable.cancel(); + cancellableRef.cancel(); } @Override public CancellableForwardRef registerForwardCancellable() { - return cancellable.newCancellableRef(); + return cancellableRef.newCancellableRef(); } @Override - public void registerCancellable(Cancellable cancellable) { - this.cancellable.register(cancellable); + public @Nullable Cancellable registerCancellable(Cancellable cancellable) { + return this.cancellableRef.register(cancellable); } @Override @@ -122,11 +124,16 @@ public void onCancellation() { } @Override - public CancellableContinuation withExecutorOverride(TaskExecutor executor) { + public Continuation withExecutorOverride(TaskExecutor executor) { return new CancellableContinuation<>( executor, callback, - cancellable + cancellableRef ); } + + @Override + public void registerExtraCallback(CompletionCallback extraCallback) { + callback.registerExtraCallback(extraCallback); + } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java index 4d6ffe2..3d8d50e 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java @@ -260,13 +260,35 @@ public NotCompletedException() { @ApiStatus.Internal final class ExecutedFiber implements Fiber { private final TaskExecutor executor; - private final Continuation continuation; - private final AtomicReference> stateRef = - new AtomicReference<>(State.start()); + private final Continuation continuation; + private final MutableCancellable cancellableRef; + private final AtomicReference> stateRef; private ExecutedFiber(final TaskExecutor executor) { + this.cancellableRef = new MutableCancellable(this::fiberCancel); + this.stateRef = new AtomicReference<>(State.start()); this.executor = executor; - this.continuation = new FiberContinuation<>(executor, stateRef); + this.continuation = new CancellableContinuation<>( + executor, + new AsyncContinuationCallback<>( + new FiberCallback<>(executor, stateRef), + executor + ), + cancellableRef + ); + } + + private void fiberCancel() { + while (true) { + final var current = stateRef.get(); + if (current instanceof State.Active active) { + if (stateRef.compareAndSet(current, new State.Cancelled<>(active.listeners))) { + return; + } + } else { + return; + } + } } @Override @@ -297,17 +319,7 @@ public Cancellable joinAsync(final Runnable onComplete) { @Override public void cancel() { - while (true) { - final var current = stateRef.get(); - if (current instanceof State.Active active) { - if (stateRef.compareAndSet(current, new State.Cancelled<>(active.listeners))) { - active.cancellable.cancel(); - return; - } - } else { - return; - } - } + this.cancellableRef.cancel(); } private Cancellable removeListenerCancellable(final Runnable listener) { @@ -328,8 +340,7 @@ private Cancellable removeListenerCancellable(final Runnable listener) { sealed interface State { record Active( - ImmutableQueue listeners, - MutableCancellable cancellable + ImmutableQueue listeners ) implements State {} record Cancelled( @@ -355,7 +366,7 @@ default void triggerListeners(TaskExecutor executor) { default State addListener(final Runnable listener) { if (this instanceof Active ref) { final var newQueue = ref.listeners.enqueue(listener); - return new Active<>(newQueue, ref.cancellable); + return new Active<>(newQueue); } else if (this instanceof Cancelled ref) { final var newQueue = ref.listeners.enqueue(listener); return new Cancelled<>(newQueue); @@ -367,7 +378,7 @@ default State addListener(final Runnable listener) { default State removeListener(final Runnable listener) { if (this instanceof Active ref) { final var newQueue = ref.listeners.filter(l -> l != listener); - return new Active<>(newQueue, ref.cancellable); + return new Active<>(newQueue); } else if (this instanceof Cancelled ref) { final var newQueue = ref.listeners.filter(l -> l != listener); return new Cancelled<>(newQueue); @@ -377,7 +388,7 @@ default State removeListener(final Runnable listener) { } static State start() { - return new Active<>(ImmutableQueue.empty(), new MutableCancellable()); + return new Active<>(ImmutableQueue.empty() ); } } @@ -398,11 +409,11 @@ default State removeListener(final Runnable listener) { return fiber; } - static final class FiberContinuation implements Continuation { + static final class FiberCallback implements CompletionCallback { private final TaskExecutor executor; private final AtomicReference> stateRef; - FiberContinuation( + FiberCallback( final TaskExecutor executor, final AtomicReference> stateRef ) { @@ -410,35 +421,6 @@ static final class FiberContinuation implements Cont this.stateRef = stateRef; } - @Override - public TaskExecutor getExecutor() { - return executor; - } - - @Override - public CancellableForwardRef registerForwardCancellable() { - final var current = stateRef.get(); - if (current instanceof State.Completed) { - return (cancellable) -> {}; - } else if (current instanceof State.Cancelled) { - return Cancellable::cancel; - } else if (current instanceof State.Active active) { - return active.cancellable.newCancellableRef(); - } else { - throw new IllegalStateException("Invalid state: " + current); - } - } - - @Override - public void registerCancellable(Cancellable cancellable) { - final var current = stateRef.get(); - if (current instanceof State.Active active) { - active.cancellable.register(cancellable); - } else if (current instanceof State.Cancelled) { - cancellable.cancel(); - } - } - @Override public void onSuccess(T value) { onOutcome(Outcome.success(value)); @@ -479,11 +461,6 @@ public void onOutcome(Outcome outcome) { } } } - - @Override - public CancellableContinuation withExecutorOverride(TaskExecutor executor) { - return new CancellableContinuation<>(executor, this); - } } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Outcome.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Outcome.java index 91e5232..36d8832 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Outcome.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Outcome.java @@ -23,11 +23,6 @@ public sealed interface Outcome @NonBlocking T getOrThrow() throws ExecutionException, TaskCancellationException; - /** - * Converts this task outcome to an {@link ExitCase}. - */ - ExitCase toTaskExitCase(); - /** * Signals a successful result of the task. */ @@ -37,8 +32,6 @@ record Success(T value) @Override public T getOrThrow() { return value; } - @Override - public ExitCase toTaskExitCase() { return ExitCase.succeeded(); } } /** @@ -52,8 +45,6 @@ public T getOrThrow() throws ExecutionException { throw new ExecutionException(exception); } - @Override - public ExitCase toTaskExitCase() { return ExitCase.failed(exception); } } /** @@ -67,9 +58,6 @@ public T getOrThrow() throws TaskCancellationException { throw new TaskCancellationException(); } - @Override - public ExitCase toTaskExitCase() { return ExitCase.canceled(); } - private static final Cancellation INSTANCE = new Cancellation<>(); } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java index 5b5f91f..ec7b4ed 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java @@ -8,8 +8,6 @@ import java.time.Duration; import java.util.Objects; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * Represents a function that can be executed asynchronously. @@ -40,7 +38,7 @@ private Task(final AsyncContinuationFun createFun) { */ public Task ensureRunningOnExecutor(final @Nullable Executor executor) { return new Task<>((cont) -> { - final Continuation cont2 = executor != null + final Continuation cont2 = executor != null ? cont.withExecutorOverride(TaskExecutor.from(executor)) : cont; cont2.getExecutor().resumeOnExecutor(() -> createFun.invoke(cont2)); @@ -73,6 +71,42 @@ public Task ensureRunningOnExecutor() { return ensureRunningOnExecutor(null); } + /** + * Guarantees that the task will complete with the given callback. + *

+ * This method is useful for releasing resources or performing + * cleanup operations when the task completes. + *

+ * This callback will be invoked in addition to whatever the client + * provides as a callback to {@link #runAsync(CompletionCallback)} + * or similar methods. + *

+ * WARNING: The invocation of this method is concurrent + * with the task's completion, meaning that ordering isn't guaranteed + * (i.e., a callback installed with this method may be called before or + * after the callback provided to {@link #runAsync(CompletionCallback)}). + */ + public Task withOnComplete(final CompletionCallback callback) { + return new Task<>((cont) -> { + @SuppressWarnings("unchecked") + final var extraCallback = (CompletionCallback) Objects.requireNonNull(callback); + cont.registerExtraCallback(extraCallback); + cont.getExecutor().resumeOnExecutor(() -> createFun.invoke(cont)); + }); + } + + /** + * Registers a {@link Cancellable} that can be used to cancel the running task. + */ + public Task withCancellation( + final Cancellable cancellable + ) { + return new Task<>((cont) -> { + cont.registerCancellable(cancellable); + cont.getExecutor().resumeOnExecutor(() -> createFun.invoke(cont)); + }); + } + /** * Executes the task asynchronously. *

@@ -90,9 +124,13 @@ public Cancellable runAsync( final var taskExecutor = TaskExecutor.from( executor != null ? executor : TaskExecutors.sharedBlockingIO() ); + @SuppressWarnings("unchecked") final var cont = new CancellableContinuation<>( taskExecutor, - ProtectedCompletionCallback.protect(taskExecutor, callback) + new AsyncContinuationCallback<>( + (CompletionCallback) callback, + taskExecutor + ) ); taskExecutor.execute(() -> { try { @@ -302,18 +340,23 @@ public T runBlockingTimed(final Duration timeout) public static Task fromBlockingIO(final DelayedFun run) { return new Task<>((cont) -> { Thread th = Thread.currentThread(); - cont.registerCancellable(th::interrupt); + final var registration = cont.registerCancellable(th::interrupt); + if (registration == null) { + cont.onCancellation(); + return; + } try { T result; try { TaskLocalContext.signalTheStartOfBlockingCall(); result = run.invoke(); } finally { - cont.registerCancellable(Cancellable.getEmpty()); + registration.cancel(); } if (th.isInterrupted()) { throw new InterruptedException(); } + //noinspection DataFlowIssue cont.onSuccess(result); } catch (final InterruptedException | TaskCancellationException e) { cont.onCancellation(); @@ -426,6 +469,7 @@ public T runBlockingTimed(final Duration timeout) * Creates a task that completes with the given static/pure value. */ public static Task pure(final T value) { + //noinspection DataFlowIssue return new Task<>((cont) -> cont.onSuccess(value)); } @@ -434,136 +478,6 @@ public T runBlockingTimed(final Duration timeout) public static final Task NOOP = Task.pure(null); } -/** - * INTERNAL API. - *

- * INTERNAL API: Internal apis are subject to change or removal - * without any notice. When code depends on internal APIs, it is subject to - * breakage between minor version updates. - */ -@ApiStatus.Internal -final class BlockingCompletionCallback - extends AbstractQueuedSynchronizer implements CompletionCallback { - - private final AtomicBoolean isDone = new AtomicBoolean(false); - @Nullable - private T result = null; - @Nullable - private Throwable error = null; - @Nullable - private InterruptedException interrupted = null; - - @Override - public void onSuccess(final T value) { - if (!isDone.getAndSet(true)) { - result = value; - releaseShared(1); - } - } - - @Override - public void onFailure(final Throwable e) { - UncaughtExceptionHandler.rethrowIfFatal(e); - if (!isDone.getAndSet(true)) { - error = e; - releaseShared(1); - } else { - UncaughtExceptionHandler.logOrRethrow(e); - } - } - - @Override - public void onCancellation() { - if (!isDone.getAndSet(true)) { - interrupted = new InterruptedException("Task was cancelled"); - releaseShared(1); - } - } - - @Override - public void onOutcome(Outcome outcome) { - if (outcome instanceof Outcome.Success success) { - onSuccess(success.value()); - } else if (outcome instanceof Outcome.Failure failure) { - onFailure(failure.exception()); - } else { - onCancellation(); - } - } - - @Override - protected int tryAcquireShared(final int arg) { - return getState() != 0 ? 1 : -1; - } - - @Override - protected boolean tryReleaseShared(final int arg) { - setState(1); - return true; - } - - @FunctionalInterface - interface AwaitFunction { - void apply(boolean isCancelled) throws InterruptedException, TimeoutException; - } - - @SuppressWarnings("NullAway") - private T awaitInline(final Cancellable cancelToken, final AwaitFunction await) - throws InterruptedException, ExecutionException, TimeoutException { - - TaskLocalContext.signalTheStartOfBlockingCall(); - var isCancelled = false; - TimeoutException timedOut = null; - while (true) { - try { - await.apply(isCancelled); - break; - } catch (final TimeoutException | InterruptedException e) { - if (!isCancelled) { - isCancelled = true; - if (e instanceof TimeoutException te) - timedOut = te; - cancelToken.cancel(); - } - } - // Clearing the interrupted flag may not be necessary, - // but doesn't hurt, and we should have a cleared flag before - // re-throwing the exception - // - // noinspection ResultOfMethodCallIgnored - Thread.interrupted(); - } - if (timedOut != null) throw timedOut; - if (interrupted != null) throw interrupted; - if (error != null) throw new ExecutionException(error); - return result; - } - - public T await(final Cancellable cancelToken) throws InterruptedException, ExecutionException { - try { - return awaitInline(cancelToken, isCancelled -> acquireSharedInterruptibly(1)); - } catch (final TimeoutException e) { - throw new IllegalStateException("Unexpected timeout", e); - } - } - - public T await(final Cancellable cancelToken, final Duration timeout) - throws ExecutionException, InterruptedException, TimeoutException { - - return awaitInline(cancelToken, isCancelled -> { - if (!isCancelled) { - if (!tryAcquireSharedNanos(1, timeout.toNanos())) { - throw new TimeoutException("Task timed-out after " + timeout); - } - } else { - // Waiting without a timeout, since at this point it's waiting - // on the cancelled task to finish - acquireSharedInterruptibly(1); - } - }); - } -} - /** * INTERNAL API. *

@@ -582,7 +496,7 @@ public TaskFromCancellableFuture(DelayedFun> buil } @Override - public void invoke(Continuation continuation) { + public void invoke(Continuation continuation) { try { final var cancellableRef = continuation.registerForwardCancellable(); diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/CompletionCallbackTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/CompletionCallbackTest.java index 122dc2e..e92e534 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/CompletionCallbackTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/CompletionCallbackTest.java @@ -32,7 +32,7 @@ void emptyLogsRuntimeFailure() throws InterruptedException { void protectedCallbackForSuccess() { final var called = new AtomicInteger(0); final var outcomeRef = new AtomicReference<@Nullable Outcome>(null); - final var cb = ProtectedCompletionCallback.protect( + final var cb = AsyncContinuationCallback.protect( TaskExecutor.from(TaskExecutors.trampoline()), new CompletionCallback() { @Override @@ -71,7 +71,7 @@ public void onOutcome(Outcome outcome) { void protectedCallbackForRuntimeFailure() throws InterruptedException { final var called = new AtomicInteger(0); final var outcomeRef = new AtomicReference<@Nullable Outcome>(null); - final var cb = ProtectedCompletionCallback.protect( + final var cb = AsyncContinuationCallback.protect( TaskExecutor.from(TaskExecutors.trampoline()), new CompletionCallback() { @Override @@ -119,7 +119,7 @@ public void onOutcome(Outcome outcome) { void protectedCallbackForCancellation() { final var called = new AtomicInteger(0); final var outcomeRef = new AtomicReference<@Nullable Outcome>(null); - final var cb = ProtectedCompletionCallback.protect( + final var cb = AsyncContinuationCallback.protect( TaskExecutor.from(TaskExecutors.trampoline()), new CompletionCallback() { @Override diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/LoomTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/LoomTest.java index acdb3cc..344af0f 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/LoomTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/LoomTest.java @@ -1,5 +1,6 @@ package org.funfix.tasks.jvm; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import java.util.Objects; @@ -20,7 +21,7 @@ public void commonPoolInJava21() throws InterruptedException { try { final var latch = new CountDownLatch(1); final var isVirtual = new AtomicBoolean(false); - final var name = new AtomicReference(); + final var name = new AtomicReference<@Nullable String>(); commonPool.execute(() -> { isVirtual.set(VirtualThreads.isVirtualThread(Thread.currentThread())); @@ -48,7 +49,7 @@ public void canInitializeFactoryInJava21() throws InterruptedException, VirtualT final var latch = new CountDownLatch(1); final var isVirtual = new AtomicBoolean(false); - final var name = new AtomicReference(); + final var name = new AtomicReference<@Nullable String>(); f.newThread(() -> { isVirtual.set(VirtualThreads.isVirtualThread(Thread.currentThread())); @@ -73,7 +74,7 @@ public void canInitializeExecutorInJava21() throws InterruptedException, Virtual try { final var latch = new CountDownLatch(1); final var isVirtual = new AtomicBoolean(false); - final var name = new AtomicReference(); + final var name = new AtomicReference<@Nullable String>(); executor.execute(() -> { isVirtual.set(VirtualThreads.isVirtualThread(Thread.currentThread())); name.set(Thread.currentThread().getName()); @@ -120,7 +121,7 @@ public void commonPoolInOlderJava() throws InterruptedException { final var latch = new CountDownLatch(1); final var isVirtual = new AtomicBoolean(true); - final var name = new AtomicReference(); + final var name = new AtomicReference<@Nullable String>(); commonPool.execute(() -> { isVirtual.set(VirtualThreads.isVirtualThread(Thread.currentThread())); diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/OutcomeTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/OutcomeTest.java index 5933b55..3a3c8b3 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/OutcomeTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/OutcomeTest.java @@ -14,8 +14,7 @@ void outcomeBuildSuccess() { final Outcome outcome2 = Outcome.success("value"); assertEquals(outcome1, outcome2); - if (outcome2 instanceof Outcome.Success) { - final var success = (Outcome.Success) outcome2; + if (outcome2 instanceof Outcome.Success success) { assertEquals("value", success.value()); } else { fail("Expected Success"); @@ -58,8 +57,7 @@ void outcomeBuildRuntimeFailure() { final Outcome outcome2 = Outcome.failure(e); assertEquals(outcome1, outcome2); - if (outcome2 instanceof Outcome.Failure) { - final var failure = (Outcome.Failure) outcome2; + if (outcome2 instanceof Outcome.Failure failure) { assertEquals("error", failure.exception().getMessage()); } else { fail("Expected Failure"); diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/PureTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/PureTest.java index 62f4f1b..931d87e 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/PureTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/PureTest.java @@ -18,7 +18,7 @@ void pureTask() throws ExecutionException, InterruptedException { } @Test - void pureResource() throws IOException, ExecutionException, InterruptedException { + void pureResource() throws ExecutionException, InterruptedException { final var resource = Resource.pure(42); for (int i = 0; i < 100; i++) { final var outcome = resource.useBlocking(value -> value); diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithCancellationTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithCancellationTest.java new file mode 100644 index 0000000..f31e13e --- /dev/null +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithCancellationTest.java @@ -0,0 +1,112 @@ +package org.funfix.tasks.jvm; + +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class TaskWithCancellationTest { + @Test + void testTaskWithCancellation() throws InterruptedException { + for (int r = 0; r < TestSettings.CONCURRENCY_REPEATS; r++) { + final var cancelTokensRef = new ConcurrentLinkedQueue(); + final var outcomeRef = new AtomicReference<@Nullable Outcome>(null); + + final var startedLatch = new CountDownLatch(1); + final var taskLatch = new CountDownLatch(1); + final var cancelTokensLatch = new CountDownLatch(2); + final var completedLatch = new CountDownLatch(1); + final var runningTask = Task + .fromBlockingIO(() -> { + try { + startedLatch.countDown(); + taskLatch.await(); + return "Completed"; + } catch (final InterruptedException e) { + TimedAwait.latchNoExpectations(cancelTokensLatch); + cancelTokensRef.add(3); + throw e; + } + }) + .ensureRunningOnExecutor() + .withCancellation(() -> { + cancelTokensRef.add(1); + cancelTokensLatch.countDown(); + }) + .withCancellation(() -> { + cancelTokensRef.add(2); + cancelTokensLatch.countDown(); + }) + .runAsync(outcome -> { + outcomeRef.set(outcome); + completedLatch.countDown(); + }); + + TimedAwait.latchAndExpectCompletion(startedLatch, "startedLatch"); + runningTask.cancel(); + TimedAwait.latchAndExpectCompletion(completedLatch, "completedLatch"); + assertEquals(Outcome.cancellation(), outcomeRef.get()); + + final var arr = cancelTokensRef.toArray(new Integer[0]); + for (int i = 0; i < arr.length; i++) { + assertEquals(i + 1, arr[i], "cancelTokensRef[" + i + "] should be " + (i + 1)); + } + assertEquals(3, arr.length, "cancelTokensRef should have 3 elements"); + } + } + + @Test + void testTaskWithCancellationAndFibers() throws Exception { + for (int r = 0; r < TestSettings.CONCURRENCY_REPEATS; r++) { + final var cancelTokensRef = new ConcurrentLinkedQueue(); + final var startedLatch = new CountDownLatch(1); + final var taskLatch = new CountDownLatch(1); + final var cancelTokensLatch = new CountDownLatch(2); + + final var fiber = Task + .fromBlockingIO(() -> { + try { + startedLatch.countDown(); + taskLatch.await(); + return "Completed"; + } catch (final InterruptedException e) { + TimedAwait.latchNoExpectations(cancelTokensLatch); + cancelTokensRef.add(3); + throw e; + } + }) + .ensureRunningOnExecutor() + .withCancellation(() -> { + cancelTokensRef.add(1); + cancelTokensLatch.countDown(); + }) + .withCancellation(() -> { + cancelTokensRef.add(2); + cancelTokensLatch.countDown(); + }) + .runFiber(); + + TimedAwait.latchAndExpectCompletion(startedLatch, "startedLatch"); + fiber.cancel(); + TimedAwait.fiberAndExpectCancellation(fiber); + try { + fiber.getResultOrThrow(); + fail("Should have thrown a TaskCancellationException"); + } catch (final TaskCancellationException e) { + // Expected + } + + final var arr = cancelTokensRef.toArray(new Integer[0]); + for (int i = 0; i < arr.length; i++) { + assertEquals(i + 1, arr[i], "cancelTokensRef[" + i + "] should be " + (i + 1)); + } + assertEquals(3, arr.length, "cancelTokensRef should have 3 elements"); + } + } + +} diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java new file mode 100644 index 0000000..7d4fed9 --- /dev/null +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java @@ -0,0 +1,222 @@ +package org.funfix.tasks.jvm; + +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.funfix.tasks.jvm.TestSettings.CONCURRENCY_REPEATS; +import static org.funfix.tasks.jvm.TestSettings.TIMEOUT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class TaskWithOnCompletionTest { + @Test + void guaranteeOnSuccess() throws ExecutionException, InterruptedException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var outcome1 = Task + .fromBlockingIO(() -> "Success") + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runBlocking(); + + assertEquals("Success", outcome1); + assertEquals(Outcome.success("Success"), ref1.get()); + assertEquals(Outcome.success("Success"), ref2.get()); + } + } + + @Test + void guaranteeOnSuccessWithFibers() throws ExecutionException, InterruptedException, TaskCancellationException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var fiber = Task + .fromBlockingIO(() -> "Success") + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runFiber(); + + assertEquals("Success", fiber.awaitBlocking()); + assertEquals(Outcome.success("Success"), ref1.get()); + assertEquals(Outcome.success("Success"), ref2.get()); + } + } + + @Test + void guaranteeOnSuccessWithBlockingIO() throws ExecutionException, InterruptedException, + TaskCancellationException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var r = Task + .fromBlockingIO(() -> "Success") + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runBlocking(); + + assertEquals("Success", r); + assertEquals(Outcome.success("Success"), ref1.get()); + assertEquals(Outcome.success("Success"), ref2.get()); + } + } + + + @Test + void guaranteeOnFailure() throws InterruptedException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var error = new RuntimeException("Failure"); + + try { + Task.fromBlockingIO(() -> { + throw error; + }) + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runBlocking(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertEquals(error, e.getCause()); + } + + assertEquals(Outcome.failure(error), ref1.get()); + assertEquals(Outcome.failure(error), ref2.get()); + } + } + + @Test + void guaranteeOnFailureWithFibers() throws InterruptedException, TaskCancellationException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var error = new RuntimeException("Failure"); + + try { + Task.fromBlockingIO(() -> { + throw error; + }) + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runFiber() + .awaitBlocking(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertEquals(error, e.getCause()); + } + + assertEquals(Outcome.failure(error), ref1.get()); + assertEquals(Outcome.failure(error), ref2.get()); + } + } + + @Test + void guaranteeOnFailureBlockingIO() throws InterruptedException, TaskCancellationException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var error = new RuntimeException("Failure"); + + try { + Task.fromBlockingIO(() -> { + throw error; + }) + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runBlocking(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertEquals(error, e.getCause()); + } + + assertEquals(Outcome.failure(error), ref1.get()); + assertEquals(Outcome.failure(error), ref2.get()); + } + } + + @Test + void guaranteeOnCancellation() throws InterruptedException, ExecutionException, TimeoutException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var latch = new CountDownLatch(1); + + final var task = Task + .fromBlockingIO(() -> { + latch.await(); + return "Should not complete"; + }) + .ensureRunningOnExecutor() + .withOnComplete(ref1::set) + .withOnComplete(ref2::set); + + final var fiber = task.runFiber(); + fiber.cancel(); + + try { + fiber.awaitBlockingTimed(TIMEOUT); + fail("Expected TaskCancellationException"); + } catch (TaskCancellationException e) { + // Expected + } catch (TimeoutException e) { + latch.countDown(); + throw e; + } + + assertEquals(Outcome.cancellation(), ref1.get()); + assertEquals(Outcome.cancellation(), ref2.get()); + } + } + + @Test + void callOrdering() throws InterruptedException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var latch = new CountDownLatch(1); + final var ref = new ConcurrentLinkedQueue(); + + Task.fromBlockingIO(() -> 0) + .ensureRunningOnExecutor() + .withOnComplete((ignored) -> ref.add(1)) + .withOnComplete((ignored) -> ref.add(2)) + .runAsync((ignored) -> { + ref.add(3); + latch.countDown(); + }); + + TimedAwait.latchAndExpectCompletion(latch, "latch"); + assertEquals(3, ref.size(), "Expected 3 calls to onCompletion"); + final var arr = ref.toArray(new Integer[0]); + for (int i = 0; i < 3; i++) { + assertEquals(i + 1, arr[i]); + } + } + } + + @Test + void callOrderingViaFibers() throws Exception { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref = new ConcurrentLinkedQueue(); + + final var fiber = Task.fromBlockingIO(() -> 0) + .ensureRunningOnExecutor() + .withOnComplete((ignored) -> ref.add(1)) + .withOnComplete((ignored) -> ref.add(2)) + .runFiber(); + + assertEquals(0, fiber.awaitBlockingTimed(TIMEOUT)); + assertEquals(2, ref.size(), "Expected 2 calls to onCompletion"); + final var arr = ref.toArray(new Integer[0]); + for (int i = 0; i < arr.length; i++) { + assertEquals(i + 1, arr[i]); + } + } + } + +}