Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server): optimize the server-node info #2671

Merged
merged 3 commits into from
Oct 10, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
tiny improve
imbajin committed Oct 9, 2024
commit e8c74013dee97c297f6a04277cacee8f2ea47da9
Original file line number Diff line number Diff line change
@@ -316,8 +316,7 @@ private void initRoleStateMachine(Id serverId) {
conf.get(
RoleElectionOptions.BASE_TIMEOUT_MILLISECOND));
ClusterRoleStore roleStore = new StandardClusterRoleStore(this.params);
this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig,
roleStore);
this.roleElectionStateMachine = new StandardRoleElectionStateMachine(roleConfig, roleStore);
}

@Override
@@ -1007,7 +1006,7 @@ public void create(String configPath, GlobalMasterInfo nodeInfo) {
this.initBackend();
this.serverStarted(nodeInfo);

// Write config to disk file
// Write config to the disk file
String confPath = ConfigUtil.writeToFile(configPath, this.name(),
this.configuration());
this.configuration.file(confPath);
@@ -1349,7 +1348,7 @@ public String schedulerType() {

private class TinkerPopTransaction extends AbstractThreadLocalTransaction {

// Times opened from upper layer
// Times opened from the upper layer
private final AtomicInteger refs;
// Flag opened of each thread
private final ThreadLocal<Boolean> opened;
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -47,14 +46,13 @@
public class HugeServerInfo {

// Unit millisecond
private static final long EXPIRED_INTERVAL =
TaskManager.SCHEDULE_PERIOD * 10;
private static final long EXPIRED_INTERVAL = TaskManager.SCHEDULE_PERIOD * 10;

private Id id;
private NodeRole role;
private Date updateTime;
private int maxLoad;
private int load;
private Date updateTime;
private final Id id;

private transient boolean updated = false;

@@ -115,7 +113,7 @@
}

public long expireTime() {
return this.updateTime.getTime() + EXPIRED_INTERVAL;

Check warning on line 116 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java#L116

Added line #L116 was not covered by tests
}

public Date updateTime() {
@@ -204,8 +202,7 @@

public static HugeServerInfo fromVertex(Vertex vertex) {
HugeServerInfo serverInfo = new HugeServerInfo((Id) vertex.id());
for (Iterator<VertexProperty<Object>> iter = vertex.properties();
iter.hasNext(); ) {
for (var iter = vertex.properties(); iter.hasNext(); ) {
VertexProperty<Object> prop = iter.next();
serverInfo.property(prop.key(), prop.value());
}
@@ -250,7 +247,7 @@

public static final String SERVER = P.SERVER;

protected final HugeGraphParams graph;
private final HugeGraphParams graph;

public Schema(HugeGraphParams graph) {
this.graph = graph;
@@ -268,16 +265,14 @@
VertexLabel label = graph.schema().vertexLabel(SERVER)
.properties(properties)
.useCustomizeStringId()
.nullableKeys(P.ROLE, P.MAX_LOAD,
P.LOAD, P.UPDATE_TIME)
.nullableKeys(P.ROLE, P.MAX_LOAD, P.LOAD, P.UPDATE_TIME)
.enableLabelIndex(true)
.build();
this.graph.schemaTransaction().addVertexLabel(label);
}

private String[] initProperties() {
List<String> props = new ArrayList<>();

props.add(createPropertyKey(P.ROLE, DataType.BYTE));
props.add(createPropertyKey(P.MAX_LOAD, DataType.INT));
props.add(createPropertyKey(P.LOAD, DataType.INT));
@@ -287,8 +282,7 @@
}

public boolean existVertexLabel(String label) {
return this.graph.schemaTransaction()
.getVertexLabel(label) != null;
return this.graph.schemaTransaction().getVertexLabel(label) != null;
}

@SuppressWarnings("unused")
@@ -300,8 +294,7 @@
return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
}

private String createPropertyKey(String name, DataType dataType,
Cardinality cardinality) {
private String createPropertyKey(String name, DataType dataType, Cardinality cardinality) {
SchemaManager schema = this.graph.graph().schema();
PropertyKey propertyKey = schema.propertyKey(name)
.dataType(dataType)
Original file line number Diff line number Diff line change
@@ -67,8 +67,7 @@
private volatile boolean onlySingleNode;
private volatile boolean closed;

public ServerInfoManager(HugeGraphParams graph,
ExecutorService dbExecutor) {
public ServerInfoManager(HugeGraphParams graph, ExecutorService dbExecutor) {
E.checkNotNull(graph, "graph");
E.checkNotNull(dbExecutor, "db executor");

@@ -108,21 +107,19 @@
Id serverId = nodeInfo.nodeId();
HugeServerInfo existed = this.serverInfo(serverId);
if (existed != null && existed.alive()) {
final long now = DateUtil.now().getTime();

Check warning on line 110 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L110

Added line #L110 was not covered by tests
if (existed.expireTime() > now + 30 * 1000) {
LOG.info("The node time maybe skew very much: {}", existed);
throw new HugeException("The server with name '%s' maybe skew very much", serverId);

Check warning on line 113 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L112-L113

Added lines #L112 - L113 were not covered by tests
}
try {
Thread.sleep(existed.expireTime() - now + 1);
} catch (InterruptedException e) {
throw new HugeException("Interrupted when waiting for server " +
"info expired", e);
throw new HugeException("Interrupted when waiting for server info expired", e);
}

Check warning on line 119 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L116-L119

Added lines #L116 - L119 were not covered by tests
}
E.checkArgument(existed == null || !existed.alive(),
"The server with name '%s' already in cluster",
serverId);
"The server with name '%s' already in cluster", serverId);

if (nodeInfo.nodeRole().master()) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
@@ -198,13 +195,12 @@
/* ServerInfo is missing */
if (this.selfNodeId() == null) {
// Ignore if ServerInfo is not initialized
LOG.info("ServerInfo is missing: {}, may not be initialized yet");
LOG.info("ServerInfo is missing: {}, may not be initialized yet", this.selfNodeId());

Check warning on line 198 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L198

Added line #L198 was not covered by tests
return;
}
if (this.selfIsMaster()) {
// On master node, just wait for ServerInfo re-init
LOG.warn("ServerInfo is missing: {}, may be cleared before",
this.selfNodeId());
// On the master node, just wait for ServerInfo re-init
LOG.warn("ServerInfo is missing: {}, may be cleared before", this.selfNodeId());
return;
}
/*
@@ -245,12 +241,10 @@
if (!server.alive()) {
continue;
}

if (server.role().master()) {
master = server;
continue;
}

hasWorkerNode = true;
if (!server.suitableFor(task, now)) {
continue;
@@ -267,13 +261,12 @@
this.onlySingleNode = singleNode;
}

// Only schedule to master if there is no workers and master is suitable
// Only schedule to master if there are no workers and master are suitable
if (!hasWorkerNode) {
if (master != null && master.suitableFor(task, now)) {
serverWithMinLoad = master;
}
}

return serverWithMinLoad;
}

@@ -299,8 +292,7 @@
throw new HugeException("Schema is missing for %s '%s'",
HugeServerInfo.P.SERVER, serverInfo);
}
HugeVertex vertex = this.tx().constructVertex(false,
serverInfo.asArray());
HugeVertex vertex = this.tx().constructVertex(false, serverInfo.asArray());
// Add or update server info in backend store
vertex = this.tx().addVertex(vertex);
return vertex.id();
@@ -314,8 +306,7 @@
}
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
throw new HugeException("Schema is missing for %s",
HugeServerInfo.P.SERVER);
throw new HugeException("Schema is missing for %s", HugeServerInfo.P.SERVER);

Check warning on line 309 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java#L309

Added line #L309 was not covered by tests
}
// Save server info in batch
GraphTransaction tx = this.tx();
Original file line number Diff line number Diff line change
@@ -120,7 +120,7 @@
if (this.taskTx == null) {
BackendStore store = this.graph.loadSystemStore();
TaskTransaction tx = new TaskTransaction(this.graph, store);
assert this.taskTx == null; // may be reentrant?
assert this.taskTx == null; // maybe reentrant?
this.taskTx = tx;
}
}
@@ -196,7 +196,7 @@

if (this.serverManager().onlySingleNode() && !task.computer()) {
/*
* Speed up for single node, submit task immediately,
* Speed up for single node, submit the task immediately,
* this code can be removed without affecting code logic
*/
task.status(TaskStatus.QUEUED);
@@ -205,7 +205,7 @@
return this.submitTask(task);
} else {
/*
* Just set SCHEDULING status and save task,
* Just set the SCHEDULING status and save the task,
* it will be scheduled by periodic scheduler worker
*/
task.status(TaskStatus.SCHEDULING);
@@ -276,11 +276,11 @@
assert this.serverManager().selfIsMaster();
if (!task.server().equals(this.serverManager().selfNodeId())) {
/*
* Remove task from memory if it's running on worker node,
* but keep task in memory if it's running on master node.
* cancel-scheduling will read task from backend store, if
* Remove the task from memory if it's running on worker node,
* but keep the task in memory if it's running on master node.
* Cancel-scheduling will read the task from backend store, if
* removed this instance from memory, there will be two task
* instances with same id, and can't cancel the real task that
* instances with the same id, and can't cancel the real task that
* is running but removed from memory.
*/
this.remove(task);
@@ -301,12 +301,10 @@

protected synchronized void scheduleTasksOnMaster() {
// Master server schedule all scheduling tasks to suitable worker nodes
Collection<HugeServerInfo> serverInfos = this.serverManager()
.allServerInfos();
Collection<HugeServerInfo> serverInfos = this.serverManager().allServerInfos();
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULING,
PAGE_SIZE, page);
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
if (task.server() != null) {
@@ -318,12 +316,10 @@
return;
}

HugeServerInfo server = this.serverManager().pickWorkerNode(
serverInfos, task);
HugeServerInfo server = this.serverManager().pickWorkerNode(serverInfos, task);
if (server == null) {
LOG.info("The master can't find suitable servers to " +
"execute task '{}', wait for next schedule",
task.id());
"execute task '{}', wait for next schedule", task.id());

Check warning on line 322 in hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java

Codecov / codecov/patch

hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java#L322

Added line #L322 was not covered by tests
continue;
}

@@ -336,8 +332,7 @@
// Update server load in memory, it will be saved at the ending
server.increaseLoad(task.load());

LOG.info("Scheduled task '{}' to server '{}'",
task.id(), server.id());
LOG.info("Scheduled task '{}' to server '{}'", task.id(), server.id());
}
if (page != null) {
page = PageInfo.pageInfo(tasks);
@@ -351,8 +346,7 @@
protected void executeTasksOnWorker(Id server) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULED,
PAGE_SIZE, page);
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
this.initTaskCallable(task);
@@ -381,8 +375,7 @@
protected void cancelTasksOnWorker(Id server) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.CANCELLING,
PAGE_SIZE, page);
Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
Id taskServer = task.server();
@@ -557,10 +550,10 @@

HugeTask<?> task = this.task(id);
/*
* The following is out of date when task running on worker node:
* The following is out of date when the task running on worker node:
* HugeTask<?> task = this.tasks.get(id);
* Tasks are removed from memory after completed at most time,
* but there is a tiny gap between tasks are completed and
* but there is a tiny gap between tasks is completed and
* removed from memory.
* We assume tasks only in memory may be incomplete status,
* in fact, it is also possible to appear on the backend tasks
@@ -621,7 +614,7 @@
throw e;
}
if (task.completed()) {
// Wait for task result being set after status is completed
// Wait for the task result being set after the status is completed
sleep(intervalMs);
return task;
}