diff --git a/awx/lib/management_modules/tower_capacity.py b/awx/lib/management_modules/tower_capacity.py
index 2c3785fe1a..03bbb0cecd 100644
--- a/awx/lib/management_modules/tower_capacity.py
+++ b/awx/lib/management_modules/tower_capacity.py
@@ -35,7 +35,7 @@ def main():
         cap = 50
     cap = 50 + ((int(total_mem_value) / 1024) - 2) * 75
 
-    # Module never results in a change and (hopefully) never fails
+    # Module never results in a change
     module.exit_json(changed=False, capacity=cap)
 
 
diff --git a/awx/lib/management_modules/tower_isolated_cleanup.py b/awx/lib/management_modules/tower_isolated_cleanup.py
index 94b4f4063a..529a24fd9d 100644
--- a/awx/lib/management_modules/tower_isolated_cleanup.py
+++ b/awx/lib/management_modules/tower_isolated_cleanup.py
@@ -22,53 +22,48 @@ import os
 import re
 import shutil
 import datetime
+import subprocess
 
 
 def main():
     module = AnsibleModule(
-        argument_spec = dict(
-            cutoff_pk = dict(required=False, default=0, type='int'),
-        )
+        argument_spec = dict()
     )
-    cutoff_pk = module.params.get('cutoff_pk')
 
     changed = False
-    jobs_removed = set([])
+    paths_removed = set([])
-    cutoff_time = datetime.datetime.now() - datetime.timedelta(days=7)
+    # If a folder was last modified before this datetime, it will always be deleted
+    folder_cutoff = datetime.datetime.now() - datetime.timedelta(days=7)
+    # If a folder does not have an associated job running and is older than
+    # this datetime, then it will be deleted because its job has finished
+    job_cutoff = datetime.datetime.now() - datetime.timedelta(hours=1)
 
-    for search_pattern, extract_pattern in [
-        ('/tmp/ansible_tower/jobs/*', r'\/tmp\/ansible_tower\/jobs\/(?P<job_id>\d+)'),
-        ('/tmp/ansible_tower_*', r'\/tmp\/ansible_tower_(?P<job_id>\d+)_*'),
-        ('/tmp/ansible_tower_proot_*', None),
+    for search_pattern in [
+        '/tmp/ansible_tower_[0-9]*_*', '/tmp/ansible_tower_proot_*',
     ]:
         for path in glob.iglob(search_pattern):
             st = os.stat(path)
             modtime = datetime.datetime.fromtimestamp(st.st_mtime)
-            if modtime > cutoff_time:
-                # If job's pk value is lower than threshold, we delete it
+
+            if modtime > job_cutoff:
+                continue
+            elif modtime > folder_cutoff:
                 try:
-                    if extract_pattern is None:
-                        continue
-                    re_match = re.match(extract_pattern, path)
-                    if re_match is None:
-                        continue
-                    job_id = int(re_match.group('job_id'))
-                    if job_id >= cutoff_pk:
-                        module.debug('Skipping job {}, which may still be running.'.format(job_id))
-                        continue
+                    re_match = re.match(r'\/tmp\/ansible_tower_\d+_.+', path)
+                    if re_match is not None:
+                        # call() rather than check_call(): a non-zero exit here just
+                        # means the job has finished, not that the module should fail
+                        if subprocess.call(['tower-expect', 'is-alive', path]) == 0:
+                            continue
+                        else:
+                            module.debug('Deleting path {} because its job has completed.'.format(path))
                 except (ValueError, IndexError):
                     continue
             else:
                 module.debug('Deleting path {} because modification date is too old.'.format(path))
-                job_id = 'unknown'
             changed = True
-            jobs_removed.add(job_id)
-            if os.path.islink(path):
-                os.remove(path)
-            else:
-                shutil.rmtree(path)
+            paths_removed.add(path)
+            shutil.rmtree(path)
 
-    module.exit_json(changed=changed, jobs_removed=[j for j in jobs_removed])
+    module.exit_json(changed=changed, paths_removed=list(paths_removed))
 
 
 if __name__ == '__main__':
diff --git a/awx/main/isolated/isolated_manager.py b/awx/main/isolated/isolated_manager.py
index 1a08b98549..139231271b 100644
--- a/awx/main/isolated/isolated_manager.py
+++ b/awx/main/isolated/isolated_manager.py
@@ -57,9 +57,7 @@ class IsolatedManager(object):
         self.cwd = cwd
         self.env = env.copy()
         # Do not use callbacks for controller's management jobs
-        self.env['ANSIBLE_CALLBACK_PLUGINS'] = ''
-        self.env['CALLBACK_QUEUE'] = ''
-        self.env['CALLBACK_CONNECTION'] = ''
+        self.env.update(self._base_management_env())
         self.stdout_handle = stdout_handle
         self.ssh_key_path = ssh_key_path
         self.expect_passwords = {k.pattern: v for k, v in expect_passwords.items()}
@@ -71,8 +69,18 @@ class IsolatedManager(object):
         self.proot_cmd = proot_cmd
         self.started_at = None
 
-    @property
-    def awx_playbook_path(self):
+    @staticmethod
+    def _base_management_env():
+        return {
+            'ANSIBLE_CALLBACK_PLUGINS': '',
+            'CALLBACK_QUEUE': '',
+            'CALLBACK_CONNECTION': '',
+            'ANSIBLE_RETRY_FILES_ENABLED': 'False',
+            'ANSIBLE_HOST_KEY_CHECKING': 'False'
+        }
+
+    @classmethod
+    def awx_playbook_path(cls):
         return os.path.join(
             os.path.dirname(awx.__file__),
             'playbooks'
@@ -134,7 +142,7 @@ class IsolatedManager(object):
         buff = StringIO.StringIO()
         logger.debug('Starting job on isolated host with `run_isolated.yml` playbook.')
         status, rc = run.run_pexpect(
-            args, self.awx_playbook_path, self.env, buff,
+            args, self.awx_playbook_path(), self.env, buff,
             expect_passwords={
                 re.compile(r'Secret:\s*?$', re.M): base64.b64encode(json.dumps(secrets))
             },
@@ -244,7 +252,7 @@ class IsolatedManager(object):
         buff = cStringIO.StringIO()
         logger.debug('Checking job on isolated host with `check_isolated.yml` playbook.')
         status, rc = run.run_pexpect(
-            args, self.awx_playbook_path, self.env, buff,
+            args, self.awx_playbook_path(), self.env, buff,
             cancelled_callback=self.cancelled_callback,
             idle_timeout=remaining, job_timeout=remaining,
@@ -295,7 +303,7 @@ class IsolatedManager(object):
         logger.debug('Cleaning up job on isolated host with `clean_isolated.yml` playbook.')
         buff = cStringIO.StringIO()
         status, rc = run.run_pexpect(
-            args, self.awx_playbook_path, self.env, buff,
+            args, self.awx_playbook_path(), self.env, buff,
             idle_timeout=60, job_timeout=60,
             pexpect_timeout=5
         )
@@ -304,46 +312,55 @@ class IsolatedManager(object):
             # stdout_handle is closed by this point so writing output to logs is our only option
             logger.warning('Cleanup from isolated job encountered error, output:\n{}'.format(buff.getvalue()))
 
-    @staticmethod
-    def health_check(instance_qs, cutoff_pk=0):
+    @classmethod
+    def health_check(cls, instance_qs):
         '''
         :param instance_qs:   List of Django objects representing the
                               isolated instances to manage
-        :param cutoff_pk:     Job id of the oldest job still in the
-                              running state
-        Method logic not yet written.
-        returns the instance's capacity or None if it is not reachable
+        Runs a playbook that will
+         - determine if the instance is reachable
+         - find the instance capacity
+         - clean up orphaned private files
+        Performs a save on each instance to update its capacity.
         '''
-        extra_vars = dict(
-            cutoff_pk=cutoff_pk,
-        )
         hostname_string = ''
         for instance in instance_qs:
             hostname_string += '{},'.format(instance.hostname)
         args = ['ansible-playbook', '-u', settings.AWX_ISOLATED_USERNAME, '-i',
-                hostname_string, 'heartbeat_isolated.yml', '-e',
-                json.dumps(extra_vars)]
-        module_path = os.path.join(os.path.dirname(awx.__file__), 'lib', 'management_modules')
-        playbook_path = os.path.join(os.path.dirname(awx.__file__), 'playbooks')
-        env = {
-            'ANSIBLE_LIBRARY': module_path,
-            'ANSIBLE_STDOUT_CALLBACK': 'json'
-        }
+                hostname_string, 'heartbeat_isolated.yml']
+        env = cls._base_management_env()
+        env['ANSIBLE_LIBRARY'] = os.path.join(os.path.dirname(awx.__file__), 'lib', 'management_modules')
+        env['ANSIBLE_STDOUT_CALLBACK'] = 'json'
+
         buff = cStringIO.StringIO()
         status, rc = run.run_pexpect(
-            args, playbook_path, env, buff,
+            args, cls.awx_playbook_path(), env, buff,
             idle_timeout=60, job_timeout=60,
             pexpect_timeout=5
         )
         output = buff.getvalue()
-        output = output[output.find('{'):]  # Remove starting log statements
-        result = json.loads(output)
+        buff.close()
+
+        try:
+            result = json.loads(output)
+            if not isinstance(result, dict):
+                raise TypeError('Expected a dict but received {}.'.format(str(type(result))))
+        except (ValueError, AssertionError, TypeError):
+            logger.exception('Failed to read status from isolated instances, output:\n {}'.format(output))
+            return
+
         for instance in instance_qs:
-            task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname]
+            try:
+                task_result = result['plays'][0]['tasks'][0]['hosts'][instance.hostname]
+            except (KeyError, IndexError):
+                logger.exception('Failed to read status from isolated instance {}.'.format(instance.hostname))
+                continue
             if 'capacity' in task_result:
                 instance.capacity = int(task_result['capacity'])
                 instance.save(update_fields=['capacity'])
-            elif 'msg' in task_result:
-                logger.warning('Could not update capacity of {}, msg={}'.format(instance.hostname, task_result['msg']))
+            else:
+                logger.warning('Could not update capacity of {}, msg={}'.format(
+                    instance.hostname, task_result.get('msg', 'unknown failure')))
 
     @staticmethod
     def wrap_stdout_handle(instance, private_data_dir, stdout_handle):
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index c8259c7590..55c53f1d8f 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -726,18 +726,6 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
             pass
         super(UnifiedJob, self).delete()
 
-    @classmethod
-    def lowest_running_id(cls):
-        oldest_running_job = cls.objects.filter(status__in=ACTIVE_STATES).order_by('id').only('id').first()
-        if oldest_running_job is not None:
-            return oldest_running_job.id
-        else:
-            newest_finished_job = cls.objects.order_by('id').only('id').last()
-            if newest_finished_job is None:
-                return 1  # System has no finished jobs
-            else:
-                return newest_finished_job.id + 1
-
     def copy_unified_job(self):
         '''
         Returns saved object, including related fields.
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 09e32c25f5..19bd26bb1f 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -201,7 +201,7 @@ def tower_isolated_heartbeat(self):
     local_hostname = settings.CLUSTER_HOST_ID
     logger.debug("Controlling node checking for any isolated management tasks.")
    poll_interval = settings.AWX_ISOLATED_PERIODIC_CHECK
-    # Add in some task buffer time
+    # Get isolated instances not checked within the poll interval, minus some buffer
     nowtime = now()
     accept_before = nowtime - timedelta(seconds=(poll_interval - 10))
     isolated_instance_qs = Instance.objects.filter(
@@ -212,15 +212,12 @@ def tower_isolated_heartbeat(self):
     with transaction.atomic():
         for isolated_instance in isolated_instance_qs:
             isolated_instance.last_isolated_check = nowtime
+            # Prevent the modified time from being changed, as in the normal heartbeat
             isolated_instance.save(update_fields=['last_isolated_check'])
-    # Find the oldest job in the system and pass that to the cleanup
-    if not isolated_instance_qs:
-        return
-    cutoff_pk = UnifiedJob.lowest_running_id()
     # Slow pass looping over isolated IGs and their isolated instances
     if len(isolated_instance_qs) > 0:
         logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
-        isolated_manager.IsolatedManager.health_check(isolated_instance_qs, cutoff_pk=cutoff_pk)
+        isolated_manager.IsolatedManager.health_check(isolated_instance_qs)
 
 
 @task(bind=True, queue='tower')
diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py
index 358dfc02ab..5b89644457 100644
--- a/awx/main/tests/functional/models/test_unified_job.py
+++ b/awx/main/tests/functional/models/test_unified_job.py
@@ -4,7 +4,7 @@ import pytest
 from django.contrib.contenttypes.models import ContentType
 
 # AWX
-from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project, UnifiedJob
+from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project
 
 
 @pytest.mark.django_db
@@ -65,16 +65,3 @@ class TestCreateUnifiedJob:
         assert second_job.inventory == job_with_links.inventory
         assert second_job.limit == 'my_server'
         assert net_credential in second_job.extra_credentials.all()
-
-
-@pytest.mark.django_db
-def test_lowest_running_id():
-    assert UnifiedJob.lowest_running_id() == 1
-    Job.objects.create(status='finished')
-    old_job = Job.objects.create(status='finished')
-    assert UnifiedJob.lowest_running_id() == old_job.id + 1
-    old_running_job = Job.objects.create(status='running')
-    Job.objects.create(status='running')
-    assert UnifiedJob.lowest_running_id() == old_running_job.id
-    Job.objects.create(status='finished')
-    assert UnifiedJob.lowest_running_id() == old_running_job.id
diff --git a/awx/main/tests/functional/test_tasks.py b/awx/main/tests/functional/test_tasks.py
index da1f774047..065d979819 100644
--- a/awx/main/tests/functional/test_tasks.py
+++ b/awx/main/tests/functional/test_tasks.py
@@ -104,8 +104,7 @@ class TestIsolatedManagementTask:
     @pytest.fixture
     def needs_updating(self, control_group):
         ig = InstanceGroup.objects.create(name='thepentagon', controller=control_group)
-        inst = ig.instances.create(
-            hostname='isolated', capacity=103)
+        inst = ig.instances.create(hostname='isolated', capacity=103)
         inst.last_isolated_check=now() - timedelta(seconds=MockSettings.AWX_ISOLATED_PERIODIC_CHECK)
         inst.save()
         return ig
 
@@ -113,25 +112,25 @@ class TestIsolatedManagementTask:
     @pytest.fixture
     def just_updated(self, control_group):
         ig = InstanceGroup.objects.create(name='thepentagon', controller=control_group)
-        inst = ig.instances.create(
-            hostname='isolated', capacity=103)
+        inst = ig.instances.create(hostname='isolated', capacity=103)
         inst.last_isolated_check=now()
         inst.save()
         return inst
 
     def test_takes_action(self, control_instance, needs_updating):
+        original_isolated_instance = needs_updating.instances.all().first()
         with mock.patch('awx.main.tasks.settings', MockSettings()):
             with mock.patch.object(isolated_manager.IsolatedManager, 'health_check') as check_mock:
-                check_mock.return_value = 98
                 tower_isolated_heartbeat()
         iso_instance = Instance.objects.get(hostname='isolated')
-        check_mock.assert_called_once_with(iso_instance, cutoff_pk=mock.ANY)
-        assert iso_instance.capacity == 98
+        call_args, _ = check_mock.call_args
+        assert call_args[0][0] == iso_instance
+        assert iso_instance.last_isolated_check > original_isolated_instance.last_isolated_check
+        assert iso_instance.modified == original_isolated_instance.modified
 
     def test_does_not_take_action(self, control_instance, just_updated):
         with mock.patch('awx.main.tasks.settings', MockSettings()):
             with mock.patch.object(isolated_manager.IsolatedManager, 'health_check') as check_mock:
-                check_mock.return_value = 98
                 tower_isolated_heartbeat()
         iso_instance = Instance.objects.get(hostname='isolated')
         check_mock.assert_not_called()
diff --git a/awx/main/utils/common.py b/awx/main/utils/common.py
index d8c9e11f20..c10f44a800 100644
--- a/awx/main/utils/common.py
+++ b/awx/main/utils/common.py
@@ -545,8 +545,8 @@ def get_system_task_capacity():
         return settings.SYSTEM_TASK_CAPACITY
     try:
         out = subprocess.check_output(['free', '-m'])
-    except subprocess.CalledProcessError as e:
-        logger.error('Problem obtaining capacity from system, error:\n{}'.format(str(e)))
+    except subprocess.CalledProcessError:
+        logger.exception('Problem obtaining capacity from system.')
         return 0
     total_mem_value = out.split()[7]
     if int(total_mem_value) <= 2048:
diff --git a/awx/playbooks/heartbeat_isolated.yml b/awx/playbooks/heartbeat_isolated.yml
index 3232254a20..58b2f52b3c 100644
--- a/awx/playbooks/heartbeat_isolated.yml
+++ b/awx/playbooks/heartbeat_isolated.yml
@@ -1,8 +1,4 @@
 ---
-
-# The following variables will be set by the runner of this playbook:
-# job_id_cutoff:
-
 - hosts: all
   gather_facts: false
 
@@ -10,11 +6,6 @@
 
   - name: Get capacity of the instance
     tower_capacity:
-    register: result
-
-  - name: Print capacity in escaped string to scrape
-    debug: msg="{{ start_delimiter|default('') }}{{ result['capacity'] }}{{ end_delimiter|default('') }}"
 
   - name: Remove any stale temporary files
     tower_isolated_cleanup:
-      cutoff_pk: "{{ cutoff_pk | default(0) }}"
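Reviewer note: the retention policy introduced in tower_isolated_cleanup.py reduces to two time thresholds plus a liveness probe. Below is a minimal standalone sketch of that decision logic, for review purposes only and not part of the patch; should_delete and is_job_alive are hypothetical names, with is_job_alive standing in for the `tower-expect is-alive` call.

    import datetime
    import os
    import re

    # Mirrors folder_cutoff (7 days) and job_cutoff (1 hour) in the module
    FOLDER_CUTOFF = datetime.timedelta(days=7)
    JOB_CUTOFF = datetime.timedelta(hours=1)

    def should_delete(path, is_job_alive, now=None):
        """Decide whether a /tmp working directory is eligible for removal."""
        now = now or datetime.datetime.now()
        modtime = datetime.datetime.fromtimestamp(os.stat(path).st_mtime)
        if modtime > now - JOB_CUTOFF:
            # Modified less than an hour ago: the owning job may still be running
            return False
        if modtime > now - FOLDER_CUTOFF:
            # Between one hour and seven days old: keep only job directories
            # whose job is still alive; everything else falls through to deletion
            if re.match(r'/tmp/ansible_tower_\d+_.+', path) and is_job_alive(path):
                return False
            return True
        # Older than seven days: always delete
        return True

    # Example probe (hypothetical usage, assuming the patch's CLI):
    #   should_delete(path, lambda p: subprocess.call(['tower-expect', 'is-alive', p]) == 0)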