Skip to content

Commit c98fcd3

Browse files
Bikramjeet Vigfacebook-github-bot
Bikramjeet Vig
authored andcommitted
Add option for admission control for filesystem resources (facebookincubator#10452)
Summary: Pull Request resolved: facebookincubator#10452 This change adds a generic admission controller class that can be used for filesystem resources like read bytes in flight or number of read requests in flight. It also provides a way to report stats for resource usage, queued count, queued wait times by allowing the client to specify a metric name. Differential Revision: D59643306
1 parent 2518463 commit c98fcd3

5 files changed

+226
-0
lines changed

velox/dwio/common/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ add_library(
3939
ExecutorBarrier.cpp
4040
FileSink.cpp
4141
FlatMapHelper.cpp
42+
GenericAdmissionController.cpp
4243
OnDemandUnitLoader.cpp
4344
InputStream.cpp
4445
IntDecoder.cpp
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/dwio/common/GenericAdmissionController.h"
18+
19+
#include "velox/common/base/Exceptions.h"
20+
#include "velox/common/base/StatsReporter.h"
21+
#include "velox/common/time/Timer.h"
22+
23+
namespace facebook::velox::dwio::common {
24+
25+
void GenericAdmissionController::accept(uint64_t resourceUnits) {
26+
ContinueFuture future;
27+
uint64_t updatedValue = 0;
28+
VELOX_CHECK_LE(
29+
resourceUnits,
30+
config_.maxLimit,
31+
"A single request cannot exceed the max limit");
32+
{
33+
std::lock_guard<std::mutex> l(mtx);
34+
if (unitsUsed_ + resourceUnits > config_.maxLimit) {
35+
auto [unblockPromise, unblockFuture] = makeVeloxContinuePromiseContract();
36+
Request req{resourceUnits};
37+
req.promise = std::move(unblockPromise);
38+
queue_.push_back(std::move(req));
39+
future = std::move(unblockFuture);
40+
} else {
41+
updatedValue = unitsUsed_ += resourceUnits;
42+
}
43+
}
44+
if (future.valid()) {
45+
if (!config_.resourceQueuedCountMetric.empty()) {
46+
RECORD_METRIC_VALUE(config_.resourceQueuedCountMetric);
47+
}
48+
uint64_t waitTimeUs{0};
49+
{
50+
MicrosecondTimer timer(&waitTimeUs);
51+
future.wait();
52+
}
53+
if (!config_.resourceQueuedTimeMsMetric.empty()) {
54+
RECORD_HISTOGRAM_METRIC_VALUE(
55+
config_.resourceQueuedTimeMsMetric, waitTimeUs / 1'000);
56+
}
57+
return;
58+
}
59+
// Only upadate if there was no wait, as the releasing thread is responsible
60+
// for updating the metric.
61+
if (!config_.resourceUsageMetric.empty()) {
62+
RECORD_METRIC_VALUE(config_.resourceUsageMetric, updatedValue);
63+
}
64+
}
65+
66+
void GenericAdmissionController::release(uint64_t resourceUnits) {
67+
uint64_t updatedValue = 0;
68+
{
69+
std::lock_guard<std::mutex> l(mtx);
70+
VELOX_CHECK_LE(
71+
resourceUnits,
72+
unitsUsed_,
73+
"Cannot release more units than have been acquired");
74+
unitsUsed_ -= resourceUnits;
75+
while (!queue_.empty()) {
76+
auto& request = queue_.front();
77+
if (unitsUsed_ + request.unitsRequested > config_.maxLimit) {
78+
break;
79+
}
80+
unitsUsed_ += request.unitsRequested;
81+
request.promise.setValue();
82+
queue_.pop_front();
83+
}
84+
updatedValue = unitsUsed_;
85+
}
86+
if (!config_.resourceUsageMetric.empty()) {
87+
RECORD_METRIC_VALUE(config_.resourceUsageMetric, updatedValue);
88+
}
89+
}
90+
} // namespace facebook::velox::dwio::common
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include <deque>
19+
#include <mutex>
20+
#include "velox/common/future/VeloxPromise.h"
21+
22+
namespace facebook::velox::dwio::common {
23+
24+
/// A generic admission controller that can be used to limit the number of
25+
/// resources in use and can log metrics like resource usage, queued count,
26+
/// queued wait times. When a calling thread's request for resources surpasses
27+
/// the set limit, it will be placed in a FIFO queue. The thread must then wait
28+
/// until sufficient resources are freed by other threads, addressing all
29+
/// preceding requests in the queue, before its own request can be granted.
30+
class GenericAdmissionController {
31+
public:
32+
struct Config {
33+
uint64_t maxLimit;
34+
/// The metric name for resource usage. If not set, it will not be reported.
35+
std::string resourceUsageMetric;
36+
/// The metric name for resource queued count. If not set, it will not be
37+
/// reported
38+
std::string resourceQueuedCountMetric;
39+
/// The metric name for resource queued wait time. If not set, it will not
40+
/// be reported
41+
std::string resourceQueuedTimeMsMetric;
42+
};
43+
explicit GenericAdmissionController(Config config) : config_(config) {}
44+
45+
void accept(uint64_t resourceUnits);
46+
void release(uint64_t resourceUnits);
47+
48+
uint64_t currentResourceUsage() const {
49+
std::lock_guard<std::mutex> l(mtx);
50+
return unitsUsed_;
51+
}
52+
53+
private:
54+
struct Request {
55+
uint64_t unitsRequested;
56+
ContinuePromise promise;
57+
};
58+
Config config_;
59+
mutable std::mutex mtx;
60+
uint64_t unitsUsed_{0};
61+
std::deque<Request> queue_;
62+
};
63+
} // namespace facebook::velox::dwio::common

velox/dwio/common/tests/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ add_executable(
2323
DataBufferTests.cpp
2424
DecoderUtilTest.cpp
2525
ExecutorBarrierTest.cpp
26+
GenericAdmissionControllerTest.cpp
2627
OnDemandUnitLoaderTests.cpp
2728
LocalFileSinkTest.cpp
2829
MemorySinkTest.cpp
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
#include "velox/dwio/common/GenericAdmissionController.h"
16+
#include <gtest/gtest.h>
17+
#include <atomic>
18+
#include "velox/common/base/VeloxException.h"
19+
#include "velox/common/base/tests/GTestUtils.h"
20+
21+
using namespace facebook::velox;
22+
namespace facebook::velox::dwio::common {
23+
TEST(GenericAdmissionController, basic) {
24+
const uint64_t kLimit = 100000;
25+
GenericAdmissionController::Config config{.maxLimit = kLimit};
26+
GenericAdmissionController admissionController(config);
27+
EXPECT_EQ(admissionController.currentResourceUsage(), 0);
28+
29+
admissionController.accept(100);
30+
EXPECT_EQ(admissionController.currentResourceUsage(), 100);
31+
32+
admissionController.accept(100);
33+
EXPECT_EQ(admissionController.currentResourceUsage(), 200);
34+
35+
admissionController.release(100);
36+
EXPECT_EQ(admissionController.currentResourceUsage(), 100);
37+
38+
VELOX_ASSERT_THROW(
39+
admissionController.release(101),
40+
"Cannot release more units than have been acquired");
41+
42+
VELOX_ASSERT_THROW(
43+
admissionController.accept(kLimit + 1),
44+
"A single request cannot exceed the max limit");
45+
}
46+
47+
TEST(GenericAdmissionController, multiThreaded) {
48+
// Ensure that resource usage never exceeds the limit set in the admission
49+
// controller.
50+
const uint64_t kLimit = 10;
51+
std::atomic_uint64_t currentUsage{0};
52+
GenericAdmissionController::Config config{.maxLimit = 10};
53+
GenericAdmissionController admissionController(config);
54+
55+
std::vector<std::thread> threads;
56+
for (int i = 0; i < 20; i++) {
57+
threads.push_back(std::thread([&]() {
58+
for (int j = 0; j < 1000; j++) {
59+
admissionController.accept(1);
60+
uint64_t curr = currentUsage.fetch_add(1);
61+
ASSERT_LE(curr + 1, kLimit);
62+
currentUsage--;
63+
admissionController.release(1);
64+
}
65+
}));
66+
}
67+
for (auto& thread : threads) {
68+
thread.join();
69+
}
70+
}
71+
} // namespace facebook::velox::dwio::common

0 commit comments

Comments
 (0)