diff --git a/cmd/replayer/main.go b/cmd/replayer/main.go index 0bf1da1a..bdac919e 100644 --- a/cmd/replayer/main.go +++ b/cmd/replayer/main.go @@ -135,7 +135,7 @@ func main() { Speed: *speed, Username: *username, Password: *password, - Format: *format, + Format: replaycmd.TrafficFormat(*format), ReadOnly: *readonly, StartTime: *startTime, CommandStartTime: *cmdStartTime, diff --git a/pkg/server/api/traffic.go b/pkg/server/api/traffic.go index be1bce30..cc523026 100644 --- a/pkg/server/api/traffic.go +++ b/pkg/server/api/traffic.go @@ -100,7 +100,7 @@ func (h *Server) TrafficReplay(c *gin.Context) { } cfg.Username = c.PostForm("username") cfg.Password = c.PostForm("password") - cfg.Format = c.PostForm("format") + cfg.Format = cmd.TrafficFormat(c.PostForm("format")) cfg.ReadOnly = strings.EqualFold(c.PostForm("readonly"), "true") cfg.IgnoreErrs = strings.EqualFold(c.PostForm("ignore-errs"), "true") cfg.KeyFile = globalCfg.Security.EncryptionKeyPath diff --git a/pkg/sqlreplay/cmd/audit_log_extension.go b/pkg/sqlreplay/cmd/audit_log_extension.go new file mode 100644 index 00000000..e8cca6f4 --- /dev/null +++ b/pkg/sqlreplay/cmd/audit_log_extension.go @@ -0,0 +1,310 @@ +// Copyright 2026 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package cmd + +import ( + "fmt" + "strconv" + "strings" + "time" + + "github.com/pingcap/tiproxy/lib/util/errors" + pnet "github.com/pingcap/tiproxy/pkg/proxy/net" + "github.com/siddontang/go/hack" + "go.uber.org/zap" +) + +var _ AuditLogDecoder = (*AuditLogExtensionDecoder)(nil) + +type AuditLogExtensionDecoder struct { + connInfo map[uint64]auditLogPluginConnCtx + commandEndTime time.Time + // pendingCmds contains the commands that has not been returned yet. + pendingCmds []*Command + psCloseStrategy PSCloseStrategy + idAllocator *ConnIDAllocator + lg *zap.Logger +} + +func NewAuditLogExtensionDecoder(lg *zap.Logger) AuditLogDecoder { + return &AuditLogExtensionDecoder{ + connInfo: make(map[uint64]auditLogPluginConnCtx), + psCloseStrategy: PSCloseStrategyDirected, + lg: lg, + } +} + +// EnableFilterCommandWithRetry implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) EnableFilterCommandWithRetry() { + // do nothing for extension decoder, it's not supported yet +} + +// SetCommandEndTime implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) SetCommandEndTime(t time.Time) { + decoder.commandEndTime = t +} + +// SetIDAllocator implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) SetIDAllocator(alloc *ConnIDAllocator) { + decoder.idAllocator = alloc +} + +// SetPSCloseStrategy implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) SetPSCloseStrategy(s PSCloseStrategy) { + decoder.psCloseStrategy = s +} + +// SetCommandStartTime implements [AuditLogDecoder]. +func (decoder *AuditLogExtensionDecoder) SetCommandStartTime(t time.Time) { + // do nothing for extension decoder +} + +func (decoder *AuditLogExtensionDecoder) Decode(reader LineReader) (retCmd *Command, err error) { + defer func() { + if retCmd != nil { + fmt.Println("Decoded command:", retCmd.ConnID, retCmd.Line, retCmd.StartTs, retCmd.EndTs, "error:", err) + } + }() + if len(decoder.pendingCmds) > 0 { + cmd := decoder.pendingCmds[0] + decoder.pendingCmds = decoder.pendingCmds[1:] + return cmd, nil + } + + kvs := make(map[string]string, 25) + for { + line, filename, lineIdx, err := reader.ReadLine() + if err != nil { + return nil, err + } + clear(kvs) + err = parseLog(kvs, hack.String(line)) + if err != nil { + return nil, errors.Errorf("%s, line %d: %s", filename, lineIdx, err.Error()) + } + connStr := kvs[auditPluginKeyConnID] + if len(connStr) == 0 { + return nil, errors.Errorf("%s, line %d: no connection id in line: %s", filename, lineIdx, line) + } + upstreamConnID, err := strconv.ParseUint(connStr, 10, 64) + if err != nil { + return nil, errors.Errorf("%s, line %d: parsing connection id failed: %s", filename, lineIdx, connStr) + } + + // TODO: add both startTs and endTs in extension log. We only have the endTS is the current format. + endTs, err := time.Parse(timeLayout, kvs[auditPluginKeyLogTime]) + if endTs.Before(decoder.commandEndTime) { + // Ignore the commands before CommandEndTime. + continue + } + + var connID uint64 + if connCtx, ok := decoder.connInfo[upstreamConnID]; ok { + connID = connCtx.connID + } else { + // New connection, allocate a new connection ID. + if decoder.idAllocator == nil { + connID = upstreamConnID + } else { + connID = decoder.idAllocator.alloc() + } + connCtx.connID = connID + decoder.connInfo[upstreamConnID] = connCtx + } + + eventStr := kvs[auditPluginKeyEvent] + if len(eventStr) <= 4 { + return nil, errors.Errorf("%s, line %d: invalid event field: %s", filename, lineIdx, eventStr) + } + // Remove the surrounding quotes and brackets. + eventStr = eventStr[2 : len(eventStr)-2] + events := strings.Split(eventStr, ",") + var cmds []*Command + switch events[0] { + case "CONNECTION": + if len(events) > 1 && events[1] == "DISCONNECT" { + delete(decoder.connInfo, upstreamConnID) + cmds = []*Command{{ + Type: pnet.ComQuit, + Payload: []byte{pnet.ComQuit.Byte()}, + }} + } + case "QUERY": + cmds, err = decoder.parseQueryEvent(kvs, events, upstreamConnID) + } + if err != nil { + return nil, errors.Wrapf(err, "%s, line %d", filename, lineIdx) + } + // The log is ignored, skip. + if len(cmds) == 0 { + continue + } + + db := kvs[auditPluginKeyCurDB] + for _, cmd := range cmds { + cmd.Success = true + cmd.UpstreamConnID = upstreamConnID + cmd.ConnID = connID + // We don't have an accurate startTs in extension log. + cmd.StartTs = endTs + cmd.CurDB = db + cmd.FileName = filename + cmd.Line = lineIdx + cmd.EndTs = endTs + cmd.kvs = kvs + } + if len(cmds) > 1 { + decoder.pendingCmds = cmds[1:] + } + return cmds[0], nil + } +} + +func (decoder *AuditLogExtensionDecoder) parseQueryEvent(kvs map[string]string, events []string, connID uint64) ([]*Command, error) { + connInfo := decoder.connInfo[connID] + if connInfo.preparedStmt == nil { + connInfo.preparedStmt = make(map[uint32]struct{}) + connInfo.preparedStmtSql = make(map[string]uint32) + } + + var sql string + sqlStr := kvs[auditPluginKeySQL] + if len(sqlStr) > 0 { + var err error + sql, err = parseSQL(sqlStr) + if err != nil { + return nil, errors.Wrapf(err, "unquote sql failed: %s", sqlStr) + } + } + cmds := make([]*Command, 0, 3) + // Only handle two events: + // - QUERY,EXECUTE + // - QUERY + if events[0] == "QUERY" && len(events) > 1 && events[1] == "EXECUTE" { + params, ok := kvs[auditPluginKeyParams] + if !ok { + return nil, nil + } + args, err := parseExecuteParamsForExtension(params) + if err != nil { + return nil, err + } + + var stmtID uint32 + var shouldPrepare bool + + switch decoder.psCloseStrategy { + case PSCloseStrategyAlways: + connInfo.lastPsID++ + decoder.connInfo[connID] = connInfo + stmtID = connInfo.lastPsID + shouldPrepare = true + case PSCloseStrategyNever: + if id, ok := connInfo.preparedStmtSql[sql]; ok { + shouldPrepare = false + stmtID = id + } else { + connInfo.lastPsID++ + connInfo.preparedStmtSql[sql] = connInfo.lastPsID + decoder.connInfo[connID] = connInfo + stmtID = connInfo.lastPsID + shouldPrepare = true + } + } + + // Append PREPARE command if needed. + if shouldPrepare { + cmds = append(cmds, &Command{ + CapturedPsID: stmtID, + Type: pnet.ComStmtPrepare, + StmtType: kvs[auditPluginKeyStmtType], + PreparedStmt: sql, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, hack.Slice(sql)...), + }) + } + + // Append EXECUTE command + executeReq, err := pnet.MakeExecuteStmtRequest(stmtID, args, true) + if err != nil { + return nil, errors.Wrapf(err, "make execute request failed") + } + cmds = append(cmds, &Command{ + CapturedPsID: stmtID, + Type: pnet.ComStmtExecute, + StmtType: kvs[auditPluginKeyStmtType], + PreparedStmt: sql, + Params: args, + Payload: executeReq, + }) + connInfo.lastCmd = cmds[len(cmds)-1] + + // Append CLOSE command if needed. + if decoder.psCloseStrategy == PSCloseStrategyAlways { + // close the prepared statement right after it's executed. + cmds = append(cmds, &Command{ + CapturedPsID: stmtID, + Type: pnet.ComStmtClose, + StmtType: kvs[auditPluginKeyStmtType], + PreparedStmt: sql, + Payload: pnet.MakeCloseStmtRequest(stmtID), + }) + } + } else if events[0] == "QUERY" { + cmds = append(cmds, &Command{ + Type: pnet.ComQuery, + StmtType: kvs[auditPluginKeyStmtType], + Payload: append([]byte{pnet.ComQuery.Byte()}, hack.Slice(sql)...), + }) + connInfo.lastCmd = cmds[0] + } + + decoder.connInfo[connID] = connInfo + return cmds, nil +} + +// parseExecuteParamsForExtension parses the param in audit log extension field like "[1,abc,NULL,\"test bytes\""]" +// This function has the following known limitations: +// - All params are returned as string type. It cannot distinguish int 1 and string "1". +// - It cannot distinguish single empty string and no param. +func parseExecuteParamsForExtension(value string) ([]any, error) { + v, err := strconv.Unquote(value) + if err != nil { + return nil, errors.Wrapf(err, "unquote execute params failed: %s", value) + } + if v[0] != '[' || v[len(v)-1] != ']' { + return nil, errors.Errorf("no brackets in params: %s", value) + } + v = v[1 : len(v)-1] + if len(v) == 0 { + return nil, nil + } + + params := make([]any, 0, 10) + for idx := 0; idx < len(v); idx++ { + switch v[idx] { + case '"': + endIdx := skipQuotes(v[idx+1:], false) + if endIdx == -1 { + return nil, errors.Errorf("unterminated quote in params: %s", v[idx+1:]) + } + + unquoted, err := strconv.Unquote(v[idx : idx+endIdx+2]) + if err != nil { + return nil, errors.Wrapf(err, "unquote param failed: %s", v[idx:idx+endIdx+2]) + } + params = append(params, unquoted) + idx += endIdx + 1 + case ',', ' ': + default: + endIdx := strings.Index(v[idx:], ",") + if endIdx == -1 { + endIdx = len(v) - idx + } + params = append(params, v[idx:idx+endIdx]) + idx += endIdx - 1 + } + } + + return params, nil +} diff --git a/pkg/sqlreplay/cmd/audit_log_extension_test.go b/pkg/sqlreplay/cmd/audit_log_extension_test.go new file mode 100644 index 00000000..4d748f78 --- /dev/null +++ b/pkg/sqlreplay/cmd/audit_log_extension_test.go @@ -0,0 +1,480 @@ +// Copyright 2026 PingCAP, Inc. +// SPDX-License-Identifier: Apache-2.0 + +package cmd + +import ( + "testing" + "time" + + pnet "github.com/pingcap/tiproxy/pkg/proxy/net" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestDecodeMultiLinesForExtension(t *testing.T) { + tests := []struct { + lines string + cmds []*Command + }{ + { + // db is changed in the second sql + lines: `[2026/01/08 19:44:11.099 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-0009] [EVENT="[QUERY,QUERY_DDL]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)"] +[2026/01/08 19:44:11.110 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000a] [EVENT="[QUERY,QUERY_DML,INSERT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test1] [SQL_TEXT="INSERT IGNORE INTO test_table (id, name, age, salary, price, weight, is_active, created_at) VALUES (1, 'base_record', 25, 50000.00, 99.99, 123.456789, true, '2023-01-01 12:00:00')"] [AFFECTED_ROWS=1]`, + cmds: []*Command{ + { + CurDB: "test", + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 99000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 99000000, time.Local), + ConnID: 260047062, + UpstreamConnID: 260047062, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)")...), + Line: 1, + Success: true, + }, + { + CurDB: "test1", + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 110000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 110000000, time.Local), + ConnID: 260047062, + UpstreamConnID: 260047062, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("INSERT IGNORE INTO test_table (id, name, age, salary, price, weight, is_active, created_at) VALUES (1, 'base_record', 25, 50000.00, 99.99, 123.456789, true, '2023-01-01 12:00:00')")...), + Line: 2, + Success: true, + }, + }, + }, + { + // db stays the same in the second sql + lines: `[2026/01/08 19:44:11.099 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-0009] [EVENT="[QUERY,QUERY_DDL]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)"] +[2026/01/08 19:44:11.110 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000a] [EVENT="[QUERY,QUERY_DML,INSERT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="INSERT IGNORE INTO test_table (id, name, age, salary, price, weight, is_active, created_at) VALUES (1, 'base_record', 25, 50000.00, 99.99, 123.456789, true, '2023-01-01 12:00:00')"] [AFFECTED_ROWS=1]`, + cmds: []*Command{ + { + CurDB: "test", + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 99000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 99000000, time.Local), + ConnID: 260047062, + UpstreamConnID: 260047062, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)")...), + Line: 1, + Success: true, + }, + { + CurDB: "test", + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 110000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 110000000, time.Local), + ConnID: 260047062, + UpstreamConnID: 260047062, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("INSERT IGNORE INTO test_table (id, name, age, salary, price, weight, is_active, created_at) VALUES (1, 'base_record', 25, 50000.00, 99.99, 123.456789, true, '2023-01-01 12:00:00')")...), + Line: 2, + Success: true, + }, + }, + }, + { + // new connection + quit connection + lines: `[2026/01/08 19:44:27.746 +08:00] [INFO] [ID=2eda2307-54c1-49e5-9e65-eebf340b6d8f-0002] [EVENT="[CONNECTION,CONNECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047206] [SESSION_ALIAS=] [TABLES="[]"] [STATUS_CODE=1] [CURRENT_DB=test] [CONNECTION_TYPE=TCP] [PID=454064] [SERVER_VERSION=v9.0.0-beta.2.pre-1017-gbedae51b62] [SSL_VERSION=] [HOST_IP=127.0.0.1] [HOST_PORT=4000] [CLIENT_IP=127.0.0.1] [CLIENT_PORT=49366] [AUTH_METHOD=mysql_native_password] [CONN_ATTRS="{\"_client_name\":\"Go-MySQL-Driver\",\"_os\":\"linux\",\"_pid\":\"460879\",\"_platform\":\"amd64\",\"_server_host\":\"127.0.0.1\"}"] +[2026/01/08 19:44:27.782 +08:00] [INFO] [ID=2eda2307-54c1-49e5-9e65-eebf340b6d8f-0003] [EVENT="[QUERY,QUERY_DDL]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047206] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)"] +[2026/01/08 19:44:27.963 +08:00] [INFO] [ID=2eda2307-54c1-49e5-9e65-eebf340b6d8f-000e] [EVENT="[CONNECTION,DISCONNECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047206] [SESSION_ALIAS=] [TABLES="[]"] [STATUS_CODE=1] [CONNECTION_TYPE=TCP] [PID=454064] [SERVER_VERSION=v9.0.0-beta.2.pre-1017-gbedae51b62] [SSL_VERSION=] [HOST_IP=127.0.0.1] [HOST_PORT=4000] [CLIENT_IP=127.0.0.1] [CLIENT_PORT=49366] [AUTH_METHOD=mysql_native_password] [CONN_ATTRS="{\"_client_name\":\"Go-MySQL-Driver\",\"_os\":\"linux\",\"_pid\":\"460879\",\"_platform\":\"amd64\",\"_server_host\":\"127.0.0.1\"}"]`, + cmds: []*Command{ + { + CurDB: "test", + StartTs: time.Date(2026, 1, 8, 19, 44, 27, 782000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 27, 782000000, time.Local), + ConnID: 260047206, + UpstreamConnID: 260047206, + Type: pnet.ComQuery, + Payload: append([]byte{pnet.ComQuery.Byte()}, []byte("CREATE TABLE IF NOT EXISTS test_table (id BIGINT PRIMARY KEY, name VARCHAR(255), age INT, salary DECIMAL(10,2), price FLOAT, weight DOUBLE, is_active BOOLEAN, data BLOB, created_at TIMESTAMP)")...), + Line: 2, + Success: true, + }, + { + CurDB: "", + StartTs: time.Date(2026, 1, 8, 19, 44, 27, 963000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 27, 963000000, time.Local), + ConnID: 260047206, + UpstreamConnID: 260047206, + Type: pnet.ComQuit, + Payload: []byte{pnet.ComQuit.Byte()}, + Line: 3, + Success: true, + }, + }, + }, + { + // 2 prepared statements + lines: `[2026/01/08 19:44:11.114 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[1]"] +[2026/01/08 19:44:11.379 +08:00] [INFO] [ID=187143c0-a8ea-44ec-b28d-5af6129fb3f9-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[0]"]`, + cmds: []*Command{ + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x31}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"1"}, + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtClose, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 2, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 2, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 2, + Payload: []byte{0x17, 0x02, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x30}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"0"}, + Line: 2, + Success: true, + }, + { + Type: pnet.ComStmtClose, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 2, + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{2, 0, 0, 0}...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 2, + Success: true, + }, + }, + }, + { + // 2 different connections + lines: `[2026/01/08 19:44:11.114 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[1]"] +[2026/01/08 19:44:11.379 +08:00] [INFO] [ID=187143c0-a8ea-44ec-b28d-5af6129fb3f9-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047063] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[0]"]`, + cmds: []*Command{ + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x31}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"1"}, + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtClose, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047063, + UpstreamConnID: 260047063, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 2, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047063, + UpstreamConnID: 260047063, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x30}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"0"}, + Line: 2, + Success: true, + }, + { + Type: pnet.ComStmtClose, + CurDB: "test", + ConnID: 260047063, + UpstreamConnID: 260047063, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtClose.Byte()}, []byte{1, 0, 0, 0}...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 2, + Success: true, + }, + }, + }, + } + + for i, test := range tests { + decoder := NewAuditLogExtensionDecoder(zap.NewNop()) + decoder.SetPSCloseStrategy(PSCloseStrategyAlways) + mr := mockReader{data: append([]byte(test.lines), '\n'), filename: "my/file"} + cmds, err := decodeCmds(decoder, &mr) + if err != nil { + require.ErrorContains(t, err, "EOF", "case %d", i) + } + for _, cmd := range test.cmds { + cmd.FileName = "my/file" + } + require.Equal(t, test.cmds, cmds, "case %d", i) + } +} + +func TestDecodeAuditExtensionInNeverMode(t *testing.T) { + tests := []struct { + lines string + cmds []*Command + }{ + { + lines: `[2026/01/08 19:44:11.114 +08:00] [INFO] [ID=f1c681c2-8d80-4677-9dd4-6b7222610aa8-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[1]"] +[2026/01/08 19:44:11.379 +08:00] [INFO] [ID=187143c0-a8ea-44ec-b28d-5af6129fb3f9-000b] [EVENT="[QUERY,EXECUTE,SELECT]"] [USER=root] [ROLES="[]"] [CONNECTION_ID=260047062] [SESSION_ALIAS=] [TABLES="[` + "`" + `test` + "`" + `.` + "`" + `test_table` + "`" + `]"] [STATUS_CODE=1] [CURRENT_DB=test] [SQL_TEXT="SELECT * FROM test_table WHERE is_active = ?"] [EXECUTE_PARAMS="[0]"]`, + cmds: []*Command{ + { + Type: pnet.ComStmtPrepare, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: append([]byte{pnet.ComStmtPrepare.Byte()}, []byte("SELECT * FROM test_table WHERE is_active = ?")...), + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 114000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x31}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"1"}, + Line: 1, + Success: true, + }, + { + Type: pnet.ComStmtExecute, + CurDB: "test", + ConnID: 260047062, + UpstreamConnID: 260047062, + StartTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + EndTs: time.Date(2026, 1, 8, 19, 44, 11, 379000000, time.Local), + CapturedPsID: 1, + Payload: []byte{0x17, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x01, 0xfe, 0x00, 0x01, 0x30}, + StmtType: "", + PreparedStmt: "SELECT * FROM test_table WHERE is_active = ?", + Params: []any{"0"}, + Line: 2, + Success: true, + }, + }, + }, + } + + for i, test := range tests { + decoder := NewAuditLogExtensionDecoder(zap.NewNop()) + decoder.SetPSCloseStrategy(PSCloseStrategyNever) + mr := mockReader{data: append([]byte(test.lines), '\n')} + cmds, err := decodeCmds(decoder, &mr) + require.Error(t, err, "case %d", i) + require.Equal(t, test.cmds, cmds, "case %d", i) + } +} + +func TestParseExecuteParamsForExtension(t *testing.T) { + tests := []struct { + input string + expected []any + hasError bool + }{ + { + input: `"[]"`, + expected: nil, + hasError: false, + }, + { + input: `"[1]"`, + expected: []any{"1"}, + hasError: false, + }, + { + input: `"[1,2,3]"`, + expected: []any{"1", "2", "3"}, + hasError: false, + }, + { + input: `"[abc]"`, + expected: []any{"abc"}, + hasError: false, + }, + { + input: `"['abc']"`, + expected: []any{"'abc'"}, + hasError: false, + }, + { + input: `"[NULL]"`, + expected: []any{"NULL"}, + hasError: false, + }, + { + input: `"[1,abc,NULL,\"test bytes\"]"`, + expected: []any{"1", "abc", "NULL", "test bytes"}, + hasError: false, + }, + { + input: `"[\"test\\\"escape\"]"`, + expected: []any{"test\"escape"}, + hasError: false, + }, + { + input: `"[\"hello world\"]"`, + expected: []any{"hello world"}, + hasError: false, + }, + { + input: `"[1, 2, 3]"`, + expected: []any{"1", "2", "3"}, + hasError: false, + }, + { + input: `"[\"hello, world\"]"`, + expected: []any{"hello, world"}, + hasError: false, + }, + { + input: `"[1.5,2.75,3.14]"`, + expected: []any{"1.5", "2.75", "3.14"}, + hasError: false, + }, + { + input: `"[true,false]"`, + expected: []any{"true", "false"}, + hasError: false, + }, + { + input: `"[1,2,3"`, + expected: nil, + hasError: true, + }, + { + input: `"1,2,3]"`, + expected: nil, + hasError: true, + }, + { + input: `"[\"abc ]"`, + expected: nil, + hasError: true, + }, + { + input: `[1,2,3]`, + expected: nil, + hasError: true, + }, + } + + for _, tt := range tests { + result, err := parseExecuteParamsForExtension(tt.input) + if tt.hasError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, tt.expected, result) + } + } +} diff --git a/pkg/sqlreplay/cmd/audit_log_plugin.go b/pkg/sqlreplay/cmd/audit_log_plugin.go index db9a2f67..9ca56408 100644 --- a/pkg/sqlreplay/cmd/audit_log_plugin.go +++ b/pkg/sqlreplay/cmd/audit_log_plugin.go @@ -31,6 +31,11 @@ const ( auditPluginKeyPreparedStmtID = "PREPARED_STMT_ID" auditPluginKeyRetry = "RETRY" + // auditPluginKeyLogTime is a special marker for the first field in audit log line, which is usually the + // log time. It doesn't have key in the original format, but replayer will give it a special key. + // As it's not a real key, it'll start with underscore to avoid conflict with real keys. + auditPluginKeyLogTime = "_LOG_TIME" + auditPluginClassGeneral = "GENERAL" auditPluginClassTableAccess = "TABLE_ACCESS" auditPluginClassConnect = "CONNECTION" @@ -72,7 +77,7 @@ type auditLogPluginConnCtx struct { lastCmd *Command } -func NewAuditLogPluginDecoder(dedup *DeDup, lg *zap.Logger) *AuditLogPluginDecoder { +func NewAuditLogPluginDecoder(dedup *DeDup, lg *zap.Logger) AuditLogDecoder { return &AuditLogPluginDecoder{ connInfo: make(map[uint64]auditLogPluginConnCtx), psCloseStrategy: PSCloseStrategyDirected, @@ -82,6 +87,7 @@ func NewAuditLogPluginDecoder(dedup *DeDup, lg *zap.Logger) *AuditLogPluginDecod } var _ CmdDecoder = (*AuditLogPluginDecoder)(nil) +var _ AuditLogDecoder = (*AuditLogPluginDecoder)(nil) // PSCloseStrategy defines when to close the prepared statements. type PSCloseStrategy string @@ -97,6 +103,15 @@ const ( PSCloseStrategyDirected PSCloseStrategy = "directed" ) +type AuditLogDecoder interface { + CmdDecoder + + SetPSCloseStrategy(s PSCloseStrategy) + SetIDAllocator(alloc *ConnIDAllocator) + SetCommandEndTime(t time.Time) + EnableFilterCommandWithRetry() +} + type AuditLogPluginDecoder struct { connInfo map[uint64]auditLogPluginConnCtx commandStartTime time.Time @@ -245,6 +260,7 @@ func (decoder *AuditLogPluginDecoder) SetIDAllocator(alloc *ConnIDAllocator) { // All SQL_TEXT are converted into one line in audit log. func parseLog(kv map[string]string, line string) error { + firstField := true for idx := 0; idx < len(line); idx++ { switch line[idx] { case '[': @@ -253,7 +269,11 @@ func parseLog(kv map[string]string, line string) error { return err } idx += endIdx + 1 - if len(key) > 0 { + + if firstField { + kv[auditPluginKeyLogTime] = value + firstField = false + } else if len(key) > 0 { kv[key] = value } } diff --git a/pkg/sqlreplay/cmd/audit_log_plugin_test.go b/pkg/sqlreplay/cmd/audit_log_plugin_test.go index e642a000..77ddb045 100644 --- a/pkg/sqlreplay/cmd/audit_log_plugin_test.go +++ b/pkg/sqlreplay/cmd/audit_log_plugin_test.go @@ -192,6 +192,7 @@ func TestParseLog(t *testing.T) { }, { line: "[abc]", + kvs: map[string]string{auditPluginKeyLogTime: "abc"}, hasErr: false, }, { @@ -216,7 +217,7 @@ func TestParseLog(t *testing.T) { }, { line: "[abc=def]", - kvs: map[string]string{"abc": "def"}, + kvs: map[string]string{auditPluginKeyLogTime: "def"}, hasErr: false, }, { @@ -225,12 +226,12 @@ func TestParseLog(t *testing.T) { }, { line: "[abc=def=ghi]", - kvs: map[string]string{"abc": "def=ghi"}, + kvs: map[string]string{auditPluginKeyLogTime: "def=ghi"}, hasErr: false, }, { line: "[a=\"b\"]", - kvs: map[string]string{"a": "\"b\""}, + kvs: map[string]string{auditPluginKeyLogTime: "\"b\""}, hasErr: false, }, { @@ -239,7 +240,7 @@ func TestParseLog(t *testing.T) { }, { line: "[abc][a=b]", - kvs: map[string]string{"a": "b"}, + kvs: map[string]string{auditPluginKeyLogTime: "abc", "a": "b"}, hasErr: false, }, { @@ -248,16 +249,17 @@ func TestParseLog(t *testing.T) { }, { line: "a[abc]a", + kvs: map[string]string{auditPluginKeyLogTime: "abc"}, hasErr: false, }, { line: "a[a=b]a", - kvs: map[string]string{"a": "b"}, + kvs: map[string]string{auditPluginKeyLogTime: "b"}, hasErr: false, }, { line: "a[a=b]a[c=d]", - kvs: map[string]string{"a": "b", "c": "d"}, + kvs: map[string]string{auditPluginKeyLogTime: "b", "c": "d"}, hasErr: false, }, { @@ -266,17 +268,17 @@ func TestParseLog(t *testing.T) { }, { line: `[2025/09/06 17:03:53.720 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.720 +08:00] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=1336.083] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"[=]\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, - kvs: map[string]string{"ID": "17571494330", "TIMESTAMP": "2025/09/06 17:03:53.720 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "1336.083", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"[=]\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, + kvs: map[string]string{auditPluginKeyLogTime: "2025/09/06 17:03:53.720 +08:00", "ID": "17571494330", "TIMESTAMP": "2025/09/06 17:03:53.720 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "1336.083", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"[=]\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, hasErr: false, }, { line: `[2025/09/06 17:03:53.717 +08:00] [INFO] [logger.go:77] [ID=17571494330] [TIMESTAMP=2025/09/06 17:03:53.717 +08:00] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=824806376.375] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"\n\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, - kvs: map[string]string{"ID": "17571494330", "TIMESTAMP": "2025/09/06 17:03:53.717 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "824806376.375", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"\\n\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, + kvs: map[string]string{auditPluginKeyLogTime: "2025/09/06 17:03:53.717 +08:00", "ID": "17571494330", "TIMESTAMP": "2025/09/06 17:03:53.717 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "824806376.375", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"\\n\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, hasErr: false, }, { line: `[2025/09/06 16:50:08.917 +08:00] [INFO] [logger.go:77] [ID=17571486080] [TIMESTAMP=2025/09/06 16:50:08.917 +08:00] [EVENT_CLASS=GENERAL] [EVENT_SUBCLASS=] [STATUS_CODE=0] [COST_TIME=2442.333] [HOST=127.0.0.1] [CLIENT_IP=127.0.0.1] [USER=root] [DATABASES="[]"] [TABLES="[]"] [SQL_TEXT="select \"\n\""] [ROWS=0] [CONNECTION_ID=3695181836] [CLIENT_PORT=63912] [PID=61215] [COMMAND=Query] [SQL_STATEMENTS=Select]`, - kvs: map[string]string{"ID": "17571486080", "TIMESTAMP": "2025/09/06 16:50:08.917 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "2442.333", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"\\n\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, + kvs: map[string]string{auditPluginKeyLogTime: "2025/09/06 16:50:08.917 +08:00", "ID": "17571486080", "TIMESTAMP": "2025/09/06 16:50:08.917 +08:00", "EVENT_CLASS": "GENERAL", "EVENT_SUBCLASS": "", "STATUS_CODE": "0", "COST_TIME": "2442.333", "HOST": "127.0.0.1", "CLIENT_IP": "127.0.0.1", "USER": "root", "DATABASES": "\"[]\"", "TABLES": "\"[]\"", "SQL_TEXT": "\"select \\\"\\n\\\"\"", "ROWS": "0", "CONNECTION_ID": "3695181836", "CLIENT_PORT": "63912", "PID": "61215", "COMMAND": "Query", "SQL_STATEMENTS": "Select"}, hasErr: false, }, } diff --git a/pkg/sqlreplay/cmd/cmd.go b/pkg/sqlreplay/cmd/cmd.go index ea25ec6a..4e0669cb 100644 --- a/pkg/sqlreplay/cmd/cmd.go +++ b/pkg/sqlreplay/cmd/cmd.go @@ -15,11 +15,23 @@ import ( "go.uber.org/zap" ) +// TrafficFormat is the supported format of traffic files. +type TrafficFormat string + const ( - FormatNative = "native" - FormatAuditLogPlugin = "audit_log_plugin" + FormatNative TrafficFormat = "native" + FormatAuditLogPlugin TrafficFormat = "audit_log_plugin" + FormatAuditLogExtension TrafficFormat = "audit_log_extension" ) +func (f TrafficFormat) String() string { + return string(f) +} + +func (f TrafficFormat) IsAuditLogFormat() bool { + return f == FormatAuditLogPlugin || f == FormatAuditLogExtension +} + type LineReader interface { String() string ReadLine() ([]byte, string, int, error) @@ -27,7 +39,7 @@ type LineReader interface { Close() } -func NewCmdEncoder(_ string) CmdEncoder { +func NewCmdEncoder(_ TrafficFormat) CmdEncoder { // Only support writing native format return NewNativeEncoder() } @@ -36,10 +48,12 @@ type CmdEncoder interface { Encode(c *Command, writer *bytes.Buffer) error } -func NewCmdDecoder(format string, dedup *DeDup, lg *zap.Logger) CmdDecoder { +func NewCmdDecoder(format TrafficFormat, dedup *DeDup, lg *zap.Logger) CmdDecoder { switch format { case FormatAuditLogPlugin: return NewAuditLogPluginDecoder(dedup, lg) + case FormatAuditLogExtension: + return NewAuditLogExtensionDecoder(lg) default: return NewNativeDecoder() } diff --git a/pkg/sqlreplay/manager/job.go b/pkg/sqlreplay/manager/job.go index 21a5c9e8..e82245b4 100644 --- a/pkg/sqlreplay/manager/job.go +++ b/pkg/sqlreplay/manager/job.go @@ -10,6 +10,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tiproxy/pkg/sqlreplay/capture" + "github.com/pingcap/tiproxy/pkg/sqlreplay/cmd" "github.com/pingcap/tiproxy/pkg/sqlreplay/replay" "github.com/siddontang/go/hack" ) @@ -129,13 +130,13 @@ type replayJob struct { type replayJob4Marshal struct { job4Marshal - LastCmdTs string `json:"last_cmd_ts,omitempty"` - Input string `json:"input,omitempty"` - Username string `json:"username,omitempty"` - Format string `json:"format,omitempty"` - Speed float64 `json:"speed,omitempty"` - ReadOnly bool `json:"readonly,omitempty"` - Addr string `json:"addr,omitempty"` + LastCmdTs string `json:"last_cmd_ts,omitempty"` + Input string `json:"input,omitempty"` + Username string `json:"username,omitempty"` + Format cmd.TrafficFormat `json:"format,omitempty"` + Speed float64 `json:"speed,omitempty"` + ReadOnly bool `json:"readonly,omitempty"` + Addr string `json:"addr,omitempty"` } func (job *replayJob) Type() JobType { diff --git a/pkg/sqlreplay/replay/mock_test.go b/pkg/sqlreplay/replay/mock_test.go index 096796ec..fd6f8bab 100644 --- a/pkg/sqlreplay/replay/mock_test.go +++ b/pkg/sqlreplay/replay/mock_test.go @@ -163,7 +163,7 @@ func newMockNormalLoader() *mockNormalLoader { return &mockNormalLoader{} } -func (m *mockNormalLoader) writeCommand(command *cmd.Command, format string) { +func (m *mockNormalLoader) writeCommand(command *cmd.Command, format cmd.TrafficFormat) { encoder := cmd.NewCmdEncoder(format) _ = encoder.Encode(command, &m.buf) } diff --git a/pkg/sqlreplay/replay/replay.go b/pkg/sqlreplay/replay/replay.go index 0ff58d1e..ac8afb26 100644 --- a/pkg/sqlreplay/replay/replay.go +++ b/pkg/sqlreplay/replay/replay.go @@ -83,7 +83,7 @@ type Replay interface { } type ReplayConfig struct { - Format string + Format cmd.TrafficFormat Input string Username string Password string `json:"-"` @@ -141,7 +141,7 @@ func (cfg *ReplayConfig) Validate() ([]storage.ExternalStorage, error) { return nil, errors.New("input is required") } inputs := strings.Split(cfg.Input, ",") - if len(inputs) > 1 && cfg.Format != cmd.FormatAuditLogPlugin { + if len(inputs) > 1 && !cfg.Format.IsAuditLogFormat() { return nil, errors.New("only `audit_log_plugin` format supports multiple input files") } var storages []storage.ExternalStorage @@ -172,7 +172,7 @@ func (cfg *ReplayConfig) Validate() ([]storage.ExternalStorage, error) { return storages, errors.Errorf("speed should be between %f and %f", minSpeed, maxSpeed) } switch cfg.Format { - case cmd.FormatAuditLogPlugin, cmd.FormatNative, "": + case cmd.FormatAuditLogPlugin, cmd.FormatAuditLogExtension, cmd.FormatNative, "": default: return storages, errors.Errorf("invalid traffic file format %s", cfg.Format) } @@ -203,10 +203,22 @@ func (cfg *ReplayConfig) Validate() ([]storage.ExternalStorage, error) { default: return storages, errors.Errorf("invalid prepared statement close strategy %s", cfg.PSCloseStrategy) } - if cfg.Format != cmd.FormatAuditLogPlugin && !cfg.CommandEndTime.IsZero() { + if !cfg.Format.IsAuditLogFormat() && !cfg.CommandEndTime.IsZero() { return storages, errors.New("command end time is only supported for `audit_log_plugin` format") } + if cfg.Format == cmd.FormatAuditLogExtension { + if cfg.PSCloseStrategy == cmd.PSCloseStrategyDirected { + return storages, errors.New("prepared statement directed close strategy is not supported for audit log plugin v2 format") + } + if cfg.FilterCommandWithRetry { + return storages, errors.New("filtering commands with retry is not supported for audit log plugin v2 format") + } + if !cfg.CommandStartTime.IsZero() { + return storages, errors.New("command start time filter is not supported for audit log plugin v2 format") + } + } + if cfg.DynamicInput { if len(storages) != 1 { return storages, errors.New("dynamic input cannot be enabled with more than one input") @@ -254,7 +266,7 @@ func (cfg *ReplayConfig) LoadFromCheckpoint() error { cfg.CommandStartTime = time.Unix(0, state.CurCmdTs) } // Only load `CommandEndTime` for `audit_log_plugin` format, or it'll not pass validation. - if state.CurCmdEndTs > 0 && cfg.Format == cmd.FormatAuditLogPlugin { + if state.CurCmdEndTs > 0 && cfg.Format.IsAuditLogFormat() { cfg.CommandEndTime = time.Unix(0, state.CurCmdEndTs) } return nil @@ -616,7 +628,7 @@ func (r *replay) constructDecoderForReader(ctx context.Context, reader cmd.LineR // impossible for decoder to know whether `use xxx` will be executed, and thus cannot maintain // the current session state correctly. cmdDecoder.SetCommandStartTime(r.cfg.CommandStartTime) - if auditLogDecoder, ok := cmdDecoder.(*cmd.AuditLogPluginDecoder); ok { + if auditLogDecoder, ok := cmdDecoder.(cmd.AuditLogDecoder); ok { auditLogDecoder.SetPSCloseStrategy(r.cfg.PSCloseStrategy) auditLogDecoder.SetIDAllocator(idAllocator) auditLogDecoder.SetCommandEndTime(r.cfg.CommandEndTime) diff --git a/pkg/sqlreplay/store/line.go b/pkg/sqlreplay/store/line.go index 84b3f677..13a4e62d 100644 --- a/pkg/sqlreplay/store/line.go +++ b/pkg/sqlreplay/store/line.go @@ -30,7 +30,7 @@ func NewWriter(lg *zap.Logger, externalStorage storage.ExternalStorage, cfg Writ type ReaderCfg struct { Dir string - Format string + Format cmd.TrafficFormat EncryptionMethod string EncryptionKey []byte // Reader will skip the files whose end time is before FileNameFilterTime. diff --git a/pkg/sqlreplay/store/rotate.go b/pkg/sqlreplay/store/rotate.go index 575465a6..4e976742 100644 --- a/pkg/sqlreplay/store/rotate.go +++ b/pkg/sqlreplay/store/rotate.go @@ -231,7 +231,7 @@ func (r *rotateReader) openFileLoop(ctx context.Context) error { } fileIdx := parseFunc(name, fileNamePrefix) if fileIdx == 0 { - r.lg.Warn("traffic file name is invalid", zap.String("filename", name), zap.String("format", r.cfg.Format)) + r.lg.Warn("traffic file name is invalid", zap.String("filename", name), zap.String("format", string(r.cfg.Format))) return false, nil } if fileIdx <= curFileIdx { @@ -302,7 +302,7 @@ func (r *rotateReader) nextReader() error { // The return value of the function indicates whether this file is valid or not. // For S3 storage and audit log format, it'll stop walking once the fn returns true. func (r *rotateReader) walkFile(ctx context.Context, curfileName string, fn func(string, int64) (bool, error)) error { - if s3, ok := r.storage.(*storage.S3Storage); ok && r.cfg.Format == cmd.FormatAuditLogPlugin { + if s3, ok := r.storage.(*storage.S3Storage); ok && r.cfg.Format.IsAuditLogFormat() { return r.walkS3ForAuditLogFile(ctx, curfileName, s3.GetS3APIHandle(), s3.GetOptions(), fn) } return r.storage.WalkDir(ctx, &storage.WalkOption{}, func(name string, size int64) error { @@ -401,25 +401,22 @@ func (r *rotateReader) walkS3ForAuditLogFile(ctx context.Context, curFileName st return nil } -func getFileNamePrefix(format string) string { - switch format { - case cmd.FormatAuditLogPlugin: +func getFileNamePrefix(format cmd.TrafficFormat) string { + if format.IsAuditLogFormat() { return auditFileNamePrefix } return fileNamePrefix } -func getParseFileNameFunc(format string) func(string, string) int64 { - switch format { - case cmd.FormatAuditLogPlugin: +func getParseFileNameFunc(format cmd.TrafficFormat) func(string, string) int64 { + if format.IsAuditLogFormat() { return parseFileTimeToIdx } return parseFileIdx } -func getFilterFileNameFunc(format string, fileNameFilterTime time.Time) func(string, string) bool { - switch format { - case cmd.FormatAuditLogPlugin: +func getFilterFileNameFunc(format cmd.TrafficFormat, fileNameFilterTime time.Time) func(string, string) bool { + if format.IsAuditLogFormat() { return func(name, fileNamePrefix string) bool { return filterFileByTime(name, fileNamePrefix, fileNameFilterTime) } diff --git a/pkg/sqlreplay/store/rotate_test.go b/pkg/sqlreplay/store/rotate_test.go index df4407af..1f8a2c52 100644 --- a/pkg/sqlreplay/store/rotate_test.go +++ b/pkg/sqlreplay/store/rotate_test.go @@ -159,7 +159,7 @@ func TestParseFileTime(t *testing.T) { func TestIterateFiles(t *testing.T) { tests := []struct { - format string + format cmd.TrafficFormat fileNames []string order []string }{