diff --git a/awx/main/scheduler/kubernetes.py b/awx/main/scheduler/kubernetes.py
index 1aa1978276..f4e35caec9 100644
--- a/awx/main/scheduler/kubernetes.py
+++ b/awx/main/scheduler/kubernetes.py
@@ -26,9 +26,6 @@ class PodManager(object):
                                               namespace=self.namespace,
                                               _request_timeout=settings.AWX_CONTAINER_GROUP_DEFAULT_LAUNCH_TIMEOUT)
 
-        # We don't do any fancy timeout logic here because it is handled
-        # at a higher level in the job spawning process. See
-        # settings.AWX_ISOLATED_LAUNCH_TIMEOUT and settings.AWX_ISOLATED_CONNECTION_TIMEOUT
         while True:
             pod = self.kube_api.read_namespaced_pod(name=self.pod_name,
                                                     namespace=self.namespace,
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index eb2d48546d..6253446cd5 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -40,6 +40,9 @@
 from django.utils.translation import ugettext_lazy as _
 from django.core.cache import cache
 from django.core.exceptions import ObjectDoesNotExist
 
+# Kubernetes
+from kubernetes.client.rest import ApiException
+
 # Django-CRUM
 from crum import impersonate
@@ -1183,6 +1186,18 @@ class BaseTask(object):
         '''
         Run the job/task and capture its output.
         '''
+        self.instance = self.model.objects.get(pk=pk)
+        containerized = self.instance.is_containerized
+        pod_manager = None
+        if containerized:
+            # Here we are trying to launch a pod before transitioning the job into a running
+            # state. For some scenarios (like waiting for resources to become available) we do this
+            # rather than marking the job as error or failed. This is not always desirable. Cases
+            # such as invalid authentication should surface as an error.
+            pod_manager = self.deploy_container_group_pod(self.instance)
+            if not pod_manager:
+                return
+
         # self.instance because of the update_model pattern and when it's used in callback handlers
         self.instance = self.update_model(pk, status='running',
                                           start_args='')  # blank field to remove encrypted passwords
@@ -1208,7 +1223,6 @@ class BaseTask(object):
 
         try:
             isolated = self.instance.is_isolated()
-            containerized = self.instance.is_containerized
             self.instance.send_notification_templates("running")
             private_data_dir = self.build_private_data_dir(self.instance)
             self.pre_run_hook(self.instance, private_data_dir)
@@ -1287,6 +1301,10 @@ class BaseTask(object):
                 },
             }
 
+            if containerized:
+                # We don't want HOME passed through to container groups.
+                params['envvars'].pop('HOME')
+
             if isinstance(self.instance, AdHocCommand):
                 params['module'] = self.build_module_name(self.instance)
                 params['module_args'] = self.build_module_args(self.instance)
@@ -1316,16 +1334,6 @@ class BaseTask(object):
                     params.pop('inventory'),
                     os.path.join(private_data_dir, 'inventory')
                 )
-            pod_manager = None
-            if containerized:
-                from awx.main.scheduler.kubernetes import PodManager # Avoid circular import
-                params['envvars'].pop('HOME')
-                pod_manager = PodManager(self.instance)
-                self.cleanup_paths.append(pod_manager.kube_config)
-                pod_manager.deploy()
-                self.instance.execution_node = pod_manager.pod_name
-                self.instance.save(update_fields=['execution_node'])
-
             ansible_runner.utils.dump_artifacts(params)
 
             isolated_manager_instance = isolated_manager.IsolatedManager(
@@ -1385,6 +1393,41 @@ class BaseTask(object):
 
             raise AwxTaskError.TaskError(self.instance, rc)
 
+    def deploy_container_group_pod(self, task):
+        from awx.main.scheduler.kubernetes import PodManager # Avoid circular import
+        pod_manager = PodManager(self.instance)
+        self.cleanup_paths.append(pod_manager.kube_config)
+        try:
+            log_name = task.log_format
+            logger.debug(f"Launching pod for {log_name}.")
+            pod_manager.deploy()
+        except (ApiException, Exception) as exc:
+            if isinstance(exc, ApiException) and exc.status == 403:
+                try:
+                    if 'exceeded quota' in json.loads(exc.body)['message']:
+                        # If the k8s cluster does not have capacity, we move the job back into
+                        # pending and immediately reschedule the task manager.
+                        logger.warn(exc.body)
+                        logger.warn(f"Could not launch pod for {log_name}. Exceeded quota.")
+                        time.sleep(10)
+                        self.update_model(task.pk, status='pending')
+                        schedule_task_manager()
+                        return
+                except Exception:
+                    logger.exception(f"Unable to handle response from Kubernetes API for {log_name}.")
+
+            logger.exception(f"Error when launching pod for {log_name}")
+            self.update_model(task.pk, status='error', result_traceback=exc.body)
+            return
+
+        logger.debug(f"Pod online. Starting {log_name}.")
+        self.update_model(task.pk, execution_node=pod_manager.pod_name)
+        return pod_manager
+
+
+
+
 @task()
 class RunJob(BaseTask):
     '''
diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py
index 77c13fffd2..2bae5aef9a 100644
--- a/awx/main/tests/unit/test_tasks.py
+++ b/awx/main/tests/unit/test_tasks.py
@@ -368,6 +368,7 @@ class TestGenericRun():
 
         task = tasks.RunJob()
         task.update_model = mock.Mock(return_value=job)
+        task.model.objects.get = mock.Mock(return_value=job)
         task.build_private_data_files = mock.Mock(side_effect=OSError())
 
         with mock.patch('awx.main.tasks.copy_tree'):
@@ -387,6 +388,7 @@ class TestGenericRun():
 
         task = tasks.RunJob()
         task.update_model = mock.Mock(wraps=update_model_wrapper)
+        task.model.objects.get = mock.Mock(return_value=job)
         task.build_private_data_files = mock.Mock()
 
         with mock.patch('awx.main.tasks.copy_tree'):
@@ -578,6 +580,7 @@ class TestAdhocRun(TestJobExecution):
 
         task = tasks.RunAdHocCommand()
         task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper)
+        task.model.objects.get = mock.Mock(return_value=adhoc_job)
         task.build_inventory = mock.Mock()
 
         with pytest.raises(Exception):
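
A note on the 'exceeded quota' branch in deploy_container_group_pod: a 403 ApiException from the Kubernetes API carries a JSON Status object in exc.body, and its 'message' field is what the json.loads(exc.body)['message'] probe inspects. A minimal standalone sketch of that detection, assuming the kubernetes Python client is installed; the status body below is illustrative, not captured from a live cluster:

    import json

    from kubernetes.client.rest import ApiException

    def is_quota_exceeded(exc):
        # Mirrors the check in the diff: a 403 whose JSON body mentions
        # 'exceeded quota' means the namespace is out of capacity.
        if not isinstance(exc, ApiException) or exc.status != 403:
            return False
        try:
            return 'exceeded quota' in json.loads(exc.body)['message']
        except (ValueError, KeyError, TypeError):
            # An unparseable body falls through to the generic error path.
            return False

    # Illustrative body modeled on a Kubernetes Status object.
    exc = ApiException(status=403, reason='Forbidden')
    exc.body = json.dumps({'kind': 'Status',
                           'message': 'pods "demo" is forbidden: exceeded quota: pod-quota'})
    assert is_quota_exceeded(exc)

This is why the quota case can safely flip the job back to 'pending' and call schedule_task_manager(), while any other ApiException still lands in the status='error' branch.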
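
On the HOME scrubbing in the run() hunk above: dict.pop with a single argument raises KeyError when the key is absent, so that line assumes HOME is always present in params['envvars'] (plausible, since the env dict appears to be seeded from os.environ earlier in run(), though that code is not shown in this diff). The two-argument form is the defensive alternative:

    envvars = {'HOME': '/var/lib/awx', 'PATH': '/usr/bin'}

    # Single-argument pop raises KeyError if the key is missing...
    envvars.pop('HOME')

    # ...while the two-argument form is a no-op once it is already gone.
    envvars.pop('HOME', None)
    assert 'HOME' not in envvars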
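
Finally, the test hunks: run() now calls self.model.objects.get(pk=pk) before update_model, so unit tests that only mocked update_model would otherwise hit the database. A standalone illustration of the stubbing pattern, where FakeJob is a hypothetical stand-in for the real Job model:

    from unittest import mock

    class FakeJob:
        # Hypothetical stand-in for awx.main.models.Job.
        is_containerized = False

    task = mock.Mock()
    task.model.objects.get = mock.Mock(return_value=FakeJob())

    # The lookup run() performs now resolves entirely in memory.
    assert task.model.objects.get(pk=42).is_containerized is False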