mirror of
https://github.com/ansible/awx.git
synced 2026-01-09 23:12:08 -03:30
Merge branch 'ramparts_and_ha' into devel
* ramparts_and_ha: (21 commits) Rename database migrations for devel integration Integrate callback receiver refactoring Fix an issue running jobs in the cluster Implement a more dynamic celery queue system Purge old munin monitors and tools Refactor Tower HA Instance logic and models Docker compose improvements Initial Docker Compose workflow for Tower cluster Add memcached role for setup playbook Removing qpid from deb packaging Refactor rabbitmq role Integrate memcached into setup playbook Remove mongodb shutdown task Remove dependency on erlang_sd_notify Add initial rabbitmq role Initial rabbitmq setup playbook integration Update development environment for rabbit Replace qpid with rabbitmq Remove redis role from setup playbook Update qpid packaging, remove migrations ...
This commit is contained in:
commit
f5d2c5c18a
22
Makefile
22
Makefile
@ -15,6 +15,8 @@ COMPOSE_TAG ?= devel
|
||||
# NOTE: This defaults the container image version to the branch that's active
|
||||
# COMPOSE_TAG ?= $(GIT_BRANCH)
|
||||
|
||||
COMPOSE_HOST ?= $(shell hostname)
|
||||
|
||||
VENV_BASE ?= /venv
|
||||
SCL_PREFIX ?=
|
||||
CELERY_SCHEDULE_FILE ?= /celerybeat-schedule
|
||||
@ -328,7 +330,7 @@ init:
|
||||
if [ "$(VENV_BASE)" ]; then \
|
||||
. $(VENV_BASE)/tower/bin/activate; \
|
||||
fi; \
|
||||
tower-manage register_instance --primary --hostname=127.0.0.1; \
|
||||
tower-manage register_instance --hostname=$(COMPOSE_HOST); \
|
||||
|
||||
# Refresh development environment after pulling new code.
|
||||
refresh: clean requirements_dev version_file develop migrate
|
||||
@ -379,6 +381,12 @@ honcho:
|
||||
fi; \
|
||||
honcho start
|
||||
|
||||
flower:
|
||||
@if [ "$(VENV_BASE)" ]; then \
|
||||
. $(VENV_BASE)/tower/bin/activate; \
|
||||
fi; \
|
||||
$(PYTHON) manage.py celery flower --address=0.0.0.0 --port=5555 --broker=amqp://guest:guest@$(RABBITMQ_HOST):5672//
|
||||
|
||||
# Run the built-in development webserver (by default on http://localhost:8013).
|
||||
runserver:
|
||||
@if [ "$(VENV_BASE)" ]; then \
|
||||
@ -391,7 +399,8 @@ celeryd:
|
||||
@if [ "$(VENV_BASE)" ]; then \
|
||||
. $(VENV_BASE)/tower/bin/activate; \
|
||||
fi; \
|
||||
$(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,2 -Ofair --schedule=$(CELERY_SCHEDULE_FILE)
|
||||
$(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default
|
||||
#$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE)
|
||||
|
||||
# Run to start the zeromq callback receiver
|
||||
receiver:
|
||||
@ -404,7 +413,11 @@ taskmanager:
|
||||
@if [ "$(VENV_BASE)" ]; then \
|
||||
. $(VENV_BASE)/tower/bin/activate; \
|
||||
fi; \
|
||||
$(PYTHON) manage.py run_task_system
|
||||
if [ "$(COMPOSE_HOST)" == "tower_1" ] || [ "$(COMPOSE_HOST)" == "tower" ]; then \
|
||||
$(PYTHON) manage.py run_task_system; \
|
||||
else \
|
||||
while true; do sleep 2; done; \
|
||||
fi
|
||||
|
||||
socketservice:
|
||||
@if [ "$(VENV_BASE)" ]; then \
|
||||
@ -749,6 +762,9 @@ docker-auth:
|
||||
docker-compose: docker-auth
|
||||
TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml up --no-recreate
|
||||
|
||||
docker-compose-cluster: docker-auth
|
||||
TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose-cluster.yml up
|
||||
|
||||
docker-compose-test: docker-auth
|
||||
cd tools && TAG=$(COMPOSE_TAG) docker-compose run --rm --service-ports tower /bin/bash
|
||||
|
||||
|
||||
1
Procfile
1
Procfile
@ -4,3 +4,4 @@ taskmanager: make taskmanager
|
||||
receiver: make receiver
|
||||
socketservice: make socketservice
|
||||
factcacher: make factcacher
|
||||
flower: make flower
|
||||
@ -171,28 +171,13 @@ class ApiV1PingView(APIView):
|
||||
# Most of this response is canned; just build the dictionary.
|
||||
response = {
|
||||
'ha': is_ha_environment(),
|
||||
'role': Instance.objects.my_role(),
|
||||
'version': get_awx_version(),
|
||||
}
|
||||
|
||||
# If this is an HA environment, we also include the IP address of
|
||||
# all of the instances.
|
||||
#
|
||||
# Set up a default structure.
|
||||
response['instances'] = {
|
||||
'primary': None,
|
||||
'secondaries': [],
|
||||
}
|
||||
|
||||
# Add all of the instances into the structure.
|
||||
response['instances'] = []
|
||||
for instance in Instance.objects.all():
|
||||
if instance.primary:
|
||||
response['instances']['primary'] = instance.hostname
|
||||
else:
|
||||
response['instances']['secondaries'].append(instance.hostname)
|
||||
response['instances']['secondaries'].sort()
|
||||
|
||||
# Done; return the response.
|
||||
response['instances'].append(instance.hostname)
|
||||
response['instances'].sort()
|
||||
return Response(response)
|
||||
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
# Copyright (c) 2015 Ansible, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
import socket
|
||||
from optparse import make_option
|
||||
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
@ -21,13 +22,9 @@ class BaseCommandInstance(BaseCommand):
|
||||
|
||||
def __init__(self):
|
||||
super(BaseCommandInstance, self).__init__()
|
||||
self.enforce_primary_role = False
|
||||
self.enforce_roles = False
|
||||
self.enforce_hostname_set = False
|
||||
self.enforce_unique_find = False
|
||||
|
||||
self.option_primary = False
|
||||
self.option_secondary = False
|
||||
self.option_hostname = None
|
||||
self.option_uuid = None
|
||||
|
||||
@ -38,48 +35,24 @@ class BaseCommandInstance(BaseCommand):
|
||||
def generate_option_hostname():
|
||||
return make_option('--hostname',
|
||||
dest='hostname',
|
||||
default='',
|
||||
default=socket.gethostname(),
|
||||
help='Find instance by specified hostname.')
|
||||
|
||||
@staticmethod
|
||||
def generate_option_hostname_set():
|
||||
return make_option('--hostname',
|
||||
dest='hostname',
|
||||
default='',
|
||||
default=socket.gethostname(),
|
||||
help='Hostname to assign to the new instance.')
|
||||
|
||||
@staticmethod
|
||||
def generate_option_primary():
|
||||
return make_option('--primary',
|
||||
action='store_true',
|
||||
default=False,
|
||||
dest='primary',
|
||||
help='Register instance as primary.')
|
||||
|
||||
@staticmethod
|
||||
def generate_option_secondary():
|
||||
return make_option('--secondary',
|
||||
action='store_true',
|
||||
default=False,
|
||||
dest='secondary',
|
||||
help='Register instance as secondary.')
|
||||
|
||||
@staticmethod
|
||||
def generate_option_uuid():
|
||||
#TODO: Likely deprecated, maybe uuid becomes the cluster ident?
|
||||
return make_option('--uuid',
|
||||
dest='uuid',
|
||||
default='',
|
||||
help='Find instance by specified uuid.')
|
||||
|
||||
def include_option_primary_role(self):
|
||||
BaseCommand.option_list += ( BaseCommandInstance.generate_option_primary(), )
|
||||
self.enforce_primary_role = True
|
||||
|
||||
def include_options_roles(self):
|
||||
self.include_option_primary_role()
|
||||
BaseCommand.option_list += ( BaseCommandInstance.generate_option_secondary(), )
|
||||
self.enforce_roles = True
|
||||
|
||||
def include_option_hostname_set(self):
|
||||
BaseCommand.option_list += ( BaseCommandInstance.generate_option_hostname_set(), )
|
||||
self.enforce_hostname_set = True
|
||||
@ -94,12 +67,6 @@ class BaseCommandInstance(BaseCommand):
|
||||
def get_option_uuid(self):
|
||||
return self.option_uuid
|
||||
|
||||
def is_option_primary(self):
|
||||
return self.option_primary
|
||||
|
||||
def is_option_secondary(self):
|
||||
return self.option_secondary
|
||||
|
||||
def get_UUID(self):
|
||||
return self.UUID
|
||||
|
||||
@ -109,31 +76,13 @@ class BaseCommandInstance(BaseCommand):
|
||||
|
||||
@property
|
||||
def usage_error(self):
|
||||
if self.enforce_roles and self.enforce_hostname_set:
|
||||
return CommandError('--hostname and one of --primary or --secondary is required.')
|
||||
elif self.enforce_hostname_set:
|
||||
if self.enforce_hostname_set:
|
||||
return CommandError('--hostname is required.')
|
||||
elif self.enforce_primary_role:
|
||||
return CommandError('--primary is required.')
|
||||
elif self.enforce_roles:
|
||||
return CommandError('One of --primary or --secondary is required.')
|
||||
|
||||
def handle(self, *args, **options):
|
||||
if self.enforce_hostname_set and self.enforce_unique_find:
|
||||
raise OptionEnforceError('Can not enforce --hostname as a setter and --hostname as a getter')
|
||||
|
||||
if self.enforce_roles:
|
||||
self.option_primary = options['primary']
|
||||
self.option_secondary = options['secondary']
|
||||
|
||||
if self.is_option_primary() and self.is_option_secondary() or not (self.is_option_primary() or self.is_option_secondary()):
|
||||
raise self.usage_error
|
||||
elif self.enforce_primary_role:
|
||||
if options['primary']:
|
||||
self.option_primary = options['primary']
|
||||
else:
|
||||
raise self.usage_error
|
||||
|
||||
if self.enforce_hostname_set:
|
||||
if options['hostname']:
|
||||
self.option_hostname = options['hostname']
|
||||
@ -162,11 +111,4 @@ class BaseCommandInstance(BaseCommand):
|
||||
|
||||
@staticmethod
|
||||
def instance_str(instance):
|
||||
return BaseCommandInstance.__instance_str(instance, ('uuid', 'hostname', 'role'))
|
||||
|
||||
def update_projects(self, instance):
|
||||
"""Update all projects, ensuring the job runs against this instance,
|
||||
which is the primary instance.
|
||||
"""
|
||||
for project in Project.objects.all():
|
||||
project.update()
|
||||
return BaseCommandInstance.__instance_str(instance, ('uuid', 'hostname'))
|
||||
|
||||
@ -9,22 +9,14 @@ from awx.main.models import Instance
|
||||
instance_str = BaseCommandInstance.instance_str
|
||||
|
||||
class Command(BaseCommandInstance):
|
||||
"""Internal tower command.
|
||||
"""
|
||||
Internal tower command.
|
||||
Regsiter this instance with the database for HA tracking.
|
||||
|
||||
This command is idempotent.
|
||||
|
||||
This command will error out in the following conditions:
|
||||
|
||||
* Attempting to register a secondary machine with no primary machines.
|
||||
* Attempting to register a primary instance when a different primary
|
||||
instance exists.
|
||||
* Attempting to re-register an instance with changed values.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(Command, self).__init__()
|
||||
|
||||
self.include_options_roles()
|
||||
self.include_option_hostname_set()
|
||||
|
||||
def handle(self, *args, **options):
|
||||
@ -32,32 +24,10 @@ class Command(BaseCommandInstance):
|
||||
|
||||
uuid = self.get_UUID()
|
||||
|
||||
# Is there an existing record for this machine? If so, retrieve that record and look for issues.
|
||||
try:
|
||||
instance = Instance.objects.get(uuid=uuid)
|
||||
if instance.hostname != self.get_option_hostname():
|
||||
raise CommandError('Instance already registered with a different hostname %s.' % instance_str(instance))
|
||||
print("Instance already registered %s" % instance_str(instance))
|
||||
except Instance.DoesNotExist:
|
||||
# Get a status on primary machines (excluding this one, regardless of its status).
|
||||
other_instances = Instance.objects.exclude(uuid=uuid)
|
||||
primaries = other_instances.filter(primary=True).count()
|
||||
|
||||
# If this instance is being set to primary and a *different* primary machine alreadyexists, error out.
|
||||
if self.is_option_primary() and primaries:
|
||||
raise CommandError('Another instance is already registered as primary.')
|
||||
|
||||
# Lastly, if there are no primary machines at all, then don't allow this to be registered as a secondary machine.
|
||||
if self.is_option_secondary() and not primaries:
|
||||
raise CommandError('Unable to register a secondary machine until another primary machine has been registered.')
|
||||
|
||||
# Okay, we've checked for appropriate errata; perform the registration.
|
||||
instance = Instance(uuid=uuid, primary=self.is_option_primary(), hostname=self.get_option_hostname())
|
||||
instance.save()
|
||||
|
||||
# If this is a primary instance, update projects.
|
||||
if instance.primary:
|
||||
self.update_projects(instance)
|
||||
|
||||
# Done!
|
||||
print('Successfully registered instance %s.' % instance_str(instance))
|
||||
instance = Instance.objects.filter(hostname=self.get_option_hostname())
|
||||
if instance.exists():
|
||||
print("Instance already registered %s" % instance_str(instance[0]))
|
||||
return
|
||||
instance = Instance(uuid=uuid, hostname=self.get_option_hostname())
|
||||
instance.save()
|
||||
print('Successfully registered instance %s.' % instance_str(instance))
|
||||
|
||||
@ -8,12 +8,17 @@ import datetime
|
||||
import logging
|
||||
import signal
|
||||
import time
|
||||
from multiprocessing import Process, Queue
|
||||
from Queue import Empty as QueueEmpty
|
||||
|
||||
from kombu import Connection, Exchange, Queue
|
||||
from kombu.mixins import ConsumerMixin
|
||||
from kombu.log import get_logger
|
||||
from kombu.utils import kwdict, reprcall
|
||||
from kombu.utils.debug import setup_logging
|
||||
|
||||
# Django
|
||||
from django.conf import settings
|
||||
from django.core.management.base import NoArgsCommand
|
||||
from django.core.cache import cache
|
||||
from django.db import transaction, DatabaseError
|
||||
from django.utils.dateparse import parse_datetime
|
||||
from django.utils.timezone import FixedOffset
|
||||
@ -21,160 +26,53 @@ from django.db import connection
|
||||
|
||||
# AWX
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.socket import Socket
|
||||
from awx.main.socket_queue import Socket
|
||||
|
||||
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
|
||||
|
||||
WORKERS = 4
|
||||
class CallbackBrokerWorker(ConsumerMixin):
|
||||
|
||||
class CallbackReceiver(object):
|
||||
def __init__(self):
|
||||
self.parent_mappings = {}
|
||||
def __init__(self, connection):
|
||||
self.connection = connection
|
||||
|
||||
def run_subscriber(self, use_workers=True):
|
||||
def shutdown_handler(active_workers):
|
||||
def _handler(signum, frame):
|
||||
try:
|
||||
for active_worker in active_workers:
|
||||
active_worker.terminate()
|
||||
signal.signal(signum, signal.SIG_DFL)
|
||||
os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it
|
||||
except Exception:
|
||||
# TODO: LOG
|
||||
pass
|
||||
return _handler
|
||||
def get_consumers(self, Consumer, channel):
|
||||
return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE,
|
||||
Exchange(settings.CALLBACK_QUEUE, type='direct'),
|
||||
routing_key=settings.CALLBACK_QUEUE)],
|
||||
accept=['json'],
|
||||
callbacks=[self.process_task])]
|
||||
|
||||
def check_pre_handle(data):
|
||||
event = data.get('event', '')
|
||||
if event == 'playbook_on_play_start':
|
||||
return True
|
||||
return False
|
||||
|
||||
worker_queues = []
|
||||
|
||||
if use_workers:
|
||||
connection.close()
|
||||
for idx in range(WORKERS):
|
||||
queue_actual = Queue(settings.JOB_EVENT_MAX_QUEUE_SIZE)
|
||||
w = Process(target=self.callback_worker, args=(queue_actual, idx,))
|
||||
w.start()
|
||||
if settings.DEBUG:
|
||||
logger.info('Started worker %s' % str(idx))
|
||||
worker_queues.append([0, queue_actual, w])
|
||||
elif settings.DEBUG:
|
||||
logger.warn('Started callback receiver (no workers)')
|
||||
|
||||
main_process = Process(
|
||||
target=self.callback_handler,
|
||||
args=(use_workers, worker_queues,)
|
||||
)
|
||||
main_process.daemon = True
|
||||
main_process.start()
|
||||
|
||||
signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process]))
|
||||
signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process]))
|
||||
while True:
|
||||
workers_changed = False
|
||||
idx = 0
|
||||
for queue_worker in worker_queues:
|
||||
if not queue_worker[2].is_alive():
|
||||
logger.warn("Worker %s was not alive, restarting" % str(queue_worker))
|
||||
workers_changed = True
|
||||
queue_worker[2].join()
|
||||
w = Process(target=self.callback_worker, args=(queue_worker[1], idx,))
|
||||
w.daemon = True
|
||||
w.start()
|
||||
signal.signal(signal.SIGINT, shutdown_handler([w]))
|
||||
signal.signal(signal.SIGTERM, shutdown_handler([w]))
|
||||
queue_worker[2] = w
|
||||
idx += 1
|
||||
if workers_changed:
|
||||
signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process]))
|
||||
signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process]))
|
||||
if not main_process.is_alive():
|
||||
logger.error("Main process is not alive")
|
||||
for queue_worker in worker_queues:
|
||||
queue_worker[2].terminate()
|
||||
break
|
||||
time.sleep(0.1)
|
||||
|
||||
def write_queue_worker(self, preferred_queue, worker_queues, message):
|
||||
queue_order = sorted(range(WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0)
|
||||
for queue_actual in queue_order:
|
||||
try:
|
||||
worker_actual = worker_queues[queue_actual]
|
||||
worker_actual[1].put(message, block=True, timeout=2)
|
||||
worker_actual[0] += 1
|
||||
return queue_actual
|
||||
except Exception:
|
||||
logger.warn("Could not write to queue %s" % preferred_queue)
|
||||
continue
|
||||
return None
|
||||
|
||||
def callback_handler(self, use_workers, worker_queues):
|
||||
total_messages = 0
|
||||
last_parent_events = {}
|
||||
with Socket('callbacks', 'r') as callbacks:
|
||||
for message in callbacks.listen():
|
||||
total_messages += 1
|
||||
if 'ad_hoc_command_id' in message:
|
||||
self.process_ad_hoc_event(message)
|
||||
elif not use_workers:
|
||||
self.process_job_event(message)
|
||||
else:
|
||||
job_parent_events = last_parent_events.get(message['job_id'], {})
|
||||
if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'):
|
||||
parent = job_parent_events.get('playbook_on_start', None)
|
||||
elif message['event'] in ('playbook_on_notify',
|
||||
'playbook_on_setup',
|
||||
'playbook_on_task_start',
|
||||
'playbook_on_no_hosts_matched',
|
||||
'playbook_on_no_hosts_remaining',
|
||||
'playbook_on_include',
|
||||
'playbook_on_import_for_host',
|
||||
'playbook_on_not_import_for_host'):
|
||||
parent = job_parent_events.get('playbook_on_play_start', None)
|
||||
elif message['event'].startswith('runner_on_') or message['event'].startswith('runner_item_on_'):
|
||||
list_parents = []
|
||||
list_parents.append(job_parent_events.get('playbook_on_setup', None))
|
||||
list_parents.append(job_parent_events.get('playbook_on_task_start', None))
|
||||
list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id - x.id)
|
||||
parent = list_parents[0] if len(list_parents) > 0 else None
|
||||
else:
|
||||
parent = None
|
||||
if parent is not None:
|
||||
message['parent'] = parent.id
|
||||
if 'created' in message:
|
||||
del(message['created'])
|
||||
if message['event'] in ('playbook_on_start', 'playbook_on_play_start',
|
||||
'playbook_on_setup', 'playbook_on_task_start'):
|
||||
job_parent_events[message['event']] = self.process_job_event(message)
|
||||
else:
|
||||
if message['event'] == 'playbook_on_stats':
|
||||
job_parent_events = {}
|
||||
|
||||
actual_queue = self.write_queue_worker(total_messages % WORKERS, worker_queues, message)
|
||||
# NOTE: It might be better to recycle the entire callback receiver process if one or more of the queues are too full
|
||||
# the drawback is that if we under extremely high load we may be legitimately taking a while to process messages
|
||||
if actual_queue is None:
|
||||
logger.error("All queues full!")
|
||||
sys.exit(1)
|
||||
last_parent_events[message['job_id']] = job_parent_events
|
||||
|
||||
@transaction.atomic
|
||||
def process_job_event(self, data):
|
||||
# Sanity check: Do we need to do anything at all?
|
||||
event = data.get('event', '')
|
||||
parent_id = data.get('parent', None)
|
||||
if not event or 'job_id' not in data:
|
||||
return
|
||||
def process_task(self, body, message):
|
||||
try:
|
||||
if "event" not in body:
|
||||
raise Exception("Payload does not have an event")
|
||||
if "job_id" not in body:
|
||||
raise Exception("Payload does not have a job_id")
|
||||
if settings.DEBUG:
|
||||
logger.info("Body: {}".format(body))
|
||||
logger.info("Message: {}".format(message))
|
||||
self.process_job_event(body)
|
||||
except Exception as exc:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
logger.error('Callback Task Processor Raised Exception: %r', exc)
|
||||
message.ack()
|
||||
|
||||
def process_job_event(self, payload):
|
||||
# Get the correct "verbose" value from the job.
|
||||
# If for any reason there's a problem, just use 0.
|
||||
if 'ad_hoc_command_id' in payload:
|
||||
event_type_key = 'ad_hoc_command_id'
|
||||
event_object_type = AdHocCommand
|
||||
else:
|
||||
event_type_key = 'job_id'
|
||||
event_object_type = Job
|
||||
|
||||
try:
|
||||
verbose = Job.objects.get(id=data['job_id']).verbosity
|
||||
verbose = event_object_type.objects.get(id=payload[event_type_key]).verbosity
|
||||
except Exception as e:
|
||||
verbose = 0
|
||||
verbose=0
|
||||
# TODO: cache
|
||||
|
||||
# Convert the datetime for the job event's creation appropriately,
|
||||
# and include a time zone for it.
|
||||
@ -182,120 +80,58 @@ class CallbackReceiver(object):
|
||||
# In the event of any issue, throw it out, and Django will just save
|
||||
# the current time.
|
||||
try:
|
||||
if not isinstance(data['created'], datetime.datetime):
|
||||
data['created'] = parse_datetime(data['created'])
|
||||
if not data['created'].tzinfo:
|
||||
data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
|
||||
if not isinstance(payload['created'], datetime.datetime):
|
||||
payload['created'] = parse_datetime(payload['created'])
|
||||
if not payload['created'].tzinfo:
|
||||
payload['created'] = payload['created'].replace(tzinfo=FixedOffset(0))
|
||||
except (KeyError, ValueError):
|
||||
data.pop('created', None)
|
||||
payload.pop('created', None)
|
||||
|
||||
# Print the data to stdout if we're in DEBUG mode.
|
||||
if settings.DEBUG:
|
||||
print(data)
|
||||
event_uuid = payload.get("uuid", '')
|
||||
parent_event_uuid = payload.get("parent_uuid", '')
|
||||
|
||||
# Sanity check: Don't honor keys that we don't recognize.
|
||||
for key in data.keys():
|
||||
if key not in ('job_id', 'event', 'event_data',
|
||||
'created', 'counter'):
|
||||
data.pop(key)
|
||||
for key in payload.keys():
|
||||
if key not in (event_type_key, 'event', 'event_data',
|
||||
'created', 'counter', 'uuid'):
|
||||
payload.pop(key)
|
||||
|
||||
# Save any modifications to the job event to the database.
|
||||
# If we get a database error of some kind, bail out.
|
||||
try:
|
||||
# If we're not in verbose mode, wipe out any module
|
||||
# arguments.
|
||||
res = data['event_data'].get('res', {})
|
||||
res = payload['event_data'].get('res', {})
|
||||
if isinstance(res, dict):
|
||||
i = res.get('invocation', {})
|
||||
if verbose == 0 and 'module_args' in i:
|
||||
i['module_args'] = ''
|
||||
|
||||
# Create a new JobEvent object.
|
||||
job_event = JobEvent(**data)
|
||||
if parent_id is not None:
|
||||
job_event.parent = JobEvent.objects.get(id=parent_id)
|
||||
job_event.save(post_process=True)
|
||||
|
||||
# Retrun the job event object.
|
||||
return job_event
|
||||
if 'ad_hoc_command_id' in payload:
|
||||
ad_hoc_command_event = AdHocCommandEvent.objects.create(**data)
|
||||
return
|
||||
|
||||
j = JobEvent(**payload)
|
||||
if payload['event'] == 'playbook_on_start':
|
||||
j.save()
|
||||
cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
|
||||
return
|
||||
else:
|
||||
if parent_event_uuid:
|
||||
parent_id = cache.get("{}_{}".format(payload['job_id'], parent_event_uuid), None)
|
||||
if parent_id is None:
|
||||
parent_id_obj = JobEvent.objects.filter(uuid=parent_event_uuid, job_id=payload['job_id'])
|
||||
if parent_id_obj.exists(): #Problematic if not there, means the parent hasn't been written yet... TODO
|
||||
j.parent_id = parent_id_obj[0].id
|
||||
print("Settings cache: {}_{} with value {}".format(payload['job_id'], parent_event_uuid, j.parent_id))
|
||||
cache.set("{}_{}".format(payload['job_id'], parent_event_uuid), j.parent_id, 300)
|
||||
else:
|
||||
print("Cache hit")
|
||||
j.parent_id = parent_id
|
||||
j.save()
|
||||
if event_uuid:
|
||||
cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
|
||||
except DatabaseError as e:
|
||||
# Log the error and bail out.
|
||||
logger.error('Database error saving job event: %s', e)
|
||||
return None
|
||||
logger.error("Database Error Saving Job Event: {}".format(e))
|
||||
|
||||
@transaction.atomic
|
||||
def process_ad_hoc_event(self, data):
|
||||
# Sanity check: Do we need to do anything at all?
|
||||
event = data.get('event', '')
|
||||
if not event or 'ad_hoc_command_id' not in data:
|
||||
return
|
||||
|
||||
# Get the correct "verbose" value from the job.
|
||||
# If for any reason there's a problem, just use 0.
|
||||
try:
|
||||
verbose = AdHocCommand.objects.get(id=data['ad_hoc_command_id']).verbosity
|
||||
except Exception as e:
|
||||
verbose = 0
|
||||
|
||||
# Convert the datetime for the job event's creation appropriately,
|
||||
# and include a time zone for it.
|
||||
#
|
||||
# In the event of any issue, throw it out, and Django will just save
|
||||
# the current time.
|
||||
try:
|
||||
if not isinstance(data['created'], datetime.datetime):
|
||||
data['created'] = parse_datetime(data['created'])
|
||||
if not data['created'].tzinfo:
|
||||
data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
|
||||
except (KeyError, ValueError):
|
||||
data.pop('created', None)
|
||||
|
||||
# Print the data to stdout if we're in DEBUG mode.
|
||||
if settings.DEBUG:
|
||||
print(data)
|
||||
|
||||
# Sanity check: Don't honor keys that we don't recognize.
|
||||
for key in data.keys():
|
||||
if key not in ('ad_hoc_command_id', 'event', 'event_data',
|
||||
'created', 'counter'):
|
||||
data.pop(key)
|
||||
|
||||
# Save any modifications to the ad hoc command event to the database.
|
||||
# If we get a database error of some kind, bail out.
|
||||
try:
|
||||
# If we're not in verbose mode, wipe out any module
|
||||
# arguments. FIXME: Needed for adhoc?
|
||||
res = data['event_data'].get('res', {})
|
||||
if isinstance(res, dict):
|
||||
i = res.get('invocation', {})
|
||||
if verbose == 0 and 'module_args' in i:
|
||||
i['module_args'] = ''
|
||||
|
||||
# Create a new AdHocCommandEvent object.
|
||||
ad_hoc_command_event = AdHocCommandEvent.objects.create(**data)
|
||||
|
||||
# Retrun the ad hoc comamnd event object.
|
||||
return ad_hoc_command_event
|
||||
except DatabaseError as e:
|
||||
# Log the error and bail out.
|
||||
logger.error('Database error saving ad hoc command event: %s', e)
|
||||
return None
|
||||
|
||||
def callback_worker(self, queue_actual, idx):
|
||||
messages_processed = 0
|
||||
while True:
|
||||
try:
|
||||
message = queue_actual.get(block=True, timeout=1)
|
||||
except QueueEmpty:
|
||||
continue
|
||||
except Exception as e:
|
||||
logger.error("Exception on listen socket, restarting: " + str(e))
|
||||
break
|
||||
self.process_job_event(message)
|
||||
messages_processed += 1
|
||||
if messages_processed >= settings.JOB_EVENT_RECYCLE_THRESHOLD:
|
||||
logger.info("Shutting down message receiver")
|
||||
break
|
||||
|
||||
class Command(NoArgsCommand):
|
||||
'''
|
||||
@ -306,9 +142,10 @@ class Command(NoArgsCommand):
|
||||
help = 'Launch the job callback receiver'
|
||||
|
||||
def handle_noargs(self, **options):
|
||||
cr = CallbackReceiver()
|
||||
try:
|
||||
cr.run_subscriber()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
with Connection(settings.BROKER_URL) as conn:
|
||||
try:
|
||||
worker = CallbackBrokerWorker(conn)
|
||||
worker.run()
|
||||
except KeyboardInterrupt:
|
||||
print('Terminating Callback Receiver')
|
||||
|
||||
|
||||
@ -14,7 +14,7 @@ from django.utils import timezone
|
||||
# AWX
|
||||
from awx.main.models.fact import Fact
|
||||
from awx.main.models.inventory import Host
|
||||
from awx.main.socket import Socket
|
||||
from awx.main.socket_queue import Socket
|
||||
|
||||
logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver')
|
||||
|
||||
|
||||
@ -16,7 +16,7 @@ from django.core.management.base import NoArgsCommand
|
||||
# AWX
|
||||
import awx
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.socket import Socket
|
||||
from awx.main.socket_queue import Socket
|
||||
|
||||
# socketio
|
||||
from socketio import socketio_manage
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
# All Rights Reserved.
|
||||
|
||||
import sys
|
||||
import socket
|
||||
|
||||
from django.conf import settings
|
||||
from django.db import models
|
||||
@ -28,31 +29,12 @@ class InstanceManager(models.Manager):
|
||||
# If we are running unit tests, return a stub record.
|
||||
if len(sys.argv) >= 2 and sys.argv[1] == 'test':
|
||||
return self.model(id=1, primary=True,
|
||||
hostname='localhost',
|
||||
uuid='00000000-0000-0000-0000-000000000000')
|
||||
|
||||
# Return the appropriate record from the database.
|
||||
return self.get(uuid=settings.SYSTEM_UUID)
|
||||
return self.get(hostname=socket.gethostname())
|
||||
|
||||
def my_role(self):
|
||||
"""Return the role of the currently active instance, as a string
|
||||
('primary' or 'secondary').
|
||||
"""
|
||||
# If we are running unit tests, we are primary, because reasons.
|
||||
if len(sys.argv) >= 2 and sys.argv[1] == 'test':
|
||||
return 'primary'
|
||||
|
||||
# Check if this instance is primary; if so, return "primary", otherwise
|
||||
# "secondary".
|
||||
if self.me().primary:
|
||||
return 'primary'
|
||||
return 'secondary'
|
||||
|
||||
def primary(self):
|
||||
"""Return the primary instance."""
|
||||
# If we are running unit tests, return a stub record.
|
||||
if len(sys.argv) >= 2 and sys.argv[1] == 'test':
|
||||
return self.model(id=1, primary=True,
|
||||
uuid='00000000-0000-0000-0000-000000000000')
|
||||
|
||||
# Return the appropriate record from the database.
|
||||
return self.get(primary=True)
|
||||
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
|
||||
return "tower"
|
||||
|
||||
@ -71,41 +71,6 @@ class ActivityStreamMiddleware(threading.local):
|
||||
if instance.id not in self.instance_ids:
|
||||
self.instance_ids.append(instance.id)
|
||||
|
||||
|
||||
class HAMiddleware(object):
|
||||
"""A middleware class that checks to see whether the request is being
|
||||
served on a secondary instance, and redirects the request back to the
|
||||
primary instance if so.
|
||||
"""
|
||||
def process_request(self, request):
|
||||
"""Process the request, and redirect if this is a request on a
|
||||
secondary node.
|
||||
"""
|
||||
# Is this the primary node? If so, we can just return None and be done;
|
||||
# we just want normal behavior in this case.
|
||||
if Instance.objects.my_role() == 'primary':
|
||||
return None
|
||||
|
||||
# Always allow the /ping/ endpoint.
|
||||
if request.path.startswith('/api/v1/ping'):
|
||||
return None
|
||||
|
||||
# Get the primary instance.
|
||||
primary = Instance.objects.primary()
|
||||
|
||||
# If this is a request to /, then we return a special landing page that
|
||||
# informs the user that they are on the secondary instance and will
|
||||
# be redirected.
|
||||
if request.path == '/':
|
||||
return TemplateResponse(request, 'ha/redirect.html', {
|
||||
'primary': primary,
|
||||
'redirect_seconds': 30,
|
||||
'version': version,
|
||||
})
|
||||
|
||||
# Redirect to the base page of the primary instance.
|
||||
return HttpResponseRedirect('http://%s%s' % (primary.hostname, request.path))
|
||||
|
||||
class AuthTokenTimeoutMiddleware(object):
|
||||
"""Presume that when the user includes the auth header, they go through the
|
||||
authentication mechanism. Further, that mechanism is presumed to extend
|
||||
|
||||
23
awx/main/migrations/0034_v310_modify_ha_instance.py
Normal file
23
awx/main/migrations/0034_v310_modify_ha_instance.py
Normal file
@ -0,0 +1,23 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('main', '0032_v302_credential_permissions_update'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.RemoveField(
|
||||
model_name='instance',
|
||||
name='primary',
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='instance',
|
||||
name='uuid',
|
||||
field=models.CharField(max_length=40),
|
||||
),
|
||||
]
|
||||
19
awx/main/migrations/0035_v310_jobevent_uuid.py
Normal file
19
awx/main/migrations/0035_v310_jobevent_uuid.py
Normal file
@ -0,0 +1,19 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
from __future__ import unicode_literals
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('main', '0033_v310_modify_ha_instance'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name='jobevent',
|
||||
name='uuid',
|
||||
field=models.CharField(default=b'', max_length=1024, editable=False),
|
||||
),
|
||||
]
|
||||
@ -22,9 +22,8 @@ class Instance(models.Model):
|
||||
"""
|
||||
objects = InstanceManager()
|
||||
|
||||
uuid = models.CharField(max_length=40, unique=True)
|
||||
uuid = models.CharField(max_length=40)
|
||||
hostname = models.CharField(max_length=250, unique=True)
|
||||
primary = models.BooleanField(default=False)
|
||||
created = models.DateTimeField(auto_now_add=True)
|
||||
modified = models.DateTimeField(auto_now=True)
|
||||
|
||||
@ -33,29 +32,8 @@ class Instance(models.Model):
|
||||
|
||||
@property
|
||||
def role(self):
|
||||
"""Return the role of this instance, as a string."""
|
||||
if self.primary:
|
||||
return 'primary'
|
||||
return 'secondary'
|
||||
|
||||
@functools.wraps(models.Model.save)
|
||||
def save(self, *args, **kwargs):
|
||||
"""Save the instance. If this is a secondary instance, then ensure
|
||||
that any currently-running jobs that this instance started are
|
||||
canceled.
|
||||
"""
|
||||
# Perform the normal save.
|
||||
result = super(Instance, self).save(*args, **kwargs)
|
||||
|
||||
# If this is not a primary instance, then kill any jobs that this
|
||||
# instance was responsible for starting.
|
||||
if not self.primary:
|
||||
for job in UnifiedJob.objects.filter(job_origin__instance=self,
|
||||
status__in=CAN_CANCEL):
|
||||
job.cancel()
|
||||
|
||||
# Return back the original result.
|
||||
return result
|
||||
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
|
||||
return "tower"
|
||||
|
||||
|
||||
class JobOrigin(models.Model):
|
||||
|
||||
@ -964,6 +964,11 @@ class JobEvent(CreatedModifiedModel):
|
||||
default=False,
|
||||
editable=False,
|
||||
)
|
||||
uuid = models.CharField(
|
||||
max_length=1024,
|
||||
default='',
|
||||
editable=False,
|
||||
)
|
||||
host = models.ForeignKey(
|
||||
'Host',
|
||||
related_name='job_events_as_primary_host',
|
||||
|
||||
@ -82,7 +82,7 @@ def celery_startup(conf=None, **kwargs):
|
||||
except Exception as e:
|
||||
logger.error("Failed to rebuild schedule {}: {}".format(sch, e))
|
||||
|
||||
@task()
|
||||
@task(queue='default')
|
||||
def send_notifications(notification_list, job_id=None):
|
||||
if not isinstance(notification_list, list):
|
||||
raise TypeError("notification_list should be of type list")
|
||||
@ -103,7 +103,7 @@ def send_notifications(notification_list, job_id=None):
|
||||
if job_id is not None:
|
||||
job_actual.notifications.add(notification)
|
||||
|
||||
@task(bind=True)
|
||||
@task(bind=True, queue='default')
|
||||
def run_administrative_checks(self):
|
||||
if not tower_settings.TOWER_ADMIN_ALERTS:
|
||||
return
|
||||
@ -124,11 +124,11 @@ def run_administrative_checks(self):
|
||||
tower_admin_emails,
|
||||
fail_silently=True)
|
||||
|
||||
@task(bind=True)
|
||||
@task(bind=True, queue='default')
|
||||
def cleanup_authtokens(self):
|
||||
AuthToken.objects.filter(expires__lt=now()).delete()
|
||||
|
||||
@task(bind=True)
|
||||
@task(bind=True, queue='default')
|
||||
def tower_periodic_scheduler(self):
|
||||
def get_last_run():
|
||||
if not os.path.exists(settings.SCHEDULE_METADATA_LOCATION):
|
||||
@ -179,7 +179,7 @@ def tower_periodic_scheduler(self):
|
||||
new_unified_job.socketio_emit_status("failed")
|
||||
emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id))
|
||||
|
||||
@task()
|
||||
@task(queue='default')
|
||||
def notify_task_runner(metadata_dict):
|
||||
"""Add the given task into the Tower task manager's queue, to be consumed
|
||||
by the task system.
|
||||
@ -187,7 +187,6 @@ def notify_task_runner(metadata_dict):
|
||||
queue = FifoQueue('tower_task_manager')
|
||||
queue.push(metadata_dict)
|
||||
|
||||
|
||||
def _send_notification_templates(instance, status_str):
|
||||
if status_str not in ['succeeded', 'failed']:
|
||||
raise ValueError("status_str must be either succeeded or failed")
|
||||
@ -203,7 +202,7 @@ def _send_notification_templates(instance, status_str):
|
||||
for n in all_notification_templates],
|
||||
job_id=instance.id)
|
||||
|
||||
@task(bind=True)
|
||||
@task(bind=True, queue='default')
|
||||
def handle_work_success(self, result, task_actual):
|
||||
instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
|
||||
if not instance:
|
||||
@ -211,7 +210,7 @@ def handle_work_success(self, result, task_actual):
|
||||
|
||||
_send_notification_templates(instance, 'succeeded')
|
||||
|
||||
@task(bind=True)
|
||||
@task(bind=True, queue='default')
|
||||
def handle_work_error(self, task_id, subtasks=None):
|
||||
print('Executing error task id %s, subtasks: %s' %
|
||||
(str(self.request.id), str(subtasks)))
|
||||
@ -240,7 +239,7 @@ def handle_work_error(self, task_id, subtasks=None):
|
||||
if first_instance:
|
||||
_send_notification_templates(first_instance, 'failed')
|
||||
|
||||
@task()
|
||||
@task(queue='default')
|
||||
def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
|
||||
'''
|
||||
Signal handler and wrapper around inventory.update_computed_fields to
|
||||
@ -740,7 +739,8 @@ class RunJob(BaseTask):
|
||||
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path
|
||||
env['REST_API_URL'] = settings.INTERNAL_API_URL
|
||||
env['REST_API_TOKEN'] = job.task_auth_token or ''
|
||||
env['CALLBACK_CONSUMER_PORT'] = str(settings.CALLBACK_CONSUMER_PORT)
|
||||
env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
|
||||
env['CALLBACK_CONNECTION'] = settings.BROKER_URL
|
||||
if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
|
||||
env['JOB_CALLBACK_DEBUG'] = '2'
|
||||
elif settings.DEBUG:
|
||||
|
||||
@ -425,7 +425,7 @@ def get_system_task_capacity():
|
||||
|
||||
|
||||
def emit_websocket_notification(endpoint, event, payload, token_key=None):
|
||||
from awx.main.socket import Socket
|
||||
from awx.main.socket_queue import Socket
|
||||
|
||||
try:
|
||||
with Socket('websocket', 'w', nowait=True, logger=logger) as websocket:
|
||||
|
||||
@ -39,37 +39,16 @@ import pwd
|
||||
import urlparse
|
||||
import re
|
||||
from copy import deepcopy
|
||||
from uuid import uuid4
|
||||
|
||||
# Kombu
|
||||
from kombu import Connection, Exchange, Producer
|
||||
|
||||
# Requests
|
||||
import requests
|
||||
|
||||
# ZeroMQ
|
||||
import zmq
|
||||
|
||||
import psutil
|
||||
|
||||
# Only use statsd if there's a statsd host in the environment
|
||||
# otherwise just do a noop.
|
||||
# NOTE: I've disabled this for the time being until we sort through the venv dependency around this
|
||||
# if os.environ.get('GRAPHITE_PORT_8125_UDP_ADDR'):
|
||||
# from statsd import StatsClient
|
||||
# statsd = StatsClient(host=os.environ['GRAPHITE_PORT_8125_UDP_ADDR'],
|
||||
# port=8125,
|
||||
# prefix='tower.job.event_callback',
|
||||
# maxudpsize=512)
|
||||
# else:
|
||||
# from statsd import StatsClient
|
||||
# class NoStatsClient(StatsClient):
|
||||
# def __init__(self, *args, **kwargs):
|
||||
# pass
|
||||
# def _prepare(self, stat, value, rate):
|
||||
# pass
|
||||
# def _send_stat(self, stat, value, rate):
|
||||
# pass
|
||||
# def _send(self, *args, **kwargs):
|
||||
# pass
|
||||
# statsd = NoStatsClient()
|
||||
|
||||
CENSOR_FIELD_WHITELIST = [
|
||||
'msg',
|
||||
'failed',
|
||||
@ -124,6 +103,7 @@ class TokenAuth(requests.auth.AuthBase):
|
||||
return request
|
||||
|
||||
|
||||
# TODO: non v2_ events are deprecated and should be purge/refactored out
|
||||
class BaseCallbackModule(object):
|
||||
'''
|
||||
Callback module for logging ansible-playbook job events via the REST API.
|
||||
@ -132,12 +112,16 @@ class BaseCallbackModule(object):
|
||||
def __init__(self):
|
||||
self.base_url = os.getenv('REST_API_URL', '')
|
||||
self.auth_token = os.getenv('REST_API_TOKEN', '')
|
||||
self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', '')
|
||||
self.context = None
|
||||
self.socket = None
|
||||
self.callback_connection = os.getenv('CALLBACK_CONNECTION', None)
|
||||
self.connection_queue = os.getenv('CALLBACK_QUEUE', '')
|
||||
self.connection = None
|
||||
self.exchange = None
|
||||
self._init_logging()
|
||||
self._init_connection()
|
||||
self.counter = 0
|
||||
self.active_playbook = None
|
||||
self.active_play = None
|
||||
self.active_task = None
|
||||
|
||||
def _init_logging(self):
|
||||
try:
|
||||
@ -158,15 +142,11 @@ class BaseCallbackModule(object):
|
||||
self.logger.propagate = False
|
||||
|
||||
def _init_connection(self):
|
||||
self.context = None
|
||||
self.socket = None
|
||||
self.connection = None
|
||||
|
||||
def _start_connection(self):
|
||||
self.context = zmq.Context()
|
||||
self.socket = self.context.socket(zmq.REQ)
|
||||
self.socket.setsockopt(zmq.RCVTIMEO, 4000)
|
||||
self.socket.setsockopt(zmq.LINGER, 2000)
|
||||
self.socket.connect(self.callback_consumer_port)
|
||||
self.connection = Connection(self.callback_connection)
|
||||
self.exchange = Exchange(self.connection_queue, type='direct')
|
||||
|
||||
def _post_job_event_queue_msg(self, event, event_data):
|
||||
self.counter += 1
|
||||
@ -176,6 +156,29 @@ class BaseCallbackModule(object):
|
||||
'counter': self.counter,
|
||||
'created': datetime.datetime.utcnow().isoformat(),
|
||||
}
|
||||
if event in ('playbook_on_play_start',
|
||||
'playbook_on_stats',
|
||||
'playbook_on_vars_prompt'):
|
||||
msg['parent_uuid'] = str(self.active_playbook)
|
||||
elif event in ('playbook_on_notify',
|
||||
'playbook_on_setup',
|
||||
'playbook_on_task_start',
|
||||
'playbook_on_no_hosts_matched',
|
||||
'playbook_on_no_hosts_remaining',
|
||||
'playbook_on_include',
|
||||
'playbook_on_import_for_host',
|
||||
'playbook_on_not_import_for_host'):
|
||||
msg['parent_uuid'] = str(self.active_play)
|
||||
elif event.startswith('runner_on_') or event.startswith('runner_item_on_'):
|
||||
msg['parent_uuid'] = str(self.active_task)
|
||||
else:
|
||||
msg['parent_uuid'] = ''
|
||||
|
||||
if "uuid" in event_data:
|
||||
msg['uuid'] = str(event_data['uuid'])
|
||||
else:
|
||||
msg['uuid'] = ''
|
||||
|
||||
if getattr(self, 'job_id', None):
|
||||
msg['job_id'] = self.job_id
|
||||
if getattr(self, 'ad_hoc_command_id', None):
|
||||
@ -192,11 +195,16 @@ class BaseCallbackModule(object):
|
||||
self.connection_pid = active_pid
|
||||
if self.connection_pid != active_pid:
|
||||
self._init_connection()
|
||||
if self.context is None:
|
||||
if self.connection is None:
|
||||
self._start_connection()
|
||||
|
||||
self.socket.send_json(msg)
|
||||
self.socket.recv()
|
||||
producer = Producer(self.connection)
|
||||
producer.publish(msg,
|
||||
serializer='json',
|
||||
compression='bzip2',
|
||||
exchange=self.exchange,
|
||||
declare=[self.exchange],
|
||||
routing_key=self.connection_queue)
|
||||
return
|
||||
except Exception, e:
|
||||
self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
|
||||
@ -230,7 +238,7 @@ class BaseCallbackModule(object):
|
||||
if 'res' in event_data:
|
||||
event_data['res'] = censor(deepcopy(event_data['res']))
|
||||
|
||||
if self.callback_consumer_port:
|
||||
if self.callback_connection:
|
||||
self._post_job_event_queue_msg(event, event_data)
|
||||
else:
|
||||
self._post_rest_api_event(event, event_data)
|
||||
@ -416,7 +424,9 @@ class JobCallbackModule(BaseCallbackModule):
|
||||
def v2_playbook_on_start(self, playbook):
|
||||
# NOTE: the playbook parameter was added late in Ansible 2.0 development
|
||||
# so we don't currently utilize but could later.
|
||||
self.playbook_on_start()
|
||||
# NOTE: Ansible doesn't generate a UUID for playbook_on_start so we'll do it for them
|
||||
self.active_playbook = str(uuid4())
|
||||
self._log_event('playbook_on_start', uuid=self.active_playbook)
|
||||
|
||||
def playbook_on_notify(self, host, handler):
|
||||
self._log_event('playbook_on_notify', host=host, handler=handler)
|
||||
@ -446,14 +456,16 @@ class JobCallbackModule(BaseCallbackModule):
|
||||
is_conditional=is_conditional)
|
||||
|
||||
def v2_playbook_on_task_start(self, task, is_conditional):
|
||||
self._log_event('playbook_on_task_start', task=task,
|
||||
self.active_task = task._uuid
|
||||
self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
|
||||
name=task.get_name(), is_conditional=is_conditional)
|
||||
|
||||
def v2_playbook_on_cleanup_task_start(self, task):
|
||||
# re-using playbook_on_task_start event here for this v2-specific
|
||||
# event, though we may consider any changes necessary to distinguish
|
||||
# this from a normal task
|
||||
self._log_event('playbook_on_task_start', task=task,
|
||||
self.active_task = task._uuid
|
||||
self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
|
||||
name=task.get_name())
|
||||
|
||||
def playbook_on_vars_prompt(self, varname, private=True, prompt=None,
|
||||
@ -507,7 +519,8 @@ class JobCallbackModule(BaseCallbackModule):
|
||||
play.name = ','.join(play.hosts)
|
||||
else:
|
||||
play.name = play.hosts
|
||||
self._log_event('playbook_on_play_start', name=play.name,
|
||||
self.active_play = play._uuid
|
||||
self._log_event('playbook_on_play_start', name=play.name, uuid=str(play._uuid),
|
||||
pattern=play.hosts)
|
||||
|
||||
def playbook_on_stats(self, stats):
|
||||
|
||||
@ -8,6 +8,9 @@ import ldap
|
||||
import djcelery
|
||||
from datetime import timedelta
|
||||
|
||||
from kombu import Queue, Exchange
|
||||
from kombu.common import Broadcast
|
||||
|
||||
# Update this module's local settings from the global settings module.
|
||||
from django.conf import global_settings
|
||||
this_module = sys.modules[__name__]
|
||||
@ -152,7 +155,6 @@ MIDDLEWARE_CLASSES = ( # NOQA
|
||||
'django.middleware.csrf.CsrfViewMiddleware',
|
||||
'django.contrib.auth.middleware.AuthenticationMiddleware',
|
||||
'django.contrib.messages.middleware.MessageMiddleware',
|
||||
'awx.main.middleware.HAMiddleware',
|
||||
'awx.main.middleware.ActivityStreamMiddleware',
|
||||
'awx.sso.middleware.SocialAuthMiddleware',
|
||||
'crum.CurrentRequestUserMiddleware',
|
||||
@ -327,6 +329,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
|
||||
djcelery.setup_loader()
|
||||
|
||||
BROKER_URL = 'redis://localhost/'
|
||||
CELERY_DEFAULT_QUEUE = 'default'
|
||||
CELERY_TASK_SERIALIZER = 'json'
|
||||
CELERY_RESULT_SERIALIZER = 'json'
|
||||
CELERY_ACCEPT_CONTENT = ['json']
|
||||
@ -336,6 +339,23 @@ CELERYD_TASK_SOFT_TIME_LIMIT = None
|
||||
CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler'
|
||||
CELERYBEAT_MAX_LOOP_INTERVAL = 60
|
||||
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
|
||||
CELERY_QUEUES = (
|
||||
Queue('default', Exchange('default'), routing_key='default'),
|
||||
Queue('jobs', Exchange('jobs'), routing_key='jobs'),
|
||||
# Projects use a fanout queue, this isn't super well supported
|
||||
Broadcast('projects'),
|
||||
)
|
||||
CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs',
|
||||
'routing_key': 'jobs'},
|
||||
'awx.main.tasks.run_project_update': {'queue': 'projects'},
|
||||
'awx.main.tasks.run_inventory_update': {'queue': 'jobs',
|
||||
'routing_key': 'jobs'},
|
||||
'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs',
|
||||
'routing_key': 'jobs'},
|
||||
'awx.main.tasks.run_system_job': {'queue': 'jobs',
|
||||
'routing_key': 'jobs'}
|
||||
})
|
||||
|
||||
CELERYBEAT_SCHEDULE = {
|
||||
'tower_scheduler': {
|
||||
'task': 'awx.main.tasks.tower_periodic_scheduler',
|
||||
|
||||
@ -41,6 +41,8 @@ if 'celeryd' in sys.argv:
|
||||
CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5557"
|
||||
CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver_dev.ipc"
|
||||
|
||||
CALLBACK_QUEUE = "callback_tasks"
|
||||
|
||||
# Enable PROOT for tower-qa integration tests
|
||||
AWX_PROOT_ENABLED = True
|
||||
|
||||
|
||||
@ -11,6 +11,36 @@
|
||||
###############################################################################
|
||||
# MISC PROJECT SETTINGS
|
||||
###############################################################################
|
||||
import os
|
||||
|
||||
def patch_broken_pipe_error():
|
||||
"""Monkey Patch BaseServer.handle_error to not write
|
||||
a stacktrace to stderr on broken pipe.
|
||||
http://stackoverflow.com/a/22618740/362702"""
|
||||
import sys
|
||||
from SocketServer import BaseServer
|
||||
from wsgiref import handlers
|
||||
|
||||
handle_error = BaseServer.handle_error
|
||||
log_exception = handlers.BaseHandler.log_exception
|
||||
|
||||
def is_broken_pipe_error():
|
||||
type, err, tb = sys.exc_info()
|
||||
return "Connection reset by peer" in repr(err)
|
||||
|
||||
def my_handle_error(self, request, client_address):
|
||||
if not is_broken_pipe_error():
|
||||
handle_error(self, request, client_address)
|
||||
|
||||
def my_log_exception(self, exc_info):
|
||||
if not is_broken_pipe_error():
|
||||
log_exception(self, exc_info)
|
||||
|
||||
BaseServer.handle_error = my_handle_error
|
||||
handlers.BaseHandler.log_exception = my_log_exception
|
||||
|
||||
patch_broken_pipe_error()
|
||||
|
||||
|
||||
ADMINS = (
|
||||
# ('Your Name', 'your_email@domain.com'),
|
||||
@ -49,7 +79,10 @@ if is_testing(sys.argv):
|
||||
MONGO_DB = 'system_tracking_test'
|
||||
|
||||
# Celery AMQP configuration.
|
||||
BROKER_URL = 'amqp://guest:guest@rabbitmq//'
|
||||
BROKER_URL = "amqp://{}:{}@{}/{}".format(os.environ.get("RABBITMQ_USER"),
|
||||
os.environ.get("RABBITMQ_PASS"),
|
||||
os.environ.get("RABBITMQ_HOST"),
|
||||
os.environ.get("RABBITMQ_VHOST"))
|
||||
|
||||
# Mongo host configuration
|
||||
MONGO_HOST = NotImplemented
|
||||
|
||||
@ -6,7 +6,7 @@ azure==2.0.0rc2
|
||||
Babel==2.2.0
|
||||
billiard==3.3.0.16
|
||||
boto==2.40.0
|
||||
celery==3.1.10
|
||||
celery==3.1.23
|
||||
cliff==1.15.0
|
||||
cmd2==0.6.8
|
||||
d2to1==0.2.11 # TODO: Still needed?
|
||||
@ -50,7 +50,7 @@ jsonpatch==1.12
|
||||
jsonpointer==1.10
|
||||
jsonschema==2.5.1
|
||||
keyring==4.1
|
||||
kombu==3.0.30
|
||||
kombu==3.0.35
|
||||
apache-libcloud==0.20.1
|
||||
lxml==3.4.4
|
||||
Markdown==2.4.1
|
||||
|
||||
@ -25,6 +25,7 @@ jsonpatch==1.12
|
||||
jsonpointer==1.10
|
||||
jsonschema==2.5.1
|
||||
keyring==4.1
|
||||
kombu==3.0.35
|
||||
lxml==3.4.4
|
||||
mock==1.0.1
|
||||
monotonic==0.6
|
||||
|
||||
@ -10,3 +10,4 @@ pytest-cov
|
||||
pytest-django
|
||||
pytest-pythonpath
|
||||
pytest-mock
|
||||
flower
|
||||
|
||||
63
tools/docker-compose-cluster.yml
Normal file
63
tools/docker-compose-cluster.yml
Normal file
@ -0,0 +1,63 @@
|
||||
version: '2'
|
||||
services:
|
||||
haproxy:
|
||||
build:
|
||||
context: ./docker-compose
|
||||
dockerfile: Dockerfile-haproxy
|
||||
depends_on:
|
||||
- "tower_1"
|
||||
- "tower_2"
|
||||
- "tower_3"
|
||||
ports:
|
||||
- "8013:8013"
|
||||
- "1936:1936"
|
||||
- "5555:5555"
|
||||
tower_1:
|
||||
image: gcr.io/ansible-tower-engineering/tower_devel:${TAG}
|
||||
hostname: tower_1
|
||||
environment:
|
||||
RABBITMQ_HOST: rabbitmq_1
|
||||
RABBITMQ_USER: guest
|
||||
RABBITMQ_PASS: guest
|
||||
RABBITMQ_VHOST: /
|
||||
volumes:
|
||||
- "../:/tower_devel"
|
||||
tower_2:
|
||||
image: gcr.io/ansible-tower-engineering/tower_devel:${TAG}
|
||||
hostname: tower_2
|
||||
environment:
|
||||
RABBITMQ_HOST: rabbitmq_2
|
||||
RABBITMQ_USER: guest
|
||||
RABBITMQ_PASS: guest
|
||||
RABBITMQ_VHOST: /
|
||||
volumes:
|
||||
- "../:/tower_devel"
|
||||
tower_3:
|
||||
image: gcr.io/ansible-tower-engineering/tower_devel:${TAG}
|
||||
hostname: tower_3
|
||||
environment:
|
||||
RABBITMQ_HOST: rabbitmq_3
|
||||
RABBITMQ_USER: guest
|
||||
RABBITMQ_PASS: guest
|
||||
RABBITMQ_VHOST: /
|
||||
volumes:
|
||||
- "../:/tower_devel"
|
||||
rabbitmq_1:
|
||||
image: gcr.io/ansible-tower-engineering/rabbit_cluster_node:latest
|
||||
hostname: rabbitmq_1
|
||||
rabbitmq_2:
|
||||
image: gcr.io/ansible-tower-engineering/rabbit_cluster_node:latest
|
||||
hostname: rabbitmq_2
|
||||
environment:
|
||||
- CLUSTERED=true
|
||||
- CLUSTER_WITH=rabbitmq_1
|
||||
rabbitmq_3:
|
||||
image: gcr.io/ansible-tower-engineering/rabbit_cluster_node:latest
|
||||
hostname: rabbitmq_3
|
||||
environment:
|
||||
- CLUSTERED=true
|
||||
- CLUSTER_WITH=rabbitmq_1
|
||||
postgres:
|
||||
image: postgres:9.4.1
|
||||
memcached:
|
||||
image: memcached:alpine
|
||||
@ -3,9 +3,16 @@ services:
|
||||
# Primary Tower Development Container
|
||||
tower:
|
||||
image: gcr.io/ansible-tower-engineering/tower_devel:${TAG}
|
||||
hostname: tower
|
||||
environment:
|
||||
RABBITMQ_HOST: rabbitmq
|
||||
RABBITMQ_USER: guest
|
||||
RABBITMQ_PASS: guest
|
||||
RABBITMQ_VHOST: /
|
||||
ports:
|
||||
- "8080:8080"
|
||||
- "8013:8013"
|
||||
- "5555:5555"
|
||||
links:
|
||||
- postgres
|
||||
- memcached
|
||||
@ -20,10 +27,8 @@ services:
|
||||
# Postgres Database Container
|
||||
postgres:
|
||||
image: postgres:9.4.1
|
||||
|
||||
memcached:
|
||||
image: memcached:alpine
|
||||
|
||||
rabbitmq:
|
||||
image: rabbitmq:3-management
|
||||
|
||||
|
||||
2
tools/docker-compose/Dockerfile-haproxy
Normal file
2
tools/docker-compose/Dockerfile-haproxy
Normal file
@ -0,0 +1,2 @@
|
||||
FROM haproxy:1.6-alpine
|
||||
COPY haproxy.cfg /usr/local/etc/haproxy/haproxy.cfg
|
||||
53
tools/docker-compose/haproxy.cfg
Normal file
53
tools/docker-compose/haproxy.cfg
Normal file
@ -0,0 +1,53 @@
|
||||
global
|
||||
debug
|
||||
stats socket /tmp/admin.sock
|
||||
stats timeout 30s
|
||||
|
||||
defaults
|
||||
log global
|
||||
mode http
|
||||
option httplog
|
||||
option dontlognull
|
||||
timeout connect 5000
|
||||
timeout client 50000
|
||||
timeout server 50000
|
||||
|
||||
frontend localnodes
|
||||
bind *:8013
|
||||
mode http
|
||||
default_backend nodes
|
||||
|
||||
frontend flower
|
||||
bind *:5555
|
||||
mode http
|
||||
default_backend flower_nodes
|
||||
|
||||
backend nodes
|
||||
mode http
|
||||
balance roundrobin
|
||||
option forwardfor
|
||||
option http-pretend-keepalive
|
||||
http-request set-header X-Forwarded-Port %[dst_port]
|
||||
http-request add-header X-Forwarded-Proto https if { ssl_fc }
|
||||
option httpchk HEAD / HTTP/1.1\r\nHost:localhost
|
||||
server tower_1 tower_1:8013 check
|
||||
server tower_2 tower_2:8013 check
|
||||
server tower_3 tower_3:8013 check
|
||||
|
||||
backend flower_nodes
|
||||
mode http
|
||||
balance roundrobin
|
||||
option forwardfor
|
||||
option http-pretend-keepalive
|
||||
http-request set-header X-Forwarded-Port %[dst_port]
|
||||
http-request add-header X-Forwarded-Proto https if { ssl_fc }
|
||||
#option httpchk HEAD / HTTP/1.1\r\nHost:localhost
|
||||
server tower_1 tower_1:5555
|
||||
server tower_2 tower_2:5555
|
||||
server tower_3 tower_3:5555
|
||||
|
||||
listen stats
|
||||
bind *:1936
|
||||
stats enable
|
||||
stats uri /
|
||||
|
||||
@ -4,7 +4,7 @@ set +x
|
||||
# Wait for the databases to come up
|
||||
ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=postgres port=5432" all
|
||||
ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=memcached port=11211" all
|
||||
ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=rabbitmq port=5672" all
|
||||
ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=${RABBITMQ_HOST} port=5672" all
|
||||
|
||||
# In case Tower in the container wants to connect to itself, use "docker exec" to attach to the container otherwise
|
||||
# TODO: FIX
|
||||
|
||||
@ -1,16 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
case $1 in
|
||||
config)
|
||||
cat <<'EOM'
|
||||
graph_title Callback Receiver Processes
|
||||
graph_vlabel num processes
|
||||
graph_category tower
|
||||
callbackr.label Callback Receiver Processes
|
||||
EOM
|
||||
exit 0;;
|
||||
esac
|
||||
|
||||
printf "callbackr.value "
|
||||
ps ax | grep run_callback_receiver | grep -v grep | wc -l
|
||||
printf "\n"
|
||||
@ -1,16 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
case $1 in
|
||||
config)
|
||||
cat <<'EOM'
|
||||
graph_title Celery Processes
|
||||
graph_vlabel num processes
|
||||
graph_category tower
|
||||
celeryd.label Celery Processes
|
||||
EOM
|
||||
exit 0;;
|
||||
esac
|
||||
|
||||
printf "celeryd.value "
|
||||
ps ax | grep celeryd | grep -v grep | wc -l
|
||||
printf "\n"
|
||||
@ -1,16 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
case $1 in
|
||||
config)
|
||||
cat <<'EOM'
|
||||
graph_title Postmaster Processes
|
||||
graph_vlabel num processes
|
||||
graph_category tower
|
||||
postmaster.label Postmaster Processes
|
||||
EOM
|
||||
exit 0;;
|
||||
esac
|
||||
|
||||
printf "postmaster.value "
|
||||
ps ax | grep postmaster | grep -v grep | wc -l
|
||||
printf "\n"
|
||||
@ -1,16 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
case $1 in
|
||||
config)
|
||||
cat <<'EOM'
|
||||
graph_title Redis Processes
|
||||
graph_vlabel num processes
|
||||
graph_category tower
|
||||
redis.label Redis Processes
|
||||
EOM
|
||||
exit 0;;
|
||||
esac
|
||||
|
||||
printf "redis.value "
|
||||
ps ax | grep redis | grep -v grep | wc -l
|
||||
printf "\n"
|
||||
@ -1,16 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
case $1 in
|
||||
config)
|
||||
cat <<'EOM'
|
||||
graph_title SocketIO Service Processes
|
||||
graph_vlabel num processes
|
||||
graph_category tower
|
||||
socketio.label SocketIO Service Processes
|
||||
EOM
|
||||
exit 0;;
|
||||
esac
|
||||
|
||||
printf "socketio.value "
|
||||
ps ax | grep run_socketio_service | grep -v grep | wc -l
|
||||
printf "\n"
|
||||
@ -1,16 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
case $1 in
|
||||
config)
|
||||
cat <<'EOM'
|
||||
graph_title Task Manager Processes
|
||||
graph_vlabel num processes
|
||||
graph_category tower
|
||||
taskm.label Task Manager Processes
|
||||
EOM
|
||||
exit 0;;
|
||||
esac
|
||||
|
||||
printf "taskm.value "
|
||||
ps ax | grep run_task_system | grep -v grep | wc -l
|
||||
printf "\n"
|
||||
@ -1,27 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
case $1 in
|
||||
config)
|
||||
cat <<'EOM'
|
||||
multigraph tower_jobs
|
||||
graph_title Running Jobs breakdown
|
||||
graph_vlabel job count
|
||||
graph_category tower
|
||||
running.label Running jobs
|
||||
waiting.label Waiting jobs
|
||||
pending.label Pending jobs
|
||||
EOM
|
||||
exit 0;;
|
||||
esac
|
||||
|
||||
printf "running.value "
|
||||
awx-manage stats --stat jobs_running
|
||||
printf "\n"
|
||||
|
||||
printf "waiting.value "
|
||||
awx-manage stats --stat jobs_waiting
|
||||
printf "\n"
|
||||
|
||||
printf "pending.value "
|
||||
awx-manage stats --stat jobs_pending
|
||||
printf "\n"
|
||||
Loading…
x
Reference in New Issue
Block a user