Skip to content
This repository was archived by the owner on Aug 7, 2018. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions appender/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.xjeffrose</groupId>
<artifactId>chicago-appender</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>

<inceptionYear>2015</inceptionYear>

Expand Down Expand Up @@ -45,7 +45,7 @@
<dependency>
<groupId>com.xjeffrose</groupId>
<artifactId>chicago-client</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
4 changes: 2 additions & 2 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.xjeffrose</groupId>
<artifactId>chicago-client</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>

<inceptionYear>2015</inceptionYear>

Expand Down Expand Up @@ -45,7 +45,7 @@
<dependency>
<groupId>com.xjeffrose</groupId>
<artifactId>chicago-core</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public ListenableFuture<byte[]> read(byte[] key) {
}

public ListenableFuture<byte[]> read(byte[] colFam, byte[] key) {
List<String> nodes = getEffectiveNodes(Bytes.concat(colFam, key));
List<String> nodes = rendezvousHash.get(Bytes.concat(colFam,key));
UUID id = UUID.randomUUID();
SettableFuture<byte[]> f = SettableFuture.create();
futureMap.put(id, f);
Expand Down Expand Up @@ -289,7 +289,7 @@ public void onFailure(Throwable throwable) {
});
}
});

connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.WRITE, colFam, key, val));
return f;
}

Expand All @@ -303,13 +303,13 @@ public ListenableFuture<byte[]> tsWrite(byte[] topic, byte[] val) {
@Override
public void onSuccess(@Nullable byte[] bytes) {
try {
resp.set(tsWrite(topic, bytes, val).get(400, TimeUnit.MILLISECONDS));
resp.set(tsWrite(topic, bytes, val).get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
//} catch (TimeoutException e) {
// e.printStackTrace();
}
}

Expand All @@ -335,14 +335,14 @@ public void onFailure(Throwable throwable) {
});
}
});

connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.GET_OFFSET, topic, null, null));
return resp;
}

public ListenableFuture<byte[]> tsWrite(byte[] topic, byte[] offset, byte[] val) {
final List<SettableFuture<byte[]>> futureList = new ArrayList<>();
final SettableFuture<byte[]> respFuture = SettableFuture.create();
final List<String> nodes = getEffectiveNodes(Bytes.concat(topic, offset));
final List<String> nodes = rendezvousHash.get(Bytes.concat(topic,offset));
if (nodes.size() == 0) {
log.error("Unable to establish Quorum");
return null;
Expand Down Expand Up @@ -402,7 +402,7 @@ public void onFailure(Throwable throwable) {
// }
// }
// });

//connectionManager.write(nodes.get(0), new DefaultChicagoMessage(id, Op.TS_WRITE, topic, offset, val));
return f;
}

Expand Down Expand Up @@ -458,6 +458,12 @@ public void onFailure(Throwable throwable) {
return f;
}

public ListenableFuture<byte[]> deleteColFam(byte[] colFam){
SettableFuture<byte[]> f = SettableFuture.create();
f.set("true".getBytes());
return f;
}

public void close() {
try {
if (zkClient != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent event)
}
break;
case CONNECTION_RECONNECTED:
connectionPoolManager.checkConnection();
//connectionPoolManager.checkConnection();
default: {
log.info("Zk " + event.getType().name());
}
Expand Down
18 changes: 15 additions & 3 deletions cluster-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.smadan</groupId>
<artifactId>cluster-test</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>

<inceptionYear>2015</inceptionYear>

Expand Down Expand Up @@ -39,12 +39,12 @@
<dependency>
<groupId>com.xjeffrose</groupId>
<artifactId>chicago-client</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.xjeffrose</groupId>
<artifactId>chicago-appender</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
Expand Down Expand Up @@ -111,4 +111,16 @@
</dependency>

</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.xjeffrose</groupId>
<artifactId>chicago-core</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>

<inceptionYear>2015</inceptionYear>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.xjeffrose.chicago.client;
package com.xjeffrose.chicago;

import com.google.common.hash.Funnels;
import com.xjeffrose.chicago.RendezvousHash;
Expand Down Expand Up @@ -42,4 +42,21 @@ public void get() throws Exception {

}

@Test
public void HashTestNode0(){
List<String> nodeList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
nodeList.add(("Host" + i));
}
RendezvousHash rendezvousHash = new RendezvousHash(Funnels.stringFunnel(Charset.defaultCharset()), nodeList, 3);

String key ="someColFam+someKey";
List<String> nodeBefore = rendezvousHash.get(key.getBytes());

rendezvousHash.add("Host6");

List<String> nodeAfter = rendezvousHash.get(key.getBytes());
assertEquals(nodeBefore.get(0),nodeAfter.get(0));
}

}
19 changes: 17 additions & 2 deletions server/config/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ chicago {
application = ${chicago.applicationTemplate} {
settings {
zookeeperCluster = "localhost:2181"
dbPath = "/var/chicago/"
dbPath = "/var/chicago"
quorum = 3
compactionSize = 60GB
databaseMode = true
Expand All @@ -15,16 +15,31 @@ chicago {
admin {
settings {
bindHost = 0.0.0.0
bindPort = 9991
}
}
stats {
settings {
bindHost = 0.0.0.0
bindPort = 9001
}
}
db {
db {
settings {
bindHost = 0.0.0.0
bindPort = 12000
}
}
election {
settings {
bindHost = 0.0.0.0
bindPort = 12001
}
}
paxos {
settings {
bindHost = 0.0.0.0
bindPort = 12002
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.xjeffrose</groupId>
<artifactId>chicago-server</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>

<inceptionYear>2015</inceptionYear>

Expand Down Expand Up @@ -45,7 +45,7 @@
<dependency>
<groupId>com.xjeffrose</groupId>
<artifactId>chicago-core</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
Expand All @@ -56,7 +56,7 @@
<dependency>
<groupId>com.xjeffrose</groupId>
<artifactId>chicago-client</artifactId>
<version>0.4.0-SNAPSHOT</version>
<version>0.5.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>com.xjeffrose</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,14 @@ public void onFailure(Throwable throwable) {
});
}
});
connectionManager.write(nodes.get(finalI), new DefaultChicagoMessage(id, Op.MPAXOS_PROPOSE_WRITE, colFam, key, val));
}

return Futures.allAsList(futureList);

}

public ListenableFuture<List<byte[]>> tsWrite(byte[] topic, byte[] offset, byte[] val) {
List<String> nodes = getEffectiveNodes(Bytes.concat(topic, offset));
List<String> nodes = rendezvousHash.get(Bytes.concat(topic, offset));
final List<SettableFuture<byte[]>> futureList = new ArrayList<>();
for (int i = 1; i < replicaSize; i++) {

Expand Down Expand Up @@ -199,6 +199,7 @@ public void onFailure(Throwable throwable) {
});
}
});
connectionManager.write(nodes.get(finalI), new DefaultChicagoMessage(id, Op.MPAXOS_PROPOSE_TS_WRITE, topic, offset, val));
}

return Futures.allAsList(futureList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
public class ChiConfig {
@Getter
private final String zkHosts;
private Config conf;
@Getter
public Config conf;
// private Map<XioServerDef, XioMetrics> channelStats;
@Getter
private String dbPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public class ChicagoDBHandler extends SimpleChannelInboundHandler<ChicagoMessage
private final DBManager db;
private final ChicagoPaxosClient paxosClient;
private final Map<String, AtomicLong> offset = PlatformDependent.newConcurrentHashMap();
private final Map<String, Integer> q = PlatformDependent.newConcurrentHashMap();
private final Map<String, Map<String, Long>> sessionCoordinator = PlatformDependent.newConcurrentHashMap();
private final Map<String, AtomicInteger> qCount = PlatformDependent.newConcurrentHashMap();
// private final Map<String, Long> q = PlatformDependent.newConcurrentHashMap();
// private final Map<String, Map<String, Long>> sessionCoordinator = PlatformDependent.newConcurrentHashMap();
// private final Map<String, AtomicInteger> qCount = PlatformDependent.newConcurrentHashMap();


public ChicagoDBHandler(DBManager db, ChicagoPaxosClient paxosClient) {
Expand Down Expand Up @@ -256,6 +256,7 @@ public void operationComplete(ChannelFuture future) {
}
}
};
log.info(ctx.toString() + " received msg="+ msg.toString());

switch (msg.getOp()) {
case READ:
Expand Down Expand Up @@ -330,21 +331,21 @@ private void handleGettingOffset(ChannelHandlerContext ctx, ChicagoMessage msg,
if (offset.containsKey(new String(msg.getColFam()))) {
// This offset has been written to all members of the replica set
// if (qCount.get(new String(msg.getColFam())).get() == q.get(new String(msg.getColFam()))) {
if (sessionCoordinator.containsKey(new String(msg.getColFam()))) {

} else {
sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap());
sessionCoordinator.get(new String(msg.getKey())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).incrementAndGet());
}
// if (sessionCoordinator.containsKey(new String(msg.getColFam()))) {
//
// } else {
// sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap());
// sessionCoordinator.get(new String(msg.getKey())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).incrementAndGet());
// }

ctx.writeAndFlush(new DefaultChicagoMessage(
msg.getId(),
Op.RESPONSE,
msg.getColFam(),
null,
Boolean.toString(true).getBytes(),
Longs.toByteArray(offset.get(new String(msg.getColFam())).incrementAndGet()))).addListener(writeComplete);
qCount.get(new String(msg.getColFam())).incrementAndGet();
sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get());
//qCount.get(new String(msg.getColFam())).incrementAndGet();
//sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get());

// } else {
// // Return current offset to member of the replica set
Expand All @@ -361,19 +362,19 @@ private void handleGettingOffset(ChannelHandlerContext ctx, ChicagoMessage msg,

} else {
// Create the offset for the ColFam on first message (ColFam Create)
offset.put(new String(msg.getColFam()), new AtomicLong());
q.put(new String(msg.getColFam()), Ints.fromByteArray(msg.getVal()));
qCount.put(new String(msg.getColFam()), new AtomicInteger());
sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap());
offset.put(new String(msg.getColFam()), new AtomicLong(0));
// q.put(new String(msg.getColFam()), Longs.fromByteArray(msg.getVal()));
// qCount.put(new String(msg.getColFam()), new AtomicInteger(0));
// sessionCoordinator.put(new String(msg.getColFam()), PlatformDependent.newConcurrentHashMap());

ctx.writeAndFlush(new DefaultChicagoMessage(
msg.getId(),
Op.RESPONSE,
msg.getColFam(),
null,
Boolean.toString(true).getBytes(),
Longs.toByteArray(offset.get(new String(msg.getColFam())).get()))).addListener(writeComplete);
qCount.get(new String(msg.getColFam())).incrementAndGet();
sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get());
//qCount.get(new String(msg.getColFam())).incrementAndGet();
//sessionCoordinator.get(new String(msg.getColFam())).put(new String(msg.getKey()), offset.get(new String(msg.getColFam())).get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public ChicagoServer(ChiConfig config) {
this.db = getStorageProvider(config);
this.nodeWatcher = new NodeWatcher(NODE_LIST_PATH, NODE_LOCK_PATH, config.getQuorum());
this.paxosClient = new ChicagoPaxosClient(config.getZkHosts(), config.getReplicaSize());
this.dbRouter = new DBRouter(db, paxosClient);
this.dbRouter = new DBRouter(db, paxosClient, config.conf);
}

private StorageProvider getStorageProvider(ChiConfig config) {
Expand Down
Loading