multi-host isolated heartbeat w tower-isolated check

* use tower-expect command to determine job status when running
  the isolated heartbeat playbook
* grok JSON output of playbook to obtain result information
* run playbook against multiple isolated hosts at the same time
  (addresses scalability concerns)
AlanCoding 2017-06-19 12:13:03 -04:00
parent f371dd71b2
commit 40287d8e78
9 changed files with 84 additions and 110 deletions
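For context, the heartbeat now makes a single ansible-playbook run against every isolated host at once, using a comma-joined inline inventory and the json stdout callback described in the commit message above. Below is a minimal standalone sketch of that flow; the hostnames, remote username, and playbook location are hypothetical, and the real invocation in isolated_manager.py goes through run.run_pexpect with AWX settings.

import json
import os
import subprocess

# Hypothetical isolated instance hostnames; AWX builds this list from Instance records.
isolated_hosts = ['iso-node-1', 'iso-node-2']
# The trailing comma makes ansible treat the string as an inline inventory, not a file path.
inventory = ''.join('{},'.format(h) for h in isolated_hosts)

env = dict(os.environ)
env.update({
    'ANSIBLE_STDOUT_CALLBACK': 'json',        # machine-readable output to parse afterwards
    'ANSIBLE_RETRY_FILES_ENABLED': 'False',
    'ANSIBLE_HOST_KEY_CHECKING': 'False',
})
cmd = ['ansible-playbook', '-u', 'awx', '-i', inventory, 'heartbeat_isolated.yml']

proc = subprocess.Popen(cmd, env=env, stdout=subprocess.PIPE)
output = proc.communicate()[0].decode()
result = json.loads(output[output.find('{'):])    # drop any log lines printed before the JSON
for host in isolated_hosts:
    print(host, result['plays'][0]['tasks'][0]['hosts'].get(host))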

View File

@@ -35,7 +35,7 @@ def main():
cap = 50
cap = 50 + ((int(total_mem_value) / 1024) - 2) * 75
# Module never results in a change and (hopefully) never fails
# Module never results in a change
module.exit_json(changed=False, capacity=cap)
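For reference, the revised formula keeps a floor of 50 and then adds 75 capacity per GiB of memory above 2 GiB. A quick worked example with an illustrative memory value:

total_mem_value = 4096                                   # MB, as reported by `free -m`
if int(total_mem_value) <= 2048:
    cap = 50                                             # small hosts keep the floor of 50
else:
    cap = 50 + ((int(total_mem_value) / 1024) - 2) * 75
print(cap)                                               # 50 + (4 - 2) * 75 = 200 (200.0 under Python 3 division)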

View File

@@ -22,53 +22,48 @@ import os
import re
import shutil
import datetime
import subprocess
def main():
module = AnsibleModule(
argument_spec = dict(
cutoff_pk = dict(required=False, default=0, type='int'),
)
argument_spec = dict()
)
cutoff_pk = module.params.get('cutoff_pk')
changed = False
jobs_removed = set([])
paths_removed = set([])
cutoff_time = datetime.datetime.now() - datetime.timedelta(days=7)
# If a folder was last modified before this datetime, it will always be deleted
folder_cutoff = datetime.datetime.now() - datetime.timedelta(days=7)
# If a folder does not have an associated job running and is older than
# this datetime, then it will be deleted because its job has finished
job_cutoff = datetime.datetime.now() - datetime.timedelta(hours=1)
for search_pattern, extract_pattern in [
('/tmp/ansible_tower/jobs/*', r'\/tmp\/ansible_tower\/jobs\/(?P<job_id>\d+)'),
('/tmp/ansible_tower_*', r'\/tmp\/ansible_tower_(?P<job_id>\d+)_*'),
('/tmp/ansible_tower_proot_*', None),
for search_pattern in [
'/tmp/ansible_tower_[0-9]*_*', '/tmp/ansible_tower_proot_*',
]:
for path in glob.iglob(search_pattern):
st = os.stat(path)
modtime = datetime.datetime.fromtimestamp(st.st_mtime)
if modtime > cutoff_time:
# If job's pk value is lower than threshold, we delete it
if modtime > job_cutoff:
continue
elif modtime > folder_cutoff:
try:
if extract_pattern is None:
continue
re_match = re.match(extract_pattern, path)
if re_match is None:
continue
job_id = int(re_match.group('job_id'))
if job_id >= cutoff_pk:
module.debug('Skipping job {}, which may still be running.'.format(job_id))
continue
re_match = re.match(r'\/tmp\/ansible_tower_\d+_.+', path)
if re_match is not None:
if subprocess.check_call(['tower-expect', 'is-alive', path]) == 0:
continue
else:
module.debug('Deleting path {} because its job has completed.'.format(path))
except (ValueError, IndexError):
continue
else:
module.debug('Deleting path {} because modification date is too old.'.format(path))
job_id = 'unknown'
changed = True
jobs_removed.add(job_id)
if os.path.islink(path):
os.remove(path)
else:
shutil.rmtree(path)
paths_removed.add(path)
shutil.rmtree(path)
module.exit_json(changed=changed, jobs_removed=[j for j in jobs_removed])
module.exit_json(changed=changed, paths_removed=list(paths_removed))
if __name__ == '__main__':
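One note on the liveness probe introduced above: subprocess.check_call raises CalledProcessError on a non-zero exit rather than returning the code, so the == 0 comparison only ever sees success. A defensive wrapper for the same check might look like the sketch below, assuming, as the diff implies, that tower-expect is-alive exits 0 while the job owning that private data directory is still running.

import subprocess

def job_is_alive(path):
    # True when `tower-expect is-alive` exits 0, i.e. the job that owns this
    # temporary directory is still running and the path must be kept.
    try:
        subprocess.check_call(['tower-expect', 'is-alive', path])
        return True
    except subprocess.CalledProcessError:
        return False        # non-zero exit: the job has completed
    except OSError:
        return False        # tower-expect is not available on this host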

View File

@@ -57,9 +57,7 @@ class IsolatedManager(object):
self.cwd = cwd
self.env = env.copy()
# Do not use callbacks for controller's management jobs
self.env['ANSIBLE_CALLBACK_PLUGINS'] = ''
self.env['CALLBACK_QUEUE'] = ''
self.env['CALLBACK_CONNECTION'] = ''
self.env.update(self._base_management_env())
self.stdout_handle = stdout_handle
self.ssh_key_path = ssh_key_path
self.expect_passwords = {k.pattern: v for k, v in expect_passwords.items()}
@@ -71,8 +69,18 @@ class IsolatedManager(object):
self.proot_cmd = proot_cmd
self.started_at = None
@property
def awx_playbook_path(self):
@staticmethod
def _base_management_env():
return {
'ANSIBLE_CALLBACK_PLUGINS': '',
'CALLBACK_QUEUE': '',
'CALLBACK_CONNECTION': '',
'ANSIBLE_RETRY_FILES_ENABLED': 'False',
'ANSIBLE_HOST_KEY_CHECKING': 'False'
}
@classmethod
def awx_playbook_path(cls):
return os.path.join(
os.path.dirname(awx.__file__),
'playbooks'
@@ -134,7 +142,7 @@ class IsolatedManager(object):
buff = StringIO.StringIO()
logger.debug('Starting job on isolated host with `run_isolated.yml` playbook.')
status, rc = run.run_pexpect(
args, self.awx_playbook_path, self.env, buff,
args, self.awx_playbook_path(), self.env, buff,
expect_passwords={
re.compile(r'Secret:\s*?$', re.M): base64.b64encode(json.dumps(secrets))
},
@@ -244,7 +252,7 @@ class IsolatedManager(object):
buff = cStringIO.StringIO()
logger.debug('Checking job on isolated host with `check_isolated.yml` playbook.')
status, rc = run.run_pexpect(
args, self.awx_playbook_path, self.env, buff,
args, self.awx_playbook_path(), self.env, buff,
cancelled_callback=self.cancelled_callback,
idle_timeout=remaining,
job_timeout=remaining,
@@ -295,7 +303,7 @@ class IsolatedManager(object):
logger.debug('Cleaning up job on isolated host with `clean_isolated.yml` playbook.')
buff = cStringIO.StringIO()
status, rc = run.run_pexpect(
args, self.awx_playbook_path, self.env, buff,
args, self.awx_playbook_path(), self.env, buff,
idle_timeout=60, job_timeout=60,
pexpect_timeout=5
)
@@ -304,46 +312,55 @@ class IsolatedManager(object):
# stdout_handle is closed by this point so writing output to logs is our only option
logger.warning('Cleanup from isolated job encountered error, output:\n{}'.format(buff.getvalue()))
@staticmethod
def health_check(instance_qs, cutoff_pk=0):
@classmethod
def health_check(cls, instance_qs):
'''
:param instance_qs: List of Django objects representing the
isolated instances to manage
:param cutoff_pk: Job id of the oldest job still in the running state
Method logic not yet written.
returns the instance's capacity or None if it is not reachable
Runs playbook that will
- determine if instance is reachable
- find the instance capacity
- clean up orphaned private files
Performs save on each instance to update its capacity.
'''
extra_vars = dict(
cutoff_pk=cutoff_pk,
)
hostname_string = ''
for instance in instance_qs:
hostname_string += '{},'.format(instance.hostname)
args = ['ansible-playbook', '-u', settings.AWX_ISOLATED_USERNAME, '-i',
hostname_string, 'heartbeat_isolated.yml', '-e',
json.dumps(extra_vars)]
module_path = os.path.join(os.path.dirname(awx.__file__), 'lib', 'management_modules')
playbook_path = os.path.join(os.path.dirname(awx.__file__), 'playbooks')
env = {
'ANSIBLE_LIBRARY': module_path,
'ANSIBLE_STDOUT_CALLBACK': 'json'
}
hostname_string, 'heartbeat_isolated.yml']
env = cls._base_management_env()
env['ANSIBLE_LIBRARY'] = os.path.join(os.path.dirname(awx.__file__), 'lib', 'management_modules')
env['ANSIBLE_STDOUT_CALLBACK'] = 'json'
buff = cStringIO.StringIO()
status, rc = run.run_pexpect(
args, playbook_path, env, buff,
args, cls.awx_playbook_path(), env, buff,
idle_timeout=60, job_timeout=60,
pexpect_timeout=5
)
output = buff.getvalue()
output = output[output.find('{'):] # Remove starting log statements
result = json.loads(output)
buff.close()
try:
result = json.loads(output)
if not isinstance(result, dict):
raise TypeError('Expected a dict but received {}.'.format(str(type(result))))
except (ValueError, AssertionError, TypeError):
logger.exception('Failed to read status from isolated instances, output:\n {}'.format(output))
return
for instance in instance_qs:
task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname]
try:
task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname]
except (KeyError, IndexError):
logger.exception('Failed to read status from isolated instance {}.'.format(instance.hostname))
continue
if 'capacity' in task_result:
instance.capacity = int(task_result['capacity'])
instance.save(update_fields=['capacity'])
elif 'msg' in task_result:
logger.warning('Could not update capacity of {}, msg={}'.format(instance.hostname, task_result['msg']))
else:
logger.warning('Could not update capacity of {}, msg={}'.format(
instance.hostname, task_result.get('msg', 'unknown failure')))
@staticmethod
def wrap_stdout_handle(instance, private_data_dir, stdout_handle):
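The structure health_check walks comes from Ansible's json stdout callback: one JSON document with a plays list, each play carrying tasks, and each task a per-host result dict. A trimmed, hypothetical example of that shape and of the capacity/msg handling above (hostnames and numbers are made up):

result = {
    'plays': [{
        'tasks': [{
            'hosts': {
                'iso-node-1': {'changed': False, 'capacity': 125},
                'iso-node-2': {'msg': 'Failed to connect to the host via ssh.'},
            },
        }],
    }],
    'stats': {'iso-node-1': {'ok': 1}, 'iso-node-2': {'unreachable': 1}},
}

for hostname in ('iso-node-1', 'iso-node-2'):
    task_result = result['plays'][0]['tasks'][0]['hosts'].get(hostname, {})
    if 'capacity' in task_result:
        print('{}: capacity {}'.format(hostname, int(task_result['capacity'])))
    else:
        print('{}: {}'.format(hostname, task_result.get('msg', 'unknown failure')))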

View File

@@ -726,18 +726,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
pass
super(UnifiedJob, self).delete()
@classmethod
def lowest_running_id(cls):
oldest_running_job = cls.objects.filter(status__in=ACTIVE_STATES).order_by('id').only('id').first()
if oldest_running_job is not None:
return oldest_running_job.id
else:
newest_finished_job = cls.objects.order_by('id').only('id').last()
if newest_finished_job is None:
return 1 # System has no finished jobs
else:
return newest_finished_job.id + 1
def copy_unified_job(self):
'''
Returns saved object, including related fields.

View File

@@ -201,7 +201,7 @@ def tower_isolated_heartbeat(self):
local_hostname = settings.CLUSTER_HOST_ID
logger.debug("Controlling node checking for any isolated management tasks.")
poll_interval = settings.AWX_ISOLATED_PERIODIC_CHECK
# Add in some task buffer time
# Get isolated instances not checked since poll interval - some buffer
nowtime = now()
accept_before = nowtime - timedelta(seconds=(poll_interval - 10))
isolated_instance_qs = Instance.objects.filter(
@@ -212,15 +212,12 @@ def tower_isolated_heartbeat(self):
with transaction.atomic():
for isolated_instance in isolated_instance_qs:
isolated_instance.last_isolated_check = nowtime
# Prevent modified time from being changed, as in normal heartbeat
isolated_instance.save(update_fields=['last_isolated_check'])
# Find the oldest job in the system and pass that to the cleanup
if not isolated_instance_qs:
return
cutoff_pk = UnifiedJob.lowest_running_id()
# Slow pass looping over isolated IGs and their isolated instances
if len(isolated_instance_qs) > 0:
logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
isolated_manager.IsolatedManager.health_check(isolated_instance_qs, cutoff_pk=cutoff_pk)
isolated_manager.IsolatedManager.health_check(isolated_instance_qs)
@task(bind=True, queue='tower')
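The selection above only re-checks instances whose last_isolated_check is older than the poll interval minus a 10-second buffer, so consecutive runs of the periodic task do not immediately re-probe a host that was just handled. A minimal sketch of that cutoff arithmetic; the actual queryset filter is truncated above, so the field lookup shown in the comment is an assumption.

from datetime import datetime, timedelta

poll_interval = 30                                   # seconds; stands in for settings.AWX_ISOLATED_PERIODIC_CHECK
nowtime = datetime.utcnow()
accept_before = nowtime - timedelta(seconds=(poll_interval - 10))

# In the task this presumably feeds a filter along the lines of
#   Instance.objects.filter(last_isolated_check__lt=accept_before)
last_isolated_check = nowtime - timedelta(seconds=45)
print(last_isolated_check < accept_before)           # True: this instance is due for a check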

View File

@@ -4,7 +4,7 @@ import pytest
from django.contrib.contenttypes.models import ContentType
# AWX
from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project, UnifiedJob
from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project
@pytest.mark.django_db
@@ -65,16 +65,3 @@ class TestCreateUnifiedJob:
assert second_job.inventory == job_with_links.inventory
assert second_job.limit == 'my_server'
assert net_credential in second_job.extra_credentials.all()
@pytest.mark.django_db
def test_lowest_running_id():
assert UnifiedJob.lowest_running_id() == 1
Job.objects.create(status='finished')
old_job = Job.objects.create(status='finished')
assert UnifiedJob.lowest_running_id() == old_job.id + 1
old_running_job = Job.objects.create(status='running')
Job.objects.create(status='running')
assert UnifiedJob.lowest_running_id() == old_running_job.id
Job.objects.create(status='finished')
assert UnifiedJob.lowest_running_id() == old_running_job.id

View File

@@ -104,8 +104,7 @@ class TestIsolatedManagementTask:
@pytest.fixture
def needs_updating(self, control_group):
ig = InstanceGroup.objects.create(name='thepentagon', controller=control_group)
inst = ig.instances.create(
hostname='isolated', capacity=103)
inst = ig.instances.create(hostname='isolated', capacity=103)
inst.last_isolated_check=now() - timedelta(seconds=MockSettings.AWX_ISOLATED_PERIODIC_CHECK)
inst.save()
return ig
@@ -113,25 +112,25 @@ class TestIsolatedManagementTask:
@pytest.fixture
def just_updated(self, control_group):
ig = InstanceGroup.objects.create(name='thepentagon', controller=control_group)
inst = ig.instances.create(
hostname='isolated', capacity=103)
inst = ig.instances.create(hostname='isolated', capacity=103)
inst.last_isolated_check=now()
inst.save()
return inst
def test_takes_action(self, control_instance, needs_updating):
original_isolated_instance = needs_updating.instances.all().first()
with mock.patch('awx.main.tasks.settings', MockSettings()):
with mock.patch.object(isolated_manager.IsolatedManager, 'health_check') as check_mock:
check_mock.return_value = 98
tower_isolated_heartbeat()
iso_instance = Instance.objects.get(hostname='isolated')
check_mock.assert_called_once_with(iso_instance, cutoff_pk=mock.ANY)
assert iso_instance.capacity == 98
call_args, _ = check_mock.call_args
assert call_args[0][0] == iso_instance
assert iso_instance.last_isolated_check > original_isolated_instance.last_isolated_check
assert iso_instance.modified == original_isolated_instance.modified
def test_does_not_take_action(self, control_instance, just_updated):
with mock.patch('awx.main.tasks.settings', MockSettings()):
with mock.patch.object(isolated_manager.IsolatedManager, 'health_check') as check_mock:
check_mock.return_value = 98
tower_isolated_heartbeat()
iso_instance = Instance.objects.get(hostname='isolated')
check_mock.assert_not_called()

View File

@@ -545,8 +545,8 @@ def get_system_task_capacity():
return settings.SYSTEM_TASK_CAPACITY
try:
out = subprocess.check_output(['free', '-m'])
except subprocess.CalledProcessError as e:
logger.error('Problem obtaining capacity from system, error:\n{}'.format(str(e)))
except subprocess.CalledProcessError:
logger.exception('Problem obtaining capacity from system.')
return 0
total_mem_value = out.split()[7]
if int(total_mem_value) <= 2048:
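For reference, the eighth whitespace-separated token of free -m output is the Mem total that feeds the capacity formula above. A small illustration with representative output; column layouts differ slightly across procps versions, but the Mem total lands at index 7 in both the old and the new format.

sample = (
    '              total        used        free      shared  buff/cache   available\n'
    'Mem:           7976        3100        1200         300        3676        4300\n'
    'Swap:          2047           0        2047\n'
)
total_mem_value = sample.split()[7]
print(total_mem_value)        # '7976', which is above the 2048 MB threshold checked above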

View File

@@ -1,8 +1,4 @@
---
# The following variables will be set by the runner of this playbook:
# job_id_cutoff: <pk>
- hosts: all
gather_facts: false
@@ -10,11 +6,6 @@
- name: Get capacity of the instance
tower_capacity:
register: result
- name: Print capacity in escaped string to scrape
debug: msg="{{ start_delimiter|default('') }}{{ result['capacity'] }}{{ end_delimiter|default('') }}"
- name: Remove any stale temporary files
tower_isolated_cleanup:
cutoff_pk: "{{ cutoff_pk | default(0) }}"