Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ kube = { version = "0.98.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.24.0", features = ["latest"] }
base64 = "0.22.1"
aes-gcm = "0.10.3"
axum = "0.8.8"
prometheus = "0.14.0"

[build-dependencies]
cynic-codegen = { version = "3" }
4 changes: 3 additions & 1 deletion config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ manager:
# credentials_key_filepath: /path/to/private_key.pem

# Note: If both are set, filepath takes priority with a warning

prometheus:
enable: false
port: 14270
logger:
level: info
format: json
Expand Down
5 changes: 2 additions & 3 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl ApiConnector {
}

pub fn container_envs(&self) -> Vec<EnvVariable> {
let settings = crate::settings();
let settings = &crate::config::settings::SETTINGS;
let mut envs = self
.contract_configuration
.iter()
Expand All @@ -107,8 +107,7 @@ impl ApiConnector {

/// Display environment variables with sensitive values masked (if configured)
pub fn display_env_variables(&self) {
let settings = crate::settings();

let settings = &crate::config::settings::SETTINGS;
// Check if display is enabled in configuration
let should_display = settings
.manager
Expand Down
10 changes: 8 additions & 2 deletions src/api/openbas/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct ApiOpenBAS {

impl ApiOpenBAS {
pub fn new() -> Self {
let settings = crate::settings();
let settings = &crate::config::settings::SETTINGS;
let bearer = format!("{} {}", BEARER, settings.openbas.token);
let api_uri = format!("{}/api", &settings.openbas.url);
let daemon = settings.openbas.daemon.clone();
Expand Down Expand Up @@ -81,7 +81,13 @@ impl ComposerApi for ApiOpenBAS {
todo!()
}

async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option<cynic::Id> {
async fn patch_health(
&self,
id: String,
restart_count: u32,
started_at: String,
is_in_reboot_loop: bool,
) -> Option<cynic::Id> {
todo!()
}
}
6 changes: 3 additions & 3 deletions src/api/opencti/manager/post_ping.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::api::opencti::ApiOpenCTI;
use crate::api::opencti::error_handler::{extract_optional_field, handle_graphql_response};
use crate::api::opencti::manager::ConnectorManager;
use crate::api::opencti::error_handler::{handle_graphql_response, extract_optional_field};
use crate::settings;

use tracing::error;

use crate::api::opencti::opencti as schema;
Expand Down Expand Up @@ -32,7 +32,7 @@ pub struct UpdateConnectorManagerStatusInput<'a> {
pub async fn ping(api: &ApiOpenCTI) -> Option<String> {
use cynic::MutationBuilder;

let settings = settings();
let settings = &crate::config::settings::SETTINGS;
let vars = UpdateConnectorManagerStatusVariables {
input: UpdateConnectorManagerStatusInput {
id: &cynic::Id::new(&settings.manager.id),
Expand Down
13 changes: 6 additions & 7 deletions src/api/opencti/manager/post_register.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::api::opencti::ApiOpenCTI;
use crate::api::opencti::error_handler::{extract_optional_field, handle_graphql_response};
use crate::api::opencti::manager::ConnectorManager;
use crate::api::opencti::error_handler::{handle_graphql_response, extract_optional_field};
use crate::api::opencti::opencti as schema;
use cynic;
use tracing::{error, info};
use rsa::{RsaPublicKey, pkcs1::EncodeRsaPublicKey};
use tracing::{error, info};

// region schema
#[derive(cynic::QueryVariables, Debug)]
Expand Down Expand Up @@ -34,9 +34,8 @@ pub struct RegisterConnectorsManagerInput<'a> {
pub async fn register(api: &ApiOpenCTI) {
use cynic::MutationBuilder;

let settings = crate::settings();
// Use the singleton private key
let priv_key = crate::private_key();
let settings = &crate::config::settings::SETTINGS;
let priv_key = &*crate::config::rsa::PRIVATE_KEY;
let pub_key = RsaPublicKey::from(priv_key);
let public_key = RsaPublicKey::to_pkcs1_pem(&pub_key, Default::default()).unwrap();

Expand All @@ -54,12 +53,12 @@ pub async fn register(api: &ApiOpenCTI) {
if let Some(data) = handle_graphql_response(
response,
"register_connectors_manager",
"OpenCTI backend does not support XTM composer manager registration. The composer will continue to run but won't be registered in OpenCTI."
"OpenCTI backend does not support XTM composer manager registration. The composer will continue to run but won't be registered in OpenCTI.",
) {
if let Some(manager) = extract_optional_field(
data.register_connectors_manager,
"register_connectors_manager",
"register_connectors_manager"
"register_connectors_manager",
) {
info!(manager_id = manager.id.into_inner(), "Manager registered");
}
Expand Down
18 changes: 12 additions & 6 deletions src/api/opencti/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use crate::config::settings::Daemon;
use async_trait::async_trait;
use cynic::Operation;
use cynic::http::CynicReqwestError;
use rsa::RsaPrivateKey;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::time::Duration;
use rsa::RsaPrivateKey;

pub mod connector;
pub mod manager;
pub mod error_handler;
pub mod manager;

const BEARER: &str = "Bearer";
const AUTHORIZATION_HEADER: &str = "Authorization";
Expand All @@ -30,23 +30,23 @@ pub struct ApiOpenCTI {

impl ApiOpenCTI {
pub fn new() -> Self {
let settings = crate::settings();
let settings = &crate::config::settings::SETTINGS;
let bearer = format!("{} {}", BEARER, settings.opencti.token);
let api_uri = format!("{}/graphql", &settings.opencti.url);
let daemon = settings.opencti.daemon.clone();
let logs_schedule = settings.opencti.logs_schedule;
let request_timeout = settings.opencti.request_timeout;
let connect_timeout = settings.opencti.connect_timeout;
// Use the singleton private key
let private_key = crate::private_key().clone();
let private_key = crate::config::rsa::PRIVATE_KEY.clone();
Self {
api_uri,
bearer,
daemon,
logs_schedule,
request_timeout,
connect_timeout,
private_key
private_key,
}
}

Expand Down Expand Up @@ -105,7 +105,13 @@ impl ComposerApi for ApiOpenCTI {
connector::post_logs::logs(id, logs, self).await
}

async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option<cynic::Id> {
async fn patch_health(
&self,
id: String,
restart_count: u32,
started_at: String,
is_in_reboot_loop: bool,
) -> Option<cynic::Id> {
connector::post_health::health(id, restart_count, started_at, is_in_reboot_loop, self).await
}
}
1 change: 1 addition & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod rsa;
pub mod settings;
58 changes: 58 additions & 0 deletions src/config/rsa.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use rsa::{RsaPrivateKey, pkcs8::DecodePrivateKey};
use std::fs;
use std::sync::LazyLock;
use tracing::{info, warn};

// Singleton RSA private key for all application
pub static PRIVATE_KEY: LazyLock<RsaPrivateKey> =
LazyLock::new(|| load_and_verify_credentials_key());

// Load and verify RSA private key from configuration
fn load_and_verify_credentials_key() -> RsaPrivateKey {
let setting = &crate::config::settings::SETTINGS;

// Priority: file > environment variable
let key_content = if let Some(filepath) = &setting.manager.credentials_key_filepath {
// Warning if both are set
if setting.manager.credentials_key.is_some() {
warn!(
"Both credentials_key and credentials_key_filepath are set. Using filepath (priority)."
);
}

// Read key from file
match fs::read_to_string(filepath) {
Ok(content) => content,
Err(e) => panic!("Failed to read credentials key file '{}': {}", filepath, e),
}
} else if let Some(key) = &setting.manager.credentials_key {
// Use environment variable or config value
key.clone()
} else {
panic!(
"No credentials key provided! Set either 'manager.credentials_key' or 'manager.credentials_key_filepath' in configuration."
);
};

// Validate key format (trim to handle trailing whitespace)
// Check for presence of RSA PRIVATE KEY markers for PKCS#8 format
let trimmed_content = key_content.trim();
if !trimmed_content.contains("BEGIN PRIVATE KEY")
|| !trimmed_content.contains("END PRIVATE KEY")
{
panic!(
"Invalid private key format. Expected PKCS#8 PEM format with 'BEGIN PRIVATE KEY' and 'END PRIVATE KEY' markers."
);
}

// Parse and validate RSA private key using PKCS#8 format
match RsaPrivateKey::from_pkcs8_pem(&key_content) {
Ok(key) => {
info!("Successfully loaded RSA private key (PKCS#8 format)");
key
}
Err(e) => {
panic!("Failed to decode RSA private key: {}", e);
}
}
}
12 changes: 12 additions & 0 deletions src/config/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ use k8s_openapi::api::apps::v1::Deployment;
use k8s_openapi::api::core::v1::ResourceRequirements;
use serde::Deserialize;
use std::env;
use std::sync::LazyLock;

const ENV_PRODUCTION: &str = "production";

// Singleton settings for all application
pub static SETTINGS: LazyLock<Settings> = LazyLock::new(|| Settings::new().unwrap());

#[derive(Debug, Deserialize, Clone)]
#[allow(unused)]
pub struct Logger {
Expand Down Expand Up @@ -40,6 +44,7 @@ pub struct Manager {
pub credentials_key: Option<String>,
pub credentials_key_filepath: Option<String>,
pub debug: Option<Debug>,
pub prometheus: Option<Prometheus>,
}

#[derive(Debug, Deserialize, Clone)]
Expand Down Expand Up @@ -131,6 +136,13 @@ pub struct Docker {
pub ulimits: Option<Vec<std::collections::HashMap<String, serde_json::Value>>>,
}

#[derive(Debug, Deserialize, Clone)]
#[allow(unused)]
pub struct Prometheus {
pub enable: bool,
pub port: u16,
}

#[derive(Debug, Deserialize, Clone)]
#[allow(unused)]
pub struct Settings {
Expand Down
6 changes: 3 additions & 3 deletions src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ use crate::orchestrator::docker::DockerOrchestrator;
use crate::orchestrator::kubernetes::KubeOrchestrator;
use crate::orchestrator::portainer::docker::PortainerDockerOrchestrator;
use crate::orchestrator::{Orchestrator, composer};
use crate::settings;

use crate::system::signals;
use std::time::{Duration, Instant};
use tokio::task::JoinHandle;
use tokio::time::interval;

async fn orchestration(api: Box<dyn ComposerApi + Send + Sync>) {
let settings = settings();
let settings = &crate::config::settings::SETTINGS;
// Get current deployment in target orchestrator
let daemon_configuration = api.daemon();
let orchestrator: Box<dyn Orchestrator + Send + Sync> =
Expand Down Expand Up @@ -51,7 +51,7 @@ async fn orchestration(api: Box<dyn ComposerApi + Send + Sync>) {
}

pub async fn alive(api: Box<dyn ComposerApi + Send + Sync>) -> JoinHandle<()> {
let settings = settings();
let settings = &crate::config::settings::SETTINGS;
let mut interval = interval(Duration::from_secs(settings.manager.ping_alive_schedule));
tokio::spawn(async move {
// Start scheduling
Expand Down
5 changes: 2 additions & 3 deletions src/engine/openbas.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use tokio::task::JoinHandle;
use tracing::info;
use crate::api::ComposerApi;
use crate::api::openbas::ApiOpenBAS;
use crate::engine::{alive, orchestration};
use tokio::task::JoinHandle;
use tracing::info;

pub fn openbas_orchestration() -> JoinHandle<()> {
info!("Starting OpenBAS connectors orchestration");
Expand All @@ -19,4 +19,3 @@ pub fn openbas_alive() -> JoinHandle<()> {
alive(api).await;
})
}

6 changes: 3 additions & 3 deletions src/engine/opencti.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use tokio::task::JoinHandle;
use tracing::info;
use crate::api::ComposerApi;
use crate::api::opencti::ApiOpenCTI;
use crate::engine::{alive, orchestration};
use tokio::task::JoinHandle;
use tracing::info;

pub fn opencti_alive() -> JoinHandle<()> {
info!("Starting OpenCTI Composer ping alive");
Expand All @@ -18,4 +18,4 @@ pub fn opencti_orchestration() -> JoinHandle<()> {
let api: Box<dyn ComposerApi + Send + Sync> = Box::new(ApiOpenCTI::new());
orchestration(api).await;
})
}
}
Loading