diff --git a/Makefile b/Makefile index 4323239aff..c1b7d582e1 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,8 @@ COMPOSE_TAG ?= devel # NOTE: This defaults the container image version to the branch that's active # COMPOSE_TAG ?= $(GIT_BRANCH) +COMPOSE_HOST ?= $(shell hostname) + VENV_BASE ?= /venv SCL_PREFIX ?= CELERY_SCHEDULE_FILE ?= /celerybeat-schedule @@ -328,7 +330,7 @@ init: if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - tower-manage register_instance --primary --hostname=127.0.0.1; \ + tower-manage register_instance --hostname=$(COMPOSE_HOST); \ # Refresh development environment after pulling new code. refresh: clean requirements_dev version_file develop migrate @@ -379,6 +381,12 @@ honcho: fi; \ honcho start +flower: + @if [ "$(VENV_BASE)" ]; then \ + . $(VENV_BASE)/tower/bin/activate; \ + fi; \ + $(PYTHON) manage.py celery flower --address=0.0.0.0 --port=5555 --broker=amqp://guest:guest@$(RABBITMQ_HOST):5672// + # Run the built-in development webserver (by default on http://localhost:8013). runserver: @if [ "$(VENV_BASE)" ]; then \ @@ -391,7 +399,8 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,2 -Ofair --schedule=$(CELERY_SCHEDULE_FILE) + $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default + #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) # Run to start the zeromq callback receiver receiver: @@ -404,7 +413,11 @@ taskmanager: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py run_task_system + if [ "$(COMPOSE_HOST)" == "tower_1" ] || [ "$(COMPOSE_HOST)" == "tower" ]; then \ + $(PYTHON) manage.py run_task_system; \ + else \ + while true; do sleep 2; done; \ + fi socketservice: @if [ "$(VENV_BASE)" ]; then \ @@ -749,6 +762,9 @@ docker-auth: docker-compose: docker-auth TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml up --no-recreate +docker-compose-cluster: docker-auth + TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose-cluster.yml up + docker-compose-test: docker-auth cd tools && TAG=$(COMPOSE_TAG) docker-compose run --rm --service-ports tower /bin/bash diff --git a/Procfile b/Procfile index a301a6aa1a..433417f70b 100644 --- a/Procfile +++ b/Procfile @@ -4,3 +4,4 @@ taskmanager: make taskmanager receiver: make receiver socketservice: make socketservice factcacher: make factcacher +flower: make flower \ No newline at end of file diff --git a/awx/api/views.py b/awx/api/views.py index bdd0fb074d..efbbecf10e 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -171,28 +171,13 @@ class ApiV1PingView(APIView): # Most of this response is canned; just build the dictionary. response = { 'ha': is_ha_environment(), - 'role': Instance.objects.my_role(), 'version': get_awx_version(), } - # If this is an HA environment, we also include the IP address of - # all of the instances. - # - # Set up a default structure. - response['instances'] = { - 'primary': None, - 'secondaries': [], - } - - # Add all of the instances into the structure. + response['instances'] = [] for instance in Instance.objects.all(): - if instance.primary: - response['instances']['primary'] = instance.hostname - else: - response['instances']['secondaries'].append(instance.hostname) - response['instances']['secondaries'].sort() - - # Done; return the response. + response['instances'].append(instance.hostname) + response['instances'].sort() return Response(response) diff --git a/awx/main/management/commands/_base_instance.py b/awx/main/management/commands/_base_instance.py index c92fa3b640..807abfb76d 100644 --- a/awx/main/management/commands/_base_instance.py +++ b/awx/main/management/commands/_base_instance.py @@ -1,6 +1,7 @@ # Copyright (c) 2015 Ansible, Inc. # All Rights Reserved. +import socket from optparse import make_option from django.core.management.base import BaseCommand, CommandError @@ -21,13 +22,9 @@ class BaseCommandInstance(BaseCommand): def __init__(self): super(BaseCommandInstance, self).__init__() - self.enforce_primary_role = False - self.enforce_roles = False self.enforce_hostname_set = False self.enforce_unique_find = False - self.option_primary = False - self.option_secondary = False self.option_hostname = None self.option_uuid = None @@ -38,48 +35,24 @@ class BaseCommandInstance(BaseCommand): def generate_option_hostname(): return make_option('--hostname', dest='hostname', - default='', + default=socket.gethostname(), help='Find instance by specified hostname.') @staticmethod def generate_option_hostname_set(): return make_option('--hostname', dest='hostname', - default='', + default=socket.gethostname(), help='Hostname to assign to the new instance.') - @staticmethod - def generate_option_primary(): - return make_option('--primary', - action='store_true', - default=False, - dest='primary', - help='Register instance as primary.') - - @staticmethod - def generate_option_secondary(): - return make_option('--secondary', - action='store_true', - default=False, - dest='secondary', - help='Register instance as secondary.') - @staticmethod def generate_option_uuid(): + #TODO: Likely deprecated, maybe uuid becomes the cluster ident? return make_option('--uuid', dest='uuid', default='', help='Find instance by specified uuid.') - def include_option_primary_role(self): - BaseCommand.option_list += ( BaseCommandInstance.generate_option_primary(), ) - self.enforce_primary_role = True - - def include_options_roles(self): - self.include_option_primary_role() - BaseCommand.option_list += ( BaseCommandInstance.generate_option_secondary(), ) - self.enforce_roles = True - def include_option_hostname_set(self): BaseCommand.option_list += ( BaseCommandInstance.generate_option_hostname_set(), ) self.enforce_hostname_set = True @@ -94,12 +67,6 @@ class BaseCommandInstance(BaseCommand): def get_option_uuid(self): return self.option_uuid - def is_option_primary(self): - return self.option_primary - - def is_option_secondary(self): - return self.option_secondary - def get_UUID(self): return self.UUID @@ -109,31 +76,13 @@ class BaseCommandInstance(BaseCommand): @property def usage_error(self): - if self.enforce_roles and self.enforce_hostname_set: - return CommandError('--hostname and one of --primary or --secondary is required.') - elif self.enforce_hostname_set: + if self.enforce_hostname_set: return CommandError('--hostname is required.') - elif self.enforce_primary_role: - return CommandError('--primary is required.') - elif self.enforce_roles: - return CommandError('One of --primary or --secondary is required.') def handle(self, *args, **options): if self.enforce_hostname_set and self.enforce_unique_find: raise OptionEnforceError('Can not enforce --hostname as a setter and --hostname as a getter') - if self.enforce_roles: - self.option_primary = options['primary'] - self.option_secondary = options['secondary'] - - if self.is_option_primary() and self.is_option_secondary() or not (self.is_option_primary() or self.is_option_secondary()): - raise self.usage_error - elif self.enforce_primary_role: - if options['primary']: - self.option_primary = options['primary'] - else: - raise self.usage_error - if self.enforce_hostname_set: if options['hostname']: self.option_hostname = options['hostname'] @@ -162,11 +111,4 @@ class BaseCommandInstance(BaseCommand): @staticmethod def instance_str(instance): - return BaseCommandInstance.__instance_str(instance, ('uuid', 'hostname', 'role')) - - def update_projects(self, instance): - """Update all projects, ensuring the job runs against this instance, - which is the primary instance. - """ - for project in Project.objects.all(): - project.update() + return BaseCommandInstance.__instance_str(instance, ('uuid', 'hostname')) diff --git a/awx/main/management/commands/register_instance.py b/awx/main/management/commands/register_instance.py index 942eb9af4d..a7fc2f8011 100644 --- a/awx/main/management/commands/register_instance.py +++ b/awx/main/management/commands/register_instance.py @@ -9,22 +9,14 @@ from awx.main.models import Instance instance_str = BaseCommandInstance.instance_str class Command(BaseCommandInstance): - """Internal tower command. + """ + Internal tower command. Regsiter this instance with the database for HA tracking. This command is idempotent. - - This command will error out in the following conditions: - - * Attempting to register a secondary machine with no primary machines. - * Attempting to register a primary instance when a different primary - instance exists. - * Attempting to re-register an instance with changed values. """ def __init__(self): super(Command, self).__init__() - - self.include_options_roles() self.include_option_hostname_set() def handle(self, *args, **options): @@ -32,32 +24,10 @@ class Command(BaseCommandInstance): uuid = self.get_UUID() - # Is there an existing record for this machine? If so, retrieve that record and look for issues. - try: - instance = Instance.objects.get(uuid=uuid) - if instance.hostname != self.get_option_hostname(): - raise CommandError('Instance already registered with a different hostname %s.' % instance_str(instance)) - print("Instance already registered %s" % instance_str(instance)) - except Instance.DoesNotExist: - # Get a status on primary machines (excluding this one, regardless of its status). - other_instances = Instance.objects.exclude(uuid=uuid) - primaries = other_instances.filter(primary=True).count() - - # If this instance is being set to primary and a *different* primary machine alreadyexists, error out. - if self.is_option_primary() and primaries: - raise CommandError('Another instance is already registered as primary.') - - # Lastly, if there are no primary machines at all, then don't allow this to be registered as a secondary machine. - if self.is_option_secondary() and not primaries: - raise CommandError('Unable to register a secondary machine until another primary machine has been registered.') - - # Okay, we've checked for appropriate errata; perform the registration. - instance = Instance(uuid=uuid, primary=self.is_option_primary(), hostname=self.get_option_hostname()) - instance.save() - - # If this is a primary instance, update projects. - if instance.primary: - self.update_projects(instance) - - # Done! - print('Successfully registered instance %s.' % instance_str(instance)) + instance = Instance.objects.filter(hostname=self.get_option_hostname()) + if instance.exists(): + print("Instance already registered %s" % instance_str(instance[0])) + return + instance = Instance(uuid=uuid, hostname=self.get_option_hostname()) + instance.save() + print('Successfully registered instance %s.' % instance_str(instance)) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index e6080fa419..0e27ba06da 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -8,12 +8,17 @@ import datetime import logging import signal import time -from multiprocessing import Process, Queue -from Queue import Empty as QueueEmpty + +from kombu import Connection, Exchange, Queue +from kombu.mixins import ConsumerMixin +from kombu.log import get_logger +from kombu.utils import kwdict, reprcall +from kombu.utils.debug import setup_logging # Django from django.conf import settings from django.core.management.base import NoArgsCommand +from django.core.cache import cache from django.db import transaction, DatabaseError from django.utils.dateparse import parse_datetime from django.utils.timezone import FixedOffset @@ -21,160 +26,53 @@ from django.db import connection # AWX from awx.main.models import * # noqa -from awx.main.socket import Socket +from awx.main.socket_queue import Socket logger = logging.getLogger('awx.main.commands.run_callback_receiver') -WORKERS = 4 +class CallbackBrokerWorker(ConsumerMixin): -class CallbackReceiver(object): - def __init__(self): - self.parent_mappings = {} + def __init__(self, connection): + self.connection = connection - def run_subscriber(self, use_workers=True): - def shutdown_handler(active_workers): - def _handler(signum, frame): - try: - for active_worker in active_workers: - active_worker.terminate() - signal.signal(signum, signal.SIG_DFL) - os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it - except Exception: - # TODO: LOG - pass - return _handler + def get_consumers(self, Consumer, channel): + return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, + Exchange(settings.CALLBACK_QUEUE, type='direct'), + routing_key=settings.CALLBACK_QUEUE)], + accept=['json'], + callbacks=[self.process_task])] - def check_pre_handle(data): - event = data.get('event', '') - if event == 'playbook_on_play_start': - return True - return False - - worker_queues = [] - - if use_workers: - connection.close() - for idx in range(WORKERS): - queue_actual = Queue(settings.JOB_EVENT_MAX_QUEUE_SIZE) - w = Process(target=self.callback_worker, args=(queue_actual, idx,)) - w.start() - if settings.DEBUG: - logger.info('Started worker %s' % str(idx)) - worker_queues.append([0, queue_actual, w]) - elif settings.DEBUG: - logger.warn('Started callback receiver (no workers)') - - main_process = Process( - target=self.callback_handler, - args=(use_workers, worker_queues,) - ) - main_process.daemon = True - main_process.start() - - signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - while True: - workers_changed = False - idx = 0 - for queue_worker in worker_queues: - if not queue_worker[2].is_alive(): - logger.warn("Worker %s was not alive, restarting" % str(queue_worker)) - workers_changed = True - queue_worker[2].join() - w = Process(target=self.callback_worker, args=(queue_worker[1], idx,)) - w.daemon = True - w.start() - signal.signal(signal.SIGINT, shutdown_handler([w])) - signal.signal(signal.SIGTERM, shutdown_handler([w])) - queue_worker[2] = w - idx += 1 - if workers_changed: - signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - if not main_process.is_alive(): - logger.error("Main process is not alive") - for queue_worker in worker_queues: - queue_worker[2].terminate() - break - time.sleep(0.1) - - def write_queue_worker(self, preferred_queue, worker_queues, message): - queue_order = sorted(range(WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0) - for queue_actual in queue_order: - try: - worker_actual = worker_queues[queue_actual] - worker_actual[1].put(message, block=True, timeout=2) - worker_actual[0] += 1 - return queue_actual - except Exception: - logger.warn("Could not write to queue %s" % preferred_queue) - continue - return None - - def callback_handler(self, use_workers, worker_queues): - total_messages = 0 - last_parent_events = {} - with Socket('callbacks', 'r') as callbacks: - for message in callbacks.listen(): - total_messages += 1 - if 'ad_hoc_command_id' in message: - self.process_ad_hoc_event(message) - elif not use_workers: - self.process_job_event(message) - else: - job_parent_events = last_parent_events.get(message['job_id'], {}) - if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'): - parent = job_parent_events.get('playbook_on_start', None) - elif message['event'] in ('playbook_on_notify', - 'playbook_on_setup', - 'playbook_on_task_start', - 'playbook_on_no_hosts_matched', - 'playbook_on_no_hosts_remaining', - 'playbook_on_include', - 'playbook_on_import_for_host', - 'playbook_on_not_import_for_host'): - parent = job_parent_events.get('playbook_on_play_start', None) - elif message['event'].startswith('runner_on_') or message['event'].startswith('runner_item_on_'): - list_parents = [] - list_parents.append(job_parent_events.get('playbook_on_setup', None)) - list_parents.append(job_parent_events.get('playbook_on_task_start', None)) - list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id - x.id) - parent = list_parents[0] if len(list_parents) > 0 else None - else: - parent = None - if parent is not None: - message['parent'] = parent.id - if 'created' in message: - del(message['created']) - if message['event'] in ('playbook_on_start', 'playbook_on_play_start', - 'playbook_on_setup', 'playbook_on_task_start'): - job_parent_events[message['event']] = self.process_job_event(message) - else: - if message['event'] == 'playbook_on_stats': - job_parent_events = {} - - actual_queue = self.write_queue_worker(total_messages % WORKERS, worker_queues, message) - # NOTE: It might be better to recycle the entire callback receiver process if one or more of the queues are too full - # the drawback is that if we under extremely high load we may be legitimately taking a while to process messages - if actual_queue is None: - logger.error("All queues full!") - sys.exit(1) - last_parent_events[message['job_id']] = job_parent_events - - @transaction.atomic - def process_job_event(self, data): - # Sanity check: Do we need to do anything at all? - event = data.get('event', '') - parent_id = data.get('parent', None) - if not event or 'job_id' not in data: - return + def process_task(self, body, message): + try: + if "event" not in body: + raise Exception("Payload does not have an event") + if "job_id" not in body: + raise Exception("Payload does not have a job_id") + if settings.DEBUG: + logger.info("Body: {}".format(body)) + logger.info("Message: {}".format(message)) + self.process_job_event(body) + except Exception as exc: + import traceback + traceback.print_exc() + logger.error('Callback Task Processor Raised Exception: %r', exc) + message.ack() + def process_job_event(self, payload): # Get the correct "verbose" value from the job. # If for any reason there's a problem, just use 0. + if 'ad_hoc_command_id' in payload: + event_type_key = 'ad_hoc_command_id' + event_object_type = AdHocCommand + else: + event_type_key = 'job_id' + event_object_type = Job + try: - verbose = Job.objects.get(id=data['job_id']).verbosity + verbose = event_object_type.objects.get(id=payload[event_type_key]).verbosity except Exception as e: - verbose = 0 + verbose=0 + # TODO: cache # Convert the datetime for the job event's creation appropriately, # and include a time zone for it. @@ -182,120 +80,58 @@ class CallbackReceiver(object): # In the event of any issue, throw it out, and Django will just save # the current time. try: - if not isinstance(data['created'], datetime.datetime): - data['created'] = parse_datetime(data['created']) - if not data['created'].tzinfo: - data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) + if not isinstance(payload['created'], datetime.datetime): + payload['created'] = parse_datetime(payload['created']) + if not payload['created'].tzinfo: + payload['created'] = payload['created'].replace(tzinfo=FixedOffset(0)) except (KeyError, ValueError): - data.pop('created', None) + payload.pop('created', None) - # Print the data to stdout if we're in DEBUG mode. - if settings.DEBUG: - print(data) + event_uuid = payload.get("uuid", '') + parent_event_uuid = payload.get("parent_uuid", '') # Sanity check: Don't honor keys that we don't recognize. - for key in data.keys(): - if key not in ('job_id', 'event', 'event_data', - 'created', 'counter'): - data.pop(key) + for key in payload.keys(): + if key not in (event_type_key, 'event', 'event_data', + 'created', 'counter', 'uuid'): + payload.pop(key) - # Save any modifications to the job event to the database. - # If we get a database error of some kind, bail out. try: # If we're not in verbose mode, wipe out any module # arguments. - res = data['event_data'].get('res', {}) + res = payload['event_data'].get('res', {}) if isinstance(res, dict): i = res.get('invocation', {}) if verbose == 0 and 'module_args' in i: i['module_args'] = '' - # Create a new JobEvent object. - job_event = JobEvent(**data) - if parent_id is not None: - job_event.parent = JobEvent.objects.get(id=parent_id) - job_event.save(post_process=True) - - # Retrun the job event object. - return job_event + if 'ad_hoc_command_id' in payload: + ad_hoc_command_event = AdHocCommandEvent.objects.create(**data) + return + + j = JobEvent(**payload) + if payload['event'] == 'playbook_on_start': + j.save() + cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300) + return + else: + if parent_event_uuid: + parent_id = cache.get("{}_{}".format(payload['job_id'], parent_event_uuid), None) + if parent_id is None: + parent_id_obj = JobEvent.objects.filter(uuid=parent_event_uuid, job_id=payload['job_id']) + if parent_id_obj.exists(): #Problematic if not there, means the parent hasn't been written yet... TODO + j.parent_id = parent_id_obj[0].id + print("Settings cache: {}_{} with value {}".format(payload['job_id'], parent_event_uuid, j.parent_id)) + cache.set("{}_{}".format(payload['job_id'], parent_event_uuid), j.parent_id, 300) + else: + print("Cache hit") + j.parent_id = parent_id + j.save() + if event_uuid: + cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300) except DatabaseError as e: - # Log the error and bail out. - logger.error('Database error saving job event: %s', e) - return None + logger.error("Database Error Saving Job Event: {}".format(e)) - @transaction.atomic - def process_ad_hoc_event(self, data): - # Sanity check: Do we need to do anything at all? - event = data.get('event', '') - if not event or 'ad_hoc_command_id' not in data: - return - - # Get the correct "verbose" value from the job. - # If for any reason there's a problem, just use 0. - try: - verbose = AdHocCommand.objects.get(id=data['ad_hoc_command_id']).verbosity - except Exception as e: - verbose = 0 - - # Convert the datetime for the job event's creation appropriately, - # and include a time zone for it. - # - # In the event of any issue, throw it out, and Django will just save - # the current time. - try: - if not isinstance(data['created'], datetime.datetime): - data['created'] = parse_datetime(data['created']) - if not data['created'].tzinfo: - data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) - except (KeyError, ValueError): - data.pop('created', None) - - # Print the data to stdout if we're in DEBUG mode. - if settings.DEBUG: - print(data) - - # Sanity check: Don't honor keys that we don't recognize. - for key in data.keys(): - if key not in ('ad_hoc_command_id', 'event', 'event_data', - 'created', 'counter'): - data.pop(key) - - # Save any modifications to the ad hoc command event to the database. - # If we get a database error of some kind, bail out. - try: - # If we're not in verbose mode, wipe out any module - # arguments. FIXME: Needed for adhoc? - res = data['event_data'].get('res', {}) - if isinstance(res, dict): - i = res.get('invocation', {}) - if verbose == 0 and 'module_args' in i: - i['module_args'] = '' - - # Create a new AdHocCommandEvent object. - ad_hoc_command_event = AdHocCommandEvent.objects.create(**data) - - # Retrun the ad hoc comamnd event object. - return ad_hoc_command_event - except DatabaseError as e: - # Log the error and bail out. - logger.error('Database error saving ad hoc command event: %s', e) - return None - - def callback_worker(self, queue_actual, idx): - messages_processed = 0 - while True: - try: - message = queue_actual.get(block=True, timeout=1) - except QueueEmpty: - continue - except Exception as e: - logger.error("Exception on listen socket, restarting: " + str(e)) - break - self.process_job_event(message) - messages_processed += 1 - if messages_processed >= settings.JOB_EVENT_RECYCLE_THRESHOLD: - logger.info("Shutting down message receiver") - break class Command(NoArgsCommand): ''' @@ -306,9 +142,10 @@ class Command(NoArgsCommand): help = 'Launch the job callback receiver' def handle_noargs(self, **options): - cr = CallbackReceiver() - try: - cr.run_subscriber() - except KeyboardInterrupt: - pass + with Connection(settings.BROKER_URL) as conn: + try: + worker = CallbackBrokerWorker(conn) + worker.run() + except KeyboardInterrupt: + print('Terminating Callback Receiver') diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py index 4241a2000c..90252e2a7d 100644 --- a/awx/main/management/commands/run_fact_cache_receiver.py +++ b/awx/main/management/commands/run_fact_cache_receiver.py @@ -14,7 +14,7 @@ from django.utils import timezone # AWX from awx.main.models.fact import Fact from awx.main.models.inventory import Host -from awx.main.socket import Socket +from awx.main.socket_queue import Socket logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver') diff --git a/awx/main/management/commands/run_socketio_service.py b/awx/main/management/commands/run_socketio_service.py index 0e3df4ccaf..4c233aa312 100644 --- a/awx/main/management/commands/run_socketio_service.py +++ b/awx/main/management/commands/run_socketio_service.py @@ -16,7 +16,7 @@ from django.core.management.base import NoArgsCommand # AWX import awx from awx.main.models import * # noqa -from awx.main.socket import Socket +from awx.main.socket_queue import Socket # socketio from socketio import socketio_manage diff --git a/awx/main/managers.py b/awx/main/managers.py index 4825a74cf8..ca4578daf4 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -2,6 +2,7 @@ # All Rights Reserved. import sys +import socket from django.conf import settings from django.db import models @@ -28,31 +29,12 @@ class InstanceManager(models.Manager): # If we are running unit tests, return a stub record. if len(sys.argv) >= 2 and sys.argv[1] == 'test': return self.model(id=1, primary=True, + hostname='localhost', uuid='00000000-0000-0000-0000-000000000000') # Return the appropriate record from the database. - return self.get(uuid=settings.SYSTEM_UUID) + return self.get(hostname=socket.gethostname()) def my_role(self): - """Return the role of the currently active instance, as a string - ('primary' or 'secondary'). - """ - # If we are running unit tests, we are primary, because reasons. - if len(sys.argv) >= 2 and sys.argv[1] == 'test': - return 'primary' - - # Check if this instance is primary; if so, return "primary", otherwise - # "secondary". - if self.me().primary: - return 'primary' - return 'secondary' - - def primary(self): - """Return the primary instance.""" - # If we are running unit tests, return a stub record. - if len(sys.argv) >= 2 and sys.argv[1] == 'test': - return self.model(id=1, primary=True, - uuid='00000000-0000-0000-0000-000000000000') - - # Return the appropriate record from the database. - return self.get(primary=True) + # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing + return "tower" diff --git a/awx/main/middleware.py b/awx/main/middleware.py index 37903886ac..fda98f1176 100644 --- a/awx/main/middleware.py +++ b/awx/main/middleware.py @@ -71,41 +71,6 @@ class ActivityStreamMiddleware(threading.local): if instance.id not in self.instance_ids: self.instance_ids.append(instance.id) - -class HAMiddleware(object): - """A middleware class that checks to see whether the request is being - served on a secondary instance, and redirects the request back to the - primary instance if so. - """ - def process_request(self, request): - """Process the request, and redirect if this is a request on a - secondary node. - """ - # Is this the primary node? If so, we can just return None and be done; - # we just want normal behavior in this case. - if Instance.objects.my_role() == 'primary': - return None - - # Always allow the /ping/ endpoint. - if request.path.startswith('/api/v1/ping'): - return None - - # Get the primary instance. - primary = Instance.objects.primary() - - # If this is a request to /, then we return a special landing page that - # informs the user that they are on the secondary instance and will - # be redirected. - if request.path == '/': - return TemplateResponse(request, 'ha/redirect.html', { - 'primary': primary, - 'redirect_seconds': 30, - 'version': version, - }) - - # Redirect to the base page of the primary instance. - return HttpResponseRedirect('http://%s%s' % (primary.hostname, request.path)) - class AuthTokenTimeoutMiddleware(object): """Presume that when the user includes the auth header, they go through the authentication mechanism. Further, that mechanism is presumed to extend diff --git a/awx/main/migrations/0034_v310_modify_ha_instance.py b/awx/main/migrations/0034_v310_modify_ha_instance.py new file mode 100644 index 0000000000..e4321f0235 --- /dev/null +++ b/awx/main/migrations/0034_v310_modify_ha_instance.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0032_v302_credential_permissions_update'), + ] + + operations = [ + migrations.RemoveField( + model_name='instance', + name='primary', + ), + migrations.AlterField( + model_name='instance', + name='uuid', + field=models.CharField(max_length=40), + ), + ] diff --git a/awx/main/migrations/0035_v310_jobevent_uuid.py b/awx/main/migrations/0035_v310_jobevent_uuid.py new file mode 100644 index 0000000000..4feade28ef --- /dev/null +++ b/awx/main/migrations/0035_v310_jobevent_uuid.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0033_v310_modify_ha_instance'), + ] + + operations = [ + migrations.AddField( + model_name='jobevent', + name='uuid', + field=models.CharField(default=b'', max_length=1024, editable=False), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 3725e6afe5..a645c318e4 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -22,9 +22,8 @@ class Instance(models.Model): """ objects = InstanceManager() - uuid = models.CharField(max_length=40, unique=True) + uuid = models.CharField(max_length=40) hostname = models.CharField(max_length=250, unique=True) - primary = models.BooleanField(default=False) created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) @@ -33,29 +32,8 @@ class Instance(models.Model): @property def role(self): - """Return the role of this instance, as a string.""" - if self.primary: - return 'primary' - return 'secondary' - - @functools.wraps(models.Model.save) - def save(self, *args, **kwargs): - """Save the instance. If this is a secondary instance, then ensure - that any currently-running jobs that this instance started are - canceled. - """ - # Perform the normal save. - result = super(Instance, self).save(*args, **kwargs) - - # If this is not a primary instance, then kill any jobs that this - # instance was responsible for starting. - if not self.primary: - for job in UnifiedJob.objects.filter(job_origin__instance=self, - status__in=CAN_CANCEL): - job.cancel() - - # Return back the original result. - return result + # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing + return "tower" class JobOrigin(models.Model): diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 8d47ed7938..9356884a19 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -964,6 +964,11 @@ class JobEvent(CreatedModifiedModel): default=False, editable=False, ) + uuid = models.CharField( + max_length=1024, + default='', + editable=False, + ) host = models.ForeignKey( 'Host', related_name='job_events_as_primary_host', diff --git a/awx/main/socket.py b/awx/main/socket_queue.py similarity index 100% rename from awx/main/socket.py rename to awx/main/socket_queue.py diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e94b0e1f66..8c8f564917 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -82,7 +82,7 @@ def celery_startup(conf=None, **kwargs): except Exception as e: logger.error("Failed to rebuild schedule {}: {}".format(sch, e)) -@task() +@task(queue='default') def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -103,7 +103,7 @@ def send_notifications(notification_list, job_id=None): if job_id is not None: job_actual.notifications.add(notification) -@task(bind=True) +@task(bind=True, queue='default') def run_administrative_checks(self): if not tower_settings.TOWER_ADMIN_ALERTS: return @@ -124,11 +124,11 @@ def run_administrative_checks(self): tower_admin_emails, fail_silently=True) -@task(bind=True) +@task(bind=True, queue='default') def cleanup_authtokens(self): AuthToken.objects.filter(expires__lt=now()).delete() -@task(bind=True) +@task(bind=True, queue='default') def tower_periodic_scheduler(self): def get_last_run(): if not os.path.exists(settings.SCHEDULE_METADATA_LOCATION): @@ -179,7 +179,7 @@ def tower_periodic_scheduler(self): new_unified_job.socketio_emit_status("failed") emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id)) -@task() +@task(queue='default') def notify_task_runner(metadata_dict): """Add the given task into the Tower task manager's queue, to be consumed by the task system. @@ -187,7 +187,6 @@ def notify_task_runner(metadata_dict): queue = FifoQueue('tower_task_manager') queue.push(metadata_dict) - def _send_notification_templates(instance, status_str): if status_str not in ['succeeded', 'failed']: raise ValueError("status_str must be either succeeded or failed") @@ -203,7 +202,7 @@ def _send_notification_templates(instance, status_str): for n in all_notification_templates], job_id=instance.id) -@task(bind=True) +@task(bind=True, queue='default') def handle_work_success(self, result, task_actual): instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) if not instance: @@ -211,7 +210,7 @@ def handle_work_success(self, result, task_actual): _send_notification_templates(instance, 'succeeded') -@task(bind=True) +@task(bind=True, queue='default') def handle_work_error(self, task_id, subtasks=None): print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks))) @@ -240,7 +239,7 @@ def handle_work_error(self, task_id, subtasks=None): if first_instance: _send_notification_templates(first_instance, 'failed') -@task() +@task(queue='default') def update_inventory_computed_fields(inventory_id, should_update_hosts=True): ''' Signal handler and wrapper around inventory.update_computed_fields to @@ -740,7 +739,8 @@ class RunJob(BaseTask): env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' - env['CALLBACK_CONSUMER_PORT'] = str(settings.CALLBACK_CONSUMER_PORT) + env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE + env['CALLBACK_CONNECTION'] = settings.BROKER_URL if getattr(settings, 'JOB_CALLBACK_DEBUG', False): env['JOB_CALLBACK_DEBUG'] = '2' elif settings.DEBUG: diff --git a/awx/main/utils.py b/awx/main/utils.py index 63235ffca3..270d62e50f 100644 --- a/awx/main/utils.py +++ b/awx/main/utils.py @@ -425,7 +425,7 @@ def get_system_task_capacity(): def emit_websocket_notification(endpoint, event, payload, token_key=None): - from awx.main.socket import Socket + from awx.main.socket_queue import Socket try: with Socket('websocket', 'w', nowait=True, logger=logger) as websocket: diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index a9c5b712ed..abec176b2f 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -39,37 +39,16 @@ import pwd import urlparse import re from copy import deepcopy +from uuid import uuid4 + +# Kombu +from kombu import Connection, Exchange, Producer # Requests import requests -# ZeroMQ -import zmq - import psutil -# Only use statsd if there's a statsd host in the environment -# otherwise just do a noop. -# NOTE: I've disabled this for the time being until we sort through the venv dependency around this -# if os.environ.get('GRAPHITE_PORT_8125_UDP_ADDR'): -# from statsd import StatsClient -# statsd = StatsClient(host=os.environ['GRAPHITE_PORT_8125_UDP_ADDR'], -# port=8125, -# prefix='tower.job.event_callback', -# maxudpsize=512) -# else: -# from statsd import StatsClient -# class NoStatsClient(StatsClient): -# def __init__(self, *args, **kwargs): -# pass -# def _prepare(self, stat, value, rate): -# pass -# def _send_stat(self, stat, value, rate): -# pass -# def _send(self, *args, **kwargs): -# pass -# statsd = NoStatsClient() - CENSOR_FIELD_WHITELIST = [ 'msg', 'failed', @@ -124,6 +103,7 @@ class TokenAuth(requests.auth.AuthBase): return request +# TODO: non v2_ events are deprecated and should be purge/refactored out class BaseCallbackModule(object): ''' Callback module for logging ansible-playbook job events via the REST API. @@ -132,12 +112,16 @@ class BaseCallbackModule(object): def __init__(self): self.base_url = os.getenv('REST_API_URL', '') self.auth_token = os.getenv('REST_API_TOKEN', '') - self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', '') - self.context = None - self.socket = None + self.callback_connection = os.getenv('CALLBACK_CONNECTION', None) + self.connection_queue = os.getenv('CALLBACK_QUEUE', '') + self.connection = None + self.exchange = None self._init_logging() self._init_connection() self.counter = 0 + self.active_playbook = None + self.active_play = None + self.active_task = None def _init_logging(self): try: @@ -158,15 +142,11 @@ class BaseCallbackModule(object): self.logger.propagate = False def _init_connection(self): - self.context = None - self.socket = None + self.connection = None def _start_connection(self): - self.context = zmq.Context() - self.socket = self.context.socket(zmq.REQ) - self.socket.setsockopt(zmq.RCVTIMEO, 4000) - self.socket.setsockopt(zmq.LINGER, 2000) - self.socket.connect(self.callback_consumer_port) + self.connection = Connection(self.callback_connection) + self.exchange = Exchange(self.connection_queue, type='direct') def _post_job_event_queue_msg(self, event, event_data): self.counter += 1 @@ -176,6 +156,29 @@ class BaseCallbackModule(object): 'counter': self.counter, 'created': datetime.datetime.utcnow().isoformat(), } + if event in ('playbook_on_play_start', + 'playbook_on_stats', + 'playbook_on_vars_prompt'): + msg['parent_uuid'] = str(self.active_playbook) + elif event in ('playbook_on_notify', + 'playbook_on_setup', + 'playbook_on_task_start', + 'playbook_on_no_hosts_matched', + 'playbook_on_no_hosts_remaining', + 'playbook_on_include', + 'playbook_on_import_for_host', + 'playbook_on_not_import_for_host'): + msg['parent_uuid'] = str(self.active_play) + elif event.startswith('runner_on_') or event.startswith('runner_item_on_'): + msg['parent_uuid'] = str(self.active_task) + else: + msg['parent_uuid'] = '' + + if "uuid" in event_data: + msg['uuid'] = str(event_data['uuid']) + else: + msg['uuid'] = '' + if getattr(self, 'job_id', None): msg['job_id'] = self.job_id if getattr(self, 'ad_hoc_command_id', None): @@ -192,11 +195,16 @@ class BaseCallbackModule(object): self.connection_pid = active_pid if self.connection_pid != active_pid: self._init_connection() - if self.context is None: + if self.connection is None: self._start_connection() - self.socket.send_json(msg) - self.socket.recv() + producer = Producer(self.connection) + producer.publish(msg, + serializer='json', + compression='bzip2', + exchange=self.exchange, + declare=[self.exchange], + routing_key=self.connection_queue) return except Exception, e: self.logger.info('Publish Job Event Exception: %r, retry=%d', e, @@ -230,7 +238,7 @@ class BaseCallbackModule(object): if 'res' in event_data: event_data['res'] = censor(deepcopy(event_data['res'])) - if self.callback_consumer_port: + if self.callback_connection: self._post_job_event_queue_msg(event, event_data) else: self._post_rest_api_event(event, event_data) @@ -416,7 +424,9 @@ class JobCallbackModule(BaseCallbackModule): def v2_playbook_on_start(self, playbook): # NOTE: the playbook parameter was added late in Ansible 2.0 development # so we don't currently utilize but could later. - self.playbook_on_start() + # NOTE: Ansible doesn't generate a UUID for playbook_on_start so we'll do it for them + self.active_playbook = str(uuid4()) + self._log_event('playbook_on_start', uuid=self.active_playbook) def playbook_on_notify(self, host, handler): self._log_event('playbook_on_notify', host=host, handler=handler) @@ -446,14 +456,16 @@ class JobCallbackModule(BaseCallbackModule): is_conditional=is_conditional) def v2_playbook_on_task_start(self, task, is_conditional): - self._log_event('playbook_on_task_start', task=task, + self.active_task = task._uuid + self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid), name=task.get_name(), is_conditional=is_conditional) def v2_playbook_on_cleanup_task_start(self, task): # re-using playbook_on_task_start event here for this v2-specific # event, though we may consider any changes necessary to distinguish # this from a normal task - self._log_event('playbook_on_task_start', task=task, + self.active_task = task._uuid + self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid), name=task.get_name()) def playbook_on_vars_prompt(self, varname, private=True, prompt=None, @@ -507,7 +519,8 @@ class JobCallbackModule(BaseCallbackModule): play.name = ','.join(play.hosts) else: play.name = play.hosts - self._log_event('playbook_on_play_start', name=play.name, + self.active_play = play._uuid + self._log_event('playbook_on_play_start', name=play.name, uuid=str(play._uuid), pattern=play.hosts) def playbook_on_stats(self, stats): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 89389c02c7..6633785229 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -8,6 +8,9 @@ import ldap import djcelery from datetime import timedelta +from kombu import Queue, Exchange +from kombu.common import Broadcast + # Update this module's local settings from the global settings module. from django.conf import global_settings this_module = sys.modules[__name__] @@ -152,7 +155,6 @@ MIDDLEWARE_CLASSES = ( # NOQA 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', - 'awx.main.middleware.HAMiddleware', 'awx.main.middleware.ActivityStreamMiddleware', 'awx.sso.middleware.SocialAuthMiddleware', 'crum.CurrentRequestUserMiddleware', @@ -327,6 +329,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') djcelery.setup_loader() BROKER_URL = 'redis://localhost/' +CELERY_DEFAULT_QUEUE = 'default' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] @@ -336,6 +339,23 @@ CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' +CELERY_QUEUES = ( + Queue('default', Exchange('default'), routing_key='default'), + Queue('jobs', Exchange('jobs'), routing_key='jobs'), + # Projects use a fanout queue, this isn't super well supported + Broadcast('projects'), +) +CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs', + 'routing_key': 'jobs'}, + 'awx.main.tasks.run_project_update': {'queue': 'projects'}, + 'awx.main.tasks.run_inventory_update': {'queue': 'jobs', + 'routing_key': 'jobs'}, + 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs', + 'routing_key': 'jobs'}, + 'awx.main.tasks.run_system_job': {'queue': 'jobs', + 'routing_key': 'jobs'} +}) + CELERYBEAT_SCHEDULE = { 'tower_scheduler': { 'task': 'awx.main.tasks.tower_periodic_scheduler', diff --git a/awx/settings/development.py b/awx/settings/development.py index 4bebee113c..2140d2a856 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -41,6 +41,8 @@ if 'celeryd' in sys.argv: CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5557" CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver_dev.ipc" +CALLBACK_QUEUE = "callback_tasks" + # Enable PROOT for tower-qa integration tests AWX_PROOT_ENABLED = True diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index c85d89cb21..4c20102746 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -11,6 +11,36 @@ ############################################################################### # MISC PROJECT SETTINGS ############################################################################### +import os + +def patch_broken_pipe_error(): + """Monkey Patch BaseServer.handle_error to not write + a stacktrace to stderr on broken pipe. + http://stackoverflow.com/a/22618740/362702""" + import sys + from SocketServer import BaseServer + from wsgiref import handlers + + handle_error = BaseServer.handle_error + log_exception = handlers.BaseHandler.log_exception + + def is_broken_pipe_error(): + type, err, tb = sys.exc_info() + return "Connection reset by peer" in repr(err) + + def my_handle_error(self, request, client_address): + if not is_broken_pipe_error(): + handle_error(self, request, client_address) + + def my_log_exception(self, exc_info): + if not is_broken_pipe_error(): + log_exception(self, exc_info) + + BaseServer.handle_error = my_handle_error + handlers.BaseHandler.log_exception = my_log_exception + +patch_broken_pipe_error() + ADMINS = ( # ('Your Name', 'your_email@domain.com'), @@ -49,7 +79,10 @@ if is_testing(sys.argv): MONGO_DB = 'system_tracking_test' # Celery AMQP configuration. -BROKER_URL = 'amqp://guest:guest@rabbitmq//' +BROKER_URL = "amqp://{}:{}@{}/{}".format(os.environ.get("RABBITMQ_USER"), + os.environ.get("RABBITMQ_PASS"), + os.environ.get("RABBITMQ_HOST"), + os.environ.get("RABBITMQ_VHOST")) # Mongo host configuration MONGO_HOST = NotImplemented diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 1a3ba9e7f3..433ae22e00 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -6,7 +6,7 @@ azure==2.0.0rc2 Babel==2.2.0 billiard==3.3.0.16 boto==2.40.0 -celery==3.1.10 +celery==3.1.23 cliff==1.15.0 cmd2==0.6.8 d2to1==0.2.11 # TODO: Still needed? @@ -50,7 +50,7 @@ jsonpatch==1.12 jsonpointer==1.10 jsonschema==2.5.1 keyring==4.1 -kombu==3.0.30 +kombu==3.0.35 apache-libcloud==0.20.1 lxml==3.4.4 Markdown==2.4.1 diff --git a/requirements/requirements_ansible.txt b/requirements/requirements_ansible.txt index e62e3e5ead..f11125b5f9 100644 --- a/requirements/requirements_ansible.txt +++ b/requirements/requirements_ansible.txt @@ -25,6 +25,7 @@ jsonpatch==1.12 jsonpointer==1.10 jsonschema==2.5.1 keyring==4.1 +kombu==3.0.35 lxml==3.4.4 mock==1.0.1 monotonic==0.6 diff --git a/requirements/requirements_dev.txt b/requirements/requirements_dev.txt index 5fa57df995..9e0d516959 100644 --- a/requirements/requirements_dev.txt +++ b/requirements/requirements_dev.txt @@ -10,3 +10,4 @@ pytest-cov pytest-django pytest-pythonpath pytest-mock +flower diff --git a/tools/docker-compose-cluster.yml b/tools/docker-compose-cluster.yml new file mode 100644 index 0000000000..1b1dee4041 --- /dev/null +++ b/tools/docker-compose-cluster.yml @@ -0,0 +1,63 @@ +version: '2' +services: + haproxy: + build: + context: ./docker-compose + dockerfile: Dockerfile-haproxy + depends_on: + - "tower_1" + - "tower_2" + - "tower_3" + ports: + - "8013:8013" + - "1936:1936" + - "5555:5555" + tower_1: + image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + hostname: tower_1 + environment: + RABBITMQ_HOST: rabbitmq_1 + RABBITMQ_USER: guest + RABBITMQ_PASS: guest + RABBITMQ_VHOST: / + volumes: + - "../:/tower_devel" + tower_2: + image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + hostname: tower_2 + environment: + RABBITMQ_HOST: rabbitmq_2 + RABBITMQ_USER: guest + RABBITMQ_PASS: guest + RABBITMQ_VHOST: / + volumes: + - "../:/tower_devel" + tower_3: + image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + hostname: tower_3 + environment: + RABBITMQ_HOST: rabbitmq_3 + RABBITMQ_USER: guest + RABBITMQ_PASS: guest + RABBITMQ_VHOST: / + volumes: + - "../:/tower_devel" + rabbitmq_1: + image: gcr.io/ansible-tower-engineering/rabbit_cluster_node:latest + hostname: rabbitmq_1 + rabbitmq_2: + image: gcr.io/ansible-tower-engineering/rabbit_cluster_node:latest + hostname: rabbitmq_2 + environment: + - CLUSTERED=true + - CLUSTER_WITH=rabbitmq_1 + rabbitmq_3: + image: gcr.io/ansible-tower-engineering/rabbit_cluster_node:latest + hostname: rabbitmq_3 + environment: + - CLUSTERED=true + - CLUSTER_WITH=rabbitmq_1 + postgres: + image: postgres:9.4.1 + memcached: + image: memcached:alpine diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index ee58ae8dd9..3c8f1def1f 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -3,9 +3,16 @@ services: # Primary Tower Development Container tower: image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + hostname: tower + environment: + RABBITMQ_HOST: rabbitmq + RABBITMQ_USER: guest + RABBITMQ_PASS: guest + RABBITMQ_VHOST: / ports: - "8080:8080" - "8013:8013" + - "5555:5555" links: - postgres - memcached @@ -20,10 +27,8 @@ services: # Postgres Database Container postgres: image: postgres:9.4.1 - memcached: image: memcached:alpine - rabbitmq: image: rabbitmq:3-management diff --git a/tools/docker-compose/Dockerfile-haproxy b/tools/docker-compose/Dockerfile-haproxy new file mode 100644 index 0000000000..9d38924939 --- /dev/null +++ b/tools/docker-compose/Dockerfile-haproxy @@ -0,0 +1,2 @@ +FROM haproxy:1.6-alpine +COPY haproxy.cfg /usr/local/etc/haproxy/haproxy.cfg diff --git a/tools/docker-compose/haproxy.cfg b/tools/docker-compose/haproxy.cfg new file mode 100644 index 0000000000..01d3c94a4a --- /dev/null +++ b/tools/docker-compose/haproxy.cfg @@ -0,0 +1,53 @@ +global + debug + stats socket /tmp/admin.sock + stats timeout 30s + +defaults + log global + mode http + option httplog + option dontlognull + timeout connect 5000 + timeout client 50000 + timeout server 50000 + +frontend localnodes + bind *:8013 + mode http + default_backend nodes + +frontend flower + bind *:5555 + mode http + default_backend flower_nodes + +backend nodes + mode http + balance roundrobin + option forwardfor + option http-pretend-keepalive + http-request set-header X-Forwarded-Port %[dst_port] + http-request add-header X-Forwarded-Proto https if { ssl_fc } + option httpchk HEAD / HTTP/1.1\r\nHost:localhost + server tower_1 tower_1:8013 check + server tower_2 tower_2:8013 check + server tower_3 tower_3:8013 check + +backend flower_nodes + mode http + balance roundrobin + option forwardfor + option http-pretend-keepalive + http-request set-header X-Forwarded-Port %[dst_port] + http-request add-header X-Forwarded-Proto https if { ssl_fc } + #option httpchk HEAD / HTTP/1.1\r\nHost:localhost + server tower_1 tower_1:5555 + server tower_2 tower_2:5555 + server tower_3 tower_3:5555 + +listen stats + bind *:1936 + stats enable + stats uri / + diff --git a/tools/docker-compose/start_development.sh b/tools/docker-compose/start_development.sh index 96812974fa..d0191dc2f4 100755 --- a/tools/docker-compose/start_development.sh +++ b/tools/docker-compose/start_development.sh @@ -4,7 +4,7 @@ set +x # Wait for the databases to come up ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=postgres port=5432" all ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=memcached port=11211" all -ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=rabbitmq port=5672" all +ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=${RABBITMQ_HOST} port=5672" all # In case Tower in the container wants to connect to itself, use "docker exec" to attach to the container otherwise # TODO: FIX diff --git a/tools/munin_monitors/callbackr_alive b/tools/munin_monitors/callbackr_alive deleted file mode 100755 index 25fb029be8..0000000000 --- a/tools/munin_monitors/callbackr_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Callback Receiver Processes -graph_vlabel num processes -graph_category tower -callbackr.label Callback Receiver Processes -EOM - exit 0;; -esac - -printf "callbackr.value " -ps ax | grep run_callback_receiver | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/celery_alive b/tools/munin_monitors/celery_alive deleted file mode 100755 index d96bdedf41..0000000000 --- a/tools/munin_monitors/celery_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Celery Processes -graph_vlabel num processes -graph_category tower -celeryd.label Celery Processes -EOM - exit 0;; -esac - -printf "celeryd.value " -ps ax | grep celeryd | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/postgres_alive b/tools/munin_monitors/postgres_alive deleted file mode 100755 index 2a8115dcb6..0000000000 --- a/tools/munin_monitors/postgres_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Postmaster Processes -graph_vlabel num processes -graph_category tower -postmaster.label Postmaster Processes -EOM - exit 0;; -esac - -printf "postmaster.value " -ps ax | grep postmaster | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/redis_alive b/tools/munin_monitors/redis_alive deleted file mode 100755 index 3f3573a006..0000000000 --- a/tools/munin_monitors/redis_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Redis Processes -graph_vlabel num processes -graph_category tower -redis.label Redis Processes -EOM - exit 0;; -esac - -printf "redis.value " -ps ax | grep redis | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/socketio_alive b/tools/munin_monitors/socketio_alive deleted file mode 100755 index d035be40ea..0000000000 --- a/tools/munin_monitors/socketio_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title SocketIO Service Processes -graph_vlabel num processes -graph_category tower -socketio.label SocketIO Service Processes -EOM - exit 0;; -esac - -printf "socketio.value " -ps ax | grep run_socketio_service | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/taskmanager_alive b/tools/munin_monitors/taskmanager_alive deleted file mode 100755 index 25b2054208..0000000000 --- a/tools/munin_monitors/taskmanager_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Task Manager Processes -graph_vlabel num processes -graph_category tower -taskm.label Task Manager Processes -EOM - exit 0;; -esac - -printf "taskm.value " -ps ax | grep run_task_system | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/tower_jobs b/tools/munin_monitors/tower_jobs deleted file mode 100755 index 8781fc6b76..0000000000 --- a/tools/munin_monitors/tower_jobs +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -multigraph tower_jobs -graph_title Running Jobs breakdown -graph_vlabel job count -graph_category tower -running.label Running jobs -waiting.label Waiting jobs -pending.label Pending jobs -EOM - exit 0;; -esac - -printf "running.value " -awx-manage stats --stat jobs_running -printf "\n" - -printf "waiting.value " -awx-manage stats --stat jobs_waiting -printf "\n" - -printf "pending.value " -awx-manage stats --stat jobs_pending -printf "\n"