From 29d60844a8cee16bcd31657cecc444a692ade689 Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Fri, 29 Apr 2022 13:54:31 -0400 Subject: [PATCH] Fix notification timing issue by sending in the latter of 2 events (#12110) * Track host_status_counts and use that to process notifications * Remove now unused setting * Back out changes to callback class not needed after all * Skirt the need for duck typing by leaning on the cached field * Delete tests for deleted task * Revert "Back out changes to callback class not needed after all" This reverts commit 3b8ae350d218991d42bffd65ce4baac6f41926b2. * Directly hardcode stats_event_type for callback class * Fire notifications if stats event was never sent * Remove test content for deleted methods * Add placeholder for when no hosts matched * Make field default be None, denote events processed with empty dict * Make UI process null value for host_status_counts * Fix tracking of EOF dispatch for system jobs * Reorganize EVENT_MAP into class properties * Consolidate conditional I missed from EVENT_MAP refactor * Give up on the null condition, also applies for empty hosts * Remove cls position argument not being used * Move wrapup method out of class, add tests --- awx/api/serializers.py | 29 -------- awx/main/dispatch/worker/callback.py | 72 +++++++++++-------- .../0161_unifiedjob_host_status_counts.py | 18 +++++ awx/main/models/events.py | 18 ++--- awx/main/models/notifications.py | 15 +--- awx/main/models/unified_jobs.py | 7 ++ awx/main/tasks/callback.py | 28 ++++---- awx/main/tasks/jobs.py | 11 +-- awx/main/tasks/system.py | 19 ----- .../commands/test_callback_receiver.py | 26 +++++++ .../api/serializers/test_job_serializers.py | 45 ++---------- awx/main/tests/unit/test_tasks.py | 20 ------ awx/main/utils/update_model.py | 7 +- awx/settings/defaults.py | 3 - awx/ui/src/screens/Job/JobOutput/JobOutput.js | 2 +- 15 files changed, 139 insertions(+), 181 deletions(-) create mode 100644 awx/main/migrations/0161_unifiedjob_host_status_counts.py create mode 100644 awx/main/tests/functional/commands/test_callback_receiver.py diff --git a/awx/api/serializers.py b/awx/api/serializers.py index af76f96f20..f70e582c89 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -1607,7 +1607,6 @@ class ProjectUpdateSerializer(UnifiedJobSerializer, ProjectOptionsSerializer): class ProjectUpdateDetailSerializer(ProjectUpdateSerializer): - host_status_counts = serializers.SerializerMethodField(help_text=_('A count of hosts uniquely assigned to each status.')) playbook_counts = serializers.SerializerMethodField(help_text=_('A count of all plays and tasks for the job run.')) class Meta: @@ -1622,14 +1621,6 @@ class ProjectUpdateDetailSerializer(ProjectUpdateSerializer): return data - def get_host_status_counts(self, obj): - try: - counts = obj.project_update_events.only('event_data').get(event='playbook_on_stats').get_host_status_counts() - except ProjectUpdateEvent.DoesNotExist: - counts = {} - - return counts - class ProjectUpdateListSerializer(ProjectUpdateSerializer, UnifiedJobListSerializer): class Meta: @@ -3107,7 +3098,6 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer): class JobDetailSerializer(JobSerializer): - host_status_counts = serializers.SerializerMethodField(help_text=_('A count of hosts uniquely assigned to each status.')) playbook_counts = serializers.SerializerMethodField(help_text=_('A count of all plays and tasks for the job run.')) custom_virtualenv = serializers.ReadOnlyField() @@ -3123,14 +3113,6 @@ class JobDetailSerializer(JobSerializer): return data - def get_host_status_counts(self, obj): - try: - counts = obj.get_event_queryset().only('event_data').get(event='playbook_on_stats').get_host_status_counts() - except JobEvent.DoesNotExist: - counts = {} - - return counts - class JobCancelSerializer(BaseSerializer): @@ -3319,21 +3301,10 @@ class AdHocCommandSerializer(UnifiedJobSerializer): class AdHocCommandDetailSerializer(AdHocCommandSerializer): - - host_status_counts = serializers.SerializerMethodField(help_text=_('A count of hosts uniquely assigned to each status.')) - class Meta: model = AdHocCommand fields = ('*', 'host_status_counts') - def get_host_status_counts(self, obj): - try: - counts = obj.ad_hoc_command_events.only('event_data').get(event='playbook_on_stats').get_host_status_counts() - except AdHocCommandEvent.DoesNotExist: - counts = {} - - return counts - class AdHocCommandCancelSerializer(AdHocCommandSerializer): diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 5026e72c06..8decb39c22 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -7,7 +7,7 @@ import traceback from django.conf import settings from django.utils.timezone import now as tz_now -from django.db import DatabaseError, OperationalError, connection as django_connection +from django.db import DatabaseError, OperationalError, transaction, connection as django_connection from django.db.utils import InterfaceError, InternalError from django_guid import set_guid @@ -16,8 +16,8 @@ import psutil import redis from awx.main.consumers import emit_channel_notification -from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob, Job -from awx.main.tasks.system import handle_success_and_failure_notifications +from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob +from awx.main.constants import ACTIVE_STATES from awx.main.models.events import emit_event_detail from awx.main.utils.profiling import AWXProfiler import awx.main.analytics.subsystem_metrics as s_metrics @@ -26,6 +26,32 @@ from .base import BaseWorker logger = logging.getLogger('awx.main.commands.run_callback_receiver') +def job_stats_wrapup(job_identifier, event=None): + """Fill in the unified job host_status_counts, fire off notifications if needed""" + try: + # empty dict (versus default of None) can still indicate that events have been processed + # for job types like system jobs, and jobs with no hosts matched + host_status_counts = {} + if event: + host_status_counts = event.get_host_status_counts() + + # Update host_status_counts while holding the row lock + with transaction.atomic(): + uj = UnifiedJob.objects.select_for_update().get(pk=job_identifier) + uj.host_status_counts = host_status_counts + uj.save(update_fields=['host_status_counts']) + + uj.log_lifecycle("stats_wrapup_finished") + + # If the status was a finished state before this update was made, send notifications + # If not, we will send notifications when the status changes + if uj.status not in ACTIVE_STATES: + uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed') + + except Exception: + logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier)) + + class CallbackBrokerWorker(BaseWorker): """ A worker implementation that deserializes callback event data and persists @@ -146,6 +172,8 @@ class CallbackBrokerWorker(BaseWorker): if not getattr(e, '_skip_websocket_message', False): metrics_events_broadcast += 1 emit_event_detail(e) + if getattr(e, '_notification_trigger_event', False): + job_stats_wrapup(getattr(e, e.JOB_REFERENCE), event=e) self.buff = {} self.last_flush = time.time() # only update metrics if we saved events @@ -165,47 +193,32 @@ class CallbackBrokerWorker(BaseWorker): if flush: self.last_event = '' if not flush: - event_map = { - 'job_id': JobEvent, - 'ad_hoc_command_id': AdHocCommandEvent, - 'project_update_id': ProjectUpdateEvent, - 'inventory_update_id': InventoryUpdateEvent, - 'system_job_id': SystemJobEvent, - } - job_identifier = 'unknown job' - for key, cls in event_map.items(): - if key in body: - job_identifier = body[key] + for cls in (JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent): + if cls.JOB_REFERENCE in body: + job_identifier = body[cls.JOB_REFERENCE] break self.last_event = f'\n\t- {cls.__name__} for #{job_identifier} ({body.get("event", "")} {body.get("uuid", "")})' # noqa + notification_trigger_event = bool(body.get('event') == cls.WRAPUP_EVENT) + if body.get('event') == 'EOF': try: if 'guid' in body: set_guid(body['guid']) final_counter = body.get('final_counter', 0) - logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier)) + logger.info('Starting EOF event processing for Job {}'.format(job_identifier)) # EOF events are sent when stdout for the running task is # closed. don't actually persist them to the database; we # just use them to report `summary` websocket events as an # approximation for when a job is "done" emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter)) - # Additionally, when we've processed all events, we should - # have all the data we need to send out success/failure - # notification templates - uj = UnifiedJob.objects.get(pk=job_identifier) - if isinstance(uj, Job): - # *actual playbooks* send their success/failure - # notifications in response to the playbook_on_stats - # event handling code in main.models.events - pass - elif hasattr(uj, 'send_notification_templates'): - handle_success_and_failure_notifications.apply_async([uj.id]) + if notification_trigger_event: + job_stats_wrapup(job_identifier) except Exception: - logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier)) + logger.exception('Worker failed to perform EOF tasks: Job {}'.format(job_identifier)) finally: self.subsystem_metrics.inc('callback_receiver_events_in_memory', -1) set_guid('') @@ -215,9 +228,12 @@ class CallbackBrokerWorker(BaseWorker): event = cls.create_from_data(**body) - if skip_websocket_message: + if skip_websocket_message: # if this event sends websocket messages, fire them off on flush event._skip_websocket_message = True + if notification_trigger_event: # if this is an Ansible stats event, ensure notifications on flush + event._notification_trigger_event = True + self.buff.setdefault(cls, []).append(event) retries = 0 diff --git a/awx/main/migrations/0161_unifiedjob_host_status_counts.py b/awx/main/migrations/0161_unifiedjob_host_status_counts.py new file mode 100644 index 0000000000..23871cb045 --- /dev/null +++ b/awx/main/migrations/0161_unifiedjob_host_status_counts.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2.12 on 2022-04-27 02:16 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0160_alter_schedule_rrule'), + ] + + operations = [ + migrations.AddField( + model_name='unifiedjob', + name='host_status_counts', + field=models.JSONField(blank=True, default=None, editable=False, help_text='Playbook stats from the Ansible playbook_on_stats event.', null=True), + ), + ] diff --git a/awx/main/models/events.py b/awx/main/models/events.py index f80c23d58b..f6ecfc3817 100644 --- a/awx/main/models/events.py +++ b/awx/main/models/events.py @@ -6,7 +6,7 @@ from collections import defaultdict from django.conf import settings from django.core.exceptions import ObjectDoesNotExist -from django.db import models, DatabaseError, connection +from django.db import models, DatabaseError from django.utils.dateparse import parse_datetime from django.utils.text import Truncator from django.utils.timezone import utc, now @@ -126,6 +126,7 @@ class BasePlaybookEvent(CreatedModifiedModel): 'host_name', 'verbosity', ] + WRAPUP_EVENT = 'playbook_on_stats' class Meta: abstract = True @@ -384,14 +385,6 @@ class BasePlaybookEvent(CreatedModifiedModel): job.get_event_queryset().filter(uuid__in=changed).update(changed=True) job.get_event_queryset().filter(uuid__in=failed).update(failed=True) - # send success/failure notifications when we've finished handling the playbook_on_stats event - from awx.main.tasks.system import handle_success_and_failure_notifications # circular import - - def _send_notifications(): - handle_success_and_failure_notifications.apply_async([job.id]) - - connection.on_commit(_send_notifications) - for field in ('playbook', 'play', 'task', 'role'): value = force_str(event_data.get(field, '')).strip() if value != getattr(self, field): @@ -470,6 +463,7 @@ class JobEvent(BasePlaybookEvent): """ VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['job_id', 'workflow_job_id', 'job_created'] + JOB_REFERENCE = 'job_id' objects = DeferJobCreatedManager() @@ -600,6 +594,7 @@ UnpartitionedJobEvent._meta.db_table = '_unpartitioned_' + JobEvent._meta.db_tab class ProjectUpdateEvent(BasePlaybookEvent): VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['project_update_id', 'workflow_job_id', 'job_created'] + JOB_REFERENCE = 'project_update_id' objects = DeferJobCreatedManager() @@ -641,6 +636,7 @@ class BaseCommandEvent(CreatedModifiedModel): """ VALID_KEYS = ['event_data', 'created', 'counter', 'uuid', 'stdout', 'start_line', 'end_line', 'verbosity'] + WRAPUP_EVENT = 'EOF' class Meta: abstract = True @@ -736,6 +732,8 @@ class BaseCommandEvent(CreatedModifiedModel): class AdHocCommandEvent(BaseCommandEvent): VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command_id', 'event', 'host_name', 'host_id', 'workflow_job_id', 'job_created'] + WRAPUP_EVENT = 'playbook_on_stats' # exception to BaseCommandEvent + JOB_REFERENCE = 'ad_hoc_command_id' objects = DeferJobCreatedManager() @@ -836,6 +834,7 @@ UnpartitionedAdHocCommandEvent._meta.db_table = '_unpartitioned_' + AdHocCommand class InventoryUpdateEvent(BaseCommandEvent): VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['inventory_update_id', 'workflow_job_id', 'job_created'] + JOB_REFERENCE = 'inventory_update_id' objects = DeferJobCreatedManager() @@ -881,6 +880,7 @@ UnpartitionedInventoryUpdateEvent._meta.db_table = '_unpartitioned_' + Inventory class SystemJobEvent(BaseCommandEvent): VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['system_job_id', 'job_created'] + JOB_REFERENCE = 'system_job_id' objects = DeferJobCreatedManager() diff --git a/awx/main/models/notifications.py b/awx/main/models/notifications.py index fb8af5b5f0..266203528f 100644 --- a/awx/main/models/notifications.py +++ b/awx/main/models/notifications.py @@ -421,21 +421,8 @@ class JobNotificationMixin(object): The context will contain allowed content retrieved from a serialized job object (see JobNotificationMixin.JOB_FIELDS_ALLOWED_LIST the job's friendly name, and a url to the job run.""" - job_context = {'host_status_counts': {}} - summary = None - try: - has_event_property = any([f for f in self.event_class._meta.fields if f.name == 'event']) - except NotImplementedError: - has_event_property = False - if has_event_property: - qs = self.get_event_queryset() - if qs: - event = qs.only('event_data').filter(event='playbook_on_stats').first() - if event: - summary = event.get_host_status_counts() - job_context['host_status_counts'] = summary context = { - 'job': job_context, + 'job': {'host_status_counts': self.host_status_counts}, 'job_friendly_name': self.get_notification_friendly_name(), 'url': self.get_ui_url(), 'job_metadata': json.dumps(self.notification_data(), ensure_ascii=False, indent=4), diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 63a5e2588a..c924a12f6d 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -717,6 +717,13 @@ class UnifiedJob( editable=False, help_text=_("The version of Ansible Core installed in the execution environment."), ) + host_status_counts = models.JSONField( + blank=True, + null=True, + default=None, + editable=False, + help_text=_("Playbook stats from the Ansible playbook_on_stats event."), + ) work_unit_id = models.CharField( max_length=255, blank=True, default=None, editable=False, null=True, help_text=_("The Receptor work unit ID associated with this job.") ) diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py index f92a6028d2..529f9fc278 100644 --- a/awx/main/tasks/callback.py +++ b/awx/main/tasks/callback.py @@ -9,6 +9,7 @@ import stat from django.utils.timezone import now from django.conf import settings from django_guid import get_guid +from django.utils.functional import cached_property # AWX from awx.main.redact import UriCleaner @@ -20,8 +21,6 @@ logger = logging.getLogger('awx.main.tasks.callback') class RunnerCallback: - event_data_key = 'job_id' - def __init__(self, model=None): self.parent_workflow_job_id = None self.host_map = {} @@ -33,10 +32,19 @@ class RunnerCallback: self.event_ct = 0 self.model = model self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5) + self.wrapup_event_dispatched = False def update_model(self, pk, _attempt=0, **updates): return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates) + @cached_property + def wrapup_event_type(self): + return self.instance.event_class.WRAPUP_EVENT + + @cached_property + def event_data_key(self): + return self.instance.event_class.JOB_REFERENCE + def event_handler(self, event_data): # # ⚠️ D-D-D-DANGER ZONE ⚠️ @@ -130,6 +138,9 @@ class RunnerCallback: elif self.recent_event_timings.maxlen: self.recent_event_timings.append(time.time()) + if event_data.get('event', '') == self.wrapup_event_type: + self.wrapup_event_dispatched = True + event_data.setdefault(self.event_data_key, self.instance.id) self.dispatcher.dispatch(event_data) self.event_ct += 1 @@ -170,6 +181,8 @@ class RunnerCallback: } event_data.setdefault(self.event_data_key, self.instance.id) self.dispatcher.dispatch(event_data) + if self.wrapup_event_type == 'EOF': + self.wrapup_event_dispatched = True def status_handler(self, status_data, runner_config): """ @@ -212,9 +225,6 @@ class RunnerCallback: class RunnerCallbackForProjectUpdate(RunnerCallback): - - event_data_key = 'project_update_id' - def __init__(self, *args, **kwargs): super(RunnerCallbackForProjectUpdate, self).__init__(*args, **kwargs) self.playbook_new_revision = None @@ -231,9 +241,6 @@ class RunnerCallbackForProjectUpdate(RunnerCallback): class RunnerCallbackForInventoryUpdate(RunnerCallback): - - event_data_key = 'inventory_update_id' - def __init__(self, *args, **kwargs): super(RunnerCallbackForInventoryUpdate, self).__init__(*args, **kwargs) self.end_line = 0 @@ -245,9 +252,6 @@ class RunnerCallbackForInventoryUpdate(RunnerCallback): class RunnerCallbackForAdHocCommand(RunnerCallback): - - event_data_key = 'ad_hoc_command_id' - def __init__(self, *args, **kwargs): super(RunnerCallbackForAdHocCommand, self).__init__(*args, **kwargs) self.host_map = {} @@ -255,4 +259,4 @@ class RunnerCallbackForAdHocCommand(RunnerCallback): class RunnerCallbackForSystemJob(RunnerCallback): - event_data_key = 'system_job_id' + pass diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index 75fa109142..0ce5c42200 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -78,7 +78,7 @@ from awx.main.utils.common import ( ) from awx.conf.license import get_license from awx.main.utils.handlers import SpecialInventoryHandler -from awx.main.tasks.system import handle_success_and_failure_notifications, update_smart_memberships_for_inventory, update_inventory_computed_fields +from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields from awx.main.utils.update_model import update_model from rest_framework.exceptions import PermissionDenied from django.utils.translation import gettext_lazy as _ @@ -552,8 +552,6 @@ class BaseTask(object): status = 'failed' extra_update_fields['job_explanation'] = self.instance.job_explanation - # ensure failure notification sends even if playbook_on_stats event is not triggered - handle_success_and_failure_notifications.apply_async([self.instance.id]) except ReceptorNodeNotFound as exc: extra_update_fields['job_explanation'] = str(exc) @@ -580,7 +578,12 @@ class BaseTask(object): extra_update_fields['result_traceback'] = "{}\n\n{}".format(extra_update_fields['result_traceback'], ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE) self.instance = self.update_model(pk) - self.instance = self.update_model(pk, status=status, emitted_events=self.runner_callback.event_ct, **extra_update_fields) + self.instance = self.update_model(pk, status=status, emitted_events=self.runner_callback.event_ct, select_for_update=True, **extra_update_fields) + + # Field host_status_counts is used as a metric to check if event processing is finished + # we send notifications if it is, if not, callback receiver will send them + if (self.instance.host_status_counts is not None) or (not self.runner_callback.wrapup_event_dispatched): + self.instance.send_notification_templates('succeeded' if status == 'successful' else 'failed') try: self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times) diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py index 9ab3010919..274ff546f1 100644 --- a/awx/main/tasks/system.py +++ b/awx/main/tasks/system.py @@ -716,25 +716,6 @@ def handle_work_error(task_id, *args, **kwargs): pass -@task(queue=get_local_queuename) -def handle_success_and_failure_notifications(job_id): - uj = UnifiedJob.objects.get(pk=job_id) - retries = 0 - while retries < settings.AWX_NOTIFICATION_JOB_FINISH_MAX_RETRY: - if uj.finished: - uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed') - return - else: - # wait a few seconds to avoid a race where the - # events are persisted _before_ the UJ.status - # changes from running -> successful - retries += 1 - time.sleep(1) - uj = UnifiedJob.objects.get(pk=job_id) - - logger.warning(f"Failed to even try to send notifications for job '{uj}' due to job not being in finished state.") - - @task(queue=get_local_queuename) def update_inventory_computed_fields(inventory_id): """ diff --git a/awx/main/tests/functional/commands/test_callback_receiver.py b/awx/main/tests/functional/commands/test_callback_receiver.py new file mode 100644 index 0000000000..389edf6ffb --- /dev/null +++ b/awx/main/tests/functional/commands/test_callback_receiver.py @@ -0,0 +1,26 @@ +import pytest + +from awx.main.dispatch.worker.callback import job_stats_wrapup +from awx.main.models.jobs import Job + + +@pytest.mark.django_db +def test_wrapup_does_not_send_notifications(mocker): + job = Job.objects.create(status='running') + assert job.host_status_counts is None + mock = mocker.patch('awx.main.models.notifications.JobNotificationMixin.send_notification_templates') + job_stats_wrapup(job.id) + job.refresh_from_db() + assert job.host_status_counts == {} + mock.assert_not_called() + + +@pytest.mark.django_db +def test_wrapup_does_send_notifications(mocker): + job = Job.objects.create(status='successful') + assert job.host_status_counts is None + mock = mocker.patch('awx.main.models.notifications.JobNotificationMixin.send_notification_templates') + job_stats_wrapup(job.id) + job.refresh_from_db() + assert job.host_status_counts == {} + mock.assert_called_once_with('succeeded') diff --git a/awx/main/tests/unit/api/serializers/test_job_serializers.py b/awx/main/tests/unit/api/serializers/test_job_serializers.py index cdcdadee82..e6a27afd05 100644 --- a/awx/main/tests/unit/api/serializers/test_job_serializers.py +++ b/awx/main/tests/unit/api/serializers/test_job_serializers.py @@ -1,16 +1,10 @@ # Python -from collections import namedtuple import pytest from unittest import mock import json # AWX -from awx.api.serializers import ( - JobDetailSerializer, - JobSerializer, - JobOptionsSerializer, - ProjectUpdateDetailSerializer, -) +from awx.api.serializers import JobSerializer, JobOptionsSerializer from awx.main.models import ( Label, @@ -108,7 +102,7 @@ class TestJobOptionsSerializerGetSummaryFields: class TestJobDetailSerializerGetHostStatusCountFields(object): - def test_hosts_are_counted_once(self, job, mocker): + def test_hosts_are_counted_once(self): mock_event = JobEvent( **{ 'event': 'playbook_on_stats', @@ -133,26 +127,11 @@ class TestJobDetailSerializerGetHostStatusCountFields(object): } ) - mock_qs = namedtuple('mock_qs', ['get'])(mocker.MagicMock(return_value=mock_event)) - only = mocker.MagicMock(return_value=mock_qs) - job.get_event_queryset = lambda *args, **kwargs: mocker.MagicMock(only=only) - - serializer = JobDetailSerializer() - host_status_counts = serializer.get_host_status_counts(job) - - assert host_status_counts == {'ok': 1, 'changed': 1, 'dark': 2} - - def test_host_status_counts_is_empty_dict_without_stats_event(self, job): - job.get_event_queryset = lambda *args, **kwargs: JobEvent.objects.none() - - serializer = JobDetailSerializer() - host_status_counts = serializer.get_host_status_counts(job) - - assert host_status_counts == {} + assert mock_event.get_host_status_counts() == {'ok': 1, 'changed': 1, 'dark': 2} class TestProjectUpdateDetailSerializerGetHostStatusCountFields(object): - def test_hosts_are_counted_once(self, project_update, mocker): + def test_hosts_are_counted_once(self): mock_event = ProjectUpdateEvent( **{ 'event': 'playbook_on_stats', @@ -177,18 +156,4 @@ class TestProjectUpdateDetailSerializerGetHostStatusCountFields(object): } ) - mock_qs = namedtuple('mock_qs', ['get'])(mocker.MagicMock(return_value=mock_event)) - project_update.project_update_events.only = mocker.MagicMock(return_value=mock_qs) - - serializer = ProjectUpdateDetailSerializer() - host_status_counts = serializer.get_host_status_counts(project_update) - - assert host_status_counts == {'ok': 1, 'changed': 1, 'dark': 2} - - def test_host_status_counts_is_empty_dict_without_stats_event(self, project_update): - project_update.project_update_events = ProjectUpdateEvent.objects.none() - - serializer = ProjectUpdateDetailSerializer() - host_status_counts = serializer.get_host_status_counts(project_update) - - assert host_status_counts == {} + assert mock_event.get_host_status_counts() == {'ok': 1, 'changed': 1, 'dark': 2} diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 22e1ea967b..3c5867d9ff 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -1919,26 +1919,6 @@ def test_managed_injector_redaction(injector_cls): assert 'very_secret_value' not in str(build_safe_env(env)) -@mock.patch('logging.getLogger') -def test_notification_job_not_finished(logging_getLogger, mocker): - uj = mocker.MagicMock() - uj.finished = False - logger = mocker.Mock() - logging_getLogger.return_value = logger - - with mocker.patch('awx.main.models.UnifiedJob.objects.get', uj): - system.handle_success_and_failure_notifications(1) - assert logger.warning.called_with(f"Failed to even try to send notifications for job '{uj}' due to job not being in finished state.") - - -def test_notification_job_finished(mocker): - uj = mocker.MagicMock(send_notification_templates=mocker.MagicMock(), finished=True) - - with mocker.patch('awx.main.models.UnifiedJob.objects.get', mocker.MagicMock(return_value=uj)): - system.handle_success_and_failure_notifications(1) - uj.send_notification_templates.assert_called() - - def test_job_run_no_ee(mock_me): org = Organization(pk=1) proj = Project(pk=1, organization=org) diff --git a/awx/main/utils/update_model.py b/awx/main/utils/update_model.py index 73acefd1fc..7d03b3964b 100644 --- a/awx/main/utils/update_model.py +++ b/awx/main/utils/update_model.py @@ -7,14 +7,17 @@ import time logger = logging.getLogger('awx.main.tasks.utils') -def update_model(model, pk, _attempt=0, _max_attempts=5, **updates): +def update_model(model, pk, _attempt=0, _max_attempts=5, select_for_update=False, **updates): """Reload the model instance from the database and update the given fields. """ try: with transaction.atomic(): # Retrieve the model instance. - instance = model.objects.get(pk=pk) + if select_for_update: + instance = model.objects.select_for_update().get(pk=pk) + else: + instance = model.objects.get(pk=pk) # Update the appropriate fields and save the model # instance, then return the new instance. diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index a040364238..0f528ada14 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -997,9 +997,6 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10 # How often websocket process will generate stats BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5 -# Number of times to retry sending a notification when waiting on a job to finish. -AWX_NOTIFICATION_JOB_FINISH_MAX_RETRY = 5 - DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'} # Name of the default task queue diff --git a/awx/ui/src/screens/Job/JobOutput/JobOutput.js b/awx/ui/src/screens/Job/JobOutput/JobOutput.js index 87f900aee1..cf2a4f4f6a 100644 --- a/awx/ui/src/screens/Job/JobOutput/JobOutput.js +++ b/awx/ui/src/screens/Job/JobOutput/JobOutput.js @@ -652,7 +652,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) { isDeleteDisabled={isDeleting} /> - +