From 300020df0711670305e7ceb463d14770935bfbb0 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Wed, 10 Aug 2016 14:33:25 -0400 Subject: [PATCH 01/25] Make system admin and system auditor visible to orphaned users. --- awx/main/models/rbac.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index 3cb016ffde..8592a9c632 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -376,12 +376,13 @@ class Role(models.Model): @staticmethod @check_singleton - def visible_roles(user): + def visible_roles(user, include_super=True): sql_params = { 'ancestors_table': Role.ancestors.through._meta.db_table, 'parents_table': Role.parents.through._meta.db_table, 'roles_table': Role._meta.db_table, - 'ids': ','.join(str(x) for x in user.roles.values_list('id', flat=True)) + 'ids': ','.join(str(x) for x in user.roles.values_list('id', flat=True)), + 'mandatories': ','.join(('\'system_administrator\'', '\'system_auditor\'')), } qs = Role.objects.extra( @@ -394,6 +395,17 @@ class Role(models.Model): ) ''' % sql_params] ) + if include_super: + super_qs = Role.objects.extra( + where = [''' + %(roles_table)s.id IN ( + SELECT DISTINCT visible_roles_t3.id + FROM %(roles_table)s as visible_roles_t3 + WHERE visible_roles_t3.singleton_name IN (%(mandatories)s) + ) + ''' % sql_params] + ) + qs = qs | super_qs return qs @staticmethod From b719b7276f34873e4d43958651ea0e2bd1d6e7ba Mon Sep 17 00:00:00 2001 From: jangsutsr Date: Sun, 14 Aug 2016 20:10:45 -0400 Subject: [PATCH 02/25] Refactor for better performance. --- awx/api/views.py | 11 ++++++++++- awx/main/models/rbac.py | 14 +------------- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/awx/api/views.py b/awx/api/views.py index efbbecf10e..189d687222 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -3830,7 +3830,16 @@ class RoleList(ListAPIView): new_in_300 = True def get_queryset(self): - return Role.visible_roles(self.request.user) + result = Role.visible_roles(self.request.user) + # Sanity check: is the requesting user an orphaned non-admin/auditor? + # if yes, make system admin/auditor mandatorily visible.
+ if not self.request.user.organizations.exists() and\ + not self.request.user.is_superuser and\ + not self.request.user.is_system_auditor: + mandatories = ('system_administrator', 'system_auditor') + super_qs = Role.objects.filter(singleton_name__in=mandatories) + result = result | super_qs + return result class RoleDetail(RetrieveAPIView): diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index 8592a9c632..f469c1a7ac 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -376,13 +376,12 @@ class Role(models.Model): @staticmethod @check_singleton - def visible_roles(user, include_super=True): + def visible_roles(user): sql_params = { 'ancestors_table': Role.ancestors.through._meta.db_table, 'parents_table': Role.parents.through._meta.db_table, 'roles_table': Role._meta.db_table, 'ids': ','.join(str(x) for x in user.roles.values_list('id', flat=True)), - 'mandatories': ','.join(('\'system_administrator\'', '\'system_auditor\'')), } qs = Role.objects.extra( @@ -395,17 +394,6 @@ class Role(models.Model): ) ''' % sql_params] ) - if include_super: - super_qs = Role.objects.extra( - where = [''' - %(roles_table)s.id IN ( - SELECT DISTINCT visible_roles_t3.id - FROM %(roles_table)s as visible_roles_t3 - WHERE visible_roles_t3.singleton_name IN (%(mandatories)s) - ) - ''' % sql_params] - ) - qs = qs | super_qs return qs @staticmethod From 89cbceeab88dad7b35f43f7b9c8c5f136e15b379 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Tue, 20 Sep 2016 11:39:44 -0400 Subject: [PATCH 03/25] Functional test added. --- awx/main/tests/functional/api/test_role.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 awx/main/tests/functional/api/test_role.py diff --git a/awx/main/tests/functional/api/test_role.py b/awx/main/tests/functional/api/test_role.py new file mode 100644 index 0000000000..94215521a5 --- /dev/null +++ b/awx/main/tests/functional/api/test_role.py @@ -0,0 +1,13 @@ +import pytest + +from django.core.urlresolvers import reverse + +@pytest.mark.django_db +def test_admin_visible_to_orphaned_users(get, alice): + names = set() + + response = get(reverse('api:role_list'), user=alice) + for item in response.data['results']: + names.add(item['name']) + assert 'System Auditor' in names + assert 'System Administrator' in names From 2f24d286385edc75ddf9c17e6ed54f53f5e14a04 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 26 Sep 2016 10:35:29 -0400 Subject: [PATCH 04/25] fix bug where user content_object has no name attribute in access_list --- awx/api/serializers.py | 5 ++++- awx/main/tests/functional/api/test_rbac_displays.py | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 5b45c9de5b..79199c27a0 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -1581,10 +1581,13 @@ class ResourceAccessListElementSerializer(UserSerializer): def format_role_perm(role): role_dict = { 'id': role.id, 'name': role.name, 'description': role.description} - if role.content_type is not None: + try: role_dict['resource_name'] = role.content_object.name role_dict['resource_type'] = role.content_type.name role_dict['related'] = reverse_gfk(role.content_object) + except: + pass + if role.content_type is not None: role_dict['user_capabilities'] = {'unattach': requesting_user.can_access( Role, 'unattach', role, user, 'members', data={}, skip_sub_obj_read_check=False)} else: diff --git a/awx/main/tests/functional/api/test_rbac_displays.py b/awx/main/tests/functional/api/test_rbac_displays.py 
index eb94e01dff..45b4a8f832 100644 --- a/awx/main/tests/functional/api/test_rbac_displays.py +++ b/awx/main/tests/functional/api/test_rbac_displays.py @@ -221,6 +221,12 @@ class TestAccessListCapabilities: direct_access_list = response.data['results'][0]['summary_fields']['direct_access'] assert direct_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar' + def test_user_access_list_direct_access_capability(self, rando, get): + "When a user views their own access list, they can not unattach their admin role" + response = get(reverse('api:user_access_list', args=(rando.id,)), rando) + direct_access_list = response.data['results'][0]['summary_fields']['direct_access'] + assert not direct_access_list[0]['role']['user_capabilities']['unattach'] + @pytest.mark.django_db def test_team_roles_unattach(mocker, team, team_member, inventory, mock_access_method, get): From 3951f63df57b6df0f45b729cd7c083cd6584b508 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Mon, 26 Sep 2016 11:31:18 -0400 Subject: [PATCH 05/25] add exception type to try-except for access_list details --- awx/api/serializers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 79199c27a0..6e8a91c72a 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -1585,7 +1585,7 @@ class ResourceAccessListElementSerializer(UserSerializer): role_dict['resource_name'] = role.content_object.name role_dict['resource_type'] = role.content_type.name role_dict['related'] = reverse_gfk(role.content_object) - except: + except AttributeError: pass if role.content_type is not None: role_dict['user_capabilities'] = {'unattach': requesting_user.can_access( From cc90204b0f1fbfea16d0f76516e5be61d28ebd35 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Tue, 20 Sep 2016 10:14:38 -0400 Subject: [PATCH 06/25] task manager using messages * First pass, adapt singleton task manager to process messages and run jobs based on events instead of a busy loop. 
* Still need to make message handing run in celery, not in a consumption loop --- awx/api/views.py | 1 + .../management/commands/run_task_system.py | 391 +++++++----------- awx/main/models/unified_jobs.py | 11 + awx/main/scheduler/__init__.py | 0 awx/main/scheduler/dag_simple.py | 133 ++++++ awx/main/scheduler/dag_workflow.py | 74 ++++ awx/main/tasks.py | 20 + awx/settings/defaults.py | 6 + 8 files changed, 386 insertions(+), 250 deletions(-) create mode 100644 awx/main/scheduler/__init__.py create mode 100644 awx/main/scheduler/dag_simple.py create mode 100644 awx/main/scheduler/dag_workflow.py diff --git a/awx/api/views.py b/awx/api/views.py index 551bb814e9..19e3fd425f 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2290,6 +2290,7 @@ class JobTemplateLaunch(RetrieveAPIView, GenericAPIView): new_job = obj.create_unified_job(**kv) result = new_job.signal_start(**kv) + if not result: data = dict(passwords_needed_to_start=new_job.passwords_needed_to_start) new_job.delete() diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index 855491f08c..b29b2e4d88 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -7,6 +7,10 @@ import datetime import logging import signal import time +import traceback + +from kombu import Connection, Exchange, Queue, Producer +from kombu.mixins import ConsumerMixin # Django from django.conf import settings @@ -17,6 +21,8 @@ from awx.main.models import * # noqa from awx.main.queue import FifoQueue from awx.main.tasks import handle_work_error, handle_work_success from awx.main.utils import get_system_task_capacity +from awx.main.scheduler.dag_simple import SimpleDAG +from awx.main.scheduler.dag_workflow import WorkflowDAG # Celery from celery.task.control import inspect @@ -25,208 +31,6 @@ logger = logging.getLogger('awx.main.commands.run_task_system') queue = FifoQueue('tower_task_manager') -class SimpleDAG(object): - ''' A simple implementation of a directed acyclic graph ''' - - def __init__(self): - self.nodes = [] - self.edges = [] - - def __contains__(self, obj): - for node in self.nodes: - if node['node_object'] == obj: - return True - return False - - def __len__(self): - return len(self.nodes) - - def __iter__(self): - return self.nodes.__iter__() - - def generate_graphviz_plot(self): - def short_string_obj(obj): - if type(obj) == Job: - type_str = "Job" - if type(obj) == AdHocCommand: - type_str = "AdHocCommand" - elif type(obj) == InventoryUpdate: - type_str = "Inventory" - elif type(obj) == ProjectUpdate: - type_str = "Project" - elif type(obj) == WorkflowJob: - type_str = "Workflow" - else: - type_str = "Unknown" - type_str += "%s" % str(obj.id) - return type_str - - doc = """ - digraph g { - rankdir = LR - """ - for n in self.nodes: - doc += "%s [color = %s]\n" % ( - short_string_obj(n['node_object']), - "red" if n['node_object'].status == 'running' else "black", - ) - for from_node, to_node, label in self.edges: - doc += "%s -> %s [ label=\"%s\" ];\n" % ( - short_string_obj(self.nodes[from_node]['node_object']), - short_string_obj(self.nodes[to_node]['node_object']), - label, - ) - doc += "}\n" - gv_file = open('/tmp/graph.gv', 'w') - gv_file.write(doc) - gv_file.close() - - def add_node(self, obj, metadata=None): - if self.find_ord(obj) is None: - self.nodes.append(dict(node_object=obj, metadata=metadata)) - - def add_edge(self, from_obj, to_obj, label=None): - from_obj_ord = self.find_ord(from_obj) - to_obj_ord = 
self.find_ord(to_obj) - if from_obj_ord is None or to_obj_ord is None: - raise LookupError("Object not found") - self.edges.append((from_obj_ord, to_obj_ord, label)) - - def add_edges(self, edgelist): - for edge_pair in edgelist: - self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2]) - - def find_ord(self, obj): - for idx in range(len(self.nodes)): - if obj == self.nodes[idx]['node_object']: - return idx - return None - - def get_node_type(self, obj): - if type(obj) == Job: - return "job" - elif type(obj) == AdHocCommand: - return "ad_hoc_command" - elif type(obj) == InventoryUpdate: - return "inventory_update" - elif type(obj) == ProjectUpdate: - return "project_update" - elif type(obj) == SystemJob: - return "system_job" - elif type(obj) == WorkflowJob: - return "workflow_job" - return "unknown" - - def get_dependencies(self, obj, label=None): - antecedents = [] - this_ord = self.find_ord(obj) - for node, dep, lbl in self.edges: - if label: - if node == this_ord and lbl == label: - antecedents.append(self.nodes[dep]) - else: - if node == this_ord: - antecedents.append(self.nodes[dep]) - return antecedents - - def get_dependents(self, obj, label=None): - decendents = [] - this_ord = self.find_ord(obj) - for node, dep, lbl in self.edges: - if label: - if dep == this_ord and lbl == label: - decendents.append(self.nodes[node]) - else: - if dep == this_ord: - decendents.append(self.nodes[node]) - return decendents - - def get_leaf_nodes(self): - leafs = [] - for n in self.nodes: - if len(self.get_dependencies(n['node_object'])) < 1: - leafs.append(n) - return leafs - - def get_root_nodes(self): - roots = [] - for n in self.nodes: - if len(self.get_dependents(n['node_object'])) < 1: - roots.append(n) - return roots - -class WorkflowDAG(SimpleDAG): - def __init__(self, workflow_job=None): - super(WorkflowDAG, self).__init__() - if workflow_job: - self._init_graph(workflow_job) - - def _init_graph(self, workflow_job): - workflow_nodes = workflow_job.workflow_job_nodes.all() - for workflow_node in workflow_nodes: - self.add_node(workflow_node) - - for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']: - for workflow_node in workflow_nodes: - related_nodes = getattr(workflow_node, node_type).all() - for related_node in related_nodes: - self.add_edge(workflow_node, related_node, node_type) - - def bfs_nodes_to_run(self): - root_nodes = self.get_root_nodes() - nodes = root_nodes - nodes_found = [] - - for index, n in enumerate(nodes): - obj = n['node_object'] - job = obj.job - - if not job: - nodes_found.append(n) - # Job is about to run or is running. Hold our horses and wait for - # the job to finish. We can't proceed down the graph path until we - # have the job result. - elif job.status not in ['failed', 'error', 'successful']: - continue - elif job.status in ['failed', 'error']: - children_failed = self.get_dependencies(obj, 'failure_nodes') - children_always = self.get_dependencies(obj, 'always_nodes') - children_all = children_failed + children_always - nodes.extend(children_all) - elif job.status in ['successful']: - children_success = self.get_dependencies(obj, 'success_nodes') - nodes.extend(children_success) - else: - logger.warn("Incorrect graph structure") - return [n['node_object'] for n in nodes_found] - - def is_workflow_done(self): - root_nodes = self.get_root_nodes() - nodes = root_nodes - - for index, n in enumerate(nodes): - obj = n['node_object'] - job = obj.job - - if not job: - return False - # Job is about to run or is running. 
Hold our horses and wait for - # the job to finish. We can't proceed down the graph path until we - # have the job result. - elif job.status not in ['failed', 'error', 'successful']: - return False - elif job.status in ['failed', 'error']: - children_failed = self.get_dependencies(obj, 'failure_nodes') - children_always = self.get_dependencies(obj, 'always_nodes') - children_all = children_failed + children_always - nodes.extend(children_all) - elif job.status in ['successful']: - children_success = self.get_dependencies(obj, 'success_nodes') - nodes.extend(children_success) - else: - logger.warn("Incorrect graph structure") - return True - def get_tasks(): """Fetch all Tower tasks that are relevant to the task management system. @@ -247,6 +51,7 @@ def get_tasks(): graph_project_updates + graph_system_jobs + graph_workflow_jobs, key=lambda task: task.created) + print("Returning all_actions %s" % len(all_actions)) return all_actions def get_running_workflow_jobs(): @@ -277,14 +82,16 @@ def do_spawn_workflow_jobs(): #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) -def rebuild_graph(message): +def rebuild_graph(): """Regenerate the task graph by refreshing known tasks from Tower, purging orphaned running tasks, and creating dependencies for new tasks before generating directed edge relationships between those tasks. """ + ''' # Sanity check: Only do this on the primary node. if Instance.objects.my_role() == 'secondary': return None + ''' inspector = inspect() if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): @@ -297,6 +104,7 @@ def rebuild_graph(message): all_sorted_tasks = get_tasks() if not len(all_sorted_tasks): + print("All sorted task len is not? <%s, %s>" % (len(all_sorted_tasks), all_sorted_tasks)) return None active_tasks = [] @@ -417,53 +225,132 @@ def process_graph(graph, task_capacity): 'Remaining Capacity: %s' % (str(node_obj), str(impact), str(remaining_volume))) -def run_taskmanager(): - """Receive task start and finish signals to rebuild a dependency graph - and manage the actual running of tasks. - """ - def shutdown_handler(): - def _handler(signum, frame): - signal.signal(signum, signal.SIG_DFL) - os.kill(os.getpid(), signum) - return _handler - signal.signal(signal.SIGINT, shutdown_handler()) - signal.signal(signal.SIGTERM, shutdown_handler()) - paused = False - task_capacity = get_system_task_capacity() - last_rebuild = datetime.datetime.fromtimestamp(0) - # Attempt to pull messages off of the task system queue into perpetuity. - # - # A quick explanation of what is happening here: - # The popping messages off the queue bit is something of a sham. We remove - # the messages from the queue and then immediately throw them away. The - # `rebuild_graph` function, while it takes the message as an argument, - # ignores it. - # - # What actually happens is that we just check the database every 10 seconds - # to see what the task dependency graph looks like, and go do that. This - # is the job of the `rebuild_graph` function. - # - # There is some placeholder here: we may choose to actually use the message - # in the future. - while True: - # Pop a message off the queue. - # (If the queue is empty, None will be returned.) - message = queue.pop() +#logger = logging.getLogger('awx.main.scheduler') - # Parse out the message appropriately, rebuilding our graph if - # appropriate. 
- if (datetime.datetime.now() - last_rebuild).seconds > 10: - if message is not None and 'pause' in message: - logger.info("Pause command received: %s" % str(message)) - paused = message['pause'] - graph = rebuild_graph(message) - if not paused and graph is not None: - process_graph(graph, task_capacity) - last_rebuild = datetime.datetime.now() - time.sleep(0.1) +class CallbackBrokerWorker(ConsumerMixin): + def __init__(self, connection): + self.connection = connection + def get_consumers(self, Consumer, channel): + print("get_consumers() OK") + return [Consumer(queues=[Queue(settings.SCHEDULER_QUEUE, + Exchange(settings.SCHEDULER_QUEUE, type='topic'), + routing_key='scheduler.job.launch'),], + accept=['json'], + callbacks=[self.process_job_launch,]), + Consumer(queues=[Queue(settings.SCHEDULER_QUEUE, + Exchange(settings.SCHEDULER_QUEUE, type='topic'), + routing_key='scheduler.job.complete'),], + accept=['json'], + callbacks=[self.process_job_complete,] + )] + + def schedule(self): + task_capacity = get_system_task_capacity() + graph = rebuild_graph() + if graph: + process_graph(graph, task_capacity) + + def process_job_msg(self, body, message): + try: + if settings.DEBUG: + logger.info("Body: {}".format(body)) + logger.info("Message: {}".format(message)) + + if "msg_type" not in body: + raise Exception("Payload does not have a msg_type") + if "job_id" not in body: + raise Exception("Payload does not have a job_id") + + func = getattr(self, "process_%s" % body['msg_type'], None) + if not func: + raise AttributeError("No processor for message type %s" % body['msg_type']) + func(body) + + # Raised by processors when msg isn't in the expected form. + except LookupError as e: + logger.error(e) + except AttributeError as e: + logger.error(e) + except Exception as exc: + import traceback + traceback.print_exc() + logger.error('Callback Task Processor Raised Exception: %r', exc) + finally: + message.ack() + self.schedule() + + def process_job_launch(self, body, message): + print("process_job_launch()") + if "job_id" not in body: + raise KeyError("Payload does not contain job_id") + + ''' + Wait for job to exist. + The job is created in a transaction then the message is created, but + the transaction may not have completed. + + FIXME: We could generate the message in a Django signal handler. + OR, we could call an explicit commit in the view and then send the + message. + + ''' + retries = 10 + retry = 0 + while not UnifiedJob.objects.filter(id=body['job_id']).exists(): + time.sleep(0.3) + + if retry >= retries: + logger.error("Failed to process 'job_launch' message for job %d" % body['job_id']) + # ack the message so we don't build up the queue. + # + # The job can still be chosen to run during tower startup or + # when another job is started or completes + message.ack() + return + retry += 1 + + job = UnifiedJob.objects.get(id=body['job_id']) + + self.schedule() + message.ack() + + def process_job_complete(self, body, message): + print("process_job_complete()") + if "job_id" not in body: + raise KeyError("Payload does not contain job_id") + + # TODO: use list of finished status from jobs.py or unified_jobs.py + finished_status = ['successful', 'error', 'failed', 'completed'] + q = UnifiedJob.objects.filter(id=body['job_id']) + + # Ensure that the job is updated in the database before we call to + # schedule the next job. + retries = 10 + retry = 0 + while True: + # Job not found, most likely deleted. 
That's fine + if not q.exists(): + logger.warn("Failed to find job '%d' while processing 'job_complete' message. Presume that it was deleted." % body['job_id']) + break + + job = q[0] + if job.status in finished_status: + break + + time.sleep(0.3) + + if retry >= retries: + logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message." % (job.status, finished_status)) + message.ack() + return + retry += 1 + + message.ack() + self.schedule() + class Command(NoArgsCommand): """Tower Task Management System This daemon is designed to reside between our tasks and celery and @@ -477,7 +364,11 @@ class Command(NoArgsCommand): help = 'Launch the Tower task management system' def handle_noargs(self, **options): - try: - run_taskmanager() - except KeyboardInterrupt: - pass + with Connection(settings.BROKER_URL) as conn: + try: + worker = CallbackBrokerWorker(conn) + worker.run() + except KeyboardInterrupt: + print('Terminating Task Management System') + + diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 950b6fc99b..6806ff7d16 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -852,6 +852,17 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique self.update_fields(start_args=json.dumps(kwargs), status='pending') self.socketio_emit_status("pending") + from kombu import Connection, Exchange, Producer + connection = Connection(settings.BROKER_URL) + exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic') + producer = Producer(connection) + producer.publish({ 'msg_type': 'job_launch', 'job_id': self.id }, + serializer='json', + compression='bzip2', + exchange=exchange, + declare=[exchange], + routing_key='scheduler.job.launch') + # Each type of unified job has a different Task class; get the # appropirate one. 
# task_type = get_type_for_model(self) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py new file mode 100644 index 0000000000..f04c60159a --- /dev/null +++ b/awx/main/scheduler/dag_simple.py @@ -0,0 +1,133 @@ + +from awx.main.models import * # noqa + +class SimpleDAG(object): + ''' A simple implementation of a directed acyclic graph ''' + + def __init__(self): + self.nodes = [] + self.edges = [] + + def __contains__(self, obj): + for node in self.nodes: + if node['node_object'] == obj: + return True + return False + + def __len__(self): + return len(self.nodes) + + def __iter__(self): + return self.nodes.__iter__() + + def generate_graphviz_plot(self): + def short_string_obj(obj): + if type(obj) == Job: + type_str = "Job" + if type(obj) == AdHocCommand: + type_str = "AdHocCommand" + elif type(obj) == InventoryUpdate: + type_str = "Inventory" + elif type(obj) == ProjectUpdate: + type_str = "Project" + elif type(obj) == WorkflowJob: + type_str = "Workflow" + else: + type_str = "Unknown" + type_str += "%s" % str(obj.id) + return type_str + + doc = """ + digraph g { + rankdir = LR + """ + for n in self.nodes: + doc += "%s [color = %s]\n" % ( + short_string_obj(n['node_object']), + "red" if n['node_object'].status == 'running' else "black", + ) + for from_node, to_node, label in self.edges: + doc += "%s -> %s [ label=\"%s\" ];\n" % ( + short_string_obj(self.nodes[from_node]['node_object']), + short_string_obj(self.nodes[to_node]['node_object']), + label, + ) + doc += "}\n" + gv_file = open('/tmp/graph.gv', 'w') + gv_file.write(doc) + gv_file.close() + + def add_node(self, obj, metadata=None): + if self.find_ord(obj) is None: + self.nodes.append(dict(node_object=obj, metadata=metadata)) + + def add_edge(self, from_obj, to_obj, label=None): + from_obj_ord = self.find_ord(from_obj) + to_obj_ord = self.find_ord(to_obj) + if from_obj_ord is None or to_obj_ord is None: + raise LookupError("Object not found") + self.edges.append((from_obj_ord, to_obj_ord, label)) + + def add_edges(self, edgelist): + for edge_pair in edgelist: + self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2]) + + def find_ord(self, obj): + for idx in range(len(self.nodes)): + if obj == self.nodes[idx]['node_object']: + return idx + return None + + def get_node_type(self, obj): + if type(obj) == Job: + return "job" + elif type(obj) == AdHocCommand: + return "ad_hoc_command" + elif type(obj) == InventoryUpdate: + return "inventory_update" + elif type(obj) == ProjectUpdate: + return "project_update" + elif type(obj) == SystemJob: + return "system_job" + elif type(obj) == WorkflowJob: + return "workflow_job" + return "unknown" + + def get_dependencies(self, obj, label=None): + antecedents = [] + this_ord = self.find_ord(obj) + for node, dep, lbl in self.edges: + if label: + if node == this_ord and lbl == label: + antecedents.append(self.nodes[dep]) + else: + if node == this_ord: + antecedents.append(self.nodes[dep]) + return antecedents + + def get_dependents(self, obj, label=None): + decendents = [] + this_ord = self.find_ord(obj) + for node, dep, lbl in self.edges: + if label: + if dep == this_ord and lbl == label: + decendents.append(self.nodes[node]) + else: + if dep == this_ord: + decendents.append(self.nodes[node]) + return decendents + + def get_leaf_nodes(self): + leafs = [] + for n in self.nodes: + if len(self.get_dependencies(n['node_object'])) < 1: + 
leafs.append(n) + return leafs + + def get_root_nodes(self): + roots = [] + for n in self.nodes: + if len(self.get_dependents(n['node_object'])) < 1: + roots.append(n) + return roots + diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py new file mode 100644 index 0000000000..1a8269c064 --- /dev/null +++ b/awx/main/scheduler/dag_workflow.py @@ -0,0 +1,74 @@ +from dag_simple import SimpleDAG + +class WorkflowDAG(SimpleDAG): + def __init__(self, workflow_job=None): + super(WorkflowDAG, self).__init__() + if workflow_job: + self._init_graph(workflow_job) + + def _init_graph(self, workflow_job): + workflow_nodes = workflow_job.workflow_job_nodes.all() + for workflow_node in workflow_nodes: + self.add_node(workflow_node) + + for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']: + for workflow_node in workflow_nodes: + related_nodes = getattr(workflow_node, node_type).all() + for related_node in related_nodes: + self.add_edge(workflow_node, related_node, node_type) + + def bfs_nodes_to_run(self): + root_nodes = self.get_root_nodes() + nodes = root_nodes + nodes_found = [] + + for index, n in enumerate(nodes): + obj = n['node_object'] + job = obj.job + + if not job: + nodes_found.append(n) + # Job is about to run or is running. Hold our horses and wait for + # the job to finish. We can't proceed down the graph path until we + # have the job result. + elif job.status not in ['failed', 'error', 'successful']: + continue + elif job.status in ['failed', 'error']: + children_failed = self.get_dependencies(obj, 'failure_nodes') + children_always = self.get_dependencies(obj, 'always_nodes') + children_all = children_failed + children_always + nodes.extend(children_all) + elif job.status in ['successful']: + children_success = self.get_dependencies(obj, 'success_nodes') + nodes.extend(children_success) + else: + logger.warn("Incorrect graph structure") + return [n['node_object'] for n in nodes_found] + + def is_workflow_done(self): + root_nodes = self.get_root_nodes() + nodes = root_nodes + + for index, n in enumerate(nodes): + obj = n['node_object'] + job = obj.job + + if not job: + return False + # Job is about to run or is running. Hold our horses and wait for + # the job to finish. We can't proceed down the graph path until we + # have the job result. 
+ elif job.status not in ['failed', 'error', 'successful']: + return False + elif job.status in ['failed', 'error']: + children_failed = self.get_dependencies(obj, 'failure_nodes') + children_always = self.get_dependencies(obj, 'always_nodes') + children_all = children_failed + children_always + nodes.extend(children_all) + elif job.status in ['successful']: + children_success = self.get_dependencies(obj, 'success_nodes') + nodes.extend(children_success) + else: + logger.warn("Incorrect graph structure") + return True + diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 097dca517d..86552b6404 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -31,6 +31,9 @@ except: # Pexpect import pexpect +# Kombu +from kombu import Connection, Exchange, Queue, Producer + # Celery from celery import Task, task from celery.signals import celeryd_init @@ -202,6 +205,18 @@ def _send_notification_templates(instance, status_str): for n in all_notification_templates], job_id=instance.id) + +def _send_job_complete_msg(instance): + connection = Connection(settings.BROKER_URL) + exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic') + producer = Producer(connection) + producer.publish({ 'job_id': instance.id, 'msg_type': 'job_complete' }, + serializer='json', + compression='bzip2', + exchange=exchange, + declare=[exchange], + routing_key='scheduler.job.complete') + @task(bind=True, queue='default') def handle_work_success(self, result, task_actual): instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) @@ -210,6 +225,8 @@ def handle_work_success(self, result, task_actual): _send_notification_templates(instance, 'succeeded') + _send_job_complete_msg(instance) + @task(bind=True, queue='default') def handle_work_error(self, task_id, subtasks=None): print('Executing error task id %s, subtasks: %s' % @@ -238,6 +255,9 @@ def handle_work_error(self, task_id, subtasks=None): if first_instance: _send_notification_templates(first_instance, 'failed') + + if first_instance: + _send_job_complete_msg(first_instance) @task(queue='default') def update_inventory_computed_fields(inventory_id, should_update_hosts=True): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 31c8b3b8f3..8a65d7d322 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -342,6 +342,7 @@ CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('jobs', Exchange('jobs'), routing_key='jobs'), + #Queue('scheduler', Exchange('scheduler'), routing_key='scheduler.job.#'), # Projects use a fanout queue, this isn't super well supported Broadcast('projects'), ) @@ -737,6 +738,7 @@ ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC = False INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT CALLBACK_QUEUE = "callback_tasks" +SCHEDULER_QUEUE = "scheduler" TASK_COMMAND_PORT = 6559 @@ -1042,6 +1044,10 @@ LOGGING = { 'handlers': ['console', 'file', 'task_system'], 'propagate': False }, + 'awx.main.scheduler': { + 'handlers': ['console', 'file', 'task_system'], + 'propagate': False + }, 'awx.main.commands.run_fact_cache_receiver': { 'handlers': ['console', 'file', 'fact_receiver'], 'propagate': False From cdb65ccac9ead06a081d641ffbe084374cc0dd77 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 22 Sep 2016 11:20:24 -0400 Subject: [PATCH 07/25] replace task manager with event driven scheduler --- Makefile | 13 +- Procfile | 3 +- .../management/commands/run_task_system.py | 374 
------------------ awx/main/models/unified_jobs.py | 12 +- awx/main/scheduler/__init__.py | 222 +++++++++++ awx/main/scheduler/dag_simple.py | 9 +- awx/main/scheduler/dag_workflow.py | 8 +- awx/main/scheduler/tasks.py | 79 ++++ awx/main/tasks.py | 43 +- awx/main/tests/base.py | 14 +- .../test_dag.py} | 12 +- awx/settings/defaults.py | 11 +- 12 files changed, 344 insertions(+), 456 deletions(-) delete mode 100644 awx/main/management/commands/run_task_system.py create mode 100644 awx/main/scheduler/tasks.py rename awx/main/tests/unit/{commands/test_run_task_system.py => scheduler/test_dag.py} (97%) diff --git a/Makefile b/Makefile index fd9d87cd2e..52c30c4bb7 100644 --- a/Makefile +++ b/Makefile @@ -357,7 +357,6 @@ server_noattach: tmux rename-window 'Tower' tmux select-window -t tower:0 tmux split-window -v 'exec make celeryd' - tmux split-window -h 'exec make taskmanager' tmux new-window 'exec make receiver' tmux select-window -t tower:1 tmux rename-window 'Extra Services' @@ -397,7 +396,7 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default + $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) # Run to start the zeromq callback receiver @@ -407,16 +406,6 @@ receiver: fi; \ $(PYTHON) manage.py run_callback_receiver -taskmanager: - @if [ "$(VENV_BASE)" ]; then \ - . $(VENV_BASE)/tower/bin/activate; \ - fi; \ - if [ "$(COMPOSE_HOST)" == "tower_1" ] || [ "$(COMPOSE_HOST)" == "tower" ]; then \ - $(PYTHON) manage.py run_task_system; \ - else \ - while true; do sleep 2; done; \ - fi - socketservice: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ diff --git a/Procfile b/Procfile index 433417f70b..b8dd37a983 100644 --- a/Procfile +++ b/Procfile @@ -1,7 +1,6 @@ runserver: make runserver celeryd: make celeryd -taskmanager: make taskmanager receiver: make receiver socketservice: make socketservice factcacher: make factcacher -flower: make flower \ No newline at end of file +flower: make flower diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py deleted file mode 100644 index b29b2e4d88..0000000000 --- a/awx/main/management/commands/run_task_system.py +++ /dev/null @@ -1,374 +0,0 @@ -#Copyright (c) 2015 Ansible, Inc. 
-# All Rights Reserved - -# Python -import os -import datetime -import logging -import signal -import time -import traceback - -from kombu import Connection, Exchange, Queue, Producer -from kombu.mixins import ConsumerMixin - -# Django -from django.conf import settings -from django.core.management.base import NoArgsCommand - -# AWX -from awx.main.models import * # noqa -from awx.main.queue import FifoQueue -from awx.main.tasks import handle_work_error, handle_work_success -from awx.main.utils import get_system_task_capacity -from awx.main.scheduler.dag_simple import SimpleDAG -from awx.main.scheduler.dag_workflow import WorkflowDAG - -# Celery -from celery.task.control import inspect - -logger = logging.getLogger('awx.main.commands.run_task_system') - -queue = FifoQueue('tower_task_manager') - -def get_tasks(): - """Fetch all Tower tasks that are relevant to the task management - system. - """ - RELEVANT_JOBS = ('pending', 'waiting', 'running') - # TODO: Replace this when we can grab all objects in a sane way. - graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)] - graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(status__in=RELEVANT_JOBS)] - graph_inventory_updates = [iu for iu in - InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)] - graph_project_updates = [pu for pu in - ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)] - graph_system_jobs = [sj for sj in - SystemJob.objects.filter(status__in=RELEVANT_JOBS)] - graph_workflow_jobs = [wf for wf in - WorkflowJob.objects.filter(status__in=RELEVANT_JOBS)] - all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates + - graph_project_updates + graph_system_jobs + - graph_workflow_jobs, - key=lambda task: task.created) - print("Returning all_actions %s" % len(all_actions)) - return all_actions - -def get_running_workflow_jobs(): - graph_workflow_jobs = [wf for wf in - WorkflowJob.objects.filter(status='running')] - return graph_workflow_jobs - -def do_spawn_workflow_jobs(): - workflow_jobs = get_running_workflow_jobs() - for workflow_job in workflow_jobs: - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - for spawn_node in spawn_nodes: - # TODO: Inject job template template params as kwargs. - # Make sure to take into account extra_vars merge logic - kv = {} - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - can_start = job.signal_start(**kv) - if not can_start: - job.status = 'failed' - job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials" - job.save(update_fields=['status', 'job_explanation']) - job.socketio_emit_status("failed") - - # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? - #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - -def rebuild_graph(): - """Regenerate the task graph by refreshing known tasks from Tower, purging - orphaned running tasks, and creating dependencies for new tasks before - generating directed edge relationships between those tasks. - """ - ''' - # Sanity check: Only do this on the primary node. 
- if Instance.objects.my_role() == 'secondary': - return None - ''' - - inspector = inspect() - if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): - active_task_queues = inspector.active() - else: - logger.warn("Ignoring celery task inspector") - active_task_queues = None - - do_spawn_workflow_jobs() - - all_sorted_tasks = get_tasks() - if not len(all_sorted_tasks): - print("All sorted task len is not? <%s, %s>" % (len(all_sorted_tasks), all_sorted_tasks)) - return None - - active_tasks = [] - if active_task_queues is not None: - for queue in active_task_queues: - active_tasks += [at['id'] for at in active_task_queues[queue]] - else: - logger.error("Could not communicate with celery!") - # TODO: Something needs to be done here to signal to the system - # as a whole that celery appears to be down. - if not hasattr(settings, 'CELERY_UNIT_TEST'): - return None - - running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) - waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks) - new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) - - # Check running tasks and make sure they are active in celery - logger.debug("Active celery tasks: " + str(active_tasks)) - for task in list(running_tasks): - if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): - # NOTE: Pull status again and make sure it didn't finish in - # the meantime? - task.status = 'failed' - task.job_explanation += ' '.join(( - 'Task was marked as running in Tower but was not present in', - 'Celery, so it has been marked as failed.', - )) - task.save() - task.socketio_emit_status("failed") - running_tasks.pop(running_tasks.index(task)) - logger.error("Task %s appears orphaned... marking as failed" % task) - - # Create and process dependencies for new tasks - for task in new_tasks: - logger.debug("Checking dependencies for: %s" % str(task)) - try: - task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) - except Exception, e: - logger.error("Failed processing dependencies for {}: {}".format(task, e)) - task.status = 'failed' - task.job_explanation += 'Task failed to generate dependencies: {}'.format(e) - task.save() - task.socketio_emit_status("failed") - continue - logger.debug("New dependencies: %s" % str(task_dependencies)) - for dep in task_dependencies: - # We recalculate the created time for the moment to ensure the - # dependencies are always sorted in the right order relative to - # the dependent task. - time_delt = len(task_dependencies) - task_dependencies.index(dep) - dep.created = task.created - datetime.timedelta(seconds=1 + time_delt) - dep.status = 'waiting' - dep.save() - waiting_tasks.insert(waiting_tasks.index(task), dep) - if not hasattr(settings, 'UNIT_TEST_IGNORE_TASK_WAIT'): - task.status = 'waiting' - task.save() - - # Rebuild graph - graph = SimpleDAG() - for task in running_tasks: - graph.add_node(task) - for wait_task in waiting_tasks[:50]: - node_dependencies = [] - for node in graph: - if wait_task.is_blocked_by(node['node_object']): - node_dependencies.append(node['node_object']) - graph.add_node(wait_task) - for dependency in node_dependencies: - graph.add_edge(wait_task, dependency) - if settings.DEBUG: - graph.generate_graphviz_plot() - return graph - -def process_graph(graph, task_capacity): - """Given a task dependency graph, start and manage tasks given their - priority and weight. 
- """ - leaf_nodes = graph.get_leaf_nodes() - running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) - running_impact = sum([t['node_object'].task_impact for t in running_nodes]) - ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) - remaining_volume = task_capacity - running_impact - logger.info('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' - 'Remaining Capacity: %s' % - (str(running_nodes), str(task_capacity), - str(running_impact), str(remaining_volume))) - logger.info("Ready Nodes: %s" % str(ready_nodes)) - for task_node in ready_nodes: - node_obj = task_node['node_object'] - # NOTE: This could be used to pass metadata through the task system - # node_args = task_node['metadata'] - impact = node_obj.task_impact - if impact <= remaining_volume or running_impact == 0: - node_dependencies = graph.get_dependents(node_obj) - # Allow other tasks to continue if a job fails, even if they are - # other jobs. - if graph.get_node_type(node_obj) == 'job': - node_dependencies = [] - dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ - [{'type': graph.get_node_type(n['node_object']), - 'id': n['node_object'].id} for n in node_dependencies] - error_handler = handle_work_error.s(subtasks=dependent_nodes) - success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj), - 'id': node_obj.id}) - start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) - if not start_status: - node_obj.status = 'failed' - if node_obj.job_explanation: - node_obj.job_explanation += ' ' - node_obj.job_explanation += 'Task failed pre-start check.' - node_obj.save() - continue - remaining_volume -= impact - running_impact += impact - logger.info('Started Node: %s (capacity hit: %s) ' - 'Remaining Capacity: %s' % - (str(node_obj), str(impact), str(remaining_volume))) - - -#logger = logging.getLogger('awx.main.scheduler') - -class CallbackBrokerWorker(ConsumerMixin): - - def __init__(self, connection): - self.connection = connection - - def get_consumers(self, Consumer, channel): - print("get_consumers() OK") - return [Consumer(queues=[Queue(settings.SCHEDULER_QUEUE, - Exchange(settings.SCHEDULER_QUEUE, type='topic'), - routing_key='scheduler.job.launch'),], - accept=['json'], - callbacks=[self.process_job_launch,]), - Consumer(queues=[Queue(settings.SCHEDULER_QUEUE, - Exchange(settings.SCHEDULER_QUEUE, type='topic'), - routing_key='scheduler.job.complete'),], - accept=['json'], - callbacks=[self.process_job_complete,] - )] - - def schedule(self): - task_capacity = get_system_task_capacity() - graph = rebuild_graph() - if graph: - process_graph(graph, task_capacity) - - def process_job_msg(self, body, message): - try: - if settings.DEBUG: - logger.info("Body: {}".format(body)) - logger.info("Message: {}".format(message)) - - if "msg_type" not in body: - raise Exception("Payload does not have a msg_type") - if "job_id" not in body: - raise Exception("Payload does not have a job_id") - - func = getattr(self, "process_%s" % body['msg_type'], None) - if not func: - raise AttributeError("No processor for message type %s" % body['msg_type']) - func(body) - - # Raised by processors when msg isn't in the expected form. 
- except LookupError as e: - logger.error(e) - except AttributeError as e: - logger.error(e) - except Exception as exc: - import traceback - traceback.print_exc() - logger.error('Callback Task Processor Raised Exception: %r', exc) - finally: - message.ack() - self.schedule() - - def process_job_launch(self, body, message): - print("process_job_launch()") - if "job_id" not in body: - raise KeyError("Payload does not contain job_id") - - ''' - Wait for job to exist. - The job is created in a transaction then the message is created, but - the transaction may not have completed. - - FIXME: We could generate the message in a Django signal handler. - OR, we could call an explicit commit in the view and then send the - message. - - ''' - retries = 10 - retry = 0 - while not UnifiedJob.objects.filter(id=body['job_id']).exists(): - time.sleep(0.3) - - if retry >= retries: - logger.error("Failed to process 'job_launch' message for job %d" % body['job_id']) - # ack the message so we don't build up the queue. - # - # The job can still be chosen to run during tower startup or - # when another job is started or completes - message.ack() - return - retry += 1 - - job = UnifiedJob.objects.get(id=body['job_id']) - - self.schedule() - message.ack() - - def process_job_complete(self, body, message): - print("process_job_complete()") - if "job_id" not in body: - raise KeyError("Payload does not contain job_id") - - # TODO: use list of finished status from jobs.py or unified_jobs.py - finished_status = ['successful', 'error', 'failed', 'completed'] - q = UnifiedJob.objects.filter(id=body['job_id']) - - # Ensure that the job is updated in the database before we call to - # schedule the next job. - retries = 10 - retry = 0 - while True: - # Job not found, most likely deleted. That's fine - if not q.exists(): - logger.warn("Failed to find job '%d' while processing 'job_complete' message. Presume that it was deleted." % body['job_id']) - break - - job = q[0] - if job.status in finished_status: - break - - time.sleep(0.3) - - if retry >= retries: - logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message." % (job.status, finished_status)) - message.ack() - return - retry += 1 - - message.ack() - self.schedule() - -class Command(NoArgsCommand): - """Tower Task Management System - This daemon is designed to reside between our tasks and celery and - provide a mechanism for understanding the relationship between those tasks - and their dependencies. - - It also actively prevents situations in which Tower can get blocked - because it doesn't have an understanding of what is progressing through - celery. 
- """ - help = 'Launch the Tower task management system' - - def handle_noargs(self, **options): - with Connection(settings.BROKER_URL) as conn: - try: - worker = CallbackBrokerWorker(conn) - worker.run() - except KeyboardInterrupt: - print('Terminating Task Management System') - - diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 6806ff7d16..a81bcb6aca 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -852,16 +852,8 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique self.update_fields(start_args=json.dumps(kwargs), status='pending') self.socketio_emit_status("pending") - from kombu import Connection, Exchange, Producer - connection = Connection(settings.BROKER_URL) - exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic') - producer = Producer(connection) - producer.publish({ 'msg_type': 'job_launch', 'job_id': self.id }, - serializer='json', - compression='bzip2', - exchange=exchange, - declare=[exchange], - routing_key='scheduler.job.launch') + from awx.main.scheduler.tasks import run_job_launch + run_job_launch.delay(self.id) # Each type of unified job has a different Task class; get the # appropirate one. diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index e69de29bb2..1c3a1bc515 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -0,0 +1,222 @@ +#Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved + +# Python +import datetime +import logging + +# Django +from django.conf import settings + +# AWX +from awx.main.models import * # noqa +from awx.main.tasks import handle_work_error, handle_work_success +from awx.main.utils import get_system_task_capacity +from awx.main.scheduler.dag_simple import SimpleDAG +from awx.main.scheduler.dag_workflow import WorkflowDAG + +# Celery +from celery.task.control import inspect + +logger = logging.getLogger('awx.main.scheduler') + +def get_tasks(): + """Fetch all Tower tasks that are relevant to the task management + system. + """ + RELEVANT_JOBS = ('pending', 'waiting', 'running') + # TODO: Replace this when we can grab all objects in a sane way. + graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)] + graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(status__in=RELEVANT_JOBS)] + graph_inventory_updates = [iu for iu in + InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)] + graph_project_updates = [pu for pu in + ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)] + graph_system_jobs = [sj for sj in + SystemJob.objects.filter(status__in=RELEVANT_JOBS)] + graph_workflow_jobs = [wf for wf in + WorkflowJob.objects.filter(status__in=RELEVANT_JOBS)] + all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates + + graph_project_updates + graph_system_jobs + + graph_workflow_jobs, + key=lambda task: task.created) + return all_actions + +def get_running_workflow_jobs(): + graph_workflow_jobs = [wf for wf in + WorkflowJob.objects.filter(status='running')] + return graph_workflow_jobs + +def do_spawn_workflow_jobs(): + workflow_jobs = get_running_workflow_jobs() + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + spawn_nodes = dag.bfs_nodes_to_run() + for spawn_node in spawn_nodes: + # TODO: Inject job template template params as kwargs. 
+ # Make sure to take into account extra_vars merge logic + kv = {} + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + can_start = job.signal_start(**kv) + if not can_start: + job.status = 'failed' + job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials" + job.save(update_fields=['status', 'job_explanation']) + job.socketio_emit_status("failed") + + # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? + #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + + +def rebuild_graph(): + """Regenerate the task graph by refreshing known tasks from Tower, purging + orphaned running tasks, and creating dependencies for new tasks before + generating directed edge relationships between those tasks. + """ + ''' + # Sanity check: Only do this on the primary node. + if Instance.objects.my_role() == 'secondary': + return None + ''' + + inspector = inspect() + if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): + active_task_queues = inspector.active() + else: + logger.warn("Ignoring celery task inspector") + active_task_queues = None + + do_spawn_workflow_jobs() + + all_sorted_tasks = get_tasks() + if not len(all_sorted_tasks): + return None + + active_tasks = [] + if active_task_queues is not None: + for queue in active_task_queues: + active_tasks += [at['id'] for at in active_task_queues[queue]] + else: + logger.error("Could not communicate with celery!") + # TODO: Something needs to be done here to signal to the system + # as a whole that celery appears to be down. + if not hasattr(settings, 'CELERY_UNIT_TEST'): + return None + + running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) + waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks) + new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) + + # Check running tasks and make sure they are active in celery + logger.debug("Active celery tasks: " + str(active_tasks)) + for task in list(running_tasks): + if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): + # NOTE: Pull status again and make sure it didn't finish in + # the meantime? + task.status = 'failed' + task.job_explanation += ' '.join(( + 'Task was marked as running in Tower but was not present in', + 'Celery, so it has been marked as failed.', + )) + task.save() + task.socketio_emit_status("failed") + running_tasks.pop(running_tasks.index(task)) + logger.error("Task %s appears orphaned... marking as failed" % task) + + # Create and process dependencies for new tasks + for task in new_tasks: + logger.debug("Checking dependencies for: %s" % str(task)) + try: + task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) + except Exception, e: + logger.error("Failed processing dependencies for {}: {}".format(task, e)) + task.status = 'failed' + task.job_explanation += 'Task failed to generate dependencies: {}'.format(e) + task.save() + task.socketio_emit_status("failed") + continue + logger.debug("New dependencies: %s" % str(task_dependencies)) + for dep in task_dependencies: + # We recalculate the created time for the moment to ensure the + # dependencies are always sorted in the right order relative to + # the dependent task. 
+ time_delt = len(task_dependencies) - task_dependencies.index(dep) + dep.created = task.created - datetime.timedelta(seconds=1 + time_delt) + dep.status = 'waiting' + dep.save() + waiting_tasks.insert(waiting_tasks.index(task), dep) + if not hasattr(settings, 'UNIT_TEST_IGNORE_TASK_WAIT'): + task.status = 'waiting' + task.save() + + # Rebuild graph + graph = SimpleDAG() + for task in running_tasks: + graph.add_node(task) + for wait_task in waiting_tasks[:50]: + node_dependencies = [] + for node in graph: + if wait_task.is_blocked_by(node['node_object']): + node_dependencies.append(node['node_object']) + graph.add_node(wait_task) + for dependency in node_dependencies: + graph.add_edge(wait_task, dependency) + if settings.DEBUG: + graph.generate_graphviz_plot() + return graph + +def process_graph(graph, task_capacity): + """Given a task dependency graph, start and manage tasks given their + priority and weight. + """ + leaf_nodes = graph.get_leaf_nodes() + running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) + running_impact = sum([t['node_object'].task_impact for t in running_nodes]) + ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) + remaining_volume = task_capacity - running_impact + logger.info('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' + 'Remaining Capacity: %s' % + (str(running_nodes), str(task_capacity), + str(running_impact), str(remaining_volume))) + logger.info("Ready Nodes: %s" % str(ready_nodes)) + for task_node in ready_nodes: + node_obj = task_node['node_object'] + # NOTE: This could be used to pass metadata through the task system + # node_args = task_node['metadata'] + impact = node_obj.task_impact + if impact <= remaining_volume or running_impact == 0: + node_dependencies = graph.get_dependents(node_obj) + # Allow other tasks to continue if a job fails, even if they are + # other jobs. + if graph.get_node_type(node_obj) == 'job': + node_dependencies = [] + dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ + [{'type': graph.get_node_type(n['node_object']), + 'id': n['node_object'].id} for n in node_dependencies] + error_handler = handle_work_error.s(subtasks=dependent_nodes) + success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj), + 'id': node_obj.id}) + start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) + if not start_status: + node_obj.status = 'failed' + if node_obj.job_explanation: + node_obj.job_explanation += ' ' + node_obj.job_explanation += 'Task failed pre-start check.' 
+ node_obj.save() + continue + remaining_volume -= impact + running_impact += impact + logger.info('Started Node: %s (capacity hit: %s) ' + 'Remaining Capacity: %s' % + (str(node_obj), str(impact), str(remaining_volume))) + + + +def schedule(): + task_capacity = get_system_task_capacity() + graph = rebuild_graph() + if graph: + process_graph(graph, task_capacity) + diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py index f04c60159a..79b20520e2 100644 --- a/awx/main/scheduler/dag_simple.py +++ b/awx/main/scheduler/dag_simple.py @@ -1,5 +1,12 @@ -from awx.main.models import * # noqa +from awx.main.models import ( + Job, + AdHocCommand, + InventoryUpdate, + ProjectUpdate, + WorkflowJob, + SystemJob, +) class SimpleDAG(object): ''' A simple implementation of a directed acyclic graph ''' diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py index 1a8269c064..c891b2ec32 100644 --- a/awx/main/scheduler/dag_workflow.py +++ b/awx/main/scheduler/dag_workflow.py @@ -1,4 +1,6 @@ -from dag_simple import SimpleDAG + +# AWX +from awx.main.scheduler.dag_simple import SimpleDAG class WorkflowDAG(SimpleDAG): def __init__(self, workflow_job=None): @@ -41,8 +43,6 @@ class WorkflowDAG(SimpleDAG): elif job.status in ['successful']: children_success = self.get_dependencies(obj, 'success_nodes') nodes.extend(children_success) - else: - logger.warn("Incorrect graph structure") return [n['node_object'] for n in nodes_found] def is_workflow_done(self): @@ -68,7 +68,5 @@ class WorkflowDAG(SimpleDAG): elif job.status in ['successful']: children_success = self.get_dependencies(obj, 'success_nodes') nodes.extend(children_success) - else: - logger.warn("Incorrect graph structure") return True diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py new file mode 100644 index 0000000000..343bdd1546 --- /dev/null +++ b/awx/main/scheduler/tasks.py @@ -0,0 +1,79 @@ + +# Python +import logging +import time + +# Celery +from celery import task + +# AWX +from awx.main.models import UnifiedJob +from awx.main.scheduler import schedule + +logger = logging.getLogger('awx.main.scheduler') + +# TODO: move logic to UnifiedJob model and use bind=True feature of celery. +# Would we need the request loop then? I think so. Even if we get the in-memory +# updated model, the call to schedule() may get stale data. + +@task +def run_job_launch(job_id): + # Wait for job to exist. + # The job is created in a transaction then the message is created, but + # the transaction may not have completed. + + # FIXME: We could generate the message in a Django signal handler. + # OR, we could call an explicit commit in the view and then send the + # message. + + retries = 10 + retry = 0 + while not UnifiedJob.objects.filter(id=job_id).exists(): + time.sleep(0.3) + + if retry >= retries: + logger.error("Failed to process 'job_launch' message for job %d" % job_id) + # ack the message so we don't build up the queue. + # + # The job can still be chosen to run during tower startup or + # when another job is started or completes + return + retry += 1 + + # "Safe" to get the job now since it exists. 
+ # Really, there is a race condition from exists to get + + # TODO: while not loop should call get wrapped in a try except + #job = UnifiedJob.objects.get(id=job_id) + + schedule() + +@task +def run_job_complete(job_id): + # TODO: use list of finished status from jobs.py or unified_jobs.py + finished_status = ['successful', 'error', 'failed', 'completed'] + q = UnifiedJob.objects.filter(id=job_id) + + # Ensure that the job is updated in the database before we call to + # schedule the next job. + retries = 10 + retry = 0 + while True: + # Job not found, most likely deleted. That's fine + if not q.exists(): + logger.warn("Failed to find job '%d' while processing 'job_complete' message. Presume that it was deleted." % job_id) + break + + job = q[0] + if job.status in finished_status: + break + + time.sleep(0.3) + + if retry >= retries: + logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message." % (job.status, finished_status)) + return + retry += 1 + + schedule() + diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 86552b6404..31db196a9b 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -31,9 +31,6 @@ except: # Pexpect import pexpect -# Kombu -from kombu import Connection, Exchange, Queue, Producer - # Celery from celery import Task, task from celery.signals import celeryd_init @@ -50,18 +47,18 @@ from django.contrib.auth.models import User from awx.main.constants import CLOUD_PROVIDERS from awx.main.models import * # noqa from awx.main.models import UnifiedJob -from awx.main.queue import FifoQueue from awx.main.conf import tower_settings from awx.main.task_engine import TaskSerializer, TASK_TIMEOUT_INTERVAL from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, emit_websocket_notification, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot) +from awx.main.scheduler.dag_workflow import WorkflowDAG __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error', 'handle_work_success', 'update_inventory_computed_fields', 'send_notifications', 'run_administrative_checks', - 'run_workflow_job'] + 'RunJobLaunch'] HIDDEN_PASSWORD = '**********' @@ -182,14 +179,6 @@ def tower_periodic_scheduler(self): new_unified_job.socketio_emit_status("failed") emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id)) -@task(queue='default') -def notify_task_runner(metadata_dict): - """Add the given task into the Tower task manager's queue, to be consumed - by the task system. 
- """ - queue = FifoQueue('tower_task_manager') - queue.push(metadata_dict) - def _send_notification_templates(instance, status_str): if status_str not in ['succeeded', 'failed']: raise ValueError("status_str must be either succeeded or failed") @@ -206,17 +195,6 @@ def _send_notification_templates(instance, status_str): job_id=instance.id) -def _send_job_complete_msg(instance): - connection = Connection(settings.BROKER_URL) - exchange = Exchange(settings.SCHEDULER_QUEUE, type='topic') - producer = Producer(connection) - producer.publish({ 'job_id': instance.id, 'msg_type': 'job_complete' }, - serializer='json', - compression='bzip2', - exchange=exchange, - declare=[exchange], - routing_key='scheduler.job.complete') - @task(bind=True, queue='default') def handle_work_success(self, result, task_actual): instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) @@ -225,7 +203,8 @@ def handle_work_success(self, result, task_actual): _send_notification_templates(instance, 'succeeded') - _send_job_complete_msg(instance) + from awx.main.scheduler.tasks import run_job_complete + run_job_complete.delay(instance.id) @task(bind=True, queue='default') def handle_work_error(self, task_id, subtasks=None): @@ -256,8 +235,14 @@ def handle_work_error(self, task_id, subtasks=None): if first_instance: _send_notification_templates(first_instance, 'failed') + # We only send 1 job complete message since all the job completion message + # handling does is trigger the scheduler. If we extend the functionality of + # what the job complete message handler does then we may want to send a + # completion event for each job here. if first_instance: - _send_job_complete_msg(first_instance) + from awx.main.scheduler.tasks import run_job_complete + run_job_complete.delay(first_instance.id) + pass @task(queue='default') def update_inventory_computed_fields(inventory_id, should_update_hosts=True): @@ -323,10 +308,6 @@ class BaseTask(Task): logger.error('Failed to update %s after %d retries.', self.model._meta.object_name, _attempt) - def signal_finished(self, pk): - pass - # notify_task_runner(dict(complete=pk)) - def get_path_to(self, *args): ''' Return absolute path relative to this file. @@ -1690,7 +1671,7 @@ class RunWorkflowJob(BaseTask): model = WorkflowJob def run(self, pk, **kwargs): - from awx.main.management.commands.run_task_system import WorkflowDAG + print("I'm a running a workflow job") ''' Run the job/task and capture its output. 
''' diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index 6b35297a07..34be4081b8 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -30,7 +30,7 @@ from django.utils.encoding import force_text # AWX from awx.main.models import * # noqa -from awx.main.management.commands.run_task_system import run_taskmanager +from awx.main.management.commands.run_callback_receiver import CallbackReceiver from awx.main.utils import get_ansible_version from awx.main.task_engine import TaskEngager as LicenseWriter from awx.sso.backends import LDAPSettings @@ -654,18 +654,6 @@ class BaseTestMixin(MockCommonlySlowTestMixin): u'expected no traceback, got:\n%s' % job.result_traceback) - - def start_taskmanager(self, command_port): - self.start_redis() - self.taskmanager_process = Process(target=run_taskmanager, - args=(command_port,)) - self.taskmanager_process.start() - - def terminate_taskmanager(self): - if hasattr(self, 'taskmanager_process'): - self.taskmanager_process.terminate() - self.stop_redis() - class BaseTest(BaseTestMixin, django.test.TestCase): ''' Base class for unit tests. diff --git a/awx/main/tests/unit/commands/test_run_task_system.py b/awx/main/tests/unit/scheduler/test_dag.py similarity index 97% rename from awx/main/tests/unit/commands/test_run_task_system.py rename to awx/main/tests/unit/scheduler/test_dag.py index bc62394b21..84fb2d37f2 100644 --- a/awx/main/tests/unit/commands/test_run_task_system.py +++ b/awx/main/tests/unit/scheduler/test_dag.py @@ -1,10 +1,12 @@ -from awx.main.management.commands.run_task_system import ( - SimpleDAG, - WorkflowDAG, -) + +# Python +import pytest + +# AWX +from awx.main.scheduler.dag_simple import SimpleDAG +from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.models import Job from awx.main.models.workflow import WorkflowJobNode -import pytest @pytest.fixture def dag_root(): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 8a65d7d322..20a80ecca4 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -339,10 +339,11 @@ CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' +CELERY_IMPORTS = ('awx.main.scheduler.tasks',) CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('jobs', Exchange('jobs'), routing_key='jobs'), - #Queue('scheduler', Exchange('scheduler'), routing_key='scheduler.job.#'), + Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#'), # Projects use a fanout queue, this isn't super well supported Broadcast('projects'), ) @@ -354,7 +355,11 @@ CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs', 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs', 'routing_key': 'jobs'}, 'awx.main.tasks.run_system_job': {'queue': 'jobs', - 'routing_key': 'jobs'}}) + 'routing_key': 'jobs'}, + 'awx.main.scheduler.tasks.run_job_launch': {'queue': 'scheduler', + 'routing_key': 'scheduler.job.launch'}, + 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler', + 'routing_key': 'scheduler.job.complete'},}) CELERYBEAT_SCHEDULE = { 'tower_scheduler': { @@ -1040,7 +1045,7 @@ LOGGING = { 'handlers': ['console', 'file', 'socketio_service'], 'propagate': False }, - 'awx.main.commands.run_task_system': { + 'awx.main.tasks': { 'handlers': ['console', 'file', 'task_system'], 'propagate': False }, From 04f69727f214b1286904cac2557a852126e3e3c7 Mon Sep 17 00:00:00 
2001 From: Chris Meyers Date: Thu, 22 Sep 2016 15:36:58 -0400 Subject: [PATCH 08/25] fully message driven job execution TODO: * Need a distributed lock (leverage postgres) * Less memory-intensive graph representation * Maybe serializer/deserializer graph to database * Iterative graph building instead of full rebuild. --- awx/main/models/unified_jobs.py | 32 ++++++++++----- awx/main/models/workflow.py | 8 ++++ awx/main/scheduler/__init__.py | 72 ++++++++++++++++++++++++--------- awx/main/tasks.py | 23 +++++++---- awx/settings/defaults.py | 2 +- 5 files changed, 100 insertions(+), 37 deletions(-) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index a81bcb6aca..bcdde810e9 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -798,34 +798,43 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique status=self.status, traceback=self.result_traceback) - def start(self, error_callback, success_callback, **kwargs): - ''' - Start the task running via Celery. - ''' - task_class = self._get_task_class() + def pre_start(self, **kwargs): if not self.can_start: self.job_explanation = u'%s is not in a startable status: %s, expecting one of %s' % (self._meta.verbose_name, self.status, str(('new', 'waiting'))) self.save(update_fields=['job_explanation']) - return False + return (False, None) + needed = self.get_passwords_needed_to_start() try: start_args = json.loads(decrypt_field(self, 'start_args')) except Exception: start_args = None + if start_args in (None, ''): start_args = kwargs + opts = dict([(field, start_args.get(field, '')) for field in needed]) + if not all(opts.values()): missing_fields = ', '.join([k for k,v in opts.items() if not v]) self.job_explanation = u'Missing needed fields: %s.' % missing_fields self.save(update_fields=['job_explanation']) - return False - #extra_data = dict([(field, kwargs[field]) for field in kwargs - # if field not in needed]) + return (False, None) + if 'extra_vars' in kwargs: self.handle_extra_data(kwargs['extra_vars']) - task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) - return True + + return (True, opts) + + def start(self, error_callback, success_callback, **kwargs): + ''' + Start the task running via Celery. 
+ ''' + task_class = self._get_task_class() + (res, opts) = self.pre_start(**kwargs) + if res: + task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) + return res def signal_start(self, **kwargs): """Notify the task runner system to begin work on this task.""" @@ -852,6 +861,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique self.update_fields(start_args=json.dumps(kwargs), status='pending') self.socketio_emit_status("pending") + print("Running job launch for job %s" % self.name) from awx.main.scheduler.tasks import run_job_launch run_job_launch.delay(self.id) diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index 3c95fb17e8..68066ee58a 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -240,3 +240,11 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow def get_notification_friendly_name(self): return "Workflow Job" + def start(self, *args, **kwargs): + (res, opts) = self.pre_start(**kwargs) + if res: + self.status = 'running' + self.save() + self.socketio_emit_status("running") + return res + diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 1c3a1bc515..f10cb2dcd6 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -4,13 +4,14 @@ # Python import datetime import logging +import struct, fcntl, os # Django from django.conf import settings +from django.db import transaction # AWX from awx.main.models import * # noqa -from awx.main.tasks import handle_work_error, handle_work_success from awx.main.utils import get_system_task_capacity from awx.main.scheduler.dag_simple import SimpleDAG from awx.main.scheduler.dag_workflow import WorkflowDAG @@ -47,8 +48,8 @@ def get_running_workflow_jobs(): WorkflowJob.objects.filter(status='running')] return graph_workflow_jobs -def do_spawn_workflow_jobs(): - workflow_jobs = get_running_workflow_jobs() +def spawn_workflow_graph_jobs(workflow_jobs): + # TODO: Consider using transaction.atomic for workflow_job in workflow_jobs: dag = WorkflowDAG(workflow_job) spawn_nodes = dag.bfs_nodes_to_run() @@ -69,6 +70,16 @@ def do_spawn_workflow_jobs(): # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? 
#emit_websocket_notification('/socket.io/jobs', '', dict(id=)) +# See comment in tasks.py::RunWorkflowJob::run() +def process_finished_workflow_jobs(workflow_jobs): + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + if dag.is_workflow_done(): + with transaction.atomic(): + # TODO: detect if wfj failed + workflow_job.status = 'completed' + workflow_job.save() + workflow_job.socketio_emit_status('completed') def rebuild_graph(): """Regenerate the task graph by refreshing known tasks from Tower, purging @@ -88,8 +99,6 @@ def rebuild_graph(): logger.warn("Ignoring celery task inspector") active_task_queues = None - do_spawn_workflow_jobs() - all_sorted_tasks = get_tasks() if not len(all_sorted_tasks): return None @@ -106,12 +115,13 @@ def rebuild_graph(): return None running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) + running_celery_tasks = filter(lambda t: type(t) != WorkflowJob, running_tasks) waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks) new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) # Check running tasks and make sure they are active in celery logger.debug("Active celery tasks: " + str(active_tasks)) - for task in list(running_tasks): + for task in list(running_celery_tasks): if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): # NOTE: Pull status again and make sure it didn't finish in # the meantime? @@ -122,7 +132,7 @@ def rebuild_graph(): )) task.save() task.socketio_emit_status("failed") - running_tasks.pop(running_tasks.index(task)) + running_tasks.pop(task) logger.error("Task %s appears orphaned... marking as failed" % task) # Create and process dependencies for new tasks @@ -171,6 +181,8 @@ def process_graph(graph, task_capacity): """Given a task dependency graph, start and manage tasks given their priority and weight. """ + from awx.main.tasks import handle_work_error, handle_work_success + leaf_nodes = graph.get_leaf_nodes() running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) running_impact = sum([t['node_object'].task_impact for t in running_nodes]) @@ -190,33 +202,57 @@ def process_graph(graph, task_capacity): node_dependencies = graph.get_dependents(node_obj) # Allow other tasks to continue if a job fails, even if they are # other jobs. - if graph.get_node_type(node_obj) == 'job': + + node_type = graph.get_node_type(node_obj) + if node_type == 'job': + # clear dependencies because a job can block (not necessarily + # depend) on other jobs that share the same job template node_dependencies = [] + + # Make the workflow_job look like it's started by setting status to + # running, but don't make a celery Task for it. + # Introduce jobs from the workflow so they are candidates to run. + # Call process_graph() again to allow choosing for run, the + # created candidate jobs. 
+ elif node_type == 'workflow_job': + node_obj.start() + spawn_workflow_graph_jobs([node_obj]) + return process_graph(graph, task_capacity) + dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ [{'type': graph.get_node_type(n['node_object']), 'id': n['node_object'].id} for n in node_dependencies] error_handler = handle_work_error.s(subtasks=dependent_nodes) success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj), 'id': node_obj.id}) - start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) - if not start_status: - node_obj.status = 'failed' - if node_obj.job_explanation: - node_obj.job_explanation += ' ' - node_obj.job_explanation += 'Task failed pre-start check.' - node_obj.save() - continue + with transaction.atomic(): + start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) + if not start_status: + node_obj.status = 'failed' + if node_obj.job_explanation: + node_obj.job_explanation += ' ' + node_obj.job_explanation += 'Task failed pre-start check.' + node_obj.save() + continue remaining_volume -= impact running_impact += impact logger.info('Started Node: %s (capacity hit: %s) ' 'Remaining Capacity: %s' % (str(node_obj), str(impact), str(remaining_volume))) - - def schedule(): + lockfile = open("/tmp/tower_scheduler.lock", "w") + fcntl.lockf(lockfile, fcntl.LOCK_EX) + task_capacity = get_system_task_capacity() + + workflow_jobs = get_running_workflow_jobs() + process_finished_workflow_jobs(workflow_jobs) + spawn_workflow_graph_jobs(workflow_jobs) + graph = rebuild_graph() if graph: process_graph(graph, task_capacity) + fcntl.lockf(lockfile, fcntl.LOCK_UN) + diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 31db196a9b..bc09c58f60 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1665,21 +1665,30 @@ class RunSystemJob(BaseTask): def build_cwd(self, instance, **kwargs): return settings.BASE_DIR +''' class RunWorkflowJob(BaseTask): name = 'awx.main.tasks.run_workflow_job' model = WorkflowJob def run(self, pk, **kwargs): - print("I'm a running a workflow job") - ''' - Run the job/task and capture its output. - ''' - pass + #Run the job/task and capture its output. instance = self.update_model(pk, status='running', celery_task_id=self.request.id) instance.socketio_emit_status("running") - # FIXME: Detect workflow run completion + # FIXME: Currently, the workflow job busy waits until the graph run is + # complete. Instead, the workflow job should return or never even run, + # because all of the "launch logic" can be done schedule(). + + # However, other aspects of our system depend on a 1-1 relationship + # between a Job and a Celery Task. + # + # * If we let the workflow job task (RunWorkflowJob.run()) complete + # then how do we trigger the handle_work_error and + # handle_work_success subtasks? + # + # * How do we handle the recovery process? (i.e. there is an entry in + # the database but not in celery). 
while True: dag = WorkflowDAG(instance) if dag.is_workflow_done(): @@ -1689,4 +1698,4 @@ class RunWorkflowJob(BaseTask): time.sleep(1) instance.socketio_emit_status(instance.status) # TODO: Handle cancel - +''' diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 20a80ecca4..445ce8924f 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -360,7 +360,7 @@ CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs', 'routing_key': 'scheduler.job.launch'}, 'awx.main.scheduler.tasks.run_job_complete': {'queue': 'scheduler', 'routing_key': 'scheduler.job.complete'},}) - + CELERYBEAT_SCHEDULE = { 'tower_scheduler': { 'task': 'awx.main.tasks.tower_periodic_scheduler', From 3a8033dec4c9fecaa34b39791a5ddc33bbdccbd7 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 22 Sep 2016 17:54:27 -0400 Subject: [PATCH 09/25] cheesy global lock --- awx/main/scheduler/__init__.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index f10cb2dcd6..e670fa6300 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -241,18 +241,18 @@ def process_graph(graph, task_capacity): (str(node_obj), str(impact), str(remaining_volume))) def schedule(): - lockfile = open("/tmp/tower_scheduler.lock", "w") - fcntl.lockf(lockfile, fcntl.LOCK_EX) + with transaction.atomic(): + # Lock + Instance.objects.select_for_update().all()[0] - task_capacity = get_system_task_capacity() + task_capacity = get_system_task_capacity() - workflow_jobs = get_running_workflow_jobs() - process_finished_workflow_jobs(workflow_jobs) - spawn_workflow_graph_jobs(workflow_jobs) + workflow_jobs = get_running_workflow_jobs() + process_finished_workflow_jobs(workflow_jobs) + spawn_workflow_graph_jobs(workflow_jobs) - graph = rebuild_graph() - if graph: - process_graph(graph, task_capacity) - - fcntl.lockf(lockfile, fcntl.LOCK_UN) + graph = rebuild_graph() + if graph: + process_graph(graph, task_capacity) + # Unlock, due to transaction ending From 89250dcf3686ac06f752b94fb3ac6f070ccaf9cc Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Tue, 27 Sep 2016 16:04:00 -0400 Subject: [PATCH 10/25] removed wait_task look restriction --- awx/main/scheduler/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index e670fa6300..24958c0e0d 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -165,7 +165,7 @@ def rebuild_graph(): graph = SimpleDAG() for task in running_tasks: graph.add_node(task) - for wait_task in waiting_tasks[:50]: + for wait_task in waiting_tasks: node_dependencies = [] for node in graph: if wait_task.is_blocked_by(node['node_object']): From d65120538dc6bd75d994f2d1497258c5dcd15c97 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Tue, 27 Sep 2016 16:05:30 -0400 Subject: [PATCH 11/25] scheduler messages need not be durable --- awx/settings/defaults.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 445ce8924f..2e3858e331 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -343,7 +343,7 @@ CELERY_IMPORTS = ('awx.main.scheduler.tasks',) CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('jobs', Exchange('jobs'), routing_key='jobs'), - Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#'), + 
Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#', durable=False), # Projects use a fanout queue, this isn't super well supported Broadcast('projects'), ) From 0007ef2546378ac957a0016099418545b2d6ac91 Mon Sep 17 00:00:00 2001 From: Chris Church Date: Wed, 28 Sep 2016 09:58:19 -0400 Subject: [PATCH 12/25] Add --skip-errors option to migrate_to_database_settings command, allow any false/null value for 'off' in pendo setting. --- .../commands/migrate_to_database_settings.py | 86 ++++++++++++------- awx/ui/conf.py | 11 ++- 2 files changed, 67 insertions(+), 30 deletions(-) diff --git a/awx/conf/management/commands/migrate_to_database_settings.py b/awx/conf/management/commands/migrate_to_database_settings.py index 36fd783475..f708ae4a1d 100644 --- a/awx/conf/management/commands/migrate_to_database_settings.py +++ b/awx/conf/management/commands/migrate_to_database_settings.py @@ -38,6 +38,13 @@ class Command(BaseCommand): default=False, help='Only show which settings would be commented/migrated.', ) + parser.add_argument( + '--skip-errors', + action='store_true', + dest='skip_errors', + default=False, + help='Skip over settings that would raise an error when commenting/migrating.', + ) parser.add_argument( '--no-comment', action='store_true', @@ -56,6 +63,7 @@ class Command(BaseCommand): def handle(self, *args, **options): self.verbosity = int(options.get('verbosity', 1)) self.dry_run = bool(options.get('dry_run', False)) + self.skip_errors = bool(options.get('skip_errors', False)) self.no_comment = bool(options.get('no_comment', False)) self.backup_suffix = options.get('backup_suffix', '') self.categories = options.get('category', None) or ['all'] @@ -134,17 +142,14 @@ class Command(BaseCommand): def _check_if_needs_comment(self, patterns, setting): files_to_comment = [] - try: - # If any diffs are returned, this setting needs to be commented. - diffs = comment_assignments(patterns, setting, dry_run=True) - if setting == 'LICENSE': - diffs.extend(self._comment_license_file(dry_run=True)) - for diff in diffs: - for line in diff.splitlines(): - if line.startswith('+++ '): - files_to_comment.append(line[4:]) - except Exception as e: - raise CommandError('Error commenting {0}: {1!r}'.format(setting, e)) + # If any diffs are returned, this setting needs to be commented. 
+ diffs = comment_assignments(patterns, setting, dry_run=True) + if setting == 'LICENSE': + diffs.extend(self._comment_license_file(dry_run=True)) + for diff in diffs: + for line in diff.splitlines(): + if line.startswith('+++ '): + files_to_comment.append(line[4:]) return files_to_comment def _check_if_needs_migration(self, setting): @@ -163,26 +168,39 @@ class Command(BaseCommand): return current_value return empty - def _display_tbd(self, setting, files_to_comment, migrate_value): + def _display_tbd(self, setting, files_to_comment, migrate_value, comment_error=None, migrate_error=None): if self.verbosity >= 1: if files_to_comment: if migrate_value is not empty: action = 'Migrate + Comment' else: action = 'Comment' + if comment_error or migrate_error: + action = self.style.ERROR('{} (skipped)'.format(action)) + else: + action = self.style.OK(action) self.stdout.write(' {}: {}'.format( self.style.LABEL(setting), - self.style.OK(action), + action, )) if self.verbosity >= 2: - if migrate_value is not empty: + if migrate_error: + self.stdout.write(' - Migrate value: {}'.format( + self.style.ERROR(migrate_error), + )) + elif migrate_value is not empty: self.stdout.write(' - Migrate value: {}'.format( self.style.VALUE(repr(migrate_value)), )) - for file_to_comment in files_to_comment: - self.stdout.write(' - Comment in: {}'.format( - self.style.VALUE(file_to_comment), + if comment_error: + self.stdout.write(' - Comment: {}'.format( + self.style.ERROR(comment_error), )) + elif files_to_comment: + for file_to_comment in files_to_comment: + self.stdout.write(' - Comment in: {}'.format( + self.style.VALUE(file_to_comment), + )) else: if self.verbosity >= 2: self.stdout.write(' {}: {}'.format( @@ -255,15 +273,33 @@ class Command(BaseCommand): to_migrate = collections.OrderedDict() to_comment = collections.OrderedDict() for name in registered_settings: - files_to_comment = self._check_if_needs_comment(patterns, name) + comment_error, migrate_error = None, None + files_to_comment = [] + try: + files_to_comment = self._check_if_needs_comment(patterns, name) + except Exception as e: + comment_error = 'Error commenting {0}: {1!r}'.format(name, e) + if not self.skip_errors: + raise CommandError(comment_error) if files_to_comment: to_comment[name] = files_to_comment migrate_value = empty if files_to_comment: migrate_value = self._check_if_needs_migration(name) if migrate_value is not empty: - to_migrate[name] = migrate_value - self._display_tbd(name, files_to_comment, migrate_value) + field = settings_registry.get_setting_field(name) + assert not field.read_only + try: + data = field.to_representation(migrate_value) + setting_value = field.run_validation(data) + db_value = field.to_representation(setting_value) + to_migrate[name] = db_value + except Exception as e: + to_comment.pop(name) + migrate_error = 'Unable to assign value {0!r} to setting "{1}: {2!s}".'.format(migrate_value, name, e) + if not self.skip_errors: + raise CommandError(migrate_error) + self._display_tbd(name, files_to_comment, migrate_value, comment_error, migrate_error) if self.verbosity == 1 and not to_migrate and not to_comment: self.stdout.write(' No settings found to migrate or comment!') @@ -275,15 +311,7 @@ class Command(BaseCommand): self.stdout.write(self.style.HEADING('Migrating settings to database:')) if not to_migrate: self.stdout.write(' No settings to migrate!') - for name, value in to_migrate.items(): - field = settings_registry.get_setting_field(name) - assert not field.read_only - try: - data = 
field.to_representation(value) - setting_value = field.run_validation(data) - db_value = field.to_representation(setting_value) - except Exception as e: - raise CommandError('Unable to assign value {0!r} to setting "{1}: {2!s}".'.format(value, name, e)) + for name, db_value in to_migrate.items(): display_value = json.dumps(db_value, indent=4) # Always encode "raw" strings as JSON. if isinstance(db_value, basestring): diff --git a/awx/ui/conf.py b/awx/ui/conf.py index 46fd4288c4..a92eeea35c 100644 --- a/awx/ui/conf.py +++ b/awx/ui/conf.py @@ -8,9 +8,18 @@ from django.utils.translation import ugettext_lazy as _ from awx.conf import fields, register +class PendoTrackingStateField(fields.ChoiceField): + + def to_internal_value(self, data): + # Any false/null values get converted to 'off'. + if data in fields.NullBooleanField.FALSE_VALUES or data in fields.NullBooleanField.NULL_VALUES: + return 'off' + return super(PendoTrackingStateField, self).to_internal_value(data) + + register( 'PENDO_TRACKING_STATE', - field_class=fields.ChoiceField, + field_class=PendoTrackingStateField, choices=[ ('off', _('Off')), ('anonymous', _('Anonymous')), From 0d538f8b0f5065bcb31264c7d63ca1cf139a8dc4 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Wed, 28 Sep 2016 10:23:45 -0400 Subject: [PATCH 13/25] Revert "removed wait_task look restriction" This reverts commit f159fd45406c6af2aacc48a44e2a3993bfa19ce8. --- awx/main/scheduler/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index 24958c0e0d..e670fa6300 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -165,7 +165,7 @@ def rebuild_graph(): graph = SimpleDAG() for task in running_tasks: graph.add_node(task) - for wait_task in waiting_tasks: + for wait_task in waiting_tasks[:50]: node_dependencies = [] for node in graph: if wait_task.is_blocked_by(node['node_object']): From 0ce7b31502eb23e0ea18998444cf105030274727 Mon Sep 17 00:00:00 2001 From: Chris Church Date: Wed, 28 Sep 2016 11:04:35 -0400 Subject: [PATCH 14/25] Fix default value validation for LDAP/SAML settings to prevent warnings. --- awx/conf/fields.py | 5 +++++ awx/sso/conf.py | 3 +++ awx/sso/fields.py | 6 +++++- 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/awx/conf/fields.py b/awx/conf/fields.py index a560d3a637..ae299137e6 100644 --- a/awx/conf/fields.py +++ b/awx/conf/fields.py @@ -34,6 +34,11 @@ class URLField(CharField): validator_kwargs['schemes'] = schemes self.validators.append(URLValidator(**validator_kwargs)) + def to_representation(self, value): + if value is None: + return '' + return super(URLField, self).to_representation(value) + def run_validators(self, value): if self.allow_plain_hostname: try: diff --git a/awx/sso/conf.py b/awx/sso/conf.py index 264b609367..e0842f6031 100644 --- a/awx/sso/conf.py +++ b/awx/sso/conf.py @@ -169,6 +169,7 @@ register( field_class=fields.URLField, schemes=('ldap', 'ldaps'), allow_blank=True, + default='', label=_('LDAP Server URI'), help_text=_('URI to connect to LDAP server, such as "ldap://ldap.example.com:389" ' '(non-SSL) or "ldaps://ldap.example.com:636" (SSL). 
LDAP authentication ' @@ -880,6 +881,7 @@ register( register( 'SOCIAL_AUTH_SAML_TECHNICAL_CONTACT', field_class=fields.SAMLContactField, + allow_blank=True, default={}, label=_('SAML Service Provider Technical Contact'), help_text=_('Configure this setting with your contact information.'), @@ -894,6 +896,7 @@ register( register( 'SOCIAL_AUTH_SAML_SUPPORT_CONTACT', field_class=fields.SAMLContactField, + allow_blank=True, default={}, label=_('SAML Service Provider Support Contact'), help_text=_('Configure this setting with your contact information.'), diff --git a/awx/sso/fields.py b/awx/sso/fields.py index 6655ad3523..a0d472756e 100644 --- a/awx/sso/fields.py +++ b/awx/sso/fields.py @@ -349,6 +349,10 @@ class BaseDictWithChildField(fields.DictField): } allow_unknown_keys = False + def __init__(self, *args, **kwargs): + self.allow_blank = kwargs.pop('allow_blank', False) + super(BaseDictWithChildField, self).__init__(*args, **kwargs) + def to_representation(self, value): value = super(BaseDictWithChildField, self).to_representation(value) for k, v in value.items(): @@ -367,7 +371,7 @@ class BaseDictWithChildField(fields.DictField): continue elif key not in data: missing_keys.add(key) - if missing_keys: + if missing_keys and (data or not self.allow_blank): keys_display = json.dumps(list(missing_keys)).lstrip('[').rstrip(']') self.fail('missing_keys', missing_keys=keys_display) if not self.allow_unknown_keys: From f6c50cc63a8946ae81c3c17effb1a1ff5f5f1308 Mon Sep 17 00:00:00 2001 From: Graham Mainwaring Date: Wed, 28 Sep 2016 12:38:37 -0400 Subject: [PATCH 15/25] Update Makefile so that ui-release does not get rebuilt over and over when troubleshooting build/release processes --- .gitignore | 3 ++- Makefile | 32 +++++++++++++++++++------------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index 20e90fc35c..afd8aa7187 100644 --- a/.gitignore +++ b/.gitignore @@ -51,8 +51,9 @@ __pycache__ /tmp npm-debug.log -# UI build debugging +# UI build flag files awx/ui/.deps_built +awx/ui/.release_built # Testing .cache diff --git a/Makefile b/Makefile index fd9d87cd2e..b00b8b3536 100644 --- a/Makefile +++ b/Makefile @@ -170,6 +170,10 @@ ifeq ($(DISTRO),ubuntu) SETUP_INSTALL_ARGS += --install-layout=deb endif +# UI flag files +UI_DEPS_FLAG_FILE = awx/ui/.deps_built +UI_RELEASE_FLAG_FILE = awx/ui/.release_built + .DEFAULT_GOAL := build .PHONY: clean clean-tmp clean-venv rebase push requirements requirements_dev \ @@ -213,7 +217,8 @@ clean-bundle: clean-ui: rm -rf awx/ui/static/ rm -rf awx/ui/node_modules/ - rm -f awx/ui/.deps_built + rm -f $(UI_DEPS_FLAG_FILE) + rm -f $(UI_RELEASE_FLAG_FILE) clean-tmp: rm -rf tmp/ @@ -224,7 +229,6 @@ clean-venv: # Remove temporary build files, compiled Python files. 
clean: clean-rpm clean-deb clean-ui clean-tar clean-packer clean-bundle rm -rf awx/lib/site-packages - rm -rf awx/lib/.deps_built rm -rf dist/* rm -rf tmp mkdir tmp @@ -482,32 +486,35 @@ test_jenkins : test_coverage # UI TASKS # -------------------------------------- -ui-deps-built: awx/ui/package.json +$(UI_DEPS_FLAG_FILE): awx/ui/package.json $(NPM_BIN) --unsafe-perm --prefix awx/ui install awx/ui - touch awx/ui/.deps_built + touch $(UI_DEPS_FLAG_FILE) -ui-docker-machine: ui-deps-built +ui-docker-machine: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run build-docker-machine -ui-docker: ui-deps-built +ui-docker: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run build-docker-cid -ui-release: ui-deps-built - $(NPM_BIN) --prefix awx/ui run build-release +ui-release: $(UI_RELEASE_FLAG_FILE) -ui-test: ui-deps-built +$(UI_RELEASE_FLAG_FILE): $(UI_DEPS_FLAG_FILE) + $(NPM_BIN) --prefix awx/ui run build-release + touch $(UI_RELEASE_FLAG_FILE) + +ui-test: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run test -ui-test-ci: ui-deps-built +ui-test-ci: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run test:ci testjs_ci: echo "Update UI unittests later" #ui-test-ci -jshint: ui-deps-built +jshint: $(UI_DEPS_FLAG_FILE) grunt --gruntfile awx/ui/Gruntfile.js jshint #Depends on node 6.x and npm 3.x installed on Jenkins slave -ui-test-saucelabs: ui-deps-built +ui-test-saucelabs: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run test:saucelabs # END UI TASKS @@ -773,7 +780,6 @@ docker-compose-build: MACHINE?=default docker-clean: - rm -f awx/lib/.deps_built eval $$(docker-machine env $(MACHINE)) $(foreach container_id,$(shell docker ps -f name=tools_tower -aq),docker stop $(container_id); docker rm -f $(container_id);) -docker images | grep "tower_devel" | awk '{print $3}' | xargs docker rmi From 38f5ae21f8ffb513c1ae78624d3577c844934549 Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Wed, 28 Sep 2016 14:14:29 -0400 Subject: [PATCH 16/25] Set up post_process flag for JobEvent save in callback receiver. --- awx/main/management/commands/run_callback_receiver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index dcb3906013..d2b89cd44d 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -117,7 +117,7 @@ class CallbackBrokerWorker(ConsumerMixin): else: print("Cache hit") j.parent_id = parent_id - j.save() + j.save(post_process=True) if event_uuid: cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300) except DatabaseError as e: From eaa6567cf312174f1c7b8d55f2b456545a75d1ed Mon Sep 17 00:00:00 2001 From: Aaron Tan Date: Wed, 28 Sep 2016 15:45:23 -0400 Subject: [PATCH 17/25] Convert notification_subject from byte string to unicode. 
--- awx/main/models/notifications.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/awx/main/models/notifications.py b/awx/main/models/notifications.py index 442b5dc2c8..a9dbcdebdc 100644 --- a/awx/main/models/notifications.py +++ b/awx/main/models/notifications.py @@ -181,11 +181,11 @@ class JobNotificationMixin(object): def _build_notification_message(self, status_str): notification_body = self.notification_data() - notification_subject = "{} #{} '{}' {} on Ansible Tower: {}".format(self.get_notification_friendly_name(), - self.id, - self.name, - status_str, - notification_body['url']) + notification_subject = u"{} #{} '{}' {} on Ansible Tower: {}".format(self.get_notification_friendly_name(), + self.id, + self.name, + status_str, + notification_body['url']) notification_body['friendly_name'] = self.get_notification_friendly_name() return (notification_subject, notification_body) From 2f205a6862633e2b86d1c45fa57f948fcb62d188 Mon Sep 17 00:00:00 2001 From: AlanCoding Date: Wed, 28 Sep 2016 16:02:00 -0400 Subject: [PATCH 18/25] show capabilities on launch and relaunch, remove unintended capabilities --- awx/api/serializers.py | 1 - awx/api/views.py | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 753fdad8b6..a170329875 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -1075,7 +1075,6 @@ class InventoryDetailSerializer(InventorySerializer): class InventoryScriptSerializer(InventorySerializer): - show_capabilities = ['copy', 'edit', 'delete'] class Meta: fields = () diff --git a/awx/api/views.py b/awx/api/views.py index 23dca79c80..3fc668e615 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2282,7 +2282,7 @@ class JobTemplateLaunch(RetrieveAPIView, GenericAPIView): else: data = OrderedDict() data['ignored_fields'] = ignored_fields - data.update(JobSerializer(new_job).to_representation(new_job)) + data.update(JobSerializer(new_job, context=self.get_serializer_context()).to_representation(new_job)) data['job'] = new_job.id return Response(data, status=status.HTTP_201_CREATED) @@ -2965,7 +2965,7 @@ class JobRelaunch(RetrieveAPIView, GenericAPIView): data = dict(passwords_needed_to_start=new_job.passwords_needed_to_start) return Response(data, status=status.HTTP_400_BAD_REQUEST) else: - data = JobSerializer(new_job).data + data = JobSerializer(new_job, context=self.get_serializer_context()).data # Add job key to match what old relaunch returned. data['job'] = new_job.id headers = {'Location': new_job.get_absolute_url()} @@ -3423,7 +3423,7 @@ class AdHocCommandRelaunch(GenericAPIView): data = dict(passwords_needed_to_start=new_ad_hoc_command.passwords_needed_to_start) return Response(data, status=status.HTTP_400_BAD_REQUEST) else: - data = AdHocCommandSerializer(new_ad_hoc_command).data + data = AdHocCommandSerializer(new_ad_hoc_command, context=self.get_serializer_context()).data # Add ad_hoc_command key to match what was previously returned. 
data['ad_hoc_command'] = new_ad_hoc_command.id headers = {'Location': new_ad_hoc_command.get_absolute_url()} From bdd444fb44c120cb2bb808060761c5d01ad3d132 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 29 Sep 2016 09:53:36 -0400 Subject: [PATCH 19/25] removed print --- awx/main/models/unified_jobs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index bcdde810e9..d304f24d79 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -861,7 +861,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique self.update_fields(start_args=json.dumps(kwargs), status='pending') self.socketio_emit_status("pending") - print("Running job launch for job %s" % self.name) from awx.main.scheduler.tasks import run_job_launch run_job_launch.delay(self.id) From fbc1dff4de8f59ecfd763231d7c761c466d90b11 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 29 Sep 2016 10:23:00 -0400 Subject: [PATCH 20/25] flake8 fixes --- awx/main/scheduler/__init__.py | 1 - awx/main/scheduler/dag_simple.py | 12 ++++++------ awx/main/tasks.py | 1 - awx/main/tests/base.py | 2 -- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py index e670fa6300..03c87d8ddb 100644 --- a/awx/main/scheduler/__init__.py +++ b/awx/main/scheduler/__init__.py @@ -4,7 +4,6 @@ # Python import datetime import logging -import struct, fcntl, os # Django from django.conf import settings diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py index 79b20520e2..aeb0ff759e 100644 --- a/awx/main/scheduler/dag_simple.py +++ b/awx/main/scheduler/dag_simple.py @@ -1,11 +1,11 @@ from awx.main.models import ( - Job, - AdHocCommand, - InventoryUpdate, - ProjectUpdate, - WorkflowJob, - SystemJob, + Job, + AdHocCommand, + InventoryUpdate, + ProjectUpdate, + WorkflowJob, + SystemJob, ) class SimpleDAG(object): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index bc09c58f60..10fb82c67a 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -52,7 +52,6 @@ from awx.main.task_engine import TaskSerializer, TASK_TIMEOUT_INTERVAL from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, emit_websocket_notification, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot) -from awx.main.scheduler.dag_workflow import WorkflowDAG __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error', diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index 34be4081b8..3950b538c5 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -12,7 +12,6 @@ import sys import tempfile import time import urllib -from multiprocessing import Process import re import mock @@ -30,7 +29,6 @@ from django.utils.encoding import force_text # AWX from awx.main.models import * # noqa -from awx.main.management.commands.run_callback_receiver import CallbackReceiver from awx.main.utils import get_ansible_version from awx.main.task_engine import TaskEngager as LicenseWriter from awx.sso.backends import LDAPSettings From 6b0e3378862b783cc1fb03818f868db0a9cd8cb5 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Thu, 29 Sep 2016 10:37:18 -0400 Subject: [PATCH 21/25] Update postgres yum/apt repo locations *Thanks postgres team --- tools/docker-compose/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/tools/docker-compose/Dockerfile b/tools/docker-compose/Dockerfile index c583c4ddfd..ddd54616bb 100644 --- a/tools/docker-compose/Dockerfile +++ b/tools/docker-compose/Dockerfile @@ -9,7 +9,7 @@ RUN mkdir /tmp/requirements ADD requirements/requirements.txt requirements/requirements_ansible.txt requirements/requirements_dev.txt requirements/requirements_jenkins.txt /tmp/requirements/ RUN yum -y update && yum -y install curl epel-release RUN curl --silent --location https://rpm.nodesource.com/setup_6.x | bash - -RUN yum -y localinstall http://yum.postgresql.org/9.4/redhat/rhel-6-x86_64/pgdg-centos94-9.4-1.noarch.rpm +RUN yum -y localinstall http://download.postgresql.org/pub/repos/yum/9.4/redhat/rhel-6-x86_64/pgdg-centos94-9.4-3.noarch.rpm ADD tools/docker-compose/proot.repo /etc/yum.repos.d/proot.repo RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git mercurial subversion python-devel python-psycopg2 make postgresql postgresql-devel nodejs python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel proot python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel RUN pip install flake8 pytest==2.9.2 pytest-pythonpath pytest-django pytest-cov pytest-mock dateutils django-debug-toolbar==1.4 pyflakes==1.0.0 virtualenv From a648beba9098f1e397f1e5b4d1dee4116aaad172 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Thu, 29 Sep 2016 12:03:06 -0400 Subject: [PATCH 22/25] Rev django security version --- requirements/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 3e82d6bc81..d236b5d1f1 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -12,7 +12,7 @@ cliff==1.15.0 cmd2==0.6.8 d2to1==0.2.11 # TODO: Still needed? defusedxml==0.4.1 -Django==1.8.10 +Django==1.8.15 debtcollector==1.2.0 decorator==4.0.6 django-auth-ldap==1.2.6 From ec2b0ac90d9d1ddda5e215d804decc8431bc8bfc Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 29 Sep 2016 16:01:15 -0400 Subject: [PATCH 23/25] add back in removed method --- awx/main/tasks.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 07cd7df835..6dbac70108 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -305,6 +305,10 @@ class BaseTask(Task): logger.error('Failed to update %s after %d retries.', self.model._meta.object_name, _attempt) + def signal_finished(self, pk): + pass + # notify_task_runner(dict(complete=pk)) + def get_path_to(self, *args): ''' Return absolute path relative to this file. From 1a60dd89bdacf11ba0c5443887e1981bbd66c24b Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 29 Sep 2016 16:01:15 -0400 Subject: [PATCH 24/25] add back in removed method --- awx/main/tasks.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 07cd7df835..6dbac70108 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -305,6 +305,10 @@ class BaseTask(Task): logger.error('Failed to update %s after %d retries.', self.model._meta.object_name, _attempt) + def signal_finished(self, pk): + pass + # notify_task_runner(dict(complete=pk)) + def get_path_to(self, *args): ''' Return absolute path relative to this file. 
From 9cafebd8db85e2330b46cf2ddfff2cd1a661c93d Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Thu, 29 Sep 2016 16:17:05 -0400 Subject: [PATCH 25/25] remove job to jt allow_simultaneous dependency * Foreshadowing of what's to come with the task manager. When deciding on what job to run in our task manager, we can't depend on job template fields. Otherwise, this would cost us a query. --- awx/api/serializers.py | 3 ++- .../migrations/0037_job_allow_simultaneous.py | 19 +++++++++++++++++++ awx/main/models/jobs.py | 10 +++++----- 3 files changed, 26 insertions(+), 6 deletions(-) create mode 100644 awx/main/migrations/0037_job_allow_simultaneous.py diff --git a/awx/api/serializers.py b/awx/api/serializers.py index db249bac9a..97191607b7 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -1953,7 +1953,8 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer): model = Job fields = ('*', 'job_template', 'passwords_needed_to_start', 'ask_variables_on_launch', 'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch', - 'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch') + 'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch', + 'allow_simultaneous',) def get_related(self, obj): res = super(JobSerializer, self).get_related(obj) diff --git a/awx/main/migrations/0037_job_allow_simultaneous.py b/awx/main/migrations/0037_job_allow_simultaneous.py new file mode 100644 index 0000000000..8a2e89df94 --- /dev/null +++ b/awx/main/migrations/0037_job_allow_simultaneous.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0036_v310_remove_tower_settings'), + ] + + operations = [ + migrations.AddField( + model_name='job', + name='allow_simultaneous', + field=models.BooleanField(default=False), + ), + ] diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 1602872d2b..8fa9c8d176 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -138,6 +138,9 @@ class JobOptions(BaseModel): become_enabled = models.BooleanField( default=False, ) + allow_simultaneous = models.BooleanField( + default=False, + ) extra_vars_dict = VarsDictProperty('extra_vars', True) @@ -236,9 +239,6 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin): read_role = ImplicitRoleField( parent_role=['project.organization.auditor_role', 'inventory.organization.auditor_role', 'execute_role', 'admin_role'], ) - allow_simultaneous = models.BooleanField( - default=False, - ) @classmethod @@ -251,7 +251,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin): 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'schedule', 'limit', 'verbosity', 'job_tags', 'extra_vars', 'launch_type', 'force_handlers', 'skip_tags', 'start_at_task', 'become_enabled', - 'labels', 'survey_passwords'] + 'labels', 'survey_passwords', 'allow_simultaneous',] def resource_validation_data(self): ''' @@ -616,7 +616,7 @@ class Job(UnifiedJob, JobOptions, JobNotificationMixin): if obj.job_template is not None and obj.inventory is not None: if obj.job_template == self.job_template and \ obj.inventory == self.inventory: - if self.job_template.allow_simultaneous: + if self.allow_simultaneous: return False if obj.launch_type == 'callback' and self.launch_type == 'callback' and \ obj.limit != self.limit: