Skip to content

Commit f35ea33

Browse files
committed
allow event callbacks
1 parent a845690 commit f35ea33

File tree

2 files changed

+97
-107
lines changed

2 files changed

+97
-107
lines changed

src/main/java/org/comroid/api/func/util/Event.java

Lines changed: 88 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import lombok.Data;
66
import lombok.EqualsAndHashCode;
77
import lombok.Getter;
8-
import lombok.RequiredArgsConstructor;
98
import lombok.Setter;
109
import lombok.ToString;
1110
import lombok.Value;
@@ -64,16 +63,35 @@
6463

6564
@Data
6665
@EqualsAndHashCode(of = { "cancelled", "unixNanos", "key", "seq" })
67-
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
6866
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
6967
public class Event<T> implements Wrap<T> {
7068
long unixNanos = System.nanoTime();
7169
long seq;
72-
@Nullable Long flag;
73-
@Nullable String key;
74-
@Nullable T data;
75-
@NonFinal
76-
boolean cancelled = false;
70+
@Nullable Long flag;
71+
@Nullable String key;
72+
@Nullable T data;
73+
@Nullable Runnable callback;
74+
@NonFinal boolean cancelled = false;
75+
76+
public Event(long seq, @Nullable T data) {
77+
this(seq, null, data);
78+
}
79+
80+
public Event(long seq, @Nullable String key, @Nullable T data) {
81+
this(seq, null, key, data);
82+
}
83+
84+
public Event(long seq, @Nullable Long flag, @Nullable String key, @Nullable T data) {
85+
this(seq, flag, key, data, null);
86+
}
87+
88+
public Event(long seq, @Nullable Long flag, @Nullable String key, @Nullable T data, @Nullable Runnable callback) {
89+
this.seq = seq;
90+
this.flag = flag;
91+
this.key = key;
92+
this.data = data;
93+
this.callback = callback;
94+
}
7795

7896
public Instant getTimestamp() {
7997
return Instant.ofEpochMilli(unixNanos / 1000);
@@ -84,7 +102,7 @@ public Instant getTimestamp() {
84102
}
85103

86104
public <R> @Nullable Event<R> withData(@Nullable R data) {
87-
return data == null ? null : new Event<>(seq, flag, key, data).setCancelled(cancelled);
105+
return data == null ? null : new Event<>(seq, flag, key, data, callback).setCancelled(cancelled);
88106
}
89107

90108
public boolean cancel() {
@@ -127,41 +145,31 @@ public static abstract class Factory<T, E extends Event<? super T>> implements N
127145

128146
@Override
129147
public E apply(T data, String key, Long flag) {
130-
return factory(counter(), data, key, flag);
148+
return factory(counter(), flag, data, key);
131149
}
132150

133151
public Long counter() {
134152
var seq = counter.addAndGet(1);
135-
if (seq == Long.MAX_VALUE)
136-
counter.set(0);
153+
if (seq == Long.MAX_VALUE) counter.set(0);
137154
return seq;
138155
}
139156

140-
public abstract E factory(long seq, T data, String key, long flag);
157+
public abstract E factory(long seq, long flag, @Nullable T data, @Nullable String key);
141158
}
142159

143160
@Getter
144161
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
145162
@EqualsAndHashCode(exclude = "bus", callSuper = true)
146163
@ToString(of = "location", includeFieldNames = false)
147164
public static class Listener<T> extends Container.Base implements Predicate<Event<T>>, Consumer<Event<T>>, Comparable<Listener<?>>, Closeable, Named {
148-
@NotNull
149-
static Comparator<Listener<?>> Comparator = java.util.Comparator.<Listener<?>>comparingInt(Listener::getPriority).reversed();
150-
@NotNull Event.Bus<T> bus;
151-
@lombok.experimental.Delegate
152-
Predicate<Event<T>> requirement;
153-
@lombok.experimental.Delegate
154-
Consumer<Event<T>> action;
165+
@NotNull static Comparator<Listener<?>> Comparator = java.util.Comparator.<Listener<?>>comparingInt(Listener::getPriority).reversed();
166+
@NotNull Event.Bus<T> bus;
167+
@lombok.experimental.Delegate Predicate<Event<T>> requirement;
168+
@lombok.experimental.Delegate Consumer<Event<T>> action;
155169
String location;
156-
@NonFinal
157-
@Setter
158-
int priority = 0;
159-
@NonFinal
160-
@Setter
161-
@Nullable
162-
String name;
163-
@NonFinal
164-
boolean active = true;
170+
@NonFinal @Setter int priority = 0;
171+
@NonFinal @Setter @Nullable String name;
172+
@NonFinal boolean active = true;
165173

166174
private Listener(@Nullable String key, @NotNull Bus<T> bus, Predicate<Event<T>> requirement, Consumer<Event<T>> action) {
167175
this.name = key;
@@ -190,29 +198,24 @@ public int compareTo(@NotNull Event.Listener<?> other) {
190198
@EqualsAndHashCode(of = { }, callSuper = true)
191199
@ToString(of = { "name", "upstream", "factory", "active" })
192200
public static class Bus<T> extends Container.Base implements Named, N.Consumer.$3<T, String, Long>, Provider<T>, UUIDContainer {
193-
@Nullable
194-
private Event.Bus<?> upstream;
195-
@NotNull Set<Event.Bus<?>> downstream = new HashSet<>();
196-
@NotNull Queue<Event.Listener<T>> listeners = new ConcurrentLinkedQueue<>();
197-
@Nullable
198-
private Function<@NotNull Event<?>, @Nullable Event<T>> function;
199-
@Nullable
200-
private Function<String, String> keyFunction;
201-
@Setter
202-
private Event.Factory<T, ? extends Event<T>> factory = new Factory<>() {
203-
@Override
204-
public Event<T> factory(long seq, T data, String key, long flag) {
205-
return new Event<>(seq, flag, key, data);
206-
}
207-
};
201+
@Nullable private Event.Bus<?> upstream;
202+
@NotNull Set<Bus<?>> downstream = new HashSet<>();
203+
@NotNull Queue<Listener<T>> listeners = new ConcurrentLinkedQueue<>();
204+
@Nullable private Function<@NotNull Event<?>, @Nullable Event<T>> function;
205+
@Nullable private Function<String, String> keyFunction;
206+
@Setter private Factory<T, ? extends Event<T>> factory = child(Factory.class).<Factory<T, ? extends Event<T>>>castRef()
207+
.orElseGet(() -> new Factory<>() {
208+
@Override
209+
public Event<T> factory(long seq, long flag, T data, String key) {
210+
return new Event<>(seq, flag, key, data, null);
211+
}
212+
});
208213
@Setter
209214
//private Executor executor = Context.wrap(Executor.class).orElseGet(()->Runnable::run);
210-
private Executor executor = Context.wrap(Executor.class)
215+
private Executor executor = Context.wrap(Executor.class)
211216
.orElseGet(() -> Debug.isDebug() ? Runnable::run : Executors.newFixedThreadPool(4));
212-
@Setter
213-
private boolean active = true;
214-
@Setter
215-
private String name = null;
217+
@Setter private boolean active = true;
218+
@Setter private String name = null;
216219

217220
public Bus() {
218221
this("EventBus @ " + caller(1));
@@ -250,12 +253,11 @@ public <I> Event.Bus<O> setJunction(
250253
*/
251254

252255
private <P> Bus(@NotNull String name, @Nullable Event.Bus<P> upstream, @Nullable Function<@NotNull Event<P>, @Nullable Event<T>> function) {
253-
this.name = name;
256+
this.name = name;
254257
this.upstream = upstream;
255-
this.function = Polyfill.uncheckedCast(function);
258+
this.function = uncheckedCast(function);
256259

257-
if (upstream != null)
258-
upstream.downstream.add(this);
260+
if (upstream != null) upstream.downstream.add(this);
259261
register(this);
260262
}
261263

@@ -268,7 +270,7 @@ private <P> Bus(@Nullable Event.Bus<P> upstream, @Nullable Function<@NotNull Eve
268270
}
269271

270272
@Contract(value = "_ -> this", mutates = "this")
271-
public Event.Bus<T> setUpstream(@NotNull Event.Bus<? extends T> parent) {
273+
public Bus<T> setUpstream(@NotNull Event.Bus<? extends T> parent) {
272274
return setUpstream(parent, Function.identity());
273275
}
274276

@@ -288,21 +290,18 @@ public Event.Bus<T> setUpstream(@NotNull Event.Bus<? extends T> parent) {
288290
var attribute = it.getAnnotation(Subscriber.class);
289291

290292
Invocable<?> delegate;
291-
if (it instanceof Method method)
292-
delegate = Invocable.ofMethodCall(target, method);
293+
if (it instanceof Method method) delegate = Invocable.ofMethodCall(target, method);
293294
else if (it instanceof Field field) {
294295
Collection<DataNode> dest = ReflectionHelper.forceGetField(target, field);
295296
delegate = Invocable.ofConsumer(attribute.type(), dest::add);
296297
} else throw new AssertionError();
297298

298-
var key = Optional.of(attribute.value())
299-
.filter(Predicate.not(Subscriber.EmptyName::equals))
300-
.orElseGet(it::getName);
299+
var key = Optional.of(attribute.value()).filter(Predicate.not(Subscriber.EmptyName::equals)).orElseGet(it::getName);
301300
return Stream.of(new SubscriberImpl(key, attribute.flag(), attribute.mode(), delegate));
302-
}).toList();
301+
})
302+
.toList();
303303

304-
if (subscribers.isEmpty())
305-
return null;
304+
if (subscribers.isEmpty()) return null;
306305
var listener = new SubscriberListener(target, subscribers);
307306
synchronized (listeners) {
308307
listeners.add(listener);
@@ -311,32 +310,25 @@ else if (it instanceof Field field) {
311310
}
312311

313312
@Contract(value = "_, _ -> this", mutates = "this")
314-
public <I> Bus<T> setUpstream(
315-
@NotNull Event.Bus<? extends I> parent,
316-
@NotNull Function<@NotNull Event<I>, @Nullable Event<T>> function
317-
) {
313+
public <I> Bus<T> setUpstream(@NotNull Event.Bus<? extends I> parent, @NotNull Function<@NotNull Event<I>, @Nullable Event<T>> function) {
318314
return setUpstream(parent, function, UnaryOperator.identity());
319315
}
320316

321317
@Contract(value = "_, _, _ -> this", mutates = "this")
322-
public <I> Event.Bus<T> setUpstream(
323-
@NotNull Event.Bus<? extends I> parent,
324-
@NotNull Function<@NotNull Event<I>, @Nullable Event<T>> function,
318+
public <I> Bus<T> setUpstream(
319+
@NotNull Event.Bus<? extends I> parent, @NotNull Function<@NotNull Event<I>, @Nullable Event<T>> function,
325320
@Nullable Function<String, String> keyFunction
326321
) {
327322
return setDependent(parent, function, keyFunction, true);
328323
}
329324

330325
private <I> Bus<T> setDependent(
331-
final @NotNull Bus<? extends I> parent,
332-
final @NotNull Function<@NotNull Event<I>, @Nullable Event<T>> function,
333-
final @Nullable Function<String, String> keyFunction,
334-
final boolean cleanup
326+
final @NotNull Bus<? extends I> parent, final @NotNull Function<@NotNull Event<I>, @Nullable Event<T>> function,
327+
final @Nullable Function<String, String> keyFunction, final boolean cleanup
335328
) {
336-
if (cleanup && this.upstream != null)
337-
this.upstream.downstream.remove(this);
329+
if (cleanup && this.upstream != null) this.upstream.downstream.remove(this);
338330
this.upstream = parent;
339-
this.function = Polyfill.uncheckedCast(function);
331+
this.function = uncheckedCast(function);
340332
this.keyFunction = keyFunction;
341333
this.upstream.downstream.add(this);
342334
return this;
@@ -354,41 +346,41 @@ public Listener<T> subscribe(final @NotNull Consumer<Event<T>> action) {
354346
return listen().subscribe(action);
355347
}
356348

357-
public Event.Bus.Filter<T> listen() {
349+
public Filter<T> listen() {
358350
return new Filter<>(this);
359351
}
360352

361-
public Event.Bus<T> peekData(final Consumer<@NotNull T> action) {
353+
public Bus<T> peekData(final Consumer<@NotNull T> action) {
362354
return filterData(it -> {
363355
action.accept(it);
364356
return true;
365357
});
366358
}
367359

368-
public Event.Bus<T> filterData(final Predicate<@NotNull T> predicate) {
360+
public Bus<T> filterData(final Predicate<@NotNull T> predicate) {
369361
return mapData(x -> predicate.test(x) ? x : null);
370362
}
371363

372-
public <R> Event.Bus<R> mapData(final @NotNull Function<T, @Nullable R> function) {
364+
public <R> Bus<R> mapData(final @NotNull Function<T, @Nullable R> function) {
373365
return map(e -> e.withDataBy(function));
374366
}
375367

376-
public <R> Event.Bus<R> map(final @NotNull Function<@NotNull Event<T>, @Nullable Event<R>> function) {
377-
return new Event.Bus<>(this, function);
368+
public <R> Bus<R> map(final @NotNull Function<@NotNull Event<T>, @Nullable Event<R>> function) {
369+
return new Bus<>(this, function);
378370
}
379371

380-
public <R extends T> Event.Bus<R> flatMap(final Class<R> type) {
372+
public <R extends T> Bus<R> flatMap(final Class<R> type) {
381373
return filterData(type::isInstance).mapData(type::cast);
382374
}
383375

384-
public Event.Bus<T> peek(final Consumer<Event<@NotNull T>> action) {
376+
public Bus<T> peek(final Consumer<Event<@NotNull T>> action) {
385377
return filter(it -> {
386378
action.accept(it);
387379
return true;
388380
});
389381
}
390382

391-
public Event.Bus<T> filter(final Predicate<Event<@NotNull T>> predicate) {
383+
public Bus<T> filter(final Predicate<Event<@NotNull T>> predicate) {
392384
return map(x -> predicate.test(x) ? x : null);
393385
}
394386

@@ -415,8 +407,7 @@ public Event<T> publish(@Nullable String key, @Nullable T data) {
415407

416408
@Builder(builderClassName = "Publisher", buildMethodName = "publish", builderMethodName = "publisher")
417409
public Event<T> publish(@Nullable String key, @Nullable Long flag_, @Nullable T data) {
418-
if (!active)
419-
return null;
410+
if (!active) return null;
420411
final var flag = flag_ == null ? Subscriber.DefaultFlag : flag_;
421412
final var event = factory.apply(data, key, flag);
422413
accept(event);
@@ -456,19 +447,16 @@ private void publish(Event<T> event) {
456447
synchronized (listeners) {
457448
//Collections.sort(listeners, Listener.Comparator);
458449
listeners.stream().sorted(Listener.Comparator).forEach(listener -> {
459-
if (listener.isActive() && !event.isCancelled() && listener.test(event))
460-
listener.accept(event);
450+
if (listener.isActive() && !event.isCancelled() && listener.test(event)) listener.accept(event);
461451
});
462452
}
463453
}
464454

465455
private <P> void $publishDownstream(final Event<P> data) {
466-
if (function == null)
467-
return;
456+
if (function == null) return;
468457
Function<@NotNull Event<P>, @Nullable Event<T>> func = uncheckedCast(function);
469458
var it = func.apply(data);
470-
if (it == null)
471-
return;
459+
if (it == null) return;
472460
accept(it);
473461
}
474462

@@ -481,9 +469,8 @@ private class SubscriberImpl implements Predicate<Event<T>>, BiConsumer<@Nullabl
481469

482470
@Override
483471
public boolean test(Event<T> event) {
484-
return Wrap.of(event.flag).or(() -> 0xffff_ffff_ffff_ffffL).testIfPresent(this::testFlag)
485-
&& (Objects.equals(key, event.key) || ("null".equals(key)
486-
&& (Objects.isNull(event.key) || Subscriber.EmptyName.equals(event.key))));
472+
return Wrap.of(event.flag).or(() -> 0xffff_ffff_ffff_ffffL).testIfPresent(this::testFlag) && (Objects.equals(key, event.key) || ("null".equals(
473+
key) && (Objects.isNull(event.key) || Subscriber.EmptyName.equals(event.key))));
487474
}
488475

489476
public boolean testFlag(long x) {
@@ -511,10 +498,7 @@ private class SubscriberListener extends Listener<T> {
511498
@Nullable Object target;
512499
Collection<SubscriberImpl> subscribers;
513500

514-
public SubscriberListener(
515-
@Nullable Object target,
516-
Collection<SubscriberImpl> subscribers
517-
) {
501+
public SubscriberListener(@Nullable Object target, Collection<SubscriberImpl> subscribers) {
518502
super(null, Bus.this, $ -> true, $ -> {});
519503
this.target = target;
520504
this.subscribers = subscribers;
@@ -523,8 +507,7 @@ public SubscriberListener(
523507
@Override
524508
public void accept(Event<T> event) {
525509
for (var subscriber : subscribers)
526-
if (subscriber.test(event))
527-
subscriber.accept(target, event);
510+
if (subscriber.test(event)) subscriber.accept(target, event);
528511
}
529512
}
530513

@@ -542,16 +525,16 @@ public Listener<T> subscribeData(final @NotNull Consumer<T> action) {
542525
}
543526

544527
public Listener<T> subscribe(final @NotNull Consumer<Event<T>> action) {
545-
var listener = new Event.Listener<>(key, bus, filters(), action);
528+
var listener = new Listener<>(key, bus, filters(), action);
546529
synchronized (bus.listeners) {
547530
bus.listeners.add(listener);
548531
}
549532
return listener;
550533
}
551534

552535
private Predicate<Event<T>> filters() {
553-
return ((Predicate<Event<T>>) (e -> key == null || Subscriber.EmptyName.equals(key) || Objects.equals(e.key, key)))
554-
.and(e -> flag == null || flag == Subscriber.DefaultFlag || Objects.equals(e.flag, flag))
536+
return ((Predicate<Event<T>>) (e -> key == null || Subscriber.EmptyName.equals(key) || Objects.equals(e.key,
537+
key))).and(e -> flag == null || flag == Subscriber.DefaultFlag || Objects.equals(e.flag, flag))
555538
.and(Objects.requireNonNullElse(predicate, $ -> true))
556539
.and(e -> type == null || e.testIfPresent(type::isInstance));
557540
}
@@ -560,8 +543,7 @@ public CompletableFuture<Event<T>> once() {
560543
final var future = new CompletableFuture<Event<T>>();
561544
final var filters = filters();
562545
final var listener = subscribe(e -> {
563-
if (future.isDone() || !filters.test(e))
564-
return;
546+
if (future.isDone() || !filters.test(e)) return;
565547
future.complete(e);
566548
});
567549
future.whenComplete((e, t) -> listener.close());

0 commit comments

Comments
 (0)