Cleanup tombstones #3

Open
wants to merge 14 commits into base: master
15 changes: 15 additions & 0 deletions src/hashtree_tree.erl
@@ -108,6 +108,7 @@
destroy/1,
insert/4,
insert/5,
delete/3,
update_snapshot/1,
update_perform/1,
local_compare/2,
@@ -229,6 +230,20 @@ insert(Prefixes, Key, Hash, Opts, Tree) ->
{error, bad_prefixes}
end.

-spec delete(prefixes(), binary(), tree()) -> tree() | {error, term()}.
delete(Prefixes, Key, Tree=#hashtree_tree{dirty=Dirty}) ->
NodeName = prefixes_to_node_name(Prefixes),
case valid_prefixes(NodeName, Tree) of
true ->
Node = get_node(NodeName, Tree),
Node2 = hashtree:delete(Key, Node),
Dirty2 = gb_sets:add_element(NodeName, Dirty),
_ = set_node(NodeName, Node2, Tree),
Tree#hashtree_tree{dirty=Dirty2};
false ->
{error, bad_prefixes}
end.

%% @doc Snapshot the tree for updating. The returned tree should be
%% updated using {@link update_perform/1} and used to perform future
%% operations on the tree
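For reference, a minimal sketch of how the new delete/3 is expected to be called, mirroring the existing insert/4 flow (the module name, prefixes, key and hash below are illustrative, not part of this PR):

-module(hashtree_tree_delete_sketch).
-export([demo/1]).

%% Takes an already-built #hashtree_tree{} (e.g. the tree held by
%% plumtree_metadata_hashtree) and shows an insert followed by a delete.
demo(Tree0) ->
    Prefixes = [<<"prefix">>, <<"subprefix">>],
    Key = <<"key">>,
    Hash = <<"object-hash">>,
    Tree1 = hashtree_tree:insert(Prefixes, Key, Hash, Tree0),
    %% delete/3 removes the key from the leaf node and marks that node dirty,
    %% exactly like insert/4 does when adding it.
    Tree2 = hashtree_tree:delete(Prefixes, Key, Tree1),
    %% A later snapshot/update cycle folds the change into the upper levels.
    hashtree_tree:update_perform(hashtree_tree:update_snapshot(Tree2)).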
2 changes: 2 additions & 0 deletions src/plumtree.app.src
@@ -17,6 +17,8 @@
{env, [
{plumtree_data_dir, "data"},
{nr_of_meta_instances, 12},
{gc_grace_seconds, undefined}, % tombstones won't be gc'd
{gc_interval, 10000},
{meta_leveldb_opts, [
{sync, false},
{total_leveldb_mem_percent, 6},
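The two new settings can also be overridden per node. A sketch of a sys.config entry, with illustrative values: gc_interval is in milliseconds (it is fed to erlang:send_after/3), and leaving gc_grace_seconds at undefined keeps the default behaviour of never collecting tombstones.

[
 {plumtree, [
     %% keep tombstones for one day before the graveyard collector removes them
     {gc_grace_seconds, 86400},
     %% scan the graveyard every 10 seconds
     {gc_interval, 10000}
 ]}
].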
10 changes: 10 additions & 0 deletions src/plumtree_metadata_hashtree.erl
@@ -34,6 +34,7 @@
lock/2,
update/0,
update/1,
delete/1,
compare/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -163,6 +164,11 @@ update(Node) ->
compare(RemoteFun, HandlerFun, HandlerAcc) ->
gen_server:call(?SERVER, {compare, RemoteFun, HandlerFun, HandlerAcc}, infinity).


-spec delete(metadata_pkey()) -> ok.
delete(PKey) ->
gen_server:call(?SERVER, {delete, PKey}, infinity).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -198,6 +204,10 @@ handle_call({prefix_hash, Prefix}, _From, State=#state{tree=Tree}) ->
handle_call({insert, PKey, Hash, IfMissing}, _From, State=#state{tree=Tree}) ->
{Prefixes, Key} = prepare_pkey(PKey),
Tree1 = hashtree_tree:insert(Prefixes, Key, Hash, [{if_missing, IfMissing}], Tree),
{reply, ok, State#state{tree=Tree1}};
handle_call({delete, PKey}, _From, State=#state{tree=Tree}) ->
{Prefixes, Key} = prepare_pkey(PKey),
Tree1 = hashtree_tree:delete(Prefixes, Key, Tree),
{reply, ok, State#state{tree=Tree1}}.

handle_cast(_Msg, State) ->
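A hypothetical call site for the new delete/1 (the prefix/key pair is illustrative; PKey has the usual {FullPrefix, Key} shape used by the metadata API):

%% Remove a metadata key from the AAE hashtree so it no longer shows up as a
%% difference during exchanges once its tombstone has been collected.
ok = plumtree_metadata_hashtree:delete({{<<"prefix">>, <<"subprefix">>}, <<"key">>}).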
174 changes: 146 additions & 28 deletions src/plumtree_metadata_leveldb_instance.erl
@@ -29,10 +29,11 @@
name/1,
get/1,
put/2,
delete/1,
delete/2,
status/1,
data_size/1,
iterator/2,
iterator_move/2,
iterator_close/2]).

%% gen_server callbacks
@@ -50,7 +51,10 @@
read_opts = [],
write_opts = [],
fold_opts = [{fill_cache, false}],
open_iterators = []
open_iterators = [],
grace_period,
gc_interval,
refs=ets:new(?MODULE, [])
}).

-type config() :: [{atom(), term()}].
@@ -80,10 +84,10 @@ put(Key, Value) ->
Name = name(InstanceId),
gen_server:call(Name, {put, Key, Value}, infinity).

delete(Key) ->
delete(Key, Value) ->
InstanceId = plumtree_metadata_leveldb_instance_sup:get_instance_id_for_key(Key),
Name = name(InstanceId),
gen_server:call(Name, {delete, Key}, infinity).
gen_server:call(Name, {delete, Key, Value}, infinity).

name(Id) ->
list_to_atom("plmtrlvld_" ++ integer_to_list(Id)).
@@ -100,9 +104,13 @@ data_size(InstanceId) ->
iterator(Instance, KeysOnly) when is_pid(Instance) or is_atom(Instance) ->
gen_server:call(Instance, {new_iterator, self(), KeysOnly}, infinity).

iterator_close(Instance, Itr) when is_pid(Instance) or is_atom(Instance) ->
iterator_close(Instance, {_,_} = Itr) when is_pid(Instance) or is_atom(Instance) ->
gen_server:call(Instance, {close_iterator, Itr}, infinity).

iterator_move({MRef, Itr}, FirstKey) when is_reference(MRef) ->
eleveldb:iterator_move(Itr, FirstKey).


%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -127,12 +135,25 @@ init([InstanceId, Opts]) ->
"meta"),
DataDir2 = filename:join(DataDir1, integer_to_list(InstanceId)),

GracePeriod = app_helper:get_prop_or_env(gc_grace_seconds, Opts, plumtree,
undefined),
CleanupInterval =
case GracePeriod of
undefined ->
undefined;
_ ->
I = app_helper:get_prop_or_env(gc_interval, Opts, plumtree,
10000),
erlang:send_after(I, self(), cleanup),
%% keep the interval itself (not the timer ref) so that
%% handle_info(cleanup, ...) can reschedule with it
I
end,

%% Initialize state
S0 = init_state(DataDir2, Opts),
process_flag(trap_exit, true),
case open_db(S0) of
{ok, State} ->
{ok, State};
{ok, init_graveyard(State#state{grace_period=GracePeriod,
gc_interval=CleanupInterval})};
{error, Reason} ->
{stop, Reason}
end.
@@ -153,31 +174,21 @@ init([InstanceId, Opts]) ->
%%--------------------------------------------------------------------
handle_call({get, Key}, _From, #state{read_opts=ReadOpts, ref=Ref} = State) ->
case eleveldb:get(Ref, sext:encode(Key), ReadOpts) of
{ok, Value} ->
{reply, {ok, binary_to_term(Value)}, State};
{ok, BVal} ->
Reply = {ok, binary_to_term(BVal)},
{reply, Reply, State};
not_found ->
{reply, {error, not_found}, State};
{error, Reason} ->
{reply, {error, Reason}, State}
end;
handle_call({put, Key, Value}, _From, #state{write_opts=WriteOpts, ref=Ref} = State) ->
Update = [{put, sext:encode(Key), term_to_binary(Value)}],
%% Perform the write...
case eleveldb:write(Ref, Update, WriteOpts) of
ok ->
{reply, ok, State};
{error, Reason} ->
{reply, {error, Reason}, State}
end;
handle_call({delete, Key}, _From, #state{write_opts=WriteOpts, ref=Ref} = State) ->
Update = [{delete, sext:encode(Key)}],
%% Perform the write...
case eleveldb:write(Ref, Update, WriteOpts) of
ok ->
{reply, ok, State};
{error, Reason} ->
{reply, {error, Reason}, State}
end;
UpdateOps = update_ops(Key, Value, State),
eleveldb:write(Ref, UpdateOps, WriteOpts),
{reply, true, State};
handle_call({delete, Key, TombstoneVal}, _From, State) ->
Accepted = delete(Key, TombstoneVal, State),
{reply, Accepted, State};
handle_call(status, _From, #state{ref=Ref} = State) ->
{ok, Stats} = eleveldb:status(Ref, <<"leveldb.stats">>),
{ok, ReadBlockError} = eleveldb:status(Ref, <<"leveldb.ReadBlockError">>),
@@ -202,9 +213,10 @@ handle_call({new_iterator, Owner, KeysOnly}, _From, #state{ref=Ref, fold_opts=Fo
false ->
eleveldb:iterator(Ref, FoldOpts)
end,
{reply, Itr, State#state{open_iterators=[{MRef, Itr}|OpenIterators]}};
handle_call({close_iterator, Itr}, _From, #state{open_iterators=OpenIterators} = State) ->
{MRef, _} = lists:keyfind(Itr, 2, OpenIterators),
{reply, {MRef, Itr}, State#state{open_iterators=[{MRef, Itr}|OpenIterators]}};
handle_call({close_iterator, {MRef, _}}, _From, #state{open_iterators=OpenIterators} = State) ->
{MRef, Itr} = lists:keyfind(MRef, 1, OpenIterators),
eleveldb:iterator_close(Itr),
demonitor(MRef, [flush]),
{reply, ok, State#state{open_iterators=lists:keydelete(MRef, 1, OpenIterators)}}.

@@ -239,6 +251,9 @@ handle_info({'DOWN', MRef, process, _, _}, #state{open_iterators=OpenIterators}
eleveldb:iterator_close(Itr)
end,
{noreply, State#state{open_iterators=lists:keydelete(MRef, 1, OpenIterators)}};
handle_info(cleanup, State) ->
erlang:send_after(State#state.gc_interval, self(), cleanup),
{noreply, cleanup_graveyard(State)};
handle_info(_Info, State) ->
{noreply, State}.

@@ -364,3 +379,106 @@ open_db(State0, RetriesLeft, _) ->
{error, Reason} ->
{error, Reason}
end.

delete(Key, TombstoneVal, #state{ref=Ref, write_opts=WriteOpts, grace_period=undefined}) ->
%% gc_grace_seconds disabled: write the tombstone, but never gc it
SKey = sext:encode(Key),
Hash = plumtree_metadata_object:hash(TombstoneVal),
plumtree_metadata_hashtree:insert(Key, Hash),
eleveldb:write(Ref, [{put, SKey, term_to_binary(TombstoneVal)}], WriteOpts),
true;
delete(Key, _, #state{ref=Ref, write_opts=WriteOpts, grace_period=0}) ->
%% Zero grace period: we are allowed to delete this key right away,
%% which might make sense on single-node clusters
plumtree_metadata_hashtree:delete(Key),
SKey = sext:encode(Key),
eleveldb:write(Ref, [{delete, SKey}], WriteOpts),
true;
delete(Key, TombstoneVal, #state{ref=Ref, write_opts=WriteOpts, refs=Refs}) ->
%% Grace period defined, add tombstone to graveyard
Hash = plumtree_metadata_object:hash(TombstoneVal),
plumtree_metadata_hashtree:insert(Key, Hash),
SKey = sext:encode(Key),
case ets:lookup(Refs, SKey) of
[] ->
TS = epoch(),
GraveyardKey = graveyard_key(Key),
ets:insert(Refs, {SKey, TS}),
eleveldb:write(Ref, [{put, SKey, term_to_binary(TombstoneVal)},
{put, GraveyardKey, term_to_binary(TS)}],
WriteOpts),
true;
_ ->
%% already graveyarded, we got this 'new' tombstone through merge
false
end.

update_ops(Key, Val, #state{refs=Refs}) ->
Hash = plumtree_metadata_object:hash(Val),
plumtree_metadata_hashtree:insert(Key, Hash),
SKey = sext:encode(Key),
BVal = term_to_binary(Val),
case ets:lookup(Refs, SKey) of
[] ->
[{put, SKey, BVal}];
[{_, _}] ->
%% remove from graveyard
ets:delete(Refs, SKey),
[{put, SKey, BVal},
{delete, graveyard_key(Key)}]
end.

graveyard_key(Key) ->
%% Key should be plain, not sext encoded
sext:encode({'$graveyard', Key}).

init_graveyard(#state{ref=Ref, fold_opts=FoldOpts} = State) ->
FirstKey = graveyard_key(''),
{ok, Itr} = eleveldb:iterator(Ref, FoldOpts),
init_graveyard(eleveldb:iterator_move(Itr, FirstKey), Itr, State).

init_graveyard({error, _}, _, State) ->
%% no need to close the iterator
State;
init_graveyard({ok, SKey, Val}, Itr, State) ->
case sext:decode(SKey) of
{'$graveyard', Key} ->
TS = binary_to_term(Val),
%% We let the cleanup_graveyard check decide what to do
%% with tombstones older than gc_grace_seconds
ets:insert(State#state.refs, {sext:encode(Key), TS}),
init_graveyard(eleveldb:iterator_move(Itr, prefetch), Itr, State);
_ ->
eleveldb:iterator_close(Itr),
State
end.



cleanup_graveyard(#state{grace_period=undefined} = State) -> State;
cleanup_graveyard(#state{grace_period=GS, refs=Refs} = State) ->
MatchHead = {'$1', '$2'},
Guard = {'<', '$2', epoch() - GS},
Result = '$1',
MatchFunction = {MatchHead, [Guard], [Result]},
MatchSpec = [MatchFunction],
cleanup_graveyard(ets:select(Refs, MatchSpec, 100), [], State).

cleanup_graveyard({[SKey|Rest], Cont}, Batch, State) ->
ets:delete(State#state.refs, SKey),
Key = sext:decode(SKey),
plumtree_metadata_hashtree:delete(Key),
cleanup_graveyard({Rest, Cont},
[{delete, SKey},
{delete, graveyard_key(Key)}|Batch], State);
cleanup_graveyard({[], Cont}, Batch, #state{ref=Ref, write_opts=WriteOpts} = State) ->
eleveldb:write(Ref, Batch, WriteOpts),
cleanup_graveyard(ets:select(Cont), [], State);
cleanup_graveyard('$end_of_table', [], State) -> State;
cleanup_graveyard('$end_of_table', Batch, #state{ref=Ref, write_opts=WriteOpts} = State) ->
eleveldb:write(Ref, Batch, WriteOpts),
State.

epoch() ->
{MegaSecs, Secs, _} = os:timestamp(),
MegaSecs * 1000000 + Secs.
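The cleanup pass relies on a standard ETS select over {EncodedKey, Timestamp} pairs. A self-contained sketch of just that part (table contents and timestamps are made up; only the match-spec shape mirrors cleanup_graveyard/1 above):

-module(graveyard_select_sketch).
-export([demo/0]).

epoch() ->
    {MegaSecs, Secs, _} = os:timestamp(),
    MegaSecs * 1000000 + Secs.

demo() ->
    GraceSeconds = 3600,
    Refs = ets:new(refs, []),
    %% delete/3 above records the tombstone time per (sext-encoded) key ...
    ets:insert(Refs, {<<"expired-key">>, epoch() - 7200}),
    ets:insert(Refs, {<<"fresh-key">>, epoch()}),
    %% ... and cleanup_graveyard/1 selects only entries older than the grace
    %% period, yielding the keys whose tombstones may now be removed.
    MatchSpec = [{{'$1', '$2'}, [{'<', '$2', epoch() - GraceSeconds}], ['$1']}],
    ets:select(Refs, MatchSpec).   %% -> [<<"expired-key">>]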
38 changes: 21 additions & 17 deletions src/plumtree_metadata_leveldb_iterator.erl
@@ -181,12 +181,12 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
iterate([{Itr, _Instance}|_] = Instances, FullPrefix, KeyMatch, KeysOnly) ->
Res = eleveldb:iterator_move(Itr, prefetch),
Res = plumtree_metadata_leveldb_instance:iterator_move(Itr, prefetch),
iterate(Res, Instances, FullPrefix, KeyMatch, KeysOnly);
iterate([Instance|Rest], FullPrefix, KeyMatch, KeysOnly) when is_atom(Instance)->
Itr = plumtree_metadata_leveldb_instance:iterator(Instance, KeysOnly),
FirstKey = first_key(FullPrefix),
Res = eleveldb:iterator_move(Itr, FirstKey),
Res = plumtree_metadata_leveldb_instance:iterator_move(Itr, FirstKey),
iterate(Res, [{Itr, Instance}|Rest], FullPrefix, KeyMatch, KeysOnly);
iterate([], _, _, _) -> done.

Expand All @@ -197,22 +197,26 @@ iterate(OkVal, Instances, FullPrefix, KeyMatch, KeysOnly) when element(1, OkVal)
%% OkVal has the form of
%% {ok, Key} or {ok, Key, Val}
BKey = element(2, OkVal),
PrefixedKey = sext:decode(BKey),
case prefix_match(PrefixedKey, FullPrefix) of
{true, Key} ->
case key_match(Key, KeyMatch) of
true when KeysOnly ->
{{PrefixedKey, self()}, Instances};
true ->
BVal = element(3, OkVal),
{{{PrefixedKey, binary_to_term(BVal)}, self()}, Instances};
case sext:decode(BKey) of
{'$graveyard', _} ->
iterate(Instances, FullPrefix, KeyMatch, KeysOnly);
PrefixedKey ->
case prefix_match(PrefixedKey, FullPrefix) of
{true, Key} ->
case key_match(Key, KeyMatch) of
true when KeysOnly ->
{{PrefixedKey, self()}, Instances};
true ->
BVal = element(3, OkVal),
{{{PrefixedKey, binary_to_term(BVal)}, self()}, Instances};
false ->
iterate(Instances, FullPrefix, KeyMatch, KeysOnly)
end;
false ->
iterate(Instances, FullPrefix, KeyMatch, KeysOnly)
end;
false ->
[{Itr, Instance}|RestInstances] = Instances,
ok = plumtree_metadata_leveldb_instance:iterator_close(Instance, Itr),
iterate(RestInstances, FullPrefix, KeyMatch, KeysOnly)
[{Itr, Instance}|RestInstances] = Instances,
ok = plumtree_metadata_leveldb_instance:iterator_close(Instance, Itr),
iterate(RestInstances, FullPrefix, KeyMatch, KeysOnly)
end
end.

prefix_match({_, Key}, {undefined, undefined}) -> {true, Key};
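The key piece of the iterator change is that graveyard markers share the leveldb keyspace with real metadata keys, so iteration has to filter them out. A minimal sketch of that decode-and-skip step (module and function names are illustrative; the real code also handles prefix/key matching and multiple instances):

-module(graveyard_filter_sketch).
-export([classify/1]).

%% Keys coming back from eleveldb are sext-encoded. Graveyard bookkeeping
%% entries decode to {'$graveyard', Key} and must never be handed to callers;
%% everything else is a regular prefixed metadata key.
classify(BinaryKey) ->
    case sext:decode(BinaryKey) of
        {'$graveyard', _Key} -> skip;
        PrefixedKey -> {visit, PrefixedKey}
    end.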