From 042c7ffe5bfc4166c72bc830f0e636d2f2967624 Mon Sep 17 00:00:00 2001 From: chris meyers Date: Tue, 27 Nov 2018 11:52:18 -0500 Subject: [PATCH 1/3] emit job status lifecycle in event replayer --- .../management/commands/replay_job_events.py | 43 ++++++++++++++++--- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/awx/main/management/commands/replay_job_events.py b/awx/main/management/commands/replay_job_events.py index da28870a60..7b634a450e 100644 --- a/awx/main/management/commands/replay_job_events.py +++ b/awx/main/management/commands/replay_job_events.py @@ -4,6 +4,7 @@ import sys import time import json +import random from django.utils import timezone from django.core.management.base import BaseCommand @@ -26,7 +27,18 @@ from awx.api.serializers import ( ) -class ReplayJobEvents(): +class JobStatusLifeCycle(): + def emit_job_status(self, job, status): + # {"status": "successful", "project_id": 13, "unified_job_id": 659, "group_name": "jobs"} + job.websocket_emit_status(status) + + def determine_job_event_finish_status_index(self, job_event_count, random_seed=7): + random.seed(random_seed) + job_event_index = random.randint(0, job_event_count - 1) + return job_event_index + + +class ReplayJobEvents(JobStatusLifeCycle): recording_start = None replay_start = None @@ -76,9 +88,10 @@ class ReplayJobEvents(): job_events = job.inventory_update_events.order_by('created') elif type(job) is SystemJob: job_events = job.system_job_events.order_by('created') - if job_events.count() == 0: + count = job_events.count() + if count == 0: raise RuntimeError("No events for job id {}".format(job.id)) - return job_events + return job_events, count def get_serializer(self, job): if type(job) is Job: @@ -95,7 +108,7 @@ class ReplayJobEvents(): raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job))) sys.exit(1) - def run(self, job_id, speed=1.0, verbosity=0, skip_range=[]): + def run(self, job_id, speed=1.0, verbosity=0, skip_range=[], random_seed=7): stats = { 'events_ontime': { 'total': 0, @@ -119,17 +132,30 @@ class ReplayJobEvents(): } try: job = self.get_job(job_id) - job_events = self.get_job_events(job) + job_events, job_event_count = self.get_job_events(job) serializer = self.get_serializer(job) except RuntimeError as e: print("{}".format(e.message)) sys.exit(1) + if random_seed == -1: + random_seed = 7 + je_previous = None + + self.emit_job_status(job, 'pending') + self.emit_job_status(job, 'waiting') + self.emit_job_status(job, 'running') + + finish_status_index = self.determine_job_event_finish_status_index(job_event_count, random_seed) + for n, je_current in enumerate(job_events): if je_current.counter in skip_range: continue + if n == finish_status_index: + self.emit_job_status(job, job.status) + if not je_previous: stats['recording_start'] = je_current.created self.start(je_current.created) @@ -146,7 +172,7 @@ class ReplayJobEvents(): print("recording: next job in {} seconds".format(recording_diff)) if replay_offset >= 0: replay_diff = recording_diff - replay_offset - + if replay_diff > 0: stats['events_ontime']['total'] += 1 if verbosity >= 3: @@ -210,12 +236,15 @@ class Command(BaseCommand): help='Speedup factor.') parser.add_argument('--skip-range', dest='skip_range', type=str, metavar='k', default='0:-1:1', help='Range of events to skip') + parser.add_argument('--random-seed', dest='random_seed', type=int, metavar='r', + default=7, help='Seed to use for random number generation') def handle(self, *args, **options): job_id = options.get('job_id') speed = options.get('speed') or 1 verbosity = options.get('verbosity') or 0 + random_seed = options.get('random_seed') skip = self._parse_slice_range(options.get('skip_range')) replayer = ReplayJobEvents() - replayer.run(job_id, speed, verbosity, skip) + replayer.run(job_id, speed=speed, verbosity=verbosity, skip_range=skip, random_seed=random_seed) From e214dcac85003b96a06bd7e25fe9174c567f437e Mon Sep 17 00:00:00 2001 From: chris meyers Date: Tue, 27 Nov 2018 12:45:49 -0500 Subject: [PATCH 2/3] add slowdown, final status delay, and debug * slowdown by using --speed 0.1 <-- decimal * optionally specify a delay between the event and the final status * debug mode where you can step through emitting the job events --- .../management/commands/replay_job_events.py | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/awx/main/management/commands/replay_job_events.py b/awx/main/management/commands/replay_job_events.py index 7b634a450e..2925e47606 100644 --- a/awx/main/management/commands/replay_job_events.py +++ b/awx/main/management/commands/replay_job_events.py @@ -32,7 +32,10 @@ class JobStatusLifeCycle(): # {"status": "successful", "project_id": 13, "unified_job_id": 659, "group_name": "jobs"} job.websocket_emit_status(status) - def determine_job_event_finish_status_index(self, job_event_count, random_seed=7): + def determine_job_event_finish_status_index(self, job_event_count, random_seed): + if random_seed == 0: + return job_event_count - 1 + random.seed(random_seed) job_event_index = random.randint(0, job_event_count - 1) return job_event_index @@ -108,7 +111,7 @@ class ReplayJobEvents(JobStatusLifeCycle): raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job))) sys.exit(1) - def run(self, job_id, speed=1.0, verbosity=0, skip_range=[], random_seed=7): + def run(self, job_id, speed=1.0, verbosity=0, skip_range=[], random_seed=0, final_status_delay=0, debug=False): stats = { 'events_ontime': { 'total': 0, @@ -138,9 +141,6 @@ class ReplayJobEvents(JobStatusLifeCycle): print("{}".format(e.message)) sys.exit(1) - if random_seed == -1: - random_seed = 7 - je_previous = None self.emit_job_status(job, 'pending') @@ -153,8 +153,8 @@ class ReplayJobEvents(JobStatusLifeCycle): if je_current.counter in skip_range: continue - if n == finish_status_index: - self.emit_job_status(job, job.status) + if debug: + raw_input("{} of {}:".format(n, job_event_count)) if not je_previous: stats['recording_start'] = je_current.created @@ -193,6 +193,10 @@ class ReplayJobEvents(JobStatusLifeCycle): stats['events_total'] += 1 je_previous = je_current + if n == finish_status_index: + self.sleep(final_status_delay) + self.emit_job_status(job, job.status) + if stats['events_total'] > 2: stats['replay_end'] = self.now() stats['replay_duration'] = (stats['replay_end'] - stats['replay_start']).total_seconds() @@ -232,19 +236,26 @@ class Command(BaseCommand): def add_arguments(self, parser): parser.add_argument('--job_id', dest='job_id', type=int, metavar='j', help='Id of the job to replay (job or adhoc)') - parser.add_argument('--speed', dest='speed', type=int, metavar='s', + parser.add_argument('--speed', dest='speed', type=float, metavar='s', help='Speedup factor.') parser.add_argument('--skip-range', dest='skip_range', type=str, metavar='k', default='0:-1:1', help='Range of events to skip') parser.add_argument('--random-seed', dest='random_seed', type=int, metavar='r', - default=7, help='Seed to use for random number generation') + default=0, help='Random number generator seed to use when determining job_event index to emit final job status') + parser.add_argument('--final-status-delay', dest='final_status_delay', type=float, metavar='f', + default=0, help='Delay between event and final status emit') + parser.add_argument('--debug', dest='debug', type=bool, metavar='d', + default=False, help='Enable step mode to control emission of job events one at a time.') def handle(self, *args, **options): job_id = options.get('job_id') speed = options.get('speed') or 1 verbosity = options.get('verbosity') or 0 random_seed = options.get('random_seed') + final_status_delay = options.get('final_status_delay') + debug = options.get('debug') skip = self._parse_slice_range(options.get('skip_range')) replayer = ReplayJobEvents() - replayer.run(job_id, speed=speed, verbosity=verbosity, skip_range=skip, random_seed=random_seed) + replayer.run(job_id, speed=speed, verbosity=verbosity, skip_range=skip, random_seed=random_seed, + final_status_delay=final_status_delay, debug=debug) From 83760deb9d83160208bd6e4cde81737c3cf1660d Mon Sep 17 00:00:00 2001 From: chris meyers Date: Wed, 28 Nov 2018 13:33:24 -0500 Subject: [PATCH 3/3] align tests with new replay get_job interface --- awx/main/management/commands/replay_job_events.py | 3 ++- awx/main/tests/unit/commands/test_replay_job_events.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/awx/main/management/commands/replay_job_events.py b/awx/main/management/commands/replay_job_events.py index 2925e47606..68634092f6 100644 --- a/awx/main/management/commands/replay_job_events.py +++ b/awx/main/management/commands/replay_job_events.py @@ -194,7 +194,8 @@ class ReplayJobEvents(JobStatusLifeCycle): je_previous = je_current if n == finish_status_index: - self.sleep(final_status_delay) + if final_status_delay != 0: + self.sleep(final_status_delay) self.emit_job_status(job, job.status) if stats['events_total'] > 2: diff --git a/awx/main/tests/unit/commands/test_replay_job_events.py b/awx/main/tests/unit/commands/test_replay_job_events.py index 1031e55b4b..36b668a7bf 100644 --- a/awx/main/tests/unit/commands/test_replay_job_events.py +++ b/awx/main/tests/unit/commands/test_replay_job_events.py @@ -55,8 +55,9 @@ class TestReplayJobEvents(): r.get_serializer = lambda self: mock_serializer_fn r.get_job = mocker.MagicMock(return_value=Job(id=3)) r.sleep = mocker.MagicMock() - r.get_job_events = lambda self: job_events + r.get_job_events = lambda self: (job_events, len(job_events)) r.replay_offset = lambda *args, **kwarg: 0 + r.emit_job_status = lambda job, status: True return r @mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)