Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/sysd_example.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass

from sysdi import TimedUnit, UnitManager
from sysdi import ServiceUnit, TimedUnit, UnitManager
from sysdi.contrib import cronitor


Expand Down Expand Up @@ -53,9 +53,30 @@ class Starship(TimedUnit):
),
)


# Service-only (no timer) unit for chaining
@dataclass
class SvcStarship(ServiceUnit):
exec_bin: str = '/bin/starship'


# Chain: A runs on a schedule; on success triggers B; on success triggers C
um_chain = UnitManager(unit_prefix='utm-chain-')
alpha = Starship(
'Diagnostics Head',
'diagnostics run',
start_delay='30s',
run_every='15m',
)
beta = SvcStarship('Diagnostics Beta', 'beta stage')
gamma = SvcStarship('Diagnostics Gamma', 'gamma stage')
um_chain.chain('Diagnostics Chain', alpha, beta, gamma)


# Call this in a cli command (or something) to:
# - Write units to disk
# - Reload systemd daemon
# - Enable timer units
# - Enable login linger: which indicates timers should run even when the user is logged out
# um.sync(linger='enable')
# um_chain.sync(linger=None)
1 change: 1 addition & 0 deletions src/sysdi/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .core import ExecWrap as ExecWrap
from .core import ServiceUnit as ServiceUnit
from .core import TimedUnit as TimedUnit
from .core import UnitManager as UnitManager
from .core import WebPing as WebPing
229 changes: 223 additions & 6 deletions src/sysdi/core.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
import logging
import os
Expand Down Expand Up @@ -84,9 +85,10 @@ def sync(self, *, linger: str | None, install_dpath: str | os.PathLike | None =
linger_disable()

def unit_names(self):
return [u.unit_name('service') for u in self.units] + [
u.unit_name('timer') for u in self.units
]
names: list[str] = []
for u in self.units:
names.extend(u.managed_unit_names())
return names

def stale(self):
managed_names = set(self.unit_names())
Expand All @@ -111,12 +113,53 @@ def remove_all(self):
"""Remove all unit files, services, and timers that match the prefix."""
self.remove_stale()
for unit in self.units:
# Timer first to avoid systemd warning about timer being able to start service.
self.remove_unit(unit.unit_name('timer'))
self.remove_unit(unit.unit_name('service'))
for name in unit.managed_unit_names():
self.remove_unit(name)

daemon_reload()

def chain(self, chain_name: str, *units):
if not units:
raise ValueError('chain must include at least one unit')
if len({id(u) for u in units}) != len(units):
raise ValueError('chain units must be unique instances')

self.register(*units)

def _append_success(unit: object, next_unit: object):
try:
curr = unit.on_success # type: ignore[attr-defined]
except AttributeError:
curr = None
items: list[object]
if curr is None:
items = []
elif isinstance(curr, str | bytes):
items = [curr]
else:
items = list(curr) # type: ignore[arg-type]
items.append(next_unit)
unit.on_success = items # type: ignore[attr-defined]

for i in range(len(units) - 1):
_append_success(units[i], units[i + 1])

# Create a target that wants the first trigger (timer or service)
first = units[0]
wants: list[str] = []
try:
wants.append(first.unit_name('timer')) # type: ignore[arg-type]
except AssertionError:
wants.append(first.unit_name('service')) # type: ignore[arg-type]

tgt = TargetUnit(
description=f'Chain: {chain_name}',
unit_basename=slugify(chain_name),
wants=wants,
)
tgt.unit_prefix = self.unit_prefix
self.units.append(tgt)

def remove_stale(self):
"""
Remove any unit files, services, or timers that match the prefix but aren't being
Expand Down Expand Up @@ -266,6 +309,10 @@ class TimedUnit:
# Exec Pre/Post support
exec_wrap: ExecWrap | None = None

# Chain/Dependency support (Unit options)
on_success: str | object | Sequence[str | object] | None = None
on_failure: str | object | Sequence[str | object] | None = None

# Other Unit Config
service_extra: list[str] | None = None
timer_extra: list[str] | None = None
Expand Down Expand Up @@ -333,6 +380,39 @@ def option(self, lines, opt_name):

lines.append(f'{opt_name}={value}')

def _normalize_refs(self, refs: str | object | Sequence[str | object] | None) -> list[str]:
if refs is None:
return []
if isinstance(refs, str | bytes):
items: Iterable[str | object] = [refs]
else:
items = refs # type: ignore[assignment]
names: list[str] = []
for r in items:
if isinstance(r, str | bytes):
names.append(r)
else:
try:
rpfx = getattr(r, 'unit_prefix', None)
spfx = getattr(self, 'unit_prefix', None)
if rpfx is None and spfx is not None and hasattr(r, 'unit_basename'):
names.append(f'{spfx}{r.unit_basename}.service') # type: ignore[attr-defined]
else:
names.append(r.unit_name('service')) # type: ignore[attr-defined]
except Exception as e: # pragma: no cover - defensive
raise TypeError('Invalid unit reference for OnSuccess/OnFailure') from e
return names

def _unit_dependency_lines(self) -> list[str]:
lines: list[str] = []
succ = self._normalize_refs(self.on_success)
fail = self._normalize_refs(self.on_failure)
if succ:
lines.append('OnSuccess=' + ' '.join(succ))
if fail:
lines.append('OnFailure=' + ' '.join(fail))
return lines

def timer(self):
lines = []
lines.extend(
Expand Down Expand Up @@ -374,6 +454,9 @@ def service(self):
f'Description={self.description}',
),
)
# Add chain dependencies if configured
lines.extend(self._unit_dependency_lines())

if self.retry_max_tries and self.retry_interval_seconds:
# limit interval must be set to more than (tries * interval) to contain the burst
limit_interval = (self.retry_max_tries * self.retry_interval_seconds) + 15
Expand Down Expand Up @@ -433,3 +516,137 @@ def install(self, install_dpath):
def unit_name(self, type_):
assert type_ in ('service', 'timer')
return f'{self.unit_prefix}{self.unit_basename}.{type_}'

def managed_unit_names(self) -> list[str]:
return [self.unit_name('timer'), self.unit_name('service')]


@dataclass
class ServiceUnit:
description: str
exec_args: str = ''
exec_bin: str = ''

service_type: str = 'oneshot'

retry_interval_seconds: int | None = None
retry_max_tries: int | None = None

exec_wrap: ExecWrap | None = None

on_success: str | object | Sequence[str | object] | None = None
on_failure: str | object | Sequence[str | object] | None = None

service_extra: list[str] | None = None

unit_basename: str | None = None
unit_prefix: str | None = None

def __post_init__(self):
if not self.exec_bin:
raise ValueError('exec_bin must be set')
self.unit_basename = self.unit_basename or slugify(self.description)

@property
def exec_start(self):
return f'{self.exec_bin} {self.exec_args}'.strip()

def _normalize_refs(self, refs: str | object | Sequence[str | object] | None) -> list[str]:
if refs is None:
return []
if isinstance(refs, str | bytes):
items: Iterable[str | object] = [refs]
else:
items = refs # type: ignore[assignment]
names: list[str] = []
for r in items:
if isinstance(r, str | bytes):
names.append(r)
else:
try:
rpfx = getattr(r, 'unit_prefix', None)
spfx = getattr(self, 'unit_prefix', None)
if rpfx is None and spfx is not None and hasattr(r, 'unit_basename'):
names.append(f'{spfx}{r.unit_basename}.service') # type: ignore[attr-defined]
else:
names.append(r.unit_name('service')) # type: ignore[attr-defined]
except Exception as e: # pragma: no cover
raise TypeError('Invalid unit reference for OnSuccess/OnFailure') from e
return names

def _unit_dependency_lines(self) -> list[str]:
lines: list[str] = []
succ = self._normalize_refs(self.on_success)
fail = self._normalize_refs(self.on_failure)
if succ:
lines.append('OnSuccess=' + ' '.join(succ))
if fail:
lines.append('OnFailure=' + ' '.join(fail))
return lines

def service(self):
lines: list[str] = []
lines.extend(('[Unit]', f'Description={self.description}'))
lines.extend(self._unit_dependency_lines())

if self.retry_max_tries and self.retry_interval_seconds:
limit_interval = (self.retry_max_tries * self.retry_interval_seconds) + 15
lines.extend(
(
f'StartLimitInterval={limit_interval}',
f'StartLimitBurst={self.retry_max_tries}',
),
)

lines.extend(('', '[Service]', f'Type={self.service_type}'))
if self.retry_interval_seconds:
lines.extend(('Restart=on-failure', f'RestartSec={self.retry_interval_seconds}'))

lines.append(f'ExecStart={self.exec_start}')

if self.exec_wrap:
lines.append(f'ExecStartPre={self.exec_wrap.pre()}')
lines.append(f'ExecStopPost={self.exec_wrap.post()}')

lines.extend(self.service_extra or ())
return '\n'.join(lines) + '\n'

def install(self, install_dpath: Path):
install_dpath.mkdir(parents=True, exist_ok=True)
service_fname = self.unit_name('service')
service_fpath = install_dpath.joinpath(service_fname)
service_fpath.write_text(self.service())
log.info(f'(Re)installed {service_fname}')
daemon_reload()

def unit_name(self, type_: str):
assert type_ == 'service'
return f'{self.unit_prefix}{self.unit_basename}.{type_}'

def managed_unit_names(self) -> list[str]:
return [self.unit_name('service')]


@dataclass
class TargetUnit:
description: str
unit_basename: str
wants: list[str] | None = None
unit_prefix: str | None = None

def install(self, install_dpath: Path):
install_dpath.mkdir(parents=True, exist_ok=True)
fname = self.unit_name()
fpath = install_dpath.joinpath(fname)
lines = ['[Unit]', f'Description={self.description}']
if self.wants:
lines.append('Wants=' + ' '.join(self.wants))
fpath.write_text('\n'.join(lines) + '\n')
log.info(f'(Re)installed {fname}')
daemon_reload()

def unit_name(self) -> str:
return f'{self.unit_prefix}{self.unit_basename}.target'

def managed_unit_names(self) -> list[str]:
return [self.unit_name()]
2 changes: 2 additions & 0 deletions src/sysdi_tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def test_ok(self, um: UnitManager, tmp_path: Path):
'Check exec wrap',
exec_bin='/usr/bin/true',
exec_wrap=FileWrap(tmp_path),
on_active_sec='1000s',
),
)
um.sync(linger=None)
Expand All @@ -59,6 +60,7 @@ def test_fail(self, um: UnitManager, tmp_path: Path):
'Check exec wrap',
exec_bin='/usr/bin/false',
exec_wrap=FileWrap(tmp_path),
on_active_sec='1000s',
),
)
um.sync(linger=None)
Expand Down
Loading