12 changes: 11 additions & 1 deletion extensions/business/cybersec/red_mesh/constants.py
@@ -96,4 +96,14 @@

# Port order constants
PORT_ORDER_SHUFFLE = "SHUFFLE"
PORT_ORDER_SEQUENTIAL = "SEQUENTIAL"

# LLM Agent API status constants
LLM_API_STATUS_OK = "ok"
LLM_API_STATUS_ERROR = "error"
LLM_API_STATUS_TIMEOUT = "timeout"

# LLM Analysis types
LLM_ANALYSIS_SECURITY_ASSESSMENT = "security_assessment"
LLM_ANALYSIS_VULNERABILITY_SUMMARY = "vulnerability_summary"
LLM_ANALYSIS_REMEDIATION_PLAN = "remediation_plan"
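
# Illustrative sketch (not part of this diff): validating a requested analysis
# type against the constants above. The tuple and helper names are assumptions
# added here for illustration only.
VALID_LLM_ANALYSIS_TYPES = (
  LLM_ANALYSIS_SECURITY_ASSESSMENT,
  LLM_ANALYSIS_VULNERABILITY_SUMMARY,
  LLM_ANALYSIS_REMEDIATION_PLAN,
)

def is_valid_analysis_type(analysis_type: str) -> bool:
  # True only for one of the three supported analysis types
  return analysis_type in VALID_LLM_ANALYSIS_TYPES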
262 changes: 257 additions & 5 deletions extensions/business/cybersec/red_mesh/pentester_api_01.py
@@ -34,12 +34,19 @@

from naeural_core.business.default.web_app.fast_api_web_app import FastApiWebAppPlugin as BasePlugin
from .redmesh_utils import PentestLocalWorker  # import PentestLocalWorker from its separate module
from .redmesh_llm_agent_mixin import _RedMeshLlmAgentMixin
from .constants import (
FEATURE_CATALOG,
LLM_ANALYSIS_SECURITY_ASSESSMENT,
LLM_ANALYSIS_VULNERABILITY_SUMMARY,
LLM_ANALYSIS_REMEDIATION_PLAN,
)
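
# Reviewer note: _RedMeshLlmAgentMixin lives in redmesh_llm_agent_mixin.py and
# is not shown in this diff. A minimal sketch of the interface this plugin
# relies on, inferred from the call sites below; all signatures are assumptions.
#
#   class _RedMeshLlmAgentMixin:
#     def _call_llm_agent_api(self, endpoint, method="POST", payload=None) -> dict: ...
#     def _run_aggregated_llm_analysis(self, job_id, job_specs, workers, pass_nr=None) -> None: ...
#     def _collect_aggregated_report(self, workers) -> dict: ...
#     def _get_llm_health_status(self) -> dict: ...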

__VER__ = '0.8.2'

_CONFIG = {
**BasePlugin.CONFIG,

"TUNNEL_ENGINE_ENABLED": False,

'PORT': None,
@@ -49,9 +56,9 @@
"CHECK_JOBS_EACH" : 5,

"REDMESH_VERBOSE" : 10, # Verbosity level for debug messages (0 = off, 1+ = debug)

"NR_LOCAL_WORKERS" : 8,

"WARMUP_DELAY" : 30,

# Defines how ports are split across local workers.
@@ -68,12 +75,19 @@
"SCAN_MIN_RND_DELAY": 0.0, # minimum delay in seconds (0 = disabled)
"SCAN_MAX_RND_DELAY": 0.0, # maximum delay in seconds (0 = disabled)

# LLM Agent API integration for auto-analysis
"LLM_AGENT_API_ENABLED": False, # Enable LLM-powered analysis
"LLM_AGENT_API_HOST": "127.0.0.1", # Host where LLM Agent API is running
"LLM_AGENT_API_PORT": None, # Port for LLM Agent API (required if enabled)
"LLM_AGENT_API_TIMEOUT": 120, # Timeout in seconds for LLM API calls
"LLM_AUTO_ANALYSIS_TYPE": "security_assessment", # Default analysis type

'VALIDATION_RULES': {
**BasePlugin.CONFIG['VALIDATION_RULES'],
},
}
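
# Illustrative instance configuration enabling the LLM auto-analysis (not part
# of this diff). Keys mirror _CONFIG above; host and port values are placeholders.
EXAMPLE_LLM_INSTANCE_CONFIG = {
  "LLM_AGENT_API_ENABLED": True,
  "LLM_AGENT_API_HOST": "127.0.0.1",
  "LLM_AGENT_API_PORT": 8000,  # placeholder; required whenever the feature is enabled
  "LLM_AGENT_API_TIMEOUT": 120,  # seconds allowed per LLM call
  "LLM_AUTO_ANALYSIS_TYPE": "security_assessment",  # see LLM_ANALYSIS_* constants
}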

class PentesterApi01Plugin(BasePlugin, _RedMeshLlmAgentMixin):
"""
RedMesh API plugin for orchestrating decentralized pentest jobs.

@@ -846,6 +860,11 @@ def _maybe_finalize_pass(self):
job_specs["date_updated"] = self.time()
job_specs["date_finalized"] = self.time()
self.P(f"[SINGLEPASS] Job {job_id} complete. Status set to FINALIZED.")

# Run LLM auto-analysis on aggregated report (launcher only)
if self.cfg_llm_agent_api_enabled:
self._run_aggregated_llm_analysis(job_id, job_specs, workers, pass_nr=job_pass)

self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs)
continue

@@ -857,10 +876,19 @@ def _maybe_finalize_pass(self):
job_specs["date_updated"] = self.time()
job_specs["date_finalized"] = self.time()
self.P(f"[CONTINUOUS] Pass {job_pass} complete for job {job_id}. Status set to STOPPED (soft stop was scheduled)")

# Run LLM auto-analysis on aggregated report (launcher only)
if self.cfg_llm_agent_api_enabled:
self._run_aggregated_llm_analysis(job_id, job_specs, workers, pass_nr=job_pass)

self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs)
continue
# end if

# Run LLM auto-analysis for this pass (launcher only)
if self.cfg_llm_agent_api_enabled:
self._run_aggregated_llm_analysis(job_id, job_specs, workers, pass_nr=job_pass)

# Schedule next pass
interval = job_specs.get("monitor_interval", self.cfg_monitor_interval)
jitter = random.uniform(0, self.cfg_monitor_jitter)
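
# Reviewer note: _run_aggregated_llm_analysis is provided by the mixin and is
# not shown in this diff. A hedged sketch of the expected flow, based on the
# call sites above and the analyze_job endpoint below; every detail here is an
# assumption.
#
#   def _run_aggregated_llm_analysis(self, job_id, job_specs, workers, pass_nr=None):
#     report = self._collect_aggregated_report(workers)  # merge per-worker reports
#     if not report:
#       return
#     result = self._call_llm_agent_api(
#       endpoint="/analyze_scan", method="POST",
#       payload={"scan_results": report, "analysis_type": self.cfg_llm_auto_analysis_type},
#     )
#     if "error" not in result:
#       cid = self.r1fs.add_json(result, show_logs=False)  # persist analysis to R1FS
#       if cid and job_specs.get("pass_history"):
#         job_specs["pass_history"][-1]["llm_analysis_cid"] = cid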
@@ -1487,6 +1515,230 @@ def stop_monitoring(self, job_id: str, stop_type: str = "SOFT"):
}


@BasePlugin.endpoint(method="post")
def analyze_job(
self,
job_id: str,
analysis_type: str = "",
focus_areas: list[str] | None = None
):
"""
Manually trigger LLM analysis for a completed job.

Aggregates reports from all workers and runs analysis on the combined data.

Parameters
----------
job_id : str
Identifier of the job to analyze.
analysis_type : str, optional
Type of analysis: "security_assessment", "vulnerability_summary", "remediation_plan".
focus_areas : list[str], optional
Areas to focus on: ["web", "network", "databases", "authentication"].

Returns
-------
dict
LLM analysis result or error message.
"""
if not self.cfg_llm_agent_api_enabled:
return {"error": "LLM Agent API is not enabled", "job_id": job_id}

if not self.cfg_llm_agent_api_port:
return {"error": "LLM Agent API port not configured", "job_id": job_id}

# Get job from CStore
job_specs = self._get_job_from_cstore(job_id)
if not job_specs:
return {"error": "Job not found", "job_id": job_id}

workers = job_specs.get("workers", {})
if not workers:
return {"error": "No workers found for this job", "job_id": job_id}

# Check if all workers have finished
all_finished = all(w.get("finished") for w in workers.values())
if not all_finished:
return {"error": "Job not yet complete, some workers still running", "job_id": job_id}

# Collect and aggregate reports from all workers
aggregated_report = self._collect_aggregated_report(workers)

if not aggregated_report:
return {"error": "No report data available for this job", "job_id": job_id}

# Add job metadata to report for context
target = job_specs.get("target", "unknown")
aggregated_report["_job_metadata"] = {
"job_id": job_id,
"target": target,
"num_workers": len(workers),
"worker_addresses": list(workers.keys()),
"start_port": job_specs.get("start_port"),
"end_port": job_specs.get("end_port"),
"enabled_features": job_specs.get("enabled_features", []),
}

# Call LLM Agent API
analysis_type = analysis_type or self.cfg_llm_auto_analysis_type

analysis_result = self._call_llm_agent_api(
endpoint="/analyze_scan",
method="POST",
payload={
"scan_results": aggregated_report,
"analysis_type": analysis_type,
"focus_areas": focus_areas,
}
)

if "error" in analysis_result:
return {
"error": analysis_result.get("error"),
"status": analysis_result.get("status"),
"job_id": job_id,
}

# Save analysis to R1FS and store in pass_history
analysis_cid = None
pass_history = job_specs.get("pass_history", [])
current_pass = job_specs.get("job_pass", 1)

try:
analysis_cid = self.r1fs.add_json(analysis_result, show_logs=False)
if analysis_cid:
# Store in pass_history (find the latest completed pass)
if pass_history:
# Update the latest pass entry with analysis CID
pass_history[-1]["llm_analysis_cid"] = analysis_cid
else:
# No pass_history yet - create one
pass_history.append({
"pass_nr": current_pass,
"completed_at": self.time(),
"reports": {addr: w.get("report_cid") for addr, w in workers.items()},
"llm_analysis_cid": analysis_cid,
})
job_specs["pass_history"] = pass_history

self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=job_specs)
self.P(f"Manual LLM analysis saved for job {job_id}, CID: {analysis_cid}")
except Exception as e:
self.P(f"Failed to save analysis to R1FS: {e}", color='y')

return {
"job_id": job_id,
"target": target,
"num_workers": len(workers),
"pass_nr": pass_history[-1].get("pass_nr") if pass_history else current_pass,
"analysis_type": analysis_type,
"analysis": analysis_result,
"analysis_cid": analysis_cid,
}
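
# Example client call for the analyze_job endpoint above (illustrative; base
# URL, port, and job id are deployment-specific assumptions).
#
#   import requests
#   resp = requests.post(
#     "http://localhost:5050/analyze_job",
#     json={
#       "job_id": "job-1234",
#       "analysis_type": "vulnerability_summary",
#       "focus_areas": ["web", "authentication"],
#     },
#     timeout=180,  # allow headroom beyond the 120s LLM API timeout
#   )
#   print(resp.json().get("analysis_cid"))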


@BasePlugin.endpoint
def get_analysis(self, job_id: str = "", cid: str = "", pass_nr: int | None = None):
"""
Retrieve LLM analysis for a job or by CID.

The analysis is generated by the launcher node after all workers complete,
containing the aggregated results from all distributed workers.

Parameters
----------
job_id : str, optional
Identifier of the job to get analysis for.
cid : str, optional
Direct CID of the analysis to retrieve.
pass_nr : int, optional
Pass number for continuous jobs. If not provided, returns the latest pass.

Returns
-------
dict
LLM analysis data or error message.
"""
# If CID provided directly, fetch it
if cid:
try:
analysis = self.r1fs.get_json(cid)
if analysis is None:
return {"error": "Analysis not found", "cid": cid}
return {"cid": cid, "analysis": analysis}
except Exception as e:
return {"error": str(e), "cid": cid}

# Otherwise, look up by job_id
if not job_id:
return {"error": "Either job_id or cid must be provided"}

job_specs = self._get_job_from_cstore(job_id)
if not job_specs:
return {"error": "Job not found", "job_id": job_id}

# Look for analysis in pass_history
pass_history = job_specs.get("pass_history", [])
job_status = job_specs.get("job_status", "RUNNING")

if not pass_history:
if job_status == "RUNNING":
return {"error": "Job still running, no passes completed yet", "job_id": job_id, "job_status": job_status}
return {"error": "No pass history available for this job", "job_id": job_id, "job_status": job_status}

# Find the requested pass (or latest if not specified)
target_pass = None
if pass_nr is not None:
for entry in pass_history:
if entry.get("pass_nr") == pass_nr:
target_pass = entry
break
if not target_pass:
return {"error": f"Pass {pass_nr} not found in history", "job_id": job_id, "available_passes": [e.get("pass_nr") for e in pass_history]}
else:
# Get the latest pass
target_pass = pass_history[-1]

analysis_cid = target_pass.get("llm_analysis_cid")
if not analysis_cid:
return {
"error": "No LLM analysis available for this pass",
"job_id": job_id,
"pass_nr": target_pass.get("pass_nr"),
"job_status": job_status
}

try:
analysis = self.r1fs.get_json(analysis_cid)
if analysis is None:
return {"error": "Analysis not found in R1FS", "cid": analysis_cid, "job_id": job_id}
return {
"job_id": job_id,
"pass_nr": target_pass.get("pass_nr"),
"completed_at": target_pass.get("completed_at"),
"cid": analysis_cid,
"target": job_specs.get("target"),
"num_workers": len(job_specs.get("workers", {})),
"total_passes": len(pass_history),
"analysis": analysis,
}
except Exception as e:
return {"error": str(e), "cid": analysis_cid, "job_id": job_id}
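
# Example retrieval of a stored analysis (illustrative; URL and ids are
# assumptions, and the endpoint is assumed to default to GET).
#
#   import requests
#   base = "http://localhost:5050/get_analysis"
#   latest = requests.get(base, params={"job_id": "job-1234"}).json()
#   second = requests.get(base, params={"job_id": "job-1234", "pass_nr": 2}).json()
#   direct = requests.get(base, params={"cid": latest.get("cid", "")}).json()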


@BasePlugin.endpoint
def llm_health(self):
"""
Check health of the LLM Agent API connection.

Returns
-------
dict
Health status of the LLM Agent API.
"""
return self._get_llm_health_status()
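
# Quick liveness probe for the endpoint above (illustrative; the response is
# assumed to carry one of the status constants added in constants.py).
#
#   import requests
#   health = requests.get("http://localhost:5050/llm_health").json()
#   assert health.get("status") in ("ok", "error", "timeout")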


def process(self):
"""
Periodic task handler: launch new jobs and close completed ones.