Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ public static synchronized ServerOptions instance() {
"master"
);

public static final ConfigOption<Boolean> ENABLE_SERVER_ROLE_ELECTION =
new ConfigOption<>(
"server.role_election",
"Whether to enable role election, if enabled, the server " +
"will elect a master node in the cluster.",
disallowEmpty(),
false
);

public static final ConfigOption<Integer> MAX_WORKER_THREADS =
new ConfigOption<>(
"restserver.max_worker_threads",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,8 @@ private void serverStarted(HugeConfig config) {

NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase());
boolean supportRoleElection = !nodeRole.computer() &&
this.supportRoleElection();
this.supportRoleElection() &&
config.get(ServerOptions.ENABLE_SERVER_ROLE_ELECTION);
if (supportRoleElection) {
// Init any server as Worker role, then do role election
nodeRole = NodeRole.WORKER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,29 @@
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

/**
* Central task management system that coordinates task scheduling and execution.
* Manages task schedulers for different graphs and handles role-based execution.
* <p>
* Note: The local master-worker mechanism will be deprecated in version 1.7
* (configuration has been removed from config files).
*/
public final class TaskManager {

private static final Logger LOG = Log.logger(TaskManager.class);

public static final String TASK_WORKER_PREFIX = "task-worker";
public static final String TASK_WORKER = TASK_WORKER_PREFIX + "-%d";
public static final String TASK_DB_WORKER = "task-db-worker-%d";
public static final String SERVER_INFO_DB_WORKER =
"server-info-db-worker-%d";
public static final String SERVER_INFO_DB_WORKER = "server-info-db-worker-%d";
public static final String TASK_SCHEDULER = "task-scheduler-%d";

public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d";
public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d";
public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d";

protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
static final long SCHEDULE_PERIOD = 1000L; // unit ms
private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
private static final int THREADS = 4;
private static final TaskManager MANAGER = new TaskManager(THREADS);
Expand Down Expand Up @@ -87,17 +93,13 @@ private TaskManager(int pool) {
this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
1, SERVER_INFO_DB_WORKER);

this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
SCHEMA_TASK_WORKER);
this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
OLAP_TASK_WORKER);
this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
EPHEMERAL_TASK_WORKER);
this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, SCHEMA_TASK_WORKER);
this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, OLAP_TASK_WORKER);
this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, EPHEMERAL_TASK_WORKER);
this.distributedSchedulerExecutor =
ExecutorUtil.newPausableScheduledThreadPool(1,
DISTRIBUTED_TASK_SCHEDULER);
ExecutorUtil.newPausableScheduledThreadPool(1, DISTRIBUTED_TASK_SCHEDULER);

// For schedule task to run, just one thread is ok
// For a schedule task to run, just one thread is ok
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
// Start after 10x period time waiting for HugeGraphServer startup
Expand All @@ -111,7 +113,9 @@ public void addScheduler(HugeGraphParams graph) {
E.checkArgumentNotNull(graph, "The graph can't be null");
LOG.info("Use {} as the scheduler of graph ({})",
graph.schedulerType(), graph.name());
// TODO: If the current service is bound to a specified non-DEFAULT graph space, the graph outside of the current graph space will no longer create task schedulers (graph space)
// TODO: If the current service is bound to a specified non-DEFAULT graph space, the
// graph outside of the current graph space will no longer create task schedulers (graph
// space)
switch (graph.schedulerType()) {
case "distributed": {
TaskScheduler scheduler =
Expand Down Expand Up @@ -194,7 +198,7 @@ private void closeTaskTx(HugeGraphParams graph) {

private void closeSchedulerTx(HugeGraphParams graph) {
final Callable<Void> closeTx = () -> {
// Do close-tx for current thread
// Do close-tx for the current thread
graph.closeTx();
// Let other threads run
Thread.yield();
Expand All @@ -209,7 +213,7 @@ private void closeSchedulerTx(HugeGraphParams graph) {

private void closeDistributedSchedulerTx(HugeGraphParams graph) {
final Callable<Void> closeTx = () -> {
// Do close-tx for current thread
// Do close-tx for the current thread
graph.closeTx();
// Let other threads run
Thread.yield();
Expand Down Expand Up @@ -252,8 +256,7 @@ public void shutdown(long timeout) {
if (!this.schedulerExecutor.isShutdown()) {
this.schedulerExecutor.shutdown();
try {
terminated = this.schedulerExecutor.awaitTermination(timeout,
unit);
terminated = this.schedulerExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
Expand All @@ -262,8 +265,7 @@ public void shutdown(long timeout) {
if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
this.distributedSchedulerExecutor.shutdown();
try {
terminated = this.distributedSchedulerExecutor.awaitTermination(timeout,
unit);
terminated = this.distributedSchedulerExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
Expand All @@ -272,8 +274,7 @@ public void shutdown(long timeout) {
if (terminated && !this.taskExecutor.isShutdown()) {
this.taskExecutor.shutdown();
try {
terminated = this.taskExecutor.awaitTermination(timeout,
unit);
terminated = this.taskExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
Expand All @@ -282,8 +283,7 @@ public void shutdown(long timeout) {
if (terminated && !this.serverInfoDbExecutor.isShutdown()) {
this.serverInfoDbExecutor.shutdown();
try {
terminated = this.serverInfoDbExecutor.awaitTermination(timeout,
unit);
terminated = this.serverInfoDbExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
Expand All @@ -292,8 +292,7 @@ public void shutdown(long timeout) {
if (terminated && !this.taskDbExecutor.isShutdown()) {
this.taskDbExecutor.shutdown();
try {
terminated = this.taskDbExecutor.awaitTermination(timeout,
unit);
terminated = this.taskDbExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
Expand All @@ -302,8 +301,7 @@ public void shutdown(long timeout) {
if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
this.ephemeralTaskExecutor.shutdown();
try {
terminated = this.ephemeralTaskExecutor.awaitTermination(timeout,
unit);
terminated = this.ephemeralTaskExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
Expand All @@ -312,8 +310,7 @@ public void shutdown(long timeout) {
if (terminated && !this.schemaTaskExecutor.isShutdown()) {
this.schemaTaskExecutor.shutdown();
try {
terminated = this.schemaTaskExecutor.awaitTermination(timeout,
unit);
terminated = this.schemaTaskExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
Expand All @@ -322,8 +319,7 @@ public void shutdown(long timeout) {
if (terminated && !this.olapTaskExecutor.isShutdown()) {
this.olapTaskExecutor.shutdown();
try {
terminated = this.olapTaskExecutor.awaitTermination(timeout,
unit);
terminated = this.olapTaskExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
Expand Down Expand Up @@ -356,9 +352,12 @@ public void enableRoleElection() {
public void onAsRoleMaster() {
try {
for (TaskScheduler entry : this.schedulers.values()) {
StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
ServerInfoManager serverInfoManager = scheduler.serverManager();
serverInfoManager.changeServerRole(NodeRole.MASTER);
ServerInfoManager serverInfoManager = entry.serverManager();
if (serverInfoManager != null) {
serverInfoManager.changeServerRole(NodeRole.MASTER);
} else {
LOG.warn("ServerInfoManager is null for graph {}", entry.graphName());
}
}
} catch (Throwable e) {
LOG.error("Exception occurred when change to master role", e);
Expand All @@ -369,18 +368,21 @@ public void onAsRoleMaster() {
public void onAsRoleWorker() {
try {
for (TaskScheduler entry : this.schedulers.values()) {
StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
ServerInfoManager serverInfoManager = scheduler.serverManager();
serverInfoManager.changeServerRole(NodeRole.WORKER);
ServerInfoManager serverInfoManager = entry.serverManager();
if (serverInfoManager != null) {
serverInfoManager.changeServerRole(NodeRole.WORKER);
} else {
LOG.warn("ServerInfoManager is null for graph {}", entry.graphName());
}
}
} catch (Throwable e) {
LOG.error("Exception occurred when change to worker role", e);
throw e;
}
}

protected void notifyNewTask(HugeTask<?> task) {
Queue<Runnable> queue = ((ThreadPoolExecutor) this.schedulerExecutor)
void notifyNewTask(HugeTask<?> task) {
Queue<Runnable> queue = this.schedulerExecutor
.getQueue();
if (queue.size() <= 1) {
/*
Expand All @@ -398,10 +400,9 @@ private void scheduleOrExecuteJob() {
// Called by scheduler timer
try {
for (TaskScheduler entry : this.schedulers.values()) {
TaskScheduler scheduler = entry;
// Maybe other thread close&remove scheduler at the same time
synchronized (scheduler) {
this.scheduleOrExecuteJobForGraph(scheduler);
// Maybe other threads close&remove scheduler at the same time
synchronized (entry) {
this.scheduleOrExecuteJobForGraph(entry);
}
}
} catch (Throwable e) {
Expand Down
Loading