Skip to content
This repository was archived by the owner on Aug 7, 2018. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,28 @@ public void activateOptions() {

@Override
protected void append(LoggingEvent loggingEvent) {
String message = subAppend(loggingEvent);
ListenableFuture<byte[]> chiResp = cs.tsWrite(key.getBytes(), message.getBytes());
Futures.addCallback(chiResp, new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] bytes) {

}

@Override
public void onFailure(Throwable throwable) {
// TODO(JR): Maybe Try again?
try {
String message = subAppend(loggingEvent);
ListenableFuture<byte[]> chiResp = cs.tsWrite(key.getBytes(), message.getBytes());
if (chiResp != null) {
Futures.addCallback(chiResp, new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] bytes) {

}

@Override
public void onFailure(Throwable throwable) {
// TODO(JR): Maybe Try again?
}
});
} else {
//Todo : Maybe try again since the future was null.
}
});
} catch (Exception e){
e.printStackTrace();
throw new RuntimeException(e);
}
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
*/
public class AsyncChicagoAppenderTest {

//@Test
public void testAppenderNoServer(){
@Test
public void testAppeclientnderNoServer(){
AsyncChicagoAppender chicagoAppender = new AsyncChicagoAppender();
chicagoAppender.setChicagoZk("localhost:2181");
chicagoAppender.setChicagoZk("badIP:2181");
chicagoAppender.setKey("TestKey");
long start = System.currentTimeMillis();
try {
chicagoAppender.activateOptions();
}catch (Exception e){
e.printStackTrace();
long timeDifference = System.currentTimeMillis() - start;
assert(timeDifference < 3000);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import io.netty.channel.ChannelHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.http2.StreamBufferingEncoder;
import io.netty.util.internal.PlatformDependent;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -338,7 +340,6 @@ public ListenableFuture<byte[]> tsWrite(byte[] topic, byte[] key, byte[] val) {
SettableFuture<byte[]> f = SettableFuture.create();
futureMap.put(id, f);
futureList.add(f);

Futures.addCallback(f, new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] bytes) {
Expand All @@ -358,17 +359,21 @@ public void onSuccess(@Nullable Boolean aBoolean) {

@Override
public void onFailure(Throwable throwable) {
Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.TS_WRITE, topic, key, val)), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {

}

@Override
public void onFailure(Throwable throwable) {

}
});
if(throwable instanceof ClosedChannelException){
f.setException(throwable);
} else {
Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.TS_WRITE, topic, key, val)), new FutureCallback<Boolean>() {
@Override
public void onSuccess(@Nullable Boolean aBoolean) {

}

@Override
public void onFailure(Throwable throwable) {
f.setException(throwable);
}
});
}
}
});
});
Expand All @@ -381,11 +386,7 @@ public void onSuccess(@Nullable List<byte[]> bytes) {

@Override
public void onFailure(Throwable throwable) {
try {
respFuture.set(tsWrite(topic,key, val).get(TIMEOUT, TimeUnit.MILLISECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
respFuture.setException(e);
}
respFuture.setException(throwable);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import javax.annotation.Nullable;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
Expand Down Expand Up @@ -94,6 +95,7 @@ public void close() {
for(ScheduledFuture f : scheduledFutures){
f.cancel(true);
}
stopMessageQ();
}

public void shutdownGracefully() {
Expand Down Expand Up @@ -177,7 +179,7 @@ public void write(T sendReq, SettableFuture<Boolean> f) {
}
}

private Channel requestNode(){
public Channel requestNode(){

ChannelFuture cf = connectionQ.removeFirst();

Expand Down Expand Up @@ -246,7 +248,7 @@ public void operationComplete(Future<? super Void> future) throws Exception {
if (future.isSuccess()) {
mm.getF().set(true);
} else {
mm.getF().set(false);
//mm.getF().set(false);
mm.getF().setException(future.cause());
}
}
Expand All @@ -265,14 +267,21 @@ public void operationComplete(Future<? super Void> future) throws Exception {
counter.decrementAndGet();
f.set(true);
} else {
f.set(false);
//f.set(false);
f.setException(future.cause());
}
}
});
}
}

private void stopMessageQ(){
while(messageQ.peekFirst() != null){
final MuxedMessage<T> mm = messageQ.pollFirst();
mm.getF().setException(new ClosedChannelException());
}
}

@Data
private class MuxedMessage<T> {
private final T msg;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ListenableFuture;
import com.xjeffrose.chicago.ChiUtil;
import com.xjeffrose.chicago.client.ChicagoAsyncClient;
import com.xjeffrose.chicago.client.ChicagoClientException;
import com.xjeffrose.chicago.client.ChicagoClientTimeoutException;
import com.xjeffrose.chicago.client.ChicagoClient;
Expand All @@ -14,17 +15,15 @@
* Created by root on 6/22/16.
*/
public class ChicagoStreamExample {
ChicagoClient chicagoClient;
ChicagoAsyncClient chicagoClient;

//private static String key = "ppfe-msmaster-LM-SJN-00875858";
private static String key = "ppfe-test-cc";
private static String key = "ppfe-tests";

public static void main(String[] args) throws Exception{
ChicagoStreamExample cs = new ChicagoStreamExample();
cs.chicagoClient = new ChicagoClient("10.24.25.188:2181,10.24.25.189:2181,10.25.145.56:2181,10.24.33.123:2181",3);
//cs.chicagoClient = new ChicagoClient("ppfe-msmaster-stage2cs2678.qa.paypal.com",3);


cs.chicagoClient = new ChicagoAsyncClient("10.24.25.188:2181,10.24.25.189:2181,10.25.145.56:2181,10.24.33.123:2181",3);
cs.chicagoClient.start();

//cs.chicagoClient.startAndWaitForNodes(4);
//cs.writeSomeData();
Expand All @@ -36,7 +35,7 @@ public static void main(String[] args) throws Exception{

public void printStream() throws Exception{
Long offset = 3L;
byte[] resultArray = chicagoClient.read(key.getBytes(), Longs.toByteArray(offset)).get().get(0);
byte[] resultArray = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset)).get();
String result = new String(resultArray);
System.out.println(result);
}
Expand All @@ -47,82 +46,80 @@ public void writeSomeData() throws Exception{
for(int i =0;i<100000;i++){
try {
System.out.println(i);
System.out.println("Response = " + Longs.fromByteArray(chicagoClient.tsWrite(key.getBytes(), (i+val).getBytes()).get().get(0)));
System.out.println("Response = " + Longs.fromByteArray(chicagoClient.tsWrite(key.getBytes(), (i+val).getBytes()).get()));
//Thread.sleep(500);
} catch (ChicagoClientException e){
e.printStackTrace();
} catch (ChicagoClientTimeoutException e){
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("Total time taken :"+ (System.currentTimeMillis() - startTime) + "ms");
}


public void transactStream() throws Exception {
Long offset = 0L;

ListenableFuture<List<byte[]>> resp = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset));

byte[] resultArray = resp.get().get(0);
String result = new String(resultArray);
long old=-1;
while(true){
if(!result.contains(ChiUtil.delimiter)){
System.out.println("No delimetr present");
System.out.println(result);
break;

}

offset = ChiUtil.findOffset(resultArray);
String[] lines = (result.split(ChiUtil.delimiter)[0]).split("\0");
int count =0;
for(String line : lines){
if(line.length()!= 0) {
System.out.println(offset +":" +line);
count++;
}
}
if(count > 0){
offset = offset + 1;
}
if(old != -1 && (old == offset)){
Thread.sleep(500);
}

ListenableFuture<List<byte[]>> newresp = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset));
resultArray = newresp.get().get(0);
result = new String(resultArray);
old = offset;
}
}

public void transactStreamWithBuf() throws Exception {
ByteBuf buffer = chicagoClient.aggregatedStream(key.getBytes(),Longs.toByteArray(9999));
int readableBytes = buffer.readableBytes();
int i =0;
while(true) {
if (readableBytes > 0) {
byte[] data = new byte[buffer.readableBytes()];
try {
buffer.readBytes(data);
String stringData = new String(data);
String[] lines = (stringData).split("\0");
for (String line : lines) {
line.replace('\n',' ');
System.out.println(++i + line);
if(i>400000){
break;
}
}
}catch (Exception e){
e.printStackTrace();
}
} else {
Thread.sleep(100);
}
readableBytes = buffer.readableBytes();
}
}
// public void transactStream() throws Exception {
// Long offset = 0L;
//
// ListenableFuture<List<byte[]>> resp = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset));
//
// byte[] resultArray = resp.get().get(0);
// String result = new String(resultArray);
// long old=-1;
// while(true){
// if(!result.contains(ChiUtil.delimiter)){
// System.out.println("No delimetr present");
// System.out.println(result);
// break;
//
// }
//
// offset = ChiUtil.findOffset(resultArray);
// String[] lines = (result.split(ChiUtil.delimiter)[0]).split("\0");
// int count =0;
// for(String line : lines){
// if(line.length()!= 0) {
// System.out.println(offset +":" +line);
// count++;
// }
// }
// if(count > 0){
// offset = offset + 1;
// }
// if(old != -1 && (old == offset)){
// Thread.sleep(500);
// }
//
// ListenableFuture<List<byte[]>> newresp = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset));
// resultArray = newresp.get().get(0);
// result = new String(resultArray);
// old = offset;
// }
// }
//
// public void transactStreamWithBuf() throws Exception {
// ByteBuf buffer = chicagoClient.aggregatedStream(key.getBytes(),Longs.toByteArray(9999));
// int readableBytes = buffer.readableBytes();
// int i =0;
// while(true) {
// if (readableBytes > 0) {
// byte[] data = new byte[buffer.readableBytes()];
// try {
// buffer.readBytes(data);
// String stringData = new String(data);
// String[] lines = (stringData).split("\0");
// for (String line : lines) {
// line.replace('\n',' ');
// System.out.println(++i + line);
// if(i>400000){
// break;
// }
// }
// }catch (Exception e){
// e.printStackTrace();
// }
// } else {
// Thread.sleep(100);
// }
// readableBytes = buffer.readableBytes();
// }
// }
}
Loading