Skip to content

Commit eaa1f67

Browse files
committed
Merge branch '2.1'
2 parents 0ad96b1 + 7947c2c commit eaa1f67

File tree

4 files changed

+178
-6
lines changed

4 files changed

+178
-6
lines changed

core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java

+38-1
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,20 @@
3030
import java.util.Set;
3131
import java.util.TreeMap;
3232
import java.util.TreeSet;
33+
import java.util.function.Supplier;
3334

3435
import org.apache.accumulo.core.Constants;
3536
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
37+
import org.apache.accumulo.core.clientImpl.ClientContext;
38+
import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl;
39+
import org.apache.accumulo.core.clientImpl.ScannerImpl;
3640
import org.apache.accumulo.core.clientImpl.ScannerOptions;
3741
import org.apache.accumulo.core.data.ArrayByteSequence;
3842
import org.apache.accumulo.core.data.ByteSequence;
3943
import org.apache.accumulo.core.data.Column;
4044
import org.apache.accumulo.core.data.Key;
4145
import org.apache.accumulo.core.data.Range;
46+
import org.apache.accumulo.core.data.TableId;
4247
import org.apache.accumulo.core.data.Value;
4348
import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
4449
import org.apache.accumulo.core.iterators.IteratorAdapter;
@@ -71,13 +76,17 @@
7176
* server side) and to the client side scanner (which will execute client side).
7277
*/
7378
public class ClientSideIteratorScanner extends ScannerOptions implements Scanner {
79+
7480
private int size;
7581

7682
private Range range;
7783
private boolean isolated = false;
7884
private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
7985
private SamplerConfiguration iteratorSamplerConfig;
8086

87+
private final Supplier<ClientContext> context;
88+
private final Supplier<TableId> tableId;
89+
8190
private class ClientSideIteratorEnvironment implements IteratorEnvironment {
8291

8392
private SamplerConfiguration samplerConfig;
@@ -95,7 +104,9 @@ public IteratorScope getIteratorScope() {
95104

96105
@Override
97106
public boolean isFullMajorCompaction() {
98-
return false;
107+
// The javadocs state this method will throw an ISE when scope is not majc
108+
throw new IllegalStateException(
109+
"Asked about major compaction type when scope is " + getIteratorScope());
99110
}
100111

101112
@Override
@@ -122,6 +133,16 @@ public boolean isSamplingEnabled() {
122133
public SamplerConfiguration getSamplerConfiguration() {
123134
return samplerConfig;
124135
}
136+
137+
@Override
138+
public PluginEnvironment getPluginEnv() {
139+
return new ClientServiceEnvironmentImpl(context.get());
140+
}
141+
142+
@Override
143+
public TableId getTableId() {
144+
return tableId.get();
145+
}
125146
}
126147

127148
/**
@@ -221,6 +242,22 @@ public ClientSideIteratorScanner(final Scanner scanner) {
221242
if (samplerConfig != null) {
222243
setSamplerConfiguration(samplerConfig);
223244
}
245+
246+
if (scanner instanceof ScannerImpl) {
247+
var scannerImpl = (ScannerImpl) scanner;
248+
this.context = () -> scannerImpl.getClientContext();
249+
this.tableId = () -> scannerImpl.getTableId();
250+
} else {
251+
// These may never be used, so only fail if an attempt is made to use them.
252+
this.context = () -> {
253+
throw new UnsupportedOperationException(
254+
"Do not know how to obtain client context from " + scanner.getClass().getName());
255+
};
256+
this.tableId = () -> {
257+
throw new UnsupportedOperationException(
258+
"Do not know how to obtain tableId from " + scanner.getClass().getName());
259+
};
260+
}
224261
}
225262

226263
/**

core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import org.apache.accumulo.core.client.AccumuloException;
3737
import org.apache.accumulo.core.client.AccumuloSecurityException;
38+
import org.apache.accumulo.core.client.PluginEnvironment;
3839
import org.apache.accumulo.core.client.SampleNotPresentException;
3940
import org.apache.accumulo.core.client.TableNotFoundException;
4041
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
@@ -79,9 +80,13 @@ static class OfflineIteratorEnvironment implements IteratorEnvironment {
7980
private final AccumuloConfiguration conf;
8081
private final boolean useSample;
8182
private final SamplerConfiguration sampleConf;
83+
private final ClientContext context;
84+
private final TableId tableId;
8285

83-
public OfflineIteratorEnvironment(Authorizations auths, AccumuloConfiguration acuTableConf,
84-
boolean useSample, SamplerConfiguration samplerConf) {
86+
public OfflineIteratorEnvironment(ClientContext context, TableId tableId, Authorizations auths,
87+
AccumuloConfiguration acuTableConf, boolean useSample, SamplerConfiguration samplerConf) {
88+
this.context = context;
89+
this.tableId = tableId;
8590
this.authorizations = auths;
8691
this.conf = acuTableConf;
8792
this.useSample = useSample;
@@ -135,7 +140,18 @@ public IteratorEnvironment cloneWithSamplingEnabled() {
135140
if (sampleConf == null) {
136141
throw new SampleNotPresentException();
137142
}
138-
return new OfflineIteratorEnvironment(authorizations, conf, true, sampleConf);
143+
return new OfflineIteratorEnvironment(context, tableId, authorizations, conf, true,
144+
sampleConf);
145+
}
146+
147+
@Override
148+
public PluginEnvironment getPluginEnv() {
149+
return new ClientServiceEnvironmentImpl(context);
150+
}
151+
152+
@Override
153+
public TableId getTableId() {
154+
return tableId;
139155
}
140156
}
141157

@@ -311,8 +327,9 @@ private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent,
311327

312328
MultiIterator multiIter = new MultiIterator(readers, extent);
313329

314-
OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations, tableCC,
315-
false, samplerConfImpl == null ? null : samplerConfImpl.toSamplerConfiguration());
330+
OfflineIteratorEnvironment iterEnv =
331+
new OfflineIteratorEnvironment(context, tableId, authorizations, tableCC, false,
332+
samplerConfImpl == null ? null : samplerConfImpl.toSamplerConfiguration());
316333

317334
byte[] defaultSecurityLabel;
318335
ColumnVisibility cv =

core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java

+10
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,16 @@ public ScannerImpl(ClientContext context, TableId tableId, Authorizations author
120120
this.size = Constants.SCAN_BATCH_SIZE;
121121
}
122122

123+
public ClientContext getClientContext() {
124+
ensureOpen();
125+
return context;
126+
}
127+
128+
public TableId getTableId() {
129+
ensureOpen();
130+
return tableId;
131+
}
132+
123133
@Override
124134
public synchronized void setRange(Range range) {
125135
ensureOpen();

test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java

+108
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,32 @@
2222
import static org.junit.jupiter.api.Assertions.assertFalse;
2323
import static org.junit.jupiter.api.Assertions.assertTrue;
2424

25+
import java.io.IOException;
2526
import java.util.ArrayList;
2627
import java.util.List;
28+
import java.util.Map;
2729
import java.util.Map.Entry;
30+
import java.util.Set;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.function.Predicate;
33+
import java.util.regex.Pattern;
34+
import java.util.stream.Collectors;
2835

2936
import org.apache.accumulo.core.client.Accumulo;
3037
import org.apache.accumulo.core.client.AccumuloClient;
3138
import org.apache.accumulo.core.client.BatchWriter;
3239
import org.apache.accumulo.core.client.ClientSideIteratorScanner;
3340
import org.apache.accumulo.core.client.IteratorSetting;
3441
import org.apache.accumulo.core.client.Scanner;
42+
import org.apache.accumulo.core.clientImpl.ClientContext;
43+
import org.apache.accumulo.core.clientImpl.OfflineScanner;
3544
import org.apache.accumulo.core.data.Key;
3645
import org.apache.accumulo.core.data.Mutation;
3746
import org.apache.accumulo.core.data.PartialKey;
3847
import org.apache.accumulo.core.data.Value;
48+
import org.apache.accumulo.core.iterators.Filter;
49+
import org.apache.accumulo.core.iterators.IteratorEnvironment;
50+
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
3951
import org.apache.accumulo.core.iterators.user.IntersectingIterator;
4052
import org.apache.accumulo.core.iterators.user.VersioningIterator;
4153
import org.apache.accumulo.core.security.Authorizations;
@@ -154,4 +166,100 @@ public void testVersioning() throws Exception {
154166
assertFalse(csis.iterator().hasNext());
155167
}
156168
}
169+
170+
private static final AtomicBoolean initCalled = new AtomicBoolean(false);
171+
172+
public static class TestPropFilter extends Filter {
173+
174+
private Predicate<Key> keyPredicate;
175+
176+
private Predicate<Key> createRegexPredicate(String regex) {
177+
Predicate<Key> kp = k -> true;
178+
if (regex != null) {
179+
var pattern = Pattern.compile(regex);
180+
kp = k -> pattern.matcher(k.getRowData().toString()).matches();
181+
}
182+
183+
return kp;
184+
}
185+
186+
@Override
187+
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
188+
IteratorEnvironment env) throws IOException {
189+
super.init(source, options, env);
190+
Predicate<Key> generalPredicate =
191+
createRegexPredicate(env.getPluginEnv().getConfiguration().getCustom("testRegex"));
192+
Predicate<Key> tablePredicate = createRegexPredicate(
193+
env.getPluginEnv().getConfiguration(env.getTableId()).getTableCustom("testRegex"));
194+
keyPredicate = generalPredicate.and(tablePredicate);
195+
initCalled.set(true);
196+
}
197+
198+
@Override
199+
public boolean accept(Key k, Value v) {
200+
return keyPredicate.test(k);
201+
}
202+
}
203+
204+
private void runPluginEnvTest(Set<String> expected) throws Exception {
205+
try (var scanner = client.createScanner(tableName)) {
206+
initCalled.set(false);
207+
var csis = new ClientSideIteratorScanner(scanner);
208+
csis.addScanIterator(new IteratorSetting(100, "filter", TestPropFilter.class));
209+
assertEquals(expected,
210+
csis.stream().map(e -> e.getKey().getRowData().toString()).collect(Collectors.toSet()));
211+
// this check is here to ensure the iterator executed client side and not server side
212+
assertTrue(initCalled.get());
213+
}
214+
215+
// The offline scanner also runs iterators client side, so test its client side access to
216+
// accumulo config from iterators also.
217+
client.tableOperations().offline(tableName, true);
218+
var context = (ClientContext) client;
219+
try (OfflineScanner offlineScanner =
220+
new OfflineScanner(context, context.getTableId(tableName), Authorizations.EMPTY)) {
221+
initCalled.set(false);
222+
offlineScanner.addScanIterator(new IteratorSetting(100, "filter", TestPropFilter.class));
223+
assertEquals(expected, offlineScanner.stream().map(e -> e.getKey().getRowData().toString())
224+
.collect(Collectors.toSet()));
225+
assertTrue(initCalled.get());
226+
}
227+
client.tableOperations().online(tableName, true);
228+
}
229+
230+
/**
231+
* Test an iterators ability to access accumulo config in an iterator running client side.
232+
*/
233+
@Test
234+
public void testPluginEnv() throws Exception {
235+
Set<String> rows = Set.of("1234", "abc", "xyz789");
236+
237+
client.tableOperations().create(tableName);
238+
try (BatchWriter bw = client.createBatchWriter(tableName)) {
239+
for (var row : rows) {
240+
Mutation m = new Mutation(row);
241+
m.put("f", "q", "v");
242+
bw.addMutation(m);
243+
}
244+
}
245+
246+
runPluginEnvTest(rows);
247+
248+
// The iterator should see the following system property and filter based on it
249+
client.instanceOperations().setProperty("general.custom.testRegex", ".*[a-z]+.*");
250+
runPluginEnvTest(Set.of("abc", "xyz789"));
251+
252+
// The iterator should see the following table property and filter based on the table and system
253+
// property
254+
client.tableOperations().setProperty(tableName, "table.custom.testRegex", ".*[0-9]+.*");
255+
runPluginEnvTest(Set.of("xyz789"));
256+
257+
// Remove the system property, so filtering should only happen based on the table property
258+
client.instanceOperations().removeProperty("general.custom.testRegex");
259+
runPluginEnvTest(Set.of("1234", "xyz789"));
260+
261+
// Iterator should do no filtering after removing this property
262+
client.tableOperations().removeProperty(tableName, "table.custom.testRegex");
263+
runPluginEnvTest(rows);
264+
}
157265
}

0 commit comments

Comments
 (0)