From 805c90e8bdb3d970cbdbc8aa6c72f6f0ed4eaff9 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl@alibaba-inc.com" Date: Tue, 25 Feb 2025 11:36:16 +0800 Subject: [PATCH] add etcd meta store impl Committed-by: xiaolei.zl@alibaba-inc.com from Dev container Committed-by: xiaolei.zl@alibaba-inc.com from Dev container code refactor Committed-by: xiaolei.zl@alibaba-inc.com from Dev container Committed-by: xiaolei.zl@alibaba-inc.com from Dev container Committed-by: xiaolei.zl from Dev container --- .github/workflows/interactive.yml | 33 ++- .gitmodules | 6 + docs/flex/interactive/configuration.md | 1 + .../interactive/development/dev_and_test.md | 35 +++ flex/CMakeLists.txt | 19 ++ flex/cmake/BuildEtcdCpp.cmake | 35 +++ .../graph_db/database/wal/local_wal_parser.cc | 2 +- .../graph_db/database/wal/local_wal_parser.h | 1 + .../graph_db/database/wal/local_wal_writer.cc | 2 +- .../graph_db/database/wal/local_wal_writer.h | 1 + flex/engines/graph_db/database/wal/wal.cc | 5 +- flex/engines/graph_db/database/wal/wal.h | 3 - flex/engines/http_server/graph_db_service.cc | 12 +- flex/engines/http_server/graph_db_service.h | 26 +- flex/storages/metadata/CMakeLists.txt | 9 + flex/storages/metadata/etcd_metadata_store.cc | 264 ++++++++++++++++++ flex/storages/metadata/etcd_metadata_store.h | 124 ++++++++ .../metadata/metadata_store_factory.cc | 20 +- .../metadata/metadata_store_factory.h | 10 +- flex/tests/hqps/CMakeLists.txt | 7 + flex/tests/hqps/etcd_meta_test.cc | 104 +++++++ flex/tests/hqps/interactive_config_test.yaml | 2 +- flex/third_party/cpprestsdk | 1 + flex/third_party/etcd-cpp-apiv3 | 1 + flex/utils/service_utils.cc | 25 ++ flex/utils/service_utils.h | 3 + proto/error/interactive.proto | 4 + 27 files changed, 716 insertions(+), 39 deletions(-) create mode 100644 flex/cmake/BuildEtcdCpp.cmake create mode 100644 flex/storages/metadata/etcd_metadata_store.cc create mode 100644 flex/storages/metadata/etcd_metadata_store.h create mode 100644 flex/tests/hqps/etcd_meta_test.cc create mode 160000 flex/third_party/cpprestsdk create mode 160000 flex/third_party/etcd-cpp-apiv3 diff --git a/.github/workflows/interactive.yml b/.github/workflows/interactive.yml index dcd0526ee0c7..406a24e737cc 100644 --- a/.github/workflows/interactive.yml +++ b/.github/workflows/interactive.yml @@ -55,7 +55,7 @@ jobs: git submodule update --init cd ${GITHUB_WORKSPACE}/flex mkdir build && cd build - cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG && sudo make -j$(nproc) + cmake .. -DCMAKE_INSTALL_PREFIX=/opt/graphscope -DCMAKE_BUILD_TYPE=DEBUG -DBUILD_ETCD_METASTORE=ON && sudo make -j$(nproc) # package the build artifacts cd .. && tar -zcf build.tar.gz build @@ -186,6 +186,22 @@ jobs: cd ${GITHUB_WORKSPACE}/flex/tests/hqps bash hqps_admin_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./interactive_config_test.yaml ${GS_TEST_DIR} + # test admin service with etcd metastore + GOOGLE_URL=https://storage.googleapis.com/etcd + GITHUB_URL=https://github.com/etcd-io/etcd/releases/download + DOWNLOAD_URL=${GOOGLE_URL} + rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test + curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz + tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test + /tmp/etcd-download-test/etcd & + sleep 3 + + cd ${GITHUB_WORKSPACE}/flex/tests/hqps + python3 -c 'import yaml;f=open("./interactive_config_test.yaml");y=yaml.safe_load(f);y["compute_engine"]["metadata_store"]["uri"] = "http://localhost:2379"; f.close();f=open("./interactive_config_test_etcd.yaml","w");yaml.dump(y,f);f.close()' + bash hqps_admin_test.sh ${TMP_INTERACTIVE_WORKSPACE} ./interactive_config_test_etcd.yaml ${GS_TEST_DIR} + rm ./interactive_config_test_etcd.yaml + + - name: Build and test Interactive Java/Python SDK env: FLEX_DATA_DIR: ${{ github.workspace }}/flex/interactive/examples/modern_graph @@ -483,7 +499,7 @@ jobs: git clone --single-branch https://github.com/alibaba/libgrape-lite.git /tmp/libgrape-lite cd /tmp/libgrape-lite mkdir -p build && cd build - cmake .. + cmake .. -BUILD_ETCD_METASTORE=ON make -j$(nproc) make install @@ -649,3 +665,16 @@ jobs: BULK_LOAD_FILE=${GITHUB_WORKSPACE}/flex/interactive/examples/modern_graph/bulk_load.yaml sed -i 's/|/\\t/g' ${BULK_LOAD_FILE} GLOG_v=10 ./bin/bulk_loader -g ${SCHEMA_FILE} -l ${BULK_LOAD_FILE} -d /tmp/csr-data-dir/ + + - name: Test etcd-based metadata service + run: | + GOOGLE_URL=https://storage.googleapis.com/etcd + GITHUB_URL=https://github.com/etcd-io/etcd/releases/download + DOWNLOAD_URL=${GOOGLE_URL} + rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test + curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz + tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test + /tmp/etcd-download-test/etcd & + sleep 3 + + GLOG_v=10 ./tests/hqps/etcd_meta_test http://localhost:2379/ \ No newline at end of file diff --git a/.gitmodules b/.gitmodules index 1203c4c4d451..39f856ed0bf2 100644 --- a/.gitmodules +++ b/.gitmodules @@ -13,3 +13,9 @@ [submodule "flex/third_party/parallel-hashmap"] path = flex/third_party/parallel-hashmap url = https://github.com/greg7mdp/parallel-hashmap.git +[submodule "flex/third_party/etcd-cpp-apiv3"] + path = flex/third_party/etcd-cpp-apiv3 + url = https://github.com/etcd-cpp-apiv3/etcd-cpp-apiv3.git +[submodule "flex/third_party/cpprestsdk"] + path = flex/third_party/cpprestsdk + url = https://github.com/microsoft/cpprestsdk.git diff --git a/docs/flex/interactive/configuration.md b/docs/flex/interactive/configuration.md index 61c6526d3ae9..02cc752da070 100644 --- a/docs/flex/interactive/configuration.md +++ b/docs/flex/interactive/configuration.md @@ -116,6 +116,7 @@ In this following table, we use the `.` notation to represent the hierarchy with | verbose_level | 0 | The verbose level of database log, should be a int | 0.0.3 | | compute_engine.thread_num_per_worker | 1 | The number of threads will be used to process the queries. Increase the number can benefit the query throughput | 0.0.1 | | compute_engine.wal_uri | file://{GRAPH_DATA_DIR}/wal | The location where Interactive will store and access WALs. `GRAPH_DATA_DIR` is a placeholder that will be populated by Interactive. | 0.5 | +| compute_engine.metadata_store.uri | file://{WORKSPACE}/METADATA | The location where Interactive will store and access metadatas. `WORKSPACE` is a placeholder that will be populated by Interactive. | 0.5 | | compiler.planner.is_on | true | Determines if query optimization is enabled for compiling Cypher queries | 0.0.1 | | compiler.planner.opt | RBO | Specifies the optimizer to be used for query optimization. Currently, only the Rule-Based Optimizer (RBO) is supported | 0.0.1 | | compiler.planner.rules.FilterMatchRule | N/A | An optimization rule that pushes filter (`Where`) conditions into the `Match` clause | 0.0.1 | diff --git a/docs/flex/interactive/development/dev_and_test.md b/docs/flex/interactive/development/dev_and_test.md index e831f00e9591..a31457982021 100644 --- a/docs/flex/interactive/development/dev_and_test.md +++ b/docs/flex/interactive/development/dev_and_test.md @@ -262,3 +262,38 @@ In Interactive's execution engine, transactions such as `ReadTransaction`, `Upda 2. If a transaction returns `false` during the `commit()` process, the error occurred prior to applying the WAL to the graph data. This type of failure could arise during the construction of the WAL or during its writing phase. 3. It is important to note that errors can still occur when replaying the WAL to the graph database. Replaying might fail due to limitations in resources or due to unforeseen bugs. **However,** any errors encountered during this stage will be handled via exceptions or may result in process failure. Currently, there is no established mechanism to handle such failures. Future improvements should focus on implementing failover strategies, potentially allowing the GraphDB to continue replaying the WAL until it succeeds. + + +## Using ETCD as Metadata storage + +```bash +ETCD_VER=v3.4.13 + +# choose either URL +GOOGLE_URL=https://storage.googleapis.com/etcd +GITHUB_URL=https://github.com/etcd-io/etcd/releases/download +DOWNLOAD_URL=${GOOGLE_URL} + +rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz +rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test + +curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz +tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/etcd-download-test --strip-components=1 +rm -f /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz + +cd /tmp/etcd-download-test +# You may find etcd and etcdctl in the directory +``` + +Start a local etcd server + +```bash +cd /tmp/etcd-download-test +etcd +``` + +Now in another terminal test etcd metadata store. +```bash +cd GraphScope/flex/build +./tests/hqps/etcd_meta_test http://localhost:2379 +``` \ No newline at end of file diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index 2c27898a9e8e..f1558822a613 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -17,6 +17,7 @@ option(USE_PTHASH "Whether to use pthash" OFF) option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host option(USE_STATIC_ARROW "Whether to use static arrow" OFF) # Whether to link arrow statically, default is OFF option(BUILD_WITH_OTEL "Whether to build with opentelemetry-cpp" OFF) # Whether to build with opentelemetry-cpp, default is OFF +option(BUILD_ETCD_METASTORE "Whether to build with etcd metastore" OFF) # Whether to build with etcd metastore, default is OFF #print options message(STATUS "Build test: ${BUILD_TEST}") @@ -226,6 +227,24 @@ macro(install_without_export_flex_target target) ) endmacro() +if (BUILD_ETCD_METASTORE) + add_definitions(-DBUILD_ETCD_METASTORE) + # cd to third_party/cpprestsdk and run git submodule update --init + include("cmake/BuildEtcdCpp.cmake") + # set(WERROR OFF CACHE BOOL "Treat warnings as errors") + # set(CPPREST_EXCLUDE_WEBSOCKETS ON CACHE BOOL "Exclude websockets functionality..") + # add_subdirectory(third_party/cpprestsdk EXCLUDE_FROM_ALL) + # set(CPPREST_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cpprestsdk/Release/include) + # set(CPPREST_LIB cpprest) + # add_subdirectory(third_party/etcd-cpp-apiv3 EXCLUDE_FROM_ALL) + include_directories(SYSTEM ${CPPREST_INCLUDE_DIR}) + # for each directory in ${ETCD_CPP_INCLUDE_DIR} + foreach(dir ${ETCD_CPP_INCLUDE_DIR}) + message(STATUS "Include directory: ${dir}") + include_directories(SYSTEM ${dir}) + endforeach() +endif() + add_subdirectory(utils) add_subdirectory(codegen) add_subdirectory(storages) diff --git a/flex/cmake/BuildEtcdCpp.cmake b/flex/cmake/BuildEtcdCpp.cmake new file mode 100644 index 000000000000..b8992f55b099 --- /dev/null +++ b/flex/cmake/BuildEtcdCpp.cmake @@ -0,0 +1,35 @@ +# Copyright 2020-2023 Alibaba Group Holding Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This File is copied from https://github.com/v6d-io/v6d/blob/main/cmake/BuildEtcdCpp.cmake +# build cpprestsdk +set(WERROR OFF CACHE BOOL "Treat warnings as errors") +set(BUILD_TESTS OFF CACHE BOOL "Build tests.") +set(BUILD_SAMPLES OFF CACHE BOOL "Build sample applications.") +set(CPPREST_EXCLUDE_WEBSOCKETS ON CACHE BOOL "Exclude websockets functionality..") +add_subdirectory(third_party/cpprestsdk) +set(CPPREST_INCLUDE_DIR flex/third_party/cpprestsdk/Release/include) +set(CPPREST_LIB cpprest) + +# disable a warning message inside cpprestsdk on Mac with llvm/clang +if(W_NO_UNUSED_BUT_SET_PARAMETER) + target_compile_options(cpprest PRIVATE -Wno-unused-but-set-parameter) +endif() + +# build etcd-cpp-apiv3 +add_subdirectory(third_party/etcd-cpp-apiv3) +set(ETCD_CPP_LIBRARIES etcd-cpp-api) +set(ETCD_CPP_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/third_party/etcd-cpp-apiv3/ + ${PROJECT_BINARY_DIR}/third_party/etcd-cpp-apiv3/proto/gen + ${PROJECT_BINARY_DIR}/third_party/etcd-cpp-apiv3/proto/gen/proto) diff --git a/flex/engines/graph_db/database/wal/local_wal_parser.cc b/flex/engines/graph_db/database/wal/local_wal_parser.cc index 6ac4147d8a6b..0e6f965c98cb 100644 --- a/flex/engines/graph_db/database/wal/local_wal_parser.cc +++ b/flex/engines/graph_db/database/wal/local_wal_parser.cc @@ -28,7 +28,7 @@ LocalWalParser::LocalWalParser(const std::string& wal_uri) } void LocalWalParser::open(const std::string& wal_uri) { - auto wal_dir = get_wal_uri_path(wal_uri); + auto wal_dir = get_uri_path(wal_uri); if (!std::filesystem::exists(wal_dir)) { std::filesystem::create_directory(wal_dir); } diff --git a/flex/engines/graph_db/database/wal/local_wal_parser.h b/flex/engines/graph_db/database/wal/local_wal_parser.h index 409d3cb27246..1d5ec5851893 100644 --- a/flex/engines/graph_db/database/wal/local_wal_parser.h +++ b/flex/engines/graph_db/database/wal/local_wal_parser.h @@ -18,6 +18,7 @@ #include #include "flex/engines/graph_db/database/wal/wal.h" +#include "flex/utils/service_utils.h" namespace gs { diff --git a/flex/engines/graph_db/database/wal/local_wal_writer.cc b/flex/engines/graph_db/database/wal/local_wal_writer.cc index 66327b1bfa05..38bbd00c075a 100644 --- a/flex/engines/graph_db/database/wal/local_wal_writer.cc +++ b/flex/engines/graph_db/database/wal/local_wal_writer.cc @@ -26,7 +26,7 @@ std::unique_ptr LocalWalWriter::Make() { } void LocalWalWriter::open(const std::string& wal_uri, int thread_id) { - auto prefix = get_wal_uri_path(wal_uri); + auto prefix = get_uri_path(wal_uri); if (!std::filesystem::exists(prefix)) { std::filesystem::create_directories(prefix); } diff --git a/flex/engines/graph_db/database/wal/local_wal_writer.h b/flex/engines/graph_db/database/wal/local_wal_writer.h index c574facea14e..26aef0052793 100644 --- a/flex/engines/graph_db/database/wal/local_wal_writer.h +++ b/flex/engines/graph_db/database/wal/local_wal_writer.h @@ -19,6 +19,7 @@ #include #include #include "flex/engines/graph_db/database/wal/wal.h" +#include "flex/utils/service_utils.h" namespace gs { diff --git a/flex/engines/graph_db/database/wal/wal.cc b/flex/engines/graph_db/database/wal/wal.cc index 14d9c147651d..b7ec6355158e 100644 --- a/flex/engines/graph_db/database/wal/wal.cc +++ b/flex/engines/graph_db/database/wal/wal.cc @@ -17,6 +17,7 @@ #include #include #include +#include "flex/utils/service_utils.h" #include @@ -54,7 +55,7 @@ void WalWriterFactory::Finalize() {} std::unique_ptr WalWriterFactory::CreateWalWriter( const std::string& wal_uri) { auto& known_writers_ = getKnownWalWriters(); - auto scheme = get_wal_uri_scheme(wal_uri); + auto scheme = get_uri_scheme(wal_uri); auto iter = known_writers_.find(scheme); if (iter != known_writers_.end()) { return iter->second(); @@ -89,7 +90,7 @@ void WalParserFactory::Finalize() {} std::unique_ptr WalParserFactory::CreateWalParser( const std::string& wal_uri) { auto& know_parsers_ = getKnownWalParsers(); - auto scheme = get_wal_uri_scheme(wal_uri); + auto scheme = get_uri_scheme(wal_uri); auto iter = know_parsers_.find(scheme); if (iter != know_parsers_.end()) { return iter->second(wal_uri); diff --git a/flex/engines/graph_db/database/wal/wal.h b/flex/engines/graph_db/database/wal/wal.h index 2702c3ec6ddd..86062d385433 100644 --- a/flex/engines/graph_db/database/wal/wal.h +++ b/flex/engines/graph_db/database/wal/wal.h @@ -48,9 +48,6 @@ struct UpdateWalUnit { size_t size{0}; }; -std::string get_wal_uri_scheme(const std::string& uri); -std::string get_wal_uri_path(const std::string& uri); - /** * The interface of wal writer. */ diff --git a/flex/engines/http_server/graph_db_service.cc b/flex/engines/http_server/graph_db_service.cc index eb27be41b1e4..cd63a0b6e06d 100644 --- a/flex/engines/http_server/graph_db_service.cc +++ b/flex/engines/http_server/graph_db_service.cc @@ -45,7 +45,7 @@ ServiceConfig::ServiceConfig() start_compiler(false), enable_gremlin(false), enable_bolt(false), - metadata_store_type_(gs::MetadataStoreType::kLocalFile), + metadata_store_uri(DEFAULT_METADATA_STORE_URI), log_level(DEFAULT_LOG_LEVEL), verbose_level(DEFAULT_VERBOSE_LEVEL), sharding_mode(DEFAULT_SHARDING_MODE), @@ -132,8 +132,14 @@ void GraphDBService::init(const ServiceConfig& config) { service_config_ = config; gs::init_cpu_usage_watch(); if (config.start_admin_service) { - metadata_store_ = gs::MetadataStoreFactory::Create( - config.metadata_store_type_, WorkDirManipulator::GetWorkspace()); + // instantiate the placeholder {WOKRSPACE} in metadata store uri. + auto metadata_store_uri = config.metadata_store_uri; + if (metadata_store_uri.find("{WORKSPACE}") != std::string::npos) { + metadata_store_uri = + std::regex_replace(metadata_store_uri, std::regex("\\{WORKSPACE\\}"), + WorkDirManipulator::GetWorkspace()); + } + metadata_store_ = gs::MetadataStoreFactory::Create(metadata_store_uri); auto res = metadata_store_->Open(); if (!res.ok()) { diff --git a/flex/engines/http_server/graph_db_service.h b/flex/engines/http_server/graph_db_service.h index 0689cf1003ba..c6bd43ad3a42 100644 --- a/flex/engines/http_server/graph_db_service.h +++ b/flex/engines/http_server/graph_db_service.h @@ -51,9 +51,12 @@ struct ServiceConfig { 1024 * 1024 * 1024; // 1GB static constexpr const char* DEFAULT_WAL_URI = "{GRAPH_DATA_DIR}/wal"; // By default we will use the wal directory in - // the graph data directory. The {GRAPH_DATA_DIR} - // is a placeholder, which will be replaced by - // the actual graph data directory. + // the graph data directory. The {GRAPH_DATA_DIR} + // is a placeholder, which will be replaced by + // the actual graph data directory. + static constexpr const char* DEFAULT_METADATA_STORE_URI = + "{WORKSPACE}/METADATA"; // By default we will use the local file system + // as // Those has default value uint32_t bolt_port; @@ -71,7 +74,7 @@ struct ServiceConfig { bool start_compiler; bool enable_gremlin; bool enable_bolt; - gs::MetadataStoreType metadata_store_type_; + std::string metadata_store_uri; // verbose log level. should be a int // could also be set from command line: GLOG_v={}. // If we found GLOG_v in the environment, we will at the first place. @@ -262,17 +265,10 @@ struct convert { auto metadata_store_node = engine_node["metadata_store"]; if (metadata_store_node) { - auto metadata_store_type = metadata_store_node["type"]; - if (metadata_store_type) { - auto metadata_store_type_str = metadata_store_type.as(); - if (metadata_store_type_str == "file") { - service_config.metadata_store_type_ = - gs::MetadataStoreType::kLocalFile; - } else { - LOG(ERROR) << "Unsupported metadata store type: " - << metadata_store_type_str; - return false; - } + auto metadata_store_uri = metadata_store_node["uri"]; + if (metadata_store_uri) { + service_config.metadata_store_uri = + metadata_store_uri.as(); } } if (engine_node["wal_uri"]) { diff --git a/flex/storages/metadata/CMakeLists.txt b/flex/storages/metadata/CMakeLists.txt index dc2dcfd657b5..69391bf60ef9 100644 --- a/flex/storages/metadata/CMakeLists.txt +++ b/flex/storages/metadata/CMakeLists.txt @@ -1,8 +1,17 @@ file(GLOB_RECURSE METADATA_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") +# Exclude etcd_metadata_store.cc, if BUILD_ETCD_METASTORE is off +if (NOT BUILD_ETCD_METASTORE) + list(REMOVE_ITEM METADATA_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/etcd_metadata_store.cc") +endif() +message(STATUS "metadata source files: ${METADATA_SRC_FILES}") add_library(flex_metadata_store SHARED ${METADATA_SRC_FILES}) target_link_libraries(flex_metadata_store flex_utils) +if (BUILD_ETCD_METASTORE) + target_link_libraries(flex_metadata_store ${ETCD_CPP_LIBRARIES}) + # message(STATUS "etcd cpp include dir: ${ETCD_CPP_INCLUDE_DIR}") +endif() install_flex_target(flex_metadata_store) diff --git a/flex/storages/metadata/etcd_metadata_store.cc b/flex/storages/metadata/etcd_metadata_store.cc new file mode 100644 index 000000000000..7c65c4b92050 --- /dev/null +++ b/flex/storages/metadata/etcd_metadata_store.cc @@ -0,0 +1,264 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/storages/metadata/etcd_metadata_store.h" +#include "flex/utils/result.h" + +namespace gs { + +// see: https://etcd.io/docs/v2.3/errorcode/ +StatusCode etcdCodeToStatusCode(int etcd_code) { + switch (etcd_code) { + case 0: + return StatusCode::OK; + case 100: + return StatusCode::META_KEY_NOT_FOUND; + default: + return StatusCode::UNKNOWN; + } +} + +#define TRT_PUT_ETCD_KEY_VALUE_TXN(client, key, value) \ + etcdv3::Transaction txn; \ + txn.setup_put(key, value); \ + auto txn_resp = client->txn(txn); \ + if (!txn_resp.is_ok()) { \ + return Status(etcdCodeToStatusCode(txn_resp.error_code()), \ + txn_resp.error_message()); \ + } + +#define FAILS_IF_KEY_EXISTS(client, key) \ + auto __res = client->get(key); \ + if (__res.is_ok()) { \ + return Status(StatusCode::META_KEY_ALREADY_EXIST, "Key exists"); \ + } \ + if (__res.error_code() != 100) { \ + return Status(etcdCodeToStatusCode(__res.error_code()), \ + __res.error_message()); \ + } +#define FAILS_IF_KEY_NOT_EXISTS(client, key) \ + auto __res = client->get(key); \ + if (!__res.is_ok()) { \ + return Status(etcdCodeToStatusCode(__res.error_code()), \ + __res.error_message()); \ + } \ + if (__res.value().as_string().empty()) { \ + return Status(StatusCode::META_KEY_NOT_FOUND, "Key not found"); \ + } + +std::pair extractBaseUrlAndMetaRootUri( + const std::string& uri) { + VLOG(10) << "Extracting base URL and meta root URI from: " << uri; + // Find the position of the scheme (http:// or https://) + size_t schemeEnd = uri.find("://"); + if (schemeEnd == std::string::npos) { + return std::make_pair("", ""); // Invalid URI + } + // The scheme should be http or https + if (uri.compare(0, schemeEnd, "http") != 0 && + uri.compare(0, schemeEnd, "https") != 0) { + LOG(ERROR) << "The scheme should be http or https: " + << uri.substr(0, schemeEnd); + return std::make_pair("", ""); + } + + // Find the position of the next slash after the scheme + size_t pathStart = uri.find('/', schemeEnd + 3); // +3 to move past "://" + + if (pathStart == std::string::npos) { + // No path found, return the full URI up to the scheme + return std::make_pair(uri, ""); + } + + // Extract the base URL + return std::make_pair(uri.substr(0, pathStart), uri.substr(pathStart)); +} + +Result ETCDMetadataStore::Open() { return true; } + +Result ETCDMetadataStore::Close() { return true; } + +// Insert the value without specifying the key, so we need to generate the key +// by ourself. Suppose we are using prefx_/{meta_key}/cur_id to store the +// current id of the meta. +Result ETCDMetadataStore::CreateMeta( + const meta_kind_t& meta_kind, const meta_value_t& value) { + meta_key_t meta_key; + ASSIGN_AND_RETURN_IF_RESULT_NOT_OK(meta_key, get_next_meta_key(meta_kind)); + VLOG(10) << "got next meta key: " << meta_key; + auto real_key = get_full_meta_key(meta_kind, meta_key); + FAILS_IF_KEY_EXISTS(client_, real_key); + TRT_PUT_ETCD_KEY_VALUE_TXN(client_, real_key, value); + return meta_key; +} + +Result ETCDMetadataStore::CreateMeta( + const meta_kind_t& meta_kind, const meta_key_t& key, + const meta_value_t& value) { + auto real_key = get_full_meta_key(meta_kind, key); + FAILS_IF_KEY_EXISTS(client_, real_key); + TRT_PUT_ETCD_KEY_VALUE_TXN(client_, real_key, value); + return key; +} + +Result ETCDMetadataStore::GetMeta( + const meta_kind_t& meta_kind, const meta_key_t& key) { + auto real_key = get_full_meta_key(meta_kind, key); + auto res = client_->get(real_key); + if (!res.is_ok()) { + return Result( + Status(etcdCodeToStatusCode(res.error_code()), res.error_message())); + } + return res.value().as_string(); +} + +Result>> +ETCDMetadataStore::GetAllMeta(const meta_kind_t& meta_kind) { + // List all key-value pair under directory preifx_ + "/" + meta_kind + auto res = client_->ls(get_full_meta_key(meta_kind, "")); + if (!res.is_ok()) { + return Result>>( + Status(etcdCodeToStatusCode(res.error_code()), res.error_message())); + } + std::vector> result; + for (size_t i = 0; i < res.keys().size(); ++i) { + result.push_back( + std::make_pair(res.keys()[i], res.values()[i].as_string())); + } + return result; +} + +Result ETCDMetadataStore::DeleteMeta(const meta_kind_t& meta_kind, + const meta_key_t& key) { + etcdv3::Transaction txn; + txn.setup_delete(get_full_meta_key(meta_kind, key)); + auto res = client_->txn(txn); + if (!res.is_ok()) { + return Result( + Status(etcdCodeToStatusCode(res.error_code()), res.error_message())); + } + return true; +} + +Result ETCDMetadataStore::DeleteAllMeta(const meta_kind_t& meta_kind) { + // auto res = client_->rmdir(get_full_meta_key(meta_kind, ""), true); + etcdv3::Transaction txn; + txn.setup_delete(get_full_meta_key(meta_kind, ""), "", true); + auto res = client_->txn(txn); + if (!res.is_ok()) { + if (res.error_code() == 100) { // Key not found + return true; + } + return Result( + Status(etcdCodeToStatusCode(res.error_code()), res.error_message())); + } + return true; +} + +Result ETCDMetadataStore::UpdateMeta(const meta_kind_t& meta_kind, + const meta_key_t& key, + const meta_value_t& value) { + auto real_key = get_full_meta_key(meta_kind, key); + FAILS_IF_KEY_NOT_EXISTS(client_, real_key); + auto result = initOrUpdateValue( + real_key, value, [&value](const std::string& input) { return value; }); + if (!result.ok()) { + return result.status(); + } + return true; +} + +Result ETCDMetadataStore::UpdateMeta( + const meta_kind_t& meta_kind, const meta_key_t& key, + ETCDMetadataStore::update_func_t update_func) { + auto real_key = get_full_meta_key(meta_kind, key); + FAILS_IF_KEY_NOT_EXISTS(client_, real_key); + auto result = + initOrUpdateValue(real_key, "", [update_func](const std::string& input) { + auto res = update_func(input); + if (!res.ok()) { + LOG(ERROR) << "Failed to update meta: " + << res.status().error_message(); + return input; + } else { + return res.value(); + } + }); + if (!result.ok()) { + return result.status(); + } + return true; +} + +Result ETCDMetadataStore::initOrUpdateValue( + const std::string& key, const std::string& initial_value, + std::function update_func) { + auto value = client_->get(key); + etcdv3::Transaction txn; + std::string real_value; + + if (value.is_ok()) { + // do fetch_add + real_value = value.value().as_string(); + int32_t max_retry = 10; + while (true && max_retry--) { + txn.setup_compare_and_swap(key, real_value, update_func(real_value)); + etcd::Response resp = client_->txn(txn); + if (resp.is_ok()) { + break; + } + if (max_retry == 0) { + return Status(etcdCodeToStatusCode(resp.error_code()), + "Failed to update key:" + resp.error_message()); + } + real_value = stoi(resp.value().as_string()); + } + } else { + // do put + if (initial_value.empty()) { + return Status(StatusCode::ILLEGAL_OPERATION, "Initial value is empty"); + } + txn.setup_put(key, initial_value); + auto txn_resp = client_->txn(txn); + if (!txn_resp.is_ok()) { + return Status(etcdCodeToStatusCode(txn_resp.error_code()), + "Failed to initialize key:" + txn_resp.error_message()); + } + } + return client_->get(key).value().as_string(); +} + +/** + * @brief Get the next meta key for the given meta kind. + * We store a current id for each meta kind, and the next meta key is the + * current id plus one. + * + * There is no synchronization in this function, so it should be called in a + * synchronized context. + */ +Result ETCDMetadataStore::get_next_meta_key( + const std::string& meta_kind) { + std::string cur_id_key = get_full_meta_key("META_" + meta_kind, CUR_ID_KEY); + return initOrUpdateValue(cur_id_key, "1", [](const std::string& value) { + return std::to_string(std::stoi(value) + 1); + }); +} + +std::string ETCDMetadataStore::get_full_meta_key(const std::string& meta_kind, + const std::string& key) { + return prefix_ + "/" + meta_kind + "/" + key; +} +} // namespace gs diff --git a/flex/storages/metadata/etcd_metadata_store.h b/flex/storages/metadata/etcd_metadata_store.h new file mode 100644 index 000000000000..67a1f73d0245 --- /dev/null +++ b/flex/storages/metadata/etcd_metadata_store.h @@ -0,0 +1,124 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLEX_STORAGES_METADATA_ETCD_METADATA_STORE_H_ +#define FLEX_STORAGES_METADATA_ETCD_METADATA_STORE_H_ + +#include "flex/storages/metadata/i_meta_store.h" +#include "flex/third_party/etcd-cpp-apiv3/etcd/Client.hpp" +#include "flex/third_party/etcd-cpp-apiv3/etcd/v3/Transaction.hpp" +#include "flex/utils/service_utils.h" + +#include + +namespace gs { + +std::pair extractBaseUrlAndMetaRootUri( + const std::string& uri); +/** + * @brief ETCDMetadataStore is a concrete implementation of MetadataStore, + * which stores metadata via ETCD + */ +class ETCDMetadataStore : public IMetaStore { + public: + using meta_key_t = IMetaStore::meta_key_t; + using meta_value_t = IMetaStore::meta_value_t; + using meta_kind_t = IMetaStore::meta_kind_t; + + static constexpr const char* CUR_ID_KEY = "CUR_ID"; + + ETCDMetadataStore(const std::string& path) { + // Expect the path is like http://ip:port/uri/path, extract the part after + // http://ip:port/ + std::string base_uri; + std::tie(base_uri, prefix_) = extractBaseUrlAndMetaRootUri(path); + VLOG(10) << "ETCD base URI: " << base_uri + << ", meta base path: " << prefix_; + client_ = std::make_unique(base_uri); + } + + ~ETCDMetadataStore() { Close(); } + + Result Open() override; + + Result Close() override; + + /* + * @brief Create a meta with a new key. + * @param meta_kind The kind of meta. + * @param value The value of the meta. + * @return The key of the meta. + */ + Result CreateMeta(const meta_kind_t& meta_kind, + const meta_value_t& value) override; + + /* + * @brief Create a meta with a specific key. + * @param meta_kind The kind of meta. + * @param key The key of the meta. + * @param value The value of the meta. + * @return If the meta is created successfully. + */ + Result CreateMeta(const meta_kind_t& meta_kind, + const meta_key_t& key, + const meta_value_t& value) override; + + Result GetMeta(const meta_kind_t& meta_kind, + const meta_key_t& key) override; + + Result>> GetAllMeta( + const meta_kind_t& meta_kind) override; + + Result DeleteMeta(const meta_kind_t& meta_kind, + const meta_key_t& key) override; + + Result DeleteAllMeta(const meta_kind_t& meta_kind) override; + + /* + * @brief Update the meta with a specific key, regardless of the original + * value. + * @param meta_kind The kind of meta. + * @param key The key of the meta. + * @param value The new value of the meta. + * @return If the meta is updated successfully. + */ + Result UpdateMeta(const meta_kind_t& meta_kind, const meta_key_t& key, + const meta_value_t& value) override; + + /** + * @brief Update the meta with a specific key, based on the original value. + * @param meta_kind The kind of meta. + * @param key The key of the meta. + * @param update_func The function to update the meta. + * @return If the meta is updated successfully. + */ + Result UpdateMeta(const meta_kind_t& meta_kind, const meta_key_t& key, + update_func_t update_func) override; + + private: + Result get_next_meta_key(const std::string& meta_kind); + std::string get_full_meta_key(const std::string& meta_kind, + const std::string& key); + Result initOrUpdateValue( + const std::string& key, const std::string& initial_value, + std::function update_func); + + std::unique_ptr client_; + std::string prefix_; + // std::mutex meta_mutex_; +}; +} // namespace gs + +#endif // FLEX_STORAGES_METADATA_ETCD_METADATA_STORE_H_ \ No newline at end of file diff --git a/flex/storages/metadata/metadata_store_factory.cc b/flex/storages/metadata/metadata_store_factory.cc index 86dc69f0433a..3ccab631d772 100644 --- a/flex/storages/metadata/metadata_store_factory.cc +++ b/flex/storages/metadata/metadata_store_factory.cc @@ -17,13 +17,21 @@ namespace gs { std::shared_ptr MetadataStoreFactory::Create( - MetadataStoreType type, const std::string& path) { - switch (type) { - case MetadataStoreType::kLocalFile: + const std::string& metadata_store_uri) { + auto scheme = get_uri_scheme(metadata_store_uri); + if (scheme == "file") { return std::make_shared( - std::make_unique(path)); - default: - LOG(FATAL) << "Unsupported metadata store type: " << static_cast(type); + std::make_unique( + get_uri_path(metadata_store_uri))); + } +#ifdef BUILD_ETCD_METASTORE + else if (scheme == "http") { // assume http uri must be etcd + return std::make_shared( + std::make_unique(metadata_store_uri)); + } +#endif + else { + LOG(FATAL) << "Unsupported metadata store type: " << scheme; } return nullptr; } diff --git a/flex/storages/metadata/metadata_store_factory.h b/flex/storages/metadata/metadata_store_factory.h index 84428c0e8eeb..7e1868876b2d 100644 --- a/flex/storages/metadata/metadata_store_factory.h +++ b/flex/storages/metadata/metadata_store_factory.h @@ -18,22 +18,22 @@ #include #include "flex/storages/metadata/default_graph_meta_store.h" +#ifdef BUILD_ETCD_METASTORE +#include "flex/storages/metadata/etcd_metadata_store.h" +#endif #include "flex/storages/metadata/graph_meta_store.h" #include "flex/storages/metadata/local_file_metadata_store.h" namespace gs { -enum class MetadataStoreType { - kLocalFile, -}; /** * @brief LoaderFactory is a factory class to create IFragmentLoader. * Support Using dynamically built library as plugin. */ class MetadataStoreFactory { public: - static std::shared_ptr Create(MetadataStoreType type, - const std::string& path); + static std::shared_ptr Create( + const std::string& metadata_store_uri); }; } // namespace gs diff --git a/flex/tests/hqps/CMakeLists.txt b/flex/tests/hqps/CMakeLists.txt index 117460944726..b7b838dbfcbb 100644 --- a/flex/tests/hqps/CMakeLists.txt +++ b/flex/tests/hqps/CMakeLists.txt @@ -2,10 +2,17 @@ # file(GLOB_RECURSE GS_TEST_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") file(GLOB GS_TEST_FILES RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") +if (NOT BUILD_ETCD_METASTORE) + list(REMOVE_ITEM GS_TEST_FILES "etcd_meta_test.cc") +endif() + foreach(f ${GS_TEST_FILES}) string(REGEX MATCH "^(.*)\\.[^.]*$" dummy ${f}) set(T_NAME ${CMAKE_MATCH_1}) message(STATUS "Found graphscope test - " ${T_NAME}) add_executable(${T_NAME} ${CMAKE_CURRENT_SOURCE_DIR}/${T_NAME}.cc) target_link_libraries(${T_NAME} flex_plan_proto flex_graph_db) + if (BUILD_ETCD_METASTORE AND T_NAME STREQUAL "etcd_meta_test") + target_link_libraries(${T_NAME} ${ETCD_CPP_LIBRARIES} flex_metadata_store) + endif() endforeach() diff --git a/flex/tests/hqps/etcd_meta_test.cc b/flex/tests/hqps/etcd_meta_test.cc new file mode 100644 index 000000000000..34c82f98445f --- /dev/null +++ b/flex/tests/hqps/etcd_meta_test.cc @@ -0,0 +1,104 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/storages/metadata/etcd_metadata_store.h" + +#include + +#define CHECK_OK(res) CHECK(res.ok()) << res.status().error_message() + +void test_meta_store(std::shared_ptr store) { + // 0. Remove all meta + auto res0 = store->DeleteAllMeta("graph"); + CHECK_OK(res0); + + // 1. Create meta and get + auto res1 = store->CreateMeta("graph", "graph_1"); + // check or print error message + CHECK_OK(res1); + auto res2 = store->CreateMeta("graph", "2", "graph_2"); + CHECK_OK(res2); + auto res3 = store->GetMeta("graph", res1.value()); + CHECK_OK(res3); + CHECK(res3.value() == "graph_1") << res3.value(); + auto res4 = store->GetMeta("graph", "2"); + CHECK_OK(res4); + CHECK(res4.value() == "graph_2") << res4.value(); + + // 2. Get all meta + auto res5 = store->GetAllMeta("graph"); + CHECK_OK(res5); + CHECK(res5.value().size() == 2) << res5.value().size(); + CHECK(res5.value()[0].second == "graph_1" && + res5.value()[1].second == "graph_2") + << res5.value()[0].second << res5.value()[1].second; + + // 3. Update Meta + auto res6 = store->UpdateMeta("graph", "1", "graph_1_updated"); + CHECK_OK(res6); + auto res7 = store->GetMeta("graph", "1"); + CHECK_OK(res7); + CHECK(res7.value() == "graph_1_updated"); + // Update with function + auto res8 = store->UpdateMeta("graph", "2", [](const std::string& value) { + return value + "_updated"; + }); + CHECK_OK(res8); + auto res9 = store->GetMeta("graph", "2"); + CHECK_OK(res9); + CHECK(res9.value() == "graph_2_updated"); + + // 4. Delete Meta + auto res10 = store->DeleteMeta("graph", "1"); + CHECK_OK(res10); + auto res11 = store->GetMeta("graph", "1"); + CHECK(!res11.ok()); + + // 5. Delete All Meta + auto res12 = store->DeleteAllMeta("graph"); + CHECK_OK(res12); + auto res13 = store->GetAllMeta("graph"); + CHECK_OK(res13); + CHECK(res13.value().size() == 0) << res13.value().size(); + auto res14 = store->GetMeta("graph", "2"); + CHECK(!res14.ok()); +} + +int main(int argc, char** argv) { + // Expect args etcd_meta_path. + if (argc != 2) { + LOG(ERROR) << "Usage: ./etcd_meta_test "; + return 1; + } + auto etcd_meta_path = std::string(argv[1]); + + // First delete all meta in etcd. + auto base_url_and_root_uri = gs::extractBaseUrlAndMetaRootUri(etcd_meta_path); + etcd::SyncClient client(base_url_and_root_uri.first); + client.rmdir(base_url_and_root_uri.second, true); + + std::shared_ptr store = + std::make_shared(etcd_meta_path); + auto res = store->Open(); + if (!res.ok()) { + LOG(ERROR) << "Failed to open etcd metadata store: " + << res.status().error_message(); + return 1; + } + test_meta_store(store); + + LOG(INFO) << "Finish etcd_meta test."; + return 0; +} diff --git a/flex/tests/hqps/interactive_config_test.yaml b/flex/tests/hqps/interactive_config_test.yaml index b27f35e45e13..d6eeb39e786c 100644 --- a/flex/tests/hqps/interactive_config_test.yaml +++ b/flex/tests/hqps/interactive_config_test.yaml @@ -9,7 +9,7 @@ compute_engine: store: type: cpp-mcsr metadata_store: - type: file # file/sqlite/etcd + uri: file://{WORKSPACE}/METADATA # Could be file://{WORKSPACE}/METADATA or other supported storage class. WORKSPACE is the placeholder for the workspace directory. wal_uri: file://{GRAPH_DATA_DIR}/wal # Could be file://{GRAPH_DATA_DIR}/wal or other supported storage class. GRAPH_DATA_DIR is the placeholder for the graph data directory. compiler: planner: diff --git a/flex/third_party/cpprestsdk b/flex/third_party/cpprestsdk new file mode 160000 index 000000000000..0b1ce318a757 --- /dev/null +++ b/flex/third_party/cpprestsdk @@ -0,0 +1 @@ +Subproject commit 0b1ce318a757bbfb89bdb0fffb61ca4e38dc3b33 diff --git a/flex/third_party/etcd-cpp-apiv3 b/flex/third_party/etcd-cpp-apiv3 new file mode 160000 index 000000000000..216b86f8d763 --- /dev/null +++ b/flex/third_party/etcd-cpp-apiv3 @@ -0,0 +1 @@ +Subproject commit 216b86f8d763acf88e4ed7265f983b57c12da2df diff --git a/flex/utils/service_utils.cc b/flex/utils/service_utils.cc index 955a7bfa0c88..f51cfdb0f4bb 100644 --- a/flex/utils/service_utils.cc +++ b/flex/utils/service_utils.cc @@ -17,6 +17,31 @@ namespace gs { +std::string get_uri_scheme(const std::string& uri) { + std::string scheme; + auto pos = uri.find("://"); + if (pos != std::string::npos) { + scheme = uri.substr(0, pos); + } + if (scheme.empty()) { + LOG(INFO) << "No scheme found in wal uri: " << uri + << ", using default scheme: file"; + scheme = "file"; + } + return scheme; +} + +std::string get_uri_path(const std::string& uri) { + std::string path; + auto pos = uri.find("://"); + if (pos != std::string::npos) { + path = uri.substr(pos + 3); + } else { + path = uri; + } + return path; +} + static unsigned long long lastTotalUser, lastTotalUserLow, lastTotalSys, lastTotalIdle; diff --git a/flex/utils/service_utils.h b/flex/utils/service_utils.h index feff3a7b9053..9f16951cd37e 100644 --- a/flex/utils/service_utils.h +++ b/flex/utils/service_utils.h @@ -45,6 +45,9 @@ namespace gs { static constexpr const char* CODEGEN_BIN = "load_plan_and_gen.sh"; +std::string get_uri_scheme(const std::string& uri); +std::string get_uri_path(const std::string& uri); + /// Util functions. inline void blockSignal(int sig) { diff --git a/proto/error/interactive.proto b/proto/error/interactive.proto index ccc5fd418fe4..60dfd4396c7e 100644 --- a/proto/error/interactive.proto +++ b/proto/error/interactive.proto @@ -72,6 +72,10 @@ enum Code { SQL_EXECUTION_ERROR = 109; SQL_BINDING_ERROR = 110; ALREADY_LOCKED = 111; + ERROR_CREATE_META = 112; + META_KEY_NOT_FOUND = 113; + META_KEY_ALREADY_EXIST = 114; + META_LOCK_FAILED = 115; UNKNOWN = 999; }