From 3151e3d189bc245e3b1b225f4f07ab5919a5d3d7 Mon Sep 17 00:00:00 2001
From: snw35
Date: Mon, 15 Dec 2025 14:06:01 +0000
Subject: [PATCH 1/4] Multi stage build support

---
 .vscode/launch.json |   2 +-
 README.md           |  20 +++
 dfupdate.py         | 430 ++++++++++++++++++++++++++++++++++++++++----
 test_dfupdate.py    | 215 ++++++----------------
 4 files changed, 467 insertions(+), 200 deletions(-)

diff --git a/.vscode/launch.json b/.vscode/launch.json
index eec5e26..e0473e1 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -7,7 +7,7 @@
       "request": "launch",
       "program": "dfupdate.py",
       "console": "integratedTerminal",
-      "args": "-d ../cloudenv/Dockerfile -n ../cloudenv/new_ver.json"
+      "args": "-d ../reforge/neo/Dockerfile -n ../reforge/neo/new_ver.json"
     }
   ]
 }
diff --git a/README.md b/README.md
index 69c5339..2e23b27 100644
--- a/README.md
+++ b/README.md
@@ -30,6 +30,26 @@ include_regex = "\\d+\\.\\d+\\.?\\d?-alpine\\d\\.\\d+"
 Note that the entry must be called 'BASE' for dfupdate to recognise it.
 The script will update the Dockerfile directly with any newer base image found.
 
+For multi-stage Dockerfiles, dfupdate will attempt to update every `FROM` line. By default it treats the final stage as `BASE`. Earlier stages are matched using the stage name, the stage index, or both:
+
+- If the stage is named (e.g. `FROM node:22-alpine AS builder`), use `BASE_<NAME>` or `<NAME>_BASE`, for example `BASE_BUILDER`.
+- All stages can also be addressed by index with `BASE_STAGE_<N>` or `BASE<N>`, where the first `FROM` is index `0`.
+- If none of the above are present, the final stage falls back to plain `BASE`, so existing single-stage configurations keep working.
+
+Example `nvchecker.toml` for a two-stage build:
+
+```
+[BASE_BUILDER]
+source = "container"
+container = "library/node"
+include_regex = "\\d+\\.\\d+-alpine\\d+\\.\\d+"
+
+[BASE]
+source = "container"
+container = "library/python"
+include_regex = "\\d+\\.\\d+\\.?\\d?-alpine\\d\\.\\d+"
+```
+
 ### Required ENV Variables
 
 You can update included software in your Dockerfile using nvchecker and dfupdate.
diff --git a/dfupdate.py b/dfupdate.py
index 81f460a..60ab4a1 100644
--- a/dfupdate.py
+++ b/dfupdate.py
@@ -5,15 +5,20 @@
 
 import argparse
 import hashlib
+import io
 import json
 import logging
 import os
 import shutil
 import sys
 import tempfile
+from collections import defaultdict
+from dataclasses import dataclass
 
 import requests
 from dockerfile_parse import DockerfileParser
+from dockerfile_parse.parser import image_from
+from dockerfile_parse.util import WordSplitter
 from tenacity import (
     retry,
     wait_exponential,
@@ -25,6 +30,44 @@
 logger = logging.getLogger("dfupdate")
 
 
+@dataclass
+class Stage:
+    index: int
+    image: str | None
+    alias: str | None
+    tokens: list[str]
+    startline: int
+    endline: int
+    image_token_index: int | None
+    changed: bool = False
+
+
+@dataclass
+class EnvEntry:
+    key: str
+    value: str
+    raw_value: str
+    style: str
+    token_index: int
+
+
+@dataclass
+class EnvInstruction:
+    stage_index: int
+    startline: int
+    endline: int
+    tokens: list[str]
+    entries: list[EnvEntry]
+    changed: bool = False
+
+
+@dataclass
+class FileEdit:
+    startline: int
+    endline: int
+    new_lines: list[str]
+
+
 class DFUpdater:
     """
     Dockerfile updater class.
@@ -40,30 +83,310 @@ def __init__(self, nvcheck_file: str, dockerfile: str):
             nvcheck_file: absolute or relative path to nvchecker file.
             dockerfile: absolute or relative path to dockerfile.
""" - self.dfp = DockerfileParser(path=dockerfile) + self.dfp: DockerfileParser | None = None # Paths to dockerfile and nvchecker file self.nvcheck_file = nvcheck_file self.dockerfile = dockerfile # Map of software and version found in dockerfile self.dockerfile_versions = {} + self.software_versions: dict[str, set[str]] = {} # Global update flag self.updated = False + # Parsed dockerfile state + self.lines: list[str] | None = None + self.stages: list[Stage] = [] + self.env_instructions: list[EnvInstruction] = [] + self.env_entries_by_name: dict[str, list[tuple[EnvInstruction, EnvEntry]]] = {} + + def _ensure_loaded(self): + """ + Load Dockerfile content into memory and parse structure for + stages and ENV instructions. + """ + if self.lines is not None: + return + self.lines = load_file_content(self.dockerfile).splitlines(keepends=True) + content = "".join(self.lines) + self.dfp = DockerfileParser(fileobj=io.StringIO(content), cache_content=True) + self.stages = self._parse_stages() + self.env_instructions = self._parse_env_instructions() + self.env_entries_by_name = self._index_env_entries() + + def _parse_stages(self) -> list[Stage]: + if not self.dfp: + return [] + stages: list[Stage] = [] + stage_index = -1 + for instruction in self.dfp.structure: + if instruction["instruction"] != "FROM": + continue + stage_index += 1 + tokens = list(WordSplitter(instruction["value"]).split(dequote=False)) + image, alias = image_from(instruction["value"]) + image_token_index = self._find_image_token_index(tokens) + stages.append( + Stage( + index=stage_index, + image=image, + alias=alias, + tokens=tokens, + startline=instruction["startline"], + endline=instruction["endline"], + image_token_index=image_token_index, + ) + ) + return stages + + def _find_image_token_index(self, tokens: list[str]) -> int | None: + for idx, token in enumerate(tokens): + if token.startswith("--"): + continue + return idx + return None + + def _parse_env_entries(self, tokens: list[str]) -> list[EnvEntry]: + entries: list[EnvEntry] = [] + if "=" not in tokens[0]: + key = tokens[0] + raw_value = " ".join(tokens[1:]) if len(tokens) > 1 else "" + value = WordSplitter(raw_value).dequote() if raw_value else "" + entries.append( + EnvEntry( + key=key, + value=value, + raw_value=raw_value, + style="space", + token_index=0, + ) + ) + return entries + + for idx, token in enumerate(tokens): + if "=" not in token: + continue + key, raw_value = token.split("=", 1) + value = WordSplitter(raw_value).dequote() + entries.append( + EnvEntry( + key=key, + value=value, + raw_value=raw_value, + style="equals", + token_index=idx, + ) + ) + return entries + + def _parse_env_instructions(self) -> list[EnvInstruction]: + if not self.dfp: + return [] + env_instructions: list[EnvInstruction] = [] + stage_index = -1 + for instruction in self.dfp.structure: + if instruction["instruction"] == "FROM": + stage_index += 1 + continue + if instruction["instruction"] != "ENV": + continue + tokens = list(WordSplitter(instruction["value"]).split(dequote=False)) + if not tokens: + continue + entries = self._parse_env_entries(tokens) + env_instructions.append( + EnvInstruction( + stage_index=stage_index, + startline=instruction["startline"], + endline=instruction["endline"], + tokens=tokens, + entries=entries, + ) + ) + return env_instructions + + def _index_env_entries(self) -> dict[str, list[tuple[EnvInstruction, EnvEntry]]]: + mapping: defaultdict[str, list[tuple[EnvInstruction, EnvEntry]]] = defaultdict( + list + ) + for env_instruction in 
self.env_instructions: + for entry in env_instruction.entries: + mapping[entry.key].append((env_instruction, entry)) + return dict(mapping) + + def _format_env_value(self, raw_value: str, new_value: str) -> str: + if ( + raw_value + and len(raw_value) >= 2 + and raw_value[0] == raw_value[-1] + and raw_value[0] in ("'", '"') + ): + return f"{raw_value[0]}{new_value}{raw_value[0]}" + if any(ch.isspace() for ch in new_value): + return f'"{new_value}"' + return new_value + + def _update_env_entry( + self, env_instruction: EnvInstruction, entry: EnvEntry, new_value: str + ) -> bool: + formatted_value = self._format_env_value(entry.raw_value, new_value) + if entry.style == "equals": + new_token = f"{entry.key}={formatted_value}" + if env_instruction.tokens[entry.token_index] == new_token: + return False + env_instruction.tokens[entry.token_index] = new_token + else: + new_token = formatted_value + if ( + len(env_instruction.tokens) < 2 + or env_instruction.tokens[1] != new_token + or env_instruction.tokens[0] != entry.key + ): + env_instruction.tokens = [entry.key, new_token] + else: + return False + entry.value = new_value + entry.raw_value = formatted_value + env_instruction.changed = True + self.updated = True + return True + + def _update_env_value(self, env_name: str, new_value: str) -> bool: + changed = False + for env_instruction, entry in self.env_entries_by_name.get(env_name, []): + changed = ( + self._update_env_entry(env_instruction, entry, new_value) or changed + ) + return changed + + def _get_env_value( + self, env_name: str, stage_index: int | None = None + ) -> str | None: + entries = self.env_entries_by_name.get(env_name, []) + if stage_index is not None: + for env_instruction, entry in entries: + if env_instruction.stage_index == stage_index and entry.value: + return entry.value + for _, entry in entries: + if entry.value: + return entry.value + return None + + def _generate_edits(self) -> list[FileEdit]: + edits: list[FileEdit] = [] + for stage in self.stages: + if stage.changed and stage.image_token_index is not None: + new_content = f"FROM {' '.join(stage.tokens)}\n" + edits.append(FileEdit(stage.startline, stage.endline, [new_content])) + for env_instruction in self.env_instructions: + if env_instruction.changed: + new_content = f"ENV {' '.join(env_instruction.tokens)}\n" + edits.append( + FileEdit( + env_instruction.startline, + env_instruction.endline, + new_content.splitlines(keepends=True), + ) + ) + return sorted(edits, key=lambda edit: edit.startline) + + def _apply_edits(self, edits: list[FileEdit]): + if self.lines is None: + return + updated_lines = self.lines[:] + offset = 0 + for edit in edits: + start = edit.startline + offset + end = edit.endline + offset + updated_lines[start : end + 1] = edit.new_lines + offset += len(edit.new_lines) - (end - start + 1) + self.lines = updated_lines + + def _write_changes(self): + edits = self._generate_edits() + if not edits or self.lines is None: + return + self._apply_edits(edits) + new_content = "".join(self.lines) + atomic_write_file(self.dockerfile, new_content) + self.dfp = DockerfileParser( + fileobj=io.StringIO(new_content), cache_content=True + ) + logger.info("%s has been updated!", self.dockerfile) + + def _split_image(self, image: str | None) -> tuple[str | None, str, str | None]: + if not image: + return None, "", None + if "@" in image: + repo, digest = image.split("@", 1) + return repo, "@", digest + last_slash = image.rfind("/") + last_colon = image.rfind(":") + if last_colon > last_slash: + return 
image[:last_colon], ":", image[last_colon + 1 :] + return image, ":", None + + def _repo_key(self, repo: str | None) -> str | None: + if not repo: + return None + name = repo.split("/")[-1] + name = name.split(":")[0] + key = name.replace("-", "_").upper() + return key or None + + def _base_version_for_stage(self, stage: Stage, nvcheck_json: dict) -> str | None: + repo, _, _ = self._split_image(stage.image) + candidates = [] + if self.stages and stage.index == len(self.stages) - 1: + candidates.append("BASE") + if stage.alias: + alias = stage.alias.upper() + candidates.extend([f"BASE_{alias}", f"{alias}_BASE"]) + candidates.append(f"BASE_STAGE_{stage.index}") + candidates.append(f"BASE{stage.index}") + repo_key = self._repo_key(repo) + if repo_key: + candidates.append(f"{repo_key}_BASE") + seen: set[str] = set() + for key in candidates: + if key in seen: + continue + seen.add(key) + version = get_nested(nvcheck_json, [key, "version"]) + if not version: + version = nvcheck_json.get(key) + if version: + return str(version) + return None + + def _stage_label(self, stage: Stage) -> str: + return stage.alias or f"stage {stage.index}" def get_dockerfile_versions(self): """ Create list of software and versions found in dockerfile. """ - software_packages = { - key.rsplit("_", 1)[0]: value - for key, value in self.dfp.envs.items() - if key.endswith("_VERSION") and value - } - for sw, ver in software_packages.items(): - upgrade_flag = self.dfp.envs.get(f"{sw}_UPGRADE", "true").lower() - if upgrade_flag == "false": + self._ensure_loaded() + self.dockerfile_versions = {} + self.software_versions = {} + upgrade_flags: dict[str, str] = {} + for env_name, entries in self.env_entries_by_name.items(): + if not env_name.endswith("_UPGRADE"): + continue + sw = env_name.rsplit("_", 1)[0] + value = entries[-1][1].value.lower() if entries else "" + upgrade_flags[sw] = value + + for env_name, entries in self.env_entries_by_name.items(): + if not env_name.endswith("_VERSION"): + continue + sw = env_name.rsplit("_", 1)[0] + if upgrade_flags.get(sw) == "false": logger.info("%s upgrade set to false, skipping.", sw) continue - self.dockerfile_versions[sw] = ver + values = {entry.value for _, entry in entries if entry.value} + if not values: + continue + self.software_versions[sw] = values + self.dockerfile_versions[sw] = next(iter(values)) def get_nvcheck_json(self) -> dict: nvcheck_content = load_file_content(self.nvcheck_file) @@ -81,14 +404,37 @@ def update_base(self, nvcheck_json: dict): Args: nvcheck_json: dictionary containing parsed nvchecker file JSON. 
""" - base_image, base_tag = self.dfp.baseimage.rsplit(":", 1) - base_version = get_nested(nvcheck_json, ["BASE", "version"]) - if base_version != base_tag: - logger.info("Base image out of date: %s -> %s", base_tag, base_version) - self.dfp.baseimage = f"{base_image}:{base_version}" - logger.info("Base image updated.") - else: - logger.info("Base image is up to date: %s", base_tag) + self._ensure_loaded() + for stage in self.stages: + repo, separator, current_tag = self._split_image(stage.image) + desired_version = self._base_version_for_stage(stage, nvcheck_json) + if not desired_version: + continue + if stage.image_token_index is None or not repo: + logger.warning( + "Unable to update base image for %s", self._stage_label(stage) + ) + continue + if current_tag == desired_version: + logger.info( + "Base image is up to date for %s: %s", + self._stage_label(stage), + current_tag or "", + ) + continue + logger.info( + "Base image out of date for %s: %s -> %s", + self._stage_label(stage), + current_tag, + desired_version, + ) + separator_to_use = separator or ":" + stage.tokens[stage.image_token_index] = ( + f"{repo}{separator_to_use}{desired_version}" + ) + stage.changed = True + self.updated = True + logger.info("Base image updated for %s.", self._stage_label(stage)) def check_software(self, nvcheck_json: dict): """ @@ -97,54 +443,61 @@ def check_software(self, nvcheck_json: dict): Args: nvcheck_json: dictionary containing parsed nvchecker file JSON. """ + self._ensure_loaded() for sw, ver in self.dockerfile_versions.items(): - # Attempt newer nvchecker format first new_ver = get_nested(nvcheck_json, [sw, "version"]) - # Fall back to old format if not new_ver: new_ver = nvcheck_json.get(sw) if not new_ver: logger.warning("Failed to find %s in %s", sw, self.nvcheck_file) continue - if new_ver == ver: + new_ver_str = str(new_ver) + current_values = self.software_versions.get(sw, {ver}) + if current_values == {new_ver_str}: logger.info("%s is up to date", sw) else: - self.updated = True - self.update_software(sw, str(new_ver), ver) + self.update_software(sw, new_ver_str, current_values) if self.updated: - atomic_write_file(self.dockerfile, self.dfp.content) - logger.info("%s has been updated!", self.dockerfile) + self._write_changes() - def update_software(self, sw: str, new_ver: str, current_ver: str): + def update_software(self, sw: str, new_ver: str, current_versions: set[str]): """ Update the specified software. Args: sw: the software name to update (as found in the dockerfile). new_ver: the new version to update to. - current_ver: the current software version. + current_versions: set of currently detected versions. 
""" - logger.info("Updating %s: %s -> %s", sw, current_ver, new_ver) + current_example = next(iter(current_versions), "") + logger.info("Updating %s: %s -> %s", sw, current_example, new_ver) # Update bare ENV versions - self.dfp.envs[f"{sw}_VERSION"] = new_ver # type: ignore[attr-defined] + version_env = f"{sw}_VERSION" + self._update_env_value(version_env, new_ver) + + # Use the first stage where the version appears to look up related envs + stage_hint = None + if self.env_entries_by_name.get(version_env): + stage_hint = self.env_entries_by_name[version_env][0][0].stage_index # Check for remote URL and get new shasum - df_url = self.dfp.envs.get(f"{sw}_URL") - df_filename = self.dfp.envs.get(f"{sw}_FILENAME") - df_sha = self.dfp.envs.get(f"{sw}_SHA256") + df_url = self._get_env_value(f"{sw}_URL", stage_hint) + df_filename = self._get_env_value(f"{sw}_FILENAME", stage_hint) + df_sha = self._get_env_value(f"{sw}_SHA256", stage_hint) if df_url and df_filename and df_sha: logger.info("Found remote URL, fetching and calculating new shasum") full_url = df_url + "/" + df_filename - full_url = full_url.replace(current_ver, new_ver) + full_url = full_url.replace(current_example, new_ver) logger.info("Retrieving new SHA256 for %s from %s", sw, full_url) new_sha = get_remote_sha(full_url) if new_sha: - self.dfp.envs[f"{sw}_SHA256"] = new_sha # type: ignore[attr-defined] + self._update_env_value(f"{sw}_SHA256", new_sha) else: logger.error("Got empty shasum! Skipping %s", sw) # Reset ENV values to avoid updating - self.dfp.envs[f"{sw}_VERSION"] = current_ver # type: ignore[attr-defined] + if current_example: + self._update_env_value(version_env, current_example) else: logger.info( "Attribute not found: URL:%s filename:%s sha:%s", @@ -162,6 +515,13 @@ def update(self): Check each software package and updated if needed. 
""" nvcheck_json = self.get_nvcheck_json() + self.lines = None + self.stages = [] + self.env_instructions = [] + self.env_entries_by_name = {} + self.dfp = None + self.updated = False + self._ensure_loaded() self.get_dockerfile_versions() self.update_base(nvcheck_json) self.check_software(nvcheck_json) diff --git a/test_dfupdate.py b/test_dfupdate.py index 206c39d..6a80f58 100644 --- a/test_dfupdate.py +++ b/test_dfupdate.py @@ -1,44 +1,10 @@ import dfupdate +import json import os -import sys -import types +import tempfile import unittest from unittest import mock -fake_mod = types.ModuleType("dockerfile_parse") - - -class FakeDockerfileParser: - def __init__(self, *args, **kwargs): - self._content = "" - self.envs = {} - self.baseimage = "python:3.10" - - @property - def content(self): - return self._content - - @content.setter - def content(self, value): - self._content = value - # Parse simple "ENV KEY=VAL [KEY2=VAL2 ...]" lines - envs = {} - for line in value.splitlines(): - s = line.strip() - if not s.startswith("ENV "): - continue - parts = s[4:].split() - for part in parts: - if "=" in part: - k, v = part.split("=", 1) - envs[k.strip()] = v.strip() - # merge (overlay parsed entries) - self.envs.update(envs) - - -fake_mod.DockerfileParser = FakeDockerfileParser # type: ignore[attr-defined] -sys.modules.setdefault("dockerfile_parse", fake_mod) - class TestGetNested(unittest.TestCase): def test_get_nested_direct(self): @@ -121,21 +87,23 @@ def raise_for_status(self): class TestDFUpdater(unittest.TestCase): def setUp(self): - # Fresh updater with fake parser inside dfupdate - self.updater = dfupdate.DFUpdater("new_ver.json", "Dockerfile") - # Replace dfp with a fresh fake parser instance we can control - self.updater.dfp = FakeDockerfileParser() # type: ignore[attr-defined] - - @mock.patch( - "dfupdate.load_file_content", - return_value="FROM python:3.10\nENV FOO_VERSION=1.0\nENV BAR_VERSION=2.0 BAR_UPGRADE=false\n", - ) - def test_get_dockerfile_versions_respects_upgrade_flag(self, _m): - # Populate envs by setting content (simulating Dockerfile reading done inside DFUpdater methods) - self.updater.dfp.content = "FROM python:3.10\nENV FOO_VERSION=1.0\nENV BAR_VERSION=2.0 BAR_UPGRADE=false\n" - # Call method; it reads envs from parser + self.tempdir = tempfile.TemporaryDirectory() + self.addCleanup(self.tempdir.cleanup) + self.dockerfile_path = os.path.join(self.tempdir.name, "Dockerfile") + self.nvchecker_path = os.path.join(self.tempdir.name, "new_ver.json") + with open(self.nvchecker_path, "w", encoding="utf8") as fh: + json.dump({}, fh) + self.updater = dfupdate.DFUpdater(self.nvchecker_path, self.dockerfile_path) + + def _write_dockerfile(self, content: str): + with open(self.dockerfile_path, "w", encoding="utf8") as fh: + fh.write(content) + + def test_get_dockerfile_versions_respects_upgrade_flag(self): + self._write_dockerfile( + "FROM python:3.10\nENV FOO_VERSION=1.0\nENV BAR_VERSION=2.0\nENV BAR_UPGRADE=false\n" + ) self.updater.get_dockerfile_versions() - # BAR has upgrade=false and should be skipped self.assertEqual(self.updater.dockerfile_versions, {"FOO": "1.0"}) @mock.patch( @@ -146,123 +114,42 @@ def test_get_nvcheck_json(self, _m): nvj = self.updater.get_nvcheck_json() self.assertEqual(nvj["BASE"]["version"], "3.11") - def test_update_base_when_same(self): - self.updater.dfp.baseimage = "python:3.11" - nvj = {"BASE": {"version": "3.11"}} - with mock.patch.object(dfupdate, "logger") as mlog: - self.updater.update_base(nvj) - mlog.info.assert_any_call("Base image is up to 
date: %s", "3.11") - - def test_update_base_when_different(self): - self.updater.dfp.baseimage = "python:3.10" - nvj = {"BASE": {"version": "3.11"}} - with mock.patch.object(dfupdate, "logger") as mlog: - self.updater.update_base(nvj) - self.assertEqual(self.updater.dfp.baseimage, "python:3.11") - mlog.info.assert_any_call("Base image updated.") - - @mock.patch("dfupdate.atomic_write_file") + def test_update_base_multi_stage(self): + self._write_dockerfile( + "FROM python:3.10 AS builder\n" + "ENV FOO_VERSION=1.0\n" + "FROM alpine:3.18\n" + "ENV BAR_VERSION=2.0\n" + ) + nvj = {"BASE_BUILDER": {"version": "3.11"}, "BASE": {"version": "3.19"}} + self.updater.update_base(nvj) + # Trigger write of base changes + self.updater.check_software({}) + with open(self.dockerfile_path, "r", encoding="utf8") as fh: + content = fh.read() + self.assertIn("FROM python:3.11 AS builder", content) + self.assertIn("FROM alpine:3.19", content) + @mock.patch("dfupdate.get_remote_sha", return_value="deadbeef") - def test_check_software_updates_and_writes(self, msha, matomic): - # Prepare envs for URL/FILENAME/SHA logic - self.updater.dfp.envs = { - "FOO_VERSION": "1.0", - "FOO_URL": "https://example.com/downloads", - "FOO_FILENAME": "foo-1.0-linux.tgz", - "FOO_SHA256": "old", - } - self.updater.dfp.content = "Dockerfile content here" - # Detected software in Dockerfile - self.updater.dockerfile_versions = {"FOO": "1.0"} - # nvchecker says new version + def test_check_software_updates_across_stages(self, _msha): + self._write_dockerfile( + "FROM python:3.10 AS builder\n" + "ENV FOO_VERSION=1.0\n" + "ENV FOO_SHA256=old\n" + "FROM python:3.10\n" + "ENV FOO_VERSION=1.0\n" + "ENV FOO_URL https://example.com\n" + "ENV FOO_FILENAME foo-1.0.tgz\n" + "ENV FOO_SHA256 old\n" + ) nvj = {"FOO": {"version": "2.0"}} - + self.updater.get_dockerfile_versions() self.updater.check_software(nvj) - - # Should have updated envs - self.assertEqual(self.updater.dfp.envs["FOO_VERSION"], "2.0") - self.assertEqual(self.updater.dfp.envs["FOO_SHA256"], "deadbeef") - # Should have written file once - matomic.assert_called_once_with("Dockerfile", "Dockerfile content here") - # URL was constructed with version substituted - full_url_used = msha.call_args.args[0] - self.assertIn("foo-2.0-linux.tgz", full_url_used) - - @mock.patch.object(dfupdate, "atomic_write_file") - @mock.patch.object(dfupdate, "get_remote_sha", return_value=None) - def test_check_software_writes_even_if_sha_missing(self, msha, matomic): - self.updater.dfp.envs = { - "FOO_VERSION": "1.0", - "FOO_URL": "https://example.com", - "FOO_FILENAME": "foo-1.0.tgz", - "FOO_SHA256": "old", - } - self.updater.dfp.content = "x" - self.updater.dockerfile_versions = {"FOO": "1.0"} - nvj = {"FOO": {"version": "2.0"}} - - with mock.patch.object(dfupdate, "logger") as mlog: - self.updater.check_software(nvj) - # Current implementation sets updated True before SHA is known, - # so it still writes the Dockerfile. - matomic.assert_called_once() - mlog.error.assert_any_call("Got empty shasum! 
Skipping %s", "FOO") - - @mock.patch.object(dfupdate, "atomic_write_file") - def test_update_software_rollback_on_unknown_http_error(self, matomic): - # Given: Dockerfile has FOO v1.0 and download metadata - self.updater.dfp.envs = { - "FOO_VERSION": "1.0", - "FOO_URL": "https://example.com/files", - "FOO_FILENAME": "foo-1.0-linux.tgz", - "FOO_SHA256": "oldsha", - } - self.updater.dfp.content = "x" - self.updater.dockerfile_versions = {"FOO": "1.0"} - - # And nvchecker says 2.0 is available - nvj = {"FOO": {"version": "2.0"}} - - # When: fetching SHA raises an HTTPError with no status (Unknown) - class FakeResp: - def __init__(self): - pass - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc, tb): - pass - - def raise_for_status(self): - from requests.exceptions import HTTPError - - # Simulate "unknown" status (response=None) - raise HTTPError(response=None) - - def iter_content(self, chunk_size=1024): - yield b"" - - with mock.patch("requests.get", return_value=FakeResp()): - with mock.patch.object(dfupdate, "logger") as mlog: - self.updater.check_software(nvj) - - # Then: version was rolled back to current (1.0) - self.assertEqual(self.updater.dfp.envs.get("FOO_VERSION"), "1.0") - # SHA not updated - self.assertEqual(self.updater.dfp.envs.get("FOO_SHA256"), "oldsha") - # Dockerfile still written once due to updated flag set prior to SHA fetch - matomic.assert_called_once() - # And we logged an HTTP error with "Unknown" - # Collect error log call args as strings - error_msgs = [] - for call in mlog.error.mock_calls: - _, args, kwargs = call - if args: - error_msgs.append(" ".join(str(a) for a in args)) - self.assertTrue( - any("HTTP error" in m and "Unknown" in m for m in error_msgs) - ) + with open(self.dockerfile_path, "r", encoding="utf8") as fh: + content = fh.read() + self.assertEqual(content.count("FOO_VERSION=2.0"), 2) + self.assertIn("FOO_SHA256=deadbeef", content) + self.assertIn("FOO_SHA256 deadbeef", content) class TestParseArgs(unittest.TestCase): From caba0cec426fdd6a84cb06c6e71c837ed77067ba Mon Sep 17 00:00:00 2001 From: snw35 Date: Mon, 15 Dec 2025 14:38:22 +0000 Subject: [PATCH 2/4] Update version --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 81aca2a..5f708e6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ COPY dfupdate.py /dfupdate.py COPY docker-entrypoint.sh /docker-entrypoint.sh -ENV DFUPDATE_VERSION 2.1.1 +ENV DFUPDATE_VERSION 2.2.1 ENV REQUESTS_VERSION 2.32.5 ENV TENACITY_VERSION 9.1.2 ENV DOCKERFILE_PARSE_VERSION 2.0.1 From 3715ee3c32ea76f397bc56fcd61b2d5fb5785cb4 Mon Sep 17 00:00:00 2001 From: snw35 Date: Mon, 15 Dec 2025 18:31:42 +0000 Subject: [PATCH 3/4] Use own files for debug runs --- .vscode/launch.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index e0473e1..89f1477 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -7,7 +7,7 @@ "request": "launch", "program": "dfupdate.py", "console": "integratedTerminal", - "args": "-d ../reforge/neo/Dockerfile -n ../reforge/neo/new_ver.json" + "args": "-d ./Dockerfile -n ./new_ver.json" } ] } From eaf74e34ad87cf51da60abccd6fc97fed0d35aeb Mon Sep 17 00:00:00 2001 From: snw35 Date: Mon, 15 Dec 2025 18:42:48 +0000 Subject: [PATCH 4/4] Fix typing issues --- dfupdate.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/dfupdate.py b/dfupdate.py index 60ab4a1..118d1c1 100644 --- a/dfupdate.py +++ b/dfupdate.py @@ -14,6 +14,7 @@ 
import tempfile from collections import defaultdict from dataclasses import dataclass +from typing import Sequence, cast import requests from dockerfile_parse import DockerfileParser @@ -121,7 +122,9 @@ def _parse_stages(self) -> list[Stage]: if instruction["instruction"] != "FROM": continue stage_index += 1 - tokens = list(WordSplitter(instruction["value"]).split(dequote=False)) + tokens = cast( + list[str], list(WordSplitter(instruction["value"]).split(dequote=False)) + ) image, alias = image_from(instruction["value"]) image_token_index = self._find_image_token_index(tokens) stages.append( @@ -137,7 +140,7 @@ def _parse_stages(self) -> list[Stage]: ) return stages - def _find_image_token_index(self, tokens: list[str]) -> int | None: + def _find_image_token_index(self, tokens: Sequence[str]) -> int | None: for idx, token in enumerate(tokens): if token.startswith("--"): continue @@ -188,7 +191,9 @@ def _parse_env_instructions(self) -> list[EnvInstruction]: continue if instruction["instruction"] != "ENV": continue - tokens = list(WordSplitter(instruction["value"]).split(dequote=False)) + tokens = cast( + list[str], list(WordSplitter(instruction["value"]).split(dequote=False)) + ) if not tokens: continue entries = self._parse_env_entries(tokens)
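
As a quick sanity check of the multi-stage support added in PATCH 1/4, the sketch below drives `DFUpdater` end to end, modelled on `test_update_base_multi_stage` in the test suite. It is a minimal sketch only: the temporary paths, image tags, and version numbers are illustrative, and it assumes `dfupdate.py` is importable from the working directory.

```python
# Minimal sketch: update both FROM lines of a two-stage Dockerfile.
# Paths, image tags, and versions below are illustrative only.
import json
import os
import tempfile

from dfupdate import DFUpdater

with tempfile.TemporaryDirectory() as tmp:
    dockerfile = os.path.join(tmp, "Dockerfile")
    nvchecker = os.path.join(tmp, "new_ver.json")
    with open(dockerfile, "w", encoding="utf8") as fh:
        # First stage is named "builder"; the final stage is unnamed.
        fh.write("FROM python:3.10 AS builder\nFROM alpine:3.18\n")
    with open(nvchecker, "w", encoding="utf8") as fh:
        # BASE_BUILDER matches the named first stage; BASE matches the final stage.
        json.dump(
            {"BASE_BUILDER": {"version": "3.11"}, "BASE": {"version": "3.19"}}, fh
        )

    # Parse the Dockerfile, compare against the nvchecker output, and
    # rewrite any out-of-date FROM lines in place.
    DFUpdater(nvchecker, dockerfile).update()

    with open(dockerfile, encoding="utf8") as fh:
        # Prints "FROM python:3.11 AS builder" and "FROM alpine:3.19".
        print(fh.read())
```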