Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion agent/crates/public/src/l7_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,11 @@ pub enum NativeTag {
BizScenario,
}

pub trait L7Log {
pub trait L7LogAttribute {
fn add_attribute(&mut self, _name: Cow<'_, str>, _value: Cow<'_, str>) {}
}

pub trait L7Log: L7LogAttribute {
fn get_response_status(&self) -> L7ResponseStatus;
fn set_response_status(&mut self, response_status: L7ResponseStatus);

Expand Down
13 changes: 12 additions & 1 deletion agent/src/flow_generator/protocol_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -726,16 +726,27 @@ cfg_if::cfg_if! {
use log::warn;

use enterprise_utils::l7::custom_policy::custom_field_policy::enums::{Op, Operation};
use public::l7_protocol::{FieldSetter, L7Log};
use public::l7_protocol::{Field, FieldSetter, L7Log, NativeTag};

use consts::SYS_RESPONSE_CODE_ATTR;

pub fn auto_merge_custom_field<L: L7Log>(op: Operation, log: &mut L) {
let Operation { op, prio } = op;
match op {
Op::RewriteResponseStatus(status) => log.set_response_status(status),
Op::RewriteNativeTag(tag, value) => {
// append to sys_response_code if response_code is not empty
if tag == NativeTag::ResponseCode {
match log.get_response_code() {
Field::Str(s) => log.add_attribute(Cow::Borrowed(SYS_RESPONSE_CODE_ATTR), Cow::Owned(s.to_string())),
Field::Int(i) => log.add_attribute(Cow::Borrowed(SYS_RESPONSE_CODE_ATTR), Cow::Owned(i.to_string())),
Field::None => (),
}
}
let field = FieldSetter::new(CUSTOM_FIELD_POLICY_PRIORITY + prio, value.as_str().into());
log.set(tag, field);
}
Op::AddAttribute(name, value) => log.add_attribute(Cow::Borrowed(name.as_str()), Cow::Borrowed(value.as_str())),
_ => warn!("Ignored operation {op:?} that is not supported by auto custom field merging"),
}
}
Expand Down
76 changes: 42 additions & 34 deletions agent/src/flow_generator/protocol_logs/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use hpack::Decoder;
use nom::{AsBytes, ParseTo};
use serde::Serialize;

use public::l7_protocol::{Field, FieldSetter, L7Log, L7ProtocolChecker, LogMessageType};
use public::l7_protocol::{
Field, FieldSetter, L7Log, L7LogAttribute, L7ProtocolChecker, LogMessageType,
};
use public_derive::L7Log;

use super::{
Expand Down Expand Up @@ -293,10 +295,9 @@ pub struct HttpInfo {
#[serde(rename = "response_length", skip_serializing_if = "Option::is_none")]
pub resp_content_length: Option<u32>,

// status_code == 0 means None
#[l7_log(response_code)]
#[serde(rename = "response_code", skip_serializing_if = "value_is_default")]
pub status_code: u16,
#[serde(rename = "response_code", skip_serializing_if = "Option::is_none")]
pub status_code: Option<u16>,
#[l7_log(response_status)]
#[serde(rename = "response_status")]
pub status: L7ResponseStatus,
Expand Down Expand Up @@ -343,6 +344,15 @@ pub struct HttpInfo {
is_reversed: bool,
}

impl L7LogAttribute for HttpInfo {
fn add_attribute(&mut self, name: Cow<'_, str>, value: Cow<'_, str>) {
self.attributes.push(KeyVal {
key: name.into_owned(),
val: value.into_owned(),
});
}
}

impl L7ProtocolInfoInterface for HttpInfo {
fn session_id(&self) -> Option<u32> {
self.stream_id
Expand Down Expand Up @@ -420,7 +430,10 @@ impl L7ProtocolInfoInterface for HttpInfo {

impl HttpInfo {
fn is_invalid_status_code(&self) -> bool {
!(HTTP_STATUS_CODE_MIN..=HTTP_STATUS_CODE_MAX).contains(&self.status_code)
match self.status_code {
Some(code) => !(HTTP_STATUS_CODE_MIN..=HTTP_STATUS_CODE_MAX).contains(&code),
None => true,
}
}

fn is_invalid(&self) -> bool {
Expand All @@ -431,10 +444,12 @@ impl HttpInfo {

// when response_code is overwritten, put it into the attributes.
fn response_code_to_attribute(&mut self) {
self.attributes.push(KeyVal {
key: SYS_RESPONSE_CODE_ATTR.to_string(),
val: self.status_code.to_string(),
});
if let Some(code) = self.status_code {
self.attributes.push(KeyVal {
key: SYS_RESPONSE_CODE_ATTR.to_string(),
val: code.to_string(),
});
}
}

pub fn merge_custom_to_http(&mut self, custom: CustomInfo, dir: PacketDirection) {
Expand Down Expand Up @@ -467,7 +482,7 @@ impl HttpInfo {
if dir == PacketDirection::ServerToClient {
if let Some(code) = custom.resp.code {
self.response_code_to_attribute();
self.status_code = code as u16;
self.status_code = Some(code as u16);
}

if custom.resp.status != self.status {
Expand Down Expand Up @@ -570,7 +585,7 @@ impl HttpInfo {
if other.status != L7ResponseStatus::default() {
self.status = other.status;
}
if self.status_code == 0 {
if self.status_code.is_none() {
self.status_code = other.status_code;
}
if self.grpc_status_code.is_none() && other.grpc_status_code.is_some() {
Expand Down Expand Up @@ -623,10 +638,10 @@ impl HttpInfo {
}

pub fn is_empty(&self) -> bool {
return self.host.is_empty()
self.host.is_empty()
&& self.method.is_none()
&& self.path.is_empty()
&& self.status_code == 0;
&& self.status_code.is_none()
}

pub fn is_req_resp_end(&self) -> (bool, bool) {
Expand Down Expand Up @@ -843,15 +858,15 @@ impl From<HttpInfo> for L7ProtocolSendLog {
L7Protocol::Grpc => {
if let Some(code) = f.grpc_status_code {
Some(code as i32)
} else if f.status_code > 0 {
Some(f.status_code as i32)
} else if let Some(code) = f.status_code {
Some(code as i32)
} else {
None
}
}
_ => {
if f.status_code > 0 {
Some(f.status_code as i32)
if let Some(code) = f.status_code {
Some(code as i32)
} else {
None
}
Expand Down Expand Up @@ -1213,8 +1228,11 @@ impl HttpLog {
if param.direction == PacketDirection::ServerToClient {
if let Some(code) = info.grpc_status_code {
self.set_grpc_status(code, info);
} else if let Some(code) = info.status_code {
self.set_status(code, info);
} else {
self.set_status(info.status_code, info);
// default to ok
info.status = L7ResponseStatus::Ok;
}

if let Some(l7_payload) = l7_payload {
Expand Down Expand Up @@ -1505,7 +1523,7 @@ impl HttpLog {
return Err(Error::HttpHeaderParseFailed);
}
info.version = version;
info.status_code = status_code;
info.status_code = Some(status_code);

info.msg_type = LogMessageType::Response;
} else {
Expand Down Expand Up @@ -1633,8 +1651,9 @@ impl HttpLog {
}
(PacketDirection::ServerToClient, Method::_ResponseHeader) => {
if info.grpc_status_code.is_none() {
if info.status_code == 0 {
return Err(Error::HttpHeaderParseFailed);
match info.status_code {
None | Some(0) => return Err(Error::HttpHeaderParseFailed),
_ => (),
}
if !grpc_streaming_data_enabled {
info.msg_type = LogMessageType::Response;
Expand Down Expand Up @@ -1916,8 +1935,8 @@ impl HttpLog {
}
":status" => {
info.msg_type = LogMessageType::Response;
let code = val.parse_to().unwrap_or_default();
info.status_code = code;
let code: u16 = val.parse_to().unwrap_or_default();
info.status_code = Some(code);
}
"host" | ":authority" => info.host = String::from_utf8_lossy(val).into_owned(),
":path" => {
Expand Down Expand Up @@ -2078,11 +2097,6 @@ impl HttpLog {
info.method = Method::try_from(value.as_str()).unwrap_or_default();
}
}
// resp
NativeTag::ResponseCode => {
info.response_code_to_attribute();
info.status_code = value.parse::<u16>().unwrap_or_default();
}
// trace
NativeTag::SpanId => {
if CUSTOM_FIELD_POLICY_PRIORITY <= info.span_id.prio() {
Expand All @@ -2101,12 +2115,6 @@ impl HttpLog {
_ => auto_merge_custom_field(op, info),
}
}
Op::AddAttribute(key, value) => {
info.attributes.push(KeyVal {
key: key.to_string(),
val: value.to_string(),
});
}
Op::AddMetric(key, value) => {
info.metrics.push(MetricKeyVal {
key: key.to_string(),
Expand Down
17 changes: 10 additions & 7 deletions agent/src/flow_generator/protocol_logs/rpc/dubbo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{borrow::Cow, mem::replace};

use serde::Serialize;

use public::l7_protocol::{Field, FieldSetter, L7Log, LogMessageType};
use public::l7_protocol::{Field, FieldSetter, L7Log, L7LogAttribute, LogMessageType};
use public_derive::L7Log;

use crate::{
Expand Down Expand Up @@ -171,6 +171,15 @@ pub struct DubboInfo {
biz_scenario: String,
}

impl L7LogAttribute for DubboInfo {
fn add_attribute(&mut self, name: Cow<'_, str>, value: Cow<'_, str>) {
self.attributes.push(KeyVal {
key: name.into_owned(),
val: value.into_owned(),
});
}
}

impl DubboInfo {
pub fn generate_endpoint(&self) -> Option<String> {
if !self.service_name.is_empty() || !self.method_name.is_empty() {
Expand Down Expand Up @@ -1137,12 +1146,6 @@ impl DubboLog {
_ => auto_merge_custom_field(op, info),
}
}
Op::AddAttribute(key, value) => {
info.attributes.push(KeyVal {
key: key.to_string(),
val: value.to_string(),
});
}
Op::AddMetric(key, value) => {
info.metrics.push(MetricKeyVal {
key: key.to_string(),
Expand Down
4 changes: 2 additions & 2 deletions agent/src/flow_generator/protocol_logs/rpc/some_ip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,15 @@ mod tests {
L7ParseResult::Single(s) => {
output.push_str(&serde_json::to_string(&s).unwrap());
output.push_str(
format!(" check: {}", some_ip.check_payload(payload, param)).as_str(),
format!(" check: {:?}", some_ip.check_payload(payload, param)).as_str(),
);
output.push_str("\n");
}
L7ParseResult::Multi(m) => {
for i in m {
output.push_str(&serde_json::to_string(&i).unwrap());
output.push_str(
format!(" check: {}", some_ip.check_payload(payload, param))
format!(" check: {:?}", some_ip.check_payload(payload, param))
.as_str(),
);
output.push_str("\n");
Expand Down
20 changes: 13 additions & 7 deletions agent/src/flow_generator/protocol_logs/sql/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ use crate::{
},
utils::bytes,
};
use public::l7_protocol::{Field, FieldSetter, L7Log, L7ProtocolChecker, LogMessageType};
use public::l7_protocol::{
Field, FieldSetter, L7Log, L7LogAttribute, L7ProtocolChecker, LogMessageType,
};
use public_derive::L7Log;

cfg_if::cfg_if! {
Expand Down Expand Up @@ -156,6 +158,7 @@ pub struct MysqlInfo {
pub context: String,
// response
pub response_code: u8,
#[l7_log(response_code)]
#[serde(skip)]
pub error_code: Option<i32>,
#[serde(rename = "sql_affected_rows", skip_serializing_if = "value_is_default")]
Expand Down Expand Up @@ -193,6 +196,15 @@ pub struct MysqlInfo {
attributes: Vec<KeyVal>,
}

impl L7LogAttribute for MysqlInfo {
fn add_attribute(&mut self, name: Cow<'_, str>, value: Cow<'_, str>) {
self.attributes.push(KeyVal {
key: name.into_owned(),
val: value.into_owned(),
});
}
}

impl L7ProtocolInfoInterface for MysqlInfo {
fn session_id(&self) -> Option<u32> {
None
Expand Down Expand Up @@ -1405,12 +1417,6 @@ impl MysqlLog {
};
for op in self.custom_field_store.drain_with(policies, &*info) {
match &op.op {
Op::AddAttribute(key, value) => {
info.attributes.push(KeyVal {
key: key.to_string(),
val: value.to_string(),
});
}
Op::AddMetric(_, _) | Op::SavePayload(_) => (),
_ => auto_merge_custom_field(op, info),
}
Expand Down
2 changes: 1 addition & 1 deletion agent/src/plugin/wasm/vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ impl From<(&ParseParam<'_>, &HttpInfo, &[u8])> for VmHttpRespCtx {
let (param, info, payload) = value;
Self {
base_ctx: VmCtxBase::from((param, info.proto as u8, payload)),
code: info.status_code,
code: info.status_code.unwrap_or_default(),
status: info.status,
}
}
Expand Down
7 changes: 6 additions & 1 deletion server/agent_config/README-CH.md
Original file line number Diff line number Diff line change
Expand Up @@ -8923,6 +8923,7 @@ processors:
rewrite_native_tag:
# 可以填写以下几种字段之一,用于覆写对应字段的值
# 注意对应的协议需要支持,否则配置无效
# 当重写 response_code 时,自动将原来的非空值以 `sys_response_code` 为名写入 attribute 中
# - version
# - request_type
# - request_domain
Expand All @@ -8942,6 +8943,9 @@ processors:
# - biz_code
# - biz_scenario
name: version
# 映射字典名称,配置不为空时将输入用所配置的字典进行映射。配置为空时不生效
# 注意:condition 中的黑白名单匹配映射后的结果
remap: dict_1
condition:
enum_whitelist: [] # 枚举白名单,当提取结果在白名单中时,进行重写。配置为空时不生效
enum_blacklist: [] # 枚举黑名单,当提取结果在黑名单中时,不进行重写
Expand All @@ -8958,7 +8962,7 @@ processors:
# 直接用常量值作为字段值
const_fields:
- value: "123"
# 输出配置,参考 fields 中 output 的说明进行配置,但不支持 metric,rewrite_response_status 和 rewrite_native_tag 中的 condition
# 输出配置,参考 fields 中 output 的说明进行配置,但不支持 metric,rewrite_response_status 和 rewrite_native_tag 中的 remap/condition
output:
attribute_name: "xyz"
rewrite_native_tag:
Expand All @@ -8974,6 +8978,7 @@ processors:
metric_name: "xyz"
rewrite_native_tag:
name: version
remap: dict_1
condition:
enum_whitelist: []
enum_blacklist: []
Expand Down
Loading
Loading