Skip to content

Commit b02e4ed

Browse files
authored
Merge pull request #3 from tarazou9/logparse
Add garnet log file into logparse
2 parents 8a41b3c + 9dd4440 commit b02e4ed

File tree

2 files changed

+225
-0
lines changed

2 files changed

+225
-0
lines changed

garnet_log_events.py

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
# Copyright (c) Microsoft Corporation. All rights reserved.
2+
3+
# Utility methods to deal with log events in DB.
4+
5+
import json
6+
import os
7+
import shutil
8+
import sqlite3
9+
import statsd
10+
import time
11+
import traceback
12+
13+
from contextlib import closing
14+
15+
# Set up the statsd client used to forward metrics to the local agent.
stats = statsd.StatsClient('localhost', 8125)

# Geneva/MDM identity is injected through the environment; any of these may
# be None when the corresponding variable is unset.
# (Fix: dropped the C-style trailing semicolons - they are not Python idiom.)
account_name = os.getenv("MONITORING_MDM_ACCOUNT")
namespace = os.getenv("MONITORING_MDM_NAMESPACE")
tenant = os.getenv("MONITORING_TENANT")
role = os.getenv("MONITORING_ROLE")
role_instance = os.getenv("MONITORING_ROLE_INSTANCE")

# Base dimensions shared by every metric emitted from this module.
metric_identifier = {
    "Account": account_name,
    "Namespace": namespace,
}
28+
def create_connection(database_file):
    """Open and return a sqlite3 connection to *database_file*."""
    connection = sqlite3.connect(database_file)
    return connection
30+
31+
def create_table(connection):
    """Create the LogEvent table if it does not already exist.

    A (product, category, type) triple uniquely identifies an event, so the
    three columns together form the primary key; re-running is a no-op.
    """
    ddl = """CREATE TABLE IF NOT EXISTS LogEvent (
        event_product text,
        event_category text,
        event_type text,
        event_date text,
        attributes text,
        PRIMARY KEY(event_product, event_category, event_type)
    ); """
    with closing(connection.cursor()) as cursor:
        cursor.execute(ddl)
42+
43+
def upsert_events(connection, events_map):
    '''
    Upsert every value of *events_map* into the LogEvent table.

    All rows are written as part of a single transaction and committed at
    the end, to minimize disk I/O.  On a sqlite3.Error the transaction is
    rolled back and retried up to max_retry_attempts times (sleeping 1s
    between attempts); if every attempt fails, an error metric is emitted.
    '''
    if not events_map:
        return

    max_retry_attempts = 5
    for retry_attempt in range(1, max_retry_attempts + 1):
        with closing(connection.cursor()) as cursor:
            try:
                for event in events_map.values():
                    attributes = get_attributes(event)
                    upsert(connection, cursor, event["event_product"], event["event_category"], event["event_type"], event["date"], attributes)

                connection.commit()
                return  # success - stop retrying

            except sqlite3.Error as e:
                # Fix: original message misspelled "committing".
                print("Error occurred while committing log events {0}. Retrying: {1}".format(str(events_map), str(e)))
                connection.rollback()
                if retry_attempt == max_retry_attempts:
                    print("Emitting metrics for commit error")
                    emit_commit_error_metrics()
                else:
                    time.sleep(1)
74+
75+
def get_attributes(event):
    '''
    Return a JSON string of the type-specific attributes of *event*.

    Only a known set of (event_category, event_type) pairs carry extra
    attributes; for any other event "{}" is returned.  Lookup failures are
    logged and swallowed so one malformed event cannot abort the caller's
    commit loop.
    '''
    attributes = {}

    try:
        if event["event_category"] == 'startup':
            if event["event_type"] == 'ip_address_conflict':
                attributes = {"endpoint": event["endpoint"]}

        if event["event_category"] == 'tombstone':
            if event["event_type"] == 'warning_threshold_exceeded':
                attributes = {"tombstoned_cells": event["tombstoned_cells"], "keyspace": event["keyspace"], "table": event["table"]}

            if event["event_type"] == 'error_threshold_exceeded':
                # Fix: "slice_start" was an unquoted bare name, which raised
                # NameError at runtime and (silently, via the bare except)
                # dropped ALL attributes for this event type.
                attributes = {"live_cells": event['live_cells'], "tombstoned_cells": event['tombstoned_cells'], "keyspace": event['keyspace'], "table": event['table'], "key": event['key'], "requested_columns": event['requested_columns'], "slice_start": event['slice_start'], "slice_end": event['slice_end'], "deletion_info": event['deletion_info']}
    except Exception:
        # Narrowed from a bare except: so KeyboardInterrupt/SystemExit
        # still propagate; missing keys are logged with a full traceback.
        ex = traceback.format_exc()
        print("get_attributes: exception encountered - " + ex)

    return json.dumps(attributes)
94+
95+
def upsert(connection, cursor, event_product, event_category, event_type, event_date, attributes):
    '''
    Insert one LogEvent row, or refresh event_date/attributes when the
    (product, category, type) key already exists.

    Fix: the original INSERT column list omitted "attributes", so a
    first-time event was persisted with NULL attributes and only gained
    them after a later conflict.  The column is now written on insert, and
    the UPDATE reuses the proposed values via "excluded".
    '''
    query = """INSERT INTO LogEvent(event_product, event_category, event_type, event_date, attributes)
                            VALUES (?, ?, ?, ?, ?) ON CONFLICT(event_product, event_category, event_type) DO UPDATE set event_date=excluded.event_date, attributes=excluded.attributes"""
    values = (event_product, event_category, event_type, event_date, attributes)

    cursor.execute(query, values)
101+
102+
def emit_commit_error_metrics():
    """Emit the CommitLogEventsError metric, both to statsd and to the local
    metrics file under /var/log."""
    dims = {
        **metric_identifier,
        'Metric': "CommitLogEventsError",
        'Dims': {
            'Tenant': tenant,
            "Role": role,
            "RoleInstance": role_instance,
            "Service": "cassandra",
        },
    }

    emit_metrics(dims)

    emit_metrics_to_file(dims, '/var/log/logevents_commit_error_metrics_new.json', '/var/log/logevents_commit_error_metrics.json')
115+
116+
def emit_metrics(dims):
    """Serialize *dims* to JSON and emit it as a statsd gauge with value 1.

    Failures are logged and swallowed - metric emission is best effort and
    must never take down the caller.
    """
    jsonDims = None
    try:
        jsonDims = json.dumps(dims)
        stats.gauge(jsonDims, 1)
    except Exception as e:
        # Fix: if json.dumps itself failed, jsonDims is still None and the
        # original "str + None" concatenation raised TypeError inside the
        # handler; fall back to repr(dims) in that case.
        described = jsonDims if jsonDims is not None else repr(dims)
        print("Error emitting metrics " + described + ": " + str(e))
123+
124+
def emit_metrics_to_file(dims, src, dst):
125+
# Dump metrics into local filesystem
126+
try:
127+
# Make a copy of destination file if it already exists
128+
if os.path.exists(dst):
129+
shutil.copyfile(dst, src)
130+
131+
# Append to it and rename it as the destination file
132+
with open(src, 'a+') as f:
133+
f.write(json.dumps(dims))
134+
f.close()
135+
os.rename(f.name, dst)
136+
137+
except Exception as e:
138+
print("Error writing out metrics to file "+ dst +": " + str(e))
139+
140+
def init():
    """Open the on-disk log-events database, switch it to WAL journaling,
    ensure the schema exists, and return the live connection (caller owns
    closing it)."""
    database_file = r"/var/lib/garnet/logevents.db"

    connection = create_connection(database_file)
    # WAL keeps concurrent readers from blocking the single writer.
    connection.execute('pragma journal_mode=wal')
    create_table(connection)

    return connection
149+
150+
def main():
    """Dump every row currently stored in the LogEvent table to stdout."""
    with closing(init()) as conn:
        print("Log events stored in Nova DB:")

        with closing(conn.cursor()) as cursor:
            cursor.execute('''SELECT * FROM LogEvent''')
            for row in cursor.fetchall():
                print(row)

if __name__ == '__main__':
    main()

log_garnet_daemon.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#!/home/azureuser/garnet-logparse/.venv/bin/python3
2+
3+
# Copyright (c) Microsoft Corporation. All rights reserved.
4+
5+
# Processes garnet log, emits it to Geneva, and stores log events in durable storage.
6+
7+
import datetime
8+
import garnet_log_events
9+
import systemlog
10+
import time
11+
12+
from contextlib import closing
13+
from fluent import sender
14+
from os import path
15+
from os import environ
16+
from pygtail import Pygtail
17+
18+
19+
# Set up the fluentd sender used to forward parsed log lines to Geneva.
logger = sender.FluentSender('nova', port=25234, nanosecond_precision=True)

log_file = '/var/log/garnet/log/garnet.log'
sleep_time = 5 # Sleep time in seconds while waiting for the log file to appear
address = environ.get('ADDRESS')  # node address stamped onto every emitted line

# Main daemon loop: tail the garnet log forever, emit each parsed line to
# Geneva via fluentd, and persist the latest occurrence of each known event
# type into the local sqlite DB.
try:
    with closing(garnet_log_events.init()) as connection:
        while True:
            # Wait for the log file to exist before tailing it.
            if not path.exists(log_file):
                print("Log file {0} does not exist. Going to sleep for {1} seconds".format(log_file, sleep_time))
                time.sleep(sleep_time)
                continue

            print("File found")
            # Pygtail yields only lines added since the last read (it keeps
            # its own offset file), so each loop pass sees new lines once.
            lines = Pygtail(log_file) # Fetch log lines

            events = dict()
            num_lines = 0

            # NOTE(review): assumes systemlog.parse_log yields dicts with a
            # datetime under "date" plus event_product/event_category/
            # event_type keys - confirm against systemlog's contract.
            for parsed_line in systemlog.parse_log(lines): # Processes each log line, and outputs it as a map of fields
                num_lines += 1
                # Emit the parsed log to Geneva
                timestamp = datetime.datetime.timestamp(parsed_line["date"])
                parsed_line["date"] = str(parsed_line["date"]) # If not converted to string, fluentd throws a serialization error for datetime object
                parsed_line["address"] = address
                if logger.emit_with_time('cassandra', timestamp, parsed_line):
                    print("log sent successfully")
                else:
                    print("Failed to send log")

                # Add the parsed log to a map, which will be iterated over later, and stored persistently in the DB
                # Keyed by product:category:type, so only the LAST occurrence
                # of each event type in this batch is kept.
                if parsed_line['event_type'] != 'unknown':
                    key = "{0}:{1}:{2}".format(parsed_line["event_product"], parsed_line["event_category"], parsed_line["event_type"])
                    events[key] = parsed_line

            if num_lines == 0:
                # This is to keep Pygtail from consuming >99% CPU
                time.sleep(1)
                continue

            # Persist this batch of events in one transaction.
            garnet_log_events.upsert_events(connection, events)
finally:
    # Runs on any exit path (including unhandled exceptions, which still
    # propagate after this message is printed).
    print("Log parsing stopped")

0 commit comments

Comments
 (0)