diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/client/PDPulseTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/client/PDPulseTest.java
index dfdc63cf36..220ec80f15 100644
--- a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/client/PDPulseTest.java
+++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/client/PDPulseTest.java
@@ -24,6 +24,7 @@
import org.apache.hugegraph.pd.grpc.pulse.PartitionHeartbeatRequest;
import org.apache.hugegraph.pd.pulse.PulseServerNotice;
import org.junit.BeforeClass;
+import org.junit.Test;
public class PDPulseTest {
private static PDClient pdClient;
@@ -40,7 +41,7 @@ public static void beforeClass() throws Exception {
pdClient.getLeader();
}
- // @Test
+ @Test
public void listen() {
PDPulse pulse = new PDPulseImpl(pdClient.getLeaderIp());
@@ -106,4 +107,4 @@ public void onCompleted() {
HgPDTestUtil.println(this.listenerName + " is completed");
}
}
-}
\ No newline at end of file
+}
diff --git a/hugegraph-pd/pom.xml b/hugegraph-pd/pom.xml
index 743ead0d19..26181e5a40 100644
--- a/hugegraph-pd/pom.xml
+++ b/hugegraph-pd/pom.xml
@@ -135,7 +135,7 @@
*.tar.gz
.flattened-pom.xml
-
+ dist/**
false
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index c496483ac7..4551b00af8 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -18,7 +18,6 @@
package org.apache.hugegraph;
import java.util.Collection;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -179,8 +178,6 @@ public class StandardHugeGraph implements HugeGraph {
private final RamTable ramtable;
- private final MetaManager metaManager = MetaManager.instance();
-
private final String schedulerType;
public StandardHugeGraph(HugeConfig config) {
@@ -230,7 +227,10 @@ public StandardHugeGraph(HugeConfig config) {
}
if (isHstore()) {
- initMetaManager();
+ // TODO: parameterize the remaining configurations
+ MetaManager.instance().connect("hg", MetaManager.MetaDriverType.PD,
+ "ca", "ca", "ca",
+ config.get(CoreOptions.PD_PEERS));
}
try {
@@ -470,12 +470,6 @@ private boolean isHstore() {
return this.storeProvider.isHstore();
}
- private void initMetaManager() {
- this.metaManager.connect("hg", MetaManager.MetaDriverType.PD,
- "ca", "ca", "ca",
- Collections.singletonList("127.0.0.1:8686"));
- }
-
private ISchemaTransaction openSchemaTransaction() throws HugeException {
this.checkGraphNotClosed();
try {
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
index 9f17b0caef..3697382f57 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java
@@ -674,4 +674,11 @@ public static synchronized CoreOptions instance() {
CollectionType::valueOf,
"EC"
);
+
+ public static final ConfigOption PD_PEERS = new ConfigOption<>(
+ "pd.peers",
+ "The addresses of pd nodes, separated with commas.",
+ disallowEmpty(),
+ "127.0.0.1:8686"
+ );
}
diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties
index 74e1408c70..1a3532914b 100644
--- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties
+++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties
@@ -24,6 +24,16 @@ serializer=binary
store=hugegraph
+# pd config
+pd.peers=127.0.0.1:8686
+
+# task config
+task.scheduler_type=local
+task.schedule_period=10
+task.retry=0
+task.wait_timeout=10
+
+# raft config
raft.mode=false
raft.path=./raft-log
raft.safe_read=true
@@ -45,6 +55,7 @@ raft.rpc_connect_timeout=5000
raft.rpc_timeout=60
raft.install_snapshot_rpc_timeout=36000
+# search config
search.text_analyzer=jieba
search.text_analyzer_mode=INDEX
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreNodePartitionerImpl.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreNodePartitionerImpl.java
deleted file mode 100644
index 2a69fe1c03..0000000000
--- a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreNodePartitionerImpl.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hugegraph.backend.store.hstore;
-
-import static org.apache.hugegraph.store.client.util.HgStoreClientConst.ALL_PARTITION_OWNER;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.hugegraph.config.HugeConfig;
-import org.apache.hugegraph.pd.client.PDClient;
-import org.apache.hugegraph.pd.common.KVPair;
-import org.apache.hugegraph.pd.common.PDException;
-import org.apache.hugegraph.pd.common.PartitionUtils;
-import org.apache.hugegraph.pd.grpc.Metapb;
-import org.apache.hugegraph.store.client.HgNodePartition;
-import org.apache.hugegraph.store.client.HgNodePartitionerBuilder;
-import org.apache.hugegraph.store.client.HgStoreNode;
-import org.apache.hugegraph.store.client.HgStoreNodeManager;
-import org.apache.hugegraph.store.client.HgStoreNodeNotifier;
-import org.apache.hugegraph.store.client.HgStoreNodePartitioner;
-import org.apache.hugegraph.store.client.HgStoreNodeProvider;
-import org.apache.hugegraph.store.client.HgStoreNotice;
-import org.apache.hugegraph.store.client.type.HgNodeStatus;
-import org.apache.hugegraph.store.client.util.HgStoreClientConst;
-import org.apache.hugegraph.util.Log;
-import org.slf4j.Logger;
-
-public class HstoreNodePartitionerImpl implements HgStoreNodePartitioner,
- HgStoreNodeProvider,
- HgStoreNodeNotifier {
-
- private static final Logger LOG = Log.logger(HstoreNodePartitionerImpl.class);
- private PDClient pdClient;
- private HgStoreNodeManager nodeManager;
-
- protected HstoreNodePartitionerImpl() {
-
- }
-
- public HstoreNodePartitionerImpl(String pdPeers) {
- pdClient = HstoreSessionsImpl.getDefaultPdClient();
- }
-
- public HstoreNodePartitionerImpl(HgStoreNodeManager nodeManager,
- String pdPeers) {
- this(pdPeers);
- this.nodeManager = nodeManager;
- }
-
- public void setPDClient(PDClient pdClient) {
- this.pdClient = pdClient;
- }
-
- /**
- * 查询分区信息,结果通过HgNodePartitionerBuilder返回
- */
- @Override
- public int partition(HgNodePartitionerBuilder builder, String graphName,
- byte[] startKey, byte[] endKey) {
- try {
- HashSet partitions = null;
- if (HgStoreClientConst.ALL_PARTITION_OWNER == startKey) {
- List stores = pdClient.getActiveStores(graphName);
- partitions = new HashSet<>(stores.size());
- for (Metapb.Store store : stores) {
- partitions.add(HgNodePartition.of(store.getId(), -1));
- }
-
- } else if (endKey == HgStoreClientConst.EMPTY_BYTES
- || startKey == endKey || Arrays.equals(startKey, endKey)) {
- KVPair partShard =
- pdClient.getPartition(graphName, startKey);
- Metapb.Shard leader = partShard.getValue();
- partitions = new HashSet<>(1);
- partitions.add(HgNodePartition.of(leader.getStoreId(),
- pdClient.keyToCode(graphName, startKey)));
- } else {
- LOG.warn(
- "StartOwnerkey is not equal to endOwnerkey, which is meaningless!!, It is" +
- " a error!!");
- List stores = pdClient.getActiveStores(graphName);
- for (Metapb.Store store : stores) {
- partitions.add(HgNodePartition.of(store.getId(), -1));
- }
- }
- builder.setPartitions(partitions);
- } catch (PDException e) {
- LOG.error("An error occurred while getting partition information :{}", e.getMessage());
- throw new RuntimeException(e.getMessage(), e);
- }
- return 0;
- }
-
- @Override
- public int partition(HgNodePartitionerBuilder builder, String graphName,
- int startKey, int endKey) {
- try {
- HashSet partitions = new HashSet<>();
- Metapb.Partition partition = null;
- while ((partition == null || partition.getEndKey() < endKey)
- && startKey < PartitionUtils.MAX_VALUE) {
- KVPair partShard =
- pdClient.getPartitionByCode(graphName, startKey);
- if (partShard != null) {
- partition = partShard.getKey();
- Metapb.Shard leader = partShard.getValue();
- partitions.add(HgNodePartition.of(leader.getStoreId(), startKey,
- (int) partition.getStartKey(),
- (int) partition.getEndKey()));
- startKey = (int) partition.getEndKey();
- } else {
- break;
- }
- }
- builder.setPartitions(partitions);
- } catch (PDException e) {
- LOG.error("An error occurred while getting partition information :{}", e.getMessage());
- throw new RuntimeException(e.getMessage(), e);
- }
- return 0;
-
- }
-
- /**
- * 查询hgstore信息
- *
- * @return hgstore
- */
- @Override
- public HgStoreNode apply(String graphName, Long nodeId) {
- try {
- Metapb.Store store = pdClient.getStore(nodeId);
- return nodeManager.getNodeBuilder().setNodeId(store.getId())
- .setAddress(store.getAddress()).build();
- } catch (PDException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- /**
- * 通知更新缓存
- */
- @Override
- public int notice(String graphName, HgStoreNotice storeNotice) {
- LOG.warn(storeNotice.toString());
- if (storeNotice.getPartitionLeaders() != null) {
- storeNotice.getPartitionLeaders().forEach((partId, leader) -> {
- pdClient.updatePartitionLeader(graphName, partId, leader);
- LOG.warn("updatePartitionLeader:{}-{}-{}",
- graphName, partId, leader);
- });
- }
- if (storeNotice.getPartitionIds() != null) {
- storeNotice.getPartitionIds().forEach(partId -> {
- pdClient.invalidPartitionCache(graphName, partId);
- });
- }
- if (!storeNotice.getNodeStatus().equals(
- HgNodeStatus.PARTITION_COMMON_FAULT)
- && !storeNotice.getNodeStatus().equals(
- HgNodeStatus.NOT_PARTITION_LEADER)) {
- pdClient.invalidPartitionCache();
- LOG.warn("invalidPartitionCache:{} ", storeNotice.getNodeStatus());
- }
- return 0;
- }
-
- public Metapb.Graph delGraph(String graphName) {
- try {
- return pdClient.delGraph(graphName);
- } catch (PDException e) {
- LOG.error("delGraph {} exception, {}", graphName, e.getMessage());
- }
- return null;
- }
-
- public void setNodeManager(HgStoreNodeManager nodeManager) {
- this.nodeManager = nodeManager;
- }
-}
-
-class FakeHstoreNodePartitionerImpl extends HstoreNodePartitionerImpl {
- private static final Logger LOG = Log.logger(HstoreNodePartitionerImpl.class);
- private static final int partitionCount = 3;
- private static final Map leaderMap = new ConcurrentHashMap<>();
- private static final Map storeMap = new ConcurrentHashMap<>();
- HgStoreNodeManager nodeManager;
- private final String hstorePeers;
-
- public FakeHstoreNodePartitionerImpl(String pdPeers) {
- this.hstorePeers = pdPeers;
- // store列表
- for (String address : hstorePeers.split(",")) {
- storeMap.put((long) address.hashCode(), address);
- }
- // 分区列表
- for (int i = 0; i < partitionCount; i++) {
- leaderMap.put(i, storeMap.keySet().iterator().next());
- }
- }
-
- public FakeHstoreNodePartitionerImpl(HgStoreNodeManager nodeManager,
- String peers) {
- this(peers);
- this.nodeManager = nodeManager;
- }
-
- @Override
- public int partition(HgNodePartitionerBuilder builder, String graphName,
- byte[] startKey, byte[] endKey) {
- int startCode = PartitionUtils.calcHashcode(startKey);
- HashSet partitions = new HashSet<>(storeMap.size());
- if (ALL_PARTITION_OWNER == startKey) {
- storeMap.forEach((k, v) -> {
- partitions.add(HgNodePartition.of(k, -1));
- });
- } else if (endKey == HgStoreClientConst.EMPTY_BYTES || startKey == endKey ||
- Arrays.equals(startKey, endKey)) {
- partitions.add(
- HgNodePartition.of(leaderMap.get(startCode % partitionCount), startCode));
- } else {
- LOG.error("OwnerKey转成HashCode后已经无序了, 按照OwnerKey范围查询没意义");
- storeMap.forEach((k, v) -> {
- partitions.add(HgNodePartition.of(k, -1));
- });
- }
- builder.setPartitions(partitions);
- return 0;
- }
-
- @Override
- public HgStoreNode apply(String graphName, Long nodeId) {
- return nodeManager.getNodeBuilder().setNodeId(nodeId)
- .setAddress(storeMap.get(nodeId)).build();
- }
-
- @Override
- public int notice(String graphName, HgStoreNotice storeNotice) {
- if (storeNotice.getPartitionLeaders() != null
- && storeNotice.getPartitionLeaders().size() > 0) {
- leaderMap.putAll(storeNotice.getPartitionLeaders());
- }
- return 0;
- }
-
- public static class NodePartitionerFactory {
- public static HstoreNodePartitionerImpl getNodePartitioner(
- HugeConfig config, HgStoreNodeManager nodeManager) {
- if (config.get(HstoreOptions.PD_FAKE)) {
- return new FakeHstoreNodePartitionerImpl(nodeManager,
- config.get(HstoreOptions.HSTORE_PEERS));
- } else {
- return new HstoreNodePartitionerImpl(nodeManager,
- config.get(HstoreOptions.PD_PEERS)
- );
- }
-
- }
- }
-}
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java
index bafde45f82..6de800697c 100644
--- a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java
@@ -24,24 +24,6 @@
public class HstoreOptions extends OptionHolder {
- public static final ConfigOption PD_PEERS = new ConfigOption<>(
- "pd.peers",
- "The addresses of pd nodes, separated with commas.",
- disallowEmpty(),
- "localhost:8686"
- );
- public static final ConfigOption PD_FAKE = new ConfigOption<>(
- "pd.fake",
- "Enable the fake PD service.",
- disallowEmpty(),
- false
- );
- public static final ConfigOption HSTORE_PEERS = new ConfigOption<>(
- "hstore.peers",
- "The addresses of store nodes, separated with commas.",
- disallowEmpty(),
- "localhost:9080"
- );
public static final ConfigOption PARTITION_COUNT = new ConfigOption<>(
"hstore.partition_count",
"Number of partitions, which PD controls partitions based on.",
diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
index be90902dad..27de0e029b 100755
--- a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
+++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java
@@ -37,6 +37,7 @@
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn;
import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator;
import org.apache.hugegraph.backend.store.BackendEntryIterator;
+import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.pd.client.PDClient;
import org.apache.hugegraph.pd.client.PDConfig;
@@ -103,7 +104,7 @@ private void initStoreNode(HugeConfig config) {
synchronized (this) {
if (!initializedNode) {
PDConfig pdConfig =
- PDConfig.of(config.get(HstoreOptions.PD_PEERS))
+ PDConfig.of(config.get(CoreOptions.PD_PEERS))
.setEnableCache(true);
defaultPdClient = PDClient.create(pdConfig);
hgStoreClient =
@@ -244,7 +245,9 @@ public ColumnIterator(String table, T results, byte[] keyBegin,
this.position = iter.position();
} else {
this.gotNext = false;
- this.position = null;
+ // QUESTION: Resetting the position may result in the caller being unable to
+ // retrieve the corresponding position.
+ // this.position = null;
}
if (!ArrayUtils.isEmpty(this.keyBegin) ||
!ArrayUtils.isEmpty(this.keyEnd)) {
@@ -315,7 +318,9 @@ public boolean hasNext() {
if (gotNext) {
this.position = this.iter.position();
} else {
- this.position = null;
+ // QUESTION: Resetting the position may result in the caller being unable to
+ // retrieve the corresponding position.
+ // this.position = null;
}
return gotNext;
}
diff --git a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/grpc/KvPageScanner.java b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/grpc/KvPageScanner.java
index c9e99d9b70..e6ed4a729e 100644
--- a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/grpc/KvPageScanner.java
+++ b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/grpc/KvPageScanner.java
@@ -194,8 +194,10 @@ public boolean hasNext() {
if (!this.in) {
return false;
}
- if (this.iterator != null && this.iterator.hasNext()) {
- return true;
+ // QUESTION: After `this.iterator.hasNext()` evaluates to false,
+ // no further attempts are made to reconstruct the iterator.
+ if (this.iterator != null) {
+ return this.iterator.hasNext();
}
long start = 0;
boolean debugEnabled = log.isDebugEnabled();
diff --git a/hugegraph-store/pom.xml b/hugegraph-store/pom.xml
index bc6154ad12..36f8b6982f 100644
--- a/hugegraph-store/pom.xml
+++ b/hugegraph-store/pom.xml
@@ -153,7 +153,7 @@
*.tar.gz
.flattened-pom.xml
-
+ dist/**
false