diff --git a/awx/api/serializers.py b/awx/api/serializers.py
index af76f96f20..f70e582c89 100644
--- a/awx/api/serializers.py
+++ b/awx/api/serializers.py
@@ -1607,7 +1607,6 @@ class ProjectUpdateSerializer(UnifiedJobSerializer, ProjectOptionsSerializer):
class ProjectUpdateDetailSerializer(ProjectUpdateSerializer):
- host_status_counts = serializers.SerializerMethodField(help_text=_('A count of hosts uniquely assigned to each status.'))
playbook_counts = serializers.SerializerMethodField(help_text=_('A count of all plays and tasks for the job run.'))
class Meta:
@@ -1622,14 +1621,6 @@ class ProjectUpdateDetailSerializer(ProjectUpdateSerializer):
return data
- def get_host_status_counts(self, obj):
- try:
- counts = obj.project_update_events.only('event_data').get(event='playbook_on_stats').get_host_status_counts()
- except ProjectUpdateEvent.DoesNotExist:
- counts = {}
-
- return counts
-
class ProjectUpdateListSerializer(ProjectUpdateSerializer, UnifiedJobListSerializer):
class Meta:
@@ -3107,7 +3098,6 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer):
class JobDetailSerializer(JobSerializer):
- host_status_counts = serializers.SerializerMethodField(help_text=_('A count of hosts uniquely assigned to each status.'))
playbook_counts = serializers.SerializerMethodField(help_text=_('A count of all plays and tasks for the job run.'))
custom_virtualenv = serializers.ReadOnlyField()
@@ -3123,14 +3113,6 @@ class JobDetailSerializer(JobSerializer):
return data
- def get_host_status_counts(self, obj):
- try:
- counts = obj.get_event_queryset().only('event_data').get(event='playbook_on_stats').get_host_status_counts()
- except JobEvent.DoesNotExist:
- counts = {}
-
- return counts
-
class JobCancelSerializer(BaseSerializer):
@@ -3319,21 +3301,10 @@ class AdHocCommandSerializer(UnifiedJobSerializer):
class AdHocCommandDetailSerializer(AdHocCommandSerializer):
-
- host_status_counts = serializers.SerializerMethodField(help_text=_('A count of hosts uniquely assigned to each status.'))
-
class Meta:
model = AdHocCommand
fields = ('*', 'host_status_counts')
- def get_host_status_counts(self, obj):
- try:
- counts = obj.ad_hoc_command_events.only('event_data').get(event='playbook_on_stats').get_host_status_counts()
- except AdHocCommandEvent.DoesNotExist:
- counts = {}
-
- return counts
-
class AdHocCommandCancelSerializer(AdHocCommandSerializer):
diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py
index 5026e72c06..8decb39c22 100644
--- a/awx/main/dispatch/worker/callback.py
+++ b/awx/main/dispatch/worker/callback.py
@@ -7,7 +7,7 @@ import traceback
from django.conf import settings
from django.utils.timezone import now as tz_now
-from django.db import DatabaseError, OperationalError, connection as django_connection
+from django.db import DatabaseError, OperationalError, transaction, connection as django_connection
from django.db.utils import InterfaceError, InternalError
from django_guid import set_guid
@@ -16,8 +16,8 @@ import psutil
import redis
from awx.main.consumers import emit_channel_notification
-from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob, Job
-from awx.main.tasks.system import handle_success_and_failure_notifications
+from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent, UnifiedJob
+from awx.main.constants import ACTIVE_STATES
from awx.main.models.events import emit_event_detail
from awx.main.utils.profiling import AWXProfiler
import awx.main.analytics.subsystem_metrics as s_metrics
@@ -26,6 +26,32 @@ from .base import BaseWorker
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
+def job_stats_wrapup(job_identifier, event=None):
+ """Fill in the unified job host_status_counts, fire off notifications if needed"""
+ try:
+ # empty dict (versus default of None) can still indicate that events have been processed
+ # for job types like system jobs, and jobs with no hosts matched
+ host_status_counts = {}
+ if event:
+ host_status_counts = event.get_host_status_counts()
+
+ # Update host_status_counts while holding the row lock
+ with transaction.atomic():
+ uj = UnifiedJob.objects.select_for_update().get(pk=job_identifier)
+ uj.host_status_counts = host_status_counts
+ uj.save(update_fields=['host_status_counts'])
+
+ uj.log_lifecycle("stats_wrapup_finished")
+
+ # If the status was a finished state before this update was made, send notifications
+ # If not, we will send notifications when the status changes
+ if uj.status not in ACTIVE_STATES:
+ uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
+
+ except Exception:
+ logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))
+
+
class CallbackBrokerWorker(BaseWorker):
"""
A worker implementation that deserializes callback event data and persists
@@ -146,6 +172,8 @@ class CallbackBrokerWorker(BaseWorker):
if not getattr(e, '_skip_websocket_message', False):
metrics_events_broadcast += 1
emit_event_detail(e)
+ if getattr(e, '_notification_trigger_event', False):
+ job_stats_wrapup(getattr(e, e.JOB_REFERENCE), event=e)
self.buff = {}
self.last_flush = time.time()
# only update metrics if we saved events
@@ -165,47 +193,32 @@ class CallbackBrokerWorker(BaseWorker):
if flush:
self.last_event = ''
if not flush:
- event_map = {
- 'job_id': JobEvent,
- 'ad_hoc_command_id': AdHocCommandEvent,
- 'project_update_id': ProjectUpdateEvent,
- 'inventory_update_id': InventoryUpdateEvent,
- 'system_job_id': SystemJobEvent,
- }
-
job_identifier = 'unknown job'
- for key, cls in event_map.items():
- if key in body:
- job_identifier = body[key]
+ for cls in (JobEvent, AdHocCommandEvent, ProjectUpdateEvent, InventoryUpdateEvent, SystemJobEvent):
+ if cls.JOB_REFERENCE in body:
+ job_identifier = body[cls.JOB_REFERENCE]
break
self.last_event = f'\n\t- {cls.__name__} for #{job_identifier} ({body.get("event", "")} {body.get("uuid", "")})' # noqa
+ notification_trigger_event = bool(body.get('event') == cls.WRAPUP_EVENT)
+
if body.get('event') == 'EOF':
try:
if 'guid' in body:
set_guid(body['guid'])
final_counter = body.get('final_counter', 0)
- logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier))
+ logger.info('Starting EOF event processing for Job {}'.format(job_identifier))
# EOF events are sent when stdout for the running task is
# closed. don't actually persist them to the database; we
# just use them to report `summary` websocket events as an
# approximation for when a job is "done"
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter))
- # Additionally, when we've processed all events, we should
- # have all the data we need to send out success/failure
- # notification templates
- uj = UnifiedJob.objects.get(pk=job_identifier)
- if isinstance(uj, Job):
- # *actual playbooks* send their success/failure
- # notifications in response to the playbook_on_stats
- # event handling code in main.models.events
- pass
- elif hasattr(uj, 'send_notification_templates'):
- handle_success_and_failure_notifications.apply_async([uj.id])
+ if notification_trigger_event:
+ job_stats_wrapup(job_identifier)
except Exception:
- logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
+ logger.exception('Worker failed to perform EOF tasks: Job {}'.format(job_identifier))
finally:
self.subsystem_metrics.inc('callback_receiver_events_in_memory', -1)
set_guid('')
@@ -215,9 +228,12 @@ class CallbackBrokerWorker(BaseWorker):
event = cls.create_from_data(**body)
- if skip_websocket_message:
+ if skip_websocket_message: # if this event sends websocket messages, fire them off on flush
event._skip_websocket_message = True
+ if notification_trigger_event: # if this is an Ansible stats event, ensure notifications on flush
+ event._notification_trigger_event = True
+
self.buff.setdefault(cls, []).append(event)
retries = 0
diff --git a/awx/main/migrations/0161_unifiedjob_host_status_counts.py b/awx/main/migrations/0161_unifiedjob_host_status_counts.py
new file mode 100644
index 0000000000..23871cb045
--- /dev/null
+++ b/awx/main/migrations/0161_unifiedjob_host_status_counts.py
@@ -0,0 +1,18 @@
+# Generated by Django 3.2.12 on 2022-04-27 02:16
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+ dependencies = [
+ ('main', '0160_alter_schedule_rrule'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='unifiedjob',
+ name='host_status_counts',
+ field=models.JSONField(blank=True, default=None, editable=False, help_text='Playbook stats from the Ansible playbook_on_stats event.', null=True),
+ ),
+ ]
diff --git a/awx/main/models/events.py b/awx/main/models/events.py
index f80c23d58b..f6ecfc3817 100644
--- a/awx/main/models/events.py
+++ b/awx/main/models/events.py
@@ -6,7 +6,7 @@ from collections import defaultdict
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
-from django.db import models, DatabaseError, connection
+from django.db import models, DatabaseError
from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator
from django.utils.timezone import utc, now
@@ -126,6 +126,7 @@ class BasePlaybookEvent(CreatedModifiedModel):
'host_name',
'verbosity',
]
+ WRAPUP_EVENT = 'playbook_on_stats'
class Meta:
abstract = True
@@ -384,14 +385,6 @@ class BasePlaybookEvent(CreatedModifiedModel):
job.get_event_queryset().filter(uuid__in=changed).update(changed=True)
job.get_event_queryset().filter(uuid__in=failed).update(failed=True)
- # send success/failure notifications when we've finished handling the playbook_on_stats event
- from awx.main.tasks.system import handle_success_and_failure_notifications # circular import
-
- def _send_notifications():
- handle_success_and_failure_notifications.apply_async([job.id])
-
- connection.on_commit(_send_notifications)
-
for field in ('playbook', 'play', 'task', 'role'):
value = force_str(event_data.get(field, '')).strip()
if value != getattr(self, field):
@@ -470,6 +463,7 @@ class JobEvent(BasePlaybookEvent):
"""
VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['job_id', 'workflow_job_id', 'job_created']
+ JOB_REFERENCE = 'job_id'
objects = DeferJobCreatedManager()
@@ -600,6 +594,7 @@ UnpartitionedJobEvent._meta.db_table = '_unpartitioned_' + JobEvent._meta.db_tab
class ProjectUpdateEvent(BasePlaybookEvent):
VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['project_update_id', 'workflow_job_id', 'job_created']
+ JOB_REFERENCE = 'project_update_id'
objects = DeferJobCreatedManager()
@@ -641,6 +636,7 @@ class BaseCommandEvent(CreatedModifiedModel):
"""
VALID_KEYS = ['event_data', 'created', 'counter', 'uuid', 'stdout', 'start_line', 'end_line', 'verbosity']
+ WRAPUP_EVENT = 'EOF'
class Meta:
abstract = True
@@ -736,6 +732,8 @@ class BaseCommandEvent(CreatedModifiedModel):
class AdHocCommandEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command_id', 'event', 'host_name', 'host_id', 'workflow_job_id', 'job_created']
+ WRAPUP_EVENT = 'playbook_on_stats' # exception to BaseCommandEvent
+ JOB_REFERENCE = 'ad_hoc_command_id'
objects = DeferJobCreatedManager()
@@ -836,6 +834,7 @@ UnpartitionedAdHocCommandEvent._meta.db_table = '_unpartitioned_' + AdHocCommand
class InventoryUpdateEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['inventory_update_id', 'workflow_job_id', 'job_created']
+ JOB_REFERENCE = 'inventory_update_id'
objects = DeferJobCreatedManager()
@@ -881,6 +880,7 @@ UnpartitionedInventoryUpdateEvent._meta.db_table = '_unpartitioned_' + Inventory
class SystemJobEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['system_job_id', 'job_created']
+ JOB_REFERENCE = 'system_job_id'
objects = DeferJobCreatedManager()
diff --git a/awx/main/models/notifications.py b/awx/main/models/notifications.py
index fb8af5b5f0..266203528f 100644
--- a/awx/main/models/notifications.py
+++ b/awx/main/models/notifications.py
@@ -421,21 +421,8 @@ class JobNotificationMixin(object):
The context will contain allowed content retrieved from a serialized job object
(see JobNotificationMixin.JOB_FIELDS_ALLOWED_LIST the job's friendly name,
and a url to the job run."""
- job_context = {'host_status_counts': {}}
- summary = None
- try:
- has_event_property = any([f for f in self.event_class._meta.fields if f.name == 'event'])
- except NotImplementedError:
- has_event_property = False
- if has_event_property:
- qs = self.get_event_queryset()
- if qs:
- event = qs.only('event_data').filter(event='playbook_on_stats').first()
- if event:
- summary = event.get_host_status_counts()
- job_context['host_status_counts'] = summary
context = {
- 'job': job_context,
+ 'job': {'host_status_counts': self.host_status_counts},
'job_friendly_name': self.get_notification_friendly_name(),
'url': self.get_ui_url(),
'job_metadata': json.dumps(self.notification_data(), ensure_ascii=False, indent=4),
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 63a5e2588a..c924a12f6d 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -717,6 +717,13 @@ class UnifiedJob(
editable=False,
help_text=_("The version of Ansible Core installed in the execution environment."),
)
+ host_status_counts = models.JSONField(
+ blank=True,
+ null=True,
+ default=None,
+ editable=False,
+ help_text=_("Playbook stats from the Ansible playbook_on_stats event."),
+ )
work_unit_id = models.CharField(
max_length=255, blank=True, default=None, editable=False, null=True, help_text=_("The Receptor work unit ID associated with this job.")
)
diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py
index f92a6028d2..529f9fc278 100644
--- a/awx/main/tasks/callback.py
+++ b/awx/main/tasks/callback.py
@@ -9,6 +9,7 @@ import stat
from django.utils.timezone import now
from django.conf import settings
from django_guid import get_guid
+from django.utils.functional import cached_property
# AWX
from awx.main.redact import UriCleaner
@@ -20,8 +21,6 @@ logger = logging.getLogger('awx.main.tasks.callback')
class RunnerCallback:
- event_data_key = 'job_id'
-
def __init__(self, model=None):
self.parent_workflow_job_id = None
self.host_map = {}
@@ -33,10 +32,19 @@ class RunnerCallback:
self.event_ct = 0
self.model = model
self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
+ self.wrapup_event_dispatched = False
def update_model(self, pk, _attempt=0, **updates):
return update_model(self.model, pk, _attempt=0, _max_attempts=self.update_attempts, **updates)
+ @cached_property
+ def wrapup_event_type(self):
+ return self.instance.event_class.WRAPUP_EVENT
+
+ @cached_property
+ def event_data_key(self):
+ return self.instance.event_class.JOB_REFERENCE
+
def event_handler(self, event_data):
#
# ⚠️ D-D-D-DANGER ZONE ⚠️
@@ -130,6 +138,9 @@ class RunnerCallback:
elif self.recent_event_timings.maxlen:
self.recent_event_timings.append(time.time())
+ if event_data.get('event', '') == self.wrapup_event_type:
+ self.wrapup_event_dispatched = True
+
event_data.setdefault(self.event_data_key, self.instance.id)
self.dispatcher.dispatch(event_data)
self.event_ct += 1
@@ -170,6 +181,8 @@ class RunnerCallback:
}
event_data.setdefault(self.event_data_key, self.instance.id)
self.dispatcher.dispatch(event_data)
+ if self.wrapup_event_type == 'EOF':
+ self.wrapup_event_dispatched = True
def status_handler(self, status_data, runner_config):
"""
@@ -212,9 +225,6 @@ class RunnerCallback:
class RunnerCallbackForProjectUpdate(RunnerCallback):
-
- event_data_key = 'project_update_id'
-
def __init__(self, *args, **kwargs):
super(RunnerCallbackForProjectUpdate, self).__init__(*args, **kwargs)
self.playbook_new_revision = None
@@ -231,9 +241,6 @@ class RunnerCallbackForProjectUpdate(RunnerCallback):
class RunnerCallbackForInventoryUpdate(RunnerCallback):
-
- event_data_key = 'inventory_update_id'
-
def __init__(self, *args, **kwargs):
super(RunnerCallbackForInventoryUpdate, self).__init__(*args, **kwargs)
self.end_line = 0
@@ -245,9 +252,6 @@ class RunnerCallbackForInventoryUpdate(RunnerCallback):
class RunnerCallbackForAdHocCommand(RunnerCallback):
-
- event_data_key = 'ad_hoc_command_id'
-
def __init__(self, *args, **kwargs):
super(RunnerCallbackForAdHocCommand, self).__init__(*args, **kwargs)
self.host_map = {}
@@ -255,4 +259,4 @@ class RunnerCallbackForAdHocCommand(RunnerCallback):
class RunnerCallbackForSystemJob(RunnerCallback):
- event_data_key = 'system_job_id'
+ pass
diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py
index 75fa109142..0ce5c42200 100644
--- a/awx/main/tasks/jobs.py
+++ b/awx/main/tasks/jobs.py
@@ -78,7 +78,7 @@ from awx.main.utils.common import (
)
from awx.conf.license import get_license
from awx.main.utils.handlers import SpecialInventoryHandler
-from awx.main.tasks.system import handle_success_and_failure_notifications, update_smart_memberships_for_inventory, update_inventory_computed_fields
+from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields
from awx.main.utils.update_model import update_model
from rest_framework.exceptions import PermissionDenied
from django.utils.translation import gettext_lazy as _
@@ -552,8 +552,6 @@ class BaseTask(object):
status = 'failed'
extra_update_fields['job_explanation'] = self.instance.job_explanation
- # ensure failure notification sends even if playbook_on_stats event is not triggered
- handle_success_and_failure_notifications.apply_async([self.instance.id])
except ReceptorNodeNotFound as exc:
extra_update_fields['job_explanation'] = str(exc)
@@ -580,7 +578,12 @@ class BaseTask(object):
extra_update_fields['result_traceback'] = "{}\n\n{}".format(extra_update_fields['result_traceback'], ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE)
self.instance = self.update_model(pk)
- self.instance = self.update_model(pk, status=status, emitted_events=self.runner_callback.event_ct, **extra_update_fields)
+ self.instance = self.update_model(pk, status=status, emitted_events=self.runner_callback.event_ct, select_for_update=True, **extra_update_fields)
+
+ # Field host_status_counts is used as a metric to check if event processing is finished
+ # we send notifications if it is, if not, callback receiver will send them
+ if (self.instance.host_status_counts is not None) or (not self.runner_callback.wrapup_event_dispatched):
+ self.instance.send_notification_templates('succeeded' if status == 'successful' else 'failed')
try:
self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times)
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 9ab3010919..274ff546f1 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -716,25 +716,6 @@ def handle_work_error(task_id, *args, **kwargs):
pass
-@task(queue=get_local_queuename)
-def handle_success_and_failure_notifications(job_id):
- uj = UnifiedJob.objects.get(pk=job_id)
- retries = 0
- while retries < settings.AWX_NOTIFICATION_JOB_FINISH_MAX_RETRY:
- if uj.finished:
- uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
- return
- else:
- # wait a few seconds to avoid a race where the
- # events are persisted _before_ the UJ.status
- # changes from running -> successful
- retries += 1
- time.sleep(1)
- uj = UnifiedJob.objects.get(pk=job_id)
-
- logger.warning(f"Failed to even try to send notifications for job '{uj}' due to job not being in finished state.")
-
-
@task(queue=get_local_queuename)
def update_inventory_computed_fields(inventory_id):
"""
diff --git a/awx/main/tests/functional/commands/test_callback_receiver.py b/awx/main/tests/functional/commands/test_callback_receiver.py
new file mode 100644
index 0000000000..389edf6ffb
--- /dev/null
+++ b/awx/main/tests/functional/commands/test_callback_receiver.py
@@ -0,0 +1,26 @@
+import pytest
+
+from awx.main.dispatch.worker.callback import job_stats_wrapup
+from awx.main.models.jobs import Job
+
+
+@pytest.mark.django_db
+def test_wrapup_does_not_send_notifications(mocker):
+ job = Job.objects.create(status='running')
+ assert job.host_status_counts is None
+ mock = mocker.patch('awx.main.models.notifications.JobNotificationMixin.send_notification_templates')
+ job_stats_wrapup(job.id)
+ job.refresh_from_db()
+ assert job.host_status_counts == {}
+ mock.assert_not_called()
+
+
+@pytest.mark.django_db
+def test_wrapup_does_send_notifications(mocker):
+ job = Job.objects.create(status='successful')
+ assert job.host_status_counts is None
+ mock = mocker.patch('awx.main.models.notifications.JobNotificationMixin.send_notification_templates')
+ job_stats_wrapup(job.id)
+ job.refresh_from_db()
+ assert job.host_status_counts == {}
+ mock.assert_called_once_with('succeeded')
diff --git a/awx/main/tests/unit/api/serializers/test_job_serializers.py b/awx/main/tests/unit/api/serializers/test_job_serializers.py
index cdcdadee82..e6a27afd05 100644
--- a/awx/main/tests/unit/api/serializers/test_job_serializers.py
+++ b/awx/main/tests/unit/api/serializers/test_job_serializers.py
@@ -1,16 +1,10 @@
# Python
-from collections import namedtuple
import pytest
from unittest import mock
import json
# AWX
-from awx.api.serializers import (
- JobDetailSerializer,
- JobSerializer,
- JobOptionsSerializer,
- ProjectUpdateDetailSerializer,
-)
+from awx.api.serializers import JobSerializer, JobOptionsSerializer
from awx.main.models import (
Label,
@@ -108,7 +102,7 @@ class TestJobOptionsSerializerGetSummaryFields:
class TestJobDetailSerializerGetHostStatusCountFields(object):
- def test_hosts_are_counted_once(self, job, mocker):
+ def test_hosts_are_counted_once(self):
mock_event = JobEvent(
**{
'event': 'playbook_on_stats',
@@ -133,26 +127,11 @@ class TestJobDetailSerializerGetHostStatusCountFields(object):
}
)
- mock_qs = namedtuple('mock_qs', ['get'])(mocker.MagicMock(return_value=mock_event))
- only = mocker.MagicMock(return_value=mock_qs)
- job.get_event_queryset = lambda *args, **kwargs: mocker.MagicMock(only=only)
-
- serializer = JobDetailSerializer()
- host_status_counts = serializer.get_host_status_counts(job)
-
- assert host_status_counts == {'ok': 1, 'changed': 1, 'dark': 2}
-
- def test_host_status_counts_is_empty_dict_without_stats_event(self, job):
- job.get_event_queryset = lambda *args, **kwargs: JobEvent.objects.none()
-
- serializer = JobDetailSerializer()
- host_status_counts = serializer.get_host_status_counts(job)
-
- assert host_status_counts == {}
+ assert mock_event.get_host_status_counts() == {'ok': 1, 'changed': 1, 'dark': 2}
class TestProjectUpdateDetailSerializerGetHostStatusCountFields(object):
- def test_hosts_are_counted_once(self, project_update, mocker):
+ def test_hosts_are_counted_once(self):
mock_event = ProjectUpdateEvent(
**{
'event': 'playbook_on_stats',
@@ -177,18 +156,4 @@ class TestProjectUpdateDetailSerializerGetHostStatusCountFields(object):
}
)
- mock_qs = namedtuple('mock_qs', ['get'])(mocker.MagicMock(return_value=mock_event))
- project_update.project_update_events.only = mocker.MagicMock(return_value=mock_qs)
-
- serializer = ProjectUpdateDetailSerializer()
- host_status_counts = serializer.get_host_status_counts(project_update)
-
- assert host_status_counts == {'ok': 1, 'changed': 1, 'dark': 2}
-
- def test_host_status_counts_is_empty_dict_without_stats_event(self, project_update):
- project_update.project_update_events = ProjectUpdateEvent.objects.none()
-
- serializer = ProjectUpdateDetailSerializer()
- host_status_counts = serializer.get_host_status_counts(project_update)
-
- assert host_status_counts == {}
+ assert mock_event.get_host_status_counts() == {'ok': 1, 'changed': 1, 'dark': 2}
diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py
index 22e1ea967b..3c5867d9ff 100644
--- a/awx/main/tests/unit/test_tasks.py
+++ b/awx/main/tests/unit/test_tasks.py
@@ -1919,26 +1919,6 @@ def test_managed_injector_redaction(injector_cls):
assert 'very_secret_value' not in str(build_safe_env(env))
-@mock.patch('logging.getLogger')
-def test_notification_job_not_finished(logging_getLogger, mocker):
- uj = mocker.MagicMock()
- uj.finished = False
- logger = mocker.Mock()
- logging_getLogger.return_value = logger
-
- with mocker.patch('awx.main.models.UnifiedJob.objects.get', uj):
- system.handle_success_and_failure_notifications(1)
- assert logger.warning.called_with(f"Failed to even try to send notifications for job '{uj}' due to job not being in finished state.")
-
-
-def test_notification_job_finished(mocker):
- uj = mocker.MagicMock(send_notification_templates=mocker.MagicMock(), finished=True)
-
- with mocker.patch('awx.main.models.UnifiedJob.objects.get', mocker.MagicMock(return_value=uj)):
- system.handle_success_and_failure_notifications(1)
- uj.send_notification_templates.assert_called()
-
-
def test_job_run_no_ee(mock_me):
org = Organization(pk=1)
proj = Project(pk=1, organization=org)
diff --git a/awx/main/utils/update_model.py b/awx/main/utils/update_model.py
index 73acefd1fc..7d03b3964b 100644
--- a/awx/main/utils/update_model.py
+++ b/awx/main/utils/update_model.py
@@ -7,14 +7,17 @@ import time
logger = logging.getLogger('awx.main.tasks.utils')
-def update_model(model, pk, _attempt=0, _max_attempts=5, **updates):
+def update_model(model, pk, _attempt=0, _max_attempts=5, select_for_update=False, **updates):
"""Reload the model instance from the database and update the
given fields.
"""
try:
with transaction.atomic():
# Retrieve the model instance.
- instance = model.objects.get(pk=pk)
+ if select_for_update:
+ instance = model.objects.select_for_update().get(pk=pk)
+ else:
+ instance = model.objects.get(pk=pk)
# Update the appropriate fields and save the model
# instance, then return the new instance.
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index a040364238..0f528ada14 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -997,9 +997,6 @@ BROADCAST_WEBSOCKET_NEW_INSTANCE_POLL_RATE_SECONDS = 10
# How often websocket process will generate stats
BROADCAST_WEBSOCKET_STATS_POLL_RATE_SECONDS = 5
-# Number of times to retry sending a notification when waiting on a job to finish.
-AWX_NOTIFICATION_JOB_FINISH_MAX_RETRY = 5
-
DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'}
# Name of the default task queue
diff --git a/awx/ui/src/screens/Job/JobOutput/JobOutput.js b/awx/ui/src/screens/Job/JobOutput/JobOutput.js
index 87f900aee1..cf2a4f4f6a 100644
--- a/awx/ui/src/screens/Job/JobOutput/JobOutput.js
+++ b/awx/ui/src/screens/Job/JobOutput/JobOutput.js
@@ -652,7 +652,7 @@ function JobOutput({ job, eventRelatedSearchableKeys, eventSearchableKeys }) {
isDeleteDisabled={isDeleting}
/>
-
+