From e720fe5dd07845d1493d4ac28f88f531684ec2c2 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Tue, 22 May 2018 18:09:29 -0400 Subject: [PATCH 1/6] decide the node a job will run early * Deciding the Instance that a Job runs on at celery task run-time makes it hard to evenly distribute tasks among Instances. Instead, the task manager will look at the world of running jobs and choose an instance node to run on; applying a deterministic job distribution algo. --- awx/main/models/ha.py | 23 ++++++++++++ awx/main/models/unified_jobs.py | 2 +- awx/main/scheduler/task_manager.py | 57 ++++++++++++++++++++++-------- awx/main/tasks.py | 4 ++- 4 files changed, 70 insertions(+), 16 deletions(-) diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index 42e8117061..f158582871 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -92,6 +92,10 @@ class Instance(BaseModel): return sum(x.task_impact for x in UnifiedJob.objects.filter(execution_node=self.hostname, status__in=('running', 'waiting'))) + @property + def remaining_capacity(self): + return self.capacity - self.consumed_capacity + @property def role(self): # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing @@ -187,6 +191,25 @@ class InstanceGroup(BaseModel, RelatedJobsMixin): validate_queuename(self.name) return self.name + def fit_task_to_most_remaining_capacity_instance(self, task): + instance_most_capacity = None + for i in self.instances.order_by('hostname'): + if i.remaining_capacity >= task.task_impact and \ + (instance_most_capacity is None or + i.remaining_capacity > instance_most_capacity.remaining_capacity): + instance_most_capacity = i + return instance_most_capacity + + def find_largest_idle_instance(self): + largest_instance = None + for i in self.instances.order_by('hostname'): + if i.jobs_running == 0: + if largest_instance is None: + largest_instance = i + elif i.capacity > largest_instance.capacity: + largest_instance = i + return largest_instance + class 
TowerScheduleState(SingletonModel): schedule_last_run = models.DateTimeField(auto_now_add=True) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index fd3d6f4082..750472323b 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1228,9 +1228,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique raise RuntimeError("Expected celery_task_id to be set on model.") kwargs['task_id'] = self.celery_task_id task_class = self._get_task_class() + args = [self.pk] from awx.main.models.ha import InstanceGroup ig = InstanceGroup.objects.get(name=queue) - args = [self.pk] if ig.controller_id: if self.supports_isolation(): # case of jobs and ad hoc commands isolated_instance = ig.instances.order_by('-capacity').first() diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 810fbafdac..1408601b79 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -234,7 +234,7 @@ class TaskManager(): def get_dependent_jobs_for_inv_and_proj_update(self, job_obj): return [{'type': j.model_to_str(), 'id': j.id} for j in job_obj.dependent_jobs.all()] - def start_task(self, task, rampart_group, dependent_tasks=None): + def start_task(self, task, rampart_group, dependent_tasks=None, instance=None): from awx.main.tasks import handle_work_error, handle_work_success dependent_tasks = dependent_tasks or [] @@ -269,7 +269,11 @@ class TaskManager(): task.log_format, task.instance_group_id, rampart_group.controller_id) else: task.instance_group = rampart_group - logger.info('Submitting %s to instance group %s.', task.log_format, task.instance_group_id) + if instance is not None: + task.execution_node = instance.hostname + logger.debug(six.text_type("Dependent {} is blocked from running").format(task.log_format)) + logger.info(six.text_type('Submitting {} to <{},{}>.').format( + task.log_format, task.instance_group_id, task.execution_node)) 
with disable_activity_stream(): task.celery_task_id = str(uuid.uuid4()) task.save() @@ -280,8 +284,8 @@ class TaskManager(): def post_commit(): task.websocket_emit_status(task.status) if task.status != 'failed': - if rampart_group is not None: - actual_queue=rampart_group.name + if instance is not None: + actual_queue=instance.hostname else: actual_queue=settings.CELERY_DEFAULT_QUEUE task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=actual_queue) @@ -433,17 +437,32 @@ class TaskManager(): continue preferred_instance_groups = task.preferred_instance_groups found_acceptable_queue = False + idle_instance_that_fits = None for rampart_group in preferred_instance_groups: + if idle_instance_that_fits is None: + idle_instance_that_fits = rampart_group.find_largest_idle_instance() if self.get_remaining_capacity(rampart_group.name) <= 0: logger.debug(six.text_type("Skipping group {} capacity <= 0").format(rampart_group.name)) continue - if not self.would_exceed_capacity(task, rampart_group.name): - logger.debug(six.text_type("Starting dependent {} in group {}").format(task.log_format, rampart_group.name)) + + execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task) + if execution_instance: + logger.debug(six.text_type("Starting dependent {} in group {} instance {}").format( + task.log_format, rampart_group.name, execution_instance.hostname)) + elif not execution_instance and idle_instance_that_fits: + execution_instance = idle_instance_that_fits + logger.debug(six.text_type("Starting dependent {} in group {} on idle instance {}").format( + task.log_format, rampart_group.name, execution_instance.hostname)) + if execution_instance: self.graph[rampart_group.name]['graph'].add_job(task) tasks_to_fail = filter(lambda t: t != task, dependency_tasks) tasks_to_fail += [dependent_task] - self.start_task(task, rampart_group, tasks_to_fail) + self.start_task(task, rampart_group, tasks_to_fail, 
execution_instance) found_acceptable_queue = True + break + else: + logger.debug(six.text_type("No instance available in group {} to run job {} w/ capacity requirement {}").format( + rampart_group.name, task.log_format, task.task_impact)) if not found_acceptable_queue: logger.debug(six.text_type("Dependent {} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format)) @@ -455,25 +474,35 @@ class TaskManager(): continue preferred_instance_groups = task.preferred_instance_groups found_acceptable_queue = False + idle_instance_that_fits = None if isinstance(task, WorkflowJob): - self.start_task(task, None, task.get_jobs_fail_chain()) + self.start_task(task, None, task.get_jobs_fail_chain(), None) continue for rampart_group in preferred_instance_groups: + if idle_instance_that_fits is None: + idle_instance_that_fits = rampart_group.find_largest_idle_instance() remaining_capacity = self.get_remaining_capacity(rampart_group.name) if remaining_capacity <= 0: logger.debug(six.text_type("Skipping group {}, remaining_capacity {} <= 0").format( rampart_group.name, remaining_capacity)) continue - if not self.would_exceed_capacity(task, rampart_group.name): - logger.debug(six.text_type("Starting {} in group {} (remaining_capacity={})").format( - task.log_format, rampart_group.name, remaining_capacity)) + + execution_instance = rampart_group.fit_task_to_most_remaining_capacity_instance(task) + if execution_instance: + logger.debug(six.text_type("Starting {} in group {} instance {} (remaining_capacity={})").format( + task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity)) + elif not execution_instance and idle_instance_that_fits: + execution_instance = idle_instance_that_fits + logger.debug(six.text_type("Starting {} in group {} instance {} (remaining_capacity={})").format( + task.log_format, rampart_group.name, execution_instance.hostname, remaining_capacity)) + if execution_instance: 
self.graph[rampart_group.name]['graph'].add_job(task) - self.start_task(task, rampart_group, task.get_jobs_fail_chain()) + self.start_task(task, rampart_group, task.get_jobs_fail_chain(), execution_instance) found_acceptable_queue = True break else: - logger.debug(six.text_type("Not enough capacity to run {} on {} (remaining_capacity={})").format( - task.log_format, rampart_group.name, remaining_capacity)) + logger.debug(six.text_type("No instance available in group {} to run job {} w/ capacity requirement {}").format( + rampart_group.name, task.log_format, task.task_impact)) if not found_acceptable_queue: logger.debug(six.text_type("{} couldn't be scheduled on graph, waiting for next cycle").format(task.log_format)) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 613caa6320..d1a06d2af7 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -872,10 +872,12 @@ class BaseTask(Task): ''' Run the job/task and capture its output. ''' + ''' execution_node = settings.CLUSTER_HOST_ID if isolated_host is not None: execution_node = isolated_host - instance = self.update_model(pk, status='running', execution_node=execution_node, + ''' + instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords instance.websocket_emit_status("running") From 8d352a4edfb54b46b8c441e2b0aad643ebd9103e Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 25 May 2018 14:53:24 -0400 Subject: [PATCH 2/6] conform isolated system to new early node choice * Randomly chose an instance in the controller instance group for which to control the isolated node run. 
Note the chosen instance via a job controller_node field --- awx/api/serializers.py | 7 ++--- .../0040_v330_unifiedjob_controller_node.py | 20 ++++++++++++++ awx/main/models/unified_jobs.py | 26 +++++++++++-------- awx/main/scheduler/task_manager.py | 22 +++++++++++----- awx/main/tasks.py | 9 +++---- 5 files changed, 57 insertions(+), 27 deletions(-) create mode 100644 awx/main/migrations/0040_v330_unifiedjob_controller_node.py diff --git a/awx/api/serializers.py b/awx/api/serializers.py index e7d09f8dde..2376956da5 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -702,7 +702,8 @@ class UnifiedJobSerializer(BaseSerializer): model = UnifiedJob fields = ('*', 'unified_job_template', 'launch_type', 'status', 'failed', 'started', 'finished', 'elapsed', 'job_args', - 'job_cwd', 'job_env', 'job_explanation', 'execution_node', + 'job_cwd', 'job_env', 'job_explanation', + 'execution_node', 'controller_node', 'result_traceback', 'event_processing_finished') extra_kwargs = { 'unified_job_template': { @@ -3434,7 +3435,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer): class Meta: model = WorkflowJob fields = ('*', 'workflow_job_template', 'extra_vars', 'allow_simultaneous', - '-execution_node', '-event_processing_finished',) + '-execution_node', '-event_processing_finished', '-controller_node',) def get_related(self, obj): res = super(WorkflowJobSerializer, self).get_related(obj) @@ -3463,7 +3464,7 @@ class WorkflowJobSerializer(LabelsListMixin, UnifiedJobSerializer): class WorkflowJobListSerializer(WorkflowJobSerializer, UnifiedJobListSerializer): class Meta: - fields = ('*', '-execution_node',) + fields = ('*', '-execution_node', '-controller_node',) class WorkflowJobCancelSerializer(WorkflowJobSerializer): diff --git a/awx/main/migrations/0040_v330_unifiedjob_controller_node.py b/awx/main/migrations/0040_v330_unifiedjob_controller_node.py new file mode 100644 index 0000000000..8b127dd06d --- /dev/null +++ 
b/awx/main/migrations/0040_v330_unifiedjob_controller_node.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.11.11 on 2018-05-25 18:58 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0039_v330_custom_venv_help_text'), + ] + + operations = [ + migrations.AddField( + model_name='unifiedjob', + name='controller_node', + field=models.TextField(blank=True, default=b'', editable=False, help_text='The instance that managed the isolated execution environment.'), + ), + ] diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 750472323b..62c5fc4e42 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -507,7 +507,8 @@ class StdoutMaxBytesExceeded(Exception): self.supported = supported -class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin): +class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique, + UnifiedJobTypeStringMixin, TaskManagerUnifiedJobMixin): ''' Concrete base class for unified job run by the task engine. 
''' @@ -571,6 +572,12 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique editable=False, help_text=_("The node the job executed on."), ) + controller_node = models.TextField( + blank=True, + default='', + editable=False, + help_text=_("The instance that managed the isolated execution environment."), + ) notifications = models.ManyToManyField( 'Notification', editable=False, @@ -1228,17 +1235,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique raise RuntimeError("Expected celery_task_id to be set on model.") kwargs['task_id'] = self.celery_task_id task_class = self._get_task_class() - args = [self.pk] - from awx.main.models.ha import InstanceGroup - ig = InstanceGroup.objects.get(name=queue) - if ig.controller_id: - if self.supports_isolation(): # case of jobs and ad hoc commands - isolated_instance = ig.instances.order_by('-capacity').first() - args.append(isolated_instance.hostname) - else: # proj & inv updates, system jobs run on controller - queue = ig.controller.name kwargs['queue'] = queue - task_class().apply_async(args, opts, **kwargs) + task_class().apply_async([self.pk], opts, **kwargs) def start(self, error_callback, success_callback, **kwargs): ''' @@ -1400,3 +1398,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique r['{}_schedule_id'.format(name)] = self.schedule.pk r['{}_schedule_name'.format(name)] = self.schedule.name return r + + def get_celery_queue_name(self): + return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE + + def get_isolated_execution_node_name(self): + return self.execution_node if self.controller_node else None diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 1408601b79..9ad7a79652 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -7,6 +7,7 @@ import logging import uuid import json import six +import random from sets import 
Set # Django @@ -265,8 +266,16 @@ class TaskManager(): elif not task.supports_isolation() and rampart_group.controller_id: # non-Ansible jobs on isolated instances run on controller task.instance_group = rampart_group.controller - logger.info('Submitting isolated %s to queue %s via %s.', - task.log_format, task.instance_group_id, rampart_group.controller_id) + task.execution_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True))) + logger.info(six.text_type('Submitting isolated {} to queue {}.').format( + task.log_format, task.instance_group.name, task.execution_node)) + elif task.supports_isolation() and rampart_group.controller_id: + # TODO: Select from only online nodes in the controller node + task.instance_group = rampart_group + task.execution_node = instance.hostname + task.controller_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True))) + logger.info(six.text_type('Submitting isolated {} to queue {} controlled by {}.').format( + task.log_format, task.execution_node, task.controller_node)) else: task.instance_group = rampart_group if instance is not None: @@ -284,11 +293,10 @@ class TaskManager(): def post_commit(): task.websocket_emit_status(task.status) if task.status != 'failed': - if instance is not None: - actual_queue=instance.hostname - else: - actual_queue=settings.CELERY_DEFAULT_QUEUE - task.start_celery_task(opts, error_callback=error_handler, success_callback=success_handler, queue=actual_queue) + task.start_celery_task(opts, + error_callback=error_handler, + success_callback=success_handler, + queue=task.get_celery_queue_name()) connection.on_commit(post_commit) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index d1a06d2af7..e9ff60b0da 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -868,15 +868,10 @@ class BaseTask(Task): ''' @with_path_cleanup - def run(self, pk, isolated_host=None, **kwargs): + def run(self, pk, **kwargs): ''' Run 
the job/task and capture its output. ''' - ''' - execution_node = settings.CLUSTER_HOST_ID - if isolated_host is not None: - execution_node = isolated_host - ''' instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords @@ -886,6 +881,8 @@ class BaseTask(Task): extra_update_fields = {} event_ct = 0 stdout_handle = None + isolated_host = instance.get_isolated_execution_node_name() + try: kwargs['isolated'] = isolated_host is not None self.pre_run_hook(instance, **kwargs) From 9863fe71dcaa987ed28d70ac6873ece058344ab3 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Thu, 31 May 2018 09:37:38 -0400 Subject: [PATCH 3/6] do not require privileged iso container * The init call w/ privileged was causing my laptop to wig out. This changeset still functions w/ out requiring privileged access. --- tools/docker-isolated-override.yml | 4 +++- tools/docker-isolated/Dockerfile | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/docker-isolated-override.yml b/tools/docker-isolated-override.yml index 9f9d473def..55ff09e97d 100644 --- a/tools/docker-isolated-override.yml +++ b/tools/docker-isolated-override.yml @@ -14,4 +14,6 @@ services: - "../awx/main/expect:/awx_devel" - "../awx/lib:/awx_lib" - "/sys/fs/cgroup:/sys/fs/cgroup:ro" - privileged: true + tmpfs: + - "/tmp:exec" + - "/run" diff --git a/tools/docker-isolated/Dockerfile b/tools/docker-isolated/Dockerfile index 69af7526cc..53a8b67481 100644 --- a/tools/docker-isolated/Dockerfile +++ b/tools/docker-isolated/Dockerfile @@ -27,4 +27,7 @@ RUN ssh-keygen -A RUN mkdir -p /root/.ssh RUN touch /root/.ssh/authorized_keys +STOPSIGNAL SIGRTMIN+3 + + CMD ["/usr/sbin/init"] From 9d732cdbdf01cb94320873f8a01f309f9eb95767 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Thu, 31 May 2018 13:48:47 -0400 Subject: [PATCH 4/6] update unit and functional tests --- .../functional/models/test_unified_job.py | 4 +- .../task_management/test_rampart_groups.py | 21 +++++---- 
.../task_management/test_scheduler.py | 43 +++++++++++-------- awx/main/tests/unit/test_tasks.py | 17 +++++--- 4 files changed, 51 insertions(+), 34 deletions(-) diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py index 05d5aa318a..0ee56349fe 100644 --- a/awx/main/tests/functional/models/test_unified_job.py +++ b/awx/main/tests/functional/models/test_unified_job.py @@ -88,7 +88,7 @@ class TestIsolatedRuns: with mock.patch.object(job, '_get_task_class') as task_class: task_class.return_value = MockTaskClass job.start_celery_task([], error_callback, success_callback, 'thepentagon') - mock_async.assert_called_with([job.id, 'iso2'], [], + mock_async.assert_called_with([job.id], [], link_error=error_callback, link=success_callback, queue='thepentagon', @@ -100,7 +100,7 @@ class TestIsolatedRuns: with mock.patch.object(job, '_get_task_class') as task_class: task_class.return_value = MockTaskClass job.start_celery_task([], error_callback, success_callback, 'thepentagon') - mock_async.assert_called_with([job.id, 'iso1'], [], + mock_async.assert_called_with([job.id], [], link_error=error_callback, link=success_callback, queue='thepentagon', diff --git a/awx/main/tests/functional/task_management/test_rampart_groups.py b/awx/main/tests/functional/task_management/test_rampart_groups.py index ce79b78003..0eae5b2805 100644 --- a/awx/main/tests/functional/task_management/test_rampart_groups.py +++ b/awx/main/tests/functional/task_management/test_rampart_groups.py @@ -31,7 +31,7 @@ def test_multi_group_basic_job_launch(instance_factory, default_instance_group, mock_task_impact.return_value = 500 with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_has_calls([mock.call(j1, ig1, []), mock.call(j2, ig2, [])]) + TaskManager.start_task.assert_has_calls([mock.call(j1, ig1, [], i1), mock.call(j2, ig2, [], i2)]) @@ -65,15 +65,18 @@ def 
test_multi_group_with_shared_dependency(instance_factory, default_instance_g with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() pu = p.project_updates.first() - TaskManager.start_task.assert_called_once_with(pu, default_instance_group, [j1]) + TaskManager.start_task.assert_called_once_with(pu, + default_instance_group, + [j1], + default_instance_group.instances.all()[0]) pu.finished = pu.created + timedelta(seconds=1) pu.status = "successful" pu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_any_call(j1, ig1, []) - TaskManager.start_task.assert_any_call(j2, ig2, []) + TaskManager.start_task.assert_any_call(j1, ig1, [], i1) + TaskManager.start_task.assert_any_call(j2, ig2, [], i2) assert TaskManager.start_task.call_count == 2 @@ -85,7 +88,7 @@ def test_workflow_job_no_instancegroup(workflow_job_template_factory, default_in wfj.save() with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(wfj, None, []) + TaskManager.start_task.assert_called_once_with(wfj, None, [], None) assert wfj.instance_group is None @@ -131,8 +134,9 @@ def test_overcapacity_blocking_other_groups_unaffected(instance_factory, default mock_task_impact.return_value = 500 with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_has_calls([mock.call(j1, ig1, []), mock.call(j1_1, ig1, []), - mock.call(j2, ig2, [])]) + mock_job.assert_has_calls([mock.call(j1, ig1, [], i1), + mock.call(j1_1, ig1, [], i1), + mock.call(j2, ig2, [], i2)]) assert mock_job.call_count == 3 @@ -163,7 +167,8 @@ def test_failover_group_run(instance_factory, default_instance_group, mocker, mock_task_impact.return_value = 500 with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_has_calls([mock.call(j1, ig1, []), 
mock.call(j1_1, ig2, [])]) + mock_job.assert_has_calls([mock.call(j1, ig1, [], i1), + mock.call(j1_1, ig2, [], i2)]) assert mock_job.call_count == 2 diff --git a/awx/main/tests/functional/task_management/test_scheduler.py b/awx/main/tests/functional/task_management/test_scheduler.py index 82625533c7..813adff7cf 100644 --- a/awx/main/tests/functional/task_management/test_scheduler.py +++ b/awx/main/tests/functional/task_management/test_scheduler.py @@ -18,6 +18,7 @@ from awx.main.models.notifications import JobNotificationMixin @pytest.mark.django_db def test_single_job_scheduler_launch(default_instance_group, job_template_factory, mocker): + instance = default_instance_group.instances.all()[0] objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) @@ -26,11 +27,12 @@ def test_single_job_scheduler_launch(default_instance_group, job_template_factor j.save() with mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) + TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance) @pytest.mark.django_db def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_template_factory, mocker): + instance = default_instance_group.instances.all()[0] objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"]) @@ -42,16 +44,17 @@ def test_single_jt_multi_job_launch_blocks_last(default_instance_group, job_temp j2.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, default_instance_group, []) + TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [], instance) j1.status = "successful" j1.save() with 
mocker.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, default_instance_group, []) + TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [], instance) @pytest.mark.django_db def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group, job_template_factory, mocker): + instance = default_instance_group.instances.all()[0] objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start", "job_should_not_start"]) @@ -68,12 +71,13 @@ def test_single_jt_multi_job_launch_allow_simul_allowed(default_instance_group, j2.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group, []), - mock.call(j2, default_instance_group, [])]) + TaskManager.start_task.assert_has_calls([mock.call(j1, default_instance_group, [], instance), + mock.call(j2, default_instance_group, [], instance)]) @pytest.mark.django_db def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory, mocker): + instance = default_instance_group.instances.all()[0] objects1 = job_template_factory('jt1', organization='org1', project='proj1', inventory='inv1', credential='cred1', jobs=["job_should_start"]) @@ -91,20 +95,20 @@ def test_multi_jt_capacity_blocking(default_instance_group, job_template_factory mock_task_impact.return_value = 500 with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_called_once_with(j1, default_instance_group, []) + mock_job.assert_called_once_with(j1, default_instance_group, [], instance) j1.status = "successful" j1.save() with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job: tm.schedule() - mock_job.assert_called_once_with(j2, default_instance_group, []) - - + 
mock_job.assert_called_once_with(j2, default_instance_group, [], instance) + @pytest.mark.django_db def test_single_job_dependencies_project_launch(default_instance_group, job_template_factory, mocker): objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) + instance = default_instance_group.instances.all()[0] j = objects.jobs["job_should_start"] j.status = 'pending' j.save() @@ -121,12 +125,12 @@ def test_single_job_dependencies_project_launch(default_instance_group, job_temp mock_pu.assert_called_once_with(j) pu = [x for x in p.project_updates.all()] assert len(pu) == 1 - TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [j]) + TaskManager.start_task.assert_called_once_with(pu[0], default_instance_group, [j], instance) pu[0].status = "successful" pu[0].save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) + TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance) @pytest.mark.django_db @@ -134,6 +138,7 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group, objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) + instance = default_instance_group.instances.all()[0] j = objects.jobs["job_should_start"] j.status = 'pending' j.save() @@ -151,12 +156,12 @@ def test_single_job_dependencies_inventory_update_launch(default_instance_group, mock_iu.assert_called_once_with(j, ii) iu = [x for x in ii.inventory_updates.all()] assert len(iu) == 1 - TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [j]) + TaskManager.start_task.assert_called_once_with(iu[0], default_instance_group, [j], instance) iu[0].status = "successful" iu[0].save() with 
mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) + TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance) @pytest.mark.django_db @@ -164,6 +169,7 @@ def test_job_dependency_with_already_updated(default_instance_group, job_templat objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["job_should_start"]) + instance = default_instance_group.instances.all()[0] j = objects.jobs["job_should_start"] j.status = 'pending' j.save() @@ -185,11 +191,12 @@ def test_job_dependency_with_already_updated(default_instance_group, job_templat mock_iu.assert_not_called() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j, default_instance_group, []) + TaskManager.start_task.assert_called_once_with(j, default_instance_group, [], instance) @pytest.mark.django_db def test_shared_dependencies_launch(default_instance_group, job_template_factory, mocker, inventory_source_factory): + instance = default_instance_group.instances.all()[0] objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred', jobs=["first_job", "second_job"]) @@ -218,8 +225,8 @@ def test_shared_dependencies_launch(default_instance_group, job_template_factory TaskManager().schedule() pu = p.project_updates.first() iu = ii.inventory_updates.first() - TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1]), - mock.call(iu, default_instance_group, [pu, j1])]) + TaskManager.start_task.assert_has_calls([mock.call(pu, default_instance_group, [iu, j1], instance), + mock.call(iu, default_instance_group, [pu, j1], instance)]) pu.status = "successful" pu.finished = pu.created + timedelta(seconds=1) pu.save() @@ -228,12 +235,12 @@ def 
test_shared_dependencies_launch(default_instance_group, job_template_factory iu.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j1, default_instance_group, []) + TaskManager.start_task.assert_called_once_with(j1, default_instance_group, [], instance) j1.status = "successful" j1.save() with mock.patch("awx.main.scheduler.TaskManager.start_task"): TaskManager().schedule() - TaskManager.start_task.assert_called_once_with(j2, default_instance_group, []) + TaskManager.start_task.assert_called_once_with(j2, default_instance_group, [], instance) pu = [x for x in p.project_updates.all()] iu = [x for x in ii.inventory_updates.all()] assert len(pu) == 1 diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 07a111654b..0920767c57 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -196,7 +196,7 @@ def parse_extra_vars(args): return extra_vars -class TestJobExecution: +class TestJobExecution(object): """ For job runs, test that `ansible-playbook` is invoked with the proper arguments, environment variables, and pexpect passwords for a variety of @@ -440,7 +440,7 @@ class TestGenericRun(TestJobExecution): with pytest.raises(Exception): self.task.run(self.pk) for c in [ - mock.call(self.pk, execution_node=settings.CLUSTER_HOST_ID, status='running', start_args=''), + mock.call(self.pk, status='running', start_args=''), mock.call(self.pk, status='canceled') ]: assert c in self.task.update_model.call_args_list @@ -626,7 +626,12 @@ class TestAdhocRun(TestJobExecution): class TestIsolatedExecution(TestJobExecution): - REMOTE_HOST = 'some-isolated-host' + ISOLATED_HOST = 'some-isolated-host' + + def get_instance(self): + instance = super(TestIsolatedExecution, self).get_instance() + instance.get_isolated_execution_node_name = mock.Mock(return_value=self.ISOLATED_HOST) + return instance def 
test_with_ssh_credentials(self): ssh = CredentialType.defaults['ssh']() @@ -659,12 +664,12 @@ class TestIsolatedExecution(TestJobExecution): f.write(data) return ('successful', 0) self.run_pexpect.side_effect = _mock_job_artifacts - self.task.run(self.pk, self.REMOTE_HOST) + self.task.run(self.pk) playbook_run = self.run_pexpect.call_args_list[0][0] assert ' '.join(playbook_run[0]).startswith(' '.join([ 'ansible-playbook', 'run_isolated.yml', '-u', settings.AWX_ISOLATED_USERNAME, - '-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT), '-i', self.REMOTE_HOST + ',', + '-T', str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT), '-i', self.ISOLATED_HOST + ',', '-e', ])) extra_vars = playbook_run[0][playbook_run[0].index('-e') + 1] @@ -705,7 +710,7 @@ class TestIsolatedExecution(TestJobExecution): with mock.patch('requests.get') as mock_get: mock_get.return_value = mock.Mock(content=inventory) with pytest.raises(Exception): - self.task.run(self.pk, self.REMOTE_HOST) + self.task.run(self.pk, self.ISOLATED_HOST) class TestJobCredentials(TestJobExecution): From b94cf379f61f781ac496a4484cf5f59ae53c4615 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Fri, 1 Jun 2018 15:36:24 -0400 Subject: [PATCH 5/6] do not choose offline instances --- awx/main/models/ha.py | 13 +++- awx/main/models/unified_jobs.py | 4 +- awx/main/scheduler/task_manager.py | 4 +- awx/main/tests/unit/models/test_ha.py | 85 +++++++++++++++++++++++++++ awx/main/tests/unit/test_tasks.py | 4 +- 5 files changed, 101 insertions(+), 9 deletions(-) create mode 100644 awx/main/tests/unit/models/test_ha.py diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py index f158582871..16589c0a77 100644 --- a/awx/main/models/ha.py +++ b/awx/main/models/ha.py @@ -1,6 +1,8 @@ # Copyright (c) 2015 Ansible, Inc. # All Rights Reserved. 
+import six +import random from decimal import Decimal from django.core.exceptions import ValidationError @@ -11,7 +13,6 @@ from django.utils.translation import ugettext_lazy as _ from django.conf import settings from django.utils.timezone import now, timedelta -import six from solo.models import SingletonModel from awx import __version__ as awx_application_version @@ -193,7 +194,7 @@ class InstanceGroup(BaseModel, RelatedJobsMixin): def fit_task_to_most_remaining_capacity_instance(self, task): instance_most_capacity = None - for i in self.instances.order_by('hostname'): + for i in self.instances.filter(capacity__gt=0).order_by('hostname'): if i.remaining_capacity >= task.task_impact and \ (instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity): @@ -202,7 +203,7 @@ class InstanceGroup(BaseModel, RelatedJobsMixin): def find_largest_idle_instance(self): largest_instance = None - for i in self.instances.order_by('hostname'): + for i in self.instances.filter(capacity__gt=0).order_by('hostname'): if i.jobs_running == 0: if largest_instance is None: largest_instance = i @@ -210,6 +211,12 @@ class InstanceGroup(BaseModel, RelatedJobsMixin): largest_instance = i return largest_instance + def choose_online_controller_node(self): + return random.choice(list(self.controller + .instances + .filter(capacity__gt=0) + .values_list('hostname', flat=True))) + class TowerScheduleState(SingletonModel): schedule_last_run = models.DateTimeField(auto_now_add=True) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 62c5fc4e42..1be11ebcca 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -1402,5 +1402,5 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique def get_celery_queue_name(self): return self.controller_node or self.execution_node or settings.CELERY_DEFAULT_QUEUE - def get_isolated_execution_node_name(self): - return self.execution_node 
if self.controller_node else None + def is_isolated(self): + return bool(self.controller_node) diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 9ad7a79652..198f0e3652 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -270,17 +270,15 @@ class TaskManager(): logger.info(six.text_type('Submitting isolated {} to queue {}.').format( task.log_format, task.instance_group.name, task.execution_node)) elif task.supports_isolation() and rampart_group.controller_id: - # TODO: Select from only online nodes in the controller node task.instance_group = rampart_group task.execution_node = instance.hostname - task.controller_node = random.choice(list(rampart_group.controller.instances.all().values_list('hostname', flat=True))) + task.controller_node = rampart_group.choose_online_controller_node() logger.info(six.text_type('Submitting isolated {} to queue {} controlled by {}.').format( task.log_format, task.execution_node, task.controller_node)) else: task.instance_group = rampart_group if instance is not None: task.execution_node = instance.hostname - logger.debug(six.text_type("Dependent {} is blocked from running").format(task.log_format)) logger.info(six.text_type('Submitting {} to <{},{}>.').format( task.log_format, task.instance_group_id, task.execution_node)) with disable_activity_stream(): diff --git a/awx/main/tests/unit/models/test_ha.py b/awx/main/tests/unit/models/test_ha.py new file mode 100644 index 0000000000..4ceb83c77a --- /dev/null +++ b/awx/main/tests/unit/models/test_ha.py @@ -0,0 +1,85 @@ +import pytest +import mock +from mock import Mock + +from awx.main.models import ( + Job, + InstanceGroup, +) + + +def T(impact): + j = mock.Mock(Job()) + j.task_impact = impact + return j + + +def Is(param): + ''' + param: + [remaining_capacity1, remaining_capacity2, remaining_capacity3, ...] + [(jobs_running1, capacity1), (jobs_running2, capacity2), (jobs_running3, capacity3), ...] 
+ ''' + + instances = [] + if isinstance(param[0], tuple): + for (jobs_running, capacity) in param: + inst = Mock() + inst.capacity = capacity + inst.jobs_running = jobs_running + instances.append(inst) + else: + for i in param: + inst = Mock() + inst.remaining_capacity = i + instances.append(inst) + return instances + + +class TestInstanceGroup(object): + @pytest.mark.parametrize('task,instances,instance_fit_index,reason', [ + (T(100), Is([100]), 0, "Only one, pick it"), + (T(100), Is([100, 100]), 0, "Two equally good fits, pick the first"), + (T(100), Is([50, 100]), 1, "First instance not as good as second instance"), + (T(100), Is([50, 0, 20, 100, 100, 100, 30, 20]), 3, "Pick Instance [3] as it is the first that the task fits in."), + (T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"), + ]) + def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason): + with mock.patch.object(InstanceGroup, + 'instances', + Mock(spec_set=['filter'], + filter=lambda *args, **kargs: Mock(spec_set=['order_by'], + order_by=lambda x: instances))): + ig = InstanceGroup(id=10) + + if instance_fit_index is None: + assert ig.fit_task_to_most_remaining_capacity_instance(task) is None, reason + else: + assert ig.fit_task_to_most_remaining_capacity_instance(task) == \ + instances[instance_fit_index], reason + + + @pytest.mark.parametrize('instances,instance_fit_index,reason', [ + (Is([(0, 100)]), 0, "One idle instance, pick it"), + (Is([(1, 100)]), None, "One un-idle instance, pick nothing"), + (Is([(0, 100), (0, 200), (1, 500), (0, 700)]), 3, "Pick the largest idle instance"), + (Is([(0, 100), (0, 200), (1, 10000), (0, 700), (0, 699)]), 3, "Pick the largest idle instance"), + (Is([(0, 0)]), None, "One idle but down instance, don't pick it"), + ]) + def test_find_largest_idle_instance(self, instances, instance_fit_index, reason): + def filter_offline_instances(*args): + return filter(lambda i: i.capacity 
> 0, instances) + + with mock.patch.object(InstanceGroup, + 'instances', + Mock(spec_set=['filter'], + filter=lambda *args, **kargs: Mock(spec_set=['order_by'], + order_by=filter_offline_instances))): + ig = InstanceGroup(id=10) + + if instance_fit_index is None: + assert ig.find_largest_idle_instance() is None, reason + else: + assert ig.find_largest_idle_instance() == \ + instances[instance_fit_index], reason + diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 0920767c57..ccb84c2fa1 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -627,10 +627,12 @@ class TestAdhocRun(TestJobExecution): class TestIsolatedExecution(TestJobExecution): ISOLATED_HOST = 'some-isolated-host' + ISOLATED_CONTROLLER_HOST = 'some-isolated-controller-host' def get_instance(self): instance = super(TestIsolatedExecution, self).get_instance() - instance.get_isolated_execution_node_name = mock.Mock(return_value=self.ISOLATED_HOST) + instance.controller_node = self.ISOLATED_CONTROLLER_HOST + instance.execution_node = self.ISOLATED_HOST return instance def test_with_ssh_credentials(self): From 7b0b4f562d33f362faf3252da3feb76ed9ae9001 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Mon, 4 Jun 2018 09:19:38 -0400 Subject: [PATCH 6/6] get isolated execution at the point its needed * Instead of passing around the isolated host that the task is to execute on; grab the isolated execution host from the instance further down the call stack. Without passing the isolated hostname around. 
--- awx/main/expect/isolated_manager.py | 6 ++---- awx/main/tasks.py | 12 +++++------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/awx/main/expect/isolated_manager.py b/awx/main/expect/isolated_manager.py index 71ce262fef..9b4ce1db6a 100644 --- a/awx/main/expect/isolated_manager.py +++ b/awx/main/expect/isolated_manager.py @@ -468,13 +468,11 @@ class IsolatedManager(object): return OutputEventFilter(job_event_callback) - def run(self, instance, host, private_data_dir, proot_temp_dir): + def run(self, instance, private_data_dir, proot_temp_dir): """ Run a job on an isolated host. :param instance: a `model.Job` instance - :param host: the hostname (or IP address) to run the - isolated job on :param private_data_dir: an absolute path on the local file system where job-specific data should be written (i.e., `/tmp/ansible_awx_xyz/`) @@ -486,7 +484,7 @@ class IsolatedManager(object): `ansible-playbook` run. """ self.instance = instance - self.host = host + self.host = instance.execution_node self.private_data_dir = private_data_dir self.proot_temp_dir = proot_temp_dir status, rc = self.dispatch() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e9ff60b0da..be10d3ef58 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -881,10 +881,8 @@ class BaseTask(Task): extra_update_fields = {} event_ct = 0 stdout_handle = None - isolated_host = instance.get_isolated_execution_node_name() try: - kwargs['isolated'] = isolated_host is not None self.pre_run_hook(instance, **kwargs) if instance.cancel_flag: instance = self.update_model(instance.pk, status='canceled') @@ -944,7 +942,7 @@ class BaseTask(Task): credential, env, safe_env, args, safe_args, kwargs['private_data_dir'] ) - if isolated_host is None: + if instance.is_isolated() is False: stdout_handle = self.get_stdout_handle(instance) else: stdout_handle = isolated_manager.IsolatedManager.get_stdout_handle( @@ -960,7 +958,7 @@ class BaseTask(Task): ssh_key_path = self.get_ssh_key_path(instance, 
**kwargs) # If we're executing on an isolated host, don't bother adding the # key to the agent in this environment - if ssh_key_path and isolated_host is None: + if ssh_key_path and instance.is_isolated() is False: ssh_auth_sock = os.path.join(kwargs['private_data_dir'], 'ssh_auth.sock') args = run.wrap_args_with_ssh_agent(args, ssh_key_path, ssh_auth_sock) safe_args = run.wrap_args_with_ssh_agent(safe_args, ssh_key_path, ssh_auth_sock) @@ -980,11 +978,11 @@ class BaseTask(Task): proot_cmd=getattr(settings, 'AWX_PROOT_CMD', 'bwrap'), ) instance = self.update_model(instance.pk, output_replacements=output_replacements) - if isolated_host: + if instance.is_isolated() is True: manager_instance = isolated_manager.IsolatedManager( args, cwd, env, stdout_handle, ssh_key_path, **_kw ) - status, rc = manager_instance.run(instance, isolated_host, + status, rc = manager_instance.run(instance, kwargs['private_data_dir'], kwargs.get('proot_temp_dir')) else: @@ -1335,7 +1333,7 @@ class RunJob(BaseTask): job_request_id = '' if self.request.id is None else self.request.id pu_ig = job.instance_group pu_en = job.execution_node - if kwargs['isolated']: + if job.is_isolated() is True: pu_ig = pu_ig.controller pu_en = settings.CLUSTER_HOST_ID local_project_sync = job.project.create_project_update(