From 0e04f4b10b59d018dafc48607548d2835b75080c Mon Sep 17 00:00:00 2001 From: JIN Jie Date: Wed, 31 Dec 2025 16:09:53 +0800 Subject: [PATCH] feat: move response_code to attribute on rewrite --- agent/crates/public/src/l7_protocol.rs | 6 +- agent/src/flow_generator/protocol_logs.rs | 13 +++- .../src/flow_generator/protocol_logs/http.rs | 76 ++++++++++--------- .../flow_generator/protocol_logs/rpc/dubbo.rs | 17 +++-- .../protocol_logs/rpc/some_ip.rs | 4 +- .../flow_generator/protocol_logs/sql/mysql.rs | 20 +++-- agent/src/plugin/wasm/vm.rs | 2 +- server/agent_config/README-CH.md | 7 +- server/agent_config/README.md | 7 +- server/agent_config/template.yaml | 14 +++- 10 files changed, 109 insertions(+), 57 deletions(-) diff --git a/agent/crates/public/src/l7_protocol.rs b/agent/crates/public/src/l7_protocol.rs index 45698232725..f214a476274 100644 --- a/agent/crates/public/src/l7_protocol.rs +++ b/agent/crates/public/src/l7_protocol.rs @@ -389,7 +389,11 @@ pub enum NativeTag { BizScenario, } -pub trait L7Log { +pub trait L7LogAttribute { + fn add_attribute(&mut self, _name: Cow<'_, str>, _value: Cow<'_, str>) {} +} + +pub trait L7Log: L7LogAttribute { fn get_response_status(&self) -> L7ResponseStatus; fn set_response_status(&mut self, response_status: L7ResponseStatus); diff --git a/agent/src/flow_generator/protocol_logs.rs b/agent/src/flow_generator/protocol_logs.rs index 3564b37fd38..fea8a336145 100644 --- a/agent/src/flow_generator/protocol_logs.rs +++ b/agent/src/flow_generator/protocol_logs.rs @@ -726,16 +726,27 @@ cfg_if::cfg_if! { use log::warn; use enterprise_utils::l7::custom_policy::custom_field_policy::enums::{Op, Operation}; - use public::l7_protocol::{FieldSetter, L7Log}; + use public::l7_protocol::{Field, FieldSetter, L7Log, NativeTag}; + + use consts::SYS_RESPONSE_CODE_ATTR; pub fn auto_merge_custom_field(op: Operation, log: &mut L) { let Operation { op, prio } = op; match op { Op::RewriteResponseStatus(status) => log.set_response_status(status), Op::RewriteNativeTag(tag, value) => { + // append to sys_response_code if response_code is not empty + if tag == NativeTag::ResponseCode { + match log.get_response_code() { + Field::Str(s) => log.add_attribute(Cow::Borrowed(SYS_RESPONSE_CODE_ATTR), Cow::Owned(s.to_string())), + Field::Int(i) => log.add_attribute(Cow::Borrowed(SYS_RESPONSE_CODE_ATTR), Cow::Owned(i.to_string())), + Field::None => (), + } + } let field = FieldSetter::new(CUSTOM_FIELD_POLICY_PRIORITY + prio, value.as_str().into()); log.set(tag, field); } + Op::AddAttribute(name, value) => log.add_attribute(Cow::Borrowed(name.as_str()), Cow::Borrowed(value.as_str())), _ => warn!("Ignored operation {op:?} that is not supported by auto custom field merging"), } } diff --git a/agent/src/flow_generator/protocol_logs/http.rs b/agent/src/flow_generator/protocol_logs/http.rs index cc879587569..526e512ef81 100755 --- a/agent/src/flow_generator/protocol_logs/http.rs +++ b/agent/src/flow_generator/protocol_logs/http.rs @@ -26,7 +26,9 @@ use hpack::Decoder; use nom::{AsBytes, ParseTo}; use serde::Serialize; -use public::l7_protocol::{Field, FieldSetter, L7Log, L7ProtocolChecker, LogMessageType}; +use public::l7_protocol::{ + Field, FieldSetter, L7Log, L7LogAttribute, L7ProtocolChecker, LogMessageType, +}; use public_derive::L7Log; use super::{ @@ -293,10 +295,9 @@ pub struct HttpInfo { #[serde(rename = "response_length", skip_serializing_if = "Option::is_none")] pub resp_content_length: Option, - // status_code == 0 means None #[l7_log(response_code)] - #[serde(rename = "response_code", skip_serializing_if = "value_is_default")] - pub status_code: u16, + #[serde(rename = "response_code", skip_serializing_if = "Option::is_none")] + pub status_code: Option, #[l7_log(response_status)] #[serde(rename = "response_status")] pub status: L7ResponseStatus, @@ -343,6 +344,15 @@ pub struct HttpInfo { is_reversed: bool, } +impl L7LogAttribute for HttpInfo { + fn add_attribute(&mut self, name: Cow<'_, str>, value: Cow<'_, str>) { + self.attributes.push(KeyVal { + key: name.into_owned(), + val: value.into_owned(), + }); + } +} + impl L7ProtocolInfoInterface for HttpInfo { fn session_id(&self) -> Option { self.stream_id @@ -420,7 +430,10 @@ impl L7ProtocolInfoInterface for HttpInfo { impl HttpInfo { fn is_invalid_status_code(&self) -> bool { - !(HTTP_STATUS_CODE_MIN..=HTTP_STATUS_CODE_MAX).contains(&self.status_code) + match self.status_code { + Some(code) => !(HTTP_STATUS_CODE_MIN..=HTTP_STATUS_CODE_MAX).contains(&code), + None => true, + } } fn is_invalid(&self) -> bool { @@ -431,10 +444,12 @@ impl HttpInfo { // when response_code is overwritten, put it into the attributes. fn response_code_to_attribute(&mut self) { - self.attributes.push(KeyVal { - key: SYS_RESPONSE_CODE_ATTR.to_string(), - val: self.status_code.to_string(), - }); + if let Some(code) = self.status_code { + self.attributes.push(KeyVal { + key: SYS_RESPONSE_CODE_ATTR.to_string(), + val: code.to_string(), + }); + } } pub fn merge_custom_to_http(&mut self, custom: CustomInfo, dir: PacketDirection) { @@ -467,7 +482,7 @@ impl HttpInfo { if dir == PacketDirection::ServerToClient { if let Some(code) = custom.resp.code { self.response_code_to_attribute(); - self.status_code = code as u16; + self.status_code = Some(code as u16); } if custom.resp.status != self.status { @@ -570,7 +585,7 @@ impl HttpInfo { if other.status != L7ResponseStatus::default() { self.status = other.status; } - if self.status_code == 0 { + if self.status_code.is_none() { self.status_code = other.status_code; } if self.grpc_status_code.is_none() && other.grpc_status_code.is_some() { @@ -623,10 +638,10 @@ impl HttpInfo { } pub fn is_empty(&self) -> bool { - return self.host.is_empty() + self.host.is_empty() && self.method.is_none() && self.path.is_empty() - && self.status_code == 0; + && self.status_code.is_none() } pub fn is_req_resp_end(&self) -> (bool, bool) { @@ -843,15 +858,15 @@ impl From for L7ProtocolSendLog { L7Protocol::Grpc => { if let Some(code) = f.grpc_status_code { Some(code as i32) - } else if f.status_code > 0 { - Some(f.status_code as i32) + } else if let Some(code) = f.status_code { + Some(code as i32) } else { None } } _ => { - if f.status_code > 0 { - Some(f.status_code as i32) + if let Some(code) = f.status_code { + Some(code as i32) } else { None } @@ -1213,8 +1228,11 @@ impl HttpLog { if param.direction == PacketDirection::ServerToClient { if let Some(code) = info.grpc_status_code { self.set_grpc_status(code, info); + } else if let Some(code) = info.status_code { + self.set_status(code, info); } else { - self.set_status(info.status_code, info); + // default to ok + info.status = L7ResponseStatus::Ok; } if let Some(l7_payload) = l7_payload { @@ -1505,7 +1523,7 @@ impl HttpLog { return Err(Error::HttpHeaderParseFailed); } info.version = version; - info.status_code = status_code; + info.status_code = Some(status_code); info.msg_type = LogMessageType::Response; } else { @@ -1633,8 +1651,9 @@ impl HttpLog { } (PacketDirection::ServerToClient, Method::_ResponseHeader) => { if info.grpc_status_code.is_none() { - if info.status_code == 0 { - return Err(Error::HttpHeaderParseFailed); + match info.status_code { + None | Some(0) => return Err(Error::HttpHeaderParseFailed), + _ => (), } if !grpc_streaming_data_enabled { info.msg_type = LogMessageType::Response; @@ -1916,8 +1935,8 @@ impl HttpLog { } ":status" => { info.msg_type = LogMessageType::Response; - let code = val.parse_to().unwrap_or_default(); - info.status_code = code; + let code: u16 = val.parse_to().unwrap_or_default(); + info.status_code = Some(code); } "host" | ":authority" => info.host = String::from_utf8_lossy(val).into_owned(), ":path" => { @@ -2078,11 +2097,6 @@ impl HttpLog { info.method = Method::try_from(value.as_str()).unwrap_or_default(); } } - // resp - NativeTag::ResponseCode => { - info.response_code_to_attribute(); - info.status_code = value.parse::().unwrap_or_default(); - } // trace NativeTag::SpanId => { if CUSTOM_FIELD_POLICY_PRIORITY <= info.span_id.prio() { @@ -2101,12 +2115,6 @@ impl HttpLog { _ => auto_merge_custom_field(op, info), } } - Op::AddAttribute(key, value) => { - info.attributes.push(KeyVal { - key: key.to_string(), - val: value.to_string(), - }); - } Op::AddMetric(key, value) => { info.metrics.push(MetricKeyVal { key: key.to_string(), diff --git a/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs b/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs index cc303b7ba7f..ed7c833a8ad 100644 --- a/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs +++ b/agent/src/flow_generator/protocol_logs/rpc/dubbo.rs @@ -21,7 +21,7 @@ use std::{borrow::Cow, mem::replace}; use serde::Serialize; -use public::l7_protocol::{Field, FieldSetter, L7Log, LogMessageType}; +use public::l7_protocol::{Field, FieldSetter, L7Log, L7LogAttribute, LogMessageType}; use public_derive::L7Log; use crate::{ @@ -171,6 +171,15 @@ pub struct DubboInfo { biz_scenario: String, } +impl L7LogAttribute for DubboInfo { + fn add_attribute(&mut self, name: Cow<'_, str>, value: Cow<'_, str>) { + self.attributes.push(KeyVal { + key: name.into_owned(), + val: value.into_owned(), + }); + } +} + impl DubboInfo { pub fn generate_endpoint(&self) -> Option { if !self.service_name.is_empty() || !self.method_name.is_empty() { @@ -1137,12 +1146,6 @@ impl DubboLog { _ => auto_merge_custom_field(op, info), } } - Op::AddAttribute(key, value) => { - info.attributes.push(KeyVal { - key: key.to_string(), - val: value.to_string(), - }); - } Op::AddMetric(key, value) => { info.metrics.push(MetricKeyVal { key: key.to_string(), diff --git a/agent/src/flow_generator/protocol_logs/rpc/some_ip.rs b/agent/src/flow_generator/protocol_logs/rpc/some_ip.rs index e849171af16..e6b52810175 100644 --- a/agent/src/flow_generator/protocol_logs/rpc/some_ip.rs +++ b/agent/src/flow_generator/protocol_logs/rpc/some_ip.rs @@ -365,7 +365,7 @@ mod tests { L7ParseResult::Single(s) => { output.push_str(&serde_json::to_string(&s).unwrap()); output.push_str( - format!(" check: {}", some_ip.check_payload(payload, param)).as_str(), + format!(" check: {:?}", some_ip.check_payload(payload, param)).as_str(), ); output.push_str("\n"); } @@ -373,7 +373,7 @@ mod tests { for i in m { output.push_str(&serde_json::to_string(&i).unwrap()); output.push_str( - format!(" check: {}", some_ip.check_payload(payload, param)) + format!(" check: {:?}", some_ip.check_payload(payload, param)) .as_str(), ); output.push_str("\n"); diff --git a/agent/src/flow_generator/protocol_logs/sql/mysql.rs b/agent/src/flow_generator/protocol_logs/sql/mysql.rs index cb0e1277b5c..f54f2580c4c 100644 --- a/agent/src/flow_generator/protocol_logs/sql/mysql.rs +++ b/agent/src/flow_generator/protocol_logs/sql/mysql.rs @@ -57,7 +57,9 @@ use crate::{ }, utils::bytes, }; -use public::l7_protocol::{Field, FieldSetter, L7Log, L7ProtocolChecker, LogMessageType}; +use public::l7_protocol::{ + Field, FieldSetter, L7Log, L7LogAttribute, L7ProtocolChecker, LogMessageType, +}; use public_derive::L7Log; cfg_if::cfg_if! { @@ -156,6 +158,7 @@ pub struct MysqlInfo { pub context: String, // response pub response_code: u8, + #[l7_log(response_code)] #[serde(skip)] pub error_code: Option, #[serde(rename = "sql_affected_rows", skip_serializing_if = "value_is_default")] @@ -193,6 +196,15 @@ pub struct MysqlInfo { attributes: Vec, } +impl L7LogAttribute for MysqlInfo { + fn add_attribute(&mut self, name: Cow<'_, str>, value: Cow<'_, str>) { + self.attributes.push(KeyVal { + key: name.into_owned(), + val: value.into_owned(), + }); + } +} + impl L7ProtocolInfoInterface for MysqlInfo { fn session_id(&self) -> Option { None @@ -1405,12 +1417,6 @@ impl MysqlLog { }; for op in self.custom_field_store.drain_with(policies, &*info) { match &op.op { - Op::AddAttribute(key, value) => { - info.attributes.push(KeyVal { - key: key.to_string(), - val: value.to_string(), - }); - } Op::AddMetric(_, _) | Op::SavePayload(_) => (), _ => auto_merge_custom_field(op, info), } diff --git a/agent/src/plugin/wasm/vm.rs b/agent/src/plugin/wasm/vm.rs index 5b75efc5af0..9f37ac79d11 100644 --- a/agent/src/plugin/wasm/vm.rs +++ b/agent/src/plugin/wasm/vm.rs @@ -423,7 +423,7 @@ impl From<(&ParseParam<'_>, &HttpInfo, &[u8])> for VmHttpRespCtx { let (param, info, payload) = value; Self { base_ctx: VmCtxBase::from((param, info.proto as u8, payload)), - code: info.status_code, + code: info.status_code.unwrap_or_default(), status: info.status, } } diff --git a/server/agent_config/README-CH.md b/server/agent_config/README-CH.md index e7369403bc2..08bc86678e5 100644 --- a/server/agent_config/README-CH.md +++ b/server/agent_config/README-CH.md @@ -8893,6 +8893,7 @@ processors: rewrite_native_tag: # 可以填写以下几种字段之一,用于覆写对应字段的值 # 注意对应的协议需要支持,否则配置无效 + # 当重写 response_code 时,自动将原来的非空值以 `sys_response_code` 为名写入 attribute 中 # - version # - request_type # - request_domain @@ -8912,6 +8913,9 @@ processors: # - biz_code # - biz_scenario name: version + # 映射字典名称,配置不为空时将输入用所配置的字典进行映射。配置为空时不生效 + # 注意:condition 中的黑白名单匹配映射后的结果 + remap: dict_1 condition: enum_whitelist: [] # 枚举白名单,当提取结果在白名单中时,进行重写。配置为空时不生效 enum_blacklist: [] # 枚举黑名单,当提取结果在黑名单中时,不进行重写 @@ -8928,7 +8932,7 @@ processors: # 直接用常量值作为字段值 const_fields: - value: "123" - # 输出配置,参考 fields 中 output 的说明进行配置,但不支持 metric,rewrite_response_status 和 rewrite_native_tag 中的 condition + # 输出配置,参考 fields 中 output 的说明进行配置,但不支持 metric,rewrite_response_status 和 rewrite_native_tag 中的 remap/condition output: attribute_name: "xyz" rewrite_native_tag: @@ -8944,6 +8948,7 @@ processors: metric_name: "xyz" rewrite_native_tag: name: version + remap: dict_1 condition: enum_whitelist: [] enum_blacklist: [] diff --git a/server/agent_config/README.md b/server/agent_config/README.md index 4bfd3a2e85e..ba827f72f5d 100644 --- a/server/agent_config/README.md +++ b/server/agent_config/README.md @@ -9083,6 +9083,7 @@ Example: rewrite_native_tag: # Fill one of the following fields to overwrite the corresponding value # Note: This requires support in the corresponding protocol, otherwise the configuration will not take effect + # When overwriting response_code, the original non-empty value will be saved into the attribute as `sys_response_code` # - version # - request_type # - request_domain @@ -9102,6 +9103,9 @@ Example: # - biz_code # - biz_scenario name: version + # When remapping, the input will be mapped using the configured dictionary. If empty, it will not take effect + # Note: The whitelist/blacklist in condition match the remapped result + remap: dict_1 condition: enum_whitelist: [] # Whitelist: when the extraction result is in the list, overwrite. If empty, does not take effect. enum_blacklist: [] # Blacklist: when result matches any in the list, do not update. @@ -9119,7 +9123,7 @@ Example: const_fields: - value: "123" # Output configuration, refer to the "output" section of "fields" - # "metric", "rewrite_response_status", and the "condition" in "rewrite_native_tag" are not supported here + # "metric", "rewrite_response_status", and the "condition" and "remap" in "rewrite_native_tag" are not supported here output: attribute_name: "xyz" rewrite_native_tag: @@ -9135,6 +9139,7 @@ Example: metric_name: "xyz" rewrite_native_tag: name: version + remap: dict_1 condition: enum_whitelist: [] enum_blacklist: [] diff --git a/server/agent_config/template.yaml b/server/agent_config/template.yaml index 2f54f218b10..dc39367fb5f 100644 --- a/server/agent_config/template.yaml +++ b/server/agent_config/template.yaml @@ -6426,6 +6426,7 @@ processors: # rewrite_native_tag: # # Fill one of the following fields to overwrite the corresponding value # # Note: This requires support in the corresponding protocol, otherwise the configuration will not take effect + # # When overwriting response_code, the original non-empty value will be saved into the attribute as `sys_response_code` # # - version # # - request_type # # - request_domain @@ -6445,6 +6446,9 @@ processors: # # - biz_code # # - biz_scenario # name: version + # # When remapping, the input will be mapped using the configured dictionary. If empty, it will not take effect + # # Note: The whitelist/blacklist in condition match the remapped result + # remap: dict_1 # condition: # enum_whitelist: [] # Whitelist: when the extraction result is in the list, overwrite. If empty, does not take effect. # enum_blacklist: [] # Blacklist: when result matches any in the list, do not update. @@ -6462,7 +6466,7 @@ processors: # const_fields: # - value: "123" # # Output configuration, refer to the "output" section of "fields" - # # "metric", "rewrite_response_status", and the "condition" in "rewrite_native_tag" are not supported here + # # "metric", "rewrite_response_status", and the "condition" and "remap" in "rewrite_native_tag" are not supported here # output: # attribute_name: "xyz" # rewrite_native_tag: @@ -6478,6 +6482,7 @@ processors: # metric_name: "xyz" # rewrite_native_tag: # name: version + # remap: dict_1 # condition: # enum_whitelist: [] # enum_blacklist: [] @@ -6655,6 +6660,7 @@ processors: # rewrite_native_tag: # # 可以填写以下几种字段之一,用于覆写对应字段的值 # # 注意对应的协议需要支持,否则配置无效 + # # 当重写 response_code 时,自动将原来的非空值以 `sys_response_code` 为名写入 attribute 中 # # - version # # - request_type # # - request_domain @@ -6674,6 +6680,9 @@ processors: # # - biz_code # # - biz_scenario # name: version + # # 映射字典名称,配置不为空时将输入用所配置的字典进行映射。配置为空时不生效 + # # 注意:condition 中的黑白名单匹配映射后的结果 + # remap: dict_1 # condition: # enum_whitelist: [] # 枚举白名单,当提取结果在白名单中时,进行重写。配置为空时不生效 # enum_blacklist: [] # 枚举黑名单,当提取结果在黑名单中时,不进行重写 @@ -6690,7 +6699,7 @@ processors: # # 直接用常量值作为字段值 # const_fields: # - value: "123" - # # 输出配置,参考 fields 中 output 的说明进行配置,但不支持 metric,rewrite_response_status 和 rewrite_native_tag 中的 condition + # # 输出配置,参考 fields 中 output 的说明进行配置,但不支持 metric,rewrite_response_status 和 rewrite_native_tag 中的 remap/condition # output: # attribute_name: "xyz" # rewrite_native_tag: @@ -6706,6 +6715,7 @@ processors: # metric_name: "xyz" # rewrite_native_tag: # name: version + # remap: dict_1 # condition: # enum_whitelist: [] # enum_blacklist: []