diff --git a/.env.example b/.env.example index ee59ea6..40a019b 100644 --- a/.env.example +++ b/.env.example @@ -1,2 +1,16 @@ +# Required Authentication Token OPENAI_API_KEY=your-key-here -GITHUB_TOKEN=your-token-here \ No newline at end of file +GITHUB_TOKEN=your-token-here +USER_NAME=your-github-username +DISCORD_WEBHOOK_URL=your-discord-webhook +SLACK_WEBHOOK_URL=your-slack-webhook +SNYK_TOKEN=your_token_here + +# File Extensions to collect from Github +GITHUB_EXTENSIONS=.js,.mjs,.jsx,.ts + +# Semgrep Rule (default : JavaScript) +SEMGREP_RULE=p/javascript + +# Server URL +LOG_API_URL=https://autofic-core-kmw6.onrender.com/ diff --git a/.github/workflows/autofic_javascript.yml b/.github/workflows/autofic_javascript.yml new file mode 100644 index 0000000..b6de162 --- /dev/null +++ b/.github/workflows/autofic_javascript.yml @@ -0,0 +1,52 @@ +name: Autofic SAST for Selected JavaScript Repos + +on: + schedule: + - cron: '00 21 * * 0' # 매주 월요일 오전 6시 + workflow_dispatch: + +jobs: + find-and-run: + runs-on: ubuntu-latest + + steps: + - name: Checkout this repository + uses: actions/checkout@v4 + with: + persist-credentials: true + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + pip install --upgrade pip + pip install -r requirements.txt + pip install -e . 
+ + - name: Set Git config + run: | + git config --global user.email "github-actions@users.noreply.github.com" + git config --global user.name "github-actions" + + - name: Run ci_automation.py automatically + env: + GITHUB_TOKEN: ${{ secrets.GIT_TOKEN }} + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + USER_NAME: ${{ secrets.USER_NAME }} + DISCORD_WEBHOOK_URL: ${{ secrets.DISCORD_WEBHOOK_URL }} + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} + RESULT_SAVE_DIR: ${{ github.workspace }}/result, + LOG_API_URL: ${{secrets.LOG_API_URL}} + run: | + python src/autofic_core/ci_cd_auto/ci_automation.py + + - name: Trigger dashboard update in Dashboard repo + run: | + curl -X POST \ + -H "Accept: application/vnd.github+json" \ + -H "Authorization: Bearer ${{ secrets.GIT_TOKEN }}" \ + https://api.github.com/repos/Autofic/Dashboard/actions/workflows/update_dashboard.yml/dispatches \ + -d '{"ref":"main"}' diff --git a/.github/workflows/check_approval.yml b/.github/workflows/check_approval.yml new file mode 100644 index 0000000..647408e --- /dev/null +++ b/.github/workflows/check_approval.yml @@ -0,0 +1,29 @@ +name: Check PR Approval Status + +on: + schedule: + - cron: '00 21 * * *' # 매일 오전 6시 + workflow_dispatch: + +jobs: + check-approval: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install dependencies + run: pip install requests python-dotenv + + - name: Check PR Approval Status + env: + GITHUB_TOKEN: ${{ secrets.GIT_TOKEN }} + LOG_API_URL: ${{ secrets.LOG_API_URL }} + run: | + python src/autofic_core/log/check_approval.py diff --git a/.gitignore b/.gitignore index 0a19790..963745e 100644 --- a/.gitignore +++ b/.gitignore @@ -172,3 +172,9 @@ cython_debug/ # PyPI configuration file .pypirc + +# macOS +.DS_Store + +# backup files +log_backup.json \ No newline at end of file diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 
index 0000000..935eda1 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,2 @@ +include README.md LICENSE +recursive-include src/autofic_core *.json *.md *.yml \ No newline at end of file diff --git a/README.md b/README.md index bcdca3e..52db375 100644 --- a/README.md +++ b/README.md @@ -1,147 +1,178 @@ -# ⚙️ AutoFiC +# AutoFiC -**LLM을 활용한 취약한 소스코드 수정 솔루션** +> **Remediate vulnerable source code at scale using LLMs and automation.** ---- +[![License](https://img.shields.io/github/license/AutoFiC/autofic-core)](./LICENSE) +[![Python](https://img.shields.io/badge/python-3.8+-blue.svg)](https://www.python.org/) -## 🚀 개발 환경 세팅 -### 1. Python 설치 -- [Python 공식 다운로드](https://www.python.org/downloads/) -- **Python 3.8 이상** 설치 권장 -- 설치 시 "Add Python to PATH" 옵션 반드시 체크 +## 🚀 Overview -### 2. Git 설치 및 레포지토리 클론 -- [Git 다운로드](https://git-scm.com/downloads) -- 터미널/명령 프롬프트/PowerShell/터미널 앱에서: - ``` - git clone https://github.com/AutoFiC/autofic-core.git - cd autofic-core - ``` +**AutoFiC** is the project, providing a CLI-based automation pipeline for detecting, analyzing, and remediating source code vulnerabilities using the power of LLMs and static analysis tools. -### 3. 가상환경(venv) 생성 및 활성화 +The project is designed for **automated security auditing, bulk code scanning, and mass vulnerability remediation** across multiple repositories, with seamless integration into modern CI/CD workflows. -- Windows (CMD) - ``` - python -m venv venv - venv\Scripts\activate - ``` -- Windows (PowerShell) - ``` - python -m venv venv - .\venv\Scripts\activate - ``` +## ✨ Features -- Windows (Git Bash) - ``` - python -m venv venv - source venv/Scripts/activate - ``` +- **Automated Vulnerability Detection** + Integrates with tools like **CodeQL, Semgrep, Snyk Code** to identify vulnerabilities in source code. -- macOS / Linux (터미널) - ``` - python3 -m venv venv - source venv/bin/activate - ``` +- **LLM-Powered Remediation** + Uses Large Language Models to suggest and patch vulnerabilities automatically. 
-> 가상환경이 활성화되면 프롬프트 앞에 `(venv)`가 표시됩니다. +- **Multi-Repository Support** + Bulk-clone and analyze many repositories with configurable filters (e.g., stars, language). + +- **CLI Tooling** + Command-line interface for easy integration into scripts and CI/CD pipelines. + +- **SARIF/JSON Reporting** + Outputs results in standardized formats for downstream processing or dashboards. + +- **Extensible and Modular** + Easily extend with new vulnerability scanners, languages, or custom rules. -### 4. pip 최신화 (권장) -``` -pip install --upgrade pip -``` -### 5. 필수 라이브러리 및 개발 모드 설치 +## 🏗️ Architecture + ``` -pip install -r requirements.txt -pip install -e . + +---------------------+ + | [GitHub Repos] | + +----------+----------+ + | + v + +---------------------+ + | Vulnerability Scan | (CodeQL / Semgrep / Snyk) + +----------+----------+ + | + SARIF/JSON v + +---------------------+ + | autofic-core | + | (Orchestrator) | + +----------+----------+ + | + +------------------+-------------------+ + | | + v v + +---------------------+ +---------------------+ + | LLM-based Patch |<-------------->| Patch Validator | + | (OpenAI, etc.) | | (Optional CI) | + +---------------------+ +---------------------+ + | + v + +---------------+ + | Auto PR to | + | GitHub Repo | + +---------------+ ``` +- **Vulnerability Scan** : Detect vulnerabilities with static analysis tools (CodeQL, Semgrep, Snyk). +- **autofic-core** : Parses findings, sends code to LLM, receives patch suggestions, applies fixes. +- **LLM-based Patch** : Uses large language models (e.g., OpenAI) to generate secure code patches. +- **Patch Validator (Optional)** : Runs CI/tests to validate patches. +- **Auto PR** : Automatically creates a pull request with the fix to the target repository. -### 6. 환경변수 파일 준비 -``` -cp .env.example .env -``` -> `.env` 파일에 본인의 GitHub 토큰, OpenAI API 키 등 필요한 값을 입력하세요. +## ⚡ Getting Started ---- +### 1. 
Prerequisites -## ⚡ 실행 방법 +- **Python 3.8+** +- [CodeQL CLI](https://codeql.github.com/docs/codeql-cli/) *(for CodeQL support)* +- [Semgrep CLI](https://semgrep.dev/docs/cli/) *(for Semgrep support)* +- [Snyk CLI](https://docs.snyk.io/snyk-cli/install-the-snyk-cli) *(optional)* +- GitHub Personal Access Token (if accessing private repos) -### 1. CLI 직접 실행 +### 2. Installation -``` -python -m autofic_core.cli --repo https://github.com/AutoFiC/autofic-core.git -``` +Clone the repo and install dependencies: -### 2. 명령어로 실행 (개발 모드 설치 후) +```bash +git clone https://github.com/AutoFiC/autofic-core.git +cd autofic-core +python -m venv .venv +source .venv/bin/activate # (Windows: .venv\Scripts\activate) +pip install --upgrade pip; pip install -r requirements.txt; pip install -e .; +```` -``` -autofic-core --repo https://github.com/AutoFiC/autofic-core.git -``` +### 3. Usage ---- +#### 🚦 CLI Example -## 🧪 테스트 방법 +```bash +python -m autofic_core.cli \ + --repo \ + --sast \ + --llm \ + --save-dir \ + --patch \ + --pr ``` -pytest tests/ -``` - -- 모든 테스트가 **passed** 되면 정상 - ---- - -## 📁 주요 파일 설명 - -| 파일/폴더 | 설명 | -|-----------------------|-------------------------------------------| -| src/autofic_core/ | 핵심 기능 Python 소스코드 | -| tests/ | 테스트 코드 | -| requirements.txt | 필수 라이브러리 목록 | -| pyproject.toml | 패키지/배포/엔트리포인트 설정 | -| .env.example | 환경변수 템플릿 (실제 값은 .env에 입력) | -| .gitignore | Git에 올리지 않을 파일/폴더 목록 | -| LICENSE | 오픈소스 라이선스(MIT) | -| README.md | 이 문서 | - ---- - -## 👥 협업 규칙 - -- **가상환경(venv)과 .env 파일은 Git에 올리지 마세요!** -- 기능 추가/수정은 반드시 브랜치 생성 후 Pull Request로 병합 -- 코드 리뷰/테스트 통과 후 main 브랜치에 반영 - ---- - -## 📝 커밋 메시지 규칙 - -- 커밋 메시지는 아래 형식을 지켜주세요. - - `Add: ...` (새 기능) - - `Fix: ...` (버그 수정) - - `Update: ...` (기존 코드/문서/설정 변경) - - `Remove: ...` (삭제) - - `Refactor: ...` (구조 개선) - - `Docs: ...` (문서) - - `Test: ...` (테스트) - - `Chore: ...` (환경/설정) -- 예시: - - `Add: SAST 실행 기능 구현` - - `Fix: 파일 필터링 버그 수정` - - `Docs: README 업데이트` - ---- - -## 🌿 브랜치명 규칙 - -- 브랜치명은 아래 형식을 권장합니다. 
- - `feature/기능명` (새 기능) - - `bugfix/이슈번호-설명` (버그 수정) - - `docs/문서명` (문서) - - `test/설명` (테스트) -- 예시: - - `feature/github-api-integration` - - `bugfix/34-filter-extension-error` - - `docs/update-readme` + +- --repo : Target repository URL +- --sast : Vulnerability scanner to use (semgrep, codeql, etc.) +- --llm : Enable LLM-based remediation +- --save-dir : Directory to store scan results +- --patch : Apply suggested patches +- --pr : Automatically create a Pull Request with fixes + +#### 🔄 Typical Workflow +- Scan the target repository for vulnerabilities using static analysis. +- Remediate detected vulnerabilities with automated LLM-based patch suggestions. +- Generate reports and/or create a Pull Request with the security fixes. +- See python -m autofic_core.cli --help for the full list of options and usage details. + + +## 🧩 Configuration + +Configuration is done via CLI flags and/or `.env` files. + +* `GITHUB_TOKEN` - For accessing private repositories and creating pull requests. +* `OPENAI_API_KEY` - For LLM-powered patch suggestions. +* `USER_NAME` - Name or ID for audit trails or commit information. +* `DISCORD_WEBHOOK_URL` - (Optional) Discord webhook URL for notifications. +* `SLACK_WEBHOOK_URL` - (Optional) Slack webhook URL for notifications. + + +## 🤝 Contributing + +We welcome all contributions! + +1. Fork the repo and create your branch : `git checkout -b feature/your-feature` +2. Commit your changes : `git commit -am 'Add new feature'` +3. Push to the branch : `git push origin feature/your-feature` +4. Open a Pull Request + + +## 📄 License + +This project is licensed under the Apache 2.0 License - see the [LICENSE](https://github.com/AutoFiC/autofic-core/blob/dev/LICENSE) file for details. 
+ + +## 🙋 Contact + +* Issues/Feature Requests : [GitHub Issues](https://github.com/AutoFiC/autofic-core/issues) +* Main Team : [AutoFiC Organization](https://github.com/AutoFiC) +* Main Page : [AutoFiC Official](https://autofic.github.io) + + +## 👨‍💻 Developers + +**👩🏻‍💻 Development Team** +- Minchae Kim ([@minxxcozy](https://github.com/minxxcozy)) +- Eunsol Kim ([@eunsol1530](https://github.com/eunsol1530)) +- Jeongmin Oh ([@soonnae](https://github.com/soonnae)) +- Inyeong Jang ([@inyeongjang](https://github.com/inyeongjang)) + +**🔬 Research Team** +- Seonju Park ([@seoonju](https://github.com/seoonju)) +- Hongseo Jang ([@pxxguin](https://github.com/pxxguin)) +- Yunji Jeong ([@jungyun404](https://github.com/jungyun404)) +- Yunjeong Choe ([@yjchoe818](https://github.com/yjchoe818)) + +**👨🏻‍🏫 Mentor** +- Suhyun Park ([@lovehyun](https://github.com/lovehyun)) + +**👨🏻‍🏫 Project Leader** +- Changhyun Lee ([@eeche](https://github.com/eeche)) diff --git a/pyproject.toml b/pyproject.toml index f237b54..5afa1e7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,20 +6,40 @@ build-backend = "setuptools.build_meta" name = "autofic-core" version = "0.1.0" description = "A solution for remediating vulnerable source code using LLMs." 
-authors = [{name = "inyeongjang", email = "inyeongjang@gmail.com"}] +authors = [ + {name = "autofic", email = "autofic.whs@gmail.com"} +] readme = "README.md" license = {file = "LICENSE"} requires-python = ">=3.8" dependencies = [ - "click>=8.1.3", - "requests>=2.28.1", - "python-dotenv>=0.21.0", - "rich>=13.3.1" + "click>=8.1.3,<9.0.0", + "requests>=2.28.1,<3.0.0", + "python-dotenv>=0.21.0,<1.0.0", + "rich>=13.3.1,<14.0.0", + "openai>=1.0.0,<2.0.0", + "pyfiglet>=1.0.3,<2.0.0" +] +classifiers = [ + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "Topic :: Software Development :: Build Tools", ] +keywords = ["llm", "security", "autofix", "static analysis", "code remediation"] [project.scripts] autofic-core = "autofic_core.cli:main" [tool.setuptools] -packages = ["autofic_core"] package-dir = {"" = "src"} + +[tool.setuptools.packages.find] +where = ["src"] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 529473d..635c34e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,31 @@ -click -pytest -requests -python-dotenv \ No newline at end of file +certifi==2025.4.26 +cffi==1.17.1 +charset-normalizer==3.4.2 +click==8.1.8 +cryptography==45.0.2 +Deprecated==1.2.18 +idna==3.10 +iniconfig==2.1.0 +markdown-it-py==3.0.0 +mdurl==0.1.2 +packaging==25.0 +pluggy==1.6.0 +pycparser==2.22 +PyGithub==2.6.1 +Pygments==2.19.1 +PyJWT==2.10.1 +PyNaCl==1.5.0 +pytest==8.3.5 +python-dotenv==1.1.0 +requests==2.32.3 +rich==13.5.2 +typing_extensions==4.13.2 +urllib3==2.4.0 +wrapt==1.17.2 +semgrep==1.122.0 +pydantic==2.11.6 +pydantic_core==2.33.2 +openai==1.88.0 
class FlaskProcedure:
    """
    Manage the JSON log of pull requests and repository statuses.

    The log is one JSON file shaped like ``{"prs": [...], "repos": [...]}``.
    Keeping file access in this class separates data management from the
    Flask routes, which makes the logic easier to test.

    Attributes:
        log_path (str): Path to the log.json file.
    """

    # Fields that must all match for two repo entries to count as the same
    # record (see ``is_same_repo_entry``).
    _REPO_IDENTITY_KEYS = (
        "name", "owner", "repo_url",
        "vulnerabilities", "byClass",
        "sastTool", "rerun", "analysis",
    )

    def __init__(self, log_path):
        """Remember ``log_path`` and create an empty log if the file is missing."""
        self.log_path = log_path
        if not os.path.exists(self.log_path):
            self.reset_log()

    def load_log(self):
        """Return the parsed content of the log file."""
        with open(self.log_path, encoding='utf-8') as f:
            return json.load(f)

    def save_log(self, data):
        """
        Serialise ``data`` to the log file atomically.

        The JSON is written to a sibling temporary file first and then moved
        over the real log with ``os.replace``. If serialisation fails (for
        example ``data`` is not JSON-serialisable), the existing log is left
        untouched instead of being truncated.

        Raises:
            TypeError / ValueError: propagated from ``json.dump``.
            OSError: propagated from the file operations.
        """
        tmp_path = f"{self.log_path}.tmp"
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            os.replace(tmp_path, self.log_path)
        except BaseException:
            # Best-effort cleanup of the partial temp file; the original
            # error is what the caller needs to see.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise

    def reset_log(self):
        """Overwrite the log with an empty ``prs`` / ``repos`` structure."""
        self.save_log({"prs": [], "repos": []})

    def is_same_repo_entry(self, existing, new):
        """Return True when both entries agree on every identity field."""
        return all(
            existing.get(key) == new.get(key)
            for key in self._REPO_IDENTITY_KEYS
        )

    def add_pr(self, new_pr):
        """Append ``new_pr`` to the ``prs`` list, persist, and return it."""
        data = self.load_log()
        data.setdefault("prs", []).append(new_pr)
        self.save_log(data)
        return new_pr

    def add_repo_status(self, new_repo):
        """
        Store ``new_repo``, replacing any existing entry that is identical on
        all identity fields, then persist and return it.
        """
        data = self.load_log()
        repos = data.setdefault("repos", [])
        data["repos"] = [r for r in repos if not self.is_same_repo_entry(r, new_repo)]
        data["repos"].append(new_repo)
        self.save_log(data)
        return new_repo
def _json_object():
    """Return the request body if it is a JSON object (dict), else ``None``.

    ``silent=True`` makes a missing or malformed body come back as ``None``
    so every route can answer with the same explicit 400 response.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else None


def _bad_request():
    """Uniform 400 response for routes that require a JSON object body."""
    return jsonify({"error": "JSON object body is required"}), 400


@app.route('/log.json', methods=['GET'])
def get_log():
    """Serve the raw log file."""
    return send_file(flask_manager.log_path)


@app.route('/log.json', methods=['PUT'])
def put_log():
    """Replace the whole log with the JSON object sent by the client."""
    data = _json_object()
    if data is None:
        # Persisting a list/string/null here would break every later
        # add_pr / add_repo_status call, which expect a dict.
        return _bad_request()
    flask_manager.save_log(data)
    return jsonify({"status": "ok"})


@app.route('/reset_log', methods=['POST'])
def reset_log():
    """Reset the log to its empty structure."""
    flask_manager.reset_log()
    return jsonify({"status": "reset"})


@app.route('/add_pr', methods=['POST'])
def add_pr():
    """Append one pull-request record to the log."""
    new_pr = _json_object()
    if new_pr is None:
        return _bad_request()
    added_pr = flask_manager.add_pr(new_pr)
    return jsonify({"status": "added", "pr": added_pr})


@app.route('/add_repo_status', methods=['POST'])
def add_repo_status():
    """Add (or replace an identical) repository status record."""
    new_repo = _json_object()
    if new_repo is None:
        return _bad_request()
    added_repo = flask_manager.add_repo_status(new_repo)
    return jsonify({"status": "added", "repo": added_repo})


@app.route('/update_approval', methods=['POST'])
def update_approval():
    """
    Update ``approved`` / ``opened`` on every logged PR that matches both
    ``pr_number`` and ``repo_url`` from the request body.

    Returns 400 when the body is not a JSON object or lacks ``pr_number``,
    404 when no logged PR matches, otherwise a success payload.
    """
    data = _json_object()
    if data is None:
        return _bad_request()

    pr_number = data.get("pr_number")
    approved = data.get("approved")
    opened = data.get("opened")
    repo_url = data.get("repo_url")

    if pr_number is None:
        return jsonify({"error": "pr_number is required"}), 400

    log_data = flask_manager.load_log()
    updated = False

    for pr in log_data.get("prs", []):
        if pr.get("pr_number") == pr_number and pr.get("repo_url") == repo_url:
            pr["approved"] = approved
            pr["opened"] = opened
            updated = True

    if not updated:
        return jsonify({"error": "PR not found"}), 404

    flask_manager.save_log(log_data)
    return jsonify({"status": "updated", "pr_number": pr_number})


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
class AutoFiCApp:
    """
    Application object behind the CLI.

    It validates the option combination, runs ``AutoFiCPipeline`` and, when
    ``pr`` is set, drives the pull-request automation and logging steps.
    """

    def __init__(self, explain, repo, save_dir, sast, llm, llm_retry, patch, pr):
        """Store the CLI options and create the log helpers."""
        self.explain = explain
        self.repo = repo
        self.save_dir = save_dir
        self.sast = sast
        self.llm = llm
        self.llm_retry = llm_retry
        self.patch = patch
        self.pr = pr
        self.log_manager = LogManager()
        self.log_gen = LogGenerator()

    @staticmethod
    def _normalize_repo_url(repo_url):
        """
        Return ``repo_url`` without trailing slashes and without a trailing
        ``.git`` suffix.

        Only the suffix is removed: a ``.git`` inside the URL (for example a
        repository named ``user.github.io``) must stay intact.
        """
        url = repo_url.rstrip('/')
        if url.endswith('.git'):
            url = url[:-len('.git')]
        return url

    def run(self):
        """
        Validate options, run the pipeline and optionally the PR stage.

        Any failure is printed to the console and the process exits with
        status 1.
        """
        try:
            self.validate_options()
            if self.explain:
                print_help_message()
                return

            # --llm-retry implies that the LLM stage runs at all.
            llm_flag = self.llm or self.llm_retry
            pipeline = AutoFiCPipeline(
                repo_url=self.repo,
                save_dir=Path(self.save_dir),
                sast=self.sast,
                sast_tool=self.sast.lower() if self.sast else None,
                llm=llm_flag,
                llm_retry=self.llm_retry,
                patch=self.patch,
                pr=self.pr
            )
            pipeline.run()

            if self.pr:
                self.run_pr()

        except AutoficError as e:
            console.print(str(e), style="red")
            sys.exit(1)

        except Exception as e:
            console.print(f"[ UNEXPECTED ERROR ] {str(e)}", style="red")
            console.print(traceback.format_exc(), style="red")
            sys.exit(1)

    def validate_options(self):
        """
        Reject invalid option combinations.

        The stages form a chain (sast -> llm / llm-retry -> patch -> pr);
        each later stage requires the earlier one, and ``--llm`` and
        ``--llm-retry`` are mutually exclusive.
        """
        if not self.repo:
            raise NoRepositoryError()
        if not self.save_dir:
            raise NoSaveDirError()
        if self.llm and self.llm_retry:
            raise LLMRetryOptionError()
        if not self.sast and (self.llm or self.llm_retry):
            raise LLMWithoutSastError()
        if not (self.llm or self.llm_retry) and self.patch:
            raise PatchWithoutLLMError()
        if not self.patch and self.pr:
            raise PRWithoutPatchError()

    def run_pr(self):
        """
        Run the five PR automation steps and log the created pull request.

        Errors are printed with a traceback and re-raised so ``run`` can
        terminate the process.
        """
        try:
            print_divider("PR Automation Stage")

            pr_procedure = self.initialize_pr_procedure()

            console.print("[1] Initializing PR process & checking branches...\n", style="bold cyan")
            time.sleep(0.5)
            pr_procedure.post_init()
            pr_procedure.mv_workdir()
            pr_procedure.check_branch_exists()

            console.print("\n[2] Notifying webhooks...\n", style="bold cyan")
            time.sleep(0.5)
            self.notify_webhooks(pr_procedure)

            console.print("\n[3] Creating and pushing PR workflow YAML...\n", style="bold cyan")
            time.sleep(0.5)
            self.handle_pr_yml(pr_procedure)

            console.print("\n[4] Changing files for the pull request...\n", style="bold cyan")
            time.sleep(0.5)
            pr_procedure.change_files()

            console.print("\n[5] Updating branch and creating Pull Request...\n", style="bold cyan")
            time.sleep(0.5)
            pr_procedure.current_main_branch()
            pr_procedure.generate_pr()
            pr_number = pr_procedure.create_pr()

            console.print("\n[ SUCCESS ] Pull Request created successfully!\n", style="bold green")
            time.sleep(0.5)

            self.finalize_logging(pr_procedure, pr_number)

        except Exception as e:
            console.print(f"[ PR ERROR ] {e}", style="bold red")
            console.print(traceback.format_exc(), style="red")
            raise

    def initialize_pr_procedure(self):
        """
        Build the ``PRProcedure`` for this run.

        ``repo_name`` and ``upstream_owner`` are passed as the placeholder
        "UNKNOWN"; presumably ``PRProcedure.post_init`` resolves them
        (confirm against PRProcedure).
        """
        base_branch = 'main'
        save_dir = Path(self.save_dir).joinpath('repo')
        repo_url = self._normalize_repo_url(self.repo)
        json_path = str(Path(self.save_dir).joinpath("sast") / "before.json")
        token = os.getenv('GITHUB_TOKEN')
        user_name = os.getenv('USER_NAME')
        tool = self.sast.lower() if self.sast else None

        return PRProcedure(
            base_branch=base_branch,
            repo_name="UNKNOWN",
            upstream_owner="UNKNOWN",
            save_dir=save_dir,
            repo_url=repo_url,
            token=token,
            user_name=user_name,
            json_path=json_path,
            tool=tool
        )

    def notify_webhooks(self, pr_procedure):
        """Pass the Discord and Slack webhook URLs from the environment to ``EnvEncrypy``."""
        secret_discord = os.getenv('DISCORD_WEBHOOK_URL')
        secret_slack = os.getenv('SLACK_WEBHOOK_URL')

        user_name = pr_procedure.user_name
        repo_name = pr_procedure.repo_name
        token = pr_procedure.token

        EnvEncrypy(user_name, repo_name, token).webhook_secret_notifier('DISCORD_WEBHOOK_URL', secret_discord)
        EnvEncrypy(user_name, repo_name, token).webhook_secret_notifier('SLACK_WEBHOOK_URL', secret_slack)

    def handle_pr_yml(self, pr_procedure):
        """Create the PR workflow YAML and push it to the PR branch."""
        user_name = pr_procedure.user_name
        repo_name = pr_procedure.repo_name
        token = pr_procedure.token
        branch_name = pr_procedure.branch_name

        yml_handler = AboutYml()
        yml_handler.create_pr_yml()
        yml_handler.push_pr_yml(user_name, repo_name, token, branch_name)

    def finalize_logging(self, pr_procedure, pr_number):
        """
        Send the PR and repository records to the log manager.

        Nothing is logged when no PR number was produced or when the user
        owns the upstream repository (``user_name == upstream_owner``).
        """
        if not pr_number:
            return

        if pr_procedure.user_name == pr_procedure.upstream_owner:
            return

        tool = self.sast.lower() if self.sast else None
        repo_url = self._normalize_repo_url(self.repo)

        repo_data = self.log_gen.generate_repo_log(
            save_dir=Path(self.save_dir),
            name=pr_procedure.repo_name,
            owner=pr_procedure.upstream_owner,
            repo_url=repo_url,
            sastTool=tool,
            rerun=self.llm_retry
        )

        pr_log_data = self.log_gen.generate_pr_log(
            owner=pr_procedure.upstream_owner,
            repo=pr_procedure.repo_name,
            user_name=pr_procedure.user_name,
            repo_url=repo_url,
            repo_hash=repo_data["repo_hash"],
            pr_number=pr_number
        )

        self.log_manager.add_pr_log(pr_log_data)
        self.log_manager.add_repo_status(repo_data)
class Ci_Automate:
    """
    Run the AutoFiC CLI on a list of repositories, one after another.

    Each URL in ``REPO_URLS`` is processed with the full
    semgrep -> llm -> patch -> pr option set.
    """

    # Repositories processed when the caller does not supply a list.
    DEFAULT_REPO_URLS = (
        'https://github.com/inyeongjang/corner4',
    )

    def __init__(self, repo_urls=None, save_dir=None):
        """
        Args:
            repo_urls: Optional iterable of repository URLs. Defaults to
                ``DEFAULT_REPO_URLS``.
            save_dir: Optional result directory. Defaults to the
                ``RESULT_SAVE_DIR`` environment variable, or ``./result``
                (absolute) when that is unset.
        """
        # List of repository URLs to process
        self.REPO_URLS = list(self.DEFAULT_REPO_URLS if repo_urls is None else repo_urls)
        if save_dir is None:
            save_dir = os.environ.get("RESULT_SAVE_DIR", os.path.abspath("result"))
        self.save_dir = save_dir

    def _build_command(self, repo_url):
        """Return the argv list that runs the AutoFiC CLI for ``repo_url``."""
        import sys

        # Use the interpreter running this script: a bare 'python' may be
        # missing from PATH or belong to an environment without autofic_core.
        return [
            sys.executable, '-m', 'autofic_core.cli',
            '--repo', repo_url,
            '--save-dir', self.save_dir,
            '--sast', 'semgrep',
            '--llm', '--patch',
            '--pr'
        ]

    def run_autofic(self, repo_url):
        """
        Run the CLI for one repository and echo its output.

        Raises:
            subprocess.CalledProcessError: when the CLI exits non-zero; the
                captured stdout/stderr are printed before re-raising.
        """
        print(f"\n[RUN] {repo_url}")
        cmd = self._build_command(repo_url)
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
            print(result.stdout)
            print(result.stderr)
        except subprocess.CalledProcessError as e:
            print("=== CalledProcessError ===")
            print("stdout:", e.stdout)
            print("stderr:", e.stderr)
            raise

    def main(self):
        """
        Run the tool on every repository in ``REPO_URLS``.

        A failure for one repository is reported and does not stop the
        remaining ones.

        Returns:
            list: URLs of the repositories whose run failed (empty when all
            succeeded).
        """
        failed = []
        for repo_url in self.REPO_URLS:
            try:
                self.run_autofic(repo_url)
            except Exception as e:
                print(f"[ERROR] {repo_url}: {e}")
                failed.append(repo_url)
        return failed


if __name__ == "__main__":
    # Entry point: creates a Ci_Automate instance and starts the main automation process.
    Ci_Automate().main()
@click.command()
@click.option('--explain', is_flag=True, help="Print AutoFiC usage guide.")
@click.option('--repo', required=False, help="Target GitHub repository URL to analyze (required).")
@click.option('--save-dir', required=False, default="artifacts/downloaded_repo", help="Directory to save analysis results.")
@click.option(
    '--sast',
    type=click.Choice(SAST_TOOL_CHOICES, case_sensitive=False),
    required=False,
    help='Select SAST tool to use (choose one of: semgrep, codeql, snykcode).'
)
@click.option('--llm', is_flag=True, help="Run LLM to fix vulnerable code and save responses.")
@click.option('--llm-retry', is_flag=True, help="Re-run LLM for final verification and fixes.")
@click.option('--patch', is_flag=True, help="Generate diffs and apply patches using git.")
@click.option('--pr', is_flag=True, help="Automatically create a pull request.")
def main(explain, repo, save_dir, sast, llm, llm_retry, patch, pr):
    # CLI entry point: forward every parsed option to AutoFiCApp and run it.
    # A comment is used instead of a docstring on purpose, because click
    # would show a docstring as the command's --help text.
    cli_options = dict(
        explain=explain,
        repo=repo,
        save_dir=save_dir,
        sast=sast,
        llm=llm,
        llm_retry=llm_retry,
        patch=patch,
        pr=pr,
    )
    AutoFiCApp(**cli_options).run()


if __name__ == "__main__":
    main()
class GitHubRepoConfig(BaseModel):
    """Repository URL plus the GitHub token used to access it."""

    repo_url: str
    # Falls back to the GITHUB_TOKEN environment variable when not given.
    token: str = Field(default_factory=lambda: os.getenv("GITHUB_TOKEN"))

    @field_validator("token")
    def validate_token(cls, v):
        """Reject an empty token."""
        # NOTE(review): pydantic may skip this validator for the default
        # value; GitHubRepoHandler re-checks the token itself - confirm.
        if not v:
            raise GitHubTokenMissingError()
        return v

    def get_owner_and_name(self) -> tuple[str, str]:
        """
        Split ``repo_url`` into ``(owner, repository_name)``.

        Raises:
            RepoURLFormatError: when the URL path has fewer than two parts.
        """
        try:
            path = urlparse(self.repo_url).path.strip("/")
            owner, repo = path.split("/")[:2]
            return owner, repo.removesuffix(".git")
        except Exception:
            raise RepoURLFormatError(self.repo_url)


class GitHubRepoHandler():
    """Fork, look up and clone a GitHub repository for the authenticated user."""

    # Seconds to wait for the GitHub REST API before giving up.
    _HTTP_TIMEOUT = 30

    def __init__(self, repo_url: str):
        """
        Resolve owner/name from ``repo_url`` and the login behind the token.

        Raises:
            GitHubTokenMissingError: when no token is configured or the
                authenticated user cannot be fetched with it.
            RepoURLFormatError: when ``repo_url`` cannot be parsed.
        """
        self.repo_url = repo_url
        self.config = GitHubRepoConfig(repo_url=repo_url)
        self.token = self.config.token

        if not self.token or self.token.strip() == "":
            raise GitHubTokenMissingError()

        self.github = Github(self.token)
        self._owner, self._name = self.config.get_owner_and_name()

        try:
            self._current_user = self.github.get_user().login
        except Exception as e:
            # Keep the underlying API error as the cause for debugging.
            raise GitHubTokenMissingError() from e

        self.needs_fork = self._owner != self._current_user  # Determine whether you need a fork

    @staticmethod
    def _parse_repo_url(url: str) -> tuple[str, str]:
        """Split ``url`` into ``(owner, repository_name)`` without a ``.git`` suffix."""
        try:
            path = urlparse(url).path.strip("/")
            owner, repo = path.split("/")[:2]
            return owner, repo.removesuffix(".git")
        except Exception:
            raise RepoURLFormatError(url)

    def fetch_repo(self) -> "Repository":
        """
        Return the repository ``<current user>/<name>`` via PyGithub.

        Raises:
            RepoAccessError: when the lookup fails.
        """
        repo_name = f"{self._current_user}/{self._name}"
        try:
            return self.github.get_repo(repo_name)
        except Exception as e:
            raise RepoAccessError(f"{repo_name}: {e}") from e

    def fork(self) -> bool:
        """
        Ask GitHub to fork ``<owner>/<name>`` into the current user's account.

        Returns:
            True when GitHub accepted the request (HTTP 202).

        Raises:
            GitHubTokenMissingError: on HTTP 401.
            RepoURLFormatError: on HTTP 404.
            RepoAccessError: on HTTP 403.
            ForkFailedError: on any other status code.
        """
        api_url = f"https://api.github.com/repos/{self._owner}/{self._name}/forks"
        headers = {
            "Authorization": f"token {self.token}",
            "Accept": "application/vnd.github+json"
        }
        response = requests.post(api_url, headers=headers, timeout=self._HTTP_TIMEOUT)
        status = response.status_code

        if status == 202:
            return True
        if status == 401:
            # GitHubTokenMissingError carries its own message; passing one
            # to its zero-argument constructor would raise TypeError.
            raise GitHubTokenMissingError()
        if status == 404:
            raise RepoURLFormatError("Repository not found (404 Not Found).")
        if status == 403:
            raise RepoAccessError("Access forbidden to the repository (403 Forbidden).")
        raise ForkFailedError(status, response.text)

    def clone_repo(self, save_dir: str, use_forked: bool = False) -> str:
        """
        Clone ``<current user>/<name>`` into ``<save_dir>/repo``.

        An existing ``repo`` directory is deleted first. ``use_forked`` is
        accepted for interface compatibility but is not consulted: the clone
        URL always points at the current user's account.

        Returns:
            Absolute path of the cloned repository.

        Raises:
            ValueError: when ``<save_dir>/repo`` exists and is not a directory.
            RepoAccessError: when ``git clone`` fails.
        """
        save_dir = os.path.abspath(save_dir)  # custom root directory
        repo_path = os.path.join(save_dir, "repo")  # Specify repo subfolder

        if os.path.exists(repo_path):
            if os.path.isdir(repo_path):
                shutil.rmtree(repo_path)
            else:
                raise ValueError(f"The specified path is not a directory : {repo_path}")

        clone_url = f"https://github.com/{self._current_user}/{self._name}.git"

        try:
            subprocess.run(['git', 'clone', clone_url, repo_path], check=True,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE,
                           text=True)
        except subprocess.CalledProcessError as e:
            raise RepoAccessError(e) from e

        return repo_path
+ super().__init__(message) + +class RepoURLFormatError(AutoficError): + def __init__(self, repo_url): + message = f"[ ERROR ] Invalid GitHub repository URL format: {repo_url}" + super().__init__(message) + +class RepoAccessError(AutoficError): + def __init__(self, original_error): + message = f"[ ERROR ] Cannot access repository: {original_error}" + super().__init__(message) + self.original_error = original_error + +class ForkFailedError(AutoficError): + def __init__(self, status_code, msg): + message = f"[ ERROR ] Failed to fork repository (HTTP {status_code}) - {msg}" + super().__init__(message) + +class AccessDeniedError(AutoficError): + def __init__(self, path, original_error): + message = ( + f"[ ERROR ] Access to the path '{path}' was denied. " + "Please close any applications or terminals using the directory and try again." + ) + super().__init__(message) + self.original_error = original_error + +# downloader.py + +class FileDownloadError(AutoficError): + def __init__(self, path, original_error): + message = f"{path} Failed to download file: {original_error}" + super().__init__(message) + +# semgrep_runner.py + +class SemgrepExecutionError(AutoficError): + def __init__(self, returncode, stdout=None, stderr=None): + self.returncode = returncode + self.stdout = stdout + self.stderr = stderr + message = f"Semgrep execution failed (return code:{returncode})" + super().__init__(message) + +# snykcode_runner.py + +class SnykCodeErrorMessages: + TOKEN_MISSING = "[ ERROR ] SNYK_TOKEN environment variable not set." + NO_JS_FILES_FOUND = "[ ERROR ] No JavaScript/TypeScript files found to analyze." + CLI_NOT_FOUND = "[ ERROR ] Unable to locate Snyk CLI. Please install or set SNYK_CMD_PATH." 
+ +# prompt_generator.py + +class PromptGeneratorErrorCodes: + EMPTY_SNIPPET = "EMPTY_SNIPPET" + TEMPLATE_RENDER_ERROR = "TEMPLATE_RENDER_ERROR" + INVALID_SNIPPET_LIST = "INVALID_SNIPPET_LIST" + +class PromptGeneratorErrorMessages: + EMPTY_SNIPPET = "The provided code snippet is empty." + TEMPLATE_RENDER_ERROR = "An error occurred while rendering the prompt template." + INVALID_SNIPPET_LIST = "The input must be a list of SemgrepSnippet objects." + +class PromptGenerationException(AutoficError): + def __init__(self, code: str, message: str): + self.code = code + super().__init__(f"[ ERROR ] Prompt generation failed ({code}): {message}") + +# response_parser.py + +class ResponseParseError(AutoficError): + def __init__(self, filename: str, reason: str): + message = f"[ ERROR ] Failed to parse {filename}: {reason}" + super().__init__(message) + self.filename = filename + self.reason = reason + +# llm_runner.py + +class LLMExecutionError(AutoficError): + def __init__(self, message: str): + super().__init__(f"[ ERROR ] LLM execution failed: {message}") + self.message = message + +class CodeQLExecutionError(Exception): + """Raised when CodeQL execution fails.""" + def __init__(self): + super().__init__("[ERROR] CodeQL execution failed. 
Please check the log file for details.") + +# retry_prompt_generator.py + +class RetryPromptGenerationError(AutoficError): + def __init__(self, path: str, reason: str): + message = f"[ ERROR ] Failed to generate retry prompt for {path}: {reason}" + super().__init__(message) + self.path = path + self.reason = reason + +# diff_generator.py + +class DiffWarningMessages: + ORIGINAL_FILE_NOT_FOUND = "[ WARN ] Original file not found: {}" + NO_CHANGES_DETECTED = "[ WARN ] No changes detected in the file: {}" + +class DiffGenerationError(AutoficError): + def __init__(self, filename: str, reason: str): + message = f"[ ERROR ] Failed to generate diff: {filename} - {reason}" + super().__init__(message) + self.filename = filename + self.reason = reason + + +# apply_patch.py + +class PatchWarningMessages: + NO_DIFF_FILES = "[ WARN ] No .diff files found in {}" + PARSED_FILE_NOT_FOUND = "[ WARN ] Could not find matching file in parsed directory: {}" + RELATIVE_PATH_EXTRACTION_FAILED = "[ WARN ] Failed to extract relative path: {}" + ORIGINAL_FILE_MISSING = "[ WARN ] Original file does not exist: {}" + OVERWRITE_FILE_MISSING = "[ WARN ] Original file does not exist in repo: {}" + +class PatchErrorMessages: + PATCH_EXCEPTION = "[ ERROR ] Exception while applying {}: {}" + FALLBACK_DIFF_FAILED = "[ ERROR ] Failed to generate fallback diff: {}" + OVERWRITE_FAILED = "[ ERROR ] Failed to overwrite repo file: {}" + +class PatchFailMessages: + PATCH_FAILED = "[ FAIL ] Patch failed: {}" + FALLBACK_APPLY_FAILED = "[ FAIL ] Fallback diff failed: {}" + +# cli.py + +class NoRepositoryError(AutoficError): + def __init__(self): + message = f"[ ERROR ] The --repo option is required!" 
+ super().__init__(message) + +class NoSaveDirError(AutoficError): + def __init__(self): + message = f"[ ERROR ] The --save-dir option is required" + super().__init__(message) + +class LLMRetryOptionError(AutoficError): + def __init__(self): + message = f"[ ERROR ] The --llm-retry option includes --llm automatically. Do not specify both!" + super().__init__(message) + +class LLMWithoutSastError(AutoficError): + def __init__(self): + message = f"[ ERROR ] The --llm or --llm-retry options cannot be used without --sast!" + super().__init__(message) + +class PatchWithoutLLMError(AutoficError): + def __init__(self): + message = f"[ ERROR ] The --patch option cannot be used without --llm or --llm-retry" + super().__init__(message) + +class PRWithoutPatchError(AutoficError): + def __init__(self): + message = f"[ ERROR ] The --pr option cannot be used without --patch!" + super().__init__(message) + diff --git a/src/autofic_core/llm/__init__.py b/src/autofic_core/llm/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/autofic_core/llm/llm_runner.py b/src/autofic_core/llm/llm_runner.py new file mode 100644 index 0000000..2fb734a --- /dev/null +++ b/src/autofic_core/llm/llm_runner.py @@ -0,0 +1,107 @@ +# ============================================================================= +# Copyright 2025 AutoFiC Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
load_dotenv()

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))


class LLMRunner:
    """
    Run LLM with a given prompt.
    """
    def __init__(self, model="gpt-4o"):
        self.model = model

    def run(self, prompt: str) -> str:
        """
        Run prompt and return the stripped response text.
        Raises:
            LLMExecutionError: On any OpenAI error, or when the completion
                carries no text content.
        """
        try:
            response = client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are a security code fixer."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.3
            )
            content = response.choices[0].message.content
        except Exception as e:
            raise LLMExecutionError(str(e)) from e

        # Checked outside the try block so this error is not re-wrapped.
        if content is None:
            raise LLMExecutionError("The model returned no text content.")
        return content.strip()


def save_md_response(content: str, prompt_obj: Any, output_dir: Path) -> Path:
    """
    Save response to a markdown file.

    The target name is ``response_<flattened path>.md`` where the path comes
    from ``prompt_obj.snippet.path`` (or ``prompt_obj.path``) with the
    ``artifacts`` and ``downloaded_repo`` components removed and the remaining
    components joined with ``_``.

    Returns:
        Path: Saved file path
    Raises:
        RuntimeError: If no path can be read from ``prompt_obj``.
    """
    output_dir = Path(output_dir)  # tolerate a plain string directory
    output_dir.mkdir(parents=True, exist_ok=True)

    try:
        path = Path(prompt_obj.snippet.path if hasattr(prompt_obj, "snippet") else prompt_obj.path)
    except Exception as e:
        raise RuntimeError(f"[ERROR] Failed to resolve output path: {e}") from e

    parts = [p for p in path.parts if p not in ("artifacts", "downloaded_repo")]
    flat_path = "_".join(parts)
    output_path = output_dir / f"response_{flat_path}.md"

    output_path.write_text(content, encoding="utf-8")
    return output_path


def run_llm_for_semgrep_results(
    semgrep_json_path: str,
    output_dir: Path,
    tool: str = "semgrep",
    model: str = "gpt-4o",
) -> None:
    """
    Run LLM for all prompts from a SAST result.

    Args:
        semgrep_json_path: Path of the SAST result file (any supported tool).
        output_dir: Directory that receives one markdown file per prompt.
        tool: ``"semgrep"``, ``"codeql"`` or ``"snykcode"``.
        model: Model name passed to ``LLMRunner``.
    Raises:
        ValueError: If ``tool`` is not supported.
    """
    # Imported lazily so only the selected tool's preprocessor is loaded.
    if tool == "semgrep":
        from autofic_core.sast.semgrep.preprocessor import SemgrepPreprocessor as Preprocessor
    elif tool == "codeql":
        from autofic_core.sast.codeql.preprocessor import CodeQLPreprocessor as Preprocessor
    elif tool == "snykcode":
        from autofic_core.sast.snykcode.preprocessor import SnykCodePreprocessor as Preprocessor
    else:
        raise ValueError(f"Unsupported SAST tool: {tool}")

    raw_snippets = Preprocessor.preprocess(semgrep_json_path)
    merged_snippets = merge_snippets_by_file(raw_snippets)
    prompts = PromptGenerator().generate_prompts(merged_snippets)
    runner = LLMRunner(model=model)

    for prompt in prompts:
        try:
            result = runner.run(prompt.prompt)
        except LLMExecutionError as e:
            # Best effort: keep going, but report the skipped file instead of
            # dropping the failure silently.
            click.echo(f"[ WARN ] Skipping {prompt.snippet.path}: {e}", err=True)
            continue
        save_md_response(result, prompt, output_dir)
class PromptTemplate(BaseModel):
    """Prompt text with ``{input}`` and ``{vulnerabilities}`` placeholders."""
    title: str
    content: str

    def render(self, file_snippet: BaseSnippet) -> str:
        """Render a prompt based on the provided code snippet.

        Raises:
            PromptGenerationException: With ``EMPTY_SNIPPET`` when the snippet
                input is blank, or ``TEMPLATE_RENDER_ERROR`` when formatting
                the template fails.
        """
        if not file_snippet.input.strip():
            raise PromptGenerationException(
                PromptGeneratorErrorCodes.EMPTY_SNIPPET,
                PromptGeneratorErrorMessages.EMPTY_SNIPPET,
            )

        vulnerabilities_str = (
            f"Type: {', '.join(file_snippet.vulnerability_class) or 'Unknown'}\n"
            f"CWE: {', '.join(file_snippet.cwe) or 'N/A'}\n"
            f"Description: {file_snippet.message or 'None'}\n"
            f"Severity: {file_snippet.severity or 'Unknown'}\n"
            f"Location: {file_snippet.start_line} ~ {file_snippet.end_line} (Only modify this code range)\n\n"
        )

        try:
            return self.content.format(
                input=file_snippet.input,
                vulnerabilities=vulnerabilities_str,
            )
        except Exception as exc:
            raise PromptGenerationException(
                PromptGeneratorErrorCodes.TEMPLATE_RENDER_ERROR,
                PromptGeneratorErrorMessages.TEMPLATE_RENDER_ERROR,
            ) from exc


class GeneratedPrompt(BaseModel):
    """A rendered prompt together with the snippet it was built from."""
    title: str
    prompt: str
    snippet: BaseSnippet


class PromptGenerator:
    """Build file-level fix prompts from SAST snippets."""

    def __init__(self):
        self.template = PromptTemplate(
            title="Refactoring Vulnerable Code Snippet (File Level)",
            content=(
                "The following is a JavaScript source file that contains security vulnerabilities.\n\n"
                "```javascript\n"
                "{input}\n"
                "```\n\n"
                "Detected vulnerabilities:\n\n"
                "{vulnerabilities}"
                "Please strictly follow the guidelines below when modifying the code:\n"
                "- Modify **only the vulnerable parts** of the file with **minimal changes**.\n"
                "- Preserve the **original line numbers, indentation, and code formatting** exactly.\n"
                "- **Do not modify any part of the file that is unrelated to the vulnerabilities.**\n"
                "- Output the **entire file**, not just the changed lines.\n"
                "- This code will be used for diff-based automatic patching, so structural changes may cause the patch to fail.\n\n"
                "Output format example:\n"
                "1. Vulnerability Description: ...\n"
                "2. Potential Risk: ...\n"
                "3. Recommended Fix: ...\n"
                "4. Final Modified Code:\n"
                "```javascript\n"
                "// Entire file content, but only vulnerable parts should be modified minimally\n"
                "...entire code...\n"
                "```\n"
                "5. Additional Notes: (optional)\n"
            ),
        )

    @staticmethod
    def _coerce_snippet(snippet, idx: int, label: str) -> BaseSnippet:
        """Return ``snippet`` as a BaseSnippet, building one from a dict.

        ``label`` is the caller-specific prefix of the TypeError message.
        """
        if isinstance(snippet, dict):
            return BaseSnippet(**snippet)
        if not isinstance(snippet, BaseSnippet):
            raise TypeError(f"[ ERROR ] {label} at index {idx}: {type(snippet)}")
        return snippet

    def generate_prompt(self, file_snippet: BaseSnippet) -> GeneratedPrompt:
        """Generate a single prompt from one code snippet.

        Raises:
            TypeError: If ``file_snippet`` is not a BaseSnippet.
            PromptGenerationException: If rendering fails.
        """
        if not isinstance(file_snippet, BaseSnippet):
            raise TypeError(f"[ ERROR ] generate_prompt: Invalid input type: {type(file_snippet)}")
        rendered_prompt = self.template.render(file_snippet)
        return GeneratedPrompt(
            title=self.template.title,
            prompt=rendered_prompt,
            snippet=file_snippet,
        )

    def generate_prompts(self, file_snippets: List[BaseSnippet]) -> List[GeneratedPrompt]:
        """Generate prompts from multiple snippets (dicts are converted first).

        Raises:
            TypeError: If an element is neither a dict nor a BaseSnippet.
        """
        return [
            self.generate_prompt(
                self._coerce_snippet(snippet, idx, "generate_prompts: Invalid type")
            )
            for idx, snippet in enumerate(file_snippets)
        ]

    def get_unique_file_paths(self, file_snippets: List[BaseSnippet]) -> List[str]:
        """Extract the sorted, de-duplicated paths from a list of snippets.

        Raises:
            TypeError: If an element is neither a dict nor a BaseSnippet.
        """
        paths = {
            self._coerce_snippet(snippet, idx, "get_unique_file_paths: Type error").path
            for idx, snippet in enumerate(file_snippets)
        }
        return sorted(paths)
# ---------------------------------------------------------------------------
# src/autofic_core/llm/response_parser.py
# ---------------------------------------------------------------------------

CODE_BLOCK_PATTERN = re.compile(r"```(?:js|javascript)\n([\s\S]+?)```", re.IGNORECASE | re.MULTILINE)


class ParsedResponse(BaseModel):
    """Structured representation of parsed response."""
    filename: str
    code: str
    output_path: Path


def extract_code_blocks(content: str, filename: str) -> str:
    """
    Extract JavaScript code blocks from markdown content.

    All ``js``/``javascript`` fenced blocks are stripped and joined with a
    blank line between them.
    Raises:
        ResponseParseError: if code block not found
    """
    matches = CODE_BLOCK_PATTERN.findall(content)
    if not matches:
        raise ResponseParseError(filename, "js/javascript 코드 블럭을 찾을 수 없습니다.")
    return "\n\n".join(m.strip() for m in matches)


def parse_md_filename(md_filename: str) -> str:
    """
    Convert response_*.md filename into relative path.

    Every ``_`` after the ``response_`` prefix becomes ``/``.
    NOTE(review): this cannot tell a path separator from an underscore that
    was part of the original file name — confirm against the writer of these
    files.
    Raises:
        ResponseParseError: if filename format is invalid
    """
    stem = Path(md_filename).stem
    if not stem.startswith("response_"):
        raise ResponseParseError(md_filename, "잘못된 파일명 형식")
    return stem[len("response_"):].replace("_", "/")


def parse_response(md_path: Path, output_dir: Path) -> ParsedResponse:
    """
    Extract code from md file and return parsed result as model.
    Raises:
        ResponseParseError: on any parsing or I/O failure
    """
    try:
        content = md_path.read_text(encoding="utf-8")
        code = extract_code_blocks(content, md_path.name)
        rel_path = parse_md_filename(md_path.name)
        output_path = output_dir / rel_path
        return ParsedResponse(
            filename=md_path.name,
            code=code,
            output_path=output_path
        )
    except ResponseParseError:
        # Already carries the file name and reason; wrapping it again would
        # duplicate the "[ ERROR ] Failed to parse ..." prefix.
        raise
    except Exception as e:
        raise ResponseParseError(md_path.name, str(e)) from e


def save_code_file(response: ParsedResponse) -> None:
    """
    Save parsed code to target output path, creating parent directories.
    """
    response.output_path.parent.mkdir(parents=True, exist_ok=True)
    response.output_path.write_text(response.code, encoding="utf-8")


class ResponseParser:
    """
    Extract and save code blocks from response_*.md files.
    """

    def __init__(self, md_dir: Path, diff_dir: Path):
        self.md_dir = md_dir
        self.diff_dir = diff_dir

    def extract_and_save_all(self) -> bool:
        """
        Extract all responses and save parsed code.

        Files that fail to parse are skipped.
        Returns:
            bool: True if at least one file succeeded
        """
        success = False
        for md_file in sorted(self.md_dir.glob("*.md")):
            try:
                parsed = parse_response(md_file, self.diff_dir)
                save_code_file(parsed)
                success = True
            except ResponseParseError:
                continue
        return success


# ---------------------------------------------------------------------------
# src/autofic_core/llm/retry_prompt_generator.py
# ---------------------------------------------------------------------------

class RetryPromptTemplate(BaseModel):
    """Template structure for retry prompts."""
    title: str
    content: str


class GeneratedRetryPrompt(BaseModel):
    """Result of applying template to a parsed file."""
    title: str
    prompt: str
    path: str


class RetryPromptGenerator:
    """
    Generates retry prompts from already-patched (parsed) files.
    These prompts are used for re-validating the code via LLM.
    """

    def __init__(self, parsed_dir: Path):
        self.parsed_dir = parsed_dir
        self.template = RetryPromptTemplate(
            title="Post-patch File Verification (LLM Re-analysis)",
            content=(
                "The following is a JavaScript source file. Please identify and fix any security vulnerabilities.\n\n"
                "```javascript\n"
                "{input}\n"
                "```\n\n"
                "Please strictly follow the guidelines below when modifying the code:\n"
                "- Modify **only the vulnerable parts** of the file with **minimal changes**.\n"
                "- Preserve the **original line numbers, indentation, and code formatting** exactly.\n"
                "- **Do not modify any part of the file that is unrelated to the vulnerabilities.**\n"
                "- Output the **entire file**, not just the changed lines.\n"
                "- This code will be used for diff-based automatic patching, so structural changes may cause the patch to fail.\n\n"
                "Output format example:\n"
                "1. Vulnerability Description: ...\n"
                "2. Potential Risk: ...\n"
                "3. Recommended Fix: ...\n"
                "4. Final Modified Code:\n"
                "```javascript\n"
                "// Entire file content, but only vulnerable parts should be modified minimally\n"
                "...entire code...\n"
                "```\n"
                "5. Additional Notes: (optional)\n"
            ),
        )

    def generate_prompt(self, file_path: Path) -> GeneratedRetryPrompt:
        """Generate a single prompt from a file.

        Raises:
            RetryPromptGenerationError: If the file cannot be read.
        """
        try:
            code = file_path.read_text(encoding="utf-8")
        except Exception as e:
            raise RetryPromptGenerationError(str(file_path), str(e)) from e

        rendered = self.template.content.format(input=code)
        return GeneratedRetryPrompt(
            title=self.template.title,
            prompt=rendered,
            path=str(file_path.relative_to(self.parsed_dir)),
        )

    def generate_prompts(self) -> List[GeneratedRetryPrompt]:
        """Generate prompts from all .parsed files in parsed_dir."""
        # NOTE(review): only files ending in ".parsed" directly inside
        # parsed_dir are matched (no recursion) — confirm this matches how the
        # parsed files are written.
        parsed_files = sorted(self.parsed_dir.glob("*.parsed"))
        return [self.generate_prompt(f) for f in parsed_files]

    def get_unique_file_paths(self, prompts: List[GeneratedRetryPrompt]) -> List[str]:
        """Return deduplicated, sorted list of relative paths from prompts."""
        return sorted({p.path for p in prompts})


# ---------------------------------------------------------------------------
# src/autofic_core/log/check_approval.py
# ---------------------------------------------------------------------------

load_dotenv()

log_url = os.getenv("LOG_API_URL")
token = os.getenv("GITHUB_TOKEN")

# Seconds to wait for any single HTTP call made by this script.
_REQUEST_TIMEOUT_SECONDS = 30


class GitHubChecker:
    """Query the state of pull requests through the GitHub REST API."""

    def __init__(self, token):
        self.token = token

    def fetch_pull(self, owner, repo, pr_number):
        """Return the pull request JSON, or None when GitHub does not answer 200."""
        url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
        headers = {
            "Authorization": f"token {self.token}",
            "Accept": "application/vnd.github+json"
        }
        resp = requests.get(url, headers=headers, timeout=_REQUEST_TIMEOUT_SECONDS)
        if resp.status_code == 200:
            return resp.json()
        return None

    def is_approved(self, owner, repo, pr_number):
        """Return the pull request's ``merged`` flag (False if it cannot be read)."""
        pull = self.fetch_pull(owner, repo, pr_number)
        if pull is None:
            return False
        return pull.get("merged", False)

    def is_opened(self, owner, repo, pr_number):
        """Return True when the pull request state is ``open``."""
        pull = self.fetch_pull(owner, repo, pr_number)
        if pull is None:
            return False
        return pull.get("state") == "open"


def main() -> None:
    """Refresh the approved/opened flags of every PR listed by the log server."""
    if not log_url:
        raise SystemExit("[ERROR] LOG_API_URL is not set.")
    # A trailing slash in the configured URL must not produce "//log.json".
    base_url = log_url.rstrip("/")

    log_response = requests.get(f"{base_url}/log.json", timeout=_REQUEST_TIMEOUT_SECONDS)
    log_response.raise_for_status()
    log_data = log_response.json()
    checker = GitHubChecker(token)

    for pr in log_data.get("prs", []):
        pr_number = pr.get("pr_number")
        owner = pr.get("owner")
        repo = pr.get("repo")
        repo_url = pr.get("repo_url")

        print(f"[INFO] Checking PR #{pr_number} for {owner}/{repo}...")

        # One request per PR; both flags are read from the same payload.
        pull = checker.fetch_pull(owner, repo, pr_number)
        approved = pull.get("merged", False) if pull is not None else False
        opened = pull.get("state") == "open" if pull is not None else False

        print(f" → approved: {approved}, opened: {opened}")

        update = requests.post(
            f"{base_url}/update_approval",
            json={
                "pr_number": pr_number,
                "approved": approved,
                "opened": opened,
                "repo_url": repo_url
            },
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
        if not update.ok:
            print(f"[WARN] Failed to update PR #{pr_number}: HTTP {update.status_code}")


if __name__ == "__main__":
    main()


# ---------------------------------------------------------------------------
# src/autofic_core/log/log_generator.py
# ---------------------------------------------------------------------------

class LogGenerator:
    """Build the PR and repository log records sent to the log server."""

    def __init__(self, default_options=None):
        self.default_options = default_options or {}

    def generate_pr_log(self, owner, repo, user_name, repo_url, repo_hash, pr_number):
        """Return the log record for a newly opened pull request."""
        today = datetime.datetime.now().isoformat()
        return {
            "date": today,
            "owner": owner,
            "repo": repo,
            "user_name": user_name,
            "repo_url": repo_url,
            "repo_hash": repo_hash,
            "pr_number": pr_number,
            "opened": True,
            "approved": False
        }

    @staticmethod
    def _load_snippets(before_json, sastTool):
        """Run the preprocessor matching ``sastTool`` on ``before_json``."""
        if sastTool == "semgrep":
            return SemgrepPreprocessor.preprocess(before_json)
        if sastTool == "codeql":
            return CodeQLPreprocessor.preprocess(before_json)
        if sastTool == "snykcode":
            return SnykCodePreprocessor.preprocess(before_json)
        raise ValueError(f"Unknown tool: {sastTool}")

    @staticmethod
    def _render_sast_table(items):
        """Return the markdown table lines for the findings of one file."""
        has_cwe = any(item.cwe for item in items)
        has_ref = any(item.references for item in items)

        header = ["Line", "Type", "Level"]
        if has_cwe:
            header.append("CWE")
        if has_ref:
            header.append("Ref")

        lines = [
            "| " + " | ".join(header) + " |",
            "|" + "|".join(["-" * len(h) for h in header]) + "|",
        ]

        severity_labels = {
            "ERROR": "🛑 ERROR",
            "WARNING": "⚠️ WARNING",
            "NOTE": "💡 NOTE"
        }

        for item in items:
            line = str(item.start_line) if item.start_line == item.end_line else f"{item.start_line}~{item.end_line}"
            vuln = item.vulnerability_class[0] if item.vulnerability_class else "N/A"
            level = item.severity.upper() if item.severity else "N/A"
            row = [line, vuln, severity_labels.get(level, level)]

            if has_cwe:
                # Keep only the identifier part before the first colon.
                row.append(item.cwe[0].split(":")[0] if item.cwe else "N/A")
            if has_ref:
                ref = item.references[0] if item.references else ""
                row.append(f"[🔗]({ref})" if ref else "")

            lines.append("| " + " | ".join(row) + " |")
        return lines

    def _render_llm_summary(self, llm_dir, filename):
        """Return the markdown lines summarising the LLM response for ``filename``.

        Returns an empty list when the response file is missing or blank.
        """
        base_filename = filename.replace("/", "_")
        llm_file = llm_dir / f"response_{base_filename}.md"
        if not llm_file.exists():
            return []

        content = llm_file.read_text(encoding="utf-8").strip()
        if not content:
            return []

        # Show the real relative path; rebuilding it from the flattened file
        # name would turn underscores in the name into slashes.
        lines = [f"\n### 📄 `{filename}`"]

        parsed = self.parse_llm_response(content)
        if parsed["Vulnerability"]:
            lines.append("#### 🔸 Vulnerability Description")
            lines.append(parsed["Vulnerability"])
        if parsed["Recommended Fix"]:
            lines.append("#### 🔸 Recommended Fix")
            lines.append(parsed["Recommended Fix"])
        if parsed["References"]:
            lines.append("#### 🔸 Additional Notes")
            lines.append(parsed["References"])
        return lines

    def generate_repo_log(self, save_dir, name, owner, repo_url, sastTool, rerun=False):
        """Return the repository log record, including a markdown analysis.

        Reads ``<save_dir>/sast/before.json`` through the preprocessor of
        ``sastTool`` and, per file, ``<save_dir>/llm/response_<file>.md``.

        Raises:
            ValueError: If ``sastTool`` is not semgrep, codeql or snykcode.
        """
        save_root = Path(save_dir)
        snippets = self._load_snippets(save_root / 'sast' / 'before.json', sastTool)

        # Findings are counted by their first vulnerability class only.
        byClass_counter = Counter(
            item.vulnerability_class[0] for item in snippets if item.vulnerability_class
        )
        byClass = [{"type": k, "count": v} for k, v in byClass_counter.items()]
        vulnerabilities = sum(byClass_counter.values())

        analysis_lines = [
            "🔧 Security Patch Summary\n",
            f"- SAST Tool: {sastTool.capitalize()}",
            f"- Total vulnerabilities Detected: {vulnerabilities}\n",
        ]

        if vulnerabilities > 0:
            analysis_lines.append("| Type | Count |")
            analysis_lines.append("|------|-------|")
            for entry in byClass:
                analysis_lines.append(f"| {entry['type']} | {entry['count']} |")

        # SAST Summary
        # NOTE(review): the original nesting of this line is not recoverable
        # from the collapsed source; it is emitted unconditionally here.
        analysis_lines.append("📁 File-by-File Summary\n")

        grouped_by_file = defaultdict(list)
        repo_dir = save_root / "repo"
        llm_dir = save_root / 'llm'

        for item in snippets:
            filename = os.path.relpath(item.path, repo_dir).replace("\\", "/")
            grouped_by_file[filename].append(item)

        # enumerate() numbers every file, including those whose LLM response
        # is empty (the manual counter was skipped in that case).
        for file_idx, (filename, items) in enumerate(grouped_by_file.items(), start=1):
            analysis_lines.append(f"\n### {file_idx}. `{filename}`")
            analysis_lines.append("🔏 SAST Analysis Summary")
            analysis_lines.extend(self._render_sast_table(items))

            # LLM Summary
            analysis_lines.append("\n 🤖 LLM Analysis Summary")
            analysis_lines.extend(self._render_llm_summary(llm_dir, filename))

        analysis_text = "\n".join(analysis_lines)

        repo_dict = {
            "name": name,
            "owner": owner,
            "repo_url": repo_url,
            "vulnerabilities": vulnerabilities,
            "byClass": byClass,
            "analysis": analysis_text,
            "sastTool": sastTool,
            "rerun": rerun
        }
        repo_dict["repo_hash"] = self.get_repo_hash(repo_dict)
        return repo_dict

    def get_repo_hash(self, repo_dict):
        """Return the SHA-256 hex digest of the identifying fields of ``repo_dict``."""
        keys_to_include = ["repo_url", "sastTool", "rerun", "vulnerabilities", "byClass", "analysis"]
        filtered = {k: repo_dict.get(k) for k in keys_to_include}
        hash_input = json.dumps(filtered, sort_keys=True)
        return hashlib.sha256(hash_input.encode()).hexdigest()

    def parse_llm_response(self, content: str) -> dict:
        """Split an LLM response into its numbered sections.

        Sections 1-3 are required; section 4 (the code) is skipped and
        section 5 is optional. All values are empty strings when the
        response does not follow the numbered format.
        """
        sections = {
            "Vulnerability": "",
            "Risks": "",
            "Recommended Fix": "",
            "References": ""
        }

        pattern = re.compile(
            r"1\. Vulnerability Description\s*[::]?\s*(.*?)\s*"
            r"2\. Potential Risk\s*[::]?\s*(.*?)\s*"
            r"3\. Recommended Fix\s*[::]?\s*(.*?)\s*"
            r"(?:4\. Final Modified Code.*?)?"
            # Optional section; \Z anchors the match so the lazy groups above
            # still extend up to section 4/5 or the end of the text.
            r"(?:5\. Additional Notes\s*[::]?\s*(.*))?\Z",
            re.DOTALL
        )

        match = pattern.search(content)
        if match:
            sections["Vulnerability"] = match.group(1).strip()
            sections["Risks"] = match.group(2).strip()
            sections["Recommended Fix"] = match.group(3).strip()
            sections["References"] = (match.group(4) or "").strip()

        return sections
class LogManager:
    """Send PR and repository log records to the log API server."""

    def __init__(self, timeout: float = 30.0):
        """Read the server URL from the LOG_API_URL environment variable.

        Args:
            timeout: Seconds to wait for each request before giving up.

        Raises:
            ValueError: If LOG_API_URL is unset or empty.
        """
        base_url = os.getenv('LOG_API_URL')
        if not base_url:
            # Previously this surfaced as an AttributeError on None.
            raise ValueError("LOG_API_URL environment variable is not set.")
        # Drop trailing slashes so endpoint paths can be appended with "/".
        self.api_base_url = base_url.rstrip('/')
        self.timeout = timeout

    def _post(self, endpoint, payload):
        """POST ``payload`` as JSON to ``endpoint`` and return the decoded reply."""
        url = f"{self.api_base_url}/{endpoint}"
        response = requests.post(url, json=payload, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def add_pr_log(self, pr_data):
        """Send a PR log record; returns the server's JSON reply.

        Raises an HTTP error from ``requests`` on a non-success status.
        """
        return self._post("add_pr", pr_data)

    def add_repo_status(self, repo_data):
        """Send a repository status record; returns the server's JSON reply.

        Raises an HTTP error from ``requests`` on a non-success status.
        """
        return self._post("add_repo_status", repo_data)
class PatchApplier:
    """Apply generated ``.diff`` files to a cloned repository.

    Each patch in ``patch_dir`` is applied with ``git apply``. For a patch that
    does not apply, the matching file from ``parsed_dir`` is copied over the
    repository file instead.
    """

    # Optional prefix on patch file names that is not part of the target stem.
    _PATCH_PREFIX = "patch_"

    def __init__(
        self,
        patch_dir: Path,
        repo_dir: Path,
        parsed_dir: Path = None,
        fallback_dir: Path = None,
    ):
        """
        :param patch_dir: Directory containing the ``*.diff`` files.
        :param repo_dir: Working tree the patches are applied to.
        :param parsed_dir: Directory with the modified source files
            (default: ``<patch_dir>/../parsed``).
        :param fallback_dir: Where fallback diffs are written
            (default: ``<patch_dir>/fallbacks``); created if missing.
        """
        self.patch_dir = Path(patch_dir)
        self.repo_dir = Path(repo_dir)
        self.parsed_dir = Path(parsed_dir) if parsed_dir else self.patch_dir.parent / "parsed"
        self.fallback_dir = Path(fallback_dir) if fallback_dir else self.patch_dir / "fallbacks"
        self.fallback_dir.mkdir(exist_ok=True, parents=True)
        self.console = Console()

    def apply_all(self) -> bool:
        """Apply every ``*.diff`` in ``patch_dir``, falling back to overwriting.

        :return: ``True`` when every patch was applied or its file was
            overwritten from ``parsed_dir``; ``False`` when there are no patch
            files or at least one file could not be updated either way.
        """
        patch_files = sorted(self.patch_dir.glob("*.diff"))

        if not patch_files:
            self.console.print(f"[yellow][ WARN ] No .diff files found in {self.patch_dir}[/yellow]")
            return False

        failed_patches = []
        for patch_file in patch_files:
            if not self.apply_single(patch_file):
                failed_patches.append(patch_file)

        if not failed_patches:
            return True

        self.console.print(f"[cyan][ INFO ] {len(failed_patches)} patches failed → trying overwrite from parsed (see logs)[/cyan]\n")
        all_recovered = True
        for patch_file in failed_patches:
            # Keep going after a failure so every file gets its chance, but
            # remember that the overall result is no longer a full success.
            if not self.overwrite_with_parsed(patch_file):
                all_recovered = False

        return all_recovered

    def apply_single(self, patch_file: Path) -> bool:
        """Run ``git apply`` for one patch inside ``repo_dir``.

        :return: ``True`` if git exited with status 0, ``False`` otherwise
            (including when git could not be started).
        """
        try:
            result = subprocess.run(
                ["git", "apply", str(patch_file)],
                cwd=self.repo_dir,
                capture_output=True,
                text=True,
            )
        except Exception as e:
            self.console.print(PatchErrorMessages.PATCH_EXCEPTION.format(patch_file.name, e), style="red")
            return False

        if result.returncode == 0:
            self.console.print(f"[white][✓] Patch applied: {patch_file.name}[/white]\n")
            return True

        self.console.print(PatchFailMessages.PATCH_FAILED.format(patch_file.name), style="yellow")
        self.console.print(result.stderr, style="yellow")
        return False

    @classmethod
    def _target_stem(cls, patch_file: Path) -> str:
        """Return the patch file's stem without a leading ``patch_`` prefix.

        Only a prefix is removed: a blanket ``str.replace`` would also mangle
        names that merely contain the text (``dispatch_x`` -> ``disx``).
        """
        stem = patch_file.stem
        if stem.startswith(cls._PATCH_PREFIX):
            return stem[len(cls._PATCH_PREFIX):]
        return stem

    def _find_parsed_file(self, stem: str):
        """Return the first regular file under ``parsed_dir`` whose stem is ``stem``, else ``None``."""
        # Sorted so the choice is deterministic when several files share a stem.
        for candidate in sorted(self.parsed_dir.rglob("*.*")):
            if candidate.is_file() and candidate.stem == stem:
                return candidate
        return None

    def _locate_parsed(self, patch_file: Path):
        """Find the parsed file for ``patch_file``.

        :return: ``(parsed_file, path_relative_to_parsed_dir)`` or ``None``
            after printing a warning when no usable file exists.
        """
        stem = self._target_stem(patch_file)
        matched_file = self._find_parsed_file(stem)

        if matched_file is None:
            self.console.print(PatchWarningMessages.PARSED_FILE_NOT_FOUND.format(stem), style="yellow")
            return None

        try:
            relative_path = matched_file.relative_to(self.parsed_dir)
        except ValueError:
            self.console.print(PatchWarningMessages.RELATIVE_PATH_EXTRACTION_FAILED.format(matched_file), style="yellow")
            return None

        return matched_file, relative_path

    def parsed_diff_apply(self, patch_file: Path) -> bool:
        """Build a diff between the repo file and its parsed version and apply it.

        The diff is written to ``fallback_dir`` as ``parsed_<name>.diff``.

        :return: ``True`` if the fallback diff was applied, ``False`` otherwise.
        """
        located = self._locate_parsed(patch_file)
        if located is None:
            return False
        _, relative_path = located

        original_file = self.repo_dir / relative_path
        parsed_file = self.parsed_dir / relative_path

        if not original_file.exists():
            self.console.print(PatchWarningMessages.ORIGINAL_FILE_MISSING.format(original_file), style="yellow")
            return False

        fallback_diff = self.fallback_dir / f"parsed_{relative_path.with_suffix('.diff').name}"

        try:
            with open(fallback_diff, "w", encoding="utf-8") as f:
                diff_proc = subprocess.run(
                    ["git", "diff", "--no-index", str(original_file), str(parsed_file)],
                    text=True,
                    stdout=f,
                    stderr=subprocess.DEVNULL,
                )
            # `git diff --no-index` exits with 1 when the files differ, which
            # is the expected case here; only other non-zero codes are errors.
            if diff_proc.returncode not in (0, 1):
                raise subprocess.CalledProcessError(diff_proc.returncode, diff_proc.args)

            # NOTE(review): the diff headers carry the paths given above, not
            # repo-relative ones -- confirm `git apply` resolves them as intended.
            result = subprocess.run(
                ["git", "apply", str(fallback_diff)],
                cwd=self.repo_dir,
                capture_output=True,
                text=True,
            )
        except Exception as e:
            self.console.print(PatchErrorMessages.FALLBACK_DIFF_FAILED.format(e), style="red")
            return False

        if result.returncode == 0:
            self.console.print(f"[white][✓] Fallback diff applied: {fallback_diff.name}[/white]")
            return True

        self.console.print(PatchFailMessages.FALLBACK_APPLY_FAILED.format(fallback_diff.name), style="red")
        self.console.print(result.stderr, style="red")
        return False

    def overwrite_with_parsed(self, patch_file: Path) -> bool:
        """Copy the parsed file for ``patch_file`` over the existing repository file.

        :return: ``True`` if the file was copied; ``False`` if no parsed file
            matched, the repository file does not exist, or the copy failed.
        """
        located = self._locate_parsed(patch_file)
        if located is None:
            return False
        matched_file, relative_path = located

        repo_file = self.repo_dir / relative_path

        if not repo_file.exists():
            self.console.print(PatchWarningMessages.OVERWRITE_FILE_MISSING.format(repo_file), style="yellow")
            return False

        try:
            shutil.copyfile(matched_file, repo_file)
        except Exception as e:
            self.console.print(PatchErrorMessages.OVERWRITE_FAILED.format(e), style="red")
            return False

        self.console.print(f"[white][✓] Overwrote repo file: {repo_file}[/white]\n")
        return True
class DiffGenerator:
    """Create unified diffs between repository files and their parsed (modified) versions."""

    def __init__(self, repo_dir: Path, parsed_dir: Path, patch_dir: Path):
        """
        :param repo_dir: Root of the original sources.
        :param parsed_dir: Root of the modified sources; mirrors ``repo_dir`` paths.
        :param patch_dir: Directory the ``.diff`` files are written to.
        """
        self.repo_dir = repo_dir
        self.parsed_dir = parsed_dir
        self.patch_dir = patch_dir
        self.console = Console()

    def generate_diff(self, original: str, modified: str, rel_path: Path) -> str:
        """Return a unified diff of ``original`` vs ``modified``.

        The headers use ``a/<rel_path>`` and ``b/<rel_path>``. Trailing
        whitespace is stripped from every line of both inputs before
        comparing, so whitespace-only differences at line ends yield no diff.

        :return: The diff text, or ``""`` when the normalized inputs are equal.
        """
        # NOTE(review): because the original is normalized too, context lines
        # may not match a file that really contains trailing whitespace --
        # confirm this is acceptable for `git apply`.
        from_path = f"a/{rel_path.as_posix()}"
        to_path = f"b/{rel_path.as_posix()}"

        original_lines = [line.rstrip() + '\n' for line in original.splitlines()]
        modified_lines = [line.rstrip() + '\n' for line in modified.splitlines()]

        return ''.join(difflib.unified_diff(
            original_lines, modified_lines,
            fromfile=from_path,
            tofile=to_path,
            lineterm="\n"
        ))

    def run(self):
        """Write one ``<stem>.diff`` into ``patch_dir`` per changed parsed file.

        Parsed files without a counterpart in ``repo_dir`` and files without
        changes are reported and skipped.

        :raises DiffGenerationError: If reading, diffing or writing a file fails.
        """
        self.patch_dir.mkdir(parents=True, exist_ok=True)
        # Snapshot (and order) the listing before anything is written.
        parsed_files = sorted(self.parsed_dir.rglob("*.*"))
        # diff file name -> relative source path that produced it in this run
        written = {}

        for parsed_file in parsed_files:
            if parsed_file.is_dir():
                continue

            try:
                rel_path = parsed_file.relative_to(self.parsed_dir)
                original_file = self.repo_dir / rel_path

                if not original_file.exists():
                    self.console.print(DiffWarningMessages.ORIGINAL_FILE_NOT_FOUND.format(original_file), style="yellow")
                    continue

                original_code = original_file.read_text(encoding="utf-8")
                modified_code = parsed_file.read_text(encoding="utf-8")
                diff_text = self.generate_diff(original_code, modified_code, rel_path)

                if diff_text.strip():
                    diff_name = rel_path.with_suffix('.diff').name
                    previous = written.get(diff_name)
                    if previous is not None:
                        # Diff names are derived from the stem only, so files
                        # sharing a stem collide; make the loss visible.
                        self.console.print(
                            f"[ WARN ] {diff_name}: diff for {previous.as_posix()} "
                            f"is overwritten by {rel_path.as_posix()}",
                            style="yellow",
                        )
                    written[diff_name] = rel_path
                    diff_path = self.patch_dir / diff_name
                    with open(diff_path, "w", encoding="utf-8", newline="\n") as f:
                        f.write(diff_text)
                else:
                    self.console.print(DiffWarningMessages.NO_CHANGES_DETECTED.format(parsed_file.name), style="yellow")

            except Exception as e:
                raise DiffGenerationError(parsed_file.name, str(e)) from e

        self.console.print("[ SUCCESS ] Diff files generated\n", style="bold green")
class SemgrepHandler:
    """Run Semgrep on a cloned repository and store its merged findings."""

    # Rule set used when SEMGREP_RULE is not configured.
    _DEFAULT_RULE = "p/javascript"
    # Upper bound for the cosmetic progress animation shown after the scan.
    _MAX_ANIMATION_SECONDS = 1.0

    def __init__(self, repo_path: Path, save_dir: Path):
        """
        :param repo_path: Path of the repository to scan.
        :param save_dir: Directory under which ``sast/`` results are written.
        """
        self.repo_path = repo_path
        self.save_dir = save_dir

    def run(self):
        """Execute Semgrep and post-process its JSON output.

        The rule set comes from the ``SEMGREP_RULE`` environment variable and
        falls back to ``p/javascript``.

        :return: Path to ``merged_snippets.json``, or ``None`` when nothing was found.
        :raises RuntimeError: If the Semgrep process exits with a non-zero code.
        """
        description = "Running Semgrep...".ljust(28)
        rule = os.getenv("SEMGREP_RULE") or self._DEFAULT_RULE
        with create_progress() as progress:
            task = progress.add_task(description, total=100)

            start = time.time()
            runner = SemgrepRunner(repo_path=str(self.repo_path), rule=rule)
            result = runner.run_semgrep()
            elapsed = time.time() - start

            # The bar is filled only after the scan has finished, so the
            # animation is purely cosmetic: cap it instead of sleeping for the
            # whole scan duration a second time.
            duration = min(max(elapsed, 0.1), self._MAX_ANIMATION_SECONDS)
            step = duration / 100
            for _ in range(100):
                progress.update(task, advance=1)
                time.sleep(step)
            progress.update(task, completed=100)

        if result.returncode != 0:
            raise RuntimeError("Semgrep execution failed")

        return self._post_process(json.loads(result.stdout))

    def _post_process(self, data):
        """Save the raw result, build per-file merged snippets and write them to disk.

        Writes ``sast/before.json`` and ``sast/merged_snippets.json`` below
        ``save_dir``.

        :param data: Decoded Semgrep JSON output.
        :return: Path to ``merged_snippets.json``, or ``None`` if the merged list is empty.
        """
        sast_dir = self.save_dir / "sast"
        sast_dir.mkdir(parents=True, exist_ok=True)
        before_path = sast_dir / "before.json"
        SemgrepPreprocessor.save_json_file(data, before_path)
        snippets = SemgrepPreprocessor.preprocess(str(before_path), str(self.repo_path))
        merged = merge_snippets_by_file(snippets)
        merged_path = sast_dir / "merged_snippets.json"
        with open(merged_path, "w", encoding="utf-8") as f:
            json.dump([s.model_dump() for s in merged], f, indent=2, ensure_ascii=False)

        if not merged:
            console.print("\n[ INFO ] No vulnerabilities found.\n", style="yellow")
            console.print(
                "AutoFiC automation has been halted.--llm, --patch, and --pr stages will not be executed.\n",
                style="yellow")
            return None

        return merged_path
time.time() + runner = SnykCodeRunner(repo_path=str(self.repo_path)) + result = runner.run_snykcode() + end = time.time() + + duration = max(end - start, 0.1) + step = duration / 100 + for _ in range(100): + progress.update(task, advance=1) + time.sleep(step) + progress.update(task, completed=100) + + return self._post_process(json.loads(result.stdout)) + + def _post_process(self, data): + sast_dir = self.save_dir / "sast" + sast_dir.mkdir(parents=True, exist_ok=True) + before_path = sast_dir / "before.json" + SnykCodePreprocessor.save_json_file(data, before_path) + snippets = SnykCodePreprocessor.preprocess(str(before_path), str(self.repo_path)) + merged = merge_snippets_by_file(snippets) + merged_path = sast_dir / "merged_snippets.json" + with open(merged_path, "w", encoding="utf-8") as f: + json.dump([s.model_dump() for s in merged], f, indent=2, ensure_ascii=False) + + if not merged: + console.print("\n[ INFO ] No vulnerabilities found.\n", style="yellow") + console.print( + "AutoFiC automation has been halted.--llm, --patch, and --pr stages will not be executed.\n", + style="yellow") + return None + + return merged_path + + +class SASTAnalyzer: + def __init__(self, repo_path: Path, save_dir: Path, tool: str): + self.repo_path = repo_path + self.save_dir = save_dir + self.tool = tool + self.result_path = None + self.handler = self._get_handler() + + def _get_handler(self): + if self.tool == "semgrep": + return SemgrepHandler(self.repo_path, self.save_dir) + elif self.tool == "codeql": + return CodeQLHandler(self.repo_path, self.save_dir) + elif self.tool == "snykcode": + return SnykCodeHandler(self.repo_path, self.save_dir) + else: + raise ValueError(f"[ ERROR ] Unsupported SAST tool: {self.tool}") + + def run(self): + print_divider("SAST Analysis Stage") + + try: + merged_path = self.handler.run() + return merged_path + except Exception as e: + console.print(f"[ ERROR ] SAST tool [{self.tool}] failed: {e}", style="red") + raise + + def save_snippets(self, 
class LLMProcessor:
    """Generate LLM responses for the merged SAST snippets and parse them into code."""

    def __init__(self, sast_result_path: Path, repo_path: Path, save_dir: Path, tool: str):
        """
        :param sast_result_path: Path to the merged SAST result file.
        :param repo_path: Path of the cloned repository.
        :param save_dir: Base output directory; ``llm/``, ``parsed/`` and
            ``patch/`` paths are derived from it.
        :param tool: Name of the SAST tool that produced the results.
        """
        self.sast_result_path = sast_result_path
        self.repo_path = repo_path
        self.save_dir = save_dir
        self.tool = tool
        self.llm_output_dir = save_dir / "llm"
        self.parsed_dir = save_dir / "parsed"
        self.patch_dir = save_dir / "patch"

    def run(self):
        """Build one prompt per file snippet, query the LLM and save each response.

        Reads ``<save_dir>/sast/merged_snippets.json`` and writes the responses
        into ``<save_dir>/llm``.

        :return: ``(prompts, file_snippets)``; ``([], [])`` when no prompt was generated.
        """
        print_divider("LLM Response Generation Stage")

        prompt_generator = PromptGenerator()
        merged_path = self.save_dir / "sast" / "merged_snippets.json"

        with open(merged_path, "r", encoding="utf-8") as f:
            merged_data = json.load(f)
        file_snippets = [BaseSnippet(**item) for item in merged_data]
        prompts = prompt_generator.generate_prompts(file_snippets)

        if not prompts:
            console.print("[INFO] No valid prompts generated. Skipping LLM stage.\n", style="cyan")
            return [], []

        llm = LLMRunner()
        self.llm_output_dir.mkdir(parents=True, exist_ok=True)

        description = "Generating LLM responses... \n".ljust(28)
        with create_progress() as progress:
            task = progress.add_task(description, total=len(prompts))

            for p in prompts:
                response = llm.run(p.prompt)

                save_md_response(response, p, output_dir=self.llm_output_dir)

                progress.update(task, advance=1)
                time.sleep(0.01)
            # The task total is len(prompts); completing at a fixed 100 left
            # the bar short of (or beyond) the end.
            progress.update(task, completed=len(prompts))

        console.print(f"[ SUCCESS ] LLM responses saved → {self.llm_output_dir}", style="bold green")
        return prompts, file_snippets

    def retry(self):
        """Regenerate LLM responses from the already parsed files.

        Prompts come from ``RetryPromptGenerator`` over ``parsed_dir``;
        responses are written into ``<save_dir>/retry_llm``. A prompt whose LLM
        call raises ``LLMExecutionError`` is reported and skipped.

        :return: ``(retry_prompts, retry_output_dir)``.
        """
        print_divider("LLM Retry Stage")

        retry_prompt_generator = RetryPromptGenerator(parsed_dir=self.parsed_dir)
        retry_prompts = retry_prompt_generator.generate_prompts()

        console.print("[ RETRY ] Regenerating GPT responses for modified files...\n")

        llm = LLMRunner()
        retry_output_dir = self.save_dir / "retry_llm"
        retry_output_dir.mkdir(parents=True, exist_ok=True)

        console.print("\nStarting GPT retry response generation\n")
        with create_progress() as progress:
            task = progress.add_task("[magenta]Retrying LLM responses...", total=len(retry_prompts))
            for prompt in retry_prompts:
                try:
                    response = llm.run(prompt.prompt)
                    save_md_response(response, prompt, output_dir=retry_output_dir)
                except LLMExecutionError as e:
                    console.print(str(e), style="red")
                finally:
                    # Advance even for failed prompts so the bar still finishes.
                    progress.update(task, advance=1)
                    time.sleep(0.01)
            progress.update(task, completed=len(retry_prompts))

        console.print(f"\n[ SUCCESS ] Retry LLM responses saved → {retry_output_dir}\n", style="bold green")

        return retry_prompts, retry_output_dir

    def extract_and_save_parsed_code(self):
        """Extract code from the saved responses in ``llm_output_dir`` into ``parsed_dir``.

        A ``ResponseParseError`` is printed and treated like "nothing parsed";
        in both cases a warning is shown.
        """
        parser = ResponseParser(md_dir=self.llm_output_dir, diff_dir=self.parsed_dir)

        try:
            success = parser.extract_and_save_all()
        except ResponseParseError as e:
            console.print(str(e), style="red")
            success = False

        if not success:
            console.print("\n[ WARN ] No parsable content found in LLM responses.\n", style="yellow")
class AutoFiCPipeline:
    """Run the AutoFiC stages in order: clone, SAST, LLM, patch.

    Each stage after cloning is enabled by its own flag. The process exits via
    ``sys.exit`` when a stage determines that there is nothing left to do.
    """

    def __init__(self, repo_url: str, save_dir: Path, sast: bool, sast_tool: str, llm: bool, llm_retry: bool, patch: bool, pr: bool):
        """
        :param repo_url: URL of the repository to process.
        :param save_dir: Output directory; expanded and resolved to an absolute path.
        :param sast: Run the SAST stage.
        :param sast_tool: Name of the SAST tool handed to ``SASTAnalyzer``.
        :param llm: Run the LLM stage.
        :param llm_retry: Use the ``retry_parsed``/``retry_patch`` directories when patching.
        :param patch: Run the patch stage.
        :param pr: Stored on the instance; not read by ``run``.
        """
        self.repo_url = repo_url
        self.save_dir = save_dir.expanduser().resolve()
        self.sast = sast
        self.llm = llm
        self.sast_tool = sast_tool
        self.llm_retry = llm_retry
        self.patch = patch
        self.pr = pr

        self.repo_manager = RepositoryManager(self.repo_url, self.save_dir)
        self.sast_analyzer = None
        self.llm_processor = None

    def run(self):
        """Clone the repository, then run every enabled stage.

        :raises RuntimeError: If the LLM stage is enabled without a SAST result.
        """
        self.repo_manager.clone()

        sast_result_path = self._run_sast() if self.sast else None

        if self.llm:
            self._run_llm(sast_result_path)

        if self.patch:
            self._run_patch()

    def _run_sast(self):
        """Run the SAST analyzer and save its snippets; exit(0) when nothing was found."""
        self.sast_analyzer = SASTAnalyzer(
            self.repo_manager.clone_path,
            self.save_dir,
            tool=self.sast_tool,
        )
        sast_result_path = self.sast_analyzer.run()

        if not sast_result_path:
            sys.exit(0)

        self.sast_analyzer.save_snippets(sast_result_path)
        return sast_result_path

    def _run_llm(self, sast_result_path):
        """Generate and parse LLM responses, then print the summary.

        Exits with status 0 when there is nothing to send to the LLM and with
        status 1 when the LLM call fails.
        """
        if not sast_result_path:
            raise RuntimeError("SAST results are required before running LLM.")

        merged_path = self.save_dir / "sast" / "merged_snippets.json"
        if not merged_path.exists():
            console.print("[ INFO ] No merged_snippets.json file found. Skipping LLM stage.\n", style="cyan")
            sys.exit(0)

        # The file itself is read by LLMProcessor.run(); loading it here as
        # well was unused work.
        self.llm_processor = LLMProcessor(sast_result_path, self.repo_manager.clone_path, self.save_dir,
                                          self.sast_tool)

        try:
            prompts, file_snippets = self.llm_processor.run()
        except LLMExecutionError as e:
            console.print(str(e), style="red")
            sys.exit(1)

        if not prompts:
            console.print("[ INFO ] No valid prompts returned from LLM processor. Exiting pipeline early.\n",
                          style="cyan")
            sys.exit(0)

        self.llm_processor.extract_and_save_parsed_code()

        prompt_generator = PromptGenerator()
        unique_file_paths = prompt_generator.get_unique_file_paths(file_snippets)

        llm_output_dir = self.llm_processor.llm_output_dir
        response_files = sorted([f.name for f in llm_output_dir.glob("response_*.md")])

        print_summary(
            repo_url=self.repo_url,
            detected_issues_count=len(unique_file_paths),
            output_dir=str(llm_output_dir),
            response_files=response_files
        )

    def _run_patch(self):
        """Generate diffs and apply them, using the retry directories when ``llm_retry`` is set."""
        parsed_dir = self.save_dir / ("retry_parsed" if self.llm_retry else "parsed")
        patch_dir = self.save_dir / ("retry_patch" if self.llm_retry else "patch")

        patch_manager = PatchManager(parsed_dir, patch_dir, self.repo_manager.clone_path)
        patch_manager.run()
# Handles creation and git operations for GitHub Actions workflow YAML files
class AboutYml:
    """
    Class for managing GitHub Actions workflow YAML files.
    Provides methods to create workflow files and push them to a repository.
    """
    def __init__(self, start_dir="."):
        """
        Initialize with the starting directory (default: current directory).
        :param start_dir: Base directory for workflow file operations.
        """
        self.start_dir = start_dir

    def create_pr_yml(self):
        """
        Create the 'pr_notify.yml' GitHub Actions workflow file under
        '<start_dir>/.github/workflows', creating the directory if needed.
        The workflow sends notifications to Discord and Slack when a pull
        request is opened, reopened, or closed.
        """
        workflow_dir = os.path.join(self.start_dir, ".github", "workflows")
        os.makedirs(workflow_dir, exist_ok=True)

        pr_notify_yml_path = os.path.join(workflow_dir, "pr_notify.yml")
        # The literal is not a raw string, so each backslash-newline below is
        # consumed by Python and every curl command is written as one line.
        pr_notify_yml_content = """name: PR Notifier

on:
  pull_request:
    types: [opened, reopened, closed]

jobs:
  notify:
    runs-on: ubuntu-latest
    steps:
      - name: Notify Discord
        env:
          DISCORD_WEBHOOK_URL: ${{ secrets.DISCORD_WEBHOOK_URL }}
        run: |
          curl -H "Content-Type: application/json" \
          -d '{"content": "🔔 Pull Request [${{ github.event.pull_request.title }}](${{ github.event.pull_request.html_url }}) by ${{ github.event.pull_request.user.login }} - ${{ github.event.action }}"}' \
          $DISCORD_WEBHOOK_URL
      - name: Notify Slack
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
        run: |
          curl -H "Content-Type: application/json" \
          -d '{"text": ":bell: Pull Request <${{ github.event.pull_request.html_url }}|${{ github.event.pull_request.title }}> by ${{ github.event.pull_request.user.login }} - ${{ github.event.action }}"}' \
          $SLACK_WEBHOOK_URL
"""
        with open(pr_notify_yml_path, "w", encoding="utf-8") as f:
            f.write(pr_notify_yml_content)

    def push_pr_yml(self, user_name, repo_name, token, branch_name):
        """
        Adds, commits, and pushes the created workflow YAML file to the specified git branch.
        The remote URL is set to use the provided GitHub token for authentication.
        All git commands run inside 'start_dir', the directory the workflow file was written to.

        :param user_name: GitHub username (repository owner)
        :param repo_name: Name of the repository
        :param token: GitHub access token (for authentication)
        :param branch_name: Name of the branch to push to
        :raises subprocess.CalledProcessError: If any git command fails.
        """
        # NOTE(review): `git remote set-url` stores this URL, token included,
        # in the repository's git configuration.
        repo_url = f'https://x-access-token:{token}@github.com/{user_name}/{repo_name}.git'
        # Run git where create_pr_yml() wrote the file instead of relying on
        # the process working directory.
        subprocess.run(['git', 'remote', 'set-url', 'origin', repo_url], check=True, cwd=self.start_dir)
        console.print("[ INFO ] Created GitHub Actions workflow file: pr_notify.yml\n", style="white")
        subprocess.run(['git', 'add', '.github/workflows/pr_notify.yml'], check=True, cwd=self.start_dir)
        subprocess.run(['git', 'commit', '-m', "[ AutoFiC ] Create package.json and CI workflow"], check=True, cwd=self.start_dir)
        subprocess.run(['git', 'push', 'origin', branch_name], check=True, cwd=self.start_dir)
class EnvEncrypy:
    """Register webhook URLs as encrypted GitHub Actions secrets of a repository."""

    # Seconds to wait for each GitHub API call.
    _TIMEOUT = 30
    # Status codes the secrets endpoint uses for "created" and "updated".
    _SUCCESS_STATUSES = (201, 204)

    def __init__(self, user_name, repo_name, token):
        """
        :param user_name: Owner of the repository.
        :param repo_name: Name of the repository.
        :param token: GitHub token sent in the ``Authorization`` header.
        """
        self.user_name = user_name
        self.repo_name = repo_name
        self.token = token

    def webhook_secret_notifier(self, secret_name: str, webhook_url: str):
        """Encrypt ``webhook_url`` with the repository public key and store it as a secret.

        :param secret_name: Name of the Actions secret to create or update.
        :param webhook_url: Plain-text value to store.
        :return: ``True`` if GitHub accepted the secret, ``False`` otherwise.
        """
        url = f'https://api.github.com/repos/{self.user_name}/{self.repo_name}/actions/secrets/public-key'
        headers = {'Authorization': f'token {self.token}'}
        resp = requests.get(url, headers=headers, timeout=self._TIMEOUT)
        try:
            pubkey_info = resp.json()
        except ValueError:
            # Non-JSON body (e.g. an HTML error page): report it like any
            # other unusable key response instead of crashing.
            pubkey_info = resp.text

        if not isinstance(pubkey_info, dict) or 'key_id' not in pubkey_info or 'key' not in pubkey_info:
            console.print(f"[ ERROR ] Invalid public key info: {pubkey_info}", style="red")
            return False

        key_id = pubkey_info['key_id']
        encrypted_value = self.encrypt(pubkey_info['key'], webhook_url)

        url2 = f'https://api.github.com/repos/{self.user_name}/{self.repo_name}/actions/secrets/{secret_name}'
        payload = {
            "encrypted_value": encrypted_value,
            "key_id": key_id
        }
        resp2 = requests.put(
            url2,
            headers={**headers, 'Content-Type': 'application/json'},
            json=payload,
            timeout=self._TIMEOUT,
        )
        if resp2.status_code not in self._SUCCESS_STATUSES:
            # Previously a rejected request was still logged as "registered".
            console.print(f"[ ERROR ] Secret registration failed: {secret_name}, {resp2.status_code}, {resp2.text}", style="red")
            return False

        console.print(f"[ INFO ] Secret registered: {secret_name}, {resp2.status_code}, {resp2.text}", style="white")
        return True

    def encrypt(self, public_key: str, secret_value: str) -> str:
        """Seal ``secret_value`` for the Base64-encoded ``public_key``.

        :return: The sealed-box ciphertext, Base64-encoded as text.
        """
        key = public.PublicKey(public_key, encoding.Base64Encoder())
        sealed_box = public.SealedBox(key)
        encrypted = sealed_box.encrypt(secret_value.encode("utf-8"))
        return base64.b64encode(encrypted).decode("utf-8")
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================= + +"""Contains their functional aliases. +""" +import os +import re +import json +import time +import datetime +import requests +import subprocess +from pathlib import Path +from typing import List +from rich.console import Console +from collections import defaultdict + +from autofic_core.sast.snippet import BaseSnippet +from autofic_core.sast.semgrep.preprocessor import SemgrepPreprocessor +from autofic_core.sast.codeql.preprocessor import CodeQLPreprocessor +from autofic_core.sast.snykcode.preprocessor import SnykCodePreprocessor + +console = Console() + +class PRProcedure: + """ + Handles all modules required for the pull request workflow. + + Responsibilities include: + - Branch management + - File changes and commit operations + - Pull request generation to both fork and upstream repositories + - CI status monitoring and validation + - Generating PR markdown summaries from vulnerability reports + """ + + def __init__(self, base_branch: str, repo_name: str, + upstream_owner: str, save_dir: str, repo_url: str, token: str, user_name: str, json_path: str, tool: str): + """ + Initialize PRProcedure with repository and user configuration. 
+ + :param base_branch: The default base branch for PRs (e.g., 'WHS_VULN_DETEC_1', 'WHS_VULN_DETEC_2') + :param repo_name: The name of the repository + :param upstream_owner: The original (upstream) repository owner + :param save_dir: Local directory for repository operations + :param repo_url: Repository URL + :param token: GitHub authentication token + :param user_name: GitHub username (forked owner) + """ + self.branch_name = f'WHS_VULN_DETEC_{1}' + self.base_branch = base_branch + self.repo_name = repo_name + self.upstream_owner = upstream_owner + self.save_dir = save_dir + self.repo_url = repo_url + self.token = token + self.user_name = user_name + self.json_path = json_path + self.tool = tool + + def post_init(self): + """ + Post-initialization: Extracts the repo owner and name from the URL if needed. + Raises RuntimeError for invalid configuration (if user_name not exist in .env). + """ + if not self.user_name: + raise RuntimeError + if self.repo_url.startswith("https://github.com/"): + parts = self.repo_url[len("https://github.com/"):].split('/') + if len(parts) >= 2: + # Extract original repo owner and name + self.upstream_owner, self.repo_name = parts[:2] + else: + raise RuntimeError("Invalid repo URL") + else: + raise RuntimeError("Not a github.com URL") + + def mv_workdir(self, save_dir: str = None): + """ + Move the working directory to the repository clone directory. + """ + os.chdir(save_dir or self.save_dir) + + def check_branch_exists(self): + """ + Checks for existing branches with 'WHS_VULN_DETEC_N' pattern, used by regular expression. + Determines next available number, creates and checks out new branch. 
+ """ + branches = subprocess.check_output(['git', 'branch', '-r'], encoding='utf-8') + prefix = "origin/WHS_VULN_DETEC_" + nums = [ + int(m.group(1)) + for m in re.finditer(rf"{re.escape(prefix)}(\d+)", branches) + ] + if nums: + next_num = max(nums) + 1 + else: + next_num = 1 + self.branch_name = f'WHS_VULN_DETEC_{next_num}' + subprocess.run(['git', 'checkout', '-b', self.branch_name], check=True) + + def change_files(self): + """ + Stages all modified files and commits with a summary message based on vulnerability scan results. + Pushes the branch to the forked repository. + """ + with open('../sast/merged_snippets.json', 'r', encoding='utf-8') as f: + data = json.load(f) + self.vulnerabilities = len(data) + + # Stage all modified/created files except ignored ones + subprocess.run(['git', 'add', '--all'], check=True) + + # Remove common directories from staging + ignore_paths = [ + '.codeql-db', '.codeql-results', 'node_modules', '.github', + '.snyk', 'snyk_result.sarif.json', '.eslintcache', 'eslint_tmp_env', '.DS_Store' + ] + for path in ignore_paths: + if os.path.exists(path): + subprocess.run(['git', 'reset', '-q', path], check=False) + + commit_message = f"[ AutoFiC ] {self.vulnerabilities} malicious code detected!!" + subprocess.run(['git', 'commit', '-m', commit_message], check=True) + + try: + subprocess.run(['git', 'push', 'origin', self.branch_name], check=True) + return True + except subprocess.CalledProcessError: + return False + + def current_main_branch(self): + """ + Determines the main branch name ('main', 'master', or other). + Basic branch is almost both main and master. + But if both branche not exist, specify first branch. 
+ """ + branches = subprocess.check_output(['git', 'branch', '-r'], encoding='utf-8') + if f'origin/main' in branches: + self.base_branch = 'main' + elif f'origin/master' in branches: + self.base_branch = 'master' + else: + self.base_branch = branches[0].split('/')[-1] + + def generate_pr(self) -> str: + """ + Creates a pull request on the fork repository. + Uses vulnerability scan results (by semgrep) to generate a detailed PR body. + If llm_generator implemented, then pr_body will add llm_result. + """ + console.print(f"[ INFO ] Creating PR on {self.user_name}/{self.repo_name}. Base branch: {self.base_branch}\n", style="white") + pr_url = f"https://api.github.com/repos/{self.user_name}/{self.repo_name}/pulls" + if self.tool == "semgrep": + snippets = SemgrepPreprocessor.preprocess(self.json_path) + elif self.tool == "codeql": + snippets = CodeQLPreprocessor.preprocess(self.json_path) + elif self.tool == "snykcode": + snippets = SnykCodePreprocessor.preprocess(self.json_path) + else: + raise ValueError(f"Unknown tool: {self.tool}") + pr_body = self.generate_markdown(snippets) + data_post = { + "title": f"[ AutoFiC ] Security Patch {datetime.datetime.now().strftime('%Y-%m-%d')}", + "head": f"{self.user_name}:{self.branch_name}", + "base": self.base_branch, + "body": pr_body + } + headers = { + "Authorization": f"token {self.token}", + "Accept": "application/vnd.github+json" + } + pr_resp = requests.post(pr_url, json=data_post, headers=headers) + if pr_resp.status_code in (201, 202): + pr_json = pr_resp.json() + time.sleep(0.05) + else: + return False + + def create_pr(self): + """ + After PR is opened on fork, waits for CI to pass and then automatically creates a PR to the upstream repository. + """ + + # Step 1. 
Find latest open PR on fork + prs_url = f"https://api.github.com/repos/{self.user_name}/{self.repo_name}/pulls" + headers = { + "Authorization": f"token {self.token}", + "Accept": "application/vnd.github+json" + } + prs_resp = requests.get(prs_url, headers=headers, params={"state": "open", "per_page": 1, "sort": "created", "direction": "desc"}) + prs = prs_resp.json() + if not prs: + return + recent_pr = prs[0] + pr_number = recent_pr["number"] + self.pr_branch = recent_pr["head"]["ref"] + + # Step 2. Find Actions run_id for that PR + runs_url = f"https://api.github.com/repos/{self.user_name}/{self.repo_name}/actions/runs" + run_id = None + for _ in range(60): # Wait up to 5 minutes + runs_resp = requests.get(runs_url, headers=headers, params={"event": "pull_request", "per_page": 20}) + runs = runs_resp.json().get("workflow_runs", []) + for run in runs: + pr_list = run.get("pull_requests", []) + if any(pr.get("number") == pr_number for pr in pr_list): + run_id = run["id"] + break + if run_id: + break + time.sleep(5) + else: + return + + # Step 3. Wait until the workflow run completes successfully + run_url = f"https://api.github.com/repos/{self.user_name}/{self.repo_name}/actions/runs/{run_id}" + for _ in range(120): # Wait up to 10 minutes + run_resp = requests.get(run_url, headers=headers) + run_info = run_resp.json() + run_status = run_info.get("status") + conclusion = run_info.get("conclusion") # This code block will judge whether pr to upstream repo + if run_status == "completed": + if conclusion == "success": + break + else: + return + time.sleep(5) + else: + return + + workflow_file = Path(".github/workflows/pr_notify.yml") + if workflow_file.exists(): + subprocess.run(['git', 'rm', str(workflow_file)], check=True) + subprocess.run(['git', 'commit', '-m', "chore: remove CI workflow before upstream PR"], check=True) + subprocess.run(['git', 'push', 'origin', self.pr_branch], check=True) + + # Step 4. 
If all checks pass('success'), create PR to upstream/original repository + pr_url = f"https://api.github.com/repos/{self.upstream_owner}/{self.repo_name}/pulls" + if self.tool == "semgrep": + snippets = SemgrepPreprocessor.preprocess(self.json_path) + elif self.tool == "codeql": + snippets = CodeQLPreprocessor.preprocess(self.json_path) + elif self.tool == "snykcode": + snippets = SnykCodePreprocessor.preprocess(self.json_path) + pr_body = self.generate_markdown(snippets) + data_post = { + "title": f"[ AutoFiC ] Security Patch {datetime.datetime.now().strftime('%Y-%m-%d')}", + "head": f"{self.user_name}:{self.pr_branch}", + "base": self.base_branch, + "body": pr_body + } + pr_resp = requests.post(pr_url, json=data_post, headers=headers) + if pr_resp.status_code in (201, 202): + pr_json = pr_resp.json() + return pr_json.get("number") + else: + return + + def generate_markdown(self, snippets: List[BaseSnippet]) -> str: + def get_severity_emoji(level: str) -> str: + level = level.upper() + return { + "ERROR": "🛑 ERROR", + "WARNING": "⚠️ WARNING", + "NOTE": "💡 NOTE" + }.get(level, level) + + def generate_markdown_from_llm(llm_path: str) -> str: + """ + Parses an LLM-generated markdown response and formats it into a GitHub PR body. + + Expected sections in the markdown file: + 1. Vulnerability Description + 2. Potential Risks + 3. Recommended Fix + 4. Final Patched Code + 5. References + """ + try: + with open(llm_path, encoding='utf-8') as f: + content = f.read() + except FileNotFoundError: + return { + "Vulnerability": "", + "Risks": "", + "Recommended Fix": "", + "References": "" + } + + sections = { + "Vulnerability": "", + "Risks": "", + "Recommended Fix": "", + "References": "" + } + + pattern = re.compile( + r"1\. Vulnerability Description\s*[::]?\s*(.*?)\s*" + r"2\. Potential Risk\s*[::]?\s*(.*?)\s*" + r"3\. Recommended Fix\s*[::]?\s*(.*?)\s*" + r"(?:4\. Final Modified Code.*?\s*)?" + r"5\. 
Additional Notes\s*[::]?\s*(.*)", + re.DOTALL + ) + + match = pattern.search(content) + if match: + sections["Vulnerability"] = match.group(1).strip() + sections["Risks"] = match.group(2).strip() + sections["Recommended Fix"] = match.group(3).strip() + sections["References"] = match.group(4).strip() + + return sections + + grouped_by_file = defaultdict(list) + for item in snippets: + filename = os.path.relpath(item.path, self.save_dir).replace("\\", "/") + grouped_by_file[filename].append(item) + + md = [ + "## 🔧 About This Pull Request", + "This patch was automatically created by **[ AutoFiC ](https://autofic.github.io)**,\nan open-source framework that combines **static analysis tools** with **AI-driven remediation**.", + "\nUsing **Semgrep**, **CodeQL**, and **Snyk Code**, AutoFiC detected potential **security flaws** and applied **verified fixes**.", + "Each patch includes **contextual explanations** powered by a **large language model** to support **review and decision-making**.", + "", + "## 🔐 Summary of Security Fixes", + ] + + if not grouped_by_file: + md.append("No vulnerabilities detected. No changes made.\n") + return "\n".join(md) + + md.append("### Overview\n") + md.append(f"> Detected by: **{self.tool.upper()}**\n") + md.append("| File | Total Issues |") + md.append("|------|---------------|") + for filename, items in grouped_by_file.items(): + md.append(f"| `{filename}` | **{len(items)}** |") + + file_idx = 1 + for filename, items in grouped_by_file.items(): + md.append(f"### {file_idx}. 
`{filename}`") + md.append("#### 🧩 SAST Analysis Summary") + has_cwe = any(item.cwe for items in grouped_by_file.values() for item in items) + has_ref = any(item.references for items in grouped_by_file.values() for item in items) + + header = ["Line", "Type", "Level"] + if has_cwe: + header.append("CWE") + if has_ref: + header.append("Ref") + md.append("| " + " | ".join(header) + " |") + md.append("|" + "|".join(["-" * len(col) for col in header]) + "|") + + for item in items: + line_info = f"{item.start_line}" if item.start_line == item.end_line else f"{item.start_line}~{item.end_line}" + vuln = item.vulnerability_class[0] if item.vulnerability_class else "N/A" + severity = item.severity.upper() if item.severity else "N/A" + + row = [line_info, vuln, get_severity_emoji(severity)] + + if has_cwe: + cwe = item.cwe[0].split(":")[0] if item.cwe else "N/A" + row.append(cwe) + if has_ref: + ref = item.references[0] if item.references else "" + ref_link = f"[🔗]({ref})" if ref else "" + row.append(ref_link) + + md.append("| " + " | ".join(row) + " |") + + llm_dir = os.path.abspath(os.path.join(self.save_dir, '..', 'llm')) + for eachname in os.listdir(llm_dir): + if not eachname.endswith('.md'): + continue + base_mdname = eachname[:-3] + if base_mdname.startswith("response_"): + base_mdname = base_mdname[len("response_"):] + llm_target_path = base_mdname.replace("_", "/") + if item.path == llm_target_path: + llm_path = os.path.join(llm_dir, eachname) + llm_summary = generate_markdown_from_llm(llm_path) + if llm_summary: + md.append("#### 📝 LLM Analysis\n") + if llm_summary["Vulnerability"]: + md.append("#### 🔸 Vulnerability Description") + md.append(llm_summary["Vulnerability"].strip()) + if llm_summary["Recommended Fix"]: + md.append("#### 🔸 Recommended Fix") + md.append(llm_summary["Recommended Fix"].strip()) + if llm_summary["References"]: + md.append("#### 🔸 Additional Notes") + md.append(llm_summary["References"].strip()) + + file_idx += 1 + + md.append("\n## 🛠 Fix 
Summary\n") + md.append( + "All identified **vulnerabilities** have been **remediated** following **security best practices** " + "such as **parameterized queries** and **proper input validation**. " + "Please refer to the **diff tab** for detailed **code changes**.\n" + ) + md.append( + "If you have **questions** or **feedback** regarding this **automated patch**, feel free to reach out via **[AutoFiC GitHub](https://github.com/autofic)**.\n" + ) + return "\n".join(md) + + def contains_all(self, text, *keywords): + """ Check if all keywords are present in the text.""" + return all(k in text for k in keywords) \ No newline at end of file diff --git a/src/autofic_core/sample_module.py b/src/autofic_core/sample_module.py deleted file mode 100644 index 8ec43c2..0000000 --- a/src/autofic_core/sample_module.py +++ /dev/null @@ -1,2 +0,0 @@ -def sample_function(): - return "Hello World" diff --git a/src/autofic_core/sast/__init__.py b/src/autofic_core/sast/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/autofic_core/sast/codeql/preprocessor.py b/src/autofic_core/sast/codeql/preprocessor.py new file mode 100644 index 0000000..5d27bc3 --- /dev/null +++ b/src/autofic_core/sast/codeql/preprocessor.py @@ -0,0 +1,147 @@ +# ============================================================================= +# Copyright 2025 Autofic Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# =============================================================================

"""
CodeQLPreprocessor: Extracts vulnerability snippets from CodeQL SARIF results.

- Parses SARIF JSON results
- Matches vulnerabilities to code regions
- Generates BaseSnippet objects for downstream processing
"""

import json
import os
from typing import List
from autofic_core.sast.snippet import BaseSnippet


class CodeQLPreprocessor:
    """
    Processes SARIF output from CodeQL and extracts vulnerability information
    into a uniform BaseSnippet format.
    """

    @staticmethod
    def read_json_file(path: str) -> dict:
        """Reads JSON content from the given file path."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def save_json_file(data: dict, path: str) -> None:
        """
        Saves the given dictionary as JSON to the specified path.

        Missing parent directories are created. A bare file name (no
        directory part) is written to the current working directory.
        """
        parent = os.path.dirname(path)
        # os.makedirs("") raises, so only create a parent when there is one.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=4, ensure_ascii=False)

    @staticmethod
    def preprocess(input_json_path: str, base_dir: str = ".") -> List[BaseSnippet]:
        """
        Parses CodeQL SARIF results and extracts code snippets for each finding.

        Findings whose file exists but cannot be read, whose start line is
        outside the file, or whose region is blank are skipped. Findings whose
        file does not exist are kept with empty ``input`` and ``snippet``.

        Args:
            input_json_path (str): Path to the SARIF result file.
            base_dir (str): Base path to resolve relative file URIs.

        Returns:
            List[BaseSnippet]: Extracted and structured vulnerability snippets.
        """
        results = CodeQLPreprocessor.read_json_file(input_json_path)

        # Build rule metadata lookup from SARIF tool section
        rule_metadata = {}
        for run in results.get("runs", []):
            for rule in run.get("tool", {}).get("driver", {}).get("rules", []):
                rule_id = rule.get("id")
                if not rule_id:
                    continue
                properties = rule.get("properties", {})
                rule_metadata[rule_id] = {
                    "cwe": [
                        tag.split("/")[-1].replace("cwe-", "CWE-")
                        for tag in properties.get("tags", [])
                        if "cwe-" in tag
                    ],
                    "references": [rule.get("helpUri")] if rule.get("helpUri") else [],
                    "level": (
                        rule.get("defaultConfiguration", {}).get("level")
                        or properties.get("problem.severity", "UNKNOWN")
                    )
                }

        processed: List[BaseSnippet] = []
        snippet_idx = 0

        for run in results.get("runs", []):
            for res in run.get("results", []):
                # "locations" may be present but empty; treat that like "absent".
                locations = res.get("locations") or [{}]
                location = locations[0].get("physicalLocation", {})
                artifact = location.get("artifactLocation", {})
                region = location.get("region", {})

                file_uri = artifact.get("uri", "Unknown")
                full_path = os.path.join(base_dir, file_uri)
                start_line = region.get("startLine", 0)
                end_line = region.get("endLine") or start_line

                rule_id = res.get("ruleId")
                meta = rule_metadata.get(rule_id, {}) if rule_id else {}

                # Normalize severity level
                level = res.get("level") or meta.get("level", "UNKNOWN")
                if isinstance(level, list):
                    level = level[0] if level else "UNKNOWN"
                severity = str(level).upper()

                snippet = ""
                lines = []

                try:
                    if os.path.exists(full_path):
                        with open(full_path, "r", encoding="utf-8") as code_file:
                            lines = code_file.readlines()

                        # Defensive check on line bounds
                        if start_line > len(lines) or start_line < 1:
                            continue

                        # A region ending before it starts is a single line.
                        raw_snippet = lines[start_line - 1:max(end_line, start_line)]

                        if all(not line.strip() for line in raw_snippet):
                            continue

                        snippet = "".join(raw_snippet)

                except (OSError, UnicodeDecodeError, TypeError, ValueError):
                    # Unreadable file or malformed region: skip this finding
                    # but keep processing the rest of the report.
                    continue

                processed.append(BaseSnippet(
                    input="".join(lines),
                    snippet=snippet.strip(),
                    path=file_uri,
                    idx=snippet_idx,
                    start_line=start_line,
                    end_line=end_line,
                    message=res.get("message", {}).get("text", ""),
                    severity=severity,
                    vulnerability_class=[rule_id.split("/", 1)[-1]] if rule_id else [],
                    cwe=meta.get("cwe", []),
                    references=meta.get("references", [])
                ))
                snippet_idx += 1

        return processed

# \ No newline at end of file
# diff --git a/src/autofic_core/sast/codeql/runner.py b/src/autofic_core/sast/codeql/runner.py
# new file mode 100644
# index 0000000..fd27c04
# --- /dev/null
# +++ b/src/autofic_core/sast/codeql/runner.py
# @@ -0,0 +1,104 @@
# =============================================================================
# Copyright 2025 Autofic Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================

"""
CodeQLRunner: Executes the full CodeQL analysis pipeline.

Steps:
1. Downloads the query pack.
2. Creates a CodeQL database from the source.
3. Runs the analysis and saves the result as a SARIF file.
"""

import subprocess
from pathlib import Path
from autofic_core.errors import CodeQLExecutionError


class CodeQLRunner:
    """
    Executes CodeQL analysis for a given repository.

    Attributes:
        repo_path (Path): Path to the cloned repository.
        language (str): Programming language to analyze (default: "javascript").
        query_pack (str): Query pack path (e.g., "codeql/javascript-queries").
        db_path (Path): Path to the CodeQL database.
        result_dir (Path): Directory for storing analysis results.
        output_path (Path): Path to the generated SARIF report.
        log_path (Path): Path to the log file for subprocess outputs.
    """

    def __init__(self, repo_path: Path, language: str = "javascript"):
        self.repo_path = Path(repo_path).resolve()
        self.language = language.lower()
        self.query_pack = f"codeql/{self.language}-queries"
        self.db_path = self.repo_path / ".codeql-db"
        self.result_dir = self.repo_path / ".codeql-results"
        self.output_path = self.result_dir / "codeql.sarif.json"
        self.log_path = self.result_dir / "codeql.log"

    def _run_cmd(self, cmd: list[str], log_file):
        """
        Executes a command (argument list, no shell) and logs its output.

        Args:
            cmd (list[str]): Command and arguments to run.
            log_file: File handle to write stdout and stderr.

        Raises:
            subprocess.CalledProcessError: If the command exits non-zero.
        """
        subprocess.run(cmd, check=True, stdout=log_file, stderr=log_file)

    def run_codeql(self) -> Path:
        """
        Runs the full CodeQL analysis pipeline:
        1. Downloads the query pack.
        2. Creates the database from source code.
        3. Analyzes the database and exports SARIF results.

        Returns:
            Path: Path to the resulting SARIF JSON file.

        Raises:
            CodeQLExecutionError: If any subprocess call exits non-zero; the
                original CalledProcessError is attached as ``__cause__``.
        """
        self.output_path.parent.mkdir(parents=True, exist_ok=True)

        steps = [
            # Step 1: Ensure query pack is downloaded
            ["codeql", "pack", "download", self.query_pack],
            # Step 2: Create a database from the repository
            [
                "codeql", "database", "create", str(self.db_path),
                f"--language={self.language}",
                "--source-root", str(self.repo_path),
            ],
            # Step 3: Analyze the database and generate SARIF report
            [
                "codeql", "database", "analyze", str(self.db_path),
                self.query_pack,
                "--format=sarifv2.1.0",
                "--output", str(self.output_path),
            ],
        ]

        try:
            with self.log_path.open("w", encoding="utf-8") as log_file:
                for cmd in steps:
                    self._run_cmd(cmd, log_file)
        except subprocess.CalledProcessError as err:
            # Keep the failing command and exit code reachable for debugging.
            raise CodeQLExecutionError() from err

        return self.output_path

# \ No newline at end of file
# diff --git a/src/autofic_core/sast/merger.py b/src/autofic_core/sast/merger.py
# new file mode 100644
# index 0000000..3809ea8
# --- /dev/null
# +++ b/src/autofic_core/sast/merger.py
# @@ -0,0 +1,66 @@
# =============================================================================
# Copyright 2025 AutoFiC Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================

from collections import defaultdict
from typing import List
from autofic_core.sast.snippet import BaseSnippet


def merge_snippets_by_file(snippets: List[BaseSnippet]) -> List[BaseSnippet]:
    """
    Merge all findings that share a file path into one snippet per file.

    For each path the result carries: the ``input`` of the first finding, the
    smallest ``start_line`` and largest ``end_line``, the distinct snippet
    lines in source order (findings ordered by ``start_line``, first
    occurrence of a line kept), the distinct messages joined with " | ", the
    sorted union of vulnerability classes, CWEs and references, and the
    highest severity (ERROR > WARNING > INFO/NOTE; "" when none is set).
    ``idx`` is ``None`` on merged snippets.

    Args:
        snippets: Findings from a SAST preprocessor.

    Returns:
        One merged snippet per distinct path, in first-seen path order.
    """
    # Defined once; unknown levels rank below every known one.
    severity_order = {"INFO": 0, "NOTE": 0, "WARNING": 1, "ERROR": 2}

    grouped = defaultdict(list)
    for snippet in snippets:
        grouped[snippet.path].append(snippet)

    merged_snippets = []

    for path, group in grouped.items():
        base = group[0]
        start_line = min(s.start_line for s in group)
        end_line = max(s.end_line for s in group)

        # Keep code lines in the order they appear in the file; sorting them
        # alphabetically would scramble the merged snippet.
        seen_lines = set()
        ordered_lines = []
        for s in sorted(group, key=lambda g: g.start_line):
            if not s.snippet:
                continue
            for line in s.snippet.splitlines():
                if line not in seen_lines:
                    seen_lines.add(line)
                    ordered_lines.append(line)
        merged_snippet_text = "\n".join(ordered_lines)

        merged_message = " | ".join(sorted(set(s.message for s in group if s.message)))
        merged_vuln_class = sorted({vc for s in group for vc in s.vulnerability_class})
        merged_cwe = sorted({c for s in group for c in s.cwe})
        merged_references = sorted({r for s in group for r in s.references})

        severity = max(
            (str(s.severity).upper() for s in group if s.severity),
            key=lambda x: severity_order.get(x, -1),
            default=""
        )
        merged_snippets.append(BaseSnippet(
            input=base.input,
            idx=None,
            start_line=start_line,
            end_line=end_line,
            snippet=merged_snippet_text,
            message=merged_message,
            vulnerability_class=merged_vuln_class,
            cwe=merged_cwe,
            severity=severity,
            references=merged_references,
            path=path
        ))

    return merged_snippets

# \ No newline at end of file
# diff --git a/src/autofic_core/sast/semgrep/preprocessor.py b/src/autofic_core/sast/semgrep/preprocessor.py
# new file mode 100644
# index 0000000..42e8aa6
# --- /dev/null
# +++ b/src/autofic_core/sast/semgrep/preprocessor.py
# @@ -0,0 +1,91 @@
# =============================================================================
# Copyright 2025 AutoFiC Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================

import json
from pathlib import Path
from typing import List
from autofic_core.sast.snippet import BaseSnippet

class SemgrepPreprocessor:
    """Converts Semgrep JSON output into BaseSnippet objects."""

    @staticmethod
    def ensure_list(value):
        """Return ``[]`` for None, ``value`` itself for a list, else ``[value]``."""
        if value is None:
            return []
        if isinstance(value, list):
            return value
        return [value]

    @staticmethod
    def read_json_file(path: str) -> dict:
        """Load and return the JSON document stored at ``path``."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @staticmethod
    def save_json_file(data: dict, path: Path) -> None:
        """Write ``data`` as indented UTF-8 JSON to ``path``, creating parent directories."""
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    @staticmethod
    def preprocess(input_json_path: str, base_dir: str = ".") -> List[BaseSnippet]:
        """
        Turn every Semgrep result into a BaseSnippet with its source code attached.

        The input may be either the Semgrep report object (results under the
        "results" key) or a plain list of result objects. Line numbers are read
        from ``start.line`` / ``end.line`` when present, otherwise from
        ``start_line`` / ``end_line``.

        Args:
            input_json_path: Path to the Semgrep JSON file.
            base_dir: Repository root; result paths inside it are made relative to it.

        Returns:
            One BaseSnippet per result, in report order.

        Raises:
            FileNotFoundError: If a reported file does not exist under ``base_dir``.
        """
        results = SemgrepPreprocessor.read_json_file(input_json_path)
        base_dir_path = Path(base_dir).resolve()
        base_dir_str = str(base_dir_path).replace("\\", "/")
        # Match on a directory boundary so "/repo" does not claim "/repo2/x.js".
        base_prefix = base_dir_str.rstrip("/") + "/"
        processed: List[BaseSnippet] = []

        items = results.get("results") if isinstance(results, dict) else results

        # A report without a "results" key yields no snippets instead of a TypeError.
        for idx, result in enumerate(items or []):
            raw_path = result.get("path", "").strip().replace("\\", "/")

            if raw_path == base_dir_str:
                rel_path = ""
            elif raw_path.startswith(base_prefix):
                rel_path = raw_path[len(base_prefix):].lstrip("/")
            else:
                rel_path = raw_path

            file_path = (base_dir_path / rel_path).resolve()
            if not file_path.exists():
                raise FileNotFoundError(f"[ERROR] File not found: {file_path}")

            full_code = file_path.read_text(encoding='utf-8')

            if "start" in result and "line" in result["start"]:
                start_line = result["start"]["line"]
                # A result without an end position is treated as a single line.
                end_line = result.get("end", {}).get("line", start_line)
            else:
                start_line = result.get("start_line", 0)
                end_line = result.get("end_line", 0)

            lines = full_code.splitlines()
            in_bounds = 0 < start_line <= end_line <= len(lines)
            snippet_lines = lines[start_line - 1:end_line] if in_bounds else []
            snippet = "\n".join(snippet_lines)

            extra = result.get("extra", {})
            meta = extra.get("metadata", {})

            processed.append(BaseSnippet(
                input=full_code,
                idx=idx,
                start_line=start_line,
                end_line=end_line,
                snippet=snippet,
                message=extra.get("message", ""),
                vulnerability_class=SemgrepPreprocessor.ensure_list(meta.get("vulnerability_class")),
                cwe=SemgrepPreprocessor.ensure_list(meta.get("cwe")),
                severity=extra.get("severity", ""),
                references=SemgrepPreprocessor.ensure_list(meta.get("references")),
                path=rel_path
            ))

        return processed

# \ No newline at end of file
# diff --git a/src/autofic_core/sast/semgrep/runner.py b/src/autofic_core/sast/semgrep/runner.py
# new file mode 100644
# index 0000000..bb3f45f
# --- /dev/null
# +++ b/src/autofic_core/sast/semgrep/runner.py
# @@ -0,0 +1,44 @@
# =============================================================================
# Copyright 2025 AutoFiC Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================

import subprocess
from pydantic import BaseModel

# Captured outcome of one Semgrep CLI invocation.
class SemgrepResult(BaseModel):
    stdout: str
    stderr: str
    returncode: int

# Runs Semgrep with the configured rule set over a repository path.
class SemgrepRunner(BaseModel):
    repo_path: str
    rule: str

    def run_semgrep(self) -> SemgrepResult:
        """
        Run ``semgrep --json`` over ``repo_path`` for *.js, *.jsx and *.mjs files.

        A non-zero exit status is not an error here: stdout, stderr and the
        return code are returned to the caller either way.
        """
        argv = ["semgrep", "--config", self.rule, "--json"]
        for glob in ("*.js", "*.jsx", "*.mjs"):
            argv.extend(["--include", glob])
        argv.append(self.repo_path)

        # check=False gives the same data the CalledProcessError would carry.
        finished = subprocess.run(
            argv,
            capture_output=True,
            text=True,
            encoding='utf-8',
            check=False,
        )
        return SemgrepResult(
            stdout=finished.stdout,
            stderr=finished.stderr,
            returncode=finished.returncode,
        )

# \ No newline at end of file
# diff --git a/src/autofic_core/sast/snippet.py b/src/autofic_core/sast/snippet.py
# new file mode 100644
# index 0000000..89f15b7
# --- /dev/null
# +++ b/src/autofic_core/sast/snippet.py
# @@ -0,0 +1,50 @@
# =============================================================================
# Copyright 2025 Autofic Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================

"""Defines a unified BaseSnippet model for all SAST tool outputs."""

from pydantic import BaseModel, Field
from typing import List, Optional


class BaseSnippet(BaseModel):
    """
    Unified structure for all vulnerability snippets from Semgrep, CodeQL, SnykCode, etc.

    Required fields are ``input``, ``start_line``, ``end_line`` and ``path``;
    every other field has a default.

    Attributes:
        input (str): Full source code of the file.
        idx (Optional[int]): Index of the snippet assigned by the producer;
            defaults to None.
        start_line (int): Start line number of the vulnerable code.
        end_line (int): End line number of the vulnerable code.
        snippet (Optional[str]): Vulnerable code snippet; defaults to None.
        message (str): Description of the vulnerability; defaults to "".
        severity (str): Severity label as a free-form string; defaults to "".
            The model does not restrict the value.
        path (str): File path of the finding, as supplied by the producer.
        vulnerability_class (List[str]): List of vulnerability types (e.g., SQL Injection).
        cwe (List[str]): List of CWE identifiers.
        references (List[str]): List of external reference links.
    """
    input: str
    idx: Optional[int] = None
    start_line: int
    end_line: int
    snippet: Optional[str] = None
    message: str = ""
    # default_factory gives every instance its own empty list.
    vulnerability_class: List[str] = Field(default_factory=list)
    cwe: List[str] = Field(default_factory=list)
    severity: str = ""
    references: List[str] = Field(default_factory=list)
    path: str

# diff --git a/src/autofic_core/sast/snykcode/preprocessor.py b/src/autofic_core/sast/snykcode/preprocessor.py
# new file mode 100644
# index 0000000..d10fa2e
# --- /dev/null
# +++ b/src/autofic_core/sast/snykcode/preprocessor.py
# @@ -0,0 +1,128 @@
# =============================================================================
# Copyright 2025 AutoFiC Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================

"""
SnykCodePreprocessor extracts and normalizes SARIF-based Snyk Code scan results
into BaseSnippet objects for downstream LLM and patching workflows.
"""

import json
import os
from pathlib import Path
from typing import List, Any

from autofic_core.sast.snippet import BaseSnippet


class SnykCodePreprocessor:
    """
    Preprocesses Snyk Code SARIF results into structured BaseSnippet objects.
    """

    @staticmethod
    def read_json_file(path: str) -> dict:
        """
        Load JSON file from given path.

        Args:
            path (str): Path to SARIF result file.

        Returns:
            dict: Parsed JSON content.
        """
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)

    @staticmethod
    def save_json_file(data: Any, path: str) -> None:
        """
        Save data to a JSON file with UTF-8 encoding.

        Parent directories are created when missing.

        Args:
            data (Any): Serializable Python object.
            path (str): Destination file path.
        """
        os.makedirs(Path(path).parent, exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

    @staticmethod
    def preprocess(input_json_path: str, base_dir: str = ".") -> List[BaseSnippet]:
        """
        Convert Snyk SARIF JSON into BaseSnippet objects.

        Results whose file does not exist under ``base_dir`` or cannot be read
        as UTF-8 text are skipped. ``idx`` is the position of the result within
        its SARIF run. A result without a usable start line (< 1) gets an empty
        snippet.

        Args:
            input_json_path (str): Path to Snyk SARIF output file.
            base_dir (str): Base path of the source repo.

        Returns:
            List[BaseSnippet]: Parsed list of code vulnerability snippets.
        """
        sarif = SnykCodePreprocessor.read_json_file(input_json_path)
        base_path = Path(base_dir).resolve()
        snippets: List[BaseSnippet] = []

        for run in sarif.get("runs", []):
            rules_map = {
                rule.get("id"): rule
                for rule in run.get("tool", {}).get("driver", {}).get("rules", [])
            }

            for idx, result in enumerate(run.get("results", [])):
                # "locations" may be present but empty; treat that like "absent".
                locations = result.get("locations") or [{}]
                location = locations[0].get("physicalLocation", {})
                region = location.get("region", {})
                file_uri = location.get("artifactLocation", {}).get("uri", "")
                file_path = (base_path / file_uri).resolve()

                if not file_path.exists():
                    continue  # Skip non-existent files

                try:
                    lines = file_path.read_text(encoding="utf-8").splitlines()
                except (OSError, UnicodeDecodeError):
                    continue  # Skip unreadable files (directories, binary data, permissions)

                full_code = "\n".join(lines)
                start_line = region.get("startLine", 0)
                end_line = region.get("endLine", start_line)
                # start_line < 1 would turn into a negative slice start and
                # pick lines from the end of the file.
                selected = lines[start_line - 1:end_line] if start_line >= 1 else []
                snippet = "\n".join(selected)

                rule_id = result.get("ruleId", "")
                rule = rules_map.get(rule_id, {})
                help_uri = rule.get("helpUri", "")
                cwe_tags = rule.get("properties", {}).get("tags", [])
                cwe = [
                    t.split("/")[-1].replace("cwe-", "CWE-")
                    for t in cwe_tags
                    if "cwe" in t.lower()
                ]
                references = [help_uri] if help_uri else []

                snippets.append(BaseSnippet(
                    input=full_code.strip(),
                    idx=idx,
                    start_line=start_line,
                    end_line=end_line,
                    snippet=snippet.strip(),
                    message=result.get("message", {}).get("text", ""),
                    # "level" may be absent or null in the report.
                    severity=str(result.get("level") or "").upper(),
                    path=file_uri,
                    vulnerability_class=[rule_id.split("/", 1)[-1]] if rule_id else [],
                    cwe=cwe,
                    references=references
                ))

        return snippets

# \ No newline at end of file
# diff --git a/src/autofic_core/sast/snykcode/runner.py b/src/autofic_core/sast/snykcode/runner.py
# new file mode 100644
# index 0000000..b84078c
# --- /dev/null
# +++ b/src/autofic_core/sast/snykcode/runner.py
# @@ -0,0 +1,194 @@
# =============================================================================
# Copyright 2025
class SnykCodeRunner:
    """
    Handles Snyk Code CLI execution and SARIF result saving.

    The runner copies the JavaScript/TypeScript sources of the repository into
    a scratch directory, runs ``snyk code test --json`` there and writes the
    CLI's standard output to ``snyk_result.sarif.json`` inside the repository.

    Args:
        repo_path (Path): The path to the cloned repository to analyze.
    """

    # Source-file extensions that are copied into the scratch directory.
    VALID_EXTENSIONS = frozenset({".js", ".jsx", ".ts", ".mjs"})
    # Name of the result file written into ``repo_path``.
    RESULT_FILENAME = "snyk_result.sarif.json"

    def __init__(self, repo_path: Path):
        self.repo_path = Path(repo_path).resolve()
        self.snyk_token = os.environ.get("SNYK_TOKEN")

    def run_snykcode(self) -> "SnykCodeResult":
        """
        Executes the Snyk CLI with 'code test --json' on the target repo.

        Returns:
            SnykCodeResult: Contains stdout, stderr, returncode, and result path.
            When the repository holds no file with a supported extension the
            CLI is not run and a result with ``returncode=1`` and no
            ``result_path`` is returned.

        Raises:
            FileNotFoundError: If the Snyk CLI cannot be located.
            EnvironmentError: If SNYK_TOKEN was not set when the runner was created.
        """
        snyk_cmd, use_shell, prepend_node = self._resolve_snyk_command()

        if not self.snyk_token:
            raise EnvironmentError(SnykCodeErrorMessages.TOKEN_MISSING)

        self._ensure_config()

        # Set up environment for subprocess
        env = os.environ.copy()
        env["SNYK_TOKEN"] = self.snyk_token

        # Simulate `snyk auth`, launching the CLI exactly like the scan below
        # (same shell / node-prefix decisions).
        self._ensure_authenticated(
            snyk_cmd, env, use_shell=use_shell, prepend_node=prepend_node
        )

        target_files = [
            p for p in self.repo_path.rglob("*")
            if p.suffix in self.VALID_EXTENSIONS and p.is_file()
        ]

        if not target_files:
            return SnykCodeResult(
                stdout="",
                stderr=SnykCodeErrorMessages.NO_JS_FILES_FOUND,
                returncode=1
            )

        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Mirror only the selected sources, keeping their relative layout.
            # NOTE(review): the .snyk file created by _ensure_config() lives in
            # repo_path and is not copied here -- confirm that is intended.
            for src_file in target_files:
                dst_file = temp_path / src_file.relative_to(self.repo_path)
                dst_file.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(src_file, dst_file)

            cmd = self._build_command(snyk_cmd, prepend_node, "code", "test", "--json")
            # check=False inside _run_cli: a non-zero exit status is reported
            # through SnykCodeResult.returncode instead of being raised.
            result = self._run_cli(cmd, cwd=temp_path, env=env, use_shell=use_shell)

        output_path = self.repo_path / self.RESULT_FILENAME
        output_path.write_text(result.stdout, encoding="utf-8")

        return SnykCodeResult(
            stdout=result.stdout,
            stderr=result.stderr,
            returncode=result.returncode,
            result_path=str(output_path)
        )

    @staticmethod
    def _build_command(snyk_cmd: str, prepend_node: bool, *args: str) -> list[str]:
        """Return the argv for a Snyk invocation, prefixed with ``node`` for .js entry points."""
        cmd = [snyk_cmd, *args]
        if prepend_node:
            cmd.insert(0, "node")
        return cmd

    @staticmethod
    def _run_cli(cmd: list[str], *, cwd, env, use_shell: bool) -> subprocess.CompletedProcess:
        """Run ``cmd`` capturing UTF-8 text output; never raises on a non-zero exit status."""
        # use_shell is only requested for Windows ``.cmd`` shims.  list2cmdline
        # quotes every argument, so a CLI path containing spaces survives the
        # round trip through the shell (a plain " ".join() would split it).
        command = subprocess.list2cmdline(cmd) if use_shell else cmd
        return subprocess.run(
            command,
            cwd=cwd,
            env=env,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            check=False,
            shell=use_shell
        )

    def _ensure_authenticated(
        self,
        snyk_cmd: str,
        env: dict,
        *,
        use_shell: bool = False,
        prepend_node: bool = False,
    ) -> None:
        """
        Authenticates with the Snyk CLI using the API token.

        The outcome of the command is deliberately ignored (best effort); the
        token is additionally supplied through ``env``.

        Args:
            snyk_cmd (str): Path to the Snyk CLI command.
            env (dict): Environment variables with the token.
            use_shell (bool): Run through the shell (needed for ``.cmd`` shims).
            prepend_node (bool): Launch ``snyk_cmd`` through ``node`` (``.js`` entry point).
        """
        # NOTE(review): the token is passed on the command line and may be
        # visible in process listings while the command runs.
        cmd = self._build_command(
            snyk_cmd, prepend_node, "config", "set", f"api={self.snyk_token}"
        )
        self._run_cli(cmd, cwd=self.repo_path, env=env, use_shell=use_shell)

    def _ensure_config(self) -> None:
        """
        Ensures a .snyk config file exists in the repo directory.

        An existing file is left untouched.
        """
        config_path = self.repo_path / ".snyk"
        if not config_path.exists():
            config_path.write_text("# empty config\n", encoding="utf-8")

    def _resolve_snyk_command(self) -> tuple[str, bool, bool]:
        """
        Locates the Snyk CLI command.

        Priority:
            1. Environment variable SNYK_CMD_PATH
            2. System PATH (snyk, snyk.cmd, snyk.exe)
            3. npm global bin fallback

        Returns:
            tuple: (snyk_path, use_shell_flag, prepend_node_flag)

        Raises:
            FileNotFoundError: If none of the lookups finds the CLI.
        """
        # 1. Custom path via env (extension compared case-insensitively so
        #    that e.g. "snyk.CMD" is recognised as well).
        env_path = os.getenv("SNYK_CMD_PATH")
        if env_path and Path(env_path).exists():
            suffix = Path(env_path).suffix.lower()
            return env_path, suffix == ".cmd", suffix == ".js"

        # 2. Search PATH
        for candidate in ("snyk.cmd", "snyk.exe", "snyk"):
            path = shutil.which(candidate)
            if path:
                return path, Path(path).suffix.lower() == ".cmd", False

        # 3. Fallback to npm global bin
        for bin_dir in self._npm_global_bin_dirs():
            for fallback in ("snyk.cmd", "snyk"):
                fallback_path = bin_dir / fallback
                if fallback_path.exists():
                    return str(fallback_path), fallback_path.suffix.lower() == ".cmd", False

        raise FileNotFoundError(SnykCodeErrorMessages.CLI_NOT_FOUND)

    @staticmethod
    def _npm_global_bin_dirs() -> list[Path]:
        """Return candidate directories for globally installed npm executables (may be empty)."""
        # Resolve npm through PATH first: on Windows it is "npm.cmd", which a
        # bare "npm" argv entry does not find.
        npm = shutil.which("npm")
        if npm is None:
            return []
        # `npm bin -g` no longer exists in npm 9+; `npm prefix -g` is available
        # in old and new releases alike.
        try:
            prefix = subprocess.check_output(
                [npm, "prefix", "-g"], text=True, stderr=subprocess.DEVNULL
            ).strip()
        except (OSError, subprocess.CalledProcessError):
            return []
        if not prefix:
            return []
        root = Path(prefix)
        # Global executables live directly in the prefix on Windows and in
        # "<prefix>/bin" elsewhere; offer both and let the caller probe them.
        return [root, root / "bin"]
def extract_repo_name(repo_url: str) -> str:
    """Return the repository name, i.e. the last path segment of ``repo_url``.

    Surrounding slashes are ignored and a trailing ``.git`` (as found in clone
    URLs) is dropped, so ``https://github.com/user/project.git`` yields
    ``project``.
    """
    parsed = urlparse(repo_url)
    name = parsed.path.strip("/").split("/")[-1]
    if name.endswith(".git"):
        name = name[: -len(".git")]
    return name


def print_summary(repo_url: str, detected_issues_count: int, output_dir: str, response_files: list):
    """Print the closing summary of a run, then pause for two seconds.

    Args:
        repo_url: URL of the analysed repository; only its name is shown.
        detected_issues_count: Number printed as files with detected vulnerabilities.
        output_dir: Accepted for interface compatibility; not used here.
        response_files: Accepted for interface compatibility; not used here.
    """
    print_divider("AutoFiC Summary")

    repo_name = extract_repo_name(repo_url)
    console.print(f"📦 [bold]Target Repository:[/bold] {repo_name}")
    console.print()
    console.print(f"🛡️ [bold]Files with detected vulnerabilities:[/bold] {detected_issues_count} files")
    console.print()
    console.print("🤖 [bold]LLM Responses:[/bold] Saved in the 'llm' folder")

    time.sleep(2.0)


def print_help_message():
    """Print the AutoFiC command-line usage guide to the shared console."""
    # The style tag must spell "bold": an unknown style name is ignored by the
    # renderer and the header would be printed unstyled.
    console.print("\n\n[bold magenta][ AutoFiC CLI Usage Guide ][/bold magenta]")
    # Raw string: the Windows example path below contains backslashes.
    console.print(r"""

✔️ How to use options:

--explain       Display AutoFiC usage guide

--repo          GitHub repository URL to analyze (required)
--save-dir      Directory to save analysis results (required)

--sast          Run SAST analysis using selected tool (semgrep, codeql, snyk)

--llm           Run LLM to fix vulnerable code and save response
--llm-retry     Re-run LLM to verify and finalize code

--patch         Generate diff and apply it to original file

--pr            Pull request the final modified files to both my forked repository and the original repository



※ Example usage:

    [ For Windows ]
    -> python -m autofic_core.cli --repo https://github.com/user/project --save-dir "C:\Users\Username\download\AutoFiCResult" --sast --llm --patch --pr

    [ For Mac ]
    -> python -m autofic_core.cli --repo https://github.com/user/project --save-dir "/Users/Username/Desktop/AutoFiCResult" --sast semgrep --llm --patch --pr



⚠️ Note:

    - The --save-dir option must be entered as an absolute path.
    - The --sast option must be run before using --llm or --llm-retry options.
    - Only one of the --llm and --llm-retry options can be used at a time.
    - The --llm or --llm-retry option must be run before using the --patch option.
    - The --patch option must be run before using the --pr option.
    """)