Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ mockk = "1.14.2"
turbine = "1.2.0"
ktor = "3.1.3"
jetbrains-annoations = "26.0.2"
jackson-msgpack = "0.8.11" # Compatible with msgpack-core 0.8.11
jackson-param = "2.19.1" # Compatible with jackson-msgpack

[libraries]
gson = { group = "com.google.code.gson", name = "gson", version.ref = "gson" }
Expand Down Expand Up @@ -54,6 +56,8 @@ turbine = { group = "app.cash.turbine", name = "turbine", version.ref = "turbine
ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" }
ktor-client-cio = { module = "io.ktor:ktor-client-cio", version.ref = "ktor" }
jetbrains = { group = "org.jetbrains", name = "annotations", version.ref = "jetbrains-annoations" }
jackson-msgpack = { group = "org.msgpack", name = "jackson-dataformat-msgpack", version.ref = "jackson-msgpack" }
jackson-parameter-names = { group = "com.fasterxml.jackson.module", name = "jackson-module-parameter-names", version.ref = "jackson-param" }

[bundles]
common = ["msgpack", "vcdiff-core"]
Expand Down
51 changes: 51 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectSerializer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.ably.lib.objects;

import com.google.gson.JsonArray;
import org.jetbrains.annotations.NotNull;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;

import java.io.IOException;

/**
* Serializer interface for converting between LiveObject arrays and their
* MessagePack or JSON representations.
*/
public interface LiveObjectSerializer {
/**
* Reads a MessagePack array from the given unpacker and deserializes it into an Object array.
*
* @param unpacker the MessageUnpacker to read from
* @return the deserialized Object array
* @throws IOException if an I/O error occurs during unpacking
*/
@NotNull
Object[] readMsgpackArray(@NotNull MessageUnpacker unpacker) throws IOException;

/**
* Serializes the given Object array as a MessagePack array using the provided packer.
*
* @param objects the Object array to serialize
* @param packer the MessagePacker to write to
* @throws IOException if an I/O error occurs during packing
*/
void writeMsgpackArray(@NotNull Object[] objects, @NotNull MessagePacker packer) throws IOException;

/**
* Reads a JSON array from the given {@link JsonArray} and deserializes it into an Object array.
*
* @param json the {@link JsonArray} representing the array to deserialize
* @return the deserialized Object array
*/
@NotNull
Object[] readFromJsonArray(@NotNull JsonArray json);

/**
* Serializes the given Object array as a JSON array.
*
* @param objects the Object array to serialize
* @return the resulting JsonArray
*/
@NotNull
JsonArray asJsonArray(@NotNull Object[] objects);
}
40 changes: 40 additions & 0 deletions lib/src/main/java/io/ably/lib/objects/LiveObjectsHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.ably.lib.objects;

import io.ably.lib.realtime.AblyRealtime;
import io.ably.lib.util.Log;

import java.lang.reflect.InvocationTargetException;

public class LiveObjectsHelper {

private static final String TAG = LiveObjectsHelper.class.getName();
private static LiveObjectSerializer liveObjectSerializer;

public static LiveObjectsPlugin tryInitializeLiveObjectsPlugin(AblyRealtime ablyRealtime) {
try {
Class<?> liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin");
LiveObjectsAdapter adapter = new Adapter(ablyRealtime);
return (LiveObjectsPlugin) liveObjectsImplementation
.getDeclaredConstructor(LiveObjectsAdapter.class)
.newInstance(adapter);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e);
return null;
}
}

public static LiveObjectSerializer getLiveObjectSerializer() {
if (liveObjectSerializer == null) {
try {
Class<?> serializerClass = Class.forName("io.ably.lib.objects.serialization.DefaultLiveObjectSerializer");
liveObjectSerializer = (LiveObjectSerializer) serializerClass.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
Log.e(TAG, "Failed to init LiveObjectSerializer, LiveObjects plugin not included in the classpath", e);
return null;
}
}
return liveObjectSerializer;
}
Comment on lines +27 to +39
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix thread safety issue in singleton pattern.

The singleton implementation for liveObjectSerializer is not thread-safe. Multiple threads could simultaneously check the null condition and create multiple instances, violating the singleton pattern.

 public static LiveObjectSerializer getLiveObjectSerializer() {
-    if (liveObjectSerializer == null) {
+    if (liveObjectSerializer == null) {
+        synchronized (LiveObjectsHelper.class) {
+            if (liveObjectSerializer == null) {
                 try {
                     Class<?> serializerClass = Class.forName("io.ably.lib.objects.serialization.DefaultLiveObjectSerializer");
                     liveObjectSerializer = (LiveObjectSerializer) serializerClass.getDeclaredConstructor().newInstance();
                 } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
                          InvocationTargetException e) {
                     Log.e(TAG, "Failed to init LiveObjectSerializer, LiveObjects plugin not included in the classpath", e);
                     return null;
                 }
+            }
+        }
     }
     return liveObjectSerializer;
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public static LiveObjectSerializer getLiveObjectSerializer() {
if (liveObjectSerializer == null) {
try {
Class<?> serializerClass = Class.forName("io.ably.lib.objects.serialization.DefaultLiveObjectSerializer");
liveObjectSerializer = (LiveObjectSerializer) serializerClass.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
Log.e(TAG, "Failed to init LiveObjectSerializer, LiveObjects plugin not included in the classpath", e);
return null;
}
}
return liveObjectSerializer;
}
public static LiveObjectSerializer getLiveObjectSerializer() {
if (liveObjectSerializer == null) {
synchronized (LiveObjectsHelper.class) {
if (liveObjectSerializer == null) {
try {
Class<?> serializerClass = Class.forName("io.ably.lib.objects.serialization.DefaultLiveObjectSerializer");
liveObjectSerializer = (LiveObjectSerializer) serializerClass.getDeclaredConstructor().newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
Log.e(TAG, "Failed to init LiveObjectSerializer, LiveObjects plugin not included in the classpath", e);
return null;
}
}
}
}
return liveObjectSerializer;
}
🤖 Prompt for AI Agents
In lib/src/main/java/io/ably/lib/objects/LiveObjectsHelper.java around lines 27
to 39, the singleton pattern for liveObjectSerializer is not thread-safe,
allowing multiple threads to create separate instances. To fix this, synchronize
the block that checks and initializes liveObjectSerializer or use a thread-safe
lazy initialization approach such as double-checked locking to ensure only one
instance is created even under concurrent access.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.ably.lib.objects;

import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import io.ably.lib.util.Log;

import java.lang.reflect.Type;

public class LiveObjectsJsonSerializer implements JsonSerializer<Object[]>, JsonDeserializer<Object[]> {
private static final String TAG = LiveObjectsJsonSerializer.class.getName();
private final LiveObjectSerializer serializer = LiveObjectsHelper.getLiveObjectSerializer();

@Override
public Object[] deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
if (serializer == null) {
Log.w(TAG, "Skipping 'state' field json deserialization because LiveObjectsSerializer not found.");
return null;
}
if (!json.isJsonArray()) {
throw new JsonParseException("Expected a JSON array for 'state' field, but got: " + json);
}
return serializer.readFromJsonArray(json.getAsJsonArray());
}

@Override
public JsonElement serialize(Object[] src, Type typeOfSrc, JsonSerializationContext context) {
if (serializer == null) {
Log.w(TAG, "Skipping 'state' field json serialization because LiveObjectsSerializer not found.");
return JsonNull.INSTANCE;
}
return serializer.asJsonArray(src);
}
}
20 changes: 2 additions & 18 deletions lib/src/main/java/io/ably/lib/realtime/AblyRealtime.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package io.ably.lib.realtime;

import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.ably.lib.objects.Adapter;
import io.ably.lib.objects.LiveObjectsAdapter;
import io.ably.lib.objects.LiveObjectsHelper;
import io.ably.lib.objects.LiveObjectsPlugin;
import io.ably.lib.rest.AblyRest;
import io.ably.lib.rest.Auth;
Expand Down Expand Up @@ -74,7 +72,7 @@ public AblyRealtime(ClientOptions options) throws AblyException {
final InternalChannels channels = new InternalChannels();
this.channels = channels;

liveObjectsPlugin = tryInitializeLiveObjectsPlugin();
liveObjectsPlugin = LiveObjectsHelper.tryInitializeLiveObjectsPlugin(this);

connection = new Connection(this, channels, platformAgentProvider, liveObjectsPlugin);

Expand Down Expand Up @@ -185,20 +183,6 @@ public interface Channels extends ReadOnlyMap<String, Channel> {
void release(String channelName);
}

private LiveObjectsPlugin tryInitializeLiveObjectsPlugin() {
try {
Class<?> liveObjectsImplementation = Class.forName("io.ably.lib.objects.DefaultLiveObjectsPlugin");
LiveObjectsAdapter adapter = new Adapter(this);
return (LiveObjectsPlugin) liveObjectsImplementation
.getDeclaredConstructor(LiveObjectsAdapter.class)
.newInstance(adapter);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException |
InvocationTargetException e) {
Log.i(TAG, "LiveObjects plugin not found in classpath. LiveObjects functionality will not be available.", e);
return null;
}
}

private class InternalChannels extends InternalMap<String, Channel> implements Channels, ConnectionManager.Channels {
/**
* Get the named channel; if it does not already exist,
Expand Down
32 changes: 32 additions & 0 deletions lib/src/main/java/io/ably/lib/types/ProtocolMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import java.lang.reflect.Type;
import java.util.Map;

import com.google.gson.annotations.JsonAdapter;
import io.ably.lib.objects.LiveObjectSerializer;
import io.ably.lib.objects.LiveObjectsHelper;
import io.ably.lib.objects.LiveObjectsJsonSerializer;
import org.jetbrains.annotations.Nullable;
import org.msgpack.core.MessageFormat;
import org.msgpack.core.MessagePacker;
import org.msgpack.core.MessageUnpacker;
Expand Down Expand Up @@ -123,6 +128,14 @@ public ProtocolMessage(Action action, String channel) {
public AuthDetails auth;
public Map<String, String> params;
public Annotation[] annotations;
/**
* This will be null if we skipped decoding this property due to user not requesting Objects functionality
* JsonAdapter annotation supports java version (1.8) mentioned in build.gradle
* This is targeted and specific to the state field, so won't affect other fields
*/
@Nullable
@JsonAdapter(LiveObjectsJsonSerializer.class)
public Object[] state;

public boolean hasFlag(final Flag flag) {
return (flags & flag.getMask()) == flag.getMask();
Expand All @@ -147,6 +160,7 @@ void writeMsgpack(MessagePacker packer) throws IOException {
if(params != null) ++fieldCount;
if(channelSerial != null) ++fieldCount;
if(annotations != null) ++fieldCount;
if(state != null) ++fieldCount;
packer.packMapHeader(fieldCount);
packer.packString("action");
packer.packInt(action.getValue());
Expand Down Expand Up @@ -186,6 +200,15 @@ void writeMsgpack(MessagePacker packer) throws IOException {
packer.packString("annotations");
AnnotationSerializer.writeMsgpackArray(annotations, packer);
}
if(state != null) {
LiveObjectSerializer liveObjectsSerializer = LiveObjectsHelper.getLiveObjectSerializer();
if (liveObjectsSerializer != null) {
packer.packString("state");
liveObjectsSerializer.writeMsgpackArray(state, packer);
} else {
Log.w(TAG, "Skipping 'state' field msgpack serialization because LiveObjectsSerializer not found");
}
}
}

ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
Expand Down Expand Up @@ -248,6 +271,15 @@ ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
case "annotations":
annotations = AnnotationSerializer.readMsgpackArray(unpacker);
break;
case "state":
LiveObjectSerializer liveObjectsSerializer = LiveObjectsHelper.getLiveObjectSerializer();
if (liveObjectsSerializer != null) {
state = liveObjectsSerializer.readMsgpackArray(unpacker);
} else {
Log.w(TAG, "Skipping 'state' field msgpack deserialization because LiveObjectsSerializer not found");
unpacker.skipValue();
}
break;
default:
Log.v(TAG, "Unexpected field: " + fieldName);
unpacker.skipValue();
Expand Down
15 changes: 8 additions & 7 deletions lib/src/main/java/io/ably/lib/types/ProtocolSerializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class ProtocolSerializer {
/****************************************
* Msgpack decode
****************************************/

public static ProtocolMessage readMsgpack(byte[] packed) throws AblyException {
try {
MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed);
Expand All @@ -27,30 +27,31 @@ public static ProtocolMessage readMsgpack(byte[] packed) throws AblyException {
/****************************************
* Msgpack encode
****************************************/
public static byte[] writeMsgpack(ProtocolMessage message) {

public static byte[] writeMsgpack(ProtocolMessage message) throws AblyException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
MessagePacker packer = Serialisation.msgpackPackerConfig.newPacker(out);
try {
message.writeMsgpack(packer);

packer.flush();
return out.toByteArray();
} catch(IOException e) { return null; }
} catch (IOException ioe) {
throw AblyException.fromThrowable(ioe);
}
}

/****************************************
* JSON decode
****************************************/

public static ProtocolMessage fromJSON(String packed) throws AblyException {
return Serialisation.gson.fromJson(packed, ProtocolMessage.class);
}

/****************************************
* JSON encode
****************************************/

public static byte[] writeJSON(ProtocolMessage message) throws AblyException {
return Serialisation.gson.toJson(message).getBytes(Charset.forName("UTF-8"));
}
Expand Down
12 changes: 12 additions & 0 deletions live-objects/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ repositories {

dependencies {
implementation(project(":java"))
implementation(libs.bundles.common)
implementation(libs.coroutine.core)
implementation(libs.jackson.msgpack)
implementation(libs.jackson.parameter.names) // Add this


testImplementation(kotlin("test"))
testImplementation(libs.bundles.kotlin.tests)
Expand Down Expand Up @@ -43,4 +47,12 @@ tasks.register<Test>("runLiveObjectIntegrationTests") {

kotlin {
explicitApi()

/**
* Enables Jackson to map JSON property names to constructor parameters without use of @JsonProperty.
* Adds metadata params to bytecode class. Approach is completely binary-compatible with consumers of the library.
*/
compilerOptions {
javaParameters = true
}
}
8 changes: 4 additions & 4 deletions live-objects/src/main/kotlin/io/ably/lib/objects/Helpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ internal enum class ProtocolMessageFormat(private val value: String) {
override fun toString(): String = value
}

internal class Binary(val data: ByteArray?) {
internal class Binary(val data: ByteArray) {
override fun equals(other: Any?): Boolean {
if (this === other) return true
if (other !is Binary) return false
return data?.contentEquals(other.data) == true
return data.contentEquals(other.data)
}

override fun hashCode(): Int {
return data?.contentHashCode() ?: 0
return data.contentHashCode()
}
}

internal fun Binary.size(): Int {
return data?.size ?: 0
return data.size
}
Loading
Loading