diff --git a/lib/src/main/java/io/ably/lib/objects/LiveObjects.java b/lib/src/main/java/io/ably/lib/objects/LiveObjects.java index ac5b2c919..a68822d5f 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveObjects.java +++ b/lib/src/main/java/io/ably/lib/objects/LiveObjects.java @@ -1,6 +1,8 @@ package io.ably.lib.objects; import io.ably.lib.objects.state.ObjectsStateChange; +import io.ably.lib.objects.type.counter.LiveCounter; +import io.ably.lib.objects.type.map.LiveMap; import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NonBlocking; import org.jetbrains.annotations.NotNull; diff --git a/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java b/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java index d6d007ecd..2b22d71d4 100644 --- a/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java +++ b/lib/src/main/java/io/ably/lib/objects/ObjectsSubscription.java @@ -11,12 +11,14 @@ * s.unsubscribe(); * } * + * Spec: RTLO4b5 */ public interface ObjectsSubscription { /** * This method should be called when the subscription is no longer needed, * it will make sure no further events will be sent to the subscriber and * that references to the subscriber are cleaned up. + * Spec: RTLO4b5a */ void unsubscribe(); } diff --git a/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java new file mode 100644 index 000000000..abbb5476e --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java @@ -0,0 +1,27 @@ +package io.ably.lib.objects.type; + +import org.jetbrains.annotations.Nullable; + +/** + * Abstract base class for all LiveObject update notifications. + * Provides common structure for updates that occur on LiveMap and LiveCounter objects. + * Contains the update data that describes what changed in the live object. + * Spec: RTLO4b4 + */ +public abstract class LiveObjectUpdate { + /** + * The update data containing details about the change that occurred + * Spec: RTLO4b4a + */ + @Nullable + protected final Object update; + + /** + * Creates a LiveObjectUpdate with the specified update data. + * + * @param update the data describing the change, or null for no-op updates + */ + protected LiveObjectUpdate(@Nullable Object update) { + this.update = update; + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/LiveCounter.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java similarity index 94% rename from lib/src/main/java/io/ably/lib/objects/LiveCounter.java rename to lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java index 81ef13f37..54e7a3130 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveCounter.java +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounter.java @@ -1,5 +1,6 @@ -package io.ably.lib.objects; +package io.ably.lib.objects.type.counter; +import io.ably.lib.objects.ObjectsCallback; import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NonBlocking; import org.jetbrains.annotations.NotNull; @@ -10,7 +11,7 @@ * It allows incrementing, decrementing, and retrieving the current value of the counter, * both synchronously and asynchronously. */ -public interface LiveCounter { +public interface LiveCounter extends LiveCounterChange { /** * Increments the value of the counter by 1. diff --git a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java new file mode 100644 index 000000000..79f842e74 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterChange.java @@ -0,0 +1,56 @@ +package io.ably.lib.objects.type.counter; + +import io.ably.lib.objects.ObjectsSubscription; +import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.NotNull; + +/** + * Provides methods to subscribe to real-time updates on LiveCounter objects. + * Enables clients to receive notifications when counter values change due to + * operations performed by any client connected to the same channel. + */ +public interface LiveCounterChange { + + /** + * Subscribes to real-time updates on this LiveCounter object. + * Multiple listeners can be subscribed to the same object independently. + * Spec: RTLO4b + * + * @param listener the listener to be notified of counter updates + * @return an ObjectsSubscription for managing this specific listener + */ + @NonBlocking + @NotNull ObjectsSubscription subscribe(@NotNull Listener listener); + + /** + * Unsubscribes a specific listener from receiving updates. + * Has no effect if the listener is not currently subscribed. + * Spec: RTLO4c + * + * @param listener the listener to be unsubscribed + */ + @NonBlocking + void unsubscribe(@NotNull Listener listener); + + /** + * Unsubscribes all listeners from receiving updates. + * No notifications will be delivered until new listeners are subscribed. + * Spec: RTLO4d + */ + @NonBlocking + void unsubscribeAll(); + + /** + * Listener interface for receiving LiveCounter updates. + * Spec: RTLO4b3 + */ + interface Listener { + /** + * Called when the LiveCounter has been updated. + * Should execute quickly as it's called from the real-time processing thread. + * + * @param update details about the counter change + */ + void onUpdated(@NotNull LiveCounterUpdate update); + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java new file mode 100644 index 000000000..d0362d27b --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/counter/LiveCounterUpdate.java @@ -0,0 +1,80 @@ +package io.ably.lib.objects.type.counter; + +import io.ably.lib.objects.type.LiveObjectUpdate; +import org.jetbrains.annotations.NotNull; + +/** + * Represents an update that occurred on a LiveCounter object. + * Contains information about counter value changes from increment/decrement operations. + * Updates can represent positive changes (increments) or negative changes (decrements). + * + * @spec RTLC11, RTLC11a - LiveCounter update structure and behavior + */ +public class LiveCounterUpdate extends LiveObjectUpdate { + + /** + * Creates a no-op LiveCounterUpdate representing no actual change. + */ + public LiveCounterUpdate() { + super(null); + } + + /** + * Creates a LiveCounterUpdate with the specified amount change. + * + * @param amount the amount by which the counter changed (positive = increment, negative = decrement) + */ + public LiveCounterUpdate(@NotNull Double amount) { + super(new Update(amount)); + } + + /** + * Gets the update information containing the amount of change. + * + * @return the Update object with the counter modification amount + */ + @NotNull + public LiveCounterUpdate.Update getUpdate() { + return (Update) update; + } + + /** + * Returns a string representation of this LiveCounterUpdate. + * + * @return a string showing the amount of change to the counter + */ + @Override + public String toString() { + if (update == null) { + return "LiveCounterUpdate{no change}"; + } + return "LiveCounterUpdate{amount=" + getUpdate().getAmount() + "}"; + } + + /** + * Contains the specific details of a counter update operation. + * + * @spec RTLC11b, RTLC11b1 - Counter update data structure + */ + public static class Update { + private final @NotNull Double amount; + + /** + * Creates an Update with the specified amount. + * + * @param amount the counter change amount (positive = increment, negative = decrement) + */ + public Update(@NotNull Double amount) { + this.amount = amount; + } + + /** + * Gets the amount by which the counter value was modified. + * + * @return the change amount (positive for increments, negative for decrements) + */ + public @NotNull Double getAmount() { + return amount; + } + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/LiveMap.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java similarity index 97% rename from lib/src/main/java/io/ably/lib/objects/LiveMap.java rename to lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java index ae1299dd4..e66a63728 100644 --- a/lib/src/main/java/io/ably/lib/objects/LiveMap.java +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMap.java @@ -1,5 +1,6 @@ -package io.ably.lib.objects; +package io.ably.lib.objects.type.map; +import io.ably.lib.objects.ObjectsCallback; import org.jetbrains.annotations.Blocking; import org.jetbrains.annotations.NonBlocking; import org.jetbrains.annotations.Contract; @@ -13,7 +14,7 @@ * The LiveMap interface provides methods to interact with a live, real-time map structure. * It supports both synchronous and asynchronous operations for managing key-value pairs. */ -public interface LiveMap { +public interface LiveMap extends LiveMapChange { /** * Retrieves the value associated with the specified key. diff --git a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java new file mode 100644 index 000000000..c30ae7850 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java @@ -0,0 +1,56 @@ +package io.ably.lib.objects.type.map; + +import io.ably.lib.objects.ObjectsSubscription; +import org.jetbrains.annotations.NonBlocking; +import org.jetbrains.annotations.NotNull; + +/** + * Provides methods to subscribe to real-time updates on LiveMap objects. + * Enables clients to receive notifications when map entries are added, updated, or removed. + * Uses last-write-wins conflict resolution when multiple clients modify the same key. + */ +public interface LiveMapChange { + + /** + * Subscribes to real-time updates on this LiveMap object. + * Multiple listeners can be subscribed to the same object independently. + * Spec: RTLO4b + * + * @param listener the listener to be notified of map updates + * @return an ObjectsSubscription for managing this specific listener + */ + @NonBlocking + @NotNull ObjectsSubscription subscribe(@NotNull Listener listener); + + /** + * Unsubscribes a specific listener from receiving updates. + * Has no effect if the listener is not currently subscribed. + * Spec: RTLO4c + * + * @param listener the listener to be unsubscribed + */ + @NonBlocking + void unsubscribe(@NotNull Listener listener); + + /** + * Unsubscribes all listeners from receiving updates. + * No notifications will be delivered until new listeners are subscribed. + * Spec: RTLO4d + */ + @NonBlocking + void unsubscribeAll(); + + /** + * Listener interface for receiving LiveMap updates. + * Spec: RTLO4b3 + */ + interface Listener { + /** + * Called when the LiveMap has been updated. + * Should execute quickly as it's called from the real-time processing thread. + * + * @param update details about which keys were modified and how + */ + void onUpdated(@NotNull LiveMapUpdate update); + } +} diff --git a/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java new file mode 100644 index 000000000..5d753cd5c --- /dev/null +++ b/lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java @@ -0,0 +1,66 @@ +package io.ably.lib.objects.type.map; + +import io.ably.lib.objects.type.LiveObjectUpdate; +import org.jetbrains.annotations.NotNull; + +import java.util.Map; + +/** + * Represents an update that occurred on a LiveMap object. + * Contains information about which keys were modified and whether they were updated or removed. + * + * @spec RTLM18, RTLM18a - LiveMap update structure and behavior + */ +public class LiveMapUpdate extends LiveObjectUpdate { + + /** + * Creates a no-op LiveMapUpdate representing no actual change. + */ + public LiveMapUpdate() { + super(null); + } + + /** + * Creates a LiveMapUpdate with the specified key changes. + * + * @param update map of key names to their change types (UPDATED or REMOVED) + */ + public LiveMapUpdate(@NotNull Map update) { + super(update); + } + + /** + * Gets the map of key changes that occurred in this update. + * + * @return map of key names to their change types + */ + @NotNull + public Map getUpdate() { + return (Map) update; + } + + /** + * Returns a string representation of this LiveMapUpdate. + * + * @return a string showing the map key changes in this update + */ + @Override + public String toString() { + if (update == null) { + return "LiveMapUpdate{no change}"; + } + return "LiveMapUpdate{changes=" + getUpdate() + "}"; + } + + /** + * Indicates the type of change that occurred to a map key. + * + * @spec RTLM18b - Map change types + */ + public enum Change { + /** The key was added or its value was modified */ + UPDATED, + /** The key was removed from the map */ + REMOVED + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt index d449404ce..0ffaeacf1 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt @@ -2,6 +2,8 @@ package io.ably.lib.objects import io.ably.lib.objects.state.ObjectsStateChange import io.ably.lib.objects.state.ObjectsStateEvent +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap import io.ably.lib.realtime.ChannelState import io.ably.lib.types.AblyException import io.ably.lib.types.ProtocolMessage @@ -125,7 +127,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val * @spec OM2 - Populates missing fields from parent protocol message */ private fun initializeHandlerForIncomingObjectMessages(): Job { - return sequentialScope.launch { + return sequentialScope.launch { objectsEventBus.collect { protocolMessage -> // OM2 - Populate missing fields from parent val objects = protocolMessage.state.filterIsInstance() diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt index 8dbd86bad..00e079bf3 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt @@ -41,6 +41,7 @@ internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMe setChannelSerial(channelName, channelSerial) } +// Spec: RTLO4b1, RTLO4b2 internal fun LiveObjectsAdapter.throwIfInvalidAccessApiConfiguration(channelName: String) { throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe) throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed)) diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt index 206ebc71c..b099ccfef 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt @@ -1,6 +1,7 @@ package io.ably.lib.objects import io.ably.lib.objects.type.BaseLiveObject +import io.ably.lib.objects.type.LiveObjectUpdate import io.ably.lib.objects.type.livecounter.DefaultLiveCounter import io.ably.lib.objects.type.livemap.DefaultLiveMap import io.ably.lib.util.Log @@ -125,7 +126,8 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje } val receivedObjectIds = mutableSetOf() - val existingObjectUpdates = mutableListOf>() + // RTO5c1a2 - List to collect updates for existing objects + val existingObjectUpdates = mutableListOf>() // RTO5c1 for ((objectId, objectState) in syncObjectsDataPool) { @@ -148,7 +150,7 @@ internal class ObjectsManager(private val liveObjects: DefaultLiveObjects): Obje // RTO5c2 - need to remove LiveObject instances from the ObjectsPool for which objectIds were not received during the sync sequence liveObjects.objectsPool.deleteExtraObjectIds(receivedObjectIds) - // call subscription callbacks for all updated existing objects + // RTO5c7 - call subscription callbacks for all updated existing objects existingObjectUpdates.forEach { (obj, update) -> obj.notifyUpdated(update) } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt index fa5d19d2a..af822e948 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/ObjectsPool.kt @@ -76,7 +76,7 @@ internal class ObjectsPool( */ internal fun resetToInitialPool(emitUpdateEvents: Boolean) { pool.entries.removeIf { (key, _) -> key != ROOT_OBJECT_ID } // only keep the root object - clearObjectsData(emitUpdateEvents) // clear the root object and emit update events + clearObjectsData(emitUpdateEvents) // RTO4b2a - clear the root object and emit update events } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt index 523d37dc8..b740b0b8c 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/BaseLiveObject.kt @@ -5,6 +5,8 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.ObjectsPoolDefaults import io.ably.lib.objects.objectError +import io.ably.lib.objects.type.livecounter.noOpCounterUpdate +import io.ably.lib.objects.type.livemap.noOpMapUpdate import io.ably.lib.util.Log internal enum class ObjectType(val value: String) { @@ -12,6 +14,9 @@ internal enum class ObjectType(val value: String) { Counter("counter") } +// Spec: RTLO4b4b +internal val LiveObjectUpdate.noOp get() = this.update == null + /** * Base implementation of LiveObject interface. * Provides common functionality for all live objects. @@ -42,7 +47,7 @@ internal abstract class BaseLiveObject( * * @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter */ - internal fun applyObjectSync(objectState: ObjectState): Map { + internal fun applyObjectSync(objectState: ObjectState): LiveObjectUpdate { validate(objectState) // object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation. // should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object. @@ -51,7 +56,10 @@ internal abstract class BaseLiveObject( if (isTombstoned) { // this object is tombstoned. this is a terminal state which can't be overridden. skip the rest of object state message processing - return mapOf() + if (objectType == ObjectType.Map) { + return noOpMapUpdate + } + return noOpCounterUpdate } return applyObjectState(objectState) // RTLM6, RTLC6 } @@ -74,7 +82,7 @@ internal abstract class BaseLiveObject( Log.v( tag, "Skipping ${objectOperation.action} op: op serial $msgTimeSerial <= site serial ${siteTimeserials[msgSiteCode]}; " + - "objectId=$objectId" + "objectId=$objectId" ) return } @@ -89,11 +97,6 @@ internal abstract class BaseLiveObject( applyObjectOperation(objectOperation, objectMessage) // RTLC7d } - internal fun notifyUpdated(update: Any) { - // TODO: Implement event emission for updates - Log.v(tag, "Object $objectId updated: $update") - } - /** * Checks if an operation can be applied based on serial comparison. * @@ -119,7 +122,7 @@ internal abstract class BaseLiveObject( /** * Marks the object as tombstoned. */ - internal fun tombstone(): Any { + internal fun tombstone(): LiveObjectUpdate { isTombstoned = true tombstonedAt = System.currentTimeMillis() val update = clearData() @@ -150,7 +153,7 @@ internal abstract class BaseLiveObject( * @return A map describing the changes made to the object's data * */ - abstract fun applyObjectState(objectState: ObjectState): Map + abstract fun applyObjectState(objectState: ObjectState): LiveObjectUpdate /** * Applies an operation to this live object. @@ -176,7 +179,14 @@ internal abstract class BaseLiveObject( * * @return A map representing the diff of changes made */ - abstract fun clearData(): Map + abstract fun clearData(): LiveObjectUpdate + + /** + * Notifies subscribers about changes made to this live object. Propagates updates through the + * appropriate manager after converting the generic update map to type-specific update objects. + * Spec: RTLO4b4c + */ + abstract fun notifyUpdated(update: LiveObjectUpdate) /** * Called during garbage collection intervals to clean up expired entries. diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt index 5f0ee538e..162a6a64f 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/DefaultLiveCounter.kt @@ -4,8 +4,14 @@ import io.ably.lib.objects.* import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.type.BaseLiveObject +import io.ably.lib.objects.type.LiveObjectUpdate import io.ably.lib.objects.type.ObjectType +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.counter.LiveCounterChange +import io.ably.lib.objects.type.counter.LiveCounterUpdate +import io.ably.lib.objects.type.noOp import java.util.concurrent.atomic.AtomicReference +import io.ably.lib.util.Log /** * Implementation of LiveObject for LiveCounter. @@ -54,9 +60,18 @@ internal class DefaultLiveCounter private constructor( return data.get() } + override fun subscribe(listener: LiveCounterChange.Listener): ObjectsSubscription { + adapter.throwIfInvalidAccessApiConfiguration(channelName) + return liveCounterManager.subscribe(listener) + } + + override fun unsubscribe(listener: LiveCounterChange.Listener) = liveCounterManager.unsubscribe(listener) + + override fun unsubscribeAll() = liveCounterManager.unsubscribeAll() + override fun validate(state: ObjectState) = liveCounterManager.validate(state) - override fun applyObjectState(objectState: ObjectState): Map { + override fun applyObjectState(objectState: ObjectState): LiveCounterUpdate { return liveCounterManager.applyState(objectState) } @@ -64,8 +79,16 @@ internal class DefaultLiveCounter private constructor( liveCounterManager.applyOperation(operation) } - override fun clearData(): Map { - return mapOf("amount" to data.get()).apply { data.set(0.0) } + override fun clearData(): LiveCounterUpdate { + return LiveCounterUpdate(data.get()).apply { data.set(0.0) } + } + + override fun notifyUpdated(update: LiveObjectUpdate) { + if (update.noOp) { + return + } + Log.v(tag, "Object $objectId updated: $update") + liveCounterManager.notify(update as LiveCounterUpdate) } override fun onGCInterval() { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt new file mode 100644 index 000000000..0ea58f389 --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterChangeCoordinator.kt @@ -0,0 +1,51 @@ +package io.ably.lib.objects.type.livecounter + +import io.ably.lib.objects.ObjectsSubscription +import io.ably.lib.objects.type.counter.LiveCounterChange +import io.ably.lib.objects.type.counter.LiveCounterUpdate +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log + +internal val noOpCounterUpdate = LiveCounterUpdate() + +/** + * Interface for handling live counter changes by notifying subscribers of updates. + * Implementations typically propagate updates through event emission to registered listeners. + */ +internal interface HandlesLiveCounterChange { + /** + * Notifies all registered listeners about a counter update by propagating the change through the event system. + * This method is called when counter data changes and triggers the emission of update events to subscribers. + */ + fun notify(update: LiveCounterUpdate) +} + +internal abstract class LiveCounterChangeCoordinator: LiveCounterChange, HandlesLiveCounterChange { + private val counterChangeEmitter = LiveCounterChangeEmitter() + + override fun subscribe(listener: LiveCounterChange.Listener): ObjectsSubscription { + counterChangeEmitter.on(listener) + return ObjectsSubscription { + counterChangeEmitter.off(listener) + } + } + + override fun unsubscribe(listener: LiveCounterChange.Listener) = counterChangeEmitter.off(listener) + + override fun unsubscribeAll() = counterChangeEmitter.off() + + override fun notify(update: LiveCounterUpdate) = counterChangeEmitter.emit(update) +} + +private class LiveCounterChangeEmitter : EventEmitter() { + private val tag = "LiveCounterChangeEmitter" + + override fun apply(listener: LiveCounterChange.Listener?, event: LiveCounterUpdate?, vararg args: Any?) { + try { + event?.let { listener?.onUpdated(it) } + ?: Log.w(tag, "Null event passed to listener callback") + } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing listener callback for event: $event", t) + } + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt index 0a34c530a..0988b316d 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livecounter/LiveCounterManager.kt @@ -5,9 +5,11 @@ import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ObjectState import io.ably.lib.objects.objectError +import io.ably.lib.objects.type.counter.LiveCounterUpdate import io.ably.lib.util.Log -internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { +internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter): LiveCounterChangeCoordinator() { + private val objectId = liveCounter.objectId private val tag = "LiveCounterManager" @@ -15,7 +17,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { /** * @spec RTLC6 - Overrides counter data with state from sync */ - internal fun applyState(objectState: ObjectState): Map { + internal fun applyState(objectState: ObjectState): LiveCounterUpdate { val previousData = liveCounter.data.get() if (objectState.tombstone) { @@ -31,7 +33,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { } } - return mapOf("amount" to (liveCounter.data.get() - previousData)) + return LiveCounterUpdate(liveCounter.data.get() - previousData) } /** @@ -51,13 +53,13 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { else -> throw objectError("Invalid ${operation.action} op for LiveCounter objectId=${objectId}") // RTLC7d3 } - liveCounter.notifyUpdated(update) + liveCounter.notifyUpdated(update) // RTLC7d1a, RTLC7d2a } /** * @spec RTLC8 - Applies counter create operation */ - private fun applyCounterCreate(operation: ObjectOperation): Map { + private fun applyCounterCreate(operation: ObjectOperation): LiveCounterUpdate { if (liveCounter.createOperationIsMerged) { // RTLC8b // There can't be two different create operation for the same object id, because the object id @@ -67,7 +69,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { tag, "Skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=$objectId" ) - return mapOf() + return noOpCounterUpdate // RTLC8c } return mergeInitialDataFromCreateOperation(operation) // RTLC8c @@ -76,17 +78,17 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { /** * @spec RTLC9 - Applies counter increment operation */ - private fun applyCounterInc(counterOp: ObjectCounterOp): Map { + private fun applyCounterInc(counterOp: ObjectCounterOp): LiveCounterUpdate { val amount = counterOp.amount ?: 0.0 val previousValue = liveCounter.data.get() liveCounter.data.set(previousValue + amount) // RTLC9b - return mapOf("amount" to amount) + return LiveCounterUpdate(amount) } /** * @spec RTLC10 - Merges initial data from create operation */ - private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): Map { + private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): LiveCounterUpdate { // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case. // note that it is intentional to SUM the incoming count from the create op. // if we got here, it means that current counter instance is missing the initial value in its data reference, @@ -95,7 +97,7 @@ internal class LiveCounterManager(private val liveCounter: DefaultLiveCounter) { val previousValue = liveCounter.data.get() liveCounter.data.set(previousValue + count) // RTLC10a liveCounter.createOperationIsMerged = true // RTLC10b - return mapOf("amount" to count) + return LiveCounterUpdate(count) } internal fun validate(state: ObjectState) { diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt index b17368de9..99e2617fd 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/DefaultLiveMap.kt @@ -6,7 +6,13 @@ import io.ably.lib.objects.ObjectMessage import io.ably.lib.objects.ObjectOperation import io.ably.lib.objects.ObjectState import io.ably.lib.objects.type.BaseLiveObject +import io.ably.lib.objects.type.LiveObjectUpdate import io.ably.lib.objects.type.ObjectType +import io.ably.lib.objects.type.map.LiveMap +import io.ably.lib.objects.type.map.LiveMapChange +import io.ably.lib.objects.type.map.LiveMapUpdate +import io.ably.lib.objects.type.noOp +import io.ably.lib.util.Log import java.util.concurrent.ConcurrentHashMap import java.util.AbstractMap @@ -103,7 +109,16 @@ internal class DefaultLiveMap private constructor( override fun validate(state: ObjectState) = liveMapManager.validate(state) - override fun applyObjectState(objectState: ObjectState): Map { + override fun subscribe(listener: LiveMapChange.Listener): ObjectsSubscription { + adapter.throwIfInvalidAccessApiConfiguration(channelName) + return liveMapManager.subscribe(listener) + } + + override fun unsubscribe(listener: LiveMapChange.Listener) = liveMapManager.unsubscribe(listener) + + override fun unsubscribeAll() = liveMapManager.unsubscribeAll() + + override fun applyObjectState(objectState: ObjectState): LiveMapUpdate { return liveMapManager.applyState(objectState) } @@ -111,11 +126,19 @@ internal class DefaultLiveMap private constructor( liveMapManager.applyOperation(operation, message.serial) } - override fun clearData(): Map { + override fun clearData(): LiveMapUpdate { return liveMapManager.calculateUpdateFromDataDiff(data.toMap(), emptyMap()) .apply { data.clear() } } + override fun notifyUpdated(update: LiveObjectUpdate) { + if (update.noOp) { + return + } + Log.v(tag, "Object $objectId updated: $update") + liveMapManager.notify(update as LiveMapUpdate) + } + override fun onGCInterval() { data.entries.removeIf { (_, entry) -> entry.isEligibleForGc() } } diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt new file mode 100644 index 000000000..0013f2388 --- /dev/null +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapChangeCoordinator.kt @@ -0,0 +1,51 @@ +package io.ably.lib.objects.type.livemap + +import io.ably.lib.objects.ObjectsSubscription +import io.ably.lib.objects.type.map.LiveMapChange +import io.ably.lib.objects.type.map.LiveMapUpdate +import io.ably.lib.util.EventEmitter +import io.ably.lib.util.Log + +internal val noOpMapUpdate = LiveMapUpdate() + +/** + * Interface for handling live map changes by notifying subscribers of updates. + * Implementations typically propagate updates through event emission to registered listeners. + */ +internal interface HandlesLiveMapChange { + /** + * Notifies all registered listeners about a map update by propagating the change through the event system. + * This method is called when map data changes and triggers the emission of update events to subscribers. + */ + fun notify(update: LiveMapUpdate) +} + +internal abstract class LiveMapChangeCoordinator: LiveMapChange, HandlesLiveMapChange { + private val mapChangeEmitter = LiveMapChangeEmitter() + + override fun subscribe(listener: LiveMapChange.Listener): ObjectsSubscription { + mapChangeEmitter.on(listener) + return ObjectsSubscription { + mapChangeEmitter.off(listener) + } + } + + override fun unsubscribe(listener: LiveMapChange.Listener) = mapChangeEmitter.off(listener) + + override fun unsubscribeAll() = mapChangeEmitter.off() + + override fun notify(update: LiveMapUpdate) = mapChangeEmitter.emit(update) +} + +private class LiveMapChangeEmitter : EventEmitter() { + private val tag = "LiveMapChangeEmitter" + + override fun apply(listener: LiveMapChange.Listener?, event: LiveMapUpdate?, vararg args: Any?) { + try { + event?.let { listener?.onUpdated(it) } + ?: Log.w(tag, "Null event passed to listener callback") + } catch (t: Throwable) { + Log.e(tag, "Error occurred while executing listener callback for event: $event", t) + } + } +} diff --git a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt index 55b660d16..a081455cc 100644 --- a/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt +++ b/live-objects/src/main/kotlin/io/ably/lib/objects/type/livemap/LiveMapManager.kt @@ -7,9 +7,12 @@ import io.ably.lib.objects.ObjectOperationAction import io.ably.lib.objects.ObjectState import io.ably.lib.objects.isInvalid import io.ably.lib.objects.objectError +import io.ably.lib.objects.type.map.LiveMapUpdate +import io.ably.lib.objects.type.noOp import io.ably.lib.util.Log -internal class LiveMapManager(private val liveMap: DefaultLiveMap) { +internal class LiveMapManager(private val liveMap: DefaultLiveMap): LiveMapChangeCoordinator() { + private val objectId = liveMap.objectId private val tag = "LiveMapManager" @@ -17,7 +20,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { /** * @spec RTLM6 - Overrides object data with state from sync */ - internal fun applyState(objectState: ObjectState): Map { + internal fun applyState(objectState: ObjectState): LiveMapUpdate { val previousData = liveMap.data.toMap() if (objectState.tombstone) { @@ -69,13 +72,13 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { else -> throw objectError("Invalid ${operation.action} op for LiveMap objectId=${objectId}") // RTLM15d4 } - liveMap.notifyUpdated(update) + liveMap.notifyUpdated(update) // RTLM15d1a, RTLM15d2a, RTLM15d3a } /** * @spec RTLM16 - Applies map create operation */ - private fun applyMapCreate(operation: ObjectOperation): Map { + private fun applyMapCreate(operation: ObjectOperation): LiveMapUpdate { if (liveMap.createOperationIsMerged) { // RTLM16b // There can't be two different create operation for the same object id, because the object id @@ -85,7 +88,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { tag, "Skipping applying MAP_CREATE op on a map instance as it was already applied before; objectId=${objectId}" ) - return mapOf() + return noOpMapUpdate } validateMapSemantics(operation.map?.semantics) // RTLM16c @@ -99,7 +102,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { private fun applyMapSet( mapOp: ObjectMapOp, // RTLM7d1 timeSerial: String?, // RTLM7d2 - ): Map { + ): LiveMapUpdate { val existingEntry = liveMap.data[mapOp.key] // RTLM7a @@ -109,7 +112,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { "Skipping update for key=\"${mapOp.key}\": op serial $timeSerial <= entry serial ${existingEntry.timeserial};" + " objectId=${objectId}" ) - return mapOf() + return noOpMapUpdate } if (mapOp.data.isInvalid()) { @@ -142,7 +145,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { ) } - return mapOf(mapOp.key to "updated") + return LiveMapUpdate(mapOf(mapOp.key to LiveMapUpdate.Change.UPDATED)) } /** @@ -151,7 +154,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { private fun applyMapRemove( mapOp: ObjectMapOp, // RTLM8c1 timeSerial: String?, // RTLM8c2 - ): Map { + ): LiveMapUpdate { val existingEntry = liveMap.data[mapOp.key] // RTLM8a @@ -162,7 +165,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { "Skipping remove for key=\"${mapOp.key}\": op serial $timeSerial <= entry serial ${existingEntry.timeserial}; " + "objectId=${objectId}" ) - return mapOf() + return noOpMapUpdate } if (existingEntry != null) { @@ -182,7 +185,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { ) } - return mapOf(mapOp.key to "removed") + return LiveMapUpdate(mapOf(mapOp.key to LiveMapUpdate.Change.REMOVED)) } /** @@ -206,12 +209,12 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { /** * @spec RTLM17 - Merges initial data from create operation */ - private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): Map { + private fun mergeInitialDataFromCreateOperation(operation: ObjectOperation): LiveMapUpdate { if (operation.map?.entries.isNullOrEmpty()) { // no map entries in MAP_CREATE op - return mapOf() + return noOpMapUpdate } - val aggregatedUpdate = mutableMapOf() + val aggregatedUpdate = mutableListOf() // RTLM17a // in order to apply MAP_CREATE op for an existing map, we should merge their underlying entries keys. @@ -228,25 +231,30 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { } // skip noop updates - if (update.isEmpty()) { + if (update.noOp) { return@forEach } - aggregatedUpdate.putAll(update) + aggregatedUpdate.add(update) } liveMap.createOperationIsMerged = true // RTLM17b - return aggregatedUpdate + return LiveMapUpdate( + aggregatedUpdate.map { it.update }.fold(emptyMap()) { acc, map -> acc + map } + ) } - internal fun calculateUpdateFromDataDiff(prevData: Map, newData: Map): Map { - val update = mutableMapOf() + internal fun calculateUpdateFromDataDiff( + prevData: Map, + newData: Map + ): LiveMapUpdate { + val update = mutableMapOf() // Check for removed entries for ((key, prevEntry) in prevData) { if (!prevEntry.isTombstoned && !newData.containsKey(key)) { - update[key] = "removed" + update[key] = LiveMapUpdate.Change.REMOVED } } @@ -255,7 +263,7 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { if (!prevData.containsKey(key)) { // if property does not exist in current map, but new data has it as non-tombstoned property - got updated if (!newEntry.isTombstoned) { - update[key] = "updated" + update[key] = LiveMapUpdate.Change.UPDATED } // otherwise, if new data has this prop tombstoned - do nothing, as property didn't exist anyway continue @@ -267,12 +275,12 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { // compare tombstones first if (prevEntry.isTombstoned && !newEntry.isTombstoned) { // prev prop is tombstoned, but new is not. it means prop was updated to a meaningful value - update[key] = "updated" + update[key] = LiveMapUpdate.Change.UPDATED continue } if (!prevEntry.isTombstoned && newEntry.isTombstoned) { // prev prop is not tombstoned, but new is. it means prop was removed - update[key] = "removed" + update[key] = LiveMapUpdate.Change.REMOVED continue } if (prevEntry.isTombstoned && newEntry.isTombstoned) { @@ -283,12 +291,12 @@ internal class LiveMapManager(private val liveMap: DefaultLiveMap) { // both props exist and are not tombstoned, need to compare values to see if it was changed val valueChanged = prevEntry.data != newEntry.data if (valueChanged) { - update[key] = "updated" + update[key] = LiveMapUpdate.Change.UPDATED continue } } - return update + return LiveMapUpdate(update) } internal fun validate(state: ObjectState) { diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt index a55106451..bac176d3f 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveCounterTest.kt @@ -1,14 +1,17 @@ package io.ably.lib.objects.integration -import io.ably.lib.objects.LiveCounter -import io.ably.lib.objects.LiveMap +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap import io.ably.lib.objects.assertWaiter +import io.ably.lib.objects.integration.helpers.ObjectId +import io.ably.lib.objects.integration.helpers.fixtures.createUserEngagementMatrixMap import io.ably.lib.objects.integration.helpers.fixtures.createUserMapWithCountersObject import io.ably.lib.objects.integration.setup.IntegrationTest import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertNotNull +import kotlin.test.assertTrue class DefaultLiveCounterTest: IntegrationTest() { /** @@ -202,4 +205,73 @@ class DefaultLiveCounterTest: IntegrationTest() { assertNotNull(finalCounterCheck, "Counter should still be accessible from root map") assertEquals(30.0, finalCounterCheck.value(), "Final counter value should be 30 when accessed from root map") } + + @Test + fun testLiveCounterChangesUsingSubscription() = runTest { + val channelName = generateChannelName() + val userEngagementMapId = restObjects.createUserEngagementMatrixMap(channelName) + restObjects.setMapRef(channelName, "root", "userMatrix", userEngagementMapId) + + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + + val userEngagementMap = rootMap.get("userMatrix") as LiveMap + assertEquals(4L, userEngagementMap.size(), "User engagement map should contain 4 top-level entries") + + val totalReactions = userEngagementMap.get("totalReactions") as LiveCounter + assertEquals(189.0, totalReactions.value(), "Total reactions counter should have initial value of 189") + + // Subscribe to changes on the totalReactions counter + val counterUpdates = mutableListOf() + val totalReactionsSubscription = totalReactions.subscribe { update -> + counterUpdates.add(update.update.amount) + } + + // Step 1: Increment the totalReactions counter by 10 (189 + 10 = 199) + restObjects.incrementCounter(channelName, totalReactions.ObjectId, 10.0) + + // Wait for the update to be received + assertWaiter { counterUpdates.isNotEmpty() } + + // Verify the increment update was received + assertEquals(1, counterUpdates.size, "Should receive one update for increment") + assertEquals(10.0, counterUpdates.first(), "Update should contain increment amount of 10") + assertEquals(199.0, totalReactions.value(), "Counter should be incremented to 199") + + // Step 2: Decrement the totalReactions counter by 5 (199 - 5 = 194) + counterUpdates.clear() + restObjects.decrementCounter(channelName, totalReactions.ObjectId, 5.0) + + // Wait for the second update + assertWaiter { counterUpdates.isNotEmpty() } + + // Verify the decrement update was received + assertEquals(1, counterUpdates.size, "Should receive one update for decrement") + assertEquals(-5.0, counterUpdates.first(), "Update should contain decrement amount of -5") + assertEquals(194.0, totalReactions.value(), "Counter should be decremented to 194") + + // Step 3: Increment the totalReactions counter by 15 (194 + 15 = 209) + counterUpdates.clear() + restObjects.incrementCounter(channelName, totalReactions.ObjectId, 15.0) + + // Wait for the third update + assertWaiter { counterUpdates.isNotEmpty() } + + // Verify the third increment update was received + assertEquals(1, counterUpdates.size, "Should receive one update for third increment") + assertEquals(15.0, counterUpdates.first(), "Update should contain increment amount of 15") + assertEquals(209.0, totalReactions.value(), "Counter should be incremented to 209") + + // Clean up subscription + counterUpdates.clear() + totalReactionsSubscription.unsubscribe() + + // No updates should be received after unsubscribing + restObjects.incrementCounter(channelName, totalReactions.ObjectId, 20.0) + + // Wait for a moment to ensure no updates are received + assertWaiter { totalReactions.value() == 229.0 } + + assertTrue(counterUpdates.isEmpty(), "No updates should be received after unsubscribing") + } } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt index 68e94c891..98f167521 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveMapTest.kt @@ -4,12 +4,17 @@ import io.ably.lib.objects.* import io.ably.lib.objects.ObjectData import io.ably.lib.objects.ObjectValue import io.ably.lib.objects.integration.helpers.fixtures.createUserMapObject +import io.ably.lib.objects.integration.helpers.fixtures.createUserProfileMapObject import io.ably.lib.objects.integration.setup.IntegrationTest +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap +import io.ably.lib.objects.type.map.LiveMapUpdate import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertNull +import kotlin.test.assertTrue class DefaultLiveMapTest: IntegrationTest() { /** @@ -210,4 +215,94 @@ class DefaultLiveMapTest: IntegrationTest() { val finalValues = testMap.values().toSet() assertEquals(setOf("Bob", false, "bob@example.com"), finalValues, "Final values should match expected set") } + + @Test + fun testLiveMapChangesUsingSubscription() = runTest { + val channelName = generateChannelName() + val userProfileObjectId = restObjects.createUserProfileMapObject(channelName) + restObjects.setMapRef(channelName, "root", "userProfile", userProfileObjectId) + + val channel = getRealtimeChannel(channelName) + val rootMap = channel.objects.root + + // Get the user profile map object from the root map + val userProfile = rootMap.get("userProfile") as LiveMap + assertNotNull(userProfile, "User profile should be synchronized") + assertEquals(4L, userProfile.size(), "User profile should contain 4 entries") + + // Verify initial values + assertEquals("user123", userProfile.get("userId"), "Initial userId should be user123") + assertEquals("John Doe", userProfile.get("name"), "Initial name should be John Doe") + assertEquals("john@example.com", userProfile.get("email"), "Initial email should be john@example.com") + assertEquals(true, userProfile.get("isActive"), "Initial isActive should be true") + + // Subscribe to changes in the user profile map + val userProfileUpdates = mutableListOf() + val userProfileSubscription = userProfile.subscribe { update -> userProfileUpdates.add(update) } + + // Step 1: Update an existing field in the user profile map (change the name) + restObjects.setMapValue(channelName, userProfileObjectId, "name", ObjectValue.String("Bob Smith")) + + // Wait for the update to be received + assertWaiter { userProfileUpdates.isNotEmpty() } + + // Verify the update was received + assertEquals(1, userProfileUpdates.size, "Should receive one update") + val firstUpdateMap = userProfileUpdates.first().update + assertEquals(1, firstUpdateMap.size, "Should have one key change") + assertTrue(firstUpdateMap.containsKey("name"), "Update should contain name key") + assertEquals(LiveMapUpdate.Change.UPDATED, firstUpdateMap["name"], "name should be marked as UPDATED") + + // Verify the value was actually updated + assertEquals("Bob Smith", userProfile.get("name"), "Name should be updated to Bob Smith") + + // Step 2: Update another field in the user profile map (change the email) + userProfileUpdates.clear() + restObjects.setMapValue(channelName, userProfileObjectId, "email", ObjectValue.String("bob@example.com")) + + // Wait for the second update + assertWaiter { userProfileUpdates.isNotEmpty() } + + // Verify the second update + assertEquals(1, userProfileUpdates.size, "Should receive one update for the second change") + val secondUpdateMap = userProfileUpdates.first().update + assertEquals(1, secondUpdateMap.size, "Should have one key change") + assertTrue(secondUpdateMap.containsKey("email"), "Update should contain email key") + assertEquals(LiveMapUpdate.Change.UPDATED, secondUpdateMap["email"], "email should be marked as UPDATED") + + // Verify the value was actually updated + assertEquals("bob@example.com", userProfile.get("email"), "Email should be updated to bob@example.com") + + // Step 3: Remove an existing field from the user profile map (remove isActive) + userProfileUpdates.clear() + restObjects.removeMapValue(channelName, userProfileObjectId, "isActive") + + // Wait for the removal update + assertWaiter { userProfileUpdates.isNotEmpty() } + + // Verify the removal update + assertEquals(1, userProfileUpdates.size, "Should receive one update for removal") + val removalUpdateMap = userProfileUpdates.first().update + assertEquals(1, removalUpdateMap.size, "Should have one key change") + assertTrue(removalUpdateMap.containsKey("isActive"), "Update should contain isActive key") + assertEquals(LiveMapUpdate.Change.REMOVED, removalUpdateMap["isActive"], "isActive should be marked as REMOVED") + + // Verify final state of the user profile map + assertEquals(3L, userProfile.size(), "User profile should have 3 entries after removing isActive") + assertEquals("user123", userProfile.get("userId"), "userId should remain unchanged") + assertEquals("Bob Smith", userProfile.get("name"), "name should remain updated") + assertEquals("bob@example.com", userProfile.get("email"), "email should remain updated") + assertNull(userProfile.get("isActive"), "isActive should be removed") + + // Clean up subscription + userProfileUpdates.clear() + userProfileSubscription.unsubscribe() + // No updates should be received after unsubscribing + restObjects.setMapValue(channelName, userProfileObjectId, "country", ObjectValue.String("uk")) + + // Wait for a moment to ensure no updates are received + assertWaiter { userProfile.size() == 4L } + + assertTrue(userProfileUpdates.isEmpty(), "No updates should be received after unsubscribing") + } } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt index 8e5396a1a..3bea82c92 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/DefaultLiveObjectsTest.kt @@ -9,6 +9,8 @@ import io.ably.lib.objects.integration.helpers.fixtures.initializeRootMap import io.ably.lib.objects.integration.setup.IntegrationTest import io.ably.lib.objects.size import io.ably.lib.objects.state.ObjectsStateEvent +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap import kotlinx.coroutines.test.runTest import org.junit.Test import kotlin.test.assertEquals diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt index 82e1dbef0..165563bd2 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/RestObjects.kt @@ -5,6 +5,7 @@ import io.ably.lib.objects.ObjectData import io.ably.lib.objects.ObjectValue import io.ably.lib.rest.AblyRest import io.ably.lib.http.HttpUtils +import io.ably.lib.objects.integration.helpers.fixtures.DataFixtures import io.ably.lib.types.ClientOptions /** @@ -37,8 +38,7 @@ internal class RestObjects(options: ClientOptions) { * Sets an object reference at the specified key in an existing map. */ internal fun setMapRef(channelName: String, mapObjectId: String, key: String, refMapObjectId: String) { - val data = ObjectData(objectId = refMapObjectId) - val mapCreateOp = PayloadBuilder.mapSetRestOp(mapObjectId, key, data) + val mapCreateOp = PayloadBuilder.mapSetRestOp(mapObjectId, key, DataFixtures.mapRef(refMapObjectId)) operationRequest(channelName, mapCreateOp) } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt index e402347e7..f50c60cf9 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/Utils.kt @@ -1,8 +1,8 @@ package io.ably.lib.objects.integration.helpers import io.ably.lib.objects.DefaultLiveObjects -import io.ably.lib.objects.LiveCounter -import io.ably.lib.objects.LiveMap +import io.ably.lib.objects.type.counter.LiveCounter +import io.ably.lib.objects.type.map.LiveMap import io.ably.lib.objects.LiveObjects import io.ably.lib.objects.type.livecounter.DefaultLiveCounter import io.ably.lib.objects.type.livemap.DefaultLiveMap diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt index ec9e7aa61..a8135a9e4 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/CounterFixtures.kt @@ -42,15 +42,7 @@ internal fun RestObjects.createUserMapWithCountersObject(channelName: String): S val loginStreakCounterObjectId = createCounter(channelName, 7.0) // Create engagement metrics nested map with counters - val engagementMetricsMapObjectId = createMap( - channelName, - data = mapOf( - "totalShares" to DataFixtures.mapRef(createCounter(channelName, 34.0)), - "totalBookmarks" to DataFixtures.mapRef(createCounter(channelName, 67.0)), - "totalReactions" to DataFixtures.mapRef(createCounter(channelName, 189.0)), - "dailyActiveStreak" to DataFixtures.mapRef(createCounter(channelName, 12.0)) - ) - ) + val engagementMetricsMapObjectId = createUserEngagementMatrixMap(channelName) // Set up the main test map structure with references to all created counters setMapRef(channelName, testMapObjectId, "profileViews", profileViewsCounterObjectId) @@ -63,3 +55,34 @@ internal fun RestObjects.createUserMapWithCountersObject(channelName: String): S return testMapObjectId } + +/** + * Creates a user engagement matrix map object with counter references for testing. + * + * This method creates a simple engagement metrics map containing counter objects + * that track various user engagement metrics. The map contains references to + * counter objects representing different types of user interactions and activities. + * + * **Object Structure:** + * ``` + * userEngagementMatrixMap (Map) + * ├── "totalShares" → Counter(value=34) + * ├── "totalBookmarks" → Counter(value=67) + * ├── "totalReactions" → Counter(value=189) + * └── "dailyActiveStreak" → Counter(value=12) + * ``` + * + * @param channelName The channel where the user engagement matrix map will be created + * @return The object ID of the created user engagement matrix map + */ +internal fun RestObjects.createUserEngagementMatrixMap(channelName: String): String { + return createMap( + channelName, + data = mapOf( + "totalShares" to DataFixtures.mapRef(createCounter(channelName, 34.0)), + "totalBookmarks" to DataFixtures.mapRef(createCounter(channelName, 67.0)), + "totalReactions" to DataFixtures.mapRef(createCounter(channelName, 189.0)), + "dailyActiveStreak" to DataFixtures.mapRef(createCounter(channelName, 12.0)) + ) + ) +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt index f99dd7d9c..8499eefc2 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/helpers/fixtures/MapFixtures.kt @@ -134,17 +134,9 @@ internal fun RestObjects.createUserMapObject(channelName: String): String { ) // Create a user profile map with mixed data types and references - val userProfileMapObjectId = createMap( - channelName, - data = mapOf( - "userId" to ObjectData(value = ObjectValue.String("user123")), - "name" to ObjectData(value = ObjectValue.String("John Doe")), - "email" to ObjectData(value = ObjectValue.String("john@example.com")), - "isActive" to ObjectData(value = ObjectValue.Boolean(true)), - "metrics" to DataFixtures.mapRef(metricsMapObjectId), - "preferences" to DataFixtures.mapRef(preferencesMapObjectId) - ) - ) + val userProfileMapObjectId = createUserProfileMapObject(channelName) + setMapRef(channelName, userProfileMapObjectId, "metrics", metricsMapObjectId) + setMapRef(channelName, userProfileMapObjectId, "preferences", preferencesMapObjectId) // Set up the main test map structure with references to all created objects setMapRef(channelName, testMapObjectId, "userProfile", userProfileMapObjectId) @@ -155,3 +147,38 @@ internal fun RestObjects.createUserMapObject(channelName: String): String { return testMapObjectId } + +/** + * Creates a user profile map object with basic user information for testing. + * + * This method creates a simple user profile map containing essential user data fields + * that are commonly used in user management systems. The map contains primitive data types + * representing basic user information. + * + * **Object Structure:** + * ``` + * userProfileMap (Map) + * ├── "userId" → "user123" + * ├── "name" → "John Doe" + * ├── "email" → "john@example.com" + * └── "isActive" → true + * ``` + * + * This structure provides a foundation for testing map operations on user profile data, + * including field updates, additions, and removals. The map contains a mix of string, + * boolean, and numeric data types to test various primitive value handling. + * + * @param channelName The channel where the user profile map will be created + * @return The object ID of the created user profile map + */ +internal fun RestObjects.createUserProfileMapObject(channelName: String): String { + return createMap( + channelName, + data = mapOf( + "userId" to ObjectData(value = ObjectValue.String("user123")), + "name" to ObjectData(value = ObjectValue.String("John Doe")), + "email" to ObjectData(value = ObjectValue.String("john@example.com")), + "isActive" to ObjectData(value = ObjectValue.Boolean(true)), + ) + ) +} diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt index b79f24a04..6cad20508 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/integration/setup/IntegrationTest.kt @@ -22,7 +22,7 @@ abstract class IntegrationTest { @JvmField @Rule - val timeout: Timeout = Timeout.seconds(10) + val timeout: Timeout = Timeout.seconds(15) private val realtimeClients = mutableMapOf() diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt index 133e8ba80..48ddd41c2 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livecounter/LiveCounterManagerTest.kt @@ -28,7 +28,7 @@ class DefaultLiveCounterManagerTest { assertFalse(liveCounter.createOperationIsMerged) // RTLC6b assertEquals(25.0, liveCounter.data.get()) // RTLC6c - assertEquals(15.0, update["amount"]) // Difference between old and new data + assertEquals(15.0, update.update.amount) // Difference between old and new data } @@ -58,7 +58,7 @@ class DefaultLiveCounterManagerTest { val update = liveCounterManager.applyState(objectState) assertEquals(25.0, liveCounter.data.get()) // 15 from state + 10 from create op - assertEquals(20.0, update["amount"]) // Total change + assertEquals(20.0, update.update.amount) // Total change } diff --git a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt index 418de2609..5f00d7446 100644 --- a/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt +++ b/live-objects/src/test/kotlin/io/ably/lib/objects/unit/type/livemap/LiveMapManagerTest.kt @@ -3,6 +3,7 @@ package io.ably.lib.objects.unit.type.livemap import io.ably.lib.objects.* import io.ably.lib.objects.type.livemap.LiveMapEntry import io.ably.lib.objects.type.livemap.LiveMapManager +import io.ably.lib.objects.type.map.LiveMapUpdate import io.ably.lib.objects.unit.LiveMapManager import io.ably.lib.objects.unit.getDefaultLiveMapWithMockedDeps import io.ably.lib.types.AblyException @@ -55,10 +56,10 @@ class LiveMapManagerTest { // Assert on update field - should show changes from old to new state val expectedUpdate = mapOf( - "key1" to "updated", // key1 was updated from "oldValue" to "newValue1" - "key2" to "updated" // key2 was added + "key1" to LiveMapUpdate.Change.UPDATED, // key1 was updated from "oldValue" to "newValue1" + "key2" to LiveMapUpdate.Change.UPDATED // key2 was added ) - assertEquals(expectedUpdate, update) + assertEquals(expectedUpdate, update.update) } @Test @@ -89,8 +90,8 @@ class LiveMapManagerTest { assertEquals(0, liveMap.data.size) // RTLM6c - should be empty map // Assert on update field - should show that key1 was removed - val expectedUpdate = mapOf("key1" to "removed") - assertEquals(expectedUpdate, update) + val expectedUpdate = mapOf("key1" to LiveMapUpdate.Change.REMOVED) + assertEquals(expectedUpdate, update.update) } @Test @@ -118,8 +119,8 @@ class LiveMapManagerTest { assertEquals(0, liveMap.data.size) // RTLM6c - should be empty map when map is null // Assert on update field - should show that key1 was removed - val expectedUpdate = mapOf("key1" to "removed") - assertEquals(expectedUpdate, update) + val expectedUpdate = mapOf("key1" to LiveMapUpdate.Change.REMOVED) + assertEquals(expectedUpdate, update.update) } @Test @@ -177,10 +178,10 @@ class LiveMapManagerTest { // Assert on update field - should show changes from create operation val expectedUpdate = mapOf( - "key1" to "updated", // key1 was updated from "existingValue" to "stateValue" - "key2" to "updated" // key2 was added from create operation + "key1" to LiveMapUpdate.Change.UPDATED, // key1 was updated from "existingValue" to "stateValue" + "key2" to LiveMapUpdate.Change.UPDATED // key2 was added from create operation ) - assertEquals(expectedUpdate, update) + assertEquals(expectedUpdate, update.update) } @@ -637,7 +638,7 @@ class LiveMapManagerTest { val prevData1 = mapOf() val newData1 = mapOf() val result1 = livemapManager.calculateUpdateFromDataDiff(prevData1, newData1) - assertEquals(emptyMap(), result1, "Should return empty map for no changes") + assertEquals(emptyMap(), result1.update, "Should return empty map for no changes") // Test case 2: Entry added val prevData2 = mapOf() @@ -649,7 +650,7 @@ class LiveMapManagerTest { ) ) val result2 = livemapManager.calculateUpdateFromDataDiff(prevData2, newData2) - assertEquals(mapOf("key1" to "updated"), result2, "Should detect added entry") + assertEquals(mapOf("key1" to LiveMapUpdate.Change.UPDATED), result2.update, "Should detect added entry") // Test case 3: Entry removed val prevData3 = mapOf( @@ -661,7 +662,7 @@ class LiveMapManagerTest { ) val newData3 = mapOf() val result3 = livemapManager.calculateUpdateFromDataDiff(prevData3, newData3) - assertEquals(mapOf("key1" to "removed"), result3, "Should detect removed entry") + assertEquals(mapOf("key1" to LiveMapUpdate.Change.REMOVED), result3.update, "Should detect removed entry") // Test case 4: Entry updated val prevData4 = mapOf( @@ -679,7 +680,7 @@ class LiveMapManagerTest { ) ) val result4 = livemapManager.calculateUpdateFromDataDiff(prevData4, newData4) - assertEquals(mapOf("key1" to "updated"), result4, "Should detect updated entry") + assertEquals(mapOf("key1" to LiveMapUpdate.Change.UPDATED), result4.update, "Should detect updated entry") // Test case 5: Entry tombstoned val prevData5 = mapOf( @@ -697,7 +698,7 @@ class LiveMapManagerTest { ) ) val result5 = livemapManager.calculateUpdateFromDataDiff(prevData5, newData5) - assertEquals(mapOf("key1" to "removed"), result5, "Should detect tombstoned entry") + assertEquals(mapOf("key1" to LiveMapUpdate.Change.REMOVED), result5.update, "Should detect tombstoned entry") // Test case 6: Entry untombstoned val prevData6 = mapOf( @@ -715,7 +716,7 @@ class LiveMapManagerTest { ) ) val result6 = livemapManager.calculateUpdateFromDataDiff(prevData6, newData6) - assertEquals(mapOf("key1" to "updated"), result6, "Should detect untombstoned entry") + assertEquals(mapOf("key1" to LiveMapUpdate.Change.UPDATED), result6.update, "Should detect untombstoned entry") // Test case 7: Both entries tombstoned (noop) val prevData7 = mapOf( @@ -733,7 +734,7 @@ class LiveMapManagerTest { ) ) val result7 = livemapManager.calculateUpdateFromDataDiff(prevData7, newData7) - assertEquals(emptyMap(), result7, "Should not detect change for both tombstoned entries") + assertEquals(emptyMap(), result7.update, "Should not detect change for both tombstoned entries") // Test case 8: New tombstoned entry (noop) val prevData8 = mapOf() @@ -745,7 +746,7 @@ class LiveMapManagerTest { ) ) val result8 = livemapManager.calculateUpdateFromDataDiff(prevData8, newData8) - assertEquals(emptyMap(), result8, "Should not detect change for new tombstoned entry") + assertEquals(emptyMap(), result8.update, "Should not detect change for new tombstoned entry") // Test case 9: Multiple changes val prevData9 = mapOf( @@ -774,11 +775,11 @@ class LiveMapManagerTest { ) val result9 = livemapManager.calculateUpdateFromDataDiff(prevData9, newData9) val expected9 = mapOf( - "key1" to "updated", - "key2" to "removed", - "key3" to "updated" + "key1" to LiveMapUpdate.Change.UPDATED, + "key2" to LiveMapUpdate.Change.REMOVED, + "key3" to LiveMapUpdate.Change.UPDATED ) - assertEquals(expected9, result9, "Should detect multiple changes correctly") + assertEquals(expected9, result9.update, "Should detect multiple changes correctly") // Test case 10: ObjectId references val prevData10 = mapOf( @@ -796,7 +797,7 @@ class LiveMapManagerTest { ) ) val result10 = livemapManager.calculateUpdateFromDataDiff(prevData10, newData10) - assertEquals(mapOf("key1" to "updated"), result10, "Should detect objectId change") + assertEquals(mapOf("key1" to LiveMapUpdate.Change.UPDATED), result10.update, "Should detect objectId change") // Test case 11: Same data, no change val prevData11 = mapOf( @@ -814,6 +815,6 @@ class LiveMapManagerTest { ) ) val result11 = livemapManager.calculateUpdateFromDataDiff(prevData11, newData11) - assertEquals(emptyMap(), result11, "Should not detect change for same data") + assertEquals(emptyMap(), result11.update, "Should not detect change for same data") } }