From 6a17e5b65bf9421c2e650d23817c79ce81248c6f Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 3 Sep 2021 16:37:37 -0400 Subject: [PATCH] Allow manually running a health check, and make other adjustments to the health check trigger (#11002) * Full finalize the planned work for health checks of execution nodes * Implementation of instance health_check endpoint * Also do version conditional to node_type * Do not use receptor mesh to check main cluster nodes health * Fix bugs from testing health check of cluster nodes, add doc * Add a few fields to health check serializer missed before * Light refactoring of error field processing * Fix errors clearing error, write more unit tests * Update health check info in docs * Bump migration of health check after rebase * Mark string for translation * Add related health_check link for system auditors too * Handle health_check cluster node timeout, add errors for peer judgement --- Makefile | 2 +- awx/api/permissions.py | 13 +++-- awx/api/serializers.py | 12 +++++ .../templates/api/instance_health_check.md | 33 ++++++++++++ awx/api/urls/instance.py | 3 +- awx/api/views/__init__.py | 51 +++++++++++++++++++ awx/conf/views.py | 4 +- .../migrations/0155_improved_health_check.py | 25 +++++++++ awx/main/models/ha.py | 40 ++++++++++----- awx/main/tasks.py | 50 ++++++++++++++---- .../tests/functional/api/test_instance.py | 37 ++++++++++++-- awx/main/tests/functional/test_instances.py | 17 +++++++ awx/main/utils/receptor.py | 17 ++++--- awx/settings/defaults.py | 1 + docs/receptor_mesh.md | 33 +++++++----- 15 files changed, 285 insertions(+), 53 deletions(-) create mode 100644 awx/api/templates/api/instance_health_check.md create mode 100644 awx/main/migrations/0155_improved_health_check.py diff --git a/Makefile b/Makefile index 596037817b..1a9aa1a45a 100644 --- a/Makefile +++ b/Makefile @@ -175,7 +175,7 @@ init: fi; \ $(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST) --node_type=$(MAIN_NODE_TYPE); \ 
$(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100;\ - $(MANAGEMENT_COMMAND) register_queue --queuename=default; + $(MANAGEMENT_COMMAND) register_queue --queuename=default --instance_percent=100; if [ ! -f /etc/receptor/certs/awx.key ]; then \ rm -f /etc/receptor/certs/*; \ receptor --cert-init commonname="AWX Test CA" bits=2048 outcert=/etc/receptor/certs/ca.crt outkey=/etc/receptor/certs/ca.key; \ diff --git a/awx/api/permissions.py b/awx/api/permissions.py index 6613ca5858..a951928626 100644 --- a/awx/api/permissions.py +++ b/awx/api/permissions.py @@ -25,7 +25,7 @@ __all__ = [ 'ProjectUpdatePermission', 'InventoryInventorySourcesUpdatePermission', 'UserPermission', - 'IsSuperUser', + 'IsSystemAdminOrAuditor', 'InstanceGroupTowerPermission', 'WorkflowApprovalPermission', ] @@ -236,13 +236,18 @@ class UserPermission(ModelAccessPermission): raise PermissionDenied() -class IsSuperUser(permissions.BasePermission): +class IsSystemAdminOrAuditor(permissions.BasePermission): """ - Allows access only to admin users. + Allows write access only to system admin users. + Allows read access only to system auditor users. 
""" def has_permission(self, request, view): - return request.user and request.user.is_superuser + if not request.user: + return False + if request.method == 'GET': + return request.user.is_superuser or request.user.is_system_auditor + return request.user.is_superuser class InstanceGroupTowerPermission(ModelAccessPermission): diff --git a/awx/api/serializers.py b/awx/api/serializers.py index be4d5581e2..c3e2a427f9 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -4786,6 +4786,9 @@ class InstanceSerializer(BaseSerializer): "hostname", "created", "modified", + "last_seen", + "last_health_check", + "errors", 'capacity_adjustment', "version", "capacity", @@ -4806,6 +4809,8 @@ class InstanceSerializer(BaseSerializer): res = super(InstanceSerializer, self).get_related(obj) res['jobs'] = self.reverse('api:instance_unified_jobs_list', kwargs={'pk': obj.pk}) res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk}) + if self.context['request'].user.is_superuser or self.context['request'].user.is_system_auditor: + res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk}) return res def get_consumed_capacity(self, obj): @@ -4818,6 +4823,13 @@ class InstanceSerializer(BaseSerializer): return float("{0:.2f}".format(((float(obj.capacity) - float(obj.consumed_capacity)) / (float(obj.capacity))) * 100)) +class InstanceHealthCheckSerializer(BaseSerializer): + class Meta: + model = Instance + read_only_fields = ('uuid', 'hostname', 'version', 'last_health_check', 'errors', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity', 'capacity') + fields = read_only_fields + + class InstanceGroupSerializer(BaseSerializer): show_capabilities = ['edit', 'delete'] diff --git a/awx/api/templates/api/instance_health_check.md b/awx/api/templates/api/instance_health_check.md new file mode 100644 index 0000000000..16e799c253 --- /dev/null +++ b/awx/api/templates/api/instance_health_check.md @@ -0,0 +1,33 @@ +{% 
ifmeth GET %} +# Health Check Data + +Health checks are used to obtain important data about an instance. +Instance fields affected by the health check are shown in this view. +Fundamentally, health checks require running code on the machine in question. + + - For instances with `node_type` of "control" or "hybrid", health checks are +performed as part of a periodic task that runs in the background. + - For instances with `node_type` of "execution", health checks are done by submitting +a work unit through the receptor mesh. + +If run through the receptor mesh, the invoked command is: + +``` +ansible-runner worker --worker-info +``` + +For execution nodes, these checks are _not_ performed on a regular basis. +Health checks against functional nodes will be run when the node is first discovered. +Health checks against nodes with errors will be repeated at a reduced frequency. + +{% endifmeth %} + +{% ifmeth POST %} +# Manually Initiate a Health Check +For purposes of error remediation or debugging, a health check can be +manually initiated by making a POST request to this endpoint. + +This will submit the work unit to the target node through the receptor mesh and wait for it to finish. +The model will be updated with the result. +Up-to-date values of the fields will be returned in the response data.
+{% endifmeth %} diff --git a/awx/api/urls/instance.py b/awx/api/urls/instance.py index abff37c5d9..dd75db2b21 100644 --- a/awx/api/urls/instance.py +++ b/awx/api/urls/instance.py @@ -3,7 +3,7 @@ from django.conf.urls import url -from awx.api.views import InstanceList, InstanceDetail, InstanceUnifiedJobsList, InstanceInstanceGroupsList +from awx.api.views import InstanceList, InstanceDetail, InstanceUnifiedJobsList, InstanceInstanceGroupsList, InstanceHealthCheck urls = [ @@ -11,6 +11,7 @@ urls = [ url(r'^(?P[0-9]+)/$', InstanceDetail.as_view(), name='instance_detail'), url(r'^(?P[0-9]+)/jobs/$', InstanceUnifiedJobsList.as_view(), name='instance_unified_jobs_list'), url(r'^(?P[0-9]+)/instance_groups/$', InstanceInstanceGroupsList.as_view(), name='instance_instance_groups_list'), + url(r'^(?P[0-9]+)/health_check/$', InstanceHealthCheck.as_view(), name='instance_health_check'), ] __all__ = ['urls'] diff --git a/awx/api/views/__init__.py b/awx/api/views/__init__.py index 42354c1500..e9077f96f7 100644 --- a/awx/api/views/__init__.py +++ b/awx/api/views/__init__.py @@ -108,6 +108,7 @@ from awx.api.permissions import ( InstanceGroupTowerPermission, VariableDataPermission, WorkflowApprovalPermission, + IsSystemAdminOrAuditor, ) from awx.api import renderers from awx.api import serializers @@ -408,6 +409,56 @@ class InstanceInstanceGroupsList(InstanceGroupMembershipMixin, SubListCreateAtta return None +class InstanceHealthCheck(GenericAPIView): + + name = _('Instance Health Check') + model = models.Instance + serializer_class = serializers.InstanceHealthCheckSerializer + permission_classes = (IsSystemAdminOrAuditor,) + + def get(self, request, *args, **kwargs): + obj = self.get_object() + data = self.get_serializer(data=request.data).to_representation(obj) + return Response(data, status=status.HTTP_200_OK) + + def post(self, request, *args, **kwargs): + obj = self.get_object() + + if obj.node_type == 'execution': + from awx.main.tasks import execution_node_health_check + + 
runner_data = execution_node_health_check(obj.hostname) + obj.refresh_from_db() + data = self.get_serializer(data=request.data).to_representation(obj) + # Add in some extra unsaved fields + for extra_field in ('transmit_timing', 'run_timing'): + if extra_field in runner_data: + data[extra_field] = runner_data[extra_field] + else: + from awx.main.tasks import cluster_node_health_check + + if settings.CLUSTER_HOST_ID == obj.hostname: + cluster_node_health_check(obj.hostname) + else: + cluster_node_health_check.apply_async([obj.hostname], queue=obj.hostname) + start_time = time.time() + prior_check_time = obj.last_health_check + while time.time() - start_time < 50.0: + obj.refresh_from_db(fields=['last_health_check']) + if obj.last_health_check != prior_check_time: + break + if time.time() - start_time < 1.0: + time.sleep(0.1) + else: + time.sleep(1.0) + else: + obj.mark_offline(errors=_('Health check initiated by user determined this instance to be unresponsive')) + obj.refresh_from_db() + data = self.get_serializer(data=request.data).to_representation(obj) + + return Response(data, status=status.HTTP_200_OK) + + class InstanceGroupList(ListCreateAPIView): name = _("Instance Groups") diff --git a/awx/conf/views.py b/awx/conf/views.py index e98b28cc49..f0ff1607b7 100644 --- a/awx/conf/views.py +++ b/awx/conf/views.py @@ -23,7 +23,7 @@ from rest_framework import status # AWX from awx.api.generics import APIView, GenericAPIView, ListAPIView, RetrieveUpdateDestroyAPIView -from awx.api.permissions import IsSuperUser +from awx.api.permissions import IsSystemAdminOrAuditor from awx.api.versioning import reverse from awx.main.utils import camelcase_to_underscore from awx.main.tasks import handle_setting_changes @@ -150,7 +150,7 @@ class SettingLoggingTest(GenericAPIView): name = _('Logging Connectivity Test') model = Setting serializer_class = SettingSingletonSerializer - permission_classes = (IsSuperUser,) + permission_classes = (IsSystemAdminOrAuditor,) filter_backends = 
[] def post(self, request, *args, **kwargs): diff --git a/awx/main/migrations/0155_improved_health_check.py b/awx/main/migrations/0155_improved_health_check.py new file mode 100644 index 0000000000..a82fa152b5 --- /dev/null +++ b/awx/main/migrations/0155_improved_health_check.py @@ -0,0 +1,25 @@ +# Generated by Django 2.2.20 on 2021-08-31 17:41 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0154_set_default_uuid'), + ] + + operations = [ + migrations.AddField( + model_name='instance', + name='errors', + field=models.TextField(blank=True, default='', editable=False, help_text='Any error details from the last health check.'), + ), + migrations.AddField( + model_name='instance', + name='last_health_check', + field=models.DateTimeField( + editable=False, help_text='Last time a health check was ran on this instance to refresh cpu, memory, and capacity.', null=True + ), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 6ad9ec2c4a..0b75e4a964 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -82,11 +82,22 @@ class Instance(HasPolicyEditsMixin, BaseModel): editable=False, help_text=_('Total system memory of this instance in bytes.'), ) + errors = models.TextField( + default='', + blank=True, + editable=False, + help_text=_('Any error details from the last health check.'), + ) last_seen = models.DateTimeField( null=True, editable=False, help_text=_('Last time instance ran its heartbeat task for main cluster nodes. 
Last known connection to receptor mesh for execution nodes.'), ) + last_health_check = models.DateTimeField( + null=True, + editable=False, + help_text=_('Last time a health check was ran on this instance to refresh cpu, memory, and capacity.'), + ) # Capacity management capacity = models.PositiveIntegerField( default=100, @@ -152,15 +163,16 @@ class Instance(HasPolicyEditsMixin, BaseModel): grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD return self.last_seen < ref_time - timedelta(seconds=grace_period) - def mark_offline(self, update_last_seen=False, perform_save=True): - if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and (not update_last_seen): + def mark_offline(self, update_last_seen=False, perform_save=True, errors=''): + if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and self.errors == errors and (not update_last_seen): return self.cpu_capacity = self.mem_capacity = self.capacity = 0 + self.errors = errors if update_last_seen: self.last_seen = now() if perform_save: - update_fields = ['capacity', 'cpu_capacity', 'mem_capacity'] + update_fields = ['capacity', 'cpu_capacity', 'mem_capacity', 'errors'] if update_last_seen: update_fields += ['last_seen'] self.save(update_fields=update_fields) @@ -180,11 +192,12 @@ class Instance(HasPolicyEditsMixin, BaseModel): self.mem_capacity = get_mem_effective_capacity(self.memory) self.set_capacity_value() - def save_health_data(self, version, cpu, memory, uuid=None, last_seen=None, has_error=False): - update_fields = [] + def save_health_data(self, version, cpu, memory, uuid=None, update_last_seen=False, errors=''): + self.last_health_check = now() + update_fields = ['last_health_check'] - if last_seen is not None and self.last_seen != last_seen: - self.last_seen = last_seen + if update_last_seen: + self.last_seen = self.last_health_check update_fields.append('last_seen') if uuid is not None and self.uuid != uuid: @@ -207,25 +220,26 @@ class 
Instance(HasPolicyEditsMixin, BaseModel): self.memory = new_memory update_fields.append('memory') - if not has_error: + if not errors: self.refresh_capacity_fields() + self.errors = '' else: - self.mark_offline(perform_save=False) - update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity']) + self.mark_offline(perform_save=False, errors=errors) + update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity', 'errors']) self.save(update_fields=update_fields) def local_health_check(self): """Only call this method on the instance that this record represents""" - has_error = False + errors = None try: # if redis is down for some reason, that means we can't persist # playbook event data; we should consider this a zero capacity event redis.Redis.from_url(settings.BROKER_URL).ping() except redis.ConnectionError: - has_error = True + errors = _('Failed to connect ot Redis') - self.save_health_data(awx_application_version, get_cpu_count(), get_mem_in_bytes(), last_seen=now(), has_error=has_error) + self.save_health_data(awx_application_version, get_cpu_count(), get_mem_in_bytes(), update_last_seen=True, errors=errors) class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 0f12c4af12..858e721082 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -177,7 +177,7 @@ def dispatch_startup(): def inform_cluster_of_shutdown(): try: this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID) - this_inst.mark_offline(update_last_seen=True) # No thank you to new jobs while shut down + this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal')) try: reaper.reap(this_inst) except Exception: @@ -408,14 +408,37 @@ def cleanup_execution_environment_images(): logger.debug(f"Failed to delete image {image_name}") +@task(queue=get_local_queuename) +def cluster_node_health_check(node): + ''' + Used for the health check endpoint, refreshes the status of the 
instance, but must be ran on target node + ''' + if node == '': + logger.warn('Local health check incorrectly called with blank string') + return + elif node != settings.CLUSTER_HOST_ID: + logger.warn(f'Local health check for {node} incorrectly sent to {settings.CLUSTER_HOST_ID}') + return + try: + this_inst = Instance.objects.me() + except Instance.DoesNotExist: + logger.warn(f'Instance record for {node} missing, could not check capacity.') + return + this_inst.local_health_check() + + @task(queue=get_local_queuename) def execution_node_health_check(node): + if node == '': + logger.warn('Remote health check incorrectly called with blank string') + return try: instance = Instance.objects.get(hostname=node) except Instance.DoesNotExist: logger.warn(f'Instance record for {node} missing, could not check capacity.') return - data = worker_info(node) + + data = worker_info(node, work_type='ansible-runner' if instance.node_type == 'execution' else 'local') prior_capacity = instance.capacity @@ -424,7 +447,7 @@ def execution_node_health_check(node): cpu=data.get('cpu_count', 0), memory=data.get('mem_in_bytes', 0), uuid=data.get('uuid'), - has_error=bool(data.get('errors')), + errors='\n'.join(data.get('errors', [])), ) if data['errors']: @@ -436,6 +459,8 @@ def execution_node_health_check(node): else: logger.info('Set capacity of execution node {} to {}, worker info data:\n{}'.format(node, instance.capacity, json.dumps(data, indent=2))) + return data + def inspect_execution_nodes(instance_list): with advisory_lock('inspect_execution_nodes_lock', wait=False): @@ -488,10 +513,12 @@ def inspect_execution_nodes(instance_list): logger.warn(f'Execution node attempting to rejoin as instance {hostname}.') execution_node_health_check.apply_async([hostname]) elif instance.capacity == 0: - # Periodically re-run the health check of errored nodes, in case someone fixed it - # TODO: perhaps decrease the frequency of these checks - logger.debug(f'Restarting health check for execution 
node {hostname} with known errors.') - execution_node_health_check.apply_async([hostname]) + # nodes with proven connection but need remediation run health checks are reduced frequency + if not instance.last_health_check or (nowtime - instance.last_health_check).total_seconds() >= settings.EXECUTION_NODE_REMEDIATION_CHECKS: + # Periodically re-run the health check of errored nodes, in case someone fixed it + # TODO: perhaps decrease the frequency of these checks + logger.debug(f'Restarting health check for execution node {hostname} with known errors.') + execution_node_health_check.apply_async([hostname]) @task(queue=get_local_queuename) @@ -556,7 +583,7 @@ def cluster_node_heartbeat(): # If auto deprovisining is on, don't bother setting the capacity to 0 # since we will delete the node anyway. if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES: - other_inst.mark_offline() + other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive')) logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen)) elif settings.AWX_AUTO_DEPROVISION_INSTANCES: deprovision_hostname = other_inst.hostname @@ -3028,12 +3055,17 @@ class AWXReceptorJob: # We establish a connection to the Receptor socket receptor_ctl = get_receptor_ctl() + res = None try: - return self._run_internal(receptor_ctl) + res = self._run_internal(receptor_ctl) + return res finally: # Make sure to always release the work unit if we established it if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK: receptor_ctl.simple_command(f"work release {self.unit_id}") + # If an error occured without the job itself failing, it could be a broken instance + if self.work_type == 'ansible-runner' and res is None or getattr(res, 'rc', None) is None: + execution_node_health_check(self.task.instance.execution_node) def _run_internal(self, receptor_ctl): # Create a socketpair. 
Where the left side will be used for writing our payload diff --git a/awx/main/tests/functional/api/test_instance.py b/awx/main/tests/functional/api/test_instance.py index 88f0586fd9..b94b860b01 100644 --- a/awx/main/tests/functional/api/test_instance.py +++ b/awx/main/tests/functional/api/test_instance.py @@ -1,13 +1,22 @@ import pytest -from awx.api.versioning import reverse +from unittest import mock +from awx.api.versioning import reverse from awx.main.models.ha import Instance +import redis + +# Django +from django.test.utils import override_settings + + +INSTANCE_KWARGS = dict(hostname='example-host', cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42) + @pytest.mark.django_db def test_disabled_zeros_capacity(patch, admin_user): - instance = Instance.objects.create(hostname='example-host', cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42) + instance = Instance.objects.create(**INSTANCE_KWARGS) url = reverse('api:instance_detail', kwargs={'pk': instance.pk}) @@ -20,7 +29,7 @@ def test_disabled_zeros_capacity(patch, admin_user): @pytest.mark.django_db def test_enabled_sets_capacity(patch, admin_user): - instance = Instance.objects.create(hostname='example-host', enabled=False, cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42, capacity=0) + instance = Instance.objects.create(enabled=False, capacity=0, **INSTANCE_KWARGS) assert instance.capacity == 0 url = reverse('api:instance_detail', kwargs={'pk': instance.pk}) @@ -30,3 +39,25 @@ def test_enabled_sets_capacity(patch, admin_user): instance.refresh_from_db() assert instance.capacity > 0 + + +@pytest.mark.django_db +def test_auditor_user_health_check(get, post, system_auditor): + instance = Instance.objects.create(**INSTANCE_KWARGS) + url = reverse('api:instance_health_check', kwargs={'pk': instance.pk}) + r = get(url=url, user=system_auditor, expect=200) + assert r.data['cpu_capacity'] == instance.cpu_capacity + post(url=url, user=system_auditor, expect=403) + + 
+@pytest.mark.django_db +@mock.patch.object(redis.client.Redis, 'ping', lambda self: True) +def test_health_check_usage(get, post, admin_user): + instance = Instance.objects.create(**INSTANCE_KWARGS) + url = reverse('api:instance_health_check', kwargs={'pk': instance.pk}) + r = get(url=url, user=admin_user, expect=200) + assert r.data['cpu_capacity'] == instance.cpu_capacity + assert r.data['last_health_check'] is None + with override_settings(CLUSTER_HOST_ID=instance.hostname): # force direct call of cluster_node_health_check + r = post(url=url, user=admin_user, expect=200) + assert r.data['last_health_check'] is not None diff --git a/awx/main/tests/functional/test_instances.py b/awx/main/tests/functional/test_instances.py index efb9904d2d..bac6259694 100644 --- a/awx/main/tests/functional/test_instances.py +++ b/awx/main/tests/functional/test_instances.py @@ -324,6 +324,23 @@ def test_instance_group_capacity(instance_factory, instance_group_factory): assert ig_single.capacity == 100 +@pytest.mark.django_db +def test_health_check_clears_errors(): + instance = Instance.objects.create(hostname='foo-1', enabled=True, capacity=0, errors='something went wrong') + data = dict(version='ansible-runner-4.2', cpu=782, memory=int(39e9), uuid='asdfasdfasdfasdfasdf', errors='') + instance.save_health_data(**data) + for k, v in data.items(): + assert getattr(instance, k) == v + + +@pytest.mark.django_db +def test_health_check_oh_no(): + instance = Instance.objects.create(hostname='foo-2', enabled=True, capacity=52, cpu=8, memory=int(40e9)) + instance.save_health_data('', 0, 0, errors='This it not a real instance!') + assert instance.capacity == instance.cpu_capacity == 0 + assert instance.errors == 'This it not a real instance!' 
+ + @pytest.mark.django_db class TestInstanceGroupOrdering: def test_ad_hoc_instance_groups(self, instance_group_factory, inventory, default_instance_group): diff --git a/awx/main/utils/receptor.py b/awx/main/utils/receptor.py index cec32756f9..9781d92720 100644 --- a/awx/main/utils/receptor.py +++ b/awx/main/utils/receptor.py @@ -28,13 +28,16 @@ def get_receptor_ctl(): return ReceptorControl(receptor_sockfile) -def worker_info(node_name): +def worker_info(node_name, work_type='ansible-runner'): receptor_ctl = get_receptor_ctl() transmit_start = time.time() error_list = [] data = {'errors': error_list, 'transmit_timing': 0.0} - result = receptor_ctl.submit_work(worktype='ansible-runner', payload='', params={"params": f"--worker-info"}, ttl='20s', node=node_name) + kwargs = {} + if work_type != 'local': + kwargs['ttl'] = '20s' + result = receptor_ctl.submit_work(worktype=work_type, payload='', params={"params": f"--worker-info"}, node=node_name, **kwargs) unit_id = result['unitid'] run_start = time.time() @@ -90,9 +93,11 @@ def worker_info(node_name): error_list.extend(remote_data.pop('errors', [])) # merge both error lists data.update(remote_data) - # see tasks.py usage of keys - missing_keys = set(('runner_version', 'mem_in_bytes', 'cpu_count')) - set(data.keys()) - if missing_keys: - data['errors'].append('Worker failed to return keys {}'.format(' '.join(missing_keys))) + # If we have a connection error, missing keys would be trivial consequence of that + if not data['errors']: + # see tasks.py usage of keys + missing_keys = set(('runner_version', 'mem_in_bytes', 'cpu_count')) - set(data.keys()) + if missing_keys: + data['errors'].append('Worker failed to return keys {}'.format(' '.join(missing_keys))) return data diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index b4b5876863..7acccbea5e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -422,6 +422,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 
'localhost:9013-9199') # heartbeat period can factor into some forms of logic, so it is maintained as a setting here CLUSTER_NODE_HEARTBEAT_PERIOD = 60 RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60 # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34 +EXECUTION_NODE_REMEDIATION_CHECKS = 60 * 10 # once every 10 minutes check if an execution node errors have been resolved BROKER_URL = 'unix:///var/run/redis/redis.sock' CELERYBEAT_SCHEDULE = { diff --git a/docs/receptor_mesh.md b/docs/receptor_mesh.md index 6a3a816d77..38c53f120c 100644 --- a/docs/receptor_mesh.md +++ b/docs/receptor_mesh.md @@ -61,10 +61,9 @@ Here is a listing of work types that you may encounter: - `kubernetes-runtime-auth` - user-space jobs ran in a container group - `kubernetes-incluster-auth` - project updates and management jobs on OpenShift Container Platform -### Auto-discovery of execution nodes +### Auto-discovery of Execution Nodes -Instances in control plane must be registered by the installer via `awx-manage` -commands like `awx-manage register_queue` or `awx-manage register_instance`. +Instances in control plane must be registered by the installer via `awx-manage register_queue` or `awx-manage register_instance`. Execution-only nodes are automatically discovered after they have been configured and join the receptor mesh. Control nodes should see them as a "Known Node". @@ -72,32 +71,38 @@ Control nodes should see them as a "Known Node". Control nodes check the receptor network (reported via `receptorctl status`) when their heartbeat task runs. Nodes on the receptor network are compared against the `Instance` model in the database. -If a node appears in the mesh network which is not in the database, then a "health check" is started. -Fields like `cpu`, `memory`, and `version` will obtain a non-default value through this process. 
+If a node appears in the receptor mesh which is not in the database, +then a database entry is created and added to the "default" instance group. In order to run jobs on execution nodes, either the installer needs to pre-register the node, or user needs to make a PATCH request to `/api/v2/instances/N/` to change the `enabled` field to true. -Execution nodes should automatically be placed in the default instance group. #### Health Check Mechanics -All relevant data for health checks is reported from the ansible-runner command: +Fields like `cpu`, `memory`, and `version` will obtain a non-default value from the health check. +If the instance has problems that would prevent jobs from running, `capacity` will be set to zero, +and details will be shown in the instance's `errors` field. + +For execution nodes, relevant data for health checks is reported from the ansible-runner command: ``` ansible-runner worker --worker-info ``` This will output YAML data to standard out containing CPU, memory, and other metrics used to compute `capacity`. - AWX invokes this command by submitting a receptor work unit (of type `ansible-runner`) to the target execution node. -If you have the development environment running, you can run a one-off health check of a node with this command: -``` -echo "from awx.main.utils.receptor import worker_info; worker_info('receptor-1')" | awx-manage shell_plus --quiet -``` +##### Health Check Triggers -This must be ran as the awx user inside one of the hybrid or control nodes. -This will not affect actual `Instance` record, but will just run the command and report the data. +Health checks for execution nodes have several triggers that can cause it to run. 
+ - When an execution node is auto-discovered, a health check is started + - For execution nodes with errors, health checks are re-run once about every 10 minutes for auto-remediation + - If a job had an error _not from the Ansible subprocess_ then a health check is started to check for instance errors + - System administrators can manually trigger a health check by making a POST request to `/api/v2/instances/N/health_check/`. + +Healthy execution nodes will _not_ have health checks run on a regular basis. + +Control and hybrid nodes run health checks via a periodic task (bypassing ansible-runner). ### Development Environment