Skip to content

Commit 40e1201

Browse files
committed
[native] Make PeriodicMemoryChecker member of PrestoServer
1 parent fb6d9af commit 40e1201

File tree

5 files changed

+50
-42
lines changed

5 files changed

+50
-42
lines changed

presto-native-execution/presto_cpp/main/LinuxMemoryChecker.cpp

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,9 @@ class LinuxMemoryChecker : public PeriodicMemoryChecker {
205205
};
206206

207207
// Unit is in bytes.
208-
return inactiveAnon + activeAnon;
208+
const auto memBytes = inactiveAnon + activeAnon;
209+
cachedSystemUsedMemoryBytes_ = memBytes;
210+
return memBytes;
209211
}
210212

211213
// Last resort use host machine info.
@@ -226,7 +228,10 @@ class LinuxMemoryChecker : public PeriodicMemoryChecker {
226228
}
227229
};
228230
// Unit is in bytes.
229-
return (memAvailable && memTotal) ? memTotal - memAvailable : 0;
231+
const auto memBytes =
232+
(memAvailable && memTotal) ? memTotal - memAvailable : 0;
233+
cachedSystemUsedMemoryBytes_ = memBytes;
234+
return memBytes;
230235
}
231236

232237
int64_t mallocBytes() const override {
@@ -274,20 +279,18 @@ class LinuxMemoryChecker : public PeriodicMemoryChecker {
274279
}
275280
};
276281

277-
folly::Singleton<facebook::presto::PeriodicMemoryChecker> checker(
278-
[]() -> facebook::presto::PeriodicMemoryChecker* {
279-
auto* systemConfig = SystemConfig::instance();
280-
if (systemConfig->systemMemPushbackEnabled()) {
281-
PeriodicMemoryChecker::Config config;
282-
config.systemMemPushbackEnabled =
283-
systemConfig->systemMemPushbackEnabled();
284-
config.systemMemLimitBytes =
285-
static_cast<uint64_t>(systemConfig->systemMemLimitGb()) << 30;
286-
config.systemMemShrinkBytes =
287-
static_cast<uint64_t>(systemConfig->systemMemShrinkGb()) << 30;
288-
return std::make_unique<LinuxMemoryChecker>(config).release();
289-
}
290-
return nullptr;
291-
});
282+
std::unique_ptr<PeriodicMemoryChecker> createMemoryChecker() {
283+
auto* systemConfig = SystemConfig::instance();
284+
if (systemConfig->systemMemPushbackEnabled()) {
285+
PeriodicMemoryChecker::Config config;
286+
config.systemMemPushbackEnabled = systemConfig->systemMemPushbackEnabled();
287+
config.systemMemLimitBytes =
288+
static_cast<uint64_t>(systemConfig->systemMemLimitGb()) << 30;
289+
config.systemMemShrinkBytes =
290+
static_cast<uint64_t>(systemConfig->systemMemShrinkGb()) << 30;
291+
return std::make_unique<LinuxMemoryChecker>(config);
292+
}
293+
return nullptr;
294+
}
292295

293296
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/PeriodicMemoryChecker.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,8 +223,8 @@ void PeriodicMemoryChecker::pushbackMemory() {
223223
#ifndef PRESTO_MEMORY_CHECKER_TYPE
224224
// Initialize singleton for the checker to be nullptr if
225225
// PRESTO_MEMORY_CHECKER_TYPE is not defined.
226-
folly::Singleton<facebook::presto::PeriodicMemoryChecker> checker([]() {
226+
std::unique_ptr<PeriodicMemoryChecker> createMemoryChecker() {
227227
return nullptr;
228-
});
228+
}
229229
#endif
230230
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/PeriodicMemoryChecker.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,14 @@ class PeriodicMemoryChecker {
7676
/// Stops the 'PeriodicMemoryChecker'.
7777
void stop();
7878

79+
/// Returns the last known cached 'current' system memory usage in bytes.
80+
int64_t cachedSystemUsedMemoryBytes() const {
81+
return cachedSystemUsedMemoryBytes_;
82+
}
83+
7984
protected:
80-
/// Returns current system memory usage. The returned value is used to compare
81-
/// with 'Config::systemMemLimitBytes'.
85+
/// Fetches and returns current system memory usage in bytes.
86+
/// The returned value is used to compare with 'Config::systemMemLimitBytes'.
8287
virtual int64_t systemUsedMemoryBytes() = 0;
8388

8489
/// Returns current bytes allocated by malloc. The returned value is used to
@@ -102,6 +107,7 @@ class PeriodicMemoryChecker {
102107
virtual void pushbackMemory();
103108

104109
const Config config_;
110+
std::atomic<int64_t> cachedSystemUsedMemoryBytes_{0};
105111

106112
private:
107113
// Struct that stores the file names of the heap profiles dumped and the
@@ -137,4 +143,6 @@ class PeriodicMemoryChecker {
137143
std::greater<DumpFileInfo>>
138144
dumpFilesByHeapMemUsageMinPq_;
139145
};
146+
147+
std::unique_ptr<PeriodicMemoryChecker> createMemoryChecker();
140148
} // namespace facebook::presto

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -527,8 +527,10 @@ void PrestoServer::run() {
527527
addServerPeriodicTasks();
528528
addAdditionalPeriodicTasks();
529529
periodicTaskManager_->start();
530-
531-
addMemoryCheckerPeriodicTask();
530+
createPeriodicMemoryChecker();
531+
if (memoryChecker_ != nullptr) {
532+
memoryChecker_->start();
533+
}
532534

533535
auto setTaskUriCb = [&](bool useHttps, int port) {
534536
std::string taskUri;
@@ -618,12 +620,13 @@ void PrestoServer::run() {
618620
}
619621

620622
PRESTO_SHUTDOWN_LOG(INFO) << "Stopping all periodic tasks";
621-
periodicTaskManager_->stop();
622623

624+
if (memoryChecker_ != nullptr) {
625+
memoryChecker_->stop();
626+
}
627+
periodicTaskManager_->stop();
623628
stopAdditionalPeriodicTasks();
624629

625-
stopMemoryCheckerPeriodicTask();
626-
627630
// Destroy entities here to ensure we won't get any messages after Server
628631
// object is gone and to have nice log in case shutdown gets stuck.
629632
PRESTO_SHUTDOWN_LOG(INFO) << "Destroying Task Resource";
@@ -1032,18 +1035,6 @@ void PrestoServer::updateAnnouncerDetails() {
10321035
}
10331036
}
10341037

1035-
void PrestoServer::addMemoryCheckerPeriodicTask() {
1036-
if (folly::Singleton<PeriodicMemoryChecker>::try_get()) {
1037-
folly::Singleton<PeriodicMemoryChecker>::try_get()->start();
1038-
}
1039-
}
1040-
1041-
void PrestoServer::stopMemoryCheckerPeriodicTask() {
1042-
if (folly::Singleton<PeriodicMemoryChecker>::try_get()) {
1043-
folly::Singleton<PeriodicMemoryChecker>::try_get()->stop();
1044-
}
1045-
}
1046-
10471038
void PrestoServer::addServerPeriodicTasks() {
10481039
periodicTaskManager_->addTask(
10491040
[server = this]() { server->populateMemAndCPUInfo(); },
@@ -1107,6 +1098,12 @@ void PrestoServer::addServerPeriodicTasks() {
11071098
}
11081099
}
11091100

1101+
void PrestoServer::createPeriodicMemoryChecker() {
1102+
// The call below will either produce nullptr or unique pointer to an instance
1103+
// of LinuxMemoryChecker.
1104+
memoryChecker_ = createMemoryChecker();
1105+
}
1106+
11101107
std::shared_ptr<velox::exec::TaskListener> PrestoServer::getTaskListener() {
11111108
return nullptr;
11121109
}

presto-native-execution/presto_cpp/main/PrestoServer.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class Announcer;
6767
class SignalHandler;
6868
class TaskManager;
6969
class TaskResource;
70+
class PeriodicMemoryChecker;
7071
class PeriodicTaskManager;
7172
class SystemConfig;
7273

@@ -115,16 +116,14 @@ class PrestoServer {
115116
void enableAnnouncer(bool enable);
116117

117118
protected:
119+
virtual void createPeriodicMemoryChecker();
120+
118121
/// Hook for derived PrestoServer implementations to add/stop additional
119122
/// periodic tasks.
120123
virtual void addAdditionalPeriodicTasks(){};
121124

122125
virtual void stopAdditionalPeriodicTasks(){};
123126

124-
virtual void addMemoryCheckerPeriodicTask();
125-
126-
virtual void stopMemoryCheckerPeriodicTask();
127-
128127
virtual void initializeCoordinatorDiscoverer();
129128

130129
virtual std::shared_ptr<velox::exec::TaskListener> getTaskListener();
@@ -273,6 +272,7 @@ class PrestoServer {
273272
std::chrono::steady_clock::time_point start_;
274273
std::unique_ptr<PeriodicTaskManager> periodicTaskManager_;
275274
std::unique_ptr<PrestoServerOperations> prestoServerOperations_;
275+
std::unique_ptr<PeriodicMemoryChecker> memoryChecker_;
276276

277277
// We update these members asynchronously and return in http requests w/o
278278
// delay.

0 commit comments

Comments
 (0)