From dc333dfc68d2bb39a4e440b8225a3c6e1f1a2434 Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Thu, 2 Oct 2025 07:43:06 -0700 Subject: [PATCH 1/2] Reapply "feat(fsm): add support to announce Windows processes." This reverts commit d53fdd8dda397fd6cf82d7361b00a5575a00e1b1. Signed-off-by: Paulo Vital --- src/instana/fsm.py | 110 +++++++++++++++++++++++++++++++-------------- 1 file changed, 77 insertions(+), 33 deletions(-) diff --git a/src/instana/fsm.py b/src/instana/fsm.py index c4145a5f..725b15a0 100644 --- a/src/instana/fsm.py +++ b/src/instana/fsm.py @@ -8,13 +8,14 @@ import subprocess import sys import threading -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any, Callable, List from fysom import Fysom from instana.log import logger from instana.util import get_default_gateway from instana.util.process_discovery import Discovery +from instana.util.runtime import is_windows from instana.version import VERSION if TYPE_CHECKING: @@ -103,34 +104,18 @@ def lookup_agent_host(self, e: Any) -> bool: return False def announce_sensor(self, e: Any) -> bool: + pid: int = os.getpid() logger.debug( - f"Attempting to make an announcement to the agent on {self.agent.options.agent_host}:{self.agent.options.agent_port}" + f"Attempting to announce PID {pid} to the agent on {self.agent.options.agent_host}:{self.agent.options.agent_port}" ) - pid = os.getpid() - try: - if os.path.isfile("/proc/self/cmdline"): - with open("/proc/self/cmdline") as cmd: - cmdinfo = cmd.read() - cmdline = cmdinfo.split("\x00") - else: - # Python doesn't provide a reliable method to determine what - # the OS process command line may be. Here we are forced to - # rely on ps rather than adding a dependency on something like - # psutil which requires dev packages, gcc etc... 
- proc = subprocess.Popen( - ["ps", "-p", str(pid), "-o", "args"], stdout=subprocess.PIPE - ) - (out, _) = proc.communicate() - parts = out.split(b"\n") - cmdline = [parts[1].decode("utf-8")] - except Exception: - cmdline = sys.argv - logger.debug("announce_sensor", exc_info=True) + cmdline = self._get_cmdline(pid) d = Discovery(pid=self.__get_real_pid(), name=cmdline[0], args=cmdline[1:]) - # If we're on a system with a procfs + # File descriptor (fd) and inode detection on procfs systems. + # Unfortunately this process cannot be isolated in a method since it + # doesn't detect the inode correctly on containers. if os.path.exists("/proc/"): try: # In CentOS 7, some odd things can happen such as: @@ -144,7 +129,9 @@ def announce_sensor(self, e: Any) -> bool: d.fd = sock.fileno() d.inode = os.readlink(path) except: # noqa: E722 - logger.debug("Error generating file descriptor: ", exc_info=True) + logger.debug( + "Error generating file descriptor and inode: ", exc_info=True + ) payload = self.agent.announce(d) @@ -189,28 +176,85 @@ def on_good2go(self, _: Any) -> None: def __get_real_pid(self) -> int: """ Attempts to determine the true process ID by querying the - /proc/<pid>/sched file. This works on systems with a proc filesystem. - Otherwise default to os default. + /proc/<pid>/sched file on Linux systems or using the OS default PID. + For Windows, we use the standard OS PID as there's no equivalent concept + of container PIDs vs host PIDs. 
""" pid = None + # For Linux systems with procfs if os.path.exists("/proc/"): sched_file = f"/proc/{os.getpid()}/sched" if os.path.isfile(sched_file): try: - file = open(sched_file) - line = file.readline() - g = re.search(r"\((\d+),", line) - if g and len(g.groups()) == 1: - pid = int(g.groups()[0]) + with open(sched_file) as file: + line = file.readline() + g = re.search(r"\((\d+),", line) + if g and len(g.groups()) == 1: + pid = int(g.groups()[0]) except Exception: - logger.debug("parsing sched file failed", exc_info=True) + logger.debug("parsing sched file failed: ", exc_info=True) + # For Windows or if Linux method failed if pid is None: pid = os.getpid() return pid + def _get_cmdline_windows(self) -> List[str]: + """ + Get command line using Windows API + """ + import ctypes + from ctypes import wintypes + + GetCommandLineW = ctypes.windll.kernel32.GetCommandLineW + GetCommandLineW.argtypes = [] + GetCommandLineW.restype = wintypes.LPCWSTR + + cmd = GetCommandLineW() + # Simple parsing - this is a basic approach and might need refinement + # for complex command lines with quotes and spaces + return cmd.split() + + def _get_cmdline_linux_proc(self) -> List[str]: + """ + Get command line from Linux /proc filesystem + """ + with open("/proc/self/cmdline") as cmd: + cmdinfo = cmd.read() + return cmdinfo.split("\x00") + + def _get_cmdline_unix_ps(self, pid: int) -> List[str]: + """ + Get command line using ps command (for Unix-like systems without /proc) + """ + proc = subprocess.Popen( + ["ps", "-p", str(pid), "-o", "args"], stdout=subprocess.PIPE + ) + (out, _) = proc.communicate() + parts = out.split(b"\n") + return [parts[1].decode("utf-8")] + + def _get_cmdline_unix(self, pid: int) -> List[str]: + """ + Get command line on Unix-like systems (/proc/self/cmdline if present, else ps) + """ + if os.path.isfile("/proc/self/cmdline"): + return self._get_cmdline_linux_proc() + else: + return self._get_cmdline_unix_ps(pid) -# Made with Bob + def _get_cmdline(self, pid: int) -> List[str]: + """ + Get command line in a 
platform-independent way + """ + try: + if is_windows(): + return self._get_cmdline_windows() + else: + return self._get_cmdline_unix(pid) + except Exception: + logger.debug("Error getting command line: ", exc_info=True) + return sys.argv From 137959690ca9b82657e56c63028bb44c3ffc546a Mon Sep 17 00:00:00 2001 From: Paulo Vital Date: Mon, 12 Jan 2026 17:01:42 +0100 Subject: [PATCH 2/2] test(fsm): Add FSM cmdline-related unit tests. Unit tests for TheMachine cmdline-related methods in fsm.py. This test module provides comprehensive coverage for the command line retrieval functions that work across different platforms (Windows, Linux, Unix). Tested functions: - _get_cmdline_windows(): Retrieves command line on Windows using ctypes - _get_cmdline_linux_proc(): Retrieves command line from /proc/self/cmdline - _get_cmdline_unix_ps(): Retrieves command line using ps command - _get_cmdline_unix(): Dispatches to appropriate Unix method - _get_cmdline(): Main entry point with platform detection and error handling Signed-off-by: Paulo Vital --- tests/test_fsm_cmdline.py | 384 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 384 insertions(+) create mode 100644 tests/test_fsm_cmdline.py diff --git a/tests/test_fsm_cmdline.py b/tests/test_fsm_cmdline.py new file mode 100644 index 00000000..5c9eed99 --- /dev/null +++ b/tests/test_fsm_cmdline.py @@ -0,0 +1,384 @@ +# (c) Copyright IBM Corp. 2025 +""" +Unit tests for TheMachine cmdline-related methods in fsm.py. + +This test module provides comprehensive coverage for the command line retrieval +functions that work across different platforms (Windows, Linux, Unix). 
+ +Tested functions: +- _get_cmdline_windows(): Retrieves command line on Windows using ctypes +- _get_cmdline_linux_proc(): Retrieves command line from /proc/self/cmdline +- _get_cmdline_unix_ps(): Retrieves command line using ps command +- _get_cmdline_unix(): Dispatches to appropriate Unix method +- _get_cmdline(): Main entry point with platform detection and error handling + +""" + +import os +import subprocess +import sys +from typing import Generator +from unittest.mock import Mock, mock_open, patch + +import pytest + +from instana.fsm import TheMachine + + +class TestTheMachineCmdline: + """Test suite for TheMachine cmdline-related methods.""" + + @pytest.fixture(autouse=True) + def _resource(self) -> Generator[None, None, None]: + """Setup and teardown for each test.""" + with patch("instana.fsm.TheMachine.__init__", return_value=None): + self.machine = TheMachine(Mock()) + yield + + @pytest.mark.parametrize( + "cmdline_input,expected_output", + [ + ( + "C:\\Python\\python.exe script.py arg1 arg2", + ["C:\\Python\\python.exe", "script.py", "arg1", "arg2"], + ), + ( + "python.exe -m module --flag value", + ["python.exe", "-m", "module", "--flag", "value"], + ), + ("single_command", ["single_command"]), + ( + "cmd.exe /c echo hello", + ["cmd.exe", "/c", "echo", "hello"], + ), + ], + ids=[ + "full_path_with_args", + "python_module_with_flags", + "single_command", + "cmd_with_subcommand", + ], + ) + def test_get_cmdline_windows( + self, cmdline_input: str, expected_output: list, mocker + ) -> None: + """Test _get_cmdline_windows with various command line formats.""" + mocker.patch( + "ctypes.windll", + create=True, + ) + + with patch("ctypes.windll.kernel32.GetCommandLineW") as mock_get_cmdline: + mock_get_cmdline.return_value = cmdline_input + result = self.machine._get_cmdline_windows() + assert result == expected_output + + def test_get_cmdline_windows_empty_string(self, mocker) -> None: + """Test _get_cmdline_windows with empty command line.""" + 
mocker.patch( + "ctypes.windll", + create=True, + ) + + with patch("ctypes.windll.kernel32.GetCommandLineW") as mock_get_cmdline: + mock_get_cmdline.return_value = "" + result = self.machine._get_cmdline_windows() + assert result == [] + + @pytest.mark.parametrize( + "proc_content,expected_output", + [ + ( + "python\x00script.py\x00arg1\x00arg2\x00", + ["python", "script.py", "arg1", "arg2", ""], + ), + ( + "/usr/bin/python3\x00-m\x00flask\x00run\x00", + ["/usr/bin/python3", "-m", "flask", "run", ""], + ), + ("gunicorn\x00app:app\x00", ["gunicorn", "app:app", ""]), + ("/usr/bin/python\x00", ["/usr/bin/python", ""]), + ( + "python3\x00-c\x00print('hello')\x00", + ["python3", "-c", "print('hello')", ""], + ), + ], + ids=[ + "basic_script_with_args", + "python_module", + "gunicorn_app", + "single_executable", + "python_command", + ], + ) + def test_get_cmdline_linux_proc( + self, proc_content: str, expected_output: list + ) -> None: + """Test _get_cmdline_linux_proc with various /proc/self/cmdline formats.""" + with patch("builtins.open", mock_open(read_data=proc_content)): + result = self.machine._get_cmdline_linux_proc() + assert result == expected_output + + def test_get_cmdline_linux_proc_file_not_found(self) -> None: + """Test _get_cmdline_linux_proc when file doesn't exist.""" + with patch("builtins.open", side_effect=FileNotFoundError()): + with pytest.raises(FileNotFoundError): + self.machine._get_cmdline_linux_proc() + + def test_get_cmdline_linux_proc_permission_error(self) -> None: + """Test _get_cmdline_linux_proc with permission error.""" + with patch("builtins.open", side_effect=PermissionError()): + with pytest.raises(PermissionError): + self.machine._get_cmdline_linux_proc() + + @pytest.mark.parametrize( + "ps_output,expected_output", + [ + ( + b"COMMAND\npython script.py arg1 arg2\n", + ["python script.py arg1 arg2"], + ), + ( + b"COMMAND\n/usr/bin/python3 -m flask run\n", + ["/usr/bin/python3 -m flask run"], + ), + (b"COMMAND\ngunicorn app:app\n", 
["gunicorn app:app"]), + (b"COMMAND\npython\n", ["python"]), + ], + ids=[ + "script_with_args", + "python_module", + "gunicorn", + "single_command", + ], + ) + def test_get_cmdline_unix_ps(self, ps_output: bytes, expected_output: list) -> None: + """Test _get_cmdline_unix_ps with various ps command outputs.""" + mock_proc = Mock() + mock_proc.communicate.return_value = (ps_output, b"") + + with patch("subprocess.Popen", return_value=mock_proc) as mock_popen: + result = self.machine._get_cmdline_unix_ps(1234) + assert result == expected_output + mock_popen.assert_called_once_with( + ["ps", "-p", "1234", "-o", "args"], stdout=subprocess.PIPE + ) + + def test_get_cmdline_unix_ps_with_different_pid(self) -> None: + """Test _get_cmdline_unix_ps with different PID values.""" + mock_proc = Mock() + mock_proc.communicate.return_value = (b"COMMAND\ntest_process\n", b"") + + with patch("subprocess.Popen", return_value=mock_proc) as mock_popen: + result = self.machine._get_cmdline_unix_ps(9999) + assert result == ["test_process"] + mock_popen.assert_called_once_with( + ["ps", "-p", "9999", "-o", "args"], stdout=subprocess.PIPE + ) + + def test_get_cmdline_unix_ps_empty_output(self) -> None: + """Test _get_cmdline_unix_ps with empty ps output.""" + mock_proc = Mock() + mock_proc.communicate.return_value = (b"COMMAND\n\n", b"") + + with patch("subprocess.Popen", return_value=mock_proc): + result = self.machine._get_cmdline_unix_ps(1234) + assert result == [""] + + def test_get_cmdline_unix_ps_subprocess_error(self) -> None: + """Test _get_cmdline_unix_ps when subprocess fails.""" + with patch( + "subprocess.Popen", side_effect=subprocess.SubprocessError("Test error") + ): + with pytest.raises(subprocess.SubprocessError): + self.machine._get_cmdline_unix_ps(1234) + + @pytest.mark.parametrize( + "proc_exists,proc_content,expected_output", + [ + ( + True, + "python\x00script.py\x00", + ["python", "script.py", ""], + ), + ( + False, + None, + ["ps_output"], + ), + ], + 
ids=["proc_exists", "proc_not_exists"], + ) + def test_get_cmdline_unix( + self, proc_exists: bool, proc_content: str, expected_output: list + ) -> None: + """Test _get_cmdline_unix with and without /proc filesystem.""" + with patch("os.path.isfile", return_value=proc_exists): + if proc_exists: + with patch("builtins.open", mock_open(read_data=proc_content)): + result = self.machine._get_cmdline_unix(1234) + assert result == expected_output + else: + mock_proc = Mock() + mock_proc.communicate.return_value = (b"COMMAND\nps_output\n", b"") + with patch("subprocess.Popen", return_value=mock_proc): + result = self.machine._get_cmdline_unix(1234) + assert result == expected_output + + def test_get_cmdline_unix_proc_file_check(self) -> None: + """Test _get_cmdline_unix checks for /proc/self/cmdline correctly.""" + with patch("os.path.isfile") as mock_isfile: + mock_isfile.return_value = True + with patch("builtins.open", mock_open(read_data="test\x00")): + self.machine._get_cmdline_unix(1234) + mock_isfile.assert_called_once_with("/proc/self/cmdline") + + @pytest.mark.parametrize( + "is_windows_value,expected_method", + [ + (True, "_get_cmdline_windows"), + (False, "_get_cmdline_unix"), + ], + ids=["windows", "unix"], + ) + def test_get_cmdline_platform_detection( + self, is_windows_value: bool, expected_method: str + ) -> None: + """Test _get_cmdline correctly detects platform and calls appropriate method.""" + with patch("instana.fsm.is_windows", return_value=is_windows_value): + if is_windows_value: + with patch.object( + self.machine, "_get_cmdline_windows", return_value=["windows_cmd"] + ) as mock_method: + result = self.machine._get_cmdline(1234) + assert result == ["windows_cmd"] + mock_method.assert_called_once() + else: + with patch.object( + self.machine, "_get_cmdline_unix", return_value=["unix_cmd"] + ) as mock_method: + result = self.machine._get_cmdline(1234) + assert result == ["unix_cmd"] + mock_method.assert_called_once_with(1234) + + def 
test_get_cmdline_windows_exception_fallback(self) -> None: + """Test _get_cmdline falls back to sys.argv on Windows exception.""" + with patch("instana.fsm.is_windows", return_value=True), patch.object( + self.machine, "_get_cmdline_windows", side_effect=Exception("Test error") + ), patch("instana.fsm.logger.debug") as mock_logger: + result = self.machine._get_cmdline(1234) + assert result == sys.argv + mock_logger.assert_called_once() + + def test_get_cmdline_unix_exception_fallback(self) -> None: + """Test _get_cmdline falls back to sys.argv on Unix exception.""" + with patch("instana.fsm.is_windows", return_value=False), patch.object( + self.machine, "_get_cmdline_unix", side_effect=Exception("Test error") + ), patch("instana.fsm.logger.debug") as mock_logger: + result = self.machine._get_cmdline(1234) + assert result == sys.argv + mock_logger.assert_called_once() + + @pytest.mark.parametrize( + "exception_type", + [ + OSError, + IOError, + PermissionError, + FileNotFoundError, + RuntimeError, + ], + ids=[ + "OSError", + "IOError", + "PermissionError", + "FileNotFoundError", + "RuntimeError", + ], + ) + def test_get_cmdline_various_exceptions(self, exception_type: type) -> None: + """Test _get_cmdline handles various exception types gracefully.""" + with patch("instana.fsm.is_windows", return_value=False), patch.object( + self.machine, "_get_cmdline_unix", side_effect=exception_type("Test error") + ): + result = self.machine._get_cmdline(1234) + assert result == sys.argv + + def test_get_cmdline_with_actual_pid(self) -> None: + """Test _get_cmdline with actual process ID.""" + current_pid = os.getpid() + with patch("instana.fsm.is_windows", return_value=False), patch.object( + self.machine, "_get_cmdline_unix", return_value=["test_cmd"] + ) as mock_method: + result = self.machine._get_cmdline(current_pid) + assert result == ["test_cmd"] + mock_method.assert_called_once_with(current_pid) + + def test_get_cmdline_windows_with_quotes(self, mocker) -> None: + 
"""Test _get_cmdline_windows handles command lines with quotes.""" + cmdline_with_quotes = '"C:\\Program Files\\Python\\python.exe" "my script.py"' + mocker.patch( + "ctypes.windll", + create=True, + ) + + with patch("ctypes.windll.kernel32.GetCommandLineW") as mock_get_cmdline: + mock_get_cmdline.return_value = cmdline_with_quotes + result = self.machine._get_cmdline_windows() + # Note: Simple split() doesn't handle quotes properly, this tests current behavior + assert isinstance(result, list) + assert len(result) > 0 + + def test_get_cmdline_linux_proc_with_empty_args(self) -> None: + """Test _get_cmdline_linux_proc with command that has empty arguments.""" + proc_content = "python\x00\x00\x00" + with patch("builtins.open", mock_open(read_data=proc_content)): + result = self.machine._get_cmdline_linux_proc() + assert result == ["python", "", "", ""] + + def test_get_cmdline_unix_ps_with_multiline_output(self) -> None: + """Test _get_cmdline_unix_ps handles multiline ps output correctly.""" + ps_output = b"COMMAND\npython script.py\nextra line\n" + mock_proc = Mock() + mock_proc.communicate.return_value = (ps_output, b"") + + with patch("subprocess.Popen", return_value=mock_proc): + result = self.machine._get_cmdline_unix_ps(1234) + # Should only take the second line (index 1) + assert result == ["python script.py"] + + def test_get_cmdline_unix_ps_with_special_characters(self) -> None: + """Test _get_cmdline_unix_ps with special characters in command.""" + ps_output = b"COMMAND\npython -c 'print(\"hello\")'\n" + mock_proc = Mock() + mock_proc.communicate.return_value = (ps_output, b"") + + with patch("subprocess.Popen", return_value=mock_proc): + result = self.machine._get_cmdline_unix_ps(1234) + assert result == ["python -c 'print(\"hello\")'"] + + def test_get_cmdline_linux_proc_with_unicode(self) -> None: + """Test _get_cmdline_linux_proc with unicode characters.""" + proc_content = "python\x00script_café.py\x00" + with patch("builtins.open", 
mock_open(read_data=proc_content)): + result = self.machine._get_cmdline_linux_proc() + assert "script_café.py" in result + + @pytest.mark.parametrize( + "pid_value", + [1, 100, 9999, 65535], + ids=["pid_1", "pid_100", "pid_9999", "pid_max"], + ) + def test_get_cmdline_unix_ps_with_various_pids(self, pid_value: int) -> None: + """Test _get_cmdline_unix_ps with various PID values.""" + mock_proc = Mock() + mock_proc.communicate.return_value = (b"COMMAND\ntest\n", b"") + + with patch("subprocess.Popen", return_value=mock_proc) as mock_popen: + self.machine._get_cmdline_unix_ps(pid_value) + mock_popen.assert_called_once_with( + ["ps", "-p", str(pid_value), "-o", "args"], stdout=subprocess.PIPE + ) + + +# Made with Bob