From 0a05a48c68587b707a66f516523ff25385ef485a Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Sun, 19 Jul 2015 13:41:22 +0200 Subject: [PATCH 01/11] first version of proper tombstone cleanup --- src/hashtree_tree.erl | 15 +++ src/plumtree_dets_metadata_manager.erl | 11 ++ src/plumtree_leveldb_metadata_manager.erl | 10 ++ src/plumtree_metadata_cleanup.erl | 148 ++++++++++++++++++++++ src/plumtree_metadata_cleanup_sup.erl | 57 +++++++++ src/plumtree_metadata_hashtree.erl | 10 ++ src/plumtree_metadata_manager.erl | 51 +++++++- src/plumtree_sup.erl | 1 + 8 files changed, 297 insertions(+), 6 deletions(-) create mode 100644 src/plumtree_metadata_cleanup.erl create mode 100644 src/plumtree_metadata_cleanup_sup.erl diff --git a/src/hashtree_tree.erl b/src/hashtree_tree.erl index 4bbb7b4..b6efc7b 100644 --- a/src/hashtree_tree.erl +++ b/src/hashtree_tree.erl @@ -108,6 +108,7 @@ destroy/1, insert/4, insert/5, + delete/3, update_snapshot/1, update_perform/1, local_compare/2, @@ -229,6 +230,20 @@ insert(Prefixes, Key, Hash, Opts, Tree) -> {error, bad_prefixes} end. +-spec delete(prefixes(), binary(), tree()) -> tree() | {error, term()}. +delete(Prefixes, Key, Tree=#hashtree_tree{dirty=Dirty}) -> + NodeName = prefixes_to_node_name(Prefixes), + case valid_prefixes(NodeName, Tree) of + true -> + Node = get_node(NodeName, Tree), + Node2 = hashtree:delete(Key, Node), + Dirty2 = gb_sets:add_element(NodeName, Dirty), + _ = set_node(NodeName, Node2, Tree), + Tree#hashtree_tree{dirty=Dirty2}; + false -> + {error, bad_prefixes} + end. + %% @doc Snapshot the tree for updating. The return tree should be %% updated using {@link update_perform/1} and to perform future operations %% on the tree diff --git a/src/plumtree_dets_metadata_manager.erl b/src/plumtree_dets_metadata_manager.erl index 2f9ab7c..68bfaed 100644 --- a/src/plumtree_dets_metadata_manager.erl +++ b/src/plumtree_dets_metadata_manager.erl @@ -1,6 +1,7 @@ -module(plumtree_dets_metadata_manager). -export([init/1, store/3, + delete/3, terminate/2]). -define(MANIFEST, cluster_meta_manifest). @@ -25,6 +26,12 @@ store(FullPrefix, Objs, State) -> ok = dets_insert(dets_tabname(FullPrefix), Objs), {ok, State}. +delete(FullPrefix, Key, State) -> + maybe_init_dets(FullPrefix, State#state.data_root), + ok = dets_delete(dets_tabname(FullPrefix), Key), + {ok, State}. + + terminate(_Reason, _State) -> close_dets_tabs(), ok = close_manifest(). @@ -83,6 +90,10 @@ dets_insert(TabName, Objs) -> ok = dets:insert(TabName, Objs), ok = dets:sync(TabName). +dets_delete(TabName, Key) -> + ok = dets:delete(TabName, Key), + ok = dets:sync(TabName). + dets_tabname(FullPrefix) -> {?MODULE, FullPrefix}. dets_tabname_to_prefix({?MODULE, FullPrefix}) -> FullPrefix. diff --git a/src/plumtree_leveldb_metadata_manager.erl b/src/plumtree_leveldb_metadata_manager.erl index 3f5fb56..39b2f1d 100644 --- a/src/plumtree_leveldb_metadata_manager.erl +++ b/src/plumtree_leveldb_metadata_manager.erl @@ -1,6 +1,7 @@ -module(plumtree_leveldb_metadata_manager). -export([init/1, store/3, + delete/3, terminate/2]). -define(MANIFEST, cluster_meta_manifest). @@ -27,6 +28,12 @@ store(FullPrefix, Objs, State) -> ok = lvldb_insert(TabRef, Updates), {ok, NewState}. +delete(FullPrefix, Key, State) -> + #state{tab_refs=Refs} = NewState = maybe_init_lvldb(FullPrefix, State), + {_, TabRef} = lists:keyfind(FullPrefix, 1, Refs), + ok = lvldb_delete(TabRef, Key), + {ok, NewState}. + objs_to_updates([{Key, Val}|Rest], Acc) -> objs_to_updates(Rest, [{put, term_to_binary(Key), term_to_binary(Val)}|Acc]); objs_to_updates([], Acc) -> Acc. @@ -91,6 +98,9 @@ close_lvldb_tab({_FullPrefix, TabRef}) -> lvldb_insert(TabRef, Updates) -> eleveldb:write(TabRef, Updates, []). +lvldb_delete(TabRef, Key) -> + eleveldb:delete(TabRef, Key, []). + lvldb_file(DataRoot, FullPrefix) -> filename:join(DataRoot, lvldb_filename(FullPrefix)). diff --git a/src/plumtree_metadata_cleanup.erl b/src/plumtree_metadata_cleanup.erl new file mode 100644 index 0000000..a2e64c8 --- /dev/null +++ b/src/plumtree_metadata_cleanup.erl @@ -0,0 +1,148 @@ +-module(plumtree_metadata_cleanup). + +-behaviour(gen_server). + +%% API functions +-export([start_link/1]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {full_prefix, interval, deleted}). + +-define(TOMBSTONE, '$deleted'). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link(FullPrefix) -> + gen_server:start_link(?MODULE, [FullPrefix], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([FullPrefix]) -> + {_, Deleted} = plumtree_metadata:fold(fun cleanup_tombstones/2, + {FullPrefix, gb_sets:new()}, + FullPrefix, [{resolver, lww}]), + CleanupInterval = app_helper:get_prop_or_env(cleanup_interval, [], + plumtree, 10000), + erlang:send_after(CleanupInterval, self(), cleanup), + {ok, #state{full_prefix=FullPrefix, + interval=CleanupInterval, + deleted=Deleted}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info(cleanup, #state{full_prefix=FullPrefix, + interval=Interval, + deleted=Deleted} = State) -> + {_, NewDeleted} = plumtree_metadata:fold(fun cleanup_tombstones/2, + {FullPrefix, Deleted}, + FullPrefix, [{resolver, lww}]), + erlang:send_after(Interval, self(), cleanup), + {noreply, State#state{deleted=NewDeleted}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +cleanup_tombstones({Key, ?TOMBSTONE}, {FullPrefix, Set}) -> + case gb_sets:is_element(Key, Set) of + true -> + %% delete + plumtree_metadata_manager:force_delete({FullPrefix, Key}), + {FullPrefix, gb_sets:delete(Key, Set)}; + false -> + {FullPrefix, gb_sets:add(Key, Set)} + end; +cleanup_tombstones({Key, _}, {FullPrefix, Set}) -> + {FullPrefix, gb_sets:delete_any(Key, Set)}. diff --git a/src/plumtree_metadata_cleanup_sup.erl b/src/plumtree_metadata_cleanup_sup.erl new file mode 100644 index 0000000..bfd15c6 --- /dev/null +++ b/src/plumtree_metadata_cleanup_sup.erl @@ -0,0 +1,57 @@ +-module(plumtree_metadata_cleanup_sup). + +-behaviour(supervisor). + +%% API functions +-export([start_link/0, + add_full_prefix/1]). + +%% Supervisor callbacks +-export([init/1]). + +-define(CHILD(Id, Mod, Type, Args), {Id, {Mod, start_link, Args}, + permanent, 5000, Type, [Mod]}). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the supervisor +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +add_full_prefix(FullPrefix) -> + supervisor:start_child(?MODULE, + ?CHILD({plumtree_metadata_cleanup, FullPrefix}, + plumtree_metadata_cleanup, worker, [FullPrefix])). + + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart frequency and child +%% specifications. +%% +%% @spec init(Args) -> {ok, {SupFlags, [ChildSpec]}} | +%% ignore | +%% {error, Reason} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, {{one_for_one, 5, 10}, []}}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/src/plumtree_metadata_hashtree.erl b/src/plumtree_metadata_hashtree.erl index b09ea63..4f44783 100644 --- a/src/plumtree_metadata_hashtree.erl +++ b/src/plumtree_metadata_hashtree.erl @@ -34,6 +34,7 @@ lock/2, update/0, update/1, + delete/1, compare/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -163,6 +164,11 @@ update(Node) -> compare(RemoteFun, HandlerFun, HandlerAcc) -> gen_server:call(?SERVER, {compare, RemoteFun, HandlerFun, HandlerAcc}, infinity). + +-spec delete(metadata_pkey()) -> ok. +delete(PKey) -> + gen_server:call(?SERVER, {delete, PKey}, infinity). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -198,6 +204,10 @@ handle_call({prefix_hash, Prefix}, _From, State=#state{tree=Tree}) -> handle_call({insert, PKey, Hash, IfMissing}, _From, State=#state{tree=Tree}) -> {Prefixes, Key} = prepare_pkey(PKey), Tree1 = hashtree_tree:insert(Prefixes, Key, Hash, [{if_missing, IfMissing}], Tree), + {reply, ok, State#state{tree=Tree1}}; +handle_call({delete, PKey}, _From, State=#state{tree=Tree}) -> + {Prefixes, Key} = prepare_pkey(PKey), + Tree1 = hashtree_tree:delete(Prefixes, Key, Tree), {reply, ok, State#state{tree=Tree1}}. handle_cast(_Msg, State) -> diff --git a/src/plumtree_metadata_manager.erl b/src/plumtree_metadata_manager.erl index f845c17..228dbc9 100644 --- a/src/plumtree_metadata_manager.erl +++ b/src/plumtree_metadata_manager.erl @@ -47,6 +47,9 @@ graft/1, exchange/1]). +%% used by cleanup +-export([force_delete/1]). + %% used by storage backends -export([init_ets_for_full_prefix/1]). @@ -270,6 +273,13 @@ put({{Prefix, SubPrefix}, _Key}=PKey, Context, ValueOrFun) (is_binary(SubPrefix) orelse is_atom(SubPrefix)) -> gen_server:call(?SERVER, {put, PKey, Context, ValueOrFun}, infinity). +%% @doc forcefully deletes the key +-spec force_delete(metadata_pkey()) -> ok. +force_delete({{Prefix, SubPrefix}, _Key}=PKey) + when (is_binary(Prefix) orelse is_atom(Prefix)) andalso + (is_binary(SubPrefix) orelse is_atom(SubPrefix)) -> + gen_server:call(?SERVER, {force_delete, PKey}, infinity). + %% @doc same as merge/2 but merges the object on `Node' -spec merge(node(), {metadata_pkey(), undefined | metadata_context()}, metadata_object()) -> boolean(). merge(Node, {PKey, _Context}, Obj) -> @@ -390,6 +400,9 @@ handle_call({merge, PKey, Obj}, _From, State) -> handle_call({get, PKey}, _From, State) -> Result = read(PKey), {reply, Result, State}; +handle_call({force_delete, PKey}, _From, State) -> + {Result, NewState} = force_delete(PKey, State), + {reply, Result, NewState}; handle_call({open_remote_iterator, Pid, FullPrefix, KeyMatch}, _From, State) -> Iterator = new_remote_iterator(Pid, FullPrefix, KeyMatch, State), {reply, Iterator, State}; @@ -601,13 +614,37 @@ read_modify_write(PKey, Context, ValueOrFun, State=#state{serverid=ServerId}) -> read_merge_write(PKey, Obj, State) -> Existing = read(PKey), - case plumtree_metadata_object:reconcile(Obj, Existing) of - false -> {false, State}; - {true, Reconciled} -> - {_, NewState} = store(PKey, Reconciled, State), - {true, NewState} + IsDeleted = + case {Obj, Existing} of + {undefined, undefined} -> true; + {undefined, O} -> + plumtree_metadata_object:values(O) == ['$deleted']; + {O, undefined} -> + plumtree_metadata_object:values(O) == ['$deleted']; + _ -> + false + end, + case IsDeleted of + true -> + {false, State}; + false -> + case plumtree_metadata_object:reconcile(Obj, Existing) of + false -> {false, State}; + {true, Reconciled} -> + {_, NewState} = store(PKey, Reconciled, State), + {true, NewState} + end end. +force_delete({FullPrefix, Key}=PKey, + #state{storage_mod=Mod, + storage_mod_state=ModSt} = State) -> + Tab = ets_tab(FullPrefix), + ets:delete(Tab, Key), + {ok, NewModSt} = Mod:delete(FullPrefix, Key, ModSt), + ok = plumtree_metadata_hashtree:delete(PKey), + {ok, State#state{storage_mod_state=NewModSt}}. + store({FullPrefix, Key}=PKey, Metadata, State) -> #state{storage_mod=Mod, storage_mod_state=ModSt, @@ -665,13 +702,15 @@ ets_tab(FullPrefix) -> maybe_init_ets(FullPrefix) -> case ets_tab(FullPrefix) of - undefined -> init_ets(FullPrefix); + undefined -> + init_ets(FullPrefix); _TabId -> ok end. init_ets(FullPrefix) -> TabId = new_ets_tab(), ets:insert(?ETS, [{FullPrefix, TabId}]), + plumtree_metadata_cleanup_sup:add_full_prefix(FullPrefix), TabId. new_ets_tab() -> diff --git a/src/plumtree_sup.erl b/src/plumtree_sup.erl index 01f7165..d7bad7d 100644 --- a/src/plumtree_sup.erl +++ b/src/plumtree_sup.erl @@ -35,6 +35,7 @@ start_link() -> init([]) -> Children = lists:flatten( [ + ?CHILD(plumtree_metadata_cleanup_sup, supervisor), ?CHILD(plumtree_peer_service_gossip, worker), ?CHILD(plumtree_peer_service_events, worker), ?CHILD(plumtree_broadcast, worker), From 2222c208daf706113b401f6a46967208fcb888ae Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Tue, 21 Jul 2015 22:09:07 +0200 Subject: [PATCH 02/11] refactoring and some cleanup, cleanup test case --- src/plumtree_metadata.erl | 14 +++- src/plumtree_metadata_cleanup.erl | 116 +++++++++++++++++++------- src/plumtree_metadata_cleanup_sup.erl | 15 ++++ src/plumtree_peer_service.erl | 26 +++--- test/cluster_membership_SUITE.erl | 11 ++- test/metadata_SUITE.erl | 28 ++++++- 6 files changed, 163 insertions(+), 47 deletions(-) diff --git a/src/plumtree_metadata.erl b/src/plumtree_metadata.erl index d968cf8..741ca38 100644 --- a/src/plumtree_metadata.erl +++ b/src/plumtree_metadata.erl @@ -39,7 +39,9 @@ put/3, put/4, delete/2, - delete/3]). + delete/3, + cleanup/2, + cleanup_all/1]). -include("plumtree_metadata.hrl"). @@ -332,6 +334,16 @@ delete(FullPrefix, Key) -> delete(FullPrefix, Key, _Opts) -> put(FullPrefix, Key, ?TOMBSTONE, []). + +-spec cleanup(metadata_prefix(), pos_integer()) -> ok | {error, any()}. +cleanup(FullPrefix, AgeInSecs) -> + plumtree_metadata_cleanup:force_cleanup(FullPrefix, AgeInSecs). + +-spec cleanup_all(metadata_prefix()) -> ok | {error, any()}. +cleanup_all(AgeInSecs) -> + plumtree_metadata_cleanup:force_cleanup(AgeInSecs). + + %%%=================================================================== %%% Internal functions %%%=================================================================== diff --git a/src/plumtree_metadata_cleanup.erl b/src/plumtree_metadata_cleanup.erl index a2e64c8..7bcd93d 100644 --- a/src/plumtree_metadata_cleanup.erl +++ b/src/plumtree_metadata_cleanup.erl @@ -3,7 +3,9 @@ -behaviour(gen_server). %% API functions --export([start_link/1]). +-export([start_link/1, + force_cleanup/1, + force_cleanup/2]). %% gen_server callbacks -export([init/1, @@ -13,7 +15,7 @@ terminate/2, code_change/3]). --record(state, {full_prefix, interval, deleted}). +-record(state, {interval, deleted, tref, waiting}). -define(TOMBSTONE, '$deleted'). @@ -21,16 +23,24 @@ %%% API functions %%%=================================================================== -%%-------------------------------------------------------------------- -%% @doc -%% Starts the server -%% -%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} -%% @end -%%-------------------------------------------------------------------- start_link(FullPrefix) -> gen_server:start_link(?MODULE, [FullPrefix], []). +force_cleanup(AgeInSecs) when AgeInSecs > 0 -> + lists:foreach(fun({_FullPrefix, Pid}) -> + force_cleanup(Pid, AgeInSecs) + end, plumtree_metadata_cleanup_sup:get_full_prefix_and_pid()). + +force_cleanup(FullPrefix, AgeInSecs) when is_tuple(FullPrefix) -> + case plumtree_metadata_cleanup_sup:get_pid(FullPrefix) of + {ok, Pid} -> + force_cleanup(Pid, AgeInSecs); + E -> + E + end; +force_cleanup(Pid, AgeInSecs) when is_pid(Pid) and (AgeInSecs > 0) -> + gen_server:call(Pid, {force_cleanup, AgeInSecs}, infinity). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -47,15 +57,19 @@ start_link(FullPrefix) -> %% @end %%-------------------------------------------------------------------- init([FullPrefix]) -> - {_, Deleted} = plumtree_metadata:fold(fun cleanup_tombstones/2, - {FullPrefix, gb_sets:new()}, - FullPrefix, [{resolver, lww}]), CleanupInterval = app_helper:get_prop_or_env(cleanup_interval, [], - plumtree, 10000), - erlang:send_after(CleanupInterval, self(), cleanup), - {ok, #state{full_prefix=FullPrefix, - interval=CleanupInterval, - deleted=Deleted}}. + plumtree, undefined), + case CleanupInterval of + undefined -> + %% no cleanup happens + {ok, #state{interval=undefined, + deleted={FullPrefix, gb_sets:new()}}}; + _ when CleanupInterval > 0 -> + TRef = erlang:send_after(0, self(), cleanup), + {ok, #state{interval=CleanupInterval * 1000, + tref=TRef, + deleted={FullPrefix, gb_sets:new()}}} + end. %%-------------------------------------------------------------------- %% @private @@ -71,9 +85,26 @@ init([FullPrefix]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. +handle_call({force_cleanup, AgeInSecs}, From, #state{tref=TRef, + waiting=Waiting, + deleted=Deleted} = State) -> + case Waiting of + undefined -> + case TRef of + undefined -> + ignore; + _ -> + erlang:cancel_timer(TRef) + end, + NewDeleted = cleanup_tombstones(Deleted), + NewTRef = erlang:send_after(AgeInSecs * 1000, self(), cleanup), + {noreply, State#state{deleted=NewDeleted, + tref=NewTRef, + waiting=From}}; + _ -> + {reply, {error, already_scheduled}, State} + end. + %%-------------------------------------------------------------------- %% @private @@ -98,14 +129,25 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_info(cleanup, #state{full_prefix=FullPrefix, +handle_info(cleanup, #state{waiting=Waiting, interval=Interval, deleted=Deleted} = State) -> - {_, NewDeleted} = plumtree_metadata:fold(fun cleanup_tombstones/2, - {FullPrefix, Deleted}, - FullPrefix, [{resolver, lww}]), - erlang:send_after(Interval, self(), cleanup), - {noreply, State#state{deleted=NewDeleted}}. + NewDeleted = cleanup_tombstones(Deleted), + case Waiting of + undefined -> + ignore; + _ -> + gen_server:reply(Waiting, ok) + end, + NewState = + case Interval of + undefined -> + State#state{waiting=undefined, deleted=NewDeleted}; + _ -> + NewTRef = erlang:send_after(Interval, self(), cleanup), + State#state{waiting=undefined, deleted=NewDeleted, tref=NewTRef} + end, + {noreply, NewState}. %%-------------------------------------------------------------------- %% @private @@ -135,14 +177,26 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -cleanup_tombstones({Key, ?TOMBSTONE}, {FullPrefix, Set}) -> +cleanup_tombstones({FullPrefix, Set}) -> + T1 = os:timestamp(), + {_, Marked, Deleted, Total, NewSet} = plumtree_metadata:fold( + fun cleanup_tombstones/2, + {FullPrefix, 0, 0, 0, Set}, + FullPrefix, [{resolver, lww}]), + T2 = os:timestamp(), + DiffInMs = timer:now_diff(T2, T1) div 1000, + lager:info("completed cleanup for ~p in ~pms. deleted ~p, marked ~p, good ~p", + [FullPrefix, DiffInMs, Deleted, Marked, Total]), + {FullPrefix, NewSet}. + +cleanup_tombstones({Key, ?TOMBSTONE}, {FullPrefix, Marked, Deleted, Total, Set}) -> case gb_sets:is_element(Key, Set) of true -> %% delete plumtree_metadata_manager:force_delete({FullPrefix, Key}), - {FullPrefix, gb_sets:delete(Key, Set)}; + {FullPrefix, Marked, Deleted + 1, Total, gb_sets:delete(Key, Set)}; false -> - {FullPrefix, gb_sets:add(Key, Set)} + {FullPrefix, Marked + 1, Deleted, Total, gb_sets:add(Key, Set)} end; -cleanup_tombstones({Key, _}, {FullPrefix, Set}) -> - {FullPrefix, gb_sets:delete_any(Key, Set)}. +cleanup_tombstones({Key, _}, {FullPrefix, Marked, Deleted, Total, Set}) -> + {FullPrefix, Marked, Deleted, Total + 1, gb_sets:delete_any(Key, Set)}. diff --git a/src/plumtree_metadata_cleanup_sup.erl b/src/plumtree_metadata_cleanup_sup.erl index bfd15c6..a8424fa 100644 --- a/src/plumtree_metadata_cleanup_sup.erl +++ b/src/plumtree_metadata_cleanup_sup.erl @@ -4,6 +4,8 @@ %% API functions -export([start_link/0, + get_pid/1, + get_full_prefix_and_pid/0, add_full_prefix/1]). %% Supervisor callbacks @@ -31,6 +33,19 @@ add_full_prefix(FullPrefix) -> ?CHILD({plumtree_metadata_cleanup, FullPrefix}, plumtree_metadata_cleanup, worker, [FullPrefix])). +get_pid(FullPrefix) -> + case lists:keyfind({plumtree_metadata_cleanup, FullPrefix}, 1, + supervisor:which_children(?MODULE)) of + {_, Pid, _, _} when is_pid(Pid) -> + {ok, Pid}; + _ -> + {error, not_found} + end. + +get_full_prefix_and_pid() -> + [{FullPrefix, Pid} || {{plumtree_metadata_cleanup, FullPrefix}, Pid, _, _} + <- supervisor:which_children(?MODULE), is_pid(Pid)]. + %%%=================================================================== %%% Supervisor callbacks diff --git a/src/plumtree_peer_service.erl b/src/plumtree_peer_service.erl index 06745e5..9805cf9 100644 --- a/src/plumtree_peer_service.erl +++ b/src/plumtree_peer_service.erl @@ -27,7 +27,8 @@ attempt_join/2, leave/1, stop/0, - stop/1 + stop/1, + stop/2 ]). %% @doc prepare node to join a cluster @@ -40,7 +41,7 @@ join(NodeStr, Auto) when is_list(NodeStr) -> join(Node, Auto) when is_atom(Node) -> join(node(), Node, Auto). -%% @doc Initiate join. Nodes cannot join themselves. +%% @doc Initiate join. Nodes cannot join themselves. join(Node, Node, _) -> {error, self_join}; join(_, Node, _Auto) -> @@ -59,15 +60,15 @@ attempt_join(Node) -> attempt_join(Node, Local) -> {ok, Remote} = gen_server:call({plumtree_peer_service_gossip, Node}, send_state), - Merged = riak_dt_orswot:merge(Remote, Local), + Merged = riak_dt_orswot:merge(Remote, Local), _ = plumtree_peer_service_manager:update_state(Merged), %% broadcast to all nodes %% get peer list Members = riak_dt_orswot:value(Merged), _ = [gen_server:cast({plumtree_peer_service_gossip, P}, {receive_state, Merged}) || P <- Members, P /= node()], ok. - -leave(_Args) when is_list(_Args) -> + +leave(Args) when is_list(Args) -> {ok, Local} = plumtree_peer_service_manager:get_local_state(), {ok, Actor} = plumtree_peer_service_manager:get_actor(), {ok, Leave} = riak_dt_orswot:update({remove, node()}, Actor, Local), @@ -82,21 +83,26 @@ leave(_Args) when is_list(_Args) -> [] -> %% leaving the cluster shuts down the node plumtree_peer_service_manager:delete_state(), - stop("Leaving cluster"); + stop("Leaving cluster", Args); _ -> - leave([]) + leave(Args) end; {error, singleton} -> lager:warning("Cannot leave, not a member of a cluster.") end; -leave(_Args) -> - leave([]). +leave(Args) -> + leave(Args). stop() -> - stop("received stop request"). + stop("received stop request", []). stop(Reason) -> + stop(Reason, []). + +stop(Reason, Args) -> + StopFun = proplists:get_value(stop_fun, Args, fun() -> ok end), lager:notice("~p", [Reason]), + StopFun(), ok. random_peer(Leave) -> diff --git a/test/cluster_membership_SUITE.erl b/test/cluster_membership_SUITE.erl index ead5a52..9c9015e 100644 --- a/test/cluster_membership_SUITE.erl +++ b/test/cluster_membership_SUITE.erl @@ -119,7 +119,8 @@ leave_test(Config) -> [?assertEqual({Node, Expected}, {Node, lists:sort(plumtree_test_utils:get_cluster_members(Node))}) || Node <- Nodes], - ?assertEqual(ok, rpc:call(Node1, plumtree_peer_service, leave, [[]])), + ?assertEqual(ok, rpc:call(Node1, plumtree_peer_service, leave, + [[{stop_fun, fun init:stop/0}]])), Expected2 = lists:sort(OtherNodes), ok = plumtree_test_utils:wait_until_left(OtherNodes, Node1), %% should be a 3 node cluster now @@ -140,7 +141,8 @@ leave_rejoin_test(Config) -> [?assertEqual({Node, Expected}, {Node, lists:sort(plumtree_test_utils:get_cluster_members(Node))}) || Node <- Nodes], - ?assertEqual(ok, rpc:call(Node1, plumtree_peer_service, leave, [[]])), + ?assertEqual(ok, rpc:call(Node1, plumtree_peer_service, leave, + [[{stop_fun, fun init:stop/0}]])), Expected2 = lists:sort(OtherNodes), ok = plumtree_test_utils:wait_until_left(OtherNodes, Node1), %% should be a 3 node cluster now @@ -153,7 +155,7 @@ leave_rejoin_test(Config) -> %% rejoin cluster ?assertEqual(ok, rpc:call(Node1, plumtree_peer_service, join, [Node2])), ok = plumtree_test_utils:wait_until_joined(Nodes, Expected), - [?assertEqual({Node, Expected}, {Node, + [?assertEqual({Node, Expected}, {Node, lists:sort(plumtree_test_utils:get_cluster_members(Node))}) || Node <- Nodes], ok. @@ -179,7 +181,8 @@ sticky_membership_test(Config) -> ct_slave:stop(jaguar), ok = plumtree_test_utils:wait_until_offline(Node1), [Node2|LastTwo] = OtherNodes, - ?assertEqual(ok, rpc:call(Node2, plumtree_peer_service, leave, [[]])), + ?assertEqual(ok, rpc:call(Node2, plumtree_peer_service, leave, + [[{stop_fun, fun init:stop/0}]])), ok = plumtree_test_utils:wait_until_left(LastTwo, Node2), ok = plumtree_test_utils:wait_until_offline(Node2), Expected2 = lists:sort(Nodes -- [Node2]), diff --git a/test/metadata_SUITE.erl b/test/metadata_SUITE.erl index 7b30517..9abcbe6 100644 --- a/test/metadata_SUITE.erl +++ b/test/metadata_SUITE.erl @@ -31,6 +31,7 @@ -export([ read_write_delete_test/1, + manual_force_cleanup_test/1, partitioned_cluster_test/1, siblings_test/1 ]). @@ -72,7 +73,8 @@ end_per_testcase(_, _Config) -> ok. all() -> - [read_write_delete_test, partitioned_cluster_test, siblings_test]. + [read_write_delete_test, manual_force_cleanup_test, + partitioned_cluster_test, siblings_test]. read_write_delete_test(Config) -> [Node1|OtherNodes] = Nodes = proplists:get_value(nodes, Config), @@ -93,6 +95,30 @@ read_write_delete_test(Config) -> ok = wait_until_converged(Nodes, {foo, bar}, baz, undefined), ok. +manual_force_cleanup_test(Config) -> + ok = read_write_delete_test(Config), + Nodes = proplists:get_value(nodes, Config), + {Res1, _} = rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]), + + %% every node still has one tombstone entry in the ets cache + ?assertEqual(length(Nodes), lists:sum(Res1)), + + lists:foreach(fun(Node) -> + %% this blocks for 5 seconds, during this time + %% other nodes will discover the discrepancy in the + %% hashtree and will try to replicate the proper + %% tombstone... which we don't merge in read_write_merge. + %% eventually all tombstones are removed. + rpc:call(Node, plumtree_metadata, cleanup_all, [5]) + end, Nodes), + {Res2, _} = rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]), + + ?assertEqual(0, lists:sum(Res2)), + ok. + + + + partitioned_cluster_test(Config) -> [Node1|OtherNodes] = Nodes = proplists:get_value(nodes, Config), [?assertEqual(ok, rpc:call(Node, plumtree_peer_service, join, [Node1])) From 51bfe37669db36056558eceaf23976310dec7226 Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Wed, 22 Jul 2015 16:47:45 +0200 Subject: [PATCH 03/11] bugfix and simple refactorings and utility functions --- src/plumtree_leveldb_metadata_manager.erl | 2 +- src/plumtree_metadata_cleanup.erl | 83 ++++++++++++++++++++--- 2 files changed, 76 insertions(+), 9 deletions(-) diff --git a/src/plumtree_leveldb_metadata_manager.erl b/src/plumtree_leveldb_metadata_manager.erl index 39b2f1d..f242ba5 100644 --- a/src/plumtree_leveldb_metadata_manager.erl +++ b/src/plumtree_leveldb_metadata_manager.erl @@ -31,7 +31,7 @@ store(FullPrefix, Objs, State) -> delete(FullPrefix, Key, State) -> #state{tab_refs=Refs} = NewState = maybe_init_lvldb(FullPrefix, State), {_, TabRef} = lists:keyfind(FullPrefix, 1, Refs), - ok = lvldb_delete(TabRef, Key), + ok = lvldb_delete(TabRef, term_to_binary(Key)), {ok, NewState}. objs_to_updates([{Key, Val}|Rest], Acc) -> diff --git a/src/plumtree_metadata_cleanup.erl b/src/plumtree_metadata_cleanup.erl index 7bcd93d..1d932bf 100644 --- a/src/plumtree_metadata_cleanup.erl +++ b/src/plumtree_metadata_cleanup.erl @@ -5,7 +5,11 @@ %% API functions -export([start_link/1, force_cleanup/1, - force_cleanup/2]). + force_cleanup/2, + set_interval/1, + set_interval/2, + cancel_cleanup/0, + cancel_cleanup/1]). %% gen_server callbacks -export([init/1, @@ -27,19 +31,45 @@ start_link(FullPrefix) -> gen_server:start_link(?MODULE, [FullPrefix], []). force_cleanup(AgeInSecs) when AgeInSecs > 0 -> + for_all_cleaners(fun force_cleanup/2, [AgeInSecs]). + +force_cleanup(FullPrefix, AgeInSecs) when is_tuple(FullPrefix) -> + for_cleaner(FullPrefix, fun force_cleanup/2, [AgeInSecs]); +force_cleanup(Pid, AgeInSecs) when is_pid(Pid) and (AgeInSecs > 0) -> + gen_server:call(Pid, {force_cleanup, AgeInSecs}, infinity). + +set_interval(Int) when Int >= 0 -> + for_all_cleaners(fun set_interval/2, [Int]). + +set_interval(FullPrefix, Int) when is_tuple(FullPrefix) and (Int >= 0) -> + for_cleaner(FullPrefix, fun set_interval/2, [Int]); +set_interval(Pid, Int) when is_pid(Pid) and (Int >= 0) -> + gen_server:call(Pid, {set_interval, Int}, infinity). + +cancel_cleanup() -> + for_all_cleaners(fun cancel_cleanup/1, []). + +cancel_cleanup(FullPrefix) when is_tuple(FullPrefix) -> + for_cleaner(FullPrefix, fun cancel_cleanup/1, []); +cancel_cleanup(Pid) when is_pid(Pid) -> + gen_server:call(Pid, cancel_cleanup, infinity). + + +for_all_cleaners(Fun, Args) -> lists:foreach(fun({_FullPrefix, Pid}) -> - force_cleanup(Pid, AgeInSecs) + apply(Fun, [Pid|Args]) end, plumtree_metadata_cleanup_sup:get_full_prefix_and_pid()). -force_cleanup(FullPrefix, AgeInSecs) when is_tuple(FullPrefix) -> +for_cleaner(FullPrefix, Fun, Args) -> case plumtree_metadata_cleanup_sup:get_pid(FullPrefix) of {ok, Pid} -> - force_cleanup(Pid, AgeInSecs); + apply(Fun, [Pid|Args]); E -> E - end; -force_cleanup(Pid, AgeInSecs) when is_pid(Pid) and (AgeInSecs > 0) -> - gen_server:call(Pid, {force_cleanup, AgeInSecs}, infinity). + end. + + + %%%=================================================================== %%% gen_server callbacks @@ -103,9 +133,46 @@ handle_call({force_cleanup, AgeInSecs}, From, #state{tref=TRef, waiting=From}}; _ -> {reply, {error, already_scheduled}, State} + end; +handle_call({set_interval, IntInSecs}, _From, #state{waiting=Waiting, + tref=TRef} = State) -> + case Waiting of + undefined -> + {NewInterval, NewTRef} = + case {IntInSecs, TRef} of + {0, undefined} -> + {undefined, undefined}; + {0, _} -> + erlang:cancel_timer(TRef), + {undefined, undefined}; + {_, undefined} -> + NewIntInSecs = IntInSecs * 1000, + {NewIntInSecs, erlang:send_after(NewIntInSecs, self(), cleanup)}; + {_, _} -> + erlang:cancel_timer(TRef), + NewIntInSecs = IntInSecs * 1000, + {NewIntInSecs, erlang:send_after(NewIntInSecs, self(), cleanup)} + end, + {reply, ok, State#state{interval=NewInterval, tref=NewTRef}}; + _ -> + {reply, {error, waiting_for_cleanup}, State} + end; +handle_call(cancel_cleanup, _From, #state{waiting=Waiting, + interval=Interval} = State) -> + case Waiting of + undefined -> + {reply, {error, no_waiting_proc}, State}; + _ -> + gen_server:reply(Waiting, canceled), + case Interval of + undefined -> + {reply, ok, State#state{waiting=undefined}}; + _ -> + NewTRef = erlang:send_after(Interval, self(), cleanup), + {reply, ok, State#state{waiting=undefined, tref=NewTRef}} + end end. - %%-------------------------------------------------------------------- %% @private %% @doc From 8b9fb125843a28d25fc0ab01d86c5c9e9f39e2d0 Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Sun, 19 Jul 2015 13:41:22 +0200 Subject: [PATCH 04/11] first version of proper tombstone cleanup --- src/hashtree_tree.erl | 15 +++ src/plumtree_metadata_cleanup.erl | 148 ++++++++++++++++++++++++++ src/plumtree_metadata_cleanup_sup.erl | 57 ++++++++++ src/plumtree_metadata_hashtree.erl | 10 ++ src/plumtree_metadata_manager.erl | 23 ++++ src/plumtree_sup.erl | 1 + 6 files changed, 254 insertions(+) create mode 100644 src/plumtree_metadata_cleanup.erl create mode 100644 src/plumtree_metadata_cleanup_sup.erl diff --git a/src/hashtree_tree.erl b/src/hashtree_tree.erl index 4bbb7b4..b6efc7b 100644 --- a/src/hashtree_tree.erl +++ b/src/hashtree_tree.erl @@ -108,6 +108,7 @@ destroy/1, insert/4, insert/5, + delete/3, update_snapshot/1, update_perform/1, local_compare/2, @@ -229,6 +230,20 @@ insert(Prefixes, Key, Hash, Opts, Tree) -> {error, bad_prefixes} end. +-spec delete(prefixes(), binary(), tree()) -> tree() | {error, term()}. +delete(Prefixes, Key, Tree=#hashtree_tree{dirty=Dirty}) -> + NodeName = prefixes_to_node_name(Prefixes), + case valid_prefixes(NodeName, Tree) of + true -> + Node = get_node(NodeName, Tree), + Node2 = hashtree:delete(Key, Node), + Dirty2 = gb_sets:add_element(NodeName, Dirty), + _ = set_node(NodeName, Node2, Tree), + Tree#hashtree_tree{dirty=Dirty2}; + false -> + {error, bad_prefixes} + end. + %% @doc Snapshot the tree for updating. The return tree should be %% updated using {@link update_perform/1} and to perform future operations %% on the tree diff --git a/src/plumtree_metadata_cleanup.erl b/src/plumtree_metadata_cleanup.erl new file mode 100644 index 0000000..a2e64c8 --- /dev/null +++ b/src/plumtree_metadata_cleanup.erl @@ -0,0 +1,148 @@ +-module(plumtree_metadata_cleanup). + +-behaviour(gen_server). + +%% API functions +-export([start_link/1]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-record(state, {full_prefix, interval, deleted}). + +-define(TOMBSTONE, '$deleted'). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link(FullPrefix) -> + gen_server:start_link(?MODULE, [FullPrefix], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +init([FullPrefix]) -> + {_, Deleted} = plumtree_metadata:fold(fun cleanup_tombstones/2, + {FullPrefix, gb_sets:new()}, + FullPrefix, [{resolver, lww}]), + CleanupInterval = app_helper:get_prop_or_env(cleanup_interval, [], + plumtree, 10000), + erlang:send_after(CleanupInterval, self(), cleanup), + {ok, #state{full_prefix=FullPrefix, + interval=CleanupInterval, + deleted=Deleted}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @spec handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @spec handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +handle_info(cleanup, #state{full_prefix=FullPrefix, + interval=Interval, + deleted=Deleted} = State) -> + {_, NewDeleted} = plumtree_metadata:fold(fun cleanup_tombstones/2, + {FullPrefix, Deleted}, + FullPrefix, [{resolver, lww}]), + erlang:send_after(Interval, self(), cleanup), + {noreply, State#state{deleted=NewDeleted}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +cleanup_tombstones({Key, ?TOMBSTONE}, {FullPrefix, Set}) -> + case gb_sets:is_element(Key, Set) of + true -> + %% delete + plumtree_metadata_manager:force_delete({FullPrefix, Key}), + {FullPrefix, gb_sets:delete(Key, Set)}; + false -> + {FullPrefix, gb_sets:add(Key, Set)} + end; +cleanup_tombstones({Key, _}, {FullPrefix, Set}) -> + {FullPrefix, gb_sets:delete_any(Key, Set)}. diff --git a/src/plumtree_metadata_cleanup_sup.erl b/src/plumtree_metadata_cleanup_sup.erl new file mode 100644 index 0000000..bfd15c6 --- /dev/null +++ b/src/plumtree_metadata_cleanup_sup.erl @@ -0,0 +1,57 @@ +-module(plumtree_metadata_cleanup_sup). + +-behaviour(supervisor). + +%% API functions +-export([start_link/0, + add_full_prefix/1]). + +%% Supervisor callbacks +-export([init/1]). + +-define(CHILD(Id, Mod, Type, Args), {Id, {Mod, start_link, Args}, + permanent, 5000, Type, [Mod]}). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the supervisor +%% +%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +add_full_prefix(FullPrefix) -> + supervisor:start_child(?MODULE, + ?CHILD({plumtree_metadata_cleanup, FullPrefix}, + plumtree_metadata_cleanup, worker, [FullPrefix])). + + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart frequency and child +%% specifications. +%% +%% @spec init(Args) -> {ok, {SupFlags, [ChildSpec]}} | +%% ignore | +%% {error, Reason} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, {{one_for_one, 5, 10}, []}}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/src/plumtree_metadata_hashtree.erl b/src/plumtree_metadata_hashtree.erl index 3688f23..4da77b2 100644 --- a/src/plumtree_metadata_hashtree.erl +++ b/src/plumtree_metadata_hashtree.erl @@ -34,6 +34,7 @@ lock/2, update/0, update/1, + delete/1, compare/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -163,6 +164,11 @@ update(Node) -> compare(RemoteFun, HandlerFun, HandlerAcc) -> gen_server:call(?SERVER, {compare, RemoteFun, HandlerFun, HandlerAcc}, infinity). + +-spec delete(metadata_pkey()) -> ok. +delete(PKey) -> + gen_server:call(?SERVER, {delete, PKey}, infinity). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -198,6 +204,10 @@ handle_call({prefix_hash, Prefix}, _From, State=#state{tree=Tree}) -> handle_call({insert, PKey, Hash, IfMissing}, _From, State=#state{tree=Tree}) -> {Prefixes, Key} = prepare_pkey(PKey), Tree1 = hashtree_tree:insert(Prefixes, Key, Hash, [{if_missing, IfMissing}], Tree), + {reply, ok, State#state{tree=Tree1}}; +handle_call({delete, PKey}, _From, State=#state{tree=Tree}) -> + {Prefixes, Key} = prepare_pkey(PKey), + Tree1 = hashtree_tree:delete(Prefixes, Key, Tree), {reply, ok, State#state{tree=Tree1}}. handle_cast(_Msg, State) -> diff --git a/src/plumtree_metadata_manager.erl b/src/plumtree_metadata_manager.erl index 707fa3f..c52c4e1 100644 --- a/src/plumtree_metadata_manager.erl +++ b/src/plumtree_metadata_manager.erl @@ -46,6 +46,9 @@ graft/1, exchange/1]). +%% used by cleanup +-export([force_delete/1]). + %% utilities -export([size/1, subscribe/1, @@ -215,6 +218,13 @@ put({{Prefix, SubPrefix}, _Key}=PKey, Context, ValueOrFun) (is_binary(SubPrefix) orelse is_atom(SubPrefix)) -> read_modify_write(PKey, Context, ValueOrFun). +%% @doc forcefully deletes the key +-spec force_delete(metadata_pkey()) -> ok. +force_delete({{Prefix, SubPrefix}, _Key}=PKey) + when (is_binary(Prefix) orelse is_atom(Prefix)) andalso + (is_binary(SubPrefix) orelse is_atom(SubPrefix)) -> + gen_server:call(?SERVER, {force_delete, PKey}, infinity). + %% @doc same as merge/2 but merges the object on `Node' -spec merge(node(), {metadata_pkey(), undefined | metadata_context()}, metadata_object()) -> boolean(). merge(Node, {PKey, _Context}, Obj) -> @@ -316,6 +326,9 @@ handle_call({merge, PKey, Obj}, _From, State) -> handle_call({get, PKey}, _From, State) -> Result = read(PKey), {reply, Result, State}; +handle_call({force_delete, PKey}, _From, State) -> + {Result, NewState} = force_delete(PKey, State), + {reply, Result, NewState}; handle_call({open_remote_iterator, Pid, FullPrefix, KeyMatch}, _From, State) -> Iterator = new_remote_iterator(Pid, FullPrefix, KeyMatch), {reply, Iterator, State}; @@ -449,6 +462,7 @@ read_modify_write(PKey, Context, ValueOrFun) -> read_merge_write(PKey, Obj) -> Existing = read(PKey), +<<<<<<< HEAD case plumtree_metadata_object:reconcile(Obj, Existing) of false -> false; {true, Reconciled} -> @@ -479,6 +493,15 @@ store({FullPrefix, Key}=PKey, Metadata) -> ets:lookup(?SUBS, FullPrefix)), Metadata. +force_delete({FullPrefix, Key}=PKey, + #state{storage_mod=Mod, + storage_mod_state=ModSt} = State) -> + Tab = ets_tab(FullPrefix), + ets:delete(Tab, Key), + {ok, NewModSt} = Mod:delete(FullPrefix, Key, ModSt), + ok = plumtree_metadata_hashtree:delete(PKey), + {ok, State#state{storage_mod_state=NewModSt}}. + trigger_subscription_event(FullPrefix, Event, [{FullPrefix, {Pid, _}}|Rest]) -> Pid ! Event, trigger_subscription_event(FullPrefix, Event, Rest); diff --git a/src/plumtree_sup.erl b/src/plumtree_sup.erl index 17b57a0..8c38c9a 100644 --- a/src/plumtree_sup.erl +++ b/src/plumtree_sup.erl @@ -37,6 +37,7 @@ init([]) -> [ ?CHILD(plumtree_metadata_leveldb_iterator_sup, supervisor), ?CHILD(plumtree_metadata_leveldb_instance_sup, supervisor), + ?CHILD(plumtree_metadata_cleanup_sup, supervisor), ?CHILD(plumtree_peer_service_gossip, worker), ?CHILD(plumtree_peer_service_events, worker), ?CHILD(plumtree_broadcast, worker), From 823e3e70b2331173fc67bfdf2d56963ba6d9bb3c Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Tue, 21 Jul 2015 22:09:07 +0200 Subject: [PATCH 05/11] refactoring and some cleanup, cleanup test case --- src/plumtree_metadata.erl | 14 +++- src/plumtree_metadata_cleanup.erl | 116 +++++++++++++++++++------- src/plumtree_metadata_cleanup_sup.erl | 15 ++++ src/plumtree_peer_service.erl | 26 +++--- test/metadata_SUITE.erl | 28 ++++++- 5 files changed, 156 insertions(+), 43 deletions(-) diff --git a/src/plumtree_metadata.erl b/src/plumtree_metadata.erl index d968cf8..741ca38 100644 --- a/src/plumtree_metadata.erl +++ b/src/plumtree_metadata.erl @@ -39,7 +39,9 @@ put/3, put/4, delete/2, - delete/3]). + delete/3, + cleanup/2, + cleanup_all/1]). -include("plumtree_metadata.hrl"). @@ -332,6 +334,16 @@ delete(FullPrefix, Key) -> delete(FullPrefix, Key, _Opts) -> put(FullPrefix, Key, ?TOMBSTONE, []). + +-spec cleanup(metadata_prefix(), pos_integer()) -> ok | {error, any()}. +cleanup(FullPrefix, AgeInSecs) -> + plumtree_metadata_cleanup:force_cleanup(FullPrefix, AgeInSecs). + +-spec cleanup_all(metadata_prefix()) -> ok | {error, any()}. +cleanup_all(AgeInSecs) -> + plumtree_metadata_cleanup:force_cleanup(AgeInSecs). + + %%%=================================================================== %%% Internal functions %%%=================================================================== diff --git a/src/plumtree_metadata_cleanup.erl b/src/plumtree_metadata_cleanup.erl index a2e64c8..7bcd93d 100644 --- a/src/plumtree_metadata_cleanup.erl +++ b/src/plumtree_metadata_cleanup.erl @@ -3,7 +3,9 @@ -behaviour(gen_server). %% API functions --export([start_link/1]). +-export([start_link/1, + force_cleanup/1, + force_cleanup/2]). %% gen_server callbacks -export([init/1, @@ -13,7 +15,7 @@ terminate/2, code_change/3]). --record(state, {full_prefix, interval, deleted}). +-record(state, {interval, deleted, tref, waiting}). -define(TOMBSTONE, '$deleted'). @@ -21,16 +23,24 @@ %%% API functions %%%=================================================================== -%%-------------------------------------------------------------------- -%% @doc -%% Starts the server -%% -%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} -%% @end -%%-------------------------------------------------------------------- start_link(FullPrefix) -> gen_server:start_link(?MODULE, [FullPrefix], []). +force_cleanup(AgeInSecs) when AgeInSecs > 0 -> + lists:foreach(fun({_FullPrefix, Pid}) -> + force_cleanup(Pid, AgeInSecs) + end, plumtree_metadata_cleanup_sup:get_full_prefix_and_pid()). + +force_cleanup(FullPrefix, AgeInSecs) when is_tuple(FullPrefix) -> + case plumtree_metadata_cleanup_sup:get_pid(FullPrefix) of + {ok, Pid} -> + force_cleanup(Pid, AgeInSecs); + E -> + E + end; +force_cleanup(Pid, AgeInSecs) when is_pid(Pid) and (AgeInSecs > 0) -> + gen_server:call(Pid, {force_cleanup, AgeInSecs}, infinity). + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -47,15 +57,19 @@ start_link(FullPrefix) -> %% @end %%-------------------------------------------------------------------- init([FullPrefix]) -> - {_, Deleted} = plumtree_metadata:fold(fun cleanup_tombstones/2, - {FullPrefix, gb_sets:new()}, - FullPrefix, [{resolver, lww}]), CleanupInterval = app_helper:get_prop_or_env(cleanup_interval, [], - plumtree, 10000), - erlang:send_after(CleanupInterval, self(), cleanup), - {ok, #state{full_prefix=FullPrefix, - interval=CleanupInterval, - deleted=Deleted}}. + plumtree, undefined), + case CleanupInterval of + undefined -> + %% no cleanup happens + {ok, #state{interval=undefined, + deleted={FullPrefix, gb_sets:new()}}}; + _ when CleanupInterval > 0 -> + TRef = erlang:send_after(0, self(), cleanup), + {ok, #state{interval=CleanupInterval * 1000, + tref=TRef, + deleted={FullPrefix, gb_sets:new()}}} + end. %%-------------------------------------------------------------------- %% @private @@ -71,9 +85,26 @@ init([FullPrefix]) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_call(_Request, _From, State) -> - Reply = ok, - {reply, Reply, State}. +handle_call({force_cleanup, AgeInSecs}, From, #state{tref=TRef, + waiting=Waiting, + deleted=Deleted} = State) -> + case Waiting of + undefined -> + case TRef of + undefined -> + ignore; + _ -> + erlang:cancel_timer(TRef) + end, + NewDeleted = cleanup_tombstones(Deleted), + NewTRef = erlang:send_after(AgeInSecs * 1000, self(), cleanup), + {noreply, State#state{deleted=NewDeleted, + tref=NewTRef, + waiting=From}}; + _ -> + {reply, {error, already_scheduled}, State} + end. + %%-------------------------------------------------------------------- %% @private @@ -98,14 +129,25 @@ handle_cast(_Msg, State) -> %% {stop, Reason, State} %% @end %%-------------------------------------------------------------------- -handle_info(cleanup, #state{full_prefix=FullPrefix, +handle_info(cleanup, #state{waiting=Waiting, interval=Interval, deleted=Deleted} = State) -> - {_, NewDeleted} = plumtree_metadata:fold(fun cleanup_tombstones/2, - {FullPrefix, Deleted}, - FullPrefix, [{resolver, lww}]), - erlang:send_after(Interval, self(), cleanup), - {noreply, State#state{deleted=NewDeleted}}. + NewDeleted = cleanup_tombstones(Deleted), + case Waiting of + undefined -> + ignore; + _ -> + gen_server:reply(Waiting, ok) + end, + NewState = + case Interval of + undefined -> + State#state{waiting=undefined, deleted=NewDeleted}; + _ -> + NewTRef = erlang:send_after(Interval, self(), cleanup), + State#state{waiting=undefined, deleted=NewDeleted, tref=NewTRef} + end, + {noreply, NewState}. %%-------------------------------------------------------------------- %% @private @@ -135,14 +177,26 @@ code_change(_OldVsn, State, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -cleanup_tombstones({Key, ?TOMBSTONE}, {FullPrefix, Set}) -> +cleanup_tombstones({FullPrefix, Set}) -> + T1 = os:timestamp(), + {_, Marked, Deleted, Total, NewSet} = plumtree_metadata:fold( + fun cleanup_tombstones/2, + {FullPrefix, 0, 0, 0, Set}, + FullPrefix, [{resolver, lww}]), + T2 = os:timestamp(), + DiffInMs = timer:now_diff(T2, T1) div 1000, + lager:info("completed cleanup for ~p in ~pms. deleted ~p, marked ~p, good ~p", + [FullPrefix, DiffInMs, Deleted, Marked, Total]), + {FullPrefix, NewSet}. + +cleanup_tombstones({Key, ?TOMBSTONE}, {FullPrefix, Marked, Deleted, Total, Set}) -> case gb_sets:is_element(Key, Set) of true -> %% delete plumtree_metadata_manager:force_delete({FullPrefix, Key}), - {FullPrefix, gb_sets:delete(Key, Set)}; + {FullPrefix, Marked, Deleted + 1, Total, gb_sets:delete(Key, Set)}; false -> - {FullPrefix, gb_sets:add(Key, Set)} + {FullPrefix, Marked + 1, Deleted, Total, gb_sets:add(Key, Set)} end; -cleanup_tombstones({Key, _}, {FullPrefix, Set}) -> - {FullPrefix, gb_sets:delete_any(Key, Set)}. +cleanup_tombstones({Key, _}, {FullPrefix, Marked, Deleted, Total, Set}) -> + {FullPrefix, Marked, Deleted, Total + 1, gb_sets:delete_any(Key, Set)}. diff --git a/src/plumtree_metadata_cleanup_sup.erl b/src/plumtree_metadata_cleanup_sup.erl index bfd15c6..a8424fa 100644 --- a/src/plumtree_metadata_cleanup_sup.erl +++ b/src/plumtree_metadata_cleanup_sup.erl @@ -4,6 +4,8 @@ %% API functions -export([start_link/0, + get_pid/1, + get_full_prefix_and_pid/0, add_full_prefix/1]). %% Supervisor callbacks @@ -31,6 +33,19 @@ add_full_prefix(FullPrefix) -> ?CHILD({plumtree_metadata_cleanup, FullPrefix}, plumtree_metadata_cleanup, worker, [FullPrefix])). +get_pid(FullPrefix) -> + case lists:keyfind({plumtree_metadata_cleanup, FullPrefix}, 1, + supervisor:which_children(?MODULE)) of + {_, Pid, _, _} when is_pid(Pid) -> + {ok, Pid}; + _ -> + {error, not_found} + end. + +get_full_prefix_and_pid() -> + [{FullPrefix, Pid} || {{plumtree_metadata_cleanup, FullPrefix}, Pid, _, _} + <- supervisor:which_children(?MODULE), is_pid(Pid)]. + %%%=================================================================== %%% Supervisor callbacks diff --git a/src/plumtree_peer_service.erl b/src/plumtree_peer_service.erl index 06745e5..9805cf9 100644 --- a/src/plumtree_peer_service.erl +++ b/src/plumtree_peer_service.erl @@ -27,7 +27,8 @@ attempt_join/2, leave/1, stop/0, - stop/1 + stop/1, + stop/2 ]). %% @doc prepare node to join a cluster @@ -40,7 +41,7 @@ join(NodeStr, Auto) when is_list(NodeStr) -> join(Node, Auto) when is_atom(Node) -> join(node(), Node, Auto). -%% @doc Initiate join. Nodes cannot join themselves. +%% @doc Initiate join. Nodes cannot join themselves. join(Node, Node, _) -> {error, self_join}; join(_, Node, _Auto) -> @@ -59,15 +60,15 @@ attempt_join(Node) -> attempt_join(Node, Local) -> {ok, Remote} = gen_server:call({plumtree_peer_service_gossip, Node}, send_state), - Merged = riak_dt_orswot:merge(Remote, Local), + Merged = riak_dt_orswot:merge(Remote, Local), _ = plumtree_peer_service_manager:update_state(Merged), %% broadcast to all nodes %% get peer list Members = riak_dt_orswot:value(Merged), _ = [gen_server:cast({plumtree_peer_service_gossip, P}, {receive_state, Merged}) || P <- Members, P /= node()], ok. - -leave(_Args) when is_list(_Args) -> + +leave(Args) when is_list(Args) -> {ok, Local} = plumtree_peer_service_manager:get_local_state(), {ok, Actor} = plumtree_peer_service_manager:get_actor(), {ok, Leave} = riak_dt_orswot:update({remove, node()}, Actor, Local), @@ -82,21 +83,26 @@ leave(_Args) when is_list(_Args) -> [] -> %% leaving the cluster shuts down the node plumtree_peer_service_manager:delete_state(), - stop("Leaving cluster"); + stop("Leaving cluster", Args); _ -> - leave([]) + leave(Args) end; {error, singleton} -> lager:warning("Cannot leave, not a member of a cluster.") end; -leave(_Args) -> - leave([]). +leave(Args) -> + leave(Args). stop() -> - stop("received stop request"). + stop("received stop request", []). stop(Reason) -> + stop(Reason, []). + +stop(Reason, Args) -> + StopFun = proplists:get_value(stop_fun, Args, fun() -> ok end), lager:notice("~p", [Reason]), + StopFun(), ok. random_peer(Leave) -> diff --git a/test/metadata_SUITE.erl b/test/metadata_SUITE.erl index 7b30517..9abcbe6 100644 --- a/test/metadata_SUITE.erl +++ b/test/metadata_SUITE.erl @@ -31,6 +31,7 @@ -export([ read_write_delete_test/1, + manual_force_cleanup_test/1, partitioned_cluster_test/1, siblings_test/1 ]). @@ -72,7 +73,8 @@ end_per_testcase(_, _Config) -> ok. all() -> - [read_write_delete_test, partitioned_cluster_test, siblings_test]. + [read_write_delete_test, manual_force_cleanup_test, + partitioned_cluster_test, siblings_test]. read_write_delete_test(Config) -> [Node1|OtherNodes] = Nodes = proplists:get_value(nodes, Config), @@ -93,6 +95,30 @@ read_write_delete_test(Config) -> ok = wait_until_converged(Nodes, {foo, bar}, baz, undefined), ok. +manual_force_cleanup_test(Config) -> + ok = read_write_delete_test(Config), + Nodes = proplists:get_value(nodes, Config), + {Res1, _} = rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]), + + %% every node still has one tombstone entry in the ets cache + ?assertEqual(length(Nodes), lists:sum(Res1)), + + lists:foreach(fun(Node) -> + %% this blocks for 5 seconds, during this time + %% other nodes will discover the discrepancy in the + %% hashtree and will try to replicate the proper + %% tombstone... which we don't merge in read_write_merge. + %% eventually all tombstones are removed. + rpc:call(Node, plumtree_metadata, cleanup_all, [5]) + end, Nodes), + {Res2, _} = rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]), + + ?assertEqual(0, lists:sum(Res2)), + ok. + + + + partitioned_cluster_test(Config) -> [Node1|OtherNodes] = Nodes = proplists:get_value(nodes, Config), [?assertEqual(ok, rpc:call(Node, plumtree_peer_service, join, [Node1])) From 8ec68c4d757274039209b76b0b4fa829ae164b02 Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Wed, 22 Jul 2015 16:47:45 +0200 Subject: [PATCH 06/11] bugfix and simple refactorings and utility functions --- src/plumtree_metadata_cleanup.erl | 83 ++++++++++++++++++++++++++++--- src/plumtree_metadata_manager.erl | 44 +++++++++------- test/metadata_SUITE.erl | 1 + 3 files changed, 101 insertions(+), 27 deletions(-) diff --git a/src/plumtree_metadata_cleanup.erl b/src/plumtree_metadata_cleanup.erl index 7bcd93d..1d932bf 100644 --- a/src/plumtree_metadata_cleanup.erl +++ b/src/plumtree_metadata_cleanup.erl @@ -5,7 +5,11 @@ %% API functions -export([start_link/1, force_cleanup/1, - force_cleanup/2]). + force_cleanup/2, + set_interval/1, + set_interval/2, + cancel_cleanup/0, + cancel_cleanup/1]). %% gen_server callbacks -export([init/1, @@ -27,19 +31,45 @@ start_link(FullPrefix) -> gen_server:start_link(?MODULE, [FullPrefix], []). force_cleanup(AgeInSecs) when AgeInSecs > 0 -> + for_all_cleaners(fun force_cleanup/2, [AgeInSecs]). + +force_cleanup(FullPrefix, AgeInSecs) when is_tuple(FullPrefix) -> + for_cleaner(FullPrefix, fun force_cleanup/2, [AgeInSecs]); +force_cleanup(Pid, AgeInSecs) when is_pid(Pid) and (AgeInSecs > 0) -> + gen_server:call(Pid, {force_cleanup, AgeInSecs}, infinity). + +set_interval(Int) when Int >= 0 -> + for_all_cleaners(fun set_interval/2, [Int]). + +set_interval(FullPrefix, Int) when is_tuple(FullPrefix) and (Int >= 0) -> + for_cleaner(FullPrefix, fun set_interval/2, [Int]); +set_interval(Pid, Int) when is_pid(Pid) and (Int >= 0) -> + gen_server:call(Pid, {set_interval, Int}, infinity). + +cancel_cleanup() -> + for_all_cleaners(fun cancel_cleanup/1, []). + +cancel_cleanup(FullPrefix) when is_tuple(FullPrefix) -> + for_cleaner(FullPrefix, fun cancel_cleanup/1, []); +cancel_cleanup(Pid) when is_pid(Pid) -> + gen_server:call(Pid, cancel_cleanup, infinity). + + +for_all_cleaners(Fun, Args) -> lists:foreach(fun({_FullPrefix, Pid}) -> - force_cleanup(Pid, AgeInSecs) + apply(Fun, [Pid|Args]) end, plumtree_metadata_cleanup_sup:get_full_prefix_and_pid()). -force_cleanup(FullPrefix, AgeInSecs) when is_tuple(FullPrefix) -> +for_cleaner(FullPrefix, Fun, Args) -> case plumtree_metadata_cleanup_sup:get_pid(FullPrefix) of {ok, Pid} -> - force_cleanup(Pid, AgeInSecs); + apply(Fun, [Pid|Args]); E -> E - end; -force_cleanup(Pid, AgeInSecs) when is_pid(Pid) and (AgeInSecs > 0) -> - gen_server:call(Pid, {force_cleanup, AgeInSecs}, infinity). + end. + + + %%%=================================================================== %%% gen_server callbacks @@ -103,9 +133,46 @@ handle_call({force_cleanup, AgeInSecs}, From, #state{tref=TRef, waiting=From}}; _ -> {reply, {error, already_scheduled}, State} + end; +handle_call({set_interval, IntInSecs}, _From, #state{waiting=Waiting, + tref=TRef} = State) -> + case Waiting of + undefined -> + {NewInterval, NewTRef} = + case {IntInSecs, TRef} of + {0, undefined} -> + {undefined, undefined}; + {0, _} -> + erlang:cancel_timer(TRef), + {undefined, undefined}; + {_, undefined} -> + NewIntInSecs = IntInSecs * 1000, + {NewIntInSecs, erlang:send_after(NewIntInSecs, self(), cleanup)}; + {_, _} -> + erlang:cancel_timer(TRef), + NewIntInSecs = IntInSecs * 1000, + {NewIntInSecs, erlang:send_after(NewIntInSecs, self(), cleanup)} + end, + {reply, ok, State#state{interval=NewInterval, tref=NewTRef}}; + _ -> + {reply, {error, waiting_for_cleanup}, State} + end; +handle_call(cancel_cleanup, _From, #state{waiting=Waiting, + interval=Interval} = State) -> + case Waiting of + undefined -> + {reply, {error, no_waiting_proc}, State}; + _ -> + gen_server:reply(Waiting, canceled), + case Interval of + undefined -> + {reply, ok, State#state{waiting=undefined}}; + _ -> + NewTRef = erlang:send_after(Interval, self(), cleanup), + {reply, ok, State#state{waiting=undefined, tref=NewTRef}} + end end. - %%-------------------------------------------------------------------- %% @private %% @doc diff --git a/src/plumtree_metadata_manager.erl b/src/plumtree_metadata_manager.erl index c52c4e1..3586c1b 100644 --- a/src/plumtree_metadata_manager.erl +++ b/src/plumtree_metadata_manager.erl @@ -223,7 +223,9 @@ put({{Prefix, SubPrefix}, _Key}=PKey, Context, ValueOrFun) force_delete({{Prefix, SubPrefix}, _Key}=PKey) when (is_binary(Prefix) orelse is_atom(Prefix)) andalso (is_binary(SubPrefix) orelse is_atom(SubPrefix)) -> - gen_server:call(?SERVER, {force_delete, PKey}, infinity). + ok = plumtree_metadata_hashtree:delete(PKey), + plumtree_metadata_leveldb_instance:delete(PKey), + ok. %% @doc same as merge/2 but merges the object on `Node' -spec merge(node(), {metadata_pkey(), undefined | metadata_context()}, metadata_object()) -> boolean(). @@ -326,9 +328,6 @@ handle_call({merge, PKey, Obj}, _From, State) -> handle_call({get, PKey}, _From, State) -> Result = read(PKey), {reply, Result, State}; -handle_call({force_delete, PKey}, _From, State) -> - {Result, NewState} = force_delete(PKey, State), - {reply, Result, NewState}; handle_call({open_remote_iterator, Pid, FullPrefix, KeyMatch}, _From, State) -> Iterator = new_remote_iterator(Pid, FullPrefix, KeyMatch), {reply, Iterator, State}; @@ -462,12 +461,28 @@ read_modify_write(PKey, Context, ValueOrFun) -> read_merge_write(PKey, Obj) -> Existing = read(PKey), -<<<<<<< HEAD - case plumtree_metadata_object:reconcile(Obj, Existing) of - false -> false; - {true, Reconciled} -> - store(PKey, Reconciled), - true + IsDeleted = + case {Obj, Existing} of + {undefined, undefined} -> + true; + {undefined, O} -> + plumtree_metadata_object:values(O) == ['$deleted']; + {O, undefined} -> + plumtree_metadata_object:values(O) == ['$deleted']; + _ -> + false + end, + case IsDeleted of + true -> + false; + false -> + case plumtree_metadata_object:reconcile(Obj, Existing) of + false -> + false; + {true, Reconciled} -> + store(PKey, Reconciled), + true + end end. store({FullPrefix, Key}=PKey, Metadata) -> @@ -493,15 +508,6 @@ store({FullPrefix, Key}=PKey, Metadata) -> ets:lookup(?SUBS, FullPrefix)), Metadata. -force_delete({FullPrefix, Key}=PKey, - #state{storage_mod=Mod, - storage_mod_state=ModSt} = State) -> - Tab = ets_tab(FullPrefix), - ets:delete(Tab, Key), - {ok, NewModSt} = Mod:delete(FullPrefix, Key, ModSt), - ok = plumtree_metadata_hashtree:delete(PKey), - {ok, State#state{storage_mod_state=NewModSt}}. - trigger_subscription_event(FullPrefix, Event, [{FullPrefix, {Pid, _}}|Rest]) -> Pid ! Event, trigger_subscription_event(FullPrefix, Event, Rest); diff --git a/test/metadata_SUITE.erl b/test/metadata_SUITE.erl index 9abcbe6..17ad944 100644 --- a/test/metadata_SUITE.erl +++ b/test/metadata_SUITE.erl @@ -99,6 +99,7 @@ manual_force_cleanup_test(Config) -> ok = read_write_delete_test(Config), Nodes = proplists:get_value(nodes, Config), {Res1, _} = rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]), + rpc:multicall(Nodes, plumtree_metadata_cleanup_sup, add_full_prefix, [{foo, bar}]), %% every node still has one tombstone entry in the ets cache ?assertEqual(length(Nodes), lists:sum(Res1)), From 0b7140d409488ba4ef52167e358a9caf331ed4f0 Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Mon, 10 Aug 2015 16:43:15 +0200 Subject: [PATCH 07/11] using incremental tombstone gc --- src/plumtree.app.src | 1 + src/plumtree_metadata.erl | 14 +- src/plumtree_metadata_cleanup.erl | 269 --------------------- src/plumtree_metadata_cleanup_sup.erl | 72 ------ src/plumtree_metadata_leveldb_instance.erl | 90 ++++++- src/plumtree_metadata_leveldb_iterator.erl | 7 +- src/plumtree_sup.erl | 1 - test/metadata_SUITE.erl | 12 +- test/plumtree_test_utils.erl | 3 + 9 files changed, 89 insertions(+), 380 deletions(-) delete mode 100644 src/plumtree_metadata_cleanup.erl delete mode 100644 src/plumtree_metadata_cleanup_sup.erl diff --git a/src/plumtree.app.src b/src/plumtree.app.src index 3b5a0b2..3b3926b 100644 --- a/src/plumtree.app.src +++ b/src/plumtree.app.src @@ -16,6 +16,7 @@ {env, [ {plumtree_data_dir, "data"}, {nr_of_meta_instances, 12}, + {gc_grace_seconds, 0}, % tombstones wont be gc'd {meta_leveldb_opts, [ {sync, false}, {total_leveldb_mem_percent, 6}, diff --git a/src/plumtree_metadata.erl b/src/plumtree_metadata.erl index 741ca38..d968cf8 100644 --- a/src/plumtree_metadata.erl +++ b/src/plumtree_metadata.erl @@ -39,9 +39,7 @@ put/3, put/4, delete/2, - delete/3, - cleanup/2, - cleanup_all/1]). + delete/3]). -include("plumtree_metadata.hrl"). @@ -334,16 +332,6 @@ delete(FullPrefix, Key) -> delete(FullPrefix, Key, _Opts) -> put(FullPrefix, Key, ?TOMBSTONE, []). - --spec cleanup(metadata_prefix(), pos_integer()) -> ok | {error, any()}. -cleanup(FullPrefix, AgeInSecs) -> - plumtree_metadata_cleanup:force_cleanup(FullPrefix, AgeInSecs). - --spec cleanup_all(metadata_prefix()) -> ok | {error, any()}. -cleanup_all(AgeInSecs) -> - plumtree_metadata_cleanup:force_cleanup(AgeInSecs). - - %%%=================================================================== %%% Internal functions %%%=================================================================== diff --git a/src/plumtree_metadata_cleanup.erl b/src/plumtree_metadata_cleanup.erl deleted file mode 100644 index 1d932bf..0000000 --- a/src/plumtree_metadata_cleanup.erl +++ /dev/null @@ -1,269 +0,0 @@ --module(plumtree_metadata_cleanup). - --behaviour(gen_server). - -%% API functions --export([start_link/1, - force_cleanup/1, - force_cleanup/2, - set_interval/1, - set_interval/2, - cancel_cleanup/0, - cancel_cleanup/1]). - -%% gen_server callbacks --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --record(state, {interval, deleted, tref, waiting}). - --define(TOMBSTONE, '$deleted'). - -%%%=================================================================== -%%% API functions -%%%=================================================================== - -start_link(FullPrefix) -> - gen_server:start_link(?MODULE, [FullPrefix], []). - -force_cleanup(AgeInSecs) when AgeInSecs > 0 -> - for_all_cleaners(fun force_cleanup/2, [AgeInSecs]). - -force_cleanup(FullPrefix, AgeInSecs) when is_tuple(FullPrefix) -> - for_cleaner(FullPrefix, fun force_cleanup/2, [AgeInSecs]); -force_cleanup(Pid, AgeInSecs) when is_pid(Pid) and (AgeInSecs > 0) -> - gen_server:call(Pid, {force_cleanup, AgeInSecs}, infinity). - -set_interval(Int) when Int >= 0 -> - for_all_cleaners(fun set_interval/2, [Int]). - -set_interval(FullPrefix, Int) when is_tuple(FullPrefix) and (Int >= 0) -> - for_cleaner(FullPrefix, fun set_interval/2, [Int]); -set_interval(Pid, Int) when is_pid(Pid) and (Int >= 0) -> - gen_server:call(Pid, {set_interval, Int}, infinity). - -cancel_cleanup() -> - for_all_cleaners(fun cancel_cleanup/1, []). - -cancel_cleanup(FullPrefix) when is_tuple(FullPrefix) -> - for_cleaner(FullPrefix, fun cancel_cleanup/1, []); -cancel_cleanup(Pid) when is_pid(Pid) -> - gen_server:call(Pid, cancel_cleanup, infinity). - - -for_all_cleaners(Fun, Args) -> - lists:foreach(fun({_FullPrefix, Pid}) -> - apply(Fun, [Pid|Args]) - end, plumtree_metadata_cleanup_sup:get_full_prefix_and_pid()). - -for_cleaner(FullPrefix, Fun, Args) -> - case plumtree_metadata_cleanup_sup:get_pid(FullPrefix) of - {ok, Pid} -> - apply(Fun, [Pid|Args]); - E -> - E - end. - - - - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% -%% @spec init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% @end -%%-------------------------------------------------------------------- -init([FullPrefix]) -> - CleanupInterval = app_helper:get_prop_or_env(cleanup_interval, [], - plumtree, undefined), - case CleanupInterval of - undefined -> - %% no cleanup happens - {ok, #state{interval=undefined, - deleted={FullPrefix, gb_sets:new()}}}; - _ when CleanupInterval > 0 -> - TRef = erlang:send_after(0, self(), cleanup), - {ok, #state{interval=CleanupInterval * 1000, - tref=TRef, - deleted={FullPrefix, gb_sets:new()}}} - end. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% -%% @spec handle_call(Request, From, State) -> -%% {reply, Reply, State} | -%% {reply, Reply, State, Timeout} | -%% {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, Reply, State} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_call({force_cleanup, AgeInSecs}, From, #state{tref=TRef, - waiting=Waiting, - deleted=Deleted} = State) -> - case Waiting of - undefined -> - case TRef of - undefined -> - ignore; - _ -> - erlang:cancel_timer(TRef) - end, - NewDeleted = cleanup_tombstones(Deleted), - NewTRef = erlang:send_after(AgeInSecs * 1000, self(), cleanup), - {noreply, State#state{deleted=NewDeleted, - tref=NewTRef, - waiting=From}}; - _ -> - {reply, {error, already_scheduled}, State} - end; -handle_call({set_interval, IntInSecs}, _From, #state{waiting=Waiting, - tref=TRef} = State) -> - case Waiting of - undefined -> - {NewInterval, NewTRef} = - case {IntInSecs, TRef} of - {0, undefined} -> - {undefined, undefined}; - {0, _} -> - erlang:cancel_timer(TRef), - {undefined, undefined}; - {_, undefined} -> - NewIntInSecs = IntInSecs * 1000, - {NewIntInSecs, erlang:send_after(NewIntInSecs, self(), cleanup)}; - {_, _} -> - erlang:cancel_timer(TRef), - NewIntInSecs = IntInSecs * 1000, - {NewIntInSecs, erlang:send_after(NewIntInSecs, self(), cleanup)} - end, - {reply, ok, State#state{interval=NewInterval, tref=NewTRef}}; - _ -> - {reply, {error, waiting_for_cleanup}, State} - end; -handle_call(cancel_cleanup, _From, #state{waiting=Waiting, - interval=Interval} = State) -> - case Waiting of - undefined -> - {reply, {error, no_waiting_proc}, State}; - _ -> - gen_server:reply(Waiting, canceled), - case Interval of - undefined -> - {reply, ok, State#state{waiting=undefined}}; - _ -> - NewTRef = erlang:send_after(Interval, self(), cleanup), - {reply, ok, State#state{waiting=undefined, tref=NewTRef}} - end - end. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% -%% @spec handle_cast(Msg, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_cast(_Msg, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% -%% @spec handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- -handle_info(cleanup, #state{waiting=Waiting, - interval=Interval, - deleted=Deleted} = State) -> - NewDeleted = cleanup_tombstones(Deleted), - case Waiting of - undefined -> - ignore; - _ -> - gen_server:reply(Waiting, ok) - end, - NewState = - case Interval of - undefined -> - State#state{waiting=undefined, deleted=NewDeleted}; - _ -> - NewTRef = erlang:send_after(Interval, self(), cleanup), - State#state{waiting=undefined, deleted=NewDeleted, tref=NewTRef} - end, - {noreply, NewState}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% -%% @spec terminate(Reason, State) -> void() -%% @end -%%-------------------------------------------------------------------- -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process state when code is changed -%% -%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} -%% @end -%%-------------------------------------------------------------------- -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== -cleanup_tombstones({FullPrefix, Set}) -> - T1 = os:timestamp(), - {_, Marked, Deleted, Total, NewSet} = plumtree_metadata:fold( - fun cleanup_tombstones/2, - {FullPrefix, 0, 0, 0, Set}, - FullPrefix, [{resolver, lww}]), - T2 = os:timestamp(), - DiffInMs = timer:now_diff(T2, T1) div 1000, - lager:info("completed cleanup for ~p in ~pms. deleted ~p, marked ~p, good ~p", - [FullPrefix, DiffInMs, Deleted, Marked, Total]), - {FullPrefix, NewSet}. - -cleanup_tombstones({Key, ?TOMBSTONE}, {FullPrefix, Marked, Deleted, Total, Set}) -> - case gb_sets:is_element(Key, Set) of - true -> - %% delete - plumtree_metadata_manager:force_delete({FullPrefix, Key}), - {FullPrefix, Marked, Deleted + 1, Total, gb_sets:delete(Key, Set)}; - false -> - {FullPrefix, Marked + 1, Deleted, Total, gb_sets:add(Key, Set)} - end; -cleanup_tombstones({Key, _}, {FullPrefix, Marked, Deleted, Total, Set}) -> - {FullPrefix, Marked, Deleted, Total + 1, gb_sets:delete_any(Key, Set)}. diff --git a/src/plumtree_metadata_cleanup_sup.erl b/src/plumtree_metadata_cleanup_sup.erl deleted file mode 100644 index a8424fa..0000000 --- a/src/plumtree_metadata_cleanup_sup.erl +++ /dev/null @@ -1,72 +0,0 @@ --module(plumtree_metadata_cleanup_sup). - --behaviour(supervisor). - -%% API functions --export([start_link/0, - get_pid/1, - get_full_prefix_and_pid/0, - add_full_prefix/1]). - -%% Supervisor callbacks --export([init/1]). - --define(CHILD(Id, Mod, Type, Args), {Id, {Mod, start_link, Args}, - permanent, 5000, Type, [Mod]}). - -%%%=================================================================== -%%% API functions -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @doc -%% Starts the supervisor -%% -%% @spec start_link() -> {ok, Pid} | ignore | {error, Error} -%% @end -%%-------------------------------------------------------------------- -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -add_full_prefix(FullPrefix) -> - supervisor:start_child(?MODULE, - ?CHILD({plumtree_metadata_cleanup, FullPrefix}, - plumtree_metadata_cleanup, worker, [FullPrefix])). - -get_pid(FullPrefix) -> - case lists:keyfind({plumtree_metadata_cleanup, FullPrefix}, 1, - supervisor:which_children(?MODULE)) of - {_, Pid, _, _} when is_pid(Pid) -> - {ok, Pid}; - _ -> - {error, not_found} - end. - -get_full_prefix_and_pid() -> - [{FullPrefix, Pid} || {{plumtree_metadata_cleanup, FullPrefix}, Pid, _, _} - <- supervisor:which_children(?MODULE), is_pid(Pid)]. - - -%%%=================================================================== -%%% Supervisor callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Whenever a supervisor is started using supervisor:start_link/[2,3], -%% this function is called by the new process to find out about -%% restart strategy, maximum restart frequency and child -%% specifications. -%% -%% @spec init(Args) -> {ok, {SupFlags, [ChildSpec]}} | -%% ignore | -%% {error, Reason} -%% @end -%%-------------------------------------------------------------------- -init([]) -> - {ok, {{one_for_one, 5, 10}, []}}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/src/plumtree_metadata_leveldb_instance.erl b/src/plumtree_metadata_leveldb_instance.erl index bba4edf..d5b80f8 100644 --- a/src/plumtree_metadata_leveldb_instance.erl +++ b/src/plumtree_metadata_leveldb_instance.erl @@ -33,6 +33,7 @@ status/1, data_size/1, iterator/2, + iterator_move/2, iterator_close/2]). %% gen_server callbacks @@ -50,10 +51,13 @@ read_opts = [], write_opts = [], fold_opts = [{fill_cache, false}], - open_iterators = [] + open_iterators = [], + grace_secs, + cleanup=maps:new() }). -type config() :: [{atom(), term()}]. +-define(TOMBSTONE, '$deleted'). %%%=================================================================== %%% API functions @@ -100,9 +104,13 @@ data_size(InstanceId) -> iterator(Instance, KeysOnly) when is_pid(Instance) or is_atom(Instance) -> gen_server:call(Instance, {new_iterator, self(), KeysOnly}, infinity). -iterator_close(Instance, Itr) when is_pid(Instance) or is_atom(Instance) -> +iterator_close(Instance, {_,_} = Itr) when is_pid(Instance) or is_atom(Instance) -> gen_server:call(Instance, {close_iterator, Itr}, infinity). +iterator_move({MRef, Itr}, FirstKey) when is_reference(MRef) -> + eleveldb:iterator_move(Itr, FirstKey). + + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -127,12 +135,18 @@ init([InstanceId, Opts]) -> "meta"), DataDir2 = filename:join(DataDir1, integer_to_list(InstanceId)), + GraceSeconds = app_helper:get_prop_or_env(gc_grace_seconds, Opts, plumtree), + %% Initialize state S0 = init_state(DataDir2, Opts), process_flag(trap_exit, true), case open_db(S0) of {ok, State} -> - {ok, State}; + case GraceSeconds > 0 of + true -> timer:send_interval(GraceSeconds * 1000, force_cleanup); + false -> ignore + end, + {ok, init_cleanup(State#state{grace_secs=GraceSeconds})}; {error, Reason} -> {stop, Reason} end. @@ -161,20 +175,22 @@ handle_call({get, Key}, _From, #state{read_opts=ReadOpts, ref=Ref} = State) -> {reply, {error, Reason}, State} end; handle_call({put, Key, Value}, _From, #state{write_opts=WriteOpts, ref=Ref} = State) -> - Update = [{put, sext:encode(Key), term_to_binary(Value)}], + Update = {put, sext:encode(Key), term_to_binary(Value)}, + {NewCleanup, CleanupOps} = maybe_trigger_cleanup(Key, Value, State), %% Perform the write... - case eleveldb:write(Ref, Update, WriteOpts) of + case eleveldb:write(Ref, [Update|CleanupOps], WriteOpts) of ok -> - {reply, ok, State}; + {reply, ok, State#state{cleanup=NewCleanup}}; {error, Reason} -> {reply, {error, Reason}, State} end; handle_call({delete, Key}, _From, #state{write_opts=WriteOpts, ref=Ref} = State) -> - Update = [{delete, sext:encode(Key)}], + Update = {delete, sext:encode(Key)}, + {NewCleanup, CleanupOps} = maybe_trigger_cleanup(State), %% Perform the write... - case eleveldb:write(Ref, Update, WriteOpts) of + case eleveldb:write(Ref, [Update|CleanupOps], WriteOpts) of ok -> - {reply, ok, State}; + {reply, ok, State#state{cleanup=NewCleanup}}; {error, Reason} -> {reply, {error, Reason}, State} end; @@ -202,9 +218,10 @@ handle_call({new_iterator, Owner, KeysOnly}, _From, #state{ref=Ref, fold_opts=Fo false -> eleveldb:iterator(Ref, FoldOpts) end, - {reply, Itr, State#state{open_iterators=[{MRef, Itr}|OpenIterators]}}; -handle_call({close_iterator, Itr}, _From, #state{open_iterators=OpenIterators} = State) -> - {MRef, _} = lists:keyfind(Itr, 2, OpenIterators), + {reply, {MRef, Itr}, State#state{open_iterators=[{MRef, Itr}|OpenIterators]}}; +handle_call({close_iterator, {MRef, _}}, _From, #state{open_iterators=OpenIterators} = State) -> + {MRef, Itr} = lists:keyfind(MRef, 1, OpenIterators), + eleveldb:iterator_close(Itr), demonitor(MRef, [flush]), {reply, ok, State#state{open_iterators=lists:keydelete(MRef, 1, OpenIterators)}}. @@ -239,6 +256,11 @@ handle_info({'DOWN', MRef, process, _, _}, #state{open_iterators=OpenIterators} eleveldb:iterator_close(Itr) end, {noreply, State#state{open_iterators=lists:keydelete(MRef, 1, OpenIterators)}}; +handle_info(force_cleanup, #state{ref=Ref, write_opts=WriteOpts} = State) -> + {NewCleanup, CleanupOps} = maybe_trigger_cleanup(State), + %% Perform the write... + eleveldb:write(Ref, CleanupOps, WriteOpts), + {noreply, State#state{cleanup=NewCleanup}}; handle_info(_Info, State) -> {noreply, State}. @@ -364,3 +386,47 @@ open_db(State0, RetriesLeft, _) -> {error, Reason} -> {error, Reason} end. + +maybe_trigger_cleanup(Key, Val, #state{grace_secs=GS, cleanup=Cleanup}) when GS > 0 -> + Now = epoch(), + Cleanup1 = + case plumtree_metadata_object:values(Val) of + [?TOMBSTONE] -> + maps:put(Key, Now, Cleanup); + _ -> + maps:remove(Key, Cleanup) + end, + incr_cleanup(Cleanup1, Now, GS); +maybe_trigger_cleanup(_, _, #state{cleanup=Cleanup}) -> + {Cleanup, []}. + +maybe_trigger_cleanup(#state{grace_secs=GS, cleanup=Cleanup}) when GS > 0-> + incr_cleanup(Cleanup, epoch(), GS); +maybe_trigger_cleanup(#state{cleanup=Cleanup}) -> + {Cleanup, []}. + +incr_cleanup(Cleanup, Now, GS) -> + KeysToDelete = + maps:fold(fun(K, V, Acc) when (Now - V) > GS -> + [K|Acc]; + (_, _, Acc) -> + Acc + end, [], Cleanup), + NewCleanup = maps:without(KeysToDelete, Cleanup), + {NewCleanup, [{delete, sext:encode(K)} || K <- KeysToDelete]}. + +init_cleanup(#state{cleanup=Cleanup, fold_opts=FoldOpts, ref=Ref} = State) -> + Now = epoch(), + NewCleanup = + eleveldb:fold(Ref, fun({BKey, BVal} , Acc) -> + case plumtree_metadata_object:values(binary_to_term(BVal)) of + [?TOMBSTONE] -> + maps:put(sext:decode(BKey), Now, Acc); + _ -> Acc + end + end, Cleanup, FoldOpts), + State#state{cleanup=NewCleanup}. + +epoch() -> + {MegaSecs, Secs, _} = os:timestamp(), + MegaSecs * 1000000 + Secs. diff --git a/src/plumtree_metadata_leveldb_iterator.erl b/src/plumtree_metadata_leveldb_iterator.erl index 6cb68d2..29491f6 100644 --- a/src/plumtree_metadata_leveldb_iterator.erl +++ b/src/plumtree_metadata_leveldb_iterator.erl @@ -188,12 +188,12 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%=================================================================== iterate([{Itr, _Instance}|_] = Instances, FullPrefix, KeyMatch, KeysOnly) -> - Res = eleveldb:iterator_move(Itr, prefetch), + Res = plumtree_metadata_leveldb_instance:iterator_move(Itr, prefetch), iterate(Res, Instances, FullPrefix, KeyMatch, KeysOnly); iterate([Instance|Rest], FullPrefix, KeyMatch, KeysOnly) when is_atom(Instance)-> Itr = plumtree_metadata_leveldb_instance:iterator(Instance, KeysOnly), FirstKey = first_key(FullPrefix), - Res = eleveldb:iterator_move(Itr, FirstKey), + Res = plumtree_metadata_leveldb_instance:iterator_move(Itr, FirstKey), iterate(Res, [{Itr, Instance}|Rest], FullPrefix, KeyMatch, KeysOnly); iterate([], _, _, _) -> done. @@ -223,6 +223,7 @@ iterate(OkVal, Instances, FullPrefix, KeyMatch, KeysOnly) when element(1, OkVal) end. prefix_match({_, Key}, undefined) -> {true, Key}; +prefix_match({_, Key}, {undefined, undefined}) -> {true, Key}; prefix_match({{Prefix, _}, Key}, {Prefix, undefined}) -> {true, Key}; prefix_match({{_, SubPrefix}, Key}, {undefined, SubPrefix}) -> {true, Key}; prefix_match({FullPrefix, Key}, {_,_} = FullPrefix) -> {true, Key}; @@ -232,6 +233,6 @@ key_match(_, undefined) -> true; key_match(Key, KeyMatch) -> KeyMatch(Key). first_key(undefined) -> first; -first_key({undefined, undefind}) -> first; +first_key({undefined, undefined}) -> first; first_key({Prefix, undefined}) -> sext:encode({{Prefix, ''}, ''}); first_key({_, _}=FullPrefix) -> sext:encode({FullPrefix, ''}). diff --git a/src/plumtree_sup.erl b/src/plumtree_sup.erl index 8c38c9a..17b57a0 100644 --- a/src/plumtree_sup.erl +++ b/src/plumtree_sup.erl @@ -37,7 +37,6 @@ init([]) -> [ ?CHILD(plumtree_metadata_leveldb_iterator_sup, supervisor), ?CHILD(plumtree_metadata_leveldb_instance_sup, supervisor), - ?CHILD(plumtree_metadata_cleanup_sup, supervisor), ?CHILD(plumtree_peer_service_gossip, worker), ?CHILD(plumtree_peer_service_events, worker), ?CHILD(plumtree_broadcast, worker), diff --git a/test/metadata_SUITE.erl b/test/metadata_SUITE.erl index 17ad944..36366cd 100644 --- a/test/metadata_SUITE.erl +++ b/test/metadata_SUITE.erl @@ -99,19 +99,11 @@ manual_force_cleanup_test(Config) -> ok = read_write_delete_test(Config), Nodes = proplists:get_value(nodes, Config), {Res1, _} = rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]), - rpc:multicall(Nodes, plumtree_metadata_cleanup_sup, add_full_prefix, [{foo, bar}]), %% every node still has one tombstone entry in the ets cache ?assertEqual(length(Nodes), lists:sum(Res1)), - - lists:foreach(fun(Node) -> - %% this blocks for 5 seconds, during this time - %% other nodes will discover the discrepancy in the - %% hashtree and will try to replicate the proper - %% tombstone... which we don't merge in read_write_merge. - %% eventually all tombstones are removed. - rpc:call(Node, plumtree_metadata, cleanup_all, [5]) - end, Nodes), + %% grace_seconds is 10 seconds, worst case we have to wait 2x that long + timer:sleep(20000), {Res2, _} = rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]), ?assertEqual(0, lists:sum(Res2)), diff --git a/test/plumtree_test_utils.erl b/test/plumtree_test_utils.erl index ccb7b16..43026d1 100644 --- a/test/plumtree_test_utils.erl +++ b/test/plumtree_test_utils.erl @@ -118,6 +118,9 @@ start_node(Name, Config, Case) -> ok = rpc:call(Node, application, set_env, [plumtree, plumtree_data_dir, NodeDir]), + ok = rpc:call(Node, application, set_env, [plumtree, + gc_grace_seconds, + 10]), {ok, _} = rpc:call(Node, application, ensure_all_started, [plumtree]), ok = wait_until(fun() -> case rpc:call(Node, plumtree_peer_service_manager, get_local_state, []) of From 09ea829a4564d0e0064c9006f23e20006433cd11 Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Mon, 10 Aug 2015 16:54:15 +0200 Subject: [PATCH 08/11] dialyzer fix for keys-only iterator --- src/plumtree_metadata_manager.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/plumtree_metadata_manager.erl b/src/plumtree_metadata_manager.erl index 3586c1b..f1e55d7 100644 --- a/src/plumtree_metadata_manager.erl +++ b/src/plumtree_metadata_manager.erl @@ -71,7 +71,7 @@ prefix :: metadata_prefix(), match :: term(), pos :: term(), - obj :: {metadata_key(), metadata_object()} | undefined, + obj :: {metadata_key(), metadata_object() | undefined} | undefined, done :: boolean() }). From 57ba6156e5a6cdc36d3d6cef9c64c8ff2d51077d Mon Sep 17 00:00:00 2001 From: Lars Hesel Christensen Date: Fri, 24 Jun 2016 16:44:42 +0200 Subject: [PATCH 09/11] Code review cleanup --- src/plumtree_metadata_leveldb_instance.erl | 32 +--------------------- src/plumtree_metadata_manager.erl | 12 -------- test/metadata_SUITE.erl | 21 +------------- 3 files changed, 2 insertions(+), 63 deletions(-) diff --git a/src/plumtree_metadata_leveldb_instance.erl b/src/plumtree_metadata_leveldb_instance.erl index 780120b..37f3429 100644 --- a/src/plumtree_metadata_leveldb_instance.erl +++ b/src/plumtree_metadata_leveldb_instance.erl @@ -29,7 +29,6 @@ name/1, get/1, put/2, - delete/1, status/1, data_size/1, iterator/2, @@ -84,11 +83,6 @@ put(Key, Value) -> Name = name(InstanceId), gen_server:call(Name, {put, Key, Value}, infinity). -delete(Key) -> - InstanceId = plumtree_metadata_leveldb_instance_sup:get_instance_id_for_key(Key), - Name = name(InstanceId), - gen_server:call(Name, {delete, Key}, infinity). - name(Id) -> list_to_atom("plmtrlvld_" ++ integer_to_list(Id)). @@ -142,10 +136,6 @@ init([InstanceId, Opts]) -> process_flag(trap_exit, true), case open_db(S0) of {ok, State} -> - case GraceSeconds > 0 of - true -> timer:send_interval(GraceSeconds * 1000, force_cleanup); - false -> ignore - end, {ok, init_cleanup(State#state{grace_secs=GraceSeconds})}; {error, Reason} -> {stop, Reason} @@ -184,17 +174,7 @@ handle_call({put, Key, Value}, _From, #state{write_opts=WriteOpts, ref=Ref} = St {error, Reason} -> {reply, {error, Reason}, State} end; -handle_call({delete, Key}, _From, #state{write_opts=WriteOpts, ref=Ref} = State) -> - Update = {delete, sext:encode(Key)}, - {NewCleanup, CleanupOps} = maybe_trigger_cleanup(State), - %% Perform the write... - case eleveldb:write(Ref, [Update|CleanupOps], WriteOpts) of - ok -> - {reply, ok, State#state{cleanup=NewCleanup}}; - {error, Reason} -> - {reply, {error, Reason}, State} - end; -handle_call(status, _From, #state{ref=Ref} = State) -> +handle_call(status, _From, #state{ref=Ref, cleanup=Cleanup} = State) -> {ok, Stats} = eleveldb:status(Ref, <<"leveldb.stats">>), {ok, ReadBlockError} = eleveldb:status(Ref, <<"leveldb.ReadBlockError">>), {reply, [{stats, Stats}, {read_block_error, ReadBlockError}], State}; @@ -256,11 +236,6 @@ handle_info({'DOWN', MRef, process, _, _}, #state{open_iterators=OpenIterators} eleveldb:iterator_close(Itr) end, {noreply, State#state{open_iterators=lists:keydelete(MRef, 1, OpenIterators)}}; -handle_info(force_cleanup, #state{ref=Ref, write_opts=WriteOpts} = State) -> - {NewCleanup, CleanupOps} = maybe_trigger_cleanup(State), - %% Perform the write... - eleveldb:write(Ref, CleanupOps, WriteOpts), - {noreply, State#state{cleanup=NewCleanup}}; handle_info(_Info, State) -> {noreply, State}. @@ -400,11 +375,6 @@ maybe_trigger_cleanup(Key, Val, #state{grace_secs=GS, cleanup=Cleanup}) when GS maybe_trigger_cleanup(_, _, #state{cleanup=Cleanup}) -> {Cleanup, []}. -maybe_trigger_cleanup(#state{grace_secs=GS, cleanup=Cleanup}) when GS > 0-> - incr_cleanup(Cleanup, epoch(), GS); -maybe_trigger_cleanup(#state{cleanup=Cleanup}) -> - {Cleanup, []}. - incr_cleanup(Cleanup, Now, GS) -> KeysToDelete = maps:fold(fun(K, V, Acc) when (Now - V) > GS -> diff --git a/src/plumtree_metadata_manager.erl b/src/plumtree_metadata_manager.erl index 5a9774d..949d902 100644 --- a/src/plumtree_metadata_manager.erl +++ b/src/plumtree_metadata_manager.erl @@ -46,9 +46,6 @@ graft/1, exchange/1]). -%% used by cleanup --export([force_delete/1]). - %% utilities -export([size/1, subscribe/1, @@ -220,15 +217,6 @@ put({{Prefix, SubPrefix}, _Key}=PKey, Context, ValueOrFun) (is_binary(SubPrefix) orelse is_atom(SubPrefix)) -> read_modify_write(PKey, Context, ValueOrFun). -%% @doc forcefully deletes the key --spec force_delete(metadata_pkey()) -> ok. -force_delete({{Prefix, SubPrefix}, _Key}=PKey) - when (is_binary(Prefix) orelse is_atom(Prefix)) andalso - (is_binary(SubPrefix) orelse is_atom(SubPrefix)) -> - ok = plumtree_metadata_hashtree:delete(PKey), - plumtree_metadata_leveldb_instance:delete(PKey), - ok. - %% @doc same as merge/2 but merges the object on `Node' -spec merge(node(), {metadata_pkey(), undefined | metadata_context()}, metadata_object()) -> boolean(). merge(Node, {PKey, _Context}, Obj) -> diff --git a/test/metadata_SUITE.erl b/test/metadata_SUITE.erl index 36366cd..7b30517 100644 --- a/test/metadata_SUITE.erl +++ b/test/metadata_SUITE.erl @@ -31,7 +31,6 @@ -export([ read_write_delete_test/1, - manual_force_cleanup_test/1, partitioned_cluster_test/1, siblings_test/1 ]). @@ -73,8 +72,7 @@ end_per_testcase(_, _Config) -> ok. all() -> - [read_write_delete_test, manual_force_cleanup_test, - partitioned_cluster_test, siblings_test]. + [read_write_delete_test, partitioned_cluster_test, siblings_test]. read_write_delete_test(Config) -> [Node1|OtherNodes] = Nodes = proplists:get_value(nodes, Config), @@ -95,23 +93,6 @@ read_write_delete_test(Config) -> ok = wait_until_converged(Nodes, {foo, bar}, baz, undefined), ok. -manual_force_cleanup_test(Config) -> - ok = read_write_delete_test(Config), - Nodes = proplists:get_value(nodes, Config), - {Res1, _} = rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]), - - %% every node still has one tombstone entry in the ets cache - ?assertEqual(length(Nodes), lists:sum(Res1)), - %% grace_seconds is 10 seconds, worst case we have to wait 2x that long - timer:sleep(20000), - {Res2, _} = rpc:multicall(Nodes, plumtree_metadata_manager, size, [{foo, bar}]), - - ?assertEqual(0, lists:sum(Res2)), - ok. - - - - partitioned_cluster_test(Config) -> [Node1|OtherNodes] = Nodes = proplists:get_value(nodes, Config), [?assertEqual(ok, rpc:call(Node, plumtree_peer_service, join, [Node1])) From d12db23687f9e1b038769112ab2ae971f14c34a6 Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Sun, 10 Jul 2016 17:34:33 +0200 Subject: [PATCH 10/11] gc_grace_seconds imple using an ETS based graveyard --- src/plumtree.app.src | 3 +- src/plumtree_metadata_leveldb_instance.erl | 175 +++++++++++++++------ src/plumtree_metadata_leveldb_iterator.erl | 34 ++-- src/plumtree_metadata_manager.erl | 71 ++++----- 4 files changed, 177 insertions(+), 106 deletions(-) diff --git a/src/plumtree.app.src b/src/plumtree.app.src index 6c54bff..2134026 100644 --- a/src/plumtree.app.src +++ b/src/plumtree.app.src @@ -17,7 +17,8 @@ {env, [ {plumtree_data_dir, "data"}, {nr_of_meta_instances, 12}, - {gc_grace_seconds, 0}, % tombstones wont be gc'd + {gc_grace_seconds, undefined}, % tombstones wont be gc'd + {gc_interval, 10000}, {meta_leveldb_opts, [ {sync, false}, {total_leveldb_mem_percent, 6}, diff --git a/src/plumtree_metadata_leveldb_instance.erl b/src/plumtree_metadata_leveldb_instance.erl index 37f3429..6f6a6a6 100644 --- a/src/plumtree_metadata_leveldb_instance.erl +++ b/src/plumtree_metadata_leveldb_instance.erl @@ -29,6 +29,7 @@ name/1, get/1, put/2, + delete/2, status/1, data_size/1, iterator/2, @@ -51,12 +52,11 @@ write_opts = [], fold_opts = [{fill_cache, false}], open_iterators = [], - grace_secs, - cleanup=maps:new() + grace_period, + refs=ets:new(?MODULE, []) }). -type config() :: [{atom(), term()}]. --define(TOMBSTONE, '$deleted'). %%%=================================================================== %%% API functions @@ -83,6 +83,11 @@ put(Key, Value) -> Name = name(InstanceId), gen_server:call(Name, {put, Key, Value}, infinity). +delete(Key, Value) -> + InstanceId = plumtree_metadata_leveldb_instance_sup:get_instance_id_for_key(Key), + Name = name(InstanceId), + gen_server:call(Name, {delete, Key, Value}, infinity). + name(Id) -> list_to_atom("plmtrlvld_" ++ integer_to_list(Id)). @@ -129,14 +134,17 @@ init([InstanceId, Opts]) -> "meta"), DataDir2 = filename:join(DataDir1, integer_to_list(InstanceId)), - GraceSeconds = app_helper:get_prop_or_env(gc_grace_seconds, Opts, plumtree), - + GracePeriod = app_helper:get_prop_or_env(gc_grace_seconds, Opts, plumtree, + 60), + CleanupInterval = app_helper:get_prop_or_env(gc_interval, Opts, plumtree, + 10000), %% Initialize state S0 = init_state(DataDir2, Opts), process_flag(trap_exit, true), case open_db(S0) of {ok, State} -> - {ok, init_cleanup(State#state{grace_secs=GraceSeconds})}; + erlang:send_after(CleanupInterval, self(), cleanup), + {ok, init_graveyard(State#state{grace_period=GracePeriod})}; {error, Reason} -> {stop, Reason} end. @@ -157,24 +165,22 @@ init([InstanceId, Opts]) -> %%-------------------------------------------------------------------- handle_call({get, Key}, _From, #state{read_opts=ReadOpts, ref=Ref} = State) -> case eleveldb:get(Ref, sext:encode(Key), ReadOpts) of - {ok, Value} -> - {reply, {ok, binary_to_term(Value)}, State}; + {ok, BVal} -> + Reply = {ok, binary_to_term(BVal)}, + {reply, Reply, State}; not_found -> {reply, {error, not_found}, State}; {error, Reason} -> {reply, {error, Reason}, State} end; handle_call({put, Key, Value}, _From, #state{write_opts=WriteOpts, ref=Ref} = State) -> - Update = {put, sext:encode(Key), term_to_binary(Value)}, - {NewCleanup, CleanupOps} = maybe_trigger_cleanup(Key, Value, State), - %% Perform the write... - case eleveldb:write(Ref, [Update|CleanupOps], WriteOpts) of - ok -> - {reply, ok, State#state{cleanup=NewCleanup}}; - {error, Reason} -> - {reply, {error, Reason}, State} - end; -handle_call(status, _From, #state{ref=Ref, cleanup=Cleanup} = State) -> + UpdateOps = update_ops(Key, Value, State), + eleveldb:write(Ref, UpdateOps, WriteOpts), + {reply, true, State}; +handle_call({delete, Key, TombstoneVal}, _From, State) -> + Accepted = delete(Key, TombstoneVal, State), + {reply, Accepted, State}; +handle_call(status, _From, #state{ref=Ref} = State) -> {ok, Stats} = eleveldb:status(Ref, <<"leveldb.stats">>), {ok, ReadBlockError} = eleveldb:status(Ref, <<"leveldb.ReadBlockError">>), {reply, [{stats, Stats}, {read_block_error, ReadBlockError}], State}; @@ -236,6 +242,9 @@ handle_info({'DOWN', MRef, process, _, _}, #state{open_iterators=OpenIterators} eleveldb:iterator_close(Itr) end, {noreply, State#state{open_iterators=lists:keydelete(MRef, 1, OpenIterators)}}; +handle_info(cleanup, State) -> + erlang:send_after(10000, self(), cleanup), + {noreply, cleanup_graveyard(State)}; handle_info(_Info, State) -> {noreply, State}. @@ -362,40 +371,104 @@ open_db(State0, RetriesLeft, _) -> {error, Reason} end. -maybe_trigger_cleanup(Key, Val, #state{grace_secs=GS, cleanup=Cleanup}) when GS > 0 -> - Now = epoch(), - Cleanup1 = - case plumtree_metadata_object:values(Val) of - [?TOMBSTONE] -> - maps:put(Key, Now, Cleanup); +delete(Key, TombstoneVal, #state{ref=Ref, write_opts=WriteOpts, grace_period=undefined}) -> + %% grace seconds disabled + SKey = sext:encode(Key), + Hash = plumtree_metadata_object:hash(TombstoneVal), + plumtree_metadata_hashtree:insert(Key, Hash), + eleveldb:write(Ref, [{put, SKey, term_to_binary(TombstoneVal)}], WriteOpts), + true; +delete(Key, _, #state{ref=Ref, write_opts=WriteOpts, grace_period=0}) -> + %% No grace period: we are allowed to delete this key + %% might make sense on single node clusters + plumtree_metadata_hashtree:delete(Key), + SKey = sext:encode(Key), + eleveldb:write(Ref, [{delete, SKey}], WriteOpts), + true; +delete(Key, TombstoneVal, #state{ref=Ref, write_opts=WriteOpts, refs=Refs}) -> + %% Grace period defined, add tombstone to graveyard + Hash = plumtree_metadata_object:hash(TombstoneVal), + plumtree_metadata_hashtree:insert(Key, Hash), + SKey = sext:encode(Key), + case ets:lookup(Refs, SKey) of + [] -> + TS = epoch(), + GraveyardKey = graveyard_key(Key), + ets:insert(Refs, {SKey, TS}), + eleveldb:write(Ref, [{put, SKey, term_to_binary(TombstoneVal)}, + {put, GraveyardKey, term_to_binary(TS)}], + WriteOpts), + true; _ -> - maps:remove(Key, Cleanup) - end, - incr_cleanup(Cleanup1, Now, GS); -maybe_trigger_cleanup(_, _, #state{cleanup=Cleanup}) -> - {Cleanup, []}. - -incr_cleanup(Cleanup, Now, GS) -> - KeysToDelete = - maps:fold(fun(K, V, Acc) when (Now - V) > GS -> - [K|Acc]; - (_, _, Acc) -> - Acc - end, [], Cleanup), - NewCleanup = maps:without(KeysToDelete, Cleanup), - {NewCleanup, [{delete, sext:encode(K)} || K <- KeysToDelete]}. - -init_cleanup(#state{cleanup=Cleanup, fold_opts=FoldOpts, ref=Ref} = State) -> - Now = epoch(), - NewCleanup = - eleveldb:fold(Ref, fun({BKey, BVal} , Acc) -> - case plumtree_metadata_object:values(binary_to_term(BVal)) of - [?TOMBSTONE] -> - maps:put(sext:decode(BKey), Now, Acc); - _ -> Acc - end - end, Cleanup, FoldOpts), - State#state{cleanup=NewCleanup}. + %% already graveyarded, we got this 'new' tombstone through merge + false + end. + +update_ops(Key, Val, #state{refs=Refs}) -> + Hash = plumtree_metadata_object:hash(Val), + plumtree_metadata_hashtree:insert(Key, Hash), + SKey = sext:encode(Key), + BVal = term_to_binary(Val), + case ets:lookup(Refs, SKey) of + [] -> + [{put, SKey, BVal}]; + [{_, _}] -> + %% remove from graveyard + ets:delete(Refs, SKey), + [{put, SKey, BVal}, + {delete, graveyard_key(Key)}] + end. + +graveyard_key(Key) -> + %% Key should be plain, not sext encoded + sext:encode({'$graveyard', Key}). + +init_graveyard(#state{ref=Ref, fold_opts=FoldOpts} = State) -> + FirstKey = graveyard_key(''), + {ok, Itr} = eleveldb:iterator(Ref, FoldOpts), + init_graveyard(eleveldb:iterator_move(Itr, FirstKey), Itr, State). + +init_graveyard({error, _}, _, State) -> + %% no need to close the iterator + State; +init_graveyard({ok, SKey, Val}, Itr, State) -> + case sext:decode(SKey) of + {'$graveyard', Key} -> + TS = binary_to_term(Val), + %% We let the cleanup_graveyard check decide what to do + %% with tombstones older than graceseconds + ets:insert(State#state.refs, {sext:encode(Key), TS}), + init_graveyard(eleveldb:iterator_move(Itr, prefetch), Itr, State); + _ -> + eleveldb:iterator_close(Itr), + State + end. + + + +cleanup_graveyard(#state{grace_period=undefined} = State) -> State; +cleanup_graveyard(#state{grace_period=GS, refs=Refs} = State) -> + MatchHead = {'$1', '$2'}, + Guard = {'<', '$2', epoch() - GS}, + Result = '$1', + MatchFunction = {MatchHead, [Guard], [Result]}, + MatchSpec = [MatchFunction], + cleanup_graveyard(ets:select(Refs, MatchSpec, 100), [], State). + +cleanup_graveyard({[SKey|Rest], Cont}, Batch, State) -> + ets:delete(State#state.refs, SKey), + Key = sext:decode(SKey), + plumtree_metadata_hashtree:delete(Key), + cleanup_graveyard({Rest, Cont}, + [{delete, SKey}, + {delete, graveyard_key(Key)}|Batch], State); +cleanup_graveyard({[], Cont}, Batch, #state{ref=Ref, write_opts=WriteOpts} = State) -> + eleveldb:write(Ref, Batch, WriteOpts), + cleanup_graveyard(ets:select(Cont), [], State); +cleanup_graveyard('$end_of_table', [], State) -> State; +cleanup_graveyard('$end_of_table', Batch, #state{ref=Ref, write_opts=WriteOpts} = State) -> + eleveldb:write(Ref, Batch, WriteOpts), + State. epoch() -> {MegaSecs, Secs, _} = os:timestamp(), diff --git a/src/plumtree_metadata_leveldb_iterator.erl b/src/plumtree_metadata_leveldb_iterator.erl index 755cf1b..a37abb9 100644 --- a/src/plumtree_metadata_leveldb_iterator.erl +++ b/src/plumtree_metadata_leveldb_iterator.erl @@ -197,22 +197,26 @@ iterate(OkVal, Instances, FullPrefix, KeyMatch, KeysOnly) when element(1, OkVal) %% OkVal has the form of %% {ok, Key} or {ok, Key, Val} BKey = element(2, OkVal), - PrefixedKey = sext:decode(BKey), - case prefix_match(PrefixedKey, FullPrefix) of - {true, Key} -> - case key_match(Key, KeyMatch) of - true when KeysOnly -> - {{PrefixedKey, self()}, Instances}; - true -> - BVal = element(3, OkVal), - {{{PrefixedKey, binary_to_term(BVal)}, self()}, Instances}; + case sext:decode(BKey) of + {'$graveyard', _} -> + iterate(Instances, FullPrefix, KeyMatch, KeysOnly); + PrefixedKey -> + case prefix_match(PrefixedKey, FullPrefix) of + {true, Key} -> + case key_match(Key, KeyMatch) of + true when KeysOnly -> + {{PrefixedKey, self()}, Instances}; + true -> + BVal = element(3, OkVal), + {{{PrefixedKey, binary_to_term(BVal)}, self()}, Instances}; + false -> + iterate(Instances, FullPrefix, KeyMatch, KeysOnly) + end; false -> - iterate(Instances, FullPrefix, KeyMatch, KeysOnly) - end; - false -> - [{Itr, Instance}|RestInstances] = Instances, - ok = plumtree_metadata_leveldb_instance:iterator_close(Instance, Itr), - iterate(RestInstances, FullPrefix, KeyMatch, KeysOnly) + [{Itr, Instance}|RestInstances] = Instances, + ok = plumtree_metadata_leveldb_instance:iterator_close(Instance, Itr), + iterate(RestInstances, FullPrefix, KeyMatch, KeysOnly) + end end. prefix_match({_, Key}, {undefined, undefined}) -> {true, Key}; diff --git a/src/plumtree_metadata_manager.erl b/src/plumtree_metadata_manager.erl index 949d902..849336e 100644 --- a/src/plumtree_metadata_manager.erl +++ b/src/plumtree_metadata_manager.erl @@ -313,7 +313,7 @@ init([]) -> {stop, term(), term(), #state{}} | {stop, term(), #state{}}. handle_call({merge, PKey, Obj}, _From, State) -> - Result = read_merge_write(PKey, Obj), + Result = read_merge_action(PKey, Obj), {reply, Result, State}; handle_call({get, PKey}, _From, State) -> Result = read(PKey), @@ -444,59 +444,52 @@ finish_iterator(It) -> iterator_match(undefined) -> undefined; iterator_match(KeyMatch) when is_function(KeyMatch) -> KeyMatch. +read_modify_write(PKey, Context, '$deleted' = ValueOrFun) -> + Existing = read(PKey), + Modified = plumtree_metadata_object:modify(Existing, Context, ValueOrFun, node()), + delete(PKey, Existing, Modified), + Modified; read_modify_write(PKey, Context, ValueOrFun) -> Existing = read(PKey), Modified = plumtree_metadata_object:modify(Existing, Context, ValueOrFun, node()), - store(PKey, Modified). + store(PKey, Existing, Modified), + Modified. -read_merge_write(PKey, Obj) -> +read_merge_action(PKey, Obj) -> Existing = read(PKey), - IsDeleted = - case {Obj, Existing} of - {undefined, undefined} -> - true; - {undefined, O} -> - plumtree_metadata_object:values(O) == ['$deleted']; - {O, undefined} -> - plumtree_metadata_object:values(O) == ['$deleted']; - _ -> - false - end, - case IsDeleted of - true -> - false; + case plumtree_metadata_object:reconcile(Obj, Existing) of false -> - case plumtree_metadata_object:reconcile(Obj, Existing) of - false -> - false; - {true, Reconciled} -> - store(PKey, Reconciled), - true + false; + {true, Reconciled} -> + case plumtree_metadata_object:values(Obj) of + ['$deleted'|_] -> + delete(PKey, Existing, Reconciled); + _ -> + store(PKey, Existing, Reconciled) end end. -store({FullPrefix, Key}=PKey, Metadata) -> - Hash = plumtree_metadata_object:hash(Metadata), +store({FullPrefix, Key}=PKey, OldMeta, Metadata) -> OldObj = - case read(PKey) of - undefined -> - undefined; - OldMeta -> + case OldMeta of + undefined -> undefined; + _ -> [Val|_] = plumtree_metadata_object:values(OldMeta), Val end, - Event = - case plumtree_metadata_object:values(Metadata) of - ['$deleted'|_] -> - {deleted, FullPrefix, Key, OldObj}; - [NewObj|_] -> - {updated, FullPrefix, Key, OldObj, NewObj} - end, - plumtree_metadata_hashtree:insert(PKey, Hash), - plumtree_metadata_leveldb_instance:put(PKey, Metadata), + [NewObj|_] = plumtree_metadata_object:values(Metadata), + Event = {updated, FullPrefix, Key, OldObj, NewObj}, + Accepted = plumtree_metadata_leveldb_instance:put(PKey, Metadata), + trigger_subscription_event(FullPrefix, Event, + ets:lookup(?SUBS, FullPrefix)), + Accepted. + +delete({FullPrefix, Key}=PKey, OldObj, Metadata) -> + Event = {deleted, FullPrefix, Key, OldObj}, + Accepted = plumtree_metadata_leveldb_instance:delete(PKey, Metadata), trigger_subscription_event(FullPrefix, Event, ets:lookup(?SUBS, FullPrefix)), - Metadata. + Accepted. trigger_subscription_event(FullPrefix, Event, [{FullPrefix, {Pid, _}}|Rest]) -> Pid ! Event, From ae50d12bda7528cdf85c47a42caff6af50d2ecf0 Mon Sep 17 00:00:00 2001 From: Andre Graf Date: Tue, 12 Jul 2016 13:43:16 +0200 Subject: [PATCH 11/11] added gc_interval config option --- src/plumtree_metadata_leveldb_instance.erl | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/plumtree_metadata_leveldb_instance.erl b/src/plumtree_metadata_leveldb_instance.erl index 6f6a6a6..7e7c24e 100644 --- a/src/plumtree_metadata_leveldb_instance.erl +++ b/src/plumtree_metadata_leveldb_instance.erl @@ -53,6 +53,7 @@ fold_opts = [{fill_cache, false}], open_iterators = [], grace_period, + gc_interval, refs=ets:new(?MODULE, []) }). @@ -135,16 +136,24 @@ init([InstanceId, Opts]) -> DataDir2 = filename:join(DataDir1, integer_to_list(InstanceId)), GracePeriod = app_helper:get_prop_or_env(gc_grace_seconds, Opts, plumtree, - 60), - CleanupInterval = app_helper:get_prop_or_env(gc_interval, Opts, plumtree, - 10000), + undefined), + CleanupInterval = + case GracePeriod of + undefined -> + undefined; + _ -> + I = app_helper:get_prop_or_env(gc_interval, Opts, plumtree, + 10000), + erlang:send_after(I, self(), cleanup) + end, + %% Initialize state S0 = init_state(DataDir2, Opts), process_flag(trap_exit, true), case open_db(S0) of {ok, State} -> - erlang:send_after(CleanupInterval, self(), cleanup), - {ok, init_graveyard(State#state{grace_period=GracePeriod})}; + {ok, init_graveyard(State#state{grace_period=GracePeriod, + gc_interval=CleanupInterval})}; {error, Reason} -> {stop, Reason} end. @@ -243,7 +252,7 @@ handle_info({'DOWN', MRef, process, _, _}, #state{open_iterators=OpenIterators} end, {noreply, State#state{open_iterators=lists:keydelete(MRef, 1, OpenIterators)}}; handle_info(cleanup, State) -> - erlang:send_after(10000, self(), cleanup), + erlang:send_after(State#state.gc_interval, self(), cleanup), {noreply, cleanup_graveyard(State)}; handle_info(_Info, State) -> {noreply, State}.