diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index b43bf363c..408a5f286 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -31,6 +31,7 @@ import io.ably.lib.types.DeltaExtras; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.Message; +import io.ably.lib.types.MessageAction; import io.ably.lib.types.MessageAnnotations; import io.ably.lib.types.MessageDecodeException; import io.ably.lib.types.MessageOperation; @@ -42,9 +43,12 @@ import io.ably.lib.types.ProtocolMessage; import io.ably.lib.types.ProtocolMessage.Action; import io.ably.lib.types.ProtocolMessage.Flag; +import io.ably.lib.types.PublishResult; import io.ably.lib.types.Summary; +import io.ably.lib.types.UpdateDeleteResult; import io.ably.lib.util.CollectionUtils; import io.ably.lib.util.EventEmitter; +import io.ably.lib.util.Listeners; import io.ably.lib.util.Log; import io.ably.lib.util.ReconnectionStrategy; import io.ably.lib.util.StringUtils; @@ -440,6 +444,16 @@ private static void callCompletionListenerError(CompletionListener listener, Err } } + private static void callCompletionListenerError(Callback listener, ErrorInfo err) { + if(listener != null) { + try { + listener.onError(err); + } catch(Throwable t) { + Log.e(TAG, "Unexpected exception calling CompletionListener", t); + } + } + } + private void setAttached(ProtocolMessage message) { clearAttachTimers(); properties.attachSerial = message.channelSerial; @@ -1022,7 +1036,7 @@ private void unsubscribeImpl(String name, MessageListener listener) { * @throws AblyException */ public void publish(String name, Object data) throws AblyException { - publish(name, data, null); + publish(name, data, (Callback) null); } /** @@ -1034,7 +1048,7 @@ public void publish(String name, Object data) throws AblyException { * @throws AblyException */ public void publish(Message message) throws AblyException { - publish(message, null); + publish(message, (Callback) null); } /** @@ -1046,7 +1060,7 @@ public void publish(Message message) throws AblyException { * @throws AblyException */ public void publish(Message[] messages) throws AblyException { - publish(messages, null); + publish(messages, (CompletionListener) null); } /** @@ -1062,12 +1076,34 @@ public void publish(Message[] messages) throws AblyException { *

* This listener is invoked on a background thread. * @throws AblyException + * @deprecated Use {@link #publish(String, Object, Callback)} instead. */ + @Deprecated public void publish(String name, Object data, CompletionListener listener) throws AblyException { Log.v(TAG, "publish(String, Object); channel = " + this.name + "; event = " + name); publish(new Message[] {new Message(name, data)}, listener); } + /** + * Publishes a single message to the channel with the given event name and payload. + * When publish is called with this client library, it won't attempt to implicitly attach to the channel, + * so long as transient publishing is available in the library. + * Otherwise, the client will implicitly attach. + *

+ * Spec: RTL6i + * @param name the event name + * @param data the message payload + * @param callback A callback may optionally be passed in to this call to be notified of success or failure of the operation, + * receiving a {@link PublishResult} with message serial(s) on success. + *

+ * This callback is invoked on a background thread. + * @throws AblyException + */ + public void publish(String name, Object data, Callback callback) throws AblyException { + Log.v(TAG, "publish(String, Object); channel = " + this.name + "; event = " + name); + publish(new Message[] {new Message(name, data)}, callback); + } + /** * Publishes a message to the channel. * When publish is called with this client library, it won't attempt to implicitly attach to the channel. @@ -1078,12 +1114,31 @@ public void publish(String name, Object data, CompletionListener listener) throw *

* This listener is invoked on a background thread. * @throws AblyException + * @deprecated Use {@link #publish(Message, Callback)} instead. */ + @Deprecated public void publish(Message message, CompletionListener listener) throws AblyException { Log.v(TAG, "publish(Message); channel = " + this.name + "; event = " + message.name); publish(new Message[] {message}, listener); } + /** + * Publishes a message to the channel. + * When publish is called with this client library, it won't attempt to implicitly attach to the channel. + *

+ * Spec: RTL6i + * @param message A {@link Message} object. + * @param callback A callback may optionally be passed in to this call to be notified of success or failure of the operation, + * receiving a {@link PublishResult} with message serial(s) on success. + *

+ * This callback is invoked on a background thread. + * @throws AblyException + */ + public void publish(Message message, Callback callback) throws AblyException { + Log.v(TAG, "publish(Message); channel = " + this.name + "; event = " + message.name); + publish(new Message[] {message}, callback); + } + /** * Publishes an array of messages to the channel. * When publish is called with this client library, it won't attempt to implicitly attach to the channel. @@ -1095,7 +1150,12 @@ public void publish(Message message, CompletionListener listener) throws AblyExc * This listener is invoked on a background thread. * @throws AblyException */ + @Deprecated public synchronized void publish(Message[] messages, CompletionListener listener) throws AblyException { + publish(messages, Listeners.fromCompletionListener(listener)); + } + + public synchronized void publish(Message[] messages, Callback listener) throws AblyException { Log.v(TAG, "publish(Message[]); channel = " + this.name); ConnectionManager connectionManager = ably.connection.connectionManager; ConnectionManager.State connectionState = connectionManager.getConnectionState(); @@ -1205,32 +1265,26 @@ public void getMessageAsync(String serial, Callback callback) { } /** - * Updates an existing message using patch semantics. - *

- * Non-null fields in the provided message (name, data, extras) will replace the corresponding - * fields in the existing message, while null fields will be left unchanged. + * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * Only non-null fields will be applied to the existing message. - * @param operation operation metadata such as clientId, description, or metadata in the version field - * @throws AblyException If the update operation fails. + *

+ * This callback is invoked on a background thread. */ - public void updateMessage(Message message, MessageOperation operation) throws AblyException { - messageEditsMixin.updateMessage(ably.http, message, operation); + public void updateMessage(Message message) throws AblyException { + updateMessage(message, null, null); } /** - * Updates an existing message using patch semantics. - *

- * Non-null fields in the provided message (name, data, extras) will replace the corresponding - * fields in the existing message, while null fields will be left unchanged. + * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * Only non-null fields will be applied to the existing message. - * @throws AblyException If the update operation fails. + * @param operation operation metadata such as clientId, description, or metadata in the version field + *

+ * This callback is invoked on a background thread. */ - public void updateMessage(Message message) throws AblyException { - updateMessage(message, null); + public void updateMessage(Message message, MessageOperation operation) throws AblyException { + updateMessage(message, operation, null); } /** @@ -1238,53 +1292,48 @@ public void updateMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the fields to update and the serial identifier. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param listener A listener to be notified of the outcome of this operation. + * @param listener A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { - messageEditsMixin.updateMessageAsync(ably.http, message, operation, listener); + public void updateMessage(Message message, MessageOperation operation, Callback listener) throws AblyException { + Log.v(TAG, "updateMessage(Message); channel = " + this.name + "; serial = " + message.serial); + updateDeleteImpl(message, operation, MessageAction.MESSAGE_UPDATE, listener); } /** * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * @param listener A listener to be notified of the outcome of this operation. + * @param listener A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, CompletionListener listener) { - updateMessageAsync(message, null, listener); + public void updateMessage(Message message, Callback listener) throws AblyException { + updateMessage(message, null, listener); } /** - * Marks a message as deleted. - *

- * This operation does not remove the message from history; it marks it as deleted - * while preserving the full message history. The deleted message can still be - * retrieved and will have its action set to MESSAGE_DELETE. + * Asynchronously marks a message as deleted. * - * @param message A {@link Message} message containing the serial identifier. - * @param operation operation metadata such as clientId, description, or metadata in the version field - * @throws AblyException If the delete operation fails. + * @param message A {@link Message} object containing the serial identifier and operation metadata. + *

+ * This callback is invoked on a background thread. */ - public void deleteMessage(Message message, MessageOperation operation) throws AblyException { - messageEditsMixin.deleteMessage(ably.http, message, operation); + public void deleteMessage(Message message) throws AblyException { + deleteMessage(message, null, null); } /** - * Marks a message as deleted. - *

- * This operation does not remove the message from history; it marks it as deleted - * while preserving the full message history. The deleted message can still be - * retrieved and will have its action set to MESSAGE_DELETE. + * Asynchronously marks a message as deleted. * - * @param message A {@link Message} message containing the serial identifier. - * @throws AblyException If the delete operation fails. + * @param message A {@link Message} object containing the serial identifier and operation metadata. + * @param operation operation metadata such as clientId, description, or metadata in the version field + *

+ * This callback is invoked on a background thread. */ - public void deleteMessage(Message message) throws AblyException { - deleteMessage(message, null); + public void deleteMessage(Message message, MessageOperation operation) throws AblyException { + deleteMessage(message, operation, null); } /** @@ -1292,24 +1341,115 @@ public void deleteMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the serial identifier and operation metadata. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param listener A listener to be notified of the outcome of this operation. + * @param listener A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { - messageEditsMixin.deleteMessageAsync(ably.http, message, operation, listener); + public void deleteMessage(Message message, MessageOperation operation, Callback listener) throws AblyException { + Log.v(TAG, "deleteMessage(Message); channel = " + this.name + "; serial = " + message.serial); + updateDeleteImpl(message, operation, MessageAction.MESSAGE_DELETE, listener); } /** * Asynchronously marks a message as deleted. * * @param message A {@link Message} object containing the serial identifier and operation metadata. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. + */ + public void deleteMessage(Message message, Callback callback) throws AblyException { + deleteMessage(message, null, callback); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + *

+ * This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, CompletionListener listener) { - deleteMessageAsync(message, null, listener); + public void appendMessage(Message message) throws AblyException { + appendMessage(message, null, null); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + *

+ * This callback is invoked on a background thread. + */ + public void appendMessage(Message message, MessageOperation operation) throws AblyException { + appendMessage(message, operation, null); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @param listener A callback to be notified of the outcome of this operation. + *

+ * This callback is invoked on a background thread. + */ + public void appendMessage(Message message, MessageOperation operation, Callback listener) throws AblyException { + Log.v(TAG, "appendMessage(Message); channel = " + this.name + "; serial = " + message.serial); + updateDeleteImpl(message, operation, MessageAction.MESSAGE_APPEND, listener); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param callback A callback to be notified of the outcome of this operation. + *

+ * This callback is invoked on a background thread. + */ + public void appendMessage(Message message, Callback callback) throws AblyException { + appendMessage(message, null, callback); + } + + private void updateDeleteImpl( + Message message, + MessageOperation operation, + MessageAction action, + Callback listener + ) throws AblyException { + if (message.serial == null || message.serial.isEmpty()) { + throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); + } + ConnectionManager connectionManager = ably.connection.connectionManager; + ConnectionManager.State connectionState = connectionManager.getConnectionState(); + boolean queueMessages = ably.options.queueMessages; + if (!connectionManager.isActive() || (connectionState.queueEvents && !queueMessages)) { + throw AblyException.fromErrorInfo(connectionState.defaultErrorInfo); + } + boolean connected = (connectionState.sendEvents); + + Message updatedMessage = new Message(message.name, message.data, message.extras); + updatedMessage.action = action; + updatedMessage.version = new MessageVersion(); + if (operation != null) { + updatedMessage.version.clientId = operation.clientId; + updatedMessage.version.description = operation.description; + updatedMessage.version.metadata = operation.metadata; + } + + try { + ably.auth.checkClientId(message, true, connected); + updatedMessage.encode(options); + } catch (AblyException e) { + if (listener != null) { + listener.onError(e.errorInfo); + } + return; + } + + ProtocolMessage msg = new ProtocolMessage(Action.message, this.name); + msg.messages = new Message[] { message }; + connectionManager.send(msg, queueMessages, Listeners.toPublishResultListener(listener)); } /** @@ -1628,7 +1768,7 @@ public void once(ChannelState state, ChannelStateListener listener) { */ public void sendProtocolMessage(ProtocolMessage protocolMessage, CompletionListener listener) throws AblyException { ConnectionManager connectionManager = ably.connection.connectionManager; - connectionManager.send(protocolMessage, ably.options.queueMessages, listener); + connectionManager.send(protocolMessage, ably.options.queueMessages, Listeners.fromCompletionListener(listener)); } private static final String TAG = Channel.class.getName(); diff --git a/lib/src/main/java/io/ably/lib/realtime/Presence.java b/lib/src/main/java/io/ably/lib/realtime/Presence.java index c56297fc0..9a7e89e7e 100644 --- a/lib/src/main/java/io/ably/lib/realtime/Presence.java +++ b/lib/src/main/java/io/ably/lib/realtime/Presence.java @@ -15,6 +15,8 @@ import io.ably.lib.types.PresenceMessage; import io.ably.lib.types.PresenceSerializer; import io.ably.lib.types.ProtocolMessage; +import io.ably.lib.types.PublishResult; +import io.ably.lib.util.Listeners; import io.ably.lib.util.Log; import io.ably.lib.util.StringUtils; @@ -120,9 +122,9 @@ public synchronized PresenceMessage[] get(String clientId, boolean wait) throws return get(new Param(GET_WAITFORSYNC, String.valueOf(wait)), new Param(GET_CLIENTID, clientId)); } - void addPendingPresence(PresenceMessage presenceMessage, CompletionListener listener) { + void addPendingPresence(PresenceMessage presenceMessage, Callback listener) { synchronized(channel) { - final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage,listener); + final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage, Listeners.unwrap(listener)); pendingPresence.add(queuedPresence); } } @@ -763,7 +765,7 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name); message.presence = new PresenceMessage[] { msg }; ConnectionManager connectionManager = ably.connection.connectionManager; - connectionManager.send(message, ably.options.queueMessages, listener); + connectionManager.send(message, ably.options.queueMessages, Listeners.fromCompletionListener(listener)); break; default: throw AblyException.fromErrorInfo(new ErrorInfo("Unable to enter presence channel in detached or failed state", 400, 91001)); @@ -892,7 +894,7 @@ private void sendQueuedMessages() { pendingPresence.clear(); try { - connectionManager.send(message, queueMessages, listener); + connectionManager.send(message, queueMessages, Listeners.fromCompletionListener(listener)); } catch(AblyException e) { Log.e(TAG, "sendQueuedMessages(): Unexpected exception sending message", e); if(listener != null) diff --git a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java index 47ec7bb4c..a7bb1e404 100644 --- a/lib/src/main/java/io/ably/lib/rest/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/rest/ChannelBase.java @@ -17,6 +17,8 @@ import io.ably.lib.types.Param; import io.ably.lib.types.PresenceMessage; import io.ably.lib.types.PresenceSerializer; +import io.ably.lib.types.PublishResult; +import io.ably.lib.types.UpdateDeleteResult; import io.ably.lib.util.Crypto; /** @@ -65,6 +67,23 @@ void publish(Http http, String name, Object data) throws AblyException { publishImpl(http, name, data).sync(); } + /** + * Publish a message on this channel using the REST API and return the result. + * Since the REST API is stateless, this request is made independently + * of any other request on this or any other channel. + * @param name the event name + * @param data the message payload; + * @return A {@link PublishResult} containing the message serial(s) + * @throws AblyException + */ + public PublishResult publishWithResult(String name, Object data) throws AblyException { + return publishWithResult(ably.http, name, data); + } + + PublishResult publishWithResult(Http http, String name, Object data) throws AblyException { + return publishWithResultImpl(http, name, data).sync(); + } + /** * Publish a message on this channel using the REST API. * Since the REST API is stateless, this request is made independently @@ -75,7 +94,9 @@ void publish(Http http, String name, Object data) throws AblyException { * @param listener a listener to be notified of the outcome of this message. *

* This listener is invoked on a background thread. + * @deprecated Use {@link #publishAsync(String, Object, Callback)} instead. */ + @Deprecated public void publishAsync(String name, Object data, CompletionListener listener) { publishAsync(ably.http, name, data, listener); } @@ -84,10 +105,33 @@ void publishAsync(Http http, String name, Object data, CompletionListener listen publishImpl(http, name, data).async(new CompletionListener.ToCallback(listener)); } + /** + * Asynchronously publish a message on this channel using the REST API. + * Since the REST API is stateless, this request is made independently + * of any other request on this or any other channel. + * + * @param name the event name + * @param data the message payload; + * @param callback a callback to be notified of the outcome of this message with the {@link PublishResult}. + *

+ * This callback is invoked on a background thread. + */ + public void publishAsync(String name, Object data, Callback callback) { + publishAsync(ably.http, name, data, callback); + } + + void publishAsync(Http http, String name, Object data, Callback callback) { + publishWithResultImpl(http, name, data).async(callback); + } + private Http.Request publishImpl(Http http, String name, Object data) { return publishImpl(http, new Message[] {new Message(name, data)}); } + private Http.Request publishWithResultImpl(Http http, String name, Object data) { + return publishWithResultImpl(http, new Message[] {new Message(name, data)}); + } + /** * Publish an array of messages on this channel. When there are * multiple messages to be sent, it is more efficient to use this @@ -149,6 +193,43 @@ public void execute(HttpScheduler http, final Callback callback) throws Ab }); } + private Http.Request publishWithResultImpl(Http http, final Message[] messages) { + return http.request(new Http.Execute() { + @Override + public void execute(HttpScheduler http, final Callback callback) throws AblyException { + /* handle message ids */ + boolean hasClientSuppliedId = false; + for(Message message : messages) { + /* RSL1k2 */ + hasClientSuppliedId |= (message.id != null); + /* RTL6g3 */ + ably.auth.checkClientId(message, true, false); + message.encode(options); + } + if(!hasClientSuppliedId && ably.options.idempotentRestPublishing) { + /* RSL1k1: populate the message id with a library-generated id */ + String messageId = Crypto.getRandomId(); + for (int i = 0; i < messages.length; i++) { + messages[i].id = messageId + ':' + i; + } + } + + HttpCore.RequestBody requestBody = ably.options.useBinaryProtocol ? MessageSerializer.asMsgpackRequest(messages) : MessageSerializer.asJsonRequest(messages); + final Param[] params = ably.options.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; // RSC7c + + // Create ResponseHandler from BodyHandler + HttpCore.BodyHandler bodyHandler = PublishResult.getBodyHandler(); + HttpCore.ResponseHandler responseHandler = (response, error) -> { + if (error != null) throw AblyException.fromErrorInfo(error); + String[] serials = bodyHandler.handleResponseBody(response.contentType, response.body); + return new PublishResult(serials); + }; + + http.post(basePath + "/messages", HttpUtils.defaultAcceptHeaders(ably.options.useBinaryProtocol), params, requestBody, responseHandler, true, callback); + } + }); + } + /** * Obtain recent history for this channel using the REST API. * The history provided relqtes to all clients of this application, @@ -352,9 +433,10 @@ public void getMessageAsync(String serial, Callback callback) { * Only non-null fields will be applied to the existing message. * @param operation operation metadata such as clientId, description, or metadata in the version field * @throws AblyException If the update operation fails. + * @return A {@link UpdateDeleteResult} containing the updated message version serial. */ - public void updateMessage(Message message, MessageOperation operation) throws AblyException { - messageEditsMixin.updateMessage(ably.http, message, operation); + public UpdateDeleteResult updateMessage(Message message, MessageOperation operation) throws AblyException { + return messageEditsMixin.updateMessage(ably.http, message, operation); } /** @@ -366,9 +448,10 @@ public void updateMessage(Message message, MessageOperation operation) throws Ab * @param message A {@link Message} object containing the fields to update and the serial identifier. * Only non-null fields will be applied to the existing message. * @throws AblyException If the update operation fails. + * @return A {@link UpdateDeleteResult} containing the updated message version serial. */ - public void updateMessage(Message message) throws AblyException { - updateMessage(message, null); + public UpdateDeleteResult updateMessage(Message message) throws AblyException { + return updateMessage(message, null); } /** @@ -376,24 +459,24 @@ public void updateMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the fields to update and the serial identifier. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { - messageEditsMixin.updateMessageAsync(ably.http, message, operation, listener); + public void updateMessageAsync(Message message, MessageOperation operation, Callback callback) { + messageEditsMixin.updateMessageAsync(ably.http, message, operation, callback); } /** * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, CompletionListener listener) { - updateMessageAsync(message, null, listener); + public void updateMessageAsync(Message message, Callback callback) { + updateMessageAsync(message, null, callback); } /** @@ -406,9 +489,10 @@ public void updateMessageAsync(Message message, CompletionListener listener) { * @param message A {@link Message} message containing the serial identifier. * @param operation operation metadata such as clientId, description, or metadata in the version field * @throws AblyException If the delete operation fails. + * @return A {@link UpdateDeleteResult} containing the deleted message version serial. */ - public void deleteMessage(Message message, MessageOperation operation) throws AblyException { - messageEditsMixin.deleteMessage(ably.http, message, operation); + public UpdateDeleteResult deleteMessage(Message message, MessageOperation operation) throws AblyException { + return messageEditsMixin.deleteMessage(ably.http, message, operation); } /** @@ -420,9 +504,10 @@ public void deleteMessage(Message message, MessageOperation operation) throws Ab * * @param message A {@link Message} message containing the serial identifier. * @throws AblyException If the delete operation fails. + * @return A {@link UpdateDeleteResult} containing the deleted message version serial. */ - public void deleteMessage(Message message) throws AblyException { - deleteMessage(message, null); + public UpdateDeleteResult deleteMessage(Message message) throws AblyException { + return deleteMessage(message, null); } /** @@ -430,24 +515,72 @@ public void deleteMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the serial identifier and operation metadata. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, MessageOperation operation, CompletionListener listener) { - messageEditsMixin.deleteMessageAsync(ably.http, message, operation, listener); + public void deleteMessageAsync(Message message, MessageOperation operation, Callback callback) { + messageEditsMixin.deleteMessageAsync(ably.http, message, operation, callback); } /** * Asynchronously marks a message as deleted. * * @param message A {@link Message} object containing the serial identifier and operation metadata. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A callback to be notified of the outcome of this operation. *

- * This listener is invoked on a background thread. + * This callback is invoked on a background thread. + */ + public void deleteMessageAsync(Message message, Callback callback) { + deleteMessageAsync(message, null, callback); + } + + /** + * Appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @return A {@link UpdateDeleteResult} containing the updated message version serial. + * @throws AblyException If the append operation fails. + */ + public UpdateDeleteResult appendMessage(Message message, MessageOperation operation) throws AblyException { + return messageEditsMixin.appendMessage(ably.http, message, operation); + } + + /** + * Appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @return A {@link UpdateDeleteResult} containing the updated message version serial. + * @throws AblyException If the append operation fails. + */ + public UpdateDeleteResult appendMessage(Message message) throws AblyException { + return appendMessage(message, null); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @param callback A callback to be notified of the outcome of this operation. + *

+ * This callback is invoked on a background thread. + */ + public void appendMessageAsync(Message message, MessageOperation operation, Callback callback) { + messageEditsMixin.appendMessageAsync(ably.http, message, operation, callback); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param callback A callback to be notified of the outcome of this operation. + *

+ * This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, CompletionListener listener) { - deleteMessageAsync(message, null, listener); + public void appendMessageAsync(Message message, Callback callback) { + appendMessageAsync(message, null, callback); } /** diff --git a/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java b/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java index d08400f10..df0e72beb 100644 --- a/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java +++ b/lib/src/main/java/io/ably/lib/rest/MessageEditsMixin.java @@ -4,7 +4,6 @@ import io.ably.lib.http.Http; import io.ably.lib.http.HttpCore; import io.ably.lib.http.HttpUtils; -import io.ably.lib.realtime.CompletionListener; import io.ably.lib.types.AblyException; import io.ably.lib.types.AsyncPaginatedResult; import io.ably.lib.types.Callback; @@ -12,11 +11,13 @@ import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.Message; +import io.ably.lib.types.MessageAction; import io.ably.lib.types.MessageOperation; -import io.ably.lib.types.MessageOperationSerializer; import io.ably.lib.types.MessageSerializer; +import io.ably.lib.types.MessageVersion; import io.ably.lib.types.PaginatedResult; import io.ably.lib.types.Param; +import io.ably.lib.types.UpdateDeleteResult; import io.ably.lib.util.Crypto; public class MessageEditsMixin { @@ -92,44 +93,23 @@ private Http.Request getMessageImpl(Http http, String serial) { * @param message A {@link Message} object containing the fields to update and the serial identifier. * Only non-null fields will be applied to the existing message. * @throws AblyException If the update operation fails. + * + * @return A {@link UpdateDeleteResult} containing the updated message version serial. */ - public void updateMessage(Http http, Message message, MessageOperation operation) throws AblyException { - updateMessageImpl(http, message, operation).sync(); + public UpdateDeleteResult updateMessage(Http http, Message message, MessageOperation operation) throws AblyException { + return updateMessageImpl(http, message, operation, MessageAction.MESSAGE_UPDATE).sync(); } /** * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A listener to be notified of the outcome of this operation. *

* This listener is invoked on a background thread. */ - public void updateMessageAsync(Http http, Message message, MessageOperation operation, CompletionListener listener) { - updateMessageImpl(http, message, operation).async(new CompletionListener.ToCallback(listener)); - } - - private Http.Request updateMessageImpl(Http http, Message message, MessageOperation operation) { - return http.request((scheduler, callback) -> { - if (message.serial == null || message.serial.isEmpty()) { - throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); - } - /* RTL6g3 */ - auth.checkClientId(message, true, false); - - HttpCore.RequestBody requestBody = clientOptions.useBinaryProtocol - ? MessageOperationSerializer.asMsgPackRequest(message, operation, channelOptions) - : MessageOperationSerializer.asJsonRequest(message, operation, channelOptions); - final Param[] params = clientOptions.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; - - scheduler.patch(basePath + "/messages/" + HttpUtils.encodeURIComponent(message.serial), - HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), - params, - requestBody, - null, - true, - callback); - }); + public void updateMessageAsync(Http http, Message message, MessageOperation operation, Callback callback) { + updateMessageImpl(http, message, operation, MessageAction.MESSAGE_UPDATE).async(callback); } /** @@ -141,24 +121,50 @@ private Http.Request updateMessageImpl(Http http, Message message, Message * * @param message A {@link Message} message containing the serial identifier. * @throws AblyException If the delete operation fails. + * + * @return A {@link UpdateDeleteResult} containing the deleted message version serial. */ - public void deleteMessage(Http http, Message message, MessageOperation operation) throws AblyException { - deleteMessageImpl(http, message, operation).sync(); + public UpdateDeleteResult deleteMessage(Http http, Message message, MessageOperation operation) throws AblyException { + return updateMessageImpl(http, message, operation, MessageAction.MESSAGE_DELETE).sync(); } /** * Asynchronously marks a message as deleted. * * @param message A {@link Message} object containing the serial identifier and operation metadata. - * @param listener A listener to be notified of the outcome of this operation. + * @param callback A listener to be notified of the outcome of this operation. + *

+ * This listener is invoked on a background thread. + */ + public void deleteMessageAsync(Http http, Message message, MessageOperation operation, Callback callback) { + updateMessageImpl(http, message, operation, MessageAction.MESSAGE_DELETE).async(callback); + } + + /** + * Appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @return A {@link UpdateDeleteResult} containing the updated message version serial. + */ + public UpdateDeleteResult appendMessage(Http http, Message message, MessageOperation operation) throws AblyException { + return updateMessageImpl(http, message, operation, MessageAction.MESSAGE_APPEND).sync(); + } + + /** + * Asynchronously appends message text to the end of the message. + * + * @param message A {@link Message} object containing the serial identifier and data to append. + * @param operation operation details such as clientId, description, or metadata + * @param callback A listener to be notified of the outcome of this operation. *

* This listener is invoked on a background thread. */ - public void deleteMessageAsync(Http http, Message message, MessageOperation operation, CompletionListener listener) { - deleteMessageImpl(http, message, operation).async(new CompletionListener.ToCallback(listener)); + public void appendMessageAsync(Http http, Message message, MessageOperation operation, Callback callback) { + updateMessageImpl(http, message, operation, MessageAction.MESSAGE_APPEND).async(callback); } - private Http.Request deleteMessageImpl(Http http, Message message, MessageOperation operation) { + private Http.Request updateMessageImpl(Http http, Message message, MessageOperation operation, MessageAction action) { return http.request((scheduler, callback) -> { if (message.serial == null || message.serial.isEmpty()) { throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); @@ -166,16 +172,33 @@ private Http.Request deleteMessageImpl(Http http, Message message, Message /* RTL6g3 */ auth.checkClientId(message, true, false); + Message updatedMessage = new Message(message.name, message.data, message.extras); + updatedMessage.action = action; + updatedMessage.version = new MessageVersion(); + if (operation != null) { + updatedMessage.version.clientId = operation.clientId; + updatedMessage.version.description = operation.description; + updatedMessage.version.metadata = operation.metadata; + } + updatedMessage.encode(channelOptions); + HttpCore.RequestBody requestBody = clientOptions.useBinaryProtocol - ? MessageOperationSerializer.asMsgPackRequest(message, operation, channelOptions) - : MessageOperationSerializer.asJsonRequest(message, operation, channelOptions); + ? MessageSerializer.asSingleMsgpackRequest(updatedMessage) + : MessageSerializer.asSingleJsonRequest(updatedMessage); final Param[] params = clientOptions.addRequestIds ? Param.array(Crypto.generateRandomRequestId()) : null; - scheduler.post(basePath + "/messages/" + HttpUtils.encodeURIComponent(message.serial) + "/delete", + HttpCore.BodyHandler bodyHandler = UpdateDeleteResult.getBodyHandler(); + + scheduler.patch(basePath + "/messages/" + HttpUtils.encodeURIComponent(message.serial), HttpUtils.defaultAcceptHeaders(clientOptions.useBinaryProtocol), params, requestBody, - null, + (response, error) -> { + if (error != null) throw AblyException.fromErrorInfo(error); + UpdateDeleteResult[] results = bodyHandler.handleResponseBody(response.contentType, response.body); + if (results != null && results.length > 0) return results[0]; + throw AblyException.fromErrorInfo(new ErrorInfo("No versionSerial in the update message response", 500, 50000)); + }, true, callback); }); diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 3a680fb2a..a1987cd27 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -28,15 +28,18 @@ import io.ably.lib.transport.ITransport.TransportParams; import io.ably.lib.transport.NetworkConnectivity.NetworkConnectivityListener; import io.ably.lib.types.AblyException; +import io.ably.lib.types.Callback; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ConnectionDetails; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.Param; import io.ably.lib.types.ProtocolMessage; import io.ably.lib.types.ProtocolSerializer; +import io.ably.lib.types.PublishResult; import io.ably.lib.util.Log; import io.ably.lib.util.PlatformAgentProvider; import io.ably.lib.util.ReconnectionStrategy; +import org.jetbrains.annotations.Nullable; public class ConnectionManager implements ConnectListener { final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); @@ -1403,7 +1406,7 @@ private synchronized void onError(ProtocolMessage message) { } private void onAck(ProtocolMessage message) { - pendingMessages.ack(message.msgSerial, message.count, message.error); + pendingMessages.ack(message.msgSerial, message.count, message.res, message.error); } private void onNack(ProtocolMessage message) { @@ -1724,14 +1727,14 @@ protected void setLastActivity(long lastActivityTime) { public static class QueuedMessage { public final ProtocolMessage msg; - public final CompletionListener listener; - public QueuedMessage(ProtocolMessage msg, CompletionListener listener) { + public final Callback listener; + public QueuedMessage(ProtocolMessage msg, Callback listener) { this.msg = msg; this.listener = listener; } } - public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException { + public void send(ProtocolMessage msg, boolean queueEvents, Callback listener) throws AblyException { State state; synchronized(this) { state = this.currentState; @@ -1747,7 +1750,7 @@ public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener li throw AblyException.fromErrorInfo(state.defaultErrorInfo); } - private void sendImpl(ProtocolMessage message, CompletionListener listener) throws AblyException { + private void sendImpl(ProtocolMessage message, Callback listener) throws AblyException { if(transport == null) { Log.v(TAG, "sendImpl(): Discarding message; transport unavailable"); return; @@ -1825,7 +1828,7 @@ public synchronized void push(QueuedMessage msg) { queue.add(msg); } - public void ack(long msgSerial, int count, ErrorInfo reason) { + public void ack(long msgSerial, int count, @Nullable PublishResult[] results, ErrorInfo reason) { QueuedMessage[] ackMessages = null, nackMessages = null; synchronized(this) { if (queue.isEmpty()) return; @@ -1867,11 +1870,14 @@ public void ack(long msgSerial, int count, ErrorInfo reason) { } } if(ackMessages != null) { - for(QueuedMessage msg : ackMessages) { + for (int i = 0; i < ackMessages.length; i++) { + QueuedMessage msg = ackMessages[i]; try { - if(msg.listener != null) - msg.listener.onSuccess(); - } catch(Throwable t) { + if (msg.listener != null) { + PublishResult messageResult = results != null && results.length > i ? results[i] : null; + msg.listener.onSuccess(messageResult); + } + } catch (Throwable t) { Log.e(TAG, "ack(): listener exception", t); } } diff --git a/lib/src/main/java/io/ably/lib/transport/Defaults.java b/lib/src/main/java/io/ably/lib/transport/Defaults.java index d55bce53e..83d5d83e4 100644 --- a/lib/src/main/java/io/ably/lib/transport/Defaults.java +++ b/lib/src/main/java/io/ably/lib/transport/Defaults.java @@ -12,7 +12,7 @@ public class Defaults { * spec: G4 *

*/ - public static final String ABLY_PROTOCOL_VERSION = "4"; + public static final String ABLY_PROTOCOL_VERSION = "5"; public static final String ABLY_AGENT_VERSION = String.format("%s/%s", "ably-java", BuildConfig.VERSION); diff --git a/lib/src/main/java/io/ably/lib/types/MessageAction.java b/lib/src/main/java/io/ably/lib/types/MessageAction.java index d80f3624f..20b580233 100644 --- a/lib/src/main/java/io/ably/lib/types/MessageAction.java +++ b/lib/src/main/java/io/ably/lib/types/MessageAction.java @@ -5,7 +5,8 @@ public enum MessageAction { MESSAGE_UPDATE, // 1 MESSAGE_DELETE, // 2 META, // 3 - MESSAGE_SUMMARY; // 4 + MESSAGE_SUMMARY, // 4 + MESSAGE_APPEND; // 5 static MessageAction tryFindByOrdinal(int ordinal) { return values().length <= ordinal ? null: values()[ordinal]; diff --git a/lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java b/lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java deleted file mode 100644 index 0dded32cf..000000000 --- a/lib/src/main/java/io/ably/lib/types/MessageOperationSerializer.java +++ /dev/null @@ -1,172 +0,0 @@ -package io.ably.lib.types; - -import com.google.gson.JsonObject; -import io.ably.lib.http.HttpCore; -import io.ably.lib.http.HttpUtils; -import io.ably.lib.util.Base64Coder; -import io.ably.lib.util.Log; -import io.ably.lib.util.Serialisation; -import org.msgpack.core.MessagePacker; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; - -/** - * MessageOperationSerializer: internal - * Utility class to serialize message update/delete requests in different formats. - */ -public class MessageOperationSerializer { - - /** - * Creates a JSON request body for a message update/delete operation. - * - * @param message The message containing the update/delete data - * @param operation The MessageOperation metadata - * @param channelOptions Channel options for encoding - * @return HttpCore.RequestBody for the request - * @throws AblyException If encoding fails - */ - public static HttpCore.RequestBody asJsonRequest(Message message, MessageOperation operation, ChannelOptions channelOptions) throws AblyException { - UpdateDeleteRequest request = new UpdateDeleteRequest(message, operation, channelOptions); - return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(request.asJsonObject())); - } - - /** - * Creates a MessagePack request body for a message update/delete operation. - * - * @param message The message containing the update/delete data - * @param operation The MessageOperation metadata - * @param channelOptions Channel options for encoding - * @return HttpCore.RequestBody for the request - * @throws AblyException If encoding fails - */ - public static HttpCore.RequestBody asMsgPackRequest(Message message, MessageOperation operation, ChannelOptions channelOptions) throws AblyException { - UpdateDeleteRequest request = new UpdateDeleteRequest(message, operation, channelOptions); - byte[] packed = writeMsgpack(request); - return new HttpUtils.ByteArrayRequestBody(packed, "application/x-msgpack"); - } - - /** - * Serializes an UpdateDeleteRequest to MessagePack format. - * - * @param request The request to serialize - * @return byte array containing the MessagePack data - */ - private static byte[] writeMsgpack(UpdateDeleteRequest request) { - try { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out); - request.writeMsgpack(packer); - packer.flush(); - return out.toByteArray(); - } catch (IOException e) { - Log.e(TAG, "Failed to write msgpack", e); - return null; - } - } - - /** - * Represents a request to update or delete a message. - * Contains the message data and operation metadata. - */ - static class UpdateDeleteRequest { - private static final String NAME = "name"; - private static final String DATA = "data"; - private static final String ENCODING = "encoding"; - private static final String EXTRAS = "extras"; - private static final String OPERATION = "operation"; - - public final String name; - public final Object data; - public final String encoding; - public final MessageExtras extras; - public final MessageOperation operation; - - /** - * Constructs an UpdateDeleteRequest from a Message and operation metadata. - * - * @param message The message containing the update/delete data - * @param operation The MessageOperation metadata - * @param channelOptions Channel options for encoding the message data - * @throws AblyException If encoding fails - */ - UpdateDeleteRequest(Message message, MessageOperation operation, ChannelOptions channelOptions) throws AblyException { - this.operation = operation; - this.name = message.name; - this.extras = message.extras; - - BaseMessage.EncodedMessageData encodedMessageData = message.encodeData(channelOptions); - this.data = encodedMessageData.data; - this.encoding = encodedMessageData.encoding; - } - - /** - * Writes this UpdateDeleteRequest to MessagePack format. - * - * @param packer The MessagePacker to write to - * @throws IOException If writing fails - */ - void writeMsgpack(MessagePacker packer) throws IOException { - int fieldCount = 0; - if (name != null) ++fieldCount; - if (data != null) ++fieldCount; - if (encoding != null) ++fieldCount; - if (extras != null) ++fieldCount; - if (operation != null) ++fieldCount; - - packer.packMapHeader(fieldCount); - - if (name != null) { - packer.packString(NAME); - packer.packString(name); - } - if (data != null) { - packer.packString(DATA); - if (data instanceof byte[]) { - byte[] byteData = (byte[])data; - packer.packBinaryHeader(byteData.length); - packer.writePayload(byteData); - } else { - packer.packString(data.toString()); - } - } - if (encoding != null) { - packer.packString(ENCODING); - packer.packString(encoding); - } - if (extras != null) { - packer.packString(EXTRAS); - extras.write(packer); - } - if (operation != null) { - packer.packString(OPERATION); - operation.writeMsgpack(packer); - } - } - - /** - * Base for gson serialisers. - */ - JsonObject asJsonObject() { - JsonObject json = new JsonObject(); - Object data = this.data; - String encoding = this.encoding; - if (data != null) { - if (data instanceof byte[]) { - byte[] dataBytes = (byte[])data; - json.addProperty(DATA, new String(Base64Coder.encode(dataBytes))); - encoding = (encoding == null) ? "base64" : encoding + "/base64"; - } else { - json.addProperty(DATA, data.toString()); - } - if (encoding != null) json.addProperty(ENCODING, encoding); - } - if (this.name != null) json.addProperty(NAME, this.name); - if (this.extras != null) json.add(EXTRAS, this.extras.asJsonObject()); - if (this.operation != null) json.add(OPERATION, this.operation.asJsonObject()); - return json; - } - } - - private static final String TAG = MessageOperationSerializer.class.getName(); -} diff --git a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java index 5995ddfdf..b53c98110 100644 --- a/lib/src/main/java/io/ably/lib/types/MessageSerializer.java +++ b/lib/src/main/java/io/ably/lib/types/MessageSerializer.java @@ -47,8 +47,8 @@ public static Message[] readMsgpack(byte[] packed) throws AblyException { * Msgpack encode ****************************************/ - public static HttpCore.RequestBody asMsgpackRequest(Message message) throws AblyException { - return asMsgpackRequest(new Message[] { message }); + public static HttpCore.RequestBody asSingleMsgpackRequest(Message message) throws AblyException { + return new HttpUtils.ByteArrayRequestBody(write(message), "application/x-msgpack"); } public static HttpCore.RequestBody asMsgpackRequest(Message[] messages) { @@ -74,6 +74,16 @@ public static void writeMsgpackArray(Message[] messages, MessagePacker packer) { } catch(IOException e) {} } + public static byte[] write(Message message) { + try { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out); + message.writeMsgpack(packer); + packer.flush(); + return out.toByteArray(); + } catch(IOException e) { return null; } + } + public static void write(final Map map, final MessagePacker packer) throws IOException { packer.packMapHeader(map.size()); for (final Map.Entry entry : map.entrySet()) { @@ -138,6 +148,10 @@ public static HttpCore.RequestBody asJsonRequest(Message message) throws AblyExc return asJsonRequest(new Message[] { message }); } + public static HttpCore.RequestBody asSingleJsonRequest(Message message) { + return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(message)); + } + public static HttpCore.RequestBody asJsonRequest(Message[] messages) { return new HttpUtils.JsonRequestBody(Serialisation.gson.toJson(messages)); } diff --git a/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java b/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java index 0548e4c64..e813a21b7 100644 --- a/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java +++ b/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java @@ -137,6 +137,8 @@ public ProtocolMessage(Action action, String channel) { @JsonAdapter(ObjectsJsonSerializer.class) public Object[] state; + public @Nullable PublishResult[] res; + public boolean hasFlag(final Flag flag) { return (flags & flag.getMask()) == flag.getMask(); } @@ -161,6 +163,7 @@ void writeMsgpack(MessagePacker packer) throws IOException { if(channelSerial != null) ++fieldCount; if(annotations != null) ++fieldCount; if(state != null && ObjectsHelper.getSerializer() != null) ++fieldCount; + if(res != null) ++fieldCount; packer.packMapHeader(fieldCount); packer.packString("action"); packer.packInt(action.getValue()); @@ -209,6 +212,10 @@ void writeMsgpack(MessagePacker packer) throws IOException { Log.w(TAG, "Skipping 'state' field msgpack serialization because ObjectsSerializer not found"); } } + if (res != null) { + packer.packString("res"); + PublishResult.writeMsgpackArray(res, packer); + } } ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException { @@ -280,6 +287,9 @@ ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException { unpacker.skipValue(); } break; + case "res": + res = PublishResult.readMsgpackArray(unpacker); + break; default: Log.v(TAG, "Unexpected field: " + fieldName); unpacker.skipValue(); diff --git a/lib/src/main/java/io/ably/lib/types/PublishResult.java b/lib/src/main/java/io/ably/lib/types/PublishResult.java new file mode 100644 index 000000000..f92f9d083 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/PublishResult.java @@ -0,0 +1,130 @@ +package io.ably.lib.types; + +import io.ably.lib.http.HttpCore; +import io.ably.lib.util.Serialisation; +import org.jetbrains.annotations.Nullable; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePacker; +import org.msgpack.core.MessageUnpacker; + +import java.io.IOException; + +/** + * Contains the result of a publish operation. + */ +public class PublishResult { + + private static final String SERIALS = "serials"; + + /** + * An array of message serials corresponding 1:1 to the messages that were published. + * A serial may be null if the message was discarded due to a configured conflation rule. + */ + public final @Nullable String[] serials; + + public PublishResult(@Nullable String[] serials) { + this.serials = serials; + } + + public static PublishResult readFromJson(byte[] packed) throws MessageDecodeException { + return Serialisation.gson.fromJson(new String(packed), PublishResult.class); + } + + public static PublishResult readMsgpack(byte[] packed) throws AblyException { + try { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed); + return readMsgpack(unpacker); + } catch (IOException ioe) { + throw AblyException.fromThrowable(ioe); + } + } + + public static PublishResult readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if (fieldFormat.equals(MessageFormat.NIL)) { + unpacker.unpackNil(); + continue; + } + + if (fieldName.equals(SERIALS)) { + int count = unpacker.unpackArrayHeader(); + String[] serials = new String[count]; + for (int j = 0; i < count; i++) { + if (unpacker.getNextFormat().equals(MessageFormat.NIL)) { + unpacker.unpackNil(); + serials[j] = null; + } else { + serials[j] = unpacker.unpackString(); + } + } + return new PublishResult(serials); + } else { + unpacker.skipValue(); + } + } + return new PublishResult(new String[]{}); + } + + public static void writeMsgpackArray(PublishResult[] results, MessagePacker packer) { + try { + int count = results.length; + packer.packArrayHeader(count); + for (PublishResult result : results) { + result.writeMsgpack(packer); + } + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static PublishResult[] readMsgpackArray(MessageUnpacker unpacker) throws IOException { + int count = unpacker.unpackArrayHeader(); + PublishResult[] results = new PublishResult[count]; + for (int i = 0; i < count; i++) { + results[i] = readMsgpack(unpacker); + } + return results; + } + + public static HttpCore.BodyHandler getBodyHandler() { + return new PublishResultBodyHandler(); + } + + private void writeMsgpack(MessagePacker packer) throws IOException { + int fieldCount = 0; + if (serials != null) ++fieldCount; + packer.packMapHeader(fieldCount); + if (serials != null) { + packer.packString(SERIALS); + packer.packArrayHeader(serials.length); + for (String serial : serials) { + if (serial == null) { + packer.packNil(); + } else { + packer.packString(serial); + } + } + } + } + + private static class PublishResultBodyHandler implements HttpCore.BodyHandler { + + @Override + public String[] handleResponseBody(String contentType, byte[] body) throws AblyException { + try { + PublishResult publishResult = null; + if ("application/json".equals(contentType)) + publishResult = readFromJson(body); + else if ("application/x-msgpack".equals(contentType)) + publishResult = readMsgpack(body); + return publishResult != null ? publishResult.serials : new String[]{}; + } catch (MessageDecodeException e) { + throw AblyException.fromThrowable(e); + } + } + } +} + diff --git a/lib/src/main/java/io/ably/lib/types/UpdateDeleteResult.java b/lib/src/main/java/io/ably/lib/types/UpdateDeleteResult.java new file mode 100644 index 000000000..18d37f35a --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/UpdateDeleteResult.java @@ -0,0 +1,81 @@ +package io.ably.lib.types; + +import io.ably.lib.http.HttpCore; +import io.ably.lib.util.Serialisation; +import org.jetbrains.annotations.Nullable; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessageUnpacker; + +import java.io.IOException; + +/** + * Contains the result of an update or delete message operation. + */ +public class UpdateDeleteResult { + + private static final String VERSION_SERIAL = "versionSerial"; + + /** + * The serial of the new version of the updated or deleted message. + * Will be null if the message was superseded by a subsequent update before it could be published. + */ + public final @Nullable String versionSerial; + + public UpdateDeleteResult(@Nullable String versionSerial) { + this.versionSerial = versionSerial; + } + + public static UpdateDeleteResult readFromJson(byte[] packed) throws MessageDecodeException { + return Serialisation.gson.fromJson(new String(packed), UpdateDeleteResult.class); + } + + public static UpdateDeleteResult readMsgpack(byte[] packed) throws AblyException { + try { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed); + return readMsgpack(unpacker); + } catch (IOException ioe) { + throw AblyException.fromThrowable(ioe); + } + } + + public static UpdateDeleteResult readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + String versionSerial = null; + for (int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString().intern(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if (fieldFormat.equals(MessageFormat.NIL)) { + unpacker.unpackNil(); + continue; + } + + if (fieldName.equals(VERSION_SERIAL)) { + versionSerial = unpacker.unpackString(); + } else { + unpacker.skipValue(); + } + } + return new UpdateDeleteResult(versionSerial); + } + + public static HttpCore.BodyHandler getBodyHandler() { + return new UpdateDeleteResultBodyHandler(); + } + + private static class UpdateDeleteResultBodyHandler implements HttpCore.BodyHandler { + + @Override + public UpdateDeleteResult[] handleResponseBody(String contentType, byte[] body) throws AblyException { + try { + UpdateDeleteResult updateDeleteResult = null; + if ("application/json".equals(contentType)) + updateDeleteResult = readFromJson(body); + else if ("application/x-msgpack".equals(contentType)) + updateDeleteResult = readMsgpack(body); + return new UpdateDeleteResult[]{updateDeleteResult}; + } catch (MessageDecodeException e) { + throw AblyException.fromThrowable(e); + } + } + } +} diff --git a/lib/src/main/java/io/ably/lib/util/Listeners.java b/lib/src/main/java/io/ably/lib/util/Listeners.java new file mode 100644 index 000000000..36076bee3 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/util/Listeners.java @@ -0,0 +1,72 @@ +package io.ably.lib.util; + +import io.ably.lib.realtime.CompletionListener; +import io.ably.lib.types.Callback; +import io.ably.lib.types.ErrorInfo; +import io.ably.lib.types.PublishResult; +import io.ably.lib.types.UpdateDeleteResult; + +public class Listeners { + + public static Callback fromCompletionListener(CompletionListener listener) { + return new CompletionListenerWrapper(listener); + } + + public static Callback toPublishResultListener(Callback listener) { + return new UpdateResultToPublishAdapter(listener); + } + + public static CompletionListener unwrap(Callback listener) { + if (listener instanceof CompletionListenerWrapper) { + return ((CompletionListenerWrapper)listener).listener; + } else { + return null; + } + } + + private static class CompletionListenerWrapper implements Callback { + private final CompletionListener listener; + + private CompletionListenerWrapper(CompletionListener listener) { + this.listener = listener; + } + + @Override + public void onSuccess(T result) { + if (listener != null) { + listener.onSuccess(); + } + } + + @Override + public void onError(ErrorInfo reason) { + if (listener != null) { + listener.onError(reason); + } + } + } + + private static class UpdateResultToPublishAdapter implements Callback { + private final Callback listener; + + private UpdateResultToPublishAdapter(Callback listener) { + this.listener = listener; + } + + @Override + public void onSuccess(PublishResult result) { + if (listener != null) { + String serial = result != null && result.serials != null && result.serials.length > 0 + ? result.serials[0] : null; + listener.onSuccess(new UpdateDeleteResult(serial)); + } + } + + @Override + public void onError(ErrorInfo reason) { + if (listener != null) { + listener.onError(reason); + } + } + } +} diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelMessageEditTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelMessageEditTest.java new file mode 100644 index 000000000..797105841 --- /dev/null +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelMessageEditTest.java @@ -0,0 +1,466 @@ +package io.ably.lib.test.realtime; + +import io.ably.lib.realtime.AblyRealtime; +import io.ably.lib.realtime.Channel; +import io.ably.lib.test.common.Helpers.CompletionSet; +import io.ably.lib.test.common.ParameterizedTest; +import io.ably.lib.types.AblyException; +import io.ably.lib.types.ChannelOptions; +import io.ably.lib.types.ClientOptions; +import io.ably.lib.types.ErrorInfo; +import io.ably.lib.types.Message; +import io.ably.lib.types.MessageAction; +import io.ably.lib.types.MessageOperation; +import io.ably.lib.types.PaginatedResult; +import io.ably.lib.types.Param; +import io.ably.lib.util.Crypto; +import io.ably.lib.util.Listeners; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.HashMap; +import java.util.UUID; +import java.util.function.Predicate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +/** + * Tests for REST channel message edit and delete operations + */ +public class RealtimeChannelMessageEditTest extends ParameterizedTest { + + @Rule + public Timeout testTimeout = Timeout.seconds(300); + private AblyRealtime ably; + + @Before + public void setUpBefore() throws Exception { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + ably = new AblyRealtime(opts); + } + + /** + * Test getMessage: Publish a message and retrieve it by serial + */ + @Test + public void getMessage_retrieveBySerial() throws Exception { + String channelName = "mutable:get_message_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Test message data"); + + // Get the message from history to obtain its serial + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Retrieve the message by serial + Message retrievedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + + // Verify the retrieved message + assertNotNull("Expected non-null retrieved message", retrievedMessage); + assertEquals("Expected same message name", publishedMessage.name, retrievedMessage.name); + assertEquals("Expected same message data", publishedMessage.data, retrievedMessage.data); + assertEquals("Expected same serial", publishedMessage.serial, retrievedMessage.serial); + } + + /** + * Test updateMessage: Update a message's data + */ + @Test + public void updateMessage_updateData() throws Exception { + String channelName = "mutable:update_message_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original message data"); + + // Get the message from history to obtain its serial + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message + Message updateMessage = new Message(); + updateMessage.serial = publishedMessage.serial; + updateMessage.data = "Updated message data"; + updateMessage.name = "updated_event"; + + channel.updateMessage(updateMessage); + + // Retrieve the updated message + Message updatedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + + // Verify the message was updated + assertNotNull("Expected non-null updated message", updatedMessage); + assertEquals("Expected updated message data", "Updated message data", updatedMessage.data); + assertEquals("Expected updated message name", "updated_event", updatedMessage.name); + assertEquals("Expected action to be MESSAGE_UPDATE", MessageAction.MESSAGE_UPDATE, updatedMessage.action); + } + + /** + * Test updateMessage: Update a message's data + */ + @Test + public void updateMessage_updateEncodedData() throws Exception { + String channelName = "mutable:update_encodedmessage_" + UUID.randomUUID() + "_" + testParams.name; + ChannelOptions channelOptions = ChannelOptions.withCipherKey(Crypto.generateRandomKey()); + Channel channel = ably.channels.get(channelName, channelOptions); + + // Publish a message + channel.publish("test_event", "Original message data"); + + // Get the message from history to obtain its serial + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message + Message updateMessage = new Message(); + updateMessage.serial = publishedMessage.serial; + updateMessage.data = "Updated message data"; + updateMessage.name = "updated_event"; + + channel.updateMessage(updateMessage); + + // Retrieve the updated message + Message updatedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + + // Verify the message was updated + assertNotNull("Expected non-null updated message", updatedMessage); + assertEquals("Expected updated message data", "Updated message data", updatedMessage.data); + assertEquals("Expected updated message name", "updated_event", updatedMessage.name); + assertEquals("Expected action to be MESSAGE_UPDATE", MessageAction.MESSAGE_UPDATE, updatedMessage.action); + } + + /** + * Test updateMessage async: Update a message using async API + */ + @Test + public void updateMessage_async() throws Exception { + String channelName = "mutable:update_message_async_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original message data"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + final Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message using async API + Message updateMessage = new Message(); + updateMessage.serial = publishedMessage.serial; + updateMessage.data = "Updated message data async"; + + CompletionSet updateComplete = new CompletionSet(); + channel.updateMessage(updateMessage, Listeners.fromCompletionListener(updateComplete.add())); + + ErrorInfo[] updateErrors = updateComplete.waitFor(); + assertEquals("Expected no errors from update", 0, updateErrors.length); + + // Retrieve the updated message + Message updatedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + assertNotNull("Expected non-null updated message", updatedMessage); + assertEquals("Expected updated message data", "Updated message data async", updatedMessage.data); + } + + /** + * Test deleteMessage: Soft delete a message + */ + @Test + public void deleteMessage_softDelete() throws Exception { + String channelName = "mutable:delete_message_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Message to be deleted"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Delete the message + Message deleteMessage = new Message(); + deleteMessage.serial = publishedMessage.serial; + deleteMessage.data = "Message deleted"; + + channel.deleteMessage(deleteMessage); + + // Retrieve the deleted message + Message deletedMessage = waitForDeletedMessageAppear(channel, publishedMessage.serial); + + // Verify the message was soft deleted + assertNotNull("Expected non-null deleted message", deletedMessage); + assertEquals("Expected action to be MESSAGE_DELETE", MessageAction.MESSAGE_DELETE, deletedMessage.action); + } + + /** + * Test deleteMessage async: Delete a message using async API + */ + @Test + public void deleteMessage_async() throws Exception { + String channelName = "mutable:delete_message_async_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Message to be deleted async"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + final Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Delete the message using async API + Message deleteMessage = new Message(); + deleteMessage.serial = publishedMessage.serial; + deleteMessage.data = "Message deleted async"; + + CompletionSet deleteComplete = new CompletionSet(); + channel.deleteMessage(deleteMessage, Listeners.fromCompletionListener(deleteComplete.add())); + + ErrorInfo[] deleteErrors = deleteComplete.waitFor(); + assertEquals("Expected no errors from delete", 0, deleteErrors.length); + + // Retrieve the deleted message + Message deletedMessage = waitForDeletedMessageAppear(channel, publishedMessage.serial); + assertNotNull("Expected non-null deleted message", deletedMessage); + assertEquals("Expected action to be MESSAGE_DELETE", MessageAction.MESSAGE_DELETE, deletedMessage.action); + } + + /** + * Test getMessageVersions: Retrieve version history of a message + */ + @Test + public void getMessageVersions_retrieveHistory() throws Exception { + String channelName = "mutable:message_versions_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original data"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message to create version history + Message updateMessage1 = new Message(); + updateMessage1.serial = publishedMessage.serial; + updateMessage1.data = "First update"; + channel.updateMessage(updateMessage1); + + Message updateMessage2 = new Message(); + updateMessage2.serial = publishedMessage.serial; + updateMessage2.data = "Second update"; + MessageOperation messageOperation = new MessageOperation(); + messageOperation.description = "description"; + messageOperation.metadata = new HashMap<>(); + messageOperation.metadata.put("key", "value"); + channel.updateMessage(updateMessage2, messageOperation); + + // Retrieve version history + PaginatedResult versions = waitForMessageAppearInVersionHistory(channel, publishedMessage.serial, null, msgs -> + msgs.length >= 3 + ); + + // Verify version history + assertNotNull("Expected non-null versions", versions); + assertTrue("Expected at least 3 versions (original + 2 updates)", versions.items().length >= 3); + + Message latestVersion = versions.items()[versions.items().length - 1]; + assertEquals("Expected latest version to have second update data", "Second update", latestVersion.data); + assertEquals("description", latestVersion.version.description); + assertEquals("value", latestVersion.version.metadata.get("key")); + } + + /** + * Test getMessageVersions async: Retrieve version history using async API + */ + @Test + public void getMessageVersions_async() throws Exception { + String channelName = "mutable:message_versions_async_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original data"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + final Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + } + + /** + * Test error handling: getMessage with invalid serial + */ + @Test + public void getMessage_invalidSerial() { + String channelName = "mutable:get_message_invalid_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + AblyException exception = assertThrows(AblyException.class, () -> { + channel.getMessage("invalid_serial_12345"); + }); + + assertNotNull("Expected error info", exception.errorInfo); + } + + /** + * Test error handling: updateMessage with null serial + */ + @Test + public void updateMessage_nullSerial() { + String channelName = "mutable:update_message_null_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + AblyException exception = assertThrows(AblyException.class, () -> { + Message updateMessage = new Message(); + updateMessage.serial = null; + updateMessage.data = "Update data"; + + channel.updateMessage(updateMessage); + }); + + assertNotNull("Expected error info", exception.errorInfo); + assertTrue("Expected error message about serial", + exception.errorInfo.message.toLowerCase().contains("serial")); + } + + /** + * Test error handling: deleteMessage with empty serial + */ + @Test + public void deleteMessage_emptySerial() { + String channelName = "mutable:delete_message_empty_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + AblyException exception = assertThrows(AblyException.class, () -> { + Message deleteMessage = new Message(); + deleteMessage.serial = ""; + deleteMessage.data = "Delete data"; + + channel.deleteMessage(deleteMessage); + }); + + assertNotNull("Expected error info", exception.errorInfo); + assertTrue("Expected error message about serial", + exception.errorInfo.message.toLowerCase().contains("serial")); + } + + /** + * Test complete workflow: publish, update, get versions, delete + */ + @Test + public void completeWorkflow_publishUpdateVersionsDelete() throws Exception { + String channelName = "mutable:complete_workflow_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // 1. Publish a message + channel.publish("workflow_event", "Initial data"); + + // Get the published message + PaginatedResult history = waitForMessageAppearInHistory(channel); + Message publishedMessage = history.items()[0]; + String serial = publishedMessage.serial; + + // 2. Update the message + Message updateMessage = new Message(); + updateMessage.serial = serial; + updateMessage.data = "Updated data"; + updateMessage.name = "workflow_event_updated"; + channel.updateMessage(updateMessage); + + // 3. Verify update + Message retrieved = waitForUpdatedMessageAppear(channel, serial); + assertEquals("Expected updated data", "Updated data", retrieved.data); + assertEquals("Expected MESSAGE_UPDATE action", MessageAction.MESSAGE_UPDATE, retrieved.action); + + // 4. Delete the message + Message deleteMessage = new Message(); + deleteMessage.serial = serial; + deleteMessage.data = "Deleted"; + channel.deleteMessage(deleteMessage); + + // 5. Verify deletion + Message deleted = waitForDeletedMessageAppear(channel, serial); + assertEquals("Expected MESSAGE_DELETE action", MessageAction.MESSAGE_DELETE, deleted.action); + + // 6. Verify delete appears in versions + PaginatedResult finalVersions = waitForMessageAppearInVersionHistory(channel, serial, null, msgs -> + msgs.length > 0 && msgs[msgs.length - 1].action == MessageAction.MESSAGE_DELETE + ); + assertTrue("Expected at least 3 versions (create, update, delete)", finalVersions.items().length >= 3); + } + + private PaginatedResult waitForMessageAppearInVersionHistory(Channel channel, String serial, Param[] params, Predicate predicate) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + PaginatedResult history = channel.getMessageVersions(serial, params); + if (history.items().length > 0 && predicate.test(history.items()) || System.currentTimeMillis() > timeout) + return history; + Thread.sleep(200); + } + } + + private PaginatedResult waitForMessageAppearInHistory(Channel channel) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + PaginatedResult history = channel.history(null); + if (history.items().length > 0 || System.currentTimeMillis() > timeout) return history; + Thread.sleep(200); + } + } + + private Message waitForUpdatedMessageAppear(Channel channel, String serial) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + Message message = channel.getMessage(serial); + if ((message != null && message.action == MessageAction.MESSAGE_UPDATE) || System.currentTimeMillis() > timeout) + return message; + Thread.sleep(200); + } + } + + private Message waitForDeletedMessageAppear(Channel channel, String serial) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + Message message = channel.getMessage(serial); + if ((message != null && message.action == MessageAction.MESSAGE_DELETE) || System.currentTimeMillis() > timeout) + return message; + Thread.sleep(200); + } + } +} diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java index 65133d4cd..11a07274c 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java @@ -19,9 +19,11 @@ import io.ably.lib.test.util.MockWebsocketFactory; import io.ably.lib.transport.Defaults; import io.ably.lib.types.AblyException; +import io.ably.lib.types.Callback; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.ProtocolMessage; +import io.ably.lib.types.PublishResult; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -392,9 +394,9 @@ public void connect_test_queued_messages_on_failure() { final int[] numberOfErrors = new int[]{0}; // assume we are in connecting state now - ably.connection.connectionManager.send(new ProtocolMessage(), true, new CompletionListener() { + ably.connection.connectionManager.send(new ProtocolMessage(), true, new Callback() { @Override - public void onSuccess() { + public void onSuccess(PublishResult result) { fail("Unexpected success sending message"); } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java index 940aa3dd2..172f2a545 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeHttpHeaderTest.java @@ -81,7 +81,7 @@ public void realtime_websocket_param_test() { * Defaults.ABLY_VERSION_PARAM, as ultimately the request param has been derived from those values. */ assertEquals("Verify correct version", requestParameters.get("v"), - Collections.singletonList("4")); + Collections.singletonList("5")); /* Spec RSC7d3 * This test should not directly validate version against Defaults.ABLY_AGENT_VERSION, nor diff --git a/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java b/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java index a07b50d9c..f26fda4eb 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java +++ b/lib/src/test/java/io/ably/lib/test/rest/HttpHeaderTest.java @@ -81,7 +81,7 @@ public void header_lib_channel_publish() { * from those values. */ Assert.assertNotNull("Expected headers", headers); - Assert.assertEquals(headers.get("x-ably-version"), "4"); + Assert.assertEquals(headers.get("x-ably-version"), "5"); Assert.assertEquals(headers.get("ably-agent"), expectedAblyAgentHeader); // RSA7e2 Assert.assertNull("Shouldn't include 'x-ably-clientid' if `clientId` is not specified", headers.get("x-ably-clientid")); diff --git a/lib/src/test/java/io/ably/lib/test/rest/RestChannelMessageEditTest.java b/lib/src/test/java/io/ably/lib/test/rest/RestChannelMessageEditTest.java index a15fbb3c8..1154b8cb1 100644 --- a/lib/src/test/java/io/ably/lib/test/rest/RestChannelMessageEditTest.java +++ b/lib/src/test/java/io/ably/lib/test/rest/RestChannelMessageEditTest.java @@ -16,6 +16,7 @@ import io.ably.lib.types.PaginatedResult; import io.ably.lib.types.Param; import io.ably.lib.util.Crypto; +import io.ably.lib.util.Listeners; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -184,7 +185,7 @@ public void updateMessage_async() throws Exception { updateMessage.data = "Updated message data async"; CompletionSet updateComplete = new CompletionSet(); - channel.updateMessageAsync(updateMessage, updateComplete.add()); + channel.updateMessageAsync(updateMessage, Listeners.fromCompletionListener(updateComplete.add())); ErrorInfo[] updateErrors = updateComplete.waitFor(); assertEquals("Expected no errors from update", 0, updateErrors.length); @@ -254,7 +255,7 @@ public void deleteMessage_async() throws Exception { deleteMessage.data = "Message deleted async"; CompletionSet deleteComplete = new CompletionSet(); - channel.deleteMessageAsync(deleteMessage, deleteComplete.add()); + channel.deleteMessageAsync(deleteMessage, Listeners.fromCompletionListener(deleteComplete.add())); ErrorInfo[] deleteErrors = deleteComplete.waitFor(); assertEquals("Expected no errors from delete", 0, deleteErrors.length); diff --git a/lib/src/test/java/io/ably/lib/transport/DefaultsTest.java b/lib/src/test/java/io/ably/lib/transport/DefaultsTest.java index b88d78d81..7f52ecb83 100644 --- a/lib/src/test/java/io/ably/lib/transport/DefaultsTest.java +++ b/lib/src/test/java/io/ably/lib/transport/DefaultsTest.java @@ -9,7 +9,7 @@ public class DefaultsTest { @Test public void protocol_version_CSV2() { - assertThat(Defaults.ABLY_PROTOCOL_VERSION, is("4")); + assertThat(Defaults.ABLY_PROTOCOL_VERSION, is("5")); } @Test