diff --git a/Cargo.lock b/Cargo.lock index a2bc9e0..d5ad722 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -90,6 +90,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.3" @@ -99,6 +111,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -116,9 +134,9 @@ dependencies = [ [[package]] name = "anstream" -version = "0.6.14" +version = "0.6.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" dependencies = [ "anstyle", "anstyle-parse", @@ -131,36 +149,36 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.7" +version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" [[package]] name = "anstyle-parse" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.1.0" +version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" dependencies = [ - "windows-sys 0.52.0", + "windows-sys", ] [[package]] name = "anstyle-wincon" -version = "3.0.3" +version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" dependencies = [ "anstyle", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -169,6 +187,12 @@ version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da" +[[package]] +name = "arrayvec" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" + [[package]] name = "asn1-rs" version = "0.6.1" @@ -241,6 +265,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "autocfg" version = "1.3.0" @@ -313,12 +346,42 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "339abbe78e73178762e23bea9dfd08e697eb3f3301cd4be981c0f78ba5859195" +dependencies = [ + "outref", + "vsimd", +] + [[package]] name = "binascii" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "383d29d513d8764dcdc42ea295d979eb99c3c9f00607b3692cf68a431f7dca72" +[[package]] +name = "bindgen" +version = "0.69.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a00dc851838a2120612785d195287475a3ac45514741da670b735818822129a0" +dependencies = [ + "bitflags 2.6.0", + "cexpr", + "clang-sys", + "itertools", + "lazy_static", + "lazycell", + "proc-macro2", + "quote", + "regex", + "rustc-hash", + "shlex", + "syn", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -331,6 +394,15 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "bp7" version = "0.1.0" @@ -362,11 +434,44 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +[[package]] +name = "bytestring" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74d80203ea6b29df88012294f62733de21cfeab47f17b41af3a38bc30a03ee72" +dependencies = [ + "bytes", +] + +[[package]] +name = "bzip2-sys" +version = "0.1.11+1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" +dependencies = [ + "cc", + "libc", + "pkg-config", +] + [[package]] name = "cc" -version = "1.1.4" +version = "1.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2aba8f4e9906c7ce3c73463f62a7f0c65183ada1a2d47e397cc8810827f9694f" +dependencies = [ + "jobserver", + "libc", +] + +[[package]] +name = "cexpr" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9711f33475c22aab363b05564a17d7b789bf3dfec5ebabb586adee56f0e271b5" +checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" +dependencies = [ + "nom", +] [[package]] name = "cfg-if" @@ -385,14 +490,25 @@ dependencies = [ "js-sys", "num-traits", "wasm-bindgen", - "windows-targets 0.52.6", + "windows-targets", +] + +[[package]] +name = "clang-sys" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" +dependencies = [ + "glob", + "libc", + "libloading", ] [[package]] name = "clap" -version = "4.5.9" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64acc1846d54c1fe936a78dc189c34e28d3f5afc348403f28ecf53660b9b8462" +checksum = "35723e6a11662c2afb578bcf0b88bf6ea8e21282a953428f240574fcc3a2b5b3" dependencies = [ "clap_builder", "clap_derive", @@ -400,9 +516,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.9" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fb8393d67ba2e7bfaf28a23458e4e2b543cc73a99595511eb207fdb8aede942" +checksum = "49eb96cbfa7cfa35017b7cd548c75b14c3118c98b423041d70562665e07fb0fa" dependencies = [ "anstream", "anstyle", @@ -412,9 +528,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.8" +version = "4.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bac35c6dafb060fd4d275d9a4ffae97917c13a6327903a8be2153cd964f7085" +checksum = "5d029b67f89d30bbb547c89fd5161293c0aec155fc691d7924b64550662db93e" dependencies = [ "heck", "proc-macro2", @@ -424,15 +540,28 @@ dependencies = [ [[package]] name = "clap_lex" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" [[package]] name = "colorchoice" -version = "1.0.1" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" +checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" + +[[package]] +name = "console-api" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic 0.10.2", + "tracing-core", +] [[package]] name = "console-api" @@ -443,8 +572,32 @@ dependencies = [ "futures-core", "prost", "prost-types", - "tonic", + "tonic 0.11.0", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" +dependencies = [ + "console-api 0.6.0", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic 0.10.2", + "tracing", "tracing-core", + "tracing-subscriber", ] [[package]] @@ -453,7 +606,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31c4cc54bae66f7d9188996404abdf7fdfa23034ef8e43478c8810828abad758" dependencies = [ - "console-api", + "console-api 0.7.0", "crossbeam-channel", "crossbeam-utils", "futures-task", @@ -466,7 +619,7 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic", + "tonic 0.11.0", "tracing", "tracing-core", "tracing-subscriber", @@ -478,6 +631,15 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f" +[[package]] +name = "cpufeatures" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53fe5e26ff1b7aef8bca9c6080520cfb8d9333c7568e1829cef191a9723e5504" +dependencies = [ + "libc", +] + [[package]] name = "crc32fast" version = "1.4.2" @@ -502,6 +664,16 @@ version = "0.8.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -531,6 +703,17 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", + "subtle", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -550,8 +733,8 @@ dependencies = [ "actix-rt", "async-stream", "bp7", - "console-subscriber", - "env_logger", + "console-subscriber 0.3.0", + "env_logger 0.11.5", "futures-util", "log", "openssl", @@ -560,7 +743,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic", + "tonic 0.11.0", "tonic-build", "url", "uuid", @@ -586,7 +769,7 @@ dependencies = [ "log", "maybe-async", "prost", - "tonic", + "tonic 0.11.0", "tonic-build", ] @@ -598,14 +781,27 @@ checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" [[package]] name = "env_filter" -version = "0.1.0" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" dependencies = [ "log", "regex", ] +[[package]] +name = "env_logger" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "env_logger" version = "0.11.5" @@ -632,7 +828,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -687,6 +883,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -694,6 +905,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -702,6 +914,23 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + [[package]] name = "futures-macro" version = "0.3.30" @@ -731,15 +960,28 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -757,6 +999,12 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "h2" version = "0.3.26" @@ -793,6 +1041,10 @@ name = "hashbrown" version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] [[package]] name = "hdrhistogram" @@ -819,6 +1071,31 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "hex-simd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f7685beb53fc20efc2605f32f5d51e9ba18b8ef237961d1760169d2290d3bee" +dependencies = [ + "outref", + "vsimd", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "http" version = "0.2.12" @@ -948,11 +1225,22 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys", +] + [[package]] name = "is_terminal_polyfill" -version = "1.70.0" +version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" [[package]] name = "itertools" @@ -969,6 +1257,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jobserver" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.69" @@ -984,12 +1281,54 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "lazycell" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" + [[package]] name = "libc" version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "libloading" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" +dependencies = [ + "cfg-if", + "windows-targets", +] + +[[package]] +name = "librocksdb-sys" +version = "0.17.0+9.0.0" +source = "git+https://github.com/rust-rocksdb/rust-rocksdb#1710120e4549e04ba3baa6a1ee5a5a801fa45a72" +dependencies = [ + "bindgen", + "bzip2-sys", + "cc", + "glob", + "libc", + "libz-sys", + "lz4-sys", + "zstd-sys", +] + +[[package]] +name = "libz-sys" +version = "1.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c15da26e5af7e25c90b37a2d75cdbf940cf4a55316de9d84c679c9b8bfabf82e" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "linux-raw-sys" version = "0.4.14" @@ -1012,6 +1351,16 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +[[package]] +name = "lz4-sys" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109de74d5d2353660401699a4174a4ff23fcc649caf553df71933c7fb45ad868" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "matchers" version = "0.1.0" @@ -1038,6 +1387,16 @@ dependencies = [ "syn", ] +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.7.4" @@ -1067,13 +1426,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.11" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" +checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" dependencies = [ + "hermit-abi", "libc", "wasi", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -1092,6 +1452,25 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + +[[package]] +name = "nugine-rust-utils" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04dcd9cfa92246a9c7ca0671e00733c4e9d77ee1fa0ae08c9a181b7c8802aea2" +dependencies = [ + "simdutf8", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -1126,16 +1505,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi", - "libc", -] - [[package]] name = "num_enum" version = "0.7.2" @@ -1159,9 +1528,9 @@ dependencies = [ [[package]] name = "object" -version = "0.36.1" +version = "0.36.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "081b846d1d56ddfc18fdf1a922e4f6e07a11768ea1b92dec44e42b72712ccfce" +checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" dependencies = [ "memchr", ] @@ -1183,9 +1552,9 @@ checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" [[package]] name = "openssl" -version = "0.10.65" +version = "0.10.66" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c2823eb4c6453ed64055057ea8bd416eda38c71018723869dd043a3b1186115e" +checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" dependencies = [ "bitflags 2.6.0", "cfg-if", @@ -1217,17 +1586,112 @@ dependencies = [ ] [[package]] -name = "openssl-sys" -version = "0.9.103" +name = "openssl-sys" +version = "0.9.103" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" +dependencies = [ + "cc", + "libc", + "openssl-src", + "pkg-config", + "vcpkg", +] + +[[package]] +name = "opentelemetry" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "900d57987be3f2aeb70d385fff9b27fb74c5723cc9a52d904d4f9c807a0667bf" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a016b8d9495c639af2145ac22387dcb88e44118e45320d9238fbf4e7889abcb" +dependencies = [ + "async-trait", + "futures-core", + "http", + "opentelemetry", + "opentelemetry-proto", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "prost", + "thiserror", + "tokio", + "tonic 0.11.0", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic 0.11.0", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" + +[[package]] +name = "opentelemetry_sdk" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e90c7113be649e31e9a0f8b5ee24ed7a16923b322c3c5ab6367469c049d6b7e" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "ordered-float", + "percent-encoding", + "rand", + "thiserror", + "tokio", + "tokio-stream", +] + +[[package]] +name = "ordered-float" +version = "4.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ff2cf528c6c03d9ed653d6c4ce1dc0582dc4af309790ad92f07c1cd551b0be" +dependencies = [ + "num-traits", +] + +[[package]] +name = "outref" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" + +[[package]] +name = "overload" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" -dependencies = [ - "cc", - "libc", - "openssl-src", - "pkg-config", - "vcpkg", -] +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "parking_lot" @@ -1249,7 +1713,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets 0.52.6", + "windows-targets", ] [[package]] @@ -1399,6 +1863,16 @@ dependencies = [ "prost", ] +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.36" @@ -1440,9 +1914,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.2" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" dependencies = [ "bitflags 2.6.0", ] @@ -1491,12 +1965,62 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "replistore" +version = "0.1.0" +dependencies = [ + "actix", + "actix-rt", + "anyhow", + "async-trait", + "bytes", + "console-subscriber 0.2.0", + "dtrd_client", + "env_logger 0.10.2", + "futures", + "futures-util", + "hex", + "hyper", + "log", + "md-5", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", + "prost", + "prost-build", + "rocksdb", + "s3s", + "sha2", + "time", + "tokio", + "tokio-util", + "tracing", + "tracing-opentelemetry", + "tracing-subscriber", + "uuid", +] + +[[package]] +name = "rocksdb" +version = "0.22.0" +source = "git+https://github.com/rust-rocksdb/rust-rocksdb#1710120e4549e04ba3baa6a1ee5a5a801fa45a72" +dependencies = [ + "libc", + "librocksdb-sys", +] + [[package]] name = "rustc-demangle" version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rusticata-macros" version = "4.1.0" @@ -1516,7 +2040,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -1531,6 +2055,46 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" +[[package]] +name = "s3s" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17419b26b83f810a49eb1087213e889da16b446f9cf5149aaad777c0b1e5f56f" +dependencies = [ + "arrayvec", + "async-trait", + "atoi", + "base64-simd", + "bytes", + "bytestring", + "chrono", + "crc32fast", + "futures", + "hex-simd", + "hmac", + "http-body", + "httparse", + "hyper", + "itoa", + "memchr", + "mime", + "nom", + "nugine-rust-utils", + "pin-project-lite", + "quick-xml", + "serde", + "serde_urlencoded", + "sha1", + "sha2", + "smallvec", + "thiserror", + "time", + "tracing", + "transform-stream", + "urlencoding", + "zeroize", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1589,6 +2153,40 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "sha1" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] +name = "sha2" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -1598,6 +2196,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.2" @@ -1607,6 +2211,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simdutf8" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" + [[package]] name = "slab" version = "0.4.9" @@ -1629,7 +2239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys", ] [[package]] @@ -1638,11 +2248,17 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "subtle" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" + [[package]] name = "syn" -version = "2.0.71" +version = "2.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462" +checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" dependencies = [ "proc-macro2", "quote", @@ -1701,23 +2317,32 @@ dependencies = [ "cfg-if", "fastrand", "rustix", - "windows-sys 0.52.0", + "windows-sys", +] + +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", ] [[package]] name = "thiserror" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", @@ -1782,22 +2407,21 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.38.1" +version = "1.39.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb2caba9f80616f438e09748d5acda951967e1ea58508ef53d9c6402485a46df" +checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", "tracing", - "windows-sys 0.48.0", + "windows-sys", ] [[package]] @@ -1812,9 +2436,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.3.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -1852,16 +2476,20 @@ checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", + "slab", "tokio", ] [[package]] name = "toml_datetime" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4badfd56924ae69bcc9039335b2e017639ce3f9b001c393c1b2d1ef846ce2cbf" +checksum = "f8fb9f64314842840f1d940ac544da178732128f1c78c21772e876579e0da1db" [[package]] name = "toml_edit" @@ -1874,6 +2502,33 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic" version = "0.11.0" @@ -1978,6 +2633,35 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9be14ba1bbe4ab79e9229f7f89fab8d120b865859f10527f31c033e599d2284" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-subscriber" version = "0.3.18" @@ -1985,12 +2669,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" dependencies = [ "matchers", + "nu-ansi-term", "once_cell", "regex", "sharded-slab", + "smallvec", "thread_local", "tracing", "tracing-core", + "tracing-log", +] + +[[package]] +name = "transform-stream" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05034de7a8fcb11796a36478a2a8b16dca6772644dec5f49f709d5c66a38d359" +dependencies = [ + "futures-core", ] [[package]] @@ -1999,6 +2695,12 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "typenum" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -2037,6 +2739,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8parse" version = "0.2.2" @@ -2064,6 +2772,18 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "vsimd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" + [[package]] name = "want" version = "0.3.1" @@ -2134,45 +2854,62 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af190c94f2773fdb3729c55b007a722abb5384da03bc0986df4c289bf5567e96" [[package]] -name = "windows-core" -version = "0.52.0" +name = "web-time" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" dependencies = [ - "windows-targets 0.52.6", + "js-sys", + "wasm-bindgen", ] [[package]] -name = "windows-sys" -version = "0.48.0" +name = "winapi" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" dependencies = [ - "windows-targets 0.48.5", + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", ] [[package]] -name = "windows-sys" +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d4cc384e1e73b93bafa6fb4f1df8c41695c8a91cf9c4c64358067d15a7b6c6b" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "windows-core" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.52.6", + "windows-targets", ] [[package]] -name = "windows-targets" -version = "0.48.5" +name = "windows-sys" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows_aarch64_gnullvm 0.48.5", - "windows_aarch64_msvc 0.48.5", - "windows_i686_gnu 0.48.5", - "windows_i686_msvc 0.48.5", - "windows_x86_64_gnu 0.48.5", - "windows_x86_64_gnullvm 0.48.5", - "windows_x86_64_msvc 0.48.5", + "windows-targets", ] [[package]] @@ -2181,46 +2918,28 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm 0.52.6", - "windows_aarch64_msvc 0.52.6", - "windows_i686_gnu 0.52.6", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", "windows_i686_gnullvm", - "windows_i686_msvc 0.52.6", - "windows_x86_64_gnu 0.52.6", - "windows_x86_64_gnullvm 0.52.6", - "windows_x86_64_msvc 0.52.6", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" - [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" -[[package]] -name = "windows_aarch64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" - [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" -[[package]] -name = "windows_i686_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" - [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -2233,48 +2952,24 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" -[[package]] -name = "windows_i686_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" - [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" -[[package]] -name = "windows_x86_64_gnu" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" - [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" - [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" -[[package]] -name = "windows_x86_64_msvc" -version = "0.48.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" - [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -2306,3 +3001,39 @@ dependencies = [ "thiserror", "time", ] + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "zeroize" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + +[[package]] +name = "zstd-sys" +version = "2.0.12+zstd.1.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a4e40c320c3cb459d9a9ff6de98cff88f4751ee9275d140e2be94a2b74e4c13" +dependencies = [ + "cc", + "pkg-config", +] diff --git a/Cargo.toml b/Cargo.toml index 62e6a98..cdbabd7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,8 @@ members = [ "cli", "client", "dtrd", - "tcpcl" + "tcpcl", + "replistore" ] resolver = "2" \ No newline at end of file diff --git a/replistore/Cargo.toml b/replistore/Cargo.toml new file mode 100644 index 0000000..1cf65bd --- /dev/null +++ b/replistore/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "replistore" +version = "0.1.0" +edition = "2021" +resolver = "2" + +[dependencies] +log = "0.4" +env_logger = "0.10.0" +prost = "0.12.1" +dtrd_client = {path = "../client"} +tokio = { version = "1.34.0", features = ["full", "tracing"] } +futures-util = "0.3.28" +s3s = "0.8.0" +hyper = "0.14.27" +anyhow = "1.0.75" +async-trait = "0.1.74" +tokio-util = { version = "0.7.10", features = ["full"] } +bytes = "1.5.0" +time = "0.3.30" +md-5 = "0.10.6" +hex = "0.4.3" +sha2 = "0.10.8" +actix = "0.13.1" +actix-rt = "2.9.0" +console-subscriber = "0.2.0" +tracing = "0.1.40" +tracing-opentelemetry = "0.23.0" +tracing-subscriber = "0.3.18" +opentelemetry = "0.22.0" +opentelemetry-otlp = "0.15.0" +opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] } +futures = "0.3.30" +uuid = {version = "1.8.0", features = ["v4"]} +# until https://github.com/rust-rocksdb/rust-rocksdb/pull/868/commits/ef029f292c058d3465fcc203b32473ecaddfcd4f is released +rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb" } + +[build-dependencies] +prost-build = { version = "0.12.1" } diff --git a/replistore/src/common/mod.rs b/replistore/src/common/mod.rs new file mode 100644 index 0000000..9a2a938 --- /dev/null +++ b/replistore/src/common/mod.rs @@ -0,0 +1,18 @@ +// Copyright (C) 2023 Felix Huettner +// +// This file is part of DTRD. +// +// DTRD is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// DTRD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +pub mod settings; diff --git a/replistore/src/common/settings.rs b/replistore/src/common/settings.rs new file mode 100644 index 0000000..308da82 --- /dev/null +++ b/replistore/src/common/settings.rs @@ -0,0 +1,41 @@ +// Copyright (C) 2023 Felix Huettner +// +// This file is part of DTRD. +// +// DTRD is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// DTRD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::env; + +#[derive(Debug, Clone)] +pub struct Settings { + pub tokio_tracing_port: Option, +} + +impl Default for Settings { + fn default() -> Self { + Self { + tokio_tracing_port: None, + } + } +} + +impl Settings { + pub fn from_env() -> Self { + let mut settings = Settings::default(); + if let Ok(setting) = env::var("TOKIO_TRACING_PORT") { + settings.tokio_tracing_port = Some(setting); + }; + settings + } +} diff --git a/replistore/src/frontend/mod.rs b/replistore/src/frontend/mod.rs new file mode 100644 index 0000000..1e38235 --- /dev/null +++ b/replistore/src/frontend/mod.rs @@ -0,0 +1,18 @@ +// Copyright (C) 2023 Felix Huettner +// +// This file is part of DTRD. +// +// DTRD is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// DTRD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +pub mod s3; diff --git a/replistore/src/frontend/s3/messages.rs b/replistore/src/frontend/s3/messages.rs new file mode 100644 index 0000000..cea13a3 --- /dev/null +++ b/replistore/src/frontend/s3/messages.rs @@ -0,0 +1,193 @@ +use std::pin::Pin; + +use actix::prelude::*; +use time::OffsetDateTime; + +use crate::stores::messages::{DeleteBlobError, GetBlobError, PutBlobError, StoreError}; + +#[derive(Debug)] +pub struct S3Error { + store_error: StoreError, +} + +impl From for S3Error { + fn from(value: StoreError) -> Self { + S3Error { store_error: value } + } +} + +#[derive(Debug)] +pub struct Object { + pub key: String, + pub md5sum: String, + pub sha256sum: String, + pub last_modified: OffsetDateTime, + pub size: u64, +} + +#[derive(Message)] +#[rtype(result = "Result, S3Error>")] +pub struct ListBuckets {} + +pub enum CreateBucketError { + S3Error(S3Error), + BucketAlreadyExists, +} + +impl From for CreateBucketError { + fn from(value: StoreError) -> Self { + Self::S3Error(value.into()) + } +} + +#[derive(Message)] +#[rtype(result = "Result")] +pub struct CreateBucket { + pub name: String, +} + +#[derive(Message)] +#[rtype(result = "Result, S3Error>")] +pub struct HeadBucket { + pub name: String, +} + +#[derive(Debug)] +pub struct ReadDataError { + pub msg: String, +} + +pub enum PutObjectError { + S3Error(S3Error), + BucketNotFound, + ReadDataError(ReadDataError), +} + +impl From for PutObjectError { + fn from(value: StoreError) -> Self { + Self::S3Error(value.into()) + } +} + +impl From for PutObjectError { + fn from(value: PutBlobError) -> Self { + match value { + PutBlobError::StoreError(e) => e.into(), + PutBlobError::BlobReadError(e) => Self::ReadDataError(ReadDataError { msg: e.msg }), + PutBlobError::IoError(e) => Self::ReadDataError(ReadDataError { msg: e.to_string() }), + } + } +} + +#[derive(Message)] +#[rtype(result = "Result")] +pub struct PutObject { + pub bucket: String, + pub key: String, + pub data: Pin> + Send>>, +} + +pub enum ListObjectError { + S3Error(S3Error), + BucketNotFound, +} + +impl From for ListObjectError { + fn from(value: StoreError) -> Self { + Self::S3Error(value.into()) + } +} + +#[derive(Message)] +#[rtype(result = "Result, ListObjectError>")] +pub struct ListObject { + pub bucket: String, + pub prefix: String, +} + +pub enum HeadObjectError { + S3Error(S3Error), + BucketNotFound, + ObjectNotFound, +} + +impl From for HeadObjectError { + fn from(value: StoreError) -> Self { + Self::S3Error(value.into()) + } +} + +#[derive(Message)] +#[rtype(result = "Result")] +pub struct HeadObject { + pub bucket: String, + pub key: String, +} + +pub enum GetObjectError { + S3Error(S3Error), + BucketNotFound, + ObjectNotFound, + ReadDataError(ReadDataError), +} + +impl From for GetObjectError { + fn from(value: StoreError) -> Self { + Self::S3Error(value.into()) + } +} + +impl From for GetObjectError { + fn from(value: GetBlobError) -> Self { + match value { + GetBlobError::StoreError(e) => e.into(), + GetBlobError::BlobReadError(e) => Self::ReadDataError(ReadDataError { msg: e.msg }), + GetBlobError::IoError(e) => Self::ReadDataError(ReadDataError { msg: e.to_string() }), + GetBlobError::BlobDoesNotExist => GetObjectError::ObjectNotFound, + } + } +} + +pub struct GetObjectResult { + pub metadata: Object, + pub data: Pin> + Send + Sync>>, +} + +#[derive(Message)] +#[rtype(result = "Result")] +pub struct GetObject { + pub bucket: String, + pub key: String, +} + +pub enum DeleteObjectError { + S3Error(S3Error), + BucketNotFound, + ObjectNotFound, + ReadDataError(ReadDataError), +} + +impl From for DeleteObjectError { + fn from(value: StoreError) -> Self { + Self::S3Error(value.into()) + } +} + +impl From for DeleteObjectError { + fn from(value: DeleteBlobError) -> Self { + match value { + DeleteBlobError::StoreError(e) => e.into(), + DeleteBlobError::IoError(e) => { + Self::ReadDataError(ReadDataError { msg: e.to_string() }) + } + DeleteBlobError::BlobDoesNotExist => DeleteObjectError::ObjectNotFound, + } + } +} + +#[derive(Message)] +#[rtype(result = "Result<(), DeleteObjectError>")] +pub struct DeleteObject { + pub bucket: String, + pub key: String, +} diff --git a/replistore/src/frontend/s3/mod.rs b/replistore/src/frontend/s3/mod.rs new file mode 100644 index 0000000..c2eba30 --- /dev/null +++ b/replistore/src/frontend/s3/mod.rs @@ -0,0 +1,3 @@ +pub(super) mod messages; +pub mod s3; +pub mod s3_frontend; diff --git a/replistore/src/frontend/s3/s3.rs b/replistore/src/frontend/s3/s3.rs new file mode 100644 index 0000000..386d3c8 --- /dev/null +++ b/replistore/src/frontend/s3/s3.rs @@ -0,0 +1,453 @@ +use std::collections::HashMap; + +use actix::prelude::*; +use futures::{Future, TryStreamExt}; +use log::error; +use time::OffsetDateTime; + +use crate::stores::{ + contentaddressableblob::ContentAddressableBlobStore, + keyvalue::KeyValueStore, + messages::{BlobReadError, GetOrCreateError, StoreError}, + storeowner::StoreOwner, +}; + +use super::messages::{ + CreateBucket, CreateBucketError, DeleteObject, DeleteObjectError, GetObject, GetObjectError, + GetObjectResult, HeadBucket, HeadObject, HeadObjectError, ListBuckets, ListObject, + ListObjectError, Object, PutObject, PutObjectError, S3Error, +}; + +#[derive(Debug)] +pub struct S3 { + store_owner: Addr, + s3_kv_store: Option>, + s3_blob_store: Option>, +} + +/* Kv Structure + * + * s3_kv_store: + * \0buckets\0: nil + * s3_obj_kv_store: + * \0objects\0\0: nil + * \0objectmeta\0\0\0size: size in bytes + * \0objectmeta\0\0\0last_modified: u64 timestamp + * + */ + +impl S3 { + pub fn new(store_owner: Addr) -> Self { + S3 { + store_owner, + s3_kv_store: None, + s3_blob_store: None, + } + } + + fn store(&self) -> Addr { + self.s3_kv_store.clone().unwrap() + } + + fn blob_store(&self) -> Addr { + self.s3_blob_store.clone().unwrap() + } + + fn bucket_path(&self, name: &str) -> Vec { + vec!["buckets".to_string(), name.to_string()] + } + + fn object_path(&self, bucket: &str, key: &str) -> Vec { + vec!["objects".to_string(), bucket.to_string(), key.to_string()] + } + + fn objectmeta_path(&self, bucket: &str, key: &str, suffix: &str) -> Vec { + vec![ + "objectmeta".to_string(), + bucket.to_string(), + key.to_string(), + suffix.to_string(), + ] + } + + fn with_bucket_store( + &self, + bucket: String, + not_found_error: E, + handler: F, + ) -> ResponseFuture> + where + Fut: Future>, + F: FnOnce(Addr) -> Fut + 'static, + E: From + 'static, + { + let root_store = self.store(); + let store_owner = self.store_owner.clone(); + let bucket_path = self.bucket_path(&bucket); + + Box::pin(async move { + if root_store + .send(crate::stores::messages::Get { key: bucket_path }) + .await + .unwrap()? + .is_none() + { + return Err(not_found_error); + } + + match store_owner + .send(crate::stores::messages::GetOrCreateKeyValueStore { + name: format!("s3metadata\0{}", bucket).to_string(), + }) + .await + .unwrap() + { + Ok(addr) => handler(addr).await, + Err(GetOrCreateError::StoreError(e)) => return Err(e.into()), + Err(GetOrCreateError::StoreTypeMissmatch(store, e)) => { + panic!("Error getting s3 meta store {}: {}", store, e) + } + } + }) + } + + async fn meta_to_obj( + store: &Addr, + bucket: &String, + obj: String, + ) -> Result { + let mut meta = store + .send(crate::stores::messages::List { + prefix: vec![ + "objectmeta".to_string(), + bucket.clone(), + obj.clone(), + String::new(), + ], + }) + .await + .unwrap()?; + let last_modified = OffsetDateTime::from_unix_timestamp( + meta.get("last_modified") + .map(|e| e.parse().unwrap_or_default()) + .unwrap_or_default(), + ) + .unwrap(); + let md5sum = meta.remove("md5sum").unwrap_or_default(); + let sha256sum = meta.remove("sha256sum").unwrap_or_default(); + let size = meta + .get("size") + .map(|e| e.parse().unwrap_or_default()) + .unwrap_or_default(); + Ok(Object { + key: obj, + md5sum, + sha256sum, + last_modified, + size, + }) + } +} + +impl Actor for S3 { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + let store_owner = self.store_owner.clone(); + let fut = async move { + store_owner + .send(crate::stores::messages::GetOrCreateKeyValueStore { + name: "s3metadata\0root".to_string(), + }) + .await + }; + fut.into_actor(self) + .then(|res, act, ctx| { + match res.unwrap() { + Ok(addr) => act.s3_kv_store = Some(addr), + Err(e) => { + error!("Error getting keyvalue store {:?}", e); + ctx.stop(); + } + } + fut::ready(()) + }) + .wait(ctx); + + let store_owner = self.store_owner.clone(); + let fut = async move { + store_owner + .send( + crate::stores::messages::GetOrCreateContentAddressableBlobStore { + name: "s3data".to_string(), + path: "/tmp/replistore/s3data".into(), + }, + ) + .await + }; + fut.into_actor(self) + .then(|res, act, ctx| { + match res.unwrap() { + Ok(addr) => act.s3_blob_store = Some(addr), + Err(e) => { + error!("Error getting blob store {:?}", e); + ctx.stop(); + } + } + fut::ready(()) + }) + .wait(ctx) + } +} + +impl Handler for S3 { + type Result = ResponseFuture, S3Error>>; + fn handle(&mut self, _msg: ListBuckets, _ctx: &mut Self::Context) -> Self::Result { + let store = self.store(); + Box::pin(async move { + let resp = store + .send(crate::stores::messages::List { + prefix: vec!["buckets".to_string(), "".to_string()], + }) + .await + .unwrap()?; + Ok(resp.into_keys().collect()) + }) + } +} + +impl Handler for S3 { + type Result = ResponseFuture>; + fn handle(&mut self, msg: CreateBucket, _ctx: &mut Self::Context) -> Self::Result { + let CreateBucket { name } = msg; + let store = self.store(); + let bucket_path = self.bucket_path(&name); + Box::pin(async move { + let resp = store + .send(crate::stores::messages::Get { + key: bucket_path.clone(), + }) + .await + .unwrap()?; + if resp.is_some() { + return Err(CreateBucketError::BucketAlreadyExists); + } + store + .send(crate::stores::messages::Set { + key: bucket_path, + value: String::new(), + }) + .await + .unwrap()?; + Ok(name) + }) + } +} + +impl Handler for S3 { + type Result = ResponseFuture, S3Error>>; + + fn handle(&mut self, msg: HeadBucket, _ctx: &mut Self::Context) -> Self::Result { + let HeadBucket { name } = msg; + let store = self.store(); + let bucket_path = self.bucket_path(&name); + Box::pin(async move { + let resp = store + .send(crate::stores::messages::Get { + key: bucket_path.clone(), + }) + .await + .unwrap()?; + if resp.is_some() { + return Ok(Some(())); + } + Ok(None) + }) + } +} + +impl Handler for S3 { + type Result = ResponseFuture, ListObjectError>>; + + fn handle(&mut self, msg: ListObject, _ctx: &mut Self::Context) -> Self::Result { + let ListObject { bucket, prefix } = msg; + let object_path = self.object_path(&bucket, &prefix); + self.with_bucket_store( + bucket.clone(), + ListObjectError::BucketNotFound, + |store: Addr| async move { + let mut result = Vec::new(); + for obj in store + .send(crate::stores::messages::List { + prefix: object_path, + }) + .await + .unwrap()? + .into_keys() + { + result.push(Self::meta_to_obj(&store, &bucket, obj).await?); + } + Ok(result) + }, + ) + } +} + +impl Handler for S3 { + type Result = ResponseFuture>; + + fn handle(&mut self, msg: HeadObject, _ctx: &mut Self::Context) -> Self::Result { + let HeadObject { bucket, key } = msg; + let object_path = self.object_path(&bucket, &key); + self.with_bucket_store( + bucket.clone(), + HeadObjectError::BucketNotFound, + |store: Addr| async move { + let resp = store + .send(crate::stores::messages::Get { key: object_path }) + .await + .unwrap()?; + if resp.is_none() { + return Err(HeadObjectError::ObjectNotFound); + } + Ok(Self::meta_to_obj(&store, &bucket, key).await?) + }, + ) + } +} + +impl Handler for S3 { + type Result = ResponseFuture>; + + fn handle(&mut self, msg: PutObject, _ctx: &mut Self::Context) -> Self::Result { + let PutObject { bucket, key, data } = msg; + let blob_store = self.blob_store(); + let object_path = self.object_path(&bucket, &key); + let last_modified_path = self.objectmeta_path(&bucket, &key, "last_modified"); + let md5sum_path = self.objectmeta_path(&bucket, &key, "md5sum"); + let sha256sum_path = self.objectmeta_path(&bucket, &key, "sha256sum"); + let size_path = self.objectmeta_path(&bucket, &key, "size"); + self.with_bucket_store( + bucket.clone(), + PutObjectError::BucketNotFound, + |store: Addr| async move { + let info = blob_store + .send(crate::stores::messages::PutBlob { + data: Box::pin(data.map_err(|e| BlobReadError { msg: e.msg })), + }) + .await + .unwrap()?; + + let last_modified = OffsetDateTime::now_utc(); + store + .send(crate::stores::messages::MultiSet { + data: HashMap::from([ + (object_path.clone(), String::new()), + ( + last_modified_path, + last_modified.unix_timestamp().to_string(), + ), + (md5sum_path, info.md5sum.clone()), + (sha256sum_path, info.sha256sum.clone()), + (size_path, info.size.to_string()), + ]), + }) + .await + .unwrap()?; + Ok(Object { + key, + md5sum: info.md5sum, + sha256sum: info.sha256sum, + size: info.size, + last_modified, + }) + }, + ) + } +} + +impl Handler for S3 { + type Result = ResponseFuture>; + + fn handle(&mut self, msg: GetObject, _ctx: &mut Self::Context) -> Self::Result { + let GetObject { bucket, key } = msg; + let blob_store = self.blob_store(); + let object_path = self.object_path(&bucket, &key); + self.with_bucket_store( + bucket.clone(), + GetObjectError::BucketNotFound, + |store: Addr| async move { + let resp = store + .send(crate::stores::messages::Get { key: object_path }) + .await + .unwrap()?; + if resp.is_none() { + return Err(GetObjectError::ObjectNotFound); + } + let meta = Self::meta_to_obj(&store, &bucket, key).await?; + + let resp = blob_store + .send(crate::stores::messages::GetBlob { + sha256sum: meta.sha256sum.clone(), + }) + .await + .unwrap()?; + + Ok(GetObjectResult { + metadata: meta, + data: Box::pin(resp.map_err(|e| super::messages::ReadDataError { msg: e.msg })), + }) + }, + ) + } +} + +impl Handler for S3 { + type Result = ResponseFuture>; + + fn handle(&mut self, msg: DeleteObject, _ctx: &mut Self::Context) -> Self::Result { + let DeleteObject { bucket, key } = msg; + let blob_store = self.blob_store(); + let object_path = self.object_path(&bucket, &key); + self.with_bucket_store( + bucket.clone(), + DeleteObjectError::BucketNotFound, + |store: Addr| async move { + let resp = store + .send(crate::stores::messages::Get { + key: object_path.clone(), + }) + .await + .unwrap()?; + if resp.is_none() { + return Err(DeleteObjectError::ObjectNotFound); + } + let meta = Self::meta_to_obj(&store, &bucket, key.clone()).await?; + + store + .send(crate::stores::messages::MultiDelete { + data: vec![ + object_path, + vec![ + "objectmeta".to_string(), + bucket.clone(), + key.clone(), + String::new(), + ], + ], + }) + .await + .unwrap()?; + + blob_store + .send(crate::stores::messages::DeleteBlob { + sha256sum: meta.sha256sum, + }) + .await + .unwrap()?; + + Ok(()) + }, + ) + } +} diff --git a/replistore/src/frontend/s3/s3_frontend.rs b/replistore/src/frontend/s3/s3_frontend.rs new file mode 100644 index 0000000..cb699bc --- /dev/null +++ b/replistore/src/frontend/s3/s3_frontend.rs @@ -0,0 +1,412 @@ +// Copyright (C) 2023 Felix Huettner +// +// This file is part of DTRD. +// +// DTRD is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// DTRD is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use actix::prelude::*; +use futures::TryStreamExt; +use futures_util::future::FutureExt; + +use hyper::Server; +use log::info; +use s3s::{ + auth::SimpleAuth, + dto::{CreateBucketInput, CreateBucketOutput}, + service::S3ServiceBuilder, +}; +use tokio::sync::{broadcast, mpsc}; + +use std::time; + +use async_trait::async_trait; +use s3s::{ + dto::{ + Bucket, DeleteObjectInput, DeleteObjectOutput, GetBucketLocationInput, + GetBucketLocationOutput, GetObjectInput, GetObjectOutput, HeadBucketInput, + HeadBucketOutput, HeadObjectInput, HeadObjectOutput, ListBucketsInput, ListBucketsOutput, + ListObjectsInput, ListObjectsOutput, ListObjectsV2Input, ListObjectsV2Output, Object, + Owner, PutObjectInput, PutObjectOutput, + }, + s3_error, S3Request, S3Response, S3Result, +}; +use tracing::instrument; + +impl From for s3s::S3Error { + fn from(value: super::messages::S3Error) -> Self { + s3s::S3Error::with_message(s3s::S3ErrorCode::InternalError, format!("{:?}", value)) + } +} + +impl From for s3s::S3Error { + fn from(value: super::messages::CreateBucketError) -> Self { + match value { + super::messages::CreateBucketError::S3Error(e) => e.into(), + super::messages::CreateBucketError::BucketAlreadyExists => { + s3_error!(BucketAlreadyExists) + } + } + } +} + +impl From for s3s::S3Error { + fn from(value: super::messages::ListObjectError) -> Self { + match value { + super::messages::ListObjectError::S3Error(e) => e.into(), + super::messages::ListObjectError::BucketNotFound => s3_error!(NoSuchBucket), + } + } +} + +impl From for s3s::S3Error { + fn from(value: super::messages::PutObjectError) -> Self { + match value { + super::messages::PutObjectError::S3Error(e) => e.into(), + super::messages::PutObjectError::BucketNotFound => s3_error!(NoSuchBucket), + super::messages::PutObjectError::ReadDataError(e) => { + s3s::S3Error::with_message(s3s::S3ErrorCode::InternalError, e.msg) + } + } + } +} + +impl From for s3s::S3Error { + fn from(value: super::messages::HeadObjectError) -> Self { + match value { + super::messages::HeadObjectError::S3Error(e) => e.into(), + super::messages::HeadObjectError::BucketNotFound => s3_error!(NoSuchBucket), + super::messages::HeadObjectError::ObjectNotFound => s3_error!(NoSuchKey), + } + } +} + +impl From for s3s::S3Error { + fn from(value: super::messages::GetObjectError) -> Self { + match value { + super::messages::GetObjectError::S3Error(e) => e.into(), + super::messages::GetObjectError::BucketNotFound => s3_error!(NoSuchBucket), + super::messages::GetObjectError::ObjectNotFound => s3_error!(NoSuchKey), + super::messages::GetObjectError::ReadDataError(e) => { + s3s::S3Error::with_message(s3s::S3ErrorCode::InternalError, e.msg) + } + } + } +} + +impl From for s3s::S3Error { + fn from(value: super::messages::DeleteObjectError) -> Self { + match value { + super::messages::DeleteObjectError::S3Error(e) => e.into(), + super::messages::DeleteObjectError::BucketNotFound => s3_error!(NoSuchBucket), + super::messages::DeleteObjectError::ObjectNotFound => s3_error!(NoSuchKey), + super::messages::DeleteObjectError::ReadDataError(e) => { + s3s::S3Error::with_message(s3s::S3ErrorCode::InternalError, e.msg) + } + } + } +} + +#[async_trait] +trait AddrExt { + async fn send_s3(&self, msg: M) -> Result + where + M: Message + Send + 'static, + M::Result: Send, + A: Handler, + A::Context: actix::dev::ToEnvelope; +} + +#[async_trait] +impl AddrExt for Addr { + async fn send_s3(&self, msg: M) -> Result + where + M: Message + Send + 'static, + M::Result: Send, + A: Handler, + A::Context: actix::dev::ToEnvelope, + { + self.send(msg) + .await + .map_err(|_| s3s::S3Error::with_message(s3s::S3ErrorCode::InternalError, "actix error")) + } +} + +#[derive(Debug)] +pub struct S3Frontend { + s3: Addr, +} + +impl S3Frontend { + pub async fn new(s3: Addr) -> Self { + S3Frontend { s3 } + } + + pub async fn run( + self, + mut shutdown: broadcast::Receiver<()>, + _shutdown_complete_sender: mpsc::Sender<()>, + ) -> Result<(), Box> { + // Setup S3Frontend service + let service = { + let mut b = S3ServiceBuilder::new(self); + + // Enable authentication + b.set_auth(SimpleAuth::from_single("cake", "ilike")); + + b.build() + }; + + // Run server + let addr = "0.0.0.0:8080".parse().unwrap(); + info!("Server listening on {}", addr); + let server = Server::try_bind(&addr) + .unwrap() + .serve(service.into_shared().into_make_service()); + + info!("server is running at http://{addr}"); + server + .with_graceful_shutdown(shutdown.recv().map(|_| ())) + .await?; + + info!("Server has shutdown. See you"); + // _shutdown_complete_sender is explicitly dropped here + Ok(()) + } +} + +#[async_trait] +impl s3s::S3 for S3Frontend { + #[instrument] + async fn get_bucket_location( + &self, + _req: S3Request, + ) -> S3Result> { + Ok(S3Response::new(GetBucketLocationOutput { + location_constraint: None, + })) + } + + #[instrument] + async fn list_buckets( + &self, + _req: S3Request, + ) -> S3Result> { + let buckets = self.s3.send_s3(super::messages::ListBuckets {}).await??; + + Ok(S3Response::new(ListBucketsOutput { + buckets: Some( + buckets + .into_iter() + .map(|name| Bucket { + creation_date: Some(time::SystemTime::now().into()), + name: Some(name.to_string()), + }) + .collect(), + ), + owner: Some(Owner { + display_name: Some("ich teste mal".to_string()), + id: Some("test".to_string()), + }), + })) + } + + #[instrument] + async fn create_bucket( + &self, + req: S3Request, + ) -> S3Result> { + let bucket = self + .s3 + .send_s3(super::messages::CreateBucket { + name: req.input.bucket, + }) + .await??; + Ok(S3Response::new(CreateBucketOutput { + location: Some(format!("/{}", bucket)), + })) + } + + #[instrument] + async fn head_bucket( + &self, + req: S3Request, + ) -> S3Result> { + match self + .s3 + .send_s3(super::messages::HeadBucket { + name: req.input.bucket, + }) + .await?? + { + Some(_) => Ok(S3Response::new(HeadBucketOutput {})), + None => Err(s3_error!(NoSuchBucket)), + } + } + + #[instrument] + async fn list_objects( + &self, + req: S3Request, + ) -> S3Result> { + let objects = self + .s3 + .send_s3(super::messages::ListObject { + bucket: req.input.bucket.clone(), + prefix: req.input.prefix.unwrap_or_default(), + }) + .await??; + + Ok(S3Response::new(ListObjectsOutput { + contents: Some( + objects + .into_iter() + .map(|obj| Object { + key: Some(obj.key), + last_modified: Some(obj.last_modified.into()), + size: obj.size as i64, + ..Default::default() + }) + .collect(), + ), + max_keys: i32::MAX, + name: Some(req.input.bucket), + ..Default::default() + })) + } + + #[instrument] + async fn list_objects_v2( + &self, + req: S3Request, + ) -> S3Result> { + let objects = self + .s3 + .send_s3(super::messages::ListObject { + bucket: req.input.bucket.clone(), + prefix: req.input.prefix.unwrap_or_default(), + }) + .await??; + + Ok(S3Response::new(ListObjectsV2Output { + contents: Some( + objects + .into_iter() + .map(|obj| Object { + key: Some(obj.key), + last_modified: Some(obj.last_modified.into()), + size: obj.size as i64, + ..Default::default() + }) + .collect(), + ), + max_keys: i32::MAX, + name: Some(req.input.bucket), + ..Default::default() + })) + } + + #[instrument] + async fn head_object( + &self, + req: S3Request, + ) -> S3Result> { + let obj = self + .s3 + .send_s3(super::messages::HeadObject { + bucket: req.input.bucket.clone(), + key: req.input.key, + }) + .await??; + + Ok(S3Response::new(HeadObjectOutput { + last_modified: Some(obj.last_modified.into()), + content_length: obj.size as i64, + e_tag: Some(obj.md5sum), + checksum_sha256: Some(obj.sha256sum), + ..Default::default() + })) + } + + #[instrument] + async fn get_object( + &self, + req: S3Request, + ) -> S3Result> { + let result = self + .s3 + .send_s3(super::messages::GetObject { + bucket: req.input.bucket.clone(), + key: req.input.key, + }) + .await??; + let obj = result.metadata; + + Ok(S3Response::new(GetObjectOutput { + body: Some(s3s::dto::StreamingBlob::wrap(Box::pin( + result.data.map_err(|e| std::io::Error::other(e.msg)), + ))), + last_modified: Some(obj.last_modified.into()), + content_length: obj.size as i64, + e_tag: Some(obj.md5sum), + checksum_sha256: Some(obj.sha256sum), + ..Default::default() + })) + } + + #[instrument] + async fn delete_object( + &self, + req: S3Request, + ) -> S3Result> { + self.s3 + .send_s3(super::messages::DeleteObject { + bucket: req.input.bucket.clone(), + key: req.input.key, + }) + .await??; + + Ok(S3Response::new(DeleteObjectOutput { + ..Default::default() + })) + } + + #[instrument] + async fn put_object( + &self, + req: S3Request, + ) -> S3Result> { + if req.input.body.is_none() { + return Err(s3_error!(InvalidRequest)); + } + let object = self + .s3 + .send_s3(super::messages::PutObject { + bucket: req.input.bucket, + key: req.input.key, + data: Box::pin( + req.input + .body + .unwrap() + .map_err(|e| super::messages::ReadDataError { msg: e.to_string() }), + ), + }) + .await??; + + Ok(S3Response::new(PutObjectOutput { + e_tag: Some(object.md5sum), + checksum_sha256: Some(object.sha256sum), + ..Default::default() + })) + } +} diff --git a/replistore/src/main.rs b/replistore/src/main.rs new file mode 100644 index 0000000..1b3ff40 --- /dev/null +++ b/replistore/src/main.rs @@ -0,0 +1,132 @@ +use crate::{ + common::settings::Settings, frontend::s3::s3_frontend::S3Frontend, + stores::storeowner::StoreOwner, +}; +use actix::prelude::*; +use log::{error, info}; +use tokio::sync::{broadcast, mpsc}; + +use opentelemetry::KeyValue; +use opentelemetry_otlp::WithExportConfig; +use opentelemetry_sdk::{ + trace::{self, RandomIdGenerator, Sampler}, + Resource, +}; +use std::time::Duration; +use tracing_subscriber::layer::SubscriberExt; + +mod common; +mod frontend; +mod stores; + +fn init_tracing(settings: &Settings) { + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter() + .tonic() + .with_endpoint("http://localhost:4317") + .with_timeout(Duration::from_secs(3)), + ) + .with_trace_config( + trace::config() + .with_sampler(Sampler::AlwaysOn) + .with_id_generator(RandomIdGenerator::default()) + .with_max_events_per_span(64) + .with_max_attributes_per_span(16) + .with_max_events_per_span(16) + .with_resource(Resource::new(vec![KeyValue::new( + "service.name", + "replistore", + )])), + ) + .install_batch(opentelemetry_sdk::runtime::Tokio) + .unwrap(); + + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + + let console_layer = console_subscriber::ConsoleLayer::builder() + .server_addr(( + [127, 0, 0, 1], + settings + .tokio_tracing_port + .clone() + .map_or(0, |e| e.parse().unwrap()), + )) + .spawn(); + + let subscriber = tracing_subscriber::Registry::default() + .with(telemetry) + .with(console_layer); + + tracing::subscriber::set_global_default(subscriber).unwrap(); +} + +#[actix_rt::main] +async fn main() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + info!("Starting up"); + let settings: Settings = Settings::from_env(); + info!("Starting with settings: {:?}", settings); + init_tracing(&settings); + + let (notify_shutdown, _) = broadcast::channel::<()>(1); + let (shutdown_complete_tx, mut shutdown_complete_rx) = mpsc::channel::<()>(1); + + let storeowner = StoreOwner::new("/tmp/replistore/db".into()) + .unwrap() + .start(); + + let s3_addr = frontend::s3::s3::S3::new(storeowner.clone()).start(); + + let s3_task_shutdown_notifier = notify_shutdown.subscribe(); + let s3_task_shutdown_complete_tx_task = shutdown_complete_tx.clone(); + let s3_task_s3_addr = s3_addr.clone(); + let s3_task = tokio::task::Builder::new() + .name("S3") + .spawn(async move { + let s3 = S3Frontend::new(s3_task_s3_addr).await; + match s3 + .run(s3_task_shutdown_notifier, s3_task_shutdown_complete_tx_task) + .await + { + Ok(_) => Ok(()), + Err(e) => Err(e.to_string()), + } + }) + .unwrap(); + + let ctrl_c = tokio::signal::ctrl_c(); + + tokio::select! { + res = s3_task => { + if let Ok(Err(e)) = res { + error!("something bad happened with the s3 server {:?}. Aborting...", e); + } + } + _ = ctrl_c => { + info!("Shutting down"); + } + } + + info!("Stopping external connections"); + // Stolen from: https://github.com/tokio-rs/mini-redis/blob/master/src/server.rs + // When `notify_shutdown` is dropped, all tasks which have `subscribe`d will + // receive the shutdown signal and can exit + drop(notify_shutdown); + // Drop final `Sender` so the `Receiver` below can complete + drop(shutdown_complete_tx); + + info!("Stopping individual actors"); + + info!("Now stopping actor system"); + System::current().stop(); + + // Wait for all active connections to finish processing. As the `Sender` + // handle held by the listener has been dropped above, the only remaining + // `Sender` instances are held by connection handler tasks. When those drop, + // the `mpsc` channel will close and `recv()` will return `None`. + let _ = shutdown_complete_rx.recv().await; + + info!("All done, see you"); +} diff --git a/replistore/src/stores/contentaddressableblob.rs b/replistore/src/stores/contentaddressableblob.rs new file mode 100644 index 0000000..81196e3 --- /dev/null +++ b/replistore/src/stores/contentaddressableblob.rs @@ -0,0 +1,179 @@ +use actix::prelude::*; +use bytes::{Bytes, BytesMut}; +use futures::{SinkExt, StreamExt, TryStreamExt}; +use md5::Md5; +use rocksdb::TransactionDB; +use sha2::Digest; +use std::{path::PathBuf, pin::Pin, sync::Arc}; +use tokio::io::AsyncReadExt; +use tokio_util::{ + codec::{BytesCodec, FramedRead}, + compat::TokioAsyncWriteCompatExt, +}; + +use super::messages::{ + BlobInfo, BlobReadError, DeleteBlob, DeleteBlobError, GetBlob, GetBlobError, PutBlob, + PutBlobError, +}; + +pub struct ContentAddressableBlobStore { + name: String, + base_path: PathBuf, + db: Arc, +} + +impl std::fmt::Debug for ContentAddressableBlobStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ContentAddressableBlobStore") + .field("name", &self.name) + .finish() + } +} + +impl ContentAddressableBlobStore { + pub fn new(name: String, base_path: PathBuf, db: Arc) -> Self { + ContentAddressableBlobStore { + name, + base_path, + db, + } + } + + fn get_path(&self, keys: Vec) -> String { + format!("\0store\0{}\0{}", self.name, keys.join("\0")) + } + + fn get_disk_base_path(&self) -> PathBuf { + self.base_path.join("data") + } + + fn get_disk_path(&self, sha256sum: &str) -> PathBuf { + self.get_disk_base_path().join(sha256sum) + } + + fn get_disk_tmp_path(&self) -> PathBuf { + let uuid = uuid::Uuid::new_v4().to_string(); + self.get_disk_base_path().join("tmp").join(uuid) + } + + async fn hash_file(path: &PathBuf) -> Result<(String, String), std::io::Error> { + let mut file = tokio::fs::File::open(path).await?; + let mut buf = vec![0; 65536]; + let mut md5_hash = Md5::new(); + let mut sha2_256_hash = sha2::Sha256::new(); + loop { + let nread = file.read(&mut buf).await?; + if nread == 0 { + break; + } + md5_hash.update(&buf[..nread]); + sha2_256_hash.update(&buf[..nread]); + } + let md5sum = hex::encode(md5_hash.finalize()); + let sha2_256sum = hex::encode(sha2_256_hash.finalize()); + Ok((md5sum, sha2_256sum)) + } +} + +impl Actor for ContentAddressableBlobStore { + type Context = Context; + + fn started(&mut self, ctx: &mut Self::Context) { + let fullpath = self.base_path.join("data").join("tmp"); + let fut = async move { tokio::fs::create_dir_all(&fullpath).await.unwrap() }; + + fut.into_actor(self).wait(ctx) + } +} + +impl Handler for ContentAddressableBlobStore { + type Result = ResponseActFuture>; + + fn handle(&mut self, msg: PutBlob, _ctx: &mut Context) -> Self::Result { + let PutBlob { data } = msg; + let basedir = self.get_disk_base_path(); + let tmpfile = self.get_disk_tmp_path(); + + Box::pin( + async move { + let file = Box::pin( + futures_util::AsyncWriteExt::into_sink( + tokio::fs::File::create(&tmpfile).await?.compat_write(), + ) + .sink_map_err(|e| e.into()), + ); + + data.map_err(|e| Into::::into(e)) + .forward(file) + .await?; + + let (md5sum, sha256sum) = Self::hash_file(&tmpfile).await?; + + let target_name = basedir.join(&sha256sum); + tokio::fs::rename(&tmpfile, &target_name).await?; + + let size = tokio::fs::metadata(&target_name).await?.len(); + + Ok(BlobInfo { + md5sum, + sha256sum, + size, + }) + } + .into_actor(self) // converts future to ActorFuture + .map(|res, _act, _ctx| res), + ) + } +} + +impl Handler for ContentAddressableBlobStore { + type Result = ResponseFuture< + Result< + Pin> + Send + Sync>>, + GetBlobError, + >, + >; + + fn handle(&mut self, msg: GetBlob, _ctx: &mut Self::Context) -> Self::Result { + let GetBlob { sha256sum } = msg; + let filepath = self.get_disk_path(&sha256sum); + + Box::pin(async move { + let metadata = tokio::fs::metadata(&filepath).await?; + if !metadata.is_file() { + return Err(GetBlobError::BlobDoesNotExist); + } + + let file = tokio::fs::File::open(&filepath).await?; + let stream = FramedRead::new(file, BytesCodec::new()) + .map_ok(BytesMut::freeze) + .map_err(|e| BlobReadError { msg: e.to_string() }); + + // need a explicit type here, otherwise daemons will arise + let out: Result< + Pin> + Send + Sync>>, + GetBlobError, + > = Ok(Box::pin(stream)); + out + }) + } +} + +impl Handler for ContentAddressableBlobStore { + type Result = ResponseFuture>; + + fn handle(&mut self, msg: DeleteBlob, _ctx: &mut Self::Context) -> Self::Result { + let DeleteBlob { sha256sum } = msg; + let filepath = self.get_disk_path(&sha256sum); + + Box::pin(async move { + let metadata = tokio::fs::metadata(&filepath).await?; + if !metadata.is_file() { + return Err(DeleteBlobError::BlobDoesNotExist); + } + + tokio::fs::remove_file(&filepath).await?; + Ok(()) + }) + } +} diff --git a/replistore/src/stores/keyvalue.rs b/replistore/src/stores/keyvalue.rs new file mode 100644 index 0000000..ef77b7d --- /dev/null +++ b/replistore/src/stores/keyvalue.rs @@ -0,0 +1,122 @@ +use actix::prelude::*; +use rocksdb::TransactionDB; +use std::{collections::HashMap, sync::Arc}; + +use super::messages::{Delete, Get, List, MultiDelete, MultiSet, Set, StoreError}; + +pub struct KeyValueStore { + name: String, + db: Arc, +} + +impl std::fmt::Debug for KeyValueStore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("KeyValueStore") + .field("name", &self.name) + .finish() + } +} + +impl KeyValueStore { + pub fn new(name: String, db: Arc) -> Self { + KeyValueStore { name, db } + } + + fn get_path(&self, keys: Vec) -> String { + format!("\0store\0{}\0{}", self.name, keys.join("\0")) + } + + fn iter_range(&self, key: &[u8]) -> rocksdb::DBIteratorWithThreadMode<'_, TransactionDB> { + let mut options = rocksdb::ReadOptions::default(); + options.set_iterate_range(rocksdb::PrefixRange(key)); + self.db.iterator_opt( + rocksdb::IteratorMode::From(key, rocksdb::Direction::Forward), + options, + ) + } +} + +impl Actor for KeyValueStore { + type Context = Context; +} + +impl Handler for KeyValueStore { + type Result = Result, StoreError>; + + fn handle(&mut self, msg: Get, _ctx: &mut Self::Context) -> Self::Result { + let Get { key } = msg; + Ok(self + .db + .get(&self.get_path(key))? + .map(|e| String::from_utf8(e).unwrap())) + } +} + +impl Handler for KeyValueStore { + type Result = Result<(), StoreError>; + + fn handle(&mut self, msg: Set, _ctx: &mut Self::Context) -> Self::Result { + let Set { key, value } = msg; + Ok(self.db.put(&self.get_path(key), value)?) + } +} + +impl Handler for KeyValueStore { + type Result = Result<(), StoreError>; + + fn handle(&mut self, msg: MultiSet, _ctx: &mut Self::Context) -> Self::Result { + let MultiSet { mut data } = msg; + let txn = self.db.transaction(); + for (key, value) in data.drain() { + txn.put(&self.get_path(key), value)?; + } + txn.commit()?; + Ok(()) + } +} + +impl Handler for KeyValueStore { + type Result = Result<(), StoreError>; + + fn handle(&mut self, msg: Delete, _ctx: &mut Self::Context) -> Self::Result { + let Delete { key } = msg; + Ok(self.db.delete(&self.get_path(key))?) + } +} + +impl Handler for KeyValueStore { + type Result = Result<(), StoreError>; + + fn handle(&mut self, msg: MultiDelete, _ctx: &mut Self::Context) -> Self::Result { + let MultiDelete { mut data } = msg; + let txn = self.db.transaction(); + for key in data.drain(..) { + let path = self.get_path(key); + let path_bytes = path.as_bytes(); + for found in self.iter_range(path_bytes) { + txn.delete(found?.0)? + } + } + txn.commit()?; + Ok(()) + } +} + +impl Handler for KeyValueStore { + type Result = Result, StoreError>; + + fn handle(&mut self, msg: List, _ctx: &mut Self::Context) -> Self::Result { + let List { prefix } = msg; + let path = self.get_path(prefix); + let path_bytes = path.as_bytes(); + self.iter_range(path_bytes) + .try_fold(HashMap::new(), |mut map, e| { + let (key, value) = e?; + let keystring = + String::from_utf8(key.iter().cloned().skip(path_bytes.len()).collect()) + .unwrap(); + map.insert(keystring, String::from_utf8(value.to_vec()).unwrap()); + Ok(map) + }) + } +} diff --git a/replistore/src/stores/messages.rs b/replistore/src/stores/messages.rs new file mode 100644 index 0000000..ee76264 --- /dev/null +++ b/replistore/src/stores/messages.rs @@ -0,0 +1,188 @@ +use std::collections::HashMap; +use std::fmt::Display; +use std::path::PathBuf; +use std::pin::Pin; + +use super::contentaddressableblob::ContentAddressableBlobStore; +use super::keyvalue::KeyValueStore; + +use actix::prelude::*; +use bytes::Bytes; + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum StoreType { + ContentAddressableBlob, + KeyValue, +} + +impl StoreType { + pub fn as_bytes(&self) -> &[u8] { + match self { + StoreType::ContentAddressableBlob => b"ContentAddressableBlob", + StoreType::KeyValue => b"KeyValue", + } + } +} + +impl Display for StoreType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + StoreType::ContentAddressableBlob => "ContentAddressableBlob", + StoreType::KeyValue => "KeyValue", + }) + } +} + +#[derive(Debug)] +pub struct StoreError { + rocks_error: rocksdb::Error, +} + +impl From for StoreError { + fn from(value: rocksdb::Error) -> Self { + StoreError { rocks_error: value } + } +} + +#[derive(Debug)] +pub enum GetOrCreateError { + StoreTypeMissmatch(String, String), + StoreError(StoreError), +} + +impl From for GetOrCreateError { + fn from(value: rocksdb::Error) -> Self { + Self::StoreError(value.into()) + } +} + +#[derive(Message)] +#[rtype(result = "Result, GetOrCreateError>")] +pub struct GetOrCreateKeyValueStore { + pub name: String, +} + +#[derive(Message)] +#[rtype(result = "Result, GetOrCreateError>")] +pub struct GetOrCreateContentAddressableBlobStore { + pub name: String, + pub path: PathBuf, +} + +#[derive(Message)] +#[rtype(result = "Result, StoreError>")] +pub struct Get { + pub key: Vec, +} + +#[derive(Message)] +#[rtype(result = "Result<(), StoreError>")] +pub struct Set { + pub key: Vec, + pub value: String, +} + +#[derive(Message)] +#[rtype(result = "Result<(), StoreError>")] +pub struct MultiSet { + pub data: HashMap, String>, +} + +#[derive(Message)] +#[rtype(result = "Result<(), StoreError>")] +pub struct Delete { + pub key: Vec, +} + +#[derive(Message)] +#[rtype(result = "Result<(), StoreError>")] +pub struct MultiDelete { + pub data: Vec>, +} + +#[derive(Message)] +#[rtype(result = "Result, StoreError>")] +pub struct List { + pub prefix: Vec, +} + +#[derive(Debug)] +pub struct BlobReadError { + pub msg: String, +} + +pub enum PutBlobError { + StoreError(StoreError), + BlobReadError(BlobReadError), + IoError(std::io::Error), +} + +impl From for PutBlobError { + fn from(value: std::io::Error) -> Self { + Self::IoError(value) + } +} + +impl From for PutBlobError { + fn from(value: BlobReadError) -> Self { + Self::BlobReadError(value) + } +} + +#[derive(Debug)] +pub struct BlobInfo { + pub md5sum: String, + pub sha256sum: String, + pub size: u64, +} + +#[derive(Message)] +#[rtype(result = "Result")] +pub struct PutBlob { + pub data: Pin> + Send>>, +} + +pub enum GetBlobError { + StoreError(StoreError), + BlobReadError(BlobReadError), + IoError(std::io::Error), + BlobDoesNotExist, +} + +impl From for GetBlobError { + fn from(value: std::io::Error) -> Self { + Self::IoError(value) + } +} + +impl From for GetBlobError { + fn from(value: BlobReadError) -> Self { + Self::BlobReadError(value) + } +} + +#[derive(Message)] +#[rtype( + result = "Result> + Send + Sync>>, GetBlobError>" +)] +pub struct GetBlob { + pub sha256sum: String, +} + +pub enum DeleteBlobError { + StoreError(StoreError), + IoError(std::io::Error), + BlobDoesNotExist, +} + +impl From for DeleteBlobError { + fn from(value: std::io::Error) -> Self { + Self::IoError(value) + } +} + +#[derive(Message)] +#[rtype(result = "Result<(), DeleteBlobError>")] +pub struct DeleteBlob { + pub sha256sum: String, +} diff --git a/replistore/src/stores/mod.rs b/replistore/src/stores/mod.rs new file mode 100644 index 0000000..c0606cd --- /dev/null +++ b/replistore/src/stores/mod.rs @@ -0,0 +1,4 @@ +pub mod contentaddressableblob; +pub mod keyvalue; +pub mod messages; +pub mod storeowner; diff --git a/replistore/src/stores/storeowner.rs b/replistore/src/stores/storeowner.rs new file mode 100644 index 0000000..81ce86e --- /dev/null +++ b/replistore/src/stores/storeowner.rs @@ -0,0 +1,105 @@ +use actix::prelude::*; +use rocksdb::TransactionDB; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; + +use super::{ + contentaddressableblob::ContentAddressableBlobStore, + keyvalue::KeyValueStore, + messages::{ + GetOrCreateContentAddressableBlobStore, GetOrCreateError, GetOrCreateKeyValueStore, + StoreType, + }, +}; + +pub struct StoreOwner { + db: Arc, + kv_stores: HashMap>, + blob_stores: HashMap>, +} + +impl std::fmt::Debug for StoreOwner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StoreOwner") + .field("kv_stores", &self.kv_stores) + .field("blob_stores", &self.blob_stores) + .finish() + } +} + +impl StoreOwner { + pub fn new(db_path: PathBuf) -> Result { + let db = TransactionDB::open_default(db_path)?; + Ok(StoreOwner { + db: Arc::new(db), + kv_stores: HashMap::new(), + blob_stores: HashMap::new(), + }) + } + + fn check_or_create_store_type( + &self, + name: &str, + store_type: StoreType, + ) -> Result, GetOrCreateError> { + let type_key = format!("\0type\0{}", name); + match self.db.get(&type_key)? { + Some(entry) => { + if entry == store_type.as_bytes() { + Ok(Some(())) + } else { + Err(GetOrCreateError::StoreTypeMissmatch( + store_type.to_string(), + String::from_utf8(entry).unwrap(), + )) + } + } + None => { + self.db.put(&type_key, store_type.as_bytes())?; + Ok(Some(())) + } + } + } +} + +impl Actor for StoreOwner { + type Context = Context; +} + +impl Handler for StoreOwner { + type Result = Result, GetOrCreateError>; + + fn handle(&mut self, msg: GetOrCreateKeyValueStore, _ctx: &mut Context) -> Self::Result { + let GetOrCreateKeyValueStore { name } = msg; + self.check_or_create_store_type(&name, StoreType::KeyValue)?; + match self.kv_stores.get(&name) { + Some(addr) => Ok(addr.clone()), + None => { + let kv_store = KeyValueStore::new(name.clone(), self.db.clone()).start(); + self.kv_stores.insert(name, kv_store.clone()); + Ok(kv_store) + } + } + } +} + +impl Handler for StoreOwner { + type Result = Result, GetOrCreateError>; + + fn handle( + &mut self, + msg: GetOrCreateContentAddressableBlobStore, + _ctx: &mut Context, + ) -> Self::Result { + let GetOrCreateContentAddressableBlobStore { name, path } = msg; + self.check_or_create_store_type(&name, StoreType::ContentAddressableBlob)?; + match self.blob_stores.get(&name) { + Some(addr) => Ok(addr.clone()), + None => { + let blob_store = + ContentAddressableBlobStore::new(name.clone(), path, self.db.clone()).start(); + self.blob_stores.insert(name, blob_store.clone()); + Ok(blob_store) + } + } + } +}