diff --git a/awx/api/filters.py b/awx/api/filters.py index 5146ff0cd2..6885d73326 100644 --- a/awx/api/filters.py +++ b/awx/api/filters.py @@ -242,7 +242,7 @@ class FieldLookupBackend(BaseFilterBackend): queryset = queryset.filter(q) queryset = queryset.filter(*args).distinct() return queryset - except (FieldError, FieldDoesNotExist, ValueError) as e: + except (FieldError, FieldDoesNotExist, ValueError, TypeError) as e: raise ParseError(e.args[0]) except ValidationError as e: raise ParseError(e.messages) diff --git a/awx/lib/tower_display_callback/display.py b/awx/lib/tower_display_callback/display.py index 128c9349c7..ad5e8ba37a 100644 --- a/awx/lib/tower_display_callback/display.py +++ b/awx/lib/tower_display_callback/display.py @@ -26,7 +26,7 @@ import uuid from ansible.utils.display import Display # Tower Display Callback -from tower_display_callback.events import event_context +from .events import event_context __all__ = [] diff --git a/awx/lib/tower_display_callback/events.py b/awx/lib/tower_display_callback/events.py index 86fab2895b..c17cf2c7f1 100644 --- a/awx/lib/tower_display_callback/events.py +++ b/awx/lib/tower_display_callback/events.py @@ -22,14 +22,75 @@ import base64 import contextlib import datetime import json +import logging import multiprocessing import os import threading import uuid +# Kombu +from kombu import Connection, Exchange, Producer + __all__ = ['event_context'] +class CallbackQueueEventDispatcher(object): + + def __init__(self): + self.callback_connection = os.getenv('CALLBACK_CONNECTION', None) + self.connection_queue = os.getenv('CALLBACK_QUEUE', '') + self.connection = None + self.exchange = None + self._init_logging() + + def _init_logging(self): + try: + self.job_callback_debug = int(os.getenv('JOB_CALLBACK_DEBUG', '0')) + except ValueError: + self.job_callback_debug = 0 + self.logger = logging.getLogger('awx.plugins.callback.job_event_callback') + if self.job_callback_debug >= 2: + self.logger.setLevel(logging.DEBUG) + elif 
self.job_callback_debug >= 1: + self.logger.setLevel(logging.INFO) + else: + self.logger.setLevel(logging.WARNING) + handler = logging.StreamHandler() + formatter = logging.Formatter('%(levelname)-8s %(process)-8d %(message)s') + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.propagate = False + + def dispatch(self, obj): + if not self.callback_connection or not self.connection_queue: + return + active_pid = os.getpid() + for retry_count in xrange(4): + try: + if not hasattr(self, 'connection_pid'): + self.connection_pid = active_pid + if self.connection_pid != active_pid: + self.connection = None + if self.connection is None: + self.connection = Connection(self.callback_connection) + self.exchange = Exchange(self.connection_queue, type='direct') + + producer = Producer(self.connection) + producer.publish(obj, + serializer='json', + compression='bzip2', + exchange=self.exchange, + declare=[self.exchange], + routing_key=self.connection_queue) + return + except Exception, e: + self.logger.info('Publish Job Event Exception: %r, retry=%d', e, + retry_count, exc_info=True) + retry_count += 1 + if retry_count >= 3: + break + + class EventContext(object): ''' Store global and local (per thread/process) data associated with callback @@ -38,6 +99,7 @@ class EventContext(object): def __init__(self): self.display_lock = multiprocessing.RLock() + self.dispatcher = CallbackQueueEventDispatcher() def add_local(self, **kwargs): if not hasattr(self, '_local'): @@ -111,7 +173,9 @@ class EventContext(object): if event_data.get(key, False): event = key break - + max_res = int(os.getenv("MAX_EVENT_RES", 700000)) + if event not in ('playbook_on_stats',) and "res" in event_data and len(str(event_data['res'])) > max_res: + event_data['res'] = {} event_dict = dict(event=event, event_data=event_data) for key in event_data.keys(): if key in ('job_id', 'ad_hoc_command_id', 'uuid', 'parent_uuid', 'created', 'artifact_data'): @@ -136,7 +200,9 @@ class 
EventContext(object): fileobj.flush() def dump_begin(self, fileobj): - self.dump(fileobj, self.get_begin_dict()) + begin_dict = self.get_begin_dict() + self.dispatcher.dispatch(begin_dict) + self.dump(fileobj, {'uuid': begin_dict['uuid']}) def dump_end(self, fileobj): self.dump(fileobj, self.get_end_dict(), flush=True) diff --git a/awx/lib/tower_display_callback/module.py b/awx/lib/tower_display_callback/module.py index e61ef17624..59faa7ac79 100644 --- a/awx/lib/tower_display_callback/module.py +++ b/awx/lib/tower_display_callback/module.py @@ -29,8 +29,8 @@ from ansible.plugins.callback import CallbackBase from ansible.plugins.callback.default import CallbackModule as DefaultCallbackModule # Tower Display Callback -from tower_display_callback.events import event_context -from tower_display_callback.minimal import CallbackModule as MinimalCallbackModule +from .events import event_context +from .minimal import CallbackModule as MinimalCallbackModule class BaseCallbackModule(CallbackBase): diff --git a/awx/main/consumers.py b/awx/main/consumers.py index a8c56a264d..2cb1f450f2 100644 --- a/awx/main/consumers.py +++ b/awx/main/consumers.py @@ -6,6 +6,7 @@ from channels import Group from channels.sessions import channel_session from django.contrib.auth.models import User +from django.core.serializers.json import DjangoJSONEncoder from awx.main.models.organization import AuthToken @@ -86,4 +87,4 @@ def ws_receive(message): def emit_channel_notification(group, payload): - Group(group).send({"text": json.dumps(payload)}) + Group(group).send({"text": json.dumps(payload, cls=DjangoJSONEncoder)}) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index c0105b2587..7b959f6781 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -21,6 +21,7 @@ logger = logging.getLogger('awx.main.commands.run_callback_receiver') class 
CallbackBrokerWorker(ConsumerMixin): def __init__(self, connection): self.connection = connection + self.partial_events = {} def get_consumers(self, Consumer, channel): return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, @@ -31,18 +32,28 @@ class CallbackBrokerWorker(ConsumerMixin): def process_task(self, body, message): try: - if 'event' not in body: - raise Exception('Payload does not have an event') if 'job_id' not in body and 'ad_hoc_command_id' not in body: raise Exception('Payload does not have a job_id or ad_hoc_command_id') if settings.DEBUG: logger.info('Body: {}'.format(body)) logger.info('Message: {}'.format(message)) try: - if 'job_id' in body: - JobEvent.create_from_data(**body) - elif 'ad_hoc_command_id' in body: - AdHocCommandEvent.create_from_data(**body) + # If event came directly from callback without counter/stdout, + # save it until the rest of the event arrives. + if 'counter' not in body: + if 'uuid' in body: + self.partial_events[body['uuid']] = body + # If event has counter, try to combine it with any event data + # already received for the same uuid, then create the actual + # job event record. 
+ else: + if 'uuid' in body: + partial_event = self.partial_events.pop(body['uuid'], {}) + body.update(partial_event) + if 'job_id' in body: + JobEvent.create_from_data(**body) + elif 'ad_hoc_command_id' in body: + AdHocCommandEvent.create_from_data(**body) except DatabaseError as e: logger.error('Database Error Saving Job Event: {}'.format(e)) except Exception as exc: diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index b7acf21775..19b62c7694 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -1073,8 +1073,8 @@ class JobEvent(CreatedModifiedModel): from awx.main.models.inventory import Host hostnames = set() try: - for v in self.event_data.values(): - hostnames.update(v.keys()) + for stat in ('changed', 'dark', 'failures', 'ok', 'processed', 'skipped'): + hostnames.update(self.event_data.get(stat, {}).keys()) except AttributeError: # In case event_data or v isn't a dict. pass with ignore_inventory_computed_fields(): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e0dcae0fca..addbe4c8f2 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -809,6 +809,7 @@ class RunJob(BaseTask): env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' env['TOWER_HOST'] = settings.TOWER_URL_BASE + env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA) env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE env['CALLBACK_CONNECTION'] = settings.BROKER_URL if getattr(settings, 'JOB_CALLBACK_DEBUG', False): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index fbb9c8fb73..876ba56c85 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -152,6 +152,10 @@ REMOTE_HOST_HEADERS = ['REMOTE_ADDR', 'REMOTE_HOST'] # Note: This setting may be overridden by database settings. 
STDOUT_MAX_BYTES_DISPLAY = 1048576 +# The maximum size of the ansible callback event's res data structure +# beyond this limit and the value will be removed +MAX_EVENT_RES_DATA = 700000 + # Note: This setting may be overridden by database settings. EVENT_STDOUT_MAX_BYTES_DISPLAY = 1024 @@ -522,17 +526,6 @@ ANSIBLE_FORCE_COLOR = True # the celery task. AWX_TASK_ENV = {} -# Maximum number of job events processed by the callback receiver worker process -# before it recycles -JOB_EVENT_RECYCLE_THRESHOLD = 3000 - -# Number of workers used to proecess job events in parallel -JOB_EVENT_WORKERS = 4 - -# Maximum number of job events that can be waiting on a single worker queue before -# it can be skipped as too busy -JOB_EVENT_MAX_QUEUE_SIZE = 100 - # Flag to enable/disable updating hosts M2M when saving job events. CAPTURE_JOB_EVENT_HOSTS = False diff --git a/awx/sso/conf.py b/awx/sso/conf.py index c3ab7f7e56..237209a017 100644 --- a/awx/sso/conf.py +++ b/awx/sso/conf.py @@ -331,6 +331,7 @@ register( category=_('LDAP'), category_slug='ldap', feature_required='ldap', + default='MemberDNGroupType', ) register( diff --git a/awx/static/api/api.css b/awx/static/api/api.css index 61d51fae12..3b18c4273d 100644 --- a/awx/static/api/api.css +++ b/awx/static/api/api.css @@ -151,6 +151,9 @@ body .prettyprint .lit { body .prettyprint .str { color: #D9534F; } +body div.ansi_back { + display: inline-block; +} body .well.tab-content { padding: 20px; diff --git a/awx/ui/client/src/configuration/auth-form/configuration-auth.controller.js b/awx/ui/client/src/configuration/auth-form/configuration-auth.controller.js index 33d1a53c37..5c3d31a0bc 100644 --- a/awx/ui/client/src/configuration/auth-form/configuration-auth.controller.js +++ b/awx/ui/client/src/configuration/auth-form/configuration-auth.controller.js @@ -99,9 +99,9 @@ export default [ var dropdownOptions = [ {label: i18n._('Azure AD'), value: 'azure'}, - {label: i18n._('Github'), value: 'github'}, - {label: i18n._('Github Org'), 
value: 'github_org'}, -{label: i18n._('Github Team'), value: 'github_team'}, +{label: i18n._('GitHub'), value: 'github'}, +{label: i18n._('GitHub Org'), value: 'github_org'}, +{label: i18n._('GitHub Team'), value: 'github_team'}, {label: i18n._('Google OAuth2'), value: 'google_oauth'}, {label: i18n._('LDAP'), value: 'ldap'}, {label: i18n._('RADIUS'), value: 'radius'}, diff --git a/awx/ui/client/src/configuration/auth-form/configuration-auth.partial.html b/awx/ui/client/src/configuration/auth-form/configuration-auth.partial.html index 5efeeed532..71192e17c6 100644 --- a/awx/ui/client/src/configuration/auth-form/configuration-auth.partial.html +++ b/awx/ui/client/src/configuration/auth-form/configuration-auth.partial.html @@ -1,7 +1,7 @@