Skip to content

Commit 05fc7f8

Browse files
feat: Support read local file async (#11869)
Summary: Add a FileSystemOptions struct parameter to registerLocalFileSystem, with read-ahead disabled by default. When it is enabled, an executor is used to submit read tasks asynchronously; the number of threads in the pool is half of the system's hardware concurrency. Performance is the same as before on SSDs, and reads on HDDs can take about 50% less time. Pull Request resolved: #11869 Reviewed By: yuandagits Differential Revision: D68311321 Pulled By: xiaoxmeng fbshipit-source-id: d091363b8f826ae5fc47252b6c123edcc2ffbdfe
1 parent 9f4d43d commit 05fc7f8

File tree

5 files changed

+127
-18
lines changed

5 files changed

+127
-18
lines changed

velox/common/file/File.cpp

+24-3
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,11 @@ uint64_t InMemoryWriteFile::size() const {
129129
return file_->size();
130130
}
131131

132-
LocalReadFile::LocalReadFile(std::string_view path, bool bufferIo)
133-
: path_(path) {
132+
LocalReadFile::LocalReadFile(
133+
std::string_view path,
134+
folly::Executor* executor,
135+
bool bufferIo)
136+
: executor_(executor), path_(path) {
134137
int32_t flags = O_RDONLY;
135138
#ifdef linux
136139
if (!bufferIo) {
@@ -160,7 +163,8 @@ LocalReadFile::LocalReadFile(std::string_view path, bool bufferIo)
160163
size_ = ret;
161164
}
162165

163-
LocalReadFile::LocalReadFile(int32_t fd) : fd_(fd) {}
166+
// Wraps the caller-supplied descriptor `fd` and remembers `executor` (may be
// null) for later use by preadvAsync(). The destructor calls close(fd_).
// NOTE(review): only executor_ and fd_ are initialized here. The header hunk
// shown declares `long size_;` without an initializer, so size() looks
// indeterminate for files built through this constructor -- confirm.
LocalReadFile::LocalReadFile(int32_t fd, folly::Executor* executor)
167+
: executor_(executor), fd_(fd) {}
164168

165169
LocalReadFile::~LocalReadFile() {
166170
const int ret = close(fd_);
@@ -245,6 +249,23 @@ uint64_t LocalReadFile::preadv(
245249
return totalBytesRead;
246250
}
247251

252+
// Vectored read of `buffers` starting at `offset`, returned as a SemiFuture.
//
// Without an executor this delegates directly to ReadFile::preadvAsync().
// With an executor, the same base-class call is queued on executor_ and its
// resulting Try is forwarded to the caller through a promise/future pair.
folly::SemiFuture<uint64_t> LocalReadFile::preadvAsync(
253+
uint64_t offset,
254+
const std::vector<folly::Range<char*>>& buffers) const {
255+
if (!executor_) {
256+
return ReadFile::preadvAsync(offset, buffers);
257+
}
258+
auto [promise, future] = folly::makePromiseContract<uint64_t>();
259+
// The task captures `this` by pointer, so this file object has to still be
// alive when the task runs. The `buffers` vector itself is copied into the
// closure.
// NOTE(review): the copied folly::Range entries presumably still refer to the
// caller's memory, which would then need to stay valid until the future
// completes -- confirm.
executor_->add([this,
260+
_promise = std::move(promise),
261+
_offset = offset,
262+
_buffers = buffers]() mutable {
263+
auto delegateFuture = ReadFile::preadvAsync(_offset, _buffers);
264+
_promise.setTry(std::move(delegateFuture).getTry());
265+
});
266+
// `future` is a structured binding, hence the explicit move on return.
return std::move(future);
267+
}
268+
248269
// Returns the stored size_ member unchanged. size_ appears to be assigned in
// the path-based constructor (`size_ = ret;`).
uint64_t LocalReadFile::size() const {
249270
return size_;
250271
}

velox/common/file/File.h

+15-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <string>
3535
#include <string_view>
3636

37+
#include <folly/Executor.h>
3738
#include <folly/Range.h>
3839
#include <folly/futures/Future.h>
3940

@@ -266,11 +267,14 @@ class InMemoryWriteFile final : public WriteFile {
266267
/// files match against any filepath starting with '/'.
267268
class LocalReadFile final : public ReadFile {
268269
public:
269-
explicit LocalReadFile(std::string_view path, bool bufferIo = true);
270+
LocalReadFile(
271+
std::string_view path,
272+
folly::Executor* executor = nullptr,
273+
bool bufferIo = true);
270274

271275
/// TODO: deprecate this after creating local file all through velox fs
272276
/// interface.
273-
explicit LocalReadFile(int32_t fd);
277+
LocalReadFile(int32_t fd, folly::Executor* executor = nullptr);
274278

275279
~LocalReadFile();
276280

@@ -283,6 +287,14 @@ class LocalReadFile final : public ReadFile {
283287
uint64_t offset,
284288
const std::vector<folly::Range<char*>>& buffers) const final;
285289

290+
folly::SemiFuture<uint64_t> preadvAsync(
291+
uint64_t offset,
292+
const std::vector<folly::Range<char*>>& buffers) const override;
293+
294+
/// Returns true when an executor was supplied at construction, i.e. when
/// preadvAsync() submits the read to that executor instead of delegating
/// straight to ReadFile::preadvAsync().
bool hasPreadvAsync() const override {
295+
return executor_ != nullptr;
296+
}
297+
286298
uint64_t memoryUsage() const final;
287299

288300
bool shouldCoalesce() const final {
@@ -303,6 +315,7 @@ class LocalReadFile final : public ReadFile {
303315
private:
304316
void preadInternal(uint64_t offset, uint64_t length, char* pos) const;
305317

318+
folly::Executor* const executor_;
306319
std::string path_;
307320
int32_t fd_;
308321
long size_;

velox/common/file/FileSystems.cpp

+35-11
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
#include "velox/common/file/FileSystems.h"
18+
#include <folly/executors/CPUThreadPoolExecutor.h>
1819
#include <folly/synchronization/CallOnce.h>
1920
#include "velox/common/base/Exceptions.h"
2021
#include "velox/common/file/File.h"
@@ -79,10 +80,27 @@ folly::once_flag localFSInstantiationFlag;
7980
// Implement Local FileSystem.
8081
class LocalFileSystem : public FileSystem {
8182
public:
82-
explicit LocalFileSystem(std::shared_ptr<const config::ConfigBase> config)
83-
: FileSystem(config) {}
83+
/// Creates the local file system.
///
/// When `options.readAheadEnabled` is true, a CPUThreadPoolExecutor is created
/// with max(1, hardware_concurrency() / 2) threads, using a NamedThreadFactory
/// with the prefix "LocalReadahead". Otherwise executor_ is left null.
/// The max(1, ...) keeps the count at one or more when halving the reported
/// concurrency yields 0.
LocalFileSystem(
84+
std::shared_ptr<const config::ConfigBase> config,
85+
const FileSystemOptions& options)
86+
: FileSystem(config),
87+
executor_(
88+
options.readAheadEnabled
89+
? std::make_unique<folly::CPUThreadPoolExecutor>(
90+
std::max(
91+
1,
92+
static_cast<int32_t>(
93+
std::thread::hardware_concurrency() / 2)),
94+
std::make_shared<folly::NamedThreadFactory>(
95+
"LocalReadahead"))
96+
: nullptr) {}
8497

85-
~LocalFileSystem() override {}
98+
/// Stops the read-ahead executor, if one was created, and logs its name.
/// NOTE(review): this calls stop() rather than join(); presumably tasks still
/// queued at that point are not run -- confirm against the folly docs. Files
/// opened via openFileForRead() receive the raw pointer executor_.get(), so
/// they should not issue async reads once this destructor has run.
~LocalFileSystem() override {
99+
if (executor_) {
100+
executor_->stop();
101+
LOG(INFO) << "Executor " << executor_->getName() << " stopped.";
102+
}
103+
}
86104

87105
std::string name() const override {
88106
return "Local FS";
@@ -98,7 +116,8 @@ class LocalFileSystem : public FileSystem {
98116
std::unique_ptr<ReadFile> openFileForRead(
99117
std::string_view path,
100118
const FileOptions& options) override {
101-
return std::make_unique<LocalReadFile>(extractPath(path), options.bufferIo);
119+
return std::make_unique<LocalReadFile>(
120+
extractPath(path), executor_.get(), options.bufferIo);
102121
}
103122

104123
std::unique_ptr<WriteFile> openFileForWrite(
@@ -216,23 +235,28 @@ class LocalFileSystem : public FileSystem {
216235

217236
static std::function<std::shared_ptr<
218237
FileSystem>(std::shared_ptr<const config::ConfigBase>, std::string_view)>
219-
fileSystemGenerator() {
220-
return [](std::shared_ptr<const config::ConfigBase> properties,
221-
std::string_view filePath) {
238+
fileSystemGenerator(const FileSystemOptions& options) {
239+
return [options](
240+
std::shared_ptr<const config::ConfigBase> properties,
241+
std::string_view filePath) {
222242
// One instance of Local FileSystem is sufficient.
223243
// Initialize on first access and reuse after that.
224244
static std::shared_ptr<FileSystem> lfs;
225-
folly::call_once(localFSInstantiationFlag, [&properties]() {
226-
lfs = std::make_shared<LocalFileSystem>(properties);
245+
folly::call_once(localFSInstantiationFlag, [properties, options]() {
246+
lfs = std::make_shared<LocalFileSystem>(properties, options);
227247
});
228248
return lfs;
229249
};
230250
}
251+
252+
private:
253+
const std::unique_ptr<folly::CPUThreadPoolExecutor> executor_;
231254
};
232255
} // namespace
233256

234-
void registerLocalFileSystem() {
257+
void registerLocalFileSystem(const FileSystemOptions& options) {
235258
registerFileSystem(
236-
LocalFileSystem::schemeMatcher(), LocalFileSystem::fileSystemGenerator());
259+
LocalFileSystem::schemeMatcher(),
260+
LocalFileSystem::fileSystemGenerator(options));
237261
}
238262
} // namespace facebook::velox::filesystems

velox/common/file/FileSystems.h

+9-1
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,13 @@ struct DirectoryOptions : FileOptions {
7777
"make-directory-config"};
7878
};
7979

80+
/// Options supplied when a file system is registered, e.g. through
/// registerLocalFileSystem().
struct FileSystemOptions {
81+
/// For now, only the local file system respects this option. It implements
82+
/// async reads by using a background CPU executor. Some file systems might
83+
/// have native async read-ahead support. Defaults to false.
84+
bool readAheadEnabled{false};
85+
};
86+
8087
/// An abstract FileSystem
8188
class FileSystem {
8289
public:
@@ -161,6 +168,7 @@ void registerFileSystem(
161168
std::string_view)> fileSystemGenerator);
162169

163170
/// Register the local filesystem.
164-
void registerLocalFileSystem();
171+
void registerLocalFileSystem(
172+
const FileSystemOptions& options = FileSystemOptions());
165173

166174
} // namespace facebook::velox::filesystems

velox/common/file/tests/FileTest.cpp

+44-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
#include <fcntl.h>
18+
#include <folly/executors/CPUThreadPoolExecutor.h>
1819

1920
#include "velox/common/base/tests/GTestUtils.h"
2021
#include "velox/common/file/File.h"
@@ -66,7 +67,10 @@ void writeDataWithOffset(WriteFile* writeFile) {
6667
ASSERT_EQ(writeFile->size(), 15 + kOneMB);
6768
}
6869

69-
void readData(ReadFile* readFile, bool checkFileSize = true) {
70+
void readData(
71+
ReadFile* readFile,
72+
bool checkFileSize = true,
73+
bool testReadAsync = false) {
7074
if (checkFileSize) {
7175
ASSERT_EQ(readFile->size(), 15 + kOneMB);
7276
}
@@ -105,6 +109,30 @@ void readData(ReadFile* readFile, bool checkFileSize = true) {
105109
ASSERT_EQ(std::string_view(head, sizeof(head)), "aaaaabbbbbcc");
106110
ASSERT_EQ(std::string_view(middle, sizeof(middle)), "cccc");
107111
ASSERT_EQ(std::string_view(tail, sizeof(tail)), "ccddddd");
112+
if (testReadAsync) {
113+
std::vector<folly::Range<char*>> buffers1 = {
114+
folly::Range<char*>(head, sizeof(head)),
115+
folly::Range<char*>(nullptr, (char*)(uint64_t)500000)};
116+
auto future1 = readFile->preadvAsync(0, buffers1);
117+
const auto offset1 = sizeof(head) + 500000;
118+
std::vector<folly::Range<char*>> buffers2 = {
119+
folly::Range<char*>(middle, sizeof(middle)),
120+
folly::Range<char*>(
121+
nullptr,
122+
(char*)(uint64_t)(15 + kOneMB - offset1 - sizeof(middle) -
123+
sizeof(tail)))};
124+
auto future2 = readFile->preadvAsync(offset1, buffers2);
125+
std::vector<folly::Range<char*>> buffers3 = {
126+
folly::Range<char*>(tail, sizeof(tail))};
127+
const auto offset2 = 15 + kOneMB - sizeof(tail);
128+
auto future3 = readFile->preadvAsync(offset2, buffers3);
129+
ASSERT_EQ(offset1, future1.wait().value());
130+
ASSERT_EQ(offset2 - offset1, future2.wait().value());
131+
ASSERT_EQ(sizeof(tail), future3.wait().value());
132+
ASSERT_EQ(std::string_view(head, sizeof(head)), "aaaaabbbbbcc");
133+
ASSERT_EQ(std::string_view(middle, sizeof(middle)), "cccc");
134+
ASSERT_EQ(std::string_view(tail, sizeof(tail)), "ccddddd");
135+
}
108136
}
109137

110138
// We could templated this test, but that's kinda overkill for how simple it is.
@@ -157,6 +185,13 @@ class LocalFileTest : public ::testing::TestWithParam<bool> {
157185
}
158186

159187
const bool useFaultyFs_;
188+
const std::unique_ptr<folly::CPUThreadPoolExecutor> executor_ =
189+
std::make_unique<folly::CPUThreadPoolExecutor>(
190+
std::max(
191+
1,
192+
static_cast<int32_t>(std::thread::hardware_concurrency() / 2)),
193+
std::make_shared<folly::NamedThreadFactory>(
194+
"LocalFileReadAheadTest"));
160195
};
161196

162197
TEST_P(LocalFileTest, writeAndRead) {
@@ -185,6 +220,14 @@ TEST_P(LocalFileTest, writeAndRead) {
185220
writeFile->close();
186221
ASSERT_EQ(writeFile->size(), 15 + kOneMB);
187222
}
223+
// Test read async.
224+
if (!useFaultyFs_) {
225+
auto readFile =
226+
std::make_shared<LocalReadFile>(filename, executor_.get());
227+
readData(readFile.get(), true, true);
228+
auto readFileWithoutExecutor = std::make_shared<LocalReadFile>(filename);
229+
readData(readFileWithoutExecutor.get(), true, true);
230+
}
188231
auto readFile = fs->openFileForRead(filename);
189232
readData(readFile.get());
190233
}

0 commit comments

Comments
 (0)