Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/tasks.base.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mavenPublishing {
licenses {
license {
name = "The Apache License, Version 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0.txt"
url = "https://www.apache.org/licenses/LICENSE-2.0.txt"
}
}

Expand Down
112 changes: 88 additions & 24 deletions tasks-jvm/src/main/java/org/funfix/tasks/jvm/Cancellable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.jetbrains.annotations.ApiStatus;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.Nullable;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -48,7 +49,7 @@ static Cancellable getEmpty() {
*/
@ApiStatus.Internal
final class CancellableUtils {
static Cancellable EMPTY = () -> {};
static final Cancellable EMPTY = () -> {};
}

/**
Expand All @@ -60,8 +61,7 @@ final class CancellableUtils {
* breakage between minor version updates.
*/
@ApiStatus.Internal
@FunctionalInterface
interface CancellableForwardRef {
interface CancellableForwardRef extends Cancellable {
void set(Cancellable cancellable);
}

Expand All @@ -74,41 +74,104 @@ interface CancellableForwardRef {
*/
@ApiStatus.Internal
final class MutableCancellable implements Cancellable {
private final AtomicReference<State> ref =
new AtomicReference<>(new State.Active(Cancellable.getEmpty(), 0));
private final AtomicReference<State> ref;

MutableCancellable(final Cancellable initialRef) {
ref = new AtomicReference<>(new State.Active(initialRef, 0, null));
}

MutableCancellable() {
this(CancellableUtils.EMPTY);
}

@Override
public void cancel() {
final var prev = ref.getAndSet(State.Cancelled.INSTANCE);
if (prev instanceof State.Active active) {
active.token.cancel();
@Nullable
var state = ref.getAndSet(State.Closed.INSTANCE);
while (state instanceof State.Active active) {
try {
active.token.cancel();
} catch (Exception e) {
UncaughtExceptionHandler.logOrRethrow(e);
}
state = active.rest;
}
}

public CancellableForwardRef newCancellableRef() {
final var current = ref.get();
if (current instanceof State.Cancelled) {
return Cancellable::cancel;
if (current instanceof State.Closed) {
return new CancellableForwardRef() {
@Override
public void set(Cancellable cancellable) {
cancellable.cancel();
}
@Override
public void cancel() {}
};
} else if (current instanceof State.Active active) {
return cancellable -> registerOrdered(
active.order,
cancellable,
active
);
return new CancellableForwardRef() {
@Override
public void set(Cancellable cancellable) {
registerOrdered(
active.order,
cancellable,
active
);
}

@Override
public void cancel() {
unregister(active.order);
}
};
} else {
throw new IllegalStateException("Invalid state: " + current);
}
}

public void register(Cancellable token) {
public @Nullable Cancellable register(Cancellable token) {
Objects.requireNonNull(token, "token");
while (true) {
final var current = ref.get();
if (current instanceof State.Active active) {
final var update = new State.Active(token, active.order + 1);
if (ref.compareAndSet(current, update)) { return; }
} else if (current instanceof State.Cancelled) {
final var newOrder = active.order + 1;
final var update = new State.Active(token, newOrder, active);
if (ref.compareAndSet(current, update)) { return () -> unregister(newOrder); }
} else if (current instanceof State.Closed) {
token.cancel();
return null;
} else {
throw new IllegalStateException("Invalid state: " + current);
}
}
}

private void unregister(final long order) {
while (true) {
final var current = ref.get();
if (current instanceof State.Active active) {
@Nullable var cursor = active;
@Nullable State.Active acc = null;
while (cursor != null) {
if (cursor.order != order) {
acc = new State.Active(cursor.token, cursor.order, acc);
}
cursor = cursor.rest;
}
// Reversing
@Nullable State.Active update = null;
while (acc != null) {
update = new State.Active(acc.token, acc.order, update);
acc = acc.rest;
}
if (update == null) {
update = new State.Active(Cancellable.getEmpty(), 0, null);
}
if (ref.compareAndSet(current, update)) {
return;
}
} else if (current instanceof State.Closed) {
return;
} else {
throw new IllegalStateException("Invalid state: " + current);
Expand All @@ -126,11 +189,11 @@ private void registerOrdered(
// Double-check ordering
if (active.order != order) { return; }
// Try to update
final var update = new State.Active(newToken, order + 1);
final var update = new State.Active(newToken, order + 1, null);
if (ref.compareAndSet(current, update)) { return; }
// Retry
current = ref.get();
} else if (current instanceof State.Cancelled) {
} else if (current instanceof State.Closed) {
newToken.cancel();
return;
} else {
Expand All @@ -142,11 +205,12 @@ private void registerOrdered(
sealed interface State {
record Active(
Cancellable token,
long order
long order,
@Nullable Active rest
) implements State {}

record Cancelled() implements State {
static Cancelled INSTANCE = new Cancelled();
record Closed() implements State {
static final Closed INSTANCE = new Closed();
}
}
}
Loading