diff --git a/Makefile b/Makefile
index 9d8d6f665b..b5e070b6ec 100644
--- a/Makefile
+++ b/Makefile
@@ -950,10 +950,10 @@ docker-isolated:
 	TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml create
 	docker start tools_tower_1
 	docker start tools_isolated_1
-	if [ "`docker exec -i -t tools_isolated_1 cat /root/.ssh/authorized_keys`" != "" ]; then \
+	if [ "`docker exec -i -t tools_isolated_1 cat /root/.ssh/authorized_keys`" == "`docker exec -t tools_tower_1 cat /root/.ssh/id_rsa.pub`" ]; then \
 		echo "SSH keys already copied to isolated instance"; \
 	else \
-		docker exec "tools_isolated_1" bash -c "mkdir -p /root/.ssh && echo $$(docker exec -t tools_tower_1 cat /root/.ssh/id_rsa.pub) >> /root/.ssh/authorized_keys"; \
+		docker exec "tools_isolated_1" bash -c "mkdir -p /root/.ssh && rm -f /root/.ssh/authorized_keys && echo $$(docker exec -t tools_tower_1 cat /root/.ssh/id_rsa.pub) >> /root/.ssh/authorized_keys"; \
 	fi
 	TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml up
diff --git a/awx/lib/management_modules/tower_capacity.py b/awx/lib/management_modules/tower_capacity.py
new file mode 100644
index 0000000000..bd8dcab80e
--- /dev/null
+++ b/awx/lib/management_modules/tower_capacity.py
@@ -0,0 +1,41 @@
+# Copyright (c) 2017 Ansible by Red Hat
+#
+# This file is part of Ansible Tower, but depends on code imported from Ansible.
+#
+# Ansible is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Ansible is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
+
+from ansible.module_utils.basic import AnsibleModule
+
+import subprocess
+
+
+def main():
+    module = AnsibleModule(
+        argument_spec=dict()
+    )
+    # Duplicated with awx.main.utils.common.get_system_task_capacity
+    proc = subprocess.Popen(['free', '-m'], stdout=subprocess.PIPE)
+    out, err = proc.communicate()
+    total_mem_value = out.split()[7]
+    if int(total_mem_value) <= 2048:
+        cap = 50
+    else:
+        cap = 50 + ((int(total_mem_value) / 1024) - 2) * 75
+
+    # Module never results in a change and (hopefully) never fails
+    module.exit_json(changed=False, capacity=cap)
+
+
+if __name__ == '__main__':
+    main()
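Reviewer note: the capacity arithmetic in `tower_capacity.py` (duplicated from `awx.main.utils.common.get_system_task_capacity`) reduces to the pure function below. This is a sketch for reference only, assuming the value scraped from `free -m` is total memory in MB and Python 2-style integer division; `capacity_from_mem` is an illustrative name, not part of the patch.

```python
def capacity_from_mem(total_mem_mb):
    # 50 units of capacity for the first 2 GiB, then 75 more per additional GiB
    if total_mem_mb <= 2048:
        return 50
    return 50 + ((total_mem_mb // 1024) - 2) * 75

assert capacity_from_mem(2048) == 50
assert capacity_from_mem(4096) == 200
assert capacity_from_mem(8192) == 500
```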
diff --git a/awx/main/isolated/isolated_manager.py b/awx/main/isolated/isolated_manager.py
index caa2e1db01..44aa11e1a3 100644
--- a/awx/main/isolated/isolated_manager.py
+++ b/awx/main/isolated/isolated_manager.py
@@ -304,6 +304,40 @@ class IsolatedManager(object):
             # stdout_handle is closed by this point so writing output to logs is our only option
             logger.warning('Cleanup from isolated job encountered error, output:\n{}'.format(buff.getvalue()))
 
+    @staticmethod
+    def health_check(instance, cutoff_pk=0):
+        '''
+        :param instance: Django object representing the isolated instance
+        :param cutoff_pk: Job id of the oldest job still in the running state
+
+        returns the instance's capacity, or 0 if it is not reachable
+        '''
+        start_delimiter = 'wNqCXG6uul'
+        end_delimiter = 'n6kmoFyyAP'
+        extra_vars = dict(
+            cutoff_pk=cutoff_pk,
+            start_delimiter=start_delimiter,
+            end_delimiter=end_delimiter
+        )
+        args = ['ansible-playbook', '-u', settings.AWX_ISOLATED_USERNAME, '-i',
+                '%s,' % instance.hostname, 'heartbeat_isolated.yml', '-e',
+                json.dumps(extra_vars)]
+        module_path = os.path.join(os.path.dirname(awx.__file__), 'lib', 'management_modules')
+        playbook_path = os.path.join(os.path.dirname(awx.__file__), 'playbooks')
+        env = {'ANSIBLE_LIBRARY': module_path}
+        buff = cStringIO.StringIO()
+        status, rc = run.run_pexpect(
+            args, playbook_path, env, buff,
+            idle_timeout=60, job_timeout=60,
+            pexpect_timeout=5
+        )
+        output = buff.getvalue()
+        if status != 'successful':
+            return 0  # recognized by task manager as 'unreachable'
+        result = re.search('{}(.*){}'.format(start_delimiter, end_delimiter), output)
+        cap = result.group(1)
+        return cap
+
     @staticmethod
     def wrap_stdout_handle(instance, private_data_dir, stdout_handle):
         dispatcher = CallbackQueueDispatcher()
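How `health_check` recovers the value: the playbook prints the capacity wrapped in two arbitrary delimiter strings, and the controller regex-scrapes it out of the noisy `ansible-playbook` stdout. A minimal standalone sketch of that idea follows; `scrape_between` is an assumed name, the shipped method inlines the regex.

```python
import re

def scrape_between(output, start, end):
    # Return whatever sits between the first start...end pair, or None
    match = re.search('{}(.*){}'.format(re.escape(start), re.escape(end)), output)
    return match.group(1) if match else None

stdout = 'ok: [isolated] => { "msg": "wNqCXG6uul150n6kmoFyyAP" }'
assert scrape_between(stdout, 'wNqCXG6uul', 'n6kmoFyyAP') == '150'
```

Note that `group(1)` yields a string; the caller assigns it to `Instance.capacity` and relies on Django to coerce it to an integer on save.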
diff --git a/awx/main/management/commands/deprovision_node.py b/awx/main/management/commands/deprovision_node.py
index 8412b5bd86..d64a780434 100644
--- a/awx/main/management/commands/deprovision_node.py
+++ b/awx/main/management/commands/deprovision_node.py
@@ -23,11 +23,13 @@ class Command(BaseCommand):
         instance = Instance.objects.filter(hostname=options.get('name'))
         if instance.exists():
             instance.delete()
+            print("Instance Removed")
             result = subprocess.Popen("rabbitmqctl forget_cluster_node rabbitmq@{}".format(options.get('name')), shell=True).wait()
             if result != 0:
                 print("Node deprovisioning may have failed when attempting to remove the RabbitMQ instance from the cluster")
             else:
                 print('Successfully deprovisioned {}'.format(options.get('name')))
+            print('(changed: True)')
         else:
             print('No instance found matching name {}'.format(options.get('name')))
diff --git a/awx/main/management/commands/register_instance.py b/awx/main/management/commands/register_instance.py
index 7ce6be787b..6895aa644f 100644
--- a/awx/main/management/commands/register_instance.py
+++ b/awx/main/management/commands/register_instance.py
@@ -17,14 +17,32 @@ class Command(BaseCommand):
     option_list = BaseCommand.option_list + (
         make_option('--hostname', dest='hostname', type='string',
                     help='Hostname used during provisioning'),
+        make_option('--hostnames', dest='hostnames', type='string',
+                    help='Alternatively, hostnames can be provided with '
+                         'this option as a comma-delimited list'),
     )
 
-    def handle(self, **options):
-        uuid = settings.SYSTEM_UUID
-        instance = Instance.objects.filter(hostname=options.get('hostname'))
+    def _register_hostname(self, hostname):
+        if not hostname:
+            return
+        instance = Instance.objects.filter(hostname=hostname)
         if instance.exists():
             print("Instance already registered {}".format(instance[0]))
             return
-        instance = Instance(uuid=uuid, hostname=options.get('hostname'))
+        instance = Instance(uuid=self.uuid, hostname=hostname)
         instance.save()
-        print('Successfully registered instance {}'.format(instance))
+        print('Successfully registered instance {}'.format(hostname))
+        self.changed = True
+
+    def handle(self, **options):
+        self.uuid = settings.SYSTEM_UUID
+        self.changed = False
+        self._register_hostname(options.get('hostname'))
+        hostname_list = []
+        if options.get('hostnames'):
+            hostname_list = options.get('hostnames').split(",")
+        instance_list = [x.strip() for x in hostname_list if x]
+        for inst_name in instance_list:
+            self._register_hostname(inst_name)
+        if self.changed:
+            print('(changed: True)')
diff --git a/awx/main/management/commands/register_queue.py b/awx/main/management/commands/register_queue.py
index e0ca862a37..3601b009d4 100644
--- a/awx/main/management/commands/register_queue.py
+++ b/awx/main/management/commands/register_queue.py
@@ -5,7 +5,7 @@ import sys
 from awx.main.models import Instance, InstanceGroup
 from optparse import make_option
 
-from django.core.management.base import BaseCommand
+from django.core.management.base import BaseCommand, CommandError
 
 
 class Command(BaseCommand):
@@ -20,34 +20,44 @@ class Command(BaseCommand):
     )
 
     def handle(self, **options):
+        if not options.get('queuename'):
+            raise CommandError("Specify `--queuename` to use this command.")
+        changed = False
         ig = InstanceGroup.objects.filter(name=options.get('queuename'))
         control_ig = None
         if options.get('controller'):
             control_ig = InstanceGroup.objects.filter(name=options.get('controller')).first()
         if ig.exists():
-            print("Instance Group already registered {}".format(ig[0]))
+            print("Instance Group already registered {}".format(ig[0].name))
             ig = ig[0]
             if control_ig and ig.controller_id != control_ig.pk:
                 ig.controller = control_ig
                 ig.save()
-                print("Set controller group {} on {}.".format(control_ig, ig))
+                print("Set controller group {} on {}.".format(control_ig.name, ig.name))
+                changed = True
         else:
             print("Creating instance group {}".format(options.get('queuename')))
             ig = InstanceGroup(name=options.get('queuename'))
             if control_ig:
                 ig.controller = control_ig
             ig.save()
+            changed = True
         hostname_list = []
         if options.get('hostnames'):
             hostname_list = options.get('hostnames').split(",")
-        instance_list = [x.strip() for x in hostname_list]
+        instance_list = [x.strip() for x in hostname_list if x]
         for inst_name in instance_list:
             instance = Instance.objects.filter(hostname=inst_name)
-            if instance.exists() and instance not in ig.instances.all():
+            if instance.exists() and instance[0] not in ig.instances.all():
                 ig.instances.add(instance[0])
-                print("Added instance {} to {}".format(instance[0], ig))
+                print("Added instance {} to {}".format(instance[0].hostname, ig.name))
+                changed = True
             elif not instance.exists():
                 print("Instance does not exist: {}".format(inst_name))
+                if changed:
+                    print('(changed: True)')
                 sys.exit(1)
             else:
-                print("Instance already registered {}".format(instance[0]))
+                print("Instance already registered {}".format(instance[0].hostname))
+        if changed:
+            print('(changed: True)')
diff --git a/awx/main/management/commands/unregister_queue.py b/awx/main/management/commands/unregister_queue.py
index 388a8f0588..335ce38dbc 100644
--- a/awx/main/management/commands/unregister_queue.py
+++ b/awx/main/management/commands/unregister_queue.py
@@ -30,3 +30,4 @@ class Command(BaseCommand):
         ig = ig.first()
         ig.delete()
         print("Instance Group Removed")
+        print('(changed: True)')
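The `(changed: True)` marker these management commands now print gives a caller a machine-readable way to tell an idempotent no-op apart from a real change (for example, an Ansible `command` task could key `changed_when` off it). A hypothetical consumer sketch; `run_management_command` and the example argv are illustrative, not part of the patch.

```python
import subprocess

def run_management_command(argv):
    # Returns (exit code, whether the command reported a change)
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, universal_newlines=True)
    out, _ = proc.communicate()
    return proc.returncode, '(changed: True)' in out

# rc, changed = run_management_command(
#     ['awx-manage', 'register_queue', '--queuename=tower', '--hostnames=towerA'])
```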
diff --git a/awx/main/migrations/0043_v320_instancegroups.py b/awx/main/migrations/0043_v320_instancegroups.py
index bf0585acf6..3f7d279307 100644
--- a/awx/main/migrations/0043_v320_instancegroups.py
+++ b/awx/main/migrations/0043_v320_instancegroups.py
@@ -48,4 +48,9 @@ class Migration(migrations.Migration):
             name='instance_group',
             field=models.ManyToManyField(to='main.InstanceGroup', blank=True),
         ),
+        migrations.AddField(
+            model_name='instance',
+            name='last_isolated_check',
+            field=models.DateTimeField(auto_now_add=True, null=True),
+        ),
     ]
diff --git a/awx/main/models/ha.py b/awx/main/models/ha.py
index 134646d2cb..600a72af27 100644
--- a/awx/main/models/ha.py
+++ b/awx/main/models/ha.py
@@ -26,6 +26,11 @@ class Instance(models.Model):
     hostname = models.CharField(max_length=250, unique=True)
     created = models.DateTimeField(auto_now_add=True)
     modified = models.DateTimeField(auto_now=True)
+    last_isolated_check = models.DateTimeField(
+        null=True,
+        editable=False,
+        auto_now_add=True
+    )
     version = models.CharField(max_length=24, blank=True)
     capacity = models.PositiveIntegerField(
         default=100,
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 55c53f1d8f..c8259c7590 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -726,6 +726,18 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
             pass
         super(UnifiedJob, self).delete()
 
+    @classmethod
+    def lowest_running_id(cls):
+        oldest_running_job = cls.objects.filter(status__in=ACTIVE_STATES).order_by('id').only('id').first()
+        if oldest_running_job is not None:
+            return oldest_running_job.id
+        else:
+            newest_finished_job = cls.objects.order_by('id').only('id').last()
+            if newest_finished_job is None:
+                return 1  # System has no finished jobs
+            else:
+                return newest_finished_job.id + 1
+
     def copy_unified_job(self):
         '''
         Returns saved object, including related fields.
diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index 8445e6ce19..27f16a5d38 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -19,7 +19,6 @@ import traceback
 import urlparse
 import uuid
 from distutils.version import LooseVersion as Version
-from datetime import timedelta
 import yaml
 import fcntl
 try:
@@ -34,7 +33,7 @@ from celery.signals import celeryd_init, worker_process_init
 # Django
 from django.conf import settings
 from django.db import transaction, DatabaseError, IntegrityError
-from django.utils.timezone import now
+from django.utils.timezone import now, timedelta
 from django.utils.encoding import smart_str
 from django.core.mail import send_mail
 from django.contrib.auth.models import User
@@ -197,6 +196,33 @@ def cluster_node_heartbeat(self):
             stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact'])
 
 
+@task(bind=True)
+def tower_isolated_heartbeat(self):
+    local_hostname = settings.CLUSTER_HOST_ID
+    logger.debug("Controlling node checking for any isolated management tasks.")
+    poll_interval = settings.AWX_ISOLATED_PERIODIC_CHECK
+    # Add in some task buffer time
+    nowtime = now()
+    accept_before = nowtime - timedelta(seconds=(poll_interval - 10))
+    isolated_instance_qs = Instance.objects.filter(
+        rampart_groups__controller__instances__hostname=local_hostname,
+        last_isolated_check__lt=accept_before
+    )
+    # Fast pass of isolated instances, claiming the nodes to update
+    with transaction.atomic():
+        for isolated_instance in isolated_instance_qs:
+            isolated_instance.last_isolated_check = nowtime
+            isolated_instance.save(update_fields=['last_isolated_check'])
+    # Find the oldest job in the system and pass that to the cleanup
+    if not isolated_instance_qs:
+        return
+    cutoff_pk = UnifiedJob.lowest_running_id()
+    # Slow pass looping over isolated IGs and their isolated instances
+    for isolated_instance in isolated_instance_qs:
+        logger.debug("Managing isolated instance {}.".format(isolated_instance.hostname))
+        isolated_instance.capacity = isolated_manager.IsolatedManager.health_check(isolated_instance, cutoff_pk=cutoff_pk)
+        isolated_instance.save(update_fields=['capacity'])
+
+
 @task(bind=True, queue='tower')
 def tower_periodic_scheduler(self):
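`lowest_running_id` gives the heartbeat a cleanup cutoff: any job with an id below the returned value is guaranteed not to be running. Its contract restated as a plain-Python sketch over `(id, status)` pairs, simplified to a single `'running'` active state (the real `ACTIVE_STATES` covers more statuses); the names are illustrative.

```python
def lowest_running_id(jobs):
    # jobs: iterable of (id, status) tuples
    running = [jid for jid, status in jobs if status == 'running']
    if running:
        return min(running)
    finished = [jid for jid, _ in jobs]
    return max(finished) + 1 if finished else 1

assert lowest_running_id([]) == 1
assert lowest_running_id([(1, 'finished'), (2, 'finished')]) == 3
assert lowest_running_id([(1, 'finished'), (2, 'running'), (3, 'running')]) == 2
```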
diff --git a/awx/main/tests/functional/models/test_unified_job.py b/awx/main/tests/functional/models/test_unified_job.py
index 5b89644457..358dfc02ab 100644
--- a/awx/main/tests/functional/models/test_unified_job.py
+++ b/awx/main/tests/functional/models/test_unified_job.py
@@ -4,7 +4,7 @@ import pytest
 from django.contrib.contenttypes.models import ContentType
 
 # AWX
-from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project
+from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project, UnifiedJob
 
 
 @pytest.mark.django_db
@@ -65,3 +65,16 @@ class TestCreateUnifiedJob:
         assert second_job.inventory == job_with_links.inventory
         assert second_job.limit == 'my_server'
         assert net_credential in second_job.extra_credentials.all()
+
+
+@pytest.mark.django_db
+def test_lowest_running_id():
+    assert UnifiedJob.lowest_running_id() == 1
+    Job.objects.create(status='finished')
+    old_job = Job.objects.create(status='finished')
+    assert UnifiedJob.lowest_running_id() == old_job.id + 1
+    old_running_job = Job.objects.create(status='running')
+    Job.objects.create(status='running')
+    assert UnifiedJob.lowest_running_id() == old_running_job.id
+    Job.objects.create(status='finished')
+    assert UnifiedJob.lowest_running_id() == old_running_job.id
diff --git a/awx/main/tests/functional/test_tasks.py b/awx/main/tests/functional/test_tasks.py
index f09847c2d2..da1f774047 100644
--- a/awx/main/tests/functional/test_tasks.py
+++ b/awx/main/tests/functional/test_tasks.py
@@ -2,8 +2,17 @@ import pytest
 import mock
 import os
 
-from awx.main.tasks import RunProjectUpdate, RunInventoryUpdate
-from awx.main.models import ProjectUpdate, InventoryUpdate, InventorySource
+from django.utils.timezone import now, timedelta
+
+from awx.main.tasks import (
+    RunProjectUpdate, RunInventoryUpdate,
+    tower_isolated_heartbeat,
+    isolated_manager
+)
+from awx.main.models import (
+    ProjectUpdate, InventoryUpdate, InventorySource,
+    Instance, InstanceGroup
+)
 
 
 @pytest.fixture
@@ -73,3 +82,57 @@ class TestDependentInventoryUpdate:
         # Verify that it bails after 1st update, detecting a cancel
         assert is2.inventory_updates.count() == 0
         iu_run_mock.assert_called_once()
+
+
+class MockSettings:
+    AWX_ISOLATED_PERIODIC_CHECK = 60
+    CLUSTER_HOST_ID = 'tower_1'
+
+
+@pytest.mark.django_db
+class TestIsolatedManagementTask:
+
+    @pytest.fixture
+    def control_group(self):
+        return InstanceGroup.objects.create(name='alpha')
+
+    @pytest.fixture
+    def control_instance(self, control_group):
+        return control_group.instances.create(hostname='tower_1')
+
+    @pytest.fixture
+    def needs_updating(self, control_group):
+        ig = InstanceGroup.objects.create(name='thepentagon', controller=control_group)
+        inst = ig.instances.create(hostname='isolated', capacity=103)
+        inst.last_isolated_check = now() - timedelta(seconds=MockSettings.AWX_ISOLATED_PERIODIC_CHECK)
+        inst.save()
+        return ig
+
+    @pytest.fixture
+    def just_updated(self, control_group):
+        ig = InstanceGroup.objects.create(name='thepentagon', controller=control_group)
+        inst = ig.instances.create(hostname='isolated', capacity=103)
+        inst.last_isolated_check = now()
+        inst.save()
+        return inst
+
+    def test_takes_action(self, control_instance, needs_updating):
+        with mock.patch('awx.main.tasks.settings', MockSettings()):
+            with mock.patch.object(isolated_manager.IsolatedManager, 'health_check') as check_mock:
+                check_mock.return_value = 98
+                tower_isolated_heartbeat()
+        iso_instance = Instance.objects.get(hostname='isolated')
+        check_mock.assert_called_once_with(iso_instance, cutoff_pk=mock.ANY)
+        assert iso_instance.capacity == 98
+
+    def test_does_not_take_action(self, control_instance, just_updated):
+        with mock.patch('awx.main.tasks.settings', MockSettings()):
+            with mock.patch.object(isolated_manager.IsolatedManager, 'health_check') as check_mock:
+                check_mock.return_value = 98
+                tower_isolated_heartbeat()
+        iso_instance = Instance.objects.get(hostname='isolated')
+        check_mock.assert_not_called()
+        assert iso_instance.capacity == 103
diff --git a/awx/playbooks/heartbeat_isolated.yml b/awx/playbooks/heartbeat_isolated.yml
new file mode 100644
index 0000000000..403e3d7439
--- /dev/null
+++ b/awx/playbooks/heartbeat_isolated.yml
@@ -0,0 +1,16 @@
+---
+
+# The following variables will be set by the runner of this playbook:
+# cutoff_pk, start_delimiter, end_delimiter
+
+- hosts: all
+  gather_facts: false
+
+  tasks:
+
+    - name: Get capacity of the instance
+      tower_capacity:
+      register: result
+
+    - name: Print capacity in escaped string to scrape out
+      debug: msg="{{ start_delimiter|default('') }}{{ result['capacity'] }}{{ end_delimiter|default('') }}"
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 6530331863..537f5f07b6 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -609,6 +609,8 @@ AWX_ISOLATED_CHECK_INTERVAL = 30
 # The timeout (in seconds) for launching jobs on isolated nodes
 AWX_ISOLATED_LAUNCH_TIMEOUT = 600
 
+# The time (in seconds) between background isolated heartbeat status checks
+AWX_ISOLATED_PERIODIC_CHECK = 600
 
 # Enable Pendo on the UI, possible values are 'off', 'anonymous', and 'detailed'
 # Note: This setting may be overridden by database settings.
diff --git a/awx/settings/development.py b/awx/settings/development.py
index 414bcfb48f..0edf353f6a 100644
--- a/awx/settings/development.py
+++ b/awx/settings/development.py
@@ -114,6 +114,13 @@ except ImportError:
 CLUSTER_HOST_ID = socket.gethostname()
 CELERY_ROUTES['awx.main.tasks.cluster_node_heartbeat'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID}
 
+# Production only runs this schedule on controlling nodes,
+# but development just runs it on all nodes
+CELERYBEAT_SCHEDULE['isolated_heartbeat'] = {
+    'task': 'awx.main.tasks.tower_isolated_heartbeat',
+    'schedule': timedelta(seconds=AWX_ISOLATED_PERIODIC_CHECK),
+    'options': {'expires': AWX_ISOLATED_PERIODIC_CHECK * 2},
+}
 
 # Supervisor service name dictionary used for programatic restart
 SERVICE_NAME_DICT = {
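Why it is safe for development to fire `isolated_heartbeat` on every node: the task's fast pass claims due instances by bumping `last_isolated_check`, so another node running the same schedule moments later matches no rows and does nothing. A toy in-memory simulation of that claim window, using dict-based stand-ins rather than the ORM code; all names here are assumptions for illustration.

```python
from datetime import datetime, timedelta

POLL = 60  # stands in for AWX_ISOLATED_PERIODIC_CHECK

def claim_due(instances, nowtime):
    # Instances are due when last checked more than POLL - 10 seconds ago
    cutoff = nowtime - timedelta(seconds=POLL - 10)
    due = [i for i in instances if i['last_isolated_check'] < cutoff]
    for inst in due:
        inst['last_isolated_check'] = nowtime  # the "fast pass" claim
    return due

t0 = datetime(2017, 1, 1, 12, 0, 0)
fleet = [{'host': 'isolated', 'last_isolated_check': t0 - timedelta(seconds=120)}]
assert [i['host'] for i in claim_due(fleet, t0)] == ['isolated']
assert claim_due(fleet, t0 + timedelta(seconds=1)) == []  # second beat is a no-op
```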
diff --git a/docs/clustering.md b/docs/clustering.md
index 56fe723cf0..05dba666a1 100644
--- a/docs/clustering.md
+++ b/docs/clustering.md
@@ -112,6 +112,58 @@ rabbitmq_use_long_name=false
 rabbitmq_enable_manager=false
 ```
 
+### Security Isolated Rampart Groups
+
+In Tower versions 3.2+, customers may optionally define isolated groups
+inside security-restricted networking zones and run jobs from them.
+Instances in these groups will _not_ have a full install of Tower, but will have a minimal
+set of utilities used to run jobs on them. These must be specified
+in the inventory file, prefixed with `isolated_group_`. An example inventory
+file is shown below.
+
+```
+[tower]
+towerA
+towerB
+towerC
+
+[instance_group_security]
+towerB
+towerC
+
+[isolated_group_govcloud]
+isolatedA
+isolatedB
+
+[isolated_group_govcloud:vars]
+controller=security
+```
+
+In this example, when a job runs inside of the `govcloud` isolated group, a
+managing task runs simultaneously on one of the two instances in
+the `security` ordinary instance group.
+
+Networking security rules must allow
+connections to all nodes in an isolated group from all nodes in its controller
+group. The system is designed such that
+isolated instances never make requests to any of their controllers.
+The controlling instance for a particular job sends management commands to
+a daemon that runs the job, and pulls back the job artifacts.
+
+Isolated groups are architected such that they may exist inside of a VPC
+with security rules that _only_ permit the instances in their `controller`
+group to access them.
+
+Recommendations for system configuration with isolated groups:
+ - Do not put any isolated instances inside the `tower` group or other
+   ordinary instance groups.
+ - Define the `controller` variable as either a group var or as a hostvar
+   on all the instances in the isolated group. Please _do not_ allow
+   isolated instances in the same group to have different values for this
+   variable - the behavior in this case cannot be predicted.
+ - Do not put an isolated instance in more than one isolated group.
+
+
 ### Provisioning and Deprovisioning Instances and Groups
 
 * Provisioning