Skip to content

Conversation

@noot
Copy link
Contributor

@noot noot commented Jun 24, 2025

  • refactor NodeStatusUpdater::process_nodes to run concurrently by storing tasks in a FuturesUnordered
  • this required refactoring the trait objects to use enums instead, as previously they couldn't be moved between tasks
  • also required moving some of the node group plugin methods to be standalone functions, i didn't change any code logic there.

closes #566

@noot noot requested a review from JannikSt June 25, 2025 17:36
@JannikSt JannikSt requested a review from Copilot July 8, 2025 13:09
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR refactors the node status updater to process nodes concurrently and replaces several trait-object–based plugin APIs with enum-based dispatch and standalone functions, enabling safe Send + 'static usage across tokio tasks.

  • Run NodeStatusUpdater::process_nodes in parallel via FuturesUnordered
  • Replace Box<dyn …Plugin> trait objects with StatusUpdatePlugin and SchedulerPlugin enums
  • Extract many plugin methods into standalone functions and update observer registration

Reviewed Changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated no comments.

Show a summary per file
File Description
crates/orchestrator/src/store/domains/task_store.rs Changed task observers to NodeGroupsPlugin and removed trait
crates/orchestrator/src/status_update/mod.rs Rewrote process_nodes to spawn and collect futures concurrently
crates/orchestrator/src/scheduler/mod.rs Switched scheduler plugins from Box<dyn> to SchedulerPlugin enum
crates/orchestrator/src/plugins/webhook/mod.rs Adjusted handle_status_change signature and removed trait impl
crates/orchestrator/src/plugins/traits.rs Deleted trait definitions; plugins now use enums & functions
crates/orchestrator/src/plugins/node_groups/tests.rs Updated node_groups tests to new API (add_observer, free fns)
crates/orchestrator/src/plugins/node_groups/status_update_impl.rs Inlined handle_status_change into NodeGroupsPlugin impl
crates/orchestrator/src/plugins/node_groups/scheduler_impl.rs Inlined filter_tasks into NodeGroupsPlugin impl
crates/orchestrator/src/plugins/node_groups/mod.rs Extracted many methods to free functions; removed trait cruft
crates/orchestrator/src/plugins/newest_task/mod.rs Made filter_tasks synchronous and updated its tests
crates/orchestrator/src/plugins/mod.rs Introduced StatusUpdatePlugin and SchedulerPlugin enums
crates/orchestrator/src/main.rs Updated plugin registration to use enums and add_observer
crates/orchestrator/src/lib.rs Removed unused events module import
crates/orchestrator/src/events/mod.rs Deleted TaskObserver trait definition
crates/orchestrator/src/discovery/monitor.rs Changed status_change_handlers to use StatusUpdatePlugin enum
crates/orchestrator/src/api/routes/task.rs Switched to free get_task_topologies function
crates/orchestrator/src/api/routes/storage.rs Updated storage tests to use new plugin API
Cargo.toml Added manual_let_else lint and removed duplicate entry
Comments suppressed due to low confidence (6)

crates/orchestrator/src/store/domains/task_store.rs:17

  • Restricting observers to NodeGroupsPlugin prevents other plugins from subscribing. Consider using a trait object or the new enum type to allow any observer.
    observers: Arc<Mutex<Vec<Arc<NodeGroupsPlugin>>>>,

crates/orchestrator/src/status_update/mod.rs:144

  • [nitpick] New concurrent processing in process_nodes should be backed by unit or integration tests to ensure error handling and performance behave as expected.
    pub async fn process_nodes(&self) -> Result<(), anyhow::Error> {

crates/orchestrator/src/store/domains/task_store.rs:2

  • The import path crate::NodeGroupsPlugin is incorrect; NodeGroupsPlugin lives under crate::plugins::node_groups. Update to use crate::plugins::node_groups::NodeGroupsPlugin;.
use crate::NodeGroupsPlugin;

crates/orchestrator/src/store/domains/task_store.rs:4

  • The futures::future import is unused after refactoring; consider removing it for clarity.
use futures::future;

crates/orchestrator/src/status_update/mod.rs:148

  • The FuturesUnordered instance is mutated later with .push(...) and must be declared let mut futures.
        let futures = FuturesUnordered::new();

crates/orchestrator/src/status_update/mod.rs:200

  • Missing import for Instant; add use std::time::Instant; at the top of the module.
    let start_time = Instant::now();

@JannikSt JannikSt merged commit bf2826c into develop Jul 8, 2025
1 check passed
@JannikSt JannikSt deleted the noot/process-nodes-concurrently branch July 8, 2025 13:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ensure node_status_update loop in orchestrator can run in parallel

3 participants