Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.ably.lib.objects;

import com.google.gson.JsonArray;
import org.jetbrains.annotations.NotNull;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;

import java.io.IOException;

/**
* Serializer interface for converting between LiveObject arrays and their
* MessagePack or JSON representations.
*/
public interface LiveObjectSerializer {
/**
* Reads a MessagePack array from the given unpacker and deserializes it into an Object array.
*
* @param unpacker the MessageUnpacker to read from
* @return the deserialized Object array
* @throws IOException if an I/O error occurs during unpacking
*/
@NotNull
Object[] readMsgpackArray(@NotNull MessageUnpacker unpacker) throws IOException;

/**
* Serializes the given Object array as a MessagePack array using the provided packer.
*
* @param objects the Object array to serialize
* @param packer the MessagePacker to write to
* @throws IOException if an I/O error occurs during packing
*/
void writeMsgpackArray(@NotNull Object[] objects, @NotNull MessagePacker packer) throws IOException;

/**
* Reads a JSON array from the given {@link JsonArray} and deserializes it into an Object array.
*
* @param json the {@link JsonArray} representing the array to deserialize
* @return the deserialized Object array
*/
@NotNull
Object[] readFromJsonArray(@NotNull JsonArray json);

/**
* Serializes the given Object array as a JSON array.
*
* @param objects the Object array to serialize
* @return the resulting JsonArray
*/
@NotNull
JsonArray asJsonArray(@NotNull Object[] objects);
}
43 changes: 43 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectsHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.util.Log;

import java.lang.reflect.InvocationTargetException;

public class LiveObjectsHelper {

private static final String TAG = LiveObjectsHelper.class.getName();
private static volatile LiveObjectSerializer liveObjectSerializer;

public static LiveObjectsPlugin tryInitializeLiveObjectsPlugin(AblyRealtime ablyRealtime) {
try {
Class<?> liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin");
LiveObjectsAdapter adapter = new Adapter(ablyRealtime);
return (LiveObjectsPlugin) liveObjectsImplementation
.getDeclaredConstructor(LiveObjectsAdapter.class)
.newInstance(adapter);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e);
return null;
}
}

public static LiveObjectSerializer getLiveObjectSerializer() {
if (liveObjectSerializer == null) {
synchronized (LiveObjectsHelper.class) {
try {
Class<?> serializerClass = Class.forName("io.ably.lib.objects.serialization.DefaultLiveObjectSerializer");
liveObjectSerializer = (LiveObjectSerializer) serializerClass.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException |
NoSuchMethodException |
InvocationTargetException e) {
Log.e(TAG, "Failed to init LiveObjectSerializer, LiveObjects plugin not included in the classpath", e);
return null;
}
}
}
return liveObjectSerializer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.ably.lib.objects;

import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import io.ably.lib.util.Log;

import java.lang.reflect.Type;

public class LiveObjectsJsonSerializer implements JsonSerializer<Object[]>, JsonDeserializer<Object[]> {
private static final String TAG = LiveObjectsJsonSerializer.class.getName();
private final LiveObjectSerializer serializer = LiveObjectsHelper.getLiveObjectSerializer();

@Override
public Object[] deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
if (serializer == null) {
Log.w(TAG, "Skipping 'state' field json deserialization because LiveObjectsSerializer not found.");
return null;
}
if (!json.isJsonArray()) {
throw new JsonParseException("Expected a JSON array for 'state' field, but got: " + json);
}
return serializer.readFromJsonArray(json.getAsJsonArray());
}

@Override
public JsonElement serialize(Object[] src, Type typeOfSrc, JsonSerializationContext context) {
if (serializer == null) {
Log.w(TAG, "Skipping 'state' field json serialization because LiveObjectsSerializer not found.");
return JsonNull.INSTANCE;
}
return serializer.asJsonArray(src);
}
}
20 changes: 2 additions & 18 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package io.ably.lib.realtime;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.ably.lib.objects.Adapter;
import io.ably.lib.objects.LiveObjectsAdapter;
import io.ably.lib.objects.LiveObjectsHelper;
import io.ably.lib.objects.LiveObjectsPlugin;
import io.ably.lib.rest.AblyRest;
import io.ably.lib.rest.Auth;
Expand Down Expand Up @@ -74,7 +72,7 @@ public AblyRealtime(ClientOptions options) throws AblyException {
final InternalChannels channels = new InternalChannels();
this.channels = channels;

liveObjectsPlugin = tryInitializeLiveObjectsPlugin();
liveObjectsPlugin = LiveObjectsHelper.tryInitializeLiveObjectsPlugin(this);

connection = new Connection(this, channels, platformAgentProvider, liveObjectsPlugin);

Expand Down Expand Up @@ -185,20 +183,6 @@ public interface Channels extends ReadOnlyMap<String, Channel> {
void release(String channelName);
}

private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() {
try {
Class<?> liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin");
LiveObjectsAdapter adapter = new Adapter(this);
return (LiveObjectsPlugin) liveObjectsImplementation
.getDeclaredConstructor(LiveObjectsAdapter.class)
.newInstance(adapter);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e);
return null;
}
}

private class InternalChannels extends InternalMap<String, Channel> implements Channels, ConnectionManager.Channels {
/**
* Get the named channel; if it does not already exist,
Expand Down
32 changes: 32 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ProtocolMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import java.lang.reflect.Type;
import java.util.Map;

import com.google.gson.annotations.JsonAdapter;
import io.ably.lib.objects.LiveObjectSerializer;
import io.ably.lib.objects.LiveObjectsHelper;
import io.ably.lib.objects.LiveObjectsJsonSerializer;
import org.jetbrains.annotations.Nullable;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;
Expand Down Expand Up @@ -123,6 +128,14 @@ public ProtocolMessage(Action action, String channel) {
public AuthDetails auth;
public Map<String, String> params;
public Annotation[] annotations;
/**
* This will be null if we skipped decoding this property due to user not requesting Objects functionality
* JsonAdapter annotation supports java version (1.8) mentioned in build.gradle
* This is targeted and specific to the state field, so won't affect other fields
*/
@Nullable
@JsonAdapter(LiveObjectsJsonSerializer.class)
public Object[] state;

public boolean hasFlag(final Flag flag) {
return (flags & flag.getMask()) == flag.getMask();
Expand All @@ -147,6 +160,7 @@ void writeMsgpack(MessagePacker packer) throws IOException {
if(params != null) ++fieldCount;
if(channelSerial != null) ++fieldCount;
if(annotations != null) ++fieldCount;
if(state != null && LiveObjectsHelper.getLiveObjectSerializer() != null) ++fieldCount;
packer.packMapHeader(fieldCount);
packer.packString("action");
packer.packInt(action.getValue());
Expand Down Expand Up @@ -186,6 +200,15 @@ void writeMsgpack(MessagePacker packer) throws IOException {
packer.packString("annotations");
AnnotationSerializer.writeMsgpackArray(annotations, packer);
}
if(state != null) {
LiveObjectSerializer liveObjectsSerializer = LiveObjectsHelper.getLiveObjectSerializer();
if (liveObjectsSerializer != null) {
packer.packString("state");
liveObjectsSerializer.writeMsgpackArray(state, packer);
} else {
Log.w(TAG, "Skipping 'state' field msgpack serialization because LiveObjectsSerializer not found");
}
}
}

ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
Expand Down Expand Up @@ -248,6 +271,15 @@ ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
case "annotations":
annotations = AnnotationSerializer.readMsgpackArray(unpacker);
break;
case "state":
LiveObjectSerializer liveObjectsSerializer = LiveObjectsHelper.getLiveObjectSerializer();
if (liveObjectsSerializer != null) {
state = liveObjectsSerializer.readMsgpackArray(unpacker);
} else {
Log.w(TAG, "Skipping 'state' field msgpack deserialization because LiveObjectsSerializer not found");
unpacker.skipValue();
}
break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
Expand Down
15 changes: 8 additions & 7 deletions lib/src/main/java/io/ably/lib/types/ProtocolSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class ProtocolSerializer {
/****************************************
* Msgpack decode
****************************************/

public static ProtocolMessage readMsgpack(byte[] packed) throws AblyException {
try {
MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed);
Expand All @@ -27,30 +27,31 @@ public static ProtocolMessage readMsgpack(byte[] packed) throws AblyException {
/****************************************
* Msgpack encode
****************************************/
public static byte[] writeMsgpack(ProtocolMessage message) {

public static byte[] writeMsgpack(ProtocolMessage message) throws AblyException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out);
try {
message.writeMsgpack(packer);

packer.flush();
return out.toByteArray();
} catch(IOException e) { return null; }
} catch (IOException ioe) {
throw AblyException.fromThrowable(ioe);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you double-check that we won't introduce regression here

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, there's no point in returning null when writeMsgpack fails.
Catching error silently means although our send/publish operation failed, we are not returning any error for the operation. This is not expected ( even from customer use-case point of view ) and relevant error should be thrown to understand and fix the failed operation.
PS. I faced similar issue when writeMsgpack failed for publish op. and was not able to trace the error thrown for the same.

Copy link
Collaborator Author

@sacOO7 sacOO7 Jun 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, we already throw error for similar readMsgpack method =>

public static ProtocolMessage readMsgpack(byte[] packed) throws AblyException {
try {
MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed);
return ProtocolMessage.fromMsgpack(unpacker);
} catch (IOException ioe) {
throw AblyException.fromThrowable(ioe);
}
}

Seems, the relevant code for the writeMsgpack method wasn't updated.

}
}

/****************************************
* JSON decode
****************************************/

public static ProtocolMessage fromJSON(String packed) throws AblyException {
return Serialisation.gson.fromJson(packed, ProtocolMessage.class);
}

/****************************************
* JSON encode
****************************************/

public static byte[] writeJSON(ProtocolMessage message) throws AblyException {
return Serialisation.gson.toJson(message).getBytes(Charset.forName("UTF-8"));
}
Expand Down
1 change: 1 addition & 0 deletions live-objects/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ repositories {

dependencies {
implementation(project(":java"))
implementation(libs.bundles.common)
implementation(libs.coroutine.core)

testImplementation(kotlin("test"))
Expand Down
8 changes: 4 additions & 4 deletions live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ internal enum class ProtocolMessageFormat(private val value: String) {
override fun toString(): String = value
}

internal class Binary(val data: ByteArray?) {
internal class Binary(val data: ByteArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is Binary) return false
return data?.contentEquals(other.data) == true
return data.contentEquals(other.data)
}

override fun hashCode(): Int {
return data?.contentHashCode() ?: 0
return data.contentHashCode()
}
}

internal fun Binary.size(): Int {
return data?.size ?: 0
return data.size
}
18 changes: 11 additions & 7 deletions live-objects/src/main/kotlin/io/ably/lib/objects/ObjectMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ package io.ably.lib.objects
import com.google.gson.JsonArray
import com.google.gson.JsonObject

import com.google.gson.annotations.JsonAdapter
import com.google.gson.annotations.SerializedName
import io.ably.lib.objects.serialization.InitialValueJsonSerializer
import io.ably.lib.objects.serialization.ObjectDataJsonSerializer
import io.ably.lib.objects.serialization.gson

/**
* An enum class representing the different actions that can be performed on an object.
* Spec: OOP2
Expand All @@ -28,19 +34,14 @@ internal enum class MapSemantics(val code: Int) {
* An ObjectData represents a value in an object on a channel.
* Spec: OD1
*/
@JsonAdapter(ObjectDataJsonSerializer::class)
internal data class ObjectData(
/**
* A reference to another object, used to support composable object structures.
* Spec: OD2a
*/
val objectId: String? = null,

/**
* Can be set by the client to indicate that value in `string` or `bytes` field have an encoding.
* Spec: OD2b
*/
val encoding: String? = null,

/**
* String, number, boolean or binary - a concrete value of the object
* Spec: OD2c
Expand Down Expand Up @@ -217,11 +218,13 @@ internal data class ObjectOperation(
* the initialValue, nonce, and initialValueEncoding will be removed.
* Spec: OOP3h
*/
@JsonAdapter(InitialValueJsonSerializer::class)
val initialValue: Binary? = null,

/** The initial value encoding defines how the initialValue should be interpreted.
* Spec: OOP3i
*/
@Deprecated("Will be removed in the future, initialValue will be json string")
val initialValueEncoding: ProtocolMessageFormat? = null
)

Expand Down Expand Up @@ -312,7 +315,7 @@ internal data class ObjectMessage(
* or validation of the @extras@ field itself, but should treat it opaquely, encoding it and passing it to realtime unaltered
* Spec: OM2d
*/
val extras: Any? = null,
val extras: JsonObject? = null,

/**
* Describes an operation to be applied to an object.
Expand All @@ -328,6 +331,7 @@ internal data class ObjectMessage(
* the `ProtocolMessage` encapsulating it is `OBJECT_SYNC`.
* Spec: OM2g
*/
@SerializedName("object")
val objectState: ObjectState? = null,

/**
Expand Down
Loading
Loading