From bafbb2091b33524b4e1eb86928f192e7bfe246b8 Mon Sep 17 00:00:00 2001 From: toderian Date: Sun, 1 Feb 2026 16:37:48 +0200 Subject: [PATCH 1/4] feat: implement redmesh llm_agent_api and deepseek integration --- .../business/cybersec/red_mesh/constants.py | 12 +- .../cybersec/red_mesh/pentester_api_01.py | 537 +++++++++++++++- .../red_mesh/redmesh_llm_agent_api.py | 587 ++++++++++++++++++ 3 files changed, 1134 insertions(+), 2 deletions(-) create mode 100644 extensions/business/cybersec/red_mesh/redmesh_llm_agent_api.py diff --git a/extensions/business/cybersec/red_mesh/constants.py b/extensions/business/cybersec/red_mesh/constants.py index a3f0e401..65bd84b1 100644 --- a/extensions/business/cybersec/red_mesh/constants.py +++ b/extensions/business/cybersec/red_mesh/constants.py @@ -96,4 +96,14 @@ # Port order constants PORT_ORDER_SHUFFLE = "SHUFFLE" -PORT_ORDER_SEQUENTIAL = "SEQUENTIAL" \ No newline at end of file +PORT_ORDER_SEQUENTIAL = "SEQUENTIAL" + +# LLM Agent API status constants +LLM_API_STATUS_OK = "ok" +LLM_API_STATUS_ERROR = "error" +LLM_API_STATUS_TIMEOUT = "timeout" + +# LLM Analysis types +LLM_ANALYSIS_SECURITY_ASSESSMENT = "security_assessment" +LLM_ANALYSIS_VULNERABILITY_SUMMARY = "vulnerability_summary" +LLM_ANALYSIS_REMEDIATION_PLAN = "remediation_plan" \ No newline at end of file diff --git a/extensions/business/cybersec/red_mesh/pentester_api_01.py b/extensions/business/cybersec/red_mesh/pentester_api_01.py index 11bbeb22..da4d6dab 100644 --- a/extensions/business/cybersec/red_mesh/pentester_api_01.py +++ b/extensions/business/cybersec/red_mesh/pentester_api_01.py @@ -31,10 +31,16 @@ """ import random +import requests from naeural_core.business.default.web_app.fast_api_web_app import FastApiWebAppPlugin as BasePlugin from .redmesh_utils import PentestLocalWorker # Import PentestJob from separate module -from .constants import FEATURE_CATALOG +from .constants import ( + FEATURE_CATALOG, + LLM_ANALYSIS_SECURITY_ASSESSMENT, + LLM_ANALYSIS_VULNERABILITY_SUMMARY, + LLM_ANALYSIS_REMEDIATION_PLAN, +) __VER__ = '0.8.2' @@ -68,6 +74,13 @@ "SCAN_MIN_RND_DELAY": 0.0, # minimum delay in seconds (0 = disabled) "SCAN_MAX_RND_DELAY": 0.0, # maximum delay in seconds (0 = disabled) + # LLM Agent API integration for auto-analysis + "LLM_AGENT_API_ENABLED": False, # Enable LLM-powered analysis + "LLM_AGENT_API_HOST": "127.0.0.1", # Host where LLM Agent API is running + "LLM_AGENT_API_PORT": None, # Port for LLM Agent API (required if enabled) + "LLM_AGENT_API_TIMEOUT": 120, # Timeout in seconds for LLM API calls + "LLM_AUTO_ANALYSIS_TYPE": "security_assessment", # Default analysis type + 'VALIDATION_RULES': { **BasePlugin.CONFIG['VALIDATION_RULES'], }, @@ -184,6 +197,259 @@ def Pd(self, s, *args, score=-1, **kwargs): return + """LLM AGENT API CLIENT METHODS""" + + def _get_llm_agent_api_url(self, endpoint: str) -> str: + """ + Build URL for LLM Agent API endpoint. + + Parameters + ---------- + endpoint : str + API endpoint path (e.g., "/chat", "/analyze_scan"). + + Returns + ------- + str + Full URL to the endpoint. + """ + host = self.cfg_llm_agent_api_host + port = self.cfg_llm_agent_api_port + endpoint = endpoint.lstrip("/") + return f"http://{host}:{port}/{endpoint}" + + def _call_llm_agent_api( + self, + endpoint: str, + method: str = "POST", + payload: dict = None, + timeout: int = None + ) -> dict: + """ + Make HTTP request to the LLM Agent API. + + Parameters + ---------- + endpoint : str + API endpoint to call (e.g., "/analyze_scan", "/health"). 
+ method : str, optional + HTTP method (default: "POST"). + payload : dict, optional + JSON payload for POST requests. + timeout : int, optional + Request timeout in seconds. + + Returns + ------- + dict + API response or error object. + """ + if not self.cfg_llm_agent_api_enabled: + return {"error": "LLM Agent API is not enabled", "status": "disabled"} + + if not self.cfg_llm_agent_api_port: + return {"error": "LLM Agent API port not configured", "status": "config_error"} + + url = self._get_llm_agent_api_url(endpoint) + timeout = timeout or self.cfg_llm_agent_api_timeout + + try: + self.Pd(f"Calling LLM Agent API: {method} {url}") + + if method.upper() == "GET": + response = requests.get(url, timeout=timeout) + else: + response = requests.post( + url, + json=payload or {}, + headers={"Content-Type": "application/json"}, + timeout=timeout + ) + + if response.status_code != 200: + return { + "error": f"LLM Agent API returned status {response.status_code}", + "status": "api_error", + "details": response.text + } + + # Unwrap response if FastAPI wrapped it (extract 'result' from envelope) + response_data = response.json() + if isinstance(response_data, dict) and "result" in response_data: + return response_data["result"] + return response_data + + except requests.exceptions.ConnectionError: + self.P(f"LLM Agent API not reachable at {url}", color='y') + return {"error": "LLM Agent API not reachable", "status": "connection_error"} + except requests.exceptions.Timeout: + self.P(f"LLM Agent API request timed out", color='y') + return {"error": "LLM Agent API request timed out", "status": "timeout"} + except Exception as e: + self.P(f"Error calling LLM Agent API: {e}", color='r') + return {"error": str(e), "status": "error"} + + def _auto_analyze_report(self, job_id: str, report: dict, target: str) -> dict: + """ + Automatically analyze a completed scan report using LLM Agent API. + + Parameters + ---------- + job_id : str + Identifier of the completed job. + report : dict + Aggregated scan report to analyze. + target : str + Target hostname/IP that was scanned. + + Returns + ------- + dict + LLM analysis result or error object. + """ + if not self.cfg_llm_agent_api_enabled: + self.Pd("LLM auto-analysis skipped (not enabled)") + return None + + self.P(f"Running LLM auto-analysis for job {job_id}, target {target}...") + + analysis_result = self._call_llm_agent_api( + endpoint="/analyze_scan", + method="POST", + payload={ + "scan_results": report, + "analysis_type": self.cfg_llm_auto_analysis_type, + "focus_areas": None, + } + ) + + if "error" in analysis_result: + self.P(f"LLM auto-analysis failed for job {job_id}: {analysis_result.get('error')}", color='y') + else: + self.P(f"LLM auto-analysis completed for job {job_id}") + + return analysis_result + + def _collect_aggregated_report(self, workers: dict) -> dict: + """ + Collect and aggregate reports from all workers. + + Parameters + ---------- + workers : dict + Worker entries from job_specs containing report_cid or result. + + Returns + ------- + dict + Aggregated report combining all worker data. 
+ """ + all_reports = {} + + for addr, worker_entry in workers.items(): + report = None + report_cid = worker_entry.get("report_cid") + + # Try to fetch from R1FS first + if report_cid: + try: + report = self.r1fs.get_json(report_cid) + self.Pd(f"Fetched report from R1FS for worker {addr}: CID {report_cid}") + except Exception as e: + self.P(f"Failed to fetch report from R1FS for {addr}: {e}", color='y') + + # Fallback to direct result + if not report: + report = worker_entry.get("result") + + if report: + all_reports[addr] = report + + if not all_reports: + self.P("No reports found to aggregate", color='y') + return {} + + # Aggregate all reports + aggregated = self._get_aggregated_report(all_reports) + return aggregated + + def _run_aggregated_llm_analysis(self, job_id: str, job_specs: dict, workers: dict, pass_nr: int = None): + """ + Run LLM analysis on aggregated report from all workers. + + Called by the launcher node after all workers complete. + + Parameters + ---------- + job_id : str + Identifier of the job. + job_specs : dict + Job specification (will be updated with analysis CID). + workers : dict + Worker entries containing report data. + pass_nr : int, optional + Pass number for continuous monitoring jobs. If provided, analysis CID + is stored in pass_history entry instead of job-level. + + Returns + ------- + str or None + Analysis CID if successful, None otherwise. + """ + target = job_specs.get("target", "unknown") + run_mode = job_specs.get("run_mode", "SINGLEPASS") + pass_info = f" (pass {pass_nr})" if pass_nr else "" + self.P(f"Running aggregated LLM analysis for job {job_id}{pass_info}, target {target}...") + + # Collect and aggregate reports from all workers + aggregated_report = self._collect_aggregated_report(workers) + + if not aggregated_report: + self.P(f"No data to analyze for job {job_id}", color='y') + return None + + # Add job metadata to report for context + aggregated_report["_job_metadata"] = { + "job_id": job_id, + "target": target, + "num_workers": len(workers), + "worker_addresses": list(workers.keys()), + "start_port": job_specs.get("start_port"), + "end_port": job_specs.get("end_port"), + "enabled_features": job_specs.get("enabled_features", []), + "run_mode": run_mode, + "pass_nr": pass_nr, + } + + # Call LLM analysis + llm_analysis = self._auto_analyze_report(job_id, aggregated_report, target) + + if not llm_analysis or "error" in llm_analysis: + self.P(f"LLM analysis failed for job {job_id}: {llm_analysis.get('error') if llm_analysis else 'No response'}", color='y') + return None + + # Save analysis to R1FS + try: + analysis_cid = self.r1fs.add_json(llm_analysis, show_logs=False) + if analysis_cid: + # Always store in pass_history for consistency (both SINGLEPASS and CONTINUOUS) + pass_history = job_specs.get("pass_history", []) + for entry in pass_history: + if entry.get("pass_nr") == pass_nr: + entry["llm_analysis_cid"] = analysis_cid + break + self.P(f"LLM analysis for pass {pass_nr} saved, CID: {analysis_cid}") + return analysis_cid + else: + self.P(f"Failed to save LLM analysis to R1FS for job {job_id}", color='y') + return None + except Exception as e: + self.P(f"Error saving LLM analysis to R1FS: {e}", color='r') + return None + + """END LLM AGENT API CLIENT METHODS""" + + def __post_init(self): """ Perform warmup: reconcile existing jobs in CStore, migrate legacy keys, @@ -846,6 +1112,11 @@ def _maybe_finalize_pass(self): job_specs["date_updated"] = self.time() job_specs["date_finalized"] = self.time() self.P(f"[SINGLEPASS] Job {job_id} 
complete. Status set to FINALIZED.") + + # Run LLM auto-analysis on aggregated report (launcher only) + if self.cfg_llm_agent_api_enabled: + self._run_aggregated_llm_analysis(job_id, job_specs, workers, pass_nr=job_pass) + self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs) continue @@ -857,10 +1128,19 @@ def _maybe_finalize_pass(self): job_specs["date_updated"] = self.time() job_specs["date_finalized"] = self.time() self.P(f"[CONTINUOUS] Pass {job_pass} complete for job {job_id}. Status set to STOPPED (soft stop was scheduled)") + + # Run LLM auto-analysis on aggregated report (launcher only) + if self.cfg_llm_agent_api_enabled: + self._run_aggregated_llm_analysis(job_id, job_specs, workers, pass_nr=job_pass) + self.chainstore_hset(hkey=self.cfg_instance_id, key=job_key, value=job_specs) continue # end if + # Run LLM auto-analysis for this pass (launcher only) + if self.cfg_llm_agent_api_enabled: + self._run_aggregated_llm_analysis(job_id, job_specs, workers, pass_nr=job_pass) + # Schedule next pass interval = job_specs.get("monitor_interval", self.cfg_monitor_interval) jitter = random.uniform(0, self.cfg_monitor_jitter) @@ -1487,6 +1767,261 @@ def stop_monitoring(self, job_id: str, stop_type: str = "SOFT"): } + @BasePlugin.endpoint(method="post") + def analyze_job( + self, + job_id: str, + analysis_type: str = "", + focus_areas: list[str] = None + ): + """ + Manually trigger LLM analysis for a completed job. + + Aggregates reports from all workers and runs analysis on the combined data. + + Parameters + ---------- + job_id : str + Identifier of the job to analyze. + analysis_type : str, optional + Type of analysis: "security_assessment", "vulnerability_summary", "remediation_plan". + focus_areas : list[str], optional + Areas to focus on: ["web", "network", "databases", "authentication"]. + + Returns + ------- + dict + LLM analysis result or error message. 
+ """ + if not self.cfg_llm_agent_api_enabled: + return {"error": "LLM Agent API is not enabled", "job_id": job_id} + + if not self.cfg_llm_agent_api_port: + return {"error": "LLM Agent API port not configured", "job_id": job_id} + + # Get job from CStore + job_specs = self._get_job_from_cstore(job_id) + if not job_specs: + return {"error": "Job not found", "job_id": job_id} + + workers = job_specs.get("workers", {}) + if not workers: + return {"error": "No workers found for this job", "job_id": job_id} + + # Check if all workers have finished + all_finished = all(w.get("finished") for w in workers.values()) + if not all_finished: + return {"error": "Job not yet complete, some workers still running", "job_id": job_id} + + # Collect and aggregate reports from all workers + aggregated_report = self._collect_aggregated_report(workers) + + if not aggregated_report: + return {"error": "No report data available for this job", "job_id": job_id} + + # Add job metadata to report for context + target = job_specs.get("target", "unknown") + aggregated_report["_job_metadata"] = { + "job_id": job_id, + "target": target, + "num_workers": len(workers), + "worker_addresses": list(workers.keys()), + "start_port": job_specs.get("start_port"), + "end_port": job_specs.get("end_port"), + "enabled_features": job_specs.get("enabled_features", []), + } + + # Call LLM Agent API + analysis_type = analysis_type or self.cfg_llm_auto_analysis_type + + analysis_result = self._call_llm_agent_api( + endpoint="/analyze_scan", + method="POST", + payload={ + "scan_results": aggregated_report, + "analysis_type": analysis_type, + "focus_areas": focus_areas, + } + ) + + if "error" in analysis_result: + return { + "error": analysis_result.get("error"), + "status": analysis_result.get("status"), + "job_id": job_id, + } + + # Save analysis to R1FS and store in pass_history + analysis_cid = None + pass_history = job_specs.get("pass_history", []) + current_pass = job_specs.get("job_pass", 1) + + try: + analysis_cid = self.r1fs.add_json(analysis_result, show_logs=False) + if analysis_cid: + # Store in pass_history (find the latest completed pass) + if pass_history: + # Update the latest pass entry with analysis CID + pass_history[-1]["llm_analysis_cid"] = analysis_cid + else: + # No pass_history yet - create one + pass_history.append({ + "pass_nr": current_pass, + "completed_at": self.time(), + "reports": {addr: w.get("report_cid") for addr, w in workers.items()}, + "llm_analysis_cid": analysis_cid, + }) + job_specs["pass_history"] = pass_history + + self.chainstore_hset(hkey=self.cfg_instance_id, key=job_id, value=job_specs) + self.P(f"Manual LLM analysis saved for job {job_id}, CID: {analysis_cid}") + except Exception as e: + self.P(f"Failed to save analysis to R1FS: {e}", color='y') + + return { + "job_id": job_id, + "target": target, + "num_workers": len(workers), + "pass_nr": pass_history[-1].get("pass_nr") if pass_history else current_pass, + "analysis_type": analysis_type, + "analysis": analysis_result, + "analysis_cid": analysis_cid, + } + + + @BasePlugin.endpoint + def get_analysis(self, job_id: str = "", cid: str = "", pass_nr: int = None): + """ + Retrieve LLM analysis for a job or by CID. + + The analysis is generated by the launcher node after all workers complete, + containing the aggregated results from all distributed workers. + + Parameters + ---------- + job_id : str, optional + Identifier of the job to get analysis for. + cid : str, optional + Direct CID of the analysis to retrieve. 
+ pass_nr : int, optional + Pass number for continuous jobs. If not provided, returns the latest pass. + + Returns + ------- + dict + LLM analysis data or error message. + """ + # If CID provided directly, fetch it + if cid: + try: + analysis = self.r1fs.get_json(cid) + if analysis is None: + return {"error": "Analysis not found", "cid": cid} + return {"cid": cid, "analysis": analysis} + except Exception as e: + return {"error": str(e), "cid": cid} + + # Otherwise, look up by job_id + if not job_id: + return {"error": "Either job_id or cid must be provided"} + + job_specs = self._get_job_from_cstore(job_id) + if not job_specs: + return {"error": "Job not found", "job_id": job_id} + + # Look for analysis in pass_history + pass_history = job_specs.get("pass_history", []) + job_status = job_specs.get("job_status", "RUNNING") + + if not pass_history: + if job_status == "RUNNING": + return {"error": "Job still running, no passes completed yet", "job_id": job_id, "job_status": job_status} + return {"error": "No pass history available for this job", "job_id": job_id, "job_status": job_status} + + # Find the requested pass (or latest if not specified) + target_pass = None + if pass_nr is not None: + for entry in pass_history: + if entry.get("pass_nr") == pass_nr: + target_pass = entry + break + if not target_pass: + return {"error": f"Pass {pass_nr} not found in history", "job_id": job_id, "available_passes": [e.get("pass_nr") for e in pass_history]} + else: + # Get the latest pass + target_pass = pass_history[-1] + + analysis_cid = target_pass.get("llm_analysis_cid") + if not analysis_cid: + return { + "error": "No LLM analysis available for this pass", + "job_id": job_id, + "pass_nr": target_pass.get("pass_nr"), + "job_status": job_status + } + + try: + analysis = self.r1fs.get_json(analysis_cid) + if analysis is None: + return {"error": "Analysis not found in R1FS", "cid": analysis_cid, "job_id": job_id} + return { + "job_id": job_id, + "pass_nr": target_pass.get("pass_nr"), + "completed_at": target_pass.get("completed_at"), + "cid": analysis_cid, + "target": job_specs.get("target"), + "num_workers": len(job_specs.get("workers", {})), + "total_passes": len(pass_history), + "analysis": analysis, + } + except Exception as e: + return {"error": str(e), "cid": analysis_cid, "job_id": job_id} + + + @BasePlugin.endpoint + def llm_health(self): + """ + Check health of the LLM Agent API connection. + + Returns + ------- + dict + Health status of the LLM Agent API. + """ + if not self.cfg_llm_agent_api_enabled: + return { + "enabled": False, + "status": "disabled", + "message": "LLM Agent API is not enabled in configuration" + } + + if not self.cfg_llm_agent_api_port: + return { + "enabled": True, + "status": "config_error", + "message": "LLM Agent API port not configured" + } + + result = self._call_llm_agent_api(endpoint="/health", method="GET", timeout=5) + + if "error" in result: + return { + "enabled": True, + "status": result.get("status", "error"), + "message": result.get("error"), + "host": self.cfg_llm_agent_api_host, + "port": self.cfg_llm_agent_api_port, + } + + return { + "enabled": True, + "status": "ok", + "host": self.cfg_llm_agent_api_host, + "port": self.cfg_llm_agent_api_port, + "llm_agent_health": result, + } + + def process(self): """ Periodic task handler: launch new jobs and close completed ones. 
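The `LLM_AGENT_API_*` defaults introduced above ship disabled (`LLM_AGENT_API_ENABLED: False`, no port configured), so the launcher instance has to override them before any auto-analysis runs. A minimal sketch of such an override, assuming the usual per-instance config mechanism; the instance id and port value are placeholders and only the `LLM_*` keys come from this patch:

```python
# Illustrative launcher-side overrides enabling LLM auto-analysis.
# Only the LLM_* keys are defined by this patch; the instance id and the
# port value are placeholders and must match the running LLM Agent API.
LAUNCHER_INSTANCE_OVERRIDES = {
  "INSTANCE_ID": "redmesh_launcher",
  "LLM_AGENT_API_ENABLED": True,
  "LLM_AGENT_API_HOST": "127.0.0.1",
  "LLM_AGENT_API_PORT": 5050,
  "LLM_AGENT_API_TIMEOUT": 120,
  "LLM_AUTO_ANALYSIS_TYPE": "security_assessment",
}
```

The port has to match whatever `PORT` the `REDMESH_LLM_AGENT_API` instance introduced in the next file is started with.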
diff --git a/extensions/business/cybersec/red_mesh/redmesh_llm_agent_api.py b/extensions/business/cybersec/red_mesh/redmesh_llm_agent_api.py new file mode 100644 index 00000000..27e2752f --- /dev/null +++ b/extensions/business/cybersec/red_mesh/redmesh_llm_agent_api.py @@ -0,0 +1,587 @@ +""" +RedMesh LLM Agent API Plugin + +Local API for DeepSeek LLM integration in RedMesh workflows. +Provides chat completion and scan analysis endpoints that proxy to DeepSeek API. + +Pipeline configuration example: +```json +{ + "NAME": "redmesh_llm_agent", + "TYPE": "Void", + "PLUGINS": [ + { + "SIGNATURE": "REDMESH_LLM_AGENT_API", + "INSTANCES": [ + { + "INSTANCE_ID": "llm_agent", + "PORT": 5050, + "DEEPSEEK_MODEL": "deepseek-chat" + } + ] + } + ] +} +``` + +Available Endpoints: +- POST /chat - Chat completion via DeepSeek API +- POST /analyze_scan - Analyze RedMesh scan results with LLM +- GET /health - Health check with API key status +- GET /status - Request metrics + +Environment Variables: +- DEEPSEEK_API_KEY: API key for DeepSeek (required) +""" + +import json +import requests +import traceback + +from typing import Any, Dict, List, Optional + +from naeural_core.business.default.web_app.fast_api_web_app import FastApiWebAppPlugin as BasePlugin + +from .constants import ( + LLM_API_STATUS_OK, + LLM_API_STATUS_ERROR, + LLM_API_STATUS_TIMEOUT, + LLM_ANALYSIS_SECURITY_ASSESSMENT, + LLM_ANALYSIS_VULNERABILITY_SUMMARY, + LLM_ANALYSIS_REMEDIATION_PLAN, +) + +__VER__ = '0.1.0' + +_CONFIG = { + **BasePlugin.CONFIG, + + # Local-only mode (no tunneling, no auth needed) + "TUNNEL_ENGINE_ENABLED": False, + "ALLOW_EMPTY_INPUTS": True, + + # Return raw responses without node metadata wrapper + "RESPONSE_FORMAT": "RAW", + + "PORT": None, + + # API metadata + "API_TITLE": "RedMesh LLM Agent API", + "API_SUMMARY": "Local API for DeepSeek LLM integration in RedMesh workflows.", + + # DeepSeek configuration + "DEEPSEEK_API_URL": "https://api.deepseek.com/chat/completions", + "DEEPSEEK_API_KEY": None, # API key (can be provided directly via config) + "DEEPSEEK_API_KEY_ENV": "DEEPSEEK_API_KEY", # Fallback: env var name if key not in config + "DEEPSEEK_MODEL": "deepseek-chat", + + # Request defaults + "DEFAULT_TEMPERATURE": 0.7, + "DEFAULT_MAX_TOKENS": 1024, + "DEFAULT_TOP_P": 1.0, + + # HTTP timeouts + "REQUEST_TIMEOUT_SECONDS": 120, + + # Debug/logging + "REDMESH_VERBOSE": 10, + + 'VALIDATION_RULES': { + **BasePlugin.CONFIG['VALIDATION_RULES'], + }, +} + +# System prompts for scan analysis +ANALYSIS_PROMPTS = { + LLM_ANALYSIS_SECURITY_ASSESSMENT: """You are a cybersecurity expert analyzing network scan results. +Provide a comprehensive security assessment of the target based on the scan data. +Include: +1. Executive summary of security posture +2. Key findings organized by severity (Critical, High, Medium, Low) +3. Attack surface analysis +4. Overall risk rating + +Be specific and reference the actual findings from the scan data.""", + + LLM_ANALYSIS_VULNERABILITY_SUMMARY: """You are a cybersecurity expert analyzing network scan results. +Provide a prioritized vulnerability summary based on the scan data. +Include: +1. Vulnerabilities ranked by severity and exploitability +2. CVE references where applicable +3. Potential impact of each vulnerability +4. Quick wins (easy fixes with high impact) + +Focus on actionable findings.""", + + LLM_ANALYSIS_REMEDIATION_PLAN: """You are a cybersecurity expert analyzing network scan results. +Provide a detailed remediation plan based on the scan data. +Include: +1. 
Prioritized remediation steps +2. Specific commands or configurations to fix issues +3. Estimated effort for each fix +4. Dependencies between fixes +5. Verification steps to confirm remediation + +Be practical and provide copy-paste ready solutions where possible.""", +} + + +class RedmeshLlmAgentApiPlugin(BasePlugin): + """ + RedMesh LLM Agent API plugin for DeepSeek integration. + + This plugin exposes FastAPI endpoints for: + - General chat completion via DeepSeek API + - Automated analysis of RedMesh scan results + + Attributes + ---------- + CONFIG : dict + Plugin configuration merged with BasePlugin defaults. + _api_key : str or None + DeepSeek API key loaded from environment. + _request_count : int + Total number of API requests made. + _error_count : int + Total number of failed requests. + """ + CONFIG = _CONFIG + + def on_init(self): + """Initialize plugin and validate DeepSeek API key.""" + super(RedmeshLlmAgentApiPlugin, self).on_init() + self._api_key = self._load_api_key() + self._request_count = 0 + self._error_count = 0 + self._last_request_time = None + + if not self._api_key: + self.P("WARNING: DeepSeek API key not configured! Set the DEEPSEEK_API_KEY environment variable.", color='r') + else: + self.P(f"RedMesh LLM Agent API initialized. Model: {self.cfg_deepseek_model}") + return + + def get_additional_fastapi_data(self): + """Override to return empty dict - no node metadata in responses.""" + return {} + + def _load_api_key(self) -> Optional[str]: + """ + Load API key from config or environment variable. + + Priority: + 1. DEEPSEEK_API_KEY config parameter (direct) + 2. Environment variable specified by DEEPSEEK_API_KEY_ENV + + Returns + ------- + str or None + The API key if found, otherwise None. + """ + # First check if API key is provided directly in config + api_key = self.cfg_deepseek_api_key + if api_key: + api_key = api_key.strip() + if api_key: + self.Pd("Using API key from config") + return api_key + + # Fallback to environment variable + env_name = self.cfg_deepseek_api_key_env + api_key = self.os_environ.get(env_name, None) + if api_key: + api_key = api_key.strip() + if api_key: + self.Pd(f"Using API key from environment variable {env_name}") + return api_key + + return None + + def P(self, s, *args, **kwargs): + """Prefixed logger for RedMesh LLM messages.""" + s = "[REDMESH_LLM] " + str(s) + return super(RedmeshLlmAgentApiPlugin, self).P(s, *args, **kwargs) + + def Pd(self, s, *args, score=-1, **kwargs): + """Debug logging with verbosity control.""" + if self.cfg_redmesh_verbose > score: + s = "[DEBUG] " + str(s) + self.P(s, *args, **kwargs) + return + + def _build_deepseek_request( + self, + messages: List[Dict], + model: Optional[str] = None, + temperature: Optional[float] = None, + max_tokens: Optional[int] = None, + top_p: Optional[float] = None, + ) -> Dict: + """ + Build the payload for DeepSeek API. + + Parameters + ---------- + messages : list of dict + Chat messages in OpenAI format. + model : str, optional + Model name override. + temperature : float, optional + Sampling temperature override. + max_tokens : int, optional + Max tokens override. + top_p : float, optional + Nucleus sampling override. + + Returns + ------- + dict + DeepSeek API request payload. 
+ """ + return { + "model": model or self.cfg_deepseek_model, + "messages": messages, + "temperature": temperature if temperature is not None else self.cfg_default_temperature, + "max_tokens": max_tokens if max_tokens is not None else self.cfg_default_max_tokens, + "top_p": top_p if top_p is not None else self.cfg_default_top_p, + "stream": False, + } + + def _call_deepseek_api(self, payload: Dict) -> Dict: + """ + Execute HTTP request to DeepSeek API. + + Parameters + ---------- + payload : dict + Request payload for DeepSeek API. + + Returns + ------- + dict + API response or error object. + """ + if not self._api_key: + return { + "error": "DeepSeek API key not configured", + "status": LLM_API_STATUS_ERROR, + } + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self._api_key}" + } + + self._request_count += 1 + self._last_request_time = self.time() + + try: + self.Pd(f"Calling DeepSeek API: {self.cfg_deepseek_api_url}") + response = requests.post( + self.cfg_deepseek_api_url, + headers=headers, + json=payload, + timeout=self.cfg_request_timeout_seconds + ) + + if response.status_code != 200: + self._error_count += 1 + error_detail = response.text + try: + error_detail = response.json() + except Exception: + pass + return { + "error": f"DeepSeek API returned status {response.status_code}", + "status": LLM_API_STATUS_ERROR, + "details": error_detail, + } + + return response.json() + + except requests.exceptions.Timeout: + self._error_count += 1 + self.P("DeepSeek API request timed out", color='r') + return { + "error": "DeepSeek API request timed out", + "status": LLM_API_STATUS_TIMEOUT, + } + except requests.exceptions.RequestException as e: + self._error_count += 1 + self.P(f"DeepSeek API request failed: {e}", color='r') + return { + "error": str(e), + "status": LLM_API_STATUS_ERROR, + } + except Exception as e: + self._error_count += 1 + self.P(f"Unexpected error calling DeepSeek API: {e}\n{traceback.format_exc()}", color='r') + return { + "error": f"Unexpected error: {e}", + "status": LLM_API_STATUS_ERROR, + } + + def _validate_messages(self, messages: List[Dict]) -> Optional[str]: + """ + Validate chat messages format. + + Parameters + ---------- + messages : list of dict + Messages to validate. + + Returns + ------- + str or None + Error message if validation fails, otherwise None. + """ + if not isinstance(messages, list) or len(messages) == 0: + return "`messages` must be a non-empty list of message dicts." + + for idx, message in enumerate(messages): + if not isinstance(message, dict): + return f"Message at index {idx} must be a dict." + + role = message.get('role', None) + content = message.get('content', None) + + if role not in {'system', 'user', 'assistant'}: + return f"Message {idx} has invalid role '{role}'. Must be 'system', 'user', or 'assistant'." + + if not isinstance(content, str) or not content.strip(): + return f"Message {idx} content must be a non-empty string." + + return None + + """API ENDPOINTS""" + + @BasePlugin.endpoint(method="GET") + def health(self) -> Dict: + """ + Check API health and DeepSeek configuration. + + Returns + ------- + dict + Health status including API key presence and metrics. 
+ """ + return { + "status": LLM_API_STATUS_OK, + "api_key_configured": self._api_key is not None, + "model": self.cfg_deepseek_model, + "api_url": self.cfg_deepseek_api_url, + "uptime_seconds": self.time() - self.start_time if hasattr(self, 'start_time') else 0, + "version": __VER__, + } + + @BasePlugin.endpoint(method="GET") + def status(self) -> Dict: + """ + Get detailed API status including request metrics. + + Returns + ------- + dict + Status with request counts and configuration. + """ + return { + "status": LLM_API_STATUS_OK, + "metrics": { + "total_requests": self._request_count, + "failed_requests": self._error_count, + "success_rate": ( + (self._request_count - self._error_count) / self._request_count * 100 + if self._request_count > 0 else 100.0 + ), + "last_request_time": self._last_request_time, + }, + "config": { + "model": self.cfg_deepseek_model, + "default_temperature": self.cfg_default_temperature, + "default_max_tokens": self.cfg_default_max_tokens, + "timeout_seconds": self.cfg_request_timeout_seconds, + }, + } + + @BasePlugin.endpoint(method="POST") + def chat( + self, + messages: List[Dict[str, Any]], + model: Optional[str] = None, + temperature: Optional[float] = None, + max_tokens: Optional[int] = None, + top_p: Optional[float] = None, + **kwargs + ) -> Dict: + """ + Send a chat completion request to DeepSeek API. + + Parameters + ---------- + messages : list of dict + Chat messages in OpenAI format: [{"role": "user", "content": "..."}] + model : str, optional + Model name (default: deepseek-chat) + temperature : float, optional + Sampling temperature 0-2 (default: 0.7) + max_tokens : int, optional + Max tokens to generate (default: 1024) + top_p : float, optional + Nucleus sampling (default: 1.0) + + Returns + ------- + dict + DeepSeek API response or error object. + """ + # Validate messages + err = self._validate_messages(messages) + if err is not None: + return { + "error": err, + "status": LLM_API_STATUS_ERROR, + } + + # Build and send request + payload = self._build_deepseek_request( + messages=messages, + model=model, + temperature=temperature, + max_tokens=max_tokens, + top_p=top_p, + ) + + self.Pd(f"Chat request: {len(messages)} messages, model={payload['model']}") + return self._call_deepseek_api(payload) + + @BasePlugin.endpoint(method="POST") + def analyze_scan( + self, + scan_results: Dict[str, Any], + analysis_type: str = LLM_ANALYSIS_SECURITY_ASSESSMENT, + focus_areas: Optional[List[str]] = None, + model: Optional[str] = None, + temperature: Optional[float] = None, + max_tokens: Optional[int] = None, + **kwargs + ) -> Dict: + """ + Analyze RedMesh scan results using DeepSeek LLM. + + Parameters + ---------- + scan_results : dict + RedMesh scan output containing open_ports, service_info, web_tests_info. + analysis_type : str, optional + Type of analysis to perform: + - "security_assessment" (default): Overall security posture evaluation + - "vulnerability_summary": Prioritized list of findings with severity + - "remediation_plan": Actionable steps to fix identified issues + focus_areas : list of str, optional + Specific areas to focus on: ["web", "network", "databases", "authentication"] + model : str, optional + Model name override. + temperature : float, optional + Sampling temperature override. + max_tokens : int, optional + Max tokens override. + + Returns + ------- + dict + LLM analysis response or error object. 
+ """ + # Validate scan_results + if not isinstance(scan_results, dict): + return { + "error": "`scan_results` must be a dict containing scan data.", + "status": LLM_API_STATUS_ERROR, + } + + # Validate analysis_type + valid_types = [ + LLM_ANALYSIS_SECURITY_ASSESSMENT, + LLM_ANALYSIS_VULNERABILITY_SUMMARY, + LLM_ANALYSIS_REMEDIATION_PLAN, + ] + if analysis_type not in valid_types: + return { + "error": f"Invalid analysis_type '{analysis_type}'. Must be one of: {valid_types}", + "status": LLM_API_STATUS_ERROR, + } + + # Get system prompt for analysis type + system_prompt = ANALYSIS_PROMPTS.get(analysis_type, ANALYSIS_PROMPTS[LLM_ANALYSIS_SECURITY_ASSESSMENT]) + + # Add focus areas if provided + if focus_areas: + focus_str = ", ".join(focus_areas) + system_prompt += f"\n\nFocus your analysis on these areas: {focus_str}" + + # Format scan results for LLM + try: + scan_json = json.dumps(scan_results, indent=2, default=str) + except Exception as e: + return { + "error": f"Failed to serialize scan_results: {e}", + "status": LLM_API_STATUS_ERROR, + } + + # Build messages + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": f"Analyze the following scan results:\n\n```json\n{scan_json}\n```"}, + ] + + # Build and send request + # Use higher max_tokens for analysis by default + effective_max_tokens = max_tokens if max_tokens is not None else 2048 + + payload = self._build_deepseek_request( + messages=messages, + model=model, + temperature=temperature, + max_tokens=effective_max_tokens, + ) + + self.Pd(f"Analyze scan request: type={analysis_type}, focus={focus_areas}") + response = self._call_deepseek_api(payload) + + # Extract only what we need from the response + if "error" not in response: + # Get the analysis content from DeepSeek response + content = None + choices = response.get("choices", []) + if choices: + content = choices[0].get("message", {}).get("content", "") + + # Get token usage for cost tracking + usage = response.get("usage", {}) + + # Return clean, minimal structure + return { + "analysis_type": analysis_type, + "focus_areas": focus_areas, + "model": response.get("model"), + "content": content, + "usage": { + "prompt_tokens": usage.get("prompt_tokens"), + "completion_tokens": usage.get("completion_tokens"), + "total_tokens": usage.get("total_tokens"), + }, + "scan_summary": { + "open_ports": len(scan_results.get("open_ports", [])), + "has_service_info": "service_info" in scan_results, + "has_web_tests": "web_tests_info" in scan_results, + }, + "created_at": self.time(), + } + + return response + + """END API ENDPOINTS""" + + def process(self): + """Main plugin loop (minimal for this API-only plugin).""" + super(RedmeshLlmAgentApiPlugin, self).process() + return From a5511ceeb8768a597f1c7bcbeac389ce6c47a547 Mon Sep 17 00:00:00 2001 From: toderian Date: Sun, 1 Feb 2026 17:20:51 +0200 Subject: [PATCH 2/4] fix: move llm methods to a mixin --- .../cybersec/red_mesh/llm_agent_mixin.py | 334 ++++++++++++++++++ .../cybersec/red_mesh/pentester_api_01.py | 297 +--------------- 2 files changed, 341 insertions(+), 290 deletions(-) create mode 100644 extensions/business/cybersec/red_mesh/llm_agent_mixin.py diff --git a/extensions/business/cybersec/red_mesh/llm_agent_mixin.py b/extensions/business/cybersec/red_mesh/llm_agent_mixin.py new file mode 100644 index 00000000..21e7e1e1 --- /dev/null +++ b/extensions/business/cybersec/red_mesh/llm_agent_mixin.py @@ -0,0 +1,334 @@ +""" +LLM Agent API Mixin for RedMesh Pentester. 
+ +This mixin provides LLM integration methods for analyzing scan results +via the RedMesh LLM Agent API (DeepSeek). + +Usage: + class PentesterApi01Plugin(_LlmAgentMixin, BasePlugin): + ... +""" + +import requests +from typing import Optional + + +class _LlmAgentMixin(object): + """ + Mixin providing LLM Agent API integration for RedMesh plugins. + + This mixin expects the host class to have the following config attributes: + - cfg_llm_agent_api_enabled: bool + - cfg_llm_agent_api_host: str + - cfg_llm_agent_api_port: int + - cfg_llm_agent_api_timeout: int + - cfg_llm_auto_analysis_type: str + + And the following methods/attributes: + - self.r1fs: R1FS instance + - self.P(): logging method + - self.Pd(): debug logging method + - self._get_aggregated_report(): report aggregation method + """ + + def __init__(self): + super(_LlmAgentMixin, self).__init__() + return + + def _get_llm_agent_api_url(self, endpoint: str) -> str: + """ + Build URL for LLM Agent API endpoint. + + Parameters + ---------- + endpoint : str + API endpoint path (e.g., "/chat", "/analyze_scan"). + + Returns + ------- + str + Full URL to the endpoint. + """ + host = self.cfg_llm_agent_api_host + port = self.cfg_llm_agent_api_port + endpoint = endpoint.lstrip("/") + return f"http://{host}:{port}/{endpoint}" + + def _call_llm_agent_api( + self, + endpoint: str, + method: str = "POST", + payload: dict = None, + timeout: int = None + ) -> dict: + """ + Make HTTP request to the LLM Agent API. + + Parameters + ---------- + endpoint : str + API endpoint to call (e.g., "/analyze_scan", "/health"). + method : str, optional + HTTP method (default: "POST"). + payload : dict, optional + JSON payload for POST requests. + timeout : int, optional + Request timeout in seconds. + + Returns + ------- + dict + API response or error object. + """ + if not self.cfg_llm_agent_api_enabled: + return {"error": "LLM Agent API is not enabled", "status": "disabled"} + + if not self.cfg_llm_agent_api_port: + return {"error": "LLM Agent API port not configured", "status": "config_error"} + + url = self._get_llm_agent_api_url(endpoint) + timeout = timeout or self.cfg_llm_agent_api_timeout + + try: + self.Pd(f"Calling LLM Agent API: {method} {url}") + + if method.upper() == "GET": + response = requests.get(url, timeout=timeout) + else: + response = requests.post( + url, + json=payload or {}, + headers={"Content-Type": "application/json"}, + timeout=timeout + ) + + if response.status_code != 200: + return { + "error": f"LLM Agent API returned status {response.status_code}", + "status": "api_error", + "details": response.text + } + + # Unwrap response if FastAPI wrapped it (extract 'result' from envelope) + response_data = response.json() + if isinstance(response_data, dict) and "result" in response_data: + return response_data["result"] + return response_data + + except requests.exceptions.ConnectionError: + self.P(f"LLM Agent API not reachable at {url}", color='y') + return {"error": "LLM Agent API not reachable", "status": "connection_error"} + except requests.exceptions.Timeout: + self.P(f"LLM Agent API request timed out", color='y') + return {"error": "LLM Agent API request timed out", "status": "timeout"} + except Exception as e: + self.P(f"Error calling LLM Agent API: {e}", color='r') + return {"error": str(e), "status": "error"} + + def _auto_analyze_report(self, job_id: str, report: dict, target: str) -> Optional[dict]: + """ + Automatically analyze a completed scan report using LLM Agent API. 
+ + Parameters + ---------- + job_id : str + Identifier of the completed job. + report : dict + Aggregated scan report to analyze. + target : str + Target hostname/IP that was scanned. + + Returns + ------- + dict or None + LLM analysis result or None if disabled/failed. + """ + if not self.cfg_llm_agent_api_enabled: + self.Pd("LLM auto-analysis skipped (not enabled)") + return None + + self.P(f"Running LLM auto-analysis for job {job_id}, target {target}...") + + analysis_result = self._call_llm_agent_api( + endpoint="/analyze_scan", + method="POST", + payload={ + "scan_results": report, + "analysis_type": self.cfg_llm_auto_analysis_type, + "focus_areas": None, + } + ) + + if "error" in analysis_result: + self.P(f"LLM auto-analysis failed for job {job_id}: {analysis_result.get('error')}", color='y') + else: + self.P(f"LLM auto-analysis completed for job {job_id}") + + return analysis_result + + def _collect_aggregated_report(self, workers: dict) -> dict: + """ + Collect and aggregate reports from all workers. + + Parameters + ---------- + workers : dict + Worker entries from job_specs containing report_cid or result. + + Returns + ------- + dict + Aggregated report combining all worker data. + """ + all_reports = {} + + for addr, worker_entry in workers.items(): + report = None + report_cid = worker_entry.get("report_cid") + + # Try to fetch from R1FS first + if report_cid: + try: + report = self.r1fs.get_json(report_cid) + self.Pd(f"Fetched report from R1FS for worker {addr}: CID {report_cid}") + except Exception as e: + self.P(f"Failed to fetch report from R1FS for {addr}: {e}", color='y') + + # Fallback to direct result + if not report: + report = worker_entry.get("result") + + if report: + all_reports[addr] = report + + if not all_reports: + self.P("No reports found to aggregate", color='y') + return {} + + # Aggregate all reports (method from host class) + aggregated = self._get_aggregated_report(all_reports) + return aggregated + + def _run_aggregated_llm_analysis( + self, + job_id: str, + job_specs: dict, + workers: dict, + pass_nr: int = None + ) -> Optional[str]: + """ + Run LLM analysis on aggregated report from all workers. + + Called by the launcher node after all workers complete. + + Parameters + ---------- + job_id : str + Identifier of the job. + job_specs : dict + Job specification (will be updated with analysis CID). + workers : dict + Worker entries containing report data. + pass_nr : int, optional + Pass number for continuous monitoring jobs. + + Returns + ------- + str or None + Analysis CID if successful, None otherwise. 
+ """ + target = job_specs.get("target", "unknown") + run_mode = job_specs.get("run_mode", "SINGLEPASS") + pass_info = f" (pass {pass_nr})" if pass_nr else "" + self.P(f"Running aggregated LLM analysis for job {job_id}{pass_info}, target {target}...") + + # Collect and aggregate reports from all workers + aggregated_report = self._collect_aggregated_report(workers) + + if not aggregated_report: + self.P(f"No data to analyze for job {job_id}", color='y') + return None + + # Add job metadata to report for context + aggregated_report["_job_metadata"] = { + "job_id": job_id, + "target": target, + "num_workers": len(workers), + "worker_addresses": list(workers.keys()), + "start_port": job_specs.get("start_port"), + "end_port": job_specs.get("end_port"), + "enabled_features": job_specs.get("enabled_features", []), + "run_mode": run_mode, + "pass_nr": pass_nr, + } + + # Call LLM analysis + llm_analysis = self._auto_analyze_report(job_id, aggregated_report, target) + + if not llm_analysis or "error" in llm_analysis: + self.P( + f"LLM analysis failed for job {job_id}: {llm_analysis.get('error') if llm_analysis else 'No response'}", + color='y' + ) + return None + + # Save analysis to R1FS + try: + analysis_cid = self.r1fs.add_json(llm_analysis, show_logs=False) + if analysis_cid: + # Always store in pass_history for consistency (both SINGLEPASS and CONTINUOUS) + pass_history = job_specs.get("pass_history", []) + for entry in pass_history: + if entry.get("pass_nr") == pass_nr: + entry["llm_analysis_cid"] = analysis_cid + break + self.P(f"LLM analysis for pass {pass_nr} saved, CID: {analysis_cid}") + return analysis_cid + else: + self.P(f"Failed to save LLM analysis to R1FS for job {job_id}", color='y') + return None + except Exception as e: + self.P(f"Error saving LLM analysis to R1FS: {e}", color='r') + return None + + def _get_llm_health_status(self) -> dict: + """ + Check health of the LLM Agent API connection. + + Returns + ------- + dict + Health status of the LLM Agent API. 
+ """ + if not self.cfg_llm_agent_api_enabled: + return { + "enabled": False, + "status": "disabled", + "message": "LLM Agent API integration is disabled", + } + + if not self.cfg_llm_agent_api_port: + return { + "enabled": True, + "status": "config_error", + "message": "LLM Agent API port not configured", + } + + result = self._call_llm_agent_api(endpoint="/health", method="GET", timeout=5) + + if "error" in result: + return { + "enabled": True, + "status": result.get("status", "error"), + "message": result.get("error"), + "host": self.cfg_llm_agent_api_host, + "port": self.cfg_llm_agent_api_port, + } + + return { + "enabled": True, + "status": "ok", + "host": self.cfg_llm_agent_api_host, + "port": self.cfg_llm_agent_api_port, + "llm_agent_health": result, + } diff --git a/extensions/business/cybersec/red_mesh/pentester_api_01.py b/extensions/business/cybersec/red_mesh/pentester_api_01.py index da4d6dab..c0fd74c0 100644 --- a/extensions/business/cybersec/red_mesh/pentester_api_01.py +++ b/extensions/business/cybersec/red_mesh/pentester_api_01.py @@ -31,10 +31,10 @@ """ import random -import requests from naeural_core.business.default.web_app.fast_api_web_app import FastApiWebAppPlugin as BasePlugin from .redmesh_utils import PentestLocalWorker # Import PentestJob from separate module +from .llm_agent_mixin import _LlmAgentMixin from .constants import ( FEATURE_CATALOG, LLM_ANALYSIS_SECURITY_ASSESSMENT, @@ -46,6 +46,7 @@ _CONFIG = { **BasePlugin.CONFIG, + "TUNNEL_ENGINE_ENABLED": False, 'PORT': None, @@ -55,9 +56,9 @@ "CHECK_JOBS_EACH" : 5, "REDMESH_VERBOSE" : 10, # Verbosity level for debug messages (0 = off, 1+ = debug) - + "NR_LOCAL_WORKERS" : 8, - + "WARMUP_DELAY" : 30, # Defines how ports are split across local workers. @@ -82,11 +83,11 @@ "LLM_AUTO_ANALYSIS_TYPE": "security_assessment", # Default analysis type 'VALIDATION_RULES': { - **BasePlugin.CONFIG['VALIDATION_RULES'], + **BasePlugin.CONFIG['VALIDATION_RULES'], }, } -class PentesterApi01Plugin(BasePlugin): +class PentesterApi01Plugin(_LlmAgentMixin, BasePlugin): """ RedMesh API plugin for orchestrating decentralized pentest jobs. @@ -197,259 +198,6 @@ def Pd(self, s, *args, score=-1, **kwargs): return - """LLM AGENT API CLIENT METHODS""" - - def _get_llm_agent_api_url(self, endpoint: str) -> str: - """ - Build URL for LLM Agent API endpoint. - - Parameters - ---------- - endpoint : str - API endpoint path (e.g., "/chat", "/analyze_scan"). - - Returns - ------- - str - Full URL to the endpoint. - """ - host = self.cfg_llm_agent_api_host - port = self.cfg_llm_agent_api_port - endpoint = endpoint.lstrip("/") - return f"http://{host}:{port}/{endpoint}" - - def _call_llm_agent_api( - self, - endpoint: str, - method: str = "POST", - payload: dict = None, - timeout: int = None - ) -> dict: - """ - Make HTTP request to the LLM Agent API. - - Parameters - ---------- - endpoint : str - API endpoint to call (e.g., "/analyze_scan", "/health"). - method : str, optional - HTTP method (default: "POST"). - payload : dict, optional - JSON payload for POST requests. - timeout : int, optional - Request timeout in seconds. - - Returns - ------- - dict - API response or error object. 
- """ - if not self.cfg_llm_agent_api_enabled: - return {"error": "LLM Agent API is not enabled", "status": "disabled"} - - if not self.cfg_llm_agent_api_port: - return {"error": "LLM Agent API port not configured", "status": "config_error"} - - url = self._get_llm_agent_api_url(endpoint) - timeout = timeout or self.cfg_llm_agent_api_timeout - - try: - self.Pd(f"Calling LLM Agent API: {method} {url}") - - if method.upper() == "GET": - response = requests.get(url, timeout=timeout) - else: - response = requests.post( - url, - json=payload or {}, - headers={"Content-Type": "application/json"}, - timeout=timeout - ) - - if response.status_code != 200: - return { - "error": f"LLM Agent API returned status {response.status_code}", - "status": "api_error", - "details": response.text - } - - # Unwrap response if FastAPI wrapped it (extract 'result' from envelope) - response_data = response.json() - if isinstance(response_data, dict) and "result" in response_data: - return response_data["result"] - return response_data - - except requests.exceptions.ConnectionError: - self.P(f"LLM Agent API not reachable at {url}", color='y') - return {"error": "LLM Agent API not reachable", "status": "connection_error"} - except requests.exceptions.Timeout: - self.P(f"LLM Agent API request timed out", color='y') - return {"error": "LLM Agent API request timed out", "status": "timeout"} - except Exception as e: - self.P(f"Error calling LLM Agent API: {e}", color='r') - return {"error": str(e), "status": "error"} - - def _auto_analyze_report(self, job_id: str, report: dict, target: str) -> dict: - """ - Automatically analyze a completed scan report using LLM Agent API. - - Parameters - ---------- - job_id : str - Identifier of the completed job. - report : dict - Aggregated scan report to analyze. - target : str - Target hostname/IP that was scanned. - - Returns - ------- - dict - LLM analysis result or error object. - """ - if not self.cfg_llm_agent_api_enabled: - self.Pd("LLM auto-analysis skipped (not enabled)") - return None - - self.P(f"Running LLM auto-analysis for job {job_id}, target {target}...") - - analysis_result = self._call_llm_agent_api( - endpoint="/analyze_scan", - method="POST", - payload={ - "scan_results": report, - "analysis_type": self.cfg_llm_auto_analysis_type, - "focus_areas": None, - } - ) - - if "error" in analysis_result: - self.P(f"LLM auto-analysis failed for job {job_id}: {analysis_result.get('error')}", color='y') - else: - self.P(f"LLM auto-analysis completed for job {job_id}") - - return analysis_result - - def _collect_aggregated_report(self, workers: dict) -> dict: - """ - Collect and aggregate reports from all workers. - - Parameters - ---------- - workers : dict - Worker entries from job_specs containing report_cid or result. - - Returns - ------- - dict - Aggregated report combining all worker data. 
- """ - all_reports = {} - - for addr, worker_entry in workers.items(): - report = None - report_cid = worker_entry.get("report_cid") - - # Try to fetch from R1FS first - if report_cid: - try: - report = self.r1fs.get_json(report_cid) - self.Pd(f"Fetched report from R1FS for worker {addr}: CID {report_cid}") - except Exception as e: - self.P(f"Failed to fetch report from R1FS for {addr}: {e}", color='y') - - # Fallback to direct result - if not report: - report = worker_entry.get("result") - - if report: - all_reports[addr] = report - - if not all_reports: - self.P("No reports found to aggregate", color='y') - return {} - - # Aggregate all reports - aggregated = self._get_aggregated_report(all_reports) - return aggregated - - def _run_aggregated_llm_analysis(self, job_id: str, job_specs: dict, workers: dict, pass_nr: int = None): - """ - Run LLM analysis on aggregated report from all workers. - - Called by the launcher node after all workers complete. - - Parameters - ---------- - job_id : str - Identifier of the job. - job_specs : dict - Job specification (will be updated with analysis CID). - workers : dict - Worker entries containing report data. - pass_nr : int, optional - Pass number for continuous monitoring jobs. If provided, analysis CID - is stored in pass_history entry instead of job-level. - - Returns - ------- - str or None - Analysis CID if successful, None otherwise. - """ - target = job_specs.get("target", "unknown") - run_mode = job_specs.get("run_mode", "SINGLEPASS") - pass_info = f" (pass {pass_nr})" if pass_nr else "" - self.P(f"Running aggregated LLM analysis for job {job_id}{pass_info}, target {target}...") - - # Collect and aggregate reports from all workers - aggregated_report = self._collect_aggregated_report(workers) - - if not aggregated_report: - self.P(f"No data to analyze for job {job_id}", color='y') - return None - - # Add job metadata to report for context - aggregated_report["_job_metadata"] = { - "job_id": job_id, - "target": target, - "num_workers": len(workers), - "worker_addresses": list(workers.keys()), - "start_port": job_specs.get("start_port"), - "end_port": job_specs.get("end_port"), - "enabled_features": job_specs.get("enabled_features", []), - "run_mode": run_mode, - "pass_nr": pass_nr, - } - - # Call LLM analysis - llm_analysis = self._auto_analyze_report(job_id, aggregated_report, target) - - if not llm_analysis or "error" in llm_analysis: - self.P(f"LLM analysis failed for job {job_id}: {llm_analysis.get('error') if llm_analysis else 'No response'}", color='y') - return None - - # Save analysis to R1FS - try: - analysis_cid = self.r1fs.add_json(llm_analysis, show_logs=False) - if analysis_cid: - # Always store in pass_history for consistency (both SINGLEPASS and CONTINUOUS) - pass_history = job_specs.get("pass_history", []) - for entry in pass_history: - if entry.get("pass_nr") == pass_nr: - entry["llm_analysis_cid"] = analysis_cid - break - self.P(f"LLM analysis for pass {pass_nr} saved, CID: {analysis_cid}") - return analysis_cid - else: - self.P(f"Failed to save LLM analysis to R1FS for job {job_id}", color='y') - return None - except Exception as e: - self.P(f"Error saving LLM analysis to R1FS: {e}", color='r') - return None - - """END LLM AGENT API CLIENT METHODS""" - - def __post_init(self): """ Perform warmup: reconcile existing jobs in CStore, migrate legacy keys, @@ -1988,38 +1736,7 @@ def llm_health(self): dict Health status of the LLM Agent API. 
""" - if not self.cfg_llm_agent_api_enabled: - return { - "enabled": False, - "status": "disabled", - "message": "LLM Agent API is not enabled in configuration" - } - - if not self.cfg_llm_agent_api_port: - return { - "enabled": True, - "status": "config_error", - "message": "LLM Agent API port not configured" - } - - result = self._call_llm_agent_api(endpoint="/health", method="GET", timeout=5) - - if "error" in result: - return { - "enabled": True, - "status": result.get("status", "error"), - "message": result.get("error"), - "host": self.cfg_llm_agent_api_host, - "port": self.cfg_llm_agent_api_port, - } - - return { - "enabled": True, - "status": "ok", - "host": self.cfg_llm_agent_api_host, - "port": self.cfg_llm_agent_api_port, - "llm_agent_health": result, - } + return self._get_llm_health_status() def process(self): From ba7d58b6a01fd1d84376dc47aae2fab9ad3059b0 Mon Sep 17 00:00:00 2001 From: toderian Date: Sun, 1 Feb 2026 21:23:16 +0200 Subject: [PATCH 3/4] fix: rename redmesh llm agent mixin --- extensions/business/cybersec/red_mesh/pentester_api_01.py | 4 ++-- .../{llm_agent_mixin.py => redmesh_llm_agent_mixin.py} | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) rename extensions/business/cybersec/red_mesh/{llm_agent_mixin.py => redmesh_llm_agent_mixin.py} (98%) diff --git a/extensions/business/cybersec/red_mesh/pentester_api_01.py b/extensions/business/cybersec/red_mesh/pentester_api_01.py index c0fd74c0..dbba0e31 100644 --- a/extensions/business/cybersec/red_mesh/pentester_api_01.py +++ b/extensions/business/cybersec/red_mesh/pentester_api_01.py @@ -34,7 +34,7 @@ from naeural_core.business.default.web_app.fast_api_web_app import FastApiWebAppPlugin as BasePlugin from .redmesh_utils import PentestLocalWorker # Import PentestJob from separate module -from .llm_agent_mixin import _LlmAgentMixin +from .redmesh_llm_agent_mixin import _RedMeshLlmAgentMixin from .constants import ( FEATURE_CATALOG, LLM_ANALYSIS_SECURITY_ASSESSMENT, @@ -87,7 +87,7 @@ }, } -class PentesterApi01Plugin(_LlmAgentMixin, BasePlugin): +class PentesterApi01Plugin(BasePlugin, _RedMeshLlmAgentMixin): """ RedMesh API plugin for orchestrating decentralized pentest jobs. diff --git a/extensions/business/cybersec/red_mesh/llm_agent_mixin.py b/extensions/business/cybersec/red_mesh/redmesh_llm_agent_mixin.py similarity index 98% rename from extensions/business/cybersec/red_mesh/llm_agent_mixin.py rename to extensions/business/cybersec/red_mesh/redmesh_llm_agent_mixin.py index 21e7e1e1..a71fb17a 100644 --- a/extensions/business/cybersec/red_mesh/llm_agent_mixin.py +++ b/extensions/business/cybersec/red_mesh/redmesh_llm_agent_mixin.py @@ -13,7 +13,7 @@ class PentesterApi01Plugin(_LlmAgentMixin, BasePlugin): from typing import Optional -class _LlmAgentMixin(object): +class _RedMeshLlmAgentMixin(object): """ Mixin providing LLM Agent API integration for RedMesh plugins. 
@@ -31,8 +31,8 @@ class _LlmAgentMixin(object): - self._get_aggregated_report(): report aggregation method """ - def __init__(self): - super(_LlmAgentMixin, self).__init__() + def __init__(self, **kwargs): + super(_RedMeshLlmAgentMixin, self).__init__(**kwargs) return def _get_llm_agent_api_url(self, endpoint: str) -> str: From ff28568ffbc722ff49d87d3681b331714b5666f6 Mon Sep 17 00:00:00 2001 From: toderian Date: Wed, 4 Feb 2026 15:37:26 +0200 Subject: [PATCH 4/4] chore: inc version --- ver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ver.py b/ver.py index 80d8c788..31928058 100644 --- a/ver.py +++ b/ver.py @@ -1 +1 @@ -__VER__ = '2.10.32' +__VER__ = '2.10.33'
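With the mixin split out and the version bumped, the end-to-end flow is: the launcher auto-analyzes each completed pass through the agent's `/analyze_scan` route, and callers reach the results through the new `analyze_job`, `get_analysis` and `llm_health` endpoints. A minimal client sketch, assuming each endpoint is exposed under its method name (as `/analyze_scan` and `/health` are for the agent) and that bare `@endpoint` methods default to GET; host, port and job id are placeholders:

```python
import requests

# Placeholders: wherever the PENTESTER_API_01 instance is actually served.
BASE = "http://127.0.0.1:8080"
JOB_ID = "job-1234"


def unwrap(resp):
  # The node wrapper may envelope payloads under "result"
  # (see the unwrapping done in _call_llm_agent_api).
  data = resp.json()
  return data.get("result", data) if isinstance(data, dict) else data


# 1. Check that the launcher can reach its LLM Agent API.
health = unwrap(requests.get(f"{BASE}/llm_health", timeout=10))

# 2. Manually trigger an analysis of a finished job (analyze_job is POST).
analysis = unwrap(requests.post(
  f"{BASE}/analyze_job",
  json={
    "job_id": JOB_ID,
    "analysis_type": "remediation_plan",
    "focus_areas": ["web", "authentication"],
  },
  timeout=180,
))

# 3. Fetch the stored analysis for the latest completed pass of the job.
stored = unwrap(requests.get(f"{BASE}/get_analysis",
                             params={"job_id": JOB_ID}, timeout=10))

print(health.get("status"), analysis.get("analysis_cid"), stored.get("pass_nr"))
```

On success, `analyze_job` returns the analysis together with an `analysis_cid`, and `get_analysis` resolves that CID from R1FS for the requested (or latest) pass.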