diff --git a/oap-application/oap-application-test/src/test/java/oap/application/remote/KryoTest.java b/oap-application/oap-application-test/src/test/java/oap/application/remote/FSTTest.java similarity index 79% rename from oap-application/oap-application-test/src/test/java/oap/application/remote/KryoTest.java rename to oap-application/oap-application-test/src/test/java/oap/application/remote/FSTTest.java index 2b838ae84..b08368197 100644 --- a/oap-application/oap-application-test/src/test/java/oap/application/remote/KryoTest.java +++ b/oap-application/oap-application-test/src/test/java/oap/application/remote/FSTTest.java @@ -26,14 +26,15 @@ import org.testng.annotations.Test; +import java.io.IOException; import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; -public class KryoTest { +public class FSTTest { @Test - public void optional() { - assertThat( KryoConsts.readClassAndObject( KryoConsts.writeClassAndObject( Optional.empty() ) ) ).isEqualTo( Optional.empty() ); - assertThat( KryoConsts.readClassAndObject( KryoConsts.writeClassAndObject( Optional.of( "1" ) ) ) ).isEqualTo( Optional.of( "1" ) ); + public void optional() throws IOException { + assertThat( FstConsts.asObject( FstConsts.asByteArray( Optional.empty() ) ) ).isEqualTo( Optional.empty() ); + assertThat( FstConsts.asObject( FstConsts.asByteArray( Optional.of( "1" ) ) ) ).isEqualTo( Optional.of( "1" ) ); } } diff --git a/oap-application/oap-application/pom.xml b/oap-application/oap-application/pom.xml index ddaafdf0e..caaeee377 100644 --- a/oap-application/oap-application/pom.xml +++ b/oap-application/oap-application/pom.xml @@ -27,11 +27,6 @@ oap-application-cli ${project.version} - - de.javakaffee - kryo-serializers - 0.45 - org.projectlombok diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/FstConsts.java b/oap-application/oap-application/src/main/java/oap/application/remote/FstConsts.java new file mode 100644 index 000000000..cda5b84d9 --- /dev/null +++ b/oap-application/oap-application/src/main/java/oap/application/remote/FstConsts.java @@ -0,0 +1,88 @@ +package oap.application.remote; + +import org.nustaq.serialization.FSTBasicObjectSerializer; +import org.nustaq.serialization.FSTClazzInfo; +import org.nustaq.serialization.FSTConfiguration; +import org.nustaq.serialization.FSTObjectInput; +import org.nustaq.serialization.FSTObjectOutput; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedDeque; + +@SuppressWarnings( "checkstyle:AbstractClassName" ) +public abstract class FstConsts { + private static final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); + + private static FSTConfiguration newInstance() { + FSTConfiguration configuration = FSTConfiguration.createDefaultConfiguration(); + + configuration.registerClass( RemoteInvocation.class ); + configuration.registerSerializer( Optional.class, new FSTOptionalSerializer(), false ); + + return configuration; + } + + @SuppressWarnings( "unchecked" ) + public static T readObject( DataInputStream dis, int size ) throws IOException { + byte[] bytes = new byte[size]; + dis.readFully( bytes ); + + return ( T ) asObject( bytes ); + } + + public static Object asObject( byte[] bytes ) throws IOException { + FSTConfiguration configuration = queue.poll(); + if( configuration == null ) { + configuration = newInstance(); + } + try { + return configuration.asObject( bytes ); + } finally { + queue.push( configuration ); + } + } + + public static T readObjectWithSize( DataInputStream is ) throws IOException { + int size = is.readInt(); + return readObject( is, size ); + } + + public static void writeObjectWithSize( DataOutputStream dataOutputStream, Object obj ) throws IOException { + byte[] sv = asByteArray( obj ); + dataOutputStream.writeInt( sv.length ); + dataOutputStream.write( sv ); + } + + public static byte[] asByteArray( Object obj ) { + FSTConfiguration configuration = queue.poll(); + if( configuration == null ) { + configuration = newInstance(); + } + try { + return configuration.asByteArray( obj ); + } finally { + queue.push( configuration ); + } + } + + private static class FSTOptionalSerializer extends FSTBasicObjectSerializer { + @Override + public void writeObject( FSTObjectOutput out, Object o, FSTClazzInfo fstClazzInfo, FSTClazzInfo.FSTFieldInfo fstFieldInfo, int i ) throws IOException { + Optional opt = ( Optional ) o; + if( opt.isPresent() ) out.writeObject( opt.get() ); + else out.writeObject( null ); + } + + @Override + public void readObject( FSTObjectInput in, Object toRead, FSTClazzInfo clzInfo, FSTClazzInfo.FSTFieldInfo referencedBy ) throws Exception { + } + + @Override + public Object instantiate( Class objectClass, FSTObjectInput in, FSTClazzInfo serializationInfo, FSTClazzInfo.FSTFieldInfo referencee, int streamPosition ) throws Exception { + return Optional.ofNullable( in.readObject() ); + } + } +} diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/KryoConsts.java b/oap-application/oap-application/src/main/java/oap/application/remote/KryoConsts.java deleted file mode 100644 index eb87c04ab..000000000 --- a/oap-application/oap-application/src/main/java/oap/application/remote/KryoConsts.java +++ /dev/null @@ -1,139 +0,0 @@ -package oap.application.remote; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy; -import de.javakaffee.kryoserializers.ArraysAsListSerializer; -import de.javakaffee.kryoserializers.CollectionsEmptyListSerializer; -import de.javakaffee.kryoserializers.CollectionsEmptyMapSerializer; -import de.javakaffee.kryoserializers.CollectionsEmptySetSerializer; -import de.javakaffee.kryoserializers.CollectionsSingletonListSerializer; -import de.javakaffee.kryoserializers.CollectionsSingletonMapSerializer; -import de.javakaffee.kryoserializers.CollectionsSingletonSetSerializer; -import de.javakaffee.kryoserializers.GregorianCalendarSerializer; -import de.javakaffee.kryoserializers.JdkProxySerializer; -import de.javakaffee.kryoserializers.SynchronizedCollectionsSerializer; -import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer; -import de.javakaffee.kryoserializers.guava.ArrayListMultimapSerializer; -import de.javakaffee.kryoserializers.guava.ArrayTableSerializer; -import de.javakaffee.kryoserializers.guava.HashBasedTableSerializer; -import de.javakaffee.kryoserializers.guava.HashMultimapSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableListSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer; -import de.javakaffee.kryoserializers.guava.ImmutableTableSerializer; -import de.javakaffee.kryoserializers.guava.LinkedHashMultimapSerializer; -import de.javakaffee.kryoserializers.guava.LinkedListMultimapSerializer; -import de.javakaffee.kryoserializers.guava.ReverseListSerializer; -import de.javakaffee.kryoserializers.guava.TreeBasedTableSerializer; -import de.javakaffee.kryoserializers.guava.TreeMultimapSerializer; -import de.javakaffee.kryoserializers.guava.UnmodifiableNavigableSetSerializer; -import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer; -import de.javakaffee.kryoserializers.jodatime.JodaLocalDateSerializer; -import de.javakaffee.kryoserializers.jodatime.JodaLocalDateTimeSerializer; -import de.javakaffee.kryoserializers.jodatime.JodaLocalTimeSerializer; -import org.joda.time.DateTime; -import org.joda.time.LocalDate; -import org.joda.time.LocalDateTime; -import org.objenesis.strategy.SerializingInstantiatorStrategy; - -import java.io.ByteArrayOutputStream; -import java.lang.reflect.InvocationHandler; -import java.util.Arrays; -import java.util.Collections; -import java.util.GregorianCalendar; -import java.util.concurrent.ConcurrentLinkedDeque; - -@SuppressWarnings( "checkstyle:AbstractClassName" ) -public abstract class KryoConsts { - private static final ConcurrentLinkedDeque queue = new ConcurrentLinkedDeque<>(); - - private static Kryo newInstance() { - Kryo kryo = new Kryo(); - kryo.setRegistrationRequired( false ); - kryo.setReferences( true ); - kryo.setInstantiatorStrategy( new DefaultInstantiatorStrategy( new SerializingInstantiatorStrategy() ) ); - - - // joda - kryo.register( DateTime.class, new JodaDateTimeSerializer() ); - kryo.register( LocalDate.class, new JodaLocalDateSerializer() ); - kryo.register( LocalDateTime.class, new JodaLocalDateTimeSerializer() ); - kryo.register( LocalDateTime.class, new JodaLocalTimeSerializer() ); - - // java - kryo.register( Arrays.asList( "" ).getClass(), new ArraysAsListSerializer() ); - kryo.register( Collections.EMPTY_LIST.getClass(), new CollectionsEmptyListSerializer() ); - kryo.register( Collections.EMPTY_MAP.getClass(), new CollectionsEmptyMapSerializer() ); - kryo.register( Collections.EMPTY_SET.getClass(), new CollectionsEmptySetSerializer() ); - kryo.register( Collections.singletonList( "" ).getClass(), new CollectionsSingletonListSerializer() ); - kryo.register( Collections.singleton( "" ).getClass(), new CollectionsSingletonSetSerializer() ); - kryo.register( Collections.singletonMap( "", "" ).getClass(), new CollectionsSingletonMapSerializer() ); - kryo.register( GregorianCalendar.class, new GregorianCalendarSerializer() ); - kryo.register( InvocationHandler.class, new JdkProxySerializer() ); - UnmodifiableCollectionsSerializer.registerSerializers( kryo ); - SynchronizedCollectionsSerializer.registerSerializers( kryo ); - - // guava - ImmutableListSerializer.registerSerializers( kryo ); - ImmutableSetSerializer.registerSerializers( kryo ); - ImmutableMapSerializer.registerSerializers( kryo ); - ImmutableMultimapSerializer.registerSerializers( kryo ); - ImmutableTableSerializer.registerSerializers( kryo ); - ReverseListSerializer.registerSerializers( kryo ); - UnmodifiableNavigableSetSerializer.registerSerializers( kryo ); - - ArrayListMultimapSerializer.registerSerializers( kryo ); - HashMultimapSerializer.registerSerializers( kryo ); - LinkedHashMultimapSerializer.registerSerializers( kryo ); - LinkedListMultimapSerializer.registerSerializers( kryo ); - TreeMultimapSerializer.registerSerializers( kryo ); - ArrayTableSerializer.registerSerializers( kryo ); - HashBasedTableSerializer.registerSerializers( kryo ); - TreeBasedTableSerializer.registerSerializers( kryo ); - - return kryo; - } - - public static byte[] writeClassAndObject( Object obj ) { - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - try( Output output = new Output( byteArrayOutputStream ) ) { - writeClassAndObject( output, obj ); - } - - return byteArrayOutputStream.toByteArray(); - } - - public static void writeClassAndObject( Output output, Object obj ) { - Kryo kryo = queue.poll(); - if( kryo == null ) { - kryo = newInstance(); - } - try { - kryo.writeClassAndObject( output, obj ); - } finally { - queue.push( kryo ); - } - } - - public static Object readClassAndObject( byte[] bytes ) { - Input input = new Input( bytes ); - - return readClassAndObject( input ); - } - - public static Object readClassAndObject( Input input ) { - Kryo kryo = queue.poll(); - if( kryo == null ) { - kryo = newInstance(); - } - try { - return kryo.readClassAndObject( input ); - } finally { - queue.push( kryo ); - } - - } -} diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/Remote.java b/oap-application/oap-application/src/main/java/oap/application/remote/Remote.java index f30f731df..0ca59a515 100644 --- a/oap-application/oap-application/src/main/java/oap/application/remote/Remote.java +++ b/oap-application/oap-application/src/main/java/oap/application/remote/Remote.java @@ -23,8 +23,6 @@ */ package oap.application.remote; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Tags; @@ -35,6 +33,10 @@ import oap.util.function.Try; import org.apache.commons.lang3.mutable.MutableInt; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; @@ -143,22 +145,23 @@ public void handleRequest( HttpServerExchange exchange ) { exchange.setResponseHeader( CONTENT_TYPE, APPLICATION_OCTET_STREAM ); try( OutputStream outputStream = exchange.getOutputStream(); - Output out = new Output( outputStream ) ) { - out.writeBoolean( ex == null ); + BufferedOutputStream bos = new BufferedOutputStream( outputStream ); + DataOutputStream dos = new DataOutputStream( bos ) ) { + dos.writeBoolean( ex == null ); if( ex != null ) { - KryoConsts.writeClassAndObject( out, ex ); + FstConsts.writeObjectWithSize( dos, ex ); } else if( v instanceof Stream ) { - out.writeBoolean( true ); + dos.writeBoolean( true ); ( ( Stream ) v ).forEach( Try.consume( obj -> { - out.writeBoolean( true ); - KryoConsts.writeClassAndObject( out, obj ); + dos.writeBoolean( true ); + FstConsts.writeObjectWithSize( dos, obj ); } ) ); - out.writeBoolean( false ); + dos.writeBoolean( false ); } else { - out.writeBoolean( false ); - KryoConsts.writeClassAndObject( out, v ); + dos.writeBoolean( false ); + FstConsts.writeObjectWithSize( dos, v ); } } catch( Throwable e ) { log.error( "invocation {}", finalInvocation, e ); @@ -175,11 +178,11 @@ public void handleRequest( HttpServerExchange exchange ) { } } - public RemoteInvocation getRemoteInvocation( InputStream body ) { - Input in = new Input( body ); - int version = in.readInt(); + public RemoteInvocation getRemoteInvocation( InputStream body ) throws IOException { + DataInputStream dis = new DataInputStream( body ); + int version = dis.readInt(); - RemoteInvocation invocation = ( RemoteInvocation ) KryoConsts.readClassAndObject( in ); + RemoteInvocation invocation = FstConsts.readObjectWithSize( dis ); log.trace( "invoke v{} - {}", version, invocation ); return invocation; } diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/RemoteInvocationHandler.java b/oap-application/oap-application/src/main/java/oap/application/remote/RemoteInvocationHandler.java index a5879734f..4b357d3f9 100644 --- a/oap-application/oap-application/src/main/java/oap/application/remote/RemoteInvocationHandler.java +++ b/oap-application/oap-application/src/main/java/oap/application/remote/RemoteInvocationHandler.java @@ -23,8 +23,6 @@ */ package oap.application.remote; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SimpleTimeLimiter; import io.micrometer.core.instrument.Counter; @@ -48,6 +46,8 @@ import java.io.BufferedInputStream; import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.lang.invoke.MethodHandles; @@ -201,13 +201,13 @@ public void onResponse( @NotNull Call call, @NotNull Response response ) { if( response.code() == HTTP_OK ) { InputStream inputStream = response.body().byteStream(); BufferedInputStream bis = new BufferedInputStream( inputStream ); - Input in = new Input( bis ); - boolean success = in.readBoolean(); + DataInputStream dis = new DataInputStream( bis ); + boolean success = dis.readBoolean(); try { if( !success ) { try { - Throwable throwable = ( Throwable ) KryoConsts.readClassAndObject( in ); + Throwable throwable = FstConsts.readObjectWithSize( dis ); if( throwable instanceof RemoteInvocationException riex ) { errorMetrics.increment(); @@ -217,29 +217,29 @@ public void onResponse( @NotNull Call call, @NotNull Response response ) { errorMetrics.increment(); return async ? CompletableFuture.>failedStage( throwable ) : CompletableFuture.completedStage( Result.failure( throwable ) ); } finally { - in.close(); + dis.close(); } } else { - boolean stream = in.readBoolean(); + boolean stream = dis.readBoolean(); if( stream ) { - ChainIterator it = new ChainIterator( in ); + ChainIterator it = new ChainIterator( dis ); return CompletableFuture.completedStage( Result.success( Stream.of( it ).onClose( Try.run( () -> { - in.close(); + dis.close(); successMetrics.increment(); } ) ) ) ); } else { try { - Result r = Result.success( KryoConsts.readClassAndObject( in ) ); + Result r = Result.success( FstConsts.readObjectWithSize( dis ) ); successMetrics.increment(); return CompletableFuture.completedStage( r ); } finally { - in.close(); + dis.close(); } } } } catch( Exception e ) { - in.close(); + dis.close(); return retException( e, async ); } } else { @@ -292,9 +292,9 @@ private byte[] getInvocation( Method method, List arg Reference reference = ServiceKernelCommand.INSTANCE.reference( service, null ); ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try( Output out = new Output( baos ) ) { + try( DataOutputStream out = new DataOutputStream( baos ) ) { out.writeInt( RemoteInvocation.VERSION ); - KryoConsts.writeClassAndObject( out, new RemoteInvocation( reference.toString(), method.getName(), arguments ) ); + FstConsts.writeObjectWithSize( out, new RemoteInvocation( reference.toString(), method.getName(), arguments ) ); } return baos.toByteArray(); @@ -306,12 +306,12 @@ public String toString() { } private static class ChainIterator implements Iterator { - private final Input in; + private final DataInputStream dis; private Object obj; private boolean end; - ChainIterator( Input dis ) { - this.in = dis; + ChainIterator( DataInputStream dis ) { + this.dis = dis; obj = null; end = false; } @@ -323,13 +323,13 @@ public boolean hasNext() { if( obj != null ) return true; - boolean b = in.readBoolean(); + boolean b = dis.readBoolean(); if( b ) { - obj = KryoConsts.readClassAndObject( in ); + obj = FstConsts.readObjectWithSize( dis ); } else { end = true; obj = null; - in.close(); + dis.close(); } return obj != null; diff --git a/oap-application/oap-application/src/main/java/oap/application/remote/RemoteSerialization.java b/oap-application/oap-application/src/main/java/oap/application/remote/RemoteSerialization.java index c7b3890da..988522b7a 100644 --- a/oap-application/oap-application/src/main/java/oap/application/remote/RemoteSerialization.java +++ b/oap-application/oap-application/src/main/java/oap/application/remote/RemoteSerialization.java @@ -62,16 +62,15 @@ public Object invoke( Object proxy, Method method, Object[] args ) throws Throwa parameters[i].getType(), args[i] ) ); } - byte[] content = KryoConsts.writeClassAndObject( new RemoteInvocation( "service", method.getName(), arguments ) ); - RemoteInvocation ri = ( RemoteInvocation ) KryoConsts.readClassAndObject( content ); + byte[] content = FstConsts.asByteArray( new RemoteInvocation( "service", method.getName(), arguments ) ); + RemoteInvocation ri = ( RemoteInvocation ) FstConsts.asObject( content ); Object result = master.getClass() .getMethod( ri.method, ri.types() ) .invoke( master, ri.values() ); - content = KryoConsts.writeClassAndObject( result ); - - return KryoConsts.readClassAndObject( content ); + content = FstConsts.asByteArray( result ); + return FstConsts.asObject( content ); } } diff --git a/oap-stdlib/pom.xml b/oap-stdlib/pom.xml index fd0abb7f2..0c7622153 100644 --- a/oap-stdlib/pom.xml +++ b/oap-stdlib/pom.xml @@ -269,11 +269,24 @@ ${oap.deps.plugins.antlr4.version} - - com.esotericsoftware - kryo - ${oap.deps.kryo.version} + de.ruedigermoeller + fst + ${oap.deps.fst.version} + + + com.fasterxml.jackson.core + jackson-core + + + org.javassist + javassist + + + org.apache.felix + maven-bundle-plugin + + javax.activation diff --git a/pom.xml b/pom.xml index 1914647d4..6f164b2e5 100644 --- a/pom.xml +++ b/pom.xml @@ -71,7 +71,7 @@ - 25.0.12 + 25.0.13 22.0.1 25.0.0 @@ -82,8 +82,6 @@ 5.1.0 - 5.6.2 - 4.4.16 4.5.14 4.1.5 @@ -104,6 +102,7 @@ 2.2.1 0.25 1.5.5-11 + 3.0.4-jdk17 3.0.3 1.1.1 1.2.21