From 1e5e565376bd5ace0456d9ef116d394d34e51529 Mon Sep 17 00:00:00 2001
From: Nejc Saje
Date: Tue, 2 Dec 2025 09:41:20 +0100
Subject: [PATCH 1/3] Bump versions, get off lintlizard

---
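Notes:
    The lint step now runs the pinned tools directly. Assuming the same
    pins, it should be reproducible locally with (mypy gets pinned in the
    follow-up commit):

        pip install -c requirements.txt -r requirements-lint.txt
        mypy .
        ruff check
        ruff format --check

    Plain `ruff format` rewrites files in place rather than reporting
    violations, so the CI step uses the check-only mode to make formatting
    drift fail the job. actions/setup-python@v2 predates GitHub's Node 16/20
    runtime cutoffs and will likely need its own bump in a follow-up.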
 .github/workflows/release.yml |  4 ++--
 .github/workflows/test.yaml   | 10 ++++++----
 Dockerfile                    |  2 +-
 requirements-lint.txt         |  2 +-
 requirements.txt              |  2 +-
 5 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml
index ea8284df..30b55cae 100644
--- a/.github/workflows/release.yml
+++ b/.github/workflows/release.yml
@@ -9,7 +9,7 @@ on:

 jobs:
   build_and_upload:
-    runs-on: 'ubuntu-20.04'
+    runs-on: 'ubuntu-24.04'
     environment: production
     permissions:
       # id-token for the trusted publisher setup
@@ -22,7 +22,7 @@ jobs:
       - uses: actions/setup-python@v2
         name: Install Python
         with:
-          python-version: 3.10
+          python-version: 3.12

       - run: |
           pip install packaging
diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 2b3c45cb..21e72a65 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -13,7 +13,7 @@ jobs:
       matrix:
         python-version: [ '3.10', '3.11', '3.12', '3.13' ]
     name: Lint ${{ matrix.python-version }}
-    runs-on: 'ubuntu-20.04'
+    runs-on: 'ubuntu-24.04'
     container: python:${{ matrix.python-version }}
     steps:
       - name: Checkout code
@@ -22,16 +22,18 @@ jobs:
       - name: Lint code
         run: |
           pip install -c requirements.txt -r requirements-lint.txt
-          lintlizard --ci
+          mypy .
+          ruff check
+          ruff format --check

   # Run tests
   test:
     strategy:
       matrix:
         python-version: ['3.10', '3.11', '3.12', '3.13']
-        os: ['ubuntu-20.04']
+        os: ['ubuntu-24.04']
         redis-version: [4, 5, "6.2.6", "7.0.9"]
-        redis-py-version: [3.3.0, 4.6.0]
+        redis-py-version: [4.6.0, 6.1.0]
       # Do not cancel any jobs when a single job fails
       fail-fast: false
     name: Python ${{ matrix.python-version }} on ${{ matrix.os }} with Redis ${{ matrix.redis-version }} and redis-py==${{ matrix.redis-py-version }}
diff --git a/Dockerfile b/Dockerfile
index 755d86e9..3fe061c5 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM circleci/python:3.10
+FROM python:3.12
 WORKDIR /src

 COPY requirements.txt .
diff --git a/requirements-lint.txt b/requirements-lint.txt
index b49ffbc6..97ebe83e 100644
--- a/requirements-lint.txt
+++ b/requirements-lint.txt
@@ -1,2 +1,2 @@
-lintlizard==0.26.0
+ruff==0.14.7
 types-redis
diff --git a/requirements.txt b/requirements.txt
index cd109265..d0c4fe88 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
 click==8.1.7
-redis==4.5.2
+redis==6.1.0
 structlog==24.1.0
 croniter

From c961a7d5556c026c5a0b913e80107963665100a9 Mon Sep 17 00:00:00 2001
From: Nejc Saje
Date: Tue, 2 Dec 2025 10:49:00 +0100
Subject: [PATCH 2/3] fix up typing

---
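Notes:
    ParamSpec now comes from typing rather than typing_extensions; it is in
    the standard library on Python 3.10+, the oldest version in the test
    matrix. A minimal sketch of the decorator-typing pattern this enables
    (hypothetical `passthrough` helper, not from this repo):

        from typing import Callable, ParamSpec, TypeVar

        P = ParamSpec("P")
        R = TypeVar("R")

        def passthrough(func: Callable[P, R]) -> Callable[P, R]:
            # Preserves the wrapped function's signature for type checkers.
            return func

    Worth flagging in tasks_from_queue: when serialized_data is None and
    include_not_found is false, the new nested branch leaves `data` unset,
    much as the old code would instead have failed on json.loads(None).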
 pyproject.toml         | 23 +++--------------------
 requirements-lint.txt  |  1 +
 setup.py               |  2 +-
 tasktiger/runner.py    |  4 +---
 tasktiger/task.py      | 14 +++++++-------
 tasktiger/tasktiger.py |  2 +-
 6 files changed, 14 insertions(+), 32 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index bc013f4c..d7ea8535 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,20 +1,3 @@
-[tool.black]
-line-length = 79
-exclude = '''
-/(
-  \.git
-)/
-'''
-
-[tool.isort]
-skip = ['.git', 'venv']
-known_tests = 'tests'
-sections = ['FUTURE', 'STDLIB', 'THIRDPARTY', 'FIRSTPARTY', 'TESTS', 'LOCALFOLDER']
-default_section = 'THIRDPARTY'
-use_parentheses = true
-multi_line_output = 3
-include_trailing_comma = true
-force_grid_wrap = 0
-combine_as_imports = true
-line_length = 79
-float_to_top = true
+[tool.mypy]
+ignore_missing_imports = true
+follow_imports = "skip"
diff --git a/requirements-lint.txt b/requirements-lint.txt
index 97ebe83e..416e5f6b 100644
--- a/requirements-lint.txt
+++ b/requirements-lint.txt
@@ -1,2 +1,3 @@
+mypy==1.19.0
 ruff==0.14.7
 types-redis
diff --git a/setup.py b/setup.py
index db43259a..f17f52fa 100644
--- a/setup.py
+++ b/setup.py
@@ -4,7 +4,7 @@

 VERSION_FILE = "tasktiger/__init__.py"
 with open(VERSION_FILE, encoding="utf8") as fd:
-    version = re.search(r'__version__ = ([\'"])(.*?)\1', fd.read()).group(2)
+    version = re.search(r'__version__ = ([\'"])(.*?)\1', fd.read()).group(2)  # type: ignore

 with open("README.rst", encoding="utf-8") as file:
     long_description = file.read()
diff --git a/tasktiger/runner.py b/tasktiger/runner.py
index fabc3526..ded3941a 100644
--- a/tasktiger/runner.py
+++ b/tasktiger/runner.py
@@ -44,9 +44,7 @@ def run_eager_task(self, task: "Task") -> None:
         """
         raise NotImplementedError("Eager tasks are not supported.")

-    def on_permanent_error(
-        self, task: "Task", execution: Dict[str, Any]
-    ) -> None:
+    def on_permanent_error(self, task: "Task", execution: Dict[str, Any] | None) -> None:
         """
         Called if the task fails permanently.

diff --git a/tasktiger/task.py b/tasktiger/task.py
index 9f15b609..f5e246e2 100644
--- a/tasktiger/task.py
+++ b/tasktiger/task.py
@@ -538,8 +538,9 @@ def tasks_from_queue(
                 for idx, serialized_data, serialized_executions, ts in zip(
                     range(len(items)), results[0], results[1:], tss
                 ):
-                    if serialized_data is None and include_not_found:
-                        data = {"id": items[idx][0]}
+                    if serialized_data is None:
+                        if include_not_found:
+                            data = {"id": items[idx][0]}
                     else:
                         data = json.loads(serialized_data)

@@ -561,11 +562,10 @@ def tasks_from_queue(
                 result = tiger.connection.mget(
                     [tiger._key("task", item[0]) for item in items]
                 )
-                for idx, serialized_data, ts in zip(
-                    range(len(items)), result, tss
-                ):
-                    if serialized_data is None and include_not_found:
-                        data = {"id": items[idx][0]}
+                for idx, serialized_data, ts in zip(range(len(items)), result, tss):
+                    if serialized_data is None:
+                        if include_not_found:
+                            data = {"id": items[idx][0]}
                     else:
                         data = json.loads(serialized_data)

diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py
index 76589af0..0f732059 100644
--- a/tasktiger/tasktiger.py
+++ b/tasktiger/tasktiger.py
@@ -13,6 +13,7 @@
     Iterable,
     List,
     Optional,
+    ParamSpec,
     Tuple,
     Type,
     TypeVar,
@@ -24,7 +25,6 @@
 import redis
 import structlog
 from structlog.stdlib import BoundLogger
-from typing_extensions import ParamSpec

 from ._internal import (
     ACTIVE,

From e02924edba639f36910e129c304f0c158f5e4e2f Mon Sep 17 00:00:00 2001
From: Nejc Saje
Date: Tue, 2 Dec 2025 10:29:03 +0100
Subject: [PATCH 3/3] ruff format

---
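Notes:
    Mechanical reformat. ruff's formatter (black-compatible, 88-column
    default) joins previously wrapped calls that now fit on one line and
    adds trailing commas to argument lists that stay multi-line, e.g. from
    this diff:

        # before
        result = self.scripts.zpoppush(
            "src", "dst", 3, None, 10, withscores=True
        )
        # after
        result = self.scripts.zpoppush("src", "dst", 3, None, 10, withscores=True)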
 scripts/redis_scan.py         |  1 +
 tasktiger/_internal.py        |  5 +-
 tasktiger/executor.py         | 12 ++---
 tasktiger/flask_script.py     |  4 +-
 tasktiger/migrations.py       |  4 +-
 tasktiger/redis_scripts.py    | 12 ++---
 tasktiger/redis_semaphore.py  |  4 +-
 tasktiger/retry.py            | 12 ++---
 tasktiger/rollbar.py          |  4 +-
 tasktiger/runner.py           | 12 ++---
 tasktiger/schedule.py         |  4 +-
 tasktiger/task.py             | 20 ++------
 tasktiger/tasktiger.py        | 26 +++-------
 tasktiger/timeouts.py         |  3 +-
 tasktiger/worker.py           | 95 +++++++++--------------------------
 tests/conftest.py             |  3 +-
 tests/tasks.py                | 52 +++++--------------
 tests/tasks_periodic.py       |  4 +-
 tests/test_base.py            | 79 ++++++++---------------------
 tests/test_context_manager.py | 15 ++----
 tests/test_periodic.py        | 24 +++------
 tests/test_queue_size.py      |  4 +-
 tests/test_redis_scripts.py   | 14 ++----
 tests/test_semaphore.py       | 33 ++++--------
 tests/test_workers.py         | 12 ++---
 25 files changed, 122 insertions(+), 336 deletions(-)

diff --git a/scripts/redis_scan.py b/scripts/redis_scan.py
index 8896f9ad..04b7c6a7 100644
--- a/scripts/redis_scan.py
+++ b/scripts/redis_scan.py
@@ -4,6 +4,7 @@

 Can also set the TTL for keys to facilitate removing data.
 """
+
 import signal
 import sys
 import time
diff --git a/tasktiger/_internal.py b/tasktiger/_internal.py
index 6f31d239..fbfcf973 100644
--- a/tasktiger/_internal.py
+++ b/tasktiger/_internal.py
@@ -97,8 +97,7 @@ def serialize_func_name(func: Union[Callable, Type]) -> str:
     """
     if func.__module__ == "__main__":
         raise ValueError(
-            "Functions from the __main__ module cannot be processed by "
-            "workers."
+            "Functions from the __main__ module cannot be processed by workers."
         )
     try:
         # This will only work on Python 3.3 or above, but it will allow us to use static/classmethods
@@ -143,7 +142,7 @@ def serialize_retry_method(retry_method: Any) -> Tuple[str, Tuple]:


 def get_timestamp(
-    when: Optional[Union[datetime.timedelta, datetime.datetime]]
+    when: Optional[Union[datetime.timedelta, datetime.datetime]],
 ) -> Optional[float]:
     # convert timedelta to datetime
     if isinstance(when, datetime.timedelta):
diff --git a/tasktiger/executor.py b/tasktiger/executor.py
index ccc3c8d9..c42653cb 100644
--- a/tasktiger/executor.py
+++ b/tasktiger/executor.py
@@ -135,9 +135,7 @@ def execute_tasks(self, tasks: List[Task], log: BoundLogger) -> bool:

         hard_timeouts = self.worker.get_hard_timeouts(func, tasks)

-        with WorkerContextManagerStack(
-            self.config["CHILD_CONTEXT_MANAGERS"]
-        ):
+        with WorkerContextManagerStack(self.config["CHILD_CONTEXT_MANAGERS"]):
             if is_batch_func:
                 # Batch process if the task supports it.
                 g["current_tasks"] = tasks
@@ -165,9 +163,7 @@ def execute_tasks(self, tasks: List[Task], log: BoundLogger) -> bool:
             execution["time_failed"] = time.time()
             if self.worker.store_tracebacks:
                 # Currently we only log failed task executions to Redis.
-                execution["traceback"] = "".join(
-                    traceback.format_exception(*exc_info)
-                )
+                execution["traceback"] = "".join(traceback.format_exception(*exc_info))
             execution["success"] = success
             execution["host"] = socket.gethostname()

@@ -360,9 +356,7 @@ def check_child_exit() -> Optional[int]:
             execution = {
                 "time_started": time_started,
                 "time_failed": now,
-                "exception_name": serialize_func_name(
-                    JobTimeoutException
-                ),
+                "exception_name": serialize_func_name(JobTimeoutException),
                 "success": False,
                 "host": socket.gethostname(),
             }
diff --git a/tasktiger/flask_script.py b/tasktiger/flask_script.py
index adfbd975..119d64e9 100644
--- a/tasktiger/flask_script.py
+++ b/tasktiger/flask_script.py
@@ -20,9 +20,7 @@ def __init__(self, tiger: "TaskTiger") -> None:
         super(TaskTigerCommand, self).__init__()
         self.tiger = tiger

-    def create_parser(
-        self, *args: Any, **kwargs: Any
-    ) -> argparse.ArgumentParser:
+    def create_parser(self, *args: Any, **kwargs: Any) -> argparse.ArgumentParser:
         # Override the default parser so we can pass all arguments to the
         # TaskTiger parser.
         func_stack = kwargs.pop("func_stack", ())
diff --git a/tasktiger/migrations.py b/tasktiger/migrations.py
index 1b3a0a5b..714feb1f 100644
--- a/tasktiger/migrations.py
+++ b/tasktiger/migrations.py
@@ -21,9 +21,7 @@ def migrate_executions_count(tiger: "TaskTiger") -> None:
         """
     )

-    match = (
-        redis_glob_escape(tiger.config["REDIS_PREFIX"]) + ":task:*:executions"
-    )
+    match = redis_glob_escape(tiger.config["REDIS_PREFIX"]) + ":task:*:executions"

     for key in tiger.connection.scan_iter(count=100, match=match):
         migrate_task(keys=[key, key + "_count"])
diff --git a/tasktiger/redis_scripts.py b/tasktiger/redis_scripts.py
index 29e4c541..64f8c314 100644
--- a/tasktiger/redis_scripts.py
+++ b/tasktiger/redis_scripts.py
@@ -299,16 +299,12 @@ def __init__(self, redis: Redis) -> None:
         self.redis = redis

         self._zadd_noupdate = redis.register_script(ZADD_NOUPDATE)
-        self._zadd_update_existing = redis.register_script(
-            ZADD_UPDATE_EXISTING
-        )
+        self._zadd_update_existing = redis.register_script(ZADD_UPDATE_EXISTING)
         self._zadd_update_min = redis.register_script(ZADD_UPDATE_MIN)
         self._zadd_update_max = redis.register_script(ZADD_UPDATE_MAX)

         self._zpoppush = redis.register_script(ZPOPPUSH)
-        self._zpoppush_update_sets = redis.register_script(
-            ZPOPPUSH_UPDATE_SETS
-        )
+        self._zpoppush_update_sets = redis.register_script(ZPOPPUSH_UPDATE_SETS)
         self._zpoppush_withscores = redis.register_script(ZPOPPUSH_WITHSCORES)
         self._zpoppush_exists_min_update_sets = redis.register_script(
             ZPOPPUSH_EXISTS_MIN_UPDATE_SETS
@@ -319,9 +315,7 @@ def __init__(self, redis: Redis) -> None:

         self._srem_if_not_exists = redis.register_script(SREM_IF_NOT_EXISTS)

-        self._delete_if_not_in_zsets = redis.register_script(
-            DELETE_IF_NOT_IN_ZSETS
-        )
+        self._delete_if_not_in_zsets = redis.register_script(DELETE_IF_NOT_IN_ZSETS)

         self._fail_if_not_in_zset = redis.register_script(FAIL_IF_NOT_IN_ZSET)

diff --git a/tasktiger/redis_semaphore.py b/tasktiger/redis_semaphore.py
index 23e28a33..ba467629 100644
--- a/tasktiger/redis_semaphore.py
+++ b/tasktiger/redis_semaphore.py
@@ -80,9 +80,7 @@ def set_system_lock(cls, redis: Redis, name: str, timeout: int) -> None:

         pipeline = redis.pipeline()
         pipeline.zadd(name, {SYSTEM_LOCK_ID: time.time() + timeout})
-        pipeline.expire(
-            name, timeout + 10
-        )  # timeout plus buffer for troubleshooting
+        pipeline.expire(name, timeout + 10)  # timeout plus buffer for troubleshooting
         pipeline.execute()

     def release(self) -> None:
diff --git a/tasktiger/retry.py b/tasktiger/retry.py
index 0f22f0bd..4efb9af0 100644
--- a/tasktiger/retry.py
+++ b/tasktiger/retry.py
@@ -13,9 +13,7 @@ def fixed(delay: float, max_retries: int) -> RetryStrategy:
     return (_fixed, (delay, max_retries))


-def _linear(
-    retry: int, delay: float, increment: float, max_retries: int
-) -> float:
+def _linear(retry: int, delay: float, increment: float, max_retries: int) -> float:
     if retry > max_retries:
         raise StopRetry()
     return delay + increment * (retry - 1)
@@ -25,15 +23,11 @@ def linear(delay: float, increment: float, max_retries: int) -> RetryStrategy:
     return (_linear, (delay, increment, max_retries))


-def _exponential(
-    retry: int, delay: float, factor: float, max_retries: int
-) -> float:
+def _exponential(retry: int, delay: float, factor: float, max_retries: int) -> float:
     if retry > max_retries:
         raise StopRetry()
     return delay * factor ** (retry - 1)


-def exponential(
-    delay: float, factor: float, max_retries: int
-) -> RetryStrategy:
+def exponential(delay: float, factor: float, max_retries: int) -> RetryStrategy:
     return (_exponential, (delay, factor, max_retries))
diff --git a/tasktiger/rollbar.py b/tasktiger/rollbar.py
index 1357d18d..b39d6315 100644
--- a/tasktiger/rollbar.py
+++ b/tasktiger/rollbar.py
@@ -26,9 +26,7 @@ def format_field(field: str, value: Any) -> str:

         return "%s: %s" % (
             self.prefix,
-            " ".join(
-                format_field(key, data[key]) for key in KEYS if key in data
-            ),
+            " ".join(format_field(key, data[key]) for key in KEYS if key in data),
         )

     def emit(self, record: Any) -> Any:
diff --git a/tasktiger/runner.py b/tasktiger/runner.py
index ded3941a..037c813b 100644
--- a/tasktiger/runner.py
+++ b/tasktiger/runner.py
@@ -26,9 +26,7 @@ def run_single_task(self, task: "Task", hard_timeout: float) -> None:
         """
         raise NotImplementedError("Single tasks are not supported.")

-    def run_batch_tasks(
-        self, tasks: List["Task"], hard_timeout: float
-    ) -> None:
+    def run_batch_tasks(self, tasks: List["Task"], hard_timeout: float) -> None:
         """
         Run the given tasks using the hard timeout in seconds.

@@ -64,9 +62,7 @@ def run_single_task(self, task: "Task", hard_timeout: float) -> None:
         with UnixSignalDeathPenalty(hard_timeout):
             task.func(*task.args, **task.kwargs)

-    def run_batch_tasks(
-        self, tasks: List["Task"], hard_timeout: float
-    ) -> None:
+    def run_batch_tasks(self, tasks: List["Task"], hard_timeout: float) -> None:
         params = [{"args": task.args, "kwargs": task.kwargs} for task in tasks]
         func = tasks[0].func
         with UnixSignalDeathPenalty(hard_timeout):
@@ -82,9 +78,7 @@ def run_eager_task(self, task: "Task") -> None:
         return func(*task.args, **task.kwargs)


-def get_runner_class(
-    log: BoundLogger, tasks: List["Task"]
-) -> Type[BaseRunner]:
+def get_runner_class(log: BoundLogger, tasks: List["Task"]) -> Type[BaseRunner]:
     runner_class_paths = {task.serialized_runner_class for task in tasks}
     if len(runner_class_paths) > 1:
         log.error(
diff --git a/tasktiger/schedule.py b/tasktiger/schedule.py
index 80c2391a..35aa85f6 100644
--- a/tasktiger/schedule.py
+++ b/tasktiger/schedule.py
@@ -50,9 +50,7 @@ def periodic(
     For more details, see README.
     """
-    period = (
-        seconds + minutes * 60 + hours * 3600 + days * 86400 + weeks * 604800
-    )
+    period = seconds + minutes * 60 + hours * 3600 + days * 86400 + weeks * 604800
     assert period > 0, "Must specify a positive period."

     if not start_date:
         # Saturday at midnight
diff --git a/tasktiger/task.py b/tasktiger/task.py
index f5e246e2..927cea4a 100644
--- a/tasktiger/task.py
+++ b/tasktiger/task.py
@@ -115,9 +115,7 @@ def __init__(
             max_queue_size = getattr(func, "_task_max_queue_size", None)

         if max_stored_executions is None:
-            max_stored_executions = getattr(
-                func, "_task_max_stored_executions", None
-            )
+            max_stored_executions = getattr(func, "_task_max_stored_executions", None)

         if runner_class is None:
             runner_class = getattr(func, "_task_runner_class", None)
@@ -158,9 +156,7 @@ def __init__(
                 task["retry_method"] = serialize_retry_method(retry_method)
                 if retry_on:
-                    task["retry_on"] = [
-                        serialize_func_name(cls) for cls in retry_on
-                    ]
+                    task["retry_on"] = [serialize_func_name(cls) for cls in retry_on]
             if max_queue_size:
                 task["max_queue_size"] = max_queue_size
             if max_stored_executions is not None:
                 task["max_stored_executions"] = max_stored_executions
@@ -521,9 +517,7 @@ def tasks_from_queue(

         tasks = []
         if items:
-            tss = [
-                datetime.datetime.utcfromtimestamp(item[1]) for item in items
-            ]
+            tss = [datetime.datetime.utcfromtimestamp(item[1]) for item in items]
             if load_executions:
                 pipeline = tiger.connection.pipeline()
                 pipeline.mget([tiger._key("task", item[0]) for item in items])
@@ -544,9 +538,7 @@ def tasks_from_queue(
                     else:
                         data = json.loads(serialized_data)

-                    executions = [
-                        json.loads(e) for e in serialized_executions if e
-                    ]
+                    executions = [json.loads(e) for e in serialized_executions if e]

                     task = Task(
                         tiger,
@@ -569,9 +561,7 @@ def tasks_from_queue(
                     else:
                         data = json.loads(serialized_data)

-                    task = Task(
-                        tiger, queue=queue, _data=data, _state=state, _ts=ts
-                    )
+                    task = Task(tiger, queue=queue, _data=data, _state=state, _ts=ts)
                     tasks.append(task)

         return n, tasks
diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py
index 0f732059..f0a6c39d 100644
--- a/tasktiger/tasktiger.py
+++ b/tasktiger/tasktiger.py
@@ -159,11 +159,7 @@ def __init__(
         self.periodic_task_funcs: Dict[str, Callable] = {}

         if lazy_init:
-            assert (
-                connection is None
-                and config is None
-                and setup_structlog is False
-            )
+            assert connection is None and config is None and setup_structlog is False
         else:
             self.init(
                 connection=connection,
@@ -293,9 +289,7 @@ def init(
             self.log.setLevel(logging.DEBUG)
             logging.basicConfig(format="%(message)s")

-        self.connection: redis.Redis = connection or redis.Redis(
-            decode_responses=True
-        )
+        self.connection: redis.Redis = connection or redis.Redis(decode_responses=True)
         self.scripts: RedisScripts = RedisScripts(self.connection)

     def _get_current_task(self) -> Task:
@@ -363,8 +357,7 @@ def task(
         max_queue_size: Optional[int] = ...,
         max_stored_executions: Optional[int] = ...,
         runner_class: Optional[Type["BaseRunner"]] = ...,
-    ) -> TaskCallable[P, R]:
-        ...
+    ) -> TaskCallable[P, R]: ...

     @overload
     def task(
@@ -387,8 +380,7 @@ def task(
         max_queue_size: Optional[int] = ...,
         max_stored_executions: Optional[int] = ...,
         runner_class: Optional[Type["BaseRunner"]] = ...,
-    ) -> Callable[[Callable[P, R]], TaskCallable[P, R]]:
-        ...
+    ) -> Callable[[Callable[P, R]], TaskCallable[P, R]]: ...

     def task(
         self,
@@ -445,9 +437,9 @@ def _wrap(func: Callable[P, R]) -> TaskCallable[P, R]:

             if schedule is not None:
                 serialized_func = serialize_func_name(func)
-                assert (
-                    serialized_func not in self.periodic_task_funcs
-                ), "attempted duplicate registration of periodic task"
+                assert serialized_func not in self.periodic_task_funcs, (
+                    "attempted duplicate registration of periodic task"
+                )
                 self.periodic_task_funcs[serialized_func] = tc

             return tc
@@ -683,9 +675,7 @@ def purge_errored_tasks(
         assert limit > 0, "If specified, limit must be greater than zero"

         only_queues = set(queues or self.config["ONLY_QUEUES"] or [])
-        exclude_queues_ = set(
-            exclude_queues or self.config["EXCLUDE_QUEUES"] or []
-        )
+        exclude_queues_ = set(exclude_queues or self.config["EXCLUDE_QUEUES"] or [])

         def errored_tasks() -> Iterable[Task]:
             queues_with_errors = self.connection.smembers(self._key(ERROR))
diff --git a/tasktiger/timeouts.py b/tasktiger/timeouts.py
index cda33e33..af63e41c 100644
--- a/tasktiger/timeouts.py
+++ b/tasktiger/timeouts.py
@@ -45,8 +45,7 @@ def cancel_death_penalty(self) -> None:
 class UnixSignalDeathPenalty(BaseDeathPenalty):
     def handle_death_penalty(self, signum: int, frame: Any) -> None:
         raise JobTimeoutException(
-            "Job exceeded maximum timeout "
-            "value (%d seconds)." % self._timeout
+            "Job exceeded maximum timeout value (%d seconds)." % self._timeout
         )

     def setup_death_penalty(self) -> None:
diff --git a/tasktiger/worker.py b/tasktiger/worker.py
index e4acd584..a1716b16 100644
--- a/tasktiger/worker.py
+++ b/tasktiger/worker.py
@@ -104,9 +104,7 @@ def __init__(
         if single_worker_queues:
             self.single_worker_queues = set(single_worker_queues)
         elif self.config["SINGLE_WORKER_QUEUES"]:
-            self.single_worker_queues = set(
-                self.config["SINGLE_WORKER_QUEUES"]
-            )
+            self.single_worker_queues = set(self.config["SINGLE_WORKER_QUEUES"])
         else:
             self.single_worker_queues = set()

@@ -114,15 +112,10 @@ def __init__(
         if max_workers_per_queue:
             self.max_workers_per_queue: Optional[int] = max_workers_per_queue
         else:
             self.max_workers_per_queue = None
-        assert (
-            self.max_workers_per_queue is None
-            or self.max_workers_per_queue >= 1
-        )
+        assert self.max_workers_per_queue is None or self.max_workers_per_queue >= 1

         if store_tracebacks is None:
-            self.store_tracebacks = bool(
-                self.config.get("STORE_TRACEBACKS", True)
-            )
+            self.store_tracebacks = bool(self.config.get("STORE_TRACEBACKS", True))
         else:
             self.store_tracebacks = bool(store_tracebacks)
@@ -132,9 +125,9 @@ def __init__(
         # queues. This allows us to use worker group-specific locks to reduce
         # Redis load.
         self.worker_group_name = hashlib.sha256(
-            json.dumps(
-                [sorted(self.only_queues), sorted(self.exclude_queues)]
-            ).encode("utf8")
+            json.dumps([sorted(self.only_queues), sorted(self.exclude_queues)]).encode(
+                "utf8"
+            )
         ).hexdigest()

     def _install_signal_handlers(self) -> None:
@@ -190,9 +183,7 @@ def _worker_queue_scheduled_tasks(self) -> None:
         if not lock.acquire(blocking=False):
             return

-        queues = set(
-            self._filter_queues(self._retrieve_queues(self._key(SCHEDULED)))
-        )
+        queues = set(self._filter_queues(self._retrieve_queues(self._key(SCHEDULED))))

         now = time.time()
         for queue in queues:
@@ -235,9 +226,7 @@ def _poll_for_queues(self) -> None:
             time.sleep(self.config["POLL_TASK_QUEUES_INTERVAL"])
         self._refresh_queue_set()

-    def _pubsub_for_queues(
-        self, timeout: float = 0, batch_timeout: float = 0
-    ) -> None:
+    def _pubsub_for_queues(self, timeout: float = 0, batch_timeout: float = 0) -> None:
         """
         Check activity channel for new queues and wait as necessary.

@@ -262,9 +251,7 @@ def _pubsub_for_queues(
             else:
                 pubsub_sleep = start_time + timeout - time.time()
             message = self._pubsub.get_message(
-                timeout=0
-                if pubsub_sleep < 0 or self._did_work
-                else pubsub_sleep
+                timeout=0 if pubsub_sleep < 0 or self._did_work else pubsub_sleep
             )

             # Pull remaining messages off of channel
@@ -322,26 +309,20 @@ def _worker_queue_expired_tasks(self) -> None:
             self.config["REQUEUE_EXPIRED_TASKS_BATCH_SIZE"],
         )

-        for (queue, task_id) in task_data:
+        for queue, task_id in task_data:
             self.log.debug("expiring task", queue=queue, task_id=task_id)
             self._did_work = True

             try:
                 task = Task.from_id(self.tiger, queue, ACTIVE, task_id)
                 if task.should_retry_on(JobTimeoutException, logger=self.log):
-                    self.log.info(
-                        "queueing expired task", queue=queue, task_id=task_id
-                    )
+                    self.log.info("queueing expired task", queue=queue, task_id=task_id)

                     # Task is idempotent and can be requeued. If the task
                     # already exists in the QUEUED queue, don't change its
                     # time.
-                    task._move(
-                        from_state=ACTIVE, to_state=QUEUED, when=now, mode="nx"
-                    )
+                    task._move(from_state=ACTIVE, to_state=QUEUED, when=now, mode="nx")
                 else:
-                    self.log.error(
-                        "failing expired task", queue=queue, task_id=task_id
-                    )
+                    self.log.error("failing expired task", queue=queue, task_id=task_id)
                     # Assume the task can't be retried and move it to the error
                     # queue.
@@ -378,9 +359,7 @@ def get_hard_timeouts(self, func: Any, tasks: List[Task]) -> List[float]:
         is_batch_func = getattr(func, "_task_batch", False)
         if is_batch_func:
             task_timeouts = [
-                task.hard_timeout
-                for task in tasks
-                if task.hard_timeout is not None
+                task.hard_timeout for task in tasks if task.hard_timeout is not None
             ]
             hard_timeout = (
                 (max(task_timeouts) if task_timeouts else None)
@@ -411,9 +390,7 @@ def _get_queue_batch_size(self, queue: str) -> int:

     def _get_queue_lock(
         self, queue: str, log: BoundLogger
-    ) -> Union[
-        Tuple[None, Literal[True]], Tuple[Optional[Semaphore], Literal[False]]
-    ]:
+    ) -> Union[Tuple[None, Literal[True]], Tuple[Optional[Semaphore], Literal[False]]]:
         """Get queue lock for max worker queues.

         For max worker queues it returns a Lock if acquired and whether
@@ -544,9 +521,7 @@ def _process_queue_tasks(
                 queue, tasks, queue_lock
             )
             processed_count = processed_count + len(processed_tasks)
-            log.debug(
-                "processed", attempted=len(tasks), processed=processed_count
-            )
+            log.debug("processed", attempted=len(tasks), processed=processed_count)
             for task in processed_tasks:
                 self._finish_task_processing(queue, task, success, now)

@@ -625,12 +600,7 @@ def _prepare_execution(self, tasks: List[Task]) -> None:
         # The tasks must use the same function.
         assert len(tasks)
         serialized_task_func = tasks[0].serialized_func
-        assert all(
-            [
-                serialized_task_func == task.serialized_func
-                for task in tasks[1:]
-            ]
-        )
+        assert all([serialized_task_func == task.serialized_func for task in tasks[1:]])

         # Before executing periodic tasks, queue them for the next period.
         if serialized_task_func in self.tiger.periodic_task_funcs:
@@ -705,9 +675,7 @@ def _execute_task_group(

         self._prepare_execution(ready_tasks)

-        success = self.executor.execute(
-            queue, ready_tasks, log, locks, queue_lock
-        )
+        success = self.executor.execute(queue, ready_tasks, log, locks, queue_lock)

         if self.stats_thread:
             self.stats_thread.report_task_end()
@@ -798,18 +766,14 @@ def _mark_done() -> None:
                         exception_name=exception_name,
                     )
                 else:
-                    if task.should_retry_on(
-                        exception_class, logger=log
-                    ):
+                    if task.should_retry_on(exception_class, logger=log):
                         should_retry = True
             else:
                 # If the task retries on JobTimeoutException, it should
                 # be idempotent and we can retry. Note that this will
                 # not increase the retry counter since we have no
                 # execution stored on the task.
-                if task.should_retry_on(
-                    JobTimeoutException, logger=log
-                ):
+                if task.should_retry_on(JobTimeoutException, logger=log):
                     should_retry = True
         else:
             should_retry = True
@@ -828,9 +792,7 @@ def _mark_done() -> None:
                 try:
                     func = import_attribute(retry_func)
                 except TaskImportError:
-                    log.error(
-                        "could not import retry function", func=retry_func
-                    )
+                    log.error("could not import retry function", func=retry_func)
                 else:
                     try:
                         retry_delay = func(retry_num, *retry_args)
@@ -942,9 +904,7 @@ def _queue_periodic_tasks(self) -> None:
             # We can safely queue the task here since periodic tasks are
             # unique.
             when = task._queue_for_next_period()
-            self.log.info(
-                "queued periodic task", func=task.serialized_func, when=when
-            )
+            self.log.info("queued periodic task", func=task.serialized_func, when=when)

     def _refresh_queue_set(self) -> None:
         self._queue_set = set(
@@ -965,9 +925,7 @@ def store_task_execution(self, tasks: List[Task], execution: Dict) -> None:

         for task in tasks:
             executions_key = self._key("task", task.id, "executions")
-            executions_count_key = self._key(
-                "task", task.id, "executions_count"
-            )
+            executions_count_key = self._key("task", task.id, "executions_count")

             pipeline = self.connection.pipeline()
             pipeline.incr(executions_count_key)
@@ -1007,9 +965,7 @@ def run(
         )

         if exit_after:
-            exit_after_dt = (
-                datetime.datetime.now(datetime.timezone.utc) + exit_after
-            )
+            exit_after_dt = datetime.datetime.now(datetime.timezone.utc) + exit_after
         else:
             exit_after_dt = None

@@ -1059,8 +1015,7 @@ def run(
                     break
                 if (
                     exit_after_dt
-                    and datetime.datetime.now(datetime.timezone.utc)
-                    > exit_after_dt
+                    and datetime.datetime.now(datetime.timezone.utc) > exit_after_dt
                 ):
                     break
                 if self._stop_requested:
diff --git a/tests/conftest.py b/tests/conftest.py
index 9aa86947..f6c04d43 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -46,8 +46,7 @@ def _ensure_queue(typ, data):
             task_ids = redis.zrange("t:%s:%s" % (typ, name), 0, -1)
             assert len(task_ids) == n
             ret[name] = [
-                json.loads(redis.get("t:task:%s" % task_id))
-                for task_id in task_ids
+                json.loads(redis.get("t:task:%s" % task_id)) for task_id in task_ids
             ]
             assert [task["id"] for task in ret[name]] == task_ids
         return ret
diff --git a/tests/tasks.py b/tests/tasks.py
index 9c465990..4eb2900c 100644
--- a/tests/tasks.py
+++ b/tests/tasks.py
@@ -57,27 +57,21 @@ def long_task_killed():
 @tiger.task(hard_timeout=DELAY * 2)
 def long_task_ok():
     # Signal task has started
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         conn.lpush(LONG_TASK_SIGNAL_KEY, "1")
     time.sleep(DELAY)


 def wait_for_long_task():
     """Waits for a long task to start."""
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         result = conn.blpop(LONG_TASK_SIGNAL_KEY, int(ceil(DELAY * 3)))
     assert result[1] == "1"


 @tiger.task(unique=True)
 def unique_task(value=None):
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         conn.lpush("unique_task", value)


@@ -93,9 +87,7 @@ def unique_key_task(a, b):

 @tiger.task(lock=True)
 def locked_task(key, other=None):
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         data = conn.getset(key, 1)
         if data is not None:
             raise Exception("task failed, key already set")
@@ -105,9 +97,7 @@ def locked_task(key, other=None):

 @tiger.task(queue="batch", batch=True)
 def batch_task(params):
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         try:
             conn.rpush("batch_task", json.dumps(params))
         except Exception:
@@ -118,9 +108,7 @@ def batch_task(params):

 @tiger.task(queue="batch")
 def non_batch_task(arg):
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         conn.rpush("batch_task", arg)

         if arg == 10:
@@ -141,9 +129,7 @@ def retry_task_3():


 def verify_current_task():
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         try:
             tiger.current_tasks
         except RuntimeError:
@@ -154,9 +140,7 @@ def verify_current_task():

 @tiger.task(batch=True, queue="batch")
 def verify_current_tasks(tasks):
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         try:
             tasks = tiger.current_task
         except RuntimeError:
@@ -167,18 +151,14 @@ def verify_current_tasks(tasks):


 def verify_current_serialized_func():
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         serialized_func = tiger.current_serialized_func
         conn.set("serialized_func", serialized_func)


 @tiger.task(batch=True, queue="batch")
 def verify_current_serialized_func_batch(tasks):
-    with redis.Redis(
-        host=REDIS_HOST, db=TEST_DB, decode_responses=True
-    ) as conn:
+    with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
         serialized_func = tiger.current_serialized_func
         conn.set("serialized_func", serialized_func)

@@ -223,9 +203,7 @@ def run_single_task(self, task, hard_timeout):
         assert hard_timeout == 300
         assert task.func is simple_task

-        with redis.Redis(
-            host=REDIS_HOST, db=TEST_DB, decode_responses=True
-        ) as conn:
+        with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
             conn.set("task_id", task.id)

     def run_batch_tasks(self, tasks, hard_timeout):
@@ -233,9 +211,7 @@ def run_batch_tasks(self, tasks, hard_timeout):
         assert hard_timeout == 300
         assert len(tasks) == 2

-        with redis.Redis(
-            host=REDIS_HOST, db=TEST_DB, decode_responses=True
-        ) as conn:
+        with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
             conn.set("task_args", ",".join(str(t.args[0]) for t in tasks))

     def run_eager_task(self, task):
@@ -246,7 +222,5 @@ class MyErrorRunnerClass(DefaultRunner):
     def on_permanent_error(self, task, execution):
         assert task.func is exception_task
         assert execution["exception_name"] == "builtins:Exception"
-        with redis.Redis(
-            host=REDIS_HOST, db=TEST_DB, decode_responses=True
-        ) as conn:
+        with redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True) as conn:
             conn.set("task_id", task.id)
diff --git a/tests/tasks_periodic.py b/tests/tasks_periodic.py
index f28af744..00646f3d 100644
--- a/tests/tasks_periodic.py
+++ b/tests/tasks_periodic.py
@@ -10,9 +10,7 @@
 tiger = get_tiger()


-@tiger.task(
-    schedule=periodic(seconds=1), queue="periodic", retry_on=(ValueError,)
-)
+@tiger.task(schedule=periodic(seconds=1), queue="periodic", retry_on=(ValueError,))
 def periodic_task():
     """Periodic task."""
     conn = redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True)
diff --git a/tests/test_base.py b/tests/test_base.py
index 29a7bd1b..a41a03eb 100644
--- a/tests/test_base.py
+++ b/tests/test_base.py
@@ -94,9 +94,7 @@ def test_simple_task(self):
         self._ensure_queues(queued={"default": 0})
         assert not self.conn.exists("t:task:%s" % task["id"])

-    @pytest.mark.skipif(
-        sys.version_info < (3, 3), reason="__qualname__ unavailable"
-    )
+    @pytest.mark.skipif(sys.version_info < (3, 3), reason="__qualname__ unavailable")
     def test_staticmethod_task(self):
         self.tiger.delay(StaticTask.task)
         queues = self._ensure_queues(queued={"default": 1})
@@ -215,16 +213,12 @@ def test_exception_task(self, store_tracebacks):
         self.tiger.delay(exception_task)

         Worker(self.tiger).run(once=True)
-        queues = self._ensure_queues(
-            queued={"default": 0}, error={"default": 1}
-        )
+        queues = self._ensure_queues(queued={"default": 0}, error={"default": 1})

         task = queues["error"]["default"][0]
         assert task["func"] == "tests.tasks:exception_task"

-        executions = self.conn.lrange(
-            "t:task:%s:executions" % task["id"], 0, -1
-        )
+        executions = self.conn.lrange("t:task:%s:executions" % task["id"], 0, -1)
         assert len(executions) == 1
         execution = json.loads(executions[0])
         assert execution["exception_name"] == serialize_func_name(Exception)
@@ -266,16 +260,12 @@ def test_long_task_ok(self):
     def test_long_task_killed(self):
         self.tiger.delay(long_task_killed)
         Worker(self.tiger).run(once=True)
-        queues = self._ensure_queues(
-            queued={"default": 0}, error={"default": 1}
-        )
+        queues = self._ensure_queues(queued={"default": 0}, error={"default": 1})

         task = queues["error"]["default"][0]
         assert task["func"] == "tests.tasks:long_task_killed"

-        executions = self.conn.lrange(
-            "t:task:%s:executions" % task["id"], 0, -1
-        )
+        executions = self.conn.lrange("t:task:%s:executions" % task["id"], 0, -1)
         assert len(executions) == 1
         execution = json.loads(executions[0])
         exception_name = execution["exception_name"]
@@ -287,9 +277,7 @@ def test_unique_task_1(self):
         self.tiger.delay(unique_task, kwargs={"value": 2})
         self.tiger.delay(unique_task, kwargs={"value": 2})

-        queues = self._ensure_queues(
-            queued={"default": 2}, error={"default": 0}
-        )
+        queues = self._ensure_queues(queued={"default": 2}, error={"default": 0})

         task_1, task_2 = queues["queued"]["default"]

@@ -320,9 +308,7 @@ def test_unique_key_task(self):
         self.tiger.delay(unique_key_task)
         self.tiger.delay(unique_key_task, kwargs={"b": 1})

-        queues = self._ensure_queues(
-            queued={"default": 3}, error={"default": 0}
-        )
+        queues = self._ensure_queues(queued={"default": 3}, error={"default": 0})

         task_1, task_2, task_3 = queues["queued"]["default"]

@@ -529,9 +515,7 @@ class CustomException(Exception):
     )
     def test_retry_method(self):
-        task = self.tiger.delay(
-            exception_task, retry_method=linear(DELAY, DELAY, 3)
-        )
+        task = self.tiger.delay(exception_task, retry_method=linear(DELAY, DELAY, 3))

         def _run(n_executions):
             Worker(self.tiger).run(once=True)

@@ -645,9 +629,7 @@ def test_retry_executions_count(self, count):

             Worker(self.tiger).run(once=True)

-        assert (
-            int(self.conn.get(f"t:task:{task.id}:executions_count")) == count
-        )
+        assert int(self.conn.get(f"t:task:{task.id}:executions_count")) == count
        assert self.conn.llen(f"t:task:{task.id}:executions") == count

     def test_batch_1(self):
@@ -751,15 +733,9 @@ def test_batch_exception_3(self):
         self._ensure_queues(queued={"batch": 0}, error={"batch": 2})

     def test_batch_lock_key(self):
-        self.tiger.delay(
-            batch_task, kwargs={"key": "1", "other": 1}, lock_key=("key,")
-        )
-        self.tiger.delay(
-            batch_task, kwargs={"key": "2", "other": 2}, lock_key=("key,")
-        )
-        self.tiger.delay(
-            batch_task, kwargs={"key": "2", "other": 3}, lock_key=("key,")
-        )
+        self.tiger.delay(batch_task, kwargs={"key": "1", "other": 1}, lock_key=("key,"))
+        self.tiger.delay(batch_task, kwargs={"key": "2", "other": 2}, lock_key=("key,"))
+        self.tiger.delay(batch_task, kwargs={"key": "2", "other": 3}, lock_key=("key,"))

         self._ensure_queues(queued={"batch": 3})
         Worker(self.tiger).run(once=True)
@@ -788,9 +764,7 @@ def test_only_queues(self, only_queues, expected_unprocessed):
         self.tiger.delay(simple_task, queue="[ab\\c]*?")
         self.tiger.delay(simple_task, queue="[ab\\c]*?")

-        self._ensure_queues(
-            queued={"a": 1, "a.a": 1, "b": 1, "b.a": 1, "[ab\\c]*?": 2}
-        )
+        self._ensure_queues(queued={"a": 1, "a.a": 1, "b": 1, "b.a": 1, "[ab\\c]*?": 2})

         self.tiger.config["ONLY_QUEUES"] = only_queues

@@ -828,9 +802,7 @@ def test_purge_errored_tasks_basic(self):
         self.tiger.delay(exception_task)

         Worker(self.tiger).run(once=True)
-        queues = self._ensure_queues(
-            queued={"default": 0}, error={"default": 1}
-        )
+        queues = self._ensure_queues(queued={"default": 0}, error={"default": 1})

         task = queues["error"]["default"][0]
         assert task["func"] == "tests.tasks:exception_task"
@@ -1421,14 +1393,10 @@ def test_child_hanging_forever(self):

         task = queues["error"]["default"][0]
         assert task["func"] == "tests.tasks:sleep_task"

-        executions = self.conn.lrange(
-            "t:task:%s:executions" % task["id"], 0, -1
-        )
+        executions = self.conn.lrange("t:task:%s:executions" % task["id"], 0, -1)
         assert len(executions) == 1
         execution = json.loads(executions[0])
-        assert execution["exception_name"] == serialize_func_name(
-            JobTimeoutException
-        )
+        assert execution["exception_name"] == serialize_func_name(JobTimeoutException)
         assert not execution["success"]

@@ -1478,19 +1446,14 @@ def test_decorated_child_hard_timeout_precedence(self):

         task = queues["error"]["default"][0]
         assert task["func"] == "tests.tasks:decorated_task_sleep_timeout"

-        executions = self.conn.lrange(
-            "t:task:%s:executions" % task["id"], 0, -1
-        )
+        executions = self.conn.lrange("t:task:%s:executions" % task["id"], 0, -1)
         assert len(executions) == 1
         execution = json.loads(executions[0])
-        assert execution["exception_name"] == serialize_func_name(
-            JobTimeoutException
-        )
+        assert execution["exception_name"] == serialize_func_name(JobTimeoutException)
         assert not execution["success"]

         # Ensure that task duration is lower than DEFAULT_HARD_TIMEOUT
         assert (
-            execution["time_failed"] - execution["time_started"]
-            < DEFAULT_HARD_TIMEOUT
+            execution["time_failed"] - execution["time_started"] < DEFAULT_HARD_TIMEOUT
         )

@@ -1519,9 +1482,7 @@ def test_mixed_runner_class_batch_task(self):
         self._ensure_queues(error={"batch": 2})

     def test_permanent_error(self):
-        task = self.tiger.delay(
-            exception_task, runner_class=MyErrorRunnerClass
-        )
+        task = self.tiger.delay(exception_task, runner_class=MyErrorRunnerClass)
         Worker(self.tiger).run(once=True)
         assert self.conn.get("task_id") == task.id
         self.conn.delete("task_id")
diff --git a/tests/test_context_manager.py b/tests/test_context_manager.py
index 92af2cb3..4e3a8403 100644
--- a/tests/test_context_manager.py
+++ b/tests/test_context_manager.py
@@ -1,4 +1,5 @@
 """Child context manager tests."""
+
 import redis

 from tasktiger import Worker
@@ -17,9 +18,7 @@ class ContextManagerTester:

     def __init__(self, name):
         self.name = name
-        self.conn = redis.Redis(
-            host=REDIS_HOST, db=TEST_DB, decode_responses=True
-        )
+        self.conn = redis.Redis(host=REDIS_HOST, db=TEST_DB, decode_responses=True)
         self.conn.set("cm:{}:enter".format(self.name), 0)
         self.conn.set("cm:{}:exit".format(self.name), 0)
         self.conn.set("cm:{}:exit_with_error".format(self.name), 0)
@@ -51,15 +50,9 @@ def _test_context_managers(self, num, task, should_fail=False):
             assert self.conn.get("cm:{}:enter".format(cms[i].name)) == "1"
             assert self.conn.get("cm:{}:exit".format(cms[i].name)) == "1"
             if should_fail:
-                assert (
-                    self.conn.get("cm:{}:exit_with_error".format(cms[i].name))
-                    == "1"
-                )
+                assert self.conn.get("cm:{}:exit_with_error".format(cms[i].name)) == "1"
             else:
-                assert (
-                    self.conn.get("cm:{}:exit_with_error".format(cms[i].name))
-                    == "0"
-                )
+                assert self.conn.get("cm:{}:exit_with_error".format(cms[i].name)) == "0"

     def test_fixture(self):
         cms = self._get_context_managers(1).pop()
diff --git a/tests/test_periodic.py b/tests/test_periodic.py
index caede500..699cbd92 100644
--- a/tests/test_periodic.py
+++ b/tests/test_periodic.py
@@ -185,9 +185,7 @@ def test_periodic_execution_unique_ids(self):
         self._ensure_queues(queued={"periodic": 1})

         # generate the expected unique id
-        expected_unique_id = gen_unique_id(
-            serialize_func_name(periodic_task), [], {}
-        )
+        expected_unique_id = gen_unique_id(serialize_func_name(periodic_task), [], {})

         # pull task out of the queue by id. If found, then the id is correct
         task = Task.from_id(tiger, "periodic", QUEUED, expected_unique_id)
@@ -244,9 +242,7 @@ def test_periodic_execution_unique_ids_self_correct(self):
         sleep_until_next_second()

         # generate the ids
-        correct_unique_id = gen_unique_id(
-            serialize_func_name(periodic_task), [], {}
-        )
+        correct_unique_id = gen_unique_id(serialize_func_name(periodic_task), [], {})
         malformed_unique_id = gen_unique_id(
             serialize_func_name(periodic_task), None, None
         )
@@ -298,9 +294,7 @@ def test_successful_execution_clears_executions_from_retries(self):
         self._ensure_queues(queued={"periodic": 1})
         Worker(tiger).run(once=True)

-        task = Task.from_id(
-            tiger, "periodic", SCHEDULED, task_id, load_executions=10
-        )
+        task = Task.from_id(tiger, "periodic", SCHEDULED, task_id, load_executions=10)
         assert len(task.executions) == 1

         tiger.connection.delete("fail-periodic-task")
@@ -315,9 +309,7 @@ def test_successful_execution_clears_executions_from_retries(self):
         Worker(tiger).run(once=True)

         # Ensure we cleared any previous executions.
-        task = Task.from_id(
-            tiger, "periodic", SCHEDULED, task_id, load_executions=10
-        )
+        task = Task.from_id(tiger, "periodic", SCHEDULED, task_id, load_executions=10)
         assert len(task.executions) == 0

     def test_successful_execution_doesnt_clear_previous_errors(self):
@@ -345,9 +337,7 @@ def test_successful_execution_doesnt_clear_previous_errors(self):
         self._ensure_queues(queued={"periodic": 1})
         Worker(tiger).run(once=True)

-        task = Task.from_id(
-            tiger, "periodic", SCHEDULED, task_id, load_executions=10
-        )
+        task = Task.from_id(tiger, "periodic", SCHEDULED, task_id, load_executions=10)
         assert len(task.executions) == 1

         tiger.connection.delete("fail-periodic-task")
@@ -362,7 +352,5 @@ def test_successful_execution_doesnt_clear_previous_errors(self):
         Worker(tiger).run(once=True)

         # Ensure we didn't clear previous executions.
-        task = Task.from_id(
-            tiger, "periodic", SCHEDULED, task_id, load_executions=10
-        )
+        task = Task.from_id(tiger, "periodic", SCHEDULED, task_id, load_executions=10)
         assert len(task.executions) == 1
diff --git a/tests/test_queue_size.py b/tests/test_queue_size.py
index c67edf41..7241f425 100644
--- a/tests/test_queue_size.py
+++ b/tests/test_queue_size.py
@@ -72,9 +72,7 @@ def test_task_all_states(self):

         # Queued
         self.tiger.delay(simple_task, queue="a", max_queue_size=3)
-        self._ensure_queues(
-            active={"a": 1}, queued={"a": 1}, scheduled={"a": 1}
-        )
+        self._ensure_queues(active={"a": 1}, queued={"a": 1}, scheduled={"a": 1})

         # Should fail to queue task to run immediately
         with pytest.raises(QueueFullException):
diff --git a/tests/test_redis_scripts.py b/tests/test_redis_scripts.py
index 00879362..f33494ac 100644
--- a/tests/test_redis_scripts.py
+++ b/tests/test_redis_scripts.py
@@ -74,9 +74,7 @@ def test_zpoppush_3(self):

     def test_zpoppush_withscores_1(self):
         self.conn.zadd("src", {"a": 1, "b": 2, "c": 3, "d": 4})
-        result = self.scripts.zpoppush(
-            "src", "dst", 3, None, 10, withscores=True
-        )
+        result = self.scripts.zpoppush("src", "dst", 3, None, 10, withscores=True)
         assert result == ["a", "1", "b", "2", "c", "3"]

         src = self.conn.zrange("src", 0, -1, withscores=True)
@@ -87,9 +85,7 @@ def test_zpoppush_withscores_1(self):

     def test_zpoppush_withscores_2(self):
         self.conn.zadd("src", {"a": 1, "b": 2, "c": 3, "d": 4})
-        result = self.scripts.zpoppush(
-            "src", "dst", 100, None, 10, withscores=True
-        )
+        result = self.scripts.zpoppush("src", "dst", 100, None, 10, withscores=True)
         assert result == ["a", "1", "b", "2", "c", "3", "d", "4"]

         src = self.conn.zrange("src", 0, -1, withscores=True)
@@ -125,7 +121,7 @@ def test_zpoppush_on_success_1(self, **kwargs):
             score=None,
             new_score=10,
             on_success=("update_sets", "val", "remove_set", "add_set"),
-            **kwargs
+            **kwargs,
         )

         assert result == ["a", "b"]
@@ -151,7 +147,7 @@ def test_zpoppush_on_success_2(self, **kwargs):
             score=0,
             new_score=10,
             on_success=("update_sets", "val", "remove_set", "add_set"),
-            **kwargs
+            **kwargs,
         )

         assert result == []
@@ -177,7 +173,7 @@ def test_zpoppush_on_success_3(self, **kwargs):
             score=None,
             new_score=10,
             on_success=("update_sets", "val", "remove_set", "add_set"),
-            **kwargs
+            **kwargs,
         )

         assert result == ["a", "b", "c", "d"]
diff --git a/tests/test_semaphore.py b/tests/test_semaphore.py
index b2fbffb1..76f41456 100644
--- a/tests/test_semaphore.py
+++ b/tests/test_semaphore.py
@@ -1,4 +1,5 @@
 """Test Redis Semaphore lock."""
+
 import datetime
 import time

@@ -31,12 +32,8 @@ def teardown_method(self, method):

     def test_simple_semaphore(self):
         """Test semaphore."""
-        semaphore1 = Semaphore(
self.conn, "test_key", "id_1", max_locks=1, timeout=10 - ) - semaphore2 = Semaphore( - self.conn, "test_key", "id_2", max_locks=1, timeout=10 - ) + semaphore1 = Semaphore(self.conn, "test_key", "id_1", max_locks=1, timeout=10) + semaphore2 = Semaphore(self.conn, "test_key", "id_2", max_locks=1, timeout=10) # Get lock and then release with FreezeTime(datetime.datetime(2014, 1, 1)): @@ -64,15 +61,9 @@ def test_simple_semaphore(self): assert locks == 1 def test_multiple_locks(self): - semaphore1 = Semaphore( - self.conn, "test_key", "id_1", max_locks=2, timeout=10 - ) - semaphore2 = Semaphore( - self.conn, "test_key", "id_2", max_locks=2, timeout=10 - ) - semaphore3 = Semaphore( - self.conn, "test_key", "id_3", max_locks=2, timeout=10 - ) + semaphore1 = Semaphore(self.conn, "test_key", "id_1", max_locks=2, timeout=10) + semaphore2 = Semaphore(self.conn, "test_key", "id_2", max_locks=2, timeout=10) + semaphore3 = Semaphore(self.conn, "test_key", "id_3", max_locks=2, timeout=10) # First two locks should be acquired with FreezeTime(datetime.datetime(2014, 1, 1)): @@ -99,12 +90,8 @@ def test_multiple_locks(self): assert locks == 2 def test_semaphores_renew(self): - semaphore1 = Semaphore( - self.conn, "test_key", "id_1", max_locks=1, timeout=10 - ) - semaphore2 = Semaphore( - self.conn, "test_key", "id_2", max_locks=1, timeout=10 - ) + semaphore1 = Semaphore(self.conn, "test_key", "id_1", max_locks=1, timeout=10) + semaphore2 = Semaphore(self.conn, "test_key", "id_2", max_locks=1, timeout=10) with FreezeTime(datetime.datetime(2014, 1, 1)): acquired, locks = semaphore1.acquire() @@ -138,9 +125,7 @@ def test_semaphores_renew(self): # Test system lock shorter and longer than regular lock timeout @pytest.mark.parametrize("timeout", [8, 30]) def test_system_lock(self, timeout): - semaphore1 = Semaphore( - self.conn, "test_key", "id_1", max_locks=10, timeout=10 - ) + semaphore1 = Semaphore(self.conn, "test_key", "id_1", max_locks=10, timeout=10) with FreezeTime(datetime.datetime(2014, 1, 1)): Semaphore.set_system_lock(self.conn, "test_key", timeout) diff --git a/tests/test_workers.py b/tests/test_workers.py index deffa904..a4c895c9 100644 --- a/tests/test_workers.py +++ b/tests/test_workers.py @@ -218,17 +218,13 @@ def test_heartbeat(self, tiger): conn = tiger.connection heartbeat_1 = conn.zscore(queue_key, task.id) - queue_lock_1 = conn.zrange(queue_lock_key, 0, -1, withscores=True)[0][ - 1 - ] + queue_lock_1 = conn.zrange(queue_lock_key, 0, -1, withscores=True)[0][1] task_lock_1 = conn.pttl(task_lock_key) time.sleep(DELAY / 2) heartbeat_2 = conn.zscore(queue_key, task.id) - queue_lock_2 = conn.zrange(queue_lock_key, 0, -1, withscores=True)[0][ - 1 - ] + queue_lock_2 = conn.zrange(queue_lock_key, 0, -1, withscores=True)[0][1] task_lock_2 = conn.pttl(task_lock_key) assert heartbeat_2 > heartbeat_1 > 0 @@ -240,9 +236,7 @@ def test_heartbeat(self, tiger): worker.kill() - def test_stop_heartbeat_thread_on_unhandled_exception( - self, tiger, ensure_queues - ): + def test_stop_heartbeat_thread_on_unhandled_exception(self, tiger, ensure_queues): task = Task(tiger, system_exit_task) task.delay()