diff --git a/crates/api/src/lib.rs b/crates/api/src/lib.rs index 61d38768..57f8a7a9 100644 --- a/crates/api/src/lib.rs +++ b/crates/api/src/lib.rs @@ -374,7 +374,6 @@ pub struct SigningInProgressReport { pub rrsig_count: Option, pub rrsig_reused_count: Option, pub rrsig_time: Option, - pub insertion_time: Option, pub total_time: Option, pub threads_used: Option, } @@ -392,7 +391,6 @@ pub struct SigningFinishedReport { pub rrsig_count: usize, pub rrsig_reused_count: usize, pub rrsig_time: Duration, - pub insertion_time: Duration, pub total_time: Duration, pub threads_used: usize, pub finished_at: SystemTime, diff --git a/crates/cli/src/commands/zone.rs b/crates/cli/src/commands/zone.rs index d496531f..178e4abe 100644 --- a/crates/cli/src/commands/zone.rs +++ b/crates/cli/src/commands/zone.rs @@ -916,15 +916,6 @@ impl Progress { rrsig_count / (rrsig_time.as_secs() as usize) ); } - if let (Some(rrsig_count), Some(insertion_time)) = - (r.rrsig_count, r.insertion_time) - { - println!( - " Inserted signatures in {} ({} sig/s)", - format_duration(insertion_time), - rrsig_count / (insertion_time.as_secs() as usize) - ); - } if let Some(threads_used) = r.threads_used { println!(" Using {threads_used} threads to generate signatures"); } @@ -961,13 +952,6 @@ impl Progress { .checked_div(r.rrsig_time.as_secs() as usize) .unwrap_or(r.rrsig_count), ); - println!( - " Inserted signatures in {} ({} sig/s)", - format_duration(r.insertion_time), - r.rrsig_count - .checked_div(r.insertion_time.as_secs() as usize) - .unwrap_or(r.rrsig_count), - ); println!( " Took {} in total, using {} threads", format_duration(r.total_time), diff --git a/src/signer/mod.rs b/src/signer/mod.rs index 31be3b79..abc157e7 100644 --- a/src/signer/mod.rs +++ b/src/signer/mod.rs @@ -5,12 +5,11 @@ use std::sync::Arc; use cascade_zonedata::SignedZoneBuilder; -use tracing::{debug, error}; +use tracing::error; use crate::{ center::{Center, halt_zone}, - manager::record_zone_event, - zone::{HistoricalEvent, SigningTrigger, Zone}, + zone::{HistoricalEvent, SigningTrigger, Zone, ZoneHandle}, }; pub mod zone; @@ -30,38 +29,59 @@ pub mod zone; async fn sign( center: Arc
, zone: Arc, - builder: SignedZoneBuilder, + mut builder: SignedZoneBuilder, trigger: SigningTrigger, ) { - match center - .signer - .join_sign_zone_queue(¢er, &zone.name, !builder.have_next_loaded(), trigger) - .await - { - Ok(()) => {} - Err(error) if error.is_benign() => { - // Ignore this benign case. It was probably caused by dnst keyset - // cron triggering resigning before we even signed the first time, - // either because the zone was large and slow to load and sign, or - // because the unsigned zone was pending review. - debug!("Ignoring probably benign failure: {error}"); + let (status, _permits) = center.signer.wait_to_sign(&zone).await; + + let (result, builder) = tokio::task::spawn_blocking({ + let center = center.clone(); + let zone = zone.clone(); + let status = status.clone(); + move || { + let result = center + .signer + .sign_zone(¢er, &zone, &mut builder, trigger, status); + (result, builder) } - Err(error) => { - error!("Signing failed: {error}"); + }) + .await + .unwrap(); - // TODO: Inline these methods and use a single 'ZoneState' lock. + let mut status = status.write().unwrap(); + let mut state = zone.state.lock().unwrap(); + let mut handle = ZoneHandle { + zone: &zone, + state: &mut state, + center: ¢er, + }; + handle.state.signer.ongoing.finish(); - halt_zone(¢er, &zone.name, true, &error.to_string()); + match result { + Ok(()) => { + let built = builder.finish().unwrap_or_else(|_| unreachable!()); + handle.storage().finish_sign(built); + status.status.finish(true); + status.current_action = "Finished".to_string(); + } + Err(error) => { + error!("Signing failed: {error}"); + handle.storage().give_up_sign(builder); + status.status.finish(false); + status.current_action = "Aborted".to_string(); - record_zone_event( - ¢er, - &zone.name, + handle.state.record_event( HistoricalEvent::SigningFailed { trigger, reason: error.to_string(), }, None, // TODO ); + + std::mem::drop(state); + + // TODO: Inline. + halt_zone(¢er, &zone.name, true, &error.to_string()); } } } diff --git a/src/signer/zone.rs b/src/signer/zone.rs index 8e1bd922..462906f8 100644 --- a/src/signer/zone.rs +++ b/src/signer/zone.rs @@ -3,10 +3,11 @@ use std::{sync::Arc, time::SystemTime}; use cascade_zonedata::SignedZoneBuilder; +use tracing::info; use crate::{ center::Center, - util::AbortOnDrop, + util::BackgroundTasks, zone::{SigningTrigger, Zone, ZoneHandle, ZoneState}, }; @@ -35,28 +36,36 @@ impl SignerZoneHandle<'_> { } /// Enqueue a signing operation for a newly loaded instance of the zone. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name) + )] pub fn enqueue_sign(&mut self, builder: SignedZoneBuilder) { + info!("Enqueuing a sign operation"); + // A zone can have at most one 'SignedZoneBuilder' at a time. Because // we have 'builder', we are guaranteed that no other signing operations // are ongoing right now. A re-signing operation may be enqueued, but it // has lower priority than this (for now). assert!(self.state.signer.enqueued_sign.is_none()); - assert!(self.state.signer.ongoing.is_none()); // TODO: Keep state for a queue of pending (re-)signing operations, so // that the number of simultaneous operations can be limited. At the // moment, this queue is opaque and is handled within the asynchronous // task. - let handle = tokio::task::spawn(super::sign( - self.center.clone(), - self.zone.clone(), - builder, - SigningTrigger::ZoneChangesApproved, - )); - - self.state.signer.ongoing = Some(handle.into()); + let span = tracing::Span::none(); + self.state.signer.ongoing.spawn( + span, + super::sign( + self.center.clone(), + self.zone.clone(), + builder, + SigningTrigger::ZoneChangesApproved, + ), + ); } /// Enqueue a re-signing operation for the zone. @@ -64,7 +73,14 @@ impl SignerZoneHandle<'_> { /// ## Panics /// /// Panics if `keys_changed` and `sigs_need_refresh` are both `false`. + #[tracing::instrument( + level = "trace", + skip_all, + fields(zone = %self.zone.name, keys_changed, sigs_need_refresh) + )] pub fn enqueue_resign(&mut self, keys_changed: bool, sigs_need_refresh: bool) { + info!("Enqueuing a re-sign operation"); + assert!( keys_changed || sigs_need_refresh, "a reason for re-signing was not specified" @@ -93,7 +109,6 @@ impl SignerZoneHandle<'_> { // may be enqueued, but it has lower priority than this (for now). assert!(self.state.signer.enqueued_sign.is_none()); - assert!(self.state.signer.ongoing.is_none()); // TODO: 'SigningTrigger' can't express multiple reasons. let trigger = if keys_changed { @@ -102,14 +117,11 @@ impl SignerZoneHandle<'_> { SigningTrigger::SignatureExpiration }; - let handle = tokio::task::spawn(super::sign( - self.center.clone(), - self.zone.clone(), - builder, - trigger, - )); - - self.state.signer.ongoing = Some(handle.into()); + let span = tracing::Span::none(); + self.state.signer.ongoing.spawn( + span, + super::sign(self.center.clone(), self.zone.clone(), builder, trigger), + ); } else { // TODO: Track expiration time in 'SignerState'. let expiration_time = self @@ -147,7 +159,6 @@ impl SignerZoneHandle<'_> { .as_ref() .is_none_or(|o| o.builder.is_none()) ); - assert!(self.state.signer.ongoing.is_none()); // Load the one enqueued re-sign operation, if it exists. let Some(resign) = self.state.signer.enqueued_resign.take() else { @@ -179,14 +190,11 @@ impl SignerZoneHandle<'_> { SigningTrigger::SignatureExpiration }; - let handle = tokio::task::spawn(super::sign( - self.center.clone(), - self.zone.clone(), - builder, - trigger, - )); - - self.state.signer.ongoing = Some(handle.into()); + let span = tracing::Span::none(); + self.state.signer.ongoing.spawn( + span, + super::sign(self.center.clone(), self.zone.clone(), builder, trigger), + ); true } @@ -197,8 +205,8 @@ impl SignerZoneHandle<'_> { /// State for signing a zone. #[derive(Debug, Default)] pub struct SignerState { - /// A handle to an ongoing operation, if any. - pub ongoing: Option, + /// Ongoing (re-)signing operations. + pub ongoing: BackgroundTasks, /// An enqueued signing operation, if any. pub enqueued_sign: Option, diff --git a/src/units/key_manager.rs b/src/units/key_manager.rs index 4fb80839..711e59a9 100644 --- a/src/units/key_manager.rs +++ b/src/units/key_manager.rs @@ -1,11 +1,11 @@ use crate::api; use crate::api::{FileKeyImport, KeyImport, KmipKeyImport}; -use crate::center::{Center, ZoneAddError}; +use crate::center::{Center, ZoneAddError, get_zone}; use crate::manager::record_zone_event; use crate::policy::{KeyParameters, PolicyVersion}; use crate::units::http_server::KmipServerState; use crate::util::AbortOnDrop; -use crate::zone::{HistoricalEvent, SigningTrigger}; +use crate::zone::{HistoricalEvent, ZoneHandle}; use bytes::Bytes; use camino::{Utf8Path, Utf8PathBuf}; use cascade_api::keyset::{KeyRollCommand, KeyRollVariant}; @@ -25,7 +25,7 @@ use std::process::Output; use std::sync::Arc; use tokio::sync::Mutex; use tokio::time::Instant; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, warn}; //------------ KeyManager ---------------------------------------------------- @@ -437,13 +437,15 @@ impl KeyManager { } }; let _ = ks_info.insert(apex_name, new_info); - info!("[CC]: Instructing zone signer to re-sign the zone"); - center.signer.on_sign_zone( + let zone = get_zone(center, zone.apex_name()).unwrap(); + let mut state = zone.state.lock().unwrap(); + ZoneHandle { + zone: &zone, + state: &mut state, center, - zone.apex_name().clone(), - None, - SigningTrigger::ExternallyModifiedKeySetState, - ); + } + .signer() + .enqueue_resign(true, false); continue; } @@ -482,13 +484,15 @@ impl KeyManager { // signer. // let new_info = get_keyset_info(&state_path); let _ = ks_info.insert(apex_name, new_info); - info!("[CC]: Instructing zone signer to re-sign the zone"); - center.signer.on_sign_zone( + let zone = get_zone(center, zone.apex_name()).unwrap(); + let mut state = zone.state.lock().unwrap(); + ZoneHandle { + zone: &zone, + state: &mut state, center, - zone.apex_name().clone(), - None, - SigningTrigger::KeySetModifiedAfterCron, - ); + } + .signer() + .enqueue_resign(true, false); continue; } diff --git a/src/units/zone_server.rs b/src/units/zone_server.rs index ac686009..066f97ff 100644 --- a/src/units/zone_server.rs +++ b/src/units/zone_server.rs @@ -46,8 +46,8 @@ use crate::manager::Terminated; use crate::manager::record_zone_event; use crate::util::AbortOnDrop; use crate::zone::{ - HistoricalEvent, SignedZoneVersionState, SigningTrigger, UnsignedZoneVersionState, Zone, - ZoneHandle, ZoneVersionReviewState, + HistoricalEvent, SignedZoneVersionState, UnsignedZoneVersionState, Zone, ZoneHandle, + ZoneVersionReviewState, }; /// The source of a zone server. @@ -529,29 +529,15 @@ impl ZoneServer { zone: &Arc, zone_serial: Serial, ) { - { - let mut zone_state = zone.state.lock().unwrap(); - zone_state.record_event( - HistoricalEvent::UnsignedZoneReview { - status: crate::api::ZoneReviewStatus::Approved, - }, - Some(zone_serial), - ); - ZoneHandle { - zone, - state: &mut zone_state, - center, - } - .storage() - .approve_loaded(); - } - info!("[CC]: Instructing zone signer to sign the approved zone"); - center.signer.on_sign_zone( + let _ = zone_serial; // TODO + let mut state = zone.state.lock().unwrap(); + ZoneHandle { + zone, + state: &mut state, center, - zone.name.clone(), - Some(zone_serial), - SigningTrigger::ZoneChangesApproved, - ); + } + .storage() + .approve_loaded(); } fn on_signed_zone_approved( diff --git a/src/units/zone_signer.rs b/src/units/zone_signer.rs index 9477f46d..f4198436 100644 --- a/src/units/zone_signer.rs +++ b/src/units/zone_signer.rs @@ -1,40 +1,44 @@ use std::cmp::{Ordering, min}; use std::collections::{HashMap, VecDeque}; -use std::ops::Range; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, SystemTime}; use bytes::Bytes; -use domain::base::iana::{Class, SecurityAlgorithm}; +use cascade_zonedata::{OldRecord, SignedZoneBuilder}; +use domain::base::iana::SecurityAlgorithm; use domain::base::name::FlattenInto; -use domain::base::{CanonicalOrd, Record, Rtype, Serial}; +use domain::base::{CanonicalOrd, Record}; use domain::crypto::sign::{SecretKeyBytes, SignRaw}; use domain::dnssec::common::parse_from_bind; use domain::dnssec::sign::SigningConfig; use domain::dnssec::sign::denial::config::DenialConfig; -use domain::dnssec::sign::denial::nsec3::{GenerateNsec3Config, Nsec3ParamTtlMode}; +use domain::dnssec::sign::denial::nsec::generate_nsecs; +use domain::dnssec::sign::denial::nsec3::{ + GenerateNsec3Config, Nsec3ParamTtlMode, Nsec3Records, generate_nsec3s, +}; use domain::dnssec::sign::error::SigningError; use domain::dnssec::sign::keys::SigningKey; use domain::dnssec::sign::keys::keyset::{KeySet, KeyType}; -use domain::dnssec::sign::records::{RecordsIter, Sorter}; +use domain::dnssec::sign::records::RecordsIter; use domain::dnssec::sign::signatures::rrsigs::{GenerateRrsigConfig, sign_sorted_zone_records}; -use domain::dnssec::sign::traits::SignableZoneInPlace; +use domain::new::base::{RType, Serial}; +use domain::new::rdata::RecordData; use domain::rdata::dnssec::Timestamp; -use domain::rdata::{Dnskey, Nsec3param, Rrsig, Soa, ZoneRecordData}; +use domain::rdata::{Dnskey, Nsec3param}; use domain::zonefile::inplace::{Entry, Zonefile}; -use domain::zonetree::types::{StoredRecordData, ZoneUpdate}; -use domain::zonetree::update::ZoneUpdater; -use domain::zonetree::{StoredName, StoredRecord, Zone}; +use domain::zonetree::StoredName; use domain_kmip::KeyUrl; use domain_kmip::dep::kmip::client::pool::{ConnectionManager, KmipConnError, SyncConnPool}; use domain_kmip::{self, ClientCertificate, ConnectionSettings}; use jiff::tz::TimeZone; use jiff::{Timestamp as JiffTimestamp, Zoned}; +use rayon::iter::{ + IntoParallelIterator, IntoParallelRefIterator, ParallelExtend, ParallelIterator, +}; use rayon::slice::ParallelSliceMut; use serde::{Deserialize, Serialize}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch}; -use tokio::task::spawn_blocking; use tokio::time::Instant; use tracing::{Level, debug, error, info, trace, warn}; use url::Url; @@ -43,8 +47,7 @@ use crate::api::{ SigningFinishedReport, SigningInProgressReport, SigningQueueReport, SigningReport, SigningRequestedReport, SigningStageReport, }; -use crate::center::{Center, get_zone, halt_zone}; -use crate::common::light_weight_zone::LightWeightZone; +use crate::center::{Center, get_zone}; use crate::manager::{Terminated, record_zone_event}; use crate::policy::{PolicyVersion, SignerDenialPolicy, SignerSerialPolicy}; use crate::units::http_server::KmipServerState; @@ -55,7 +58,9 @@ use crate::util::{ AbortOnDrop, serialize_duration_as_secs, serialize_instant_as_duration_secs, serialize_opt_duration_as_secs, }; -use crate::zone::{HistoricalEvent, HistoricalEventType, PipelineMode, SigningTrigger}; +use crate::zone::{ + HistoricalEvent, HistoricalEventType, PipelineMode, SigningTrigger, Zone, ZoneHandle, +}; // Re-signing zones before signatures expire works as follows: // - compute when the first zone needs to be re-signed. Loop over unsigned @@ -76,7 +81,7 @@ use crate::zone::{HistoricalEvent, HistoricalEventType, PipelineMode, SigningTri pub struct ZoneSigner { // TODO: Discuss whether this semaphore is necessary. max_concurrent_operations: usize, - concurrent_operation_permits: Semaphore, + concurrent_operation_permits: Arc, max_concurrent_rrsig_generation_tasks: usize, signer_status: ZoneSignerStatus, kmip_servers: Arc>>, @@ -94,7 +99,7 @@ impl ZoneSigner { Self { max_concurrent_operations, - concurrent_operation_permits: Semaphore::new(max_concurrent_operations), + concurrent_operation_permits: Arc::new(Semaphore::new(max_concurrent_operations)), max_concurrent_rrsig_generation_tasks: (std::thread::available_parallelism() .unwrap() .get() @@ -206,7 +211,7 @@ impl ZoneSigner { ZoneSigningStatus::InProgress(s) => { Some(SigningStageReport::InProgress(SigningInProgressReport { requested_at: now_t.checked_sub(now.duration_since(s.requested_at))?, - zone_serial: s.zone_serial, + zone_serial: domain::base::Serial(s.zone_serial.into()), started_at: now_t.checked_sub(now.duration_since(s.started_at))?, unsigned_rr_count: s.unsigned_rr_count, walk_time: s.walk_time, @@ -216,7 +221,6 @@ impl ZoneSigner { rrsig_count: s.rrsig_count, rrsig_reused_count: s.rrsig_reused_count, rrsig_time: s.rrsig_time, - insertion_time: s.insertion_time, total_time: s.total_time, threads_used: s.threads_used, })) @@ -224,7 +228,7 @@ impl ZoneSigner { ZoneSigningStatus::Finished(s) => { Some(SigningStageReport::Finished(SigningFinishedReport { requested_at: now_t.checked_sub(now.duration_since(s.requested_at))?, - zone_serial: s.zone_serial, + zone_serial: domain::base::Serial(s.zone_serial.into()), started_at: now_t.checked_sub(now.duration_since(s.started_at))?, unsigned_rr_count: s.unsigned_rr_count, walk_time: s.walk_time, @@ -234,7 +238,6 @@ impl ZoneSigner { rrsig_count: s.rrsig_count, rrsig_reused_count: s.rrsig_reused_count, rrsig_time: s.rrsig_time, - insertion_time: s.insertion_time, total_time: s.total_time, threads_used: s.threads_used, finished_at: now_t.checked_sub(now.duration_since(s.finished_at))?, @@ -250,49 +253,6 @@ impl ZoneSigner { }) } - pub fn on_sign_zone( - &self, - center: &Arc
, - zone_name: StoredName, - zone_serial: Option, - trigger: SigningTrigger, - ) { - let center = center.clone(); - tokio::spawn(async move { - if let Err(err) = center - .signer - .join_sign_zone_queue(¢er, &zone_name, zone_serial.is_none(), trigger) - .await - { - if err.is_benign() { - // Ignore this benign case. It was probably caused - // by dnst keyset cron triggering resigning before - // we even signed the first time, either because - // the zone was large and slow to load and sign, - // or because the unsigned zone was pending - // review. - debug!( - "[ZS]: Ignoring probably benign failure to (re)sign '{zone_name}': {err}" - ); - } else { - error!("[ZS]: Signing of zone '{zone_name}' failed: {err}"); - - halt_zone(¢er, &zone_name, true, &err.to_string()); - - record_zone_event( - ¢er, - &zone_name, - HistoricalEvent::SigningFailed { - trigger, - reason: err.to_string(), - }, - zone_serial, - ); - } - } - }); - } - pub fn on_signing_report( &self, _center: &Arc
, @@ -323,28 +283,26 @@ impl ZoneSigner { let _ = self.next_resign_time_tx.send(self.next_resign_time(center)); } - /// Signs zone_name from the Center::signable_zones zone collection, - /// unless `resign_last_signed_zone_content` is true in which case - /// it resigns the copy of the zone from the Center::published_zones - /// collection instead. An alternative way to do this would be to only - /// read the right version of the signable zone, but that would only - /// be possible if the signable zone were definitely a ZoneApex zone - /// rather than a LightWeightZone (and XFR-in zones are LightWeightZone - /// instances). - pub async fn join_sign_zone_queue( + /// Enqueue a zone for signing, waiting until it can begin. + pub async fn wait_to_sign( &self, - center: &Arc
, - zone_name: &StoredName, - resign_last_signed_zone_content: bool, - trigger: SigningTrigger, - ) -> Result<(), SignerError> { + zone: &Arc, + ) -> ( + Arc>, + [OwnedSemaphorePermit; 3], + ) { + let zone_name = &zone.name; info!("[ZS]: Waiting to enqueue signing operation for zone '{zone_name}'."); self.signer_status.dump_queue(); - let (q_size, _q_permit, _zone_permit, status) = { + let (q_size, q_permit, zone_permit, status) = { let signer_status = &self.signer_status; - signer_status.enqueue(zone_name.clone()).await? + // TODO: Propagate the error properly. + signer_status + .enqueue(zone_name.clone()) + .await + .unwrap_or_else(|err| panic!("{err}")) }; let num_ops_in_progress = @@ -354,48 +312,32 @@ impl ZoneSigner { q_size - 1 ); - let _permit = self.concurrent_operation_permits.acquire().await.unwrap(); - - status.write().unwrap().current_action = "Signing".to_string(); - - let res = self - .sign_zone( - center, - zone_name, - resign_last_signed_zone_content, - trigger, - status.clone(), - ) - .await; - - let mut status = status.write().unwrap(); - if res.is_ok() { - status.status.finish(true); - status.current_action = "Finished".to_string(); - } else { - status.status.finish(false); - status.current_action = "Aborted".to_string(); - } + let permit = self + .concurrent_operation_permits + .clone() + .acquire_owned() + .await + .unwrap(); - res + // TODO: Why do we need three different permits? + (status, [q_permit, zone_permit, permit]) } - async fn sign_zone( + pub fn sign_zone( &self, center: &Arc
, - zone_name: &StoredName, - resign_last_signed_zone_content: bool, + zone: &Arc, + builder: &mut SignedZoneBuilder, trigger: SigningTrigger, status: Arc>, ) -> Result<(), SignerError> { + let zone_name = &zone.name; info!("[ZS]: Starting signing operation for zone '{zone_name}'"); let start = Instant::now(); let (last_signed_serial, policy) = { // Use a block to make sure that the mutex is clearly dropped. - let state = center.state.lock().unwrap(); - let zone = state.zones.get(zone_name).unwrap(); - let zone_state = zone.0.state.lock().unwrap(); + let zone_state = zone.state.lock().unwrap(); // Do NOT sign a zone that is halted. if zone_state.pipeline_mode != PipelineMode::Running { @@ -406,7 +348,8 @@ impl ZoneSigner { let last_signed_serial = zone_state .find_last_event(HistoricalEventType::SigningSucceeded, None) - .and_then(|item| item.serial); + .and_then(|item| item.serial) + .map(|serial| Serial::from(serial.0)); (last_signed_serial, zone_state.policy.clone().unwrap()) }; @@ -416,58 +359,45 @@ impl ZoneSigner { // // Lookup the zone to sign. // - status.write().unwrap().current_action = "Retrieving zone to sign".to_string(); - let signable_zone = match resign_last_signed_zone_content { - false => { - let signable_zones = center.signable_zones.load(); - let Some(signable_zone) = signable_zones.get_zone(&zone_name, Class::IN).cloned() - else { - debug!("Ignoring request to sign unavailable zone '{zone_name}'"); - return Err(SignerError::CannotSignUnapprovedZone); - }; - signable_zone - } - true => { - let published_zones = center.published_zones.load(); - debug!("Ignoring request to re-sign zone that was never published '{zone_name}'"); - published_zones - .get_zone(&zone_name, Class::IN) - .cloned() - .ok_or(SignerError::CannotResignNonPublishedZone)? - } - }; - - status.write().unwrap().current_action = "Querying zone SOA record".to_string(); - let soa_rr = get_zone_soa(signable_zone.clone(), zone_name.clone())?; - let ZoneRecordData::Soa(soa) = soa_rr.data() else { - return Err(SignerError::SoaNotFound); - }; + let mut writer = builder.replace().unwrap(); + let mut new_records = Vec::new(); + let loaded = writer + .next_loaded() + .or(writer.curr_loaded()) + .expect("a non-empty loaded instance must exist"); + let loaded_serial = loaded.soa().rdata.serial; let serial = match policy.signer.serial_policy { SignerSerialPolicy::Keep => { if let Some(previous_serial) = last_signed_serial - && soa.serial() <= previous_serial + && loaded_serial <= previous_serial { return Err(SignerError::KeepSerialPolicyViolated); } - soa.serial() + loaded_serial } SignerSerialPolicy::Counter => { - let mut serial = soa.serial(); + // Select the maximum of 'last_signed_serial + 1' and + // 'loaded_serial'. + // + // TODO: This is a partial workaround to help users starting + // out with counter mode. For ongoing discussion, see + // . + let mut serial = loaded_serial; if let Some(previous_serial) = last_signed_serial && serial <= previous_serial { - serial = previous_serial.add(1); + serial = previous_serial.inc(1); } serial } SignerSerialPolicy::UnixTime => { - let mut serial = Serial::now(); + let mut serial = Serial::unix_time(); if let Some(previous_serial) = last_signed_serial && serial <= previous_serial { - serial = previous_serial.add(1); + serial = previous_serial.inc(1); } serial @@ -483,32 +413,20 @@ impl ZoneSigner { if let Some(previous_serial) = last_signed_serial && serial <= previous_serial { - serial = previous_serial.add(1); + serial = previous_serial.inc(1); } serial } }; - let new_soa = ZoneRecordData::Soa(Soa::new( - soa.mname().clone(), - soa.rname().clone(), - serial, - soa.refresh(), - soa.retry(), - soa.expire(), - soa.minimum(), - )); - - let soa_rr = Record::new( - soa_rr.owner().clone(), - soa_rr.class(), - soa_rr.ttl(), - new_soa, - ); + let new_soa = { + let mut soa = loaded.soa().clone(); + soa.rdata.serial = serial; + soa + }; info!( - "[ZS]: Serials for zone '{zone_name}': last signed={last_signed_serial:?}, current={}, serial policy={}, new={serial}", - soa.serial(), + "[ZS]: Serials for zone '{zone_name}': last signed={last_signed_serial:?}, current={loaded_serial}, serial policy={}, new={serial}", policy.signer.serial_policy ); @@ -520,26 +438,14 @@ impl ZoneSigner { .write() .unwrap() .status - .start(soa.serial()) + .start(loaded_serial) .map_err(|_| SignerError::InternalError("Invalid status".to_string()))?; } - // - // Lookup the signed zone to update, or create a new empty zone to - // sign into. - // - let zone = self.get_or_insert_signed_zone(center, zone_name); - // // Create a signing configuration. // - // Ensure that the Mutexes are locked only in this block; - let policy = { - let zone = get_zone(center, zone_name).unwrap(); - let zone_state = zone.state.lock().unwrap(); - zone_state.policy.clone() - }; - let signing_config = self.signing_config(&policy.unwrap()); + let signing_config = self.signing_config(&policy); let rrsig_cfg = GenerateRrsigConfig::new(signing_config.inception, signing_config.expiration); @@ -549,9 +455,12 @@ impl ZoneSigner { status.write().unwrap().current_action = "Collecting records to sign".to_string(); debug!("[ZS]: Collecting records to sign for zone '{zone_name}'."); let walk_start = Instant::now(); - let passed_zone = signable_zone.clone(); - let mut records = spawn_blocking(|| collect_zone(passed_zone)).await.unwrap(); - records.push(soa_rr.clone()); + let mut records = loaded + .records() + .iter() + .map(|r| OldRecord::from(r.clone())) + .collect::>(); + records.push(new_soa.clone().into()); let walk_time = walk_start.elapsed(); let unsigned_rr_count = records.len(); @@ -568,7 +477,7 @@ impl ZoneSigner { status.write().unwrap().current_action = "Fetching apex RRs from the key manager".to_string(); // Read the DNSKEY RRs and DNSKEY RRSIG RR from the keyset state. - let state_path = mk_dnst_keyset_state_file_path(¢er.config.keys_dir, zone.apex_name()); + let state_path = mk_dnst_keyset_state_file_path(¢er.config.keys_dir, &zone.name); let state = std::fs::read_to_string(&state_path) .map_err(|_| SignerError::CannotReadStateFile(state_path.into_string()))?; let state: KeySetState = serde_json::from_str(&state).unwrap(); @@ -577,7 +486,9 @@ impl ZoneSigner { zonefile.extend_from_slice(dnskey_rr.as_bytes()); zonefile.extend_from_slice(b"\n"); if let Ok(Some(Entry::Record(rec))) = zonefile.next_entry() { - records.push(rec.flatten_into()); + let record: OldRecord = rec.flatten_into(); + new_records.push(record.clone().into()); + records.push(record); } } @@ -770,14 +681,8 @@ impl ZoneSigner { debug!("[ZS]: Sorting collected records for zone '{zone_name}'."); status.write().unwrap().current_action = "Sorting records".to_string(); let sort_start = Instant::now(); - let mut records = spawn_blocking(|| { - // Note: This may briefly use lots of CPU and many CPU cores. - MultiThreadedSorter::sort_by(&mut records, CanonicalOrd::canonical_cmp); - records.dedup(); - records - }) - .await - .unwrap(); + // Note: This may briefly use lots of CPU and many CPU cores. + records.par_sort_by(CanonicalOrd::canonical_cmp); let sort_time = sort_start.elapsed(); let unsigned_rr_count = records.len(); @@ -795,20 +700,47 @@ impl ZoneSigner { debug!("[ZS]: Generating denial records for zone '{zone_name}'."); status.write().unwrap().current_action = "Generating denial records".to_string(); let denial_start = Instant::now(); - let apex_owner = zone_name.clone(); - let unsigned_records = spawn_blocking(move || { - // By not passing any keys to sign_zone() will only add denial RRs, - // not RRSIGs. We could invoke generate_nsecs() or generate_nsec3s() - // directly here instead. - let no_keys: [&SigningKey; 0] = Default::default(); - records.sign_zone(&apex_owner, &signing_config, &no_keys)?; - Ok(records) - }) - .await - .unwrap() - .map_err(|err: SigningError| { - SignerError::SigningError(format!("Failed to generate denial RRs: {err}")) - })?; + match &signing_config.denial { + DenialConfig::AlreadyPresent => {} + + DenialConfig::Nsec(cfg) => { + let nsecs = generate_nsecs(&zone.name, RecordsIter::new_from_owned(&records), cfg) + .map_err(|err: SigningError| { + SignerError::SigningError(format!("Failed to generate denial RRs: {err}")) + })?; + + new_records.par_extend( + nsecs + .par_iter() + .map(|r| OldRecord::from_record(r.clone()).into()), + ); + records.par_extend(nsecs.into_par_iter().map(Record::from_record)); + } + + DenialConfig::Nsec3(cfg) => { + // RFC 5155 7.1 step 5: "Sort the set of NSEC3 RRs into hash + // order." We store the NSEC3s as we create them and sort them + // afterwards. + let Nsec3Records { nsec3s, nsec3param } = + generate_nsec3s(&zone.name, RecordsIter::new_from_owned(&records), cfg) + .map_err(|err: SigningError| { + SignerError::SigningError(format!( + "Failed to generate denial RRs: {err}" + )) + })?; + + // Add the generated NSEC3 records. + new_records.par_extend( + nsec3s + .par_iter() + .map(|r| OldRecord::from_record(r.clone()).into()), + ); + new_records.push(OldRecord::from_record(nsec3param.clone()).into()); + records.par_extend(nsec3s.into_par_iter().map(Record::from_record)); + records.push(Record::from_record(nsec3param)); + } + } + let unsigned_records = records; let denial_time = denial_start.elapsed(); let denial_rr_count = unsigned_records.len() - unsigned_rr_count; @@ -848,181 +780,85 @@ impl ZoneSigner { } } - // Create a zone updater which will be used to add RRs resulting - // from RRSIG generation to the signed zone. We set the create_diff - // argument to false because we sign the zone by deleting all records - // so from the point of view of the automatic diff creation logic all - // records added to the zone appear to be new. Once we add support for - // incremental signing (i.e. only regenerate, add and remove RRSIGs, - // and update the NSEC(3) chain as needed, we can capture a diff of - // the changes we make). - let mut updater = ZoneUpdater::new(zone.clone()).await.unwrap(); - - // Clear out any RRs in the current version of the signed zone. If the zone - // supports versioning this is a NO OP. - debug!("SIGNER: Deleting records in existing (if any) copy of signed zone."); - updater.apply(ZoneUpdate::DeleteAllRecords).await.unwrap(); - - // 'updater.apply()' is technically 'async', although we always - // implement it here with synchronous methods. This still forces - // us to wrap the whole thing in a future, so we spawn a relatively - // lightweight single-threaded Tokio runtime to handle it for us. - - // Insert all unsigned records into the updater. - let unsigned_updater_task = spawn_blocking({ - let runtime = tokio::runtime::Builder::new_current_thread() - .thread_name("cascade-worker") - .build() - .unwrap(); - - move || { - runtime.block_on(async move { - let start = Instant::now(); - - for record in &unsigned_records { - let record = Record::from_record(record.clone()); - updater.apply(ZoneUpdate::AddRecord(record)).await.unwrap(); - } + let generation_start = Instant::now(); - debug!( - "Inserted {} unsigned records in {:.1}s", - unsigned_records.len(), - start.elapsed().as_secs_f64() - ); + // Get the keys to sign with. Domain's 'sign_sorted_zone_records()' + // needs a slice of references, so we need to build that here. + let keys = signing_keys.iter().collect::>(); - (unsigned_records, updater) - }) - } - }); - let (unsigned_records, mut updater) = unsigned_updater_task.await.map_err(|_| { - SignerError::SigningError("Failed to insert unsigned records".to_string()) - })?; - - // At the moment, 'ZoneUpdater' only allows single-threaded access. It - // needs to be passed all of our records, which get created across many - // threads. Rather than collecting all the records and inserting them - // at once, we'll let the updater run in tandem with signing. If the - // updater can't keep up, the channel will accumulate a lot of objects, - // but that's okay. - let (updater_tx, updater_rx) = std::sync::mpsc::channel::>(); - - // The inserter task; it collects all signatures and adds them to the - // zone. It also computes the minimum expiration time for us. - let inserter_task = spawn_blocking({ - let runtime = tokio::runtime::Builder::new_current_thread() - .thread_name("cascade-worker") - .build() - .unwrap(); - - move || { - runtime.block_on(async move { - let mut total_signatures = 0usize; - let start = Instant::now(); - - while let Ok(signatures) = updater_rx.recv() { - total_signatures += signatures.len(); - for sig in signatures { - updater - .apply(ZoneUpdate::AddRecord(Record::from_record(sig))) - .await - .unwrap(); - } - } - - let duration = start.elapsed(); - debug!( - "Inserted {total_signatures} signatures over {:.1}s", - duration.as_secs_f64() - ); + // TODO: This generation code is incorrect; 'sign_sorted_zone_records' + // looks for zone cuts, but zone cuts may need to be detected _across_ + // the segments we split the records into. Zone cut detection needs to + // be re-implemented here with parallel execution in mind. This also + // applies to NSEC(3) generation, but it is currently single-threaded. - (updater, total_signatures, duration) - }) + // Split the records into segments. + let segments = rayon::iter::split(0..unsigned_records.len(), |range| { + // Always sign at least 1024 records at a time. + if range.len() < 1024 { + return (range, None); } - }); - - // Generate all signatures via Rayon on separate threads. - let generator_task = spawn_blocking({ - let zone_name = zone_name.clone(); - let signing_keys = Arc::new(signing_keys); - - move || { - // TODO: Install a dedicated Rayon thread pool over here? - - let start = Instant::now(); - // Get the keys to sign with. Domain's 'sign_sorted_zone_records()' - // needs a slice of references, so we need to build that here. - let keys = signing_keys.iter().collect::>(); - - let task = SignTask { - zone_name: &zone_name, - records: &unsigned_records, - range: 0..unsigned_records.len(), - config: &rrsig_cfg, - keys: &keys, - updater_tx: &updater_tx, - }; + let midpoint = range.start + range.len() / 2; + let left = range.start..midpoint; + let right = midpoint..range.end; + (left, Some(right)) + }); - task.execute().map(|_| start.elapsed()) - } + // Generate signatures from each segment. + let signatures = segments.map(|range| { + sign_sorted_zone_records( + &zone.name, + RecordsIter::new_from_owned(&unsigned_records[range]), + &keys, + &rrsig_cfg, + ) }); - // Wait for signature generation and insertion to finish. - let generation_time = generator_task - .await - .map_err(|_| SignerError::SigningError("Could not generate RRsigs".to_string()))? + // Collect the signatures together, folding errors together. + let signatures = signatures + .try_reduce(Vec::new, |mut a, mut b| { + a.append(&mut b); + Ok(a) + }) .map_err(|err| SignerError::SigningError(err.to_string()))?; + let total_signatures = signatures.len(); - let (mut updater, total_signatures, insertion_time) = inserter_task - .await - .map_err(|_| SignerError::SigningError("Could not insert all records".to_string()))?; + new_records.par_extend( + signatures + .into_par_iter() + .map(|r| OldRecord::from_record(r).into()), + ); + new_records.par_sort(); + writer.set_records(new_records).unwrap(); + + let generation_time = generation_start.elapsed(); let generation_rate = total_signatures as f64 / generation_time.as_secs_f64().min(0.001); - let insertion_rate = total_signatures as f64 / insertion_time.as_secs_f64().min(0.001); - // Finalize the signed zone update. - let ZoneRecordData::Soa(soa_data) = soa_rr.data() else { - unreachable!(); - }; - let zone_serial = soa_data.serial(); - - // Store the serial in the state. - // Note: We do NOT do this here because CentralCommand does it when it - // sees the ZoneSignedEvent. - // { - // // Use a block to make sure that the mutex is clearly dropped. - // let zone = get_zone(&self.center, zone_name).unwrap(); - // let mut zone_state = zone.state.lock().unwrap(); - // zone_state.record_event( - // HistoricalEvent::SigningSucceeded { trigger }, - // Some(zone_serial), - // ); - // zone.mark_dirty(&mut zone_state, &self.center); - // } - - updater.apply(ZoneUpdate::Finished(soa_rr)).await.unwrap(); + writer.set_soa(new_soa.clone()).unwrap(); + writer.apply().unwrap(); debug!("SIGNER: Determining min expiration time"); - let reader = zone.read(); - let apex_name = zone_name.clone(); + let reader = builder.next_signed().unwrap(); let min_expiration = Arc::new(MinTimestamp::new()); let saved_min_expiration = min_expiration.clone(); - reader.walk(Box::new(move |name, rrset, _cut| { - for r in rrset.data() { - if let ZoneRecordData::Rrsig(rrsig) = r { - if name == apex_name - && (rrsig.type_covered() == Rtype::DNSKEY - || rrsig.type_covered() == Rtype::CDS - || rrsig.type_covered() == Rtype::CDNSKEY) - { - // These types come from the key manager. - continue; - } + for record in reader.records() { + let RecordData::RRSig(sig) = record.rdata.get() else { + continue; + }; - min_expiration.add(rrsig.expiration()); - } + // Ignore RRSIG records for DNSKEY, CDS, and CDNSKEY records; these + // are generated by the key manager, using KSKs. + if sig.rtype == RType::DNSKEY + || sig.rtype == RType::from(59) + || sig.rtype == RType::from(60) + { + continue; } - })); + + min_expiration.add(u32::from(sig.expiration).into()); + } // Save the minimum of the expiration times. { @@ -1050,7 +886,6 @@ impl ZoneSigner { s.rrsig_count = Some(total_signatures); s.rrsig_reused_count = Some(0); // Not implemented yet s.rrsig_time = Some(generation_time); - s.insertion_time = Some(insertion_time); s.total_time = Some(total_time); } v.status.finish(true); @@ -1058,17 +893,15 @@ impl ZoneSigner { // Log signing statistics. info!( - "Signing statistics for {zone_name} serial: {zone_serial}:\n\ + "Signing statistics for {zone_name} serial: {serial}:\n\ Collected {unsigned_rr_count} records in {:.1}s, sorted in {:.1}s\n\ Generated {denial_rr_count} NSEC(3) records in {:.1}s\n\ Generated {total_signatures} signatures in {:.1}s ({generation_rate:.0}sig/s) - Inserted signatures in {:.1}s ({insertion_rate:.0}sig/s)\n\ Took {:.1}s in total, using {parallelism} threads", walk_time.as_secs_f64(), sort_time.as_secs_f64(), denial_time.as_secs_f64(), generation_time.as_secs_f64(), - insertion_time.as_secs_f64(), total_time.as_secs_f64() ); @@ -1076,7 +909,7 @@ impl ZoneSigner { center, zone_name, HistoricalEvent::SigningSucceeded { trigger }, - Some(zone_serial), + Some(domain::base::Serial(serial.into())), ); // Notify the review server that the zone is ready. @@ -1084,7 +917,7 @@ impl ZoneSigner { center.signed_review_server.on_seek_approval_for_zone( center, zone_name.clone(), - zone_serial, + domain::base::Serial(serial.into()), ); Ok(()) @@ -1105,29 +938,6 @@ impl ZoneSigner { (parallelism, chunk_size) } - fn get_or_insert_signed_zone(&self, center: &Arc
, zone_name: &StoredName) -> Zone { - // Create an empty zone to sign into if no existing signed zone exists. - let signed_zones = center.signed_zones.load(); - - signed_zones - .get_zone(zone_name, Class::IN) - .cloned() - .unwrap_or_else(move || { - // Use a LightWeightZone as it is able to fix RRSIG TTLs to - // be the same when walked as the record they sign, rather - // than being forced into a common RRSET with a common TTL. - let new_zone = Zone::new(LightWeightZone::new(zone_name.clone(), false)); - - center.signed_zones.rcu(|zones| { - let mut new_zones = Arc::unwrap_or_clone(zones.clone()); - new_zones.insert_zone(new_zone.clone()).unwrap(); - new_zones - }); - - new_zone - }) - } - fn signing_config(&self, policy: &PolicyVersion) -> SigningConfig { let denial = match &policy.signer.denial { SignerDenialPolicy::NSec => DenialConfig::Nsec(Default::default()), @@ -1263,13 +1073,15 @@ impl ZoneSigner { let mut resign_busy = center.resign_busy.lock().expect("should not fail"); resign_busy.insert(zone_name.clone(), min_expiration); } - info!("[CC]: Instructing zone signer to re-sign the zone"); - self.on_sign_zone( + let zone = get_zone(center, zone_name).unwrap(); + let mut state = zone.state.lock().unwrap(); + ZoneHandle { + zone: &zone, + state: &mut state, center, - zone_name.clone(), - None, - SigningTrigger::SignatureExpiration, - ); + } + .signer() + .enqueue_resign(false, true); } } } @@ -1310,204 +1122,6 @@ impl MinTimestamp { } } -/// A signature record. -type SigRecord = Record>; - -/// A task to sign a set of records. -#[derive(Clone)] -struct SignTask<'a> { - /// The name of the zone. - zone_name: &'a StoredName, - - /// The entire set of unsigned records. - records: &'a [Record], - - /// The apparent range of records to work on. - /// - /// The true range is slightly different; it rounds forward to full RRsets. - /// This means that some initial records might be skipped, and some records - /// beyond the end might be included. - range: Range, - - /// The signing configuration. - config: &'a GenerateRrsigConfig, - - /// The set of keys to sign with. - keys: &'a [&'a SigningKey], - - /// The zone updater to insert the records into. - updater_tx: &'a std::sync::mpsc::Sender>, -} - -impl SignTask<'_> { - /// The ideal batch size for signing records. - /// - /// Records will be signed when they are grouped into batches of this size - /// or smaller. - const BATCH_SIZE: usize = 4096; - - /// Execute this task. - /// - /// If the task is too big, it will be split into two and executed through - /// Rayon. This follows Rayon's concurrency paradigm, known as Cilk-style - /// parallelism. It's ideal for Rayon's work-stealing implementation. - pub fn execute(self) -> Result<(), SigningError> { - if self.range.len() <= Self::BATCH_SIZE { - // This task should take little enough time that we'll do it all - // on this thread, immediately. - - self.execute_now() - } else { - // Split the task into two and allow Rayon to execute them in - // parallel if it can. - - let (a, b) = self.split(); - match rayon::join(|| a.execute(), || b.execute()) { - (Ok(()), Ok(())) => Ok(()), - (Err(err), Ok(())) | (Ok(()), Err(err)) => Err(err), - // TODO: Do we want to combine errors somehow? - (Err(a), Err(_b)) => Err(a), - } - } - } - - /// Split this task in two. - fn split(self) -> (Self, Self) { - debug_assert!(self.range.len() > Self::BATCH_SIZE); - - // Just split the apparent range in two. - let midpoint = self.range.start + self.range.len() / 2; - let left_range = self.range.start..midpoint; - let right_range = midpoint..self.range.end; - - ( - Self { - range: left_range, - ..self.clone() - }, - Self { - range: right_range, - ..self.clone() - }, - ) - } - - /// Execute this task right here. - fn execute_now(self) -> Result<(), SigningError> { - // Determine the true range we want to sign. - - if self.range.is_empty() { - return Ok(()); - } - - let start = if self.range.start > 0 { - // The record immediately before our apparent range. - let previous = &self.records[self.range.start - 1]; - - self.records[self.range.clone()] - .iter() - .position(|r| r.owner() != previous.owner()) - .map_or(self.range.end, |p| self.range.start + p) - } else { - self.range.start - }; - - let end = { - // The last record in our apparent range. - let last = &self.records[self.range.end - 1]; - - self.records[self.range.end..] - .iter() - .position(|r| r.owner() != last.owner()) - .map_or(self.records.len(), |p| self.range.end + p) - }; - - let range = start..end; - - if range.is_empty() { - return Ok(()); - } - - // Perform the actual signing. - let signatures = sign_sorted_zone_records( - self.zone_name, - RecordsIter::new_from_owned(&self.records[range]), - self.keys, - self.config, - )?; - - // Return the signatures. - // - // If this fails, then the receiver must have panicked; an error about - // that will already be logged, so let's not pollute the logs further. - let _ = self.updater_tx.send(signatures); - - Ok(()) - } -} - -#[allow(clippy::result_large_err)] -fn get_zone_soa( - zone: Zone, - zone_name: StoredName, -) -> Result, SignerError> { - let answer = zone - .read() - .query(zone_name.clone(), Rtype::SOA) - .map_err(|_| SignerError::SoaNotFound)?; - let (soa_ttl, soa_data) = answer.content().first().ok_or(SignerError::SoaNotFound)?; - if !matches!(soa_data, ZoneRecordData::Soa(_)) { - return Err(SignerError::SoaNotFound); - }; - Ok(Record::new(zone_name.clone(), Class::IN, soa_ttl, soa_data)) -} - -fn collect_zone(zone: Zone) -> Vec { - // Temporary: Accumulate the zone into a vec as we can only sign over a - // slice at the moment, not over an iterator yet (nor can we iterate over - // a zone yet, only walk it ...). - let records = Arc::new(std::sync::Mutex::new(vec![])); - let passed_records = records.clone(); - - trace!("SIGNER: Walking"); - zone.read() - .walk(Box::new(move |owner, rrset, _at_zone_cut| { - let mut unlocked_records = passed_records.lock().unwrap(); - - // SKIP DNSSEC records that should be generated by the signing - // process (these will be present if re-signing a published signed - // zone rather than signing an unsigned zone). Skip The SOA as - // well. A new SOA will be added later. - if matches!( - rrset.rtype(), - Rtype::DNSKEY - | Rtype::RRSIG - | Rtype::NSEC - | Rtype::NSEC3 - | Rtype::CDS - | Rtype::CDNSKEY - | Rtype::SOA - ) { - return; - } - - unlocked_records.extend( - rrset.data().iter().map(|rdata| { - Record::new(owner.clone(), Class::IN, rrset.ttl(), rdata.to_owned()) - }), - ); - })); - - let records = Arc::into_inner(records).unwrap().into_inner().unwrap(); - - trace!( - "SIGNER: Walked: accumulated {} records for signing", - records.len() - ); - - records -} - fn parse_nsec3_config(opt_out: bool) -> GenerateNsec3Config { let mut params = Nsec3param::default(); if opt_out { @@ -1528,7 +1142,7 @@ impl std::fmt::Debug for ZoneSigner { //------------ ZoneSigningStatus --------------------------------------------- #[derive(Copy, Clone, Serialize)] -struct RequestedStatus { +pub struct RequestedStatus { #[serde(serialize_with = "serialize_instant_as_duration_secs")] requested_at: tokio::time::Instant, } @@ -1542,10 +1156,10 @@ impl RequestedStatus { } #[derive(Copy, Clone, Serialize)] -struct InProgressStatus { +pub struct InProgressStatus { #[serde(serialize_with = "serialize_instant_as_duration_secs")] requested_at: tokio::time::Instant, - zone_serial: Serial, + zone_serial: domain::base::Serial, #[serde(serialize_with = "serialize_instant_as_duration_secs")] started_at: tokio::time::Instant, unsigned_rr_count: Option, @@ -1561,8 +1175,6 @@ struct InProgressStatus { #[serde(serialize_with = "serialize_opt_duration_as_secs")] rrsig_time: Option, #[serde(serialize_with = "serialize_opt_duration_as_secs")] - insertion_time: Option, - #[serde(serialize_with = "serialize_opt_duration_as_secs")] total_time: Option, threads_used: Option, } @@ -1571,7 +1183,7 @@ impl InProgressStatus { fn new(requested_status: RequestedStatus, zone_serial: Serial) -> Self { Self { requested_at: requested_status.requested_at, - zone_serial, + zone_serial: domain::base::Serial(zone_serial.into()), started_at: Instant::now(), unsigned_rr_count: None, walk_time: None, @@ -1581,7 +1193,6 @@ impl InProgressStatus { rrsig_count: None, rrsig_reused_count: None, rrsig_time: None, - insertion_time: None, total_time: None, threads_used: None, } @@ -1589,12 +1200,12 @@ impl InProgressStatus { } #[derive(Copy, Clone, Serialize)] -struct FinishedStatus { +pub struct FinishedStatus { #[serde(serialize_with = "serialize_instant_as_duration_secs")] requested_at: tokio::time::Instant, #[serde(serialize_with = "serialize_instant_as_duration_secs")] started_at: tokio::time::Instant, - zone_serial: Serial, + zone_serial: domain::base::Serial, unsigned_rr_count: usize, #[serde(serialize_with = "serialize_duration_as_secs")] walk_time: Duration, @@ -1608,8 +1219,6 @@ struct FinishedStatus { #[serde(serialize_with = "serialize_duration_as_secs")] rrsig_time: Duration, #[serde(serialize_with = "serialize_duration_as_secs")] - insertion_time: Duration, - #[serde(serialize_with = "serialize_duration_as_secs")] total_time: Duration, threads_used: usize, #[serde(serialize_with = "serialize_instant_as_duration_secs")] @@ -1631,7 +1240,6 @@ impl FinishedStatus { rrsig_count: in_progress_status.rrsig_count.unwrap_or_default(), rrsig_reused_count: in_progress_status.rrsig_reused_count.unwrap_or_default(), rrsig_time: in_progress_status.rrsig_time.unwrap_or_default(), - insertion_time: in_progress_status.insertion_time.unwrap_or_default(), total_time: in_progress_status.total_time.unwrap_or_default(), threads_used: in_progress_status.threads_used.unwrap_or_default(), finished_at: Instant::now(), @@ -1641,7 +1249,7 @@ impl FinishedStatus { } #[derive(Copy, Clone, Serialize)] -enum ZoneSigningStatus { +pub enum ZoneSigningStatus { Requested(RequestedStatus), InProgress(InProgressStatus), @@ -1668,7 +1276,7 @@ impl ZoneSigningStatus { } } - fn finish(&mut self, succeeded: bool) { + pub fn finish(&mut self, succeeded: bool) { match *self { ZoneSigningStatus::Requested(_) => { *self = Self::Aborted; @@ -1697,10 +1305,10 @@ impl std::fmt::Display for ZoneSigningStatus { const SIGNING_QUEUE_SIZE: usize = 100; #[derive(Serialize)] -struct NamedZoneSigningStatus { - zone_name: StoredName, - current_action: String, - status: ZoneSigningStatus, +pub struct NamedZoneSigningStatus { + pub zone_name: StoredName, + pub current_action: String, + pub status: ZoneSigningStatus, } struct ZoneSignerStatus { @@ -2013,8 +1621,6 @@ pub fn load_binary_file(path: &Path) -> Vec { pub enum SignerError { SoaNotFound, - CannotSignUnapprovedZone, - CannotResignNonPublishedZone, SignerNotReady, InternalError(String), KeepSerialPolicyViolated, @@ -2029,23 +1635,10 @@ pub enum SignerError { SigningError(String), } -impl SignerError { - pub fn is_benign(&self) -> bool { - matches!( - self, - SignerError::CannotSignUnapprovedZone | SignerError::CannotResignNonPublishedZone - ) - } -} - impl std::fmt::Display for SignerError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { SignerError::SoaNotFound => f.write_str("SOA not found"), - SignerError::CannotSignUnapprovedZone => f.write_str("Cannot sign unapproved zone"), - SignerError::CannotResignNonPublishedZone => { - f.write_str("Cannot re-sign non-published zone") - } SignerError::SignerNotReady => f.write_str("Signer not ready"), SignerError::InternalError(err) => write!(f, "Internal error: {err}"), SignerError::KeepSerialPolicyViolated => { diff --git a/src/util.rs b/src/util.rs index 1ce4797f..aad87085 100644 --- a/src/util.rs +++ b/src/util.rs @@ -113,6 +113,12 @@ impl Drop for BackgroundTasks { } } +impl fmt::Debug for BackgroundTasks { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_set().entries(self.tasks.keys()).finish() + } +} + //------------------------------------------------------------------------------ /// Force a [`Future`] to evaluate synchronously. diff --git a/src/zone/storage.rs b/src/zone/storage.rs index 984fed2e..a7093448 100644 --- a/src/zone/storage.rs +++ b/src/zone/storage.rs @@ -6,6 +6,7 @@ use std::{fmt, sync::Arc}; +use cascade_api::ZoneReviewStatus; use cascade_zonedata::{ LoadedZoneBuilder, LoadedZoneBuilt, LoadedZonePersister, LoadedZoneReader, LoadedZoneReviewer, SignedZoneBuilder, SignedZoneBuilt, SignedZoneReader, SignedZoneReviewer, ZoneCleaner, @@ -16,6 +17,7 @@ use tracing::{info, trace, trace_span, warn}; use crate::{ center::Center, + common::light_weight_zone::LightWeightZone, util::{BackgroundTasks, force_future}, zone::{HistoricalEvent, PipelineMode, SigningTrigger, Zone, ZoneHandle, ZoneState}, }; @@ -290,6 +292,13 @@ impl StorageZoneHandle<'_> { fields(zone = %self.zone.name), )] pub fn approve_loaded(&mut self) { + self.state.record_event( + HistoricalEvent::UnsignedZoneReview { + status: ZoneReviewStatus::Approved, + }, + None, // TODO + ); + // Examine the current state. let machine = &mut self.state.storage.machine; match machine.take() { @@ -513,8 +522,10 @@ impl StorageZoneHandle<'_> { ) -> zonetree::Zone { use zonetree::{types::ZoneUpdate, update::ZoneUpdater}; - let zone = - zonetree::ZoneBuilder::new(zone.name.clone(), domain::base::iana::Class::IN).build(); + // Use a LightWeightZone as it is able to fix RRSIG TTLs to be the same + // when walked as the record they sign, rather than being forced into a + // common RRSET with a common TTL. + let zone = domain::zonetree::Zone::new(LightWeightZone::new(zone.name.clone(), false)); let mut updater = force_future(ZoneUpdater::new(zone.clone())).unwrap(); @@ -628,26 +639,17 @@ impl StorageZoneHandle<'_> { }; // Transition the state machine. - trace!("Finished persisting"); + trace!( + machine.old = "PersistingLoaded", + machine.new = "Signing", + "Finished persisting"); let machine = &mut handle.state.storage.machine; match machine.take() { ZoneDataStorage::PersistingLoaded(s) => { - // For now, transition all the way back to 'Passive' state. - trace!("Transitioning the state machine to 'Cleaning'"); - let (s, mut builder) = s.mark_complete(persisted); - builder.clear(); - let built = builder.finish().unwrap_or_else(|_| unreachable!()); - let (s, reviewer) = s.finish(built); - let old_signed_reviewer = - std::mem::replace(&mut handle.state.storage.signed_reviewer, reviewer); - let s = s.start(old_signed_reviewer); - let (s, persister) = s.mark_approved(); - let persisted = persister.persist(); - let (s, viewer) = s.mark_complete(persisted); - let old_viewer = std::mem::replace(&mut handle.state.storage.viewer, viewer); - let (s, cleaner) = s.switch(old_viewer); - *machine = ZoneDataStorage::Cleaning(s); - handle.storage().start_cleanup(cleaner); + let (s, builder) = s.mark_complete(persisted); + *machine = ZoneDataStorage::Signing(s); + + handle.signer().enqueue_sign(builder); } _ => unreachable!( @@ -655,9 +657,6 @@ impl StorageZoneHandle<'_> { ), } - // Notify the rest of Cascade that the storage is idle. - handle.storage().on_idle(); - handle.state.storage.background_tasks.finish(); }); }