From f1b1103c44180e3a852edebcd96e9ebb4fda57a0 Mon Sep 17 00:00:00 2001 From: "Madan, Smarth" Date: Tue, 30 Aug 2016 14:48:03 -0700 Subject: [PATCH 1/2] Fixing tests --- appender/pom.xml | 2 +- client/pom.xml | 2 +- .../chicago/client/ChicagoAsyncClient.java | 16 +++++--- .../chicago/client/ClientNodeWatcher.java | 2 +- core/pom.xml | 2 +- .../RendezvousHashTest.java | 19 ++++++++- server/config/application.conf | 19 ++++++++- server/pom.xml | 6 +-- .../xjeffrose/chicago/ChicagoPaxosClient.java | 5 ++- .../xjeffrose/chicago/server/ChiConfig.java | 3 +- .../chicago/server/ChicagoDBHandler.java | 39 ++++++++++--------- .../chicago/server/ChicagoServer.java | 2 +- .../xjeffrose/chicago/server/DBRouter.java | 8 +++- 13 files changed, 85 insertions(+), 40 deletions(-) rename {client/src/test/java/client => core/src/test/java/com.xjeffrose.chicago}/RendezvousHashTest.java (69%) diff --git a/appender/pom.xml b/appender/pom.xml index 79b2761..cb2fdd0 100644 --- a/appender/pom.xml +++ b/appender/pom.xml @@ -6,7 +6,7 @@ com.xjeffrose chicago-appender - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT 2015 diff --git a/client/pom.xml b/client/pom.xml index aebb8b4..9acb3cf 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -6,7 +6,7 @@ com.xjeffrose chicago-client - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT 2015 diff --git a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java index 2d154b0..4fa088d 100644 --- a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java +++ b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java @@ -208,7 +208,7 @@ public ListenableFuture read(byte[] key) { } public ListenableFuture read(byte[] colFam, byte[] key) { - List nodes = getEffectiveNodes(Bytes.concat(colFam, key)); + List nodes = rendezvousHash.get(Bytes.concat(colFam,key)); UUID id = UUID.randomUUID(); SettableFuture f = SettableFuture.create(); futureMap.put(id, f); @@ -289,7 +289,7 @@ public void onFailure(Throwable throwable) { }); } }); - + connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.WRITE, colFam, key, val)); return f; } @@ -335,14 +335,14 @@ public void onFailure(Throwable throwable) { }); } }); - + connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.GET_OFFSET, topic, null, null)); return resp; } public ListenableFuture tsWrite(byte[] topic, byte[] offset, byte[] val) { final List> futureList = new ArrayList<>(); final SettableFuture respFuture = SettableFuture.create(); - final List nodes = getEffectiveNodes(Bytes.concat(topic, offset)); + final List nodes = rendezvousHash.get(Bytes.concat(topic,offset)); if (nodes.size() == 0) { log.error("Unable to establish Quorum"); return null; @@ -402,7 +402,7 @@ public void onFailure(Throwable throwable) { // } // } // }); - + //connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.TS_WRITE, topic, offset, val)); return f; } @@ -458,6 +458,12 @@ public void onFailure(Throwable throwable) { return f; } + public ListenableFuture deleteColFam(byte[] colFam){ + SettableFuture f = SettableFuture.create(); + f.set("true".getBytes()); + return f; + } + public void close() { try { if (zkClient != null) { diff --git a/client/src/main/java/com/xjeffrose/chicago/client/ClientNodeWatcher.java b/client/src/main/java/com/xjeffrose/chicago/client/ClientNodeWatcher.java index b402b33..2792e2f 100644 --- a/client/src/main/java/com/xjeffrose/chicago/client/ClientNodeWatcher.java +++ b/client/src/main/java/com/xjeffrose/chicago/client/ClientNodeWatcher.java @@ -125,7 +125,7 @@ public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event) } break; case CONNECTION_RECONNECTED: - connectionPoolManager.checkConnection(); + //connectionPoolManager.checkConnection(); default: { log.info("Zk " + event.getType().name()); } diff --git a/core/pom.xml b/core/pom.xml index f0213a2..8a92f92 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -6,7 +6,7 @@ com.xjeffrose chicago-core - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT 2015 diff --git a/client/src/test/java/client/RendezvousHashTest.java b/core/src/test/java/com.xjeffrose.chicago/RendezvousHashTest.java similarity index 69% rename from client/src/test/java/client/RendezvousHashTest.java rename to core/src/test/java/com.xjeffrose.chicago/RendezvousHashTest.java index 1eace7c..510d48d 100644 --- a/client/src/test/java/client/RendezvousHashTest.java +++ b/core/src/test/java/com.xjeffrose.chicago/RendezvousHashTest.java @@ -1,4 +1,4 @@ -package com.xjeffrose.chicago.client; +package com.xjeffrose.chicago; import com.google.common.hash.Funnels; import com.xjeffrose.chicago.RendezvousHash; @@ -42,4 +42,21 @@ public void get() throws Exception { } + @Test + public void HashTestNode0(){ + List nodeList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + nodeList.add(("Host" + i)); + } + RendezvousHash rendezvousHash = new RendezvousHash(Funnels.stringFunnel(Charset.defaultCharset()), nodeList, 3); + + String key ="someColFam+someKey"; + List nodeBefore = rendezvousHash.get(key.getBytes()); + + rendezvousHash.add("Host6"); + + List nodeAfter = rendezvousHash.get(key.getBytes()); + assertEquals(nodeBefore.get(0),nodeAfter.get(0)); + } + } diff --git a/server/config/application.conf b/server/config/application.conf index 172dc81..f4456e4 100644 --- a/server/config/application.conf +++ b/server/config/application.conf @@ -2,7 +2,7 @@ chicago { application = ${chicago.applicationTemplate} { settings { zookeeperCluster = "localhost:2181" - dbPath = "/var/chicago/" + dbPath = "/var/chicago" quorum = 3 compactionSize = 60GB databaseMode = true @@ -15,16 +15,31 @@ chicago { admin { settings { bindHost = 0.0.0.0 + bindPort = 9991 } } stats { settings { bindHost = 0.0.0.0 + bindPort = 9001 } } - db { + db { settings { bindHost = 0.0.0.0 + bindPort = 12000 + } + } + election { + settings { + bindHost = 0.0.0.0 + bindPort = 12001 + } + } + paxos { + settings { + bindHost = 0.0.0.0 + bindPort = 12002 } } } diff --git a/server/pom.xml b/server/pom.xml index 340ccc2..2ab6ce2 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -6,7 +6,7 @@ com.xjeffrose chicago-server - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT 2015 @@ -45,7 +45,7 @@ com.xjeffrose chicago-core - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT org.slf4j @@ -56,7 +56,7 @@ com.xjeffrose chicago-client - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT com.xjeffrose diff --git a/server/src/main/java/com/xjeffrose/chicago/ChicagoPaxosClient.java b/server/src/main/java/com/xjeffrose/chicago/ChicagoPaxosClient.java index fae4a0a..6ee70c8 100644 --- a/server/src/main/java/com/xjeffrose/chicago/ChicagoPaxosClient.java +++ b/server/src/main/java/com/xjeffrose/chicago/ChicagoPaxosClient.java @@ -161,14 +161,14 @@ public void onFailure(Throwable throwable) { }); } }); + connectionManager.write(nodes.get(finalI), new DefaultChicagoMessage(id, Op.MPAXOS_PROPOSE_WRITE, colFam, key, val)); } - return Futures.allAsList(futureList); } public ListenableFuture> tsWrite(byte[] topic, byte[] offset, byte[] val) { - List nodes = getEffectiveNodes(Bytes.concat(topic, offset)); + List nodes = rendezvousHash.get(Bytes.concat(topic, offset)); final List> futureList = new ArrayList<>(); for (int i = 1; i < replicaSize; i++) { @@ -199,6 +199,7 @@ public void onFailure(Throwable throwable) { }); } }); + connectionManager.write(nodes.get(finalI), new DefaultChicagoMessage(id, Op.MPAXOS_PROPOSE_TS_WRITE, topic, offset, val)); } return Futures.allAsList(futureList); diff --git a/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java b/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java index bbd91bb..1336206 100644 --- a/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java +++ b/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java @@ -12,7 +12,8 @@ public class ChiConfig { @Getter private final String zkHosts; - private Config conf; + @Getter + public Config conf; // private Map channelStats; @Getter private String dbPath; diff --git a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoDBHandler.java b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoDBHandler.java index 86f939a..517ad42 100644 --- a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoDBHandler.java +++ b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoDBHandler.java @@ -35,9 +35,9 @@ public class ChicagoDBHandler extends SimpleChannelInboundHandler offset = PlatformDependent.newConcurrentHashMap(); - private final Map q = PlatformDependent.newConcurrentHashMap(); - private final Map> sessionCoordinator = PlatformDependent.newConcurrentHashMap(); - private final Map qCount = PlatformDependent.newConcurrentHashMap(); +// private final Map q = PlatformDependent.newConcurrentHashMap(); +// private final Map> sessionCoordinator = PlatformDependent.newConcurrentHashMap(); +// private final Map qCount = PlatformDependent.newConcurrentHashMap(); public ChicagoDBHandler(DBManager db, ChicagoPaxosClient paxosClient) { @@ -256,6 +256,7 @@ public void operationComplete(ChannelFuture future) { } } }; + log.info(ctx.toString() + " received msg="+ msg.toString()); switch (msg.getOp()) { case READ: @@ -330,21 +331,21 @@ private void handleGettingOffset(ChannelHandlerContext ctx, ChicagoMessage msg, if (offset.containsKey(new String(msg.getColFam()))) { // This offset has been written to all members of the replica set // if (qCount.get(new String(msg.getColFam())).get() == q.get(new String(msg.getColFam()))) { - if (sessionCoordinator.containsKey(new String(msg.getColFam()))) { - - } else { - sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap()); - sessionCoordinator.get(new String(msg.getKey())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).incrementAndGet()); - } +// if (sessionCoordinator.containsKey(new String(msg.getColFam()))) { +// +// } else { +// sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap()); +// sessionCoordinator.get(new String(msg.getKey())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).incrementAndGet()); +// } ctx.writeAndFlush(new DefaultChicagoMessage( msg.getId(), Op.RESPONSE, msg.getColFam(), - null, + Boolean.toString(true).getBytes(), Longs.toByteArray(offset.get(new String(msg.getColFam())).incrementAndGet()))).addListener(writeComplete); - qCount.get(new String(msg.getColFam())).incrementAndGet(); - sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get()); + //qCount.get(new String(msg.getColFam())).incrementAndGet(); + //sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get()); // } else { // // Return current offset to member of the replica set @@ -361,19 +362,19 @@ private void handleGettingOffset(ChannelHandlerContext ctx, ChicagoMessage msg, } else { // Create the offset for the ColFam on first message (ColFam Create) - offset.put(new String(msg.getColFam()), new AtomicLong()); - q.put(new String(msg.getColFam()), Ints.fromByteArray(msg.getVal())); - qCount.put(new String(msg.getColFam()), new AtomicInteger()); - sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap()); + offset.put(new String(msg.getColFam()), new AtomicLong(0)); +// q.put(new String(msg.getColFam()), Longs.fromByteArray(msg.getVal())); +// qCount.put(new String(msg.getColFam()), new AtomicInteger(0)); +// sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap()); ctx.writeAndFlush(new DefaultChicagoMessage( msg.getId(), Op.RESPONSE, msg.getColFam(), - null, + Boolean.toString(true).getBytes(), Longs.toByteArray(offset.get(new String(msg.getColFam())).get()))).addListener(writeComplete); - qCount.get(new String(msg.getColFam())).incrementAndGet(); - sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get()); + //qCount.get(new String(msg.getColFam())).incrementAndGet(); + //sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get()); } } diff --git a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java index bc611f8..f154f74 100755 --- a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java +++ b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java @@ -38,7 +38,7 @@ public ChicagoServer(ChiConfig config) { this.db = getStorageProvider(config); this.nodeWatcher = new NodeWatcher(NODE_LIST_PATH, NODE_LOCK_PATH, config.getQuorum()); this.paxosClient = new ChicagoPaxosClient(config.getZkHosts(), config.getReplicaSize()); - this.dbRouter = new DBRouter(db, paxosClient); + this.dbRouter = new DBRouter(db, paxosClient, config.conf); } private StorageProvider getStorageProvider(ChiConfig config) { diff --git a/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java b/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java index 1a84b39..1a7846d 100755 --- a/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java +++ b/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java @@ -1,5 +1,7 @@ package com.xjeffrose.chicago.server; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import com.xjeffrose.chicago.ChicagoPaxosClient; import com.xjeffrose.chicago.db.DBManager; import com.xjeffrose.chicago.db.StorageProvider; @@ -30,10 +32,11 @@ public class DBRouter implements Closeable { private final Map q; private final Map> sessionCoordinator; private final Map qCount; + private final Config conf; private Application application; - public DBRouter(StorageProvider db, ChicagoPaxosClient paxosClient) { + public DBRouter(StorageProvider db, ChicagoPaxosClient paxosClient, Config conf) { this.db = db; this.manager = new DBManager(db); this.handler = new ChicagoDBHandler(manager, paxosClient); @@ -42,6 +45,7 @@ public DBRouter(StorageProvider db, ChicagoPaxosClient paxosClient) { this.sessionCoordinator = PlatformDependent.newConcurrentHashMap(); this.qCount = PlatformDependent.newConcurrentHashMap(); this.chicagoPaxosHandler = new ChicagoPaxosHandler(offset, q, sessionCoordinator, qCount); + this.conf = conf; } private ChicagoServerPipeline buildDbPipeline() { @@ -66,7 +70,7 @@ public ChannelHandler getApplicationHandler() { public void run() { manager.startAsync().awaitRunning(); - application = new ApplicationBootstrap("chicago.application") + application = new ApplicationBootstrap(conf) .addServer("admin", (bs) -> bs.addToPipeline(new XioSslHttp1_1Pipeline())) .addServer("stats", (bs) -> bs.addToPipeline(new XioSslHttp1_1Pipeline())) .addServer("db", (bs) -> bs.addToPipeline(buildDbPipeline())) From 386e8ee4290cd742549be69456afaddeb39fed62 Mon Sep 17 00:00:00 2001 From: "Madan, Smarth" Date: Tue, 30 Aug 2016 16:06:09 -0700 Subject: [PATCH 2/2] version fixes --- appender/pom.xml | 2 +- client/pom.xml | 2 +- .../chicago/client/ChicagoAsyncClient.java | 6 +++--- cluster-test/pom.xml | 18 +++++++++++++++--- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/appender/pom.xml b/appender/pom.xml index cb2fdd0..a1dd019 100644 --- a/appender/pom.xml +++ b/appender/pom.xml @@ -45,7 +45,7 @@ com.xjeffrose chicago-client - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT org.slf4j diff --git a/client/pom.xml b/client/pom.xml index 9acb3cf..d6c1674 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -45,7 +45,7 @@ com.xjeffrose chicago-core - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT org.slf4j diff --git a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java index 4fa088d..1079465 100644 --- a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java +++ b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java @@ -303,13 +303,13 @@ public ListenableFuture tsWrite(byte[] topic, byte[] val) { @Override public void onSuccess(@Nullable byte[] bytes) { try { - resp.set(tsWrite(topic, bytes, val).get(400, TimeUnit.MILLISECONDS)); + resp.set(tsWrite(topic, bytes, val).get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); - } catch (TimeoutException e) { - e.printStackTrace(); + //} catch (TimeoutException e) { + // e.printStackTrace(); } } diff --git a/cluster-test/pom.xml b/cluster-test/pom.xml index 525bd7c..a32fe49 100644 --- a/cluster-test/pom.xml +++ b/cluster-test/pom.xml @@ -6,7 +6,7 @@ com.smadan cluster-test - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT 2015 @@ -39,12 +39,12 @@ com.xjeffrose chicago-client - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT com.xjeffrose chicago-appender - 0.4.0-SNAPSHOT + 0.5.0-SNAPSHOT org.apache.curator @@ -111,4 +111,16 @@ + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + +