Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a67ea0c
[ECO-5426] Implemented snippet to populate missing fields for objects
sacOO7 Jul 1, 2025
632a518
[ECO-5426] Generated object_sync code related to ably-js and spec using
sacOO7 Jul 3, 2025
3f4e0f7
[ECO-5426] Created separate files for LiveMap and LiveCounter
sacOO7 Jul 3, 2025
b91412d
[ECO-5426] Update duplicate check for canApplyOperation, moved `isTom…
sacOO7 Jul 4, 2025
88bb0a4
[ECO-5426] Refactored spec comments for LiveObject, LiveCounter and L…
sacOO7 Jul 4, 2025
d7a3047
[ECO-5426] Generated test cases for ObjectId addresing all edge cases
sacOO7 Jul 4, 2025
2751cfd
[ECO-5426] Refactored ObjectId tests, kept only important ones
sacOO7 Jul 4, 2025
859bdfd
[ECO-5426] Refactored DefaultLiveObjects
sacOO7 Jul 7, 2025
c71df5a
[ECO-5426] Fixed DefaultLiveObjects sync process
sacOO7 Jul 8, 2025
4c8a3bc
[ECO-5426] Fixed DefaultLiveMap and DefaultLiveCounter constructors
sacOO7 Jul 9, 2025
838bf14
[ECO-5426] Added spec annotation comments for LiveMap, LiveCounter an…
sacOO7 Jul 9, 2025
64b2ff6
[ECO-5426] Added separate managers for handling incoming objectMessages
sacOO7 Jul 9, 2025
6982441
[ECO-5426] refactor: consolidate duplicate serial handling logic in B…
sacOO7 Jul 10, 2025
47b3c86
[ECO-5426] refactor: created ObjectsManager for handling incomingobjects
sacOO7 Jul 10, 2025
b26ce0f
[ECO-5426] refactor: extract sync tracking logic into ObjectsSyncTrac…
sacOO7 Jul 10, 2025
0f9f69a
[ECO-5426] fix: address concurrency issues in live objects processing
sacOO7 Jul 10, 2025
4ab8a31
[ECO-5426] Fixed deferred state change event during objects sync
sacOO7 Jul 10, 2025
557dac6
[ECO-5426] feat: implement handleStateChange method for channel state…
sacOO7 Jul 10, 2025
d9f4e42
[ECO-5426] tests: Added few more spec based tests for objetcs and obj…
sacOO7 Jul 11, 2025
57cf42b
[ECO-5426] Refactored code as per review comments
sacOO7 Jul 11, 2025
44d60ce
[ECO-5426] Added few more tests to ObjectsPool
sacOO7 Jul 14, 2025
2ab7b59
[ECO-5426] Added unit tests for ObjectsManager in ObjectsManagerTest
sacOO7 Jul 14, 2025
e808d38
[ECO-5426] Added missing unit tests for BaseLiveObject, refactored code
sacOO7 Jul 15, 2025
647a5e0
[ECO-5426] Updated ObjectMessage Counter specific amount and count to…
sacOO7 Jul 16, 2025
7030cef
[ECO-5426] Refactored BaseLiveObject for validation
sacOO7 Jul 17, 2025
2dee529
[ECO-5426] Refactored initializeHandlerForIncomingObjectMessages to h…
sacOO7 Jul 18, 2025
32bbedb
[ECO-5426] DefaultLiveObjects: added channel state checks for detache…
sacOO7 Jul 22, 2025
3d45982
[ECO-5426] DefaultLiveObjects: reverted to using Double value for Liv…
sacOO7 Jul 23, 2025
a873802
[ECO-5426] Added missing check for handling ObjectOperationAction
sacOO7 Jul 23, 2025
f7a5ce9
Merge branch 'main' into feature/object-sync
sacOO7 Jul 23, 2025
45fde3f
[ECO-5426] Removed unnecessary BaseLiveObject param for BaseLiveObject
sacOO7 Jul 23, 2025
529efd8
[ECO-5426] Updated code, marked data types in objectsPool, LiveCounte…
sacOO7 Jul 23, 2025
4b084f8
[ECO-5426] Updated json values to be passed into `json` key
sacOO7 Jul 29, 2025
32b1a36
[ECO-5426] Updated ObjectValue to have compile time type safety inste…
sacOO7 Jul 31, 2025
17164c4
[ECO-5426] Updated assertions method syntax for expected and actual
sacOO7 Aug 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/src/main/java/io/ably/lib/objects/LiveCounter.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,5 @@ public interface LiveCounter {
*/
@NotNull
@Contract(pure = true) // Indicates this method does not modify the state of the object.
Long value();
Double value();
}
12 changes: 12 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectsPlugin.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.ChannelState;
import io.ably.lib.types.ProtocolMessage;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -30,6 +31,17 @@ public interface LiveObjectsPlugin {
*/
void handle(@NotNull ProtocolMessage message);

/**
* Handles state changes for a specific channel.
* This method is invoked whenever a channel's state changes, allowing the implementation
* to update the LiveObjects instances accordingly based on the new state and presence of objects.
*
* @param channelName the name of the channel whose state has changed.
* @param state the new state of the channel.
* @param hasObjects flag indicates whether the channel has any associated live objects.
*/
void handleStateChange(@NotNull String channelName, @NotNull ChannelState state, boolean hasObjects);

/**
* Disposes of the LiveObjects instance associated with the specified channel name.
* This method removes the LiveObjects instance for the given channel, releasing any
Expand Down
16 changes: 16 additions & 0 deletions lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,15 @@ private void setState(ChannelState newState, ErrorInfo reason, boolean resumed,
this.reason = stateChange.reason;
}

// cover states other than attached, ChannelState.attached already covered in setAttached
if (liveObjectsPlugin != null && newState!= ChannelState.attached) {
try {
liveObjectsPlugin.handleStateChange(name, newState, false);
} catch (Throwable t) {
Log.e(TAG, "Unexpected exception in LiveObjectsPlugin.handle", t);
}
}

if (newState != ChannelState.attaching && newState != ChannelState.suspended) {
this.retryAttempt = 0;
}
Expand Down Expand Up @@ -439,6 +448,13 @@ private void setAttached(ProtocolMessage message) {
}
return;
}
if (liveObjectsPlugin != null) {
try {
liveObjectsPlugin.handleStateChange(name, ChannelState.attached, message.hasFlag(Flag.has_objects));
} catch (Throwable t) {
Log.e(TAG, "Unexpected exception in LiveObjectsPlugin.handle", t);
}
}
if(state == ChannelState.attached) {
Log.v(TAG, String.format(Locale.ROOT, "Server initiated attach for channel %s", name));
if (!message.hasFlag(Flag.resumed)) { // RTL12
Expand Down
172 changes: 161 additions & 11 deletions live-objects/src/main/kotlin/io/ably/lib/objects/DefaultLiveObjects.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,59 @@
package io.ably.lib.objects

import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.Callback
import io.ably.lib.types.ProtocolMessage
import io.ably.lib.util.Log
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.flow.MutableSharedFlow

internal class DefaultLiveObjects(private val channelName: String, private val adapter: LiveObjectsAdapter): LiveObjects {
private val tag = DefaultLiveObjects::class.simpleName
/**
* @spec RTO2 - enum representing objects state
*/
internal enum class ObjectsState {
INITIALIZED,
SYNCING,
SYNCED
}

/**
* Default implementation of LiveObjects interface.
* Provides the core functionality for managing live objects on a channel.
*/
internal class DefaultLiveObjects(internal val channelName: String, internal val adapter: LiveObjectsAdapter): LiveObjects {
private val tag = "DefaultLiveObjects"
/**
* @spec RTO3 - Objects pool storing all live objects by object ID
*/
internal val objectsPool = ObjectsPool(this)

internal var state = ObjectsState.INITIALIZED

/**
* @spec RTO4 - Used for handling object messages and object sync messages
*/
private val objectsManager = ObjectsManager(this)

/**
* Coroutine scope for running sequential operations on a single thread, used to avoid concurrency issues.
*/
private val sequentialScope =
CoroutineScope(Dispatchers.Default.limitedParallelism(1) + CoroutineName(channelName) + SupervisorJob())

/**
* Event bus for handling incoming object messages sequentially.
*/
private val objectsEventBus = MutableSharedFlow<ProtocolMessage>(extraBufferCapacity = UNLIMITED)
private val incomingObjectsHandler: Job

init {
incomingObjectsHandler = initializeHandlerForIncomingObjectMessages()
}

/**
* @spec RTO1 - Returns the root LiveMap object with proper validation and sync waiting
*/
override fun getRoot(): LiveMap {
TODO("Not yet implemented")
}
Expand Down Expand Up @@ -47,18 +94,121 @@ internal class DefaultLiveObjects(private val channelName: String, private val a
TODO("Not yet implemented")
}

fun handle(msg: ProtocolMessage) {
// RTL15b
msg.channelSerial?.let {
if (msg.action === ProtocolMessage.Action.`object`) {
Log.v(tag, "Setting channel serial for channelName: $channelName, value: ${msg.channelSerial}")
adapter.setChannelSerial(channelName, msg.channelSerial)
/**
* Handles a ProtocolMessage containing proto action as `object` or `object_sync`.
* @spec RTL1 - Processes incoming object messages and object sync messages
*/
internal fun handle(protocolMessage: ProtocolMessage) {
// RTL15b - Set channel serial for OBJECT messages
adapter.setChannelSerial(channelName, protocolMessage)

if (protocolMessage.state == null || protocolMessage.state.isEmpty()) {
Log.w(tag, "Received ProtocolMessage with null or empty objects, ignoring")
return
}

objectsEventBus.tryEmit(protocolMessage)
}

/**
* Initializes the handler for incoming object messages and object sync messages.
* Processes the messages sequentially to ensure thread safety and correct order of operations.
*
* @spec OM2 - Populates missing fields from parent protocol message
*/
private fun initializeHandlerForIncomingObjectMessages(): Job {
return sequentialScope.launch {
objectsEventBus.collect { protocolMessage ->
// OM2 - Populate missing fields from parent
val objects = protocolMessage.state.filterIsInstance<ObjectMessage>()
.mapIndexed { index, objMsg ->
objMsg.copy(
connectionId = objMsg.connectionId ?: protocolMessage.connectionId, // OM2c
timestamp = objMsg.timestamp ?: protocolMessage.timestamp, // OM2e
id = objMsg.id ?: (protocolMessage.id + ':' + index) // OM2a
)
}

try {
when (protocolMessage.action) {
ProtocolMessage.Action.`object` -> objectsManager.handleObjectMessages(objects)
ProtocolMessage.Action.object_sync -> objectsManager.handleObjectSyncMessages(
objects,
protocolMessage.channelSerial
)
else -> Log.w(tag, "Ignoring protocol message with unhandled action: ${protocolMessage.action}")
}
} catch (exception: Exception) {
// Skip current message if an error occurs, don't rethrow to avoid crashing the collector
Log.e(tag, "Error handling objects message with protocolMsg id ${protocolMessage.id}", exception)
}
}
}
}

fun dispose() {
// Dispose of any resources associated with this LiveObjects instance
// For example, close any open connections or clean up references
internal fun handleStateChange(state: ChannelState, hasObjects: Boolean) {
sequentialScope.launch {
when (state) {
ChannelState.attached -> {
Log.v(tag, "Objects.onAttached() channel=$channelName, hasObjects=$hasObjects")

// RTO4a
val fromInitializedState = this@DefaultLiveObjects.state == ObjectsState.INITIALIZED
if (hasObjects || fromInitializedState) {
// should always start a new sync sequence if we're in the initialized state, no matter the HAS_OBJECTS flag value.
// this guarantees we emit both "syncing" -> "synced" events in that order.
objectsManager.startNewSync(null)
}

// RTO4b
if (!hasObjects) {
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
objectsManager.clearSyncObjectsDataPool() // RTO4b3
objectsManager.clearBufferedObjectOperations() // RTO4b5
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
objectsManager.endSync(fromInitializedState) // RTO4b4
}
}
ChannelState.detached,
ChannelState.failed -> {
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
objectsPool.clearObjectsData(false)
objectsManager.clearSyncObjectsDataPool()
}

else -> {
// No action needed for other states
}
}
}
}

/**
* Changes the state and emits events.
*
* @spec RTO2 - Emits state change events for syncing and synced states
*/
internal fun stateChange(newState: ObjectsState, deferEvent: Boolean) {
if (state == newState) {
return
}

state = newState
Log.v(tag, "Objects state changed to: $newState")

// TODO: Emit state change events
}

// Dispose of any resources associated with this LiveObjects instance
fun dispose(reason: String) {
val cancellationError = CancellationException("Objects disposed for channel $channelName, reason: $reason")
incomingObjectsHandler.cancel(cancellationError) // objectsEventBus automatically garbage collected when collector is cancelled
objectsPool.dispose()
objectsManager.dispose()
// Don't cancel sequentialScope (needed in public methods), just cancel ongoing coroutines
sequentialScope.coroutineContext.cancelChildren(cancellationError)
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.ably.lib.objects

import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.ProtocolMessage
import java.util.concurrent.ConcurrentHashMap

Expand All @@ -16,14 +17,18 @@ public class DefaultLiveObjectsPlugin(private val adapter: LiveObjectsAdapter) :
liveObjects[channelName]?.handle(msg)
}

override fun handleStateChange(channelName: String, state: ChannelState, hasObjects: Boolean) {
liveObjects[channelName]?.handleStateChange(state, hasObjects)
}

override fun dispose(channelName: String) {
liveObjects[channelName]?.dispose()
liveObjects[channelName]?.dispose("Channel has ben released using channels.release()")
liveObjects.remove(channelName)
}

override fun dispose() {
liveObjects.values.forEach {
it.dispose()
it.dispose("AblyClient has been closed using client.close()")
}
liveObjects.clear()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ internal enum class ErrorCode(public val code: Int) {
BadRequest(40_000),
InternalError(50_000),
MaxMessageSizeExceeded(40_009),
InvalidObject(92_000),
// LiveMap specific error codes
MapKeyShouldBeString(40_003),
MapValueDataTypeUnsupported(40_013),
}

internal enum class HttpStatusCode(public val code: Int) {
Expand Down
7 changes: 7 additions & 0 deletions live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ internal fun LiveObjectsAdapter.ensureMessageSizeWithinLimit(objectMessages: Arr
}
}

internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMessage: ProtocolMessage) {
if (protocolMessage.action != ProtocolMessage.Action.`object`) return
val channelSerial = protocolMessage.channelSerial
if (channelSerial.isNullOrEmpty()) return
setChannelSerial(channelName, channelSerial)
}

internal class Binary(val data: ByteArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
Expand Down
62 changes: 62 additions & 0 deletions live-objects/src/main/kotlin/io/ably/lib/objects/ObjectId.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.ably.lib.objects

import io.ably.lib.objects.type.ObjectType

internal class ObjectId private constructor(
internal val type: ObjectType,
private val hash: String,
private val timestampMs: Long
) {
/**
* Converts ObjectId to string representation.
*/
override fun toString(): String {
return "${type.value}:$hash@$timestampMs"
}

companion object {
/**
* Creates ObjectId instance from hashed object id string.
*/
fun fromString(objectId: String): ObjectId {
if (objectId.isEmpty()) {
throw objectError("Invalid object id: $objectId")
}

// Parse format: type:hash@msTimestamp
val parts = objectId.split(':')
if (parts.size != 2) {
throw objectError("Invalid object id: $objectId")
}

val (typeStr, rest) = parts

val type = when (typeStr) {
"map" -> ObjectType.Map
"counter" -> ObjectType.Counter
else -> throw objectError("Invalid object type in object id: $objectId")
}

val hashAndTimestamp = rest.split('@')
if (hashAndTimestamp.size != 2) {
throw objectError("Invalid object id: $objectId")
}

val hash = hashAndTimestamp[0]

if (hash.isEmpty()) {
throw objectError("Invalid object id: $objectId")
}

val msTimestampStr = hashAndTimestamp[1]

val msTimestamp = try {
msTimestampStr.toLong()
} catch (e: NumberFormatException) {
throw objectError("Invalid object id: $objectId", e)
}

return ObjectId(type, hash, msTimestamp)
}
}
}
Loading
Loading