Skip to content

Commit 7947c2c

Browse files
authored
adds PluginEnv support to client side iterators (apache#4283)
Accumulo code that ran user iterators client side would throw an unsupported operations exception when attempting to access the PluginEnv. This commit adds support for PluginEnv in situations where iterators are run client side.
1 parent ec8ae12 commit 7947c2c

File tree

4 files changed

+192
-6
lines changed

4 files changed

+192
-6
lines changed

core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,20 @@
2929
import java.util.Set;
3030
import java.util.TreeMap;
3131
import java.util.TreeSet;
32+
import java.util.function.Supplier;
3233

3334
import org.apache.accumulo.core.Constants;
3435
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
36+
import org.apache.accumulo.core.clientImpl.ClientContext;
37+
import org.apache.accumulo.core.clientImpl.ClientServiceEnvironmentImpl;
38+
import org.apache.accumulo.core.clientImpl.ScannerImpl;
3539
import org.apache.accumulo.core.clientImpl.ScannerOptions;
3640
import org.apache.accumulo.core.data.ArrayByteSequence;
3741
import org.apache.accumulo.core.data.ByteSequence;
3842
import org.apache.accumulo.core.data.Column;
3943
import org.apache.accumulo.core.data.Key;
4044
import org.apache.accumulo.core.data.Range;
45+
import org.apache.accumulo.core.data.TableId;
4146
import org.apache.accumulo.core.data.Value;
4247
import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
4348
import org.apache.accumulo.core.iterators.IteratorAdapter;
@@ -47,6 +52,7 @@
4752
import org.apache.accumulo.core.iteratorsImpl.IteratorBuilder;
4853
import org.apache.accumulo.core.iteratorsImpl.IteratorConfigUtil;
4954
import org.apache.accumulo.core.security.Authorizations;
55+
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
5056
import org.apache.hadoop.io.Text;
5157

5258
/**
@@ -70,13 +76,17 @@
7076
* server side) and to the client side scanner (which will execute client side).
7177
*/
7278
public class ClientSideIteratorScanner extends ScannerOptions implements Scanner {
79+
7380
private int size;
7481

7582
private Range range;
7683
private boolean isolated = false;
7784
private long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD;
7885
private SamplerConfiguration iteratorSamplerConfig;
7986

87+
private final Supplier<ClientContext> context;
88+
private final Supplier<TableId> tableId;
89+
8090
private class ClientSideIteratorEnvironment implements IteratorEnvironment {
8191

8292
private SamplerConfiguration samplerConfig;
@@ -94,7 +104,9 @@ public IteratorScope getIteratorScope() {
94104

95105
@Override
96106
public boolean isFullMajorCompaction() {
97-
return false;
107+
// The javadocs state this method will throw an ISE when scope is not majc
108+
throw new IllegalStateException(
109+
"Asked about major compaction type when scope is " + getIteratorScope());
98110
}
99111

100112
@Override
@@ -121,6 +133,22 @@ public boolean isSamplingEnabled() {
121133
public SamplerConfiguration getSamplerConfiguration() {
122134
return samplerConfig;
123135
}
136+
137+
@Deprecated(since = "2.1.0")
138+
@Override
139+
public ServiceEnvironment getServiceEnv() {
140+
return new ClientServiceEnvironmentImpl(context.get());
141+
}
142+
143+
@Override
144+
public PluginEnvironment getPluginEnv() {
145+
return new ClientServiceEnvironmentImpl(context.get());
146+
}
147+
148+
@Override
149+
public TableId getTableId() {
150+
return tableId.get();
151+
}
124152
}
125153

126154
/**
@@ -220,6 +248,22 @@ public ClientSideIteratorScanner(final Scanner scanner) {
220248
if (samplerConfig != null) {
221249
setSamplerConfiguration(samplerConfig);
222250
}
251+
252+
if (scanner instanceof ScannerImpl) {
253+
var scannerImpl = (ScannerImpl) scanner;
254+
this.context = () -> scannerImpl.getClientContext();
255+
this.tableId = () -> scannerImpl.getTableId();
256+
} else {
257+
// These may never be used, so only fail if an attempt is made to use them.
258+
this.context = () -> {
259+
throw new UnsupportedOperationException(
260+
"Do not know how to obtain client context from " + scanner.getClass().getName());
261+
};
262+
this.tableId = () -> {
263+
throw new UnsupportedOperationException(
264+
"Do not know how to obtain tableId from " + scanner.getClass().getName());
265+
};
266+
}
223267
}
224268

225269
/**

core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java

+29-5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import org.apache.accumulo.core.client.AccumuloException;
3636
import org.apache.accumulo.core.client.AccumuloSecurityException;
37+
import org.apache.accumulo.core.client.PluginEnvironment;
3738
import org.apache.accumulo.core.client.SampleNotPresentException;
3839
import org.apache.accumulo.core.client.TableNotFoundException;
3940
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
@@ -65,6 +66,7 @@
6566
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
6667
import org.apache.accumulo.core.security.Authorizations;
6768
import org.apache.accumulo.core.security.ColumnVisibility;
69+
import org.apache.accumulo.core.spi.common.ServiceEnvironment;
6870
import org.apache.accumulo.core.util.LocalityGroupUtil;
6971
import org.apache.accumulo.core.volume.VolumeConfiguration;
7072
import org.apache.hadoop.conf.Configuration;
@@ -79,9 +81,13 @@ static class OfflineIteratorEnvironment implements IteratorEnvironment {
7981
private final AccumuloConfiguration conf;
8082
private final boolean useSample;
8183
private final SamplerConfiguration sampleConf;
84+
private final ClientContext context;
85+
private final TableId tableId;
8286

83-
public OfflineIteratorEnvironment(Authorizations auths, AccumuloConfiguration acuTableConf,
84-
boolean useSample, SamplerConfiguration samplerConf) {
87+
public OfflineIteratorEnvironment(ClientContext context, TableId tableId, Authorizations auths,
88+
AccumuloConfiguration acuTableConf, boolean useSample, SamplerConfiguration samplerConf) {
89+
this.context = context;
90+
this.tableId = tableId;
8591
this.authorizations = auths;
8692
this.conf = acuTableConf;
8793
this.useSample = useSample;
@@ -147,7 +153,24 @@ public IteratorEnvironment cloneWithSamplingEnabled() {
147153
if (sampleConf == null) {
148154
throw new SampleNotPresentException();
149155
}
150-
return new OfflineIteratorEnvironment(authorizations, conf, true, sampleConf);
156+
return new OfflineIteratorEnvironment(context, tableId, authorizations, conf, true,
157+
sampleConf);
158+
}
159+
160+
@Deprecated(since = "2.1.0")
161+
@Override
162+
public ServiceEnvironment getServiceEnv() {
163+
return new ClientServiceEnvironmentImpl(context);
164+
}
165+
166+
@Override
167+
public PluginEnvironment getPluginEnv() {
168+
return new ClientServiceEnvironmentImpl(context);
169+
}
170+
171+
@Override
172+
public TableId getTableId() {
173+
return tableId;
151174
}
152175
}
153176

@@ -322,8 +345,9 @@ private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent,
322345

323346
MultiIterator multiIter = new MultiIterator(readers, extent);
324347

325-
OfflineIteratorEnvironment iterEnv = new OfflineIteratorEnvironment(authorizations, tableCC,
326-
false, samplerConfImpl == null ? null : samplerConfImpl.toSamplerConfiguration());
348+
OfflineIteratorEnvironment iterEnv =
349+
new OfflineIteratorEnvironment(context, tableId, authorizations, tableCC, false,
350+
samplerConfImpl == null ? null : samplerConfImpl.toSamplerConfiguration());
327351

328352
byte[] defaultSecurityLabel;
329353
ColumnVisibility cv =

core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java

+10
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,16 @@ public ScannerImpl(ClientContext context, TableId tableId, Authorizations author
120120
this.size = Constants.SCAN_BATCH_SIZE;
121121
}
122122

123+
public ClientContext getClientContext() {
124+
ensureOpen();
125+
return context;
126+
}
127+
128+
public TableId getTableId() {
129+
ensureOpen();
130+
return tableId;
131+
}
132+
123133
@Override
124134
public synchronized void setRange(Range range) {
125135
ensureOpen();

test/src/main/java/org/apache/accumulo/test/ClientSideIteratorIT.java

+108
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,32 @@
2222
import static org.junit.jupiter.api.Assertions.assertFalse;
2323
import static org.junit.jupiter.api.Assertions.assertTrue;
2424

25+
import java.io.IOException;
2526
import java.util.ArrayList;
2627
import java.util.List;
28+
import java.util.Map;
2729
import java.util.Map.Entry;
30+
import java.util.Set;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.function.Predicate;
33+
import java.util.regex.Pattern;
34+
import java.util.stream.Collectors;
2835

2936
import org.apache.accumulo.core.client.Accumulo;
3037
import org.apache.accumulo.core.client.AccumuloClient;
3138
import org.apache.accumulo.core.client.BatchWriter;
3239
import org.apache.accumulo.core.client.ClientSideIteratorScanner;
3340
import org.apache.accumulo.core.client.IteratorSetting;
3441
import org.apache.accumulo.core.client.Scanner;
42+
import org.apache.accumulo.core.clientImpl.ClientContext;
43+
import org.apache.accumulo.core.clientImpl.OfflineScanner;
3544
import org.apache.accumulo.core.data.Key;
3645
import org.apache.accumulo.core.data.Mutation;
3746
import org.apache.accumulo.core.data.PartialKey;
3847
import org.apache.accumulo.core.data.Value;
48+
import org.apache.accumulo.core.iterators.Filter;
49+
import org.apache.accumulo.core.iterators.IteratorEnvironment;
50+
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
3951
import org.apache.accumulo.core.iterators.user.IntersectingIterator;
4052
import org.apache.accumulo.core.iterators.user.VersioningIterator;
4153
import org.apache.accumulo.core.security.Authorizations;
@@ -154,4 +166,100 @@ public void testVersioning() throws Exception {
154166
assertFalse(csis.iterator().hasNext());
155167
}
156168
}
169+
170+
private static final AtomicBoolean initCalled = new AtomicBoolean(false);
171+
172+
public static class TestPropFilter extends Filter {
173+
174+
private Predicate<Key> keyPredicate;
175+
176+
private Predicate<Key> createRegexPredicate(String regex) {
177+
Predicate<Key> kp = k -> true;
178+
if (regex != null) {
179+
var pattern = Pattern.compile(regex);
180+
kp = k -> pattern.matcher(k.getRowData().toString()).matches();
181+
}
182+
183+
return kp;
184+
}
185+
186+
@Override
187+
public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options,
188+
IteratorEnvironment env) throws IOException {
189+
super.init(source, options, env);
190+
Predicate<Key> generalPredicate =
191+
createRegexPredicate(env.getPluginEnv().getConfiguration().getCustom("testRegex"));
192+
Predicate<Key> tablePredicate = createRegexPredicate(
193+
env.getPluginEnv().getConfiguration(env.getTableId()).getTableCustom("testRegex"));
194+
keyPredicate = generalPredicate.and(tablePredicate);
195+
initCalled.set(true);
196+
}
197+
198+
@Override
199+
public boolean accept(Key k, Value v) {
200+
return keyPredicate.test(k);
201+
}
202+
}
203+
204+
private void runPluginEnvTest(Set<String> expected) throws Exception {
205+
try (var scanner = client.createScanner(tableName)) {
206+
initCalled.set(false);
207+
var csis = new ClientSideIteratorScanner(scanner);
208+
csis.addScanIterator(new IteratorSetting(100, "filter", TestPropFilter.class));
209+
assertEquals(expected,
210+
csis.stream().map(e -> e.getKey().getRowData().toString()).collect(Collectors.toSet()));
211+
// this check is here to ensure the iterator executed client side and not server side
212+
assertTrue(initCalled.get());
213+
}
214+
215+
// The offline scanner also runs iterators client side, so test its client side access to
216+
// accumulo config from iterators also.
217+
client.tableOperations().offline(tableName, true);
218+
var context = (ClientContext) client;
219+
try (OfflineScanner offlineScanner =
220+
new OfflineScanner(context, context.getTableId(tableName), Authorizations.EMPTY)) {
221+
initCalled.set(false);
222+
offlineScanner.addScanIterator(new IteratorSetting(100, "filter", TestPropFilter.class));
223+
assertEquals(expected, offlineScanner.stream().map(e -> e.getKey().getRowData().toString())
224+
.collect(Collectors.toSet()));
225+
assertTrue(initCalled.get());
226+
}
227+
client.tableOperations().online(tableName, true);
228+
}
229+
230+
/**
231+
* Test an iterators ability to access accumulo config in an iterator running client side.
232+
*/
233+
@Test
234+
public void testPluginEnv() throws Exception {
235+
Set<String> rows = Set.of("1234", "abc", "xyz789");
236+
237+
client.tableOperations().create(tableName);
238+
try (BatchWriter bw = client.createBatchWriter(tableName)) {
239+
for (var row : rows) {
240+
Mutation m = new Mutation(row);
241+
m.put("f", "q", "v");
242+
bw.addMutation(m);
243+
}
244+
}
245+
246+
runPluginEnvTest(rows);
247+
248+
// The iterator should see the following system property and filter based on it
249+
client.instanceOperations().setProperty("general.custom.testRegex", ".*[a-z]+.*");
250+
runPluginEnvTest(Set.of("abc", "xyz789"));
251+
252+
// The iterator should see the following table property and filter based on the table and system
253+
// property
254+
client.tableOperations().setProperty(tableName, "table.custom.testRegex", ".*[0-9]+.*");
255+
runPluginEnvTest(Set.of("xyz789"));
256+
257+
// Remove the system property, so filtering should only happen based on the table property
258+
client.instanceOperations().removeProperty("general.custom.testRegex");
259+
runPluginEnvTest(Set.of("1234", "xyz789"));
260+
261+
// Iterator should do no filtering after removing this property
262+
client.tableOperations().removeProperty(tableName, "table.custom.testRegex");
263+
runPluginEnvTest(rows);
264+
}
157265
}

0 commit comments

Comments
 (0)