From 837e473aa9b6c8c6b7209f808d4d53d7fdcd679b Mon Sep 17 00:00:00 2001 From: "nkvuong@gmail.com" Date: Wed, 23 Mar 2022 18:19:43 +0000 Subject: [PATCH 1/6] add readme and comments to dlt notebooks --- ...t_bronze_silver.py => 01_raw_bronze_silver.py} | 15 ++++++++++----- ...dit_logs_dlt_gold.py => 02_silver_fan_gold.py} | 1 + audit_logs/DLT/README.md | 14 ++++++++++++++ 3 files changed, 25 insertions(+), 5 deletions(-) rename audit_logs/DLT/{aws_audit_logs_dlt_bronze_silver.py => 01_raw_bronze_silver.py} (82%) rename audit_logs/DLT/{aws_audit_logs_dlt_gold.py => 02_silver_fan_gold.py} (94%) create mode 100644 audit_logs/DLT/README.md diff --git a/audit_logs/DLT/aws_audit_logs_dlt_bronze_silver.py b/audit_logs/DLT/01_raw_bronze_silver.py similarity index 82% rename from audit_logs/DLT/aws_audit_logs_dlt_bronze_silver.py rename to audit_logs/DLT/01_raw_bronze_silver.py index 0621141..a1f79cf 100644 --- a/audit_logs/DLT/aws_audit_logs_dlt_bronze_silver.py +++ b/audit_logs/DLT/01_raw_bronze_silver.py @@ -4,7 +4,11 @@ from pyspark.sql.types import * import json, time + +# edit the location where logs are delivered log_bucket = "s3://databricks-e2-certification-logs-bwp2xp/audit-logs" + +# edit the location where the autoloader schema are stored sink_bucket = "dbfs:/tmp" @dlt.table( @@ -25,15 +29,14 @@ def bronze(): # COMMAND ---------- -# create silver table @udf(StringType()) def strip_null_udf(raw): return json.dumps({i: raw.asDict()[i] for i in raw.asDict() if raw.asDict()[i] != None}) @dlt.table( - comment="Audit logs cleaned and prepared for analysis.", - table_properties={"quality":"silver"}, - partition_cols = [ 'date' ] + comment="Audit logs cleaned and prepared for analysis. Strip out all empty keys for every record, parse email address from a nested field and parse UNIX epoch to UTC timestamp.", + table_properties={"quality":"silver"}, + partition_cols = [ 'date' ] ) def silver(): return ( @@ -58,6 +61,8 @@ def bronze_silver_verification(): # COMMAND ---------- +# these udfs is used to calculate the super-schema based on all events of a given service + @udf(StringType()) def just_keys_udf(string): return [i for i in json.loads(string).keys()] @@ -80,7 +85,7 @@ def extract_schema_udf(keys): return schema.json() @dlt.table( - comment="Services and their schemas" + comment="List of services and their corresponding super-schema" ) def silver_services_schema(): diff --git a/audit_logs/DLT/aws_audit_logs_dlt_gold.py b/audit_logs/DLT/02_silver_fan_gold.py similarity index 94% rename from audit_logs/DLT/aws_audit_logs_dlt_gold.py rename to audit_logs/DLT/02_silver_fan_gold.py index 580b267..a803f56 100644 --- a/audit_logs/DLT/aws_audit_logs_dlt_gold.py +++ b/audit_logs/DLT/02_silver_fan_gold.py @@ -4,6 +4,7 @@ from pyspark.sql.types import * import json +# edit the full table name to match the silver tables the first pipeline writes out to services_table = "vuong_nguyen_audit.silver_services_schema" silver_table = "vuong_nguyen_audit.silver" diff --git a/audit_logs/DLT/README.md b/audit_logs/DLT/README.md new file mode 100644 index 0000000..dbbd984 --- /dev/null +++ b/audit_logs/DLT/README.md @@ -0,0 +1,14 @@ +## DLT pipelines for audit log processing + +Currently, Databricks delivers audit logs for all enabled workspaces as per delivery SLA in JSON format to a customer-owned S3 bucket/blob storage. These audit logs contain events for specific actions related to primary resources like clusters, jobs, and the workspace. 
To simplify delivery and further analysis by the customers, Databricks logs each event for every action as a separate record and stores all the relevant parameters into a sparse StructType called requestParams. + +In order to make this information more accessible, we recommend an ETL process based on [Delta Live Tables](https://databricks.com/product/delta-live-tables) + +Our ETL process requires 2 DLT pipelines: + +- The first pipeline (01-raw-bronze-silver) does the following + - Stream from the raw JSON files that Databricks delivers using Autoloader to a bronze Delta Lake table. This creates a durable copy of the raw data that allows us to replay our ETL, should we find any issues in downstream tables. + - Stream from a bronze Delta Lake table to a silver Delta Lake table such that it takes the sparse requestParams StructType and strips out all empty keys for every record, along with performing some other basic transformations like parsing email address from a nested field and parsing UNIX epoch to UTC timestamp. +- The second pipeline (02-silver-fan-gold) then streams to individual gold Delta Lake tables for each Databricks service tracked in the audit logs. As the list of service is dynamically inferred from the data in the silver table, we need to create a separate pipeline due to current limitation + +The 2 pipelines are then linked together in a multi-task job, to be executed on a regular schedule From 6eb40c7137341e94a49209e5ccc453740acc1125 Mon Sep 17 00:00:00 2001 From: "44292934+nkvuong@users.noreply.github.com" <44292934+nkvuong@users.noreply.github.com> Date: Thu, 24 Mar 2022 17:01:50 +0000 Subject: [PATCH 2/6] remove schema location for autoloader --- audit_logs/DLT/01_raw_bronze_silver.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/audit_logs/DLT/01_raw_bronze_silver.py b/audit_logs/DLT/01_raw_bronze_silver.py index a1f79cf..63fad3c 100644 --- a/audit_logs/DLT/01_raw_bronze_silver.py +++ b/audit_logs/DLT/01_raw_bronze_silver.py @@ -8,9 +8,6 @@ # edit the location where logs are delivered log_bucket = "s3://databricks-e2-certification-logs-bwp2xp/audit-logs" -# edit the location where the autoloader schema are stored -sink_bucket = "dbfs:/tmp" - @dlt.table( comment="The raw audit logs, ingested from the s3 location configured with Databricks audit log configuration", table_properties={"quality":"bronze"}, @@ -23,7 +20,6 @@ def bronze(): .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", True) .option("cloudFiles.schemaHints", "workspaceId long") - .option("cloudFiles.schemaLocation", f"{sink_bucket}/audit_log_schema") .load(log_bucket) ) From dfd11aa6138ad34d93a5082e78aabfd8104f2022 Mon Sep 17 00:00:00 2001 From: "44292934+nkvuong@users.noreply.github.com" <44292934+nkvuong@users.noreply.github.com> Date: Fri, 25 Mar 2022 13:46:50 +0000 Subject: [PATCH 3/6] parametrise inputs --- audit_logs/DLT/01_raw_bronze_silver.py | 4 ++-- audit_logs/DLT/02_silver_fan_gold.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/audit_logs/DLT/01_raw_bronze_silver.py b/audit_logs/DLT/01_raw_bronze_silver.py index 63fad3c..90ce5c3 100644 --- a/audit_logs/DLT/01_raw_bronze_silver.py +++ b/audit_logs/DLT/01_raw_bronze_silver.py @@ -5,8 +5,8 @@ import json, time -# edit the location where logs are delivered -log_bucket = "s3://databricks-e2-certification-logs-bwp2xp/audit-logs" +# retrieve the location where logs are delivered +log_bucket = spark.conf.get("mypipeline.log_bucket") @dlt.table( comment="The raw audit logs, ingested from the 
s3 location configured with Databricks audit log configuration", diff --git a/audit_logs/DLT/02_silver_fan_gold.py b/audit_logs/DLT/02_silver_fan_gold.py index a803f56..5673bce 100644 --- a/audit_logs/DLT/02_silver_fan_gold.py +++ b/audit_logs/DLT/02_silver_fan_gold.py @@ -4,9 +4,9 @@ from pyspark.sql.types import * import json -# edit the full table name to match the silver tables the first pipeline writes out to -services_table = "vuong_nguyen_audit.silver_services_schema" -silver_table = "vuong_nguyen_audit.silver" +# retrieve the full table name to match the silver tables the first pipeline writes out to +services_table = spark.conf.get("mypipeline.services_table") +silver_table = spark.conf.get("mypipeline.silver_table") # COMMAND ---------- From 7b486bc27fc67d7e97f4c6afe85dac8eb4fe48ec Mon Sep 17 00:00:00 2001 From: "nkvuong@gmail.com" Date: Tue, 29 Mar 2022 14:24:09 +0000 Subject: [PATCH 4/6] add az_eh_notebook --- audit_logs/azure_eh_audit_logs.py | 222 ++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 audit_logs/azure_eh_audit_logs.py diff --git a/audit_logs/azure_eh_audit_logs.py b/audit_logs/azure_eh_audit_logs.py new file mode 100644 index 0000000..b04d3d5 --- /dev/null +++ b/audit_logs/azure_eh_audit_logs.py @@ -0,0 +1,222 @@ +# Databricks notebook source +dbutils.widgets.text("catalog", "audit_logs") +dbutils.widgets.text("database", "azure") +dbutils.widgets.text("sink_bucket", "azure") + +# COMMAND ---------- + +catalog = dbutils.widgets.get("catalog") +database = dbutils.widgets.get("database") +sink_bucket = dbutils.widgets.get("sink_bucket").strip("/") + +# COMMAND ---------- + +from pyspark.sql.functions import col, from_unixtime, from_utc_timestamp, from_json +from pyspark.sql.types import * + +# COMMAND ---------- + +spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}") +spark.sql(f"CREATE DATABASE IF NOT EXISTS {catalog}.{database}") + +# COMMAND ---------- + +# event hub configuration + +TOPIC = "unity" +BOOTSTRAP_SERVERS = "fieldengdeveastus2ehb.servicebus.windows.net:9093" +EH_SASL = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connSharedAccessKey}";' + +# COMMAND ---------- + +df = (spark.readStream + .format("kafka") + .option("subscribe", TOPIC) + .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) + .option("kafka.sasl.mechanism", "PLAIN") + .option("kafka.security.protocol", "SASL_SSL") + .option("kafka.sasl.jaas.config", EH_SASL) + .option("kafka.request.timeout.ms", "60000") + .option("kafka.session.timeout.ms", "60000") + .option("failOnDataLoss", "false") + .option("startingOffsets", "latest") + .load() + .withColumn("deserializedBody", col("value").cast("string")) + .drop("value") + ) + +# COMMAND ---------- + +bronze_table = f"{catalog}.{database}.bronze" +checkpoint_path = f"{sink_bucket}/checkpoints" + +(streamDF + .writeStream + .format("delta") + .partitionBy("date") + .outputMode("append") + .option("checkpointLocation", f"{checkpoint_path}/bronze") + .option("cloudFiles.maxBytesPerTrigger", "1g") + .option("mergeSchema", True) + .trigger(availableNow=True) + .toTable(bronze_table) +) + +# COMMAND ---------- + +while spark.streams.active != []: + print("Waiting for streaming query to finish.") + time.sleep(5) + +spark.sql(f"OPTIMIZE {bronze_table}") + +# COMMAND ---------- + +raw_schema = (StructType([ + StructField("records",ArrayType(StructType([ + StructField("Host", StringType()), + StructField("category", StringType()), + 
StructField("identity", StringType()), + StructField("operationName", StringType()), + StructField("operationVersion", StringType()), + StructField("properties", StructType([ + StructField("actionName", StringType()), + StructField("logId", StringType()), + StructField("requestId", StringType()), + StructField("requestParams", StringType()), + StructField("response", StringType()), + StructField("serviceName", StringType()), + StructField("sessionId", StringType()), + StructField("sourceIPAddress", StringType()), + StructField("userAgent", StringType())])), + StructField("resourceId", StringType()), + StructField("time", StringType()), + ]))) +])) + +bronzeDF = spark.readStream.table(bronze_table) + +query = (bronzeDF + .select("deserializedBody") + .withColumn("parsedBody", from_json("deserializedBody", raw_schema)) + .select(explode("parsedBody.records").alias("streamRecord")) + .selectExpr("streamRecord.*") + .withColumn("version", col("operationVersion")) + .withColumn("time", col("time").cast("timestamp")) + .withColumn("timestamp", unix_timestamp("time") * 1000) + .withColumn("date_time", from_utc_timestamp(from_unixtime(col("timestamp")/1000), "UTC")) + .select(col("category").alias("serviceName"), "version", "timestamp", "date", "properties", col("identity").alias("userIdentity")) + .selectExpr("*", "properties.*") + .withColumnRenamed("requestParams", "flattened") + .withColumn("identity", from_json("userIdentity", "email STRING, subjectName STRING")) + .drop("properties", "userIdentity") + ) + +# COMMAND ---------- + +silver_table = f"{catalog}.{database}.silver" + +( + query + .writeStream + .format("delta") + .partitionBy("date") + .outputMode("append") + .option("checkpointLocation", f"{checkpoint_path}/silver") + .option("mergeSchema", True) + .trigger(availableNow=True) + .toTable(silver_table) +) + +# COMMAND ---------- + +while spark.streams.active != []: + print("Waiting for streaming query to finish.") + time.sleep(5) + +# COMMAND ---------- + +assert(spark.table(bronze_table).count() == spark.table(silver_table).count()) + +# COMMAND ---------- + +spark.sql(f"OPTIMIZE {silver_table}") + +# COMMAND ---------- + +@udf(StringType()) +def just_keys_udf(string): + return [i for i in json.loads(string).keys()] + +# COMMAND ---------- + +def flatten_table(service): + + service_name = service.replace("-","_") + + flattenedStream = spark.readStream.table(silver_table) + flattened = spark.table(silver_table) + + schema = StructType() + + keys = ( + flattened + .filter(col("serviceName") == service_name) + .select(just_keys_udf(col("flattened")).alias("keys")) + .distinct() + .collect() + ) + + keysList = [i.asDict()['keys'][1:-1].split(", ") for i in keys] + + keysDistinct = {key for keys in keysList for key in keys if key != ""} + + if len(keysDistinct) == 0: + schema.add(StructField('placeholder', StringType())) + else: + for key in keysDistinct: + schema.add(StructField(key, StringType())) + + # write the df with the correct schema to table + (flattenedStream + .filter(col("serviceName") == service_name) + .withColumn("requestParams", from_json(col("requestParams"), schema)) + .drop("flattened") + .writeStream + .partitionBy("date") + .outputMode("append") + .format("delta") + .option("checkpointLocation", f"{checkpoint_path}/gold/{service_name}") + .option("mergeSchema", True) + .trigger(availableNow=True) + .toTable(f"{catalog}.{database}.{service_name}") + ) + + # optimize the table as well + spark.sql(f"OPTIMIZE {catalog}.{database}.{service_name}") + +# COMMAND ---------- 
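+
+# Usage sketch (hypothetical service name): `flatten_table` can also be invoked for a
+# single service, e.g. flatten_table("clusters"), before fanning out across every service
+# with threads in the next cell. Each service stream keeps its own checkpoint under
+# f"{checkpoint_path}/gold/<service_name>", so an individual service can be re-run safely.
+
+# COMMAND ----------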
+ +import threading + +#For each table name (i.e. event type) create a separate thread and run the ThreadWorker function to save the data to Delta tables. +threads = [ + threading.Thread(target=flatten_table, args=(service)) + for service in spark.table(silver_table).select("serviceName").distinct().collect() +] + +for thread in threads: + thread.start() + +for thread in threads: + thread.join() + +# COMMAND ---------- + +while spark.streams.active != []: + print("Waiting for streaming query to finish.") + time.sleep(5) + +# COMMAND ---------- + +display(spark.sql(f"SHOW TABLES IN {catalog}.{database}")) From 5139b5fc8d9fcb2a5969c079ddffbaa7aa3b3817 Mon Sep 17 00:00:00 2001 From: "nkvuong@gmail.com" Date: Tue, 29 Mar 2022 17:46:53 +0000 Subject: [PATCH 5/6] update event hub etl --- ...dit_logs.py => azure_eh_audit_logs_etl.py} | 40 +++++++++---------- 1 file changed, 19 insertions(+), 21 deletions(-) rename audit_logs/{azure_eh_audit_logs.py => azure_eh_audit_logs_etl.py} (84%) diff --git a/audit_logs/azure_eh_audit_logs.py b/audit_logs/azure_eh_audit_logs_etl.py similarity index 84% rename from audit_logs/azure_eh_audit_logs.py rename to audit_logs/azure_eh_audit_logs_etl.py index b04d3d5..4c96e3e 100644 --- a/audit_logs/azure_eh_audit_logs.py +++ b/audit_logs/azure_eh_audit_logs_etl.py @@ -1,7 +1,7 @@ # Databricks notebook source dbutils.widgets.text("catalog", "audit_logs") dbutils.widgets.text("database", "azure") -dbutils.widgets.text("sink_bucket", "azure") +dbutils.widgets.text("sink_bucket", "s3://databricks-uc-field-eng-kpwfklmr/unity-azure") # COMMAND ---------- @@ -11,8 +11,9 @@ # COMMAND ---------- -from pyspark.sql.functions import col, from_unixtime, from_utc_timestamp, from_json +from pyspark.sql.functions import col, from_json, explode, unix_timestamp from pyspark.sql.types import * +import time, json # COMMAND ---------- @@ -23,6 +24,8 @@ # event hub configuration +connSharedAccessKey = + TOPIC = "unity" BOOTSTRAP_SERVERS = "fieldengdeveastus2ehb.servicebus.windows.net:9093" EH_SASL = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connSharedAccessKey}";' @@ -39,9 +42,11 @@ .option("kafka.request.timeout.ms", "60000") .option("kafka.session.timeout.ms", "60000") .option("failOnDataLoss", "false") - .option("startingOffsets", "latest") + .option("startingOffsets", "earliest") + .option("maxOffsetsPerTrigger", 100000) .load() .withColumn("deserializedBody", col("value").cast("string")) + .withColumn("date", col("timestamp").cast("date")) .drop("value") ) @@ -50,13 +55,12 @@ bronze_table = f"{catalog}.{database}.bronze" checkpoint_path = f"{sink_bucket}/checkpoints" -(streamDF +(df .writeStream .format("delta") .partitionBy("date") .outputMode("append") - .option("checkpointLocation", f"{checkpoint_path}/bronze") - .option("cloudFiles.maxBytesPerTrigger", "1g") + .option("checkpointLocation", f"{checkpoint_path}/bronze") .option("mergeSchema", True) .trigger(availableNow=True) .toTable(bronze_table) @@ -102,13 +106,14 @@ .select(explode("parsedBody.records").alias("streamRecord")) .selectExpr("streamRecord.*") .withColumn("version", col("operationVersion")) - .withColumn("time", col("time").cast("timestamp")) - .withColumn("timestamp", unix_timestamp("time") * 1000) - .withColumn("date_time", from_utc_timestamp(from_unixtime(col("timestamp")/1000), "UTC")) - .select(col("category").alias("serviceName"), "version", "timestamp", "date", "properties", col("identity").alias("userIdentity")) + 
.withColumn("date_time", col("time").cast("timestamp")) + .withColumn("timestamp", unix_timestamp(col("date_time")) * 1000) + .withColumn("date", col("time").cast("date")) + .select("category", "version", "timestamp", "date_time", "date", "properties", col("identity").alias("userIdentity")) .selectExpr("*", "properties.*") .withColumnRenamed("requestParams", "flattened") .withColumn("identity", from_json("userIdentity", "email STRING, subjectName STRING")) + .withColumn("response", from_json("response", "errorMessage STRING,result STRING,statusCode BIGINT")) .drop("properties", "userIdentity") ) @@ -131,16 +136,9 @@ # COMMAND ---------- while spark.streams.active != []: - print("Waiting for streaming query to finish.") - time.sleep(5) - -# COMMAND ---------- - -assert(spark.table(bronze_table).count() == spark.table(silver_table).count()) - -# COMMAND ---------- - -spark.sql(f"OPTIMIZE {silver_table}") + print("Waiting for streaming query to finish.") + time.sleep(5) +spark.sql(f"OPTIMIZE {silver_table}") # COMMAND ---------- @@ -180,7 +178,7 @@ def flatten_table(service): # write the df with the correct schema to table (flattenedStream .filter(col("serviceName") == service_name) - .withColumn("requestParams", from_json(col("requestParams"), schema)) + .withColumn("requestParams", from_json(col("flattened"), schema)) .drop("flattened") .writeStream .partitionBy("date") From 084393698f611d5d6e6990557b37edbd4ff01365 Mon Sep 17 00:00:00 2001 From: "44292934+nkvuong@users.noreply.github.com" <44292934+nkvuong@users.noreply.github.com> Date: Wed, 20 Apr 2022 09:05:53 +0000 Subject: [PATCH 6/6] add dlt azure notebook from eventhub --- .../DLT/01_raw_bronze_silver_event_hub.py | 127 ++++++++++++++++++ audit_logs/DLT/README.md | 11 +- 2 files changed, 133 insertions(+), 5 deletions(-) create mode 100644 audit_logs/DLT/01_raw_bronze_silver_event_hub.py diff --git a/audit_logs/DLT/01_raw_bronze_silver_event_hub.py b/audit_logs/DLT/01_raw_bronze_silver_event_hub.py new file mode 100644 index 0000000..844f787 --- /dev/null +++ b/audit_logs/DLT/01_raw_bronze_silver_event_hub.py @@ -0,0 +1,127 @@ +# Databricks notebook source +import dlt +from pyspark.sql.functions import * +from pyspark.sql.types import * +import json, time + +# retrieve Shared Access Key from secret scope +connSharedAccessKey = dbutils.secrets.get(spark.conf.get("mypipeline.secret_scope_name"), spark.conf.get("mypipeline.secret_name")) + +# Name of Eventhubs namespace +eh_ns_name = spark.conf.get("mypipeline.eh_ns_name") +eh_topic_name = spark.conf.get("mypipeline.eh_topic_name") + +# event hub configuration +BOOTSTRAP_SERVERS = f"{eh_ns_name}.servicebus.windows.net:9093" +EH_SASL = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{connSharedAccessKey}";' + +@dlt.table( + comment="The raw audit logs, ingested from the event hub configured with Databricks audit log configuration", + table_properties={"quality":"bronze"}, + partition_cols = [ 'date' ] +) +def bronze(): + return (spark.readStream + .format("kafka") + .option("subscribe", eh_topic_name) + .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) + .option("kafka.sasl.mechanism", "PLAIN") + .option("kafka.security.protocol", "SASL_SSL") + .option("kafka.sasl.jaas.config", EH_SASL) + .option("kafka.request.timeout.ms", "60000") + .option("kafka.session.timeout.ms", "60000") + .option("failOnDataLoss", "false") + .option("startingOffsets", "earliest") + .option("maxOffsetsPerTrigger", 100000) + .load() + 
.withColumn("deserializedBody", col("value").cast("string")) + .withColumn("date", col("timestamp").cast("date")) + .drop("value") + ) + +# COMMAND ---------- + +@udf(StringType()) +def strip_null_udf(raw): + return json.dumps({i: raw.asDict()[i] for i in raw.asDict() if raw.asDict()[i] != None}) + +raw_schema = (StructType([ + StructField("records", ArrayType(StructType([ + StructField("Host", StringType()), + StructField("category", StringType()), + StructField("identity", StringType()), + StructField("operationName", StringType()), + StructField("operationVersion", StringType()), + StructField("properties", StructType([ + StructField("actionName", StringType()), + StructField("logId", StringType()), + StructField("requestId", StringType()), + StructField("requestParams", StringType()), + StructField("response", StringType()), + StructField("serviceName", StringType()), + StructField("sessionId", StringType()), + StructField("sourceIPAddress", StringType()), + StructField("userAgent", StringType())])), + StructField("resourceId", StringType()), + StructField("time", StringType()), + ]))) +])) + +@dlt.table( + comment="Audit logs cleaned and prepared for analysis. Strip out all empty keys for every record, parse email address from a nested field and parse UNIX epoch to UTC timestamp.", + table_properties={"quality":"silver"}, + partition_cols = [ 'date' ] +) +def silver(): + return ( + dlt.read_stream("bronze") + .select("deserializedBody") + .withColumn("parsedBody", from_json("deserializedBody", raw_schema)) + .select(explode("parsedBody.records").alias("streamRecord")) + .selectExpr("streamRecord.*") + .withColumn("version", col("operationVersion")) + .withColumn("date_time", col("time").cast("timestamp")) + .withColumn("timestamp", unix_timestamp(col("date_time")) * 1000) + .withColumn("date", col("time").cast("date")) + .select("category", "version", "timestamp", "date_time", "date", "properties", col("identity").alias("userIdentity")) + .selectExpr("*", "properties.*") + .withColumnRenamed("requestParams", "flattened") + .withColumn("identity", from_json("userIdentity", "email STRING, subjectName STRING")) + .withColumn("response", from_json("response", "errorMessage STRING,result STRING,statusCode BIGINT")) + .drop("properties", "userIdentity") + ) + +# COMMAND ---------- + +# these udfs is used to calculate the super-schema based on all events of a given service + +@udf(StringType()) +def just_keys_udf(string): + return [i for i in json.loads(string).keys()] + +@udf(StringType()) +def extract_schema_udf(keys): + + schema = StructType() + + keysList = [i[1:-1].split(", ") for i in keys] + + keysDistinct = {key for keys in keysList for key in keys if key != ""} + + if len(keysDistinct) == 0: + schema.add(StructField("placeholder", StringType())) + else: + for key in keysDistinct: + schema.add(StructField(key, StringType())) + + return schema.json() + +@dlt.table( + comment="List of services and their corresponding super-schema" +) +def silver_services_schema(): + + return (dlt.read("silver") + .select('serviceName', just_keys_udf(col("flattened")).alias("keys")) + .groupBy('serviceName').agg(extract_schema_udf(collect_set("keys")).alias("schema")) + ) diff --git a/audit_logs/DLT/README.md b/audit_logs/DLT/README.md index dbbd984..4b2d84f 100644 --- a/audit_logs/DLT/README.md +++ b/audit_logs/DLT/README.md @@ -1,14 +1,15 @@ ## DLT pipelines for audit log processing -Currently, Databricks delivers audit logs for all enabled workspaces as per delivery SLA in JSON format to a 
customer-owned S3 bucket on AWS, or to a specified Event Hub on Azure. These audit logs contain events for specific actions related to primary resources like clusters, jobs, and the workspace. To simplify delivery and further analysis by customers, Databricks logs each event for every action as a separate record and stores all the relevant parameters in a sparse StructType called `requestParams`.
 
 In order to make this information more accessible, we recommend an ETL process based on [Delta Live Tables](https://databricks.com/product/delta-live-tables)
 
 Our ETL process requires 2 DLT pipelines:
 
-- The first pipeline (01-raw-bronze-silver) does the following
-  - Stream from the raw JSON files that Databricks delivers using Autoloader to a bronze Delta Lake table. This creates a durable copy of the raw data that allows us to replay our ETL, should we find any issues in downstream tables.
+- The first pipeline (`01-raw-bronze-silver` on AWS and `01-raw-bronze-silver-event-hub` on Azure) does the following:
+  - On AWS, stream the raw JSON files that Databricks delivers into a bronze Delta Lake table using Autoloader. This creates a durable copy of the raw data that allows us to replay our ETL, should we find any issues in downstream tables.
+  - On Azure, stream the audit log events that Databricks delivers to the configured Event Hub into a bronze Delta Lake table. This creates a durable copy of the raw data that allows us to replay our ETL, should we find any issues in downstream tables.
   - Stream from a bronze Delta Lake table to a silver Delta Lake table such that it takes the sparse requestParams StructType and strips out all empty keys for every record, along with performing some other basic transformations like parsing email address from a nested field and parsing UNIX epoch to UTC timestamp.
-- The second pipeline (02-silver-fan-gold) then streams to individual gold Delta Lake tables for each Databricks service tracked in the audit logs. As the list of service is dynamically inferred from the data in the silver table, we need to create a separate pipeline due to current limitation
+- The second pipeline (`02-silver-fan-gold`) then streams to individual gold Delta Lake tables for each Databricks service tracked in the audit logs. Because the list of services is dynamically inferred from the data in the silver table, it has to run as a separate pipeline due to a current DLT limitation.
 
-The 2 pipelines are then linked together in a multi-task job, to be executed on a regular schedule
+The two pipelines are then linked together in a multi-task job and executed on a regular schedule.
\ No newline at end of file
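For reference, the notebooks in these pipelines read their inputs from the DLT pipeline configuration (`spark.conf` keys such as `mypipeline.log_bucket`, `mypipeline.services_table` and `mypipeline.silver_table`, plus the `mypipeline.eh_*` and secret-scope keys for the Event Hub variant). The sketch below shows how those keys might be supplied in each pipeline's settings; the pipeline names, notebook paths, bucket and database names are placeholder assumptions, not values taken from this repo.

```python
# Minimal sketch (assumed placeholder names/paths) of settings for the two DLT pipelines.
# The "configuration" block is what the notebooks read via spark.conf.get("mypipeline.*").

first_pipeline_settings = {
    "name": "audit-logs-01-raw-bronze-silver",
    "libraries": [{"notebook": {"path": "/Repos/<user>/audit_logs/DLT/01_raw_bronze_silver"}}],
    "configuration": {
        # AWS variant: where Databricks delivers the raw JSON audit logs
        "mypipeline.log_bucket": "s3://<your-audit-log-bucket>/audit-logs"
        # the Event Hub variant would instead set mypipeline.eh_ns_name, mypipeline.eh_topic_name,
        # mypipeline.secret_scope_name and mypipeline.secret_name
    },
    "target": "audit_logs",   # database the bronze/silver tables are published to
    "continuous": False,      # triggered runs, scheduled from the multi-task job
}

second_pipeline_settings = {
    "name": "audit-logs-02-silver-fan-gold",
    "libraries": [{"notebook": {"path": "/Repos/<user>/audit_logs/DLT/02_silver_fan_gold"}}],
    "configuration": {
        # must match the tables the first pipeline publishes under its target database
        "mypipeline.services_table": "audit_logs.silver_services_schema",
        "mypipeline.silver_table": "audit_logs.silver",
    },
    "target": "audit_logs_gold",
    "continuous": False,
}
```

Passing the table names and log location through `spark.conf` (rather than hard-coding them, as the earlier revisions did) is what lets the same notebooks be pointed at whichever bucket, Event Hub, or database a given deployment uses.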