Mirror of https://github.com/ansible/awx.git (synced 2026-01-13 11:00:03 -03:30)

Remove the IsolatedManager and its associated playbooks and plugins

parent b0cdfe7625
commit 6a599695db
@@ -220,17 +220,11 @@ def projects_by_scm_type(since, **kwargs):
     return counts


-def _get_isolated_datetime(last_check):
-    if last_check:
-        return last_check.isoformat()
-    return last_check
-
-
-@register('instance_info', '1.0', description=_('Cluster topology and capacity'))
+@register('instance_info', '1.1', description=_('Cluster topology and capacity'))
 def instance_info(since, include_hostnames=False, **kwargs):
     info = {}
     instances = models.Instance.objects.values_list('hostname').values(
-        'uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'hostname', 'last_isolated_check', 'enabled'
+        'uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'hostname', 'enabled'
     )
     for instance in instances:
         consumed_capacity = sum(x.task_impact for x in models.UnifiedJob.objects.filter(execution_node=instance['hostname'], status__in=('running', 'waiting')))
@@ -241,7 +235,6 @@ def instance_info(since, include_hostnames=False, **kwargs):
             'cpu': instance['cpu'],
             'memory': instance['memory'],
             'managed_by_policy': instance['managed_by_policy'],
-            'last_isolated_check': _get_isolated_datetime(instance['last_isolated_check']),
             'enabled': instance['enabled'],
             'consumed_capacity': consumed_capacity,
             'remaining_capacity': instance['capacity'] - consumed_capacity,

@@ -184,7 +184,6 @@ def metrics():
         INSTANCE_INFO.labels(hostname=hostname, instance_uuid=uuid).info(
             {
                 'enabled': str(instance_data[uuid]['enabled']),
-                'last_isolated_check': getattr(instance_data[uuid], 'last_isolated_check', 'None'),
                 'managed_by_policy': str(instance_data[uuid]['managed_by_policy']),
                 'version': instance_data[uuid]['version'],
             }
awx/main/isolated/.gitignore (vendored, 1 deletion)
@@ -1 +0,0 @@
-authorized_keys
@@ -1,365 +0,0 @@
-import fnmatch
-import json
-import os
-import shutil
-import stat
-import tempfile
-import time
-import logging
-import datetime
-
-from django.conf import settings
-import ansible_runner
-
-import awx
-from awx.main.utils import get_system_task_capacity
-
-logger = logging.getLogger('awx.isolated.manager')
-playbook_logger = logging.getLogger('awx.isolated.manager.playbooks')
-
-
-def set_pythonpath(venv_libdir, env):
-    env.pop('PYTHONPATH', None)  # default to none if no python_ver matches
-    for version in os.listdir(venv_libdir):
-        if fnmatch.fnmatch(version, 'python[23].*'):
-            if os.path.isdir(os.path.join(venv_libdir, version)):
-                env['PYTHONPATH'] = os.path.join(venv_libdir, version, "site-packages") + ":"
-                break
-
-
-class IsolatedManager(object):
-    def __init__(self, event_handler, canceled_callback=None, check_callback=None):
-        """
-        :param event_handler: a callable used to persist event data from isolated nodes
-        :param canceled_callback: a callable - which returns `True` or `False`
-                                  - signifying if the job has been prematurely
-                                    canceled
-        """
-        self.event_handler = event_handler
-        self.canceled_callback = canceled_callback
-        self.check_callback = check_callback
-        self.started_at = None
-        self.captured_command_artifact = False
-        self.instance = None
-
-    def build_inventory(self, hosts):
-        inventory = '\n'.join(['{} ansible_ssh_user={}'.format(host, settings.AWX_ISOLATED_USERNAME) for host in hosts])
-
-        return inventory
-
-    def build_runner_params(self, hosts, verbosity=1):
-        env = dict(os.environ.items())
-        env['ANSIBLE_RETRY_FILES_ENABLED'] = 'False'
-        env['ANSIBLE_HOST_KEY_CHECKING'] = str(settings.AWX_ISOLATED_HOST_KEY_CHECKING)
-        env['ANSIBLE_LIBRARY'] = os.path.join(os.path.dirname(awx.__file__), 'plugins', 'isolated')
-        env['ANSIBLE_COLLECTIONS_PATHS'] = settings.AWX_ANSIBLE_COLLECTIONS_PATHS
-        set_pythonpath(os.path.join(settings.ANSIBLE_VENV_PATH, 'lib'), env)
-
-        def finished_callback(runner_obj):
-            if runner_obj.status == 'failed' and runner_obj.config.playbook != 'check_isolated.yml':
-                # failed for clean_isolated.yml just means the playbook hasn't
-                # exited on the isolated host
-                stdout = runner_obj.stdout.read()
-                playbook_logger.error(stdout)
-            elif runner_obj.status == 'timeout':
-                # this means that the default idle timeout of
-                # (2 * AWX_ISOLATED_CONNECTION_TIMEOUT) was exceeded
-                # (meaning, we tried to sync with an isolated node, and we got
-                # no new output for 2 * AWX_ISOLATED_CONNECTION_TIMEOUT seconds)
-                # this _usually_ means SSH key auth from the controller ->
-                # isolated didn't work, and ssh is hung waiting on interactive
-                # input e.g.,
-                #
-                # awx@isolated's password:
-                stdout = runner_obj.stdout.read()
-                playbook_logger.error(stdout)
-            else:
-                playbook_logger.info(runner_obj.stdout.read())
-
-        return {
-            'project_dir': os.path.abspath(os.path.join(os.path.dirname(awx.__file__), 'playbooks')),
-            'inventory': self.build_inventory(hosts),
-            'envvars': env,
-            'finished_callback': finished_callback,
-            'verbosity': verbosity,
-            'cancel_callback': self.canceled_callback,
-            'settings': {
-                'job_timeout': settings.AWX_ISOLATED_LAUNCH_TIMEOUT,
-                'suppress_ansible_output': True,
-            },
-        }
-
-    def path_to(self, *args):
-        return os.path.join(self.private_data_dir, *args)
-
-    def run_management_playbook(self, playbook, private_data_dir, idle_timeout=None, **kw):
-        iso_dir = tempfile.mkdtemp(prefix=playbook, dir=private_data_dir)
-        params = self.runner_params.copy()
-        params.get('envvars', dict())['ANSIBLE_CALLBACK_WHITELIST'] = 'profile_tasks'
-        params['playbook'] = playbook
-        params['private_data_dir'] = iso_dir
-        if idle_timeout:
-            params['settings']['idle_timeout'] = idle_timeout
-        else:
-            params['settings'].pop('idle_timeout', None)
-        params.update(**kw)
-        if all([getattr(settings, 'AWX_ISOLATED_KEY_GENERATION', False) is True, getattr(settings, 'AWX_ISOLATED_PRIVATE_KEY', None)]):
-            params['ssh_key'] = settings.AWX_ISOLATED_PRIVATE_KEY
-        return ansible_runner.interface.run(**params)
-
-    def dispatch(self, playbook=None, module=None, module_args=None):
-        """
-        Ship the runner payload to a remote host for isolated execution.
-        """
-        self.handled_events = set()
-        self.started_at = time.time()
-
-        # exclude certain files from the rsync
-        rsync_exclude = [
-            # don't rsync source control metadata (it can be huge!)
-            '- /project/.git',
-            '- /project/.svn',
-            # don't rsync job events that are in the process of being written
-            '- /artifacts/job_events/*-partial.json.tmp',
-            # don't rsync the ssh_key FIFO
-            '- /env/ssh_key',
-            # don't rsync kube config files
-            '- .kubeconfig*',
-        ]
-
-        for filename, data in (['.rsync-filter', '\n'.join(rsync_exclude)],):
-            path = self.path_to(filename)
-            with open(path, 'w') as f:
-                f.write(data)
-            os.chmod(path, stat.S_IRUSR)
-
-        extravars = {
-            'src': self.private_data_dir,
-            'dest': settings.AWX_ISOLATION_BASE_PATH,
-            'ident': self.ident,
-            'job_id': self.instance.id,
-        }
-        if playbook:
-            extravars['playbook'] = playbook
-        if module and module_args:
-            extravars['module'] = module
-            extravars['module_args'] = module_args
-
-        logger.debug('Starting job {} on isolated host with `run_isolated.yml` playbook.'.format(self.instance.id))
-        runner_obj = self.run_management_playbook(
-            'run_isolated.yml', self.private_data_dir, idle_timeout=max(60, 2 * settings.AWX_ISOLATED_CONNECTION_TIMEOUT), extravars=extravars
-        )
-
-        if runner_obj.status == 'failed':
-            self.instance.result_traceback = runner_obj.stdout.read()
-            self.instance.save(update_fields=['result_traceback'])
-            return 'error', runner_obj.rc
-
-        return runner_obj.status, runner_obj.rc
-
-    def check(self, interval=None):
-        """
-        Repeatedly poll the isolated node to determine if the job has run.
-
-        On success, copy job artifacts to the controlling node.
-        On failure, continue to poll the isolated node (until the job timeout
-        is exceeded).
-
-        For a completed job run, this function returns (status, rc),
-        representing the status and return code of the isolated
-        `ansible-playbook` run.
-
-        :param interval: an interval (in seconds) to wait between status polls
-        """
-        interval = interval if interval is not None else settings.AWX_ISOLATED_CHECK_INTERVAL
-        extravars = {'src': self.private_data_dir, 'job_id': self.instance.id}
-        status = 'failed'
-        rc = None
-        last_check = time.time()
-
-        while status == 'failed':
-            canceled = self.canceled_callback() if self.canceled_callback else False
-            if not canceled and time.time() - last_check < interval:
-                # If the job isn't canceled, but we haven't waited `interval` seconds, wait longer
-                time.sleep(1)
-                continue
-
-            if canceled:
-                logger.warning('Isolated job {} was manually canceled.'.format(self.instance.id))
-
-            logger.debug('Checking on isolated job {} with `check_isolated.yml`.'.format(self.instance.id))
-            time_start = datetime.datetime.now()
-            runner_obj = self.run_management_playbook('check_isolated.yml', self.private_data_dir, extravars=extravars)
-            time_end = datetime.datetime.now()
-            time_diff = time_end - time_start
-            logger.debug('Finished checking on isolated job {} with `check_isolated.yml`, took {} seconds.'.format(self.instance.id, time_diff.total_seconds()))
-            status, rc = runner_obj.status, runner_obj.rc
-
-            if self.check_callback is not None and not self.captured_command_artifact:
-                command_path = self.path_to('artifacts', self.ident, 'command')
-                # If the configuration artifact has been synced back, update the model
-                if os.path.exists(command_path):
-                    try:
-                        with open(command_path, 'r') as f:
-                            data = json.load(f)
-                        self.check_callback(data)
-                        self.captured_command_artifact = True
-                    except json.decoder.JSONDecodeError:  # Just in case it's not fully here yet.
-                        pass
-
-            self.consume_events()
-
-            last_check = time.time()
-
-        if status == 'successful':
-            status_path = self.path_to('artifacts', self.ident, 'status')
-            rc_path = self.path_to('artifacts', self.ident, 'rc')
-            if os.path.exists(status_path):
-                with open(status_path, 'r') as f:
-                    status = f.readline()
-                with open(rc_path, 'r') as f:
-                    rc = int(f.readline())
-            else:
-                # if there's no status file, it means that runner _probably_
-                # exited with a traceback (which should be logged to
-                # daemon.log). Record it so we can see how runner failed.
-                daemon_path = self.path_to('daemon.log')
-                if os.path.exists(daemon_path):
-                    with open(daemon_path, 'r') as f:
-                        self.instance.result_traceback = f.read()
-                        self.instance.save(update_fields=['result_traceback'])
-                else:
-                    logger.error('Failed to rsync daemon.log (is ansible-runner installed on the isolated host?)')
-                status = 'failed'
-                rc = 1
-
-        # consume events one last time just to be sure we didn't miss anything
-        # in the final sync
-        self.consume_events()
-
-        return status, rc
-
-    def consume_events(self):
-        # discover new events and ingest them
-        events_path = self.path_to('artifacts', self.ident, 'job_events')
-
-        # it's possible that `events_path` doesn't exist *yet*, because runner
-        # hasn't actually written any events yet (if you ran e.g., a sleep 30);
-        # only attempt to consume events if any were rsynced back
-        if os.path.exists(events_path):
-            for event in set(os.listdir(events_path)) - self.handled_events:
-                path = os.path.join(events_path, event)
-                if os.path.exists(path) and os.path.isfile(path):
-                    try:
-                        event_data = json.load(open(os.path.join(events_path, event), 'r'))
-                    except json.decoder.JSONDecodeError:
-                        # This means the event we got back isn't valid JSON;
-                        # that can happen if runner is still partially
-                        # writing an event file while it's rsyncing.
-                        # These event writes are _supposed_ to be atomic,
-                        # but it doesn't look like they actually are in
-                        # practice.
-                        # In this scenario, just ignore this event and try it
-                        # again on the next sync.
-                        continue
-                    self.event_handler(event_data)
-                    self.handled_events.add(event)
-
-    def cleanup(self):
-        extravars = {
-            'private_data_dir': self.private_data_dir,
-            'cleanup_dirs': [
-                self.private_data_dir,
-            ],
-        }
-        logger.debug('Cleaning up job {} on isolated host with `clean_isolated.yml` playbook.'.format(self.instance.id))
-        self.run_management_playbook('clean_isolated.yml', self.private_data_dir, extravars=extravars)
-
-    @classmethod
-    def update_capacity(cls, instance, task_result):
-        instance.version = 'ansible-runner-{}'.format(task_result['version'])
-
-        if instance.capacity == 0 and task_result['capacity_cpu']:
-            logger.warning('Isolated instance {} has re-joined.'.format(instance.hostname))
-        instance.cpu = int(task_result['cpu'])
-        instance.memory = int(task_result['mem'])
-        instance.cpu_capacity = int(task_result['capacity_cpu'])
-        instance.mem_capacity = int(task_result['capacity_mem'])
-        instance.capacity = get_system_task_capacity(
-            scale=instance.capacity_adjustment, cpu_capacity=int(task_result['capacity_cpu']), mem_capacity=int(task_result['capacity_mem'])
-        )
-        instance.save(update_fields=['cpu', 'memory', 'cpu_capacity', 'mem_capacity', 'capacity', 'version', 'modified'])
-
-    def health_check(self, instance_qs):
-        """
-        :param instance_qs: List of Django objects representing the
-                            isolated instances to manage
-        Runs playbook that will
-        - determine if instance is reachable
-        - find the instance capacity
-        - clean up orphaned private files
-        Performs save on each instance to update its capacity.
-        """
-        instance_qs = [i for i in instance_qs if i.enabled]
-        if not len(instance_qs):
-            return
-        try:
-            private_data_dir = tempfile.mkdtemp(prefix='awx_iso_heartbeat_', dir=settings.AWX_ISOLATION_BASE_PATH)
-            self.runner_params = self.build_runner_params([instance.hostname for instance in instance_qs])
-            self.runner_params['private_data_dir'] = private_data_dir
-            self.runner_params['forks'] = len(instance_qs)
-            runner_obj = self.run_management_playbook('heartbeat_isolated.yml', private_data_dir)
-
-            for instance in instance_qs:
-                task_result = {}
-                try:
-                    task_result = runner_obj.get_fact_cache(instance.hostname)
-                except Exception:
-                    logger.exception('Failed to read status from isolated instances')
-                if 'awx_capacity_cpu' in task_result and 'awx_capacity_mem' in task_result:
-                    task_result = {
-                        'cpu': task_result['awx_cpu'],
-                        'mem': task_result['awx_mem'],
-                        'capacity_cpu': task_result['awx_capacity_cpu'],
-                        'capacity_mem': task_result['awx_capacity_mem'],
-                        'version': task_result['awx_capacity_version'],
-                    }
-                    IsolatedManager.update_capacity(instance, task_result)
-                    logger.debug('Isolated instance {} successful heartbeat'.format(instance.hostname))
-                elif instance.capacity == 0:
-                    logger.debug('Isolated instance {} previously marked as lost, could not re-join.'.format(instance.hostname))
-                else:
-                    logger.warning('Could not update status of isolated instance {}'.format(instance.hostname))
-                    if instance.is_lost(isolated=True):
-                        instance.capacity = 0
-                        instance.save(update_fields=['capacity'])
-                        logger.error('Isolated instance {} last checked in at {}, marked as lost.'.format(instance.hostname, instance.modified))
-        finally:
-            if os.path.exists(private_data_dir):
-                shutil.rmtree(private_data_dir)
-
-    def run(self, instance, private_data_dir, playbook, module, module_args, ident=None):
-        """
-        Run a job on an isolated host.
-
-        :param instance: a `model.Job` instance
-        :param private_data_dir: an absolute path on the local file system
-                                 where job-specific data should be written
-                                 (i.e., `/tmp/awx_N_xyz/`)
-        :param playbook: the playbook to run
-        :param module: the module to run
-        :param module_args: the module args to use
-
-        For a completed job run, this function returns (status, rc),
-        representing the status and return code of the isolated
-        `ansible-playbook` run.
-        """
-        self.ident = ident
-        self.instance = instance
-        self.private_data_dir = private_data_dir
-        self.runner_params = self.build_runner_params([instance.execution_node], verbosity=min(5, self.instance.verbosity))
-
-        status, rc = self.dispatch(playbook, module, module_args)
-        if status == 'successful':
-            status, rc = self.check()
-        return status, rc
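For readers tracing what this deletion retires: the manager's whole lifecycle was `dispatch()` (rsync the private data dir out and start ansible-runner via `run_isolated.yml`), a `check()` polling loop (`check_isolated.yml`, rsyncing artifacts back until the run finishes), and `cleanup()` (`clean_isolated.yml`). A condensed usage sketch, based only on the code above and the call sites later in this diff — `persist_event`, `job`, and `isolated_instances` are illustrative placeholders, not AWX names:

```python
# Sketch only. In AWX the event handler at the heartbeat call site was
# CallbackQueueDispatcher.dispatch; persist_event stands in for it here.
def persist_event(event_data):
    print(event_data.get('uuid'))

mgr = IsolatedManager(persist_event, canceled_callback=lambda: False)

# Job execution path: ship the payload, poll until done, then clean up.
status, rc = mgr.run(job, '/tmp/awx_42_xyz', playbook='site.yml',
                     module=None, module_args=None, ident='42')
mgr.cleanup()

# Periodic heartbeat path: refresh capacity facts for every isolated node.
mgr.health_check(isolated_instances)
```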
@@ -10,7 +10,6 @@ from datetime import datetime as dt

 from django.core.management.base import BaseCommand
 from django.db import connection
-from django.db.models import Q
 from django.db.migrations.executor import MigrationExecutor

 from awx.main.analytics.broadcast_websocket import (
@@ -140,8 +139,7 @@ class Command(BaseCommand):
                 data[family.name] = family.samples[0].value

         me = Instance.objects.me()
-        # TODO: drop the isolated groups exclusion when the model is updated
-        hostnames = [i.hostname for i in Instance.objects.exclude(Q(hostname=me.hostname) | Q(rampart_groups__controller__isnull=False))]
+        hostnames = [i.hostname for i in Instance.objects.exclude(hostname=me.hostname)]

         host_stats = Command.get_connection_status(me, hostnames, data)
         lines = Command._format_lines(host_stats)

@@ -155,9 +155,6 @@ class InstanceManager(models.Manager):
         # NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
         return "tower"

-    def all_non_isolated(self):
-        return self.exclude(rampart_groups__controller__isnull=False)
-

 class InstanceGroupManager(models.Manager):
     """A custom manager class for the Instance model.
@@ -33,7 +33,7 @@ import subprocess
 from django.conf import settings
 from django.db import transaction, DatabaseError, IntegrityError, ProgrammingError, connection
 from django.db.models.fields.related import ForeignKey
-from django.utils.timezone import now, timedelta
+from django.utils.timezone import now
 from django.utils.encoding import smart_str
 from django.contrib.auth.models import User
 from django.utils.translation import ugettext_lazy as _, gettext_noop
@@ -84,7 +84,6 @@ from awx.main.models import (
 from awx.main.constants import ACTIVE_STATES
 from awx.main.exceptions import AwxTaskError, PostRunError
 from awx.main.queue import CallbackQueueDispatcher
-from awx.main.isolated import manager as isolated_manager
 from awx.main.dispatch.publish import task
 from awx.main.dispatch import get_local_queuename, reaper
 from awx.main.utils import (
@@ -170,8 +169,6 @@ def dispatch_startup():
     #
     apply_cluster_membership_policies()
     cluster_node_heartbeat()
-    if Instance.objects.me().is_controller():
-        awx_isolated_heartbeat()
     Metrics().clear_values()

     # Update Tower's rsyslog.conf file based on loggins settings in the db
@@ -205,13 +202,8 @@ def apply_cluster_membership_policies():
     started_compute = time.time()
     all_instances = list(Instance.objects.order_by('id'))
     all_groups = list(InstanceGroup.objects.prefetch_related('instances'))
-    iso_hostnames = set([])
-    for ig in all_groups:
-        if ig.controller_id is not None:
-            iso_hostnames.update(ig.policy_instance_list)
-
-    considered_instances = [inst for inst in all_instances if inst.hostname not in iso_hostnames]
-    total_instances = len(considered_instances)
+    total_instances = len(all_instances)
     actual_groups = []
     actual_instances = []
     Group = namedtuple('Group', ['obj', 'instances', 'prior_instances'])
@@ -232,18 +224,12 @@ def apply_cluster_membership_policies():
         if group_actual.instances:
             logger.debug("Policy List, adding Instances {} to Group {}".format(group_actual.instances, ig.name))

-        if ig.controller_id is None:
-            actual_groups.append(group_actual)
-        else:
-            # For isolated groups, _only_ apply the policy_instance_list
-            # do not add to in-memory list, so minimum rules not applied
-            logger.debug('Committing instances to isolated group {}'.format(ig.name))
-            ig.instances.set(group_actual.instances)
+        actual_groups.append(group_actual)

     # Process Instance minimum policies next, since it represents a concrete lower bound to the
     # number of instances to make available to instance groups
-    actual_instances = [Node(obj=i, groups=[]) for i in considered_instances if i.managed_by_policy]
-    logger.debug("Total non-isolated instances:{} available for policy: {}".format(total_instances, len(actual_instances)))
+    actual_instances = [Node(obj=i, groups=[]) for i in all_instances if i.managed_by_policy]
+    logger.debug("Total instances: {}, available for policy: {}".format(total_instances, len(actual_instances)))
     for g in sorted(actual_groups, key=lambda x: len(x.instances)):
         policy_min_added = []
         for i in sorted(actual_instances, key=lambda x: len(x.groups)):
@@ -285,7 +271,7 @@ def apply_cluster_membership_policies():
         logger.debug('Cluster policy no-op finished in {} seconds'.format(time.time() - started_compute))
         return

-    # On a differential basis, apply instances to non-isolated groups
+    # On a differential basis, apply instances to groups
     with transaction.atomic():
         for g in actual_groups:
             if g.obj.is_container_group:
@@ -419,7 +405,7 @@ def cleanup_execution_environment_images():
 def cluster_node_heartbeat():
     logger.debug("Cluster node heartbeat task.")
     nowtime = now()
-    instance_list = list(Instance.objects.all_non_isolated())
+    instance_list = list(Instance.objects.all())
     this_inst = None
     lost_instances = []

@@ -503,30 +489,6 @@ def awx_k8s_reaper():
                 logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))


-@task(queue=get_local_queuename)
-def awx_isolated_heartbeat():
-    local_hostname = settings.CLUSTER_HOST_ID
-    logger.debug("Controlling node checking for any isolated management tasks.")
-    poll_interval = settings.AWX_ISOLATED_PERIODIC_CHECK
-    # Get isolated instances not checked since poll interval - some buffer
-    nowtime = now()
-    accept_before = nowtime - timedelta(seconds=(poll_interval - 10))
-    isolated_instance_qs = Instance.objects.filter(
-        rampart_groups__controller__instances__hostname=local_hostname,
-    )
-    isolated_instance_qs = isolated_instance_qs.filter(last_isolated_check__lt=accept_before) | isolated_instance_qs.filter(last_isolated_check=None)
-    # Fast pass of isolated instances, claiming the nodes to update
-    with transaction.atomic():
-        for isolated_instance in isolated_instance_qs:
-            isolated_instance.last_isolated_check = nowtime
-            # Prevent modified time from being changed, as in normal heartbeat
-            isolated_instance.save(update_fields=['last_isolated_check'])
-    # Slow pass looping over isolated IGs and their isolated instances
-    if len(isolated_instance_qs) > 0:
-        logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
-        isolated_manager.IsolatedManager(CallbackQueueDispatcher.dispatch).health_check(isolated_instance_qs)
-
-
 @task(queue=get_local_queuename)
 def awx_periodic_scheduler():
     with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
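The removed `awx_isolated_heartbeat` followed a claim-then-process pattern: a fast transactional pass bumps `last_isolated_check` to claim nodes, then the slow health check runs, so two controllers never double-manage the same instances. The selection window subtracts a 10-second buffer from the poll interval so a node whose last check landed just inside the interval is not skipped for a whole extra period. A minimal sketch of that window arithmetic (the interval value is illustrative, not an AWX default):

```python
from datetime import datetime, timedelta

poll_interval = 30  # seconds; stands in for settings.AWX_ISOLATED_PERIODIC_CHECK
nowtime = datetime.now()

# Anything not checked in the last (poll_interval - 10) seconds is due.
accept_before = nowtime - timedelta(seconds=(poll_interval - 10))

def needs_check(last_isolated_check):
    # None means the node has never been checked, so it is always claimed
    return last_isolated_check is None or last_isolated_check < accept_before

print(needs_check(None))                                         # True
print(needs_check(nowtime - timedelta(seconds=5)))               # False
print(needs_check(nowtime - timedelta(seconds=poll_interval)))   # True
```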
@@ -1017,7 +979,7 @@ class BaseTask(object):
         else:
             env['PATH'] = os.path.join(settings.AWX_VENV_PATH, "bin")

-    def build_env(self, instance, private_data_dir, isolated, private_data_files=None):
+    def build_env(self, instance, private_data_dir, private_data_files=None):
         """
         Build environment dictionary for ansible-playbook.
         """
@@ -1138,7 +1100,7 @@ class BaseTask(object):
         """
         instance.log_lifecycle("post_run")

-    def final_run_hook(self, instance, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
+    def final_run_hook(self, instance, status, private_data_dir, fact_modification_times):
         """
         Hook for any steps to run after job/task is marked as complete.
         """
@@ -1280,16 +1242,6 @@ class BaseTask(object):
                 job_env[k] = v
         self.instance = self.update_model(self.instance.pk, job_args=json.dumps(runner_config.command), job_cwd=runner_config.cwd, job_env=job_env)

-    def check_handler(self, config):
-        """
-        IsolatedManager callback triggered by the repeated checks of the isolated node
-        """
-        job_env = build_safe_env(config['env'])
-        for k, v in self.safe_cred_env.items():
-            if k in job_env:
-                job_env[k] = v
-        self.instance = self.update_model(self.instance.pk, job_args=json.dumps(config['command']), job_cwd=config['cwd'], job_env=job_env)
-
     @with_path_cleanup
     def run(self, pk, **kwargs):
         """
@@ -1317,7 +1269,6 @@ class BaseTask(object):
         self.safe_env = {}
         self.safe_cred_env = {}
         private_data_dir = None
-        isolated_manager_instance = None

         # store a reference to the parent workflow job (if any) so we can include
         # it in event data JSON
@@ -1325,7 +1276,6 @@ class BaseTask(object):
             self.parent_workflow_job_id = self.instance.get_workflow_job().id

         try:
-            isolated = self.instance.is_isolated()
             self.instance.send_notification_templates("running")
             private_data_dir = self.build_private_data_dir(self.instance)
             self.pre_run_hook(self.instance, private_data_dir)
@@ -1359,7 +1309,7 @@ class BaseTask(object):
             passwords = self.build_passwords(self.instance, kwargs)
             self.build_extra_vars_file(self.instance, private_data_dir)
             args = self.build_args(self.instance, private_data_dir, passwords)
-            env = self.build_env(self.instance, private_data_dir, isolated, private_data_files=private_data_files)
+            env = self.build_env(self.instance, private_data_dir, private_data_files=private_data_files)
             self.safe_env = build_safe_env(env)

             credentials = self.build_credentials_list(self.instance)
@@ -1460,7 +1410,7 @@ class BaseTask(object):
         self.instance = self.update_model(pk, status=status, emitted_events=self.event_ct, **extra_update_fields)

         try:
-            self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times, isolated_manager_instance=isolated_manager_instance)
+            self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times)
         except Exception:
             logger.exception('{} Final run hook errored.'.format(self.instance.log_format))

@@ -1545,11 +1495,11 @@ class RunJob(BaseTask):

         return passwords

-    def build_env(self, job, private_data_dir, isolated=False, private_data_files=None):
+    def build_env(self, job, private_data_dir, private_data_files=None):
         """
         Build environment dictionary for ansible-playbook.
         """
-        env = super(RunJob, self).build_env(job, private_data_dir, isolated=isolated, private_data_files=private_data_files)
+        env = super(RunJob, self).build_env(job, private_data_dir, private_data_files=private_data_files)
         if private_data_files is None:
             private_data_files = {}
         # Set environment variables needed for inventory and job event
@@ -1560,10 +1510,9 @@ class RunJob(BaseTask):
         env['PROJECT_REVISION'] = job.project.scm_revision
         env['ANSIBLE_RETRY_FILES_ENABLED'] = "False"
         env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA)
-        if not isolated:
-            if hasattr(settings, 'AWX_ANSIBLE_CALLBACK_PLUGINS') and settings.AWX_ANSIBLE_CALLBACK_PLUGINS:
-                env['ANSIBLE_CALLBACK_PLUGINS'] = ':'.join(settings.AWX_ANSIBLE_CALLBACK_PLUGINS)
-            env['AWX_HOST'] = settings.TOWER_URL_BASE
+        if hasattr(settings, 'AWX_ANSIBLE_CALLBACK_PLUGINS') and settings.AWX_ANSIBLE_CALLBACK_PLUGINS:
+            env['ANSIBLE_CALLBACK_PLUGINS'] = ':'.join(settings.AWX_ANSIBLE_CALLBACK_PLUGINS)
+        env['AWX_HOST'] = settings.TOWER_URL_BASE

         # Create a directory for ControlPath sockets that is unique to each job
         cp_dir = os.path.join(private_data_dir, 'cp')
@@ -1794,9 +1743,6 @@ class RunJob(BaseTask):
         if sync_needs:
             pu_ig = job.instance_group
             pu_en = job.execution_node
-            if job.is_isolated() is True:
-                pu_ig = pu_ig.controller
-                pu_en = settings.CLUSTER_HOST_ID

             sync_metafields = dict(
                 launch_type="sync",
@@ -1851,7 +1797,7 @@ class RunJob(BaseTask):
             # ran inside of the event saving code
             update_smart_memberships_for_inventory(job.inventory)

-    def final_run_hook(self, job, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
+    def final_run_hook(self, job, status, private_data_dir, fact_modification_times):
         super(RunJob, self).final_run_hook(job, status, private_data_dir, fact_modification_times)
         if not private_data_dir:
             # If there's no private data dir, that means we didn't get into the
@@ -1863,8 +1809,6 @@ class RunJob(BaseTask):
             os.path.join(private_data_dir, 'artifacts', 'fact_cache'),
             fact_modification_times,
         )
-        if isolated_manager_instance and not job.is_container_group_task:
-            isolated_manager_instance.cleanup()

         try:
             inventory = job.inventory
@@ -1928,11 +1872,11 @@ class RunProjectUpdate(BaseTask):
             passwords['scm_password'] = project_update.credential.get_input('password', default='')
         return passwords

-    def build_env(self, project_update, private_data_dir, isolated=False, private_data_files=None):
+    def build_env(self, project_update, private_data_dir, private_data_files=None):
         """
         Build environment dictionary for ansible-playbook.
         """
-        env = super(RunProjectUpdate, self).build_env(project_update, private_data_dir, isolated=isolated, private_data_files=private_data_files)
+        env = super(RunProjectUpdate, self).build_env(project_update, private_data_dir, private_data_files=private_data_files)
         env['ANSIBLE_RETRY_FILES_ENABLED'] = str(False)
         env['ANSIBLE_ASK_PASS'] = str(False)
         env['ANSIBLE_BECOME_ASK_PASS'] = str(False)
@@ -2387,14 +2331,14 @@ class RunInventoryUpdate(BaseTask):
         injector = InventorySource.injectors[inventory_update.source]()
         return injector.build_private_data(inventory_update, private_data_dir)

-    def build_env(self, inventory_update, private_data_dir, isolated, private_data_files=None):
+    def build_env(self, inventory_update, private_data_dir, private_data_files=None):
         """Build environment dictionary for ansible-inventory.

         Most environment variables related to credentials or configuration
         are accomplished by the inventory source injectors (in this method)
         or custom credential type injectors (in main run method).
         """
-        env = super(RunInventoryUpdate, self).build_env(inventory_update, private_data_dir, isolated, private_data_files=private_data_files)
+        env = super(RunInventoryUpdate, self).build_env(inventory_update, private_data_dir, private_data_files=private_data_files)

         if private_data_files is None:
             private_data_files = {}
@@ -2711,11 +2655,11 @@ class RunAdHocCommand(BaseTask):
             passwords[field] = value
         return passwords

-    def build_env(self, ad_hoc_command, private_data_dir, isolated=False, private_data_files=None):
+    def build_env(self, ad_hoc_command, private_data_dir, private_data_files=None):
         """
         Build environment dictionary for ansible.
         """
-        env = super(RunAdHocCommand, self).build_env(ad_hoc_command, private_data_dir, isolated=isolated, private_data_files=private_data_files)
+        env = super(RunAdHocCommand, self).build_env(ad_hoc_command, private_data_dir, private_data_files=private_data_files)
         # Set environment variables needed for inventory and ad hoc event
         # callbacks to work.
         env['AD_HOC_COMMAND_ID'] = str(ad_hoc_command.pk)
@@ -2821,11 +2765,6 @@ class RunAdHocCommand(BaseTask):
             d[r'Password:\s*?$'] = 'ssh_password'
         return d

-    def final_run_hook(self, adhoc_job, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
-        super(RunAdHocCommand, self).final_run_hook(adhoc_job, status, private_data_dir, fact_modification_times)
-        if isolated_manager_instance:
-            isolated_manager_instance.cleanup()
-

 @task(queue=get_local_queuename)
 class RunSystemJob(BaseTask):
@@ -2867,8 +2806,8 @@ class RunSystemJob(BaseTask):
         os.chmod(path, stat.S_IRUSR)
         return path

-    def build_env(self, instance, private_data_dir, isolated=False, private_data_files=None):
-        base_env = super(RunSystemJob, self).build_env(instance, private_data_dir, isolated=isolated, private_data_files=private_data_files)
+    def build_env(self, instance, private_data_dir, private_data_files=None):
+        base_env = super(RunSystemJob, self).build_env(instance, private_data_dir, private_data_files=private_data_files)
         # TODO: this is able to run by turning off isolation
         # the goal is to run it a container instead
         env = dict(os.environ.items())

@@ -645,102 +645,6 @@ class TestAdhocRun(TestJobExecution):
         assert extra_vars['awx_user_name'] == "angry-spud"


-@pytest.mark.skip(reason="Isolated code path needs updating after runner integration")
-class TestIsolatedExecution(TestJobExecution):
-
-    ISOLATED_HOST = 'some-isolated-host'
-    ISOLATED_CONTROLLER_HOST = 'some-isolated-controller-host'
-
-    @pytest.fixture
-    def job(self):
-        job = Job(pk=1, id=1, project=Project(), inventory=Inventory(), job_template=JobTemplate(id=1, name='foo'))
-        job.controller_node = self.ISOLATED_CONTROLLER_HOST
-        job.execution_node = self.ISOLATED_HOST
-        return job
-
-    def test_with_ssh_credentials(self, job):
-        ssh = CredentialType.defaults['ssh']()
-        credential = Credential(pk=1, credential_type=ssh, inputs={'username': 'bob', 'password': 'secret', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
-        credential.inputs['password'] = encrypt_field(credential, 'password')
-        job.credentials.add(credential)
-
-        private_data = tempfile.mkdtemp(prefix='awx_')
-        self.task.build_private_data_dir = mock.Mock(return_value=private_data)
-
-        def _mock_job_artifacts(*args, **kw):
-            artifacts = os.path.join(private_data, 'artifacts')
-            if not os.path.exists(artifacts):
-                os.makedirs(artifacts)
-            if 'run_isolated.yml' in args[0]:
-                for filename, data in (
-                    ['status', 'successful'],
-                    ['rc', '0'],
-                    ['stdout', 'IT WORKED!'],
-                ):
-                    with open(os.path.join(artifacts, filename), 'w') as f:
-                        f.write(data)
-            return ('successful', 0)
-
-        self.run_pexpect.side_effect = _mock_job_artifacts
-        self.task.run(self.pk)
-
-        playbook_run = self.run_pexpect.call_args_list[0][0]
-        assert ' '.join(playbook_run[0]).startswith(
-            ' '.join(
-                [
-                    'ansible-playbook',
-                    'run_isolated.yml',
-                    '-u',
-                    settings.AWX_ISOLATED_USERNAME,
-                    '-T',
-                    str(settings.AWX_ISOLATED_CONNECTION_TIMEOUT),
-                    '-i',
-                    self.ISOLATED_HOST + ',',
-                    '-e',
-                ]
-            )
-        )
-        extra_vars = playbook_run[0][playbook_run[0].index('-e') + 1]
-        extra_vars = json.loads(extra_vars)
-        assert extra_vars['dest'] == '/tmp'
-        assert extra_vars['src'] == private_data
-
-    def test_systemctl_failure(self):
-        # If systemctl fails, read the contents of `artifacts/systemctl_logs`
-        mock_get = mock.Mock()
-        ssh = CredentialType.defaults['ssh']()
-        credential = Credential(
-            pk=1,
-            credential_type=ssh,
-            inputs={
-                'username': 'bob',
-            },
-        )
-        self.instance.credentials.add(credential)
-
-        private_data = tempfile.mkdtemp(prefix='awx_')
-        self.task.build_private_data_dir = mock.Mock(return_value=private_data)
-        inventory = json.dumps({"all": {"hosts": ["localhost"]}})
-
-        def _mock_job_artifacts(*args, **kw):
-            artifacts = os.path.join(private_data, 'artifacts')
-            if not os.path.exists(artifacts):
-                os.makedirs(artifacts)
-            if 'run_isolated.yml' in args[0]:
-                for filename, data in (['daemon.log', 'ERROR IN RUN.PY'],):
-                    with open(os.path.join(artifacts, filename), 'w') as f:
-                        f.write(data)
-            return ('successful', 0)
-
-        self.run_pexpect.side_effect = _mock_job_artifacts
-
-        with mock.patch('time.sleep'):
-            with mock.patch('requests.get') as mock_get:
-                mock_get.return_value = mock.Mock(content=inventory)
-                with pytest.raises(Exception):
-                    self.task.run(self.pk, self.ISOLATED_HOST)
-
-
 class TestJobCredentials(TestJobExecution):
     @pytest.fixture
     def job(self, execution_environment):
@@ -1625,7 +1529,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

         private_data_files = task.build_private_data_files(inventory_update, private_data_dir)
-        env = task.build_env(inventory_update, private_data_dir, False, private_data_files)
+        env = task.build_env(inventory_update, private_data_dir, private_data_files)

         assert 'AWS_ACCESS_KEY_ID' not in env
         assert 'AWS_SECRET_ACCESS_KEY' not in env
@@ -1645,7 +1549,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

         private_data_files = task.build_private_data_files(inventory_update, private_data_dir)
-        env = task.build_env(inventory_update, private_data_dir, False, private_data_files)
+        env = task.build_env(inventory_update, private_data_dir, private_data_files)

         safe_env = build_safe_env(env)

@@ -1669,7 +1573,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

         private_data_files = task.build_private_data_files(inventory_update, private_data_dir)
-        env = task.build_env(inventory_update, private_data_dir, False, private_data_files)
+        env = task.build_env(inventory_update, private_data_dir, private_data_files)

         safe_env = {}
         credentials = task.build_credentials_list(inventory_update)
@@ -1706,7 +1610,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

         private_data_files = task.build_private_data_files(inventory_update, private_data_dir)
-        env = task.build_env(inventory_update, private_data_dir, False, private_data_files)
+        env = task.build_env(inventory_update, private_data_dir, private_data_files)

         safe_env = build_safe_env(env)

@@ -1736,7 +1640,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

         private_data_files = task.build_private_data_files(inventory_update, private_data_dir)
-        env = task.build_env(inventory_update, private_data_dir, False, private_data_files)
+        env = task.build_env(inventory_update, private_data_dir, private_data_files)

         safe_env = build_safe_env(env)

@@ -1763,7 +1667,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):

         def run(expected_gce_zone):
             private_data_files = task.build_private_data_files(inventory_update, private_data_dir)
-            env = task.build_env(inventory_update, private_data_dir, False, private_data_files)
+            env = task.build_env(inventory_update, private_data_dir, private_data_files)
             safe_env = {}
             credentials = task.build_credentials_list(inventory_update)
             for credential in credentials:
@@ -1797,7 +1701,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

         private_data_files = task.build_private_data_files(inventory_update, private_data_dir)
-        env = task.build_env(inventory_update, private_data_dir, False, private_data_files)
+        env = task.build_env(inventory_update, private_data_dir, private_data_files)

         path = os.path.join(private_data_dir, os.path.basename(env['OS_CLIENT_CONFIG_FILE']))
         shade_config = open(path, 'r').read()
@@ -1832,7 +1736,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

         private_data_files = task.build_private_data_files(inventory_update, private_data_dir)
-        env = task.build_env(inventory_update, private_data_dir, False, private_data_files)
+        env = task.build_env(inventory_update, private_data_dir, private_data_files)
         safe_env = build_safe_env(env)

         assert env["FOREMAN_SERVER"] == "https://example.org"
@@ -1856,7 +1760,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         inventory_update.get_cloud_credential = get_cred
         inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

-        env = task.build_env(inventory_update, private_data_dir, False)
+        env = task.build_env(inventory_update, private_data_dir)

         safe_env = build_safe_env(env)

@@ -1888,7 +1792,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         inventory_update.get_cloud_credential = get_cred
         inventory_update.get_extra_credentials = mocker.Mock(return_value=[])

-        env = task.build_env(inventory_update, private_data_dir, False)
+        env = task.build_env(inventory_update, private_data_dir)
         safe_env = {}
         credentials = task.build_credentials_list(inventory_update)
         for credential in credentials:
@@ -1919,7 +1823,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
         settings.AWX_TASK_ENV = {'FOO': 'BAR'}

         private_data_files = task.build_private_data_files(inventory_update, private_data_dir)
-        env = task.build_env(inventory_update, private_data_dir, False, private_data_files)
+        env = task.build_env(inventory_update, private_data_dir, private_data_files)

         assert env['FOO'] == 'BAR'

@@ -32,14 +32,7 @@ def unwrap_broadcast_msg(payload: dict):

 def get_broadcast_hosts():
     Instance = apps.get_model('main', 'Instance')
-    instances = (
-        # TODO: no longer filter for non-isolated after the models change
-        Instance.objects.filter(rampart_groups__controller__isnull=True)
-        .exclude(hostname=Instance.objects.me().hostname)
-        .order_by('hostname')
-        .values('hostname', 'ip_address')
-        .distinct()
-    )
+    instances = Instance.objects.exclude(hostname=Instance.objects.me().hostname).order_by('hostname').values('hostname', 'ip_address').distinct()
     return {i['hostname']: i['ip_address'] or i['hostname'] for i in instances}

@@ -1,70 +0,0 @@
----
-# The following variables will be set by the runner of this playbook:
-# src: /tmp/some/path/private_data_dir/
-
-- name: Poll for status of active job.
-  hosts: all
-  gather_facts: false
-  collections:
-    - ansible.posix
-
-  tasks:
-    - name: "Output job the playbook is running for"
-      debug:
-        msg: "Checking on job {{ job_id }}"
-
-    - name: Determine if daemon process is alive.
-      shell: "ansible-runner is-alive {{src}}"
-      register: is_alive
-      ignore_errors: true
-
-    - name: Copy artifacts from the isolated host.
-      synchronize:
-        src: "{{src}}/artifacts/"
-        dest: "{{src}}/artifacts/"
-        mode: pull
-        delete: true
-        recursive: true
-      when: ansible_kubectl_config is not defined
-
-    - name: Copy daemon log from the isolated host
-      synchronize:
-        src: "{{src}}/daemon.log"
-        dest: "{{src}}/daemon.log"
-        mode: pull
-      when: ansible_kubectl_config is not defined
-
-    - name: Copy artifacts from pod
-      synchronize:
-        src: "{{src}}/artifacts/"
-        dest: "{{src}}/artifacts/"
-        mode: pull
-        delete: true
-        recursive: true
-        set_remote_user: false
-        rsync_opts:
-          - "--blocking-io"
-          - "--rsh=$RSH"
-      environment:
-        RSH: "oc rsh --config={{ ansible_kubectl_config }}"
-      delegate_to: localhost
-      when: ansible_kubectl_config is defined
-
-    - name: Copy daemon log from pod
-      synchronize:
-        src: "{{src}}/daemon.log"
-        dest: "{{src}}/daemon.log"
-        mode: pull
-        set_remote_user: false
-        rsync_opts:
-          - "--blocking-io"
-          - "--rsh=$RSH"
-      environment:
-        RSH: "oc rsh --config={{ ansible_kubectl_config }}"
-      delegate_to: localhost
-      when: ansible_kubectl_config is defined
-
-    - name: Fail if previous check determined that process is not alive.
-      fail:
-        msg: "isolated task is still running"
-      when: "is_alive.rc == 0"
@@ -1,31 +0,0 @@
----
-
-# The following variables will be set by the runner of this playbook:
-# cleanup_dirs: ['/tmp/path/private_data_dir/', '/tmp//path/proot_temp_dir/']
-# private_data_dir: '/tmp/path/private_data_dir/'
-
-- name: Clean up from isolated job run.
-  hosts: all
-  gather_facts: false
-
-  tasks:
-
-    - name: cancel the job
-      command: "ansible-runner stop {{private_data_dir}}"
-      ignore_errors: true
-
-    - name: remove build artifacts
-      file:
-        path: '{{item}}'
-        state: absent
-      register: result
-      with_items: "{{cleanup_dirs}}"
-      until: result is succeeded
-      ignore_errors: true
-      retries: 3
-      delay: 5
-
-    - name: fail if build artifacts were not cleaned
-      fail:
-        msg: 'Unable to cleanup build artifacts'
-      when: not result is succeeded
@@ -1,12 +0,0 @@
----
-- name: Periodic background status check of isolated instances.
-  hosts: all
-  gather_facts: false
-
-  tasks:
-
-    - name: Get capacity of the instance
-      awx_capacity:
-
-    - name: Remove any stale temporary files
-      awx_isolated_cleanup:
@@ -1,61 +0,0 @@
----
-
-# The following variables will be set by the runner of this playbook:
-# src: /tmp/some/path/private_data_dir
-# dest: /tmp/some/path/
-
-- name: Prepare data, dispatch job in isolated environment.
-  hosts: all
-  gather_facts: false
-  vars:
-    secret: "{{ lookup('pipe', 'cat ' + src + '/env/ssh_key') }}"
-  collections:
-    - ansible.posix
-
-  tasks:
-    - name: "Output job the playbook is running for"
-      debug:
-        msg: "Checking on job {{ job_id }}"
-
-    - name: synchronize job environment with isolated host
-      synchronize:
-        copy_links: true
-        src: "{{ src }}"
-        dest: "{{ dest }}"
-      when: ansible_kubectl_config is not defined
-
-    - name: synchronize job environment with remote job container
-      synchronize:
-        copy_links: true
-        src: "{{ src }}"
-        dest: "{{ dest }}"
-        set_remote_user: false
-        rsync_opts:
-          - "--blocking-io"
-          - "--rsh=$RSH"
-      environment:
-        RSH: "oc rsh --config={{ ansible_kubectl_config }}"
-      delegate_to: localhost
-      when: ansible_kubectl_config is defined
-
-    - local_action: stat path="{{src}}/env/ssh_key"
-      register: key
-
-    - name: create a named pipe for secret environment data
-      command: "mkfifo {{src}}/env/ssh_key"
-      when: key.stat.exists
-
-    - name: spawn the playbook
-      command: "ansible-runner start {{src}} -p '{{playbook}}' -i {{ident}}"
-      when: playbook is defined
-
-    - name: spawn the adhoc command
-      command: "ansible-runner start {{src}} -m {{module}} -a {{module_args}} -i {{ident}}"
-      when: module is defined
-
-    - name: write the secret environment data
-      mkfifo:
-        content: "{{secret}}"
-        path: "{{src}}/env/ssh_key"
-      when: key.stat.exists
-      no_log: true
@@ -1,73 +0,0 @@
-# Copyright (c) 2017 Ansible by Red Hat
-#
-# This file is part of Ansible Tower, but depends on code imported from Ansible.
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-from ansible.module_utils._text import to_text
-from ansible.module_utils.basic import AnsibleModule
-
-import subprocess
-import os
-import psutil
-
-
-def get_cpu_capacity():
-    env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None)
-    cpu = psutil.cpu_count()
-
-    if env_forkcpu:
-        forkcpu = int(env_forkcpu)
-    else:
-        forkcpu = 4
-    return (cpu, cpu * forkcpu)
-
-
-def get_mem_capacity():
-    env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None)
-    if env_forkmem:
-        forkmem = int(env_forkmem)
-    else:
-        forkmem = 100
-
-    mem = psutil.virtual_memory().total
-    return (mem, max(1, ((mem / 1024 / 1024) - 2048) / forkmem))
-
-
-def main():
-    module = AnsibleModule(argument_spec=dict())
-
-    ar = module.get_bin_path('ansible-runner', required=True)
-
-    try:
-        version = subprocess.check_output([ar, '--version'], stderr=subprocess.STDOUT).strip()
-    except subprocess.CalledProcessError as e:
-        module.fail_json(msg=to_text(e))
-        return
-    # NOTE: Duplicated with awx.main.utils.common capacity utilities
-    cpu, capacity_cpu = get_cpu_capacity()
-    mem, capacity_mem = get_mem_capacity()
-
-    # Module never results in a change
-    module.exit_json(
-        changed=False,
-        capacity_cpu=capacity_cpu,
-        capacity_mem=capacity_mem,
-        version=version,
-        ansible_facts=dict(awx_cpu=cpu, awx_mem=mem, awx_capacity_cpu=capacity_cpu, awx_capacity_mem=capacity_mem, awx_capacity_version=version),
-    )
-
-
-if __name__ == '__main__':
-    main()
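The removed `awx_capacity` module computes CPU capacity as `cpu * SYSTEM_TASK_FORKS_CPU` (default 4 forks per core) and memory capacity as `max(1, (mem_mb - 2048) / SYSTEM_TASK_FORKS_MEM)` (default 100 MB per fork, with 2 GB held back for the system). A quick worked example of those formulas; the host numbers below are illustrative:

```python
# Illustrative numbers: a 4-core host with 16 GiB of RAM.
cpu = 4
mem = 16 * 1024**3  # total memory in bytes, as psutil.virtual_memory().total reports

forkcpu = 4    # default when SYSTEM_TASK_FORKS_CPU is unset
forkmem = 100  # default when SYSTEM_TASK_FORKS_MEM is unset

capacity_cpu = cpu * forkcpu  # 16 forks
# (16384 MB - 2048 MB reserved) / 100 MB per fork = 143.36 forks
capacity_mem = max(1, ((mem / 1024 / 1024) - 2048) / forkmem)

print(capacity_cpu, capacity_mem)  # 16 143.36
```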
@@ -1,68 +0,0 @@
-# Copyright (c) 2017 Ansible by Red Hat
-#
-# This file is part of Ansible Tower, but depends on code imported from Ansible.
-#
-# Ansible is free software: you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# Ansible is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-
-from ansible.module_utils.basic import AnsibleModule
-
-import glob
-import os
-import re
-import shutil
-import datetime
-import subprocess
-
-
-def main():
-    module = AnsibleModule(argument_spec=dict())
-    changed = False
-    paths_removed = set([])
-
-    # If a folder was last modified before this datetime, it will always be deleted
-    folder_cutoff = datetime.datetime.now() - datetime.timedelta(days=7)
-    # If a folder does not have an associated job running and is older than
-    # this datetime, then it will be deleted because its job has finished
-    job_cutoff = datetime.datetime.now() - datetime.timedelta(hours=1)
-
-    for search_pattern in ['/tmp/awx_[0-9]*_*', '/tmp/ansible_runner_pi_*']:
-        for path in glob.iglob(search_pattern):
-            st = os.stat(path)
-            modtime = datetime.datetime.fromtimestamp(st.st_mtime)
-
-            if modtime > job_cutoff:
-                continue
-            elif modtime > folder_cutoff:
-                try:
-                    re_match = re.match(r'\/tmp\/awx_\d+_.+', path)
-                    if re_match is not None:
-                        try:
-                            if subprocess.check_call(['ansible-runner', 'is-alive', path]) == 0:
-                                continue
-                        except subprocess.CalledProcessError:
-                            # the job isn't running anymore, clean up this path
-                            module.debug('Deleting path {} its job has completed.'.format(path))
-                except (ValueError, IndexError):
-                    continue
-            else:
-                module.debug('Deleting path {} because modification date is too old.'.format(path))
-            changed = True
-            paths_removed.add(path)
-            shutil.rmtree(path)
-
-    module.exit_json(changed=changed, paths_removed=list(paths_removed))
-
-
-if __name__ == '__main__':
-    main()
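The removed cleanup module uses two cutoffs: directories younger than one hour are always kept (the job may still be starting), directories older than seven days are always deleted, and anything in between is deleted only if `ansible-runner is-alive` says its job has finished. A standalone sketch of that decision, with the helper name and `job_alive` flag being illustrative stand-ins for the module's subprocess check:

```python
import datetime

now = datetime.datetime.now()
folder_cutoff = now - datetime.timedelta(days=7)  # older than this: always delete
job_cutoff = now - datetime.timedelta(hours=1)    # younger than this: always keep

def decide(modtime, job_alive):
    """Classify a temp dir the way the removed module does (illustrative helper)."""
    if modtime > job_cutoff:
        return 'keep: too recent, job may still be starting'
    if modtime > folder_cutoff:
        return 'keep: job alive' if job_alive else 'delete: job finished'
    return 'delete: older than 7 days'

print(decide(now - datetime.timedelta(minutes=5), job_alive=True))  # keep: too recent
print(decide(now - datetime.timedelta(hours=2), job_alive=False))   # delete: job finished
print(decide(now - datetime.timedelta(days=8), job_alive=True))     # delete: older than 7 days
```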
@@ -1,31 +0,0 @@
-import os
-import stat
-
-from ansible.module_utils.basic import AnsibleModule
-
-
-#
-# the purpose of this plugin is to call mkfifo and
-# write raw SSH key data into the fifo created on the remote isolated host
-#
-
-
-def main():
-    module = AnsibleModule(argument_spec={'path': {'required': True, 'type': 'str'}, 'content': {'required': True, 'type': 'str'}}, supports_check_mode=False)
-
-    path = module.params['path']
-    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
-    with open(path, 'w') as fifo:
-        data = module.params['content']
-        if 'OPENSSH PRIVATE KEY' in data and not data.endswith('\n'):
-            # we use ansible's lookup() to read this file from the disk,
-            # but ansible's lookup() *strips* newlines
-            # OpenSSH wants certain private keys to end with a newline (or it
-            # won't accept them)
-            data += '\n'
-        fifo.write(data)
-    module.exit_json(dest=path, changed=True)
-
-
-if __name__ == '__main__':
-    main()
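The FIFO handoff this module participated in — `run_isolated.yml` creates the pipe, ansible-runner reads the key from it, and the `mkfifo` module writes it — can be sketched in plain Python. Everything below is illustrative (the path is hypothetical; AWX used `{src}/env/ssh_key`, and the reader thread stands in for ansible-runner):

```python
import os
import stat
import threading

fifo_path = '/tmp/demo_ssh_key'  # hypothetical path for the demo
os.mkfifo(fifo_path)
os.chmod(fifo_path, stat.S_IRUSR | stat.S_IWUSR)

def reader():
    # stands in for ansible-runner, which reads the key exactly once
    with open(fifo_path, 'r') as f:
        print('reader got %d bytes' % len(f.read()))

# opening a FIFO for writing blocks until a reader connects, which is why the
# playbook spawned ansible-runner *before* writing the secret
t = threading.Thread(target=reader)
t.start()

with open(fifo_path, 'w') as fifo:
    fifo.write('-----BEGIN OPENSSH PRIVATE KEY-----\n...\n-----END OPENSSH PRIVATE KEY-----\n')

t.join()
os.unlink(fifo_path)
```

Because nothing ever lands on disk, the key cannot be recovered from the isolated host's filesystem after the single read.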
@@ -126,7 +126,7 @@ LOGIN_URL = '/api/login/'
 PROJECTS_ROOT = '/var/lib/awx/projects/'

 # Absolute filesystem path to the directory to host collections for
-# running inventory imports, isolated playbooks
+# running inventory imports
 AWX_ANSIBLE_COLLECTIONS_PATHS = os.path.join(BASE_DIR, 'vendor', 'awx_ansible_collections')

 # Absolute filesystem path to the directory for job status stdout (default for
@@ -440,7 +440,6 @@ CELERYBEAT_SCHEDULE = {
     'k8s_reaper': {'task': 'awx.main.tasks.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
     'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},
     'cleanup_images': {'task': 'awx.main.tasks.cleanup_execution_environment_images', 'schedule': timedelta(hours=3)},
-    # 'isolated_heartbeat': set up at the end of production.py and development.py
 }

 # Django Caching Configuration
@@ -854,12 +853,6 @@ LOGGING = {
             'filename': os.path.join(LOG_ROOT, 'tower_rbac_migrations.log'),
             'formatter': 'simple',
         },
-        'isolated_manager': {
-            'level': 'WARNING',
-            'class': 'logging.handlers.WatchedFileHandler',
-            'filename': os.path.join(LOG_ROOT, 'isolated_manager.log'),
-            'formatter': 'simple',
-        },
         'job_lifecycle': {
             'level': 'DEBUG',
             'class': 'logging.handlers.WatchedFileHandler',
@@ -881,8 +874,6 @@ LOGGING = {
         'awx.main.dispatch': {'handlers': ['dispatcher']},
         'awx.main.consumers': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'INFO'},
         'awx.main.wsbroadcast': {'handlers': ['wsbroadcast']},
-        'awx.isolated.manager': {'level': 'WARNING', 'handlers': ['console', 'file', 'isolated_manager'], 'propagate': True},
-        'awx.isolated.manager.playbooks': {'handlers': ['management_playbooks'], 'propagate': False},
         'awx.main.commands.inventory_import': {'handlers': ['inventory_import'], 'propagate': False},
         'awx.main.tasks': {'handlers': ['task_system', 'external_logger'], 'propagate': False},
         'awx.main.analytics': {'handlers': ['task_system', 'external_logger'], 'level': 'INFO', 'propagate': False},

@@ -35,9 +35,6 @@ LOGGING['handlers']['console']['()'] = 'awx.main.utils.handlers.ColorHandler'  # noqa
 LOGGING['handlers']['task_system'] = LOGGING['handlers']['console'].copy()  # noqa
 COLOR_LOGS = True

-# Pipe management playbook output to console
-LOGGING['loggers']['awx.isolated.manager.playbooks']['propagate'] = True  # noqa
-
 # celery is annoyingly loud when docker containers start
 LOGGING['loggers'].pop('celery', None)  # noqa
 # avoid awx.main.dispatch WARNING-level scaling worker up/down messages
@@ -136,17 +133,6 @@ if "pytest" in sys.modules:
         }
     }

-
-CELERYBEAT_SCHEDULE.update(
-    {  # noqa
-        'isolated_heartbeat': {
-            'task': 'awx.main.tasks.awx_isolated_heartbeat',
-            'schedule': timedelta(seconds=AWX_ISOLATED_PERIODIC_CHECK),  # noqa
-            'options': {'expires': AWX_ISOLATED_PERIODIC_CHECK * 2},  # noqa
-        }
-    }
-)
-
 CLUSTER_HOST_ID = socket.gethostname()

 AWX_CALLBACK_PROFILE = True

@@ -91,14 +91,4 @@

 # The below runs AFTER all of the custom settings are imported.

-CELERYBEAT_SCHEDULE.update(
-    {  # noqa
-        'isolated_heartbeat': {
-            'task': 'awx.main.tasks.awx_isolated_heartbeat',
-            'schedule': timedelta(seconds=AWX_ISOLATED_PERIODIC_CHECK),  # noqa
-            'options': {'expires': AWX_ISOLATED_PERIODIC_CHECK * 2},  # noqa
-        }
-    }
-)
-
 DATABASES['default'].setdefault('OPTIONS', dict()).setdefault('application_name', f'{CLUSTER_HOST_ID}-{os.getpid()}-{" ".join(sys.argv)}'[:63])  # noqa

@@ -58,7 +58,6 @@ LOGGING['loggers']['django_auth_ldap']['handlers'] = ['console']
 LOGGING['loggers']['social']['handlers'] = ['console']
 LOGGING['loggers']['system_tracking_migrations']['handlers'] = ['console']
 LOGGING['loggers']['rbac_migrations']['handlers'] = ['console']
-LOGGING['loggers']['awx.isolated.manager.playbooks']['handlers'] = ['console']
 LOGGING['handlers']['callback_receiver'] = {'class': 'logging.NullHandler'}
 LOGGING['handlers']['task_system'] = {'class': 'logging.NullHandler'}
 LOGGING['handlers']['tower_warnings'] = {'class': 'logging.NullHandler'}