From 819ec3d7b300210d1566113dc19e06fcb774f42e Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Mon, 21 Jul 2025 17:40:18 +0530 Subject: [PATCH 1/6] [ECO-5458] Added public interfaces for map and counter subscriptions --- .../java/io/ably/lib/objects/LiveObjects.java | 2 + .../{ => type/counter}/LiveCounter.java | 2 +- .../type/counter/LiveCounterChange.java | 21 ++++++++++ .../type/counter/LiveCounterUpdate.java | 20 ++++++++++ .../lib/objects/{ => type/map}/LiveMap.java | 2 +- .../lib/objects/type/map/LiveMapChange.java | 21 ++++++++++ .../lib/objects/type/map/LiveMapUpdate.java | 39 +++++++++++++++++++ .../io/ably/lib/objects/DefaultLiveObjects.kt | 4 +- .../io/ably/lib/objects/ObjectsManager.kt | 3 +- .../type/livecounter/DefaultLiveCounter.kt | 1 + .../type/livecounter/LiveCounterManager.kt | 2 +- .../objects/type/livemap/DefaultLiveMap.kt | 1 + .../objects/type/livemap/LiveMapManager.kt | 2 +- .../integration/DefaultLiveCounterTest.kt | 4 +- .../objects/integration/DefaultLiveMapTest.kt | 2 + .../integration/DefaultLiveObjectsTest.kt | 2 + .../lib/objects/integration/helpers/Utils.kt | 4 +- 17 files changed, 122 insertions(+), 10 deletions(-) rename lib/src/main/java/io/ably/lib/objects/{ => type/counter}/LiveCounter.java (98%) create mode 100644 lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java create mode 100644 lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java rename lib/src/main/java/io/ably/lib/objects/{ => type/map}/LiveMap.java (99%) create mode 100644 lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java create mode 100644 lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjects.java b/lib/src/main/java/io/ably/lib/objects/LiveObjects.java index fff0344ca..bd1809f1d 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjects.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjects.java @@ -1,6 +1,8 @@ package io.ably.lib.objects; import io.ably.lib.objects.state.ObjectsStateChange; +import io.ably.lib.objects.type.counter.LiveCounter; +import io.ably.lib.objects.type.map.LiveMap; import io.ably.lib.types.Callback; import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NonBlocking; diff --git a/lib/src/main/java/io/ably/lib/objects/LiveCounter.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java similarity index 98% rename from lib/src/main/java/io/ably/lib/objects/LiveCounter.java rename to lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java index fd44b853c..e23f750e8 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveCounter.java +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java @@ -1,4 +1,4 @@ -package io.ably.lib.objects; +package io.ably.lib.objects.type.counter; import io.ably.lib.types.Callback; import org.jetbrains.annotations.Blocking; diff --git a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java new file mode 100644 index 000000000..654423f5e --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java @@ -0,0 +1,21 @@ +package io.ably.lib.objects.type.counter; + +import io.ably.lib.objects.ObjectsSubscription; +import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.NotNull; + +public interface LiveCounterChange { + + @NonBlocking + @NotNull ObjectsSubscription subscribe(@NotNull Listener listener); + + @NonBlocking + void unsubscribe(@NotNull Listener listener); + + @NonBlocking + void unsubscribeAll(); + + interface Listener { + void onUpdated(@NotNull LiveCounterUpdate update); + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java new file mode 100644 index 000000000..323679e7e --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java @@ -0,0 +1,20 @@ +package io.ably.lib.objects.type.counter; + +import org.jetbrains.annotations.NotNull; + +/** + * Spec: RTLC11, RTLC11a + */ +public class LiveCounterUpdate { + @NotNull + private final Long amount; // RTLC11b, RTLC11b1 + + public LiveCounterUpdate(@NotNull Long amount) { + this.amount = amount; + } + + @NotNull + public Long getUpdate() { + return amount; + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/LiveMap.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java similarity index 99% rename from lib/src/main/java/io/ably/lib/objects/LiveMap.java rename to lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java index cc297a401..28408b4e7 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveMap.java +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java @@ -1,4 +1,4 @@ -package io.ably.lib.objects; +package io.ably.lib.objects.type.map; import io.ably.lib.types.Callback; import org.jetbrains.annotations.Blocking; diff --git a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java new file mode 100644 index 000000000..ff787a16f --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java @@ -0,0 +1,21 @@ +package io.ably.lib.objects.type.map; + +import io.ably.lib.objects.ObjectsSubscription; +import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.NotNull; + +public interface LiveMapChange { + + @NonBlocking + @NotNull ObjectsSubscription subscribe(@NotNull Listener listener); + + @NonBlocking + void unsubscribe(@NotNull Listener listener); + + @NonBlocking + void unsubscribeAll(); + + interface Listener { + void onUpdated(@NotNull LiveMapUpdate update); + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java new file mode 100644 index 000000000..625310aab --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java @@ -0,0 +1,39 @@ +package io.ably.lib.objects.type.map; + +import org.jetbrains.annotations.NotNull; + +import java.util.Map; + +/** + * Spec: RTLM18, RTLM18a + */ +public class LiveMapUpdate { + + @NotNull + private final Map update; + + /** + * Constructor for LiveMapUpdate + * @param update The map of updates + */ + public LiveMapUpdate(@NotNull Map update) { + this.update = update; + } + + /** + * Get the map of updates + * @return The update map + */ + @NotNull + public Map getUpdate() { + return update; + } + + /** + * Spec: RTLM18b + */ + public enum Change { + UPDATED, + REMOVED + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt index f8e3d2ad0..e420166c0 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt @@ -2,6 +2,8 @@ package io.ably.lib.objects import io.ably.lib.objects.state.ObjectsStateChange import io.ably.lib.objects.state.ObjectsStateEvent +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap import io.ably.lib.realtime.ChannelState import io.ably.lib.types.Callback import io.ably.lib.types.ProtocolMessage @@ -167,7 +169,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val objectsManager.clearBufferedObjectOperations() // RTO4b5 // defer the state change event until the next tick if we started a new sequence just now due to being in initialized state. // this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop. - objectsManager.endSync(fromInitializedState) // RTO4b4 + objectsManager.endSync(fromInitializedState) // RTO4b4, RTO4b2a } } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt index a85bf2368..dd43bf530 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt @@ -125,6 +125,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje } val receivedObjectIds = mutableSetOf() + // RTO5c1a2 - List to collect updates for existing objects val existingObjectUpdates = mutableListOf>() // RTO5c1 @@ -148,7 +149,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje // RTO5c2 - need to remove LiveObject instances from the ObjectsPool for which objectIds were not received during the sync sequence liveObjects.objectsPool.deleteExtraObjectIds(receivedObjectIds) - // call subscription callbacks for all updated existing objects + // RTO5c7 - call subscription callbacks for all updated existing objects existingObjectUpdates.forEach { (obj, update) -> obj.notifyUpdated(update) } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index c40fda469..5477b95ba 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -5,6 +5,7 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.type.BaseLiveObject import io.ably.lib.objects.type.ObjectType +import io.ably.lib.objects.type.counter.LiveCounter import io.ably.lib.types.Callback import java.util.concurrent.atomic.AtomicLong diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt index ba77f53cf..ef7af3374 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt @@ -51,7 +51,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3 } - liveCounter.notifyUpdated(update) + liveCounter.notifyUpdated(update) // RTLC7d1a, RTLC7d2a } /** diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index c900a2e39..6343953d8 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -7,6 +7,7 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.type.BaseLiveObject import io.ably.lib.objects.type.ObjectType +import io.ably.lib.objects.type.map.LiveMap import io.ably.lib.types.Callback import java.util.AbstractMap import java.util.concurrent.ConcurrentHashMap diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt index 6a738d08b..75511b999 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt @@ -68,7 +68,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4 } - liveMap.notifyUpdated(update) + liveMap.notifyUpdated(update) // RTLM15d1a, RTLM15d2a, RTLM15d3a } /** diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt index 632ac9683..58bdbd410 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt @@ -1,7 +1,7 @@ package io.ably.lib.objects.integration -import io.ably.lib.objects.LiveCounter -import io.ably.lib.objects.LiveMap +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap import io.ably.lib.objects.assertWaiter import io.ably.lib.objects.integration.helpers.fixtures.createUserMapWithCountersObject import io.ably.lib.objects.integration.setup.IntegrationTest diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt index fabfa3f12..6940ed435 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt @@ -5,6 +5,8 @@ import io.ably.lib.objects.ObjectData import io.ably.lib.objects.ObjectValue import io.ably.lib.objects.integration.helpers.fixtures.createUserMapObject import io.ably.lib.objects.integration.setup.IntegrationTest +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt index 7c96a75f6..75cdc2aef 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt @@ -9,6 +9,8 @@ import io.ably.lib.objects.integration.helpers.fixtures.initializeRootMap import io.ably.lib.objects.integration.setup.IntegrationTest import io.ably.lib.objects.size import io.ably.lib.objects.state.ObjectsStateEvent +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt index e402347e7..f50c60cf9 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt @@ -1,8 +1,8 @@ package io.ably.lib.objects.integration.helpers import io.ably.lib.objects.DefaultLiveObjects -import io.ably.lib.objects.LiveCounter -import io.ably.lib.objects.LiveMap +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap import io.ably.lib.objects.LiveObjects import io.ably.lib.objects.type.livecounter.DefaultLiveCounter import io.ably.lib.objects.type.livemap.DefaultLiveMap From abdf4452fb4473049ab6275b893b02a26306239f Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 22 Jul 2025 17:32:36 +0530 Subject: [PATCH 2/6] [ECO-5458] Updated interfaces for mapUpdate and counterUpdate to inherit LiveObjectUpdate 1. Updated mapManager to return LiveMapUpdate instead of Map 2. Updated counterManager to return LiveCounterUpdate instead of map 3. Updated BaseLiveObject accordingly --- .../lib/objects/type/LiveObjectUpdate.java | 9 +++ .../lib/objects/type/counter/LiveCounter.java | 2 +- .../type/counter/LiveCounterUpdate.java | 31 ++++++++-- .../io/ably/lib/objects/type/map/LiveMap.java | 2 +- .../lib/objects/type/map/LiveMapUpdate.java | 12 ++-- .../io/ably/lib/objects/ObjectsManager.kt | 3 +- .../ably/lib/objects/type/BaseLiveObject.kt | 28 ++++++---- .../type/livecounter/DefaultLiveCounter.kt | 27 ++++++++- .../LiveCounterChangeCoordinator.kt | 44 +++++++++++++++ .../type/livecounter/LiveCounterManager.kt | 20 ++++--- .../objects/type/livemap/DefaultLiveMap.kt | 25 ++++++++- .../type/livemap/LiveMapChangeCoordinator.kt | 44 +++++++++++++++ .../objects/type/livemap/LiveMapManager.kt | 56 +++++++++++-------- .../integration/setup/IntegrationTest.kt | 2 +- .../unit/type/livemap/LiveMapManagerTest.kt | 29 +++++----- 15 files changed, 257 insertions(+), 77 deletions(-) create mode 100644 lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java create mode 100644 live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt create mode 100644 live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt diff --git a/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java new file mode 100644 index 000000000..7971a6a86 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java @@ -0,0 +1,9 @@ +package io.ably.lib.objects.type; + +public abstract class LiveObjectUpdate { + protected final Object update; + + protected LiveObjectUpdate(Object update) { + this.update = update; + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java index e23f750e8..38ee856d7 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java @@ -11,7 +11,7 @@ * It allows incrementing, decrementing, and retrieving the current value of the counter, * both synchronously and asynchronously. */ -public interface LiveCounter { +public interface LiveCounter extends LiveCounterChange { /** * Increments the value of the counter by 1. diff --git a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java index 323679e7e..1145d7fef 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java @@ -1,20 +1,39 @@ package io.ably.lib.objects.type.counter; +import io.ably.lib.objects.type.LiveObjectUpdate; import org.jetbrains.annotations.NotNull; /** * Spec: RTLC11, RTLC11a */ -public class LiveCounterUpdate { - @NotNull - private final Long amount; // RTLC11b, RTLC11b1 +public class LiveCounterUpdate extends LiveObjectUpdate { + + public LiveCounterUpdate() { + super(null); + } public LiveCounterUpdate(@NotNull Long amount) { - this.amount = amount; + super(new Update(amount)); } @NotNull - public Long getUpdate() { - return amount; + public LiveCounterUpdate.Update getUpdate() { + return (Update) update; + } + + /** + * Spec: RTLC11b, RTLC11b1 + */ + public static class Update { + @NotNull + private final Long amount; + + public Update(@NotNull Long amount) { + this.amount = amount; + } + + public @NotNull Long getAmount() { + return amount; + } } } diff --git a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java index 28408b4e7..4bbb49008 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java @@ -14,7 +14,7 @@ * The LiveMap interface provides methods to interact with a live, real-time map structure. * It supports both synchronous and asynchronous operations for managing key-value pairs. */ -public interface LiveMap { +public interface LiveMap extends LiveMapChange { /** * Retrieves the value associated with the specified key. diff --git a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java index 625310aab..5c0f24164 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java @@ -1,5 +1,6 @@ package io.ably.lib.objects.type.map; +import io.ably.lib.objects.type.LiveObjectUpdate; import org.jetbrains.annotations.NotNull; import java.util.Map; @@ -7,17 +8,18 @@ /** * Spec: RTLM18, RTLM18a */ -public class LiveMapUpdate { +public class LiveMapUpdate extends LiveObjectUpdate { - @NotNull - private final Map update; + public LiveMapUpdate() { + super(null); + } /** * Constructor for LiveMapUpdate * @param update The map of updates */ public LiveMapUpdate(@NotNull Map update) { - this.update = update; + super(update); } /** @@ -26,7 +28,7 @@ public LiveMapUpdate(@NotNull Map update) { */ @NotNull public Map getUpdate() { - return update; + return (Map) update; } /** diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt index dd43bf530..9f8fecb08 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt @@ -1,6 +1,7 @@ package io.ably.lib.objects import io.ably.lib.objects.type.BaseLiveObject +import io.ably.lib.objects.type.LiveObjectUpdate import io.ably.lib.objects.type.livecounter.DefaultLiveCounter import io.ably.lib.objects.type.livemap.DefaultLiveMap import io.ably.lib.util.Log @@ -126,7 +127,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje val receivedObjectIds = mutableSetOf() // RTO5c1a2 - List to collect updates for existing objects - val existingObjectUpdates = mutableListOf>() + val existingObjectUpdates = mutableListOf>() // RTO5c1 for ((objectId, objectState) in syncObjectsDataPool) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt index 06231f618..b061b40bd 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt @@ -6,6 +6,8 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.objectError +import io.ably.lib.objects.type.livecounter.noOpCounterUpdate +import io.ably.lib.objects.type.livemap.noOpMapUpdate import io.ably.lib.util.Log internal enum class ObjectType(val value: String) { @@ -13,6 +15,8 @@ internal enum class ObjectType(val value: String) { Counter("counter") } +internal val LiveObjectUpdate.noOp get() = this.update == null + /** * Base implementation of LiveObject interface. * Provides common functionality for all live objects. @@ -43,7 +47,7 @@ internal abstract class BaseLiveObject( * * @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter */ - internal fun applyObjectSync(objectState: ObjectState): Map { + internal fun applyObjectSync(objectState: ObjectState): LiveObjectUpdate { if (objectState.objectId != objectId) { throw objectError("Invalid object state: object state objectId=${objectState.objectId}; $objectType objectId=$objectId") } @@ -61,7 +65,10 @@ internal abstract class BaseLiveObject( if (isTombstoned) { // this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing - return mapOf() + if (objectType == ObjectType.Map) { + return noOpMapUpdate + } + return noOpCounterUpdate } return applyObjectState(objectState) // RTLM6, RTLC6 } @@ -102,11 +109,6 @@ internal abstract class BaseLiveObject( applyObjectOperation(objectOperation, objectMessage) // RTLC7d } - internal fun notifyUpdated(update: Any) { - // TODO: Implement event emission for updates - Log.v(tag, "Object $objectId updated: $update") - } - /** * Checks if an operation can be applied based on serial comparison. * @@ -126,7 +128,7 @@ internal abstract class BaseLiveObject( /** * Marks the object as tombstoned. */ - internal fun tombstone(): Any { + internal fun tombstone(): LiveObjectUpdate { isTombstoned = true tombstonedAt = System.currentTimeMillis() val update = clearData() @@ -151,7 +153,7 @@ internal abstract class BaseLiveObject( * @return A map describing the changes made to the object's data * */ - abstract fun applyObjectState(objectState: ObjectState): Map + abstract fun applyObjectState(objectState: ObjectState): LiveObjectUpdate /** * Applies an operation to this live object. @@ -177,7 +179,13 @@ internal abstract class BaseLiveObject( * * @return A map representing the diff of changes made */ - abstract fun clearData(): Map + abstract fun clearData(): LiveObjectUpdate + + /** + * Notifies subscribers about changes made to this live object. Propagates updates through the + * appropriate manager after converting the generic update map to type-specific update objects. + */ + abstract fun notifyUpdated(update: LiveObjectUpdate) /** * Called during garbage collection intervals to clean up expired entries. diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index 5477b95ba..dbc450195 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -4,9 +4,14 @@ import io.ably.lib.objects.* import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.type.BaseLiveObject +import io.ably.lib.objects.type.LiveObjectUpdate import io.ably.lib.objects.type.ObjectType import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.counter.LiveCounterChange +import io.ably.lib.objects.type.counter.LiveCounterUpdate +import io.ably.lib.objects.type.noOp import io.ably.lib.types.Callback +import io.ably.lib.util.Log import java.util.concurrent.atomic.AtomicLong /** @@ -50,12 +55,20 @@ internal class DefaultLiveCounter private constructor( TODO("Not yet implemented") } + override fun subscribe(listener: LiveCounterChange.Listener): ObjectsSubscription { + return liveCounterManager.subscribe(listener) + } + + override fun unsubscribe(listener: LiveCounterChange.Listener) = liveCounterManager.unsubscribe(listener) + + override fun unsubscribeAll() = liveCounterManager.unsubscribeAll() + override fun value(): Long { adapter.throwIfInvalidAccessApiConfiguration(channelName) return data.get() } - override fun applyObjectState(objectState: ObjectState): Map { + override fun applyObjectState(objectState: ObjectState): LiveCounterUpdate { return liveCounterManager.applyState(objectState) } @@ -63,8 +76,16 @@ internal class DefaultLiveCounter private constructor( liveCounterManager.applyOperation(operation) } - override fun clearData(): Map { - return mapOf("amount" to data.get()).apply { data.set(0) } + override fun clearData(): LiveCounterUpdate { + return LiveCounterUpdate(data.get()).apply { data.set(0) } + } + + override fun notifyUpdated(update: LiveObjectUpdate) { + if (update.noOp) { + return + } + Log.v(tag, "Object $objectId updated: $update") + liveCounterManager.notify(update as LiveCounterUpdate) } override fun onGCInterval() { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt new file mode 100644 index 000000000..49261f239 --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt @@ -0,0 +1,44 @@ +package io.ably.lib.objects.type.livecounter + +import io.ably.lib.objects.ObjectsSubscription +import io.ably.lib.objects.type.counter.LiveCounterChange +import io.ably.lib.objects.type.counter.LiveCounterUpdate +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log + +internal val noOpCounterUpdate = LiveCounterUpdate() + +internal interface HandlesLiveCounterChange { + fun notify(update: LiveCounterUpdate) +} + +internal abstract class LiveCounterChangeCoordinator: LiveCounterChange, HandlesLiveCounterChange { + private val tag = "DefaultLiveCounterChangeCoordinator" + + private val counterChangeEmitter = LiveCounterChangeEmitter() + + override fun subscribe(listener: LiveCounterChange.Listener): ObjectsSubscription { + counterChangeEmitter.on(listener) + return ObjectsSubscription { + counterChangeEmitter.off(listener) + } + } + + override fun unsubscribe(listener: LiveCounterChange.Listener) = counterChangeEmitter.off(listener) + + override fun unsubscribeAll() = counterChangeEmitter.off() + + override fun notify(update: LiveCounterUpdate) = counterChangeEmitter.emit(update) +} + +private class LiveCounterChangeEmitter : EventEmitter() { + private val tag = "LiveCounterChangeEmitter" + + override fun apply(listener: LiveCounterChange.Listener?, event: LiveCounterUpdate?, vararg args: Any?) { + try { + listener?.onUpdated(event!!) + } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing listener callback for event: $event", t) + } + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt index ef7af3374..f3aa88b97 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt @@ -5,9 +5,11 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ObjectState import io.ably.lib.objects.objectError +import io.ably.lib.objects.type.counter.LiveCounterUpdate import io.ably.lib.util.Log -internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { +internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): LiveCounterChangeCoordinator() { + private val objectId = liveCounter.objectId private val tag = "LiveCounterManager" @@ -15,7 +17,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { /** * @spec RTLC6 - Overrides counter data with state from sync */ - internal fun applyState(objectState: ObjectState): Map { + internal fun applyState(objectState: ObjectState): LiveCounterUpdate { val previousData = liveCounter.data.get() if (objectState.tombstone) { @@ -31,7 +33,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { } } - return mapOf("amount" to (liveCounter.data.get() - previousData)) + return LiveCounterUpdate(liveCounter.data.get() - previousData) } /** @@ -57,7 +59,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { /** * @spec RTLC8 - Applies counter create operation */ - private fun applyCounterCreate(operation: ObjectOperation): Map { + private fun applyCounterCreate(operation: ObjectOperation): LiveCounterUpdate { if (liveCounter.createOperationIsMerged) { // RTLC8b // There can't be two different create operation for the same object id, because the object id @@ -67,7 +69,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { tag, "Skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=$objectId" ) - return mapOf() + return noOpCounterUpdate // RTLC8c } return mergeInitialDataFromCreateOperation(operation) // RTLC8c @@ -76,17 +78,17 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { /** * @spec RTLC9 - Applies counter increment operation */ - private fun applyCounterInc(counterOp: ObjectCounterOp): Map { + private fun applyCounterInc(counterOp: ObjectCounterOp): LiveCounterUpdate { val amount = counterOp.amount?.toLong() ?: 0 val previousValue = liveCounter.data.get() liveCounter.data.set(previousValue + amount) // RTLC9b - return mapOf("amount" to amount) + return LiveCounterUpdate(amount) } /** * @spec RTLC10 - Merges initial data from create operation */ - private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): Map { + private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): LiveCounterUpdate { // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. // note that it is intentional to SUM the incoming count from the create op. // if we got here, it means that current counter instance is missing the initial value in its data reference, @@ -95,6 +97,6 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { val previousValue = liveCounter.data.get() liveCounter.data.set(previousValue + count) // RTLC10a liveCounter.createOperationIsMerged = true // RTLC10b - return mapOf("amount" to count) + return LiveCounterUpdate(count) } } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index 6343953d8..9b4d1f1f2 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -6,9 +6,14 @@ import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.type.BaseLiveObject +import io.ably.lib.objects.type.LiveObjectUpdate import io.ably.lib.objects.type.ObjectType import io.ably.lib.objects.type.map.LiveMap +import io.ably.lib.objects.type.map.LiveMapChange +import io.ably.lib.objects.type.map.LiveMapUpdate +import io.ably.lib.objects.type.noOp import io.ably.lib.types.Callback +import io.ably.lib.util.Log import java.util.AbstractMap import java.util.concurrent.ConcurrentHashMap @@ -102,7 +107,15 @@ internal class DefaultLiveMap private constructor( TODO("Not yet implemented") } - override fun applyObjectState(objectState: ObjectState): Map { + override fun subscribe(listener: LiveMapChange.Listener): ObjectsSubscription { + return liveMapManager.subscribe(listener) + } + + override fun unsubscribe(listener: LiveMapChange.Listener) = liveMapManager.unsubscribe(listener) + + override fun unsubscribeAll() = liveMapManager.unsubscribeAll() + + override fun applyObjectState(objectState: ObjectState): LiveMapUpdate { return liveMapManager.applyState(objectState) } @@ -110,11 +123,19 @@ internal class DefaultLiveMap private constructor( liveMapManager.applyOperation(operation, message.serial) } - override fun clearData(): Map { + override fun clearData(): LiveMapUpdate { return liveMapManager.calculateUpdateFromDataDiff(data.toMap(), emptyMap()) .apply { data.clear() } } + override fun notifyUpdated(update: LiveObjectUpdate) { + if (update.noOp) { + return + } + Log.v(tag, "Object $objectId updated: $update") + liveMapManager.notify(update as LiveMapUpdate) + } + override fun onGCInterval() { data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() } } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt new file mode 100644 index 000000000..87cda95b8 --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt @@ -0,0 +1,44 @@ +package io.ably.lib.objects.type.livemap + +import io.ably.lib.objects.ObjectsSubscription +import io.ably.lib.objects.type.map.LiveMapChange +import io.ably.lib.objects.type.map.LiveMapUpdate +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log + +internal val noOpMapUpdate = LiveMapUpdate() + +internal interface HandlesLiveMapChange { + fun notify(update: LiveMapUpdate) +} + +internal abstract class LiveMapChangeCoordinator: LiveMapChange, HandlesLiveMapChange { + private val tag = "DefaultLiveMapChangeCoordinator" + + private val mapChangeEmitter = LiveMapChangeEmitter() + + override fun subscribe(listener: LiveMapChange.Listener): ObjectsSubscription { + mapChangeEmitter.on(listener) + return ObjectsSubscription { + mapChangeEmitter.off(listener) + } + } + + override fun unsubscribe(listener: LiveMapChange.Listener) = mapChangeEmitter.off(listener) + + override fun unsubscribeAll() = mapChangeEmitter.off() + + override fun notify(update: LiveMapUpdate) = mapChangeEmitter.emit(update) +} + +private class LiveMapChangeEmitter : EventEmitter() { + private val tag = "LiveMapChangeEmitter" + + override fun apply(listener: LiveMapChange.Listener?, event: LiveMapUpdate?, vararg args: Any?) { + try { + listener?.onUpdated(event!!) + } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing listener callback for event: $event", t) + } + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt index 75511b999..51db6cf58 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt @@ -6,9 +6,12 @@ import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ObjectState import io.ably.lib.objects.isInvalid import io.ably.lib.objects.objectError +import io.ably.lib.objects.type.map.LiveMapUpdate +import io.ably.lib.objects.type.noOp import io.ably.lib.util.Log -internal class LiveMapManager(private val liveMap: DefaultLiveMap) { +internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChangeCoordinator() { + private val objectId = liveMap.objectId private val tag = "LiveMapManager" @@ -16,7 +19,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { /** * @spec RTLM6 - Overrides object data with state from sync */ - internal fun applyState(objectState: ObjectState): Map { + internal fun applyState(objectState: ObjectState): LiveMapUpdate { val previousData = liveMap.data.toMap() if (objectState.tombstone) { @@ -74,7 +77,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { /** * @spec RTLM16 - Applies map create operation */ - private fun applyMapCreate(operation: ObjectOperation): Map { + private fun applyMapCreate(operation: ObjectOperation): LiveMapUpdate { if (liveMap.createOperationIsMerged) { // RTLM16b // There can't be two different create operation for the same object id, because the object id @@ -84,7 +87,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { tag, "Skipping applying MAP_CREATE op on a map instance as it was already applied before; objectId=${objectId}" ) - return mapOf() + return noOpMapUpdate } if (liveMap.semantics != operation.map?.semantics) { @@ -103,7 +106,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { private fun applyMapSet( mapOp: ObjectMapOp, // RTLM7d1 timeSerial: String?, // RTLM7d2 - ): Map { + ): LiveMapUpdate { val existingEntry = liveMap.data[mapOp.key] // RTLM7a @@ -113,7 +116,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { "Skipping update for key=\"${mapOp.key}\": op serial $timeSerial <= entry serial ${existingEntry.timeserial};" + " objectId=${objectId}" ) - return mapOf() + return noOpMapUpdate } if (mapOp.data.isInvalid()) { @@ -146,7 +149,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { ) } - return mapOf(mapOp.key to "updated") + return LiveMapUpdate(mapOf(mapOp.key to LiveMapUpdate.Change.UPDATED)) } /** @@ -155,7 +158,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { private fun applyMapRemove( mapOp: ObjectMapOp, // RTLM8c1 timeSerial: String?, // RTLM8c2 - ): Map { + ): LiveMapUpdate { val existingEntry = liveMap.data[mapOp.key] // RTLM8a @@ -166,7 +169,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { "Skipping remove for key=\"${mapOp.key}\": op serial $timeSerial <= entry serial ${existingEntry.timeserial}; " + "objectId=${objectId}" ) - return mapOf() + return noOpMapUpdate } if (existingEntry != null) { @@ -186,7 +189,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { ) } - return mapOf(mapOp.key to "removed") + return LiveMapUpdate(mapOf(mapOp.key to LiveMapUpdate.Change.REMOVED)) } /** @@ -210,12 +213,12 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { /** * @spec RTLM17 - Merges initial data from create operation */ - private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): Map { + private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): LiveMapUpdate { if (operation.map?.entries.isNullOrEmpty()) { // no map entries in MAP_CREATE op - return mapOf() + return noOpMapUpdate } - val aggregatedUpdate = mutableMapOf() + val aggregatedUpdate = mutableListOf() // RTLM17a // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. @@ -232,25 +235,30 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { } // skip noop updates - if (update.isEmpty()) { + if (update.noOp) { return@forEach } - aggregatedUpdate.putAll(update) + aggregatedUpdate.add(update) } liveMap.createOperationIsMerged = true // RTLM17b - return aggregatedUpdate + return LiveMapUpdate( + aggregatedUpdate.map { it.update }.fold(emptyMap()) { acc, map -> acc + map } + ) } - internal fun calculateUpdateFromDataDiff(prevData: Map, newData: Map): Map { - val update = mutableMapOf() + internal fun calculateUpdateFromDataDiff( + prevData: Map, + newData: Map + ): LiveMapUpdate { + val update = mutableMapOf() // Check for removed entries for ((key, prevEntry) in prevData) { if (!prevEntry.isTombstoned && !newData.containsKey(key)) { - update[key] = "removed" + update[key] = LiveMapUpdate.Change.REMOVED } } @@ -259,7 +267,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { if (!prevData.containsKey(key)) { // if property does not exist in current map, but new data has it as non-tombstoned property - got updated if (!newEntry.isTombstoned) { - update[key] = "updated" + update[key] = LiveMapUpdate.Change.UPDATED } // otherwise, if new data has this prop tombstoned - do nothing, as property didn't exist anyway continue @@ -271,12 +279,12 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { // compare tombstones first if (prevEntry.isTombstoned && !newEntry.isTombstoned) { // prev prop is tombstoned, but new is not. it means prop was updated to a meaningful value - update[key] = "updated" + update[key] = LiveMapUpdate.Change.UPDATED continue } if (!prevEntry.isTombstoned && newEntry.isTombstoned) { // prev prop is not tombstoned, but new is. it means prop was removed - update[key] = "removed" + update[key] = LiveMapUpdate.Change.REMOVED continue } if (prevEntry.isTombstoned && newEntry.isTombstoned) { @@ -287,11 +295,11 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { // both props exist and are not tombstoned, need to compare values to see if it was changed val valueChanged = prevEntry.data != newEntry.data if (valueChanged) { - update[key] = "updated" + update[key] = LiveMapUpdate.Change.UPDATED continue } } - return update + return LiveMapUpdate(update) } } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt index b79f24a04..6cad20508 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt @@ -22,7 +22,7 @@ abstract class IntegrationTest { @JvmField @Rule - val timeout: Timeout = Timeout.seconds(10) + val timeout: Timeout = Timeout.seconds(15) private val realtimeClients = mutableMapOf() diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index d16934f6f..92d101fa7 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -3,6 +3,7 @@ package io.ably.lib.objects.unit.type.livemap import io.ably.lib.objects.* import io.ably.lib.objects.type.livemap.LiveMapEntry import io.ably.lib.objects.type.livemap.LiveMapManager +import io.ably.lib.objects.type.map.LiveMapUpdate import io.mockk.mockk import org.junit.Test import org.junit.Assert.* @@ -17,7 +18,7 @@ class LiveMapManagerTest { val prevData1 = mapOf() val newData1 = mapOf() val result1 = livemapManager.calculateUpdateFromDataDiff(prevData1, newData1) - assertEquals("Should return empty map for no changes", emptyMap(), result1) + assertEquals("Should return empty map for no changes", emptyMap(), result1.update) // Test case 2: Entry added val prevData2 = mapOf() @@ -29,7 +30,7 @@ class LiveMapManagerTest { ) ) val result2 = livemapManager.calculateUpdateFromDataDiff(prevData2, newData2) - assertEquals("Should detect added entry", mapOf("key1" to "updated"), result2) + assertEquals("Should detect added entry", mapOf("key1" to LiveMapUpdate.Change.UPDATED), result2.update) // Test case 3: Entry removed val prevData3 = mapOf( @@ -41,7 +42,7 @@ class LiveMapManagerTest { ) val newData3 = mapOf() val result3 = livemapManager.calculateUpdateFromDataDiff(prevData3, newData3) - assertEquals("Should detect removed entry", mapOf("key1" to "removed"), result3) + assertEquals("Should detect removed entry", mapOf("key1" to LiveMapUpdate.Change.REMOVED), result3.update) // Test case 4: Entry updated val prevData4 = mapOf( @@ -59,7 +60,7 @@ class LiveMapManagerTest { ) ) val result4 = livemapManager.calculateUpdateFromDataDiff(prevData4, newData4) - assertEquals("Should detect updated entry", mapOf("key1" to "updated"), result4) + assertEquals("Should detect updated entry", mapOf("key1" to LiveMapUpdate.Change.UPDATED), result4.update) // Test case 5: Entry tombstoned val prevData5 = mapOf( @@ -77,7 +78,7 @@ class LiveMapManagerTest { ) ) val result5 = livemapManager.calculateUpdateFromDataDiff(prevData5, newData5) - assertEquals("Should detect tombstoned entry", mapOf("key1" to "removed"), result5) + assertEquals("Should detect tombstoned entry", mapOf("key1" to LiveMapUpdate.Change.REMOVED), result5.update) // Test case 6: Entry untombstoned val prevData6 = mapOf( @@ -95,7 +96,7 @@ class LiveMapManagerTest { ) ) val result6 = livemapManager.calculateUpdateFromDataDiff(prevData6, newData6) - assertEquals("Should detect untombstoned entry", mapOf("key1" to "updated"), result6) + assertEquals("Should detect untombstoned entry", mapOf("key1" to LiveMapUpdate.Change.UPDATED), result6.update) // Test case 7: Both entries tombstoned (noop) val prevData7 = mapOf( @@ -113,7 +114,7 @@ class LiveMapManagerTest { ) ) val result7 = livemapManager.calculateUpdateFromDataDiff(prevData7, newData7) - assertEquals("Should not detect change for both tombstoned entries", emptyMap(), result7) + assertEquals("Should not detect change for both tombstoned entries", emptyMap(), result7.update) // Test case 8: New tombstoned entry (noop) val prevData8 = mapOf() @@ -125,7 +126,7 @@ class LiveMapManagerTest { ) ) val result8 = livemapManager.calculateUpdateFromDataDiff(prevData8, newData8) - assertEquals("Should not detect change for new tombstoned entry", emptyMap(), result8) + assertEquals("Should not detect change for new tombstoned entry", emptyMap(), result8.update) // Test case 9: Multiple changes val prevData9 = mapOf( @@ -154,11 +155,11 @@ class LiveMapManagerTest { ) val result9 = livemapManager.calculateUpdateFromDataDiff(prevData9, newData9) val expected9 = mapOf( - "key1" to "updated", - "key2" to "removed", - "key3" to "updated" + "key1" to LiveMapUpdate.Change.UPDATED, + "key2" to LiveMapUpdate.Change.REMOVED, + "key3" to LiveMapUpdate.Change.UPDATED ) - assertEquals("Should detect multiple changes correctly", expected9, result9) + assertEquals("Should detect multiple changes correctly", expected9, result9.update) // Test case 10: ObjectId references val prevData10 = mapOf( @@ -176,7 +177,7 @@ class LiveMapManagerTest { ) ) val result10 = livemapManager.calculateUpdateFromDataDiff(prevData10, newData10) - assertEquals("Should detect objectId change", mapOf("key1" to "updated"), result10) + assertEquals("Should detect objectId change", mapOf("key1" to LiveMapUpdate.Change.UPDATED), result10.update) // Test case 11: Same data, no change val prevData11 = mapOf( @@ -194,6 +195,6 @@ class LiveMapManagerTest { ) ) val result11 = livemapManager.calculateUpdateFromDataDiff(prevData11, newData11) - assertEquals("Should not detect change for same data", emptyMap(), result11) + assertEquals("Should not detect change for same data", emptyMap(), result11.update) } } From 9989a1efabd8df8c638f445dd204f1ffbcaae14f Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Tue, 22 Jul 2025 20:24:41 +0530 Subject: [PATCH 3/6] [ECO-5458] Added missing doc for subscription specific public interfaces - Added toString methods to pretty print update --- .../lib/objects/type/LiveObjectUpdate.java | 16 ++++++- .../type/counter/LiveCounterChange.java | 31 +++++++++++++ .../type/counter/LiveCounterUpdate.java | 46 ++++++++++++++++++- .../lib/objects/type/map/LiveMapChange.java | 31 +++++++++++++ .../lib/objects/type/map/LiveMapUpdate.java | 37 ++++++++++++--- .../LiveCounterChangeCoordinator.kt | 10 +++- .../type/livemap/LiveMapChangeCoordinator.kt | 10 +++- 7 files changed, 168 insertions(+), 13 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java index 7971a6a86..9130a1234 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java +++ b/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java @@ -1,9 +1,23 @@ package io.ably.lib.objects.type; +import org.jetbrains.annotations.Nullable; + +/** + * Abstract base class for all LiveObject update notifications. + * Provides common structure for updates that occur on LiveMap and LiveCounter objects. + * Contains the update data that describes what changed in the live object. + */ public abstract class LiveObjectUpdate { + /** The update data containing details about the change that occurred */ + @Nullable protected final Object update; - protected LiveObjectUpdate(Object update) { + /** + * Creates a LiveObjectUpdate with the specified update data. + * + * @param update the data describing the change, or null for no-op updates + */ + protected LiveObjectUpdate(@Nullable Object update) { this.update = update; } } diff --git a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java index 654423f5e..0ac70552c 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java @@ -4,18 +4,49 @@ import org.jetbrains.annotations.NonBlocking; import org.jetbrains.annotations.NotNull; +/** + * Provides methods to subscribe to real-time updates on LiveCounter objects. + * Enables clients to receive notifications when counter values change due to + * operations performed by any client connected to the same channel. + */ public interface LiveCounterChange { + /** + * Subscribes to real-time updates on this LiveCounter object. + * Multiple listeners can be subscribed to the same object independently. + * + * @param listener the listener to be notified of counter updates + * @return an ObjectsSubscription for managing this specific listener + */ @NonBlocking @NotNull ObjectsSubscription subscribe(@NotNull Listener listener); + /** + * Unsubscribes a specific listener from receiving updates. + * Has no effect if the listener is not currently subscribed. + * + * @param listener the listener to be unsubscribed + */ @NonBlocking void unsubscribe(@NotNull Listener listener); + /** + * Unsubscribes all listeners from receiving updates. + * No notifications will be delivered until new listeners are subscribed. + */ @NonBlocking void unsubscribeAll(); + /** + * Listener interface for receiving LiveCounter updates. + */ interface Listener { + /** + * Called when the LiveCounter has been updated. + * Should execute quickly as it's called from the real-time processing thread. + * + * @param update details about the counter change + */ void onUpdated(@NotNull LiveCounterUpdate update); } } diff --git a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java index 1145d7fef..240d38445 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java @@ -4,34 +4,76 @@ import org.jetbrains.annotations.NotNull; /** - * Spec: RTLC11, RTLC11a + * Represents an update that occurred on a LiveCounter object. + * Contains information about counter value changes from increment/decrement operations. + * Updates can represent positive changes (increments) or negative changes (decrements). + * + * @spec RTLC11, RTLC11a - LiveCounter update structure and behavior */ public class LiveCounterUpdate extends LiveObjectUpdate { + /** + * Creates a no-op LiveCounterUpdate representing no actual change. + */ public LiveCounterUpdate() { super(null); } + /** + * Creates a LiveCounterUpdate with the specified amount change. + * + * @param amount the amount by which the counter changed (positive = increment, negative = decrement) + */ public LiveCounterUpdate(@NotNull Long amount) { super(new Update(amount)); } + /** + * Gets the update information containing the amount of change. + * + * @return the Update object with the counter modification amount + */ @NotNull public LiveCounterUpdate.Update getUpdate() { return (Update) update; } /** - * Spec: RTLC11b, RTLC11b1 + * Returns a string representation of this LiveCounterUpdate. + * + * @return a string showing the amount of change to the counter + */ + @Override + public String toString() { + if (update == null) { + return "LiveCounterUpdate{no change}"; + } + return "LiveCounterUpdate{amount=" + getUpdate().getAmount() + "}"; + } + + /** + * Contains the specific details of a counter update operation. + * + * @spec RTLC11b, RTLC11b1 - Counter update data structure */ public static class Update { @NotNull private final Long amount; + /** + * Creates an Update with the specified amount. + * + * @param amount the counter change amount (positive = increment, negative = decrement) + */ public Update(@NotNull Long amount) { this.amount = amount; } + /** + * Gets the amount by which the counter value was modified. + * + * @return the change amount (positive for increments, negative for decrements) + */ public @NotNull Long getAmount() { return amount; } diff --git a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java index ff787a16f..f39790af3 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java @@ -4,18 +4,49 @@ import org.jetbrains.annotations.NonBlocking; import org.jetbrains.annotations.NotNull; +/** + * Provides methods to subscribe to real-time updates on LiveMap objects. + * Enables clients to receive notifications when map entries are added, updated, or removed. + * Uses last-write-wins conflict resolution when multiple clients modify the same key. + */ public interface LiveMapChange { + /** + * Subscribes to real-time updates on this LiveMap object. + * Multiple listeners can be subscribed to the same object independently. + * + * @param listener the listener to be notified of map updates + * @return an ObjectsSubscription for managing this specific listener + */ @NonBlocking @NotNull ObjectsSubscription subscribe(@NotNull Listener listener); + /** + * Unsubscribes a specific listener from receiving updates. + * Has no effect if the listener is not currently subscribed. + * + * @param listener the listener to be unsubscribed + */ @NonBlocking void unsubscribe(@NotNull Listener listener); + /** + * Unsubscribes all listeners from receiving updates. + * No notifications will be delivered until new listeners are subscribed. + */ @NonBlocking void unsubscribeAll(); + /** + * Listener interface for receiving LiveMap updates. + */ interface Listener { + /** + * Called when the LiveMap has been updated. + * Should execute quickly as it's called from the real-time processing thread. + * + * @param update details about which keys were modified and how + */ void onUpdated(@NotNull LiveMapUpdate update); } } diff --git a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java index 5c0f24164..5d753cd5c 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java @@ -6,25 +6,33 @@ import java.util.Map; /** - * Spec: RTLM18, RTLM18a + * Represents an update that occurred on a LiveMap object. + * Contains information about which keys were modified and whether they were updated or removed. + * + * @spec RTLM18, RTLM18a - LiveMap update structure and behavior */ public class LiveMapUpdate extends LiveObjectUpdate { + /** + * Creates a no-op LiveMapUpdate representing no actual change. + */ public LiveMapUpdate() { super(null); } /** - * Constructor for LiveMapUpdate - * @param update The map of updates + * Creates a LiveMapUpdate with the specified key changes. + * + * @param update map of key names to their change types (UPDATED or REMOVED) */ public LiveMapUpdate(@NotNull Map update) { super(update); } /** - * Get the map of updates - * @return The update map + * Gets the map of key changes that occurred in this update. + * + * @return map of key names to their change types */ @NotNull public Map getUpdate() { @@ -32,10 +40,27 @@ public Map getUpdate() { } /** - * Spec: RTLM18b + * Returns a string representation of this LiveMapUpdate. + * + * @return a string showing the map key changes in this update + */ + @Override + public String toString() { + if (update == null) { + return "LiveMapUpdate{no change}"; + } + return "LiveMapUpdate{changes=" + getUpdate() + "}"; + } + + /** + * Indicates the type of change that occurred to a map key. + * + * @spec RTLM18b - Map change types */ public enum Change { + /** The key was added or its value was modified */ UPDATED, + /** The key was removed from the map */ REMOVED } } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt index 49261f239..02cdded56 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt @@ -8,13 +8,19 @@ import io.ably.lib.util.Log internal val noOpCounterUpdate = LiveCounterUpdate() +/** + * Interface for handling live counter changes by notifying subscribers of updates. + * Implementations typically propagate updates through event emission to registered listeners. + */ internal interface HandlesLiveCounterChange { + /** + * Notifies all registered listeners about a counter update by propagating the change through the event system. + * This method is called when counter data changes and triggers the emission of update events to subscribers. + */ fun notify(update: LiveCounterUpdate) } internal abstract class LiveCounterChangeCoordinator: LiveCounterChange, HandlesLiveCounterChange { - private val tag = "DefaultLiveCounterChangeCoordinator" - private val counterChangeEmitter = LiveCounterChangeEmitter() override fun subscribe(listener: LiveCounterChange.Listener): ObjectsSubscription { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt index 87cda95b8..a2c368b00 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt @@ -8,13 +8,19 @@ import io.ably.lib.util.Log internal val noOpMapUpdate = LiveMapUpdate() +/** + * Interface for handling live map changes by notifying subscribers of updates. + * Implementations typically propagate updates through event emission to registered listeners. + */ internal interface HandlesLiveMapChange { + /** + * Notifies all registered listeners about a map update by propagating the change through the event system. + * This method is called when map data changes and triggers the emission of update events to subscribers. + */ fun notify(update: LiveMapUpdate) } internal abstract class LiveMapChangeCoordinator: LiveMapChange, HandlesLiveMapChange { - private val tag = "DefaultLiveMapChangeCoordinator" - private val mapChangeEmitter = LiveMapChangeEmitter() override fun subscribe(listener: LiveMapChange.Listener): ObjectsSubscription { From f0b64931ccbaa251db268ddc8fed5e313d57c9e7 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Wed, 23 Jul 2025 17:10:24 +0530 Subject: [PATCH 4/6] [ECO-5458] Added integration tests for object-subscriptions - Fixed integration tests for LiveMapManager based on returned result --- .../integration/DefaultLiveCounterTest.kt | 72 ++++++++++++++ .../objects/integration/DefaultLiveMapTest.kt | 93 +++++++++++++++++++ .../integration/helpers/RestObjects.kt | 4 +- .../helpers/fixtures/CounterFixtures.kt | 41 ++++++-- .../helpers/fixtures/MapFixtures.kt | 49 +++++++--- .../unit/type/livemap/LiveMapManagerTest.kt | 20 ++-- 6 files changed, 247 insertions(+), 32 deletions(-) diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt index 9403b00c3..bac176d3f 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt @@ -3,12 +3,15 @@ package io.ably.lib.objects.integration import io.ably.lib.objects.type.counter.LiveCounter import io.ably.lib.objects.type.map.LiveMap import io.ably.lib.objects.assertWaiter +import io.ably.lib.objects.integration.helpers.ObjectId +import io.ably.lib.objects.integration.helpers.fixtures.createUserEngagementMatrixMap import io.ably.lib.objects.integration.helpers.fixtures.createUserMapWithCountersObject import io.ably.lib.objects.integration.setup.IntegrationTest import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertNotNull +import kotlin.test.assertTrue class DefaultLiveCounterTest: IntegrationTest() { /** @@ -202,4 +205,73 @@ class DefaultLiveCounterTest: IntegrationTest() { assertNotNull(finalCounterCheck, "Counter should still be accessible from root map") assertEquals(30.0, finalCounterCheck.value(), "Final counter value should be 30 when accessed from root map") } + + @Test + fun testLiveCounterChangesUsingSubscription() = runTest { + val channelName = generateChannelName() + val userEngagementMapId = restObjects.createUserEngagementMatrixMap(channelName) + restObjects.setMapRef(channelName, "root", "userMatrix", userEngagementMapId) + + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + + val userEngagementMap = rootMap.get("userMatrix") as LiveMap + assertEquals(4L, userEngagementMap.size(), "User engagement map should contain 4 top-level entries") + + val totalReactions = userEngagementMap.get("totalReactions") as LiveCounter + assertEquals(189.0, totalReactions.value(), "Total reactions counter should have initial value of 189") + + // Subscribe to changes on the totalReactions counter + val counterUpdates = mutableListOf() + val totalReactionsSubscription = totalReactions.subscribe { update -> + counterUpdates.add(update.update.amount) + } + + // Step 1: Increment the totalReactions counter by 10 (189 + 10 = 199) + restObjects.incrementCounter(channelName, totalReactions.ObjectId, 10.0) + + // Wait for the update to be received + assertWaiter { counterUpdates.isNotEmpty() } + + // Verify the increment update was received + assertEquals(1, counterUpdates.size, "Should receive one update for increment") + assertEquals(10.0, counterUpdates.first(), "Update should contain increment amount of 10") + assertEquals(199.0, totalReactions.value(), "Counter should be incremented to 199") + + // Step 2: Decrement the totalReactions counter by 5 (199 - 5 = 194) + counterUpdates.clear() + restObjects.decrementCounter(channelName, totalReactions.ObjectId, 5.0) + + // Wait for the second update + assertWaiter { counterUpdates.isNotEmpty() } + + // Verify the decrement update was received + assertEquals(1, counterUpdates.size, "Should receive one update for decrement") + assertEquals(-5.0, counterUpdates.first(), "Update should contain decrement amount of -5") + assertEquals(194.0, totalReactions.value(), "Counter should be decremented to 194") + + // Step 3: Increment the totalReactions counter by 15 (194 + 15 = 209) + counterUpdates.clear() + restObjects.incrementCounter(channelName, totalReactions.ObjectId, 15.0) + + // Wait for the third update + assertWaiter { counterUpdates.isNotEmpty() } + + // Verify the third increment update was received + assertEquals(1, counterUpdates.size, "Should receive one update for third increment") + assertEquals(15.0, counterUpdates.first(), "Update should contain increment amount of 15") + assertEquals(209.0, totalReactions.value(), "Counter should be incremented to 209") + + // Clean up subscription + counterUpdates.clear() + totalReactionsSubscription.unsubscribe() + + // No updates should be received after unsubscribing + restObjects.incrementCounter(channelName, totalReactions.ObjectId, 20.0) + + // Wait for a moment to ensure no updates are received + assertWaiter { totalReactions.value() == 229.0 } + + assertTrue(counterUpdates.isEmpty(), "No updates should be received after unsubscribing") + } } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt index 5face26e7..ad265f5af 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt @@ -4,14 +4,17 @@ import io.ably.lib.objects.* import io.ably.lib.objects.ObjectData import io.ably.lib.objects.ObjectValue import io.ably.lib.objects.integration.helpers.fixtures.createUserMapObject +import io.ably.lib.objects.integration.helpers.fixtures.createUserProfileMapObject import io.ably.lib.objects.integration.setup.IntegrationTest import io.ably.lib.objects.type.counter.LiveCounter import io.ably.lib.objects.type.map.LiveMap +import io.ably.lib.objects.type.map.LiveMapUpdate import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertNull +import kotlin.test.assertTrue class DefaultLiveMapTest: IntegrationTest() { /** @@ -212,4 +215,94 @@ class DefaultLiveMapTest: IntegrationTest() { val finalValues = testMap.values().toSet() assertEquals(setOf("Bob", false, "bob@example.com"), finalValues, "Final values should match expected set") } + + @Test + fun testLiveMapChangesUsingSubscription() = runTest { + val channelName = generateChannelName() + val userProfileObjectId = restObjects.createUserProfileMapObject(channelName) + restObjects.setMapRef(channelName, "root", "userProfile", userProfileObjectId) + + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + + // Get the user profile map object from the root map + val userProfile = rootMap.get("userProfile") as LiveMap + assertNotNull(userProfile, "User profile should be synchronized") + assertEquals(4L, userProfile.size(), "User profile should contain 4 entries") + + // Verify initial values + assertEquals("user123", userProfile.get("userId"), "Initial userId should be user123") + assertEquals("John Doe", userProfile.get("name"), "Initial name should be John Doe") + assertEquals("john@example.com", userProfile.get("email"), "Initial email should be john@example.com") + assertEquals(true, userProfile.get("isActive"), "Initial isActive should be true") + + // Subscribe to changes in the user profile map + val userProfileUpdates = mutableListOf() + val userProfileSubscription = userProfile.subscribe { update -> userProfileUpdates.add(update) } + + // Step 1: Update an existing field in the user profile map (change the name) + restObjects.setMapValue(channelName, userProfileObjectId, "name", ObjectValue("Bob Smith")) + + // Wait for the update to be received + assertWaiter { userProfileUpdates.isNotEmpty() } + + // Verify the update was received + assertEquals(1, userProfileUpdates.size, "Should receive one update") + val firstUpdateMap = userProfileUpdates.first().update + assertEquals(1, firstUpdateMap.size, "Should have one key change") + assertTrue(firstUpdateMap.containsKey("name"), "Update should contain name key") + assertEquals(LiveMapUpdate.Change.UPDATED, firstUpdateMap["name"], "name should be marked as UPDATED") + + // Verify the value was actually updated + assertEquals("Bob Smith", userProfile.get("name"), "Name should be updated to Bob Smith") + + // Step 2: Update another field in the user profile map (change the email) + userProfileUpdates.clear() + restObjects.setMapValue(channelName, userProfileObjectId, "email", ObjectValue("bob@example.com")) + + // Wait for the second update + assertWaiter { userProfileUpdates.isNotEmpty() } + + // Verify the second update + assertEquals(1, userProfileUpdates.size, "Should receive one update for the second change") + val secondUpdateMap = userProfileUpdates.first().update + assertEquals(1, secondUpdateMap.size, "Should have one key change") + assertTrue(secondUpdateMap.containsKey("email"), "Update should contain email key") + assertEquals(LiveMapUpdate.Change.UPDATED, secondUpdateMap["email"], "email should be marked as UPDATED") + + // Verify the value was actually updated + assertEquals("bob@example.com", userProfile.get("email"), "Email should be updated to bob@example.com") + + // Step 3: Remove an existing field from the user profile map (remove isActive) + userProfileUpdates.clear() + restObjects.removeMapValue(channelName, userProfileObjectId, "isActive") + + // Wait for the removal update + assertWaiter { userProfileUpdates.isNotEmpty() } + + // Verify the removal update + assertEquals(1, userProfileUpdates.size, "Should receive one update for removal") + val removalUpdateMap = userProfileUpdates.first().update + assertEquals(1, removalUpdateMap.size, "Should have one key change") + assertTrue(removalUpdateMap.containsKey("isActive"), "Update should contain isActive key") + assertEquals(LiveMapUpdate.Change.REMOVED, removalUpdateMap["isActive"], "isActive should be marked as REMOVED") + + // Verify final state of the user profile map + assertEquals(3L, userProfile.size(), "User profile should have 3 entries after removing isActive") + assertEquals("user123", userProfile.get("userId"), "userId should remain unchanged") + assertEquals("Bob Smith", userProfile.get("name"), "name should remain updated") + assertEquals("bob@example.com", userProfile.get("email"), "email should remain updated") + assertNull(userProfile.get("isActive"), "isActive should be removed") + + // Clean up subscription + userProfileUpdates.clear() + userProfileSubscription.unsubscribe() + // No updates should be received after unsubscribing + restObjects.setMapValue(channelName, userProfileObjectId, "country", ObjectValue("uk")) + + // Wait for a moment to ensure no updates are received + assertWaiter { userProfile.size() == 4L } + + assertTrue(userProfileUpdates.isEmpty(), "No updates should be received after unsubscribing") + } } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt index 82e1dbef0..165563bd2 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt @@ -5,6 +5,7 @@ import io.ably.lib.objects.ObjectData import io.ably.lib.objects.ObjectValue import io.ably.lib.rest.AblyRest import io.ably.lib.http.HttpUtils +import io.ably.lib.objects.integration.helpers.fixtures.DataFixtures import io.ably.lib.types.ClientOptions /** @@ -37,8 +38,7 @@ internal class RestObjects(options: ClientOptions) { * Sets an object reference at the specified key in an existing map. */ internal fun setMapRef(channelName: String, mapObjectId: String, key: String, refMapObjectId: String) { - val data = ObjectData(objectId = refMapObjectId) - val mapCreateOp = PayloadBuilder.mapSetRestOp(mapObjectId, key, data) + val mapCreateOp = PayloadBuilder.mapSetRestOp(mapObjectId, key, DataFixtures.mapRef(refMapObjectId)) operationRequest(channelName, mapCreateOp) } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt index ec9e7aa61..a8135a9e4 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt @@ -42,15 +42,7 @@ internal fun RestObjects.createUserMapWithCountersObject(channelName: String): S val loginStreakCounterObjectId = createCounter(channelName, 7.0) // Create engagement metrics nested map with counters - val engagementMetricsMapObjectId = createMap( - channelName, - data = mapOf( - "totalShares" to DataFixtures.mapRef(createCounter(channelName, 34.0)), - "totalBookmarks" to DataFixtures.mapRef(createCounter(channelName, 67.0)), - "totalReactions" to DataFixtures.mapRef(createCounter(channelName, 189.0)), - "dailyActiveStreak" to DataFixtures.mapRef(createCounter(channelName, 12.0)) - ) - ) + val engagementMetricsMapObjectId = createUserEngagementMatrixMap(channelName) // Set up the main test map structure with references to all created counters setMapRef(channelName, testMapObjectId, "profileViews", profileViewsCounterObjectId) @@ -63,3 +55,34 @@ internal fun RestObjects.createUserMapWithCountersObject(channelName: String): S return testMapObjectId } + +/** + * Creates a user engagement matrix map object with counter references for testing. + * + * This method creates a simple engagement metrics map containing counter objects + * that track various user engagement metrics. The map contains references to + * counter objects representing different types of user interactions and activities. + * + * **Object Structure:** + * ``` + * userEngagementMatrixMap (Map) + * ├── "totalShares" → Counter(value=34) + * ├── "totalBookmarks" → Counter(value=67) + * ├── "totalReactions" → Counter(value=189) + * └── "dailyActiveStreak" → Counter(value=12) + * ``` + * + * @param channelName The channel where the user engagement matrix map will be created + * @return The object ID of the created user engagement matrix map + */ +internal fun RestObjects.createUserEngagementMatrixMap(channelName: String): String { + return createMap( + channelName, + data = mapOf( + "totalShares" to DataFixtures.mapRef(createCounter(channelName, 34.0)), + "totalBookmarks" to DataFixtures.mapRef(createCounter(channelName, 67.0)), + "totalReactions" to DataFixtures.mapRef(createCounter(channelName, 189.0)), + "dailyActiveStreak" to DataFixtures.mapRef(createCounter(channelName, 12.0)) + ) + ) +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt index ee4b8c01f..5b0ec0561 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt @@ -134,17 +134,9 @@ internal fun RestObjects.createUserMapObject(channelName: String): String { ) // Create a user profile map with mixed data types and references - val userProfileMapObjectId = createMap( - channelName, - data = mapOf( - "userId" to ObjectData(value = ObjectValue("user123")), - "name" to ObjectData(value = ObjectValue("John Doe")), - "email" to ObjectData(value = ObjectValue("john@example.com")), - "isActive" to ObjectData(value = ObjectValue(true)), - "metrics" to DataFixtures.mapRef(metricsMapObjectId), - "preferences" to DataFixtures.mapRef(preferencesMapObjectId) - ) - ) + val userProfileMapObjectId = createUserProfileMapObject(channelName) + setMapRef(channelName, userProfileMapObjectId, "metrics", metricsMapObjectId) + setMapRef(channelName, userProfileMapObjectId, "preferences", preferencesMapObjectId) // Set up the main test map structure with references to all created objects setMapRef(channelName, testMapObjectId, "userProfile", userProfileMapObjectId) @@ -155,3 +147,38 @@ internal fun RestObjects.createUserMapObject(channelName: String): String { return testMapObjectId } + +/** + * Creates a user profile map object with basic user information for testing. + * + * This method creates a simple user profile map containing essential user data fields + * that are commonly used in user management systems. The map contains primitive data types + * representing basic user information. + * + * **Object Structure:** + * ``` + * userProfileMap (Map) + * ├── "userId" → "user123" + * ├── "name" → "John Doe" + * ├── "email" → "john@example.com" + * └── "isActive" → true + * ``` + * + * This structure provides a foundation for testing map operations on user profile data, + * including field updates, additions, and removals. The map contains a mix of string, + * boolean, and numeric data types to test various primitive value handling. + * + * @param channelName The channel where the user profile map will be created + * @return The object ID of the created user profile map + */ +internal fun RestObjects.createUserProfileMapObject(channelName: String): String { + return createMap( + channelName, + data = mapOf( + "userId" to ObjectData(value = ObjectValue("user123")), + "name" to ObjectData(value = ObjectValue("John Doe")), + "email" to ObjectData(value = ObjectValue("john@example.com")), + "isActive" to ObjectData(value = ObjectValue(true)), + ) + ) +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index e4be1b4e9..bc25a7c0e 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -56,10 +56,10 @@ class LiveMapManagerTest { // Assert on update field - should show changes from old to new state val expectedUpdate = mapOf( - "key1" to "updated", // key1 was updated from "oldValue" to "newValue1" - "key2" to "updated" // key2 was added + "key1" to LiveMapUpdate.Change.UPDATED, // key1 was updated from "oldValue" to "newValue1" + "key2" to LiveMapUpdate.Change.UPDATED // key2 was added ) - assertEquals(expectedUpdate, update) + assertEquals(expectedUpdate, update.update) } @Test @@ -90,8 +90,8 @@ class LiveMapManagerTest { assertEquals(0, liveMap.data.size) // RTLM6c - should be empty map // Assert on update field - should show that key1 was removed - val expectedUpdate = mapOf("key1" to "removed") - assertEquals(expectedUpdate, update) + val expectedUpdate = mapOf("key1" to LiveMapUpdate.Change.REMOVED) + assertEquals(expectedUpdate, update.update) } @Test @@ -119,8 +119,8 @@ class LiveMapManagerTest { assertEquals(0, liveMap.data.size) // RTLM6c - should be empty map when map is null // Assert on update field - should show that key1 was removed - val expectedUpdate = mapOf("key1" to "removed") - assertEquals(expectedUpdate, update) + val expectedUpdate = mapOf("key1" to LiveMapUpdate.Change.REMOVED) + assertEquals(expectedUpdate, update.update) } @Test @@ -178,10 +178,10 @@ class LiveMapManagerTest { // Assert on update field - should show changes from create operation val expectedUpdate = mapOf( - "key1" to "updated", // key1 was updated from "existingValue" to "stateValue" - "key2" to "updated" // key2 was added from create operation + "key1" to LiveMapUpdate.Change.UPDATED, // key1 was updated from "existingValue" to "stateValue" + "key2" to LiveMapUpdate.Change.UPDATED // key2 was added from create operation ) - assertEquals(expectedUpdate, update) + assertEquals(expectedUpdate, update.update) } From 60ee93c20b3b103136fc86d61b0bb48b65bd4661 Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Thu, 24 Jul 2025 15:40:40 +0530 Subject: [PATCH 5/6] [ECO-5458] Annotated java intefaces with relevant spec - Added missing throwIfInvalidAccessApiConfiguration during subscribe --- .../java/io/ably/lib/objects/ObjectsSubscription.java | 2 ++ .../io/ably/lib/objects/type/LiveObjectUpdate.java | 6 +++++- .../lib/objects/type/counter/LiveCounterChange.java | 10 +++++++--- .../io/ably/lib/objects/type/map/LiveMapChange.java | 10 +++++++--- .../kotlin/io/ably/lib/objects/DefaultLiveObjects.kt | 2 +- .../src/main/kotlin/io/ably/lib/objects/Helpers.kt | 1 + .../src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt | 2 +- .../kotlin/io/ably/lib/objects/type/BaseLiveObject.kt | 2 ++ .../lib/objects/type/livecounter/DefaultLiveCounter.kt | 1 + .../io/ably/lib/objects/type/livemap/DefaultLiveMap.kt | 1 + 10 files changed, 28 insertions(+), 9 deletions(-) diff --git a/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java b/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java index d6d007ecd..2b22d71d4 100644 --- a/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java +++ b/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java @@ -11,12 +11,14 @@ * s.unsubscribe(); * } * + * Spec: RTLO4b5 */ public interface ObjectsSubscription { /** * This method should be called when the subscription is no longer needed, * it will make sure no further events will be sent to the subscriber and * that references to the subscriber are cleaned up. + * Spec: RTLO4b5a */ void unsubscribe(); } diff --git a/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java index 9130a1234..abbb5476e 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java +++ b/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java @@ -6,9 +6,13 @@ * Abstract base class for all LiveObject update notifications. * Provides common structure for updates that occur on LiveMap and LiveCounter objects. * Contains the update data that describes what changed in the live object. + * Spec: RTLO4b4 */ public abstract class LiveObjectUpdate { - /** The update data containing details about the change that occurred */ + /** + * The update data containing details about the change that occurred + * Spec: RTLO4b4a + */ @Nullable protected final Object update; diff --git a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java index 0ac70552c..79f842e74 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java @@ -14,7 +14,8 @@ public interface LiveCounterChange { /** * Subscribes to real-time updates on this LiveCounter object. * Multiple listeners can be subscribed to the same object independently. - * + * Spec: RTLO4b + * * @param listener the listener to be notified of counter updates * @return an ObjectsSubscription for managing this specific listener */ @@ -24,7 +25,8 @@ public interface LiveCounterChange { /** * Unsubscribes a specific listener from receiving updates. * Has no effect if the listener is not currently subscribed. - * + * Spec: RTLO4c + * * @param listener the listener to be unsubscribed */ @NonBlocking @@ -33,18 +35,20 @@ public interface LiveCounterChange { /** * Unsubscribes all listeners from receiving updates. * No notifications will be delivered until new listeners are subscribed. + * Spec: RTLO4d */ @NonBlocking void unsubscribeAll(); /** * Listener interface for receiving LiveCounter updates. + * Spec: RTLO4b3 */ interface Listener { /** * Called when the LiveCounter has been updated. * Should execute quickly as it's called from the real-time processing thread. - * + * * @param update details about the counter change */ void onUpdated(@NotNull LiveCounterUpdate update); diff --git a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java index f39790af3..c30ae7850 100644 --- a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java @@ -14,7 +14,8 @@ public interface LiveMapChange { /** * Subscribes to real-time updates on this LiveMap object. * Multiple listeners can be subscribed to the same object independently. - * + * Spec: RTLO4b + * * @param listener the listener to be notified of map updates * @return an ObjectsSubscription for managing this specific listener */ @@ -24,7 +25,8 @@ public interface LiveMapChange { /** * Unsubscribes a specific listener from receiving updates. * Has no effect if the listener is not currently subscribed. - * + * Spec: RTLO4c + * * @param listener the listener to be unsubscribed */ @NonBlocking @@ -33,18 +35,20 @@ public interface LiveMapChange { /** * Unsubscribes all listeners from receiving updates. * No notifications will be delivered until new listeners are subscribed. + * Spec: RTLO4d */ @NonBlocking void unsubscribeAll(); /** * Listener interface for receiving LiveMap updates. + * Spec: RTLO4b3 */ interface Listener { /** * Called when the LiveMap has been updated. * Should execute quickly as it's called from the real-time processing thread. - * + * * @param update details about which keys were modified and how */ void onUpdated(@NotNull LiveMapUpdate update); diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt index 215bb9b9e..e94c3540d 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt @@ -173,7 +173,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val objectsManager.clearBufferedObjectOperations() // RTO4b5 // defer the state change event until the next tick if we started a new sequence just now due to being in initialized state. // this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop. - objectsManager.endSync(fromInitializedState) // RTO4b4, RTO4b2a + objectsManager.endSync(fromInitializedState) // RTO4b4 } } ChannelState.detached, diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 8dbd86bad..00e079bf3 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -41,6 +41,7 @@ internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMe setChannelSerial(channelName, channelSerial) } +// Spec: RTLO4b1, RTLO4b2 internal fun LiveObjectsAdapter.throwIfInvalidAccessApiConfiguration(channelName: String) { throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe) throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed)) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt index fa5d19d2a..af822e948 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt @@ -76,7 +76,7 @@ internal class ObjectsPool( */ internal fun resetToInitialPool(emitUpdateEvents: Boolean) { pool.entries.removeIf { (key, _) -> key != ROOT_OBJECT_ID } // only keep the root object - clearObjectsData(emitUpdateEvents) // clear the root object and emit update events + clearObjectsData(emitUpdateEvents) // RTO4b2a - clear the root object and emit update events } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt index c7e48652c..b740b0b8c 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt @@ -14,6 +14,7 @@ internal enum class ObjectType(val value: String) { Counter("counter") } +// Spec: RTLO4b4b internal val LiveObjectUpdate.noOp get() = this.update == null /** @@ -183,6 +184,7 @@ internal abstract class BaseLiveObject( /** * Notifies subscribers about changes made to this live object. Propagates updates through the * appropriate manager after converting the generic update map to type-specific update objects. + * Spec: RTLO4b4c */ abstract fun notifyUpdated(update: LiveObjectUpdate) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index d9dfa5f0f..f910f785b 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -62,6 +62,7 @@ internal class DefaultLiveCounter private constructor( } override fun subscribe(listener: LiveCounterChange.Listener): ObjectsSubscription { + adapter.throwIfInvalidAccessApiConfiguration(channelName) return liveCounterManager.subscribe(listener) } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index 5f20a8821..1ad361df3 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -111,6 +111,7 @@ internal class DefaultLiveMap private constructor( override fun validate(state: ObjectState) = liveMapManager.validate(state) override fun subscribe(listener: LiveMapChange.Listener): ObjectsSubscription { + adapter.throwIfInvalidAccessApiConfiguration(channelName) return liveMapManager.subscribe(listener) } From 86c08f6622683c8f4377b537ad34dc45aea3a7ad Mon Sep 17 00:00:00 2001 From: sacOO7 Date: Fri, 1 Aug 2025 16:01:51 +0530 Subject: [PATCH 6/6] [ECO-5458] Updated ChangeEmitter implementations to be type safe --- .../objects/type/livecounter/LiveCounterChangeCoordinator.kt | 3 ++- .../ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt index 02cdded56..0ea58f389 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt @@ -42,7 +42,8 @@ private class LiveCounterChangeEmitter : EventEmitter