Skip to content

Commit cd98647

Browse files
committed
Better sharding and conflict managing
- Separate keys index from storage, make it global - Shard by payload hmac - Handle conflicting PUT with ETags
1 parent ebf5ec5 commit cd98647

File tree

6 files changed

+122
-132
lines changed

6 files changed

+122
-132
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Storage is gen_server owning a private ets `set` table. Entries stored indexed b
2323

2424
#### Placement
2525

26-
Sharding is done with consistent hashing, where each Erlang node holds a shard range. The placement is decided by the first byte of the key's hmac, so there can be at most 256 shards. Ranges are defined explicitly by node names provided in the format `{name}-{range beginning inclusive}-{range ending inclusive}@hostname`. If the beginning of the range is larger than the ending, the node will hold two ranges flipped over 0.
26+
Sharding is done with consistent hashing, where each Erlang node holds a shard range. The placement is decided by the first byte of the value's hmac, so there can be at most 256 shards. Ranges are defined explicitly by node names provided in the format `{name}-{range beginning inclusive}-{range ending inclusive}@hostname`. If the beginning of the range is larger than the ending, the node will hold two ranges flipped over 0.
2727

2828
#### Configuration examples
2929

src/seppen.erl

Lines changed: 50 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -7,60 +7,73 @@
77
-type value() :: binary().
88

99
%% public API
10-
-export([list/0, get/1, get_uid/1, member/1, set/2, delete/1]).
10+
-export([set/2, get/1, hmac/1, member/1, delete/1, list/0]).
1111

1212
%% application callbacks
1313
-export([start/2, stop/1]).
1414

1515
%% supervisor callbacks
1616
-export([start_link/0, init/1]).
1717

18+
-define(INDEX, seppen_index).
1819
-define(STORE, seppen_store).
1920

2021

2122
%% Public API
2223

23-
-spec list() -> [key()].
24-
list() ->
25-
Nodes = seppen_dispatch:all_shards(),
26-
{Replies, _} = gen_server:multi_call(Nodes, ?STORE, list),
27-
lists:merge([R || {_Node, R} <- Replies]).
24+
-spec set(key(), value()) -> ok | {error, term()}.
25+
set(Key, Value) ->
26+
case gen_server:call(?INDEX, {get, Key}) of
27+
{ok, OldVHmac} ->
28+
%% FIXME! check that no other keys point to this before deleting
29+
OldNodes = seppen_dispatch:shards(OldVHmac),
30+
abcast = gen_server:abcast(OldNodes, ?STORE, {delete, OldVHmac});
31+
{error, not_found} ->
32+
ok
33+
end,
34+
VHmac = seppen_hash:hmac(Value),
35+
Nodes = seppen_dispatch:shards(VHmac),
36+
AllNodes = seppen_dispatch:all_shards(),
37+
abcast = gen_server:abcast(Nodes, ?STORE, {set, VHmac, Value}),
38+
abcast = gen_server:abcast(AllNodes, ?INDEX, {set, Key, VHmac}),
39+
ok.
2840

2941
-spec get(key()) -> {ok, value()} | {error, term()}.
3042
get(Key) ->
31-
Nodes = seppen_dispatch:shards(Key),
32-
Reply = gen_server:multi_call(Nodes, ?STORE, {get, Key}),
33-
multi_reply(Reply).
34-
35-
-spec get_uid(key()) -> {ok, binary()} | {error, term()}.
36-
get_uid(Key) ->
37-
Nodes = seppen_dispatch:shards(Key),
38-
Reply = gen_server:multi_call(Nodes, ?STORE, {get_uid, Key}),
39-
multi_reply(Reply).
43+
case gen_server:call(?INDEX, {get, Key}) of
44+
{ok, VHmac} ->
45+
Nodes = seppen_dispatch:shards(VHmac),
46+
{Replies, []} = gen_server:multi_call(Nodes, ?STORE, {get, VHmac}),
47+
{_, Reply} = hd(Replies),
48+
Reply;
49+
Error ->
50+
Error
51+
end.
52+
53+
-spec hmac(key()) -> {ok, binary()} | {error, term()}.
54+
hmac(Key) ->
55+
gen_server:call(?INDEX, {get, Key}).
4056

4157
-spec member(key()) -> boolean().
4258
member(Key) ->
43-
Nodes = seppen_dispatch:shards(Key),
44-
Reply = gen_server:multi_call(Nodes, ?STORE, {member, Key}),
45-
multi_reply(Reply).
46-
47-
-spec set(key(), value()) -> ok | {error, term()}.
48-
set(Key, Value) ->
49-
Nodes = seppen_dispatch:shards(Key),
50-
Reply = gen_server:multi_call(Nodes, ?STORE, {set, Key, Value}),
51-
multi_reply(Reply).
59+
gen_server:call(?INDEX, {member, Key}).
5260

5361
-spec delete(key()) -> ok | {error, term()}.
5462
delete(Key) ->
55-
Nodes = seppen_dispatch:shards(Key),
56-
Reply = gen_server:multi_call(Nodes, ?STORE, {delete, Key}),
57-
multi_reply(Reply).
63+
case gen_server:call(?INDEX, {get, Key}) of
64+
{ok, OldVHmac} ->
65+
OldNodes = seppen_dispatch:shards(OldVHmac),
66+
abcast = gen_server:abcast(OldNodes, ?STORE, {delete, OldVHmac}),
67+
AllNodes = seppen_dispatch:all_shards(),
68+
abcast = gen_server:abcast(AllNodes, ?INDEX, {delete, Key}),
69+
ok;
70+
Error ->
71+
Error
72+
end.
5873

59-
%% private
60-
61-
multi_reply({Replies, _BadNodes}) ->
62-
[Reply] = sets:to_list(sets:from_list([R || {_Node, R} <- Replies])),
63-
Reply.
74+
-spec list() -> [key()].
75+
list() ->
76+
gen_server:call(?INDEX, list).
6477

6578

6679
%% application callbacks
@@ -81,7 +94,11 @@ init([]) ->
8194
Children = [
8295
#{
8396
id => seppen_store,
84-
start => {seppen_store, start_link, []}
97+
start => {seppen_store, start_link, [?STORE]}
98+
},
99+
#{
100+
id => seppen_index,
101+
start => {seppen_store, start_link, [?INDEX]}
85102
},
86103
#{
87104
id => seppen_dispatch,

src/seppen_rest.erl

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,13 @@ generate_etag(#{path := <<"/_keys">>} = Req, Ctx) ->
4545
{undefined, Req, Ctx};
4646
generate_etag(Req, Ctx) ->
4747
Key = cowboy_req:binding(key, Req),
48-
{ok, UID} = seppen:get_uid(Key),
49-
ETag = iolist_to_binary([$", seppen_hash:to_hex(UID), $"]),
48+
{ok, Hmac} = seppen:hmac(Key),
49+
ETag = iolist_to_binary([$", seppen_hash:to_hex(Hmac), $"]),
5050
{ETag, Req, Ctx}.
5151

5252
expires(Req, Ctx) ->
5353
{undefined, Req, Ctx}.
5454

55-
5655
get_resource(#{path := <<"/_keys">>} = Req, Ctx) ->
5756
Body = jiffy:encode(seppen:list()),
5857
{Body, Req, Ctx};
@@ -64,16 +63,8 @@ get_resource(Req, Ctx) ->
6463
set_resource(Req0, Ctx) ->
6564
Key = cowboy_req:binding(key, Req0),
6665
{ok, Value, Req1} = cowboy_req:read_body(Req0),
67-
Req = case seppen:set(Key, Value) of
68-
ok ->
69-
Req1;
70-
{error, conflict} ->
71-
%% FIXME! ok, the correct way to handle this is
72-
%% 1. get UID for payload and check if it's in storage in resource_exists
73-
%% 2. add method is_conflict, that always returns true
74-
cowboy_req:reply(409, Req1)
75-
end,
76-
{true, Req, Ctx}.
66+
ok = seppen:set(Key, Value),
67+
{true, Req1, Ctx}.
7768

7869
delete_resource(Req, Ctx) ->
7970
Key = cowboy_req:binding(key, Req),

src/seppen_store.erl

Lines changed: 21 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
-behaviour(gen_server).
44

5-
-export([start_link/0]).
5+
-export([start_link/1]).
66

77
-export([
88
init/1,
@@ -13,64 +13,36 @@
1313
-record(kv, {key, value}).
1414

1515

16-
start_link() ->
17-
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
16+
start_link(Name) ->
17+
gen_server:start_link({local, Name}, ?MODULE, [], []).
1818

1919

2020
init([]) ->
21-
MasterKey = erlang:atom_to_binary(erlang:get_cookie(), unicode),
2221
Tid = ets:new(?MODULE, [set, private, {keypos, #kv.key}]),
23-
Idx = ets:new(?MODULE, [set, private, {keypos, #kv.key}]),
24-
{ok, #{tid => Tid, idx => Idx, mkey => MasterKey}}.
22+
{ok, #{tid => Tid}}.
2523

26-
handle_call({set, Key, Value}, _, Ctx) ->
27-
#{
28-
tid := Tid,
29-
idx := Idx,
30-
mkey := MKey
31-
} = Ctx,
32-
UID = seppen_hash:hmac(MKey, Value),
33-
Reply = case ets:member(Tid, UID) of
34-
true ->
35-
{error, conflict};
36-
false ->
37-
%% FIXME! actually delete OldUID first
38-
true = ets:insert(Idx, #kv{key = Key, value = UID}),
39-
true = ets:insert(Tid, #kv{key = UID, value = Value}),
40-
ok
41-
end,
42-
{reply, Reply, Ctx};
43-
handle_call({delete, Key}, _, #{tid := Tid, idx := Idx} = Ctx) ->
44-
[#kv{value = UID}] = ets:lookup(Idx, Key),
45-
true = ets:delete(Idx, Key),
46-
true = ets:delete(Tid, UID),
47-
{reply, ok, Ctx};
48-
handle_call({member, Key}, _, #{idx := Idx} = Ctx) ->
49-
IsMemeber = ets:member(Idx, Key),
50-
{reply, IsMemeber, Ctx};
51-
handle_call({get, Key}, _, #{tid := Tid, idx := Idx} = Ctx) ->
52-
Reply = case ets:lookup(Idx, Key) of
53-
[#kv{value = UID}] ->
54-
[#kv{value = Value}] = ets:lookup(Tid, UID),
55-
{ok, Value};
56-
[] ->
57-
{error, not_found}
58-
end,
59-
{reply, Reply, Ctx};
60-
handle_call({get_uid, Key}, _, #{idx := Idx} = Ctx) ->
61-
Reply = case ets:lookup(Idx, Key) of
62-
[#kv{value = UID}] ->
63-
{ok, UID};
24+
handle_call({get, Key}, _, #{tid := Tid} = Ctx) ->
25+
case ets:lookup(Tid, Key) of
26+
[#kv{value = Value}] ->
27+
{reply, {ok, Value}, Ctx};
6428
[] ->
65-
{error, not_found}
66-
end,
67-
{reply, Reply, Ctx};
68-
handle_call(list, _, #{idx := Idx} = Ctx) ->
29+
{reply, {error, not_found}, Ctx}
30+
end;
31+
handle_call({member, Key}, _, #{tid := Tid} = Ctx) ->
32+
IsMemeber = ets:member(Tid, Key),
33+
{reply, IsMemeber, Ctx};
34+
handle_call(list, _, #{tid := Tid} = Ctx) ->
6935
Head = #kv{key = '$1', _ = '_'},
70-
Keys = ets:select(Idx, [{Head, [], ['$1']}]),
36+
Keys = ets:select(Tid, [{Head, [], ['$1']}]),
7137
{reply, Keys, Ctx};
7238
handle_call(_, _, Ctx) ->
7339
{stop, unknown_call, Ctx}.
7440

41+
handle_cast({set, Key, Value}, #{tid := Tid} = Ctx) ->
42+
true = ets:insert(Tid, #kv{key = Key, value = Value}),
43+
{noreply, Ctx};
44+
handle_cast({delete, Key}, #{tid := Tid} = Ctx) ->
45+
true = ets:delete(Tid, Key),
46+
{noreply, Ctx};
7547
handle_cast(_, Ctx) ->
7648
{stop, unknown_cast, Ctx}.

0 commit comments

Comments
 (0)