Commit
Merge pull request #173 from eraft-io/feature_20230610_vector
Feature 20230610 vector
LLiuJJ authored Jun 25, 2023
2 parents 7ecaca6 + 79ad1ba commit b3eb1ac
Showing 21 changed files with 2,923 additions and 111 deletions.
5 changes: 5 additions & 0 deletions CMakeLists.txt
@@ -164,6 +164,10 @@ list(APPEND eraftvdb_sources src/unbounded_buffer.cc)
list(APPEND eraftvdb_sources src/unbounded_buffer.h)
list(APPEND eraftvdb_sources src/eraftkv.grpc.pb.cc)
list(APPEND eraftvdb_sources src/eraftkv.pb.cc)
list(APPEND eraftvdb_sources src/info_command_handler.cc)
list(APPEND eraftvdb_sources src/set_command_handler.cc)
list(APPEND eraftvdb_sources src/get_command_handler.cc)
list(APPEND eraftvdb_sources src/unknow_command_handler.cc)

add_executable(eraft-vdb ${eraftvdb_sources})
target_link_libraries(eraft-vdb
@@ -278,6 +282,7 @@ target_link_libraries(grpc_network_impl_test PUBLIC
${Protobuf_LIBRARY}
)


# eraft-ctl
add_executable(eraftkv-ctl
src/eraftkv_ctl.cc
16 changes: 10 additions & 6 deletions README.md
@@ -1,8 +1,8 @@
# ERaftVDB
# ERaftKDB

ERaftVDB is a distributed vector database that supports the RESP protocol for vector operations, and uses ERaftKV as the distributed storage layer.
ERaftKDB is a distributed database that supports the Redis RESP protocol, and uses ERaftKV as the distributed storage layer.

![eraft-vdb](doc/eraft-vdb.png)
![eraft-kdb](doc/eraft-kdb.jpg)

## ERaftKV

@@ -63,14 +63,18 @@ command output
chmod +x utils/run-vdb-tests.sh
docker run --name vdbserver-node-tests --network mytestnetwork --ip 172.18.0.8 -it --rm -v /home/colin/eraft:/eraft eraft/eraftkv:v0.0.6 /eraft/utils/run-vdb-tests.sh
+ redis-cli -h 172.18.0.6 -p 12306 info
server_id: 0,server_address: 172.18.0.2:8088,status: Running,Role: Leader
server_id: 1,server_address: 172.18.0.3:8089,status: Running,Role: Follower
server_id: 0,server_address: 172.18.0.2:8088,status: Running,Role: Follower
server_id: 1,server_address: 172.18.0.3:8089,status: Running,Role: Leader
server_id: 2,server_address: 172.18.0.4:8090,status: Running,Role: Follower
+ redis-cli -h 172.18.0.6 -p 12306 set a testvalue
OK
+ redis-cli -h 172.18.0.6 -p 12306 get a
"testvalue"
```

- step 4, clean all
```
sudo make run stop-demo
sudo make stop-demo
sudo make rm-net
```

Binary file added doc/eraft-kdb.jpg
4 changes: 2 additions & 2 deletions doc/eraft-vdb.md
@@ -18,10 +18,10 @@ Vec stores a single vector, include vector id, vector data and additional commen
- vid:
A unique identifier of your vector.

- vdata:
- vdata (vextra):
Vectorized representation of information in the physical world such as sound, video, and pictures.

- vextra:
- vlabel:
Descriptive information about the vector data, stored as a JSON structure.

### Storage Cluster
53 changes: 53 additions & 0 deletions doc/vdb-server.md
@@ -0,0 +1,53 @@
## vdb-server RESP protocol for vector design

### 1.Create a Vecset

- Syntax
```
VECSET ADD vecset_name vecset_size
```
- Return
vecset id

### 2.Get a Vecset

- Syntax
```
VECSET GET vecset_name
```
- Return

Bulk string reply:

```
vecset_size
vec_count
vecdata_disk_size
vecdata_mem_size
c_time
```

### 3.Add Vecs to Vecset

- Syntax
```
VEC ADD vecset_name vector_data vector_label [vector_data vector_label ...]
```

- Return
The status of the add operation.

### 4.Search from a vecset

- Syntax
```
VEC SEARCH vecset_name vector_data DESC count
```
- Return

The search results: vectors similar to vector_data.

```
vec_id score vec_label [vec_id score vec_label ...]
```
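
Because the design builds on RESP, a client such as redis-cli would send each of the commands above as a RESP array of bulk strings. The following is a minimal sketch of that framing only. `EncodeRespCommand` is a hypothetical helper name that does not appear in the eraft source, and the vecset name and size in the usage note are made-up values.

```cpp
#include <string>
#include <vector>

// Hypothetical helper (not from the eraft source): frames a command as a
// RESP array of bulk strings, i.e. "*<argc>\r\n" followed by
// "$<len>\r\n<arg>\r\n" for every argument.
std::string EncodeRespCommand(const std::vector<std::string>& args) {
  std::string out = "*" + std::to_string(args.size()) + "\r\n";
  for (const auto& arg : args) {
    out += "$" + std::to_string(arg.size()) + "\r\n";
    out += arg;
    out += "\r\n";
  }
  return out;
}
```

For example, `EncodeRespCommand({"VECSET", "ADD", "myset", "1024"})` produces `*4\r\n$6\r\nVECSET\r\n$3\r\nADD\r\n$5\r\nmyset\r\n$4\r\n1024\r\n`. Lengths are byte counts, so the framing stays valid even if a label contains spaces or newlines.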
87 changes: 87 additions & 0 deletions doc/vec-storage-encode.md
@@ -0,0 +1,87 @@
## sys_catalog

key encode
```
-------------------------------------------------------------------
| 'S' | 'Y' | 'S' | '_' | 'C' | 'A' | 'T' | 'A' | 'L' | 'O' | 'G' |
-------------------------------------------------------------------
```
value encode
```
--------------------------------
| SysCatalog protobuf message |
--------------------------------
```

- SysCatalog message
```
{
uint64 vecset_count
uint64 max_vecset_id
uint64 mem_used
uint64 disk_used
map<string, uint64> vecset_name2id
}
```

## vecset

```
key encode
---------------------------------------------------
| 'V' | 'E' | 'C' | 'S' | 'E' | 'T' | ID(uint64) |
---------------------------------------------------
value encode
----------------------------
| Vecset protobuf message |
----------------------------
```

- Vecset protobuf message

```
{
uint64_t id
bytes name
uint64_t vec_count
uint64_t max_vec_id
uint64_t used_disk_capacity
uint64_t used_mem_capacity
uint64_t c_time
uint64_t m_time
}
```

## vec

```
key encode
----------------------------------------------------------------------------
| 'V' | 'E' | 'C' | 'T' | 'O' | 'R' | VECSET_ID(uint64) | VEC_ID(uint64) |
----------------------------------------------------------------------------
value encode
------------------------
| Vec protobuf message |
------------------------
```

- Vec protobuf message

```
{
uint64_t id
uint64_t dim
repeated double vdata
bytes vlabel
uint64_t c_time
uint64_t m_time
}
```
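
The vecset and vec key layouts above are a fixed ASCII prefix followed by one or two uint64 IDs. A minimal sketch of building such keys follows. The document does not state a byte order, so big-endian is an assumption made here (it makes keys sort by ID under bytewise comparison), and the function names are hypothetical rather than taken from the eraft source.

```cpp
#include <cstdint>
#include <string>

// Appends a uint64 as 8 bytes, most significant byte first.
// ASSUMPTION: big-endian; the design doc does not specify a byte order.
void AppendU64BigEndian(std::string* out, uint64_t v) {
  for (int shift = 56; shift >= 0; shift -= 8) {
    out->push_back(static_cast<char>((v >> shift) & 0xFF));
  }
}

// Hypothetical helper: 'VECSET' prefix followed by the vecset id.
std::string EncodeVecsetKey(uint64_t vecset_id) {
  std::string key = "VECSET";
  AppendU64BigEndian(&key, vecset_id);
  return key;
}

// Hypothetical helper: 'VECTOR' prefix, then the vecset id, then the vec id.
std::string EncodeVecKey(uint64_t vecset_id, uint64_t vec_id) {
  std::string key = "VECTOR";
  AppendU64BigEndian(&key, vecset_id);
  AppendU64BigEndian(&key, vec_id);
  return key;
}
```

Under this assumption every vec key of one vecset shares the same 14-byte prefix (`VECTOR` plus the vecset id), so a storage engine with ordered keys could list a vecset's vectors with a prefix scan.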
5 changes: 5 additions & 0 deletions doc/vector-similarity-search-in-eraftvdb.md
@@ -0,0 +1,5 @@

https://colab.research.google.com/drive/1OLdDYk-TzVNhpyL3pAONtkHnKGI-GUsU?usp=sharing#scrollTo=420dPmKH8sTi

https://medium.com/swlh/fine-grained-image-similarity-detection-using-facebook-ai-similarity-search-faiss-b357da4f1644

28 changes: 28 additions & 0 deletions protocol/eraftkv.proto
@@ -161,6 +161,34 @@ message ClientOperationResp {
repeated KvOpPair ops = 2;
}

message SysCatalog {
uint64 vecset_count = 1;
uint64 max_vecset_id = 2;
uint64 mem_used = 3;
uint64 disk_used = 4;
map<string, uint64> vecset_name2id = 5;
}

message Vecset {
uint64 id = 1;
string name = 2;
uint64 vec_count = 3;
uint64 max_vec_id = 4;
uint64 used_disk_capacity = 5;
uint64 used_mem_capacity = 6;
uint64 c_time = 7;
uint64 m_time = 8;
}

message Vec {
uint64 id = 1;
uint64 dim = 2;
repeated double vdata = 3;
string vlabel = 4;
uint64 c_time = 5;
uint64 m_time = 6;
}

service ERaftKv {
rpc RequestVote(RequestVoteReq) returns (RequestVoteResp);
rpc AppendEntries(AppendEntriesReq) returns (AppendEntriesResp);
65 changes: 27 additions & 38 deletions src/client.cc
@@ -26,50 +26,22 @@ PacketLength Client::_HandlePacket(const char *start, std::size_t bytes) {
const char *ptr = start;
parser_.ParseRequest(ptr, end);

CommandHandler *handler = nullptr;
if (parser_.GetParams()[0] == "info") {
ClientContext context;
eraftkv::ClusterConfigChangeReq req;
req.set_change_type(eraftkv::ClusterConfigChangeType::Query);
eraftkv::ClusterConfigChangeResp resp;

auto status =
stubs_.begin()->second->ClusterConfigChange(&context, req, &resp);
std::string info_str;
for (int i = 0; i < resp.shard_group(0).servers_size(); i++) {
info_str += "server_id: ";
info_str += std::to_string(resp.shard_group(0).servers(i).id());
info_str += ",server_address: ";
info_str += resp.shard_group(0).servers(i).address();
resp.shard_group(0).servers(i).server_status() ==
eraftkv::ServerStatus::Up
? info_str += ",status: Running"
: info_str += ",status: Down";
resp.shard_group(0).leader_id() == resp.shard_group(0).servers(i).id()
? info_str += ",Role: Leader"
: info_str += ",Role: Follower";
info_str += "\r\n";
}
std::string reply_buf;
reply_buf += "$";
reply_buf += std::to_string(info_str.size());
reply_buf += "\r\n";
reply_buf += info_str;
reply_buf += "\r\n";
reply_.PushData(reply_buf.c_str(), reply_buf.size());
SendPacket(reply_);

_Reset();
return static_cast<PacketLength>(bytes);
handler = new InfoCommandHandler();
} else if (parser_.GetParams()[0] == "set") {
handler = new SetCommandHandler();
} else if (parser_.GetParams()[0] == "get") {
handler = new GetCommandHandler();
} else {
handler = new UnKnowCommandHandler();
}

reply_.PushData("+OK\r\n", 5);
SendPacket(reply_);

_Reset();
handler->Execute(parser_.GetParams(), this);
return static_cast<PacketLength>(bytes);
}

Client::Client(std::string kv_addrs) {
Client::Client(std::string kv_addrs) : leader_addr_("") {
// init stub to kv server node
auto kv_node_addrs = StringUtil::Split(kv_addrs, ',');
for (auto kv_node_addr : kv_node_addrs) {
@@ -87,3 +59,20 @@ void Client::_Reset() {
}

void Client::OnConnect() {}

std::string Client::GetLeaderAddr() {
ClientContext context;
eraftkv::ClusterConfigChangeReq req;
req.set_change_type(eraftkv::ClusterConfigChangeType::Query);
eraftkv::ClusterConfigChangeResp resp;
auto status =
this->stubs_.begin()->second->ClusterConfigChange(&context, req, &resp);
std::string leader_addr = "";
for (int i = 0; i < resp.shard_group(0).servers_size(); i++) {
if (resp.shard_group(0).leader_id() ==
resp.shard_group(0).servers(i).id()) {
leader_addr = resp.shard_group(0).servers(i).address();
}
}
return leader_addr;
}
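
The new dispatch in `_HandlePacket` above allocates a handler with `new` for every packet, and this diff shows no matching `delete`. It also indexes `GetParams()[0]` without checking for an empty parameter list. One possible way to address both, sketched under assumptions: return the handler in a `std::unique_ptr` from a small factory. The classes below are simplified stand-ins for the ones named in the diff (the real `Execute` also receives a `Client*`, and its return type is not visible here), and `MakeHandler` is a hypothetical name.

```cpp
#include <memory>
#include <string>
#include <vector>

// Simplified stand-in for the CommandHandler base class. Execute() returns
// a tag string only so the sketch can show which handler ran.
class CommandHandler {
 public:
  virtual ~CommandHandler() = default;
  virtual std::string Execute(const std::vector<std::string>& params) = 0;
};

class InfoCommandHandler : public CommandHandler {
 public:
  std::string Execute(const std::vector<std::string>&) override { return "info"; }
};

class SetCommandHandler : public CommandHandler {
 public:
  std::string Execute(const std::vector<std::string>&) override { return "set"; }
};

class GetCommandHandler : public CommandHandler {
 public:
  std::string Execute(const std::vector<std::string>&) override { return "get"; }
};

class UnKnowCommandHandler : public CommandHandler {
 public:
  std::string Execute(const std::vector<std::string>&) override { return "unknown"; }
};

// Hypothetical factory: picks a handler by command name. The unique_ptr
// frees the handler when it goes out of scope in the caller.
std::unique_ptr<CommandHandler> MakeHandler(const std::vector<std::string>& params) {
  if (params.empty()) {
    return std::unique_ptr<CommandHandler>(new UnKnowCommandHandler());
  }
  const std::string& name = params[0];
  if (name == "info") return std::unique_ptr<CommandHandler>(new InfoCommandHandler());
  if (name == "set") return std::unique_ptr<CommandHandler>(new SetCommandHandler());
  if (name == "get") return std::unique_ptr<CommandHandler>(new GetCommandHandler());
  return std::unique_ptr<CommandHandler>(new UnKnowCommandHandler());
}
```

A caller would write something like `auto handler = MakeHandler(params); handler->Execute(params);` and let the handler be destroyed at the end of the scope. As in the diff, command names are matched case-sensitively.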
11 changes: 10 additions & 1 deletion src/client.h
@@ -14,6 +14,7 @@

#include <grpcpp/grpcpp.h>

#include "command_handler.h"
#include "eraftkv.grpc.pb.h"
#include "eraftkv.pb.h"
#include "proto_parser.h"
@@ -25,8 +26,12 @@ using grpc::Channel;
using grpc::ClientContext;
using grpc::Status;


class Client : public StreamSocket {
friend class InfoCommandHandler;
friend class SetCommandHandler;
friend class GetCommandHandler;
friend class UnKnowCommandHandler;

private:
PacketLength _HandlePacket(const char *msg, std::size_t len) override;

@@ -36,9 +41,13 @@ class Client : public StreamSocket {

std::map<std::string, std::unique_ptr<ERaftKv::Stub> > stubs_;

std::string leader_addr_;

public:
Client(std::string kv_addrs);

std::string GetLeaderAddr();

void _Reset();

void OnConnect() override;