From 705f8af440b6c21d03b82ff7c71f3c07b263b686 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 5 Apr 2017 10:42:47 -0400 Subject: [PATCH] Update views and serializers to support instance group (ramparts) * includes top level views for instances and instance groups and extending those views to be able to view running jobs * Associative endpoints on Organizations, Inventories, and Job Templates * Related and summary field entries where appropriate * Adding job model references to executing instance group * Fix up default queue properties for clustering from the settings file * Update production and default settings for instance queues in settings --- Makefile | 3 +- awx/api/serializers.py | 37 ++++- awx/api/urls.py | 19 +++ awx/api/views.py | 137 ++++++++++++++++++ awx/main/access.py | 12 ++ .../commands/instance_group_remove.py | 29 ++++ .../management/commands/list_instances.py | 10 +- .../management/commands/register_queue.py | 10 +- .../management/commands/unregister_queue.py | 23 +++ awx/main/models/ha.py | 16 +- awx/main/tasks.py | 8 +- awx/settings/defaults.py | 31 ++-- tools/docker-compose/supervisor.conf | 2 +- 13 files changed, 303 insertions(+), 34 deletions(-) create mode 100644 awx/main/management/commands/instance_group_remove.py create mode 100644 awx/main/management/commands/unregister_queue.py diff --git a/Makefile b/Makefile index 25654e4669..120009a2cf 100644 --- a/Makefile +++ b/Makefile @@ -336,6 +336,7 @@ init: . $(VENV_BASE)/tower/bin/activate; \ fi; \ tower-manage register_instance --hostname=$(COMPOSE_HOST); \ + tower-manage register_queue --queuename=tower --hostnames=$(COMPOSE_HOST);\ # Refresh development environment after pulling new code. refresh: clean requirements_dev version_file develop migrate @@ -438,7 +439,7 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q default,tower_scheduler,tower_broadcast_all,tower,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST) + $(PYTHON) manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q tower_scheduler,tower_broadcast_all,tower,$(COMPOSE_HOST) -n celery@$(COMPOSE_HOST) #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) # Run to start the zeromq callback receiver diff --git a/awx/api/serializers.py b/awx/api/serializers.py index af4629ee02..d25209a8e2 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -105,6 +105,7 @@ SUMMARIZABLE_FK_FIELDS = { 'source_script': ('name', 'description'), 'role': ('id', 'role_field'), 'notification_template': DEFAULT_SUMMARY_FIELDS, + 'instance_group': {'id', 'name'} } @@ -895,6 +896,7 @@ class OrganizationSerializer(BaseSerializer): notification_templates_error = self.reverse('api:organization_notification_templates_error_list', kwargs={'pk': obj.pk}), object_roles = self.reverse('api:organization_object_roles_list', kwargs={'pk': obj.pk}), access_list = self.reverse('api:organization_access_list', kwargs={'pk': obj.pk}), + instance_groups = self.reverse('api:organization_instance_groups_list', kwargs={'pk': obj.pk}), )) return res @@ -1128,6 +1130,7 @@ class InventorySerializer(BaseSerializerWithVariables): ad_hoc_commands = self.reverse('api:inventory_ad_hoc_commands_list', kwargs={'pk': obj.pk}), access_list = self.reverse('api:inventory_access_list', kwargs={'pk': obj.pk}), object_roles = self.reverse('api:inventory_object_roles_list', kwargs={'pk': obj.pk}), + instance_groups = self.reverse('api:inventory_instance_groups_list', kwargs={'pk': obj.pk}), )) if obj.organization: res['organization'] = self.reverse('api:organization_detail', kwargs={'pk': obj.organization.pk}) @@ -2211,6 +2214,7 @@ class JobTemplateSerializer(JobTemplateMixin, UnifiedJobTemplateSerializer, JobO survey_spec = self.reverse('api:job_template_survey_spec', kwargs={'pk': obj.pk}), labels = self.reverse('api:job_template_label_list', kwargs={'pk': obj.pk}), object_roles = self.reverse('api:job_template_object_roles_list', kwargs={'pk': obj.pk}), + instance_groups = self.reverse('api:job_template_instance_groups_list', kwargs={'pk': obj.pk}), )) if obj.host_config_key: res['callback'] = self.reverse('api:job_template_callback', kwargs={'pk': obj.pk}) @@ -2263,7 +2267,7 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer): fields = ('*', 'job_template', 'passwords_needed_to_start', 'ask_variables_on_launch', 'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch', 'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch', - 'allow_simultaneous', 'artifacts', 'scm_revision') + 'allow_simultaneous', 'artifacts', 'scm_revision', 'instance_group') def get_related(self, obj): res = super(JobSerializer, self).get_related(obj) @@ -3230,6 +3234,37 @@ class ScheduleSerializer(BaseSerializer): return value +class InstanceSerializer(BaseSerializer): + + consumed_capacity = serializers.SerializerMethodField() + + class Meta: + model = Instance + fields = ("related", "id", "uuid", "hostname", "created", "modified", "version", "capacity", "consumed_capacity") + + def get_related(self, obj): + res = super(InstanceSerializer, self).get_related(obj) + res['jobs'] = self.reverse('api:instance_unified_jobs_list', kwargs={'pk': obj.pk}) + res['instance_groups'] = self.reverse('api:instance_instance_groups_list', kwargs={'pk': obj.pk}) + return res + + def get_consumed_capacity(self, obj): + return obj.consumed_capacity + + +class InstanceGroupSerializer(BaseSerializer): + + class Meta: + model = InstanceGroup + fields = ("related", "id", "name", "created", "modified", "capacity", "consumed_capacity") + + def get_related(self, obj): + res = super(InstanceGroupSerializer, self).get_related(obj) + res['jobs'] = self.reverse('api:instance_group_unified_jobs_list', kwargs={'pk': obj.pk}) + res['instances'] = self.reverse('api:instance_group_instance_list', kwargs={'pk': obj.pk}) + return res + + class ActivityStreamSerializer(BaseSerializer): changes = serializers.SerializerMethodField() diff --git a/awx/api/urls.py b/awx/api/urls.py index eb7e331cee..f134e271f1 100644 --- a/awx/api/urls.py +++ b/awx/api/urls.py @@ -26,6 +26,7 @@ organization_urls = patterns('awx.api.views', url(r'^(?P[0-9]+)/notification_templates_any/$', 'organization_notification_templates_any_list'), url(r'^(?P[0-9]+)/notification_templates_error/$', 'organization_notification_templates_error_list'), url(r'^(?P[0-9]+)/notification_templates_success/$', 'organization_notification_templates_success_list'), + url(r'^(?P[0-9]+)/instance_groups/$', 'organization_instance_groups_list'), url(r'^(?P[0-9]+)/object_roles/$', 'organization_object_roles_list'), url(r'^(?P[0-9]+)/access_list/$', 'organization_access_list'), ) @@ -99,6 +100,7 @@ inventory_urls = patterns('awx.api.views', url(r'^(?P[0-9]+)/ad_hoc_commands/$', 'inventory_ad_hoc_commands_list'), url(r'^(?P[0-9]+)/access_list/$', 'inventory_access_list'), url(r'^(?P[0-9]+)/object_roles/$', 'inventory_object_roles_list'), + url(r'^(?P[0-9]+)/instance_groups/$', 'inventory_instance_groups_list'), #url(r'^(?P[0-9]+)/single_fact/$', 'inventory_single_fact_view'), ) @@ -200,6 +202,7 @@ job_template_urls = patterns('awx.api.views', url(r'^(?P[0-9]+)/notification_templates_any/$', 'job_template_notification_templates_any_list'), url(r'^(?P[0-9]+)/notification_templates_error/$', 'job_template_notification_templates_error_list'), url(r'^(?P[0-9]+)/notification_templates_success/$', 'job_template_notification_templates_success_list'), + url(r'^(?P[0-9]+)/instance_groups/$', 'job_template_instance_groups_list'), url(r'^(?P[0-9]+)/access_list/$', 'job_template_access_list'), url(r'^(?P[0-9]+)/object_roles/$', 'job_template_object_roles_list'), url(r'^(?P[0-9]+)/labels/$', 'job_template_label_list'), @@ -338,6 +341,20 @@ activity_stream_urls = patterns('awx.api.views', url(r'^(?P[0-9]+)/$', 'activity_stream_detail'), ) +instance_urls = patterns('awx.api.views', + url(r'^$', 'instance_list'), + url(r'^(?P[0-9]+)/$', 'instance_detail'), + url(r'^(?P[0-9]+)/jobs/$', 'instance_unified_jobs_list'), + url(r'^(?P[0-9]+)/instance_groups/$', 'instance_instance_groups_list'), +) + +instance_group_urls = patterns('awx.api.views', + url(r'^$', 'instance_group_list'), + url(r'^(?P[0-9]+)/$', 'instance_group_detail'), + url(r'^(?P[0-9]+)/jobs/$', 'instance_group_unified_jobs_list'), + url(r'^(?P[0-9]+)/instances/$', 'instance_group_instance_list'), +) + v1_urls = patterns('awx.api.views', url(r'^$', 'api_v1_root_view'), url(r'^ping/$', 'api_v1_ping_view'), @@ -348,6 +365,8 @@ v1_urls = patterns('awx.api.views', url(r'^dashboard/$', 'dashboard_view'), url(r'^dashboard/graphs/jobs/$','dashboard_jobs_graph_view'), url(r'^settings/', include('awx.conf.urls')), + url(r'^instances/', include(instance_urls)), + url(r'^instance_groups/', include(instance_group_urls)), url(r'^schedules/', include(schedule_urls)), url(r'^organizations/', include(organization_urls)), url(r'^users/', include(user_urls)), diff --git a/awx/api/views.py b/awx/api/views.py index c496f99d9f..c966f4a765 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -159,6 +159,8 @@ class ApiVersionRootView(APIView): data = OrderedDict() data['authtoken'] = reverse('api:auth_token_view', request=request) data['ping'] = reverse('api:api_v1_ping_view', request=request) + data['instances'] = reverse('api:instance_list', request=request) + data['instance_groups'] = reverse('api:instance_group_list', request=request) data['config'] = reverse('api:api_v1_config_view', request=request) data['settings'] = reverse('api:setting_category_list', request=request) data['me'] = reverse('api:user_me_list', request=request) @@ -238,6 +240,11 @@ class ApiV1PingView(APIView): response['instances'].append(dict(node=instance.hostname, heartbeat=instance.modified, capacity=instance.capacity, version=instance.version)) response['instances'].sort() + response['rampart_groups'] = [] + for instance_group in InstanceGroup.objects.all(): + response['rampart_groups'].append(dict(name=instance_group.name, + capacity=instance_group.capacity, + instances=[x.hostname for x in instance_group.instances.all()])) return Response(response) @@ -498,6 +505,88 @@ class DashboardJobsGraphView(APIView): return Response(dashboard_data) +class InstanceList(ListAPIView): + + view_name = _("Instances") + model = Instance + serializer_class = InstanceSerializer + new_in_320 = True + + +class InstanceDetail(RetrieveAPIView): + + view_name = _("Instance Detail") + model = Instance + serializer_class = InstanceSerializer + new_in_320 = True + + +class InstanceUnifiedJobsList(SubListAPIView): + + view_name = _("Instance Running Jobs") + model = UnifiedJob + serializer_class = UnifiedJobSerializer + parent_model = Instance + new_in_320 = True + + def get_queryset(self): + po = self.get_parent_object() + qs = get_user_queryset(self.request.user, UnifiedJob) + qs = qs.filter(execution_node=po.hostname, status__in=('running', 'waiting', 'pending')) + return qs + + +class InstanceInstanceGroupsList(SubListAPIView): + + view_name = _("Instance's Instance Groups") + model = InstanceGroup + serializer_class = InstanceGroupSerializer + parent_model = Instance + new_in_320 = True + relationship = 'rampart_groups' + + +class InstanceGroupList(ListAPIView): + + view_name = _("Instance Groups") + model = InstanceGroup + serializer_class = InstanceGroupSerializer + new_in_320 = True + + +class InstanceGroupDetail(RetrieveAPIView): + + view_name = _("Instance Group Detail") + model = InstanceGroup + serializer_class = InstanceGroupSerializer + new_in_320 = True + + +class InstanceGroupUnifiedJobsList(SubListAPIView): + + view_name = _("Instance Group Running Jobs") + model = UnifiedJob + serializer_class = UnifiedJobSerializer + parent_model = InstanceGroup + new_in_320 = True + + def get_queryset(self): + po = self.get_parent_object() + qs = get_user_queryset(self.request.user, UnifiedJob) + qs = qs.filter(instance_group=po, status__in=('running', 'waiting', 'pending')) + return qs + + +class InstanceGroupInstanceList(SubListAPIView): + + view_name = _("Instance Group's Instances") + model = Instance + serializer_class = InstanceSerializer + parent_model = InstanceGroup + new_in_320 = True + relationship = "instances" + + class ScheduleList(ListAPIView): view_name = _("Schedules") @@ -904,6 +993,22 @@ class OrganizationNotificationTemplatesSuccessList(SubListCreateAttachDetachAPIV new_in_300 = True +class OrganizationInstanceGroupsList(SubListCreateAttachDetachAPIView): + + model = InstanceGroup + serializer_class = InstanceGroupSerializer + parent_model = Organization + relationship = 'instance_groups' + new_in_320 = True + + def post(self, request, *args, **kwargs): + sub_id = request.data.get('id', None) + if not sub_id: + return Response(dict(msg=_("Instance Group 'id' field is missing.")), + status=status.HTTP_400_BAD_REQUEST) + return super(OrganizationInstanceGroupsList, self).post(request, *args, **kwargs) + + class OrganizationAccessList(ResourceAccessList): model = User # needs to be User for AccessLists's @@ -1747,6 +1852,22 @@ class InventoryActivityStreamList(ActivityStreamEnforcementMixin, SubListAPIView return qs.filter(Q(inventory=parent) | Q(host__in=parent.hosts.all()) | Q(group__in=parent.groups.all())) +class InventoryInstanceGroupsList(SubListCreateAttachDetachAPIView): + + model = InstanceGroup + serializer_class = InstanceGroupSerializer + parent_model = Inventory + relationship = 'instance_groups' + new_in_320 = True + + def post(self, request, *args, **kwargs): + sub_id = request.data.get('id', None) + if not sub_id: + return Response(dict(msg=_("Instance Group 'id' field is missing.")), + status=status.HTTP_400_BAD_REQUEST) + return super(InventoryInstanceGroupsList, self).post(request, *args, **kwargs) + + class InventoryAccessList(ResourceAccessList): model = User # needs to be User for AccessLists's @@ -2888,6 +3009,22 @@ class JobTemplateJobsList(SubListCreateAPIView): parent_key = 'job_template' +class JobTemplateInstanceGroupsList(SubListCreateAttachDetachAPIView): + + model = InstanceGroup + serializer_class = InstanceGroupSerializer + parent_model = JobTemplate + relationship = 'instance_groups' + new_in_320 = True + + def post(self, request, *args, **kwargs): + sub_id = request.data.get('id', None) + if not sub_id: + return Response(dict(msg=_("Instance Group 'id' field is missing.")), + status=status.HTTP_400_BAD_REQUEST) + return super(JobTemplateInstanceGroupsList, self).post(request, *args, **kwargs) + + class JobTemplateAccessList(ResourceAccessList): model = User # needs to be User for AccessLists's diff --git a/awx/main/access.py b/awx/main/access.py index de118499c5..a96d40de40 100644 --- a/awx/main/access.py +++ b/awx/main/access.py @@ -389,6 +389,16 @@ class BaseAccess(object): return False +class InstanceAccess(BaseAccess): + + model = Instance + + +class InstanceGroupAccess(BaseAccess): + + model = InstanceGroup + + class UserAccess(BaseAccess): ''' I can see user records when: @@ -2314,3 +2324,5 @@ register_access(WorkflowJobTemplateNode, WorkflowJobTemplateNodeAccess) register_access(WorkflowJobNode, WorkflowJobNodeAccess) register_access(WorkflowJobTemplate, WorkflowJobTemplateAccess) register_access(WorkflowJob, WorkflowJobAccess) +register_access(Instance, InstanceAccess) +register_access(InstanceGroup, InstanceGroupAccess) diff --git a/awx/main/management/commands/instance_group_remove.py b/awx/main/management/commands/instance_group_remove.py new file mode 100644 index 0000000000..22c6f2815a --- /dev/null +++ b/awx/main/management/commands/instance_group_remove.py @@ -0,0 +1,29 @@ +# Copyright (c) 2017 Ansible Tower by Red Hat +# All Rights Reserved. +from awx.main.models import Instance, InstanceGroup + +from optparse import make_option +from django.core.management.base import BaseCommand + + +class Command(BaseCommand): + + option_list = BaseCommand.option_list + ( + make_option('--queuename', dest='queuename', type='string', + help='Queue to be removed from'), + make_option('--hostname', dest='hostnames', type='string', + help='Host to remove from queue'), + ) + + def handle(self, **options): + ig = InstanceGroup.objects.filter(name=options.get('queuename')) + if not ig.exists(): + print("Queue doesn't exist") + ig = ig.first() + i = Instance.objects.filter(name=options.get("hostname")) + if not i.exists(): + print("Host doesn't exist") + i = i.first() + ig.instances.remove(i) + print("Instance removed from instance group") + diff --git a/awx/main/management/commands/list_instances.py b/awx/main/management/commands/list_instances.py index e193a45dd0..f0140e441e 100644 --- a/awx/main/management/commands/list_instances.py +++ b/awx/main/management/commands/list_instances.py @@ -1,7 +1,7 @@ # Copyright (c) 2015 Ansible, Inc. # All Rights Reserved -from awx.main.models import Instance +from awx.main.models import Instance, InstanceGroup from django.core.management.base import NoArgsCommand @@ -13,4 +13,10 @@ class Command(NoArgsCommand): super(Command, self).__init__() for instance in Instance.objects.all(): - print("hostname: {}; created: {}; heartbeat: {}".format(instance.hostname, instance.created, instance.modified)) + print("hostname: {}; created: {}; heartbeat: {}; capacity: {}".format(instance.hostname, instance.created, + instance.modified, instance.capacity)) + for instance_group in InstanceGroup.objects.all(): + print("Instance Group: {}; created: {}; capacity: {}; members: {}".format(instance_group.name, + instance_group.created, + instance_group.capacity, + [x.hostname for x in instance_group.instances.all()])) diff --git a/awx/main/management/commands/register_queue.py b/awx/main/management/commands/register_queue.py index 0772e692f4..f4d8149dd6 100644 --- a/awx/main/management/commands/register_queue.py +++ b/awx/main/management/commands/register_queue.py @@ -29,11 +29,11 @@ class Command(BaseCommand): instance_list = [x.strip() for x in options.get('hostnames').split(",")] for inst_name in instance_list: instance = Instance.objects.filter(hostname=inst_name) - if instance.exists() and instance not in ig.instances: - ig.instances.add(instance) - print("Added instance {} to {}".format(instance, ig)) + if instance.exists() and instance not in ig.instances.all(): + ig.instances.add(instance[0]) + print("Added instance {} to {}".format(instance[0], ig)) elif not instance.exists(): - print("Instance does not exist: {}".format(instance)) + print("Instance does not exist: {}".format(inst_name)) sys.exit(1) else: - print("Instance already registered {}".format(instance)) + print("Instance already registered {}".format(instance[0])) diff --git a/awx/main/management/commands/unregister_queue.py b/awx/main/management/commands/unregister_queue.py new file mode 100644 index 0000000000..75499d5f94 --- /dev/null +++ b/awx/main/management/commands/unregister_queue.py @@ -0,0 +1,23 @@ +# Copyright (c) 2017 Ansible Tower by Red Hat +# All Rights Reserved. +from awx.main.models import InstanceGroup + +from optparse import make_option +from django.core.management.base import BaseCommand + + +class Command(BaseCommand): + + option_list = BaseCommand.option_list + ( + make_option('--queuename', dest='queuename', type='string', + help='Queue to create/update'), + ) + + def handle(self, **options): + ig = InstanceGroup.objects.filter(name=options.get('queuename')) + if not ig.exists(): + print("Instance group doesn't exist") + return + ig = ig.first() + ig.delete() + print("Instance Group Removed") diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 275689d905..00c22343f9 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -14,7 +14,7 @@ from awx.main.models.jobs import Job from awx.main.models.projects import ProjectUpdate from awx.main.models.unified_jobs import UnifiedJob -__all__ = ('Instance', 'JobOrigin', 'TowerScheduleState',) +__all__ = ('Instance', 'InstanceGroup', 'JobOrigin', 'TowerScheduleState',) class Instance(models.Model): @@ -34,6 +34,11 @@ class Instance(models.Model): class Meta: app_label = 'main' + @property + def consumed_capacity(self): + return sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname, + status__in=('running', 'waiting'))) + @property def role(self): # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing @@ -52,6 +57,15 @@ class InstanceGroup(models.Model): help_text=_('Instances that are members of this InstanceGroup'), ) + @property + def capacity(self): + return sum([x[0] for x in self.instances.values_list('capacity')]) + + @property + def consumed_capacity(self): + return sum(x.task_impact for x in UnifiedJob.objects.filter(instance_group=self, + status__in=('running', 'waiting'))) + class Meta: app_label = 'main' diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e9d7e2ee39..b5d068f778 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -102,7 +102,7 @@ def _clear_cache_keys(set_of_keys): cache.delete_many(set_of_keys) -@task(queue='broadcast_all') +@task(queue='tower_broadcast_all') def process_cache_changes(cache_keys): logger.warn('Processing cache changes, task args: {0.args!r} kwargs: {0.kwargs!r}'.format( process_cache_changes.request)) @@ -114,7 +114,7 @@ def process_cache_changes(cache_keys): break -@task(queue='default') +@task(queue='tower') def send_notifications(notification_list, job_id=None): if not isinstance(notification_list, list): raise TypeError("notification_list should be of type list") @@ -138,7 +138,7 @@ def send_notifications(notification_list, job_id=None): notification.save() -@task(bind=True, queue='default') +@task(bind=True, queue='tower') def run_administrative_checks(self): logger.warn("Running administrative checks.") if not settings.TOWER_ADMIN_ALERTS: @@ -160,7 +160,7 @@ def run_administrative_checks(self): fail_silently=True) -@task(bind=True, queue='default') +@task(bind=True, queue='tower') def cleanup_authtokens(self): logger.warn("Cleaning up expired authtokens.") AuthToken.objects.filter(expires__lt=now()).delete() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index bd8a62c66d..dcf6dbea2e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -400,7 +400,7 @@ os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199') djcelery.setup_loader() BROKER_URL = 'amqp://guest:guest@localhost:5672//' -CELERY_DEFAULT_QUEUE = 'default' +CELERY_DEFAULT_QUEUE = 'tower' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT = ['json'] @@ -414,25 +414,18 @@ CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERY_IMPORTS = ('awx.main.scheduler.tasks',) CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), - Queue('jobs', Exchange('jobs'), routing_key='jobs'), - Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#', durable=False), - Broadcast('broadcast_all') - # Projects use a fanout queue, this isn't super well supported + Queue('tower', Exchange('tower'), routing_key='tower'), + Queue('tower_scheduler', Exchange('scheduler', type='topic'), routing_key='tower_scheduler.job.#', durable=False), + Broadcast('tower_broadcast_all') ) -CELERY_ROUTES = {'awx.main.tasks.run_job': {'queue': 'jobs', - 'routing_key': 'jobs'}, - 'awx.main.tasks.run_project_update': {'queue': 'jobs', - 'routing_key': 'jobs'}, - 'awx.main.tasks.run_inventory_update': {'queue': 'jobs', - 'routing_key': 'jobs'}, - 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs', - 'routing_key': 'jobs'}, - 'awx.main.tasks.run_system_job': {'queue': 'jobs', - 'routing_key': 'jobs'}, - 'awx.main.scheduler.tasks.run_job_launch': {'queue': 'scheduler', - 'routing_key': 'scheduler.job.launch'}, - 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler', - 'routing_key': 'scheduler.job.complete'}, +CELERY_ROUTES = {'awx.main.scheduler.tasks.run_fail_inconsistent_running_jobs': {'queue': 'tower', + 'routing_key': 'tower'}, + 'awx.main.scheduler.tasks.run_task_manager': {'queue': 'tower', + 'routing_key': 'tower'}, + 'awx.main.scheduler.tasks.run_job_launch': {'queue': 'tower_scheduler', + 'routing_key': 'tower_scheduler.job.launch'}, + 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'tower_scheduler', + 'routing_key': 'tower_scheduler.job.complete'}, 'awx.main.tasks.cluster_node_heartbeat': {'queue': 'default', 'routing_key': 'cluster.heartbeat'}, 'awx.main.tasks.purge_old_stdout_files': {'queue': 'default', diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index aab7d8aeb7..b46d05bf4c 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -4,7 +4,7 @@ minfds = 4096 nodaemon=true [program:celeryd] -command = python manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=/celerybeat-schedule -Q projects,jobs,default,scheduler,broadcast_all,%(ENV_HOSTNAME)s -n celery@%(ENV_HOSTNAME)s +command = python manage.py celeryd -l DEBUG -B --autoreload --autoscale=20,3 --schedule=/celerybeat-schedule -Q tower_scheduler,tower_broadcast_all,tower,%(ENV_HOSTNAME)s -n celery@%(ENV_HOSTNAME)s autostart = true autorestart = true redirect_stderr=true