diff --git a/.gitmodules b/.gitmodules
index ea3d64e15..e69de29bb 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,3 +0,0 @@
-[submodule "lib/src/test/resources/ably-common"]
- path = lib/src/test/resources/ably-common
- url = https://github.com/ably/ably-common.git
diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
index 16470f1d6..a5144f3fc 100644
--- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
+++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java
@@ -15,6 +15,7 @@
import io.ably.lib.http.HttpUtils;
import io.ably.lib.objects.LiveObjects;
import io.ably.lib.objects.LiveObjectsPlugin;
+import io.ably.lib.rest.RestAnnotations;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.transport.ConnectionManager.QueuedMessage;
import io.ably.lib.transport.Defaults;
@@ -105,6 +106,8 @@ public LiveObjects getObjects() throws AblyException {
return liveObjectsPlugin.getInstance(name);
}
+ public final RealtimeAnnotations annotations;
+
/***
* internal
*
@@ -887,7 +890,7 @@ private void onMessage(final ProtocolMessage protocolMessage) {
if(msg.createdAt == null && msg.action == MessageAction.MESSAGE_CREATE) msg.createdAt = msg.timestamp;
try {
- msg.decode(options, decodingContext);
+ if (msg.data != null) msg.decode(options, decodingContext);
} catch (MessageDecodeException e) {
if (e.errorInfo.code == 40018) {
Log.e(TAG, String.format(Locale.ROOT, "Delta message decode failure - %s. Message id = %s, channel = %s", e.errorInfo.message, msg.id, name));
@@ -1310,6 +1313,10 @@ else if(stateChange.current.equals(failureState)) {
state = ChannelState.initialized;
this.decodingContext = new DecodingContext();
this.liveObjectsPlugin = liveObjectsPlugin;
+ this.annotations = new RealtimeAnnotations(
+ this,
+ new RestAnnotations(name, ably.http, ably.options, options)
+ );
}
void onChannelMessage(ProtocolMessage msg) {
@@ -1376,6 +1383,9 @@ void onChannelMessage(ProtocolMessage msg) {
case error:
setFailed(msg.error);
break;
+ case annotation:
+ annotations.onAnnotation(msg);
+ break;
default:
Log.e(TAG, "onChannelMessage(): Unexpected message action (" + msg.action + ")");
}
@@ -1402,6 +1412,17 @@ public void once(ChannelState state, ChannelStateListener listener) {
super.once(state.getChannelEvent(), listener);
}
+ /**
+ * (Internal) Sends a protocol message and provides a callback for completion.
+ *
+ * @param protocolMessage the protocol message to be sent
+ * @param listener the listener to be notified upon completion of the message delivery
+ */
+ public void sendProtocolMessage(ProtocolMessage protocolMessage, CompletionListener listener) throws AblyException {
+ ConnectionManager connectionManager = ably.connection.connectionManager;
+ connectionManager.send(protocolMessage, ably.options.queueMessages, listener);
+ }
+
private static final String TAG = Channel.class.getName();
final AblyRealtime ably;
final String basePath;
diff --git a/lib/src/main/java/io/ably/lib/realtime/RealtimeAnnotations.java b/lib/src/main/java/io/ably/lib/realtime/RealtimeAnnotations.java
new file mode 100644
index 000000000..2be2b36c2
--- /dev/null
+++ b/lib/src/main/java/io/ably/lib/realtime/RealtimeAnnotations.java
@@ -0,0 +1,390 @@
+package io.ably.lib.realtime;
+
+import io.ably.lib.rest.RestAnnotations;
+import io.ably.lib.types.AblyException;
+import io.ably.lib.types.Annotation;
+import io.ably.lib.types.AnnotationAction;
+import io.ably.lib.types.AsyncPaginatedResult;
+import io.ably.lib.types.Callback;
+import io.ably.lib.types.ErrorInfo;
+import io.ably.lib.types.Message;
+import io.ably.lib.types.MessageDecodeException;
+import io.ably.lib.types.PaginatedResult;
+import io.ably.lib.types.Param;
+import io.ably.lib.types.ProtocolMessage;
+import io.ably.lib.util.Log;
+import io.ably.lib.util.Multicaster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * RealtimeAnnotation provides subscription capabilities for annotations received on a channel.
+ * It allows adding or removing listeners to handle annotation events and facilitates broadcasting
+ * those events to the appropriate listeners.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ */
+public class RealtimeAnnotations {
+
+ private static final String TAG = RealtimeAnnotations.class.getName();
+
+ private final ChannelBase channel;
+ private final RestAnnotations restAnnotations;
+ private final AnnotationMulticaster listeners = new AnnotationMulticaster();
+ private final Map typeListeners = new HashMap<>();
+
+ public RealtimeAnnotations(ChannelBase channel, RestAnnotations restAnnotations) {
+ this.channel = channel;
+ this.restAnnotations = restAnnotations;
+ }
+
+ /**
+ * Publishes an annotation to the specified channel with the given message serial.
+ * Validates and encodes the annotation before sending it as a protocol message.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message to be annotated
+ * @param annotation the annotation object associated with the message
+ * @param listener the completion listener to handle success or failure during the publish process
+ * @throws AblyException if an error occurs during validation, encoding, or sending the annotation
+ */
+ public void publish(String messageSerial, Annotation annotation, CompletionListener listener) throws AblyException {
+ Log.v(TAG, String.format("publish(MsgSerial, Annotation); channel = %s", channel.name));
+ validateMessageSerial(messageSerial);
+ // (RSAN1, RSAN1c2)
+ annotation.action = AnnotationAction.ANNOTATION_CREATE;
+ sendAnnotation(messageSerial, annotation, listener);
+ }
+
+ /**
+ * See {@link #publish(String, Annotation, CompletionListener)}
+ */
+ public void publish(Message message, Annotation annotation, CompletionListener listener) throws AblyException {
+ publish(message.serial, annotation, listener);
+ }
+
+ /**
+ * Publishes an annotation to the specified channel with the given message serial.
+ * Validates and encodes the annotation before sending it as a protocol message.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message to be annotated
+ * @param annotation the annotation object associated with the message
+ * @throws AblyException if an error occurs during validation, encoding, or sending the annotation
+ */
+ public void publish(String messageSerial, Annotation annotation) throws AblyException {
+ publish(messageSerial, annotation, null);
+ }
+
+ /**
+ * See {@link #publish(String, Annotation)}
+ */
+ public void publish(Message message, Annotation annotation) throws AblyException {
+ publish(message.serial, annotation);
+ }
+
+ private void sendAnnotation(String messageSerial, Annotation annotation, CompletionListener listener) throws AblyException {
+ // (RSAN1, RSAN1a3)
+ if (annotation.type == null) {
+ throw AblyException.fromErrorInfo(new ErrorInfo("Annotation type must be specified", 400, 40000));
+ }
+
+ // (RSAN1, RSAN1c1)
+ annotation.messageSerial = messageSerial;
+
+ try {
+ // (RSAN1, RSAN1c3)
+ annotation.encode(channel.options);
+ } catch (MessageDecodeException e) {
+ throw AblyException.fromThrowable(e);
+ }
+
+ Log.v(TAG, String.format("RealtimeAnnotations.sendAnnotation(): channelName = %s, sending annotation with messageSerial = %s, type = %s, action = %s",
+ channel.name, messageSerial, annotation.type, annotation.action.name()));
+
+ ProtocolMessage protocolMessage = new ProtocolMessage();
+ protocolMessage.action = ProtocolMessage.Action.annotation;
+ protocolMessage.channel = channel.name;
+ protocolMessage.annotations = new Annotation[]{annotation};
+
+ channel.sendProtocolMessage(protocolMessage, listener);
+ }
+
+ /**
+ * Deletes an annotation associated with the specified message serial.
+ * Sets the annotation action to `ANNOTATION_DELETE` and publishes the
+ * update to the channel with the given completion listener.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated
+ * @param annotation the annotation object to be deleted
+ * @param listener the completion listener to handle success or failure during the deletion process
+ * @throws AblyException if an error occurs during the deletion or publishing process
+ */
+ public void delete(String messageSerial, Annotation annotation, CompletionListener listener) throws AblyException {
+ Log.v(TAG, String.format("delete(MsgSerial, Annotation); channel = %s", channel.name));
+ annotation.action = AnnotationAction.ANNOTATION_DELETE;
+ sendAnnotation(messageSerial, annotation, listener);
+ }
+
+ /**
+ * See {@link #delete(String, Annotation, CompletionListener)}
+ */
+ public void delete(Message message, Annotation annotation, CompletionListener listener) throws AblyException {
+ delete(message.serial, annotation, listener);
+ }
+
+ public void delete(String messageSerial, Annotation annotation) throws AblyException {
+ delete(messageSerial, annotation, null);
+ }
+
+ /**
+ * See {@link #delete(String, Annotation)}
+ */
+ public void delete(Message message, Annotation annotation) throws AblyException {
+ delete(message.serial, annotation);
+ }
+
+ /**
+ * Retrieves a paginated list of annotations associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param params an array of query parameters for filtering or modifying the request.
+ * @return a {@link PaginatedResult} containing the matching annotations.
+ * @throws AblyException if an error occurs during the retrieval process.
+ */
+ public PaginatedResult get(String messageSerial, Param[] params) throws AblyException {
+ return restAnnotations.get(messageSerial, params);
+ }
+
+ /**
+ * See {@link #get(String, Param[])}
+ */
+ public PaginatedResult get(Message message, Param[] params) throws AblyException {
+ return get(message.serial, params);
+ }
+
+ /**
+ * Retrieves a paginated list of annotations associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated
+ * @return a PaginatedResult containing the matching annotations
+ * @throws AblyException if an error occurs during the retrieval process
+ */
+ public PaginatedResult get(String messageSerial) throws AblyException {
+ return restAnnotations.get(messageSerial, null);
+ }
+
+ /**
+ * See {@link #get(String)}
+ */
+ public PaginatedResult get(Message message) throws AblyException {
+ return get(message.serial);
+ }
+
+ /**
+ * Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param params an array of query parameters for filtering or modifying the request.
+ * @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
+ */
+ public void getAsync(String messageSerial, Param[] params, Callback> callback) throws AblyException {
+ restAnnotations.getAsync(messageSerial, params, callback);
+ }
+
+ /**
+ * See {@link #getAsync(String, Param[], Callback)}
+ */
+ public void getAsync(Message message, Param[] params, Callback> callback) throws AblyException {
+ getAsync(message.serial, params, callback);
+ }
+
+ /**
+ * Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
+ */
+ public void getAsync(String messageSerial, Callback> callback) throws AblyException {
+ restAnnotations.getAsync(messageSerial, null, callback);
+ }
+
+ /**
+ * See {@link #getAsync(String, Callback)}
+ */
+ public void getAsync(Message message, Callback> callback) throws AblyException {
+ getAsync(message.serial, callback);
+ }
+
+ /**
+ * Subscribes the given {@link AnnotationListener} to the channel, allowing it to receive annotations.
+ * If the channel's attach on subscribe option is enabled, the channel is attached automatically.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param listener the listener to be subscribed to the channel
+ * @throws AblyException if an error occurs during channel attachment
+ */
+ public synchronized void subscribe(AnnotationListener listener) throws AblyException {
+ Log.v(TAG, String.format("subscribe(); annotations in channel = %s", channel.name));
+ listeners.add(listener);
+ if (channel.attachOnSubscribeEnabled()) {
+ channel.attach();
+ }
+ }
+
+ /**
+ * Unsubscribes the specified {@link AnnotationListener} from the channel, stopping it
+ * from receiving further annotations. Any corresponding type-specific listeners
+ * associated with the listener are also removed.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param listener the {@link AnnotationListener} to be unsubscribed
+ */
+ public synchronized void unsubscribe(AnnotationListener listener) {
+ Log.v(TAG, String.format("unsubscribe(); annotations in channel = %s", channel.name));
+ listeners.remove(listener);
+ for (AnnotationMulticaster multicaster : typeListeners.values()) {
+ multicaster.remove(listener);
+ }
+ }
+
+ /**
+ * Subscribes the given {@link AnnotationListener} to the channel for a specific annotation type,
+ * allowing it to receive annotations of the specified type. If the channel's attach on subscribe
+ * option is enabled, the channel is attached automatically.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param type the specific annotation type to subscribe to; if null, subscribes to all types
+ * @param listener the {@link AnnotationListener} to be subscribed
+ */
+ public synchronized void subscribe(String type, AnnotationListener listener) throws AblyException {
+ Log.v(TAG, String.format("subscribe(); annotations in channel = %s; single type = %s", channel.name, type));
+ subscribeImpl(type, listener);
+ if (channel.attachOnSubscribeEnabled()) {
+ channel.attach();
+ }
+ }
+
+ /**
+ * Unsubscribes the specified {@link AnnotationListener} from receiving annotations
+ * of a particular type within the channel. If there are no remaining listeners
+ * for the specified type, the type-specific listener collection is also removed.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param type the specific annotation type to unsubscribe from; if null, unsubscribes
+ * from all annotations associated with the listener
+ * @param listener the {@link AnnotationListener} to be unsubscribed
+ */
+ public synchronized void unsubscribe(String type, AnnotationListener listener) {
+ Log.v(TAG, String.format("unsubscribe(); annotations in channel = %s; single type = %s", channel.name, type));
+ unsubscribeImpl(type, listener);
+ }
+
+ /**
+ * Internal method. Handles incoming annotation messages from the protocol layer.
+ *
+ * @param protocolMessage the protocol message containing annotation data
+ */
+ public void onAnnotation(ProtocolMessage protocolMessage) {
+ List annotations = new ArrayList<>();
+ for (int i = 0; i < protocolMessage.annotations.length; i++) {
+ Annotation annotation = protocolMessage.annotations[i];
+ try {
+ if (annotation.data != null) annotation.decode(channel.options);
+ } catch (MessageDecodeException e) {
+ Log.e(TAG, String.format(Locale.ROOT, "%s on channel %s", e.errorInfo.message, channel.name));
+ }
+ /* populate fields derived from protocol message */
+ if (annotation.connectionId == null) annotation.connectionId = protocolMessage.connectionId;
+ if (annotation.timestamp == 0) annotation.timestamp = protocolMessage.timestamp;
+ if (annotation.id == null) annotation.id = protocolMessage.id + ':' + i;
+ annotations.add(annotation);
+ }
+ broadcastAnnotation(annotations);
+ }
+
+ private void validateMessageSerial(String messageSerial) throws AblyException {
+ if (messageSerial == null) throw AblyException.fromErrorInfo(
+ new ErrorInfo("Message serial can not be empty", 400, 40003)
+ );
+ }
+
+ private void broadcastAnnotation(List annotations) {
+ for (Annotation annotation : annotations) {
+ listeners.onAnnotation(annotation);
+
+ String type = annotation.type != null ? annotation.type : "";
+ AnnotationMulticaster eventListener = typeListeners.get(type);
+ if (eventListener != null) eventListener.onAnnotation(annotation);
+ }
+ }
+
+ private void subscribeImpl(String type, AnnotationListener listener) {
+ String annotationType = type != null ? type : "";
+ AnnotationMulticaster typeSpecificListeners = typeListeners.get(annotationType);
+ if (typeSpecificListeners == null) {
+ typeSpecificListeners = new AnnotationMulticaster();
+ typeListeners.put(annotationType, typeSpecificListeners);
+ }
+ typeSpecificListeners.add(listener);
+ }
+
+ private void unsubscribeImpl(String type, AnnotationListener listener) {
+ AnnotationMulticaster listeners = typeListeners.get(type);
+ if (listeners != null) {
+ listeners.remove(listener);
+ if (listeners.isEmpty()) {
+ typeListeners.remove(type);
+ }
+ }
+ }
+
+ public interface AnnotationListener {
+ void onAnnotation(Annotation annotation);
+ }
+
+ private static class AnnotationMulticaster extends Multicaster implements AnnotationListener {
+ @Override
+ public void onAnnotation(Annotation annotation) {
+ for (final AnnotationListener member : getMembers()) {
+ try {
+ member.onAnnotation(annotation);
+ } catch (Exception e) {
+ Log.e(TAG, e.getMessage(), e);
+ }
+ }
+ }
+ }
+}
diff --git a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java
index a4c81a34d..11958e7b4 100644
--- a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java
+++ b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java
@@ -38,6 +38,13 @@ public class ChannelBase {
*/
public final Presence presence;
+ /**
+ * Represents the annotations associated with a channel message.
+ * This field provides functionality for managing annotations.
+ */
+ public final RestAnnotations annotations;
+
+
/**
* Publish a message on this channel using the REST API.
* Since the REST API is stateless, this request is made independently
@@ -315,6 +322,7 @@ private BasePaginatedQuery.ResultRequest historyImpl(Http http,
this.options = options;
this.basePath = "/channels/" + HttpUtils.encodeURIComponent(name);
this.presence = new Presence();
+ this.annotations = new RestAnnotations(name, ably.http, ably.options, options);
}
private final AblyBase ably;
diff --git a/lib/src/main/java/io/ably/lib/rest/RestAnnotations.java b/lib/src/main/java/io/ably/lib/rest/RestAnnotations.java
new file mode 100644
index 000000000..7c0931ac4
--- /dev/null
+++ b/lib/src/main/java/io/ably/lib/rest/RestAnnotations.java
@@ -0,0 +1,288 @@
+package io.ably.lib.rest;
+
+import io.ably.lib.http.BasePaginatedQuery;
+import io.ably.lib.http.Http;
+import io.ably.lib.http.HttpCore;
+import io.ably.lib.http.HttpUtils;
+import io.ably.lib.types.AblyException;
+import io.ably.lib.types.Annotation;
+import io.ably.lib.types.AnnotationAction;
+import io.ably.lib.types.AnnotationSerializer;
+import io.ably.lib.types.AsyncPaginatedResult;
+import io.ably.lib.types.Callback;
+import io.ably.lib.types.ChannelOptions;
+import io.ably.lib.types.ClientOptions;
+import io.ably.lib.types.ErrorInfo;
+import io.ably.lib.types.Message;
+import io.ably.lib.types.MessageDecodeException;
+import io.ably.lib.types.PaginatedResult;
+import io.ably.lib.types.Param;
+import io.ably.lib.util.Crypto;
+import io.ably.lib.util.Log;
+
+import java.util.Arrays;
+
+/**
+ * The RestAnnotation class provides methods to manage and interact with annotations
+ * associated with messages in a specific channel.
+ *
+ * Annotations can be retrieved, published, or deleted both synchronously and asynchronously.
+ * This class is intended as part of a client library for managing annotations via REST architecture.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ */
+public class RestAnnotations {
+
+ private static final String TAG = RestAnnotations.class.getName();
+
+ private final String channelName;
+ private final Http http;
+ private final ClientOptions clientOptions;
+ private final ChannelOptions channelOptions;
+
+ public RestAnnotations(String channelName, Http http, ClientOptions clientOptions, ChannelOptions channelOptions) {
+ this.channelName = channelName;
+ this.http = http;
+ this.clientOptions = clientOptions;
+ this.channelOptions = channelOptions;
+ }
+
+ /**
+ * Retrieves a paginated list of annotations associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param params an array of query parameters for filtering or modifying the request.
+ * @return a {@link PaginatedResult} containing the matching annotations.
+ * @throws AblyException if an error occurs during the retrieval process.
+ */
+ public PaginatedResult get(String messageSerial, Param[] params) throws AblyException {
+ validateMessageSerial(messageSerial);
+ return getImpl(messageSerial, params).sync();
+ }
+
+ /**
+ * @see #get(String, Param[])
+ */
+ public PaginatedResult get(Message message, Param[] params) throws AblyException {
+ return get(message.serial, params);
+ }
+
+ /**
+ * Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param params an array of query parameters for filtering or modifying the request.
+ * @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
+ */
+ public void getAsync(String messageSerial, Param[] params, Callback> callback) throws AblyException {
+ validateMessageSerial(messageSerial);
+ getImpl(messageSerial, params).async(callback);
+ }
+
+ /**
+ * @see #getAsync(String, Param[], Callback)
+ */
+ public void getAsync(Message message, Param[] params, Callback> callback) throws AblyException {
+ getAsync(message.serial, params, callback);
+ }
+
+ /**
+ * Retrieves a paginated list of annotations associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated
+ * @return a PaginatedResult containing the matching annotations
+ * @throws AblyException if an error occurs during the retrieval process
+ */
+ public PaginatedResult get(String messageSerial) throws AblyException {
+ return get(messageSerial, null);
+ }
+
+ /**
+ * @see #get(String)
+ */
+ public PaginatedResult get(Message message) throws AblyException {
+ return get(message.serial);
+ }
+
+ /**
+ * Asynchronously retrieves a paginated list of annotations associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param callback a callback to handle the result asynchronously, providing an {@link AsyncPaginatedResult} containing the matching annotations.
+ */
+ public void getAsync(String messageSerial, Callback> callback) throws AblyException {
+ validateMessageSerial(messageSerial);
+ getImpl(messageSerial, null).async(callback);
+ }
+
+ /**
+ * @see #getAsync(String, Callback)
+ */
+ public void getAsync(Message message, Callback> callback) throws AblyException {
+ getAsync(message.serial, callback);
+ }
+
+ /**
+ * Publishes an annotation associated with the specified message serial
+ * to the REST channel.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param annotation the annotation to be published.
+ * @throws AblyException if an error occurs during the publishing process.
+ */
+ public void publish(String messageSerial, Annotation annotation) throws AblyException {
+ validateMessageSerial(messageSerial);
+ publishImpl(messageSerial, annotation).sync();
+ }
+
+ /**
+ * @see #publish(String, Annotation)
+ */
+ public void publish(Message message, Annotation annotation) throws AblyException {
+ publish(message.serial, annotation);
+ }
+
+ /**
+ * Asynchronously publishes an annotation associated with the specified message serial
+ * to the REST channel.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param annotation the annotation to be published.
+ * @param callback a callback to handle the result asynchronously, providing a
+ * completion indication or error information.
+ */
+ public void publishAsync(String messageSerial, Annotation annotation, Callback callback) throws AblyException {
+ validateMessageSerial(messageSerial);
+ publishImpl(messageSerial, annotation).async(callback);
+ }
+
+ /**
+ * @see #publishAsync(String, Annotation, Callback)
+ */
+ public void publishAsync(Message message, Annotation annotation, Callback callback) throws AblyException {
+ publishAsync(message.serial, annotation, callback);
+ }
+
+ /**
+ * Deletes an annotation associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param annotation the annotation to be deleted.
+ * @throws AblyException if an error occurs during the deletion process.
+ */
+ public void delete(String messageSerial, Annotation annotation) throws AblyException {
+ validateMessageSerial(messageSerial);
+ deleteImpl(messageSerial, annotation).sync();
+ }
+
+ /**
+ * @see #delete(String, Annotation)
+ */
+ public void delete(Message message, Annotation annotation) throws AblyException {
+ delete(message.serial, annotation);
+ }
+
+ /**
+ * Asynchronously deletes an annotation associated with the specified message serial.
+ *
+ * Note: This is an experimental API. While the underlying functionality is stable,
+ * the public API may change in future releases.
+ *
+ * @param messageSerial the unique serial identifier for the message being annotated.
+ * @param annotation the annotation to be deleted.
+ * @param callback a callback to handle the result asynchronously, providing a completion
+ * indication or error information.
+ */
+ public void deleteAsync(String messageSerial, Annotation annotation, Callback callback) throws AblyException {
+ validateMessageSerial(messageSerial);
+ deleteImpl(messageSerial, annotation).async(callback);
+ }
+
+ /**
+ * @see #deleteAsync(String, Annotation, Callback)
+ */
+ public void deleteAsync(Message message, Annotation annotation, Callback callback) throws AblyException {
+ deleteAsync(message.serial, annotation, callback);
+ }
+
+ private void validateMessageSerial(String messageSerial) throws AblyException {
+ if (messageSerial == null) throw AblyException.fromErrorInfo(
+ new ErrorInfo("Message serial can not be empty", 400, 40003)
+ );
+ }
+
+ private String getBasePath(String messageSerial) {
+ return "/channels/" + HttpUtils.encodeURIComponent(channelName) + "/messages/" + HttpUtils.encodeURIComponent(messageSerial) + "/annotations";
+ }
+
+ private Http.Request deleteImpl(String messageSerial, Annotation annotation) throws AblyException {
+ Log.v(TAG, "delete(): annotation=" + annotation);
+ annotation.action = AnnotationAction.ANNOTATION_DELETE;
+ return sendAnnotationImpl(messageSerial, annotation);
+ }
+
+ private Http.Request publishImpl(String messageSerial, Annotation annotation) throws AblyException {
+ Log.v(TAG, "publish(): annotation=" + annotation);
+ // (RSAN1c2)
+ annotation.action = AnnotationAction.ANNOTATION_CREATE;
+ return sendAnnotationImpl(messageSerial, annotation);
+ }
+
+ private Http.Request sendAnnotationImpl(String messageSerial, Annotation annotation) throws AblyException {
+ // (RSAN1a3)
+ if (annotation.type == null) {
+ throw AblyException.fromErrorInfo(new ErrorInfo("Annotation type must be specified", 400, 40000));
+ }
+
+ // (RSAN1c1)
+ annotation.messageSerial = messageSerial;
+
+ try {
+ // (RSAN1c3)
+ annotation.encode(channelOptions);
+ } catch (MessageDecodeException e) {
+ throw AblyException.fromThrowable(e);
+ }
+
+ // (RSAN1c4)
+ if (annotation.id == null && clientOptions.idempotentRestPublishing) {
+ annotation.id = Crypto.getRandomId();
+ }
+
+ return http.request((http, callback) -> {
+ Annotation[] annotations = new Annotation[] { annotation };
+ HttpCore.RequestBody requestBody = clientOptions.useBinaryProtocol ? AnnotationSerializer.asMsgpackRequest(annotations) : AnnotationSerializer.asJsonRequest(annotations);
+ final Param[] params = clientOptions.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c
+ http.post(getBasePath(messageSerial), HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), params, requestBody, null, true, callback);
+ });
+ }
+
+ private BasePaginatedQuery.ResultRequest getImpl(String messageSerial, Param[] initialParams) {
+ Log.v(TAG, "getImpl(): params=" + Arrays.toString(initialParams));
+ HttpCore.BodyHandler bodyHandler = AnnotationSerializer.getAnnotationResponseHandler(channelOptions);
+ final Param[] params = clientOptions.addRequestIds ? Param.set(initialParams, Crypto.generateRandomRequestId()) : initialParams; // RSC7c
+ return (new BasePaginatedQuery<>(http, getBasePath(messageSerial), HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), params, bodyHandler)).get();
+ }
+}
diff --git a/lib/src/main/java/io/ably/lib/types/Annotation.java b/lib/src/main/java/io/ably/lib/types/Annotation.java
new file mode 100644
index 000000000..ce57e1590
--- /dev/null
+++ b/lib/src/main/java/io/ably/lib/types/Annotation.java
@@ -0,0 +1,248 @@
+package io.ably.lib.types;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import io.ably.lib.util.Log;
+import io.ably.lib.util.Serialisation;
+import org.msgpack.core.MessageFormat;
+import org.msgpack.core.MessagePacker;
+import org.msgpack.core.MessageUnpacker;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+public class Annotation extends BaseMessage {
+
+ private static final String TAG = Annotation.class.getName();
+
+ private static final String ACTION = "action";
+ private static final String SERIAL = "serial";
+ private static final String MESSAGE_SERIAL = "messageSerial";
+ private static final String TYPE = "type";
+ private static final String NAME = "name";
+ private static final String COUNT = "count";
+ private static final String EXTRAS = "extras";
+
+ /**
+ * (TAN2b) The action, whether this is an annotation being added or removed,
+ * one of the AnnotationAction enum values.
+ */
+ public AnnotationAction action;
+
+ /**
+ * (TAN2i) This annotation's unique serial (lexicographically totally ordered).
+ */
+ public String serial;
+
+ /**
+ * (TAN2j) The serial of the message (of type `MESSAGE_CREATE`) that this annotation is annotating.
+ */
+ public String messageSerial;
+
+ /**
+ * (TAN2k) The type of annotation it is, typically some identifier together with an aggregation method;
+ * for example: "emoji:distinct.v1". Handled opaquely by the SDK and validated serverside. |
+ */
+ public String type;
+
+ /**
+ * (TAN2d) The name of this annotation. This is the field that most annotation aggregations will operate on.
+ * For example, using "distinct.v1" aggregation (specified in the type), the message summary will show a list
+ * of clients who have published an annotation with each distinct annotation.name.
+ */
+ public String name;
+
+ /**
+ * (TAN2e) An optional count, only relevant to certain aggregation methods,
+ * see aggregation methods documentation for more info.
+ */
+ public Integer count;
+
+ /**
+ * (TAN2l) A JSON object for metadata and/or ancillary payloads.
+ */
+ public MessageExtras extras;
+
+ public static Annotation fromMsgpack(MessageUnpacker unpacker) throws IOException {
+ return (new Annotation()).readMsgpack(unpacker);
+ }
+
+ void writeMsgpack(MessagePacker packer) throws IOException {
+ int fieldCount = super.countFields();
+ if (action != null) ++fieldCount;
+ if (serial != null) ++fieldCount;
+ if (messageSerial != null) ++fieldCount;
+ if (type != null) ++fieldCount;
+ if (name != null) ++fieldCount;
+ if (count != null) ++fieldCount;
+ if (extras != null) ++fieldCount;
+
+ packer.packMapHeader(fieldCount);
+ super.writeFields(packer);
+
+ if (action != null) {
+ packer.packString(ACTION);
+ packer.packInt(action.ordinal());
+ }
+
+ if (serial != null) {
+ packer.packString(SERIAL);
+ packer.packString(serial);
+ }
+
+ if (messageSerial != null) {
+ packer.packString(MESSAGE_SERIAL);
+ packer.packString(messageSerial);
+ }
+
+ if (type != null) {
+ packer.packString(TYPE);
+ packer.packString(type);
+ }
+
+ if (name != null) {
+ packer.packString(NAME);
+ packer.packString(name);
+ }
+
+ if (count != null) {
+ packer.packString(COUNT);
+ packer.packInt(count);
+ }
+
+ if (extras != null) {
+ packer.packString(EXTRAS);
+ extras.write(packer);
+ }
+ }
+
+ Annotation readMsgpack(MessageUnpacker unpacker) throws IOException {
+ int fieldCount = unpacker.unpackMapHeader();
+ for (int i = 0; i < fieldCount; i++) {
+ String fieldName = unpacker.unpackString().intern();
+ MessageFormat fieldFormat = unpacker.getNextFormat();
+ if (fieldFormat.equals(MessageFormat.NIL)) {
+ unpacker.unpackNil();
+ continue;
+ }
+
+ if (super.readField(unpacker, fieldName, fieldFormat)) {
+ continue;
+ }
+ if (fieldName.equals(ACTION)) {
+ action = AnnotationAction.tryFindByOrdinal(unpacker.unpackInt());
+ } else if (fieldName.equals(SERIAL)) {
+ serial = unpacker.unpackString();
+ } else if (fieldName.equals(MESSAGE_SERIAL)) {
+ messageSerial = unpacker.unpackString();
+ } else if (fieldName.equals(TYPE)) {
+ type = unpacker.unpackString();
+ } else if (fieldName.equals(NAME)) {
+ name = unpacker.unpackString();
+ } else if (fieldName.equals(COUNT)) {
+ count = unpacker.unpackInt();
+ } else if (fieldName.equals(EXTRAS)) {
+ extras = MessageExtras.read(unpacker);
+ } else {
+ Log.v(TAG, "Unexpected field: " + fieldName);
+ unpacker.skipValue();
+ }
+ }
+ return this;
+ }
+
+ @Override
+ protected void read(final JsonObject map) throws MessageDecodeException {
+ super.read(map);
+
+ Integer actionOrdinal = readInt(map, ACTION);
+ action = actionOrdinal == null ? null : AnnotationAction.tryFindByOrdinal(actionOrdinal);
+ serial = readString(map, SERIAL);
+ messageSerial = readString(map, MESSAGE_SERIAL);
+
+ type = readString(map, TYPE);
+ name = readString(map, NAME);
+ count = readInt(map, COUNT);
+
+ final JsonElement extrasElement = map.get(EXTRAS);
+ if (extrasElement != null) {
+ if (!extrasElement.isJsonObject()) {
+ throw MessageDecodeException.fromDescription("Message extras is of type \"" + extrasElement.getClass() + "\" when expected a JSON object.");
+ }
+ extras = MessageExtras.read((JsonObject) extrasElement);
+ }
+ }
+
+ public static class Serializer implements JsonSerializer, JsonDeserializer {
+ @Override
+ public JsonElement serialize(Annotation annotation, Type typeOfMessage, JsonSerializationContext ctx) {
+ final JsonObject json = BaseMessage.toJsonObject(annotation);
+ if (annotation.action != null) {
+ json.addProperty(ACTION, annotation.action.ordinal());
+ }
+
+ if (annotation.serial != null) {
+ json.addProperty(SERIAL, annotation.serial);
+ }
+
+ if (annotation.messageSerial != null) {
+ json.addProperty(MESSAGE_SERIAL, annotation.messageSerial);
+ }
+
+ if (annotation.type != null) {
+ json.addProperty(TYPE, annotation.type);
+ }
+
+ if (annotation.name != null) {
+ json.addProperty(NAME, annotation.name);
+ }
+
+ if (annotation.count != null) {
+ json.addProperty(COUNT, annotation.count);
+ }
+
+ if (annotation.extras != null) {
+ json.add(EXTRAS, Serialisation.gson.toJsonTree(annotation.extras));
+ }
+
+ return json;
+ }
+
+ @Override
+ public Annotation deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+ if (!json.isJsonObject()) {
+ throw new JsonParseException("Expected an object but got \"" + json.getClass() + "\".");
+ }
+
+ final Annotation annotation = new Annotation();
+
+ try {
+ annotation.read((JsonObject) json);
+ } catch (MessageDecodeException e) {
+ Log.e(TAG, e.getMessage(), e);
+ throw new JsonParseException("Failed to deserialize Message from JSON.", e);
+ }
+
+ return annotation;
+ }
+ }
+
+ public static class ActionSerializer implements JsonSerializer, JsonDeserializer {
+ @Override
+ public AnnotationAction deserialize(JsonElement json, Type t, JsonDeserializationContext ctx)
+ throws JsonParseException {
+ return AnnotationAction.tryFindByOrdinal(json.getAsInt());
+ }
+
+ @Override
+ public JsonElement serialize(AnnotationAction action, Type t, JsonSerializationContext ctx) {
+ return new JsonPrimitive(action.ordinal());
+ }
+ }
+}
diff --git a/lib/src/main/java/io/ably/lib/types/AnnotationAction.java b/lib/src/main/java/io/ably/lib/types/AnnotationAction.java
new file mode 100644
index 000000000..732cde594
--- /dev/null
+++ b/lib/src/main/java/io/ably/lib/types/AnnotationAction.java
@@ -0,0 +1,19 @@
+package io.ably.lib.types;
+
+/**
+ * Enumerates the possible values of the {@link Annotation#action} field of an {@link Annotation}
+ */
+public enum AnnotationAction {
+ /**
+ * (TAN2b) A created annotation
+ */
+ ANNOTATION_CREATE,
+ /**
+ * (TAN2b) A deleted annotation
+ */
+ ANNOTATION_DELETE;
+
+ static AnnotationAction tryFindByOrdinal(int ordinal) {
+ return values().length <= ordinal ? null: values()[ordinal];
+ }
+}
diff --git a/lib/src/main/java/io/ably/lib/types/AnnotationSerializer.java b/lib/src/main/java/io/ably/lib/types/AnnotationSerializer.java
new file mode 100644
index 000000000..88d7c59ff
--- /dev/null
+++ b/lib/src/main/java/io/ably/lib/types/AnnotationSerializer.java
@@ -0,0 +1,103 @@
+package io.ably.lib.types;
+
+import io.ably.lib.http.HttpCore;
+import io.ably.lib.http.HttpUtils;
+import io.ably.lib.util.Log;
+import io.ably.lib.util.Serialisation;
+import org.msgpack.core.MessagePacker;
+import org.msgpack.core.MessageUnpacker;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class AnnotationSerializer {
+
+ private static final String TAG = AnnotationSerializer.class.getName();
+
+ public static void writeMsgpackArray(Annotation[] annotations, MessagePacker packer) {
+ try {
+ int count = annotations.length;
+ packer.packArrayHeader(count);
+ for (Annotation annotation : annotations) {
+ annotation.writeMsgpack(packer);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public static Annotation[] readMsgpackArray(MessageUnpacker unpacker) throws IOException {
+ int count = unpacker.unpackArrayHeader();
+ Annotation[] result = new Annotation[count];
+ for (int i = 0; i < count; i++)
+ result[i] = Annotation.fromMsgpack(unpacker);
+ return result;
+ }
+
+ public static HttpCore.RequestBody asMsgpackRequest(Annotation[] annotations) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ try {
+ MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out);
+ int count = annotations.length;
+ packer.packArrayHeader(count);
+ for (Annotation annotation : annotations) annotation.writeMsgpack(packer);
+ packer.flush();
+ } catch (IOException e) {
+ Log.e(TAG, e.getMessage(), e);
+ }
+ return new HttpUtils.ByteArrayRequestBody(out.toByteArray(), "application/x-msgpack");
+ }
+
+ public static HttpCore.RequestBody asJsonRequest(Annotation[] annotations) {
+ return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(annotations));
+ }
+
+ public static HttpCore.BodyHandler getAnnotationResponseHandler(ChannelOptions channelOptions) {
+ return new AnnotationBodyHandler(channelOptions);
+ }
+
+ public static Annotation[] readMsgpack(byte[] packed) throws AblyException {
+ try {
+ MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed);
+ return readMsgpackArray(unpacker);
+ } catch (IOException ioe) {
+ throw AblyException.fromThrowable(ioe);
+ }
+ }
+
+ public static Annotation[] readMessagesFromJson(byte[] packed) throws MessageDecodeException {
+ return Serialisation.gson.fromJson(new String(packed), Annotation[].class);
+ }
+
+ private static class AnnotationBodyHandler implements HttpCore.BodyHandler {
+
+ private final ChannelOptions channelOptions;
+
+ AnnotationBodyHandler(ChannelOptions channelOptions) {
+ this.channelOptions = channelOptions;
+ }
+
+ @Override
+ public Annotation[] handleResponseBody(String contentType, byte[] body) throws AblyException {
+ try {
+ Annotation[] annotations = null;
+ if ("application/json".equals(contentType))
+ annotations = readMessagesFromJson(body);
+ else if ("application/x-msgpack".equals(contentType))
+ annotations = readMsgpack(body);
+ if (annotations != null) {
+ for (Annotation annotation : annotations) {
+ try {
+ if (annotation.data != null) annotation.decode(channelOptions);
+ } catch (MessageDecodeException e) {
+ Log.e(TAG, e.errorInfo.message);
+ }
+ }
+ }
+ return annotations;
+ } catch (MessageDecodeException e) {
+ throw AblyException.fromThrowable(e);
+ }
+ }
+ }
+}
diff --git a/lib/src/main/java/io/ably/lib/types/Message.java b/lib/src/main/java/io/ably/lib/types/Message.java
index 9fd71cb39..06e9c17c1 100644
--- a/lib/src/main/java/io/ably/lib/types/Message.java
+++ b/lib/src/main/java/io/ably/lib/types/Message.java
@@ -89,6 +89,14 @@ public class Message extends BaseMessage {
*/
public Operation operation;
+ /**
+ * (TM2q) A summary of all the annotations that have been made to the message, whose keys are the `type` fields
+ * from any annotations that it includes. Will always be populated for a message with action {@code MESSAGE_SUMMARY},
+ * and may be populated for any other type (in particular a message retrieved from
+ * REST history will have its latest summary included).
+ */
+ public Summary summary;
+
public static class Operation {
public String clientId;
public String description;
@@ -178,6 +186,7 @@ protected static Operation read(final JsonObject jsonObject) throws MessageDecod
private static final String REF_SERIAL = "refSerial";
private static final String REF_TYPE = "refType";
private static final String OPERATION = "operation";
+ private static final String SUMMARY = "summary";
/**
* Default constructor
@@ -265,6 +274,7 @@ void writeMsgpack(MessagePacker packer) throws IOException {
if(refSerial != null) ++fieldCount;
if(refType != null) ++fieldCount;
if(operation != null) ++fieldCount;
+ if(summary != null) ++fieldCount;
packer.packMapHeader(fieldCount);
super.writeFields(packer);
@@ -308,6 +318,10 @@ void writeMsgpack(MessagePacker packer) throws IOException {
packer.packString(OPERATION);
operation.write(packer);
}
+ if(summary != null) {
+ packer.packString(SUMMARY);
+ summary.write(packer);
+ }
}
Message readMsgpack(MessageUnpacker unpacker) throws IOException {
@@ -343,6 +357,8 @@ Message readMsgpack(MessageUnpacker unpacker) throws IOException {
refType = unpacker.unpackString();
} else if (fieldName.equals(OPERATION)) {
operation = Operation.read(unpacker);
+ } else if (fieldName.equals(SUMMARY)) {
+ summary = Summary.read(unpacker);
}
else {
Log.v(TAG, "Unexpected field: " + fieldName);
@@ -512,10 +528,18 @@ protected void read(final JsonObject map) throws MessageDecodeException {
final JsonElement operationElement = map.get(OPERATION);
if (null != operationElement) {
- if (!(operationElement instanceof JsonObject)) {
+ if (!operationElement.isJsonObject()) {
throw MessageDecodeException.fromDescription("Message operation is of type \"" + operationElement.getClass() + "\" when expected a JSON object.");
}
- operation = Operation.read((JsonObject) operationElement);
+ operation = Operation.read(operationElement.getAsJsonObject());
+ }
+
+ final JsonElement summaryElement = map.get(SUMMARY);
+ if (summaryElement != null) {
+ if (!summaryElement.isJsonObject()) {
+ throw MessageDecodeException.fromDescription("Message summary is of type \"" + summaryElement.getClass() + "\" when expected a JSON object.");
+ }
+ summary = Summary.read(summaryElement.getAsJsonObject());
}
}
@@ -553,6 +577,9 @@ public JsonElement serialize(Message message, Type typeOfMessage, JsonSerializat
if (message.operation != null) {
json.add(OPERATION, Serialisation.gson.toJsonTree(message.operation));
}
+ if (message.summary != null) {
+ json.add(SUMMARY, message.summary.toJsonTree());
+ }
return json;
}
diff --git a/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java b/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java
index 986a8628b..73db3bf23 100644
--- a/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java
+++ b/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java
@@ -28,28 +28,28 @@
*/
public class ProtocolMessage {
public enum Action {
- heartbeat,
- ack,
- nack,
- connect,
- connected,
- disconnect,
- disconnected,
- close,
- closed,
- error,
- attach,
- attached,
- detach,
- detached,
- presence,
- message,
- sync,
- auth,
- activate,
- object,
- object_sync,
- annotation;
+ heartbeat, // 0
+ ack, // 1
+ nack, // 2
+ connect, // 3
+ connected, // 4
+ disconnect, // 5
+ disconnected, // 6
+ close, // 7
+ closed, // 8
+ error, // 9
+ attach, // 10
+ attached, // 11
+ detach, // 12
+ detached, // 13
+ presence, // 14
+ message, // 15
+ sync, // 16
+ auth, // 17
+ activate, // 18
+ object, // 19
+ object_sync, // 20
+ annotation; // 21
public int getValue() { return ordinal(); }
public static Action findByValue(int value) { return values()[value]; }
@@ -68,12 +68,14 @@ public enum Flag {
publish(17),
subscribe(18),
presence_subscribe(19),
+ // 20 reserved (TR3v)
/* Annotation flags */
- annotation_publish(21),
- annotation_subscribe(22),
+ annotation_publish(21), // (TR3w)
+ annotation_subscribe(22), // (TR3x)
+ // 23 reserved (TR3v)
/* Object flags */
- object_subscribe(24),
- object_publish(25);
+ object_subscribe(24), // (TR3y)
+ object_publish(25); // (TR3z)
private final int mask;
@@ -86,8 +88,12 @@ public int getMask() {
}
}
+ /**
+ * (RTN7a)
+ */
public static boolean ackRequired(ProtocolMessage msg) {
- return (msg.action == Action.message || msg.action == Action.presence);
+ return (msg.action == Action.message || msg.action == Action.presence
+ || msg.action == Action.object || msg.action == Action.annotation);
}
public ProtocolMessage() {}
@@ -116,6 +122,7 @@ public ProtocolMessage(Action action, String channel) {
public ConnectionDetails connectionDetails;
public AuthDetails auth;
public Map params;
+ public Annotation[] annotations;
public boolean hasFlag(final Flag flag) {
return (flags & flag.getMask()) == flag.getMask();
@@ -139,6 +146,7 @@ void writeMsgpack(MessagePacker packer) throws IOException {
if(flags != 0) ++fieldCount;
if(params != null) ++fieldCount;
if(channelSerial != null) ++fieldCount;
+ if(annotations != null) ++fieldCount;
packer.packMapHeader(fieldCount);
packer.packString("action");
packer.packInt(action.getValue());
@@ -174,6 +182,10 @@ void writeMsgpack(MessagePacker packer) throws IOException {
packer.packString("channelSerial");
packer.packString(channelSerial);
}
+ if(annotations != null) {
+ packer.packString("annotations");
+ AnnotationSerializer.writeMsgpackArray(annotations, packer);
+ }
}
ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
@@ -233,6 +245,9 @@ ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
case "params":
params = MessageSerializer.readStringMap(unpacker);
break;
+ case "annotations":
+ annotations = AnnotationSerializer.readMsgpackArray(unpacker);
+ break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
diff --git a/lib/src/main/java/io/ably/lib/types/Summary.java b/lib/src/main/java/io/ably/lib/types/Summary.java
new file mode 100644
index 000000000..292fe67b6
--- /dev/null
+++ b/lib/src/main/java/io/ably/lib/types/Summary.java
@@ -0,0 +1,142 @@
+package io.ably.lib.types;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import io.ably.lib.util.Log;
+import io.ably.lib.util.Serialisation;
+import org.msgpack.core.MessagePacker;
+import org.msgpack.core.MessageUnpacker;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A summary of all the annotations that have been made to the message. Will always be
+ * populated for a message.summary, and may be populated for any other type (in
+ * particular a message retrieved from REST history will have its latest summary
+ * included).
+ * The keys of the map are the annotation types. The exact structure of the value of
+ * each key depends on the aggregation part of the annotation type, e.g. for a type of
+ * reaction:distinct.v1, the value will be a DistinctValues object. New aggregation
+ * methods might be added serverside, hence the 'unknown' part of the sum type.
+ */
+public class Summary {
+
+ private static final String TAG = Summary.class.getName();
+
+ /**
+ * (TM2q1) The sdk MUST be able to cope with structures and aggregation types that have it does not yet know about
+ * or have explicit support for, hence the loose (JsonObject) type.
+ */
+ private final Map typeToSummaryJson;
+
+ public Summary(Map typeToSummaryJson) {
+ this.typeToSummaryJson = typeToSummaryJson;
+ }
+
+ public static Map asSummaryDistinctV1(JsonObject jsonObject) {
+ Map summary = new HashMap<>();
+ for (Map.Entry entry : jsonObject.entrySet()) {
+ String key = entry.getKey();
+ summary.put(key, asSummaryFlagV1(entry.getValue().getAsJsonObject()));
+ }
+ return summary;
+ }
+
+ public static Map asSummaryUniqueV1(JsonObject jsonObject) {
+ return asSummaryDistinctV1(jsonObject);
+ }
+
+ public static Map asSummaryMultipleV1(JsonObject jsonObject) {
+ Map summary = new HashMap<>();
+ for (Map.Entry entry : jsonObject.entrySet()) {
+ String key = entry.getKey();
+ JsonObject value = entry.getValue().getAsJsonObject();
+ int total = value.get("total").getAsInt();
+ Map clientIds = new HashMap<>();
+ for (Map.Entry clientEntry: value.get("clientIds").getAsJsonObject().entrySet()) {
+ clientIds.put(clientEntry.getKey(), clientEntry.getValue().getAsInt());
+ }
+ summary.put(key, new SummaryClientIdCounts(total, clientIds));
+ }
+ return summary;
+ }
+
+ public static SummaryClientIdList asSummaryFlagV1(JsonObject jsonObject) {
+ int total = jsonObject.get("total").getAsInt();
+ List clientIds = Serialisation.gson.fromJson(jsonObject.get("clientIds"), List.class);
+ return new SummaryClientIdList(total, clientIds);
+ }
+
+ public static SummaryTotal asSummaryTotalV1(JsonObject jsonObject) {
+ int total = jsonObject.get("total").getAsInt();
+ return new SummaryTotal(total);
+ }
+
+ static Summary read(MessageUnpacker unpacker) {
+ try {
+ return read(Serialisation.msgpackToGson(unpacker.unpackValue()));
+ } catch (Exception e) {
+ Log.e(TAG, "Failed to read summary from MessagePack", e);
+ return null;
+ }
+ }
+
+ static Summary read(JsonElement json) {
+ if (!json.isJsonObject()) {
+ throw new JsonParseException("Expected an object but got \"" + json.getClass() + "\".");
+ }
+ Map typeToSummaryJson = new HashMap<>();
+ for (Map.Entry entry : json.getAsJsonObject().entrySet()) {
+ if (!entry.getValue().isJsonObject()) {
+ throw new JsonParseException("Expected an object but got \"" + json.getClass() + "\".");
+ }
+ typeToSummaryJson.put(entry.getKey(), entry.getValue().getAsJsonObject());
+ }
+ return new Summary(typeToSummaryJson);
+ }
+
+ /**
+ * Retrieves the JSON representation associated with a specified annotation type.
+ *
+ * @param annotationType the type of annotation to retrieve its JSON representation
+ * @return a JsonObject containing the JSON representation of the specified annotation type,
+ * or null if no representation exists for the given type
+ */
+ public JsonObject get(String annotationType) {
+ return typeToSummaryJson.get(annotationType);
+ }
+
+ void write(MessagePacker packer) {
+ Serialisation.gsonToMsgpack(toJsonTree(), packer);
+ }
+
+ JsonElement toJsonTree() {
+ return Serialisation.gson.toJsonTree(this);
+ }
+
+ public static class Serializer implements JsonSerializer, JsonDeserializer {
+
+ @Override
+ public JsonElement serialize(Summary summary, Type typeOfMessage, JsonSerializationContext ctx) {
+ JsonObject json = new JsonObject();
+ for (Map.Entry entry : summary.typeToSummaryJson.entrySet()) {
+ json.add(entry.getKey(), entry.getValue());
+ }
+ return json;
+ }
+
+ @Override
+ public Summary deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
+ return read(json);
+ }
+
+ }
+}
diff --git a/lib/src/main/java/io/ably/lib/types/SummaryClientIdCounts.java b/lib/src/main/java/io/ably/lib/types/SummaryClientIdCounts.java
new file mode 100644
index 000000000..d99996749
--- /dev/null
+++ b/lib/src/main/java/io/ably/lib/types/SummaryClientIdCounts.java
@@ -0,0 +1,13 @@
+package io.ably.lib.types;
+
+import java.util.Map;
+
+public class SummaryClientIdCounts {
+ public final int total; // TM7d1a
+ public final Map clientIds; // TM7d1b
+
+ public SummaryClientIdCounts(int total, Map clientIds) {
+ this.total = total;
+ this.clientIds = clientIds;
+ }
+}
diff --git a/lib/src/main/java/io/ably/lib/types/SummaryClientIdList.java b/lib/src/main/java/io/ably/lib/types/SummaryClientIdList.java
new file mode 100644
index 000000000..2c9db8a08
--- /dev/null
+++ b/lib/src/main/java/io/ably/lib/types/SummaryClientIdList.java
@@ -0,0 +1,13 @@
+package io.ably.lib.types;
+
+import java.util.List;
+
+public class SummaryClientIdList {
+ public final int total; // TM7c1a
+ public final List clientIds; // TM7c1b
+
+ public SummaryClientIdList(int total, List clientIds) {
+ this.total = total;
+ this.clientIds = clientIds;
+ }
+}
diff --git a/lib/src/main/java/io/ably/lib/types/SummaryTotal.java b/lib/src/main/java/io/ably/lib/types/SummaryTotal.java
new file mode 100644
index 000000000..f7d4b0724
--- /dev/null
+++ b/lib/src/main/java/io/ably/lib/types/SummaryTotal.java
@@ -0,0 +1,9 @@
+package io.ably.lib.types;
+
+public class SummaryTotal {
+ public final int total; // TM7e1a
+
+ SummaryTotal(int total) {
+ this.total = total;
+ }
+}
diff --git a/lib/src/main/java/io/ably/lib/util/Serialisation.java b/lib/src/main/java/io/ably/lib/util/Serialisation.java
index d2ae3baad..8397cd069 100644
--- a/lib/src/main/java/io/ably/lib/util/Serialisation.java
+++ b/lib/src/main/java/io/ably/lib/util/Serialisation.java
@@ -11,11 +11,14 @@
import io.ably.lib.http.HttpCore;
import io.ably.lib.platform.Platform;
import io.ably.lib.types.AblyException;
+import io.ably.lib.types.Annotation;
+import io.ably.lib.types.AnnotationAction;
import io.ably.lib.types.ErrorInfo;
import io.ably.lib.types.Message;
import io.ably.lib.types.MessageExtras;
import io.ably.lib.types.PresenceMessage;
import io.ably.lib.types.ProtocolMessage;
+import io.ably.lib.types.Summary;
import org.msgpack.core.MessagePack;
import org.msgpack.core.MessagePack.PackerConfig;
import org.msgpack.core.MessagePack.UnpackerConfig;
@@ -48,6 +51,9 @@ public class Serialisation {
gsonBuilder.registerTypeAdapter(PresenceMessage.class, new PresenceMessage.Serializer());
gsonBuilder.registerTypeAdapter(PresenceMessage.Action.class, new PresenceMessage.ActionSerializer());
gsonBuilder.registerTypeAdapter(ProtocolMessage.Action.class, new ProtocolMessage.ActionSerializer());
+ gsonBuilder.registerTypeAdapter(Annotation.class, new Annotation.Serializer());
+ gsonBuilder.registerTypeAdapter(AnnotationAction.class, new Annotation.ActionSerializer());
+ gsonBuilder.registerTypeAdapter(Summary.class, new Summary.Serializer());
gson = gsonBuilder.create();
msgpackPackerConfig = Platform.name.equals("android") ?
diff --git a/lib/src/test/java/io/ably/lib/test/common/Setup.java b/lib/src/test/java/io/ably/lib/test/common/Setup.java
index b6171edf0..889aba74f 100644
--- a/lib/src/test/java/io/ably/lib/test/common/Setup.java
+++ b/lib/src/test/java/io/ably/lib/test/common/Setup.java
@@ -68,6 +68,7 @@ public static class Namespace {
public boolean persisted;
public boolean pushEnabled;
public int status;
+ public boolean mutableMessages;
}
public static class Connection {
diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAnnotationsTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAnnotationsTest.java
new file mode 100644
index 000000000..cfc18bc20
--- /dev/null
+++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeAnnotationsTest.java
@@ -0,0 +1,184 @@
+package io.ably.lib.test.realtime;
+
+import io.ably.lib.realtime.AblyRealtime;
+import io.ably.lib.realtime.Channel;
+import io.ably.lib.realtime.ChannelState;
+import io.ably.lib.rest.AblyRest;
+import io.ably.lib.test.common.Helpers;
+import io.ably.lib.test.common.ParameterizedTest;
+import io.ably.lib.types.AblyException;
+import io.ably.lib.types.Annotation;
+import io.ably.lib.types.AnnotationAction;
+import io.ably.lib.types.ChannelMode;
+import io.ably.lib.types.ChannelOptions;
+import io.ably.lib.types.ClientOptions;
+import io.ably.lib.types.Message;
+import io.ably.lib.types.Param;
+import io.ably.lib.types.PaginatedResult;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class RealtimeAnnotationsTest extends ParameterizedTest {
+
+ @Rule
+ public Timeout testTimeout = Timeout.seconds(60);
+
+ @Test
+ public void publish_and_subscribe_annotations() throws Exception {
+
+ String channelName = "mutable:publish_subscribe_annotation";
+
+ TestChannel testChannel = new TestChannel(channelName);
+
+ Channel channel = testChannel.realtimeChannel;
+
+ final Message[] receivedMessage = new Message[1];
+ channel.subscribe(message -> receivedMessage[0] = message);
+
+ final Annotation[] receivedAnnotation = new Annotation[1];
+ Helpers.CompletionWaiter waiter = new Helpers.CompletionWaiter();
+ channel.annotations.subscribe(annotation -> {
+ receivedAnnotation[0] = annotation;
+ waiter.onSuccess();
+ });
+
+ Helpers.MessageWaiter messageWaiter = new Helpers.MessageWaiter(channel);
+ channel.publish("message", "foobar");
+ messageWaiter.waitFor(1);
+
+ assertNotNull("Message should be received", receivedMessage[0]);
+
+ Annotation emoji1Annotation = new Annotation();
+ emoji1Annotation.type = "reaction:distinct.v1";
+ emoji1Annotation.name = "👍";
+
+ channel.annotations.publish(receivedMessage[0].serial, emoji1Annotation);
+ waiter.waitFor();
+
+ assertNotNull("Annotation should be received", receivedAnnotation[0]);
+ assertEquals(AnnotationAction.ANNOTATION_CREATE, receivedAnnotation[0].action);
+ assertEquals(receivedMessage[0].serial, receivedAnnotation[0].messageSerial);
+ assertEquals("reaction:distinct.v1", receivedAnnotation[0].type);
+ assertEquals("👍", receivedAnnotation[0].name);
+ assertTrue(receivedAnnotation[0].serial.compareTo(receivedAnnotation[0].messageSerial) > 0);
+
+ waiter.reset();
+
+ receivedAnnotation[0] = null;
+ Annotation emoji2Annotation = new Annotation();
+ emoji2Annotation.type = "reaction:distinct.v1";
+ emoji2Annotation.name = "😕";
+ testChannel.restChannel.annotations.publish(receivedMessage[0].serial, emoji2Annotation);
+
+ waiter.waitFor();
+
+ assertNotNull("Rest annotation should be received", receivedAnnotation[0]);
+ assertEquals(AnnotationAction.ANNOTATION_CREATE, receivedAnnotation[0].action);
+ assertEquals(receivedMessage[0].serial, receivedAnnotation[0].messageSerial);
+ assertEquals("reaction:distinct.v1", receivedAnnotation[0].type);
+ assertEquals("😕", receivedAnnotation[0].name);
+ assertTrue(receivedAnnotation[0].serial.compareTo(receivedAnnotation[0].messageSerial) > 0);
+
+ testChannel.dispose();
+ }
+
+ @Test
+ public void get_all_annotations() throws Exception {
+ String channelName = "mutable:get_all_annotations_for_a_message";
+
+ TestChannel testChannel = new TestChannel(channelName);
+ Channel channel = testChannel.realtimeChannel;
+
+ final Message[] receivedMessage = new Message[1];
+ channel.subscribe(message -> receivedMessage[0] = message);
+
+ Helpers.MessageWaiter messageWaiter = new Helpers.MessageWaiter(channel);
+ channel.publish("message", "foobar");
+ messageWaiter.waitFor(1);
+
+ Helpers.CompletionWaiter waiter = new Helpers.CompletionWaiter();
+ channel.annotations.subscribe(annotation -> waiter.onSuccess());
+
+ String[] emojis = new String[]{"👍", "😕", "👎", "👍👍", "😕😕", "👎👎"};
+ for (String emoji : emojis) {
+ Annotation annotation = new Annotation();
+ annotation.type = "reaction:distinct.v1";
+ annotation.name = emoji;
+ testChannel.restChannel.annotations.publish(receivedMessage[0].serial, annotation);
+ }
+
+ waiter.waitFor(6);
+
+ // There is a gap between receiving annotation messages and getting them in annotations
+ Thread.sleep(1_000);
+
+ PaginatedResult result = channel.annotations.get(receivedMessage[0].serial);
+ assertEquals(6, result.items().length);
+
+ assertEquals(AnnotationAction.ANNOTATION_CREATE, result.items()[0].action);
+ assertEquals(receivedMessage[0].serial, result.items()[0].messageSerial);
+ assertEquals("reaction:distinct.v1", result.items()[0].type);
+ assertEquals("👍", result.items()[0].name);
+ assertEquals("😕", result.items()[1].name);
+ assertEquals("👎", result.items()[2].name);
+ assertTrue(result.items()[1].serial.compareTo(result.items()[0].serial) > 0);
+ assertTrue(result.items()[2].serial.compareTo(result.items()[1].serial) > 0);
+
+ result = channel.annotations.get(receivedMessage[0].serial, new Param[]{new Param("limit", "2")});
+ assertEquals(2, result.items().length);
+ assertEquals("👍", result.items()[0].name);
+ assertEquals("😕", result.items()[1].name);
+ assertTrue(result.hasNext());
+
+ result = result.next();
+ assertNotNull(result);
+ assertEquals(2, result.items().length);
+ assertEquals("👎", result.items()[0].name);
+ assertEquals("👍👍", result.items()[1].name);
+ assertTrue(result.hasNext());
+
+ result = result.next();
+ assertNotNull(result);
+ assertEquals(2, result.items().length);
+ assertEquals("😕😕", result.items()[0].name);
+ assertEquals("👎👎", result.items()[1].name);
+ assertTrue(!result.hasNext());
+ }
+
+
+ private class TestChannel {
+ TestChannel(String channelName) throws AblyException {
+ ClientOptions opts = createOptions(testVars.keys[0].keyStr);
+ opts.clientId = UUID.randomUUID().toString();
+ rest = new AblyRest(opts);
+ restChannel = rest.channels.get(channelName);
+ realtime = new AblyRealtime(opts);
+ ChannelOptions channelOptions = new ChannelOptions();
+ channelOptions.modes = new ChannelMode[] {
+ ChannelMode.publish, ChannelMode.subscribe, ChannelMode.annotation_publish, ChannelMode.annotation_subscribe
+ };
+
+ realtimeChannel = realtime.channels.get(channelName, channelOptions);
+ realtimeChannel.attach();
+ (new Helpers.ChannelWaiter(realtimeChannel)).waitFor(ChannelState.attached);
+ }
+
+ void dispose() throws Exception {
+ realtime.close();
+ rest.close();
+ }
+
+ AblyRest rest;
+ AblyRealtime realtime;
+ io.ably.lib.rest.Channel restChannel;
+ io.ably.lib.realtime.Channel realtimeChannel;
+ }
+
+}
diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java
index 1ff965b1d..dff2b1711 100644
--- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java
+++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeMessageTest.java
@@ -8,7 +8,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -20,6 +19,7 @@
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
+import io.ably.lib.test.util.AblyCommonsReader;
import io.ably.lib.types.ChannelOptions;
import io.ably.lib.types.MessageAction;
import io.ably.lib.types.MessageExtras;
@@ -47,7 +47,6 @@
import io.ably.lib.test.common.Helpers.MessageWaiter;
import io.ably.lib.test.common.Helpers;
import io.ably.lib.test.common.ParameterizedTest;
-import io.ably.lib.test.common.Setup;
import io.ably.lib.transport.ConnectionManager;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.Callback;
@@ -59,7 +58,7 @@
public class RealtimeMessageTest extends ParameterizedTest {
- private static final String testMessagesEncodingFile = "ably-common/test-resources/messages-encoding.json";
+ private static final String testMessagesEncodingFile = "test-resources/messages-encoding.json";
private static Gson gson = new Gson();
@Rule
@@ -532,13 +531,7 @@ public void ensure_disconnect_with_error_does_not_move_to_failed() {
@Test
public void messages_encoding_fixtures() {
MessagesEncodingData fixtures;
- try {
- fixtures = (MessagesEncodingData) Setup.loadJson(testMessagesEncodingFile, MessagesEncodingData.class);
- } catch(IOException e) {
- fail();
- return;
- }
-
+ fixtures = AblyCommonsReader.read(testMessagesEncodingFile, MessagesEncodingData.class);
AblyRealtime ably = null;
try {
ClientOptions opts = createOptions(testVars.keys[0].keyStr);
@@ -597,12 +590,7 @@ public MessagesEncodingDataItem[] handleResponse(HttpCore.Response response, Err
@Test
public void messages_msgpack_and_json_encoding_is_compatible() {
MessagesEncodingData fixtures;
- try {
- fixtures = (MessagesEncodingData) Setup.loadJson(testMessagesEncodingFile, MessagesEncodingData.class);
- } catch(IOException e) {
- fail();
- return;
- }
+ fixtures = AblyCommonsReader.read(testMessagesEncodingFile, MessagesEncodingData.class);
// Publish each data type through raw JSON POST and retrieve through MsgPack and JSON.
@@ -884,15 +872,10 @@ public void message_from_encoded_json_object() throws AblyException {
public void messages_from_encoded_json_array() throws AblyException {
JsonArray fixtures = null;
MessagesData testMessages = null;
- try {
- testMessages = (MessagesData) Setup.loadJson(testMessagesEncodingFile, MessagesData.class);
- JsonObject jsonObject = (JsonObject) Setup.loadJson(testMessagesEncodingFile, JsonObject.class);
- //We use this as-is for decoding purposes.
- fixtures = jsonObject.getAsJsonArray("messages");
- } catch(IOException e) {
- fail();
- return;
- }
+ testMessages = AblyCommonsReader.read(testMessagesEncodingFile, MessagesData.class);
+ JsonObject jsonObject = AblyCommonsReader.read(testMessagesEncodingFile, JsonObject.class);
+ //We use this as-is for decoding purposes.
+ fixtures = jsonObject.getAsJsonArray("messages");
Message[] decodedMessages = Message.fromEncodedArray(fixtures, null);
for(int index = 0; index < decodedMessages.length; index++) {
diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java
index dee3e57d2..85357acd2 100644
--- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java
+++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimePresenceTest.java
@@ -17,7 +17,6 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -41,7 +40,7 @@
import io.ably.lib.realtime.ConnectionState;
import io.ably.lib.realtime.ConnectionStateListener;
import io.ably.lib.realtime.Presence;
-import io.ably.lib.test.common.Setup;
+import io.ably.lib.test.util.AblyCommonsReader;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.Capability;
import io.ably.lib.types.ChannelOptions;
@@ -74,7 +73,7 @@
public class RealtimePresenceTest extends ParameterizedTest {
- private static final String testMessagesEncodingFile = "ably-common/test-resources/presence-messages-encoding.json";
+ private static final String testMessagesEncodingFile = "test-resources/presence-messages-encoding.json";
private static final String testClientId1 = "testClientId1";
private static final String testClientId2 = "testClientId2";
private Auth.TokenDetails token1;
@@ -3636,15 +3635,11 @@ public void message_from_encoded_json_object() throws AblyException {
public void messages_from_encoded_json_array() throws AblyException {
JsonArray fixtures = null;
MessagesData testMessages = null;
- try {
- testMessages = (MessagesData) Setup.loadJson(testMessagesEncodingFile, MessagesData.class);
- JsonObject jsonObject = (JsonObject) Setup.loadJson(testMessagesEncodingFile, JsonObject.class);
- //We use this as-is for decoding purposes.
- fixtures = jsonObject.getAsJsonArray("messages");
- } catch(IOException e) {
- fail();
- return;
- }
+ testMessages = AblyCommonsReader.read(testMessagesEncodingFile, MessagesData.class);
+ JsonObject jsonObject = AblyCommonsReader.readAsJsonObject(testMessagesEncodingFile);
+ //We use this as-is for decoding purposes.
+ fixtures = jsonObject.getAsJsonArray("messages");
+
PresenceMessage[] decodedMessages = PresenceMessage.fromEncodedArray(fixtures, null);
for(int index = 0; index < decodedMessages.length; index++) {
PresenceMessage testInputMsg = testMessages.messages[index];
diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeSuite.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeSuite.java
index e177269df..d292b960f 100644
--- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeSuite.java
+++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeSuite.java
@@ -15,6 +15,7 @@
@SuiteClasses({
ConnectionManagerTest.class,
RealtimeHttpHeaderTest.class,
+ RealtimeAnnotationsTest.class,
RealtimeAuthTest.class,
RealtimeJWTTest.class,
RealtimeReauthTest.class,
diff --git a/lib/src/test/java/io/ably/lib/test/util/AblyCommonsReader.java b/lib/src/test/java/io/ably/lib/test/util/AblyCommonsReader.java
new file mode 100644
index 000000000..aa8b30f32
--- /dev/null
+++ b/lib/src/test/java/io/ably/lib/test/util/AblyCommonsReader.java
@@ -0,0 +1,52 @@
+package io.ably.lib.test.util;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class AblyCommonsReader {
+ private static final String BASE_URL = "https://raw.githubusercontent.com/ably/ably-common/refs/heads/main/";
+ private static Gson gson = new Gson();
+
+ public static String readAsString(String path) throws Exception {
+ URL url = new URL(BASE_URL + path);
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestMethod("GET");
+
+ if (conn.getResponseCode() != 200) {
+ throw new RuntimeException("Failed : HTTP error code : " + conn.getResponseCode());
+ }
+
+ BufferedReader br = new BufferedReader(new InputStreamReader((conn.getInputStream())));
+ StringBuilder sb = new StringBuilder();
+ String output;
+ while ((output = br.readLine()) != null) {
+ sb.append(output);
+ }
+
+ conn.disconnect();
+
+ return sb.toString();
+ }
+
+ public static JsonObject readAsJsonObject(String path) {
+ try {
+ return JsonParser.parseString(readAsString(path)).getAsJsonObject();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static T read(String path, Class classOfT) {
+ try {
+ return gson.fromJson(readAsString(path), classOfT);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/lib/src/test/java/io/ably/lib/types/SummaryTest.java b/lib/src/test/java/io/ably/lib/types/SummaryTest.java
new file mode 100644
index 000000000..13490f395
--- /dev/null
+++ b/lib/src/test/java/io/ably/lib/types/SummaryTest.java
@@ -0,0 +1,256 @@
+package io.ably.lib.types;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class SummaryTest {
+
+ @Test
+ public void testAsSummaryUniqueV1_SingleEntry() {
+ JsonObject jsonObject = new JsonObject();
+ JsonObject entryValue = new JsonObject();
+ entryValue.addProperty("total", 5);
+ JsonArray clientIds = new JsonArray();
+ clientIds.add("uniqueClient1");
+ clientIds.add("uniqueClient2");
+ clientIds.add("uniqueClient3");
+ clientIds.add("uniqueClient4");
+ clientIds.add("uniqueClient5");
+ entryValue.add("clientIds", clientIds);
+ jsonObject.add("😄️️️", entryValue);
+
+ Map result = Summary.asSummaryUniqueV1(jsonObject);
+
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ assertTrue(result.containsKey("😄️️️"));
+
+ SummaryClientIdList summary = result.get("😄️️️");
+ assertNotNull(summary);
+ assertEquals(5, summary.total);
+ assertEquals(5, summary.clientIds.size());
+ assertTrue(summary.clientIds.contains("uniqueClient1"));
+ assertTrue(summary.clientIds.contains("uniqueClient2"));
+ assertTrue(summary.clientIds.contains("uniqueClient3"));
+ assertTrue(summary.clientIds.contains("uniqueClient4"));
+ assertTrue(summary.clientIds.contains("uniqueClient5"));
+ }
+
+ @Test
+ public void testAsSummaryUniqueV1_InvalidJsonStructure() {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("invalidKey", "invalidValue");
+
+ try {
+ Summary.asSummaryUniqueV1(jsonObject);
+ fail("Should throw IllegalStateException");
+ } catch (IllegalStateException exception) {
+ assertNotNull(exception);
+ }
+ }
+
+ @Test
+ public void testAsSummaryDistinctV1_EmptyJsonObject() {
+ JsonObject jsonObject = new JsonObject();
+
+ Map result = Summary.asSummaryDistinctV1(jsonObject);
+
+ assertNotNull(result);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testAsSummaryDistinctV1_SingleEntry() {
+ JsonObject jsonObject = new JsonObject();
+ JsonObject entryValue = new JsonObject();
+ entryValue.addProperty("total", 3);
+ JsonArray clientIds = new JsonArray();
+ clientIds.add("client1");
+ clientIds.add("client2");
+ clientIds.add("client3");
+ entryValue.add("clientIds", clientIds);
+ jsonObject.add("😄️️️", entryValue);
+
+ Map result = Summary.asSummaryDistinctV1(jsonObject);
+
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ assertTrue(result.containsKey("😄️️️"));
+
+ SummaryClientIdList summary = result.get("😄️️️");
+ assertNotNull(summary);
+ assertEquals(3, summary.total);
+ assertEquals(3, summary.clientIds.size());
+ assertTrue(summary.clientIds.contains("client1"));
+ assertTrue(summary.clientIds.contains("client2"));
+ assertTrue(summary.clientIds.contains("client3"));
+ }
+
+ @Test
+ public void testAsSummaryDistinctV1_InvalidJsonStructure() {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("invalidKey", "invalidValue");
+
+ try {
+ Summary.asSummaryDistinctV1(jsonObject);
+ fail("Should throw ClassCastException");
+ } catch (IllegalStateException exception) {
+ assertNotNull(exception);
+ }
+ }
+
+ @Test
+ public void testAsSummaryFlagV1_SingleEntry() {
+ JsonObject entryValue = new JsonObject();
+ entryValue.addProperty("total", 3);
+ JsonArray clientIds = new JsonArray();
+ clientIds.add("client1");
+ clientIds.add("client2");
+ clientIds.add("client3");
+ entryValue.add("clientIds", clientIds);
+
+ SummaryClientIdList result = Summary.asSummaryFlagV1(entryValue);
+
+ assertNotNull(result);
+ assertEquals(3, result.total);
+ assertEquals(3, result.clientIds.size());
+ assertTrue(result.clientIds.contains("client1"));
+ assertTrue(result.clientIds.contains("client2"));
+ assertTrue(result.clientIds.contains("client3"));
+ }
+
+ @Test
+ public void testAsSummaryMultipleV1_EmptyJsonObject() {
+ JsonObject jsonObject = new JsonObject();
+
+ Map result = Summary.asSummaryMultipleV1(jsonObject);
+
+ assertNotNull(result);
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testAsSummaryMultipleV1_SingleEntry() {
+ JsonObject jsonObject = new JsonObject();
+ JsonObject entryValue = new JsonObject();
+ entryValue.addProperty("total", 4);
+ JsonObject clientIds = new JsonObject();
+ clientIds.addProperty("client1", 2);
+ clientIds.addProperty("client2", 1);
+ clientIds.addProperty("client3", 1);
+ entryValue.add("clientIds", clientIds);
+ jsonObject.add("😄️️️", entryValue);
+
+ Map result = Summary.asSummaryMultipleV1(jsonObject);
+
+ assertNotNull(result);
+ assertEquals(1, result.size());
+ assertTrue(result.containsKey("😄️️️"));
+
+ SummaryClientIdCounts summary = result.get("😄️️️");
+ assertNotNull(summary);
+ assertEquals(4, summary.total);
+ assertEquals(3, summary.clientIds.size());
+ assertEquals(2, summary.clientIds.get("client1").intValue());
+ assertEquals(1, summary.clientIds.get("client2").intValue());
+ assertEquals(1, summary.clientIds.get("client3").intValue());
+ }
+
+ @Test
+ public void testAsSummaryMultipleV1_MultipleEntries() {
+ JsonObject jsonObject = new JsonObject();
+
+ JsonObject entryValue1 = new JsonObject();
+ entryValue1.addProperty("total", 5);
+ JsonObject clientIds1 = new JsonObject();
+ clientIds1.addProperty("clientA", 3);
+ clientIds1.addProperty("clientB", 2);
+ entryValue1.add("clientIds", clientIds1);
+ jsonObject.add("😄️️️", entryValue1);
+
+ JsonObject entryValue2 = new JsonObject();
+ entryValue2.addProperty("total", 2);
+ JsonObject clientIds2 = new JsonObject();
+ clientIds2.addProperty("clientX", 1);
+ clientIds2.addProperty("clientY", 1);
+ entryValue2.add("clientIds", clientIds2);
+ jsonObject.add("👍️️️️️️", entryValue2);
+
+ Map result = Summary.asSummaryMultipleV1(jsonObject);
+
+ assertNotNull(result);
+ assertEquals(2, result.size());
+
+ SummaryClientIdCounts summaryA = result.get("😄️️️");
+ assertNotNull(summaryA);
+ assertEquals(5, summaryA.total);
+ assertEquals(2, summaryA.clientIds.size());
+ assertEquals(3, (int) summaryA.clientIds.get("clientA"));
+ assertEquals(2, (int) summaryA.clientIds.get("clientB"));
+
+ SummaryClientIdCounts summaryB = result.get("👍️️️️️️");
+ assertNotNull(summaryB);
+ assertEquals(2, summaryB.total);
+ assertEquals(2, summaryB.clientIds.size());
+ assertEquals(1, (int) summaryB.clientIds.get("clientX"));
+ assertEquals(1, (int) summaryB.clientIds.get("clientY"));
+ }
+
+ @Test
+ public void testAsSummaryMultipleV1_InvalidJsonStructure() {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("invalidKey", "invalidValue");
+
+ try {
+ Summary.asSummaryMultipleV1(jsonObject);
+ fail("Should throw IllegalStateException");
+ } catch (IllegalStateException exception) {
+ assertNotNull(exception);
+ }
+ }
+
+ @Test
+ public void testAsSummaryTotalV1_ValidJsonObject() {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("total", 10);
+
+ SummaryTotal result = Summary.asSummaryTotalV1(jsonObject);
+
+ assertNotNull(result);
+ assertEquals(10, result.total);
+ }
+
+ @Test
+ public void testAsSummaryTotalV1_EmptyJsonObject() {
+ JsonObject jsonObject = new JsonObject();
+
+ try {
+ Summary.asSummaryTotalV1(jsonObject);
+ fail("Should throw NullPointerException");
+ } catch (NullPointerException exception) {
+ assertNotNull(exception);
+ }
+ }
+
+ @Test
+ public void testAsSummaryTotalV1_InvalidJsonStructure() {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty("invalidKey", "invalidValue");
+
+ try {
+ Summary.asSummaryTotalV1(jsonObject);
+ fail("Should throw IllegalStateException");
+ } catch (Exception exception) {
+ assertNotNull(exception);
+ }
+ }
+}
diff --git a/lib/src/test/java/io/ably/lib/util/CryptoMessageTest.java b/lib/src/test/java/io/ably/lib/util/CryptoMessageTest.java
index ca9f0cf11..702188cf9 100644
--- a/lib/src/test/java/io/ably/lib/util/CryptoMessageTest.java
+++ b/lib/src/test/java/io/ably/lib/util/CryptoMessageTest.java
@@ -9,18 +9,16 @@
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
+import io.ably.lib.test.util.AblyCommonsReader;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import io.ably.lib.test.common.Setup;
import io.ably.lib.types.AblyException;
import io.ably.lib.types.ChannelOptions;
import io.ably.lib.types.Message;
-import io.ably.lib.util.Base64Coder;
-import io.ably.lib.util.Crypto;
import io.ably.lib.util.Crypto.CipherParams;
@Ignore("FIXME: Initialization is failing")
@@ -62,8 +60,8 @@ public enum FixtureSet {
}
private CryptoTestData loadTestData() throws IOException {
- return (CryptoTestData)Setup.loadJson(
- "ably-common/test-resources/" + fileName + ".json",
+ return (CryptoTestData) AblyCommonsReader.read(
+ "test-resources/" + fileName + ".json",
CryptoTestData.class);
}
}
diff --git a/lib/src/test/resources/ably-common b/lib/src/test/resources/ably-common
deleted file mode 160000
index b2eeb4e1e..000000000
--- a/lib/src/test/resources/ably-common
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit b2eeb4e1efa8de83693649314c5d575a096fdb78
diff --git a/lib/src/test/resources/local/testAppSpec.json b/lib/src/test/resources/local/testAppSpec.json
index a0721dbcb..5c6b7fb5c 100644
--- a/lib/src/test/resources/local/testAppSpec.json
+++ b/lib/src/test/resources/local/testAppSpec.json
@@ -32,7 +32,11 @@
{
"id": "pushenabled",
"pushEnabled": true
- }
+ },
+ {
+ "id": "mutable",
+ "mutableMessages": true
+ }
],
"channels": [
{