Merge pull request #10727 from ansible/mesh_code

Code changes to support execution-only nodes in receptor mesh
Alan Rominger 2021-08-24 13:39:19 -04:00 committed by GitHub
commit e7dbe90cb5
28 changed files with 745 additions and 335 deletions


@ -173,7 +173,15 @@ init:
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(MANAGEMENT_COMMAND) provision_instance --hostname=$(COMPOSE_HOST); \
$(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100;
$(MANAGEMENT_COMMAND) register_queue --queuename=controlplane --instance_percent=100;\
if [ ! -f /etc/receptor/certs/awx.key ]; then \
rm -f /etc/receptor/certs/*; \
receptor --cert-init commonname="AWX Test CA" bits=2048 outcert=/etc/receptor/certs/ca.crt outkey=/etc/receptor/certs/ca.key; \
for node in $(RECEPTOR_MUTUAL_TLS); do \
receptor --cert-makereq bits=2048 commonname="$$node test cert" dnsname=$$node nodeid=$$node outreq=/etc/receptor/certs/$$node.csr outkey=/etc/receptor/certs/$$node.key; \
receptor --cert-signreq req=/etc/receptor/certs/$$node.csr cacert=/etc/receptor/certs/ca.crt cakey=/etc/receptor/certs/ca.key outcert=/etc/receptor/certs/$$node.crt verify=yes; \
done; \
fi
# Refresh development environment after pulling new code.
refresh: clean requirements_dev version_file develop migrate
@ -471,7 +479,8 @@ awx/projects:
COMPOSE_UP_OPTS ?=
COMPOSE_OPTS ?=
CLUSTER_NODE_COUNT ?= 1
CONTROL_PLANE_NODE_COUNT ?= 1
EXECUTION_NODE_COUNT ?= 2
MINIKUBE_CONTAINER_GROUP ?= false
docker-compose-sources: .git/hooks/pre-commit
@ -482,7 +491,8 @@ docker-compose-sources: .git/hooks/pre-commit
ansible-playbook -i tools/docker-compose/inventory tools/docker-compose/ansible/sources.yml \
-e awx_image=$(DEV_DOCKER_TAG_BASE)/awx_devel \
-e awx_image_tag=$(COMPOSE_TAG) \
-e cluster_node_count=$(CLUSTER_NODE_COUNT) \
-e control_plane_node_count=$(CONTROL_PLANE_NODE_COUNT) \
-e execution_node_count=$(EXECUTION_NODE_COUNT) \
-e minikube_container_group=$(MINIKUBE_CONTAINER_GROUP)


@ -374,8 +374,8 @@ class InstanceDetail(RetrieveUpdateAPIView):
r = super(InstanceDetail, self).update(request, *args, **kwargs)
if status.is_success(r.status_code):
obj = self.get_object()
obj.refresh_capacity()
obj.save()
obj.set_capacity_value()
obj.save(update_fields=['capacity'])
r.data = serializers.InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj)
return r


@ -77,3 +77,7 @@ LOGGER_BLOCKLIST = (
# loggers that may be called getting logging settings
'awx.conf',
)
# Reported version for node seen in receptor mesh but for which capacity check
# failed or is in progress
RECEPTOR_PENDING = 'ansible-runner-???'


@ -10,6 +10,7 @@ from django.conf import settings
from awx.main.utils.filters import SmartFilter
from awx.main.utils.pglock import advisory_lock
from awx.main.constants import RECEPTOR_PENDING
__all__ = ['HostManager', 'InstanceManager', 'InstanceGroupManager', 'DeferJobCreatedManager']
@ -104,20 +105,18 @@ class InstanceManager(models.Manager):
"""Return the currently active instance."""
# If we are running unit tests, return a stub record.
if settings.IS_TESTING(sys.argv) or hasattr(sys, '_called_from_test'):
return self.model(id=1, hostname='localhost', uuid='00000000-0000-0000-0000-000000000000')
return self.model(id=1, hostname=settings.CLUSTER_HOST_ID, uuid='00000000-0000-0000-0000-000000000000')
node = self.filter(hostname=settings.CLUSTER_HOST_ID)
if node.exists():
return node[0]
raise RuntimeError("No instance found with the current cluster host id")
def register(self, uuid=None, hostname=None, ip_address=None, node_type=None):
def register(self, uuid=None, hostname=None, ip_address=None, node_type='hybrid', defaults=None):
if not uuid:
uuid = settings.SYSTEM_UUID
if not hostname:
hostname = settings.CLUSTER_HOST_ID
if not node_type:
node_type = "hybrid"
with advisory_lock('instance_registration_%s' % hostname):
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
# detect any instances with the same IP address.
@ -130,6 +129,7 @@ class InstanceManager(models.Manager):
other_inst.save(update_fields=['ip_address'])
logger.warning("IP address {0} conflict detected, ip address unset for host {1}.".format(ip_address, other_hostname))
# Return existing instance that matches hostname
instance = self.filter(hostname=hostname)
if instance.exists():
instance = instance.get()
@ -145,7 +145,14 @@ class InstanceManager(models.Manager):
return (True, instance)
else:
return (False, instance)
instance = self.create(uuid=uuid, hostname=hostname, ip_address=ip_address, capacity=0, node_type=node_type)
# Create new instance, and fill in default values
create_defaults = dict(capacity=0, uuid=uuid)
if defaults is not None:
create_defaults.update(defaults)
if node_type == 'execution' and 'version' not in create_defaults:
create_defaults['version'] = RECEPTOR_PENDING
instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults)
return (True, instance)
def get_or_register(self):


@ -0,0 +1,27 @@
# Generated by Django 2.2.20 on 2021-08-12 13:55
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0152_instance_node_type'),
]
operations = [
migrations.AddField(
model_name='instance',
name='last_seen',
field=models.DateTimeField(
editable=False,
help_text='Last time instance ran its heartbeat task for main cluster nodes. Last known connection to receptor mesh for execution nodes.',
null=True,
),
),
migrations.AlterField(
model_name='instance',
name='memory',
field=models.BigIntegerField(default=0, editable=False, help_text='Total system memory of this instance in bytes.'),
),
]


@ -2,6 +2,8 @@
# All Rights Reserved.
from decimal import Decimal
import random
import logging
from django.core.validators import MinValueValidator
from django.db import models, connection
@ -20,11 +22,13 @@ from awx.main.managers import InstanceManager, InstanceGroupManager
from awx.main.fields import JSONField
from awx.main.models.base import BaseModel, HasEditsMixin, prevent_search
from awx.main.models.unified_jobs import UnifiedJob
from awx.main.utils import get_cpu_capacity, get_mem_capacity, get_system_task_capacity
from awx.main.utils.common import measure_cpu, get_corrected_cpu, get_cpu_effective_capacity, measure_memory, get_corrected_memory, get_mem_effective_capacity
from awx.main.models.mixins import RelatedJobsMixin
__all__ = ('Instance', 'InstanceGroup', 'TowerScheduleState')
logger = logging.getLogger('awx.main.models.ha')
class HasPolicyEditsMixin(HasEditsMixin):
class Meta:
@ -51,6 +55,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
objects = InstanceManager()
# Fields set in instance registration
uuid = models.CharField(max_length=40)
hostname = models.CharField(max_length=250, unique=True)
ip_address = models.CharField(
@ -60,16 +65,11 @@ class Instance(HasPolicyEditsMixin, BaseModel):
max_length=50,
unique=True,
)
# Auto-fields, implementation is different from BaseModel
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
# Fields defined in health check or heartbeat
version = models.CharField(max_length=120, blank=True)
capacity = models.PositiveIntegerField(
default=100,
editable=False,
)
capacity_adjustment = models.DecimalField(default=Decimal(1.0), max_digits=3, decimal_places=2, validators=[MinValueValidator(0)])
enabled = models.BooleanField(default=True)
managed_by_policy = models.BooleanField(default=True)
cpu = models.IntegerField(
default=0,
editable=False,
@ -77,7 +77,22 @@ class Instance(HasPolicyEditsMixin, BaseModel):
memory = models.BigIntegerField(
default=0,
editable=False,
help_text=_('Total system memory of this instance in bytes.'),
)
last_seen = models.DateTimeField(
null=True,
editable=False,
help_text=_('Last time instance ran its heartbeat task for main cluster nodes. Last known connection to receptor mesh for execution nodes.'),
)
# Capacity management
capacity = models.PositiveIntegerField(
default=100,
editable=False,
)
capacity_adjustment = models.DecimalField(default=Decimal(1.0), max_digits=3, decimal_places=2, validators=[MinValueValidator(0)])
enabled = models.BooleanField(default=True)
managed_by_policy = models.BooleanField(default=True)
cpu_capacity = models.IntegerField(
default=0,
editable=False,
@ -106,11 +121,6 @@ class Instance(HasPolicyEditsMixin, BaseModel):
def remaining_capacity(self):
return self.capacity - self.consumed_capacity
@property
def role(self):
# NOTE: TODO: Likely to repurpose this once standalone ramparts are a thing
return "awx"
@property
def jobs_running(self):
return UnifiedJob.objects.filter(
@ -125,33 +135,94 @@ class Instance(HasPolicyEditsMixin, BaseModel):
def jobs_total(self):
return UnifiedJob.objects.filter(execution_node=self.hostname).count()
@staticmethod
def choose_online_control_plane_node():
return random.choice(Instance.objects.filter(enabled=True).filter(node_type__in=['control', 'hybrid']).values_list('hostname', flat=True))
def is_lost(self, ref_time=None):
if self.last_seen is None:
return True
if ref_time is None:
ref_time = now()
grace_period = 120
return self.modified < ref_time - timedelta(seconds=grace_period)
grace_period = settings.CLUSTER_NODE_HEARTBEAT_PERIOD * 2
if self.node_type == 'execution':
grace_period += settings.RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD
return self.last_seen < ref_time - timedelta(seconds=grace_period)
def refresh_capacity(self):
cpu = get_cpu_capacity()
mem = get_mem_capacity()
def mark_offline(self, update_last_seen=False, perform_save=True):
if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and (not update_last_seen):
return
self.cpu_capacity = self.mem_capacity = self.capacity = 0
if update_last_seen:
self.last_seen = now()
if perform_save:
update_fields = ['capacity', 'cpu_capacity', 'mem_capacity']
if update_last_seen:
update_fields += ['last_seen']
self.save(update_fields=update_fields)
def set_capacity_value(self):
"""Sets capacity according to capacity adjustment rule (no save)"""
if self.enabled:
self.capacity = get_system_task_capacity(self.capacity_adjustment)
lower_cap = min(self.mem_capacity, self.cpu_capacity)
higher_cap = max(self.mem_capacity, self.cpu_capacity)
self.capacity = lower_cap + (higher_cap - lower_cap) * self.capacity_adjustment
else:
self.capacity = 0
def refresh_capacity_fields(self):
"""Update derived capacity fields from cpu and memory (no save)"""
self.cpu_capacity = get_cpu_effective_capacity(self.cpu)
self.mem_capacity = get_mem_effective_capacity(self.memory)
self.set_capacity_value()
def save_health_data(self, version, cpu, memory, uuid=None, last_seen=None, has_error=False):
update_fields = []
if last_seen is not None and self.last_seen != last_seen:
self.last_seen = last_seen
update_fields.append('last_seen')
if uuid is not None and self.uuid != uuid:
if self.uuid is not None:
logger.warn(f'Self-reported uuid of {self.hostname} changed from {self.uuid} to {uuid}')
self.uuid = uuid
update_fields.append('uuid')
if self.version != version:
self.version = version
update_fields.append('version')
new_cpu = get_corrected_cpu(cpu)
if new_cpu != self.cpu:
self.cpu = new_cpu
update_fields.append('cpu')
new_memory = get_corrected_memory(memory)
if new_memory != self.memory:
self.memory = new_memory
update_fields.append('memory')
if not has_error:
self.refresh_capacity_fields()
else:
self.mark_offline(perform_save=False)
update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity'])
self.save(update_fields=update_fields)
def local_health_check(self):
"""Only call this method on the instance that this record represents"""
has_error = False
try:
# if redis is down for some reason, that means we can't persist
# playbook event data; we should consider this a zero capacity event
redis.Redis.from_url(settings.BROKER_URL).ping()
except redis.ConnectionError:
self.capacity = 0
has_error = True
self.cpu = cpu[0]
self.memory = mem[0]
self.cpu_capacity = cpu[1]
self.mem_capacity = mem[1]
self.version = awx_application_version
self.save(update_fields=['capacity', 'version', 'modified', 'cpu', 'memory', 'cpu_capacity', 'mem_capacity'])
self.save_health_data(awx_application_version, measure_cpu(), measure_memory(), last_seen=now(), has_error=has_error)
class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
@ -198,6 +269,10 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
def capacity(self):
return sum([inst.capacity for inst in self.instances.all()])
@property
def execution_capacity(self):
return sum([inst.capacity for inst in self.instances.filter(node_type__in=['hybrid', 'execution'])])
@property
def jobs_running(self):
return UnifiedJob.objects.filter(status__in=('running', 'waiting'), instance_group=self).count()
@ -220,6 +295,8 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
def fit_task_to_most_remaining_capacity_instance(task, instances):
instance_most_capacity = None
for i in instances:
if i.node_type == 'control':
continue
if i.remaining_capacity >= task.task_impact and (
instance_most_capacity is None or i.remaining_capacity > instance_most_capacity.remaining_capacity
):


@ -75,6 +75,7 @@ class TaskManager:
instances_partial = [
SimpleNamespace(
obj=instance,
node_type=instance.node_type,
remaining_capacity=instance.remaining_capacity,
capacity=instance.capacity,
jobs_running=instance.jobs_running,
@ -86,7 +87,7 @@ class TaskManager:
instances_by_hostname = {i.hostname: i for i in instances_partial}
for rampart_group in InstanceGroup.objects.prefetch_related('instances'):
self.graph[rampart_group.name] = dict(graph=DependencyGraph(), capacity_total=rampart_group.capacity, consumed_capacity=0, instances=[])
self.graph[rampart_group.name] = dict(graph=DependencyGraph(), capacity_total=rampart_group.execution_capacity, consumed_capacity=0, instances=[])
for instance in rampart_group.instances.filter(enabled=True).order_by('hostname'):
if instance.hostname in instances_by_hostname:
self.graph[rampart_group.name]['instances'].append(instances_by_hostname[instance.hostname])
@ -289,9 +290,14 @@ class TaskManager:
logger.debug('Submitting containerized {} to queue {}.'.format(task.log_format, task.execution_node))
else:
task.instance_group = rampart_group
if instance is not None:
task.execution_node = instance.hostname
logger.debug('Submitting {} to <instance group, instance> <{},{}>.'.format(task.log_format, task.instance_group_id, task.execution_node))
task.execution_node = instance.hostname
try:
controller_node = Instance.choose_online_control_plane_node()
except IndexError:
logger.warning("No control plane nodes available to manage {}".format(task.log_format))
return
task.controller_node = controller_node
logger.debug('Submitting job {} to queue {} controlled by {}.'.format(task.log_format, task.execution_node, task.controller_node))
with disable_activity_stream():
task.celery_task_id = str(uuid.uuid4())
task.save()


@ -52,8 +52,8 @@ from gitdb.exc import BadName as BadGitName
# Runner
import ansible_runner
# Receptor
from receptorctl.socket_interface import ReceptorControl
# dateutil
from dateutil.parser import parse as parse_date
# AWX
from awx import __version__ as awx_application_version
@ -106,6 +106,7 @@ from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.utils.handlers import SpecialInventoryHandler
from awx.main.utils.receptor import get_receptor_ctl, worker_info
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
@ -176,8 +177,7 @@ def dispatch_startup():
def inform_cluster_of_shutdown():
try:
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
this_inst.capacity = 0 # No thank you to new jobs while shut down
this_inst.save(update_fields=['capacity', 'modified'])
this_inst.mark_offline(update_last_seen=True) # No thank you to new jobs while shut down
try:
reaper.reap(this_inst)
except Exception:
@ -399,6 +399,78 @@ def cleanup_execution_environment_images():
logger.debug(f"Failed to delete image {image_name}")
@task(queue=get_local_queuename)
def execution_node_health_check(node):
try:
instance = Instance.objects.get(hostname=node)
except Instance.DoesNotExist:
logger.warn(f'Instance record for {node} missing, could not check capacity.')
return
data = worker_info(node)
prior_capacity = instance.capacity
instance.save_health_data(
version='ansible-runner-' + data.get('runner_version', '???'),
cpu=data.get('cpu_count', 0),
memory=data.get('mem_in_bytes', 0),
uuid=data.get('uuid'),
has_error=bool(data.get('errors')),
)
if data['errors']:
formatted_error = "\n".join(data["errors"])
if prior_capacity:
logger.warn(f'Health check marking execution node {node} as lost, errors:\n{formatted_error}')
else:
logger.info(f'Failed to find capacity of new or lost execution node {node}, errors:\n{formatted_error}')
else:
logger.info('Set capacity of execution node {} to {}, worker info data:\n{}'.format(node, instance.capacity, json.dumps(data, indent=2)))
def inspect_execution_nodes(instance_list):
node_lookup = {}
for inst in instance_list:
if inst.node_type == 'execution':
node_lookup[inst.hostname] = inst
ctl = get_receptor_ctl()
connections = ctl.simple_command('status')['Advertisements']
nowtime = now()
for ad in connections:
hostname = ad['NodeID']
commands = ad.get('WorkCommands') or []
if 'ansible-runner' not in commands:
continue
changed = False
if hostname in node_lookup:
instance = node_lookup[hostname]
else:
(changed, instance) = Instance.objects.register(hostname=hostname, node_type='execution')
was_lost = instance.is_lost(ref_time=nowtime)
last_seen = parse_date(ad['Time'])
if instance.last_seen and instance.last_seen >= last_seen:
continue
instance.last_seen = last_seen
instance.save(update_fields=['last_seen'])
if changed:
logger.warn("Registered execution node '{}'".format(hostname))
execution_node_health_check.apply_async([hostname])
elif was_lost:
# if the instance *was* lost, but has appeared again,
# attempt to re-establish the initial capacity and version
# check
logger.warn(f'Execution node attempting to rejoin as instance {hostname}.')
execution_node_health_check.apply_async([hostname])
elif instance.capacity == 0:
# Periodically re-run the health check of errored nodes, in case someone fixed it
# TODO: perhaps decrease the frequency of these checks
logger.debug(f'Restarting health check for execution node {hostname} with known errors.')
execution_node_health_check.apply_async([hostname])
@task(queue=get_local_queuename)
def cluster_node_heartbeat():
logger.debug("Cluster node heartbeat task.")
@ -407,28 +479,34 @@ def cluster_node_heartbeat():
this_inst = None
lost_instances = []
(changed, instance) = Instance.objects.get_or_register()
if changed:
logger.info("Registered tower node '{}'".format(instance.hostname))
for inst in list(instance_list):
for inst in instance_list:
if inst.hostname == settings.CLUSTER_HOST_ID:
this_inst = inst
instance_list.remove(inst)
elif inst.is_lost(ref_time=nowtime):
break
else:
(changed, this_inst) = Instance.objects.get_or_register()
if changed:
logger.info("Registered tower control node '{}'".format(this_inst.hostname))
inspect_execution_nodes(instance_list)
for inst in list(instance_list):
if inst.is_lost(ref_time=nowtime):
lost_instances.append(inst)
instance_list.remove(inst)
if this_inst:
startup_event = this_inst.is_lost(ref_time=nowtime)
this_inst.refresh_capacity()
if startup_event:
this_inst.local_health_check()
if startup_event and this_inst.capacity != 0:
logger.warning('Rejoining the cluster as instance {}.'.format(this_inst.hostname))
return
else:
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
# IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in instance_list:
if other_inst.version == "":
if other_inst.version == "" or other_inst.version.startswith('ansible-runner') or other_inst.node_type == 'execution':
continue
if Version(other_inst.version.split('-', 1)[0]) > Version(awx_application_version.split('-', 1)[0]) and not settings.DEBUG:
logger.error(
@ -440,6 +518,7 @@ def cluster_node_heartbeat():
# The heartbeat task will reset the capacity to the system capacity after upgrade.
stop_local_services(communicate=False)
raise RuntimeError("Shutting down.")
for other_inst in lost_instances:
try:
reaper.reap(other_inst)
@ -454,9 +533,8 @@ def cluster_node_heartbeat():
# If auto deprovisining is on, don't bother setting the capacity to 0
# since we will delete the node anyway.
if other_inst.capacity != 0 and not settings.AWX_AUTO_DEPROVISION_INSTANCES:
other_inst.capacity = 0
other_inst.save(update_fields=['capacity'])
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.modified))
other_inst.mark_offline()
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))
elif settings.AWX_AUTO_DEPROVISION_INSTANCES:
deprovision_hostname = other_inst.hostname
other_inst.delete()
@ -752,10 +830,6 @@ def with_path_cleanup(f):
return _wrapped
def get_receptor_ctl():
return ReceptorControl('/var/run/receptor/receptor.sock')
class BaseTask(object):
model = None
event_model = None
@ -2927,18 +3001,18 @@ class AWXReceptorJob:
execution_environment_params = self.task.build_execution_environment_params(self.task.instance, runner_params['private_data_dir'])
self.runner_params['settings'].update(execution_environment_params)
def run(self):
def run(self, work_type=None):
# We establish a connection to the Receptor socket
receptor_ctl = get_receptor_ctl()
try:
return self._run_internal(receptor_ctl)
return self._run_internal(receptor_ctl, work_type=work_type)
finally:
# Make sure to always release the work unit if we established it
if self.unit_id is not None and settings.RECEPTOR_RELEASE_WORK:
receptor_ctl.simple_command(f"work release {self.unit_id}")
def _run_internal(self, receptor_ctl):
def _run_internal(self, receptor_ctl, work_type=None):
# Create a socketpair. Where the left side will be used for writing our payload
# (private data dir, kwargs). The right side will be passed to Receptor for
# reading.
@ -2949,7 +3023,14 @@ class AWXReceptorJob:
# submit our work, passing
# in the right side of our socketpair for reading.
result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params)
_kw = {}
work_type = work_type or self.work_type
if work_type == 'ansible-runner':
_kw['node'] = self.task.instance.execution_node
logger.debug(f'receptorctl.submit_work(node={_kw["node"]})')
else:
logger.debug(f'receptorctl.submit_work({work_type})')
result = receptor_ctl.submit_work(worktype=work_type, payload=sockout.makefile('rb'), params=self.receptor_params, **_kw)
self.unit_id = result['unitid']
self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid'])
@ -2997,7 +3078,13 @@ class AWXReceptorJob:
return res
if not self.task.instance.result_traceback:
raise RuntimeError(detail)
try:
resultsock = receptor_ctl.get_work_results(self.unit_id, return_sockfile=True)
lines = resultsock.readlines()
self.task.instance.result_traceback = b"".join(lines).decode()
self.task.instance.save(update_fields=['result_traceback'])
except Exception:
raise RuntimeError(detail)
return res
@ -3005,7 +3092,7 @@ class AWXReceptorJob:
# write our payload to the left side of our socketpair.
@cleanup_new_process
def transmit(self, _socket):
if not settings.IS_K8S and self.work_type == 'local':
if not settings.IS_K8S and self.work_type == 'local' and 'only_transmit_kwargs' not in self.runner_params:
self.runner_params['only_transmit_kwargs'] = True
try:
@ -3052,6 +3139,11 @@ class AWXReceptorJob:
work_type = 'kubernetes-runtime-auth'
else:
work_type = 'kubernetes-incluster-auth'
elif isinstance(self.task.instance, (Job, AdHocCommand)):
if self.task.instance.execution_node == self.task.instance.controller_node:
work_type = 'local'
else:
work_type = 'ansible-runner'
else:
work_type = 'local'


@ -0,0 +1,32 @@
import pytest
from awx.api.versioning import reverse
from awx.main.models.ha import Instance
@pytest.mark.django_db
def test_disabled_zeros_capacity(patch, admin_user):
instance = Instance.objects.create(hostname='example-host', cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42)
url = reverse('api:instance_detail', kwargs={'pk': instance.pk})
r = patch(url=url, data={'enabled': False}, user=admin_user)
assert r.data['capacity'] == 0
instance.refresh_from_db()
assert instance.capacity == 0
@pytest.mark.django_db
def test_enabled_sets_capacity(patch, admin_user):
instance = Instance.objects.create(hostname='example-host', enabled=False, cpu=6, memory=36000000000, cpu_capacity=6, mem_capacity=42, capacity=0)
assert instance.capacity == 0
url = reverse('api:instance_detail', kwargs={'pk': instance.pk})
r = patch(url=url, data={'enabled': True}, user=admin_user)
assert r.data['capacity'] > 0
instance.refresh_from_db()
assert instance.capacity > 0


@ -20,24 +20,27 @@ def test_orphan_unified_job_creation(instance, inventory):
@pytest.mark.django_db
@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
@mock.patch('awx.main.tasks.inspect_execution_nodes', lambda *args, **kwargs: None)
@mock.patch('awx.main.models.ha.get_cpu_effective_capacity', lambda cpu: 8)
@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem: 62)
def test_job_capacity_and_with_inactive_node():
i = Instance.objects.create(hostname='test-1')
with mock.patch.object(redis.client.Redis, 'ping', lambda self: True):
i.refresh_capacity()
i.save_health_data('18.0.1', 2, 8000)
assert i.enabled is True
assert i.capacity_adjustment == 1.0
assert i.capacity == 62
i.enabled = False
i.save()
with override_settings(CLUSTER_HOST_ID=i.hostname):
cluster_node_heartbeat()
with mock.patch.object(redis.client.Redis, 'ping', lambda self: True):
cluster_node_heartbeat()
i = Instance.objects.get(id=i.id)
assert i.capacity == 0
@pytest.mark.django_db
@mock.patch('awx.main.utils.common.get_cpu_capacity', lambda: (2, 8))
@mock.patch('awx.main.utils.common.get_mem_capacity', lambda: (8000, 62))
@mock.patch('awx.main.models.ha.get_cpu_effective_capacity', lambda cpu: 8)
@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem: 62)
def test_job_capacity_with_redis_disabled():
i = Instance.objects.create(hostname='test-1')
@ -45,7 +48,7 @@ def test_job_capacity_with_redis_disabled():
raise redis.ConnectionError()
with mock.patch.object(redis.client.Redis, 'ping', _raise):
i.refresh_capacity()
i.local_health_check()
assert i.capacity == 0


@ -1,10 +1,19 @@
import pytest
from unittest import mock
from unittest.mock import Mock
from decimal import Decimal
from awx.main.models import (
InstanceGroup,
)
from awx.main.models import InstanceGroup, Instance
@pytest.mark.parametrize('capacity_adjustment', [0.0, 0.25, 0.5, 0.75, 1, 1.5, 3])
def test_capacity_adjustment_no_save(capacity_adjustment):
inst = Instance(hostname='test-host', capacity_adjustment=Decimal(capacity_adjustment), capacity=0, cpu_capacity=10, mem_capacity=1000)
assert inst.capacity == 0
assert inst.capacity_adjustment == capacity_adjustment # sanity
inst.set_capacity_value()
assert inst.capacity > 0
assert inst.capacity == (float(inst.capacity_adjustment) * abs(inst.mem_capacity - inst.cpu_capacity) + min(inst.mem_capacity, inst.cpu_capacity))
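As a quick worked example of the adjustment rule this test encodes, using the values from the `Instance` constructed above and one of the parametrized adjustments:
```python
# capacity slides between the smaller and larger of the two limits
cpu_capacity, mem_capacity, capacity_adjustment = 10, 1000, 0.5
capacity = cpu_capacity + (mem_capacity - cpu_capacity) * capacity_adjustment  # -> 505.0
```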
def T(impact):


@ -18,8 +18,6 @@ import tempfile
import psutil
from functools import reduce, wraps
from decimal import Decimal
# Django
from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist
from django.utils.dateparse import parse_datetime
@ -72,9 +70,6 @@ __all__ = [
'set_current_apps',
'extract_ansible_vars',
'get_search_fields',
'get_system_task_capacity',
'get_cpu_capacity',
'get_mem_capacity',
'model_to_dict',
'NullablePromptPseudoField',
'model_instance_diff',
@ -699,12 +694,32 @@ def parse_yaml_or_json(vars_str, silent_failure=True):
return vars_dict
def get_cpu_capacity():
def get_cpu_effective_capacity(cpu_count):
from django.conf import settings
settings_forkcpu = getattr(settings, 'SYSTEM_TASK_FORKS_CPU', None)
env_forkcpu = os.getenv('SYSTEM_TASK_FORKS_CPU', None)
if env_forkcpu:
forkcpu = int(env_forkcpu)
elif settings_forkcpu:
forkcpu = int(settings_forkcpu)
else:
forkcpu = 4
return cpu_count * forkcpu
def measure_cpu(): # TODO: replace with import from ansible-runner
return psutil.cpu_count()
def get_corrected_cpu(cpu_count): # formerly get_cpu_capacity
"""Some environments will do a correction to the reported CPU number
because the given OpenShift value is a lie
"""
from django.conf import settings
settings_abscpu = getattr(settings, 'SYSTEM_TASK_ABS_CPU', None)
env_abscpu = os.getenv('SYSTEM_TASK_ABS_CPU', None)
@ -713,23 +728,32 @@ def get_cpu_capacity():
elif settings_abscpu is not None:
return 0, int(settings_abscpu)
cpu = psutil.cpu_count()
if env_forkcpu:
forkcpu = int(env_forkcpu)
elif settings_forkcpu:
forkcpu = int(settings_forkcpu)
else:
forkcpu = 4
return (cpu, cpu * forkcpu)
return cpu_count # no correction
def get_mem_capacity():
def get_mem_effective_capacity(mem_mb):
from django.conf import settings
settings_forkmem = getattr(settings, 'SYSTEM_TASK_FORKS_MEM', None)
env_forkmem = os.getenv('SYSTEM_TASK_FORKS_MEM', None)
if env_forkmem:
forkmem = int(env_forkmem)
elif settings_forkmem:
forkmem = int(settings_forkmem)
else:
forkmem = 100
return max(1, ((mem_mb // 1024 // 1024) - 2048) // forkmem)
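A worked example with the default `forkmem` of 100 and an assumed 8 GiB of physical memory (note that, despite the parameter name, the value passed in is in bytes, as returned by `measure_memory()`):
```python
mem_bytes = 8 * 1024**3                                        # 8589934592
forks = max(1, ((mem_bytes // 1024 // 1024) - 2048) // 100)    # (8192 - 2048) // 100 -> 61
```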
def measure_memory(): # TODO: replace with import from ansible-runner
return psutil.virtual_memory().total
def get_corrected_memory(memory):
from django.conf import settings
settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None)
env_absmem = os.getenv('SYSTEM_TASK_ABS_MEM', None)
@ -738,40 +762,7 @@ def get_mem_capacity():
elif settings_absmem is not None:
return 0, int(settings_absmem)
if env_forkmem:
forkmem = int(env_forkmem)
elif settings_forkmem:
forkmem = int(settings_forkmem)
else:
forkmem = 100
mem = psutil.virtual_memory().total
return (mem, max(1, ((mem // 1024 // 1024) - 2048) // forkmem))
def get_system_task_capacity(scale=Decimal(1.0), cpu_capacity=None, mem_capacity=None):
"""
Measure system memory and use it as a baseline for determining the system's capacity
"""
from django.conf import settings
settings_forks = getattr(settings, 'SYSTEM_TASK_FORKS_CAPACITY', None)
env_forks = os.getenv('SYSTEM_TASK_FORKS_CAPACITY', None)
if env_forks:
return int(env_forks)
elif settings_forks:
return int(settings_forks)
if cpu_capacity is None:
_, cpu_cap = get_cpu_capacity()
else:
cpu_cap = cpu_capacity
if mem_capacity is None:
_, mem_cap = get_mem_capacity()
else:
mem_cap = mem_capacity
return min(mem_cap, cpu_cap) + ((max(mem_cap, cpu_cap) - min(mem_cap, cpu_cap)) * scale)
return memory
_inventory_updates = threading.local()


@ -0,0 +1,98 @@
import logging
import yaml
import time
from receptorctl.socket_interface import ReceptorControl
logger = logging.getLogger('awx.main.utils.receptor')
def get_receptor_sockfile():
receptor_conf = '/etc/receptor/receptor.conf'
with open(receptor_conf, 'r') as f:
data = yaml.safe_load(f)
for section in data:
for entry_name, entry_data in section.items():
if entry_name == 'control-service':
if 'filename' in entry_data:
return entry_data['filename']
else:
raise RuntimeError(f'Receptor conf {receptor_conf} control-service entry does not have a filename parameter')
else:
raise RuntimeError(f'Receptor conf {receptor_conf} does not have control-service entry needed to get sockfile')
def get_receptor_ctl():
receptor_sockfile = get_receptor_sockfile()
return ReceptorControl(receptor_sockfile)
def worker_info(node_name):
receptor_ctl = get_receptor_ctl()
transmit_start = time.time()
error_list = []
data = {'errors': error_list, 'transmit_timing': 0.0}
result = receptor_ctl.submit_work(worktype='ansible-runner', payload='', params={"params": f"--worker-info"}, ttl='20s', node=node_name)
unit_id = result['unitid']
run_start = time.time()
data['transmit_timing'] = run_start - transmit_start
data['run_timing'] = 0.0
try:
resultfile = receptor_ctl.get_work_results(unit_id)
stdout = ''
while data['run_timing'] < 20.0:
status = receptor_ctl.simple_command(f'work status {unit_id}')
state_name = status.get('StateName')
if state_name not in ('Pending', 'Running'):
break
data['run_timing'] = time.time() - run_start
time.sleep(0.5)
else:
error_list.append(f'Timeout getting worker info on {node_name}, state remains in {state_name}')
stdout = resultfile.read()
stdout = str(stdout, encoding='utf-8')
finally:
res = receptor_ctl.simple_command(f"work release {unit_id}")
if res != {'released': unit_id}:
logger.warn(f'Could not confirm release of receptor work unit id {unit_id} from {node_name}, data: {res}')
receptor_ctl.close()
if state_name.lower() == 'failed':
work_detail = status.get('Detail', '')
if not work_detail.startswith('exit status'):
error_list.append(f'Receptor error getting worker info from {node_name}, detail:\n{work_detail}')
elif 'unrecognized arguments: --worker-info' in stdout:
error_list.append(f'Old version (2.0.1 or earlier) of ansible-runner on node {node_name} without --worker-info')
else:
error_list.append(f'Unknown ansible-runner error on node {node_name}, stdout:\n{stdout}')
else:
yaml_stdout = stdout.strip()
remote_data = {}
try:
remote_data = yaml.safe_load(yaml_stdout)
except Exception as json_e:
error_list.append(f'Failed to parse node {node_name} --worker-info output as YAML, error: {json_e}, data:\n{yaml_stdout}')
if not isinstance(remote_data, dict):
error_list.append(f'Remote node {node_name} --worker-info output is not a YAML dict, output:{stdout}')
else:
error_list.extend(remote_data.pop('errors', [])) # merge both error lists
data.update(remote_data)
# see tasks.py usage of keys
missing_keys = set(('runner_version', 'mem_in_bytes', 'cpu_count')) - set(data.keys())
if missing_keys:
data['errors'].append('Worker failed to return keys {}'.format(' '.join(missing_keys)))
return data


@ -32,7 +32,13 @@ def unwrap_broadcast_msg(payload: dict):
def get_broadcast_hosts():
Instance = apps.get_model('main', 'Instance')
instances = Instance.objects.exclude(hostname=Instance.objects.me().hostname).order_by('hostname').values('hostname', 'ip_address').distinct()
instances = (
Instance.objects.exclude(hostname=Instance.objects.me().hostname)
.exclude(node_type='execution')
.order_by('hostname')
.values('hostname', 'ip_address')
.distinct()
)
return {i['hostname']: i['ip_address'] or i['hostname'] for i in instances}


@ -419,10 +419,18 @@ DEVSERVER_DEFAULT_PORT = '8013'
# Set default ports for live server tests.
os.environ.setdefault('DJANGO_LIVE_TEST_SERVER_ADDRESS', 'localhost:9013-9199')
# heartbeat period can factor into some forms of logic, so it is maintained as a setting here
CLUSTER_NODE_HEARTBEAT_PERIOD = 60
RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60 # https://github.com/ansible/receptor/blob/aa1d589e154d8a0cb99a220aff8f98faf2273be6/pkg/netceptor/netceptor.go#L34
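These defaults feed the `is_lost()` grace period added in `awx/main/models/ha.py` above; the arithmetic works out to:
```python
CLUSTER_NODE_HEARTBEAT_PERIOD = 60
RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD = 60

# is_lost() grace period (see awx/main/models/ha.py in this PR)
grace_period = CLUSTER_NODE_HEARTBEAT_PERIOD * 2        # 120s for control/hybrid nodes
grace_period += RECEPTOR_SERVICE_ADVERTISEMENT_PERIOD   # 180s for execution nodes
```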
BROKER_URL = 'unix:///var/run/redis/redis.sock'
CELERYBEAT_SCHEDULE = {
'tower_scheduler': {'task': 'awx.main.tasks.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},
'cluster_heartbeat': {'task': 'awx.main.tasks.cluster_node_heartbeat', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
'cluster_heartbeat': {
'task': 'awx.main.tasks.cluster_node_heartbeat',
'schedule': timedelta(seconds=CLUSTER_NODE_HEARTBEAT_PERIOD),
'options': {'expires': 50},
},
'gather_analytics': {'task': 'awx.main.tasks.gather_analytics', 'schedule': timedelta(minutes=5)},
'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'k8s_reaper': {'task': 'awx.main.tasks.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},

docs/receptor_mesh.md (new file, 135 lines)

@ -0,0 +1,135 @@
## Receptor Mesh
AWX uses a [Receptor](https://github.com/ansible/receptor) mesh to transmit "user-space" unified jobs:
- jobs
- ad hoc commands
- inventory updates
to the node where they run.
> NOTE: user-space jobs are what carry out the user's Ansible automation. These job types run inside of the designated execution environment so that the needed content is available.
> NOTE: The word "node" corresponds to an entry in the `Instance` database model (exposed at the `/api/v2/instances/` endpoint) and refers to a machine participating in the cluster / mesh.
The unified jobs API reports `controller_node` and `execution_node` fields.
The execution node is where the job runs, and the controller node interfaces between the job and server functions.
Before a job can start, the controller node prepares the `private_data_dir` needed for the job to run.
Next, the controller node sends the data via `ansible-runner`'s `transmit`, and connects to the output stream with `process`.
For details on these commands, see the [ansible-runner docs on remote execution](https://ansible-runner.readthedocs.io/en/latest/remote_jobs.html).
On the other side, the execution node runs the job under `ansible-runner worker`.
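Outside of AWX, the same controller/execution split can be exercised with ansible-runner's remote-execution commands directly; a rough sketch (the directory, playbook, and stream names here are only illustrative):
```
# Controller side: serialize the private data dir and play into a job stream
ansible-runner transmit /tmp/demo -p demo.yml > job_stream

# Execution side: run the job, emitting an event stream
ansible-runner worker < job_stream > result_stream

# Controller side again: consume the events and write artifacts back into the data dir
ansible-runner process /tmp/demo < result_stream
```
In AWX the streams are not files; they are carried over the receptor mesh as the payload and results of a work unit.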
### Split of Control Plane versus Execution Plane
Instances in the **control plane** run persistent AWX services (like the web server, task dispatcher, etc.), project updates, and management jobs. Instances in the **execution plane** run user-space jobs.
The task manager logic will not send user-space jobs to **control-only** nodes.
In the inventory definition, the user can set a flag to designate this node type.
**Execution-only** nodes have a minimal set of software requirements needed to participate in the receptor mesh and run jobs under ansible-runner with podman isolation.
These _only_ run user-space jobs, and may be geographically separated (with high latency) from the control plane.
They may not even have a direct connection to the control plane, instead communicating through receptor **hop** nodes.
**Hybrid** (control & execution) nodes are instances in the control plane that are also allowed to run user-space jobs.
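For illustration, the `register()` manager method added in this PR (see the `managers.py` hunk above) is what creates the database record for such a node; a minimal sketch, with a made-up hostname:
```
from awx.main.models import Instance

# Execution nodes start with capacity=0 and the RECEPTOR_PENDING placeholder
# version until their first health check succeeds.
changed, instance = Instance.objects.register(hostname='execution-node.example.org', node_type='execution')
```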
#### Receptor Configuration Work Type
Execution-only nodes need to advertise the "ansible-runner" work type.
User-space jobs are submitted as a receptor work unit with this work type.
An entry like this should appear in its `receptor.conf` (receptor configuration file):
```
- work-command:
worktype: ansible-runner
command: ansible-runner
params: worker
allowruntimeparams: true
```
Control (and hybrid) nodes advertise the "local" work type instead.
So the entry is the same as above, except that it has `worktype: local`.
Project updates are submitted as this work type.
If user-space jobs run on a hybrid node, they will also run as the "local" work type.
Here is a listing of work types that you may encounter:
- `local` - any ansible-runner job run in a traditional install
- `ansible-runner` - remote execution of user-space jobs on execution nodes
- `kubernetes-runtime-auth` - user-space jobs run in a container group
- `kubernetes-incluster-auth` - project updates and management jobs on OpenShift Container Platform
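For reference, the simplest submission of an `ansible-runner` work unit in this PR is the health check in `worker_info()`; condensed, it looks roughly like this (socket path and node name follow the development environment defaults):
```
from receptorctl.socket_interface import ReceptorControl

ctl = ReceptorControl('/var/run/receptor/receptor.sock')
result = ctl.submit_work(
    worktype='ansible-runner',           # work type advertised by execution nodes
    node='receptor-1',                   # target execution node
    payload='',                          # normally the transmitted job stream
    params={'params': '--worker-info'},  # extra args appended to `ansible-runner worker`
)
unit_id = result['unitid']
# ... read output via ctl.get_work_results(unit_id), then always release the unit:
ctl.simple_command(f'work release {unit_id}')
```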
### Auto-discovery of execution nodes
Instances in the control plane must be registered by the installer via `awx-manage`
commands like `awx-manage register_queue` or `awx-manage register_instance`.
Execution-only nodes are automatically discovered after they have been configured and join the receptor mesh.
Control nodes should see them as "Known Nodes".
Control nodes check the receptor network (reported via `receptorctl status`) when their heartbeat task runs.
Nodes found on the receptor network are compared against the `Instance` model in the database.
If a node appears on the mesh that is not yet in the database, a "health check" is started.
The `capacity` field gets a non-zero value through this health check, which is necessary for the node to run jobs.
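Condensed from `inspect_execution_nodes()` in `awx/main/tasks.py` (see the hunk above), the discovery step looks roughly like this:
```
from awx.main.models import Instance
from awx.main.utils.receptor import get_receptor_ctl

ctl = get_receptor_ctl()
for ad in ctl.simple_command('status')['Advertisements']:
    # Only nodes advertising the ansible-runner work type can run user-space jobs.
    if 'ansible-runner' not in (ad.get('WorkCommands') or []):
        continue
    hostname = ad['NodeID']
    if not Instance.objects.filter(hostname=hostname).exists():
        # New node: register it, then a health check fills in capacity and version.
        Instance.objects.register(hostname=hostname, node_type='execution')
```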
#### Health Check Mechanics
All relevant data for health checks is reported from the ansible-runner command:
```
ansible-runner worker --worker-info
```
This will output YAML data to standard out containing CPU, memory, and other metrics used to compute `capacity`.
AWX invokes this command by submitting a receptor work unit (of type `ansible-runner`) to the target execution node.
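The exact output depends on the ansible-runner version, but the keys AWX consumes (see `worker_info()` and `execution_node_health_check()` in this PR) look roughly like this, with illustrative values:
```
runner_version: 2.0.2
uuid: 00000000-0000-0000-0000-000000000000
cpu_count: 4
mem_in_bytes: 8589934592
errors: []
```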
If you have the development environment running, you can run a one-off health check of a node with this command:
```
echo "from awx.main.utils.receptor import worker_info; worker_info('receptor-1')" | awx-manage shell_plus --quiet
```
This must be run as the awx user inside one of the hybrid or control nodes.
It does not modify the actual `Instance` record; it just runs the check and reports the data.
### Development Environment
A "toy" cluster with execution nodes and a hop node is created by the docker-compose Makefile target.
By default, it will create 1 hybrid node, 1 hop node, and 2 execution nodes.
The number of nodes can be changed with environment variables, for example:
```
CONTROL_PLANE_NODE_COUNT=2 EXECUTION_NODE_COUNT=3 COMPOSE_TAG=devel make docker-compose
```
This will spin up a topology represented below.
(the names shown are receptor node IDs, which in some cases differ from the AWX `Instance` hostnames and network addresses)
```
┌──────────────┐
│ │
┌──────────────┐ ┌──────────┤ receptor-1 │
│ │ │ │ │
│ awx_1 │◄──────────┐ │ └──────────────┘
│ │ │ ▼
└──────┬───────┘ ┌──────┴───────┐ ┌──────────────┐
│ │ │ │ │
│ │ receptor-hop │◄───────┤ receptor-2 │
▼ │ │ │ │
┌──────────────┐ └──────────────┘ └──────────────┘
│ │ ▲
│ awx_2 │ │ ┌──────────────┐
│ │ │ │ │
└──────────────┘ └──────────┤ receptor-3 │
│ │
└──────────────┘
```
All execution (`receptor-*`) nodes connect to the hop node.
Of the AWX cluster, only the `awx_1` node connects to the hop node.
`awx_1` connects to `awx_2`, fulfilling the requirement that the AWX cluster is fully connected.
For example, if a job is launched with `awx_2` as the `controller_node` and `receptor-3` as the `execution_node`,
then `awx_2` communicates with `receptor-3` via `awx_1` and then `receptor-hop`.
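To verify the mesh from inside a control node, the receptorctl CLI can be pointed at the same control socket AWX uses (assuming the CLI and its `status`/`ping` subcommands are available in your environment):
```
receptorctl --socket /var/run/receptor/receptor.sock status
receptorctl --socket /var/run/receptor/receptor.sock ping receptor-3
```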


@ -1,119 +0,0 @@
---
version: '2'
services:
haproxy:
build:
context: ./docker-compose
dockerfile: Dockerfile-haproxy
container_name: tools_haproxy_1
depends_on:
- "awx-1"
- "awx-2"
- "awx-3"
ports:
- "8013:8013"
- "8043:8043"
- "1936:1936"
awx-1:
user: ${CURRENT_UID}
container_name: tools_awx_1_1
privileged: true
image: ${DEV_DOCKER_TAG_BASE}/awx_devel:${TAG}
hostname: awx-1
environment:
CURRENT_UID:
SDB_HOST: 0.0.0.0
SDB_PORT: 5899
AWX_GROUP_QUEUES: alpha,tower
command: launch_awx.sh
working_dir: "/awx_devel"
volumes:
- "../:/awx_devel"
- "./redis/redis_socket_ha_1:/var/run/redis/"
- "./docker-compose/supervisor.conf:/etc/supervisord.conf"
- "./docker-compose-cluster/awx-1-receptor.conf:/etc/receptor/receptor.conf"
ports:
- "2222:2222"
- "5899-5999:5899-5999"
awx-2:
user: ${CURRENT_UID}
container_name: tools_awx_2_1
privileged: true
image: ${DEV_DOCKER_TAG_BASE}/awx_devel:${TAG}
hostname: awx-2
command: launch_awx.sh
working_dir: "/awx_devel"
environment:
CURRENT_UID:
SDB_HOST: 0.0.0.0
SDB_PORT: 7899
AWX_GROUP_QUEUES: bravo,tower
volumes:
- "../:/awx_devel"
- "./redis/redis_socket_ha_2:/var/run/redis/"
- "./docker-compose/supervisor.conf:/etc/supervisord.conf"
- "./docker-compose-cluster/awx-2-receptor.conf:/etc/receptor/receptor.conf"
ports:
- "2223:2222"
- "7899-7999:7899-7999"
awx-3:
user: ${CURRENT_UID}
container_name: tools_awx_3_1
privileged: true
image: ${DEV_DOCKER_TAG_BASE}/awx_devel:${TAG}
hostname: awx-3
command: launch_awx.sh
working_dir: "/awx_devel"
environment:
CURRENT_UID:
SDB_HOST: 0.0.0.0
SDB_PORT: 8899
AWX_GROUP_QUEUES: charlie,tower
volumes:
- "../:/awx_devel"
- "./redis/redis_socket_ha_3:/var/run/redis/"
- "./docker-compose/supervisor.conf:/etc/supervisord.conf"
- "./docker-compose-cluster/awx-3-receptor.conf:/etc/receptor/receptor.conf"
ports:
- "2224:2222"
- "8899-8999:8899-8999"
redis_1:
user: ${CURRENT_UID}
image: redis:latest
container_name: tools_redis_1_1
command: ["/usr/local/etc/redis/redis.conf"]
volumes:
- "./redis/redis.conf:/usr/local/etc/redis/redis.conf"
- "./redis/redis_socket_ha_1:/var/run/redis/"
redis_2:
user: ${CURRENT_UID}
image: redis:latest
container_name: tools_redis_2_1
command: ["/usr/local/etc/redis/redis.conf"]
volumes:
- "./redis/redis.conf:/usr/local/etc/redis/redis.conf"
- "./redis/redis_socket_ha_2:/var/run/redis/"
redis_3:
user: ${CURRENT_UID}
image: redis:latest
container_name: tools_redis_3_1
command: ["/usr/local/etc/redis/redis.conf"]
volumes:
- "./redis/redis.conf:/usr/local/etc/redis/redis.conf"
- "./redis/redis_socket_ha_3:/var/run/redis/"
postgres:
image: postgres:12
container_name: tools_postgres_1
environment:
POSTGRES_HOST_AUTH_METHOD: trust
volumes:
- "awx_db:/var/lib/postgresql/data"
volumes:
awx_db:


@ -1,23 +0,0 @@
---
- log-level: info
- control-service:
service: control
filename: /var/run/receptor/receptor.sock
- tcp-listener:
port: 2200
- tcp-peer:
address: awx-2:2200
- work-command:
worktype: worker
command: ansible-runner
params: worker
- work-kubernetes:
worktype: ocp
namespace: receptor
image: quay.io/shanemcd/ee
authmethod: kubeconfig


@ -1,23 +0,0 @@
---
- log-level: info
- control-service:
service: control
filename: /var/run/receptor/receptor.sock
- tcp-listener:
port: 2200
- tcp-peer:
address: awx-3:2200
- work-command:
worktype: worker
command: ansible-runner
params: worker
- work-kubernetes:
worktype: ocp
namespace: receptor
image: quay.io/shanemcd/ee
authmethod: kubeconfig


@ -1,23 +0,0 @@
---
- log-level: info
- control-service:
service: control
filename: /var/run/receptor/receptor.sock
- tcp-listener:
port: 2200
- tcp-peer:
address: awx-1:2200
- work-command:
worktype: worker
command: ansible-runner
params: worker
- work-kubernetes:
worktype: ocp
namespace: receptor
image: quay.io/shanemcd/ee
authmethod: kubeconfig


@ -211,10 +211,10 @@ need to call `bootstrap_development.sh` first.
Certain features or bugs are only applicable when running a cluster of AWX nodes. To bring up a 3-node cluster development environment, simply run the command below.
```bash
(host)$ CLUSTER_NODE_COUNT=3 make docker-compose
(host)$ CONTROL_PLANE_NODE_COUNT=3 make docker-compose
```
`CLUSTER_NODE_COUNT` is configurable and defaults to 1, effectively a non-clustered AWX.
`CONTROL_PLANE_NODE_COUNT` is configurable and defaults to 1, effectively a non-clustered AWX.
Note that you may see multiple messages of the form `2021-03-04 20:11:47,666 WARNING [-] awx.main.wsbroadcast Connection from awx_2 to awx_5 failed: 'Cannot connect to host awx_5:8013 ssl:False [Name or service not known]'.`. This can happen when you bring up a cluster of many nodes, say 10, and then bring up a cluster with fewer nodes, say 3. In this example, there will be 7 `Instance` records in the database that represent AWX instances which no longer exist. The AWX development environment mimics the VM deployment (vs. kubernetes) and expects the missing nodes to be brought back to a healthy state by the admin. The warning messages you are seeing are all of the AWX nodes trying to connect to the websocket backplane. You can manually delete the stale `Instance` records from the database, i.e. `Instance.objects.get(hostname='awx_9').delete()`, to stop the warnings.
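For example, from inside one of the control plane containers (container names follow the compose template in this PR; adjust to your environment), a stale record can be removed with:
```bash
(container)# echo "from awx.main.models import Instance; Instance.objects.get(hostname='awx_9').delete()" | awx-manage shell_plus --quiet
```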


@ -5,5 +5,5 @@ awx_image: 'quay.io/ansible/awx_devel'
pg_port: 5432
pg_username: 'awx'
pg_database: 'awx'
cluster_node_count: 1
control_plane_node_count: 1
minikube_container_group: false


@ -1,10 +1,13 @@
---
- name: Create _sources directory
- name: Create _sources directories
file:
path: "{{ sources_dest }}/secrets"
path: "{{ sources_dest }}/{{ item }}"
state: 'directory'
mode: '0700'
loop:
- secrets
- receptor
- name: Detect secrets
stat:
@ -81,3 +84,23 @@
src: docker-compose.yml.j2
dest: "{{ sources_dest }}/{{ compose_name }}"
mode: '0600'
- name: Render Receptor Config(s) for Control Plane
template:
src: "receptor-awx.conf.j2"
dest: "{{ sources_dest }}/receptor/receptor-awx-{{ item }}.conf"
mode: '0600'
with_sequence: start=1 end={{ control_plane_node_count }}
- name: Render Receptor Hop Config
template:
src: "receptor-hop.conf.j2"
dest: "{{ sources_dest }}/receptor/receptor-hop.conf"
mode: '0600'
- name: Render Receptor Worker Config(s)
template:
src: "receptor-worker.conf.j2"
dest: "{{ sources_dest }}/receptor/receptor-worker-{{ item }}.conf"
mode: '0600'
with_sequence: start=1 end={{ execution_node_count }}


@ -1,7 +1,7 @@
---
version: '2.1'
services:
{% for i in range(cluster_node_count|int) %}
{% for i in range(control_plane_node_count|int) %}
{% set container_postfix = loop.index %}
{% set awx_sdb_port_start = 7899 + (loop.index0*1000) | int %}
{% set awx_sdb_port_end = 7999 + (loop.index0*1000) | int %}
@ -35,16 +35,16 @@ services:
- "../../docker-compose/_sources/websocket_secret.py:/etc/tower/conf.d/websocket_secret.py"
- "../../docker-compose/_sources/local_settings.py:/etc/tower/conf.d/local_settings.py"
- "../../docker-compose/_sources/SECRET_KEY:/etc/tower/SECRET_KEY"
- "../../docker-compose/receptor.conf:/etc/receptor/receptor.conf"
- "../../docker-compose/_sources/receptor/receptor-awx-{{ loop.index }}.conf:/etc/receptor/receptor.conf"
# - "../../docker-compose/_sources/certs:/etc/receptor/certs" # TODO: optionally generate certs
- "/sys/fs/cgroup:/sys/fs/cgroup"
- "~/.kube/config:/var/lib/awx/.kube/config"
- "redis_socket_{{ container_postfix }}:/var/run/redis/:rw"
- "receptor_{{ container_postfix }}:/var/run/receptor/"
privileged: true
tty: true
ports:
- "{{ awx_sdb_port_start }}-{{ awx_sdb_port_end }}:{{ awx_sdb_port_start }}-{{ awx_sdb_port_end }}" # sdb-listen
{% if cluster_node_count|int == 1 %}
{% if control_plane_node_count|int == 1 %}
- "6899:6899"
- "8080:8080" # unused but mapped for debugging
- "8888:8888" # jupyter notebook
@ -61,7 +61,7 @@ services:
entrypoint: ["redis-server"]
command: ["/usr/local/etc/redis/redis.conf"]
{% endfor %}
{% if cluster_node_count|int > 1 %}
{% if control_plane_node_count|int > 1 %}
haproxy:
image: haproxy
user: "{{ ansible_user_uid }}"
@ -72,7 +72,7 @@ services:
- "8043:8043"
- "1936:1936"
depends_on:
{% for i in range(cluster_node_count|int) -%}
{% for i in range(control_plane_node_count|int) -%}
{% set container_postfix = loop.index %}
- "awx_{{ container_postfix }}"
{% endfor %}
@ -93,13 +93,40 @@ services:
POSTGRES_PASSWORD: {{ pg_password }}
volumes:
- "awx_db:/var/lib/postgresql/data"
{% if execution_node_count|int > 0 %}
receptor-hop:
image: quay.io/project-receptor/receptor:latest
user: root
container_name: tools_receptor_hop
hostname: receptor-hop
command: 'receptor --config /etc/receptor/receptor.conf'
links:
- awx_1
ports:
- "5555:5555"
volumes:
- "../../docker-compose/_sources/receptor/receptor-hop.conf:/etc/receptor/receptor.conf"
{% for i in range(execution_node_count|int) -%}
receptor-{{ loop.index }}:
image: quay.io/awx/awx_devel:devel
user: "{{ ansible_user_uid }}"
container_name: tools_receptor_{{ loop.index }}
hostname: receptor-1
command: 'receptor --config /etc/receptor/receptor.conf'
links:
- receptor-hop
volumes:
- "../../docker-compose/_sources/receptor/receptor-worker-{{ loop.index }}.conf:/etc/receptor/receptor.conf"
- "/sys/fs/cgroup:/sys/fs/cgroup"
privileged: true
{% endfor %}
{% endif %}
volumes:
awx_db:
name: tools_awx_db
{% for i in range(cluster_node_count|int) -%}
{% for i in range(control_plane_node_count|int) -%}
{% set container_postfix = loop.index %}
receptor_{{ container_postfix }}:
name: tools_receptor_{{ container_postfix }}
redis_socket_{{ container_postfix }}:
name: tools_redis_socket_{{ container_postfix }}
{% endfor -%}


@ -29,7 +29,7 @@ backend nodes
http-request set-header X-Forwarded-Port %[dst_port]
http-request add-header X-Forwarded-Proto https if { ssl_fc }
option httpchk HEAD / HTTP/1.1\r\nHost:localhost
{% for i in range(cluster_node_count|int) %}
{% for i in range(control_plane_node_count|int) %}
{% set container_postfix = loop.index %}
server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8013 check
{% endfor %}
@ -37,7 +37,7 @@ backend nodes
backend nodes_ssl
mode tcp
balance roundrobin
{% for i in range(cluster_node_count|int) %}
{% for i in range(control_plane_node_count|int) %}
{% set container_postfix = loop.index %}
server tools_awx_{{ container_postfix }} tools_awx_{{ container_postfix }}:8043 check
{% endfor %}


@ -1,12 +1,25 @@
---
- node:
id: foo
id: awx_{{ item }}
- log-level: debug
- log-level: info
- tcp-listener:
port: 2222
{% for i in range(item | int + 1, control_plane_node_count | int + 1) %}
- tcp-peer:
address: awx_{{ i }}:2222
redial: true
{% endfor %}
#- tls-server:
# name: mutual-tls
# cert: /etc/receptor/certs/awx.crt
# key: /etc/receptor/certs/awx.key
# requireclientcert: true
# clientcas: /etc/receptor/certs/ca.crt
- control-service:
service: control
filename: /var/run/receptor/receptor.sock


@ -0,0 +1,12 @@
---
- node:
id: receptor-hop
- log-level: info
- tcp-peer:
address: awx_1:2222
redial: true
- tcp-listener:
port: 5555


@ -0,0 +1,18 @@
---
- node:
id: receptor-{{ item }}
- log-level: info
- tcp-peer:
address: tools_receptor_hop:5555
redial: true
- work-command:
worktype: ansible-runner
command: ansible-runner
params: worker
allowruntimeparams: true
- control-service:
service: control