From 3f0311a969fc0da45778bb074cb9e683547df5a6 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 26 Aug 2016 14:58:20 -0400 Subject: [PATCH 01/12] Integrate packaging for qpid/memcached Also * Remove redis packaging * Fix typo in contributing --- requirements/requirements.txt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 34176e14ed..ea7376531e 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -21,8 +21,6 @@ django-extensions==1.5.9 git+https://github.com/chrismeyersfsu/django-jsonbfield@fix-sqlite_serialization#egg=jsonbfield django-polymorphic==0.7.2 django-radius==1.0.0 -# NOTE: Remove when we transition packaging -django-redis-cache==1.6.5 djangorestframework==3.3.2 djangorestframework-yaml==1.0.2 django-split-settings==0.1.1 @@ -115,8 +113,6 @@ rackspace-auth-openstack==1.3 rackspace-novaclient==1.5 rax-default-network-flags-python-novaclient-ext==0.3.2 rax-scheduled-images-python-novaclient-ext==0.3.1 -# NOTE: Remove this when we transition packaging -redis==2.10.3 requests-oauthlib==0.5.0 requests==2.9.1 requestsexceptions==1.1.1 From c112fc3cf4d4f99763246748e7ccf85045a47d11 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Tue, 30 Aug 2016 10:46:13 -0400 Subject: [PATCH 02/12] Update qpid packaging, remove migrations * Remove old system migrations * Update qpid install deps for RH --- pytest.ini | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pytest.ini b/pytest.ini index 03c814599c..2993b1f577 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,7 +1,7 @@ [pytest] DJANGO_SETTINGS_MODULE = awx.settings.development -python_paths = venv/tower/lib/python2.7/site-packages -site_dirs = venv/tower/lib/python2.7/site-packages +python_paths = /venv/tower/lib/python2.7/site-packages +site_dirs = /venv/tower/lib/python2.7/site-packages python_files = *.py addopts = --reuse-db --nomigrations --tb=native markers = From f0b762267249f4d5e1bc00b31249ee940c7e61c5 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 31 Aug 2016 14:47:20 -0400 Subject: [PATCH 03/12] Update development environment for rabbit --- awx/settings/local_settings.py.docker_compose | 2 +- requirements/requirements.txt | 1 - requirements/requirements_dev.txt | 1 - tools/docker-compose.yml | 7 +++---- tools/docker-compose/start_development.sh | 2 +- 5 files changed, 5 insertions(+), 8 deletions(-) diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index e4b47f2ebc..e1169e4291 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -64,7 +64,7 @@ else: } # Celery AMQP configuration. 
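The hunk below swaps the old qpid broker URL for RabbitMQ's AMQP URL. A quick way to sanity-check such a URL from inside a dev container is a kombu connection probe — a minimal sketch, assuming the `guest` credentials and `rabbitmq` hostname from the compose file:

    from kombu import Connection

    # Probe the broker URL introduced below; connect() raises on
    # DNS, auth, or connection failure instead of failing silently.
    with Connection('amqp://guest:guest@rabbitmq//', connect_timeout=5) as conn:
        conn.connect()
        print('broker reachable:', conn.as_uri())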
-BROKER_URL = 'qpid://qpid:5672' +BROKER_URL = 'amqp://guest:guest@rabbitmq//' # Mongo host configuration MONGO_HOST = NotImplemented diff --git a/requirements/requirements.txt b/requirements/requirements.txt index ea7376531e..1a3ba9e7f3 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -108,7 +108,6 @@ python-troveclient==1.4.0 pytz==2015.7 PyYAML==3.11 pyzmq==14.5.0 -qpid-python==0.32.1 rackspace-auth-openstack==1.3 rackspace-novaclient==1.5 rax-default-network-flags-python-novaclient-ext==0.3.2 diff --git a/requirements/requirements_dev.txt b/requirements/requirements_dev.txt index d848638306..d7906ce28f 100644 --- a/requirements/requirements_dev.txt +++ b/requirements/requirements_dev.txt @@ -10,4 +10,3 @@ pytest-cov pytest-django pytest-pythonpath pytest-mock -qpid-tools diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index 638926b727..73a5c12bb9 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -9,7 +9,7 @@ services: links: - postgres - memcached - - qpid + - rabbitmq # - sync # volumes_from: # - sync @@ -23,9 +23,8 @@ services: memcached: image: memcached:alpine - qpid: - image: fedora/qpid:latest - entrypoint: qpidd --auth=no + rabbitmq: + image: rabbitmq:3-management # Source Code Synchronization Container # sync: diff --git a/tools/docker-compose/start_development.sh b/tools/docker-compose/start_development.sh index b75ef757c1..96812974fa 100755 --- a/tools/docker-compose/start_development.sh +++ b/tools/docker-compose/start_development.sh @@ -4,7 +4,7 @@ set +x # Wait for the databases to come up ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=postgres port=5432" all ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=memcached port=11211" all -ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=qpid port=5672" all +ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=rabbitmq port=5672" all # In case Tower in the container wants to connect to itself, use "docker exec" to attach to the container otherwise # TODO: FIX From f3b7fe18ce8b304feb5fb0b3fe4d0d4b14ed5a33 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Thu, 1 Sep 2016 11:41:20 -0400 Subject: [PATCH 04/12] Integrate memcached into setup playbook --- awx/settings/defaults.py | 6 +++--- awx/settings/local_settings.py.docker_compose | 15 --------------- 2 files changed, 3 insertions(+), 18 deletions(-) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 18e44cdc72..97157e4acd 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -351,7 +351,7 @@ CELERYBEAT_SCHEDULE = { }, } -# Use Redis as cache backend (except when testing). 
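The replacement hunk just below swaps that Redis cache for memcached. Because callers go through Django's backend-agnostic cache API, nothing outside settings has to change — a short sketch of the call sites this affects (key and timeout are illustrative):

    from django.core.cache import cache

    # Works identically against locmem (tests) and memcached (dev/prod).
    cache.set('job_42_parent_event', 1337, 300)   # 300-second TTL
    assert cache.get('job_42_parent_event') == 1337
    assert cache.get('missing-key') is None       # expired/unknown keys return None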
+# Django Caching Configuration if is_testing(): CACHES = { 'default': { @@ -361,8 +361,8 @@ if is_testing(): else: CACHES = { 'default': { - 'BACKEND': 'redis_cache.RedisCache', - 'LOCATION': BROKER_URL, + 'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache', + 'LOCATION': 'memcached:11211', }, } diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index e1169e4291..c85d89cb21 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -48,21 +48,6 @@ if is_testing(sys.argv): MONGO_DB = 'system_tracking_test' -# Django Caching Configuration -if is_testing(): - CACHES = { - 'default': { - 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', - }, - } -else: - CACHES = { - 'default': { - 'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache', - 'LOCATION': 'memcached:11211', - }, - } - # Celery AMQP configuration. BROKER_URL = 'amqp://guest:guest@rabbitmq//' From ebf103f345b5f4662074a5189cdccf2e57940cb3 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Thu, 8 Sep 2016 10:18:14 -0400 Subject: [PATCH 05/12] Initial Docker Compose workflow for Tower cluster The goal is to share a common pattern with the existing development work --- Makefile | 3 + awx/settings/local_settings.py.docker_compose | 35 ++++++++++- tools/docker-compose-cluster.yml | 59 +++++++++++++++++++ tools/docker-compose.yml | 5 ++ tools/docker-compose/Dockerfile-haproxy | 2 + tools/docker-compose/haproxy.cfg | 36 +++++++++++ 6 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 tools/docker-compose-cluster.yml create mode 100644 tools/docker-compose/Dockerfile-haproxy create mode 100644 tools/docker-compose/haproxy.cfg diff --git a/Makefile b/Makefile index 0dc2f5bbf1..8a320cd469 100644 --- a/Makefile +++ b/Makefile @@ -731,6 +731,9 @@ docker-auth: docker-compose: docker-auth TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml up --no-recreate +docker-compose-cluster: docker-auth + TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose-cluster.yml up + docker-compose-test: docker-auth cd tools && TAG=$(COMPOSE_TAG) docker-compose run --rm --service-ports tower /bin/bash diff --git a/awx/settings/local_settings.py.docker_compose b/awx/settings/local_settings.py.docker_compose index c85d89cb21..4c20102746 100644 --- a/awx/settings/local_settings.py.docker_compose +++ b/awx/settings/local_settings.py.docker_compose @@ -11,6 +11,36 @@ ############################################################################### # MISC PROJECT SETTINGS ############################################################################### +import os + +def patch_broken_pipe_error(): + """Monkey Patch BaseServer.handle_error to not write + a stacktrace to stderr on broken pipe. 
+ http://stackoverflow.com/a/22618740/362702""" + import sys + from SocketServer import BaseServer + from wsgiref import handlers + + handle_error = BaseServer.handle_error + log_exception = handlers.BaseHandler.log_exception + + def is_broken_pipe_error(): + type, err, tb = sys.exc_info() + return "Connection reset by peer" in repr(err) + + def my_handle_error(self, request, client_address): + if not is_broken_pipe_error(): + handle_error(self, request, client_address) + + def my_log_exception(self, exc_info): + if not is_broken_pipe_error(): + log_exception(self, exc_info) + + BaseServer.handle_error = my_handle_error + handlers.BaseHandler.log_exception = my_log_exception + +patch_broken_pipe_error() + ADMINS = ( # ('Your Name', 'your_email@domain.com'), @@ -49,7 +79,10 @@ if is_testing(sys.argv): MONGO_DB = 'system_tracking_test' # Celery AMQP configuration. -BROKER_URL = 'amqp://guest:guest@rabbitmq//' +BROKER_URL = "amqp://{}:{}@{}/{}".format(os.environ.get("RABBITMQ_USER"), + os.environ.get("RABBITMQ_PASS"), + os.environ.get("RABBITMQ_HOST"), + os.environ.get("RABBITMQ_VHOST")) # Mongo host configuration MONGO_HOST = NotImplemented diff --git a/tools/docker-compose-cluster.yml b/tools/docker-compose-cluster.yml new file mode 100644 index 0000000000..0ac007d96a --- /dev/null +++ b/tools/docker-compose-cluster.yml @@ -0,0 +1,59 @@ +version: '2' +services: + haproxy: + build: + context: ./docker-compose + dockerfile: Dockerfile-haproxy + depends_on: + - "tower_1" + - "tower_2" + - "tower_3" + ports: + - "8013:8013" + - "1936:1936" + tower_1: + image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + environment: + RABBITMQ_HOST: rabbitmq_1 + RABBITMQ_USER: guest + RABBITMQ_PASS: guest + RABBITMQ_VHOST: / + volumes: + - "../:/tower_devel" + tower_2: + image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + environment: + RABBITMQ_HOST: rabbitmq_2 + RABBITMQ_USER: guest + RABBITMQ_PASS: guest + RABBITMQ_VHOST: / + volumes: + - "../:/tower_devel" + tower_3: + image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + environment: + RABBITMQ_HOST: rabbitmq_3 + RABBITMQ_USER: guest + RABBITMQ_PASS: guest + RABBITMQ_VHOST: / + volumes: + - "../:/tower_devel" + rabbitmq_1: + image: gcr.io/ansible-tower-engineering/rabbit_cluster_node:latest + hostname: rabbitmq_1 + rabbitmq_2: + image: gcr.io/ansible-tower-engineering/rabbit_cluster_node:latest + hostname: rabbitmq_2 + environment: + - CLUSTERED=true + - CLUSTER_WITH=rabbitmq_1 + rabbitmq_3: + image: gcr.io/ansible-tower-engineering/rabbit_cluster_node:latest + hostname: rabbitmq_3 + environment: + - CLUSTERED=true + - CLUSTER_WITH=rabbitmq_1 + postgres: + image: postgres:9.4.1 + memcached: + image: memcached:alpine diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index 73a5c12bb9..d3804a1c95 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -3,6 +3,11 @@ services: # Primary Tower Development Container tower: image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + environment: + RABBITMQ_HOST: rabbitmq + RABBITMQ_USER: guest + RABBITMQ_PASS: guest + RABBITMQ_VHOST: tower ports: - "8080:8080" - "8013:8013" diff --git a/tools/docker-compose/Dockerfile-haproxy b/tools/docker-compose/Dockerfile-haproxy new file mode 100644 index 0000000000..9d38924939 --- /dev/null +++ b/tools/docker-compose/Dockerfile-haproxy @@ -0,0 +1,2 @@ +FROM haproxy:1.6-alpine +COPY haproxy.cfg /usr/local/etc/haproxy/haproxy.cfg diff --git a/tools/docker-compose/haproxy.cfg b/tools/docker-compose/haproxy.cfg new file 
mode 100644 index 0000000000..cfbb3965f7 --- /dev/null +++ b/tools/docker-compose/haproxy.cfg @@ -0,0 +1,36 @@ +global + debug + stats socket /tmp/admin.sock + stats timeout 30s + +defaults + log global + mode http + option httplog + option dontlognull + timeout connect 5000 + timeout client 50000 + timeout server 50000 + +frontend localnodes + bind *:8013 + mode http + default_backend nodes + +backend nodes + mode http + balance roundrobin + option forwardfor + option http-pretend-keepalive + http-request set-header X-Forwarded-Port %[dst_port] + http-request add-header X-Forwarded-Proto https if { ssl_fc } + option httpchk HEAD / HTTP/1.1\r\nHost:localhost + server tower_1 tower_1:8013 check + server tower_2 tower_2:8013 check + server tower_3 tower_3:8013 check + +listen stats + bind *:1936 + stats enable + stats uri / + From eafb6c92b58b88dffd7625493a0900338ee8790d Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Thu, 8 Sep 2016 11:02:51 -0400 Subject: [PATCH 06/12] Docker compose improvements * Make sure we explicitly set a hostname for tower nodes * Switch rabbit vhost to use the root --- tools/docker-compose-cluster.yml | 3 +++ tools/docker-compose.yml | 5 ++--- tools/docker-compose/start_development.sh | 2 +- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/tools/docker-compose-cluster.yml b/tools/docker-compose-cluster.yml index 0ac007d96a..86027f8849 100644 --- a/tools/docker-compose-cluster.yml +++ b/tools/docker-compose-cluster.yml @@ -13,6 +13,7 @@ services: - "1936:1936" tower_1: image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + hostname: tower_1 environment: RABBITMQ_HOST: rabbitmq_1 RABBITMQ_USER: guest @@ -22,6 +23,7 @@ services: - "../:/tower_devel" tower_2: image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + hostname: tower_2 environment: RABBITMQ_HOST: rabbitmq_2 RABBITMQ_USER: guest @@ -31,6 +33,7 @@ services: - "../:/tower_devel" tower_3: image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + hostname: tower_3 environment: RABBITMQ_HOST: rabbitmq_3 RABBITMQ_USER: guest diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index d3804a1c95..f34bb25766 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -3,11 +3,12 @@ services: # Primary Tower Development Container tower: image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} + hostname: tower environment: RABBITMQ_HOST: rabbitmq RABBITMQ_USER: guest RABBITMQ_PASS: guest - RABBITMQ_VHOST: tower + RABBITMQ_VHOST: / ports: - "8080:8080" - "8013:8013" @@ -24,10 +25,8 @@ services: # Postgres Database Container postgres: image: postgres:9.4.1 - memcached: image: memcached:alpine - rabbitmq: image: rabbitmq:3-management diff --git a/tools/docker-compose/start_development.sh b/tools/docker-compose/start_development.sh index 96812974fa..d0191dc2f4 100755 --- a/tools/docker-compose/start_development.sh +++ b/tools/docker-compose/start_development.sh @@ -4,7 +4,7 @@ set +x # Wait for the databases to come up ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=postgres port=5432" all ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=memcached port=11211" all -ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=rabbitmq port=5672" all +ansible -i "127.0.0.1," -c local -v -m wait_for -a "host=${RABBITMQ_HOST} port=5672" all # In case Tower in the container wants to connect to itself, use "docker exec" to attach to the container otherwise # TODO: FIX From 0c1e1fa2fbae45ebae84b04c18bf8551023aad50 Mon Sep 17 00:00:00 2001 From: Matthew Jones 
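With the compose files above parameterizing RabbitMQ per container, each Tower node assembles its own BROKER_URL from the environment. A sketch of that assembly with fallbacks for the single-node case (defaults assumed from tools/docker-compose.yml; the env var names come from the compose files):

    import os

    # Mirrors the local_settings logic above, with guesses as defaults.
    BROKER_URL = 'amqp://{}:{}@{}/{}'.format(
        os.environ.get('RABBITMQ_USER', 'guest'),
        os.environ.get('RABBITMQ_PASS', 'guest'),
        os.environ.get('RABBITMQ_HOST', 'rabbitmq'),
        os.environ.get('RABBITMQ_VHOST', '/'))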
Date: Thu, 8 Sep 2016 13:37:53 -0400
Subject: [PATCH 07/12] Refactor Tower HA Instance logic and models

* Gut the HA middleware
* Purge concept of primary and secondary.
* UUID is no longer the primary host identifier; it's now based mostly on
  the hostname. Some work is probably still left to do to make sure this
  is legit. Also removed the unique constraint from the uuid field. This
  might become the cluster ident now... or it may just be deprecated
* No more secondary -> primary redirection
* Initial revision of /api/v1/ping
* Revise and gut tower-manage register_instance
* Rename awx/main/socket.py to awx/main/socket_queue.py to prevent a
  conflict with the "socket" module in the Python standard library
* Revisit/gut the Instance manager... not sure if this manager is really
  needed anymore
---
 Makefile                                      |  4 +-
 awx/api/views.py                              | 21 +-----
 .../management/commands/_base_instance.py     | 70 ++-----------------
 .../management/commands/register_instance.py  | 48 +++----------
 .../commands/run_callback_receiver.py         |  2 +-
 .../commands/run_fact_cache_receiver.py       |  2 +-
 .../commands/run_socketio_service.py          |  2 +-
 awx/main/managers.py                          | 28 ++------
 awx/main/middleware.py                        | 35 ----------
 .../0033_v310_modify_ha_instance.py           | 23 ++++++
 awx/main/models/ha.py                         | 28 +-------
 awx/main/{socket.py => socket_queue.py}       |  0
 awx/main/utils.py                             |  2 +-
 awx/settings/defaults.py                      |  1 -
 14 files changed, 56 insertions(+), 210 deletions(-)
 create mode 100644 awx/main/migrations/0033_v310_modify_ha_instance.py
 rename awx/main/{socket.py => socket_queue.py} (100%)

diff --git a/Makefile b/Makefile
index 8a320cd469..beea1eebd6 100644
--- a/Makefile
+++ b/Makefile
@@ -15,6 +15,8 @@ COMPOSE_TAG ?= devel
 # NOTE: This defaults the container image version to the branch that's active
 # COMPOSE_TAG ?= $(GIT_BRANCH)
 
+COMPOSE_HOST ?= $(shell hostname)
+
 VENV_BASE ?= /venv
 SCL_PREFIX ?=
 CELERY_SCHEDULE_FILE ?= /celerybeat-schedule
@@ -325,7 +327,7 @@ init:
 	if [ "$(VENV_BASE)" ]; then \
 		. $(VENV_BASE)/tower/bin/activate; \
 	fi; \
-	tower-manage register_instance --primary --hostname=127.0.0.1; \
+	tower-manage register_instance --hostname=$(COMPOSE_HOST); \
 
 # Refresh development environment after pulling new code.
 refresh: clean requirements_dev version_file develop migrate

diff --git a/awx/api/views.py b/awx/api/views.py
index 39db4cd23c..a5e1b4fa00 100644
--- a/awx/api/views.py
+++ b/awx/api/views.py
@@ -166,28 +166,13 @@ class ApiV1PingView(APIView):
         # Most of this response is canned; just build the dictionary.
         response = {
             'ha': is_ha_environment(),
-            'role': Instance.objects.my_role(),
             'version': get_awx_version(),
         }
 
-        # If this is an HA environment, we also include the IP address of
-        # all of the instances.
-        #
-        # Set up a default structure.
-        response['instances'] = {
-            'primary': None,
-            'secondaries': [],
-        }
-
-        # Add all of the instances into the structure.
+        response['instances'] = []
         for instance in Instance.objects.all():
-            if instance.primary:
-                response['instances']['primary'] = instance.hostname
-            else:
-                response['instances']['secondaries'].append(instance.hostname)
-        response['instances']['secondaries'].sort()
-
-        # Done; return the response.
+            response['instances'].append(instance.hostname)
+        response['instances'].sort()
         return Response(response)
 
diff --git a/awx/main/management/commands/_base_instance.py b/awx/main/management/commands/_base_instance.py
index c92fa3b640..807abfb76d 100644
--- a/awx/main/management/commands/_base_instance.py
+++ b/awx/main/management/commands/_base_instance.py
@@ -1,6 +1,7 @@
 # Copyright (c) 2015 Ansible, Inc.
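The reshaped ping view above flattens the old primary/secondaries structure into a single sorted hostname list, so a GET /api/v1/ping response is now roughly the following (field values illustrative):

    {
        'ha': True,
        'version': '3.1.0',
        'instances': ['tower_1', 'tower_2', 'tower_3'],
    }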
# All Rights Reserved. +import socket from optparse import make_option from django.core.management.base import BaseCommand, CommandError @@ -21,13 +22,9 @@ class BaseCommandInstance(BaseCommand): def __init__(self): super(BaseCommandInstance, self).__init__() - self.enforce_primary_role = False - self.enforce_roles = False self.enforce_hostname_set = False self.enforce_unique_find = False - self.option_primary = False - self.option_secondary = False self.option_hostname = None self.option_uuid = None @@ -38,48 +35,24 @@ class BaseCommandInstance(BaseCommand): def generate_option_hostname(): return make_option('--hostname', dest='hostname', - default='', + default=socket.gethostname(), help='Find instance by specified hostname.') @staticmethod def generate_option_hostname_set(): return make_option('--hostname', dest='hostname', - default='', + default=socket.gethostname(), help='Hostname to assign to the new instance.') - @staticmethod - def generate_option_primary(): - return make_option('--primary', - action='store_true', - default=False, - dest='primary', - help='Register instance as primary.') - - @staticmethod - def generate_option_secondary(): - return make_option('--secondary', - action='store_true', - default=False, - dest='secondary', - help='Register instance as secondary.') - @staticmethod def generate_option_uuid(): + #TODO: Likely deprecated, maybe uuid becomes the cluster ident? return make_option('--uuid', dest='uuid', default='', help='Find instance by specified uuid.') - def include_option_primary_role(self): - BaseCommand.option_list += ( BaseCommandInstance.generate_option_primary(), ) - self.enforce_primary_role = True - - def include_options_roles(self): - self.include_option_primary_role() - BaseCommand.option_list += ( BaseCommandInstance.generate_option_secondary(), ) - self.enforce_roles = True - def include_option_hostname_set(self): BaseCommand.option_list += ( BaseCommandInstance.generate_option_hostname_set(), ) self.enforce_hostname_set = True @@ -94,12 +67,6 @@ class BaseCommandInstance(BaseCommand): def get_option_uuid(self): return self.option_uuid - def is_option_primary(self): - return self.option_primary - - def is_option_secondary(self): - return self.option_secondary - def get_UUID(self): return self.UUID @@ -109,31 +76,13 @@ class BaseCommandInstance(BaseCommand): @property def usage_error(self): - if self.enforce_roles and self.enforce_hostname_set: - return CommandError('--hostname and one of --primary or --secondary is required.') - elif self.enforce_hostname_set: + if self.enforce_hostname_set: return CommandError('--hostname is required.') - elif self.enforce_primary_role: - return CommandError('--primary is required.') - elif self.enforce_roles: - return CommandError('One of --primary or --secondary is required.') def handle(self, *args, **options): if self.enforce_hostname_set and self.enforce_unique_find: raise OptionEnforceError('Can not enforce --hostname as a setter and --hostname as a getter') - if self.enforce_roles: - self.option_primary = options['primary'] - self.option_secondary = options['secondary'] - - if self.is_option_primary() and self.is_option_secondary() or not (self.is_option_primary() or self.is_option_secondary()): - raise self.usage_error - elif self.enforce_primary_role: - if options['primary']: - self.option_primary = options['primary'] - else: - raise self.usage_error - if self.enforce_hostname_set: if options['hostname']: self.option_hostname = options['hostname'] @@ -162,11 +111,4 @@ class 
BaseCommandInstance(BaseCommand): @staticmethod def instance_str(instance): - return BaseCommandInstance.__instance_str(instance, ('uuid', 'hostname', 'role')) - - def update_projects(self, instance): - """Update all projects, ensuring the job runs against this instance, - which is the primary instance. - """ - for project in Project.objects.all(): - project.update() + return BaseCommandInstance.__instance_str(instance, ('uuid', 'hostname')) diff --git a/awx/main/management/commands/register_instance.py b/awx/main/management/commands/register_instance.py index 942eb9af4d..a7fc2f8011 100644 --- a/awx/main/management/commands/register_instance.py +++ b/awx/main/management/commands/register_instance.py @@ -9,22 +9,14 @@ from awx.main.models import Instance instance_str = BaseCommandInstance.instance_str class Command(BaseCommandInstance): - """Internal tower command. + """ + Internal tower command. Regsiter this instance with the database for HA tracking. This command is idempotent. - - This command will error out in the following conditions: - - * Attempting to register a secondary machine with no primary machines. - * Attempting to register a primary instance when a different primary - instance exists. - * Attempting to re-register an instance with changed values. """ def __init__(self): super(Command, self).__init__() - - self.include_options_roles() self.include_option_hostname_set() def handle(self, *args, **options): @@ -32,32 +24,10 @@ class Command(BaseCommandInstance): uuid = self.get_UUID() - # Is there an existing record for this machine? If so, retrieve that record and look for issues. - try: - instance = Instance.objects.get(uuid=uuid) - if instance.hostname != self.get_option_hostname(): - raise CommandError('Instance already registered with a different hostname %s.' % instance_str(instance)) - print("Instance already registered %s" % instance_str(instance)) - except Instance.DoesNotExist: - # Get a status on primary machines (excluding this one, regardless of its status). - other_instances = Instance.objects.exclude(uuid=uuid) - primaries = other_instances.filter(primary=True).count() - - # If this instance is being set to primary and a *different* primary machine alreadyexists, error out. - if self.is_option_primary() and primaries: - raise CommandError('Another instance is already registered as primary.') - - # Lastly, if there are no primary machines at all, then don't allow this to be registered as a secondary machine. - if self.is_option_secondary() and not primaries: - raise CommandError('Unable to register a secondary machine until another primary machine has been registered.') - - # Okay, we've checked for appropriate errata; perform the registration. - instance = Instance(uuid=uuid, primary=self.is_option_primary(), hostname=self.get_option_hostname()) - instance.save() - - # If this is a primary instance, update projects. - if instance.primary: - self.update_projects(instance) - - # Done! - print('Successfully registered instance %s.' % instance_str(instance)) + instance = Instance.objects.filter(hostname=self.get_option_hostname()) + if instance.exists(): + print("Instance already registered %s" % instance_str(instance[0])) + return + instance = Instance(uuid=uuid, hostname=self.get_option_hostname()) + instance.save() + print('Successfully registered instance %s.' 
% instance_str(instance)) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index e6080fa419..15b9b8f483 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -21,7 +21,7 @@ from django.db import connection # AWX from awx.main.models import * # noqa -from awx.main.socket import Socket +from awx.main.socket_queue import Socket logger = logging.getLogger('awx.main.commands.run_callback_receiver') diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py index 4241a2000c..90252e2a7d 100644 --- a/awx/main/management/commands/run_fact_cache_receiver.py +++ b/awx/main/management/commands/run_fact_cache_receiver.py @@ -14,7 +14,7 @@ from django.utils import timezone # AWX from awx.main.models.fact import Fact from awx.main.models.inventory import Host -from awx.main.socket import Socket +from awx.main.socket_queue import Socket logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver') diff --git a/awx/main/management/commands/run_socketio_service.py b/awx/main/management/commands/run_socketio_service.py index 0e3df4ccaf..4c233aa312 100644 --- a/awx/main/management/commands/run_socketio_service.py +++ b/awx/main/management/commands/run_socketio_service.py @@ -16,7 +16,7 @@ from django.core.management.base import NoArgsCommand # AWX import awx from awx.main.models import * # noqa -from awx.main.socket import Socket +from awx.main.socket_queue import Socket # socketio from socketio import socketio_manage diff --git a/awx/main/managers.py b/awx/main/managers.py index 4825a74cf8..ca4578daf4 100644 --- a/awx/main/managers.py +++ b/awx/main/managers.py @@ -2,6 +2,7 @@ # All Rights Reserved. import sys +import socket from django.conf import settings from django.db import models @@ -28,31 +29,12 @@ class InstanceManager(models.Manager): # If we are running unit tests, return a stub record. if len(sys.argv) >= 2 and sys.argv[1] == 'test': return self.model(id=1, primary=True, + hostname='localhost', uuid='00000000-0000-0000-0000-000000000000') # Return the appropriate record from the database. - return self.get(uuid=settings.SYSTEM_UUID) + return self.get(hostname=socket.gethostname()) def my_role(self): - """Return the role of the currently active instance, as a string - ('primary' or 'secondary'). - """ - # If we are running unit tests, we are primary, because reasons. - if len(sys.argv) >= 2 and sys.argv[1] == 'test': - return 'primary' - - # Check if this instance is primary; if so, return "primary", otherwise - # "secondary". - if self.me().primary: - return 'primary' - return 'secondary' - - def primary(self): - """Return the primary instance.""" - # If we are running unit tests, return a stub record. - if len(sys.argv) >= 2 and sys.argv[1] == 'test': - return self.model(id=1, primary=True, - uuid='00000000-0000-0000-0000-000000000000') - - # Return the appropriate record from the database. 
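The rewritten register_instance above keys instances on their hostname and is idempotent; the same flow can be expressed with Django's get_or_create — a sketch, not the command's literal code, where `uuid` stands for the instance UUID the command has already computed:

    import socket
    from awx.main.models import Instance

    instance, created = Instance.objects.get_or_create(
        hostname=socket.gethostname(),
        defaults={'uuid': uuid})
    print('registered' if created else 'already registered', instance.hostname)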
- return self.get(primary=True) + # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing + return "tower" diff --git a/awx/main/middleware.py b/awx/main/middleware.py index 37903886ac..fda98f1176 100644 --- a/awx/main/middleware.py +++ b/awx/main/middleware.py @@ -71,41 +71,6 @@ class ActivityStreamMiddleware(threading.local): if instance.id not in self.instance_ids: self.instance_ids.append(instance.id) - -class HAMiddleware(object): - """A middleware class that checks to see whether the request is being - served on a secondary instance, and redirects the request back to the - primary instance if so. - """ - def process_request(self, request): - """Process the request, and redirect if this is a request on a - secondary node. - """ - # Is this the primary node? If so, we can just return None and be done; - # we just want normal behavior in this case. - if Instance.objects.my_role() == 'primary': - return None - - # Always allow the /ping/ endpoint. - if request.path.startswith('/api/v1/ping'): - return None - - # Get the primary instance. - primary = Instance.objects.primary() - - # If this is a request to /, then we return a special landing page that - # informs the user that they are on the secondary instance and will - # be redirected. - if request.path == '/': - return TemplateResponse(request, 'ha/redirect.html', { - 'primary': primary, - 'redirect_seconds': 30, - 'version': version, - }) - - # Redirect to the base page of the primary instance. - return HttpResponseRedirect('http://%s%s' % (primary.hostname, request.path)) - class AuthTokenTimeoutMiddleware(object): """Presume that when the user includes the auth header, they go through the authentication mechanism. Further, that mechanism is presumed to extend diff --git a/awx/main/migrations/0033_v310_modify_ha_instance.py b/awx/main/migrations/0033_v310_modify_ha_instance.py new file mode 100644 index 0000000000..e4321f0235 --- /dev/null +++ b/awx/main/migrations/0033_v310_modify_ha_instance.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0032_v302_credential_permissions_update'), + ] + + operations = [ + migrations.RemoveField( + model_name='instance', + name='primary', + ), + migrations.AlterField( + model_name='instance', + name='uuid', + field=models.CharField(max_length=40), + ), + ] diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 3725e6afe5..a645c318e4 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -22,9 +22,8 @@ class Instance(models.Model): """ objects = InstanceManager() - uuid = models.CharField(max_length=40, unique=True) + uuid = models.CharField(max_length=40) hostname = models.CharField(max_length=250, unique=True) - primary = models.BooleanField(default=False) created = models.DateTimeField(auto_now_add=True) modified = models.DateTimeField(auto_now=True) @@ -33,29 +32,8 @@ class Instance(models.Model): @property def role(self): - """Return the role of this instance, as a string.""" - if self.primary: - return 'primary' - return 'secondary' - - @functools.wraps(models.Model.save) - def save(self, *args, **kwargs): - """Save the instance. If this is a secondary instance, then ensure - that any currently-running jobs that this instance started are - canceled. - """ - # Perform the normal save. 
- result = super(Instance, self).save(*args, **kwargs) - - # If this is not a primary instance, then kill any jobs that this - # instance was responsible for starting. - if not self.primary: - for job in UnifiedJob.objects.filter(job_origin__instance=self, - status__in=CAN_CANCEL): - job.cancel() - - # Return back the original result. - return result + # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing + return "tower" class JobOrigin(models.Model): diff --git a/awx/main/socket.py b/awx/main/socket_queue.py similarity index 100% rename from awx/main/socket.py rename to awx/main/socket_queue.py diff --git a/awx/main/utils.py b/awx/main/utils.py index 63235ffca3..270d62e50f 100644 --- a/awx/main/utils.py +++ b/awx/main/utils.py @@ -425,7 +425,7 @@ def get_system_task_capacity(): def emit_websocket_notification(endpoint, event, payload, token_key=None): - from awx.main.socket import Socket + from awx.main.socket_queue import Socket try: with Socket('websocket', 'w', nowait=True, logger=logger) as websocket: diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 97157e4acd..4c3d605f4c 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -152,7 +152,6 @@ MIDDLEWARE_CLASSES = ( # NOQA 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', - 'awx.main.middleware.HAMiddleware', 'awx.main.middleware.ActivityStreamMiddleware', 'awx.sso.middleware.SocialAuthMiddleware', 'crum.CurrentRequestUserMiddleware', From 13a0fd749f03b79116ef35bd47a62e81da062b21 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 9 Sep 2016 15:17:16 -0400 Subject: [PATCH 08/12] Purge old munin monitors and tools --- tools/munin_monitors/callbackr_alive | 16 --------------- tools/munin_monitors/celery_alive | 16 --------------- tools/munin_monitors/postgres_alive | 16 --------------- tools/munin_monitors/redis_alive | 16 --------------- tools/munin_monitors/socketio_alive | 16 --------------- tools/munin_monitors/taskmanager_alive | 16 --------------- tools/munin_monitors/tower_jobs | 27 -------------------------- 7 files changed, 123 deletions(-) delete mode 100755 tools/munin_monitors/callbackr_alive delete mode 100755 tools/munin_monitors/celery_alive delete mode 100755 tools/munin_monitors/postgres_alive delete mode 100755 tools/munin_monitors/redis_alive delete mode 100755 tools/munin_monitors/socketio_alive delete mode 100755 tools/munin_monitors/taskmanager_alive delete mode 100755 tools/munin_monitors/tower_jobs diff --git a/tools/munin_monitors/callbackr_alive b/tools/munin_monitors/callbackr_alive deleted file mode 100755 index 25fb029be8..0000000000 --- a/tools/munin_monitors/callbackr_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Callback Receiver Processes -graph_vlabel num processes -graph_category tower -callbackr.label Callback Receiver Processes -EOM - exit 0;; -esac - -printf "callbackr.value " -ps ax | grep run_callback_receiver | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/celery_alive b/tools/munin_monitors/celery_alive deleted file mode 100755 index d96bdedf41..0000000000 --- a/tools/munin_monitors/celery_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Celery Processes -graph_vlabel num processes -graph_category tower -celeryd.label Celery Processes -EOM - exit 0;; -esac - -printf "celeryd.value " -ps ax | grep 
celeryd | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/postgres_alive b/tools/munin_monitors/postgres_alive deleted file mode 100755 index 2a8115dcb6..0000000000 --- a/tools/munin_monitors/postgres_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Postmaster Processes -graph_vlabel num processes -graph_category tower -postmaster.label Postmaster Processes -EOM - exit 0;; -esac - -printf "postmaster.value " -ps ax | grep postmaster | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/redis_alive b/tools/munin_monitors/redis_alive deleted file mode 100755 index 3f3573a006..0000000000 --- a/tools/munin_monitors/redis_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Redis Processes -graph_vlabel num processes -graph_category tower -redis.label Redis Processes -EOM - exit 0;; -esac - -printf "redis.value " -ps ax | grep redis | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/socketio_alive b/tools/munin_monitors/socketio_alive deleted file mode 100755 index d035be40ea..0000000000 --- a/tools/munin_monitors/socketio_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title SocketIO Service Processes -graph_vlabel num processes -graph_category tower -socketio.label SocketIO Service Processes -EOM - exit 0;; -esac - -printf "socketio.value " -ps ax | grep run_socketio_service | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/taskmanager_alive b/tools/munin_monitors/taskmanager_alive deleted file mode 100755 index 25b2054208..0000000000 --- a/tools/munin_monitors/taskmanager_alive +++ /dev/null @@ -1,16 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -graph_title Task Manager Processes -graph_vlabel num processes -graph_category tower -taskm.label Task Manager Processes -EOM - exit 0;; -esac - -printf "taskm.value " -ps ax | grep run_task_system | grep -v grep | wc -l -printf "\n" diff --git a/tools/munin_monitors/tower_jobs b/tools/munin_monitors/tower_jobs deleted file mode 100755 index 8781fc6b76..0000000000 --- a/tools/munin_monitors/tower_jobs +++ /dev/null @@ -1,27 +0,0 @@ -#!/bin/sh - -case $1 in - config) - cat <<'EOM' -multigraph tower_jobs -graph_title Running Jobs breakdown -graph_vlabel job count -graph_category tower -running.label Running jobs -waiting.label Waiting jobs -pending.label Pending jobs -EOM - exit 0;; -esac - -printf "running.value " -awx-manage stats --stat jobs_running -printf "\n" - -printf "waiting.value " -awx-manage stats --stat jobs_waiting -printf "\n" - -printf "pending.value " -awx-manage stats --stat jobs_pending -printf "\n" From 807cced57133cefc4c24189e52b0667650f17fb8 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 9 Sep 2016 15:18:18 -0400 Subject: [PATCH 09/12] Implement a more dynamic celery queue system * Meant to be a starting point to more efficiently manage work routing and to balance work across all tower nodes * Integrate flower as a dev tool that starts alongside other nodes. 
Helpful for observing and monitoring the queues/exchanges * For the moment, force the task manager to only run on one node (not sure if this is needed) * Define queues and routes for all task work * Bump celery version to 3.1.23 * Expose flower through haproxy --- Makefile | 15 +++++++++++++-- Procfile | 1 + awx/main/tasks.py | 16 ++++++++-------- awx/settings/defaults.py | 20 ++++++++++++++++++++ requirements/requirements.txt | 2 +- requirements/requirements_dev.txt | 1 + tools/docker-compose-cluster.yml | 1 + tools/docker-compose.yml | 1 + tools/docker-compose/haproxy.cfg | 17 +++++++++++++++++ 9 files changed, 63 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index beea1eebd6..87a78a13d3 100644 --- a/Makefile +++ b/Makefile @@ -378,6 +378,12 @@ honcho: fi; \ honcho start +flower: + @if [ "$(VENV_BASE)" ]; then \ + . $(VENV_BASE)/tower/bin/activate; \ + fi; \ + $(PYTHON) manage.py celery flower --address=0.0.0.0 --port=5555 --broker=amqp://guest:guest@$(RABBITMQ_HOST):5672// + # Run the built-in development webserver (by default on http://localhost:8013). runserver: @if [ "$(VENV_BASE)" ]; then \ @@ -390,7 +396,8 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,2 -Ofair --schedule=$(CELERY_SCHEDULE_FILE) + $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default + #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) # Run to start the zeromq callback receiver receiver: @@ -403,7 +410,11 @@ taskmanager: @if [ "$(VENV_BASE)" ]; then \ . 
$(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py run_task_system + if [ "$(COMPOSE_HOST)" == "tower_1" ] || [ "$(COMPOSE_HOST)" == "tower" ]; then \ + $(PYTHON) manage.py run_task_system; \ + else \ + while true; do sleep 2; done; \ + fi socketservice: @if [ "$(VENV_BASE)" ]; then \ diff --git a/Procfile b/Procfile index a301a6aa1a..433417f70b 100644 --- a/Procfile +++ b/Procfile @@ -4,3 +4,4 @@ taskmanager: make taskmanager receiver: make receiver socketservice: make socketservice factcacher: make factcacher +flower: make flower \ No newline at end of file diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 877ed4b2d2..806a819e3e 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -80,7 +80,7 @@ def celery_startup(conf=None, **kwargs): except Exception as e: logger.error("Failed to rebuild schedule {}: {}".format(sch, e)) -@task() +@task(queue='default') def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -101,7 +101,7 @@ def send_notifications(notification_list, job_id=None): if job_id is not None: job_actual.notifications.add(notification) -@task(bind=True) +@task(bind=True, queue='default') def run_administrative_checks(self): if not tower_settings.TOWER_ADMIN_ALERTS: return @@ -122,11 +122,11 @@ def run_administrative_checks(self): tower_admin_emails, fail_silently=True) -@task(bind=True) +@task(bind=True, queue='default') def cleanup_authtokens(self): AuthToken.objects.filter(expires__lt=now()).delete() -@task(bind=True) +@task(bind=True, queue='default') def tower_periodic_scheduler(self): def get_last_run(): if not os.path.exists(settings.SCHEDULE_METADATA_LOCATION): @@ -177,7 +177,7 @@ def tower_periodic_scheduler(self): new_unified_job.socketio_emit_status("failed") emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id)) -@task() +@task(queue='default') def notify_task_runner(metadata_dict): """Add the given task into the Tower task manager's queue, to be consumed by the task system. @@ -185,7 +185,7 @@ def notify_task_runner(metadata_dict): queue = FifoQueue('tower_task_manager') queue.push(metadata_dict) -@task(bind=True) +@task(bind=True, queue='default') def handle_work_success(self, result, task_actual): if task_actual['type'] == 'project_update': instance = ProjectUpdate.objects.get(id=task_actual['id']) @@ -227,7 +227,7 @@ def handle_work_success(self, result, task_actual): for n in all_notification_templates], job_id=task_actual['id']) -@task(bind=True) +@task(bind=True, queue='default') def handle_work_error(self, task_id, subtasks=None): print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks))) @@ -294,7 +294,7 @@ def handle_work_error(self, task_id, subtasks=None): job_id=first_task_id) -@task() +@task(queue='default') def update_inventory_computed_fields(inventory_id, should_update_hosts=True): ''' Signal handler and wrapper around inventory.update_computed_fields to diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 4c3d605f4c..325536b535 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -8,6 +8,9 @@ import ldap import djcelery from datetime import timedelta +from kombu import Queue, Exchange +from kombu.common import Broadcast + # Update this module's local settings from the global settings module. 
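The kombu imports added here back the CELERY_QUEUES declaration later in this patch, and the two queue flavors behave differently: a direct queue delivers each message to exactly one worker, while a Broadcast (fanout) queue copies it to every node — which is what project updates need so each node's local checkout stays in sync. A sketch, with names matching the settings below:

    from kombu import Exchange, Queue
    from kombu.common import Broadcast

    # Direct exchange: one consumer per message, routed by key.
    jobs_queue = Queue('jobs', Exchange('jobs'), routing_key='jobs')

    # Fanout: every node's worker receives its own copy.
    projects_queue = Broadcast('projects')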
from django.conf import global_settings this_module = sys.modules[__name__] @@ -326,6 +329,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') djcelery.setup_loader() BROKER_URL = 'redis://localhost/' +CELERY_DEFAULT_QUEUE = 'default' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] @@ -335,6 +339,22 @@ CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' +CELERY_QUEUES = ( + Queue('default', Exchange('default'), routing_key='default'), + Queue('jobs', Exchange('jobs'), routing_key='jobs'), + Broadcast('projects'), +) +CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs', + 'routing_key': 'jobs'}, + 'awx.main.tasks.run_project_update': {'queue': 'projects'}, + 'awx.main.tasks.run_inventory_update': {'queue': 'jobs', + 'routing_key': 'jobs'}, + 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs', + 'routing_key': 'jobs'}, + 'awx.main.tasks.run_system_job': {'queue': 'jobs', + 'routing_key': 'jobs'} +}) + CELERYBEAT_SCHEDULE = { 'tower_scheduler': { 'task': 'awx.main.tasks.tower_periodic_scheduler', diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 1a3ba9e7f3..fb5872f572 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -6,7 +6,7 @@ azure==2.0.0rc2 Babel==2.2.0 billiard==3.3.0.16 boto==2.40.0 -celery==3.1.10 +celery==3.1.23 cliff==1.15.0 cmd2==0.6.8 d2to1==0.2.11 # TODO: Still needed? diff --git a/requirements/requirements_dev.txt b/requirements/requirements_dev.txt index d7906ce28f..f7fef4a0d4 100644 --- a/requirements/requirements_dev.txt +++ b/requirements/requirements_dev.txt @@ -10,3 +10,4 @@ pytest-cov pytest-django pytest-pythonpath pytest-mock +flower diff --git a/tools/docker-compose-cluster.yml b/tools/docker-compose-cluster.yml index 86027f8849..1b1dee4041 100644 --- a/tools/docker-compose-cluster.yml +++ b/tools/docker-compose-cluster.yml @@ -11,6 +11,7 @@ services: ports: - "8013:8013" - "1936:1936" + - "5555:5555" tower_1: image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} hostname: tower_1 diff --git a/tools/docker-compose.yml b/tools/docker-compose.yml index f34bb25766..08aec5babd 100644 --- a/tools/docker-compose.yml +++ b/tools/docker-compose.yml @@ -12,6 +12,7 @@ services: ports: - "8080:8080" - "8013:8013" + - "5555:5555" links: - postgres - memcached diff --git a/tools/docker-compose/haproxy.cfg b/tools/docker-compose/haproxy.cfg index cfbb3965f7..01d3c94a4a 100644 --- a/tools/docker-compose/haproxy.cfg +++ b/tools/docker-compose/haproxy.cfg @@ -17,6 +17,11 @@ frontend localnodes mode http default_backend nodes +frontend flower + bind *:5555 + mode http + default_backend flower_nodes + backend nodes mode http balance roundrobin @@ -29,6 +34,18 @@ backend nodes server tower_2 tower_2:8013 check server tower_3 tower_3:8013 check +backend flower_nodes + mode http + balance roundrobin + option forwardfor + option http-pretend-keepalive + http-request set-header X-Forwarded-Port %[dst_port] + http-request add-header X-Forwarded-Proto https if { ssl_fc } + #option httpchk HEAD / HTTP/1.1\r\nHost:localhost + server tower_1 tower_1:5555 + server tower_2 tower_2:5555 + server tower_3 tower_3:5555 + listen stats bind *:1936 stats enable From 799f321760f9aab07fff9c24c2fa45f1d686b3e9 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 12 Sep 2016 10:43:40 -0400 Subject: [PATCH 
10/12] Fix an issue running jobs in the cluster The old VENV_PATH settings were still in place --- awx/settings/defaults.py | 1 + awx/settings/development.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 325536b535..8619d695c5 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -342,6 +342,7 @@ CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('jobs', Exchange('jobs'), routing_key='jobs'), + # Projects use a fanout queue, this isn't super well supported Broadcast('projects'), ) CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs', diff --git a/awx/settings/development.py b/awx/settings/development.py index 438c152a0a..4c727e0bdc 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -71,9 +71,9 @@ include(optional('/etc/tower/settings.py'), scope=locals()) include(optional('/etc/tower/conf.d/*.py'), scope=locals()) ANSIBLE_USE_VENV = True -ANSIBLE_VENV_PATH = "/tower_devel/venv/ansible" +ANSIBLE_VENV_PATH = "/venv/ansible" TOWER_USE_VENV = True -TOWER_VENV_PATH = "/tower_devel/venv/tower" +TOWER_VENV_PATH = "/venv/tower" # If any local_*.py files are present in awx/settings/, use them to override # default settings for development. If not present, we can still run using From ab395b00095366c8fdba00a531dcb4782af915dd Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 14 Sep 2016 11:42:13 -0400 Subject: [PATCH 11/12] Integrate callback receiver refactoring * Drop ZMQ as the communication mechanism between job_event_callback and callback_receiver * Setup queue and exchange for callback broker communication * Refactor event plugin and callback receiver to efficiently handle message submission and processing * Integrate django caching for parent processing --- .../commands/run_callback_receiver.py | 333 +++++------------- .../migrations/0034_v310_jobevent_uuid.py | 19 + awx/main/models/jobs.py | 5 + awx/main/tasks.py | 3 +- awx/plugins/callback/job_event_callback.py | 99 +++--- awx/settings/development.py | 2 + requirements/requirements.txt | 2 +- requirements/requirements_ansible.txt | 1 + 8 files changed, 171 insertions(+), 293 deletions(-) create mode 100644 awx/main/migrations/0034_v310_jobevent_uuid.py diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 15b9b8f483..0e27ba06da 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -8,12 +8,17 @@ import datetime import logging import signal import time -from multiprocessing import Process, Queue -from Queue import Empty as QueueEmpty + +from kombu import Connection, Exchange, Queue +from kombu.mixins import ConsumerMixin +from kombu.log import get_logger +from kombu.utils import kwdict, reprcall +from kombu.utils.debug import setup_logging # Django from django.conf import settings from django.core.management.base import NoArgsCommand +from django.core.cache import cache from django.db import transaction, DatabaseError from django.utils.dateparse import parse_datetime from django.utils.timezone import FixedOffset @@ -25,156 +30,49 @@ from awx.main.socket_queue import Socket logger = logging.getLogger('awx.main.commands.run_callback_receiver') -WORKERS = 4 +class CallbackBrokerWorker(ConsumerMixin): -class CallbackReceiver(object): - def __init__(self): - 
self.parent_mappings = {} + def __init__(self, connection): + self.connection = connection - def run_subscriber(self, use_workers=True): - def shutdown_handler(active_workers): - def _handler(signum, frame): - try: - for active_worker in active_workers: - active_worker.terminate() - signal.signal(signum, signal.SIG_DFL) - os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it - except Exception: - # TODO: LOG - pass - return _handler + def get_consumers(self, Consumer, channel): + return [Consumer(queues=[Queue(settings.CALLBACK_QUEUE, + Exchange(settings.CALLBACK_QUEUE, type='direct'), + routing_key=settings.CALLBACK_QUEUE)], + accept=['json'], + callbacks=[self.process_task])] - def check_pre_handle(data): - event = data.get('event', '') - if event == 'playbook_on_play_start': - return True - return False - - worker_queues = [] - - if use_workers: - connection.close() - for idx in range(WORKERS): - queue_actual = Queue(settings.JOB_EVENT_MAX_QUEUE_SIZE) - w = Process(target=self.callback_worker, args=(queue_actual, idx,)) - w.start() - if settings.DEBUG: - logger.info('Started worker %s' % str(idx)) - worker_queues.append([0, queue_actual, w]) - elif settings.DEBUG: - logger.warn('Started callback receiver (no workers)') - - main_process = Process( - target=self.callback_handler, - args=(use_workers, worker_queues,) - ) - main_process.daemon = True - main_process.start() - - signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - while True: - workers_changed = False - idx = 0 - for queue_worker in worker_queues: - if not queue_worker[2].is_alive(): - logger.warn("Worker %s was not alive, restarting" % str(queue_worker)) - workers_changed = True - queue_worker[2].join() - w = Process(target=self.callback_worker, args=(queue_worker[1], idx,)) - w.daemon = True - w.start() - signal.signal(signal.SIGINT, shutdown_handler([w])) - signal.signal(signal.SIGTERM, shutdown_handler([w])) - queue_worker[2] = w - idx += 1 - if workers_changed: - signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - if not main_process.is_alive(): - logger.error("Main process is not alive") - for queue_worker in worker_queues: - queue_worker[2].terminate() - break - time.sleep(0.1) - - def write_queue_worker(self, preferred_queue, worker_queues, message): - queue_order = sorted(range(WORKERS), cmp=lambda x, y: -1 if x==preferred_queue else 0) - for queue_actual in queue_order: - try: - worker_actual = worker_queues[queue_actual] - worker_actual[1].put(message, block=True, timeout=2) - worker_actual[0] += 1 - return queue_actual - except Exception: - logger.warn("Could not write to queue %s" % preferred_queue) - continue - return None - - def callback_handler(self, use_workers, worker_queues): - total_messages = 0 - last_parent_events = {} - with Socket('callbacks', 'r') as callbacks: - for message in callbacks.listen(): - total_messages += 1 - if 'ad_hoc_command_id' in message: - self.process_ad_hoc_event(message) - elif not use_workers: - self.process_job_event(message) - else: - job_parent_events = last_parent_events.get(message['job_id'], {}) - if message['event'] in ('playbook_on_play_start', 'playbook_on_stats', 'playbook_on_vars_prompt'): - parent = job_parent_events.get('playbook_on_start', None) - elif 
message['event'] in ('playbook_on_notify', - 'playbook_on_setup', - 'playbook_on_task_start', - 'playbook_on_no_hosts_matched', - 'playbook_on_no_hosts_remaining', - 'playbook_on_include', - 'playbook_on_import_for_host', - 'playbook_on_not_import_for_host'): - parent = job_parent_events.get('playbook_on_play_start', None) - elif message['event'].startswith('runner_on_') or message['event'].startswith('runner_item_on_'): - list_parents = [] - list_parents.append(job_parent_events.get('playbook_on_setup', None)) - list_parents.append(job_parent_events.get('playbook_on_task_start', None)) - list_parents = sorted(filter(lambda x: x is not None, list_parents), cmp=lambda x, y: y.id - x.id) - parent = list_parents[0] if len(list_parents) > 0 else None - else: - parent = None - if parent is not None: - message['parent'] = parent.id - if 'created' in message: - del(message['created']) - if message['event'] in ('playbook_on_start', 'playbook_on_play_start', - 'playbook_on_setup', 'playbook_on_task_start'): - job_parent_events[message['event']] = self.process_job_event(message) - else: - if message['event'] == 'playbook_on_stats': - job_parent_events = {} - - actual_queue = self.write_queue_worker(total_messages % WORKERS, worker_queues, message) - # NOTE: It might be better to recycle the entire callback receiver process if one or more of the queues are too full - # the drawback is that if we under extremely high load we may be legitimately taking a while to process messages - if actual_queue is None: - logger.error("All queues full!") - sys.exit(1) - last_parent_events[message['job_id']] = job_parent_events - - @transaction.atomic - def process_job_event(self, data): - # Sanity check: Do we need to do anything at all? - event = data.get('event', '') - parent_id = data.get('parent', None) - if not event or 'job_id' not in data: - return + def process_task(self, body, message): + try: + if "event" not in body: + raise Exception("Payload does not have an event") + if "job_id" not in body: + raise Exception("Payload does not have a job_id") + if settings.DEBUG: + logger.info("Body: {}".format(body)) + logger.info("Message: {}".format(message)) + self.process_job_event(body) + except Exception as exc: + import traceback + traceback.print_exc() + logger.error('Callback Task Processor Raised Exception: %r', exc) + message.ack() + def process_job_event(self, payload): # Get the correct "verbose" value from the job. # If for any reason there's a problem, just use 0. + if 'ad_hoc_command_id' in payload: + event_type_key = 'ad_hoc_command_id' + event_object_type = AdHocCommand + else: + event_type_key = 'job_id' + event_object_type = Job + try: - verbose = Job.objects.get(id=data['job_id']).verbosity + verbose = event_object_type.objects.get(id=payload[event_type_key]).verbosity except Exception as e: - verbose = 0 + verbose=0 + # TODO: cache # Convert the datetime for the job event's creation appropriately, # and include a time zone for it. @@ -182,120 +80,58 @@ class CallbackReceiver(object): # In the event of any issue, throw it out, and Django will just save # the current time. 
@@ -182,120 +80,58 @@ class CallbackReceiver(object):
         # In the event of any issue, throw it out, and Django will just save
         # the current time.
         try:
-            if not isinstance(data['created'], datetime.datetime):
-                data['created'] = parse_datetime(data['created'])
-            if not data['created'].tzinfo:
-                data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
+            if not isinstance(payload['created'], datetime.datetime):
+                payload['created'] = parse_datetime(payload['created'])
+            if not payload['created'].tzinfo:
+                payload['created'] = payload['created'].replace(tzinfo=FixedOffset(0))
         except (KeyError, ValueError):
-            data.pop('created', None)
+            payload.pop('created', None)
 
-        # Print the data to stdout if we're in DEBUG mode.
-        if settings.DEBUG:
-            print(data)
+        event_uuid = payload.get("uuid", '')
+        parent_event_uuid = payload.get("parent_uuid", '')
 
         # Sanity check: Don't honor keys that we don't recognize.
-        for key in data.keys():
-            if key not in ('job_id', 'event', 'event_data',
-                           'created', 'counter'):
-                data.pop(key)
+        for key in payload.keys():
+            if key not in (event_type_key, 'event', 'event_data',
+                           'created', 'counter', 'uuid'):
+                payload.pop(key)
 
-        # Save any modifications to the job event to the database.
-        # If we get a database error of some kind, bail out.
         try:
             # If we're not in verbose mode, wipe out any module
             # arguments.
-            res = data['event_data'].get('res', {})
+            res = payload['event_data'].get('res', {})
             if isinstance(res, dict):
                 i = res.get('invocation', {})
                 if verbose == 0 and 'module_args' in i:
                     i['module_args'] = ''
 
-            # Create a new JobEvent object.
-            job_event = JobEvent(**data)
-            if parent_id is not None:
-                job_event.parent = JobEvent.objects.get(id=parent_id)
-            job_event.save(post_process=True)
-
-            # Return the job event object.
-            return job_event
+            if 'ad_hoc_command_id' in payload:
+                ad_hoc_command_event = AdHocCommandEvent.objects.create(**payload)
+                return
+
+            j = JobEvent(**payload)
+            if payload['event'] == 'playbook_on_start':
+                j.save()
+                cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
+                return
+            else:
+                if parent_event_uuid:
+                    parent_id = cache.get("{}_{}".format(payload['job_id'], parent_event_uuid), None)
+                    if parent_id is None:
+                        parent_id_obj = JobEvent.objects.filter(uuid=parent_event_uuid, job_id=payload['job_id'])
+                        if parent_id_obj.exists():  # Problematic if not there; it means the parent hasn't been written yet... TODO
+                            j.parent_id = parent_id_obj[0].id
+                            print("Setting cache: {}_{} with value {}".format(payload['job_id'], parent_event_uuid, j.parent_id))
+                            cache.set("{}_{}".format(payload['job_id'], parent_event_uuid), j.parent_id, 300)
+                    else:
+                        print("Cache hit")
+                        j.parent_id = parent_id
+                j.save()
+                if event_uuid:
+                    cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300)
         except DatabaseError as e:
-            # Log the error and bail out.
-            logger.error('Database error saving job event: %s', e)
-            return None
+            logger.error("Database Error Saving Job Event: {}".format(e))
 
-    @transaction.atomic
-    def process_ad_hoc_event(self, data):
-        # Sanity check: Do we need to do anything at all?
-        event = data.get('event', '')
-        if not event or 'ad_hoc_command_id' not in data:
-            return
-
-        # Get the correct "verbose" value from the job.
-        # If for any reason there's a problem, just use 0.
-        try:
-            verbose = AdHocCommand.objects.get(id=data['ad_hoc_command_id']).verbosity
-        except Exception as e:
-            verbose = 0
-
-        # Convert the datetime for the job event's creation appropriately,
-        # and include a time zone for it.
-        #
-        # In the event of any issue, throw it out, and Django will just save
-        # the current time.
- try: - if not isinstance(data['created'], datetime.datetime): - data['created'] = parse_datetime(data['created']) - if not data['created'].tzinfo: - data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) - except (KeyError, ValueError): - data.pop('created', None) - - # Print the data to stdout if we're in DEBUG mode. - if settings.DEBUG: - print(data) - - # Sanity check: Don't honor keys that we don't recognize. - for key in data.keys(): - if key not in ('ad_hoc_command_id', 'event', 'event_data', - 'created', 'counter'): - data.pop(key) - - # Save any modifications to the ad hoc command event to the database. - # If we get a database error of some kind, bail out. - try: - # If we're not in verbose mode, wipe out any module - # arguments. FIXME: Needed for adhoc? - res = data['event_data'].get('res', {}) - if isinstance(res, dict): - i = res.get('invocation', {}) - if verbose == 0 and 'module_args' in i: - i['module_args'] = '' - - # Create a new AdHocCommandEvent object. - ad_hoc_command_event = AdHocCommandEvent.objects.create(**data) - - # Retrun the ad hoc comamnd event object. - return ad_hoc_command_event - except DatabaseError as e: - # Log the error and bail out. - logger.error('Database error saving ad hoc command event: %s', e) - return None - - def callback_worker(self, queue_actual, idx): - messages_processed = 0 - while True: - try: - message = queue_actual.get(block=True, timeout=1) - except QueueEmpty: - continue - except Exception as e: - logger.error("Exception on listen socket, restarting: " + str(e)) - break - self.process_job_event(message) - messages_processed += 1 - if messages_processed >= settings.JOB_EVENT_RECYCLE_THRESHOLD: - logger.info("Shutting down message receiver") - break class Command(NoArgsCommand): ''' @@ -306,9 +142,10 @@ class Command(NoArgsCommand): help = 'Launch the job callback receiver' def handle_noargs(self, **options): - cr = CallbackReceiver() - try: - cr.run_subscriber() - except KeyboardInterrupt: - pass + with Connection(settings.BROKER_URL) as conn: + try: + worker = CallbackBrokerWorker(conn) + worker.run() + except KeyboardInterrupt: + print('Terminating Callback Receiver') diff --git a/awx/main/migrations/0034_v310_jobevent_uuid.py b/awx/main/migrations/0034_v310_jobevent_uuid.py new file mode 100644 index 0000000000..4feade28ef --- /dev/null +++ b/awx/main/migrations/0034_v310_jobevent_uuid.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0033_v310_modify_ha_instance'), + ] + + operations = [ + migrations.AddField( + model_name='jobevent', + name='uuid', + field=models.CharField(default=b'', max_length=1024, editable=False), + ), + ] diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index c233269ce9..ddff3f9343 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -950,6 +950,11 @@ class JobEvent(CreatedModifiedModel): default=False, editable=False, ) + uuid = models.CharField( + max_length=1024, + default='', + editable=False, + ) host = models.ForeignKey( 'Host', related_name='job_events_as_primary_host', diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 806a819e3e..b4617f505d 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -794,7 +794,8 @@ class RunJob(BaseTask): env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_path env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' - 
env['CALLBACK_CONSUMER_PORT'] = str(settings.CALLBACK_CONSUMER_PORT)
+        env['CALLBACK_QUEUE'] = settings.CALLBACK_QUEUE
+        env['CALLBACK_CONNECTION'] = settings.BROKER_URL
         if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
             env['JOB_CALLBACK_DEBUG'] = '2'
         elif settings.DEBUG:
diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py
index a9c5b712ed..abec176b2f 100644
--- a/awx/plugins/callback/job_event_callback.py
+++ b/awx/plugins/callback/job_event_callback.py
@@ -39,37 +39,16 @@ import pwd
 import urlparse
 import re
 from copy import deepcopy
+from uuid import uuid4
+
+# Kombu
+from kombu import Connection, Exchange, Producer
 
 # Requests
 import requests
 
-# ZeroMQ
-import zmq
-
 import psutil
 
-# Only use statsd if there's a statsd host in the environment
-# otherwise just do a noop.
-# NOTE: I've disabled this for the time being until we sort through the venv dependency around this
-# if os.environ.get('GRAPHITE_PORT_8125_UDP_ADDR'):
-#     from statsd import StatsClient
-#     statsd = StatsClient(host=os.environ['GRAPHITE_PORT_8125_UDP_ADDR'],
-#                          port=8125,
-#                          prefix='tower.job.event_callback',
-#                          maxudpsize=512)
-# else:
-#     from statsd import StatsClient
-#     class NoStatsClient(StatsClient):
-#         def __init__(self, *args, **kwargs):
-#             pass
-#         def _prepare(self, stat, value, rate):
-#             pass
-#         def _send_stat(self, stat, value, rate):
-#             pass
-#         def _send(self, *args, **kwargs):
-#             pass
-#     statsd = NoStatsClient()
-
 CENSOR_FIELD_WHITELIST = [
     'msg',
     'failed',
@@ -124,6 +103,7 @@ class TokenAuth(requests.auth.AuthBase):
         return request
 
 
+# TODO: non v2_ events are deprecated and should be purged/refactored out
 class BaseCallbackModule(object):
     '''
    Callback module for logging ansible-playbook job events via the REST API.
@@ -132,12 +112,16 @@
     def __init__(self):
         self.base_url = os.getenv('REST_API_URL', '')
         self.auth_token = os.getenv('REST_API_TOKEN', '')
-        self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', '')
-        self.context = None
-        self.socket = None
+        self.callback_connection = os.getenv('CALLBACK_CONNECTION', None)
+        self.connection_queue = os.getenv('CALLBACK_QUEUE', '')
+        self.connection = None
+        self.exchange = None
         self._init_logging()
         self._init_connection()
         self.counter = 0
+        self.active_playbook = None
+        self.active_play = None
+        self.active_task = None
 
     def _init_logging(self):
         try:
@@ -158,15 +142,11 @@
         self.logger.propagate = False
 
     def _init_connection(self):
-        self.context = None
-        self.socket = None
+        self.connection = None
 
     def _start_connection(self):
-        self.context = zmq.Context()
-        self.socket = self.context.socket(zmq.REQ)
-        self.socket.setsockopt(zmq.RCVTIMEO, 4000)
-        self.socket.setsockopt(zmq.LINGER, 2000)
-        self.socket.connect(self.callback_consumer_port)
+        self.connection = Connection(self.callback_connection)
+        self.exchange = Exchange(self.connection_queue, type='direct')
 
     def _post_job_event_queue_msg(self, event, event_data):
         self.counter += 1
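The plugin side of the transport is set up here: one direct exchange, one queue, one routing key, all named by CALLBACK_QUEUE. That topology can be sketched standalone in a few lines of kombu; the broker URL and queue name below are assumptions taken from the development settings, not part of the patch:

    from kombu import Connection, Exchange, Queue

    exchange = Exchange('callback_tasks', type='direct')
    queue = Queue('callback_tasks', exchange, routing_key='callback_tasks')

    with Connection('amqp://guest:guest@rabbitmq//') as conn:
        # Declaring a bound queue is idempotent, so the producer and the
        # consumer can each declare it without coordinating.
        queue(conn.default_channel).declare()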
@@ -176,6 +156,29 @@
             'counter': self.counter,
             'created': datetime.datetime.utcnow().isoformat(),
         }
+        if event in ('playbook_on_play_start',
+                     'playbook_on_stats',
+                     'playbook_on_vars_prompt'):
+            msg['parent_uuid'] = str(self.active_playbook)
+        elif event in ('playbook_on_notify',
+                       'playbook_on_setup',
+                       'playbook_on_task_start',
+                       'playbook_on_no_hosts_matched',
+                       'playbook_on_no_hosts_remaining',
+                       'playbook_on_include',
+                       'playbook_on_import_for_host',
+                       'playbook_on_not_import_for_host'):
+            msg['parent_uuid'] = str(self.active_play)
+        elif event.startswith('runner_on_') or event.startswith('runner_item_on_'):
+            msg['parent_uuid'] = str(self.active_task)
+        else:
+            msg['parent_uuid'] = ''
+
+        if "uuid" in event_data:
+            msg['uuid'] = str(event_data['uuid'])
+        else:
+            msg['uuid'] = ''
+
         if getattr(self, 'job_id', None):
             msg['job_id'] = self.job_id
         if getattr(self, 'ad_hoc_command_id', None):
@@ -192,11 +195,16 @@
                     self.connection_pid = active_pid
                 if self.connection_pid != active_pid:
                     self._init_connection()
-                if self.context is None:
+                if self.connection is None:
                     self._start_connection()
 
-                self.socket.send_json(msg)
-                self.socket.recv()
+                producer = Producer(self.connection)
+                producer.publish(msg,
+                                 serializer='json',
+                                 compression='bzip2',
+                                 exchange=self.exchange,
+                                 declare=[self.exchange],
+                                 routing_key=self.connection_queue)
                 return
             except Exception, e:
                 self.logger.info('Publish Job Event Exception: %r, retry=%d', e,
@@ -230,7 +238,7 @@
         if 'res' in event_data:
             event_data['res'] = censor(deepcopy(event_data['res']))
 
-        if self.callback_consumer_port:
+        if self.callback_connection:
             self._post_job_event_queue_msg(event, event_data)
         else:
             self._post_rest_api_event(event, event_data)
@@ -416,7 +424,9 @@
     def v2_playbook_on_start(self, playbook):
         # NOTE: the playbook parameter was added late in Ansible 2.0 development
         # so we don't currently utilize but could later.
-        self.playbook_on_start()
+        # NOTE: Ansible doesn't generate a UUID for playbook_on_start so we'll do it for them
+        self.active_playbook = str(uuid4())
+        self._log_event('playbook_on_start', uuid=self.active_playbook)
 
     def playbook_on_notify(self, host, handler):
         self._log_event('playbook_on_notify', host=host, handler=handler)
@@ -446,14 +456,16 @@
             is_conditional=is_conditional)
 
     def v2_playbook_on_task_start(self, task, is_conditional):
-        self._log_event('playbook_on_task_start', task=task,
+        self.active_task = task._uuid
+        self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
                         name=task.get_name(), is_conditional=is_conditional)
 
     def v2_playbook_on_cleanup_task_start(self, task):
         # re-using playbook_on_task_start event here for this v2-specific
         # event, though we may consider any changes necessary to distinguish
         # this from a normal task
-        self._log_event('playbook_on_task_start', task=task,
+        self.active_task = task._uuid
+        self._log_event('playbook_on_task_start', task=task, uuid=str(task._uuid),
                         name=task.get_name())
 
     def playbook_on_vars_prompt(self, varname, private=True, prompt=None,
@@ -507,7 +519,8 @@
             play.name = ','.join(play.hosts)
         else:
             play.name = play.hosts
-        self._log_event('playbook_on_play_start', name=play.name,
+        self.active_play = play._uuid
+        self._log_event('playbook_on_play_start', name=play.name, uuid=str(play._uuid),
                         pattern=play.hosts)
 
     def playbook_on_stats(self, stats):
diff --git a/awx/settings/development.py b/awx/settings/development.py
index 4c727e0bdc..722f397900 100644
--- a/awx/settings/development.py
+++ b/awx/settings/development.py
@@ -28,6 +28,8 @@ if 'celeryd' in sys.argv:
 CALLBACK_CONSUMER_PORT = "tcp://127.0.0.1:5557"
 CALLBACK_QUEUE_PORT = "ipc:///tmp/callback_receiver_dev.ipc"
 
+CALLBACK_QUEUE = "callback_tasks"
+
 # Enable PROOT for tower-qa integration tests
 AWX_PROOT_ENABLED = True
 
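The publish path above reduces to a short standalone smoke test. The payload, broker URL, and queue name here are assumptions for illustration; note that, as in the plugin, only the exchange is declared, so the receiver is expected to declare and bind the queue itself:

    from kombu import Connection, Exchange, Producer

    with Connection('amqp://guest:guest@rabbitmq//') as conn:
        exchange = Exchange('callback_tasks', type='direct')
        producer = Producer(conn)
        # Mirrors _post_job_event_queue_msg: JSON-serialized, bzip2-compressed,
        # routed through the direct exchange to the callback queue.
        producer.publish({'event': 'playbook_on_start', 'job_id': 1, 'uuid': 'smoke-test'},
                         serializer='json',
                         compression='bzip2',
                         exchange=exchange,
                         declare=[exchange],
                         routing_key='callback_tasks')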
diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index fb5872f572..433ae22e00 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -50,7 +50,7 @@ jsonpatch==1.12
 jsonpointer==1.10
 jsonschema==2.5.1
 keyring==4.1
-kombu==3.0.30
+kombu==3.0.35
 apache-libcloud==0.20.1
 lxml==3.4.4
 Markdown==2.4.1
diff --git a/requirements/requirements_ansible.txt b/requirements/requirements_ansible.txt
index b35cb6fcbb..fe9fe45aed 100644
--- a/requirements/requirements_ansible.txt
+++ b/requirements/requirements_ansible.txt
@@ -25,6 +25,7 @@ jsonpatch==1.12
 jsonpointer==1.10
 jsonschema==2.5.1
 keyring==4.1
+kombu==3.0.35
 lxml==3.4.4
 mock==1.0.1
 monotonic==0.6

From 574a0fde058d69d6b894ddf42b074cf61db220d3 Mon Sep 17 00:00:00 2001
From: Matthew Jones
Date: Thu, 15 Sep 2016 10:15:14 -0400
Subject: [PATCH 12/12] Rename database migrations for devel integration

---
 ...v310_modify_ha_instance.py => 0034_v310_modify_ha_instance.py} | 0
 .../{0034_v310_jobevent_uuid.py => 0035_v310_jobevent_uuid.py}    | 0
 2 files changed, 0 insertions(+), 0 deletions(-)
 rename awx/main/migrations/{0033_v310_modify_ha_instance.py => 0034_v310_modify_ha_instance.py} (100%)
 rename awx/main/migrations/{0034_v310_jobevent_uuid.py => 0035_v310_jobevent_uuid.py} (100%)

diff --git a/awx/main/migrations/0033_v310_modify_ha_instance.py b/awx/main/migrations/0034_v310_modify_ha_instance.py
similarity index 100%
rename from awx/main/migrations/0033_v310_modify_ha_instance.py
rename to awx/main/migrations/0034_v310_modify_ha_instance.py
diff --git a/awx/main/migrations/0034_v310_jobevent_uuid.py b/awx/main/migrations/0035_v310_jobevent_uuid.py
similarity index 100%
rename from awx/main/migrations/0034_v310_jobevent_uuid.py
rename to awx/main/migrations/0035_v310_jobevent_uuid.py
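Taken together, the series swaps the ZeroMQ REQ socket between the callback plugin and the callback receiver for a RabbitMQ queue. For eyeballing that queue during development, a self-contained consumer might look like the following; it mirrors the shape of CallbackBrokerWorker but only prints events, and the broker URL and queue name are again assumptions from the development settings:

    from kombu import Connection, Exchange, Queue
    from kombu.mixins import ConsumerMixin

    class EchoWorker(ConsumerMixin):
        # Minimal stand-in for CallbackBrokerWorker: print each event
        # instead of writing JobEvent rows to the database.
        def __init__(self, connection):
            self.connection = connection

        def get_consumers(self, Consumer, channel):
            queue = Queue('callback_tasks',
                          Exchange('callback_tasks', type='direct'),
                          routing_key='callback_tasks')
            return [Consumer(queues=[queue], accept=['json'],
                             callbacks=[self.on_message])]

        def on_message(self, body, message):
            print('event=%s job_id=%s uuid=%s' % (
                body.get('event'), body.get('job_id'), body.get('uuid')))
            message.ack()

    with Connection('amqp://guest:guest@rabbitmq//') as conn:
        EchoWorker(conn).run()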