Skip to content

Commit 80ccc26

Browse files
yuanchaoarvql
authored andcommitted
feat: support triple
1 parent bb0330e commit 80ccc26

File tree

7 files changed

+73
-11
lines changed

7 files changed

+73
-11
lines changed

agent/crates/public/src/l7_protocol.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ pub enum L7Protocol {
6060
Tars = 46,
6161
SomeIp = 47,
6262
Iso8583 = 48,
63+
Triple = 49,
6364

6465
// SQL
6566
MySQL = 60,
@@ -103,6 +104,7 @@ impl L7Protocol {
103104
| Self::SofaRPC
104105
| Self::SomeIp
105106
| Self::Ping
107+
| Self::Triple
106108
| Self::Custom => true,
107109
_ => false,
108110
}

agent/src/common/l7_protocol_log.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ macro_rules! impl_protocol_parser {
7878
match p.protocol() {
7979
L7Protocol::Http1 => return "HTTP",
8080
L7Protocol::Http2 => return "HTTP2",
81+
L7Protocol::Triple => return "Triple",
8182
_ => unreachable!()
8283
}
8384
},
@@ -96,6 +97,7 @@ macro_rules! impl_protocol_parser {
9697
match value {
9798
"HTTP" => Ok(Self::Http(HttpLog::new_v1())),
9899
"HTTP2" => Ok(Self::Http(HttpLog::new_v2(false))),
100+
"Triple" => Ok(Self::Http(HttpLog::new_triple())),
99101
"Custom"=>Ok(Self::Custom(Default::default())),
100102
#[cfg(feature = "enterprise")]
101103
"ISO-8583"=>Ok(Self::Iso8583(Default::default())),
@@ -115,6 +117,7 @@ macro_rules! impl_protocol_parser {
115117
L7Protocol::Http1 => Some(L7ProtocolParser::Http(HttpLog::new_v1())),
116118
L7Protocol::Http2 => Some(L7ProtocolParser::Http(HttpLog::new_v2(false))),
117119
L7Protocol::Grpc => Some(L7ProtocolParser::Http(HttpLog::new_v2(true))),
120+
L7Protocol::Triple => Some(L7ProtocolParser::Http(HttpLog::new_triple())),
118121

119122
// in check_payload, need to get the default Custom by L7Protocol.
120123
// due to Custom not in macro, need to define explicit

agent/src/config/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1750,6 +1750,7 @@ impl Default for Filters {
17501750
("Tars".to_string(), "1-65535".to_string()),
17511751
("SomeIP".to_string(), "1-65535".to_string()),
17521752
("ISO8583".to_string(), "1-65535".to_string()),
1753+
("Triple".to_string(), "1-65535".to_string()),
17531754
("MySQL".to_string(), "1-65535".to_string()),
17541755
("PostgreSQL".to_string(), "1-65535".to_string()),
17551756
("Oracle".to_string(), "1521".to_string()),
@@ -1780,6 +1781,7 @@ impl Default for Filters {
17801781
("Tars".to_string(), vec![]),
17811782
("SomeIP".to_string(), vec![]),
17821783
("ISO8583".to_string(), vec![]),
1784+
("Triple".to_string(), vec![]),
17831785
("MySQL".to_string(), vec![]),
17841786
("PostgreSQL".to_string(), vec![]),
17851787
("Oracle".to_string(), vec![]),

agent/src/flow_generator/protocol_logs/http.rs

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,9 @@ pub struct HttpInfo {
336336
is_async: bool,
337337
#[serde(skip_serializing_if = "value_is_default")]
338338
is_reversed: bool,
339+
340+
#[serde(skip)]
341+
dubbo_service_version: String,
339342
}
340343

341344
impl L7LogAttribute for HttpInfo {
@@ -597,12 +600,17 @@ impl HttpInfo {
597600
if other_is_grpc {
598601
self.proto = L7Protocol::Grpc;
599602
}
603+
if other.proto == L7Protocol::Triple {
604+
self.proto = L7Protocol::Triple;
605+
}
600606
if other.is_reversed {
601607
self.is_reversed = other.is_reversed;
602608
}
603609
if other.biz_type > 0 {
604610
self.biz_type = other.biz_type;
605611
}
612+
613+
super::swap_if!(self, dubbo_service_version, is_empty, other);
606614
super::swap_if!(self, biz_code, is_empty, other);
607615
super::swap_if!(self, biz_scenario, is_empty, other);
608616

@@ -685,12 +693,21 @@ impl HttpInfo {
685693
}
686694

687695
fn get_version(&self) -> Field {
696+
if self.proto == L7Protocol::Triple {
697+
return Field::Str(Cow::Borrowed(&self.dubbo_service_version.as_str()));
698+
}
688699
Field::Str(Cow::Borrowed(&self.version.as_str()))
689700
}
690701

691702
fn set_version(&mut self, version: FieldSetter) {
692703
match version.into_inner() {
693-
Field::Str(s) => self.version = Version::try_from(s.borrow()).unwrap_or_default(),
704+
Field::Str(s) => {
705+
if self.proto == L7Protocol::Triple {
706+
self.dubbo_service_version = s.as_ref().to_string();
707+
} else {
708+
self.version = Version::try_from(s.borrow()).unwrap_or_default();
709+
}
710+
}
694711
_ => self.version = Version::Unknown,
695712
}
696713
}
@@ -797,7 +814,11 @@ impl From<HttpInfo> for L7ProtocolSendLog {
797814
resp_len: f.resp_content_length,
798815
captured_request_byte: f.captured_request_byte,
799816
captured_response_byte: f.captured_response_byte,
800-
version: Some(f.version.as_str().to_owned()),
817+
version: if f.proto == L7Protocol::Triple {
818+
Some(f.dubbo_service_version)
819+
} else {
820+
Some(f.version.as_str().to_owned())
821+
},
801822
req: L7Request {
802823
req_type,
803824
resource,
@@ -914,7 +935,7 @@ impl L7ProtocolParserInterface for HttpLog {
914935
// http2 有两个版本, 现在可以直接通过proto区分解析哪个版本的协议.
915936
match self.proto {
916937
L7Protocol::Http1 => self.http1_check_protocol(payload),
917-
L7Protocol::Http2 | L7Protocol::Grpc => {
938+
L7Protocol::Http2 | L7Protocol::Grpc | L7Protocol::Triple => {
918939
let Some(config) = param.parse_config else {
919940
return None;
920941
};
@@ -1015,7 +1036,7 @@ impl L7ProtocolParserInterface for HttpLog {
10151036
Ok(L7ParseResult::None)
10161037
}
10171038
}
1018-
L7Protocol::Http2 | L7Protocol::Grpc => {
1039+
L7Protocol::Http2 | L7Protocol::Grpc | L7Protocol::Triple => {
10191040
let mut infos = vec![];
10201041
let mut offset = 0;
10211042
let mut last_error = Err(Error::HttpHeaderParseFailed);
@@ -1070,7 +1091,10 @@ impl L7ProtocolParserInterface for HttpLog {
10701091
custom_policies,
10711092
);
10721093

1073-
if !info.is_invalid() || info.proto == L7Protocol::Grpc {
1094+
if !info.is_invalid()
1095+
|| info.proto == L7Protocol::Grpc
1096+
|| info.proto == L7Protocol::Triple
1097+
{
10741098
if let Some(h) = info.headers_offset.as_mut() {
10751099
*h += offset as u32;
10761100
}
@@ -1113,6 +1137,10 @@ impl L7ProtocolParserInterface for HttpLog {
11131137
proto: L7Protocol::Grpc,
11141138
..Default::default()
11151139
},
1140+
L7Protocol::Triple => Self {
1141+
proto: L7Protocol::Triple,
1142+
..Default::default()
1143+
},
11161144
_ => unreachable!(),
11171145
};
11181146
new_log.perf_stats = self.perf_stats.take();
@@ -1146,6 +1174,13 @@ impl HttpLog {
11461174
}
11471175
}
11481176

1177+
pub fn new_triple() -> Self {
1178+
Self {
1179+
proto: L7Protocol::Triple,
1180+
..Default::default()
1181+
}
1182+
}
1183+
11491184
fn set_info_by_config(
11501185
&mut self,
11511186
param: &ParseParam,
@@ -1540,7 +1575,7 @@ impl HttpLog {
15401575
}
15411576

15421577
match info.proto {
1543-
L7Protocol::Grpc => match (direction, info.method) {
1578+
L7Protocol::Grpc | L7Protocol::Triple => match (direction, info.method) {
15441579
(PacketDirection::ClientToServer, Method::_RequestData) => {
15451580
if !grpc_streaming_data_enabled {
15461581
return Err(Error::HttpHeaderParseFailed);
@@ -1740,7 +1775,9 @@ impl HttpLog {
17401775
is_httpv2 = true;
17411776
break;
17421777
}
1743-
} else if (header_frame_parsed || self.proto == L7Protocol::Grpc)
1778+
} else if (header_frame_parsed
1779+
|| self.proto == L7Protocol::Grpc
1780+
|| self.proto == L7Protocol::Triple)
17441781
&& httpv2_header.frame_type == HTTPV2_FRAME_DATA_TYPE
17451782
{
17461783
// HTTPv2协议中存在可以通过Headers帧中携带“Content-Length”字段,即可直接进行解析
@@ -1849,15 +1886,22 @@ impl HttpLog {
18491886
info.status_code = Some(code);
18501887
}
18511888
"host" | ":authority" => info.host = String::from_utf8_lossy(val).into_owned(),
1852-
":path" => {
1889+
":path" if info.path.is_empty() => {
18531890
info.path = String::from_utf8_lossy(val).into_owned();
18541891
}
18551892
"grpc-status" if config.grpc_streaming_data_enabled => {
18561893
info.msg_type = LogMessageType::Response;
18571894
let code = val.parse_to().unwrap_or_default();
18581895
info.grpc_status_code = Some(code);
18591896
}
1860-
"content-type" => {
1897+
"tri-service-version" => {
1898+
// change to triple protocol
1899+
self.proto = L7Protocol::Triple;
1900+
info.proto = L7Protocol::Triple;
1901+
info.method = Method::Post;
1902+
info.dubbo_service_version = String::from_utf8_lossy(val).into_owned();
1903+
}
1904+
"content-type" if self.proto != L7Protocol::Triple => {
18611905
// change to grpc protocol
18621906
if val.starts_with(b"application/grpc") {
18631907
self.proto = L7Protocol::Grpc;
@@ -1951,7 +1995,9 @@ impl HttpLog {
19511995
) {
19521996
let field_iter = match info.proto {
19531997
L7Protocol::Http1 => config.extra_log_fields.http.iter(),
1954-
L7Protocol::Http2 | L7Protocol::Grpc => config.extra_log_fields.http2.iter(),
1998+
L7Protocol::Http2 | L7Protocol::Grpc | L7Protocol::Triple => {
1999+
config.extra_log_fields.http2.iter()
2000+
}
19552001
_ => return,
19562002
};
19572003

agent/src/flow_generator/protocol_logs/parser.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ impl MetaAppProto {
235235
// | 64b flow_id | 24b 0 | 8b proto | 32b cap_seq |
236236
let mut key = (flow_id as u128) << 64;
237237

238-
if proto == L7Protocol::Grpc {
238+
if proto == L7Protocol::Grpc || proto == L7Protocol::Triple {
239239
proto = L7Protocol::Http2;
240240
}
241241
key |= (proto as u128) << 32;

server/libs/datatype/flow.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ const (
159159
L7_PROTOCOL_TARS L7Protocol = 46
160160
L7_PROTOCOL_SOME_IP L7Protocol = 47
161161
L7_PROTOCOL_ISO8583 L7Protocol = 48
162+
L7_PROTOCOL_TRIPLE L7Protocol = 49
162163
L7_PROTOCOL_MYSQL L7Protocol = 60
163164
L7_PROTOCOL_POSTGRE L7Protocol = 61
164165
L7_PROTOCOL_ORACLE L7Protocol = 62
@@ -696,6 +697,12 @@ func (p L7Protocol) String(isTLS bool) string {
696697
} else {
697698
return "ISO-8583"
698699
}
700+
case L7_PROTOCOL_TRIPLE:
701+
if isTLS {
702+
return "TRIPLE_TLS"
703+
} else {
704+
return "TRIPLE"
705+
}
699706
case L7_PROTOCOL_REDIS:
700707
if isTLS {
701708
return "Redis_TLS"
@@ -795,6 +802,7 @@ var L7ProtocolStringMap = map[string]L7Protocol{
795802
strings.ToLower(L7_PROTOCOL_POSTGRE.String(false)): L7_PROTOCOL_POSTGRE,
796803
strings.ToLower(L7_PROTOCOL_ORACLE.String(false)): L7_PROTOCOL_ORACLE,
797804
strings.ToLower(L7_PROTOCOL_ISO8583.String(false)): L7_PROTOCOL_ISO8583,
805+
strings.ToLower(L7_PROTOCOL_TRIPLE.String(false)): L7_PROTOCOL_TRIPLE,
798806
strings.ToLower(L7_PROTOCOL_REDIS.String(false)): L7_PROTOCOL_REDIS,
799807
strings.ToLower(L7_PROTOCOL_MONGODB.String(false)): L7_PROTOCOL_MONGODB,
800808
strings.ToLower(L7_PROTOCOL_KAFKA.String(false)): L7_PROTOCOL_KAFKA,

server/querier/db_descriptions/clickhouse/tag/enum/l7_protocol

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
46 , Tars ,
1111
47 , Some/IP ,
1212
48 , ISO-8583 ,
13+
49 , Triple ,
1314
60 , MySQL ,
1415
61 , PostgreSQL ,
1516
62 , Oracle ,

0 commit comments

Comments
 (0)