diff --git a/.gitignore b/.gitignore index 20e90fc35c..afd8aa7187 100644 --- a/.gitignore +++ b/.gitignore @@ -51,8 +51,9 @@ __pycache__ /tmp npm-debug.log -# UI build debugging +# UI build flag files awx/ui/.deps_built +awx/ui/.release_built # Testing .cache diff --git a/Makefile b/Makefile index fd9d87cd2e..510fff2318 100644 --- a/Makefile +++ b/Makefile @@ -170,6 +170,10 @@ ifeq ($(DISTRO),ubuntu) SETUP_INSTALL_ARGS += --install-layout=deb endif +# UI flag files +UI_DEPS_FLAG_FILE = awx/ui/.deps_built +UI_RELEASE_FLAG_FILE = awx/ui/.release_built + .DEFAULT_GOAL := build .PHONY: clean clean-tmp clean-venv rebase push requirements requirements_dev \ @@ -213,7 +217,8 @@ clean-bundle: clean-ui: rm -rf awx/ui/static/ rm -rf awx/ui/node_modules/ - rm -f awx/ui/.deps_built + rm -f $(UI_DEPS_FLAG_FILE) + rm -f $(UI_RELEASE_FLAG_FILE) clean-tmp: rm -rf tmp/ @@ -224,7 +229,6 @@ clean-venv: # Remove temporary build files, compiled Python files. clean: clean-rpm clean-deb clean-ui clean-tar clean-packer clean-bundle rm -rf awx/lib/site-packages - rm -rf awx/lib/.deps_built rm -rf dist/* rm -rf tmp mkdir tmp @@ -357,7 +361,6 @@ server_noattach: tmux rename-window 'Tower' tmux select-window -t tower:0 tmux split-window -v 'exec make celeryd' - tmux split-window -h 'exec make taskmanager' tmux new-window 'exec make receiver' tmux select-window -t tower:1 tmux rename-window 'Extra Services' @@ -397,7 +400,7 @@ celeryd: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/tower/bin/activate; \ fi; \ - $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default + $(PYTHON) manage.py celeryd -l DEBUG -B --autoscale=20,3 --schedule=$(CELERY_SCHEDULE_FILE) -Q projects,jobs,default,scheduler #$(PYTHON) manage.py celery multi show projects jobs default -l DEBUG -Q:projects projects -Q:jobs jobs -Q:default default -c:projects 1 -c:jobs 3 -c:default 3 -Ofair -B --schedule=$(CELERY_SCHEDULE_FILE) # Run to start the zeromq callback receiver @@ -407,16 +410,6 @@ receiver: fi; \ $(PYTHON) manage.py run_callback_receiver -taskmanager: - @if [ "$(VENV_BASE)" ]; then \ - . $(VENV_BASE)/tower/bin/activate; \ - fi; \ - if [ "$(COMPOSE_HOST)" == "tower_1" ] || [ "$(COMPOSE_HOST)" == "tower" ]; then \ - $(PYTHON) manage.py run_task_system; \ - else \ - while true; do sleep 2; done; \ - fi - socketservice: @if [ "$(VENV_BASE)" ]; then \ . 
$(VENV_BASE)/tower/bin/activate; \ @@ -482,32 +475,35 @@ test_jenkins : test_coverage # UI TASKS # -------------------------------------- -ui-deps-built: awx/ui/package.json +$(UI_DEPS_FLAG_FILE): awx/ui/package.json $(NPM_BIN) --unsafe-perm --prefix awx/ui install awx/ui - touch awx/ui/.deps_built + touch $(UI_DEPS_FLAG_FILE) -ui-docker-machine: ui-deps-built +ui-docker-machine: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run build-docker-machine -ui-docker: ui-deps-built +ui-docker: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run build-docker-cid -ui-release: ui-deps-built - $(NPM_BIN) --prefix awx/ui run build-release +ui-release: $(UI_RELEASE_FLAG_FILE) -ui-test: ui-deps-built +$(UI_RELEASE_FLAG_FILE): $(UI_DEPS_FLAG_FILE) + $(NPM_BIN) --prefix awx/ui run build-release + touch $(UI_RELEASE_FLAG_FILE) + +ui-test: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run test -ui-test-ci: ui-deps-built +ui-test-ci: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run test:ci testjs_ci: echo "Update UI unittests later" #ui-test-ci -jshint: ui-deps-built +jshint: $(UI_DEPS_FLAG_FILE) grunt --gruntfile awx/ui/Gruntfile.js jshint #Depends on node 6.x and npm 3.x installed on Jenkins slave -ui-test-saucelabs: ui-deps-built +ui-test-saucelabs: $(UI_DEPS_FLAG_FILE) $(NPM_BIN) --prefix awx/ui run test:saucelabs # END UI TASKS @@ -773,7 +769,6 @@ docker-compose-build: MACHINE?=default docker-clean: - rm -f awx/lib/.deps_built eval $$(docker-machine env $(MACHINE)) $(foreach container_id,$(shell docker ps -f name=tools_tower -aq),docker stop $(container_id); docker rm -f $(container_id);) -docker images | grep "tower_devel" | awk '{print $3}' | xargs docker rmi diff --git a/Procfile b/Procfile index 433417f70b..b8dd37a983 100644 --- a/Procfile +++ b/Procfile @@ -1,7 +1,6 @@ runserver: make runserver celeryd: make celeryd -taskmanager: make taskmanager receiver: make receiver socketservice: make socketservice factcacher: make factcacher -flower: make flower \ No newline at end of file +flower: make flower diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 1bdd81c2fc..6d8c234802 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -1069,7 +1069,6 @@ class InventoryDetailSerializer(InventorySerializer): class InventoryScriptSerializer(InventorySerializer): - show_capabilities = ['copy', 'edit', 'delete'] class Meta: fields = () @@ -1563,10 +1562,13 @@ class ResourceAccessListElementSerializer(UserSerializer): def format_role_perm(role): role_dict = { 'id': role.id, 'name': role.name, 'description': role.description} - if role.content_type is not None: + try: role_dict['resource_name'] = role.content_object.name role_dict['resource_type'] = role.content_type.name role_dict['related'] = reverse_gfk(role.content_object) + except AttributeError: + pass + if role.content_type is not None: role_dict['user_capabilities'] = {'unattach': requesting_user.can_access( Role, 'unattach', role, user, 'members', data={}, skip_sub_obj_read_check=False)} else: @@ -1923,7 +1925,8 @@ class JobSerializer(UnifiedJobSerializer, JobOptionsSerializer): model = Job fields = ('*', 'job_template', 'passwords_needed_to_start', 'ask_variables_on_launch', 'ask_limit_on_launch', 'ask_tags_on_launch', 'ask_skip_tags_on_launch', - 'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch') + 'ask_job_type_on_launch', 'ask_inventory_on_launch', 'ask_credential_on_launch', + 'allow_simultaneous',) def get_related(self, obj): res = super(JobSerializer, self).get_related(obj) diff 
--git a/awx/api/views.py b/awx/api/views.py index 01579c29c2..6127600856 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -2275,6 +2275,7 @@ class JobTemplateLaunch(RetrieveAPIView, GenericAPIView): new_job = obj.create_unified_job(**kv) result = new_job.signal_start(**kv) + if not result: data = dict(passwords_needed_to_start=new_job.passwords_needed_to_start) new_job.delete() @@ -2282,7 +2283,7 @@ class JobTemplateLaunch(RetrieveAPIView, GenericAPIView): else: data = OrderedDict() data['ignored_fields'] = ignored_fields - data.update(JobSerializer(new_job).to_representation(new_job)) + data.update(JobSerializer(new_job, context=self.get_serializer_context()).to_representation(new_job)) data['job'] = new_job.id return Response(data, status=status.HTTP_201_CREATED) @@ -2968,7 +2969,7 @@ class JobRelaunch(RetrieveAPIView, GenericAPIView): data = dict(passwords_needed_to_start=new_job.passwords_needed_to_start) return Response(data, status=status.HTTP_400_BAD_REQUEST) else: - data = JobSerializer(new_job).data + data = JobSerializer(new_job, context=self.get_serializer_context()).data # Add job key to match what old relaunch returned. data['job'] = new_job.id headers = {'Location': new_job.get_absolute_url()} @@ -3426,7 +3427,7 @@ class AdHocCommandRelaunch(GenericAPIView): data = dict(passwords_needed_to_start=new_ad_hoc_command.passwords_needed_to_start) return Response(data, status=status.HTTP_400_BAD_REQUEST) else: - data = AdHocCommandSerializer(new_ad_hoc_command).data + data = AdHocCommandSerializer(new_ad_hoc_command, context=self.get_serializer_context()).data # Add ad_hoc_command key to match what was previously returned. data['ad_hoc_command'] = new_ad_hoc_command.id headers = {'Location': new_ad_hoc_command.get_absolute_url()} @@ -3762,7 +3763,16 @@ class RoleList(ListAPIView): new_in_300 = True def get_queryset(self): - return Role.visible_roles(self.request.user) + result = Role.visible_roles(self.request.user) + # Sanity check: is the requesting user an orphaned non-admin/auditor? + # if yes, make system admin/auditor mandatorily visible. 
+ if not self.request.user.organizations.exists() and\ + not self.request.user.is_superuser and\ + not self.request.user.is_system_auditor: + mandatories = ('system_administrator', 'system_auditor') + super_qs = Role.objects.filter(singleton_name__in=mandatories) + result = result | super_qs + return result class RoleDetail(RetrieveAPIView): diff --git a/awx/conf/fields.py b/awx/conf/fields.py index a560d3a637..ae299137e6 100644 --- a/awx/conf/fields.py +++ b/awx/conf/fields.py @@ -34,6 +34,11 @@ class URLField(CharField): validator_kwargs['schemes'] = schemes self.validators.append(URLValidator(**validator_kwargs)) + def to_representation(self, value): + if value is None: + return '' + return super(URLField, self).to_representation(value) + def run_validators(self, value): if self.allow_plain_hostname: try: diff --git a/awx/conf/management/commands/migrate_to_database_settings.py b/awx/conf/management/commands/migrate_to_database_settings.py index 36fd783475..f708ae4a1d 100644 --- a/awx/conf/management/commands/migrate_to_database_settings.py +++ b/awx/conf/management/commands/migrate_to_database_settings.py @@ -38,6 +38,13 @@ class Command(BaseCommand): default=False, help='Only show which settings would be commented/migrated.', ) + parser.add_argument( + '--skip-errors', + action='store_true', + dest='skip_errors', + default=False, + help='Skip over settings that would raise an error when commenting/migrating.', + ) parser.add_argument( '--no-comment', action='store_true', @@ -56,6 +63,7 @@ class Command(BaseCommand): def handle(self, *args, **options): self.verbosity = int(options.get('verbosity', 1)) self.dry_run = bool(options.get('dry_run', False)) + self.skip_errors = bool(options.get('skip_errors', False)) self.no_comment = bool(options.get('no_comment', False)) self.backup_suffix = options.get('backup_suffix', '') self.categories = options.get('category', None) or ['all'] @@ -134,17 +142,14 @@ class Command(BaseCommand): def _check_if_needs_comment(self, patterns, setting): files_to_comment = [] - try: - # If any diffs are returned, this setting needs to be commented. - diffs = comment_assignments(patterns, setting, dry_run=True) - if setting == 'LICENSE': - diffs.extend(self._comment_license_file(dry_run=True)) - for diff in diffs: - for line in diff.splitlines(): - if line.startswith('+++ '): - files_to_comment.append(line[4:]) - except Exception as e: - raise CommandError('Error commenting {0}: {1!r}'.format(setting, e)) + # If any diffs are returned, this setting needs to be commented. 
+ diffs = comment_assignments(patterns, setting, dry_run=True) + if setting == 'LICENSE': + diffs.extend(self._comment_license_file(dry_run=True)) + for diff in diffs: + for line in diff.splitlines(): + if line.startswith('+++ '): + files_to_comment.append(line[4:]) return files_to_comment def _check_if_needs_migration(self, setting): @@ -163,26 +168,39 @@ class Command(BaseCommand): return current_value return empty - def _display_tbd(self, setting, files_to_comment, migrate_value): + def _display_tbd(self, setting, files_to_comment, migrate_value, comment_error=None, migrate_error=None): if self.verbosity >= 1: if files_to_comment: if migrate_value is not empty: action = 'Migrate + Comment' else: action = 'Comment' + if comment_error or migrate_error: + action = self.style.ERROR('{} (skipped)'.format(action)) + else: + action = self.style.OK(action) self.stdout.write(' {}: {}'.format( self.style.LABEL(setting), - self.style.OK(action), + action, )) if self.verbosity >= 2: - if migrate_value is not empty: + if migrate_error: + self.stdout.write(' - Migrate value: {}'.format( + self.style.ERROR(migrate_error), + )) + elif migrate_value is not empty: self.stdout.write(' - Migrate value: {}'.format( self.style.VALUE(repr(migrate_value)), )) - for file_to_comment in files_to_comment: - self.stdout.write(' - Comment in: {}'.format( - self.style.VALUE(file_to_comment), + if comment_error: + self.stdout.write(' - Comment: {}'.format( + self.style.ERROR(comment_error), )) + elif files_to_comment: + for file_to_comment in files_to_comment: + self.stdout.write(' - Comment in: {}'.format( + self.style.VALUE(file_to_comment), + )) else: if self.verbosity >= 2: self.stdout.write(' {}: {}'.format( @@ -255,15 +273,33 @@ class Command(BaseCommand): to_migrate = collections.OrderedDict() to_comment = collections.OrderedDict() for name in registered_settings: - files_to_comment = self._check_if_needs_comment(patterns, name) + comment_error, migrate_error = None, None + files_to_comment = [] + try: + files_to_comment = self._check_if_needs_comment(patterns, name) + except Exception as e: + comment_error = 'Error commenting {0}: {1!r}'.format(name, e) + if not self.skip_errors: + raise CommandError(comment_error) if files_to_comment: to_comment[name] = files_to_comment migrate_value = empty if files_to_comment: migrate_value = self._check_if_needs_migration(name) if migrate_value is not empty: - to_migrate[name] = migrate_value - self._display_tbd(name, files_to_comment, migrate_value) + field = settings_registry.get_setting_field(name) + assert not field.read_only + try: + data = field.to_representation(migrate_value) + setting_value = field.run_validation(data) + db_value = field.to_representation(setting_value) + to_migrate[name] = db_value + except Exception as e: + to_comment.pop(name) + migrate_error = 'Unable to assign value {0!r} to setting "{1}: {2!s}".'.format(migrate_value, name, e) + if not self.skip_errors: + raise CommandError(migrate_error) + self._display_tbd(name, files_to_comment, migrate_value, comment_error, migrate_error) if self.verbosity == 1 and not to_migrate and not to_comment: self.stdout.write(' No settings found to migrate or comment!') @@ -275,15 +311,7 @@ class Command(BaseCommand): self.stdout.write(self.style.HEADING('Migrating settings to database:')) if not to_migrate: self.stdout.write(' No settings to migrate!') - for name, value in to_migrate.items(): - field = settings_registry.get_setting_field(name) - assert not field.read_only - try: - data = 
field.to_representation(value) - setting_value = field.run_validation(data) - db_value = field.to_representation(setting_value) - except Exception as e: - raise CommandError('Unable to assign value {0!r} to setting "{1}: {2!s}".'.format(value, name, e)) + for name, db_value in to_migrate.items(): display_value = json.dumps(db_value, indent=4) # Always encode "raw" strings as JSON. if isinstance(db_value, basestring): diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index dcb3906013..d2b89cd44d 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -117,7 +117,7 @@ class CallbackBrokerWorker(ConsumerMixin): else: print("Cache hit") j.parent_id = parent_id - j.save() + j.save(post_process=True) if event_uuid: cache.set("{}_{}".format(payload['job_id'], event_uuid), j.id, 300) except DatabaseError as e: diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py deleted file mode 100644 index 07b3ab3df5..0000000000 --- a/awx/main/management/commands/run_task_system.py +++ /dev/null @@ -1,481 +0,0 @@ -#Copyright (c) 2015 Ansible, Inc. -# All Rights Reserved - -# Python -import os -import datetime -import logging -import signal -import time - -# Django -from django.conf import settings -from django.core.management.base import NoArgsCommand - -# AWX -from awx.main.models import * # noqa -from awx.main.queue import FifoQueue -from awx.main.tasks import handle_work_error, handle_work_success -from awx.main.utils import get_system_task_capacity - -# Celery -from celery.task.control import inspect - -logger = logging.getLogger('awx.main.commands.run_task_system') - -queue = FifoQueue('tower_task_manager') - -class SimpleDAG(object): - ''' A simple implementation of a directed acyclic graph ''' - - def __init__(self): - self.nodes = [] - self.edges = [] - - def __contains__(self, obj): - for node in self.nodes: - if node['node_object'] == obj: - return True - return False - - def __len__(self): - return len(self.nodes) - - def __iter__(self): - return self.nodes.__iter__() - - def generate_graphviz_plot(self): - def short_string_obj(obj): - if type(obj) == Job: - type_str = "Job" - if type(obj) == AdHocCommand: - type_str = "AdHocCommand" - elif type(obj) == InventoryUpdate: - type_str = "Inventory" - elif type(obj) == ProjectUpdate: - type_str = "Project" - elif type(obj) == WorkflowJob: - type_str = "Workflow" - else: - type_str = "Unknown" - type_str += "%s" % str(obj.id) - return type_str - - doc = """ - digraph g { - rankdir = LR - """ - for n in self.nodes: - doc += "%s [color = %s]\n" % ( - short_string_obj(n['node_object']), - "red" if n['node_object'].status == 'running' else "black", - ) - for from_node, to_node, label in self.edges: - doc += "%s -> %s [ label=\"%s\" ];\n" % ( - short_string_obj(self.nodes[from_node]['node_object']), - short_string_obj(self.nodes[to_node]['node_object']), - label, - ) - doc += "}\n" - gv_file = open('/tmp/graph.gv', 'w') - gv_file.write(doc) - gv_file.close() - - def add_node(self, obj, metadata=None): - if self.find_ord(obj) is None: - self.nodes.append(dict(node_object=obj, metadata=metadata)) - - def add_edge(self, from_obj, to_obj, label=None): - from_obj_ord = self.find_ord(from_obj) - to_obj_ord = self.find_ord(to_obj) - if from_obj_ord is None or to_obj_ord is None: - raise LookupError("Object not found") - self.edges.append((from_obj_ord, to_obj_ord, 
label)) - - def add_edges(self, edgelist): - for edge_pair in edgelist: - self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2]) - - def find_ord(self, obj): - for idx in range(len(self.nodes)): - if obj == self.nodes[idx]['node_object']: - return idx - return None - - def get_node_type(self, obj): - if type(obj) == Job: - return "job" - elif type(obj) == AdHocCommand: - return "ad_hoc_command" - elif type(obj) == InventoryUpdate: - return "inventory_update" - elif type(obj) == ProjectUpdate: - return "project_update" - elif type(obj) == SystemJob: - return "system_job" - elif type(obj) == WorkflowJob: - return "workflow_job" - return "unknown" - - def get_dependencies(self, obj, label=None): - antecedents = [] - this_ord = self.find_ord(obj) - for node, dep, lbl in self.edges: - if label: - if node == this_ord and lbl == label: - antecedents.append(self.nodes[dep]) - else: - if node == this_ord: - antecedents.append(self.nodes[dep]) - return antecedents - - def get_dependents(self, obj, label=None): - decendents = [] - this_ord = self.find_ord(obj) - for node, dep, lbl in self.edges: - if label: - if dep == this_ord and lbl == label: - decendents.append(self.nodes[node]) - else: - if dep == this_ord: - decendents.append(self.nodes[node]) - return decendents - - def get_leaf_nodes(self): - leafs = [] - for n in self.nodes: - if len(self.get_dependencies(n['node_object'])) < 1: - leafs.append(n) - return leafs - - def get_root_nodes(self): - roots = [] - for n in self.nodes: - if len(self.get_dependents(n['node_object'])) < 1: - roots.append(n) - return roots - -class WorkflowDAG(SimpleDAG): - def __init__(self, workflow_job=None): - super(WorkflowDAG, self).__init__() - if workflow_job: - self._init_graph(workflow_job) - - def _init_graph(self, workflow_job): - workflow_nodes = workflow_job.workflow_job_nodes.all() - for workflow_node in workflow_nodes: - self.add_node(workflow_node) - - for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']: - for workflow_node in workflow_nodes: - related_nodes = getattr(workflow_node, node_type).all() - for related_node in related_nodes: - self.add_edge(workflow_node, related_node, node_type) - - def bfs_nodes_to_run(self): - root_nodes = self.get_root_nodes() - nodes = root_nodes - nodes_found = [] - - for index, n in enumerate(nodes): - obj = n['node_object'] - job = obj.job - - if not job: - nodes_found.append(n) - # Job is about to run or is running. Hold our horses and wait for - # the job to finish. We can't proceed down the graph path until we - # have the job result. - elif job.status not in ['failed', 'error', 'successful']: - continue - elif job.status in ['failed', 'error']: - children_failed = self.get_dependencies(obj, 'failure_nodes') - children_always = self.get_dependencies(obj, 'always_nodes') - children_all = children_failed + children_always - nodes.extend(children_all) - elif job.status in ['successful']: - children_success = self.get_dependencies(obj, 'success_nodes') - nodes.extend(children_success) - else: - logger.warn("Incorrect graph structure") - return [n['node_object'] for n in nodes_found] - - def is_workflow_done(self): - root_nodes = self.get_root_nodes() - nodes = root_nodes - - for index, n in enumerate(nodes): - obj = n['node_object'] - job = obj.job - - if not job: - return False - # Job is about to run or is running. Hold our horses and wait for - # the job to finish. We can't proceed down the graph path until we - # have the job result. 
- elif job.status not in ['failed', 'error', 'successful']: - return False - elif job.status in ['failed', 'error']: - children_failed = self.get_dependencies(obj, 'failure_nodes') - children_always = self.get_dependencies(obj, 'always_nodes') - children_all = children_failed + children_always - nodes.extend(children_all) - elif job.status in ['successful']: - children_success = self.get_dependencies(obj, 'success_nodes') - nodes.extend(children_success) - else: - logger.warn("Incorrect graph structure") - return True - -def get_tasks(): - """Fetch all Tower tasks that are relevant to the task management - system. - """ - RELEVANT_JOBS = ('pending', 'waiting', 'running') - # TODO: Replace this when we can grab all objects in a sane way. - graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)] - graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(status__in=RELEVANT_JOBS)] - graph_inventory_updates = [iu for iu in - InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)] - graph_project_updates = [pu for pu in - ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)] - graph_system_jobs = [sj for sj in - SystemJob.objects.filter(status__in=RELEVANT_JOBS)] - graph_workflow_jobs = [wf for wf in - WorkflowJob.objects.filter(status__in=RELEVANT_JOBS)] - all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates + - graph_project_updates + graph_system_jobs + - graph_workflow_jobs, - key=lambda task: task.created) - return all_actions - -def get_running_workflow_jobs(): - graph_workflow_jobs = [wf for wf in - WorkflowJob.objects.filter(status='running')] - return graph_workflow_jobs - -def do_spawn_workflow_jobs(): - workflow_jobs = get_running_workflow_jobs() - for workflow_job in workflow_jobs: - dag = WorkflowDAG(workflow_job) - spawn_nodes = dag.bfs_nodes_to_run() - for spawn_node in spawn_nodes: - kv = spawn_node.get_job_kwargs() - job = spawn_node.unified_job_template.create_unified_job(**kv) - spawn_node.job = job - spawn_node.save() - can_start = job.signal_start(**kv) - if not can_start: - job.status = 'failed' - job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials" - job.save(update_fields=['status', 'job_explanation']) - job.socketio_emit_status("failed") - - # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? - #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) - - -def rebuild_graph(message): - """Regenerate the task graph by refreshing known tasks from Tower, purging - orphaned running tasks, and creating dependencies for new tasks before - generating directed edge relationships between those tasks. - """ - # Sanity check: Only do this on the primary node. - if Instance.objects.my_role() == 'secondary': - return None - - inspector = inspect() - if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): - active_task_queues = inspector.active() - else: - logger.warn("Ignoring celery task inspector") - active_task_queues = None - - do_spawn_workflow_jobs() - - all_sorted_tasks = get_tasks() - if not len(all_sorted_tasks): - return None - - active_tasks = [] - if active_task_queues is not None: - for queue in active_task_queues: - active_tasks += [at['id'] for at in active_task_queues[queue]] - else: - logger.error("Could not communicate with celery!") - # TODO: Something needs to be done here to signal to the system - # as a whole that celery appears to be down. 
- if not hasattr(settings, 'CELERY_UNIT_TEST'): - return None - - running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) - waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks) - new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) - - # Check running tasks and make sure they are active in celery - logger.debug("Active celery tasks: " + str(active_tasks)) - for task in list(running_tasks): - if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): - # NOTE: Pull status again and make sure it didn't finish in - # the meantime? - task.status = 'failed' - task.job_explanation += ' '.join(( - 'Task was marked as running in Tower but was not present in', - 'Celery, so it has been marked as failed.', - )) - task.save() - task.socketio_emit_status("failed") - running_tasks.pop(running_tasks.index(task)) - logger.error("Task %s appears orphaned... marking as failed" % task) - - # Create and process dependencies for new tasks - for task in new_tasks: - logger.debug("Checking dependencies for: %s" % str(task)) - try: - task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) - except Exception, e: - logger.error("Failed processing dependencies for {}: {}".format(task, e)) - task.status = 'failed' - task.job_explanation += 'Task failed to generate dependencies: {}'.format(e) - task.save() - task.socketio_emit_status("failed") - continue - logger.debug("New dependencies: %s" % str(task_dependencies)) - for dep in task_dependencies: - # We recalculate the created time for the moment to ensure the - # dependencies are always sorted in the right order relative to - # the dependent task. - time_delt = len(task_dependencies) - task_dependencies.index(dep) - dep.created = task.created - datetime.timedelta(seconds=1 + time_delt) - dep.status = 'waiting' - dep.save() - waiting_tasks.insert(waiting_tasks.index(task), dep) - if not hasattr(settings, 'UNIT_TEST_IGNORE_TASK_WAIT'): - task.status = 'waiting' - task.save() - - # Rebuild graph - graph = SimpleDAG() - for task in running_tasks: - graph.add_node(task) - for wait_task in waiting_tasks[:50]: - node_dependencies = [] - for node in graph: - if wait_task.is_blocked_by(node['node_object']): - node_dependencies.append(node['node_object']) - graph.add_node(wait_task) - for dependency in node_dependencies: - graph.add_edge(wait_task, dependency) - if settings.DEBUG: - graph.generate_graphviz_plot() - return graph - -def process_graph(graph, task_capacity): - """Given a task dependency graph, start and manage tasks given their - priority and weight. 
- """ - leaf_nodes = graph.get_leaf_nodes() - running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) - running_impact = sum([t['node_object'].task_impact for t in running_nodes]) - ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) - remaining_volume = task_capacity - running_impact - logger.info('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' - 'Remaining Capacity: %s' % - (str(running_nodes), str(task_capacity), - str(running_impact), str(remaining_volume))) - logger.info("Ready Nodes: %s" % str(ready_nodes)) - for task_node in ready_nodes: - node_obj = task_node['node_object'] - # NOTE: This could be used to pass metadata through the task system - # node_args = task_node['metadata'] - impact = node_obj.task_impact - if impact <= remaining_volume or running_impact == 0: - node_dependencies = graph.get_dependents(node_obj) - # Allow other tasks to continue if a job fails, even if they are - # other jobs. - if graph.get_node_type(node_obj) == 'job': - node_dependencies = [] - dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ - [{'type': graph.get_node_type(n['node_object']), - 'id': n['node_object'].id} for n in node_dependencies] - error_handler = handle_work_error.s(subtasks=dependent_nodes) - success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj), - 'id': node_obj.id}) - start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) - if not start_status: - node_obj.status = 'failed' - if node_obj.job_explanation: - node_obj.job_explanation += ' ' - node_obj.job_explanation += 'Task failed pre-start check.' - node_obj.save() - continue - remaining_volume -= impact - running_impact += impact - logger.info('Started Node: %s (capacity hit: %s) ' - 'Remaining Capacity: %s' % - (str(node_obj), str(impact), str(remaining_volume))) - -def run_taskmanager(): - """Receive task start and finish signals to rebuild a dependency graph - and manage the actual running of tasks. - """ - def shutdown_handler(): - def _handler(signum, frame): - signal.signal(signum, signal.SIG_DFL) - os.kill(os.getpid(), signum) - return _handler - signal.signal(signal.SIGINT, shutdown_handler()) - signal.signal(signal.SIGTERM, shutdown_handler()) - paused = False - task_capacity = get_system_task_capacity() - last_rebuild = datetime.datetime.fromtimestamp(0) - - # Attempt to pull messages off of the task system queue into perpetuity. - # - # A quick explanation of what is happening here: - # The popping messages off the queue bit is something of a sham. We remove - # the messages from the queue and then immediately throw them away. The - # `rebuild_graph` function, while it takes the message as an argument, - # ignores it. - # - # What actually happens is that we just check the database every 10 seconds - # to see what the task dependency graph looks like, and go do that. This - # is the job of the `rebuild_graph` function. - # - # There is some placeholder here: we may choose to actually use the message - # in the future. - while True: - # Pop a message off the queue. - # (If the queue is empty, None will be returned.) - message = queue.pop() - - # Parse out the message appropriately, rebuilding our graph if - # appropriate. 
- if (datetime.datetime.now() - last_rebuild).seconds > 10: - if message is not None and 'pause' in message: - logger.info("Pause command received: %s" % str(message)) - paused = message['pause'] - graph = rebuild_graph(message) - if not paused and graph is not None: - process_graph(graph, task_capacity) - last_rebuild = datetime.datetime.now() - time.sleep(0.1) - - -class Command(NoArgsCommand): - """Tower Task Management System - This daemon is designed to reside between our tasks and celery and - provide a mechanism for understanding the relationship between those tasks - and their dependencies. - - It also actively prevents situations in which Tower can get blocked - because it doesn't have an understanding of what is progressing through - celery. - """ - help = 'Launch the Tower task management system' - - def handle_noargs(self, **options): - try: - run_taskmanager() - except KeyboardInterrupt: - pass diff --git a/awx/main/migrations/0037_job_allow_simultaneous.py b/awx/main/migrations/0037_job_allow_simultaneous.py new file mode 100644 index 0000000000..8a2e89df94 --- /dev/null +++ b/awx/main/migrations/0037_job_allow_simultaneous.py @@ -0,0 +1,19 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0036_v310_remove_tower_settings'), + ] + + operations = [ + migrations.AddField( + model_name='job', + name='allow_simultaneous', + field=models.BooleanField(default=False), + ), + ] diff --git a/awx/main/migrations/0038_v310_workflow_rbac_prompts.py b/awx/main/migrations/0038_v310_workflow_rbac_prompts.py index c134ed36fc..2778311dd8 100644 --- a/awx/main/migrations/0038_v310_workflow_rbac_prompts.py +++ b/awx/main/migrations/0038_v310_workflow_rbac_prompts.py @@ -10,7 +10,7 @@ import awx.main.fields class Migration(migrations.Migration): dependencies = [ - ('main', '0036_v310_remove_tower_settings'), + ('main', '0037_job_allow_simultaneous'), ] operations = [ diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index b478897d6a..d08a5fba40 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -139,6 +139,9 @@ class JobOptions(BaseModel): become_enabled = models.BooleanField( default=False, ) + allow_simultaneous = models.BooleanField( + default=False, + ) extra_vars_dict = VarsDictProperty('extra_vars', True) @@ -237,9 +240,6 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin): read_role = ImplicitRoleField( parent_role=['project.organization.auditor_role', 'inventory.organization.auditor_role', 'execute_role', 'admin_role'], ) - allow_simultaneous = models.BooleanField( - default=False, - ) @classmethod @@ -252,7 +252,7 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin): 'playbook', 'credential', 'cloud_credential', 'network_credential', 'forks', 'schedule', 'limit', 'verbosity', 'job_tags', 'extra_vars', 'launch_type', 'force_handlers', 'skip_tags', 'start_at_task', 'become_enabled', - 'labels', 'survey_passwords'] + 'labels', 'survey_passwords', 'allow_simultaneous',] def resource_validation_data(self): ''' @@ -628,7 +628,7 @@ class Job(UnifiedJob, JobOptions, JobNotificationMixin): if obj.job_template is not None and obj.inventory is not None: if obj.job_template == self.job_template and \ obj.inventory == self.inventory: - if self.job_template.allow_simultaneous: + if self.allow_simultaneous: return False if obj.launch_type == 'callback' and self.launch_type == 'callback' and \ obj.limit 
!= self.limit: diff --git a/awx/main/models/notifications.py b/awx/main/models/notifications.py index 442b5dc2c8..a9dbcdebdc 100644 --- a/awx/main/models/notifications.py +++ b/awx/main/models/notifications.py @@ -181,11 +181,11 @@ class JobNotificationMixin(object): def _build_notification_message(self, status_str): notification_body = self.notification_data() - notification_subject = "{} #{} '{}' {} on Ansible Tower: {}".format(self.get_notification_friendly_name(), - self.id, - self.name, - status_str, - notification_body['url']) + notification_subject = u"{} #{} '{}' {} on Ansible Tower: {}".format(self.get_notification_friendly_name(), + self.id, + self.name, + status_str, + notification_body['url']) notification_body['friendly_name'] = self.get_notification_friendly_name() return (notification_subject, notification_body) diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index 27977e9030..5d6bef1bfc 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -381,7 +381,7 @@ class Role(models.Model): 'ancestors_table': Role.ancestors.through._meta.db_table, 'parents_table': Role.parents.through._meta.db_table, 'roles_table': Role._meta.db_table, - 'ids': ','.join(str(x) for x in user.roles.values_list('id', flat=True)) + 'ids': ','.join(str(x) for x in user.roles.values_list('id', flat=True)), } qs = Role.objects.extra( diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 950b6fc99b..d304f24d79 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -798,34 +798,43 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique status=self.status, traceback=self.result_traceback) - def start(self, error_callback, success_callback, **kwargs): - ''' - Start the task running via Celery. - ''' - task_class = self._get_task_class() + def pre_start(self, **kwargs): if not self.can_start: self.job_explanation = u'%s is not in a startable status: %s, expecting one of %s' % (self._meta.verbose_name, self.status, str(('new', 'waiting'))) self.save(update_fields=['job_explanation']) - return False + return (False, None) + needed = self.get_passwords_needed_to_start() try: start_args = json.loads(decrypt_field(self, 'start_args')) except Exception: start_args = None + if start_args in (None, ''): start_args = kwargs + opts = dict([(field, start_args.get(field, '')) for field in needed]) + if not all(opts.values()): missing_fields = ', '.join([k for k,v in opts.items() if not v]) self.job_explanation = u'Missing needed fields: %s.' % missing_fields self.save(update_fields=['job_explanation']) - return False - #extra_data = dict([(field, kwargs[field]) for field in kwargs - # if field not in needed]) + return (False, None) + if 'extra_vars' in kwargs: self.handle_extra_data(kwargs['extra_vars']) - task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) - return True + + return (True, opts) + + def start(self, error_callback, success_callback, **kwargs): + ''' + Start the task running via Celery. 
+ ''' + task_class = self._get_task_class() + (res, opts) = self.pre_start(**kwargs) + if res: + task_class().apply_async((self.pk,), opts, link_error=error_callback, link=success_callback) + return res def signal_start(self, **kwargs): """Notify the task runner system to begin work on this task.""" @@ -852,6 +861,9 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique self.update_fields(start_args=json.dumps(kwargs), status='pending') self.socketio_emit_status("pending") + from awx.main.scheduler.tasks import run_job_launch + run_job_launch.delay(self.id) + # Each type of unified job has a different Task class; get the # appropirate one. # task_type = get_type_for_model(self) diff --git a/awx/main/models/workflow.py b/awx/main/models/workflow.py index d300347d82..94a5706942 100644 --- a/awx/main/models/workflow.py +++ b/awx/main/models/workflow.py @@ -363,3 +363,11 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, JobNotificationMixin, Workflow def get_notification_friendly_name(self): return "Workflow Job" + def start(self, *args, **kwargs): + (res, opts) = self.pre_start(**kwargs) + if res: + self.status = 'running' + self.save() + self.socketio_emit_status("running") + return res + diff --git a/awx/main/scheduler/__init__.py b/awx/main/scheduler/__init__.py new file mode 100644 index 0000000000..6c15488251 --- /dev/null +++ b/awx/main/scheduler/__init__.py @@ -0,0 +1,255 @@ +#Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved + +# Python +import datetime +import logging + +# Django +from django.conf import settings +from django.db import transaction + +# AWX +from awx.main.models import * # noqa +from awx.main.utils import get_system_task_capacity +from awx.main.scheduler.dag_simple import SimpleDAG +from awx.main.scheduler.dag_workflow import WorkflowDAG + +# Celery +from celery.task.control import inspect + +logger = logging.getLogger('awx.main.scheduler') + +def get_tasks(): + """Fetch all Tower tasks that are relevant to the task management + system. + """ + RELEVANT_JOBS = ('pending', 'waiting', 'running') + # TODO: Replace this when we can grab all objects in a sane way. 
+ graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)] + graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(status__in=RELEVANT_JOBS)] + graph_inventory_updates = [iu for iu in + InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)] + graph_project_updates = [pu for pu in + ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)] + graph_system_jobs = [sj for sj in + SystemJob.objects.filter(status__in=RELEVANT_JOBS)] + graph_workflow_jobs = [wf for wf in + WorkflowJob.objects.filter(status__in=RELEVANT_JOBS)] + all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates + + graph_project_updates + graph_system_jobs + + graph_workflow_jobs, + key=lambda task: task.created) + return all_actions + +def get_running_workflow_jobs(): + graph_workflow_jobs = [wf for wf in + WorkflowJob.objects.filter(status='running')] + return graph_workflow_jobs + +def spawn_workflow_graph_jobs(workflow_jobs): + # TODO: Consider using transaction.atomic + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + spawn_nodes = dag.bfs_nodes_to_run() + for spawn_node in spawn_nodes: + kv = spawn_node.get_job_kwargs() + job = spawn_node.unified_job_template.create_unified_job(**kv) + spawn_node.job = job + spawn_node.save() + can_start = job.signal_start(**kv) + if not can_start: + job.status = 'failed' + job.job_explanation = "Workflow job could not start because it was not in the right state or required manual credentials" + job.save(update_fields=['status', 'job_explanation']) + job.socketio_emit_status("failed") + + # TODO: should we emit a status on the socket here similar to tasks.py tower_periodic_scheduler() ? + #emit_websocket_notification('/socket.io/jobs', '', dict(id=)) + +# See comment in tasks.py::RunWorkflowJob::run() +def process_finished_workflow_jobs(workflow_jobs): + for workflow_job in workflow_jobs: + dag = WorkflowDAG(workflow_job) + if dag.is_workflow_done(): + with transaction.atomic(): + # TODO: detect if wfj failed + workflow_job.status = 'completed' + workflow_job.save() + workflow_job.socketio_emit_status('completed') + +def rebuild_graph(): + """Regenerate the task graph by refreshing known tasks from Tower, purging + orphaned running tasks, and creating dependencies for new tasks before + generating directed edge relationships between those tasks. + """ + ''' + # Sanity check: Only do this on the primary node. + if Instance.objects.my_role() == 'secondary': + return None + ''' + + inspector = inspect() + if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): + active_task_queues = inspector.active() + else: + logger.warn("Ignoring celery task inspector") + active_task_queues = None + + all_sorted_tasks = get_tasks() + if not len(all_sorted_tasks): + return None + + active_tasks = [] + if active_task_queues is not None: + for queue in active_task_queues: + active_tasks += [at['id'] for at in active_task_queues[queue]] + else: + logger.error("Could not communicate with celery!") + # TODO: Something needs to be done here to signal to the system + # as a whole that celery appears to be down. 
+ if not hasattr(settings, 'CELERY_UNIT_TEST'): + return None + + running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) + running_celery_tasks = filter(lambda t: type(t) != WorkflowJob, running_tasks) + waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks) + new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) + + # Check running tasks and make sure they are active in celery + logger.debug("Active celery tasks: " + str(active_tasks)) + for task in list(running_celery_tasks): + if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): + # NOTE: Pull status again and make sure it didn't finish in + # the meantime? + task.status = 'failed' + task.job_explanation += ' '.join(( + 'Task was marked as running in Tower but was not present in', + 'Celery, so it has been marked as failed.', + )) + task.save() + task.socketio_emit_status("failed") + running_tasks.pop(task) + logger.error("Task %s appears orphaned... marking as failed" % task) + + # Create and process dependencies for new tasks + for task in new_tasks: + logger.debug("Checking dependencies for: %s" % str(task)) + try: + task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) + except Exception, e: + logger.error("Failed processing dependencies for {}: {}".format(task, e)) + task.status = 'failed' + task.job_explanation += 'Task failed to generate dependencies: {}'.format(e) + task.save() + task.socketio_emit_status("failed") + continue + logger.debug("New dependencies: %s" % str(task_dependencies)) + for dep in task_dependencies: + # We recalculate the created time for the moment to ensure the + # dependencies are always sorted in the right order relative to + # the dependent task. + time_delt = len(task_dependencies) - task_dependencies.index(dep) + dep.created = task.created - datetime.timedelta(seconds=1 + time_delt) + dep.status = 'waiting' + dep.save() + waiting_tasks.insert(waiting_tasks.index(task), dep) + if not hasattr(settings, 'UNIT_TEST_IGNORE_TASK_WAIT'): + task.status = 'waiting' + task.save() + + # Rebuild graph + graph = SimpleDAG() + for task in running_tasks: + graph.add_node(task) + for wait_task in waiting_tasks[:50]: + node_dependencies = [] + for node in graph: + if wait_task.is_blocked_by(node['node_object']): + node_dependencies.append(node['node_object']) + graph.add_node(wait_task) + for dependency in node_dependencies: + graph.add_edge(wait_task, dependency) + if settings.DEBUG: + graph.generate_graphviz_plot() + return graph + +def process_graph(graph, task_capacity): + """Given a task dependency graph, start and manage tasks given their + priority and weight. 
+ """ + from awx.main.tasks import handle_work_error, handle_work_success + + leaf_nodes = graph.get_leaf_nodes() + running_nodes = filter(lambda x: x['node_object'].status == 'running', leaf_nodes) + running_impact = sum([t['node_object'].task_impact for t in running_nodes]) + ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) + remaining_volume = task_capacity - running_impact + logger.info('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' + 'Remaining Capacity: %s' % + (str(running_nodes), str(task_capacity), + str(running_impact), str(remaining_volume))) + logger.info("Ready Nodes: %s" % str(ready_nodes)) + for task_node in ready_nodes: + node_obj = task_node['node_object'] + # NOTE: This could be used to pass metadata through the task system + # node_args = task_node['metadata'] + impact = node_obj.task_impact + if impact <= remaining_volume or running_impact == 0: + node_dependencies = graph.get_dependents(node_obj) + # Allow other tasks to continue if a job fails, even if they are + # other jobs. + + node_type = graph.get_node_type(node_obj) + if node_type == 'job': + # clear dependencies because a job can block (not necessarily + # depend) on other jobs that share the same job template + node_dependencies = [] + + # Make the workflow_job look like it's started by setting status to + # running, but don't make a celery Task for it. + # Introduce jobs from the workflow so they are candidates to run. + # Call process_graph() again to allow choosing for run, the + # created candidate jobs. + elif node_type == 'workflow_job': + node_obj.start() + spawn_workflow_graph_jobs([node_obj]) + return process_graph(graph, task_capacity) + + dependent_nodes = [{'type': graph.get_node_type(node_obj), 'id': node_obj.id}] + \ + [{'type': graph.get_node_type(n['node_object']), + 'id': n['node_object'].id} for n in node_dependencies] + error_handler = handle_work_error.s(subtasks=dependent_nodes) + success_handler = handle_work_success.s(task_actual={'type': graph.get_node_type(node_obj), + 'id': node_obj.id}) + with transaction.atomic(): + start_status = node_obj.start(error_callback=error_handler, success_callback=success_handler) + if not start_status: + node_obj.status = 'failed' + if node_obj.job_explanation: + node_obj.job_explanation += ' ' + node_obj.job_explanation += 'Task failed pre-start check.' 
+ node_obj.save() + continue + remaining_volume -= impact + running_impact += impact + logger.info('Started Node: %s (capacity hit: %s) ' + 'Remaining Capacity: %s' % + (str(node_obj), str(impact), str(remaining_volume))) + +def schedule(): + with transaction.atomic(): + # Lock + Instance.objects.select_for_update().all()[0] + + task_capacity = get_system_task_capacity() + + workflow_jobs = get_running_workflow_jobs() + process_finished_workflow_jobs(workflow_jobs) + spawn_workflow_graph_jobs(workflow_jobs) + + graph = rebuild_graph() + if graph: + process_graph(graph, task_capacity) + + # Unlock, due to transaction ending diff --git a/awx/main/scheduler/dag_simple.py b/awx/main/scheduler/dag_simple.py new file mode 100644 index 0000000000..aeb0ff759e --- /dev/null +++ b/awx/main/scheduler/dag_simple.py @@ -0,0 +1,140 @@ + +from awx.main.models import ( + Job, + AdHocCommand, + InventoryUpdate, + ProjectUpdate, + WorkflowJob, + SystemJob, +) + +class SimpleDAG(object): + ''' A simple implementation of a directed acyclic graph ''' + + def __init__(self): + self.nodes = [] + self.edges = [] + + def __contains__(self, obj): + for node in self.nodes: + if node['node_object'] == obj: + return True + return False + + def __len__(self): + return len(self.nodes) + + def __iter__(self): + return self.nodes.__iter__() + + def generate_graphviz_plot(self): + def short_string_obj(obj): + if type(obj) == Job: + type_str = "Job" + if type(obj) == AdHocCommand: + type_str = "AdHocCommand" + elif type(obj) == InventoryUpdate: + type_str = "Inventory" + elif type(obj) == ProjectUpdate: + type_str = "Project" + elif type(obj) == WorkflowJob: + type_str = "Workflow" + else: + type_str = "Unknown" + type_str += "%s" % str(obj.id) + return type_str + + doc = """ + digraph g { + rankdir = LR + """ + for n in self.nodes: + doc += "%s [color = %s]\n" % ( + short_string_obj(n['node_object']), + "red" if n['node_object'].status == 'running' else "black", + ) + for from_node, to_node, label in self.edges: + doc += "%s -> %s [ label=\"%s\" ];\n" % ( + short_string_obj(self.nodes[from_node]['node_object']), + short_string_obj(self.nodes[to_node]['node_object']), + label, + ) + doc += "}\n" + gv_file = open('/tmp/graph.gv', 'w') + gv_file.write(doc) + gv_file.close() + + def add_node(self, obj, metadata=None): + if self.find_ord(obj) is None: + self.nodes.append(dict(node_object=obj, metadata=metadata)) + + def add_edge(self, from_obj, to_obj, label=None): + from_obj_ord = self.find_ord(from_obj) + to_obj_ord = self.find_ord(to_obj) + if from_obj_ord is None or to_obj_ord is None: + raise LookupError("Object not found") + self.edges.append((from_obj_ord, to_obj_ord, label)) + + def add_edges(self, edgelist): + for edge_pair in edgelist: + self.add_edge(edge_pair[0], edge_pair[1], edge_pair[2]) + + def find_ord(self, obj): + for idx in range(len(self.nodes)): + if obj == self.nodes[idx]['node_object']: + return idx + return None + + def get_node_type(self, obj): + if type(obj) == Job: + return "job" + elif type(obj) == AdHocCommand: + return "ad_hoc_command" + elif type(obj) == InventoryUpdate: + return "inventory_update" + elif type(obj) == ProjectUpdate: + return "project_update" + elif type(obj) == SystemJob: + return "system_job" + elif type(obj) == WorkflowJob: + return "workflow_job" + return "unknown" + + def get_dependencies(self, obj, label=None): + antecedents = [] + this_ord = self.find_ord(obj) + for node, dep, lbl in self.edges: + if label: + if node == this_ord and lbl == label: + 
antecedents.append(self.nodes[dep]) + else: + if node == this_ord: + antecedents.append(self.nodes[dep]) + return antecedents + + def get_dependents(self, obj, label=None): + decendents = [] + this_ord = self.find_ord(obj) + for node, dep, lbl in self.edges: + if label: + if dep == this_ord and lbl == label: + decendents.append(self.nodes[node]) + else: + if dep == this_ord: + decendents.append(self.nodes[node]) + return decendents + + def get_leaf_nodes(self): + leafs = [] + for n in self.nodes: + if len(self.get_dependencies(n['node_object'])) < 1: + leafs.append(n) + return leafs + + def get_root_nodes(self): + roots = [] + for n in self.nodes: + if len(self.get_dependents(n['node_object'])) < 1: + roots.append(n) + return roots + diff --git a/awx/main/scheduler/dag_workflow.py b/awx/main/scheduler/dag_workflow.py new file mode 100644 index 0000000000..c891b2ec32 --- /dev/null +++ b/awx/main/scheduler/dag_workflow.py @@ -0,0 +1,72 @@ + +# AWX +from awx.main.scheduler.dag_simple import SimpleDAG + +class WorkflowDAG(SimpleDAG): + def __init__(self, workflow_job=None): + super(WorkflowDAG, self).__init__() + if workflow_job: + self._init_graph(workflow_job) + + def _init_graph(self, workflow_job): + workflow_nodes = workflow_job.workflow_job_nodes.all() + for workflow_node in workflow_nodes: + self.add_node(workflow_node) + + for node_type in ['success_nodes', 'failure_nodes', 'always_nodes']: + for workflow_node in workflow_nodes: + related_nodes = getattr(workflow_node, node_type).all() + for related_node in related_nodes: + self.add_edge(workflow_node, related_node, node_type) + + def bfs_nodes_to_run(self): + root_nodes = self.get_root_nodes() + nodes = root_nodes + nodes_found = [] + + for index, n in enumerate(nodes): + obj = n['node_object'] + job = obj.job + + if not job: + nodes_found.append(n) + # Job is about to run or is running. Hold our horses and wait for + # the job to finish. We can't proceed down the graph path until we + # have the job result. + elif job.status not in ['failed', 'error', 'successful']: + continue + elif job.status in ['failed', 'error']: + children_failed = self.get_dependencies(obj, 'failure_nodes') + children_always = self.get_dependencies(obj, 'always_nodes') + children_all = children_failed + children_always + nodes.extend(children_all) + elif job.status in ['successful']: + children_success = self.get_dependencies(obj, 'success_nodes') + nodes.extend(children_success) + return [n['node_object'] for n in nodes_found] + + def is_workflow_done(self): + root_nodes = self.get_root_nodes() + nodes = root_nodes + + for index, n in enumerate(nodes): + obj = n['node_object'] + job = obj.job + + if not job: + return False + # Job is about to run or is running. Hold our horses and wait for + # the job to finish. We can't proceed down the graph path until we + # have the job result. 
+ elif job.status not in ['failed', 'error', 'successful']: + return False + elif job.status in ['failed', 'error']: + children_failed = self.get_dependencies(obj, 'failure_nodes') + children_always = self.get_dependencies(obj, 'always_nodes') + children_all = children_failed + children_always + nodes.extend(children_all) + elif job.status in ['successful']: + children_success = self.get_dependencies(obj, 'success_nodes') + nodes.extend(children_success) + return True + diff --git a/awx/main/scheduler/tasks.py b/awx/main/scheduler/tasks.py new file mode 100644 index 0000000000..343bdd1546 --- /dev/null +++ b/awx/main/scheduler/tasks.py @@ -0,0 +1,79 @@ + +# Python +import logging +import time + +# Celery +from celery import task + +# AWX +from awx.main.models import UnifiedJob +from awx.main.scheduler import schedule + +logger = logging.getLogger('awx.main.scheduler') + +# TODO: move logic to UnifiedJob model and use bind=True feature of celery. +# Would we need the request loop then? I think so. Even if we get the in-memory +# updated model, the call to schedule() may get stale data. + +@task +def run_job_launch(job_id): + # Wait for job to exist. + # The job is created in a transaction then the message is created, but + # the transaction may not have completed. + + # FIXME: We could generate the message in a Django signal handler. + # OR, we could call an explicit commit in the view and then send the + # message. + + retries = 10 + retry = 0 + while not UnifiedJob.objects.filter(id=job_id).exists(): + time.sleep(0.3) + + if retry >= retries: + logger.error("Failed to process 'job_launch' message for job %d" % job_id) + # ack the message so we don't build up the queue. + # + # The job can still be chosen to run during tower startup or + # when another job is started or completes + return + retry += 1 + + # "Safe" to get the job now since it exists. + # Really, there is a race condition from exists to get + + # TODO: while not loop should call get wrapped in a try except + #job = UnifiedJob.objects.get(id=job_id) + + schedule() + +@task +def run_job_complete(job_id): + # TODO: use list of finished status from jobs.py or unified_jobs.py + finished_status = ['successful', 'error', 'failed', 'completed'] + q = UnifiedJob.objects.filter(id=job_id) + + # Ensure that the job is updated in the database before we call to + # schedule the next job. + retries = 10 + retry = 0 + while True: + # Job not found, most likely deleted. That's fine + if not q.exists(): + logger.warn("Failed to find job '%d' while processing 'job_complete' message. Presume that it was deleted." % job_id) + break + + job = q[0] + if job.status in finished_status: + break + + time.sleep(0.3) + + if retry >= retries: + logger.error("Expected job status '%s' to be one of '%s' while processing 'job_complete' message." 
% (job.status, finished_status)) + return + retry += 1 + + schedule() + diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c4138cdb52..6dbac70108 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -47,7 +47,6 @@ from django.contrib.auth.models import User from awx.main.constants import CLOUD_PROVIDERS from awx.main.models import * # noqa from awx.main.models import UnifiedJob -from awx.main.queue import FifoQueue from awx.main.task_engine import TaskEnhancer from awx.main.utils import (get_ansible_version, get_ssh_version, decrypt_field, update_scm_url, emit_websocket_notification, @@ -57,7 +56,7 @@ __all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'RunAdHocCommand', 'RunWorkflowJob', 'handle_work_error', 'handle_work_success', 'update_inventory_computed_fields', 'send_notifications', 'run_administrative_checks', - 'run_workflow_job'] + 'RunJobLaunch'] HIDDEN_PASSWORD = '**********' @@ -177,14 +176,6 @@ def tower_periodic_scheduler(self): new_unified_job.socketio_emit_status("failed") emit_websocket_notification('/socket.io/schedules', 'schedule_changed', dict(id=schedule.id)) -@task(queue='default') -def notify_task_runner(metadata_dict): - """Add the given task into the Tower task manager's queue, to be consumed - by the task system. - """ - queue = FifoQueue('tower_task_manager') - queue.push(metadata_dict) - def _send_notification_templates(instance, status_str): if status_str not in ['succeeded', 'failed']: raise ValueError("status_str must be either succeeded or failed") @@ -200,6 +191,7 @@ def _send_notification_templates(instance, status_str): for n in all_notification_templates], job_id=instance.id) + @task(bind=True, queue='default') def handle_work_success(self, result, task_actual): instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id']) @@ -208,6 +200,9 @@ def handle_work_success(self, result, task_actual): _send_notification_templates(instance, 'succeeded') + from awx.main.scheduler.tasks import run_job_complete + run_job_complete.delay(instance.id) + @task(bind=True, queue='default') def handle_work_error(self, task_id, subtasks=None): print('Executing error task id %s, subtasks: %s' % @@ -236,6 +231,15 @@ def handle_work_error(self, task_id, subtasks=None): if first_instance: _send_notification_templates(first_instance, 'failed') + + # We only send 1 job complete message since all the job completion message + # handling does is trigger the scheduler. If we extend the functionality of + # what the job complete message handler does then we may want to send a + # completion event for each job here. + if first_instance: + from awx.main.scheduler.tasks import run_job_complete + run_job_complete.delay(first_instance.id) + pass @task(queue='default') def update_inventory_computed_fields(inventory_id, should_update_hosts=True): @@ -1662,21 +1666,30 @@ class RunSystemJob(BaseTask): def build_cwd(self, instance, **kwargs): return settings.BASE_DIR +''' class RunWorkflowJob(BaseTask): name = 'awx.main.tasks.run_workflow_job' model = WorkflowJob def run(self, pk, **kwargs): - from awx.main.management.commands.run_task_system import WorkflowDAG - ''' - Run the job/task and capture its output. - ''' - pass + #Run the job/task and capture its output. instance = self.update_model(pk, status='running', celery_task_id=self.request.id) instance.socketio_emit_status("running") - # FIXME: Detect workflow run completion + # FIXME: Currently, the workflow job busy waits until the graph run is + # complete. 
Instead, the workflow job should return or never even run, + # because all of the "launch logic" can be done schedule(). + + # However, other aspects of our system depend on a 1-1 relationship + # between a Job and a Celery Task. + # + # * If we let the workflow job task (RunWorkflowJob.run()) complete + # then how do we trigger the handle_work_error and + # handle_work_success subtasks? + # + # * How do we handle the recovery process? (i.e. there is an entry in + # the database but not in celery). while True: dag = WorkflowDAG(instance) if dag.is_workflow_done(): @@ -1686,4 +1699,4 @@ class RunWorkflowJob(BaseTask): time.sleep(1) instance.socketio_emit_status(instance.status) # TODO: Handle cancel - +''' diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index 287eb8a8c5..21587c8ded 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -12,7 +12,6 @@ import sys import tempfile import time import urllib -from multiprocessing import Process import re import mock @@ -30,7 +29,6 @@ from django.utils.encoding import force_text # AWX from awx.main.models import * # noqa -from awx.main.management.commands.run_task_system import run_taskmanager from awx.main.task_engine import TaskEnhancer from awx.main.utils import get_ansible_version from awx.sso.backends import LDAPSettings @@ -644,18 +642,6 @@ class BaseTestMixin(MockCommonlySlowTestMixin): u'expected no traceback, got:\n%s' % job.result_traceback) - - def start_taskmanager(self, command_port): - self.start_redis() - self.taskmanager_process = Process(target=run_taskmanager, - args=(command_port,)) - self.taskmanager_process.start() - - def terminate_taskmanager(self): - if hasattr(self, 'taskmanager_process'): - self.taskmanager_process.terminate() - self.stop_redis() - class BaseTest(BaseTestMixin, django.test.TestCase): ''' Base class for unit tests. 
diff --git a/awx/main/tests/functional/api/test_rbac_displays.py b/awx/main/tests/functional/api/test_rbac_displays.py index eb94e01dff..45b4a8f832 100644 --- a/awx/main/tests/functional/api/test_rbac_displays.py +++ b/awx/main/tests/functional/api/test_rbac_displays.py @@ -221,6 +221,12 @@ class TestAccessListCapabilities: direct_access_list = response.data['results'][0]['summary_fields']['direct_access'] assert direct_access_list[0]['role']['user_capabilities']['unattach'] == 'foobar' + def test_user_access_list_direct_access_capability(self, rando, get): + "When a user views their own access list, they can not unattach their admin role" + response = get(reverse('api:user_access_list', args=(rando.id,)), rando) + direct_access_list = response.data['results'][0]['summary_fields']['direct_access'] + assert not direct_access_list[0]['role']['user_capabilities']['unattach'] + @pytest.mark.django_db def test_team_roles_unattach(mocker, team, team_member, inventory, mock_access_method, get): diff --git a/awx/main/tests/functional/api/test_role.py b/awx/main/tests/functional/api/test_role.py new file mode 100644 index 0000000000..94215521a5 --- /dev/null +++ b/awx/main/tests/functional/api/test_role.py @@ -0,0 +1,13 @@ +import pytest + +from django.core.urlresolvers import reverse + +@pytest.mark.django_db +def test_admin_visible_to_orphaned_users(get, alice): + names = set() + + response = get(reverse('api:role_list'), user=alice) + for item in response.data['results']: + names.add(item['name']) + assert 'System Auditor' in names + assert 'System Administrator' in names diff --git a/awx/main/tests/unit/commands/test_run_task_system.py b/awx/main/tests/unit/scheduler/test_dag.py similarity index 97% rename from awx/main/tests/unit/commands/test_run_task_system.py rename to awx/main/tests/unit/scheduler/test_dag.py index bc62394b21..84fb2d37f2 100644 --- a/awx/main/tests/unit/commands/test_run_task_system.py +++ b/awx/main/tests/unit/scheduler/test_dag.py @@ -1,10 +1,12 @@ -from awx.main.management.commands.run_task_system import ( - SimpleDAG, - WorkflowDAG, -) + +# Python +import pytest + +# AWX +from awx.main.scheduler.dag_simple import SimpleDAG +from awx.main.scheduler.dag_workflow import WorkflowDAG from awx.main.models import Job from awx.main.models.workflow import WorkflowJobNode -import pytest @pytest.fixture def dag_root(): diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index 33f2868bbb..9da2142c19 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -348,9 +348,11 @@ CELERYD_TASK_SOFT_TIME_LIMIT = None CELERYBEAT_SCHEDULER = 'celery.beat.PersistentScheduler' CELERYBEAT_MAX_LOOP_INTERVAL = 60 CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend' +CELERY_IMPORTS = ('awx.main.scheduler.tasks',) CELERY_QUEUES = ( Queue('default', Exchange('default'), routing_key='default'), Queue('jobs', Exchange('jobs'), routing_key='jobs'), + Queue('scheduler', Exchange('scheduler', type='topic'), routing_key='scheduler.job.#', durable=False), # Projects use a fanout queue, this isn't super well supported Broadcast('projects'), ) @@ -362,8 +364,12 @@ CELERY_ROUTES = ({'awx.main.tasks.run_job': {'queue': 'jobs', 'awx.main.tasks.run_ad_hoc_command': {'queue': 'jobs', 'routing_key': 'jobs'}, 'awx.main.tasks.run_system_job': {'queue': 'jobs', - 'routing_key': 'jobs'}}) - + 'routing_key': 'jobs'}, + 'awx.main.scheduler.tasks.run_job_launch': {'queue': 'scheduler', + 'routing_key': 'scheduler.job.launch'}, + 'awx.main.scheduler.tasks.run_job_complete': 
{'queue': 'scheduler', + 'routing_key': 'scheduler.job.complete'},}) + CELERYBEAT_SCHEDULE = { 'tower_scheduler': { 'task': 'awx.main.tasks.tower_periodic_scheduler', @@ -755,6 +761,7 @@ ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC = False INTERNAL_API_URL = 'http://127.0.0.1:%s' % DEVSERVER_DEFAULT_PORT CALLBACK_QUEUE = "callback_tasks" +SCHEDULER_QUEUE = "scheduler" TASK_COMMAND_PORT = 6559 @@ -917,7 +924,11 @@ LOGGING = { 'handlers': ['console', 'file', 'socketio_service'], 'propagate': False }, - 'awx.main.commands.run_task_system': { + 'awx.main.tasks': { + 'handlers': ['console', 'file', 'task_system'], + 'propagate': False + }, + 'awx.main.scheduler': { 'handlers': ['console', 'file', 'task_system'], 'propagate': False }, diff --git a/awx/sso/conf.py b/awx/sso/conf.py index 264b609367..e0842f6031 100644 --- a/awx/sso/conf.py +++ b/awx/sso/conf.py @@ -169,6 +169,7 @@ register( field_class=fields.URLField, schemes=('ldap', 'ldaps'), allow_blank=True, + default='', label=_('LDAP Server URI'), help_text=_('URI to connect to LDAP server, such as "ldap://ldap.example.com:389" ' '(non-SSL) or "ldaps://ldap.example.com:636" (SSL). LDAP authentication ' @@ -880,6 +881,7 @@ register( register( 'SOCIAL_AUTH_SAML_TECHNICAL_CONTACT', field_class=fields.SAMLContactField, + allow_blank=True, default={}, label=_('SAML Service Provider Technical Contact'), help_text=_('Configure this setting with your contact information.'), @@ -894,6 +896,7 @@ register( register( 'SOCIAL_AUTH_SAML_SUPPORT_CONTACT', field_class=fields.SAMLContactField, + allow_blank=True, default={}, label=_('SAML Service Provider Support Contact'), help_text=_('Configure this setting with your contact information.'), diff --git a/awx/sso/fields.py b/awx/sso/fields.py index 6655ad3523..a0d472756e 100644 --- a/awx/sso/fields.py +++ b/awx/sso/fields.py @@ -349,6 +349,10 @@ class BaseDictWithChildField(fields.DictField): } allow_unknown_keys = False + def __init__(self, *args, **kwargs): + self.allow_blank = kwargs.pop('allow_blank', False) + super(BaseDictWithChildField, self).__init__(*args, **kwargs) + def to_representation(self, value): value = super(BaseDictWithChildField, self).to_representation(value) for k, v in value.items(): @@ -367,7 +371,7 @@ class BaseDictWithChildField(fields.DictField): continue elif key not in data: missing_keys.add(key) - if missing_keys: + if missing_keys and (data or not self.allow_blank): keys_display = json.dumps(list(missing_keys)).lstrip('[').rstrip(']') self.fail('missing_keys', missing_keys=keys_display) if not self.allow_unknown_keys: diff --git a/awx/ui/conf.py b/awx/ui/conf.py index 46fd4288c4..a92eeea35c 100644 --- a/awx/ui/conf.py +++ b/awx/ui/conf.py @@ -8,9 +8,18 @@ from django.utils.translation import ugettext_lazy as _ from awx.conf import fields, register +class PendoTrackingStateField(fields.ChoiceField): + + def to_internal_value(self, data): + # Any false/null values get converted to 'off'. 
+ if data in fields.NullBooleanField.FALSE_VALUES or data in fields.NullBooleanField.NULL_VALUES: + return 'off' + return super(PendoTrackingStateField, self).to_internal_value(data) + + register( 'PENDO_TRACKING_STATE', - field_class=fields.ChoiceField, + field_class=PendoTrackingStateField, choices=[ ('off', _('Off')), ('anonymous', _('Anonymous')), diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 3e82d6bc81..d236b5d1f1 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -12,7 +12,7 @@ cliff==1.15.0 cmd2==0.6.8 d2to1==0.2.11 # TODO: Still needed? defusedxml==0.4.1 -Django==1.8.10 +Django==1.8.15 debtcollector==1.2.0 decorator==4.0.6 django-auth-ldap==1.2.6 diff --git a/tools/docker-compose/Dockerfile b/tools/docker-compose/Dockerfile index c583c4ddfd..ddd54616bb 100644 --- a/tools/docker-compose/Dockerfile +++ b/tools/docker-compose/Dockerfile @@ -9,7 +9,7 @@ RUN mkdir /tmp/requirements ADD requirements/requirements.txt requirements/requirements_ansible.txt requirements/requirements_dev.txt requirements/requirements_jenkins.txt /tmp/requirements/ RUN yum -y update && yum -y install curl epel-release RUN curl --silent --location https://rpm.nodesource.com/setup_6.x | bash - -RUN yum -y localinstall http://yum.postgresql.org/9.4/redhat/rhel-6-x86_64/pgdg-centos94-9.4-1.noarch.rpm +RUN yum -y localinstall http://download.postgresql.org/pub/repos/yum/9.4/redhat/rhel-6-x86_64/pgdg-centos94-9.4-3.noarch.rpm ADD tools/docker-compose/proot.repo /etc/yum.repos.d/proot.repo RUN yum -y update && yum -y install openssh-server ansible mg vim tmux git mercurial subversion python-devel python-psycopg2 make postgresql postgresql-devel nodejs python-psutil libxml2-devel libxslt-devel libstdc++.so.6 gcc cyrus-sasl-devel cyrus-sasl openldap-devel libffi-devel zeromq-devel proot python-pip xmlsec1-devel swig krb5-devel xmlsec1-openssl xmlsec1 xmlsec1-openssl-devel libtool-ltdl-devel RUN pip install flake8 pytest==2.9.2 pytest-pythonpath pytest-django pytest-cov pytest-mock dateutils django-debug-toolbar==1.4 pyflakes==1.0.0 virtualenv
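The sketch below is not part of the patch itself; it only illustrates how the two Celery tasks added in awx/main/scheduler/tasks.py are expected to be driven. The complete-side calls already exist in handle_work_success/handle_work_error; the launch-side call site shown here is an assumption, since this diff does not include it. Routing follows the new CELERY_ROUTES entries, which send both tasks to the non-durable 'scheduler' topic queue.

# Sketch only (not part of this patch). The helper names below are
# hypothetical; only run_job_launch/run_job_complete and schedule() exist
# in the diff.
from awx.main.scheduler.tasks import run_job_launch, run_job_complete

def notify_scheduler_job_launched(job):
    # Routed by CELERY_ROUTES to the 'scheduler' queue with
    # routing_key 'scheduler.job.launch'; the task waits for the job row
    # to be committed, then calls awx.main.scheduler.schedule().
    run_job_launch.delay(job.id)

def notify_scheduler_job_finished(job):
    # routing_key 'scheduler.job.complete'; the task waits until the job
    # reaches a finished status, then calls schedule() again.
    run_job_complete.delay(job.id)

In both cases the message payload is just the job id; the tasks re-read the job from the database, which is why they retry briefly before giving up.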