From 2fa2cd8beb3b8c47e65d0ac45338500805c51506 Mon Sep 17 00:00:00 2001
From: Seth Foster
Date: Wed, 12 Nov 2025 23:18:57 -0500
Subject: [PATCH] Add timeout and on duplicate to system tasks (#16169)

Modify the invocation of @task_awx to accept timeout and on_duplicate
keyword arguments. These arguments are only used in the new dispatcher
implementation.

Add decorator params:
- timeout
- on_duplicate
to tasks to ensure better recovery from stuck or long-running processes.

---------

Signed-off-by: Seth Foster
---
 awx/main/analytics/analytics_tasks.py |  2 +-
 awx/main/dispatch/publish.py          | 21 +++++++++-
 awx/main/tasks/host_indirect.py       |  4 +-
 awx/main/tasks/receptor.py            |  4 +-
 awx/main/tasks/system.py              | 57 +++++++++------------------
 awx/main/utils/external_logging.py    |  2 +-
 6 files changed, 44 insertions(+), 46 deletions(-)

diff --git a/awx/main/analytics/analytics_tasks.py b/awx/main/analytics/analytics_tasks.py
index 481db3e57e..3ab6d4bad1 100644
--- a/awx/main/analytics/analytics_tasks.py
+++ b/awx/main/analytics/analytics_tasks.py
@@ -9,7 +9,7 @@ from awx.main.dispatch import get_task_queuename
 logger = logging.getLogger('awx.main.scheduler')
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=300, on_duplicate='discard')
 def send_subsystem_metrics():
     DispatcherMetrics().send_metrics()
     CallbackReceiverMetrics().send_metrics()
diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py
index ebd8c35fea..4aef040a88 100644
--- a/awx/main/dispatch/publish.py
+++ b/awx/main/dispatch/publish.py
@@ -5,6 +5,7 @@ import time
 from uuid import uuid4
 
 from dispatcherd.publish import submit_task
+from dispatcherd.processors.blocker import Blocker
 from dispatcherd.utils import resolve_callable
 
 from django_guid import get_guid
@@ -60,13 +61,17 @@ class task:
         print(f"Time I was dispatched: {dispatch_time}")
     """
 
-    def __init__(self, queue=None, bind_kwargs=None):
+    def __init__(self, queue=None, bind_kwargs=None, timeout=None, on_duplicate=None):
         self.queue = queue
         self.bind_kwargs = bind_kwargs
+        self.timeout = timeout
+        self.on_duplicate = on_duplicate
 
     def __call__(self, fn=None):
         queue = self.queue
         bind_kwargs = self.bind_kwargs
+        timeout = self.timeout
+        on_duplicate = self.on_duplicate
 
         class PublisherMixin(object):
             queue = None
@@ -102,7 +107,19 @@ class task:
                     if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
                         # At this point we have the import string, and submit_task wants the method, so back to that
                         actual_task = resolve_callable(cls.name)
-                        return submit_task(actual_task, args=args, kwargs=kwargs, queue=queue, uuid=uuid, **kw)
+                        processor_options = ()
+                        if on_duplicate is not None:
+                            processor_options = (Blocker.Params(on_duplicate=on_duplicate),)
+                        return submit_task(
+                            actual_task,
+                            args=args,
+                            kwargs=kwargs,
+                            queue=queue,
+                            uuid=uuid,
+                            timeout=timeout,
+                            processor_options=processor_options,
+                            **kw,
+                        )
                 except Exception:
                     logger.exception(f"[DISPATCHER] Failed to check for alternative dispatcherd implementation for {cls.name}")
                     # Continue with original implementation if anything fails
diff --git a/awx/main/tasks/host_indirect.py b/awx/main/tasks/host_indirect.py
index 632a04a687..11f32c248a 100644
--- a/awx/main/tasks/host_indirect.py
+++ b/awx/main/tasks/host_indirect.py
@@ -159,7 +159,7 @@ def cleanup_old_indirect_host_entries() -> None:
     IndirectManagedNodeAudit.objects.filter(created__lt=limit).delete()
 
 
-@task(queue=get_task_queuename)
+@task(queue=get_task_queuename, timeout=3600 * 5)
 def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None:
     try:
         job = Job.objects.get(id=job_id)
@@ -201,7 +201,7 @@ def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> Non
         logger.exception(f'Error processing indirect host data for job_id={job_id}')
 
 
-@task(queue=get_task_queuename)
+@task(queue=get_task_queuename, timeout=3600 * 5)
 def cleanup_and_save_indirect_host_entries_fallback() -> None:
     if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
         return
diff --git a/awx/main/tasks/receptor.py b/awx/main/tasks/receptor.py
index 9ef7522c47..ad62b315be 100644
--- a/awx/main/tasks/receptor.py
+++ b/awx/main/tasks/receptor.py
@@ -852,7 +852,7 @@ def reload_receptor():
         raise RuntimeError("Receptor reload failed")
 
 
-@task_awx()
+@task_awx(on_duplicate='queue_one')
 def write_receptor_config():
     """
     This task runs async on each control node, K8S only.
@@ -875,7 +875,7 @@ def write_receptor_config():
     reload_receptor()
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, on_duplicate='discard')
 def remove_deprovisioned_node(hostname):
     InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
     InstanceLink.objects.filter(target__instance__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index d80bace63d..679e1786dc 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -184,7 +184,7 @@ def inform_cluster_of_shutdown():
     logger.warning("Normal shutdown processed for instance %s; instance removed from capacity pool.", inst.hostname)
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5)
 def migrate_jsonfield(table, pkfield, columns):
     batchsize = 10000
     with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
@@ -230,7 +230,7 @@ def migrate_jsonfield(table, pkfield, columns):
     logger.warning(f"Migration of {table} to jsonb is finished.")
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='queue_one')
 def apply_cluster_membership_policies():
     from awx.main.signals import disable_activity_stream
 
@@ -342,7 +342,7 @@ def apply_cluster_membership_policies():
     logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
 
 
-@task_awx(queue='tower_settings_change')
+@task_awx(queue='tower_settings_change', timeout=600)
 def clear_setting_cache(setting_keys):
     # log that cache is being cleared
     logger.info(f"clear_setting_cache of keys {setting_keys}")
@@ -355,7 +355,7 @@ def clear_setting_cache(setting_keys):
     cache.delete_many(cache_keys)
 
 
-@task_awx(queue='tower_broadcast_all')
+@task_awx(queue='tower_broadcast_all', timeout=600)
 def delete_project_files(project_path):
     # TODO: possibly implement some retry logic
     lock_file = project_path + '.lock'
@@ -383,7 +383,7 @@ def profile_sql(threshold=1, minutes=1):
         logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=1800)
 def send_notifications(notification_list, job_id=None):
     if not isinstance(notification_list, list):
         raise TypeError("notification_list should be of type list")
@@ -428,13 +428,13 @@ def events_processed_hook(unified_job):
         save_indirect_host_entries.delay(unified_job.id)
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5, on_duplicate='discard')
 def gather_analytics():
     if is_run_threshold_reached(getattr(settings, 'AUTOMATION_ANALYTICS_LAST_GATHER', None), settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
         analytics.gather()
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=600, on_duplicate='queue_one')
 def purge_old_stdout_files():
     nowtime = time.time()
     for f in os.listdir(settings.JOBOUTPUT_ROOT):
@@ -496,37 +496,18 @@ class CleanupImagesAndFiles:
         cls.run_remote(this_inst, **kwargs)
 
 
-@task_awx(queue='tower_broadcast_all')
+@task_awx(queue='tower_broadcast_all', timeout=3600)
 def handle_removed_image(remove_images=None):
     """Special broadcast invocation of this method to handle case of deleted EE"""
     CleanupImagesAndFiles.run(remove_images=remove_images, file_pattern='')
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='queue_one')
 def cleanup_images_and_files():
     CleanupImagesAndFiles.run(image_prune=True)
 
 
-@task_awx(queue=get_task_queuename)
-def cluster_node_health_check(node):
-    """
-    Used for the health check endpoint, refreshes the status of the instance, but must be ran on target node
-    """
-    if node == '':
-        logger.warning('Local health check incorrectly called with blank string')
-        return
-    elif node != settings.CLUSTER_HOST_ID:
-        logger.warning(f'Local health check for {node} incorrectly sent to {settings.CLUSTER_HOST_ID}')
-        return
-    try:
-        this_inst = Instance.objects.me()
-    except Instance.DoesNotExist:
-        logger.warning(f'Instance record for {node} missing, could not check capacity.')
-        return
-    this_inst.local_health_check()
-
-
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=600, on_duplicate='queue_one')
 def execution_node_health_check(node):
     if node == '':
         logger.warning('Remote health check incorrectly called with blank string')
@@ -850,7 +831,7 @@ def _heartbeat_handle_lost_instances(lost_instances, this_inst):
             logger.exception('No SQL state available. Error marking {} as lost'.format(other_inst.hostname))
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=1800, on_duplicate='queue_one')
 def awx_receptor_workunit_reaper():
     """
     When an AWX job is launched via receptor, files such as status, stdin, and stdout are created
@@ -896,7 +877,7 @@ def awx_receptor_workunit_reaper():
     administrative_workunit_reaper(receptor_work_list)
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=1800, on_duplicate='queue_one')
 def awx_k8s_reaper():
     if not settings.RECEPTOR_RELEASE_WORK:
         return
@@ -919,7 +900,7 @@ def awx_k8s_reaper():
                 logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5, on_duplicate='discard')
 def awx_periodic_scheduler():
     lock_session_timeout_milliseconds = settings.TASK_MANAGER_LOCK_TIMEOUT * 1000
     with advisory_lock('awx_periodic_scheduler_lock', lock_session_timeout_milliseconds=lock_session_timeout_milliseconds, wait=False) as acquired:
@@ -978,7 +959,7 @@ def awx_periodic_scheduler():
         emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600)
 def handle_failure_notifications(task_ids):
     """A task-ified version of the method that sends notifications."""
     found_task_ids = set()
@@ -993,7 +974,7 @@ def handle_failure_notifications(task_ids):
         logger.warning(f'Could not send notifications for {deleted_tasks} because they were not found in the database')
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5)
 def update_inventory_computed_fields(inventory_id):
     """
     Signal handler and wrapper around inventory.update_computed_fields to
@@ -1043,7 +1024,7 @@ def update_smart_memberships_for_inventory(smart_inventory):
     return False
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='queue_one')
 def update_host_smart_inventory_memberships():
     smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False)
     changed_inventories = set([])
@@ -1059,7 +1040,7 @@ def update_host_smart_inventory_memberships():
         smart_inventory.update_computed_fields()
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5)
 def delete_inventory(inventory_id, user_id, retries=5):
     # Delete inventory as user
     if user_id is None:
@@ -1121,7 +1102,7 @@ def _reconstruct_relationships(copy_mapping):
         new_obj.save()
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=600)
 def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, permission_check_func=None):
     logger.debug('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk))
 
@@ -1176,7 +1157,7 @@ def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, p
     update_inventory_computed_fields.delay(new_obj.id)
 
 
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='discard')
 def periodic_resource_sync():
     if not getattr(settings, 'RESOURCE_SERVER', None):
         logger.debug("Skipping periodic resource_sync, RESOURCE_SERVER not configured")
diff --git a/awx/main/utils/external_logging.py b/awx/main/utils/external_logging.py
index 420b5073bc..21aa104a15 100644
--- a/awx/main/utils/external_logging.py
+++ b/awx/main/utils/external_logging.py
@@ -139,7 +139,7 @@ def construct_rsyslog_conf_template(settings=settings):
     return tmpl
 
 
-@task_awx(queue='rsyslog_configurer')
+@task_awx(queue='rsyslog_configurer', timeout=600, on_duplicate='queue_one')
 def reconfigure_rsyslog():
     tmpl = construct_rsyslog_conf_template()
     # Write config to a temp file then move it to preserve atomicity
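
Illustrative usage (not part of the patch): the sketch below shows how a
system task can opt into the new decorator arguments. It is a minimal example
under stated assumptions: the task name and body are hypothetical, and only
the timeout/on_duplicate keywords and their routing into dispatcherd's
submit_task come from this patch.

    from awx.main.dispatch import get_task_queuename
    from awx.main.dispatch.publish import task as task_awx


    @task_awx(queue=get_task_queuename, timeout=600, on_duplicate='discard')
    def example_periodic_cleanup():
        # Hypothetical body, for illustration only. With
        # FEATURE_DISPATCHERD_ENABLED, timeout=600 caps the run at ten
        # minutes, and on_duplicate='discard' is passed to submit_task as
        # Blocker.Params(on_duplicate='discard') so a duplicate submission
        # is dropped; 'queue_one' (used elsewhere in this patch) presumably
        # allows at most one queued copy instead.
        pass

On the legacy code path these keywords are accepted and stored on the
decorator but have no effect, matching the commit message's note that they
are only used in the new dispatcher implementation.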