Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions app/app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ app:
allow_multiple_trace_ids_in_tracing_result: false
# use external apm data to fill trace data, default: false
call_apm_api_to_supplement_trace: false
# source data for tracing, default: all (set to empty means all)
# source data for tracing, default: [trace_id, syscall, tcp_seq, x_request_id] (equals to empty)
# available options: [trace_id, syscall, tcp_seq, x_request_id, dns]
# in default settings, we do not trace DNS protocol
# each source means one of tracing method enabled
# trace_id: global seq through requests
# syscall: eBPF tracing through process/threads
# tcp_seq: network layer tracing through TCP seq
# x_request_id: application layer tracing through X-Request-ID/http header
# dns: DNS protocol tracing through DNS transaction-id (request_id)
# dns: DNS protocol tracing through DNS transaction-id (request_id), IT'S DISABLED BY DEFAULT, REQUIRED ENABLED MANUALLY IF NEEDED
tracing_source: []
11 changes: 6 additions & 5 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
import uuid
import pandas as pd
from log import logger
from typing import List, Dict, Set, Callable
from typing import List, Dict, Set, Callable, Tuple

from ast import Tuple
from pandas import DataFrame
from collections import defaultdict
from data.querier_client import Querier
Expand Down Expand Up @@ -223,11 +222,12 @@
TRACING_SRC_SYSCALL = "syscall"
TRACING_SRC_TCP_SEQ = "tcp_seq"
TRACING_SRC_X_REQ_ID = "x_request_id"
# 默认不开启 dns 追踪
TRACING_SRC_DNS = "dns"

DEFAULT_TRACING_SOURCE = [
TRACING_SRC_TRACE_ID, TRACING_SRC_SYSCALL, TRACING_SRC_TCP_SEQ,
TRACING_SRC_X_REQ_ID, TRACING_SRC_DNS
TRACING_SRC_X_REQ_ID
]


Expand Down Expand Up @@ -282,7 +282,7 @@ async def query_and_trace_flowmetas(
max_iteration: int = config.max_iteration,
network_delay_us: int = config.network_delay_us,
host_clock_offset_us: int = config.host_clock_offset_us,
app_spans_from_api: list = []) -> Tuple(list, list):
app_spans_from_api: list = []) -> Tuple[Set, list]:
"""多次迭代,查询可追踪到的所有 l7_flow_log 的摘要
参数说明:
time_filter: 查询的时间范围过滤条件,SQL表达式
Expand Down Expand Up @@ -312,7 +312,7 @@ async def query_and_trace_flowmetas(
dataframe_flowmetas = await self.query_flowmetas("1=1", base_filter)
if type(dataframe_flowmetas) != DataFrame or dataframe_flowmetas.empty:
# when app_spans_from_api got values from api, return it
return [], app_spans_from_api
return set(), app_spans_from_api
l7_flow_ids = set(dataframe_flowmetas['_id']) # set(flow._id)

# 用于下一轮迭代,记录元信息
Expand Down Expand Up @@ -430,6 +430,7 @@ async def query_and_trace_flowmetas(
# 写入 trace_id_index 时,遇到空 trace_id 有可能会复用 index,导致重复
# 于是,这里可能导致误查询,需要额外过滤一下 len(new_trace_id_arr)=0(trace_id='') 的情况
new_trace_id_flow_delete_index.append(index)

if new_trace_id_flow_delete_index:
new_trace_id_flows = new_trace_id_flows.drop(
new_trace_id_flow_delete_index).reset_index(
Expand Down
2 changes: 1 addition & 1 deletion app/app/common/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
TAP_SIDE_CLIENT_HYPERVISOR: 4,
TAP_SIDE_CLIENT_GATEWAY_HYPERVISOR: 5,
TAP_SIDE_CLIENT_GATEWAY: 6,
TAP_SIDE_SERVER_GATEWAY: 6, # 由于可能多次穿越网关区域,c-gw和s-gw还需要重排
TAP_SIDE_SERVER_GATEWAY: 7, # 由于可能多次穿越网关区域,c-gw和s-gw还需要重排
TAP_SIDE_SERVER_GATEWAY_HYPERVISOR: 8,
TAP_SIDE_SERVER_HYPERVISOR: 9,
TAP_SIDE_SERVER_POD_NODE: 10,
Expand Down
2 changes: 1 addition & 1 deletion app/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def parse_spec(self, cfg):
'call_apm_api_to_supplement_trace', False)
self.tracing_source = spec.get(
'tracing_source',
["trace_id", "syscall", "tcp_seq", "x_request_id", "dns"])
["trace_id", "syscall", "tcp_seq", "x_request_id"])

def parse_querier(self, cfg):
querier = cfg.get('querier', dict())
Expand Down