Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ kube = { version = "0.98.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.24.0", features = ["latest"] }
base64 = "0.22.1"
aes-gcm = "0.10.3"
sha2 = "0.10.8"

[build-dependencies]
cynic-codegen = { version = "3" }
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM rust:1.85.0-alpine AS builder
FROM rust:1.92-alpine AS builder

WORKDIR /opt/xtm-composer
COPY . .
Expand Down
2 changes: 1 addition & 1 deletion config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ opencti:
env_type: docker
api_version: v1.41

openbas:
openaev:
enable: false
url: http://host.docker.internal:4000
token: ChangeMe
Expand Down
45 changes: 45 additions & 0 deletions src/api/decrypt_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use base64::{engine::general_purpose, Engine as _};
use aes_gcm::{
aead::{Aead, KeyInit},
Aes256Gcm, Nonce
};
use rsa::{Oaep, Pkcs1v15Encrypt, RsaPrivateKey};
use tracing::warn;
use sha2::Sha256;

pub fn parse_aes_encrypted_value(
private_key: &RsaPrivateKey,
encrypted_value: String
) -> Result<String, Box<dyn std::error::Error>> {
let encrypted_bytes = general_purpose::STANDARD.decode(encrypted_value)?;

let version = *encrypted_bytes.get(0)
.ok_or("Encrypted value is empty")?;

let aes_key_iv_encrypted_bytes = &encrypted_bytes[1..=512];
let aes_key_iv_decrypted_bytes = match version {
1 => private_key.decrypt(Pkcs1v15Encrypt, aes_key_iv_encrypted_bytes)?,
2 => private_key.decrypt(Oaep::new::<Sha256>(), aes_key_iv_encrypted_bytes)?,
_ => {
warn!(version, "Encryption version not handled");
return Ok(String::new());
}
};
let aes_key_bytes = &aes_key_iv_decrypted_bytes[0..32];
let aes_iv_bytes = &aes_key_iv_decrypted_bytes[32..44];
let encrypted_value_bytes = &encrypted_bytes[513..];

let cipher = Aes256Gcm::new_from_slice(&aes_key_bytes)?;
let nonce = Nonce::from_slice(&aes_iv_bytes);
let plaintext_result = cipher.decrypt(&nonce, encrypted_value_bytes.as_ref());
match plaintext_result {
Ok(plaintext) => {
let decoded_value = str::from_utf8(&plaintext)?.to_string();
Ok(decoded_value)
},
Err(e) => {
warn!(error = e.to_string(), "Fail to decode value");
Ok(String::from(""))
}
}
}
28 changes: 20 additions & 8 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use std::str::FromStr;
use std::time::Duration;
use tracing::info;

pub mod openbas;
pub mod openaev;
pub mod opencti;
mod decrypt_value;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ContractsManifest {
Expand Down Expand Up @@ -99,11 +100,20 @@ impl ApiConnector {
is_sensitive: config.is_sensitive,
})
.collect::<Vec<EnvVariable>>();
envs.push(EnvVariable {
key: "OPENCTI_URL".into(),
value: settings.opencti.url.clone(),
is_sensitive: false,
});
if settings.opencti.enable {
envs.push(EnvVariable {
key: "OPENCTI_URL".into(),
value: settings.opencti.url.clone(),
is_sensitive: false,
});
}
if settings.openaev.enable {
envs.push(EnvVariable {
key: "OPENAEV_URL".into(),
value: settings.openaev.url.clone(),
is_sensitive: false,
});
}
envs.push(EnvVariable {
key: "OPENCTI_CONFIG_HASH".into(),
value: self.contract_hash.clone(),
Expand Down Expand Up @@ -171,7 +181,9 @@ pub trait ComposerApi {

async fn patch_status(&self, id: String, status: ConnectorStatus) -> Option<ApiConnector>;

async fn patch_logs(&self, id: String, logs: Vec<String>) -> Option<cynic::Id>;
async fn patch_logs(&self, id: String, logs: Vec<String>) -> Option<String>;

async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option<String>;

async fn patch_health(&self, id: String, restart_count: u32, started_at: String, is_in_reboot_loop: bool) -> Option<cynic::Id>;
async fn container_removed_successfully(&self, id: String) -> ();
}
64 changes: 64 additions & 0 deletions src/api/openaev/api_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use serde::de::DeserializeOwned;
use tracing::error;

pub async fn handle_api_response<T>(
response: Result<reqwest::Response, reqwest::Error>,
operation_name: &str,
) -> Option<T>
where
T: DeserializeOwned
{
match response {
Ok(resp) if resp.status().is_success() => {
match resp.json::<T>().await {
Ok(data) => Some(data),
Err(err) => {
error!("Failed to parse JSON for {}: {}", operation_name, err.to_string());
None
}
}
}
Ok(resp) => {
error!(
status = resp.status().as_u16(),
"Failed to {}: non-success status code", operation_name
);
None
}
Err(err) => {
error!(
error = err.to_string(),
"Failed to {}, check your configuration", operation_name
);
None
}
}
}

pub async fn handle_api_text_response(
response: Result<reqwest::Response, reqwest::Error>,
operation_name: &str,
) -> Option<String> {
match response {
Ok(resp) if resp.status().is_success() => {
resp.text().await.ok().or_else(|| {
error!("Failed to read response body for {}", operation_name);
None
})
}
Ok(resp) => {
error!(
status = resp.status().as_u16(),
"Failed to {}: non-success status code", operation_name
);
None
}
Err(err) => {
error!(
error = err.to_string(),
"Failed to {}, check your configuration", operation_name
);
None
}
}
}
18 changes: 18 additions & 0 deletions src/api/openaev/connector/get_connector_instances.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use crate::api::ApiConnector;
use crate::api::openaev::api_handler::handle_api_response;
use crate::api::openaev::connector::ConnectorInstances;

pub async fn get_connector_instances(api: &crate::api::openaev::ApiOpenAEV) -> Option<Vec<ApiConnector>> {
let settings = crate::settings();
let get_connectors = api.get(&format!("/xtm-composer/{}/connector-instances", settings.manager.id))
.send()
.await;

handle_api_response::<Vec<ConnectorInstances>>(get_connectors, "fetch connector instances")
.await.map(|connectors| {
connectors
.into_iter()
.map(|connector| connector.to_api_connector(&api.private_key))
.collect()
})
}
78 changes: 78 additions & 0 deletions src/api/openaev/connector/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use rsa::{RsaPrivateKey};
use serde::Deserialize;
use tracing::warn;
use crate::api::{ApiConnector, ApiContractConfig};
use crate::api::decrypt_value::parse_aes_encrypted_value;

pub mod get_connector_instances;
pub mod patch_health;
pub mod patch_status;
pub mod post_logs;
pub mod notify_container_removed;

#[derive(Debug, Deserialize)]
pub struct ConnectorContractConfiguration {
pub configuration_key: String,
pub configuration_value: Option<String>,
pub configuration_is_encrypted: bool,
}

#[derive(Debug, Deserialize)]
pub struct ConnectorInstances {
pub connector_instance_id: String,
pub connector_instance_name: String,
pub connector_instance_hash: Option<String>,
pub connector_image: Option<String>,
pub connector_instance_current_status: Option<String>,
pub connector_instance_requested_status: Option<String>,
pub connector_instance_configurations: Option<Vec<ConnectorContractConfiguration>>,
}

impl ConnectorInstances {

pub fn to_api_connector(&self, private_key: &RsaPrivateKey )->ApiConnector {
let contract_configuration = self
.connector_instance_configurations
.as_ref()
.unwrap()
.into_iter()
.map(|c| {
let is_sensitive = c.configuration_is_encrypted;
if is_sensitive {
let encrypted_value = c.configuration_value.clone().unwrap_or_default();
let decoded_value_result = parse_aes_encrypted_value(private_key, encrypted_value);
match decoded_value_result {
Ok(decoded_value) => ApiContractConfig {
key: c.configuration_key.clone(),
value: decoded_value,
is_sensitive: true,
},
Err(e) => {
warn!(error = e.to_string(), "Fail to decode value");
ApiContractConfig {
key: c.configuration_key.clone(),
value: String::new(),
is_sensitive: true,
}
}
}
} else {
ApiContractConfig {
key: c.configuration_key.clone(),
value: c.configuration_value.clone().unwrap_or_default(),
is_sensitive: false,
}
}
})
.collect();
ApiConnector {
id: self.connector_instance_id.clone(),
name: self.connector_instance_name.clone(),
image: self.connector_image.clone().unwrap(),
contract_hash: self.connector_instance_hash.clone().unwrap(),
current_status: self.connector_instance_current_status.clone(),
requested_status: self.connector_instance_requested_status.clone().unwrap(),
contract_configuration,
}
}
}
14 changes: 14 additions & 0 deletions src/api/openaev/connector/notify_container_removed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::api::openaev::api_handler::{handle_api_text_response};
use crate::api::openaev::ApiOpenAEV;

pub async fn notify_container_removed(id: String, api: &ApiOpenAEV) {
let settings = crate::settings();
let response = api.delete(&format!("/xtm-composer/{}/connector-instances/{}", settings.manager.id, id))
.send()
.await;

let _ = handle_api_text_response(
response,
"Notify OpenAEV that the container has been successfully removed"
).await;
}
38 changes: 38 additions & 0 deletions src/api/openaev/connector/patch_health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use serde::Serialize;
use crate::api::openaev::api_handler::handle_api_response;
use crate::api::openaev::ApiOpenAEV;
use crate::api::openaev::connector::ConnectorInstances;

#[derive(Serialize)]
struct ConnectorInstanceHealthInput {
connector_instance_restart_count: u32,
connector_instance_started_at: String,
connector_instance_is_in_reboot_loop: bool
}

pub async fn update_health(
id: String,
restart_count: u32,
started_at: String,
is_in_reboot_loop: bool,
api: &ApiOpenAEV,
)-> Option<String> {
let settings = crate::settings();
let health_check_input = ConnectorInstanceHealthInput {
connector_instance_restart_count: restart_count,
connector_instance_started_at: started_at,
connector_instance_is_in_reboot_loop: is_in_reboot_loop
};

let health_check_response = api.put(&format!("/xtm-composer/{}/connector-instances/{}/health-check", settings.manager.id, id))
.json(&health_check_input)
.send()
.await;

let _ = handle_api_response::<ConnectorInstances>(
health_check_response,
"push health metrics"
).await;

Some(id)
}
32 changes: 32 additions & 0 deletions src/api/openaev/connector/patch_status.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use serde::Serialize;
use crate::api::{ApiConnector, ConnectorStatus};
use crate::api::openaev::api_handler::handle_api_response;
use crate::api::openaev::ApiOpenAEV;
use crate::api::openaev::connector::ConnectorInstances;
use crate::api::opencti::connector::post_status::ConnectorCurrentStatus;

#[derive(Serialize)]
struct UpdateConnectorInstanceStatusInput {
connector_instance_current_status: ConnectorCurrentStatus,
}

pub async fn update_status(id: String, status: ConnectorStatus, api: &ApiOpenAEV) -> Option<ApiConnector> {
let update_status = match status {
ConnectorStatus::Started => ConnectorCurrentStatus::Started,
_ => ConnectorCurrentStatus::Stopped,
};

let status_input = UpdateConnectorInstanceStatusInput {
connector_instance_current_status: update_status
};

let settings = crate::settings();
let update_status_response = api.put(&format!("/xtm-composer/{}/connector-instances/{}/status", settings.manager.id, id))
.json(&status_input)
.send()
.await;

handle_api_response::<ConnectorInstances>(update_status_response, "patch connector instance status")
.await
.map(|connector| connector.to_api_connector(&api.private_key))
}
Loading