# ---------------------------------------------------------------------------
# Reconstructed from a whitespace-mangled `git format-patch` email
# (commit 62987196, "add speedup support to event replay and stats").
# The patch touches two files; both post-patch files are reconstructed below.
# ---------------------------------------------------------------------------

# === file: awx/main/management/commands/replay_job_events.py ===============

# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.

import sys
import time
import json
from optparse import make_option

from django.utils import timezone
from django.core.management.base import NoArgsCommand

from awx.main.models import (
    UnifiedJob,
    Job,
    AdHocCommand,
)
# NOTE(review): the import of emit_channel_notification is outside the visible
# patch hunk; it is preserved from the pre-patch file — confirm the exact
# module path (presumably awx.main.consumers) against the repository.
from awx.main.consumers import emit_channel_notification
from awx.api.serializers import (
    JobEventWebSocketSerializer,
    AdHocCommandEventWebSocketSerializer,
)


class ReplayJobEvents():
    """Replay a job's stored events over websockets, pacing them so the
    gaps between events match the original recording (optionally scaled
    by a speedup factor), and collect timing statistics."""

    # Timestamp of the first recorded event (set by start()).
    recording_start = None
    # Wall-clock time at which the replay began (set by start()).
    replay_start = None

    def now(self):
        """Return the current time; isolated so tests can stub it."""
        return timezone.now()

    def start(self, first_event_created):
        """Anchor the recording timeline and the replay timeline."""
        self.recording_start = first_event_created
        self.replay_start = self.now()

    def lateness(self, now, created):
        """Seconds the replay is behind the recording at this point."""
        time_passed = now - self.recording_start
        job_event_time = created - self.replay_start
        return (time_passed - job_event_time).total_seconds()

    def get_job(self, job_id):
        """Look up the UnifiedJob and return its concrete subclass instance.

        Exits the process with status 1 if the job does not exist (this is
        a management command, not library code).
        """
        try:
            unified_job = UnifiedJob.objects.get(id=job_id)
        except UnifiedJob.DoesNotExist:
            print("UnifiedJob {} not found.".format(job_id))
            sys.exit(1)
        return unified_job.get_real_instance()

    def sleep(self, seconds):
        """Sleep wrapper; isolated so tests can stub it."""
        time.sleep(seconds)

    def replay_elapsed(self):
        """timedelta of wall-clock time since the replay started."""
        return (self.now() - self.replay_start)

    def recording_elapsed(self, created):
        """timedelta from the first recorded event to `created`."""
        return (created - self.recording_start)

    def replay_offset(self, created, speed):
        """How many seconds the replay is ahead (negative) or behind
        (positive) of where the (speed-scaled) recording says we should be."""
        return self.replay_elapsed().total_seconds() - \
            (self.recording_elapsed(created).total_seconds() * (1.0 / speed))

    def get_job_events(self, job):
        """Return the job's events ordered by creation time.

        Raises RuntimeError if the job has no events to replay.
        """
        job_events = job.job_events.order_by('created')
        if job_events.count() == 0:
            raise RuntimeError("No events for job id {}".format(job.id))
        return job_events

    def get_serializer(self, job):
        """Pick the websocket serializer class for this job type.

        Raises RuntimeError for unsupported job types.
        """
        if type(job) is Job:
            return JobEventWebSocketSerializer
        elif type(job) is AdHocCommand:
            return AdHocCommandEventWebSocketSerializer
        else:
            # FIX: the original also had `sys.exit(1)` after this raise,
            # which was unreachable dead code; the caller handles the raise.
            raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job)))

    def run(self, job_id, speed=1.0, verbosity=0):
        """Replay all events for `job_id`.

        :param job_id: id of the Job/AdHocCommand to replay.
        :param speed: speedup factor; 2.0 replays twice as fast.
        :param verbosity: >=2 prints the stats JSON, >=3 traces each event.
        """
        stats = {
            'events_ontime': {
                'total': 0,
                'percentage': 0,
            },
            'events_late': {
                'total': 0,
                'percentage': 0,
                'lateness_total': 0,
                'lateness_average': 0,
            },
            'events_total': 0,
            'events_distance_total': 0,
            'events_distance_average': 0,
            'recording_start': 0,
            'recording_end': 0,
            'recording_duration': 0,
            'replay_start': 0,
            'replay_end': 0,
            'replay_duration': 0,
        }
        try:
            job = self.get_job(job_id)
            job_events = self.get_job_events(job)
            serializer = self.get_serializer(job)
        except RuntimeError as e:
            # FIX: original used `e.message`, which is Python-2-only
            # (removed in Python 3); str(e) works in both.
            print("{}".format(e))
            sys.exit(1)

        je_previous = None
        for je_current in job_events:
            if not je_previous:
                # First event anchors both timelines.
                stats['recording_start'] = je_current.created
                self.start(je_current.created)
                stats['replay_start'] = self.replay_start
                je_previous = je_current

            je_serialized = serializer(je_current).data
            emit_channel_notification('{}-{}'.format(je_serialized['group_name'], job.id), je_serialized)

            replay_offset = self.replay_offset(je_previous.created, speed)
            # Speed-scaled gap between this event and the previous one.
            recording_diff = (je_current.created - je_previous.created).total_seconds() * (1.0 / speed)
            stats['events_distance_total'] += recording_diff
            if verbosity >= 3:
                print("recording: next job in {} seconds".format(recording_diff))
            if replay_offset >= 0:
                replay_diff = recording_diff - replay_offset

                if replay_diff > 0:
                    stats['events_ontime']['total'] += 1
                    if verbosity >= 3:
                        print("\treplay: sleep for {} seconds".format(replay_diff))
                    self.sleep(replay_diff)
                else:
                    stats['events_late']['total'] += 1
                    stats['events_late']['lateness_total'] += (replay_diff * -1)
                    if verbosity >= 3:
                        print("\treplay: too far behind to sleep {} seconds".format(replay_diff))
            else:
                replay_offset = self.replay_offset(je_current.created, speed)
                stats['events_late']['lateness_total'] += (replay_offset * -1)
                stats['events_late']['total'] += 1
                if verbosity >= 3:
                    print("\treplay: behind by {} seconds".format(replay_offset))

            stats['events_total'] += 1
            je_previous = je_current

        stats['replay_end'] = self.now()
        stats['replay_duration'] = (stats['replay_end'] - stats['replay_start']).total_seconds()
        stats['replay_start'] = stats['replay_start'].isoformat()
        stats['replay_end'] = stats['replay_end'].isoformat()

        stats['recording_end'] = je_current.created
        stats['recording_duration'] = (stats['recording_end'] - stats['recording_start']).total_seconds()
        stats['recording_start'] = stats['recording_start'].isoformat()
        stats['recording_end'] = stats['recording_end'].isoformat()

        # events_total is >= 1 here: get_job_events() raised if it was empty.
        stats['events_ontime']['percentage'] = (stats['events_ontime']['total'] / float(stats['events_total'])) * 100.00
        stats['events_late']['percentage'] = (stats['events_late']['total'] / float(stats['events_total'])) * 100.00
        stats['events_distance_average'] = stats['events_distance_total'] / stats['events_total']
        # FIX: original divided unconditionally, raising ZeroDivisionError
        # whenever no event was late (common at high --speed).
        if stats['events_late']['total'] > 0:
            stats['events_late']['lateness_average'] = \
                stats['events_late']['lateness_total'] / stats['events_late']['total']
        if verbosity >= 2:
            print(json.dumps(stats, indent=4, sort_keys=True))


class Command(NoArgsCommand):

    help = 'Replay job events over websockets ordered by created on date.'

    option_list = NoArgsCommand.option_list + (
        make_option('--job_id', dest='job_id', type='int', metavar='j',
                    help='Id of the job to replay (job or adhoc)'),
        make_option('--speed', dest='speed', type='int', metavar='s',
                    help='Speedup factor.'),
    )

    def handle_noargs(self, **options):
        job_id = options.get('job_id')
        speed = options.get('speed') or 1
        verbosity = options.get('verbosity') or 0

        replayer = ReplayJobEvents()
        replayer.run(job_id, speed, verbosity)


# === file: awx/main/tests/unit/commands/test_replay_job_events.py ==========

# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved

# Python
import pytest
import mock
from datetime import timedelta

# Django
from django.utils import timezone

# AWX
from awx.main.models import (
    Job,
    JobEvent,
)
from awx.main.management.commands.replay_job_events import (
    ReplayJobEvents,
)


class TestReplayJobEvents():

    @pytest.fixture
    def epoch(self):
        return timezone.now()

    @pytest.fixture
    def job_events(self, epoch):
        # Gaps: 10s, 10s, 10s, 1s, 1ms, 1us — exercises sub-second sleeps.
        return [
            JobEvent(created=epoch),
            JobEvent(created=epoch + timedelta(seconds=10)),
            JobEvent(created=epoch + timedelta(seconds=20)),
            JobEvent(created=epoch + timedelta(seconds=30)),
            JobEvent(created=epoch + timedelta(seconds=31)),
            JobEvent(created=epoch + timedelta(seconds=31, milliseconds=1)),
            JobEvent(created=epoch + timedelta(seconds=31, microseconds=1, milliseconds=1)),
        ]

    @pytest.fixture
    def mock_serializer_fn(self):
        class MockSerializer():
            data = dict()

        def fn(job_event):
            serialized = MockSerializer()
            serialized.data['group_name'] = 'foobar'
            return serialized
        return fn

    @pytest.fixture
    def replayer(self, mocker, job_events, mock_serializer_fn):
        r = ReplayJobEvents()
        # NOTE(review): these lambdas replace bound methods, so their single
        # parameter receives the `job` argument — the original named it
        # `self`, which was misleading; behavior is unchanged.
        r.get_serializer = lambda _job: mock_serializer_fn
        r.get_job = mocker.MagicMock(return_value=Job(id=3))
        r.sleep = mocker.MagicMock()
        r.get_job_events = lambda _job: job_events
        r.replay_offset = lambda *args, **kwargs: 0
        return r

    @mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)
    def test_sleep(self, mocker, replayer):
        replayer.run(3, 1)

        assert replayer.sleep.call_count == 6
        replayer.sleep.assert_has_calls([
            mock.call(10.0),
            mock.call(10.0),
            mock.call(10.0),
            mock.call(1.0),
            mock.call(0.001),
            mock.call(0.000001),
        ])

    @mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)
    def test_speed(self, mocker, replayer):
        replayer.run(3, 2)

        assert replayer.sleep.call_count == 6
        replayer.sleep.assert_has_calls([
            mock.call(5.0),
            mock.call(5.0),
            mock.call(5.0),
            mock.call(0.5),
            mock.call(0.0005),
            mock.call(0.0000005),
        ])

    # TODO: Test replay_offset()
    # TODO: Test stat generation