Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/pull_request_template.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ If this is your first time contributing, please read the Pull Request Walkthroug
- [ ] Follows the [Pulp policy on AI Usage](https://pulpproject.org/help/more/governance/ai_policy/)
- [ ] (For new features) - User documentation and test coverage has been added

See: [Pull Request Walkthrough](https://pulpproject.org/pulpcore/docs/dev/guides/pull-request-walkthrough/)
See: [Pull Request Walkthrough](https://pulpproject.org/pulpcore/docs/dev/guides/pull-request-walkthrough/)
2 changes: 1 addition & 1 deletion .github/workflows/scripts/install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ if [ "$TEST" = "azure" ]; then
- ./azurite:/etc/pulp\
command: "azurite-blob --skipApiVersionCheck --blobHost 0.0.0.0"' vars/main.yaml
sed -i -e '$a azure_test: true\
pulp_scenario_settings: {"MEDIA_ROOT": "", "STORAGES": {"default": {"BACKEND": "storages.backends.azure_storage.AzureStorage", "OPTIONS": {"account_key": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "account_name": "devstoreaccount1", "azure_container": "pulp-test", "connection_string": "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://ci-azurite:10000/devstoreaccount1;", "expiration_secs": 120, "location": "pulp3", "overwrite_files": true}}, "staticfiles": {"BACKEND": "django.contrib.staticfiles.storage.StaticFilesStorage"}}, "api_root_rewrite_header": "X-API-Root", "content_origin": null, "domain_enabled": true, "rest_framework__default_authentication_classes": "@merge pulpcore.app.authentication.PulpRemoteUserAuthentication", "rest_framework__default_permission_classes": ["pulpcore.plugin.access_policy.DefaultAccessPolicy"], "task_diagnostics": ["memory"]}\
pulp_scenario_settings: {"MEDIA_ROOT": "", "STORAGES": {"default": {"BACKEND": "storages.backends.azure_storage.AzureStorage", "OPTIONS": {"account_key": "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", "account_name": "devstoreaccount1", "azure_container": "pulp-test", "connection_string": "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://ci-azurite:10000/devstoreaccount1;", "expiration_secs": 120, "location": "pulp3", "overwrite_files": true}}, "staticfiles": {"BACKEND": "django.contrib.staticfiles.storage.StaticFilesStorage"}}, "api_root_rewrite_header": "X-API-Root", "content_origin": null, "domain_enabled": true, "rest_framework__default_authentication_classes": "@merge pulpcore.app.authentication.PulpRemoteUserAuthentication", "rest_framework__default_permission_classes": ["pulpcore.plugin.access_policy.DefaultAccessPolicy"], "task_diagnostics": ["memory"], "worker_type": "redis"}\
pulp_scenario_env: {}\
' vars/main.yaml
fi
Expand Down
1 change: 1 addition & 0 deletions CHANGES/7210.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added WORKER_TYPE setting. Defaults to 'pulpcore'. 'redis' is also available.
14 changes: 14 additions & 0 deletions docs/admin/reference/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,20 @@ The number of seconds before a worker should be considered lost.

Defaults to `30` seconds.

### WORKER\_TYPE

The worker implementation to use for task execution.

Available options:

- `"pulpcore"` (default): Uses PostgreSQL advisory locks for task coordination. This is the traditional worker implementation.
- `"redis"`: Uses Redis distributed locks for task coordination. This implementation produces less load on the DB.

!!! note
The Redis worker requires a Redis server to be configured and accessible.

Defaults to `"pulpcore"`.

### WORKING\_DIRECTORY

The directory used by workers to stage files temporarily.
Expand Down
16 changes: 11 additions & 5 deletions pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
from gettext import gettext as _

from django.conf import settings
from django.contrib.postgres.fields import ArrayField, HStoreField
from django.contrib.postgres.indexes import GinIndex
from django.core.serializers.json import DjangoJSONEncoder
Expand Down Expand Up @@ -205,13 +206,18 @@ def set_running(self):
This updates the :attr:`started_at` and sets the :attr:`state` to :attr:`RUNNING`.
"""
started_at = timezone.now()
rows = Task.objects.filter(
pk=self.pk,
state=TASK_STATES.WAITING,
app_lock=AppStatus.objects.current(),
).update(
filter_kwargs = {
"pk": self.pk,
"state": TASK_STATES.WAITING,
}
# Only check app_lock for PulpcoreWorker, not RedisWorker
if settings.WORKER_TYPE != "redis":
filter_kwargs["app_lock"] = AppStatus.objects.current()

rows = Task.objects.filter(**filter_kwargs).update(
state=TASK_STATES.RUNNING,
started_at=started_at,
app_lock=AppStatus.objects.current(),
)
if rows == 1:
self.state = TASK_STATES.RUNNING
Expand Down
4 changes: 4 additions & 0 deletions pulpcore/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@
CONTENT_APP_TTL = 30
WORKER_TTL = 30

# Worker implementation type
# Options: "pulpcore" (default, PostgreSQL advisory locks) or "redis" (Redis distributed locks)
WORKER_TYPE = "pulpcore"

# Seconds for a task to finish on semi graceful worker shutdown (approx)
# On SIGHUP, SIGTERM the currently running task will be awaited forever.
# On SIGINT, this value represents the time before the worker will attempt to kill the subprocess.
Expand Down
43 changes: 43 additions & 0 deletions pulpcore/app/tasks/test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import backoff
import os
import signal
import time
from pulpcore.app.models import TaskGroup
from pulpcore.tasking.tasks import dispatch
Expand Down Expand Up @@ -32,3 +34,44 @@ def dummy_group_task(inbetween=3, intervals=None):
for interval in intervals:
dispatch(sleep, args=(interval,), task_group=task_group)
time.sleep(inbetween)


def missing_worker():
"""
Simulates a worker crash by sending SIGKILL to parent process and itself.

This task is used for testing worker cleanup behavior when a worker
unexpectedly dies while executing a task.
"""
parent_pid = os.getppid()
current_pid = os.getpid()

# Kill parent process (the worker)
os.kill(parent_pid, signal.SIGKILL)

# Kill current process (the task subprocess)
os.kill(current_pid, signal.SIGKILL)


def failing_task(error_message="Task intentionally failed"):
"""
A task that always raises a RuntimeError.

This task is used for testing error handling in worker task execution.

Args:
error_message (str): The error message to include in the RuntimeError
"""
raise RuntimeError(error_message)


async def afailing_task(error_message="Task intentionally failed"):
"""
An async task that always raises a RuntimeError.

This task is used for testing error handling in immediate task execution.

Args:
error_message (str): The error message to include in the RuntimeError
"""
raise RuntimeError(error_message)
26 changes: 23 additions & 3 deletions pulpcore/app/viewsets/task.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from gettext import gettext as _

from django.conf import settings
from django.db import transaction
from django.db.models import Prefetch
from django_filters.rest_framework import filters
Expand Down Expand Up @@ -42,7 +43,7 @@
CreatedResourcesFilter,
)
from pulpcore.constants import TASK_INCOMPLETE_STATES, TASK_STATES
from pulpcore.tasking.tasks import dispatch, cancel_task, cancel_task_group
from pulpcore.tasking.tasks import dispatch
from pulpcore.app.role_util import get_objects_for_user


Expand Down Expand Up @@ -227,7 +228,17 @@ def partial_update(self, request, pk=None, partial=True):
serializer.is_valid(raise_exception=True)

task = self.get_object()
task = cancel_task(task.pk)

# Call the appropriate cancel_task function based on worker type
if settings.WORKER_TYPE == "redis":
from pulpcore.tasking.redis_tasks import cancel_task

task = cancel_task(task.pk)
else:
from pulpcore.tasking.tasks import cancel_task

task = cancel_task(task.pk)

# Check whether task is actually canceled
http_status = (
None
Expand Down Expand Up @@ -346,7 +357,16 @@ def partial_update(self, request, pk=None, partial=True):
).count()
):
raise PermissionDenied()
task_group = cancel_task_group(task_group.pk)

# Call the appropriate cancel_task_group function based on worker type
if settings.WORKER_TYPE == "redis":
from pulpcore.tasking.redis_tasks import cancel_task_group

task_group = cancel_task_group(task_group.pk)
else:
from pulpcore.tasking.tasks import cancel_task_group

task_group = cancel_task_group(task_group.pk)
# Check whether task group is actually canceled
serializer = TaskGroupSerializer(task_group, context={"request": request})
task_statuses = (
Expand Down
9 changes: 8 additions & 1 deletion pulpcore/tasking/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,14 @@
configure_periodic_telemetry,
)
from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES
from pulpcore.tasking.tasks import dispatch, execute_task
from pulpcore.tasking.tasks import dispatch

# Conditionally import execute_task based on WORKER_TYPE
if settings.WORKER_TYPE == "redis":
from pulpcore.tasking.redis_tasks import execute_task
else:
from pulpcore.tasking.tasks import execute_task


_logger = logging.getLogger(__name__)

Expand Down
15 changes: 13 additions & 2 deletions pulpcore/tasking/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from django.conf import settings # noqa: E402: module level not at top
from pulpcore.tasking.worker import PulpcoreWorker # noqa: E402: module level not at top
from pulpcore.tasking.redis_worker import RedisWorker # noqa: E402: module level not at top


_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -59,6 +60,16 @@ def worker(
if name_template:
settings.set("WORKER_NAME_TEMPLATE", name_template)

_logger.info("Starting distributed type worker")
worker_type = settings.WORKER_TYPE
_logger.info("Starting %s worker", worker_type)

PulpcoreWorker(auxiliary=auxiliary).run(burst=burst)
if worker_type == "redis":
if auxiliary:
_logger.warning(
"RedisWorker does not support auxiliary mode, ignoring --auxiliary flag"
)
RedisWorker().run(burst=burst)
elif worker_type == "pulpcore":
PulpcoreWorker(auxiliary=auxiliary).run(burst=burst)
else:
raise ValueError(f"Invalid WORKER_TYPE: {worker_type}. Must be 'pulpcore' or 'redis'.")
Loading