mirror of
https://github.com/ansible/awx.git
synced 2026-05-11 03:17:38 -02:30
auto-create partition
This commit is contained in:
@@ -34,7 +34,7 @@ from awx.main.models import (
|
|||||||
)
|
)
|
||||||
from awx.main.scheduler.dag_workflow import WorkflowDAG
|
from awx.main.scheduler.dag_workflow import WorkflowDAG
|
||||||
from awx.main.utils.pglock import advisory_lock
|
from awx.main.utils.pglock import advisory_lock
|
||||||
from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager
|
from awx.main.utils import get_type_for_model, task_manager_bulk_reschedule, schedule_task_manager, create_partition
|
||||||
from awx.main.signals import disable_activity_stream
|
from awx.main.signals import disable_activity_stream
|
||||||
from awx.main.scheduler.dependency_graph import DependencyGraph
|
from awx.main.scheduler.dependency_graph import DependencyGraph
|
||||||
from awx.main.utils import decrypt_field
|
from awx.main.utils import decrypt_field
|
||||||
@@ -252,6 +252,16 @@ class TaskManager:
|
|||||||
}
|
}
|
||||||
dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
|
dependencies = [{'type': get_type_for_model(type(t)), 'id': t.id} for t in dependent_tasks]
|
||||||
|
|
||||||
|
controller_node = None
|
||||||
|
if task.supports_isolation() and rampart_group.controller_id:
|
||||||
|
try:
|
||||||
|
controller_node = rampart_group.choose_online_controller_node()
|
||||||
|
except IndexError:
|
||||||
|
logger.debug("No controllers available in group {} to run {}".format(rampart_group.name, task.log_format))
|
||||||
|
return
|
||||||
|
|
||||||
|
# Before task leaves pending state, ensure that job_event partitions exist
|
||||||
|
create_partition()
|
||||||
task.status = 'waiting'
|
task.status = 'waiting'
|
||||||
|
|
||||||
(start_status, opts) = task.pre_start()
|
(start_status, opts) = task.pre_start()
|
||||||
|
|||||||
@@ -1026,16 +1026,21 @@ def deepmerge(a, b):
|
|||||||
return b
|
return b
|
||||||
|
|
||||||
|
|
||||||
def create_partition(start, end=None, partition_label=None):
|
def create_partition(start=None, end=None, partition_label=None):
|
||||||
"""Creates new partition tables for events.
|
"""Creates new partition tables for events.
|
||||||
If not specified, end is set to the end of the current hour."""
|
- start defaults to beginning of current hour
|
||||||
|
- end defaults to end of current hour
|
||||||
|
- partition_label defaults to YYYYMMDD_HH"""
|
||||||
|
|
||||||
|
if not start:
|
||||||
|
start = now().replace(microsecond=0, second=0, minute=0)
|
||||||
if not end:
|
if not end:
|
||||||
end = start.replace(microsecond=0, second=0, minute=0) + timedelta(hours=1)
|
end = start.replace(microsecond=0, second=0, minute=0) + timedelta(hours=1)
|
||||||
start_timestamp = str(start)
|
start_timestamp = str(start)
|
||||||
end_timestamp = str(end)
|
end_timestamp = str(end)
|
||||||
|
|
||||||
if not partition_label:
|
if not partition_label:
|
||||||
partition_label = start.strftime('%Y_%m_%d_%H')
|
partition_label = start.strftime('%Y%m%d_%H')
|
||||||
|
|
||||||
with connection.cursor() as cursor:
|
with connection.cursor() as cursor:
|
||||||
# Only partitioning main_jobevent on first pass
|
# Only partitioning main_jobevent on first pass
|
||||||
|
|||||||
Reference in New Issue
Block a user