diff --git a/docs/agents/host_manager.rst b/docs/agents/host_manager.rst index 51d8f4b6..21216841 100644 --- a/docs/agents/host_manager.rst +++ b/docs/agents/host_manager.rst @@ -58,6 +58,4 @@ Agent API Supporting APIs --------------- -.. automethod:: ocs.agents.host_manager.agent.HostManager._process_target_states - .. automethod:: ocs.agents.host_manager.agent.HostManager._reload_config diff --git a/docs/user/centralized_management.rst b/docs/user/centralized_management.rst index ad74fe66..d7e4f28e 100644 --- a/docs/user/centralized_management.rst +++ b/docs/user/centralized_management.rst @@ -98,8 +98,7 @@ block would become: 'arguments': [['--serial-number', 'LSA22BB'], ['--mode', 'acq']]}, {'agent-class': 'HostManager', - 'instance-id': 'hm-host-1', - 'arguments': [['--initial-state', 'up']], + 'instance-id': 'hm-host-1'}, }, ] } @@ -205,9 +204,14 @@ in ocsbow or ocs-web, will show up as simply "[docker]". Advanced host config ~~~~~~~~~~~~~~~~~~~~ -In some cases you might want to temporarily exclude an agent from -HostManager control. You can do this by setting ``'manage': -'no'``. +The ``manage`` setting in the instance description can be used to +fine-tune the treatment of each Agent instance by HostManager. For +example, to exclude an instance from HostManager tracking and control, +specify ``'manage': 'ignore'``. It is also possible to specify that +certain instances should not be started automatically (for example +``"host/down"`` or ``"docker/down"``). For information on the +available settings for "manage", see the description in +:meth:`ocs.site_config.InstanceConfig.from_dict`. It is possible to mix host- and docker-based agents in a single host config block, and control them all with a single HostManager instance. @@ -291,18 +295,28 @@ The agent in host-1-docker has the annotation [d] beside its class name, indicating this is an agent managed through a docker container. (The docker service name, in this example, would be ocs-LSARR00.) -A managed docker container that has not been associated with a -specific agent will show up with agent-class "[docker]" and an -instance-id corresponding to the service name; for example:: +If an Agent has been configured with ``'manage': 'ignore'``, it will +be marked with suffix ``[unman]`` and will have question marks in the +state and target fields, e.g.:: - [instance-id] [agent-class] [state] [target] - influxdb [docker] up up + [instance-id] [agent-class] [state] [target] + registry RegistryAgent[unman] ? ? -Note that if an Agent has been configured with ``'manage': 'no'``, it -will show with question marks in the state and target fields, e.g.:: +If the SCF seen by ocsbow and the information in HostManager are not +in agreement, then the agent-class will include two values, connected +with a slash. For example, if the local SCF expects the instance to +be managed through docker, but the HostManager reports it running on +the host, then the line might look like this:: - [instance-id] [agent-class] [state] [target] - registry RegistryAgent ? ? + [instance-id] [agent-class] [state] [target] + LSARR00 Lakeshore372Agent[d]/Lakeshore372Agent up up + +A managed docker container that has not been associated with a +specific instance will show up with agent-class "?/[docker]" and an +instance-id corresponding to the service name. 
For example:: + + [instance-id] [agent-class] [state] [target] + influxdb ?/[docker] up up ``state`` and ``target`` @@ -312,7 +326,9 @@ The ``state`` column shows whether the Agent is currently running (``up``) or not (``down``). This column may also show the value ``unstable``, which indicates that an Agent keeps restarting (this usually indicates a code, configuration, or hardware error that is -causing the agent to crash shortly after start-up). +causing the agent to crash shortly after start-up). The value may +also be ``?``, indicating that the agent is marked to be run through +Docker, but no corresponding docker service has been identified. For the non-HostManager agents, the ``target`` column shows the state that HostManager will try to achieve for that Agent. So if diff --git a/docs/user/quickstart.rst b/docs/user/quickstart.rst index 3886a0a9..71538fc4 100644 --- a/docs/user/quickstart.rst +++ b/docs/user/quickstart.rst @@ -67,8 +67,7 @@ structure. # on the host 'agent-instances': [ {'agent-class': 'HostManager', - 'instance-id': 'hm-1', - 'arguments': ['--initial-state', 'up']}, + 'instance-id': 'hm-1'}, ] } diff --git a/ocs/agents/host_manager/agent.py b/ocs/agents/host_manager/agent.py index dc5bb650..fc7bd0f2 100644 --- a/ocs/agents/host_manager/agent.py +++ b/ocs/agents/host_manager/agent.py @@ -14,6 +14,10 @@ VALID_TARGETS = ['up', 'down'] +# "agent_class" value used for docker services that do not seem to +# have a corresponding SCF entry. +NONAGENT_DOCKER = '[docker]' + class HostManager: """ @@ -28,7 +32,6 @@ def __init__(self, agent, docker_composes=[], docker_compose_bin=None, self.agent = agent self.running = False self.database = {} # key is instance_id (or docker service name). - self.docker_services = {} # key is service name. self.docker_composes = docker_composes self.docker_compose_bin = docker_compose_bin self.docker_service_prefix = docker_service_prefix @@ -39,103 +42,185 @@ def _get_local_instances(self): instances. Returns: + success (bool): True if config was successfully scanned. + False otherwise, indiating perhaps we should go into a bit + of a lock down while operator sorts that out. agent_dict (dict): Maps instance-id to a dict describing the agent config. The config is as contained in HostConfig.instances but where 'instance-id', 'agent-class', and 'manage' are all guaranteed populated - (and manage is one of ['yes', 'no', 'docker']). + (and manage is a valid full description, e.g. "host/down"). warnings: A list of strings, each of which corresponds to some problem found in the config. """ + warnings = [] + instances = {} + # Load site config file. 
- site, hc, _ = site_config.get_config( - self.agent.site_args, '*host*') - self.site_config_file = site.source_file - self.host_name = hc.name - self.working_dir = hc.working_dir + try: + site, hc, _ = site_config.get_config( + self.agent.site_args, '*host*') + self.site_config_file = site.source_file + self.host_name = hc.name + self.working_dir = hc.working_dir + except Exception as e: + warnings.append('Failed to read site config file -- ' + f'likely syntax error: {e}') + return returnValue((False, instances, warnings)) # Scan for agent scripts in (deprecated) script registry - for p in hc.agent_paths: - if p not in sys.path: - sys.path.append(p) - site_config.scan_for_agents() + try: + for p in hc.agent_paths: + if p not in sys.path: + sys.path.append(p) + site_config.scan_for_agents() + except Exception as e: + warnings.append('Failed to scan for old plugin agents -- ' + f'likely plugin config problem: {e}') + return returnValue((False, instances, warnings)) # Gather managed items from site config. - warnings = [] - instances = {} for inst in hc.instances: if inst['instance-id'] in instances: warnings.append( f'Configuration problem, instance-id={inst["instance-id"]} ' f'has multiple entries. Ignoring repeats.') continue - # Make sure 'manage' is set, and valid. - default_manage = 'no' \ - if inst['agent-class'] == 'HostManager' else 'yes' - inst['manage'] = inst.get('manage', default_manage) - if inst['manage'] not in ['yes', 'no', 'docker']: - warnings.append( - f'Configuration problem, invalid manage={inst["manage"]} ' - f'for instance_id={inst["instance-id"]}.') - continue + if inst['agent-class'] == 'HostManager': + inst['manage'] = 'ignore' + else: + # Make sure 'manage' is set, and valid. + inst['manage'] = inst.get('manage', None) + try: + inst['manage'] = site_config.InstanceConfig._MANAGE_MAP[inst['manage']] + except KeyError: + warnings.append( + f'Configuration problem, invalid manage={inst["manage"]} ' + f'for instance_id={inst["instance-id"]}.') + continue instances[inst['instance-id']] = inst - returnValue((instances, warnings)) + returnValue((True, instances, warnings)) yield @inlineCallbacks - def _update_docker_services(self): - """Parse the docker-compose.yaml files and update the internal cache - of docker service information. + def _update_docker_services(self, session): + """Parse the docker-compose.yaml files and return the current + status of all services. For any services matching + self.database entries, the corresponding DockerContainerHelper + is updated with the new info. Returns: - dead (dict): a dict of service entries that were removed - from self.docker_services because they are nolonger - configured in any compose file. + docker_services (dict): state information for all detected + services, keyed by the service name. """ # Read services from all docker-compose files. docker_services = {} for compose in self.docker_composes: - services = yield hm_utils.parse_docker_state( - compose, docker_compose_bin=self.docker_compose_bin) - docker_services.update(services) - - # Mark containers that have disappeared. - dead = {} - for k in self.docker_services: - if k not in docker_services: - dead[k] = self.docker_services.pop(k) + try: + services = yield hm_utils.parse_docker_state( + compose, docker_compose_bin=self.docker_compose_bin) + this_ok = True + this_msg = f'Successfully parsed {compose} and its service states.' 
+ except Exception as e: + this_ok = False + this_msg = (f'Failed to interpret {compose} and/or ' + f'its service states: {e}') + + # Don't issue the same complaint more than once per minute or so + compose_was_ok, timestamp, last_msg = self.config_parse_status.get( + compose, (False, 0, '')) + if (this_ok != compose_was_ok) \ + or (not this_ok and time.time() - timestamp > 60) \ + or (not this_ok and this_msg != last_msg): + session.add_message(this_msg) + self.config_parse_status[compose] = (this_ok, time.time(), this_msg) + + if this_ok: + docker_services.update(services) + + # Update all docker things in the database. + retirees = [] + assigned_services = [] + for key, instance in self.database.items(): + if instance['management'] != 'docker': + continue - # Everything else is good. - self.docker_services.update(docker_services) - returnValue(dead) + service_name = instance['agent_script'] + service_data = docker_services.get(service_name) + assigned_services.append(service_name) + + prot = instance.get('prot') + if prot is None: + if service_data is not None: + # Create a prot entry with the service info. + instance['prot'] = hm_utils.DockerContainerHelper(service_data) + instance['operable'] = True + if instance['agent_class'] != NONAGENT_DOCKER: + instance['agent_class'] = _clsname_tool(instance['agent_class'], '[d]') + else: + if service_data is not None: + prot.update(service_data) + else: + # service_data is missing, but there used to be a + # service there. Close it out. + instance['prot'] = None + instance['operable'] = False + if instance['agent_class'] == NONAGENT_DOCKER: + session.add_message(f'Deleting non-agent service {key}') + retirees.append(key) + else: + session.add_message(f'Marking missing service for {key}') + instance['agent_class'] = _clsname_tool(instance['agent_class'], '[d?]') + + # If a non-agent [docker] service has disappeared, there's no + # reason to show it in a list, and no persistent state / + # operations to worry about. So just delete it. + for r in retirees: + self.database.pop(r) + + # Create entries for any new un-matched docker services. + unassigned_services = set(docker_services.keys()) \ + .difference(assigned_services) + for srv in unassigned_services: + instance = hm_utils.ManagedInstance.init( + management='docker', + instance_id=srv, + agent_class=NONAGENT_DOCKER, + full_name=(f'[docker]:{srv}')) + instance.update({ + 'agent_script': srv, + 'operable': True, + }) + service_data = docker_services[srv] + instance['prot'] = hm_utils.DockerContainerHelper(service_data) + + self.database[srv] = instance + # If it's up, leave it up. + if service_data['running']: + instance['target_state'] = 'up' + instance['next_action'] = 'up' + + returnValue(docker_services) @inlineCallbacks def _reload_config(self, session): - """ - Notes: - First, the site config file is parsed and used to update the - internal database of child instances. Any previously - unknown child Agent is added to the internal tracking - database, and assigned a target_state of "down". Any - previously known child Agent instance is not modified in the - tracking database (unless a specific request is given, - through ``requests``). If any child Agent instances in the - internal database appear to have been removed from the site - config, then they are set to have target_state "down" and - will be deleted from the database when that state is - reached. - """ - # Parse the site config and compose files. 
- agent_dict, warnings = yield self._get_local_instances() - for w in warnings: - session.add_message(w) - yield self._update_docker_services() + """This helper function is called by both the ``manager`` + Process at startup, and the ``update`` Task. - # Get agent class list from modern plugin system. - agent_plugins = agent_cli.build_agent_list() + The Site Config File is parsed and used to update the internal + database of child instances. Any previously unknown child + Agent is added to the internal tracking database, and assigned + whatever target state is specified for that instance. Any + previously known child Agent instance is not modified. + If any child Agent instances in the internal database appear + to have been removed from the SCF, then they are set to have + target_state "down" and will be deleted from the database when + that state is reached. + + """ def retire(db_key): instance = self.database.get(db_key, None) if instance is None: @@ -144,141 +229,115 @@ def retire(db_key): instance['at'] = time.time() instance['target_state'] = 'down' - # First identify items that we were managing that have - # disappeared from the configs. + def _full_name(cls, iid): + if cls != NONAGENT_DOCKER: + cls, _ = _clsname_tool(cls) + return f'{cls}:{iid}' + + def same_base_class(a, b): + return _clsname_tool(a)[0] == _clsname_tool(b)[0] + + # Parse the site config. + parse_ok, agent_dict, warnings = yield self._get_local_instances() + for w in warnings: + session.add_message(w) + + self.config_parse_status['[SCF]'] = (parse_ok, time.time(), ''.join(warnings)) + if not parse_ok: + return warnings + + # Any agents in the database that are not listed in the latest + # agent_dict should be immediately retired. That includes + # things that are suddenly marked as manage=no. Ignore docker + # non-agents. for iid, instance in self.database.items(): - if (instance['management'] == 'host' - and iid not in agent_dict) or \ - (instance['management'] == 'docker' - and instance['agent_script'] not in self.docker_services): - # Sheesh + if instance['agent_class'] != NONAGENT_DOCKER and ( + iid not in agent_dict or agent_dict[iid].get('manage') == 'ignore'): session.add_message( f'Retiring {instance["full_name"]}, which has disappeared from ' - f'configuration file(s) or have manage:no.') + f'configuration file(s) or has manage:no.') retire(iid) - # We have three kinds of managed things: - # - agents managed on the host system (iid only) - # - agents managed through docker (iid & srv) - # - non-agents managed through docker (srv only) - # - # Make a list of items we need to manage, including all three - # kinds of thing. Store tuples: - # (db_key, instance_id, service_name, agent_class, management) - new_managed = [] - docker_nonagents = list(self.docker_services.keys()) - + # Create / update entries for every agent in the host + # description, unless it is explicitly marked as ignore. for iid, hinst in agent_dict.items(): - srv = self.docker_service_prefix + iid + if hinst['manage'] == 'ignore': + continue + cls = hinst['agent-class'] - mgmt = 'host' - if srv in docker_nonagents: - docker_nonagents.remove(srv) - cls += '[d]' - mgmt = 'docker' - if hinst['manage'] != 'docker': - session.add_message( - f'The agent config for instance-id=' - f'{iid} was matched to docker service ' - f'{srv}, but config does not specify ' - f'manage:docker! 
Dropping both.') - retire(iid) - continue - else: - srv = None - if hinst['manage'] == 'no': - continue - if hinst['manage'] == 'docker': - session.add_message( - f'No docker config found for instance-id=' - f'{iid}, though manage:docker specified ' - f'in config. Dropping.') - retire(iid) - continue - new_managed.append((iid, iid, srv, cls, mgmt)) + srv = None # The expected docker service name, if any - for srv in docker_nonagents: - new_managed.append((srv, srv, srv, '[docker]', 'docker')) + mgmt, start_state = hinst['manage'].split('/') + if mgmt == 'docker': + cls = _clsname_tool(cls, '[d?]') + srv = self.docker_service_prefix + iid + + # See if we already tracking this agent. + instance = self.database.get(iid) - # Compare new managed items to stuff already in our database. - for db_key, iid, srv, cls, mgmt in new_managed: - instance = self.database.get(db_key, None) - if instance is not None and \ - instance['management'] == 'retired': - instance = None if instance is not None: - # So instance is some kind of actively managed container. - if (instance['agent_class'] != cls - or instance['management'] != mgmt): + # Already tracking; just check for major config change. + _cls = instance['agent_class'] + _mgmt = instance['management'] + if not same_base_class(_cls, cls) or _mgmt != mgmt: session.add_message( - f'Managed agent "{db_key}" changed agent_class ' - f'({instance["agent_class"]} -> {cls}) or management ' - f'({instance["management"]} -> {mgmt}) and is being ' - f'reset!') + f'Managed agent "{iid}" changed agent_class ' + f'({_cls} -> {cls}) or management ' + f'({_mgmt} -> {mgmt}) and is being reset!') + # Bring down existing instance + self._terminate_instance(iid) + # Start a new one instance = None + + # Do we have an unmatched docker entry for this? + if instance is None and srv in self.database: + # Re-register it under instance_id + instance = self.database.pop(srv) + self.database[iid] = instance + instance.update({ + 'instance_id': iid, + 'agent_class': _clsname_tool(cls, '[d]'), + 'full_name': _full_name(cls, iid), + }) + if instance is None: instance = hm_utils.ManagedInstance.init( management=mgmt, instance_id=iid, agent_class=cls, - full_name=(f'{cls}:{db_key}'), + full_name=_full_name(cls, iid), ) - if mgmt == 'docker': - instance['agent_script'] = srv - instance['prot'] = self._get_docker_helper(instance) - if instance['prot'].status[0] is None: - session.add_message( - 'On startup, detected active container for %s' % iid) - # Mark current state as up... by the end - # of this function target_state will be up - # or down and that will determine if - # container is left up or stopped. - instance['next_action'] = 'up' - else: - # Check for the agent class in the plugin system; - # then check the (deprecated) agent script registry. 
- if cls in agent_plugins: - session.add_message(f'Found plugin for "{cls}"') - instance['agent_script'] = '__plugin__' - elif cls in site_config.agent_script_reg: - session.add_message(f'Found launcher script for "{cls}"') - instance['agent_script'] = site_config.agent_script_reg[cls] - else: - session.add_message(f'No plugin (nor launcher script) ' - f'found for agent_class "{cls}"!') - session.add_message(f'Tracking {instance["full_name"]}') - self.database[db_key] = instance - yield warnings + instance['target_state'] = start_state + self.database[iid] = instance - @inlineCallbacks - def _check_docker_states(self): - """Scan the docker-compose files, again, and update the database - information ('running' state, most importantly) for all - services. + # Get agent class list from modern plugin system. + agent_plugins = agent_cli.build_agent_list() - It is the policy of this function to ignore things that are - odd rather than deal with them somehow. + # Assign plugins / scripts / whatever to any new instances. + for iid, instance in self.database.items(): + if instance['agent_script'] is not None: + continue + if instance['management'] == 'host': + cls = instance['agent_class'] + # Check for the agent class in the plugin system; + # then check the (deprecated) agent script registry. + if cls in agent_plugins: + session.add_message(f'Found plugin for "{cls}"') + instance['agent_script'] = '__plugin__' + instance['operable'] = True + elif cls in site_config.agent_script_reg: + session.add_message(f'Found launcher script for "{cls}"') + instance['agent_script'] = site_config.agent_script_reg[cls] + instance['operable'] = True + else: + session.add_message(f'No plugin (nor launcher script) ' + f'found for agent_class "{cls}"!') + elif instance['management'] == 'docker': + instance['agent_script'] = self.docker_service_prefix + iid - """ - # Dict of database entries, indexed by docker service name. - docker_managed = {info['agent_script']: info - for info in self.database.values() - if info['management'] == 'docker'} - for compose in self.docker_composes: - services = yield hm_utils.parse_docker_state( - compose, docker_compose_bin=self.docker_compose_bin) - for k, info in services.items(): - db = docker_managed.get(k) - if db is not None: - if db['prot'] is None: - db['prot'] = self._get_docker_helper(db) - db['prot'].update(info) - - def _get_docker_helper(self, instance): - service_name = instance['agent_script'] - return hm_utils.DockerContainerHelper( - self.docker_services[service_name], - docker_compose_bin=self.docker_compose_bin) + # Read the compose files; query container states; updater stuff. + yield self._update_docker_services(session) + returnValue(warnings) def _launch_instance(self, instance): """Launch an Agent instance (whether 'host' or 'docker' managed) using @@ -300,7 +359,7 @@ def _launch_instance(self, instance): """ if instance['management'] == 'docker': - prot = self._get_docker_helper(instance) + prot = instance['prot'] else: iid = instance['instance_id'] pyth = sys.executable @@ -333,49 +392,13 @@ def _terminate_instance(self, key): return True, 'Kill requested.' def _process_target_states(self, session, requests=[]): - """Update the child Agent target states. The manager Process will - then try to maintain those states. This function is used both - for first-time init of the manager Process, but also for - setting new target states while the manager Process is - running. - - Arguments: - session: The operation session object (for logging). 
- requests (list): Default is []. Each entry must be a tuple - of the form (instance_id, target_state). The instance_id - must be a string that matches an item in the current - database, or be the string 'all', which will match all - items in the current database. The target_state must be - 'up' or 'down'. - reload_config (bool): Default is True. If True, the site - config file and docker-compose files are reparsed in order - to (re-)populate the database of child Agent instances. - - Examples: - - :: - - _process_target_states(session, requests=[('thermo1', 'down')]) - _process_target_states(session, requests=[('all', 'up')]) - - Notes: - State update requests in the ``requests`` list are processed - in order. For example, if the requests were [('all', 'up'), - ('data1', 'down')]. This would result in setting all known - children to have target_state "up", except for "data1" which - would be given target state of "down". + """This is a helper function for parsing target_state change + requests; see the update Task. """ # Special requests will target specific instance_id; make a map for that. - addressable = {} - for k, v in self.database.items(): - if v['management'] == 'retired': - continue - if k in addressable: - session.add_message('Internal state problem; multiple agents ' - 'with instance_id=%s' % k[1]) - continue - addressable[k] = v + addressable = {k: v for k, v in self.database.items() + if v['management'] != 'retired'} for key, state in requests: if state not in VALID_TARGETS: @@ -388,6 +411,8 @@ def _process_target_states(self, session, requests=[]): else: if key in addressable: addressable[key]['target_state'] = state + else: + session.add_message(f'Ignoring invalid target, {key}') @ocs_agent.param('requests', default=[]) @ocs_agent.param('reload_config', default=True, type=bool) @@ -400,19 +425,26 @@ def manager(self, session, params): from a client, the Process will launch or terminate child Agents. + Args: + + requests (list): List of agent instance target state + requests; e.g. [('instance1', 'down')]. See description + in :meth:`update` Task. + reload_config (bool): When starting up, discard any cached + database of tracked agents and rescan the Site Config + File. This is mostly for debugging. + Notes: If an Agent process exits unexpectedly, it will be relaunched within a few seconds. - Prior to starting the management loop, this function - (re-)parses the site config and docker compose files (unless - ``reload_config`` is False). It passes ``requests`` to the - ``_update_target_states`` function; please see that - docstring for formatting. + When this Process is started (or restarted), the list of + tracked agents and their status is completely reset, and the + Site Config File is read in. Once this process is running, the target states for managed - Agents can be manipulated through the ``update`` task. + Agents can be manipulated through the :meth:`update` task. Note that when a stop is requested on this Process, all managed Agents will be moved to the "down" state and an @@ -437,7 +469,7 @@ def manager(self, session, params): {'next_action': 'up', 'target_state': 'up', 'stability': 1.0, - 'agent_class': 'FakeDataAgent', + 'agent_class': 'FakeDataAgent[d]', 'instance_id': 'faker6'}, ], } @@ -445,25 +477,34 @@ def manager(self, session, params): If you are looking for the "current state", it's called "next_action" here. + The agent_class may include a suffix [d] or [d?], indicating + that the agent is configured to run within a docker + container. 
(The question mark indicates that the + HostManager cannot actually identify the docker-compose + service associated with the agent description in the SCF.) + """ + self.config_parse_status = {} + session.data = { + 'child_states': [], + 'config_parse_status': self.config_parse_status, + } + self.running = True session.set_status('running') if params['reload_config']: + self.database = {} yield self._reload_config(session) self._process_target_states(session, params['requests']) - session.data = { - 'child_states': [], - } - next_docker_update = time.time() any_jobs = False while self.running or any_jobs: if time.time() >= next_docker_update: - yield self._check_docker_states() + yield self._update_docker_services(session) next_docker_update = time.time() + 2 sleep_times = [1.] @@ -482,8 +523,7 @@ def manager(self, session, params): if actions['terminate']: self._terminate_instance(key) if actions['launch']: - reactor.callFromThread( - self._launch_instance, db) + reactor.callFromThread(self._launch_instance, db) if actions['sleep']: sleep_times.append(actions['sleep']) any_jobs = (any_jobs or (db['next_action'] != 'down')) @@ -493,8 +533,9 @@ def manager(self, session, params): db['fail_times']) # Clean up retired items. - self.database = {k: v for k, v in self.database.items() - if v['management'] != 'retired' or v['next_action'] != 'down'} + self.database = { + k: v for k, v in self.database.items() + if v['management'] != 'retired' or v['next_action'] not in ['down', '?']} # Update session info. child_states = [] @@ -525,14 +566,49 @@ def _stop_manager(self, session, params): def update(self, session, params): """update(requests=[], reload_config=False) - **Task** - Update the manager process' child Agent parameters. + **Task** - Update the target state for any subset of the + managed agent instances. Optionally, trigger a full reload of + the Site Config File first. - This Task will fail if the manager Process is not running. + Args: + requests (list): Default is []. Each entry must be a tuple + of the form ``(instance_id, target_state)``. The + ``instance_id`` must be a string that matches an item in + the current list of tracked agent instances, or be the + string 'all', which will match all items being tracked. + The ``target_state`` must be 'up' or 'down'. + reload_config (bool): Default is False. If True, the site + config file and docker-compose files are reparsed in order + to (re-)populate the database of child Agent instances. + + Examples: + :: - If ``reload_config`` is True, the management agent - configuration will be reloaded by ``_reload_config``. Then - the ``requests`` are passed to ``_process_target_states``. - See those docstrings for more info. + update(requests=[('thermo1', 'down')]) + update(requests=[('all', 'up')]) + update(reload_config=True) + + + Notes: + Starting and stopping agent instances is handled by the + :meth:`manager` Process; if that Process is not running then + no action is taken by this Task and it will exit with an + error. + + The entries in the ``requests`` list are processed in order. + For example, if the requests were [('all', 'up'), ('data1', + 'down')]. This would result in setting all known children + to have target_state "up", except for "data1" which would be + given target state of "down". + + If ``reload_config`` is True, the Site Config File will be + reloaded (as described in :meth:`_reload_config`) before + any of the requests are processed. 
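            Since the reload happens before the requests are applied, a
            single call can pick up a newly added SCF entry and start it;
            a minimal sketch (the instance-id ``new-agent-1`` is
            hypothetical)::

                # 'new-agent-1' is a hypothetical, freshly added SCF entry.
                update(requests=[('new-agent-1', 'up')], reload_config=True)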
+ + Managed docker-compose.yaml files are reparsed, continously, + by the manager process -- no specific action is taken with + those in this Task. Note that adding/changing the list of + docker-compose.yaml files requires restarting the agent. """ if not self.running: @@ -563,13 +639,24 @@ def die(self, session, params): return True, 'This HostManager should terminate in about 1 second.' +def _clsname_tool(name, new_suffix=None): + try: + i = name.index('[') + except ValueError: + i = len(name) + base, suffix = name[:i], name[i:] + if new_suffix is None: + return base, suffix + return base + new_suffix + + def make_parser(parser=None): if parser is None: parser = argparse.ArgumentParser() pgroup = parser.add_argument_group('Agent Options') pgroup.add_argument('--initial-state', default=None, choices=['up', 'down'], - help="Sets the target state for managed agents, " + help="Force a single target state for all agents, " "on start-up.") pgroup.add_argument('--docker-compose', default=None, help="Comma-separated list of docker-compose files " diff --git a/ocs/agents/host_manager/drivers.py b/ocs/agents/host_manager/drivers.py index bc9d0ad1..613cfd79 100644 --- a/ocs/agents/host_manager/drivers.py +++ b/ocs/agents/host_manager/drivers.py @@ -14,16 +14,20 @@ class ManagedInstance(dict): Properties that must be set explicitly by user: - 'management' (str): Either 'host', 'docker', or 'retired'. - - 'agent_class' (str): The agent class. This will have special - value 'docker' if the instance corresponds to a docker-compose - service that has not been matched to a site_config entry. + - 'agent_class' (str): The agent class name, which may include a + suffix ([d] or [d?]) if the agent is managed through Docker. + For instances corresponding to docker services that do not have + a corresponding SCF entry, the value here will be '[docker]'. - 'instance_id' (str): The agent instance-id, or the docker service name if the instance is an unmatched docker-compose service. - - 'full_name' (str): instance_id:agent_class + - 'full_name' (str): agent_class:instance_id Properties that are given a default value by init function: + - 'operable' (bool): indicates whether the instance can be + manipulated (whether calls to up/down should be expected to + work). - 'agent_script' (str): Path to the launcher script (if host system managed). If docker-managed, this is the service name. - 'prot': The twisted ProcessProtocol object (if host system @@ -46,6 +50,7 @@ def init(cls, **kwargs): # Note some core things are not included. self = cls({ 'agent_script': None, + 'operable': False, 'prot': None, 'next_action': 'down', 'target_state': 'down', @@ -85,11 +90,17 @@ def resolve_child_state(db): # State machine. prot = db['prot'] + # If the entry is not "operable", send next_action to '?' and + # don't try to do anything else. + + if not db['operable']: + db['next_action'] = '?' + # The uninterruptible transition state(s) are most easily handled # in the same way regardless of target state. # Transitional: wait_start, which bridges from start -> up. - if db['next_action'] == 'wait_start': + elif db['next_action'] == 'wait_start': if prot is not None: messages.append('Launched {full_name}'.format(**db)) db['next_action'] = 'up' @@ -126,13 +137,15 @@ def resolve_child_state(db): elif db['next_action'] == 'start': messages.append( 'Requested launch for {full_name}'.format(**db)) - db['prot'] = None actions['launch'] = True db['next_action'] = 'wait_start' now = time.time() db['at'] = now + 1. 
elif db['next_action'] == 'up': - stat, t = prot.status + if prot is None: + stat, t = 0, None + else: + stat, t = prot.status if stat is not None: messages.append('Detected exit of {full_name} ' 'with code {stat}.'.format(stat=stat, **db)) @@ -281,12 +294,10 @@ def _run_docker_compose(args, docker_compose_bin=None): class DockerContainerHelper: - """Class for managing the docker container associated with some service. Provides some of the same interface as - AgentProcessHelper in HostManager agent. Pass in a service - description dict (such as the ones returned by - parse_docker_state). + AgentProcessHelper. Pass in a service description dict (such as + the ones returned by parse_docker_state). """ @@ -309,6 +320,7 @@ def update(self, service): self.status = None, time.time() else: self.status = service['exit_code'], time.time() + self.killed = False def up(self): self.d = _run_docker_compose( @@ -375,33 +387,50 @@ def parse_docker_state(docker_compose_file, docker_compose_bin=None): # Run docker inspect. for cont_id in cont_ids: - out, err, code = yield utils.getProcessOutputAndValue( - 'docker', ['inspect', cont_id], env=os.environ) - if code != 0 and 'No such object' in err.decode('utf8'): - # This is likely due to a race condition where some - # container was brought down since we ran docker-compose. - # Just drop the entry. - print(f'(no such object: {cont_id}') + try: + info = yield _inspectContainer(cont_id, docker_compose_file) + except RuntimeError as e: + print(f'Warning, failed to inspect container {cont_id}; {e}.') + continue + if info is None: continue - elif code != 0: - raise RuntimeError( - f'Trouble running "docker inspect %s".\n' - f'stdout: {out}\n stderr {err}') - # Reconcile config against docker-compose ... - info = yaml.safe_load(out)[0] - config = info['Config']['Labels'] - _dc_file = os.path.join(config['com.docker.compose.project.working_dir'], - config['com.docker.compose.project.config_files']) - if not os.path.samefile(docker_compose_file, _dc_file): - raise RuntimeError("Consistency problem: container started from " - "some other compose file?\n%s\n%s" % (docker_compose_file, _dc_file)) - service = config['com.docker.compose.service'] + + service = info.pop('service') if service not in summary: raise RuntimeError("Consistency problem: image does not self-report " "as a listed service? (%s)" % (service)) - summary[service].update({ - 'running': info['State']['Running'], - 'exit_code': info['State'].get('ExitCode', 127), - 'container_found': True, - }) + summary[service].update(info) + return summary + + +@inlineCallbacks +def _inspectContainer(cont_id, docker_compose_file): + """Run docker inspect on cont_id, return dict with the results.""" + out, err, code = yield utils.getProcessOutputAndValue( + 'docker', ['inspect', cont_id], env=os.environ) + if code != 0 and 'No such object' in err.decode('utf8'): + # This is likely due to a race condition where some + # container was brought down since we ran docker-compose. + # Return None to indicate this -- caller should just ignore for now. + print(f'(_inspectContainer: warning, no such object: {cont_id}') + return None + elif code != 0: + raise RuntimeError( + f'Trouble running "docker inspect {cont_id}".\n' + f'stdout: {out}\n stderr {err}') + # Reconcile config against docker-compose ... 
+ info = yaml.safe_load(out)[0] + config = info['Config']['Labels'] + _dc_file = os.path.join(config['com.docker.compose.project.working_dir'], + config['com.docker.compose.project.config_files']) + if not os.path.samefile(docker_compose_file, _dc_file): + raise RuntimeError("Consistency problem: container started from " + "some other compose file?\n%s\n%s" % (docker_compose_file, _dc_file)) + service = config['com.docker.compose.service'] + return { + 'service': service, + 'running': info['State']['Running'], + 'exit_code': info['State'].get('ExitCode', 127), + 'container_found': True, + } diff --git a/ocs/ocsbow.py b/ocs/ocsbow.py index e48192b6..702a5a32 100644 --- a/ocs/ocsbow.py +++ b/ocs/ocsbow.py @@ -53,6 +53,8 @@ def get_parser(): listed in the local copy of the site config).""") p.add_argument('--host', '-H', default=None, action='append', help='Limit hosts that are displayed.') + p.add_argument('--reload-config', '-r', default=None, action='store_true', + help='Request each HM to reprocess the site config file.') # common args for up and down target_parser = argparse.ArgumentParser(add_help=False) @@ -184,7 +186,7 @@ def crossbar_test(args, site_config): return ok, msg -def get_status(args, site_config, restrict_hosts=None): +def get_status(args, site_config, restrict_hosts=None, reload_config=False): """Assemble a detailed description of the site configuration, that goes somewhat beyond what's in the site config by querying each HostManager it finds to identify docker-based or other secret @@ -203,37 +205,93 @@ def get_status(args, site_config, restrict_hosts=None): 'warnings': [], } - # Loop over hosts ... + all_instance_ids = [] + warn_reloads = False + warn_consistency = False + + # Loop over hosts found in the SCF ... for host_name, host_data in site.hosts.items(): if restrict_hosts is not None and host_name not in restrict_hosts: continue hms = [] agent_info = {} - blank_state = {'current': '?', - 'target': '?'} + sort_order = ['hm', 'host', 'docker', 'ignore', 'other'] + + # Loop over agent instances described in the SCF for this host. + # This should result in agent_info entries with keys: + # - instance-id (already present) + # - agent-class (already present) + # - manage (may be present) + # - sort-token + # - agent-class-note (includes modifiers such as [d]) + # - agent-class-hm (agent-class-note from HM placeholder) + # - current (current state place-holder for HM) + # - target (target state place-holder for HM) + for idx, inst in enumerate(host_data.instances): inst = inst.copy() - if inst.get('manage') is None: - inst['manage'] = 'yes' - inst.update(blank_state) + + # Fill in defaults ... + try: + inst['manage'] = ocs.site_config.InstanceConfig \ + ._MANAGE_MAP[inst.get('manage')] + except KeyError: + # Not a known 'manage' setting. + pass + + inst.update({ + 'agent-class-note': inst['agent-class'], + 'agent-class-hm': '?', + 'current': '?', + 'target': '?', + 'sort-token': sort_order[-1], + }) + + # Set sort-token and record HMs. if inst['agent-class'] == HOSTMANAGER_CLASS: - sort_order = 0 + inst['manage'] = 'hm' hms.append(HostManagerManager( args, site_config, instance_id=inst['instance-id'])) - else: - sort_order = ['x', 'yes', 'no', 'docker'].index(inst['manage']) + _st = inst['manage'].split('/')[0] # hm, host, docker, ignore + if _st in sort_order: + inst['sort-token'] = _st + + # Modify agent-class name to get table-ready version; this + # should be the same string HM status returns, later. 
+ if inst['sort-token'] == 'docker': + inst['agent-class-note'] += '[d]' + elif inst['sort-token'] == 'hm': + inst['agent-class-hm'] = inst['agent-class-note'] + elif inst['sort-token'] in ['ignore', 'other']: + # Mark as "unmanaged"; hotwire -hm value to match. + inst['agent-class-note'] += '[unman]' + inst['agent-class-hm'] = inst['agent-class-note'] + iid = inst['instance-id'] - if iid in agent_info: + if iid in all_instance_ids: output['warnings'].append( f'***WARNING -- site config contains multiple entries ' f'with instance-id={iid}; ignoring all but first.') continue - agent_info[iid] = (sort_order, idx, iid, inst) + agent_info[iid] = inst + all_instance_ids.append(iid) + + # Are we supposed to refresh_config these? + if reload_config and len(hms): + print('Requesting config reloads ...') + for hm in hms: + if hm.reload_config(): + print(f' Ok on {hm.instance_id}.') + else: + print(f' !! Failed on {hm.instance_id}.') + warn_reloads = True + print() - order = [v[2] for v in sorted(agent_info.values())] - agent_info = {k: agent_info[k][3] for k in order} + # Now loop through HostManagers and see what they know for hm in hms: info = hm.status() + + # Stow some info about the HostMan itself... cinfo = { 'target': 'n/a', } @@ -247,44 +305,98 @@ def get_status(args, site_config, restrict_hosts=None): cinfo['current'] = '?' agent_info[hm.instance_id].update(cinfo) + # Enrich agent_info using HostManager records. This + # updates (perhaps): + # - next_action + # - target_state + # - class_name_display + # + # Generally one of three things can happen: + # + # 1. The HM instance-id is matched to the SCF, and the + # agent_class / execution method (docker etc) agree. + # 2. The HM instance-id is matched to SCF, but with + # different class/method. + # 3. The HM reports an instance-id that is not known to + # the SCF (or at least not on this host). + # + # In the latter two cases, the agent-class-show will + # display the two discrepant values, between /, with a '?' + # for a missing thing. + found = [] for cinfo in info['child_states']: this_id = cinfo['instance_id'] - # Watch for [d] suffix, and steal it. - if cinfo['agent_class'].endswith('[d]'): - agent_info[this_id]['agent-class'] = cinfo['agent_class'] if this_id in found: output['warnings'].append( - f'***WARNING -- HostManager reports multiple states ' - f'for instance-id={this_id}; ignoring all but first.') + f'***WARNING -- HostManager {hm.instance_id}reports ' + f'multiple states for instance-id={this_id}; ' + 'ignoring all but first.') continue found.append(this_id) - if this_id not in agent_info: - # Secret agent! - agent_info[this_id] = { + + if this_id in agent_info: + inst = agent_info[this_id] + inst['agent-class-hm'] = cinfo['agent_class'] + else: + # Create a record for this unknown instance. + _m = 'host' + if '[d' in cinfo['agent_class']: + _m = 'docker' + inst = { 'instance-id': this_id, - 'agent-class': '[docker]', - 'manage': 'yes', + 'agent-class': '?', + 'agent-class-note': '?', + 'agent-class-hm': cinfo['agent_class'], + 'manage': _m, + 'sort-token': _m, } - agent_info[this_id].update(blank_state) + agent_info[this_id] = inst + + inst['target'] = cinfo['target_state'] if cinfo['next_action'] != 'down' and \ cinfo['stability'] <= 0.5: - cinfo['next_action'] = 'unstable' - agent_info[this_id].update({ - 'current': cinfo['next_action'], - 'target': cinfo['target_state'] - }) + inst['current'] = 'unstable' + else: + inst['current'] = cinfo['next_action'] + + # Populate the agent-class-show. 
+ for k, v in agent_info.items(): + a, b = v['agent-class-note'], v['agent-class-hm'] + v['agent-class-show'] = a + if a != b: + v['agent-class-show'] = '%s/%s' % (a, b) + warn_consistency = True + + # Sort the agent_info dict. + sorted_keys = sorted([(sort_order.index(v['sort-token']), k) + for k, v in agent_info.items()]) + agent_info = {k: agent_info[k] for _, k in sorted_keys} + output['hosts'].append({ 'host_name': host_name, 'hostmanager_count': len(hms), 'agent_info': agent_info}) + + if warn_reloads: + output['warnings'].append( + '***WARNING -- failed to request reload_config on some HostManagers.') + + if warn_consistency: + output['warnings'].append( + '***WARNING -- where agent-class value contains a slash (/) it ' + 'indicates an inconsistency between the local Site Config ' + '(pre-slash) and HostManager reported (post-slash) Agent ' + 'Class values.') + return output def print_status(args, site_config): site, host, instance = site_config - status = get_status(args, site_config, restrict_hosts=args.host) + status = get_status(args, site_config, restrict_hosts=args.host, + reload_config=args.reload_config) print('ocs status') print('----------') @@ -298,17 +410,17 @@ def print_status(args, site_config): for hstat in status['hosts']: header = {'instance-id': '[instance-id]', - 'agent-class': '[agent-class]', + 'agent-class-show': '[agent-class]', 'current': '[state]', 'target': '[target]'} - field_widths = {'instance-id': 30, - 'agent-class': 20} + field_widths = {'instance-id': 20, + 'agent-class-show': 30} if len(hstat['agent_info']): field_widths = {k: max(v0, max([len(v[k]) for v in hstat['agent_info'].values()])) for k, v0 in field_widths.items()} - fmt = ' {instance-id:%i} {agent-class:%i} {current:>10} {target:>10}' % ( - field_widths['instance-id'], field_widths['agent-class']) + fmt = ' {instance-id:%i} {agent-class-show:%i} {current:>10} {target:>10}' % ( + field_widths['instance-id'], field_widths['agent-class-show']) header = fmt.format(**header) print('-' * len(header)) print(f'Host: {hstat["host_name"]}\n') @@ -321,6 +433,7 @@ def print_status(args, site_config): print('Important Notes:') for w in status['warnings']: print(' ' + w) + print() def print_config(args, site_config): @@ -644,6 +757,12 @@ def start(self, check=True, timeout=5., up=None, foreground=False): return False, 'Agent did not register within %.1f seconds.' % timeout return True, 'Agent launched.' + def reload_config(self): + # Request a config reload. Returns True if the request + # succeeded. + err, msg, session = self.client.update(reload_config=True) + return (err == ocs.OK) and session['success'] + def agent_control(self, request, targets): try: # In all cases, make sure process is running. 
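Usage sketch: the new ``--reload-config`` / ``-r`` flag (used by ``ocsbow
status``) calls the ``reload_config`` helper above for each HostManager it
finds.  The same request can be made against a single HostManager through
its ``update`` Task; a minimal sketch, assuming the generic
``ocs.ocs_client.OCSClient`` control client and a HostManager registered
as ``hm-host-1``::

    from ocs.ocs_client import OCSClient

    # 'hm-host-1' is a hypothetical instance-id; adjust to your SCF.
    hm = OCSClient('hm-host-1')

    # Ask the HostManager to re-read the SCF without changing any
    # target states -- equivalent to what `ocsbow status --reload-config`
    # requests for each HostManager it finds.
    hm.update(reload_config=True)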
@@ -673,6 +792,15 @@ def agent_control(self, request, targets): err, msg, session = \ self.client.manager.start(**params) else: + known_children = [cs.get('instance_id') + for cs in session['data']['child_states']] + targets_not_found = [t for t in targets if t not in known_children] + if len(targets_not_found): + print('Warning, some targets not known to HostManager: ') + print(f' {targets_not_found}') + print('Requesting reload_config.') + params['reload_config'] = True + err, msg, session = \ self.client.update.start(**params) @@ -827,6 +955,7 @@ def main(args=None): if args.command is None: args.command = 'status' args.host = None + args.reload_config = None if args.command == 'config': print_config(args, site_config) @@ -850,7 +979,7 @@ def main(args=None): for inst in others: if inst['instance-id'] not in args.instance: continue - if inst['manage'] not in ['yes', 'docker']: + if inst['sort-token'] not in ['host', 'docker']: raise OcsbowError( "Cannot perform action on '%s', as it is not " "configured as a managed Agent." % inst['instance-id']) @@ -871,6 +1000,9 @@ def client(hm): clients[iid] = HostManagerManager(args, site_config, iid) return clients[iid] + if len(hms) == 0 and len(agents) == 0: + print(f'No targets found matching "{args.instance}"') + for hm in hms: print(f' {args.command} hostmanager {hm["instance-id"]} all') if args.dry_run: diff --git a/ocs/site_config.py b/ocs/site_config.py index f25fa375..ac39af16 100644 --- a/ocs/site_config.py +++ b/ocs/site_config.py @@ -47,9 +47,11 @@ def from_dict(cls, data): """ self = cls() - for k, v in data.get('hosts', {}).items(): - assert (k not in self.hosts) # duplicate host name in config file! - self.hosts[k] = HostConfig.from_dict(v, parent=self, name=k) + hosts = data.get('hosts') + if hosts: + for k, v in hosts.items(): + assert (k not in self.hosts) # duplicate host name in config file! + self.hosts[k] = HostConfig.from_dict(v, parent=self, name=k) self.hub = HubConfig.from_dict(data['hub'], parent=self) return self @@ -201,6 +203,29 @@ def summary(self): class InstanceConfig: + + _MANAGE_MAP = { + # Fundamental states + 'host/up': 'host/up', + 'host/down': 'host/down', + 'docker/up': 'docker/up', + 'docker/down': 'docker/down', + 'ignore': 'ignore', + + # Aliases for deprecated yes / no. + 'yes': 'host/up', + 'no': 'ignore', + + # Non-deprecated aliases. + 'docker': 'docker/up', + 'host': 'host/up', + 'up': 'host/up', + 'down': 'host/down', + + # Default. + None: 'host/up', + } + def __init__(self): self.arguments = [] @@ -234,9 +259,43 @@ def from_dict(cls, data, parent=None): to ['--key1', 'value', '--key2', 'value']. ``manage`` (str, optional): - A string to help HostManager decide how to manage this - agent. Value should be one of ["yes", "no", "docker"] - (default is "yes"). + A string describing how a HostManager should manage this + agent. See notes. + + Notes: + + The ``manage`` value is only relevant if a HostManager is + configured to operate on the host. In that case, the + HostManager's treatment of the agent instance depends on + the value of ``manage``: + + - "ignore": HostManager will not attempt to manage the + agent instance. + - "host/up": HostManager will manage the agent instance, + launching it on the host system. On startup, the + instance will be set to target_state "up" (i.e. the + HostManager will try to start it). + - "host/down": like host/up, but HostManager will not + start up the agent instance until explicitly requested + to do. 
+ - "docker/up": HostManager will manage the agent instance + through Docker. On Startup, the instance will be set to + target_state "up". + - "docker/down": Like docker/up, but the instance will be + forced to target_state "down" on startup. + + In earlier versions of OCS, the acceptable values were + "yes", "no", and "docker". Those were equivalent to + current values of "host/down", "ignore", and "docker/down". + + Those values are still accepted, but note that "yes" and + "docker" are now equivalent to "host/up" and "docker/up". + + The following abbreviated values are also accepted: + + - "host": same as "host/up" + - "up": same as "host/up" + - "down": same as "host/down" """ self = cls() @@ -244,8 +303,7 @@ def from_dict(cls, data, parent=None): self.data = data self.arguments = self.data.get('arguments', []) self.manage = self.data.get('manage') - if self.manage is None: - self.manage = "yes" + self.manage = self._MANAGE_MAP.get(self.manage, self.manage) return self diff --git a/tests/default.yaml b/tests/default.yaml index 449ac234..7666fce9 100644 --- a/tests/default.yaml +++ b/tests/default.yaml @@ -19,15 +19,18 @@ hosts: 'arguments': []}, {'agent-class': 'FakeDataAgent', 'instance-id': 'fake-data-local', + 'manage': 'host/down', 'arguments': [['--mode', 'idle'], ['--num-channels', '16'], ['--sample-rate', '5'], ['--frame-length', '10']]}, {'agent-class': 'RegistryAgent', 'instance-id': 'registry', + 'manage': 'host/up', 'arguments': []}, {'agent-class': 'AggregatorAgent', 'instance-id': 'aggregator-local', + 'manage': 'ignore', 'arguments': [['--initial-state', 'idle'], ['--time-per-file', '30'], ['--data-dir', '/tmp/data/']]}, diff --git a/tests/integration/test_host_manager_agent_integration.py b/tests/integration/test_host_manager_agent_integration.py index 983181e2..1cdc45bc 100644 --- a/tests/integration/test_host_manager_agent_integration.py +++ b/tests/integration/test_host_manager_agent_integration.py @@ -38,13 +38,35 @@ def find_child(resp, instance_id): return v raise ValueError - target = 'fake-data-local' + # Check whether managed agents are getting to their requested + # initial state. The expectations here are matched to manage: + # settings in default.yaml. We check the "up" ones first, because + # once we've waited for those to come up we've probably waited + # long enough to ensure the 'down' ones aren't going to also come + # up unexpectedly. - state = find_child(resp, target) - assert (state['target_state'] == 'down') - assert (state['next_action'] == 'down') + timeout = time.time() + 10 + for target, is_managed, init_state in [ + ('registry', True, 'up'), + ('influxagent-local', True, 'up'), + ('fake-data-local', True, 'down'), + ('aggregator-local', False, None), + ]: + print(f'Waiting for {target} ...') + if is_managed: + while time.time() < timeout: + resp = client.manager.status() + state = find_child(resp, target) + if state['next_action'] == init_state: + break + time.sleep(.5) + assert state['target_state'] == init_state + else: + with pytest.raises(ValueError): + state = find_child(resp, target) # Start it up + target = 'fake-data-local' resp = client.update(requests=[(target, 'up')]) print(resp)
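For reference, a Site Config File host block written against the new
``manage`` values might look like the sketch below (the instance-ids reuse
the examples from ``docs/user/centralized_management.rst``; the accepted
strings are exactly those listed in ``InstanceConfig._MANAGE_MAP``)::

    'host-1': {
      'agent-instances': [
        # The HostManager itself needs no 'manage' setting; the agent now
        # forces its own entry to 'ignore'.
        {'agent-class': 'HostManager',
         'instance-id': 'hm-host-1'},

        # Launched on the host system, brought up automatically.
        {'agent-class': 'Lakeshore372Agent',
         'instance-id': 'LSA22BB',
         'manage': 'host/up',
         'arguments': [['--serial-number', 'LSA22BB'],
                       ['--mode', 'acq']]},

        # Managed through docker; the expected service name would be
        # ocs-LSARR00 in one of the managed docker-compose files.
        {'agent-class': 'Lakeshore372Agent',
         'instance-id': 'LSARR00',
         'manage': 'docker/up',
         'arguments': [['--serial-number', 'LSARR00']]},

        # Excluded from HostManager tracking and control.
        {'agent-class': 'RegistryAgent',
         'instance-id': 'registry',
         'manage': 'ignore'},
      ]
    }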