Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions agent/src/ebpf/kernel/files_rw.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ static __inline int trace_io_event_common(void *ctx,
}

buffer->bytes_count = data_args->bytes_count;
buffer->latency = latency;
buffer->operation = direction;
//buffer->latency = latency;
//buffer->operation = direction;
struct __socket_data_buffer *v_buff =
bpf_map_lookup_elem(&NAME(data_buf), &k0);
if (!v_buff)
Expand All @@ -343,19 +343,34 @@ static __inline int trace_io_event_common(void *ctx,
v->fd = data_args->fd;
v->tgid = tgid;
v->pid = (__u32) pid_tgid;
v->coroutine_id = trace_key.goid;
//v->coroutine_id = trace_key.goid;
v->timestamp = data_args->enter_ts;
v->syscall_len = sizeof(*buffer);
v->source = DATA_SOURCE_IO_EVENT;
v->thread_trace_id = trace_id;
// hs_err_pid
if (buffer->filename[0] == 'h' && buffer->filename[1] == 's' &&
buffer->filename[2] == '_' && buffer->filename[3] == 'e' &&
buffer->filename[4] == 'r' && buffer->filename[5] == 'r' &&
buffer->filename[6] == '_' && buffer->filename[7] == 'p') {
v->source = DATA_SOURCE_FILE_WRITE;
v->syscall_len = data_args->bytes_count;
} else {
__sync_fetch_and_add(&tracer_ctx->push_buffer_refcnt, -1);
return 0;
//v->source = DATA_SOURCE_IO_EVENT;
//v->syscall_len = sizeof(*buffer);
}
//v->thread_trace_id = trace_id;
v->msg_type = MSG_COMMON;
bpf_get_current_comm(v->comm, sizeof(v->comm));
bool is_vecs = false;
if (data_args->iov != NULL)
is_vecs = true;
#if !defined(LINUX_VER_KFUNC) && !defined(LINUX_VER_5_2_PLUS)
struct tail_calls_context *context =
(struct tail_calls_context *)v->data;
context->max_size_limit = data_max_sz;
context->push_reassembly_bytes = 0;
context->vecs = false;
context->vecs = is_vecs;
context->is_close = false;
context->dir = direction;
#ifdef SUPPORTS_KPROBE_ONLY
Expand All @@ -366,7 +381,7 @@ static __inline int trace_io_event_common(void *ctx,
return 0;
#else
return __output_data_common(ctx, tracer_ctx, v_buff, data_args,
direction, false, data_max_sz, false, 0);
direction, is_vecs, data_max_sz, false, 0);
#endif
}

Expand Down Expand Up @@ -479,7 +494,9 @@ TP_SYSCALL_PROG(exit_preadv2) (struct syscall_comm_exit_ctx * ctx) {
#endif /* SUPPORTS_KPROBE_ONLY */

// File Write Event Tracing
static __inline int do_sys_enter_pwrite(int fd, enum syscall_src_func fn)
static __inline int do_sys_enter_pwrite(int fd, const char *buf,
struct iovec *iov, int iovlen,
enum syscall_src_func fn)
{
__u32 k0 = 0;
struct member_fields_offset *offset = members_offset__lookup(&k0);
Expand All @@ -490,6 +507,9 @@ static __inline int do_sys_enter_pwrite(int fd, enum syscall_src_func fn)
struct data_args_t write_args = {};
write_args.source_fn = fn;
write_args.fd = fd;
write_args.buf = buf;
write_args.iov = iov;
write_args.iovlen = iovlen;
write_args.enter_ts = bpf_ktime_get_ns();
active_write_args_map__update(&id, &write_args);
return 0;
Expand All @@ -500,7 +520,8 @@ static __inline int do_sys_enter_pwrite(int fd, enum syscall_src_func fn)
#ifdef SUPPORTS_KPROBE_ONLY
KPROG(ksys_pwrite64) (struct pt_regs * ctx) {
int fd = (int)PT_REGS_PARM1(ctx);
return do_sys_enter_pwrite(fd, SYSCALL_FUNC_PWRITE64);
const char *buf = (char *)PT_REGS_PARM2(ctx);
return do_sys_enter_pwrite(fd, buf, NULL, 0, SYSCALL_FUNC_PWRITE64);
}

/*
Expand All @@ -510,25 +531,32 @@ KPROG(ksys_pwrite64) (struct pt_regs * ctx) {
*/
KPROG(do_pwritev) (struct pt_regs * ctx) {
int fd = (int)PT_REGS_PARM1(ctx);
return do_sys_enter_pwrite(fd, SYSCALL_FUNC_PWRITEV);
struct iovec *iov = (struct iovec *)PT_REGS_PARM2(ctx);
int iovlen = (int)PT_REGS_PARM3(ctx);
return do_sys_enter_pwrite(fd, NULL, iov, iovlen, SYSCALL_FUNC_PWRITEV);
}
#else
// /sys/kernel/debug/tracing/events/syscalls/sys_enter_pwrite64/format
TP_SYSCALL_PROG(enter_pwrite64) (struct syscall_comm_enter_ctx * ctx) {
int fd = ctx->fd;
return do_sys_enter_pwrite(fd, SYSCALL_FUNC_PWRITE64);
const char *buf = (const char *)ctx->buf;
return do_sys_enter_pwrite(fd, buf, NULL, 0, SYSCALL_FUNC_PWRITE64);
}

// /sys/kernel/debug/tracing/events/syscalls/sys_enter_pwritev/format
TP_SYSCALL_PROG(enter_pwritev) (struct syscall_comm_enter_ctx * ctx) {
int fd = ctx->fd;
return do_sys_enter_pwrite(fd, SYSCALL_FUNC_PWRITEV);
struct iovec *iov = (struct iovec *)ctx->buf;
int iovlen = (int)ctx->count;
return do_sys_enter_pwrite(fd, NULL, iov, iovlen, SYSCALL_FUNC_PWRITEV);
}

// /sys/kernel/debug/tracing/events/syscalls/sys_enter_pwritev2/format
TP_SYSCALL_PROG(enter_pwritev2) (struct syscall_comm_enter_ctx * ctx) {
int fd = ctx->fd;
return do_sys_enter_pwrite(fd, SYSCALL_FUNC_PWRITEV2);
struct iovec *iov = (struct iovec *)ctx->buf;
int iovlen = (int)ctx->count;
return do_sys_enter_pwrite(fd, NULL, iov, iovlen, SYSCALL_FUNC_PWRITEV2);
}
#endif /* SUPPORTS_KPROBE_ONLY */

Expand Down
1 change: 1 addition & 0 deletions agent/src/ebpf/kernel/include/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ enum process_data_extra_source {
DATA_SOURCE_RESERVED,
DATA_SOURCE_DPDK,
DATA_SOURCE_UNIX_SOCKET,
DATA_SOURCE_FILE_WRITE
};

struct protocol_message_t {
Expand Down
3 changes: 3 additions & 0 deletions agent/src/ebpf/kernel/socket_trace.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -3282,12 +3282,14 @@ static __inline int __output_data_common(void *ctx,
* 1. The delay of the periodic push event exceeds the threshold (typically 50 milliseconds).
* 2. The number of events exceeds the maximum batch size (MAX_EVENTS_BURST, typically 32).
* 3. The data buffer is full (not enough space for another struct __socket_data).
* 4. Send the file content immediately when collecting it to prevent out-of-order issues caused by buffering.
*/
__u64 curr_time = bpf_ktime_get_ns();
__u64 diff = curr_time - tracer_ctx->last_period_timestamp;
if (diff > PERIODIC_PUSH_DELAY_THRESHOLD_NS ||
v_buff->events_num >= MAX_EVENTS_BURST ||
(args && args->extra_iovlen) ||
v->source == DATA_SOURCE_FILE_WRITE ||
((sizeof(v_buff->data) - v_buff->len) < sizeof(*v))) {
finalize_data_output(ctx, tracer_ctx, curr_time, diff, v_buff);
}
Expand Down Expand Up @@ -3391,6 +3393,7 @@ static __inline int output_data_common(void *ctx)
if (diff > PERIODIC_PUSH_DELAY_THRESHOLD_NS ||
v_buff->events_num >= MAX_EVENTS_BURST ||
(args && args->extra_iovlen) ||
v->source == DATA_SOURCE_FILE_WRITE ||
((sizeof(v_buff->data) - v_buff->len) < sizeof(*v))) {
finalize_data_output(ctx, tracer_ctx, curr_time, diff, v_buff);
}
Expand Down
191 changes: 30 additions & 161 deletions agent/src/ebpf/samples/rust/socket-tracer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,137 +202,6 @@ extern "C" fn debug_callback(_data: *mut c_char, len: c_int) {
}

extern "C" fn socket_trace_callback(_: *mut c_void, queue_id: c_int, sd: *mut SK_BPF_DATA) -> c_int {
unsafe {
let mut proto_tag = String::from("");
if sk_proto_safe(sd) == SOCK_DATA_OTHER {
proto_tag.push_str("ORTHER");
} else if sk_proto_safe(sd) == SOCK_DATA_HTTP1 {
proto_tag.push_str("HTTP1");
} else if sk_proto_safe(sd) == SOCK_DATA_HTTP2 {
proto_tag.push_str("HTTP2");
} else if sk_proto_safe(sd) == SOCK_DATA_DNS {
proto_tag.push_str("DNS");
} else if sk_proto_safe(sd) == SOCK_DATA_MYSQL {
proto_tag.push_str("MYSQL");
} else if sk_proto_safe(sd) == SOCK_DATA_POSTGRESQL {
proto_tag.push_str("POSTGRESQL");
} else if sk_proto_safe(sd) == SOCK_DATA_REDIS {
proto_tag.push_str("REDIS");
} else if sk_proto_safe(sd) == SOCK_DATA_KAFKA {
proto_tag.push_str("KAFKA");
} else if sk_proto_safe(sd) == SOCK_DATA_MQTT {
proto_tag.push_str("MQTT");
} else if sk_proto_safe(sd) == SOCK_DATA_AMQP {
proto_tag.push_str("AMQP");
} else if sk_proto_safe(sd) == SOCK_DATA_NATS {
proto_tag.push_str("NATS");
} else if sk_proto_safe(sd) == SOCK_DATA_PULSAR {
proto_tag.push_str("PULSAR");
} else if sk_proto_safe(sd) == SOCK_DATA_DUBBO {
proto_tag.push_str("DUBBO");
} else if sk_proto_safe(sd) == SOCK_DATA_SOFARPC {
proto_tag.push_str("SOFARPC");
} else if sk_proto_safe(sd) == SOCK_DATA_FASTCGI {
proto_tag.push_str("FASTCGI");
} else if sk_proto_safe(sd) == SOCK_DATA_BRPC {
proto_tag.push_str("BRPC");
} else if sk_proto_safe(sd) == SOCK_DATA_TARS {
proto_tag.push_str("TARS");
} else if sk_proto_safe(sd) == SOCK_DATA_SOME_IP {
proto_tag.push_str("SomeIP");
} else if sk_proto_safe(sd) == SOCK_DATA_ISO8583 {
proto_tag.push_str("ISO8583");
} else if sk_proto_safe(sd) == SOCK_DATA_MONGO {
proto_tag.push_str("MONGO");
} else if sk_proto_safe(sd) == SOCK_DATA_TLS {
proto_tag.push_str("TLS");
} else if sk_proto_safe(sd) == SOCK_DATA_ORACLE {
proto_tag.push_str("ORACLE");
} else if sk_proto_safe(sd) == SOCK_DATA_OPENWIRE {
proto_tag.push_str("OPENWIRE");
} else if sk_proto_safe(sd) == SOCK_DATA_ZMTP {
proto_tag.push_str("ZMTP");
} else if sk_proto_safe(sd) == SOCK_DATA_WEBSPHEREMQ {
proto_tag.push_str("WEBSPHEREMQ");
} else {
proto_tag.push_str("UNSPEC");
}

println!("+ --------------------------------- +");
if sk_proto_safe(sd) == SOCK_DATA_HTTP1 {
let data = sk_data_str_safe(sd);
println!("{} <{}> BATCHLAST {} DIR {} TYPE {} PID {} THREAD_ID {} COROUTINE_ID {} CONTAINER_ID {} SOURCE {} ROLE {} COMM {} {} LEN {} SYSCALL_LEN {} SOCKET_ID 0x{:x} TRACE_ID 0x{:x} TCP_SEQ {} DATA_SEQ {} TLS {} TimeStamp {}\n{}",
date_time((*sd).timestamp),
proto_tag,
(*sd).batch_last_data,
(*sd).direction,
(*sd).msg_type,
(*sd).process_id,
(*sd).thread_id,
(*sd).coroutine_id,
sd_container_id_safe(sd),
(*sd).source,
(*sd).socket_role,
process_name_safe(sd),
flow_info(sd),
(*sd).cap_len,
(*sd).syscall_len,
(*sd).socket_id,
(*sd).syscall_trace_id_call,
(*sd).tcp_seq,
(*sd).cap_seq,
(*sd).is_tls,
(*sd).timestamp,
data);
} else {
let data: Vec<u8> = sk_data_bytes_safe(sd);
println!("{} <{}> BATCHLAST {} DIR {} TYPE {} PID {} THREAD_ID {} COROUTINE_ID {} CONTAINER_ID {} SOURCE {} ROLE {} COMM {} {} LEN {} SYSCALL_LEN {} SOCKET_ID 0x{:x} TRACE_ID 0x{:x} TCP_SEQ {} DATA_SEQ {} TLS {} TimeStamp {}",
date_time((*sd).timestamp),
proto_tag,
(*sd).batch_last_data,
(*sd).direction,
(*sd).msg_type,
(*sd).process_id,
(*sd).thread_id,
(*sd).coroutine_id,
sd_container_id_safe(sd),
(*sd).source,
(*sd).socket_role,
process_name_safe(sd),
flow_info(sd),
(*sd).cap_len,
(*sd).syscall_len,
(*sd).socket_id,
(*sd).syscall_trace_id_call,
(*sd).tcp_seq,
(*sd).cap_seq,
(*sd).is_tls,
(*sd).timestamp);
if (*sd).source == 2 {
print_uprobe_http2_info((*sd).cap_data, (*sd).cap_len);
} else if (*sd).source == 4 {
print_io_event_info((*sd).cap_data, (*sd).cap_len);
} else if (*sd).source == 5 {
print_uprobe_grpc_dataframe((*sd).cap_data, (*sd).cap_len);
} else if sk_proto_safe(sd) == SOCK_DATA_OTHER {
for x in data.into_iter() {
print!("{} ", format!("{:02x}", x));
}
} else {
for x in data.into_iter() {
if x < 32 || x > 126 {
print!(".");
continue;
}
let b = x as char;
print!("{0}", b);
}
}
print!("\x1b[0m\n");
}

println!("+ --------------------------------- +\n");
}

0
}
Expand Down Expand Up @@ -416,29 +285,29 @@ fn main() {
}
}
unsafe {
enable_ebpf_protocol(SOCK_DATA_HTTP1 as c_int);
enable_ebpf_protocol(SOCK_DATA_HTTP2 as c_int);
enable_ebpf_protocol(SOCK_DATA_DUBBO as c_int);
enable_ebpf_protocol(SOCK_DATA_SOFARPC as c_int);
enable_ebpf_protocol(SOCK_DATA_FASTCGI as c_int);
enable_ebpf_protocol(SOCK_DATA_BRPC as c_int);
enable_ebpf_protocol(SOCK_DATA_TARS as c_int);
enable_ebpf_protocol(SOCK_DATA_SOME_IP as c_int);
enable_ebpf_protocol(SOCK_DATA_ISO8583 as c_int);
enable_ebpf_protocol(SOCK_DATA_MYSQL as c_int);
enable_ebpf_protocol(SOCK_DATA_POSTGRESQL as c_int);
enable_ebpf_protocol(SOCK_DATA_REDIS as c_int);
enable_ebpf_protocol(SOCK_DATA_KAFKA as c_int);
enable_ebpf_protocol(SOCK_DATA_MQTT as c_int);
enable_ebpf_protocol(SOCK_DATA_AMQP as c_int);
enable_ebpf_protocol(SOCK_DATA_OPENWIRE as c_int);
enable_ebpf_protocol(SOCK_DATA_ZMTP as c_int);
enable_ebpf_protocol(SOCK_DATA_WEBSPHEREMQ as c_int);
enable_ebpf_protocol(SOCK_DATA_NATS as c_int);
enable_ebpf_protocol(SOCK_DATA_PULSAR as c_int);
enable_ebpf_protocol(SOCK_DATA_DNS as c_int);
enable_ebpf_protocol(SOCK_DATA_MONGO as c_int);
enable_ebpf_protocol(SOCK_DATA_TLS as c_int);
// enable_ebpf_protocol(SOCK_DATA_HTTP1 as c_int);
// enable_ebpf_protocol(SOCK_DATA_HTTP2 as c_int);
// enable_ebpf_protocol(SOCK_DATA_DUBBO as c_int);
// enable_ebpf_protocol(SOCK_DATA_SOFARPC as c_int);
// enable_ebpf_protocol(SOCK_DATA_FASTCGI as c_int);
// enable_ebpf_protocol(SOCK_DATA_BRPC as c_int);
// enable_ebpf_protocol(SOCK_DATA_TARS as c_int);
// enable_ebpf_protocol(SOCK_DATA_SOME_IP as c_int);
// enable_ebpf_protocol(SOCK_DATA_ISO8583 as c_int);
// enable_ebpf_protocol(SOCK_DATA_MYSQL as c_int);
// enable_ebpf_protocol(SOCK_DATA_POSTGRESQL as c_int);
// enable_ebpf_protocol(SOCK_DATA_REDIS as c_int);
// enable_ebpf_protocol(SOCK_DATA_KAFKA as c_int);
// enable_ebpf_protocol(SOCK_DATA_MQTT as c_int);
// enable_ebpf_protocol(SOCK_DATA_AMQP as c_int);
// enable_ebpf_protocol(SOCK_DATA_OPENWIRE as c_int);
// enable_ebpf_protocol(SOCK_DATA_ZMTP as c_int);
// enable_ebpf_protocol(SOCK_DATA_WEBSPHEREMQ as c_int);
// enable_ebpf_protocol(SOCK_DATA_NATS as c_int);
// enable_ebpf_protocol(SOCK_DATA_PULSAR as c_int);
// enable_ebpf_protocol(SOCK_DATA_DNS as c_int);
// enable_ebpf_protocol(SOCK_DATA_MONGO as c_int);
// enable_ebpf_protocol(SOCK_DATA_TLS as c_int);

//set_feature_regex(
// FEATURE_UPROBE_OPENSSL,
Expand All @@ -449,9 +318,9 @@ fn main() {
// CString::new(".*".as_bytes()).unwrap().as_c_str().as_ptr(),
//);

//set_io_event_collect_mode(1);
set_io_event_collect_mode(2);

//set_io_event_minimal_duration(1000000);
set_io_event_minimal_duration(10);

//// enable go auto traceing,
//set_go_tracing_timeout(120);
Expand Down Expand Up @@ -682,11 +551,11 @@ fn main() {
// test data limit max
set_data_limit_max(10000);

//let empty_string = CString::new("").expect("CString::new failed");
//if datadump_set_config(0, empty_string.as_ptr(), 0, 60, debug_callback) != 0 {
// println!("datadump_set_config() error");
// ::std::process::exit(1);
//}
let empty_string = CString::new("").expect("CString::new failed");
if datadump_set_config(0, empty_string.as_ptr(), 0, 600000, debug_callback) != 0 {
println!("datadump_set_config() error");
::std::process::exit(1);
}

print!("socket_tracer_start() finish\n");

Expand Down
Loading