Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjects.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.ably.lib.objects;

import io.ably.lib.objects.state.ObjectsStateChange;
import io.ably.lib.objects.type.counter.LiveCounter;
import io.ably.lib.objects.type.map.LiveMap;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
* s.unsubscribe();
* }
* </pre>
* Spec: RTLO4b5
*/
public interface ObjectsSubscription {
/**
* This method should be called when the subscription is no longer needed,
* it will make sure no further events will be sent to the subscriber and
* that references to the subscriber are cleaned up.
* Spec: RTLO4b5a
*/
void unsubscribe();
}
27 changes: 27 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/type/LiveObjectUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.ably.lib.objects.type;

import org.jetbrains.annotations.Nullable;

/**
* Abstract base class for all LiveObject update notifications.
* Provides common structure for updates that occur on LiveMap and LiveCounter objects.
* Contains the update data that describes what changed in the live object.
* Spec: RTLO4b4
*/
public abstract class LiveObjectUpdate {
/**
* The update data containing details about the change that occurred
* Spec: RTLO4b4a
*/
@Nullable
protected final Object update;

/**
* Creates a LiveObjectUpdate with the specified update data.
*
* @param update the data describing the change, or null for no-op updates
*/
protected LiveObjectUpdate(@Nullable Object update) {
this.update = update;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.ably.lib.objects;
package io.ably.lib.objects.type.counter;

import io.ably.lib.objects.ObjectsCallback;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;
Expand All @@ -10,7 +11,7 @@
* It allows incrementing, decrementing, and retrieving the current value of the counter,
* both synchronously and asynchronously.
*/
public interface LiveCounter {
public interface LiveCounter extends LiveCounterChange {

/**
* Increments the value of the counter by 1.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.ably.lib.objects.type.counter;

import io.ably.lib.objects.ObjectsSubscription;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;

/**
* Provides methods to subscribe to real-time updates on LiveCounter objects.
* Enables clients to receive notifications when counter values change due to
* operations performed by any client connected to the same channel.
*/
public interface LiveCounterChange {

/**
* Subscribes to real-time updates on this LiveCounter object.
* Multiple listeners can be subscribed to the same object independently.
* Spec: RTLO4b
*
* @param listener the listener to be notified of counter updates
* @return an ObjectsSubscription for managing this specific listener
*/
@NonBlocking
@NotNull ObjectsSubscription subscribe(@NotNull Listener listener);

/**
* Unsubscribes a specific listener from receiving updates.
* Has no effect if the listener is not currently subscribed.
* Spec: RTLO4c
*
* @param listener the listener to be unsubscribed
*/
@NonBlocking
void unsubscribe(@NotNull Listener listener);

/**
* Unsubscribes all listeners from receiving updates.
* No notifications will be delivered until new listeners are subscribed.
* Spec: RTLO4d
*/
@NonBlocking
void unsubscribeAll();

/**
* Listener interface for receiving LiveCounter updates.
* Spec: RTLO4b3
*/
interface Listener {
/**
* Called when the LiveCounter has been updated.
* Should execute quickly as it's called from the real-time processing thread.
*
* @param update details about the counter change
*/
void onUpdated(@NotNull LiveCounterUpdate update);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package io.ably.lib.objects.type.counter;

import io.ably.lib.objects.type.LiveObjectUpdate;
import org.jetbrains.annotations.NotNull;

/**
* Represents an update that occurred on a LiveCounter object.
* Contains information about counter value changes from increment/decrement operations.
* Updates can represent positive changes (increments) or negative changes (decrements).
*
* @spec RTLC11, RTLC11a - LiveCounter update structure and behavior
*/
public class LiveCounterUpdate extends LiveObjectUpdate {

/**
* Creates a no-op LiveCounterUpdate representing no actual change.
*/
public LiveCounterUpdate() {
super(null);
}

/**
* Creates a LiveCounterUpdate with the specified amount change.
*
* @param amount the amount by which the counter changed (positive = increment, negative = decrement)
*/
public LiveCounterUpdate(@NotNull Double amount) {
super(new Update(amount));
}

/**
* Gets the update information containing the amount of change.
*
* @return the Update object with the counter modification amount
*/
@NotNull
public LiveCounterUpdate.Update getUpdate() {
return (Update) update;
}

/**
* Returns a string representation of this LiveCounterUpdate.
*
* @return a string showing the amount of change to the counter
*/
@Override
public String toString() {
if (update == null) {
return "LiveCounterUpdate{no change}";
}
return "LiveCounterUpdate{amount=" + getUpdate().getAmount() + "}";
}

/**
* Contains the specific details of a counter update operation.
*
* @spec RTLC11b, RTLC11b1 - Counter update data structure
*/
public static class Update {
private final @NotNull Double amount;

/**
* Creates an Update with the specified amount.
*
* @param amount the counter change amount (positive = increment, negative = decrement)
*/
public Update(@NotNull Double amount) {
this.amount = amount;
}

/**
* Gets the amount by which the counter value was modified.
*
* @return the change amount (positive for increments, negative for decrements)
*/
public @NotNull Double getAmount() {
return amount;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.ably.lib.objects;
package io.ably.lib.objects.type.map;

import io.ably.lib.objects.ObjectsCallback;
import org.jetbrains.annotations.Blocking;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.Contract;
Expand All @@ -13,7 +14,7 @@
* The LiveMap interface provides methods to interact with a live, real-time map structure.
* It supports both synchronous and asynchronous operations for managing key-value pairs.
*/
public interface LiveMap {
public interface LiveMap extends LiveMapChange {

/**
* Retrieves the value associated with the specified key.
Expand Down
56 changes: 56 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/type/map/LiveMapChange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.ably.lib.objects.type.map;

import io.ably.lib.objects.ObjectsSubscription;
import org.jetbrains.annotations.NonBlocking;
import org.jetbrains.annotations.NotNull;

/**
* Provides methods to subscribe to real-time updates on LiveMap objects.
* Enables clients to receive notifications when map entries are added, updated, or removed.
* Uses last-write-wins conflict resolution when multiple clients modify the same key.
*/
public interface LiveMapChange {

/**
* Subscribes to real-time updates on this LiveMap object.
* Multiple listeners can be subscribed to the same object independently.
* Spec: RTLO4b
*
* @param listener the listener to be notified of map updates
* @return an ObjectsSubscription for managing this specific listener
*/
@NonBlocking
@NotNull ObjectsSubscription subscribe(@NotNull Listener listener);

/**
* Unsubscribes a specific listener from receiving updates.
* Has no effect if the listener is not currently subscribed.
* Spec: RTLO4c
*
* @param listener the listener to be unsubscribed
*/
@NonBlocking
void unsubscribe(@NotNull Listener listener);

/**
* Unsubscribes all listeners from receiving updates.
* No notifications will be delivered until new listeners are subscribed.
* Spec: RTLO4d
*/
@NonBlocking
void unsubscribeAll();

/**
* Listener interface for receiving LiveMap updates.
* Spec: RTLO4b3
*/
interface Listener {
/**
* Called when the LiveMap has been updated.
* Should execute quickly as it's called from the real-time processing thread.
*
* @param update details about which keys were modified and how
*/
void onUpdated(@NotNull LiveMapUpdate update);
}
}
66 changes: 66 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/type/map/LiveMapUpdate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.ably.lib.objects.type.map;

import io.ably.lib.objects.type.LiveObjectUpdate;
import org.jetbrains.annotations.NotNull;

import java.util.Map;

/**
* Represents an update that occurred on a LiveMap object.
* Contains information about which keys were modified and whether they were updated or removed.
*
* @spec RTLM18, RTLM18a - LiveMap update structure and behavior
*/
public class LiveMapUpdate extends LiveObjectUpdate {

/**
* Creates a no-op LiveMapUpdate representing no actual change.
*/
public LiveMapUpdate() {
super(null);
}

/**
* Creates a LiveMapUpdate with the specified key changes.
*
* @param update map of key names to their change types (UPDATED or REMOVED)
*/
public LiveMapUpdate(@NotNull Map<String, Change> update) {
super(update);
}

/**
* Gets the map of key changes that occurred in this update.
*
* @return map of key names to their change types
*/
@NotNull
public Map<String, Change> getUpdate() {
return (Map<String, Change>) update;
}

/**
* Returns a string representation of this LiveMapUpdate.
*
* @return a string showing the map key changes in this update
*/
@Override
public String toString() {
if (update == null) {
return "LiveMapUpdate{no change}";
}
return "LiveMapUpdate{changes=" + getUpdate() + "}";
}

/**
* Indicates the type of change that occurred to a map key.
*
* @spec RTLM18b - Map change types
*/
public enum Change {
/** The key was added or its value was modified */
UPDATED,
/** The key was removed from the map */
REMOVED
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package io.ably.lib.objects

import io.ably.lib.objects.state.ObjectsStateChange
import io.ably.lib.objects.state.ObjectsStateEvent
import io.ably.lib.objects.type.counter.LiveCounter
import io.ably.lib.objects.type.map.LiveMap
import io.ably.lib.realtime.ChannelState
import io.ably.lib.types.AblyException
import io.ably.lib.types.ProtocolMessage
Expand Down Expand Up @@ -125,7 +127,7 @@ internal class DefaultLiveObjects(internal val channelName: String, internal val
* @spec OM2 - Populates missing fields from parent protocol message
*/
private fun initializeHandlerForIncomingObjectMessages(): Job {
return sequentialScope.launch {
return sequentialScope.launch {
objectsEventBus.collect { protocolMessage ->
// OM2 - Populate missing fields from parent
val objects = protocolMessage.state.filterIsInstance<ObjectMessage>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ internal fun LiveObjectsAdapter.setChannelSerial(channelName: String, protocolMe
setChannelSerial(channelName, channelSerial)
}

// Spec: RTLO4b1, RTLO4b2
internal fun LiveObjectsAdapter.throwIfInvalidAccessApiConfiguration(channelName: String) {
throwIfMissingChannelMode(channelName, ChannelMode.object_subscribe)
throwIfInChannelState(channelName, arrayOf(ChannelState.detached, ChannelState.failed))
Expand Down
Loading
Loading