-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlog_writter.py
More file actions
160 lines (146 loc) · 5.82 KB
/
log_writter.py
File metadata and controls
160 lines (146 loc) · 5.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import os
import json
import hmac, hashlib
import uuid
import gzip
import datetime
class LogWriter:
    """Tamper-evident JSON event logger.

    Entries are written one canonical-JSON object per line into a daily
    gzip file (<base_dir>/<YYYY-MM-DD>.log.gz).  Each entry carries a
    SHA-256 hash ("h") of its own canonical form and the hash of the
    previous entry ("ph"), forming a per-day hash chain.  When a day's
    file is closed, a small anchor file (<date>.anchor.json) is written
    containing the last chain hash, signed with HMAC-SHA256 so the log
    can later be verified offline.
    """

    def __init__(self, base_dir, hmac_key: bytes, app_env: str):
        """Create a writer rooted at *base_dir*.

        base_dir -- directory for daily .log.gz / .anchor.json files
                    (created if missing).
        hmac_key -- secret key used to sign daily anchors.
        app_env  -- application environment tag (e.g. 'practice', 'live').
        """
        self.base_dir = base_dir
        self.hmac_key = hmac_key
        self.app_env = app_env
        self.sid = None           # current session ID (uuid4 string)
        self.prev_hash = None     # hash of the previous entry (chain link)
        self.fp = None            # open gzip handle for today's file
        self.current_date = None  # ISO date of the currently open file
        os.makedirs(base_dir, exist_ok=True)

    # -- context-manager support (new; backward compatible) --------------
    def __enter__(self):
        """Allow ``with LogWriter(...) as log:`` -- starts a session."""
        self.start_session()
        return self

    def __exit__(self, exc_type, exc, tb):
        """End the session (and anchor the file) when the scope exits."""
        self.end_session()
        return False  # never suppress exceptions

    # -- path / time helpers ---------------------------------------------
    def _file_path(self, date_str):
        """Path of the gzip log file for *date_str* (YYYY-MM-DD)."""
        return os.path.join(self.base_dir, f"{date_str}.log.gz")

    def _manifest_path(self, date_str):
        """Path of the signed anchor (summary) file for *date_str*."""
        return os.path.join(self.base_dir, f"{date_str}.anchor.json")

    def _now(self):
        """Current timezone-aware UTC datetime."""
        return datetime.datetime.now(datetime.timezone.utc)

    def _date_str(self):
        """Today's UTC date as an ISO string (YYYY-MM-DD)."""
        return self._now().date().isoformat()

    # -- file lifecycle ---------------------------------------------------
    def _open_today(self):
        """Ensure today's log file is open.

        On a date rollover the previous day's file is closed (writing its
        anchor) and the hash chain is restarted for the new day.
        """
        date_str = self._date_str()
        if self.current_date == date_str and self.fp:
            return  # already open for today
        self._close_with_anchor()  # finalize the previous day, if any
        self.fp = gzip.open(self._file_path(date_str), "ab")
        self.current_date = date_str
        self.prev_hash = None  # each day starts a fresh chain

    def _close_with_anchor(self):
        """Close the current file and write its signed anchor.

        The anchor records the last chain hash; its HMAC-SHA256 signature
        (computed over the canonical JSON *without* the "sig" field) lets
        a verifier detect truncation or tampering of the day's log.
        """
        if not self.fp or not self.current_date:
            return  # nothing open
        self.fp.flush()
        self.fp.close()
        self.fp = None
        anchor = {
            "date": self.current_date,
            "sid": self.sid,
            "last_hash": self.prev_hash,
            "env": self.app_env,
            "ts": self._now().isoformat(),
        }
        msg = json.dumps(anchor, sort_keys=True, separators=(",", ":")).encode()
        anchor["sig"] = hmac.new(self.hmac_key, msg, hashlib.sha256).hexdigest()
        with open(self._manifest_path(self.current_date), "w", encoding="utf-8") as mf:
            json.dump(anchor, mf, separators=(",", ":"))
        self.current_date = None

    # -- session API ------------------------------------------------------
    def start_session(self):
        """Begin a new logging session with a fresh session ID."""
        self.sid = str(uuid.uuid4())
        self._open_today()
        self._write("session.start", {"env": self.app_env})

    def end_session(self):
        """Log the session end and close the file with its anchor."""
        self._write("session.end", {})
        self._close_with_anchor()

    # -- entry writing ----------------------------------------------------
    def _canonical(self, entry):
        """Deterministic JSON bytes (sorted keys, no spaces) for hashing."""
        return json.dumps(entry, sort_keys=True, separators=(",", ":")).encode()

    def _write(self, event_type, payload, cid=None):
        """Append one hash-chained entry.

        The entry's own hash ("h") is SHA-256 of its canonical form
        *without* "h"; "ph" links it to the previous entry's hash.
        """
        self._open_today()
        entry = {
            "ts": self._now().isoformat(),
            "e": event_type,
            "sid": self.sid,
            "cid": cid,
            "p": payload if payload else None,  # empty payloads stored as null
            "ph": self.prev_hash,
        }
        curr_hash = hashlib.sha256(self._canonical(entry)).hexdigest()
        entry["h"] = curr_hash
        self.prev_hash = curr_hash
        self.fp.write(self._canonical(entry) + b"\n")
        # Fix: sync-flush each entry so a crash cannot strand recent
        # entries in the gzip compressor's buffer (stream stays readable).
        self.fp.flush()

    # -- public logging helpers -------------------------------------------
    def log_req(self, method, url_path, host, headers=None, body=None, cid=None, env=None):
        """Log an outbound request with header redaction and body whitelisting."""
        payload = {
            "method": method,
            "path": url_path,
            "host": host,
            "env": env or self.app_env,
        }
        if headers:
            # Redact the Authorization header; keep everything else as-is.
            payload["hdr"] = {
                k: ("***" if k.lower() == "authorization" else v)
                for k, v in headers.items()
            }
        if isinstance(body, dict):
            # Only whitelisted order fields are ever persisted.
            wl = {k: body[k] for k in ("symbol", "side", "qty", "price") if k in body}
            if wl:
                payload["body"] = wl
        self._write("req", payload, cid=cid)

    def log_resp(self, status, ms, cid=None, klass=None):
        """Log a response: HTTP status, latency in ms, optional error class."""
        payload = {"status": status, "ms": ms}
        if klass:
            payload["klass"] = klass
        self._write("resp", payload, cid=cid)

    def log_order(self, symbol, side, qty, price, cid=None):
        """Log an order with qty/price bucketed to reduce sensitivity."""
        def bucket(x, cuts):
            # Map a value to a coarse "<= c" / "> max" range label.
            for c in cuts:
                if x <= c:
                    return f"<= {c}"
            return f"> {cuts[-1]}"
        payload = {
            "sym": symbol,
            "side": side,
            "qty_b": bucket(qty, [10, 50, 100, 500, 1000, 5000]),
            "pr_b": bucket(price, [10, 50, 100, 500, 1000, 5000]),
        }
        self._write("order", payload, cid=cid)