Skip to content

Commit f5318c3

Browse files
authored
feat: Add findOrCreateBatchQueryCtx to query ctx manager (#26619)
Summary: In Batch mode, only one query runs at a time. When a task fails during memory arbitration, the query memory pool is marked as aborted, which causes every subsequent task to fail immediately. However, because of task retries and server reuse, one task's failure should not fail newly admitted tasks; failure handling should be independent across tasks. Therefore, if the query memory pool has already been aborted, the query context cache is cleared so that subsequent tasks can create a new query context and continue execution. This change also replaces the folly::Synchronized wrapper with a std::mutex for more flexible locking. Differential Revision: D87006158
1 parent 2f8bbba commit f5318c3

File tree

5 files changed

+191
-82
lines changed

5 files changed

+191
-82
lines changed

presto-native-execution/presto_cpp/main/QueryContextManager.cpp

Lines changed: 110 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,68 @@ inline QueryId queryIdFromTaskId(const TaskId& taskId) {
3434

3535
} // namespace
3636

37+
std::shared_ptr<velox::core::QueryCtx> QueryContextCache::get(
38+
const protocol::QueryId& queryId) {
39+
auto iter = queryCtxs_.find(queryId);
40+
if (iter == queryCtxs_.end()) {
41+
return nullptr;
42+
}
43+
44+
queryIds_.erase(iter->second.idListIterator);
45+
46+
if (auto queryCtx = iter->second.queryCtx.lock()) {
47+
// Move the queryId to front, if queryCtx is still alive.
48+
queryIds_.push_front(queryId);
49+
iter->second.idListIterator = queryIds_.begin();
50+
return queryCtx;
51+
}
52+
queryCtxs_.erase(iter);
53+
return nullptr;
54+
}
55+
56+
std::shared_ptr<velox::core::QueryCtx> QueryContextCache::insert(
57+
const protocol::QueryId& queryId,
58+
std::shared_ptr<velox::core::QueryCtx> queryCtx) {
59+
if (queryCtxs_.size() >= capacity_) {
60+
evict();
61+
}
62+
queryIds_.push_front(queryId);
63+
queryCtxs_[queryId] = {
64+
folly::to_weak_ptr(queryCtx), queryIds_.begin(), false};
65+
return queryCtx;
66+
}
67+
68+
bool QueryContextCache::hasStartedTasks(
69+
const protocol::QueryId& queryId) const {
70+
auto iter = queryCtxs_.find(queryId);
71+
if (iter != queryCtxs_.end()) {
72+
return iter->second.hasStartedTasks;
73+
}
74+
return false;
75+
}
76+
77+
/// Marks the cached entry for 'queryId' as having started tasks. Does nothing
/// when the query is not present in the cache.
void QueryContextCache::setTasksStarted(const protocol::QueryId& queryId) {
  const auto entry = queryCtxs_.find(queryId);
  if (entry == queryCtxs_.end()) {
    return;
  }
  entry->second.hasStartedTasks = true;
}
83+
84+
void QueryContextCache::evict() {
85+
// Evict least recently used queryCtx if it is not referenced elsewhere.
86+
for (auto victim = queryIds_.end(); victim != queryIds_.begin();) {
87+
--victim;
88+
if (!queryCtxs_[*victim].queryCtx.lock()) {
89+
queryCtxs_.erase(*victim);
90+
queryIds_.erase(victim);
91+
return;
92+
}
93+
}
94+
95+
// All queries are still inflight. Increase capacity.
96+
capacity_ = std::max(kInitialCapacity, capacity_ * 2);
97+
}
98+
3799
QueryContextManager::QueryContextManager(
38100
folly::Executor* driverExecutor,
39101
folly::Executor* spillerExecutor)
@@ -43,25 +105,58 @@ std::shared_ptr<velox::core::QueryCtx>
43105
QueryContextManager::findOrCreateQueryCtx(
    const protocol::TaskId& taskId,
    const protocol::TaskUpdateRequest& taskUpdateRequest) {
  // Returns the cached query context for the query that owns 'taskId',
  // creating and caching a new one from 'taskUpdateRequest' when none exists.
  //
  // The session and connector configs depend only on the request, so they are
  // built before the cache mutex is taken. This keeps the critical section
  // limited to the cache lookup/insert, so other users of the mutex are not
  // stalled behind config conversion.
  auto queryConfig = toVeloxConfigs(
      taskUpdateRequest.session, taskUpdateRequest.extraCredentials);
  auto connectorConfigs = toConnectorConfigs(taskUpdateRequest);

  std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
  return findOrCreateQueryCtxLocked(
      taskId, std::move(queryConfig), std::move(connectorConfigs));
}
52115

116+
std::shared_ptr<velox::core::QueryCtx>
117+
QueryContextManager::findOrCreateBatchQueryCtx(
118+
const protocol::TaskId& taskId,
119+
const protocol::TaskUpdateRequest& taskUpdateRequest) {
120+
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
121+
auto queryCtx = findOrCreateQueryCtxLocked(
122+
taskId,
123+
toVeloxConfigs(
124+
taskUpdateRequest.session, taskUpdateRequest.extraCredentials),
125+
toConnectorConfigs(taskUpdateRequest));
126+
if (queryCtx->pool()->aborted()) {
127+
// In Batch mode, only one query is running at a time. When tasks fail
128+
// during memory arbitration, the query memory pool will be set
129+
// aborted, failing any successive tasks immediately. Yet one task
130+
// should not fail other newly admitted tasks because of task retries
131+
// and server reuse. Failure control among tasks should be
132+
// independent. So if query memory pool is aborted already, a cache clear is
133+
// performed to allow successive tasks to create a new query context to
134+
// continue execution.
135+
VELOX_CHECK_EQ(queryContextCache_.size(), 1);
136+
queryContextCache_.clear();
137+
queryCtx = findOrCreateQueryCtxLocked(
138+
taskId,
139+
toVeloxConfigs(
140+
taskUpdateRequest.session, taskUpdateRequest.extraCredentials),
141+
toConnectorConfigs(taskUpdateRequest));
142+
}
143+
return queryCtx;
144+
}
145+
53146
bool QueryContextManager::queryHasStartedTasks(
54147
const protocol::TaskId& taskId) const {
55-
return queryContextCache_.rlock()->hasStartedTasks(queryIdFromTaskId(taskId));
148+
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
149+
return queryContextCache_.hasStartedTasks(queryIdFromTaskId(taskId));
56150
}
57151

58152
void QueryContextManager::setQueryHasStartedTasks(
59153
const protocol::TaskId& taskId) {
60-
queryContextCache_.wlock()->setHasStartedTasks(queryIdFromTaskId(taskId));
154+
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
155+
queryContextCache_.setTasksStarted(queryIdFromTaskId(taskId));
61156
}
62157

63-
std::shared_ptr<core::QueryCtx> QueryContextManager::createAndCacheQueryCtx(
64-
QueryContextCache& cache,
158+
std::shared_ptr<core::QueryCtx>
159+
QueryContextManager::createAndCacheQueryCtxLocked(
65160
const QueryId& queryId,
66161
velox::core::QueryConfig&& queryConfig,
67162
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>&&
@@ -75,18 +170,17 @@ std::shared_ptr<core::QueryCtx> QueryContextManager::createAndCacheQueryCtx(
75170
std::move(pool),
76171
spillerExecutor_,
77172
queryId);
78-
return cache.insert(queryId, std::move(queryCtx));
173+
return queryContextCache_.insert(queryId, std::move(queryCtx));
79174
}
80175

81-
std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
176+
std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtxLocked(
82177
const TaskId& taskId,
83178
velox::core::QueryConfig&& queryConfig,
84179
std::unordered_map<std::string, std::shared_ptr<config::ConfigBase>>&&
85180
connectorConfigs) {
86181
const QueryId queryId{queryIdFromTaskId(taskId)};
87182

88-
auto lockedCache = queryContextCache_.wlock();
89-
if (auto queryCtx = lockedCache->get(queryId)) {
183+
if (auto queryCtx = queryContextCache_.get(queryId)) {
90184
return queryCtx;
91185
}
92186

@@ -111,8 +205,7 @@ std::shared_ptr<core::QueryCtx> QueryContextManager::findOrCreateQueryCtx(
111205
nullptr,
112206
poolDbgOpts);
113207

114-
return createAndCacheQueryCtx(
115-
*lockedCache,
208+
return createAndCacheQueryCtxLocked(
116209
queryId,
117210
std::move(queryConfig),
118211
std::move(connectorConfigs),
@@ -123,19 +216,20 @@ void QueryContextManager::visitAllContexts(
123216
const std::function<
124217
void(const protocol::QueryId&, const velox::core::QueryCtx*)>& visitor)
125218
const {
126-
auto lockedCache = queryContextCache_.rlock();
127-
for (const auto& it : lockedCache->ctxs()) {
219+
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
220+
for (const auto& it : queryContextCache_.ctxMap()) {
128221
if (const auto queryCtxSP = it.second.queryCtx.lock()) {
129222
visitor(it.first, queryCtxSP.get());
130223
}
131224
}
132225
}
133226

134-
void QueryContextManager::testingClearCache() {
135-
queryContextCache_.wlock()->testingClear();
227+
void QueryContextManager::clearCache() {
228+
std::lock_guard<std::mutex> lock(queryContextCacheMutex_);
229+
queryContextCache_.clear();
136230
}
137231

138-
void QueryContextCache::testingClear() {
232+
void QueryContextCache::clear() {
139233
queryCtxs_.clear();
140234
queryIds_.clear();
141235
}

presto-native-execution/presto_cpp/main/QueryContextManager.h

Lines changed: 21 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <folly/executors/CPUThreadPoolExecutor.h>
1818
#include <folly/executors/IOThreadPoolExecutor.h>
1919
#include <memory>
20+
#include <mutex>
2021
#include <unordered_map>
2122

2223
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
@@ -44,77 +45,31 @@ class QueryContextCache {
4445
return queryCtxs_.size();
4546
}
4647

47-
std::shared_ptr<velox::core::QueryCtx> get(const protocol::QueryId& queryId) {
48-
auto iter = queryCtxs_.find(queryId);
49-
if (iter != queryCtxs_.end()) {
50-
queryIds_.erase(iter->second.idListIterator);
51-
52-
if (auto queryCtx = iter->second.queryCtx.lock()) {
53-
// Move the queryId to front, if queryCtx is still alive.
54-
queryIds_.push_front(queryId);
55-
iter->second.idListIterator = queryIds_.begin();
56-
return queryCtx;
57-
} else {
58-
queryCtxs_.erase(iter);
59-
}
60-
}
61-
return nullptr;
48+
const QueryCtxMap& ctxMap() const {
49+
return queryCtxs_;
6250
}
6351

52+
std::shared_ptr<velox::core::QueryCtx> get(const protocol::QueryId& queryId);
53+
6454
std::shared_ptr<velox::core::QueryCtx> insert(
6555
const protocol::QueryId& queryId,
66-
std::shared_ptr<velox::core::QueryCtx> queryCtx) {
67-
if (queryCtxs_.size() >= capacity_) {
68-
evict();
69-
}
70-
queryIds_.push_front(queryId);
71-
queryCtxs_[queryId] = {
72-
folly::to_weak_ptr(queryCtx), queryIds_.begin(), false};
73-
return queryCtx;
74-
}
56+
std::shared_ptr<velox::core::QueryCtx> queryCtx);
7557

76-
bool hasStartedTasks(const protocol::QueryId& queryId) const {
77-
auto iter = queryCtxs_.find(queryId);
78-
if (iter != queryCtxs_.end()) {
79-
return iter->second.hasStartedTasks;
80-
}
81-
return false;
82-
}
58+
bool hasStartedTasks(const protocol::QueryId& queryId) const;
8359

84-
void setHasStartedTasks(const protocol::QueryId& queryId) {
85-
auto iter = queryCtxs_.find(queryId);
86-
if (iter != queryCtxs_.end()) {
87-
iter->second.hasStartedTasks = true;
88-
}
89-
}
60+
void setTasksStarted(const protocol::QueryId& queryId);
9061

91-
void evict() {
92-
// Evict least recently used queryCtx if it is not referenced elsewhere.
93-
for (auto victim = queryIds_.end(); victim != queryIds_.begin();) {
94-
--victim;
95-
if (!queryCtxs_[*victim].queryCtx.lock()) {
96-
queryCtxs_.erase(*victim);
97-
queryIds_.erase(victim);
98-
return;
99-
}
100-
}
101-
102-
// All queries are still inflight. Increase capacity.
103-
capacity_ = std::max(kInitialCapacity, capacity_ * 2);
104-
}
105-
const QueryCtxMap& ctxs() const {
106-
return queryCtxs_;
107-
}
62+
void evict();
10863

109-
void testingClear();
64+
void clear();
11065

11166
private:
67+
static constexpr size_t kInitialCapacity = 256UL;
68+
11269
size_t capacity_;
11370

11471
QueryCtxMap queryCtxs_;
11572
QueryIdList queryIds_;
116-
117-
static constexpr size_t kInitialCapacity = 256UL;
11873
};
11974

12075
class QueryContextManager {
@@ -129,6 +84,10 @@ class QueryContextManager {
12984
const protocol::TaskId& taskId,
13085
const protocol::TaskUpdateRequest& taskUpdateRequest);
13186

87+
std::shared_ptr<velox::core::QueryCtx> findOrCreateBatchQueryCtx(
88+
const protocol::TaskId& taskId,
89+
const protocol::TaskUpdateRequest& taskUpdateRequest);
90+
13291
/// Returns true if the given task's query has at least one task started.
13392
bool queryHasStartedTasks(const protocol::TaskId& taskId) const;
13493

@@ -142,30 +101,30 @@ class QueryContextManager {
142101
visitor) const;
143102

144103
/// Clears all entries from the query context cache.
145-
void testingClearCache();
104+
void clearCache();
146105

147106
protected:
148107
folly::Executor* const driverExecutor_{nullptr};
149108
folly::Executor* const spillerExecutor_{nullptr};
109+
QueryContextCache queryContextCache_;
150110

151111
private:
152-
virtual std::shared_ptr<velox::core::QueryCtx> createAndCacheQueryCtx(
153-
QueryContextCache& cache,
112+
virtual std::shared_ptr<velox::core::QueryCtx> createAndCacheQueryCtxLocked(
154113
const protocol::QueryId& queryId,
155114
velox::core::QueryConfig&& queryConfig,
156115
std::unordered_map<
157116
std::string,
158117
std::shared_ptr<velox::config::ConfigBase>>&& connectorConfigs,
159118
std::shared_ptr<velox::memory::MemoryPool>&& pool);
160119

161-
std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtx(
120+
std::shared_ptr<velox::core::QueryCtx> findOrCreateQueryCtxLocked(
162121
const protocol::TaskId& taskId,
163122
velox::core::QueryConfig&& queryConfig,
164123
std::unordered_map<
165124
std::string,
166125
std::shared_ptr<velox::config::ConfigBase>>&& connectorConfigStrings);
167126

168-
folly::Synchronized<QueryContextCache> queryContextCache_;
127+
mutable std::mutex queryContextCacheMutex_;
169128
};
170129

171130
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/TaskResource.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateBatchTask(
338338
}
339339

340340
auto queryCtx =
341-
taskManager_.getQueryContextManager()->findOrCreateQueryCtx(
341+
taskManager_.getQueryContextManager()->findOrCreateBatchQueryCtx(
342342
taskId, updateRequest);
343343

344344
VeloxBatchQueryPlanConverter converter(

presto-native-execution/presto_cpp/main/tests/QueryContextCacheTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ TEST_F(QueryContextCacheTest, hasStartedTasks) {
9595
auto queryId = fmt::format("query-{}", i);
9696
EXPECT_FALSE(queryContextCache.hasStartedTasks(queryId));
9797
if (i % 2 == 0) {
98-
queryContextCache.setHasStartedTasks(queryId);
98+
queryContextCache.setTasksStarted(queryId);
9999
}
100100
}
101101

0 commit comments

Comments
 (0)