Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): Introduce EtcdMetadataStore #4520

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions .github/workflows/interactive.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ jobs:
git submodule update --init
cd ${GITHUB_WORKSPACE}/flex
mkdir build && cd build
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG && sudo make -j$(nproc)
cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG -DBUILD_ETCD_METASTORE=ON && sudo make -j$(nproc)
# package the build artifacts
cd .. && tar -zcf build.tar.gz build

Expand Down Expand Up @@ -186,6 +186,22 @@ jobs:
cd ${GITHUB_WORKSPACE}/flex/tests/hqps
bash hqps_admin_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./interactive_config_test.yaml ${GS_TEST_DIR}

# test admin service with etcd metastore
GOOGLE_URL=https://storage.googleapis.com/etcd
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GOOGLE_URL}
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test
/tmp/etcd-download-test/etcd &
sleep 3

cd ${GITHUB_WORKSPACE}/flex/tests/hqps
python3 -c 'import yaml;f=open("./interactive_config_test.yaml");y=yaml.safe_load(f);y["compute_engine"]["metadata_store"]["uri"] = "http://localhost:2379"; f.close();f=open("./interactive_config_test_etcd.yaml","w");yaml.dump(y,f);f.close()'
bash hqps_admin_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./interactive_config_test_etcd.yaml ${GS_TEST_DIR}
rm ./interactive_config_test_etcd.yaml


- name: Build and test Interactive Java/Python SDK
env:
FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph
Expand Down Expand Up @@ -483,7 +499,7 @@ jobs:
git clone --single-branch https://github.com/alibaba/libgrape-lite.git /tmp/libgrape-lite
cd /tmp/libgrape-lite
mkdir -p build && cd build
cmake ..
cmake .. -BUILD_ETCD_METASTORE=ON
make -j$(nproc)
make install

Expand Down Expand Up @@ -649,3 +665,16 @@ jobs:
BULK_LOAD_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/bulk_load.yaml
sed -i 's/|/\\t/g' ${BULK_LOAD_FILE}
GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/

- name: Test etcd-based metadata service
run: |
GOOGLE_URL=https://storage.googleapis.com/etcd
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GOOGLE_URL}
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test
/tmp/etcd-download-test/etcd &
sleep 3

GLOG_v=10 ./tests/hqps/etcd_meta_test http://localhost:2379/
6 changes: 6 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,9 @@
[submodule "flex/third_party/parallel-hashmap"]
path = flex/third_party/parallel-hashmap
url = https://github.com/greg7mdp/parallel-hashmap.git
[submodule "flex/third_party/etcd-cpp-apiv3"]
path = flex/third_party/etcd-cpp-apiv3
url = https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git
[submodule "flex/third_party/cpprestsdk"]
path = flex/third_party/cpprestsdk
url = https://github.com/microsoft/cpprestsdk.git
1 change: 1 addition & 0 deletions docs/flex/interactive/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ In this following table, we use the `.` notation to represent the hierarchy with
| verbose_level | 0 | The verbose level of database log, should be a int | 0.0.3 |
| compute_engine.thread_num_per_worker | 1 | The number of threads will be used to process the queries. Increase the number can benefit the query throughput | 0.0.1 |
| compute_engine.wal_uri | file://{GRAPH_DATA_DIR}/wal | The location where Interactive will store and access WALs. `GRAPH_DATA_DIR` is a placeholder that will be populated by Interactive. | 0.5 |
| compute_engine.metadata_store.uri | file://{WORKSPACE}/METADATA | The location where Interactive will store and access metadatas. `WORKSPACE` is a placeholder that will be populated by Interactive. | 0.5 |
| compiler.planner.is_on | true | Determines if query optimization is enabled for compiling Cypher queries | 0.0.1 |
| compiler.planner.opt | RBO | Specifies the optimizer to be used for query optimization. Currently, only the Rule-Based Optimizer (RBO) is supported | 0.0.1 |
| compiler.planner.rules.FilterMatchRule | N/A | An optimization rule that pushes filter (`Where`) conditions into the `Match` clause | 0.0.1 |
Expand Down
35 changes: 35 additions & 0 deletions docs/flex/interactive/development/dev_and_test.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,38 @@ In Interactive's execution engine, transactions such as `ReadTransaction`, `Upda
2. If a transaction returns `false` during the `commit()` process, the error occurred prior to applying the WAL to the graph data. This type of failure could arise during the construction of the WAL or during its writing phase.

3. It is important to note that errors can still occur when replaying the WAL to the graph database. Replaying might fail due to limitations in resources or due to unforeseen bugs. **However,** any errors encountered during this stage will be handled via exceptions or may result in process failure. Currently, there is no established mechanism to handle such failures. Future improvements should focus on implementing failover strategies, potentially allowing the GraphDB to continue replaying the WAL until it succeeds.


## Using ETCD as Metadata storage

```bash
ETCD_VER=v3.4.13

# choose either URL
GOOGLE_URL=https://storage.googleapis.com/etcd
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GOOGLE_URL}

rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test

curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz
tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1
rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz

cd /tmp/etcd-download-test
# You may find etcd and etcdctl in the directory
```

Start a local etcd server

```bash
cd /tmp/etcd-download-test
etcd
```

Now in another terminal test etcd metadata store.
```bash
cd GraphScope/flex/build
./tests/hqps/etcd_meta_test http://localhost:2379
```
19 changes: 19 additions & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ option(USE_PTHASH "Whether to use pthash" OFF)
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF
option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF
option(BUILD_ETCD_METASTORE "Whether to build with etcd metastore" OFF) # Whether to build with etcd metastore, default is OFF

#print options
message(STATUS "Build test: ${BUILD_TEST}")
Expand Down Expand Up @@ -226,6 +227,24 @@ macro(install_without_export_flex_target target)
)
endmacro()

if (BUILD_ETCD_METASTORE)
add_definitions(-DBUILD_ETCD_METASTORE)
# cd to third_party/cpprestsdk and run git submodule update --init
include("cmake/BuildEtcdCpp.cmake")
# set(WERROR OFF CACHE BOOL "Treat warnings as errors")
# set(CPPREST_EXCLUDE_WEBSOCKETS ON CACHE BOOL "Exclude websockets functionality..")
# add_subdirectory(third_party/cpprestsdk EXCLUDE_FROM_ALL)
# set(CPPREST_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cpprestsdk/Release/include)
# set(CPPREST_LIB cpprest)
# add_subdirectory(third_party/etcd-cpp-apiv3 EXCLUDE_FROM_ALL)
include_directories(SYSTEM ${CPPREST_INCLUDE_DIR})
# for each directory in ${ETCD_CPP_INCLUDE_DIR}
foreach(dir ${ETCD_CPP_INCLUDE_DIR})
message(STATUS "Include directory: ${dir}")
include_directories(SYSTEM ${dir})
endforeach()
endif()

add_subdirectory(utils)
add_subdirectory(codegen)
add_subdirectory(storages)
Expand Down
35 changes: 35 additions & 0 deletions flex/cmake/BuildEtcdCpp.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2020-2023 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This File is copied from https://github.com/v6d-io/v6d/blob/main/cmake/BuildEtcdCpp.cmake
# build cpprestsdk
set(WERROR OFF CACHE BOOL "Treat warnings as errors")
set(BUILD_TESTS OFF CACHE BOOL "Build tests.")
set(BUILD_SAMPLES OFF CACHE BOOL "Build sample applications.")
set(CPPREST_EXCLUDE_WEBSOCKETS ON CACHE BOOL "Exclude websockets functionality..")
add_subdirectory(third_party/cpprestsdk)
set(CPPREST_INCLUDE_DIR flex/third_party/cpprestsdk/Release/include)
set(CPPREST_LIB cpprest)

# disable a warning message inside cpprestsdk on Mac with llvm/clang
if(W_NO_UNUSED_BUT_SET_PARAMETER)
target_compile_options(cpprest PRIVATE -Wno-unused-but-set-parameter)
endif()

# build etcd-cpp-apiv3
add_subdirectory(third_party/etcd-cpp-apiv3)
set(ETCD_CPP_LIBRARIES etcd-cpp-api)
set(ETCD_CPP_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/third_party/etcd-cpp-apiv3/
${PROJECT_BINARY_DIR}/third_party/etcd-cpp-apiv3/proto/gen
${PROJECT_BINARY_DIR}/third_party/etcd-cpp-apiv3/proto/gen/proto)
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/wal/local_wal_parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ LocalWalParser::LocalWalParser(const std::string& wal_uri)
}

void LocalWalParser::open(const std::string& wal_uri) {
auto wal_dir = get_wal_uri_path(wal_uri);
auto wal_dir = get_uri_path(wal_uri);
if (!std::filesystem::exists(wal_dir)) {
std::filesystem::create_directory(wal_dir);
}
Expand Down
1 change: 1 addition & 0 deletions flex/engines/graph_db/database/wal/local_wal_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <vector>
#include "flex/engines/graph_db/database/wal/wal.h"
#include "flex/utils/service_utils.h"

namespace gs {

Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/wal/local_wal_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ std::unique_ptr<IWalWriter> LocalWalWriter::Make() {
}

void LocalWalWriter::open(const std::string& wal_uri, int thread_id) {
auto prefix = get_wal_uri_path(wal_uri);
auto prefix = get_uri_path(wal_uri);
if (!std::filesystem::exists(prefix)) {
std::filesystem::create_directories(prefix);
}
Expand Down
1 change: 1 addition & 0 deletions flex/engines/graph_db/database/wal/local_wal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <memory>
#include <unordered_map>
#include "flex/engines/graph_db/database/wal/wal.h"
#include "flex/utils/service_utils.h"

namespace gs {

Expand Down
5 changes: 3 additions & 2 deletions flex/engines/graph_db/database/wal/wal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <dlfcn.h>
#include <memory>
#include <utility>
#include "flex/utils/service_utils.h"

#include <boost/algorithm/string.hpp>

Expand Down Expand Up @@ -54,7 +55,7 @@ void WalWriterFactory::Finalize() {}
std::unique_ptr<IWalWriter> WalWriterFactory::CreateWalWriter(
const std::string& wal_uri) {
auto& known_writers_ = getKnownWalWriters();
auto scheme = get_wal_uri_scheme(wal_uri);
auto scheme = get_uri_scheme(wal_uri);
auto iter = known_writers_.find(scheme);
if (iter != known_writers_.end()) {
return iter->second();
Expand Down Expand Up @@ -89,7 +90,7 @@ void WalParserFactory::Finalize() {}
std::unique_ptr<IWalParser> WalParserFactory::CreateWalParser(
const std::string& wal_uri) {
auto& know_parsers_ = getKnownWalParsers();
auto scheme = get_wal_uri_scheme(wal_uri);
auto scheme = get_uri_scheme(wal_uri);
auto iter = know_parsers_.find(scheme);
if (iter != know_parsers_.end()) {
return iter->second(wal_uri);
Expand Down
3 changes: 0 additions & 3 deletions flex/engines/graph_db/database/wal/wal.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,6 @@ struct UpdateWalUnit {
size_t size{0};
};

std::string get_wal_uri_scheme(const std::string& uri);
std::string get_wal_uri_path(const std::string& uri);

/**
* The interface of wal writer.
*/
Expand Down
12 changes: 9 additions & 3 deletions flex/engines/http_server/graph_db_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ServiceConfig::ServiceConfig()
start_compiler(false),
enable_gremlin(false),
enable_bolt(false),
metadata_store_type_(gs::MetadataStoreType::kLocalFile),
metadata_store_uri(DEFAULT_METADATA_STORE_URI),
log_level(DEFAULT_LOG_LEVEL),
verbose_level(DEFAULT_VERBOSE_LEVEL),
sharding_mode(DEFAULT_SHARDING_MODE),
Expand Down Expand Up @@ -132,8 +132,14 @@ void GraphDBService::init(const ServiceConfig& config) {
service_config_ = config;
gs::init_cpu_usage_watch();
if (config.start_admin_service) {
metadata_store_ = gs::MetadataStoreFactory::Create(
config.metadata_store_type_, WorkDirManipulator::GetWorkspace());
// instantiate the placeholder {WOKRSPACE} in metadata store uri.
auto metadata_store_uri = config.metadata_store_uri;
if (metadata_store_uri.find("{WORKSPACE}") != std::string::npos) {
metadata_store_uri =
std::regex_replace(metadata_store_uri, std::regex("\\{WORKSPACE\\}"),
WorkDirManipulator::GetWorkspace());
}
metadata_store_ = gs::MetadataStoreFactory::Create(metadata_store_uri);

auto res = metadata_store_->Open();
if (!res.ok()) {
Expand Down
26 changes: 11 additions & 15 deletions flex/engines/http_server/graph_db_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ struct ServiceConfig {
1024 * 1024 * 1024; // 1GB
static constexpr const char* DEFAULT_WAL_URI =
"{GRAPH_DATA_DIR}/wal"; // By default we will use the wal directory in
// the graph data directory. The {GRAPH_DATA_DIR}
// is a placeholder, which will be replaced by
// the actual graph data directory.
// the graph data directory. The {GRAPH_DATA_DIR}
// is a placeholder, which will be replaced by
// the actual graph data directory.
static constexpr const char* DEFAULT_METADATA_STORE_URI =
"{WORKSPACE}/METADATA"; // By default we will use the local file system
// as

// Those has default value
uint32_t bolt_port;
Expand All @@ -71,7 +74,7 @@ struct ServiceConfig {
bool start_compiler;
bool enable_gremlin;
bool enable_bolt;
gs::MetadataStoreType metadata_store_type_;
std::string metadata_store_uri;
// verbose log level. should be a int
// could also be set from command line: GLOG_v={}.
// If we found GLOG_v in the environment, we will at the first place.
Expand Down Expand Up @@ -262,17 +265,10 @@ struct convert<server::ServiceConfig> {

auto metadata_store_node = engine_node["metadata_store"];
if (metadata_store_node) {
auto metadata_store_type = metadata_store_node["type"];
if (metadata_store_type) {
auto metadata_store_type_str = metadata_store_type.as<std::string>();
if (metadata_store_type_str == "file") {
service_config.metadata_store_type_ =
gs::MetadataStoreType::kLocalFile;
} else {
LOG(ERROR) << "Unsupported metadata store type: "
<< metadata_store_type_str;
return false;
}
auto metadata_store_uri = metadata_store_node["uri"];
if (metadata_store_uri) {
service_config.metadata_store_uri =
metadata_store_uri.as<std::string>();
}
}
if (engine_node["wal_uri"]) {
Expand Down
9 changes: 9 additions & 0 deletions flex/storages/metadata/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@

file(GLOB_RECURSE METADATA_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc")
# Exclude etcd_metadata_store.cc, if BUILD_ETCD_METASTORE is off
if (NOT BUILD_ETCD_METASTORE)
list(REMOVE_ITEM METADATA_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/etcd_metadata_store.cc")
endif()
message(STATUS "metadata source files: ${METADATA_SRC_FILES}")

add_library(flex_metadata_store SHARED ${METADATA_SRC_FILES})
target_link_libraries(flex_metadata_store flex_utils)
if (BUILD_ETCD_METASTORE)
target_link_libraries(flex_metadata_store ${ETCD_CPP_LIBRARIES})
# message(STATUS "etcd cpp include dir: ${ETCD_CPP_INCLUDE_DIR}")
endif()
install_flex_target(flex_metadata_store)


Expand Down
Loading
Loading