Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup tombstones #3

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
15 changes: 15 additions & 0 deletions src/hashtree_tree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
destroy/1,
insert/4,
insert/5,
delete/3,
update_snapshot/1,
update_perform/1,
local_compare/2,
Expand Down Expand Up @@ -229,6 +230,20 @@ insert(Prefixes, Key, Hash, Opts, Tree) ->
{error, bad_prefixes}
end.

%% @doc Removes `Key' from the tree node addressed by `Prefixes'.
%% The affected node is added to the dirty set so a later
%% update_snapshot/update_perform pass recomputes its hashes.
%% Returns the updated tree, or `{error, bad_prefixes}' when the
%% prefix list does not name a valid node.
-spec delete(prefixes(), binary(), tree()) -> tree() | {error, term()}.
delete(Prefixes, Key, Tree=#hashtree_tree{dirty=Dirty}) ->
    NodeName = prefixes_to_node_name(Prefixes),
    case valid_prefixes(NodeName, Tree) of
        false ->
            {error, bad_prefixes};
        true ->
            UpdatedNode = hashtree:delete(Key, get_node(NodeName, Tree)),
            _ = set_node(NodeName, UpdatedNode, Tree),
            Tree#hashtree_tree{dirty=gb_sets:add_element(NodeName, Dirty)}
    end.

%% @doc Snapshot the tree for updating. The return tree should be
%% updated using {@link update_perform/1} and to perform future operations
%% on the tree
Expand Down
1 change: 1 addition & 0 deletions src/plumtree.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
{env, [
{plumtree_data_dir, "data"},
{nr_of_meta_instances, 12},
{gc_grace_seconds, 0}, % 0 disables gc: tombstones won't be gc'd
{meta_leveldb_opts, [
{sync, false},
{total_leveldb_mem_percent, 6},
Expand Down
10 changes: 10 additions & 0 deletions src/plumtree_metadata_hashtree.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
lock/2,
update/0,
update/1,
delete/1,
compare/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand Down Expand Up @@ -163,6 +164,11 @@ update(Node) ->
compare(RemoteFun, HandlerFun, HandlerAcc) ->
gen_server:call(?SERVER, {compare, RemoteFun, HandlerFun, HandlerAcc}, infinity).


%% @doc Removes the hash entry for `PKey' from the metadata hashtree.
%% Synchronous: blocks (with no timeout) until the server has applied
%% the delete to its tree state.
-spec delete(metadata_pkey()) -> ok.
delete(PKey) ->
    gen_server:call(?SERVER, {delete, PKey}, infinity).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand Down Expand Up @@ -198,6 +204,10 @@ handle_call({prefix_hash, Prefix}, _From, State=#state{tree=Tree}) ->
handle_call({insert, PKey, Hash, IfMissing}, _From, State=#state{tree=Tree}) ->
{Prefixes, Key} = prepare_pkey(PKey),
Tree1 = hashtree_tree:insert(Prefixes, Key, Hash, [{if_missing, IfMissing}], Tree),
{reply, ok, State#state{tree=Tree1}};
handle_call({delete, PKey}, _From, State=#state{tree=Tree}) ->
{Prefixes, Key} = prepare_pkey(PKey),
Tree1 = hashtree_tree:delete(Prefixes, Key, Tree),
{reply, ok, State#state{tree=Tree1}}.

handle_cast(_Msg, State) ->
Expand Down
90 changes: 78 additions & 12 deletions src/plumtree_metadata_leveldb_instance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
status/1,
data_size/1,
iterator/2,
iterator_move/2,
iterator_close/2]).

%% gen_server callbacks
Expand All @@ -50,10 +51,13 @@
read_opts = [],
write_opts = [],
fold_opts = [{fill_cache, false}],
open_iterators = []
open_iterators = [],
grace_secs,
cleanup=maps:new()
}).

-type config() :: [{atom(), term()}].
-define(TOMBSTONE, '$deleted').

%%%===================================================================
%%% API functions
Expand Down Expand Up @@ -100,9 +104,13 @@ data_size(InstanceId) ->
%% @doc Opens a new iterator on `Instance' (pid or registered name).
%% The caller's pid is passed along so the server can monitor it and
%% close the iterator if the caller dies (see the 'DOWN' handler).
%% `KeysOnly' selects a keys-only iterator.
iterator(Instance, KeysOnly) when is_pid(Instance) or is_atom(Instance) ->
    gen_server:call(Instance, {new_iterator, self(), KeysOnly}, infinity).

iterator_close(Instance, Itr) when is_pid(Instance) or is_atom(Instance) ->
%% @doc Closes an iterator previously obtained via iterator/2.
%% `Itr' is the `{MonitorRef, EleveldbItr}' pair handed out by the
%% server; the server closes the eleveldb iterator and demonitors.
iterator_close(Instance, {_,_} = Itr) when is_pid(Instance) or is_atom(Instance) ->
    gen_server:call(Instance, {close_iterator, Itr}, infinity).

%% @doc Advances an iterator. Runs in the calling process — no
%% gen_server round-trip — by unwrapping the `{MonitorRef, EleveldbItr}'
%% pair and delegating straight to eleveldb.
iterator_move({MRef, Itr}, FirstKey) when is_reference(MRef) ->
    eleveldb:iterator_move(Itr, FirstKey).


%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
Expand All @@ -127,12 +135,18 @@ init([InstanceId, Opts]) ->
"meta"),
DataDir2 = filename:join(DataDir1, integer_to_list(InstanceId)),

GraceSeconds = app_helper:get_prop_or_env(gc_grace_seconds, Opts, plumtree),

%% Initialize state
S0 = init_state(DataDir2, Opts),
process_flag(trap_exit, true),
case open_db(S0) of
{ok, State} ->
{ok, State};
case GraceSeconds > 0 of
true -> timer:send_interval(GraceSeconds * 1000, force_cleanup);
false -> ignore
end,
{ok, init_cleanup(State#state{grace_secs=GraceSeconds})};
{error, Reason} ->
{stop, Reason}
end.
Expand Down Expand Up @@ -161,20 +175,22 @@ handle_call({get, Key}, _From, #state{read_opts=ReadOpts, ref=Ref} = State) ->
{reply, {error, Reason}, State}
end;
handle_call({put, Key, Value}, _From, #state{write_opts=WriteOpts, ref=Ref} = State) ->
Update = [{put, sext:encode(Key), term_to_binary(Value)}],
Update = {put, sext:encode(Key), term_to_binary(Value)},
{NewCleanup, CleanupOps} = maybe_trigger_cleanup(Key, Value, State),
%% Perform the write...
case eleveldb:write(Ref, Update, WriteOpts) of
case eleveldb:write(Ref, [Update|CleanupOps], WriteOpts) of
ok ->
{reply, ok, State};
{reply, ok, State#state{cleanup=NewCleanup}};
{error, Reason} ->
{reply, {error, Reason}, State}
end;
handle_call({delete, Key}, _From, #state{write_opts=WriteOpts, ref=Ref} = State) ->
Update = [{delete, sext:encode(Key)}],
Update = {delete, sext:encode(Key)},
{NewCleanup, CleanupOps} = maybe_trigger_cleanup(State),
%% Perform the write...
case eleveldb:write(Ref, Update, WriteOpts) of
case eleveldb:write(Ref, [Update|CleanupOps], WriteOpts) of
ok ->
{reply, ok, State};
{reply, ok, State#state{cleanup=NewCleanup}};
{error, Reason} ->
{reply, {error, Reason}, State}
end;
Expand Down Expand Up @@ -202,9 +218,10 @@ handle_call({new_iterator, Owner, KeysOnly}, _From, #state{ref=Ref, fold_opts=Fo
false ->
eleveldb:iterator(Ref, FoldOpts)
end,
{reply, Itr, State#state{open_iterators=[{MRef, Itr}|OpenIterators]}};
handle_call({close_iterator, Itr}, _From, #state{open_iterators=OpenIterators} = State) ->
{MRef, _} = lists:keyfind(Itr, 2, OpenIterators),
{reply, {MRef, Itr}, State#state{open_iterators=[{MRef, Itr}|OpenIterators]}};
handle_call({close_iterator, {MRef, _}}, _From, #state{open_iterators=OpenIterators} = State) ->
{MRef, Itr} = lists:keyfind(MRef, 1, OpenIterators),
eleveldb:iterator_close(Itr),
demonitor(MRef, [flush]),
{reply, ok, State#state{open_iterators=lists:keydelete(MRef, 1, OpenIterators)}}.

Expand Down Expand Up @@ -239,6 +256,11 @@ handle_info({'DOWN', MRef, process, _, _}, #state{open_iterators=OpenIterators}
eleveldb:iterator_close(Itr)
end,
{noreply, State#state{open_iterators=lists:keydelete(MRef, 1, OpenIterators)}};
handle_info(force_cleanup, #state{ref=Ref, write_opts=WriteOpts} = State) ->
{NewCleanup, CleanupOps} = maybe_trigger_cleanup(State),
%% Perform the write...
eleveldb:write(Ref, CleanupOps, WriteOpts),
{noreply, State#state{cleanup=NewCleanup}};
handle_info(_Info, State) ->
{noreply, State}.

Expand Down Expand Up @@ -364,3 +386,47 @@ open_db(State0, RetriesLeft, _) ->
{error, Reason} ->
{error, Reason}
end.

%% @private
%% @doc Tracks tombstone writes for eventual gc. When the written value
%% has collapsed to a single tombstone, the key is remembered (with the
%% current epoch) in the cleanup map; a live value cancels any pending
%% entry. Returns `{NewCleanupMap, DeleteOps}' where DeleteOps are
%% eleveldb delete operations for entries older than the grace period.
%%
%% Guard hardened with is_integer/1: `gc_grace_seconds' comes from the
%% app env and may be the atom `undefined' when unset — in Erlang term
%% order any atom compares greater than any number, so a bare `GS > 0'
%% would wrongly take the gc path. Non-integer grace now falls through
%% to the no-op clause.
maybe_trigger_cleanup(Key, Val, #state{grace_secs=GS, cleanup=Cleanup})
  when is_integer(GS), GS > 0 ->
    Now = epoch(),
    Cleanup1 =
        case plumtree_metadata_object:values(Val) of
            [?TOMBSTONE] ->
                %% value is now only a tombstone: schedule it for gc
                maps:put(Key, Now, Cleanup);
            _ ->
                %% key is live again: cancel any pending gc
                maps:remove(Key, Cleanup)
        end,
    incr_cleanup(Cleanup1, Now, GS);
maybe_trigger_cleanup(_, _, #state{cleanup=Cleanup}) ->
    %% gc disabled (grace period unset or =< 0): never expire tombstones
    {Cleanup, []}.

%% @private
%% @doc Variant used on delete and periodic `force_cleanup' ticks: only
%% expires already-tracked tombstones, without registering new ones.
%% Guard hardened with is_integer/1 for the same reason as the 3-arity
%% version: an unset env yields the atom `undefined', which compares
%% greater than 0 under Erlang term ordering.
maybe_trigger_cleanup(#state{grace_secs=GS, cleanup=Cleanup})
  when is_integer(GS), GS > 0 ->
    incr_cleanup(Cleanup, epoch(), GS);
maybe_trigger_cleanup(#state{cleanup=Cleanup}) ->
    %% gc disabled: nothing to expire
    {Cleanup, []}.

%% @private
%% @doc Expires cleanup entries older than the grace period. An entry
%% expires when strictly more than `GS' seconds have passed since it was
%% recorded. Returns the pruned map plus the eleveldb `{delete, _}'
%% operations for the expired keys.
incr_cleanup(Cleanup, Now, GS) ->
    Expired =
        maps:fold(
          fun(Key, Seen, Acc) ->
                  case (Now - Seen) > GS of
                      true  -> [Key | Acc];
                      false -> Acc
                  end
          end, [], Cleanup),
    {maps:without(Expired, Cleanup),
     [{delete, sext:encode(Key)} || Key <- Expired]}.

%% @private
%% @doc Seeds the cleanup map at startup by folding over the whole
%% leveldb instance and recording every key whose stored object has
%% collapsed to a single tombstone. All such keys get the current epoch
%% as their "seen" time, so they expire one full grace period from now.
init_cleanup(#state{cleanup=Cleanup0, fold_opts=FoldOpts, ref=Ref} = State) ->
    Now = epoch(),
    Collect =
        fun({EncKey, EncVal}, Acc) ->
                case plumtree_metadata_object:values(binary_to_term(EncVal)) of
                    [?TOMBSTONE] ->
                        maps:put(sext:decode(EncKey), Now, Acc);
                    _ ->
                        Acc
                end
        end,
    State#state{cleanup=eleveldb:fold(Ref, Collect, Cleanup0, FoldOpts)}.

%% @private
%% @doc Current wall-clock time as whole seconds since the Unix epoch.
epoch() ->
    {Mega, Sec, _Micro} = os:timestamp(),
    (Mega * 1000000) + Sec.
4 changes: 2 additions & 2 deletions src/plumtree_metadata_leveldb_iterator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
%% @private
%% @doc Drives iteration across the per-instance iterators. An already
%% opened head (`{Itr, Instance}') is advanced with prefetch; a bare
%% instance atom gets a fresh iterator positioned at the prefix's first
%% key. Either way the move result is handed to the 5-arity iterate.
iterate([], _FullPrefix, _KeyMatch, _KeysOnly) ->
    done;
iterate([{Itr, _Instance} | _] = Instances, FullPrefix, KeyMatch, KeysOnly) ->
    Next = plumtree_metadata_leveldb_instance:iterator_move(Itr, prefetch),
    iterate(Next, Instances, FullPrefix, KeyMatch, KeysOnly);
iterate([Instance | Rest], FullPrefix, KeyMatch, KeysOnly) when is_atom(Instance) ->
    Itr = plumtree_metadata_leveldb_instance:iterator(Instance, KeysOnly),
    StartKey = first_key(FullPrefix),
    Next = plumtree_metadata_leveldb_instance:iterator_move(Itr, StartKey),
    iterate(Next, [{Itr, Instance} | Rest], FullPrefix, KeyMatch, KeysOnly).

Expand Down
39 changes: 34 additions & 5 deletions src/plumtree_metadata_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
graft/1,
exchange/1]).

%% used by cleanup
-export([force_delete/1]).
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @dergraf This function does not seem to be in use anywhere. Is it kept around for debugging purposes or just dead code?


%% utilities
-export([size/1,
subscribe/1,
Expand Down Expand Up @@ -217,6 +220,15 @@ put({{Prefix, SubPrefix}, _Key}=PKey, Context, ValueOrFun)
(is_binary(SubPrefix) orelse is_atom(SubPrefix)) ->
read_modify_write(PKey, Context, ValueOrFun).

%% @doc forcefully deletes the key
%% Removes `PKey' from the metadata hashtree and then from the backing
%% leveldb instance, bypassing read_modify_write/tombstone handling.
%% NOTE(review): per the PR discussion this function appears unused in
%% the codebase — possibly kept for debugging; confirm before removal.
-spec force_delete(metadata_pkey()) -> ok.
force_delete({{Prefix, SubPrefix}, _Key}=PKey)
    when (is_binary(Prefix) orelse is_atom(Prefix)) andalso
         (is_binary(SubPrefix) orelse is_atom(SubPrefix)) ->
    ok = plumtree_metadata_hashtree:delete(PKey),
    plumtree_metadata_leveldb_instance:delete(PKey),
    ok.

%% @doc same as merge/2 but merges the object on `Node'
-spec merge(node(), {metadata_pkey(), undefined | metadata_context()}, metadata_object()) -> boolean().
merge(Node, {PKey, _Context}, Obj) ->
Expand Down Expand Up @@ -451,11 +463,28 @@ read_modify_write(PKey, Context, ValueOrFun) ->

%% @private
%% @doc Merges the incoming object `Obj' with what is stored under
%% `PKey'. Pairs where one side is missing and the other is a pure
%% tombstone (or both sides are missing) are skipped entirely, so
%% deleted entries are not resurrected by merge traffic. Returns true
%% iff a reconciled object was stored.
read_merge_write(PKey, Obj) ->
    Existing = read(PKey),
    case skip_merge(Obj, Existing) of
        true ->
            false;
        false ->
            case plumtree_metadata_object:reconcile(Obj, Existing) of
                false ->
                    false;
                {true, Reconciled} ->
                    store(PKey, Reconciled),
                    true
            end
    end.

%% @private
%% True when the incoming/existing pair represents a deletion that must
%% not be merged: both sides absent, or exactly one side present and
%% consisting solely of a tombstone value.
skip_merge(undefined, undefined) ->
    true;
skip_merge(undefined, Existing) ->
    plumtree_metadata_object:values(Existing) == ['$deleted'];
skip_merge(Obj, undefined) ->
    plumtree_metadata_object:values(Obj) == ['$deleted'];
skip_merge(_, _) ->
    false.

store({FullPrefix, Key}=PKey, Metadata) ->
Expand Down
14 changes: 10 additions & 4 deletions src/plumtree_peer_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
attempt_join/2,
leave/1,
stop/0,
stop/1
stop/1,
stop/2
]).

%% @doc prepare node to join a cluster
Expand Down Expand Up @@ -89,14 +90,19 @@ leave(_Args) when is_list(_Args) ->
{error, singleton} ->
lager:warning("Cannot leave, not a member of a cluster.")
end;
leave(_Args) ->
leave([]).
%% Catch-all clause: normalize any non-list argument to the empty-args
%% list form. The previous body — `leave(Args) -> leave(Args).` —
%% called itself with the same argument and looped forever; delegate to
%% leave([]) so the list-handling clause above does the actual work.
leave(_Args) ->
    leave([]).

%% @doc Stops with a default reason and no options. Delegates through
%% stop/1 (rather than jumping straight to stop/2) so every arity
%% funnels through a single chain and option defaults live in one place.
stop() ->
    stop("received stop request").

%% @doc Stops with the given `Reason' and no options; equivalent to
%% stop(Reason, []).
stop(Reason) ->
    stop(Reason, []).

%% @doc Stops the service: logs `Reason' at notice level, then invokes
%% the optional `stop_fun' callback from `Args' (a no-op by default).
%% Always returns ok.
stop(Reason, Args) ->
    lager:notice("~p", [Reason]),
    Halt = proplists:get_value(stop_fun, Args, fun() -> ok end),
    Halt(),
    ok.

random_peer(Leave) ->
Expand Down
21 changes: 20 additions & 1 deletion test/metadata_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

-export([
read_write_delete_test/1,
manual_force_cleanup_test/1,
partitioned_cluster_test/1,
siblings_test/1
]).
Expand Down Expand Up @@ -72,7 +73,8 @@ end_per_testcase(_, _Config) ->
ok.

%% @doc Common Test callback: the test cases this suite runs.
all() ->
    [read_write_delete_test, manual_force_cleanup_test,
     partitioned_cluster_test, siblings_test].

read_write_delete_test(Config) ->
[Node1|OtherNodes] = Nodes = proplists:get_value(nodes, Config),
Expand All @@ -93,6 +95,23 @@ read_write_delete_test(Config) ->
ok = wait_until_converged(Nodes, {foo, bar}, baz, undefined),
ok.

%% Verifies that the tombstones left behind by read_write_delete_test
%% are garbage collected once gc_grace_seconds (set to 10 in this
%% suite's node setup) has elapsed on every node.
manual_force_cleanup_test(Config) ->
    ok = read_write_delete_test(Config),
    Nodes = proplists:get_value(nodes, Config),
    {SizesBefore, _} =
        rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]),
    %% every node still has one tombstone entry in the ets cache
    ?assertEqual(length(Nodes), lists:sum(SizesBefore)),
    %% grace_seconds is 10 seconds, worst case we have to wait 2x that long
    timer:sleep(20000),
    {SizesAfter, _} =
        rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]),
    ?assertEqual(0, lists:sum(SizesAfter)),
    ok.




partitioned_cluster_test(Config) ->
[Node1|OtherNodes] = Nodes = proplists:get_value(nodes, Config),
[?assertEqual(ok, rpc:call(Node, plumtree_peer_service, join, [Node1]))
Expand Down
3 changes: 3 additions & 0 deletions test/plumtree_test_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ start_node(Name, Config, Case) ->
ok = rpc:call(Node, application, set_env, [plumtree,
plumtree_data_dir,
NodeDir]),
ok = rpc:call(Node, application, set_env, [plumtree,
gc_grace_seconds,
10]),
{ok, _} = rpc:call(Node, application, ensure_all_started, [plumtree]),
ok = wait_until(fun() ->
case rpc:call(Node, plumtree_peer_service_manager, get_local_state, []) of
Expand Down