From d8cb47bf82d6f001493d80da1ba2cb4cad9b40e8 Mon Sep 17 00:00:00 2001 From: Yunfan Zhang Date: Mon, 4 Jun 2018 16:03:14 -0400 Subject: [PATCH] Added awx-manage command for expiring sessions. --- .../management/commands/expire_sessions.py | 36 ++++++++++ .../commands/test_expire_sessions.py | 67 +++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 awx/main/management/commands/expire_sessions.py create mode 100644 awx/main/tests/functional/commands/test_expire_sessions.py diff --git a/awx/main/management/commands/expire_sessions.py b/awx/main/management/commands/expire_sessions.py new file mode 100644 index 0000000000..7145c1d725 --- /dev/null +++ b/awx/main/management/commands/expire_sessions.py @@ -0,0 +1,36 @@ +# Python +from importlib import import_module + +# Django +from django.utils import timezone +from django.conf import settings +from django.contrib.auth import logout +from django.http import HttpRequest +from django.core.management.base import BaseCommand, CommandError +from django.contrib.auth.models import User +from django.contrib.sessions.models import Session +from django.core.exceptions import ObjectDoesNotExist + + +class Command(BaseCommand): + """Expire Django auth sessions for a user/all users""" + help='Expire Django auth sessions. Will expire all auth sessions if --user option is not supplied.' + + def add_arguments(self, parser): + parser.add_argument('--user', dest='user', type=str) + + def handle(self, *args, **options): + # Try to see if the user exist + try: + user = User.objects.get(username=options['user']) if options['user'] else None + except ObjectDoesNotExist: + raise CommandError('The user does not exist.') + # We use the following hack to filter out sessions that are still active, + # with consideration for timezones. + start = timezone.now() + sessions = Session.objects.filter(expire_date__gte=start).iterator() + request = HttpRequest() + for session in sessions: + if (user is None) or (user.id == int(session.get_decoded().get('_auth_user_id'))): + request.session = request.session = import_module(settings.SESSION_ENGINE).SessionStore(session.session_key) + logout(request) diff --git a/awx/main/tests/functional/commands/test_expire_sessions.py b/awx/main/tests/functional/commands/test_expire_sessions.py new file mode 100644 index 0000000000..91e811bdcf --- /dev/null +++ b/awx/main/tests/functional/commands/test_expire_sessions.py @@ -0,0 +1,67 @@ +# Python +import pytest +import string +import random + +# Django +from django.utils import timezone +from django.test import Client +from django.conf import settings +from django.contrib.auth.models import User +from django.contrib.sessions.models import Session +from django.core.management.base import CommandError + +# AWX +from awx.main.management.commands.expire_sessions import Command + + +@pytest.mark.django_db +class TestExpireSessionsCommand: + @staticmethod + def create_and_login_fake_users(): + # We already have Alice and Bob, so we are going to create Charlie and Dylan + charlie = User.objects.create_user('charlie', 'charlie@email.com', 'pass') + dylan = User.objects.create_user('dylan', 'dylan@email.com', 'word') + client_0 = Client() + client_1 = Client() + client_0.force_login(charlie, backend=settings.AUTHENTICATION_BACKENDS[0]) + client_1.force_login(dylan, backend=settings.AUTHENTICATION_BACKENDS[0]) + return charlie, dylan + + @staticmethod + def run_command(username=None): + command_obj = Command() + command_obj.handle(user=username) + + def test_expire_all_sessions(self): + charlie, dylan = self.create_and_login_fake_users() + self.run_command() + start = timezone.now() + sessions = Session.objects.filter(expire_date__gte=start) + for session in sessions: + user_id = int(session.get_decoded().get('_auth_user_id')) + if user_id == charlie.id or user_id == dylan.id: + self.fail('The user should not have active sessions.') + + def test_non_existing_user(self): + fake_username = '' + while fake_username == '' or User.objects.filter(username=fake_username).exists(): + fake_username = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(6)) + with pytest.raises(CommandError) as excinfo: + self.run_command(fake_username) + assert excinfo.value.message.strip() == 'The user does not exist.' + + def test_expire_one_user(self): + # alice should be logged out, but bob should not. + charlie, dylan = self.create_and_login_fake_users() + self.run_command('charlie') + start = timezone.now() + sessions = Session.objects.filter(expire_date__gte=start) + dylan_still_active = False + for session in sessions: + user_id = int(session.get_decoded().get('_auth_user_id')) + if user_id == charlie.id: + self.fail('Charlie should not have active sessions.') + elif user_id == dylan.id: + dylan_still_active = True + assert dylan_still_active