Implement sharded mode execution
Summary:
## This stack

Running the worker for all repos is causing OOMs; sharding should help.

## This diff

With the preparation we've done earlier in the stack, we can now enable sharded execution.

Reviewed By: singhsrb

Differential Revision: D64402510

fbshipit-source-id: 9b0a57a0423d47a5c48233fea63bd50e0cc5af1e
andreacampi authored and facebook-github-bot committed Oct 17, 2024
1 parent 63c6e58 commit dc46653
Showing 4 changed files with 170 additions and 36 deletions.
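
At a high level, the diff makes queue construction repo-scoped: in sharded mode each shard builds an `AsyncMethodRequestQueue` filtered to its assigned repo (`Some(repos)`), while the unsharded fallback passes `None` and keeps processing every repo. The sketch below illustrates only that selection logic; `RepositoryId` and `RequestQueue` here are simplified stand-ins for illustration, not the actual Mononoke types.

```rust
// Sketch of the repo scoping this diff introduces. `RepositoryId` and
// `RequestQueue` are simplified stand-ins; the real code builds an
// AsyncMethodRequestQueue with an optional repo filter.
#[derive(Debug, Clone, Copy, PartialEq)]
struct RepositoryId(i32);

struct RequestQueue {
    // `Some(..)` restricts polling to the given repos (sharded mode);
    // `None` means "poll requests for all repos" (unsharded fallback).
    repo_filter: Option<Vec<RepositoryId>>,
}

impl RequestQueue {
    fn new(repo_filter: Option<Vec<RepositoryId>>) -> Self {
        Self { repo_filter }
    }

    fn accepts(&self, repo: RepositoryId) -> bool {
        match &self.repo_filter {
            None => true,
            Some(repos) => repos.contains(&repo),
        }
    }
}

fn main() {
    // Sharded mode: each shard only sees the repo it was assigned.
    let sharded = RequestQueue::new(Some(vec![RepositoryId(1)]));
    assert!(sharded.accepts(RepositoryId(1)));
    assert!(!sharded.accepts(RepositoryId(2)));

    // Unsharded mode: one worker keeps processing every repo.
    let unsharded = RequestQueue::new(None);
    assert!(unsharded.accepts(RepositoryId(2)));
}
```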
4 changes: 2 additions & 2 deletions eden/mononoke/async_requests/async_requests_client/src/lib.rs
@@ -19,7 +19,7 @@ use mononoke_api::MononokeRepo;
use mononoke_api::RepositoryId;
use mononoke_app::MononokeApp;
use requests_table::SqlLongRunningRequestsQueue;
use slog::info;
use slog::debug;
use sql_construct::SqlConstructFromDatabaseConfig;
use sql_ext::facebook::MysqlOptions;

@@ -46,7 +46,7 @@ pub async fn open_sql_connection(
) -> Result<SqlLongRunningRequestsQueue, Error> {
let config = app.repo_configs().common.async_requests_config.clone();
if let Some(config) = config.db_config {
info!(
debug!(
app.logger(),
"Initializing async_requests with an explicit config"
);
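
The hunk above only lowers the log level of this message from `info` to `debug`. For reference, a minimal standalone slog sketch of the two macros, using a `Discard` drain so it runs without any logging backend (illustrative only, not part of the commit):

```rust
// Standalone slog example of the two macros whose level this hunk changes.
// slog::Discard drops every record, so this compiles and runs with no backend.
use slog::{debug, info, o, Logger};

fn main() {
    let logger = Logger::root(slog::Discard, o!());
    info!(logger, "Initializing async_requests with an explicit config");
    debug!(logger, "Initializing async_requests with an explicit config");
}
```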
4 changes: 3 additions & 1 deletion eden/mononoke/async_requests/worker/Cargo.toml
@@ -14,6 +14,7 @@ async-trait = "0.1.71"
async_requests = { version = "0.1.0", path = "../lib" }
async_requests_client = { version = "0.1.0", path = "../async_requests_client" }
async_requests_types_thrift = { version = "0.1.0", path = "../if" }
blobstore = { version = "0.1.0", path = "../../blobstore" }
clap = { version = "4.5.20", features = ["derive", "env", "string", "unicode", "wrap_help"] }
cloned = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
cmdlib_logging = { version = "0.1.0", path = "../../cmdlib/log" }
@@ -31,6 +32,8 @@ mononoke_api = { version = "0.1.0", path = "../../mononoke_api" }
mononoke_app = { version = "0.1.0", path = "../../cmdlib/mononoke_app" }
mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
repo_authorization = { version = "0.1.0", path = "../../repo_authorization" }
requests_table = { version = "0.1.0", path = "../requests_table" }
sharding_ext = { version = "0.1.0", path = "../../cmdlib/sharding_ext" }
slog = { version = "2.7", features = ["max_level_trace", "nested-values"] }
source_control = { version = "0.1.0", path = "../../scs/if" }
stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
@@ -39,4 +42,3 @@ tokio = { version = "1.37.0", features = ["full", "test-util", "tracing"] }
[dev-dependencies]
fbinit-tokio = { version = "0.1.2", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
mononoke_macros = { version = "0.1.0", path = "../../mononoke_macros" }
requests_table = { version = "0.1.0", path = "../requests_table" }
3 changes: 3 additions & 0 deletions eden/mononoke/async_requests/worker/TARGETS
@@ -25,12 +25,15 @@ rust_binary(
"//common/rust/shed/stats:stats",
"//eden/mononoke/async_requests:async_requests",
"//eden/mononoke/async_requests:async_requests_client",
"//eden/mononoke/async_requests:requests_table",
"//eden/mononoke/async_requests/if:async_requests_types-thrift-rust",
"//eden/mononoke/blobstore:blobstore",
"//eden/mononoke/blobstore:ephemeral_blobstore",
"//eden/mononoke/cmdlib:cmdlib_logging",
"//eden/mononoke/cmdlib:environment",
"//eden/mononoke/cmdlib/mononoke_app:mononoke_app",
"//eden/mononoke/cmdlib/sharding:executor_lib",
"//eden/mononoke/cmdlib/sharding_ext:sharding_ext",
"//eden/mononoke/megarepo_api:megarepo_api",
"//eden/mononoke/megarepo_api:megarepo_error",
"//eden/mononoke/metaconfig:metaconfig_types",
195 changes: 162 additions & 33 deletions eden/mononoke/async_requests/worker/src/main.rs
@@ -14,17 +14,27 @@ use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use anyhow::Context;
use anyhow::Result;
use async_requests::AsyncMethodRequestQueue;
use async_requests_client::open_blobstore;
use async_requests_client::open_sql_connection;
use async_trait::async_trait;
use clap::Parser;
use cloned::cloned;
use cmdlib_logging::ScribeLoggingArgs;
use context::CoreContext;
use context::SessionContainer;
use environment::BookmarkCacheDerivedData;
use environment::BookmarkCacheKind;
use environment::BookmarkCacheOptions;
use executor_lib::args::ShardedExecutorArgs;
use executor_lib::RepoShardedProcess;
use executor_lib::RepoShardedProcessExecutor;
use fbinit::FacebookInit;
use megarepo_api::MegarepoApi;
use metaconfig_types::ShardedService;
use mononoke_api::Mononoke;
use mononoke_api::Repo;
use mononoke_app::args::HooksAppExtension;
use mononoke_app::args::RepoFilterAppExtension;
@@ -33,12 +43,21 @@ use mononoke_app::args::WarmBookmarksCacheExtension;
use mononoke_app::fb303::AliveService;
use mononoke_app::fb303::Fb303AppExtension;
use mononoke_app::MononokeAppBuilder;
use mononoke_app::MononokeReposManager;
use requests_table::SqlLongRunningRequestsQueue;
use sharding_ext::RepoShard;
use slog::info;

const SERVICE_NAME: &str = "async_requests_worker";

const SM_CLEANUP_TIMEOUT_SECS: u64 = 60;

/// Processes the megarepo async requests
#[derive(Parser)]
struct AsyncRequestsWorkerArgs {
#[clap(flatten)]
pub sharded_executor_args: ShardedExecutorArgs,

#[clap(flatten)]
shutdown_timeout_args: ShutdownTimeoutArgs,
#[clap(flatten)]
@@ -55,6 +74,75 @@ struct AsyncRequestsWorkerArgs {
process_all_repos: bool,
}

pub struct WorkerProcess {
ctx: Arc<CoreContext>,
args: Arc<AsyncRequestsWorkerArgs>,
repos_mgr: Arc<MononokeReposManager<Repo>>,
mononoke: Arc<Mononoke<Repo>>,
megarepo: Arc<MegarepoApi<Repo>>,
sql_connection: Arc<SqlLongRunningRequestsQueue>,
blobstore: Arc<dyn blobstore::Blobstore>,
will_exit: Arc<AtomicBool>,
}

impl WorkerProcess {
pub(crate) fn new(
ctx: Arc<CoreContext>,
args: Arc<AsyncRequestsWorkerArgs>,
repos_mgr: Arc<MononokeReposManager<Repo>>,
mononoke: Arc<Mononoke<Repo>>,
megarepo: Arc<MegarepoApi<Repo>>,
sql_connection: Arc<SqlLongRunningRequestsQueue>,
blobstore: Arc<dyn blobstore::Blobstore>,
will_exit: Arc<AtomicBool>,
) -> Self {
Self {
ctx,
args,
repos_mgr,
mononoke,
megarepo,
sql_connection,
blobstore,
will_exit,
}
}
}

#[async_trait]
impl RepoShardedProcess for WorkerProcess {
async fn setup(&self, repo: &RepoShard) -> Result<Arc<dyn RepoShardedProcessExecutor>> {
let repo_name = repo.repo_name.as_str();
let logger = self.repos_mgr.repo_logger(repo_name);
info!(&logger, "Setting up repo {}", repo_name);

let repo = self
.repos_mgr
.add_repo(repo_name)
.await
.with_context(|| format!("Failure in setting up repo {}", repo_name))?;
let repos = vec![repo.repo_identity.id()];
info!(&logger, "Completed setup for repos {:?}", repos);

let queue = Arc::new(AsyncMethodRequestQueue::new(
self.sql_connection.clone(),
self.blobstore.clone(),
Some(repos),
));

let executor = worker::AsyncMethodRequestWorker::new(
self.args.clone(),
self.ctx.clone(),
queue,
self.mononoke.clone(),
self.megarepo.clone(),
self.will_exit.clone(),
)
.await?;
Ok(Arc::new(executor))
}
}

#[fbinit::main]
fn main(fb: FacebookInit) -> Result<()> {
let app = MononokeAppBuilder::new(fb)
@@ -69,50 +157,91 @@ fn main(fb: FacebookInit) -> Result<()> {
.build::<AsyncRequestsWorkerArgs>()?;

let args: Arc<AsyncRequestsWorkerArgs> = Arc::new(app.args()?);

let env = app.environment();
let logger = app.logger().clone();
let runtime = app.runtime().clone();
let session = SessionContainer::new_with_defaults(env.fb);
let ctx = session.new_context(app.logger().clone(), env.scuba_sample_builder.clone());

let mononoke = Arc::new(
runtime
.block_on(app.open_managed_repos::<Repo>(Some(ShardedService::AsyncRequestsWorker)))?
.make_mononoke_api()?,
);
let repos = mononoke.known_repo_ids();
let ctx = Arc::new(session.new_context(app.logger().clone(), env.scuba_sample_builder.clone()));

let service_name = Some(ShardedService::AsyncRequestsWorker);
let repos_mgr = Arc::new(runtime.block_on(app.open_managed_repos(service_name))?);
let mononoke = Arc::new(repos_mgr.make_mononoke_api()?);
let megarepo = Arc::new(MegarepoApi::new(&app, mononoke.clone())?);

let sql_connection = Arc::new(runtime.block_on(open_sql_connection(fb, &app))?);
let blobstore = runtime.block_on(open_blobstore(fb, &app))?;
let will_exit = Arc::new(AtomicBool::new(false));
let filter_repos = if args.process_all_repos {
None
} else {
Some(repos)
};
let queue = Arc::new(runtime.block_on(async_requests_client::build(fb, &app, filter_repos))?);
let worker = runtime.block_on(worker::AsyncMethodRequestWorker::new(
args.clone(),
Arc::new(ctx),
queue,
mononoke,
megarepo,
will_exit.clone(),
))?;

app.start_monitoring(SERVICE_NAME, AliveService)?;
app.start_stats_aggregation()?;

let run_worker = { move |_app| async move { worker.execute().await } };

app.run_until_terminated(
run_worker,
move || will_exit.store(true, Ordering::Relaxed),
args.shutdown_timeout_args.shutdown_grace_period,
async {
// the code to gracefully stop things goes here
if let Some(mut executor) = args.sharded_executor_args.clone().build_executor(
app.fb,
runtime.clone(),
&logger,
|| {
Arc::new(WorkerProcess::new(
ctx.clone(),
args.clone(),
repos_mgr.clone(),
mononoke.clone(),
megarepo.clone(),
sql_connection.clone(),
blobstore.clone(),
will_exit.clone(),
))
},
args.shutdown_timeout_args.shutdown_timeout,
)?;
true, // enable shard (repo) level healing
SM_CLEANUP_TIMEOUT_SECS,
)? {
info!(logger, "Starting sharded process");
// The Sharded Process Executor needs to branch off and execute
// on its own dedicated task spawned off the common tokio runtime.
runtime.spawn({
let logger = logger.clone();
{
cloned!(will_exit);
async move { executor.block_and_execute(&logger, will_exit).await }
}
});

app.wait_until_terminated(
move || will_exit.store(true, Ordering::Relaxed),
args.shutdown_timeout_args.shutdown_grace_period,
async {
info!(logger, "Shutdown");
},
args.shutdown_timeout_args.shutdown_timeout,
)?;
} else {
let logger = logger.clone();
let queue = Arc::new(AsyncMethodRequestQueue::new(
sql_connection,
blobstore,
None,
));

info!(logger, "Starting unsharded executor for all repos");
let executor = runtime.block_on(worker::AsyncMethodRequestWorker::new(
args.clone(),
ctx.clone(),
queue.clone(),
mononoke.clone(),
megarepo.clone(),
will_exit.clone(),
))?;
let run_worker = { move |_app| async move { executor.execute().await } };

app.run_until_terminated(
run_worker,
move || will_exit.store(true, Ordering::Relaxed),
args.shutdown_timeout_args.shutdown_grace_period,
async {
info!(logger, "Shutdown");
},
args.shutdown_timeout_args.shutdown_timeout,
)?;
}

Ok(())
}
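
In the sharded branch above, the executor is spawned on the shared tokio runtime and shutdown is coordinated through the shared `will_exit` flag that the termination callback flips. The following self-contained sketch shows that coordination pattern in plain tokio/std code; the real worker drives `executor.block_and_execute(..)` rather than this placeholder loop, and the sketch assumes a tokio dependency with the `macros` and `time` features.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

#[tokio::main]
async fn main() {
    // Shared flag, analogous to `will_exit` above.
    let will_exit = Arc::new(AtomicBool::new(false));

    // Long-running worker task, standing in for the spawned sharded executor.
    let worker = tokio::spawn({
        let will_exit = Arc::clone(&will_exit);
        async move {
            while !will_exit.load(Ordering::Relaxed) {
                // ... poll and process async requests here ...
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
            println!("worker observed the shutdown flag; exiting");
        }
    });

    // Analogous to the termination callback: flip the flag, then let the
    // worker drain within its grace period.
    tokio::time::sleep(Duration::from_millis(200)).await;
    will_exit.store(true, Ordering::Relaxed);

    worker.await.expect("worker task panicked");
}
```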
