Implement a more dynamic Celery queue system

* Meant to be a starting point for managing work routing more
  efficiently and balancing work across all tower nodes
* Integrate flower as a dev tool that starts alongside other nodes.
  Helpful for observing and monitoring the queues/exchanges
* For the moment, force the task manager to only run on one node (not
  sure if this is needed)
* Define queues and routes for all task work
* Bump celery version to 3.1.23
* Expose flower through haproxy
This commit is contained in:
Matthew Jones
2016-09-09 15:18:18 -04:00
parent 13a0fd749f
commit 807cced571
9 changed files with 63 additions and 11 deletions

View File

@@ -378,6 +378,12 @@ honcho:
fi; \ fi; \
honcho start honcho start
flower:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/tower/bin/activate; \
fi; \
$(PYTHON) manage.py celery flower --address=0.0.0.0 --port=5555 --broker=amqp://guest:guest@$(RABBITMQ_HOST):5672//
# Run the built-in development webserver (by default on http://localhost:8013). # Run the built-in development webserver (by default on http://localhost:8013).
runserver: runserver:
@if [ "$(VENV_BASE)" ]; then \ @if [ "$(VENV_BASE)" ]; then \
@@ -390,7 +396,8 @@ celeryd:
@if [ "$(VENV_BASE)" ]; then \ @if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/tower/bin/activate; \ . $(VENV_BASE)/tower/bin/activate; \
fi; \ fi; \
$(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,2 -Ofair --schedule=$(CELERY_SCHEDULE_FILE) $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default
#$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE)
# Run to start the zeromq callback receiver # Run to start the zeromq callback receiver
receiver: receiver:
@@ -403,7 +410,11 @@ taskmanager:
@if [ "$(VENV_BASE)" ]; then \ @if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/tower/bin/activate; \ . $(VENV_BASE)/tower/bin/activate; \
fi; \ fi; \
$(PYTHON) manage.py run_task_system if [ "$(COMPOSE_HOST)" == "tower_1" ] || [ "$(COMPOSE_HOST)" == "tower" ]; then \
$(PYTHON) manage.py run_task_system; \
else \
while true; do sleep 2; done; \
fi
socketservice: socketservice:
@if [ "$(VENV_BASE)" ]; then \ @if [ "$(VENV_BASE)" ]; then \

View File

@@ -4,3 +4,4 @@ taskmanager: make taskmanager
receiver: make receiver receiver: make receiver
socketservice: make socketservice socketservice: make socketservice
factcacher: make factcacher factcacher: make factcacher
flower: make flower

View File

@@ -80,7 +80,7 @@ def celery_startup(conf=None, **kwargs):
except Exception as e: except Exception as e:
logger.error("Failed to rebuild schedule {}: {}".format(sch, e)) logger.error("Failed to rebuild schedule {}: {}".format(sch, e))
@task() @task(queue='default')
def send_notifications(notification_list, job_id=None): def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list): if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list") raise TypeError("notification_list should be of type list")
@@ -101,7 +101,7 @@ def send_notifications(notification_list, job_id=None):
if job_id is not None: if job_id is not None:
job_actual.notifications.add(notification) job_actual.notifications.add(notification)
@task(bind=True) @task(bind=True, queue='default')
def run_administrative_checks(self): def run_administrative_checks(self):
if not tower_settings.TOWER_ADMIN_ALERTS: if not tower_settings.TOWER_ADMIN_ALERTS:
return return
@@ -122,11 +122,11 @@ def run_administrative_checks(self):
tower_admin_emails, tower_admin_emails,
fail_silently=True) fail_silently=True)
@task(bind=True) @task(bind=True, queue='default')
def cleanup_authtokens(self): def cleanup_authtokens(self):
AuthToken.objects.filter(expires__lt=now()).delete() AuthToken.objects.filter(expires__lt=now()).delete()
@task(bind=True) @task(bind=True, queue='default')
def tower_periodic_scheduler(self): def tower_periodic_scheduler(self):
def get_last_run(): def get_last_run():
if not os.path.exists(settings.SCHEDULE_METADATA_LOCATION): if not os.path.exists(settings.SCHEDULE_METADATA_LOCATION):
@@ -177,7 +177,7 @@ def tower_periodic_scheduler(self):
new_unified_job.socketio_emit_status("failed") new_unified_job.socketio_emit_status("failed")
emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id)) emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id))
@task() @task(queue='default')
def notify_task_runner(metadata_dict): def notify_task_runner(metadata_dict):
"""Add the given task into the Tower task manager's queue, to be consumed """Add the given task into the Tower task manager's queue, to be consumed
by the task system. by the task system.
@@ -185,7 +185,7 @@ def notify_task_runner(metadata_dict):
queue = FifoQueue('tower_task_manager') queue = FifoQueue('tower_task_manager')
queue.push(metadata_dict) queue.push(metadata_dict)
@task(bind=True) @task(bind=True, queue='default')
def handle_work_success(self, result, task_actual): def handle_work_success(self, result, task_actual):
if task_actual['type'] == 'project_update': if task_actual['type'] == 'project_update':
instance = ProjectUpdate.objects.get(id=task_actual['id']) instance = ProjectUpdate.objects.get(id=task_actual['id'])
@@ -227,7 +227,7 @@ def handle_work_success(self, result, task_actual):
for n in all_notification_templates], for n in all_notification_templates],
job_id=task_actual['id']) job_id=task_actual['id'])
@task(bind=True) @task(bind=True, queue='default')
def handle_work_error(self, task_id, subtasks=None): def handle_work_error(self, task_id, subtasks=None):
print('Executing error task id %s, subtasks: %s' % print('Executing error task id %s, subtasks: %s' %
(str(self.request.id), str(subtasks))) (str(self.request.id), str(subtasks)))
@@ -294,7 +294,7 @@ def handle_work_error(self, task_id, subtasks=None):
job_id=first_task_id) job_id=first_task_id)
@task() @task(queue='default')
def update_inventory_computed_fields(inventory_id, should_update_hosts=True): def update_inventory_computed_fields(inventory_id, should_update_hosts=True):
''' '''
Signal handler and wrapper around inventory.update_computed_fields to Signal handler and wrapper around inventory.update_computed_fields to

View File

@@ -8,6 +8,9 @@ import ldap
import djcelery import djcelery
from datetime import timedelta from datetime import timedelta
from kombu import Queue, Exchange
from kombu.common import Broadcast
# Update this module's local settings from the global settings module. # Update this module's local settings from the global settings module.
from django.conf import global_settings from django.conf import global_settings
this_module = sys.modules[__name__] this_module = sys.modules[__name__]
@@ -326,6 +329,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
djcelery.setup_loader() djcelery.setup_loader()
BROKER_URL = 'redis://localhost/' BROKER_URL = 'redis://localhost/'
CELERY_DEFAULT_QUEUE = 'default'
CELERY_TASK_SERIALIZER = 'json' CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json'] CELERY_ACCEPT_CONTENT = ['json']
@@ -335,6 +339,22 @@ CELERYD_TASK_SOFT_TIME_LIMIT = None
CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler'
CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERYBEAT_MAX_LOOP_INTERVAL = 60
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('jobs', Exchange('jobs'), routing_key='jobs'),
Broadcast('projects'),
)
CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs',
'routing_key': 'jobs'},
'awx.main.tasks.run_project_update': {'queue': 'projects'},
'awx.main.tasks.run_inventory_update': {'queue': 'jobs',
'routing_key': 'jobs'},
'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs',
'routing_key': 'jobs'},
'awx.main.tasks.run_system_job': {'queue': 'jobs',
'routing_key': 'jobs'}
})
CELERYBEAT_SCHEDULE = { CELERYBEAT_SCHEDULE = {
'tower_scheduler': { 'tower_scheduler': {
'task': 'awx.main.tasks.tower_periodic_scheduler', 'task': 'awx.main.tasks.tower_periodic_scheduler',

View File

@@ -6,7 +6,7 @@ azure==2.0.0rc2
Babel==2.2.0 Babel==2.2.0
billiard==3.3.0.16 billiard==3.3.0.16
boto==2.40.0 boto==2.40.0
celery==3.1.10 celery==3.1.23
cliff==1.15.0 cliff==1.15.0
cmd2==0.6.8 cmd2==0.6.8
d2to1==0.2.11 # TODO: Still needed? d2to1==0.2.11 # TODO: Still needed?

View File

@@ -10,3 +10,4 @@ pytest-cov
pytest-django pytest-django
pytest-pythonpath pytest-pythonpath
pytest-mock pytest-mock
flower

View File

@@ -11,6 +11,7 @@ services:
ports: ports:
- "8013:8013" - "8013:8013"
- "1936:1936" - "1936:1936"
- "5555:5555"
tower_1: tower_1:
image: gcr.io/ansible-tower-engineering/tower_devel:${TAG} image: gcr.io/ansible-tower-engineering/tower_devel:${TAG}
hostname: tower_1 hostname: tower_1

View File

@@ -12,6 +12,7 @@ services:
ports: ports:
- "8080:8080" - "8080:8080"
- "8013:8013" - "8013:8013"
- "5555:5555"
links: links:
- postgres - postgres
- memcached - memcached

View File

@@ -17,6 +17,11 @@ frontend localnodes
mode http mode http
default_backend nodes default_backend nodes
frontend flower
bind *:5555
mode http
default_backend flower_nodes
backend nodes backend nodes
mode http mode http
balance roundrobin balance roundrobin
@@ -29,6 +34,18 @@ backend nodes
server tower_2 tower_2:8013 check server tower_2 tower_2:8013 check
server tower_3 tower_3:8013 check server tower_3 tower_3:8013 check
backend flower_nodes
mode http
balance roundrobin
option forwardfor
option http-pretend-keepalive
http-request set-header X-Forwarded-Port %[dst_port]
http-request add-header X-Forwarded-Proto https if { ssl_fc }
#option httpchk HEAD / HTTP/1.1\r\nHost:localhost
server tower_1 tower_1:5555
server tower_2 tower_2:5555
server tower_3 tower_3:5555
listen stats listen stats
bind *:1936 bind *:1936
stats enable stats enable