From f819c4a97116de2c9a3b6212cf5430db51a54caf Mon Sep 17 00:00:00 2001 From: "Madan, Smarth" Date: Fri, 19 Aug 2016 16:48:08 -0700 Subject: [PATCH 1/3] fixing tests --- .../examples/ChicagoStreamExample.java | 151 +++++++++--------- .../chicago/tools/WritePerformance.java | 5 +- .../chicago/tools/WritePerformanceAsync.java | 16 +- 3 files changed, 86 insertions(+), 86 deletions(-) diff --git a/src/main/java/com/xjeffrose/chicago/examples/ChicagoStreamExample.java b/src/main/java/com/xjeffrose/chicago/examples/ChicagoStreamExample.java index 8e693ab..e7d48b0 100644 --- a/src/main/java/com/xjeffrose/chicago/examples/ChicagoStreamExample.java +++ b/src/main/java/com/xjeffrose/chicago/examples/ChicagoStreamExample.java @@ -3,6 +3,7 @@ import com.google.common.primitives.Longs; import com.google.common.util.concurrent.ListenableFuture; import com.xjeffrose.chicago.ChiUtil; +import com.xjeffrose.chicago.client.ChicagoAsyncClient; import com.xjeffrose.chicago.client.ChicagoClientException; import com.xjeffrose.chicago.client.ChicagoClientTimeoutException; import com.xjeffrose.chicago.client.ChicagoClient; @@ -14,17 +15,15 @@ * Created by root on 6/22/16. */ public class ChicagoStreamExample { - ChicagoClient chicagoClient; + ChicagoAsyncClient chicagoClient; //private static String key = "ppfe-msmaster-LM-SJN-00875858"; - private static String key = "ppfe-test-cc"; + private static String key = "ppfe-tests"; public static void main(String[] args) throws Exception{ ChicagoStreamExample cs = new ChicagoStreamExample(); - cs.chicagoClient = new ChicagoClient("10.24.25.188:2181,10.24.25.189:2181,10.25.145.56:2181,10.24.33.123:2181",3); - //cs.chicagoClient = new ChicagoClient("ppfe-msmaster-stage2cs2678.qa.paypal.com",3); - - + cs.chicagoClient = new ChicagoAsyncClient("10.24.25.188:2181,10.24.25.189:2181,10.25.145.56:2181,10.24.33.123:2181",3); + cs.chicagoClient.start(); //cs.chicagoClient.startAndWaitForNodes(4); //cs.writeSomeData(); @@ -36,7 +35,7 @@ public static void main(String[] args) throws Exception{ public void printStream() throws Exception{ Long offset = 3L; - byte[] resultArray = chicagoClient.read(key.getBytes(), Longs.toByteArray(offset)).get().get(0); + byte[] resultArray = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset)).get(); String result = new String(resultArray); System.out.println(result); } @@ -47,11 +46,9 @@ public void writeSomeData() throws Exception{ for(int i =0;i<100000;i++){ try { System.out.println(i); - System.out.println("Response = " + Longs.fromByteArray(chicagoClient.tsWrite(key.getBytes(), (i+val).getBytes()).get().get(0))); + System.out.println("Response = " + Longs.fromByteArray(chicagoClient.tsWrite(key.getBytes(), (i+val).getBytes()).get())); //Thread.sleep(500); - } catch (ChicagoClientException e){ - e.printStackTrace(); - } catch (ChicagoClientTimeoutException e){ + } catch (Exception e) { e.printStackTrace(); } } @@ -59,70 +56,70 @@ public void writeSomeData() throws Exception{ } - public void transactStream() throws Exception { - Long offset = 0L; - - ListenableFuture> resp = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset)); - - byte[] resultArray = resp.get().get(0); - String result = new String(resultArray); - long old=-1; - while(true){ - if(!result.contains(ChiUtil.delimiter)){ - System.out.println("No delimetr present"); - System.out.println(result); - break; - - } - - offset = ChiUtil.findOffset(resultArray); - String[] lines = (result.split(ChiUtil.delimiter)[0]).split("\0"); - int count =0; - for(String line : lines){ - if(line.length()!= 0) { - System.out.println(offset +":" +line); - count++; - } - } - if(count > 0){ - offset = offset + 1; - } - if(old != -1 && (old == offset)){ - Thread.sleep(500); - } - - ListenableFuture> newresp = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset)); - resultArray = newresp.get().get(0); - result = new String(resultArray); - old = offset; - } - } - - public void transactStreamWithBuf() throws Exception { - ByteBuf buffer = chicagoClient.aggregatedStream(key.getBytes(),Longs.toByteArray(9999)); - int readableBytes = buffer.readableBytes(); - int i =0; - while(true) { - if (readableBytes > 0) { - byte[] data = new byte[buffer.readableBytes()]; - try { - buffer.readBytes(data); - String stringData = new String(data); - String[] lines = (stringData).split("\0"); - for (String line : lines) { - line.replace('\n',' '); - System.out.println(++i + line); - if(i>400000){ - break; - } - } - }catch (Exception e){ - e.printStackTrace(); - } - } else { - Thread.sleep(100); - } - readableBytes = buffer.readableBytes(); - } - } +// public void transactStream() throws Exception { +// Long offset = 0L; +// +// ListenableFuture> resp = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset)); +// +// byte[] resultArray = resp.get().get(0); +// String result = new String(resultArray); +// long old=-1; +// while(true){ +// if(!result.contains(ChiUtil.delimiter)){ +// System.out.println("No delimetr present"); +// System.out.println(result); +// break; +// +// } +// +// offset = ChiUtil.findOffset(resultArray); +// String[] lines = (result.split(ChiUtil.delimiter)[0]).split("\0"); +// int count =0; +// for(String line : lines){ +// if(line.length()!= 0) { +// System.out.println(offset +":" +line); +// count++; +// } +// } +// if(count > 0){ +// offset = offset + 1; +// } +// if(old != -1 && (old == offset)){ +// Thread.sleep(500); +// } +// +// ListenableFuture> newresp = chicagoClient.stream(key.getBytes(), Longs.toByteArray(offset)); +// resultArray = newresp.get().get(0); +// result = new String(resultArray); +// old = offset; +// } +// } +// +// public void transactStreamWithBuf() throws Exception { +// ByteBuf buffer = chicagoClient.aggregatedStream(key.getBytes(),Longs.toByteArray(9999)); +// int readableBytes = buffer.readableBytes(); +// int i =0; +// while(true) { +// if (readableBytes > 0) { +// byte[] data = new byte[buffer.readableBytes()]; +// try { +// buffer.readBytes(data); +// String stringData = new String(data); +// String[] lines = (stringData).split("\0"); +// for (String line : lines) { +// line.replace('\n',' '); +// System.out.println(++i + line); +// if(i>400000){ +// break; +// } +// } +// }catch (Exception e){ +// e.printStackTrace(); +// } +// } else { +// Thread.sleep(100); +// } +// readableBytes = buffer.readableBytes(); +// } +// } } diff --git a/src/main/java/com/xjeffrose/chicago/tools/WritePerformance.java b/src/main/java/com/xjeffrose/chicago/tools/WritePerformance.java index 5eae987..b9ac1db 100644 --- a/src/main/java/com/xjeffrose/chicago/tools/WritePerformance.java +++ b/src/main/java/com/xjeffrose/chicago/tools/WritePerformance.java @@ -14,7 +14,7 @@ import javax.annotation.Nullable; public class WritePerformance { - private final static String key = "ppfe-test-cc"; + private final static String key = "ppfe-test-sm"; private static final long NS_PER_MS = 1000000L; private static final long NS_PER_SEC = 1000 * NS_PER_MS; private static final long MIN_SLEEP_NS = 2 * NS_PER_MS; @@ -63,7 +63,7 @@ public static void main(String[] args) throws Exception { val[j] = (byte) (random.nextInt(26) + 65); //String v = "val" +i + "TTE-cc"; Callback cb = stats.nextCompletion(sendStart, val.length, stats); - ListenableFuture> future = ctsa[i % clients].tsWrite(key.getBytes(), val); + ListenableFuture> future = ctsa[i % clients].tsbatchWrite(key.getBytes(), val); Futures.addCallback(future, cb); if (throughput > 0) { sleepDeficitNs += sleepTime; @@ -220,7 +220,6 @@ public Callback(int iter, long start, int bytes, Stats stats, CountDownLatch lat @Override public void onSuccess(@Nullable List bytes) { - System.out.println("Got response :" + success.incrementAndGet()); long now = System.currentTimeMillis(); int latency = (int) (now - start); this.stats.record(iteration, latency, nbytes, now); diff --git a/src/main/java/com/xjeffrose/chicago/tools/WritePerformanceAsync.java b/src/main/java/com/xjeffrose/chicago/tools/WritePerformanceAsync.java index 91e0d79..534ee80 100644 --- a/src/main/java/com/xjeffrose/chicago/tools/WritePerformanceAsync.java +++ b/src/main/java/com/xjeffrose/chicago/tools/WritePerformanceAsync.java @@ -15,7 +15,7 @@ import javax.annotation.Nullable; public class WritePerformanceAsync { - private final static String key = "ppfe-test"; + private final static String key = "ppfe-tests"; private static final long NS_PER_MS = 1000000L; private static final long NS_PER_SEC = 1000 * NS_PER_MS; private static final long MIN_SLEEP_NS = 2 * NS_PER_MS; @@ -38,7 +38,7 @@ public static void main(String[] args) throws Exception { // int throughput = Integer.parseInt(args[3]); // final String connectionString = args[4]; - final int loop = 1000000; + final int loop = 10; final int size = 100; final int clients = 1; int throughput = -1; @@ -63,17 +63,21 @@ public static void main(String[] args) throws Exception { Stats stats = new Stats(loop, 5000, latch); System.out.println("######## Statring writes #########"); long startTime = System.currentTimeMillis(); - for (int i = 0; i < loop; i++) { + for (int i = 20; i < 30; i++) { long sendStart = System.currentTimeMillis(); byte[] val = new byte[size]; Random random = new Random(0); +// byte[] val = ("I stage2cs2678.qa.paypal.com 0818 17:41:31.3521471542091352 THREAD142 " + +// "1 Corrids: 1 -e34a0ccfcda20 Uris: 1 /en_US/i/logo/paypal_logo.gif ResponseCode: 1 200 | " + +// "Outbound: [id: 0x8e34f7bd, L:/10.57.62.106:42630 - R:stage2cs2534.qa.paypal.com/10.57.62.15:443] " + +// "NumOfReads: 1").getBytes(); for (int j = 0; j < val.length; ++j) val[j] = (byte) (random.nextInt(26) + 65); //String v = "val" +i + "TTE-cc"; -// Callback cb = stats.nextCompletion(sendStart, val.length, stats); -// ListenableFuture future = ctsa[i % clients].tsWrite(key.getBytes(), val); Callback cb = stats.nextCompletion(sendStart, val.length, stats); - ListenableFuture future = ctsa[i % clients].write(key.concat(String.valueOf(i)).getBytes(), val); + ListenableFuture future = ctsa[i % clients].tsWrite(key.getBytes(), val); +// Callback cb = stats.nextCompletion(sendStart, val.length, stats); +// ListenableFuture future = ctsa[i % clients].write(key.getBytes(),key.concat(String.valueOf(i)).getBytes(), val); Futures.addCallback(future, cb); if (throughput > 0) { sleepDeficitNs += sleepTime; From b80d8e63b6be3306a6db2435b359829fe57356ba Mon Sep 17 00:00:00 2001 From: "Madan, Smarth" Date: Wed, 24 Aug 2016 15:25:24 -0700 Subject: [PATCH 2/3] Fixing smadan domain and tests --- .../appender/AsyncChicagoAppender.java | 33 ++++++++++++------- .../.#AsyncChicagoAppenderTest.java | 1 - .../AsyncChicagoAppenderTest.java | 10 +++--- .../client/AsyncClientFunctionalTest.java | 0 .../client/ChicagoAsyncClientTest.java | 0 .../client/ChicagoClientHandlerTest.java | 0 .../chicago}/client/ChicagoClientTest.java | 0 .../client/ConnectionPoolManagerXTest.java | 0 .../chicago}/client/RendezvousHashTest.java | 0 .../chicago}/client/RequestMuxerTest.java | 0 .../chicago}/client/fixtures/UnitHelp.java | 0 cluster-test/pom.xml | 2 +- .../chicago/ChicagoClientTest.java | 3 +- .../chicago/ReplicationTest.java | 2 +- .../chicago/SanityTest.java | 3 +- .../chicago/TestChicagoCluster.java | 2 +- .../java/com.xjeffrose.chicago/ZkClient.java | 5 +++ 17 files changed, 37 insertions(+), 24 deletions(-) delete mode 120000 appender/src/test/java/com.xjeffrose.chicago.appender/.#AsyncChicagoAppenderTest.java rename client/src/test/java/{ => com/xjeffrose/chicago}/client/AsyncClientFunctionalTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/ChicagoAsyncClientTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/ChicagoClientHandlerTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/ChicagoClientTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/ConnectionPoolManagerXTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/RendezvousHashTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/RequestMuxerTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/fixtures/UnitHelp.java (100%) rename cluster-test/src/main/java/com/{smadan => xjeffrose}/chicago/ChicagoClientTest.java (99%) rename cluster-test/src/main/java/com/{smadan => xjeffrose}/chicago/ReplicationTest.java (99%) rename cluster-test/src/main/java/com/{smadan => xjeffrose}/chicago/SanityTest.java (99%) rename cluster-test/src/main/java/com/{smadan => xjeffrose}/chicago/TestChicagoCluster.java (98%) diff --git a/appender/src/main/java/com/xjeffrose/chicago/appender/AsyncChicagoAppender.java b/appender/src/main/java/com/xjeffrose/chicago/appender/AsyncChicagoAppender.java index 007f923..ae8193b 100644 --- a/appender/src/main/java/com/xjeffrose/chicago/appender/AsyncChicagoAppender.java +++ b/appender/src/main/java/com/xjeffrose/chicago/appender/AsyncChicagoAppender.java @@ -59,19 +59,28 @@ public void activateOptions() { @Override protected void append(LoggingEvent loggingEvent) { - String message = subAppend(loggingEvent); - ListenableFuture chiResp = cs.tsWrite(key.getBytes(), message.getBytes()); - Futures.addCallback(chiResp, new FutureCallback() { - @Override - public void onSuccess(@Nullable byte[] bytes) { - - } - - @Override - public void onFailure(Throwable throwable) { - // TODO(JR): Maybe Try again? + try { + String message = subAppend(loggingEvent); + ListenableFuture chiResp = cs.tsWrite(key.getBytes(), message.getBytes()); + if (chiResp != null) { + Futures.addCallback(chiResp, new FutureCallback() { + @Override + public void onSuccess(@Nullable byte[] bytes) { + + } + + @Override + public void onFailure(Throwable throwable) { + // TODO(JR): Maybe Try again? + } + }); + } else { + //Todo : Maybe try again since the future was null. } - }); + } catch (Exception e){ + e.printStackTrace(); + throw new RuntimeException(e); + } } @Override diff --git a/appender/src/test/java/com.xjeffrose.chicago.appender/.#AsyncChicagoAppenderTest.java b/appender/src/test/java/com.xjeffrose.chicago.appender/.#AsyncChicagoAppenderTest.java deleted file mode 120000 index a3ac087..0000000 --- a/appender/src/test/java/com.xjeffrose.chicago.appender/.#AsyncChicagoAppenderTest.java +++ /dev/null @@ -1 +0,0 @@ -jefrose@lm-chi-21000081.paypalcorp.com.80195 \ No newline at end of file diff --git a/appender/src/test/java/com.xjeffrose.chicago.appender/AsyncChicagoAppenderTest.java b/appender/src/test/java/com.xjeffrose.chicago.appender/AsyncChicagoAppenderTest.java index 9781675..7e2b43e 100644 --- a/appender/src/test/java/com.xjeffrose.chicago.appender/AsyncChicagoAppenderTest.java +++ b/appender/src/test/java/com.xjeffrose.chicago.appender/AsyncChicagoAppenderTest.java @@ -12,15 +12,17 @@ */ public class AsyncChicagoAppenderTest { - //@Test - public void testAppenderNoServer(){ + @Test + public void testAppeclientnderNoServer(){ AsyncChicagoAppender chicagoAppender = new AsyncChicagoAppender(); - chicagoAppender.setChicagoZk("localhost:2181"); + chicagoAppender.setChicagoZk("badIP:2181"); chicagoAppender.setKey("TestKey"); + long start = System.currentTimeMillis(); try { chicagoAppender.activateOptions(); }catch (Exception e){ - e.printStackTrace(); + long timeDifference = System.currentTimeMillis() - start; + assert(timeDifference < 3000); } } diff --git a/client/src/test/java/client/AsyncClientFunctionalTest.java b/client/src/test/java/com/xjeffrose/chicago/client/AsyncClientFunctionalTest.java similarity index 100% rename from client/src/test/java/client/AsyncClientFunctionalTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/AsyncClientFunctionalTest.java diff --git a/client/src/test/java/client/ChicagoAsyncClientTest.java b/client/src/test/java/com/xjeffrose/chicago/client/ChicagoAsyncClientTest.java similarity index 100% rename from client/src/test/java/client/ChicagoAsyncClientTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/ChicagoAsyncClientTest.java diff --git a/client/src/test/java/client/ChicagoClientHandlerTest.java b/client/src/test/java/com/xjeffrose/chicago/client/ChicagoClientHandlerTest.java similarity index 100% rename from client/src/test/java/client/ChicagoClientHandlerTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/ChicagoClientHandlerTest.java diff --git a/client/src/test/java/client/ChicagoClientTest.java b/client/src/test/java/com/xjeffrose/chicago/client/ChicagoClientTest.java similarity index 100% rename from client/src/test/java/client/ChicagoClientTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/ChicagoClientTest.java diff --git a/client/src/test/java/client/ConnectionPoolManagerXTest.java b/client/src/test/java/com/xjeffrose/chicago/client/ConnectionPoolManagerXTest.java similarity index 100% rename from client/src/test/java/client/ConnectionPoolManagerXTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/ConnectionPoolManagerXTest.java diff --git a/client/src/test/java/client/RendezvousHashTest.java b/client/src/test/java/com/xjeffrose/chicago/client/RendezvousHashTest.java similarity index 100% rename from client/src/test/java/client/RendezvousHashTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/RendezvousHashTest.java diff --git a/client/src/test/java/client/RequestMuxerTest.java b/client/src/test/java/com/xjeffrose/chicago/client/RequestMuxerTest.java similarity index 100% rename from client/src/test/java/client/RequestMuxerTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/RequestMuxerTest.java diff --git a/client/src/test/java/client/fixtures/UnitHelp.java b/client/src/test/java/com/xjeffrose/chicago/client/fixtures/UnitHelp.java similarity index 100% rename from client/src/test/java/client/fixtures/UnitHelp.java rename to client/src/test/java/com/xjeffrose/chicago/client/fixtures/UnitHelp.java diff --git a/cluster-test/pom.xml b/cluster-test/pom.xml index 525bd7c..aa0d8b1 100644 --- a/cluster-test/pom.xml +++ b/cluster-test/pom.xml @@ -4,7 +4,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.smadan + com.xjeffrose cluster-test 0.4.0-SNAPSHOT diff --git a/cluster-test/src/main/java/com/smadan/chicago/ChicagoClientTest.java b/cluster-test/src/main/java/com/xjeffrose/chicago/ChicagoClientTest.java similarity index 99% rename from cluster-test/src/main/java/com/smadan/chicago/ChicagoClientTest.java rename to cluster-test/src/main/java/com/xjeffrose/chicago/ChicagoClientTest.java index 97e35ee..c4e53c5 100644 --- a/cluster-test/src/main/java/com/smadan/chicago/ChicagoClientTest.java +++ b/cluster-test/src/main/java/com/xjeffrose/chicago/ChicagoClientTest.java @@ -1,9 +1,8 @@ -package com.smadan.chicago; +package com.xjeffrose.chicago; import com.google.common.primitives.Longs; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import com.xjeffrose.chicago.ChiUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/cluster-test/src/main/java/com/smadan/chicago/ReplicationTest.java b/cluster-test/src/main/java/com/xjeffrose/chicago/ReplicationTest.java similarity index 99% rename from cluster-test/src/main/java/com/smadan/chicago/ReplicationTest.java rename to cluster-test/src/main/java/com/xjeffrose/chicago/ReplicationTest.java index 412d61e..512ff56 100644 --- a/cluster-test/src/main/java/com/smadan/chicago/ReplicationTest.java +++ b/cluster-test/src/main/java/com/xjeffrose/chicago/ReplicationTest.java @@ -1,4 +1,4 @@ -package com.smadan.chicago; +package com.xjeffrose.chicago; import com.google.common.primitives.Longs; import com.jcraft.jsch.Channel; diff --git a/cluster-test/src/main/java/com/smadan/chicago/SanityTest.java b/cluster-test/src/main/java/com/xjeffrose/chicago/SanityTest.java similarity index 99% rename from cluster-test/src/main/java/com/smadan/chicago/SanityTest.java rename to cluster-test/src/main/java/com/xjeffrose/chicago/SanityTest.java index 683fc73..23013f7 100644 --- a/cluster-test/src/main/java/com/smadan/chicago/SanityTest.java +++ b/cluster-test/src/main/java/com/xjeffrose/chicago/SanityTest.java @@ -1,9 +1,8 @@ -package com.smadan.chicago; +package com.xjeffrose.chicago; import com.google.common.primitives.Longs; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import com.xjeffrose.chicago.ChiUtil; import com.xjeffrose.chicago.client.ChicagoAsyncClient; import org.junit.Assert; import org.junit.Before; diff --git a/cluster-test/src/main/java/com/smadan/chicago/TestChicagoCluster.java b/cluster-test/src/main/java/com/xjeffrose/chicago/TestChicagoCluster.java similarity index 98% rename from cluster-test/src/main/java/com/smadan/chicago/TestChicagoCluster.java rename to cluster-test/src/main/java/com/xjeffrose/chicago/TestChicagoCluster.java index 840151c..2f4cd49 100644 --- a/cluster-test/src/main/java/com/smadan/chicago/TestChicagoCluster.java +++ b/cluster-test/src/main/java/com/xjeffrose/chicago/TestChicagoCluster.java @@ -1,4 +1,4 @@ -package com.smadan.chicago; +package com.xjeffrose.chicago; import com.xjeffrose.chicago.ZkClient; import com.xjeffrose.chicago.client.ChicagoAsyncClient; diff --git a/core/src/main/java/com.xjeffrose.chicago/ZkClient.java b/core/src/main/java/com.xjeffrose.chicago/ZkClient.java index ae462e8..6b4bdb5 100755 --- a/core/src/main/java/com.xjeffrose.chicago/ZkClient.java +++ b/core/src/main/java/com.xjeffrose.chicago/ZkClient.java @@ -4,6 +4,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -52,6 +53,10 @@ public boolean isLeader(){ public void start() throws InterruptedException { client.start(); + boolean connected = client.blockUntilConnected(2000, TimeUnit.MILLISECONDS); + if(!connected){ + throw new RuntimeException("Cannot connect to zookeeper"); + } } public void stop() throws Exception { From df602d31aecc7a30dbb7cdd2cbe1b02df312087b Mon Sep 17 00:00:00 2001 From: "Madan, Smarth" Date: Mon, 29 Aug 2016 10:06:18 -0700 Subject: [PATCH 3/3] Minor foxes for negative scenarios. --- .../chicago/client/ChicagoAsyncClient.java | 35 ++++++++-------- .../chicago/client/RequestMuxer.java | 15 +++++-- .../client/AsyncClientFunctionalTest.java | 0 .../client/ChicagoAsyncClientTest.java | 0 .../client/ChicagoClientHandlerTest.java | 0 .../chicago}/client/ChicagoClientTest.java | 0 .../client/ConnectionPoolManagerXTest.java | 0 .../chicago}/client/RendezvousHashTest.java | 0 .../chicago}/client/RequestMuxerTest.java | 41 +++++++++++++++++++ .../chicago}/client/fixtures/UnitHelp.java | 0 cluster-test/pom.xml | 2 +- .../chicago/ChicagoClientTest.java | 3 +- .../chicago/ReplicationTest.java | 2 +- .../chicago/SanityTest.java | 3 +- .../chicago/TestChicagoCluster.java | 2 +- .../java/com.xjeffrose.chicago/ZkClient.java | 5 +++ 16 files changed, 81 insertions(+), 27 deletions(-) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/AsyncClientFunctionalTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/ChicagoAsyncClientTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/ChicagoClientHandlerTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/ChicagoClientTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/ConnectionPoolManagerXTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/RendezvousHashTest.java (100%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/RequestMuxerTest.java (66%) rename client/src/test/java/{ => com/xjeffrose/chicago}/client/fixtures/UnitHelp.java (100%) rename cluster-test/src/main/java/com/{smadan => xjeffrose}/chicago/ChicagoClientTest.java (99%) rename cluster-test/src/main/java/com/{smadan => xjeffrose}/chicago/ReplicationTest.java (99%) rename cluster-test/src/main/java/com/{smadan => xjeffrose}/chicago/SanityTest.java (99%) rename cluster-test/src/main/java/com/{smadan => xjeffrose}/chicago/TestChicagoCluster.java (98%) diff --git a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java index beea03e..79f661c 100644 --- a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java +++ b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java @@ -7,10 +7,12 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.handler.codec.http2.StreamBufferingEncoder; import io.netty.util.internal.PlatformDependent; import lombok.extern.slf4j.Slf4j; import javax.annotation.Nullable; +import java.nio.channels.ClosedChannelException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -338,7 +340,6 @@ public ListenableFuture tsWrite(byte[] topic, byte[] key, byte[] val) { SettableFuture f = SettableFuture.create(); futureMap.put(id, f); futureList.add(f); - Futures.addCallback(f, new FutureCallback() { @Override public void onSuccess(@Nullable byte[] bytes) { @@ -358,17 +359,21 @@ public void onSuccess(@Nullable Boolean aBoolean) { @Override public void onFailure(Throwable throwable) { - Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.TS_WRITE, topic, key, val)), new FutureCallback() { - @Override - public void onSuccess(@Nullable Boolean aBoolean) { - - } - - @Override - public void onFailure(Throwable throwable) { - - } - }); + if(throwable instanceof ClosedChannelException){ + f.setException(throwable); + } else { + Futures.addCallback(connectionManager.write(xs, new DefaultChicagoMessage(id, Op.TS_WRITE, topic, key, val)), new FutureCallback() { + @Override + public void onSuccess(@Nullable Boolean aBoolean) { + + } + + @Override + public void onFailure(Throwable throwable) { + f.setException(throwable); + } + }); + } } }); }); @@ -381,11 +386,7 @@ public void onSuccess(@Nullable List bytes) { @Override public void onFailure(Throwable throwable) { - try { - respFuture.set(tsWrite(topic,key, val).get(TIMEOUT, TimeUnit.MILLISECONDS)); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - respFuture.setException(e); - } + respFuture.setException(throwable); } }); diff --git a/client/src/main/java/com/xjeffrose/chicago/client/RequestMuxer.java b/client/src/main/java/com/xjeffrose/chicago/client/RequestMuxer.java index 8649efa..7dd5f5a 100644 --- a/client/src/main/java/com/xjeffrose/chicago/client/RequestMuxer.java +++ b/client/src/main/java/com/xjeffrose/chicago/client/RequestMuxer.java @@ -18,6 +18,7 @@ import javax.annotation.Nullable; import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; import java.util.ArrayList; import java.util.Collections; import java.util.Deque; @@ -94,6 +95,7 @@ public void close() { for(ScheduledFuture f : scheduledFutures){ f.cancel(true); } + stopMessageQ(); } public void shutdownGracefully() { @@ -177,7 +179,7 @@ public void write(T sendReq, SettableFuture f) { } } - private Channel requestNode(){ + public Channel requestNode(){ ChannelFuture cf = connectionQ.removeFirst(); @@ -246,7 +248,7 @@ public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { mm.getF().set(true); } else { - mm.getF().set(false); + //mm.getF().set(false); mm.getF().setException(future.cause()); } } @@ -265,7 +267,7 @@ public void operationComplete(Future future) throws Exception { counter.decrementAndGet(); f.set(true); } else { - f.set(false); + //f.set(false); f.setException(future.cause()); } } @@ -273,6 +275,13 @@ public void operationComplete(Future future) throws Exception { } } + private void stopMessageQ(){ + while(messageQ.peekFirst() != null){ + final MuxedMessage mm = messageQ.pollFirst(); + mm.getF().setException(new ClosedChannelException()); + } + } + @Data private class MuxedMessage { private final T msg; diff --git a/client/src/test/java/client/AsyncClientFunctionalTest.java b/client/src/test/java/com/xjeffrose/chicago/client/AsyncClientFunctionalTest.java similarity index 100% rename from client/src/test/java/client/AsyncClientFunctionalTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/AsyncClientFunctionalTest.java diff --git a/client/src/test/java/client/ChicagoAsyncClientTest.java b/client/src/test/java/com/xjeffrose/chicago/client/ChicagoAsyncClientTest.java similarity index 100% rename from client/src/test/java/client/ChicagoAsyncClientTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/ChicagoAsyncClientTest.java diff --git a/client/src/test/java/client/ChicagoClientHandlerTest.java b/client/src/test/java/com/xjeffrose/chicago/client/ChicagoClientHandlerTest.java similarity index 100% rename from client/src/test/java/client/ChicagoClientHandlerTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/ChicagoClientHandlerTest.java diff --git a/client/src/test/java/client/ChicagoClientTest.java b/client/src/test/java/com/xjeffrose/chicago/client/ChicagoClientTest.java similarity index 100% rename from client/src/test/java/client/ChicagoClientTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/ChicagoClientTest.java diff --git a/client/src/test/java/client/ConnectionPoolManagerXTest.java b/client/src/test/java/com/xjeffrose/chicago/client/ConnectionPoolManagerXTest.java similarity index 100% rename from client/src/test/java/client/ConnectionPoolManagerXTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/ConnectionPoolManagerXTest.java diff --git a/client/src/test/java/client/RendezvousHashTest.java b/client/src/test/java/com/xjeffrose/chicago/client/RendezvousHashTest.java similarity index 100% rename from client/src/test/java/client/RendezvousHashTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/RendezvousHashTest.java diff --git a/client/src/test/java/client/RequestMuxerTest.java b/client/src/test/java/com/xjeffrose/chicago/client/RequestMuxerTest.java similarity index 66% rename from client/src/test/java/client/RequestMuxerTest.java rename to client/src/test/java/com/xjeffrose/chicago/client/RequestMuxerTest.java index fb15be0..67f948c 100644 --- a/client/src/test/java/client/RequestMuxerTest.java +++ b/client/src/test/java/com/xjeffrose/chicago/client/RequestMuxerTest.java @@ -87,4 +87,45 @@ public void onFailure(Throwable t) { latch.await(); } + @Test + public void writeTestWithBadChannel() throws Exception{ + SettableFuture f = SettableFuture.create(); + ChannelFuture cf = mock(ChannelFuture.class); + f.set(cf); + when(connector.connect(new InetSocketAddress("127.0.0.1",12000))).thenReturn(f); + when(cf.isSuccess()).thenReturn(true); + + Channel helper = new EmbeddedChannel(mock(ChannelHandler.class)); + Channel chMock = spy(helper); + ChannelPromise promise = new DefaultChannelPromise(chMock); + promise.setFailure(new RuntimeException("Channel not acitve!!!")); + + when(cf.channel()).thenReturn(chMock); + when(chMock.isWritable()).thenReturn(false); + when(chMock.isActive()).thenReturn(false); + requestMuxer.start(); + ChicagoMessage cm = new DefaultChicagoMessage(UUID.randomUUID(), Op.TS_WRITE,"colFam".getBytes(),null,"val".getBytes()); + SettableFuture f2 = SettableFuture.create(); + DefaultChannelPromise cfmock = mock(DefaultChannelPromise.class); + when(chMock.write(cm)).thenReturn(promise); + when(cfmock.isSuccess()).thenReturn(false); + CountDownLatch latch = new CountDownLatch(1); + Futures.addCallback(f2, new FutureCallback() { + @Override + public void onSuccess(@Nullable Boolean result) { + latch.countDown(); + } + + @Override + public void onFailure(Throwable t) { + latch.countDown(); + } + }); + + + requestMuxer.write(cm, f2); + + latch.await(); + } + } diff --git a/client/src/test/java/client/fixtures/UnitHelp.java b/client/src/test/java/com/xjeffrose/chicago/client/fixtures/UnitHelp.java similarity index 100% rename from client/src/test/java/client/fixtures/UnitHelp.java rename to client/src/test/java/com/xjeffrose/chicago/client/fixtures/UnitHelp.java diff --git a/cluster-test/pom.xml b/cluster-test/pom.xml index 525bd7c..aa0d8b1 100644 --- a/cluster-test/pom.xml +++ b/cluster-test/pom.xml @@ -4,7 +4,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - com.smadan + com.xjeffrose cluster-test 0.4.0-SNAPSHOT diff --git a/cluster-test/src/main/java/com/smadan/chicago/ChicagoClientTest.java b/cluster-test/src/main/java/com/xjeffrose/chicago/ChicagoClientTest.java similarity index 99% rename from cluster-test/src/main/java/com/smadan/chicago/ChicagoClientTest.java rename to cluster-test/src/main/java/com/xjeffrose/chicago/ChicagoClientTest.java index 97e35ee..c4e53c5 100644 --- a/cluster-test/src/main/java/com/smadan/chicago/ChicagoClientTest.java +++ b/cluster-test/src/main/java/com/xjeffrose/chicago/ChicagoClientTest.java @@ -1,9 +1,8 @@ -package com.smadan.chicago; +package com.xjeffrose.chicago; import com.google.common.primitives.Longs; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import com.xjeffrose.chicago.ChiUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; diff --git a/cluster-test/src/main/java/com/smadan/chicago/ReplicationTest.java b/cluster-test/src/main/java/com/xjeffrose/chicago/ReplicationTest.java similarity index 99% rename from cluster-test/src/main/java/com/smadan/chicago/ReplicationTest.java rename to cluster-test/src/main/java/com/xjeffrose/chicago/ReplicationTest.java index 412d61e..512ff56 100644 --- a/cluster-test/src/main/java/com/smadan/chicago/ReplicationTest.java +++ b/cluster-test/src/main/java/com/xjeffrose/chicago/ReplicationTest.java @@ -1,4 +1,4 @@ -package com.smadan.chicago; +package com.xjeffrose.chicago; import com.google.common.primitives.Longs; import com.jcraft.jsch.Channel; diff --git a/cluster-test/src/main/java/com/smadan/chicago/SanityTest.java b/cluster-test/src/main/java/com/xjeffrose/chicago/SanityTest.java similarity index 99% rename from cluster-test/src/main/java/com/smadan/chicago/SanityTest.java rename to cluster-test/src/main/java/com/xjeffrose/chicago/SanityTest.java index 683fc73..23013f7 100644 --- a/cluster-test/src/main/java/com/smadan/chicago/SanityTest.java +++ b/cluster-test/src/main/java/com/xjeffrose/chicago/SanityTest.java @@ -1,9 +1,8 @@ -package com.smadan.chicago; +package com.xjeffrose.chicago; import com.google.common.primitives.Longs; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import com.xjeffrose.chicago.ChiUtil; import com.xjeffrose.chicago.client.ChicagoAsyncClient; import org.junit.Assert; import org.junit.Before; diff --git a/cluster-test/src/main/java/com/smadan/chicago/TestChicagoCluster.java b/cluster-test/src/main/java/com/xjeffrose/chicago/TestChicagoCluster.java similarity index 98% rename from cluster-test/src/main/java/com/smadan/chicago/TestChicagoCluster.java rename to cluster-test/src/main/java/com/xjeffrose/chicago/TestChicagoCluster.java index 840151c..2f4cd49 100644 --- a/cluster-test/src/main/java/com/smadan/chicago/TestChicagoCluster.java +++ b/cluster-test/src/main/java/com/xjeffrose/chicago/TestChicagoCluster.java @@ -1,4 +1,4 @@ -package com.smadan.chicago; +package com.xjeffrose.chicago; import com.xjeffrose.chicago.ZkClient; import com.xjeffrose.chicago.client.ChicagoAsyncClient; diff --git a/core/src/main/java/com.xjeffrose.chicago/ZkClient.java b/core/src/main/java/com.xjeffrose.chicago/ZkClient.java index ae462e8..6b4bdb5 100755 --- a/core/src/main/java/com.xjeffrose.chicago/ZkClient.java +++ b/core/src/main/java/com.xjeffrose.chicago/ZkClient.java @@ -4,6 +4,7 @@ import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -52,6 +53,10 @@ public boolean isLeader(){ public void start() throws InterruptedException { client.start(); + boolean connected = client.blockUntilConnected(2000, TimeUnit.MILLISECONDS); + if(!connected){ + throw new RuntimeException("Cannot connect to zookeeper"); + } } public void stop() throws Exception {