setup playbook and heartbeat for isolated deployments

* Allow isolated_group_ use in setup playbook
* Tweaks to host/queue registration commands complementing setup
* Create isolated heartbeat task and check capacity
* Add content about isolated instances to acceptance docs
AlanCoding 2017-06-12 08:43:46 -04:00
parent 8061667ace
commit dd1a261bc3
17 changed files with 325 additions and 19 deletions

@@ -950,10 +950,10 @@ docker-isolated:
	TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml create
	docker start tools_tower_1
	docker start tools_isolated_1
	if [ "`docker exec -i -t tools_isolated_1 cat /root/.ssh/authorized_keys`" != "" ]; then \
	if [ "`docker exec -i -t tools_isolated_1 cat /root/.ssh/authorized_keys`" == "`docker exec -t tools_tower_1 cat /root/.ssh/id_rsa.pub`" ]; then \
		echo "SSH keys already copied to isolated instance"; \
	else \
		docker exec "tools_isolated_1" bash -c "mkdir -p /root/.ssh && echo $$(docker exec -t tools_tower_1 cat /root/.ssh/id_rsa.pub) >> /root/.ssh/authorized_keys"; \
		docker exec "tools_isolated_1" bash -c "mkdir -p /root/.ssh && rm -f /root/.ssh/authorized_keys && echo $$(docker exec -t tools_tower_1 cat /root/.ssh/id_rsa.pub) >> /root/.ssh/authorized_keys"; \
	fi
	TAG=$(COMPOSE_TAG) docker-compose -f tools/docker-compose.yml -f tools/docker-isolated-override.yml up

@@ -0,0 +1,40 @@
# Copyright (c) 2017 Ansible by Red Hat
#
# This file is part of Ansible Tower, but depends on code imported from Ansible.
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
from ansible.module_utils.basic import AnsibleModule
import subprocess


def main():
    module = AnsibleModule(
        argument_spec=dict()
    )
    # Duplicated with awx.main.utils.common.get_system_task_capacity
    proc = subprocess.Popen(['free', '-m'], stdout=subprocess.PIPE)
    out, err = proc.communicate()
    total_mem_value = out.split()[7]
    if int(total_mem_value) <= 2048:
        cap = 50
    else:
        cap = 50 + ((int(total_mem_value) / 1024) - 2) * 75
    # Module never results in a change and (hopefully) never fails
    module.exit_json(changed=False, capacity=cap)


if __name__ == '__main__':
    main()
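For a sense of the numbers this formula produces, here is a small sketch of it outside the module (note the module targets Python 2, where `/` on ints is integer division; `//` is used here so the result is the same on Python 3):

```
def capacity_for(total_mem_mb):
    # Mirrors the module's formula: a floor of 50 units at or below 2 GB,
    # then 75 additional units for every GB beyond the first two.
    if total_mem_mb <= 2048:
        return 50
    return 50 + ((total_mem_mb // 1024) - 2) * 75

print(capacity_for(2048))   # -> 50
print(capacity_for(4096))   # -> 200
print(capacity_for(16384))  # -> 1100
```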

@@ -304,6 +304,40 @@ class IsolatedManager(object):
        # stdout_handle is closed by this point so writing output to logs is our only option
        logger.warning('Cleanup from isolated job encountered error, output:\n{}'.format(buff.getvalue()))

    @staticmethod
    def health_check(instance, cutoff_pk=0):
        '''
        :param instance: Django object representing the isolated instance
        :param cutoff_pk: Job id of the oldest job still in the running state

        Returns the instance's capacity, or 0 if it is not reachable.
        '''
        start_delimiter = 'wNqCXG6uul'
        end_delimiter = 'n6kmoFyyAP'
        extra_vars = dict(
            cutoff_pk=cutoff_pk,
            start_delimiter=start_delimiter,
            end_delimiter=end_delimiter
        )
        args = ['ansible-playbook', '-u', settings.AWX_ISOLATED_USERNAME, '-i',
                '%s,' % instance.hostname, 'heartbeat_isolated.yml', '-e',
                json.dumps(extra_vars)]
        module_path = os.path.join(os.path.dirname(awx.__file__), 'lib', 'management_modules')
        playbook_path = os.path.join(os.path.dirname(awx.__file__), 'playbooks')
        env = {'ANSIBLE_LIBRARY': module_path}
        buff = cStringIO.StringIO()
        status, rc = run.run_pexpect(
            args, playbook_path, env, buff,
            idle_timeout=60, job_timeout=60,
            pexpect_timeout=5
        )
        output = buff.getvalue()
        if status != 'successful':
            return 0  # recognized by task manager as 'unreachable'
        result = re.search('{}(.*){}'.format(start_delimiter, end_delimiter), output)
        cap = result.group(1)
        return cap
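The random delimiter strings exist only so the capacity value can be picked out of Ansible's surrounding stdout. A minimal sketch of that scrape, against made-up playbook output:

```
import re

start_delimiter, end_delimiter = 'wNqCXG6uul', 'n6kmoFyyAP'
# Hypothetical stdout from the heartbeat playbook; the debug task wraps
# the capacity in the delimiters so it survives Ansible's own chatter.
output = 'ok: [isolated] => { "msg": "wNqCXG6uul125n6kmoFyyAP" }'
result = re.search('{}(.*){}'.format(start_delimiter, end_delimiter), output)
print(result.group(1))  # -> '125'
```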
    @staticmethod
    def wrap_stdout_handle(instance, private_data_dir, stdout_handle):
        dispatcher = CallbackQueueDispatcher()

@@ -23,11 +23,13 @@ class Command(BaseCommand):
        instance = Instance.objects.filter(hostname=options.get('name'))
        if instance.exists():
            instance.delete()
            print("Instance Removed")
            result = subprocess.Popen("rabbitmqctl forget_cluster_node rabbitmq@{}".format(options.get('name')), shell=True).wait()
            if result != 0:
                print("Node deprovisioning may have failed when attempting to remove the RabbitMQ instance from the cluster")
            else:
                print('Successfully deprovisioned {}'.format(options.get('name')))
            print('(changed: True)')
        else:
            print('No instance found matching name {}'.format(options.get('name')))

@@ -17,14 +17,32 @@ class Command(BaseCommand):
    option_list = BaseCommand.option_list + (
        make_option('--hostname', dest='hostname', type='string',
                    help='Hostname used during provisioning'),
        make_option('--hostnames', dest='hostnames', type='string',
                    help='Alternatively, hostnames can be provided with '
                         'this option as a comma-delimited list'),
    )

    def handle(self, **options):
        uuid = settings.SYSTEM_UUID
        instance = Instance.objects.filter(hostname=options.get('hostname'))

    def _register_hostname(self, hostname):
        if not hostname:
            return
        instance = Instance.objects.filter(hostname=hostname)
        if instance.exists():
            print("Instance already registered {}".format(instance[0]))
            return
        instance = Instance(uuid=uuid, hostname=options.get('hostname'))
        instance = Instance(uuid=self.uuid, hostname=hostname)
        instance.save()
        print('Successfully registered instance {}'.format(instance))
        print('Successfully registered instance {}'.format(hostname))
        self.changed = True

    def handle(self, **options):
        self.uuid = settings.SYSTEM_UUID
        self.changed = False
        self._register_hostname(options.get('hostname'))
        hostname_list = []
        if options.get('hostnames'):
            hostname_list = options.get('hostnames').split(",")
        instance_list = [x.strip() for x in hostname_list if x]
        for inst_name in instance_list:
            self._register_hostname(inst_name)
        if self.changed:
            print('(changed: True)')

@@ -5,7 +5,7 @@ import sys
from awx.main.models import Instance, InstanceGroup
from optparse import make_option
from django.core.management.base import BaseCommand
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
@@ -20,34 +20,44 @@ class Command(BaseCommand):
    )

    def handle(self, **options):
        if not options.get('queuename'):
            raise CommandError("Specify `--queuename` to use this command.")
        changed = False
        ig = InstanceGroup.objects.filter(name=options.get('queuename'))
        control_ig = None
        if options.get('controller'):
            control_ig = InstanceGroup.objects.filter(name=options.get('controller')).first()
        if ig.exists():
            print("Instance Group already registered {}".format(ig[0]))
            print("Instance Group already registered {}".format(ig[0].name))
            ig = ig[0]
            if control_ig and ig.controller_id != control_ig.pk:
                ig.controller = control_ig
                ig.save()
                print("Set controller group {} on {}.".format(control_ig, ig))
                print("Set controller group {} on {}.".format(control_ig.name, ig.name))
                changed = True
        else:
            print("Creating instance group {}".format(options.get('queuename')))
            ig = InstanceGroup(name=options.get('queuename'))
            if control_ig:
                ig.controller = control_ig
            ig.save()
            changed = True
        hostname_list = []
        if options.get('hostnames'):
            hostname_list = options.get('hostnames').split(",")
        instance_list = [x.strip() for x in hostname_list]
        instance_list = [x.strip() for x in hostname_list if x]
        for inst_name in instance_list:
            instance = Instance.objects.filter(hostname=inst_name)
            if instance.exists() and instance not in ig.instances.all():
            if instance.exists() and instance[0] not in ig.instances.all():
                ig.instances.add(instance[0])
                print("Added instance {} to {}".format(instance[0], ig))
                print("Added instance {} to {}".format(instance[0].hostname, ig.name))
                changed = True
            elif not instance.exists():
                print("Instance does not exist: {}".format(inst_name))
                if changed:
                    print('(changed: True)')
                sys.exit(1)
            else:
                print("Instance already registered {}".format(instance[0]))
                print("Instance already registered {}".format(instance[0].hostname))
        if changed:
            print('(changed: True)')

@@ -30,3 +30,4 @@ class Command(BaseCommand):
        ig = ig.first()
        ig.delete()
        print("Instance Group Removed")
        print('(changed: True)')

@@ -48,4 +48,9 @@ class Migration(migrations.Migration):
            name='instance_group',
            field=models.ManyToManyField(to='main.InstanceGroup', blank=True),
        ),
        migrations.AddField(
            model_name='instance',
            name='last_isolated_check',
            field=models.DateTimeField(auto_now_add=True, null=True),
        ),
    ]

@@ -26,6 +26,11 @@ class Instance(models.Model):
    hostname = models.CharField(max_length=250, unique=True)
    created = models.DateTimeField(auto_now_add=True)
    modified = models.DateTimeField(auto_now=True)
    last_isolated_check = models.DateTimeField(
        null=True,
        editable=False,
        auto_now_add=True
    )
    version = models.CharField(max_length=24, blank=True)
    capacity = models.PositiveIntegerField(
        default=100,

@@ -726,6 +726,18 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
            pass
        super(UnifiedJob, self).delete()

    @classmethod
    def lowest_running_id(cls):
        oldest_running_job = cls.objects.filter(status__in=ACTIVE_STATES).order_by('id').only('id').first()
        if oldest_running_job is not None:
            return oldest_running_job.id
        else:
            newest_finished_job = cls.objects.order_by('id').only('id').last()
            if newest_finished_job is None:
                return 1  # System has no finished jobs
            else:
                return newest_finished_job.id + 1

    def copy_unified_job(self):
        '''
        Returns saved object, including related fields.

@@ -19,7 +19,6 @@ import traceback
import urlparse
import uuid
from distutils.version import LooseVersion as Version
from datetime import timedelta
import yaml
import fcntl
try:
@@ -34,7 +33,7 @@ from celery.signals import celeryd_init, worker_process_init
# Django
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError
from django.utils.timezone import now
from django.utils.timezone import now, timedelta
from django.utils.encoding import smart_str
from django.core.mail import send_mail
from django.contrib.auth.models import User
@@ -197,6 +196,33 @@ def cluster_node_heartbeat(self):
    stop_local_services(['uwsgi', 'celery', 'beat', 'callback', 'fact'])


@task(bind=True)
def tower_isolated_heartbeat(self):
    local_hostname = settings.CLUSTER_HOST_ID
    logger.debug("Controlling node checking for any isolated management tasks.")
    poll_interval = settings.AWX_ISOLATED_PERIODIC_CHECK
    # Add in some task buffer time
    nowtime = now()
    accept_before = nowtime - timedelta(seconds=(poll_interval - 10))
    isolated_instance_qs = Instance.objects.filter(
        rampart_groups__controller__instances__hostname=local_hostname,
        last_isolated_check__lt=accept_before
    )
    # Fast pass of isolated instances, claiming the nodes to update
    with transaction.atomic():
        for isolated_instance in isolated_instance_qs:
            isolated_instance.last_isolated_check = nowtime
            isolated_instance.save(update_fields=['last_isolated_check'])
    if not isolated_instance_qs:
        return
    # Find the oldest job in the system and pass that to the cleanup
    cutoff_pk = UnifiedJob.lowest_running_id()
    # Slow pass looping over isolated IGs and their isolated instances
    for isolated_instance in isolated_instance_qs:
        logger.debug("Managing isolated instance {}.".format(isolated_instance.hostname))
        isolated_instance.capacity = isolated_manager.IsolatedManager.health_check(isolated_instance, cutoff_pk=cutoff_pk)
        isolated_instance.save(update_fields=['capacity'])
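To make the `accept_before` arithmetic concrete, here is a small walk-through, assuming the 600-second default for `AWX_ISOLATED_PERIODIC_CHECK` from settings:

```
from datetime import datetime, timedelta

poll_interval = 600  # assumed AWX_ISOLATED_PERIODIC_CHECK default
nowtime = datetime(2017, 6, 12, 12, 0, 0)
accept_before = nowtime - timedelta(seconds=(poll_interval - 10))
print(accept_before)  # 2017-06-12 11:50:10
# Instances whose last_isolated_check is newer than accept_before were
# checked within the current window (plus a 10-second buffer for task
# start-up jitter) and are skipped; older ones are claimed and checked.
```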
@task(bind=True, queue='tower')
def tower_periodic_scheduler(self):

@@ -4,7 +4,7 @@ import pytest
from django.contrib.contenttypes.models import ContentType
# AWX
from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project
from awx.main.models import UnifiedJobTemplate, Job, JobTemplate, WorkflowJobTemplate, Project, UnifiedJob
@pytest.mark.django_db
@@ -65,3 +65,16 @@ class TestCreateUnifiedJob:
        assert second_job.inventory == job_with_links.inventory
        assert second_job.limit == 'my_server'
        assert net_credential in second_job.extra_credentials.all()


@pytest.mark.django_db
def test_lowest_running_id():
    assert UnifiedJob.lowest_running_id() == 1
    Job.objects.create(status='finished')
    old_job = Job.objects.create(status='finished')
    assert UnifiedJob.lowest_running_id() == old_job.id + 1
    old_running_job = Job.objects.create(status='running')
    Job.objects.create(status='running')
    assert UnifiedJob.lowest_running_id() == old_running_job.id
    Job.objects.create(status='finished')
    assert UnifiedJob.lowest_running_id() == old_running_job.id

@@ -2,8 +2,17 @@ import pytest
import mock
import os
from awx.main.tasks import RunProjectUpdate, RunInventoryUpdate
from awx.main.models import ProjectUpdate, InventoryUpdate, InventorySource
from django.utils.timezone import now, timedelta
from awx.main.tasks import (
    RunProjectUpdate, RunInventoryUpdate,
    tower_isolated_heartbeat,
    isolated_manager
)
from awx.main.models import (
    ProjectUpdate, InventoryUpdate, InventorySource,
    Instance, InstanceGroup
)
@pytest.fixture
@@ -73,3 +82,57 @@ class TestDependentInventoryUpdate:
        # Verify that it bails after 1st update, detecting a cancel
        assert is2.inventory_updates.count() == 0
        iu_run_mock.assert_called_once()


class MockSettings:
    AWX_ISOLATED_PERIODIC_CHECK = 60
    CLUSTER_HOST_ID = 'tower_1'


@pytest.mark.django_db
class TestIsolatedManagementTask:

    @pytest.fixture
    def control_group(self):
        return InstanceGroup.objects.create(name='alpha')

    @pytest.fixture
    def control_instance(self, control_group):
        return control_group.instances.create(hostname='tower_1')

    @pytest.fixture
    def needs_updating(self, control_group):
        ig = InstanceGroup.objects.create(name='thepentagon', controller=control_group)
        inst = ig.instances.create(hostname='isolated', capacity=103)
        inst.last_isolated_check = now() - timedelta(seconds=MockSettings.AWX_ISOLATED_PERIODIC_CHECK)
        inst.save()
        return ig

    @pytest.fixture
    def just_updated(self, control_group):
        ig = InstanceGroup.objects.create(name='thepentagon', controller=control_group)
        inst = ig.instances.create(hostname='isolated', capacity=103)
        inst.last_isolated_check = now()
        inst.save()
        return inst

    def test_takes_action(self, control_instance, needs_updating):
        with mock.patch('awx.main.tasks.settings', MockSettings()):
            with mock.patch.object(isolated_manager.IsolatedManager, 'health_check') as check_mock:
                check_mock.return_value = 98
                tower_isolated_heartbeat()
        iso_instance = Instance.objects.get(hostname='isolated')
        check_mock.assert_called_once_with(iso_instance, cutoff_pk=mock.ANY)
        assert iso_instance.capacity == 98

    def test_does_not_take_action(self, control_instance, just_updated):
        with mock.patch('awx.main.tasks.settings', MockSettings()):
            with mock.patch.object(isolated_manager.IsolatedManager, 'health_check') as check_mock:
                check_mock.return_value = 98
                tower_isolated_heartbeat()
        iso_instance = Instance.objects.get(hostname='isolated')
        check_mock.assert_not_called()
        assert iso_instance.capacity == 103

@@ -0,0 +1,16 @@
---
# The following variables will be set by the runner of this playbook:
# cutoff_pk: <pk>
# start_delimiter: <random string marking the start of the capacity output>
# end_delimiter: <random string marking the end of the capacity output>

- hosts: all
  gather_facts: false
  tasks:
    - name: Get capacity of the instance
      tower_capacity:
      register: result
    - name: Print capacity in escaped string to scrape out
      debug: msg="{{ start_delimiter|default('') }}{{ result['capacity'] }}{{ end_delimiter|default('') }}"

@@ -609,6 +609,8 @@ AWX_ISOLATED_CHECK_INTERVAL = 30
# The timeout (in seconds) for launching jobs on isolated nodes
AWX_ISOLATED_LAUNCH_TIMEOUT = 600
# The interval (in seconds) between background isolated heartbeat status checks
AWX_ISOLATED_PERIODIC_CHECK = 600
# Enable Pendo on the UI, possible values are 'off', 'anonymous', and 'detailed'
# Note: This setting may be overridden by database settings.

@@ -114,6 +114,13 @@ except ImportError:
CLUSTER_HOST_ID = socket.gethostname()
CELERY_ROUTES['awx.main.tasks.cluster_node_heartbeat'] = {'queue': CLUSTER_HOST_ID, 'routing_key': CLUSTER_HOST_ID}
# Production only runs this schedule on controlling nodes
# but development will just run it on all nodes
CELERYBEAT_SCHEDULE['isolated_heartbeat'] = {
    'task': 'awx.main.tasks.tower_isolated_heartbeat',
    'schedule': timedelta(seconds=AWX_ISOLATED_PERIODIC_CHECK),
    'options': {'expires': AWX_ISOLATED_PERIODIC_CHECK * 2},
}
# Supervisor service name dictionary used for programmatic restart
SERVICE_NAME_DICT = {

@@ -112,6 +112,58 @@ rabbitmq_use_long_name=false
rabbitmq_enable_manager=false
```
### Security Isolated Rampart Groups
In Tower versions 3.2+, customers may optionally define isolated groups
inside security-restricted networking zones from which to run jobs.
Instances in these groups will _not_ have a full install of Tower, but will
have a minimal set of utilities used to run jobs on them. Isolated groups
must be specified in the inventory file, prefixed with `isolated_group_`.
An example inventory file is shown below.
```
[tower]
towerA
towerB
towerC
[instance_group_security]
towerB
towerC
[isolated_group_govcloud]
isolatedA
isolatedB
[isolated_group_govcloud:vars]
controller=security
```
In this example, when a job runs inside of the `govcloud` isolated group, a
managing task runs simultaneously on one of the two instances in the
ordinary `security` instance group.

Networking security rules must allow connections to all nodes in an isolated
group from all nodes in its controller group. The system is designed such that
isolated instances never make requests to any of their controllers. Instead,
the controlling instance for a particular job sends management commands to a
daemon that runs the job, and retrieves the job artifacts when it finishes.
Because of this, isolated groups may exist inside of a VPC with security rules
that _only_ permit the instances in their `controller` group to access them.
Recommendations for system configuration with isolated groups:

- Do not put any isolated instances inside the `tower` group or other
  ordinary instance groups.
- Define the `controller` variable as either a group var or as a hostvar
  on all the instances in the isolated group (see the fragment after this
  list). Do _not_ allow isolated instances in the same group to have
  different values for this variable - the behavior in that case cannot
  be predicted.
- Do not put an isolated instance in more than one isolated group.
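For illustration, a hypothetical inventory fragment that uses the hostvar form
instead of the group var shown earlier - note that every host in the group
carries the same `controller` value:

```
[isolated_group_govcloud]
isolatedA controller=security
isolatedB controller=security
```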
### Provisioning and Deprovisioning Instances and Groups
* Provisioning