awx/awx/main/dispatch/worker/callback.py
Alan Rominger f80bbc57d8 AAP-43117 Additional dispatcher removal simplifications and waiting reaper updates (#16243)
* Additional dispatcher removal simplifications and waiting reaper updates

* Fix double call and logging message

* Implement bugbot comment, should reap running on lost instances

* Add test case for new pending behavior
2026-01-26 13:55:37 -05:00

import json
import logging
import os
import signal
import time
import datetime
from queue import Empty as QueueEmpty
from django.conf import settings
from django.utils.functional import cached_property
from django.utils.timezone import now as tz_now
from django import db
from django.db import transaction, connection as django_connection
from django_guid import set_guid
import psutil
import redis
from awx.main.utils.redis import get_redis_client
from awx.main.utils.db import set_connection_name
from awx.main.consumers import emit_channel_notification
from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob
from awx.main.constants import ACTIVE_STATES
from awx.main.models.events import emit_event_detail
from awx.main.utils.profiling import AWXProfiler
from awx.main.tasks.system import events_processed_hook
import awx.main.analytics.subsystem_metrics as s_metrics
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
def job_stats_wrapup(job_identifier, event=None):
"""Fill in the unified job host_status_counts, fire off notifications if needed"""
try:
# empty dict (versus default of None) can still indicate that events have been processed
# for job types like system jobs, and jobs with no hosts matched
host_status_counts = {}
if event:
host_status_counts = event.get_host_status_counts()
# Update host_status_counts while holding the row lock
with transaction.atomic():
uj = UnifiedJob.objects.select_for_update().get(pk=job_identifier)
uj.host_status_counts = host_status_counts
uj.save(update_fields=['host_status_counts'])
uj.log_lifecycle("stats_wrapup_finished")
# If the status was a finished state before this update was made, send notifications
# If not, we will send notifications when the status changes
if uj.status not in ACTIVE_STATES:
events_processed_hook(uj)
except Exception:
logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))
class WorkerSignalHandler:
def __init__(self):
self.kill_now = False
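        # restore the default SIGTERM handler; graceful shutdown is requested via SIGINT instead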
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, self.exit_gracefully)
def exit_gracefully(self, *args, **kwargs):
self.kill_now = True
class CallbackBrokerWorker:
"""
A worker implementation that deserializes callback event data and persists
it into the database.
The code that *generates* these types of messages is found in the
ansible-runner display callback plugin.
"""
MAX_RETRIES = 2
INDIVIDUAL_EVENT_RETRIES = 3
last_stats = time.time()
last_flush = time.time()
total = 0
last_event = ''
prof = None
def __init__(self):
self.buff = {}
self.redis = get_redis_client()
self.subsystem_metrics = s_metrics.CallbackReceiverMetrics(auto_pipe_execute=False)
self.queue_pop = 0
self.queue_name = settings.CALLBACK_QUEUE
self.prof = AWXProfiler("CallbackBrokerWorker")
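        # clear out stale per-worker statistics keys left behind by previous processes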
for key in self.redis.keys('awx_callback_receiver_statistics_*'):
self.redis.delete(key)
@cached_property
def pid(self):
"""This needs to be obtained after forking, or else it will give the parent process"""
return os.getpid()
def read(self):
has_redis_error = False
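        # blpop blocks for up to 1 second; a timeout yields a FLUSH sentinel so buffered events still get written while the queue is idle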
try:
res = self.redis.blpop(self.queue_name, timeout=1)
if res is None:
return {'event': 'FLUSH'}
self.total += 1
self.queue_pop += 1
self.subsystem_metrics.inc('callback_receiver_events_popped_redis', 1)
self.subsystem_metrics.inc('callback_receiver_events_in_memory', 1)
return json.loads(res[1])
except redis.exceptions.ConnectionError as exc:
            # low-noise log, because this is very common and many workers will write it
logger.error(f"redis connection error: {exc}")
has_redis_error = True
time.sleep(5)
except redis.exceptions.RedisError:
logger.exception("encountered an error communicating with redis")
has_redis_error = True
time.sleep(1)
except (json.JSONDecodeError, KeyError):
logger.exception("failed to decode JSON message from redis")
finally:
if not has_redis_error:
self.record_statistics()
self.record_read_metrics()
return {'event': 'FLUSH'}
def record_read_metrics(self):
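        # only report redis queue depth when this worker actually popped events and the metrics pipeline is due to execute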
if self.queue_pop == 0:
return
if self.subsystem_metrics.should_pipe_execute() is True:
queue_size = self.redis.llen(self.queue_name)
self.subsystem_metrics.set('callback_receiver_events_queue_size_redis', queue_size)
self.subsystem_metrics.pipe_execute()
self.queue_pop = 0
def record_statistics(self):
        # throttle stat recording to once per JOB_EVENT_STATISTICS_INTERVAL (5s by default)
if time.time() - self.last_stats > settings.JOB_EVENT_STATISTICS_INTERVAL:
try:
self.redis.set(f'awx_callback_receiver_statistics_{self.pid}', self.debug())
self.last_stats = time.time()
except Exception:
logger.exception("encountered an error communicating with redis")
self.last_stats = time.time()
def debug(self):
return f'. worker[pid:{self.pid}] sent={self.total} rss={self.mb}MB {self.last_event}'
@property
def mb(self):
return '{:0.3f}'.format(psutil.Process(self.pid).memory_info().rss / 1024.0 / 1024.0)
def toggle_profiling(self, *args):
if not self.prof.is_started():
self.prof.start()
logger.error('profiling is enabled')
else:
filepath = self.prof.stop()
logger.error(f'profiling is disabled, wrote {filepath}')
def work_loop(self, idx, *args):
if settings.AWX_CALLBACK_PROFILE:
signal.signal(signal.SIGUSR1, self.toggle_profiling)
ppid = os.getppid()
signal_handler = WorkerSignalHandler()
set_connection_name('worker') # set application_name to distinguish from other dispatcher processes
while not signal_handler.kill_now:
            # if the parent PID changes, this process has been orphaned
            # (e.g., via segfault or SIGKILL) and we should exit too
if os.getppid() != ppid:
break
try:
                body = self.read()  # the callback receiver only reads from redis
if body == 'QUIT':
break
except QueueEmpty:
continue
except Exception:
logger.exception("Exception on worker {}, reconnecting: ".format(idx))
continue
try:
for conn in db.connections.all():
# If the database connection has a hiccup during the prior message, close it
# so we can establish a new connection
conn.close_if_unusable_or_obsolete()
self.perform_work(body, *args)
except Exception:
logger.exception(f'Unhandled exception in perform_work in worker pid={os.getpid()}')
logger.debug('worker exiting gracefully pid:{}'.format(os.getpid()))
def flush(self, force=False):
now = tz_now()
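        # flush when forced, when the buffer is older than JOB_EVENT_BUFFER_SECONDS, or when any event class has 1000+ buffered events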
if force or (time.time() - self.last_flush) > settings.JOB_EVENT_BUFFER_SECONDS or any([len(events) >= 1000 for events in self.buff.values()]):
metrics_bulk_events_saved = 0
metrics_singular_events_saved = 0
metrics_events_batch_save_errors = 0
metrics_events_broadcast = 0
metrics_events_missing_created = 0
metrics_total_job_event_processing_seconds = datetime.timedelta(seconds=0)
for cls, events in self.buff.items():
if not events:
continue
logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})')
for e in events:
                    e.modified = now  # safe to set before created because now was captured at the top of flush()
if not e.created:
e.created = now
metrics_events_missing_created += 1
else: # only calculate the seconds if the created time already has been set
metrics_total_job_event_processing_seconds += e.modified - e.created
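                # time the database insert(s) for this event class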
metrics_duration_to_save = time.perf_counter()
saved_events = []
try:
cls.objects.bulk_create(events)
metrics_bulk_events_saved += len(events)
saved_events = events
self.buff[cls] = []
except Exception as exc:
                    # If the database is flaking, let ensure_connection raise a general exception that
                    # will be caught by the outer loop, which goes into a proper sleep-and-retry loop
                    django_connection.ensure_connection()
                    logger.warning(f'Error in events bulk_create, will try individually, error: {str(exc)}')
# if an exception occurs, we should re-attempt to save the
# events one-by-one, because something in the list is
# broken/stale
metrics_events_batch_save_errors += 1
for e in events.copy():
try:
e.save()
metrics_singular_events_saved += 1
                            events.remove(e)  # Importantly, remove successfully saved events from the buffer
                            saved_events.append(e)  # track saved events so websocket messages fire below
except Exception as exc_indv:
retry_count = getattr(e, '_retry_count', 0) + 1
e._retry_count = retry_count
                            # special sanitization logic for postgres treatment of the NUL 0x00 char
                            # This used to check the class of the exception, but after the psycopg 3 upgrade it could appear
                            # as either DataError or ValueError, so now let's just check whether it's there.
if (retry_count == 1) and ("\x00" in e.stdout):
e.stdout = e.stdout.replace("\x00", "")
if retry_count >= self.INDIVIDUAL_EVENT_RETRIES:
logger.error(f'Hit max retries ({retry_count}) saving individual Event error: {str(exc_indv)}\ndata:\n{e.__dict__}')
events.remove(e)
else:
logger.info(f'Database Error Saving individual Event uuid={e.uuid} try={retry_count}, error: {str(exc_indv)}')
metrics_duration_to_save = time.perf_counter() - metrics_duration_to_save
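                # emit websocket details for events that reached the database, unless flagged to skip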
for e in saved_events:
if not getattr(e, '_skip_websocket_message', False):
metrics_events_broadcast += 1
emit_event_detail(e)
if getattr(e, '_notification_trigger_event', False):
job_stats_wrapup(getattr(e, e.JOB_REFERENCE), event=e)
self.last_flush = time.time()
# only update metrics if we saved events
if (metrics_bulk_events_saved + metrics_singular_events_saved) > 0:
self.subsystem_metrics.inc('callback_receiver_batch_events_errors', metrics_events_batch_save_errors)
self.subsystem_metrics.inc('callback_receiver_events_insert_db_seconds', metrics_duration_to_save)
self.subsystem_metrics.inc('callback_receiver_events_insert_db', metrics_bulk_events_saved + metrics_singular_events_saved)
self.subsystem_metrics.observe('callback_receiver_batch_events_insert_db', metrics_bulk_events_saved)
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -(metrics_bulk_events_saved + metrics_singular_events_saved))
self.subsystem_metrics.inc('callback_receiver_events_broadcast', metrics_events_broadcast)
self.subsystem_metrics.set(
'callback_receiver_event_processing_avg_seconds',
metrics_total_job_event_processing_seconds.total_seconds()
/ (metrics_bulk_events_saved + metrics_singular_events_saved - metrics_events_missing_created),
)
if self.subsystem_metrics.should_pipe_execute() is True:
self.subsystem_metrics.pipe_execute()
def perform_work(self, body):
try:
flush = body.get('event') == 'FLUSH'
if flush:
self.last_event = ''
if not flush:
job_identifier = 'unknown job'
for cls in (JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent):
if cls.JOB_REFERENCE in body:
job_identifier = body[cls.JOB_REFERENCE]
break
self.last_event = f'\n\t- {cls.__name__} for #{job_identifier} ({body.get("event", "")} {body.get("uuid", "")})' # noqa
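                # the class-specific WRAPUP_EVENT marks when host summary stats can be computed and notifications may fire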
notification_trigger_event = bool(body.get('event') == cls.WRAPUP_EVENT)
if body.get('event') == 'EOF':
try:
if 'guid' in body:
set_guid(body['guid'])
final_counter = body.get('final_counter', 0)
logger.info('Starting EOF event processing for Job {}'.format(job_identifier))
# EOF events are sent when stdout for the running task is
# closed. don't actually persist them to the database; we
# just use them to report `summary` websocket events as an
# approximation for when a job is "done"
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter))
if notification_trigger_event:
job_stats_wrapup(job_identifier)
except Exception:
logger.exception('Worker failed to perform EOF tasks: Job {}'.format(job_identifier))
finally:
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -1)
set_guid('')
return
skip_websocket_message = body.pop('skip_websocket_message', False)
event = cls.create_from_data(**body)
if skip_websocket_message: # if this event sends websocket messages, fire them off on flush
event._skip_websocket_message = True
if notification_trigger_event: # if this is an Ansible stats event, ensure notifications on flush
event._notification_trigger_event = True
self.buff.setdefault(cls, []).append(event)
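            # flush() hits the database; on failure, retry with a linearly increasing delay before giving up and dropping the buffer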
retries = 0
while retries <= self.MAX_RETRIES:
try:
self.flush(force=flush)
break
except Exception as exc:
                    # Aside from bugs, exceptions here are assumed to be due to database flake
if retries >= self.MAX_RETRIES:
logger.exception('Worker could not re-establish database connectivity, giving up on one or more events.')
self.buff = {}
return
delay = 60 * retries
logger.warning(f'Database Error Flushing Job Events, retry #{retries + 1} in {delay} seconds: {str(exc)}')
django_connection.close()
time.sleep(delay)
retries += 1
except Exception:
logger.exception(f'Callback Task Processor Raised Unexpected Exception processing event data:\n{body}')