diff --git a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java index ded2c967d..5bac5f249 100644 --- a/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java +++ b/lib/src/main/java/io/ably/lib/realtime/ChannelBase.java @@ -31,6 +31,7 @@ import io.ably.lib.types.DeltaExtras; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.Message; +import io.ably.lib.types.MessageAction; import io.ably.lib.types.MessageAnnotations; import io.ably.lib.types.MessageDecodeException; import io.ably.lib.types.MessageOperation; @@ -46,6 +47,7 @@ import io.ably.lib.types.UpdateDeleteResult; import io.ably.lib.util.CollectionUtils; import io.ably.lib.util.EventEmitter; +import io.ably.lib.util.Listeners; import io.ably.lib.util.Log; import io.ably.lib.util.ReconnectionStrategy; import io.ably.lib.util.StringUtils; @@ -1123,7 +1125,7 @@ public synchronized void publish(Message[] messages, CompletionListener listener case suspended: throw AblyException.fromErrorInfo(new ErrorInfo("Unable to publish in failed or suspended state", 400, 40000)); default: - connectionManager.send(msg, queueMessages, listener); + connectionManager.send(msg, queueMessages, Listeners.fromCompletionListener(listener)); } } @@ -1206,34 +1208,26 @@ public void getMessageAsync(String serial, Callback callback) { } /** - * Updates an existing message using patch semantics. - *

- * Non-null fields in the provided message (name, data, extras) will replace the corresponding - * fields in the existing message, while null fields will be left unchanged. + * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * Only non-null fields will be applied to the existing message. - * @param operation operation metadata such as clientId, description, or metadata in the version field - * @throws AblyException If the update operation fails. - * @return A {@link UpdateDeleteResult} containing the updated message version serial. + *

+ * This callback is invoked on a background thread. */ - public UpdateDeleteResult updateMessage(Message message, MessageOperation operation) throws AblyException { - return messageEditsMixin.updateMessage(ably.http, message, operation); + public void updateMessage(Message message) throws AblyException { + updateMessage(message, null, null); } /** - * Updates an existing message using patch semantics. - *

- * Non-null fields in the provided message (name, data, extras) will replace the corresponding - * fields in the existing message, while null fields will be left unchanged. + * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * Only non-null fields will be applied to the existing message. - * @throws AblyException If the update operation fails. - * @return A {@link UpdateDeleteResult} containing the updated message version serial. + * @param operation operation metadata such as clientId, description, or metadata in the version field + *

+ * This callback is invoked on a background thread. */ - public UpdateDeleteResult updateMessage(Message message) throws AblyException { - return updateMessage(message, null); + public void updateMessage(Message message, MessageOperation operation) throws AblyException { + updateMessage(message, operation, null); } /** @@ -1241,55 +1235,48 @@ public UpdateDeleteResult updateMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the fields to update and the serial identifier. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param callback A callback to be notified of the outcome of this operation. + * @param listener A callback to be notified of the outcome of this operation. *

* This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, MessageOperation operation, Callback callback) { - messageEditsMixin.updateMessageAsync(ably.http, message, operation, callback); + public void updateMessage(Message message, MessageOperation operation, Callback listener) throws AblyException { + Log.v(TAG, "updateMessage(Message); channel = " + this.name + "; serial = " + message.serial); + updateDeleteImpl(message, operation, MessageAction.MESSAGE_UPDATE, listener); } /** * Asynchronously updates an existing message. * * @param message A {@link Message} object containing the fields to update and the serial identifier. - * @param callback A callback to be notified of the outcome of this operation. + * @param listener A callback to be notified of the outcome of this operation. *

* This callback is invoked on a background thread. */ - public void updateMessageAsync(Message message, Callback callback) { - updateMessageAsync(message, null, callback); + public void updateMessage(Message message, Callback listener) throws AblyException { + updateMessage(message, null, listener); } /** - * Marks a message as deleted. - *

- * This operation does not remove the message from history; it marks it as deleted - * while preserving the full message history. The deleted message can still be - * retrieved and will have its action set to MESSAGE_DELETE. + * Asynchronously marks a message as deleted. * - * @param message A {@link Message} message containing the serial identifier. - * @param operation operation metadata such as clientId, description, or metadata in the version field - * @throws AblyException If the delete operation fails. - * @return A {@link UpdateDeleteResult} containing the deleted message version serial. + * @param message A {@link Message} object containing the serial identifier and operation metadata. + *

+ * This callback is invoked on a background thread. */ - public UpdateDeleteResult deleteMessage(Message message, MessageOperation operation) throws AblyException { - return messageEditsMixin.deleteMessage(ably.http, message, operation); + public void deleteMessage(Message message) throws AblyException { + deleteMessage(message, null, null); } /** - * Marks a message as deleted. - *

- * This operation does not remove the message from history; it marks it as deleted - * while preserving the full message history. The deleted message can still be - * retrieved and will have its action set to MESSAGE_DELETE. + * Asynchronously marks a message as deleted. * - * @param message A {@link Message} message containing the serial identifier. - * @throws AblyException If the delete operation fails. - * @return A {@link UpdateDeleteResult} containing the deleted message version serial. + * @param message A {@link Message} object containing the serial identifier and operation metadata. + * @param operation operation metadata such as clientId, description, or metadata in the version field + *

+ * This callback is invoked on a background thread. */ - public UpdateDeleteResult deleteMessage(Message message) throws AblyException { - return deleteMessage(message, null); + public void deleteMessage(Message message, MessageOperation operation) throws AblyException { + deleteMessage(message, operation, null); } /** @@ -1297,12 +1284,13 @@ public UpdateDeleteResult deleteMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the serial identifier and operation metadata. * @param operation operation metadata such as clientId, description, or metadata in the version field - * @param callback A callback to be notified of the outcome of this operation. + * @param listener A callback to be notified of the outcome of this operation. *

* This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, MessageOperation operation, Callback callback) { - messageEditsMixin.deleteMessageAsync(ably.http, message, operation, callback); + public void deleteMessage(Message message, MessageOperation operation, Callback listener) throws AblyException { + Log.v(TAG, "deleteMessage(Message); channel = " + this.name + "; serial = " + message.serial); + updateDeleteImpl(message, operation, MessageAction.MESSAGE_DELETE, listener); } /** @@ -1313,31 +1301,31 @@ public void deleteMessageAsync(Message message, MessageOperation operation, Call *

* This callback is invoked on a background thread. */ - public void deleteMessageAsync(Message message, Callback callback) { - deleteMessageAsync(message, null, callback); + public void deleteMessage(Message message, Callback callback) throws AblyException { + deleteMessage(message, null, callback); } /** - * Appends message text to the end of the message. + * Asynchronously appends message text to the end of the message. * * @param message A {@link Message} object containing the serial identifier and data to append. - * @param operation operation details such as clientId, description, or metadata - * @return A {@link UpdateDeleteResult} containing the updated message version serial. - * @throws AblyException If the append operation fails. + *

+ * This callback is invoked on a background thread. */ - public UpdateDeleteResult appendMessage(Message message, MessageOperation operation) throws AblyException { - return messageEditsMixin.appendMessage(ably.http, message, operation); + public void appendMessage(Message message) throws AblyException { + appendMessage(message, null, null); } /** - * Appends message text to the end of the message. + * Asynchronously appends message text to the end of the message. * * @param message A {@link Message} object containing the serial identifier and data to append. - * @return A {@link UpdateDeleteResult} containing the updated message version serial. - * @throws AblyException If the append operation fails. + * @param operation operation details such as clientId, description, or metadata + *

+ * This callback is invoked on a background thread. */ - public UpdateDeleteResult appendMessage(Message message) throws AblyException { - return appendMessage(message, null); + public void appendMessage(Message message, MessageOperation operation) throws AblyException { + appendMessage(message, operation, null); } /** @@ -1345,12 +1333,13 @@ public UpdateDeleteResult appendMessage(Message message) throws AblyException { * * @param message A {@link Message} object containing the serial identifier and data to append. * @param operation operation details such as clientId, description, or metadata - * @param callback A callback to be notified of the outcome of this operation. + * @param listener A callback to be notified of the outcome of this operation. *

* This callback is invoked on a background thread. */ - public void appendMessageAsync(Message message, MessageOperation operation, Callback callback) { - messageEditsMixin.appendMessageAsync(ably.http, message, operation, callback); + public void appendMessage(Message message, MessageOperation operation, Callback listener) throws AblyException { + Log.v(TAG, "appendMessage(Message); channel = " + this.name + "; serial = " + message.serial); + updateDeleteImpl(message, operation, MessageAction.MESSAGE_APPEND, listener); } /** @@ -1361,8 +1350,49 @@ public void appendMessageAsync(Message message, MessageOperation operation, Call *

* This callback is invoked on a background thread. */ - public void appendMessageAsync(Message message, Callback callback) { - appendMessageAsync(message, null, callback); + public void appendMessage(Message message, Callback callback) throws AblyException { + appendMessage(message, null, callback); + } + + private void updateDeleteImpl( + Message message, + MessageOperation operation, + MessageAction action, + Callback listener + ) throws AblyException { + if (message.serial == null || message.serial.isEmpty()) { + throw AblyException.fromErrorInfo(new ErrorInfo("Message serial cannot be empty", 400, 40003)); + } + ConnectionManager connectionManager = ably.connection.connectionManager; + ConnectionManager.State connectionState = connectionManager.getConnectionState(); + boolean queueMessages = ably.options.queueMessages; + if (!connectionManager.isActive() || (connectionState.queueEvents && !queueMessages)) { + throw AblyException.fromErrorInfo(connectionState.defaultErrorInfo); + } + boolean connected = (connectionState.sendEvents); + + Message updatedMessage = new Message(message.name, message.data, message.extras); + updatedMessage.action = action; + updatedMessage.version = new MessageVersion(); + if (operation != null) { + updatedMessage.version.clientId = operation.clientId; + updatedMessage.version.description = operation.description; + updatedMessage.version.metadata = operation.metadata; + } + + try { + ably.auth.checkClientId(message, true, connected); + updatedMessage.encode(options); + } catch (AblyException e) { + if (listener != null) { + listener.onError(e.errorInfo); + } + return; + } + + ProtocolMessage msg = new ProtocolMessage(Action.message, this.name); + msg.messages = new Message[] { message }; + connectionManager.send(msg, queueMessages, Listeners.toPublishResultListener(listener)); } /** @@ -1681,7 +1711,7 @@ public void once(ChannelState state, ChannelStateListener listener) { */ public void sendProtocolMessage(ProtocolMessage protocolMessage, CompletionListener listener) throws AblyException { ConnectionManager connectionManager = ably.connection.connectionManager; - connectionManager.send(protocolMessage, ably.options.queueMessages, listener); + connectionManager.send(protocolMessage, ably.options.queueMessages, Listeners.fromCompletionListener(listener)); } private static final String TAG = Channel.class.getName(); diff --git a/lib/src/main/java/io/ably/lib/realtime/Presence.java b/lib/src/main/java/io/ably/lib/realtime/Presence.java index c56297fc0..9a7e89e7e 100644 --- a/lib/src/main/java/io/ably/lib/realtime/Presence.java +++ b/lib/src/main/java/io/ably/lib/realtime/Presence.java @@ -15,6 +15,8 @@ import io.ably.lib.types.PresenceMessage; import io.ably.lib.types.PresenceSerializer; import io.ably.lib.types.ProtocolMessage; +import io.ably.lib.types.PublishResult; +import io.ably.lib.util.Listeners; import io.ably.lib.util.Log; import io.ably.lib.util.StringUtils; @@ -120,9 +122,9 @@ public synchronized PresenceMessage[] get(String clientId, boolean wait) throws return get(new Param(GET_WAITFORSYNC, String.valueOf(wait)), new Param(GET_CLIENTID, clientId)); } - void addPendingPresence(PresenceMessage presenceMessage, CompletionListener listener) { + void addPendingPresence(PresenceMessage presenceMessage, Callback listener) { synchronized(channel) { - final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage,listener); + final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage, Listeners.unwrap(listener)); pendingPresence.add(queuedPresence); } } @@ -763,7 +765,7 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name); message.presence = new PresenceMessage[] { msg }; ConnectionManager connectionManager = ably.connection.connectionManager; - connectionManager.send(message, ably.options.queueMessages, listener); + connectionManager.send(message, ably.options.queueMessages, Listeners.fromCompletionListener(listener)); break; default: throw AblyException.fromErrorInfo(new ErrorInfo("Unable to enter presence channel in detached or failed state", 400, 91001)); @@ -892,7 +894,7 @@ private void sendQueuedMessages() { pendingPresence.clear(); try { - connectionManager.send(message, queueMessages, listener); + connectionManager.send(message, queueMessages, Listeners.fromCompletionListener(listener)); } catch(AblyException e) { Log.e(TAG, "sendQueuedMessages(): Unexpected exception sending message", e); if(listener != null) diff --git a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java index 3a680fb2a..a1987cd27 100644 --- a/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java +++ b/lib/src/main/java/io/ably/lib/transport/ConnectionManager.java @@ -28,15 +28,18 @@ import io.ably.lib.transport.ITransport.TransportParams; import io.ably.lib.transport.NetworkConnectivity.NetworkConnectivityListener; import io.ably.lib.types.AblyException; +import io.ably.lib.types.Callback; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ConnectionDetails; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.Param; import io.ably.lib.types.ProtocolMessage; import io.ably.lib.types.ProtocolSerializer; +import io.ably.lib.types.PublishResult; import io.ably.lib.util.Log; import io.ably.lib.util.PlatformAgentProvider; import io.ably.lib.util.ReconnectionStrategy; +import org.jetbrains.annotations.Nullable; public class ConnectionManager implements ConnectListener { final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); @@ -1403,7 +1406,7 @@ private synchronized void onError(ProtocolMessage message) { } private void onAck(ProtocolMessage message) { - pendingMessages.ack(message.msgSerial, message.count, message.error); + pendingMessages.ack(message.msgSerial, message.count, message.res, message.error); } private void onNack(ProtocolMessage message) { @@ -1724,14 +1727,14 @@ protected void setLastActivity(long lastActivityTime) { public static class QueuedMessage { public final ProtocolMessage msg; - public final CompletionListener listener; - public QueuedMessage(ProtocolMessage msg, CompletionListener listener) { + public final Callback listener; + public QueuedMessage(ProtocolMessage msg, Callback listener) { this.msg = msg; this.listener = listener; } } - public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException { + public void send(ProtocolMessage msg, boolean queueEvents, Callback listener) throws AblyException { State state; synchronized(this) { state = this.currentState; @@ -1747,7 +1750,7 @@ public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener li throw AblyException.fromErrorInfo(state.defaultErrorInfo); } - private void sendImpl(ProtocolMessage message, CompletionListener listener) throws AblyException { + private void sendImpl(ProtocolMessage message, Callback listener) throws AblyException { if(transport == null) { Log.v(TAG, "sendImpl(): Discarding message; transport unavailable"); return; @@ -1825,7 +1828,7 @@ public synchronized void push(QueuedMessage msg) { queue.add(msg); } - public void ack(long msgSerial, int count, ErrorInfo reason) { + public void ack(long msgSerial, int count, @Nullable PublishResult[] results, ErrorInfo reason) { QueuedMessage[] ackMessages = null, nackMessages = null; synchronized(this) { if (queue.isEmpty()) return; @@ -1867,11 +1870,14 @@ public void ack(long msgSerial, int count, ErrorInfo reason) { } } if(ackMessages != null) { - for(QueuedMessage msg : ackMessages) { + for (int i = 0; i < ackMessages.length; i++) { + QueuedMessage msg = ackMessages[i]; try { - if(msg.listener != null) - msg.listener.onSuccess(); - } catch(Throwable t) { + if (msg.listener != null) { + PublishResult messageResult = results != null && results.length > i ? results[i] : null; + msg.listener.onSuccess(messageResult); + } + } catch (Throwable t) { Log.e(TAG, "ack(): listener exception", t); } } diff --git a/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java b/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java index 0548e4c64..e813a21b7 100644 --- a/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java +++ b/lib/src/main/java/io/ably/lib/types/ProtocolMessage.java @@ -137,6 +137,8 @@ public ProtocolMessage(Action action, String channel) { @JsonAdapter(ObjectsJsonSerializer.class) public Object[] state; + public @Nullable PublishResult[] res; + public boolean hasFlag(final Flag flag) { return (flags & flag.getMask()) == flag.getMask(); } @@ -161,6 +163,7 @@ void writeMsgpack(MessagePacker packer) throws IOException { if(channelSerial != null) ++fieldCount; if(annotations != null) ++fieldCount; if(state != null && ObjectsHelper.getSerializer() != null) ++fieldCount; + if(res != null) ++fieldCount; packer.packMapHeader(fieldCount); packer.packString("action"); packer.packInt(action.getValue()); @@ -209,6 +212,10 @@ void writeMsgpack(MessagePacker packer) throws IOException { Log.w(TAG, "Skipping 'state' field msgpack serialization because ObjectsSerializer not found"); } } + if (res != null) { + packer.packString("res"); + PublishResult.writeMsgpackArray(res, packer); + } } ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException { @@ -280,6 +287,9 @@ ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException { unpacker.skipValue(); } break; + case "res": + res = PublishResult.readMsgpackArray(unpacker); + break; default: Log.v(TAG, "Unexpected field: " + fieldName); unpacker.skipValue(); diff --git a/lib/src/main/java/io/ably/lib/types/PublishResult.java b/lib/src/main/java/io/ably/lib/types/PublishResult.java new file mode 100644 index 000000000..f92f9d083 --- /dev/null +++ b/lib/src/main/java/io/ably/lib/types/PublishResult.java @@ -0,0 +1,130 @@ +package io.ably.lib.types; + +import io.ably.lib.http.HttpCore; +import io.ably.lib.util.Serialisation; +import org.jetbrains.annotations.Nullable; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePacker; +import org.msgpack.core.MessageUnpacker; + +import java.io.IOException; + +/** + * Contains the result of a publish operation. + */ +public class PublishResult { + + private static final String SERIALS = "serials"; + + /** + * An array of message serials corresponding 1:1 to the messages that were published. + * A serial may be null if the message was discarded due to a configured conflation rule. + */ + public final @Nullable String[] serials; + + public PublishResult(@Nullable String[] serials) { + this.serials = serials; + } + + public static PublishResult readFromJson(byte[] packed) throws MessageDecodeException { + return Serialisation.gson.fromJson(new String(packed), PublishResult.class); + } + + public static PublishResult readMsgpack(byte[] packed) throws AblyException { + try { + MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed); + return readMsgpack(unpacker); + } catch (IOException ioe) { + throw AblyException.fromThrowable(ioe); + } + } + + public static PublishResult readMsgpack(MessageUnpacker unpacker) throws IOException { + int fieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < fieldCount; i++) { + String fieldName = unpacker.unpackString(); + MessageFormat fieldFormat = unpacker.getNextFormat(); + if (fieldFormat.equals(MessageFormat.NIL)) { + unpacker.unpackNil(); + continue; + } + + if (fieldName.equals(SERIALS)) { + int count = unpacker.unpackArrayHeader(); + String[] serials = new String[count]; + for (int j = 0; i < count; i++) { + if (unpacker.getNextFormat().equals(MessageFormat.NIL)) { + unpacker.unpackNil(); + serials[j] = null; + } else { + serials[j] = unpacker.unpackString(); + } + } + return new PublishResult(serials); + } else { + unpacker.skipValue(); + } + } + return new PublishResult(new String[]{}); + } + + public static void writeMsgpackArray(PublishResult[] results, MessagePacker packer) { + try { + int count = results.length; + packer.packArrayHeader(count); + for (PublishResult result : results) { + result.writeMsgpack(packer); + } + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public static PublishResult[] readMsgpackArray(MessageUnpacker unpacker) throws IOException { + int count = unpacker.unpackArrayHeader(); + PublishResult[] results = new PublishResult[count]; + for (int i = 0; i < count; i++) { + results[i] = readMsgpack(unpacker); + } + return results; + } + + public static HttpCore.BodyHandler getBodyHandler() { + return new PublishResultBodyHandler(); + } + + private void writeMsgpack(MessagePacker packer) throws IOException { + int fieldCount = 0; + if (serials != null) ++fieldCount; + packer.packMapHeader(fieldCount); + if (serials != null) { + packer.packString(SERIALS); + packer.packArrayHeader(serials.length); + for (String serial : serials) { + if (serial == null) { + packer.packNil(); + } else { + packer.packString(serial); + } + } + } + } + + private static class PublishResultBodyHandler implements HttpCore.BodyHandler { + + @Override + public String[] handleResponseBody(String contentType, byte[] body) throws AblyException { + try { + PublishResult publishResult = null; + if ("application/json".equals(contentType)) + publishResult = readFromJson(body); + else if ("application/x-msgpack".equals(contentType)) + publishResult = readMsgpack(body); + return publishResult != null ? publishResult.serials : new String[]{}; + } catch (MessageDecodeException e) { + throw AblyException.fromThrowable(e); + } + } + } +} + diff --git a/lib/src/main/java/io/ably/lib/util/Listeners.java b/lib/src/main/java/io/ably/lib/util/Listeners.java index 534df2291..36076bee3 100644 --- a/lib/src/main/java/io/ably/lib/util/Listeners.java +++ b/lib/src/main/java/io/ably/lib/util/Listeners.java @@ -3,6 +3,8 @@ import io.ably.lib.realtime.CompletionListener; import io.ably.lib.types.Callback; import io.ably.lib.types.ErrorInfo; +import io.ably.lib.types.PublishResult; +import io.ably.lib.types.UpdateDeleteResult; public class Listeners { @@ -10,6 +12,18 @@ public static Callback fromCompletionListener(CompletionListener listener return new CompletionListenerWrapper(listener); } + public static Callback toPublishResultListener(Callback listener) { + return new UpdateResultToPublishAdapter(listener); + } + + public static CompletionListener unwrap(Callback listener) { + if (listener instanceof CompletionListenerWrapper) { + return ((CompletionListenerWrapper)listener).listener; + } else { + return null; + } + } + private static class CompletionListenerWrapper implements Callback { private final CompletionListener listener; @@ -31,4 +45,28 @@ public void onError(ErrorInfo reason) { } } } + + private static class UpdateResultToPublishAdapter implements Callback { + private final Callback listener; + + private UpdateResultToPublishAdapter(Callback listener) { + this.listener = listener; + } + + @Override + public void onSuccess(PublishResult result) { + if (listener != null) { + String serial = result != null && result.serials != null && result.serials.length > 0 + ? result.serials[0] : null; + listener.onSuccess(new UpdateDeleteResult(serial)); + } + } + + @Override + public void onError(ErrorInfo reason) { + if (listener != null) { + listener.onError(reason); + } + } + } } diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelMessageEditTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelMessageEditTest.java new file mode 100644 index 000000000..797105841 --- /dev/null +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeChannelMessageEditTest.java @@ -0,0 +1,466 @@ +package io.ably.lib.test.realtime; + +import io.ably.lib.realtime.AblyRealtime; +import io.ably.lib.realtime.Channel; +import io.ably.lib.test.common.Helpers.CompletionSet; +import io.ably.lib.test.common.ParameterizedTest; +import io.ably.lib.types.AblyException; +import io.ably.lib.types.ChannelOptions; +import io.ably.lib.types.ClientOptions; +import io.ably.lib.types.ErrorInfo; +import io.ably.lib.types.Message; +import io.ably.lib.types.MessageAction; +import io.ably.lib.types.MessageOperation; +import io.ably.lib.types.PaginatedResult; +import io.ably.lib.types.Param; +import io.ably.lib.util.Crypto; +import io.ably.lib.util.Listeners; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +import java.util.HashMap; +import java.util.UUID; +import java.util.function.Predicate; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +/** + * Tests for REST channel message edit and delete operations + */ +public class RealtimeChannelMessageEditTest extends ParameterizedTest { + + @Rule + public Timeout testTimeout = Timeout.seconds(300); + private AblyRealtime ably; + + @Before + public void setUpBefore() throws Exception { + ClientOptions opts = createOptions(testVars.keys[0].keyStr); + ably = new AblyRealtime(opts); + } + + /** + * Test getMessage: Publish a message and retrieve it by serial + */ + @Test + public void getMessage_retrieveBySerial() throws Exception { + String channelName = "mutable:get_message_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Test message data"); + + // Get the message from history to obtain its serial + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Retrieve the message by serial + Message retrievedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + + // Verify the retrieved message + assertNotNull("Expected non-null retrieved message", retrievedMessage); + assertEquals("Expected same message name", publishedMessage.name, retrievedMessage.name); + assertEquals("Expected same message data", publishedMessage.data, retrievedMessage.data); + assertEquals("Expected same serial", publishedMessage.serial, retrievedMessage.serial); + } + + /** + * Test updateMessage: Update a message's data + */ + @Test + public void updateMessage_updateData() throws Exception { + String channelName = "mutable:update_message_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original message data"); + + // Get the message from history to obtain its serial + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message + Message updateMessage = new Message(); + updateMessage.serial = publishedMessage.serial; + updateMessage.data = "Updated message data"; + updateMessage.name = "updated_event"; + + channel.updateMessage(updateMessage); + + // Retrieve the updated message + Message updatedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + + // Verify the message was updated + assertNotNull("Expected non-null updated message", updatedMessage); + assertEquals("Expected updated message data", "Updated message data", updatedMessage.data); + assertEquals("Expected updated message name", "updated_event", updatedMessage.name); + assertEquals("Expected action to be MESSAGE_UPDATE", MessageAction.MESSAGE_UPDATE, updatedMessage.action); + } + + /** + * Test updateMessage: Update a message's data + */ + @Test + public void updateMessage_updateEncodedData() throws Exception { + String channelName = "mutable:update_encodedmessage_" + UUID.randomUUID() + "_" + testParams.name; + ChannelOptions channelOptions = ChannelOptions.withCipherKey(Crypto.generateRandomKey()); + Channel channel = ably.channels.get(channelName, channelOptions); + + // Publish a message + channel.publish("test_event", "Original message data"); + + // Get the message from history to obtain its serial + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message + Message updateMessage = new Message(); + updateMessage.serial = publishedMessage.serial; + updateMessage.data = "Updated message data"; + updateMessage.name = "updated_event"; + + channel.updateMessage(updateMessage); + + // Retrieve the updated message + Message updatedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + + // Verify the message was updated + assertNotNull("Expected non-null updated message", updatedMessage); + assertEquals("Expected updated message data", "Updated message data", updatedMessage.data); + assertEquals("Expected updated message name", "updated_event", updatedMessage.name); + assertEquals("Expected action to be MESSAGE_UPDATE", MessageAction.MESSAGE_UPDATE, updatedMessage.action); + } + + /** + * Test updateMessage async: Update a message using async API + */ + @Test + public void updateMessage_async() throws Exception { + String channelName = "mutable:update_message_async_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original message data"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + final Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message using async API + Message updateMessage = new Message(); + updateMessage.serial = publishedMessage.serial; + updateMessage.data = "Updated message data async"; + + CompletionSet updateComplete = new CompletionSet(); + channel.updateMessage(updateMessage, Listeners.fromCompletionListener(updateComplete.add())); + + ErrorInfo[] updateErrors = updateComplete.waitFor(); + assertEquals("Expected no errors from update", 0, updateErrors.length); + + // Retrieve the updated message + Message updatedMessage = waitForUpdatedMessageAppear(channel, publishedMessage.serial); + assertNotNull("Expected non-null updated message", updatedMessage); + assertEquals("Expected updated message data", "Updated message data async", updatedMessage.data); + } + + /** + * Test deleteMessage: Soft delete a message + */ + @Test + public void deleteMessage_softDelete() throws Exception { + String channelName = "mutable:delete_message_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Message to be deleted"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Delete the message + Message deleteMessage = new Message(); + deleteMessage.serial = publishedMessage.serial; + deleteMessage.data = "Message deleted"; + + channel.deleteMessage(deleteMessage); + + // Retrieve the deleted message + Message deletedMessage = waitForDeletedMessageAppear(channel, publishedMessage.serial); + + // Verify the message was soft deleted + assertNotNull("Expected non-null deleted message", deletedMessage); + assertEquals("Expected action to be MESSAGE_DELETE", MessageAction.MESSAGE_DELETE, deletedMessage.action); + } + + /** + * Test deleteMessage async: Delete a message using async API + */ + @Test + public void deleteMessage_async() throws Exception { + String channelName = "mutable:delete_message_async_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Message to be deleted async"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + final Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Delete the message using async API + Message deleteMessage = new Message(); + deleteMessage.serial = publishedMessage.serial; + deleteMessage.data = "Message deleted async"; + + CompletionSet deleteComplete = new CompletionSet(); + channel.deleteMessage(deleteMessage, Listeners.fromCompletionListener(deleteComplete.add())); + + ErrorInfo[] deleteErrors = deleteComplete.waitFor(); + assertEquals("Expected no errors from delete", 0, deleteErrors.length); + + // Retrieve the deleted message + Message deletedMessage = waitForDeletedMessageAppear(channel, publishedMessage.serial); + assertNotNull("Expected non-null deleted message", deletedMessage); + assertEquals("Expected action to be MESSAGE_DELETE", MessageAction.MESSAGE_DELETE, deletedMessage.action); + } + + /** + * Test getMessageVersions: Retrieve version history of a message + */ + @Test + public void getMessageVersions_retrieveHistory() throws Exception { + String channelName = "mutable:message_versions_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original data"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + + // Update the message to create version history + Message updateMessage1 = new Message(); + updateMessage1.serial = publishedMessage.serial; + updateMessage1.data = "First update"; + channel.updateMessage(updateMessage1); + + Message updateMessage2 = new Message(); + updateMessage2.serial = publishedMessage.serial; + updateMessage2.data = "Second update"; + MessageOperation messageOperation = new MessageOperation(); + messageOperation.description = "description"; + messageOperation.metadata = new HashMap<>(); + messageOperation.metadata.put("key", "value"); + channel.updateMessage(updateMessage2, messageOperation); + + // Retrieve version history + PaginatedResult versions = waitForMessageAppearInVersionHistory(channel, publishedMessage.serial, null, msgs -> + msgs.length >= 3 + ); + + // Verify version history + assertNotNull("Expected non-null versions", versions); + assertTrue("Expected at least 3 versions (original + 2 updates)", versions.items().length >= 3); + + Message latestVersion = versions.items()[versions.items().length - 1]; + assertEquals("Expected latest version to have second update data", "Second update", latestVersion.data); + assertEquals("description", latestVersion.version.description); + assertEquals("value", latestVersion.version.metadata.get("key")); + } + + /** + * Test getMessageVersions async: Retrieve version history using async API + */ + @Test + public void getMessageVersions_async() throws Exception { + String channelName = "mutable:message_versions_async_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // Publish a message + channel.publish("test_event", "Original data"); + + // Get the message from history + PaginatedResult history = waitForMessageAppearInHistory(channel); + assertNotNull("Expected non-null history", history); + assertEquals(1, history.items().length); + + final Message publishedMessage = history.items()[0]; + assertNotNull("Expected message to have a serial", publishedMessage.serial); + } + + /** + * Test error handling: getMessage with invalid serial + */ + @Test + public void getMessage_invalidSerial() { + String channelName = "mutable:get_message_invalid_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + AblyException exception = assertThrows(AblyException.class, () -> { + channel.getMessage("invalid_serial_12345"); + }); + + assertNotNull("Expected error info", exception.errorInfo); + } + + /** + * Test error handling: updateMessage with null serial + */ + @Test + public void updateMessage_nullSerial() { + String channelName = "mutable:update_message_null_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + AblyException exception = assertThrows(AblyException.class, () -> { + Message updateMessage = new Message(); + updateMessage.serial = null; + updateMessage.data = "Update data"; + + channel.updateMessage(updateMessage); + }); + + assertNotNull("Expected error info", exception.errorInfo); + assertTrue("Expected error message about serial", + exception.errorInfo.message.toLowerCase().contains("serial")); + } + + /** + * Test error handling: deleteMessage with empty serial + */ + @Test + public void deleteMessage_emptySerial() { + String channelName = "mutable:delete_message_empty_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + AblyException exception = assertThrows(AblyException.class, () -> { + Message deleteMessage = new Message(); + deleteMessage.serial = ""; + deleteMessage.data = "Delete data"; + + channel.deleteMessage(deleteMessage); + }); + + assertNotNull("Expected error info", exception.errorInfo); + assertTrue("Expected error message about serial", + exception.errorInfo.message.toLowerCase().contains("serial")); + } + + /** + * Test complete workflow: publish, update, get versions, delete + */ + @Test + public void completeWorkflow_publishUpdateVersionsDelete() throws Exception { + String channelName = "mutable:complete_workflow_" + UUID.randomUUID() + "_" + testParams.name; + Channel channel = ably.channels.get(channelName); + + // 1. Publish a message + channel.publish("workflow_event", "Initial data"); + + // Get the published message + PaginatedResult history = waitForMessageAppearInHistory(channel); + Message publishedMessage = history.items()[0]; + String serial = publishedMessage.serial; + + // 2. Update the message + Message updateMessage = new Message(); + updateMessage.serial = serial; + updateMessage.data = "Updated data"; + updateMessage.name = "workflow_event_updated"; + channel.updateMessage(updateMessage); + + // 3. Verify update + Message retrieved = waitForUpdatedMessageAppear(channel, serial); + assertEquals("Expected updated data", "Updated data", retrieved.data); + assertEquals("Expected MESSAGE_UPDATE action", MessageAction.MESSAGE_UPDATE, retrieved.action); + + // 4. Delete the message + Message deleteMessage = new Message(); + deleteMessage.serial = serial; + deleteMessage.data = "Deleted"; + channel.deleteMessage(deleteMessage); + + // 5. Verify deletion + Message deleted = waitForDeletedMessageAppear(channel, serial); + assertEquals("Expected MESSAGE_DELETE action", MessageAction.MESSAGE_DELETE, deleted.action); + + // 6. Verify delete appears in versions + PaginatedResult finalVersions = waitForMessageAppearInVersionHistory(channel, serial, null, msgs -> + msgs.length > 0 && msgs[msgs.length - 1].action == MessageAction.MESSAGE_DELETE + ); + assertTrue("Expected at least 3 versions (create, update, delete)", finalVersions.items().length >= 3); + } + + private PaginatedResult waitForMessageAppearInVersionHistory(Channel channel, String serial, Param[] params, Predicate predicate) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + PaginatedResult history = channel.getMessageVersions(serial, params); + if (history.items().length > 0 && predicate.test(history.items()) || System.currentTimeMillis() > timeout) + return history; + Thread.sleep(200); + } + } + + private PaginatedResult waitForMessageAppearInHistory(Channel channel) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + PaginatedResult history = channel.history(null); + if (history.items().length > 0 || System.currentTimeMillis() > timeout) return history; + Thread.sleep(200); + } + } + + private Message waitForUpdatedMessageAppear(Channel channel, String serial) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + Message message = channel.getMessage(serial); + if ((message != null && message.action == MessageAction.MESSAGE_UPDATE) || System.currentTimeMillis() > timeout) + return message; + Thread.sleep(200); + } + } + + private Message waitForDeletedMessageAppear(Channel channel, String serial) throws Exception { + long timeout = System.currentTimeMillis() + 5_000; + while (true) { + Message message = channel.getMessage(serial); + if ((message != null && message.action == MessageAction.MESSAGE_DELETE) || System.currentTimeMillis() > timeout) + return message; + Thread.sleep(200); + } + } +} diff --git a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java index 65133d4cd..11a07274c 100644 --- a/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java +++ b/lib/src/test/java/io/ably/lib/test/realtime/RealtimeConnectFailTest.java @@ -19,9 +19,11 @@ import io.ably.lib.test.util.MockWebsocketFactory; import io.ably.lib.transport.Defaults; import io.ably.lib.types.AblyException; +import io.ably.lib.types.Callback; import io.ably.lib.types.ClientOptions; import io.ably.lib.types.ErrorInfo; import io.ably.lib.types.ProtocolMessage; +import io.ably.lib.types.PublishResult; import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; @@ -392,9 +394,9 @@ public void connect_test_queued_messages_on_failure() { final int[] numberOfErrors = new int[]{0}; // assume we are in connecting state now - ably.connection.connectionManager.send(new ProtocolMessage(), true, new CompletionListener() { + ably.connection.connectionManager.send(new ProtocolMessage(), true, new Callback() { @Override - public void onSuccess() { + public void onSuccess(PublishResult result) { fail("Unexpected success sending message"); }