Skip to content

Commit f3d5fb0

Browse files
committedMar 4, 2024·
Merge branch '2.1'
2 parents ebf7054 + d91d016 commit f3d5fb0

File tree

8 files changed

+158
-170
lines changed

8 files changed

+158
-170
lines changed
 

‎core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java

+20-9
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424

2525
import org.apache.accumulo.core.rpc.SaslConnectionParams;
2626
import org.apache.accumulo.core.rpc.SslConnectionParams;
27+
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
2728

2829
import com.google.common.annotations.VisibleForTesting;
2930
import com.google.common.net.HostAndPort;
3031

3132
@VisibleForTesting
3233
public class ThriftTransportKey {
34+
private final ThriftClientTypes<?> type;
3335
private final HostAndPort server;
3436
private final long timeout;
3537
private final SslConnectionParams sslParams;
@@ -38,16 +40,18 @@ public class ThriftTransportKey {
3840
private final int hash;
3941

4042
@VisibleForTesting
41-
public ThriftTransportKey(HostAndPort server, long timeout, ClientContext context) {
42-
this(server, timeout, context.getClientSslParams(), context.getSaslParams());
43+
public ThriftTransportKey(ThriftClientTypes<?> type, HostAndPort server, long timeout,
44+
ClientContext context) {
45+
this(type, server, timeout, context.getClientSslParams(), context.getSaslParams());
4346
}
4447

4548
/**
4649
* Visible only for testing
4750
*/
48-
ThriftTransportKey(HostAndPort server, long timeout, SslConnectionParams sslParams,
49-
SaslConnectionParams saslParams) {
51+
ThriftTransportKey(ThriftClientTypes<?> type, HostAndPort server, long timeout,
52+
SslConnectionParams sslParams, SaslConnectionParams saslParams) {
5053
requireNonNull(server, "location is null");
54+
this.type = type;
5155
this.server = server;
5256
this.timeout = timeout;
5357
this.sslParams = sslParams;
@@ -56,14 +60,21 @@ public ThriftTransportKey(HostAndPort server, long timeout, ClientContext contex
5660
// TSasl and TSSL transport factories don't play nicely together
5761
throw new IllegalArgumentException("Cannot use both SSL and SASL thrift transports");
5862
}
59-
this.hash = Objects.hash(server, timeout, sslParams, saslParams);
63+
this.hash = Objects.hash(type, server, timeout, sslParams, saslParams);
6064
}
6165

62-
HostAndPort getServer() {
66+
@VisibleForTesting
67+
public ThriftClientTypes<?> getType() {
68+
return type;
69+
}
70+
71+
@VisibleForTesting
72+
public HostAndPort getServer() {
6373
return server;
6474
}
6575

66-
long getTimeout() {
76+
@VisibleForTesting
77+
public long getTimeout() {
6778
return timeout;
6879
}
6980

@@ -81,7 +92,7 @@ public boolean equals(Object o) {
8192
return false;
8293
}
8394
ThriftTransportKey ttk = (ThriftTransportKey) o;
84-
return server.equals(ttk.server) && timeout == ttk.timeout
95+
return type.equals(ttk.type) && server.equals(ttk.server) && timeout == ttk.timeout
8596
&& (!isSsl() || (ttk.isSsl() && sslParams.equals(ttk.sslParams)))
8697
&& (!isSasl() || (ttk.isSasl() && saslParams.equals(ttk.saslParams)));
8798
}
@@ -99,7 +110,7 @@ public String toString() {
99110
} else if (isSasl()) {
100111
prefix = saslParams + ":";
101112
}
102-
return prefix + server + " (" + timeout + ")";
113+
return prefix + type + ":" + server + " (" + timeout + ")";
103114
}
104115

105116
public SslConnectionParams getSslParams() {

‎core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java

+27-83
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.function.Supplier;
4242

4343
import org.apache.accumulo.core.rpc.ThriftUtil;
44+
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
4445
import org.apache.accumulo.core.util.Pair;
4546
import org.apache.accumulo.core.util.threads.Threads;
4647
import org.apache.thrift.TConfiguration;
@@ -49,7 +50,6 @@
4950
import org.slf4j.Logger;
5051
import org.slf4j.LoggerFactory;
5152

52-
import com.google.common.annotations.VisibleForTesting;
5353
import com.google.common.base.Preconditions;
5454
import com.google.common.net.HostAndPort;
5555

@@ -109,71 +109,40 @@ static ThriftTransportPool startNew(LongSupplier maxAgeMillis) {
109109
return pool;
110110
}
111111

112-
public TTransport getTransport(HostAndPort location, long milliseconds, ClientContext context)
113-
throws TTransportException {
114-
ThriftTransportKey cacheKey = new ThriftTransportKey(location, milliseconds, context);
112+
public TTransport getTransport(ThriftClientTypes<?> type, HostAndPort location, long milliseconds,
113+
ClientContext context, boolean preferCached) throws TTransportException {
115114

116-
CachedConnection connection = connectionPool.reserveAny(cacheKey);
117-
118-
if (connection != null) {
119-
log.trace("Using existing connection to {}", cacheKey.getServer());
120-
return connection.transport;
121-
} else {
122-
return createNewTransport(cacheKey);
115+
ThriftTransportKey cacheKey = new ThriftTransportKey(type, location, milliseconds, context);
116+
if (preferCached) {
117+
CachedConnection connection = connectionPool.reserveAny(cacheKey);
118+
if (connection != null) {
119+
log.trace("Using existing connection to {}", cacheKey.getServer());
120+
return connection.transport;
121+
}
123122
}
123+
return createNewTransport(cacheKey);
124124
}
125125

126-
@VisibleForTesting
127-
public Pair<String,TTransport> getAnyTransport(List<ThriftTransportKey> servers,
128-
boolean preferCachedConnection) throws TTransportException {
129-
130-
servers = new ArrayList<>(servers);
131-
132-
if (preferCachedConnection) {
133-
HashSet<ThriftTransportKey> serversSet = new HashSet<>(servers);
134-
135-
// randomly pick a server from the connection cache
136-
serversSet.retainAll(connectionPool.getThriftTransportKeys());
137-
138-
if (!serversSet.isEmpty()) {
139-
ArrayList<ThriftTransportKey> cachedServers = new ArrayList<>(serversSet);
140-
Collections.shuffle(cachedServers, RANDOM.get());
141-
142-
for (ThriftTransportKey ttk : cachedServers) {
143-
CachedConnection connection = connectionPool.reserveAny(ttk);
144-
if (connection != null) {
145-
final String serverAddr = ttk.getServer().toString();
146-
log.trace("Using existing connection to {}", serverAddr);
147-
return new Pair<>(serverAddr, connection.transport);
148-
}
149-
150-
}
126+
public Pair<String,TTransport> getAnyCachedTransport(ThriftClientTypes<?> type) {
127+
final List<ThriftTransportKey> serversSet = new ArrayList<>();
128+
for (ThriftTransportKey ttk : connectionPool.getThriftTransportKeys()) {
129+
if (ttk.getType().equals(type)) {
130+
serversSet.add(ttk);
151131
}
152132
}
153-
154-
int retryCount = 0;
155-
while (!servers.isEmpty() && retryCount < 10) {
156-
157-
int index = RANDOM.get().nextInt(servers.size());
158-
ThriftTransportKey ttk = servers.get(index);
159-
160-
if (preferCachedConnection) {
161-
CachedConnection connection = connectionPool.reserveAnyIfPresent(ttk);
162-
if (connection != null) {
163-
return new Pair<>(ttk.getServer().toString(), connection.transport);
164-
}
165-
}
166-
167-
try {
168-
return new Pair<>(ttk.getServer().toString(), createNewTransport(ttk));
169-
} catch (TTransportException tte) {
170-
log.debug("Failed to connect to {}", servers.get(index), tte);
171-
servers.remove(index);
172-
retryCount++;
133+
if (serversSet.isEmpty()) {
134+
return null;
135+
}
136+
Collections.shuffle(serversSet, RANDOM.get());
137+
for (ThriftTransportKey ttk : serversSet) {
138+
CachedConnection connection = connectionPool.reserveAny(ttk);
139+
if (connection != null) {
140+
final String serverAddr = ttk.getServer().toString();
141+
log.trace("Using existing connection to {}", serverAddr);
142+
return new Pair<>(serverAddr, connection.transport);
173143
}
174144
}
175-
176-
throw new TTransportException("Failed to connect to a server");
145+
return null;
177146
}
178147

179148
private TTransport createNewTransport(ThriftTransportKey cacheKey) throws TTransportException {
@@ -389,27 +358,6 @@ CachedConnection reserveAny(final ThriftTransportKey key) {
389358
return executeWithinLock(key, connections::reserveAny);
390359
}
391360

392-
/**
393-
* Reserve and return a new {@link CachedConnection} from the {@link CachedConnections} mapped
394-
* to the specified transport key. If a {@link CachedConnections} is not found, null will be
395-
* returned.
396-
*
397-
* <p>
398-
*
399-
* This operation locks access to the mapping for the key in {@link ConnectionPool#connections}
400-
* until the operation completes.
401-
*
402-
* @param key the transport key
403-
* @return the reserved {@link CachedConnection}, or null if none were available.
404-
*/
405-
CachedConnection reserveAnyIfPresent(final ThriftTransportKey key) {
406-
// It's possible that multiple locks from executeWithinLock will overlap with a single lock
407-
// inside the ConcurrentHashMap which can unnecessarily block threads. Access the
408-
// ConcurrentHashMap outside of executeWithinLock to prevent this.
409-
var connections = getCachedConnections(key);
410-
return connections == null ? null : executeWithinLock(key, connections::reserveAny);
411-
}
412-
413361
/**
414362
* Puts the specified connection into the reserved map of the {@link CachedConnections} for the
415363
* specified transport key. If a {@link CachedConnections} is not found, one will be created.
@@ -515,10 +463,6 @@ Lock getLock(final ThriftTransportKey key) {
515463
return lock;
516464
}
517465

518-
CachedConnections getCachedConnections(final ThriftTransportKey key) {
519-
return connections.get(key);
520-
}
521-
522466
CachedConnections getOrCreateCachedConnections(final ThriftTransportKey key) {
523467
return connections.computeIfAbsent(key, k -> new CachedConnections());
524468
}

‎core/src/main/java/org/apache/accumulo/core/rpc/ThriftUtil.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ public static <T extends TServiceClient> T getClientNoTimeout(ThriftClientTypes<
111111
*/
112112
public static <T extends TServiceClient> T getClient(ThriftClientTypes<T> type,
113113
HostAndPort address, ClientContext context) throws TTransportException {
114-
TTransport transport = context.getTransportPool().getTransport(address,
115-
context.getClientTimeoutInMillis(), context);
114+
TTransport transport = context.getTransportPool().getTransport(type, address,
115+
context.getClientTimeoutInMillis(), context, true);
116116
return createClient(type, transport);
117117
}
118118

@@ -127,7 +127,8 @@ public static <T extends TServiceClient> T getClient(ThriftClientTypes<T> type,
127127
*/
128128
public static <T extends TServiceClient> T getClient(ThriftClientTypes<T> type,
129129
HostAndPort address, ClientContext context, long timeout) throws TTransportException {
130-
TTransport transport = context.getTransportPool().getTransport(address, timeout, context);
130+
TTransport transport =
131+
context.getTransportPool().getTransport(type, address, timeout, context, true);
131132
return createClient(type, transport);
132133
}
133134

0 commit comments

Comments
 (0)
Please sign in to comment.