Skip to content

Commit

Permalink
Merge branch 'replistore'
Browse files Browse the repository at this point in the history
Currently not integrated but will change "soon"
  • Loading branch information
huettner94 committed Jul 28, 2024
2 parents 36d4928 + 7d391db commit 262a09a
Show file tree
Hide file tree
Showing 16 changed files with 2,794 additions and 155 deletions.
1,039 changes: 885 additions & 154 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ members = [
"cli",
"client",
"dtrd",
"tcpcl"
"tcpcl",
"replistore"
]

resolver = "2"
39 changes: 39 additions & 0 deletions replistore/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[package]
name = "replistore"
version = "0.1.0"
edition = "2021"
resolver = "2"

[dependencies]
log = "0.4"
env_logger = "0.10.0"
prost = "0.12.1"
dtrd_client = {path = "../client"}
tokio = { version = "1.34.0", features = ["full", "tracing"] }
futures-util = "0.3.28"
s3s = "0.8.0"
hyper = "0.14.27"
anyhow = "1.0.75"
async-trait = "0.1.74"
tokio-util = { version = "0.7.10", features = ["full"] }
bytes = "1.5.0"
time = "0.3.30"
md-5 = "0.10.6"
hex = "0.4.3"
sha2 = "0.10.8"
actix = "0.13.1"
actix-rt = "2.9.0"
console-subscriber = "0.2.0"
tracing = "0.1.40"
tracing-opentelemetry = "0.23.0"
tracing-subscriber = "0.3.18"
opentelemetry = "0.22.0"
opentelemetry-otlp = "0.15.0"
opentelemetry_sdk = { version = "0.22.1", features = ["rt-tokio"] }
futures = "0.3.30"
uuid = {version = "1.8.0", features = ["v4"]}
# until https://github.com/rust-rocksdb/rust-rocksdb/pull/868/commits/ef029f292c058d3465fcc203b32473ecaddfcd4f is released
rocksdb = { git = "https://github.com/rust-rocksdb/rust-rocksdb" }

[build-dependencies]
prost-build = { version = "0.12.1" }
18 changes: 18 additions & 0 deletions replistore/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (C) 2023 Felix Huettner
//
// This file is part of DTRD.
//
// DTRD is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// DTRD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

pub mod settings;
41 changes: 41 additions & 0 deletions replistore/src/common/settings.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (C) 2023 Felix Huettner
//
// This file is part of DTRD.
//
// DTRD is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// DTRD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::env;

#[derive(Debug, Clone)]
pub struct Settings {
pub tokio_tracing_port: Option<String>,
}

impl Default for Settings {
fn default() -> Self {
Self {
tokio_tracing_port: None,
}
}
}

impl Settings {
pub fn from_env() -> Self {
let mut settings = Settings::default();
if let Ok(setting) = env::var("TOKIO_TRACING_PORT") {
settings.tokio_tracing_port = Some(setting);
};
settings
}
}
18 changes: 18 additions & 0 deletions replistore/src/frontend/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (C) 2023 Felix Huettner
//
// This file is part of DTRD.
//
// DTRD is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// DTRD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

pub mod s3;
193 changes: 193 additions & 0 deletions replistore/src/frontend/s3/messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
use std::pin::Pin;

use actix::prelude::*;
use time::OffsetDateTime;

use crate::stores::messages::{DeleteBlobError, GetBlobError, PutBlobError, StoreError};

#[derive(Debug)]
pub struct S3Error {
store_error: StoreError,
}

impl From<StoreError> for S3Error {
fn from(value: StoreError) -> Self {
S3Error { store_error: value }
}
}

#[derive(Debug)]
pub struct Object {
pub key: String,
pub md5sum: String,
pub sha256sum: String,
pub last_modified: OffsetDateTime,
pub size: u64,
}

#[derive(Message)]
#[rtype(result = "Result<Vec<String>, S3Error>")]
pub struct ListBuckets {}

pub enum CreateBucketError {
S3Error(S3Error),
BucketAlreadyExists,
}

impl From<StoreError> for CreateBucketError {
fn from(value: StoreError) -> Self {
Self::S3Error(value.into())
}
}

#[derive(Message)]
#[rtype(result = "Result<String, CreateBucketError>")]
pub struct CreateBucket {
pub name: String,
}

#[derive(Message)]
#[rtype(result = "Result<Option<()>, S3Error>")]
pub struct HeadBucket {
pub name: String,
}

#[derive(Debug)]
pub struct ReadDataError {
pub msg: String,
}

pub enum PutObjectError {
S3Error(S3Error),
BucketNotFound,
ReadDataError(ReadDataError),
}

impl From<StoreError> for PutObjectError {
fn from(value: StoreError) -> Self {
Self::S3Error(value.into())
}
}

impl From<PutBlobError> for PutObjectError {
fn from(value: PutBlobError) -> Self {
match value {
PutBlobError::StoreError(e) => e.into(),
PutBlobError::BlobReadError(e) => Self::ReadDataError(ReadDataError { msg: e.msg }),
PutBlobError::IoError(e) => Self::ReadDataError(ReadDataError { msg: e.to_string() }),
}
}
}

#[derive(Message)]
#[rtype(result = "Result<Object, PutObjectError>")]
pub struct PutObject {
pub bucket: String,
pub key: String,
pub data: Pin<Box<dyn Stream<Item = Result<bytes::Bytes, ReadDataError>> + Send>>,
}

pub enum ListObjectError {
S3Error(S3Error),
BucketNotFound,
}

impl From<StoreError> for ListObjectError {
fn from(value: StoreError) -> Self {
Self::S3Error(value.into())
}
}

#[derive(Message)]
#[rtype(result = "Result<Vec<Object>, ListObjectError>")]
pub struct ListObject {
pub bucket: String,
pub prefix: String,
}

pub enum HeadObjectError {
S3Error(S3Error),
BucketNotFound,
ObjectNotFound,
}

impl From<StoreError> for HeadObjectError {
fn from(value: StoreError) -> Self {
Self::S3Error(value.into())
}
}

#[derive(Message)]
#[rtype(result = "Result<Object, HeadObjectError>")]
pub struct HeadObject {
pub bucket: String,
pub key: String,
}

pub enum GetObjectError {
S3Error(S3Error),
BucketNotFound,
ObjectNotFound,
ReadDataError(ReadDataError),
}

impl From<StoreError> for GetObjectError {
fn from(value: StoreError) -> Self {
Self::S3Error(value.into())
}
}

impl From<GetBlobError> for GetObjectError {
fn from(value: GetBlobError) -> Self {
match value {
GetBlobError::StoreError(e) => e.into(),
GetBlobError::BlobReadError(e) => Self::ReadDataError(ReadDataError { msg: e.msg }),
GetBlobError::IoError(e) => Self::ReadDataError(ReadDataError { msg: e.to_string() }),
GetBlobError::BlobDoesNotExist => GetObjectError::ObjectNotFound,
}
}
}

pub struct GetObjectResult {
pub metadata: Object,
pub data: Pin<Box<dyn Stream<Item = Result<bytes::Bytes, ReadDataError>> + Send + Sync>>,
}

#[derive(Message)]
#[rtype(result = "Result<GetObjectResult, GetObjectError>")]
pub struct GetObject {
pub bucket: String,
pub key: String,
}

pub enum DeleteObjectError {
S3Error(S3Error),
BucketNotFound,
ObjectNotFound,
ReadDataError(ReadDataError),
}

impl From<StoreError> for DeleteObjectError {
fn from(value: StoreError) -> Self {
Self::S3Error(value.into())
}
}

impl From<DeleteBlobError> for DeleteObjectError {
fn from(value: DeleteBlobError) -> Self {
match value {
DeleteBlobError::StoreError(e) => e.into(),
DeleteBlobError::IoError(e) => {
Self::ReadDataError(ReadDataError { msg: e.to_string() })
}
DeleteBlobError::BlobDoesNotExist => DeleteObjectError::ObjectNotFound,
}
}
}

#[derive(Message)]
#[rtype(result = "Result<(), DeleteObjectError>")]
pub struct DeleteObject {
pub bucket: String,
pub key: String,
}
3 changes: 3 additions & 0 deletions replistore/src/frontend/s3/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub(super) mod messages;
pub mod s3;
pub mod s3_frontend;
Loading

0 comments on commit 262a09a

Please sign in to comment.