diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index 684d136033..4bee60fafd 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -12,4 +12,4 @@ If this is your first time contributing, please read the Pull Request Walkthroug - [ ] Follows the [Pulp policy on AI Usage](https://pulpproject.org/help/more/governance/ai_policy/) - [ ] (For new features) - User documentation and test coverage has been added -See: [Pull Request Walkthrough](https://pulpproject.org/pulpcore/docs/dev/guides/pull-request-walkthrough/) \ No newline at end of file +See: [Pull Request Walkthrough](https://pulpproject.org/pulpcore/docs/dev/guides/pull-request-walkthrough/) diff --git a/.github/workflows/scripts/install.sh b/.github/workflows/scripts/install.sh index e4914dae98..47a5567701 100755 --- a/.github/workflows/scripts/install.sh +++ b/.github/workflows/scripts/install.sh @@ -122,7 +122,7 @@ if [ "$TEST" = "azure" ]; then - ./azurite:/etc/pulp\ command: "azurite-blob --skipApiVersionCheck --blobHost 0.0.0.0"' vars/main.yaml sed -i -e '$a azure_test: true\ -pulp_scenario_settings: {"MEDIA_ROOT": "", "STORAGES": {"default": {"BACKEND": "storages.backends.azure_storage.AzureStorage", "OPTIONS": {"account_key": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "account_name": "devstoreaccount1", "azure_container": "pulp-test", "connection_string": "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://ci-azurite:10000/devstoreaccount1;", "expiration_secs": 120, "location": "pulp3", "overwrite_files": true}}, "staticfiles": {"BACKEND": "django.contrib.staticfiles.storage.StaticFilesStorage"}}, "api_root_rewrite_header": "X-API-Root", "content_origin": null, "domain_enabled": true, "rest_framework__default_authentication_classes": "@merge pulpcore.app.authentication.PulpRemoteUserAuthentication", "rest_framework__default_permission_classes": ["pulpcore.plugin.access_policy.DefaultAccessPolicy"], "task_diagnostics": ["memory"]}\ +pulp_scenario_settings: {"MEDIA_ROOT": "", "STORAGES": {"default": {"BACKEND": "storages.backends.azure_storage.AzureStorage", "OPTIONS": {"account_key": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "account_name": "devstoreaccount1", "azure_container": "pulp-test", "connection_string": "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://ci-azurite:10000/devstoreaccount1;", "expiration_secs": 120, "location": "pulp3", "overwrite_files": true}}, "staticfiles": {"BACKEND": "django.contrib.staticfiles.storage.StaticFilesStorage"}}, "api_root_rewrite_header": "X-API-Root", "content_origin": null, "domain_enabled": true, "rest_framework__default_authentication_classes": "@merge pulpcore.app.authentication.PulpRemoteUserAuthentication", "rest_framework__default_permission_classes": ["pulpcore.plugin.access_policy.DefaultAccessPolicy"], "task_diagnostics": ["memory"], "worker_type": "redis"}\ pulp_scenario_env: {}\ ' vars/main.yaml fi diff --git a/CHANGES/7210.feature b/CHANGES/7210.feature new file mode 100644 index 0000000000..7abad74e93 --- /dev/null +++ b/CHANGES/7210.feature @@ -0,0 +1 @@ +Added WORKER_TYPE setting. Defaults to 'pulpcore'. 'redis' is also available. 
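A minimal sketch of how an operator might opt into the new worker, assuming a dynaconf-style Pulp settings file; the settings path and the Redis connection value shown here are illustrative assumptions, not part of this change:

```python
# /etc/pulp/settings.py -- path varies by deployment (assumption for illustration)
WORKER_TYPE = "redis"                    # default is "pulpcore"
REDIS_URL = "redis://localhost:6379/0"   # the Redis worker needs a reachable Redis server
```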
diff --git a/docs/admin/reference/settings.md b/docs/admin/reference/settings.md index f6b2b9afd4..10cee742bc 100644 --- a/docs/admin/reference/settings.md +++ b/docs/admin/reference/settings.md @@ -405,6 +405,20 @@ The number of seconds before a worker should be considered lost. Defaults to `30` seconds. +### WORKER\_TYPE + +The worker implementation to use for task execution. + +Available options: + +- `"pulpcore"` (default): Uses PostgreSQL advisory locks for task coordination. This is the traditional worker implementation. +- `"redis"`: Uses Redis distributed locks for task coordination. This implementation produces less load on the DB. + +!!! note + The Redis worker requires a Redis server to be configured and accessible. + +Defaults to `"pulpcore"`. + ### WORKING\_DIRECTORY The directory used by workers to stage files temporarily. diff --git a/pulpcore/app/models/task.py b/pulpcore/app/models/task.py index 75d05682d8..476d10dec4 100644 --- a/pulpcore/app/models/task.py +++ b/pulpcore/app/models/task.py @@ -5,6 +5,7 @@ import logging from gettext import gettext as _ +from django.conf import settings from django.contrib.postgres.fields import ArrayField, HStoreField from django.contrib.postgres.indexes import GinIndex from django.core.serializers.json import DjangoJSONEncoder @@ -205,13 +206,18 @@ def set_running(self): This updates the :attr:`started_at` and sets the :attr:`state` to :attr:`RUNNING`. """ started_at = timezone.now() - rows = Task.objects.filter( - pk=self.pk, - state=TASK_STATES.WAITING, - app_lock=AppStatus.objects.current(), - ).update( + filter_kwargs = { + "pk": self.pk, + "state": TASK_STATES.WAITING, + } + # Only check app_lock for PulpcoreWorker, not RedisWorker + if settings.WORKER_TYPE != "redis": + filter_kwargs["app_lock"] = AppStatus.objects.current() + + rows = Task.objects.filter(**filter_kwargs).update( state=TASK_STATES.RUNNING, started_at=started_at, + app_lock=AppStatus.objects.current(), ) if rows == 1: self.state = TASK_STATES.RUNNING diff --git a/pulpcore/app/settings.py b/pulpcore/app/settings.py index fd9084fa46..2c2c9be586 100644 --- a/pulpcore/app/settings.py +++ b/pulpcore/app/settings.py @@ -293,6 +293,10 @@ CONTENT_APP_TTL = 30 WORKER_TTL = 30 +# Worker implementation type +# Options: "pulpcore" (default, PostgreSQL advisory locks) or "redis" (Redis distributed locks) +WORKER_TYPE = "pulpcore" + # Seconds for a task to finish on semi graceful worker shutdown (approx) # On SIGHUP, SIGTERM the currently running task will be awaited forever. # On SIGINT, this value represents the time before the worker will attempt to kill the subprocess. diff --git a/pulpcore/app/tasks/test.py b/pulpcore/app/tasks/test.py index c52b9dc2a7..7c0616ae88 100644 --- a/pulpcore/app/tasks/test.py +++ b/pulpcore/app/tasks/test.py @@ -1,5 +1,7 @@ import asyncio import backoff +import os +import signal import time from pulpcore.app.models import TaskGroup from pulpcore.tasking.tasks import dispatch @@ -32,3 +34,44 @@ def dummy_group_task(inbetween=3, intervals=None): for interval in intervals: dispatch(sleep, args=(interval,), task_group=task_group) time.sleep(inbetween) + + +def missing_worker(): + """ + Simulates a worker crash by sending SIGKILL to parent process and itself. + + This task is used for testing worker cleanup behavior when a worker + unexpectedly dies while executing a task. 
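+
+    Example (illustrative; dispatched by its dotted path like any other Pulp task)::
+
+        dispatch("pulpcore.app.tasks.test.missing_worker")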
+ """ + parent_pid = os.getppid() + current_pid = os.getpid() + + # Kill parent process (the worker) + os.kill(parent_pid, signal.SIGKILL) + + # Kill current process (the task subprocess) + os.kill(current_pid, signal.SIGKILL) + + +def failing_task(error_message="Task intentionally failed"): + """ + A task that always raises a RuntimeError. + + This task is used for testing error handling in worker task execution. + + Args: + error_message (str): The error message to include in the RuntimeError + """ + raise RuntimeError(error_message) + + +async def afailing_task(error_message="Task intentionally failed"): + """ + An async task that always raises a RuntimeError. + + This task is used for testing error handling in immediate task execution. + + Args: + error_message (str): The error message to include in the RuntimeError + """ + raise RuntimeError(error_message) diff --git a/pulpcore/app/viewsets/task.py b/pulpcore/app/viewsets/task.py index a3f2f6d567..a13a36911a 100644 --- a/pulpcore/app/viewsets/task.py +++ b/pulpcore/app/viewsets/task.py @@ -1,5 +1,6 @@ from gettext import gettext as _ +from django.conf import settings from django.db import transaction from django.db.models import Prefetch from django_filters.rest_framework import filters @@ -42,7 +43,7 @@ CreatedResourcesFilter, ) from pulpcore.constants import TASK_INCOMPLETE_STATES, TASK_STATES -from pulpcore.tasking.tasks import dispatch, cancel_task, cancel_task_group +from pulpcore.tasking.tasks import dispatch from pulpcore.app.role_util import get_objects_for_user @@ -227,7 +228,17 @@ def partial_update(self, request, pk=None, partial=True): serializer.is_valid(raise_exception=True) task = self.get_object() - task = cancel_task(task.pk) + + # Call the appropriate cancel_task function based on worker type + if settings.WORKER_TYPE == "redis": + from pulpcore.tasking.redis_tasks import cancel_task + + task = cancel_task(task.pk) + else: + from pulpcore.tasking.tasks import cancel_task + + task = cancel_task(task.pk) + # Check whether task is actually canceled http_status = ( None @@ -346,7 +357,16 @@ def partial_update(self, request, pk=None, partial=True): ).count() ): raise PermissionDenied() - task_group = cancel_task_group(task_group.pk) + + # Call the appropriate cancel_task_group function based on worker type + if settings.WORKER_TYPE == "redis": + from pulpcore.tasking.redis_tasks import cancel_task_group + + task_group = cancel_task_group(task_group.pk) + else: + from pulpcore.tasking.tasks import cancel_task_group + + task_group = cancel_task_group(task_group.pk) # Check whether task group is actually canceled serializer = TaskGroupSerializer(task_group, context={"request": request}) task_statuses = ( diff --git a/pulpcore/pytest_plugin.py b/pulpcore/pytest_plugin.py index 98a49306f0..efaee4cc40 100644 --- a/pulpcore/pytest_plugin.py +++ b/pulpcore/pytest_plugin.py @@ -56,6 +56,14 @@ def pytest_collection_modifyitems(config, items): if "nightly" in item.keywords: item.add_marker(skip_nightly) + # Skip long_running tests if --timeout is below 600 + timeout = config.getoption("--timeout", default=None) + if timeout is None or float(timeout) < 600: + skip_long = pytest.mark.skip(reason="needs --timeout >= 600 to run") + for item in items: + if "long_running" in item.keywords: + item.add_marker(skip_long) + class PulpTaskTimeoutError(Exception): """Exception to describe task and taskgroup timeout errors.""" @@ -82,6 +90,10 @@ def pytest_configure(config): "markers", "nightly: marks tests as intended to run during the nightly 
CI run", ) + config.addinivalue_line( + "markers", + "long_running: marks tests that need a long pytest timeout (--timeout >= 600)", + ) @pytest.fixture(scope="session") diff --git a/pulpcore/tasking/_util.py b/pulpcore/tasking/_util.py index 7274d7cb8c..e1e17acc38 100644 --- a/pulpcore/tasking/_util.py +++ b/pulpcore/tasking/_util.py @@ -23,7 +23,14 @@ configure_periodic_telemetry, ) from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES -from pulpcore.tasking.tasks import dispatch, execute_task +from pulpcore.tasking.tasks import dispatch + +# Conditionally import execute_task based on WORKER_TYPE +if settings.WORKER_TYPE == "redis": + from pulpcore.tasking.redis_tasks import execute_task +else: + from pulpcore.tasking.tasks import execute_task + _logger = logging.getLogger(__name__) diff --git a/pulpcore/tasking/entrypoint.py b/pulpcore/tasking/entrypoint.py index fdc9b0de84..210291d462 100644 --- a/pulpcore/tasking/entrypoint.py +++ b/pulpcore/tasking/entrypoint.py @@ -10,6 +10,7 @@ from django.conf import settings # noqa: E402: module level not at top from pulpcore.tasking.worker import PulpcoreWorker # noqa: E402: module level not at top +from pulpcore.tasking.redis_worker import RedisWorker # noqa: E402: module level not at top _logger = logging.getLogger(__name__) @@ -59,6 +60,16 @@ def worker( if name_template: settings.set("WORKER_NAME_TEMPLATE", name_template) - _logger.info("Starting distributed type worker") + worker_type = settings.WORKER_TYPE + _logger.info("Starting %s worker", worker_type) - PulpcoreWorker(auxiliary=auxiliary).run(burst=burst) + if worker_type == "redis": + if auxiliary: + _logger.warning( + "RedisWorker does not support auxiliary mode, ignoring --auxiliary flag" + ) + RedisWorker().run(burst=burst) + elif worker_type == "pulpcore": + PulpcoreWorker(auxiliary=auxiliary).run(burst=burst) + else: + raise ValueError(f"Invalid WORKER_TYPE: {worker_type}. Must be 'pulpcore' or 'redis'.") diff --git a/pulpcore/tasking/redis_locks.py b/pulpcore/tasking/redis_locks.py new file mode 100644 index 0000000000..8bc1fb1007 --- /dev/null +++ b/pulpcore/tasking/redis_locks.py @@ -0,0 +1,476 @@ +""" +Redis distributed lock utilities for task resource coordination. + +This module provides functions and Lua scripts for managing exclusive and shared +resource locks using Redis. 
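+
+Illustrative flow (the worker name and resource string are assumptions)::
+
+    conn = get_redis_connection()
+    task_key = get_task_lock_key(task.pk)
+    blocked = acquire_locks(conn, "worker-1", task_key, ["prn:core.repository:0123"], [])
+    if not blocked:
+        ...  # run the task, then release with the same owner and keys
+        release_resource_locks(conn, "worker-1", task_key, ["prn:core.repository:0123"], [])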
+""" + +import logging +from asgiref.sync import sync_to_async + +from pulpcore.app.redis_connection import get_redis_connection + + +_logger = logging.getLogger(__name__) + +# Redis key prefix for resource locks +REDIS_LOCK_PREFIX = "pulp:resource_lock:" + +REDIS_ACQUIRE_LOCKS_SCRIPT = """ +-- KEYS[1]: task_lock_key +-- KEYS[2...]: exclusive_lock_keys, then shared_lock_keys +-- ARGV[1]: lock_owner (worker name) +-- ARGV[2]: number of exclusive resources +-- ARGV[3...]: exclusive resource names, then shared resource names (for error reporting) +-- Returns: empty table if success, table of blocked resource names if failed + +local task_lock_key = KEYS[1] +local lock_owner = ARGV[1] +local num_exclusive = tonumber(ARGV[2]) +local blocked_resources = {} + +-- Check task lock first (fail fast) +if redis.call("exists", task_lock_key) == 1 then + table.insert(blocked_resources, "__task_lock__") + return blocked_resources +end + +-- Check exclusive resource locks +-- Resource keys start at KEYS[2] +for i = 1, num_exclusive do + local key = KEYS[1 + i] + local resource_name = ARGV[2 + i] + + -- Check if lock exists + if redis.call("exists", key) == 1 then + -- Lock already held, add to blocked list + table.insert(blocked_resources, resource_name) + end +end + +-- If any exclusive locks were blocked, don't proceed +if #blocked_resources > 0 then + return blocked_resources +end + +-- Check shared resources - ensure no exclusive locks exist +-- Shared resource keys start at KEYS[2 + num_exclusive] +for i = num_exclusive + 1, #KEYS - 1 do + local key = KEYS[1 + i] + local shared_resource_name = ARGV[2 + i] + + -- Check if there's an exclusive lock (string value) + local lock_type = redis.call("type", key) + if lock_type["ok"] == "string" then + -- Exclusive lock exists on a shared resource we need + table.insert(blocked_resources, shared_resource_name) + end +end + +-- If any shared resources are blocked by exclusive locks, fail +if #blocked_resources > 0 then + return blocked_resources +end + +-- All checks passed, acquire ALL locks atomically +-- Acquire task lock (no expiration - will be deleted explicitly on completion) +redis.call("set", task_lock_key, lock_owner) + +-- Acquire exclusive resource locks +for i = 1, num_exclusive do + local key = KEYS[1 + i] + redis.call("set", key, lock_owner) +end + +-- Acquire shared resource locks +for i = num_exclusive + 1, #KEYS - 1 do + local key = KEYS[1 + i] + redis.call("sadd", key, lock_owner) +end + +-- Return empty table to indicate success +return {} +""" + +REDIS_RELEASE_LOCKS_SCRIPT = """ +-- KEYS[1]: task_lock_key +-- KEYS[2...]: exclusive_lock_keys, shared_lock_keys +-- ARGV[1]: lock_owner +-- ARGV[2]: number of exclusive resources +-- ARGV[3...]: resource names for error reporting +-- Returns: {not_owned_exclusive, not_in_shared, task_lock_not_owned} + +local task_lock_key = KEYS[1] +local lock_owner = ARGV[1] +local num_exclusive = tonumber(ARGV[2]) +local not_owned_exclusive = {} +local not_in_shared = {} +local task_lock_not_owned = false + +-- Release exclusive locks +-- Resource keys start at KEYS[2] +for i = 1, num_exclusive do + local key = KEYS[1 + i] + local resource_name = ARGV[2 + i] + + -- Check if we own the lock + local current_owner = redis.call("get", key) + if current_owner == lock_owner then + redis.call("del", key) + elseif current_owner ~= false then + -- Lock exists but we don't own it + table.insert(not_owned_exclusive, resource_name) + end + -- If current_owner is false (nil), lock doesn't exist - already released +end + 
+-- Release shared locks +-- Shared keys start at KEYS[2 + num_exclusive] +for i = num_exclusive + 1, #KEYS - 1 do + local key = KEYS[1 + i] + local resource_name = ARGV[2 + i] + + -- Remove from set + local removed = redis.call("srem", key, lock_owner) + if removed == 0 then + -- We weren't in the set + table.insert(not_in_shared, resource_name) + end +end + +-- Release task lock +local task_lock_owner = redis.call("get", task_lock_key) +if task_lock_owner == lock_owner then + redis.call("del", task_lock_key) +elseif task_lock_owner ~= false then + -- Task lock exists but we don't own it + task_lock_not_owned = true +end + +return {not_owned_exclusive, not_in_shared, task_lock_not_owned} +""" + + +def resource_to_lock_key(resource_name): + """ + Convert a resource name to a Redis lock key. + + Args: + resource_name (str): The resource name (e.g., "prn:rpm.repository:abc123") + + Returns: + str: A Redis key for the resource lock + """ + return f"{REDIS_LOCK_PREFIX}{resource_name}" + + +def get_task_lock_key(task_id): + """ + Get the Redis lock key for a task. + + Args: + task_id: The task ID (task.pk or UUID string) + + Returns: + str: A Redis key for the task lock + """ + return f"task:{task_id}" + + +def extract_task_resources(task): + """ + Extract exclusive and shared resources from a task. + + Args: + task: Task object with reserved_resources_record field + + Returns: + tuple: (exclusive_resources, shared_resources) + exclusive_resources: List of exclusive resource names + shared_resources: List of shared resource names (with "shared:" prefix stripped) + """ + reserved_resources_record = task.reserved_resources_record or [] + + exclusive_resources = [ + resource for resource in reserved_resources_record if not resource.startswith("shared:") + ] + + shared_resources = [ + resource[7:] # Remove "shared:" prefix + for resource in reserved_resources_record + if resource.startswith("shared:") + ] + + return exclusive_resources, shared_resources + + +def safe_release_task_locks(task, lock_owner=None): + """ + Safely release all locks for a task with idempotency check. + + This function: + 1. Checks if locks have already been released (idempotent) + 2. Extracts resources from task.reserved_resources_record + 3. Determines lock owner (from AppStatus or task-specific identifier) + 4. Releases task lock and all resource locks atomically + 5. Marks locks as released to prevent double-release + + Args: + task: The Task object to release locks for + lock_owner: Optional lock owner identifier. 
If not provided, will use + AppStatus.objects.current() or fall back to f"immediate-{task.pk}" + + Returns: + bool: True if locks were released, False if already released or no Redis connection + """ + from pulpcore.app.models import AppStatus + + # Check if locks already released (idempotent) + if getattr(task, "_all_locks_released", False): + return False + + redis_conn = get_redis_connection() + if not redis_conn: + _logger.warning("Redis connection not available for releasing locks for task %s", task.pk) + return False + + # Extract resources from task + exclusive_resources, shared_resources = extract_task_resources(task) + + # Determine lock owner + if lock_owner is None: + current_app = AppStatus.objects.current() + lock_owner = current_app.name if current_app else f"immediate-{task.pk}" + + # Build task lock key + task_lock_key = get_task_lock_key(task.pk) + + # Release all locks atomically + release_resource_locks( + redis_conn, lock_owner, task_lock_key, exclusive_resources, shared_resources + ) + + # Mark all locks as released + task._all_locks_released = True + return True + + +async def async_safe_release_task_locks(task, lock_owner=None): + """ + Async version: Safely release all locks for a task with idempotency check. + + This function: + 1. Checks if locks have already been released (idempotent) + 2. Extracts resources from task.reserved_resources_record + 3. Determines lock owner (from AppStatus or task-specific identifier) + 4. Releases task lock and all resource locks atomically + 5. Marks locks as released to prevent double-release + + Args: + task: The Task object to release locks for + lock_owner: Optional lock owner identifier. If not provided, will use + AppStatus.objects.current() or fall back to f"immediate-{task.pk}" + + Returns: + bool: True if locks were released, False if already released or no Redis connection + """ + from pulpcore.app.models import AppStatus + + # Check if locks already released (idempotent) + if getattr(task, "_all_locks_released", False): + return False + + redis_conn = get_redis_connection() + if not redis_conn: + _logger.warning("Redis connection not available for releasing locks for task %s", task.pk) + return False + + # Extract resources from task + exclusive_resources, shared_resources = extract_task_resources(task) + + # Determine lock owner + if lock_owner is None: + current_app = await sync_to_async(AppStatus.objects.current)() + lock_owner = current_app.name if current_app else f"immediate-{task.pk}" + + # Build task lock key + task_lock_key = get_task_lock_key(task.pk) + + # Release all locks atomically + await async_release_resource_locks( + redis_conn, lock_owner, task_lock_key, exclusive_resources, shared_resources + ) + + # Mark all locks as released + task._all_locks_released = True + return True + + +def acquire_locks(redis_conn, lock_owner, task_lock_key, exclusive_resources, shared_resources): + """ + Atomically try to acquire task lock and resource locks. 
+ + Args: + redis_conn: Redis connection + lock_owner (str): The identifier of the lock owner (worker/task) + task_lock_key (str): Redis key for the task lock (e.g., "task:{task_id}") + exclusive_resources (list): List of exclusive resource names + shared_resources (list): List of shared resource names + + Returns: + list: Empty list if all locks acquired successfully, + list of blocked resource names if acquisition failed + (includes "__task_lock__" if task lock is held by another worker) + """ + if not redis_conn: + return [] + + # Sort resources deterministically to prevent deadlocks + exclusive_resources = sorted(exclusive_resources) if exclusive_resources else [] + shared_resources = sorted(shared_resources) if shared_resources else [] + + # Build KEYS list: task_lock_key, then exclusive lock keys, then shared lock keys + keys = [task_lock_key] + for resource in exclusive_resources: + keys.append(resource_to_lock_key(resource)) + for resource in shared_resources: + keys.append(resource_to_lock_key(resource)) + + # Build ARGV list: lock_owner, num_exclusive, resource names (for error reporting) + args = [lock_owner, str(len(exclusive_resources))] + args.extend(exclusive_resources) + args.extend(shared_resources) + + # Register and execute the Lua script + acquire_script = redis_conn.register_script(REDIS_ACQUIRE_LOCKS_SCRIPT) + try: + blocked_resources = acquire_script(keys=keys, args=args) + # Redis returns list of blocked resources or empty list + return blocked_resources if blocked_resources else [] + except Exception as e: + _logger.error("Error acquiring locks: %s", e) + return ["error"] # Return non-empty list to indicate failure + + +def release_resource_locks(redis_conn, lock_owner, task_lock_key, resources, shared_resources=None): + """ + Atomically release task lock and resource locks. + + Uses a Lua script to ensure we only release locks that we own. 
+ + Args: + redis_conn: Redis connection + lock_owner (str): The identifier of the lock owner + task_lock_key (str): Redis key for the task lock (e.g., "task:{task_id}") + resources (list): List of exclusive resource names to release locks for + shared_resources (list): Optional list of shared resource names + """ + if not redis_conn: + return + + exclusive_resources = resources if resources else [] + shared_resources = shared_resources if shared_resources else [] + + # Build KEYS list: task_lock_key, then exclusive lock keys, then shared lock keys + keys = [task_lock_key] + for resource in exclusive_resources: + keys.append(resource_to_lock_key(resource)) + for resource in shared_resources: + keys.append(resource_to_lock_key(resource)) + + # Build ARGV list: lock_owner, num_exclusive, resource names (for error reporting) + args = [lock_owner, str(len(exclusive_resources))] + args.extend(exclusive_resources) + args.extend(shared_resources) + + # Register and execute the Lua script + release_script = redis_conn.register_script(REDIS_RELEASE_LOCKS_SCRIPT) + try: + result = release_script(keys=keys, args=args) + # Result is [not_owned_exclusive, not_in_shared, task_lock_not_owned] + not_owned_exclusive = result[0] if result and len(result) > 0 else [] + not_in_shared = result[1] if result and len(result) > 1 else [] + task_lock_not_owned = result[2] if result and len(result) > 2 else False + + # Log warnings for locks we didn't own + for resource in not_owned_exclusive: + _logger.warning("Lock for resource %s was not owned by %s", resource, lock_owner) + for resource in not_in_shared: + _logger.warning("Shared resource %s did not contain %s", resource, lock_owner) + if task_lock_not_owned: + _logger.warning("Task lock %s was not owned by %s", task_lock_key, lock_owner) + + # Log debug for successful releases + num_released_exclusive = len(exclusive_resources) - len(not_owned_exclusive) + num_released_shared = len(shared_resources) - len(not_in_shared) + if num_released_exclusive > 0: + _logger.debug("Released %d exclusive lock(s)", num_released_exclusive) + if num_released_shared > 0: + _logger.debug("Released %d shared lock(s)", num_released_shared) + if not task_lock_not_owned: + _logger.debug("Released task lock %s", task_lock_key) + except Exception as e: + _logger.error("Error releasing locks: %s", e) + + +async def async_release_resource_locks( + redis_conn, lock_owner, task_lock_key, resources, shared_resources=None +): + """ + Async version: Atomically release task lock and resource locks. + + Uses a Lua script to ensure we only release locks that we own. 
+ + Args: + redis_conn: Redis connection + lock_owner (str): The identifier of the lock owner + task_lock_key (str): Redis key for the task lock (e.g., "task:{task_id}") + resources (list): List of exclusive resource names to release locks for + shared_resources (list): Optional list of shared resource names + """ + if not redis_conn: + return + + exclusive_resources = resources if resources else [] + shared_resources = shared_resources if shared_resources else [] + + # Build KEYS list: task_lock_key, then exclusive lock keys, then shared lock keys + keys = [task_lock_key] + for resource in exclusive_resources: + keys.append(resource_to_lock_key(resource)) + for resource in shared_resources: + keys.append(resource_to_lock_key(resource)) + + # Build ARGV list: lock_owner, num_exclusive, resource names (for error reporting) + args = [lock_owner, str(len(exclusive_resources))] + args.extend(exclusive_resources) + args.extend(shared_resources) + + # Register and execute the Lua script + release_script = await sync_to_async(redis_conn.register_script)(REDIS_RELEASE_LOCKS_SCRIPT) + try: + result = await sync_to_async(release_script)(keys=keys, args=args) + # Result is [not_owned_exclusive, not_in_shared, task_lock_not_owned] + not_owned_exclusive = result[0] if result and len(result) > 0 else [] + not_in_shared = result[1] if result and len(result) > 1 else [] + task_lock_not_owned = result[2] if result and len(result) > 2 else False + + # Log warnings for locks we didn't own + for resource in not_owned_exclusive: + _logger.warning("Lock for resource %s was not owned by %s", resource, lock_owner) + for resource in not_in_shared: + _logger.warning("Shared resource %s did not contain %s", resource, lock_owner) + if task_lock_not_owned: + _logger.warning("Task lock %s was not owned by %s", task_lock_key, lock_owner) + + # Log debug for successful releases + num_released_exclusive = len(exclusive_resources) - len(not_owned_exclusive) + num_released_shared = len(shared_resources) - len(not_in_shared) + if num_released_exclusive > 0: + _logger.debug("Released %d exclusive lock(s)", num_released_exclusive) + if num_released_shared > 0: + _logger.debug("Released %d shared lock(s)", num_released_shared) + if not task_lock_not_owned: + _logger.debug("Released task lock %s", task_lock_key) + except Exception as e: + _logger.error("Error releasing locks: %s", e) diff --git a/pulpcore/tasking/redis_tasks.py b/pulpcore/tasking/redis_tasks.py new file mode 100644 index 0000000000..0dabdbff02 --- /dev/null +++ b/pulpcore/tasking/redis_tasks.py @@ -0,0 +1,618 @@ +""" +Task dispatch functions for Redis-based worker implementation. + +This module contains dispatch logic specific to the Redis worker that uses +Redis distributed locks for task coordination. 
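+
+Illustrative use (mirrors pulpcore.tasking.tasks.dispatch; the task path is just an example)::
+
+    from pulpcore.tasking.redis_tasks import dispatch
+
+    task = dispatch("pulpcore.app.tasks.test.sleep", args=(0,), immediate=True)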
+""" + +import contextvars +import logging +import sys +from asgiref.sync import sync_to_async + +from pulpcore.app.models import Task, TaskGroup, AppStatus +from pulpcore.app.redis_connection import get_redis_connection +from pulpcore.app.util import get_domain +from pulpcore.app.contexts import with_task_context, awith_task_context +from pulpcore.constants import TASK_STATES, TASK_FINAL_STATES +from pulpcore.exceptions.base import ( + PulpException, + InternalErrorException, +) +from pulp_glue.common.exceptions import ( + PulpException as PulpGlueException, +) +from pulpcore.tasking.redis_locks import ( + acquire_locks, + extract_task_resources, + get_task_lock_key, + safe_release_task_locks, + async_safe_release_task_locks, +) +from pulpcore.tasking.tasks import ( + called_from_content_app, + get_function_name, + get_version, + get_resources, + get_task_payload, + get_task_function, + aget_task_function, + log_task_start, + log_task_completed, + log_task_failed, + using_workdir, +) +from pulpcore.tasking.kafka import send_task_notification + + +_logger = logging.getLogger(__name__) + +# Redis key prefix for task cancellation +REDIS_CANCEL_PREFIX = "pulp:task:cancel:" + + +def publish_cancel_signal(task_id): + """ + Publish a cancellation signal for a task via Redis. + + Args: + task_id (str): The task ID to cancel + + Returns: + bool: True if signal was published, False otherwise + """ + redis_conn = get_redis_connection() + if not redis_conn: + _logger.error("Redis connection not available for task cancellation") + return False + + try: + # Publish to the task-specific cancellation channel + cancel_key = f"{REDIS_CANCEL_PREFIX}{task_id}" + # Set a value with expiration (24 hours) in case worker missed it + redis_conn.setex(cancel_key, 86400, "cancel") + _logger.info("Published cancellation signal for task %s", task_id) + return True + except Exception as e: + _logger.error("Error publishing cancellation signal for task %s: %s", task_id, e) + return False + + +def check_cancel_signal(task_id): + """ + Check if a cancellation signal exists for a task. + + Args: + task_id (str): The task ID to check + + Returns: + bool: True if cancellation signal exists, False otherwise + """ + redis_conn = get_redis_connection() + if not redis_conn: + return False + + try: + cancel_key = f"{REDIS_CANCEL_PREFIX}{task_id}" + return redis_conn.exists(cancel_key) > 0 + except Exception as e: + _logger.error("Error checking cancellation signal for task %s: %s", task_id, e) + return False + + +def clear_cancel_signal(task_id): + """ + Clear a cancellation signal for a task. + + Args: + task_id (str): The task ID to clear cancellation signal for + """ + redis_conn = get_redis_connection() + if not redis_conn: + return + + try: + cancel_key = f"{REDIS_CANCEL_PREFIX}{task_id}" + redis_conn.delete(cancel_key) + _logger.debug("Cleared cancellation signal for task %s", task_id) + except Exception as e: + _logger.error("Error clearing cancellation signal for task %s: %s", task_id, e) + + +def cancel_task(task_id): + """ + Cancel a task using Redis-based signaling. + + This method cancels only the task with given task_id, not the spawned tasks. + This also updates task's state to 'canceling'. 
+ + Args: + task_id (str): The ID of the task you wish to cancel + + Returns: + Task: The task object + + Raises: + Task.DoesNotExist: If a task with given task_id does not exist + """ + task = Task.objects.select_related("pulp_domain").get(pk=task_id) + + if task.state in TASK_FINAL_STATES: + # If the task is already done, just stop. + _logger.debug( + "Task [%s] in domain: %s already in a final state: %s", + task_id, + task.pulp_domain.name, + task.state, + ) + return task + + _logger.info("Canceling task: %s in domain: %s", task_id, task.pulp_domain.name) + + # This is the only valid transition without holding the task lock. + task.set_canceling() + + if task.app_lock is None: + # Task was WAITING — no worker is executing it. + # Set app_lock so set_canceled() ownership check passes. + Task.objects.filter(pk=task.pk).update(app_lock=AppStatus.objects.current()) + task.app_lock = AppStatus.objects.current() + task.set_canceled() + else: + # Task is RUNNING — signal the supervising worker. + publish_cancel_signal(task.pk) + + return task + + +def cancel_task_group(task_group_id): + """ + Cancel the task group that is represented by the given task_group_id using Redis. + + This method attempts to cancel all tasks in the task group. + + Args: + task_group_id (str): The ID of the task group you wish to cancel + + Returns: + TaskGroup: The task group object + + Raises: + TaskGroup.DoesNotExist: If a task group with given task_group_id does not exist + """ + task_group = TaskGroup.objects.get(pk=task_group_id) + task_group.all_tasks_dispatched = True + task_group.save(update_fields=["all_tasks_dispatched"]) + + TASK_RUNNING_STATES = (TASK_STATES.RUNNING, TASK_STATES.WAITING) + tasks = task_group.tasks.filter(state__in=TASK_RUNNING_STATES).values_list("pk", flat=True) + for task_id in tasks: + try: + cancel_task(task_id) + except RuntimeError: + pass + return task_group + + +def execute_task(task): + """Redis-aware task execution that releases Redis locks for immediate tasks.""" + # This extra stack is needed to isolate the current_task ContextVar + contextvars.copy_context().run(_execute_task, task) + + +def _execute_task(task): + try: + # Log execution context information + current_app = AppStatus.objects.current() + if current_app: + _logger.info( + "TASK EXECUTION: Task %s being executed by %s (app_type=%s)", + task.pk, + current_app.name, + current_app.app_type, + ) + else: + _logger.info( + "TASK EXECUTION: Task %s being executed with no AppStatus.current()", task.pk + ) + + with with_task_context(task): + task.set_running() + domain = get_domain() + exception_info = None + result = None + + # Execute task and capture result or exception + try: + log_task_start(task, domain) + task_function = get_task_function(task) + result = task_function() + except (PulpException, PulpGlueException): + exc_type, exc, _ = sys.exc_info() + exception_info = (exc_type, exc, None) + except Exception: + exc_type, exc, tb = sys.exc_info() + exception_info = (exc_type, InternalErrorException(), tb) + + # Release locks BEFORE transitioning to final state + # This ensures resources are freed even if we crash + # during state transition + safe_release_task_locks(task) + + # NOW transition to final state after locks are released + if exception_info: + exc_type, exc, tb = exception_info + task.set_failed(exc) + log_task_failed(task, exc_type, exc, tb, domain) + send_task_notification(task) + return None + else: + task.set_completed(result) + log_task_completed(task, domain) + send_task_notification(task) + return 
result + finally: + # Safety net: if we crashed before reaching the lock release above, + # still try to release locks here (e.g., if crash during task execution) + if safe_release_task_locks(task): + _logger.warning( + "SAFETY NET: Task %s releasing all locks in " + "finally block (this shouldn't normally happen)", + task.pk, + ) + + +async def aexecute_task(task): + """Redis-aware async task execution that releases Redis locks for immediate tasks.""" + # This extra stack is needed to isolate the current_task ContextVar + await contextvars.copy_context().run(_aexecute_task, task) + + +async def _aexecute_task(task): + try: + # Log execution context information + current_app = await sync_to_async(AppStatus.objects.current)() + if current_app: + _logger.info( + "TASK EXECUTION (async): Task %s being executed by %s (app_type=%s)", + task.pk, + current_app.name, + current_app.app_type, + ) + else: + _logger.info( + "TASK EXECUTION (async): Task %s being executed with no AppStatus.current()", + task.pk, + ) + + async with awith_task_context(task): + await sync_to_async(task.set_running)() + domain = get_domain() + exception_info = None + result = None + + # Execute task and capture result or exception + try: + task_coroutine_fn = await aget_task_function(task) + result = await task_coroutine_fn() + except (PulpException, PulpGlueException): + exc_type, exc, _ = sys.exc_info() + exception_info = (exc_type, exc, None) + except Exception: + exc_type, exc, tb = sys.exc_info() + exception_info = (exc_type, InternalErrorException(), tb) + + # Release locks BEFORE transitioning to final state + await async_safe_release_task_locks(task) + + # NOW transition to final state after locks are released + if exception_info: + exc_type, exc, tb = exception_info + await sync_to_async(task.set_failed)(exc) + log_task_failed(task, exc_type, exc, tb, domain) + send_task_notification(task) + return None + else: + await sync_to_async(task.set_completed)(result) + log_task_completed(task, domain) + send_task_notification(task) + return result + finally: + # Safety net: if we crashed before reaching the lock release above, + # release all locks atomically here (task lock + resource locks) + if await async_safe_release_task_locks(task): + _logger.warning( + "SAFETY NET (async): Task %s releasing all locks " + "in finally block (this shouldn't normally happen)", + task.pk, + ) + + +def are_resources_available(task: Task) -> bool: + """ + Atomically try to acquire task lock and resource locks for immediate task. + + Resource conflicts are handled by Redis lock acquisition - if another task + holds conflicting resource locks, acquire_locks() will fail atomically. + + Args: + task: The task to acquire locks for. + + Returns: + bool: True if all locks were acquired, False otherwise. 
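+
+    Illustrative use, as in dispatch() below::
+
+        if are_resources_available(task):
+            execute_task(task)  # all Redis locks are now held by this process
+        # otherwise leave the task WAITING for a worker (or cancel if not deferrable)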
+ """ + redis_conn = get_redis_connection() + if not redis_conn: + _logger.error("Redis connection not available for immediate task locking") + return False + + # Extract resources from task + exclusive_resources, shared_resources = extract_task_resources(task) + + # Use AppStatus.current() to get a worker identifier for the lock value + # For immediate tasks, we use a special identifier + current_app = AppStatus.objects.current() + lock_owner = current_app.name if current_app else f"immediate-{task.pk}" + + # Build task lock key + task_lock_key = get_task_lock_key(task.pk) + + try: + # Atomically acquire task lock + all resource locks + blocked_resource_list = acquire_locks( + redis_conn, lock_owner, task_lock_key, exclusive_resources, shared_resources + ) + + if not blocked_resource_list: + # All locks acquired successfully + _logger.debug( + "Successfully acquired task lock and all resource locks for immediate task %s", + task.pk, + ) + return True + else: + # Failed to acquire locks + _logger.debug( + "Failed to acquire locks for immediate task %s (blocked: %s)", + task.pk, + blocked_resource_list, + ) + return False + + except Exception as e: + _logger.error("Error acquiring locks for immediate task %s: %s", task.pk, e) + return False + + +async def async_are_resources_available(task: Task) -> bool: + """ + Atomically try to acquire task lock and resource locks for immediate task (async version). + + Resource conflicts are handled by Redis lock acquisition - if another task + holds conflicting resource locks, acquire_locks() will fail atomically. + + Args: + task: The task to acquire locks for. + + Returns: + bool: True if all locks were acquired, False otherwise. + """ + redis_conn = get_redis_connection() + if not redis_conn: + _logger.error("Redis connection not available for immediate task locking") + return False + + # Extract resources from task + exclusive_resources, shared_resources = extract_task_resources(task) + + # Use AppStatus.current() to get a worker identifier for the lock value + # For immediate tasks, we use a special identifier + current_app = await sync_to_async(AppStatus.objects.current)() + lock_owner = current_app.name if current_app else f"immediate-{task.pk}" + + # Build task lock key + task_lock_key = get_task_lock_key(task.pk) + + try: + # Atomically acquire task lock + all resource locks + blocked_resource_list = await sync_to_async(acquire_locks)( + redis_conn, lock_owner, task_lock_key, exclusive_resources, shared_resources + ) + + if not blocked_resource_list: + # All locks acquired successfully + _logger.debug( + "Successfully acquired task lock and all resource locks for immediate task %s", + task.pk, + ) + return True + else: + # Failed to acquire locks + _logger.debug( + "Failed to acquire locks for immediate task %s (blocked: %s)", + task.pk, + blocked_resource_list, + ) + return False + + except Exception as e: + _logger.error("Error acquiring locks for immediate task %s: %s", task.pk, e) + return False + + +def dispatch( + func, + args=None, + kwargs=None, + task_group=None, + exclusive_resources=None, + shared_resources=None, + immediate=False, + deferred=True, + versions=None, +): + """ + Enqueue a message to Pulp workers with Redis-based resource locking. + + This version uses Redis distributed locks instead of PostgreSQL advisory locks. + + Args: + func (callable | str): The function to be run when the necessary locks are acquired. + args (tuple): The positional arguments to pass on to the task. 
+ kwargs (dict): The keyword arguments to pass on to the task. + task_group (pulpcore.app.models.TaskGroup): A TaskGroup to add the created Task to. + exclusive_resources (list): A list of resources this task needs exclusive access to while + running. Each resource can be either a `str` or a `django.models.Model` instance. + shared_resources (list): A list of resources this task needs non-exclusive access to while + running. Each resource can be either a `str` or a `django.models.Model` instance. + immediate (bool): Whether to allow running this task immediately. It must be guaranteed to + execute fast without blocking. If not all resource constraints are met, the task will + either be returned in a canceled state or, if `deferred` is `True` be left in the queue + to be picked up by a worker eventually. Defaults to `False`. + deferred (bool): Whether to allow defer running the task to a pulpcore_worker. Defaults to + `True`. `immediate` and `deferred` cannot both be `False`. + versions (Optional[Dict[str, str]]): Minimum versions of components by app_label the worker + must provide to handle the task. + + Returns (pulpcore.app.models.Task): The Pulp Task that was created. + + Raises: + ValueError: When `resources` is an unsupported type. + """ + + execute_now = immediate and not called_from_content_app() + assert deferred or immediate, "A task must be at least `deferred` or `immediate`." + function_name = get_function_name(func) + versions = get_version(versions, function_name) + _, resources = get_resources(exclusive_resources, shared_resources, immediate) + app_lock = None if not execute_now else AppStatus.objects.current() # Lazy evaluation... + task_payload = get_task_payload( + function_name, task_group, args, kwargs, resources, versions, immediate, deferred, app_lock + ) + task = Task.objects.create(**task_payload) + if execute_now: + # Try to atomically acquire task lock and resource locks + # are_resources_available() now acquires ALL locks atomically + if are_resources_available(task): + # All locks acquired successfully + # Reload task state from database to check if it's still WAITING + # (a worker might have executed it between creation and lock acquisition) + task.refresh_from_db() + + if task.state != TASK_STATES.WAITING: + # Task was already executed by a worker, release locks and return + _logger.info( + "IMMEDIATE DISPATCH: Task %s already in state " + "'%s', releasing locks without execution", + task.pk, + task.state, + ) + safe_release_task_locks(task) + return task + + # Task is still WAITING, proceed with execution + current_app = AppStatus.objects.current() + lock_owner = current_app.name if current_app else f"immediate-{task.pk}" + try: + _logger.info( + "IMMEDIATE DISPATCH: Task %s acquired all locks," + " executing in API process (AppStatus=%s)", + task.pk, + lock_owner, + ) + with using_workdir(): + execute_task(task) + except Exception: + # Exception during execute_task() + # Atomically release all locks as safety net + safe_release_task_locks(task, lock_owner) + raise + elif deferred: + # Locks not available, defer to worker + # No locks were acquired (atomic operation failed), so nothing to clean up + _logger.info( + "IMMEDIATE DISPATCH: Task %s could not acquire locks, deferring to worker", task.pk + ) + else: + # Can't acquire locks and can't be deferred - cancel task + # No locks were acquired, so just set state + task.set_canceling() + task.set_canceled(TASK_STATES.CANCELED, "Resources temporarily unavailable.") + return task + + +async def adispatch( + 
func, + args=None, + kwargs=None, + task_group=None, + exclusive_resources=None, + shared_resources=None, + immediate=False, + deferred=True, + versions=None, +): + """Async version of Redis-based dispatch.""" + execute_now = immediate and not called_from_content_app() + assert deferred or immediate, "A task must be at least `deferred` or `immediate`." + function_name = get_function_name(func) + versions = get_version(versions, function_name) + _, resources = get_resources(exclusive_resources, shared_resources, immediate) + app_lock = None if not execute_now else AppStatus.objects.current() # Lazy evaluation... + task_payload = get_task_payload( + function_name, task_group, args, kwargs, resources, versions, immediate, deferred, app_lock + ) + task = await Task.objects.acreate(**task_payload) + if execute_now: + # Try to atomically acquire task lock and resource locks + # async_are_resources_available() now acquires ALL locks atomically + if await async_are_resources_available(task): + # All locks acquired successfully + # Reload task state from database to check if it's still WAITING + # (a worker might have executed it between creation and lock acquisition) + await task.arefresh_from_db() + + if task.state != TASK_STATES.WAITING: + # Task was already executed by a worker, release locks and return + _logger.info( + "IMMEDIATE DISPATCH (async): Task %s already in" + " state '%s', releasing locks without execution", + task.pk, + task.state, + ) + await async_safe_release_task_locks(task) + return task + + # Task is still WAITING, proceed with execution + current_app = await sync_to_async(AppStatus.objects.current)() + lock_owner = current_app.name if current_app else f"immediate-{task.pk}" + try: + _logger.info( + "IMMEDIATE DISPATCH (async): Task %s acquired " + "all locks, executing in API process " + "(AppStatus=%s)", + task.pk, + lock_owner, + ) + with using_workdir(): + await aexecute_task(task) + except Exception: + # Exception during aexecute_task() + # Atomically release all locks as safety net + await async_safe_release_task_locks(task, lock_owner) + raise + elif deferred: + # Locks not available, defer to worker + # No locks were acquired (atomic operation failed), so nothing to clean up + _logger.info( + "IMMEDIATE DISPATCH (async): Task %s could not acquire locks, deferring to worker", + task.pk, + ) + else: + # Can't acquire locks and can't be deferred - cancel task + # No locks were acquired, so just set state + await sync_to_async(task.set_canceling)() + await sync_to_async(task.set_canceled)( + TASK_STATES.CANCELED, "Resources temporarily unavailable." + ) + return task diff --git a/pulpcore/tasking/redis_worker.py b/pulpcore/tasking/redis_worker.py new file mode 100644 index 0000000000..407cad239b --- /dev/null +++ b/pulpcore/tasking/redis_worker.py @@ -0,0 +1,783 @@ +""" +Redis-based worker implementation using distributed lock-based task fetching. + +This implementation uses a fundamentally different algorithm where workers compete +directly for task resources using Redis distributed locks, eliminating the need +for the unblocking mechanism and all task cancellation support. 
+""" + +from gettext import gettext as _ +import functools +import logging +import os +import random +import select +import signal +import time +from datetime import timedelta +from multiprocessing import Process +from tempfile import TemporaryDirectory + +from django.conf import settings +from django.db import connection, transaction, DatabaseError, IntegrityError +from django.utils import timezone + +from pulpcore.constants import ( + TASK_STATES, + TASK_INCOMPLETE_STATES, + TASK_FINAL_STATES, + TASK_SCHEDULING_LOCK, + WORKER_CLEANUP_LOCK, + TASK_METRICS_LOCK, +) +from pulpcore.metrics import init_otel_meter +from pulpcore.app.apps import pulp_plugin_configs +from pulpcore.app.util import get_worker_name +from pulpcore.app.models import Task, AppStatus +from pulpcore.app.redis_connection import get_redis_connection +from pulpcore.tasking.storage import WorkerDirectory +from pulpcore.tasking._util import ( + dispatch_scheduled_tasks, + perform_task, + startup_hook, +) +from pulpcore.tasking.redis_locks import ( + release_resource_locks, + acquire_locks, + extract_task_resources, + get_task_lock_key, +) +from pulpcore.tasking.tasks import using_workdir +from pulpcore.tasking.redis_tasks import execute_task + + +_logger = logging.getLogger(__name__) +random.seed() + +# Seconds for a task to finish on semi graceful worker shutdown (approx) +TASK_GRACE_INTERVAL = settings.TASK_GRACE_INTERVAL +# Seconds between attempts to kill the subprocess (approx) +TASK_KILL_INTERVAL = 1 +# Number of heartbeats between cleaning up worker processes +WORKER_CLEANUP_INTERVAL = 100 +# Number of heartbeats between rechecking ignored tasks +IGNORED_TASKS_CLEANUP_INTERVAL = 100 +# Number of heartbeats between recording metrics +METRIC_HEARTBEAT_INTERVAL = 3 +# Number of tasks to fetch in each query +FETCH_TASK_LIMIT = 20 + + +def exclusive(lock): + """ + Runs function in a transaction holding the specified lock. + Returns None if the lock could not be acquired. + It should be used for actions that only need to be performed by a single worker. + """ + + def _decorator(f): + @functools.wraps(f) + def _f(self, *args, **kwargs): + with transaction.atomic(): + with connection.cursor() as cursor: + cursor.execute("SELECT pg_try_advisory_xact_lock(%s, %s)", [0, lock]) + acquired = cursor.fetchone()[0] + if acquired: + return f(self, *args, **kwargs) + else: + return None + + return _f + + return _decorator + + +class RedisWorker: + """ + Worker implementation using Redis distributed lock-based resource acquisition. + + This worker uses a simpler algorithm where: + 1. Query waiting tasks (sorted by creation time, limited) + 2. For each task, try to acquire Redis distributed locks for all resources + 3. If all locks acquired, claim the task + 4. Process resources in deterministic (sorted) order to prevent deadlocks + 5. Lock values contain worker names to enable cleanup of stale locks + + Note: This implementation does NOT support task cancellation. 
+ """ + + def __init__(self): + # Notification states from signal handlers + self.shutdown_requested = False + self.wakeup_handle = False + + self.ignored_task_ids = [] + self.ignored_task_countdown = IGNORED_TASKS_CLEANUP_INTERVAL + + self.task = None + self.name = get_worker_name() + self.heartbeat_period = timedelta(seconds=settings.WORKER_TTL / 3) + self.versions = {app.label: app.version for app in pulp_plugin_configs()} + self.app_status = AppStatus.objects.create( + name=self.name, app_type="worker", versions=self.versions + ) + + # This defaults to immediate task cancellation. + # It will be set into the future on moderately graceful worker shutdown, + # and set to None for fully graceful shutdown. + self.task_grace_timeout = timezone.now() + + self.worker_cleanup_countdown = random.randint( + int(WORKER_CLEANUP_INTERVAL / 10), WORKER_CLEANUP_INTERVAL + ) + + # Metric recording interval + self.metric_heartbeat_countdown = METRIC_HEARTBEAT_INTERVAL + + # Cache worker count for sleep calculation (updated during beat) + self.num_workers = 1 + + # Redis connection for distributed locks + self.redis_conn = get_redis_connection() + + # Add a file descriptor to trigger select on signals + self.sentinel, sentinel_w = os.pipe() + os.set_blocking(self.sentinel, False) + os.set_blocking(sentinel_w, False) + signal.set_wakeup_fd(sentinel_w) + + self._init_instrumentation() + + startup_hook() + + _logger.info("Initialized RedisWorker with Redis lock-based algorithm") + + def _init_instrumentation(self): + """Initialize OpenTelemetry instrumentation if enabled.""" + if settings.OTEL_ENABLED: + meter = init_otel_meter("pulp-worker") + self.waiting_tasks_meter = meter.create_gauge( + name="waiting_tasks", + description="Number of waiting and running tasks minus the number of workers.", + unit="tasks", + ) + self.otel_enabled = True + else: + self.otel_enabled = False + + def _signal_handler(self, thesignal, frame): + """Handle shutdown signals.""" + if thesignal in (signal.SIGHUP, signal.SIGTERM): + _logger.info(_("Worker %s was requested to shut down gracefully."), self.name) + # Wait forever... + self.task_grace_timeout = None + else: + # Reset signal handlers to default + # If you kill the process a second time it's not graceful anymore. + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGHUP, signal.SIG_DFL) + + _logger.info(_("Worker %s was requested to shut down."), self.name) + self.task_grace_timeout = timezone.now() + timezone.timedelta( + seconds=TASK_GRACE_INTERVAL + ) + self.shutdown_requested = True + + def shutdown(self): + """Cleanup worker on shutdown.""" + self.app_status.delete() + _logger.info(_("Worker %s was shut down."), self.name) + + def handle_worker_heartbeat(self): + """ + Update worker heartbeat records. + + If the update fails (the record was deleted, the database is unreachable, ...) the worker + is shut down. 
+ """ + msg = "Worker heartbeat from '{name}' at time {timestamp}".format( + timestamp=self.app_status.last_heartbeat, name=self.name + ) + try: + self.app_status.save_heartbeat() + _logger.debug(msg) + except (IntegrityError, DatabaseError): + _logger.error(f"Updating the heartbeat of worker {self.name} failed.") + self.shutdown_requested = True + + def cleanup_ignored_tasks(self): + """Remove tasks from ignored list that are no longer incomplete.""" + for pk in ( + Task.objects.filter(pk__in=self.ignored_task_ids) + .exclude(state__in=TASK_INCOMPLETE_STATES) + .values_list("pk", flat=True) + ): + self.ignored_task_ids.remove(pk) + + def cleanup_redis_locks_for_worker(self, app_worker): + """ + Clean up Redis locks held by a specific worker and fail its tasks. + + This is called when a worker is detected as missing to: + 1. Query the database for tasks held by the worker (via app_lock FK) + 2. Release the task's Redis resource locks + 3. Set app_lock to the current (cleaning) worker + 4. Mark those tasks as FAILED + + If the database query finds no tasks, falls back to scanning Redis + to catch the edge case where a worker crashed between fetch_task() + (Redis locks acquired) and set_running() (which sets app_lock). + In that case the task is still WAITING, so we only release the Redis + locks and let another worker pick it up. + + Args: + app_worker (AppStatus): The AppStatus object of the missing worker + """ + if not self.redis_conn: + return + + worker_name = app_worker.name + + try: + # Primary path: query database for the task held by the missing worker. + # A worker runs at most one task at a time, so we expect at most one. + task = ( + Task.objects.filter(app_lock=app_worker) + .exclude(state__in=TASK_FINAL_STATES) + .select_related("pulp_domain") + .first() + ) + + if task: + # Extract resources from the task's reserved_resources_record + exclusive_resources, shared_resources = extract_task_resources(task) + + # Release Redis locks using the missing worker's name as the lock owner + task_lock_key = get_task_lock_key(task.pk) + release_resource_locks( + self.redis_conn, + worker_name, + task_lock_key, + exclusive_resources, + shared_resources, + ) + _logger.info( + "Released task lock + %d exclusive + %d shared resource locks " + "for task %s from missing worker %s", + len(exclusive_resources), + len(shared_resources), + task.pk, + worker_name, + ) + + # Set app_lock to the current (cleaning) worker so set_failed() + # ownership check passes + Task.objects.filter(pk=task.pk).update(app_lock=self.app_status) + task.app_lock = self.app_status + + # Set to canceling first + task.set_canceling() + error_msg = "Worker has gone missing." + task.set_canceled(final_state=TASK_STATES.FAILED, reason=error_msg) + _logger.warning( + "Marked task %s as FAILED " "(was being executed by missing worker %s)", + task.pk, + worker_name, + ) + else: + # Fallback: scan Redis for an orphaned lock if no task found in DB. + # This catches the edge case where a worker crashed between + # fetch_task() (Redis locks acquired) and set_running() (sets + # app_lock). In this window the task is still WAITING with no + # app_lock, so we just release the Redis locks and let another + # worker pick it up. 
+ task_lock_pattern = "task:*" + for key in self.redis_conn.scan_iter(match=task_lock_pattern, count=100): + lock_holder = self.redis_conn.get(key) + if lock_holder and lock_holder.decode("utf-8") == worker_name: + task_uuid = key.decode("utf-8").split(":", 1)[1] + + try: + task = Task.objects.select_related("pulp_domain").get(pk=task_uuid) + + # Extract resources and release Redis locks + exclusive_resources, shared_resources = extract_task_resources(task) + task_lock_key = get_task_lock_key(task_uuid) + release_resource_locks( + self.redis_conn, + worker_name, + task_lock_key, + exclusive_resources, + shared_resources, + ) + _logger.info( + "Fallback: released Redis locks for task %s " + "from missing worker %s (task remains %s)", + task_uuid, + worker_name, + task.state, + ) + except Task.DoesNotExist: + _logger.warning( + "Task %s locked by missing worker %s " "not found in database", + task_uuid, + worker_name, + ) + # Delete the orphaned Redis task lock + self.redis_conn.delete(key) + except Exception as e: + _logger.error("Error cleaning up locks for worker %s: %s", worker_name, e) + + @exclusive(WORKER_CLEANUP_LOCK) + def app_worker_cleanup(self): + """Cleanup records of missing app processes and their Redis locks.""" + qs = AppStatus.objects.missing() + for app_worker in qs: + _logger.warning( + "Cleanup record of missing %s process %s.", app_worker.app_type, app_worker.name + ) + # Clean up any Redis locks held by this missing process + # This includes workers and API processes (which can hold locks for immediate tasks) + self.cleanup_redis_locks_for_worker(app_worker) + qs.delete() + + @exclusive(TASK_SCHEDULING_LOCK) + def dispatch_scheduled_tasks(self): + """Dispatch scheduled tasks.""" + dispatch_scheduled_tasks() + + @exclusive(TASK_METRICS_LOCK) + def record_waiting_tasks_metric(self): + """ + Record metrics for waiting tasks in the queue. + + This method counts all tasks in RUNNING or WAITING state that are older + than 5 seconds, then subtracts the number of active workers to get the + number of tasks waiting to be picked up by workers. 
+ """ + # Calculate the cutoff time (5 seconds ago) + cutoff_time = timezone.now() - timedelta(seconds=5) + + # Count tasks in RUNNING or WAITING state older than 5 seconds + task_count = Task.objects.filter( + state__in=[TASK_STATES.RUNNING, TASK_STATES.WAITING], pulp_created__lt=cutoff_time + ).count() + + # Calculate waiting tasks: total tasks - workers + waiting_tasks = task_count - self.num_workers + + # Set the metric value + self.waiting_tasks_meter.set(waiting_tasks) + + _logger.debug( + "Waiting tasks metric: %d tasks (%d total tasks older than 5s - %d workers)", + waiting_tasks, + task_count, + self.num_workers, + ) + + def beat(self): + """Periodic worker maintenance tasks (heartbeat, cleanup, etc.).""" + now = timezone.now() + if self.app_status.last_heartbeat < now - self.heartbeat_period: + self.handle_worker_heartbeat() + if self.ignored_task_ids: + self.ignored_task_countdown -= 1 + if self.ignored_task_countdown <= 0: + self.ignored_task_countdown = IGNORED_TASKS_CLEANUP_INTERVAL + self.cleanup_ignored_tasks() + + self.worker_cleanup_countdown -= 1 + if self.worker_cleanup_countdown <= 0: + self.worker_cleanup_countdown = WORKER_CLEANUP_INTERVAL + self.app_worker_cleanup() + + self.dispatch_scheduled_tasks() + + # Record metrics periodically + if self.otel_enabled: + self.metric_heartbeat_countdown -= 1 + if self.metric_heartbeat_countdown <= 0: + self.metric_heartbeat_countdown = METRIC_HEARTBEAT_INTERVAL + self.record_waiting_tasks_metric() + + # Update cached worker count for sleep calculation + self.num_workers = AppStatus.objects.online().filter(app_type="worker").count() + + def _release_resource_locks(self, task_lock_key, resources, shared_resources=None): + """ + Atomically release task lock and resource locks. + + Uses a Lua script to ensure we only release locks that we own. + + Args: + task_lock_key (str): Redis key for the task lock (e.g., "task:{task_id}") + resources (list): List of exclusive resource names to release locks for + shared_resources (list): Optional list of shared resource names + """ + release_resource_locks( + self.redis_conn, self.name, task_lock_key, resources, shared_resources + ) + + def _maybe_release_locks(self, task, mark_released=True): + """ + Release locks for a task if not already released. + + Args: + task: Task object to release locks for + mark_released (bool): Whether to mark locks as released (default: True) + + Returns: + bool: True if locks were released, False if already released + """ + if not getattr(task, "_all_locks_released", False): + exclusive_resources, shared_resources = extract_task_resources(task) + task_lock_key = get_task_lock_key(task.pk) + self._release_resource_locks( + task_lock_key, exclusive_resources or [], shared_resources or [] + ) + if mark_released: + task._all_locks_released = True + return True + return False + + def is_compatible(self, task): + """ + Check if this worker is compatible with the task's version requirements. 
+ + Args: + task: Task object + + Returns: + bool: True if compatible, False otherwise + """ + from packaging.version import parse as parse_version + + unmatched_versions = [ + f"task: {label}>={version} worker: {self.versions.get(label)}" + for label, version in task.versions.items() + if label not in self.versions + or parse_version(self.versions[label]) < parse_version(version) + ] + if unmatched_versions: + domain = task.pulp_domain + _logger.info( + _("Incompatible versions to execute task %s in domain: %s by worker %s: %s"), + task.pk, + domain.name, + self.name, + ",".join(unmatched_versions), + ) + return False + return True + + def fetch_task(self): + """ + Fetch an available waiting task using Redis locks. + + This method: + 1. Queries waiting tasks (sorted by creation time, limited) + 2. For each task, attempts to acquire Redis distributed locks for exclusive resources + 3. If resource locks acquired, attempts to claim the task + with a Redis task lock (24h expiration) + 4. Returns the first task for which both locks can be acquired + + Returns: + Task: A task object if one was successfully locked, None otherwise + """ + # Query waiting tasks, sorted by creation time, limited + waiting_tasks = ( + Task.objects.filter(state=TASK_STATES.WAITING) + .exclude(pk__in=self.ignored_task_ids) + .order_by("pulp_created") + .select_related("pulp_domain")[:FETCH_TASK_LIMIT] + ) + + # Track resources that are blocked to preserve FIFO ordering + # blocked_exclusive: resources where an earlier task wanted exclusive access and failed + # blocked_shared: resources where an earlier task wanted shared access and failed + blocked_exclusive = set() + blocked_shared = set() + + # Try to acquire locks for each task + for task in waiting_tasks: + try: + # Extract resources from task + exclusive_resources, shared_resources = extract_task_resources(task) + + # Check if this task should skip to preserve FIFO ordering + should_skip = False + + # Skip if we need exclusive access but an earlier task already tried and failed + for resource in exclusive_resources: + if resource in blocked_exclusive or resource in blocked_shared: + should_skip = True + break + + # Skip if we need shared access but earlier task wanted it and exclusive lock exists + if not should_skip: + for resource in shared_resources: + if resource in blocked_shared: + should_skip = True + break + + if should_skip: + continue + + # Atomically try to acquire task lock and resource locks in a single operation + task_lock_key = get_task_lock_key(task.pk) + + blocked_resource_list = acquire_locks( + self.redis_conn, self.name, task_lock_key, exclusive_resources, shared_resources + ) + + if not blocked_resource_list: + # All locks acquired successfully (task lock + resource locks)! 
+ return task + else: + # Failed to acquire locks (task lock or resource locks blocked) + # No cleanup needed - Lua script is all-or-nothing + if "__task_lock__" not in blocked_resource_list: + # Mark resources as blocked for FIFO ordering + # If this task wanted exclusive access, + # block exclusive access for later tasks + for resource in exclusive_resources: + if resource in blocked_resource_list: + blocked_exclusive.add(resource) + # If this task wanted shared access, block shared access for later tasks + # (only if exclusive lock exists, which is why it failed) + for resource in shared_resources: + if resource in blocked_resource_list: + blocked_shared.add(resource) + continue + + except Exception as e: + _logger.error("Error processing task %s: %s", task.pk, e) + continue + + # No task could be locked + return None + + def supervise_immediate_task(self, task): + """Call and supervise the immediate async task process. + + This function must only be called while holding the lock for that task.""" + self.task = task + _logger.info( + "WORKER IMMEDIATE EXECUTION: Worker %s executing immediate task %s in domain: %s", + self.name, + task.pk, + task.pulp_domain.name, + ) + with using_workdir(): + execute_task(task) + self.task = None + + def supervise_task(self, task): + """Call and supervise the task process while heart beating. + + This function must only be called while holding the lock for that task. + Supports task cancellation via Redis signals.""" + + from pulpcore.tasking.redis_tasks import check_cancel_signal, clear_cancel_signal + + self.task = task + cancel_state = None + cancel_reason = None + domain = task.pulp_domain + _logger.info( + "WORKER DEFERRED EXECUTION: Worker %s executing deferred task %s in domain: %s", + self.name, + task.pk, + domain.name, + ) + with TemporaryDirectory(dir=".") as task_working_dir_rel_path: + task_process = Process(target=perform_task, args=(task.pk, task_working_dir_rel_path)) + task_process.start() + + # Heartbeat while waiting for task to complete + while task_process.is_alive(): + if cancel_state: + if self.task_grace_timeout is None or self.task_grace_timeout > timezone.now(): + _logger.info("Wait for canceled task to abort.") + else: + self.task_grace_timeout = timezone.now() + timezone.timedelta( + seconds=TASK_KILL_INTERVAL + ) + _logger.info( + "Aborting current task %s in domain: %s due to cancellation.", + task.pk, + domain.name, + ) + os.kill(task_process.pid, signal.SIGUSR1) + + # Wait for a short period or until process completes + r, w, x = select.select( + [self.sentinel, task_process.sentinel], + [], + [], + self.heartbeat_period.seconds, + ) + # Call beat to keep worker heartbeat alive and perform periodic tasks + self.beat() + + # Check for cancellation signal + if check_cancel_signal(task.pk): + _logger.info( + _("Received signal to cancel current task %s in domain: %s."), + task.pk, + domain.name, + ) + cancel_state = TASK_STATES.CANCELED + clear_cancel_signal(task.pk) + + if self.sentinel in r: + os.read(self.sentinel, 256) + + if task_process.sentinel in r: + if not task_process.is_alive(): + break + + # If shutdown was requested, handle gracefully or abort + if self.shutdown_requested: + if self.task_grace_timeout is None or self.task_grace_timeout > timezone.now(): + msg = ( + "Worker shutdown requested, waiting for task {pk} in domain: {name} " + "to finish.".format(pk=task.pk, name=domain.name) + ) + _logger.info(msg) + else: + _logger.info( + "Aborting current task %s in domain: %s due to worker shutdown.", + task.pk, + 
domain.name, + ) + cancel_state = TASK_STATES.FAILED + cancel_reason = "Aborted during worker shutdown." + + task_process.join() + if not cancel_state and task_process.exitcode != 0: + _logger.warning( + "Task process for %s exited with non zero exitcode %i.", + task.pk, + task_process.exitcode, + ) + cancel_state = TASK_STATES.FAILED + if task_process.exitcode < 0: + cancel_reason = "Killed by signal {sig_num}.".format( + sig_num=-task_process.exitcode + ) + + # Handle cancellation after task process has finished + if cancel_state: + from pulpcore.tasking._util import delete_incomplete_resources + + # Reload task from database to get current state + task.refresh_from_db() + # Only clean up if task is not already in a final state + # (subprocess may have already handled cancellation) + if task.state not in TASK_FINAL_STATES: + # Release locks BEFORE setting canceled state + # Atomically release task lock + resource locks in a single operation + self._maybe_release_locks(task) + + task.set_canceling() + _logger.info( + "Cleaning up task %s in domain: %s and marking as %s.", + task.pk, + domain.name, + cancel_state, + ) + delete_incomplete_resources(task) + task.set_canceled(final_state=cancel_state, reason=cancel_reason) + + self.task = None + + def handle_tasks(self): + """Pick and supervise tasks until there are no more available tasks.""" + while not self.shutdown_requested: + task = None + try: + task = self.fetch_task() + if task is None: + # No task found + break + + if not self.is_compatible(task): + # Incompatible task, add to ignored list + self.ignored_task_ids.append(task.pk) + # Atomically release task lock + resource locks so other workers can attempt it + self._maybe_release_locks(task, mark_released=False) + break + + # Check if task is still WAITING after acquiring locks + # (an API process might have executed it between query and lock acquisition) + task.refresh_from_db() + if task.state != TASK_STATES.WAITING: + # Task was already executed, release locks and skip + _logger.info( + "Task %s already in state '%s' after acquiring locks, skipping execution", + task.pk, + task.state, + ) + self._maybe_release_locks(task) + continue + + # Task is compatible and still waiting, execute it + if task.immediate: + self.supervise_immediate_task(task) + else: + self.supervise_task(task) + finally: + # Safety net: if _execute_task() crashed before releasing locks, + # atomically release all locks here (task lock + resource locks) + # NOTE: Only for immediate tasks that execute in this process. + # Deferred tasks execute in subprocess which handles its own lock release. + if task and task.immediate: + self._maybe_release_locks(task) + + def sleep(self): + """Sleep while calling beat() to maintain heartbeat and perform periodic tasks. 
+ + Sleep time = (num_workers * 10ms) + random_jitter(0.5ms, 1.5ms) + """ + # Calculate sleep time: (num_workers * 10ms) + jitter(0.5-1.5ms) + base_sleep_ms = self.num_workers * 10.0 + jitter_ms = random.uniform(0.5, 1.5) + sleep_time_seconds = (base_sleep_ms + jitter_ms) / 1000.0 + + _logger.debug( + _("Worker %s sleeping for %.4f seconds (workers=%d)"), + self.name, + sleep_time_seconds, + self.num_workers, + ) + + # Call beat before sleeping to maintain heartbeat and perform periodic tasks + self.beat() + + time.sleep(sleep_time_seconds) + + def run(self, burst=False): + """Main worker loop.""" + with WorkerDirectory(self.name): + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + signal.signal(signal.SIGHUP, self._signal_handler) + + if burst: + # Burst mode: process tasks until none are available + self.handle_tasks() + else: + # Normal mode: loop and sleep when no tasks available + while not self.shutdown_requested: + if self.shutdown_requested: + break + self.handle_tasks() + if self.shutdown_requested: + break + # Sleep until work arrives or heartbeat needed + self.sleep() + + self.shutdown() diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index eb270250c7..42e3a7803e 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -277,7 +277,23 @@ def dispatch( Raises: ValueError: When `resources` is an unsupported type. """ + # Check WORKER_TYPE setting and delegate to appropriate implementation + if settings.WORKER_TYPE == "redis": + from pulpcore.tasking.redis_tasks import dispatch as redis_dispatch + + return redis_dispatch( + func, + args, + kwargs, + task_group, + exclusive_resources, + shared_resources, + immediate, + deferred, + versions, + ) + # Original pulpcore implementation using PostgreSQL advisory locks execute_now = immediate and not called_from_content_app() assert deferred or immediate, "A task must be at least `deferred` or `immediate`." send_wakeup_signal = not execute_now @@ -319,6 +335,23 @@ async def adispatch( versions=None, ): """Async version of dispatch.""" + # Check WORKER_TYPE setting and delegate to appropriate implementation + if settings.WORKER_TYPE == "redis": + from pulpcore.tasking.redis_tasks import adispatch as redis_adispatch + + return await redis_adispatch( + func, + args, + kwargs, + task_group, + exclusive_resources, + shared_resources, + immediate, + deferred, + versions, + ) + + # Original pulpcore implementation using PostgreSQL advisory locks execute_now = immediate and not called_from_content_app() assert deferred or immediate, "A task must be at least `deferred` or `immediate`." function_name = get_function_name(func) diff --git a/pulpcore/tests/functional/api/test_tasking.py b/pulpcore/tests/functional/api/test_tasking.py index 9906c2441f..410600c5f5 100644 --- a/pulpcore/tests/functional/api/test_tasking.py +++ b/pulpcore/tests/functional/api/test_tasking.py @@ -86,6 +86,60 @@ def test_multi_resource_locking(dispatch_task, monitor_task): assert task1.finished_at < task5.started_at +@pytest.mark.long_running +@pytest.mark.parallel +def test_worker_cleanup_on_missing_worker(dispatch_task, monitor_task, pulpcore_bindings): + """ + Test that when a worker dies unexpectedly while executing a task, + the worker cleanup process marks the task as failed and releases its locks, + allowing subsequent tasks requiring the same resource to execute. 
+ """ + # Use a unique resource identifier to avoid conflicts with other tests + resource = str(uuid4()) + + # Dispatch the missing_worker task that will kill its worker process + task_href1 = dispatch_task( + "pulpcore.app.tasks.test.missing_worker", exclusive_resources=[resource] + ) + + # Wait for the task to start running and the worker to die + time.sleep(2) + + # Dispatch a second task that requires the same resource + task_href2 = dispatch_task( + "pulpcore.app.tasks.test.sleep", args=(1,), exclusive_resources=[resource] + ) + + # Wait for worker cleanup to run (happens every ~100 heartbeats) + # In tests, this should happen relatively quickly + # Monitor both tasks - the first should be marked as failed, + # and the second should complete successfully + max_wait = 600 # Wait up to 600 seconds for cleanup + start_time = time.time() + + task1 = None + task2 = None + + while time.time() - start_time < max_wait: + task1 = pulpcore_bindings.TasksApi.read(task_href1) + task2 = pulpcore_bindings.TasksApi.read(task_href2) + + # Check if task1 is failed and task2 is completed + if task1.state == "failed" and task2.state == "completed": + break + + time.sleep(1) + + # Verify task1 was marked as failed + assert task1.state == "failed", f"Task 1 should be failed but is {task1.state}" + assert ( + "worker" in task1.error["reason"].lower() and "missing" in task1.error["reason"].lower() + ), f"Task 1 error should mention missing worker: {task1.error['reason']}" + + # Verify task2 completed successfully after locks were released + assert task2.state == "completed", f"Task 2 should be completed but is {task2.state}" + + @pytest.mark.parallel def test_delete_cancel_waiting_task(dispatch_task, pulpcore_bindings): # Queue one task after a long running one @@ -490,6 +544,106 @@ def test_timeouts_on_api_worker(self, pulpcore_bindings, dispatch_task): assert "An internal error occurred." in task.error["description"] +@pytest.mark.parallel +def test_immediate_task_execution_in_worker(dispatch_task, monitor_task): + """ + GIVEN an immediate async task marked as deferred + AND a resource is blocked by another task + WHEN the immediate task cannot execute in the API due to blocked resource + THEN the task is deferred to a worker and executes successfully + + This test verifies that workers can correctly execute immediate async tasks + using aexecute_task() instead of execute_task(). 
+ """ + # Use a unique resource to avoid conflicts with other tests + resource = str(uuid4()) + + # Dispatch a blocking task that holds the resource + blocking_task_href = dispatch_task( + "pulpcore.app.tasks.test.sleep", + args=(3,), # Runs for 3 seconds + exclusive_resources=[resource], + ) + + # Dispatch the immediate task that needs the same resource + # Since the resource is blocked, API cannot execute it immediately + # It will be deferred to a worker + task_href = dispatch_task( + "pulpcore.app.tasks.test.asleep", + args=(0.1,), # Short sleep to make test fast + immediate=True, + deferred=True, + exclusive_resources=[resource], + ) + + # Monitor the task - it should complete successfully after blocking task finishes + task = monitor_task(task_href) + + # Verify task completed successfully + assert task.state == "completed", f"Task should be completed but is {task.state}" + + # Verify state transitions occurred correctly + assert task.started_at is not None, "Task should have a started_at timestamp" + assert task.finished_at is not None, "Task should have a finished_at timestamp" + assert task.started_at < task.finished_at, "Task should have started before finishing" + + # Verify there's no error + assert task.error is None, f"Task should not have an error but has: {task.error}" + + # Clean up blocking task + monitor_task(blocking_task_href) + + +@pytest.mark.parallel +def test_failing_immediate_task_error_handling(dispatch_task, monitor_task): + """ + GIVEN a task that raises a RuntimeError + AND the task is an async function + WHEN dispatching the task as immediate and deferred + THEN the task fails with the correct error message + AND the error field contains the exception details + """ + custom_error_message = "This is a custom error message" + with pytest.raises(PulpTaskError) as ctx: + task_href = dispatch_task( + "pulpcore.app.tasks.test.afailing_task", + kwargs={"error_message": custom_error_message}, + immediate=True, + deferred=True, + ) + monitor_task(task_href) + + task = ctx.value.task + assert task.state == "failed" + assert task.error is not None + assert "description" in task.error + assert "PLP0000" in task.error["description"] + + +@pytest.mark.parallel +def test_failing_worker_task_error_handling(dispatch_task, monitor_task): + """ + GIVEN a task that raises a RuntimeError + AND the task is a sync function + WHEN dispatching the task as deferred (executes on worker) + THEN the task fails with the correct error message + AND the error field contains the exception details + """ + custom_error_message = "Worker task failed with custom error" + with pytest.raises(PulpTaskError) as ctx: + task_href = dispatch_task( + "pulpcore.app.tasks.test.failing_task", + kwargs={"error_message": custom_error_message}, + ) + monitor_task(task_href) + + task = ctx.value.task + assert task.state == "failed" + assert task.error is not None + assert "description" in task.error + assert "PLP0000" in task.error["description"] + + @pytest.fixture def resource_blocker(pulpcore_bindings, dispatch_task): @@ -513,6 +667,45 @@ def _resource_blocker(exclusive_resources: list[str], duration=20): return _resource_blocker +@pytest.mark.parallel +def test_immediate_task_with_available_resources(dispatch_task, pulpcore_bindings): + """ + Test that immediate tasks with resources can execute when resources are available. + + This test verifies that the are_resources_available() function correctly acquires + locks via acquire_locks() when resources are not blocked. 
This is important for + Redis worker implementation where immediate tasks need to acquire distributed locks. + + GIVEN an immediate async task with exclusive and shared resource requirements + WHEN the resources are available (not blocked by other tasks) + THEN the task executes immediately and completes successfully + AND verifies that acquire_locks() is properly imported and callable + """ + # Use unique resources to ensure they're not blocked + unique_exclusive = f"exclusive-{uuid4()}" + unique_shared = f"shared-{uuid4()}" + + task_href = dispatch_task( + "pulpcore.app.tasks.test.asleep", + args=(0.1,), # Quick execution + immediate=True, + deferred=False, + exclusive_resources=[unique_exclusive], + shared_resources=[unique_shared], + ) + + # Task should complete immediately since resources are available + task = pulpcore_bindings.TasksApi.read(task_href) + assert task.state == "completed", ( + f"Task should have completed immediately with available resources, " + f"but is in state: {task.state}" + ) + + # Verify task executed successfully + assert task.finished_at is not None + assert task.error is None + + class TestImmediateTaskWithBlockedResource: @pytest.mark.parallel diff --git a/template_config.yml b/template_config.yml index cfedee80fc..3d5b74bcef 100644 --- a/template_config.yml +++ b/template_config.yml @@ -85,6 +85,7 @@ pulp_settings_azure: - pulpcore.plugin.access_policy.DefaultAccessPolicy task_diagnostics: - memory + worker_type: redis pulp_settings_gcp: null pulp_settings_s3: DISABLED_authentication_backends: '@merge django.contrib.auth.backends.RemoteUserBackend'
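A note on the lock helpers referenced throughout the worker above: acquire_locks(), release_resource_locks(), get_task_lock_key(), and extract_task_resources() come from a module that is not part of the hunks shown here. As a rough, non-authoritative sketch of the all-or-nothing acquisition that fetch_task() is described as relying on, something along the following lines could work with redis-py; the key layout ("task:<uuid>", "resource:<name>"), the 24-hour task-lock TTL, the "__task_lock__" sentinel, and the "return the blocked keys, empty list on success" convention are assumptions for illustration, and shared locks are left out.

# Illustrative sketch only -- not the helper module added by this PR.
import redis

ACQUIRE_ALL_OR_NOTHING = """
-- KEYS[1] is the task lock key, KEYS[2..n] are exclusive resource lock keys.
-- ARGV[1] is the worker name, ARGV[2] is the task lock TTL in seconds.
local blocked = {}
if redis.call('EXISTS', KEYS[1]) == 1 then
    table.insert(blocked, '__task_lock__')
end
for i = 2, #KEYS do
    local holder = redis.call('GET', KEYS[i])
    if holder and holder ~= ARGV[1] then
        table.insert(blocked, KEYS[i])
    end
end
if #blocked > 0 then
    -- All-or-nothing: if anything is blocked, acquire nothing.
    return blocked
end
redis.call('SET', KEYS[1], ARGV[1], 'EX', tonumber(ARGV[2]))
for i = 2, #KEYS do
    redis.call('SET', KEYS[i], ARGV[1])
end
return {}
"""

conn = redis.Redis()
acquire = conn.register_script(ACQUIRE_ALL_OR_NOTHING)
# An empty list means every lock was taken in one atomic step; otherwise nothing was taken.
blocked = acquire(keys=["task:1234", "resource:repo-a"], args=["worker-1", 24 * 3600])

Because Redis runs a Lua script as a single atomic unit, a sketch like this either takes every lock or none of them, which matches the "all-or-nothing" behaviour the comments in fetch_task() rely on for their FIFO bookkeeping.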