Compare commits

...

14 Commits

Author SHA1 Message Date
Jake Jackson
c8174a843b Update requirements_git.txt to use new format 2025-10-14 11:16:32 -04:00
Jake Jackson
72a42f23d5 Remove 'UI' from PR template component options (#16123)
Removed 'UI' from the list of component names in the PR template.
2025-10-13 10:23:56 -04:00
Jake Jackson
309724b12b Add SonarQube Coverage and report generation (#16112)
* added sonar config file and started cleaning up
* we do not place the report at the root of the repo
* limit scope to only the awx directory and its contents
* update exclusions for things in awx/ that we don't want covered
2025-10-10 10:44:35 -04:00
Seth Foster
300605ff73 Make subscriptions credentials mutually exclusive (#16126)
settings.SUBSCRIPTIONS_USERNAME and
settings.SUBSCRIPTIONS_CLIENT_ID

should be mutually exclusive. This is because
the POST to api/v2/config/attach/ accepts only
a subscription_id, and infers which credentials to
use based on settings. If both are set, it is ambiguous
and can lead to unexpected 400s when attempting
to attach a license.

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
2025-10-09 16:57:58 -04:00
Jake Jackson
77fab1c534 Update dependabot to check python deps (#16127)
* add list item to check python deps daily
* move to weekly after we gain confidence
2025-10-09 19:40:34 +00:00
Chris Meyers
51b2524b25 Gracefully handle hostname change in metrics code
* Previously, we would error out because we assumed that when we got a
  metrics payload from redis, that there was data in it and it was for
  the current host.
* Now, we do not assume that since we got a metrics payload, that is
  well formed and for the current hostname because the hostname could
  have changed and we could have not yet collected metrics for the new
  host.
2025-10-09 14:08:01 -04:00
Chris Coutinho
612e8e7688 Fix duplicate metrics in AWX subsystem_metrics (#15964)
Separate out operation subsystem metrics to fix duplicate error

Remove unnecessary comments

Revert to single subsystem_metrics_* metric with labels

Format via black
2025-10-09 10:28:55 +02:00
Andrea Restle-Lay
0d18308112 [AAP-46830]: Fix AWX CLI authentication with AAP Gateway environments (#16113)
* migrate pr #16081 to new fork

* add test coverage - add clearer error messaging

* update tests to use monkeypatch
2025-10-02 10:06:10 -04:00
Chris Meyers
f51af03424 Create system_administrator rbac role in migration
* We had race conditions with the system_administrator role being
  created just-in-time. Instead of fixing the race condition(s), dodge
  them by ensuring the role always exists
2025-10-02 08:25:40 -04:00
Alan Rominger
622f6ea166 AAP-53980 Disconnect logic to fill in role parents (#15462)
* Disconnect logic to fill in role parents

Get tests passing hopefully

Whatever SonarCloud

* remove role parents/children endpoints and related views

* remove duplicate get_queryset method from RoleTeamsList

---------

Co-authored-by: Peter Braun <pbraun@redhat.com>
2025-10-02 13:06:37 +02:00
Seth Foster
2729076f7f Add basic auth to subscription management API (#16103)
Allow users to do subscription management using
Red Hat username and password.

In basic auth case, the candlepin API
at subscriptions.rhsm.redhat.com will be used instead
of console.redhat.com.

Signed-off-by: Seth Foster <fosterbseth@gmail.com>
2025-10-02 01:06:47 -04:00
Alan Rominger
6db08bfa4e Rewrite the s3 upload step to fix breakage with new Ansible version (#16111)
* Rewrite the s3 upload step to fix breakage with new Ansible version

* Use commit hash for security

* Add the public read flag
2025-09-30 15:47:54 -04:00
Stevenson Michel
ceed41d352 Sharing Credentials Across Organizations (#16106)
* Added tests for cross org sharing of credentials

* added negative testing for sharing of credentials

* added conditions and tests for roleteamslist regarding cross org credentials

* removed redundant codes

* made error message more articulated and specific
2025-09-30 10:44:27 -04:00
jessicamack
98697a8ce7 Fix Grafana notification bug (#16104)
* accept empty string for dashboard and panel IDs

* update grafana tests and add new one
2025-09-29 10:04:58 -04:00
28 changed files with 1165 additions and 474 deletions

View File

@@ -17,7 +17,6 @@ in as the first entry for your PR title.
##### COMPONENT NAME
<!--- Name of the module/plugin/module/task -->
- API
- UI
- Collection
- CLI
- Docs

View File

@@ -8,3 +8,10 @@ updates:
labels:
- "docs"
- "dependencies"
- package-ecosystem: "pip"
directory: "requirements/"
schedule:
interval: "daily" #run daily until we trust it, then back this off to weekly
open-pull-requests-limit: 2
labels:
- "dependencies"

85
.github/workflows/sonarcloud_pr.yml vendored Normal file
View File

@@ -0,0 +1,85 @@
---
name: SonarQube
on:
workflow_run:
workflows:
- CI
types:
- completed
permissions: read-all
jobs:
sonarqube:
runs-on: ubuntu-latest
if: github.event.workflow_run.conclusion == 'success' && github.event.workflow_run.event == 'pull_request'
steps:
- name: Checkout Code
uses: actions/checkout@v4
with:
fetch-depth: 0
show-progress: false
- name: Download coverage report artifact
uses: actions/download-artifact@v4
with:
name: coverage-report
path: reports/
github-token: ${{ secrets.GITHUB_TOKEN }}
run-id: ${{ github.event.workflow_run.id }}
- name: Download PR number artifact
uses: actions/download-artifact@v4
with:
name: pr-number
path: .
github-token: ${{ secrets.GITHUB_TOKEN }}
run-id: ${{ github.event.workflow_run.id }}
- name: Extract PR number
run: |
cat pr-number.txt
echo "PR_NUMBER=$(cat pr-number.txt)" >> $GITHUB_ENV
- name: Get PR info
uses: octokit/request-action@v2.x
id: pr_info
with:
route: GET /repos/{repo}/pulls/{number}
repo: ${{ github.event.repository.full_name }}
number: ${{ env.PR_NUMBER }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Set PR info into env
run: |
echo "PR_BASE=${{ fromJson(steps.pr_info.outputs.data).base.ref }}" >> $GITHUB_ENV
echo "PR_HEAD=${{ fromJson(steps.pr_info.outputs.data).head.ref }}" >> $GITHUB_ENV
- name: Add base branch
run: |
gh pr checkout ${{ env.PR_NUMBER }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Extract and export repo owner/name
run: |
REPO_SLUG="${GITHUB_REPOSITORY}"
IFS="/" read -r REPO_OWNER REPO_NAME <<< "$REPO_SLUG"
echo "REPO_OWNER=$REPO_OWNER" >> $GITHUB_ENV
echo "REPO_NAME=$REPO_NAME" >> $GITHUB_ENV
- name: SonarQube scan
uses: SonarSource/sonarqube-scan-action@v5
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
SONAR_TOKEN: ${{ secrets[format('{0}', vars.SONAR_TOKEN_SECRET_NAME)] }}
with:
args: >
-Dsonar.organization=${{ env.REPO_OWNER }}
-Dsonar.projectKey=${{ env.REPO_OWNER }}_${{ env.REPO_NAME }}
-Dsonar.pullrequest.key=${{ env.PR_NUMBER }}
-Dsonar.pullrequest.branch=${{ env.PR_HEAD }}
-Dsonar.pullrequest.base=${{ env.PR_BASE }}
-Dsonar.scm.revision=${{ github.event.workflow_run.head_sha }}

View File

@@ -38,11 +38,12 @@ jobs:
--workdir=/awx_devel `make print-DEVEL_IMAGE_NAME` /start_tests.sh genschema
- name: Upload API Schema
env:
AWS_ACCESS_KEY: ${{ secrets.AWS_ACCESS_KEY }}
AWS_SECRET_KEY: ${{ secrets.AWS_SECRET_KEY }}
AWS_REGION: 'us-east-1'
run: |
ansible localhost -c local, -m command -a "{{ ansible_python_interpreter + ' -m pip install boto3'}}"
ansible localhost -c local -m aws_s3 \
-a "src=${{ github.workspace }}/schema.json bucket=awx-public-ci-files object=${GITHUB_REF##*/}/schema.json mode=put permission=public-read"
uses: keithweaver/aws-s3-github-action@4dd5a7b81d54abaa23bbac92b27e85d7f405ae53
with:
command: cp
source: ${{ github.workspace }}/schema.json
destination: s3://awx-public-ci-files/${{ github.ref_name }}/schema.json
aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY }}
aws_secret_access_key: ${{ secrets.AWS_SECRET_KEY }}
aws_region: us-east-1
flags: --acl public-read --only-show-errors

View File

@@ -3,7 +3,7 @@
from django.urls import re_path
from awx.api.views import RoleList, RoleDetail, RoleUsersList, RoleTeamsList, RoleParentsList, RoleChildrenList
from awx.api.views import RoleList, RoleDetail, RoleUsersList, RoleTeamsList
urls = [
@@ -11,8 +11,6 @@ urls = [
re_path(r'^(?P<pk>[0-9]+)/$', RoleDetail.as_view(), name='role_detail'),
re_path(r'^(?P<pk>[0-9]+)/users/$', RoleUsersList.as_view(), name='role_users_list'),
re_path(r'^(?P<pk>[0-9]+)/teams/$', RoleTeamsList.as_view(), name='role_teams_list'),
re_path(r'^(?P<pk>[0-9]+)/parents/$', RoleParentsList.as_view(), name='role_parents_list'),
re_path(r'^(?P<pk>[0-9]+)/children/$', RoleChildrenList.as_view(), name='role_children_list'),
]
__all__ = ['urls']

View File

@@ -720,9 +720,19 @@ class TeamRolesList(SubListAttachDetachAPIView):
team = get_object_or_404(models.Team, pk=self.kwargs['pk'])
credential_content_type = ContentType.objects.get_for_model(models.Credential)
if role.content_type == credential_content_type:
if not role.content_object.organization or role.content_object.organization.id != team.organization.id:
data = dict(msg=_("You cannot grant credential access to a team when the Organization field isn't set, or belongs to a different organization"))
if not role.content_object.organization:
data = dict(
msg=_("You cannot grant access to a credential that is not assigned to an organization (private credentials cannot be assigned to teams)")
)
return Response(data, status=status.HTTP_400_BAD_REQUEST)
elif role.content_object.organization.id != team.organization.id:
if not request.user.is_superuser:
data = dict(
msg=_(
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
)
)
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(TeamRolesList, self).post(request, *args, **kwargs)
@@ -4203,9 +4213,21 @@ class RoleTeamsList(SubListAttachDetachAPIView):
credential_content_type = ContentType.objects.get_for_model(models.Credential)
if role.content_type == credential_content_type:
if not role.content_object.organization or role.content_object.organization.id != team.organization.id:
data = dict(msg=_("You cannot grant credential access to a team when the Organization field isn't set, or belongs to a different organization"))
# Private credentials (no organization) are never allowed for teams
if not role.content_object.organization:
data = dict(
msg=_("You cannot grant access to a credential that is not assigned to an organization (private credentials cannot be assigned to teams)")
)
return Response(data, status=status.HTTP_400_BAD_REQUEST)
# Cross-organization credentials are only allowed for superusers
elif role.content_object.organization.id != team.organization.id:
if not request.user.is_superuser:
data = dict(
msg=_(
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
)
)
return Response(data, status=status.HTTP_400_BAD_REQUEST)
action = 'attach'
if request.data.get('disassociate', None):
@@ -4225,34 +4247,6 @@ class RoleTeamsList(SubListAttachDetachAPIView):
return Response(status=status.HTTP_204_NO_CONTENT)
class RoleParentsList(SubListAPIView):
deprecated = True
model = models.Role
serializer_class = serializers.RoleSerializer
parent_model = models.Role
relationship = 'parents'
permission_classes = (IsAuthenticated,)
search_fields = ('role_field', 'content_type__model')
def get_queryset(self):
role = models.Role.objects.get(pk=self.kwargs['pk'])
return models.Role.filter_visible_roles(self.request.user, role.parents.all())
class RoleChildrenList(SubListAPIView):
deprecated = True
model = models.Role
serializer_class = serializers.RoleSerializer
parent_model = models.Role
relationship = 'children'
permission_classes = (IsAuthenticated,)
search_fields = ('role_field', 'content_type__model')
def get_queryset(self):
role = models.Role.objects.get(pk=self.kwargs['pk'])
return models.Role.filter_visible_roles(self.request.user, role.children.all())
# Create view functions for all of the class-based views to simplify inclusion
# in URL patterns and reverse URL lookups, converting CamelCase names to
# lowercase_with_underscore (e.g. MyView.as_view() becomes my_view).

View File

@@ -180,16 +180,47 @@ class ApiV2SubscriptionView(APIView):
def post(self, request):
data = request.data.copy()
if data.get('subscriptions_client_secret') == '$encrypted$':
data['subscriptions_client_secret'] = settings.SUBSCRIPTIONS_CLIENT_SECRET
try:
user, pw = data.get('subscriptions_client_id'), data.get('subscriptions_client_secret')
user = None
pw = None
basic_auth = False
# determine if the credentials are for basic auth or not
if data.get('subscriptions_client_id'):
user, pw = data.get('subscriptions_client_id'), data.get('subscriptions_client_secret')
if pw == '$encrypted$':
pw = settings.SUBSCRIPTIONS_CLIENT_SECRET
elif data.get('subscriptions_username'):
user, pw = data.get('subscriptions_username'), data.get('subscriptions_password')
if pw == '$encrypted$':
pw = settings.SUBSCRIPTIONS_PASSWORD
basic_auth = True
if not user or not pw:
return Response({"error": _("Missing subscription credentials")}, status=status.HTTP_400_BAD_REQUEST)
with set_environ(**settings.AWX_TASK_ENV):
validated = get_licenser().validate_rh(user, pw)
if user:
settings.SUBSCRIPTIONS_CLIENT_ID = data['subscriptions_client_id']
if pw:
settings.SUBSCRIPTIONS_CLIENT_SECRET = data['subscriptions_client_secret']
validated = get_licenser().validate_rh(user, pw, basic_auth)
# update settings if the credentials were valid
if basic_auth:
if user:
settings.SUBSCRIPTIONS_USERNAME = user
if pw:
settings.SUBSCRIPTIONS_PASSWORD = pw
# mutual exclusion for basic auth and service account
# only one should be set at a given time so that
# config/attach/ knows which credentials to use
settings.SUBSCRIPTIONS_CLIENT_ID = ""
settings.SUBSCRIPTIONS_CLIENT_SECRET = ""
else:
if user:
settings.SUBSCRIPTIONS_CLIENT_ID = user
if pw:
settings.SUBSCRIPTIONS_CLIENT_SECRET = pw
# mutual exclusion for basic auth and service account
settings.SUBSCRIPTIONS_USERNAME = ""
settings.SUBSCRIPTIONS_PASSWORD = ""
except Exception as exc:
msg = _("Invalid Subscription")
if isinstance(exc, TokenError) or (
@@ -225,16 +256,21 @@ class ApiV2AttachView(APIView):
if not subscription_id:
return Response({"error": _("No subscription ID provided.")}, status=status.HTTP_400_BAD_REQUEST)
# Ensure we always use the latest subscription credentials
cache.delete_many(['SUBSCRIPTIONS_CLIENT_ID', 'SUBSCRIPTIONS_CLIENT_SECRET'])
cache.delete_many(['SUBSCRIPTIONS_CLIENT_ID', 'SUBSCRIPTIONS_CLIENT_SECRET', 'SUBSCRIPTIONS_USERNAME', 'SUBSCRIPTIONS_PASSWORD'])
user = getattr(settings, 'SUBSCRIPTIONS_CLIENT_ID', None)
pw = getattr(settings, 'SUBSCRIPTIONS_CLIENT_SECRET', None)
basic_auth = False
if not (user and pw):
user = getattr(settings, 'SUBSCRIPTIONS_USERNAME', None)
pw = getattr(settings, 'SUBSCRIPTIONS_PASSWORD', None)
basic_auth = True
if not (user and pw):
return Response({"error": _("Missing subscription credentials")}, status=status.HTTP_400_BAD_REQUEST)
if subscription_id and user and pw:
data = request.data.copy()
try:
with set_environ(**settings.AWX_TASK_ENV):
validated = get_licenser().validate_rh(user, pw)
validated = get_licenser().validate_rh(user, pw, basic_auth)
except Exception as exc:
msg = _("Invalid Subscription")
if isinstance(exc, requests.exceptions.HTTPError) and getattr(getattr(exc, 'response', None), 'status_code', None) == 401:
@@ -248,6 +284,7 @@ class ApiV2AttachView(APIView):
else:
logger.exception(smart_str(u"Invalid subscription submitted."), extra=dict(actor=request.user.username))
return Response({"error": msg}, status=status.HTTP_400_BAD_REQUEST)
for sub in validated:
if sub['subscription_id'] == subscription_id:
sub['valid_key'] = True

View File

@@ -44,11 +44,12 @@ class MetricsServer(MetricsServerSettings):
class BaseM:
def __init__(self, field, help_text):
def __init__(self, field, help_text, labels=None):
self.field = field
self.help_text = help_text
self.current_value = 0
self.metric_has_changed = False
self.labels = labels or {}
def reset_value(self, conn):
conn.hset(root_key, self.field, 0)
@@ -69,12 +70,16 @@ class BaseM:
value = conn.hget(root_key, self.field)
return self.decode_value(value)
def to_prometheus(self, instance_data):
def to_prometheus(self, instance_data, namespace=None):
output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} gauge\n"
for instance in instance_data:
if self.field in instance_data[instance]:
# Build label string
labels = f'node="{instance}"'
if namespace:
labels += f',subsystem="{namespace}"'
# on upgrade, if there are stale instances, we can end up with issues where new metrics are not present
output_text += f'{self.field}{{node="{instance}"}} {instance_data[instance][self.field]}\n'
output_text += f'{self.field}{{{labels}}} {instance_data[instance][self.field]}\n'
return output_text
@@ -167,14 +172,17 @@ class HistogramM(BaseM):
self.sum.store_value(conn)
self.inf.store_value(conn)
def to_prometheus(self, instance_data):
def to_prometheus(self, instance_data, namespace=None):
output_text = f"# HELP {self.field} {self.help_text}\n# TYPE {self.field} histogram\n"
for instance in instance_data:
# Build label string
node_label = f'node="{instance}"'
subsystem_label = f',subsystem="{namespace}"' if namespace else ''
for i, b in enumerate(self.buckets):
output_text += f'{self.field}_bucket{{le="{b}",node="{instance}"}} {sum(instance_data[instance][self.field]["counts"][0:i+1])}\n'
output_text += f'{self.field}_bucket{{le="+Inf",node="{instance}"}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_count{{node="{instance}"}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_sum{{node="{instance}"}} {instance_data[instance][self.field]["sum"]}\n'
output_text += f'{self.field}_bucket{{le="{b}",{node_label}{subsystem_label}}} {sum(instance_data[instance][self.field]["counts"][0:i+1])}\n'
output_text += f'{self.field}_bucket{{le="+Inf",{node_label}{subsystem_label}}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_count{{{node_label}{subsystem_label}}} {instance_data[instance][self.field]["inf"]}\n'
output_text += f'{self.field}_sum{{{node_label}{subsystem_label}}} {instance_data[instance][self.field]["sum"]}\n'
return output_text
@@ -273,20 +281,22 @@ class Metrics(MetricsNamespace):
def pipe_execute(self):
if self.metrics_have_changed is True:
duration_to_save = time.perf_counter()
duration_pipe_exec = time.perf_counter()
for m in self.METRICS:
self.METRICS[m].store_value(self.pipe)
self.pipe.execute()
self.last_pipe_execute = time.time()
self.metrics_have_changed = False
duration_to_save = time.perf_counter() - duration_to_save
self.METRICS['subsystem_metrics_pipe_execute_seconds'].inc(duration_to_save)
self.METRICS['subsystem_metrics_pipe_execute_calls'].inc(1)
duration_pipe_exec = time.perf_counter() - duration_pipe_exec
duration_to_save = time.perf_counter()
duration_send_metrics = time.perf_counter()
self.send_metrics()
duration_to_save = time.perf_counter() - duration_to_save
self.METRICS['subsystem_metrics_send_metrics_seconds'].inc(duration_to_save)
duration_send_metrics = time.perf_counter() - duration_send_metrics
# Increment operational metrics
self.METRICS['subsystem_metrics_pipe_execute_seconds'].inc(duration_pipe_exec)
self.METRICS['subsystem_metrics_pipe_execute_calls'].inc(1)
self.METRICS['subsystem_metrics_send_metrics_seconds'].inc(duration_send_metrics)
def send_metrics(self):
# more than one thread could be calling this at the same time, so should
@@ -352,7 +362,13 @@ class Metrics(MetricsNamespace):
if instance_data:
for field in self.METRICS:
if len(metrics_filter) == 0 or field in metrics_filter:
output_text += self.METRICS[field].to_prometheus(instance_data)
# Add subsystem label only for operational metrics
namespace = (
self._namespace
if field in ['subsystem_metrics_pipe_execute_seconds', 'subsystem_metrics_pipe_execute_calls', 'subsystem_metrics_send_metrics_seconds']
else None
)
output_text += self.METRICS[field].to_prometheus(instance_data, namespace)
return output_text
@@ -440,7 +456,10 @@ class CustomToPrometheusMetricsCollector(prometheus_client.registry.Collector):
logger.debug(f"No metric data not found in redis for metric namespace '{self._metrics._namespace}'")
return None
host_metrics = instance_data.get(my_hostname)
if not (host_metrics := instance_data.get(my_hostname)):
logger.debug(f"Metric data for this node '{my_hostname}' not found in redis for metric namespace '{self._metrics._namespace}'")
return None
for _, metric in self._metrics.METRICS.items():
entry = host_metrics.get(metric.field)
if not entry:

View File

@@ -144,6 +144,35 @@ register(
category_slug='system',
)
register(
'SUBSCRIPTIONS_USERNAME',
field_class=fields.CharField,
default='',
allow_blank=True,
encrypted=False,
read_only=False,
label=_('Red Hat Username for Subscriptions'),
help_text=_('Username used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
hidden=True,
)
register(
'SUBSCRIPTIONS_PASSWORD',
field_class=fields.CharField,
default='',
allow_blank=True,
encrypted=True,
read_only=False,
label=_('Red Hat Password for Subscriptions'),
help_text=_('Password used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
hidden=True,
)
register(
'SUBSCRIPTIONS_CLIENT_ID',
field_class=fields.CharField,
@@ -155,6 +184,7 @@ register(
help_text=_('Client ID used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
hidden=True,
)
register(
@@ -168,6 +198,7 @@ register(
help_text=_('Client secret used to retrieve subscription and content information'), # noqa
category=_('System'),
category_slug='system',
hidden=True,
)
register(

View File

@@ -14,21 +14,14 @@ from jinja2.exceptions import UndefinedError, TemplateSyntaxError, SecurityError
# Django
from django.core import exceptions as django_exceptions
from django.core.serializers.json import DjangoJSONEncoder
from django.db.models.signals import (
post_save,
post_delete,
)
from django.db.models.signals import m2m_changed
from django.db.models.signals import m2m_changed, post_save
from django.db import models
from django.db.models.fields.related import lazy_related_operation
from django.db.models.fields.related_descriptors import (
ReverseOneToOneDescriptor,
ForwardManyToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
create_forward_many_to_many_manager,
)
from django.utils.encoding import smart_str
from django.db.models import JSONField
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _
@@ -54,7 +47,6 @@ __all__ = [
'ImplicitRoleField',
'SmartFilterField',
'OrderedManyToManyField',
'update_role_parentage_for_instance',
'is_implicit_parent',
]
@@ -146,34 +138,6 @@ class AutoOneToOneField(models.OneToOneField):
setattr(cls, related.get_accessor_name(), AutoSingleRelatedObjectDescriptor(related))
def resolve_role_field(obj, field):
ret = []
field_components = field.split('.', 1)
if hasattr(obj, field_components[0]):
obj = getattr(obj, field_components[0])
else:
return []
if obj is None:
return []
if len(field_components) == 1:
# use extremely generous duck typing to accomidate all possible forms
# of the model that may be used during various migrations
if obj._meta.model_name != 'role' or obj._meta.app_label != 'main':
raise Exception(smart_str('{} refers to a {}, not a Role'.format(field, type(obj))))
ret.append(obj.id)
else:
if type(obj) is ManyToManyDescriptor:
for o in obj.all():
ret += resolve_role_field(o, field_components[1])
else:
ret += resolve_role_field(obj, field_components[1])
return ret
def is_implicit_parent(parent_role, child_role):
"""
Determine if the parent_role is an implicit parent as defined by
@@ -210,34 +174,6 @@ def is_implicit_parent(parent_role, child_role):
return False
def update_role_parentage_for_instance(instance):
"""update_role_parentage_for_instance
updates the parents listing for all the roles
of a given instance if they have changed
"""
parents_removed = set()
parents_added = set()
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
cur_role = getattr(instance, implicit_role_field.name)
original_parents = set(json.loads(cur_role.implicit_parents))
new_parents = implicit_role_field._resolve_parent_roles(instance)
removals = original_parents - new_parents
if removals:
cur_role.parents.remove(*list(removals))
parents_removed.add(cur_role.pk)
additions = new_parents - original_parents
if additions:
cur_role.parents.add(*list(additions))
parents_added.add(cur_role.pk)
new_parents_list = list(new_parents)
new_parents_list.sort()
new_parents_json = json.dumps(new_parents_list)
if cur_role.implicit_parents != new_parents_json:
cur_role.implicit_parents = new_parents_json
cur_role.save(update_fields=['implicit_parents'])
return (parents_added, parents_removed)
class ImplicitRoleDescriptor(ForwardManyToOneDescriptor):
pass
@@ -269,65 +205,6 @@ class ImplicitRoleField(models.ForeignKey):
getattr(cls, '__implicit_role_fields').append(self)
post_save.connect(self._post_save, cls, True, dispatch_uid='implicit-role-post-save')
post_delete.connect(self._post_delete, cls, True, dispatch_uid='implicit-role-post-delete')
function = lambda local, related, field: self.bind_m2m_changed(field, related, local)
lazy_related_operation(function, cls, "self", field=self)
def bind_m2m_changed(self, _self, _role_class, cls):
if not self.parent_role:
return
field_names = self.parent_role
if type(field_names) is not list:
field_names = [field_names]
for field_name in field_names:
if field_name.startswith('singleton:'):
continue
field_name, sep, field_attr = field_name.partition('.')
# Non existent fields will occur if ever a parent model is
# moved inside a migration, needed for job_template_organization_field
# migration in particular
# consistency is assured by unit test awx.main.tests.functional
field = getattr(cls, field_name, None)
if field and type(field) is ReverseManyToOneDescriptor or type(field) is ManyToManyDescriptor:
if '.' in field_attr:
raise Exception('Referencing deep roles through ManyToMany fields is unsupported.')
if type(field) is ReverseManyToOneDescriptor:
sender = field.through
else:
sender = field.related.through
reverse = type(field) is ManyToManyDescriptor
m2m_changed.connect(self.m2m_update(field_attr, reverse), sender, weak=False)
def m2m_update(self, field_attr, _reverse):
def _m2m_update(instance, action, model, pk_set, reverse, **kwargs):
if action == 'post_add' or action == 'pre_remove':
if _reverse:
reverse = not reverse
if reverse:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, field_attr).children.add(getattr(obj, self.name))
if action == 'pre_remove':
getattr(instance, field_attr).children.remove(getattr(obj, self.name))
else:
for pk in pk_set:
obj = model.objects.get(pk=pk)
if action == 'post_add':
getattr(instance, self.name).parents.add(getattr(obj, field_attr))
if action == 'pre_remove':
getattr(instance, self.name).parents.remove(getattr(obj, field_attr))
return _m2m_update
def _post_save(self, instance, created, *args, **kwargs):
Role_ = utils.get_current_apps().get_model('main', 'Role')
@@ -337,68 +214,24 @@ class ImplicitRoleField(models.ForeignKey):
Model = utils.get_current_apps().get_model('main', instance.__class__.__name__)
latest_instance = Model.objects.get(pk=instance.pk)
# Avoid circular import
from awx.main.models.rbac import batch_role_ancestor_rebuilding, Role
# Create any missing role objects
missing_roles = []
for implicit_role_field in getattr(latest_instance.__class__, '__implicit_role_fields'):
cur_role = getattr(latest_instance, implicit_role_field.name, None)
if cur_role is None:
missing_roles.append(Role_(role_field=implicit_role_field.name, content_type_id=ct_id, object_id=latest_instance.id))
with batch_role_ancestor_rebuilding():
# Create any missing role objects
missing_roles = []
for implicit_role_field in getattr(latest_instance.__class__, '__implicit_role_fields'):
cur_role = getattr(latest_instance, implicit_role_field.name, None)
if cur_role is None:
missing_roles.append(Role_(role_field=implicit_role_field.name, content_type_id=ct_id, object_id=latest_instance.id))
if len(missing_roles) > 0:
Role_.objects.bulk_create(missing_roles)
updates = {}
role_ids = []
for role in Role_.objects.filter(content_type_id=ct_id, object_id=latest_instance.id):
setattr(latest_instance, role.role_field, role)
updates[role.role_field] = role.id
role_ids.append(role.id)
type(latest_instance).objects.filter(pk=latest_instance.pk).update(**updates)
if len(missing_roles) > 0:
Role_.objects.bulk_create(missing_roles)
updates = {}
role_ids = []
for role in Role_.objects.filter(content_type_id=ct_id, object_id=latest_instance.id):
setattr(latest_instance, role.role_field, role)
updates[role.role_field] = role.id
role_ids.append(role.id)
type(latest_instance).objects.filter(pk=latest_instance.pk).update(**updates)
Role.rebuild_role_ancestor_list(role_ids, [])
update_role_parentage_for_instance(latest_instance)
instance.refresh_from_db()
def _resolve_parent_roles(self, instance):
if not self.parent_role:
return set()
paths = self.parent_role if type(self.parent_role) is list else [self.parent_role]
parent_roles = set()
for path in paths:
if path.startswith("singleton:"):
singleton_name = path[10:]
Role_ = utils.get_current_apps().get_model('main', 'Role')
qs = Role_.objects.filter(singleton_name=singleton_name)
if qs.count() >= 1:
role = qs[0]
else:
role = Role_.objects.create(singleton_name=singleton_name, role_field=singleton_name)
parents = [role.id]
else:
parents = resolve_role_field(instance, path)
for parent in parents:
parent_roles.add(parent)
return parent_roles
def _post_delete(self, instance, *args, **kwargs):
role_ids = []
for implicit_role_field in getattr(instance.__class__, '__implicit_role_fields'):
role_ids.append(getattr(instance, implicit_role_field.name + '_id'))
Role_ = utils.get_current_apps().get_model('main', 'Role')
child_ids = [x for x in Role_.parents.through.objects.filter(to_role_id__in=role_ids).distinct().values_list('from_role_id', flat=True)]
Role_.objects.filter(id__in=role_ids).delete()
# Avoid circular import
from awx.main.models.rbac import Role
Role.rebuild_role_ancestor_list([], child_ids)
instance.refresh_from_db()
class SmartFilterField(models.TextField):

View File

@@ -10,6 +10,11 @@ def setup_tower_managed_defaults(apps, schema_editor):
CredentialType.setup_tower_managed_defaults(apps)
def setup_rbac_role_system_administrator(apps, schema_editor):
Role = apps.get_model('main', 'Role')
Role.objects.get_or_create(singleton_name='system_administrator', role_field='system_administrator')
class Migration(migrations.Migration):
dependencies = [
('main', '0200_template_name_constraint'),
@@ -17,4 +22,5 @@ class Migration(migrations.Migration):
operations = [
migrations.RunPython(setup_tower_managed_defaults),
migrations.RunPython(setup_rbac_role_system_administrator),
]

View File

@@ -3,7 +3,6 @@ from time import time
from django.db.models import Subquery, OuterRef, F
from awx.main.fields import update_role_parentage_for_instance
from awx.main.models.rbac import Role, batch_role_ancestor_rebuilding
logger = logging.getLogger('rbac_migrations')
@@ -238,85 +237,10 @@ def restore_inventory_admins_backward(apps, schema_editor):
def rebuild_role_hierarchy(apps, schema_editor):
"""
This should be called in any migration when ownerships are changed.
Ex. I remove a user from the admin_role of a credential.
Ancestors are cached from parents for performance, this re-computes ancestors.
"""
logger.info('Computing role roots..')
start = time()
roots = Role.objects.all().values_list('id', flat=True)
stop = time()
logger.info('Found %d roots in %f seconds, rebuilding ancestry map' % (len(roots), stop - start))
start = time()
Role.rebuild_role_ancestor_list(roots, [])
stop = time()
logger.info('Rebuild ancestors completed in %f seconds' % (stop - start))
logger.info('Done.')
"""Not used after DAB RBAC migration"""
pass
def rebuild_role_parentage(apps, schema_editor, models=None):
"""
This should be called in any migration when any parent_role entry
is modified so that the cached parent fields will be updated. Ex:
foo_role = ImplicitRoleField(
parent_role=['bar_role'] # change to parent_role=['admin_role']
)
This is like rebuild_role_hierarchy, but that method updates ancestors,
whereas this method updates parents.
"""
start = time()
seen_models = set()
model_ct = 0
noop_ct = 0
ContentType = apps.get_model('contenttypes', "ContentType")
additions = set()
removals = set()
role_qs = Role.objects
if models:
# update_role_parentage_for_instance is expensive
# if the models have been downselected, ignore those which are not in the list
ct_ids = list(ContentType.objects.filter(model__in=[name.lower() for name in models]).values_list('id', flat=True))
role_qs = role_qs.filter(content_type__in=ct_ids)
for role in role_qs.iterator():
if not role.object_id:
continue
model_tuple = (role.content_type_id, role.object_id)
if model_tuple in seen_models:
continue
seen_models.add(model_tuple)
# The GenericForeignKey does not work right in migrations
# with the usage as role.content_object
# so we do the lookup ourselves with current migration models
ct = role.content_type
app = ct.app_label
ct_model = apps.get_model(app, ct.model)
content_object = ct_model.objects.get(pk=role.object_id)
parents_added, parents_removed = update_role_parentage_for_instance(content_object)
additions.update(parents_added)
removals.update(parents_removed)
if parents_added:
model_ct += 1
logger.debug('Added to parents of roles {} of {}'.format(parents_added, content_object))
if parents_removed:
model_ct += 1
logger.debug('Removed from parents of roles {} of {}'.format(parents_removed, content_object))
else:
noop_ct += 1
logger.debug('No changes to role parents for {} resources'.format(noop_ct))
logger.debug('Added parents to {} roles'.format(len(additions)))
logger.debug('Removed parents from {} roles'.format(len(removals)))
if model_ct:
logger.info('Updated implicit parents of {} resources'.format(model_ct))
logger.info('Rebuild parentage completed in %f seconds' % (time() - start))
# this is ran because the ordinary signals for
# Role.parents.add and Role.parents.remove not called in migration
Role.rebuild_role_ancestor_list(list(additions), list(removals))
"""Not used after DAB RBAC migration"""
pass

View File

@@ -53,8 +53,8 @@ class GrafanaBackend(AWXBaseEmailBackend, CustomNotificationBase):
):
super(GrafanaBackend, self).__init__(fail_silently=fail_silently)
self.grafana_key = grafana_key
self.dashboardId = int(dashboardId) if dashboardId is not None and panelId != "" else None
self.panelId = int(panelId) if panelId is not None and panelId != "" else None
self.dashboardId = int(dashboardId) if dashboardId != '' else None
self.panelId = int(panelId) if panelId != '' else None
self.annotation_tags = annotation_tags if annotation_tags is not None else []
self.grafana_no_verify_ssl = grafana_no_verify_ssl
self.isRegion = isRegion

View File

@@ -38,7 +38,6 @@ from awx.main.models import (
InventorySource,
Job,
JobHostSummary,
JobTemplate,
Organization,
Project,
Role,
@@ -56,10 +55,7 @@ from awx.main.models import (
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, get_current_apps
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
from awx.main.tasks.system import update_inventory_computed_fields, handle_removed_image
from awx.main.fields import (
is_implicit_parent,
update_role_parentage_for_instance,
)
from awx.main.fields import is_implicit_parent
from awx.main import consumers
@@ -192,31 +188,6 @@ def cleanup_detached_labels_on_deleted_parent(sender, instance, **kwargs):
label.delete()
def save_related_job_templates(sender, instance, **kwargs):
"""save_related_job_templates loops through all of the
job templates that use an Inventory that have had their
Organization updated. This triggers the rebuilding of the RBAC hierarchy
and ensures the proper access restrictions.
"""
if sender is not Inventory:
raise ValueError('This signal callback is only intended for use with Project or Inventory')
update_fields = kwargs.get('update_fields', None)
if (update_fields and not ('organization' in update_fields or 'organization_id' in update_fields)) or kwargs.get('created', False):
return
if instance._prior_values_store.get('organization_id') != instance.organization_id:
jtq = JobTemplate.objects.filter(**{sender.__name__.lower(): instance})
for jt in jtq:
parents_added, parents_removed = update_role_parentage_for_instance(jt)
if parents_added or parents_removed:
logger.info(
'Permissions on JT {} changed due to inventory {} organization change from {} to {}.'.format(
jt.pk, instance.pk, instance._prior_values_store.get('organization_id'), instance.organization_id
)
)
def connect_computed_field_signals():
post_save.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
post_delete.connect(emit_update_inventory_on_created_or_deleted, sender=Host)
@@ -230,7 +201,6 @@ def connect_computed_field_signals():
connect_computed_field_signals()
post_save.connect(save_related_job_templates, sender=Inventory)
m2m_changed.connect(rebuild_role_ancestor_list, Role.parents.through)
m2m_changed.connect(rbac_activity_stream, Role.members.through)
m2m_changed.connect(rbac_activity_stream, Role.parents.through)

View File

@@ -287,6 +287,72 @@ def test_sa_grant_private_credential_to_team_through_role_teams(post, credential
assert response.status_code == 400
@pytest.mark.django_db
def test_grant_credential_to_team_different_organization_through_role_teams(post, get, credential, organizations, admin, org_admin, team, team_member):
# # Test that credential from different org can be assigned to team by a superuser through role_teams_list endpoint
orgs = organizations(2)
credential.organization = orgs[0]
credential.save()
team.organization = orgs[1]
team.save()
# Non-superuser (org_admin) trying cross-org assignment should be denied
response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, org_admin)
assert response.status_code == 400
assert (
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
in response.data['msg']
)
# Superuser (admin) can do cross-org assignment
response = post(reverse('api:role_teams_list', kwargs={'pk': credential.use_role.id}), {'id': team.id}, admin)
assert response.status_code == 204
assert credential.use_role in team.member_role.children.all()
assert team_member in credential.read_role
assert team_member in credential.use_role
assert team_member not in credential.admin_role
@pytest.mark.django_db
def test_grant_credential_to_team_different_organization(post, get, credential, organizations, admin, org_admin, team, team_member):
# Test that credential from different org can be assigned to team by a superuser
orgs = organizations(2)
credential.organization = orgs[0]
credential.save()
team.organization = orgs[1]
team.save()
# Non-superuser (org_admin, ...) trying cross-org assignment should be denied
response = post(reverse('api:team_roles_list', kwargs={'pk': team.id}), {'id': credential.use_role.id}, org_admin)
assert response.status_code == 400
assert (
"You cannot grant a team access to a credential in a different organization. Only superusers can grant cross-organization credential access to teams"
in response.data['msg']
)
# Superuser (system admin) can do cross-org assignment
response = post(reverse('api:team_roles_list', kwargs={'pk': team.id}), {'id': credential.use_role.id}, admin)
assert response.status_code == 204
assert credential.use_role in team.member_role.children.all()
assert team_member in credential.read_role
assert team_member in credential.use_role
assert team_member not in credential.admin_role
# Team member can see the credential in API
response = get(reverse('api:team_credentials_list', kwargs={'pk': team.id}), team_member)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == credential.id
# Team member can see the credential in general credentials API
response = get(reverse('api:credential_list'), team_member)
assert response.status_code == 200
assert any(cred['id'] == credential.id for cred in response.data['results'])
@pytest.mark.django_db
def test_sa_grant_private_credential_to_team_through_team_roles(post, credential, admin, team):
# not even a system admin can grant a private cred to a team though

View File

@@ -0,0 +1,244 @@
from unittest.mock import patch, MagicMock
import pytest
from awx.api.versioning import reverse
from rest_framework import status
@pytest.mark.django_db
class TestApiV2SubscriptionView:
"""Test cases for the /api/v2/config/subscriptions/ endpoint"""
def test_basic_auth(self, post, admin):
"""Test POST with subscriptions_username and subscriptions_password calls validate_rh with basic_auth=True"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_user', 'test_password', True)
def test_service_account(self, post, admin):
"""Test POST with subscriptions_client_id and subscriptions_client_secret calls validate_rh with basic_auth=False"""
data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': 'test_client_secret'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'test_client_secret', False)
def test_encrypted_password_basic_auth(self, post, admin, settings):
"""Test POST with $encrypted$ password uses settings value for basic auth"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': '$encrypted$'}
settings.SUBSCRIPTIONS_PASSWORD = 'actual_password_from_settings'
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_user', 'actual_password_from_settings', True)
def test_encrypted_client_secret_service_account(self, post, admin, settings):
"""Test POST with $encrypted$ client_secret uses settings value for service_account"""
data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': '$encrypted$'}
settings.SUBSCRIPTIONS_CLIENT_SECRET = 'actual_secret_from_settings'
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'actual_secret_from_settings', False)
def test_missing_username_returns_error(self, post, admin):
"""Test POST with missing username returns 400 error"""
data = {'subscriptions_password': 'test_password'}
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_missing_password_returns_error(self, post, admin, settings):
"""Test POST with missing password returns 400 error"""
data = {'subscriptions_username': 'test_user'}
settings.SUBSCRIPTIONS_PASSWORD = None
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_missing_client_id_returns_error(self, post, admin):
"""Test POST with missing client_id returns 400 error"""
data = {'subscriptions_client_secret': 'test_secret'}
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_missing_client_secret_returns_error(self, post, admin, settings):
"""Test POST with missing client_secret returns 400 error"""
data = {'subscriptions_client_id': 'test_client_id'}
settings.SUBSCRIPTIONS_CLIENT_SECRET = None
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_empty_username_returns_error(self, post, admin):
"""Test POST with empty username returns 400 error"""
data = {'subscriptions_username': '', 'subscriptions_password': 'test_password'}
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_empty_password_returns_error(self, post, admin, settings):
"""Test POST with empty password returns 400 error"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': ''}
settings.SUBSCRIPTIONS_PASSWORD = None
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert 'Missing subscription credentials' in response.data['error']
def test_non_superuser_permission_denied(self, post, rando):
"""Test that non-superuser cannot access the endpoint"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
response = post(reverse('api:api_v2_subscription_view'), data, rando)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_settings_updated_on_successful_basic_auth(self, post, admin, settings):
"""Test that settings are updated when basic auth validation succeeds"""
data = {'subscriptions_username': 'new_username', 'subscriptions_password': 'new_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
assert settings.SUBSCRIPTIONS_USERNAME == 'new_username'
assert settings.SUBSCRIPTIONS_PASSWORD == 'new_password'
def test_settings_updated_on_successful_service_account(self, post, admin, settings):
"""Test that settings are updated when service account validation succeeds"""
data = {'subscriptions_client_id': 'new_client_id', 'subscriptions_client_secret': 'new_client_secret'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
assert settings.SUBSCRIPTIONS_CLIENT_ID == 'new_client_id'
assert settings.SUBSCRIPTIONS_CLIENT_SECRET == 'new_client_secret'
def test_validate_rh_exception_handling(self, post, admin):
"""Test that exceptions from validate_rh are properly handled"""
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.side_effect = Exception("Connection error")
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_mixed_credentials_prioritizes_client_id(self, post, admin):
"""Test that when both username and client_id are provided, client_id takes precedence"""
data = {
'subscriptions_username': 'test_user',
'subscriptions_password': 'test_password',
'subscriptions_client_id': 'test_client_id',
'subscriptions_client_secret': 'test_client_secret',
}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
# Should use service account (basic_auth=False) since client_id is present
mock_licenser.validate_rh.assert_called_once_with('test_client_id', 'test_client_secret', False)
def test_basic_auth_clears_service_account_settings(self, post, admin, settings):
"""Test that setting basic auth credentials clears service account settings"""
# Pre-populate service account settings
settings.SUBSCRIPTIONS_CLIENT_ID = 'existing_client_id'
settings.SUBSCRIPTIONS_CLIENT_SECRET = 'existing_client_secret'
data = {'subscriptions_username': 'test_user', 'subscriptions_password': 'test_password'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
# Basic auth settings should be set
assert settings.SUBSCRIPTIONS_USERNAME == 'test_user'
assert settings.SUBSCRIPTIONS_PASSWORD == 'test_password'
# Service account settings should be cleared
assert settings.SUBSCRIPTIONS_CLIENT_ID == ""
assert settings.SUBSCRIPTIONS_CLIENT_SECRET == ""
def test_service_account_clears_basic_auth_settings(self, post, admin, settings):
"""Test that setting service account credentials clears basic auth settings"""
# Pre-populate basic auth settings
settings.SUBSCRIPTIONS_USERNAME = 'existing_username'
settings.SUBSCRIPTIONS_PASSWORD = 'existing_password'
data = {'subscriptions_client_id': 'test_client_id', 'subscriptions_client_secret': 'test_client_secret'}
with patch('awx.api.views.root.get_licenser') as mock_get_licenser:
mock_licenser = MagicMock()
mock_licenser.validate_rh.return_value = []
mock_get_licenser.return_value = mock_licenser
response = post(reverse('api:api_v2_subscription_view'), data, admin)
assert response.status_code == status.HTTP_200_OK
# Service account settings should be set
assert settings.SUBSCRIPTIONS_CLIENT_ID == 'test_client_id'
assert settings.SUBSCRIPTIONS_CLIENT_SECRET == 'test_client_secret'
# Basic auth settings should be cleared
assert settings.SUBSCRIPTIONS_USERNAME == ""
assert settings.SUBSCRIPTIONS_PASSWORD == ""

View File

@@ -387,36 +387,6 @@ def test_remove_team_from_role(post, team, admin, role):
assert role.parents.filter(id=team.member_role.id).count() == 0
#
# /roles/<id>/parents/
#
@pytest.mark.django_db
def test_role_parents(get, team, admin, role):
role.parents.add(team.member_role)
url = reverse('api:role_parents_list', kwargs={'pk': role.id})
response = get(url, admin)
assert response.status_code == 200
assert response.data['count'] == 1
assert response.data['results'][0]['id'] == team.member_role.id
#
# /roles/<id>/children/
#
@pytest.mark.django_db
def test_role_children(get, team, admin, role):
role.parents.add(team.member_role)
url = reverse('api:role_children_list', kwargs={'pk': team.member_role.id})
response = get(url, admin)
assert response.status_code == 200
assert response.data['count'] == 2
assert response.data['results'][0]['id'] == role.id or response.data['results'][1]['id'] == role.id
#
# Generics
#

View File

@@ -167,3 +167,9 @@ class TestMigrationSmoke:
assert CredentialType.objects.filter(
name=expected_name
).exists(), f'Could not find {expected_name} credential type name, all names: {list(CredentialType.objects.values_list("name", flat=True))}'
# Verify the system_administrator role exists
Role = new_state.apps.get_model('main', 'Role')
assert Role.objects.filter(
singleton_name='system_administrator', role_field='system_administrator'
).exists(), "expected to find a system_administrator singleton role"

View File

@@ -13,7 +13,7 @@ def test_send_messages():
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey")
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='')
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -43,7 +43,7 @@ def test_send_messages_with_no_verify_ssl():
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", grafana_no_verify_ssl=True)
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='', grafana_no_verify_ssl=True)
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -74,7 +74,7 @@ def test_send_messages_with_dashboardid(dashboardId):
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=dashboardId)
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=dashboardId, panelId='')
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -97,7 +97,7 @@ def test_send_messages_with_dashboardid(dashboardId):
assert sent_messages == 1
@pytest.mark.parametrize("panelId", [42, 0])
@pytest.mark.parametrize("panelId", ['42', '0'])
def test_send_messages_with_panelid(panelId):
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
@@ -105,7 +105,7 @@ def test_send_messages_with_panelid(panelId):
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=None, panelId=panelId)
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId=panelId)
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -122,7 +122,7 @@ def test_send_messages_with_panelid(panelId):
requests_mock.post.assert_called_once_with(
'https://example.com/api/annotations',
headers={'Content-Type': 'application/json', 'Authorization': 'Bearer testapikey'},
json={'text': 'test subject', 'isRegion': True, 'timeEnd': 120000, 'panelId': panelId, 'time': 60000},
json={'text': 'test subject', 'isRegion': True, 'timeEnd': 120000, 'panelId': int(panelId), 'time': 60000},
verify=True,
)
assert sent_messages == 1
@@ -135,7 +135,7 @@ def test_send_messages_with_bothids():
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=42, panelId=42)
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='42', panelId='42')
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
@@ -158,6 +158,36 @@ def test_send_messages_with_bothids():
assert sent_messages == 1
def test_send_messages_with_emptyids():
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
m = {}
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='')
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},
[],
[
'https://example.com',
],
)
sent_messages = backend.send_messages(
[
message,
]
)
requests_mock.post.assert_called_once_with(
'https://example.com/api/annotations',
headers={'Content-Type': 'application/json', 'Authorization': 'Bearer testapikey'},
json={'text': 'test subject', 'isRegion': True, 'timeEnd': 120000, 'time': 60000},
verify=True,
)
assert sent_messages == 1
def test_send_messages_with_tags():
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
@@ -165,7 +195,7 @@ def test_send_messages_with_tags():
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=None, panelId=None, annotation_tags=["ansible"])
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='', annotation_tags=["ansible"])
message = EmailMessage(
m['subject'],
{"started": m['started'], "finished": m['finished']},

View File

@@ -1,37 +0,0 @@
import json
from http import HTTPStatus
from unittest.mock import patch
from requests import Response
from awx.main.utils.licensing import Licenser
def test_rhsm_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'body': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('awx.main.utils.analytics_proxy.OIDCClient.make_request', new=mocked_requests_get):
subs = licenser.get_rhsm_subs('localhost', 'admin', 'admin')
assert subs == []
def test_satellite_licensing():
def mocked_requests_get(*args, **kwargs):
assert kwargs['verify'] == True
response = Response()
subs = json.dumps({'results': []})
response.status_code = HTTPStatus.OK
response._content = bytes(subs, 'utf-8')
return response
licenser = Licenser()
with patch('requests.get', new=mocked_requests_get):
subs = licenser.get_satellite_subs('localhost', 'admin', 'admin')
assert subs == []

View File

@@ -0,0 +1,154 @@
from unittest.mock import patch
from awx.main.utils.licensing import Licenser
def test_validate_rh_basic_auth_rhsm():
"""
Assert get_rhsm_subs is called when
- basic_auth=True
- host is subscription.rhsm.redhat.com
"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com') as mock_get_host, patch.object(
licenser, 'get_rhsm_subs', return_value=[]
) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs') as mock_get_satellite, patch.object(
licenser, 'get_crc_subs'
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
licenser.validate_rh('testuser', 'testpass', basic_auth=True)
# Assert the correct methods were called
mock_get_host.assert_called_once()
mock_get_rhsm.assert_called_once_with('https://subscription.rhsm.redhat.com', 'testuser', 'testpass')
mock_get_satellite.assert_not_called()
mock_get_crc.assert_not_called()
mock_generate.assert_called_once_with([], is_candlepin=True)
def test_validate_rh_basic_auth_satellite():
"""
Assert get_satellite_subs is called when
- basic_auth=True
- custom satellite host
"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://satellite.example.com') as mock_get_host, patch.object(
licenser, 'get_rhsm_subs'
) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs', return_value=[]) as mock_get_satellite, patch.object(
licenser, 'get_crc_subs'
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
licenser.validate_rh('testuser', 'testpass', basic_auth=True)
# Assert the correct methods were called
mock_get_host.assert_called_once()
mock_get_rhsm.assert_not_called()
mock_get_satellite.assert_called_once_with('https://satellite.example.com', 'testuser', 'testpass')
mock_get_crc.assert_not_called()
mock_generate.assert_called_once_with([], is_candlepin=True)
def test_validate_rh_service_account_crc():
"""
Assert get_crc_subs is called when
- basic_auth=False
"""
licenser = Licenser()
with patch('awx.main.utils.licensing.settings') as mock_settings, patch.object(licenser, 'get_host_from_rhsm_config') as mock_get_host, patch.object(
licenser, 'get_rhsm_subs'
) as mock_get_rhsm, patch.object(licenser, 'get_satellite_subs') as mock_get_satellite, patch.object(
licenser, 'get_crc_subs', return_value=[]
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
mock_settings.SUBSCRIPTIONS_RHSM_URL = 'https://console.redhat.com/api/rhsm/v1/subscriptions'
licenser.validate_rh('client_id', 'client_secret', basic_auth=False)
# Assert the correct methods were called
mock_get_host.assert_not_called()
mock_get_rhsm.assert_not_called()
mock_get_satellite.assert_not_called()
mock_get_crc.assert_called_once_with('https://console.redhat.com/api/rhsm/v1/subscriptions', 'client_id', 'client_secret')
mock_generate.assert_called_once_with([], is_candlepin=False)
def test_validate_rh_missing_user_raises_error():
"""Test validate_rh raises ValueError when user is missing"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'):
try:
licenser.validate_rh(None, 'testpass', basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_id or subscriptions_username is required' in str(e)
def test_validate_rh_missing_password_raises_error():
"""Test validate_rh raises ValueError when password is missing"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'):
try:
licenser.validate_rh('testuser', None, basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_secret or subscriptions_password is required' in str(e)
def test_validate_rh_no_host_fallback_to_candlepin():
"""Test validate_rh falls back to REDHAT_CANDLEPIN_HOST when no host from config
- basic_auth=True
- no host from config
- REDHAT_CANDLEPIN_HOST is set
"""
licenser = Licenser()
with patch('awx.main.utils.licensing.settings') as mock_settings, patch.object(
licenser, 'get_host_from_rhsm_config', return_value=None
) as mock_get_host, patch.object(licenser, 'get_rhsm_subs', return_value=[]) as mock_get_rhsm, patch.object(
licenser, 'get_satellite_subs', return_value=[]
) as mock_get_satellite, patch.object(
licenser, 'get_crc_subs'
) as mock_get_crc, patch.object(
licenser, 'generate_license_options_from_entitlements'
) as mock_generate:
mock_settings.REDHAT_CANDLEPIN_HOST = 'https://candlepin.example.com'
licenser.validate_rh('testuser', 'testpass', basic_auth=True)
# Assert the correct methods were called
mock_get_host.assert_called_once()
mock_get_rhsm.assert_not_called()
mock_get_satellite.assert_called_once_with('https://candlepin.example.com', 'testuser', 'testpass')
mock_get_crc.assert_not_called()
mock_generate.assert_called_once_with([], is_candlepin=True)
def test_validate_rh_empty_credentials_basic_auth():
"""Test validate_rh with empty string credentials raises ValueError"""
licenser = Licenser()
with patch.object(licenser, 'get_host_from_rhsm_config', return_value='https://subscription.rhsm.redhat.com'):
# Test empty user
try:
licenser.validate_rh(None, 'testpass', basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_id or subscriptions_username is required' in str(e)
# Test empty password
try:
licenser.validate_rh('testuser', None, basic_auth=True)
assert False, "Expected ValueError to be raised"
except ValueError as e:
assert 'subscriptions_client_secret or subscriptions_password is required' in str(e)

View File

@@ -219,30 +219,65 @@ class Licenser(object):
kwargs['license_date'] = int(kwargs['license_date'])
self._attrs.update(kwargs)
def validate_rh(self, user, pw):
def get_host_from_rhsm_config(self):
try:
host = 'https://' + str(self.config.get("server", "hostname"))
except Exception:
logger.exception('Cannot access rhsm.conf, make sure subscription manager is installed and configured.')
host = None
return host
def validate_rh(self, user, pw, basic_auth):
# if basic auth is True, host is read from rhsm.conf (subscription.rhsm.redhat.com)
# if basic auth is False, host is settings.SUBSCRIPTIONS_RHSM_URL (console.redhat.com)
# if rhsm.conf is not found, host is settings.REDHAT_CANDLEPIN_HOST (satellite server)
if basic_auth:
host = self.get_host_from_rhsm_config()
if not host:
host = getattr(settings, 'REDHAT_CANDLEPIN_HOST', None)
else:
host = settings.SUBSCRIPTIONS_RHSM_URL
if not host:
host = getattr(settings, 'REDHAT_CANDLEPIN_HOST', None)
raise ValueError('Could not get host url for subscriptions')
if not user:
raise ValueError('subscriptions_client_id is required')
raise ValueError('subscriptions_client_id or subscriptions_username is required')
if not pw:
raise ValueError('subscriptions_client_secret is required')
raise ValueError('subscriptions_client_secret or subscriptions_password is required')
if host and user and pw:
if 'subscription.rhsm.redhat.com' in host:
json = self.get_rhsm_subs(settings.SUBSCRIPTIONS_RHSM_URL, user, pw)
if basic_auth:
if 'subscription.rhsm.redhat.com' in host:
json = self.get_rhsm_subs(host, user, pw)
else:
json = self.get_satellite_subs(host, user, pw)
else:
json = self.get_satellite_subs(host, user, pw)
return self.generate_license_options_from_entitlements(json)
json = self.get_crc_subs(host, user, pw)
return self.generate_license_options_from_entitlements(json, is_candlepin=basic_auth)
return []
def get_rhsm_subs(self, host, client_id, client_secret):
def get_rhsm_subs(self, host, user, pw):
verify = getattr(settings, 'REDHAT_CANDLEPIN_VERIFY', True)
json = []
try:
subs = requests.get('/'.join([host, 'subscription/users/{}/owners'.format(user)]), verify=verify, auth=(user, pw))
except requests.exceptions.ConnectionError as error:
raise error
except OSError as error:
raise OSError(
'Unable to open certificate bundle {}. Check that the service is running on Red Hat Enterprise Linux.'.format(verify)
) from error # noqa
subs.raise_for_status()
for sub in subs.json():
resp = requests.get('/'.join([host, 'subscription/owners/{}/pools/?match=*tower*'.format(sub['key'])]), verify=verify, auth=(user, pw))
resp.raise_for_status()
json.extend(resp.json())
return json
def get_crc_subs(self, host, client_id, client_secret):
try:
client = OIDCClient(client_id, client_secret)
subs = client.make_request(
@@ -320,12 +355,21 @@ class Licenser(object):
json.append(license)
return json
def is_appropriate_sub(self, sub):
if sub['activeSubscription'] is False:
return False
# Products that contain Ansible Tower
products = sub.get('providedProducts', [])
if any(product.get('productId') == '480' for product in products):
return True
return False
def is_appropriate_sat_sub(self, sub):
if 'Red Hat Ansible Automation' not in sub['subscription_name']:
return False
return True
def generate_license_options_from_entitlements(self, json):
def generate_license_options_from_entitlements(self, json, is_candlepin=False):
from dateutil.parser import parse
ValidSub = collections.namedtuple(
@@ -336,12 +380,14 @@ class Licenser(object):
satellite = sub.get('satellite')
if satellite:
is_valid = self.is_appropriate_sat_sub(sub)
elif is_candlepin:
is_valid = self.is_appropriate_sub(sub)
else:
# the list of subs from console.redhat.com are already valid based on the query params we provided
# the list of subs from console.redhat.com and subscriptions.rhsm.redhat.com are already valid based on the query params we provided
is_valid = True
if is_valid:
try:
if satellite:
if is_candlepin:
end_date = parse(sub.get('endDate'))
else:
end_date = parse(sub['subscriptions']['endDate'])
@@ -354,10 +400,10 @@ class Licenser(object):
continue
developer_license = False
support_level = ''
support_level = sub.get('support_level', '')
account_number = ''
usage = sub.get('usage', '')
if satellite:
if is_candlepin:
try:
quantity = int(sub['quantity'])
except Exception:
@@ -365,7 +411,6 @@ class Licenser(object):
sku = sub['productId']
subscription_id = sub['subscriptionId']
sub_name = sub['productName']
support_level = sub['support_level']
account_number = sub['accountNumber']
else:
try:
@@ -434,6 +479,8 @@ class Licenser(object):
license.update(subscription_id=sub.subscription_id)
license.update(account_number=sub.account_number)
licenses.append(license._attrs.copy())
# sort by sku
licenses.sort(key=lambda x: x['sku'])
return licenses
raise ValueError('No valid Red Hat Ansible Automation subscription could be found for this account.') # noqa

View File

@@ -19,18 +19,27 @@ short_description: Get subscription list
description:
- Get subscriptions available to Automation Platform Controller. See
U(https://www.ansible.com/tower) for an overview.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
options:
username:
description:
- Red Hat username to get available subscriptions.
required: False
type: str
password:
description:
- Red Hat password to get available subscriptions.
required: False
type: str
client_id:
description:
- Red Hat service account client ID or Red Hat Satellite username to get available subscriptions.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
required: True
- Red Hat service account client ID to get available subscriptions.
required: False
type: str
client_secret:
description:
- Red Hat service account client secret or Red Hat Satellite password to get available subscriptions.
- The credentials you use will be stored for future use in retrieving renewal or expanded subscriptions
required: True
- Red Hat service account client secret to get available subscriptions.
required: False
type: str
filters:
description:
@@ -72,19 +81,41 @@ def main():
module = ControllerAPIModule(
argument_spec=dict(
client_id=dict(type='str', required=True),
client_secret=dict(type='str', no_log=True, required=True),
username=dict(type='str', required=False),
password=dict(type='str', no_log=True, required=False),
client_id=dict(type='str', required=False),
client_secret=dict(type='str', no_log=True, required=False),
filters=dict(type='dict', required=False, default={}),
),
mutually_exclusive=[
['username', 'client_id']
],
required_together=[
['username', 'password'],
['client_id', 'client_secret']
],
required_one_of=[
['username', 'client_id']
],
)
json_output = {'changed': False}
username = module.params.get('username')
password = module.params.get('password')
client_id = module.params.get('client_id')
client_secret = module.params.get('client_secret')
if username and password:
post_data = {
'subscriptions_username': username,
'subscriptions_password': password,
}
else:
post_data = {
'subscriptions_client_id': client_id,
'subscriptions_client_secret': client_secret,
}
# Check if Tower is already licensed
post_data = {
'subscriptions_client_secret': module.params.get('client_secret'),
'subscriptions_client_id': module.params.get('client_id'),
}
all_subscriptions = module.post_endpoint('config/subscriptions', data=post_data)['json']
json_output['subscriptions'] = []
for subscription in all_subscriptions:

View File

@@ -82,7 +82,38 @@ class CLI(object):
return '--help' in self.argv or '-h' in self.argv
def authenticate(self):
"""Configure the current session for basic auth"""
"""Configure the current session for authentication.
Uses Basic authentication when AWXKIT_FORCE_BASIC_AUTH environment variable
is set to true, otherwise defaults to session-based authentication.
For AAP Gateway environments, set AWXKIT_FORCE_BASIC_AUTH=true to bypass
session login restrictions.
"""
# Check if Basic auth is forced via environment variable
if config.get('force_basic_auth', False):
config.use_sessions = False
# Validate credentials are provided
username = self.get_config('username')
password = self.get_config('password')
if not username or not password:
raise ValueError(
"Basic authentication requires both username and password. "
"Provide --conf.username and --conf.password or set "
"CONTROLLER_USERNAME and CONTROLLER_PASSWORD environment variables."
)
# Apply Basic auth credentials to the session
try:
self.root.connection.login(username, password)
self.root.get()
except Exception as e:
raise RuntimeError(f"Basic authentication failed: {str(e)}. " "Verify credentials and network connectivity.") from e
return
# Use session-based authentication (default)
config.use_sessions = True
self.root.load_session().get()

View File

@@ -32,6 +32,7 @@ config.assume_untrusted = config.get('assume_untrusted', True)
config.client_connection_attempts = int(os.getenv('AWXKIT_CLIENT_CONNECTION_ATTEMPTS', 5))
config.prevent_teardown = to_bool(os.getenv('AWXKIT_PREVENT_TEARDOWN', False))
config.use_sessions = to_bool(os.getenv('AWXKIT_SESSIONS', False))
config.force_basic_auth = to_bool(os.getenv('AWXKIT_FORCE_BASIC_AUTH', False))
config.api_base_path = os.getenv('CONTROLLER_OPTIONAL_API_URLPATTERN_PREFIX', '/api/')
config.api_base_path = os.getenv('AWXKIT_API_BASE_PATH', config.api_base_path)
config.gateway_base_path = os.getenv('AWXKIT_GATEWAY_BASE_PATH', '/api/gateway/')

View File

@@ -0,0 +1,103 @@
import pytest
from typing import Tuple, List, Optional
from unittest.mock import Mock
from awxkit.cli import CLI
from awxkit import config
@pytest.fixture(autouse=True)
def reset_config_state(monkeypatch: pytest.MonkeyPatch) -> None:
"""Ensure clean config state for each test to prevent parallel test interference"""
monkeypatch.setattr(config, 'force_basic_auth', False, raising=False)
monkeypatch.setattr(config, 'use_sessions', False, raising=False)
# ============================================================================
# Test Helper Functions
# ============================================================================
def setup_basic_auth(cli_args: Optional[List[str]] = None) -> Tuple[CLI, Mock, Mock]:
"""Set up CLI with mocked connection for Basic auth testing"""
cli = CLI()
cli.parse_args(cli_args or ['awx', '--conf.username', 'testuser', '--conf.password', 'testpass'])
mock_root = Mock()
mock_connection = Mock()
mock_root.connection = mock_connection
cli.root = mock_root
return cli, mock_root, mock_connection
def setup_session_auth(cli_args: Optional[List[str]] = None) -> Tuple[CLI, Mock, Mock]:
"""Set up CLI with mocked session for Session auth testing"""
cli = CLI()
cli.parse_args(cli_args or ['awx', '--conf.username', 'testuser', '--conf.password', 'testpass'])
mock_root = Mock()
mock_load_session = Mock()
mock_root.load_session.return_value = mock_load_session
cli.root = mock_root
return cli, mock_root, mock_load_session
def test_basic_auth_enabled(monkeypatch):
"""Test that AWXKIT_FORCE_BASIC_AUTH=true enables Basic authentication"""
cli, mock_root, mock_connection = setup_basic_auth()
monkeypatch.setattr(config, 'force_basic_auth', True)
cli.authenticate()
mock_connection.login.assert_called_once_with('testuser', 'testpass')
mock_root.get.assert_called_once()
assert not config.use_sessions
def test_session_auth_default(monkeypatch):
"""Test that session auth is used by default (backward compatibility)"""
cli, mock_root, mock_load_session = setup_session_auth()
monkeypatch.setattr(config, 'force_basic_auth', False)
cli.authenticate()
mock_root.load_session.assert_called_once()
mock_load_session.get.assert_called_once()
assert config.use_sessions
def test_aap_gateway_scenario(monkeypatch):
"""Test the specific AAP Gateway scenario from AAP-46830"""
cli, mock_root, mock_connection = setup_basic_auth(
['awx', '--conf.host', 'https://aap-sbx.cambiahealth.com', '--conf.username', 'puretest', '--conf.password', 'testpass']
)
monkeypatch.setattr(config, 'force_basic_auth', True)
cli.authenticate()
mock_connection.login.assert_called_once_with('puretest', 'testpass')
mock_root.get.assert_called_once()
assert not config.use_sessions
def test_empty_credentials_error(monkeypatch):
"""Test error handling for explicitly empty credentials"""
cli, mock_root, mock_connection = setup_basic_auth(['awx', '--conf.username', '', '--conf.password', ''])
monkeypatch.setattr(config, 'force_basic_auth', True)
with pytest.raises(ValueError, match="Basic authentication requires both username and password"):
cli.authenticate()
mock_connection.login.assert_not_called()
def test_connection_failure(monkeypatch):
"""Test error handling when Basic auth connection fails"""
cli, mock_root, mock_connection = setup_basic_auth()
mock_connection.login.side_effect = Exception("Connection failed")
monkeypatch.setattr(config, 'force_basic_auth', True)
with pytest.raises(RuntimeError, match="Basic authentication failed: Connection failed"):
cli.authenticate()
mock_connection.login.assert_called_once_with('testuser', 'testpass')
assert not config.use_sessions

View File

@@ -1,5 +1,5 @@
git+https://github.com/ansible/system-certifi.git@devel#egg=certifi
git+https://github.com/ansible/ansible-runner.git@devel#egg=ansible-runner
certifi @ git+https://github.com/ansible/system-certifi.git@devel#egg=certifi
ansible-runner @ git+https://github.com/ansible/ansible-runner.git@devel#egg=ansible-runner
awx-plugins-core @ git+https://github.com/ansible/awx-plugins.git@devel#egg=awx-plugins-core[credentials-github-app]
django-ansible-base @ git+https://github.com/ansible/django-ansible-base@devel#egg=django-ansible-base[rest-filters,jwt_consumer,resource-registry,rbac,feature-flags]
awx_plugins.interfaces @ git+https://github.com/ansible/awx_plugins.interfaces.git

141
sonar-project.properties Normal file
View File

@@ -0,0 +1,141 @@
# SonarCloud project configuration for AWX
# Complete documentation: https://docs.sonarqube.org/latest/analysis/analysis-parameters/
# =============================================================================
# PROJECT IDENTIFICATION (REQUIRED)
# =============================================================================
# The unique project identifier. This is mandatory.
# Do not duplicate or reuse!
# Available characters: [a-zA-Z0-9_:\.\-]
# Must have least one non-digit.
sonar.projectKey=ansible_awx
sonar.organization=ansible
# Project metadata
sonar.projectName=awx
# =============================================================================
# SOURCE AND TEST CONFIGURATION
# =============================================================================
# Source directories to analyze
sonar.sources=.
sonar.inclusions=awx/**
# Test directories
sonar.tests=awx/main/tests
# Test file patterns
sonar.test.inclusions=\
**/test_*.py,\
**/*_test.py,\
**/tests/**/*.py
# Set branch-specific new code definition
#
# This is important to always check against the main branch for new PRs,
# otherwise the PR may fail during backporting, since the old version of the code
# may not respect the minimum requirements for the existing Quality Gate.
sonar.newCode.referenceBranch=devel
# =============================================================================
# LANGUAGE CONFIGURATION
# =============================================================================
# Python versions supported by the project
#sonar.python.version=3.9,3.10,3.11
# File encoding
sonar.sourceEncoding=UTF-8
# =============================================================================
# REPORTS AND COVERAGE
# =============================================================================
# Test and coverage reports (paths relative to project root)
sonar.python.coverage.reportPaths=reports/coverage.xml
sonar.python.xunit.reportPath=/reports/junit.xml
# External tool reports (add these paths when tools are configured)
# sonar.python.pylint.reportPaths=reports/pylint-report.txt
# sonar.python.bandit.reportPaths=reports/bandit-report.json
# sonar.python.mypy.reportPath=reports/mypy-report.txt
# sonar.python.flake8.reportPaths=reports/flake8-report.txt
# sonar.python.xunit.reportPath=reports/junit.xml
# =============================================================================
# EXCLUSIONS - FILES AND DIRECTORIES TO IGNORE
# =============================================================================
# General exclusions - files and directories to ignore from analysis
sonar.exclusions=\
**/tests/**,\
**/__pycache__/**,\
**/*.pyc,\
**/*.pyo,\
**/*.pyd,\
**/build/**,\
**/dist/**,\
**/*.egg-info/**
# =============================================================================
# COVERAGE EXCLUSIONS
# =============================================================================
# Files to exclude from coverage calculations
sonar.coverage.exclusions=\
**/tests/**,\
**/.tox/**,\
**/test_*.py,\
**/*_test.py,\
**/conftest.py,\
**/migrations/**,\
**/settings*.py,\
**/defaults.py,\
**/manage.py,\
**/__main__.py,\
tools/scripts/**
# =============================================================================
# DUPLICATION EXCLUSIONS
# =============================================================================
# Ignore code duplication in migrations and tests
sonar.cpd.exclusions=\
**/migrations/**,\
**/tests/**
# =============================================================================
# ISSUE IGNORE RULES
# =============================================================================
# Ignore specific rules for certain file patterns
sonar.issue.ignore.multicriteria=e1
# Ignore "should be a variable" in migrations
sonar.issue.ignore.multicriteria.e1.ruleKey=python:S1192
sonar.issue.ignore.multicriteria.e1.resourceKey=**/migrations/**/*
# =============================================================================
# GITHUB INTEGRATION
# =============================================================================
# The following properties are automatically handled by GitHub Actions:
# sonar.pullrequest.key - handled automatically
# sonar.pullrequest.branch - handled automatically
# sonar.pullrequest.base - handled automatically
# =============================================================================
# DEBUGGING
# =============================================================================
# These are aggressive settings to ensure maximum detection
# do not use in production
# sonar.verbose=true
# sonar.log.level=DEBUG
# sonar.scm.exclusions.disabled=true
# sonar.java.skipUnchanged=false
# sonar.scm.forceReloadAll=true
# sonar.filesize.limit=100
# sonar.qualitygate.wait=true