Remove oauth provider (#15666)

* Remove oauth provider

This removes the oauth provider functionality from awx. The
oauth2_provider app and all references to it have been removed.
Migrations to delete the two tables that locally overwrote
oauth2_provider tables are included. This change does not include
migrations to delete the tables provided by the oauth2_provider app.

Also not included here are changes to awxkit, awx_collection or the ui.

* Fix linters

* Update migrations after rebase

* Update collection tests for auth changes

The changes in https://github.com/ansible/awx/pull/15554 will cause a
few collection tests to fail, depending on what the test configuration
is. This changes the tests to look for a specific warning rather than
counting the number of warnings emitted.

* Update migration

* Removed unused oauth_scopes references

---------

Co-authored-by: Mike Graves <mgraves@redhat.com>
Co-authored-by: Alan Rominger <arominge@redhat.com>
This commit is contained in:
Pablo H.
2024-11-26 18:59:37 +01:00
committed by GitHub
parent 789a43077f
commit 268ca7c78a
44 changed files with 61 additions and 1821 deletions

View File

@@ -17,9 +17,6 @@ from django.core.exceptions import ObjectDoesNotExist, FieldDoesNotExist
# Django REST Framework
from rest_framework.exceptions import ParseError, PermissionDenied
# Django OAuth Toolkit
from awx.main.models.oauth import OAuth2Application, OAuth2AccessToken
# django-ansible-base
from ansible_base.lib.utils.validation import to_python_boolean
from ansible_base.rbac.models import RoleEvaluation
@@ -441,10 +438,7 @@ class BaseAccess(object):
# Actions not possible for reason unrelated to RBAC
# Cannot copy with validation errors, or update a manual group/project
if 'write' not in getattr(self.user, 'oauth_scopes', ['write']):
user_capabilities[display_method] = False # Read tokens cannot take any actions
continue
elif display_method in ['copy', 'start', 'schedule'] and isinstance(obj, JobTemplate):
if display_method in ['copy', 'start', 'schedule'] and isinstance(obj, JobTemplate):
if obj.validation_errors:
user_capabilities[display_method] = False
continue
@@ -753,82 +747,6 @@ class UserAccess(BaseAccess):
return False
class OAuth2ApplicationAccess(BaseAccess):
"""
I can read, change or delete OAuth 2 applications when:
- I am a superuser.
- I am the admin of the organization of the user of the application.
- I am a user in the organization of the application.
I can create OAuth 2 applications when:
- I am a superuser.
- I am the admin of the organization of the application.
"""
model = OAuth2Application
select_related = ('user',)
prefetch_related = ('organization', 'oauth2accesstoken_set')
def filtered_queryset(self):
org_access_qs = Organization.access_qs(self.user, 'member')
return self.model.objects.filter(organization__in=org_access_qs)
def can_change(self, obj, data):
return self.user.is_superuser or self.check_related('organization', Organization, data, obj=obj, role_field='admin_role', mandatory=True)
def can_delete(self, obj):
return self.user.is_superuser or obj.organization in self.user.admin_of_organizations
def can_add(self, data):
if self.user.is_superuser:
return True
if not data:
return Organization.access_qs(self.user, 'change').exists()
return self.check_related('organization', Organization, data, role_field='admin_role', mandatory=True)
class OAuth2TokenAccess(BaseAccess):
"""
I can read, change or delete an app token when:
- I am a superuser.
- I am the admin of the organization of the application of the token.
- I am the user of the token.
I can create an OAuth2 app token when:
- I have the read permission of the related application.
I can read, change or delete a personal token when:
- I am the user of the token
- I am the superuser
I can create an OAuth2 Personal Access Token when:
- I am a user. But I can only create a PAT for myself.
"""
model = OAuth2AccessToken
select_related = ('user', 'application')
prefetch_related = ('refresh_token',)
def filtered_queryset(self):
org_access_qs = Organization.objects.filter(Q(admin_role__members=self.user) | Q(auditor_role__members=self.user))
return self.model.objects.filter(application__organization__in=org_access_qs) | self.model.objects.filter(user__id=self.user.pk)
def can_delete(self, obj):
if (self.user.is_superuser) | (obj.user == self.user):
return True
elif not obj.application:
return False
return self.user in obj.application.organization.admin_role
def can_change(self, obj, data):
return self.can_delete(obj)
def can_add(self, data):
if 'application' in data:
app = get_object_from_data('application', OAuth2Application, data)
if app is None:
return True
return OAuth2ApplicationAccess(self.user).can_read(app)
return True
class OrganizationAccess(NotificationAttachMixin, BaseAccess):
"""
I can see organizations when:
@@ -2741,8 +2659,6 @@ class ActivityStreamAccess(BaseAccess):
'credential_type',
'team',
'ad_hoc_command',
'o_auth2_application',
'o_auth2_access_token',
'notification_template',
'notification',
'label',
@@ -2828,14 +2744,6 @@ class ActivityStreamAccess(BaseAccess):
if team_set:
q |= Q(team__in=team_set)
app_set = OAuth2ApplicationAccess(self.user).filtered_queryset()
if app_set:
q |= Q(o_auth2_application__in=app_set)
token_set = OAuth2TokenAccess(self.user).filtered_queryset()
if token_set:
q |= Q(o_auth2_access_token__in=token_set)
return qs.filter(q).distinct()
def can_add(self, data):

View File

@@ -1,26 +0,0 @@
import logging
from django.core import management
from django.core.management.base import BaseCommand
from awx.main.models import OAuth2AccessToken
from oauth2_provider.models import RefreshToken
class Command(BaseCommand):
def init_logging(self):
log_levels = dict(enumerate([logging.ERROR, logging.INFO, logging.DEBUG, 0]))
self.logger = logging.getLogger('awx.main.commands.cleanup_tokens')
self.logger.setLevel(log_levels.get(self.verbosity, 0))
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
self.logger.propagate = False
def execute(self, *args, **options):
self.verbosity = int(options.get('verbosity', 1))
self.init_logging()
total_accesstokens = OAuth2AccessToken.objects.all().count()
total_refreshtokens = RefreshToken.objects.all().count()
management.call_command('cleartokens')
self.logger.info("Expired OAuth 2 Access Tokens deleted: {}".format(total_accesstokens - OAuth2AccessToken.objects.all().count()))
self.logger.info("Expired OAuth 2 Refresh Tokens deleted: {}".format(total_refreshtokens - RefreshToken.objects.all().count()))

View File

@@ -1,34 +0,0 @@
# Django
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
# AWX
from awx.api.serializers import OAuth2TokenSerializer
class Command(BaseCommand):
"""Command that creates an OAuth2 token for a certain user. Returns the value of created token."""
help = 'Creates an OAuth2 token for a user.'
def add_arguments(self, parser):
parser.add_argument('--user', dest='user', type=str)
def handle(self, *args, **options):
if not options['user']:
raise CommandError('Username not supplied. Usage: awx-manage create_oauth2_token --user=username.')
try:
user = User.objects.get(username=options['user'])
except ObjectDoesNotExist:
raise CommandError('The user does not exist.')
config = {'user': user, 'scope': 'write'}
serializer_obj = OAuth2TokenSerializer()
class FakeRequest(object):
def __init__(self):
self.user = user
serializer_obj.context['request'] = FakeRequest()
token_record = serializer_obj.create(config)
self.stdout.write(token_record.token)

View File

@@ -10,7 +10,7 @@ from django.db.models.signals import post_save
from awx.conf import settings_registry
from awx.conf.models import Setting
from awx.conf.signals import on_post_save_setting
from awx.main.models import UnifiedJob, Credential, NotificationTemplate, Job, JobTemplate, WorkflowJob, WorkflowJobTemplate, OAuth2Application
from awx.main.models import UnifiedJob, Credential, NotificationTemplate, Job, JobTemplate, WorkflowJob, WorkflowJobTemplate
from awx.main.utils.encryption import encrypt_field, decrypt_field, encrypt_value, decrypt_value, get_encryption_key
@@ -45,7 +45,6 @@ class Command(BaseCommand):
self._notification_templates()
self._credentials()
self._unified_jobs()
self._oauth2_app_secrets()
self._settings()
self._survey_passwords()
return self.new_key
@@ -74,13 +73,6 @@ class Command(BaseCommand):
uj.start_args = encrypt_field(uj, 'start_args', secret_key=self.new_key)
uj.save()
def _oauth2_app_secrets(self):
for app in OAuth2Application.objects.iterator():
raw = app.client_secret
app.client_secret = raw
encrypted = encrypt_value(raw, secret_key=self.new_key)
OAuth2Application.objects.filter(pk=app.pk).update(client_secret=encrypted)
def _settings(self):
# don't update the cache, the *actual* value isn't changing
post_save.disconnect(on_post_save_setting, sender=Setting)

View File

@@ -1,38 +0,0 @@
# Django
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import User
from django.core.exceptions import ObjectDoesNotExist
# AWX
from awx.main.models.oauth import OAuth2AccessToken
from oauth2_provider.models import RefreshToken
def revoke_tokens(token_list):
for token in token_list:
token.revoke()
print('revoked {} {}'.format(token.__class__.__name__, token.token))
class Command(BaseCommand):
"""Command that revokes OAuth2 access tokens."""
help = 'Revokes OAuth2 access tokens. Use --all to revoke access and refresh tokens.'
def add_arguments(self, parser):
parser.add_argument('--user', dest='user', type=str, help='revoke OAuth2 tokens for a specific username')
parser.add_argument('--all', dest='all', action='store_true', help='revoke OAuth2 access tokens and refresh tokens')
def handle(self, *args, **options):
if not options['user']:
if options['all']:
revoke_tokens(RefreshToken.objects.filter(revoked=None))
revoke_tokens(OAuth2AccessToken.objects.all())
else:
try:
user = User.objects.get(username=options['user'])
except ObjectDoesNotExist:
raise CommandError('A user with that username does not exist.')
if options['all']:
revoke_tokens(RefreshToken.objects.filter(revoked=None).filter(user=user))
revoke_tokens(user.main_oauth2accesstoken.filter(user=user))

View File

@@ -5,7 +5,6 @@ from __future__ import unicode_literals
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import oauth2_provider
import re
@@ -13,19 +12,13 @@ class Migration(migrations.Migration):
dependencies = [
('main', '0024_v330_create_user_session_membership'),
]
run_before = [
# As of this migration, OAuth2Application and OAuth2AccessToken are models in main app
# Grant and RefreshToken models are still in the oauth2_provider app and reference
# the app and token models, so these must be created before the oauth2_provider models
('oauth2_provider', '0001_initial')
]
operations = [
migrations.CreateModel(
name='OAuth2Application',
fields=[
('id', models.BigAutoField(primary_key=True, serialize=False)),
('client_id', models.CharField(db_index=True, default=oauth2_provider.generators.generate_client_id, max_length=100, unique=True)),
('client_id', models.CharField(db_index=True, default=lambda: "", max_length=100, unique=True)),
(
'redirect_uris',
models.TextField(blank=True, help_text='Allowed URIs list, space separated'),
@@ -43,7 +36,7 @@ class Migration(migrations.Migration):
max_length=32,
),
),
('client_secret', models.CharField(blank=True, db_index=True, default=oauth2_provider.generators.generate_client_secret, max_length=255)),
('client_secret', models.CharField(blank=True, db_index=True, default=lambda: "", max_length=255)),
('name', models.CharField(blank=True, max_length=255)),
('skip_authorization', models.BooleanField(default=False)),
('created', models.DateTimeField(auto_now_add=True)),
@@ -72,10 +65,6 @@ class Migration(migrations.Migration):
('updated', models.DateTimeField(auto_now=True)),
('description', models.CharField(blank=True, default='', max_length=200)),
('last_used', models.DateTimeField(default=None, editable=False, null=True)),
(
'application',
models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.OAUTH2_PROVIDER_APPLICATION_MODEL),
),
(
'user',
models.ForeignKey(

View File

@@ -4,7 +4,6 @@ from __future__ import unicode_literals
import awx.main.fields
from django.db import migrations
import oauth2_provider.generators
class Migration(migrations.Migration):
@@ -16,8 +15,6 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='oauth2application',
name='client_secret',
field=awx.main.fields.OAuth2ClientSecretField(
blank=True, db_index=True, default=oauth2_provider.generators.generate_client_secret, max_length=1024
),
field=awx.main.fields.OAuth2ClientSecretField(blank=True, db_index=True, default=lambda: "", max_length=1024),
),
]

View File

@@ -6,7 +6,6 @@ import awx.main.fields
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import oauth2_provider.generators
# TODO: Squash all of these migrations with '0024_v330_add_oauth_activity_stream_registrar'
@@ -54,7 +53,7 @@ class Migration(migrations.Migration):
field=awx.main.fields.OAuth2ClientSecretField(
blank=True,
db_index=True,
default=oauth2_provider.generators.generate_client_secret,
default=lambda: "",
help_text='Used for more stringent verification of access to an application when creating a token.',
max_length=1024,
),

View File

@@ -2,27 +2,12 @@
# Generated by Django 1.11.11 on 2018-06-14 21:03
from __future__ import unicode_literals
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.OAUTH2_PROVIDER_REFRESH_TOKEN_MODEL),
('main', '0040_v330_unifiedjob_controller_node'),
]
operations = [
migrations.AddField(
model_name='oauth2accesstoken',
name='source_refresh_token',
field=models.OneToOneField(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name='refreshed_access_token',
to=settings.OAUTH2_PROVIDER_REFRESH_TOKEN_MODEL,
),
),
]
operations = []

View File

@@ -1,25 +1,14 @@
# Generated by Django 3.2.16 on 2023-04-21 14:15
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.OAUTH2_PROVIDER_ID_TOKEN_MODEL),
('main', '0182_constructed_inventory'),
('oauth2_provider', '0005_auto_20211222_2352'),
]
operations = [
migrations.AddField(
model_name='oauth2accesstoken',
name='id_token',
field=models.OneToOneField(
blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='access_token', to=settings.OAUTH2_PROVIDER_ID_TOKEN_MODEL
),
),
migrations.AddField(
model_name='oauth2application',
name='algorithm',

View File

@@ -0,0 +1,39 @@
# Generated by Django 4.2.10 on 2024-10-24 14:06
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0198_alter_inventorysource_source_and_more'),
]
operations = [
migrations.AlterUniqueTogether(
name='oauth2application',
unique_together=None,
),
migrations.RemoveField(
model_name='oauth2application',
name='organization',
),
migrations.RemoveField(
model_name='oauth2application',
name='user',
),
migrations.RemoveField(
model_name='activitystream',
name='o_auth2_access_token',
),
migrations.RemoveField(
model_name='activitystream',
name='o_auth2_application',
),
migrations.DeleteModel(
name='OAuth2AccessToken',
),
migrations.DeleteModel(
name='OAuth2Application',
),
]

View File

@@ -93,9 +93,6 @@ from awx.main.models.workflow import ( # noqa
WorkflowApproval,
WorkflowApprovalTemplate,
)
from awx.api.versioning import reverse
from awx.main.models.oauth import OAuth2AccessToken, OAuth2Application # noqa
from oauth2_provider.models import Grant, RefreshToken # noqa -- needed django-oauth-toolkit model migrations
# Add custom methods to User model for permissions checks.
@@ -244,19 +241,6 @@ def user_is_system_auditor(user, tf):
User.add_to_class('is_system_auditor', user_is_system_auditor)
def o_auth2_application_get_absolute_url(self, request=None):
return reverse('api:o_auth2_application_detail', kwargs={'pk': self.pk}, request=request)
OAuth2Application.add_to_class('get_absolute_url', o_auth2_application_get_absolute_url)
def o_auth2_token_get_absolute_url(self, request=None):
return reverse('api:o_auth2_token_detail', kwargs={'pk': self.pk}, request=request)
OAuth2AccessToken.add_to_class('get_absolute_url', o_auth2_token_get_absolute_url)
from awx.main.registrar import activity_stream_registrar # noqa
activity_stream_registrar.connect(Organization)
@@ -288,8 +272,6 @@ activity_stream_registrar.connect(WorkflowJobTemplateNode)
activity_stream_registrar.connect(WorkflowJob)
activity_stream_registrar.connect(WorkflowApproval)
activity_stream_registrar.connect(WorkflowApprovalTemplate)
activity_stream_registrar.connect(OAuth2Application)
activity_stream_registrar.connect(OAuth2AccessToken)
# Register models
permission_registry.register(Project, Team, WorkflowJobTemplate, JobTemplate, Inventory, Organization, Credential, NotificationTemplate, ExecutionEnvironment)
@@ -297,8 +279,3 @@ permission_registry.register(InstanceGroup, parent_field_name=None) # Not part
# prevent API filtering on certain Django-supplied sensitive fields
prevent_search(User._meta.get_field('password'))
prevent_search(OAuth2AccessToken._meta.get_field('token'))
prevent_search(RefreshToken._meta.get_field('token'))
prevent_search(OAuth2Application._meta.get_field('client_secret'))
prevent_search(OAuth2Application._meta.get_field('client_id'))
prevent_search(Grant._meta.get_field('code'))

View File

@@ -81,8 +81,6 @@ class ActivityStream(models.Model):
role = models.ManyToManyField("Role", blank=True)
instance = models.ManyToManyField("Instance", blank=True)
instance_group = models.ManyToManyField("InstanceGroup", blank=True)
o_auth2_application = models.ManyToManyField("OAuth2Application", blank=True)
o_auth2_access_token = models.ManyToManyField("OAuth2AccessToken", blank=True)
setting = models.JSONField(default=dict, blank=True)

View File

@@ -1,125 +0,0 @@
# Python
import logging
import re
# Django
from django.core.validators import RegexValidator
from django.db import models, connection
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django.conf import settings
# Django OAuth Toolkit
from oauth2_provider.models import AbstractApplication, AbstractAccessToken
from oauth2_provider.generators import generate_client_secret
from awx.main.fields import OAuth2ClientSecretField
DATA_URI_RE = re.compile(r'.*') # FIXME
__all__ = ['OAuth2AccessToken', 'OAuth2Application']
logger = logging.getLogger('awx.main.models.oauth')
class OAuth2Application(AbstractApplication):
class Meta:
app_label = 'main'
verbose_name = _('application')
unique_together = (("name", "organization"),)
ordering = ('organization', 'name')
CLIENT_CONFIDENTIAL = "confidential"
CLIENT_PUBLIC = "public"
CLIENT_TYPES = (
(CLIENT_CONFIDENTIAL, _("Confidential")),
(CLIENT_PUBLIC, _("Public")),
)
GRANT_AUTHORIZATION_CODE = "authorization-code"
GRANT_PASSWORD = "password"
GRANT_TYPES = (
(GRANT_AUTHORIZATION_CODE, _("Authorization code")),
(GRANT_PASSWORD, _("Resource owner password-based")),
)
description = models.TextField(
default='',
blank=True,
)
logo_data = models.TextField(
default='',
editable=False,
validators=[RegexValidator(DATA_URI_RE)],
)
organization = models.ForeignKey(
'Organization',
related_name='applications',
help_text=_('Organization containing this application.'),
on_delete=models.CASCADE,
null=True,
)
client_secret = OAuth2ClientSecretField(
max_length=1024,
blank=True,
default=generate_client_secret,
db_index=True,
help_text=_('Used for more stringent verification of access to an application when creating a token.'),
)
client_type = models.CharField(
max_length=32, choices=CLIENT_TYPES, help_text=_('Set to Public or Confidential depending on how secure the client device is.')
)
skip_authorization = models.BooleanField(default=False, help_text=_('Set True to skip authorization step for completely trusted applications.'))
authorization_grant_type = models.CharField(
max_length=32, choices=GRANT_TYPES, help_text=_('The Grant type the user must use for acquire tokens for this application.')
)
class OAuth2AccessToken(AbstractAccessToken):
class Meta:
app_label = 'main'
verbose_name = _('access token')
ordering = ('id',)
user = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
blank=True,
null=True,
related_name="%(app_label)s_%(class)s",
help_text=_('The user representing the token owner'),
)
description = models.TextField(
default='',
blank=True,
)
last_used = models.DateTimeField(
null=True,
default=None,
editable=False,
)
scope = models.TextField(
blank=True,
default='write',
help_text=_(
'Allowed scopes, further restricts user\'s permissions. Must be a simple space-separated string with allowed scopes [\'read\', \'write\'].'
),
)
modified = models.DateTimeField(editable=False, auto_now=True)
def is_valid(self, scopes=None):
valid = super(OAuth2AccessToken, self).is_valid(scopes)
if valid:
self.last_used = now()
def _update_last_used():
if OAuth2AccessToken.objects.filter(pk=self.pk).exists():
self.save(update_fields=['last_used'])
connection.on_commit(_update_last_used)
return valid
def save(self, *args, **kwargs):
super(OAuth2AccessToken, self).save(*args, **kwargs)

View File

@@ -39,7 +39,6 @@ from awx.main.models import (
Job,
JobHostSummary,
JobTemplate,
OAuth2AccessToken,
Organization,
Project,
Role,
@@ -54,7 +53,6 @@ from awx.main.models import (
WorkflowApprovalTemplate,
ROLE_SINGLETON_SYSTEM_ADMINISTRATOR,
)
from awx.main.constants import CENSOR_VALUE
from awx.main.utils import model_instance_diff, model_to_dict, camelcase_to_underscore, get_current_apps
from awx.main.utils import ignore_inventory_computed_fields, ignore_inventory_group_removal, _inventory_updates
from awx.main.tasks.system import update_inventory_computed_fields, handle_removed_image
@@ -400,8 +398,6 @@ def model_serializer_mapping():
models.WorkflowApproval: serializers.WorkflowApprovalActivityStreamSerializer,
models.WorkflowApprovalTemplate: serializers.WorkflowApprovalTemplateSerializer,
models.WorkflowJob: serializers.WorkflowJobSerializer,
models.OAuth2AccessToken: serializers.OAuth2TokenSerializer,
models.OAuth2Application: serializers.OAuth2ApplicationSerializer,
}
@@ -443,8 +439,6 @@ def activity_stream_create(sender, instance, created, **kwargs):
changes['labels'] = [label.name for label in instance.labels.iterator()]
if 'extra_vars' in changes:
changes['extra_vars'] = instance.display_extra_vars()
if type(instance) == OAuth2AccessToken:
changes['token'] = CENSOR_VALUE
activity_entry = get_activity_stream_class()(operation='create', object1=object1, changes=json.dumps(changes), actor=get_current_user_or_none())
# TODO: Weird situation where cascade SETNULL doesn't work
# it might actually be a good idea to remove all of these FK references since
@@ -506,8 +500,6 @@ def activity_stream_delete(sender, instance, **kwargs):
return
changes.update(model_to_dict(instance, model_serializer_mapping()))
object1 = camelcase_to_underscore(instance.__class__.__name__)
if type(instance) == OAuth2AccessToken:
changes['token'] = CENSOR_VALUE
activity_entry = get_activity_stream_class()(operation='delete', changes=json.dumps(changes), object1=object1, actor=get_current_user_or_none())
activity_entry.save()
connection.on_commit(lambda: emit_activity_stream_change(activity_entry))
@@ -669,13 +661,3 @@ def save_user_session_membership(sender, **kwargs):
membership.delete()
if len(expired):
consumers.emit_channel_notification('control-limit_reached_{}'.format(user_id), dict(group_name='control', reason='limit_reached'))
@receiver(post_save, sender=OAuth2AccessToken)
def create_access_token_user_if_missing(sender, **kwargs):
obj = kwargs['instance']
if obj.application and obj.application.user:
obj.user = obj.application.user
post_save.disconnect(create_access_token_user_if_missing, sender=OAuth2AccessToken)
obj.save()
post_save.connect(create_access_token_user_if_missing, sender=OAuth2AccessToken)

View File

@@ -1,296 +0,0 @@
import base64
import json
import time
import pytest
from django.db import connection
from django.test.utils import override_settings
from django.utils.encoding import smart_str, smart_bytes
from rest_framework.reverse import reverse as drf_reverse
from awx.main.utils.encryption import decrypt_value, get_encryption_key
from awx.api.versioning import reverse
from awx.main.models.oauth import OAuth2Application as Application, OAuth2AccessToken as AccessToken
from oauth2_provider.models import RefreshToken
@pytest.mark.django_db
def test_personal_access_token_creation(oauth_application, post, alice):
url = drf_reverse('api:oauth_authorization_root_view') + 'token/'
resp = post(
url,
data='grant_type=password&username=alice&password=alice&scope=read',
content_type='application/x-www-form-urlencoded',
HTTP_AUTHORIZATION='Basic ' + smart_str(base64.b64encode(smart_bytes(':'.join([oauth_application.client_id, oauth_application.client_secret])))),
)
resp_json = smart_str(resp._container[0])
assert 'access_token' in resp_json
assert 'scope' in resp_json
assert 'refresh_token' in resp_json
@pytest.mark.django_db
def test_pat_creation_no_default_scope(oauth_application, post, admin):
# tests that the default scope is overriden
url = reverse('api:o_auth2_token_list')
response = post(
url,
{
'description': 'test token',
'scope': 'read',
'application': oauth_application.pk,
},
admin,
)
assert response.data['scope'] == 'read'
@pytest.mark.django_db
def test_pat_creation_no_scope(oauth_application, post, admin):
url = reverse('api:o_auth2_token_list')
response = post(
url,
{
'description': 'test token',
'application': oauth_application.pk,
},
admin,
)
assert response.data['scope'] == 'write'
@pytest.mark.django_db
def test_oauth2_application_create(admin, organization, post):
response = post(
reverse('api:o_auth2_application_list'),
{
'name': 'test app',
'organization': organization.pk,
'client_type': 'confidential',
'authorization_grant_type': 'password',
},
admin,
expect=201,
)
assert 'modified' in response.data
assert 'updated' not in response.data
created_app = Application.objects.get(client_id=response.data['client_id'])
assert created_app.name == 'test app'
assert created_app.skip_authorization is False
assert created_app.redirect_uris == ''
assert created_app.client_type == 'confidential'
assert created_app.authorization_grant_type == 'password'
assert created_app.organization == organization
@pytest.mark.django_db
def test_oauth2_validator(admin, oauth_application, post):
post(
reverse('api:o_auth2_application_list'),
{
'name': 'Write App Token',
'application': oauth_application.pk,
'scope': 'Write',
},
admin,
expect=400,
)
@pytest.mark.django_db
def test_oauth_application_update(oauth_application, organization, patch, admin, alice):
patch(
reverse('api:o_auth2_application_detail', kwargs={'pk': oauth_application.pk}),
{
'name': 'Test app with immutable grant type and user',
'organization': organization.pk,
'redirect_uris': 'http://localhost/api/',
'authorization_grant_type': 'password',
'skip_authorization': True,
},
admin,
expect=200,
)
updated_app = Application.objects.get(client_id=oauth_application.client_id)
assert updated_app.name == 'Test app with immutable grant type and user'
assert updated_app.redirect_uris == 'http://localhost/api/'
assert updated_app.skip_authorization is True
assert updated_app.authorization_grant_type == 'password'
assert updated_app.organization == organization
@pytest.mark.django_db
def test_oauth_application_encryption(admin, organization, post):
response = post(
reverse('api:o_auth2_application_list'),
{
'name': 'test app',
'organization': organization.pk,
'client_type': 'confidential',
'authorization_grant_type': 'password',
},
admin,
expect=201,
)
pk = response.data.get('id')
secret = response.data.get('client_secret')
with connection.cursor() as cursor:
encrypted = cursor.execute('SELECT client_secret FROM main_oauth2application WHERE id={}'.format(pk)).fetchone()[0]
assert encrypted.startswith('$encrypted$')
assert decrypt_value(get_encryption_key('value', pk=None), encrypted) == secret
@pytest.mark.django_db
def test_oauth_token_create(oauth_application, get, post, admin):
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), {'scope': 'read'}, admin, expect=201)
assert 'modified' in response.data and response.data['modified'] is not None
assert 'updated' not in response.data
token = AccessToken.objects.get(token=response.data['token'])
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
assert token.application == oauth_application
assert refresh_token.application == oauth_application
assert token.user == admin
assert refresh_token.user == admin
assert refresh_token.access_token == token
assert token.scope == 'read'
response = get(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), admin, expect=200)
assert response.data['count'] == 1
response = get(reverse('api:o_auth2_application_detail', kwargs={'pk': oauth_application.pk}), admin, expect=200)
assert response.data['summary_fields']['tokens']['count'] == 1
assert response.data['summary_fields']['tokens']['results'][0] == {'id': token.pk, 'scope': token.scope, 'token': '************'}
response = post(reverse('api:o_auth2_token_list'), {'scope': 'read', 'application': oauth_application.pk}, admin, expect=201)
assert response.data['refresh_token']
response = post(
reverse('api:user_authorized_token_list', kwargs={'pk': admin.pk}), {'scope': 'read', 'application': oauth_application.pk}, admin, expect=201
)
assert response.data['refresh_token']
response = post(reverse('api:application_o_auth2_token_list', kwargs={'pk': oauth_application.pk}), {'scope': 'read'}, admin, expect=201)
assert response.data['refresh_token']
@pytest.mark.django_db
def test_oauth_token_update(oauth_application, post, patch, admin):
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), {'scope': 'read'}, admin, expect=201)
token = AccessToken.objects.get(token=response.data['token'])
patch(reverse('api:o_auth2_token_detail', kwargs={'pk': token.pk}), {'scope': 'write'}, admin, expect=200)
token = AccessToken.objects.get(token=token.token)
assert token.scope == 'write'
@pytest.mark.django_db
def test_oauth_token_delete(oauth_application, post, delete, get, admin):
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), {'scope': 'read'}, admin, expect=201)
token = AccessToken.objects.get(token=response.data['token'])
delete(reverse('api:o_auth2_token_detail', kwargs={'pk': token.pk}), admin, expect=204)
assert AccessToken.objects.count() == 0
assert RefreshToken.objects.count() == 1
response = get(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), admin, expect=200)
assert response.data['count'] == 0
response = get(reverse('api:o_auth2_application_detail', kwargs={'pk': oauth_application.pk}), admin, expect=200)
assert response.data['summary_fields']['tokens']['count'] == 0
@pytest.mark.django_db
def test_oauth_application_delete(oauth_application, post, delete, admin):
post(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), {'scope': 'read'}, admin, expect=201)
delete(reverse('api:o_auth2_application_detail', kwargs={'pk': oauth_application.pk}), admin, expect=204)
assert Application.objects.filter(client_id=oauth_application.client_id).count() == 0
assert RefreshToken.objects.filter(application=oauth_application).count() == 0
assert AccessToken.objects.filter(application=oauth_application).count() == 0
@pytest.mark.django_db
def test_oauth_list_user_tokens(oauth_application, post, get, admin, alice):
for user in (admin, alice):
url = reverse('api:o_auth2_token_list', kwargs={'pk': user.pk})
post(url, {'scope': 'read'}, user, expect=201)
response = get(url, admin, expect=200)
assert response.data['count'] == 1
@pytest.mark.django_db
def test_refresh_accesstoken(oauth_application, post, get, delete, admin):
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), {'scope': 'read'}, admin, expect=201)
assert AccessToken.objects.count() == 1
assert RefreshToken.objects.count() == 1
token = AccessToken.objects.get(token=response.data['token'])
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
refresh_url = drf_reverse('api:oauth_authorization_root_view') + 'token/'
response = post(
refresh_url,
data='grant_type=refresh_token&refresh_token=' + refresh_token.token,
content_type='application/x-www-form-urlencoded',
HTTP_AUTHORIZATION='Basic ' + smart_str(base64.b64encode(smart_bytes(':'.join([oauth_application.client_id, oauth_application.client_secret])))),
)
assert RefreshToken.objects.filter(token=refresh_token).exists()
original_refresh_token = RefreshToken.objects.get(token=refresh_token)
assert token not in AccessToken.objects.all()
assert AccessToken.objects.count() == 1
# the same RefreshToken remains but is marked revoked
assert RefreshToken.objects.count() == 2
new_token = json.loads(response._container[0])['access_token']
new_refresh_token = json.loads(response._container[0])['refresh_token']
assert AccessToken.objects.filter(token=new_token).count() == 1
# checks that RefreshTokens are rotated (new RefreshToken issued)
assert RefreshToken.objects.filter(token=new_refresh_token).count() == 1
assert original_refresh_token.revoked # is not None
@pytest.mark.django_db
def test_refresh_token_expiration_is_respected(oauth_application, post, get, delete, admin):
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), {'scope': 'read'}, admin, expect=201)
assert AccessToken.objects.count() == 1
assert RefreshToken.objects.count() == 1
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
refresh_url = drf_reverse('api:oauth_authorization_root_view') + 'token/'
short_lived = {'ACCESS_TOKEN_EXPIRE_SECONDS': 1, 'AUTHORIZATION_CODE_EXPIRE_SECONDS': 1, 'REFRESH_TOKEN_EXPIRE_SECONDS': 1}
time.sleep(1)
with override_settings(OAUTH2_PROVIDER=short_lived):
response = post(
refresh_url,
data='grant_type=refresh_token&refresh_token=' + refresh_token.token,
content_type='application/x-www-form-urlencoded',
HTTP_AUTHORIZATION='Basic ' + smart_str(base64.b64encode(smart_bytes(':'.join([oauth_application.client_id, oauth_application.client_secret])))),
)
assert response.status_code == 403
assert b'The refresh token has expired.' in response.content
assert RefreshToken.objects.filter(token=refresh_token).exists()
assert AccessToken.objects.count() == 1
assert RefreshToken.objects.count() == 1
@pytest.mark.django_db
def test_revoke_access_then_refreshtoken(oauth_application, post, get, delete, admin):
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), {'scope': 'read'}, admin, expect=201)
token = AccessToken.objects.get(token=response.data['token'])
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
assert AccessToken.objects.count() == 1
assert RefreshToken.objects.count() == 1
token.revoke()
assert AccessToken.objects.count() == 0
assert RefreshToken.objects.count() == 1
assert not refresh_token.revoked
refresh_token.revoke()
assert AccessToken.objects.count() == 0
assert RefreshToken.objects.count() == 1
@pytest.mark.django_db
def test_revoke_refreshtoken(oauth_application, post, get, delete, admin):
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': oauth_application.pk}), {'scope': 'read'}, admin, expect=201)
refresh_token = RefreshToken.objects.get(token=response.data['refresh_token'])
assert AccessToken.objects.count() == 1
assert RefreshToken.objects.count() == 1
refresh_token.revoke()
assert AccessToken.objects.count() == 0
# the same RefreshToken is recycled
new_refresh_token = RefreshToken.objects.all().first()
assert refresh_token == new_refresh_token
assert new_refresh_token.revoked

View File

@@ -1,44 +0,0 @@
# Python
import pytest
import string
import random
from io import StringIO
# Django
from django.contrib.auth.models import User
from django.core.management import call_command
from django.core.management.base import CommandError
# AWX
from awx.main.models.oauth import OAuth2AccessToken
@pytest.mark.django_db
@pytest.mark.inventory_import
class TestOAuth2CreateCommand:
def test_no_user_option(self):
out = StringIO()
with pytest.raises(CommandError) as excinfo:
call_command('create_oauth2_token', stdout=out)
assert 'Username not supplied.' in str(excinfo.value)
out.close()
def test_non_existing_user(self):
out = StringIO()
fake_username = ''
while fake_username == '' or User.objects.filter(username=fake_username).exists():
fake_username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
arg = '--user=' + fake_username
with pytest.raises(CommandError) as excinfo:
call_command('create_oauth2_token', arg, stdout=out)
assert 'The user does not exist.' in str(excinfo.value)
out.close()
def test_correct_user(self, alice):
out = StringIO()
arg = '--user=' + 'alice'
call_command('create_oauth2_token', arg, stdout=out)
generated_token = out.getvalue().strip()
assert OAuth2AccessToken.objects.filter(user=alice, token=generated_token).count() == 1
assert OAuth2AccessToken.objects.get(user=alice, token=generated_token).scope == 'write'
out.close()

View File

@@ -1,62 +0,0 @@
# Python
import datetime
import pytest
import string
import random
from io import StringIO
# Django
from django.core.management import call_command
from django.core.management.base import CommandError
# AWX
from awx.main.models import RefreshToken
from awx.main.models.oauth import OAuth2AccessToken
from awx.api.versioning import reverse
@pytest.mark.django_db
class TestOAuth2RevokeCommand:
def test_non_existing_user(self):
out = StringIO()
fake_username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
arg = '--user=' + fake_username
with pytest.raises(CommandError) as excinfo:
call_command('revoke_oauth2_tokens', arg, stdout=out)
assert 'A user with that username does not exist' in str(excinfo.value)
out.close()
def test_revoke_all_access_tokens(self, post, admin, alice):
url = reverse('api:o_auth2_token_list')
for user in (admin, alice):
post(url, {'description': 'test token', 'scope': 'read'}, user)
assert OAuth2AccessToken.objects.count() == 2
call_command('revoke_oauth2_tokens')
assert OAuth2AccessToken.objects.count() == 0
def test_revoke_access_token_for_user(self, post, admin, alice):
url = reverse('api:o_auth2_token_list')
post(url, {'description': 'test token', 'scope': 'read'}, alice)
assert OAuth2AccessToken.objects.count() == 1
call_command('revoke_oauth2_tokens', '--user=admin')
assert OAuth2AccessToken.objects.count() == 1
call_command('revoke_oauth2_tokens', '--user=alice')
assert OAuth2AccessToken.objects.count() == 0
def test_revoke_all_refresh_tokens(self, post, admin, oauth_application):
url = reverse('api:o_auth2_token_list')
post(url, {'description': 'test token for', 'scope': 'read', 'application': oauth_application.pk}, admin)
assert OAuth2AccessToken.objects.count() == 1
assert RefreshToken.objects.count() == 1
call_command('revoke_oauth2_tokens')
assert OAuth2AccessToken.objects.count() == 0
assert RefreshToken.objects.count() == 1
for r in RefreshToken.objects.all():
assert r.revoked is None
call_command('revoke_oauth2_tokens', '--all')
assert RefreshToken.objects.count() == 1
for r in RefreshToken.objects.all():
assert r.revoked is not None
assert isinstance(r.revoked, datetime.datetime)

View File

@@ -147,22 +147,6 @@ class TestKeyRegeneration:
with override_settings(SECRET_KEY=new_key):
assert json.loads(new_job.decrypted_extra_vars())['secret_key'] == 'donttell'
def test_oauth2_application_client_secret(self, oauth_application):
# test basic decryption
secret = oauth_application.client_secret
assert len(secret) == 128
# re-key the client_secret
new_key = regenerate_secret_key.Command().handle()
# verify that the old SECRET_KEY doesn't work
with pytest.raises(InvalidToken):
models.OAuth2Application.objects.get(pk=oauth_application.pk).client_secret
# verify that the new SECRET_KEY *does* work
with override_settings(SECRET_KEY=new_key):
assert models.OAuth2Application.objects.get(pk=oauth_application.pk).client_secret == secret
def test_use_custom_key_with_tower_secret_key_env_var(self):
custom_key = 'MXSq9uqcwezBOChl/UfmbW1k4op+bC+FQtwPqgJ1u9XV'
os.environ['TOWER_SECRET_KEY'] = custom_key

View File

@@ -44,7 +44,6 @@ from awx.main.models.events import (
)
from awx.main.models.workflow import WorkflowJobTemplate
from awx.main.models.ad_hoc_commands import AdHocCommand
from awx.main.models.oauth import OAuth2Application as Application
from awx.main.models.execution_environments import ExecutionEnvironment
from awx.main.utils import is_testing
@@ -812,11 +811,6 @@ def get_db_prep_save(self, value, connection, **kwargs):
return value
@pytest.fixture
def oauth_application(admin):
return Application.objects.create(name='test app', user=admin, client_type='confidential', authorization_grant_type='password')
class MockCopy:
events = []
index = -1

View File

@@ -1,247 +0,0 @@
import pytest
from awx.main.access import (
OAuth2ApplicationAccess,
OAuth2TokenAccess,
ActivityStreamAccess,
)
from awx.main.models.oauth import (
OAuth2Application as Application,
OAuth2AccessToken as AccessToken,
)
from awx.main.models import ActivityStream
from awx.api.versioning import reverse
@pytest.mark.django_db
class TestOAuth2Application:
@pytest.mark.parametrize(
"user_for_access, can_access_list",
[
(0, [True, True]),
(1, [True, True]),
(2, [True, True]),
(3, [False, False]),
],
)
def test_can_read(self, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization):
user_list = [admin, org_admin, org_member, alice]
access = OAuth2ApplicationAccess(user_list[user_for_access])
app_creation_user_list = [admin, org_admin]
for user, can_access in zip(app_creation_user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username),
user=user,
client_type='confidential',
authorization_grant_type='password',
organization=organization,
)
assert access.can_read(app) is can_access
def test_admin_only_can_read(self, user, organization):
user = user('org-admin', False)
organization.admin_role.members.add(user)
access = OAuth2ApplicationAccess(user)
app = Application.objects.create(
name='test app for {}'.format(user.username), user=user, client_type='confidential', authorization_grant_type='password', organization=organization
)
assert access.can_read(app) is True
def test_app_activity_stream(self, org_admin, alice, organization):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username),
user=org_admin,
client_type='confidential',
authorization_grant_type='password',
organization=organization,
)
access = OAuth2ApplicationAccess(org_admin)
assert access.can_read(app) is True
access = ActivityStreamAccess(org_admin)
activity_stream = ActivityStream.objects.filter(o_auth2_application=app).latest('pk')
assert access.can_read(activity_stream) is True
access = ActivityStreamAccess(alice)
assert access.can_read(app) is False
assert access.can_read(activity_stream) is False
def test_token_activity_stream(self, org_admin, alice, organization, post):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username),
user=org_admin,
client_type='confidential',
authorization_grant_type='password',
organization=organization,
)
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), {'scope': 'read'}, org_admin, expect=201)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2ApplicationAccess(org_admin)
assert access.can_read(app) is True
access = ActivityStreamAccess(org_admin)
activity_stream = ActivityStream.objects.filter(o_auth2_access_token=token).latest('pk')
assert access.can_read(activity_stream) is True
access = ActivityStreamAccess(alice)
assert access.can_read(token) is False
assert access.can_read(activity_stream) is False
def test_can_edit_delete_app_org_admin(self, admin, org_admin, org_member, alice, organization):
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, False, False]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username),
user=org_admin,
client_type='confidential',
authorization_grant_type='password',
organization=organization,
)
access = OAuth2ApplicationAccess(user)
assert access.can_change(app, {}) is can_access
assert access.can_delete(app) is can_access
def test_can_edit_delete_app_admin(self, admin, org_admin, org_member, alice, organization):
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, False, False]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username),
user=admin,
client_type='confidential',
authorization_grant_type='password',
organization=organization,
)
access = OAuth2ApplicationAccess(user)
assert access.can_change(app, {}) is can_access
assert access.can_delete(app) is can_access
def test_superuser_can_always_create(self, admin, org_admin, org_member, alice, organization):
access = OAuth2ApplicationAccess(admin)
for user in [admin, org_admin, org_member, alice]:
assert access.can_add(
{'name': 'test app', 'user': user.pk, 'client_type': 'confidential', 'authorization_grant_type': 'password', 'organization': organization.id}
)
def test_normal_user_cannot_create(self, admin, org_admin, org_member, alice, organization):
for access_user in [org_member, alice]:
access = OAuth2ApplicationAccess(access_user)
for user in [admin, org_admin, org_member, alice]:
assert not access.can_add(
{
'name': 'test app',
'user': user.pk,
'client_type': 'confidential',
'authorization_grant_type': 'password',
'organization': organization.id,
}
)
@pytest.mark.django_db
class TestOAuth2Token:
def test_can_read_change_delete_app_token(self, post, admin, org_admin, org_member, alice, organization):
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, False, False]
app = Application.objects.create(
name='test app for {}'.format(admin.username),
user=admin,
client_type='confidential',
authorization_grant_type='password',
organization=organization,
)
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), {'scope': 'read'}, admin, expect=201)
for user, can_access in zip(user_list, can_access_list):
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2TokenAccess(user)
assert access.can_read(token) is can_access
assert access.can_change(token, {}) is can_access
assert access.can_delete(token) is can_access
def test_auditor_can_read(self, post, admin, org_admin, org_member, alice, system_auditor, organization):
user_list = [admin, org_admin, org_member]
can_access_list = [True, True, True]
cannot_access_list = [False, False, False]
app = Application.objects.create(
name='test app for {}'.format(admin.username),
user=admin,
client_type='confidential',
authorization_grant_type='password',
organization=organization,
)
for user, can_access, cannot_access in zip(user_list, can_access_list, cannot_access_list):
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), {'scope': 'read'}, user, expect=201)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2TokenAccess(system_auditor)
assert access.can_read(token) is can_access
assert access.can_change(token, {}) is cannot_access
assert access.can_delete(token) is cannot_access
def test_user_auditor_can_change(self, post, org_member, org_admin, system_auditor, organization):
app = Application.objects.create(
name='test app for {}'.format(org_admin.username),
user=org_admin,
client_type='confidential',
authorization_grant_type='password',
organization=organization,
)
response = post(reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}), {'scope': 'read'}, org_member, expect=201)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2TokenAccess(system_auditor)
assert access.can_read(token) is True
assert access.can_change(token, {}) is False
assert access.can_delete(token) is False
dual_user = system_auditor
organization.admin_role.members.add(dual_user)
access = OAuth2TokenAccess(dual_user)
assert access.can_read(token) is True
assert access.can_change(token, {}) is True
assert access.can_delete(token) is True
def test_can_read_change_delete_personal_token_org_member(self, post, admin, org_admin, org_member, alice):
# Tests who can read a token created by an org-member
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, False, True, False]
response = post(reverse('api:user_personal_token_list', kwargs={'pk': org_member.pk}), {'scope': 'read'}, org_member, expect=201)
token = AccessToken.objects.get(token=response.data['token'])
for user, can_access in zip(user_list, can_access_list):
access = OAuth2TokenAccess(user)
assert access.can_read(token) is can_access
assert access.can_change(token, {}) is can_access
assert access.can_delete(token) is can_access
def test_can_read_personal_token_creator(self, post, admin, org_admin, org_member, alice):
# Tests the token's creator can read their tokens
user_list = [admin, org_admin, org_member, alice]
can_access_list = [True, True, True, True]
for user, can_access in zip(user_list, can_access_list):
response = post(reverse('api:user_personal_token_list', kwargs={'pk': user.pk}), {'scope': 'read', 'application': None}, user, expect=201)
token = AccessToken.objects.get(token=response.data['token'])
access = OAuth2TokenAccess(user)
assert access.can_read(token) is can_access
assert access.can_change(token, {}) is can_access
assert access.can_delete(token) is can_access
@pytest.mark.parametrize(
"user_for_access, can_access_list",
[
(0, [True, True]),
(1, [True, True]),
(2, [True, True]),
(3, [False, False]),
],
)
def test_can_create(self, post, admin, org_admin, org_member, alice, user_for_access, can_access_list, organization):
user_list = [admin, org_admin, org_member, alice]
for user, can_access in zip(user_list, can_access_list):
app = Application.objects.create(
name='test app for {}'.format(user.username),
user=user,
client_type='confidential',
authorization_grant_type='password',
organization=organization,
)
post(
reverse('api:o_auth2_application_token_list', kwargs={'pk': app.pk}),
{'scope': 'read'},
user_list[user_for_access],
expect=201 if can_access else 403,
)

View File

@@ -1,8 +0,0 @@
import pytest
from awx.api.serializers import OAuth2TokenSerializer
@pytest.mark.parametrize('scope, expect', [('', False), ('read', True), ('read read', False), ('write read', True), ('read rainbow', False)])
def test_invalid_scopes(scope, expect):
assert OAuth2TokenSerializer()._is_valid_scope(scope) is expect

View File

@@ -9,7 +9,6 @@ from ansible_base.rest_filters.rest_framework.field_lookup_backend import FieldL
from awx.main.models import (
AdHocCommand,
ActivityStream,
Job,
JobTemplate,
SystemJob,
@@ -19,7 +18,6 @@ from awx.main.models import (
WorkflowJobTemplate,
WorkflowJobOptions,
)
from awx.main.models.oauth import OAuth2Application
from awx.main.models.jobs import JobOptions
@@ -28,7 +26,6 @@ from awx.main.models.jobs import JobOptions
[
(User, 'password__icontains'),
(User, 'settings__value__icontains'),
(User, 'main_oauth2accesstoken__token__gt'),
(UnifiedJob, 'job_args__icontains'),
(UnifiedJob, 'job_env__icontains'),
(UnifiedJob, 'start_args__icontains'),
@@ -40,8 +37,6 @@ from awx.main.models.jobs import JobOptions
(WorkflowJob, 'survey_passwords__icontains'),
(JobTemplate, 'survey_spec__icontains'),
(WorkflowJobTemplate, 'survey_spec__icontains'),
(ActivityStream, 'o_auth2_application__client_secret__gt'),
(OAuth2Application, 'grant__code__gt'),
],
)
def test_filter_sensitive_fields_and_relations(model, query):

View File

@@ -361,7 +361,7 @@ def get_allowed_fields(obj, serializer_mapping):
else:
allowed_fields = [x.name for x in obj._meta.fields]
ACTIVITY_STREAM_FIELD_EXCLUSIONS = {'user': ['last_login'], 'oauth2accesstoken': ['last_used'], 'oauth2application': ['client_secret']}
ACTIVITY_STREAM_FIELD_EXCLUSIONS = {'user': ['last_login']}
model_name = obj._meta.model_name
fields_excluded = ACTIVITY_STREAM_FIELD_EXCLUSIONS.get(model_name, [])
# see definition of from_db for CredentialType