Merge pull request #5618 from ryanpetrello/callback-write-speed

heavily optimize the write speed of the callback receiver

Reviewed-by: https://github.com/apps/softwarefactory-project-zuul
This commit is contained in:
softwarefactory-project-zuul[bot] 2020-01-14 17:37:19 +00:00 committed by GitHub
commit b12c2a142d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 370 additions and 495 deletions

View File

@ -3874,26 +3874,6 @@ class JobEventSerializer(BaseSerializer):
return data
class JobEventWebSocketSerializer(JobEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()
class Meta:
model = JobEvent
fields = ('*', 'event_name', 'group_name',)
def get_created(self, obj):
return obj.created.isoformat()
def get_modified(self, obj):
return obj.modified.isoformat()
def get_group_name(self, obj):
return 'job_events'
class ProjectUpdateEventSerializer(JobEventSerializer):
stdout = serializers.SerializerMethodField()
event_data = serializers.SerializerMethodField()
@ -3925,26 +3905,6 @@ class ProjectUpdateEventSerializer(JobEventSerializer):
return {}
class ProjectUpdateEventWebSocketSerializer(ProjectUpdateEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()
class Meta:
model = ProjectUpdateEvent
fields = ('*', 'event_name', 'group_name',)
def get_created(self, obj):
return obj.created.isoformat()
def get_modified(self, obj):
return obj.modified.isoformat()
def get_group_name(self, obj):
return 'project_update_events'
class AdHocCommandEventSerializer(BaseSerializer):
event_display = serializers.CharField(source='get_event_display', read_only=True)
@ -3976,26 +3936,6 @@ class AdHocCommandEventSerializer(BaseSerializer):
return data
class AdHocCommandEventWebSocketSerializer(AdHocCommandEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()
class Meta:
model = AdHocCommandEvent
fields = ('*', 'event_name', 'group_name',)
def get_created(self, obj):
return obj.created.isoformat()
def get_modified(self, obj):
return obj.modified.isoformat()
def get_group_name(self, obj):
return 'ad_hoc_command_events'
class InventoryUpdateEventSerializer(AdHocCommandEventSerializer):
class Meta:
@ -4011,26 +3951,6 @@ class InventoryUpdateEventSerializer(AdHocCommandEventSerializer):
return res
class InventoryUpdateEventWebSocketSerializer(InventoryUpdateEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()
class Meta:
model = InventoryUpdateEvent
fields = ('*', 'event_name', 'group_name',)
def get_created(self, obj):
return obj.created.isoformat()
def get_modified(self, obj):
return obj.modified.isoformat()
def get_group_name(self, obj):
return 'inventory_update_events'
class SystemJobEventSerializer(AdHocCommandEventSerializer):
class Meta:
@ -4046,26 +3966,6 @@ class SystemJobEventSerializer(AdHocCommandEventSerializer):
return res
class SystemJobEventWebSocketSerializer(SystemJobEventSerializer):
created = serializers.SerializerMethodField()
modified = serializers.SerializerMethodField()
event_name = serializers.CharField(source='event')
group_name = serializers.SerializerMethodField()
class Meta:
model = SystemJobEvent
fields = ('*', 'event_name', 'group_name',)
def get_created(self, obj):
return obj.created.isoformat()
def get_modified(self, obj):
return obj.modified.isoformat()
def get_group_name(self, obj):
return 'system_job_events'
class JobLaunchSerializer(BaseSerializer):
# Representational fields

View File

@ -28,6 +28,8 @@ from awx.conf import settings_registry
from awx.conf.models import Setting
from awx.conf.migrations._reencrypt import decrypt_field as old_decrypt_field
import cachetools
# FIXME: Gracefully handle when settings are accessed before the database is
# ready (or during migrations).
@ -136,6 +138,14 @@ def filter_sensitive(registry, key, value):
return value
# settings.__getattr__ is called *constantly*, and the LOG_AGGREGATOR_ ones are
# so ubiquitous when external logging is enabled that they should be kept in memory
# with a short TTL to avoid even having to contact memcached
# the primary use case for this optimization is the callback receiver
# when external logging is enabled
LOGGING_SETTINGS_CACHE = cachetools.TTLCache(maxsize=50, ttl=1)
class EncryptedCacheProxy(object):
def __init__(self, cache, registry, encrypter=None, decrypter=None):
@ -437,11 +447,17 @@ class SettingsWrapper(UserSettingsHolder):
return self._get_default('SETTINGS_MODULE')
def __getattr__(self, name):
if name.startswith('LOG_AGGREGATOR_'):
cached = LOGGING_SETTINGS_CACHE.get(name)
if cached:
return cached
value = empty
if name in self.all_supported_settings:
with _ctit_db_wrapper(trans_safe=True):
value = self._get_local(name)
if value is not empty:
if name.startswith('LOG_AGGREGATOR_'):
LOGGING_SETTINGS_CACHE[name] = value
return value
value = self._get_default(name)
# sometimes users specify RabbitMQ passwords that contain

View File

@ -277,7 +277,7 @@ class WorkerPool(object):
logger.warn("could not write to queue %s" % preferred_queue)
logger.warn("detail: {}".format(tb))
write_attempt_order.append(preferred_queue)
logger.warn("could not write payload to any queue, attempted order: {}".format(write_attempt_order))
logger.error("could not write payload to any queue, attempted order: {}".format(write_attempt_order))
return None
def stop(self, signum):

View File

@ -119,6 +119,9 @@ class AWXConsumer(ConsumerMixin):
class BaseWorker(object):
def read(self, queue):
    """Block for up to one second waiting for the next message on *queue*."""
    wait_seconds = 1
    return queue.get(block=True, timeout=wait_seconds)
def work_loop(self, queue, finished, idx, *args):
ppid = os.getppid()
signal_handler = WorkerSignalHandler()
@ -128,7 +131,7 @@ class BaseWorker(object):
if os.getppid() != ppid:
break
try:
body = queue.get(block=True, timeout=1)
body = self.read(queue)
if body == 'QUIT':
break
except QueueEmpty:

View File

@ -1,19 +1,26 @@
import logging
import time
import traceback
from queue import Empty as QueueEmpty
from django.utils.timezone import now as tz_now
from django.conf import settings
from django.db import DatabaseError, OperationalError, connection as django_connection
from django.db.utils import InterfaceError, InternalError
from django.db.utils import InterfaceError, InternalError, IntegrityError
from awx.main.consumers import emit_channel_notification
from awx.main.models import (JobEvent, AdHocCommandEvent, ProjectUpdateEvent,
InventoryUpdateEvent, SystemJobEvent, UnifiedJob)
from awx.main.models.events import emit_event_detail
from .base import BaseWorker
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
# the number of seconds to buffer events in memory before flushing
# using JobEvent.objects.bulk_create()
BUFFER_SECONDS = .1
class CallbackBrokerWorker(BaseWorker):
'''
@ -26,89 +33,130 @@ class CallbackBrokerWorker(BaseWorker):
MAX_RETRIES = 2
def __init__(self):
self.buff = {}
def read(self, queue):
    """
    Pull the next event payload off the queue, waiting up to
    BUFFER_SECONDS.  On timeout, return a synthetic FLUSH payload so the
    worker loop gets a periodic chance to write its buffered events.
    """
    try:
        payload = queue.get(block=True, timeout=BUFFER_SECONDS)
    except QueueEmpty:
        payload = {'event': 'FLUSH'}
    return payload
def flush(self, force=False):
    """
    Persist buffered events to the database with bulk_create().

    Events accumulate in self.buff (keyed by event model class) and are
    only written when `force` is True (e.g., a FLUSH tick from read())
    or when any per-class buffer has reached 1000 events.  After a
    successful pass, websocket detail notifications are emitted and the
    buffer is reset.
    """
    now = tz_now()
    if (
        force or
        any(len(events) >= 1000 for events in self.buff.values())
    ):
        for cls, events in self.buff.items():
            logger.debug(f'{cls.__name__}.objects.bulk_create({len(events)})')
            for e in events:
                if not e.created:
                    e.created = now
                e.modified = now
            try:
                cls.objects.bulk_create(events)
            except Exception as exc:
                # if an exception occurs, we should re-attempt to save the
                # events one-by-one, because something in the list is
                # broken/stale (e.g., an IntegrityError on a specific event)
                for e in events:
                    try:
                        # BUG FIX: this condition was previously written
                        # with a comma instead of `and`, producing a
                        # two-element tuple that is *always* truthy — so
                        # any bulk_create failure nulled host_id on every
                        # buffered event.
                        if (
                            isinstance(exc, IntegrityError) and
                            getattr(e, 'host_id', '')
                        ):
                            # this is one potential IntegrityError we can
                            # work around - if the host disappears before
                            # the event can be processed
                            e.host_id = None
                        # save every event individually (not only the
                        # host_id workaround case) so a single bad event
                        # does not discard the rest of its batch
                        e.save()
                    except Exception:
                        logger.exception('Database Error Saving Job Event')
            for e in events:
                emit_event_detail(e)
        # reset the buffer only once a flush pass has actually run
        self.buff = {}
def perform_work(self, body):
try:
event_map = {
'job_id': JobEvent,
'ad_hoc_command_id': AdHocCommandEvent,
'project_update_id': ProjectUpdateEvent,
'inventory_update_id': InventoryUpdateEvent,
'system_job_id': SystemJobEvent,
}
flush = body.get('event') == 'FLUSH'
if not flush:
event_map = {
'job_id': JobEvent,
'ad_hoc_command_id': AdHocCommandEvent,
'project_update_id': ProjectUpdateEvent,
'inventory_update_id': InventoryUpdateEvent,
'system_job_id': SystemJobEvent,
}
if not any([key in body for key in event_map]):
raise Exception('Payload does not have a job identifier')
def _save_event_data():
job_identifier = 'unknown job'
job_key = 'unknown'
for key, cls in event_map.items():
if key in body:
cls.create_from_data(**body)
job_identifier = body[key]
job_key = key
break
job_identifier = 'unknown job'
job_key = 'unknown'
for key in event_map.keys():
if key in body:
job_identifier = body[key]
job_key = key
break
if settings.DEBUG:
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import Terminal256Formatter
from pprint import pformat
if body.get('event') == 'EOF':
event_thing = 'EOF event'
else:
event_thing = 'event {}'.format(body.get('counter', 'unknown'))
logger.debug('Callback worker received {} for {} {}'.format(
event_thing, job_key[:-len('_id')], job_identifier
))
logger.debug('Body: {}'.format(
highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly'))
)[:1024 * 4])
if settings.DEBUG:
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import Terminal256Formatter
from pprint import pformat
if body.get('event') == 'EOF':
event_thing = 'EOF event'
else:
event_thing = 'event {}'.format(body.get('counter', 'unknown'))
logger.info('Callback worker received {} for {} {}'.format(
event_thing, job_key[:-len('_id')], job_identifier
))
logger.debug('Body: {}'.format(
highlight(pformat(body, width=160), PythonLexer(), Terminal256Formatter(style='friendly'))
)[:1024 * 4])
try:
final_counter = body.get('final_counter', 0)
logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier))
# EOF events are sent when stdout for the running task is
# closed. don't actually persist them to the database; we
# just use them to report `summary` websocket events as an
# approximation for when a job is "done"
emit_channel_notification(
'jobs-summary',
dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter)
)
# Additionally, when we've processed all events, we should
# have all the data we need to send out success/failure
# notification templates
uj = UnifiedJob.objects.get(pk=job_identifier)
if hasattr(uj, 'send_notification_templates'):
retries = 0
while retries < 5:
if uj.finished:
uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
break
else:
# wait a few seconds to avoid a race where the
# events are persisted _before_ the UJ.status
# changes from running -> successful
retries += 1
time.sleep(1)
uj = UnifiedJob.objects.get(pk=job_identifier)
except Exception:
logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
return
if body.get('event') == 'EOF':
try:
final_counter = body.get('final_counter', 0)
logger.info('Event processing is finished for Job {}, sending notifications'.format(job_identifier))
# EOF events are sent when stdout for the running task is
# closed. don't actually persist them to the database; we
# just use them to report `summary` websocket events as an
# approximation for when a job is "done"
emit_channel_notification(
'jobs-summary',
dict(group_name='jobs', unified_job_id=job_identifier, final_counter=final_counter)
)
# Additionally, when we've processed all events, we should
# have all the data we need to send out success/failure
# notification templates
uj = UnifiedJob.objects.get(pk=job_identifier)
if hasattr(uj, 'send_notification_templates'):
retries = 0
while retries < 5:
if uj.finished:
uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
break
else:
# wait a few seconds to avoid a race where the
# events are persisted _before_ the UJ.status
# changes from running -> successful
retries += 1
time.sleep(1)
uj = UnifiedJob.objects.get(pk=job_identifier)
except Exception:
logger.exception('Worker failed to emit notifications: Job {}'.format(job_identifier))
return
event = cls.create_from_data(**body)
self.buff.setdefault(cls, []).append(event)
retries = 0
while retries <= self.MAX_RETRIES:
try:
_save_event_data()
self.flush(force=flush)
break
except (OperationalError, InterfaceError, InternalError):
if retries >= self.MAX_RETRIES:
logger.exception('Worker could not re-establish database connectivity, giving up on event for Job {}'.format(job_identifier))
logger.exception('Worker could not re-establish database connectivity, giving up on one or more events.')
return
delay = 60 * retries
logger.exception('Database Error Saving Job Event, retry #{i} in {delay} seconds:'.format(
@ -119,7 +167,7 @@ class CallbackBrokerWorker(BaseWorker):
time.sleep(delay)
retries += 1
except DatabaseError:
logger.exception('Database Error Saving Job Event for Job {}'.format(job_identifier))
logger.exception('Database Error Saving Job Event')
break
except Exception as exc:
tb = traceback.format_exc()

View File

@ -0,0 +1,37 @@
import time
import sys
from django.db import connection
from django.core.management.base import BaseCommand
class Command(BaseCommand):
    """
    Continuously print a small terminal dashboard of callback-event
    insert rates.

    Every quarter second, for each event table, report how many rows
    were written in the last minute / 5 minutes / hour, redrawing the
    output in place with ANSI escape codes.  Runs until interrupted.
    """

    # event tables to report on
    RELATIONS = (
        'main_jobevent', 'main_inventoryupdateevent',
        'main_projectupdateevent', 'main_adhoccommandevent'
    )

    # (display label, postgres interval literal) pairs for the windows
    INTERVALS = (
        ('last minute: ', '1 minute'),
        ('last 5 minutes:', '5 minutes'),
        ('last hour: ', '1 hour'),
    )

    def handle(self, *args, **options):
        with connection.cursor() as cursor:
            clear = False
            while True:
                lines = []
                for relation in self.RELATIONS:
                    lines.append(relation)
                    for label, interval in self.INTERVALS:
                        # MAX(id) - MIN(id) approximates rows inserted in
                        # the window (event ids are assigned sequentially);
                        # relation/interval are hard-coded constants, so
                        # the f-string SQL is not an injection risk
                        cursor.execute(
                            f"SELECT MAX(id) - MIN(id) FROM {relation} WHERE modified > now() - '{interval}'::interval;"
                        )
                        events = cursor.fetchone()[0] or 0
                        lines.append(f'{label} {events}')
                    lines.append('')
                if clear:
                    # erase exactly as many lines as the previous pass
                    # printed (previously a hard-coded 20, which would
                    # drift out of sync if RELATIONS/INTERVALS changed)
                    for i in range(len(lines)):
                        sys.stdout.write('\x1b[1A\x1b[2K')
                for l in lines:
                    print(l)
                clear = True
                time.sleep(.25)

View File

@ -9,6 +9,7 @@ import random
from django.utils import timezone
from django.core.management.base import BaseCommand
from awx.main.models.events import emit_event_detail
from awx.main.models import (
UnifiedJob,
Job,
@ -17,14 +18,6 @@ from awx.main.models import (
InventoryUpdate,
SystemJob
)
from awx.main.consumers import emit_channel_notification
from awx.api.serializers import (
JobEventWebSocketSerializer,
AdHocCommandEventWebSocketSerializer,
ProjectUpdateEventWebSocketSerializer,
InventoryUpdateEventWebSocketSerializer,
SystemJobEventWebSocketSerializer
)
class JobStatusLifeCycle():
@ -96,21 +89,6 @@ class ReplayJobEvents(JobStatusLifeCycle):
raise RuntimeError("No events for job id {}".format(job.id))
return job_events, count
def get_serializer(self, job):
if type(job) is Job:
return JobEventWebSocketSerializer
elif type(job) is AdHocCommand:
return AdHocCommandEventWebSocketSerializer
elif type(job) is ProjectUpdate:
return ProjectUpdateEventWebSocketSerializer
elif type(job) is InventoryUpdate:
return InventoryUpdateEventWebSocketSerializer
elif type(job) is SystemJob:
return SystemJobEventWebSocketSerializer
else:
raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job)))
sys.exit(1)
def run(self, job_id, speed=1.0, verbosity=0, skip_range=[], random_seed=0, final_status_delay=0, debug=False):
stats = {
'events_ontime': {
@ -136,7 +114,6 @@ class ReplayJobEvents(JobStatusLifeCycle):
try:
job = self.get_job(job_id)
job_events, job_event_count = self.get_job_events(job)
serializer = self.get_serializer(job)
except RuntimeError as e:
print("{}".format(e.message))
sys.exit(1)
@ -162,8 +139,7 @@ class ReplayJobEvents(JobStatusLifeCycle):
stats['replay_start'] = self.replay_start
je_previous = je_current
je_serialized = serializer(je_current).data
emit_channel_notification('{}-{}'.format(je_serialized['group_name'], job.id), je_serialized)
emit_event_detail(je_current)
replay_offset = self.replay_offset(je_previous.created, speed)
recording_diff = (je_current.created - je_previous.created).total_seconds() * (1.0 / speed)

View File

@ -1,8 +1,9 @@
# -*- coding: utf-8 -*-
import datetime
import logging
from collections import defaultdict
from django.conf import settings
from django.db import models, DatabaseError
from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator
@ -11,9 +12,10 @@ from django.utils.translation import ugettext_lazy as _
from django.utils.encoding import force_text
from awx.api.versioning import reverse
from awx.main import consumers
from awx.main.fields import JSONField
from awx.main.models.base import CreatedModifiedModel
from awx.main.utils import ignore_inventory_computed_fields
from awx.main.utils import ignore_inventory_computed_fields, camelcase_to_underscore
analytics_logger = logging.getLogger('awx.analytics.job_events')
@ -55,6 +57,51 @@ def create_host_status_counts(event_data):
return dict(host_status_counts)
def emit_event_detail(event):
    """
    Broadcast a saved event over the websocket channel layer.

    Builds a flat, JSON-serializable payload for *event* (replacing the
    old per-class WebSocket serializers) and emits it on the group named
    for the event's parent unified job, e.g. 'job_events-123'.
    """
    cls = event.__class__
    fk_by_class = {
        JobEvent: 'job_id',
        AdHocCommandEvent: 'ad_hoc_command_id',
        ProjectUpdateEvent: 'project_update_id',
        InventoryUpdateEvent: 'inventory_update_id',
        SystemJobEvent: 'system_job_id',
    }
    relation = fk_by_class[cls]
    parent_pk = getattr(event, relation)
    # only job and ad hoc command events have detail API endpoints
    url = ''
    if isinstance(event, JobEvent):
        url = '/api/v2/job_events/{}'.format(event.id)
    if isinstance(event, AdHocCommandEvent):
        url = '/api/v2/ad_hoc_command_events/{}'.format(event.id)
    group = camelcase_to_underscore(cls.__name__) + 's'
    timestamp = event.created.isoformat()
    payload = {
        'id': event.id,
        relation.replace('_id', ''): parent_pk,
        'created': timestamp,
        'modified': timestamp,
        'group_name': group,
        'url': url,
        'stdout': event.stdout,
        'counter': event.counter,
        'uuid': event.uuid,
        'parent_uuid': getattr(event, 'parent_uuid', ''),
        'start_line': event.start_line,
        'end_line': event.end_line,
        'event': event.event,
        'event_data': getattr(event, 'event_data', {}),
        'failed': event.failed,
        'changed': event.changed,
        'event_level': getattr(event, 'event_level', ''),
        'play': getattr(event, 'play', ''),
        'role': getattr(event, 'role', ''),
        'task': getattr(event, 'task', ''),
    }
    consumers.emit_channel_notification(
        '-'.join([group, str(parent_pk)]), payload
    )
class BasePlaybookEvent(CreatedModifiedModel):
'''
An event/message logged from a playbook callback for each host.
@ -63,7 +110,7 @@ class BasePlaybookEvent(CreatedModifiedModel):
VALID_KEYS = [
'event', 'event_data', 'playbook', 'play', 'role', 'task', 'created',
'counter', 'uuid', 'stdout', 'parent_uuid', 'start_line', 'end_line',
'verbosity'
'host_id', 'host_name', 'verbosity',
]
class Meta:
@ -271,37 +318,67 @@ class BasePlaybookEvent(CreatedModifiedModel):
def _update_from_event_data(self):
# Update event model fields from event data.
updated_fields = set()
event_data = self.event_data
res = event_data.get('res', None)
if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False):
self.failed = True
updated_fields.add('failed')
if isinstance(res, dict):
if res.get('changed', False):
self.changed = True
updated_fields.add('changed')
if self.event == 'playbook_on_stats':
try:
failures_dict = event_data.get('failures', {})
dark_dict = event_data.get('dark', {})
self.failed = bool(sum(failures_dict.values()) +
sum(dark_dict.values()))
updated_fields.add('failed')
changed_dict = event_data.get('changed', {})
self.changed = bool(sum(changed_dict.values()))
updated_fields.add('changed')
except (AttributeError, TypeError):
pass
if isinstance(self, JobEvent):
hostnames = self._hostnames()
self._update_host_summary_from_stats(hostnames)
if self.job.inventory:
try:
self.job.inventory.update_computed_fields()
except DatabaseError:
logger.exception('Computed fields database error saving event {}'.format(self.pk))
# find parent links and propagate changed=T and failed=T
changed = self.job.job_events.filter(changed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
failed = self.job.job_events.filter(failed=True).exclude(parent_uuid=None).only('parent_uuid').values_list('parent_uuid', flat=True).distinct() # noqa
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=changed
).update(changed=True)
JobEvent.objects.filter(
job_id=self.job_id, uuid__in=failed
).update(failed=True)
for field in ('playbook', 'play', 'task', 'role'):
value = force_text(event_data.get(field, '')).strip()
if value != getattr(self, field):
setattr(self, field, value)
updated_fields.add(field)
return updated_fields
if isinstance(self, JobEvent):
analytics_logger.info(
'Event data saved.',
extra=dict(python_objects=dict(job_event=self))
)
@classmethod
def create_from_data(cls, **kwargs):
#
# ⚠️ D-D-D-DANGER ZONE ⚠️
# This function is called by the callback receiver *once* for *every
# event* emitted by Ansible as a playbook runs. That means that
# changes to this function are _very_ susceptible to introducing
# performance regressions (which the user will experience as "my
# playbook stdout takes too long to show up"), *especially* code which
# might invoke additional database queries per event.
#
# Proceed with caution!
#
pk = None
for key in ('job_id', 'project_update_id'):
if key in kwargs:
@ -325,74 +402,16 @@ class BasePlaybookEvent(CreatedModifiedModel):
sanitize_event_keys(kwargs, cls.VALID_KEYS)
workflow_job_id = kwargs.pop('workflow_job_id', None)
job_event = cls.objects.create(**kwargs)
event = cls(**kwargs)
if workflow_job_id:
setattr(job_event, 'workflow_job_id', workflow_job_id)
analytics_logger.info('Event data saved.', extra=dict(python_objects=dict(job_event=job_event)))
return job_event
setattr(event, 'workflow_job_id', workflow_job_id)
event._update_from_event_data()
return event
@property
def job_verbosity(self):
return 0
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
# Update model fields and related objects unless we're only updating
# failed/changed flags triggered from a child event.
from_parent_update = kwargs.pop('from_parent_update', False)
if not from_parent_update:
# Update model fields from event data.
updated_fields = self._update_from_event_data()
for field in updated_fields:
if field not in update_fields:
update_fields.append(field)
# Update host related field from host_name.
if hasattr(self, 'job') and not self.host_id and self.host_name:
if self.job.inventory.kind == 'smart':
# optimization to avoid calling inventory.hosts, which
# can take a long time to run under some circumstances
from awx.main.models.inventory import SmartInventoryMembership
membership = SmartInventoryMembership.objects.filter(
inventory=self.job.inventory, host__name=self.host_name
).first()
if membership:
host_id = membership.host_id
else:
host_id = None
else:
host_qs = self.job.inventory.hosts.filter(name=self.host_name)
host_id = host_qs.only('id').values_list('id', flat=True).first()
if host_id != self.host_id:
self.host_id = host_id
if 'host_id' not in update_fields:
update_fields.append('host_id')
super(BasePlaybookEvent, self).save(*args, **kwargs)
# Update related objects after this event is saved.
if hasattr(self, 'job') and not from_parent_update:
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self._update_hosts()
if self.parent_uuid:
kwargs = {}
if self.changed is True:
kwargs['changed'] = True
if self.failed is True:
kwargs['failed'] = True
if kwargs:
JobEvent.objects.filter(job_id=self.job_id, uuid=self.parent_uuid).update(**kwargs)
if self.event == 'playbook_on_stats':
hostnames = self._hostnames()
self._update_host_summary_from_stats(hostnames)
try:
self.job.inventory.update_computed_fields()
except DatabaseError:
logger.exception('Computed fields database error saving event {}'.format(self.pk))
class JobEvent(BasePlaybookEvent):
'''
@ -456,38 +475,6 @@ class JobEvent(BasePlaybookEvent):
def __str__(self):
return u'%s @ %s' % (self.get_event_display2(), self.created.isoformat())
def _update_from_event_data(self):
# Update job event hostname
updated_fields = super(JobEvent, self)._update_from_event_data()
value = force_text(self.event_data.get('host', '')).strip()
if value != getattr(self, 'host_name'):
setattr(self, 'host_name', value)
updated_fields.add('host_name')
return updated_fields
def _update_hosts(self, extra_host_pks=None):
# Update job event hosts m2m from host_name, propagate to parent events.
extra_host_pks = set(extra_host_pks or [])
hostnames = set()
if self.host_name:
hostnames.add(self.host_name)
if self.event == 'playbook_on_stats':
try:
for v in self.event_data.values():
hostnames.update(v.keys())
except AttributeError: # In case event_data or v isn't a dict.
pass
qs = self.job.inventory.hosts.all()
qs = qs.filter(models.Q(name__in=hostnames) | models.Q(pk__in=extra_host_pks))
qs = qs.exclude(job_events__pk=self.id).only('id')
for host in qs:
self.hosts.add(host)
if self.parent_uuid:
parent = JobEvent.objects.filter(uuid=self.parent_uuid)
if parent.exists():
parent = parent[0]
parent._update_hosts(qs.values_list('id', flat=True))
def _hostnames(self):
hostnames = set()
try:
@ -605,6 +592,17 @@ class BaseCommandEvent(CreatedModifiedModel):
@classmethod
def create_from_data(cls, **kwargs):
#
# ⚠️ D-D-D-DANGER ZONE ⚠️
# This function is called by the callback receiver *once* for *every
# event* emitted by Ansible as a playbook runs. That means that
# changes to this function are _very_ susceptible to introducing
# performance regressions (which the user will experience as "my
# playbook stdout takes too long to show up"), *especially* code which
# might invoke additional database queries per event.
#
# Proceed with caution!
#
# Convert the datetime for the event's creation
# appropriately, and include a time zone for it.
#
@ -619,13 +617,8 @@ class BaseCommandEvent(CreatedModifiedModel):
kwargs.pop('created', None)
sanitize_event_keys(kwargs, cls.VALID_KEYS)
kwargs.pop('workflow_job_id', None)
event = cls.objects.create(**kwargs)
if isinstance(event, AdHocCommandEvent):
analytics_logger.info(
'Event data saved.',
extra=dict(python_objects=dict(job_event=event))
)
event = cls(**kwargs)
event._update_from_event_data()
return event
def get_event_display(self):
@ -640,10 +633,15 @@ class BaseCommandEvent(CreatedModifiedModel):
def get_host_status_counts(self):
return create_host_status_counts(getattr(self, 'event_data', {}))
def _update_from_event_data(self):
pass
class AdHocCommandEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command_id', 'event', 'workflow_job_id']
VALID_KEYS = BaseCommandEvent.VALID_KEYS + [
'ad_hoc_command_id', 'event', 'host_name', 'host_id', 'workflow_job_id'
]
class Meta:
app_label = 'main'
@ -719,34 +717,18 @@ class AdHocCommandEvent(BaseCommandEvent):
def get_absolute_url(self, request=None):
return reverse('api:ad_hoc_command_event_detail', kwargs={'pk': self.pk}, request=request)
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
def _update_from_event_data(self):
res = self.event_data.get('res', None)
if self.event in self.FAILED_EVENTS:
if not self.event_data.get('ignore_errors', False):
self.failed = True
if 'failed' not in update_fields:
update_fields.append('failed')
if isinstance(res, dict) and res.get('changed', False):
self.changed = True
if 'changed' not in update_fields:
update_fields.append('changed')
self.host_name = self.event_data.get('host', '').strip()
if 'host_name' not in update_fields:
update_fields.append('host_name')
if not self.host_id and self.host_name:
host_qs = self.ad_hoc_command.inventory.hosts.filter(name=self.host_name)
try:
host_id = host_qs.only('id').values_list('id', flat=True)
if host_id.exists():
self.host_id = host_id[0]
if 'host_id' not in update_fields:
update_fields.append('host_id')
except (IndexError, AttributeError):
pass
super(AdHocCommandEvent, self).save(*args, **kwargs)
analytics_logger.info(
'Event data saved.',
extra=dict(python_objects=dict(job_event=self))
)
class InventoryUpdateEvent(BaseCommandEvent):

View File

@ -30,12 +30,11 @@ from crum.signals import current_user_getter
# AWX
from awx.main.models import (
ActivityStream, AdHocCommandEvent, Group, Host, InstanceGroup, Inventory,
InventorySource, InventoryUpdateEvent, Job, JobEvent, JobHostSummary,
JobTemplate, OAuth2AccessToken, Organization, Project, ProjectUpdateEvent,
Role, SystemJob, SystemJobEvent, SystemJobTemplate, UnifiedJob,
UnifiedJobTemplate, User, UserSessionMembership, WorkflowJobTemplateNode,
WorkflowApproval, WorkflowApprovalTemplate, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
ActivityStream, Group, Host, InstanceGroup, Inventory, InventorySource,
Job, JobHostSummary, JobTemplate, OAuth2AccessToken, Organization, Project,
Role, SystemJob, SystemJobTemplate, UnifiedJob, UnifiedJobTemplate, User,
UserSessionMembership, WorkflowJobTemplateNode, WorkflowApproval,
WorkflowApprovalTemplate, ROLE_SINGLETON_SYSTEM_ADMINISTRATOR
)
from awx.main.constants import CENSOR_VALUE
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, get_current_apps
@ -72,42 +71,6 @@ def get_current_user_or_none():
return u
def emit_event_detail(serializer, relation, **kwargs):
instance = kwargs['instance']
created = kwargs['created']
if created:
event_serializer = serializer(instance)
consumers.emit_channel_notification(
'-'.join([event_serializer.get_group_name(instance), str(getattr(instance, relation))]),
event_serializer.data
)
def emit_job_event_detail(sender, **kwargs):
from awx.api import serializers
emit_event_detail(serializers.JobEventWebSocketSerializer, 'job_id', **kwargs)
def emit_ad_hoc_command_event_detail(sender, **kwargs):
from awx.api import serializers
emit_event_detail(serializers.AdHocCommandEventWebSocketSerializer, 'ad_hoc_command_id', **kwargs)
def emit_project_update_event_detail(sender, **kwargs):
from awx.api import serializers
emit_event_detail(serializers.ProjectUpdateEventWebSocketSerializer, 'project_update_id', **kwargs)
def emit_inventory_update_event_detail(sender, **kwargs):
from awx.api import serializers
emit_event_detail(serializers.InventoryUpdateEventWebSocketSerializer, 'inventory_update_id', **kwargs)
def emit_system_job_event_detail(sender, **kwargs):
from awx.api import serializers
emit_event_detail(serializers.SystemJobEventWebSocketSerializer, 'system_job_id', **kwargs)
def emit_update_inventory_computed_fields(sender, **kwargs):
logger.debug("In update inventory computed fields")
if getattr(_inventory_updates, 'is_updating', False):
@ -258,11 +221,6 @@ connect_computed_field_signals()
post_save.connect(save_related_job_templates, sender=Project)
post_save.connect(save_related_job_templates, sender=Inventory)
post_save.connect(emit_job_event_detail, sender=JobEvent)
post_save.connect(emit_ad_hoc_command_event_detail, sender=AdHocCommandEvent)
post_save.connect(emit_project_update_event_detail, sender=ProjectUpdateEvent)
post_save.connect(emit_inventory_update_event_detail, sender=InventoryUpdateEvent)
post_save.connect(emit_system_job_event_detail, sender=SystemJobEvent)
m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
m2m_changed.connect(rbac_activity_stream, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.parents.through)

View File

@ -703,6 +703,7 @@ class BaseTask(object):
def __init__(self):
    # Paths accumulated during the run for later cleanup — presumably
    # removed when the task finishes; confirm against the cleanup logic.
    self.cleanup_paths = []
    # Set when this job was launched by a workflow; when present it is
    # copied onto each emitted event as `workflow_job_id`.
    self.parent_workflow_job_id = None
    # host_name -> host_id mapping populated from the generated inventory
    # script data; used to attach host_name/host_id to emitted events.
    self.host_map = {}
def update_model(self, pk, _attempt=0, **updates):
"""Reload the model instance from the database and update the
@ -1001,11 +1002,17 @@ class BaseTask(object):
return False
def build_inventory(self, instance, private_data_dir):
script_params = dict(hostvars=True)
script_params = dict(hostvars=True, towervars=True)
if hasattr(instance, 'job_slice_number'):
script_params['slice_number'] = instance.job_slice_number
script_params['slice_count'] = instance.job_slice_count
script_data = instance.inventory.get_script_data(**script_params)
# maintain a list of host_name --> host_id
# so we can associate emitted events to Host objects
self.host_map = {
hostname: hv.pop('remote_tower_id', '')
for hostname, hv in script_data.get('_meta', {}).get('hostvars', {}).items()
}
json_data = json.dumps(script_data)
handle, path = tempfile.mkstemp(dir=private_data_dir)
f = os.fdopen(handle, 'w')
@ -1114,6 +1121,15 @@ class BaseTask(object):
event_data.pop('parent_uuid', None)
if self.parent_workflow_job_id:
event_data['workflow_job_id'] = self.parent_workflow_job_id
if self.host_map:
host = event_data.get('event_data', {}).get('host', '').strip()
if host:
event_data['host_name'] = host
if host in self.host_map:
event_data['host_id'] = self.host_map[host]
else:
event_data['host_name'] = ''
event_data['host_id'] = ''
should_write_event = False
event_data.setdefault(self.event_data_key, self.instance.id)
self.dispatcher.dispatch(event_data)

View File

@ -15,7 +15,7 @@ def test_job_events_sublist_truncation(get, organization_factory, job_template_f
inventory='test_inv', project='test_proj').job_template
job = jt.create_unified_job()
JobEvent.create_from_data(job_id=job.pk, uuid='abc123', event='runner_on_start',
stdout='a' * 1025)
stdout='a' * 1025).save()
url = reverse('api:job_job_events_list', kwargs={'pk': job.pk})
if not truncate:
@ -35,7 +35,7 @@ def test_ad_hoc_events_sublist_truncation(get, organization_factory, job_templat
adhoc = AdHocCommand()
adhoc.save()
AdHocCommandEvent.create_from_data(ad_hoc_command_id=adhoc.pk, uuid='abc123', event='runner_on_start',
stdout='a' * 1025)
stdout='a' * 1025).save()
url = reverse('api:ad_hoc_command_ad_hoc_command_events_list', kwargs={'pk': adhoc.pk})
if not truncate:

View File

@ -5,6 +5,7 @@
# Python
import pytest
import os
import time
from django.conf import settings
from kombu.utils.url import parse_url
@ -276,6 +277,7 @@ def test_logging_aggregrator_connection_test_valid(mocker, get, post, admin):
def test_logging_aggregrator_connection_test_with_masked_password(mocker, patch, post, admin):
url = reverse('api:setting_singleton_detail', kwargs={'category_slug': 'logging'})
patch(url, user=admin, data={'LOG_AGGREGATOR_PASSWORD': 'password123'}, expect=200)
time.sleep(1) # log settings are cached slightly
with mock.patch.object(AWXProxyHandler, 'perform_test') as perform_test:
url = reverse('api:setting_logging_test')

View File

@ -37,26 +37,26 @@ class TestKeyRegeneration:
def test_encrypted_setting_values(self):
# test basic decryption
settings.LOG_AGGREGATOR_PASSWORD = 'sensitive'
s = Setting.objects.filter(key='LOG_AGGREGATOR_PASSWORD').first()
settings.REDHAT_PASSWORD = 'sensitive'
s = Setting.objects.filter(key='REDHAT_PASSWORD').first()
assert s.value.startswith(PREFIX)
assert settings.LOG_AGGREGATOR_PASSWORD == 'sensitive'
assert settings.REDHAT_PASSWORD == 'sensitive'
# re-key the setting value
new_key = regenerate_secret_key.Command().handle()
new_setting = Setting.objects.filter(key='LOG_AGGREGATOR_PASSWORD').first()
new_setting = Setting.objects.filter(key='REDHAT_PASSWORD').first()
assert s.value != new_setting.value
# wipe out the local cache so the value is pulled from the DB again
settings.cache.delete('LOG_AGGREGATOR_PASSWORD')
settings.cache.delete('REDHAT_PASSWORD')
# verify that the old SECRET_KEY doesn't work
with pytest.raises(InvalidToken):
settings.LOG_AGGREGATOR_PASSWORD
settings.REDHAT_PASSWORD
# verify that the new SECRET_KEY *does* work
with override_settings(SECRET_KEY=new_key):
assert settings.LOG_AGGREGATOR_PASSWORD == 'sensitive'
assert settings.REDHAT_PASSWORD == 'sensitive'
def test_encrypted_notification_secrets(self, notification_template_with_encrypt):
# test basic decryption

View File

@ -1,18 +1,15 @@
from unittest import mock
import pytest
from awx.main.models import (Job, JobEvent, ProjectUpdate, ProjectUpdateEvent,
AdHocCommand, AdHocCommandEvent, InventoryUpdate,
InventorySource, InventoryUpdateEvent, SystemJob,
SystemJobEvent)
from awx.main.models import Job, JobEvent
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
@mock.patch('awx.main.models.events.emit_event_detail')
def test_parent_changed(emit):
j = Job()
j.save()
JobEvent.create_from_data(job_id=j.pk, uuid='abc123', event='playbook_on_task_start')
JobEvent.create_from_data(job_id=j.pk, uuid='abc123', event='playbook_on_task_start').save()
assert JobEvent.objects.count() == 1
for e in JobEvent.objects.all():
assert e.changed is False
@ -24,19 +21,26 @@ def test_parent_changed(emit):
event_data={
'res': {'changed': ['localhost']}
}
)
assert JobEvent.objects.count() == 2
for e in JobEvent.objects.all():
).save()
# the `playbook_on_stats` event is where we update the parent changed linkage
JobEvent.create_from_data(
job_id=j.pk,
parent_uuid='abc123',
event='playbook_on_stats'
).save()
events = JobEvent.objects.filter(event__in=['playbook_on_task_start', 'runner_on_ok'])
assert events.count() == 2
for e in events.all():
assert e.changed is True
@pytest.mark.django_db
@pytest.mark.parametrize('event', JobEvent.FAILED_EVENTS)
@mock.patch('awx.main.consumers.emit_channel_notification')
@mock.patch('awx.main.models.events.emit_event_detail')
def test_parent_failed(emit, event):
j = Job()
j.save()
JobEvent.create_from_data(job_id=j.pk, uuid='abc123', event='playbook_on_task_start')
JobEvent.create_from_data(job_id=j.pk, uuid='abc123', event='playbook_on_task_start').save()
assert JobEvent.objects.count() == 1
for e in JobEvent.objects.all():
assert e.failed is False
@ -45,69 +49,15 @@ def test_parent_failed(emit, event):
job_id=j.pk,
parent_uuid='abc123',
event=event
)
assert JobEvent.objects.count() == 2
for e in JobEvent.objects.all():
).save()
# the `playbook_on_stats` event is where we update the parent failed linkage
JobEvent.create_from_data(
job_id=j.pk,
parent_uuid='abc123',
event='playbook_on_stats'
).save()
events = JobEvent.objects.filter(event__in=['playbook_on_task_start', event])
assert events.count() == 2
for e in events.all():
assert e.failed is True
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_job_event_websocket_notifications(emit):
    """Creating a JobEvent emits exactly one message to the job_events group."""
    job = Job(id=123)
    job.save()
    JobEvent.create_from_data(job_id=job.pk)
    assert emit.call_count == 1
    topic, payload = emit.call_args[0]
    assert topic == 'job_events-123'
    assert payload['job'] == 123
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_ad_hoc_event_websocket_notifications(emit):
    """Creating an AdHocCommandEvent emits exactly one websocket message."""
    command = AdHocCommand(id=123)
    command.save()
    AdHocCommandEvent.create_from_data(ad_hoc_command_id=command.pk)
    assert emit.call_count == 1
    topic, payload = emit.call_args[0]
    assert topic == 'ad_hoc_command_events-123'
    assert payload['ad_hoc_command'] == 123
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_project_update_event_websocket_notifications(emit, project):
    """Creating a ProjectUpdateEvent emits exactly one websocket message."""
    update = ProjectUpdate(id=123, project=project)
    update.save()
    ProjectUpdateEvent.create_from_data(project_update_id=update.pk)
    assert emit.call_count == 1
    topic, payload = emit.call_args[0]
    assert topic == 'project_update_events-123'
    assert payload['project_update'] == 123
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_inventory_update_event_websocket_notifications(emit, inventory):
    """Creating an InventoryUpdateEvent emits exactly one websocket message."""
    src = InventorySource()
    src.save()
    update = InventoryUpdate(id=123, inventory_source=src)
    update.save()
    InventoryUpdateEvent.create_from_data(inventory_update_id=update.pk)
    assert emit.call_count == 1
    topic, payload = emit.call_args[0]
    assert topic == 'inventory_update_events-123'
    assert payload['inventory_update'] == 123
@pytest.mark.django_db
@mock.patch('awx.main.consumers.emit_channel_notification')
def test_system_job_event_websocket_notifications(emit, inventory):
    """Creating a SystemJobEvent emits exactly one websocket message."""
    job = SystemJob(id=123)
    job.save()
    SystemJobEvent.create_from_data(system_job_id=job.pk)
    assert emit.call_count == 1
    topic, payload = emit.call_args[0]
    assert topic == 'system_job_events-123'
    assert payload['system_job'] == 123

View File

@ -60,7 +60,7 @@ class TestReplayJobEvents():
r.emit_job_status = lambda job, status: True
return r
@mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)
@mock.patch('awx.main.management.commands.replay_job_events.emit_event_detail', lambda *a, **kw: None)
def test_sleep(self, mocker, replayer):
replayer.run(3, 1)
@ -74,7 +74,7 @@ class TestReplayJobEvents():
mock.call(0.000001),
])
@mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)
@mock.patch('awx.main.management.commands.replay_job_events.emit_event_detail', lambda *a, **kw: None)
def test_speed(self, mocker, replayer):
replayer.run(3, 2)

View File

@ -1,6 +1,5 @@
from datetime import datetime
from django.utils.timezone import utc
from unittest import mock
import pytest
from awx.main.models import (JobEvent, ProjectUpdateEvent, AdHocCommandEvent,
@ -18,16 +17,11 @@ from awx.main.models import (JobEvent, ProjectUpdateEvent, AdHocCommandEvent,
datetime(2018, 1, 1).isoformat(), datetime(2018, 1, 1)
])
def test_event_parse_created(job_identifier, cls, created):
with mock.patch.object(cls, 'objects') as manager:
cls.create_from_data(**{
job_identifier: 123,
'created': created
})
expected_created = datetime(2018, 1, 1).replace(tzinfo=utc)
manager.create.assert_called_with(**{
job_identifier: 123,
'created': expected_created
})
event = cls.create_from_data(**{
job_identifier: 123,
'created': created
})
assert event.created == datetime(2018, 1, 1).replace(tzinfo=utc)
@pytest.mark.parametrize('job_identifier, cls', [
@ -38,24 +32,20 @@ def test_event_parse_created(job_identifier, cls, created):
['system_job_id', SystemJobEvent],
])
def test_playbook_event_strip_invalid_keys(job_identifier, cls):
with mock.patch.object(cls, 'objects') as manager:
cls.create_from_data(**{
job_identifier: 123,
'extra_key': 'extra_value'
})
manager.create.assert_called_with(**{job_identifier: 123})
event = cls.create_from_data(**{
job_identifier: 123,
'extra_key': 'extra_value'
})
assert getattr(event, job_identifier) == 123
assert not hasattr(event, 'extra_key')
@pytest.mark.parametrize('field', [
'play', 'role', 'task', 'playbook'
])
def test_really_long_event_fields(field):
with mock.patch.object(JobEvent, 'objects') as manager:
JobEvent.create_from_data(**{
'job_id': 123,
'event_data': {field: 'X' * 4096}
})
manager.create.assert_called_with(**{
'job_id': 123,
'event_data': {field: 'X' * 1023 + ''}
})
event = JobEvent.create_from_data(**{
'job_id': 123,
'event_data': {field: 'X' * 4096}
})
assert event.event_data[field] == 'X' * 1023 + ''

View File

@ -576,9 +576,6 @@ ANSIBLE_INVENTORY_UNPARSED_FAILED = True
# Additional environment variables to be passed to the ansible subprocesses
AWX_TASK_ENV = {}
# Flag to enable/disable updating hosts M2M when saving job events.
CAPTURE_JOB_EVENT_HOSTS = False
# Rebuild Host Smart Inventory memberships.
AWX_REBUILD_SMART_MEMBERSHIP = False