From 1b987d440e0e0b5409e225ae280c9ee5cf746bf4 Mon Sep 17 00:00:00 2001
From: Gabriel Cocenza
Date: Thu, 19 Feb 2026 14:49:23 -0300
Subject: [PATCH 1/2] feat: replace grafana-agent with otel

- update libs
- remove script to update libs and use a declarative way using charmcraft.yaml
- update tests
---
 DEVELOPMENT.md                                |  10 +-
 charmcraft.yaml                               |   5 +
 lib/charms/grafana_agent/v0/cos_agent.py      | 715 +++++++++++++++++-
 lib/charms/operator_libs_linux/v2/snap.py     | 674 ++++++++++++-----
 scripts/update-charm-libs.sh                  |  10 -
 .../functional/tests/bundles/jammy-yoga.yaml  |   8 +-
 .../tests/charm_tests/openstack_exporter.py   |  20 +-
 tests/functional/tests/tests.yaml             |   2 +-
 tests/unit/test_charm.py                      |  17 +-
 9 files changed, 1177 insertions(+), 284 deletions(-)
 delete mode 100755 scripts/update-charm-libs.sh

diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index 2bf7c55..bd6f9ba 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -25,22 +25,18 @@ tox  # run 'lint' and 'unit' environments
 
 ## Fetching libraries
 
-Charm libraries are managed with charmcraft and recorded in `./scripts/update-charm-libs.sh`.
-
 To update the libraries included, run
 
 ```shell
-./scripts/update-charm-libs.sh
+charmcraft fetch-libs
 ```
 
-If you need to include more charm libraries, you can run:
+If you need to include more charm libraries, you can add the new lib to the `charm-libs` section in charmcraft.yaml and run:
 
 ```shell
-charmcraft fetch-lib <charm-library>
+charmcraft fetch-libs
 ```
 
-And add the corresponding command to `./scripts/update-charm-libs.sh`.
-
 ## Checking for dashboard and alert rule updates
 
 The openstack exporter dashboards and alert rules are managed
diff --git a/charmcraft.yaml b/charmcraft.yaml
index 94afdf6..c0c3d79 100644
--- a/charmcraft.yaml
+++ b/charmcraft.yaml
@@ -72,6 +72,11 @@ config:
         If the snap file has been attached via the openstack-exporter resource,
         this option has no effect.
+charm-libs: + - lib: grafana-agent.cos_agent + version: "0" + - lib: operator-libs-linux.snap + version: "2" links: documentation: https://discourse.charmhub.io/t/openstack-exporter-docs-index/13876 diff --git a/lib/charms/grafana_agent/v0/cos_agent.py b/lib/charms/grafana_agent/v0/cos_agent.py index 870ba62..228550a 100644 --- a/lib/charms/grafana_agent/v0/cos_agent.py +++ b/lib/charms/grafana_agent/v0/cos_agent.py @@ -8,6 +8,8 @@ - `COSAgentProvider`: Use in machine charms that need to have a workload's metrics or logs scraped, or forward rule files or dashboards to Prometheus, Loki or Grafana through the Grafana Agent machine charm. + NOTE: Be sure to add `limit: 1` in your charm for the cos-agent relation. That is the only + way we currently have to prevent two different grafana agent apps deployed on the same VM. - `COSAgentConsumer`: Used in the Grafana Agent machine charm to manage the requirer side of the `cos_agent` interface. @@ -22,7 +24,6 @@ Using the `COSAgentProvider` object only requires instantiating it, typically in the `__init__` method of your charm (the one which sends telemetry). -The constructor of `COSAgentProvider` has only one required and nine optional parameters: ```python def __init__( @@ -36,6 +37,7 @@ def __init__( log_slots: Optional[List[str]] = None, dashboard_dirs: Optional[List[str]] = None, refresh_events: Optional[List] = None, + tracing_protocols: Optional[List[str]] = None, scrape_configs: Optional[Union[List[Dict], Callable]] = None, ): ``` @@ -65,6 +67,8 @@ def __init__( - `refresh_events`: List of events on which to refresh relation data. +- `tracing_protocols`: List of requested tracing protocols that the charm requires to send traces. + - `scrape_configs`: List of standard scrape_configs dicts or a callable that returns the list in case the configs need to be generated dynamically. The contents of this list will be merged with the configs from `metrics_endpoints`. 
@@ -108,6 +112,7 @@ def __init__(self, *args): log_slots=["my-app:slot"], dashboard_dirs=["./src/dashboards_1", "./src/dashboards_2"], refresh_events=["update-status", "upgrade-charm"], + tracing_protocols=["otlp_http", "otlp_grpc"], scrape_configs=[ { "job_name": "custom_job", @@ -206,19 +211,34 @@ def __init__(self, *args): ``` """ +import enum import json import logging +import socket from collections import namedtuple from itertools import chain from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable, ClassVar, Dict, List, Optional, Set, Tuple, Union +from typing import ( + TYPE_CHECKING, + Any, + Callable, + ClassVar, + Dict, + List, + Literal, + MutableMapping, + Optional, + Set, + Tuple, + Union, +) import pydantic -from cosl import GrafanaDashboard, JujuTopology -from cosl.rules import AlertRules +from cosl import DashboardPath40UID, JujuTopology, LZMABase64 +from cosl.rules import AlertRules, generic_alert_groups from ops.charm import RelationChangedEvent from ops.framework import EventBase, EventSource, Object, ObjectEvents -from ops.model import Relation +from ops.model import ModelError, Relation from ops.testing import CharmType if TYPE_CHECKING: @@ -234,22 +254,224 @@ class _MetricsEndpointDict(TypedDict): LIBID = "dc15fa84cef84ce58155fb84f6c6213a" LIBAPI = 0 -LIBPATCH = 8 +LIBPATCH = 24 -PYDEPS = ["cosl", "pydantic < 2"] +PYDEPS = ["cosl >= 0.0.50", "pydantic"] DEFAULT_RELATION_NAME = "cos-agent" DEFAULT_PEER_RELATION_NAME = "peers" -DEFAULT_SCRAPE_CONFIG = { - "static_configs": [{"targets": ["localhost:80"]}], - "metrics_path": "/metrics", -} logger = logging.getLogger(__name__) SnapEndpoint = namedtuple("SnapEndpoint", "owner, name") -class CosAgentProviderUnitData(pydantic.BaseModel): +class TransportProtocolType(str, enum.Enum): + """Receiver Type.""" + + http = "http" + grpc = "grpc" + + +receiver_protocol_to_transport_protocol = { + "zipkin": TransportProtocolType.http, + "kafka": TransportProtocolType.http, + "tempo_http": 
TransportProtocolType.http, + "tempo_grpc": TransportProtocolType.grpc, + "otlp_grpc": TransportProtocolType.grpc, + "otlp_http": TransportProtocolType.http, + "jaeger_thrift_http": TransportProtocolType.http, +} + +_tracing_receivers_ports = { + # OTLP receiver: see + # https://github.com/open-telemetry/opentelemetry-collector/tree/v0.96.0/receiver/otlpreceiver + "otlp_http": 4318, + "otlp_grpc": 4317, + # Jaeger receiver: see + # https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/receiver/jaegerreceiver + "jaeger_grpc": 14250, + "jaeger_thrift_http": 14268, + # Zipkin receiver: see + # https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.96.0/receiver/zipkinreceiver + "zipkin": 9411, +} + +ReceiverProtocol = Literal["otlp_grpc", "otlp_http", "zipkin", "jaeger_thrift_http", "jaeger_grpc"] + + +def _dedupe_list(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Deduplicate items in the list via object identity.""" + unique_items = [] + for item in items: + if item not in unique_items: + unique_items.append(item) + return unique_items + + +class TracingError(Exception): + """Base class for custom errors raised by tracing.""" + + +class NotReadyError(TracingError): + """Raised by the provider wrapper if a requirer hasn't published the required data (yet).""" + + +class ProtocolNotFoundError(TracingError): + """Raised if the user doesn't receive an endpoint for a protocol it requested.""" + + +class ProtocolNotRequestedError(ProtocolNotFoundError): + """Raised if the user attempts to obtain an endpoint for a protocol it did not request.""" + + +class DataValidationError(TracingError): + """Raised when data validation fails on IPU relation data.""" + + +class AmbiguousRelationUsageError(TracingError): + """Raised when one wrongly assumes that there can only be one relation on an endpoint.""" + + +# TODO we want to eventually use `DatabagModel` from cosl but it likely needs a move to common package first +if 
int(pydantic.version.VERSION.split(".")[0]) < 2: # type: ignore + + class DatabagModel(pydantic.BaseModel): # type: ignore + """Base databag model.""" + + class Config: + """Pydantic config.""" + + # ignore any extra fields in the databag + extra = "ignore" + """Ignore any extra fields in the databag.""" + allow_population_by_field_name = True + """Allow instantiating this class by field name (instead of forcing alias).""" + + _NEST_UNDER = None + + @classmethod + def load(cls, databag: MutableMapping): + """Load this model from a Juju databag.""" + if cls._NEST_UNDER: + return cls.parse_obj(json.loads(databag[cls._NEST_UNDER])) + + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {f.alias for f in cls.__fields__.values()} + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.parse_raw(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + logger.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. 
+ """ + if clear and databag: + databag.clear() + + if databag is None: + databag = {} + + if self._NEST_UNDER: + databag[self._NEST_UNDER] = self.json(by_alias=True) + return databag + + dct = self.dict() + for key, field in self.__fields__.items(): # type: ignore + value = dct[key] + databag[field.alias or key] = json.dumps(value) + + return databag + +else: + from pydantic import ConfigDict + + class DatabagModel(pydantic.BaseModel): + """Base databag model.""" + + model_config = ConfigDict( + # ignore any extra fields in the databag + extra="ignore", + # Allow instantiating this class by field name (instead of forcing alias). + populate_by_name=True, + # Custom config key: whether to nest the whole datastructure (as json) + # under a field or spread it out at the toplevel. + _NEST_UNDER=None, # type: ignore + arbitrary_types_allowed=True, + ) + """Pydantic config.""" + + @classmethod + def load(cls, databag: MutableMapping): + """Load this model from a Juju databag.""" + nest_under = cls.model_config.get("_NEST_UNDER") # type: ignore + if nest_under: + return cls.model_validate(json.loads(databag[nest_under])) # type: ignore + + try: + data = { + k: json.loads(v) + for k, v in databag.items() + # Don't attempt to parse model-external values + if k in {(f.alias or n) for n, f in cls.__fields__.items()} + } + except json.JSONDecodeError as e: + msg = f"invalid databag contents: expecting json. {databag}" + logger.error(msg) + raise DataValidationError(msg) from e + + try: + return cls.model_validate_json(json.dumps(data)) # type: ignore + except pydantic.ValidationError as e: + msg = f"failed to validate databag: {databag}" + logger.debug(msg, exc_info=True) + raise DataValidationError(msg) from e + + def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): + """Write the contents of this model to Juju databag. + + :param databag: the databag to write the data to. + :param clear: ensure the databag is cleared before writing it. 
+ """ + if clear and databag: + databag.clear() + + if databag is None: + databag = {} + nest_under = self.model_config.get("_NEST_UNDER") + if nest_under: + databag[nest_under] = self.model_dump_json( # type: ignore + by_alias=True, + # skip keys whose values are default + exclude_defaults=True, + ) + return databag + + dct = self.model_dump() # type: ignore + for key, field in self.model_fields.items(): # type: ignore + value = dct[key] + if value == field.default: + continue + databag[field.alias or key] = json.dumps(value) + + return databag + + +class CosAgentProviderUnitData(DatabagModel): # type: ignore """Unit databag model for `cos-agent` relation.""" # The following entries are the same for all units of the same principal. @@ -257,7 +479,7 @@ class CosAgentProviderUnitData(pydantic.BaseModel): # this needs to make its way to the gagent leader metrics_alert_rules: dict log_alert_rules: dict - dashboards: List[GrafanaDashboard] + dashboards: List[str] # subordinate is no longer used but we should keep it until we bump the library to ensure # we don't break compatibility. subordinate: Optional[bool] = None @@ -267,13 +489,16 @@ class CosAgentProviderUnitData(pydantic.BaseModel): metrics_scrape_jobs: List[Dict] log_slots: List[str] + # Requested tracing protocols. + tracing_protocols: Optional[List[str]] = None + # when this whole datastructure is dumped into a databag, it will be nested under this key. # while not strictly necessary (we could have it 'flattened out' into the databag), # this simplifies working with the model. KEY: ClassVar[str] = "config" -class CosAgentPeersUnitData(pydantic.BaseModel): +class CosAgentPeersUnitData(DatabagModel): # type: ignore """Unit databag model for `peers` cos-agent machine charm peer relation.""" # We need the principal unit name and relation metadata to be able to render identifiers @@ -287,7 +512,7 @@ class CosAgentPeersUnitData(pydantic.BaseModel): # of the outgoing o11y relations. 
metrics_alert_rules: Optional[dict] log_alert_rules: Optional[dict] - dashboards: Optional[List[GrafanaDashboard]] + dashboards: Optional[List[str]] # when this whole datastructure is dumped into a databag, it will be nested under this key. # while not strictly necessary (we could have it 'flattened out' into the databag), @@ -304,6 +529,83 @@ def app_name(self) -> str: return self.unit_name.split("/")[0] +if int(pydantic.version.VERSION.split(".")[0]) < 2: # type: ignore + + class ProtocolType(pydantic.BaseModel): # type: ignore + """Protocol Type.""" + + class Config: + """Pydantic config.""" + + use_enum_values = True + """Allow serializing enum values.""" + + name: str = pydantic.Field( + ..., + description="Receiver protocol name. What protocols are supported (and what they are called) " + "may differ per provider.", + examples=["otlp_grpc", "otlp_http", "tempo_http"], + ) + + type: TransportProtocolType = pydantic.Field( + ..., + description="The transport protocol used by this receiver.", + examples=["http", "grpc"], + ) + +else: + + class ProtocolType(pydantic.BaseModel): + """Protocol Type.""" + + model_config = pydantic.ConfigDict( + # Allow serializing enum values. + use_enum_values=True + ) + """Pydantic config.""" + + name: str = pydantic.Field( + ..., + description="Receiver protocol name. What protocols are supported (and what they are called) " + "may differ per provider.", + examples=["otlp_grpc", "otlp_http", "tempo_http"], + ) + + type: TransportProtocolType = pydantic.Field( + ..., + description="The transport protocol used by this receiver.", + examples=["http", "grpc"], + ) + + +class Receiver(pydantic.BaseModel): + """Specification of an active receiver.""" + + protocol: ProtocolType = pydantic.Field(..., description="Receiver protocol name and type.") + url: Optional[str] = pydantic.Field( + ..., + description="""URL at which the receiver is reachable. If there's an ingress, it would be the external URL. 
+ Otherwise, it would be the service's fqdn or internal IP. + If the protocol type is grpc, the url will not contain a scheme.""", + examples=[ + "http://traefik_address:2331", + "https://traefik_address:2331", + "http://tempo_public_ip:2331", + "https://tempo_public_ip:2331", + "tempo_public_ip:2331", + ], + ) + + +class CosAgentRequirerUnitData(DatabagModel): # type: ignore + """Application databag model for the COS-agent requirer.""" + + receivers: List[Receiver] = pydantic.Field( + ..., + description="List of all receivers enabled on the tracing provider.", + ) + + class COSAgentProvider(Object): """Integration endpoint wrapper for the provider side of the cos_agent interface.""" @@ -318,8 +620,10 @@ def __init__( log_slots: Optional[List[str]] = None, dashboard_dirs: Optional[List[str]] = None, refresh_events: Optional[List] = None, + tracing_protocols: Optional[List[str]] = None, *, - scrape_configs: Optional[Union[List[dict], Callable]] = None, + scrape_configs: Optional[Union[List[dict], Callable[[], List[Dict[str, Any]]]]] = None, + extra_alert_groups: Optional[Callable[[], Dict[str, Any]]] = None, ): """Create a COSAgentProvider instance. @@ -336,9 +640,13 @@ def __init__( in the form ["snap-name:slot", ...]. dashboard_dirs: Directory where the dashboards are stored. refresh_events: List of events on which to refresh relation data. + tracing_protocols: List of protocols that the charm will be using for sending traces. scrape_configs: List of standard scrape_configs dicts or a callable that returns the list in case the configs need to be generated dynamically. The contents of this list will be merged with the contents of `metrics_endpoints`. + extra_alert_groups: A callable that returns a dict of alert rule groups in case the + alerts need to be generated dynamically. The contents of this dict will be merged + with generic and bundled alert rules. 
""" super().__init__(charm, relation_name) dashboard_dirs = dashboard_dirs or ["./src/grafana_dashboards"] @@ -347,12 +655,15 @@ def __init__( self._relation_name = relation_name self._metrics_endpoints = metrics_endpoints or [] self._scrape_configs = scrape_configs or [] + self._extra_alert_groups = extra_alert_groups or {} self._metrics_rules = metrics_rules_dir self._logs_rules = logs_rules_dir self._recursive = recurse_rules_dirs self._log_slots = log_slots or [] self._dashboard_dirs = dashboard_dirs self._refresh_events = refresh_events or [self._charm.on.config_changed] + self._tracing_protocols = tracing_protocols + self._is_single_endpoint = charm.meta.relations[relation_name].limit == 1 events = self._charm.on[relation_name] self.framework.observe(events.relation_joined, self._on_refresh) @@ -377,6 +688,7 @@ def _on_refresh(self, event): dashboards=self._dashboards, metrics_scrape_jobs=self._scrape_jobs, log_slots=self._log_slots, + tracing_protocols=self._tracing_protocols, ) relation.data[self._charm.unit][data.KEY] = data.json() except ( @@ -387,10 +699,11 @@ def _on_refresh(self, event): @property def _scrape_jobs(self) -> List[Dict]: - """Return a prometheus_scrape-like data structure for jobs. + """Return a list of scrape_configs. 
https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config """ + # Optionally allow the charm to set the scrape_configs if callable(self._scrape_configs): scrape_configs = self._scrape_configs() else: @@ -398,32 +711,45 @@ def _scrape_jobs(self) -> List[Dict]: scrape_configs = self._scrape_configs.copy() # Convert "metrics_endpoints" to standard scrape_configs, and add them in + unit_name = self._charm.unit.name.replace("/", "_") for endpoint in self._metrics_endpoints: + port = endpoint["port"] + path = endpoint["path"] + sanitized_path = path.strip("/").replace("/", "_") scrape_configs.append( { - "metrics_path": endpoint["path"], - "static_configs": [{"targets": [f"localhost:{endpoint['port']}"]}], + "job_name": f"{unit_name}_localhost_{port}_{sanitized_path}", + "metrics_path": path, + "static_configs": [{"targets": [f"localhost:{port}"]}], } ) - scrape_configs = scrape_configs or [DEFAULT_SCRAPE_CONFIG] - - # Augment job name to include the app name and a unique id (index) - for idx, scrape_config in enumerate(scrape_configs): - scrape_config["job_name"] = "_".join( - [self._charm.app.name, str(idx), scrape_config.get("job_name", "default")] - ) + scrape_configs = scrape_configs or [] return scrape_configs @property def _metrics_alert_rules(self) -> Dict: - """Use (for now) the prometheus_scrape AlertRules to initialize this.""" + """Return a dict of alert rule groups.""" + # Optionally allow the charm to add the metrics_alert_rules + if callable(self._extra_alert_groups): + rules = self._extra_alert_groups() + else: + rules = {"groups": []} + alert_rules = AlertRules( query_type="promql", topology=JujuTopology.from_charm(self._charm) ) alert_rules.add_path(self._metrics_rules, recursive=self._recursive) - return alert_rules.as_dict() + alert_rules.add( + generic_alert_groups.application_rules, + group_name_prefix=JujuTopology.from_charm(self._charm).identifier, + ) + + # NOTE: The charm could supply rules we implement in this 
method, so we deduplicate + rules["groups"] = _dedupe_list(rules["groups"] + alert_rules.as_dict()["groups"]) + + return rules @property def _log_alert_rules(self) -> Dict: @@ -433,14 +759,144 @@ def _log_alert_rules(self) -> Dict: return alert_rules.as_dict() @property - def _dashboards(self) -> List[GrafanaDashboard]: - dashboards: List[GrafanaDashboard] = [] + def _dashboards(self) -> List[str]: + dashboards: List[str] = [] for d in self._dashboard_dirs: for path in Path(d).glob("*"): - dashboard = GrafanaDashboard._serialize(path.read_bytes()) - dashboards.append(dashboard) + with open(path, "rt") as fp: + dashboard = json.load(fp) + rel_path = str( + path.relative_to(self._charm.charm_dir) if path.is_absolute() else path + ) + # COSAgentProvider is somewhat analogous to GrafanaDashboardProvider. We need to overwrite the uid here + # because there is currently no other way to communicate the dashboard path separately. + # https://github.com/canonical/grafana-k8s-operator/pull/363 + dashboard["uid"] = DashboardPath40UID.generate(self._charm.meta.name, rel_path) + + # Add tags + tags: List[str] = dashboard.get("tags", []) + if not any(tag.startswith("charm: ") for tag in tags): + tags.append(f"charm: {self._charm.meta.name}") + dashboard["tags"] = tags + + dashboards.append(LZMABase64.compress(json.dumps(dashboard))) return dashboards + @property + def relations(self) -> List[Relation]: + """The tracing relations associated with this endpoint.""" + return self._charm.model.relations[self._relation_name] + + @property + def _relation(self) -> Optional[Relation]: + """If this wraps a single endpoint, the relation bound to it, if any.""" + if not self._is_single_endpoint: + objname = type(self).__name__ + raise AmbiguousRelationUsageError( + f"This {objname} wraps a {self._relation_name} endpoint that has " + "limit != 1. We can't determine what relation, of the possibly many, you are " + f"referring to. 
Please pass a relation instance while calling {objname}, " + "or set limit=1 in the charm metadata." + ) + relations = self.relations + return relations[0] if relations else None + + def is_ready(self, relation: Optional[Relation] = None): + """Is this endpoint ready?""" + relation = relation or self._relation + if not relation: + logger.debug(f"no relation on {self._relation_name!r}: tracing not ready") + return False + if relation.data is None: + logger.error(f"relation data is None for {relation}") + return False + if not relation.app: + logger.error(f"{relation} event received but there is no relation.app") + return False + try: + unit = next(iter(relation.units), None) + if not unit: + return False + databag = dict(relation.data[unit]) + CosAgentRequirerUnitData.load(databag) + + except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): + logger.info(f"failed validating relation data for {relation}") + return False + return True + + def get_all_endpoints( + self, relation: Optional[Relation] = None + ) -> Optional[CosAgentRequirerUnitData]: + """Unmarshalled relation data.""" + relation = relation or self._relation + if not relation or not self.is_ready(relation): + return None + unit = next(iter(relation.units), None) + if not unit: + return None + return CosAgentRequirerUnitData.load(relation.data[unit]) # type: ignore + + def _get_tracing_endpoint( + self, relation: Optional[Relation], protocol: ReceiverProtocol + ) -> str: + """Return a tracing endpoint URL if it is available or raise a ProtocolNotFoundError.""" + unit_data = self.get_all_endpoints(relation) + if not unit_data: + # we didn't find the protocol because the remote end didn't publish any data yet + # it might also mean that grafana-agent doesn't have a relation to the tracing backend + raise ProtocolNotFoundError(protocol) + receivers: List[Receiver] = [i for i in unit_data.receivers if i.protocol.name == protocol] + if not receivers: + # we didn't find the protocol because 
grafana-agent didn't return us the protocol that we requested + # the caller might want to verify that we did indeed request this protocol + raise ProtocolNotFoundError(protocol) + if len(receivers) > 1: + logger.warning( + f"too many receivers with protocol={protocol!r}; using first one. Found: {receivers}" + ) + + receiver = receivers[0] + if not receiver.url: + # grafana-agent isn't connected to the tracing backend yet + raise ProtocolNotFoundError(protocol) + return receiver.url + + def get_tracing_endpoint( + self, protocol: ReceiverProtocol, relation: Optional[Relation] = None + ) -> str: + """Receiver endpoint for the given protocol. + + It could happen that this function gets called before the provider publishes the endpoints. + In such a scenario, if a non-leader unit calls this function, a permission denied exception will be raised due to + restricted access. To prevent this, this function needs to be guarded by the `is_ready` check. + + Raises: + ProtocolNotRequestedError: + If the charm unit is the leader unit and attempts to obtain an endpoint for a protocol it did not request. + ProtocolNotFoundError: + If the charm attempts to obtain an endpoint when grafana-agent isn't related to a tracing backend. 
+ """ + try: + return self._get_tracing_endpoint(relation or self._relation, protocol=protocol) + except ProtocolNotFoundError: + # let's see if we didn't find it because we didn't request the endpoint + requested_protocols = set() + relations = [relation] if relation else self.relations + for relation in relations: + try: + databag = CosAgentProviderUnitData.load(relation.data[self._charm.unit]) + except DataValidationError: + continue + + if databag.tracing_protocols: + requested_protocols.update(databag.tracing_protocols) + + if protocol not in requested_protocols: + raise ProtocolNotRequestedError(protocol, relation) + + raise + class COSAgentDataChanged(EventBase): """Event emitted by `COSAgentRequirer` when relation data changes.""" @@ -481,6 +937,7 @@ def __init__( relation_name: str = DEFAULT_RELATION_NAME, peer_relation_name: str = DEFAULT_PEER_RELATION_NAME, refresh_events: Optional[List[str]] = None, + is_tracing_ready: Optional[Callable] = None, ): """Create a COSAgentRequirer instance. @@ -489,18 +946,22 @@ def __init__( relation_name: The name of the relation to communicate over. peer_relation_name: The name of the peer relation to communicate over. refresh_events: List of events on which to refresh relation data. + is_tracing_ready: Custom function to evaluate whether the trace receiver url should be sent. """ super().__init__(charm, relation_name) self._charm = charm self._relation_name = relation_name self._peer_relation_name = peer_relation_name self._refresh_events = refresh_events or [self._charm.on.config_changed] + self._is_tracing_ready = is_tracing_ready events = self._charm.on[relation_name] self.framework.observe( events.relation_joined, self._on_relation_data_changed ) # TODO: do we need this? 
self.framework.observe(events.relation_changed, self._on_relation_data_changed) + self.framework.observe(events.relation_departed, self._on_relation_departed) + for event in self._refresh_events: self.framework.observe(event, self.trigger_refresh) # pyright: ignore @@ -528,6 +989,26 @@ def _on_peer_relation_changed(self, _): if self._charm.unit.is_leader(): self.on.data_changed.emit() # pyright: ignore + def _on_relation_departed(self, event): + """Remove provider's (principal's) alert rules and dashboards from peer data when the cos-agent relation to the principal is removed.""" + if not self.peer_relation: + event.defer() + return + # empty the departing unit's alert rules and dashboards from peer data + data = CosAgentPeersUnitData( + unit_name=event.unit.name, + relation_id=str(event.relation.id), + relation_name=event.relation.name, + metrics_alert_rules={}, + log_alert_rules={}, + dashboards=[], + ) + self.peer_relation.data[self._charm.unit][ + f"{CosAgentPeersUnitData.KEY}-{event.unit.name}" + ] = data.json() + + self.on.data_changed.emit() # pyright: ignore + def _on_relation_data_changed(self, event: RelationChangedEvent): # Peer data is the only means of communication between subordinate units. if not self.peer_relation: @@ -554,6 +1035,12 @@ def _on_relation_data_changed(self, event: RelationChangedEvent): if not (provider_data := self._validated_provider_data(raw)): return + # write enabled receivers to cos-agent relation + try: + self.update_tracing_receivers() + except ModelError: + raise + # Copy data from the cos_agent relation to the peer relation, so the leader could # follow up. # Save the originating unit name, so it could be used for topology later on by the leader. @@ -574,6 +1061,49 @@ def _on_relation_data_changed(self, event: RelationChangedEvent): # need to emit `on.data_changed`), so we're emitting `on.data_changed` either way. 
self.on.data_changed.emit() # pyright: ignore + def update_tracing_receivers(self): + """Updates the list of exposed tracing receivers in all relations.""" + tracing_ready = ( + self._is_tracing_ready if self._is_tracing_ready else self._charm.tracing.is_ready # type: ignore + ) + try: + for relation in self._charm.model.relations[self._relation_name]: + CosAgentRequirerUnitData( + receivers=[ + Receiver( + # if tracing isn't ready, we don't want the wrong receiver URLs present in the databag. + # however, because of the backwards compatibility requirements, we need to still provide + # the protocols list so that the charm with older cos_agent version doesn't error its hooks. + # before this change was added, the charm with old cos_agent version threw exceptions with + # connections to grafana-agent timing out. After the change, the charm will fail validating + # databag contents (as it expects a string in URL) but that won't cause any errors as + # tracing endpoints are the only content in the grafana-agent's side of the databag. + url=f"{self._get_tracing_receiver_url(protocol)}" + if tracing_ready() + else None, + protocol=ProtocolType( + name=protocol, + type=receiver_protocol_to_transport_protocol[protocol], + ), + ) + for protocol in self.requested_tracing_protocols() + ], + ).dump(relation.data[self._charm.unit]) + + except ModelError as e: + # args are bytes + msg = e.args[0] + if isinstance(msg, bytes): + if msg.startswith( + b"ERROR cannot read relation application settings: permission denied" + ): + logger.error( + f"encountered error {e} while attempting to update_relation_data." + f"The relation must be gone." 
+ ) + return + raise + def _validated_provider_data(self, raw) -> Optional[CosAgentProviderUnitData]: try: return CosAgentProviderUnitData(**json.loads(raw)) @@ -586,6 +1116,54 @@ def trigger_refresh(self, _): # FIXME: Figure out what we should do here self.on.data_changed.emit() # pyright: ignore + def _get_requested_protocols(self, relation: Relation): + # Coherence check + units = relation.units + if len(units) > 1: + # should never happen + raise ValueError( + f"unexpected error: subordinate relation {relation} should have exactly one unit" + ) + + unit = next(iter(units), None) + + if not unit: + return None + + if not (raw := relation.data[unit].get(CosAgentProviderUnitData.KEY)): + return None + + if not (provider_data := self._validated_provider_data(raw)): + return None + + return provider_data.tracing_protocols + + def requested_tracing_protocols(self): + """All receiver protocols that have been requested by our related apps.""" + requested_protocols = set() + for relation in self._charm.model.relations[self._relation_name]: + try: + protocols = self._get_requested_protocols(relation) + except NotReadyError: + continue + if protocols: + requested_protocols.update(protocols) + return requested_protocols + + def _get_tracing_receiver_url(self, protocol: str): + scheme = "http" + try: + if self._charm.cert.enabled: # type: ignore + scheme = "https" + # not only Grafana Agent can implement cos_agent. If the charm doesn't have the `cert` attribute + # using our cert_handler, it won't have the `enabled` parameter. In this case, we pass and assume http. 
+ except AttributeError: + pass + # the assumption is that a subordinate charm will always be accessible to its principal charm under its fqdn + if receiver_protocol_to_transport_protocol[protocol] == TransportProtocolType.grpc: + return f"{socket.getfqdn()}:{_tracing_receivers_ports[protocol]}" + return f"{scheme}://{socket.getfqdn()}:{_tracing_receivers_ports[protocol]}" + @property def _remote_data(self) -> List[Tuple[CosAgentProviderUnitData, JujuTopology]]: """Return a list of remote data from each of the related units. @@ -721,8 +1299,18 @@ def metrics_jobs(self) -> List[Dict]: @property def snap_log_endpoints(self) -> List[SnapEndpoint]: """Fetch logging endpoints exposed by related snaps.""" + endpoints = [] + endpoints_with_topology = self.snap_log_endpoints_with_topology + for endpoint, _ in endpoints_with_topology: + endpoints.append(endpoint) + + return endpoints + + @property + def snap_log_endpoints_with_topology(self) -> List[Tuple[SnapEndpoint, JujuTopology]]: + """Fetch logging endpoints and charm topology for each related snap.""" plugs = [] - for data, _ in self._remote_data: + for data, topology in self._remote_data: targets = data.log_slots if targets: for target in targets: @@ -733,15 +1321,16 @@ def snap_log_endpoints(self) -> List[SnapEndpoint]: "endpoints; this should not happen." ) else: - plugs.append(target) + plugs.append((target, topology)) endpoints = [] - for plug in plugs: + for plug, topology in plugs: if ":" not in plug: logger.error(f"invalid plug definition received: {plug}. 
Ignoring...") else: endpoint = SnapEndpoint(*plug.split(":")) - endpoints.append(endpoint) + endpoints.append((endpoint, topology)) + return endpoints @property @@ -789,7 +1378,7 @@ def dashboards(self) -> List[Dict[str, str]]: seen_apps.append(app_name) for encoded_dashboard in data.dashboards or (): - content = GrafanaDashboard(encoded_dashboard)._deserialize() + content = json.loads(LZMABase64.decompress(encoded_dashboard)) title = content.get("title", "no_title") @@ -804,3 +1393,55 @@ def dashboards(self) -> List[Dict[str, str]]: ) return dashboards + + +def charm_tracing_config( + endpoint_requirer: COSAgentProvider, cert_path: Optional[Union[Path, str]] +) -> Tuple[Optional[str], Optional[str]]: + """Utility function to determine the charm_tracing config you will likely want. + + If no endpoint is provided: + disable charm tracing. + If https endpoint is provided but cert_path is not found on disk: + disable charm tracing. + If https endpoint is provided and cert_path is None: + raise TracingError + Else: + proceed with charm tracing (with or without tls, as appropriate) + + Usage: + >>> from lib.charms.tempo_coordinator_k8s.v0.charm_tracing import trace_charm + >>> from lib.charms.tempo_coordinator_k8s.v0.tracing import charm_tracing_config + >>> @trace_charm(tracing_endpoint="my_endpoint", cert_path="cert_path") + >>> class MyCharm(...): + >>> _cert_path = "/path/to/cert/on/charm/container.crt" + >>> def __init__(self, ...): + >>> self.tracing = TracingEndpointRequirer(...) + >>> self.my_endpoint, self.cert_path = charm_tracing_config( + ... self.tracing, self._cert_path) + """ + if not endpoint_requirer.is_ready(): + return None, None + + try: + endpoint = endpoint_requirer.get_tracing_endpoint("otlp_http") + except ProtocolNotFoundError: + logger.warn( + "Endpoint for tracing wasn't provided as tracing backend isn't ready yet. If grafana-agent isn't connected to a tracing backend, integrate it. Otherwise this issue should resolve itself in a few events." 
+ ) + return None, None + + if not endpoint: + return None, None + + is_https = endpoint.startswith("https://") + + if is_https: + if cert_path is None: + raise TracingError("Cannot send traces to an https endpoint without a certificate.") + if not Path(cert_path).exists(): + # if endpoint is https BUT we don't have a server_cert yet: + # disable charm tracing until we do to prevent tls errors + return None, None + return endpoint, str(cert_path) + return endpoint, None diff --git a/lib/charms/operator_libs_linux/v2/snap.py b/lib/charms/operator_libs_linux/v2/snap.py index 871ff5d..71e44ce 100644 --- a/lib/charms/operator_libs_linux/v2/snap.py +++ b/lib/charms/operator_libs_linux/v2/snap.py @@ -12,7 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Representations of the system's Snaps, and abstractions around managing them. +"""Legacy Charmhub-hosted snap library, deprecated in favour of ``charmlibs.snap``. + +WARNING: This library is deprecated and will no longer receive feature updates or bugfixes. +``charmlibs.snap`` version 1.0 is a bug-for-bug compatible migration of this library. +Add 'charmlibs-snap~=1.0' to your charm's dependencies, and remove this Charmhub-hosted library. +Then replace `from charms.operator_libs_linux.v2 import snap` with `from charmlibs import snap`. +Read more: +- https://documentation.ubuntu.com/charmlibs +- https://pypi.org/project/charmlibs-snap + +--- + +Representations of the system's Snaps, and abstractions around managing them. The `snap` module provides convenience methods for listing, installing, refreshing, and removing Snap packages, in addition to setting and getting configuration options for them. @@ -54,8 +66,14 @@ except snap.SnapError as e: logger.error("An exception occurred when installing snaps. 
Reason: %s" % e.message) ``` + +Dependencies: +Note that this module requires `opentelemetry-api`, which is already included into +your charm's virtual environment via `ops >= 2.21`. """ +from __future__ import annotations + import http.client import json import logging @@ -64,16 +82,36 @@ import socket import subprocess import sys +import time +import typing import urllib.error import urllib.parse import urllib.request -from collections.abc import Mapping from datetime import datetime, timedelta, timezone from enum import Enum from subprocess import CalledProcessError, CompletedProcess -from typing import Any, Dict, Iterable, List, Optional, Union +from typing import ( + Callable, + Iterable, + Literal, + Mapping, + NoReturn, + Sequence, + TypedDict, + TypeVar, +) + +import opentelemetry.trace + +if typing.TYPE_CHECKING: + # avoid typing_extensions import at runtime + from typing_extensions import NotRequired, ParamSpec, Required, Self, TypeAlias, Unpack + + _P = ParamSpec("_P") + _T = TypeVar("_T") logger = logging.getLogger(__name__) +tracer = opentelemetry.trace.get_tracer(__name__) # The unique Charmhub library identifier, never change it LIBID = "05394e5893f94f2d90feb7cbe6b633cd" @@ -83,15 +121,17 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 4 +LIBPATCH = 15 + +PYDEPS = ["opentelemetry-api"] # Regex to locate 7-bit C1 ANSI sequences ansi_filter = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") -def _cache_init(func): - def inner(*args, **kwargs): +def _cache_init(func: Callable[_P, _T]) -> Callable[_P, _T]: + def inner(*args: _P.args, **kwargs: _P.kwargs) -> _T: if _Cache.cache is None: _Cache.cache = SnapCache() return func(*args, **kwargs) @@ -99,8 +139,60 @@ def inner(*args, **kwargs): return inner -# recursive hints seems to error out pytest -JSONType = Union[Dict[str, Any], List[Any], str, int, float] +# this is used for return types, so it (a) uses 
concrete types and (b) does not contain None +# because setting snap config values to null removes the key so a null value can't be returned +_JSONLeaf: TypeAlias = 'str | int | float | bool' +JSONType: TypeAlias = "dict[str, JSONType] | list[JSONType] | _JSONLeaf" +# we also need a jsonable type for arguments, +# which (a) uses abstract types and (b) may contain None +JSONAble: TypeAlias = "Mapping[str, JSONAble] | Sequence[JSONAble] | _JSONLeaf | None" + + +class _AsyncChangeDict(TypedDict, total=True): + """The subset of the json returned by GET changes that we care about internally.""" + + status: str + data: JSONType + + +class _SnapDict(TypedDict, total=True): + """The subset of the json returned by GET snap/find that we care about internally.""" + + name: str + channel: str + revision: str + version: str + confinement: str + apps: NotRequired[list[dict[str, JSONType]] | None] + + +class SnapServiceDict(TypedDict, total=True): + """Dictionary representation returned by SnapService.as_dict.""" + + daemon: str | None + daemon_scope: str | None + enabled: bool + active: bool + activators: list[str] + + +# TypedDicts with hyphenated keys +_SnapServiceKwargsDict = TypedDict("_SnapServiceKwargsDict", {"daemon-scope": str}, total=False) +# the kwargs accepted by SnapService +_SnapServiceAppDict = TypedDict( + # the data we expect a Snap._apps entry to contain for a daemon + "_SnapServiceAppDict", + { + "name": "Required[str]", + "daemon": str, + "daemon_scope": str, + "daemon-scope": str, + "enabled": bool, + "active": bool, + "activators": "list[str]", + }, + total=False, +) class SnapService: @@ -108,20 +200,20 @@ class SnapService: def __init__( self, - daemon: Optional[str] = None, - daemon_scope: Optional[str] = None, + daemon: str | None = None, + daemon_scope: str | None = None, enabled: bool = False, active: bool = False, - activators: List[str] = [], - **kwargs, + activators: list[str] | None = None, + **kwargs: Unpack[_SnapServiceKwargsDict], ): 
self.daemon = daemon - self.daemon_scope = kwargs.get("daemon-scope", None) or daemon_scope + self.daemon_scope = kwargs.get("daemon-scope") or daemon_scope self.enabled = enabled self.active = active - self.activators = activators + self.activators = activators if activators is not None else [] - def as_dict(self) -> Dict: + def as_dict(self) -> SnapServiceDict: """Return instance representation as dict.""" return { "daemon": self.daemon, @@ -136,57 +228,54 @@ class MetaCache(type): """MetaCache class used for initialising the snap cache.""" @property - def cache(cls) -> "SnapCache": + def cache(cls) -> SnapCache: """Property for returning the snap cache.""" return cls._cache @cache.setter - def cache(cls, cache: "SnapCache") -> None: + def cache(cls, cache: SnapCache) -> None: """Setter for the snap cache.""" cls._cache = cache - def __getitem__(cls, name) -> "Snap": + def __getitem__(cls, name: str) -> Snap: """Snap cache getter.""" return cls._cache[name] -class _Cache(object, metaclass=MetaCache): +class _Cache(metaclass=MetaCache): _cache = None class Error(Exception): """Base class of most errors raised by this library.""" - def __repr__(self): + def __init__(self, message: str = "", *args: object): + super().__init__(message, *args) + self.message = message + + def __repr__(self) -> str: """Represent the Error class.""" - return "<{}.{} {}>".format(type(self).__module__, type(self).__name__, self.args) + return f"<{type(self).__module__}.{type(self).__name__} {self.args}>" @property - def name(self): + def name(self) -> str: """Return a string representation of the model plus class.""" - return "<{}.{}>".format(type(self).__module__, type(self).__name__) - - @property - def message(self): - """Return the message passed as an argument.""" - return self.args[0] + return f"<{type(self).__module__}.{type(self).__name__}>" class SnapAPIError(Error): """Raised when an HTTP API error occurs talking to the Snapd server.""" - def __init__(self, body: Dict, code: 
int, status: str, message: str): + def __init__(self, body: Mapping[str, JSONAble], code: int, status: str, message: str): super().__init__(message) # Makes str(e) return message self.body = body self.code = code self.status = status self._message = message - def __repr__(self): + def __repr__(self) -> str: """Represent the SnapAPIError class.""" - return "APIError({!r}, {!r}, {!r}, {!r})".format( - self.body, self.code, self.status, self._message - ) + return f"APIError({self.body!r}, {self.code!r}, {self.status!r}, {self._message!r})" class SnapState(Enum): @@ -201,12 +290,30 @@ class SnapState(Enum): class SnapError(Error): """Raised when there's an error running snap control commands.""" + @classmethod + def _from_called_process_error(cls, msg: str, error: CalledProcessError) -> Self: + lines = [msg] + if error.stdout: + lines.extend(['Stdout:', error.stdout]) + if error.stderr: + lines.extend(['Stderr:', error.stderr]) + try: + cmd = ['journalctl', '--unit', 'snapd', '--lines', '20'] + with tracer.start_as_current_span(cmd[0]) as span: + span.set_attribute("argv", cmd) + logs = subprocess.check_output(cmd, text=True) + except Exception as e: + lines.extend(['Error fetching logs:', str(e)]) + else: + lines.extend(['Latest logs:', logs]) + return cls('\n'.join(lines)) + class SnapNotFoundError(Error): """Raised when a requested snap is not known to the system.""" -class Snap(object): +class Snap: """Represents a snap package and its properties. 
`Snap` exposes the following properties about a snap: @@ -215,53 +322,55 @@ class Snap(object): - channel: "stable", "candidate", "beta", and "edge" are common - revision: a string representing the snap's revision - confinement: "classic", "strict", or "devmode" + - version: a string representing the snap's version, if set by the snap author """ def __init__( self, - name, + name: str, state: SnapState, channel: str, revision: str, confinement: str, - apps: Optional[List[Dict[str, str]]] = None, - cohort: Optional[str] = "", + apps: list[dict[str, JSONType]] | None = None, + cohort: str | None = None, + *, + version: str | None = None, ) -> None: self._name = name self._state = state self._channel = channel self._revision = revision self._confinement = confinement - self._cohort = cohort + self._cohort = cohort or "" self._apps = apps or [] + self._version = version self._snap_client = SnapClient() - def __eq__(self, other) -> bool: + def __eq__(self, other: object) -> bool: """Equality for comparison.""" return isinstance(other, self.__class__) and ( self._name, self._revision, ) == (other._name, other._revision) - def __hash__(self): + def __hash__(self) -> int: """Calculate a hash for this snap.""" return hash((self._name, self._revision)) - def __repr__(self): + def __repr__(self) -> str: """Represent the object such that it can be reconstructed.""" - return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__) + return f"<{self.__module__}.{type(self).__name__}: {self.__dict__}>" - def __str__(self): + def __str__(self) -> str: """Represent the snap object as a string.""" - return "<{}: {}-{}.{} -- {}>".format( - self.__class__.__name__, - self._name, - self._revision, - self._channel, - str(self._state), + return ( + f"<{type(self).__name__}: " + f"{self._name}-{self._revision}.{self._channel}" + f" -- {self._state}>" ) - def _snap(self, command: str, optargs: Optional[Iterable[str]] = None) -> str: + def _snap(self, command: str, 
optargs: Iterable[str] | None = None) -> str: """Perform a snap operation. Args: @@ -275,19 +384,18 @@ def _snap(self, command: str, optargs: Optional[Iterable[str]] = None) -> str: optargs = optargs or [] args = ["snap", command, self._name, *optargs] try: - return subprocess.check_output(args, universal_newlines=True) + with tracer.start_as_current_span(args[0]) as span: + span.set_attribute("argv", args) + return subprocess.check_output(args, text=True, stderr=subprocess.PIPE) except CalledProcessError as e: - raise SnapError( - "Snap: {!r}; command {!r} failed with output = {!r}".format( - self._name, args, e.output - ) - ) + msg = f'Snap: {self._name!r} -- command {args!r} failed!' + raise SnapError._from_called_process_error(msg=msg, error=e) from e def _snap_daemons( self, - command: List[str], - services: Optional[List[str]] = None, - ) -> CompletedProcess: + command: list[str], + services: list[str] | None = None, + ) -> CompletedProcess[str]: """Perform snap app commands. Args: @@ -299,18 +407,29 @@ def _snap_daemons( """ if services: # an attempt to keep the command constrained to the snap instance's services - services = ["{}.{}".format(self._name, service) for service in services] + services = [f"{self._name}.{service}" for service in services] else: services = [self._name] args = ["snap", *command, *services] try: - return subprocess.run(args, universal_newlines=True, check=True, capture_output=True) + with tracer.start_as_current_span(args[0]) as span: + span.set_attribute("argv", args) + return subprocess.run(args, text=True, check=True, capture_output=True) except CalledProcessError as e: - raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr)) - - def get(self, key: Optional[str], *, typed: bool = False) -> Any: + msg = f'Snap: {self._name!r} -- command {args!r} failed!' 
+ raise SnapError._from_called_process_error(msg=msg, error=e) from e + + @typing.overload + def get(self, key: None | Literal[""], *, typed: Literal[False] = False) -> NoReturn: ... + @typing.overload + def get(self, key: str, *, typed: Literal[False] = False) -> str: ... + @typing.overload + def get(self, key: None | Literal[""], *, typed: Literal[True]) -> dict[str, JSONType]: ... + @typing.overload + def get(self, key: str, *, typed: Literal[True]) -> JSONType: ... + def get(self, key: str | None, *, typed: bool = False) -> JSONType | str: """Fetch snap configuration values. Args: @@ -319,7 +438,10 @@ def get(self, key: Optional[str], *, typed: bool = False) -> Any: Default is to return a string. """ if typed: - config = json.loads(self._snap("get", ["-d", key])) + args = ["-d"] + if key: + args.append(key) + config = json.loads(self._snap("get", args)) # json.loads -> Any if key: return config.get(key) return config @@ -327,9 +449,10 @@ def get(self, key: Optional[str], *, typed: bool = False) -> Any: if not key: raise TypeError("Key must be provided when typed=False") + # return a string return self._snap("get", [key]).strip() - def set(self, config: Dict[str, Any], *, typed: bool = False) -> str: + def set(self, config: dict[str, JSONAble], *, typed: bool = False) -> None: """Set a snap configuration value. Args: @@ -337,13 +460,11 @@ def set(self, config: Dict[str, Any], *, typed: bool = False) -> str: typed: set to True to convert all values in the config into typed values while configuring the snap (set with typed=True). Default is not to convert. 
""" - if typed: - kv = [f"{key}={json.dumps(val)}" for key, val in config.items()] - return self._snap("set", ["-t"] + kv) + if not typed: + config = {k: str(v) for k, v in config.items()} + self._snap_client._put_snap_conf(self._name, config) - return self._snap("set", [f"{key}={val}" for key, val in config.items()]) - - def unset(self, key) -> str: + def unset(self, key: str) -> str: """Unset a snap configuration value. Args: @@ -351,7 +472,7 @@ def unset(self, key) -> str: """ return self._snap("unset", [key]) - def start(self, services: Optional[List[str]] = None, enable: Optional[bool] = False) -> None: + def start(self, services: list[str] | None = None, enable: bool = False) -> None: """Start a snap's services. Args: @@ -361,7 +482,7 @@ def start(self, services: Optional[List[str]] = None, enable: Optional[bool] = F args = ["start", "--enable"] if enable else ["start"] self._snap_daemons(args, services) - def stop(self, services: Optional[List[str]] = None, disable: Optional[bool] = False) -> None: + def stop(self, services: list[str] | None = None, disable: bool = False) -> None: """Stop a snap's services. Args: @@ -371,7 +492,7 @@ def stop(self, services: Optional[List[str]] = None, disable: Optional[bool] = F args = ["stop", "--disable"] if disable else ["stop"] self._snap_daemons(args, services) - def logs(self, services: Optional[List[str]] = None, num_lines: Optional[int] = 10) -> str: + def logs(self, services: list[str] | None = None, num_lines: int = 10) -> str: """Fetch a snap services' logs. Args: @@ -379,12 +500,10 @@ def logs(self, services: Optional[List[str]] = None, num_lines: Optional[int] = (otherwise all) num_lines (int): (optional) integer number of log lines to return. 
Default `10` """ - args = ["logs", "-n={}".format(num_lines)] if num_lines else ["logs"] + args = ["logs", f"-n={num_lines}"] if num_lines else ["logs"] return self._snap_daemons(args, services).stdout - def connect( - self, plug: str, service: Optional[str] = None, slot: Optional[str] = None - ) -> None: + def connect(self, plug: str, service: str | None = None, slot: str | None = None) -> None: """Connect a plug to a slot. Args: @@ -395,20 +514,23 @@ def connect( Raises: SnapError if there is a problem encountered """ - command = ["connect", "{}:{}".format(self._name, plug)] + command = ["connect", f"{self._name}:{plug}"] if service and slot: - command = command + ["{}:{}".format(service, slot)] + command.append(f"{service}:{slot}") elif slot: - command = command + [slot] + command.append(slot) args = ["snap", *command] try: - subprocess.run(args, universal_newlines=True, check=True, capture_output=True) + with tracer.start_as_current_span(args[0]) as span: + span.set_attribute("argv", args) + subprocess.run(args, text=True, check=True, capture_output=True) except CalledProcessError as e: - raise SnapError("Could not {} for snap [{}]: {}".format(args, self._name, e.stderr)) + msg = f'Snap: {self._name!r} -- command {args!r} failed!' + raise SnapError._from_called_process_error(msg=msg, error=e) from e - def hold(self, duration: Optional[timedelta] = None) -> None: + def hold(self, duration: timedelta | None = None) -> None: """Add a refresh hold to a snap. Args: @@ -424,7 +546,7 @@ def unhold(self) -> None: """Remove the refresh hold of a snap.""" self._snap("refresh", ["--unhold"]) - def alias(self, application: str, alias: Optional[str] = None) -> None: + def alias(self, application: str, alias: str | None = None) -> None: """Create an alias for a given application. 
Args: @@ -435,17 +557,14 @@ def alias(self, application: str, alias: Optional[str] = None) -> None: alias = application args = ["snap", "alias", f"{self.name}.{application}", alias] try: - subprocess.check_output(args, universal_newlines=True) + with tracer.start_as_current_span(args[0]) as span: + span.set_attribute("argv", args) + subprocess.run(args, text=True, check=True, capture_output=True) except CalledProcessError as e: - raise SnapError( - "Snap: {!r}; command {!r} failed with output = {!r}".format( - self._name, args, e.output - ) - ) + msg = f'Snap: {self._name!r} -- command {args!r} failed!' + raise SnapError._from_called_process_error(msg=msg, error=e) from e - def restart( - self, services: Optional[List[str]] = None, reload: Optional[bool] = False - ) -> None: + def restart(self, services: list[str] | None = None, reload: bool = False) -> None: """Restarts a snap's services. Args: @@ -459,9 +578,9 @@ def restart( def _install( self, - channel: Optional[str] = "", - cohort: Optional[str] = "", - revision: Optional[str] = None, + channel: str = "", + cohort: str = "", + revision: str = "", ) -> None: """Add a snap to the system. 
@@ -472,27 +591,27 @@ def _install( """ cohort = cohort or self._cohort - args = [] + args: list[str] = [] if self.confinement == "classic": args.append("--classic") if self.confinement == "devmode": args.append("--devmode") if channel: - args.append('--channel="{}"'.format(channel)) + args.append(f'--channel="{channel}"') if revision: - args.append('--revision="{}"'.format(revision)) + args.append(f'--revision="{revision}"') if cohort: - args.append('--cohort="{}"'.format(cohort)) + args.append(f'--cohort="{cohort}"') self._snap("install", args) def _refresh( self, - channel: Optional[str] = "", - cohort: Optional[str] = "", - revision: Optional[str] = None, + channel: str = "", + cohort: str = "", + revision: str = "", devmode: bool = False, - leave_cohort: Optional[bool] = False, + leave_cohort: bool = False, ) -> None: """Refresh a snap. @@ -503,12 +622,15 @@ def _refresh( devmode: optionally, specify devmode confinement leave_cohort: leave the current cohort. """ - args = [] + args: list[str] = [] if channel: - args.append('--channel="{}"'.format(channel)) + args.append(f'--channel="{channel}"') if revision: - args.append('--revision="{}"'.format(revision)) + args.append(f'--revision="{revision}"') + + if self.confinement == 'classic': + args.append('--classic') if devmode: args.append("--devmode") @@ -520,7 +642,7 @@ def _refresh( self._cohort = "" args.append("--leave-cohort") elif cohort: - args.append('--cohort="{}"'.format(cohort)) + args.append(f'--cohort="{cohort}"') self._snap("refresh", args) @@ -536,11 +658,11 @@ def name(self) -> str: def ensure( self, state: SnapState, - classic: Optional[bool] = False, + classic: bool = False, devmode: bool = False, - channel: Optional[str] = "", - cohort: Optional[str] = "", - revision: Optional[str] = None, + channel: str | None = None, + cohort: str | None = None, + revision: str | None = None, ): """Ensure that a snap is in a given state. 
@@ -558,6 +680,10 @@ def ensure( Raises: SnapError if an error is encountered """ + channel = channel or "" + cohort = cohort or "" + revision = revision or "" + if classic and devmode: raise ValueError("Cannot set both classic and devmode confinement") @@ -580,10 +706,20 @@ def ensure( # We are installing or refreshing a snap. if self._state not in (SnapState.Present, SnapState.Latest): # The snap is not installed, so we install it. + logger.info( + "Installing snap %s, revision %s, tracking %s", self._name, revision, channel + ) self._install(channel, cohort, revision) - else: + logger.info("The snap installation completed successfully") + elif revision is None or revision != self._revision: # The snap is installed, but we are changing it (e.g., switching channels). + logger.info( + "Refreshing snap %s, revision %s, tracking %s", self._name, revision, channel + ) self._refresh(channel=channel, cohort=cohort, revision=revision, devmode=devmode) + logger.info("The snap refresh completed successfully") + else: + logger.info("Refresh of snap %s was unnecessary", self._name) self._update_snap_apps() self._state = state @@ -593,7 +729,7 @@ def _update_snap_apps(self) -> None: try: self._apps = self._snap_client.get_installed_snap_apps(self._name) except SnapAPIError: - logger.debug("Unable to retrieve snap apps for {}".format(self._name)) + logger.debug("Unable to retrieve snap apps for %s", self._name) self._apps = [] @property @@ -641,18 +777,19 @@ def confinement(self) -> str: return self._confinement @property - def apps(self) -> List: + def apps(self) -> list[dict[str, JSONType]]: """Returns (if any) the installed apps of the snap.""" self._update_snap_apps() return self._apps @property - def services(self) -> Dict: + def services(self) -> dict[str, SnapServiceDict]: """Returns (if any) the installed services of the snap.""" self._update_snap_apps() - services = {} + services: dict[str, SnapServiceDict] = {} for app in self._apps: if "daemon" in app: + app = 
typing.cast("_SnapServiceAppDict", app) services[app["name"]] = SnapService(**app).as_dict() return services @@ -663,11 +800,16 @@ def held(self) -> bool: info = self._snap("info") return "hold:" in info + @property + def version(self) -> str | None: + """Returns the version for a snap.""" + return self._version + class _UnixSocketConnection(http.client.HTTPConnection): """Implementation of HTTPConnection that connects to a named Unix socket.""" - def __init__(self, host, timeout=None, socket_path=None): + def __init__(self, host: str, timeout: float | None = None, socket_path: str | None = None): if timeout is None: super().__init__(host) else: @@ -677,7 +819,8 @@ def __init__(self, host, timeout=None, socket_path=None): def connect(self): """Override connect to use Unix socket (instead of TCP socket).""" if not hasattr(socket, "AF_UNIX"): - raise NotImplementedError("Unix sockets not supported on {}".format(sys.platform)) + raise NotImplementedError(f"Unix sockets not supported on {sys.platform}") + assert self.socket_path is not None # else TypeError on self.socket.connect self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) self.sock.connect(self.socket_path) if self.timeout is not None: @@ -691,9 +834,13 @@ def __init__(self, socket_path: str): super().__init__() self.socket_path = socket_path - def http_open(self, req) -> http.client.HTTPResponse: + def http_open(self, req: urllib.request.Request) -> http.client.HTTPResponse: """Override http_open to use a Unix socket connection (instead of TCP).""" - return self.do_open(_UnixSocketConnection, req, socket_path=self.socket_path) + return self.do_open( + typing.cast("urllib.request._HTTPConnectionProtocol", _UnixSocketConnection), + req, + socket_path=self.socket_path, + ) class SnapClient: @@ -707,7 +854,7 @@ class SnapClient: def __init__( self, socket_path: str = "/run/snapd.socket", - opener: Optional[urllib.request.OpenerDirector] = None, + opener: urllib.request.OpenerDirector | None = None, 
base_url: str = "http://localhost/v2/", timeout: float = 30.0, ): @@ -716,18 +863,21 @@ def __init__( Args: socket_path: a path to the socket on the filesystem. Defaults to /run/snap/snapd.socket opener: specifies an opener for unix socket, if unspecified a default is used - base_url: base url for making requests to the snap client. Defaults to - http://localhost/v2/ + base_url: base URL for making requests to the snap client. Must be an HTTP(S) URL. + Defaults to http://localhost/v2/ timeout: timeout in seconds to use when making requests to the API. Default is 30.0s. """ if opener is None: opener = self._get_default_opener(socket_path) self.opener = opener + # Address ruff's suspicious-url-open-usage (S310) + if not base_url.startswith(("http:", "https:")): + raise ValueError("base_url must start with 'http:' or 'https:'") self.base_url = base_url self.timeout = timeout @classmethod - def _get_default_opener(cls, socket_path): + def _get_default_opener(cls, socket_path: str) -> urllib.request.OpenerDirector: """Build the default opener to use for requests (HTTP over Unix socket).""" opener = urllib.request.OpenerDirector() opener.add_handler(_UnixSocketHandler(socket_path)) @@ -740,9 +890,9 @@ def _request( self, method: str, path: str, - query: Dict = None, - body: Dict = None, - ) -> JSONType: + query: dict[str, str] | None = None, + body: dict[str, JSONAble] | None = None, + ) -> JSONType | None: """Make a JSON request to the Snapd server with the given HTTP method and path. 
If query dict is provided, it is encoded and appended as a query string @@ -757,15 +907,42 @@ def _request( headers["Content-Type"] = "application/json" response = self._request_raw(method, path, query, headers, data) - return json.loads(response.read().decode())["result"] + response = json.loads(response.read().decode()) # json.loads -> Any + if response["type"] == "async": + return self._wait(response["change"]) # may be `None` due to `get` + return response["result"] + + def _wait(self, change_id: str, timeout: float = 300) -> JSONType | None: + """Wait for an async change to complete. + + The poll time is 100 milliseconds, the same as in snap clients. + """ + deadline = time.time() + timeout + while True: + if time.time() > deadline: + raise TimeoutError(f"timeout waiting for snap change {change_id}") + response = self._request("GET", f"changes/{change_id}") + response = typing.cast("_AsyncChangeDict", response) + status = response["status"] + if status == "Done": + return response.get("data") + if status == "Doing" or status == "Do": + time.sleep(0.1) + continue + if status == "Wait": + logger.warning("snap change %s succeeded with status 'Wait'", change_id) + return response.get("data") + raise SnapError( + f"snap change {response.get('kind')!r} id {change_id} failed with status {status}" + ) def _request_raw( self, method: str, path: str, - query: Dict = None, - headers: Dict = None, - data: bytes = None, + query: dict[str, str] | None = None, + headers: dict[str, str] | None = None, + data: bytes | None = None, ) -> http.client.HTTPResponse: """Make a request to the Snapd server; return the raw HTTPResponse object.""" url = self.base_url + path @@ -774,7 +951,7 @@ def _request_raw( if headers is None: headers = {} - request = urllib.request.Request(url, method=method, data=data, headers=headers) + request = urllib.request.Request(url, method=method, data=data, headers=headers) # noqa: S310 try: response = self.opener.open(request, timeout=self.timeout) @@ 
-782,31 +959,41 @@ def _request_raw( code = e.code status = e.reason message = "" + body: dict[str, JSONType] try: - body = json.loads(e.read().decode())["result"] - except (IOError, ValueError, KeyError) as e2: + body = json.loads(e.read().decode())["result"] # json.loads -> Any + except (OSError, ValueError, KeyError) as e2: # Will only happen on read error or if Pebble sends invalid JSON. body = {} - message = "{} - {}".format(type(e2).__name__, e2) - raise SnapAPIError(body, code, status, message) + message = f"{type(e2).__name__} - {e2}" + raise SnapAPIError(body, code, status, message) from e except urllib.error.URLError as e: - raise SnapAPIError({}, 500, "Not found", e.reason) + raise SnapAPIError({}, 500, "Not found", str(e.reason)) from e return response - def get_installed_snaps(self) -> Dict: + def get_installed_snaps(self) -> list[dict[str, JSONType]]: """Get information about currently installed snaps.""" - return self._request("GET", "snaps") + with tracer.start_as_current_span("get_installed_snaps"): + return self._request("GET", "snaps") # type: ignore - def get_snap_information(self, name: str) -> Dict: + def get_snap_information(self, name: str) -> dict[str, JSONType]: """Query the snap server for information about single snap.""" - return self._request("GET", "find", {"name": name})[0] + with tracer.start_as_current_span("get_snap_information") as span: + span.set_attribute("name", name) + return self._request("GET", "find", {"name": name})[0] # type: ignore - def get_installed_snap_apps(self, name: str) -> List: + def get_installed_snap_apps(self, name: str) -> list[dict[str, JSONType]]: """Query the snap server for apps belonging to a named, currently installed snap.""" - return self._request("GET", "apps", {"names": name, "select": "service"}) + with tracer.start_as_current_span("get_installed_snap_apps") as span: + span.set_attribute("name", name) + return self._request("GET", "apps", {"names": name, "select": "service"}) # type: ignore + 
def _put_snap_conf(self, name: str, conf: dict[str, JSONAble]) -> None: + """Set the configuration details for an installed snap.""" + self._request("PUT", f"snaps/{name}/conf", body=conf) -class SnapCache(Mapping): + +class SnapCache(Mapping[str, Snap]): """An abstraction to represent installed/available packages. When instantiated, `SnapCache` iterates through the list of installed @@ -819,12 +1006,12 @@ def __init__(self): if not self.snapd_installed: raise SnapError("snapd is not installed or not in /usr/bin") from None self._snap_client = SnapClient() - self._snap_map = {} + self._snap_map: dict[str, Snap | None] = {} if self.snapd_installed: self._load_available_snaps() self._load_installed_snaps() - def __contains__(self, key: str) -> bool: + def __contains__(self, key: object) -> bool: """Check if a given snap is in the cache.""" return key in self._snap_map @@ -832,26 +1019,26 @@ def __len__(self) -> int: """Report number of items in the snap cache.""" return len(self._snap_map) - def __iter__(self) -> Iterable["Snap"]: + def __iter__(self) -> Iterable[Snap | None]: # pyright: ignore[reportIncompatibleMethodOverride] """Provide iterator for the snap cache.""" return iter(self._snap_map.values()) def __getitem__(self, snap_name: str) -> Snap: """Return either the installed version or latest version for a given snap.""" - snap = self._snap_map.get(snap_name, None) - if snap is None: - # The snapd cache file may not have existed when _snap_map was - # populated. This is normal. - try: - self._snap_map[snap_name] = self._load_info(snap_name) - except SnapAPIError: - raise SnapNotFoundError("Snap '{}' not found!".format(snap_name)) - - return self._snap_map[snap_name] + snap = self._snap_map.get(snap_name) + if snap is not None: + return snap + # The snapd cache file may not have existed when _snap_map was + # populated. This is normal. 
+ try: + snap = self._snap_map[snap_name] = self._load_info(snap_name) + except SnapAPIError as e: + raise SnapNotFoundError(f"Snap '{snap_name}' not found!") from e + return snap @property def snapd_installed(self) -> bool: - """Check whether snapd has been installled on the system.""" + """Check whether snapd has been installed on the system.""" return os.path.isfile("/usr/bin/snap") def _load_available_snaps(self) -> None: @@ -865,7 +1052,7 @@ def _load_available_snaps(self) -> None: # currently exist. return - with open("/var/cache/snapd/names", "r") as f: + with open("/var/cache/snapd/names") as f: for line in f: if line.strip(): self._snap_map[line.strip()] = None @@ -875,23 +1062,26 @@ def _load_installed_snaps(self) -> None: installed = self._snap_client.get_installed_snaps() for i in installed: + i = typing.cast("_SnapDict", i) snap = Snap( name=i["name"], state=SnapState.Latest, channel=i["channel"], revision=i["revision"], confinement=i["confinement"], - apps=i.get("apps", None), + apps=i.get("apps"), + version=i.get("version"), ) self._snap_map[snap.name] = snap - def _load_info(self, name) -> Snap: + def _load_info(self, name: str) -> Snap: """Load info for snaps which are not installed if requested. Args: name: a string representing the name of the snap """ info = self._snap_client.get_snap_information(name) + info = typing.cast("_SnapDict", info) return Snap( name=info["name"], @@ -900,19 +1090,40 @@ def _load_info(self, name) -> Snap: revision=info["revision"], confinement=info["confinement"], apps=None, + version=info.get("version"), ) +@typing.overload +def add( # return a single Snap if snap name is given as a string + snap_names: str, + state: str | SnapState = SnapState.Latest, + channel: str | None = None, + classic: bool = False, + devmode: bool = False, + cohort: str | None = None, + revision: str | None = None, +) -> Snap: ... 
+@typing.overload +def add( # may return a single Snap or a list depending if one or more snap names were given + snap_names: list[str], + state: str | SnapState = SnapState.Latest, + channel: str | None = None, + classic: bool = False, + devmode: bool = False, + cohort: str | None = None, + revision: str | None = None, +) -> Snap | list[Snap]: ... @_cache_init def add( - snap_names: Union[str, List[str]], - state: Union[str, SnapState] = SnapState.Latest, - channel: Optional[str] = "", - classic: Optional[bool] = False, + snap_names: str | list[str], + state: str | SnapState = SnapState.Latest, + channel: str | None = None, + classic: bool = False, devmode: bool = False, - cohort: Optional[str] = "", - revision: Optional[str] = None, -) -> Union[Snap, List[Snap]]: + cohort: str | None = None, + revision: str | None = None, +) -> Snap | list[Snap]: """Add a snap to the system. Args: @@ -940,11 +1151,25 @@ def add( if isinstance(state, str): state = SnapState(state) - return _wrap_snap_operations(snap_names, state, channel, classic, devmode, cohort, revision) + return _wrap_snap_operations( + snap_names=snap_names, + state=state, + channel=channel or "", + classic=classic, + devmode=devmode, + cohort=cohort or "", + revision=revision or "", + ) +@typing.overload +def remove(snap_names: str) -> Snap: ... +# return a single Snap if snap name is given as a string +@typing.overload +def remove(snap_names: list[str]) -> Snap | list[Snap]: ... +# may return a single Snap or a list depending if one or more snap names were given @_cache_init -def remove(snap_names: Union[str, List[str]]) -> Union[Snap, List[Snap]]: +def remove(snap_names: str | list[str]) -> Snap | list[Snap]: """Remove specified snap(s) from the system. 
Args: @@ -965,16 +1190,36 @@ def remove(snap_names: Union[str, List[str]]) -> Union[Snap, List[Snap]]: ) +@typing.overload +def ensure( # return a single Snap if snap name is given as a string + snap_names: str, + state: str, + channel: str | None = None, + classic: bool = False, + devmode: bool = False, + cohort: str | None = None, + revision: int | None = None, +) -> Snap: ... +@typing.overload +def ensure( # may return a single Snap or a list depending if one or more snap names were given + snap_names: list[str], + state: str, + channel: str | None = None, + classic: bool = False, + devmode: bool = False, + cohort: str | None = None, + revision: int | None = None, +) -> Snap | list[Snap]: ... @_cache_init def ensure( - snap_names: Union[str, List[str]], + snap_names: str | list[str], state: str, - channel: Optional[str] = "", - classic: Optional[bool] = False, + channel: str | None = None, + classic: bool = False, devmode: bool = False, - cohort: Optional[str] = "", - revision: Optional[int] = None, -) -> Union[Snap, List[Snap]]: + cohort: str | None = None, + revision: int | None = None, +) -> Snap | list[Snap]: """Ensure specified snaps are in a given state on the system. 
Args: @@ -1005,23 +1250,24 @@ def ensure( classic=classic, devmode=devmode, cohort=cohort, - revision=revision, + revision=str(revision) if revision is not None else None, ) else: return remove(snap_names) def _wrap_snap_operations( - snap_names: List[str], + snap_names: list[str], state: SnapState, channel: str, classic: bool, devmode: bool, - cohort: Optional[str] = "", - revision: Optional[str] = None, -) -> Union[Snap, List[Snap]]: + cohort: str = "", + revision: str = "", +) -> Snap | list[Snap]: """Wrap common operations for bare commands.""" - snaps = {"success": [], "failed": []} + snaps: list[Snap] = [] + errors: list[str] = [] op = "remove" if state is SnapState.Absent else "install or refresh" @@ -1039,27 +1285,25 @@ def _wrap_snap_operations( cohort=cohort, revision=revision, ) - snaps["success"].append(snap) - except SnapError as e: - logger.warning("Failed to {} snap {}: {}!".format(op, s, e.message)) - snaps["failed"].append(s) + snaps.append(snap) + except SnapError as e: # noqa: PERF203 + logger.warning("Failed to %s snap %s: %s!", op, s, e.message) + errors.append(s) except SnapNotFoundError: - logger.warning("Snap '{}' not found in cache!".format(s)) - snaps["failed"].append(s) + logger.warning("Snap '%s' not found in cache!", s) + errors.append(s) - if len(snaps["failed"]): - raise SnapError( - "Failed to install or refresh snap(s): {}".format(", ".join(list(snaps["failed"]))) - ) + if errors: + raise SnapError(f"Failed to install or refresh snap(s): {', '.join(errors)}") - return snaps["success"] if len(snaps["success"]) > 1 else snaps["success"][0] + return snaps if len(snaps) > 1 else snaps[0] def install_local( filename: str, - classic: Optional[bool] = False, - devmode: Optional[bool] = False, - dangerous: Optional[bool] = False, + classic: bool = False, + devmode: bool = False, + dangerous: bool = False, ) -> Snap: """Perform a snap operation. 
@@ -1084,7 +1328,13 @@ def install_local( if dangerous: args.append("--dangerous") try: - result = subprocess.check_output(args, universal_newlines=True).splitlines()[-1] + with tracer.start_as_current_span(args[0]) as span: + span.set_attribute("argv", args) + result = subprocess.check_output( + args, + text=True, + stderr=subprocess.PIPE, + ).splitlines()[-1] snap_name, _ = result.split(" ", 1) snap_name = ansi_filter.sub("", snap_name) @@ -1094,11 +1344,14 @@ def install_local( return c[snap_name] except SnapAPIError as e: logger.error( - "Could not find snap {} when querying Snapd socket: {}".format(snap_name, e.body) + "Could not find snap %s when querying Snapd socket: %s", + snap_name, + e.body, ) - raise SnapError("Failed to find snap {} in Snap cache".format(snap_name)) + raise SnapError(f"Failed to find snap {snap_name} in Snap cache") from e except CalledProcessError as e: - raise SnapError("Could not install snap {}: {}".format(filename, e.output)) + msg = f"Could not install snap {filename}" + raise SnapError._from_called_process_error(msg=msg, error=e) from e def _system_set(config_item: str, value: str) -> None: """Set system snaps config. Args: config_item: name of snap system setting. E.g.
'refresh.hold' value: value to assign """ - args = ["snap", "set", "system", "{}={}".format(config_item, value)] + args = ["snap", "set", "system", f"{config_item}={value}"] try: - subprocess.check_call(args, universal_newlines=True) - except CalledProcessError: - raise SnapError("Failed setting system config '{}' to '{}'".format(config_item, value)) + with tracer.start_as_current_span(args[0]) as span: + span.set_attribute("argv", args) + subprocess.run(args, text=True, check=True, capture_output=True) + except CalledProcessError as e: + msg = f"Failed setting system config '{config_item}' to '{value}'" + raise SnapError._from_called_process_error(msg=msg, error=e) from e -def hold_refresh(days: int = 90, forever: bool = False) -> bool: +def hold_refresh(days: int = 90, forever: bool = False) -> None: """Set the system-wide snap refresh hold. Args: @@ -1141,7 +1397,7 @@ def hold_refresh(days: int = 90, forever: bool = False) -> bool: # Format for the correct datetime format hold_date = target_date.strftime("%Y-%m-%dT%H:%M:%S%z") # Python dumps the offset in format '+0100', we need '+01:00' - hold_date = "{0}:{1}".format(hold_date[:-2], hold_date[-2:]) + hold_date = f"{hold_date[:-2]}:{hold_date[-2:]}" # Actually set the hold date _system_set("refresh.hold", hold_date) logger.info("Set system-wide snap refresh hold to: %s", hold_date) diff --git a/scripts/update-charm-libs.sh b/scripts/update-charm-libs.sh deleted file mode 100755 index 559e052..0000000 --- a/scripts/update-charm-libs.sh +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/sh - -# Update charm libraries that are managed with charmcraft. - -ROOT_DIR="$(dirname $0)/.." 
- -cd $ROOT_DIR - -charmcraft fetch-lib charms.operator_libs_linux.v2.snap -charmcraft fetch-lib charms.grafana_agent.v0.cos_agent diff --git a/tests/functional/tests/bundles/jammy-yoga.yaml b/tests/functional/tests/bundles/jammy-yoga.yaml index 7e33546..e04a418 100644 --- a/tests/functional/tests/bundles/jammy-yoga.yaml +++ b/tests/functional/tests/bundles/jammy-yoga.yaml @@ -6,9 +6,9 @@ applications: openstack-exporter: charm: openstack-exporter # To be overridden by overlay num_units: 1 - grafana-agent: - charm: grafana-agent - channel: stable + opentelemetry-collector: + charm: opentelemetry-collector + channel: 2/stable keystone: charm: keystone channel: yoga/edge @@ -201,5 +201,5 @@ relations: - vault:certificates - - openstack-exporter:credentials - keystone:identity-admin -- - grafana-agent:cos-agent +- - opentelemetry-collector:cos-agent - openstack-exporter:cos-agent diff --git a/tests/functional/tests/charm_tests/openstack_exporter.py b/tests/functional/tests/charm_tests/openstack_exporter.py index 7c80380..0db4891 100644 --- a/tests/functional/tests/charm_tests/openstack_exporter.py +++ b/tests/functional/tests/charm_tests/openstack_exporter.py @@ -211,24 +211,26 @@ def test_keystone_relation_changed(self): ) self.assertEqual(self.leader_unit.workload_status_message, "") - def test_grafana_agent_relation_changed(self): - """Test grafana-agent relation changed will reach expected status. + def test_otel_relation_changed(self): + """Test opentelemetry-collector relation changed will reach expected status. - Test the expected charm status when removing grafana-agent relation and - adding grafana-agent relation back. + Test the expected charm status when removing opentelemetry-collector relation and + adding opentelemetry-collector relation back. 
""" - # Remove grafana-agent relation - model.remove_relation(APP_NAME, "cos-agent", "grafana-agent:cos-agent") + # Remove opentelemetry-collector relation + model.remove_relation(APP_NAME, "cos-agent", "opentelemetry-collector:cos-agent") model.block_until_unit_wl_status( self.leader_unit_entity_id, "blocked", timeout=STATUS_TIMEOUT ) - self.assertEqual(self.leader_unit.workload_status_message, "Grafana Agent is not related") + self.assertEqual( + self.leader_unit.workload_status_message, "Opentelemetry Collector is not related" + ) # Be patient: wait until the relation is completely removed model.block_until_all_units_idle() - # Add back grafan-agent relation - model.add_relation(APP_NAME, "cos-agent", "grafana-agent:cos-agent") + # Add back opentelemetry-collector relation + model.add_relation(APP_NAME, "cos-agent", "opentelemetry-collector:cos-agent") model.block_until_unit_wl_status( self.leader_unit_entity_id, "active", timeout=STATUS_TIMEOUT ) diff --git a/tests/functional/tests/tests.yaml b/tests/functional/tests/tests.yaml index 07515f9..8075e0e 100644 --- a/tests/functional/tests/tests.yaml +++ b/tests/functional/tests/tests.yaml @@ -26,6 +26,6 @@ target_deploy_status: ovn-central: workload-status: waiting workload-status-message-prefix: "'ovsdb-peer' incomplete, 'certificates' awaiting server certificate data" - grafana-agent: + opentelemetry-collector: workload-status: blocked workload-status-message-regex: "^Missing .* for cos-agent$" diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index be1424f..9c084d2 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -154,7 +154,10 @@ def test_install_snap_error(self, mock_install): # Scenario 4: With upstream snap present ( {}, # Default config - [("credentials", "keystone"), ("cos-agent", "grafana-agent")], + [ + ("credentials", "keystone"), + ("cos-agent", "openstack-exporter"), + ], # Both relations { "keystone_data": {"random": "data"}, "upstream_present": True, # Upstream 
snap present @@ -170,7 +173,7 @@ def test_install_snap_error(self, mock_install): # Scenario 5: Non-default snap channel with resource ( {"snap_channel": "latest/edge"}, - [("credentials", "keystone"), ("cos-agent", "grafana-agent")], + [("credentials", "keystone"), ("cos-agent", "opentelemetry-collector")], { "keystone_data": {"random": "data"}, "upstream_present": False, @@ -186,7 +189,7 @@ def test_install_snap_error(self, mock_install): # Scenario 6: Snap not installed ( {}, # Default config - [("credentials", "keystone"), ("cos-agent", "grafana-agent")], + [("credentials", "keystone"), ("cos-agent", "opentelemetry-collector")], { "keystone_data": {"random": "data"}, "upstream_present": False, @@ -203,7 +206,7 @@ def test_install_snap_error(self, mock_install): # Scenario 7: Snap installed but service not active ( {}, # Default config - [("credentials", "keystone"), ("cos-agent", "grafana-agent")], + [("credentials", "keystone"), ("cos-agent", "opentelemetry-collector")], { "keystone_data": {"random": "data"}, "upstream_present": False, @@ -220,7 +223,7 @@ def test_install_snap_error(self, mock_install): # Scenario 8: Everything ok ( {"snap_channel": "latest/stable"}, # Default snap channel - [("credentials", "keystone"), ("cos-agent", "grafana-agent")], + [("credentials", "keystone"), ("cos-agent", "opentelemetry-collector")], { "keystone_data": {"random": "data"}, "upstream_present": False, @@ -492,7 +495,7 @@ def test_snap_service_with_no_keystone_data(self, _, __, mock_get_installed_snap mock_upstream_service if snap == UPSTREAM_SNAP else mock_snap_service ) self.harness.begin() - self.harness.add_relation("cos-agent", "grafana-agent") + self.harness.add_relation("cos-agent", "opentelemetry-collector") self.harness.charm._configure(mock.MagicMock()) mock_snap_service.stop.assert_called_once() @@ -512,7 +515,7 @@ def test_snap_service_with_keystone_data(self, _, __, mock_get_installed_snap_se mock_upstream_service if snap == UPSTREAM_SNAP else 
mock_snap_service ) self.harness.begin() - self.harness.add_relation("cos-agent", "grafana-agent") + self.harness.add_relation("cos-agent", "opentelemetry-collector") # Mock _write_cloud_config to avoid real file operations with mock.patch.object(self.harness.charm, "_write_cloud_config"): From 5480651c410feb69a6212a0d2a34e8bb16d6efc9 Mon Sep 17 00:00:00 2001 From: Gabriel Cocenza Date: Thu, 26 Feb 2026 09:45:08 -0300 Subject: [PATCH 2/2] - update cos-agent lib --- lib/charms/grafana_agent/v0/cos_agent.py | 45 ++++++++++++++++++------ 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/lib/charms/grafana_agent/v0/cos_agent.py b/lib/charms/grafana_agent/v0/cos_agent.py index 228550a..d394420 100644 --- a/lib/charms/grafana_agent/v0/cos_agent.py +++ b/lib/charms/grafana_agent/v0/cos_agent.py @@ -211,7 +211,9 @@ def __init__(self, *args): ``` """ +import copy import enum +import hashlib import json import logging import socket @@ -254,7 +256,7 @@ class _MetricsEndpointDict(TypedDict): LIBID = "dc15fa84cef84ce58155fb84f6c6213a" LIBAPI = 0 -LIBPATCH = 24 +LIBPATCH = 25 PYDEPS = ["cosl >= 0.0.50", "pydantic"] @@ -308,6 +310,13 @@ def _dedupe_list(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: return unique_items +def _dict_hash_except_key(scrape_config: Dict[str, Any], key: Optional[str]): + """Get a hash of the scrape_config dict, except for the specified key.""" + cfg_for_hash = {k: v for k, v in scrape_config.items() if k != key} + serialized = json.dumps(cfg_for_hash, sort_keys=True) + return hashlib.blake2b(serialized.encode(), digest_size=4).hexdigest() + + class TracingError(Exception): """Base class for custom errors raised by tracing.""" @@ -697,6 +706,27 @@ def _on_refresh(self, event): ) as e: logger.error("Invalid relation data provided: %s", e) + def _deterministic_scrape_configs( + self, scrape_configs: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: + """Get deterministic scrape_configs with stable job names. 
+ + For stability across serializations, compute a short per-config hash + and append it to the existing job name (or 'default'). Keep the app + name as a prefix: <app>_<name>_<8hex-hash>. + + Hash the whole scrape_config (except any existing job_name) so the + suffix is sensitive to all stable fields. Use deterministic JSON + serialization. + """ + local_scrape_configs = copy.deepcopy(scrape_configs) + for scrape_config in local_scrape_configs: + name = scrape_config.get("job_name", "default") + short_id = _dict_hash_except_key(scrape_config, "job_name") + scrape_config["job_name"] = f"{self._charm.app.name}_{name}_{short_id}" + + return sorted(local_scrape_configs, key=lambda c: c.get("job_name", "")) + @property def _scrape_jobs(self) -> List[Dict]: """Return a list of scrape_configs. @@ -711,22 +741,17 @@ def _scrape_jobs(self) -> List[Dict]: scrape_configs = self._scrape_configs.copy() # Convert "metrics_endpoints" to standard scrape_configs, and add them in - unit_name = self._charm.unit.name.replace("/", "_") for endpoint in self._metrics_endpoints: - port = endpoint["port"] - path = endpoint["path"] - sanitized_path = path.strip("/").replace("/", "_") scrape_configs.append( { - "job_name": f"{unit_name}_localhost_{port}_{sanitized_path}", - "metrics_path": path, - "static_configs": [{"targets": [f"localhost:{port}"]}], + "metrics_path": endpoint["path"], + "static_configs": [{"targets": [f"localhost:{endpoint['port']}"]}], } ) scrape_configs = scrape_configs or [] - return scrape_configs + return self._deterministic_scrape_configs(scrape_configs) @property def _metrics_alert_rules(self) -> Dict: @@ -742,7 +767,7 @@ def _metrics_alert_rules(self) -> Dict: ) alert_rules.add_path(self._metrics_rules, recursive=self._recursive) alert_rules.add( - generic_alert_groups.application_rules, + copy.deepcopy(generic_alert_groups.application_rules), group_name_prefix=JujuTopology.from_charm(self._charm).identifier, )