diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java index f4b259cc41..5041a90b3c 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java @@ -73,6 +73,15 @@ public static synchronized ServerOptions instance() { "master" ); + public static final ConfigOption ENABLE_SERVER_ROLE_ELECTION = + new ConfigOption<>( + "server.role_election", + "Whether to enable role election, if enabled, the server " + + "will elect a master node in the cluster.", + disallowEmpty(), + false + ); + public static final ConfigOption MAX_WORKER_THREADS = new ConfigOption<>( "restserver.max_worker_threads", diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java index daae180a36..80b52e1245 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java @@ -469,7 +469,8 @@ private void serverStarted(HugeConfig config) { NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase()); boolean supportRoleElection = !nodeRole.computer() && - this.supportRoleElection(); + this.supportRoleElection() && + config.get(ServerOptions.ENABLE_SERVER_ROLE_ELECTION); if (supportRoleElection) { // Init any server as Worker role, then do role election nodeRole = NodeRole.WORKER; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index f8e5602048..a638a79407 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -37,6 +37,13 @@ import org.apache.hugegraph.util.Log; import org.slf4j.Logger; +/** + * Central task management system that coordinates task scheduling and execution. + * Manages task schedulers for different graphs and handles role-based execution. + *

+ * Note: The local master-worker mechanism will be deprecated in version 1.7 + * (configuration has been removed from config files). + */ public final class TaskManager { private static final Logger LOG = Log.logger(TaskManager.class); @@ -44,8 +51,7 @@ public final class TaskManager { public static final String TASK_WORKER_PREFIX = "task-worker"; public static final String TASK_WORKER = TASK_WORKER_PREFIX + "-%d"; public static final String TASK_DB_WORKER = "task-db-worker-%d"; - public static final String SERVER_INFO_DB_WORKER = - "server-info-db-worker-%d"; + public static final String SERVER_INFO_DB_WORKER = "server-info-db-worker-%d"; public static final String TASK_SCHEDULER = "task-scheduler-%d"; public static final String OLAP_TASK_WORKER = "olap-task-worker-%d"; @@ -53,7 +59,7 @@ public final class TaskManager { public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d"; public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d"; - protected static final long SCHEDULE_PERIOD = 1000L; // unit ms + static final long SCHEDULE_PERIOD = 1000L; // unit ms private static final long TX_CLOSE_TIMEOUT = 30L; // unit s private static final int THREADS = 4; private static final TaskManager MANAGER = new TaskManager(THREADS); @@ -87,17 +93,13 @@ private TaskManager(int pool) { this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool( 1, SERVER_INFO_DB_WORKER); - this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, - SCHEMA_TASK_WORKER); - this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, - OLAP_TASK_WORKER); - this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, - EPHEMERAL_TASK_WORKER); + this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, SCHEMA_TASK_WORKER); + this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, OLAP_TASK_WORKER); + this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, EPHEMERAL_TASK_WORKER); this.distributedSchedulerExecutor = - ExecutorUtil.newPausableScheduledThreadPool(1, - DISTRIBUTED_TASK_SCHEDULER); + ExecutorUtil.newPausableScheduledThreadPool(1, DISTRIBUTED_TASK_SCHEDULER); - // For schedule task to run, just one thread is ok + // For a schedule task to run, just one thread is ok this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool( 1, TASK_SCHEDULER); // Start after 10x period time waiting for HugeGraphServer startup @@ -111,7 +113,9 @@ public void addScheduler(HugeGraphParams graph) { E.checkArgumentNotNull(graph, "The graph can't be null"); LOG.info("Use {} as the scheduler of graph ({})", graph.schedulerType(), graph.name()); - // TODO: If the current service is bound to a specified non-DEFAULT graph space, the graph outside of the current graph space will no longer create task schedulers (graph space) + // TODO: If the current service is bound to a specified non-DEFAULT graph space, the + // graph outside of the current graph space will no longer create task schedulers (graph + // space) switch (graph.schedulerType()) { case "distributed": { TaskScheduler scheduler = @@ -194,7 +198,7 @@ private void closeTaskTx(HugeGraphParams graph) { private void closeSchedulerTx(HugeGraphParams graph) { final Callable closeTx = () -> { - // Do close-tx for current thread + // Do close-tx for the current thread graph.closeTx(); // Let other threads run Thread.yield(); @@ -209,7 +213,7 @@ private void closeSchedulerTx(HugeGraphParams graph) { private void closeDistributedSchedulerTx(HugeGraphParams graph) { final Callable closeTx = () -> { - // Do close-tx for current thread + // Do close-tx for the current thread graph.closeTx(); // Let other threads run Thread.yield(); @@ -252,8 +256,7 @@ public void shutdown(long timeout) { if (!this.schedulerExecutor.isShutdown()) { this.schedulerExecutor.shutdown(); try { - terminated = this.schedulerExecutor.awaitTermination(timeout, - unit); + terminated = this.schedulerExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -262,8 +265,7 @@ public void shutdown(long timeout) { if (terminated && !this.distributedSchedulerExecutor.isShutdown()) { this.distributedSchedulerExecutor.shutdown(); try { - terminated = this.distributedSchedulerExecutor.awaitTermination(timeout, - unit); + terminated = this.distributedSchedulerExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -272,8 +274,7 @@ public void shutdown(long timeout) { if (terminated && !this.taskExecutor.isShutdown()) { this.taskExecutor.shutdown(); try { - terminated = this.taskExecutor.awaitTermination(timeout, - unit); + terminated = this.taskExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -282,8 +283,7 @@ public void shutdown(long timeout) { if (terminated && !this.serverInfoDbExecutor.isShutdown()) { this.serverInfoDbExecutor.shutdown(); try { - terminated = this.serverInfoDbExecutor.awaitTermination(timeout, - unit); + terminated = this.serverInfoDbExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -292,8 +292,7 @@ public void shutdown(long timeout) { if (terminated && !this.taskDbExecutor.isShutdown()) { this.taskDbExecutor.shutdown(); try { - terminated = this.taskDbExecutor.awaitTermination(timeout, - unit); + terminated = this.taskDbExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -302,8 +301,7 @@ public void shutdown(long timeout) { if (terminated && !this.ephemeralTaskExecutor.isShutdown()) { this.ephemeralTaskExecutor.shutdown(); try { - terminated = this.ephemeralTaskExecutor.awaitTermination(timeout, - unit); + terminated = this.ephemeralTaskExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -312,8 +310,7 @@ public void shutdown(long timeout) { if (terminated && !this.schemaTaskExecutor.isShutdown()) { this.schemaTaskExecutor.shutdown(); try { - terminated = this.schemaTaskExecutor.awaitTermination(timeout, - unit); + terminated = this.schemaTaskExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -322,8 +319,7 @@ public void shutdown(long timeout) { if (terminated && !this.olapTaskExecutor.isShutdown()) { this.olapTaskExecutor.shutdown(); try { - terminated = this.olapTaskExecutor.awaitTermination(timeout, - unit); + terminated = this.olapTaskExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -356,9 +352,12 @@ public void enableRoleElection() { public void onAsRoleMaster() { try { for (TaskScheduler entry : this.schedulers.values()) { - StandardTaskScheduler scheduler = (StandardTaskScheduler) entry; - ServerInfoManager serverInfoManager = scheduler.serverManager(); - serverInfoManager.changeServerRole(NodeRole.MASTER); + ServerInfoManager serverInfoManager = entry.serverManager(); + if (serverInfoManager != null) { + serverInfoManager.changeServerRole(NodeRole.MASTER); + } else { + LOG.warn("ServerInfoManager is null for graph {}", entry.graphName()); + } } } catch (Throwable e) { LOG.error("Exception occurred when change to master role", e); @@ -369,9 +368,12 @@ public void onAsRoleMaster() { public void onAsRoleWorker() { try { for (TaskScheduler entry : this.schedulers.values()) { - StandardTaskScheduler scheduler = (StandardTaskScheduler) entry; - ServerInfoManager serverInfoManager = scheduler.serverManager(); - serverInfoManager.changeServerRole(NodeRole.WORKER); + ServerInfoManager serverInfoManager = entry.serverManager(); + if (serverInfoManager != null) { + serverInfoManager.changeServerRole(NodeRole.WORKER); + } else { + LOG.warn("ServerInfoManager is null for graph {}", entry.graphName()); + } } } catch (Throwable e) { LOG.error("Exception occurred when change to worker role", e); @@ -379,8 +381,8 @@ public void onAsRoleWorker() { } } - protected void notifyNewTask(HugeTask task) { - Queue queue = ((ThreadPoolExecutor) this.schedulerExecutor) + void notifyNewTask(HugeTask task) { + Queue queue = this.schedulerExecutor .getQueue(); if (queue.size() <= 1) { /* @@ -398,10 +400,9 @@ private void scheduleOrExecuteJob() { // Called by scheduler timer try { for (TaskScheduler entry : this.schedulers.values()) { - TaskScheduler scheduler = entry; - // Maybe other thread close&remove scheduler at the same time - synchronized (scheduler) { - this.scheduleOrExecuteJobForGraph(scheduler); + // Maybe other threads close&remove scheduler at the same time + synchronized (entry) { + this.scheduleOrExecuteJobForGraph(entry); } } } catch (Throwable e) {