diff --git a/appender/pom.xml b/appender/pom.xml
index 79b2761..a1dd019 100644
--- a/appender/pom.xml
+++ b/appender/pom.xml
@@ -6,7 +6,7 @@
com.xjeffrose
chicago-appender
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
2015
@@ -45,7 +45,7 @@
com.xjeffrose
chicago-client
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
org.slf4j
diff --git a/client/pom.xml b/client/pom.xml
index aebb8b4..d6c1674 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -6,7 +6,7 @@
com.xjeffrose
chicago-client
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
2015
@@ -45,7 +45,7 @@
com.xjeffrose
chicago-core
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
org.slf4j
diff --git a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java
index 2d154b0..1079465 100644
--- a/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java
+++ b/client/src/main/java/com/xjeffrose/chicago/client/ChicagoAsyncClient.java
@@ -208,7 +208,7 @@ public ListenableFuture read(byte[] key) {
}
public ListenableFuture read(byte[] colFam, byte[] key) {
- List nodes = getEffectiveNodes(Bytes.concat(colFam, key));
+ List nodes = rendezvousHash.get(Bytes.concat(colFam,key));
UUID id = UUID.randomUUID();
SettableFuture f = SettableFuture.create();
futureMap.put(id, f);
@@ -289,7 +289,7 @@ public void onFailure(Throwable throwable) {
});
}
});
-
+ connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.WRITE, colFam, key, val));
return f;
}
@@ -303,13 +303,13 @@ public ListenableFuture tsWrite(byte[] topic, byte[] val) {
@Override
public void onSuccess(@Nullable byte[] bytes) {
try {
- resp.set(tsWrite(topic, bytes, val).get(400, TimeUnit.MILLISECONDS));
+ resp.set(tsWrite(topic, bytes, val).get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
+ //} catch (TimeoutException e) {
+ // e.printStackTrace();
}
}
@@ -335,14 +335,14 @@ public void onFailure(Throwable throwable) {
});
}
});
-
+ connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.GET_OFFSET, topic, null, null));
return resp;
}
public ListenableFuture tsWrite(byte[] topic, byte[] offset, byte[] val) {
final List> futureList = new ArrayList<>();
final SettableFuture respFuture = SettableFuture.create();
- final List nodes = getEffectiveNodes(Bytes.concat(topic, offset));
+ final List nodes = rendezvousHash.get(Bytes.concat(topic,offset));
if (nodes.size() == 0) {
log.error("Unable to establish Quorum");
return null;
@@ -402,7 +402,7 @@ public void onFailure(Throwable throwable) {
// }
// }
// });
-
+ //connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.TS_WRITE, topic, offset, val));
return f;
}
@@ -458,6 +458,12 @@ public void onFailure(Throwable throwable) {
return f;
}
+ public ListenableFuture deleteColFam(byte[] colFam){
+ SettableFuture f = SettableFuture.create();
+ f.set("true".getBytes());
+ return f;
+ }
+
public void close() {
try {
if (zkClient != null) {
diff --git a/client/src/main/java/com/xjeffrose/chicago/client/ClientNodeWatcher.java b/client/src/main/java/com/xjeffrose/chicago/client/ClientNodeWatcher.java
index b402b33..2792e2f 100644
--- a/client/src/main/java/com/xjeffrose/chicago/client/ClientNodeWatcher.java
+++ b/client/src/main/java/com/xjeffrose/chicago/client/ClientNodeWatcher.java
@@ -125,7 +125,7 @@ public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event)
}
break;
case CONNECTION_RECONNECTED:
- connectionPoolManager.checkConnection();
+ //connectionPoolManager.checkConnection();
default: {
log.info("Zk " + event.getType().name());
}
diff --git a/cluster-test/pom.xml b/cluster-test/pom.xml
index 525bd7c..a32fe49 100644
--- a/cluster-test/pom.xml
+++ b/cluster-test/pom.xml
@@ -6,7 +6,7 @@
com.smadan
cluster-test
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
2015
@@ -39,12 +39,12 @@
com.xjeffrose
chicago-client
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
com.xjeffrose
chicago-appender
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
org.apache.curator
@@ -111,4 +111,16 @@
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 1.8
+ 1.8
+
+
+
+
diff --git a/core/pom.xml b/core/pom.xml
index f0213a2..8a92f92 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -6,7 +6,7 @@
com.xjeffrose
chicago-core
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
2015
diff --git a/client/src/test/java/client/RendezvousHashTest.java b/core/src/test/java/com.xjeffrose.chicago/RendezvousHashTest.java
similarity index 69%
rename from client/src/test/java/client/RendezvousHashTest.java
rename to core/src/test/java/com.xjeffrose.chicago/RendezvousHashTest.java
index 1eace7c..510d48d 100644
--- a/client/src/test/java/client/RendezvousHashTest.java
+++ b/core/src/test/java/com.xjeffrose.chicago/RendezvousHashTest.java
@@ -1,4 +1,4 @@
-package com.xjeffrose.chicago.client;
+package com.xjeffrose.chicago;
import com.google.common.hash.Funnels;
import com.xjeffrose.chicago.RendezvousHash;
@@ -42,4 +42,21 @@ public void get() throws Exception {
}
+ @Test
+ public void HashTestNode0(){
+ List nodeList = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ nodeList.add(("Host" + i));
+ }
+ RendezvousHash rendezvousHash = new RendezvousHash(Funnels.stringFunnel(Charset.defaultCharset()), nodeList, 3);
+
+ String key ="someColFam+someKey";
+ List nodeBefore = rendezvousHash.get(key.getBytes());
+
+ rendezvousHash.add("Host6");
+
+ List nodeAfter = rendezvousHash.get(key.getBytes());
+ assertEquals(nodeBefore.get(0),nodeAfter.get(0));
+ }
+
}
diff --git a/server/config/application.conf b/server/config/application.conf
index 172dc81..f4456e4 100644
--- a/server/config/application.conf
+++ b/server/config/application.conf
@@ -2,7 +2,7 @@ chicago {
application = ${chicago.applicationTemplate} {
settings {
zookeeperCluster = "localhost:2181"
- dbPath = "/var/chicago/"
+ dbPath = "/var/chicago"
quorum = 3
compactionSize = 60GB
databaseMode = true
@@ -15,16 +15,31 @@ chicago {
admin {
settings {
bindHost = 0.0.0.0
+ bindPort = 9991
}
}
stats {
settings {
bindHost = 0.0.0.0
+ bindPort = 9001
}
}
- db {
+ db {
settings {
bindHost = 0.0.0.0
+ bindPort = 12000
+ }
+ }
+ election {
+ settings {
+ bindHost = 0.0.0.0
+ bindPort = 12001
+ }
+ }
+ paxos {
+ settings {
+ bindHost = 0.0.0.0
+ bindPort = 12002
}
}
}
diff --git a/server/pom.xml b/server/pom.xml
index 340ccc2..2ab6ce2 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -6,7 +6,7 @@
com.xjeffrose
chicago-server
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
2015
@@ -45,7 +45,7 @@
com.xjeffrose
chicago-core
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
org.slf4j
@@ -56,7 +56,7 @@
com.xjeffrose
chicago-client
- 0.4.0-SNAPSHOT
+ 0.5.0-SNAPSHOT
com.xjeffrose
diff --git a/server/src/main/java/com/xjeffrose/chicago/ChicagoPaxosClient.java b/server/src/main/java/com/xjeffrose/chicago/ChicagoPaxosClient.java
index fae4a0a..6ee70c8 100644
--- a/server/src/main/java/com/xjeffrose/chicago/ChicagoPaxosClient.java
+++ b/server/src/main/java/com/xjeffrose/chicago/ChicagoPaxosClient.java
@@ -161,14 +161,14 @@ public void onFailure(Throwable throwable) {
});
}
});
+ connectionManager.write(nodes.get(finalI), new DefaultChicagoMessage(id, Op.MPAXOS_PROPOSE_WRITE, colFam, key, val));
}
-
return Futures.allAsList(futureList);
}
public ListenableFuture> tsWrite(byte[] topic, byte[] offset, byte[] val) {
- List nodes = getEffectiveNodes(Bytes.concat(topic, offset));
+ List nodes = rendezvousHash.get(Bytes.concat(topic, offset));
final List> futureList = new ArrayList<>();
for (int i = 1; i < replicaSize; i++) {
@@ -199,6 +199,7 @@ public void onFailure(Throwable throwable) {
});
}
});
+ connectionManager.write(nodes.get(finalI), new DefaultChicagoMessage(id, Op.MPAXOS_PROPOSE_TS_WRITE, topic, offset, val));
}
return Futures.allAsList(futureList);
diff --git a/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java b/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java
index bbd91bb..1336206 100644
--- a/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java
+++ b/server/src/main/java/com/xjeffrose/chicago/server/ChiConfig.java
@@ -12,7 +12,8 @@
public class ChiConfig {
@Getter
private final String zkHosts;
- private Config conf;
+ @Getter
+ public Config conf;
// private Map channelStats;
@Getter
private String dbPath;
diff --git a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoDBHandler.java b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoDBHandler.java
index 86f939a..517ad42 100644
--- a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoDBHandler.java
+++ b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoDBHandler.java
@@ -35,9 +35,9 @@ public class ChicagoDBHandler extends SimpleChannelInboundHandler offset = PlatformDependent.newConcurrentHashMap();
- private final Map q = PlatformDependent.newConcurrentHashMap();
- private final Map> sessionCoordinator = PlatformDependent.newConcurrentHashMap();
- private final Map qCount = PlatformDependent.newConcurrentHashMap();
+// private final Map q = PlatformDependent.newConcurrentHashMap();
+// private final Map> sessionCoordinator = PlatformDependent.newConcurrentHashMap();
+// private final Map qCount = PlatformDependent.newConcurrentHashMap();
public ChicagoDBHandler(DBManager db, ChicagoPaxosClient paxosClient) {
@@ -256,6 +256,7 @@ public void operationComplete(ChannelFuture future) {
}
}
};
+ log.info(ctx.toString() + " received msg="+ msg.toString());
switch (msg.getOp()) {
case READ:
@@ -330,21 +331,21 @@ private void handleGettingOffset(ChannelHandlerContext ctx, ChicagoMessage msg,
if (offset.containsKey(new String(msg.getColFam()))) {
// This offset has been written to all members of the replica set
// if (qCount.get(new String(msg.getColFam())).get() == q.get(new String(msg.getColFam()))) {
- if (sessionCoordinator.containsKey(new String(msg.getColFam()))) {
-
- } else {
- sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap());
- sessionCoordinator.get(new String(msg.getKey())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).incrementAndGet());
- }
+// if (sessionCoordinator.containsKey(new String(msg.getColFam()))) {
+//
+// } else {
+// sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap());
+// sessionCoordinator.get(new String(msg.getKey())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).incrementAndGet());
+// }
ctx.writeAndFlush(new DefaultChicagoMessage(
msg.getId(),
Op.RESPONSE,
msg.getColFam(),
- null,
+ Boolean.toString(true).getBytes(),
Longs.toByteArray(offset.get(new String(msg.getColFam())).incrementAndGet()))).addListener(writeComplete);
- qCount.get(new String(msg.getColFam())).incrementAndGet();
- sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get());
+ //qCount.get(new String(msg.getColFam())).incrementAndGet();
+ //sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get());
// } else {
// // Return current offset to member of the replica set
@@ -361,19 +362,19 @@ private void handleGettingOffset(ChannelHandlerContext ctx, ChicagoMessage msg,
} else {
// Create the offset for the ColFam on first message (ColFam Create)
- offset.put(new String(msg.getColFam()), new AtomicLong());
- q.put(new String(msg.getColFam()), Ints.fromByteArray(msg.getVal()));
- qCount.put(new String(msg.getColFam()), new AtomicInteger());
- sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap());
+ offset.put(new String(msg.getColFam()), new AtomicLong(0));
+// q.put(new String(msg.getColFam()), Longs.fromByteArray(msg.getVal()));
+// qCount.put(new String(msg.getColFam()), new AtomicInteger(0));
+// sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap());
ctx.writeAndFlush(new DefaultChicagoMessage(
msg.getId(),
Op.RESPONSE,
msg.getColFam(),
- null,
+ Boolean.toString(true).getBytes(),
Longs.toByteArray(offset.get(new String(msg.getColFam())).get()))).addListener(writeComplete);
- qCount.get(new String(msg.getColFam())).incrementAndGet();
- sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get());
+ //qCount.get(new String(msg.getColFam())).incrementAndGet();
+ //sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get());
}
}
diff --git a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java
index bc611f8..f154f74 100755
--- a/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java
+++ b/server/src/main/java/com/xjeffrose/chicago/server/ChicagoServer.java
@@ -38,7 +38,7 @@ public ChicagoServer(ChiConfig config) {
this.db = getStorageProvider(config);
this.nodeWatcher = new NodeWatcher(NODE_LIST_PATH, NODE_LOCK_PATH, config.getQuorum());
this.paxosClient = new ChicagoPaxosClient(config.getZkHosts(), config.getReplicaSize());
- this.dbRouter = new DBRouter(db, paxosClient);
+ this.dbRouter = new DBRouter(db, paxosClient, config.conf);
}
private StorageProvider getStorageProvider(ChiConfig config) {
diff --git a/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java b/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java
index 1a84b39..1a7846d 100755
--- a/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java
+++ b/server/src/main/java/com/xjeffrose/chicago/server/DBRouter.java
@@ -1,5 +1,7 @@
package com.xjeffrose.chicago.server;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import com.xjeffrose.chicago.ChicagoPaxosClient;
import com.xjeffrose.chicago.db.DBManager;
import com.xjeffrose.chicago.db.StorageProvider;
@@ -30,10 +32,11 @@ public class DBRouter implements Closeable {
private final Map q;
private final Map> sessionCoordinator;
private final Map qCount;
+ private final Config conf;
private Application application;
- public DBRouter(StorageProvider db, ChicagoPaxosClient paxosClient) {
+ public DBRouter(StorageProvider db, ChicagoPaxosClient paxosClient, Config conf) {
this.db = db;
this.manager = new DBManager(db);
this.handler = new ChicagoDBHandler(manager, paxosClient);
@@ -42,6 +45,7 @@ public DBRouter(StorageProvider db, ChicagoPaxosClient paxosClient) {
this.sessionCoordinator = PlatformDependent.newConcurrentHashMap();
this.qCount = PlatformDependent.newConcurrentHashMap();
this.chicagoPaxosHandler = new ChicagoPaxosHandler(offset, q, sessionCoordinator, qCount);
+ this.conf = conf;
}
private ChicagoServerPipeline buildDbPipeline() {
@@ -66,7 +70,7 @@ public ChannelHandler getApplicationHandler() {
public void run() {
manager.startAsync().awaitRunning();
- application = new ApplicationBootstrap("chicago.application")
+ application = new ApplicationBootstrap(conf)
.addServer("admin", (bs) -> bs.addToPipeline(new XioSslHttp1_1Pipeline()))
.addServer("stats", (bs) -> bs.addToPipeline(new XioSslHttp1_1Pipeline()))
.addServer("db", (bs) -> bs.addToPipeline(buildDbPipeline()))