mirror of
https://github.com/ansible/awx.git
synced 2026-01-14 19:30:39 -03:30
Added awx-manage command for expiring sessions.
This commit is contained in:
parent
b3b38777c1
commit
d8cb47bf82
36
awx/main/management/commands/expire_sessions.py
Normal file
36
awx/main/management/commands/expire_sessions.py
Normal file
@ -0,0 +1,36 @@
|
||||
# Python
|
||||
from importlib import import_module
|
||||
|
||||
# Django
|
||||
from django.utils import timezone
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import logout
|
||||
from django.http import HttpRequest
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.sessions.models import Session
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
"""Expire Django auth sessions for a user/all users"""
|
||||
help='Expire Django auth sessions. Will expire all auth sessions if --user option is not supplied.'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('--user', dest='user', type=str)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
# Try to see if the user exist
|
||||
try:
|
||||
user = User.objects.get(username=options['user']) if options['user'] else None
|
||||
except ObjectDoesNotExist:
|
||||
raise CommandError('The user does not exist.')
|
||||
# We use the following hack to filter out sessions that are still active,
|
||||
# with consideration for timezones.
|
||||
start = timezone.now()
|
||||
sessions = Session.objects.filter(expire_date__gte=start).iterator()
|
||||
request = HttpRequest()
|
||||
for session in sessions:
|
||||
if (user is None) or (user.id == int(session.get_decoded().get('_auth_user_id'))):
|
||||
request.session = request.session = import_module(settings.SESSION_ENGINE).SessionStore(session.session_key)
|
||||
logout(request)
|
||||
67
awx/main/tests/functional/commands/test_expire_sessions.py
Normal file
67
awx/main/tests/functional/commands/test_expire_sessions.py
Normal file
@ -0,0 +1,67 @@
|
||||
# Python
|
||||
import pytest
|
||||
import string
|
||||
import random
|
||||
|
||||
# Django
|
||||
from django.utils import timezone
|
||||
from django.test import Client
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.sessions.models import Session
|
||||
from django.core.management.base import CommandError
|
||||
|
||||
# AWX
|
||||
from awx.main.management.commands.expire_sessions import Command
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
class TestExpireSessionsCommand:
|
||||
@staticmethod
|
||||
def create_and_login_fake_users():
|
||||
# We already have Alice and Bob, so we are going to create Charlie and Dylan
|
||||
charlie = User.objects.create_user('charlie', 'charlie@email.com', 'pass')
|
||||
dylan = User.objects.create_user('dylan', 'dylan@email.com', 'word')
|
||||
client_0 = Client()
|
||||
client_1 = Client()
|
||||
client_0.force_login(charlie, backend=settings.AUTHENTICATION_BACKENDS[0])
|
||||
client_1.force_login(dylan, backend=settings.AUTHENTICATION_BACKENDS[0])
|
||||
return charlie, dylan
|
||||
|
||||
@staticmethod
|
||||
def run_command(username=None):
|
||||
command_obj = Command()
|
||||
command_obj.handle(user=username)
|
||||
|
||||
def test_expire_all_sessions(self):
|
||||
charlie, dylan = self.create_and_login_fake_users()
|
||||
self.run_command()
|
||||
start = timezone.now()
|
||||
sessions = Session.objects.filter(expire_date__gte=start)
|
||||
for session in sessions:
|
||||
user_id = int(session.get_decoded().get('_auth_user_id'))
|
||||
if user_id == charlie.id or user_id == dylan.id:
|
||||
self.fail('The user should not have active sessions.')
|
||||
|
||||
def test_non_existing_user(self):
|
||||
fake_username = ''
|
||||
while fake_username == '' or User.objects.filter(username=fake_username).exists():
|
||||
fake_username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6))
|
||||
with pytest.raises(CommandError) as excinfo:
|
||||
self.run_command(fake_username)
|
||||
assert excinfo.value.message.strip() == 'The user does not exist.'
|
||||
|
||||
def test_expire_one_user(self):
|
||||
# alice should be logged out, but bob should not.
|
||||
charlie, dylan = self.create_and_login_fake_users()
|
||||
self.run_command('charlie')
|
||||
start = timezone.now()
|
||||
sessions = Session.objects.filter(expire_date__gte=start)
|
||||
dylan_still_active = False
|
||||
for session in sessions:
|
||||
user_id = int(session.get_decoded().get('_auth_user_id'))
|
||||
if user_id == charlie.id:
|
||||
self.fail('Charlie should not have active sessions.')
|
||||
elif user_id == dylan.id:
|
||||
dylan_still_active = True
|
||||
assert dylan_still_active
|
||||
Loading…
x
Reference in New Issue
Block a user