From ff118f2177be0da999a0a383cd67c6c35e25fe3d Mon Sep 17 00:00:00 2001 From: Seth Foster Date: Fri, 1 Jul 2022 14:46:29 -0400 Subject: [PATCH] Manage pending workflow jobs in Workflow Manager get_tasks uses UnifiedJob Additionally, make local overrides run after development settings --- .../management/commands/inventory_import.py | 2 +- awx/main/scheduler/task_manager.py | 51 +++++++------------ awx/settings/development.py | 28 +++++----- 3 files changed, 33 insertions(+), 48 deletions(-) diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index 78acec423d..4361be300a 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -862,7 +862,7 @@ class Command(BaseCommand): overwrite_vars=bool(options.get('overwrite_vars', False)), ) inventory_update = inventory_source.create_inventory_update( - _eager_fields=dict(job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), job_cwd=os.getcwd()) + _eager_fields=dict(status='running', job_args=json.dumps(sys.argv), job_env=dict(os.environ.items()), job_cwd=os.getcwd()) ) data = AnsibleInventoryLoader(source=source, verbosity=verbosity).load() diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 6970518df1..172f144ff0 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -76,17 +76,8 @@ class TaskBase: @timeit def get_tasks(self, filter_args): - jobs = [j for j in Job.objects.filter(**filter_args).prefetch_related('instance_group')] - inventory_updates_qs = InventoryUpdate.objects.filter(**filter_args).exclude(source='file').prefetch_related('inventory_source', 'instance_group') - inventory_updates = [i for i in inventory_updates_qs] - # Notice the job_type='check': we want to prevent implicit project updates from blocking our jobs. - project_updates = [p for p in ProjectUpdate.objects.filter(**filter_args).filter(job_type='check').prefetch_related('instance_group')] - system_jobs = [s for s in SystemJob.objects.filter(**filter_args).prefetch_related('instance_group')] - ad_hoc_commands = [a for a in AdHocCommand.objects.filter(**filter_args).prefetch_related('instance_group')] - workflow_jobs = [w for w in WorkflowJob.objects.filter(**filter_args)] - all_tasks = sorted(jobs + project_updates + inventory_updates + system_jobs + ad_hoc_commands + workflow_jobs, key=lambda task: task.created) - logger.debug(f"{self.prefix} {all_tasks}") - return all_tasks + qs = UnifiedJob.objects.filter(**filter_args).exclude(launch_type='sync').order_by('created').prefetch_related('instance_group') + return [task for task in qs if not type(task) is WorkflowJob] def record_aggregate_metrics(self, *args): if not settings.IS_TESTING(): @@ -256,7 +247,18 @@ class WorkflowManager(TaskBase): @timeit def get_tasks(self): - return self.get_running_workflow_jobs() + workflow_jobs_running = [wf for wf in WorkflowJob.objects.filter(status='running')] + workflow_jobs_pending = [wf for wf in WorkflowJob.objects.filter(status='pending')] + workflow_to_start = [] + running_workflow_pk = {wf.pk for wf in workflow_jobs_running} + for wf in workflow_jobs_pending: + if wf.allow_simultaneous or wf.pk not in running_workflow_pk: + wf.status = 'running' + workflow_to_start.append(wf) + + WorkflowJob.objects.bulk_update(workflow_to_start, ['status']) + workflow_jobs_running.extend(workflow_to_start) + return workflow_jobs_running @timeit def _schedule(self): @@ -536,18 +538,10 @@ class TaskManager(TaskBase): task.save() # TODO: run error handler to fail sub-tasks and send notifications else: - if type(task) is WorkflowJob: - task.status = 'running' - task.send_notification_templates('running') - logger.debug('Transitioning %s to running status.', task.log_format) - schedule_task_manager() # at this point we already have control/execution nodes selected for the following cases - else: - task.instance_group = instance_group - execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' - logger.debug( - f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.' - ) + task.instance_group = instance_group + execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else '' + logger.debug(f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.') with disable_activity_stream(): task.celery_task_id = str(uuid.uuid4()) task.save() @@ -577,7 +571,6 @@ class TaskManager(TaskBase): @timeit def process_pending_tasks(self, pending_tasks): - running_workflow_templates = {wf.unified_job_template_id for wf in self.get_running_workflow_jobs()} tasks_to_update_job_explanation = [] for task in pending_tasks: if self.start_task_limit <= 0: @@ -596,16 +589,6 @@ class TaskManager(TaskBase): found_acceptable_queue = False preferred_instance_groups = task.preferred_instance_groups - if isinstance(task, WorkflowJob): - if task.unified_job_template_id in running_workflow_templates: - if not task.allow_simultaneous: - logger.debug("{} is blocked from running, workflow already running".format(task.log_format)) - continue - else: - running_workflow_templates.add(task.unified_job_template_id) - self.start_task(task, None, task.get_jobs_fail_chain(), None) - continue - # Determine if there is control capacity for the task if task.capacity_type == 'control': control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT diff --git a/awx/settings/development.py b/awx/settings/development.py index c5b5ab1a36..ee500dae7c 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -78,18 +78,6 @@ include(optional('/etc/tower/conf.d/*.py'), scope=locals()) BASE_VENV_PATH = "/var/lib/awx/venv/" AWX_VENV_PATH = os.path.join(BASE_VENV_PATH, "awx") -# If any local_*.py files are present in awx/settings/, use them to override -# default settings for development. If not present, we can still run using -# only the defaults. -try: - if os.getenv('AWX_KUBE_DEVEL', False): - include(optional('minikube.py'), scope=locals()) - else: - include(optional('local_*.py'), scope=locals()) -except ImportError: - traceback.print_exc() - sys.exit(1) - # Use SQLite for unit tests instead of PostgreSQL. If the lines below are # commented out, Django will create the test_awx-dev database in PostgreSQL to # run unit tests. @@ -114,8 +102,22 @@ AWX_CALLBACK_PROFILE = True # Disable normal scheduled/triggered task managers (DependencyManager, TaskManager, WorkflowManager). # Allows user to trigger task managers directly for debugging and profiling purposes. # Only works in combination with settings.SETTINGS_MODULE == 'awx.settings.development' -AWX_DISABLE_TASK_MANAGERS = os.getenv('AWX_DISABLE_TASK_MANAGERS', False) +AWX_DISABLE_TASK_MANAGERS = False # ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!================================= if 'sqlite3' not in DATABASES['default']['ENGINE']: # noqa DATABASES['default'].setdefault('OPTIONS', dict()).setdefault('application_name', f'{CLUSTER_HOST_ID}-{os.getpid()}-{" ".join(sys.argv)}'[:63]) # noqa + + +# If any local_*.py files are present in awx/settings/, use them to override +# default settings for development. If not present, we can still run using +# only the defaults. +# this needs to stay at the bottom of this file +try: + if os.getenv('AWX_KUBE_DEVEL', False): + include(optional('minikube.py'), scope=locals()) + else: + include(optional('local_*.py'), scope=locals()) +except ImportError: + traceback.print_exc() + sys.exit(1)