Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prefetch blocks and place into data BlockCache for major compactions #5302

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,14 @@ public enum Property {
// Compactor properties
COMPACTOR_PREFIX("compactor.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the accumulo compactor server.", "2.1.0"),
COMPACTOR_RFILE_BLOCK_PREFETCH_COUNT("compactor.prefetch.rfile.blocks", "0", PropertyType.COUNT,
"Number of RFile blocks to prefetch and load into the data block cache. Requires"
+ " compactor.cache.data.size to be greater than zero.",
"4.0.0"),
COMPACTOR_DATACACHE_SIZE("compactor.cache.data.size", "0", PropertyType.MEMORY,
"Specifies the size of the cache for RFile data blocks on each compactor.", "4.0.0"),
COMPACTOR_DEFAULT_BLOCKSIZE("compactor.default.blocksize", "1M", PropertyType.BYTES,
"Specifies a default blocksize for the compactor caches.", "4.0.0"),
COMPACTOR_CANCEL_CHECK_INTERVAL("compactor.cancel.check.interval", "5m",
PropertyType.TIMEDURATION,
"Interval at which Compactors will check to see if the currently executing compaction"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,14 @@ protected static class FileOptions {
public final Set<ByteSequence> columnFamilies;
public final boolean inclusive;
public final boolean dropCacheBehind;
public final int prefetchBlocks;

protected FileOptions(AccumuloConfiguration tableConfiguration, TabletFile file, FileSystem fs,
Configuration fsConf, String compression, FSDataOutputStream outputStream,
boolean enableAccumuloStart, CacheProvider cacheProvider, Cache<String,Long> fileLenCache,
boolean seekToBeginning, CryptoService cryptoService, Range range,
Set<ByteSequence> columnFamilies, boolean inclusive, boolean dropCacheBehind) {
Set<ByteSequence> columnFamilies, boolean inclusive, boolean dropCacheBehind,
int prefetchBlocks) {
this.tableConfiguration = tableConfiguration;
this.file = Objects.requireNonNull(file);
this.fs = fs;
Expand All @@ -210,6 +212,7 @@ protected FileOptions(AccumuloConfiguration tableConfiguration, TabletFile file,
this.columnFamilies = columnFamilies;
this.inclusive = inclusive;
this.dropCacheBehind = dropCacheBehind;
this.prefetchBlocks = prefetchBlocks;
}

public AccumuloConfiguration getTableConfiguration() {
Expand Down Expand Up @@ -279,6 +282,7 @@ public static class FileHelper {
private Configuration fsConf;
private CryptoService cryptoService;
private boolean dropCacheBehind = false;
private int prefetchBlocks = 0;

protected FileHelper fs(FileSystem fs) {
this.fs = Objects.requireNonNull(fs);
Expand All @@ -295,6 +299,11 @@ protected FileHelper file(TabletFile file) {
return this;
}

/**
 * Sets the number of data blocks to read ahead when this file is read; the default of 0
 * disables prefetching.
 *
 * @param blocks number of blocks to prefetch
 * @return this helper, for chaining
 */
protected FileHelper prefetchBlocks(int blocks) {
this.prefetchBlocks = blocks;
return this;
}

protected FileHelper tableConfiguration(AccumuloConfiguration tableConfiguration) {
this.tableConfiguration = Objects.requireNonNull(tableConfiguration);
return this;
Expand All @@ -314,25 +323,26 @@ protected FileOptions toWriterBuilderOptions(String compression,
FSDataOutputStream outputStream, boolean startEnabled) {
return new FileOptions(tableConfiguration, file, fs, fsConf, compression, outputStream,
startEnabled, NULL_PROVIDER, null, false, cryptoService, null, null, true,
dropCacheBehind);
dropCacheBehind, 0);
}

protected FileOptions toReaderBuilderOptions(CacheProvider cacheProvider,
Cache<String,Long> fileLenCache, boolean seekToBeginning) {
return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, false,
cacheProvider == null ? NULL_PROVIDER : cacheProvider, fileLenCache, seekToBeginning,
cryptoService, null, null, true, dropCacheBehind);
cryptoService, null, null, true, dropCacheBehind, prefetchBlocks);
}

protected FileOptions toIndexReaderBuilderOptions(Cache<String,Long> fileLenCache) {
return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, false, NULL_PROVIDER,
fileLenCache, false, cryptoService, null, null, true, dropCacheBehind);
fileLenCache, false, cryptoService, null, null, true, dropCacheBehind, 0);
}

protected FileOptions toScanReaderBuilderOptions(Range range, Set<ByteSequence> columnFamilies,
boolean inclusive) {
return new FileOptions(tableConfiguration, file, fs, fsConf, null, null, false, NULL_PROVIDER,
null, false, cryptoService, range, columnFamilies, inclusive, dropCacheBehind);
null, false, cryptoService, range, columnFamilies, inclusive, dropCacheBehind,
prefetchBlocks);
}

protected AccumuloConfiguration getTableConfiguration() {
Expand Down Expand Up @@ -407,6 +417,14 @@ public ReaderTableConfiguration forFile(TabletFile file, FileSystem fs, Configur
return this;
}

/**
 * As the file is read, prefetch the next N blocks and place them into the data block cache.
 *
 * @param blocks number of blocks to read ahead of the current position; 0 disables
 *        prefetching
 * @return this builder, for chaining
 */
public ReaderBuilder prefetch(int blocks) {
this.prefetchBlocks(blocks);
return this;
}

@Override
public ReaderBuilder withTableConfiguration(AccumuloConfiguration tableConfiguration) {
tableConfiguration(tableConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public static BlockCacheConfiguration forScanServer(AccumuloConfiguration conf)
Property.SSERV_DEFAULT_BLOCKSIZE);
}

/**
 * Creates the block cache configuration for a compactor. Only the data cache is sized (from
 * {@code COMPACTOR_DATACACHE_SIZE}); the index and summary caches are given a size of zero.
 */
public static BlockCacheConfiguration forCompactor(AccumuloConfiguration conf) {
return new BlockCacheConfiguration(conf, Property.COMPACTOR_PREFIX,
Property.COMPACTOR_DATACACHE_SIZE, Property.COMPACTOR_DEFAULT_BLOCKSIZE);
}

private BlockCacheConfiguration(AccumuloConfiguration conf, Property serverPrefix,
Property indexCacheSizeProperty, Property dataCacheSizeProperty,
Property summaryCacheSizeProperty, Property defaultBlockSizeProperty) {
Expand All @@ -66,6 +71,17 @@ private BlockCacheConfiguration(AccumuloConfiguration conf, Property serverPrefi
this.blockSize = conf.getAsBytes(defaultBlockSizeProperty);
}

/**
 * Constructor for server types that maintain only a data block cache. The index and summary
 * caches are disabled by fixing their maximum sizes to zero.
 *
 * @param conf accumulo configuration to read cache sizes from
 * @param serverPrefix property prefix identifying this server type
 * @param dataCacheSizeProperty property supplying the data cache maximum size in bytes
 * @param defaultBlockSizeProperty property supplying the default cache block size in bytes
 */
private BlockCacheConfiguration(AccumuloConfiguration conf, Property serverPrefix,
    Property dataCacheSizeProperty, Property defaultBlockSizeProperty) {
  this.serverPrefix = serverPrefix;
  this.genProps = conf.getAllPropertiesWithPrefix(serverPrefix);
  // only the data cache is enabled for this configuration
  this.indexMaxSize = 0L;
  this.dataMaxSize = conf.getAsBytes(dataCacheSizeProperty);
  this.summaryMaxSize = 0L;
  this.blockSize = conf.getAsBytes(defaultBlockSizeProperty);
}

@Override
public long getMaxSize(CacheType type) {
switch (type) {
Expand All @@ -87,7 +103,7 @@ public long getBlockSize() {

@Override
public String toString() {
return "indexMaxSize: " + indexMaxSize + "dataMaxSize: " + dataMaxSize + "summaryMaxSize: "
return "indexMaxSize: " + indexMaxSize + " dataMaxSize: " + dataMaxSize + " summaryMaxSize: "
+ summaryMaxSize + ", blockSize: " + getBlockSize();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import org.apache.accumulo.core.file.rfile.BlockIndex;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.IndexEntry;
import org.apache.accumulo.core.file.rfile.MultiLevelIndex.Reader.IndexIterator;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile;
import org.apache.accumulo.core.file.rfile.bcfile.BCFile.Reader.BlockReader;
import org.apache.accumulo.core.file.rfile.bcfile.MetaBlockDoesNotExist;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.BlockCache.Loader;
import org.apache.accumulo.core.spi.cache.CacheEntry;
import org.apache.accumulo.core.spi.crypto.CryptoService;
import org.apache.accumulo.core.util.threads.ThreadPoolNames;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
Expand Down Expand Up @@ -69,7 +74,9 @@ public static String pathToCacheId(Path p) {
public static class CachableBuilder {
String cacheId = null;
IoeSupplier<FSDataInputStream> inputSupplier = null;
IoeSupplier<Long> blockSizeSupplier = null;
IoeSupplier<Long> lengthSupplier = null;
int numPrefetchBlocks = 0;
Cache<String,Long> fileLenCache = null;
volatile CacheProvider cacheProvider = CacheProvider.NULL_PROVIDER;
Configuration hadoopConf = null;
Expand All @@ -81,10 +88,11 @@ public CachableBuilder conf(Configuration hadoopConf) {
}

/**
 * Configures this builder for the given file with defaults: drop-cache-behind disabled and
 * block prefetching disabled (0 blocks).
 */
public CachableBuilder fsPath(FileSystem fs, Path dataFile) {
return fsPath(fs, dataFile, false, 0);
}

public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind) {
public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBehind,
int blocksToPrefetch) {
this.cacheId = pathToCacheId(dataFile);
this.inputSupplier = () -> {
FSDataInputStream is = fs.open(dataFile);
Expand All @@ -104,6 +112,7 @@ public CachableBuilder fsPath(FileSystem fs, Path dataFile, boolean dropCacheBeh
return is;
};
this.lengthSupplier = () -> fs.getFileStatus(dataFile).getLen();
this.numPrefetchBlocks = blocksToPrefetch;
return this;
}

Expand Down Expand Up @@ -149,6 +158,8 @@ public static class Reader implements Closeable {

private final IoeSupplier<FSDataInputStream> inputSupplier;
private final IoeSupplier<Long> lengthSupplier;
private final int numPrefetchBlocks;
private final ThreadPoolExecutor prefetchBlockThreadPool;
private final AtomicReference<BCFile.Reader> bcfr = new AtomicReference<>();

private static final String ROOT_BLOCK_NAME = "!RootData";
Expand Down Expand Up @@ -371,10 +382,18 @@ public Reader(CachableBuilder b) {
this.cacheId = Objects.requireNonNull(b.cacheId);
this.inputSupplier = b.inputSupplier;
this.lengthSupplier = b.lengthSupplier;
this.numPrefetchBlocks = b.numPrefetchBlocks;
this.fileLenCache = b.fileLenCache;
this.cacheProvider = b.cacheProvider;
this.conf = b.hadoopConf;
this.cryptoService = Objects.requireNonNull(b.cryptoService);
if (this.numPrefetchBlocks > 0) {
this.prefetchBlockThreadPool =
ThreadPools.getServerThreadPools().getPoolBuilder(ThreadPoolNames.BLOCK_READ_AHEAD_POOL)
.numCoreThreads(this.numPrefetchBlocks).build();
} else {
this.prefetchBlockThreadPool = null;
}
}

/**
Expand Down Expand Up @@ -421,6 +440,96 @@ public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSi
return new CachedBlockRead(_currBlock);
}

/**
 * Returns a read over the data block the index iterator is currently positioned at, and, when
 * prefetching is enabled and a data cache is available, asynchronously loads up to
 * {@code numPrefetchBlocks} following blocks into the data block cache.
 *
 * @param version RFile serialization version; versions 3 and 4 address blocks by block index,
 *        older versions address them by offset/compressed size/raw size
 * @param startBlock index of the first block covered by the current index level
 * @param iiter index iterator positioned at the entry being read; its position is unchanged
 *        when this method returns
 * @param indexEntry index entry for the current block (used for pre-version-3 addressing)
 * @return a reader over the requested data block
 * @throws IOException if the block cannot be read
 */
public CachableBlockFile.CachedBlockRead getDataBlock(int version, int startBlock,
    IndexIterator iiter, IndexEntry indexEntry) throws IOException {

  final BlockCache dataBlockCache = cacheProvider.getDataCache();
  final boolean addressByIndex = (version == 3 || version == 4);

  // TODO: Question - does pre-fetching a block cause any type of
  // issue with the underlying FSDataInputStream?
  if (addressByIndex) {
    // capture the current block index before the iterator is advanced for prefetching
    final int thisBlockIndex = startBlock + iiter.previousIndex();
    if (numPrefetchBlocks > 0 && dataBlockCache != null) {
      prefetchFollowingBlocks(dataBlockCache, startBlock, iiter, true);
    }
    return getDataBlock(thisBlockIndex);
  } else {
    if (numPrefetchBlocks > 0 && dataBlockCache != null) {
      prefetchFollowingBlocks(dataBlockCache, startBlock, iiter, false);
    }
    return getDataBlock(indexEntry.getOffset(), indexEntry.getCompressedSize(),
        indexEntry.getRawSize());
  }
}

/**
 * Walks the index iterator forward up to {@code numPrefetchBlocks} entries, submitting a
 * cache-load task for each following block to the prefetch thread pool, then rewinds the
 * iterator to its original position.
 *
 * @param dataBlockCache non-null cache the prefetched blocks are loaded into
 * @param startBlock index of the first block covered by the current index level
 * @param iiter index iterator; advanced and then fully rewound by this method
 * @param addressByIndex true for RFile versions 3/4 where blocks are cached under a
 *        block-index key, false for older versions cached under an offset key
 */
private void prefetchFollowingBlocks(BlockCache dataBlockCache, int startBlock,
    IndexIterator iiter, boolean addressByIndex) {
  int fetched = 0;
  for (int i = 0; i < numPrefetchBlocks && iiter.hasNext(); i++) {
    final IndexEntry next = iiter.next();
    fetched++;
    // Resolve the cache key and loader on this thread before submitting. In particular the
    // block index must be captured now; reading iiter.previousIndex() from the pool thread
    // would race with this thread's continued iteration and rewind of the same iterator.
    final String name;
    final BlockCache.Loader loader;
    if (addressByIndex) {
      final int blockIndex = startBlock + iiter.previousIndex();
      name = this.cacheId + "O" + blockIndex;
      loader = new OffsetBlockLoader(blockIndex, false);
    } else {
      final long offset = next.getOffset();
      name = this.cacheId + "R" + offset;
      loader = new RawBlockLoader(offset, next.getCompressedSize(), next.getRawSize(), false);
    }
    this.prefetchBlockThreadPool.execute(() -> {
      try {
        log.debug("Prefetching data block: {}", name);
        if (null == dataBlockCache.getBlock(name, loader)) {
          log.debug("Prefetching data block {} did not work", name);
        }
      } catch (Exception e) {
        if (e instanceof IOException) {
          // It's possible that the underlying file could be closed while
          // we are trying to prefetch the next block.
          log.info("IOException thrown while trying to prefetch data block, msg: {}",
              e.getMessage());
          log.debug("IOException details", e);
        } else if (e instanceof InterruptedException && closed) {
          // The reader was closed causing the thread pool to be
          // shutdown, ignore these.
        } else {
          // prefetching is best-effort; log unexpected failures instead of swallowing them
          log.debug("Exception thrown while trying to prefetch data block {}", name, e);
        }
      }
    });
  }
  // rewind the index iterator so the caller sees its original position
  for (int j = 0; j < fetched; j++) {
    iiter.previous();
  }
}

/**
* It is intended that once the BlockRead object is returned to the caller, that the caller will
* read the entire block and then call close on the BlockRead class.
Expand All @@ -429,7 +538,7 @@ public CachedBlockRead getMetaBlock(long offset, long compressedSize, long rawSi
* read from disk and other threads check the cache before it has been inserted.
*/

public CachedBlockRead getDataBlock(int blockIndex) throws IOException {
private CachedBlockRead getDataBlock(int blockIndex) throws IOException {
BlockCache _dCache = cacheProvider.getDataCache();
if (_dCache != null) {
String _lookup = this.cacheId + "O" + blockIndex;
Expand All @@ -443,7 +552,7 @@ public CachedBlockRead getDataBlock(int blockIndex) throws IOException {
return new CachedBlockRead(_currBlock);
}

public CachedBlockRead getDataBlock(long offset, long compressedSize, long rawSize)
private CachedBlockRead getDataBlock(long offset, long compressedSize, long rawSize)
throws IOException {
BlockCache _dCache = cacheProvider.getDataCache();
if (_dCache != null) {
Expand All @@ -467,6 +576,10 @@ public synchronized void close() throws IOException {

closed = true;

if (prefetchBlockThreadPool != null) {
prefetchBlockThreadPool.shutdownNow();
}

BCFile.Reader reader = bcfr.getAndSet(null);
if (reader != null) {
reader.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -893,13 +893,7 @@ private CachableBlockFile.CachedBlockRead getDataBlock(IndexEntry indexEntry)
throw new IterationInterruptedException();
}

if (version == RINDEX_VER_3 || version == RINDEX_VER_4) {
return reader.getDataBlock(startBlock + iiter.previousIndex());
} else {
return reader.getDataBlock(indexEntry.getOffset(), indexEntry.getCompressedSize(),
indexEntry.getRawSize());
}

return reader.getDataBlock(version, startBlock, iiter, indexEntry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public class RFileOperations extends FileOperations {

private static RFileSKVIterator getReader(FileOptions options) throws IOException {
CachableBuilder cb = new CachableBuilder()
.fsPath(options.getFileSystem(), options.getFile().getPath(), options.dropCacheBehind)
.fsPath(options.getFileSystem(), options.getFile().getPath(), options.dropCacheBehind,
options.prefetchBlocks)
.conf(options.getConfiguration()).fileLen(options.getFileLenCache())
.cacheProvider(options.cacheProvider).cryptoService(options.getCryptoService());
return RFile.getReader(cb, options.getFile());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ public interface Configuration {
*/
/**
 * Creates and registers a cache for each cache type whose configured maximum size is
 * positive; cache types configured with a size of zero (or less) are not created.
 */
public void start(Configuration conf) {
  for (CacheType cacheType : CacheType.values()) {
    if (conf.getMaxSize(cacheType) <= 0) {
      // a non-positive max size disables this cache type entirely
      continue;
    }
    this.caches.put(cacheType, this.createCache(conf, cacheType));
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public enum ThreadPoolNames {
ACCUMULO_POOL_PREFIX("accumulo.pool"),
BATCH_WRITER_SEND_POOL("accumulo.pool.batch.writer.send"),
BATCH_WRITER_BIN_MUTATIONS_POOL("accumulo.pool.batch.writer.bin.mutations"),
BLOCK_READ_AHEAD_POOL("accumulo.pool.block.read.ahead"),
BLOOM_LOADER_POOL("accumulo.pool.bloom.loader"),
BULK_IMPORT_CLIENT_LOAD_POOL("accumulo.pool.bulk.import.client.bulk.load"),
BULK_IMPORT_CLIENT_BULK_THREADS_POOL("accumulo.pool.bulk.import.client.bulk.threads"),
Expand Down
Loading