diff --git a/engine/artifacts/openapi.json b/engine/artifacts/openapi.json index e9f3f44bd3..08b0886563 100644 --- a/engine/artifacts/openapi.json +++ b/engine/artifacts/openapi.json @@ -1682,7 +1682,12 @@ ], "properties": { "normal": { - "type": "object" + "type": "object", + "properties": { + "drain_on_version_upgrade": { + "type": "boolean" + } + } } } }, @@ -1701,6 +1706,9 @@ "max_runners" ], "properties": { + "drain_on_version_upgrade": { + "type": "boolean" + }, "headers": { "type": [ "object", diff --git a/engine/packages/api-types/src/namespaces/runner_configs.rs b/engine/packages/api-types/src/namespaces/runner_configs.rs index 26f1d87d29..19643b60a9 100644 --- a/engine/packages/api-types/src/namespaces/runner_configs.rs +++ b/engine/packages/api-types/src/namespaces/runner_configs.rs @@ -9,6 +9,8 @@ pub struct RunnerConfig { pub kind: RunnerConfigKind, #[serde(default, skip_serializing_if = "Option::is_none")] pub metadata: Option, + #[serde(default = "default_drain_on_version_upgrade")] + pub drain_on_version_upgrade: bool, } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] @@ -27,9 +29,17 @@ pub enum RunnerConfigKind { }, } +fn default_drain_on_version_upgrade() -> bool { + false +} + impl Into for RunnerConfig { fn into(self) -> rivet_types::runner_configs::RunnerConfig { - let RunnerConfig { kind, metadata } = self; + let RunnerConfig { + kind, + metadata, + drain_on_version_upgrade, + } = self; let kind = match kind { RunnerConfigKind::Normal {} => rivet_types::runner_configs::RunnerConfigKind::Normal {}, RunnerConfigKind::Serverless { @@ -50,7 +60,10 @@ impl Into for RunnerConfig { runners_margin: runners_margin.unwrap_or_default(), }, }; - - rivet_types::runner_configs::RunnerConfig { kind, metadata } + rivet_types::runner_configs::RunnerConfig { + kind, + metadata, + drain_on_version_upgrade, + } } } diff --git a/engine/packages/engine/tests/actors_kv_drop.rs b/engine/packages/engine/tests/actors_kv_drop.rs index 1b8c837f65..5b1699ad96 100644 --- a/engine/packages/engine/tests/actors_kv_drop.rs +++ b/engine/packages/engine/tests/actors_kv_drop.rs @@ -1,7 +1,7 @@ use anyhow::*; use async_trait::async_trait; use common::test_runner::*; -use rivet_runner_protocol as rp; +use rivet_runner_protocol::mk2 as rp; use std::sync::{Arc, Mutex}; mod common; diff --git a/engine/packages/engine/tests/actors_kv_list.rs b/engine/packages/engine/tests/actors_kv_list.rs index fe5bdd856b..01bd456bb9 100644 --- a/engine/packages/engine/tests/actors_kv_list.rs +++ b/engine/packages/engine/tests/actors_kv_list.rs @@ -1,7 +1,7 @@ use anyhow::*; use async_trait::async_trait; use common::test_runner::*; -use rivet_runner_protocol as rp; +use rivet_runner_protocol::mk2 as rp; use std::sync::{Arc, Mutex}; mod common; diff --git a/engine/packages/engine/tests/actors_kv_misc.rs b/engine/packages/engine/tests/actors_kv_misc.rs index 6e1d450ef9..b68008e1a8 100644 --- a/engine/packages/engine/tests/actors_kv_misc.rs +++ b/engine/packages/engine/tests/actors_kv_misc.rs @@ -1,7 +1,7 @@ use anyhow::*; use async_trait::async_trait; use common::test_runner::*; -use rivet_runner_protocol as rp; +use rivet_runner_protocol::mk2 as rp; use std::sync::{Arc, Mutex}; mod common; diff --git a/engine/packages/engine/tests/api_runner_configs_list.rs b/engine/packages/engine/tests/api_runner_configs_list.rs index ce31f5d9c5..69e9f2c054 100644 --- a/engine/packages/engine/tests/api_runner_configs_list.rs +++ b/engine/packages/engine/tests/api_runner_configs_list.rs @@ -41,6 +41,7 @@ fn list_runner_configs_single_runner() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -95,6 +96,7 @@ fn list_runner_configs_multiple_runners() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -147,6 +149,7 @@ fn list_runner_configs_multiple_dcs() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); datacenters.insert( @@ -154,6 +157,7 @@ fn list_runner_configs_multiple_dcs() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -210,6 +214,7 @@ fn list_runner_configs_filter_by_name() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -260,6 +265,7 @@ fn list_runner_configs_filter_by_variant_normal() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -320,6 +326,7 @@ fn list_runner_configs_filter_by_variant_serverless() { runners_margin: Some(2), }, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -453,6 +460,7 @@ fn list_runner_configs_validates_returned_data() { runners_margin: Some(3), }, metadata: Some(serde_json::json!({"key": "value"})), + drain_on_version_upgrade: true, }, ); @@ -499,6 +507,7 @@ fn list_runner_configs_validates_returned_data() { min_runners, max_runners, runners_margin, + .. } = &dc_config.config.kind { assert_eq!(url, "http://localhost:9000"); @@ -530,6 +539,7 @@ fn list_runner_configs_mixed_variants() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -565,6 +575,7 @@ fn list_runner_configs_mixed_variants() { runners_margin: Some(2), }, metadata: None, + drain_on_version_upgrade: true, }, ); diff --git a/engine/packages/engine/tests/api_runner_configs_upsert.rs b/engine/packages/engine/tests/api_runner_configs_upsert.rs index 074c0d7fe9..50474eb6f9 100644 --- a/engine/packages/engine/tests/api_runner_configs_upsert.rs +++ b/engine/packages/engine/tests/api_runner_configs_upsert.rs @@ -16,6 +16,7 @@ fn upsert_runner_config_normal_single_dc() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -50,6 +51,7 @@ fn upsert_runner_config_normal_multiple_dcs() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); datacenters.insert( @@ -57,6 +59,7 @@ fn upsert_runner_config_normal_multiple_dcs() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -97,6 +100,7 @@ fn upsert_runner_config_serverless() { runners_margin: Some(2), }, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -129,6 +133,7 @@ fn upsert_runner_config_update_existing() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -156,6 +161,7 @@ fn upsert_runner_config_update_existing() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: Some(serde_json::json!({"test": "value"})), + drain_on_version_upgrade: true, }, ); @@ -189,6 +195,7 @@ fn upsert_runner_config_returns_endpoint_changed() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -232,6 +239,7 @@ fn upsert_runner_config_with_metadata() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: Some(metadata_value), + drain_on_version_upgrade: true, }, ); @@ -268,6 +276,7 @@ fn upsert_runner_config_removes_missing_dcs() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); datacenters.insert( @@ -275,6 +284,7 @@ fn upsert_runner_config_removes_missing_dcs() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -300,6 +310,7 @@ fn upsert_runner_config_removes_missing_dcs() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -365,6 +376,7 @@ fn upsert_runner_config_empty_map_deletes_all() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -438,6 +450,7 @@ fn upsert_runner_config_non_existent_namespace() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -473,6 +486,7 @@ fn upsert_runner_config_overwrites_different_variant() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -506,6 +520,7 @@ fn upsert_runner_config_overwrites_different_variant() { runners_margin: Some(2), }, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -540,6 +555,7 @@ fn upsert_runner_config_idempotent() { rivet_api_types::namespaces::runner_configs::RunnerConfig { kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, metadata: None, + drain_on_version_upgrade: true, }, ); @@ -603,6 +619,7 @@ fn upsert_runner_config_serverless_slots_per_runner_zero() { runners_margin: Some(2), }, metadata: None, + drain_on_version_upgrade: true, }, ); diff --git a/engine/packages/engine/tests/common/test_runner/actor.rs b/engine/packages/engine/tests/common/test_runner/actor.rs index deb0c29bdd..1476f7d697 100644 --- a/engine/packages/engine/tests/common/test_runner/actor.rs +++ b/engine/packages/engine/tests/common/test_runner/actor.rs @@ -1,6 +1,6 @@ use anyhow::*; use async_trait::async_trait; -use rivet_runner_protocol as rp; +use rivet_runner_protocol::mk2 as rp; use std::time::Duration; use tokio::sync::{mpsc, oneshot}; @@ -47,33 +47,25 @@ impl ActorConfig { impl ActorConfig { /// Send a sleep intent pub fn send_sleep_intent(&self) { - let event = protocol::make_actor_intent( - &self.actor_id, - self.generation, - rp::ActorIntent::ActorIntentSleep, - ); + let event = protocol::make_actor_intent(rp::ActorIntent::ActorIntentSleep); self.send_event(event); } /// Send a stop intent pub fn send_stop_intent(&self) { - let event = protocol::make_actor_intent( - &self.actor_id, - self.generation, - rp::ActorIntent::ActorIntentStop, - ); + let event = protocol::make_actor_intent(rp::ActorIntent::ActorIntentStop); self.send_event(event); } /// Set an alarm to wake at specified timestamp (milliseconds) pub fn send_set_alarm(&self, alarm_ts: i64) { - let event = protocol::make_set_alarm(&self.actor_id, self.generation, Some(alarm_ts)); + let event = protocol::make_set_alarm(Some(alarm_ts)); self.send_event(event); } /// Clear the alarm pub fn send_clear_alarm(&self) { - let event = protocol::make_set_alarm(&self.actor_id, self.generation, None); + let event = protocol::make_set_alarm(None); self.send_event(event); } diff --git a/engine/packages/engine/tests/common/test_runner/protocol.rs b/engine/packages/engine/tests/common/test_runner/protocol.rs index 33deeb9980..9591c7fada 100644 --- a/engine/packages/engine/tests/common/test_runner/protocol.rs +++ b/engine/packages/engine/tests/common/test_runner/protocol.rs @@ -1,57 +1,51 @@ use anyhow::*; use rivet_runner_protocol as rp; +use rivet_runner_protocol::mk2 as rp2; use vbare::OwnedVersionedData; -pub const PROTOCOL_VERSION: u16 = rp::PROTOCOL_MK1_VERSION; +pub const PROTOCOL_VERSION: u16 = rp::PROTOCOL_MK2_VERSION; -/// Helper to decode messages from server -pub fn decode_to_client(buf: &[u8], protocol_version: u16) -> Result { +/// Helper to decode messages from server (MK2) +pub fn decode_to_client(buf: &[u8], protocol_version: u16) -> Result { // Use versioned deserialization to handle protocol version properly - ::deserialize(buf, protocol_version) + ::deserialize(buf, protocol_version) } -/// Helper to encode messages to server -pub fn encode_to_server(msg: rp::ToServer) -> Vec { - rp::versioned::ToServer::wrap_latest(msg) +/// Helper to encode messages to server (MK2) +pub fn encode_to_server(msg: rp2::ToServer) -> Vec { + rp::versioned::ToServerMk2::wrap_latest(msg) .serialize(PROTOCOL_VERSION) .expect("failed to serialize ToServer") } -/// Helper to create event wrapper with index -pub fn make_event_wrapper(index: u64, event: rp::Event) -> rp::EventWrapper { - rp::EventWrapper { - index: index as i64, +/// Helper to create event wrapper with checkpoint (MK2) +pub fn make_event_wrapper( + actor_id: &str, + generation: u32, + index: u64, + event: rp2::Event, +) -> rp2::EventWrapper { + rp2::EventWrapper { + checkpoint: rp2::ActorCheckpoint { + actor_id: actor_id.to_string(), + generation, + index: index as i64, + }, inner: event, } } -/// Helper to create actor state update event -pub fn make_actor_state_update( - actor_id: &str, - generation: u32, - state: rp::ActorState, -) -> rp::Event { - rp::Event::EventActorStateUpdate(rp::EventActorStateUpdate { - actor_id: actor_id.to_string(), - generation, - state, - }) +/// Helper to create actor state update event (MK2) +pub fn make_actor_state_update(state: rp2::ActorState) -> rp2::Event { + rp2::Event::EventActorStateUpdate(rp2::EventActorStateUpdate { state }) } -/// Helper to create actor intent event -pub fn make_actor_intent(actor_id: &str, generation: u32, intent: rp::ActorIntent) -> rp::Event { - rp::Event::EventActorIntent(rp::EventActorIntent { - actor_id: actor_id.to_string(), - generation, - intent, - }) +/// Helper to create actor intent event (MK2) +pub fn make_actor_intent(intent: rp2::ActorIntent) -> rp2::Event { + rp2::Event::EventActorIntent(rp2::EventActorIntent { intent }) } -/// Helper to create set alarm event -pub fn make_set_alarm(actor_id: &str, generation: u32, alarm_ts: Option) -> rp::Event { - rp::Event::EventActorSetAlarm(rp::EventActorSetAlarm { - actor_id: actor_id.to_string(), - generation, - alarm_ts, - }) +/// Helper to create set alarm event (MK2) +pub fn make_set_alarm(alarm_ts: Option) -> rp2::Event { + rp2::Event::EventActorSetAlarm(rp2::EventActorSetAlarm { alarm_ts }) } diff --git a/engine/packages/engine/tests/common/test_runner/runner.rs b/engine/packages/engine/tests/common/test_runner/runner.rs index cbc3be927a..44d43b7541 100644 --- a/engine/packages/engine/tests/common/test_runner/runner.rs +++ b/engine/packages/engine/tests/common/test_runner/runner.rs @@ -1,7 +1,7 @@ use super::{actor::*, protocol}; use anyhow::*; use futures_util::{SinkExt, StreamExt}; -use rivet_runner_protocol as rp; +use rivet_runner_protocol::mk2 as rp; use rivet_util::Id; use std::{ collections::HashMap, @@ -53,8 +53,8 @@ pub struct TestRunner { // State pub runner_id: Arc>>, actors: Arc>>, - last_command_idx: Arc>, - next_event_idx: Arc>, + /// Per-actor event indices for MK2 checkpoints + actor_event_indices: Arc>>, event_history: Arc>>, shutdown: Arc, is_child_task: bool, @@ -162,8 +162,7 @@ impl TestRunnerBuilder { config, runner_id: Arc::new(Mutex::new(None)), actors: Arc::new(Mutex::new(HashMap::new())), - last_command_idx: Arc::new(Mutex::new(-1)), - next_event_idx: Arc::new(Mutex::new(0)), + actor_event_indices: Arc::new(Mutex::new(HashMap::new())), event_history: Arc::new(Mutex::new(Vec::new())), shutdown: Arc::new(AtomicBool::new(false)), is_child_task: false, @@ -241,8 +240,7 @@ impl TestRunner { config: self.config.clone(), runner_id: self.runner_id.clone(), actors: self.actors.clone(), - last_command_idx: self.last_command_idx.clone(), - next_event_idx: self.next_event_idx.clone(), + actor_event_indices: self.actor_event_indices.clone(), event_history: self.event_history.clone(), is_child_task: true, shutdown: self.shutdown.clone(), @@ -263,6 +261,13 @@ impl TestRunner { loop { let runner_id = self.runner_id.lock().await; if let Some(id) = runner_id.as_ref() { + // In MK2, we need to wait for the workflow to process the Init signal + // and mark the runner as eligible for actor allocation. + // This can take some time due to workflow processing: + // 1. Workflow receives Init signal + // 2. Workflow executes MarkEligible activity + // 3. Database is updated with runner allocation index + tokio::time::sleep(Duration::from_millis(2000)).await; return id.clone(); } drop(runner_id); @@ -330,18 +335,12 @@ impl TestRunner { ) } - async fn build_init_message(&self) -> rp::ToServer { - let last_command_idx = *self.last_command_idx.lock().await; - + fn build_init_message(&self) -> rp::ToServer { + // MK2 init doesn't have lastCommandIdx - uses checkpoints instead rp::ToServer::ToServerInit(rp::ToServerInit { name: self.config.runner_name.clone(), version: self.config.version, total_slots: self.config.total_slots, - last_command_idx: if last_command_idx >= 0 { - Some(last_command_idx) - } else { - None - }, prepopulate_actor_names: None, metadata: None, }) @@ -353,7 +352,7 @@ impl TestRunner { mut shutdown_rx: oneshot::Receiver<()>, ) -> Result<()> { // Send init message - let init_msg = self.build_init_message().await; + let init_msg = self.build_init_message(); let encoded = protocol::encode_to_server(init_msg); ws_stream .send(Message::Binary(encoded.into())) @@ -381,11 +380,11 @@ impl TestRunner { break; } - // Send ping - let ping = rp::ToServer::ToServerPing(rp::ToServerPing { + // Send pong (MK2 uses ToServerPong instead of ToServerPing) + let pong = rp::ToServer::ToServerPong(rp::ToServerPong { ts: chrono::Utc::now().timestamp_millis(), }); - let encoded = protocol::encode_to_server(ping); + let encoded = protocol::encode_to_server(pong); ws_stream.send(Message::Binary(encoded.into())).await?; } @@ -456,17 +455,26 @@ impl TestRunner { ws_stream: &mut WsStream, actor_event: ActorEvent, ) -> Result<()> { - let mut idx = self.next_event_idx.lock().await; - let event_wrapper = protocol::make_event_wrapper(*idx, actor_event.event); + // Get next event index for this actor (MK2 uses per-actor checkpoints) + let mut indices = self.actor_event_indices.lock().await; + let idx = indices.entry(actor_event.actor_id.clone()).or_insert(-1); *idx += 1; - drop(idx); + let event_idx = *idx; + drop(indices); + + let event_wrapper = protocol::make_event_wrapper( + &actor_event.actor_id, + actor_event.generation, + event_idx as u64, + actor_event.event, + ); self.event_history.lock().await.push(event_wrapper.clone()); tracing::debug!( actor_id = ?actor_event.actor_id, generation = actor_event.generation, - event_idx = event_wrapper.index, + event_idx = event_idx, "sending actor event" ); @@ -501,30 +509,16 @@ impl TestRunner { Ok(()) } - async fn handle_init(&self, init: rp::ToClientInit, ws_stream: &mut WsStream) -> Result<()> { + async fn handle_init(&self, init: rp::ToClientInit, _ws_stream: &mut WsStream) -> Result<()> { tracing::info!( runner_id = %init.runner_id, - last_event_idx = ?init.last_event_idx, "received init from server" ); *self.runner_id.lock().await = Some(init.runner_id.clone()); - // Resend unacknowledged events - let events = self.event_history.lock().await; - let to_resend: Vec<_> = events - .iter() - .filter(|e| e.index > init.last_event_idx) - .cloned() - .collect(); - drop(events); - - if !to_resend.is_empty() { - tracing::info!(count = to_resend.len(), "resending unacknowledged events"); - let msg = rp::ToServer::ToServerEvents(to_resend); - let encoded = protocol::encode_to_server(msg); - ws_stream.send(Message::Binary(encoded.into())).await?; - } + // MK2 doesn't have lastEventIdx in init - events are acked via checkpoints + // For simplicity, we don't resend events on reconnect in the test runner Ok(()) } @@ -537,22 +531,35 @@ impl TestRunner { tracing::info!(count = commands.len(), "received commands"); for cmd_wrapper in commands { + let checkpoint = &cmd_wrapper.checkpoint; tracing::debug!( - index = cmd_wrapper.index, + actor_id = %checkpoint.actor_id, + generation = checkpoint.generation, + index = checkpoint.index, command = ?cmd_wrapper.inner, "processing command" ); match cmd_wrapper.inner { rp::Command::CommandStartActor(start_cmd) => { - self.handle_start_actor(start_cmd, ws_stream).await?; + self.handle_start_actor( + checkpoint.actor_id.clone(), + checkpoint.generation, + start_cmd, + ws_stream, + ) + .await?; } - rp::Command::CommandStopActor(stop_cmd) => { - self.handle_stop_actor(stop_cmd, ws_stream).await?; + rp::Command::CommandStopActor => { + // MK2 CommandStopActor is void - actor info is in checkpoint + self.handle_stop_actor( + checkpoint.actor_id.clone(), + checkpoint.generation, + ws_stream, + ) + .await?; } } - - *self.last_command_idx.lock().await = cmd_wrapper.index as i64; } Ok(()) @@ -560,12 +567,11 @@ impl TestRunner { async fn handle_start_actor( &self, + actor_id: String, + generation: u32, cmd: rp::CommandStartActor, - ws_stream: &mut WsStream, + _ws_stream: &mut WsStream, ) -> Result<()> { - let actor_id = cmd.actor_id.clone(); - let generation = cmd.generation; - tracing::info!(?actor_id, generation, name = %cmd.config.name, "starting actor"); // Create actor config @@ -656,11 +662,7 @@ impl TestRunner { // Handle start result and send state update via event match start_result { ActorStartResult::Running => { - let event = protocol::make_actor_state_update( - &actor_id, - generation, - rp::ActorState::ActorStateRunning, - ); + let event = protocol::make_actor_state_update(rp::ActorState::ActorStateRunning); self.event_tx .send(ActorEvent { actor_id: actor_id.clone(), @@ -680,11 +682,8 @@ impl TestRunner { "delaying before sending running state" ); tokio::time::sleep(duration).await; - let event = protocol::make_actor_state_update( - &actor_id_clone, - generation, - rp::ActorState::ActorStateRunning, - ); + let event = + protocol::make_actor_state_update(rp::ActorState::ActorStateRunning); event_tx .send(ActorEvent { actor_id: actor_id_clone, @@ -704,18 +703,16 @@ impl TestRunner { } ActorStartResult::Crash { code, message } => { tracing::warn!(?actor_id, generation, code, %message, "actor crashed on start"); - let event = protocol::make_actor_state_update( - &actor_id, - generation, - rp::ActorState::ActorStateStopped(rp::ActorStateStopped { + let event = protocol::make_actor_state_update(rp::ActorState::ActorStateStopped( + rp::ActorStateStopped { code: if code == 0 { rp::StopCode::Ok } else { rp::StopCode::Error }, message: Some(message), - }), - ); + }, + )); let _ = self .event_tx .send(ActorEvent { @@ -733,12 +730,10 @@ impl TestRunner { async fn handle_stop_actor( &self, - cmd: rp::CommandStopActor, + actor_id: String, + generation: u32, ws_stream: &mut WsStream, ) -> Result<()> { - let actor_id = cmd.actor_id.clone(); - let generation = cmd.generation; - tracing::info!(?actor_id, generation, "stopping actor"); // Get actor @@ -819,15 +814,29 @@ impl TestRunner { } async fn handle_ack_events(&self, ack: rp::ToClientAckEvents) { - let last_acked_idx = ack.last_event_idx; + // MK2 uses per-actor checkpoints for acknowledgments + let checkpoints = &ack.last_event_checkpoints; let mut events = self.event_history.lock().await; let original_len = events.len(); - events.retain(|e| e.index > last_acked_idx); + + // Remove events that have been acknowledged based on checkpoints + events.retain(|e| { + // Check if this event's checkpoint is covered by any ack checkpoint + !checkpoints.iter().any(|ck| { + ck.actor_id == e.checkpoint.actor_id + && ck.generation == e.checkpoint.generation + && ck.index >= e.checkpoint.index + }) + }); let pruned = original_len - events.len(); if pruned > 0 { - tracing::debug!(last_acked_idx, pruned, "pruned acknowledged events"); + tracing::debug!( + checkpoint_count = checkpoints.len(), + pruned, + "pruned acknowledged events" + ); } } @@ -838,7 +847,7 @@ impl TestRunner { state: rp::ActorState, ws_stream: &mut WsStream, ) -> Result<()> { - let event = protocol::make_actor_state_update(actor_id, generation, state); + let event = protocol::make_actor_state_update(state); self.send_actor_event( ws_stream, diff --git a/engine/packages/engine/tests/runner_drain_on_version.rs b/engine/packages/engine/tests/runner_drain_on_version.rs new file mode 100644 index 0000000000..4fb33a9679 --- /dev/null +++ b/engine/packages/engine/tests/runner_drain_on_version.rs @@ -0,0 +1,464 @@ +mod common; + +use anyhow::{Result, bail}; +use gas::prelude::Id; +use std::collections::HashMap; +use std::time::Duration; + +/// Helper to wait for a specific runner version to be drained (drain_ts set). +/// Polls the database until the condition is met or timeout occurs. +async fn wait_for_runner_drained( + ctx: &common::TestCtx, + namespace_id: Id, + runner_name: &str, + expected_version: u32, + timeout_secs: u64, +) -> Result<()> { + let start = std::time::Instant::now(); + loop { + let runners_res = ctx + .leader_dc() + .workflow_ctx + .op(pegboard::ops::runner::list_for_ns::Input { + namespace_id, + name: Some(runner_name.to_string()), + include_stopped: true, + created_before: None, + limit: 100, + }) + .await?; + + let is_drained = runners_res + .runners + .iter() + .any(|r| r.version == expected_version && r.drain_ts.is_some()); + + if is_drained { + return Ok(()); + } + + if start.elapsed() > std::time::Duration::from_secs(timeout_secs) { + let versions: Vec<_> = runners_res + .runners + .iter() + .map(|r| (r.version, r.drain_ts.is_some())) + .collect(); + bail!( + "timeout waiting for runner v{} to be drained. Current runners (version, is_drained): {:?}", + expected_version, + versions + ); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + +/// Helper to wait for multiple runner versions to be drained. +async fn wait_for_runners_drained( + ctx: &common::TestCtx, + namespace_id: Id, + runner_name: &str, + expected_versions: &[u32], + timeout_secs: u64, +) -> Result<()> { + let start = std::time::Instant::now(); + loop { + let runners_res = ctx + .leader_dc() + .workflow_ctx + .op(pegboard::ops::runner::list_for_ns::Input { + namespace_id, + name: Some(runner_name.to_string()), + include_stopped: true, + created_before: None, + limit: 100, + }) + .await?; + + let all_drained = expected_versions.iter().all(|&version| { + runners_res + .runners + .iter() + .any(|r| r.version == version && r.drain_ts.is_some()) + }); + + if all_drained { + return Ok(()); + } + + if start.elapsed() > std::time::Duration::from_secs(timeout_secs) { + let versions: Vec<_> = runners_res + .runners + .iter() + .map(|r| (r.version, r.drain_ts.is_some())) + .collect(); + bail!( + "timeout waiting for runners {:?} to be drained. Current runners (version, is_drained): {:?}", + expected_versions, + versions + ); + } + + tokio::time::sleep(Duration::from_millis(100)).await; + } +} + +// MARK: Normal runner drain tests + +#[test] +fn drain_on_version_upgrade_normal_runner() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, namespace_id) = common::setup_test_namespace(ctx.leader_dc()).await; + + let runner_name = "drain-test-normal"; + + // Create runner config with drain_on_version_upgrade enabled + let mut datacenters = HashMap::new(); + datacenters.insert( + "dc-1".to_string(), + rivet_api_types::namespaces::runner_configs::RunnerConfig { + kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, + metadata: None, + drain_on_version_upgrade: true, + }, + ); + + common::api::public::runner_configs_upsert( + ctx.leader_dc().guard_port(), + rivet_api_peer::runner_configs::UpsertPath { + runner_name: runner_name.to_string(), + }, + rivet_api_peer::runner_configs::UpsertQuery { + namespace: namespace.clone(), + }, + rivet_api_public::runner_configs::upsert::UpsertRequest { datacenters }, + ) + .await + .expect("failed to upsert runner config"); + + // Start runner v1 + let runner_v1 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_name(runner_name) + .with_runner_key("key-v1") + .with_version(1) + .with_total_slots(10) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.leader_dc()) + .await + .expect("failed to build runner v1"); + + runner_v1.start().await.expect("failed to start runner v1"); + let runner_v1_id = runner_v1.wait_ready().await; + + tracing::info!(%runner_v1_id, "runner v1 ready"); + + // Wait for runner to be registered + tokio::time::sleep(Duration::from_millis(500)).await; + + // Start runner v2 - should trigger drain of v1 + let runner_v2 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_name(runner_name) + .with_runner_key("key-v2") + .with_version(2) + .with_total_slots(10) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.leader_dc()) + .await + .expect("failed to build runner v2"); + + runner_v2.start().await.expect("failed to start runner v2"); + let runner_v2_id = runner_v2.wait_ready().await; + + tracing::info!(%runner_v2_id, "runner v2 ready"); + + // Wait for v1 to be drained + wait_for_runner_drained(&ctx, namespace_id, runner_name, 1, 10) + .await + .expect("v1 runner should be drained"); + + // Cleanup + runner_v2.shutdown().await; + }); +} + +#[test] +fn drain_on_version_upgrade_disabled_normal_runner() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, namespace_id) = common::setup_test_namespace(ctx.leader_dc()).await; + + let runner_name = "no-drain-test-normal"; + + // Create runner config with drain_on_version_upgrade disabled + let mut datacenters = HashMap::new(); + datacenters.insert( + "dc-1".to_string(), + rivet_api_types::namespaces::runner_configs::RunnerConfig { + kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, + metadata: None, + drain_on_version_upgrade: false, + }, + ); + + common::api::public::runner_configs_upsert( + ctx.leader_dc().guard_port(), + rivet_api_peer::runner_configs::UpsertPath { + runner_name: runner_name.to_string(), + }, + rivet_api_peer::runner_configs::UpsertQuery { + namespace: namespace.clone(), + }, + rivet_api_public::runner_configs::upsert::UpsertRequest { datacenters }, + ) + .await + .expect("failed to upsert runner config"); + + // Start runner v1 + let runner_v1 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_name(runner_name) + .with_runner_key("key-v1") + .with_version(1) + .with_total_slots(10) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.leader_dc()) + .await + .expect("failed to build runner v1"); + + runner_v1.start().await.expect("failed to start runner v1"); + runner_v1.wait_ready().await; + + // Wait for runner to be registered + tokio::time::sleep(Duration::from_millis(500)).await; + + // Start runner v2 + let runner_v2 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_name(runner_name) + .with_runner_key("key-v2") + .with_version(2) + .with_total_slots(10) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.leader_dc()) + .await + .expect("failed to build runner v2"); + + runner_v2.start().await.expect("failed to start runner v2"); + runner_v2.wait_ready().await; + + tokio::time::sleep(Duration::from_secs(1)).await; + + // Both runners should still be active + let runners_res = ctx + .leader_dc() + .workflow_ctx + .op(pegboard::ops::runner::list_for_ns::Input { + namespace_id, + name: Some(runner_name.to_string()), + include_stopped: false, + created_before: None, + limit: 100, + }) + .await + .expect("failed to list runners"); + + let active_runners: Vec<_> = runners_res + .runners + .iter() + .filter(|r| r.stop_ts.is_none()) + .collect(); + + assert_eq!( + active_runners.len(), + 2, + "Both runners should be active when drain is disabled" + ); + + // Cleanup + runner_v1.shutdown().await; + runner_v2.shutdown().await; + }); +} + +// MARK: Serverless runner drain tests + +#[test] +fn drain_on_version_upgrade_serverless_runner() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, namespace_id) = common::setup_test_namespace(ctx.leader_dc()).await; + + let runner_name = "drain-test-serverless"; + + // Create serverless runner config with drain_on_version_upgrade enabled + let mut datacenters = HashMap::new(); + datacenters.insert( + "dc-1".to_string(), + rivet_api_types::namespaces::runner_configs::RunnerConfig { + kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless { + url: "http://example.com".to_string(), + headers: None, + request_lifespan: 30, + slots_per_runner: 10, + min_runners: Some(1), + max_runners: 5, + runners_margin: Some(2), + }, + metadata: None, + drain_on_version_upgrade: true, + }, + ); + + common::api::public::runner_configs_upsert( + ctx.leader_dc().guard_port(), + rivet_api_peer::runner_configs::UpsertPath { + runner_name: runner_name.to_string(), + }, + rivet_api_peer::runner_configs::UpsertQuery { + namespace: namespace.clone(), + }, + rivet_api_public::runner_configs::upsert::UpsertRequest { datacenters }, + ) + .await + .expect("failed to upsert serverless runner config"); + + // Start runner v1 + let runner_v1 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_name(runner_name) + .with_runner_key("key-v1") + .with_version(1) + .with_total_slots(10) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.leader_dc()) + .await + .expect("failed to build runner v1"); + + runner_v1.start().await.expect("failed to start runner v1"); + runner_v1.wait_ready().await; + + tokio::time::sleep(Duration::from_millis(500)).await; + + // Start runner v2 - should trigger drain of v1 + let runner_v2 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_name(runner_name) + .with_runner_key("key-v2") + .with_version(2) + .with_total_slots(10) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.leader_dc()) + .await + .expect("failed to build runner v2"); + + runner_v2.start().await.expect("failed to start runner v2"); + runner_v2.wait_ready().await; + + // Wait for v1 to be drained + wait_for_runner_drained(&ctx, namespace_id, runner_name, 1, 10) + .await + .expect("v1 runner should be drained"); + + // Cleanup + runner_v2.shutdown().await; + }); +} + +#[test] +fn drain_on_version_upgrade_multiple_older_versions() { + common::run(common::TestOpts::new(1), |ctx| async move { + let (namespace, namespace_id) = common::setup_test_namespace(ctx.leader_dc()).await; + + let runner_name = "drain-test-multiple"; + + // Create runner config with drain_on_version_upgrade enabled + let mut datacenters = HashMap::new(); + datacenters.insert( + "dc-1".to_string(), + rivet_api_types::namespaces::runner_configs::RunnerConfig { + kind: rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Normal {}, + metadata: None, + drain_on_version_upgrade: true, + }, + ); + + common::api::public::runner_configs_upsert( + ctx.leader_dc().guard_port(), + rivet_api_peer::runner_configs::UpsertPath { + runner_name: runner_name.to_string(), + }, + rivet_api_peer::runner_configs::UpsertQuery { + namespace: namespace.clone(), + }, + rivet_api_public::runner_configs::upsert::UpsertRequest { datacenters }, + ) + .await + .expect("failed to upsert runner config"); + + // Start runners v1 and v2 + let runner_v1 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_name(runner_name) + .with_runner_key("key-v1") + .with_version(1) + .with_total_slots(10) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.leader_dc()) + .await + .expect("failed to build runner v1"); + + runner_v1.start().await.expect("failed to start runner v1"); + runner_v1.wait_ready().await; + + tokio::time::sleep(Duration::from_millis(300)).await; + + let runner_v2 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_name(runner_name) + .with_runner_key("key-v2") + .with_version(2) + .with_total_slots(10) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.leader_dc()) + .await + .expect("failed to build runner v2"); + + runner_v2.start().await.expect("failed to start runner v2"); + runner_v2.wait_ready().await; + + tokio::time::sleep(Duration::from_millis(500)).await; + + // Start runner v3 - should drain both v1 and v2 + let runner_v3 = common::test_runner::TestRunnerBuilder::new(&namespace) + .with_runner_name(runner_name) + .with_runner_key("key-v3") + .with_version(3) + .with_total_slots(10) + .with_actor_behavior("test-actor", |_| { + Box::new(common::test_runner::EchoActor::new()) + }) + .build(ctx.leader_dc()) + .await + .expect("failed to build runner v3"); + + runner_v3.start().await.expect("failed to start runner v3"); + runner_v3.wait_ready().await; + + // Wait for v1 and v2 to be drained + wait_for_runners_drained(&ctx, namespace_id, runner_name, &[1, 2], 10) + .await + .expect("v1 and v2 runners should be drained"); + + // Cleanup + runner_v3.shutdown().await; + }); +} diff --git a/engine/packages/pegboard/src/metrics.rs b/engine/packages/pegboard/src/metrics.rs index cc7754331c..502944d755 100644 --- a/engine/packages/pegboard/src/metrics.rs +++ b/engine/packages/pegboard/src/metrics.rs @@ -15,4 +15,11 @@ lazy_static::lazy_static! { MICRO_BUCKETS.to_vec(), *REGISTRY ).unwrap(); + + pub static ref RUNNER_VERSION_UPGRADE_DRAIN: IntCounterVec = register_int_counter_vec_with_registry!( + "pegboard_runner_version_upgrade_drain", + "Count of runners drained due to version upgrade.", + &["namespace_id", "runner_name"], + *REGISTRY + ).unwrap(); } diff --git a/engine/packages/pegboard/src/ops/runner/drain.rs b/engine/packages/pegboard/src/ops/runner/drain.rs new file mode 100644 index 0000000000..0e79b74e0d --- /dev/null +++ b/engine/packages/pegboard/src/ops/runner/drain.rs @@ -0,0 +1,115 @@ +use anyhow::Result; +use futures_util::TryStreamExt; +use gas::prelude::*; +use rivet_metrics::KeyValue; +use universaldb::options::StreamingMode; +use universaldb::utils::IsolationLevel::*; + +use crate::{keys, metrics}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Input { + pub namespace_id: Id, + pub name: String, + pub version: u32, + /// Whether to publish drain signals via pubsub. Set to false if the caller + /// will handle sending signals (e.g. from a workflow). + pub send_runner_stop_signals: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Output { + pub older_runner_workflow_ids: Vec, +} + +#[operation] +pub async fn pegboard_runner_drain_older_versions( + ctx: &OperationCtx, + input: &Input, +) -> Result { + let configs = ctx + .op(crate::ops::runner_config::get::Input { + runners: vec![(input.namespace_id, input.name.clone())], + bypass_cache: false, + }) + .await?; + + // Use config's drain_on_version_upgrade if config exists, otherwise default to false + let drain_enabled = configs + .into_iter() + .next() + .map(|c| c.config.drain_on_version_upgrade) + .unwrap_or(false); + + if !drain_enabled { + return Ok(Output { + older_runner_workflow_ids: vec![], + }); + } + + // Scan RunnerAllocIdxKey for older versions + let older_runners = ctx + .udb()? + .run(|tx| async move { + let tx = tx.with_subspace(keys::subspace()); + let mut older_runners = Vec::new(); + + let runner_alloc_subspace = keys::subspace().subspace( + &keys::ns::RunnerAllocIdxKey::subspace(input.namespace_id, input.name.clone()), + ); + + let mut stream = tx.get_ranges_keyvalues( + universaldb::RangeOption { + mode: StreamingMode::WantAll, + ..(&runner_alloc_subspace).into() + }, + Snapshot, + ); + + while let Some(entry) = stream.try_next().await? { + let (key, data) = tx.read_entry::(&entry)?; + + // Only collect runners with older versions + if key.version < input.version { + older_runners.push(data.workflow_id); + } + } + + Ok(older_runners) + }) + .custom_instrument(tracing::info_span!("drain_older_versions_tx")) + .await?; + + if !older_runners.is_empty() { + tracing::info!( + namespace_id = %input.namespace_id, + runner_name = %input.name, + new_version = input.version, + older_runner_count = older_runners.len(), + "draining older runner versions due to drain_on_version_upgrade" + ); + + metrics::RUNNER_VERSION_UPGRADE_DRAIN.add( + older_runners.len() as u64, + &[ + KeyValue::new("namespace_id", input.namespace_id.to_string()), + KeyValue::new("runner_name", input.name.clone()), + ], + ); + + if input.send_runner_stop_signals { + for workflow_id in &older_runners { + ctx.signal(crate::workflows::runner2::Stop { + reset_actor_rescheduling: false, + }) + .to_workflow_id(*workflow_id) + .send() + .await?; + } + } + } + + Ok(Output { + older_runner_workflow_ids: older_runners, + }) +} diff --git a/engine/packages/pegboard/src/ops/runner/mod.rs b/engine/packages/pegboard/src/ops/runner/mod.rs index 601f9bd3ee..2c7c189174 100644 --- a/engine/packages/pegboard/src/ops/runner/mod.rs +++ b/engine/packages/pegboard/src/ops/runner/mod.rs @@ -1,3 +1,4 @@ +pub mod drain; pub mod find_dc_with_runner; pub mod get; pub mod get_by_key; diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs index 5ffb3e8665..7420d2103a 100644 --- a/engine/packages/pegboard/src/workflows/runner.rs +++ b/engine/packages/pegboard/src/workflows/runner.rs @@ -131,6 +131,9 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> create_ts: ctx.create_ts(), }) .await?; + + // NOTE: This intentionally does not implement drain_on_version_upgrade + // like runner2.rs since this workflow is legacy. } // Check for pending actors (which happen when there is not enough runner capacity) diff --git a/engine/packages/pegboard/src/workflows/runner2.rs b/engine/packages/pegboard/src/workflows/runner2.rs index 04cab6eb9a..b938eeeff8 100644 --- a/engine/packages/pegboard/src/workflows/runner2.rs +++ b/engine/packages/pegboard/src/workflows/runner2.rs @@ -64,6 +64,23 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<() }) .await?; + // Drain older runner versions if configured + let drain_result = ctx + .activity(DrainOlderVersionsInput { + namespace_id: input.namespace_id, + name: input.name.clone(), + version: input.version, + }) + .await?; + for workflow_id in drain_result.older_runner_workflow_ids { + ctx.signal(Stop { + reset_actor_rescheduling: false, + }) + .to_workflow_id(workflow_id) + .send() + .await?; + } + // Check for pending actors (which happen when there is not enough runner capacity) let res = ctx .activity(AllocatePendingActorsInput { @@ -827,6 +844,34 @@ async fn send_messages_to_runner( Ok(()) } +#[derive(Debug, Serialize, Deserialize, Hash)] +struct DrainOlderVersionsInput { + namespace_id: Id, + name: String, + version: u32, +} + +#[activity(DrainOlderVersions)] +async fn drain_older_versions( + ctx: &ActivityCtx, + input: &DrainOlderVersionsInput, +) -> Result { + tracing::info!( + namespace_id = %input.namespace_id, + name = %input.name, + version = input.version, + "drain_older_versions activity called" + ); + ctx.op(crate::ops::runner::drain::Input { + namespace_id: input.namespace_id, + name: input.name.clone(), + version: input.version, + // Signals are sent by the workflow directly + send_runner_stop_signals: false, + }) + .await +} + #[signal("pegboard_runner_init")] pub struct Init {} diff --git a/engine/packages/types/src/runner_configs.rs b/engine/packages/types/src/runner_configs.rs index b7ba3fa4d3..01add46f14 100644 --- a/engine/packages/types/src/runner_configs.rs +++ b/engine/packages/types/src/runner_configs.rs @@ -9,6 +9,8 @@ pub struct RunnerConfig { pub kind: RunnerConfigKind, #[serde(default, skip_serializing_if = "Option::is_none")] pub metadata: Option, + #[serde(default = "default_drain_on_version_upgrade")] + pub drain_on_version_upgrade: bool, } #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] @@ -27,14 +29,23 @@ pub enum RunnerConfigKind { }, } -impl From for rivet_data::generated::namespace_runner_config_v2::RunnerConfig { +fn default_drain_on_version_upgrade() -> bool { + false +} + +impl From for rivet_data::generated::namespace_runner_config_v3::RunnerConfig { fn from(value: RunnerConfig) -> Self { - let RunnerConfig { kind, metadata } = value; - rivet_data::generated::namespace_runner_config_v2::RunnerConfig { + let RunnerConfig { + kind, + metadata, + drain_on_version_upgrade, + } = value; + rivet_data::generated::namespace_runner_config_v3::RunnerConfig { metadata: metadata.and_then(|value| serde_json::to_string(&value).ok()), + drain_on_version_upgrade, kind: match kind { RunnerConfigKind::Normal {} => { - rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal + rivet_data::generated::namespace_runner_config_v3::RunnerConfigKind::Normal } RunnerConfigKind::Serverless { url, @@ -45,8 +56,8 @@ impl From for rivet_data::generated::namespace_runner_config_v2::R max_runners, runners_margin, } => { - rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Serverless( - rivet_data::generated::namespace_runner_config_v2::Serverless { + rivet_data::generated::namespace_runner_config_v3::RunnerConfigKind::Serverless( + rivet_data::generated::namespace_runner_config_v3::Serverless { url, headers: headers.into(), request_lifespan, @@ -62,17 +73,21 @@ impl From for rivet_data::generated::namespace_runner_config_v2::R } } -impl From for RunnerConfig { - fn from(value: rivet_data::generated::namespace_runner_config_v2::RunnerConfig) -> Self { - let rivet_data::generated::namespace_runner_config_v2::RunnerConfig { metadata, kind } = - value; +impl From for RunnerConfig { + fn from(value: rivet_data::generated::namespace_runner_config_v3::RunnerConfig) -> Self { + let rivet_data::generated::namespace_runner_config_v3::RunnerConfig { + metadata, + kind, + drain_on_version_upgrade, + } = value; RunnerConfig { metadata: metadata.and_then(|raw| serde_json::from_str(&raw).ok()), + drain_on_version_upgrade, kind: match kind { - rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal => { + rivet_data::generated::namespace_runner_config_v3::RunnerConfigKind::Normal => { RunnerConfigKind::Normal {} } - rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Serverless( + rivet_data::generated::namespace_runner_config_v3::RunnerConfigKind::Serverless( o, ) => RunnerConfigKind::Serverless { url: o.url, diff --git a/engine/sdks/rust/data/src/lib.rs b/engine/sdks/rust/data/src/lib.rs index 5f2690d4ae..1e652d7279 100644 --- a/engine/sdks/rust/data/src/lib.rs +++ b/engine/sdks/rust/data/src/lib.rs @@ -6,6 +6,6 @@ pub const PEGBOARD_RUNNER_ADDRESS_VERSION: u16 = 1; pub const PEGBOARD_RUNNER_METADATA_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_ACTOR_BY_KEY_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION: u16 = 2; -pub const PEGBOARD_NAMESPACE_RUNNER_CONFIG_VERSION: u16 = 2; +pub const PEGBOARD_NAMESPACE_RUNNER_CONFIG_VERSION: u16 = 3; pub const PEGBOARD_NAMESPACE_RUNNER_BY_KEY_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_ACTOR_NAME_VERSION: u16 = 1; diff --git a/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs b/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs index 67d62d5050..8f108402f6 100644 --- a/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs +++ b/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs @@ -6,18 +6,19 @@ use crate::generated::*; pub enum NamespaceRunnerConfig { V1(namespace_runner_config_v1::Data), V2(namespace_runner_config_v2::RunnerConfig), + V3(namespace_runner_config_v3::RunnerConfig), } impl OwnedVersionedData for NamespaceRunnerConfig { - type Latest = namespace_runner_config_v2::RunnerConfig; + type Latest = namespace_runner_config_v3::RunnerConfig; - fn wrap_latest(latest: namespace_runner_config_v2::RunnerConfig) -> Self { - NamespaceRunnerConfig::V2(latest) + fn wrap_latest(latest: namespace_runner_config_v3::RunnerConfig) -> Self { + NamespaceRunnerConfig::V3(latest) } fn unwrap_latest(self) -> Result { #[allow(irrefutable_let_patterns)] - if let NamespaceRunnerConfig::V2(data) = self { + if let NamespaceRunnerConfig::V3(data) = self { Ok(data) } else { bail!("version not latest"); @@ -28,6 +29,7 @@ impl OwnedVersionedData for NamespaceRunnerConfig { match version { 1 => Ok(NamespaceRunnerConfig::V1(serde_bare::from_slice(payload)?)), 2 => Ok(NamespaceRunnerConfig::V2(serde_bare::from_slice(payload)?)), + 3 => Ok(NamespaceRunnerConfig::V3(serde_bare::from_slice(payload)?)), _ => bail!("invalid version: {version}"), } } @@ -36,15 +38,16 @@ impl OwnedVersionedData for NamespaceRunnerConfig { match self { NamespaceRunnerConfig::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), NamespaceRunnerConfig::V2(data) => serde_bare::to_vec(&data).map_err(Into::into), + NamespaceRunnerConfig::V3(data) => serde_bare::to_vec(&data).map_err(Into::into), } } fn deserialize_converters() -> Vec Result> { - vec![Self::v1_to_v2] + vec![Self::v1_to_v2, Self::v2_to_v3] } fn serialize_converters() -> Vec Result> { - vec![Self::v2_to_v1] + vec![Self::v3_to_v2, Self::v2_to_v1] } } @@ -122,4 +125,71 @@ impl NamespaceRunnerConfig { bail!("unexpected version"); } } + + fn v2_to_v3(self) -> Result { + if let NamespaceRunnerConfig::V2(config) = self { + let namespace_runner_config_v2::RunnerConfig { kind, metadata } = config; + + let kind = match kind { + namespace_runner_config_v2::RunnerConfigKind::Serverless(serverless) => { + namespace_runner_config_v3::RunnerConfigKind::Serverless( + namespace_runner_config_v3::Serverless { + url: serverless.url, + headers: serverless.headers, + request_lifespan: serverless.request_lifespan, + slots_per_runner: serverless.slots_per_runner, + min_runners: serverless.min_runners, + max_runners: serverless.max_runners, + runners_margin: serverless.runners_margin, + }, + ) + } + namespace_runner_config_v2::RunnerConfigKind::Normal => { + namespace_runner_config_v3::RunnerConfigKind::Normal + } + }; + + Ok(NamespaceRunnerConfig::V3( + namespace_runner_config_v3::RunnerConfig { + kind, + metadata, + // Default to false for v2 -> v3 migration + drain_on_version_upgrade: false, + }, + )) + } else { + bail!("unexpected version"); + } + } + + fn v3_to_v2(self) -> Result { + if let NamespaceRunnerConfig::V3(config) = self { + let namespace_runner_config_v3::RunnerConfig { kind, metadata, .. } = config; + + let kind = match kind { + namespace_runner_config_v3::RunnerConfigKind::Serverless(serverless) => { + namespace_runner_config_v2::RunnerConfigKind::Serverless( + namespace_runner_config_v2::Serverless { + url: serverless.url, + headers: serverless.headers, + request_lifespan: serverless.request_lifespan, + slots_per_runner: serverless.slots_per_runner, + min_runners: serverless.min_runners, + max_runners: serverless.max_runners, + runners_margin: serverless.runners_margin, + }, + ) + } + namespace_runner_config_v3::RunnerConfigKind::Normal => { + namespace_runner_config_v2::RunnerConfigKind::Normal + } + }; + + Ok(NamespaceRunnerConfig::V2( + namespace_runner_config_v2::RunnerConfig { kind, metadata }, + )) + } else { + bail!("unexpected version"); + } + } } diff --git a/engine/sdks/rust/runner-protocol/src/util.rs b/engine/sdks/rust/runner-protocol/src/util.rs index 9a4034963e..3c996517aa 100644 --- a/engine/sdks/rust/runner-protocol/src/util.rs +++ b/engine/sdks/rust/runner-protocol/src/util.rs @@ -1,14 +1,14 @@ /// Generate a new 4-byte gateway ID from a random u32 -pub fn generate_gateway_id() -> crate::GatewayId { +pub fn generate_gateway_id() -> crate::mk2::GatewayId { rand::random::().to_le_bytes() } /// Generate a new 4-byte request ID from a random u32 -pub fn generate_request_id() -> crate::RequestId { +pub fn generate_request_id() -> crate::mk2::RequestId { rand::random::().to_le_bytes() } /// Convert a GatewayId to a hex string -pub fn id_to_string(gateway_id: &crate::GatewayId) -> String { +pub fn id_to_string(gateway_id: &crate::mk2::GatewayId) -> String { hex::encode(gateway_id) } diff --git a/engine/sdks/schemas/data/namespace.runner_config.v3.bare b/engine/sdks/schemas/data/namespace.runner_config.v3.bare new file mode 100644 index 0000000000..61cbb1447f --- /dev/null +++ b/engine/sdks/schemas/data/namespace.runner_config.v3.bare @@ -0,0 +1,24 @@ +type Json str + +type Serverless struct { + url: str + headers: map + request_lifespan: u32 + slots_per_runner: u32 + min_runners: u32 + max_runners: u32 + runners_margin: u32 +} + +type Normal void + +type RunnerConfigKind union { + Serverless | + Normal +} + +type RunnerConfig struct { + kind: RunnerConfigKind + metadata: optional + drain_on_version_upgrade: bool +} diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index 80a28eff0d..e6c1cbd65a 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -79,7 +79,6 @@ export class EngineActorDriver implements ActorDriver { #runner: Runner; #actors: Map = new Map(); #actorRouter: ActorRouter; - #version: number = 1; // Version for the runner protocol #alarmTimeout?: LongTimeoutHandle; #runnerStarted: PromiseWithResolvers = promiseWithResolvers(); @@ -139,7 +138,7 @@ export class EngineActorDriver implements ActorDriver { // Create runner configuration const engineRunnerConfig: EngineRunnerConfig = { - version: this.#version, + version: config.runner.version, endpoint: getEndpoint(config), token, namespace: config.namespace, diff --git a/rivetkit-typescript/packages/rivetkit/src/registry/config/runner.ts b/rivetkit-typescript/packages/rivetkit/src/registry/config/runner.ts index 83a9cb035a..ee3ccd209e 100644 --- a/rivetkit-typescript/packages/rivetkit/src/registry/config/runner.ts +++ b/rivetkit-typescript/packages/rivetkit/src/registry/config/runner.ts @@ -4,6 +4,7 @@ import { getRivetTotalSlots, getRivetRunner, getRivetRunnerKey, + getRivetRunnerVersion, } from "@/utils/env-vars"; export const RunnerConfigSchema = z.object({ @@ -14,6 +15,7 @@ export const RunnerConfigSchema = z.object({ .string() .optional() .transform((x) => x ?? getRivetRunnerKey()), + version: z.number().default(() => getRivetRunnerVersion() ?? 1), }); export type RunnerConfigInput = z.input; export type RunnerConfig = z.infer; diff --git a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts index 1edb923766..775355204d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts +++ b/rivetkit-typescript/packages/rivetkit/src/utils/env-vars.ts @@ -25,6 +25,10 @@ export const getRivetRunEngineVersion = (): string | undefined => getEnvUniversal("RIVET_RUN_ENGINE_VERSION"); export const getRivetRunnerKind = (): string | undefined => getEnvUniversal("RIVET_RUNNER_KIND"); +export const getRivetRunnerVersion = (): number | undefined => { + const value = getEnvUniversal("RIVET_RUNNER_VERSION"); + return value !== undefined ? parseInt(value, 10) : undefined; +}; // RivetKit configuration export const getRivetkitInspectorToken = (): string | undefined =>