generalize stdout event processing to emit events for *all* job types

* introduces three new models: `ProjectUpdateEvent`,
  `InventoryUpdateEvent`, and `SystemJobEvent`
* simplifies the stdout callback management in `tasks.py` - now _all_
  job run types capture and emit events to the callback receiver
* supports stdout reconstruction from events for stdout downloads for
  _all_ job types
* configures `ProjectUpdate` runs to configure the awx display callback
  (so we can capture real playbook events for `project_update.yml`)
* ProjectUpdate, InventoryUpdate, and SystemJob runs no longer write
  text blobs to the deprecated `main_unifiedjob.result_stdout_text` column

see: https://github.com/ansible/awx/issues/200
This commit is contained in:
Ryan Petrello
2017-12-13 16:05:46 -05:00
parent 815cd829e0
commit fc94b3a943
9 changed files with 924 additions and 767 deletions

View File

@@ -4586,25 +4586,18 @@ class UnifiedJobStdout(RetrieveAPIView):
tablename, related_name = { tablename, related_name = {
Job: ('main_jobevent', 'job_id'), Job: ('main_jobevent', 'job_id'),
AdHocCommand: ('main_adhoccommandevent', 'ad_hoc_command_id'), AdHocCommand: ('main_adhoccommandevent', 'ad_hoc_command_id'),
}.get(unified_job.__class__, (None, None)) }.get(unified_job.__class__, ('main_genericcommandevent', 'unified_job_id'))
if tablename is None: cursor.copy_expert(
# stdout job event reconstruction isn't supported "copy (select stdout from {} where {}={} order by start_line) to stdout".format(
# for certain job types (such as inventory syncs), tablename,
# so just grab the raw stdout from the DB related_name,
write_fd.write(unified_job.result_stdout_text) unified_job.id
write_fd.close() ),
else: write_fd
cursor.copy_expert( )
"copy (select stdout from {} where {}={} order by start_line) to stdout".format( write_fd.close()
tablename, subprocess.Popen("sed -i 's/\\\\r\\\\n/\\n/g' {}".format(unified_job.result_stdout_file),
related_name, shell=True).wait()
unified_job.id
),
write_fd
)
write_fd.close()
subprocess.Popen("sed -i 's/\\\\r\\\\n/\\n/g' {}".format(unified_job.result_stdout_file),
shell=True).wait()
except Exception as e: except Exception as e:
return Response({"error": _("Error generating stdout download file: {}".format(e))}) return Response({"error": _("Error generating stdout download file: {}".format(e))})
try: try:

View File

@@ -123,6 +123,8 @@ class EventContext(object):
event_data['job_id'] = int(os.getenv('JOB_ID', '0')) event_data['job_id'] = int(os.getenv('JOB_ID', '0'))
if os.getenv('AD_HOC_COMMAND_ID', ''): if os.getenv('AD_HOC_COMMAND_ID', ''):
event_data['ad_hoc_command_id'] = int(os.getenv('AD_HOC_COMMAND_ID', '0')) event_data['ad_hoc_command_id'] = int(os.getenv('AD_HOC_COMMAND_ID', '0'))
if os.getenv('PROJECT_UPDATE_ID', ''):
event_data['project_update_id'] = int(os.getenv('PROJECT_UPDATE_ID', '0'))
event_data.setdefault('pid', os.getpid()) event_data.setdefault('pid', os.getpid())
event_data.setdefault('uuid', str(uuid.uuid4())) event_data.setdefault('uuid', str(uuid.uuid4()))
event_data.setdefault('created', datetime.datetime.utcnow().isoformat()) event_data.setdefault('created', datetime.datetime.utcnow().isoformat())
@@ -145,7 +147,7 @@ class EventContext(object):
event_data['res'] = {} event_data['res'] = {}
event_dict = dict(event=event, event_data=event_data) event_dict = dict(event=event, event_data=event_data)
for key in event_data.keys(): for key in event_data.keys():
if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created',): if key in ('job_id', 'ad_hoc_command_id', 'project_update_id', 'uuid', 'parent_uuid', 'created',):
event_dict[key] = event_data.pop(key) event_dict[key] = event_data.pop(key)
elif key in ('verbosity', 'pid'): elif key in ('verbosity', 'pid'):
event_dict[key] = event_data[key] event_dict[key] = event_data[key]

View File

@@ -128,8 +128,17 @@ class CallbackBrokerWorker(ConsumerMixin):
logger.error("Exception on worker thread, restarting: " + str(e)) logger.error("Exception on worker thread, restarting: " + str(e))
continue continue
try: try:
if 'job_id' not in body and 'ad_hoc_command_id' not in body:
raise Exception('Payload does not have a job_id or ad_hoc_command_id') event_map = {
'job_id': JobEvent,
'ad_hoc_command_id': AdHocCommandEvent,
'project_update_id': ProjectUpdateEvent,
'inventory_update_id': InventoryUpdateEvent,
'system_job_id': SystemJobEvent,
}
if not any([key in body for key in event_map]):
raise Exception('Payload does not have a job identifier')
if settings.DEBUG: if settings.DEBUG:
from pygments import highlight from pygments import highlight
from pygments.lexers import PythonLexer from pygments.lexers import PythonLexer
@@ -140,16 +149,15 @@ class CallbackBrokerWorker(ConsumerMixin):
)) ))
def _save_event_data(): def _save_event_data():
if 'job_id' in body: for key, cls in event_map.items():
JobEvent.create_from_data(**body) if key in body:
elif 'ad_hoc_command_id' in body: cls.create_from_data(**body)
AdHocCommandEvent.create_from_data(**body)
job_identifier = 'unknown job' job_identifier = 'unknown job'
if 'job_id' in body: for key in event_map.keys():
job_identifier = body['job_id'] if key in body:
elif 'ad_hoc_command_id' in body: job_identifier = body[key]
job_identifier = body['ad_hoc_command_id'] break
retries = 0 retries = 0
while retries <= self.MAX_RETRIES: while retries <= self.MAX_RETRIES:

View File

@@ -0,0 +1,81 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.7 on 2017-12-14 15:13
from __future__ import unicode_literals
import awx.main.fields
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('main', '0017_v330_move_deprecated_stdout'),
]
operations = [
migrations.CreateModel(
name='InventoryUpdateEvent',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateTimeField(default=None, editable=False)),
('modified', models.DateTimeField(default=None, editable=False)),
('event_data', awx.main.fields.JSONField(blank=True, default={})),
('uuid', models.CharField(default=b'', editable=False, max_length=1024)),
('counter', models.PositiveIntegerField(default=0, editable=False)),
('stdout', models.TextField(default=b'', editable=False)),
('verbosity', models.PositiveIntegerField(default=0, editable=False)),
('start_line', models.PositiveIntegerField(default=0, editable=False)),
('end_line', models.PositiveIntegerField(default=0, editable=False)),
('inventory_update', models.ForeignKey(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='generic_command_events', to='main.InventoryUpdate')),
],
options={
'ordering': ('-pk',),
},
),
migrations.CreateModel(
name='ProjectUpdateEvent',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateTimeField(default=None, editable=False)),
('modified', models.DateTimeField(default=None, editable=False)),
('event', models.CharField(choices=[(b'runner_on_failed', 'Host Failed'), (b'runner_on_ok', 'Host OK'), (b'runner_on_error', 'Host Failure'), (b'runner_on_skipped', 'Host Skipped'), (b'runner_on_unreachable', 'Host Unreachable'), (b'runner_on_no_hosts', 'No Hosts Remaining'), (b'runner_on_async_poll', 'Host Polling'), (b'runner_on_async_ok', 'Host Async OK'), (b'runner_on_async_failed', 'Host Async Failure'), (b'runner_item_on_ok', 'Item OK'), (b'runner_item_on_failed', 'Item Failed'), (b'runner_item_on_skipped', 'Item Skipped'), (b'runner_retry', 'Host Retry'), (b'runner_on_file_diff', 'File Difference'), (b'playbook_on_start', 'Playbook Started'), (b'playbook_on_notify', 'Running Handlers'), (b'playbook_on_include', 'Including File'), (b'playbook_on_no_hosts_matched', 'No Hosts Matched'), (b'playbook_on_no_hosts_remaining', 'No Hosts Remaining'), (b'playbook_on_task_start', 'Task Started'), (b'playbook_on_vars_prompt', 'Variables Prompted'), (b'playbook_on_setup', 'Gathering Facts'), (b'playbook_on_import_for_host', 'internal: on Import for Host'), (b'playbook_on_not_import_for_host', 'internal: on Not Import for Host'), (b'playbook_on_play_start', 'Play Started'), (b'playbook_on_stats', 'Playbook Complete'), (b'debug', 'Debug'), (b'verbose', 'Verbose'), (b'deprecated', 'Deprecated'), (b'warning', 'Warning'), (b'system_warning', 'System Warning'), (b'error', 'Error')], max_length=100)),
('event_data', awx.main.fields.JSONField(blank=True, default={})),
('failed', models.BooleanField(default=False, editable=False)),
('changed', models.BooleanField(default=False, editable=False)),
('uuid', models.CharField(default=b'', editable=False, max_length=1024)),
('playbook', models.CharField(default=b'', editable=False, max_length=1024)),
('play', models.CharField(default=b'', editable=False, max_length=1024)),
('role', models.CharField(default=b'', editable=False, max_length=1024)),
('task', models.CharField(default=b'', editable=False, max_length=1024)),
('counter', models.PositiveIntegerField(default=0, editable=False)),
('stdout', models.TextField(default=b'', editable=False)),
('verbosity', models.PositiveIntegerField(default=0, editable=False)),
('start_line', models.PositiveIntegerField(default=0, editable=False)),
('end_line', models.PositiveIntegerField(default=0, editable=False)),
('project_update', models.ForeignKey(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='generic_command_events', to='main.ProjectUpdate')),
],
options={
'ordering': ('pk',),
},
),
migrations.CreateModel(
name='SystemJobEvent',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateTimeField(default=None, editable=False)),
('modified', models.DateTimeField(default=None, editable=False)),
('event_data', awx.main.fields.JSONField(blank=True, default={})),
('uuid', models.CharField(default=b'', editable=False, max_length=1024)),
('counter', models.PositiveIntegerField(default=0, editable=False)),
('stdout', models.TextField(default=b'', editable=False)),
('verbosity', models.PositiveIntegerField(default=0, editable=False)),
('start_line', models.PositiveIntegerField(default=0, editable=False)),
('end_line', models.PositiveIntegerField(default=0, editable=False)),
('system_job', models.ForeignKey(editable=False, on_delete=django.db.models.deletion.CASCADE, related_name='generic_command_events', to='main.SystemJob')),
],
options={
'ordering': ('-pk',),
},
),
]

View File

@@ -12,6 +12,7 @@ from awx.main.models.credential import * # noqa
from awx.main.models.projects import * # noqa from awx.main.models.projects import * # noqa
from awx.main.models.inventory import * # noqa from awx.main.models.inventory import * # noqa
from awx.main.models.jobs import * # noqa from awx.main.models.jobs import * # noqa
from awx.main.models.events import * # noqa
from awx.main.models.ad_hoc_commands import * # noqa from awx.main.models.ad_hoc_commands import * # noqa
from awx.main.models.schedules import * # noqa from awx.main.models.schedules import * # noqa
from awx.main.models.activity_stream import * # noqa from awx.main.models.activity_stream import * # noqa

View File

@@ -2,16 +2,13 @@
# All Rights Reserved. # All Rights Reserved.
# Python # Python
import datetime
import logging import logging
from urlparse import urljoin from urlparse import urljoin
# Django # Django
from django.conf import settings from django.conf import settings
from django.db import models from django.db import models
from django.utils.dateparse import parse_datetime
from django.utils.text import Truncator from django.utils.text import Truncator
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
@@ -20,11 +17,10 @@ from awx.api.versioning import reverse
from awx.main.models.base import * # noqa from awx.main.models.base import * # noqa
from awx.main.models.unified_jobs import * # noqa from awx.main.models.unified_jobs import * # noqa
from awx.main.models.notifications import JobNotificationMixin, NotificationTemplate from awx.main.models.notifications import JobNotificationMixin, NotificationTemplate
from awx.main.fields import JSONField
logger = logging.getLogger('awx.main.models.ad_hoc_commands') logger = logging.getLogger('awx.main.models.ad_hoc_commands')
__all__ = ['AdHocCommand', 'AdHocCommandEvent'] __all__ = ['AdHocCommand']
class AdHocCommand(UnifiedJob, JobNotificationMixin): class AdHocCommand(UnifiedJob, JobNotificationMixin):
@@ -224,169 +220,3 @@ class AdHocCommand(UnifiedJob, JobNotificationMixin):
def get_notification_friendly_name(self): def get_notification_friendly_name(self):
return "AdHoc Command" return "AdHoc Command"
class AdHocCommandEvent(CreatedModifiedModel):
'''
An event/message logged from the ad hoc event callback for each host.
'''
EVENT_TYPES = [
# (event, verbose name, failed)
('runner_on_failed', _('Host Failed'), True),
('runner_on_ok', _('Host OK'), False),
('runner_on_unreachable', _('Host Unreachable'), True),
# Tower won't see no_hosts (check is done earlier without callback).
# ('runner_on_no_hosts', _('No Hosts Matched'), False),
# Tower will see skipped (when running in check mode for a module that
# does not support check mode).
('runner_on_skipped', _('Host Skipped'), False),
# Tower does not support async for ad hoc commands (not used in v2).
# ('runner_on_async_poll', _('Host Polling'), False),
# ('runner_on_async_ok', _('Host Async OK'), False),
# ('runner_on_async_failed', _('Host Async Failure'), True),
# Tower does not yet support --diff mode.
# ('runner_on_file_diff', _('File Difference'), False),
# Additional event types for captured stdout not directly related to
# runner events.
('debug', _('Debug'), False),
('verbose', _('Verbose'), False),
('deprecated', _('Deprecated'), False),
('warning', _('Warning'), False),
('system_warning', _('System Warning'), False),
('error', _('Error'), False),
]
FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]]
EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES]
class Meta:
app_label = 'main'
ordering = ('-pk',)
index_together = [
('ad_hoc_command', 'event'),
('ad_hoc_command', 'uuid'),
('ad_hoc_command', 'start_line'),
('ad_hoc_command', 'end_line'),
]
ad_hoc_command = models.ForeignKey(
'AdHocCommand',
related_name='ad_hoc_command_events',
on_delete=models.CASCADE,
editable=False,
)
host = models.ForeignKey(
'Host',
related_name='ad_hoc_command_events',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
host_name = models.CharField(
max_length=1024,
default='',
editable=False,
)
event = models.CharField(
max_length=100,
choices=EVENT_CHOICES,
)
event_data = JSONField(
blank=True,
default={},
)
failed = models.BooleanField(
default=False,
editable=False,
)
changed = models.BooleanField(
default=False,
editable=False,
)
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
def get_absolute_url(self, request=None):
return reverse('api:ad_hoc_command_event_detail', kwargs={'pk': self.pk}, request=request)
def __unicode__(self):
return u'%s @ %s' % (self.get_event_display(), self.created.isoformat())
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
res = self.event_data.get('res', None)
if self.event in self.FAILED_EVENTS:
if not self.event_data.get('ignore_errors', False):
self.failed = True
if 'failed' not in update_fields:
update_fields.append('failed')
if isinstance(res, dict) and res.get('changed', False):
self.changed = True
if 'changed' not in update_fields:
update_fields.append('changed')
self.host_name = self.event_data.get('host', '').strip()
if 'host_name' not in update_fields:
update_fields.append('host_name')
if not self.host_id and self.host_name:
host_qs = self.ad_hoc_command.inventory.hosts.filter(name=self.host_name)
try:
host_id = host_qs.only('id').values_list('id', flat=True)
if host_id.exists():
self.host_id = host_id[0]
if 'host_id' not in update_fields:
update_fields.append('host_id')
except (IndexError, AttributeError):
pass
super(AdHocCommandEvent, self).save(*args, **kwargs)
@classmethod
def create_from_data(self, **kwargs):
# Convert the datetime for the ad hoc command event's creation
# appropriately, and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
valid_keys = {'ad_hoc_command_id', 'event', 'event_data', 'created',
'counter', 'uuid', 'stdout', 'start_line', 'end_line',
'verbosity'}
for key in kwargs.keys():
if key not in valid_keys:
kwargs.pop(key)
return AdHocCommandEvent.objects.create(**kwargs)

774
awx/main/models/events.py Normal file
View File

@@ -0,0 +1,774 @@
import datetime
import logging
from django.conf import settings
from django.db import models
from django.utils.dateparse import parse_datetime
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _
from django.utils.encoding import force_text
from awx.api.versioning import reverse
from awx.main.consumers import emit_channel_notification
from awx.main.fields import JSONField
from awx.main.models.base import CreatedModifiedModel
from awx.main.utils import ignore_inventory_computed_fields
analytics_logger = logging.getLogger('awx.analytics.job_events')
__all__ = ['JobEvent', 'ProjectUpdateEvent', 'AdHocCommandEvent',
'InventoryUpdateEvent', 'SystemJobEvent']
class BasePlaybookEvent(CreatedModifiedModel):
'''
An event/message logged from a playbook callback for each host.
'''
VALID_KEYS = [
'event', 'event_data', 'playbook', 'play', 'role', 'task', 'created',
'counter', 'uuid', 'stdout', 'parent_uuid', 'start_line', 'end_line',
'verbosity'
]
class Meta:
abstract = True
# Playbook events will be structured to form the following hierarchy:
# - playbook_on_start (once for each playbook file)
# - playbook_on_vars_prompt (for each play, but before play starts, we
# currently don't handle responding to these prompts)
# - playbook_on_play_start (once for each play)
# - playbook_on_import_for_host (not logged, not used for v2)
# - playbook_on_not_import_for_host (not logged, not used for v2)
# - playbook_on_no_hosts_matched
# - playbook_on_no_hosts_remaining
# - playbook_on_include (only v2 - only used for handlers?)
# - playbook_on_setup (not used for v2)
# - runner_on*
# - playbook_on_task_start (once for each task within a play)
# - runner_on_failed
# - runner_on_ok
# - runner_on_error (not used for v2)
# - runner_on_skipped
# - runner_on_unreachable
# - runner_on_no_hosts (not used for v2)
# - runner_on_async_poll (not used for v2)
# - runner_on_async_ok (not used for v2)
# - runner_on_async_failed (not used for v2)
# - runner_on_file_diff (v2 event is v2_on_file_diff)
# - runner_item_on_ok (v2 only)
# - runner_item_on_failed (v2 only)
# - runner_item_on_skipped (v2 only)
# - runner_retry (v2 only)
# - playbook_on_notify (once for each notification from the play, not used for v2)
# - playbook_on_stats
EVENT_TYPES = [
# (level, event, verbose name, failed)
(3, 'runner_on_failed', _('Host Failed'), True),
(3, 'runner_on_ok', _('Host OK'), False),
(3, 'runner_on_error', _('Host Failure'), True),
(3, 'runner_on_skipped', _('Host Skipped'), False),
(3, 'runner_on_unreachable', _('Host Unreachable'), True),
(3, 'runner_on_no_hosts', _('No Hosts Remaining'), False),
(3, 'runner_on_async_poll', _('Host Polling'), False),
(3, 'runner_on_async_ok', _('Host Async OK'), False),
(3, 'runner_on_async_failed', _('Host Async Failure'), True),
(3, 'runner_item_on_ok', _('Item OK'), False),
(3, 'runner_item_on_failed', _('Item Failed'), True),
(3, 'runner_item_on_skipped', _('Item Skipped'), False),
(3, 'runner_retry', _('Host Retry'), False),
# Tower does not yet support --diff mode.
(3, 'runner_on_file_diff', _('File Difference'), False),
(0, 'playbook_on_start', _('Playbook Started'), False),
(2, 'playbook_on_notify', _('Running Handlers'), False),
(2, 'playbook_on_include', _('Including File'), False),
(2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False),
(2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False),
(2, 'playbook_on_task_start', _('Task Started'), False),
# Tower does not yet support vars_prompt (and will probably hang :)
(1, 'playbook_on_vars_prompt', _('Variables Prompted'), False),
(2, 'playbook_on_setup', _('Gathering Facts'), False),
(2, 'playbook_on_import_for_host', _('internal: on Import for Host'), False),
(2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False),
(1, 'playbook_on_play_start', _('Play Started'), False),
(1, 'playbook_on_stats', _('Playbook Complete'), False),
# Additional event types for captured stdout not directly related to
# playbook or runner events.
(0, 'debug', _('Debug'), False),
(0, 'verbose', _('Verbose'), False),
(0, 'deprecated', _('Deprecated'), False),
(0, 'warning', _('Warning'), False),
(0, 'system_warning', _('System Warning'), False),
(0, 'error', _('Error'), True),
]
FAILED_EVENTS = [x[1] for x in EVENT_TYPES if x[3]]
EVENT_CHOICES = [(x[1], x[2]) for x in EVENT_TYPES]
LEVEL_FOR_EVENT = dict([(x[1], x[0]) for x in EVENT_TYPES])
event = models.CharField(
max_length=100,
choices=EVENT_CHOICES,
)
event_data = JSONField(
blank=True,
default={},
)
failed = models.BooleanField(
default=False,
editable=False,
)
changed = models.BooleanField(
default=False,
editable=False,
)
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
playbook = models.CharField(
max_length=1024,
default='',
editable=False,
)
play = models.CharField(
max_length=1024,
default='',
editable=False,
)
role = models.CharField(
max_length=1024,
default='',
editable=False,
)
task = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
@property
def event_level(self):
return self.LEVEL_FOR_EVENT.get(self.event, 0)
def get_event_display2(self):
msg = self.get_event_display()
if self.event == 'playbook_on_play_start':
if self.play:
msg = "%s (%s)" % (msg, self.play)
elif self.event == 'playbook_on_task_start':
if self.task:
if self.event_data.get('is_conditional', False):
msg = 'Handler Notified'
if self.role:
msg = '%s (%s | %s)' % (msg, self.role, self.task)
else:
msg = "%s (%s)" % (msg, self.task)
# Change display for runner events triggered by async polling. Some of
# these events may not show in most cases, due to filterting them out
# of the job event queryset returned to the user.
res = self.event_data.get('res', {})
# Fix for existing records before we had added the workaround on save
# to change async_ok to async_failed.
if self.event == 'runner_on_async_ok':
try:
if res.get('failed', False) or res.get('rc', 0) != 0:
msg = 'Host Async Failed'
except (AttributeError, TypeError):
pass
# Runner events with ansible_job_id are part of async starting/polling.
if self.event in ('runner_on_ok', 'runner_on_failed'):
try:
module_name = res['invocation']['module_name']
job_id = res['ansible_job_id']
except (TypeError, KeyError, AttributeError):
module_name = None
job_id = None
if module_name and job_id:
if module_name == 'async_status':
msg = 'Host Async Checking'
else:
msg = 'Host Async Started'
# Handle both 1.2 on_failed and 1.3+ on_async_failed events when an
# async task times out.
if self.event in ('runner_on_failed', 'runner_on_async_failed'):
try:
if res['msg'] == 'timed out':
msg = 'Host Async Timeout'
except (TypeError, KeyError, AttributeError):
pass
return msg
def _update_from_event_data(self):
# Update event model fields from event data.
updated_fields = set()
event_data = self.event_data
res = event_data.get('res', None)
if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False):
self.failed = True
updated_fields.add('failed')
if isinstance(res, dict):
if res.get('changed', False):
self.changed = True
updated_fields.add('changed')
# If we're not in verbose mode, wipe out any module arguments.
invocation = res.get('invocation', None)
if isinstance(invocation, dict) and self.job_verbosity == 0 and 'module_args' in invocation:
event_data['res']['invocation']['module_args'] = ''
self.event_data = event_data
updated_fields.add('event_data')
if self.event == 'playbook_on_stats':
try:
failures_dict = event_data.get('failures', {})
dark_dict = event_data.get('dark', {})
self.failed = bool(sum(failures_dict.values()) +
sum(dark_dict.values()))
updated_fields.add('failed')
changed_dict = event_data.get('changed', {})
self.changed = bool(sum(changed_dict.values()))
updated_fields.add('changed')
except (AttributeError, TypeError):
pass
for field in ('playbook', 'play', 'task', 'role'):
value = force_text(event_data.get(field, '')).strip()
if value != getattr(self, field):
setattr(self, field, value)
updated_fields.add(field)
return updated_fields
@classmethod
def create_from_data(self, **kwargs):
pk = None
for key in ('job_id', 'project_update_id'):
if key in kwargs:
pk = key
if pk is None:
# payload must contain either a job_id or a project_update_id
return
# Convert the datetime for the job event's creation appropriately,
# and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
for key in kwargs.keys():
if key not in self.VALID_KEYS:
kwargs.pop(key)
event_data = kwargs.get('event_data', None)
artifact_dict = None
if event_data:
artifact_dict = event_data.pop('artifact_data', None)
job_event = self.objects.create(**kwargs)
analytics_logger.info('Event data saved.', extra=dict(python_objects=dict(job_event=job_event)))
# Save artifact data to parent job (if provided).
if artifact_dict:
if event_data and isinstance(event_data, dict):
# Note: Core has not added support for marking artifacts as
# sensitive yet. Going forward, core will not use
# _ansible_no_log to denote sensitive set_stats calls.
# Instead, they plan to add a flag outside of the traditional
# no_log mechanism. no_log will not work for this feature,
# in core, because sensitive data is scrubbed before sending
# data to the callback. The playbook_on_stats is the callback
# in which the set_stats data is used.
# Again, the sensitive artifact feature has not yet landed in
# core. The below is how we mark artifacts payload as
# senstive
# artifact_dict['_ansible_no_log'] = True
#
parent_job = self.objects.filter(pk=pk).first()
if hasattr(parent_job, 'artifacts') and parent_job.artifacts != artifact_dict:
parent_job.artifacts = artifact_dict
parent_job.save(update_fields=['artifacts'])
return job_event
@property
def job_verbosity(self):
return 0
class JobEvent(BasePlaybookEvent):
'''
An event/message logged from the callback when running a job.
'''
VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['job_id']
class Meta:
app_label = 'main'
ordering = ('pk',)
index_together = [
('job', 'event'),
('job', 'uuid'),
('job', 'start_line'),
('job', 'end_line'),
('job', 'parent_uuid'),
]
job = models.ForeignKey(
'Job',
related_name='job_events',
on_delete=models.CASCADE,
editable=False,
)
host = models.ForeignKey(
'Host',
related_name='job_events_as_primary_host',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
host_name = models.CharField(
max_length=1024,
default='',
editable=False,
)
hosts = models.ManyToManyField(
'Host',
related_name='job_events',
editable=False,
)
parent = models.ForeignKey(
'self',
related_name='children',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
parent_uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
def get_absolute_url(self, request=None):
return reverse('api:job_event_detail', kwargs={'pk': self.pk}, request=request)
def __unicode__(self):
return u'%s @ %s' % (self.get_event_display2(), self.created.isoformat())
def _update_from_event_data(self):
# Update job event hostname
updated_fields = super(JobEvent, self)._update_from_event_data()
value = force_text(self.event_data.get('host', '')).strip()
if value != getattr(self, 'host_name'):
setattr(self, 'host_name', value)
updated_fields.add('host_name')
return updated_fields
def _update_parents_failed_and_changed(self):
# Update parent events to reflect failed, changed
runner_events = JobEvent.objects.filter(job=self.job,
event__startswith='runner_on')
changed_events = runner_events.filter(changed=True)
failed_events = runner_events.filter(failed=True)
JobEvent.objects.filter(uuid__in=changed_events.values_list('parent_uuid', flat=True)).update(changed=True)
JobEvent.objects.filter(uuid__in=failed_events.values_list('parent_uuid', flat=True)).update(failed=True)
def _update_hosts(self, extra_host_pks=None):
# Update job event hosts m2m from host_name, propagate to parent events.
extra_host_pks = set(extra_host_pks or [])
hostnames = set()
if self.host_name:
hostnames.add(self.host_name)
if self.event == 'playbook_on_stats':
try:
for v in self.event_data.values():
hostnames.update(v.keys())
except AttributeError: # In case event_data or v isn't a dict.
pass
qs = self.job.inventory.hosts.all()
qs = qs.filter(models.Q(name__in=hostnames) | models.Q(pk__in=extra_host_pks))
qs = qs.exclude(job_events__pk=self.id).only('id')
for host in qs:
self.hosts.add(host)
if self.parent_uuid:
parent = JobEvent.objects.filter(uuid=self.parent_uuid)
if parent.exists():
parent = parent[0]
parent._update_hosts(qs.values_list('id', flat=True))
def _hostnames(self):
hostnames = set()
try:
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
hostnames.update(self.event_data.get(stat, {}).keys())
except AttributeError: # In case event_data or v isn't a dict.
pass
return hostnames
def _update_host_summary_from_stats(self, hostnames):
with ignore_inventory_computed_fields():
qs = self.job.inventory.hosts.filter(name__in=hostnames)
job = self.job
for host in hostnames:
host_stats = {}
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
try:
host_stats[stat] = self.event_data.get(stat, {}).get(host, 0)
except AttributeError: # in case event_data[stat] isn't a dict.
pass
if qs.filter(name=host).exists():
host_actual = qs.get(name=host)
host_summary, created = job.job_host_summaries.get_or_create(host=host_actual, host_name=host_actual.name, defaults=host_stats)
else:
host_summary, created = job.job_host_summaries.get_or_create(host_name=host, defaults=host_stats)
if not created:
update_fields = []
for stat, value in host_stats.items():
if getattr(host_summary, stat) != value:
setattr(host_summary, stat, value)
update_fields.append(stat)
if update_fields:
host_summary.save(update_fields=update_fields)
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
# Update model fields and related objects unless we're only updating
# failed/changed flags triggered from a child event.
from_parent_update = kwargs.pop('from_parent_update', False)
if not from_parent_update:
# Update model fields from event data.
updated_fields = self._update_from_event_data()
for field in updated_fields:
if field not in update_fields:
update_fields.append(field)
# Update host related field from host_name.
if not self.host_id and self.host_name:
host_qs = self.job.inventory.hosts.filter(name=self.host_name)
host_id = host_qs.only('id').values_list('id', flat=True).first()
if host_id != self.host_id:
self.host_id = host_id
if 'host_id' not in update_fields:
update_fields.append('host_id')
super(JobEvent, self).save(*args, **kwargs)
# Update related objects after this event is saved.
if not from_parent_update:
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self._update_hosts()
if self.event == 'playbook_on_stats':
self._update_parents_failed_and_changed()
hostnames = self._hostnames()
self._update_host_summary_from_stats(hostnames)
self.job.inventory.update_computed_fields()
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=self.job.id))
@property
def job_verbosity(self):
return self.job.verbosity
class ProjectUpdateEvent(BasePlaybookEvent):
VALID_KEYS = BasePlaybookEvent.VALID_KEYS + ['project_update_id']
class Meta:
app_label = 'main'
ordering = ('pk',)
index_together = [
('project_update', 'event'),
('project_update', 'uuid'),
('project_update', 'start_line'),
('project_update', 'end_line'),
]
project_update = models.ForeignKey(
'ProjectUpdate',
related_name='project_update_events',
on_delete=models.CASCADE,
editable=False,
)
@property
def host_name(self):
return 'localhost'
class BaseCommandEvent(CreatedModifiedModel):
'''
An event/message logged from a command for each host.
'''
VALID_KEYS = [
'event_data', 'created', 'counter', 'uuid', 'stdout', 'start_line',
'end_line', 'verbosity'
]
class Meta:
abstract = True
event_data = JSONField(
blank=True,
default={},
)
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
def __unicode__(self):
return u'%s @ %s' % (self.get_event_display(), self.created.isoformat())
@classmethod
def create_from_data(self, **kwargs):
# Convert the datetime for the event's creation
# appropriately, and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
for key in kwargs.keys():
if key not in self.VALID_KEYS:
kwargs.pop(key)
return self.objects.create(**kwargs)
class AdHocCommandEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['ad_hoc_command', 'event']
class Meta:
app_label = 'main'
ordering = ('-pk',)
index_together = [
('ad_hoc_command', 'event'),
('ad_hoc_command', 'uuid'),
('ad_hoc_command', 'start_line'),
('ad_hoc_command', 'end_line'),
]
EVENT_TYPES = [
# (event, verbose name, failed)
('runner_on_failed', _('Host Failed'), True),
('runner_on_ok', _('Host OK'), False),
('runner_on_unreachable', _('Host Unreachable'), True),
# Tower won't see no_hosts (check is done earlier without callback).
# ('runner_on_no_hosts', _('No Hosts Matched'), False),
# Tower will see skipped (when running in check mode for a module that
# does not support check mode).
('runner_on_skipped', _('Host Skipped'), False),
# Tower does not support async for ad hoc commands (not used in v2).
# ('runner_on_async_poll', _('Host Polling'), False),
# ('runner_on_async_ok', _('Host Async OK'), False),
# ('runner_on_async_failed', _('Host Async Failure'), True),
# Tower does not yet support --diff mode.
# ('runner_on_file_diff', _('File Difference'), False),
# Additional event types for captured stdout not directly related to
# runner events.
('debug', _('Debug'), False),
('verbose', _('Verbose'), False),
('deprecated', _('Deprecated'), False),
('warning', _('Warning'), False),
('system_warning', _('System Warning'), False),
('error', _('Error'), False),
]
FAILED_EVENTS = [x[0] for x in EVENT_TYPES if x[2]]
EVENT_CHOICES = [(x[0], x[1]) for x in EVENT_TYPES]
event = models.CharField(
max_length=100,
choices=EVENT_CHOICES,
)
failed = models.BooleanField(
default=False,
editable=False,
)
changed = models.BooleanField(
default=False,
editable=False,
)
ad_hoc_command = models.ForeignKey(
'AdHocCommand',
related_name='ad_hoc_command_events',
on_delete=models.CASCADE,
editable=False,
)
host = models.ForeignKey(
'Host',
related_name='ad_hoc_command_events',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
host_name = models.CharField(
max_length=1024,
default='',
editable=False,
)
def get_absolute_url(self, request=None):
return reverse('api:ad_hoc_command_event_detail', kwargs={'pk': self.pk}, request=request)
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
res = self.event_data.get('res', None)
if self.event in self.FAILED_EVENTS:
if not self.event_data.get('ignore_errors', False):
self.failed = True
if 'failed' not in update_fields:
update_fields.append('failed')
if isinstance(res, dict) and res.get('changed', False):
self.changed = True
if 'changed' not in update_fields:
update_fields.append('changed')
self.host_name = self.event_data.get('host', '').strip()
if 'host_name' not in update_fields:
update_fields.append('host_name')
if not self.host_id and self.host_name:
host_qs = self.ad_hoc_command.inventory.hosts.filter(name=self.host_name)
try:
host_id = host_qs.only('id').values_list('id', flat=True)
if host_id.exists():
self.host_id = host_id[0]
if 'host_id' not in update_fields:
update_fields.append('host_id')
except (IndexError, AttributeError):
pass
super(AdHocCommandEvent, self).save(*args, **kwargs)
class InventoryUpdateEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['inventory_update_id']
class Meta:
app_label = 'main'
ordering = ('-pk',)
index_together = [
('inventory_update', 'uuid'),
('inventory_update', 'start_line'),
('inventory_update', 'end_line'),
]
inventory_update = models.ForeignKey(
'InventoryUpdate',
related_name='inventory_update_events',
on_delete=models.CASCADE,
editable=False,
)
@property
def event(self):
return 'verbose'
@property
def failed(self):
return False
@property
def changed(self):
return False
class SystemJobEvent(BaseCommandEvent):
VALID_KEYS = BaseCommandEvent.VALID_KEYS + ['system_job_id']
class Meta:
app_label = 'main'
ordering = ('-pk',)
index_together = [
('system_job', 'uuid'),
('system_job', 'start_line'),
('system_job', 'end_line'),
]
system_job = models.ForeignKey(
'SystemJob',
related_name='system_job_events',
on_delete=models.CASCADE,
editable=False,
)
@property
def event(self):
return 'verbose'
@property
def failed(self):
return False
@property
def changed(self):
return False

View File

@@ -14,12 +14,9 @@ from django.conf import settings
from django.db import models from django.db import models
#from django.core.cache import cache #from django.core.cache import cache
import memcache import memcache
from django.db.models import Q, Count
from django.utils.dateparse import parse_datetime
from dateutil import parser from dateutil import parser
from dateutil.tz import tzutc from dateutil.tz import tzutc
from django.utils.encoding import force_text, smart_str from django.utils.encoding import smart_str
from django.utils.timezone import utc
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError, FieldDoesNotExist from django.core.exceptions import ValidationError, FieldDoesNotExist
@@ -34,22 +31,17 @@ from awx.main.models.notifications import (
NotificationTemplate, NotificationTemplate,
JobNotificationMixin, JobNotificationMixin,
) )
from awx.main.utils import ( from awx.main.utils import parse_yaml_or_json
ignore_inventory_computed_fields,
parse_yaml_or_json,
)
from awx.main.fields import ImplicitRoleField from awx.main.fields import ImplicitRoleField
from awx.main.models.mixins import ResourceMixin, SurveyJobTemplateMixin, SurveyJobMixin, TaskManagerJobMixin from awx.main.models.mixins import ResourceMixin, SurveyJobTemplateMixin, SurveyJobMixin, TaskManagerJobMixin
from awx.main.fields import JSONField, AskForField from awx.main.fields import JSONField, AskForField
from awx.main.consumers import emit_channel_notification
logger = logging.getLogger('awx.main.models.jobs') logger = logging.getLogger('awx.main.models.jobs')
analytics_logger = logging.getLogger('awx.analytics.job_events') analytics_logger = logging.getLogger('awx.analytics.job_events')
system_tracking_logger = logging.getLogger('awx.analytics.system_tracking') system_tracking_logger = logging.getLogger('awx.analytics.system_tracking')
__all__ = ['JobTemplate', 'JobLaunchConfig', 'Job', 'JobHostSummary', 'JobEvent', 'SystemJobTemplate', 'SystemJob'] __all__ = ['JobTemplate', 'JobLaunchConfig', 'Job', 'JobHostSummary', 'SystemJobTemplate', 'SystemJob']
class JobOptions(BaseModel): class JobOptions(BaseModel):
@@ -1032,477 +1024,6 @@ class JobHostSummary(CreatedModifiedModel):
#self.host.update_computed_fields() #self.host.update_computed_fields()
class JobEvent(CreatedModifiedModel):
'''
An event/message logged from the callback when running a job.
'''
# Playbook events will be structured to form the following hierarchy:
# - playbook_on_start (once for each playbook file)
# - playbook_on_vars_prompt (for each play, but before play starts, we
# currently don't handle responding to these prompts)
# - playbook_on_play_start (once for each play)
# - playbook_on_import_for_host (not logged, not used for v2)
# - playbook_on_not_import_for_host (not logged, not used for v2)
# - playbook_on_no_hosts_matched
# - playbook_on_no_hosts_remaining
# - playbook_on_include (only v2 - only used for handlers?)
# - playbook_on_setup (not used for v2)
# - runner_on*
# - playbook_on_task_start (once for each task within a play)
# - runner_on_failed
# - runner_on_ok
# - runner_on_error (not used for v2)
# - runner_on_skipped
# - runner_on_unreachable
# - runner_on_no_hosts (not used for v2)
# - runner_on_async_poll (not used for v2)
# - runner_on_async_ok (not used for v2)
# - runner_on_async_failed (not used for v2)
# - runner_on_file_diff (v2 event is v2_on_file_diff)
# - runner_item_on_ok (v2 only)
# - runner_item_on_failed (v2 only)
# - runner_item_on_skipped (v2 only)
# - runner_retry (v2 only)
# - playbook_on_notify (once for each notification from the play, not used for v2)
# - playbook_on_stats
EVENT_TYPES = [
# (level, event, verbose name, failed)
(3, 'runner_on_failed', _('Host Failed'), True),
(3, 'runner_on_ok', _('Host OK'), False),
(3, 'runner_on_error', _('Host Failure'), True),
(3, 'runner_on_skipped', _('Host Skipped'), False),
(3, 'runner_on_unreachable', _('Host Unreachable'), True),
(3, 'runner_on_no_hosts', _('No Hosts Remaining'), False),
(3, 'runner_on_async_poll', _('Host Polling'), False),
(3, 'runner_on_async_ok', _('Host Async OK'), False),
(3, 'runner_on_async_failed', _('Host Async Failure'), True),
(3, 'runner_item_on_ok', _('Item OK'), False),
(3, 'runner_item_on_failed', _('Item Failed'), True),
(3, 'runner_item_on_skipped', _('Item Skipped'), False),
(3, 'runner_retry', _('Host Retry'), False),
# Tower does not yet support --diff mode.
(3, 'runner_on_file_diff', _('File Difference'), False),
(0, 'playbook_on_start', _('Playbook Started'), False),
(2, 'playbook_on_notify', _('Running Handlers'), False),
(2, 'playbook_on_include', _('Including File'), False),
(2, 'playbook_on_no_hosts_matched', _('No Hosts Matched'), False),
(2, 'playbook_on_no_hosts_remaining', _('No Hosts Remaining'), False),
(2, 'playbook_on_task_start', _('Task Started'), False),
# Tower does not yet support vars_prompt (and will probably hang :)
(1, 'playbook_on_vars_prompt', _('Variables Prompted'), False),
(2, 'playbook_on_setup', _('Gathering Facts'), False),
(2, 'playbook_on_import_for_host', _('internal: on Import for Host'), False),
(2, 'playbook_on_not_import_for_host', _('internal: on Not Import for Host'), False),
(1, 'playbook_on_play_start', _('Play Started'), False),
(1, 'playbook_on_stats', _('Playbook Complete'), False),
# Additional event types for captured stdout not directly related to
# playbook or runner events.
(0, 'debug', _('Debug'), False),
(0, 'verbose', _('Verbose'), False),
(0, 'deprecated', _('Deprecated'), False),
(0, 'warning', _('Warning'), False),
(0, 'system_warning', _('System Warning'), False),
(0, 'error', _('Error'), True),
]
FAILED_EVENTS = [x[1] for x in EVENT_TYPES if x[3]]
EVENT_CHOICES = [(x[1], x[2]) for x in EVENT_TYPES]
LEVEL_FOR_EVENT = dict([(x[1], x[0]) for x in EVENT_TYPES])
class Meta:
app_label = 'main'
ordering = ('pk',)
index_together = [
('job', 'event'),
('job', 'uuid'),
('job', 'start_line'),
('job', 'end_line'),
('job', 'parent_uuid'),
]
job = models.ForeignKey(
'Job',
related_name='job_events',
on_delete=models.CASCADE,
editable=False,
)
event = models.CharField(
max_length=100,
choices=EVENT_CHOICES,
)
event_data = JSONField(
blank=True,
default={},
)
failed = models.BooleanField(
default=False,
editable=False,
)
changed = models.BooleanField(
default=False,
editable=False,
)
uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
host = models.ForeignKey(
'Host',
related_name='job_events_as_primary_host',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
host_name = models.CharField(
max_length=1024,
default='',
editable=False,
)
hosts = models.ManyToManyField(
'Host',
related_name='job_events',
editable=False,
)
playbook = models.CharField(
max_length=1024,
default='',
editable=False,
)
play = models.CharField(
max_length=1024,
default='',
editable=False,
)
role = models.CharField(
max_length=1024,
default='',
editable=False,
)
task = models.CharField(
max_length=1024,
default='',
editable=False,
)
parent = models.ForeignKey(
'self',
related_name='children',
null=True,
default=None,
on_delete=models.SET_NULL,
editable=False,
)
parent_uuid = models.CharField(
max_length=1024,
default='',
editable=False,
)
counter = models.PositiveIntegerField(
default=0,
editable=False,
)
stdout = models.TextField(
default='',
editable=False,
)
verbosity = models.PositiveIntegerField(
default=0,
editable=False,
)
start_line = models.PositiveIntegerField(
default=0,
editable=False,
)
end_line = models.PositiveIntegerField(
default=0,
editable=False,
)
def get_absolute_url(self, request=None):
return reverse('api:job_event_detail', kwargs={'pk': self.pk}, request=request)
def __unicode__(self):
return u'%s @ %s' % (self.get_event_display2(), self.created.isoformat())
@property
def event_level(self):
return self.LEVEL_FOR_EVENT.get(self.event, 0)
def get_event_display2(self):
msg = self.get_event_display()
if self.event == 'playbook_on_play_start':
if self.play:
msg = "%s (%s)" % (msg, self.play)
elif self.event == 'playbook_on_task_start':
if self.task:
if self.event_data.get('is_conditional', False):
msg = 'Handler Notified'
if self.role:
msg = '%s (%s | %s)' % (msg, self.role, self.task)
else:
msg = "%s (%s)" % (msg, self.task)
# Change display for runner events trigged by async polling. Some of
# these events may not show in most cases, due to filterting them out
# of the job event queryset returned to the user.
res = self.event_data.get('res', {})
# Fix for existing records before we had added the workaround on save
# to change async_ok to async_failed.
if self.event == 'runner_on_async_ok':
try:
if res.get('failed', False) or res.get('rc', 0) != 0:
msg = 'Host Async Failed'
except (AttributeError, TypeError):
pass
# Runner events with ansible_job_id are part of async starting/polling.
if self.event in ('runner_on_ok', 'runner_on_failed'):
try:
module_name = res['invocation']['module_name']
job_id = res['ansible_job_id']
except (TypeError, KeyError, AttributeError):
module_name = None
job_id = None
if module_name and job_id:
if module_name == 'async_status':
msg = 'Host Async Checking'
else:
msg = 'Host Async Started'
# Handle both 1.2 on_failed and 1.3+ on_async_failed events when an
# async task times out.
if self.event in ('runner_on_failed', 'runner_on_async_failed'):
try:
if res['msg'] == 'timed out':
msg = 'Host Async Timeout'
except (TypeError, KeyError, AttributeError):
pass
return msg
def _update_from_event_data(self):
# Update job event model fields from event data.
updated_fields = set()
job = self.job
verbosity = job.verbosity
event_data = self.event_data
res = event_data.get('res', None)
if self.event in self.FAILED_EVENTS and not event_data.get('ignore_errors', False):
self.failed = True
updated_fields.add('failed')
if isinstance(res, dict):
if res.get('changed', False):
self.changed = True
updated_fields.add('changed')
# If we're not in verbose mode, wipe out any module arguments.
invocation = res.get('invocation', None)
if isinstance(invocation, dict) and verbosity == 0 and 'module_args' in invocation:
event_data['res']['invocation']['module_args'] = ''
self.event_data = event_data
updated_fields.add('event_data')
if self.event == 'playbook_on_stats':
try:
failures_dict = event_data.get('failures', {})
dark_dict = event_data.get('dark', {})
self.failed = bool(sum(failures_dict.values()) +
sum(dark_dict.values()))
updated_fields.add('failed')
changed_dict = event_data.get('changed', {})
self.changed = bool(sum(changed_dict.values()))
updated_fields.add('changed')
except (AttributeError, TypeError):
pass
for field in ('playbook', 'play', 'task', 'role', 'host'):
value = force_text(event_data.get(field, '')).strip()
if field == 'host':
field = 'host_name'
if value != getattr(self, field):
setattr(self, field, value)
updated_fields.add(field)
return updated_fields
def _update_parents_failed_and_changed(self):
# Update parent events to reflect failed, changed
runner_events = JobEvent.objects.filter(job=self.job,
event__startswith='runner_on')
changed_events = runner_events.filter(changed=True)
failed_events = runner_events.filter(failed=True)
JobEvent.objects.filter(uuid__in=changed_events.values_list('parent_uuid', flat=True)).update(changed=True)
JobEvent.objects.filter(uuid__in=failed_events.values_list('parent_uuid', flat=True)).update(failed=True)
def _update_hosts(self, extra_host_pks=None):
# Update job event hosts m2m from host_name, propagate to parent events.
extra_host_pks = set(extra_host_pks or [])
hostnames = set()
if self.host_name:
hostnames.add(self.host_name)
if self.event == 'playbook_on_stats':
try:
for v in self.event_data.values():
hostnames.update(v.keys())
except AttributeError: # In case event_data or v isn't a dict.
pass
qs = self.job.inventory.hosts.all()
qs = qs.filter(Q(name__in=hostnames) | Q(pk__in=extra_host_pks))
qs = qs.exclude(job_events__pk=self.id).only('id')
for host in qs:
self.hosts.add(host)
if self.parent_uuid:
parent = JobEvent.objects.filter(uuid=self.parent_uuid)
if parent.exists():
parent = parent[0]
parent._update_hosts(qs.values_list('id', flat=True))
def _hostnames(self):
hostnames = set()
try:
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
hostnames.update(self.event_data.get(stat, {}).keys())
except AttributeError: # In case event_data or v isn't a dict.
pass
return hostnames
def _update_host_summary_from_stats(self, hostnames):
with ignore_inventory_computed_fields():
qs = self.job.inventory.hosts.filter(name__in=hostnames)
job = self.job
for host in hostnames:
host_stats = {}
for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'):
try:
host_stats[stat] = self.event_data.get(stat, {}).get(host, 0)
except AttributeError: # in case event_data[stat] isn't a dict.
pass
if qs.filter(name=host).exists():
host_actual = qs.get(name=host)
host_summary, created = job.job_host_summaries.get_or_create(host=host_actual, host_name=host_actual.name, defaults=host_stats)
else:
host_summary, created = job.job_host_summaries.get_or_create(host_name=host, defaults=host_stats)
if not created:
update_fields = []
for stat, value in host_stats.items():
if getattr(host_summary, stat) != value:
setattr(host_summary, stat, value)
update_fields.append(stat)
if update_fields:
host_summary.save(update_fields=update_fields)
def save(self, *args, **kwargs):
# If update_fields has been specified, add our field names to it,
# if it hasn't been specified, then we're just doing a normal save.
update_fields = kwargs.get('update_fields', [])
# Update model fields and related objects unless we're only updating
# failed/changed flags triggered from a child event.
from_parent_update = kwargs.pop('from_parent_update', False)
if not from_parent_update:
# Update model fields from event data.
updated_fields = self._update_from_event_data()
for field in updated_fields:
if field not in update_fields:
update_fields.append(field)
# Update host related field from host_name.
if not self.host_id and self.host_name:
host_qs = self.job.inventory.hosts.filter(name=self.host_name)
host_id = host_qs.only('id').values_list('id', flat=True).first()
if host_id != self.host_id:
self.host_id = host_id
if 'host_id' not in update_fields:
update_fields.append('host_id')
super(JobEvent, self).save(*args, **kwargs)
# Update related objects after this event is saved.
if not from_parent_update:
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self._update_hosts()
if self.event == 'playbook_on_stats':
self._update_parents_failed_and_changed()
hostnames = self._hostnames()
self._update_host_summary_from_stats(hostnames)
self.job.inventory.update_computed_fields()
emit_channel_notification('jobs-summary', dict(group_name='jobs', unified_job_id=self.job.id))
@classmethod
def create_from_data(self, **kwargs):
# Must have a job_id specified.
if not kwargs.get('job_id', None):
return
# Convert the datetime for the job event's creation appropriately,
# and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(kwargs['created'], datetime.datetime):
kwargs['created'] = parse_datetime(kwargs['created'])
if not kwargs['created'].tzinfo:
kwargs['created'] = kwargs['created'].replace(tzinfo=utc)
except (KeyError, ValueError):
kwargs.pop('created', None)
# Sanity check: Don't honor keys that we don't recognize.
valid_keys = {'job_id', 'event', 'event_data', 'playbook', 'play',
'role', 'task', 'created', 'counter', 'uuid', 'stdout',
'parent_uuid', 'start_line', 'end_line', 'verbosity'}
for key in kwargs.keys():
if key not in valid_keys:
kwargs.pop(key)
event_data = kwargs.get('event_data', None)
artifact_dict = None
if event_data:
artifact_dict = event_data.pop('artifact_data', None)
job_event = JobEvent.objects.create(**kwargs)
analytics_logger.info('Job event data saved.', extra=dict(python_objects=dict(job_event=job_event)))
# Save artifact data to parent job (if provided).
if artifact_dict:
if event_data and isinstance(event_data, dict):
# Note: Core has not added support for marking artifacts as
# sensitive yet. Going forward, core will not use
# _ansible_no_log to denote sensitive set_stats calls.
# Instead, they plan to add a flag outside of the traditional
# no_log mechanism. no_log will not work for this feature,
# in core, because sensitive data is scrubbed before sending
# data to the callback. The playbook_on_stats is the callback
# in which the set_stats data is used.
# Again, the sensitive artifact feature has not yet landed in
# core. The below is how we mark artifacts payload as
# senstive
# artifact_dict['_ansible_no_log'] = True
#
parent_job = Job.objects.filter(pk=kwargs['job_id']).first()
if parent_job and parent_job.artifacts != artifact_dict:
parent_job.artifacts = artifact_dict
parent_job.save(update_fields=['artifacts'])
return job_event
@classmethod
def get_startevent_queryset(cls, parent_task, starting_events, ordering=None):
'''
We need to pull information about each start event.
This is super tricky, because this table has a one-to-many
relationship with itself (parent-child), and we're getting
information for an arbitrary number of children. This means we
need stats on grandchildren, sorted by child.
'''
qs = (JobEvent.objects.filter(parent__parent=parent_task,
parent__event__in=starting_events)
.values('parent__id', 'event', 'changed')
.annotate(num=Count('event'))
.order_by('parent__id'))
if ordering is not None:
qs = qs.order_by(ordering)
return qs
class SystemJobOptions(BaseModel): class SystemJobOptions(BaseModel):
''' '''
Common fields for SystemJobTemplate and SystemJob. Common fields for SystemJobTemplate and SystemJob.

View File

@@ -498,6 +498,7 @@ def with_path_cleanup(f):
class BaseTask(LogErrorsTask): class BaseTask(LogErrorsTask):
name = None name = None
model = None model = None
event_model = None
abstract = True abstract = True
cleanup_paths = [] cleanup_paths = []
proot_show_paths = [] proot_show_paths = []
@@ -518,17 +519,13 @@ class BaseTask(LogErrorsTask):
if updates: if updates:
update_fields = ['modified'] update_fields = ['modified']
for field, value in updates.items(): for field, value in updates.items():
if field in ('result_stdout', 'result_traceback'): if field in ('result_traceback'):
for srch, repl in output_replacements: for srch, repl in output_replacements:
value = value.replace(srch, repl) value = value.replace(srch, repl)
setattr(instance, field, value) setattr(instance, field, value)
update_fields.append(field) update_fields.append(field)
if field == 'status': if field == 'status':
update_fields.append('failed') update_fields.append('failed')
if 'result_stdout_text' in update_fields:
# result_stdout_text is now deprecated, and is no longer
# an actual Django field (it's a property)
update_fields.remove('result_stdout_text')
instance.save(update_fields=update_fields) instance.save(update_fields=update_fields)
return instance return instance
except DatabaseError as e: except DatabaseError as e:
@@ -742,10 +739,24 @@ class BaseTask(LogErrorsTask):
''' '''
if not os.path.exists(settings.JOBOUTPUT_ROOT): if not os.path.exists(settings.JOBOUTPUT_ROOT):
os.makedirs(settings.JOBOUTPUT_ROOT) os.makedirs(settings.JOBOUTPUT_ROOT)
stdout_filename = os.path.join(settings.JOBOUTPUT_ROOT, "%d-%s.out" % (instance.pk, str(uuid.uuid1()))) stdout_filename = os.path.join(
settings.JOBOUTPUT_ROOT,
"%d-%s.out" % (instance.pk, str(uuid.uuid1()))
)
stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8') stdout_handle = codecs.open(stdout_filename, 'w', encoding='utf-8')
assert stdout_handle.name == stdout_filename assert stdout_handle.name == stdout_filename
return stdout_handle
dispatcher = CallbackQueueDispatcher()
def event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
if 'uuid' in event_data:
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(cache_event)
dispatcher.dispatch(event_data)
return OutputEventFilter(stdout_handle, event_callback)
def pre_run_hook(self, instance, **kwargs): def pre_run_hook(self, instance, **kwargs):
''' '''
@@ -940,7 +951,8 @@ class RunJob(BaseTask):
name = 'awx.main.tasks.run_job' name = 'awx.main.tasks.run_job'
model = Job model = Job
event_data_key= 'job_id' event_model = JobEvent
event_data_key = 'job_id'
def build_private_data(self, job, **kwargs): def build_private_data(self, job, **kwargs):
''' '''
@@ -1207,29 +1219,6 @@ class RunJob(BaseTask):
d[re.compile(r'Vault password \({}\):\s*?$'.format(vault_id), re.M)] = k d[re.compile(r'Vault password \({}\):\s*?$'.format(vault_id), re.M)] = k
return d return d
def get_stdout_handle(self, instance):
'''
Wrap stdout file object to capture events.
'''
stdout_handle = super(RunJob, self).get_stdout_handle(instance)
if getattr(settings, 'USE_CALLBACK_QUEUE', False):
dispatcher = CallbackQueueDispatcher()
def job_event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
if 'uuid' in event_data:
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(cache_event)
dispatcher.dispatch(event_data)
else:
def job_event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
JobEvent.create_from_data(**event_data)
return OutputEventFilter(stdout_handle, job_event_callback)
def should_use_proot(self, instance, **kwargs): def should_use_proot(self, instance, **kwargs):
''' '''
Return whether this task should use proot. Return whether this task should use proot.
@@ -1290,6 +1279,8 @@ class RunProjectUpdate(BaseTask):
name = 'awx.main.tasks.run_project_update' name = 'awx.main.tasks.run_project_update'
model = ProjectUpdate model = ProjectUpdate
event_model = ProjectUpdateEvent
event_data_key = 'project_update_id'
@property @property
def proot_show_paths(self): def proot_show_paths(self):
@@ -1343,6 +1334,10 @@ class RunProjectUpdate(BaseTask):
# give ansible a hint about the intended tmpdir to work around issues # give ansible a hint about the intended tmpdir to work around issues
# like https://github.com/ansible/ansible/issues/30064 # like https://github.com/ansible/ansible/issues/30064
env['TMP'] = settings.AWX_PROOT_BASE_PATH env['TMP'] = settings.AWX_PROOT_BASE_PATH
env['CACHE'] = settings.CACHES['default']['LOCATION'] if 'LOCATION' in settings.CACHES['default'] else ''
env['PROJECT_UPDATE_ID'] = str(project_update.pk)
env['ANSIBLE_CALLBACK_PLUGINS'] = self.get_path_to('..', 'plugins', 'callback')
env['ANSIBLE_STDOUT_CALLBACK'] = 'awx_display'
return env return env
def _build_scm_url_extra_vars(self, project_update, **kwargs): def _build_scm_url_extra_vars(self, project_update, **kwargs):
@@ -1480,16 +1475,6 @@ class RunProjectUpdate(BaseTask):
def get_idle_timeout(self): def get_idle_timeout(self):
return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None) return getattr(settings, 'PROJECT_UPDATE_IDLE_TIMEOUT', None)
def get_stdout_handle(self, instance):
stdout_handle = super(RunProjectUpdate, self).get_stdout_handle(instance)
pk = instance.pk
def raw_callback(data):
instance_actual = self.update_model(pk)
result_stdout_text = instance_actual.result_stdout_text + data
self.update_model(pk, result_stdout_text=result_stdout_text)
return OutputEventFilter(stdout_handle, raw_callback=raw_callback)
def _update_dependent_inventories(self, project_update, dependent_inventory_sources): def _update_dependent_inventories(self, project_update, dependent_inventory_sources):
project_request_id = '' if self.request.id is None else self.request.id project_request_id = '' if self.request.id is None else self.request.id
scm_revision = project_update.project.scm_revision scm_revision = project_update.project.scm_revision
@@ -1615,6 +1600,8 @@ class RunInventoryUpdate(BaseTask):
name = 'awx.main.tasks.run_inventory_update' name = 'awx.main.tasks.run_inventory_update'
model = InventoryUpdate model = InventoryUpdate
event_model = InventoryUpdateEvent
event_data_key = 'inventory_update_id'
def build_private_data(self, inventory_update, **kwargs): def build_private_data(self, inventory_update, **kwargs):
""" """
@@ -1986,16 +1973,6 @@ class RunInventoryUpdate(BaseTask):
args.append('--traceback') args.append('--traceback')
return args return args
def get_stdout_handle(self, instance):
stdout_handle = super(RunInventoryUpdate, self).get_stdout_handle(instance)
pk = instance.pk
def raw_callback(data):
instance_actual = self.update_model(pk)
result_stdout_text = instance_actual.result_stdout_text + data
self.update_model(pk, result_stdout_text=result_stdout_text)
return OutputEventFilter(stdout_handle, raw_callback=raw_callback)
def build_cwd(self, inventory_update, **kwargs): def build_cwd(self, inventory_update, **kwargs):
return self.get_path_to('..', 'plugins', 'inventory') return self.get_path_to('..', 'plugins', 'inventory')
@@ -2042,6 +2019,7 @@ class RunAdHocCommand(BaseTask):
name = 'awx.main.tasks.run_ad_hoc_command' name = 'awx.main.tasks.run_ad_hoc_command'
model = AdHocCommand model = AdHocCommand
event_model = AdHocCommandEvent
event_data_key = 'ad_hoc_command_id' event_data_key = 'ad_hoc_command_id'
def build_private_data(self, ad_hoc_command, **kwargs): def build_private_data(self, ad_hoc_command, **kwargs):
@@ -2199,29 +2177,6 @@ class RunAdHocCommand(BaseTask):
d[re.compile(r'Password:\s*?$', re.M)] = 'ssh_password' d[re.compile(r'Password:\s*?$', re.M)] = 'ssh_password'
return d return d
def get_stdout_handle(self, instance):
'''
Wrap stdout file object to capture events.
'''
stdout_handle = super(RunAdHocCommand, self).get_stdout_handle(instance)
if getattr(settings, 'USE_CALLBACK_QUEUE', False):
dispatcher = CallbackQueueDispatcher()
def ad_hoc_command_event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
if 'uuid' in event_data:
cache_event = cache.get('ev-{}'.format(event_data['uuid']), None)
if cache_event is not None:
event_data.update(cache_event)
dispatcher.dispatch(event_data)
else:
def ad_hoc_command_event_callback(event_data):
event_data.setdefault(self.event_data_key, instance.id)
AdHocCommandEvent.create_from_data(**event_data)
return OutputEventFilter(stdout_handle, ad_hoc_command_event_callback)
def should_use_proot(self, instance, **kwargs): def should_use_proot(self, instance, **kwargs):
''' '''
Return whether this task should use proot. Return whether this task should use proot.
@@ -2233,6 +2188,8 @@ class RunSystemJob(BaseTask):
name = 'awx.main.tasks.run_system_job' name = 'awx.main.tasks.run_system_job'
model = SystemJob model = SystemJob
event_model = SystemJobEvent
event_data_key = 'system_job_id'
def build_args(self, system_job, **kwargs): def build_args(self, system_job, **kwargs):
args = ['awx-manage', system_job.job_type] args = ['awx-manage', system_job.job_type]
@@ -2259,16 +2216,6 @@ class RunSystemJob(BaseTask):
logger.exception("%s Failed to parse system job", system_job.log_format) logger.exception("%s Failed to parse system job", system_job.log_format)
return args return args
def get_stdout_handle(self, instance):
stdout_handle = super(RunSystemJob, self).get_stdout_handle(instance)
pk = instance.pk
def raw_callback(data):
instance_actual = self.update_model(pk)
result_stdout_text = instance_actual.result_stdout_text + data
self.update_model(pk, result_stdout_text=result_stdout_text)
return OutputEventFilter(stdout_handle, raw_callback=raw_callback)
def build_env(self, instance, **kwargs): def build_env(self, instance, **kwargs):
env = super(RunSystemJob, self).build_env(instance, env = super(RunSystemJob, self).build_env(instance,
**kwargs) **kwargs)