diff --git a/pom.xml b/pom.xml
index d360696..14e7039 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,7 @@
src/main/resources
*.xml
- connect.conf
+ runtime.conf
true
diff --git a/run_worker.sh b/run_worker.sh
index bea760a..e977640 100755
--- a/run_worker.sh
+++ b/run_worker.sh
@@ -1,5 +1,5 @@
#!/bin/bash
export OMS_RMQ_DIRECT_NAME_SRV=true
echo "run rumtime worker"
-cd target/distribution/ && java -cp .:./conf/:./lib/* io.openmessaging.connect.runtime.ConnectStartup -c conf/connect.conf
+cd target/distribution/ && java -cp .:./conf/:./lib/* io.openmessaging.connect.runtime.RuntimeStartup -c conf/runtime.conf
diff --git a/src/main/java/io/openmessaging/connect/runtime/ConnectController.java b/src/main/java/io/openmessaging/connect/runtime/RuntimeController.java
similarity index 95%
rename from src/main/java/io/openmessaging/connect/runtime/ConnectController.java
rename to src/main/java/io/openmessaging/connect/runtime/RuntimeController.java
index 6c81b66..104070b 100644
--- a/src/main/java/io/openmessaging/connect/runtime/ConnectController.java
+++ b/src/main/java/io/openmessaging/connect/runtime/RuntimeController.java
@@ -33,7 +33,7 @@
/**
* Connect controller to access and control all resource in runtime.
*/
-public class ConnectController {
+public class RuntimeController {
private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME);
@@ -87,11 +87,11 @@ public class ConnectController {
*/
private ScheduledExecutorService scheduledExecutorService;
- public ConnectController(ConnectConfig connectConfig) {
+ public RuntimeController(ConnectConfig connectConfig) {
this.connectConfig = connectConfig;
this.messagingAccessWrapper = new MessagingAccessWrapper();
- MessagingAccessPoint messageAccessPoint = messagingAccessWrapper.getMessageAccessPoint(connectConfig.getOmsDriverUrl());
+ MessagingAccessPoint messageAccessPoint = messagingAccessWrapper.getMessageAccessPoint(connectConfig.getRuntimeOmsDriverUrl());
this.clusterManagementService = new ClusterManagementServiceImpl(connectConfig, messageAccessPoint);
this.configManagementService = new ConfigManagementServiceImpl(connectConfig, messageAccessPoint);
this.positionManagementService = new PositionManagementServiceImpl(connectConfig, messageAccessPoint);
@@ -118,7 +118,7 @@ public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
- ConnectController.this.configManagementService.persist();
+ RuntimeController.this.configManagementService.persist();
} catch (Exception e) {
log.error("schedule persist config error.", e);
}
@@ -128,7 +128,7 @@ public void start() {
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
- ConnectController.this.positionManagementService.persist();
+ RuntimeController.this.positionManagementService.persist();
} catch (Exception e) {
log.error("schedule persist position error.", e);
}
diff --git a/src/main/java/io/openmessaging/connect/runtime/ConnectStartup.java b/src/main/java/io/openmessaging/connect/runtime/RuntimeStartup.java
similarity index 95%
rename from src/main/java/io/openmessaging/connect/runtime/ConnectStartup.java
rename to src/main/java/io/openmessaging/connect/runtime/RuntimeStartup.java
index a1390f9..7b2bbd4 100644
--- a/src/main/java/io/openmessaging/connect/runtime/ConnectStartup.java
+++ b/src/main/java/io/openmessaging/connect/runtime/RuntimeStartup.java
@@ -36,7 +36,7 @@
/**
* Startup class of the runtime worker.
*/
-public class ConnectStartup {
+public class RuntimeStartup {
private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME);
@@ -51,7 +51,7 @@ public static void main(String[] args) {
start(createConnectController(args));
}
- private static void start(ConnectController controller) {
+ private static void start(RuntimeController controller) {
try {
controller.start();
@@ -70,7 +70,7 @@ private static void start(ConnectController controller) {
* @param args
* @return
*/
- private static ConnectController createConnectController(String[] args) {
+ private static RuntimeController createConnectController(String[] args) {
try {
@@ -99,7 +99,7 @@ private static ConnectController createConnectController(String[] args) {
}
// Create controller and initialize.
- ConnectController controller = new ConnectController(connectConfig);
+ RuntimeController controller = new RuntimeController(connectConfig);
controller.initialize();
// Invoked when shutdown.
diff --git a/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java b/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java
index a758af4..296f03a 100644
--- a/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java
+++ b/src/main/java/io/openmessaging/connect/runtime/config/ConnectConfig.java
@@ -38,7 +38,7 @@ public class ConnectConfig {
* OMS driver url, which determine the specific MQ to send and consume message.
* The MQ is used for internal management of the connect runtime.
*/
- private String omsDriverUrl = "oms:rocketmq://localhost:9876/default:default";
+ private String runtimeOmsDriverUrl = "oms:rocketmq://localhost:9876/default:default";
/**
* Http port for REST API.
@@ -55,14 +55,6 @@ public class ConnectConfig {
*/
private int configPersistInterval = 20 * 1000;
- public String getOmsDriverUrl() {
- return omsDriverUrl;
- }
-
- public void setOmsDriverUrl(String omsDriverUrl) {
- this.omsDriverUrl = omsDriverUrl;
- }
-
public String getWorkerId() {
return workerId;
}
@@ -102,4 +94,12 @@ public int getConfigPersistInterval() {
public void setConfigPersistInterval(int configPersistInterval) {
this.configPersistInterval = configPersistInterval;
}
+
+ public String getRuntimeOmsDriverUrl() {
+ return runtimeOmsDriverUrl;
+ }
+
+ public void setRuntimeOmsDriverUrl(String runtimeOmsDriverUrl) {
+ this.runtimeOmsDriverUrl = runtimeOmsDriverUrl;
+ }
}
diff --git a/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java b/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java
index a5cdfda..077e4e6 100644
--- a/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java
+++ b/src/main/java/io/openmessaging/connect/runtime/config/RuntimeConfigDefine.java
@@ -32,11 +32,11 @@ public class RuntimeConfigDefine {
public static final String CONNECTOR_CLASS = "connector-class";
public static final String TASK_CLASS = "task-class";
-
+
/**
- * OMS driver url for the connector.
+ * OMS driver url for the runtime connector
*/
- public static final String OMS_DRIVER_URL = "oms-driver-url";
+ public static final String RUNTIME_OMS_DRIVER_URL = "runtime-oms-driver-url";
/**
* Last updated time of the configuration.
@@ -59,8 +59,8 @@ public class RuntimeConfigDefine {
public static final Set REQUEST_CONFIG = new HashSet(){
{
add(CONNECTOR_CLASS);
- add(OMS_DRIVER_URL);
add(SOURCE_RECORD_CONVERTER);
+ add(RUNTIME_OMS_DRIVER_URL);
}
};
diff --git a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java
index 0f2d8f4..9f35b2c 100644
--- a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java
+++ b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/Worker.java
@@ -209,7 +209,7 @@ public synchronized void startTasks(Map> taskConfi
if(task instanceof SourceTask){
Producer producer = messagingAccessWrapper
- .getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.OMS_DRIVER_URL)).createProducer();
+ .getMessageAccessPoint(keyValue.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL)).createProducer();
producer.startup();
WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName,
(SourceTask) task, keyValue,
diff --git a/src/main/java/io/openmessaging/connect/runtime/converter/ConnAndTaskConfigConverter.java b/src/main/java/io/openmessaging/connect/runtime/converter/ConnAndTaskConfigConverter.java
index ff5c187..c2376fc 100644
--- a/src/main/java/io/openmessaging/connect/runtime/converter/ConnAndTaskConfigConverter.java
+++ b/src/main/java/io/openmessaging/connect/runtime/converter/ConnAndTaskConfigConverter.java
@@ -19,7 +19,7 @@
import io.openmessaging.connect.runtime.common.ConnAndTaskConfigs;
import io.openmessaging.connect.runtime.common.LoggerName;
-import io.openmessaging.connect.runtime.utils.TransferUtils;
+import io.openmessaging.connect.runtime.utils.TransferUtil;
import io.openmessaging.connector.api.data.Converter;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
@@ -41,12 +41,12 @@ public byte[] objectToByte(ConnAndTaskConfigs object) {
Map connectorMap = new HashMap<>();
Map taskMap = new HashMap<>();
for(String key : configs.getConnectorConfigs().keySet()){
- connectorMap.put(key, TransferUtils.keyValueToString(configs.getConnectorConfigs().get(key)));
+ connectorMap.put(key, TransferUtil.keyValueToString(configs.getConnectorConfigs().get(key)));
}
for(String key : configs.getTaskConfigs().keySet()){
- taskMap.put(key, TransferUtils.keyValueListToString(configs.getTaskConfigs().get(key)));
+ taskMap.put(key, TransferUtil.keyValueListToString(configs.getTaskConfigs().get(key)));
}
- return TransferUtils.toJsonString(connectorMap, taskMap).getBytes("UTF-8");
+ return TransferUtil.toJsonString(connectorMap, taskMap).getBytes("UTF-8");
} catch (Exception e) {
log.error("ConnAndTaskConfigConverter#objectToByte failed", e);
}
@@ -58,7 +58,7 @@ public ConnAndTaskConfigs byteToObject(byte[] bytes) {
try {
String jsonString = new String(bytes, "UTF-8");
- ConnAndTaskConfigs configs = TransferUtils.toConnAndTaskConfigs(jsonString);
+ ConnAndTaskConfigs configs = TransferUtil.toConnAndTaskConfigs(jsonString);
return configs;
} catch (UnsupportedEncodingException e) {
log.error("ConnAndTaskConfigConverter#byteToObject failed", e);
diff --git a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java
index 199d454..5c9fd17 100644
--- a/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java
+++ b/src/main/java/io/openmessaging/connect/runtime/rest/RestHandler.java
@@ -20,7 +20,7 @@
import com.alibaba.fastjson.JSON;
import io.javalin.Context;
import io.javalin.Javalin;
-import io.openmessaging.connect.runtime.ConnectController;
+import io.openmessaging.connect.runtime.RuntimeController;
import io.openmessaging.connect.runtime.common.ConnectKeyValue;
import io.openmessaging.connect.runtime.common.LoggerName;
import io.openmessaging.connect.runtime.connectorwrapper.WorkerConnector;
@@ -38,15 +38,15 @@ public class RestHandler {
private static final Logger log = LoggerFactory.getLogger(LoggerName.OMS_RUNTIME);
- private final ConnectController connectController;
+ private final RuntimeController runtimeController;
- public RestHandler(ConnectController connectController){
- this.connectController = connectController;
- Javalin app = Javalin.start(connectController.getConnectConfig().getHttpPort());
- app.get("/connectors/:connectorName", this::handleCreateConnector);
+ public RestHandler(RuntimeController runtimeController){
+ this.runtimeController = runtimeController;
+ Javalin app = Javalin.start(runtimeController.getConnectConfig().getHttpPort());
+ app.post("/connectors/:connectorName", this::handleCreateConnector);
app.get("/connectors/:connectorName/config", this::handleQueryConnectorConfig);
app.get("/connectors/:connectorName/status", this::handleQueryConnectorStatus);
- app.get("/connectors/:connectorName/stop", this::handleStopConnector);
+ app.delete("/connectors/:connectorName/stop", this::handleStopConnector);
app.get("/getClusterInfo", this::getClusterInfo);
app.get("/getConfigInfo", this::getConfigInfo);
app.get("/getAllocatedInfo", this::getAllocatedInfo);
@@ -54,8 +54,8 @@ public RestHandler(ConnectController connectController){
private void getAllocatedInfo(Context context){
- Set workerConnectors = connectController.getWorker().getWorkingConnectors();
- Set workerSourceTasks = connectController.getWorker().getWorkingTasks();
+ Set workerConnectors = runtimeController.getWorker().getWorkingConnectors();
+ Set workerSourceTasks = runtimeController.getWorker().getWorkingTasks();
StringBuilder sb = new StringBuilder();
sb.append("working connectors:\n");
for(WorkerConnector workerConnector : workerConnectors){
@@ -70,13 +70,13 @@ private void getAllocatedInfo(Context context){
private void getConfigInfo(Context context) {
- Map connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs();
- Map> taskConfigs = connectController.getConfigManagementService().getTaskConfigs();
+ Map connectorConfigs = runtimeController.getConfigManagementService().getConnectorConfigs();
+ Map> taskConfigs = runtimeController.getConfigManagementService().getTaskConfigs();
context.result("ConnectorConfigs:"+JSON.toJSONString(connectorConfigs)+"\nTaskConfigs:"+JSON.toJSONString(taskConfigs));
}
private void getClusterInfo(Context context) {
- context.result(JSON.toJSONString(connectController.getClusterManagementService().getAllAliveWorkers()));
+ context.result(JSON.toJSONString(runtimeController.getClusterManagementService().getAllAliveWorkers()));
}
private void handleCreateConnector(Context context) {
@@ -89,13 +89,14 @@ private void handleCreateConnector(Context context) {
}
try {
- String result = connectController.getConfigManagementService().putConnectorConfig(connectorName, configs);
+ String result = runtimeController.getConfigManagementService().putConnectorConfig(connectorName, configs);
if(result != null && result.length() > 0){
context.result(result);
}else{
context.result("success");
}
} catch (Exception e) {
+ log.error("oms connect runtime create the connector exception, ", e);
context.result("failed");
}
}
@@ -104,8 +105,8 @@ private void handleQueryConnectorConfig(Context context){
String connectorName = context.param("connectorName");
- Map connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs();
- Map> taskConfigs = connectController.getConfigManagementService().getTaskConfigs();
+ Map connectorConfigs = runtimeController.getConfigManagementService().getConnectorConfigs();
+ Map> taskConfigs = runtimeController.getConfigManagementService().getTaskConfigs();
StringBuilder sb = new StringBuilder();
sb.append("ConnectorConfigs:")
.append(JSON.toJSONString(connectorConfigs.get(connectorName)))
@@ -118,7 +119,7 @@ private void handleQueryConnectorConfig(Context context){
private void handleQueryConnectorStatus(Context context){
String connectorName = context.param("connectorName");
- Map connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs();
+ Map connectorConfigs = runtimeController.getConfigManagementService().getConnectorConfigs();
if(connectorConfigs.containsKey(connectorName)){
context.result("running");
@@ -131,9 +132,10 @@ private void handleStopConnector(Context context){
String connectorName = context.param("connectorName");
try {
- connectController.getConfigManagementService().removeConnectorConfig(connectorName);
+ runtimeController.getConfigManagementService().removeConnectorConfig(connectorName);
context.result("success");
} catch (Exception e) {
+ log.error("oms connect runtime stop the connector exception, ", e);
context.result("failed");
}
}
diff --git a/src/main/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImpl.java b/src/main/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImpl.java
index a6daae6..f03864b 100644
--- a/src/main/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/src/main/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -159,7 +159,7 @@ public String putConnectorConfig(String connectorName, ConnectKeyValue configs)
newKeyValue.put(key, keyValue.getString(key));
}
newKeyValue.put(RuntimeConfigDefine.TASK_CLASS, connector.taskClass().getName());
- newKeyValue.put(RuntimeConfigDefine.OMS_DRIVER_URL, configs.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
+ newKeyValue.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, configs.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL));
newKeyValue.put(RuntimeConfigDefine.UPDATE_TIMESATMP, currentTimestamp);
converterdConfigs.add(newKeyValue);
}
diff --git a/src/main/java/io/openmessaging/connect/runtime/service/RebalanceService.java b/src/main/java/io/openmessaging/connect/runtime/service/RebalanceService.java
index 0ee8fbc..007b962 100644
--- a/src/main/java/io/openmessaging/connect/runtime/service/RebalanceService.java
+++ b/src/main/java/io/openmessaging/connect/runtime/service/RebalanceService.java
@@ -47,7 +47,7 @@ public RebalanceService(RebalanceImpl rebalanceImpl, ConfigManagementService con
this.rebalanceImpl = rebalanceImpl;
this.configManagementService = configManagementService;
this.clusterManagementService = clusterManagementService;
- this.configManagementService.registerListener(new ConnectorConnectorConfigChangeListenerImpl());
+ this.configManagementService.registerListener(new ConnectorConfigChangeListenerImpl());
this.clusterManagementService.registerListener(new WorkerStatusListenerImpl());
}
@@ -79,7 +79,7 @@ public void onWorkerChange() {
}
}
- class ConnectorConnectorConfigChangeListenerImpl implements ConfigManagementService.ConnectorConfigUpdateListener {
+ class ConnectorConfigChangeListenerImpl implements ConfigManagementService.ConnectorConfigUpdateListener {
/**
* When config change.
diff --git a/src/main/java/io/openmessaging/connect/runtime/utils/TransferUtils.java b/src/main/java/io/openmessaging/connect/runtime/utils/TransferUtil.java
similarity index 99%
rename from src/main/java/io/openmessaging/connect/runtime/utils/TransferUtils.java
rename to src/main/java/io/openmessaging/connect/runtime/utils/TransferUtil.java
index 4b159e2..6914f7d 100644
--- a/src/main/java/io/openmessaging/connect/runtime/utils/TransferUtils.java
+++ b/src/main/java/io/openmessaging/connect/runtime/utils/TransferUtil.java
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.Map;
-public class TransferUtils {
+public class TransferUtil {
public static String keyValueToString(ConnectKeyValue keyValue) {
diff --git a/src/main/resources/connect.conf b/src/main/resources/runtime.conf
similarity index 91%
rename from src/main/resources/connect.conf
rename to src/main/resources/runtime.conf
index 868be08..8fda7aa 100644
--- a/src/main/resources/connect.conf
+++ b/src/main/resources/runtime.conf
@@ -16,11 +16,11 @@
## Worker id, should be unique
workerId=DEFAULT_WORKER_1
-## Choose a MQ to support runtime data synchronize
-omsDriverUrl=oms:rocketmq://localhost:9876/default:default
-
## Http prot for user to access REST API
httpPort=8081
## local file dir for config store
-storePathRootDir=./storeRoot/
\ No newline at end of file
+storePathRootDir=./storeRoot/
+
+## Choose a MQ to support runtime data synchronize
+runtimeOmsDriverUrl=oms:rocketmq://localhost:9876/default:default
diff --git a/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java b/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java
index 6bcead1..eb5c6a0 100644
--- a/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java
+++ b/src/test/java/io/openmessaging/connect/runtime/config/ConnectConfigTest.java
@@ -27,10 +27,11 @@ public class ConnectConfigTest {
public void testConnectConfigAttribute() {
ConnectConfig connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
- connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
connectConfig.setWorkerId("DEFAULT_WORKER_1");
+ connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default3:default3");
assertThat(connectConfig.getHttpPort()).isEqualTo(8081);
- assertThat(connectConfig.getOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default:default");
assertThat(connectConfig.getWorkerId()).isEqualTo("DEFAULT_WORKER_1");
+ assertThat(connectConfig.getRuntimeOmsDriverUrl()).isEqualTo("oms:rocketmq://localhost:9876/default3:default3");
+
}
}
\ No newline at end of file
diff --git a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java
index 2e15fac..ecc47b5 100644
--- a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java
+++ b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerTest.java
@@ -65,7 +65,7 @@ public class WorkerTest {
public void init() {
connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
- connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
+ connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
connectConfig.setWorkerId("DEFAULT_WORKER_1");
connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
messagingAccessWrapper = new MessagingAccessWrapper();
@@ -139,7 +139,7 @@ public void testStartTasks() {
connectKeyValue.getProperties().put("key2", "TEST-CONN-" + i + "2");
connectKeyValue.getProperties().put(RuntimeConfigDefine.TASK_CLASS, TestSourceTask.class.getName());
connectKeyValue.getProperties().put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, TestConverter.class.getName());
- connectKeyValue.getProperties().put(RuntimeConfigDefine.OMS_DRIVER_URL, this.connectConfig.getOmsDriverUrl());
+ connectKeyValue.getProperties().put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, this.connectConfig.getRuntimeOmsDriverUrl());
connectKeyValues.add(connectKeyValue);
taskConfigs.put("TEST-CONN-" + i, connectKeyValues);
}
diff --git a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java
index 3b9e6b1..61c6d4b 100644
--- a/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java
+++ b/src/test/java/io/openmessaging/connect/runtime/rest/RestHandlerTest.java
@@ -19,7 +19,7 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
-import io.openmessaging.connect.runtime.ConnectController;
+import io.openmessaging.connect.runtime.RuntimeController;
import io.openmessaging.connect.runtime.common.ConnectKeyValue;
import io.openmessaging.connect.runtime.config.ConnectConfig;
import io.openmessaging.connect.runtime.config.RuntimeConfigDefine;
@@ -44,7 +44,9 @@
import java.util.Set;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
@@ -59,13 +61,11 @@
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.when;
-//import org.apache.rocketmq.mysql.MysqlConstants;
-
@RunWith(MockitoJUnitRunner.class)
public class RestHandlerTest {
@Mock
- private ConnectController connectController;
+ private RuntimeController runtimeController;
@Mock
private ConfigManagementService configManagementService;
@@ -131,20 +131,20 @@ public class RestHandlerTest {
@Before
public void init() throws Exception {
- when(connectController.getConnectConfig()).thenReturn(connectConfig);
+ when(runtimeController.getConnectConfig()).thenReturn(connectConfig);
when(connectConfig.getHttpPort()).thenReturn(8081);
- when(connectController.getConfigManagementService()).thenReturn(configManagementService);
+ when(runtimeController.getConfigManagementService()).thenReturn(configManagementService);
when(configManagementService.putConnectorConfig(anyString(), any(ConnectKeyValue.class))).thenReturn("");
String connectName = "testConnector";
ConnectKeyValue connectKeyValue = new ConnectKeyValue();
connectKeyValue.put(RuntimeConfigDefine.CONNECTOR_CLASS, "io.openmessaging.connect.runtime.service.TestConnector");
- connectKeyValue.put(RuntimeConfigDefine.OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default");
+ connectKeyValue.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default");
connectKeyValue.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, "source-record-converter");
ConnectKeyValue connectKeyValue1 = new ConnectKeyValue();
connectKeyValue1.put(RuntimeConfigDefine.CONNECTOR_CLASS, "io.openmessaging.connect.runtime.service.TestConnector");
- connectKeyValue1.put(RuntimeConfigDefine.OMS_DRIVER_URL, "oms:kafka://localhost:1234/default:default");
+ connectKeyValue1.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, "oms:kafka://localhost:1234/default:default");
connectKeyValue1.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, "source-record-converter1");
List connectKeyValues = new ArrayList(8) {
@@ -172,7 +172,7 @@ public void init() throws Exception {
}
};
- when(connectController.getClusterManagementService()).thenReturn(clusterManagementService);
+ when(runtimeController.getClusterManagementService()).thenReturn(clusterManagementService);
when(clusterManagementService.getAllAliveWorkers()).thenReturn(aliveWorker);
sourcePartition = "127.0.0.13306".getBytes("UTF-8");
@@ -202,51 +202,51 @@ public void init() throws Exception {
add(workerSourceTask2);
}
};
- when(connectController.getWorker()).thenReturn(worker);
+ when(runtimeController.getWorker()).thenReturn(worker);
when(worker.getWorkingConnectors()).thenReturn(workerConnectors);
when(worker.getWorkingTasks()).thenReturn(workerSourceTasks);
- restHandler = new RestHandler(connectController);
+ restHandler = new RestHandler(runtimeController);
httpClient = HttpClientBuilder.create().build();
}
@Test
public void testRESTful() throws Exception {
- URIBuilder uriBuilder = new URIBuilder(String.format(CREATE_CONNECTOR_URL, "testConnectorName"));
- uriBuilder.setParameter("config", "{\"connector-class\": \"org.apache.rocketmq.mysql.connector.MysqlConnector\",\"mysqlAddr\": \"112.74.179.68\",\"mysqlPort\": \"3306\",\"mysqlUsername\": \"canal\",\"mysqlPassword\": \"canal\",\"source-record-converter\":\"io.openmessaging.connect.runtime.converter.JsonConverter\",\"oms-driver-url\":\"oms:rocketmq://localhost:9876/default:default\"}");
- URI uri = uriBuilder.build();
- HttpGet httpGet = new HttpGet(uri);
- HttpResponse httpResponse = httpClient.execute(httpGet);
- assertEquals(200, httpResponse.getStatusLine().getStatusCode());
- assertEquals("success", EntityUtils.toString(httpResponse.getEntity(), "UTF-8"));
-
- URIBuilder uriBuilder1 = new URIBuilder(String.format(STOP_CONNECTOR_URL, "testConnectorName"));
- URI uri1 = uriBuilder1.build();
- HttpGet httpGet1 = new HttpGet(uri1);
- HttpResponse httpResponse1 = httpClient.execute(httpGet1);
- assertEquals(200, httpResponse1.getStatusLine().getStatusCode());
- assertEquals("success", EntityUtils.toString(httpResponse1.getEntity(), "UTF-8"));
-
- URIBuilder uriBuilder2 = new URIBuilder(GET_CLUSTER_INFO_URL);
- URI uri2 = uriBuilder2.build();
- HttpGet httpGet2 = new HttpGet(uri2);
- HttpResponse httpResponse2 = httpClient.execute(httpGet2);
- assertEquals(200, httpResponse2.getStatusLine().getStatusCode());
- assertEquals(JSON.toJSONString(aliveWorker), EntityUtils.toString(httpResponse2.getEntity(), "UTF-8"));
-
- URIBuilder uriBuilder3 = new URIBuilder(GET_CONFIG_INFO_URL);
- URI uri3 = uriBuilder3.build();
- HttpGet httpGet3 = new HttpGet(uri3);
- HttpResponse httpResponse3 = httpClient.execute(httpGet3);
- assertEquals(200, httpResponse3.getStatusLine().getStatusCode());
+ URIBuilder uriCreateBuilder = new URIBuilder(String.format(CREATE_CONNECTOR_URL, "testConnectorName"));
+ uriCreateBuilder.setParameter("config", "{\"connector-class\": \"org.apache.rocketmq.mysql.connector.MysqlConnector\",\"mysqlAddr\": \"112.74.179.68\",\"mysqlPort\": \"3306\",\"mysqlUsername\": \"canal\",\"mysqlPassword\": \"canal\",\"source-record-converter\":\"io.openmessaging.connect.runtime.converter.JsonConverter\",\"oms-driver-url\":\"oms:rocketmq://localhost:9876/default:default\"}");
+ URI uriPost = uriCreateBuilder.build();
+ HttpPost httpPost = new HttpPost(uriPost);
+ HttpResponse httpPostResponse = httpClient.execute(httpPost);
+ assertEquals(200, httpPostResponse.getStatusLine().getStatusCode());
+ assertEquals("success", EntityUtils.toString(httpPostResponse.getEntity(), "UTF-8"));
+
+ URIBuilder uriDeleteBuilder = new URIBuilder(String.format(STOP_CONNECTOR_URL, "testConnectorName"));
+ URI uriDelete = uriDeleteBuilder.build();
+ HttpDelete httpDelete = new HttpDelete(uriDelete);
+ HttpResponse httpDeleteResponse = httpClient.execute(httpDelete);
+ assertEquals(200, httpDeleteResponse.getStatusLine().getStatusCode());
+ assertEquals("success", EntityUtils.toString(httpDeleteResponse.getEntity(), "UTF-8"));
+
+ URIBuilder uriClusterInfoBuilder = new URIBuilder(GET_CLUSTER_INFO_URL);
+ URI uriClusterInfo = uriClusterInfoBuilder.build();
+ HttpGet httpClusterInfo = new HttpGet(uriClusterInfo);
+ HttpResponse httpClusterInfoResponse = httpClient.execute(httpClusterInfo);
+ assertEquals(200, httpClusterInfoResponse.getStatusLine().getStatusCode());
+ assertEquals(JSON.toJSONString(aliveWorker), EntityUtils.toString(httpClusterInfoResponse.getEntity(), "UTF-8"));
+
+ URIBuilder uriConfigInfoBuilder = new URIBuilder(GET_CONFIG_INFO_URL);
+ URI uriConfigInfo = uriConfigInfoBuilder.build();
+ HttpGet httpConfigInfo = new HttpGet(uriConfigInfo);
+ HttpResponse httpConfigInfoResponse = httpClient.execute(httpConfigInfo);
+ assertEquals(200, httpConfigInfoResponse.getStatusLine().getStatusCode());
String expectedResultConfig = "ConnectorConfigs:" + JSON.toJSONString(connectorConfigs) + "\nTaskConfigs:" + JSON.toJSONString(taskConfigs);
- assertEquals(expectedResultConfig, EntityUtils.toString(httpResponse3.getEntity(), "UTF-8"));
+ assertEquals(expectedResultConfig, EntityUtils.toString(httpConfigInfoResponse.getEntity(), "UTF-8"));
- URIBuilder uriBuilder4 = new URIBuilder(GET_ALLOCATED_INFO_URL);
- URI uri4 = uriBuilder4.build();
- HttpGet httpGet4 = new HttpGet(uri4);
- HttpResponse httpResponse4 = httpClient.execute(httpGet4);
+ URIBuilder uriAllocatedInfoBuilder = new URIBuilder(GET_ALLOCATED_INFO_URL);
+ URI uriAllocatedInfo = uriAllocatedInfoBuilder.build();
+ HttpGet httpAllocatedInfo = new HttpGet(uriAllocatedInfo);
+ HttpResponse httpResponse4 = httpClient.execute(httpAllocatedInfo);
assertEquals(200, httpResponse4.getStatusLine().getStatusCode());
StringBuilder sb = new StringBuilder();
sb.append("working connectors:\n");
diff --git a/src/test/java/io/openmessaging/connect/runtime/service/ClusterManagementServiceImplTest.java b/src/test/java/io/openmessaging/connect/runtime/service/ClusterManagementServiceImplTest.java
index e0bf050..d9f7b11 100644
--- a/src/test/java/io/openmessaging/connect/runtime/service/ClusterManagementServiceImplTest.java
+++ b/src/test/java/io/openmessaging/connect/runtime/service/ClusterManagementServiceImplTest.java
@@ -67,7 +67,7 @@ public class ClusterManagementServiceImplTest {
public void init() {
connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
- connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
+ connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
connectConfig.setWorkerId("testWorkerId");
doReturn(producer).when(messagingAccessPoint).createProducer();
diff --git a/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java b/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java
index 6c035b3..82e6117 100644
--- a/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java
+++ b/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java
@@ -88,14 +88,14 @@ public class ConfigManagementServiceImplTest {
public void init() throws Exception {
connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
- connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
+ connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
connectConfig.setWorkerId("testWorkerId");
connectorName = "testConnectorName";
connectKeyValue = new ConnectKeyValue();
connectKeyValue.put(RuntimeConfigDefine.CONNECTOR_CLASS, "io.openmessaging.connect.runtime.service.TestConnector");
- connectKeyValue.put(RuntimeConfigDefine.OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default");
+ connectKeyValue.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default");
connectKeyValue.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, "source-record-converter");
diff --git a/src/test/java/io/openmessaging/connect/runtime/service/PositionManagementServiceImplTest.java b/src/test/java/io/openmessaging/connect/runtime/service/PositionManagementServiceImplTest.java
index eb6b737..b437d76 100644
--- a/src/test/java/io/openmessaging/connect/runtime/service/PositionManagementServiceImplTest.java
+++ b/src/test/java/io/openmessaging/connect/runtime/service/PositionManagementServiceImplTest.java
@@ -84,7 +84,7 @@ public class PositionManagementServiceImplTest {
public void init() throws Exception {
connectConfig = new ConnectConfig();
connectConfig.setHttpPort(8081);
- connectConfig.setOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
+ connectConfig.setRuntimeOmsDriverUrl("oms:rocketmq://localhost:9876/default:default");
connectConfig.setStorePathRootDir(System.getProperty("user.home") + File.separator + "testConnectorStore");
connectConfig.setWorkerId("testWorkerId");
doReturn(producer).when(messagingAccessPoint).createProducer();
diff --git a/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java b/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilTest.java
similarity index 83%
rename from src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java
rename to src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilTest.java
index 1839a4e..804eb91 100644
--- a/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilsTest.java
+++ b/src/test/java/io/openmessaging/connect/runtime/utils/TransferUtilTest.java
@@ -26,7 +26,7 @@
import static org.junit.Assert.*;
-public class TransferUtilsTest {
+public class TransferUtilTest {
@Test
public void testKeyValue2StringKeyValue() {
@@ -35,8 +35,8 @@ public void testKeyValue2StringKeyValue() {
connectKeyValue.put("key2", 2L);
connectKeyValue.put("key3", 3.0);
connectKeyValue.put("key4", "4");
- String s = TransferUtils.keyValueToString(connectKeyValue);
- ConnectKeyValue connectKeyValue1 = TransferUtils.stringToKeyValue(s);
+ String s = TransferUtil.keyValueToString(connectKeyValue);
+ ConnectKeyValue connectKeyValue1 = TransferUtil.stringToKeyValue(s);
assertEquals(1, connectKeyValue1.getInt("key1"));
assertEquals(2L, connectKeyValue1.getLong("key2"));
assertTrue(Objects.equals(3.0, connectKeyValue1.getDouble("key3")));
@@ -55,8 +55,8 @@ public void testKeyValueList2String2KeyValueList() {
add(connectKeyValue);
}
};
- String s = TransferUtils.keyValueListToString(connectKeyValues);
- List connectKeyValues1 = TransferUtils.stringToKeyValueList(s);
+ String s = TransferUtil.keyValueListToString(connectKeyValues);
+ List connectKeyValues1 = TransferUtil.stringToKeyValueList(s);
assertNotNull(connectKeyValues1);
ConnectKeyValue connectKeyValue1 = connectKeyValues1.get(0);
assertEquals(1, connectKeyValue1.getInt("key1"));
@@ -70,7 +70,7 @@ public void testToJsonStringToConnAndTaskConfigs() {
String connectName = "testConnector";
ConnectKeyValue connectKeyValue = new ConnectKeyValue();
connectKeyValue.put(RuntimeConfigDefine.CONNECTOR_CLASS, "io.openmessaging.connect.runtime.service.TestConnector");
- connectKeyValue.put(RuntimeConfigDefine.OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default");
+ connectKeyValue.put(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL, "oms:rocketmq://localhost:9876/default:default");
connectKeyValue.put(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER, "source-record-converter");
List connectKeyValues = new ArrayList(8) {
{
@@ -93,13 +93,13 @@ public void testToJsonStringToConnAndTaskConfigs() {
Map connectorMap = new HashMap<>();
Map taskMap = new HashMap<>();
for (String key : connAndTaskConfigs.getConnectorConfigs().keySet()) {
- connectorMap.put(key, TransferUtils.keyValueToString(connAndTaskConfigs.getConnectorConfigs().get(key)));
+ connectorMap.put(key, TransferUtil.keyValueToString(connAndTaskConfigs.getConnectorConfigs().get(key)));
}
for (String key : connAndTaskConfigs.getTaskConfigs().keySet()) {
- taskMap.put(key, TransferUtils.keyValueListToString(connAndTaskConfigs.getTaskConfigs().get(key)));
+ taskMap.put(key, TransferUtil.keyValueListToString(connAndTaskConfigs.getTaskConfigs().get(key)));
}
- String s = TransferUtils.toJsonString(connectorMap, taskMap);
- ConnAndTaskConfigs connAndTaskConfigs1 = TransferUtils.toConnAndTaskConfigs(s);
+ String s = TransferUtil.toJsonString(connectorMap, taskMap);
+ ConnAndTaskConfigs connAndTaskConfigs1 = TransferUtil.toConnAndTaskConfigs(s);
Map connectorConfigs1 = connAndTaskConfigs1.getConnectorConfigs();
assertNotNull(connAndTaskConfigs1);
@@ -108,7 +108,7 @@ public void testToJsonStringToConnAndTaskConfigs() {
assertNotNull(connectKeyValue1);
assertEquals("io.openmessaging.connect.runtime.service.TestConnector", connectKeyValue1.getString(RuntimeConfigDefine.CONNECTOR_CLASS));
- assertEquals("oms:rocketmq://localhost:9876/default:default", connectKeyValue1.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
+ assertEquals("oms:rocketmq://localhost:9876/default:default", connectKeyValue1.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL));
assertEquals("source-record-converter", connectKeyValue1.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER));
Map> taskConfigs1 = connAndTaskConfigs1.getTaskConfigs();
@@ -119,7 +119,7 @@ public void testToJsonStringToConnAndTaskConfigs() {
ConnectKeyValue connectKeyValue2 = connectKeyValues1.get(0);
assertNotNull(connectKeyValue2);
assertEquals("io.openmessaging.connect.runtime.service.TestConnector", connectKeyValue2.getString(RuntimeConfigDefine.CONNECTOR_CLASS));
- assertEquals("oms:rocketmq://localhost:9876/default:default", connectKeyValue2.getString(RuntimeConfigDefine.OMS_DRIVER_URL));
+ assertEquals("oms:rocketmq://localhost:9876/default:default", connectKeyValue2.getString(RuntimeConfigDefine.RUNTIME_OMS_DRIVER_URL));
assertEquals("source-record-converter", connectKeyValue2.getString(RuntimeConfigDefine.SOURCE_RECORD_CONVERTER));
}