Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@

import org.testng.annotations.Test;

import java.io.IOException;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;

public class KryoTest {
public class FSTTest {
@Test
public void optional() {
assertThat( KryoConsts.readClassAndObject( KryoConsts.writeClassAndObject( Optional.empty() ) ) ).isEqualTo( Optional.empty() );
assertThat( KryoConsts.readClassAndObject( KryoConsts.writeClassAndObject( Optional.of( "1" ) ) ) ).isEqualTo( Optional.of( "1" ) );
public void optional() throws IOException {
assertThat( FstConsts.asObject( FstConsts.asByteArray( Optional.empty() ) ) ).isEqualTo( Optional.empty() );
assertThat( FstConsts.asObject( FstConsts.asByteArray( Optional.of( "1" ) ) ) ).isEqualTo( Optional.of( "1" ) );
}
}
5 changes: 0 additions & 5 deletions oap-application/oap-application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,6 @@
<artifactId>oap-application-cli</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>de.javakaffee</groupId>
<artifactId>kryo-serializers</artifactId>
<version>0.45</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package oap.application.remote;

import org.nustaq.serialization.FSTBasicObjectSerializer;
import org.nustaq.serialization.FSTClazzInfo;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.FSTObjectInput;
import org.nustaq.serialization.FSTObjectOutput;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;

@SuppressWarnings( "checkstyle:AbstractClassName" )
public abstract class FstConsts {
private static final ConcurrentLinkedDeque<FSTConfiguration> queue = new ConcurrentLinkedDeque<>();

private static FSTConfiguration newInstance() {
FSTConfiguration configuration = FSTConfiguration.createDefaultConfiguration();

configuration.registerClass( RemoteInvocation.class );
configuration.registerSerializer( Optional.class, new FSTOptionalSerializer(), false );

return configuration;
}

@SuppressWarnings( "unchecked" )
public static <T> T readObject( DataInputStream dis, int size ) throws IOException {
byte[] bytes = new byte[size];
dis.readFully( bytes );

return ( T ) asObject( bytes );
}

public static Object asObject( byte[] bytes ) throws IOException {
FSTConfiguration configuration = queue.poll();
if( configuration == null ) {
configuration = newInstance();
}
try {
return configuration.asObject( bytes );
} finally {
queue.push( configuration );
}
}

public static <T> T readObjectWithSize( DataInputStream is ) throws IOException {
int size = is.readInt();
return readObject( is, size );
}

public static void writeObjectWithSize( DataOutputStream dataOutputStream, Object obj ) throws IOException {
byte[] sv = asByteArray( obj );
dataOutputStream.writeInt( sv.length );
dataOutputStream.write( sv );
}

public static byte[] asByteArray( Object obj ) {
FSTConfiguration configuration = queue.poll();
if( configuration == null ) {
configuration = newInstance();
}
try {
return configuration.asByteArray( obj );
} finally {
queue.push( configuration );
}
}

private static class FSTOptionalSerializer extends FSTBasicObjectSerializer {
@Override
public void writeObject( FSTObjectOutput out, Object o, FSTClazzInfo fstClazzInfo, FSTClazzInfo.FSTFieldInfo fstFieldInfo, int i ) throws IOException {
Optional<?> opt = ( Optional<?> ) o;
if( opt.isPresent() ) out.writeObject( opt.get() );
else out.writeObject( null );
}

@Override
public void readObject( FSTObjectInput in, Object toRead, FSTClazzInfo clzInfo, FSTClazzInfo.FSTFieldInfo referencedBy ) throws Exception {
}

@Override
public Object instantiate( Class objectClass, FSTObjectInput in, FSTClazzInfo serializationInfo, FSTClazzInfo.FSTFieldInfo referencee, int streamPosition ) throws Exception {
return Optional.ofNullable( in.readObject() );
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
*/
package oap.application.remote;

import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tags;
Expand All @@ -35,6 +33,10 @@
import oap.util.function.Try;
import org.apache.commons.lang3.mutable.MutableInt;

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -143,22 +145,23 @@ public void handleRequest( HttpServerExchange exchange ) {
exchange.setResponseHeader( CONTENT_TYPE, APPLICATION_OCTET_STREAM );

try( OutputStream outputStream = exchange.getOutputStream();
Output out = new Output( outputStream ) ) {
out.writeBoolean( ex == null );
BufferedOutputStream bos = new BufferedOutputStream( outputStream );
DataOutputStream dos = new DataOutputStream( bos ) ) {
dos.writeBoolean( ex == null );

if( ex != null ) {
KryoConsts.writeClassAndObject( out, ex );
FstConsts.writeObjectWithSize( dos, ex );
} else if( v instanceof Stream<?> ) {
out.writeBoolean( true );
dos.writeBoolean( true );

( ( Stream<?> ) v ).forEach( Try.consume( obj -> {
out.writeBoolean( true );
KryoConsts.writeClassAndObject( out, obj );
dos.writeBoolean( true );
FstConsts.writeObjectWithSize( dos, obj );
} ) );
out.writeBoolean( false );
dos.writeBoolean( false );
} else {
out.writeBoolean( false );
KryoConsts.writeClassAndObject( out, v );
dos.writeBoolean( false );
FstConsts.writeObjectWithSize( dos, v );
}
} catch( Throwable e ) {
log.error( "invocation {}", finalInvocation, e );
Expand All @@ -175,11 +178,11 @@ public void handleRequest( HttpServerExchange exchange ) {
}
}

public RemoteInvocation getRemoteInvocation( InputStream body ) {
Input in = new Input( body );
int version = in.readInt();
public RemoteInvocation getRemoteInvocation( InputStream body ) throws IOException {
DataInputStream dis = new DataInputStream( body );
int version = dis.readInt();

RemoteInvocation invocation = ( RemoteInvocation ) KryoConsts.readClassAndObject( in );
RemoteInvocation invocation = FstConsts.<RemoteInvocation>readObjectWithSize( dis );
log.trace( "invoke v{} - {}", version, invocation );
return invocation;
}
Expand Down
Loading