Merge branch 'ansible:devel' into devel

This commit is contained in:
djyasin
2022-07-13 09:53:22 -04:00
committed by GitHub
130 changed files with 24020 additions and 19856 deletions

View File

@@ -129,7 +129,7 @@ def config(since, **kwargs):
}
@register('counts', '1.1', description=_('Counts of objects such as organizations, inventories, and projects'))
@register('counts', '1.2', description=_('Counts of objects such as organizations, inventories, and projects'))
def counts(since, **kwargs):
counts = {}
for cls in (
@@ -172,6 +172,13 @@ def counts(since, **kwargs):
.count()
)
counts['pending_jobs'] = models.UnifiedJob.objects.exclude(launch_type='sync').filter(status__in=('pending',)).count()
if connection.vendor == 'postgresql':
with connection.cursor() as cursor:
cursor.execute(f"select count(*) from pg_stat_activity where datname=\'{connection.settings_dict['NAME']}\'")
counts['database_connections'] = cursor.fetchone()[0]
else:
# We should be using postgresql, but if we do that change that ever we should change the below value
counts['database_connections'] = 1
return counts

View File

@@ -126,6 +126,8 @@ def metrics():
LICENSE_INSTANCE_TOTAL = Gauge('awx_license_instance_total', 'Total number of managed hosts provided by your license', registry=REGISTRY)
LICENSE_INSTANCE_FREE = Gauge('awx_license_instance_free', 'Number of remaining managed hosts provided by your license', registry=REGISTRY)
DATABASE_CONNECTIONS = Gauge('awx_database_connections_total', 'Number of connections to database', registry=REGISTRY)
license_info = get_license()
SYSTEM_INFO.info(
{
@@ -163,6 +165,8 @@ def metrics():
USER_SESSIONS.labels(type='user').set(current_counts['active_user_sessions'])
USER_SESSIONS.labels(type='anonymous').set(current_counts['active_anonymous_sessions'])
DATABASE_CONNECTIONS.set(current_counts['database_connections'])
all_job_data = job_counts(None)
statuses = all_job_data.get('status', {})
for status, value in statuses.items():

View File

@@ -10,6 +10,27 @@ from awx.main.models import Instance, UnifiedJob, WorkflowJob
logger = logging.getLogger('awx.main.dispatch')
def startup_reaping():
"""
If this particular instance is starting, then we know that any running jobs are invalid
so we will reap those jobs as a special action here
"""
me = Instance.objects.me()
jobs = UnifiedJob.objects.filter(status='running', controller_node=me.hostname)
job_ids = []
for j in jobs:
job_ids.append(j.id)
j.status = 'failed'
j.start_args = ''
j.job_explanation += 'Task was marked as running at system start up. The system must have not shut down properly, so it has been marked as failed.'
j.save(update_fields=['status', 'start_args', 'job_explanation'])
if hasattr(j, 'send_notification_templates'):
j.send_notification_templates('failed')
j.websocket_emit_status('failed')
if job_ids:
logger.error(f'Unified jobs {job_ids} were reaped on dispatch startup')
def reap_job(j, status):
if UnifiedJob.objects.get(id=j.id).status not in ('running', 'waiting'):
# just in case, don't reap jobs that aren't running

View File

@@ -169,8 +169,9 @@ class AWXConsumerPG(AWXConsumerBase):
logger.exception(f"Error consuming new events from postgres, will retry for {self.pg_max_wait} s")
self.pg_down_time = time.time()
self.pg_is_down = True
if time.time() - self.pg_down_time > self.pg_max_wait:
logger.warning(f"Postgres event consumer has not recovered in {self.pg_max_wait} s, exiting")
current_downtime = time.time() - self.pg_down_time
if current_downtime > self.pg_max_wait:
logger.exception(f"Postgres event consumer has not recovered in {current_downtime} s, exiting")
raise
# Wait for a second before next attempt, but still listen for any shutdown signals
for i in range(10):
@@ -179,6 +180,10 @@ class AWXConsumerPG(AWXConsumerBase):
time.sleep(0.1)
for conn in db.connections.all():
conn.close_if_unusable_or_obsolete()
except Exception:
# Log unanticipated exception in addition to writing to stderr to get timestamps and other metadata
logger.exception('Encountered unhandled error in dispatcher main loop')
raise
class BaseWorker(object):

View File

@@ -53,7 +53,7 @@ class Command(BaseCommand):
# (like the node heartbeat)
periodic.run_continuously()
reaper.reap()
reaper.startup_reaping()
consumer = None
try:

View File

@@ -0,0 +1,40 @@
# Generated by Django 3.2.13 on 2022-06-21 21:29
from django.db import migrations
import logging
logger = logging.getLogger("awx")
def forwards(apps, schema_editor):
InventorySource = apps.get_model('main', 'InventorySource')
sources = InventorySource.objects.filter(update_on_project_update=True)
for src in sources:
if src.update_on_launch == False:
src.update_on_launch = True
src.save(update_fields=['update_on_launch'])
logger.info(f"Setting update_on_launch to True for {src}")
proj = src.source_project
if proj and proj.scm_update_on_launch is False:
proj.scm_update_on_launch = True
proj.save(update_fields=['scm_update_on_launch'])
logger.warning(f"Setting scm_update_on_launch to True for {proj}")
class Migration(migrations.Migration):
dependencies = [
('main', '0163_convert_job_tags_to_textfield'),
]
operations = [
migrations.RunPython(forwards, migrations.RunPython.noop),
migrations.RemoveField(
model_name='inventorysource',
name='scm_last_revision',
),
migrations.RemoveField(
model_name='inventorysource',
name='update_on_project_update',
),
]

View File

@@ -35,6 +35,7 @@ def gce(cred, env, private_data_dir):
container_path = to_container_path(path, private_data_dir)
env['GCE_CREDENTIALS_FILE_PATH'] = container_path
env['GCP_SERVICE_ACCOUNT_FILE'] = container_path
env['GOOGLE_APPLICATION_CREDENTIALS'] = container_path
# Handle env variables for new module types.
# This includes gcp_compute inventory plugin and

View File

@@ -985,22 +985,11 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions, CustomVirtualE
default=None,
null=True,
)
scm_last_revision = models.CharField(
max_length=1024,
blank=True,
default='',
editable=False,
)
update_on_project_update = models.BooleanField(
default=False,
help_text=_(
'This field is deprecated and will be removed in a future release. '
'In future release, functionality will be migrated to source project update_on_launch.'
),
)
update_on_launch = models.BooleanField(
default=False,
)
update_cache_timeout = models.PositiveIntegerField(
default=0,
)
@@ -1038,14 +1027,6 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions, CustomVirtualE
self.name = 'inventory source (%s)' % replace_text
if 'name' not in update_fields:
update_fields.append('name')
# Reset revision if SCM source has changed parameters
if self.source == 'scm' and not is_new_instance:
before_is = self.__class__.objects.get(pk=self.pk)
if before_is.source_path != self.source_path or before_is.source_project_id != self.source_project_id:
# Reset the scm_revision if file changed to force update
self.scm_last_revision = ''
if 'scm_last_revision' not in update_fields:
update_fields.append('scm_last_revision')
# Do the actual save.
super(InventorySource, self).save(*args, **kwargs)
@@ -1054,10 +1035,6 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions, CustomVirtualE
if replace_text in self.name:
self.name = self.name.replace(replace_text, str(self.pk))
super(InventorySource, self).save(update_fields=['name'])
if self.source == 'scm' and is_new_instance and self.update_on_project_update:
# Schedule a new Project update if one is not already queued
if self.source_project and not self.source_project.project_updates.filter(status__in=['new', 'pending', 'waiting']).exists():
self.update()
if not getattr(_inventory_updates, 'is_updating', False):
if self.inventory is not None:
self.inventory.update_computed_fields()
@@ -1147,25 +1124,6 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions, CustomVirtualE
)
return dict(error=list(error_notification_templates), started=list(started_notification_templates), success=list(success_notification_templates))
def clean_update_on_project_update(self):
if (
self.update_on_project_update is True
and self.source == 'scm'
and InventorySource.objects.filter(Q(inventory=self.inventory, update_on_project_update=True, source='scm') & ~Q(id=self.id)).exists()
):
raise ValidationError(_("More than one SCM-based inventory source with update on project update per-inventory not allowed."))
return self.update_on_project_update
def clean_update_on_launch(self):
if self.update_on_project_update is True and self.source == 'scm' and self.update_on_launch is True:
raise ValidationError(
_(
"Cannot update SCM-based inventory source on launch if set to update on project update. "
"Instead, configure the corresponding source project to update on launch."
)
)
return self.update_on_launch
def clean_source_path(self):
if self.source != 'scm' and self.source_path:
raise ValidationError(_("Cannot set source_path if not SCM type."))
@@ -1301,13 +1259,6 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions, JobNotificationMixin,
return self.global_instance_groups
return selected_groups
def cancel(self, job_explanation=None, is_chain=False):
res = super(InventoryUpdate, self).cancel(job_explanation=job_explanation, is_chain=is_chain)
if res:
if self.launch_type != 'scm' and self.source_project_update:
self.source_project_update.cancel(job_explanation=job_explanation)
return res
class CustomInventoryScript(CommonModelNameNotUnique, ResourceMixin):
class Meta:

View File

@@ -743,6 +743,12 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
return "$hidden due to Ansible no_log flag$"
return artifacts
def get_effective_artifacts(self, **kwargs):
"""Return unified job artifacts (from set_stats) to pass downstream in workflows"""
if isinstance(self.artifacts, dict):
return self.artifacts
return {}
@property
def is_container_group_task(self):
return bool(self.instance_group and self.instance_group.is_container_group)

View File

@@ -533,7 +533,7 @@ class UnifiedJob(
('workflow', _('Workflow')), # Job was started from a workflow job.
('webhook', _('Webhook')), # Job was started from a webhook event.
('sync', _('Sync')), # Job was started from a project sync.
('scm', _('SCM Update')), # Job was created as an Inventory SCM sync.
('scm', _('SCM Update')), # (deprecated) Job was created as an Inventory SCM sync.
]
PASSWORD_FIELDS = ('start_args',)
@@ -1204,6 +1204,10 @@ class UnifiedJob(
pass
return None
def get_effective_artifacts(self, **kwargs):
"""Return unified job artifacts (from set_stats) to pass downstream in workflows"""
return {}
def get_passwords_needed_to_start(self):
return []

View File

@@ -318,8 +318,8 @@ class WorkflowJobNode(WorkflowNodeBase):
for parent_node in self.get_parent_nodes():
is_root_node = False
aa_dict.update(parent_node.ancestor_artifacts)
if parent_node.job and hasattr(parent_node.job, 'artifacts'):
aa_dict.update(parent_node.job.artifacts)
if parent_node.job:
aa_dict.update(parent_node.job.get_effective_artifacts(parents_set=set([self.workflow_job_id])))
if aa_dict and not is_root_node:
self.ancestor_artifacts = aa_dict
self.save(update_fields=['ancestor_artifacts'])
@@ -682,6 +682,27 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
wj = wj.get_workflow_job()
return ancestors
def get_effective_artifacts(self, **kwargs):
"""
For downstream jobs of a workflow nested inside of a workflow,
we send aggregated artifacts from the nodes inside of the nested workflow
"""
artifacts = {}
job_queryset = (
UnifiedJob.objects.filter(unified_job_node__workflow_job=self)
.defer('job_args', 'job_cwd', 'start_args', 'result_traceback')
.order_by('finished', 'id')
.filter(status__in=['successful', 'failed'])
.iterator()
)
parents_set = kwargs.get('parents_set', set())
new_parents_set = parents_set | {self.id}
for job in job_queryset:
if job.id in parents_set:
continue
artifacts.update(job.get_effective_artifacts(parents_set=new_parents_set))
return artifacts
def get_notification_templates(self):
return self.workflow_job_template.notification_templates

View File

@@ -248,11 +248,11 @@ class TaskManager:
workflow_job.save(update_fields=update_fields)
status_changed = True
if status_changed:
if workflow_job.spawned_by_workflow:
schedule_task_manager()
workflow_job.websocket_emit_status(workflow_job.status)
# Operations whose queries rely on modifications made during the atomic scheduling session
workflow_job.send_notification_templates('succeeded' if workflow_job.status == 'successful' else 'failed')
if workflow_job.spawned_by_workflow:
schedule_task_manager()
return result
@timeit

View File

@@ -16,6 +16,7 @@ from awx.main.redact import UriCleaner
from awx.main.constants import MINIMAL_EVENTS, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE
from awx.main.utils.update_model import update_model
from awx.main.queue import CallbackQueueDispatcher
from awx.main.tasks.signals import signal_callback
logger = logging.getLogger('awx.main.tasks.callback')
@@ -179,7 +180,13 @@ class RunnerCallback:
Ansible runner callback to tell the job when/if it is canceled
"""
unified_job_id = self.instance.pk
self.instance = self.update_model(unified_job_id)
if signal_callback():
return True
try:
self.instance = self.update_model(unified_job_id)
except Exception:
logger.exception(f'Encountered error during cancel check for {unified_job_id}, canceling now')
return True
if not self.instance:
logger.error('unified job {} was deleted while running, canceling'.format(unified_job_id))
return True

View File

@@ -19,7 +19,6 @@ from uuid import uuid4
# Django
from django.conf import settings
from django.db import transaction
# Runner
@@ -34,7 +33,6 @@ from gitdb.exc import BadName as BadGitName
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_local_queuename
from awx.main.constants import (
ACTIVE_STATES,
PRIVILEGE_ESCALATION_METHODS,
STANDARD_INVENTORY_UPDATE_ENV,
JOB_FOLDER_PREFIX,
@@ -64,6 +62,7 @@ from awx.main.tasks.callback import (
RunnerCallbackForProjectUpdate,
RunnerCallbackForSystemJob,
)
from awx.main.tasks.signals import with_signal_handling, signal_callback
from awx.main.tasks.receptor import AWXReceptorJob
from awx.main.exceptions import AwxTaskError, PostRunError, ReceptorNodeNotFound
from awx.main.utils.ansible import read_ansible_config
@@ -394,6 +393,7 @@ class BaseTask(object):
instance.save(update_fields=['ansible_version'])
@with_path_cleanup
@with_signal_handling
def run(self, pk, **kwargs):
"""
Run the job/task and capture its output.
@@ -425,7 +425,7 @@ class BaseTask(object):
private_data_dir = self.build_private_data_dir(self.instance)
self.pre_run_hook(self.instance, private_data_dir)
self.instance.log_lifecycle("preparing_playbook")
if self.instance.cancel_flag:
if self.instance.cancel_flag or signal_callback():
self.instance = self.update_model(self.instance.pk, status='canceled')
if self.instance.status != 'running':
# Stop the task chain and prevent starting the job if it has
@@ -547,6 +547,11 @@ class BaseTask(object):
self.runner_callback.delay_update(skip_if_already_set=True, job_explanation=f"Job terminated due to {status}")
if status == 'timeout':
status = 'failed'
elif status == 'canceled':
self.instance = self.update_model(pk)
if (getattr(self.instance, 'cancel_flag', False) is False) and signal_callback():
self.runner_callback.delay_update(job_explanation="Task was canceled due to receiving a shutdown signal.")
status = 'failed'
except ReceptorNodeNotFound as exc:
self.runner_callback.delay_update(job_explanation=str(exc))
except Exception:
@@ -1168,64 +1173,6 @@ class RunProjectUpdate(BaseTask):
d[r'^Are you sure you want to continue connecting \(yes/no\)\?\s*?$'] = 'yes'
return d
def _update_dependent_inventories(self, project_update, dependent_inventory_sources):
scm_revision = project_update.project.scm_revision
inv_update_class = InventoryUpdate._get_task_class()
for inv_src in dependent_inventory_sources:
if not inv_src.update_on_project_update:
continue
if inv_src.scm_last_revision == scm_revision:
logger.debug('Skipping SCM inventory update for `{}` because ' 'project has not changed.'.format(inv_src.name))
continue
logger.debug('Local dependent inventory update for `{}`.'.format(inv_src.name))
with transaction.atomic():
if InventoryUpdate.objects.filter(inventory_source=inv_src, status__in=ACTIVE_STATES).exists():
logger.debug('Skipping SCM inventory update for `{}` because ' 'another update is already active.'.format(inv_src.name))
continue
if settings.IS_K8S:
instance_group = InventoryUpdate(inventory_source=inv_src).preferred_instance_groups[0]
else:
instance_group = project_update.instance_group
local_inv_update = inv_src.create_inventory_update(
_eager_fields=dict(
launch_type='scm',
status='running',
instance_group=instance_group,
execution_node=project_update.execution_node,
controller_node=project_update.execution_node,
source_project_update=project_update,
celery_task_id=project_update.celery_task_id,
)
)
local_inv_update.log_lifecycle("controller_node_chosen")
local_inv_update.log_lifecycle("execution_node_chosen")
try:
create_partition(local_inv_update.event_class._meta.db_table, start=local_inv_update.created)
inv_update_class().run(local_inv_update.id)
except Exception:
logger.exception('{} Unhandled exception updating dependent SCM inventory sources.'.format(project_update.log_format))
try:
project_update.refresh_from_db()
except ProjectUpdate.DoesNotExist:
logger.warning('Project update deleted during updates of dependent SCM inventory sources.')
break
try:
local_inv_update.refresh_from_db()
except InventoryUpdate.DoesNotExist:
logger.warning('%s Dependent inventory update deleted during execution.', project_update.log_format)
continue
if project_update.cancel_flag:
logger.info('Project update {} was canceled while updating dependent inventories.'.format(project_update.log_format))
break
if local_inv_update.cancel_flag:
logger.info('Continuing to process project dependencies after {} was canceled'.format(local_inv_update.log_format))
if local_inv_update.status == 'successful':
inv_src.scm_last_revision = scm_revision
inv_src.save(update_fields=['scm_last_revision'])
def release_lock(self, instance):
try:
fcntl.lockf(self.lock_fd, fcntl.LOCK_UN)
@@ -1435,12 +1382,6 @@ class RunProjectUpdate(BaseTask):
p.inventory_files = p.inventories
p.save(update_fields=['scm_revision', 'playbook_files', 'inventory_files'])
# Update any inventories that depend on this project
dependent_inventory_sources = p.scm_inventory_sources.filter(update_on_project_update=True)
if len(dependent_inventory_sources) > 0:
if status == 'successful' and instance.launch_type != 'sync':
self._update_dependent_inventories(instance, dependent_inventory_sources)
def build_execution_environment_params(self, instance, private_data_dir):
if settings.IS_K8S:
return {}
@@ -1620,9 +1561,7 @@ class RunInventoryUpdate(BaseTask):
source_project = None
if inventory_update.inventory_source:
source_project = inventory_update.inventory_source.source_project
if (
inventory_update.source == 'scm' and inventory_update.launch_type != 'scm' and source_project and source_project.scm_type
): # never ever update manual projects
if inventory_update.source == 'scm' and source_project and source_project.scm_type: # never ever update manual projects
# Check if the content cache exists, so that we do not unnecessarily re-download roles
sync_needs = ['update_{}'.format(source_project.scm_type)]
@@ -1655,8 +1594,6 @@ class RunInventoryUpdate(BaseTask):
sync_task = project_update_task(job_private_data_dir=private_data_dir)
sync_task.run(local_project_sync.id)
local_project_sync.refresh_from_db()
inventory_update.inventory_source.scm_last_revision = local_project_sync.scm_revision
inventory_update.inventory_source.save(update_fields=['scm_last_revision'])
except Exception:
inventory_update = self.update_model(
inventory_update.pk,
@@ -1667,9 +1604,6 @@ class RunInventoryUpdate(BaseTask):
),
)
raise
elif inventory_update.source == 'scm' and inventory_update.launch_type == 'scm' and source_project:
# This follows update, not sync, so make copy here
RunProjectUpdate.make_local_copy(source_project, private_data_dir)
def post_run_hook(self, inventory_update, status):
super(RunInventoryUpdate, self).post_run_hook(inventory_update, status)

63
awx/main/tasks/signals.py Normal file
View File

@@ -0,0 +1,63 @@
import signal
import functools
import logging
logger = logging.getLogger('awx.main.tasks.signals')
__all__ = ['with_signal_handling', 'signal_callback']
class SignalState:
def reset(self):
self.sigterm_flag = False
self.is_active = False
self.original_sigterm = None
self.original_sigint = None
def __init__(self):
self.reset()
def set_flag(self, *args):
"""Method to pass into the python signal.signal method to receive signals"""
self.sigterm_flag = True
def connect_signals(self):
self.original_sigterm = signal.getsignal(signal.SIGTERM)
self.original_sigint = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGTERM, self.set_flag)
signal.signal(signal.SIGINT, self.set_flag)
self.is_active = True
def restore_signals(self):
signal.signal(signal.SIGTERM, self.original_sigterm)
signal.signal(signal.SIGINT, self.original_sigint)
self.reset()
signal_state = SignalState()
def signal_callback():
return signal_state.sigterm_flag
def with_signal_handling(f):
"""
Change signal handling to make signal_callback return True in event of SIGTERM or SIGINT.
"""
@functools.wraps(f)
def _wrapped(*args, **kwargs):
try:
this_is_outermost_caller = False
if not signal_state.is_active:
signal_state.connect_signals()
this_is_outermost_caller = True
return f(*args, **kwargs)
finally:
if this_is_outermost_caller:
signal_state.restore_signals()
return _wrapped

View File

@@ -114,10 +114,6 @@ def inform_cluster_of_shutdown():
try:
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
try:
reaper.reap(this_inst)
except Exception:
logger.exception('failed to reap jobs for {}'.format(this_inst.hostname))
logger.warning('Normal shutdown signal for instance {}, ' 'removed self from capacity pool.'.format(this_inst.hostname))
except Exception:
logger.exception('Encountered problem with normal shutdown signal.')

View File

@@ -2,8 +2,9 @@
"ANSIBLE_JINJA2_NATIVE": "True",
"ANSIBLE_TRANSFORM_INVALID_GROUP_CHARS": "never",
"GCE_CREDENTIALS_FILE_PATH": "{{ file_reference }}",
"GOOGLE_APPLICATION_CREDENTIALS": "{{ file_reference }}",
"GCP_AUTH_KIND": "serviceaccount",
"GCP_ENV_TYPE": "tower",
"GCP_PROJECT": "fooo",
"GCP_SERVICE_ACCOUNT_FILE": "{{ file_reference }}"
}
}

View File

@@ -26,6 +26,7 @@ def test_empty():
"workflow_job_template": 0,
"unified_job": 0,
"pending_jobs": 0,
"database_connections": 1,
}

View File

@@ -31,6 +31,7 @@ EXPECTED_VALUES = {
'awx_license_instance_total': 0,
'awx_license_instance_free': 0,
'awx_pending_jobs_total': 0,
'awx_database_connections_total': 1,
}

View File

@@ -9,9 +9,7 @@ from awx.api.versioning import reverse
@pytest.fixture
def ec2_source(inventory, project):
with mock.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.update'):
return inventory.inventory_sources.create(
name='some_source', update_on_project_update=True, source='ec2', source_project=project, scm_last_revision=project.scm_revision
)
return inventory.inventory_sources.create(name='some_source', source='ec2', source_project=project)
@pytest.fixture

View File

@@ -13,9 +13,7 @@ from awx.main.models import InventorySource, Inventory, ActivityStream
@pytest.fixture
def scm_inventory(inventory, project):
with mock.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.update'):
inventory.inventory_sources.create(
name='foobar', update_on_project_update=True, source='scm', source_project=project, scm_last_revision=project.scm_revision
)
inventory.inventory_sources.create(name='foobar', source='scm', source_project=project)
return inventory
@@ -23,9 +21,7 @@ def scm_inventory(inventory, project):
def factory_scm_inventory(inventory, project):
def fn(**kwargs):
with mock.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.update'):
return inventory.inventory_sources.create(
source_project=project, overwrite_vars=True, source='scm', scm_last_revision=project.scm_revision, **kwargs
)
return inventory.inventory_sources.create(source_project=project, overwrite_vars=True, source='scm', **kwargs)
return fn
@@ -544,15 +540,12 @@ class TestControlledBySCM:
def test_safe_method_works(self, get, options, scm_inventory, admin_user):
get(scm_inventory.get_absolute_url(), admin_user, expect=200)
options(scm_inventory.get_absolute_url(), admin_user, expect=200)
assert InventorySource.objects.get(inventory=scm_inventory.pk).scm_last_revision != ''
def test_vars_edit_reset(self, patch, scm_inventory, admin_user):
patch(scm_inventory.get_absolute_url(), {'variables': 'hello: world'}, admin_user, expect=200)
assert InventorySource.objects.get(inventory=scm_inventory.pk).scm_last_revision == ''
def test_name_edit_allowed(self, patch, scm_inventory, admin_user):
patch(scm_inventory.get_absolute_url(), {'variables': '---', 'name': 'newname'}, admin_user, expect=200)
assert InventorySource.objects.get(inventory=scm_inventory.pk).scm_last_revision != ''
def test_host_associations_reset(self, post, scm_inventory, admin_user):
inv_src = scm_inventory.inventory_sources.first()
@@ -560,14 +553,12 @@ class TestControlledBySCM:
g = inv_src.groups.create(name='fooland', inventory=scm_inventory)
post(reverse('api:host_groups_list', kwargs={'pk': h.id}), {'id': g.id}, admin_user, expect=204)
post(reverse('api:group_hosts_list', kwargs={'pk': g.id}), {'id': h.id}, admin_user, expect=204)
assert InventorySource.objects.get(inventory=scm_inventory.pk).scm_last_revision == ''
def test_group_group_associations_reset(self, post, scm_inventory, admin_user):
inv_src = scm_inventory.inventory_sources.first()
g1 = inv_src.groups.create(name='barland', inventory=scm_inventory)
g2 = inv_src.groups.create(name='fooland', inventory=scm_inventory)
post(reverse('api:group_children_list', kwargs={'pk': g1.id}), {'id': g2.id}, admin_user, expect=204)
assert InventorySource.objects.get(inventory=scm_inventory.pk).scm_last_revision == ''
def test_host_group_delete_reset(self, delete, scm_inventory, admin_user):
inv_src = scm_inventory.inventory_sources.first()
@@ -575,7 +566,6 @@ class TestControlledBySCM:
g = inv_src.groups.create(name='fooland', inventory=scm_inventory)
delete(h.get_absolute_url(), admin_user, expect=204)
delete(g.get_absolute_url(), admin_user, expect=204)
assert InventorySource.objects.get(inventory=scm_inventory.pk).scm_last_revision == ''
def test_remove_scm_inv_src(self, delete, scm_inventory, admin_user):
inv_src = scm_inventory.inventory_sources.first()
@@ -588,7 +578,6 @@ class TestControlledBySCM:
{
'name': 'new inv src',
'source_project': project.pk,
'update_on_project_update': False,
'source': 'scm',
'overwrite_vars': True,
'source_vars': 'plugin: a.b.c',
@@ -597,27 +586,6 @@ class TestControlledBySCM:
expect=201,
)
def test_adding_inv_src_prohibited(self, post, scm_inventory, project, admin_user):
post(
reverse('api:inventory_inventory_sources_list', kwargs={'pk': scm_inventory.id}),
{'name': 'new inv src', 'source_project': project.pk, 'update_on_project_update': True, 'source': 'scm', 'overwrite_vars': True},
admin_user,
expect=400,
)
def test_two_update_on_project_update_inv_src_prohibited(self, patch, scm_inventory, factory_scm_inventory, project, admin_user):
scm_inventory2 = factory_scm_inventory(name="scm_inventory2")
res = patch(
reverse('api:inventory_source_detail', kwargs={'pk': scm_inventory2.id}),
{
'update_on_project_update': True,
},
admin_user,
expect=400,
)
content = json.loads(res.content)
assert content['update_on_project_update'] == ["More than one SCM-based inventory source with update on project update " "per-inventory not allowed."]
def test_adding_inv_src_without_proj_access_prohibited(self, post, project, inventory, rando):
inventory.admin_role.members.add(rando)
post(

View File

@@ -347,9 +347,7 @@ def scm_inventory_source(inventory, project):
source_project=project,
source='scm',
source_path='inventory_file',
update_on_project_update=True,
inventory=inventory,
scm_last_revision=project.scm_revision,
)
with mock.patch('awx.main.models.unified_jobs.UnifiedJobTemplate.update'):
inv_src.save()

View File

@@ -3,8 +3,6 @@
import pytest
from unittest import mock
from django.core.exceptions import ValidationError
# AWX
from awx.main.models import Host, Inventory, InventorySource, InventoryUpdate, CredentialType, Credential, Job
from awx.main.constants import CLOUD_PROVIDERS
@@ -123,19 +121,6 @@ class TestActiveCount:
@pytest.mark.django_db
class TestSCMUpdateFeatures:
def test_automatic_project_update_on_create(self, inventory, project):
inv_src = InventorySource(source_project=project, source_path='inventory_file', inventory=inventory, update_on_project_update=True, source='scm')
with mock.patch.object(inv_src, 'update') as mck_update:
inv_src.save()
mck_update.assert_called_once_with()
def test_reset_scm_revision(self, scm_inventory_source):
starting_rev = scm_inventory_source.scm_last_revision
assert starting_rev != ''
scm_inventory_source.source_path = '/newfolder/newfile.ini'
scm_inventory_source.save()
assert scm_inventory_source.scm_last_revision == ''
def test_source_location(self, scm_inventory_source):
# Combines project directory with the inventory file specified
inventory_update = InventoryUpdate(inventory_source=scm_inventory_source, source_path=scm_inventory_source.source_path)
@@ -167,22 +152,6 @@ class TestRelatedJobs:
assert job.id in [jerb.id for jerb in group._get_related_jobs()]
@pytest.mark.django_db
class TestSCMClean:
def test_clean_update_on_project_update_multiple(self, inventory):
inv_src1 = InventorySource(inventory=inventory, update_on_project_update=True, source='scm')
inv_src1.clean_update_on_project_update()
inv_src1.save()
inv_src1.source_vars = '---\nhello: world'
inv_src1.clean_update_on_project_update()
inv_src2 = InventorySource(inventory=inventory, update_on_project_update=True, source='scm')
with pytest.raises(ValidationError):
inv_src2.clean_update_on_project_update()
@pytest.mark.django_db
class TestInventorySourceInjectors:
def test_extra_credentials(self, project, credential):

View File

@@ -19,6 +19,7 @@ from awx.api.views import WorkflowJobTemplateNodeSuccessNodesList
# Django
from django.test import TransactionTestCase
from django.core.exceptions import ValidationError
from django.utils.timezone import now
class TestWorkflowDAGFunctional(TransactionTestCase):
@@ -381,3 +382,38 @@ def test_workflow_ancestors_recursion_prevention(organization):
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=wfjt, job=wfj) # well, this is a problem
# mostly, we just care that this assertion finishes in finite time
assert wfj.get_ancestor_workflows() == []
@pytest.mark.django_db
class TestCombinedArtifacts:
@pytest.fixture
def wfj_artifacts(self, job_template, organization):
wfjt = WorkflowJobTemplate.objects.create(organization=organization, name='has_artifacts')
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt, launch_type='workflow')
job = job_template.create_unified_job(_eager_fields=dict(artifacts={'foooo': 'bar'}, status='successful', finished=now()))
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=job_template, job=job)
return wfj
def test_multiple_types(self, project, wfj_artifacts):
project_update = project.create_unified_job()
WorkflowJobNode.objects.create(workflow_job=wfj_artifacts, unified_job_template=project, job=project_update)
assert wfj_artifacts.get_effective_artifacts() == {'foooo': 'bar'}
def test_precedence_based_on_time(self, wfj_artifacts, job_template):
later_job = job_template.create_unified_job(
_eager_fields=dict(artifacts={'foooo': 'zoo'}, status='successful', finished=now()) # finished later, should win
)
WorkflowJobNode.objects.create(workflow_job=wfj_artifacts, unified_job_template=job_template, job=later_job)
assert wfj_artifacts.get_effective_artifacts() == {'foooo': 'zoo'}
def test_bad_data_with_artifacts(self, organization):
# This is toxic database data, this tests that it doesn't create an infinite loop
wfjt = WorkflowJobTemplate.objects.create(organization=organization, name='child')
wfj = WorkflowJob.objects.create(workflow_job_template=wfjt, launch_type='workflow')
WorkflowJobNode.objects.create(workflow_job=wfj, unified_job_template=wfjt, job=wfj)
job = Job.objects.create(artifacts={'foo': 'bar'}, status='successful')
WorkflowJobNode.objects.create(workflow_job=wfj, job=job)
# mostly, we just care that this assertion finishes in finite time
assert wfj.get_effective_artifacts() == {'foo': 'bar'}

View File

@@ -4,9 +4,8 @@ import os
import tempfile
import shutil
from awx.main.tasks.jobs import RunProjectUpdate, RunInventoryUpdate
from awx.main.tasks.system import execution_node_health_check, _cleanup_images_and_files
from awx.main.models import ProjectUpdate, InventoryUpdate, InventorySource, Instance, Job
from awx.main.models import Instance, Job
@pytest.fixture
@@ -27,63 +26,6 @@ def test_no_worker_info_on_AWX_nodes(node_type):
execution_node_health_check(hostname)
@pytest.mark.django_db
class TestDependentInventoryUpdate:
def test_dependent_inventory_updates_is_called(self, scm_inventory_source, scm_revision_file, mock_me):
task = RunProjectUpdate()
task.revision_path = scm_revision_file
proj_update = scm_inventory_source.source_project.create_project_update()
with mock.patch.object(RunProjectUpdate, '_update_dependent_inventories') as inv_update_mck:
with mock.patch.object(RunProjectUpdate, 'release_lock'):
task.post_run_hook(proj_update, 'successful')
inv_update_mck.assert_called_once_with(proj_update, mock.ANY)
def test_no_unwanted_dependent_inventory_updates(self, project, scm_revision_file, mock_me):
task = RunProjectUpdate()
task.revision_path = scm_revision_file
proj_update = project.create_project_update()
with mock.patch.object(RunProjectUpdate, '_update_dependent_inventories') as inv_update_mck:
with mock.patch.object(RunProjectUpdate, 'release_lock'):
task.post_run_hook(proj_update, 'successful')
assert not inv_update_mck.called
def test_dependent_inventory_updates(self, scm_inventory_source, default_instance_group, mock_me):
task = RunProjectUpdate()
scm_inventory_source.scm_last_revision = ''
proj_update = ProjectUpdate.objects.create(project=scm_inventory_source.source_project)
with mock.patch.object(RunInventoryUpdate, 'run') as iu_run_mock:
with mock.patch('awx.main.tasks.jobs.create_partition'):
task._update_dependent_inventories(proj_update, [scm_inventory_source])
assert InventoryUpdate.objects.count() == 1
inv_update = InventoryUpdate.objects.first()
iu_run_mock.assert_called_once_with(inv_update.id)
assert inv_update.source_project_update_id == proj_update.pk
def test_dependent_inventory_project_cancel(self, project, inventory, default_instance_group, mock_me):
"""
Test that dependent inventory updates exhibit good behavior on cancel
of the source project update
"""
task = RunProjectUpdate()
proj_update = ProjectUpdate.objects.create(project=project)
kwargs = dict(source_project=project, source='scm', source_path='inventory_file', update_on_project_update=True, inventory=inventory)
is1 = InventorySource.objects.create(name="test-scm-inv", **kwargs)
is2 = InventorySource.objects.create(name="test-scm-inv2", **kwargs)
def user_cancels_project(pk):
ProjectUpdate.objects.all().update(cancel_flag=True)
with mock.patch.object(RunInventoryUpdate, 'run') as iu_run_mock:
with mock.patch('awx.main.tasks.jobs.create_partition'):
iu_run_mock.side_effect = user_cancels_project
task._update_dependent_inventories(proj_update, [is1, is2])
# Verify that it bails after 1st update, detecting a cancel
assert is2.inventory_updates.count() == 0
iu_run_mock.assert_called_once()
@pytest.fixture
def mock_job_folder(request):
pdd_path = tempfile.mkdtemp(prefix='awx_123_')

View File

@@ -69,21 +69,21 @@ class TestJobTemplateLabelList:
class TestInventoryInventorySourcesUpdate:
@pytest.mark.parametrize(
"can_update, can_access, is_source, is_up_on_proj, expected",
"can_update, can_access, is_source, expected",
[
(True, True, "ec2", False, [{'status': 'started', 'inventory_update': 1, 'inventory_source': 1}]),
(False, True, "gce", False, [{'status': 'Could not start because `can_update` returned False', 'inventory_source': 1}]),
(True, False, "scm", True, [{'status': 'started', 'inventory_update': 1, 'inventory_source': 1}]),
(True, True, "ec2", [{'status': 'started', 'inventory_update': 1, 'inventory_source': 1}]),
(False, True, "gce", [{'status': 'Could not start because `can_update` returned False', 'inventory_source': 1}]),
(True, False, "scm", [{'status': 'started', 'inventory_update': 1, 'inventory_source': 1}]),
],
)
def test_post(self, mocker, can_update, can_access, is_source, is_up_on_proj, expected):
def test_post(self, mocker, can_update, can_access, is_source, expected):
class InventoryUpdate:
id = 1
class Project:
name = 'project'
InventorySource = namedtuple('InventorySource', ['source', 'update_on_project_update', 'pk', 'can_update', 'update', 'source_project'])
InventorySource = namedtuple('InventorySource', ['source', 'pk', 'can_update', 'update', 'source_project'])
class InventorySources(object):
def all(self):
@@ -92,7 +92,6 @@ class TestInventoryInventorySourcesUpdate:
pk=1,
source=is_source,
source_project=Project,
update_on_project_update=is_up_on_proj,
can_update=can_update,
update=lambda: InventoryUpdate,
)

View File

@@ -1,28 +1,13 @@
import pytest
from unittest import mock
from django.core.exceptions import ValidationError
from awx.main.models import (
UnifiedJob,
InventoryUpdate,
InventorySource,
)
def test_cancel(mocker):
with mock.patch.object(UnifiedJob, 'cancel', return_value=True) as parent_cancel:
iu = InventoryUpdate()
iu.save = mocker.MagicMock()
build_job_explanation_mock = mocker.MagicMock()
iu._build_job_explanation = mocker.MagicMock(return_value=build_job_explanation_mock)
iu.cancel()
parent_cancel.assert_called_with(is_chain=False, job_explanation=None)
def test__build_job_explanation():
iu = InventoryUpdate(id=3, name='I_am_an_Inventory_Update')
@@ -53,9 +38,3 @@ class TestControlledBySCM:
with pytest.raises(ValidationError):
inv_src.clean_source_path()
def test_clean_update_on_launch_update_on_project_update(self):
inv_src = InventorySource(update_on_project_update=True, update_on_launch=True, source='scm')
with pytest.raises(ValidationError):
inv_src.clean_update_on_launch()

View File

@@ -1,7 +1,7 @@
from awx.main.tasks.callback import RunnerCallback
from awx.main.constants import ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
def test_delay_update(mock_me):

View File

@@ -0,0 +1,50 @@
import signal
from awx.main.tasks.signals import signal_state, signal_callback, with_signal_handling
def test_outer_inner_signal_handling():
"""
Even if the flag is set in the outer context, its value should persist in the inner context
"""
@with_signal_handling
def f2():
assert signal_callback()
@with_signal_handling
def f1():
assert signal_callback() is False
signal_state.set_flag()
assert signal_callback()
f2()
original_sigterm = signal.getsignal(signal.SIGTERM)
assert signal_callback() is False
f1()
assert signal_callback() is False
assert signal.getsignal(signal.SIGTERM) is original_sigterm
def test_inner_outer_signal_handling():
"""
Even if the flag is set in the inner context, its value should persist in the outer context
"""
@with_signal_handling
def f2():
assert signal_callback() is False
signal_state.set_flag()
assert signal_callback()
@with_signal_handling
def f1():
assert signal_callback() is False
f2()
assert signal_callback()
original_sigterm = signal.getsignal(signal.SIGTERM)
assert signal_callback() is False
f1()
assert signal_callback() is False
assert signal.getsignal(signal.SIGTERM) is original_sigterm

View File

@@ -922,7 +922,8 @@ class TestJobCredentials(TestJobExecution):
assert env['AWS_SECURITY_TOKEN'] == 'token'
assert safe_env['AWS_SECRET_ACCESS_KEY'] == HIDDEN_PASSWORD
def test_gce_credentials(self, private_data_dir, job, mock_me):
@pytest.mark.parametrize("cred_env_var", ['GCE_CREDENTIALS_FILE_PATH', 'GOOGLE_APPLICATION_CREDENTIALS'])
def test_gce_credentials(self, cred_env_var, private_data_dir, job, mock_me):
gce = CredentialType.defaults['gce']()
credential = Credential(pk=1, credential_type=gce, inputs={'username': 'bob', 'project': 'some-project', 'ssh_key_data': self.EXAMPLE_PRIVATE_KEY})
credential.inputs['ssh_key_data'] = encrypt_field(credential, 'ssh_key_data')
@@ -931,7 +932,7 @@ class TestJobCredentials(TestJobExecution):
env = {}
safe_env = {}
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
runner_path = env['GCE_CREDENTIALS_FILE_PATH']
runner_path = env[cred_env_var]
local_path = to_host_path(runner_path, private_data_dir)
json_data = json.load(open(local_path, 'rb'))
assert json_data['type'] == 'service_account'
@@ -1316,6 +1317,7 @@ class TestJobCredentials(TestJobExecution):
assert env['AZURE_AD_USER'] == 'bob'
assert env['AZURE_PASSWORD'] == 'secret'
# Because this is testing a mix of multiple cloud creds, we are not going to test the GOOGLE_APPLICATION_CREDENTIALS here
path = to_host_path(env['GCE_CREDENTIALS_FILE_PATH'], private_data_dir)
json_data = json.load(open(path, 'rb'))
assert json_data['type'] == 'service_account'
@@ -1645,7 +1647,8 @@ class TestInventoryUpdateCredentials(TestJobExecution):
assert safe_env['AZURE_PASSWORD'] == HIDDEN_PASSWORD
def test_gce_source(self, inventory_update, private_data_dir, mocker, mock_me):
@pytest.mark.parametrize("cred_env_var", ['GCE_CREDENTIALS_FILE_PATH', 'GOOGLE_APPLICATION_CREDENTIALS'])
def test_gce_source(self, cred_env_var, inventory_update, private_data_dir, mocker, mock_me):
task = jobs.RunInventoryUpdate()
task.instance = inventory_update
gce = CredentialType.defaults['gce']()
@@ -1669,7 +1672,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
credential.credential_type.inject_credential(credential, env, safe_env, [], private_data_dir)
assert env['GCE_ZONE'] == expected_gce_zone
json_data = json.load(open(env['GCE_CREDENTIALS_FILE_PATH'], 'rb'))
json_data = json.load(open(env[cred_env_var], 'rb'))
assert json_data['type'] == 'service_account'
assert json_data['private_key'] == self.EXAMPLE_PRIVATE_KEY
assert json_data['client_email'] == 'bob'

View File

@@ -3,6 +3,8 @@ from django.db import transaction, DatabaseError, InterfaceError
import logging
import time
from awx.main.tasks.signals import signal_callback
logger = logging.getLogger('awx.main.tasks.utils')
@@ -37,7 +39,10 @@ def update_model(model, pk, _attempt=0, _max_attempts=5, select_for_update=False
# Attempt to retry the update, assuming we haven't already
# tried too many times.
if _attempt < _max_attempts:
time.sleep(5)
for i in range(5):
time.sleep(1)
if signal_callback():
raise RuntimeError(f'Could not fetch {pk} because of receiving abort signal')
return update_model(model, pk, _attempt=_attempt + 1, _max_attempts=_max_attempts, **updates)
else:
logger.error('Failed to update %s after %d retries.', model._meta.object_name, _attempt)