class JobEventWebSocketSerializer(JobEventSerializer):
    """Serialize a JobEvent for emission over the websocket layer.

    Extends JobEventSerializer with the extra bookkeeping fields channel
    consumers expect on every message: ISO-8601 string timestamps, the raw
    event name, and the channel group the message belongs to.
    """
    created = serializers.SerializerMethodField()
    modified = serializers.SerializerMethodField()
    event_name = serializers.CharField(source='event')
    group_name = serializers.SerializerMethodField()

    class Meta:
        model = JobEvent
        # '*' expands to the parent serializer's fields (AWX BaseSerializer
        # convention); the two extras are appended for websocket consumers.
        fields = ('*', 'event_name', 'group_name',)

    def get_created(self, obj):
        # datetimes are not JSON serializable over channels; emit ISO-8601.
        return obj.created.isoformat()

    def get_modified(self, obj):
        return obj.modified.isoformat()

    def get_group_name(self, obj):
        # Constant: the channel group prefix used by emit_channel_notification.
        return 'job_events'


class AdHocCommandEventWebSocketSerializer(AdHocCommandEventSerializer):
    """Serialize an AdHocCommandEvent for emission over the websocket layer.

    Mirrors JobEventWebSocketSerializer: ISO-8601 string timestamps, raw
    event name, and the channel group name.
    """
    created = serializers.SerializerMethodField()
    modified = serializers.SerializerMethodField()
    event_name = serializers.CharField(source='event')
    group_name = serializers.SerializerMethodField()

    class Meta:
        model = AdHocCommandEvent
        fields = ('*', 'event_name', 'group_name',)

    def get_created(self, obj):
        # datetimes are not JSON serializable over channels; emit ISO-8601.
        return obj.created.isoformat()

    def get_modified(self, obj):
        return obj.modified.isoformat()

    def get_group_name(self, obj):
        # Constant: the channel group prefix used by emit_channel_notification.
        return 'ad_hoc_command_events'
class ReplayJobEvents():
    """Replay a job's stored events over channels in (scaled) real time.

    Events are emitted in ``created`` order; between events the replayer
    sleeps so the cadence of the original recording is reproduced, optionally
    sped up/slowed down by a ``speed`` factor.
    """

    # ``created`` timestamp of the first recorded event.
    recording_start = None
    # Wallclock time at which the replay began.
    replay_start = None

    def now(self):
        # Seam for tests; returns an aware datetime.
        return timezone.now()

    def start(self, first_event_created):
        """Anchor the recording and replay timelines to each other."""
        self.recording_start = first_event_created
        self.replay_start = self.now()

    def lateness(self, now, created):
        """Return seconds the replay is running behind the recording.

        FIX: the original paired ``now`` with ``recording_start`` and
        ``created`` with ``replay_start`` (operands swapped), making the
        result off by twice the gap between recording and replay epochs.
        """
        time_passed = now - self.replay_start
        job_event_time = created - self.recording_start
        return (time_passed - job_event_time).total_seconds()

    def get_job(self, job_id):
        """Return the concrete (Job, AdHocCommand, ...) instance for an id.

        Exits the process with status 1 when no such unified job exists.
        """
        try:
            unified_job = UnifiedJob.objects.get(id=job_id)
        except UnifiedJob.DoesNotExist:
            print("UnifiedJob {} not found.".format(job_id))
            sys.exit(1)

        return unified_job.get_real_instance()

    def sleep(self, seconds):
        # Seam for tests.
        time.sleep(seconds)

    def replay_elapsed(self):
        """timedelta since the replay started."""
        return (self.now() - self.replay_start)

    def recording_elapsed(self, created):
        """timedelta between ``created`` and the first recorded event."""
        return (created - self.recording_start)

    def replay_offset(self, created, speed):
        """Seconds the replay is behind (>0) or ahead (<0) of schedule."""
        return self.replay_elapsed().total_seconds() - (self.recording_elapsed(created).total_seconds() * (1.0 / speed))

    def get_job_events(self, job):
        """Return the job's events ordered by creation time.

        Raises RuntimeError when the job has no events (nothing to replay).
        """
        job_events = job.job_events.order_by('created')
        if job_events.count() == 0:
            raise RuntimeError("No events for job id {}".format(job.id))
        return job_events

    def get_serializer(self, job):
        """Pick the websocket serializer class matching the job type."""
        if type(job) is Job:
            return JobEventWebSocketSerializer
        elif type(job) is AdHocCommand:
            return AdHocCommandEventWebSocketSerializer
        else:
            # FIX: dropped the unreachable ``sys.exit(1)`` that followed
            # this raise in the original.
            raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job)))

    def run(self, job_id, speed=1.0, verbosity=0):
        """Emit every event of ``job_id`` over channels, paced to mirror the
        original recording scaled by ``speed``; print timing statistics when
        ``verbosity`` >= 2 and per-event pacing details when >= 3."""
        stats = {
            'events_ontime': {
                'total': 0,
                'percentage': 0,
            },
            'events_late': {
                'total': 0,
                'percentage': 0,
                'lateness_total': 0,
                'lateness_average': 0,
            },
            'events_total': 0,
            'events_distance_total': 0,
            'events_distance_average': 0,
            'recording_start': 0,
            'recording_end': 0,
            'recording_duration': 0,
            'replay_start': 0,
            'replay_end': 0,
            'replay_duration': 0,
        }
        try:
            job = self.get_job(job_id)
            job_events = self.get_job_events(job)
            serializer = self.get_serializer(job)
        except RuntimeError as e:
            # FIX: ``e.message`` does not exist on Python 3; use str(e).
            print("{}".format(str(e)))
            sys.exit(1)

        je_previous = None
        for je_current in job_events:
            if not je_previous:
                # First event anchors both timelines; it is its own previous
                # event so the first recording_diff is 0.
                stats['recording_start'] = je_current.created
                self.start(je_current.created)
                stats['replay_start'] = self.replay_start
                je_previous = je_current

            je_serialized = serializer(je_current).data
            emit_channel_notification('{}-{}'.format(je_serialized['group_name'], job.id), je_serialized)

            replay_offset = self.replay_offset(je_previous.created, speed)
            # Gap between this event and the previous one, scaled by speed.
            recording_diff = (je_current.created - je_previous.created).total_seconds() * (1.0 / speed)
            stats['events_distance_total'] += recording_diff
            if verbosity >= 3:
                print("recording: next job in {} seconds".format(recording_diff))
            if replay_offset >= 0:
                replay_diff = recording_diff - replay_offset

                if replay_diff > 0:
                    stats['events_ontime']['total'] += 1
                    if verbosity >= 3:
                        print("\treplay: sleep for {} seconds".format(replay_diff))
                    self.sleep(replay_diff)
                else:
                    stats['events_late']['total'] += 1
                    stats['events_late']['lateness_total'] += (replay_diff * -1)
                    if verbosity >= 3:
                        print("\treplay: too far behind to sleep {} seconds".format(replay_diff))
            else:
                replay_offset = self.replay_offset(je_current.created, speed)
                stats['events_late']['lateness_total'] += (replay_offset * -1)
                stats['events_late']['total'] += 1
                if verbosity >= 3:
                    print("\treplay: behind by {} seconds".format(replay_offset))

            stats['events_total'] += 1
            je_previous = je_current

        stats['replay_end'] = self.now()
        stats['replay_duration'] = (stats['replay_end'] - stats['replay_start']).total_seconds()
        stats['replay_start'] = stats['replay_start'].isoformat()
        stats['replay_end'] = stats['replay_end'].isoformat()

        stats['recording_end'] = je_current.created
        stats['recording_duration'] = (stats['recording_end'] - stats['recording_start']).total_seconds()
        stats['recording_start'] = stats['recording_start'].isoformat()
        stats['recording_end'] = stats['recording_end'].isoformat()

        # events_total >= 1 here: get_job_events() raised on an empty set.
        stats['events_ontime']['percentage'] = (stats['events_ontime']['total'] / float(stats['events_total'])) * 100.00
        stats['events_late']['percentage'] = (stats['events_late']['total'] / float(stats['events_total'])) * 100.00
        stats['events_distance_average'] = stats['events_distance_total'] / stats['events_total']
        # FIX: guard against ZeroDivisionError when every event was on time.
        if stats['events_late']['total']:
            stats['events_late']['lateness_average'] = stats['events_late']['lateness_total'] / stats['events_late']['total']
        if verbosity >= 2:
            print(json.dumps(stats, indent=4, sort_keys=True))
class Command(NoArgsCommand):
    """Management command entry point for replaying job events.

    Thin CLI wrapper around ReplayJobEvents.run().
    """

    help = 'Replay job events over websockets ordered by created on date.'

    option_list = NoArgsCommand.option_list + (
        make_option('--job_id', dest='job_id', type='int', metavar='j',
                    help='Id of the job to replay (job or adhoc)'),
        # FIX: run() consumes speed as a float divisor (1.0 / speed);
        # 'float' accepts everything 'int' did plus fractional speedups.
        make_option('--speed', dest='speed', type='float', metavar='s',
                    help='Speedup factor.'),
    )

    def handle_noargs(self, **options):
        job_id = options.get('job_id')
        # FIX: fail fast with a clear message when --job_id is omitted
        # (previously fell through to "UnifiedJob None not found.").
        if job_id is None:
            print("--job_id is required")
            sys.exit(1)
        speed = options.get('speed') or 1
        verbosity = options.get('verbosity') or 0

        replayer = ReplayJobEvents()
        replayer.run(job_id, speed, verbosity)


# --- awx/main/signals.py: post-patch versions of the touched handlers ---

def emit_job_event_detail(sender, **kwargs):
    """post_save handler: broadcast a newly created JobEvent on its
    job_events-<job id> channel group using the websocket serializer."""
    instance = kwargs['instance']
    created = kwargs['created']
    if created:
        event_serialized = JobEventWebSocketSerializer(instance).data
        # FIX(consistency/perf): use the FK id attribute directly, matching
        # the ad hoc handler below and avoiding a related-object fetch.
        # NOTE(review): assumes JobEvent.job is a ForeignKey — verify.
        emit_channel_notification('job_events-' + str(instance.job_id), event_serialized)


def emit_ad_hoc_command_event_detail(sender, **kwargs):
    """post_save handler: broadcast a newly created AdHocCommandEvent on its
    ad_hoc_command_events-<id> channel group."""
    instance = kwargs['instance']
    created = kwargs['created']
    if created:
        event_serialized = AdHocCommandEventWebSocketSerializer(instance).data
        emit_channel_notification('ad_hoc_command_events-' + str(instance.ad_hoc_command_id), event_serialized)
class TestReplayJobEvents():
    """Verify ReplayJobEvents.run() sleeps the recorded gap between events,
    scaled by the requested speedup factor."""

    @pytest.fixture
    def epoch(self):
        # Arbitrary anchor for the fake recording timeline.
        return timezone.now()

    @pytest.fixture
    def job_events(self, epoch):
        # Successive gaps between events: 10s, 10s, 10s, 1s, 1ms, 1us.
        offsets = [
            timedelta(),
            timedelta(seconds=10),
            timedelta(seconds=20),
            timedelta(seconds=30),
            timedelta(seconds=31),
            timedelta(seconds=31, milliseconds=1),
            timedelta(seconds=31, milliseconds=1, microseconds=1),
        ]
        return [JobEvent(created=epoch + offset) for offset in offsets]

    @pytest.fixture
    def mock_serializer_fn(self):
        # Stand-in for the websocket serializer class: run() only needs
        # .data with a 'group_name' key.
        class FakeSerializer():
            data = dict()

        def serialize(job_event):
            serialized = FakeSerializer()
            serialized.data['group_name'] = 'foobar'
            return serialized

        return serialize

    @pytest.fixture
    def replayer(self, mocker, job_events, mock_serializer_fn):
        # Stub out everything that touches the database, the clock, or
        # actually sleeps; replay_offset is pinned to 0 so run() always
        # takes the sleep branch.
        replayer = ReplayJobEvents()
        replayer.get_serializer = lambda _job: mock_serializer_fn
        replayer.get_job = mocker.MagicMock(return_value=Job(id=3))
        replayer.sleep = mocker.MagicMock()
        replayer.get_job_events = lambda _job: job_events
        replayer.replay_offset = lambda *_args, **_kwargs: 0
        return replayer

    @mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)
    def test_sleep(self, mocker, replayer):
        replayer.run(3, 1)

        # 7 events -> 6 gaps; the first event's gap is 0 so it never sleeps.
        replayer.sleep.assert_has_calls([
            mock.call(10.0),
            mock.call(10.0),
            mock.call(10.0),
            mock.call(1.0),
            mock.call(0.001),
            mock.call(0.000001),
        ])
        assert replayer.sleep.call_count == 6

    @mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)
    def test_speed(self, mocker, replayer):
        replayer.run(3, 2)

        # Doubling speed halves every sleep.
        replayer.sleep.assert_has_calls([
            mock.call(5.0),
            mock.call(5.0),
            mock.call(5.0),
            mock.call(0.5),
            mock.call(0.0005),
            mock.call(0.0000005),
        ])
        assert replayer.sleep.call_count == 6

    # TODO: Test replay_offset()
    # TODO: Test stat generation