Skip to content

Commit 2bdf897

Browse files
committed
git add hugegraph-pd/hg-pd-grpc/ hugegraph-pd/hg-pd-common/ hugegraph-pd/hg-pd-client/ pom.xml hugegraph-pd/pom.xml
1 parent dfee5bf commit 2bdf897

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+7038
-4
lines changed

hugegraph-pd/hg-pd-client/pom.xml

+73
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one or more
4+
contributor license agreements. See the NOTICE file distributed with
5+
this work for additional information regarding copyright ownership.
6+
The ASF licenses this file to You under the Apache License, Version 2.0
7+
(the "License"); you may not use this file except in compliance with
8+
the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
19+
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
20+
xmlns="http://maven.apache.org/POM/4.0.0"
21+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
22+
23+
<modelVersion>4.0.0</modelVersion>
24+
25+
<parent>
26+
<groupId>org.apache.hugegraph</groupId>
27+
<artifactId>hugegraph-pd</artifactId>
28+
<version>${revision}</version>
29+
<relativePath>../pom.xml</relativePath>
30+
</parent>
31+
<artifactId>hg-pd-client</artifactId>
32+
33+
<dependencies>
34+
<dependency>
35+
<groupId>org.projectlombok</groupId>
36+
<artifactId>lombok</artifactId>
37+
<version>1.18.20</version>
38+
</dependency>
39+
<dependency>
40+
<groupId>org.apache.logging.log4j</groupId>
41+
<artifactId>log4j-slf4j-impl</artifactId>
42+
<version>2.17.0</version>
43+
</dependency>
44+
<dependency>
45+
<groupId>org.apache.hugegraph</groupId>
46+
<artifactId>hg-pd-grpc</artifactId>
47+
<version>${revision}</version>
48+
</dependency>
49+
<dependency>
50+
<groupId>org.apache.hugegraph</groupId>
51+
<artifactId>hg-pd-common</artifactId>
52+
<version>${revision}</version>
53+
<scope>compile</scope>
54+
</dependency>
55+
<dependency>
56+
<groupId>junit</groupId>
57+
<artifactId>junit</artifactId>
58+
<version>4.13.2</version>
59+
<scope>test</scope>
60+
</dependency>
61+
<dependency>
62+
<groupId>commons-io</groupId>
63+
<artifactId>commons-io</artifactId>
64+
<version>2.8.0</version>
65+
</dependency>
66+
<dependency>
67+
<groupId>org.yaml</groupId>
68+
<artifactId>snakeyaml</artifactId>
69+
<version>1.28</version>
70+
<scope>test</scope>
71+
</dependency>
72+
</dependencies>
73+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hugegraph.pd.client;
19+
20+
import java.io.Closeable;
21+
import java.util.LinkedList;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ConcurrentMap;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
import java.util.function.Predicate;
27+
import java.util.stream.Stream;
28+
29+
import org.apache.hugegraph.pd.common.KVPair;
30+
import org.apache.hugegraph.pd.common.PDException;
31+
import org.apache.hugegraph.pd.grpc.Metapb;
32+
import org.apache.hugegraph.pd.grpc.PDGrpc;
33+
import org.apache.hugegraph.pd.grpc.PDGrpc.PDBlockingStub;
34+
import org.apache.hugegraph.pd.grpc.Pdpb;
35+
import org.apache.hugegraph.pd.grpc.Pdpb.GetMembersRequest;
36+
import org.apache.hugegraph.pd.grpc.Pdpb.GetMembersResponse;
37+
38+
import io.grpc.Channel;
39+
import io.grpc.ClientCall;
40+
import io.grpc.ManagedChannel;
41+
import io.grpc.ManagedChannelBuilder;
42+
import io.grpc.MethodDescriptor;
43+
import io.grpc.StatusRuntimeException;
44+
import io.grpc.stub.AbstractBlockingStub;
45+
import io.grpc.stub.AbstractStub;
46+
import io.grpc.stub.ClientCalls;
47+
import io.grpc.stub.StreamObserver;
48+
import lombok.extern.slf4j.Slf4j;
49+
50+
@Slf4j
51+
public abstract class AbstractClient implements Closeable {
52+
53+
private static final ConcurrentHashMap<String, ManagedChannel> chs = new ConcurrentHashMap<>();
54+
public static Pdpb.ResponseHeader okHeader = Pdpb.ResponseHeader.newBuilder().setError(
55+
Pdpb.Error.newBuilder().setType(Pdpb.ErrorType.OK)).build();
56+
protected final Pdpb.RequestHeader header;
57+
protected final AbstractClientStubProxy stubProxy;
58+
protected final PDConfig config;
59+
protected ManagedChannel channel = null;
60+
protected volatile ConcurrentMap<String, AbstractBlockingStub> stubs = null;
61+
62+
protected AbstractClient(PDConfig config) {
63+
String[] hosts = config.getServerHost().split(",");
64+
this.stubProxy = new AbstractClientStubProxy(hosts);
65+
this.header = Pdpb.RequestHeader.getDefaultInstance();
66+
this.config = config;
67+
}
68+
69+
public static Pdpb.ResponseHeader newErrorHeader(int errorCode, String errorMsg) {
70+
Pdpb.ResponseHeader header = Pdpb.ResponseHeader.newBuilder().setError(
71+
Pdpb.Error.newBuilder().setTypeValue(errorCode).setMessage(errorMsg)).build();
72+
return header;
73+
}
74+
75+
protected static void handleErrors(Pdpb.ResponseHeader header) throws PDException {
76+
if (header.hasError() && header.getError().getType() != Pdpb.ErrorType.OK) {
77+
throw new PDException(header.getError().getTypeValue(),
78+
String.format("PD request error, error code = %d, msg = %s",
79+
header.getError().getTypeValue(),
80+
header.getError().getMessage()));
81+
}
82+
}
83+
84+
protected AbstractBlockingStub getBlockingStub() throws PDException {
85+
if (stubProxy.getBlockingStub() == null) {
86+
synchronized (this) {
87+
if (stubProxy.getBlockingStub() == null) {
88+
String host = resetStub();
89+
if (host.isEmpty()) {
90+
throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE,
91+
"PD unreachable, pd.peers=" +
92+
config.getServerHost());
93+
}
94+
}
95+
}
96+
}
97+
return (AbstractBlockingStub) stubProxy.getBlockingStub()
98+
.withDeadlineAfter(config.getGrpcTimeOut(),
99+
TimeUnit.MILLISECONDS);
100+
}
101+
102+
protected AbstractStub getStub() throws PDException {
103+
if (stubProxy.getStub() == null) {
104+
synchronized (this) {
105+
if (stubProxy.getStub() == null) {
106+
String host = resetStub();
107+
if (host.isEmpty()) {
108+
throw new PDException(Pdpb.ErrorType.PD_UNREACHABLE_VALUE,
109+
"PD unreachable, pd.peers=" +
110+
config.getServerHost());
111+
}
112+
}
113+
}
114+
}
115+
return stubProxy.getStub();
116+
}
117+
118+
protected abstract AbstractStub createStub();
119+
120+
protected abstract AbstractBlockingStub createBlockingStub();
121+
122+
private String resetStub() {
123+
String leaderHost = "";
124+
for (int i = 0; i < stubProxy.getHostCount(); i++) {
125+
String host = stubProxy.nextHost();
126+
channel = ManagedChannelBuilder.forTarget(host).usePlaintext().build();
127+
PDBlockingStub blockingStub = PDGrpc.newBlockingStub(channel)
128+
.withDeadlineAfter(config.getGrpcTimeOut(),
129+
TimeUnit.MILLISECONDS);
130+
try {
131+
GetMembersRequest request = Pdpb.GetMembersRequest.newBuilder()
132+
.setHeader(header).build();
133+
GetMembersResponse members = blockingStub.getMembers(request);
134+
Metapb.Member leader = members.getLeader();
135+
leaderHost = leader.getGrpcUrl();
136+
close();
137+
channel = ManagedChannelBuilder.forTarget(leaderHost).usePlaintext().build();
138+
stubProxy.setBlockingStub(createBlockingStub());
139+
stubProxy.setStub(createStub());
140+
log.info("PDClient connect to host = {} success", leaderHost);
141+
break;
142+
} catch (Exception e) {
143+
log.error("PDClient connect to {} exception {}, {}", host, e.getMessage(),
144+
e.getCause() != null ? e.getCause().getMessage() : "");
145+
}
146+
}
147+
return leaderHost;
148+
}
149+
150+
protected <ReqT, RespT, StubT extends AbstractBlockingStub<StubT>> RespT blockingUnaryCall(
151+
MethodDescriptor<ReqT, RespT> method, ReqT req) throws PDException {
152+
return blockingUnaryCall(method, req, 5);
153+
}
154+
155+
protected <ReqT, RespT, StubT extends AbstractBlockingStub<StubT>> RespT blockingUnaryCall(
156+
MethodDescriptor<ReqT, RespT> method, ReqT req, int retry) throws PDException {
157+
AbstractBlockingStub stub = getBlockingStub();
158+
try {
159+
RespT resp =
160+
ClientCalls.blockingUnaryCall(stub.getChannel(), method, stub.getCallOptions(),
161+
req);
162+
return resp;
163+
} catch (Exception e) {
164+
log.error(method.getFullMethodName() + " exception, {}", e.getMessage());
165+
if (e instanceof StatusRuntimeException) {
166+
if (retry < stubProxy.getHostCount()) {
167+
// 网络不通,关掉之前连接,换host重新连接
168+
synchronized (this) {
169+
stubProxy.setBlockingStub(null);
170+
}
171+
return blockingUnaryCall(method, req, ++retry);
172+
}
173+
}
174+
}
175+
return null;
176+
}
177+
178+
// this.stubs = new ConcurrentHashMap<String,AbstractBlockingStub>(hosts.length);
179+
private AbstractBlockingStub getConcurrentBlockingStub(String address) {
180+
AbstractBlockingStub stub = stubs.get(address);
181+
if (stub != null) {
182+
return stub;
183+
}
184+
Channel ch = ManagedChannelBuilder.forTarget(address).usePlaintext().build();
185+
PDBlockingStub blockingStub =
186+
PDGrpc.newBlockingStub(ch).withDeadlineAfter(config.getGrpcTimeOut(),
187+
TimeUnit.MILLISECONDS);
188+
stubs.put(address, blockingStub);
189+
return blockingStub;
190+
191+
}
192+
193+
protected <ReqT, RespT> KVPair<Boolean, RespT> concurrentBlockingUnaryCall(
194+
MethodDescriptor<ReqT, RespT> method, ReqT req, Predicate<RespT> predicate) {
195+
LinkedList<String> hostList = this.stubProxy.getHostList();
196+
if (this.stubs == null) {
197+
synchronized (this) {
198+
if (this.stubs == null) {
199+
this.stubs = new ConcurrentHashMap<>(hostList.size());
200+
}
201+
}
202+
}
203+
Stream<RespT> respTStream = hostList.parallelStream().map((address) -> {
204+
AbstractBlockingStub stub = getConcurrentBlockingStub(address);
205+
RespT resp = ClientCalls.blockingUnaryCall(stub.getChannel(),
206+
method, stub.getCallOptions(), req);
207+
return resp;
208+
});
209+
KVPair<Boolean, RespT> pair;
210+
AtomicReference<RespT> response = new AtomicReference<>();
211+
boolean result = respTStream.anyMatch((r) -> {
212+
response.set(r);
213+
return predicate.test(r);
214+
});
215+
if (result) {
216+
pair = new KVPair<>(true, null);
217+
} else {
218+
pair = new KVPair<>(false, response.get());
219+
}
220+
return pair;
221+
}
222+
223+
protected <ReqT, RespT> void streamingCall(MethodDescriptor<ReqT, RespT> method, ReqT request,
224+
StreamObserver<RespT> responseObserver,
225+
int retry) throws PDException {
226+
AbstractStub stub = getStub();
227+
try {
228+
ClientCall<ReqT, RespT> call = stub.getChannel().newCall(method, stub.getCallOptions());
229+
ClientCalls.asyncServerStreamingCall(call, request, responseObserver);
230+
} catch (Exception e) {
231+
if (e instanceof StatusRuntimeException) {
232+
if (retry < stubProxy.getHostCount()) {
233+
synchronized (this) {
234+
stubProxy.setStub(null);
235+
}
236+
streamingCall(method, request, responseObserver, ++retry);
237+
return;
238+
}
239+
}
240+
log.error("rpc call with exception, {}", e.getMessage());
241+
}
242+
}
243+
244+
@Override
245+
public void close() {
246+
closeChannel(channel);
247+
if (stubs != null) {
248+
for (AbstractBlockingStub stub : stubs.values()) {
249+
closeChannel((ManagedChannel) stub.getChannel());
250+
}
251+
}
252+
253+
}
254+
255+
private void closeChannel(ManagedChannel channel) {
256+
try {
257+
while (channel != null &&
258+
!channel.shutdownNow().awaitTermination(100, TimeUnit.MILLISECONDS)) {
259+
continue;
260+
}
261+
} catch (Exception e) {
262+
log.info("Close channel with error : ", e);
263+
}
264+
}
265+
}

0 commit comments

Comments
 (0)