diff --git a/awx/main/analytics/analytics_tasks.py b/awx/main/analytics/analytics_tasks.py index 6aa08ab9a4..481db3e57e 100644 --- a/awx/main/analytics/analytics_tasks.py +++ b/awx/main/analytics/analytics_tasks.py @@ -3,13 +3,13 @@ import logging # AWX from awx.main.analytics.subsystem_metrics import DispatcherMetrics, CallbackReceiverMetrics -from awx.main.dispatch.publish import task +from awx.main.dispatch.publish import task as task_awx from awx.main.dispatch import get_task_queuename logger = logging.getLogger('awx.main.scheduler') -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def send_subsystem_metrics(): DispatcherMetrics().send_metrics() CallbackReceiverMetrics().send_metrics() diff --git a/awx/main/apps.py b/awx/main/apps.py index c1c90f83c7..ed1e4a5abb 100644 --- a/awx/main/apps.py +++ b/awx/main/apps.py @@ -1,5 +1,7 @@ import os +from dispatcherd.config import setup as dispatcher_setup + from django.apps import AppConfig from django.utils.translation import gettext_lazy as _ from awx.main.utils.common import bypass_in_test, load_all_entry_points_for @@ -79,6 +81,10 @@ class MainConfig(AppConfig): def ready(self): super().ready() + from awx.main.dispatch.config import get_dispatcherd_config + + dispatcher_setup(get_dispatcherd_config()) + """ Credential loading triggers database operations. There are cases we want to call awx-manage collectstatic without a database. All management commands invoke the ready() code diff --git a/awx/main/constants.py b/awx/main/constants.py index 59f23f7c53..3b45153721 100644 --- a/awx/main/constants.py +++ b/awx/main/constants.py @@ -77,6 +77,8 @@ LOGGER_BLOCKLIST = ( 'awx.main.utils.log', # loggers that may be called getting logging settings 'awx.conf', + # dispatcherd should only use 1 database connection + 'dispatcherd', ) # Reported version for node seen in receptor mesh but for which capacity check diff --git a/awx/main/dispatch/config.py b/awx/main/dispatch/config.py new file mode 100644 index 0000000000..189653d37b --- /dev/null +++ b/awx/main/dispatch/config.py @@ -0,0 +1,50 @@ +from django.conf import settings + +from ansible_base.lib.utils.db import get_pg_notify_params +from awx.main.dispatch import get_task_queuename +from awx.main.dispatch.pool import get_auto_max_workers + + +def get_dispatcherd_config(for_service: bool = False) -> dict: + """Return a dictionary config for dispatcherd + + Parameters: + for_service: if True, include dynamic options needed for running the dispatcher service + this will require database access, you should delay evaluation until after app setup + """ + config = { + "version": 2, + "service": { + "pool_kwargs": { + "min_workers": settings.JOB_EVENT_WORKERS, + "max_workers": get_auto_max_workers(), + }, + "main_kwargs": {"node_id": settings.CLUSTER_HOST_ID}, + "process_manager_cls": "ForkServerManager", + "process_manager_kwargs": {"preload_modules": ['awx.main.dispatch.hazmat']}, + }, + "brokers": { + "pg_notify": { + "config": get_pg_notify_params(), + "sync_connection_factory": "ansible_base.lib.utils.db.psycopg_connection_from_django", + "default_publish_channel": settings.CLUSTER_HOST_ID, # used for debugging commands + }, + "socket": {"socket_path": settings.DISPATCHERD_DEBUGGING_SOCKFILE}, + }, + "publish": { + "default_control_broker": "socket", + "default_broker": "pg_notify", + }, + "worker": {"worker_cls": "awx.main.dispatch.worker.dispatcherd.AWXTaskWorker"}, + } + + if for_service: + config["producers"] = { + "ScheduledProducer": {"task_schedule": 
settings.DISPATCHER_SCHEDULE}, + "OnStartProducer": {"task_list": {"awx.main.tasks.system.dispatch_startup": {}}}, + "ControlProducer": {}, + } + + config["brokers"]["pg_notify"]["channels"] = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()] + + return config diff --git a/awx/main/dispatch/hazmat.py b/awx/main/dispatch/hazmat.py new file mode 100644 index 0000000000..a6bd2a8f38 --- /dev/null +++ b/awx/main/dispatch/hazmat.py @@ -0,0 +1,36 @@ +import django + +# dispatcherd publisher logic is likely to be used, but needs manual preload +from dispatcherd.brokers import pg_notify # noqa + +# Cache may not be initialized until we are in the worker, so preload here +from channels_redis import core # noqa + +from awx import prepare_env + +from dispatcherd.utils import resolve_callable + + +prepare_env() + +django.setup() # noqa + + +from django.conf import settings + + +# Preload all periodic tasks so their imports will be in shared memory +for name, options in settings.CELERYBEAT_SCHEDULE.items(): + resolve_callable(options['task']) + + +# Preload in-line import from tasks +from awx.main.scheduler.kubernetes import PodManager # noqa + + +from django.core.cache import cache as django_cache +from django.db import connection + + +connection.close() +django_cache.close() diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py index df39e06de3..ebd8c35fea 100644 --- a/awx/main/dispatch/publish.py +++ b/awx/main/dispatch/publish.py @@ -4,6 +4,9 @@ import json import time from uuid import uuid4 +from dispatcherd.publish import submit_task +from dispatcherd.utils import resolve_callable + from django_guid import get_guid from django.conf import settings @@ -93,6 +96,19 @@ class task: @classmethod def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw): + try: + from flags.state import flag_enabled + + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + # At this point we have the import string, and submit_task wants the method, so back to that + actual_task = resolve_callable(cls.name) + return submit_task(actual_task, args=args, kwargs=kwargs, queue=queue, uuid=uuid, **kw) + except Exception: + logger.exception(f"[DISPATCHER] Failed to check for alternative dispatcherd implementation for {cls.name}") + # Continue with original implementation if anything fails + pass + + # Original implementation follows queue = queue or getattr(cls.queue, 'im_func', cls.queue) if not queue: msg = f'{cls.name}: Queue value required and may not be None' diff --git a/awx/main/dispatch/worker/dispatcherd.py b/awx/main/dispatch/worker/dispatcherd.py new file mode 100644 index 0000000000..72cfadf799 --- /dev/null +++ b/awx/main/dispatch/worker/dispatcherd.py @@ -0,0 +1,14 @@ +from dispatcherd.worker.task import TaskWorker + +from django.db import connection + + +class AWXTaskWorker(TaskWorker): + + def on_start(self) -> None: + """Get worker connected so that first task it gets will be worked quickly""" + connection.ensure_connection() + + def pre_task(self, message) -> None: + """This should remedy bad connections that can not fix themselves""" + connection.close_if_unusable_or_obsolete() diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index f1eab29b1f..55f056851c 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -2,13 +2,21 @@ # All Rights Reserved. 
import logging import yaml +import os import redis from django.conf import settings from django.core.management.base import BaseCommand, CommandError +from flags.state import flag_enabled + +from dispatcherd.factories import get_control_from_settings +from dispatcherd import run_service +from dispatcherd.config import setup as dispatcher_setup + from awx.main.dispatch import get_task_queuename +from awx.main.dispatch.config import get_dispatcherd_config from awx.main.dispatch.control import Control from awx.main.dispatch.pool import AutoscalePool from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker @@ -40,18 +48,44 @@ class Command(BaseCommand): ), ) + def verify_dispatcherd_socket(self): + if not os.path.exists(settings.DISPATCHERD_DEBUGGING_SOCKFILE): + raise CommandError('Dispatcher is not running locally') + def handle(self, *arg, **options): if options.get('status'): - print(Control('dispatcher').status()) - return + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + ctl = get_control_from_settings() + running_data = ctl.control_with_reply('status') + if len(running_data) != 1: + raise CommandError('Did not receive expected number of replies') + print(yaml.dump(running_data[0], default_flow_style=False)) + return + else: + print(Control('dispatcher').status()) + return if options.get('schedule'): - print(Control('dispatcher').schedule()) + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + print('NOT YET IMPLEMENTED') + return + else: + print(Control('dispatcher').schedule()) return if options.get('running'): - print(Control('dispatcher').running()) - return + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + ctl = get_control_from_settings() + running_data = ctl.control_with_reply('running') + print(yaml.dump(running_data, default_flow_style=False)) + return + else: + print(Control('dispatcher').running()) + return if options.get('reload'): - return Control('dispatcher').control({'control': 'reload'}) + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + print('NOT YET IMPLEMENTED') + return + else: + return Control('dispatcher').control({'control': 'reload'}) if options.get('cancel'): cancel_str = options.get('cancel') try: @@ -60,21 +94,36 @@ class Command(BaseCommand): cancel_data = [cancel_str] if not isinstance(cancel_data, list): cancel_data = [cancel_str] - print(Control('dispatcher').cancel(cancel_data)) - return - consumer = None + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + ctl = get_control_from_settings() + results = [] + for task_id in cancel_data: + # For each task UUID, send an individual cancel command + result = ctl.control_with_reply('cancel', data={'uuid': task_id}) + results.append(result) + print(yaml.dump(results, default_flow_style=False)) + return + else: + print(Control('dispatcher').cancel(cancel_data)) + return - try: - DispatcherMetricsServer().start() - except redis.exceptions.ConnectionError as exc: - raise CommandError(f'Dispatcher could not connect to redis, error: {exc}') + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + dispatcher_setup(get_dispatcherd_config(for_service=True)) + run_service() + else: + consumer = None - try: - queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()] - consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE) - consumer.run() - except KeyboardInterrupt: - logger.debug('Terminating Task Dispatcher') - if consumer: - consumer.stop() + try: + DispatcherMetricsServer().start() + except 
redis.exceptions.ConnectionError as exc: + raise CommandError(f'Dispatcher could not connect to redis, error: {exc}') + + try: + queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()] + consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE) + consumer.run() + except KeyboardInterrupt: + logger.debug('Terminating Task Dispatcher') + if consumer: + consumer.stop() diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 70501be306..aed3204a6e 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -24,6 +24,7 @@ from django.utils.translation import gettext_lazy as _ from django.utils.timezone import now from django.utils.encoding import smart_str from django.contrib.contenttypes.models import ContentType +from flags.state import flag_enabled # REST Framework from rest_framework.exceptions import ParseError @@ -1369,7 +1370,30 @@ class UnifiedJob( traceback=self.result_traceback, ) - def pre_start(self, **kwargs): + def get_start_kwargs(self): + needed = self.get_passwords_needed_to_start() + + decrypted_start_args = decrypt_field(self, 'start_args') + + if not decrypted_start_args or decrypted_start_args == '{}': + return None + + try: + start_args = json.loads(decrypted_start_args) + except Exception: + logger.exception(f'Unexpected malformed start_args on unified_job={self.id}') + return None + + opts = dict([(field, start_args.get(field, '')) for field in needed]) + + if not all(opts.values()): + missing_fields = ', '.join([k for k, v in opts.items() if not v]) + self.job_explanation = u'Missing needed fields: %s.' % missing_fields + self.save(update_fields=['job_explanation']) + + return opts + + def pre_start(self): if not self.can_start: self.job_explanation = u'%s is not in a startable state: %s, expecting one of %s' % (self._meta.verbose_name, self.status, str(('new', 'waiting'))) self.save(update_fields=['job_explanation']) @@ -1390,26 +1414,11 @@ class UnifiedJob( self.save(update_fields=['job_explanation']) return (False, None) - needed = self.get_passwords_needed_to_start() - try: - start_args = json.loads(decrypt_field(self, 'start_args')) - except Exception: - start_args = None + opts = self.get_start_kwargs() - if start_args in (None, ''): - start_args = kwargs - - opts = dict([(field, start_args.get(field, '')) for field in needed]) - - if not all(opts.values()): - missing_fields = ', '.join([k for k, v in opts.items() if not v]) - self.job_explanation = u'Missing needed fields: %s.' 
% missing_fields - self.save(update_fields=['job_explanation']) + if opts and (not all(opts.values())): return (False, None) - if 'extra_vars' in kwargs: - self.handle_extra_data(kwargs['extra_vars']) - # remove any job_explanations that may have been set while job was in pending if self.job_explanation != "": self.job_explanation = "" @@ -1470,21 +1479,44 @@ class UnifiedJob( def cancel_dispatcher_process(self): """Returns True if dispatcher running this job acknowledged request and sent SIGTERM""" if not self.celery_task_id: - return + return False + canceled = [] + # Special case for task manager (used during workflow job cancellation) if not connection.get_autocommit(): - # this condition is purpose-written for the task manager, when it cancels jobs in workflows - ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id], with_reply=False) + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + try: + from dispatcherd.factories import get_control_from_settings + + ctl = get_control_from_settings() + ctl.control('cancel', data={'uuid': self.celery_task_id}) + except Exception: + logger.exception("Error sending cancel command to new dispatcher") + else: + try: + ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id], with_reply=False) + except Exception: + logger.exception("Error sending cancel command to legacy dispatcher") return True # task manager itself needs to act under assumption that cancel was received + # Standard case with reply try: - # Use control and reply mechanism to cancel and obtain confirmation timeout = 5 - canceled = ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id]) + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + from dispatcherd.factories import get_control_from_settings + + ctl = get_control_from_settings() + results = ctl.control_with_reply('cancel', data={'uuid': self.celery_task_id}, expected_replies=1, timeout=timeout) + # Check if cancel was successful by checking if we got any results + return bool(results and len(results) > 0) + else: + # Original implementation + canceled = ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id]) except socket.timeout: logger.error(f'could not reach dispatcher on {self.controller_node} within {timeout}s') except Exception: logger.exception("error encountered when checking task status") + return bool(self.celery_task_id in canceled) # True or False, whether confirmation was obtained def cancel(self, job_explanation=None, is_chain=False): diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 0025e493a2..67ed9599f4 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -19,6 +19,9 @@ from django.utils.timezone import now as tz_now from django.conf import settings from django.contrib.contenttypes.models import ContentType +# django-flags +from flags.state import flag_enabled + from ansible_base.lib.utils.models import get_type_for_model # django-ansible-base @@ -48,6 +51,7 @@ from awx.main.signals import disable_activity_stream from awx.main.constants import ACTIVE_STATES from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.scheduler.task_manager_models import TaskManagerModels +from awx.main.tasks.jobs import dispatch_waiting_jobs import awx.main.analytics.subsystem_metrics as s_metrics from awx.main.utils import decrypt_field @@ -431,6 +435,7 @@ class TaskManager(TaskBase): # 5 minutes to start pending jobs. 
If this limit is reached, pending jobs # will no longer be started and will be started on the next task manager cycle. self.time_delta_job_explanation = timedelta(seconds=30) + self.control_nodes_to_notify: set[str] = set() super().__init__(prefix="task_manager") def after_lock_init(self): @@ -519,16 +524,19 @@ class TaskManager(TaskBase): task.save() task.log_lifecycle("waiting") - # apply_async does a NOTIFY to the channel dispatcher is listening to - # postgres will treat this as part of the transaction, which is what we want - if task.status != 'failed' and type(task) is not WorkflowJob: - task_cls = task._get_task_class() - task_cls.apply_async( - [task.pk], - opts, - queue=task.get_queue_name(), - uuid=task.celery_task_id, - ) + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + self.control_nodes_to_notify.add(task.get_queue_name()) + else: + # apply_async does a NOTIFY to the channel dispatcher is listening to + # postgres will treat this as part of the transaction, which is what we want + if task.status != 'failed' and type(task) is not WorkflowJob: + task_cls = task._get_task_class() + task_cls.apply_async( + [task.pk], + opts, + queue=task.get_queue_name(), + uuid=task.celery_task_id, + ) # In exception cases, like a job failing pre-start checks, we send the websocket status message. # For jobs going into waiting, we omit this because of performance issues, as it should go to running quickly @@ -721,3 +729,8 @@ class TaskManager(TaskBase): for workflow_approval in self.get_expired_workflow_approvals(): self.timeout_approval_node(workflow_approval) + + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + for controller_node in self.control_nodes_to_notify: + logger.info(f'Notifying node {controller_node} of new waiting jobs.') + dispatch_waiting_jobs.apply_async(queue=controller_node) diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py index 9cdbea3a9d..b2ea8608f7 100644 --- a/awx/main/scheduler/tasks.py +++ b/awx/main/scheduler/tasks.py @@ -7,7 +7,7 @@ from django.conf import settings # AWX from awx import MODE from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager -from awx.main.dispatch.publish import task +from awx.main.dispatch.publish import task as task_awx from awx.main.dispatch import get_task_queuename logger = logging.getLogger('awx.main.scheduler') @@ -20,16 +20,16 @@ def run_manager(manager, prefix): manager().schedule() -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def task_manager(): run_manager(TaskManager, "task") -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def dependency_manager(): run_manager(DependencyManager, "dependency") -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def workflow_manager(): run_manager(WorkflowManager, "workflow") diff --git a/awx/main/tasks/__init__.py b/awx/main/tasks/__init__.py index 9794dd2b53..f35415863d 100644 --- a/awx/main/tasks/__init__.py +++ b/awx/main/tasks/__init__.py @@ -1 +1 @@ -from . import host_metrics, jobs, receptor, system # noqa +from . 
import callback, facts, helpers, host_indirect, host_metrics, jobs, receptor, system # noqa diff --git a/awx/main/tasks/host_indirect.py b/awx/main/tasks/host_indirect.py index 632a04a687..e57b437c4a 100644 --- a/awx/main/tasks/host_indirect.py +++ b/awx/main/tasks/host_indirect.py @@ -12,7 +12,7 @@ from django.db import transaction # Django flags from flags.state import flag_enabled -from awx.main.dispatch.publish import task +from awx.main.dispatch.publish import task as task_awx from awx.main.dispatch import get_task_queuename from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit from awx.main.models.event_query import EventQuery @@ -159,7 +159,7 @@ def cleanup_old_indirect_host_entries() -> None: IndirectManagedNodeAudit.objects.filter(created__lt=limit).delete() -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None: try: job = Job.objects.get(id=job_id) @@ -201,7 +201,7 @@ def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> Non logger.exception(f'Error processing indirect host data for job_id={job_id}') -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def cleanup_and_save_indirect_host_entries_fallback() -> None: if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"): return diff --git a/awx/main/tasks/host_metrics.py b/awx/main/tasks/host_metrics.py index 5b530355be..c4c776159a 100644 --- a/awx/main/tasks/host_metrics.py +++ b/awx/main/tasks/host_metrics.py @@ -7,7 +7,7 @@ from django.db.models import Count, F from django.db.models.functions import TruncMonth from django.utils.timezone import now from awx.main.dispatch import get_task_queuename -from awx.main.dispatch.publish import task +from awx.main.dispatch.publish import task as task_awx from awx.main.models.inventory import HostMetric, HostMetricSummaryMonthly from awx.main.tasks.helpers import is_run_threshold_reached from awx.conf.license import get_license @@ -18,7 +18,7 @@ from awx.main.utils.db import bulk_update_sorted_by_id logger = logging.getLogger('awx.main.tasks.host_metrics') -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def cleanup_host_metrics(): if is_run_threshold_reached(getattr(settings, 'CLEANUP_HOST_METRICS_LAST_TS', None), getattr(settings, 'CLEANUP_HOST_METRICS_INTERVAL', 30) * 86400): logger.info(f"Executing cleanup_host_metrics, last ran at {getattr(settings, 'CLEANUP_HOST_METRICS_LAST_TS', '---')}") @@ -29,7 +29,7 @@ def cleanup_host_metrics(): logger.info("Finished cleanup_host_metrics") -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def host_metric_summary_monthly(): """Run cleanup host metrics summary monthly task each week""" if is_run_threshold_reached(getattr(settings, 'HOST_METRIC_SUMMARY_TASK_LAST_TS', None), getattr(settings, 'HOST_METRIC_SUMMARY_TASK_INTERVAL', 7) * 86400): diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index 4eb88ae0c0..15a8753b84 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -17,6 +17,7 @@ import urllib.parse as urlparse # Django from django.conf import settings +from django.db import transaction # Shared code for the AWX platform from awx_plugins.interfaces._temporary_private_container_api import CONTAINER_ROOT, get_incontainer_path @@ -28,8 +29,12 @@ import ansible_runner import git from gitdb.exc import BadName as BadGitName +# Dispatcherd +from dispatcherd.publish import task +from dispatcherd.utils import 
serialize_task + # AWX -from awx.main.dispatch.publish import task +from awx.main.dispatch.publish import task as task_awx from awx.main.dispatch import get_task_queuename from awx.main.constants import ( PRIVILEGE_ESCALATION_METHODS, @@ -37,13 +42,13 @@ from awx.main.constants import ( JOB_FOLDER_PREFIX, MAX_ISOLATED_PATH_COLON_DELIMITER, CONTAINER_VOLUMES_MOUNT_TYPES, - ACTIVE_STATES, HOST_FACTS_FIELDS, ) from awx.main.models import ( Instance, Inventory, InventorySource, + UnifiedJob, Job, AdHocCommand, ProjectUpdate, @@ -110,6 +115,15 @@ def with_path_cleanup(f): return _wrapped +@task(on_duplicate='queue_one', bind=True, queue=get_task_queuename) +def dispatch_waiting_jobs(binder): + for uj in UnifiedJob.objects.filter(status='waiting', controller_node=settings.CLUSTER_HOST_ID).only('id', 'status', 'polymorphic_ctype', 'celery_task_id'): + kwargs = uj.get_start_kwargs() + if not kwargs: + kwargs = {} + binder.control('run', data={'task': serialize_task(uj._get_task_class()), 'args': [uj.id], 'kwargs': kwargs, 'uuid': uj.celery_task_id}) + + class BaseTask(object): model = None event_model = None @@ -117,6 +131,7 @@ class BaseTask(object): callback_class = RunnerCallback def __init__(self): + self.instance = None self.cleanup_paths = [] self.update_attempts = int(getattr(settings, 'DISPATCHER_DB_DOWNTOWN_TOLLERANCE', settings.DISPATCHER_DB_DOWNTIME_TOLERANCE) / 5) self.runner_callback = self.callback_class(model=self.model) @@ -451,27 +466,48 @@ class BaseTask(object): def should_use_fact_cache(self): return False + def transition_status(self, pk: int) -> bool: + """Atomically transition status to running, if False returned, another process got it""" + with transaction.atomic(): + # Explanation of parts for the fetch: + # .values - avoid loading a full object, this is known to lead to deadlocks due to signals + # the signals load other related rows which another process may be locking, and happens in practice + # of=('self',) - keeps FK tables out of the lock list, another way deadlocks can happen + # .get - just load the single job + instance_data = UnifiedJob.objects.select_for_update(of=('self',)).values('status', 'cancel_flag').get(pk=pk) + + # If status is not waiting (obtained under lock) then this process does not have clearence to run + if instance_data['status'] == 'waiting': + if instance_data['cancel_flag']: + updated_status = 'canceled' + else: + updated_status = 'running' + # Explanation of the update: + # .filter - again, do not load the full object + # .update - a bulk update on just that one row, avoid loading unintended data + UnifiedJob.objects.filter(pk=pk).update(status=updated_status, start_args='') + elif instance_data['status'] == 'running': + logger.info(f'Job {pk} is being ran by another process, exiting') + return False + return True + @with_path_cleanup @with_signal_handling def run(self, pk, **kwargs): """ Run the job/task and capture its output. """ - self.instance = self.model.objects.get(pk=pk) - if self.instance.status != 'canceled' and self.instance.cancel_flag: - self.instance = self.update_model(self.instance.pk, start_args='', status='canceled') - if self.instance.status not in ACTIVE_STATES: - # Prevent starting the job if it has been reaped or handled by another process. 
- raise RuntimeError(f'Not starting {self.instance.status} task pk={pk} because {self.instance.status} is not a valid active state') + if not self.instance: # Used to skip fetch for local runs + if not self.transition_status(pk): + logger.info(f'Job {pk} is being ran by another process, exiting') + return - if self.instance.execution_environment_id is None: - from awx.main.signals import disable_activity_stream + # Load the instance + self.instance = self.update_model(pk) + if self.instance.status != 'running': + logger.error(f'Not starting {self.instance.status} task pk={pk} because its status "{self.instance.status}" is not expected') + return - with disable_activity_stream(): - self.instance = self.update_model(self.instance.pk, execution_environment=self.instance.resolve_execution_environment()) - - # self.instance because of the update_model pattern and when it's used in callback handlers - self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords self.instance.websocket_emit_status("running") status, rc = 'error', None self.runner_callback.event_ct = 0 @@ -484,6 +520,12 @@ class BaseTask(object): private_data_dir = None try: + if self.instance.execution_environment_id is None: + from awx.main.signals import disable_activity_stream + + with disable_activity_stream(): + self.instance = self.update_model(self.instance.pk, execution_environment=self.instance.resolve_execution_environment()) + self.instance.send_notification_templates("running") private_data_dir = self.build_private_data_dir(self.instance) self.pre_run_hook(self.instance, private_data_dir) @@ -491,6 +533,7 @@ class BaseTask(object): self.build_project_dir(self.instance, private_data_dir) self.instance.log_lifecycle("preparing_playbook") if self.instance.cancel_flag or signal_callback(): + logger.debug(f'detected pre-run cancel flag for {self.instance.log_format}') self.instance = self.update_model(self.instance.pk, status='canceled') if self.instance.status != 'running': @@ -613,12 +656,9 @@ class BaseTask(object): elif status == 'canceled': self.instance = self.update_model(pk) cancel_flag_value = getattr(self.instance, 'cancel_flag', False) - if (cancel_flag_value is False) and signal_callback(): + if cancel_flag_value is False: self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="Task was canceled due to receiving a shutdown signal.") status = 'failed' - elif cancel_flag_value is False: - self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="The running ansible process received a shutdown signal.") - status = 'failed' except PolicyEvaluationError as exc: self.runner_callback.delay_update(job_explanation=str(exc), result_traceback=str(exc)) except ReceptorNodeNotFound as exc: @@ -646,6 +686,9 @@ class BaseTask(object): # Field host_status_counts is used as a metric to check if event processing is finished # we send notifications if it is, if not, callback receiver will send them + if not self.instance: + logger.error(f'Unified job pk={pk} appears to be deleted while running') + return if (self.instance.host_status_counts is not None) or (not self.runner_callback.wrapup_event_dispatched): events_processed_hook(self.instance) @@ -742,6 +785,7 @@ class SourceControlMixin(BaseTask): try: # the job private_data_dir is passed so sync can download roles and collections there sync_task = RunProjectUpdate(job_private_data_dir=private_data_dir) + sync_task.instance = local_project_sync # avoids "waiting" status check, 
performance sync_task.run(local_project_sync.id) local_project_sync.refresh_from_db() self.instance = self.update_model(self.instance.pk, scm_revision=local_project_sync.scm_revision) @@ -805,7 +849,7 @@ class SourceControlMixin(BaseTask): self.release_lock(project) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) class RunJob(SourceControlMixin, BaseTask): """ Run a job using ansible-playbook. @@ -1128,7 +1172,7 @@ class RunJob(SourceControlMixin, BaseTask): update_inventory_computed_fields.delay(inventory.id) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) class RunProjectUpdate(BaseTask): model = ProjectUpdate event_model = ProjectUpdateEvent @@ -1467,7 +1511,7 @@ class RunProjectUpdate(BaseTask): return [] -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) class RunInventoryUpdate(SourceControlMixin, BaseTask): model = InventoryUpdate event_model = InventoryUpdateEvent @@ -1730,7 +1774,7 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask): raise PostRunError('Error occured while saving inventory data, see traceback or server logs', status='error', tb=traceback.format_exc()) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) class RunAdHocCommand(BaseTask): """ Run an ad hoc command using ansible. @@ -1883,7 +1927,7 @@ class RunAdHocCommand(BaseTask): return d -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) class RunSystemJob(BaseTask): model = SystemJob event_model = SystemJobEvent diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py index f3fb91c573..9ef7522c47 100644 --- a/awx/main/tasks/receptor.py +++ b/awx/main/tasks/receptor.py @@ -32,7 +32,7 @@ from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER from awx.main.tasks.signals import signal_state, signal_callback, SignalExit from awx.main.models import Instance, InstanceLink, UnifiedJob, ReceptorAddress from awx.main.dispatch import get_task_queuename -from awx.main.dispatch.publish import task +from awx.main.dispatch.publish import task as task_awx # Receptorctl from receptorctl.socket_interface import ReceptorControl @@ -852,7 +852,7 @@ def reload_receptor(): raise RuntimeError("Receptor reload failed") -@task() +@task_awx() def write_receptor_config(): """ This task runs async on each control node, K8S only. 
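
Reviewer note, not part of the patch: a minimal sketch of how the pieces introduced above fit together once FEATURE_DISPATCHERD_ENABLED is on. It uses only imports already added in this diff (dispatcherd.config.setup, dispatcherd.run_service, awx.main.dispatch.config.get_dispatcherd_config); the two wrapper function names are purely illustrative.

# Illustrative sketch only; mirrors MainConfig.ready() and the run_dispatcher command above.
from dispatcherd.config import setup as dispatcher_setup
from dispatcherd import run_service

from awx.main.dispatch.config import get_dispatcherd_config


def configure_for_publishing():
    # Every Django process does this in MainConfig.ready(), so web and worker code
    # can submit tasks over pg_notify (or send control commands over the local
    # socket) without any further setup.
    dispatcher_setup(get_dispatcherd_config())


def run_dispatcher_service():
    # The run_dispatcher management command does this when the flag is enabled;
    # for_service=True adds the producers and the pg_notify listen channels
    # (which needs database access), then hands the process over to dispatcherd.
    dispatcher_setup(get_dispatcherd_config(for_service=True))
    run_service()
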
@@ -875,7 +875,7 @@ def write_receptor_config(): reload_receptor() -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def remove_deprovisioned_node(hostname): InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING) InstanceLink.objects.filter(target__instance__hostname=hostname).update(link_state=InstanceLink.States.REMOVING) diff --git a/awx/main/tasks/signals.py b/awx/main/tasks/signals.py index 7b4e4ba47a..749b07df1f 100644 --- a/awx/main/tasks/signals.py +++ b/awx/main/tasks/signals.py @@ -14,16 +14,21 @@ class SignalExit(Exception): class SignalState: + # SIGTERM: Sent by supervisord to process group on shutdown + # SIGUSR1: The dispatcherd cancel signal + signals = (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1) + def reset(self): - self.sigterm_flag = False - self.sigint_flag = False + for for_signal in self.signals: + self.signal_flags[for_signal] = False + self.original_methods[for_signal] = None self.is_active = False # for nested context managers - self.original_sigterm = None - self.original_sigint = None self.raise_exception = False def __init__(self): + self.signal_flags = {} + self.original_methods = {} self.reset() def raise_if_needed(self): @@ -31,31 +36,28 @@ class SignalState: self.raise_exception = False # so it is not raised a second time in error handling raise SignalExit() - def set_sigterm_flag(self, *args): - self.sigterm_flag = True - self.raise_if_needed() - - def set_sigint_flag(self, *args): - self.sigint_flag = True + def set_signal_flag(self, *args, for_signal=None): + self.signal_flags[for_signal] = True + logger.info(f'Processed signal {for_signal}, set exit flag') self.raise_if_needed() def connect_signals(self): - self.original_sigterm = signal.getsignal(signal.SIGTERM) - self.original_sigint = signal.getsignal(signal.SIGINT) - signal.signal(signal.SIGTERM, self.set_sigterm_flag) - signal.signal(signal.SIGINT, self.set_sigint_flag) + for for_signal in self.signals: + self.original_methods[for_signal] = signal.getsignal(for_signal) + signal.signal(for_signal, lambda *args, for_signal=for_signal: self.set_signal_flag(*args, for_signal=for_signal)) self.is_active = True def restore_signals(self): - signal.signal(signal.SIGTERM, self.original_sigterm) - signal.signal(signal.SIGINT, self.original_sigint) - # if we got a signal while context manager was active, call parent methods. - if self.sigterm_flag: - if callable(self.original_sigterm): - self.original_sigterm() - if self.sigint_flag: - if callable(self.original_sigint): - self.original_sigint() + for for_signal in self.signals: + original_method = self.original_methods[for_signal] + signal.signal(for_signal, original_method) + # if we got a signal while context manager was active, call parent methods. 
+ if self.signal_flags[for_signal]: + if callable(original_method): + try: + original_method() + except Exception as exc: + logger.info(f'Error processing original {for_signal} signal, error: {str(exc)}') self.reset() @@ -63,7 +65,7 @@ signal_state = SignalState() def signal_callback(): - return bool(signal_state.sigterm_flag or signal_state.sigint_flag) + return any(signal_state.signal_flags[for_signal] for for_signal in signal_state.signals) def with_signal_handling(f): diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 91e4d0dee5..823636eeb4 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -1,78 +1,77 @@ # Python -from collections import namedtuple import functools import importlib import itertools import json import logging import os -import psycopg -from io import StringIO -from contextlib import redirect_stdout import shutil import time -from distutils.version import LooseVersion as Version +from collections import namedtuple +from contextlib import redirect_stdout from datetime import datetime +from distutils.version import LooseVersion as Version +from io import StringIO -# Django -from django.conf import settings -from django.db import connection, transaction, DatabaseError, IntegrityError -from django.db.models.fields.related import ForeignKey -from django.utils.timezone import now, timedelta -from django.utils.encoding import smart_str -from django.contrib.auth.models import User -from django.utils.translation import gettext_lazy as _ -from django.utils.translation import gettext_noop -from django.core.cache import cache -from django.core.exceptions import ObjectDoesNotExist -from django.db.models.query import QuerySet +# Runner +import ansible_runner.cleanup +import psycopg +from ansible_base.lib.utils.db import advisory_lock + +# django-ansible-base +from ansible_base.resource_registry.tasks.sync import SyncExecutor # Django-CRUM from crum import impersonate -# Django flags -from flags.state import flag_enabled - -# Runner -import ansible_runner.cleanup - # dateutil from dateutil.parser import parse as parse_date -# django-ansible-base -from ansible_base.resource_registry.tasks.sync import SyncExecutor -from ansible_base.lib.utils.db import advisory_lock +# Django +from django.conf import settings +from django.contrib.auth.models import User +from django.core.cache import cache +from django.core.exceptions import ObjectDoesNotExist +from django.db import DatabaseError, IntegrityError, connection, transaction +from django.db.models.fields.related import ForeignKey +from django.db.models.query import QuerySet +from django.utils.encoding import smart_str +from django.utils.timezone import now, timedelta +from django.utils.translation import gettext_lazy as _ +from django.utils.translation import gettext_noop + +# Django flags +from flags.state import flag_enabled +from rest_framework.exceptions import PermissionDenied # AWX from awx import __version__ as awx_application_version +from awx.conf import settings_registry +from awx.main import analytics from awx.main.access import access_registry +from awx.main.analytics.subsystem_metrics import DispatcherMetrics +from awx.main.constants import ACTIVE_STATES, ERROR_STATES +from awx.main.consumers import emit_channel_notification +from awx.main.dispatch import get_task_queuename, reaper +from awx.main.dispatch.publish import task as task_awx from awx.main.models import ( - Schedule, - TowerScheduleState, Instance, InstanceGroup, - UnifiedJob, - Notification, Inventory, - 
SmartInventoryMembership, Job, + Notification, + Schedule, + SmartInventoryMembership, + TowerScheduleState, + UnifiedJob, convert_jsonfields, ) -from awx.main.constants import ACTIVE_STATES, ERROR_STATES -from awx.main.dispatch.publish import task -from awx.main.dispatch import get_task_queuename, reaper -from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inventory_group_removal - -from awx.main.utils.reload import stop_local_services from awx.main.tasks.helpers import is_run_threshold_reached from awx.main.tasks.host_indirect import save_indirect_host_entries -from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanup, administrative_workunit_reaper, write_receptor_config -from awx.main.consumers import emit_channel_notification -from awx.main import analytics -from awx.conf import settings_registry -from awx.main.analytics.subsystem_metrics import DispatcherMetrics - -from rest_framework.exceptions import PermissionDenied +from awx.main.tasks.receptor import administrative_workunit_reaper, get_receptor_ctl, worker_cleanup, worker_info, write_receptor_config +from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inventory_group_removal +from awx.main.utils.reload import stop_local_services +from dispatcherd.publish import task logger = logging.getLogger('awx.main.tasks.system') @@ -83,7 +82,12 @@ Try upgrading OpenSSH or providing your private key in an different format. \ ''' -def dispatch_startup(): +def _run_dispatch_startup_common(): + """ + Execute the common startup initialization steps. + This includes updating schedules, syncing instance membership, and starting + local reaping and resetting metrics. + """ startup_logger = logging.getLogger('awx.main.tasks') # TODO: Enable this on VM installs @@ -93,14 +97,14 @@ def dispatch_startup(): try: convert_jsonfields() except Exception: - logger.exception("Failed json field conversion, skipping.") + logger.exception("Failed JSON field conversion, skipping.") - startup_logger.debug("Syncing Schedules") + startup_logger.debug("Syncing schedules") for sch in Schedule.objects.all(): try: sch.update_computed_fields() except Exception: - logger.exception("Failed to rebuild schedule {}.".format(sch)) + logger.exception("Failed to rebuild schedule %s.", sch) # # When the dispatcher starts, if the instance cannot be found in the database, @@ -120,25 +124,67 @@ def dispatch_startup(): apply_cluster_membership_policies() cluster_node_heartbeat() reaper.startup_reaping() - reaper.reap_waiting(grace_period=0) m = DispatcherMetrics() m.reset_values() +def _legacy_dispatch_startup(): + """ + Legacy branch for startup: simply performs reaping of waiting jobs with a zero grace period. + """ + logger.debug("Legacy dispatcher: calling reaper.reap_waiting with grace_period=0") + reaper.reap_waiting(grace_period=0) + + +def _dispatcherd_dispatch_startup(): + """ + New dispatcherd branch for startup: uses the control API to re-submit waiting jobs. + """ + logger.debug("Dispatcherd enabled: dispatching waiting jobs via control channel") + from awx.main.tasks.jobs import dispatch_waiting_jobs + + dispatch_waiting_jobs.apply_async(queue=get_task_queuename()) + + +def dispatch_startup(): + """ + System initialization at startup. + First, execute the common logic. + Then, if FEATURE_DISPATCHERD_ENABLED is enabled, re-submit waiting jobs via the control API; + otherwise, fall back to legacy reaping of waiting jobs. 
+ """ + _run_dispatch_startup_common() + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + _dispatcherd_dispatch_startup() + else: + _legacy_dispatch_startup() + + def inform_cluster_of_shutdown(): + """ + Clean system shutdown that marks the current instance offline. + In legacy mode, it also reaps waiting jobs. + In dispatcherd mode, it relies on dispatcherd's built-in cleanup. + """ try: - this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID) - this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal')) + inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID) + inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal')) + except Instance.DoesNotExist: + logger.exception("Cluster host not found: %s", settings.CLUSTER_HOST_ID) + return + + if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): + logger.debug("Dispatcherd mode: no extra reaping required for instance %s", inst.hostname) + else: try: - reaper.reap_waiting(this_inst, grace_period=0) + logger.debug("Legacy mode: reaping waiting jobs for instance %s", inst.hostname) + reaper.reap_waiting(inst, grace_period=0) except Exception: - logger.exception('failed to reap waiting jobs for {}'.format(this_inst.hostname)) - logger.warning('Normal shutdown signal for instance {}, removed self from capacity pool.'.format(this_inst.hostname)) - except Exception: - logger.exception('Encountered problem with normal shutdown signal.') + logger.exception("Failed to reap waiting jobs for %s", inst.hostname) + logger.warning("Normal shutdown processed for instance %s; instance removed from capacity pool.", inst.hostname) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def migrate_jsonfield(table, pkfield, columns): batchsize = 10000 with advisory_lock(f'json_migration_{table}', wait=False) as acquired: @@ -184,7 +230,7 @@ def migrate_jsonfield(table, pkfield, columns): logger.warning(f"Migration of {table} to jsonb is finished.") -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def apply_cluster_membership_policies(): from awx.main.signals import disable_activity_stream @@ -296,7 +342,7 @@ def apply_cluster_membership_policies(): logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute)) -@task(queue='tower_settings_change') +@task_awx(queue='tower_settings_change') def clear_setting_cache(setting_keys): # log that cache is being cleared logger.info(f"clear_setting_cache of keys {setting_keys}") @@ -309,7 +355,7 @@ def clear_setting_cache(setting_keys): cache.delete_many(cache_keys) -@task(queue='tower_broadcast_all') +@task_awx(queue='tower_broadcast_all') def delete_project_files(project_path): # TODO: possibly implement some retry logic lock_file = project_path + '.lock' @@ -327,7 +373,7 @@ def delete_project_files(project_path): logger.exception('Could not remove lock file {}'.format(lock_file)) -@task(queue='tower_broadcast_all') +@task_awx(queue='tower_broadcast_all') def profile_sql(threshold=1, minutes=1): if threshold <= 0: cache.delete('awx-profile-sql-threshold') @@ -337,7 +383,7 @@ def profile_sql(threshold=1, minutes=1): logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes)) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -382,13 +428,13 @@ def 
events_processed_hook(unified_job): save_indirect_host_entries.delay(unified_job.id) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def gather_analytics(): if is_run_threshold_reached(getattr(settings, 'AUTOMATION_ANALYTICS_LAST_GATHER', None), settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL): analytics.gather() -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def purge_old_stdout_files(): nowtime = time.time() for f in os.listdir(settings.JOBOUTPUT_ROOT): @@ -450,18 +496,18 @@ class CleanupImagesAndFiles: cls.run_remote(this_inst, **kwargs) -@task(queue='tower_broadcast_all') +@task_awx(queue='tower_broadcast_all') def handle_removed_image(remove_images=None): """Special broadcast invocation of this method to handle case of deleted EE""" CleanupImagesAndFiles.run(remove_images=remove_images, file_pattern='') -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def cleanup_images_and_files(): CleanupImagesAndFiles.run(image_prune=True) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def cluster_node_health_check(node): """ Used for the health check endpoint, refreshes the status of the instance, but must be ran on target node @@ -480,7 +526,7 @@ def cluster_node_health_check(node): this_inst.local_health_check() -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def execution_node_health_check(node): if node == '': logger.warning('Remote health check incorrectly called with blank string') @@ -597,8 +643,109 @@ def inspect_execution_and_hop_nodes(instance_list): execution_node_health_check.apply_async([hostname]) -@task(queue=get_task_queuename, bind_kwargs=['dispatch_time', 'worker_tasks']) +@task_awx(queue=get_task_queuename, bind_kwargs=['dispatch_time', 'worker_tasks']) def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None): + """ + Original implementation for AWX dispatcher. + Uses worker_tasks from bind_kwargs to track running tasks. + """ + # Run common instance management logic + this_inst, instance_list, lost_instances = _heartbeat_instance_management() + if this_inst is None: + return # Early return case from instance management + + # Check versions + _heartbeat_check_versions(this_inst, instance_list) + + # Handle lost instances + _heartbeat_handle_lost_instances(lost_instances, this_inst) + + # Run local reaper - original implementation using worker_tasks + if worker_tasks is not None: + active_task_ids = [] + for task_list in worker_tasks.values(): + active_task_ids.extend(task_list) + + # Convert dispatch_time to datetime + ref_time = datetime.fromisoformat(dispatch_time) if dispatch_time else now() + + reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time) + + if max(len(task_list) for task_list in worker_tasks.values()) <= 1: + reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time) + + +@task(queue=get_task_queuename, bind=True) +def adispatch_cluster_node_heartbeat(binder): + """ + Dispatcherd implementation. + Uses Control API to get running tasks. 
+ """ + # Run common instance management logic + this_inst, instance_list, lost_instances = _heartbeat_instance_management() + if this_inst is None: + return # Early return case from instance management + + # Check versions + _heartbeat_check_versions(this_inst, instance_list) + + # Handle lost instances + _heartbeat_handle_lost_instances(lost_instances, this_inst) + + # Get running tasks using dispatcherd API + active_task_ids = _get_active_task_ids_from_dispatcherd(binder) + if active_task_ids is None: + logger.warning("No active task IDs retrieved from dispatcherd, skipping reaper") + return # Failed to get task IDs, don't attempt reaping + + # Run local reaper using tasks from dispatcherd + ref_time = now() # No dispatch_time in dispatcherd version + logger.debug(f"Running reaper with {len(active_task_ids)} excluded UUIDs") + reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time) + # If waiting jobs are hanging out, resubmit them + if UnifiedJob.objects.filter(controller_node=settings.CLUSTER_HOST_ID, status='waiting').exists(): + from awx.main.tasks.jobs import dispatch_waiting_jobs + + dispatch_waiting_jobs.apply_async(queue=get_task_queuename()) + + +def _get_active_task_ids_from_dispatcherd(binder): + """ + Retrieve active task IDs from the dispatcherd control API. + + Returns: + list: List of active task UUIDs + None: If there was an error retrieving the data + """ + active_task_ids = [] + try: + + logger.debug("Querying dispatcherd API for running tasks") + data = binder.control('running') + + # Extract UUIDs from the running data + # Process running data: first item is a dict with node_id and task entries + data.pop('node_id', None) + + # Extract task UUIDs from data structure + for task_key, task_value in data.items(): + if isinstance(task_value, dict) and 'uuid' in task_value: + active_task_ids.append(task_value['uuid']) + logger.debug(f"Found active task with UUID: {task_value['uuid']}") + elif isinstance(task_key, str): + # Handle case where UUID might be the key + active_task_ids.append(task_key) + logger.debug(f"Found active task with key: {task_key}") + + logger.debug(f"Retrieved {len(active_task_ids)} active task IDs from dispatcherd") + return active_task_ids + except Exception: + logger.exception("Failed to get running tasks from dispatcherd") + return None + + +def _heartbeat_instance_management(): + """Common logic for heartbeat instance management.""" logger.debug("Cluster node heartbeat task.") nowtime = now() instance_list = list(Instance.objects.filter(node_state__in=(Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED))) @@ -625,7 +772,7 @@ def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None): this_inst.local_health_check() if startup_event and this_inst.capacity != 0: logger.warning(f'Rejoining the cluster as instance {this_inst.hostname}. 
Prior last_seen {last_last_seen}') - return + return None, None, None # Early return case elif not last_last_seen: logger.warning(f'Instance does not have recorded last_seen, updating to {nowtime}') elif (nowtime - last_last_seen) > timedelta(seconds=settings.CLUSTER_NODE_HEARTBEAT_PERIOD + 2): @@ -638,7 +785,12 @@ def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None): this_inst.local_health_check() else: raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID)) - # IFF any node has a greater version than we do, then we'll shutdown services + + return this_inst, instance_list, lost_instances + + +def _heartbeat_check_versions(this_inst, instance_list): + """Check versions across instances and determine if shutdown is needed.""" for other_inst in instance_list: if other_inst.node_type in ('execution', 'hop'): continue @@ -655,6 +807,9 @@ def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None): stop_local_services(communicate=False) raise RuntimeError("Shutting down.") + +def _heartbeat_handle_lost_instances(lost_instances, this_inst): + """Handle lost instances by reaping their jobs and marking them offline.""" for other_inst in lost_instances: try: explanation = "Job reaped due to instance shutdown" @@ -685,17 +840,8 @@ def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None): else: logger.exception('No SQL state available. Error marking {} as lost'.format(other_inst.hostname)) - # Run local reaper - if worker_tasks is not None: - active_task_ids = [] - for task_list in worker_tasks.values(): - active_task_ids.extend(task_list) - reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time)) - if max(len(task_list) for task_list in worker_tasks.values()) <= 1: - reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time)) - -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def awx_receptor_workunit_reaper(): """ When an AWX job is launched via receptor, files such as status, stdin, and stdout are created @@ -733,7 +879,7 @@ def awx_receptor_workunit_reaper(): administrative_workunit_reaper(receptor_work_list) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def awx_k8s_reaper(): if not settings.RECEPTOR_RELEASE_WORK: return @@ -756,7 +902,7 @@ def awx_k8s_reaper(): logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group)) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def awx_periodic_scheduler(): lock_session_timeout_milliseconds = settings.TASK_MANAGER_LOCK_TIMEOUT * 1000 with advisory_lock('awx_periodic_scheduler_lock', lock_session_timeout_milliseconds=lock_session_timeout_milliseconds, wait=False) as acquired: @@ -815,7 +961,7 @@ def awx_periodic_scheduler(): emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules")) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def handle_failure_notifications(task_ids): """A task-ified version of the method that sends notifications.""" found_task_ids = set() @@ -830,7 +976,7 @@ def handle_failure_notifications(task_ids): logger.warning(f'Could not send notifications for {deleted_tasks} because they were not found in the database') -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def update_inventory_computed_fields(inventory_id): """ Signal handler and wrapper around inventory.update_computed_fields to @@ -880,7 
+1026,7 @@ def update_smart_memberships_for_inventory(smart_inventory): return False -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def update_host_smart_inventory_memberships(): smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False) changed_inventories = set([]) @@ -896,7 +1042,7 @@ def update_host_smart_inventory_memberships(): smart_inventory.update_computed_fields() -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def delete_inventory(inventory_id, user_id, retries=5): # Delete inventory as user if user_id is None: @@ -958,7 +1104,7 @@ def _reconstruct_relationships(copy_mapping): new_obj.save() -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, permission_check_func=None): logger.debug('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk)) @@ -1013,7 +1159,7 @@ def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, p update_inventory_computed_fields.delay(new_obj.id) -@task(queue=get_task_queuename) +@task_awx(queue=get_task_queuename) def periodic_resource_sync(): if not getattr(settings, 'RESOURCE_SERVER', None): logger.debug("Skipping periodic resource_sync, RESOURCE_SERVER not configured") diff --git a/awx/main/tests/conftest.py b/awx/main/tests/conftest.py index 71e9637ed9..a6f7b5aca9 100644 --- a/awx/main/tests/conftest.py +++ b/awx/main/tests/conftest.py @@ -209,6 +209,12 @@ def mock_get_event_queryset_no_job_created(): yield _fixture +@pytest.fixture(scope='session', autouse=True) +def mock_dispatcherd_publish(): + with mock.patch('dispatcherd.brokers.pg_notify.Broker.publish_message', autospec=True): + yield + + @pytest.fixture def mock_me(): "Allows Instance.objects.me() to work without touching the database" diff --git a/awx/main/tests/data/projects/debug/sleep.yml b/awx/main/tests/data/projects/debug/sleep.yml new file mode 100644 index 0000000000..5ae223f29d --- /dev/null +++ b/awx/main/tests/data/projects/debug/sleep.yml @@ -0,0 +1,9 @@ +--- +- hosts: all + gather_facts: false + connection: local + vars: + sleep_interval: 5 + tasks: + - name: sleep for a specified interval + command: sleep '{{ sleep_interval }}' diff --git a/awx/main/tests/data/sleep_task.py b/awx/main/tests/data/sleep_task.py index f9ff58b69a..59bc6254e2 100644 --- a/awx/main/tests/data/sleep_task.py +++ b/awx/main/tests/data/sleep_task.py @@ -1,17 +1,57 @@ import time import logging +from dispatcherd.publish import task + +from django.db import connection + from awx.main.dispatch import get_task_queuename -from awx.main.dispatch.publish import task +from awx.main.dispatch.publish import task as old_task + +from ansible_base.lib.utils.db import advisory_lock logger = logging.getLogger(__name__) -@task(queue=get_task_queuename) +@old_task(queue=get_task_queuename) def sleep_task(seconds=10, log=False): if log: logger.info('starting sleep_task') time.sleep(seconds) if log: logger.info('finished sleep_task') + + +@task() +def sleep_break_connection(seconds=0.2): + """ + Interact with the database in an intentionally breaking way. + After this finishes, queries made by this connection are expected to error + with "the connection is closed" + This is obviously a problem for any task that comes afterwards. + So this is used to break things so that the fixes may be demonstrated. 
+ """ + with connection.cursor() as cursor: + cursor.execute(f"SET idle_session_timeout = '{seconds / 2}s';") + + logger.info(f'sleeping for {seconds}s > {seconds / 2}s session timeout') + time.sleep(seconds) + + for i in range(1, 3): + logger.info(f'\nRunning query number {i}') + try: + with connection.cursor() as cursor: + cursor.execute("SELECT 1;") + logger.info(' query worked, not expected') + except Exception as exc: + logger.info(f' query errored as expected\ntype: {type(exc)}\nstr: {str(exc)}') + + logger.info(f'Connection present: {bool(connection.connection)}, reports closed: {getattr(connection.connection, "closed", "not_found")}') + + +@task() +def advisory_lock_exception(): + time.sleep(0.2) # so it can fill up all the workers... hacky for now + with advisory_lock('advisory_lock_exception', lock_session_timeout_milliseconds=20): + raise RuntimeError('this is an intentional error') diff --git a/awx/main/tests/functional/tasks/test_tasks_jobs.py b/awx/main/tests/functional/tasks/test_tasks_jobs.py index 6f104b0207..012ee20fdb 100644 --- a/awx/main/tests/functional/tasks/test_tasks_jobs.py +++ b/awx/main/tests/functional/tasks/test_tasks_jobs.py @@ -15,3 +15,17 @@ def test_does_not_run_reaped_job(mocker, mock_me): job.refresh_from_db() assert job.status == 'failed' mock_run.assert_not_called() + + +@pytest.mark.django_db +def test_cancel_flag_on_start(jt_linked, caplog): + job = jt_linked.create_unified_job() + job.status = 'waiting' + job.cancel_flag = True + job.save() + + task = RunJob() + task.run(job.id) + + job = Job.objects.get(id=job.id) + assert job.status == 'canceled' diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py index 5d55fef5fb..a5a344d504 100644 --- a/awx/main/tests/functional/test_dispatch.py +++ b/awx/main/tests/functional/test_dispatch.py @@ -5,8 +5,11 @@ import signal import time import yaml from unittest import mock +from copy import deepcopy from django.utils.timezone import now as tz_now +from django.conf import settings +from django.test.utils import override_settings import pytest from awx.main.models import Job, WorkflowJob, Instance @@ -300,6 +303,13 @@ class TestTaskDispatcher: class TestTaskPublisher: + @pytest.fixture(autouse=True) + def _disable_dispatcherd(self): + ffs = deepcopy(settings.FLAGS) + ffs['FEATURE_DISPATCHERD_ENABLED'][0]['value'] = False + with override_settings(FLAGS=ffs): + yield + def test_function_callable(self): assert add(2, 2) == 4 diff --git a/awx/main/tests/functional/test_inventory_source_injectors.py b/awx/main/tests/functional/test_inventory_source_injectors.py index c7a8063a51..12807ce249 100644 --- a/awx/main/tests/functional/test_inventory_source_injectors.py +++ b/awx/main/tests/functional/test_inventory_source_injectors.py @@ -209,7 +209,7 @@ def test_inventory_update_injected_content(product_name, this_kind, inventory, f source_vars=src_vars, ) inventory_source.credentials.add(fake_credential_factory(this_kind)) - inventory_update = inventory_source.create_unified_job() + inventory_update = inventory_source.create_unified_job(_eager_fields={'status': 'waiting'}) task = RunInventoryUpdate() def substitute_run(awx_receptor_job): diff --git a/awx/main/tests/functional/test_licenses.py b/awx/main/tests/functional/test_licenses.py index 167f37dd52..aefec6153b 100644 --- a/awx/main/tests/functional/test_licenses.py +++ b/awx/main/tests/functional/test_licenses.py @@ -47,6 +47,7 @@ def index_licenses(path): def parse_requirement(reqt): parsed_requirement = 
parse_req_from_line(reqt.requirement, None) + assert parsed_requirement.requirement, reqt.__dict__ name = parsed_requirement.requirement.name version = str(parsed_requirement.requirement.specifier) if version.startswith('=='): diff --git a/awx/main/tests/live/tests/conftest.py b/awx/main/tests/live/tests/conftest.py index cedfe84bfb..6c932d7b86 100644 --- a/awx/main/tests/live/tests/conftest.py +++ b/awx/main/tests/live/tests/conftest.py @@ -175,7 +175,7 @@ def project_factory(post, default_org, admin): @pytest.fixture def run_job_from_playbook(demo_inv, post, admin, project_factory): - def _rf(test_name, playbook, local_path=None, scm_url=None, jt_params=None, proj=None): + def _rf(test_name, playbook, local_path=None, scm_url=None, jt_params=None, proj=None, wait=True): jt_name = f'{test_name} JT: {playbook}' if not proj: @@ -206,9 +206,9 @@ def run_job_from_playbook(demo_inv, post, admin, project_factory): job = jt.create_unified_job() job.signal_start() - wait_for_job(job) - assert job.status == 'successful' - + if wait: + wait_for_job(job) + assert job.status == 'successful' return {'job': job, 'job_template': jt, 'project': proj} return _rf diff --git a/awx/main/tests/live/tests/dispatcherd/test_connection_recovery.py b/awx/main/tests/live/tests/dispatcherd/test_connection_recovery.py new file mode 100644 index 0000000000..183d9ca1d5 --- /dev/null +++ b/awx/main/tests/live/tests/dispatcherd/test_connection_recovery.py @@ -0,0 +1,74 @@ +import time + +from dispatcherd.config import settings +from dispatcherd.factories import get_control_from_settings +from dispatcherd.utils import serialize_task + +from awx.main.models import JobTemplate + +from awx.main.tests.data.sleep_task import sleep_break_connection, advisory_lock_exception +from awx.main.tests.live.tests.conftest import wait_for_job + + +def poll_for_task_finish(task_name): + running_tasks = [1] + start = time.monotonic() + ctl = get_control_from_settings() + while running_tasks: + responses = ctl.control_with_reply('running') + assert len(responses) == 1 + response = responses[0] + response.pop('node_id') + running_tasks = [task_data for task_data in response.values() if task_data['task'] == task_name] + if time.monotonic() - start > 5.0: + assert False, f'Never finished working through tasks: {running_tasks}' + + +def check_jobs_work(): + jt = JobTemplate.objects.get(name='Demo Job Template') + job = jt.create_unified_job() + job.signal_start() + wait_for_job(job) + + +def test_advisory_lock_error_clears(): + """Run a task that raises an exception while holding advisory_lock + + This is regression testing for a bug in its exception handling, + expected to be fixed by + https://github.com/ansible/django-ansible-base/pull/713 + + This is an "easier" test case than the next, + because it passes just by fixing the DAB case, + and passing this does not generally guarantee that + workers will not be left with a connection in a bad state. + """ + min_workers = settings.service['pool_kwargs']['min_workers'] + + for i in range(min_workers): + advisory_lock_exception.delay() + + task_name = serialize_task(advisory_lock_exception) + poll_for_task_finish(task_name) + + # Jobs should still work even after the breaking task has run + check_jobs_work() + + +def test_can_recover_connection(): + """Run a task that intentionally times out the worker connection + + If no connection fixing is implemented outside of that task's scope, + then subsequent tasks will all error, so this checks that jobs still run + after running the sleep_break_connection task.
+ """ + min_workers = settings.service['pool_kwargs']['min_workers'] + + for i in range(min_workers): + sleep_break_connection.delay() + + task_name = serialize_task(sleep_break_connection) + poll_for_task_finish(task_name) + + # Jobs should still work even after the breaking task has ran + check_jobs_work() diff --git a/awx/main/tests/live/tests/test_job_cancel.py b/awx/main/tests/live/tests/test_job_cancel.py new file mode 100644 index 0000000000..6a88d4b9a8 --- /dev/null +++ b/awx/main/tests/live/tests/test_job_cancel.py @@ -0,0 +1,40 @@ +import time + +from awx.api.versioning import reverse +from awx.main.models import Job + +from awx.main.tests.live.tests.conftest import wait_for_events + + +def test_cancel_and_delete_job(live_tmp_folder, run_job_from_playbook, post, delete, admin): + res = run_job_from_playbook('test_cancel_and_delete_job', 'sleep.yml', scm_url=f'file://{live_tmp_folder}/debug', wait=False) + job = res['job'] + assert job.status == 'pending' + + # Wait for first event so that we can be sure the job is in-progress first + start = time.time() + timeout = 10.0 + while not job.job_events.exists(): + time.sleep(0.2) + if time.time() - start > timeout: + assert False, f'Did not receive first event for job_id={job.id} in {timeout} seconds' + + # Now cancel the job + url = reverse("api:job_cancel", kwargs={'pk': job.pk}) + post(url, user=admin, expect=202) + + # Job status should change to expected status before infinity + start = time.time() + timeout = 5.0 + job.refresh_from_db() + while job.status != 'canceled': + time.sleep(0.05) + job.refresh_from_db(fields=['status']) + if time.time() - start > timeout: + assert False, f'job_id={job.id} still status={job.status} after {timeout} seconds' + + wait_for_events(job) + url = reverse("api:job_detail", kwargs={'pk': job.pk}) + delete(url, user=admin, expect=204) + + assert not Job.objects.filter(id=job.id).exists() diff --git a/awx/main/tests/unit/tasks/test_signals.py b/awx/main/tests/unit/tasks/test_signals.py index 75915504c5..2a63b30d38 100644 --- a/awx/main/tests/unit/tasks/test_signals.py +++ b/awx/main/tests/unit/tasks/test_signals.py @@ -50,7 +50,7 @@ def test_outer_inner_signal_handling(): @with_signal_handling def f1(): assert signal_callback() is False - signal_state.set_sigterm_flag() + signal_state.set_signal_flag(for_signal=signal.SIGTERM) assert signal_callback() f2() @@ -74,7 +74,7 @@ def test_inner_outer_signal_handling(): @with_signal_handling def f2(): assert signal_callback() is False - signal_state.set_sigint_flag() + signal_state.set_signal_flag(for_signal=signal.SIGINT) assert signal_callback() @with_signal_handling diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index f62e547c28..4ddbd5e5ce 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -107,7 +107,7 @@ def job(): @pytest.fixture def adhoc_job(): - return AdHocCommand(pk=1, id=1, inventory=Inventory()) + return AdHocCommand(pk=1, id=1, inventory=Inventory(), status='waiting') @pytest.fixture @@ -481,26 +481,6 @@ class TestGenericRun: assert update_model_call['status'] == 'error' assert update_model_call['emitted_events'] == 0 - def test_cancel_flag(self, job, update_model_wrapper, execution_environment, mock_me, mock_create_partition): - job.status = 'running' - job.cancel_flag = True - job.websocket_emit_status = mock.Mock() - job.send_notification_templates = mock.Mock() - job.execution_environment = execution_environment - - task = jobs.RunJob() - task.instance = job - 
task.update_model = mock.Mock(wraps=update_model_wrapper) - task.model.objects.get = mock.Mock(return_value=job) - task.build_private_data_files = mock.Mock() - - with mock.patch('awx.main.tasks.jobs.shutil.copytree'): - with pytest.raises(Exception): - task.run(1) - - for c in [mock.call(1, start_args='', status='canceled')]: - assert c in task.update_model.call_args_list - def test_event_count(self, mock_me): task = jobs.RunJob() task.runner_callback.dispatcher = mock.MagicMock() @@ -589,6 +569,8 @@ class TestAdhocRun(TestJobExecution): adhoc_job.send_notification_templates = mock.Mock() task = jobs.RunAdHocCommand() + adhoc_job.status = 'running' # to bypass status flip + task.instance = adhoc_job # to bypass fetch task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper) task.model.objects.get = mock.Mock(return_value=adhoc_job) task.build_inventory = mock.Mock() diff --git a/awx/main/utils/db.py b/awx/main/utils/db.py index 40e6598072..2078b28d49 100644 --- a/awx/main/utils/db.py +++ b/awx/main/utils/db.py @@ -1,8 +1,8 @@ # Copyright (c) 2017 Ansible by Red Hat # All Rights Reserved. - from awx.settings.application_name import set_application_name + from django.conf import settings diff --git a/awx/main/utils/external_logging.py b/awx/main/utils/external_logging.py index 061ab40a95..420b5073bc 100644 --- a/awx/main/utils/external_logging.py +++ b/awx/main/utils/external_logging.py @@ -6,7 +6,7 @@ import urllib.parse as urlparse from django.conf import settings from awx.main.utils.reload import supervisor_service_command -from awx.main.dispatch.publish import task +from awx.main.dispatch.publish import task as task_awx def construct_rsyslog_conf_template(settings=settings): @@ -139,7 +139,7 @@ def construct_rsyslog_conf_template(settings=settings): return tmpl -@task(queue='rsyslog_configurer') +@task_awx(queue='rsyslog_configurer') def reconfigure_rsyslog(): tmpl = construct_rsyslog_conf_template() # Write config to a temp file then move it to preserve atomicity diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 81ef13cc4d..abe857f7d0 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -422,6 +422,9 @@ DISPATCHER_DB_DOWNTIME_TOLERANCE = 40 # sqlite3 based tests will use this DISPATCHER_MOCK_PUBLISH = False +# Debugging sockfile for the --status command +DISPATCHERD_DEBUGGING_SOCKFILE = os.path.join(BASE_DIR, 'dispatcherd.sock') + BROKER_URL = 'unix:///var/run/redis/redis.sock' CELERYBEAT_SCHEDULE = { 'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}}, @@ -446,6 +449,17 @@ CELERYBEAT_SCHEDULE = { }, } +DISPATCHER_SCHEDULE = {} +for options in CELERYBEAT_SCHEDULE.values(): + new_options = options.copy() + task_name = options['task'] + # Handle the only one exception case of the heartbeat which has a new implementation + if task_name == 'awx.main.tasks.system.cluster_node_heartbeat': + task_name = 'awx.main.tasks.system.adispatch_cluster_node_heartbeat' + new_options['task'] = task_name + new_options['schedule'] = options['schedule'].total_seconds() + DISPATCHER_SCHEDULE[task_name] = new_options + # Django Caching Configuration DJANGO_REDIS_IGNORE_EXCEPTIONS = True CACHES = {'default': {'BACKEND': 'awx.main.cache.AWXRedisCache', 'LOCATION': 'unix:///var/run/redis/redis.sock?db=1'}} @@ -795,6 +809,7 @@ LOGGING = { 'social': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'}, 'system_tracking_migrations': {'handlers': ['console', 'file', 
'tower_warnings'], 'level': 'DEBUG'}, 'rbac_migrations': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'}, + 'dispatcherd': {'handlers': ['dispatcher', 'console'], 'level': 'INFO'}, }, } @@ -994,7 +1009,7 @@ HOST_METRIC_SUMMARY_TASK_INTERVAL = 7 # days # projects can take advantage. METRICS_SERVICE_CALLBACK_RECEIVER = 'callback_receiver' -METRICS_SERVICE_DISPATCHER = 'dispatcher' +METRICS_SERVICE_DISPATCHER = 'dispatcherd' METRICS_SERVICE_WEBSOCKETS = 'websockets' METRICS_SUBSYSTEM_CONFIG = { @@ -1099,6 +1114,7 @@ FLAG_SOURCES = ('flags.sources.SettingsFlagsSource',) FLAGS = { 'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}], 'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}], + 'FEATURE_DISPATCHERD_ENABLED': [{'condition': 'boolean', 'value': False}], } # Dispatcher worker lifetime. If set to None, workers will never be retired diff --git a/awx/settings/development_defaults.py b/awx/settings/development_defaults.py index bb726cb372..9725b0abb7 100644 --- a/awx/settings/development_defaults.py +++ b/awx/settings/development_defaults.py @@ -73,4 +73,5 @@ AWX_DISABLE_TASK_MANAGERS = False def set_dev_flags(settings): defaults_flags = settings.get("FLAGS", {}) defaults_flags['FEATURE_INDIRECT_NODE_COUNTING_ENABLED'] = [{'condition': 'boolean', 'value': True}] + defaults_flags['FEATURE_DISPATCHERD_ENABLED'] = [{'condition': 'boolean', 'value': True}] return {'FLAGS': defaults_flags} diff --git a/awx/settings/production_defaults.py b/awx/settings/production_defaults.py index 5599b81753..35bebc1f7b 100644 --- a/awx/settings/production_defaults.py +++ b/awx/settings/production_defaults.py @@ -23,8 +23,13 @@ ALLOWED_HOSTS = [] # only used for deprecated fields and management commands for them BASE_VENV_PATH = os.path.realpath("/var/lib/awx/venv") +# Switch to a writable location for the dispatcher sockfile +DISPATCHERD_DEBUGGING_SOCKFILE = os.path.realpath('/var/run/tower/dispatcherd.sock') + # Very important that this is editable (not read_only) in the API AWX_ISOLATION_SHOW_PATHS = [ '/etc/pki/ca-trust:/etc/pki/ca-trust:O', '/usr/share/pki:/usr/share/pki:O', ] + +del os diff --git a/awx_collection/test/awx/conftest.py b/awx_collection/test/awx/conftest.py index 6de5ed9c25..a4bf207c20 100644 --- a/awx_collection/test/awx/conftest.py +++ b/awx_collection/test/awx/conftest.py @@ -18,7 +18,7 @@ import pytest from ansible.module_utils.six import raise_from from ansible_base.rbac.models import RoleDefinition, DABPermission -from awx.main.tests.conftest import load_all_credentials # noqa: F401; pylint: disable=unused-import +from awx.main.tests.conftest import load_all_credentials, mock_dispatcherd_publish # noqa: F401; pylint: disable=unused-import from awx.main.tests.functional.conftest import _request from awx.main.tests.functional.conftest import credentialtype_scm, credentialtype_ssh # noqa: F401; pylint: disable=unused-import from awx.main.models import ( diff --git a/docs/tasks.md b/docs/tasks.md index 65dce39ba4..aa91b90339 100644 --- a/docs/tasks.md +++ b/docs/tasks.md @@ -20,6 +20,20 @@ In this document, we will go into a bit of detail about how and when AWX runs Py - Every node in an AWX cluster runs a periodic task that serves as a heartbeat and capacity check +Transition to dispatcherd Library +--------------------------------- + +The task system logic is being split out into a new library: + +https://github.com/ansible/dispatcherd + +Currently AWX is in a transitional period where this is put
behind a feature flag. +The difference can be seen in how the task decorator is imported. + + - old `from awx.main.dispatch.publish import task` + - transition `from awx.main.dispatch.publish import task as task_awx` + - new `from dispatcherd.publish import task` + Tasks, Queues and Workers ---------------- @@ -60,7 +74,7 @@ Defining and Running Tasks Tasks are defined in AWX's source code, and generally live in the `awx.main.tasks` module. Tasks can be defined as simple functions: - from awx.main.dispatch.publish import task + from awx.main.dispatch.publish import task as task_awx @task() def add(a, b): diff --git a/licenses/dispatcherd.txt b/licenses/dispatcherd.txt new file mode 100644 index 0000000000..d645695673 --- /dev/null +++ b/licenses/dispatcherd.txt @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/licenses/psycopg-3.2.3.tar.gz b/licenses/psycopg-3.2.3.tar.gz deleted file mode 100644 index f9a5d56430..0000000000 Binary files a/licenses/psycopg-3.2.3.tar.gz and /dev/null differ diff --git a/licenses/psycopg-3.2.6.tar.gz b/licenses/psycopg-3.2.6.tar.gz new file mode 100644 index 0000000000..7732f588d1 Binary files /dev/null and b/licenses/psycopg-3.2.6.tar.gz differ diff --git a/requirements/requirements.in b/requirements/requirements.in index 3b9cb59b95..838926747c 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -70,3 +70,4 @@ setuptools_scm[toml] # see UPGRADE BLOCKERs, xmlsec build dep setuptools-rust>=0.11.4 # cryptography build dep pkgconfig>=1.5.1 # xmlsec build dep - needed for offline build django-flags>=5.0.13 +dispatcherd # tasking system, previously part of AWX code base diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 69b019b304..2c3c9a2cf6 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -128,6 +128,8 @@ deprecated==1.2.15 # opentelemetry-exporter-otlp-proto-http # opentelemetry-semantic-conventions # pygithub +dispatcherd==2025.5.12 + # via -r /awx_devel/requirements/requirements.in distro==1.9.0 # via -r /awx_devel/requirements/requirements.in django==4.2.20 @@ -366,7 +368,7 @@ protobuf==5.29.3 # opentelemetry-proto psutil==6.1.1 # via -r /awx_devel/requirements/requirements.in -psycopg==3.2.3 +psycopg==3.2.6 # via -r /awx_devel/requirements/requirements.in ptyprocess==0.7.0 # via pexpect @@ -425,6 +427,7 @@ pyyaml==6.0.2 # via # -r /awx_devel/requirements/requirements.in # ansible-runner + # dispatcherd # djangorestframework-yaml # kubernetes # receptorctl