diff --git a/src/instana/__init__.py b/src/instana/__init__.py index 5d66246e..dd55ee49 100644 --- a/src/instana/__init__.py +++ b/src/instana/__init__.py @@ -83,7 +83,9 @@ def key_to_bool(k: str) -> bool: import inspect all_accepted_patch_all_args = inspect.getfullargspec(monkey.patch_all)[0] - provided_options = provided_options.replace(" ", "").replace("--", "").split(",") + provided_options = ( + provided_options.replace(" ", "").replace("--", "").split(",") + ) provided_options = [ k for k in provided_options if short_key(k) in all_accepted_patch_all_args @@ -210,11 +212,6 @@ def boot_agent() -> None: server as tornado_server, # noqa: F401 ) - # Hooks - from instana.hooks import ( - hook_gunicorn, # noqa: F401 - ) - def _start_profiler() -> None: """Start the Instana Auto Profile.""" diff --git a/src/instana/agent/host.py b/src/instana/agent/host.py index 9ecc74ca..5af9962d 100644 --- a/src/instana/agent/host.py +++ b/src/instana/agent/host.py @@ -83,6 +83,8 @@ def handle_fork(self) -> None: """ Forks happen. Here we handle them. 
""" + # Update boot PID to current PID to prevent duplicate fork detection + self._boot_pid = os.getpid() # Reset the Agent self.reset() @@ -457,9 +459,13 @@ def diagnostics(self) -> None: logger.warning( f"is_collector_thread_running?: {self.collector.is_reporting_thread_running()}" ) - logger.warning( - f"background_report_lock.locked?: {self.collector.background_report_lock.locked()}" + # RLock doesn't have a locked() method, so we check by trying to acquire + lock_acquired = self.collector.background_report_lock.acquire( + blocking=False ) + if lock_acquired: + self.collector.background_report_lock.release() + logger.warning(f"background_report_lock.locked?: {not lock_acquired}") logger.warning(f"ready_to_start: {self.collector.ready_to_start}") logger.warning(f"reporting_thread: {self.collector.reporting_thread}") logger.warning(f"report_interval: {self.collector.report_interval}") diff --git a/src/instana/collector/base.py b/src/instana/collector/base.py index 23c410b3..35dc9c19 100644 --- a/src/instana/collector/base.py +++ b/src/instana/collector/base.py @@ -12,7 +12,7 @@ from typing import TYPE_CHECKING, Any, DefaultDict, Dict, List, Type from instana.log import logger -from instana.util import DictionaryOfStan +from instana.util import DictionaryOfStan, get_lock if TYPE_CHECKING: from instana.agent.base import BaseAgent @@ -55,7 +55,7 @@ def __init__(self, agent: Type["BaseAgent"]) -> None: # Lock used synchronize reporting - no updates when sending # Used by the background reporting thread. Used to synchronize report attempts and so # that we never have two in progress at once. 
- self.background_report_lock = threading.Lock() + self.background_report_lock = get_lock() # Reporting interval for the background thread(s) self.report_interval = 1 @@ -70,9 +70,12 @@ def is_reporting_thread_running(self) -> bool: """ Indicates if there is a thread running with the name self.THREAD_NAME """ - for thread in threading.enumerate(): - if thread.name == self.THREAD_NAME: - return True + if ( + self.started + and self.reporting_thread is not None + and self.reporting_thread.is_alive() + ): + return True return False def start(self) -> None: @@ -80,6 +83,8 @@ def start(self) -> None: Starts the collector and starts reporting as long as the agent is in a ready state. @return: None """ + # Check if we already have a valid running thread: is_reporting_thread_running() uses + # is_alive() on the tracked thread, which is more reliable after fork than matching thread names if self.is_reporting_thread_running(): if self.thread_shutdown.is_set(): # Force a restart. @@ -93,6 +98,7 @@ def start(self) -> None: logger.debug( f"BaseCollector.start non-fatal: call but thread already running (started: {self.started})" ) + return if self.agent.can_send(): logger.debug("BaseCollector.start: launching collection thread") @@ -120,6 +126,8 @@ def shutdown(self, report_final: bool = True) -> None: logger.debug("Collector.shutdown: Reporting final data.") self.prepare_and_report_data() self.started = False + # Clear the thread reference to ensure clean restart after fork + self.reporting_thread = None def background_report(self) -> None: """ diff --git a/src/instana/collector/host.py b/src/instana/collector/host.py index dfb2aacd..41d3e81d 100644 --- a/src/instana/collector/host.py +++ b/src/instana/collector/host.py @@ -6,7 +6,7 @@ """ from time import time -from typing import DefaultDict, Any +from typing import Any, DefaultDict from instana.collector.base import BaseCollector from instana.collector.helpers.runtime import RuntimeHelper @@ -43,19 +43,20 @@ def prepare_and_report_data(self) -> None: state machine case. 
""" try: - if self.agent.machine.fsm.current == "wait4init": + with self.agent.machine._lock: + current_state = self.agent.machine.fsm.current + + if current_state == "wait4init": # Test the host agent if we're ready to send data if self.agent.is_agent_ready(): - if self.agent.machine.fsm.current != "good2go": - logger.debug("Agent is ready. Getting to work.") - self.agent.machine.fsm.ready() + with self.agent.machine._lock: + if self.agent.machine.fsm.current != "good2go": + logger.debug("Agent is ready. Getting to work.") + self.agent.machine.fsm.ready() else: return - if ( - self.agent.machine.fsm.current == "good2go" - and self.agent.is_timed_out() - ): + if current_state == "good2go" and self.agent.is_timed_out(): logger.info( "The Instana host agent has gone offline or is no longer reachable for > 1 min. Will retry periodically." ) diff --git a/src/instana/fsm.py b/src/instana/fsm.py index c4145a5f..912af2df 100644 --- a/src/instana/fsm.py +++ b/src/instana/fsm.py @@ -13,7 +13,7 @@ from fysom import Fysom from instana.log import logger -from instana.util import get_default_gateway +from instana.util import get_default_gateway, get_lock from instana.util.process_discovery import Discovery from instana.version import VERSION @@ -25,14 +25,16 @@ class TheMachine: RETRY_PERIOD = 30 THREAD_NAME = "Instana Machine" - warnedPeriodic = False - def __init__(self, agent: "HostAgent") -> None: logger.debug("Initializing host agent state machine") + self._lock = get_lock() + self._warned_periodic = False + self.agent = agent self.fsm = Fysom( { + "initial": "*", "events": [ ("lookup", "*", "found"), ("announce", "found", "announced"), @@ -41,7 +43,7 @@ def __init__(self, agent: "HostAgent") -> None: ], "callbacks": { # Can add the following to debug - # "onchangestate": self.print_state_change, + # "onchangestate": self.print_state_change, "onlookup": self.lookup_agent_host, "onannounce": self.announce_sensor, "onpending": self.on_ready, @@ -50,10 +52,11 @@ def 
__init__(self, agent: "HostAgent") -> None: } ) - self.timer = threading.Timer(1, self.fsm.lookup) - self.timer.daemon = True - self.timer.name = self.THREAD_NAME - self.timer.start() + with self._lock: + self.timer = threading.Timer(1, self._safe_fsm_lookup) + self.timer.daemon = True + self.timer.name = self.THREAD_NAME + self.timer.start() @staticmethod def print_state_change(e: Any) -> None: @@ -61,6 +64,21 @@ def print_state_change(e: Any) -> None: f"========= ({os.getpid()}#{threading.current_thread().name}) FSM event: {e.event}, src: {e.src}, dst: {e.dst} ==========" ) + def _safe_fsm_lookup(self) -> None: + """Thread-safe wrapper for FSM lookup.""" + with self._lock: + self.fsm.lookup() + + def _safe_fsm_announce(self) -> None: + """Thread-safe wrapper for FSM announce.""" + with self._lock: + self.fsm.announce() + + def _safe_fsm_pending(self) -> None: + """Thread-safe wrapper for FSM pending.""" + with self._lock: + self.fsm.pending() + def reset(self) -> None: """ reset is called to start from scratch in a process. It may be called on first boot or @@ -72,14 +90,15 @@ def reset(self) -> None: :return: void """ logger.debug("State machine being reset. Will start a new announce cycle.") - self.fsm.lookup() + with self._lock: + self.fsm.lookup() def lookup_agent_host(self, e: Any) -> bool: host = self.agent.options.agent_host port = self.agent.options.agent_port if self.agent.is_agent_listening(host, port): - self.fsm.announce() + self._safe_fsm_announce() return True if os.path.exists("/proc/"): @@ -88,14 +107,15 @@ def lookup_agent_host(self, e: Any) -> bool: if self.agent.is_agent_listening(host, port): self.agent.options.agent_host = host self.agent.options.agent_port = port - self.fsm.announce() + self._safe_fsm_announce() return True - if self.warnedPeriodic is False: - logger.info( - "Instana Host Agent couldn't be found. Will retry periodically..." 
- ) - self.warnedPeriodic = True + with self._lock: + if self._warned_periodic is False: + logger.info( + "Instana Host Agent couldn't be found. Will retry periodically..." + ) + self._warned_periodic = True self.schedule_retry( self.lookup_agent_host, e, f"{self.THREAD_NAME}: agent_lookup" @@ -156,17 +176,18 @@ def announce_sensor(self, e: Any) -> bool: return False self.agent.set_from(payload) - self.fsm.pending() + self._safe_fsm_pending() logger.debug( f"Announced PID: {pid} (true PID: {self.agent.announce_data.pid}). Waiting for Agent Ready..." ) return True def schedule_retry(self, fun: Callable, e: Any, name: str) -> None: - self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e]) - self.timer.daemon = True - self.timer.name = name - self.timer.start() + with self._lock: + self.timer = threading.Timer(self.RETRY_PERIOD, fun, [e]) + self.timer.daemon = True + self.timer.name = name + self.timer.start() def on_ready(self, _: Any) -> None: self.agent.start() diff --git a/src/instana/hooks/__init__.py b/src/instana/hooks/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/src/instana/hooks/hook_gunicorn.py b/src/instana/hooks/hook_gunicorn.py deleted file mode 100644 index e3fb8086..00000000 --- a/src/instana/hooks/hook_gunicorn.py +++ /dev/null @@ -1,25 +0,0 @@ -# (c) Copyright IBM Corp. 2021 -# (c) Copyright Instana Inc. 
2019 - -try: - from instana.log import logger - from instana.singletons import agent - - import gunicorn - from gunicorn.arbiter import Arbiter - from gunicorn.config import Config - from gunicorn.workers.sync import SyncWorker - - def pre_fork(config: Config, server: Arbiter, worker: SyncWorker) -> None: - """This is our gunicorn hook to detect and act when worker processes are forked off.""" - logger.debug("Handling gunicorn fork...") - agent.handle_fork() - - Config.pre_fork = pre_fork - - logger.debug("Gunicorn pre-fork hook applied") -except ImportError: - logger.debug( - "gunicorn hooks: decorators not available: likely not running under gunicorn" - ) - pass diff --git a/src/instana/util/__init__.py b/src/instana/util/__init__.py index b8b44365..044c6e5f 100644 --- a/src/instana/util/__init__.py +++ b/src/instana/util/__init__.py @@ -3,12 +3,40 @@ import importlib.metadata import json +import threading from collections import defaultdict -from typing import Any, DefaultDict +from typing import Any, DefaultDict, Union from urllib import parse from instana.log import logger +try: + import gevent.lock + + GeventRLock = gevent.lock.RLock +except ImportError: + GeventRLock = None + + +def get_lock() -> Union[threading.RLock, Any]: + """ + Get an appropriate lock for the current environment. + Returns a gevent RLock if gevent has monkey-patched threading, otherwise a threading.RLock. 
+ """ + try: + # Check if gevent is active by looking for monkey-patched threading + import gevent.monkey + + if gevent.monkey.is_module_patched("threading"): + from gevent.lock import RLock + + return RLock() + except (ImportError, AttributeError): + pass + + # Default to threading.RLock for regular threads + return threading.RLock() + def nested_dictionary() -> DefaultDict[str, Any]: return defaultdict(DictionaryOfStan) diff --git a/tests/collector/test_base_collector.py b/tests/collector/test_base_collector.py index dad090b6..4639fb5a 100644 --- a/tests/collector/test_base_collector.py +++ b/tests/collector/test_base_collector.py @@ -54,6 +54,9 @@ def reporting_function(): name=self.collector.THREAD_NAME, target=reporting_function ) sample_thread.start() + # Set the required state for is_reporting_thread_running to return True + self.collector.started = True + self.collector.reporting_thread = sample_thread try: assert self.collector.is_reporting_thread_running() finally: