From fcf3fa3b1833df46dcbab2bc7b1b4a73e1375a5a Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Sun, 10 Aug 2025 11:18:13 +0300 Subject: [PATCH 1/7] Get rid of ? super T in Callback --- .../java/org/funfix/tasks/jvm/Continuation.java | 2 +- .../src/main/java/org/funfix/tasks/jvm/Fiber.java | 2 +- .../src/main/java/org/funfix/tasks/jvm/Task.java | 14 ++++++++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java index 9474f48..2a2e012 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java @@ -46,7 +46,7 @@ interface Continuation @ApiStatus.Internal @FunctionalInterface interface AsyncContinuationFun { - void invoke(Continuation continuation); + void invoke(Continuation continuation); } /** diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java index 4d6ffe2..06da5cf 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java @@ -260,7 +260,7 @@ public NotCompletedException() { @ApiStatus.Internal final class ExecutedFiber implements Fiber { private final TaskExecutor executor; - private final Continuation continuation; + private final Continuation continuation; private final AtomicReference> stateRef = new AtomicReference<>(State.start()); diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java index 5b5f91f..2669b59 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java @@ -40,7 +40,7 @@ private Task(final AsyncContinuationFun createFun) { */ public Task ensureRunningOnExecutor(final @Nullable Executor executor) { return new Task<>((cont) -> { - final Continuation cont2 = executor != null + final Continuation cont2 = executor != null ? cont.withExecutorOverride(TaskExecutor.from(executor)) : cont; cont2.getExecutor().resumeOnExecutor(() -> createFun.invoke(cont2)); @@ -90,9 +90,13 @@ public Cancellable runAsync( final var taskExecutor = TaskExecutor.from( executor != null ? executor : TaskExecutors.sharedBlockingIO() ); - final var cont = new CancellableContinuation<>( + @SuppressWarnings("unchecked") + final var cont = new CancellableContinuation( taskExecutor, - ProtectedCompletionCallback.protect(taskExecutor, callback) + ProtectedCompletionCallback.protect( + taskExecutor, + (CompletionCallback) callback + ) ); taskExecutor.execute(() -> { try { @@ -314,6 +318,7 @@ public T runBlockingTimed(final Duration timeout) if (th.isInterrupted()) { throw new InterruptedException(); } + //noinspection DataFlowIssue cont.onSuccess(result); } catch (final InterruptedException | TaskCancellationException e) { cont.onCancellation(); @@ -426,6 +431,7 @@ public T runBlockingTimed(final Duration timeout) * Creates a task that completes with the given static/pure value. */ public static Task pure(final T value) { + //noinspection DataFlowIssue return new Task<>((cont) -> cont.onSuccess(value)); } @@ -582,7 +588,7 @@ public TaskFromCancellableFuture(DelayedFun> buil } @Override - public void invoke(Continuation continuation) { + public void invoke(Continuation continuation) { try { final var cancellableRef = continuation.registerForwardCancellable(); From 3ecca1d630c7b09caa6265eec437f429a94bb5ce Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Sun, 10 Aug 2025 11:19:19 +0300 Subject: [PATCH 2/7] Continuation API change --- .../src/main/java/org/funfix/tasks/jvm/Continuation.java | 4 ++-- tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java index 2a2e012..efc449b 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java @@ -37,7 +37,7 @@ interface Continuation CancellableForwardRef registerForwardCancellable(); - CancellableContinuation withExecutorOverride(TaskExecutor executor); + Continuation withExecutorOverride(TaskExecutor executor); } /** @@ -122,7 +122,7 @@ public void onCancellation() { } @Override - public CancellableContinuation withExecutorOverride(TaskExecutor executor) { + public Continuation withExecutorOverride(TaskExecutor executor) { return new CancellableContinuation<>( executor, callback, diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java index 06da5cf..bd19638 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java @@ -481,8 +481,8 @@ public void onOutcome(Outcome outcome) { } @Override - public CancellableContinuation withExecutorOverride(TaskExecutor executor) { - return new CancellableContinuation<>(executor, this); + public Continuation withExecutorOverride(TaskExecutor executor) { + return new FiberContinuation<>(executor, stateRef); } } } From 90faeeb95ceee19badff14738983e2bd41afc24c Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Sun, 10 Aug 2025 11:32:43 +0300 Subject: [PATCH 3/7] Modify Continuation --- .../funfix/tasks/jvm/CompletionCallback.java | 84 +++++++++++++++++++ .../org/funfix/tasks/jvm/Continuation.java | 12 +++ .../main/java/org/funfix/tasks/jvm/Fiber.java | 8 ++ 3 files changed, 104 insertions(+) diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java index c2b199e..dd1644f 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java @@ -58,6 +58,90 @@ default void onCancellation() { } }; } + + /** + * Composes multiple {@code CompletionCallback} instances into a single + * {@code CompletionCallback} that will invoke all the listeners in order. + */ + @SafeVarargs + static CompletionCallback compose( + final CompletionCallback... listeners + ) { + Objects.requireNonNull(listeners, "listeners"); + if (listeners.length == 0) { + return empty(); + } else if (listeners.length == 1) { + return listeners[0]; + } else { + return new ManyCompletionCallback<>(listeners); + } + } +} + +@ApiStatus.Internal +final class ManyCompletionCallback + implements CompletionCallback { + + private final CompletionCallback[] listeners; + + @SafeVarargs + ManyCompletionCallback(final CompletionCallback... listeners) { + Objects.requireNonNull(listeners, "listeners"); + this.listeners = listeners; + } + + @Override + public void onOutcome(Outcome outcome) { + Trampoline.execute(() -> { + for (final CompletionCallback listener : listeners) { + try { + listener.onOutcome(outcome); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + } + }); + } + + @Override + public void onSuccess(T value) { + Trampoline.execute(() -> { + for (final CompletionCallback listener : listeners) { + try { + listener.onSuccess(value); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + } + }); + } + + @Override + public void onFailure(Throwable e) { + Objects.requireNonNull(e, "e"); + Trampoline.execute(() -> { + for (final CompletionCallback listener : listeners) { + try { + listener.onFailure(e); + } catch (Throwable ex) { + UncaughtExceptionHandler.logOrRethrow(ex); + } + } + }); + } + + @Override + public void onCancellation() { + Trampoline.execute(() -> { + for (final CompletionCallback listener : listeners) { + try { + listener.onCancellation(); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + } + }); + } } @ApiStatus.Internal diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java index efc449b..330be4f 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java @@ -38,6 +38,8 @@ interface Continuation CancellableForwardRef registerForwardCancellable(); Continuation withExecutorOverride(TaskExecutor executor); + + Continuation withExtraCallback(CompletionCallback extraCallback); } /** @@ -129,4 +131,14 @@ public Continuation withExecutorOverride(TaskExecutor executor) { cancellable ); } + + @Override + public Continuation withExtraCallback(CompletionCallback extraCallback) { + final var callback2 = CompletionCallback.compose(extraCallback, callback); + return new CancellableContinuation<>( + executor, + callback2, + cancellable + ); + } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java index bd19638..3bc817d 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java @@ -484,6 +484,14 @@ public void onOutcome(Outcome outcome) { public Continuation withExecutorOverride(TaskExecutor executor) { return new FiberContinuation<>(executor, stateRef); } + + @Override + public Continuation withExtraCallback(CompletionCallback extraCallback) { + return new CancellableContinuation<>( + executor, + CompletionCallback.compose(extraCallback, this) + ); + } } } From e44f1822983ec8fc3a8da1c1e69fded79c90ddf0 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Sun, 10 Aug 2025 11:54:17 +0300 Subject: [PATCH 4/7] Add guaranteeOnCompletion --- .../main/java/org/funfix/tasks/jvm/Fiber.java | 2 +- .../main/java/org/funfix/tasks/jvm/Task.java | 26 +++- .../jvm/TaskGuaranteeOnCompletionTest.java | 121 ++++++++++++++++++ 3 files changed, 147 insertions(+), 2 deletions(-) create mode 100644 tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskGuaranteeOnCompletionTest.java diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java index 3bc817d..fe4aa34 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java @@ -489,7 +489,7 @@ public Continuation withExecutorOverride(TaskExecutor executor) { public Continuation withExtraCallback(CompletionCallback extraCallback) { return new CancellableContinuation<>( executor, - CompletionCallback.compose(extraCallback, this) + CompletionCallback.compose(extraCallback, this), ); } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java index 2669b59..b550b95 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java @@ -73,6 +73,30 @@ public Task ensureRunningOnExecutor() { return ensureRunningOnExecutor(null); } + /** + * Guarantees that the task will complete with the given callback. + *

+ * This method is useful for releasing resources or performing + * cleanup operations when the task completes. + *

+ * This callback will be invoked in addition to whatever the client + * provides as a callback to {@link #runAsync(CompletionCallback)} + * or similar methods, and will also be invoked before it, i.e., + * callbacks get added and executed in LIFO order. + */ + public Task onCompletion(final CompletionCallback callback) { + return new Task<>((cont) -> { + @SuppressWarnings("unchecked") + final Continuation cont2 = cont.withExtraCallback( + ProtectedCompletionCallback.protect( + cont.getExecutor(), + (CompletionCallback) callback + ) + ); + cont2.getExecutor().resumeOnExecutor(() -> createFun.invoke(cont2)); + }); + } + /** * Executes the task asynchronously. *

@@ -91,7 +115,7 @@ public Cancellable runAsync( executor != null ? executor : TaskExecutors.sharedBlockingIO() ); @SuppressWarnings("unchecked") - final var cont = new CancellableContinuation( + final var cont = new CancellableContinuation<>( taskExecutor, ProtectedCompletionCallback.protect( taskExecutor, diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskGuaranteeOnCompletionTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskGuaranteeOnCompletionTest.java new file mode 100644 index 0000000..4c4b22a --- /dev/null +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskGuaranteeOnCompletionTest.java @@ -0,0 +1,121 @@ +package org.funfix.tasks.jvm; + +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.funfix.tasks.jvm.TestSettings.TIMEOUT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class TaskGuaranteeOnCompletionTest { + @Test + void guaranteeOnSuccess() throws ExecutionException, InterruptedException { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var outcome1 = Task + .fromBlockingIO(() -> "Success") + .onCompletion(ref1::set) + .onCompletion(ref2::set) + .runBlocking(); + + assertEquals("Success", outcome1); + assertEquals(Outcome.success("Success"), ref1.get()); + assertEquals(Outcome.success("Success"), ref2.get()); + } + + @Test + void guaranteeOnSuccessWithFibers() throws ExecutionException, InterruptedException, TaskCancellationException { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var fiber = Task + .fromBlockingIO(() -> "Success") + .onCompletion(ref1::set) + .onCompletion(ref2::set) + .runFiber(); + + assertEquals("Success", fiber.awaitBlocking()); + assertEquals(Outcome.success("Success"), ref1.get()); + assertEquals(Outcome.success("Success"), ref2.get()); + } + + @Test + void guaranteeOnFailure() throws InterruptedException { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var error = new RuntimeException("Failure"); + + try { + Task.fromBlockingIO(() -> { + throw error; + }) + .onCompletion(ref1::set) + .onCompletion(ref2::set) + .runBlocking(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertEquals(error, e.getCause()); + } + + assertEquals(Outcome.failure(error), ref1.get()); + assertEquals(Outcome.failure(error), ref2.get()); + } + + @Test + void guaranteeOnFailureWithFibers() throws InterruptedException, TaskCancellationException { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var error = new RuntimeException("Failure"); + + try { + Task.fromBlockingIO(() -> { + throw error; + }) + .onCompletion(ref1::set) + .onCompletion(ref2::set) + .runFiber() + .awaitBlocking(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertEquals(error, e.getCause()); + } + + assertEquals(Outcome.failure(error), ref1.get()); + assertEquals(Outcome.failure(error), ref2.get()); + } + + @Test + void guaranteeOnCancellation() throws InterruptedException, ExecutionException, TimeoutException { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var latch = new CountDownLatch(1); + + final var task = Task + .fromBlockingIO(() -> { + latch.await(); + return "Should not complete"; + }) + .onCompletion(ref1::set) + .onCompletion(ref2::set); + + final var fiber = task.runFiber(); + fiber.cancel(); + + try { + fiber.awaitBlockingTimed(TIMEOUT); + fail("Expected TaskCancellationException"); + } catch (TaskCancellationException e) { + // Expected + } catch (TimeoutException e) { + latch.countDown(); + throw e; + } + + assertEquals(Outcome.cancellation(), ref1.get()); + assertEquals(Outcome.cancellation(), ref2.get()); + } +} From 39ef557f8aaf68202ab687f6663cb7b0bc4b8f70 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 11 Aug 2025 08:08:15 +0300 Subject: [PATCH 5/7] Many changes to Cancellable --- .../src/main/kotlin/tasks.base.gradle.kts | 2 +- .../org/funfix/tasks/jvm/Cancellable.java | 112 ++++++++--- .../org/funfix/tasks/jvm/Continuation.java | 24 +-- .../main/java/org/funfix/tasks/jvm/Fiber.java | 92 +++------ .../java/org/funfix/tasks/jvm/Outcome.java | 12 -- .../main/java/org/funfix/tasks/jvm/Task.java | 22 ++- .../java/org/funfix/tasks/jvm/LoomTest.java | 9 +- .../org/funfix/tasks/jvm/OutcomeTest.java | 6 +- .../java/org/funfix/tasks/jvm/PureTest.java | 2 +- .../jvm/TaskGuaranteeOnCompletionTest.java | 121 ------------ .../tasks/jvm/TaskWithCancellationTest.java | 112 +++++++++++ .../tasks/jvm/TaskWithOnCompletionTest.java | 175 ++++++++++++++++++ 12 files changed, 444 insertions(+), 245 deletions(-) delete mode 100644 tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskGuaranteeOnCompletionTest.java create mode 100644 tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithCancellationTest.java create mode 100644 tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java diff --git a/buildSrc/src/main/kotlin/tasks.base.gradle.kts b/buildSrc/src/main/kotlin/tasks.base.gradle.kts index 23388f1..68c83b3 100644 --- a/buildSrc/src/main/kotlin/tasks.base.gradle.kts +++ b/buildSrc/src/main/kotlin/tasks.base.gradle.kts @@ -28,7 +28,7 @@ mavenPublishing { licenses { license { name = "The Apache License, Version 2.0" - url = "http://www.apache.org/licenses/LICENSE-2.0.txt" + url = "https://www.apache.org/licenses/LICENSE-2.0.txt" } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java index 0063d0f..b072603 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java @@ -2,6 +2,7 @@ import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.Nullable; import java.util.Objects; import java.util.concurrent.atomic.AtomicReference; @@ -48,7 +49,7 @@ static Cancellable getEmpty() { */ @ApiStatus.Internal final class CancellableUtils { - static Cancellable EMPTY = () -> {}; + static final Cancellable EMPTY = () -> {}; } /** @@ -60,8 +61,7 @@ final class CancellableUtils { * breakage between minor version updates. */ @ApiStatus.Internal -@FunctionalInterface -interface CancellableForwardRef { +interface CancellableForwardRef extends Cancellable { void set(Cancellable cancellable); } @@ -74,41 +74,104 @@ interface CancellableForwardRef { */ @ApiStatus.Internal final class MutableCancellable implements Cancellable { - private final AtomicReference ref = - new AtomicReference<>(new State.Active(Cancellable.getEmpty(), 0)); + private final AtomicReference ref; + + MutableCancellable(final Cancellable initialRef) { + ref = new AtomicReference<>(new State.Active(initialRef, 0, null)); + } + + MutableCancellable() { + this(CancellableUtils.EMPTY); + } @Override public void cancel() { - final var prev = ref.getAndSet(State.Cancelled.INSTANCE); - if (prev instanceof State.Active active) { - active.token.cancel(); + @Nullable + var state = ref.getAndSet(State.Closed.INSTANCE); + while (state instanceof State.Active active) { + try { + active.token.cancel(); + } catch (Exception e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + state = active.rest; } } public CancellableForwardRef newCancellableRef() { final var current = ref.get(); - if (current instanceof State.Cancelled) { - return Cancellable::cancel; + if (current instanceof State.Closed) { + return new CancellableForwardRef() { + @Override + public void set(Cancellable cancellable) { + cancellable.cancel(); + } + @Override + public void cancel() {} + }; } else if (current instanceof State.Active active) { - return cancellable -> registerOrdered( - active.order, - cancellable, - active - ); + return new CancellableForwardRef() { + @Override + public void set(Cancellable cancellable) { + registerOrdered( + active.order, + cancellable, + active + ); + } + + @Override + public void cancel() { + unregister(active.order); + } + }; } else { throw new IllegalStateException("Invalid state: " + current); } } - public void register(Cancellable token) { + public @Nullable Cancellable register(Cancellable token) { Objects.requireNonNull(token, "token"); while (true) { final var current = ref.get(); if (current instanceof State.Active active) { - final var update = new State.Active(token, active.order + 1); - if (ref.compareAndSet(current, update)) { return; } - } else if (current instanceof State.Cancelled) { + final var newOrder = active.order + 1; + final var update = new State.Active(token, newOrder, active); + if (ref.compareAndSet(current, update)) { return () -> unregister(newOrder); } + } else if (current instanceof State.Closed) { token.cancel(); + return null; + } else { + throw new IllegalStateException("Invalid state: " + current); + } + } + } + + private void unregister(final long order) { + while (true) { + final var current = ref.get(); + if (current instanceof State.Active active) { + @Nullable var cursor = active; + @Nullable State.Active acc = null; + while (cursor != null) { + if (cursor.order != order) { + acc = new State.Active(cursor.token, cursor.order, acc); + } + cursor = cursor.rest; + } + // Reversing + @Nullable State.Active update = null; + while (acc != null) { + update = new State.Active(acc.token, acc.order, update); + acc = acc.rest; + } + if (update == null) { + update = new State.Active(Cancellable.getEmpty(), 0, null); + } + if (ref.compareAndSet(current, update)) { + return; + } + } else if (current instanceof State.Closed) { return; } else { throw new IllegalStateException("Invalid state: " + current); @@ -126,11 +189,11 @@ private void registerOrdered( // Double-check ordering if (active.order != order) { return; } // Try to update - final var update = new State.Active(newToken, order + 1); + final var update = new State.Active(newToken, order + 1, null); if (ref.compareAndSet(current, update)) { return; } // Retry current = ref.get(); - } else if (current instanceof State.Cancelled) { + } else if (current instanceof State.Closed) { newToken.cancel(); return; } else { @@ -142,11 +205,12 @@ private void registerOrdered( sealed interface State { record Active( Cancellable token, - long order + long order, + @Nullable Active rest ) implements State {} - record Cancelled() implements State { - static Cancelled INSTANCE = new Cancelled(); + record Closed() implements State { + static final Closed INSTANCE = new Closed(); } } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java index 330be4f..abb24d4 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java @@ -33,7 +33,7 @@ interface Continuation * @param cancellable is the reference to the cancellable object that this * continuation will register. */ - void registerCancellable(Cancellable cancellable); + @Nullable Cancellable registerCancellable(Cancellable cancellable); CancellableForwardRef registerForwardCancellable(); @@ -59,7 +59,7 @@ final class CancellableContinuation implements Continuation, Cancellable { private final CompletionCallback callback; - private final MutableCancellable cancellable; + private final MutableCancellable cancellableRef; private final TaskExecutor executor; public CancellableContinuation( @@ -73,14 +73,14 @@ public CancellableContinuation( ); } - private CancellableContinuation( + CancellableContinuation( final TaskExecutor executor, final CompletionCallback callback, final MutableCancellable cancellable ) { this.executor = executor; this.callback = callback; - this.cancellable = cancellable; + this.cancellableRef = cancellable; } @Override @@ -90,17 +90,17 @@ public TaskExecutor getExecutor() { @Override public void cancel() { - cancellable.cancel(); + cancellableRef.cancel(); } @Override public CancellableForwardRef registerForwardCancellable() { - return cancellable.newCancellableRef(); + return cancellableRef.newCancellableRef(); } @Override - public void registerCancellable(Cancellable cancellable) { - this.cancellable.register(cancellable); + public @Nullable Cancellable registerCancellable(Cancellable cancellable) { + return this.cancellableRef.register(cancellable); } @Override @@ -128,17 +128,17 @@ public Continuation withExecutorOverride(TaskExecutor executor) { return new CancellableContinuation<>( executor, callback, - cancellable + cancellableRef ); } @Override public Continuation withExtraCallback(CompletionCallback extraCallback) { - final var callback2 = CompletionCallback.compose(extraCallback, callback); + final var updatedCallback = CompletionCallback.compose(extraCallback, callback); return new CancellableContinuation<>( executor, - callback2, - cancellable + updatedCallback, + cancellableRef ); } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java index fe4aa34..e2aa5ea 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java @@ -261,12 +261,31 @@ public NotCompletedException() { final class ExecutedFiber implements Fiber { private final TaskExecutor executor; private final Continuation continuation; - private final AtomicReference> stateRef = - new AtomicReference<>(State.start()); + private final MutableCancellable cancellableRef; + private final AtomicReference> stateRef; private ExecutedFiber(final TaskExecutor executor) { + this.cancellableRef = new MutableCancellable(this::fiberCancel); + this.stateRef = new AtomicReference<>(State.start()); this.executor = executor; - this.continuation = new FiberContinuation<>(executor, stateRef); + this.continuation = new CancellableContinuation<>( + executor, + new FiberCallback<>(executor, stateRef), + cancellableRef + ); + } + + private void fiberCancel() { + while (true) { + final var current = stateRef.get(); + if (current instanceof State.Active active) { + if (stateRef.compareAndSet(current, new State.Cancelled<>(active.listeners))) { + return; + } + } else { + return; + } + } } @Override @@ -297,17 +316,7 @@ public Cancellable joinAsync(final Runnable onComplete) { @Override public void cancel() { - while (true) { - final var current = stateRef.get(); - if (current instanceof State.Active active) { - if (stateRef.compareAndSet(current, new State.Cancelled<>(active.listeners))) { - active.cancellable.cancel(); - return; - } - } else { - return; - } - } + this.cancellableRef.cancel(); } private Cancellable removeListenerCancellable(final Runnable listener) { @@ -328,8 +337,7 @@ private Cancellable removeListenerCancellable(final Runnable listener) { sealed interface State { record Active( - ImmutableQueue listeners, - MutableCancellable cancellable + ImmutableQueue listeners ) implements State {} record Cancelled( @@ -355,7 +363,7 @@ default void triggerListeners(TaskExecutor executor) { default State addListener(final Runnable listener) { if (this instanceof Active ref) { final var newQueue = ref.listeners.enqueue(listener); - return new Active<>(newQueue, ref.cancellable); + return new Active<>(newQueue); } else if (this instanceof Cancelled ref) { final var newQueue = ref.listeners.enqueue(listener); return new Cancelled<>(newQueue); @@ -367,7 +375,7 @@ default State addListener(final Runnable listener) { default State removeListener(final Runnable listener) { if (this instanceof Active ref) { final var newQueue = ref.listeners.filter(l -> l != listener); - return new Active<>(newQueue, ref.cancellable); + return new Active<>(newQueue); } else if (this instanceof Cancelled ref) { final var newQueue = ref.listeners.filter(l -> l != listener); return new Cancelled<>(newQueue); @@ -377,7 +385,7 @@ default State removeListener(final Runnable listener) { } static State start() { - return new Active<>(ImmutableQueue.empty(), new MutableCancellable()); + return new Active<>(ImmutableQueue.empty() ); } } @@ -398,11 +406,11 @@ default State removeListener(final Runnable listener) { return fiber; } - static final class FiberContinuation implements Continuation { + static final class FiberCallback implements CompletionCallback { private final TaskExecutor executor; private final AtomicReference> stateRef; - FiberContinuation( + FiberCallback( final TaskExecutor executor, final AtomicReference> stateRef ) { @@ -410,35 +418,6 @@ static final class FiberContinuation implements Cont this.stateRef = stateRef; } - @Override - public TaskExecutor getExecutor() { - return executor; - } - - @Override - public CancellableForwardRef registerForwardCancellable() { - final var current = stateRef.get(); - if (current instanceof State.Completed) { - return (cancellable) -> {}; - } else if (current instanceof State.Cancelled) { - return Cancellable::cancel; - } else if (current instanceof State.Active active) { - return active.cancellable.newCancellableRef(); - } else { - throw new IllegalStateException("Invalid state: " + current); - } - } - - @Override - public void registerCancellable(Cancellable cancellable) { - final var current = stateRef.get(); - if (current instanceof State.Active active) { - active.cancellable.register(cancellable); - } else if (current instanceof State.Cancelled) { - cancellable.cancel(); - } - } - @Override public void onSuccess(T value) { onOutcome(Outcome.success(value)); @@ -479,19 +458,6 @@ public void onOutcome(Outcome outcome) { } } } - - @Override - public Continuation withExecutorOverride(TaskExecutor executor) { - return new FiberContinuation<>(executor, stateRef); - } - - @Override - public Continuation withExtraCallback(CompletionCallback extraCallback) { - return new CancellableContinuation<>( - executor, - CompletionCallback.compose(extraCallback, this), - ); - } } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Outcome.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Outcome.java index 91e5232..36d8832 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Outcome.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Outcome.java @@ -23,11 +23,6 @@ public sealed interface Outcome @NonBlocking T getOrThrow() throws ExecutionException, TaskCancellationException; - /** - * Converts this task outcome to an {@link ExitCase}. - */ - ExitCase toTaskExitCase(); - /** * Signals a successful result of the task. */ @@ -37,8 +32,6 @@ record Success(T value) @Override public T getOrThrow() { return value; } - @Override - public ExitCase toTaskExitCase() { return ExitCase.succeeded(); } } /** @@ -52,8 +45,6 @@ public T getOrThrow() throws ExecutionException { throw new ExecutionException(exception); } - @Override - public ExitCase toTaskExitCase() { return ExitCase.failed(exception); } } /** @@ -67,9 +58,6 @@ public T getOrThrow() throws TaskCancellationException { throw new TaskCancellationException(); } - @Override - public ExitCase toTaskExitCase() { return ExitCase.canceled(); } - private static final Cancellation INSTANCE = new Cancellation<>(); } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java index b550b95..88bba66 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java @@ -84,7 +84,7 @@ public Task ensureRunningOnExecutor() { * or similar methods, and will also be invoked before it, i.e., * callbacks get added and executed in LIFO order. */ - public Task onCompletion(final CompletionCallback callback) { + public Task withOnComplete(final CompletionCallback callback) { return new Task<>((cont) -> { @SuppressWarnings("unchecked") final Continuation cont2 = cont.withExtraCallback( @@ -97,6 +97,18 @@ public Task onCompletion(final CompletionCallback callback) { }); } + /** + * Registers a {@link Cancellable} that can be used to cancel the running task. + */ + public Task withCancellation( + final Cancellable cancellable + ) { + return new Task<>((cont) -> { + cont.registerCancellable(cancellable); + cont.getExecutor().resumeOnExecutor(() -> createFun.invoke(cont)); + }); + } + /** * Executes the task asynchronously. *

@@ -330,14 +342,18 @@ public T runBlockingTimed(final Duration timeout) public static Task fromBlockingIO(final DelayedFun run) { return new Task<>((cont) -> { Thread th = Thread.currentThread(); - cont.registerCancellable(th::interrupt); + final var registration = cont.registerCancellable(th::interrupt); + if (registration == null) { + cont.onCancellation(); + return; + } try { T result; try { TaskLocalContext.signalTheStartOfBlockingCall(); result = run.invoke(); } finally { - cont.registerCancellable(Cancellable.getEmpty()); + registration.cancel(); } if (th.isInterrupted()) { throw new InterruptedException(); diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/LoomTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/LoomTest.java index acdb3cc..344af0f 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/LoomTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/LoomTest.java @@ -1,5 +1,6 @@ package org.funfix.tasks.jvm; +import org.jspecify.annotations.Nullable; import org.junit.jupiter.api.Test; import java.util.Objects; @@ -20,7 +21,7 @@ public void commonPoolInJava21() throws InterruptedException { try { final var latch = new CountDownLatch(1); final var isVirtual = new AtomicBoolean(false); - final var name = new AtomicReference(); + final var name = new AtomicReference<@Nullable String>(); commonPool.execute(() -> { isVirtual.set(VirtualThreads.isVirtualThread(Thread.currentThread())); @@ -48,7 +49,7 @@ public void canInitializeFactoryInJava21() throws InterruptedException, VirtualT final var latch = new CountDownLatch(1); final var isVirtual = new AtomicBoolean(false); - final var name = new AtomicReference(); + final var name = new AtomicReference<@Nullable String>(); f.newThread(() -> { isVirtual.set(VirtualThreads.isVirtualThread(Thread.currentThread())); @@ -73,7 +74,7 @@ public void canInitializeExecutorInJava21() throws InterruptedException, Virtual try { final var latch = new CountDownLatch(1); final var isVirtual = new AtomicBoolean(false); - final var name = new AtomicReference(); + final var name = new AtomicReference<@Nullable String>(); executor.execute(() -> { isVirtual.set(VirtualThreads.isVirtualThread(Thread.currentThread())); name.set(Thread.currentThread().getName()); @@ -120,7 +121,7 @@ public void commonPoolInOlderJava() throws InterruptedException { final var latch = new CountDownLatch(1); final var isVirtual = new AtomicBoolean(true); - final var name = new AtomicReference(); + final var name = new AtomicReference<@Nullable String>(); commonPool.execute(() -> { isVirtual.set(VirtualThreads.isVirtualThread(Thread.currentThread())); diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/OutcomeTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/OutcomeTest.java index 5933b55..3a3c8b3 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/OutcomeTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/OutcomeTest.java @@ -14,8 +14,7 @@ void outcomeBuildSuccess() { final Outcome outcome2 = Outcome.success("value"); assertEquals(outcome1, outcome2); - if (outcome2 instanceof Outcome.Success) { - final var success = (Outcome.Success) outcome2; + if (outcome2 instanceof Outcome.Success success) { assertEquals("value", success.value()); } else { fail("Expected Success"); @@ -58,8 +57,7 @@ void outcomeBuildRuntimeFailure() { final Outcome outcome2 = Outcome.failure(e); assertEquals(outcome1, outcome2); - if (outcome2 instanceof Outcome.Failure) { - final var failure = (Outcome.Failure) outcome2; + if (outcome2 instanceof Outcome.Failure failure) { assertEquals("error", failure.exception().getMessage()); } else { fail("Expected Failure"); diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/PureTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/PureTest.java index 62f4f1b..931d87e 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/PureTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/PureTest.java @@ -18,7 +18,7 @@ void pureTask() throws ExecutionException, InterruptedException { } @Test - void pureResource() throws IOException, ExecutionException, InterruptedException { + void pureResource() throws ExecutionException, InterruptedException { final var resource = Resource.pure(42); for (int i = 0; i < 100; i++) { final var outcome = resource.useBlocking(value -> value); diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskGuaranteeOnCompletionTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskGuaranteeOnCompletionTest.java deleted file mode 100644 index 4c4b22a..0000000 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskGuaranteeOnCompletionTest.java +++ /dev/null @@ -1,121 +0,0 @@ -package org.funfix.tasks.jvm; - -import org.jspecify.annotations.Nullable; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; - -import static org.funfix.tasks.jvm.TestSettings.TIMEOUT; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.fail; - -public class TaskGuaranteeOnCompletionTest { - @Test - void guaranteeOnSuccess() throws ExecutionException, InterruptedException { - final var ref1 = new AtomicReference<@Nullable Outcome>(null); - final var ref2 = new AtomicReference<@Nullable Outcome>(null); - final var outcome1 = Task - .fromBlockingIO(() -> "Success") - .onCompletion(ref1::set) - .onCompletion(ref2::set) - .runBlocking(); - - assertEquals("Success", outcome1); - assertEquals(Outcome.success("Success"), ref1.get()); - assertEquals(Outcome.success("Success"), ref2.get()); - } - - @Test - void guaranteeOnSuccessWithFibers() throws ExecutionException, InterruptedException, TaskCancellationException { - final var ref1 = new AtomicReference<@Nullable Outcome>(null); - final var ref2 = new AtomicReference<@Nullable Outcome>(null); - final var fiber = Task - .fromBlockingIO(() -> "Success") - .onCompletion(ref1::set) - .onCompletion(ref2::set) - .runFiber(); - - assertEquals("Success", fiber.awaitBlocking()); - assertEquals(Outcome.success("Success"), ref1.get()); - assertEquals(Outcome.success("Success"), ref2.get()); - } - - @Test - void guaranteeOnFailure() throws InterruptedException { - final var ref1 = new AtomicReference<@Nullable Outcome>(null); - final var ref2 = new AtomicReference<@Nullable Outcome>(null); - final var error = new RuntimeException("Failure"); - - try { - Task.fromBlockingIO(() -> { - throw error; - }) - .onCompletion(ref1::set) - .onCompletion(ref2::set) - .runBlocking(); - fail("Expected ExecutionException"); - } catch (ExecutionException e) { - assertEquals(error, e.getCause()); - } - - assertEquals(Outcome.failure(error), ref1.get()); - assertEquals(Outcome.failure(error), ref2.get()); - } - - @Test - void guaranteeOnFailureWithFibers() throws InterruptedException, TaskCancellationException { - final var ref1 = new AtomicReference<@Nullable Outcome>(null); - final var ref2 = new AtomicReference<@Nullable Outcome>(null); - final var error = new RuntimeException("Failure"); - - try { - Task.fromBlockingIO(() -> { - throw error; - }) - .onCompletion(ref1::set) - .onCompletion(ref2::set) - .runFiber() - .awaitBlocking(); - fail("Expected ExecutionException"); - } catch (ExecutionException e) { - assertEquals(error, e.getCause()); - } - - assertEquals(Outcome.failure(error), ref1.get()); - assertEquals(Outcome.failure(error), ref2.get()); - } - - @Test - void guaranteeOnCancellation() throws InterruptedException, ExecutionException, TimeoutException { - final var ref1 = new AtomicReference<@Nullable Outcome>(null); - final var ref2 = new AtomicReference<@Nullable Outcome>(null); - final var latch = new CountDownLatch(1); - - final var task = Task - .fromBlockingIO(() -> { - latch.await(); - return "Should not complete"; - }) - .onCompletion(ref1::set) - .onCompletion(ref2::set); - - final var fiber = task.runFiber(); - fiber.cancel(); - - try { - fiber.awaitBlockingTimed(TIMEOUT); - fail("Expected TaskCancellationException"); - } catch (TaskCancellationException e) { - // Expected - } catch (TimeoutException e) { - latch.countDown(); - throw e; - } - - assertEquals(Outcome.cancellation(), ref1.get()); - assertEquals(Outcome.cancellation(), ref2.get()); - } -} diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithCancellationTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithCancellationTest.java new file mode 100644 index 0000000..f31e13e --- /dev/null +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithCancellationTest.java @@ -0,0 +1,112 @@ +package org.funfix.tasks.jvm; + +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class TaskWithCancellationTest { + @Test + void testTaskWithCancellation() throws InterruptedException { + for (int r = 0; r < TestSettings.CONCURRENCY_REPEATS; r++) { + final var cancelTokensRef = new ConcurrentLinkedQueue(); + final var outcomeRef = new AtomicReference<@Nullable Outcome>(null); + + final var startedLatch = new CountDownLatch(1); + final var taskLatch = new CountDownLatch(1); + final var cancelTokensLatch = new CountDownLatch(2); + final var completedLatch = new CountDownLatch(1); + final var runningTask = Task + .fromBlockingIO(() -> { + try { + startedLatch.countDown(); + taskLatch.await(); + return "Completed"; + } catch (final InterruptedException e) { + TimedAwait.latchNoExpectations(cancelTokensLatch); + cancelTokensRef.add(3); + throw e; + } + }) + .ensureRunningOnExecutor() + .withCancellation(() -> { + cancelTokensRef.add(1); + cancelTokensLatch.countDown(); + }) + .withCancellation(() -> { + cancelTokensRef.add(2); + cancelTokensLatch.countDown(); + }) + .runAsync(outcome -> { + outcomeRef.set(outcome); + completedLatch.countDown(); + }); + + TimedAwait.latchAndExpectCompletion(startedLatch, "startedLatch"); + runningTask.cancel(); + TimedAwait.latchAndExpectCompletion(completedLatch, "completedLatch"); + assertEquals(Outcome.cancellation(), outcomeRef.get()); + + final var arr = cancelTokensRef.toArray(new Integer[0]); + for (int i = 0; i < arr.length; i++) { + assertEquals(i + 1, arr[i], "cancelTokensRef[" + i + "] should be " + (i + 1)); + } + assertEquals(3, arr.length, "cancelTokensRef should have 3 elements"); + } + } + + @Test + void testTaskWithCancellationAndFibers() throws Exception { + for (int r = 0; r < TestSettings.CONCURRENCY_REPEATS; r++) { + final var cancelTokensRef = new ConcurrentLinkedQueue(); + final var startedLatch = new CountDownLatch(1); + final var taskLatch = new CountDownLatch(1); + final var cancelTokensLatch = new CountDownLatch(2); + + final var fiber = Task + .fromBlockingIO(() -> { + try { + startedLatch.countDown(); + taskLatch.await(); + return "Completed"; + } catch (final InterruptedException e) { + TimedAwait.latchNoExpectations(cancelTokensLatch); + cancelTokensRef.add(3); + throw e; + } + }) + .ensureRunningOnExecutor() + .withCancellation(() -> { + cancelTokensRef.add(1); + cancelTokensLatch.countDown(); + }) + .withCancellation(() -> { + cancelTokensRef.add(2); + cancelTokensLatch.countDown(); + }) + .runFiber(); + + TimedAwait.latchAndExpectCompletion(startedLatch, "startedLatch"); + fiber.cancel(); + TimedAwait.fiberAndExpectCancellation(fiber); + try { + fiber.getResultOrThrow(); + fail("Should have thrown a TaskCancellationException"); + } catch (final TaskCancellationException e) { + // Expected + } + + final var arr = cancelTokensRef.toArray(new Integer[0]); + for (int i = 0; i < arr.length; i++) { + assertEquals(i + 1, arr[i], "cancelTokensRef[" + i + "] should be " + (i + 1)); + } + assertEquals(3, arr.length, "cancelTokensRef should have 3 elements"); + } + } + +} diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java new file mode 100644 index 0000000..500da6d --- /dev/null +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java @@ -0,0 +1,175 @@ +package org.funfix.tasks.jvm; + +import org.jspecify.annotations.Nullable; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import static org.funfix.tasks.jvm.TestSettings.CONCURRENCY_REPEATS; +import static org.funfix.tasks.jvm.TestSettings.TIMEOUT; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +public class TaskWithOnCompletionTest { + @Test + void guaranteeOnSuccess() throws ExecutionException, InterruptedException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var outcome1 = Task + .fromBlockingIO(() -> "Success") + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runBlocking(); + + assertEquals("Success", outcome1); + assertEquals(Outcome.success("Success"), ref1.get()); + assertEquals(Outcome.success("Success"), ref2.get()); + } + } + + @Test + void guaranteeOnSuccessViaFiber() throws ExecutionException, InterruptedException, TaskCancellationException, TimeoutException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var fiber = Task + .fromBlockingIO(() -> "Success") + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runFiber(); + + assertEquals("Success", fiber.awaitBlockingTimed(TIMEOUT)); + assertEquals(Outcome.success("Success"), ref1.get()); + assertEquals(Outcome.success("Success"), ref2.get()); + } + } + + @Test + void guaranteeOnSuccessWithFibers() throws ExecutionException, InterruptedException, TaskCancellationException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var fiber = Task + .fromBlockingIO(() -> "Success") + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runFiber(); + + assertEquals("Success", fiber.awaitBlocking()); + assertEquals(Outcome.success("Success"), ref1.get()); + assertEquals(Outcome.success("Success"), ref2.get()); + } + } + + @Test + void guaranteeOnFailure() throws InterruptedException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var error = new RuntimeException("Failure"); + + try { + Task.fromBlockingIO(() -> { + throw error; + }) + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runBlocking(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertEquals(error, e.getCause()); + } + + assertEquals(Outcome.failure(error), ref1.get()); + assertEquals(Outcome.failure(error), ref2.get()); + } + } + + @Test + void guaranteeOnFailureWithFibers() throws InterruptedException, TaskCancellationException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var error = new RuntimeException("Failure"); + + try { + Task.fromBlockingIO(() -> { + throw error; + }) + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runFiber() + .awaitBlocking(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertEquals(error, e.getCause()); + } + + assertEquals(Outcome.failure(error), ref1.get()); + assertEquals(Outcome.failure(error), ref2.get()); + } + } + + @Test + void guaranteeOnCancellation() throws InterruptedException, ExecutionException, TimeoutException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var latch = new CountDownLatch(1); + + final var task = Task + .fromBlockingIO(() -> { + latch.await(); + return "Should not complete"; + }) + .ensureRunningOnExecutor() + .withOnComplete(ref1::set) + .withOnComplete(ref2::set); + + final var fiber = task.runFiber(); + fiber.cancel(); + + try { + fiber.awaitBlockingTimed(TIMEOUT); + fail("Expected TaskCancellationException"); + } catch (TaskCancellationException e) { + // Expected + } catch (TimeoutException e) { + latch.countDown(); + throw e; + } + + assertEquals(Outcome.cancellation(), ref1.get()); + assertEquals(Outcome.cancellation(), ref2.get()); + } + } + + @Test + void callOrdering() throws InterruptedException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var latch = new CountDownLatch(1); + final var ref = new ConcurrentLinkedQueue(); + + Task.fromBlockingIO(() -> 0) + .ensureRunningOnExecutor() + .withOnComplete((ignored) -> ref.add(1)) + .withOnComplete((ignored) -> ref.add(2)) + .runAsync((ignored) -> { + ref.add(3); + latch.countDown(); + }); + + TimedAwait.latchAndExpectCompletion(latch, "latch"); + assertEquals(3, ref.size(), "Expected 3 calls to onCompletion"); + final var arr = ref.toArray(new Integer[0]); + for (int i = 0; i < 3; i++) { + assertEquals(i + 1, arr[i]); + } + } + } +} From 3bba078351bc31337651d8074480924c04fba85e Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 11 Aug 2025 08:57:39 +0300 Subject: [PATCH 6/7] Lots of problematic changes --- .../funfix/tasks/jvm/CompletionCallback.java | 308 ++++++++++++++---- .../org/funfix/tasks/jvm/Continuation.java | 17 +- .../main/java/org/funfix/tasks/jvm/Fiber.java | 5 +- .../main/java/org/funfix/tasks/jvm/Task.java | 156 +-------- .../tasks/jvm/CompletionCallbackTest.java | 6 +- .../tasks/jvm/TaskWithOnCompletionTest.java | 12 +- 6 files changed, 282 insertions(+), 222 deletions(-) diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java index dd1644f..11746ce 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java @@ -4,8 +4,14 @@ import org.jspecify.annotations.Nullable; import java.io.Serializable; +import java.time.Duration; +import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * Represents a callback that will be invoked when a task completes. @@ -58,24 +64,6 @@ default void onCancellation() { } }; } - - /** - * Composes multiple {@code CompletionCallback} instances into a single - * {@code CompletionCallback} that will invoke all the listeners in order. - */ - @SafeVarargs - static CompletionCallback compose( - final CompletionCallback... listeners - ) { - Objects.requireNonNull(listeners, "listeners"); - if (listeners.length == 0) { - return empty(); - } else if (listeners.length == 1) { - return listeners[0]; - } else { - return new ManyCompletionCallback<>(listeners); - } - } } @ApiStatus.Internal @@ -90,66 +78,76 @@ final class ManyCompletionCallback this.listeners = listeners; } + ManyCompletionCallback withExtraListener(CompletionCallback extraListener) { + Objects.requireNonNull(extraListener, "extraListener"); + final var newListeners = Arrays.copyOf(listeners, listeners.length + 1); + newListeners[newListeners.length - 1] = extraListener; + return new ManyCompletionCallback<>(newListeners); + } + @Override public void onOutcome(Outcome outcome) { - Trampoline.execute(() -> { - for (final CompletionCallback listener : listeners) { - try { - listener.onOutcome(outcome); - } catch (Throwable e) { - UncaughtExceptionHandler.logOrRethrow(e); - } + for (final CompletionCallback listener : listeners) { + try { + listener.onOutcome(outcome); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); } - }); + } } @Override public void onSuccess(T value) { - Trampoline.execute(() -> { - for (final CompletionCallback listener : listeners) { - try { - listener.onSuccess(value); - } catch (Throwable e) { - UncaughtExceptionHandler.logOrRethrow(e); - } + for (final CompletionCallback listener : listeners) { + try { + listener.onSuccess(value); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); } - }); + } } @Override public void onFailure(Throwable e) { Objects.requireNonNull(e, "e"); - Trampoline.execute(() -> { - for (final CompletionCallback listener : listeners) { - try { - listener.onFailure(e); - } catch (Throwable ex) { - UncaughtExceptionHandler.logOrRethrow(ex); - } + for (final CompletionCallback listener : listeners) { + try { + listener.onFailure(e); + } catch (Throwable ex) { + UncaughtExceptionHandler.logOrRethrow(ex); } - }); + } } @Override public void onCancellation() { - Trampoline.execute(() -> { - for (final CompletionCallback listener : listeners) { - try { - listener.onCancellation(); - } catch (Throwable e) { - UncaughtExceptionHandler.logOrRethrow(e); - } + for (final CompletionCallback listener : listeners) { + try { + listener.onCancellation(); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); } - }); + } } } @ApiStatus.Internal -final class ProtectedCompletionCallback - implements CompletionCallback, Runnable { +interface ContinuationCallback + extends CompletionCallback, Serializable { + + /** + * Registers an extra callback to be invoked when the task completes. + * This is useful for chaining callbacks or adding additional listeners. + */ + void registerExtraCallback(CompletionCallback extraCallback); +} + +@ApiStatus.Internal +final class AsyncContinuationCallback + implements ContinuationCallback, Runnable { private final AtomicBoolean isWaiting = new AtomicBoolean(true); - private final CompletionCallback listener; + private final AtomicReference> listenerRef; private final TaskExecutor executor; private @Nullable Outcome outcome; @@ -157,24 +155,27 @@ final class ProtectedCompletionCallback private @Nullable Throwable failureCause; private boolean isCancelled = false; - private ProtectedCompletionCallback( + public AsyncContinuationCallback( final CompletionCallback listener, final TaskExecutor executor ) { - this.listener = listener; this.executor = executor; + this.listenerRef = new AtomicReference<>( + Objects.requireNonNull(listener, "listener") + ); } @Override + @SuppressWarnings("NullAway") public void run() { if (this.outcome != null) { - listener.onOutcome(this.outcome); + listenerRef.get().onOutcome(this.outcome); } else if (this.failureCause != null) { - listener.onFailure(this.failureCause); + listenerRef.get().onFailure(this.failureCause); } else if (this.isCancelled) { - listener.onCancellation(); + listenerRef.get().onCancellation(); } else if (this.successValue != null) { - listener.onSuccess(this.successValue); + listenerRef.get().onSuccess(this.successValue); } else { throw new IllegalStateException("No outcome, success value, failure cause, or cancellation state set"); } @@ -223,14 +224,199 @@ public void onCancellation() { } } - public static CompletionCallback protect( + public static ContinuationCallback protect( final TaskExecutor executor, final CompletionCallback listener ) { Objects.requireNonNull(listener, "listener"); - return new ProtectedCompletionCallback<>( + return new AsyncContinuationCallback<>( listener, executor ); } + + @SuppressWarnings("NullAway") + public void registerExtraCallback(CompletionCallback extraCallback) { + while (true) { + final var current = listenerRef.get(); + if (current instanceof ManyCompletionCallback many) { + final var update = many.withExtraListener(extraCallback); + if (listenerRef.compareAndSet(current, update)) { + return; + } + } else if (listenerRef.compareAndSet(current, new ManyCompletionCallback<>(current, extraCallback))) { + return; + } + } + } +} + +/** + * INTERNAL API. + *

+ * INTERNAL API: Internal apis are subject to change or removal + * without any notice. When code depends on internal APIs, it is subject to + * breakage between minor version updates. + */ +@ApiStatus.Internal +final class BlockingCompletionCallback + extends AbstractQueuedSynchronizer implements ContinuationCallback { + + private final AtomicBoolean isDone = + new AtomicBoolean(false); + private final AtomicReference<@Nullable CompletionCallback> extraCallbackRef = + new AtomicReference<>(null); + + @Nullable + private T result = null; + @Nullable + private Throwable error = null; + @Nullable + private InterruptedException interrupted = null; + + @SuppressWarnings("NullAway") + private void notifyOutcome() { + final var extraCallback = extraCallbackRef.getAndSet(null); + if (extraCallback != null) + try { + if (error != null) + extraCallback.onFailure(error); + else if (interrupted != null) + extraCallback.onCancellation(); + else + extraCallback.onSuccess(result); + } catch (Throwable e) { + UncaughtExceptionHandler.logOrRethrow(e); + } + releaseShared(1); + } + + @Override + public void onSuccess(final T value) { + if (!isDone.getAndSet(true)) { + result = value; + notifyOutcome(); + } + } + + @Override + public void onFailure(final Throwable e) { + UncaughtExceptionHandler.rethrowIfFatal(e); + if (!isDone.getAndSet(true)) { + error = e; + notifyOutcome(); + } else { + UncaughtExceptionHandler.logOrRethrow(e); + } + } + + @Override + public void onCancellation() { + if (!isDone.getAndSet(true)) { + interrupted = new InterruptedException("Task was cancelled"); + notifyOutcome(); + } + } + + @Override + public void onOutcome(Outcome outcome) { + if (outcome instanceof Outcome.Success success) { + onSuccess(success.value()); + } else if (outcome instanceof Outcome.Failure failure) { + onFailure(failure.exception()); + } else { + onCancellation(); + } + } + + @Override + protected int tryAcquireShared(final int arg) { + return getState() != 0 ? 1 : -1; + } + + @Override + protected boolean tryReleaseShared(final int arg) { + setState(1); + return true; + } + + @FunctionalInterface + interface AwaitFunction { + void apply(boolean isCancelled) throws InterruptedException, TimeoutException; + } + + @SuppressWarnings("NullAway") + private T awaitInline(final Cancellable cancelToken, final AwaitFunction await) + throws InterruptedException, ExecutionException, TimeoutException { + + TaskLocalContext.signalTheStartOfBlockingCall(); + var isCancelled = false; + TimeoutException timedOut = null; + while (true) { + try { + await.apply(isCancelled); + break; + } catch (final TimeoutException | InterruptedException e) { + if (!isCancelled) { + isCancelled = true; + if (e instanceof TimeoutException te) + timedOut = te; + cancelToken.cancel(); + } + } + // Clearing the interrupted flag may not be necessary, + // but doesn't hurt, and we should have a cleared flag before + // re-throwing the exception + // + // noinspection ResultOfMethodCallIgnored + Thread.interrupted(); + } + if (timedOut != null) throw timedOut; + if (interrupted != null) throw interrupted; + if (error != null) throw new ExecutionException(error); + return result; + } + + public T await(final Cancellable cancelToken) throws InterruptedException, ExecutionException { + try { + return awaitInline(cancelToken, isCancelled -> acquireSharedInterruptibly(1)); + } catch (final TimeoutException e) { + throw new IllegalStateException("Unexpected timeout", e); + } + } + + public T await(final Cancellable cancelToken, final Duration timeout) + throws ExecutionException, InterruptedException, TimeoutException { + + return awaitInline(cancelToken, isCancelled -> { + if (!isCancelled) { + if (!tryAcquireSharedNanos(1, timeout.toNanos())) { + throw new TimeoutException("Task timed-out after " + timeout); + } + } else { + // Waiting without a timeout, since at this point it's waiting + // on the cancelled task to finish + acquireSharedInterruptibly(1); + } + }); + } + + @Override + public void registerExtraCallback(CompletionCallback extraCallback) { + while (true) { + final var current = extraCallbackRef.get(); + if (current == null) { + if (extraCallbackRef.compareAndSet(null, extraCallback)) { + return; + } + } else if (current instanceof ManyCompletionCallback many) { + final var update = many.withExtraListener(extraCallback); + if (extraCallbackRef.compareAndSet(current, update)) { + return; + } + } else if (extraCallbackRef.compareAndSet(current, new ManyCompletionCallback<>(current, extraCallback))) { + return; + } + } + } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java index abb24d4..268a9a2 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Continuation.java @@ -39,7 +39,7 @@ interface Continuation Continuation withExecutorOverride(TaskExecutor executor); - Continuation withExtraCallback(CompletionCallback extraCallback); + void registerExtraCallback(CompletionCallback extraCallback); } /** @@ -58,13 +58,13 @@ interface AsyncContinuationFun { final class CancellableContinuation implements Continuation, Cancellable { - private final CompletionCallback callback; + private final ContinuationCallback callback; private final MutableCancellable cancellableRef; private final TaskExecutor executor; public CancellableContinuation( final TaskExecutor executor, - final CompletionCallback callback + final ContinuationCallback callback ) { this( executor, @@ -75,7 +75,7 @@ public CancellableContinuation( CancellableContinuation( final TaskExecutor executor, - final CompletionCallback callback, + final ContinuationCallback callback, final MutableCancellable cancellable ) { this.executor = executor; @@ -133,12 +133,7 @@ public Continuation withExecutorOverride(TaskExecutor executor) { } @Override - public Continuation withExtraCallback(CompletionCallback extraCallback) { - final var updatedCallback = CompletionCallback.compose(extraCallback, callback); - return new CancellableContinuation<>( - executor, - updatedCallback, - cancellableRef - ); + public void registerExtraCallback(CompletionCallback extraCallback) { + callback.registerExtraCallback(extraCallback); } } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java index e2aa5ea..3d8d50e 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Fiber.java @@ -270,7 +270,10 @@ private ExecutedFiber(final TaskExecutor executor) { this.executor = executor; this.continuation = new CancellableContinuation<>( executor, - new FiberCallback<>(executor, stateRef), + new AsyncContinuationCallback<>( + new FiberCallback<>(executor, stateRef), + executor + ), cancellableRef ); } diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java index 88bba66..ec7b4ed 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/Task.java @@ -8,8 +8,6 @@ import java.time.Duration; import java.util.Objects; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * Represents a function that can be executed asynchronously. @@ -81,19 +79,19 @@ public Task ensureRunningOnExecutor() { *

* This callback will be invoked in addition to whatever the client * provides as a callback to {@link #runAsync(CompletionCallback)} - * or similar methods, and will also be invoked before it, i.e., - * callbacks get added and executed in LIFO order. + * or similar methods. + *

+ * WARNING: The invocation of this method is concurrent + * with the task's completion, meaning that ordering isn't guaranteed + * (i.e., a callback installed with this method may be called before or + * after the callback provided to {@link #runAsync(CompletionCallback)}). */ public Task withOnComplete(final CompletionCallback callback) { return new Task<>((cont) -> { @SuppressWarnings("unchecked") - final Continuation cont2 = cont.withExtraCallback( - ProtectedCompletionCallback.protect( - cont.getExecutor(), - (CompletionCallback) callback - ) - ); - cont2.getExecutor().resumeOnExecutor(() -> createFun.invoke(cont2)); + final var extraCallback = (CompletionCallback) Objects.requireNonNull(callback); + cont.registerExtraCallback(extraCallback); + cont.getExecutor().resumeOnExecutor(() -> createFun.invoke(cont)); }); } @@ -129,9 +127,9 @@ public Cancellable runAsync( @SuppressWarnings("unchecked") final var cont = new CancellableContinuation<>( taskExecutor, - ProtectedCompletionCallback.protect( - taskExecutor, - (CompletionCallback) callback + new AsyncContinuationCallback<>( + (CompletionCallback) callback, + taskExecutor ) ); taskExecutor.execute(() -> { @@ -480,136 +478,6 @@ public T runBlockingTimed(final Duration timeout) public static final Task NOOP = Task.pure(null); } -/** - * INTERNAL API. - *

- * INTERNAL API: Internal apis are subject to change or removal - * without any notice. When code depends on internal APIs, it is subject to - * breakage between minor version updates. - */ -@ApiStatus.Internal -final class BlockingCompletionCallback - extends AbstractQueuedSynchronizer implements CompletionCallback { - - private final AtomicBoolean isDone = new AtomicBoolean(false); - @Nullable - private T result = null; - @Nullable - private Throwable error = null; - @Nullable - private InterruptedException interrupted = null; - - @Override - public void onSuccess(final T value) { - if (!isDone.getAndSet(true)) { - result = value; - releaseShared(1); - } - } - - @Override - public void onFailure(final Throwable e) { - UncaughtExceptionHandler.rethrowIfFatal(e); - if (!isDone.getAndSet(true)) { - error = e; - releaseShared(1); - } else { - UncaughtExceptionHandler.logOrRethrow(e); - } - } - - @Override - public void onCancellation() { - if (!isDone.getAndSet(true)) { - interrupted = new InterruptedException("Task was cancelled"); - releaseShared(1); - } - } - - @Override - public void onOutcome(Outcome outcome) { - if (outcome instanceof Outcome.Success success) { - onSuccess(success.value()); - } else if (outcome instanceof Outcome.Failure failure) { - onFailure(failure.exception()); - } else { - onCancellation(); - } - } - - @Override - protected int tryAcquireShared(final int arg) { - return getState() != 0 ? 1 : -1; - } - - @Override - protected boolean tryReleaseShared(final int arg) { - setState(1); - return true; - } - - @FunctionalInterface - interface AwaitFunction { - void apply(boolean isCancelled) throws InterruptedException, TimeoutException; - } - - @SuppressWarnings("NullAway") - private T awaitInline(final Cancellable cancelToken, final AwaitFunction await) - throws InterruptedException, ExecutionException, TimeoutException { - - TaskLocalContext.signalTheStartOfBlockingCall(); - var isCancelled = false; - TimeoutException timedOut = null; - while (true) { - try { - await.apply(isCancelled); - break; - } catch (final TimeoutException | InterruptedException e) { - if (!isCancelled) { - isCancelled = true; - if (e instanceof TimeoutException te) - timedOut = te; - cancelToken.cancel(); - } - } - // Clearing the interrupted flag may not be necessary, - // but doesn't hurt, and we should have a cleared flag before - // re-throwing the exception - // - // noinspection ResultOfMethodCallIgnored - Thread.interrupted(); - } - if (timedOut != null) throw timedOut; - if (interrupted != null) throw interrupted; - if (error != null) throw new ExecutionException(error); - return result; - } - - public T await(final Cancellable cancelToken) throws InterruptedException, ExecutionException { - try { - return awaitInline(cancelToken, isCancelled -> acquireSharedInterruptibly(1)); - } catch (final TimeoutException e) { - throw new IllegalStateException("Unexpected timeout", e); - } - } - - public T await(final Cancellable cancelToken, final Duration timeout) - throws ExecutionException, InterruptedException, TimeoutException { - - return awaitInline(cancelToken, isCancelled -> { - if (!isCancelled) { - if (!tryAcquireSharedNanos(1, timeout.toNanos())) { - throw new TimeoutException("Task timed-out after " + timeout); - } - } else { - // Waiting without a timeout, since at this point it's waiting - // on the cancelled task to finish - acquireSharedInterruptibly(1); - } - }); - } -} - /** * INTERNAL API. *

diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/CompletionCallbackTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/CompletionCallbackTest.java index 122dc2e..e92e534 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/CompletionCallbackTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/CompletionCallbackTest.java @@ -32,7 +32,7 @@ void emptyLogsRuntimeFailure() throws InterruptedException { void protectedCallbackForSuccess() { final var called = new AtomicInteger(0); final var outcomeRef = new AtomicReference<@Nullable Outcome>(null); - final var cb = ProtectedCompletionCallback.protect( + final var cb = AsyncContinuationCallback.protect( TaskExecutor.from(TaskExecutors.trampoline()), new CompletionCallback() { @Override @@ -71,7 +71,7 @@ public void onOutcome(Outcome outcome) { void protectedCallbackForRuntimeFailure() throws InterruptedException { final var called = new AtomicInteger(0); final var outcomeRef = new AtomicReference<@Nullable Outcome>(null); - final var cb = ProtectedCompletionCallback.protect( + final var cb = AsyncContinuationCallback.protect( TaskExecutor.from(TaskExecutors.trampoline()), new CompletionCallback() { @Override @@ -119,7 +119,7 @@ public void onOutcome(Outcome outcome) { void protectedCallbackForCancellation() { final var called = new AtomicInteger(0); final var outcomeRef = new AtomicReference<@Nullable Outcome>(null); - final var cb = ProtectedCompletionCallback.protect( + final var cb = AsyncContinuationCallback.protect( TaskExecutor.from(TaskExecutors.trampoline()), new CompletionCallback() { @Override diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java index 500da6d..6a879d1 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java @@ -18,15 +18,23 @@ public class TaskWithOnCompletionTest { @Test void guaranteeOnSuccess() throws ExecutionException, InterruptedException { for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var latch = new CountDownLatch(2); final var ref1 = new AtomicReference<@Nullable Outcome>(null); final var ref2 = new AtomicReference<@Nullable Outcome>(null); final var outcome1 = Task .fromBlockingIO(() -> "Success") - .withOnComplete(ref1::set) - .withOnComplete(ref2::set) + .withOnComplete(o -> { + ref1.set(o); + latch.countDown(); + }) + .withOnComplete(o -> { + ref2.set(o); + latch.countDown(); + }) .runBlocking(); assertEquals("Success", outcome1); + TimedAwait.latchAndExpectCompletion(latch, "latch"); assertEquals(Outcome.success("Success"), ref1.get()); assertEquals(Outcome.success("Success"), ref2.get()); } From 521333c577b499be83e7cc27d233cb3d852a36d9 Mon Sep 17 00:00:00 2001 From: Alexandru Nedelcu Date: Mon, 11 Aug 2025 09:17:26 +0300 Subject: [PATCH 7/7] Fixes :( --- .../funfix/tasks/jvm/CompletionCallback.java | 22 ++++-- .../tasks/jvm/TaskWithOnCompletionTest.java | 71 ++++++++++++++----- 2 files changed, 70 insertions(+), 23 deletions(-) diff --git a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java index 11746ce..5040be3 100644 --- a/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java +++ b/tasks-jvm/src/main/java/org/funfix/tasks/jvm/CompletionCallback.java @@ -5,7 +5,6 @@ import java.io.Serializable; import java.time.Duration; -import java.util.Arrays; import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -70,18 +69,29 @@ default void onCancellation() { final class ManyCompletionCallback implements CompletionCallback { - private final CompletionCallback[] listeners; + private final ImmutableStack> listeners; @SafeVarargs ManyCompletionCallback(final CompletionCallback... listeners) { + Objects.requireNonNull(listeners, "listeners"); + ImmutableStack> stack = ImmutableStack.empty(); + for (final CompletionCallback listener : listeners) { + Objects.requireNonNull(listener, "listener"); + stack = stack.prepend(listener); + } + this.listeners = stack; + } + + private ManyCompletionCallback( + final ImmutableStack> listeners + ) { Objects.requireNonNull(listeners, "listeners"); this.listeners = listeners; } ManyCompletionCallback withExtraListener(CompletionCallback extraListener) { Objects.requireNonNull(extraListener, "extraListener"); - final var newListeners = Arrays.copyOf(listeners, listeners.length + 1); - newListeners[newListeners.length - 1] = extraListener; + final var newListeners = this.listeners.prepend(extraListener); return new ManyCompletionCallback<>(newListeners); } @@ -174,10 +184,8 @@ public void run() { listenerRef.get().onFailure(this.failureCause); } else if (this.isCancelled) { listenerRef.get().onCancellation(); - } else if (this.successValue != null) { - listenerRef.get().onSuccess(this.successValue); } else { - throw new IllegalStateException("No outcome, success value, failure cause, or cancellation state set"); + listenerRef.get().onSuccess(this.successValue); } // For GC purposes; but it doesn't really matter if we nullify these or not this.outcome = null; diff --git a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java index 6a879d1..7d4fed9 100644 --- a/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java +++ b/tasks-jvm/src/test/java/org/funfix/tasks/jvm/TaskWithOnCompletionTest.java @@ -18,30 +18,22 @@ public class TaskWithOnCompletionTest { @Test void guaranteeOnSuccess() throws ExecutionException, InterruptedException { for (int t = 0; t < CONCURRENCY_REPEATS; t++) { - final var latch = new CountDownLatch(2); final var ref1 = new AtomicReference<@Nullable Outcome>(null); final var ref2 = new AtomicReference<@Nullable Outcome>(null); final var outcome1 = Task .fromBlockingIO(() -> "Success") - .withOnComplete(o -> { - ref1.set(o); - latch.countDown(); - }) - .withOnComplete(o -> { - ref2.set(o); - latch.countDown(); - }) + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) .runBlocking(); assertEquals("Success", outcome1); - TimedAwait.latchAndExpectCompletion(latch, "latch"); assertEquals(Outcome.success("Success"), ref1.get()); assertEquals(Outcome.success("Success"), ref2.get()); } } @Test - void guaranteeOnSuccessViaFiber() throws ExecutionException, InterruptedException, TaskCancellationException, TimeoutException { + void guaranteeOnSuccessWithFibers() throws ExecutionException, InterruptedException, TaskCancellationException { for (int t = 0; t < CONCURRENCY_REPEATS; t++) { final var ref1 = new AtomicReference<@Nullable Outcome>(null); final var ref2 = new AtomicReference<@Nullable Outcome>(null); @@ -51,29 +43,31 @@ void guaranteeOnSuccessViaFiber() throws ExecutionException, InterruptedExceptio .withOnComplete(ref2::set) .runFiber(); - assertEquals("Success", fiber.awaitBlockingTimed(TIMEOUT)); + assertEquals("Success", fiber.awaitBlocking()); assertEquals(Outcome.success("Success"), ref1.get()); assertEquals(Outcome.success("Success"), ref2.get()); } } @Test - void guaranteeOnSuccessWithFibers() throws ExecutionException, InterruptedException, TaskCancellationException { + void guaranteeOnSuccessWithBlockingIO() throws ExecutionException, InterruptedException, + TaskCancellationException { for (int t = 0; t < CONCURRENCY_REPEATS; t++) { final var ref1 = new AtomicReference<@Nullable Outcome>(null); final var ref2 = new AtomicReference<@Nullable Outcome>(null); - final var fiber = Task + final var r = Task .fromBlockingIO(() -> "Success") .withOnComplete(ref1::set) .withOnComplete(ref2::set) - .runFiber(); + .runBlocking(); - assertEquals("Success", fiber.awaitBlocking()); + assertEquals("Success", r); assertEquals(Outcome.success("Success"), ref1.get()); assertEquals(Outcome.success("Success"), ref2.get()); } } + @Test void guaranteeOnFailure() throws InterruptedException { for (int t = 0; t < CONCURRENCY_REPEATS; t++) { @@ -123,6 +117,30 @@ void guaranteeOnFailureWithFibers() throws InterruptedException, TaskCancellatio } } + @Test + void guaranteeOnFailureBlockingIO() throws InterruptedException, TaskCancellationException { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref1 = new AtomicReference<@Nullable Outcome>(null); + final var ref2 = new AtomicReference<@Nullable Outcome>(null); + final var error = new RuntimeException("Failure"); + + try { + Task.fromBlockingIO(() -> { + throw error; + }) + .withOnComplete(ref1::set) + .withOnComplete(ref2::set) + .runBlocking(); + fail("Expected ExecutionException"); + } catch (ExecutionException e) { + assertEquals(error, e.getCause()); + } + + assertEquals(Outcome.failure(error), ref1.get()); + assertEquals(Outcome.failure(error), ref2.get()); + } + } + @Test void guaranteeOnCancellation() throws InterruptedException, ExecutionException, TimeoutException { for (int t = 0; t < CONCURRENCY_REPEATS; t++) { @@ -180,4 +198,25 @@ void callOrdering() throws InterruptedException { } } } + + @Test + void callOrderingViaFibers() throws Exception { + for (int t = 0; t < CONCURRENCY_REPEATS; t++) { + final var ref = new ConcurrentLinkedQueue(); + + final var fiber = Task.fromBlockingIO(() -> 0) + .ensureRunningOnExecutor() + .withOnComplete((ignored) -> ref.add(1)) + .withOnComplete((ignored) -> ref.add(2)) + .runFiber(); + + assertEquals(0, fiber.awaitBlockingTimed(TIMEOUT)); + assertEquals(2, ref.size(), "Expected 2 calls to onCompletion"); + final var arr = ref.toArray(new Integer[0]); + for (int i = 0; i < arr.length; i++) { + assertEquals(i + 1, arr[i]); + } + } + } + }