# -*- coding: utf-8 -*-

# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.

# Python
from collections import OrderedDict, namedtuple
import errno
import functools
import importlib
import json
import logging
import os
import shutil
import stat
import tempfile
import time
import traceback
from distutils.dir_util import copy_tree
from distutils.version import LooseVersion as Version
import yaml
import fcntl
from pathlib import Path
from uuid import uuid4
import urllib.parse as urlparse
import socket
import threading
import concurrent.futures
from base64 import b64encode
import subprocess
import sys

# Django
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError, ProgrammingError, connection
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now
from django.utils.encoding import smart_str
from django.contrib.auth.models import User
from django.utils.translation import ugettext_lazy as _, gettext_noop
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django_guid.middleware import GuidMiddleware

# Django-CRUM
from crum import impersonate

# GitPython
import git
from gitdb.exc import BadName as BadGitName

# Runner
import ansible_runner

# Receptor
from receptorctl.socket_interface import ReceptorControl

# AWX
from awx import __version__ as awx_application_version
from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
from awx.main.access import access_registry
from awx.main.redact import UriCleaner
from awx.main.models import (
    Schedule,
    TowerScheduleState,
    Instance,
    InstanceGroup,
    UnifiedJob,
    Notification,
    Inventory,
    InventorySource,
    SmartInventoryMembership,
    Job,
    AdHocCommand,
    ProjectUpdate,
    InventoryUpdate,
    SystemJob,
    JobEvent,
    ProjectUpdateEvent,
    InventoryUpdateEvent,
    AdHocCommandEvent,
    SystemJobEvent,
    build_safe_env,
)
from awx.main.constants import ACTIVE_STATES
from awx.main.exceptions import AwxTaskError, PostRunError
from awx.main.queue import CallbackQueueDispatcher
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_local_queuename, reaper
from awx.main.utils import (
    update_scm_url,
    ignore_inventory_computed_fields,
    ignore_inventory_group_removal,
    extract_ansible_vars,
    schedule_task_manager,
    get_awx_version,
    deepmerge,
    parse_yaml_or_json,
    cleanup_new_process,
)
from awx.main.utils.execution_environments import get_default_execution_environment, get_default_pod_spec, CONTAINER_ROOT, to_container_path
from awx.main.utils.ansible import read_ansible_config
from awx.main.utils.external_logging import reconfigure_rsyslog
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.handlers import SpecialInventoryHandler
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
from awx.conf.license import get_license
from awx.main.analytics.subsystem_metrics import Metrics

from rest_framework.exceptions import PermissionDenied


__all__ = [
    'RunJob',
    'RunSystemJob',
    'RunProjectUpdate',
    'RunInventoryUpdate',
    'RunAdHocCommand',
    'handle_work_error',
    'handle_work_success',
    'apply_cluster_membership_policies',
    'update_inventory_computed_fields',
    'update_host_smart_inventory_memberships',
    'send_notifications',
    'purge_old_stdout_files',
]

HIDDEN_PASSWORD = '**********'

OPENSSH_KEY_ERROR = u'''\
It looks like you're trying to use a private key in OpenSSH format, which \
isn't supported by the installed version of OpenSSH on this instance. \
Try upgrading OpenSSH or providing your private key in a different format. \
'''

logger = logging.getLogger('awx.main.tasks')


class InvalidVirtualenvError(Exception):
    def __init__(self, message):
        self.message = message


def dispatch_startup():
    startup_logger = logging.getLogger('awx.main.tasks')
    startup_logger.debug("Syncing Schedules")
    for sch in Schedule.objects.all():
        try:
            sch.update_computed_fields()
        except Exception:
            logger.exception("Failed to rebuild schedule {}.".format(sch))

    #
    # When the dispatcher starts, if the instance cannot be found in the database,
    # automatically register it. This is mostly useful for openshift-based
    # deployments where:
    #
    # 2 Instances come online
    # Instance B encounters a network blip, Instance A notices, and
    # deprovisions it
    # Instance B's connectivity is restored, the dispatcher starts, and it
    # re-registers itself
    #
    # In traditional container-less deployments, instances don't get
    # deprovisioned when they miss their heartbeat, so this code is mostly a
    # no-op.
    #
    apply_cluster_membership_policies()
    cluster_node_heartbeat()
    Metrics().clear_values()

    # Update Tower's rsyslog.conf file based on logging settings in the db
    reconfigure_rsyslog()


def inform_cluster_of_shutdown():
    try:
        this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
        this_inst.capacity = 0  # No thank you to new jobs while shut down
        this_inst.save(update_fields=['capacity', 'modified'])
        try:
            reaper.reap(this_inst)
        except Exception:
            logger.exception('failed to reap jobs for {}'.format(this_inst.hostname))
        logger.warning('Normal shutdown signal for instance {}, removed self from capacity pool.'.format(this_inst.hostname))
    except Exception:
        logger.exception('Encountered problem with normal shutdown signal.')


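# Summary of the task below: instance-group membership is computed in three passes --
# explicit policy_instance_list entries first, then policy_instance_minimum, then
# policy_instance_percentage -- and only the resulting differences are applied to the
# database, inside a single transaction, so a no-op computation writes nothing.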
@task(queue=get_local_queuename)
def apply_cluster_membership_policies():
    started_waiting = time.time()
    with advisory_lock('cluster_policy_lock', wait=True):
        lock_time = time.time() - started_waiting
        if lock_time > 1.0:
            to_log = logger.info
        else:
            to_log = logger.debug
        to_log('Waited {} seconds to obtain lock name: cluster_policy_lock'.format(lock_time))
        started_compute = time.time()
        all_instances = list(Instance.objects.order_by('id'))
        all_groups = list(InstanceGroup.objects.prefetch_related('instances'))

        total_instances = len(all_instances)
        actual_groups = []
        actual_instances = []
        Group = namedtuple('Group', ['obj', 'instances', 'prior_instances'])
        Node = namedtuple('Instance', ['obj', 'groups'])

        # Process policy instance list first, these will represent manually managed memberships
        instance_hostnames_map = {inst.hostname: inst for inst in all_instances}
        for ig in all_groups:
            group_actual = Group(obj=ig, instances=[], prior_instances=[instance.pk for instance in ig.instances.all()])  # obtained in prefetch
            for hostname in ig.policy_instance_list:
                if hostname not in instance_hostnames_map:
                    logger.info("Unknown instance {} in {} policy list".format(hostname, ig.name))
                    continue
                inst = instance_hostnames_map[hostname]
                group_actual.instances.append(inst.id)
                # NOTE: arguable behavior: policy-list-group is not added to
                # instance's group count for consideration in minimum-policy rules
            if group_actual.instances:
                logger.debug("Policy List, adding Instances {} to Group {}".format(group_actual.instances, ig.name))

            actual_groups.append(group_actual)

        # Process Instance minimum policies next, since it represents a concrete lower bound to the
        # number of instances to make available to instance groups
        actual_instances = [Node(obj=i, groups=[]) for i in all_instances if i.managed_by_policy]
        logger.debug("Total instances: {}, available for policy: {}".format(total_instances, len(actual_instances)))
        for g in sorted(actual_groups, key=lambda x: len(x.instances)):
            policy_min_added = []
            for i in sorted(actual_instances, key=lambda x: len(x.groups)):
                if len(g.instances) >= g.obj.policy_instance_minimum:
                    break
                if i.obj.id in g.instances:
                    # If the instance is already _in_ the group, it was
                    # applied earlier via the policy list
                    continue
                g.instances.append(i.obj.id)
                i.groups.append(g.obj.id)
                policy_min_added.append(i.obj.id)
            if policy_min_added:
                logger.debug("Policy minimum, adding Instances {} to Group {}".format(policy_min_added, g.obj.name))

        # Finally, process instance policy percentages
        for g in sorted(actual_groups, key=lambda x: len(x.instances)):
            policy_per_added = []
            for i in sorted(actual_instances, key=lambda x: len(x.groups)):
                if i.obj.id in g.instances:
                    # If the instance is already _in_ the group, it was
                    # applied earlier via a minimum policy or policy list
                    continue
                if 100 * float(len(g.instances)) / len(actual_instances) >= g.obj.policy_instance_percentage:
                    break
                g.instances.append(i.obj.id)
                i.groups.append(g.obj.id)
                policy_per_added.append(i.obj.id)
            if policy_per_added:
                logger.debug("Policy percentage, adding Instances {} to Group {}".format(policy_per_added, g.obj.name))

        # Determine if any changes need to be made
        needs_change = False
        for g in actual_groups:
            if set(g.instances) != set(g.prior_instances):
                needs_change = True
                break
        if not needs_change:
            logger.debug('Cluster policy no-op finished in {} seconds'.format(time.time() - started_compute))
            return

        # On a differential basis, apply instances to groups
        with transaction.atomic():
            for g in actual_groups:
                if g.obj.is_container_group:
                    logger.debug('Skipping containerized group {} for policy calculation'.format(g.obj.name))
                    continue
                instances_to_add = set(g.instances) - set(g.prior_instances)
                instances_to_remove = set(g.prior_instances) - set(g.instances)
                if instances_to_add:
                    logger.debug('Adding instances {} to group {}'.format(list(instances_to_add), g.obj.name))
                    g.obj.instances.add(*instances_to_add)
                if instances_to_remove:
                    logger.debug('Removing instances {} from group {}'.format(list(instances_to_remove), g.obj.name))
                    g.obj.instances.remove(*instances_to_remove)
        logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))


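# handle_setting_changes runs on the broadcast queue, so every cluster node drops its
# cached copies of the changed settings (plus any settings that depend on them); when a
# LOG_AGGREGATOR_* setting changed, each node also rewrites its rsyslog configuration.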
@task(queue='tower_broadcast_all')
def handle_setting_changes(setting_keys):
    orig_len = len(setting_keys)
    for i in range(orig_len):
        for dependent_key in settings_registry.get_dependent_settings(setting_keys[i]):
            setting_keys.append(dependent_key)
    cache_keys = set(setting_keys)
    logger.debug('cache delete_many(%r)', cache_keys)
    cache.delete_many(cache_keys)

    if any([setting.startswith('LOG_AGGREGATOR') for setting in setting_keys]):
        reconfigure_rsyslog()


@task(queue='tower_broadcast_all')
def delete_project_files(project_path):
    # TODO: possibly implement some retry logic
    lock_file = project_path + '.lock'
    if os.path.exists(project_path):
        try:
            shutil.rmtree(project_path)
            logger.debug('Success removing project files {}'.format(project_path))
        except Exception:
            logger.exception('Could not remove project directory {}'.format(project_path))
    if os.path.exists(lock_file):
        try:
            os.remove(lock_file)
            logger.debug('Success removing {}'.format(lock_file))
        except Exception:
            logger.exception('Could not remove lock file {}'.format(lock_file))


@task(queue='tower_broadcast_all')
def profile_sql(threshold=1, minutes=1):
    if threshold <= 0:
        cache.delete('awx-profile-sql-threshold')
        logger.error('SQL PROFILING DISABLED')
    else:
        cache.set('awx-profile-sql-threshold', threshold, timeout=minutes * 60)
        logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))


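# send_notifications delivers a batch of Notification records (optionally associating
# them with a UnifiedJob) and saves per-notification success or failure, so one failed
# delivery does not block the rest of the batch.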
@task(queue=get_local_queuename)
def send_notifications(notification_list, job_id=None):
    if not isinstance(notification_list, list):
        raise TypeError("notification_list should be of type list")
    if job_id is not None:
        job_actual = UnifiedJob.objects.get(id=job_id)

    notifications = Notification.objects.filter(id__in=notification_list)
    if job_id is not None:
        job_actual.notifications.add(*notifications)

    for notification in notifications:
        update_fields = ['status', 'notifications_sent']
        try:
            sent = notification.notification_template.send(notification.subject, notification.body)
            notification.status = "successful"
            notification.notifications_sent = sent
            if job_id is not None:
                job_actual.log_lifecycle("notifications_sent")
        except Exception as e:
            logger.exception("Send Notification Failed {}".format(e))
            notification.status = "failed"
            notification.error = smart_str(e)
            update_fields.append('error')
        finally:
            try:
                notification.save(update_fields=update_fields)
            except Exception:
                logger.exception('Error saving notification {} result.'.format(notification.id))


@task(queue=get_local_queuename)
def gather_analytics():
    from awx.conf.models import Setting
    from rest_framework.fields import DateTimeField

    last_gather = Setting.objects.filter(key='AUTOMATION_ANALYTICS_LAST_GATHER').first()
    last_time = DateTimeField().to_internal_value(last_gather.value) if last_gather and last_gather.value else None
    gather_time = now()

    if not last_time or ((gather_time - last_time).total_seconds() > settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
        analytics.gather()


@task(queue=get_local_queuename)
def purge_old_stdout_files():
    nowtime = time.time()
    for f in os.listdir(settings.JOBOUTPUT_ROOT):
        if os.path.getctime(os.path.join(settings.JOBOUTPUT_ROOT, f)) < nowtime - settings.LOCAL_STDOUT_EXPIRE_TIME:
            os.unlink(os.path.join(settings.JOBOUTPUT_ROOT, f))
            logger.debug("Removing {}".format(os.path.join(settings.JOBOUTPUT_ROOT, f)))


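# Image cleanup shells out to the podman CLI and therefore only applies to traditional
# (non-Kubernetes) nodes; dangling execution environment images are listed and removed
# one at a time, with failures logged rather than raised.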
@task(queue=get_local_queuename)
def cleanup_execution_environment_images():
    if settings.IS_K8S:
        return
    process = subprocess.run('podman images --filter="dangling=true" --format json'.split(" "), capture_output=True)
    if process.returncode != 0:
        logger.debug("Cleanup execution environment images: could not get list of images")
        return
    if len(process.stdout) > 0:
        images_system = json.loads(process.stdout)
        for e in images_system:
            image_name = e["Id"]
            logger.debug(f"Cleanup execution environment images: deleting {image_name}")
            process = subprocess.run(['podman', 'rmi', image_name, '-f'], stdout=subprocess.DEVNULL)
            if process.returncode != 0:
                logger.debug(f"Failed to delete image {image_name}")


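# The heartbeat task refreshes this node's capacity, registers it if it is missing from
# the database, reaps jobs on instances whose heartbeats have expired, and either zeroes
# their capacity or deletes them outright when AWX_AUTO_DEPROVISION_INSTANCES is set.
# It also shuts this node down if another instance reports a newer application version.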
@task(queue=get_local_queuename)
def cluster_node_heartbeat():
    logger.debug("Cluster node heartbeat task.")
    nowtime = now()
    instance_list = list(Instance.objects.all())
    this_inst = None
    lost_instances = []

    (changed, instance) = Instance.objects.get_or_register()
    if changed:
        logger.info("Registered tower node '{}'".format(instance.hostname))

    for inst in list(instance_list):
        if inst.hostname == settings.CLUSTER_HOST_ID:
            this_inst = inst
            instance_list.remove(inst)
        elif inst.is_lost(ref_time=nowtime):
            lost_instances.append(inst)
            instance_list.remove(inst)
    if this_inst:
        startup_event = this_inst.is_lost(ref_time=nowtime)
        this_inst.refresh_capacity()
        if startup_event:
            logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
            return
    else:
        raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
    # IFF any node has a greater version than we do, then we'll shutdown services
    for other_inst in instance_list:
        if other_inst.version == "":
            continue
        if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
            logger.error(
                "Host {} reports version {}, but this node {} is at {}, shutting down".format(
                    other_inst.hostname, other_inst.version, this_inst.hostname, this_inst.version
                )
            )
            # Shutdown signal will set the capacity to zero to ensure no Jobs get added to this instance.
            # The heartbeat task will reset the capacity to the system capacity after upgrade.
            stop_local_services(communicate=False)
            raise RuntimeError("Shutting down.")
    for other_inst in lost_instances:
        try:
            reaper.reap(other_inst)
        except Exception:
            logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
        try:
            # Capacity could already be 0 because:
            #  * It's a new node and it never had a heartbeat
            #  * It was set to 0 by another tower node running this method
            #  * It was set to 0 by this node, but auto deprovisioning is off
            #
            # If auto deprovisioning is on, don't bother setting the capacity to 0
            # since we will delete the node anyway.
            if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
                other_inst.capacity = 0
                other_inst.save(update_fields=['capacity'])
                logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.modified))
            elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
                deprovision_hostname = other_inst.hostname
                other_inst.delete()
                logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
        except DatabaseError as e:
            if 'did not affect any rows' in str(e):
                logger.debug('Another instance has marked {} as lost'.format(other_inst.hostname))
            else:
                logger.exception('Error marking {} as lost'.format(other_inst.hostname))


@task(queue=get_local_queuename)
def awx_k8s_reaper():
    if not settings.RECEPTOR_RELEASE_WORK:
        return

    from awx.main.scheduler.kubernetes import PodManager  # prevent circular import

    for group in InstanceGroup.objects.filter(is_container_group=True).iterator():
        logger.debug("Checking for orphaned k8s pods for {}.".format(group))
        pods = PodManager.list_active_jobs(group)
        for job in UnifiedJob.objects.filter(pk__in=pods.keys()).exclude(status__in=ACTIVE_STATES):
            logger.debug('{} is no longer active, reaping orphaned k8s pod'.format(job.log_format))
            try:
                pm = PodManager(job)
                pm.kube_api.delete_namespaced_pod(name=pods[job.id], namespace=pm.namespace, _request_timeout=settings.AWX_CONTAINER_GROUP_K8S_API_TIMEOUT)
            except Exception:
                logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))


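# The periodic scheduler takes a non-blocking advisory lock so only one node launches
# scheduled jobs at a time; it spawns every enabled Schedule whose window falls between
# the last recorded run and now, and marks the spawned job failed when the license check
# fails or the job cannot be signalled to start.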
@task(queue=get_local_queuename)
def awx_periodic_scheduler():
    with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
        if acquired is False:
            logger.debug("Not running periodic scheduler, another task holds lock")
            return
        logger.debug("Starting periodic scheduler")

        run_now = now()
        state = TowerScheduleState.get_solo()
        last_run = state.schedule_last_run
        logger.debug("Last scheduler run was: %s", last_run)
        state.schedule_last_run = run_now
        state.save()

        old_schedules = Schedule.objects.enabled().before(last_run)
        for schedule in old_schedules:
            schedule.update_computed_fields()
        schedules = Schedule.objects.enabled().between(last_run, run_now)

        invalid_license = False
        try:
            access_registry[Job](None).check_license(quiet=True)
        except PermissionDenied as e:
            invalid_license = e

        for schedule in schedules:
            template = schedule.unified_job_template
            schedule.update_computed_fields()  # To update next_run timestamp.
            if template.cache_timeout_blocked:
                logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
                continue
            try:
                job_kwargs = schedule.get_job_kwargs()
                new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
                logger.debug('Spawned {} from schedule {}-{}.'.format(new_unified_job.log_format, schedule.name, schedule.pk))

                if invalid_license:
                    new_unified_job.status = 'failed'
                    new_unified_job.job_explanation = str(invalid_license)
                    new_unified_job.save(update_fields=['status', 'job_explanation'])
                    new_unified_job.websocket_emit_status("failed")
                    raise invalid_license
                can_start = new_unified_job.signal_start()
            except Exception:
                logger.exception('Error spawning scheduled job.')
                continue
            if not can_start:
                new_unified_job.status = 'failed'
                new_unified_job.job_explanation = gettext_noop(
                    "Scheduled job could not start because it was not in the right state or required manual credentials"
                )
                new_unified_job.save(update_fields=['status', 'job_explanation'])
                new_unified_job.websocket_emit_status("failed")
            emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
        state.save()


@task(queue=get_local_queuename)
def handle_work_success(task_actual):
    try:
        instance = UnifiedJob.get_instance_by_type(task_actual['type'], task_actual['id'])
    except ObjectDoesNotExist:
        logger.warning('Missing {} `{}` in success callback.'.format(task_actual['type'], task_actual['id']))
        return
    if not instance:
        return

    schedule_task_manager()


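# handle_work_error marks each still-active subtask in a failed dependency chain as
# failed, recording which task failed first in job_explanation, then kicks the task
# manager once so downstream scheduling continues.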
@task(queue=get_local_queuename)
def handle_work_error(task_id, *args, **kwargs):
    subtasks = kwargs.get('subtasks', None)
    logger.debug('Executing error task id %s, subtasks: %s' % (task_id, str(subtasks)))
    first_instance = None
    first_instance_type = ''
    if subtasks is not None:
        for each_task in subtasks:
            try:
                instance = UnifiedJob.get_instance_by_type(each_task['type'], each_task['id'])
                if not instance:
                    # Unknown task type
                    logger.warn("Unknown task type: {}".format(each_task['type']))
                    continue
            except ObjectDoesNotExist:
                logger.warning('Missing {} `{}` in error callback.'.format(each_task['type'], each_task['id']))
                continue

            if first_instance is None:
                first_instance = instance
                first_instance_type = each_task['type']

            if instance.celery_task_id != task_id and not instance.cancel_flag and not instance.status == 'successful':
                instance.status = 'failed'
                instance.failed = True
                if not instance.job_explanation:
                    instance.job_explanation = 'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (
                        first_instance_type,
                        first_instance.name,
                        first_instance.id,
                    )
                instance.save()
                instance.websocket_emit_status("failed")

    # We only send 1 job complete message since all the job completion message
    # handling does is trigger the scheduler. If we extend the functionality of
    # what the job complete message handler does then we may want to send a
    # completion event for each job here.
    if first_instance:
        schedule_task_manager()
        pass


@task(queue=get_local_queuename)
def handle_success_and_failure_notifications(job_id):
    uj = UnifiedJob.objects.get(pk=job_id)
    retries = 0
    while retries < 5:
        if uj.finished:
            uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
            return
        else:
            # wait a few seconds to avoid a race where the
            # events are persisted _before_ the UJ.status
            # changes from running -> successful
            retries += 1
            time.sleep(1)
            uj = UnifiedJob.objects.get(pk=job_id)

    logger.warn(f"Failed to even try to send notifications for job '{uj}' due to job not being in finished state.")


@task(queue=get_local_queuename)
def update_inventory_computed_fields(inventory_id):
    """
    Signal handler and wrapper around inventory.update_computed_fields to
    prevent unnecessary recursive calls.
    """
    i = Inventory.objects.filter(id=inventory_id)
    if not i.exists():
        logger.error("Update Inventory Computed Fields failed due to missing inventory: " + str(inventory_id))
        return
    i = i[0]
    try:
        i.update_computed_fields()
    except DatabaseError as e:
        if 'did not affect any rows' in str(e):
            logger.debug('Exiting duplicate update_inventory_computed_fields task.')
            return
        raise


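# Smart inventory membership is reconciled by set difference: hosts that match the smart
# inventory's host_filter but have no membership row are bulk-created, and stale rows are
# deleted, all within one transaction. The return value reports whether anything changed
# so callers can refresh computed fields only when needed.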
def update_smart_memberships_for_inventory(smart_inventory):
    current = set(SmartInventoryMembership.objects.filter(inventory=smart_inventory).values_list('host_id', flat=True))
    new = set(smart_inventory.hosts.values_list('id', flat=True))
    additions = new - current
    removals = current - new
    if additions or removals:
        with transaction.atomic():
            if removals:
                SmartInventoryMembership.objects.filter(inventory=smart_inventory, host_id__in=removals).delete()
            if additions:
                add_for_inventory = [SmartInventoryMembership(inventory_id=smart_inventory.id, host_id=host_id) for host_id in additions]
                SmartInventoryMembership.objects.bulk_create(add_for_inventory, ignore_conflicts=True)
        logger.debug(
            'Smart host membership cached for {}, {} additions, {} removals, {} total count.'.format(
                smart_inventory.pk, len(additions), len(removals), len(new)
            )
        )
        return True  # changed
    return False


@task(queue=get_local_queuename)
def update_host_smart_inventory_memberships():
    smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False)
    changed_inventories = set([])
    for smart_inventory in smart_inventories:
        try:
            changed = update_smart_memberships_for_inventory(smart_inventory)
            if changed:
                changed_inventories.add(smart_inventory)
        except IntegrityError:
            logger.exception('Failed to update smart inventory memberships for {}'.format(smart_inventory.pk))
    # Update computed fields for changed inventories outside atomic action
    for smart_inventory in changed_inventories:
        smart_inventory.update_computed_fields()


@task(queue=get_local_queuename)
def migrate_legacy_event_data(tblname):
    #
    # NOTE: this function is not actually in use anymore,
    # but has been intentionally kept for historical purposes,
    # and to serve as an illustration if we ever need to perform
    # bulk modification/migration of event data in the future.
    #
    if 'event' not in tblname:
        return
    with advisory_lock(f'bigint_migration_{tblname}', wait=False) as acquired:
        if acquired is False:
            return
        chunk = settings.JOB_EVENT_MIGRATION_CHUNK_SIZE

        def _remaining():
            try:
                cursor.execute(f'SELECT MAX(id) FROM _old_{tblname};')
                return cursor.fetchone()[0]
            except ProgrammingError:
                # the table is gone (migration is unnecessary)
                return None

        with connection.cursor() as cursor:
            total_rows = _remaining()
            while total_rows:
                with transaction.atomic():
                    cursor.execute(f'INSERT INTO {tblname} SELECT * FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk} RETURNING id;')
                    last_insert_pk = cursor.fetchone()
                    if last_insert_pk is None:
                        # this means that the SELECT from the old table was
                        # empty, and there was nothing to insert (so we're done)
                        break
                    last_insert_pk = last_insert_pk[0]
                    cursor.execute(f'DELETE FROM _old_{tblname} WHERE id IN (SELECT id FROM _old_{tblname} ORDER BY id DESC LIMIT {chunk});')
                logger.warn(f'migrated int -> bigint rows to {tblname} from _old_{tblname}; # ({last_insert_pk} rows remaining)')

            if _remaining() is None:
                cursor.execute(f'DROP TABLE IF EXISTS _old_{tblname}')
                logger.warn(f'{tblname} primary key migration to bigint has finished')


@task(queue=get_local_queuename)
def delete_inventory(inventory_id, user_id, retries=5):
    # Delete inventory as user
    if user_id is None:
        user = None
    else:
        try:
            user = User.objects.get(id=user_id)
        except Exception:
            user = None
    with ignore_inventory_computed_fields(), ignore_inventory_group_removal(), impersonate(user):
        try:
            i = Inventory.objects.get(id=inventory_id)
            for host in i.hosts.iterator():
                host.job_events_as_primary_host.update(host=None)
            i.delete()
            emit_channel_notification('inventories-status_changed', {'group_name': 'inventories', 'inventory_id': inventory_id, 'status': 'deleted'})
            logger.debug('Deleted inventory {} as user {}.'.format(inventory_id, user_id))
        except Inventory.DoesNotExist:
            logger.exception("Delete Inventory failed due to missing inventory: " + str(inventory_id))
            return
        except DatabaseError:
            logger.exception('Database error deleting inventory {}, but will retry.'.format(inventory_id))
            if retries > 0:
                time.sleep(10)
                delete_inventory(inventory_id, user_id, retries=retries - 1)


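# with_path_cleanup wraps BaseTask.run so that any temporary files or directories
# registered in self.cleanup_paths are removed after the run finishes, whether it
# succeeded or raised.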
def with_path_cleanup(f):
    @functools.wraps(f)
    def _wrapped(self, *args, **kwargs):
        try:
            return f(self, *args, **kwargs)
        finally:
            for p in self.cleanup_paths:
                try:
                    if os.path.isdir(p):
                        shutil.rmtree(p, ignore_errors=True)
                    elif os.path.exists(p):
                        os.remove(p)
                except OSError:
                    logger.exception("Failed to remove tmp file: {}".format(p))
            self.cleanup_paths = []

    return _wrapped


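# BaseTask is the shared runner for all unified job types: it prepares the private data
# directory, environment, credentials, and passwords, streams ansible-runner events to
# the callback queue dispatcher, and records the final status. Subclasses plug in their
# own model and build_* hooks, roughly (illustrative sketch, not a real class):
#
#     class RunSomething(BaseTask):
#         model = SomeUnifiedJobModel
#         event_model = SomeEventModel
#         event_data_key = 'some_id'
#
# RunJob below follows exactly this pattern.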
class BaseTask(object):
    model = None
    event_model = None
    abstract = True

    def __init__(self):
        self.cleanup_paths = []
        self.parent_workflow_job_id = None
        self.host_map = {}
        self.guid = GuidMiddleware.get_guid()
        self.job_created = None

    def update_model(self, pk, _attempt=0, **updates):
        """Reload the model instance from the database and update the
        given fields.
        """
        try:
            with transaction.atomic():
                # Retrieve the model instance.
                instance = self.model.objects.get(pk=pk)

                # Update the appropriate fields and save the model
                # instance, then return the new instance.
                if updates:
                    update_fields = ['modified']
                    for field, value in updates.items():
                        setattr(instance, field, value)
                        update_fields.append(field)
                        if field == 'status':
                            update_fields.append('failed')
                    instance.save(update_fields=update_fields)
                return instance
        except DatabaseError as e:
            # Log out the error to the debug logger.
            logger.debug('Database error updating %s, retrying in 5 seconds (retry #%d): %s', self.model._meta.object_name, _attempt + 1, e)

            # Attempt to retry the update, assuming we haven't already
            # tried too many times.
            if _attempt < 5:
                time.sleep(5)
                return self.update_model(pk, _attempt=_attempt + 1, **updates)
            else:
                logger.error('Failed to update %s after %d retries.', self.model._meta.object_name, _attempt)

    def get_path_to(self, *args):
        """
        Return absolute path relative to this file.
        """
        return os.path.abspath(os.path.join(os.path.dirname(__file__), *args))

    def build_execution_environment_params(self, instance, private_data_dir):
        if settings.IS_K8S:
            return {}

        image = instance.execution_environment.image
        params = {
            "container_image": image,
            "process_isolation": True,
            "container_options": ['--user=root'],
        }

        if instance.execution_environment.credential:
            cred = instance.execution_environment.credential
            if cred.has_inputs(field_names=('host', 'username', 'password')):
                path = os.path.split(private_data_dir)[0]
                with open(path + '/auth.json', 'w') as authfile:
                    os.chmod(authfile.name, stat.S_IRUSR | stat.S_IWUSR)

                    host = cred.get_input('host')
                    username = cred.get_input('username')
                    password = cred.get_input('password')
                    token = "{}:{}".format(username, password)
                    auth_data = {'auths': {host: {'auth': b64encode(token.encode('UTF-8')).decode('UTF-8')}}}
                    authfile.write(json.dumps(auth_data, indent=4))
                    params["container_options"].append(f'--authfile={authfile.name}')
            else:
                raise RuntimeError('Please recheck that your host, username, and password fields are all filled.')

        pull = instance.execution_environment.pull
        if pull:
            params['container_options'].append(f'--pull={pull}')

        if settings.AWX_ISOLATION_SHOW_PATHS:
            params['container_volume_mounts'] = []
            for this_path in settings.AWX_ISOLATION_SHOW_PATHS:
                # Using z allows the dir to be mounted by multiple containers
                # Uppercase Z restricts access (in weird ways) to 1 container at a time
                params['container_volume_mounts'].append(f'{this_path}:{this_path}:z')
        return params

    def build_private_data(self, instance, private_data_dir):
        """
        Return SSH private key data (only if stored in DB as ssh_key_data).
        Return structure is a dict of the form:
        """

    def build_private_data_dir(self, instance):
        """
        Create a temporary directory for job-related files.
        """
        pdd_wrapper_path = tempfile.mkdtemp(prefix=f'pdd_wrapper_{instance.pk}_', dir=settings.AWX_ISOLATION_BASE_PATH)
        os.chmod(pdd_wrapper_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        if settings.AWX_CLEANUP_PATHS:
            self.cleanup_paths.append(pdd_wrapper_path)

        path = tempfile.mkdtemp(prefix='awx_%s_' % instance.pk, dir=pdd_wrapper_path)
        os.chmod(path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
        # Ansible runner requires that the project directory exists,
        # and we will write files in the other folders without pre-creating the folder
        for subfolder in ('project', 'inventory', 'env'):
            runner_subfolder = os.path.join(path, subfolder)
            if not os.path.exists(runner_subfolder):
                os.mkdir(runner_subfolder)
        return path

    def build_private_data_files(self, instance, private_data_dir):
        """
        Creates temporary files containing the private data.
        Returns a dictionary i.e.,

        {
            'credentials': {
                <awx.main.models.Credential>: '/path/to/decrypted/data',
                <awx.main.models.Credential>: '/path/to/decrypted/data',
                ...
            },
            'certificates': {
                <awx.main.models.Credential>: /path/to/signed/ssh/certificate,
                <awx.main.models.Credential>: /path/to/signed/ssh/certificate,
                ...
            }
        }
        """
        private_data = self.build_private_data(instance, private_data_dir)
        private_data_files = {'credentials': {}}
        if private_data is not None:
            for credential, data in private_data.get('credentials', {}).items():
                # OpenSSH formatted keys must have a trailing newline to be
                # accepted by ssh-add.
                if 'OPENSSH PRIVATE KEY' in data and not data.endswith('\n'):
                    data += '\n'
                # For credentials used with ssh-add, write to a named pipe which
                # will be read then closed, instead of leaving the SSH key on disk.
                if credential and credential.credential_type.namespace in ('ssh', 'scm'):
                    try:
                        os.mkdir(os.path.join(private_data_dir, 'env'))
                    except OSError as e:
                        if e.errno != errno.EEXIST:
                            raise
                    path = os.path.join(private_data_dir, 'env', 'ssh_key')
                    ansible_runner.utils.open_fifo_write(path, data.encode())
                    private_data_files['credentials']['ssh'] = path
                # Ansible network modules do not yet support ssh-agent.
                # Instead, ssh private key file is explicitly passed via an
                # env variable.
                else:
                    handle, path = tempfile.mkstemp(dir=os.path.join(private_data_dir, 'env'))
                    f = os.fdopen(handle, 'w')
                    f.write(data)
                    f.close()
                    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
                    private_data_files['credentials'][credential] = path
            for credential, data in private_data.get('certificates', {}).items():
                artifact_dir = os.path.join(private_data_dir, 'artifacts', str(self.instance.id))
                if not os.path.exists(artifact_dir):
                    os.makedirs(artifact_dir, mode=0o700)
                path = os.path.join(artifact_dir, 'ssh_key_data-cert.pub')
                with open(path, 'w') as f:
                    f.write(data)
                    f.close()
                os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
        return private_data_files

    def build_passwords(self, instance, runtime_passwords):
        """
        Build a dictionary of passwords for responding to prompts.
        """
        return {
            'yes': 'yes',
            'no': 'no',
            '': '',
        }

    def build_extra_vars_file(self, instance, private_data_dir):
        """
        Build ansible yaml file filled with extra vars to be passed via -e@file.yml
        """

    def _write_extra_vars_file(self, private_data_dir, vars, safe_dict={}):
        env_path = os.path.join(private_data_dir, 'env')
        try:
            os.mkdir(env_path, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

        path = os.path.join(env_path, 'extravars')
        handle = os.open(path, os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE)
        f = os.fdopen(handle, 'w')
        if settings.ALLOW_JINJA_IN_EXTRA_VARS == 'always':
            f.write(yaml.safe_dump(vars))
        else:
            f.write(safe_dump(vars, safe_dict))
        f.close()
        os.chmod(path, stat.S_IRUSR)
        return path

    def add_awx_venv(self, env):
        env['VIRTUAL_ENV'] = settings.AWX_VENV_PATH
        if 'PATH' in env:
            env['PATH'] = os.path.join(settings.AWX_VENV_PATH, "bin") + ":" + env['PATH']
        else:
            env['PATH'] = os.path.join(settings.AWX_VENV_PATH, "bin")

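    # build_env (below) merges ANSIBLE_* settings and AWX_TASK_ENV into the job
    # environment; when the execution environment's registry credential has SSL
    # verification disabled, it also writes a registries.conf marking that registry
    # as insecure and points both the newer and older Podman config variables at it.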
    def build_env(self, instance, private_data_dir, private_data_files=None):
        """
        Build environment dictionary for ansible-playbook.
        """
        env = {}
        # Add ANSIBLE_* settings to the subprocess environment.
        for attr in dir(settings):
            if attr == attr.upper() and attr.startswith('ANSIBLE_'):
                env[attr] = str(getattr(settings, attr))
        # Also set environment variables configured in AWX_TASK_ENV setting.
        for key, value in settings.AWX_TASK_ENV.items():
            env[key] = str(value)

        env['AWX_PRIVATE_DATA_DIR'] = private_data_dir

        if self.instance.execution_environment is None:
            raise RuntimeError('The project could not sync because there is no Execution Environment.')

        ee_cred = self.instance.execution_environment.credential
        if ee_cred:
            verify_ssl = ee_cred.get_input('verify_ssl')
            if not verify_ssl:
                pdd_wrapper_path = os.path.split(private_data_dir)[0]
                registries_conf_path = os.path.join(pdd_wrapper_path, 'registries.conf')
                host = ee_cred.get_input('host')

                with open(registries_conf_path, 'w') as registries_conf:
                    os.chmod(registries_conf.name, stat.S_IRUSR | stat.S_IWUSR)

                    lines = [
                        '[[registry]]',
                        'location = "{}"'.format(host),
                        'insecure = true',
                    ]

                    registries_conf.write('\n'.join(lines))

                # Podman >= 3.1.0
                env['CONTAINERS_REGISTRIES_CONF'] = registries_conf_path
                # Podman < 3.1.0
                env['REGISTRIES_CONFIG_PATH'] = registries_conf_path

        return env

    def build_inventory(self, instance, private_data_dir):
        script_params = dict(hostvars=True, towervars=True)
        if hasattr(instance, 'job_slice_number'):
            script_params['slice_number'] = instance.job_slice_number
            script_params['slice_count'] = instance.job_slice_count
        script_data = instance.inventory.get_script_data(**script_params)
        # maintain a list of host_name --> host_id
        # so we can associate emitted events to Host objects
        self.host_map = {hostname: hv.pop('remote_tower_id', '') for hostname, hv in script_data.get('_meta', {}).get('hostvars', {}).items()}
        json_data = json.dumps(script_data)
        path = os.path.join(private_data_dir, 'inventory')
        fn = os.path.join(path, 'hosts')
        with open(fn, 'w') as f:
            os.chmod(fn, stat.S_IRUSR | stat.S_IXUSR | stat.S_IWUSR)
            f.write('#! /usr/bin/env python3\n# -*- coding: utf-8 -*-\nprint(%r)\n' % json_data)
        return fn

    def build_args(self, instance, private_data_dir, passwords):
        raise NotImplementedError

    def write_args_file(self, private_data_dir, args):
        env_path = os.path.join(private_data_dir, 'env')
        try:
            os.mkdir(env_path, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

        path = os.path.join(env_path, 'cmdline')
        handle = os.open(path, os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE)
        f = os.fdopen(handle, 'w')
        f.write(ansible_runner.utils.args2cmdline(*args))
        f.close()
        os.chmod(path, stat.S_IRUSR)
        return path

    def build_credentials_list(self, instance):
        return []

    def get_instance_timeout(self, instance):
        global_timeout_setting_name = instance._global_timeout_setting()
        if global_timeout_setting_name:
            global_timeout = getattr(settings, global_timeout_setting_name, 0)
            local_timeout = getattr(instance, 'timeout', 0)
            job_timeout = global_timeout if local_timeout == 0 else local_timeout
            job_timeout = 0 if local_timeout < 0 else job_timeout
        else:
            job_timeout = 0
        return job_timeout

    def get_password_prompts(self, passwords={}):
        """
        Return a dictionary where keys are strings or regular expressions for
        prompts, and values are password lookup keys (keys that are returned
        from build_passwords).
        """
        return OrderedDict()

    def create_expect_passwords_data_struct(self, password_prompts, passwords):
        expect_passwords = {}
        for k, v in password_prompts.items():
            expect_passwords[k] = passwords.get(v, '') or ''
        return expect_passwords

    def pre_run_hook(self, instance, private_data_dir):
        """
        Hook for any steps to run before the job/task starts
        """
        instance.log_lifecycle("pre_run")

    def post_run_hook(self, instance, status):
        """
        Hook for any steps to run before job/task is marked as complete.
        """
        instance.log_lifecycle("post_run")

    def final_run_hook(self, instance, status, private_data_dir, fact_modification_times):
        """
        Hook for any steps to run after job/task is marked as complete.
        """
        instance.log_lifecycle("finalize_run")
        job_profiling_dir = os.path.join(private_data_dir, 'artifacts/playbook_profiling')
        awx_profiling_dir = '/var/log/tower/playbook_profiling/'
        collections_info = os.path.join(private_data_dir, 'artifacts/', 'collections.json')
        ansible_version_file = os.path.join(private_data_dir, 'artifacts/', 'ansible_version.txt')

        if not os.path.exists(awx_profiling_dir):
            os.mkdir(awx_profiling_dir)
        if os.path.isdir(job_profiling_dir):
            shutil.copytree(job_profiling_dir, os.path.join(awx_profiling_dir, str(instance.pk)))
        if os.path.exists(collections_info):
            with open(collections_info) as ee_json_info:
                ee_collections_info = json.loads(ee_json_info.read())
                instance.installed_collections = ee_collections_info
                instance.save(update_fields=['installed_collections'])
        if os.path.exists(ansible_version_file):
            with open(ansible_version_file) as ee_ansible_info:
                ansible_version_info = ee_ansible_info.readline()
                instance.ansible_version = ansible_version_info
                instance.save(update_fields=['ansible_version'])

    def event_handler(self, event_data):
        #
        # ⚠️ D-D-D-DANGER ZONE ⚠️
        # This method is called once for *every event* emitted by Ansible
        # Runner as a playbook runs. That means that changes to the code in
        # this method are _very_ likely to introduce performance regressions.
        #
        # Even if this function is made on average .05s slower, it can have
        # devastating performance implications for playbooks that emit
        # tens or hundreds of thousands of events.
        #
        # Proceed with caution!
        #
        """
        Ansible runner puts a parent_uuid on each event, no matter what the type.
        AWX only saves the parent_uuid if the event is for a Job.
        """
        # cache end_line locally for RunInventoryUpdate tasks
        # which generate job events from two 'streams':
        # ansible-inventory and the awx.main.commands.inventory_import
        # logger
        if isinstance(self, RunInventoryUpdate):
            self.end_line = event_data['end_line']

        if event_data.get(self.event_data_key, None):
            if self.event_data_key != 'job_id':
                event_data.pop('parent_uuid', None)
        if self.parent_workflow_job_id:
            event_data['workflow_job_id'] = self.parent_workflow_job_id
        # Do we have to check if the field exists? if it doesn't
        # how will we eventually store the event in the db?
        if self.job_created:
            event_data['job_created'] = self.job_created
        if self.host_map:
            host = event_data.get('event_data', {}).get('host', '').strip()
            if host:
                event_data['host_name'] = host
                if host in self.host_map:
                    event_data['host_id'] = self.host_map[host]
            else:
                event_data['host_name'] = ''
                event_data['host_id'] = ''
            if event_data.get('event') == 'playbook_on_stats':
                event_data['host_map'] = self.host_map

        if isinstance(self, RunProjectUpdate):
            # it's common for Ansible's SCM modules to print
            # error messages on failure that contain the plaintext
            # basic auth credentials (username + password)
            # it's also common for the nested event data itself (['res']['...'])
            # to contain unredacted text on failure
            # this is a _little_ expensive to filter
            # with regex, but project updates don't have many events,
            # so it *should* have a negligible performance impact
            task = event_data.get('event_data', {}).get('task_action')
            try:
                if task in ('git', 'svn'):
                    event_data_json = json.dumps(event_data)
                    event_data_json = UriCleaner.remove_sensitive(event_data_json)
                    event_data = json.loads(event_data_json)
            except json.JSONDecodeError:
                pass

        if 'event_data' in event_data:
            event_data['event_data']['guid'] = self.guid

        event_data.setdefault(self.event_data_key, self.instance.id)
        self.dispatcher.dispatch(event_data)
        self.event_ct += 1

        '''
        Handle artifacts
        '''
        if event_data.get('event_data', {}).get('artifact_data', {}):
            self.instance.artifacts = event_data['event_data']['artifact_data']
            self.instance.save(update_fields=['artifacts'])

        return False

    def cancel_callback(self):
        """
        Ansible runner callback to tell the job when/if it is canceled
        """
        unified_job_id = self.instance.pk
        self.instance = self.update_model(unified_job_id)
        if not self.instance:
            logger.error('unified job {} was deleted while running, canceling'.format(unified_job_id))
            return True
        if self.instance.cancel_flag or self.instance.status == 'canceled':
            cancel_wait = (now() - self.instance.modified).seconds if self.instance.modified else 0
            if cancel_wait > 5:
                logger.warn('Request to cancel {} took {} seconds to complete.'.format(self.instance.log_format, cancel_wait))
            return True
        return False

    def finished_callback(self, runner_obj):
        """
        Ansible runner callback triggered on finished run
        """
        event_data = {
            'event': 'EOF',
            'final_counter': self.event_ct,
            'guid': self.guid,
        }
        event_data.setdefault(self.event_data_key, self.instance.id)
        self.dispatcher.dispatch(event_data)

    def status_handler(self, status_data, runner_config):
        """
        Ansible runner callback triggered on status transition
        """
        if status_data['status'] == 'starting':
            job_env = dict(runner_config.env)
            '''
            Take the safe environment variables and overwrite
            '''
            for k, v in self.safe_env.items():
                if k in job_env:
                    job_env[k] = v
            self.instance = self.update_model(self.instance.pk, job_args=json.dumps(runner_config.command), job_cwd=runner_config.cwd, job_env=job_env)
        elif status_data['status'] == 'error':
            result_traceback = status_data.get('result_traceback', None)
            if result_traceback:
                self.instance = self.update_model(self.instance.pk, result_traceback=result_traceback)

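    # run() ties the hooks above together: it builds the private data directory,
    # passwords, extra vars, args, and environment, then hands the assembled params
    # either to ansible-runner directly (for SystemJobs) or to an AWXReceptorJob for
    # everything else, and finally persists the status, event count, and any traceback
    # on the unified job record.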
    @with_path_cleanup
    def run(self, pk, **kwargs):
        """
        Run the job/task and capture its output.
        """
        self.instance = self.model.objects.get(pk=pk)

        if self.instance.execution_environment_id is None:
            from awx.main.signals import disable_activity_stream

            with disable_activity_stream():
                self.instance = self.update_model(self.instance.pk, execution_environment=self.instance.resolve_execution_environment())

        # self.instance because of the update_model pattern and when it's used in callback handlers
        self.instance = self.update_model(pk, status='running', start_args='')  # blank field to remove encrypted passwords
        self.instance.websocket_emit_status("running")
        status, rc = 'error', None
        extra_update_fields = {}
        fact_modification_times = {}
        self.event_ct = 0

        '''
        Needs to be an object property because status_handler uses it in a callback context
        '''
        self.safe_env = {}
        self.safe_cred_env = {}
        private_data_dir = None

        # store a reference to the parent workflow job (if any) so we can include
        # it in event data JSON
        if self.instance.spawned_by_workflow:
            self.parent_workflow_job_id = self.instance.get_workflow_job().id

        self.job_created = str(self.instance.created)

        try:
            self.instance.send_notification_templates("running")
            private_data_dir = self.build_private_data_dir(self.instance)
            self.pre_run_hook(self.instance, private_data_dir)
            self.instance.log_lifecycle("preparing_playbook")
            if self.instance.cancel_flag:
                self.instance = self.update_model(self.instance.pk, status='canceled')
            if self.instance.status != 'running':
                # Stop the task chain and prevent starting the job if it has
                # already been canceled.
                self.instance = self.update_model(pk)
                status = self.instance.status
                raise RuntimeError('not starting %s task' % self.instance.status)

            if not os.path.exists(settings.AWX_ISOLATION_BASE_PATH):
                raise RuntimeError('AWX_ISOLATION_BASE_PATH=%s does not exist' % settings.AWX_ISOLATION_BASE_PATH)

            # store a record of the venv used at runtime
            if hasattr(self.instance, 'custom_virtualenv'):
                self.update_model(pk, custom_virtualenv=getattr(self.instance, 'ansible_virtualenv_path', settings.ANSIBLE_VENV_PATH))

            # Fetch "cached" fact data from prior runs and put on the disk
            # where ansible expects to find it
            if getattr(self.instance, 'use_fact_cache', False):
                self.instance.start_job_fact_cache(
                    os.path.join(private_data_dir, 'artifacts', str(self.instance.id), 'fact_cache'),
                    fact_modification_times,
                )

            # May have to serialize the value
            private_data_files = self.build_private_data_files(self.instance, private_data_dir)
            passwords = self.build_passwords(self.instance, kwargs)
            self.build_extra_vars_file(self.instance, private_data_dir)
            args = self.build_args(self.instance, private_data_dir, passwords)
            env = self.build_env(self.instance, private_data_dir, private_data_files=private_data_files)
            self.safe_env = build_safe_env(env)

            credentials = self.build_credentials_list(self.instance)

            for credential in credentials:
                if credential:
                    credential.credential_type.inject_credential(credential, env, self.safe_cred_env, args, private_data_dir)

            self.safe_env.update(self.safe_cred_env)

            self.write_args_file(private_data_dir, args)

            password_prompts = self.get_password_prompts(passwords)
            expect_passwords = self.create_expect_passwords_data_struct(password_prompts, passwords)

            params = {
                'ident': self.instance.id,
                'private_data_dir': private_data_dir,
                'playbook': self.build_playbook_path_relative_to_cwd(self.instance, private_data_dir),
                'inventory': self.build_inventory(self.instance, private_data_dir),
                'passwords': expect_passwords,
                'envvars': env,
                'settings': {
                    'job_timeout': self.get_instance_timeout(self.instance),
                    'suppress_ansible_output': True,
                },
            }

            if isinstance(self.instance, AdHocCommand):
                params['module'] = self.build_module_name(self.instance)
                params['module_args'] = self.build_module_args(self.instance)

            if getattr(self.instance, 'use_fact_cache', False):
                # Enable Ansible fact cache.
                params['fact_cache_type'] = 'jsonfile'
            else:
                # Disable Ansible fact cache.
                params['fact_cache_type'] = ''

            if self.instance.is_container_group_task or settings.IS_K8S:
                params['envvars'].pop('HOME', None)

            '''
            Delete parameters if the values are None or empty array
            '''
            for v in ['passwords', 'playbook', 'inventory']:
                if not params[v]:
                    del params[v]

            self.dispatcher = CallbackQueueDispatcher()

            self.instance.log_lifecycle("running_playbook")
            if isinstance(self.instance, SystemJob):
                res = ansible_runner.interface.run(
                    project_dir=settings.BASE_DIR,
                    event_handler=self.event_handler,
                    finished_callback=self.finished_callback,
                    status_handler=self.status_handler,
                    **params,
                )
            else:
                receptor_job = AWXReceptorJob(self, params)
                self.unit_id = receptor_job.unit_id
                res = receptor_job.run()

                if not res:
                    return

            status = res.status
            rc = res.rc

            if status == 'timeout':
                self.instance.job_explanation = "Job terminated due to timeout"
                status = 'failed'
                extra_update_fields['job_explanation'] = self.instance.job_explanation
                # ensure failure notification sends even if playbook_on_stats event is not triggered
                handle_success_and_failure_notifications.apply_async([self.instance.job.id])

        except InvalidVirtualenvError as e:
            extra_update_fields['job_explanation'] = e.message
            logger.error('{} {}'.format(self.instance.log_format, e.message))
        except Exception:
            # this could catch programming or file system errors
            extra_update_fields['result_traceback'] = traceback.format_exc()
            logger.exception('%s Exception occurred while running task', self.instance.log_format)
        finally:
            logger.debug('%s finished running, producing %s events.', self.instance.log_format, self.event_ct)

        try:
            self.post_run_hook(self.instance, status)
        except PostRunError as exc:
            if status == 'successful':
                status = exc.status
                extra_update_fields['job_explanation'] = exc.args[0]
                if exc.tb:
                    extra_update_fields['result_traceback'] = exc.tb
        except Exception:
            logger.exception('{} Post run hook errored.'.format(self.instance.log_format))

        self.instance = self.update_model(pk)
        self.instance = self.update_model(pk, status=status, emitted_events=self.event_ct, **extra_update_fields)

        try:
            self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times)
        except Exception:
            logger.exception('{} Final run hook errored.'.format(self.instance.log_format))

        self.instance.websocket_emit_status(status)
        if status != 'successful':
            if status == 'canceled':
                raise AwxTaskError.TaskCancel(self.instance, rc)
            else:
                raise AwxTaskError.TaskError(self.instance, rc)


@task(queue=get_local_queuename)
|
|
class RunJob(BaseTask):
|
|
"""
|
|
Run a job using ansible-playbook.
|
|
"""
|
|
|
|
model = Job
|
|
event_model = JobEvent
|
|
event_data_key = 'job_id'
|
|
|
|
def build_private_data(self, job, private_data_dir):
|
|
"""
|
|
Returns a dict of the form
|
|
{
|
|
'credentials': {
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
...
|
|
},
|
|
'certificates': {
|
|
<awx.main.models.Credential>: <signed SSH certificate data>,
|
|
<awx.main.models.Credential>: <signed SSH certificate data>,
|
|
...
|
|
}
|
|
}
|
|
"""
|
|
private_data = {'credentials': {}}
|
|
for credential in job.credentials.prefetch_related('input_sources__source_credential').all():
|
|
# If we were sent SSH credentials, decrypt them and send them
|
|
# back (they will be written to a temporary file).
|
|
if credential.has_input('ssh_key_data'):
|
|
private_data['credentials'][credential] = credential.get_input('ssh_key_data', default='')
|
|
if credential.has_input('ssh_public_key_data'):
|
|
private_data.setdefault('certificates', {})[credential] = credential.get_input('ssh_public_key_data', default='')
|
|
|
|
return private_data
|
|
|
|
def build_passwords(self, job, runtime_passwords):
|
|
"""
|
|
Build a dictionary of passwords for SSH private key, SSH user, sudo/su
|
|
and ansible-vault.
|
|
"""
|
|
passwords = super(RunJob, self).build_passwords(job, runtime_passwords)
|
|
cred = job.machine_credential
|
|
if cred:
|
|
for field in ('ssh_key_unlock', 'ssh_password', 'become_password', 'vault_password'):
|
|
value = runtime_passwords.get(field, cred.get_input('password' if field == 'ssh_password' else field, default=''))
|
|
if value not in ('', 'ASK'):
|
|
passwords[field] = value
|
|
|
|
for cred in job.vault_credentials:
|
|
field = 'vault_password'
|
|
vault_id = cred.get_input('vault_id', default=None)
|
|
if vault_id:
|
|
field = 'vault_password.{}'.format(vault_id)
|
|
if field in passwords:
|
|
raise RuntimeError('multiple vault credentials were specified with --vault-id {}@prompt'.format(vault_id))
|
|
value = runtime_passwords.get(field, cred.get_input('vault_password', default=''))
|
|
if value not in ('', 'ASK'):
|
|
passwords[field] = value
|
|
|
|
'''
|
|
Only 1 value can be provided for a unique prompt string. Prefer ssh
|
|
key unlock over network key unlock.
|
|
'''
|
|
if 'ssh_key_unlock' not in passwords:
|
|
for cred in job.network_credentials:
|
|
if cred.inputs.get('ssh_key_unlock'):
|
|
passwords['ssh_key_unlock'] = runtime_passwords.get('ssh_key_unlock', cred.get_input('ssh_key_unlock', default=''))
|
|
break
|
|
|
|
return passwords
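        # Illustrative sketch (not executed): for a machine credential with a
        # stored become password and two vault credentials whose vault_ids are
        # "dev" and "prod", the returned dict would resemble
        #   {'become_password': '...', 'vault_password.dev': '...', 'vault_password.prod': '...'}
        # The keys suffixed with a vault_id pair up with the '--vault-id <id>@prompt'
        # arguments that build_args() adds below.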
|
|
|
|
def build_env(self, job, private_data_dir, private_data_files=None):
|
|
"""
|
|
Build environment dictionary for ansible-playbook.
|
|
"""
|
|
env = super(RunJob, self).build_env(job, private_data_dir, private_data_files=private_data_files)
|
|
if private_data_files is None:
|
|
private_data_files = {}
|
|
# Set environment variables needed for inventory and job event
|
|
# callbacks to work.
|
|
env['JOB_ID'] = str(job.pk)
|
|
env['INVENTORY_ID'] = str(job.inventory.pk)
|
|
if job.project:
|
|
env['PROJECT_REVISION'] = job.project.scm_revision
|
|
env['ANSIBLE_RETRY_FILES_ENABLED'] = "False"
|
|
env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA)
|
|
if hasattr(settings, 'AWX_ANSIBLE_CALLBACK_PLUGINS') and settings.AWX_ANSIBLE_CALLBACK_PLUGINS:
|
|
env['ANSIBLE_CALLBACK_PLUGINS'] = ':'.join(settings.AWX_ANSIBLE_CALLBACK_PLUGINS)
|
|
env['AWX_HOST'] = settings.TOWER_URL_BASE
|
|
|
|
# Create a directory for ControlPath sockets that is unique to each job
|
|
cp_dir = os.path.join(private_data_dir, 'cp')
|
|
if not os.path.exists(cp_dir):
|
|
os.mkdir(cp_dir, 0o700)
|
|
# FIXME: more elegant way to manage this path in container
|
|
env['ANSIBLE_SSH_CONTROL_PATH_DIR'] = '/runner/cp'
|
|
|
|
# Set environment variables for cloud credentials.
|
|
cred_files = private_data_files.get('credentials', {})
|
|
for cloud_cred in job.cloud_credentials:
|
|
if cloud_cred and cloud_cred.credential_type.namespace == 'openstack' and cred_files.get(cloud_cred, ''):
|
|
env['OS_CLIENT_CONFIG_FILE'] = to_container_path(cred_files.get(cloud_cred, ''), private_data_dir)
|
|
|
|
for network_cred in job.network_credentials:
|
|
env['ANSIBLE_NET_USERNAME'] = network_cred.get_input('username', default='')
|
|
env['ANSIBLE_NET_PASSWORD'] = network_cred.get_input('password', default='')
|
|
|
|
ssh_keyfile = cred_files.get(network_cred, '')
|
|
if ssh_keyfile:
|
|
env['ANSIBLE_NET_SSH_KEYFILE'] = ssh_keyfile
|
|
|
|
authorize = network_cred.get_input('authorize', default=False)
|
|
env['ANSIBLE_NET_AUTHORIZE'] = str(int(authorize))
|
|
if authorize:
|
|
env['ANSIBLE_NET_AUTH_PASS'] = network_cred.get_input('authorize_password', default='')
|
|
|
|
path_vars = (
|
|
('ANSIBLE_COLLECTIONS_PATHS', 'collections_paths', 'requirements_collections', '~/.ansible/collections:/usr/share/ansible/collections'),
|
|
('ANSIBLE_ROLES_PATH', 'roles_path', 'requirements_roles', '~/.ansible/roles:/usr/share/ansible/roles:/etc/ansible/roles'),
|
|
)
|
|
|
|
config_values = read_ansible_config(job.project.get_project_path(), list(map(lambda x: x[1], path_vars)))
|
|
|
|
for env_key, config_setting, folder, default in path_vars:
|
|
paths = default.split(':')
|
|
if env_key in env:
|
|
for path in env[env_key].split(':'):
|
|
if path not in paths:
|
|
paths = [env[env_key]] + paths
|
|
elif config_setting in config_values:
|
|
for path in config_values[config_setting].split(':'):
|
|
if path not in paths:
|
|
paths = [config_values[config_setting]] + paths
|
|
paths = [os.path.join(CONTAINER_ROOT, folder)] + paths
|
|
env[env_key] = os.pathsep.join(paths)
|
|
|
|
return env
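        # Illustrative sketch (not executed), assuming CONTAINER_ROOT is '/runner':
        # if the project's ansible.cfg sets roles_path = ./roles, the loop above
        # would typically yield
        #   ANSIBLE_ROLES_PATH=/runner/requirements_roles:./roles:~/.ansible/roles:/usr/share/ansible/roles:/etc/ansible/roles
        # i.e. downloaded requirements first, then user-configured paths, then defaults.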
|
|
|
|
def build_args(self, job, private_data_dir, passwords):
|
|
"""
|
|
Build command line argument list for running ansible-playbook,
|
|
optionally using ssh-agent for public/private key authentication.
|
|
"""
|
|
creds = job.machine_credential
|
|
|
|
ssh_username, become_username, become_method = '', '', ''
|
|
if creds:
|
|
ssh_username = creds.get_input('username', default='')
|
|
become_method = creds.get_input('become_method', default='')
|
|
become_username = creds.get_input('become_username', default='')
|
|
else:
|
|
become_method = None
|
|
become_username = ""
|
|
# Always specify the normal SSH user as root by default. Since this
|
|
# task is normally running in the background under a service account,
|
|
# it doesn't make sense to rely on ansible-playbook's default of using
|
|
# the current user.
|
|
ssh_username = ssh_username or 'root'
|
|
args = []
|
|
if job.job_type == 'check':
|
|
args.append('--check')
|
|
args.extend(['-u', sanitize_jinja(ssh_username)])
|
|
if 'ssh_password' in passwords:
|
|
args.append('--ask-pass')
|
|
if job.become_enabled:
|
|
args.append('--become')
|
|
if job.diff_mode:
|
|
args.append('--diff')
|
|
if become_method:
|
|
args.extend(['--become-method', sanitize_jinja(become_method)])
|
|
if become_username:
|
|
args.extend(['--become-user', sanitize_jinja(become_username)])
|
|
if 'become_password' in passwords:
|
|
args.append('--ask-become-pass')
|
|
|
|
# Support prompting for multiple vault passwords
|
|
for k, v in passwords.items():
|
|
if k.startswith('vault_password'):
|
|
if k == 'vault_password':
|
|
args.append('--ask-vault-pass')
|
|
else:
|
|
# split only on the first dot in case the vault ID itself contains a dot
|
|
vault_id = k.split('.', 1)[1]
|
|
args.append('--vault-id')
|
|
args.append('{}@prompt'.format(vault_id))
|
|
|
|
if job.forks:
|
|
if settings.MAX_FORKS > 0 and job.forks > settings.MAX_FORKS:
|
|
                logger.warning(f'Maximum number of forks ({settings.MAX_FORKS}) exceeded; using the maximum instead.')
|
|
args.append('--forks=%d' % settings.MAX_FORKS)
|
|
else:
|
|
args.append('--forks=%d' % job.forks)
|
|
if job.force_handlers:
|
|
args.append('--force-handlers')
|
|
if job.limit:
|
|
args.extend(['-l', job.limit])
|
|
if job.verbosity:
|
|
args.append('-%s' % ('v' * min(5, job.verbosity)))
|
|
if job.job_tags:
|
|
args.extend(['-t', job.job_tags])
|
|
if job.skip_tags:
|
|
args.append('--skip-tags=%s' % job.skip_tags)
|
|
if job.start_at_task:
|
|
args.append('--start-at-task=%s' % job.start_at_task)
|
|
|
|
return args
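        # Illustrative sketch (not executed): for a check-mode job with become
        # enabled, a limit of 'webservers' and verbosity 2, the returned list
        # would resemble
        #   ['--check', '-u', 'admin', '--become', '-l', 'webservers', '-vv']
        # where 'admin' stands in for a hypothetical credential username.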
|
|
|
|
def build_playbook_path_relative_to_cwd(self, job, private_data_dir):
|
|
return job.playbook
|
|
|
|
def build_extra_vars_file(self, job, private_data_dir):
|
|
# Define special extra_vars for AWX, combine with job.extra_vars.
|
|
extra_vars = job.awx_meta_vars()
|
|
|
|
if job.extra_vars_dict:
|
|
extra_vars.update(json.loads(job.decrypted_extra_vars()))
|
|
|
|
# By default, all extra vars disallow Jinja2 template usage for
|
|
# security reasons; top level key-values defined in JT.extra_vars, however,
|
|
# are allowed as "safe" (because they can only be set by users with
|
|
# higher levels of privilege - those that have the ability create and
|
|
# edit Job Templates)
|
|
safe_dict = {}
|
|
if job.job_template and settings.ALLOW_JINJA_IN_EXTRA_VARS == 'template':
|
|
safe_dict = job.job_template.extra_vars_dict
|
|
|
|
return self._write_extra_vars_file(private_data_dir, extra_vars, safe_dict)
|
|
|
|
def build_credentials_list(self, job):
|
|
return job.credentials.prefetch_related('input_sources__source_credential').all()
|
|
|
|
def get_password_prompts(self, passwords={}):
|
|
d = super(RunJob, self).get_password_prompts(passwords)
|
|
d[r'Enter passphrase for .*:\s*?$'] = 'ssh_key_unlock'
|
|
d[r'Bad passphrase, try again for .*:\s*?$'] = ''
|
|
for method in PRIVILEGE_ESCALATION_METHODS:
|
|
d[r'%s password.*:\s*?$' % (method[0])] = 'become_password'
|
|
d[r'%s password.*:\s*?$' % (method[0].upper())] = 'become_password'
|
|
d[r'BECOME password.*:\s*?$'] = 'become_password'
|
|
d[r'SSH password:\s*?$'] = 'ssh_password'
|
|
d[r'Password:\s*?$'] = 'ssh_password'
|
|
d[r'Vault password:\s*?$'] = 'vault_password'
|
|
for k, v in passwords.items():
|
|
if k.startswith('vault_password.'):
|
|
# split only on the first dot in case the vault ID itself contains a dot
|
|
vault_id = k.split('.', 1)[1]
|
|
d[r'Vault password \({}\):\s*?$'.format(vault_id)] = k
|
|
return d
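        # Illustrative sketch (not executed): a vault credential with vault_id
        # "prod" adds an entry roughly like
        #   d[r'Vault password \(prod\):\s*?$'] = 'vault_password.prod'
        # so that specific prompt is answered with the matching value collected
        # in build_passwords().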
|
|
|
|
def build_execution_environment_params(self, instance, private_data_dir):
|
|
if settings.IS_K8S:
|
|
return {}
|
|
|
|
params = super(RunJob, self).build_execution_environment_params(instance, private_data_dir)
|
|
# If this has an insights agent and it is not already mounted then show it
|
|
insights_dir = os.path.dirname(settings.INSIGHTS_SYSTEM_ID_FILE)
|
|
if instance.use_fact_cache and os.path.exists(insights_dir):
|
|
            logger.info('Mounting insights system directory {0} for fact cache'.format(insights_dir))
|
|
params.setdefault('container_volume_mounts', [])
|
|
params['container_volume_mounts'].extend(
|
|
[
|
|
f"{insights_dir}:{insights_dir}:Z",
|
|
]
|
|
)
|
|
|
|
return params
|
|
|
|
def pre_run_hook(self, job, private_data_dir):
|
|
super(RunJob, self).pre_run_hook(job, private_data_dir)
|
|
if job.inventory is None:
|
|
error = _('Job could not start because it does not have a valid inventory.')
|
|
self.update_model(job.pk, status='failed', job_explanation=error)
|
|
raise RuntimeError(error)
|
|
elif job.project is None:
|
|
error = _('Job could not start because it does not have a valid project.')
|
|
self.update_model(job.pk, status='failed', job_explanation=error)
|
|
raise RuntimeError(error)
|
|
elif job.execution_environment is None:
|
|
error = _('Job could not start because no Execution Environment could be found.')
|
|
self.update_model(job.pk, status='error', job_explanation=error)
|
|
raise RuntimeError(error)
|
|
elif job.project.status in ('error', 'failed'):
|
|
msg = _('The project revision for this job template is unknown due to a failed update.')
|
|
job = self.update_model(job.pk, status='failed', job_explanation=msg)
|
|
raise RuntimeError(msg)
|
|
|
|
project_path = job.project.get_project_path(check_if_exists=False)
|
|
job_revision = job.project.scm_revision
|
|
sync_needs = []
|
|
source_update_tag = 'update_{}'.format(job.project.scm_type)
|
|
branch_override = bool(job.scm_branch and job.scm_branch != job.project.scm_branch)
|
|
if not job.project.scm_type:
|
|
            pass  # manual projects are not synced; the user is responsible for keeping them up to date
|
|
elif not os.path.exists(project_path):
|
|
logger.debug('Performing fresh clone of {} on this instance.'.format(job.project))
|
|
sync_needs.append(source_update_tag)
|
|
elif job.project.scm_type == 'git' and job.project.scm_revision and (not branch_override):
|
|
try:
|
|
git_repo = git.Repo(project_path)
|
|
|
|
if job_revision == git_repo.head.commit.hexsha:
|
|
logger.debug('Skipping project sync for {} because commit is locally available'.format(job.log_format))
|
|
else:
|
|
sync_needs.append(source_update_tag)
|
|
except (ValueError, BadGitName, git.exc.InvalidGitRepositoryError):
|
|
logger.debug('Needed commit for {} not in local source tree, will sync with remote'.format(job.log_format))
|
|
sync_needs.append(source_update_tag)
|
|
else:
|
|
logger.debug('Project not available locally, {} will sync with remote'.format(job.log_format))
|
|
sync_needs.append(source_update_tag)
|
|
|
|
has_cache = os.path.exists(os.path.join(job.project.get_cache_path(), job.project.cache_id))
|
|
# Galaxy requirements are not supported for manual projects
|
|
if job.project.scm_type and ((not has_cache) or branch_override):
|
|
sync_needs.extend(['install_roles', 'install_collections'])
|
|
|
|
if sync_needs:
|
|
pu_ig = job.instance_group
|
|
pu_en = job.execution_node
|
|
|
|
sync_metafields = dict(
|
|
launch_type="sync",
|
|
job_type='run',
|
|
job_tags=','.join(sync_needs),
|
|
status='running',
|
|
instance_group=pu_ig,
|
|
execution_node=pu_en,
|
|
celery_task_id=job.celery_task_id,
|
|
)
|
|
if branch_override:
|
|
sync_metafields['scm_branch'] = job.scm_branch
|
|
                sync_metafields['scm_clean'] = True  # to accommodate force pushes
|
|
if 'update_' not in sync_metafields['job_tags']:
|
|
sync_metafields['scm_revision'] = job_revision
|
|
local_project_sync = job.project.create_project_update(_eager_fields=sync_metafields)
|
|
# save the associated job before calling run() so that a
|
|
# cancel() call on the job can cancel the project update
|
|
job = self.update_model(job.pk, project_update=local_project_sync)
|
|
|
|
project_update_task = local_project_sync._get_task_class()
|
|
try:
|
|
# the job private_data_dir is passed so sync can download roles and collections there
|
|
sync_task = project_update_task(job_private_data_dir=private_data_dir)
|
|
sync_task.run(local_project_sync.id)
|
|
local_project_sync.refresh_from_db()
|
|
job = self.update_model(job.pk, scm_revision=local_project_sync.scm_revision)
|
|
except Exception:
|
|
local_project_sync.refresh_from_db()
|
|
if local_project_sync.status != 'canceled':
|
|
job = self.update_model(
|
|
job.pk,
|
|
status='failed',
|
|
job_explanation=(
|
|
'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}'
|
|
% ('project_update', local_project_sync.name, local_project_sync.id)
|
|
),
|
|
)
|
|
raise
|
|
job.refresh_from_db()
|
|
if job.cancel_flag:
|
|
return
|
|
else:
|
|
# Case where a local sync is not needed, meaning that local tree is
|
|
# up-to-date with project, job is running project current version
|
|
if job_revision:
|
|
job = self.update_model(job.pk, scm_revision=job_revision)
|
|
# Project update does not copy the folder, so copy here
|
|
RunProjectUpdate.make_local_copy(job.project, private_data_dir, scm_revision=job_revision)
|
|
|
|
if job.inventory.kind == 'smart':
|
|
            # cache smart inventory memberships so that the host_filter query is not
            # run inside of the event-saving code
|
|
update_smart_memberships_for_inventory(job.inventory)
|
|
|
|
def final_run_hook(self, job, status, private_data_dir, fact_modification_times):
|
|
super(RunJob, self).final_run_hook(job, status, private_data_dir, fact_modification_times)
|
|
if not private_data_dir:
|
|
# If there's no private data dir, that means we didn't get into the
|
|
# actual `run()` call; this _usually_ means something failed in
|
|
# the pre_run_hook method
|
|
return
|
|
if job.use_fact_cache:
|
|
job.finish_job_fact_cache(
|
|
os.path.join(private_data_dir, 'artifacts', 'fact_cache'),
|
|
fact_modification_times,
|
|
)
|
|
|
|
try:
|
|
inventory = job.inventory
|
|
except Inventory.DoesNotExist:
|
|
pass
|
|
else:
|
|
if inventory is not None:
|
|
update_inventory_computed_fields.delay(inventory.id)
|
|
|
|
|
|
@task(queue=get_local_queuename)
|
|
class RunProjectUpdate(BaseTask):
|
|
|
|
model = ProjectUpdate
|
|
event_model = ProjectUpdateEvent
|
|
event_data_key = 'project_update_id'
|
|
|
|
def __init__(self, *args, job_private_data_dir=None, **kwargs):
|
|
super(RunProjectUpdate, self).__init__(*args, **kwargs)
|
|
self.playbook_new_revision = None
|
|
self.original_branch = None
|
|
self.job_private_data_dir = job_private_data_dir
|
|
|
|
def event_handler(self, event_data):
|
|
super(RunProjectUpdate, self).event_handler(event_data)
|
|
returned_data = event_data.get('event_data', {})
|
|
if returned_data.get('task_action', '') == 'set_fact':
|
|
returned_facts = returned_data.get('res', {}).get('ansible_facts', {})
|
|
if 'scm_version' in returned_facts:
|
|
self.playbook_new_revision = returned_facts['scm_version']
|
|
|
|
def build_private_data(self, project_update, private_data_dir):
|
|
"""
|
|
Return SSH private key data needed for this project update.
|
|
|
|
Returns a dict of the form
|
|
{
|
|
'credentials': {
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>
|
|
}
|
|
}
|
|
"""
|
|
private_data = {'credentials': {}}
|
|
if project_update.credential:
|
|
credential = project_update.credential
|
|
if credential.has_input('ssh_key_data'):
|
|
private_data['credentials'][credential] = credential.get_input('ssh_key_data', default='')
|
|
return private_data
|
|
|
|
def build_passwords(self, project_update, runtime_passwords):
|
|
"""
|
|
Build a dictionary of passwords for SSH private key unlock and SCM
|
|
username/password.
|
|
"""
|
|
passwords = super(RunProjectUpdate, self).build_passwords(project_update, runtime_passwords)
|
|
if project_update.credential:
|
|
passwords['scm_key_unlock'] = project_update.credential.get_input('ssh_key_unlock', default='')
|
|
passwords['scm_username'] = project_update.credential.get_input('username', default='')
|
|
passwords['scm_password'] = project_update.credential.get_input('password', default='')
|
|
return passwords
|
|
|
|
def build_env(self, project_update, private_data_dir, private_data_files=None):
|
|
"""
|
|
Build environment dictionary for ansible-playbook.
|
|
"""
|
|
env = super(RunProjectUpdate, self).build_env(project_update, private_data_dir, private_data_files=private_data_files)
|
|
env['ANSIBLE_RETRY_FILES_ENABLED'] = str(False)
|
|
env['ANSIBLE_ASK_PASS'] = str(False)
|
|
env['ANSIBLE_BECOME_ASK_PASS'] = str(False)
|
|
        env['DISPLAY'] = ''  # Prevent an interactive password popup when running tests.
|
|
# give ansible a hint about the intended tmpdir to work around issues
|
|
# like https://github.com/ansible/ansible/issues/30064
|
|
env['TMP'] = settings.AWX_ISOLATION_BASE_PATH
|
|
env['PROJECT_UPDATE_ID'] = str(project_update.pk)
|
|
if settings.GALAXY_IGNORE_CERTS:
|
|
env['ANSIBLE_GALAXY_IGNORE'] = True
|
|
|
|
# build out env vars for Galaxy credentials (in order)
|
|
galaxy_server_list = []
|
|
if project_update.project.organization:
|
|
for i, cred in enumerate(project_update.project.organization.galaxy_credentials.all()):
|
|
env[f'ANSIBLE_GALAXY_SERVER_SERVER{i}_URL'] = cred.get_input('url')
|
|
auth_url = cred.get_input('auth_url', default=None)
|
|
token = cred.get_input('token', default=None)
|
|
if token:
|
|
env[f'ANSIBLE_GALAXY_SERVER_SERVER{i}_TOKEN'] = token
|
|
if auth_url:
|
|
env[f'ANSIBLE_GALAXY_SERVER_SERVER{i}_AUTH_URL'] = auth_url
|
|
galaxy_server_list.append(f'server{i}')
|
|
|
|
if galaxy_server_list:
|
|
env['ANSIBLE_GALAXY_SERVER_LIST'] = ','.join(galaxy_server_list)
|
|
|
|
return env
|
|
|
|
def _build_scm_url_extra_vars(self, project_update):
|
|
"""
|
|
Helper method to build SCM url and extra vars with parameters needed
|
|
for authentication.
|
|
"""
|
|
extra_vars = {}
|
|
if project_update.credential:
|
|
scm_username = project_update.credential.get_input('username', default='')
|
|
scm_password = project_update.credential.get_input('password', default='')
|
|
else:
|
|
scm_username = ''
|
|
scm_password = ''
|
|
scm_type = project_update.scm_type
|
|
scm_url = update_scm_url(scm_type, project_update.scm_url, check_special_cases=False)
|
|
scm_url_parts = urlparse.urlsplit(scm_url)
|
|
# Prefer the username/password in the URL, if provided.
|
|
scm_username = scm_url_parts.username or scm_username
|
|
scm_password = scm_url_parts.password or scm_password
|
|
if scm_username:
|
|
if scm_type == 'svn':
|
|
extra_vars['scm_username'] = scm_username
|
|
extra_vars['scm_password'] = scm_password
|
|
scm_password = False
|
|
if scm_url_parts.scheme != 'svn+ssh':
|
|
scm_username = False
|
|
elif scm_url_parts.scheme.endswith('ssh'):
|
|
scm_password = False
|
|
elif scm_type in ('insights', 'archive'):
|
|
extra_vars['scm_username'] = scm_username
|
|
extra_vars['scm_password'] = scm_password
|
|
scm_url = update_scm_url(scm_type, scm_url, scm_username, scm_password, scp_format=True)
|
|
else:
|
|
scm_url = update_scm_url(scm_type, scm_url, scp_format=True)
|
|
|
|
# Pass the extra accept_hostkey parameter to the git module.
|
|
if scm_type == 'git' and scm_url_parts.scheme.endswith('ssh'):
|
|
extra_vars['scm_accept_hostkey'] = 'true'
|
|
|
|
return scm_url, extra_vars
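        # Illustrative sketch (not executed): for a hypothetical git URL such as
        # https://user:secret@git.example.com/org/repo.git the credentials stay
        # embedded in the returned scm_url, whereas for a plain svn URL they are
        # moved into extra_vars['scm_username'] / extra_vars['scm_password'] and
        # stripped from the URL instead.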
|
|
|
|
def build_inventory(self, instance, private_data_dir):
|
|
return 'localhost,'
|
|
|
|
def build_args(self, project_update, private_data_dir, passwords):
|
|
"""
|
|
Build command line argument list for running ansible-playbook,
|
|
optionally using ssh-agent for public/private key authentication.
|
|
"""
|
|
args = []
|
|
if getattr(settings, 'PROJECT_UPDATE_VVV', False):
|
|
args.append('-vvv')
|
|
if project_update.job_tags:
|
|
args.extend(['-t', project_update.job_tags])
|
|
return args
|
|
|
|
def build_extra_vars_file(self, project_update, private_data_dir):
|
|
extra_vars = {}
|
|
scm_url, extra_vars_new = self._build_scm_url_extra_vars(project_update)
|
|
extra_vars.update(extra_vars_new)
|
|
|
|
scm_branch = project_update.scm_branch
|
|
if project_update.job_type == 'run' and (not project_update.branch_override):
|
|
if project_update.project.scm_revision:
|
|
scm_branch = project_update.project.scm_revision
|
|
elif not scm_branch:
|
|
raise RuntimeError('Could not determine a revision to run from project.')
|
|
elif not scm_branch:
|
|
scm_branch = 'HEAD'
|
|
|
|
galaxy_creds_are_defined = project_update.project.organization and project_update.project.organization.galaxy_credentials.exists()
|
|
if not galaxy_creds_are_defined and (settings.AWX_ROLES_ENABLED or settings.AWX_COLLECTIONS_ENABLED):
|
|
            logger.warning(f'Galaxy role/collection syncing is enabled, but no credentials are configured for {project_update.project.organization}.')
|
|
|
|
extra_vars.update(
|
|
{
|
|
'projects_root': settings.PROJECTS_ROOT.rstrip('/'),
|
|
'local_path': os.path.basename(project_update.project.local_path),
|
|
'project_path': project_update.get_project_path(check_if_exists=False), # deprecated
|
|
'insights_url': settings.INSIGHTS_URL_BASE,
|
|
'awx_license_type': get_license().get('license_type', 'UNLICENSED'),
|
|
'awx_version': get_awx_version(),
|
|
'scm_url': scm_url,
|
|
'scm_branch': scm_branch,
|
|
'scm_clean': project_update.scm_clean,
|
|
'scm_track_submodules': project_update.scm_track_submodules,
|
|
'roles_enabled': galaxy_creds_are_defined and settings.AWX_ROLES_ENABLED,
|
|
'collections_enabled': galaxy_creds_are_defined and settings.AWX_COLLECTIONS_ENABLED,
|
|
}
|
|
)
|
|
# apply custom refspec from user for PR refs and the like
|
|
if project_update.scm_refspec:
|
|
extra_vars['scm_refspec'] = project_update.scm_refspec
|
|
elif project_update.project.allow_override:
|
|
# If branch is override-able, do extra fetch for all branches
|
|
extra_vars['scm_refspec'] = 'refs/heads/*:refs/remotes/origin/*'
|
|
|
|
if project_update.scm_type == 'archive':
|
|
# for raw archive, prevent error moving files between volumes
|
|
extra_vars['ansible_remote_tmp'] = os.path.join(project_update.get_project_path(check_if_exists=False), '.ansible_awx', 'tmp')
|
|
|
|
self._write_extra_vars_file(private_data_dir, extra_vars)
|
|
|
|
def build_playbook_path_relative_to_cwd(self, project_update, private_data_dir):
|
|
        return 'project_update.yml'
|
|
|
|
def get_password_prompts(self, passwords={}):
|
|
d = super(RunProjectUpdate, self).get_password_prompts(passwords)
|
|
d[r'Username for.*:\s*?$'] = 'scm_username'
|
|
d[r'Password for.*:\s*?$'] = 'scm_password'
|
|
d[r'Password:\s*?$'] = 'scm_password'
|
|
d[r'\S+?@\S+?\'s\s+?password:\s*?$'] = 'scm_password'
|
|
d[r'Enter passphrase for .*:\s*?$'] = 'scm_key_unlock'
|
|
d[r'Bad passphrase, try again for .*:\s*?$'] = ''
|
|
# FIXME: Configure whether we should auto accept host keys?
|
|
d[r'^Are you sure you want to continue connecting \(yes/no\)\?\s*?$'] = 'yes'
|
|
return d
|
|
|
|
def _update_dependent_inventories(self, project_update, dependent_inventory_sources):
|
|
scm_revision = project_update.project.scm_revision
|
|
inv_update_class = InventoryUpdate._get_task_class()
|
|
for inv_src in dependent_inventory_sources:
|
|
if not inv_src.update_on_project_update:
|
|
continue
|
|
if inv_src.scm_last_revision == scm_revision:
|
|
logger.debug('Skipping SCM inventory update for `{}` because ' 'project has not changed.'.format(inv_src.name))
|
|
continue
|
|
logger.debug('Local dependent inventory update for `{}`.'.format(inv_src.name))
|
|
with transaction.atomic():
|
|
if InventoryUpdate.objects.filter(inventory_source=inv_src, status__in=ACTIVE_STATES).exists():
|
|
logger.debug('Skipping SCM inventory update for `{}` because ' 'another update is already active.'.format(inv_src.name))
|
|
continue
|
|
local_inv_update = inv_src.create_inventory_update(
|
|
_eager_fields=dict(
|
|
launch_type='scm',
|
|
status='running',
|
|
instance_group=project_update.instance_group,
|
|
execution_node=project_update.execution_node,
|
|
source_project_update=project_update,
|
|
celery_task_id=project_update.celery_task_id,
|
|
)
|
|
)
|
|
try:
|
|
inv_update_class().run(local_inv_update.id)
|
|
except Exception:
|
|
logger.exception('{} Unhandled exception updating dependent SCM inventory sources.'.format(project_update.log_format))
|
|
|
|
try:
|
|
project_update.refresh_from_db()
|
|
except ProjectUpdate.DoesNotExist:
|
|
logger.warning('Project update deleted during updates of dependent SCM inventory sources.')
|
|
break
|
|
try:
|
|
local_inv_update.refresh_from_db()
|
|
except InventoryUpdate.DoesNotExist:
|
|
logger.warning('%s Dependent inventory update deleted during execution.', project_update.log_format)
|
|
continue
|
|
if project_update.cancel_flag:
|
|
logger.info('Project update {} was canceled while updating dependent inventories.'.format(project_update.log_format))
|
|
break
|
|
if local_inv_update.cancel_flag:
|
|
logger.info('Continuing to process project dependencies after {} was canceled'.format(local_inv_update.log_format))
|
|
if local_inv_update.status == 'successful':
|
|
inv_src.scm_last_revision = scm_revision
|
|
inv_src.save(update_fields=['scm_last_revision'])
|
|
|
|
def release_lock(self, instance):
|
|
try:
|
|
fcntl.lockf(self.lock_fd, fcntl.LOCK_UN)
|
|
except IOError as e:
|
|
logger.error("I/O error({0}) while trying to release lock file [{1}]: {2}".format(e.errno, instance.get_lock_file(), e.strerror))
|
|
os.close(self.lock_fd)
|
|
raise
|
|
|
|
os.close(self.lock_fd)
|
|
self.lock_fd = None
|
|
|
|
'''
|
|
Note: We don't support blocking=False
|
|
'''
|
|
|
|
def acquire_lock(self, instance, blocking=True):
|
|
lock_path = instance.get_lock_file()
|
|
if lock_path is None:
|
|
            # If local_path was blanked (e.g., by a migration), saving the instance regenerates it, so the lock file path is recoverable
|
|
instance.save()
|
|
lock_path = instance.get_lock_file()
|
|
if lock_path is None:
|
|
raise RuntimeError(u'Invalid lock file path')
|
|
|
|
try:
|
|
self.lock_fd = os.open(lock_path, os.O_RDWR | os.O_CREAT)
|
|
except OSError as e:
|
|
logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
|
raise
|
|
|
|
start_time = time.time()
|
|
while True:
|
|
try:
|
|
instance.refresh_from_db(fields=['cancel_flag'])
|
|
if instance.cancel_flag:
|
|
logger.debug("ProjectUpdate({0}) was canceled".format(instance.pk))
|
|
return
|
|
fcntl.lockf(self.lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
break
|
|
except IOError as e:
|
|
if e.errno not in (errno.EAGAIN, errno.EACCES):
|
|
os.close(self.lock_fd)
|
|
logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror))
|
|
raise
|
|
else:
|
|
time.sleep(1.0)
|
|
waiting_time = time.time() - start_time
|
|
|
|
if waiting_time > 1.0:
|
|
logger.info('{} spent {} waiting to acquire lock for local source tree ' 'for path {}.'.format(instance.log_format, waiting_time, lock_path))
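        # Usage sketch (illustrative, mirroring how this class drives the lock):
        #     self.acquire_lock(project_update)      # called from pre_run_hook()
        #     try:
        #         ...                                # perform the update
        #     finally:
        #         self.release_lock(project_update)  # called from post_run_hook()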
|
|
|
|
def pre_run_hook(self, instance, private_data_dir):
|
|
super(RunProjectUpdate, self).pre_run_hook(instance, private_data_dir)
|
|
# re-create root project folder if a natural disaster has destroyed it
|
|
if not os.path.exists(settings.PROJECTS_ROOT):
|
|
os.mkdir(settings.PROJECTS_ROOT)
|
|
project_path = instance.project.get_project_path(check_if_exists=False)
|
|
if not os.path.exists(project_path):
|
|
os.makedirs(project_path) # used as container mount
|
|
|
|
self.acquire_lock(instance)
|
|
|
|
self.original_branch = None
|
|
if instance.scm_type == 'git' and instance.branch_override:
|
|
if os.path.exists(project_path):
|
|
git_repo = git.Repo(project_path)
|
|
if git_repo.head.is_detached:
|
|
self.original_branch = git_repo.head.commit
|
|
else:
|
|
self.original_branch = git_repo.active_branch
|
|
|
|
stage_path = os.path.join(instance.get_cache_path(), 'stage')
|
|
if os.path.exists(stage_path):
|
|
logger.warning('{0} unexpectedly existed before update'.format(stage_path))
|
|
shutil.rmtree(stage_path)
|
|
os.makedirs(stage_path) # presence of empty cache indicates lack of roles or collections
|
|
|
|
# the project update playbook is not in a git repo, but uses a vendoring directory
|
|
# to be consistent with the ansible-runner model,
|
|
# that is moved into the runner project folder here
|
|
awx_playbooks = self.get_path_to('..', 'playbooks')
|
|
copy_tree(awx_playbooks, os.path.join(private_data_dir, 'project'))
|
|
|
|
@staticmethod
|
|
def clear_project_cache(cache_dir, keep_value):
|
|
if os.path.isdir(cache_dir):
|
|
for entry in os.listdir(cache_dir):
|
|
old_path = os.path.join(cache_dir, entry)
|
|
if entry not in (keep_value, 'stage'):
|
|
# invalidate, then delete
|
|
new_path = os.path.join(cache_dir, '.~~delete~~' + entry)
|
|
try:
|
|
os.rename(old_path, new_path)
|
|
shutil.rmtree(new_path)
|
|
except OSError:
|
|
logger.warning(f"Could not remove cache directory {old_path}")
|
|
|
|
@staticmethod
|
|
def make_local_copy(p, job_private_data_dir, scm_revision=None):
|
|
"""Copy project content (roles and collections) to a job private_data_dir
|
|
|
|
:param object p: Either a project or a project update
|
|
:param str job_private_data_dir: The root of the target ansible-runner folder
|
|
:param str scm_revision: For branch_override cases, the git revision to copy
|
|
"""
|
|
project_path = p.get_project_path(check_if_exists=False)
|
|
destination_folder = os.path.join(job_private_data_dir, 'project')
|
|
if not scm_revision:
|
|
scm_revision = p.scm_revision
|
|
|
|
if p.scm_type == 'git':
|
|
git_repo = git.Repo(project_path)
|
|
if not os.path.exists(destination_folder):
|
|
os.mkdir(destination_folder, stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC)
|
|
tmp_branch_name = 'awx_internal/{}'.format(uuid4())
|
|
# always clone based on specific job revision
|
|
if not p.scm_revision:
|
|
raise RuntimeError('Unexpectedly could not determine a revision to run from project.')
|
|
source_branch = git_repo.create_head(tmp_branch_name, p.scm_revision)
|
|
# git clone must take file:// syntax for source repo or else options like depth will be ignored
|
|
source_as_uri = Path(project_path).as_uri()
|
|
git.Repo.clone_from(
|
|
source_as_uri,
|
|
destination_folder,
|
|
branch=source_branch,
|
|
depth=1,
|
|
single_branch=True, # shallow, do not copy full history
|
|
)
|
|
# submodules copied in loop because shallow copies from local HEADs are ideal
|
|
# and no git clone submodule options are compatible with minimum requirements
|
|
for submodule in git_repo.submodules:
|
|
subrepo_path = os.path.abspath(os.path.join(project_path, submodule.path))
|
|
subrepo_destination_folder = os.path.abspath(os.path.join(destination_folder, submodule.path))
|
|
subrepo_uri = Path(subrepo_path).as_uri()
|
|
git.Repo.clone_from(subrepo_uri, subrepo_destination_folder, depth=1, single_branch=True)
|
|
# force option is necessary because remote refs are not counted, although no information is lost
|
|
git_repo.delete_head(tmp_branch_name, force=True)
|
|
else:
|
|
copy_tree(project_path, destination_folder, preserve_symlinks=1)
|
|
|
|
# copy over the roles and collection cache to job folder
|
|
cache_path = os.path.join(p.get_cache_path(), p.cache_id)
|
|
subfolders = []
|
|
if settings.AWX_COLLECTIONS_ENABLED:
|
|
subfolders.append('requirements_collections')
|
|
if settings.AWX_ROLES_ENABLED:
|
|
subfolders.append('requirements_roles')
|
|
for subfolder in subfolders:
|
|
cache_subpath = os.path.join(cache_path, subfolder)
|
|
if os.path.exists(cache_subpath):
|
|
dest_subpath = os.path.join(job_private_data_dir, subfolder)
|
|
copy_tree(cache_subpath, dest_subpath, preserve_symlinks=1)
|
|
logger.debug('{0} {1} prepared {2} from cache'.format(type(p).__name__, p.pk, dest_subpath))
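        # Illustrative sketch (not executed): for a git project this leaves the job with
        #   <job_private_data_dir>/project                   shallow clone at the job's revision
        #   <job_private_data_dir>/requirements_roles        copied from the project cache, if enabled and present
        #   <job_private_data_dir>/requirements_collections  copied from the project cache, if enabled and present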
|
|
|
|
def post_run_hook(self, instance, status):
|
|
super(RunProjectUpdate, self).post_run_hook(instance, status)
|
|
# To avoid hangs, very important to release lock even if errors happen here
|
|
try:
|
|
if self.playbook_new_revision:
|
|
instance.scm_revision = self.playbook_new_revision
|
|
instance.save(update_fields=['scm_revision'])
|
|
|
|
# Roles and collection folders copy to durable cache
|
|
base_path = instance.get_cache_path()
|
|
stage_path = os.path.join(base_path, 'stage')
|
|
if status == 'successful' and 'install_' in instance.job_tags:
|
|
# Clear other caches before saving this one, and if branch is overridden
|
|
# do not clear cache for main branch, but do clear it for other branches
|
|
self.clear_project_cache(base_path, keep_value=instance.project.cache_id)
|
|
cache_path = os.path.join(base_path, instance.cache_id)
|
|
if os.path.exists(stage_path):
|
|
if os.path.exists(cache_path):
|
|
logger.warning('Rewriting cache at {0}, performance may suffer'.format(cache_path))
|
|
shutil.rmtree(cache_path)
|
|
os.rename(stage_path, cache_path)
|
|
logger.debug('{0} wrote to cache at {1}'.format(instance.log_format, cache_path))
|
|
elif os.path.exists(stage_path):
|
|
                shutil.rmtree(stage_path)  # cannot trust the content this update produced
|
|
|
|
if self.job_private_data_dir:
|
|
if status == 'successful':
|
|
# copy project folder before resetting to default branch
|
|
# because some git-tree-specific resources (like submodules) might matter
|
|
self.make_local_copy(instance, self.job_private_data_dir)
|
|
if self.original_branch:
|
|
# for git project syncs, non-default branches can be problems
|
|
# restore to branch the repo was on before this run
|
|
try:
|
|
self.original_branch.checkout()
|
|
except Exception:
|
|
# this could have failed due to dirty tree, but difficult to predict all cases
|
|
logger.exception('Failed to restore project repo to prior state after {}'.format(instance.log_format))
|
|
finally:
|
|
self.release_lock(instance)
|
|
p = instance.project
|
|
if instance.job_type == 'check' and status not in (
|
|
'failed',
|
|
'canceled',
|
|
):
|
|
if self.playbook_new_revision:
|
|
p.scm_revision = self.playbook_new_revision
|
|
else:
|
|
if status == 'successful':
|
|
logger.error("{} Could not find scm revision in check".format(instance.log_format))
|
|
p.playbook_files = p.playbooks
|
|
p.inventory_files = p.inventories
|
|
p.save(update_fields=['scm_revision', 'playbook_files', 'inventory_files'])
|
|
|
|
# Update any inventories that depend on this project
|
|
dependent_inventory_sources = p.scm_inventory_sources.filter(update_on_project_update=True)
|
|
if len(dependent_inventory_sources) > 0:
|
|
if status == 'successful' and instance.launch_type != 'sync':
|
|
self._update_dependent_inventories(instance, dependent_inventory_sources)
|
|
|
|
def build_execution_environment_params(self, instance, private_data_dir):
|
|
if settings.IS_K8S:
|
|
return {}
|
|
|
|
params = super(RunProjectUpdate, self).build_execution_environment_params(instance, private_data_dir)
|
|
project_path = instance.get_project_path(check_if_exists=False)
|
|
cache_path = instance.get_cache_path()
|
|
params.setdefault('container_volume_mounts', [])
|
|
params['container_volume_mounts'].extend(
|
|
[
|
|
f"{project_path}:{project_path}:Z",
|
|
f"{cache_path}:{cache_path}:Z",
|
|
]
|
|
)
|
|
return params
|
|
|
|
|
|
@task(queue=get_local_queuename)
|
|
class RunInventoryUpdate(BaseTask):
|
|
|
|
model = InventoryUpdate
|
|
event_model = InventoryUpdateEvent
|
|
event_data_key = 'inventory_update_id'
|
|
|
|
def build_private_data(self, inventory_update, private_data_dir):
|
|
"""
|
|
Return private data needed for inventory update.
|
|
|
|
Returns a dict of the form
|
|
{
|
|
'credentials': {
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>
|
|
}
|
|
}
|
|
|
|
If no private data is needed, return None.
|
|
"""
|
|
if inventory_update.source in InventorySource.injectors:
|
|
injector = InventorySource.injectors[inventory_update.source]()
|
|
return injector.build_private_data(inventory_update, private_data_dir)
|
|
|
|
def build_env(self, inventory_update, private_data_dir, private_data_files=None):
|
|
"""Build environment dictionary for ansible-inventory.
|
|
|
|
Most environment variables related to credentials or configuration
|
|
are accomplished by the inventory source injectors (in this method)
|
|
or custom credential type injectors (in main run method).
|
|
"""
|
|
env = super(RunInventoryUpdate, self).build_env(inventory_update, private_data_dir, private_data_files=private_data_files)
|
|
|
|
if private_data_files is None:
|
|
private_data_files = {}
|
|
# Pass inventory source ID to inventory script.
|
|
env['INVENTORY_SOURCE_ID'] = str(inventory_update.inventory_source_id)
|
|
env['INVENTORY_UPDATE_ID'] = str(inventory_update.pk)
|
|
env.update(STANDARD_INVENTORY_UPDATE_ENV)
|
|
|
|
injector = None
|
|
if inventory_update.source in InventorySource.injectors:
|
|
injector = InventorySource.injectors[inventory_update.source]()
|
|
|
|
if injector is not None:
|
|
env = injector.build_env(inventory_update, env, private_data_dir, private_data_files)
|
|
# All CLOUD_PROVIDERS sources implement as inventory plugin from collection
|
|
env['ANSIBLE_INVENTORY_ENABLED'] = 'auto'
|
|
|
|
if inventory_update.source == 'scm':
|
|
for env_k in inventory_update.source_vars_dict:
|
|
if str(env_k) not in env and str(env_k) not in settings.INV_ENV_VARIABLE_BLOCKED:
|
|
env[str(env_k)] = str(inventory_update.source_vars_dict[env_k])
|
|
elif inventory_update.source == 'file':
|
|
raise NotImplementedError('Cannot update file sources through the task system.')
|
|
|
|
if inventory_update.source == 'scm' and inventory_update.source_project_update:
|
|
env_key = 'ANSIBLE_COLLECTIONS_PATHS'
|
|
config_setting = 'collections_paths'
|
|
folder = 'requirements_collections'
|
|
default = '~/.ansible/collections:/usr/share/ansible/collections'
|
|
|
|
config_values = read_ansible_config(os.path.join(private_data_dir, 'project'), [config_setting])
|
|
|
|
paths = default.split(':')
|
|
if env_key in env:
|
|
for path in env[env_key].split(':'):
|
|
if path not in paths:
|
|
paths = [env[env_key]] + paths
|
|
elif config_setting in config_values:
|
|
for path in config_values[config_setting].split(':'):
|
|
if path not in paths:
|
|
paths = [config_values[config_setting]] + paths
|
|
paths = [os.path.join(CONTAINER_ROOT, folder)] + paths
|
|
env[env_key] = os.pathsep.join(paths)
|
|
if 'ANSIBLE_COLLECTIONS_PATHS' in env:
|
|
paths = env['ANSIBLE_COLLECTIONS_PATHS'].split(':')
|
|
else:
|
|
paths = ['~/.ansible/collections', '/usr/share/ansible/collections']
|
|
paths.append('/usr/share/automation-controller/collections')
|
|
env['ANSIBLE_COLLECTIONS_PATHS'] = os.pathsep.join(paths)
|
|
|
|
return env
|
|
|
|
def write_args_file(self, private_data_dir, args):
|
|
path = os.path.join(private_data_dir, 'args')
|
|
handle = os.open(path, os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE)
|
|
f = os.fdopen(handle, 'w')
|
|
f.write(' '.join(args))
|
|
f.close()
|
|
os.chmod(path, stat.S_IRUSR)
|
|
return path
|
|
|
|
def build_args(self, inventory_update, private_data_dir, passwords):
|
|
"""Build the command line argument list for running an inventory
|
|
import.
|
|
"""
|
|
# Get the inventory source and inventory.
|
|
inventory_source = inventory_update.inventory_source
|
|
inventory = inventory_source.inventory
|
|
|
|
if inventory is None:
|
|
raise RuntimeError('Inventory Source is not associated with an Inventory.')
|
|
|
|
args = ['ansible-inventory', '--list', '--export']
|
|
|
|
# Add arguments for the source inventory file/script/thing
|
|
rel_path = self.pseudo_build_inventory(inventory_update, private_data_dir)
|
|
container_location = os.path.join(CONTAINER_ROOT, rel_path)
|
|
source_location = os.path.join(private_data_dir, rel_path)
|
|
|
|
args.append('-i')
|
|
args.append(container_location)
|
|
|
|
args.append('--output')
|
|
args.append(os.path.join(CONTAINER_ROOT, 'artifacts', str(inventory_update.id), 'output.json'))
|
|
|
|
if os.path.isdir(source_location):
|
|
playbook_dir = container_location
|
|
else:
|
|
playbook_dir = os.path.dirname(container_location)
|
|
args.extend(['--playbook-dir', playbook_dir])
|
|
|
|
if inventory_update.verbosity:
|
|
args.append('-' + 'v' * min(5, inventory_update.verbosity * 2 + 1))
|
|
|
|
return args
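        # Illustrative sketch (not executed), assuming CONTAINER_ROOT is '/runner':
        # for inventory update 42 with verbosity 1 and a plugin inventory file,
        # the returned list would resemble
        #   ['ansible-inventory', '--list', '--export',
        #    '-i', '/runner/inventory/<plugin filename>',
        #    '--output', '/runner/artifacts/42/output.json',
        #    '--playbook-dir', '/runner/inventory', '-vvv']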
|
|
|
|
def build_inventory(self, inventory_update, private_data_dir):
|
|
return None # what runner expects in order to not deal with inventory
|
|
|
|
def pseudo_build_inventory(self, inventory_update, private_data_dir):
|
|
"""Inventory imports are ran through a management command
|
|
we pass the inventory in args to that command, so this is not considered
|
|
to be "Ansible" inventory (by runner) even though it is
|
|
Eventually, we would like to cut out the management command,
|
|
and thus use this as the real inventory
|
|
"""
|
|
src = inventory_update.source
|
|
|
|
injector = None
|
|
if inventory_update.source in InventorySource.injectors:
|
|
injector = InventorySource.injectors[src]()
|
|
|
|
if injector is not None:
|
|
content = injector.inventory_contents(inventory_update, private_data_dir)
|
|
# must be a statically named file
|
|
inventory_path = os.path.join(private_data_dir, 'inventory', injector.filename)
|
|
with open(inventory_path, 'w') as f:
|
|
f.write(content)
|
|
os.chmod(inventory_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
|
|
|
|
rel_path = os.path.join('inventory', injector.filename)
|
|
elif src == 'scm':
|
|
rel_path = os.path.join('project', inventory_update.source_path)
|
|
|
|
return rel_path
|
|
|
|
def build_playbook_path_relative_to_cwd(self, inventory_update, private_data_dir):
|
|
return None
|
|
|
|
def build_credentials_list(self, inventory_update):
|
|
# All credentials not used by inventory source injector
|
|
return inventory_update.get_extra_credentials()
|
|
|
|
def pre_run_hook(self, inventory_update, private_data_dir):
|
|
super(RunInventoryUpdate, self).pre_run_hook(inventory_update, private_data_dir)
|
|
source_project = None
|
|
if inventory_update.inventory_source:
|
|
source_project = inventory_update.inventory_source.source_project
|
|
if (
|
|
inventory_update.source == 'scm' and inventory_update.launch_type != 'scm' and source_project and source_project.scm_type
|
|
): # never ever update manual projects
|
|
|
|
# Check if the content cache exists, so that we do not unnecessarily re-download roles
|
|
sync_needs = ['update_{}'.format(source_project.scm_type)]
|
|
has_cache = os.path.exists(os.path.join(source_project.get_cache_path(), source_project.cache_id))
|
|
# Galaxy requirements are not supported for manual projects
|
|
if not has_cache:
|
|
sync_needs.extend(['install_roles', 'install_collections'])
|
|
|
|
local_project_sync = source_project.create_project_update(
|
|
_eager_fields=dict(
|
|
launch_type="sync",
|
|
job_type='run',
|
|
job_tags=','.join(sync_needs),
|
|
status='running',
|
|
execution_node=inventory_update.execution_node,
|
|
instance_group=inventory_update.instance_group,
|
|
celery_task_id=inventory_update.celery_task_id,
|
|
)
|
|
)
|
|
# associate the inventory update before calling run() so that a
|
|
# cancel() call on the inventory update can cancel the project update
|
|
local_project_sync.scm_inventory_updates.add(inventory_update)
|
|
|
|
project_update_task = local_project_sync._get_task_class()
|
|
try:
|
|
sync_task = project_update_task(job_private_data_dir=private_data_dir)
|
|
sync_task.run(local_project_sync.id)
|
|
local_project_sync.refresh_from_db()
|
|
inventory_update.inventory_source.scm_last_revision = local_project_sync.scm_revision
|
|
inventory_update.inventory_source.save(update_fields=['scm_last_revision'])
|
|
except Exception:
|
|
inventory_update = self.update_model(
|
|
inventory_update.pk,
|
|
status='failed',
|
|
job_explanation=(
|
|
'Previous Task Failed: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}'
|
|
% ('project_update', local_project_sync.name, local_project_sync.id)
|
|
),
|
|
)
|
|
raise
|
|
elif inventory_update.source == 'scm' and inventory_update.launch_type == 'scm' and source_project:
|
|
# This follows update, not sync, so make copy here
|
|
RunProjectUpdate.make_local_copy(source_project, private_data_dir)
|
|
|
|
def post_run_hook(self, inventory_update, status):
|
|
super(RunInventoryUpdate, self).post_run_hook(inventory_update, status)
|
|
if status != 'successful':
|
|
return # nothing to save, step out of the way to allow error reporting
|
|
|
|
private_data_dir = inventory_update.job_env['AWX_PRIVATE_DATA_DIR']
|
|
expected_output = os.path.join(private_data_dir, 'artifacts', 'output.json')
|
|
with open(expected_output) as f:
|
|
data = json.load(f)
|
|
|
|
# build inventory save options
|
|
options = dict(
|
|
overwrite=inventory_update.overwrite,
|
|
overwrite_vars=inventory_update.overwrite_vars,
|
|
)
|
|
src = inventory_update.source
|
|
|
|
if inventory_update.enabled_var:
|
|
options['enabled_var'] = inventory_update.enabled_var
|
|
options['enabled_value'] = inventory_update.enabled_value
|
|
else:
|
|
if getattr(settings, '%s_ENABLED_VAR' % src.upper(), False):
|
|
options['enabled_var'] = getattr(settings, '%s_ENABLED_VAR' % src.upper())
|
|
if getattr(settings, '%s_ENABLED_VALUE' % src.upper(), False):
|
|
options['enabled_value'] = getattr(settings, '%s_ENABLED_VALUE' % src.upper())
|
|
|
|
if inventory_update.host_filter:
|
|
options['host_filter'] = inventory_update.host_filter
|
|
|
|
        if getattr(settings, '%s_EXCLUDE_EMPTY_GROUPS' % src.upper(), False):
|
|
options['exclude_empty_groups'] = True
|
|
if getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper(), False):
|
|
options['instance_id_var'] = getattr(settings, '%s_INSTANCE_ID_VAR' % src.upper())
|
|
|
|
# Verbosity is applied to saving process, as well as ansible-inventory CLI option
|
|
if inventory_update.verbosity:
|
|
options['verbosity'] = inventory_update.verbosity
|
|
|
|
handler = SpecialInventoryHandler(
|
|
self.event_handler,
|
|
self.cancel_callback,
|
|
verbosity=inventory_update.verbosity,
|
|
job_timeout=self.get_instance_timeout(self.instance),
|
|
start_time=inventory_update.started,
|
|
counter=self.event_ct,
|
|
initial_line=self.end_line,
|
|
)
|
|
inv_logger = logging.getLogger('awx.main.commands.inventory_import')
|
|
formatter = inv_logger.handlers[0].formatter
|
|
formatter.job_start = inventory_update.started
|
|
handler.formatter = formatter
|
|
inv_logger.handlers[0] = handler
|
|
|
|
from awx.main.management.commands.inventory_import import Command as InventoryImportCommand
|
|
|
|
cmd = InventoryImportCommand()
|
|
try:
|
|
# save the inventory data to database.
|
|
# canceling exceptions will be handled in the global post_run_hook
|
|
cmd.perform_update(options, data, inventory_update)
|
|
except PermissionDenied as exc:
|
|
logger.exception('License error saving {} content'.format(inventory_update.log_format))
|
|
raise PostRunError(str(exc), status='error')
|
|
except PostRunError:
|
|
logger.exception('Error saving {} content, rolling back changes'.format(inventory_update.log_format))
|
|
raise
|
|
except Exception:
|
|
logger.exception('Exception saving {} content, rolling back changes.'.format(inventory_update.log_format))
|
|
            raise PostRunError('Error occurred while saving inventory data, see traceback or server logs', status='error', tb=traceback.format_exc())
|
|
|
|
|
|
@task(queue=get_local_queuename)
|
|
class RunAdHocCommand(BaseTask):
|
|
"""
|
|
Run an ad hoc command using ansible.
|
|
"""
|
|
|
|
model = AdHocCommand
|
|
event_model = AdHocCommandEvent
|
|
event_data_key = 'ad_hoc_command_id'
|
|
|
|
def build_private_data(self, ad_hoc_command, private_data_dir):
|
|
"""
|
|
Return SSH private key data needed for this ad hoc command (only if
|
|
stored in DB as ssh_key_data).
|
|
|
|
Returns a dict of the form
|
|
{
|
|
'credentials': {
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
<awx.main.models.Credential>: <credential_decrypted_ssh_key_data>,
|
|
...
|
|
},
|
|
'certificates': {
|
|
<awx.main.models.Credential>: <signed SSH certificate data>,
|
|
<awx.main.models.Credential>: <signed SSH certificate data>,
|
|
...
|
|
}
|
|
}
|
|
"""
|
|
# If we were sent SSH credentials, decrypt them and send them
|
|
# back (they will be written to a temporary file).
|
|
creds = ad_hoc_command.credential
|
|
private_data = {'credentials': {}}
|
|
if creds and creds.has_input('ssh_key_data'):
|
|
private_data['credentials'][creds] = creds.get_input('ssh_key_data', default='')
|
|
if creds and creds.has_input('ssh_public_key_data'):
|
|
private_data.setdefault('certificates', {})[creds] = creds.get_input('ssh_public_key_data', default='')
|
|
return private_data
|
|
|
|
def build_passwords(self, ad_hoc_command, runtime_passwords):
|
|
"""
|
|
Build a dictionary of passwords for SSH private key, SSH user and
|
|
sudo/su.
|
|
"""
|
|
passwords = super(RunAdHocCommand, self).build_passwords(ad_hoc_command, runtime_passwords)
|
|
cred = ad_hoc_command.credential
|
|
if cred:
|
|
for field in ('ssh_key_unlock', 'ssh_password', 'become_password'):
|
|
value = runtime_passwords.get(field, cred.get_input('password' if field == 'ssh_password' else field, default=''))
|
|
if value not in ('', 'ASK'):
|
|
passwords[field] = value
|
|
return passwords
|
|
|
|
def build_env(self, ad_hoc_command, private_data_dir, private_data_files=None):
|
|
"""
|
|
Build environment dictionary for ansible.
|
|
"""
|
|
env = super(RunAdHocCommand, self).build_env(ad_hoc_command, private_data_dir, private_data_files=private_data_files)
|
|
# Set environment variables needed for inventory and ad hoc event
|
|
# callbacks to work.
|
|
env['AD_HOC_COMMAND_ID'] = str(ad_hoc_command.pk)
|
|
env['INVENTORY_ID'] = str(ad_hoc_command.inventory.pk)
|
|
env['INVENTORY_HOSTVARS'] = str(True)
|
|
env['ANSIBLE_LOAD_CALLBACK_PLUGINS'] = '1'
|
|
env['ANSIBLE_SFTP_BATCH_MODE'] = 'False'
|
|
|
|
return env
|
|
|
|
def build_args(self, ad_hoc_command, private_data_dir, passwords):
|
|
"""
|
|
Build command line argument list for running ansible, optionally using
|
|
ssh-agent for public/private key authentication.
|
|
"""
|
|
creds = ad_hoc_command.credential
|
|
ssh_username, become_username, become_method = '', '', ''
|
|
if creds:
|
|
ssh_username = creds.get_input('username', default='')
|
|
become_method = creds.get_input('become_method', default='')
|
|
become_username = creds.get_input('become_username', default='')
|
|
else:
|
|
become_method = None
|
|
become_username = ""
|
|
# Always specify the normal SSH user as root by default. Since this
|
|
# task is normally running in the background under a service account,
|
|
# it doesn't make sense to rely on ansible's default of using the
|
|
# current user.
|
|
ssh_username = ssh_username or 'root'
|
|
args = []
|
|
if ad_hoc_command.job_type == 'check':
|
|
args.append('--check')
|
|
args.extend(['-u', sanitize_jinja(ssh_username)])
|
|
if 'ssh_password' in passwords:
|
|
args.append('--ask-pass')
|
|
# We only specify sudo/su user and password if explicitly given by the
|
|
# credential. Credential should never specify both sudo and su.
|
|
if ad_hoc_command.become_enabled:
|
|
args.append('--become')
|
|
if become_method:
|
|
args.extend(['--become-method', sanitize_jinja(become_method)])
|
|
if become_username:
|
|
args.extend(['--become-user', sanitize_jinja(become_username)])
|
|
if 'become_password' in passwords:
|
|
args.append('--ask-become-pass')
|
|
|
|
if ad_hoc_command.forks: # FIXME: Max limit?
|
|
args.append('--forks=%d' % ad_hoc_command.forks)
|
|
if ad_hoc_command.diff_mode:
|
|
args.append('--diff')
|
|
if ad_hoc_command.verbosity:
|
|
args.append('-%s' % ('v' * min(5, ad_hoc_command.verbosity)))
|
|
|
|
extra_vars = ad_hoc_command.awx_meta_vars()
|
|
|
|
if ad_hoc_command.extra_vars_dict:
|
|
redacted_extra_vars, removed_vars = extract_ansible_vars(ad_hoc_command.extra_vars_dict)
|
|
if removed_vars:
|
|
raise ValueError(_("{} are prohibited from use in ad hoc commands.").format(", ".join(removed_vars)))
|
|
extra_vars.update(ad_hoc_command.extra_vars_dict)
|
|
|
|
if ad_hoc_command.limit:
|
|
args.append(ad_hoc_command.limit)
|
|
else:
|
|
args.append('all')
|
|
|
|
return args
|
|
|
|
def build_extra_vars_file(self, ad_hoc_command, private_data_dir):
|
|
extra_vars = ad_hoc_command.awx_meta_vars()
|
|
|
|
if ad_hoc_command.extra_vars_dict:
|
|
redacted_extra_vars, removed_vars = extract_ansible_vars(ad_hoc_command.extra_vars_dict)
|
|
if removed_vars:
|
|
raise ValueError(_("{} are prohibited from use in ad hoc commands.").format(", ".join(removed_vars)))
|
|
extra_vars.update(ad_hoc_command.extra_vars_dict)
|
|
self._write_extra_vars_file(private_data_dir, extra_vars)
|
|
|
|
def build_module_name(self, ad_hoc_command):
|
|
return ad_hoc_command.module_name
|
|
|
|
def build_module_args(self, ad_hoc_command):
|
|
module_args = ad_hoc_command.module_args
|
|
if settings.ALLOW_JINJA_IN_EXTRA_VARS != 'always':
|
|
module_args = sanitize_jinja(module_args)
|
|
return module_args
|
|
|
|
def build_playbook_path_relative_to_cwd(self, job, private_data_dir):
|
|
return None
|
|
|
|
def get_password_prompts(self, passwords={}):
|
|
d = super(RunAdHocCommand, self).get_password_prompts()
|
|
d[r'Enter passphrase for .*:\s*?$'] = 'ssh_key_unlock'
|
|
d[r'Bad passphrase, try again for .*:\s*?$'] = ''
|
|
for method in PRIVILEGE_ESCALATION_METHODS:
|
|
d[r'%s password.*:\s*?$' % (method[0])] = 'become_password'
|
|
d[r'%s password.*:\s*?$' % (method[0].upper())] = 'become_password'
|
|
d[r'BECOME password.*:\s*?$'] = 'become_password'
|
|
d[r'SSH password:\s*?$'] = 'ssh_password'
|
|
d[r'Password:\s*?$'] = 'ssh_password'
|
|
return d
|
|
|
|
|
|
@task(queue=get_local_queuename)
|
|
class RunSystemJob(BaseTask):
|
|
|
|
model = SystemJob
|
|
event_model = SystemJobEvent
|
|
event_data_key = 'system_job_id'
|
|
|
|
def build_execution_environment_params(self, system_job, private_data_dir):
|
|
return {}
|
|
|
|
def build_args(self, system_job, private_data_dir, passwords):
|
|
args = ['awx-manage', system_job.job_type]
|
|
try:
|
|
# System Job extra_vars can be blank, must be JSON if not blank
|
|
if system_job.extra_vars == '':
|
|
json_vars = {}
|
|
else:
|
|
json_vars = json.loads(system_job.extra_vars)
|
|
if system_job.job_type in ('cleanup_jobs', 'cleanup_activitystream'):
|
|
if 'days' in json_vars:
|
|
args.extend(['--days', str(json_vars.get('days', 60))])
|
|
if 'dry_run' in json_vars and json_vars['dry_run']:
|
|
args.extend(['--dry-run'])
|
|
if system_job.job_type == 'cleanup_jobs':
|
|
args.extend(
|
|
['--jobs', '--project-updates', '--inventory-updates', '--management-jobs', '--ad-hoc-commands', '--workflow-jobs', '--notifications']
|
|
)
|
|
except Exception:
|
|
logger.exception("{} Failed to parse system job".format(system_job.log_format))
|
|
return args
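        # Illustrative sketch (not executed): for a cleanup_jobs system job with
        # extra_vars '{"days": 30, "dry_run": true}', the returned list would resemble
        #   ['awx-manage', 'cleanup_jobs', '--days', '30', '--dry-run',
        #    '--jobs', '--project-updates', '--inventory-updates',
        #    '--management-jobs', '--ad-hoc-commands', '--workflow-jobs', '--notifications']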
|
|
|
|
def write_args_file(self, private_data_dir, args):
|
|
path = os.path.join(private_data_dir, 'args')
|
|
handle = os.open(path, os.O_RDWR | os.O_CREAT, stat.S_IREAD | stat.S_IWRITE)
|
|
f = os.fdopen(handle, 'w')
|
|
f.write(' '.join(args))
|
|
f.close()
|
|
os.chmod(path, stat.S_IRUSR)
|
|
return path
|
|
|
|
def build_env(self, instance, private_data_dir, private_data_files=None):
|
|
base_env = super(RunSystemJob, self).build_env(instance, private_data_dir, private_data_files=private_data_files)
|
|
        # TODO: this currently runs with isolation turned off;
        # the goal is to run it in a container instead
|
|
env = dict(os.environ.items())
|
|
env.update(base_env)
|
|
return env
|
|
|
|
def build_playbook_path_relative_to_cwd(self, job, private_data_dir):
|
|
return None
|
|
|
|
def build_inventory(self, instance, private_data_dir):
|
|
return None
|
|
|
|
|
|
def _reconstruct_relationships(copy_mapping):
    for old_obj, new_obj in copy_mapping.items():
        model = type(old_obj)
        for field_name in getattr(model, 'FIELDS_TO_PRESERVE_AT_COPY', []):
            field = model._meta.get_field(field_name)
            if isinstance(field, ForeignKey):
                if getattr(new_obj, field_name, None):
                    continue
                related_obj = getattr(old_obj, field_name)
                related_obj = copy_mapping.get(related_obj, related_obj)
                setattr(new_obj, field_name, related_obj)
            elif field.many_to_many:
                for related_obj in getattr(old_obj, field_name).all():
                    logger.debug('Deep copy: Adding {} to {}({}).{} relationship'.format(related_obj, new_obj, model, field_name))
                    getattr(new_obj, field_name).add(copy_mapping.get(related_obj, related_obj))
        new_obj.save()


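# Background task that finishes a copy started by CopyAPIView: the view caches a
# list of (module, model name, pk) tuples under a UUID, and this task copies each
# of those sub-objects, rewires their relationships, and re-checks permissions.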
@task(queue=get_local_queuename)
def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, uuid, permission_check_func=None):
    sub_obj_list = cache.get(uuid)
    if sub_obj_list is None:
        logger.error('Deep copy {} from {} to {} failed unexpectedly.'.format(model_name, obj_pk, new_obj_pk))
        return

    logger.debug('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk))
    from awx.api.generics import CopyAPIView
    from awx.main.signals import disable_activity_stream

    model = getattr(importlib.import_module(model_module), model_name, None)
    if model is None:
        return
    try:
        obj = model.objects.get(pk=obj_pk)
        new_obj = model.objects.get(pk=new_obj_pk)
        creator = User.objects.get(pk=user_pk)
    except ObjectDoesNotExist:
        logger.warning("Object or user no longer exists.")
        return
    with transaction.atomic(), ignore_inventory_computed_fields(), disable_activity_stream():
        copy_mapping = {}
        for sub_obj_setup in sub_obj_list:
            sub_model = getattr(importlib.import_module(sub_obj_setup[0]), sub_obj_setup[1], None)
            if sub_model is None:
                continue
            try:
                sub_obj = sub_model.objects.get(pk=sub_obj_setup[2])
            except ObjectDoesNotExist:
                continue
            copy_mapping.update(CopyAPIView.copy_model_obj(obj, new_obj, sub_model, sub_obj, creator))
        _reconstruct_relationships(copy_mapping)
        if permission_check_func:
            permission_check_func = getattr(getattr(importlib.import_module(permission_check_func[0]), permission_check_func[1]), permission_check_func[2])
            permission_check_func(creator, copy_mapping.values())
    if isinstance(new_obj, Inventory):
        update_inventory_computed_fields.delay(new_obj.id)


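# threading.Thread subclass that records any exception raised by its target (as a
# sys.exc_info() triple) on self.exc, so the parent thread can re-raise it once
# the transmission has finished.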
class TransmitterThread(threading.Thread):
    def run(self):
        self.exc = None

        try:
            super().run()
        except Exception:
            self.exc = sys.exc_info()


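# AWXReceptorJob bridges a BaseTask to Receptor using ansible-runner's streaming
# interface: the payload is serialized with the 'transmit' streamer, handed to a
# Receptor work unit (run locally, or in a container group pod that invokes
# `ansible-runner worker`), and the result stream is consumed with the 'process'
# streamer, which replays events through the task's callbacks.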
class AWXReceptorJob:
    def __init__(self, task=None, runner_params=None):
        self.task = task
        self.runner_params = runner_params
        self.unit_id = None

        if self.task and not self.task.instance.is_container_group_task:
            execution_environment_params = self.task.build_execution_environment_params(self.task.instance, runner_params['private_data_dir'])
            self.runner_params['settings'].update(execution_environment_params)

    def run(self):
        # We establish a connection to the Receptor socket
        receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')

        try:
            return self._run_internal(receptor_ctl)
        finally:
            # Make sure to always release the work unit if we established it
            if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK:
                receptor_ctl.simple_command(f"work release {self.unit_id}")

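    # Internal driver for run(): transmit the payload over a socketpair, submit the
    # Receptor work unit, then stream the results back through processor() while a
    # cancel_watcher() thread polls for cancellation.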
    def _run_internal(self, receptor_ctl):
        # Create a socketpair; the left side will be used for writing our payload
        # (private data dir, kwargs), and the right side will be passed to Receptor
        # for reading.
        sockin, sockout = socket.socketpair()

        transmitter_thread = TransmitterThread(target=self.transmit, args=[sockin])
        transmitter_thread.start()

        # Submit our work, passing in the right side of our socketpair for reading.
        result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params)
        self.unit_id = result['unitid']

        sockin.close()
        sockout.close()

        # Wait for the transmitter to finish before checking whether it raised.
        transmitter_thread.join()

        if transmitter_thread.exc:
            raise transmitter_thread.exc[1].with_traceback(transmitter_thread.exc[2])

        resultsock, resultfile = receptor_ctl.get_work_results(self.unit_id, return_socket=True, return_sockfile=True)
        # Both "processor" and "cancel_watcher" are spawned in separate threads.
        # We wait for the first one to return. If cancel_watcher returns first,
        # we yank the socket out from underneath the processor, which will cause it
        # to exit. A reference to the processor_future is passed into the
        # cancel_watcher_future, which exits if the job has finished normally. The
        # context manager ensures we do not leave any threads lying around.
        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            processor_future = executor.submit(self.processor, resultfile)
            cancel_watcher_future = executor.submit(self.cancel_watcher, processor_future)
            futures = [processor_future, cancel_watcher_future]
            first_future = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)

            res = list(first_future.done)[0].result()
            if res.status == 'canceled':
                receptor_ctl.simple_command(f"work cancel {self.unit_id}")
                resultsock.shutdown(socket.SHUT_RDWR)
                resultfile.close()
            elif res.status == 'error':
                # TODO: There should be a more efficient way of getting this information
                receptor_work_list = receptor_ctl.simple_command("work list")
                detail = receptor_work_list[self.unit_id]['Detail']
                state_name = receptor_work_list[self.unit_id]['StateName']

                if 'exceeded quota' in detail:
                    logger.warning(detail)
                    log_name = self.task.instance.log_format
                    logger.warning(f"Could not launch pod for {log_name}. Exceeded quota.")
                    self.task.update_model(self.task.instance.pk, status='pending')
                    return
                # If ansible-runner ran but an error occurred at runtime, the traceback
                # information is saved via the status_handler passed in to the processor.
                if state_name == 'Succeeded':
                    return res

                raise RuntimeError(detail)

        return res

    # Runs in a thread so Receptor can start reading before we finish writing; we
    # write our payload to the left side of our socketpair.
    @cleanup_new_process
    def transmit(self, _socket):
        if not settings.IS_K8S and self.work_type == 'local':
            self.runner_params['only_transmit_kwargs'] = True

        try:
            ansible_runner.interface.run(streamer='transmit', _output=_socket.makefile('wb'), **self.runner_params)
        finally:
            # The socket must be shut down here, or the reader will hang forever.
            _socket.shutdown(socket.SHUT_WR)

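    # Consumes the result stream coming back from Receptor with ansible-runner's
    # 'process' streamer, firing the task's event, status, and finished callbacks
    # as events arrive.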
    @cleanup_new_process
    def processor(self, resultfile):
        return ansible_runner.interface.run(
            streamer='process',
            quiet=True,
            _input=resultfile,
            event_handler=self.task.event_handler,
            finished_callback=self.task.finished_callback,
            status_handler=self.task.status_handler,
            **self.runner_params,
        )

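    # Extra parameters passed to Receptor when the work unit is submitted. For
    # container group tasks the pod spec (and, when an external cluster credential
    # is configured, a generated kubeconfig) are passed as Receptor secrets; for
    # local work only the private data dir is passed on the command line.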
    @property
    def receptor_params(self):
        if self.task.instance.is_container_group_task:
            spec_yaml = yaml.dump(self.pod_definition, explicit_start=True)

            receptor_params = {
                "secret_kube_pod": spec_yaml,
                "pod_pending_timeout": getattr(settings, 'AWX_CONTAINER_GROUP_POD_PENDING_TIMEOUT', "5m"),
            }

            if self.credential:
                kubeconfig_yaml = yaml.dump(self.kube_config, explicit_start=True)
                receptor_params["secret_kube_config"] = kubeconfig_yaml
        else:
            private_data_dir = self.runner_params['private_data_dir']
            receptor_params = {"params": f"--private-data-dir={private_data_dir}"}

        return receptor_params

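    # Selects the Receptor work type: 'kubernetes-runtime-auth' for container groups
    # with an explicit cluster credential, 'kubernetes-incluster-auth' for container
    # groups using the in-cluster service account, and 'local' otherwise.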
    @property
    def work_type(self):
        if self.task.instance.is_container_group_task:
            if self.credential:
                work_type = 'kubernetes-runtime-auth'
            else:
                work_type = 'kubernetes-incluster-auth'
        else:
            work_type = 'local'

        return work_type

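    # Polls once per second until either the processor finishes (its result is
    # returned unchanged) or the task is canceled, in which case a minimal
    # namedtuple standing in for a runner result ('canceled', rc=1) is returned.
    # The Receptor unit id is also recorded in the job's job_env the first time
    # through, so it is visible for debugging.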
    @cleanup_new_process
    def cancel_watcher(self, processor_future):
        while True:
            if processor_future.done():
                return processor_future.result()

            if self.task.cancel_callback():
                result = namedtuple('result', ['status', 'rc'])
                return result('canceled', 1)

            # unit_id is assigned once the work unit has been submitted
            if self.unit_id is not None and 'RECEPTOR_UNIT_ID' not in self.task.instance.job_env:
                self.task.instance.job_env['RECEPTOR_UNIT_ID'] = self.unit_id
                self.task.update_model(self.task.instance.pk, job_env=self.task.instance.job_env)

            time.sleep(1)

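    # Builds the Kubernetes pod spec for container group jobs: the instance group's
    # pod_spec_override is merged over the default pod spec (a shallow, top-level
    # merge), then the container image, ansible-runner worker args, pull policy,
    # optional imagePullSecret, and job-specific metadata are applied on top. For
    # example, an override of the (illustrative, not default) form
    #   {"spec": {"nodeSelector": {"disktype": "ssd"}, ...}}
    # replaces the entire default "spec" key because of the top-level merge.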
    @property
    def pod_definition(self):
        if self.task and self.task.instance.execution_environment:
            ee = self.task.instance.execution_environment
        else:
            ee = get_default_execution_environment()

        default_pod_spec = get_default_pod_spec()

        pod_spec_override = {}
        if self.task and self.task.instance.instance_group.pod_spec_override:
            pod_spec_override = parse_yaml_or_json(self.task.instance.instance_group.pod_spec_override)
        pod_spec = {**default_pod_spec, **pod_spec_override}

        pod_spec['spec']['containers'][0]['image'] = ee.image
        pod_spec['spec']['containers'][0]['args'] = ['ansible-runner', 'worker', '--private-data-dir=/runner']

        # Enforce EE Pull Policy
        pull_options = {"always": "Always", "missing": "IfNotPresent", "never": "Never"}
        if self.task and self.task.instance.execution_environment:
            if self.task.instance.execution_environment.pull:
                pod_spec['spec']['containers'][0]['imagePullPolicy'] = pull_options[self.task.instance.execution_environment.pull]

        if self.task and self.task.instance.is_container_group_task:
            # If EE credential is passed, create an imagePullSecret
            if self.task.instance.execution_environment and self.task.instance.execution_environment.credential:
                # Create pull secret in k8s cluster based on ee cred
                from awx.main.scheduler.kubernetes import PodManager  # prevent circular import

                pm = PodManager(self.task.instance)
                secret_name = pm.create_secret(job=self.task.instance)

                # Inject secret name into podspec
                pod_spec['spec']['imagePullSecrets'] = [{"name": secret_name}]

        if self.task:
            pod_spec['metadata'] = deepmerge(
                pod_spec.get('metadata', {}),
                dict(name=self.pod_name, labels={'ansible-awx': settings.INSTALL_UUID, 'ansible-awx-job-id': str(self.task.instance.id)}),
            )

        return pod_spec

    @property
    def pod_name(self):
        return f"automation-job-{self.task.instance.id}"

    @property
    def credential(self):
        return self.task.instance.instance_group.credential

    @property
    def namespace(self):
        return self.pod_definition['metadata']['namespace']

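    # Builds an in-memory kubeconfig from the container group credential. The result
    # has the usual single-cluster shape, roughly:
    #   clusters: [{name: <host>, cluster: {server: <host>, ...tls settings}}]
    #   users:    [{name: <host>, user: {token: <bearer token>}}]
    #   contexts: [{name: <host>, context: {cluster/user: <host>, namespace: ...}}]
    # TLS verification uses the credential's CA certificate when one is provided;
    # otherwise certificate verification is skipped.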
    @property
    def kube_config(self):
        host_input = self.credential.get_input('host')
        config = {
            "apiVersion": "v1",
            "kind": "Config",
            "preferences": {},
            "clusters": [{"name": host_input, "cluster": {"server": host_input}}],
            "users": [{"name": host_input, "user": {"token": self.credential.get_input('bearer_token')}}],
            "contexts": [{"name": host_input, "context": {"cluster": host_input, "user": host_input, "namespace": self.namespace}}],
            "current-context": host_input,
        }

        if self.credential.get_input('verify_ssl') and 'ssl_ca_cert' in self.credential.inputs:
            config["clusters"][0]["cluster"]["certificate-authority-data"] = b64encode(
                self.credential.get_input('ssl_ca_cert').encode()  # encode to bytes
            ).decode()  # decode the base64 data into a str
        else:
            config["clusters"][0]["cluster"]["insecure-skip-tls-verify"] = True
        return config