Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:

jobs:
build_and_upload:
runs-on: 'ubuntu-20.04'
runs-on: 'ubuntu-24.04'
environment: production
permissions:
# id-token for the trusted publisher setup
Expand All @@ -22,7 +22,7 @@ jobs:
- uses: actions/setup-python@v2
name: Install Python
with:
python-version: 3.10
python-version: 3.12

- run: |
pip install packaging
Expand Down
10 changes: 6 additions & 4 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
matrix:
python-version: [ '3.10', '3.11', '3.12', '3.13' ]
name: Lint ${{ matrix.python-version }}
runs-on: 'ubuntu-20.04'
runs-on: 'ubuntu-24.04'
container: python:${{ matrix.python-version }}
steps:
- name: Checkout code
Expand All @@ -22,16 +22,18 @@ jobs:
- name: Lint code
run: |
pip install -c requirements.txt -r requirements-lint.txt
lintlizard --ci
mypy .
ruff check
ruff format

# Run tests
test:
strategy:
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13']
os: ['ubuntu-20.04']
os: ['ubuntu-24.04']
redis-version: [4, 5, "6.2.6", "7.0.9"]
redis-py-version: [3.3.0, 4.6.0]
redis-py-version: [4.6.0, 6.1.0]
# Do not cancel any jobs when a single job fails
fail-fast: false
name: Python ${{ matrix.python-version }} on ${{ matrix.os }} with Redis ${{ matrix.redis-version }} and redis-py==${{ matrix.redis-py-version }}
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM circleci/python:3.10
FROM python:3.12

WORKDIR /src
COPY requirements.txt .
Expand Down
23 changes: 3 additions & 20 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,3 @@
[tool.black]
line-length = 79
exclude = '''
/(
\.git
)/
'''

[tool.isort]
skip = ['.git', 'venv']
known_tests = 'tests'
sections = ['FUTURE', 'STDLIB', 'THIRDPARTY', 'FIRSTPARTY', 'TESTS', 'LOCALFOLDER']
default_section = 'THIRDPARTY'
use_parentheses = true
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
combine_as_imports = true
line_length = 79
float_to_top = true
[tool.mypy]
ignore_missing_imports = true
follow_imports = "skip"
3 changes: 2 additions & 1 deletion requirements-lint.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
lintlizard==0.26.0
mypy==1.19.0
ruff==0.14.7
types-redis
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
click==8.1.7
redis==4.5.2
redis==6.1.0
structlog==24.1.0
croniter
1 change: 1 addition & 0 deletions scripts/redis_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

Can also set the TTL for keys to facilitate removing data.
"""

import signal
import sys
import time
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

VERSION_FILE = "tasktiger/__init__.py"
with open(VERSION_FILE, encoding="utf8") as fd:
version = re.search(r'__version__ = ([\'"])(.*?)\1', fd.read()).group(2)
version = re.search(r'__version__ = ([\'"])(.*?)\1', fd.read()).group(2) # type: ignore

with open("README.rst", encoding="utf-8") as file:
long_description = file.read()
Expand Down
5 changes: 2 additions & 3 deletions tasktiger/_internal.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ def serialize_func_name(func: Union[Callable, Type]) -> str:
"""
if func.__module__ == "__main__":
raise ValueError(
"Functions from the __main__ module cannot be processed by "
"workers."
"Functions from the __main__ module cannot be processed by workers."
)
try:
# This will only work on Python 3.3 or above, but it will allow us to use static/classmethods
Expand Down Expand Up @@ -143,7 +142,7 @@ def serialize_retry_method(retry_method: Any) -> Tuple[str, Tuple]:


def get_timestamp(
when: Optional[Union[datetime.timedelta, datetime.datetime]]
when: Optional[Union[datetime.timedelta, datetime.datetime]],
) -> Optional[float]:
# convert timedelta to datetime
if isinstance(when, datetime.timedelta):
Expand Down
12 changes: 3 additions & 9 deletions tasktiger/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ def execute_tasks(self, tasks: List[Task], log: BoundLogger) -> bool:

hard_timeouts = self.worker.get_hard_timeouts(func, tasks)

with WorkerContextManagerStack(
self.config["CHILD_CONTEXT_MANAGERS"]
):
with WorkerContextManagerStack(self.config["CHILD_CONTEXT_MANAGERS"]):
if is_batch_func:
# Batch process if the task supports it.
g["current_tasks"] = tasks
Expand Down Expand Up @@ -165,9 +163,7 @@ def execute_tasks(self, tasks: List[Task], log: BoundLogger) -> bool:
execution["time_failed"] = time.time()
if self.worker.store_tracebacks:
# Currently we only log failed task executions to Redis.
execution["traceback"] = "".join(
traceback.format_exception(*exc_info)
)
execution["traceback"] = "".join(traceback.format_exception(*exc_info))
execution["success"] = success
execution["host"] = socket.gethostname()

Expand Down Expand Up @@ -360,9 +356,7 @@ def check_child_exit() -> Optional[int]:
execution = {
"time_started": time_started,
"time_failed": now,
"exception_name": serialize_func_name(
JobTimeoutException
),
"exception_name": serialize_func_name(JobTimeoutException),
"success": False,
"host": socket.gethostname(),
}
Expand Down
4 changes: 1 addition & 3 deletions tasktiger/flask_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ def __init__(self, tiger: "TaskTiger") -> None:
super(TaskTigerCommand, self).__init__()
self.tiger = tiger

def create_parser(
self, *args: Any, **kwargs: Any
) -> argparse.ArgumentParser:
def create_parser(self, *args: Any, **kwargs: Any) -> argparse.ArgumentParser:
# Override the default parser so we can pass all arguments to the
# TaskTiger parser.
func_stack = kwargs.pop("func_stack", ())
Expand Down
4 changes: 1 addition & 3 deletions tasktiger/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ def migrate_executions_count(tiger: "TaskTiger") -> None:
"""
)

match = (
redis_glob_escape(tiger.config["REDIS_PREFIX"]) + ":task:*:executions"
)
match = redis_glob_escape(tiger.config["REDIS_PREFIX"]) + ":task:*:executions"

for key in tiger.connection.scan_iter(count=100, match=match):
migrate_task(keys=[key, key + "_count"])
12 changes: 3 additions & 9 deletions tasktiger/redis_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,16 +299,12 @@ def __init__(self, redis: Redis) -> None:
self.redis = redis

self._zadd_noupdate = redis.register_script(ZADD_NOUPDATE)
self._zadd_update_existing = redis.register_script(
ZADD_UPDATE_EXISTING
)
self._zadd_update_existing = redis.register_script(ZADD_UPDATE_EXISTING)
self._zadd_update_min = redis.register_script(ZADD_UPDATE_MIN)
self._zadd_update_max = redis.register_script(ZADD_UPDATE_MAX)

self._zpoppush = redis.register_script(ZPOPPUSH)
self._zpoppush_update_sets = redis.register_script(
ZPOPPUSH_UPDATE_SETS
)
self._zpoppush_update_sets = redis.register_script(ZPOPPUSH_UPDATE_SETS)
self._zpoppush_withscores = redis.register_script(ZPOPPUSH_WITHSCORES)
self._zpoppush_exists_min_update_sets = redis.register_script(
ZPOPPUSH_EXISTS_MIN_UPDATE_SETS
Expand All @@ -319,9 +315,7 @@ def __init__(self, redis: Redis) -> None:

self._srem_if_not_exists = redis.register_script(SREM_IF_NOT_EXISTS)

self._delete_if_not_in_zsets = redis.register_script(
DELETE_IF_NOT_IN_ZSETS
)
self._delete_if_not_in_zsets = redis.register_script(DELETE_IF_NOT_IN_ZSETS)

self._fail_if_not_in_zset = redis.register_script(FAIL_IF_NOT_IN_ZSET)

Expand Down
4 changes: 1 addition & 3 deletions tasktiger/redis_semaphore.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ def set_system_lock(cls, redis: Redis, name: str, timeout: int) -> None:

pipeline = redis.pipeline()
pipeline.zadd(name, {SYSTEM_LOCK_ID: time.time() + timeout})
pipeline.expire(
name, timeout + 10
) # timeout plus buffer for troubleshooting
pipeline.expire(name, timeout + 10) # timeout plus buffer for troubleshooting
pipeline.execute()

def release(self) -> None:
Expand Down
12 changes: 3 additions & 9 deletions tasktiger/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ def fixed(delay: float, max_retries: int) -> RetryStrategy:
return (_fixed, (delay, max_retries))


def _linear(
retry: int, delay: float, increment: float, max_retries: int
) -> float:
def _linear(retry: int, delay: float, increment: float, max_retries: int) -> float:
if retry > max_retries:
raise StopRetry()
return delay + increment * (retry - 1)
Expand All @@ -25,15 +23,11 @@ def linear(delay: float, increment: float, max_retries: int) -> RetryStrategy:
return (_linear, (delay, increment, max_retries))


def _exponential(
retry: int, delay: float, factor: float, max_retries: int
) -> float:
def _exponential(retry: int, delay: float, factor: float, max_retries: int) -> float:
if retry > max_retries:
raise StopRetry()
return delay * factor ** (retry - 1)


def exponential(
delay: float, factor: float, max_retries: int
) -> RetryStrategy:
def exponential(delay: float, factor: float, max_retries: int) -> RetryStrategy:
return (_exponential, (delay, factor, max_retries))
4 changes: 1 addition & 3 deletions tasktiger/rollbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ def format_field(field: str, value: Any) -> str:

return "%s: %s" % (
self.prefix,
" ".join(
format_field(key, data[key]) for key in KEYS if key in data
),
" ".join(format_field(key, data[key]) for key in KEYS if key in data),
)

def emit(self, record: Any) -> Any:
Expand Down
16 changes: 4 additions & 12 deletions tasktiger/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ def run_single_task(self, task: "Task", hard_timeout: float) -> None:
"""
raise NotImplementedError("Single tasks are not supported.")

def run_batch_tasks(
self, tasks: List["Task"], hard_timeout: float
) -> None:
def run_batch_tasks(self, tasks: List["Task"], hard_timeout: float) -> None:
"""
Run the given tasks using the hard timeout in seconds.

Expand All @@ -44,9 +42,7 @@ def run_eager_task(self, task: "Task") -> None:
"""
raise NotImplementedError("Eager tasks are not supported.")

def on_permanent_error(
self, task: "Task", execution: Dict[str, Any]
) -> None:
def on_permanent_error(self, task: "Task", execution: Dict[str, Any] | None) -> None:
"""
Called if the task fails permanently.

Expand All @@ -66,9 +62,7 @@ def run_single_task(self, task: "Task", hard_timeout: float) -> None:
with UnixSignalDeathPenalty(hard_timeout):
task.func(*task.args, **task.kwargs)

def run_batch_tasks(
self, tasks: List["Task"], hard_timeout: float
) -> None:
def run_batch_tasks(self, tasks: List["Task"], hard_timeout: float) -> None:
params = [{"args": task.args, "kwargs": task.kwargs} for task in tasks]
func = tasks[0].func
with UnixSignalDeathPenalty(hard_timeout):
Expand All @@ -84,9 +78,7 @@ def run_eager_task(self, task: "Task") -> None:
return func(*task.args, **task.kwargs)


def get_runner_class(
log: BoundLogger, tasks: List["Task"]
) -> Type[BaseRunner]:
def get_runner_class(log: BoundLogger, tasks: List["Task"]) -> Type[BaseRunner]:
runner_class_paths = {task.serialized_runner_class for task in tasks}
if len(runner_class_paths) > 1:
log.error(
Expand Down
4 changes: 1 addition & 3 deletions tasktiger/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ def periodic(

For more details, see README.
"""
period = (
seconds + minutes * 60 + hours * 3600 + days * 86400 + weeks * 604800
)
period = seconds + minutes * 60 + hours * 3600 + days * 86400 + weeks * 604800
assert period > 0, "Must specify a positive period."
if not start_date:
# Saturday at midnight
Expand Down
34 changes: 12 additions & 22 deletions tasktiger/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,7 @@ def __init__(
max_queue_size = getattr(func, "_task_max_queue_size", None)

if max_stored_executions is None:
max_stored_executions = getattr(
func, "_task_max_stored_executions", None
)
max_stored_executions = getattr(func, "_task_max_stored_executions", None)

if runner_class is None:
runner_class = getattr(func, "_task_runner_class", None)
Expand Down Expand Up @@ -158,9 +156,7 @@ def __init__(

task["retry_method"] = serialize_retry_method(retry_method)
if retry_on:
task["retry_on"] = [
serialize_func_name(cls) for cls in retry_on
]
task["retry_on"] = [serialize_func_name(cls) for cls in retry_on]
if max_queue_size:
task["max_queue_size"] = max_queue_size
if max_stored_executions is not None:
Expand Down Expand Up @@ -521,9 +517,7 @@ def tasks_from_queue(
tasks = []

if items:
tss = [
datetime.datetime.utcfromtimestamp(item[1]) for item in items
]
tss = [datetime.datetime.utcfromtimestamp(item[1]) for item in items]
if load_executions:
pipeline = tiger.connection.pipeline()
pipeline.mget([tiger._key("task", item[0]) for item in items])
Expand All @@ -538,14 +532,13 @@ def tasks_from_queue(
for idx, serialized_data, serialized_executions, ts in zip(
range(len(items)), results[0], results[1:], tss
):
if serialized_data is None and include_not_found:
data = {"id": items[idx][0]}
if serialized_data is None:
if include_not_found:
data = {"id": items[idx][0]}
else:
data = json.loads(serialized_data)

executions = [
json.loads(e) for e in serialized_executions if e
]
executions = [json.loads(e) for e in serialized_executions if e]

task = Task(
tiger,
Expand All @@ -561,17 +554,14 @@ def tasks_from_queue(
result = tiger.connection.mget(
[tiger._key("task", item[0]) for item in items]
)
for idx, serialized_data, ts in zip(
range(len(items)), result, tss
):
if serialized_data is None and include_not_found:
data = {"id": items[idx][0]}
for idx, serialized_data, ts in zip(range(len(items)), result, tss):
if serialized_data is None:
if include_not_found:
data = {"id": items[idx][0]}
else:
data = json.loads(serialized_data)

task = Task(
tiger, queue=queue, _data=data, _state=state, _ts=ts
)
task = Task(tiger, queue=queue, _data=data, _state=state, _ts=ts)
tasks.append(task)

return n, tasks
Expand Down
Loading