Merge branch 'release_3.2.0' into devel

This commit is contained in:
Matthew Jones
2017-09-18 10:55:45 -04:00
48 changed files with 838 additions and 403 deletions

View File

@@ -27,7 +27,7 @@ from awx.main.models import * # noqa
from awx.main.models.unified_jobs import ACTIVE_STATES
from awx.main.models.mixins import ResourceMixin
from awx.conf.license import LicenseForbids
from awx.conf.license import LicenseForbids, feature_enabled
__all__ = ['get_user_queryset', 'check_user_access', 'check_user_access_with_errors',
'user_accessible_objects', 'consumer_access',
@@ -140,7 +140,14 @@ def get_user_capabilities(user, instance, **kwargs):
convenient for the user interface to consume and hide or show various
actions in the interface.
'''
access_class = access_registry[instance.__class__]
cls = instance.__class__
# When `.defer()` is used w/ the Django ORM, the result is a subclass of
# the original that represents e.g.,
# awx.main.models.ad_hoc_commands.AdHocCommand_Deferred_result_stdout_text
# We want to do the access registry lookup keyed on the base class name.
if getattr(cls, '_deferred', False):
cls = instance.__class__.__bases__[0]
access_class = access_registry[cls]
return access_class(user).get_user_capabilities(instance, **kwargs)
@@ -323,6 +330,10 @@ class BaseAccess(object):
if validation_errors:
user_capabilities[display_method] = False
continue
elif isinstance(obj, (WorkflowJobTemplate, WorkflowJob)):
if not feature_enabled('workflows'):
user_capabilities[display_method] = (display_method == 'delete')
continue
elif display_method == 'copy' and isinstance(obj, WorkflowJobTemplate) and obj.organization_id is None:
user_capabilities[display_method] = self.user.is_superuser
continue
@@ -482,8 +493,10 @@ class UserAccess(BaseAccess):
def can_change(self, obj, data):
if data is not None and ('is_superuser' in data or 'is_system_auditor' in data):
if (to_python_boolean(data.get('is_superuser', 'false'), allow_none=True) or
to_python_boolean(data.get('is_system_auditor', 'false'), allow_none=True)) and not self.user.is_superuser:
if to_python_boolean(data.get('is_superuser', 'false'), allow_none=True) and \
not self.user.is_superuser:
return False
if to_python_boolean(data.get('is_system_auditor', 'false'), allow_none=True) and not (self.user.is_superuser or self.user == obj):
return False
# A user can be changed if they are themselves, or by org admins or
# superusers. Change permission implies changing only certain fields
@@ -2068,6 +2081,8 @@ class UnifiedJobAccess(BaseAccess):
# 'job_template__project',
# 'job_template__credential',
#)
# TODO: remove this defer in 3.3 when we implement https://github.com/ansible/ansible-tower/issues/5436
qs = qs.defer('result_stdout_text')
return qs.all()

View File

@@ -1,8 +1,11 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import re
from django.utils.translation import ugettext_lazy as _
CLOUD_PROVIDERS = ('azure', 'azure_rm', 'ec2', 'gce', 'rax', 'vmware', 'openstack', 'satellite6', 'cloudforms')
SCHEDULEABLE_PROVIDERS = CLOUD_PROVIDERS + ('custom', 'scm',)
PRIVILEGE_ESCALATION_METHODS = [ ('sudo', _('Sudo')), ('su', _('Su')), ('pbrun', _('Pbrun')), ('pfexec', _('Pfexec')), ('dzdo', _('DZDO')), ('pmrun', _('Pmrun')), ('runas', _('Runas'))]
ANSI_SGR_PATTERN = re.compile(r'\x1b\[[0-9;]*m')

View File

@@ -17,6 +17,11 @@ class Command(BaseCommand):
Deprovision a Tower cluster node
"""
help = (
'Remove instance from the database. '
'Specify `--hostname` to use this command.'
)
option_list = BaseCommand.option_list + (
make_option('--hostname', dest='hostname', type='string',
help='Hostname used during provisioning'),

View File

@@ -16,6 +16,11 @@ class Command(BaseCommand):
Regsiter this instance with the database for HA tracking.
"""
help = (
'Add instance to the database. '
'Specify `--hostname` to use this command.'
)
option_list = BaseCommand.option_list + (
make_option('--hostname', dest='hostname', type='string',
help='Hostname used during provisioning'),

View File

@@ -14,7 +14,7 @@ def _create_fact_scan_project(ContentType, Project, org):
ct = ContentType.objects.get_for_model(Project)
name = "Tower Fact Scan - {}".format(org.name if org else "No Organization")
proj = Project(name=name,
scm_url='https://github.com/ansible/tower-fact-modules',
scm_url='https://github.com/ansible/awx-facts-playbooks',
scm_type='git',
scm_update_on_launch=True,
scm_update_cache_timeout=86400,

View File

@@ -145,3 +145,17 @@ activity_stream_registrar.connect(WorkflowJob)
# prevent API filtering on certain Django-supplied sensitive fields
prevent_search(User._meta.get_field('password'))
# Always, always, always defer result_stdout_text for polymorphic UnifiedJob rows
# TODO: remove this defer in 3.3 when we implement https://github.com/ansible/ansible-tower/issues/5436
def defer_stdout(f):
def _wrapped(*args, **kwargs):
objs = f(*args, **kwargs)
objs.query.deferred_loading[0].add('result_stdout_text')
return objs
return _wrapped
for cls in UnifiedJob.__subclasses__():
cls.base_objects.filter = defer_stdout(cls.base_objects.filter)

View File

@@ -300,7 +300,7 @@ class TaskManager():
# Already processed dependencies for this job
if job.dependent_jobs.all():
return False
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("created")
latest_inventory_update = InventoryUpdate.objects.filter(inventory_source=inventory_source).order_by("-created")
if not latest_inventory_update.exists():
return True
latest_inventory_update = latest_inventory_update.first()
@@ -323,7 +323,7 @@ class TaskManager():
now = tz_now()
if job.dependent_jobs.all():
return False
latest_project_update = ProjectUpdate.objects.filter(project=job.project).order_by("created")
latest_project_update = ProjectUpdate.objects.filter(project=job.project, job_type='check').order_by("-created")
if not latest_project_update.exists():
return True
latest_project_update = latest_project_update.first()
@@ -421,26 +421,40 @@ class TaskManager():
if not found_acceptable_queue:
logger.debug("%s couldn't be scheduled on graph, waiting for next cycle", task.log_format)
def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time):
def fail_jobs_if_not_in_celery(self, node_jobs, active_tasks, celery_task_start_time,
isolated=False):
for task in node_jobs:
if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
if isinstance(task, WorkflowJob):
continue
if task.modified > celery_task_start_time:
continue
task.status = 'failed'
task.job_explanation += ' '.join((
'Task was marked as running in Tower but was not present in',
'Celery, so it has been marked as failed.',
))
new_status = 'failed'
if isolated:
new_status = 'error'
task.status = new_status
if isolated:
# TODO: cancel and reap artifacts of lost jobs from heartbeat
task.job_explanation += ' '.join((
'Task was marked as running in Tower but its ',
'controller management daemon was not present in',
'Celery, so it has been marked as failed.',
'Task may still be running, but contactability is unknown.'
))
else:
task.job_explanation += ' '.join((
'Task was marked as running in Tower but was not present in',
'Celery, so it has been marked as failed.',
))
try:
task.save(update_fields=['status', 'job_explanation'])
except DatabaseError:
logger.error("Task {} DB error in marking failed. Job possibly deleted.".format(task.log_format))
continue
awx_tasks._send_notification_templates(task, 'failed')
task.websocket_emit_status('failed')
logger.error("Task {} has no record in celery. Marking as failed".format(task.log_format))
task.websocket_emit_status(new_status)
logger.error("{}Task {} has no record in celery. Marking as failed".format(
'Isolated ' if isolated else '', task.log_format))
def cleanup_inconsistent_celery_tasks(self):
'''
@@ -471,26 +485,36 @@ class TaskManager():
self.fail_jobs_if_not_in_celery(waiting_tasks, all_celery_task_ids, celery_task_start_time)
for node, node_jobs in running_tasks.iteritems():
isolated = False
if node in active_queues:
active_tasks = active_queues[node]
else:
'''
Node task list not found in celery. If tower thinks the node is down
then fail all the jobs on the node.
Node task list not found in celery. We may branch into cases:
- instance is unknown to tower, system is improperly configured
- instance is reported as down, then fail all jobs on the node
- instance is an isolated node, then check running tasks
among all allowed controller nodes for management process
'''
try:
instance = Instance.objects.get(hostname=node)
if instance.capacity == 0:
active_tasks = []
else:
continue
except Instance.DoesNotExist:
logger.error("Execution node Instance {} not found in database. "
"The node is currently executing jobs {}".format(node,
[j.log_format for j in node_jobs]))
active_tasks = []
instance = Instance.objects.filter(hostname=node).first()
self.fail_jobs_if_not_in_celery(node_jobs, active_tasks, celery_task_start_time)
if instance is None:
logger.error("Execution node Instance {} not found in database. "
"The node is currently executing jobs {}".format(
node, [j.log_format for j in node_jobs]))
active_tasks = []
elif instance.capacity == 0:
active_tasks = []
elif instance.rampart_groups.filter(controller__isnull=False).exists():
active_tasks = all_celery_task_ids
isolated = True
else:
continue
self.fail_jobs_if_not_in_celery(
node_jobs, active_tasks, celery_task_start_time,
isolated=isolated
)
def calculate_capacity_consumed(self, tasks):
self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)

View File

@@ -388,6 +388,9 @@ def activity_stream_create(sender, instance, created, **kwargs):
# Skip recording any inventory source directly associated with a group.
if isinstance(instance, InventorySource) and instance.deprecated_group:
return
_type = type(instance)
if getattr(_type, '_deferred', False):
return
object1 = camelcase_to_underscore(instance.__class__.__name__)
changes = model_to_dict(instance, model_serializer_mapping)
# Special case where Job survey password variables need to be hidden
@@ -421,6 +424,9 @@ def activity_stream_update(sender, instance, **kwargs):
changes = model_instance_diff(old, new, model_serializer_mapping)
if changes is None:
return
_type = type(instance)
if getattr(_type, '_deferred', False):
return
object1 = camelcase_to_underscore(instance.__class__.__name__)
activity_entry = ActivityStream(
operation='update',
@@ -445,6 +451,9 @@ def activity_stream_delete(sender, instance, **kwargs):
# explicitly called with flag on in Inventory.schedule_deletion.
if isinstance(instance, Inventory) and not kwargs.get('inventory_delete_flag', False):
return
_type = type(instance)
if getattr(_type, '_deferred', False):
return
changes = model_to_dict(instance)
object1 = camelcase_to_underscore(instance.__class__.__name__)
activity_entry = ActivityStream(
@@ -466,6 +475,9 @@ def activity_stream_associate(sender, instance, **kwargs):
else:
return
obj1 = instance
_type = type(instance)
if getattr(_type, '_deferred', False):
return
object1=camelcase_to_underscore(obj1.__class__.__name__)
obj_rel = sender.__module__ + "." + sender.__name__
@@ -476,6 +488,9 @@ def activity_stream_associate(sender, instance, **kwargs):
if not obj2_actual.exists():
continue
obj2_actual = obj2_actual[0]
_type = type(obj2_actual)
if getattr(_type, '_deferred', False):
return
if isinstance(obj2_actual, Role) and obj2_actual.content_object is not None:
obj2_actual = obj2_actual.content_object
object2 = camelcase_to_underscore(obj2_actual.__class__.__name__)

View File

@@ -320,7 +320,11 @@ def awx_periodic_scheduler(self):
def _send_notification_templates(instance, status_str):
if status_str not in ['succeeded', 'failed']:
raise ValueError(_("status_str must be either succeeded or failed"))
notification_templates = instance.get_notification_templates()
try:
notification_templates = instance.get_notification_templates()
except:
logger.warn("No notification template defined for emitting notification")
notification_templates = None
if notification_templates:
if status_str == 'succeeded':
notification_template_type = 'success'
@@ -482,6 +486,7 @@ class BaseTask(LogErrorsTask):
model = None
abstract = True
cleanup_paths = []
proot_show_paths = []
def update_model(self, pk, _attempt=0, **updates):
"""Reload the model instance from the database and update the
@@ -793,6 +798,7 @@ class BaseTask(LogErrorsTask):
# May have to serialize the value
kwargs['private_data_files'] = self.build_private_data_files(instance, **kwargs)
kwargs['passwords'] = self.build_passwords(instance, **kwargs)
kwargs['proot_show_paths'] = self.proot_show_paths
args = self.build_args(instance, **kwargs)
safe_args = self.build_safe_args(instance, **kwargs)
output_replacements = self.build_output_replacements(instance, **kwargs)
@@ -1068,6 +1074,7 @@ class RunJob(BaseTask):
env['VMWARE_USER'] = cloud_cred.username
env['VMWARE_PASSWORD'] = decrypt_field(cloud_cred, 'password')
env['VMWARE_HOST'] = cloud_cred.host
env['VMWARE_VALIDATE_CERTS'] = str(settings.VMWARE_VALIDATE_CERTS)
elif cloud_cred and cloud_cred.kind == 'openstack':
env['OS_CLIENT_CONFIG_FILE'] = cred_files.get(cloud_cred, '')
@@ -1285,6 +1292,10 @@ class RunProjectUpdate(BaseTask):
name = 'awx.main.tasks.run_project_update'
model = ProjectUpdate
@property
def proot_show_paths(self):
return [settings.PROJECTS_ROOT]
def build_private_data(self, project_update, **kwargs):
'''
Return SSH private key data needed for this project update.
@@ -1298,7 +1309,7 @@ class RunProjectUpdate(BaseTask):
}
}
'''
handle, self.revision_path = tempfile.mkstemp(dir=settings.AWX_PROOT_BASE_PATH)
handle, self.revision_path = tempfile.mkstemp(dir=settings.PROJECTS_ROOT)
self.cleanup_paths.append(self.revision_path)
private_data = {'credentials': {}}
if project_update.credential:
@@ -1591,6 +1602,12 @@ class RunProjectUpdate(BaseTask):
if status == 'successful' and instance.launch_type != 'sync':
self._update_dependent_inventories(instance, dependent_inventory_sources)
def should_use_proot(self, instance, **kwargs):
'''
Return whether this task should use proot.
'''
return getattr(settings, 'AWX_PROOT_ENABLED', False)
class RunInventoryUpdate(BaseTask):

View File

@@ -44,6 +44,12 @@ def test_system_auditor_is_system_auditor(system_auditor):
assert system_auditor.is_system_auditor
@pytest.mark.django_db
def test_system_auditor_can_modify_self(system_auditor):
access = UserAccess(system_auditor)
assert access.can_change(obj=system_auditor, data=dict(is_system_auditor='true'))
@pytest.mark.django_db
def test_user_queryset(user):
u = user('pete', False)

View File

@@ -21,7 +21,7 @@ class TestCleanupInconsistentCeleryTasks():
@mock.patch.object(TaskManager, 'get_active_tasks', return_value=([], {}))
@mock.patch.object(TaskManager, 'get_running_tasks', return_value=({'host1': [Job(id=2), Job(id=3),]}, []))
@mock.patch.object(InstanceGroup.objects, 'prefetch_related', return_value=[])
@mock.patch.object(Instance.objects, 'get', side_effect=Instance.DoesNotExist)
@mock.patch.object(Instance.objects, 'filter', return_value=mock.MagicMock(first=lambda: None))
@mock.patch('awx.main.scheduler.logger')
def test_instance_does_not_exist(self, logger_mock, *args):
logger_mock.error = mock.MagicMock(side_effect=RuntimeError("mocked"))

View File

@@ -181,6 +181,8 @@ class TestJobExecution:
EXAMPLE_PRIVATE_KEY = '-----BEGIN PRIVATE KEY-----\nxyz==\n-----END PRIVATE KEY-----'
def setup_method(self, method):
if not os.path.exists(settings.PROJECTS_ROOT):
os.mkdir(settings.PROJECTS_ROOT)
self.project_path = tempfile.mkdtemp(prefix='awx_project_')
with open(os.path.join(self.project_path, 'helloworld.yml'), 'w') as f:
f.write('---')
@@ -281,6 +283,15 @@ class TestGenericRun(TestJobExecution):
args, cwd, env, stdout = call_args
assert args[0] == 'bwrap'
def test_bwrap_virtualenvs_are_readonly(self):
self.task.run(self.pk)
assert self.run_pexpect.call_count == 1
call_args, _ = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
assert '--ro-bind %s %s' % (settings.ANSIBLE_VENV_PATH, settings.ANSIBLE_VENV_PATH) in ' '.join(args) # noqa
assert '--ro-bind %s %s' % (settings.AWX_VENV_PATH, settings.AWX_VENV_PATH) in ' '.join(args) # noqa
def test_awx_task_env(self):
patch = mock.patch('awx.main.tasks.settings.AWX_TASK_ENV', {'FOO': 'BAR'})
patch.start()
@@ -1096,6 +1107,27 @@ class TestProjectUpdateCredentials(TestJobExecution):
]
}
def test_bwrap_exposes_projects_root(self):
ssh = CredentialType.defaults['ssh']()
self.instance.scm_type = 'git'
self.instance.credential = Credential(
pk=1,
credential_type=ssh,
)
self.task.run(self.pk)
assert self.run_pexpect.call_count == 1
call_args, call_kwargs = self.run_pexpect.call_args_list[0]
args, cwd, env, stdout = call_args
assert ' '.join(args).startswith('bwrap')
' '.join([
'--bind',
settings.PROJECTS_ROOT,
settings.PROJECTS_ROOT,
]) in ' '.join(args)
assert '"scm_revision_output": "/projects/tmp' in ' '.join(args)
def test_username_and_password_auth(self, scm_type):
ssh = CredentialType.defaults['ssh']()
self.instance.scm_type = scm_type

View File

@@ -108,13 +108,14 @@ class RequireDebugTrueOrTest(logging.Filter):
return settings.DEBUG or 'test' in sys.argv
def memoize(ttl=60, cache_key=None):
def memoize(ttl=60, cache_key=None, cache_name='default'):
'''
Decorator to wrap a function and cache its result.
'''
from django.core.cache import cache
from django.core.cache import caches
def _memoizer(f, *args, **kwargs):
cache = caches[cache_name]
key = cache_key or slugify('%s %r %r' % (f.__name__, args, kwargs))
value = cache.get(key)
if value is None:
@@ -696,8 +697,13 @@ def wrap_args_with_proot(args, cwd, **kwargs):
show_paths = [cwd, kwargs['private_data_dir']]
else:
show_paths = [cwd]
show_paths.extend([settings.ANSIBLE_VENV_PATH, settings.AWX_VENV_PATH])
for venv in (
settings.ANSIBLE_VENV_PATH,
settings.AWX_VENV_PATH
):
new_args.extend(['--ro-bind', venv, venv])
show_paths.extend(getattr(settings, 'AWX_PROOT_SHOW_PATHS', None) or [])
show_paths.extend(kwargs.get('proot_show_paths', []))
for path in sorted(set(show_paths)):
if not os.path.exists(path):
continue