Skip to content

Commit a245dc7

Browse files
committed
wip bgtask stuff
1 parent 2204b7c commit a245dc7

File tree

4 files changed

+139
-0
lines changed

4 files changed

+139
-0
lines changed

nexus/db-model/src/instance_state.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,8 @@ impl From<external::InstanceState> for InstanceState {
7171
Self::new(state)
7272
}
7373
}
74+
75+
impl diesel::query_builder::QueryId for InstanceStateEnum {
76+
type QueryId = ();
77+
const HAS_STATIC_QUERY_ID: bool = false;
78+
}

nexus/db-queries/src/db/datastore/instance.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,40 @@ impl DataStore {
233233
.collect())
234234
}
235235

236+
/// List all instances with active VMMs in the `Destroyed` state that don't
237+
/// have currently-running instance-updater sagas.
238+
pub async fn find_instances_with_destroyed_active_vmms(
239+
&self,
240+
opctx: &OpContext,
241+
) -> ListResultVec<InstanceAndActiveVmm> {
242+
use db::model::InstanceState as DbInstanceState;
243+
use db::schema::instance::dsl;
244+
use db::schema::vmm::dsl as vmm_dsl;
245+
use omicron_common::api::external::InstanceState;
246+
let destroyed = DbInstanceState::new(InstanceState::Destroyed);
247+
Ok(vmm_dsl::vmm
248+
.filter(vmm_dsl::time_deleted.is_not_null())
249+
.filter(vmm_dsl::state.eq(destroyed))
250+
.inner_join(
251+
dsl::instance.on(dsl::active_propolis_id
252+
.eq(vmm_dsl::id.nullable())
253+
.and(dsl::time_deleted.is_null())
254+
.and(dsl::updater_id.is_null())),
255+
)
256+
.select((Instance::as_select(), Vmm::as_select()))
257+
.load_async::<(Instance, Vmm)>(
258+
&*self.pool_connection_authorized(opctx).await?,
259+
)
260+
.await
261+
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?
262+
.into_iter()
263+
.map(|(instance, vmm)| InstanceAndActiveVmm {
264+
instance,
265+
vmm: Some(vmm),
266+
})
267+
.collect())
268+
}
269+
236270
/// Fetches information about an Instance that the caller has previously
237271
/// fetched
238272
///
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
// This Source Code Form is subject to the terms of the Mozilla Public
2+
// License, v. 2.0. If a copy of the MPL was not distributed with this
3+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
5+
//! Background task for detecting instances in need of update sagas.
6+
//!
7+
//! TODO this is currently a placeholder for a future PR
8+
9+
use super::common::BackgroundTask;
10+
use crate::app::sagas::SagaRequest;
11+
use futures::future::BoxFuture;
12+
use futures::FutureExt;
13+
use nexus_db_queries::context::OpContext;
14+
use nexus_db_queries::db::DataStore;
15+
use serde_json::json;
16+
use std::sync::Arc;
17+
use tokio::sync::mpsc::Sender;
18+
19+
pub struct InstanceUpdater {
20+
datastore: Arc<DataStore>,
21+
saga_req: Sender<SagaRequest>,
22+
}
23+
24+
impl InstanceUpdater {
25+
pub fn new(
26+
datastore: Arc<DataStore>,
27+
saga_req: Sender<SagaRequest>,
28+
) -> Self {
29+
InstanceUpdater { datastore, saga_req }
30+
}
31+
32+
async fn activate2(
33+
&mut self,
34+
opctx: &OpContext,
35+
) -> Result<Updated, anyhow::Error> {
36+
let mut updated = Updated::default();
37+
38+
let log = &opctx.log;
39+
40+
slog::debug!(
41+
&log,
42+
"looking for instances with destroyed active VMMs..."
43+
);
44+
45+
let destroyed_active_vmms = self
46+
.datastore
47+
.find_instances_with_destroyed_active_vmms(opctx)
48+
.await
49+
.context("failed to find instances with destroyed active VMMs")?;
50+
51+
slog::info!(
52+
&log,
53+
"listed instances with destroyed active VMMs";
54+
"count" => destroyed_active_vmms.len(),
55+
);
56+
57+
updated.destroyed_active_vmms = destroyed_active_vmms.len();
58+
59+
for (instance, vmm) in destroyed_active_vmms {
60+
let saga = SagaRequest::InstanceUpdate {};
61+
}
62+
63+
Ok(updated)
64+
}
65+
}
66+
67+
#[derive(Default)]
68+
struct Updated {
69+
destroyed_active_vmms: usize,
70+
sagas_started: usize,
71+
}
72+
73+
impl BackgroundTask for InstanceUpdater {
74+
fn activate<'a>(
75+
&'a mut self,
76+
opctx: &'a OpContext,
77+
) -> BoxFuture<'a, serde_json::Value> {
78+
async {
79+
match self.activate2(opctx).await {
80+
Ok(updated) => json!({
81+
"destroyed_active_vmms": updated.destroyed_active_vmms,
82+
"error": None,
83+
}),
84+
Err(error) => {
85+
slog::error!(
86+
opctx.log,
87+
"failed to start instance update saga(s)";
88+
"error" => ?error,
89+
);
90+
json!({
91+
"destroyed_active_vmms": 0,
92+
"error": error.to_string(),
93+
})
94+
}
95+
}
96+
}
97+
.boxed()
98+
}
99+
}

nexus/src/app/background/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ mod dns_propagation;
1313
mod dns_servers;
1414
mod external_endpoints;
1515
mod init;
16+
mod instance_updater;
1617
mod instance_watcher;
1718
mod inventory_collection;
1819
mod metrics_producer_gc;

0 commit comments

Comments
 (0)