feat: add defragment command #3003

Merged · 5 commits · May 8, 2024
Changes from 3 commits
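
In brief, the diff below adds a MEMORY DEFRAGMENT subcommand that forces the defragmentation task to run on every shard, introduces a mem_defrag_check_sec_interval flag to rate-limit the defrag necessity check, and bases that check on the process RSS instead of per-shard committed memory. A minimal client-side sketch of invoking the new subcommand (not part of the diff; assumes a local Dragonfly instance and the redis-py package):

```python
# Hypothetical usage sketch (not part of this PR): invoke the new MEMORY DEFRAGMENT
# subcommand from a client. Assumes a Dragonfly instance on localhost:6379 and redis-py.
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

# The handler broadcasts ForceDefrag() to every shard thread and replies with +OK.
print(r.execute_command("MEMORY", "DEFRAGMENT"))  # -> "OK"

# Progress can then be followed through the defrag counters exposed in INFO STATS.
stats = r.info("stats")
for field in ("defrag_task_invocation_total", "defrag_attempt_total", "defrag_realloc_total"):
    print(field, stats.get(field))
```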
5 changes: 4 additions & 1 deletion src/server/dragonfly_test.cc
@@ -23,6 +23,7 @@ extern "C" {
#include "server/test_utils.h"

ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
ABSL_DECLARE_FLAG(uint32_t, mem_defrag_check_sec_interval);
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
ABSL_DECLARE_FLAG(bool, lua_resp2_legacy_float);
@@ -641,7 +642,9 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
GTEST_SKIP() << "Defragmentation via idle task is only supported in io uring";
}

absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.02);
// mem_defrag_threshold is based on RSS statistic, but we don't count it in the test
absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.0);
absl::SetFlag(&FLAGS_mem_defrag_check_sec_interval, 0);
// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
constexpr size_t kMaxMemoryForTest = 1'100'000;
39 changes: 28 additions & 11 deletions src/server/engine_shard_set.cc
@@ -69,6 +69,9 @@ ABSL_FLAG(string, shard_round_robin_prefix, "",
"support up to a few hundreds of prefixes. Note: prefix is looked inside hash tags when "
"cluster mode is enabled.");

ABSL_FLAG(uint32_t, mem_defrag_check_sec_interval, 10,
"Number of seconds between every defragmentation necessity check");

namespace dfly {

using namespace tiering::literals;
@@ -222,7 +225,6 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)

void EngineShard::DefragTaskState::UpdateScanState(uint64_t cursor_val) {
cursor = cursor_val;
underutilized_found = false;
// Once we're done with a db, jump to the next
if (cursor == kCursorDoneState) {
dbid++;
@@ -231,7 +233,6 @@ void EngineShard::DefragTaskState::UpdateScanState(uint64_t cursor_val) {

void EngineShard::DefragTaskState::ResetScanState() {
dbid = cursor = 0u;
underutilized_found = false;
}

// This function checks 3 things:
@@ -241,8 +242,9 @@ void EngineShard::DefragTaskState::ResetScanState() {
// 3. in case the above is OK, make sure that we have a "gap" between usage and commited memory
// (control by mem_defrag_waste_threshold flag)
bool EngineShard::DefragTaskState::CheckRequired() {
if (cursor > kCursorDoneState || underutilized_found) {
VLOG(2) << "cursor: " << cursor << " and underutilized_found " << underutilized_found;
if (is_force_defrag || cursor > kCursorDoneState) {
is_force_defrag = false;
VLOG(2) << "cursor: " << cursor << " and is_force_defrag " << is_force_defrag;
return true;
}

@@ -251,20 +253,36 @@ bool EngineShard::DefragTaskState::CheckRequired() {
return false;
}

const std::size_t threshold_mem = memory_per_shard * GetFlag(FLAGS_mem_defrag_threshold);
const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
const std::size_t global_threshold = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
if (global_threshold > rss_mem_current.load(memory_order_relaxed)) {
return false;
}

const auto now = std::chrono::steady_clock::now();
const auto seconds_from_prev_check =
chrono::duration_cast<chrono::seconds>(now - prev_check).count();
const auto mem_defrag_interval = GetFlag(FLAGS_mem_defrag_check_sec_interval);

if (seconds_from_prev_check < mem_defrag_interval) {
return false;
}
prev_check = now;

ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));

if (threshold_mem < usage.commited &&
usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
if (usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
VLOG(1) << "memory issue found for memory " << usage;
underutilized_found = true;
return true;
}

return false;
}

void EngineShard::ForceDefrag() {
defrag_state_.is_force_defrag = true;
}

bool EngineShard::DoDefrag() {
// --------------------------------------------------------------------------
// NOTE: This task is running with exclusive access to the shard.
@@ -341,8 +359,7 @@ uint32_t EngineShard::DefragTask() {
const auto shard_id = db_slice().shard_id();

if (defrag_state_.CheckRequired()) {
VLOG(2) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor
<< ", underutilzation found: " << defrag_state_.underutilized_found;
VLOG(2) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
if (DoDefrag()) {
// we didn't finish the scan
return util::ProactorBase::kOnIdleMaxLevel;
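
Restating the updated CheckRequired() gating above: a forced defrag (set by the new MEMORY DEFRAGMENT command) or an unfinished scan always triggers a pass; otherwise the check bails out while the process RSS is below mem_defrag_threshold of the maximum memory limit, runs at most once per mem_defrag_check_sec_interval seconds, and reports work only when wasted memory exceeds mem_defrag_waste_threshold of the committed memory. A rough Python paraphrase, a simplified sketch rather than the actual implementation:

```python
# Simplified paraphrase of EngineShard::DefragTaskState::CheckRequired() after this PR.
# Flag names mirror the C++ flags; memory bookkeeping is reduced to plain parameters.
import time


class DefragTaskState:
    def __init__(self, defrag_threshold, waste_threshold, check_interval_sec):
        self.defrag_threshold = defrag_threshold      # FLAGS_mem_defrag_threshold
        self.waste_threshold = waste_threshold        # FLAGS_mem_defrag_waste_threshold
        self.check_interval_sec = check_interval_sec  # FLAGS_mem_defrag_check_sec_interval
        self.is_force_defrag = False
        self.cursor = 0                               # > 0 means a scan is still in progress
        self.prev_check = time.monotonic()

    def check_required(self, rss_bytes, max_memory, committed_bytes, wasted_bytes):
        # MEMORY DEFRAGMENT (or an unfinished scan) always triggers a pass.
        if self.is_force_defrag or self.cursor > 0:
            self.is_force_defrag = False
            return True
        # Global gate: do nothing while RSS stays below the threshold.
        if rss_bytes < max_memory * self.defrag_threshold:
            return False
        # Rate limit: re-check shard memory usage at most once per interval.
        now = time.monotonic()
        if now - self.prev_check < self.check_interval_sec:
            return False
        self.prev_check = now
        # Waste gate: defrag only when enough committed memory is actually wasted.
        return wasted_bytes > committed_bytes * self.waste_threshold
```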
6 changes: 5 additions & 1 deletion src/server/engine_shard_set.h
@@ -183,11 +183,15 @@ class EngineShard {

TxQueueInfo AnalyzeTxQueue() const;

void ForceDefrag();

private:
struct DefragTaskState {
size_t dbid = 0u;
uint64_t cursor = 0u;
bool underutilized_found = false;
std::chrono::time_point<std::chrono::steady_clock> prev_check =
std::chrono::steady_clock::now();
bool is_force_defrag = false;

// check the current threshold and return true if
// we need to do the defragmentation
8 changes: 8 additions & 0 deletions src/server/memory_cmd.cc
@@ -154,6 +154,14 @@ void MemoryCmd::Run(CmdArgList args) {
return Track(args);
}

if (sub_cmd == "DEFRAGMENT") {
shard_set->pool()->DispatchOnAll([this](util::ProactorBase*) {
if (auto* shard = EngineShard::tlocal(); shard)
shard->ForceDefrag();
});
return cntx_->SendSimpleString("OK");
}

string err = UnknownSubCmd(sub_cmd, "MEMORY");
return cntx_->SendError(err, kSyntaxErrType);
}
74 changes: 50 additions & 24 deletions tools/defrag_mem_test.py
@@ -4,12 +4,13 @@
import async_timeout
import sys
import argparse
'''

"""
To install: pip install -r requirements.txt

Run
dragonfly --mem_defrag_threshold=0.01 --commit_use_threshold=1.2 --mem_utilization_threshold=0.8
defrag_mem_test.py -k 800000 -v 645
dragonfly --mem_defrag_threshold=0.01 --mem_defrag_waste_threshold=0.01
defrag_mem_test.py -k 8000000 -v 645

This program would try to re-create the issue with memory defragmentation.
See issue number 448 for more details.
@@ -29,7 +30,8 @@
NOTE:
If this seems to get stuck please kill it with ctrl+c
This can happen in case we don't have "defrag_realloc_total > 0"
'''
"""


class TaskCancel:
def __init__(self):
@@ -41,37 +43,40 @@ def dont_stop(self):
def stop(self):
self.run = False


async def run_cmd(connection, cmd, sub_val):
val = await connection.execute_command(cmd, sub_val)
return val


async def handle_defrag_stats(connection, prev):
info = await run_cmd(connection, "info", "stats")
if info is not None:
if info['defrag_task_invocation_total'] != prev:
if info["defrag_task_invocation_total"] != prev:
print("--------------------------------------------------------------")
print(f"defrag_task_invocation_total: {info['defrag_task_invocation_total']:,}")
print(f"defrag_realloc_total: {info['defrag_realloc_total']:,}")
print(f"defrag_attempt_total: {info['defrag_attempt_total']:,}")
print("--------------------------------------------------------------")
if info["defrag_realloc_total"] > 0:
return True, None
return False, info['defrag_task_invocation_total']
return False, info["defrag_task_invocation_total"]
return False, None


async def memory_stats(connection):
print("--------------------------------------------------------------")
info = await run_cmd(connection, "info", "memory")
print(f"memory commited: {info['comitted_memory']:,}")
# print(f"memory commited: {info['comitted_memory']:,}")
print(f"memory used: {info['used_memory']:,}")
print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}")
# print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}")
print("--------------------------------------------------------------")


async def stats_check(connection, condition):
try:
defrag_task_invocation_total = 0;
runs=0
defrag_task_invocation_total = 0
runs = 0
while condition.dont_stop():
await asyncio.sleep(0.3)
done, d = await handle_defrag_stats(connection, defrag_task_invocation_total)
@@ -101,13 +106,15 @@ async def delete_keys(connection, keys):
results = await connection.delete(*keys)
return results


def generate_keys(pattern: str, count: int, batch_size: int) -> list:
for i in range(1, count, batch_size):
batch = [f"{pattern}{j}" for j in range(i, batch_size + i, 3)]
yield batch


async def mem_cleanup(connection, pattern, num, cond, keys_count):
counter=0
counter = 0
for keys in generate_keys(pattern=pattern, count=keys_count, batch_size=950):
if cond.dont_stop() == False:
print(f"task number {num} that deleted keys {pattern} finished")
@@ -130,9 +137,17 @@ async def run_tasks(pool, key_name, value_size, keys_count):
tasks = []
count = 0
for key in keys:
pattern=f"{key}:"
pattern = f"{key}:"
print(f"deleting keys from {pattern}")
tasks.append(mem_cleanup(connection=connection, pattern=pattern, num=count, cond=stop_cond, keys_count=int(keys_count)))
tasks.append(
mem_cleanup(
connection=connection,
pattern=pattern,
num=count,
cond=stop_cond,
keys_count=int(keys_count),
)
)
count += 1
monitor_task = asyncio.create_task(stats_check(connection, stop_cond))
total = await asyncio.gather(*tasks, return_exceptions=True)
@@ -147,29 +162,40 @@ async def run_tasks(pool, key_name, value_size, keys_count):


def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379):
async_pool = aioredis.ConnectionPool(host=host, port=port,
db=0, decode_responses=True, max_connections=16)
async_pool = aioredis.ConnectionPool(
host=host, port=port, db=0, decode_responses=True, max_connections=16
)

loop = asyncio.new_event_loop()
success = loop.run_until_complete(run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count))
success = loop.run_until_complete(
run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count)
)
return success


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='active memory testing', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-k', '--keys', type=int, default=800000, help='total number of keys')
parser.add_argument('-v', '--value_size', type=int, default=645, help='size of the values')
parser.add_argument('-n', '--key_name', type=str, default="key-for-testing", help='the base key name')
parser.add_argument('-s', '--server', type=str, default="localhost", help='server host name')
parser.add_argument('-p', '--port', type=int, default=6379, help='server port number')
parser = argparse.ArgumentParser(
description="active memory testing", formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument("-k", "--keys", type=int, default=800000, help="total number of keys")
parser.add_argument("-v", "--value_size", type=int, default=645, help="size of the values")
parser.add_argument(
"-n", "--key_name", type=str, default="key-for-testing", help="the base key name"
)
parser.add_argument("-s", "--server", type=str, default="localhost", help="server host name")
parser.add_argument("-p", "--port", type=int, default=6379, help="server port number")
args = parser.parse_args()
keys_num = args.keys
key_name = args.key_name
value_size = args.value_size
host = args.server
port = args.port
print(f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}")
result = connect_and_run(key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port)
print(
f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}"
)
result = connect_and_run(
key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port
)
if result == True:
print("finished successfully")
else:
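
The updated defrag_mem_test.py above creates fragmentation by deleting every third key in each batch and then polls INFO STATS until defrag_realloc_total turns positive. With the new subcommand, a pass can also be forced explicitly rather than waiting for the idle-task heuristics; a hedged async sketch (assumes redis.asyncio, the redis-py counterpart of the aioredis client used by the script):

```python
# Hypothetical follow-up (not part of the PR): force a defrag pass and watch the counters
# instead of waiting for the idle task to decide on its own.
import asyncio

import redis.asyncio as aioredis


async def force_defrag_and_report(host="localhost", port=6379):
    client = aioredis.Redis(host=host, port=port, decode_responses=True)
    # Broadcast the forced defrag to all shards; the server replies with OK.
    print(await client.execute_command("MEMORY", "DEFRAGMENT"))
    # Poll the stats a few times to see whether reallocations happened.
    for _ in range(10):
        await asyncio.sleep(0.3)
        stats = await client.info("stats")
        if stats.get("defrag_realloc_total", 0) > 0:
            print("defrag reallocated objects:", stats["defrag_realloc_total"])
            break
    await client.close()  # aclose() in newer redis-py releases


if __name__ == "__main__":
    asyncio.run(force_defrag_and_report())
```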