Merge pull request #9957 from jbradberry/isolated-removal

Isolated removal

SUMMARY
Removal of the isolated nodes feature.
ISSUE TYPE

Feature Pull Request

COMPONENT NAME

API

AWX VERSION

Reviewed-by: Alan Rominger <arominge@redhat.com>
Reviewed-by: Jeff Bradberry <None>
Reviewed-by: Elyézer Rezende <None>
Reviewed-by: Bianca Henderson <beeankha@gmail.com>
This commit is contained in:
softwarefactory-project-zuul[bot]
2021-04-29 19:15:43 +00:00
committed by GitHub
50 changed files with 113 additions and 1740 deletions

View File

@@ -33,7 +33,7 @@ import subprocess
from django.conf import settings
from django.db import transaction, DatabaseError, IntegrityError, ProgrammingError, connection
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now, timedelta
from django.utils.timezone import now
from django.utils.encoding import smart_str
from django.contrib.auth.models import User
from django.utils.translation import ugettext_lazy as _, gettext_noop
@@ -84,7 +84,6 @@ from awx.main.models import (
from awx.main.constants import ACTIVE_STATES
from awx.main.exceptions import AwxTaskError, PostRunError
from awx.main.queue import CallbackQueueDispatcher
from awx.main.isolated import manager as isolated_manager
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_local_queuename, reaper
from awx.main.utils import (
@@ -170,8 +169,6 @@ def dispatch_startup():
#
apply_cluster_membership_policies()
cluster_node_heartbeat()
if Instance.objects.me().is_controller():
awx_isolated_heartbeat()
Metrics().clear_values()
# Update Tower's rsyslog.conf file based on loggins settings in the db
@@ -205,13 +202,8 @@ def apply_cluster_membership_policies():
started_compute = time.time()
all_instances = list(Instance.objects.order_by('id'))
all_groups = list(InstanceGroup.objects.prefetch_related('instances'))
iso_hostnames = set([])
for ig in all_groups:
if ig.controller_id is not None:
iso_hostnames.update(ig.policy_instance_list)
considered_instances = [inst for inst in all_instances if inst.hostname not in iso_hostnames]
total_instances = len(considered_instances)
total_instances = len(all_instances)
actual_groups = []
actual_instances = []
Group = namedtuple('Group', ['obj', 'instances', 'prior_instances'])
@@ -232,18 +224,12 @@ def apply_cluster_membership_policies():
if group_actual.instances:
logger.debug("Policy List, adding Instances {} to Group {}".format(group_actual.instances, ig.name))
if ig.controller_id is None:
actual_groups.append(group_actual)
else:
# For isolated groups, _only_ apply the policy_instance_list
# do not add to in-memory list, so minimum rules not applied
logger.debug('Committing instances to isolated group {}'.format(ig.name))
ig.instances.set(group_actual.instances)
actual_groups.append(group_actual)
# Process Instance minimum policies next, since it represents a concrete lower bound to the
# number of instances to make available to instance groups
actual_instances = [Node(obj=i, groups=[]) for i in considered_instances if i.managed_by_policy]
logger.debug("Total non-isolated instances:{} available for policy: {}".format(total_instances, len(actual_instances)))
actual_instances = [Node(obj=i, groups=[]) for i in all_instances if i.managed_by_policy]
logger.debug("Total instances: {}, available for policy: {}".format(total_instances, len(actual_instances)))
for g in sorted(actual_groups, key=lambda x: len(x.instances)):
policy_min_added = []
for i in sorted(actual_instances, key=lambda x: len(x.groups)):
@@ -285,7 +271,7 @@ def apply_cluster_membership_policies():
logger.debug('Cluster policy no-op finished in {} seconds'.format(time.time() - started_compute))
return
# On a differential basis, apply instances to non-isolated groups
# On a differential basis, apply instances to groups
with transaction.atomic():
for g in actual_groups:
if g.obj.is_container_group:
@@ -419,7 +405,7 @@ def cleanup_execution_environment_images():
def cluster_node_heartbeat():
logger.debug("Cluster node heartbeat task.")
nowtime = now()
instance_list = list(Instance.objects.all_non_isolated())
instance_list = list(Instance.objects.all())
this_inst = None
lost_instances = []
@@ -503,30 +489,6 @@ def awx_k8s_reaper():
logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))
@task(queue=get_local_queuename)
def awx_isolated_heartbeat():
local_hostname = settings.CLUSTER_HOST_ID
logger.debug("Controlling node checking for any isolated management tasks.")
poll_interval = settings.AWX_ISOLATED_PERIODIC_CHECK
# Get isolated instances not checked since poll interval - some buffer
nowtime = now()
accept_before = nowtime - timedelta(seconds=(poll_interval - 10))
isolated_instance_qs = Instance.objects.filter(
rampart_groups__controller__instances__hostname=local_hostname,
)
isolated_instance_qs = isolated_instance_qs.filter(last_isolated_check__lt=accept_before) | isolated_instance_qs.filter(last_isolated_check=None)
# Fast pass of isolated instances, claiming the nodes to update
with transaction.atomic():
for isolated_instance in isolated_instance_qs:
isolated_instance.last_isolated_check = nowtime
# Prevent modified time from being changed, as in normal heartbeat
isolated_instance.save(update_fields=['last_isolated_check'])
# Slow pass looping over isolated IGs and their isolated instances
if len(isolated_instance_qs) > 0:
logger.debug("Managing isolated instances {}.".format(','.join([inst.hostname for inst in isolated_instance_qs])))
isolated_manager.IsolatedManager(CallbackQueueDispatcher.dispatch).health_check(isolated_instance_qs)
@task(queue=get_local_queuename)
def awx_periodic_scheduler():
with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
@@ -1017,7 +979,7 @@ class BaseTask(object):
else:
env['PATH'] = os.path.join(settings.AWX_VENV_PATH, "bin")
def build_env(self, instance, private_data_dir, isolated, private_data_files=None):
def build_env(self, instance, private_data_dir, private_data_files=None):
"""
Build environment dictionary for ansible-playbook.
"""
@@ -1138,7 +1100,7 @@ class BaseTask(object):
"""
instance.log_lifecycle("post_run")
def final_run_hook(self, instance, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
def final_run_hook(self, instance, status, private_data_dir, fact_modification_times):
"""
Hook for any steps to run after job/task is marked as complete.
"""
@@ -1284,16 +1246,6 @@ class BaseTask(object):
if result_traceback:
self.instance = self.update_model(self.instance.pk, result_traceback=result_traceback)
def check_handler(self, config):
"""
IsolatedManager callback triggered by the repeated checks of the isolated node
"""
job_env = build_safe_env(config['env'])
for k, v in self.safe_cred_env.items():
if k in job_env:
job_env[k] = v
self.instance = self.update_model(self.instance.pk, job_args=json.dumps(config['command']), job_cwd=config['cwd'], job_env=job_env)
@with_path_cleanup
def run(self, pk, **kwargs):
"""
@@ -1321,7 +1273,6 @@ class BaseTask(object):
self.safe_env = {}
self.safe_cred_env = {}
private_data_dir = None
isolated_manager_instance = None
# store a reference to the parent workflow job (if any) so we can include
# it in event data JSON
@@ -1329,7 +1280,6 @@ class BaseTask(object):
self.parent_workflow_job_id = self.instance.get_workflow_job().id
try:
isolated = self.instance.is_isolated()
self.instance.send_notification_templates("running")
private_data_dir = self.build_private_data_dir(self.instance)
self.pre_run_hook(self.instance, private_data_dir)
@@ -1363,7 +1313,7 @@ class BaseTask(object):
passwords = self.build_passwords(self.instance, kwargs)
self.build_extra_vars_file(self.instance, private_data_dir)
args = self.build_args(self.instance, private_data_dir, passwords)
env = self.build_env(self.instance, private_data_dir, isolated, private_data_files=private_data_files)
env = self.build_env(self.instance, private_data_dir, private_data_files=private_data_files)
self.safe_env = build_safe_env(env)
credentials = self.build_credentials_list(self.instance)
@@ -1464,7 +1414,7 @@ class BaseTask(object):
self.instance = self.update_model(pk, status=status, emitted_events=self.event_ct, **extra_update_fields)
try:
self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times, isolated_manager_instance=isolated_manager_instance)
self.final_run_hook(self.instance, status, private_data_dir, fact_modification_times)
except Exception:
logger.exception('{} Final run hook errored.'.format(self.instance.log_format))
@@ -1549,11 +1499,11 @@ class RunJob(BaseTask):
return passwords
def build_env(self, job, private_data_dir, isolated=False, private_data_files=None):
def build_env(self, job, private_data_dir, private_data_files=None):
"""
Build environment dictionary for ansible-playbook.
"""
env = super(RunJob, self).build_env(job, private_data_dir, isolated=isolated, private_data_files=private_data_files)
env = super(RunJob, self).build_env(job, private_data_dir, private_data_files=private_data_files)
if private_data_files is None:
private_data_files = {}
# Set environment variables needed for inventory and job event
@@ -1564,10 +1514,9 @@ class RunJob(BaseTask):
env['PROJECT_REVISION'] = job.project.scm_revision
env['ANSIBLE_RETRY_FILES_ENABLED'] = "False"
env['MAX_EVENT_RES'] = str(settings.MAX_EVENT_RES_DATA)
if not isolated:
if hasattr(settings, 'AWX_ANSIBLE_CALLBACK_PLUGINS') and settings.AWX_ANSIBLE_CALLBACK_PLUGINS:
env['ANSIBLE_CALLBACK_PLUGINS'] = ':'.join(settings.AWX_ANSIBLE_CALLBACK_PLUGINS)
env['AWX_HOST'] = settings.TOWER_URL_BASE
if hasattr(settings, 'AWX_ANSIBLE_CALLBACK_PLUGINS') and settings.AWX_ANSIBLE_CALLBACK_PLUGINS:
env['ANSIBLE_CALLBACK_PLUGINS'] = ':'.join(settings.AWX_ANSIBLE_CALLBACK_PLUGINS)
env['AWX_HOST'] = settings.TOWER_URL_BASE
# Create a directory for ControlPath sockets that is unique to each job
cp_dir = os.path.join(private_data_dir, 'cp')
@@ -1798,9 +1747,6 @@ class RunJob(BaseTask):
if sync_needs:
pu_ig = job.instance_group
pu_en = job.execution_node
if job.is_isolated() is True:
pu_ig = pu_ig.controller
pu_en = settings.CLUSTER_HOST_ID
sync_metafields = dict(
launch_type="sync",
@@ -1855,7 +1801,7 @@ class RunJob(BaseTask):
# ran inside of the event saving code
update_smart_memberships_for_inventory(job.inventory)
def final_run_hook(self, job, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
def final_run_hook(self, job, status, private_data_dir, fact_modification_times):
super(RunJob, self).final_run_hook(job, status, private_data_dir, fact_modification_times)
if not private_data_dir:
# If there's no private data dir, that means we didn't get into the
@@ -1867,8 +1813,6 @@ class RunJob(BaseTask):
os.path.join(private_data_dir, 'artifacts', 'fact_cache'),
fact_modification_times,
)
if isolated_manager_instance and not job.is_container_group_task:
isolated_manager_instance.cleanup()
try:
inventory = job.inventory
@@ -1932,11 +1876,11 @@ class RunProjectUpdate(BaseTask):
passwords['scm_password'] = project_update.credential.get_input('password', default='')
return passwords
def build_env(self, project_update, private_data_dir, isolated=False, private_data_files=None):
def build_env(self, project_update, private_data_dir, private_data_files=None):
"""
Build environment dictionary for ansible-playbook.
"""
env = super(RunProjectUpdate, self).build_env(project_update, private_data_dir, isolated=isolated, private_data_files=private_data_files)
env = super(RunProjectUpdate, self).build_env(project_update, private_data_dir, private_data_files=private_data_files)
env['ANSIBLE_RETRY_FILES_ENABLED'] = str(False)
env['ANSIBLE_ASK_PASS'] = str(False)
env['ANSIBLE_BECOME_ASK_PASS'] = str(False)
@@ -2391,14 +2335,14 @@ class RunInventoryUpdate(BaseTask):
injector = InventorySource.injectors[inventory_update.source]()
return injector.build_private_data(inventory_update, private_data_dir)
def build_env(self, inventory_update, private_data_dir, isolated, private_data_files=None):
def build_env(self, inventory_update, private_data_dir, private_data_files=None):
"""Build environment dictionary for ansible-inventory.
Most environment variables related to credentials or configuration
are accomplished by the inventory source injectors (in this method)
or custom credential type injectors (in main run method).
"""
env = super(RunInventoryUpdate, self).build_env(inventory_update, private_data_dir, isolated, private_data_files=private_data_files)
env = super(RunInventoryUpdate, self).build_env(inventory_update, private_data_dir, private_data_files=private_data_files)
if private_data_files is None:
private_data_files = {}
@@ -2715,11 +2659,11 @@ class RunAdHocCommand(BaseTask):
passwords[field] = value
return passwords
def build_env(self, ad_hoc_command, private_data_dir, isolated=False, private_data_files=None):
def build_env(self, ad_hoc_command, private_data_dir, private_data_files=None):
"""
Build environment dictionary for ansible.
"""
env = super(RunAdHocCommand, self).build_env(ad_hoc_command, private_data_dir, isolated=isolated, private_data_files=private_data_files)
env = super(RunAdHocCommand, self).build_env(ad_hoc_command, private_data_dir, private_data_files=private_data_files)
# Set environment variables needed for inventory and ad hoc event
# callbacks to work.
env['AD_HOC_COMMAND_ID'] = str(ad_hoc_command.pk)
@@ -2825,11 +2769,6 @@ class RunAdHocCommand(BaseTask):
d[r'Password:\s*?$'] = 'ssh_password'
return d
def final_run_hook(self, adhoc_job, status, private_data_dir, fact_modification_times, isolated_manager_instance=None):
super(RunAdHocCommand, self).final_run_hook(adhoc_job, status, private_data_dir, fact_modification_times)
if isolated_manager_instance:
isolated_manager_instance.cleanup()
@task(queue=get_local_queuename)
class RunSystemJob(BaseTask):
@@ -2871,8 +2810,8 @@ class RunSystemJob(BaseTask):
os.chmod(path, stat.S_IRUSR)
return path
def build_env(self, instance, private_data_dir, isolated=False, private_data_files=None):
base_env = super(RunSystemJob, self).build_env(instance, private_data_dir, isolated=isolated, private_data_files=private_data_files)
def build_env(self, instance, private_data_dir, private_data_files=None):
base_env = super(RunSystemJob, self).build_env(instance, private_data_dir, private_data_files=private_data_files)
# TODO: this is able to run by turning off isolation
# the goal is to run it a container instead
env = dict(os.environ.items())