Skip to content

Commit

Permalink
merge: live build logs
Browse files Browse the repository at this point in the history
  • Loading branch information
VirtCode authored May 21, 2024
2 parents 10fd06e + 694e1d8 commit 93cc9b9
Show file tree
Hide file tree
Showing 15 changed files with 637 additions and 57 deletions.
366 changes: 336 additions & 30 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,7 @@ rand = "0.8.5"

# web, TODO: for some reason, when this builds with openssl (i.e. not rustls), the server does too, which causes problems with the container. WHYYY?
reqwest = { version = "0.11.23", default-features = false, features = ["rustls-tls", "blocking", "json"] }
chrono = { version = "0.4.31", features = ["serde"] }
chrono = { version = "0.4.31", features = ["serde"] }
reqwest-eventsource = { version = "0.6.0" }
futures = "0.3.30"
tokio = "1.37.0"
2 changes: 1 addition & 1 deletion cli/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ serene info my-package pkgbuild
# See more information about the latest build. Supply an id for a specific one.
serene info my-package build

# See the logs of the latest build. Supply an id for a specific one.
# See the logs of the latest build. Supply an id for a specific one. Add `--subscribe` to get live logs until next build is finished and `--linger` to indefinitely attach to live logs.
serene info my-package logs
```

Expand Down
10 changes: 9 additions & 1 deletion cli/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ pub enum InfoCommand {
/// get logs from a build
Logs {
/// id of the build, latest if empty
id: Option<String>
id: Option<String>,

/// subscribe to live logs until next build is finished
#[clap(short, long)]
subscribe: bool,

/// indefinitely stay attached to live logs
#[clap(short, long)]
linger: bool
},

/// get the pkgbuild used to build the current package
Expand Down
8 changes: 7 additions & 1 deletion cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ fn main() -> anyhow::Result<()> {
None => { requests::info(&config, &name, all); }
Some(InfoCommand::Pkgbuild) => { requests::pkgbuild(&config, &name); }
Some(InfoCommand::Build { id }) => { requests::build_info(&config, &name, &id); }
Some(InfoCommand::Logs { id }) => { requests::build_logs(&config, &name, &id); }
Some(InfoCommand::Logs { id, subscribe, linger }) => {
if id.is_some() {
requests::build_logs(&config, &name, &id);
} else {
requests::subscribe_build_logs(&config, linger, subscribe, &name)
}
}
Some(InfoCommand::Set { property }) => { requests::set_setting(&config, &name, property) }
}
}
Expand Down
31 changes: 31 additions & 0 deletions cli/src/web/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
pub mod requests;
pub mod data;

use futures::StreamExt;
use reqwest::blocking::{Client, Response};
use reqwest_eventsource::Event;
use serde::{Serialize};
use serde::de::DeserializeOwned;
use tokio::runtime::Runtime;
use crate::config::Config;

type Result<T> = std::result::Result<T, Error>;
Expand All @@ -12,6 +15,9 @@ enum Error {
Client {
error: reqwest::Error,
},
Event {
error: reqwest_eventsource::Error,
},
Server {
message: String,
},
Expand All @@ -27,6 +33,9 @@ impl Error {
Error::Client { error } => {
error!("failed to connect to server: {:#}", error);
}
Error::Event { error } => {
error!("error in event source: {}", error.to_string())
}
Error::Server { message } => {
error!("{}", message);
}
Expand Down Expand Up @@ -109,4 +118,26 @@ pub fn get<R: DeserializeOwned>(config: &Config, path: &str) -> Result<R> {
.send();

process_result(result)
}

pub fn eventsource<F>(config: &Config, path: &str, mut cb: F) -> Result<()> where F: FnMut(Event) {
let full_path = format!("{path}?auth={}", config.secret);
let mut con = reqwest_eventsource::EventSource::get(get_url(config, full_path.as_str()));

let rt = Runtime::new().expect("should be able to create runtime");

rt.block_on(async {
while let Some(event) = con.next().await {
match event {
Ok(event) => cb(event),
Err(err) => {
con.close();
return Err(Error::Event { error: err })
},
}
}
Ok(())
})?;

Ok(())
}
54 changes: 50 additions & 4 deletions cli/src/web/requests.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::process::exit;
use std::str::FromStr;
use anyhow::Context;
use chrono::{Local, Utc};
use colored::{ColoredString, Colorize};
use reqwest_eventsource::Event;
use serene_data::build::{BuildInfo, BuildState};
use serene_data::package::{MakepkgFlag, PackageAddRequest, PackageAddSource, PackageBuildRequest, PackageInfo, PackagePeek, PackageSettingsRequest};
use serene_data::package::{BroadcastEvent, MakepkgFlag, PackageAddRequest, PackageAddSource, PackageBuildRequest, PackageInfo, PackagePeek, PackageSettingsRequest};
use crate::command::SettingsSubcommand;
use crate::complete::save_completions;
use crate::config::Config;
use crate::table::{ago, Column, table};
use crate::web::{delete_empty, get, post, post_empty, post_simple};
use crate::web::{delete_empty, eventsource, get, post, post_empty, post_simple};
use crate::web::data::{BuildProgressFormatter, BuildStateFormatter, describe_cron_timezone_hack, get_build_id};

pub fn add_aur(c: &Config, name: &str, replace: bool) {
Expand Down Expand Up @@ -221,13 +222,58 @@ pub fn build_info(c: &Config, package: &str, build: &Option<String>) {


pub fn build_logs(c: &Config, package: &str, build: &Option<String>) {
println!("Querying server for package builds...");
match get::<String>(c, format!("package/{}/build/{}/logs", package, build.as_ref().unwrap_or(&"latest".to_string())).as_str()) {
Ok(logs) => { println!("{logs}") }
Err(e) => { e.print() }
}
}

fn latest_build_logs_quiet(c: &Config, package: &str) -> Option<String> {
get::<String>(c, format!("package/{}/build/latest/logs", package).as_str()).ok()
}

pub fn subscribe_build_logs(c: &Config, linger: bool, subscribe: bool, package: &str) {
let mut first_build_finished = false;
if !subscribe {
let latest = latest_build_logs_quiet(c, package);
if let Some(latest) = latest {
print!("{latest}");

if !linger {
return
}

first_build_finished = true;
println!("\n{}", "Package build finished".italic().dimmed());
}
}

let Err(err) = eventsource(c, format!("package/{}/build/logs/subscribe", package).as_str(), |event| {
if let Event::Message(event) = event {
if let Ok(broadcast_event) = BroadcastEvent::from_str(&event.event) {
match broadcast_event {
BroadcastEvent::BuildStart => {
if linger && first_build_finished {
println!("\n{}", "New package build started".italic().dimmed())
}
},
BroadcastEvent::BuildEnd => {
if !linger {
exit(0);
} else {
first_build_finished = true;
println!("\n{}", "Package build finished".italic().dimmed())
}
},
BroadcastEvent::Log => print!("{}", event.data),
_ => {}
}
}
}
}) else { return };
err.print();
}

pub fn set_setting(c: &Config, package: &str, setting: SettingsSubcommand) {
let request = match setting {
SettingsSubcommand::Clean { enabled } => {
Expand Down
2 changes: 2 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ srcinfo = "1.1.0"
# async
tokio = { version = "1.35.0", features = ["full"]}
tokio-util = { version = "0.7.10", features = ["compat"]}
tokio-stream = "0.1.15"

futures = "0.3.29"
futures-util = "0.3.29"
Expand All @@ -33,6 +34,7 @@ hyper = "0.14.27"
# web
actix-web = "4.4.0"
actix-files = "0.6.2"
actix-web-lab = "0.20.2"

# storage
serde = "1.0.193"
Expand Down
15 changes: 15 additions & 0 deletions server/data/src/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,19 @@ pub struct PackageInfo {

/// date added
pub added: DateTime<Utc>,
}

/// All events which can be emitted by the broadcast for a package
#[derive(Serialize, Deserialize, EnumString, Display, Clone)]
#[strum(serialize_all = "lowercase")]
#[serde(rename_all = "lowercase")]
pub enum BroadcastEvent {
/// A build job for the package was started
BuildStart,
/// A build job for the package finished
BuildEnd,
/// Log message for the package build
Log,
/// Ping to the event subscriber
Ping
}
2 changes: 1 addition & 1 deletion server/src/build/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl Builder {

self.runner.read().await.upload_sources(&container, package).await?;

let status = self.runner.read().await.build(&container).await?;
let status = self.runner.read().await.build(&container, package).await?;

Ok((status, container))
}
Expand Down
12 changes: 9 additions & 3 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod config;
mod build;
mod database;

use std::sync::{Arc};
use std::sync::Arc;
use actix_web::{App, HttpMessage, HttpServer};
use actix_web::web::Data;
use anyhow::Context;
Expand All @@ -19,17 +19,21 @@ use crate::config::CONFIG;
use crate::package::Package;
use crate::runner::{Runner};
use crate::repository::PackageRepository;
use crate::web::broadcast::Broadcast;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
env_logger::init();

// initializing database
let db = database::connect().await?;


// initialize broadcast
let broadcast = Broadcast::new();

// initializing runner
let runner = Arc::new(RwLock::new(
Runner::new()
Runner::new(broadcast.clone())
.context("failed to connect to docker")?
));

Expand Down Expand Up @@ -82,6 +86,7 @@ async fn main() -> anyhow::Result<()> {
.app_data(Data::new(db.clone()))
.app_data(Data::from(schedule.clone()))
.app_data(Data::from(builder.clone()))
.app_data(Data::from(broadcast.clone()))
.service(repository::webservice())
.service(web::add)
.service(web::list)
Expand All @@ -91,6 +96,7 @@ async fn main() -> anyhow::Result<()> {
.service(web::get_all_builds)
.service(web::get_build)
.service(web::get_logs)
.service(web::subscribe_logs)
.service(web::settings)
.service(web::pkgbuild)
).bind(("0.0.0.0", CONFIG.port))?.run().await?;
Expand Down
51 changes: 38 additions & 13 deletions server/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ pub mod archive;

use std::error::Error;
use std::io::Read;
use std::sync::Arc;
use std::vec;
use anyhow::{Context};
use async_tar::Archive;
use bollard::container::{Config, CreateContainerOptions, DownloadFromContainerOptions, ListContainersOptions, LogsOptions, StartContainerOptions, UploadToContainerOptions, WaitContainerOptions};
Expand All @@ -16,6 +18,7 @@ use tokio_util::io::StreamReader;
use tokio_util::compat::{TokioAsyncReadCompatExt};
use crate::config::CONFIG;
use crate::package::Package;
use crate::web::broadcast::{Broadcast, Event};

const RUNNER_IMAGE_BUILD_IN: &str = "/app/build";
const RUNNER_IMAGE_BUILD_OUT: &str = "/app/target";
Expand All @@ -35,12 +38,13 @@ pub type ContainerId = String;
/// this is a wrapper for docker which creates and interacts with runner containers
pub struct Runner {
pub docker: Docker,
broadcast: Arc<Broadcast>
}

impl Runner {

/// creates a new runner by taking the docker from the default socket
pub fn new() -> anyhow::Result<Self> {
pub fn new(broadcast: Arc<Broadcast>) -> anyhow::Result<Self> {
let docker = if let Some(url) = &CONFIG.docker_url {

if url.starts_with("tcp://") || url.starts_with("http://") {
Expand All @@ -60,38 +64,59 @@ impl Runner {
};

Ok(Self {
docker: docker.context("failed to initialize docker")?
docker: docker.context("failed to initialize docker")?,
broadcast
})
}

/// builds the package inside a container
pub async fn build(&self, container: &ContainerId) -> anyhow::Result<RunStatus> {
pub async fn build(&self, container: &ContainerId, package: &Package) -> anyhow::Result<RunStatus> {
let start = Utc::now();

// start container
self.docker.start_container(container, None::<StartContainerOptions<String>>).await?;

// wait for container to exit and collect logs
let result =
self.docker.wait_container(container, None::<WaitContainerOptions<String>>).collect::<Vec<_>>().await;

let end = Utc::now();
self.broadcast.notify(&package.base, Event::BuildStart).await;

// retrieve logs
let log_options = LogsOptions {
stdout: true, stderr: true,
follow: true, // follow is needed since we continuously read from the stream
since: start.timestamp(),
..Default::default()
};

let logs: Vec<String> = self.docker.logs::<String>(container, Some(log_options)).filter_map(|r| async {
r.ok().map(|c| c.to_string())
}).collect::<Vec<_>>().await;
let mut stream = self.docker.logs::<String>(container, Some(log_options));
let base = package.base.clone();
let broadcast = self.broadcast.clone();
let log_collector = tokio::spawn(async move {
let mut logs = vec![];
// collect logs from stream until the container exits (and the log stream closes)
while let Some(next) = stream.next().await {
if let Ok(log) = next {
let value = log.to_string();
logs.push(value.clone());
broadcast.notify(&base, Event::Log(value)).await;
}
}
logs.join("")
});


// wait for container to exit
let result =
self.docker.wait_container(container, None::<WaitContainerOptions<String>>).collect::<Vec<_>>().await;

let end = Utc::now();

// get logs from log collector thread
let logs = log_collector.await.unwrap_or_default();

self.broadcast.notify(&package.base, Event::BuildFinish).await;

Ok(RunStatus {
success: result.first().and_then(|r| r.as_ref().ok()).is_some(),
logs: logs.join(""),

logs,
started: start,
ended: end,
})
Expand Down
Loading

0 comments on commit 93cc9b9

Please sign in to comment.