Add timeout and on_duplicate to system tasks (#16169)

Modify the @task_awx decorator to accept timeout and
on_duplicate keyword arguments. These arguments are
only used in the new dispatcherd implementation.

Add the decorator params:
- timeout
- on_duplicate

to system tasks to ensure better recovery from
stuck or long-running processes; a usage sketch
follows below.
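
For illustration, a minimal sketch of a task using both new params
(the task name below is hypothetical, the import alias for task_awx
is an assumption, and the values mirror ones used in this commit):

    from awx.main.dispatch import get_task_queuename
    from awx.main.dispatch.publish import task as task_awx  # alias assumed

    # Abort the run if it exceeds 10 minutes; if an identical task is
    # already active, keep at most one extra copy queued ('discard'
    # would drop the duplicate submission instead; assumed semantics).
    @task_awx(queue=get_task_queuename, timeout=600, on_duplicate='queue_one')
    def example_cleanup_task():
        ...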

---------

Signed-off-by: Seth Foster <fosterbseth@gmail.com>

Seth Foster committed 2025-11-12 23:18:57 -05:00 (via GitHub)
parent f81859510c
commit 2fa2cd8beb
6 changed files with 44 additions and 46 deletions


@@ -9,7 +9,7 @@ from awx.main.dispatch import get_task_queuename
 logger = logging.getLogger('awx.main.scheduler')


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=300, on_duplicate='discard')
 def send_subsystem_metrics():
     DispatcherMetrics().send_metrics()
     CallbackReceiverMetrics().send_metrics()


@@ -5,6 +5,7 @@ import time
 from uuid import uuid4

 from dispatcherd.publish import submit_task
+from dispatcherd.processors.blocker import Blocker
 from dispatcherd.utils import resolve_callable

 from django_guid import get_guid

@@ -60,13 +61,17 @@ class task:
         print(f"Time I was dispatched: {dispatch_time}")
     """

-    def __init__(self, queue=None, bind_kwargs=None):
+    def __init__(self, queue=None, bind_kwargs=None, timeout=None, on_duplicate=None):
         self.queue = queue
         self.bind_kwargs = bind_kwargs
+        self.timeout = timeout
+        self.on_duplicate = on_duplicate

     def __call__(self, fn=None):
         queue = self.queue
         bind_kwargs = self.bind_kwargs
+        timeout = self.timeout
+        on_duplicate = self.on_duplicate

         class PublisherMixin(object):
             queue = None

@@ -102,7 +107,19 @@ class task:
                     if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
                         # At this point we have the import string, and submit_task wants the method, so back to that
                         actual_task = resolve_callable(cls.name)
-                        return submit_task(actual_task, args=args, kwargs=kwargs, queue=queue, uuid=uuid, **kw)
+                        processor_options = ()
+                        if on_duplicate is not None:
+                            processor_options = (Blocker.Params(on_duplicate=on_duplicate),)
+                        return submit_task(
+                            actual_task,
+                            args=args,
+                            kwargs=kwargs,
+                            queue=queue,
+                            uuid=uuid,
+                            timeout=timeout,
+                            processor_options=processor_options,
+                            **kw,
+                        )
                 except Exception:
                     logger.exception(f"[DISPATCHER] Failed to check for alternative dispatcherd implementation for {cls.name}")
                     # Continue with original implementation if anything fails
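
With the decorator change above, timeout is passed straight through to
dispatcherd's submit_task, and on_duplicate is wrapped in a Blocker.Params
processor option. A rough sketch of the equivalent direct submission (the
task function is hypothetical, and the runtime semantics noted in the
comments are assumed to be dispatcherd's):

    from dispatcherd.processors.blocker import Blocker
    from dispatcherd.publish import submit_task

    def example_task():  # hypothetical task body
        ...

    submit_task(
        example_task,
        queue='tower_broadcast_all',
        timeout=600,  # presumably seconds before dispatcherd gives up on the run
        processor_options=(Blocker.Params(on_duplicate='discard'),),
    )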


@@ -159,7 +159,7 @@ def cleanup_old_indirect_host_entries() -> None:
     IndirectManagedNodeAudit.objects.filter(created__lt=limit).delete()


-@task(queue=get_task_queuename)
+@task(queue=get_task_queuename, timeout=3600 * 5)
 def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None:
     try:
         job = Job.objects.get(id=job_id)

@@ -201,7 +201,7 @@ def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None:
         logger.exception(f'Error processing indirect host data for job_id={job_id}')


-@task(queue=get_task_queuename)
+@task(queue=get_task_queuename, timeout=3600 * 5)
 def cleanup_and_save_indirect_host_entries_fallback() -> None:
     if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
         return


@@ -852,7 +852,7 @@ def reload_receptor():
         raise RuntimeError("Receptor reload failed")


-@task_awx()
+@task_awx(on_duplicate='queue_one')
 def write_receptor_config():
     """
     This task runs async on each control node, K8S only.

@@ -875,7 +875,7 @@ def write_receptor_config():
     reload_receptor()


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, on_duplicate='discard')
 def remove_deprovisioned_node(hostname):
     InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
     InstanceLink.objects.filter(target__instance__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)


@@ -184,7 +184,7 @@ def inform_cluster_of_shutdown():
     logger.warning("Normal shutdown processed for instance %s; instance removed from capacity pool.", inst.hostname)


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5)
 def migrate_jsonfield(table, pkfield, columns):
     batchsize = 10000
     with advisory_lock(f'json_migration_{table}', wait=False) as acquired:

@@ -230,7 +230,7 @@ def migrate_jsonfield(table, pkfield, columns):
     logger.warning(f"Migration of {table} to jsonb is finished.")


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='queue_one')
 def apply_cluster_membership_policies():
     from awx.main.signals import disable_activity_stream

@@ -342,7 +342,7 @@ def apply_cluster_membership_policies():
         logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))


-@task_awx(queue='tower_settings_change')
+@task_awx(queue='tower_settings_change', timeout=600)
 def clear_setting_cache(setting_keys):
     # log that cache is being cleared
     logger.info(f"clear_setting_cache of keys {setting_keys}")
@@ -355,7 +355,7 @@ def clear_setting_cache(setting_keys):
     cache.delete_many(cache_keys)


-@task_awx(queue='tower_broadcast_all')
+@task_awx(queue='tower_broadcast_all', timeout=600)
 def delete_project_files(project_path):
     # TODO: possibly implement some retry logic
     lock_file = project_path + '.lock'

@@ -383,7 +383,7 @@ def profile_sql(threshold=1, minutes=1):
         logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=1800)
 def send_notifications(notification_list, job_id=None):
     if not isinstance(notification_list, list):
         raise TypeError("notification_list should be of type list")

@@ -428,13 +428,13 @@ def events_processed_hook(unified_job):
         save_indirect_host_entries.delay(unified_job.id)


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5, on_duplicate='discard')
 def gather_analytics():
     if is_run_threshold_reached(getattr(settings, 'AUTOMATION_ANALYTICS_LAST_GATHER', None), settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
         analytics.gather()


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=600, on_duplicate='queue_one')
 def purge_old_stdout_files():
     nowtime = time.time()
     for f in os.listdir(settings.JOBOUTPUT_ROOT):
@@ -496,37 +496,18 @@ class CleanupImagesAndFiles:
         cls.run_remote(this_inst, **kwargs)


-@task_awx(queue='tower_broadcast_all')
+@task_awx(queue='tower_broadcast_all', timeout=3600)
 def handle_removed_image(remove_images=None):
     """Special broadcast invocation of this method to handle case of deleted EE"""
     CleanupImagesAndFiles.run(remove_images=remove_images, file_pattern='')


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='queue_one')
 def cleanup_images_and_files():
     CleanupImagesAndFiles.run(image_prune=True)


-@task_awx(queue=get_task_queuename)
-def cluster_node_health_check(node):
-    """
-    Used for the health check endpoint, refreshes the status of the instance, but must be ran on target node
-    """
-    if node == '':
-        logger.warning('Local health check incorrectly called with blank string')
-        return
-    elif node != settings.CLUSTER_HOST_ID:
-        logger.warning(f'Local health check for {node} incorrectly sent to {settings.CLUSTER_HOST_ID}')
-        return
-    try:
-        this_inst = Instance.objects.me()
-    except Instance.DoesNotExist:
-        logger.warning(f'Instance record for {node} missing, could not check capacity.')
-        return
-    this_inst.local_health_check()
-
-
-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=600, on_duplicate='queue_one')
 def execution_node_health_check(node):
     if node == '':
         logger.warning('Remote health check incorrectly called with blank string')
@@ -850,7 +831,7 @@ def _heartbeat_handle_lost_instances(lost_instances, this_inst):
             logger.exception('No SQL state available. Error marking {} as lost'.format(other_inst.hostname))


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=1800, on_duplicate='queue_one')
 def awx_receptor_workunit_reaper():
     """
     When an AWX job is launched via receptor, files such as status, stdin, and stdout are created

@@ -896,7 +877,7 @@ def awx_receptor_workunit_reaper():
     administrative_workunit_reaper(receptor_work_list)


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=1800, on_duplicate='queue_one')
 def awx_k8s_reaper():
     if not settings.RECEPTOR_RELEASE_WORK:
         return

@@ -919,7 +900,7 @@ def awx_k8s_reaper():
                 logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5, on_duplicate='discard')
 def awx_periodic_scheduler():
     lock_session_timeout_milliseconds = settings.TASK_MANAGER_LOCK_TIMEOUT * 1000
     with advisory_lock('awx_periodic_scheduler_lock', lock_session_timeout_milliseconds=lock_session_timeout_milliseconds, wait=False) as acquired:

@@ -978,7 +959,7 @@ def awx_periodic_scheduler():
         emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600)
 def handle_failure_notifications(task_ids):
     """A task-ified version of the method that sends notifications."""
     found_task_ids = set()
@@ -993,7 +974,7 @@ def handle_failure_notifications(task_ids):
         logger.warning(f'Could not send notifications for {deleted_tasks} because they were not found in the database')


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5)
 def update_inventory_computed_fields(inventory_id):
     """
     Signal handler and wrapper around inventory.update_computed_fields to

@@ -1043,7 +1024,7 @@ def update_smart_memberships_for_inventory(smart_inventory):
     return False


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='queue_one')
 def update_host_smart_inventory_memberships():
     smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False)
     changed_inventories = set([])

@@ -1059,7 +1040,7 @@ def update_host_smart_inventory_memberships():
         smart_inventory.update_computed_fields()


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600 * 5)
 def delete_inventory(inventory_id, user_id, retries=5):
     # Delete inventory as user
     if user_id is None:

@@ -1121,7 +1102,7 @@ def _reconstruct_relationships(copy_mapping):
         new_obj.save()


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=600)
 def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, permission_check_func=None):
     logger.debug('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk))

@@ -1176,7 +1157,7 @@ def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, permission_check_func=None):
     update_inventory_computed_fields.delay(new_obj.id)


-@task_awx(queue=get_task_queuename)
+@task_awx(queue=get_task_queuename, timeout=3600, on_duplicate='discard')
 def periodic_resource_sync():
     if not getattr(settings, 'RESOURCE_SERVER', None):
         logger.debug("Skipping periodic resource_sync, RESOURCE_SERVER not configured")


@@ -139,7 +139,7 @@ def construct_rsyslog_conf_template(settings=settings):
     return tmpl


-@task_awx(queue='rsyslog_configurer')
+@task_awx(queue='rsyslog_configurer', timeout=600, on_duplicate='queue_one')
 def reconfigure_rsyslog():
     tmpl = construct_rsyslog_conf_template()
     # Write config to a temp file then move it to preserve atomicity