diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py index 115838c66c..9de7d93774 100644 --- a/awx/main/scheduler/task_manager.py +++ b/awx/main/scheduler/task_manager.py @@ -34,7 +34,7 @@ from awx.main.models import ( ) from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.utils.pglock import advisory_lock -from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager +from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager, create_partition from awx.main.signals import disable_activity_stream from awx.main.scheduler.dependency_graph import DependencyGraph from awx.main.utils import decrypt_field @@ -252,6 +252,16 @@ class TaskManager: } dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks] + controller_node = None + if task.supports_isolation() and rampart_group.controller_id: + try: + controller_node = rampart_group.choose_online_controller_node() + except IndexError: + logger.debug("No controllers available in group {} to run {}".format(rampart_group.name, task.log_format)) + return + + # Before task leaves pending state, ensure that job_event partitions exist + create_partition() task.status = 'waiting' (start_status, opts) = task.pre_start() diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py index 3343abd31e..4dcd207dc6 100644 --- a/awx/main/utils/common.py +++ b/awx/main/utils/common.py @@ -1026,16 +1026,21 @@ def deepmerge(a, b): return b -def create_partition(start, end=None, partition_label=None): +def create_partition(start=None, end=None, partition_label=None): """Creates new partition tables for events. - If not specified, end is set to the end of the current hour.""" + - start defaults to beginning of current hour + - end defaults to end of current hour + - partition_label defaults to YYYYMMDD_HH""" + + if not start: + start = now().replace(microsecond=0, second=0, minute=0) if not end: end = start.replace(microsecond=0, second=0, minute=0) + timedelta(hours=1) start_timestamp = str(start) end_timestamp = str(end) if not partition_label: - partition_label = start.strftime('%Y_%m_%d_%H') + partition_label = start.strftime('%Y%m%d_%H') with connection.cursor() as cursor: # Only partitioning main_jobevent on first pass