From 82dcf88a05dc63d67fd567eaf9a00757eb96adc1 Mon Sep 17 00:00:00 2001 From: John Ericson Date: Wed, 10 Dec 2025 14:57:09 -0500 Subject: [PATCH 1/5] Sort Cargo deps and remove stray comment It was not about `lapin`, it was about `http`. --- ofborg/Cargo.toml | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ofborg/Cargo.toml b/ofborg/Cargo.toml index e6fd5b11..38aad489 100644 --- a/ofborg/Cargo.toml +++ b/ofborg/Cargo.toml @@ -15,25 +15,24 @@ chrono = { version = "0.4.38", default-features = false, features = [ either = "1.13.0" fs2 = "0.4.3" futures-util = "0.3.31" +hex = "0.4.3" +hmac = "0.12.1" +http = "1" #hubcaps = "0.6" # for Conclusion::Skipped which is in master hubcaps = { git = "https://github.com/ofborg/hubcaps.git", rev = "50dbe6ec45c9dfea4e3cfdf27bbadfa565f69dec", default-features = false, features = ["app", "rustls-tls"] } -http = "1" # hyper = { version = "0.14", features = ["full"] } hyper = "=0.10.*" -# maybe can be removed when hyper is updated lapin = "2.5.4" lru-cache = "0.1.2" md5 = "0.8.0" nom = "4.2.3" regex = "1.11.1" +rustls-pemfile = "2.2.0" serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.135" +sha2 = "0.10.8" tempfile = "3.15.0" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] } uuid = { version = "1.12", features = ["v4"] } -rustls-pemfile = "2.2.0" -hmac = "0.12.1" -sha2 = "0.10.8" -hex = "0.4.3" From 15022bf59eb9600579043ac52c278b0a88955eec Mon Sep 17 00:00:00 2001 From: John Ericson Date: Wed, 10 Dec 2025 15:53:44 -0500 Subject: [PATCH 2/5] cargo update --- Cargo.lock | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cedfdaa8..33ed7add 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1223,9 +1223,9 @@ checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a" [[package]] name = "icu_properties" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99" +checksum = "020bfc02fe870ec3a66d93e677ccca0562506e5872c650f893269e08615d74ec" dependencies = [ "icu_collections", "icu_locale_core", @@ -1237,9 +1237,9 @@ dependencies = [ [[package]] name = "icu_properties_data" -version = "2.1.1" +version = "2.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899" +checksum = "616c294cf8d725c6afcd8f55abc17c56464ef6211f9ed59cccffe534129c77af" [[package]] name = "icu_provider" @@ -2060,9 +2060,9 @@ checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" [[package]] name = "reqwest" -version = "0.12.24" +version = "0.12.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d0946410b9f7b082a427e4ef5c8ff541a88b357bc6c637c40db3a68ac70a36f" +checksum = "b6eff9328d40131d43bd911d42d79eb6a47312002a4daefc9e37f17e74a7701a" dependencies = [ "base64 0.22.1", "bytes", @@ -2701,9 +2701,9 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.6.7" +version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cf146f99d442e8e68e585f5d798ccd3cad9a7835b917e09728880a862706456" +checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" dependencies = [ "bitflags 2.10.0", "bytes", From 8e925470ceafac6cc309992cf58e5e2722aef7c0 Mon Sep 17 00:00:00 2001 From: John Ericson Date: Sun, 30 Nov 2025 12:11:41 -0500 Subject: [PATCH 3/5] Pull out some functions before switching to tokio This should have absolutely no change in behavior, but it will make the commit doing the switch easier to review. --- ofborg/src/bin/github-webhook-receiver.rs | 225 +++++++++++----------- ofborg/src/bin/logapi.rs | 181 +++++++++-------- ofborg/src/bin/stats.rs | 27 ++- 3 files changed, 229 insertions(+), 204 deletions(-) diff --git a/ofborg/src/bin/github-webhook-receiver.rs b/ofborg/src/bin/github-webhook-receiver.rs index 7e911126..626e2313 100644 --- a/ofborg/src/bin/github-webhook-receiver.rs +++ b/ofborg/src/bin/github-webhook-receiver.rs @@ -1,7 +1,6 @@ use std::env; use std::error::Error; use std::io::Read as _; -use std::sync::Arc; #[macro_use] extern crate hyper; @@ -86,6 +85,117 @@ fn setup_amqp(chan: &mut Channel) -> Result<(), Box> { Ok(()) } +fn handle_request(mut req: Request, mut res: Response, webhook_secret: &str, chan: &Channel) { + // HTTP 405 + if req.method != hyper::Post { + *res.status_mut() = StatusCode::MethodNotAllowed; + return; + } + let hdr = req.headers.clone(); + + // Read body + let mut raw = Vec::new(); + if req.read_to_end(&mut raw).is_err() { + warn!("Failed to read body from client"); + *res.status_mut() = StatusCode::InternalServerError; + return; + } + let raw = raw.as_slice(); + + // Validate signature + { + let Some(sig) = hdr.get::() else { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"Missing signature header"); + return; + }; + let mut components = sig.splitn(2, '='); + let Some(algo) = components.next() else { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"Signature hash method missing"); + return; + }; + let Some(hash) = components.next() else { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"Signature hash missing"); + return; + }; + let Ok(hash) = hex::decode(hash) else { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"Invalid signature hash hex"); + return; + }; + + if algo != "sha256" { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"Invalid signature hash method"); + return; + } + + let Ok(mut mac) = Hmac::::new_from_slice(webhook_secret.as_bytes()) else { + *res.status_mut() = StatusCode::InternalServerError; + error!("Unable to create HMAC from secret"); + return; + }; + mac.update(raw); + if mac.verify_slice(hash.as_slice()).is_err() { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"Signature verification failed"); + return; + } + } + + // Parse body + let Some(ct) = hdr.get::() else { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"No Content-Type header passed"); + return; + }; + if ct + != &ContentType(mime::Mime( + mime::TopLevel::Application, + mime::SubLevel::Json, + Vec::new(), + )) + { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"Content-Type is not application/json. Webhook misconfigured?"); + return; + } + let input = match serde_json::from_slice::(raw) { + Ok(i) => i, + Err(e) => { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"Invalid JSON"); + error!("Invalid JSON received: {e}"); + return; + } + }; + + // Build routing key + let Some(event_type) = hdr.get::() else { + *res.status_mut() = StatusCode::BadRequest; + let _ = res.send(b"Missing event type"); + return; + }; + let routing_key = format!("{event_type}.{}", input.repository.full_name.to_lowercase()); + + // Publish message + let _confirmation = task::block_on(async { + chan.basic_publish( + "github-events", + &routing_key, + BasicPublishOptions::default(), + raw, + BasicProperties::default() + .with_content_type("application/json".into()) + .with_delivery_mode(2), // persistent + ) + .await + }); + *res.status_mut() = StatusCode::NoContent; +} + fn main() -> Result<(), Box> { ofborg::setup_log(); @@ -99,7 +209,7 @@ fn main() -> Result<(), Box> { let webhook_secret = std::fs::read_to_string(cfg.webhook_secret_file) .expect("Unable to read webhook secret file"); - let webhook_secret = Arc::new(webhook_secret.trim().to_string()); + let webhook_secret = webhook_secret.trim().to_string(); let conn = easylapin::from_config(&cfg.rabbitmq)?; let mut chan = task::block_on(conn.create_channel())?; @@ -111,115 +221,8 @@ fn main() -> Result<(), Box> { .unwrap_or(1); info!("Will listen on {} with {threads} threads", cfg.listen); Server::http(cfg.listen)?.handle_threads( - move |mut req: Request, mut res: Response| { - // HTTP 405 - if req.method != hyper::Post { - *res.status_mut() = StatusCode::MethodNotAllowed; - return; - } - let hdr = req.headers.clone(); - - // Read body - let mut raw = Vec::new(); - if req.read_to_end(&mut raw).is_err() { - warn!("Failed to read body from client"); - *res.status_mut() = StatusCode::InternalServerError; - return; - } - let raw = raw.as_slice(); - - // Validate signature - { - let Some(sig) = hdr.get::() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Missing signature header"); - return; - }; - let mut components = sig.splitn(2, '='); - let Some(algo) = components.next() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Signature hash method missing"); - return; - }; - let Some(hash) = components.next() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Signature hash missing"); - return; - }; - let Ok(hash) = hex::decode(hash) else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Invalid signature hash hex"); - return; - }; - - if algo != "sha256" { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Invalid signature hash method"); - return; - } - - let Ok(mut mac) = Hmac::::new_from_slice(webhook_secret.as_bytes()) else { - *res.status_mut() = StatusCode::InternalServerError; - error!("Unable to create HMAC from secret"); - return; - }; - mac.update(raw); - if mac.verify_slice(hash.as_slice()).is_err() { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Signature verification failed"); - return; - } - } - - // Parse body - let Some(ct) = hdr.get::() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"No Content-Type header passed"); - return; - }; - if ct - != &ContentType(mime::Mime( - mime::TopLevel::Application, - mime::SubLevel::Json, - Vec::new(), - )) - { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Content-Type is not application/json. Webhook misconfigured?"); - return; - } - let input = match serde_json::from_slice::(raw) { - Ok(i) => i, - Err(e) => { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Invalid JSON"); - error!("Invalid JSON received: {e}"); - return; - } - }; - - // Build routing key - let Some(event_type) = hdr.get::() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Missing event type"); - return; - }; - let routing_key = format!("{event_type}.{}", input.repository.full_name.to_lowercase()); - - // Publish message - let _confirmation = task::block_on(async { - chan.basic_publish( - "github-events", - &routing_key, - BasicPublishOptions::default(), - raw, - BasicProperties::default() - .with_content_type("application/json".into()) - .with_delivery_mode(2), // persistent - ) - .await - }); - *res.status_mut() = StatusCode::NoContent; + move |req: Request, res: Response| { + handle_request(req, res, &webhook_secret, &chan); }, threads, )?; diff --git a/ofborg/src/bin/logapi.rs b/ofborg/src/bin/logapi.rs index 7797c59d..966def22 100644 --- a/ofborg/src/bin/logapi.rs +++ b/ofborg/src/bin/logapi.rs @@ -21,6 +21,96 @@ struct LogResponse { attempts: HashMap, } +#[derive(Clone)] +struct LogApiConfig { + logs_path: String, + serve_root: String, +} + +fn handle_request(req: Request, mut res: Response, cfg: &LogApiConfig) { + if req.method != hyper::Get { + *res.status_mut() = StatusCode::MethodNotAllowed; + return; + } + + let uri = req.uri.to_string(); + let Some(reqd) = uri.strip_prefix("/logs/").map(ToOwned::to_owned) else { + *res.status_mut() = StatusCode::NotFound; + let _ = res.send(b"invalid uri"); + return; + }; + let path: PathBuf = [&cfg.logs_path, &reqd].iter().collect(); + let Ok(path) = std::fs::canonicalize(&path) else { + *res.status_mut() = StatusCode::NotFound; + let _ = res.send(b"absent"); + return; + }; + let Ok(iter) = std::fs::read_dir(path) else { + *res.status_mut() = StatusCode::NotFound; + let _ = res.send(b"non dir"); + return; + }; + + let mut attempts = HashMap::::new(); + for e in iter { + let Ok(e) = e else { continue }; + let e_metadata = e.metadata(); + if e_metadata.as_ref().map(|v| v.is_dir()).unwrap_or(true) { + *res.status_mut() = StatusCode::InternalServerError; + let _ = res.send(b"dir found"); + return; + } + + if e_metadata.as_ref().map(|v| v.is_file()).unwrap_or_default() { + let Ok(file_name) = e.file_name().into_string() else { + warn!("entry filename is not a utf-8 string: {:?}", e.file_name()); + continue; + }; + + if file_name.ends_with(".metadata.json") || file_name.ends_with(".result.json") { + let Ok(file) = std::fs::File::open(e.path()) else { + warn!("could not open file: {file_name}"); + continue; + }; + let Ok(json) = serde_json::from_reader::<_, serde_json::Value>(file) else { + warn!("file is not a valid json file: {file_name}"); + continue; + }; + let Some(attempt_id) = json + .get("attempt_id") + .and_then(|v| v.as_str()) + .map(ToOwned::to_owned) + else { + warn!("attempt_id not found in file: {file_name}"); + continue; + }; + let attempt_obj = attempts.entry(attempt_id).or_default(); + if file_name.ends_with(".metadata.json") { + attempt_obj.metadata = Some(json); + } else { + attempt_obj.result = Some(json); + } + } else { + let attempt_obj = attempts.entry(file_name.clone()).or_default(); + attempt_obj.log_url = Some(format!("{}/{reqd}/{file_name}", &cfg.serve_root)); + } + } + } + + *res.status_mut() = StatusCode::Ok; + res.headers_mut() + .set::(hyper::header::ContentType(mime::Mime( + mime::TopLevel::Application, + mime::SubLevel::Json, + Vec::new(), + ))); + let _ = res.send( + serde_json::to_string(&LogResponse { attempts }) + .unwrap_or_default() + .as_bytes(), + ); +} + fn main() -> Result<(), Box> { ofborg::setup_log(); @@ -32,95 +122,18 @@ fn main() -> Result<(), Box> { panic!(); }; + let api_cfg = LogApiConfig { + logs_path: cfg.logs_path, + serve_root: cfg.serve_root, + }; + let threads = std::thread::available_parallelism() .map(|x| x.get()) .unwrap_or(1); info!("Will listen on {} with {threads} threads", cfg.listen); Server::http(cfg.listen)?.handle_threads( - move |req: Request, mut res: Response| { - if req.method != hyper::Get { - *res.status_mut() = StatusCode::MethodNotAllowed; - return; - } - - let uri = req.uri.to_string(); - let Some(reqd) = uri.strip_prefix("/logs/").map(ToOwned::to_owned) else { - *res.status_mut() = StatusCode::NotFound; - let _ = res.send(b"invalid uri"); - return; - }; - let path: PathBuf = [&cfg.logs_path, &reqd].iter().collect(); - let Ok(path) = std::fs::canonicalize(&path) else { - *res.status_mut() = StatusCode::NotFound; - let _ = res.send(b"absent"); - return; - }; - let Ok(iter) = std::fs::read_dir(path) else { - *res.status_mut() = StatusCode::NotFound; - let _ = res.send(b"non dir"); - return; - }; - - let mut attempts = HashMap::::new(); - for e in iter { - let Ok(e) = e else { continue }; - let e_metadata = e.metadata(); - if e_metadata.as_ref().map(|v| v.is_dir()).unwrap_or(true) { - *res.status_mut() = StatusCode::InternalServerError; - let _ = res.send(b"dir found"); - return; - } - - if e_metadata.as_ref().map(|v| v.is_file()).unwrap_or_default() { - let Ok(file_name) = e.file_name().into_string() else { - warn!("entry filename is not a utf-8 string: {:?}", e.file_name()); - continue; - }; - - if file_name.ends_with(".metadata.json") || file_name.ends_with(".result.json") - { - let Ok(file) = std::fs::File::open(e.path()) else { - warn!("could not open file: {file_name}"); - continue; - }; - let Ok(json) = serde_json::from_reader::<_, serde_json::Value>(file) else { - warn!("file is not a valid json file: {file_name}"); - continue; - }; - let Some(attempt_id) = json - .get("attempt_id") - .and_then(|v| v.as_str()) - .map(ToOwned::to_owned) - else { - warn!("attempt_id not found in file: {file_name}"); - continue; - }; - let attempt_obj = attempts.entry(attempt_id).or_default(); - if file_name.ends_with(".metadata.json") { - attempt_obj.metadata = Some(json); - } else { - attempt_obj.result = Some(json); - } - } else { - let attempt_obj = attempts.entry(file_name.clone()).or_default(); - attempt_obj.log_url = - Some(format!("{}/{reqd}/{file_name}", &cfg.serve_root)); - } - } - } - - *res.status_mut() = StatusCode::Ok; - res.headers_mut() - .set::(hyper::header::ContentType(mime::Mime( - mime::TopLevel::Application, - mime::SubLevel::Json, - Vec::new(), - ))); - let _ = res.send( - serde_json::to_string(&LogResponse { attempts }) - .unwrap_or_default() - .as_bytes(), - ); + move |req: Request, res: Response| { + handle_request(req, res, &api_cfg); }, threads, )?; diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index 779db455..b24958c4 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -1,5 +1,6 @@ use std::env; use std::error::Error; +use std::sync::Arc; use std::thread; use async_std::task; @@ -9,6 +10,17 @@ use tracing::{error, info}; use ofborg::easyamqp::{ChannelExt, ConsumerExt}; use ofborg::{config, easyamqp, easylapin, stats, tasks}; +fn run_http_server(metrics: Arc) { + let addr = "0.0.0.0:9898"; + info!("HTTP server listening on {}", addr); + Server::http(addr) + .expect("Failed to bind HTTP server") + .handle(move |_: Request, res: Response| { + res.send(metrics.prometheus_output().as_bytes()).unwrap(); + }) + .expect("Failed to start HTTP server"); +} + fn main() -> Result<(), Box> { ofborg::setup_log(); @@ -28,8 +40,8 @@ fn main() -> Result<(), Box> { let events = stats::RabbitMq::from_lapin(&cfg.whoami(), task::block_on(conn.create_channel())?); - let metrics = stats::MetricCollector::new(); - let collector = tasks::statscollector::StatCollectorWorker::new(events, metrics.clone()); + let metrics = Arc::new(stats::MetricCollector::new()); + let collector = tasks::statscollector::StatCollectorWorker::new(events, (*metrics).clone()); chan.declare_exchange(easyamqp::ExchangeConfig { exchange: "stats".to_owned(), @@ -70,13 +82,10 @@ fn main() -> Result<(), Box> { }, )?; - thread::spawn(|| { - let addr = "0.0.0.0:9898"; - info!("listening addr {:?}", addr); - Server::http(addr)?.handle(move |_: Request, res: Response| { - res.send(metrics.prometheus_output().as_bytes()).unwrap(); - })?; - Ok::<_, Box>(()) + // Spawn HTTP server in a separate thread + let metrics_clone = metrics.clone(); + thread::spawn(move || { + run_http_server(metrics_clone); }); info!("Fetching jobs from {}", &queue_name); From 054cbf26759d4aa8a9c1d4538994de5a1b905314 Mon Sep 17 00:00:00 2001 From: John Ericson Date: Wed, 10 Dec 2025 16:05:09 -0500 Subject: [PATCH 4/5] Wrap `task::block_on` in preparation for switching to Tokio This will lesson the diff. --- ofborg/src/bin/build-faker.rs | 4 ++-- ofborg/src/bin/builder.rs | 9 +++++---- ofborg/src/bin/evaluation-filter.rs | 6 +++--- ofborg/src/bin/github-comment-filter.rs | 6 +++--- ofborg/src/bin/github-comment-poster.rs | 6 +++--- ofborg/src/bin/github-webhook-receiver.rs | 8 ++++---- ofborg/src/bin/log-message-collector.rs | 6 +++--- ofborg/src/bin/mass-rebuilder.rs | 8 ++++---- ofborg/src/bin/stats.rs | 8 ++++---- ofborg/src/commitstatus.rs | 2 +- ofborg/src/config.rs | 2 +- ofborg/src/easylapin.rs | 24 +++++++++++++---------- ofborg/src/lib.rs | 6 ++++++ ofborg/src/stats.rs | 3 +-- ofborg/src/tasks/eval/nixpkgs.rs | 2 +- ofborg/src/tasks/evaluate.rs | 12 ++++++------ ofborg/src/tasks/githubcommentfilter.rs | 2 +- ofborg/src/tasks/githubcommentposter.rs | 2 +- 18 files changed, 63 insertions(+), 53 deletions(-) diff --git a/ofborg/src/bin/build-faker.rs b/ofborg/src/bin/build-faker.rs index e5f03505..bf510e5b 100644 --- a/ofborg/src/bin/build-faker.rs +++ b/ofborg/src/bin/build-faker.rs @@ -1,10 +1,10 @@ use std::env; use std::error::Error; -use async_std::task; use lapin::BasicProperties; use lapin::message::Delivery; +use ofborg::block_on; use ofborg::commentparser; use ofborg::config; use ofborg::easylapin; @@ -19,7 +19,7 @@ fn main() -> Result<(), Box> { let cfg = config::load(arg.as_ref()); let conn = easylapin::from_config(&cfg.builder.unwrap().rabbitmq)?; - let mut chan = task::block_on(conn.create_channel())?; + let mut chan = block_on(conn.create_channel())?; let repo_msg = Repo { clone_url: "https://github.com/nixos/ofborg.git".to_owned(), diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index e2d7a7a6..25778a08 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -2,8 +2,9 @@ use std::env; use std::error::Error; use std::path::Path; -use async_std::task::{self, JoinHandle}; +use async_std::task::{JoinHandle, spawn}; use futures_util::future; +use ofborg::block_on; use tracing::{error, info, warn}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; @@ -31,7 +32,7 @@ fn main() -> Result<(), Box> { handles.push(handle_ext); } - task::block_on(future::join_all(handles)); + block_on(future::join_all(handles)); drop(conn); // Close connection. info!("Closed the session... EOF"); @@ -43,7 +44,7 @@ fn create_handle( cfg: &config::Config, system: String, ) -> Result, Box> { - let mut chan = task::block_on(conn.create_channel())?; + let mut chan = block_on(conn.create_channel())?; let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); let nix = cfg.nix().with_system(system.clone()); @@ -104,5 +105,5 @@ fn create_handle( )?; info!("Fetching jobs from {}", &queue_name); - Ok(task::spawn(handle)) + Ok(spawn(handle)) } diff --git a/ofborg/src/bin/evaluation-filter.rs b/ofborg/src/bin/evaluation-filter.rs index d391f237..8c8b41a9 100644 --- a/ofborg/src/bin/evaluation-filter.rs +++ b/ofborg/src/bin/evaluation-filter.rs @@ -1,7 +1,7 @@ use std::env; use std::error::Error; -use async_std::task; +use ofborg::block_on; use tracing::{error, info}; use ofborg::config; @@ -23,7 +23,7 @@ fn main() -> Result<(), Box> { }; let conn = easylapin::from_config(&filter_cfg.rabbitmq)?; - let mut chan = task::block_on(conn.create_channel())?; + let mut chan = block_on(conn.create_channel())?; chan.declare_exchange(easyamqp::ExchangeConfig { exchange: "github-events".to_owned(), @@ -74,7 +74,7 @@ fn main() -> Result<(), Box> { )?; info!("Fetching jobs from {}", &queue_name); - task::block_on(handle); + block_on(handle); drop(conn); // Close connection. info!("Closed the session... EOF"); diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index 1ddf045c..95a17dfb 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -1,7 +1,7 @@ use std::env; use std::error::Error; -use async_std::task; +use ofborg::block_on; use ofborg::systems::System; use tracing::{error, info}; @@ -24,7 +24,7 @@ fn main() -> Result<(), Box> { }; let conn = easylapin::from_config(&filter_cfg.rabbitmq)?; - let mut chan = task::block_on(conn.create_channel())?; + let mut chan = block_on(conn.create_channel())?; chan.declare_exchange(easyamqp::ExchangeConfig { exchange: "github-events".to_owned(), @@ -98,7 +98,7 @@ fn main() -> Result<(), Box> { )?; info!("Fetching jobs from {}", &queue_name); - task::block_on(handle); + block_on(handle); drop(conn); // Close connection. info!("Closed the session... EOF"); diff --git a/ofborg/src/bin/github-comment-poster.rs b/ofborg/src/bin/github-comment-poster.rs index cf183d3d..579fddbd 100644 --- a/ofborg/src/bin/github-comment-poster.rs +++ b/ofborg/src/bin/github-comment-poster.rs @@ -1,7 +1,7 @@ use std::env; use std::error::Error; -use async_std::task; +use ofborg::block_on; use tracing::{error, info}; use ofborg::config; @@ -23,7 +23,7 @@ fn main() -> Result<(), Box> { }; let conn = easylapin::from_config(&poster_cfg.rabbitmq)?; - let mut chan = task::block_on(conn.create_channel())?; + let mut chan = block_on(conn.create_channel())?; chan.declare_exchange(easyamqp::ExchangeConfig { exchange: "build-results".to_owned(), @@ -63,7 +63,7 @@ fn main() -> Result<(), Box> { }, )?; - task::block_on(handle); + block_on(handle); drop(conn); // Close connection. info!("Closed the session... EOF"); diff --git a/ofborg/src/bin/github-webhook-receiver.rs b/ofborg/src/bin/github-webhook-receiver.rs index 626e2313..22a56845 100644 --- a/ofborg/src/bin/github-webhook-receiver.rs +++ b/ofborg/src/bin/github-webhook-receiver.rs @@ -4,7 +4,6 @@ use std::io::Read as _; #[macro_use] extern crate hyper; -use async_std::task; use hmac::{Hmac, Mac}; use hyper::header::ContentType; use hyper::mime; @@ -14,6 +13,7 @@ use hyper::{ }; use lapin::options::BasicPublishOptions; use lapin::{BasicProperties, Channel}; +use ofborg::block_on; use ofborg::ghevent::GenericWebhook; use ofborg::{config, easyamqp, easyamqp::ChannelExt, easylapin}; use sha2::Sha256; @@ -181,7 +181,7 @@ fn handle_request(mut req: Request, mut res: Response, webhook_secret: &str, cha let routing_key = format!("{event_type}.{}", input.repository.full_name.to_lowercase()); // Publish message - let _confirmation = task::block_on(async { + let _confirmation = block_on(async { chan.basic_publish( "github-events", &routing_key, @@ -212,10 +212,10 @@ fn main() -> Result<(), Box> { let webhook_secret = webhook_secret.trim().to_string(); let conn = easylapin::from_config(&cfg.rabbitmq)?; - let mut chan = task::block_on(conn.create_channel())?; + let mut chan = block_on(conn.create_channel())?; setup_amqp(&mut chan)?; - //let events = stats::RabbitMq::from_lapin(&cfg.whoami(), task::block_on(conn.create_channel())?); + //let events = stats::RabbitMq::from_lapin(&cfg.whoami(), block_on(conn.create_channel())?); let threads = std::thread::available_parallelism() .map(|x| x.get()) .unwrap_or(1); diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index fec6cb56..19bfadb9 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -2,7 +2,7 @@ use std::env; use std::error::Error; use std::path::PathBuf; -use async_std::task; +use ofborg::block_on; use tracing::{error, info}; use ofborg::config; @@ -24,7 +24,7 @@ fn main() -> Result<(), Box> { }; let conn = easylapin::from_config(&collector_cfg.rabbitmq)?; - let mut chan = task::block_on(conn.create_channel())?; + let mut chan = block_on(conn.create_channel())?; chan.declare_exchange(easyamqp::ExchangeConfig { exchange: "logs".to_owned(), @@ -70,7 +70,7 @@ fn main() -> Result<(), Box> { )?; info!("Fetching jobs from {}", &queue_name); - task::block_on(handle); + block_on(handle); drop(conn); // Close connection. info!("Closed the session... EOF"); diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index 902cb940..f96e45e3 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -2,7 +2,7 @@ use std::env; use std::error::Error; use std::path::Path; -use async_std::task; +use ofborg::block_on; use tracing::{error, info}; use ofborg::checkout; @@ -26,12 +26,12 @@ fn main() -> Result<(), Box> { }; let conn = easylapin::from_config(&rebuilder_cfg.rabbitmq)?; - let mut chan = task::block_on(conn.create_channel())?; + let mut chan = block_on(conn.create_channel())?; let root = Path::new(&cfg.checkout.root); let cloner = checkout::cached_cloner(&root.join(cfg.runner.instance.to_string())); - let events = stats::RabbitMq::from_lapin(&cfg.whoami(), task::block_on(conn.create_channel())?); + let events = stats::RabbitMq::from_lapin(&cfg.whoami(), block_on(conn.create_channel())?); let queue_name = String::from("mass-rebuild-check-jobs"); chan.declare_queue(easyamqp::QueueConfig { @@ -62,7 +62,7 @@ fn main() -> Result<(), Box> { )?; info!("Fetching jobs from {}", queue_name); - task::block_on(handle); + block_on(handle); drop(conn); // Close connection. info!("Closed the session... EOF"); diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index b24958c4..848852d4 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -3,8 +3,8 @@ use std::error::Error; use std::sync::Arc; use std::thread; -use async_std::task; use hyper::server::{Request, Response, Server}; +use ofborg::block_on; use tracing::{error, info}; use ofborg::easyamqp::{ChannelExt, ConsumerExt}; @@ -36,9 +36,9 @@ fn main() -> Result<(), Box> { let conn = easylapin::from_config(&stats_cfg.rabbitmq)?; - let mut chan = task::block_on(conn.create_channel())?; + let mut chan = block_on(conn.create_channel())?; - let events = stats::RabbitMq::from_lapin(&cfg.whoami(), task::block_on(conn.create_channel())?); + let events = stats::RabbitMq::from_lapin(&cfg.whoami(), block_on(conn.create_channel())?); let metrics = Arc::new(stats::MetricCollector::new()); let collector = tasks::statscollector::StatCollectorWorker::new(events, (*metrics).clone()); @@ -89,7 +89,7 @@ fn main() -> Result<(), Box> { }); info!("Fetching jobs from {}", &queue_name); - task::block_on(handle); + block_on(handle); drop(conn); // Close connection. info!("Closed the session... EOF"); diff --git a/ofborg/src/commitstatus.rs b/ofborg/src/commitstatus.rs index 12c4b350..b982d904 100644 --- a/ofborg/src/commitstatus.rs +++ b/ofborg/src/commitstatus.rs @@ -57,7 +57,7 @@ impl CommitStatus { } else { self.description.clone() }; - async_std::task::block_on( + crate::block_on( self.api .create( self.sha.as_ref(), diff --git a/ofborg/src/config.rs b/ofborg/src/config.rs index 3300e925..fd4cbd45 100644 --- a/ofborg/src/config.rs +++ b/ofborg/src/config.rs @@ -339,7 +339,7 @@ impl GithubAppVendingMachine { let lookup_gh = Github::new(useragent, Credentials::JWT(jwt)).unwrap(); - match async_std::task::block_on(lookup_gh.app().find_repo_installation(owner, repo)) { + match crate::block_on(lookup_gh.app().find_repo_installation(owner, repo)) { Ok(install_id) => { debug!("Received install ID {:?}", install_id); Some(install_id.id) diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index f53dc6d4..4daf4fee 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -11,7 +11,6 @@ use crate::worker::{Action, SimpleWorker}; use async_std::future::Future; use async_std::stream::StreamExt; -use async_std::task; use lapin::message::Delivery; use lapin::options::{ BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions, @@ -31,7 +30,7 @@ pub fn from_config(cfg: &RabbitMqConfig) -> Result { client_properties: props, ..Default::default() }; - task::block_on(Connection::connect(&cfg.as_uri()?, opts)) + crate::block_on(Connection::connect(&cfg.as_uri()?, opts)) } impl ChannelExt for Channel { @@ -51,7 +50,12 @@ impl ChannelExt for Channel { ExchangeType::Fanout => ExchangeKind::Fanout, _ => panic!("exchange kind"), }; - task::block_on(self.exchange_declare(&config.exchange, kind, opts, FieldTable::default()))?; + crate::block_on(self.exchange_declare( + &config.exchange, + kind, + opts, + FieldTable::default(), + ))?; Ok(()) } @@ -64,7 +68,7 @@ impl ChannelExt for Channel { nowait: config.no_wait, }; - task::block_on(self.queue_declare(&config.queue, opts, FieldTable::default()))?; + crate::block_on(self.queue_declare(&config.queue, opts, FieldTable::default()))?; Ok(()) } @@ -73,7 +77,7 @@ impl ChannelExt for Channel { nowait: config.no_wait, }; - task::block_on(self.queue_bind( + crate::block_on(self.queue_bind( &config.queue, &config.exchange, &config.routing_key.unwrap_or_else(|| "".into()), @@ -89,7 +93,7 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel { type Handle = Pin + 'a>>; fn consume(self, mut worker: W, config: ConsumeConfig) -> Result { - let mut consumer = task::block_on(self.basic_consume( + let mut consumer = crate::block_on(self.basic_consume( &config.queue, &config.consumer_tag, BasicConsumeOptions::default(), @@ -127,7 +131,7 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for WorkerChannel { type Handle = Pin + 'a>>; fn consume(self, worker: W, config: ConsumeConfig) -> Result { - task::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?; + crate::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?; self.0.consume(worker, config) } } @@ -145,7 +149,7 @@ impl<'a> ChannelNotificationReceiver<'a> { impl NotificationReceiver for ChannelNotificationReceiver<'_> { fn tell(&mut self, action: Action) { - task::block_on(action_deliver(self.channel, self.deliver, action)) + crate::block_on(action_deliver(self.channel, self.deliver, action)) .expect("action deliver failure"); } } @@ -159,9 +163,9 @@ impl<'a, W: SimpleNotifyWorker + 'a + Send> ConsumerExt<'a, W> for NotifyChannel type Handle = Pin + 'a + Send>>; fn consume(self, worker: W, config: ConsumeConfig) -> Result { - task::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?; + crate::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?; - let mut consumer = task::block_on(self.0.basic_consume( + let mut consumer = crate::block_on(self.0.basic_consume( &config.queue, &config.consumer_tag, BasicConsumeOptions::default(), diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index 51d00f9f..21374903 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -9,6 +9,7 @@ extern crate nom; use std::env; +use std::future::Future; use tracing_subscriber::EnvFilter; use tracing_subscriber::prelude::*; @@ -108,3 +109,8 @@ pub fn setup_log() { tracing::info!("Logging configured"); } + +/// Block on a future from synchronous code. +pub fn block_on(f: F) -> F::Output { + async_std::task::block_on(f) +} diff --git a/ofborg/src/stats.rs b/ofborg/src/stats.rs index 618a9013..08deb27e 100644 --- a/ofborg/src/stats.rs +++ b/ofborg/src/stats.rs @@ -1,4 +1,3 @@ -use async_std::task; use lapin::options::BasicPublishOptions; include!(concat!(env!("OUT_DIR"), "/events.rs")); @@ -36,7 +35,7 @@ impl RabbitMq { impl SysEvents for RabbitMq { fn notify(&mut self, event: Event) { let props = lapin::BasicProperties::default().with_content_type("application/json".into()); - task::block_on(async { + crate::block_on(async { let _confirmaton = self .channel .basic_publish( diff --git a/ofborg/src/tasks/eval/nixpkgs.rs b/ofborg/src/tasks/eval/nixpkgs.rs index b0140c0e..52f85fae 100644 --- a/ofborg/src/tasks/eval/nixpkgs.rs +++ b/ofborg/src/tasks/eval/nixpkgs.rs @@ -49,7 +49,7 @@ impl<'a> NixpkgsStrategy<'a> { } fn tag_from_title(&self) { - let title = match async_std::task::block_on(self.issue_ref.get()) { + let title = match crate::block_on(self.issue_ref.get()) { Ok(issue) => issue.title.to_lowercase(), Err(_) => return, }; diff --git a/ofborg/src/tasks/evaluate.rs b/ofborg/src/tasks/evaluate.rs index 51b80e56..dc40e440 100644 --- a/ofborg/src/tasks/evaluate.rs +++ b/ofborg/src/tasks/evaluate.rs @@ -158,7 +158,7 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> { &self.job.pr.number, &self.job.pr.head_sha, &description ); - async_std::task::block_on( + crate::block_on( self.repo .statuses() .create(&self.job.pr.head_sha, &builder.build()) @@ -231,7 +231,7 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> { let issue_ref = repo.issue(job.pr.number); let auto_schedule_build_archs: Vec; - match async_std::task::block_on(issue_ref.get()) { + match crate::block_on(issue_ref.get()) { Ok(iss) => { if iss.state == "closed" { self.events.notify(Event::IssueAlreadyClosed); @@ -452,7 +452,7 @@ fn schedule_builds( pub fn update_labels(issueref: &hubcaps::issues::IssueRef, add: &[String], remove: &[String]) { let l = issueref.labels(); - let issue = async_std::task::block_on(issueref.get()).expect("Failed to get issue"); + let issue = crate::block_on(issueref.get()).expect("Failed to get issue"); let existing: Vec = issue.labels.iter().map(|l| l.name.clone()).collect(); @@ -472,11 +472,11 @@ pub fn update_labels(issueref: &hubcaps::issues::IssueRef, add: &[String], remov info!("Labeling issue #{issue}: + {to_add:?} , - {to_remove:?}, = {existing:?}"); - async_std::task::block_on(l.add(to_add.clone())) + crate::block_on(l.add(to_add.clone())) .unwrap_or_else(|err| panic!("Failed to add labels {to_add:?} to issue #{issue}: {err:?}")); for label in to_remove { - async_std::task::block_on(l.remove(&label)).unwrap_or_else(|err| { + crate::block_on(l.remove(&label)).unwrap_or_else(|err| { panic!("Failed to remove label {label:?} from issue #{issue}: {err:?}") }); } @@ -497,7 +497,7 @@ pub fn get_prefix( statuses: hubcaps::statuses::Statuses, sha: &str, ) -> Result<&str, CommitStatusError> { - if async_std::task::block_on(statuses.list(sha))? + if crate::block_on(statuses.list(sha))? .iter() .any(|s| s.context.starts_with("grahamcofborg-")) { diff --git a/ofborg/src/tasks/githubcommentfilter.rs b/ofborg/src/tasks/githubcommentfilter.rs index bd23b234..fd7b1ecb 100644 --- a/ofborg/src/tasks/githubcommentfilter.rs +++ b/ofborg/src/tasks/githubcommentfilter.rs @@ -65,7 +65,7 @@ impl worker::SimpleWorker for GitHubCommentWorker { let instructions = commentparser::parse(&job.comment.body); info!("Instructions: {:?}", instructions); - let pr = async_std::task::block_on( + let pr = crate::block_on( self.github .repo( job.repository.owner.login.clone(), diff --git a/ofborg/src/tasks/githubcommentposter.rs b/ofborg/src/tasks/githubcommentposter.rs index 8de1613b..c4381ce1 100644 --- a/ofborg/src/tasks/githubcommentposter.rs +++ b/ofborg/src/tasks/githubcommentposter.rs @@ -78,7 +78,7 @@ impl worker::SimpleWorker for GitHubCommentPoster { ); debug!("{:?}", check); - let check_create_attempt = async_std::task::block_on( + let check_create_attempt = crate::block_on( self.github_vend .for_repo(&repo.owner, &repo.name) .unwrap() From 5ceda763da0baa11856ccb6cff77fff12ea28307 Mon Sep 17 00:00:00 2001 From: John Ericson Date: Sun, 30 Nov 2025 12:06:24 -0500 Subject: [PATCH 5/5] Bump manually deps, async-std -> tokio The LLM did this, and I cannot (before or after) run all the tests without things hanging, so we should procede with caution. --- Cargo.lock | 472 ++++++---------------- ofborg/Cargo.toml | 9 +- ofborg/src/bin/build-faker.rs | 5 +- ofborg/src/bin/builder.rs | 10 +- ofborg/src/bin/github-webhook-receiver.rs | 257 +++++++----- ofborg/src/bin/logapi.rs | 112 ++--- ofborg/src/bin/stats.rs | 61 ++- ofborg/src/commitstatus.rs | 3 +- ofborg/src/easylapin.rs | 3 +- ofborg/src/lib.rs | 16 +- 10 files changed, 402 insertions(+), 546 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 33ed7add..fec5c0be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,8 +66,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f89f8273826a676282208e5af38461a07fe939def57396af6ad5997fcf56577d" dependencies = [ "amq-protocol-types", - "percent-encoding 2.3.2", - "url 2.5.7", + "percent-encoding", + "url", ] [[package]] @@ -92,7 +92,7 @@ dependencies = [ "num-traits", "rusticata-macros", "thiserror", - "time 0.3.44", + "time", ] [[package]] @@ -118,17 +118,6 @@ dependencies = [ "syn", ] -[[package]] -name = "async-channel" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" -dependencies = [ - "concurrent-queue", - "event-listener 2.5.3", - "futures-core", -] - [[package]] name = "async-channel" version = "2.5.0" @@ -155,29 +144,13 @@ dependencies = [ "slab", ] -[[package]] -name = "async-global-executor" -version = "2.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" -dependencies = [ - "async-channel 2.5.0", - "async-executor", - "async-io 2.6.0", - "async-lock 3.4.1", - "blocking", - "futures-lite 2.6.1", - "once_cell", - "tokio", -] - [[package]] name = "async-global-executor" version = "3.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13f937e26114b93193065fd44f507aa2e9169ad0cdabbb996920b1fe1ddea7ba" dependencies = [ - "async-channel 2.5.0", + "async-channel", "async-executor", "async-io 2.6.0", "async-lock 3.4.1", @@ -191,7 +164,7 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9af57045d58eeb1f7060e7025a1631cbc6399e0a1d10ad6735b3d0ea7f8346ce" dependencies = [ - "async-global-executor 3.1.0", + "async-global-executor", "async-trait", "executor-trait", ] @@ -207,7 +180,7 @@ dependencies = [ "cfg-if", "concurrent-queue", "futures-lite 1.13.0", - "log 0.4.29", + "log", "parking", "polling 2.8.0", "rustix 0.37.28", @@ -254,23 +227,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "async-process" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea6438ba0a08d81529c69b36700fa2f95837bfe3e776ab39cde9c14d9149da88" -dependencies = [ - "async-io 1.13.0", - "async-lock 2.8.0", - "async-signal", - "blocking", - "cfg-if", - "event-listener 3.1.0", - "futures-lite 1.13.0", - "rustix 0.38.44", - "windows-sys 0.48.0", -] - [[package]] name = "async-reactor-trait" version = "1.1.0" @@ -283,51 +239,6 @@ dependencies = [ "reactor-trait", ] -[[package]] -name = "async-signal" -version = "0.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43c070bbf59cd3570b6b2dd54cd772527c7c3620fce8be898406dd3ed6adc64c" -dependencies = [ - "async-io 2.6.0", - "async-lock 3.4.1", - "atomic-waker", - "cfg-if", - "futures-core", - "futures-io", - "rustix 1.1.2", - "signal-hook-registry", - "slab", - "windows-sys 0.61.2", -] - -[[package]] -name = "async-std" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" -dependencies = [ - "async-channel 1.9.0", - "async-global-executor 2.4.1", - "async-io 1.13.0", - "async-lock 2.8.0", - "async-process", - "crossbeam-utils", - "futures-channel", - "futures-core", - "futures-io", - "futures-lite 1.13.0", - "gloo-timers", - "kv-log-macro", - "log 0.4.29", - "memchr", - "once_cell", - "pin-project-lite", - "pin-utils", - "slab", - "wasm-bindgen-futures", -] - [[package]] name = "async-task" version = "4.7.1" @@ -357,16 +268,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" -[[package]] -name = "base64" -version = "0.9.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "489d6c0ed21b11d038c31b6ceccca973e65d73ba3bd8ecb9a2babf5546164643" -dependencies = [ - "byteorder", - "safemem", -] - [[package]] name = "base64" version = "0.13.1" @@ -427,7 +328,7 @@ version = "1.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" dependencies = [ - "async-channel 2.5.0", + "async-channel", "async-task", "futures-io", "futures-lite 2.6.1", @@ -446,12 +347,6 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" -[[package]] -name = "byteorder" -version = "1.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" - [[package]] name = "bytes" version = "1.11.0" @@ -680,6 +575,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + [[package]] name = "errno" version = "0.3.14" @@ -696,17 +597,6 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "event-listener" -version = "3.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - [[package]] name = "event-listener" version = "5.4.1" @@ -787,7 +677,7 @@ version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" dependencies = [ - "percent-encoding 2.3.2", + "percent-encoding", ] [[package]] @@ -921,7 +811,7 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi 0.11.1+wasi-snapshot-preview1", + "wasi", "wasm-bindgen", ] @@ -940,17 +830,30 @@ dependencies = [ ] [[package]] -name = "gloo-timers" -version = "0.2.6" +name = "h2" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" dependencies = [ - "futures-channel", + "atomic-waker", + "bytes", + "fnv", "futures-core", - "js-sys", - "wasm-bindgen", + "futures-sink", + "http 1.4.0", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", ] +[[package]] +name = "hashbrown" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -1045,33 +948,14 @@ dependencies = [ "http 1.4.0", "hyperx", "jsonwebtoken", - "log 0.4.29", - "mime 0.3.17", - "percent-encoding 2.3.2", + "log", + "mime", + "percent-encoding", "reqwest", "serde", "serde_derive", "serde_json", - "url 2.5.7", -] - -[[package]] -name = "hyper" -version = "0.10.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a0652d9a2609a968c14be1a9ea00bf4b1d64e2e1f53a1b51b6fff3a6e829273" -dependencies = [ - "base64 0.9.3", - "httparse", - "language-tags 0.2.2", - "log 0.3.9", - "mime 0.2.6", - "num_cpus", - "time 0.1.45", - "traitobject", - "typeable", - "unicase 1.4.2", - "url 1.7.2", + "url", ] [[package]] @@ -1084,9 +968,11 @@ dependencies = [ "bytes", "futures-channel", "futures-core", + "h2", "http 1.4.0", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -1102,7 +988,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ "http 1.4.0", - "hyper 1.8.1", + "hyper", "hyper-util", "rustls", "rustls-pki-types", @@ -1125,10 +1011,10 @@ dependencies = [ "futures-util", "http 1.4.0", "http-body", - "hyper 1.8.1", + "hyper", "ipnet", "libc", - "percent-encoding 2.3.2", + "percent-encoding", "pin-project-lite", "socket2 0.6.1", "tokio", @@ -1145,10 +1031,10 @@ dependencies = [ "bytes", "http 0.2.12", "httpdate", - "language-tags 0.3.2", - "mime 0.3.17", - "percent-encoding 2.3.2", - "unicase 2.8.1", + "language-tags", + "mime", + "percent-encoding", + "unicase", ] [[package]] @@ -1161,7 +1047,7 @@ dependencies = [ "core-foundation-sys", "iana-time-zone-haiku", "js-sys", - "log 0.4.29", + "log", "wasm-bindgen", "windows-core", ] @@ -1256,17 +1142,6 @@ dependencies = [ "zerovec", ] -[[package]] -name = "idna" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" -dependencies = [ - "matches", - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "1.1.0" @@ -1288,6 +1163,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "2.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" +dependencies = [ + "equivalent", + "hashbrown", +] + [[package]] name = "inout" version = "0.1.4" @@ -1364,21 +1249,6 @@ dependencies = [ "simple_asn1", ] -[[package]] -name = "kv-log-macro" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" -dependencies = [ - "log 0.4.29", -] - -[[package]] -name = "language-tags" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" - [[package]] name = "language-tags" version = "0.3.2" @@ -1431,12 +1301,6 @@ version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" -[[package]] -name = "linux-raw-sys" -version = "0.4.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" - [[package]] name = "linux-raw-sys" version = "0.11.0" @@ -1458,23 +1322,11 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "log" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" -dependencies = [ - "log 0.4.29", -] - [[package]] name = "log" version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" -dependencies = [ - "value-bag", -] [[package]] name = "lru-cache" @@ -1500,12 +1352,6 @@ dependencies = [ "regex-automata", ] -[[package]] -name = "matches" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" - [[package]] name = "md5" version = "0.8.0" @@ -1518,15 +1364,6 @@ version = "2.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" -[[package]] -name = "mime" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba626b8a6de5da682e1caa06bdb42a335aee5a84db8e5046a3e8ab17ba0a3ae0" -dependencies = [ - "log 0.3.9", -] - [[package]] name = "mime" version = "0.3.17" @@ -1546,7 +1383,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" dependencies = [ "libc", - "wasi 0.11.1+wasi-snapshot-preview1", + "wasi", "windows-sys 0.61.2", ] @@ -1613,21 +1450,10 @@ dependencies = [ "autocfg", ] -[[package]] -name = "num_cpus" -version = "1.17.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" -dependencies = [ - "hermit-abi 0.5.2", - "libc", -] - [[package]] name = "ofborg" version = "0.1.9" dependencies = [ - "async-std", "brace-expand", "chrono", "either", @@ -1636,11 +1462,14 @@ dependencies = [ "hex", "hmac", "http 1.4.0", + "http-body-util", "hubcaps", - "hyper 0.10.16", + "hyper", + "hyper-util", "lapin", "lru-cache", "md5", + "mime", "nom 4.2.3", "regex", "rustls-pemfile", @@ -1648,6 +1477,8 @@ dependencies = [ "serde_json", "sha2", "tempfile", + "tokio", + "tokio-stream", "tracing", "tracing-subscriber", "uuid", @@ -1657,7 +1488,7 @@ dependencies = [ name = "ofborg-simple-build" version = "0.1.0" dependencies = [ - "log 0.4.29", + "log", "ofborg", ] @@ -1761,12 +1592,6 @@ dependencies = [ "base64ct", ] -[[package]] -name = "percent-encoding" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" - [[package]] name = "percent-encoding" version = "2.3.2" @@ -1849,7 +1674,7 @@ dependencies = [ "cfg-if", "concurrent-queue", "libc", - "log 0.4.29", + "log", "pin-project-lite", "windows-sys 0.48.0", ] @@ -2070,12 +1895,12 @@ dependencies = [ "http 1.4.0", "http-body", "http-body-util", - "hyper 1.8.1", + "hyper", "hyper-rustls", "hyper-util", "js-sys", - "log 0.4.29", - "percent-encoding 2.3.2", + "log", + "percent-encoding", "pin-project-lite", "quinn", "rustls", @@ -2089,7 +1914,7 @@ dependencies = [ "tower", "tower-http", "tower-service", - "url 2.5.7", + "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", @@ -2154,19 +1979,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "rustix" -version = "0.38.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" -dependencies = [ - "bitflags 2.10.0", - "errno", - "libc", - "linux-raw-sys 0.4.15", - "windows-sys 0.59.0", -] - [[package]] name = "rustix" version = "1.1.2" @@ -2200,7 +2012,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70cc376c6ba1823ae229bacf8ad93c136d93524eab0e4e5e0e4f96b9c4e5b212" dependencies = [ - "log 0.4.29", + "log", "rustls", "rustls-native-certs", "rustls-pki-types", @@ -2262,12 +2074,6 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" -[[package]] -name = "safemem" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" - [[package]] name = "salsa20" version = "0.10.2" @@ -2418,15 +2224,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" -[[package]] -name = "signal-hook-registry" -version = "1.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7664a098b8e616bdfcc2dc0e9ac44eb231eedf41db4e9fe95d8d32ec728dedad" -dependencies = [ - "libc", -] - [[package]] name = "simple_asn1" version = "0.6.3" @@ -2436,7 +2233,7 @@ dependencies = [ "num-bigint", "num-traits", "thiserror", - "time 0.3.44", + "time", ] [[package]] @@ -2593,17 +2390,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "time" -version = "0.1.45" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", -] - [[package]] name = "time" version = "0.3.44" @@ -2671,9 +2457,21 @@ dependencies = [ "mio", "pin-project-lite", "socket2 0.6.1", + "tokio-macros", "windows-sys 0.61.2", ] +[[package]] +name = "tokio-macros" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tokio-rustls" version = "0.26.4" @@ -2684,6 +2482,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower" version = "0.5.2" @@ -2767,7 +2589,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" dependencies = [ - "log 0.4.29", + "log", "once_cell", "tracing-core", ] @@ -2803,66 +2625,30 @@ dependencies = [ "tracing-serde", ] -[[package]] -name = "traitobject" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04a79e25382e2e852e8da874249358d382ebaf259d0d34e75d8db16a7efabbc7" - [[package]] name = "try-lock" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "typeable" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1410f6f91f21d1612654e7cc69193b0334f909dcf2c790c4826254fbb86f8887" - [[package]] name = "typenum" version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" -[[package]] -name = "unicase" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4765f83163b74f957c797ad9253caf97f103fb064d3999aea9568d09fc8a33" -dependencies = [ - "version_check 0.1.5", -] - [[package]] name = "unicase" version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" -[[package]] -name = "unicode-bidi" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" - [[package]] name = "unicode-ident" version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" -[[package]] -name = "unicode-normalization" -version = "0.1.25" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fd4f6878c9cb28d874b009da9e8d183b5abc80117c40bbd187a1fde336be6e8" -dependencies = [ - "tinyvec", -] - [[package]] name = "untrusted" version = "0.7.1" @@ -2875,17 +2661,6 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" -[[package]] -name = "url" -version = "1.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" -dependencies = [ - "idna 0.1.5", - "matches", - "percent-encoding 1.0.1", -] - [[package]] name = "url" version = "2.5.7" @@ -2893,8 +2668,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" dependencies = [ "form_urlencoded", - "idna 1.1.0", - "percent-encoding 2.3.2", + "idna", + "percent-encoding", "serde", ] @@ -2921,12 +2696,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" -[[package]] -name = "value-bag" -version = "1.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba6f5989077681266825251a52748b8c1d8a4ad098cc37e440103d0ea717fc0" - [[package]] name = "version_check" version = "0.1.5" @@ -2954,12 +2723,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasi" version = "0.11.1+wasi-snapshot-preview1" @@ -3161,15 +2924,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-sys" -version = "0.59.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" -dependencies = [ - "windows-targets 0.52.6", -] - [[package]] name = "windows-sys" version = "0.60.2" @@ -3411,7 +3165,7 @@ dependencies = [ "oid-registry", "rusticata-macros", "thiserror", - "time 0.3.44", + "time", ] [[package]] diff --git a/ofborg/Cargo.toml b/ofborg/Cargo.toml index 38aad489..b485b106 100644 --- a/ofborg/Cargo.toml +++ b/ofborg/Cargo.toml @@ -6,7 +6,6 @@ build = "build.rs" edition = "2024" [dependencies] -async-std = { version = "=1.12.0", features = ["unstable", "tokio1"] } brace-expand = "0.1.0" chrono = { version = "0.4.38", default-features = false, features = [ "clock", @@ -18,14 +17,16 @@ futures-util = "0.3.31" hex = "0.4.3" hmac = "0.12.1" http = "1" +http-body-util = "0.1" #hubcaps = "0.6" # for Conclusion::Skipped which is in master hubcaps = { git = "https://github.com/ofborg/hubcaps.git", rev = "50dbe6ec45c9dfea4e3cfdf27bbadfa565f69dec", default-features = false, features = ["app", "rustls-tls"] } -# hyper = { version = "0.14", features = ["full"] } -hyper = "=0.10.*" +hyper = { version = "1.0", features = ["full", "server", "http1"] } +hyper-util = { version = "0.1", features = ["server", "tokio", "http1"] } lapin = "2.5.4" lru-cache = "0.1.2" md5 = "0.8.0" +mime = "0.3" nom = "4.2.3" regex = "1.11.1" rustls-pemfile = "2.2.0" @@ -33,6 +34,8 @@ serde = { version = "1.0.217", features = ["derive"] } serde_json = "1.0.135" sha2 = "0.10.8" tempfile = "3.15.0" +tokio = { version = "1", features = ["rt-multi-thread", "net", "macros", "sync"] } +tokio-stream = "0.1" tracing = "0.1.41" tracing-subscriber = { version = "0.3.19", features = ["json", "env-filter"] } uuid = { version = "1.12", features = ["v4"] } diff --git a/ofborg/src/bin/build-faker.rs b/ofborg/src/bin/build-faker.rs index bf510e5b..e543e15b 100644 --- a/ofborg/src/bin/build-faker.rs +++ b/ofborg/src/bin/build-faker.rs @@ -1,8 +1,7 @@ -use std::env; -use std::error::Error; - use lapin::BasicProperties; use lapin::message::Delivery; +use std::env; +use std::error::Error; use ofborg::block_on; use ofborg::commentparser; diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 25778a08..a671cae3 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -1,8 +1,9 @@ use std::env; use std::error::Error; +use std::future::Future; use std::path::Path; +use std::pin::Pin; -use async_std::task::{JoinHandle, spawn}; use futures_util::future; use ofborg::block_on; use tracing::{error, info, warn}; @@ -25,7 +26,7 @@ fn main() -> Result<(), Box> { }; let conn = easylapin::from_config(&builder_cfg.rabbitmq)?; - let mut handles = Vec::new(); + let mut handles: Vec + Send>>> = Vec::new(); for system in &cfg.nix.system { let handle_ext = self::create_handle(&conn, &cfg, system.to_string())?; @@ -39,11 +40,12 @@ fn main() -> Result<(), Box> { Ok(()) } +#[allow(clippy::type_complexity)] fn create_handle( conn: &lapin::Connection, cfg: &config::Config, system: String, -) -> Result, Box> { +) -> Result + Send>>, Box> { let mut chan = block_on(conn.create_channel())?; let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); @@ -105,5 +107,5 @@ fn create_handle( )?; info!("Fetching jobs from {}", &queue_name); - Ok(spawn(handle)) + Ok(handle) } diff --git a/ofborg/src/bin/github-webhook-receiver.rs b/ofborg/src/bin/github-webhook-receiver.rs index 22a56845..ef570f68 100644 --- a/ofborg/src/bin/github-webhook-receiver.rs +++ b/ofborg/src/bin/github-webhook-receiver.rs @@ -1,30 +1,28 @@ use std::env; use std::error::Error; -use std::io::Read as _; -#[macro_use] -extern crate hyper; +use std::net::SocketAddr; +use std::sync::Arc; use hmac::{Hmac, Mac}; -use hyper::header::ContentType; -use hyper::mime; -use hyper::{ - server::{Request, Response, Server}, - status::StatusCode, -}; +use http::{Method, StatusCode}; +use http_body_util::{BodyExt, Full}; +use hyper::body::{Bytes, Incoming}; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; use lapin::options::BasicPublishOptions; use lapin::{BasicProperties, Channel}; -use ofborg::block_on; use ofborg::ghevent::GenericWebhook; use ofborg::{config, easyamqp, easyamqp::ChannelExt, easylapin}; use sha2::Sha256; +use tokio::net::TcpListener; +use tokio::sync::Mutex; use tracing::{error, info, warn}; -header! { (XHubSignature256, "X-Hub-Signature-256") => [String] } -header! { (XGithubEvent, "X-Github-Event") => [String] } - /// Prepares the the exchange we will write to, the queues that are bound to it /// and binds them. -fn setup_amqp(chan: &mut Channel) -> Result<(), Box> { +fn setup_amqp(chan: &mut Channel) -> Result<(), Box> { chan.declare_exchange(easyamqp::ExchangeConfig { exchange: "github-events".to_owned(), exchange_type: easyamqp::ExchangeType::Topic, @@ -85,118 +83,152 @@ fn setup_amqp(chan: &mut Channel) -> Result<(), Box> { Ok(()) } -fn handle_request(mut req: Request, mut res: Response, webhook_secret: &str, chan: &Channel) { +fn response(status: StatusCode, body: &'static str) -> Response> { + Response::builder() + .status(status) + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +fn empty_response(status: StatusCode) -> Response> { + Response::builder() + .status(status) + .body(Full::new(Bytes::new())) + .unwrap() +} + +async fn handle_request( + req: Request, + webhook_secret: Arc, + chan: Arc>, +) -> Result>, hyper::Error> { // HTTP 405 - if req.method != hyper::Post { - *res.status_mut() = StatusCode::MethodNotAllowed; - return; + if req.method() != Method::POST { + return Ok(empty_response(StatusCode::METHOD_NOT_ALLOWED)); } - let hdr = req.headers.clone(); + + // Get headers before consuming body + let sig_header = req + .headers() + .get("X-Hub-Signature-256") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + let event_type = req + .headers() + .get("X-Github-Event") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); + let content_type = req + .headers() + .get("Content-Type") + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_string()); // Read body - let mut raw = Vec::new(); - if req.read_to_end(&mut raw).is_err() { - warn!("Failed to read body from client"); - *res.status_mut() = StatusCode::InternalServerError; - return; - } - let raw = raw.as_slice(); + let raw = match req.collect().await { + Ok(collected) => collected.to_bytes(), + Err(e) => { + warn!("Failed to read body from client: {e}"); + return Ok(response( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to read body", + )); + } + }; // Validate signature - { - let Some(sig) = hdr.get::() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Missing signature header"); - return; - }; - let mut components = sig.splitn(2, '='); - let Some(algo) = components.next() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Signature hash method missing"); - return; - }; - let Some(hash) = components.next() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Signature hash missing"); - return; - }; - let Ok(hash) = hex::decode(hash) else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Invalid signature hash hex"); - return; - }; - - if algo != "sha256" { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Invalid signature hash method"); - return; - } + let Some(sig) = sig_header else { + return Ok(response( + StatusCode::BAD_REQUEST, + "Missing signature header", + )); + }; + let mut components = sig.splitn(2, '='); + let Some(algo) = components.next() else { + return Ok(response( + StatusCode::BAD_REQUEST, + "Signature hash method missing", + )); + }; + let Some(hash) = components.next() else { + return Ok(response(StatusCode::BAD_REQUEST, "Signature hash missing")); + }; + let Ok(hash) = hex::decode(hash) else { + return Ok(response( + StatusCode::BAD_REQUEST, + "Invalid signature hash hex", + )); + }; - let Ok(mut mac) = Hmac::::new_from_slice(webhook_secret.as_bytes()) else { - *res.status_mut() = StatusCode::InternalServerError; - error!("Unable to create HMAC from secret"); - return; - }; - mac.update(raw); - if mac.verify_slice(hash.as_slice()).is_err() { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Signature verification failed"); - return; - } + if algo != "sha256" { + return Ok(response( + StatusCode::BAD_REQUEST, + "Invalid signature hash method", + )); + } + + let Ok(mut mac) = Hmac::::new_from_slice(webhook_secret.as_bytes()) else { + error!("Unable to create HMAC from secret"); + return Ok(response( + StatusCode::INTERNAL_SERVER_ERROR, + "Internal error", + )); + }; + mac.update(&raw); + if mac.verify_slice(hash.as_slice()).is_err() { + return Ok(response( + StatusCode::BAD_REQUEST, + "Signature verification failed", + )); } // Parse body - let Some(ct) = hdr.get::() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"No Content-Type header passed"); - return; + let Some(ct) = content_type else { + return Ok(response( + StatusCode::BAD_REQUEST, + "No Content-Type header passed", + )); }; - if ct - != &ContentType(mime::Mime( - mime::TopLevel::Application, - mime::SubLevel::Json, - Vec::new(), - )) - { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Content-Type is not application/json. Webhook misconfigured?"); - return; + if !ct.contains("application/json") { + return Ok(response( + StatusCode::BAD_REQUEST, + "Content-Type is not application/json. Webhook misconfigured?", + )); } - let input = match serde_json::from_slice::(raw) { + + let input = match serde_json::from_slice::(&raw) { Ok(i) => i, Err(e) => { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Invalid JSON"); error!("Invalid JSON received: {e}"); - return; + return Ok(response(StatusCode::BAD_REQUEST, "Invalid JSON")); } }; // Build routing key - let Some(event_type) = hdr.get::() else { - *res.status_mut() = StatusCode::BadRequest; - let _ = res.send(b"Missing event type"); - return; + let Some(event_type) = event_type else { + return Ok(response(StatusCode::BAD_REQUEST, "Missing event type")); }; let routing_key = format!("{event_type}.{}", input.repository.full_name.to_lowercase()); // Publish message - let _confirmation = block_on(async { - chan.basic_publish( + let chan = chan.lock().await; + let _confirmation = chan + .basic_publish( "github-events", &routing_key, BasicPublishOptions::default(), - raw, + &raw, BasicProperties::default() .with_content_type("application/json".into()) .with_delivery_mode(2), // persistent ) - .await - }); - *res.status_mut() = StatusCode::NoContent; + .await; + + Ok(empty_response(StatusCode::NO_CONTENT)) } -fn main() -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), Box> { ofborg::setup_log(); let arg = env::args() @@ -209,22 +241,31 @@ fn main() -> Result<(), Box> { let webhook_secret = std::fs::read_to_string(cfg.webhook_secret_file) .expect("Unable to read webhook secret file"); - let webhook_secret = webhook_secret.trim().to_string(); + let webhook_secret = Arc::new(webhook_secret.trim().to_string()); let conn = easylapin::from_config(&cfg.rabbitmq)?; - let mut chan = block_on(conn.create_channel())?; + let mut chan = conn.create_channel().await?; setup_amqp(&mut chan)?; + let chan = Arc::new(Mutex::new(chan)); - //let events = stats::RabbitMq::from_lapin(&cfg.whoami(), block_on(conn.create_channel())?); - let threads = std::thread::available_parallelism() - .map(|x| x.get()) - .unwrap_or(1); - info!("Will listen on {} with {threads} threads", cfg.listen); - Server::http(cfg.listen)?.handle_threads( - move |req: Request, res: Response| { - handle_request(req, res, &webhook_secret, &chan); - }, - threads, - )?; - Ok(()) + let addr: SocketAddr = cfg.listen.parse()?; + let listener = TcpListener::bind(addr).await?; + info!("Listening on {}", addr); + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + + let webhook_secret = webhook_secret.clone(); + let chan = chan.clone(); + + tokio::task::spawn(async move { + let service = + service_fn(move |req| handle_request(req, webhook_secret.clone(), chan.clone())); + + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + warn!("Error serving connection: {:?}", err); + } + }); + } } diff --git a/ofborg/src/bin/logapi.rs b/ofborg/src/bin/logapi.rs index 966def22..46a80058 100644 --- a/ofborg/src/bin/logapi.rs +++ b/ofborg/src/bin/logapi.rs @@ -1,12 +1,15 @@ -use std::{collections::HashMap, error::Error, path::PathBuf}; - -use hyper::{ - header::ContentType, - mime, - server::{Request, Response, Server}, - status::StatusCode, -}; +use std::net::SocketAddr; +use std::{collections::HashMap, error::Error, path::PathBuf, sync::Arc}; + +use http::{Method, StatusCode}; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; use ofborg::config; +use tokio::net::TcpListener; use tracing::{error, info, warn}; #[derive(serde::Serialize, Default)] @@ -27,28 +30,39 @@ struct LogApiConfig { serve_root: String, } -fn handle_request(req: Request, mut res: Response, cfg: &LogApiConfig) { - if req.method != hyper::Get { - *res.status_mut() = StatusCode::MethodNotAllowed; - return; +fn response(status: StatusCode, body: &'static str) -> Response> { + Response::builder() + .status(status) + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +fn json_response(status: StatusCode, body: String) -> Response> { + Response::builder() + .status(status) + .header("Content-Type", "application/json") + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +async fn handle_request( + req: Request, + cfg: Arc, +) -> Result>, hyper::Error> { + if req.method() != Method::GET { + return Ok(response(StatusCode::METHOD_NOT_ALLOWED, "")); } - let uri = req.uri.to_string(); + let uri = req.uri().path().to_string(); let Some(reqd) = uri.strip_prefix("/logs/").map(ToOwned::to_owned) else { - *res.status_mut() = StatusCode::NotFound; - let _ = res.send(b"invalid uri"); - return; + return Ok(response(StatusCode::NOT_FOUND, "invalid uri")); }; let path: PathBuf = [&cfg.logs_path, &reqd].iter().collect(); let Ok(path) = std::fs::canonicalize(&path) else { - *res.status_mut() = StatusCode::NotFound; - let _ = res.send(b"absent"); - return; + return Ok(response(StatusCode::NOT_FOUND, "absent")); }; let Ok(iter) = std::fs::read_dir(path) else { - *res.status_mut() = StatusCode::NotFound; - let _ = res.send(b"non dir"); - return; + return Ok(response(StatusCode::NOT_FOUND, "non dir")); }; let mut attempts = HashMap::::new(); @@ -56,9 +70,7 @@ fn handle_request(req: Request, mut res: Response, cfg: &LogApiConfig) { let Ok(e) = e else { continue }; let e_metadata = e.metadata(); if e_metadata.as_ref().map(|v| v.is_dir()).unwrap_or(true) { - *res.status_mut() = StatusCode::InternalServerError; - let _ = res.send(b"dir found"); - return; + return Ok(response(StatusCode::INTERNAL_SERVER_ERROR, "dir found")); } if e_metadata.as_ref().map(|v| v.is_file()).unwrap_or_default() { @@ -97,21 +109,12 @@ fn handle_request(req: Request, mut res: Response, cfg: &LogApiConfig) { } } - *res.status_mut() = StatusCode::Ok; - res.headers_mut() - .set::(hyper::header::ContentType(mime::Mime( - mime::TopLevel::Application, - mime::SubLevel::Json, - Vec::new(), - ))); - let _ = res.send( - serde_json::to_string(&LogResponse { attempts }) - .unwrap_or_default() - .as_bytes(), - ); + let body = serde_json::to_string(&LogResponse { attempts }).unwrap_or_default(); + Ok(json_response(StatusCode::OK, body)) } -fn main() -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), Box> { ofborg::setup_log(); let arg = std::env::args() @@ -122,20 +125,27 @@ fn main() -> Result<(), Box> { panic!(); }; - let api_cfg = LogApiConfig { + let api_cfg = Arc::new(LogApiConfig { logs_path: cfg.logs_path, serve_root: cfg.serve_root, - }; + }); - let threads = std::thread::available_parallelism() - .map(|x| x.get()) - .unwrap_or(1); - info!("Will listen on {} with {threads} threads", cfg.listen); - Server::http(cfg.listen)?.handle_threads( - move |req: Request, res: Response| { - handle_request(req, res, &api_cfg); - }, - threads, - )?; - Ok(()) + let addr: SocketAddr = cfg.listen.parse()?; + let listener = TcpListener::bind(addr).await?; + info!("Listening on {}", addr); + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + + let api_cfg = api_cfg.clone(); + + tokio::task::spawn(async move { + let service = service_fn(move |req| handle_request(req, api_cfg.clone())); + + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + warn!("Error serving connection: {:?}", err); + } + }); + } } diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index 848852d4..62a29651 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -1,24 +1,53 @@ use std::env; use std::error::Error; +use std::net::SocketAddr; use std::sync::Arc; -use std::thread; -use hyper::server::{Request, Response, Server}; +use http::StatusCode; +use http_body_util::Full; +use hyper::body::Bytes; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Request, Response}; +use hyper_util::rt::TokioIo; use ofborg::block_on; -use tracing::{error, info}; +use tokio::net::TcpListener; +use tracing::{error, info, warn}; use ofborg::easyamqp::{ChannelExt, ConsumerExt}; use ofborg::{config, easyamqp, easylapin, stats, tasks}; -fn run_http_server(metrics: Arc) { - let addr = "0.0.0.0:9898"; +fn response(body: String) -> Response> { + Response::builder() + .status(StatusCode::OK) + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +async fn run_http_server( + addr: SocketAddr, + metrics: Arc, +) -> Result<(), Box> { + let listener = TcpListener::bind(addr).await?; info!("HTTP server listening on {}", addr); - Server::http(addr) - .expect("Failed to bind HTTP server") - .handle(move |_: Request, res: Response| { - res.send(metrics.prometheus_output().as_bytes()).unwrap(); - }) - .expect("Failed to start HTTP server"); + + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + + let metrics = metrics.clone(); + + tokio::task::spawn(async move { + let service = service_fn(move |_req: Request| { + let metrics = metrics.clone(); + async move { Ok::<_, hyper::Error>(response(metrics.prometheus_output())) } + }); + + if let Err(err) = http1::Builder::new().serve_connection(io, service).await { + warn!("Error serving connection: {:?}", err); + } + }); + } } fn main() -> Result<(), Box> { @@ -82,10 +111,14 @@ fn main() -> Result<(), Box> { }, )?; - // Spawn HTTP server in a separate thread + // Spawn HTTP server in a separate thread with its own tokio runtime let metrics_clone = metrics.clone(); - thread::spawn(move || { - run_http_server(metrics_clone); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"); + let addr: SocketAddr = "0.0.0.0:9898".parse().unwrap(); + if let Err(e) = rt.block_on(run_http_server(addr, metrics_clone)) { + error!("HTTP server error: {:?}", e); + } }); info!("Fetching jobs from {}", &queue_name); diff --git a/ofborg/src/commitstatus.rs b/ofborg/src/commitstatus.rs index b982d904..53de8881 100644 --- a/ofborg/src/commitstatus.rs +++ b/ofborg/src/commitstatus.rs @@ -1,3 +1,4 @@ +use crate::block_on; use futures_util::future::TryFutureExt; use tracing::warn; @@ -57,7 +58,7 @@ impl CommitStatus { } else { self.description.clone() }; - crate::block_on( + block_on( self.api .create( self.sha.as_ref(), diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index 4daf4fee..e4d36906 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -9,8 +9,6 @@ use crate::notifyworker::{NotificationReceiver, SimpleNotifyWorker}; use crate::ofborg; use crate::worker::{Action, SimpleWorker}; -use async_std::future::Future; -use async_std::stream::StreamExt; use lapin::message::Delivery; use lapin::options::{ BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions, @@ -18,6 +16,7 @@ use lapin::options::{ }; use lapin::types::{AMQPValue, FieldTable}; use lapin::{BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind}; +use tokio_stream::StreamExt; use tracing::{debug, trace}; pub fn from_config(cfg: &RabbitMqConfig) -> Result { diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index 21374903..9dbf68a1 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -111,6 +111,20 @@ pub fn setup_log() { } /// Block on a future from synchronous code. +/// +/// This helper bridges sync and async code throughout the codebase, +/// used for both RabbitMQ (lapin) and GitHub API (hubcaps) operations. pub fn block_on(f: F) -> F::Output { - async_std::task::block_on(f) + // Try to use the current runtime if we're already in one + if let Ok(handle) = tokio::runtime::Handle::try_current() { + // We're inside a tokio runtime, use block_in_place + tokio::task::block_in_place(|| handle.block_on(f)) + } else { + // Create a new runtime for this blocking call + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Failed to create tokio runtime") + .block_on(f) + } }