Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ async def init(host, port, app_setup_func=None):
app.router.add_route("*", web_svc.get_route(WebService.REST_KEY), website_handler.rest_api)
app.router.add_route("GET", web_svc.get_route(WebService.EXPORT_PDF_KEY), website_handler.pdf_export)
app.router.add_route("GET", web_svc.get_route(WebService.EXPORT_NAV_KEY), website_handler.nav_export)
app.router.add_route("GET", web_svc.get_route(WebService.EXPORT_AFB_KEY), website_handler.afb_export)
app.router.add_route("GET", web_svc.get_route(WebService.COOKIE_KEY), website_handler.accept_cookies)
if not web_svc.is_local:
app.router.add_route("GET", web_svc.get_route(WebService.WHAT_TO_SUBMIT_KEY), website_handler.what_to_submit)
Expand Down
2 changes: 1 addition & 1 deletion spindle
237 changes: 237 additions & 0 deletions tests/test_afb_export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import os

from datetime import datetime
from tests.thread_app_test import ThreadAppTest
from threadcomponents.constants import UID as UID_KEY
from uuid import uuid4


class TestAFBExport(ThreadAppTest):
"""A test suite for checking AFB-export of reports."""

DB_TEST_FILE = os.path.join("tests", "threadtestafbexport.db")

async def setUpAsync(self):
await super().setUpAsync()

self.report_id, self.report_title = str(uuid4()), "Blitzball: A Guide"
await self.submit_test_report(
dict(
uid=self.report_id,
title=self.report_title,
url="lets.blitz",
date_written="2025-07-25",
),
sentences=[
":~$ echo $JECHT_SHOT",
"For more test data, I need to say some random IP addresses.",
"2607[:]f8b0[:]400e[:]c01[::]8a",
"74[.]125[.]199[.]138",
],
attacks_found=[
[("d99999", "Drain"), ("f32451", "Firaga"), ("s12345", "Yevon")],
[("d99999", "Drain"), ("f12345", "Fire")],
[],
[],
],
)

self.report_id2, self.report_title2 = str(uuid4()), "Chocobo Racing: A Guide"
await self.submit_test_report(
dict(
uid=self.report_id2,
title=self.report_title2,
url="kw.eh",
date_written="2025-07-25",
),
sentences=[
"Kweh.",
"Kweh!?",
],
attacks_found=[
[("f32451", "Firaga")],
[],
],
post_confirm_attack=True,
confirm_attack="f32451",
)

await self.confirm_report_attacks()
await self.save_report_iocs()

async def confirm_report_attacks(self):
"""Confirms the attacks in the test-report."""
await self.confirm_report_sentence_attacks(self.report_id, 0, ["d99999", "s12345"])
await self.confirm_report_sentence_attacks(self.report_id, 1, ["f12345"])

async def save_report_iocs(self):
"""Saves the IoCs in the test-report."""
sentence = await self.db.get("report_sentences", equal=dict(report_uid=self.report_id, sen_index=0))
await self.client.post(
"/rest",
json=dict(
index="add_indicator_of_compromise",
sentence_id=sentence[0][UID_KEY],
ioc_text="echo $JECHT_SHOT",
),
)

for sentence_index in range(2, 4):
sentence = await self.db.get(
"report_sentences",
equal=dict(report_uid=self.report_id, sen_index=sentence_index),
)
await self.client.post(
"/rest",
json=dict(
index="suggest_and_save_ioc",
sentence_id=sentence[0][UID_KEY],
),
)

async def get_export_afb_response(self):
"""Requests the AFB export and returns the response."""
return await self.client.get("/export/afb/Blitzball%3A%20A%20Guide")

async def get_attached_afb_as_json(self):
"""Returns the exported AFB as json."""
response = await self.get_export_afb_response()
return await response.json()

@staticmethod
def get_entries_from_export(exported, id_value):
"""Returns the entries from the exported data given an ID-type."""
all_objects = exported["objects"]
filtered_list = []

for current in all_objects:
if current["id"] == id_value:
filtered_list.append(current)

return filtered_list

async def trigger_object_by_id_value_test(self, id_value, expected_properties):
"""Given an ID-value in the objects-list, tests the objects for that ID-value are correct."""
exported = await self.get_attached_afb_as_json()

retrieved = self.get_entries_from_export(exported, id_value)
expected_count = len(expected_properties)
self.assertEqual(len(retrieved), expected_count, f"{expected_count} x {id_value} entries not set.")

for current in retrieved:
current_props = dict(current["properties"])

for key, value in current_props.items():
if isinstance(value, list):
current_props[key] = dict(value)

self.assertTrue(current_props in expected_properties, f"{id_value} object is an unexpected entry.")

async def test_response_headers(self):
"""Tests the response headers reflect an attachment."""
response = await self.get_export_afb_response()

self.assertEqual(response.content_disposition.type, "attachment", "Response is not an attachment.")
self.assertEqual(
response.content_disposition.filename,
"Blitzball__A_Guide.afb",
"Attachment's filename is different than expected.",
)

async def test_schema(self):
"""Tests the schema is correctly set in the export."""
exported = await self.get_attached_afb_as_json()
self.assertEqual(exported["schema"], "attack_flow_v2", "Schema not correctly set.")

async def test_camera_object(self):
"""Tests the camera-object is correctly set in the export."""
exported = await self.get_attached_afb_as_json()
camera = exported["camera"]

camera_fields = sorted(camera.keys())
self.assertEqual(camera_fields, ["k", "x", "y"], "Camera-object not correctly set.")

numbers_provided = all([isinstance(_, int) or isinstance(_, float) for _ in camera.values()])
self.assertTrue(numbers_provided, "Camera-values are not numbers.")

async def test_flow_object(self):
"""Tests the flow-object is correctly set in the export."""
exported = await self.get_attached_afb_as_json()

flow_objects = self.get_entries_from_export(exported, "flow")
self.assertEqual(len(flow_objects), 1, "1 x Flow entry not set.")

properties = dict(flow_objects[0]["properties"])
self.assertEqual(properties["name"], self.report_title, "Report title not set in export.")

today = datetime.now()
exported_date = datetime.strptime(properties["created"], "%Y-%m-%dT%H:%M:%S.%fZ")
self.assertEqual(today.date(), exported_date.date(), "Created timestamp is not of today.")

object_list = sorted(flow_objects[0]["objects"])
declared_ids = []

for current in exported["objects"]:
if current["id"] != "flow":
declared_ids.append(current["instance"])

self.assertEqual(object_list, sorted(declared_ids), "Objects-list has incorrect IDs.")

async def test_anchors_key_declared(self):
"""Tests objects from the export have an anchor key."""
exported = await self.get_attached_afb_as_json()

for current in exported["objects"]:
if current["id"] != "flow":
self.assertTrue("anchors" in current, "Non-flow entry missing 'anchors' key.")

async def test_layout_object(self):
"""Tests the layout-object is correctly set in the export."""
exported = await self.get_attached_afb_as_json()

flow_object = self.get_entries_from_export(exported, "flow")[0]
object_list = sorted(flow_object["objects"])
layout = exported["layout"]

layout_keys = sorted(layout.keys())
self.assertEqual(layout_keys, object_list, "Layout-object not correctly set.")

layout_values = [ln for l_coords in layout.values() for ln in l_coords]
numbers_provided = all([isinstance(_, int) or isinstance(_, float) for _ in layout_values])
self.assertTrue(numbers_provided, "Layout-values are not numbers.")

async def test_ipv6_object_declared(self):
"""Tests an IPv6 entry is correctly included in the export."""
expected = dict(value="2607[:]f8b0[:]400e[:]c01[::]8a")
await self.trigger_object_by_id_value_test("ipv6_addr", [expected])

async def test_ipv4_object_declared(self):
"""Tests an IPv4 entry is correctly included in the export."""
expected = dict(value="74[.]125[.]199[.]138")
await self.trigger_object_by_id_value_test("ipv4_addr", [expected])

async def test_process_object_declared(self):
"""Tests a process-entry is correctly included in the export."""
expected = dict(command_line=":~$ echo $JECHT_SHOT")
await self.trigger_object_by_id_value_test("process", [expected])

async def test_malware_object_declared(self):
"""Tests a malware-entry is correctly included in the export."""
expected = dict(name="Yevon")
await self.trigger_object_by_id_value_test("malware", [expected])

async def test_action_objects_declared(self):
"""Tests action-entries are correctly included in the export."""
expected_1 = dict(
name="Drain",
technique_id="T1029",
technique_ref="d99999",
ttp=dict(technique="T1029"),
)
expected_2 = dict(
name="Fire",
technique_id="T1562",
technique_ref="f12345",
ttp=dict(technique="T1562"),
)
await self.trigger_object_by_id_value_test("action", [expected_1, expected_2])
25 changes: 23 additions & 2 deletions tests/thread_app_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def setUpClass(cls):
report_exporter = ReportExporter(services=services_with_limit)
cls.web_api_with_limit = WebAPI(services=services_with_limit, report_exporter=report_exporter)
# Some test-attack data
cls.attacks = dict(d99999="Drain", f12345="Fire", f32451="Firaga", s00001="requiem")
cls.attacks = dict(d99999="Drain", f12345="Fire", f32451="Firaga", s00001="requiem", s12345="Yevon")

@classmethod
def tearDownClass(cls):
Expand All @@ -122,10 +122,13 @@ async def setUpAsync(self):
attack_2 = dict(uid="f32451", tid="T1562.004", name=self.attacks.get("f32451"))
attack_3 = dict(uid="d99999", tid="T1029", name=self.attacks.get("d99999"))
attack_4 = dict(uid="s00001", tid="T1485", name=self.attacks.get("s00001"), inactive=1)
for attack in [attack_1, attack_2, attack_3, attack_4]:
attack_5 = dict(uid="s12345", tid="S1030", name=self.attacks.get("s12345"))

for attack in [attack_1, attack_2, attack_3, attack_4, attack_5]:
# Ignoring Integrity Error in case other test case already has inserted this data (causing duplicate UIDs)
with suppress(sqlite3.IntegrityError):
await self.db.insert("attack_uids", attack)

# Insert some category, keyword & country data
cat_1 = dict(uid="c010101", keyname="aerospace", name="Aerospace")
cat_2 = dict(uid="c123456", keyname="music", name="Music")
Expand All @@ -134,11 +137,13 @@ async def setUpAsync(self):
group_2 = dict(uid="apt2", name="APT2")
group_3 = dict(uid="apt3", name="APT3")
self.data_svc.country_dict = dict(HB="Hobbiton", TA="Tatooine", WA="Wakanda")

for cat, group in [(cat_1, group_1), (cat_2, group_2), (cat_3, group_3)]:
with suppress(sqlite3.IntegrityError):
await self.db.insert("categories", cat)
await self.db.insert("keywords", group)
self.web_svc.categories_dict[cat["keyname"]] = dict(name=cat["name"], sub_categories=[])

# Carry out pre-launch tasks except for prepare_queue(): replace the call of this to return (and do) nothing
# We don't want multiple prepare_queue() calls so the queue does not accumulate between tests
with patch.object(RestService, "prepare_queue", return_value=None):
Expand All @@ -161,6 +166,7 @@ async def get_application(self):
app.router.add_route("GET", self.web_svc.get_route(WebService.EDIT_KEY), self.web_api.edit)
app.router.add_route("GET", self.web_svc.get_route(WebService.ABOUT_KEY), self.web_api.about)
app.router.add_route("GET", self.web_svc.get_route(WebService.HOW_IT_WORKS_KEY), self.web_api.how_it_works)
app.router.add_route("GET", self.web_svc.get_route(WebService.EXPORT_AFB_KEY), self.web_api.afb_export)
app.router.add_route("*", self.web_svc.get_route(WebService.REST_KEY), self.web_api.rest_api)
# A different route for limit-testing
app.router.add_route(
Expand Down Expand Up @@ -245,6 +251,7 @@ async def submit_test_report(
await self.db.insert("reports", report)
# Mock the analysis of the report
await self.rest_svc.start_analysis(criteria=report)

if post_confirm_attack:
# Get the report sentences for this report
db_sentences = await self.db.get("report_sentences", equal=dict(report_uid=report[UID_KEY]))
Expand All @@ -259,6 +266,20 @@ async def submit_test_report(
"/rest", json=dict(index="add_attack", sentence_id=sen_id, attack_uid=confirm_attack)
)

async def confirm_report_sentence_attacks(self, report_id, sentence_index, attack_list):
"""Accepts a list of attacks for a given test-sentence index."""
sentence = await self.db.get("report_sentences", equal=dict(report_uid=report_id, sen_index=sentence_index))

for accept in attack_list:
await self.client.post(
"/rest",
json=dict(
index="add_attack",
sentence_id=sentence[0][UID_KEY],
attack_uid=accept,
),
)

def mock_current_attack_data(self, attack_list=None):
"""Helper-method to mock the retrieval of the current Att%ck data by returning a specified attack-list."""
attack_list = attack_list or []
Expand Down
3 changes: 3 additions & 0 deletions threadcomponents/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@
UID = "uid"
URL = "url"
TITLE = "title"

TTP = "ttp"
IOC = "ioc"
23 changes: 21 additions & 2 deletions threadcomponents/handlers/web_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import logging

from aiohttp import web as aiohttp_web
from aiohttp.web_exceptions import HTTPException
from aiohttp_jinja2 import template, web
from aiohttp_security import authorized_userid
Expand Down Expand Up @@ -324,6 +325,7 @@ async def edit(self, request):
final_html = await self.web_svc.build_final_html(original_html, sentences)
pdf_link = self.web_svc.get_route(self.web_svc.EXPORT_PDF_KEY, param=title_quoted)
nav_link = self.web_svc.get_route(self.web_svc.EXPORT_NAV_KEY, param=title_quoted)
afb_link = self.web_svc.get_route(self.web_svc.EXPORT_AFB_KEY, param=title_quoted)
# Add some help-text
completed_info, private_info, sen_limit_help = None, None, None
is_completed = int(report_status == ReportStatus.COMPLETED.value)
Expand Down Expand Up @@ -358,6 +360,7 @@ async def edit(self, request):
original_html=original_html,
pdf_link=pdf_link,
nav_link=nav_link,
afb_link=afb_link,
unchecked=unchecked,
completed_help_text=completed_info,
private_help_text=private_info,
Expand Down Expand Up @@ -388,14 +391,30 @@ async def edit(self, request):
template_data.update(same_dates=True)
return template_data

@staticmethod
def get_downloadable_export_response(file_data, media_type, filename):
"""Returns a response with a downloadable file."""
return aiohttp_web.Response(
body=file_data,
headers={
"Content-Type": f"application/{media_type}",
"Content-Disposition": f"attachment; filename={filename}",
},
)

async def afb_export(self, request):
"""Function to export report in AFB format."""
filename, json_str = await self.report_exporter.afb_export(request)
return self.get_downloadable_export_response(json_str, "json", filename)

async def nav_export(self, request):
"""
Function to export confirmed sentences in layer json format
:param request: The title of the report information
:return: the layer json
"""
json_str = await self.report_exporter.nav_export(request)
return web.json_response(json_str)
filename, json_str = await self.report_exporter.nav_export(request)
return self.get_downloadable_export_response(json_str, "json", filename)

async def pdf_export(self, request):
"""
Expand Down
Loading