Skip to content

Commit df6df7d

Browse files
committed
WIP adds new highly concurrent tablet location cache
This is a draft change that contains the basic incomplete structure of a new highly concurrent tablet location cache. For 2.1 this new implementation could be added along side the existing battle tested implementation with an option to use the new one. The current structure with an abstract class for TabletLocator would make it easy to switch the implementation based on a client config property or java property. The current tablet location cache has read/write locks that accomplish two goals. 1. Protect the integrity of an in memory tree map from concurrent updates. 2. Avoids N threads doing N metadata lookups on a cache miss. The write lock in the code will usually result in a single metadata lookup in the case where many threads show up and want info from the same area of the metadata table. This changes replaces the tree map with a concurrent map, removing the need to have locks protecting the data structure. Then it proposes a mechanism in `ConcurrentTabletLocator.requestLookup()` to attempt to avoid many concurrent metadata lookups for the same information. If this implementation were fleshed out, it seems like it may end up being much simpler than the existing cache. The existing code has a lot of complexity related to first trying with a read lock and then switching to a write lock on cache miss, these changes do not have any of that complexity.
1 parent 4761d5a commit df6df7d

File tree

2 files changed

+235
-2
lines changed

2 files changed

+235
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.accumulo.core.clientImpl;
20+
21+
import static org.apache.accumulo.core.clientImpl.TabletLocatorImpl.END_ROW_COMPARATOR;
22+
23+
import java.util.ArrayList;
24+
import java.util.Collection;
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.concurrent.BlockingQueue;
29+
import java.util.concurrent.ConcurrentSkipListMap;
30+
import java.util.concurrent.LinkedBlockingQueue;
31+
import java.util.concurrent.locks.Lock;
32+
import java.util.concurrent.locks.ReentrantLock;
33+
34+
import org.apache.accumulo.core.client.AccumuloException;
35+
import org.apache.accumulo.core.client.AccumuloSecurityException;
36+
import org.apache.accumulo.core.client.TableNotFoundException;
37+
import org.apache.accumulo.core.data.Mutation;
38+
import org.apache.accumulo.core.data.Range;
39+
import org.apache.accumulo.core.dataImpl.KeyExtent;
40+
import org.apache.hadoop.io.Text;
41+
42+
public class ConcurrentTabletLocator extends TabletLocator {
43+
44+
private ConcurrentSkipListMap<Text,TabletLocation> metaCache =
45+
new ConcurrentSkipListMap<>(END_ROW_COMPARATOR);
46+
private BlockingQueue<Text> lookupQueue = new LinkedBlockingQueue<>();
47+
private final Lock lookupLock = new ReentrantLock();
48+
49+
@Override
50+
public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow,
51+
boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
52+
53+
if (skipRow) {
54+
row = new Text(row);
55+
row.append(new byte[] {0}, 0, 1);
56+
}
57+
58+
// this does no locking when all the needed info is in the cache already and is much simpler
59+
// than the code in TabletLocatorImpl which tries w/ a read lock first and on miss switches to a
60+
// write lock
61+
TabletLocation tl = locateTabletInCache(row);
62+
while (tl == null) {
63+
// TODO sleep w/ backoff
64+
requestLookup(row);
65+
tl = locateTabletInCache(row);
66+
}
67+
68+
return tl;
69+
}
70+
71+
/**
72+
* This function gathers all work from all threads that currently need to do metadata lookups and
73+
* processes them all together. The goal of this function is to avoid N threads that all want the
74+
* same tablet metadata from doing N metadata lookups. This function attempt to take the N
75+
* metadata lookup needs and reduce them to a single metadata lookup for the client.
76+
*
77+
* @param row
78+
*/
79+
private void requestLookup(Text row) {
80+
// Add lookup request to queue outside the lock, whatever thread gets the lock will process this
81+
// work. If a thread is currently processing a lookup, then new request will build up in the
82+
// queue until its done and when its done one thread will process all the work for all waiting
83+
// threads.
84+
lookupQueue.add(row);
85+
lookupLock.lock();
86+
try {
87+
if (lookupQueue.isEmpty()) {
88+
// some other thread processed our request, so nothing to do
89+
return;
90+
}
91+
92+
// TODO could filter out anything that is now in the cache, could have been added since the
93+
// lookup miss and there may not be anything to do at this point
94+
95+
ArrayList<Text> lookupsToProcess = new ArrayList<>();
96+
lookupQueue.drainTo(lookupsToProcess);
97+
98+
// TODO process all the queued work from all threads in lookupsToProcess using a batch scanner
99+
// and update the cache. Could collapse the requested lookups into ranges and use a batch
100+
// scanner to get the top N tablets for each range.
101+
102+
} finally {
103+
lookupLock.unlock();
104+
}
105+
106+
}
107+
108+
@Override
109+
public <T extends Mutation> void binMutations(ClientContext context, List<T> mutations,
110+
Map<String,TabletServerMutations<T>> binnedMutations, List<T> failures)
111+
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
112+
113+
// TODO handle empty mutations, otherwise this could loop forever
114+
115+
// this is also much simpler than the existing code because it does not do the switch from read
116+
// lock to write lock
117+
do {
118+
TabletLocatorImpl.LockCheckerSession lcSession = null;
119+
ArrayList<T> notInCache = new ArrayList<>();
120+
Text row = new Text();
121+
122+
for (T mutation : mutations) {
123+
row.set(mutation.getRow());
124+
TabletLocation tl = locateTabletInCache(row);
125+
if (tl == null || !addMutation(binnedMutations, mutation, tl, lcSession)) {
126+
notInCache.add(mutation);
127+
}
128+
}
129+
130+
if (!notInCache.isEmpty()) {
131+
binnedMutations.clear();
132+
// TODO sleep w/ backoff
133+
// request coordinated lookup of the missing extents
134+
requestLookups(notInCache);
135+
}
136+
137+
} while (binnedMutations.isEmpty());
138+
}
139+
140+
private <T extends Mutation> void requestLookups(ArrayList<T> notInCache) {
141+
// TODO
142+
}
143+
144+
@Override
145+
public List<Range> binRanges(ClientContext context, List<Range> ranges,
146+
Map<String,Map<KeyExtent,List<Range>>> binnedRanges)
147+
throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
148+
// TODO implement
149+
throw new UnsupportedOperationException();
150+
}
151+
152+
@Override
153+
public void invalidateCache(KeyExtent failedExtent) {
154+
metaCache.remove(getCacheKey(failedExtent));
155+
}
156+
157+
@Override
158+
public void invalidateCache(Collection<KeyExtent> keySet) {
159+
keySet.forEach(extent -> metaCache.remove(getCacheKey(extent)));
160+
}
161+
162+
@Override
163+
public void invalidateCache() {
164+
metaCache.clear();
165+
}
166+
167+
private BlockingQueue<String> serverInvalidationQueue = new LinkedBlockingQueue<>();
168+
private final Lock serverInvalidationLock = new ReentrantLock();
169+
170+
@Override
171+
public void invalidateCache(ClientContext context, String server) {
172+
// This method is structured so that when lots of threads attempt to invalidate servers at
173+
// around the same time the amount of full scans of the metadata cache is minimized.
174+
serverInvalidationQueue.add(server);
175+
serverInvalidationLock.lock();
176+
try {
177+
if (serverInvalidationQueue.isEmpty()) {
178+
// some other thread invalidated our server so nothing to do
179+
return;
180+
}
181+
HashSet<String> serversToInvalidate = new HashSet<>();
182+
serverInvalidationQueue.drainTo(serversToInvalidate);
183+
metaCache.values().removeIf(tl -> serversToInvalidate.contains(tl.tablet_location));
184+
} finally {
185+
serverInvalidationLock.unlock();
186+
}
187+
}
188+
189+
private Text getCacheKey(KeyExtent extent) {
190+
// TODO handle null end row
191+
return extent.endRow();
192+
}
193+
194+
// TODO was copied from TabletLocatorImpl, could share code w/ TabletLocatorImpl
195+
private TabletLocation locateTabletInCache(Text row) {
196+
197+
Map.Entry<Text,TabletLocation> entry = metaCache.ceilingEntry(row);
198+
199+
if (entry != null) {
200+
KeyExtent ke = entry.getValue().tablet_extent;
201+
if (ke.prevEndRow() == null || ke.prevEndRow().compareTo(row) < 0) {
202+
return entry.getValue();
203+
}
204+
}
205+
return null;
206+
}
207+
208+
// TODO was copied from TabletLocatorImpl, could share code w/ TabletLocatorImpl
209+
private <T extends Mutation> boolean addMutation(
210+
Map<String,TabletServerMutations<T>> binnedMutations, T mutation, TabletLocation tl,
211+
TabletLocatorImpl.LockCheckerSession lcSession) {
212+
TabletServerMutations<T> tsm = binnedMutations.get(tl.tablet_location);
213+
214+
if (tsm == null) {
215+
// do lock check once per tserver here to make binning faster
216+
boolean lockHeld = lcSession.checkLock(tl) != null;
217+
if (lockHeld) {
218+
tsm = new TabletServerMutations<>(tl.tablet_session);
219+
binnedMutations.put(tl.tablet_location, tsm);
220+
} else {
221+
return false;
222+
}
223+
}
224+
225+
// its possible the same tserver could be listed with different sessions
226+
if (tsm.getSession().equals(tl.tablet_session)) {
227+
tsm.addMutation(tl.tablet_extent, mutation);
228+
return true;
229+
}
230+
231+
return false;
232+
}
233+
}

core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocatorImpl.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,12 @@ public interface TabletServerLockChecker {
108108
void invalidateCache(String server);
109109
}
110110

111-
private class LockCheckerSession {
111+
class LockCheckerSession {
112112

113113
private final HashSet<Pair<String,String>> okLocks = new HashSet<>();
114114
private final HashSet<Pair<String,String>> invalidLocks = new HashSet<>();
115115

116-
private TabletLocation checkLock(TabletLocation tl) {
116+
TabletLocation checkLock(TabletLocation tl) {
117117
// the goal of this class is to minimize calls out to lockChecker under that assumption that
118118
// its a resource synchronized among many threads... want to
119119
// avoid fine grained synchronization when binning lots of mutations or ranges... remember

0 commit comments

Comments
 (0)