From 7ce809a35835ad199a0eb28ee2031d4ca7153fd9 Mon Sep 17 00:00:00 2001 From: vaughn Date: Tue, 29 Jul 2025 22:38:06 +0800 Subject: [PATCH 1/3] feat: add switch for role election --- .../java/org/apache/hugegraph/config/ServerOptions.java | 9 +++++++++ .../java/org/apache/hugegraph/core/GraphManager.java | 3 ++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java index f4b259cc41..2e933abf5c 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java @@ -73,6 +73,15 @@ public static synchronized ServerOptions instance() { "master" ); + public static final ConfigOption ENABLE_SERVER_ROLE_ELECTION = + new ConfigOption<>( + "server.role_election", + "Whether to enable role election, if enabled, the server " + + "will elect a master node in the cluster.", + disallowEmpty(), + true + ); + public static final ConfigOption MAX_WORKER_THREADS = new ConfigOption<>( "restserver.max_worker_threads", diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java index daae180a36..80b52e1245 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java @@ -469,7 +469,8 @@ private void serverStarted(HugeConfig config) { NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase()); boolean supportRoleElection = !nodeRole.computer() && - this.supportRoleElection(); + this.supportRoleElection() && + config.get(ServerOptions.ENABLE_SERVER_ROLE_ELECTION); if (supportRoleElection) { // Init any server as Worker role, then do role election nodeRole = NodeRole.WORKER; From df715c723c4be65dd1d8dd14286be0f418b2249a Mon Sep 17 00:00:00 2001 From: vaughn Date: Tue, 29 Jul 2025 23:41:57 +0800 Subject: [PATCH 2/3] feat: classNotCass --- .../apache/hugegraph/task/TaskManager.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index f8e5602048..8ffef080f4 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -356,9 +356,13 @@ public void enableRoleElection() { public void onAsRoleMaster() { try { for (TaskScheduler entry : this.schedulers.values()) { - StandardTaskScheduler scheduler = (StandardTaskScheduler) entry; - ServerInfoManager serverInfoManager = scheduler.serverManager(); - serverInfoManager.changeServerRole(NodeRole.MASTER); + ServerInfoManager serverInfoManager = entry.serverManager(); + if (serverInfoManager != null) { + serverInfoManager.changeServerRole(NodeRole.MASTER); + } else { + LOG.warn("ServerInfoManager is null for graph {}", + entry.graphName()); + } } } catch (Throwable e) { LOG.error("Exception occurred when change to master role", e); @@ -369,9 +373,13 @@ public void onAsRoleMaster() { public void onAsRoleWorker() { try { for (TaskScheduler entry : this.schedulers.values()) { - StandardTaskScheduler scheduler = (StandardTaskScheduler) entry; - ServerInfoManager serverInfoManager = scheduler.serverManager(); - serverInfoManager.changeServerRole(NodeRole.WORKER); + ServerInfoManager serverInfoManager = entry.serverManager(); + if (serverInfoManager != null) { + serverInfoManager.changeServerRole(NodeRole.WORKER); + } else { + LOG.warn("ServerInfoManager is null for graph {}", + entry.graphName()); + } } } catch (Throwable e) { LOG.error("Exception occurred when change to worker role", e); From 89b8625bd6179b548d6aa91c1d07e9b6e89d301c Mon Sep 17 00:00:00 2001 From: imbajin Date: Wed, 30 Jul 2025 16:04:23 +0800 Subject: [PATCH 3/3] Update role election default and refactor TaskManager Changed the default value of the role election option to false in ServerOptions. Refactored TaskManager for improved readability, updated comments, removed unnecessary protected modifiers, and streamlined thread pool initialization and shutdown logic. Added deprecation notice for local master-worker mechanism. --- .../hugegraph/config/ServerOptions.java | 2 +- .../apache/hugegraph/task/TaskManager.java | 75 +++++++++---------- 2 files changed, 35 insertions(+), 42 deletions(-) diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java index 2e933abf5c..5041a90b3c 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java @@ -79,7 +79,7 @@ public static synchronized ServerOptions instance() { "Whether to enable role election, if enabled, the server " + "will elect a master node in the cluster.", disallowEmpty(), - true + false ); public static final ConfigOption MAX_WORKER_THREADS = diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index 8ffef080f4..a638a79407 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -37,6 +37,13 @@ import org.apache.hugegraph.util.Log; import org.slf4j.Logger; +/** + * Central task management system that coordinates task scheduling and execution. + * Manages task schedulers for different graphs and handles role-based execution. + *

+ * Note: The local master-worker mechanism will be deprecated in version 1.7 + * (configuration has been removed from config files). + */ public final class TaskManager { private static final Logger LOG = Log.logger(TaskManager.class); @@ -44,8 +51,7 @@ public final class TaskManager { public static final String TASK_WORKER_PREFIX = "task-worker"; public static final String TASK_WORKER = TASK_WORKER_PREFIX + "-%d"; public static final String TASK_DB_WORKER = "task-db-worker-%d"; - public static final String SERVER_INFO_DB_WORKER = - "server-info-db-worker-%d"; + public static final String SERVER_INFO_DB_WORKER = "server-info-db-worker-%d"; public static final String TASK_SCHEDULER = "task-scheduler-%d"; public static final String OLAP_TASK_WORKER = "olap-task-worker-%d"; @@ -53,7 +59,7 @@ public final class TaskManager { public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d"; public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d"; - protected static final long SCHEDULE_PERIOD = 1000L; // unit ms + static final long SCHEDULE_PERIOD = 1000L; // unit ms private static final long TX_CLOSE_TIMEOUT = 30L; // unit s private static final int THREADS = 4; private static final TaskManager MANAGER = new TaskManager(THREADS); @@ -87,17 +93,13 @@ private TaskManager(int pool) { this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool( 1, SERVER_INFO_DB_WORKER); - this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, - SCHEMA_TASK_WORKER); - this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, - OLAP_TASK_WORKER); - this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, - EPHEMERAL_TASK_WORKER); + this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, SCHEMA_TASK_WORKER); + this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, OLAP_TASK_WORKER); + this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, EPHEMERAL_TASK_WORKER); this.distributedSchedulerExecutor = - ExecutorUtil.newPausableScheduledThreadPool(1, - DISTRIBUTED_TASK_SCHEDULER); + ExecutorUtil.newPausableScheduledThreadPool(1, DISTRIBUTED_TASK_SCHEDULER); - // For schedule task to run, just one thread is ok + // For a schedule task to run, just one thread is ok this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool( 1, TASK_SCHEDULER); // Start after 10x period time waiting for HugeGraphServer startup @@ -111,7 +113,9 @@ public void addScheduler(HugeGraphParams graph) { E.checkArgumentNotNull(graph, "The graph can't be null"); LOG.info("Use {} as the scheduler of graph ({})", graph.schedulerType(), graph.name()); - // TODO: If the current service is bound to a specified non-DEFAULT graph space, the graph outside of the current graph space will no longer create task schedulers (graph space) + // TODO: If the current service is bound to a specified non-DEFAULT graph space, the + // graph outside of the current graph space will no longer create task schedulers (graph + // space) switch (graph.schedulerType()) { case "distributed": { TaskScheduler scheduler = @@ -194,7 +198,7 @@ private void closeTaskTx(HugeGraphParams graph) { private void closeSchedulerTx(HugeGraphParams graph) { final Callable closeTx = () -> { - // Do close-tx for current thread + // Do close-tx for the current thread graph.closeTx(); // Let other threads run Thread.yield(); @@ -209,7 +213,7 @@ private void closeSchedulerTx(HugeGraphParams graph) { private void closeDistributedSchedulerTx(HugeGraphParams graph) { final Callable closeTx = () -> { - // Do close-tx for current thread + // Do close-tx for the current thread graph.closeTx(); // Let other threads run Thread.yield(); @@ -252,8 +256,7 @@ public void shutdown(long timeout) { if (!this.schedulerExecutor.isShutdown()) { this.schedulerExecutor.shutdown(); try { - terminated = this.schedulerExecutor.awaitTermination(timeout, - unit); + terminated = this.schedulerExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -262,8 +265,7 @@ public void shutdown(long timeout) { if (terminated && !this.distributedSchedulerExecutor.isShutdown()) { this.distributedSchedulerExecutor.shutdown(); try { - terminated = this.distributedSchedulerExecutor.awaitTermination(timeout, - unit); + terminated = this.distributedSchedulerExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -272,8 +274,7 @@ public void shutdown(long timeout) { if (terminated && !this.taskExecutor.isShutdown()) { this.taskExecutor.shutdown(); try { - terminated = this.taskExecutor.awaitTermination(timeout, - unit); + terminated = this.taskExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -282,8 +283,7 @@ public void shutdown(long timeout) { if (terminated && !this.serverInfoDbExecutor.isShutdown()) { this.serverInfoDbExecutor.shutdown(); try { - terminated = this.serverInfoDbExecutor.awaitTermination(timeout, - unit); + terminated = this.serverInfoDbExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -292,8 +292,7 @@ public void shutdown(long timeout) { if (terminated && !this.taskDbExecutor.isShutdown()) { this.taskDbExecutor.shutdown(); try { - terminated = this.taskDbExecutor.awaitTermination(timeout, - unit); + terminated = this.taskDbExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -302,8 +301,7 @@ public void shutdown(long timeout) { if (terminated && !this.ephemeralTaskExecutor.isShutdown()) { this.ephemeralTaskExecutor.shutdown(); try { - terminated = this.ephemeralTaskExecutor.awaitTermination(timeout, - unit); + terminated = this.ephemeralTaskExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -312,8 +310,7 @@ public void shutdown(long timeout) { if (terminated && !this.schemaTaskExecutor.isShutdown()) { this.schemaTaskExecutor.shutdown(); try { - terminated = this.schemaTaskExecutor.awaitTermination(timeout, - unit); + terminated = this.schemaTaskExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -322,8 +319,7 @@ public void shutdown(long timeout) { if (terminated && !this.olapTaskExecutor.isShutdown()) { this.olapTaskExecutor.shutdown(); try { - terminated = this.olapTaskExecutor.awaitTermination(timeout, - unit); + terminated = this.olapTaskExecutor.awaitTermination(timeout, unit); } catch (Throwable e) { ex = e; } @@ -360,8 +356,7 @@ public void onAsRoleMaster() { if (serverInfoManager != null) { serverInfoManager.changeServerRole(NodeRole.MASTER); } else { - LOG.warn("ServerInfoManager is null for graph {}", - entry.graphName()); + LOG.warn("ServerInfoManager is null for graph {}", entry.graphName()); } } } catch (Throwable e) { @@ -377,8 +372,7 @@ public void onAsRoleWorker() { if (serverInfoManager != null) { serverInfoManager.changeServerRole(NodeRole.WORKER); } else { - LOG.warn("ServerInfoManager is null for graph {}", - entry.graphName()); + LOG.warn("ServerInfoManager is null for graph {}", entry.graphName()); } } } catch (Throwable e) { @@ -387,8 +381,8 @@ public void onAsRoleWorker() { } } - protected void notifyNewTask(HugeTask task) { - Queue queue = ((ThreadPoolExecutor) this.schedulerExecutor) + void notifyNewTask(HugeTask task) { + Queue queue = this.schedulerExecutor .getQueue(); if (queue.size() <= 1) { /* @@ -406,10 +400,9 @@ private void scheduleOrExecuteJob() { // Called by scheduler timer try { for (TaskScheduler entry : this.schedulers.values()) { - TaskScheduler scheduler = entry; - // Maybe other thread close&remove scheduler at the same time - synchronized (scheduler) { - this.scheduleOrExecuteJobForGraph(scheduler); + // Maybe other threads close&remove scheduler at the same time + synchronized (entry) { + this.scheduleOrExecuteJobForGraph(entry); } } } catch (Throwable e) {