Compare commits

..

1 Commits

Author SHA1 Message Date
snyk-bot
14ca188b1e fix: requirements/requirements_dev.txt to reduce vulnerabilities
The following vulnerabilities are fixed by pinning transitive dependencies:
- https://snyk.io/vuln/SNYK-PYTHON-JUPYTERLAB-13053585
2025-09-27 06:36:57 +00:00
29 changed files with 476 additions and 1166 deletions

View File

@@ -17,6 +17,7 @@ in as the first entry for your PR title.
##### COMPONENT NAME
<!--- Name of the module/plugin/module/task -->
- API
- UI
- Collection
- CLI
- Docs

View File

@@ -8,10 +8,3 @@ updates:
labels:
- "docs"
- "dependencies"
- package-ecosystem: "pip"
directory: "requirements/"
schedule:
interval: "daily" #run daily until we trust it, then back this off to weekly
open-pull-requests-limit: 2
labels:
- "dependencies"

View File

@@ -1,85 +0,0 @@
---
name: SonarQube
on:
workflow_run:
workflows:
- CI
types:
- completed
permissions: read-all
jobs:
sonarqube:
runs-on: ubuntu-latest
if: github.event.workflow_run.conclusion == 'success' && github.event.workflow_run.event == 'pull_request'
steps:
- name: Checkout Code
uses: actions/checkout@v4
with:
fetch-depth: 0
show-progress: false
- name: Download coverage report artifact
uses: actions/download-artifact@v4
with:
name: coverage-report
path: reports/
github-token: ${{ secrets.GITHUB_TOKEN }}
run-id: ${{ github.event.workflow_run.id }}
- name: Download PR number artifact
uses: actions/download-artifact@v4
with:
name: pr-number
path: .
github-token: ${{ secrets.GITHUB_TOKEN }}
run-id: ${{ github.event.workflow_run.id }}
- name: Extract PR number
run: |
cat pr-number.txt
echo "PR_NUMBER=$(cat pr-number.txt)" >> $GITHUB_ENV
- name: Get PR info
uses: octokit/request-action@v2.x
id: pr_info
with:
route: GET /repos/{repo}/pulls/{number}
repo: ${{ github.event.repository.full_name }}
number: ${{ env.PR_NUMBER }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Set PR info into env
run: |
echo "PR_BASE=${{ fromJson(steps.pr_info.outputs.data).base.ref }}" >> $GITHUB_ENV
echo "PR_HEAD=${{ fromJson(steps.pr_info.outputs.data).head.ref }}" >> $GITHUB_ENV
- name: Add base branch
run: |
gh pr checkout ${{ env.PR_NUMBER }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Extract and export repo owner/name
run: |
REPO_SLUG="${GITHUB_REPOSITORY}"
IFS="/" read -r REPO_OWNER REPO_NAME <<< "$REPO_SLUG"
echo "REPO_OWNER=$REPO_OWNER" >> $GITHUB_ENV
echo "REPO_NAME=$REPO_NAME" >> $GITHUB_ENV
- name: SonarQube scan
uses: SonarSource/sonarqube-scan-action@v5
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets[format('{0}', vars.SONAR_TOKEN_SECRET_NAME)] }}
with:
args: >
-Dsonar.organization=${{ env.REPO_OWNER }}
-Dsonar.projectKey=${{ env.REPO_OWNER }}_${{ env.REPO_NAME }}
-Dsonar.pullrequest.key=${{ env.PR_NUMBER }}
-Dsonar.pullrequest.branch=${{ env.PR_HEAD }}
-Dsonar.pullrequest.base=${{ env.PR_BASE }}
-Dsonar.scm.revision=${{ github.event.workflow_run.head_sha }}

View File

@@ -38,12 +38,11 @@ jobs:
--workdir=/awx_devel `make print-DEVEL_IMAGE_NAME` /start_tests.sh genschema
- name: Upload API Schema
uses: keithweaver/aws-s3-github-action@4dd5a7b81d54abaa23bbac92b27e85d7f405ae53
with:
command: cp
source: ${{ github.workspace }}/schema.json
destination: s3://awx-public-ci-files/${{ github.ref_name }}/schema.json
aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY }}
aws_secret_access_key: ${{ secrets.AWS_SECRET_KEY }}
aws_region: us-east-1
flags: --acl public-read --only-show-errors
env:
AWS_ACCESS_KEY: ${{ secrets.AWS_ACCESS_KEY }}
AWS_SECRET_KEY: ${{ secrets.AWS_SECRET_KEY }}
AWS_REGION: 'us-east-1'
run: |
ansible localhost -c local, -m command -a "{{ ansible_python_interpreter + ' -m pip install boto3'}}"
ansible localhost -c local -m aws_s3 \
-a "src=${{ github.workspace }}/schema.json bucket=awx-public-ci-files object=${GITHUB_REF##*/}/schema.json mode=put permission=public-read"

View File

@@ -3,7 +3,7 @@
from django.urls import re_path
from awx.api.views import RoleList, RoleDetail, RoleUsersList, RoleTeamsList
from awx.api.views import RoleList, RoleDetail, RoleUsersList, RoleTeamsList, RoleParentsList, RoleChildrenList
urls = [
@@ -11,6 +11,8 @@ urls = [
re_path(r'^(?P<pk>[0-9]+)/$', RoleDetail.as_view(), name='role_detail'),
re_path(r'^(?P<pk>[0-9]+)/users/$', RoleUsersList.as_view(), name='role_users_list'),
re_path(r'^(?P<pk>[0-9]+)/teams/$', RoleTeamsList.as_view(), name='role_teams_list'),
re_path(r'^(?P<pk>[0-9]+)/parents/$', RoleParentsList.as_view(), name='role_parents_list'),
re_path(r'^(?P<pk>[0-9]+)/children/$', RoleChildrenList.as_view(), name='role_children_list'),
]
__all__ = ['urls']

View File

@@ -720,19 +720,9 @@ class TeamRolesList(SubListAttachDetachAPIView):
team = get_object_or_404(models.Team, pk=self.kwargs['pk'])
credential_content_type = ContentType.objects.get_for_model(models.Credential)
if role.content_type == credential_content_type:
if not role.content_object.organization:
data = dict(
msg=_("You cannot grant access to a credential that is not assigned to an organization (private credentials cannot be assigned to teams)")
)
if not role.content_object.organization or role.content_object.organization.id != team.organization.id:
data = dict(msg=_("You cannot grant credential access to a team when the Organization field isn't set, or belongs to a different organization"))
return Response(data, status=status.HTTP_400_BAD_REQUEST)
elif role.content_object.organization.id != team.organization.id:
if not request.user.is_superuser:
data = dict(
msg=_(
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
)
)
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(TeamRolesList, self).post(request, *args, **kwargs)
@@ -4213,21 +4203,9 @@ class RoleTeamsList(SubListAttachDetachAPIView):
credential_content_type = ContentType.objects.get_for_model(models.Credential)
if role.content_type == credential_content_type:
# Private credentials (no organization) are never allowed for teams
if not role.content_object.organization:
data = dict(
msg=_("You cannot grant access to a credential that is not assigned to an organization (private credentials cannot be assigned to teams)")
)
if not role.content_object.organization or role.content_object.organization.id != team.organization.id:
data = dict(msg=_("You cannot grant credential access to a team when the Organization field isn't set, or belongs to a different organization"))
return Response(data, status=status.HTTP_400_BAD_REQUEST)
# Cross-organization credentials are only allowed for superusers
elif role.content_object.organization.id != team.organization.id:
if not request.user.is_superuser:
data = dict(
msg=_(
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
)
)
return Response(data, status=status.HTTP_400_BAD_REQUEST)
action = 'attach'
if request.data.get('disassociate', None):
@@ -4247,6 +4225,34 @@ class RoleTeamsList(SubListAttachDetachAPIView):
return Response(status=status.HTTP_204_NO_CONTENT)
class RoleParentsList(SubListAPIView):
deprecated = True
model = models.Role
serializer_class = serializers.RoleSerializer
parent_model = models.Role
relationship = 'parents'
permission_classes = (IsAuthenticated,)
search_fields = ('role_field', 'content_type__model')
def get_queryset(self):
role = models.Role.objects.get(pk=self.kwargs['pk'])
return models.Role.filter_visible_roles(self.request.user, role.parents.all())
class RoleChildrenList(SubListAPIView):
deprecated = True
model = models.Role
serializer_class = serializers.RoleSerializer
parent_model = models.Role
relationship = 'children'
permission_classes = (IsAuthenticated,)
search_fields = ('role_field', 'content_type__model')
def get_queryset(self):
role = models.Role.objects.get(pk=self.kwargs['pk'])
return models.Role.filter_visible_roles(self.request.user, role.children.all())
# Create view functions for all of the class-based views to simplify inclusion
# in URL patterns and reverse URL lookups, converting CamelCase names to
# lowercase_with_underscore (e.g. MyView.as_view() becomes my_view).

View File

@@ -180,47 +180,16 @@ class ApiV2SubscriptionView(APIView):
def post(self, request):
data = request.data.copy()
if data.get('subscriptions_client_secret') == '$encrypted$':
data['subscriptions_client_secret'] = settings.SUBSCRIPTIONS_CLIENT_SECRET
try:
user = None
pw = None
basic_auth = False
# determine if the credentials are for basic auth or not
if data.get('subscriptions_client_id'):
user, pw = data.get('subscriptions_client_id'), data.get('subscriptions_client_secret')
if pw == '$encrypted$':
pw = settings.SUBSCRIPTIONS_CLIENT_SECRET
elif data.get('subscriptions_username'):
user, pw = data.get('subscriptions_username'), data.get('subscriptions_password')
if pw == '$encrypted$':
pw = settings.SUBSCRIPTIONS_PASSWORD
basic_auth = True
if not user or not pw:
return Response({"error": _("Missing subscription credentials")}, status=status.HTTP_400_BAD_REQUEST)
user, pw = data.get('subscriptions_client_id'), data.get('subscriptions_client_secret')
with set_environ(**settings.AWX_TASK_ENV):
validated = get_licenser().validate_rh(user, pw, basic_auth)
# update settings if the credentials were valid
if basic_auth:
if user:
settings.SUBSCRIPTIONS_USERNAME = user
if pw:
settings.SUBSCRIPTIONS_PASSWORD = pw
# mutual exclusion for basic auth and service account
# only one should be set at a given time so that
# config/attach/ knows which credentials to use
settings.SUBSCRIPTIONS_CLIENT_ID = ""
settings.SUBSCRIPTIONS_CLIENT_SECRET = ""
else:
if user:
settings.SUBSCRIPTIONS_CLIENT_ID = user
if pw:
settings.SUBSCRIPTIONS_CLIENT_SECRET = pw
# mutual exclusion for basic auth and service account
settings.SUBSCRIPTIONS_USERNAME = ""
settings.SUBSCRIPTIONS_PASSWORD = ""
validated = get_licenser().validate_rh(user, pw)
if user:
settings.SUBSCRIPTIONS_CLIENT_ID = data['subscriptions_client_id']
if pw:
settings.SUBSCRIPTIONS_CLIENT_SECRET = data['subscriptions_client_secret']
except Exception as exc:
msg = _("Invalid Subscription")
if isinstance(exc, TokenError) or (
@@ -256,21 +225,16 @@ class ApiV2AttachView(APIView):
if not subscription_id:
return Response({"error": _("No subscription ID provided.")}, status=status.HTTP_400_BAD_REQUEST)
# Ensure we always use the latest subscription credentials
cache.delete_many(['SUBSCRIPTIONS_CLIENT_ID', 'SUBSCRIPTIONS_CLIENT_SECRET', 'SUBSCRIPTIONS_USERNAME', 'SUBSCRIPTIONS_PASSWORD'])
cache.delete_many(['SUBSCRIPTIONS_CLIENT_ID', 'SUBSCRIPTIONS_CLIENT_SECRET'])
user = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None)
pw = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None)
basic_auth = False
if not (user and pw):
user = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None)
pw = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None)
basic_auth = True
if not (user and pw):
return Response({"error": _("Missing subscription credentials")}, status=status.HTTP_400_BAD_REQUEST)
if subscription_id and user and pw:
data = request.data.copy()
try:
with set_environ(**settings.AWX_TASK_ENV):
validated = get_licenser().validate_rh(user, pw, basic_auth)
validated = get_licenser().validate_rh(user, pw)
except Exception as exc:
msg = _("Invalid Subscription")
if isinstance(exc, requests.exceptions.HTTPError) and getattr(getattr(exc, 'response', None), 'status_code', None) == 401:
@@ -284,7 +248,6 @@ class ApiV2AttachView(APIView):
else:
logger.exception(smart_str(u"Invalid subscription submitted."), extra=dict(actor=request.user.username))
return Response({"error": msg}, status=status.HTTP_400_BAD_REQUEST)
for sub in validated:
if sub['subscription_id'] == subscription_id:
sub['valid_key'] = True

View File

@@ -44,12 +44,11 @@ class MetricsServer(MetricsServerSettings):
class BaseM:
def __init__(self, field, help_text, labels=None):
def __init__(self, field, help_text):
self.field = field
self.help_text = help_text
self.current_value = 0
self.metric_has_changed = False
self.labels = labels or {}
def reset_value(self, conn):
conn.hset(root_key, self.field, 0)
@@ -70,16 +69,12 @@ class BaseM:
value = conn.hget(root_key, self.field)
return self.decode_value(value)
def to_prometheus(self, instance_data, namespace=None):
def to_prometheus(self, instance_data):
output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n"
for instance in instance_data:
if self.field in instance_data[instance]:
# Build label string
labels = f'node="{instance}"'
if namespace:
labels += f',subsystem="{namespace}"'
# on upgrade, if there are stale instances, we can end up with issues where new metrics are not present
output_text += f'{self.field}{{{labels}}} {instance_data[instance][self.field]}\n'
output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n'
return output_text
@@ -172,17 +167,14 @@ class HistogramM(BaseM):
self.sum.store_value(conn)
self.inf.store_value(conn)
def to_prometheus(self, instance_data, namespace=None):
def to_prometheus(self, instance_data):
output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} histogram\n"
for instance in instance_data:
# Build label string
node_label = f'node="{instance}"'
subsystem_label = f',subsystem="{namespace}"' if namespace else ''
for i, b in enumerate(self.buckets):
output_text += f'{self.field}_bucket{{le="{b}",{node_label}{subsystem_label}}} {sum(instance_data[instance][self.field]["counts"][0:i+1])}\n'
output_text += f'{self.field}_bucket{{le="+Inf",{node_label}{subsystem_label}}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_count{{{node_label}{subsystem_label}}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_sum{{{node_label}{subsystem_label}}} {instance_data[instance][self.field]["sum"]}\n'
output_text += f'{self.field}_bucket{{le="{b}",node="{instance}"}} {sum(instance_data[instance][self.field]["counts"][0:i+1])}\n'
output_text += f'{self.field}_bucket{{le="+Inf",node="{instance}"}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_count{{node="{instance}"}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_sum{{node="{instance}"}} {instance_data[instance][self.field]["sum"]}\n'
return output_text
@@ -281,22 +273,20 @@ class Metrics(MetricsNamespace):
def pipe_execute(self):
if self.metrics_have_changed is True:
duration_pipe_exec = time.perf_counter()
duration_to_save = time.perf_counter()
for m in self.METRICS:
self.METRICS[m].store_value(self.pipe)
self.pipe.execute()
self.last_pipe_execute = time.time()
self.metrics_have_changed = False
duration_pipe_exec = time.perf_counter() - duration_pipe_exec
duration_send_metrics = time.perf_counter()
self.send_metrics()
duration_send_metrics = time.perf_counter() - duration_send_metrics
# Increment operational metrics
self.METRICS['subsystem_metrics_pipe_execute_seconds'].inc(duration_pipe_exec)
duration_to_save = time.perf_counter() - duration_to_save
self.METRICS['subsystem_metrics_pipe_execute_seconds'].inc(duration_to_save)
self.METRICS['subsystem_metrics_pipe_execute_calls'].inc(1)
self.METRICS['subsystem_metrics_send_metrics_seconds'].inc(duration_send_metrics)
duration_to_save = time.perf_counter()
self.send_metrics()
duration_to_save = time.perf_counter() - duration_to_save
self.METRICS['subsystem_metrics_send_metrics_seconds'].inc(duration_to_save)
def send_metrics(self):
# more than one thread could be calling this at the same time, so should
@@ -362,13 +352,7 @@ class Metrics(MetricsNamespace):
if instance_data:
for field in self.METRICS:
if len(metrics_filter) == 0 or field in metrics_filter:
# Add subsystem label only for operational metrics
namespace = (
self._namespace
if field in ['subsystem_metrics_pipe_execute_seconds', 'subsystem_metrics_pipe_execute_calls', 'subsystem_metrics_send_metrics_seconds']
else None
)
output_text += self.METRICS[field].to_prometheus(instance_data, namespace)
output_text += self.METRICS[field].to_prometheus(instance_data)
return output_text
@@ -456,10 +440,7 @@ class CustomToPrometheusMetricsCollector(prometheus_client.registry.Collector):
logger.debug(f"No metric data not found in redis for metric namespace '{self._metrics._namespace}'")
return None
if not (host_metrics := instance_data.get(my_hostname)):
logger.debug(f"Metric data for this node '{my_hostname}' not found in redis for metric namespace '{self._metrics._namespace}'")
return None
host_metrics = instance_data.get(my_hostname)
for _, metric in self._metrics.METRICS.items():
entry = host_metrics.get(metric.field)
if not entry:

View File

@@ -144,35 +144,6 @@ register(
category_slug='system',
)
register(
'SUBSCRIPTIONS_USERNAME',
field_class=fields.CharField,
default='',
allow_blank=True,
encrypted=False,
read_only=False,
label=_('Red Hat Username for Subscriptions'),
help_text=_('Username used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
hidden=True,
)
register(
'SUBSCRIPTIONS_PASSWORD',
field_class=fields.CharField,
default='',
allow_blank=True,
encrypted=True,
read_only=False,
label=_('Red Hat Password for Subscriptions'),
help_text=_('Password used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
hidden=True,
)
register(
'SUBSCRIPTIONS_CLIENT_ID',
field_class=fields.CharField,
@@ -184,7 +155,6 @@ register(
help_text=_('Client ID used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
hidden=True,
)
register(
@@ -198,7 +168,6 @@ register(
help_text=_('Client secret used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
hidden=True,
)
register(

View File

@@ -14,14 +14,21 @@ from jinja2.exceptions import UndefinedError, TemplateSyntaxError, SecurityError
# Django
from django.core import exceptions as django_exceptions
from django.core.serializers.json import DjangoJSONEncoder
from django.db.models.signals import m2m_changed, post_save
from django.db.models.signals import (
post_save,
post_delete,
)
from django.db.models.signals import m2m_changed
from django.db import models
from django.db.models.fields.related import lazy_related_operation
from django.db.models.fields.related_descriptors import (
ReverseOneToOneDescriptor,
ForwardManyToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
create_forward_many_to_many_manager,
)
from django.utils.encoding import smart_str
from django.db.models import JSONField
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
@@ -47,6 +54,7 @@ __all__ = [
'ImplicitRoleField',
'SmartFilterField',
'OrderedManyToManyField',
'update_role_parentage_for_instance',
'is_implicit_parent',
]
@@ -138,6 +146,34 @@ class AutoOneToOneField(models.OneToOneField):
setattr(cls, related.get_accessor_name(), AutoSingleRelatedObjectDescriptor(related))
def resolve_role_field(obj, field):
ret = []
field_components = field.split('.', 1)
if hasattr(obj, field_components[0]):
obj = getattr(obj, field_components[0])
else:
return []
if obj is None:
return []
if len(field_components) == 1:
# use extremely generous duck typing to accomidate all possible forms
# of the model that may be used during various migrations
if obj._meta.model_name != 'role' or obj._meta.app_label != 'main':
raise Exception(smart_str('{} refers to a {}, not a Role'.format(field, type(obj))))
ret.append(obj.id)
else:
if type(obj) is ManyToManyDescriptor:
for o in obj.all():
ret += resolve_role_field(o, field_components[1])
else:
ret += resolve_role_field(obj, field_components[1])
return ret
def is_implicit_parent(parent_role, child_role):
"""
Determine if the parent_role is an implicit parent as defined by
@@ -174,6 +210,34 @@ def is_implicit_parent(parent_role, child_role):
return False
def update_role_parentage_for_instance(instance):
"""update_role_parentage_for_instance
updates the parents listing for all the roles
of a given instance if they have changed
"""
parents_removed = set()
parents_added = set()
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name)
original_parents = set(json.loads(cur_role.implicit_parents))
new_parents = implicit_role_field._resolve_parent_roles(instance)
removals = original_parents - new_parents
if removals:
cur_role.parents.remove(*list(removals))
parents_removed.add(cur_role.pk)
additions = new_parents - original_parents
if additions:
cur_role.parents.add(*list(additions))
parents_added.add(cur_role.pk)
new_parents_list = list(new_parents)
new_parents_list.sort()
new_parents_json = json.dumps(new_parents_list)
if cur_role.implicit_parents != new_parents_json:
cur_role.implicit_parents = new_parents_json
cur_role.save(update_fields=['implicit_parents'])
return (parents_added, parents_removed)
class ImplicitRoleDescriptor(ForwardManyToOneDescriptor):
pass
@@ -205,6 +269,65 @@ class ImplicitRoleField(models.ForeignKey):
getattr(cls, '__implicit_role_fields').append(self)
post_save.connect(self._post_save, cls, True, dispatch_uid='implicit-role-post-save')
post_delete.connect(self._post_delete, cls, True, dispatch_uid='implicit-role-post-delete')
function = lambda local, related, field: self.bind_m2m_changed(field, related, local)
lazy_related_operation(function, cls, "self", field=self)
def bind_m2m_changed(self, _self, _role_class, cls):
if not self.parent_role:
return
field_names = self.parent_role
if type(field_names) is not list:
field_names = [field_names]
for field_name in field_names:
if field_name.startswith('singleton:'):
continue
field_name, sep, field_attr = field_name.partition('.')
# Non existent fields will occur if ever a parent model is
# moved inside a migration, needed for job_template_organization_field
# migration in particular
# consistency is assured by unit test awx.main.tests.functional
field = getattr(cls, field_name, None)
if field and type(field) is ReverseManyToOneDescriptor or type(field) is ManyToManyDescriptor:
if '.' in field_attr:
raise Exception('Referencing deep roles through ManyToMany fields is unsupported.')
if type(field) is ReverseManyToOneDescriptor:
sender = field.through
else:
sender = field.related.through
reverse = type(field) is ManyToManyDescriptor
m2m_changed.connect(self.m2m_update(field_attr, reverse), sender, weak=False)
def m2m_update(self, field_attr, _reverse):
def _m2m_update(instance, action, model, pk_set, reverse, **kwargs):
if action == 'post_add' or action == 'pre_remove':
if _reverse:
reverse = not reverse
if reverse:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, field_attr).children.add(getattr(obj, self.name))
if action == 'pre_remove':
getattr(instance, field_attr).children.remove(getattr(obj, self.name))
else:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, self.name).parents.add(getattr(obj, field_attr))
if action == 'pre_remove':
getattr(instance, self.name).parents.remove(getattr(obj, field_attr))
return _m2m_update
def _post_save(self, instance, created, *args, **kwargs):
Role_ = utils.get_current_apps().get_model('main', 'Role')
@@ -214,24 +337,68 @@ class ImplicitRoleField(models.ForeignKey):
Model = utils.get_current_apps().get_model('main', instance.__class__.__name__)
latest_instance = Model.objects.get(pk=instance.pk)
# Create any missing role objects
missing_roles = []
for implicit_role_field in getattr(latest_instance.__class__, '__implicit_role_fields'):
cur_role = getattr(latest_instance, implicit_role_field.name, None)
if cur_role is None:
missing_roles.append(Role_(role_field=implicit_role_field.name, content_type_id=ct_id, object_id=latest_instance.id))
# Avoid circular import
from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
if len(missing_roles) > 0:
Role_.objects.bulk_create(missing_roles)
updates = {}
role_ids = []
for role in Role_.objects.filter(content_type_id=ct_id, object_id=latest_instance.id):
setattr(latest_instance, role.role_field, role)
updates[role.role_field] = role.id
role_ids.append(role.id)
type(latest_instance).objects.filter(pk=latest_instance.pk).update(**updates)
with batch_role_ancestor_rebuilding():
# Create any missing role objects
missing_roles = []
for implicit_role_field in getattr(latest_instance.__class__, '__implicit_role_fields'):
cur_role = getattr(latest_instance, implicit_role_field.name, None)
if cur_role is None:
missing_roles.append(Role_(role_field=implicit_role_field.name, content_type_id=ct_id, object_id=latest_instance.id))
instance.refresh_from_db()
if len(missing_roles) > 0:
Role_.objects.bulk_create(missing_roles)
updates = {}
role_ids = []
for role in Role_.objects.filter(content_type_id=ct_id, object_id=latest_instance.id):
setattr(latest_instance, role.role_field, role)
updates[role.role_field] = role.id
role_ids.append(role.id)
type(latest_instance).objects.filter(pk=latest_instance.pk).update(**updates)
Role.rebuild_role_ancestor_list(role_ids, [])
update_role_parentage_for_instance(latest_instance)
instance.refresh_from_db()
def _resolve_parent_roles(self, instance):
if not self.parent_role:
return set()
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
parent_roles = set()
for path in paths:
if path.startswith("singleton:"):
singleton_name = path[10:]
Role_ = utils.get_current_apps().get_model('main', 'Role')
qs = Role_.objects.filter(singleton_name=singleton_name)
if qs.count() >= 1:
role = qs[0]
else:
role = Role_.objects.create(singleton_name=singleton_name, role_field=singleton_name)
parents = [role.id]
else:
parents = resolve_role_field(instance, path)
for parent in parents:
parent_roles.add(parent)
return parent_roles
def _post_delete(self, instance, *args, **kwargs):
role_ids = []
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
role_ids.append(getattr(instance, implicit_role_field.name + '_id'))
Role_ = utils.get_current_apps().get_model('main', 'Role')
child_ids = [x for x in Role_.parents.through.objects.filter(to_role_id__in=role_ids).distinct().values_list('from_role_id', flat=True)]
Role_.objects.filter(id__in=role_ids).delete()
# Avoid circular import
from awx.main.models.rbac import Role
Role.rebuild_role_ancestor_list([], child_ids)
class SmartFilterField(models.TextField):

View File

@@ -10,11 +10,6 @@ def setup_tower_managed_defaults(apps, schema_editor):
CredentialType.setup_tower_managed_defaults(apps)
def setup_rbac_role_system_administrator(apps, schema_editor):
Role = apps.get_model('main', 'Role')
Role.objects.get_or_create(singleton_name='system_administrator', role_field='system_administrator')
class Migration(migrations.Migration):
dependencies = [
('main', '0200_template_name_constraint'),
@@ -22,5 +17,4 @@ class Migration(migrations.Migration):
operations = [
migrations.RunPython(setup_tower_managed_defaults),
migrations.RunPython(setup_rbac_role_system_administrator),
]

View File

@@ -3,6 +3,7 @@ from time import time
from django.db.models import Subquery, OuterRef, F
from awx.main.fields import update_role_parentage_for_instance
from awx.main.models.rbac import Role, batch_role_ancestor_rebuilding
logger = logging.getLogger('rbac_migrations')
@@ -237,10 +238,85 @@ def restore_inventory_admins_backward(apps, schema_editor):
def rebuild_role_hierarchy(apps, schema_editor):
"""Not used after DAB RBAC migration"""
pass
"""
This should be called in any migration when ownerships are changed.
Ex. I remove a user from the admin_role of a credential.
Ancestors are cached from parents for performance, this re-computes ancestors.
"""
logger.info('Computing role roots..')
start = time()
roots = Role.objects.all().values_list('id', flat=True)
stop = time()
logger.info('Found %d roots in %f seconds, rebuilding ancestry map' % (len(roots), stop - start))
start = time()
Role.rebuild_role_ancestor_list(roots, [])
stop = time()
logger.info('Rebuild ancestors completed in %f seconds' % (stop - start))
logger.info('Done.')
def rebuild_role_parentage(apps, schema_editor, models=None):
"""Not used after DAB RBAC migration"""
pass
"""
This should be called in any migration when any parent_role entry
is modified so that the cached parent fields will be updated. Ex:
foo_role = ImplicitRoleField(
parent_role=['bar_role'] # change to parent_role=['admin_role']
)
This is like rebuild_role_hierarchy, but that method updates ancestors,
whereas this method updates parents.
"""
start = time()
seen_models = set()
model_ct = 0
noop_ct = 0
ContentType = apps.get_model('contenttypes', "ContentType")
additions = set()
removals = set()
role_qs = Role.objects
if models:
# update_role_parentage_for_instance is expensive
# if the models have been downselected, ignore those which are not in the list
ct_ids = list(ContentType.objects.filter(model__in=[name.lower() for name in models]).values_list('id', flat=True))
role_qs = role_qs.filter(content_type__in=ct_ids)
for role in role_qs.iterator():
if not role.object_id:
continue
model_tuple = (role.content_type_id, role.object_id)
if model_tuple in seen_models:
continue
seen_models.add(model_tuple)
# The GenericForeignKey does not work right in migrations
# with the usage as role.content_object
# so we do the lookup ourselves with current migration models
ct = role.content_type
app = ct.app_label
ct_model = apps.get_model(app, ct.model)
content_object = ct_model.objects.get(pk=role.object_id)
parents_added, parents_removed = update_role_parentage_for_instance(content_object)
additions.update(parents_added)
removals.update(parents_removed)
if parents_added:
model_ct += 1
logger.debug('Added to parents of roles {} of {}'.format(parents_added, content_object))
if parents_removed:
model_ct += 1
logger.debug('Removed from parents of roles {} of {}'.format(parents_removed, content_object))
else:
noop_ct += 1
logger.debug('No changes to role parents for {} resources'.format(noop_ct))
logger.debug('Added parents to {} roles'.format(len(additions)))
logger.debug('Removed parents from {} roles'.format(len(removals)))
if model_ct:
logger.info('Updated implicit parents of {} resources'.format(model_ct))
logger.info('Rebuild parentage completed in %f seconds' % (time() - start))
# this is ran because the ordinary signals for
# Role.parents.add and Role.parents.remove not called in migration
Role.rebuild_role_ancestor_list(list(additions), list(removals))

View File

@@ -53,8 +53,8 @@ class GrafanaBackend(AWXBaseEmailBackend, CustomNotificationBase):
):
super(GrafanaBackend, self).__init__(fail_silently=fail_silently)
self.grafana_key = grafana_key
self.dashboardId = int(dashboardId) if dashboardId != '' else None
self.panelId = int(panelId) if panelId != '' else None
self.dashboardId = int(dashboardId) if dashboardId is not None and panelId != "" else None
self.panelId = int(panelId) if panelId is not None and panelId != "" else None
self.annotation_tags = annotation_tags if annotation_tags is not None else []
self.grafana_no_verify_ssl = grafana_no_verify_ssl
self.isRegion = isRegion

View File

@@ -38,6 +38,7 @@ from awx.main.models import (
InventorySource,
Job,
JobHostSummary,
JobTemplate,
Organization,
Project,
Role,
@@ -55,7 +56,10 @@ from awx.main.models import (
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, get_current_apps
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
from awx.main.tasks.system import update_inventory_computed_fields, handle_removed_image
from awx.main.fields import is_implicit_parent
from awx.main.fields import (
is_implicit_parent,
update_role_parentage_for_instance,
)
from awx.main import consumers
@@ -188,6 +192,31 @@ def cleanup_detached_labels_on_deleted_parent(sender, instance, **kwargs):
label.delete()
def save_related_job_templates(sender, instance, **kwargs):
"""save_related_job_templates loops through all of the
job templates that use an Inventory that have had their
Organization updated. This triggers the rebuilding of the RBAC hierarchy
and ensures the proper access restrictions.
"""
if sender is not Inventory:
raise ValueError('This signal callback is only intended for use with Project or Inventory')
update_fields = kwargs.get('update_fields', None)
if (update_fields and not ('organization' in update_fields or 'organization_id' in update_fields)) or kwargs.get('created', False):
return
if instance._prior_values_store.get('organization_id') != instance.organization_id:
jtq = JobTemplate.objects.filter(**{sender.__name__.lower(): instance})
for jt in jtq:
parents_added, parents_removed = update_role_parentage_for_instance(jt)
if parents_added or parents_removed:
logger.info(
'Permissions on JT {} changed due to inventory {} organization change from {} to {}.'.format(
jt.pk, instance.pk, instance._prior_values_store.get('organization_id'), instance.organization_id
)
)
def connect_computed_field_signals():
post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
@@ -201,6 +230,7 @@ def connect_computed_field_signals():
connect_computed_field_signals()
post_save.connect(save_related_job_templates, sender=Inventory)
m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
m2m_changed.connect(rbac_activity_stream, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.parents.through)

View File

@@ -287,72 +287,6 @@ def test_sa_grant_private_credential_to_team_through_role_teams(post, credential
assert response.status_code == 400
@pytest.mark.django_db
def test_grant_credential_to_team_different_organization_through_role_teams(post, get, credential, organizations, admin, org_admin, team, team_member):
# # Test that credential from different org can be assigned to team by a superuser through role_teams_list endpoint
orgs = organizations(2)
credential.organization = orgs[0]
credential.save()
team.organization = orgs[1]
team.save()
# Non-superuser (org_admin) trying cross-org assignment should be denied
response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, org_admin)
assert response.status_code == 400
assert (
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
in response.data['msg']
)
# Superuser (admin) can do cross-org assignment
response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, admin)
assert response.status_code == 204
assert credential.use_role in team.member_role.children.all()
assert team_member in credential.read_role
assert team_member in credential.use_role
assert team_member not in credential.admin_role
@pytest.mark.django_db
def test_grant_credential_to_team_different_organization(post, get, credential, organizations, admin, org_admin, team, team_member):
# Test that credential from different org can be assigned to team by a superuser
orgs = organizations(2)
credential.organization = orgs[0]
credential.save()
team.organization = orgs[1]
team.save()
# Non-superuser (org_admin, ...) trying cross-org assignment should be denied
response = post(reverse('api:team_roles_list', kwargs={'pk': team.id}), {'id': credential.use_role.id}, org_admin)
assert response.status_code == 400
assert (
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
in response.data['msg']
)
# Superuser (system admin) can do cross-org assignment
response = post(reverse('api:team_roles_list', kwargs={'pk': team.id}), {'id': credential.use_role.id}, admin)
assert response.status_code == 204
assert credential.use_role in team.member_role.children.all()
assert team_member in credential.read_role
assert team_member in credential.use_role
assert team_member not in credential.admin_role
# Team member can see the credential in API
response = get(reverse('api:team_credentials_list', kwargs={'pk': team.id}), team_member)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == credential.id
# Team member can see the credential in general credentials API
response = get(reverse('api:credential_list'), team_member)
assert response.status_code == 200
assert any(cred['id'] == credential.id for cred in response.data['results'])
@pytest.mark.django_db
def test_sa_grant_private_credential_to_team_through_team_roles(post, credential, admin, team):
# not even a system admin can grant a private cred to a team though

View File

@@ -1,244 +0,0 @@
from unittest.mock import patch, MagicMock
import pytest
from awx.api.versioning import reverse
from rest_framework import status
@pytest.mark.django_db
class TestApiV2SubscriptionView:
"""Test cases for the /api/v2/config/subscriptions/ endpoint"""
def test_basic_auth(self, post, admin):
"""Test POST with subscriptions_username and subscriptions_password calls validate_rh with basic_auth=True"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_user', 'test_password', True)
def test_service_account(self, post, admin):
"""Test POST with subscriptions_client_id and subscriptions_client_secret calls validate_rh with basic_auth=False"""
data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': 'test_client_secret'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'test_client_secret', False)
def test_encrypted_password_basic_auth(self, post, admin, settings):
"""Test POST with $encrypted$ password uses settings value for basic auth"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': '$encrypted$'}
settings.SUBSCRIPTIONS_PASSWORD = 'actual_password_from_settings'
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_user', 'actual_password_from_settings', True)
def test_encrypted_client_secret_service_account(self, post, admin, settings):
"""Test POST with $encrypted$ client_secret uses settings value for service_account"""
data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': '$encrypted$'}
settings.SUBSCRIPTIONS_CLIENT_SECRET = 'actual_secret_from_settings'
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'actual_secret_from_settings', False)
def test_missing_username_returns_error(self, post, admin):
"""Test POST with missing username returns 400 error"""
data = {'subscriptions_password': 'test_password'}
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_missing_password_returns_error(self, post, admin, settings):
"""Test POST with missing password returns 400 error"""
data = {'subscriptions_username': 'test_user'}
settings.SUBSCRIPTIONS_PASSWORD = None
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_missing_client_id_returns_error(self, post, admin):
"""Test POST with missing client_id returns 400 error"""
data = {'subscriptions_client_secret': 'test_secret'}
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_missing_client_secret_returns_error(self, post, admin, settings):
"""Test POST with missing client_secret returns 400 error"""
data = {'subscriptions_client_id': 'test_client_id'}
settings.SUBSCRIPTIONS_CLIENT_SECRET = None
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_empty_username_returns_error(self, post, admin):
"""Test POST with empty username returns 400 error"""
data = {'subscriptions_username': '', 'subscriptions_password': 'test_password'}
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_empty_password_returns_error(self, post, admin, settings):
"""Test POST with empty password returns 400 error"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': ''}
settings.SUBSCRIPTIONS_PASSWORD = None
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_non_superuser_permission_denied(self, post, rando):
"""Test that non-superuser cannot access the endpoint"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
response = post(reverse('api:api_v2_subscription_view'), data, rando)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_settings_updated_on_successful_basic_auth(self, post, admin, settings):
"""Test that settings are updated when basic auth validation succeeds"""
data = {'subscriptions_username': 'new_username', 'subscriptions_password': 'new_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
assert settings.SUBSCRIPTIONS_USERNAME == 'new_username'
assert settings.SUBSCRIPTIONS_PASSWORD == 'new_password'
def test_settings_updated_on_successful_service_account(self, post, admin, settings):
"""Test that settings are updated when service account validation succeeds"""
data = {'subscriptions_client_id': 'new_client_id', 'subscriptions_client_secret': 'new_client_secret'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
assert settings.SUBSCRIPTIONS_CLIENT_ID == 'new_client_id'
assert settings.SUBSCRIPTIONS_CLIENT_SECRET == 'new_client_secret'
def test_validate_rh_exception_handling(self, post, admin):
"""Test that exceptions from validate_rh are properly handled"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.side_effect = Exception("Connection error")
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_mixed_credentials_prioritizes_client_id(self, post, admin):
"""Test that when both username and client_id are provided, client_id takes precedence"""
data = {
'subscriptions_username': 'test_user',
'subscriptions_password': 'test_password',
'subscriptions_client_id': 'test_client_id',
'subscriptions_client_secret': 'test_client_secret',
}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
# Should use service account (basic_auth=False) since client_id is present
mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'test_client_secret', False)
def test_basic_auth_clears_service_account_settings(self, post, admin, settings):
"""Test that setting basic auth credentials clears service account settings"""
# Pre-populate service account settings
settings.SUBSCRIPTIONS_CLIENT_ID = 'existing_client_id'
settings.SUBSCRIPTIONS_CLIENT_SECRET = 'existing_client_secret'
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
# Basic auth settings should be set
assert settings.SUBSCRIPTIONS_USERNAME == 'test_user'
assert settings.SUBSCRIPTIONS_PASSWORD == 'test_password'
# Service account settings should be cleared
assert settings.SUBSCRIPTIONS_CLIENT_ID == ""
assert settings.SUBSCRIPTIONS_CLIENT_SECRET == ""
def test_service_account_clears_basic_auth_settings(self, post, admin, settings):
"""Test that setting service account credentials clears basic auth settings"""
# Pre-populate basic auth settings
settings.SUBSCRIPTIONS_USERNAME = 'existing_username'
settings.SUBSCRIPTIONS_PASSWORD = 'existing_password'
data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': 'test_client_secret'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
# Service account settings should be set
assert settings.SUBSCRIPTIONS_CLIENT_ID == 'test_client_id'
assert settings.SUBSCRIPTIONS_CLIENT_SECRET == 'test_client_secret'
# Basic auth settings should be cleared
assert settings.SUBSCRIPTIONS_USERNAME == ""
assert settings.SUBSCRIPTIONS_PASSWORD == ""

View File

@@ -387,6 +387,36 @@ def test_remove_team_from_role(post, team, admin, role):
assert role.parents.filter(id=team.member_role.id).count() == 0
#
# /roles/<id>/parents/
#
@pytest.mark.django_db
def test_role_parents(get, team, admin, role):
role.parents.add(team.member_role)
url = reverse('api:role_parents_list', kwargs={'pk': role.id})
response = get(url, admin)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == team.member_role.id
#
# /roles/<id>/children/
#
@pytest.mark.django_db
def test_role_children(get, team, admin, role):
role.parents.add(team.member_role)
url = reverse('api:role_children_list', kwargs={'pk': team.member_role.id})
response = get(url, admin)
assert response.status_code == 200
assert response.data['count'] == 2
assert response.data['results'][0]['id'] == role.id or response.data['results'][1]['id'] == role.id
#
# Generics
#

View File

@@ -167,9 +167,3 @@ class TestMigrationSmoke:
assert CredentialType.objects.filter(
name=expected_name
).exists(), f'Could not find {expected_name} credential type name, all names: {list(CredentialType.objects.values_list("name", flat=True))}'
# Verify the system_administrator role exists
Role = new_state.apps.get_model('main', 'Role')
assert Role.objects.filter(
singleton_name='system_administrator', role_field='system_administrator'
).exists(), "expected to find a system_administrator singleton role"

View File

@@ -13,7 +13,7 @@ def test_send_messages():
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='')
backend = grafana_backend.GrafanaBackend("testapikey")
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -43,7 +43,7 @@ def test_send_messages_with_no_verify_ssl():
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='', grafana_no_verify_ssl=True)
backend = grafana_backend.GrafanaBackend("testapikey", grafana_no_verify_ssl=True)
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -74,7 +74,7 @@ def test_send_messages_with_dashboardid(dashboardId):
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=dashboardId, panelId='')
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=dashboardId)
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -97,7 +97,7 @@ def test_send_messages_with_dashboardid(dashboardId):
assert sent_messages == 1
@pytest.mark.parametrize("panelId", ['42', '0'])
@pytest.mark.parametrize("panelId", [42, 0])
def test_send_messages_with_panelid(panelId):
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
@@ -105,7 +105,7 @@ def test_send_messages_with_panelid(panelId):
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId=panelId)
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=None, panelId=panelId)
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -122,7 +122,7 @@ def test_send_messages_with_panelid(panelId):
requests_mock.post.assert_called_once_with(
'https://example.com/api/annotations',
headers={'Content-Type': 'application/json', 'Authorization': 'Bearer testapikey'},
json={'text': 'test subject', 'isRegion': True, 'timeEnd': 120000, 'panelId': int(panelId), 'time': 60000},
json={'text': 'test subject', 'isRegion': True, 'timeEnd': 120000, 'panelId': panelId, 'time': 60000},
verify=True,
)
assert sent_messages == 1
@@ -135,7 +135,7 @@ def test_send_messages_with_bothids():
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='42', panelId='42')
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=42, panelId=42)
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -158,36 +158,6 @@ def test_send_messages_with_bothids():
assert sent_messages == 1
def test_send_messages_with_emptyids():
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
m = {}
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='')
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
[],
[
'https://example.com',
],
)
sent_messages = backend.send_messages(
[
message,
]
)
requests_mock.post.assert_called_once_with(
'https://example.com/api/annotations',
headers={'Content-Type': 'application/json', 'Authorization': 'Bearer testapikey'},
json={'text': 'test subject', 'isRegion': True, 'timeEnd': 120000, 'time': 60000},
verify=True,
)
assert sent_messages == 1
def test_send_messages_with_tags():
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
@@ -195,7 +165,7 @@ def test_send_messages_with_tags():
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='', annotation_tags=["ansible"])
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=None, panelId=None, annotation_tags=["ansible"])
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},

View File

@@ -0,0 +1,37 @@
import json
from http import HTTPStatus
from unittest.mock import patch
from requests import Response
from awx.main.utils.licensing import Licenser
def test_rhsm_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'body': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('awx.main.utils.analytics_proxy.OIDCClient.make_request', new=mocked_requests_get):
subs = licenser.get_rhsm_subs('localhost', 'admin', 'admin')
assert subs == []
def test_satellite_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'results': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('requests.get', new=mocked_requests_get):
subs = licenser.get_satellite_subs('localhost', 'admin', 'admin')
assert subs == []

View File

@@ -1,154 +0,0 @@
from unittest.mock import patch
from awx.main.utils.licensing import Licenser
def test_validate_rh_basic_auth_rhsm():
"""
Assert get_rhsm_subs is called when
- basic_auth=True
- host is subscription.rhsm.redhat.com
"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com') as mock_get_host, patch.object(
licenser, 'get_rhsm_subs', return_value=[]
) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs') as mock_get_satellite, patch.object(
licenser, 'get_crc_subs'
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
licenser.validate_rh('testuser', 'testpass', basic_auth=True)
# Assert the correct methods were called
mock_get_host.assert_called_once()
mock_get_rhsm.assert_called_once_with('https://subscription.rhsm.redhat.com', 'testuser', 'testpass')
mock_get_satellite.assert_not_called()
mock_get_crc.assert_not_called()
mock_generate.assert_called_once_with([], is_candlepin=True)
def test_validate_rh_basic_auth_satellite():
"""
Assert get_satellite_subs is called when
- basic_auth=True
- custom satellite host
"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://satellite.example.com') as mock_get_host, patch.object(
licenser, 'get_rhsm_subs'
) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs', return_value=[]) as mock_get_satellite, patch.object(
licenser, 'get_crc_subs'
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
licenser.validate_rh('testuser', 'testpass', basic_auth=True)
# Assert the correct methods were called
mock_get_host.assert_called_once()
mock_get_rhsm.assert_not_called()
mock_get_satellite.assert_called_once_with('https://satellite.example.com', 'testuser', 'testpass')
mock_get_crc.assert_not_called()
mock_generate.assert_called_once_with([], is_candlepin=True)
def test_validate_rh_service_account_crc():
"""
Assert get_crc_subs is called when
- basic_auth=False
"""
licenser = Licenser()
with patch('awx.main.utils.licensing.settings') as mock_settings, patch.object(licenser, 'get_host_from_rhsm_config') as mock_get_host, patch.object(
licenser, 'get_rhsm_subs'
) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs') as mock_get_satellite, patch.object(
licenser, 'get_crc_subs', return_value=[]
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
mock_settings.SUBSCRIPTIONS_RHSM_URL = 'https://console.redhat.com/api/rhsm/v1/subscriptions'
licenser.validate_rh('client_id', 'client_secret', basic_auth=False)
# Assert the correct methods were called
mock_get_host.assert_not_called()
mock_get_rhsm.assert_not_called()
mock_get_satellite.assert_not_called()
mock_get_crc.assert_called_once_with('https://console.redhat.com/api/rhsm/v1/subscriptions', 'client_id', 'client_secret')
mock_generate.assert_called_once_with([], is_candlepin=False)
def test_validate_rh_missing_user_raises_error():
"""Test validate_rh raises ValueError when user is missing"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'):
try:
licenser.validate_rh(None, 'testpass', basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_id or subscriptions_username is required' in str(e)
def test_validate_rh_missing_password_raises_error():
"""Test validate_rh raises ValueError when password is missing"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'):
try:
licenser.validate_rh('testuser', None, basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_secret or subscriptions_password is required' in str(e)
def test_validate_rh_no_host_fallback_to_candlepin():
"""Test validate_rh falls back to REDHAT_CANDLEPIN_HOST when no host from config
- basic_auth=True
- no host from config
- REDHAT_CANDLEPIN_HOST is set
"""
licenser = Licenser()
with patch('awx.main.utils.licensing.settings') as mock_settings, patch.object(
licenser, 'get_host_from_rhsm_config', return_value=None
) as mock_get_host, patch.object(licenser, 'get_rhsm_subs', return_value=[]) as mock_get_rhsm, patch.object(
licenser, 'get_satellite_subs', return_value=[]
) as mock_get_satellite, patch.object(
licenser, 'get_crc_subs'
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
mock_settings.REDHAT_CANDLEPIN_HOST = 'https://candlepin.example.com'
licenser.validate_rh('testuser', 'testpass', basic_auth=True)
# Assert the correct methods were called
mock_get_host.assert_called_once()
mock_get_rhsm.assert_not_called()
mock_get_satellite.assert_called_once_with('https://candlepin.example.com', 'testuser', 'testpass')
mock_get_crc.assert_not_called()
mock_generate.assert_called_once_with([], is_candlepin=True)
def test_validate_rh_empty_credentials_basic_auth():
"""Test validate_rh with empty string credentials raises ValueError"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'):
# Test empty user
try:
licenser.validate_rh(None, 'testpass', basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_id or subscriptions_username is required' in str(e)
# Test empty password
try:
licenser.validate_rh('testuser', None, basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_secret or subscriptions_password is required' in str(e)

View File

@@ -219,65 +219,30 @@ class Licenser(object):
kwargs['license_date'] = int(kwargs['license_date'])
self._attrs.update(kwargs)
def get_host_from_rhsm_config(self):
def validate_rh(self, user, pw):
try:
host = 'https://' + str(self.config.get("server", "hostname"))
except Exception:
logger.exception('Cannot access rhsm.conf, make sure subscription manager is installed and configured.')
host = None
return host
def validate_rh(self, user, pw, basic_auth):
# if basic auth is True, host is read from rhsm.conf (subscription.rhsm.redhat.com)
# if basic auth is False, host is settings.SUBSCRIPTIONS_RHSM_URL (console.redhat.com)
# if rhsm.conf is not found, host is settings.REDHAT_CANDLEPIN_HOST (satellite server)
if basic_auth:
host = self.get_host_from_rhsm_config()
if not host:
host = getattr(settings, 'REDHAT_CANDLEPIN_HOST', None)
else:
host = settings.SUBSCRIPTIONS_RHSM_URL
if not host:
raise ValueError('Could not get host url for subscriptions')
host = getattr(settings, 'REDHAT_CANDLEPIN_HOST', None)
if not user:
raise ValueError('subscriptions_client_id or subscriptions_username is required')
raise ValueError('subscriptions_client_id is required')
if not pw:
raise ValueError('subscriptions_client_secret or subscriptions_password is required')
raise ValueError('subscriptions_client_secret is required')
if host and user and pw:
if basic_auth:
if 'subscription.rhsm.redhat.com' in host:
json = self.get_rhsm_subs(host, user, pw)
else:
json = self.get_satellite_subs(host, user, pw)
if 'subscription.rhsm.redhat.com' in host:
json = self.get_rhsm_subs(settings.SUBSCRIPTIONS_RHSM_URL, user, pw)
else:
json = self.get_crc_subs(host, user, pw)
return self.generate_license_options_from_entitlements(json, is_candlepin=basic_auth)
json = self.get_satellite_subs(host, user, pw)
return self.generate_license_options_from_entitlements(json)
return []
def get_rhsm_subs(self, host, user, pw):
verify = getattr(settings, 'REDHAT_CANDLEPIN_VERIFY', True)
json = []
try:
subs = requests.get('/'.join([host, 'subscription/users/{}/owners'.format(user)]), verify=verify, auth=(user, pw))
except requests.exceptions.ConnectionError as error:
raise error
except OSError as error:
raise OSError(
'Unable to open certificate bundle {}. Check that the service is running on Red Hat Enterprise Linux.'.format(verify)
) from error # noqa
subs.raise_for_status()
for sub in subs.json():
resp = requests.get('/'.join([host, 'subscription/owners/{}/pools/?match=*tower*'.format(sub['key'])]), verify=verify, auth=(user, pw))
resp.raise_for_status()
json.extend(resp.json())
return json
def get_crc_subs(self, host, client_id, client_secret):
def get_rhsm_subs(self, host, client_id, client_secret):
try:
client = OIDCClient(client_id, client_secret)
subs = client.make_request(
@@ -355,21 +320,12 @@ class Licenser(object):
json.append(license)
return json
def is_appropriate_sub(self, sub):
if sub['activeSubscription'] is False:
return False
# Products that contain Ansible Tower
products = sub.get('providedProducts', [])
if any(product.get('productId') == '480' for product in products):
return True
return False
def is_appropriate_sat_sub(self, sub):
if 'Red Hat Ansible Automation' not in sub['subscription_name']:
return False
return True
def generate_license_options_from_entitlements(self, json, is_candlepin=False):
def generate_license_options_from_entitlements(self, json):
from dateutil.parser import parse
ValidSub = collections.namedtuple(
@@ -380,14 +336,12 @@ class Licenser(object):
satellite = sub.get('satellite')
if satellite:
is_valid = self.is_appropriate_sat_sub(sub)
elif is_candlepin:
is_valid = self.is_appropriate_sub(sub)
else:
# the list of subs from console.redhat.com and subscriptions.rhsm.redhat.com are already valid based on the query params we provided
# the list of subs from console.redhat.com are already valid based on the query params we provided
is_valid = True
if is_valid:
try:
if is_candlepin:
if satellite:
end_date = parse(sub.get('endDate'))
else:
end_date = parse(sub['subscriptions']['endDate'])
@@ -400,10 +354,10 @@ class Licenser(object):
continue
developer_license = False
support_level = sub.get('support_level', '')
support_level = ''
account_number = ''
usage = sub.get('usage', '')
if is_candlepin:
if satellite:
try:
quantity = int(sub['quantity'])
except Exception:
@@ -411,6 +365,7 @@ class Licenser(object):
sku = sub['productId']
subscription_id = sub['subscriptionId']
sub_name = sub['productName']
support_level = sub['support_level']
account_number = sub['accountNumber']
else:
try:
@@ -479,8 +434,6 @@ class Licenser(object):
license.update(subscription_id=sub.subscription_id)
license.update(account_number=sub.account_number)
licenses.append(license._attrs.copy())
# sort by sku
licenses.sort(key=lambda x: x['sku'])
return licenses
raise ValueError('No valid Red Hat Ansible Automation subscription could be found for this account.') # noqa

View File

@@ -19,27 +19,18 @@ short_description: Get subscription list
description:
- Get subscriptions available to Automation Platform Controller. See
U(https://www.ansible.com/tower) for an overview.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
options:
username:
description:
- Red Hat username to get available subscriptions.
required: False
type: str
password:
description:
- Red Hat password to get available subscriptions.
required: False
type: str
client_id:
description:
- Red Hat service account client ID to get available subscriptions.
required: False
- Red Hat service account client ID or Red Hat Satellite username to get available subscriptions.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
required: True
type: str
client_secret:
description:
- Red Hat service account client secret to get available subscriptions.
required: False
- Red Hat service account client secret or Red Hat Satellite password to get available subscriptions.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
required: True
type: str
filters:
description:
@@ -81,41 +72,19 @@ def main():
module = ControllerAPIModule(
argument_spec=dict(
username=dict(type='str', required=False),
password=dict(type='str', no_log=True, required=False),
client_id=dict(type='str', required=False),
client_secret=dict(type='str', no_log=True, required=False),
client_id=dict(type='str', required=True),
client_secret=dict(type='str', no_log=True, required=True),
filters=dict(type='dict', required=False, default={}),
),
mutually_exclusive=[
['username', 'client_id']
],
required_together=[
['username', 'password'],
['client_id', 'client_secret']
],
required_one_of=[
['username', 'client_id']
],
)
json_output = {'changed': False}
username = module.params.get('username')
password = module.params.get('password')
client_id = module.params.get('client_id')
client_secret = module.params.get('client_secret')
if username and password:
post_data = {
'subscriptions_username': username,
'subscriptions_password': password,
}
else:
post_data = {
'subscriptions_client_id': client_id,
'subscriptions_client_secret': client_secret,
}
# Check if Tower is already licensed
post_data = {
'subscriptions_client_secret': module.params.get('client_secret'),
'subscriptions_client_id': module.params.get('client_id'),
}
all_subscriptions = module.post_endpoint('config/subscriptions', data=post_data)['json']
json_output['subscriptions'] = []
for subscription in all_subscriptions:

View File

@@ -82,38 +82,7 @@ class CLI(object):
return '--help' in self.argv or '-h' in self.argv
def authenticate(self):
"""Configure the current session for authentication.
Uses Basic authentication when AWXKIT_FORCE_BASIC_AUTH environment variable
is set to true, otherwise defaults to session-based authentication.
For AAP Gateway environments, set AWXKIT_FORCE_BASIC_AUTH=true to bypass
session login restrictions.
"""
# Check if Basic auth is forced via environment variable
if config.get('force_basic_auth', False):
config.use_sessions = False
# Validate credentials are provided
username = self.get_config('username')
password = self.get_config('password')
if not username or not password:
raise ValueError(
"Basic authentication requires both username and password. "
"Provide --conf.username and --conf.password or set "
"CONTROLLER_USERNAME and CONTROLLER_PASSWORD environment variables."
)
# Apply Basic auth credentials to the session
try:
self.root.connection.login(username, password)
self.root.get()
except Exception as e:
raise RuntimeError(f"Basic authentication failed: {str(e)}. " "Verify credentials and network connectivity.") from e
return
# Use session-based authentication (default)
"""Configure the current session for basic auth"""
config.use_sessions = True
self.root.load_session().get()

View File

@@ -32,7 +32,6 @@ config.assume_untrusted = config.get('assume_untrusted', True)
config.client_connection_attempts = int(os.getenv('AWXKIT_CLIENT_CONNECTION_ATTEMPTS', 5))
config.prevent_teardown = to_bool(os.getenv('AWXKIT_PREVENT_TEARDOWN', False))
config.use_sessions = to_bool(os.getenv('AWXKIT_SESSIONS', False))
config.force_basic_auth = to_bool(os.getenv('AWXKIT_FORCE_BASIC_AUTH', False))
config.api_base_path = os.getenv('CONTROLLER_OPTIONAL_API_URLPATTERN_PREFIX', '/api/')
config.api_base_path = os.getenv('AWXKIT_API_BASE_PATH', config.api_base_path)
config.gateway_base_path = os.getenv('AWXKIT_GATEWAY_BASE_PATH', '/api/gateway/')

View File

@@ -1,103 +0,0 @@
import pytest
from typing import Tuple, List, Optional
from unittest.mock import Mock
from awxkit.cli import CLI
from awxkit import config
@pytest.fixture(autouse=True)
def reset_config_state(monkeypatch: pytest.MonkeyPatch) -> None:
"""Ensure clean config state for each test to prevent parallel test interference"""
monkeypatch.setattr(config, 'force_basic_auth', False, raising=False)
monkeypatch.setattr(config, 'use_sessions', False, raising=False)
# ============================================================================
# Test Helper Functions
# ============================================================================
def setup_basic_auth(cli_args: Optional[List[str]] = None) -> Tuple[CLI, Mock, Mock]:
"""Set up CLI with mocked connection for Basic auth testing"""
cli = CLI()
cli.parse_args(cli_args or ['awx', '--conf.username', 'testuser', '--conf.password', 'testpass'])
mock_root = Mock()
mock_connection = Mock()
mock_root.connection = mock_connection
cli.root = mock_root
return cli, mock_root, mock_connection
def setup_session_auth(cli_args: Optional[List[str]] = None) -> Tuple[CLI, Mock, Mock]:
"""Set up CLI with mocked session for Session auth testing"""
cli = CLI()
cli.parse_args(cli_args or ['awx', '--conf.username', 'testuser', '--conf.password', 'testpass'])
mock_root = Mock()
mock_load_session = Mock()
mock_root.load_session.return_value = mock_load_session
cli.root = mock_root
return cli, mock_root, mock_load_session
def test_basic_auth_enabled(monkeypatch):
"""Test that AWXKIT_FORCE_BASIC_AUTH=true enables Basic authentication"""
cli, mock_root, mock_connection = setup_basic_auth()
monkeypatch.setattr(config, 'force_basic_auth', True)
cli.authenticate()
mock_connection.login.assert_called_once_with('testuser', 'testpass')
mock_root.get.assert_called_once()
assert not config.use_sessions
def test_session_auth_default(monkeypatch):
"""Test that session auth is used by default (backward compatibility)"""
cli, mock_root, mock_load_session = setup_session_auth()
monkeypatch.setattr(config, 'force_basic_auth', False)
cli.authenticate()
mock_root.load_session.assert_called_once()
mock_load_session.get.assert_called_once()
assert config.use_sessions
def test_aap_gateway_scenario(monkeypatch):
"""Test the specific AAP Gateway scenario from AAP-46830"""
cli, mock_root, mock_connection = setup_basic_auth(
['awx', '--conf.host', 'https://aap-sbx.cambiahealth.com', '--conf.username', 'puretest', '--conf.password', 'testpass']
)
monkeypatch.setattr(config, 'force_basic_auth', True)
cli.authenticate()
mock_connection.login.assert_called_once_with('puretest', 'testpass')
mock_root.get.assert_called_once()
assert not config.use_sessions
def test_empty_credentials_error(monkeypatch):
"""Test error handling for explicitly empty credentials"""
cli, mock_root, mock_connection = setup_basic_auth(['awx', '--conf.username', '', '--conf.password', ''])
monkeypatch.setattr(config, 'force_basic_auth', True)
with pytest.raises(ValueError, match="Basic authentication requires both username and password"):
cli.authenticate()
mock_connection.login.assert_not_called()
def test_connection_failure(monkeypatch):
"""Test error handling when Basic auth connection fails"""
cli, mock_root, mock_connection = setup_basic_auth()
mock_connection.login.side_effect = Exception("Connection failed")
monkeypatch.setattr(config, 'force_basic_auth', True)
with pytest.raises(RuntimeError, match="Basic authentication failed: Connection failed"):
cli.authenticate()
mock_connection.login.assert_called_once_with('testuser', 'testpass')
assert not config.use_sessions

View File

@@ -29,3 +29,4 @@ debugpy
remote-pdb
sdb
jupyterlab>=4.4.8 # not directly required, pinned by Snyk to avoid a vulnerability

View File

@@ -1,5 +1,5 @@
certifi @ git+https://github.com/ansible/system-certifi.git@devel#egg=certifi
ansible-runner @ git+https://github.com/ansible/ansible-runner.git@devel#egg=ansible-runner
git+https://github.com/ansible/system-certifi.git@devel#egg=certifi
git+https://github.com/ansible/ansible-runner.git@devel#egg=ansible-runner
awx-plugins-core @ git+https://github.com/ansible/awx-plugins.git@devel#egg=awx-plugins-core[credentials-github-app]
django-ansible-base @ git+https://github.com/ansible/django-ansible-base@devel#egg=django-ansible-base[rest-filters,jwt_consumer,resource-registry,rbac,feature-flags]
awx_plugins.interfaces @ git+https://github.com/ansible/awx_plugins.interfaces.git

View File

@@ -1,141 +0,0 @@
# SonarCloud project configuration for AWX
# Complete documentation: https://docs.sonarqube.org/latest/analysis/analysis-parameters/
# =============================================================================
# PROJECT IDENTIFICATION (REQUIRED)
# =============================================================================
# The unique project identifier. This is mandatory.
# Do not duplicate or reuse!
# Available characters: [a-zA-Z0-9_:\.\-]
# Must have least one non-digit.
sonar.projectKey=ansible_awx
sonar.organization=ansible
# Project metadata
sonar.projectName=awx
# =============================================================================
# SOURCE AND TEST CONFIGURATION
# =============================================================================
# Source directories to analyze
sonar.sources=.
sonar.inclusions=awx/**
# Test directories
sonar.tests=awx/main/tests
# Test file patterns
sonar.test.inclusions=\
**/test_*.py,\
**/*_test.py,\
**/tests/**/*.py
# Set branch-specific new code definition
#
# This is important to always check against the main branch for new PRs,
# otherwise the PR may fail during backporting, since the old version of the code
# may not respect the minimum requirements for the existing Quality Gate.
sonar.newCode.referenceBranch=devel
# =============================================================================
# LANGUAGE CONFIGURATION
# =============================================================================
# Python versions supported by the project
#sonar.python.version=3.9,3.10,3.11
# File encoding
sonar.sourceEncoding=UTF-8
# =============================================================================
# REPORTS AND COVERAGE
# =============================================================================
# Test and coverage reports (paths relative to project root)
sonar.python.coverage.reportPaths=reports/coverage.xml
sonar.python.xunit.reportPath=/reports/junit.xml
# External tool reports (add these paths when tools are configured)
# sonar.python.pylint.reportPaths=reports/pylint-report.txt
# sonar.python.bandit.reportPaths=reports/bandit-report.json
# sonar.python.mypy.reportPath=reports/mypy-report.txt
# sonar.python.flake8.reportPaths=reports/flake8-report.txt
# sonar.python.xunit.reportPath=reports/junit.xml
# =============================================================================
# EXCLUSIONS - FILES AND DIRECTORIES TO IGNORE
# =============================================================================
# General exclusions - files and directories to ignore from analysis
sonar.exclusions=\
**/tests/**,\
**/__pycache__/**,\
**/*.pyc,\
**/*.pyo,\
**/*.pyd,\
**/build/**,\
**/dist/**,\
**/*.egg-info/**
# =============================================================================
# COVERAGE EXCLUSIONS
# =============================================================================
# Files to exclude from coverage calculations
sonar.coverage.exclusions=\
**/tests/**,\
**/.tox/**,\
**/test_*.py,\
**/*_test.py,\
**/conftest.py,\
**/migrations/**,\
**/settings*.py,\
**/defaults.py,\
**/manage.py,\
**/__main__.py,\
tools/scripts/**
# =============================================================================
# DUPLICATION EXCLUSIONS
# =============================================================================
# Ignore code duplication in migrations and tests
sonar.cpd.exclusions=\
**/migrations/**,\
**/tests/**
# =============================================================================
# ISSUE IGNORE RULES
# =============================================================================
# Ignore specific rules for certain file patterns
sonar.issue.ignore.multicriteria=e1
# Ignore "should be a variable" in migrations
sonar.issue.ignore.multicriteria.e1.ruleKey=python:S1192
sonar.issue.ignore.multicriteria.e1.resourceKey=**/migrations/**/*
# =============================================================================
# GITHUB INTEGRATION
# =============================================================================
# The following properties are automatically handled by GitHub Actions:
# sonar.pullrequest.key - handled automatically
# sonar.pullrequest.branch - handled automatically
# sonar.pullrequest.base - handled automatically
# =============================================================================
# DEBUGGING
# =============================================================================
# These are aggressive settings to ensure maximum detection
# do not use in production
# sonar.verbose=true
# sonar.log.level=DEBUG
# sonar.scm.exclusions.disabled=true
# sonar.java.skipUnchanged=false
# sonar.scm.forceReloadAll=true
# sonar.filesize.limit=100
# sonar.qualitygate.wait=true