diff --git a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/client/PDPulseTest.java b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/client/PDPulseTest.java index dfdc63cf36..220ec80f15 100644 --- a/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/client/PDPulseTest.java +++ b/hugegraph-pd/hg-pd-test/src/main/java/org/apache/hugegraph/pd/client/PDPulseTest.java @@ -24,6 +24,7 @@ import org.apache.hugegraph.pd.grpc.pulse.PartitionHeartbeatRequest; import org.apache.hugegraph.pd.pulse.PulseServerNotice; import org.junit.BeforeClass; +import org.junit.Test; public class PDPulseTest { private static PDClient pdClient; @@ -40,7 +41,7 @@ public static void beforeClass() throws Exception { pdClient.getLeader(); } - // @Test + @Test public void listen() { PDPulse pulse = new PDPulseImpl(pdClient.getLeaderIp()); @@ -106,4 +107,4 @@ public void onCompleted() { HgPDTestUtil.println(this.listenerName + " is completed"); } } -} \ No newline at end of file +} diff --git a/hugegraph-pd/pom.xml b/hugegraph-pd/pom.xml index 743ead0d19..26181e5a40 100644 --- a/hugegraph-pd/pom.xml +++ b/hugegraph-pd/pom.xml @@ -135,7 +135,7 @@ *.tar.gz .flattened-pom.xml - + dist/** false diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index c496483ac7..4551b00af8 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -18,7 +18,6 @@ package org.apache.hugegraph; import java.util.Collection; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -179,8 +178,6 @@ public class StandardHugeGraph implements HugeGraph { private final RamTable ramtable; - private final MetaManager metaManager = MetaManager.instance(); - private final String schedulerType; public StandardHugeGraph(HugeConfig config) { @@ -230,7 +227,10 @@ public StandardHugeGraph(HugeConfig config) { } if (isHstore()) { - initMetaManager(); + // TODO: parameterize the remaining configurations + MetaManager.instance().connect("hg", MetaManager.MetaDriverType.PD, + "ca", "ca", "ca", + config.get(CoreOptions.PD_PEERS)); } try { @@ -470,12 +470,6 @@ private boolean isHstore() { return this.storeProvider.isHstore(); } - private void initMetaManager() { - this.metaManager.connect("hg", MetaManager.MetaDriverType.PD, - "ca", "ca", "ca", - Collections.singletonList("127.0.0.1:8686")); - } - private ISchemaTransaction openSchemaTransaction() throws HugeException { this.checkGraphNotClosed(); try { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java index 9f17b0caef..3697382f57 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java @@ -674,4 +674,11 @@ public static synchronized CoreOptions instance() { CollectionType::valueOf, "EC" ); + + public static final ConfigOption PD_PEERS = new ConfigOption<>( + "pd.peers", + "The addresses of pd nodes, separated with commas.", + disallowEmpty(), + "127.0.0.1:8686" + ); } diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties index 74e1408c70..1a3532914b 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties @@ -24,6 +24,16 @@ serializer=binary store=hugegraph +# pd config +pd.peers=127.0.0.1:8686 + +# task config +task.scheduler_type=local +task.schedule_period=10 +task.retry=0 +task.wait_timeout=10 + +# raft config raft.mode=false raft.path=./raft-log raft.safe_read=true @@ -45,6 +55,7 @@ raft.rpc_connect_timeout=5000 raft.rpc_timeout=60 raft.install_snapshot_rpc_timeout=36000 +# search config search.text_analyzer=jieba search.text_analyzer_mode=INDEX diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreNodePartitionerImpl.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreNodePartitionerImpl.java deleted file mode 100644 index 2a69fe1c03..0000000000 --- a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreNodePartitionerImpl.java +++ /dev/null @@ -1,279 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hugegraph.backend.store.hstore; - -import static org.apache.hugegraph.store.client.util.HgStoreClientConst.ALL_PARTITION_OWNER; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hugegraph.config.HugeConfig; -import org.apache.hugegraph.pd.client.PDClient; -import org.apache.hugegraph.pd.common.KVPair; -import org.apache.hugegraph.pd.common.PDException; -import org.apache.hugegraph.pd.common.PartitionUtils; -import org.apache.hugegraph.pd.grpc.Metapb; -import org.apache.hugegraph.store.client.HgNodePartition; -import org.apache.hugegraph.store.client.HgNodePartitionerBuilder; -import org.apache.hugegraph.store.client.HgStoreNode; -import org.apache.hugegraph.store.client.HgStoreNodeManager; -import org.apache.hugegraph.store.client.HgStoreNodeNotifier; -import org.apache.hugegraph.store.client.HgStoreNodePartitioner; -import org.apache.hugegraph.store.client.HgStoreNodeProvider; -import org.apache.hugegraph.store.client.HgStoreNotice; -import org.apache.hugegraph.store.client.type.HgNodeStatus; -import org.apache.hugegraph.store.client.util.HgStoreClientConst; -import org.apache.hugegraph.util.Log; -import org.slf4j.Logger; - -public class HstoreNodePartitionerImpl implements HgStoreNodePartitioner, - HgStoreNodeProvider, - HgStoreNodeNotifier { - - private static final Logger LOG = Log.logger(HstoreNodePartitionerImpl.class); - private PDClient pdClient; - private HgStoreNodeManager nodeManager; - - protected HstoreNodePartitionerImpl() { - - } - - public HstoreNodePartitionerImpl(String pdPeers) { - pdClient = HstoreSessionsImpl.getDefaultPdClient(); - } - - public HstoreNodePartitionerImpl(HgStoreNodeManager nodeManager, - String pdPeers) { - this(pdPeers); - this.nodeManager = nodeManager; - } - - public void setPDClient(PDClient pdClient) { - this.pdClient = pdClient; - } - - /** - * 查询分区信息,结果通过HgNodePartitionerBuilder返回 - */ - @Override - public int partition(HgNodePartitionerBuilder builder, String graphName, - byte[] startKey, byte[] endKey) { - try { - HashSet partitions = null; - if (HgStoreClientConst.ALL_PARTITION_OWNER == startKey) { - List stores = pdClient.getActiveStores(graphName); - partitions = new HashSet<>(stores.size()); - for (Metapb.Store store : stores) { - partitions.add(HgNodePartition.of(store.getId(), -1)); - } - - } else if (endKey == HgStoreClientConst.EMPTY_BYTES - || startKey == endKey || Arrays.equals(startKey, endKey)) { - KVPair partShard = - pdClient.getPartition(graphName, startKey); - Metapb.Shard leader = partShard.getValue(); - partitions = new HashSet<>(1); - partitions.add(HgNodePartition.of(leader.getStoreId(), - pdClient.keyToCode(graphName, startKey))); - } else { - LOG.warn( - "StartOwnerkey is not equal to endOwnerkey, which is meaningless!!, It is" + - " a error!!"); - List stores = pdClient.getActiveStores(graphName); - for (Metapb.Store store : stores) { - partitions.add(HgNodePartition.of(store.getId(), -1)); - } - } - builder.setPartitions(partitions); - } catch (PDException e) { - LOG.error("An error occurred while getting partition information :{}", e.getMessage()); - throw new RuntimeException(e.getMessage(), e); - } - return 0; - } - - @Override - public int partition(HgNodePartitionerBuilder builder, String graphName, - int startKey, int endKey) { - try { - HashSet partitions = new HashSet<>(); - Metapb.Partition partition = null; - while ((partition == null || partition.getEndKey() < endKey) - && startKey < PartitionUtils.MAX_VALUE) { - KVPair partShard = - pdClient.getPartitionByCode(graphName, startKey); - if (partShard != null) { - partition = partShard.getKey(); - Metapb.Shard leader = partShard.getValue(); - partitions.add(HgNodePartition.of(leader.getStoreId(), startKey, - (int) partition.getStartKey(), - (int) partition.getEndKey())); - startKey = (int) partition.getEndKey(); - } else { - break; - } - } - builder.setPartitions(partitions); - } catch (PDException e) { - LOG.error("An error occurred while getting partition information :{}", e.getMessage()); - throw new RuntimeException(e.getMessage(), e); - } - return 0; - - } - - /** - * 查询hgstore信息 - * - * @return hgstore - */ - @Override - public HgStoreNode apply(String graphName, Long nodeId) { - try { - Metapb.Store store = pdClient.getStore(nodeId); - return nodeManager.getNodeBuilder().setNodeId(store.getId()) - .setAddress(store.getAddress()).build(); - } catch (PDException e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - /** - * 通知更新缓存 - */ - @Override - public int notice(String graphName, HgStoreNotice storeNotice) { - LOG.warn(storeNotice.toString()); - if (storeNotice.getPartitionLeaders() != null) { - storeNotice.getPartitionLeaders().forEach((partId, leader) -> { - pdClient.updatePartitionLeader(graphName, partId, leader); - LOG.warn("updatePartitionLeader:{}-{}-{}", - graphName, partId, leader); - }); - } - if (storeNotice.getPartitionIds() != null) { - storeNotice.getPartitionIds().forEach(partId -> { - pdClient.invalidPartitionCache(graphName, partId); - }); - } - if (!storeNotice.getNodeStatus().equals( - HgNodeStatus.PARTITION_COMMON_FAULT) - && !storeNotice.getNodeStatus().equals( - HgNodeStatus.NOT_PARTITION_LEADER)) { - pdClient.invalidPartitionCache(); - LOG.warn("invalidPartitionCache:{} ", storeNotice.getNodeStatus()); - } - return 0; - } - - public Metapb.Graph delGraph(String graphName) { - try { - return pdClient.delGraph(graphName); - } catch (PDException e) { - LOG.error("delGraph {} exception, {}", graphName, e.getMessage()); - } - return null; - } - - public void setNodeManager(HgStoreNodeManager nodeManager) { - this.nodeManager = nodeManager; - } -} - -class FakeHstoreNodePartitionerImpl extends HstoreNodePartitionerImpl { - private static final Logger LOG = Log.logger(HstoreNodePartitionerImpl.class); - private static final int partitionCount = 3; - private static final Map leaderMap = new ConcurrentHashMap<>(); - private static final Map storeMap = new ConcurrentHashMap<>(); - HgStoreNodeManager nodeManager; - private final String hstorePeers; - - public FakeHstoreNodePartitionerImpl(String pdPeers) { - this.hstorePeers = pdPeers; - // store列表 - for (String address : hstorePeers.split(",")) { - storeMap.put((long) address.hashCode(), address); - } - // 分区列表 - for (int i = 0; i < partitionCount; i++) { - leaderMap.put(i, storeMap.keySet().iterator().next()); - } - } - - public FakeHstoreNodePartitionerImpl(HgStoreNodeManager nodeManager, - String peers) { - this(peers); - this.nodeManager = nodeManager; - } - - @Override - public int partition(HgNodePartitionerBuilder builder, String graphName, - byte[] startKey, byte[] endKey) { - int startCode = PartitionUtils.calcHashcode(startKey); - HashSet partitions = new HashSet<>(storeMap.size()); - if (ALL_PARTITION_OWNER == startKey) { - storeMap.forEach((k, v) -> { - partitions.add(HgNodePartition.of(k, -1)); - }); - } else if (endKey == HgStoreClientConst.EMPTY_BYTES || startKey == endKey || - Arrays.equals(startKey, endKey)) { - partitions.add( - HgNodePartition.of(leaderMap.get(startCode % partitionCount), startCode)); - } else { - LOG.error("OwnerKey转成HashCode后已经无序了, 按照OwnerKey范围查询没意义"); - storeMap.forEach((k, v) -> { - partitions.add(HgNodePartition.of(k, -1)); - }); - } - builder.setPartitions(partitions); - return 0; - } - - @Override - public HgStoreNode apply(String graphName, Long nodeId) { - return nodeManager.getNodeBuilder().setNodeId(nodeId) - .setAddress(storeMap.get(nodeId)).build(); - } - - @Override - public int notice(String graphName, HgStoreNotice storeNotice) { - if (storeNotice.getPartitionLeaders() != null - && storeNotice.getPartitionLeaders().size() > 0) { - leaderMap.putAll(storeNotice.getPartitionLeaders()); - } - return 0; - } - - public static class NodePartitionerFactory { - public static HstoreNodePartitionerImpl getNodePartitioner( - HugeConfig config, HgStoreNodeManager nodeManager) { - if (config.get(HstoreOptions.PD_FAKE)) { - return new FakeHstoreNodePartitionerImpl(nodeManager, - config.get(HstoreOptions.HSTORE_PEERS)); - } else { - return new HstoreNodePartitionerImpl(nodeManager, - config.get(HstoreOptions.PD_PEERS) - ); - } - - } - } -} diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java index bafde45f82..6de800697c 100644 --- a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreOptions.java @@ -24,24 +24,6 @@ public class HstoreOptions extends OptionHolder { - public static final ConfigOption PD_PEERS = new ConfigOption<>( - "pd.peers", - "The addresses of pd nodes, separated with commas.", - disallowEmpty(), - "localhost:8686" - ); - public static final ConfigOption PD_FAKE = new ConfigOption<>( - "pd.fake", - "Enable the fake PD service.", - disallowEmpty(), - false - ); - public static final ConfigOption HSTORE_PEERS = new ConfigOption<>( - "hstore.peers", - "The addresses of store nodes, separated with commas.", - disallowEmpty(), - "localhost:9080" - ); public static final ConfigOption PARTITION_COUNT = new ConfigOption<>( "hstore.partition_count", "Number of partitions, which PD controls partitions based on.", diff --git a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java index be90902dad..27de0e029b 100755 --- a/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java +++ b/hugegraph-server/hugegraph-hstore/src/main/java/org/apache/hugegraph/backend/store/hstore/HstoreSessionsImpl.java @@ -37,6 +37,7 @@ import org.apache.hugegraph.backend.store.BackendEntry.BackendColumn; import org.apache.hugegraph.backend.store.BackendEntry.BackendColumnIterator; import org.apache.hugegraph.backend.store.BackendEntryIterator; +import org.apache.hugegraph.config.CoreOptions; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.pd.client.PDClient; import org.apache.hugegraph.pd.client.PDConfig; @@ -103,7 +104,7 @@ private void initStoreNode(HugeConfig config) { synchronized (this) { if (!initializedNode) { PDConfig pdConfig = - PDConfig.of(config.get(HstoreOptions.PD_PEERS)) + PDConfig.of(config.get(CoreOptions.PD_PEERS)) .setEnableCache(true); defaultPdClient = PDClient.create(pdConfig); hgStoreClient = @@ -244,7 +245,9 @@ public ColumnIterator(String table, T results, byte[] keyBegin, this.position = iter.position(); } else { this.gotNext = false; - this.position = null; + // QUESTION: Resetting the position may result in the caller being unable to + // retrieve the corresponding position. + // this.position = null; } if (!ArrayUtils.isEmpty(this.keyBegin) || !ArrayUtils.isEmpty(this.keyEnd)) { @@ -315,7 +318,9 @@ public boolean hasNext() { if (gotNext) { this.position = this.iter.position(); } else { - this.position = null; + // QUESTION: Resetting the position may result in the caller being unable to + // retrieve the corresponding position. + // this.position = null; } return gotNext; } diff --git a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/grpc/KvPageScanner.java b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/grpc/KvPageScanner.java index c9e99d9b70..e6ed4a729e 100644 --- a/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/grpc/KvPageScanner.java +++ b/hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/grpc/KvPageScanner.java @@ -194,8 +194,10 @@ public boolean hasNext() { if (!this.in) { return false; } - if (this.iterator != null && this.iterator.hasNext()) { - return true; + // QUESTION: After `this.iterator.hasNext()` evaluates to false, + // no further attempts are made to reconstruct the iterator. + if (this.iterator != null) { + return this.iterator.hasNext(); } long start = 0; boolean debugEnabled = log.isDebugEnabled(); diff --git a/hugegraph-store/pom.xml b/hugegraph-store/pom.xml index bc6154ad12..36f8b6982f 100644 --- a/hugegraph-store/pom.xml +++ b/hugegraph-store/pom.xml @@ -153,7 +153,7 @@ *.tar.gz .flattened-pom.xml - + dist/** false