Skip to content

Commit

Permalink
Stop passing MononokeApp to the worker.
Browse files Browse the repository at this point in the history
Summary:
## This stack

Running the worker for all repos is causing OOMs; sharding should help.

## This diff

The `MononokeApp` instance is supposed to be a singleton and remain in the top context, not passed around. With the preparation from the previous diff, we can clean this up now.

Reviewed By: singhsrb

Differential Revision: D64410195

fbshipit-source-id: 90b6e27429f9ee765f70ba975e516fe75c4c1dc2
  • Loading branch information
andreacampi authored and facebook-github-bot committed Oct 17, 2024
1 parent 112b915 commit 63c6e58
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 24 deletions.
7 changes: 5 additions & 2 deletions eden/mononoke/async_requests/async_requests_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub async fn build(
))
}

async fn open_sql_connection(
pub async fn open_sql_connection(
fb: FacebookInit,
app: &MononokeApp,
) -> Result<SqlLongRunningRequestsQueue, Error> {
Expand All @@ -61,7 +61,10 @@ async fn open_sql_connection(
}
}

async fn open_blobstore(fb: FacebookInit, app: &MononokeApp) -> Result<Arc<dyn Blobstore>, Error> {
pub async fn open_blobstore(
fb: FacebookInit,
app: &MononokeApp,
) -> Result<Arc<dyn Blobstore>, Error> {
let config = app.repo_configs().common.async_requests_config.clone();
if let Some(config) = config.blobstore {
let options = app.blobstore_options();
Expand Down
5 changes: 2 additions & 3 deletions eden/mononoke/async_requests/worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ fn main(fb: FacebookInit) -> Result<()> {
} else {
Some(repos)
};
let queue = Arc::new(runtime.block_on(async_requests_client::build(fb, &app, filter_repos))?);
let worker = runtime.block_on(worker::AsyncMethodRequestWorker::new(
app.fb,
&app,
args.clone(),
Arc::new(ctx),
filter_repos,
queue,
mononoke,
megarepo,
will_exit.clone(),
Expand Down
25 changes: 6 additions & 19 deletions eden/mononoke/async_requests/worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use async_requests::types::AsynchronousRequestParams;
Expand All @@ -31,7 +30,6 @@ use async_trait::async_trait;
use cloned::cloned;
use context::CoreContext;
use executor_lib::RepoShardedProcessExecutor;
use fbinit::FacebookInit;
use futures::future::abortable;
use futures::future::select;
use futures::future::Either;
Expand All @@ -44,8 +42,6 @@ use megarepo_api::MegarepoApi;
use mononoke_api::Mononoke;
use mononoke_api::MononokeRepo;
use mononoke_api::Repo;
use mononoke_api::RepositoryId;
use mononoke_app::MononokeApp;
use mononoke_types::Timestamp;
use slog::debug;
use slog::error;
Expand Down Expand Up @@ -85,23 +81,17 @@ pub struct AsyncMethodRequestWorker {
mononoke: Arc<Mononoke<Repo>>,
megarepo: Arc<MegarepoApi<Repo>>,
name: String,
queue: AsyncMethodRequestQueue,
queue: Arc<AsyncMethodRequestQueue>,
will_exit: Arc<AtomicBool>,
limit: Option<usize>,
concurrency_limit: usize,
}

impl AsyncMethodRequestWorker {
/// Creates a new tailer instance that's going to use provided megarepo API
/// The name argument should uniquely identify tailer instance and will be put
/// in the queue table so it's possible to find out which instance is working on
/// a given task (for debugging purposes).
pub(crate) async fn new(
fb: FacebookInit,
app: &MononokeApp,
args: Arc<AsyncRequestsWorkerArgs>,
ctx: Arc<CoreContext>,
repos: Option<Vec<RepositoryId>>,
queue: Arc<AsyncMethodRequestQueue>,
mononoke: Arc<Mononoke<Repo>>,
megarepo: Arc<MegarepoApi<Repo>>,
will_exit: Arc<AtomicBool>,
Expand All @@ -121,9 +111,6 @@ impl AsyncMethodRequestWorker {
}
};

let queue = async_requests_client::build(fb, app, repos)
.await
.context("acquiring the async requests queue")?;
Ok(Self {
ctx,
mononoke,
Expand Down Expand Up @@ -199,7 +186,7 @@ impl AsyncMethodRequestWorker {
pub fn request_stream(
&self,
ctx: &CoreContext,
queue: AsyncMethodRequestQueue,
queue: Arc<AsyncMethodRequestQueue>,
will_exit: Arc<AtomicBool>,
) -> impl Stream<Item = Result<(RequestId, AsynchronousRequestParams), AsyncRequestsError>>
{
Expand All @@ -218,7 +205,7 @@ impl AsyncMethodRequestWorker {
fn request_stream_inner(
ctx: CoreContext,
claimed_by: ClaimedBy,
queue: AsyncMethodRequestQueue,
queue: Arc<AsyncMethodRequestQueue>,
will_exit: Arc<AtomicBool>,
sleep_time: Duration,
abandoned_threshold_secs: i64,
Expand Down Expand Up @@ -489,7 +476,7 @@ mod test {

#[mononoke::fbinit_test]
async fn test_request_stream_simple(fb: FacebookInit) -> Result<(), Error> {
let q = AsyncMethodRequestQueue::new_test_in_memory().unwrap();
let q = Arc::new(AsyncMethodRequestQueue::new_test_in_memory().unwrap());
let ctx = CoreContext::test_mock(fb);

let params = thrift::MegarepoSyncChangesetParams {
Expand Down Expand Up @@ -529,7 +516,7 @@ mod test {

#[mononoke::fbinit_test]
async fn test_request_stream_clear_abandoned(fb: FacebookInit) -> Result<(), Error> {
let q = AsyncMethodRequestQueue::new_test_in_memory().unwrap();
let q = Arc::new(AsyncMethodRequestQueue::new_test_in_memory().unwrap());
let ctx = CoreContext::test_mock(fb);

let params = thrift::MegarepoSyncChangesetParams {
Expand Down

0 comments on commit 63c6e58

Please sign in to comment.