Skip to content

Commit bf1e9be

Browse files
committed
Code review updates
1 parent 5f232c9 commit bf1e9be

File tree

8 files changed

+52
-43
lines changed

8 files changed

+52
-43
lines changed

crates/hyperqueue/src/bin/hq.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ use hyperqueue::common::utils::fs::absolute_path;
4646
use hyperqueue::server::bootstrap::get_client_session;
4747
use hyperqueue::server::event::streamer::{EventFilter, EventFilterFlags};
4848
use hyperqueue::transfer::messages::{
49-
FromClientMessage, JobInfoRequest, StreamEvents, ToClientMessage,
49+
FromClientMessage, JobInfoRequest, StreamEvents, StreamEventsMode, ToClientMessage,
5050
};
5151
use hyperqueue::worker::hwdetect::{
5252
detect_additional_resources, detect_cpus, prune_hyper_threading,
@@ -157,8 +157,7 @@ async fn command_job_wait(gsettings: &GlobalSettings, opts: JobWaitOpts) -> anyh
157157
selector: opts.selector,
158158
include_running_tasks: true
159159
}, Some(StreamEvents {
160-
past_events: false,
161-
live_events: true,
160+
mode: StreamEventsMode::LiveEvents,
162161
enable_worker_overviews: false,
163162
filter: EventFilter::new(None, EventFilterFlags::JOB_EVENTS)
164163
})),
@@ -182,8 +181,7 @@ async fn command_job_progress(
182181
selector: opts.selector,
183182
include_running_tasks: true
184183
}, Some(StreamEvents {
185-
past_events: false,
186-
live_events: true,
184+
mode: StreamEventsMode::LiveEvents,
187185
enable_worker_overviews: false,
188186
filter: EventFilter::new(None, flags)
189187
})),

crates/hyperqueue/src/client/commands/journal/mod.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use crate::rpc_call;
77
use crate::server::bootstrap::get_client_session;
88
use crate::server::event::journal::JournalReader;
99
use crate::server::event::streamer::EventFilter;
10-
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
10+
use crate::transfer::messages::{
11+
FromClientMessage, StreamEvents, StreamEventsMode, ToClientMessage,
12+
};
1113
use anyhow::anyhow;
1214
use clap::{Parser, ValueHint};
1315
use std::io::{BufWriter, Write};
@@ -61,24 +63,19 @@ struct ExportOpts {
6163
pub async fn command_journal(gsettings: &GlobalSettings, opts: JournalOpts) -> anyhow::Result<()> {
6264
match opts.command {
6365
JournalCommand::Export(opts) => export_json(opts),
64-
JournalCommand::Replay => stream_json(gsettings, true, false).await,
65-
JournalCommand::Stream => stream_json(gsettings, true, true).await,
66+
JournalCommand::Replay => stream_json(gsettings, StreamEventsMode::PastEvents).await,
67+
JournalCommand::Stream => stream_json(gsettings, StreamEventsMode::PastAndLiveEvents).await,
6668
JournalCommand::Prune => prune_journal(gsettings).await,
6769
JournalCommand::Flush => flush_journal(gsettings).await,
6870
}
6971
}
7072

71-
async fn stream_json(
72-
gsettings: &GlobalSettings,
73-
past_events: bool,
74-
live_events: bool,
75-
) -> anyhow::Result<()> {
73+
async fn stream_json(gsettings: &GlobalSettings, mode: StreamEventsMode) -> anyhow::Result<()> {
7674
let mut connection = get_client_session(gsettings.server_directory()).await?;
7775
connection
7876
.connection()
7977
.send(FromClientMessage::StreamEvents(StreamEvents {
80-
past_events,
81-
live_events,
78+
mode,
8279
enable_worker_overviews: false,
8380
filter: EventFilter::all_events(),
8481
}))

crates/hyperqueue/src/client/commands/journal/output.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::client::output::json::format_datetime;
22

3-
use crate::server::event::payload::EventPayload;
43
use crate::server::event::Event;
4+
use crate::server::event::payload::EventPayload;
55
use crate::transfer::messages::{
66
JobSubmitDescription, JobTaskDescription, SubmitRequest, TaskDescription,
77
};

crates/hyperqueue/src/client/commands/submit/command.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use crate::server::event::streamer::{EventFilter, EventFilterFlags};
1919
use crate::transfer::connection::ClientSession;
2020
use crate::transfer::messages::{
2121
FromClientMessage, JobDescription, JobSubmitDescription, JobTaskDescription, PinMode,
22-
StreamEvents, SubmitRequest, SubmitResponse, TaskDescription, TaskKind, TaskKindProgram,
23-
ToClientMessage,
22+
StreamEvents, StreamEventsMode, SubmitRequest, SubmitResponse, TaskDescription, TaskKind,
23+
TaskKindProgram, ToClientMessage,
2424
};
2525
use anyhow::{anyhow, bail};
2626
use bstr::BString;
@@ -780,8 +780,7 @@ pub(crate) async fn send_submit_request(
780780
flags.insert(EventFilterFlags::NOTIFY_EVENTS);
781781
}
782782
Some(StreamEvents {
783-
past_events: false,
784-
live_events: true,
783+
mode: StreamEventsMode::LiveEvents,
785784
enable_worker_overviews: false,
786785
filter: EventFilter::new(None, flags),
787786
})

crates/hyperqueue/src/common/cli.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -400,10 +400,6 @@ pub struct JobWaitOpts {
400400
/// Waits until all tasks are completed, even if the job is still open
401401
#[clap(long, action)]
402402
pub without_close: bool,
403-
404-
/// Call the program in the argument that is executed whenever a task produce a notification
405-
#[clap(long, action)]
406-
pub run_on_notify: Option<String>,
407403
}
408404

409405
#[derive(Parser)]

crates/hyperqueue/src/dashboard/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ use crate::server::event::Event;
1313
use crate::server::event::journal::JournalReader;
1414
use crate::server::event::streamer::EventFilter;
1515
use crate::transfer::connection::ClientSession;
16-
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
16+
use crate::transfer::messages::{
17+
FromClientMessage, StreamEvents, StreamEventsMode, ToClientMessage,
18+
};
1719
use std::time::Duration;
1820

1921
// The time range in which the live timeline is display ([now() - duration, now()])
@@ -47,8 +49,7 @@ pub async fn preload_dashboard_events(
4749
// Start streaming events
4850
connection
4951
.send(FromClientMessage::StreamEvents(StreamEvents {
50-
past_events: true,
51-
live_events: true,
52+
mode: StreamEventsMode::PastAndLiveEvents,
5253
enable_worker_overviews: true,
5354
filter: EventFilter::all_events(),
5455
}))

crates/hyperqueue/src/server/client/mod.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -139,23 +139,19 @@ async fn start_streaming<
139139
Tx::Error: Debug,
140140
{
141141
let StreamEvents {
142-
past_events,
143-
live_events,
142+
mode,
144143
enable_worker_overviews,
145144
filter,
146145
} = stream_events;
147146
if enable_worker_overviews {
148147
senders.server_control.add_worker_overview_listener();
149148
}
150149
log::debug!("Start streaming events to client");
151-
if !live_events && !past_events {
152-
return;
153-
}
154150

155151
/* We create two event queues, one for historic events and one for live events
156152
So while historic events are loaded from the file and streamed, live events are already
157153
collected and sent immediately once the historic events are sent */
158-
let live = if live_events {
154+
let live = if mode.is_live_events_enabled() {
159155
let (tx2, rx2) = mpsc::unbounded_channel::<Event>();
160156
let listener_id = senders.events.register_listener(filter, tx2);
161157
Some((rx2, listener_id))
@@ -170,7 +166,7 @@ async fn start_streaming<
170166
// If we use a journal, we can replay historical events from it.
171167
// If not, we can at least try to reconstruct a few basic events
172168
// based on the current state.
173-
if past_events {
169+
if mode.is_past_events_enabled() {
174170
let (tx1, rx1) = mpsc::unbounded_channel::<Event>();
175171
if senders.events.is_journal_enabled() {
176172
senders.events.start_journal_replay(tx1);
@@ -181,7 +177,7 @@ async fn start_streaming<
181177
}
182178

183179
if let Some((rx2, listener_id)) = live {
184-
if past_events {
180+
if mode.is_past_events_enabled() {
185181
let _ = tx.send(ToClientMessage::EventLiveBoundary).await;
186182
}
187183
crate::server::client::stream_events(&mut tx, &mut rx, rx2).await;

crates/hyperqueue/src/transfer/messages.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,39 @@ pub struct TaskKindProgram {
9898
pub task_dir: bool,
9999
}
100100

101+
#[derive(Serialize, Deserialize, Debug, Clone)]
102+
pub enum StreamEventsMode {
103+
PastEvents,
104+
LiveEvents,
105+
PastAndLiveEvents,
106+
}
107+
108+
impl StreamEventsMode {
109+
pub fn is_live_events_enabled(&self) -> bool {
110+
matches!(
111+
self,
112+
StreamEventsMode::LiveEvents | StreamEventsMode::PastAndLiveEvents
113+
)
114+
}
115+
116+
pub fn is_past_events_enabled(&self) -> bool {
117+
matches!(
118+
self,
119+
StreamEventsMode::PastEvents | StreamEventsMode::PastAndLiveEvents
120+
)
121+
}
122+
}
123+
101124
#[derive(Serialize, Deserialize, Debug, Clone)]
102125
pub struct StreamEvents {
103-
/// If true, replay historical events and then start streaming live events.
104-
pub past_events: bool,
105-
/// If true, and then start streaming live events.
106-
/// If both `live_events` and `past_events` are true, then historical events are replayed first
107-
/// then event `EventLiveBoundary` is sent, then live events are streamed.
108-
pub live_events: bool,
109-
/// Enable worker overviews during the event streaming.
126+
/// When the mode is PathAndLiveEvents then:
127+
/// 1) historical events are replayed;
128+
/// 2) event `EventLiveBoundary` is sent,
129+
/// 3) live events are streamed.
130+
pub mode: StreamEventsMode,
131+
/// Enable worker overviews during the event streaming
110132
pub enable_worker_overviews: bool,
111-
/// Stream events only relevant for a given jobs
133+
/// Stream events only relevant for given jobs
112134
pub filter: EventFilter,
113135
}
114136

0 commit comments

Comments
 (0)