Merge pull request #2810 from chrismeyersfsu/feature-replay_job_status

emit job status lifecycle in event replayer

Reviewed-by: https://github.com/softwarefactory-project-zuul[bot]
This commit is contained in:
softwarefactory-project-zuul[bot] 2018-11-28 18:57:56 +00:00 committed by GitHub
commit f57fa9d1fb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 9 deletions

View File

@ -4,6 +4,7 @@
import sys
import time
import json
import random
from django.utils import timezone
from django.core.management.base import BaseCommand
@ -26,7 +27,21 @@ from awx.api.serializers import (
)
class ReplayJobEvents():
class JobStatusLifeCycle():
def emit_job_status(self, job, status):
# {"status": "successful", "project_id": 13, "unified_job_id": 659, "group_name": "jobs"}
job.websocket_emit_status(status)
def determine_job_event_finish_status_index(self, job_event_count, random_seed):
if random_seed == 0:
return job_event_count - 1
random.seed(random_seed)
job_event_index = random.randint(0, job_event_count - 1)
return job_event_index
class ReplayJobEvents(JobStatusLifeCycle):
recording_start = None
replay_start = None
@ -76,9 +91,10 @@ class ReplayJobEvents():
job_events = job.inventory_update_events.order_by('created')
elif type(job) is SystemJob:
job_events = job.system_job_events.order_by('created')
if job_events.count() == 0:
count = job_events.count()
if count == 0:
raise RuntimeError("No events for job id {}".format(job.id))
return job_events
return job_events, count
def get_serializer(self, job):
if type(job) is Job:
@ -95,7 +111,7 @@ class ReplayJobEvents():
raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job)))
sys.exit(1)
def run(self, job_id, speed=1.0, verbosity=0, skip_range=[]):
def run(self, job_id, speed=1.0, verbosity=0, skip_range=[], random_seed=0, final_status_delay=0, debug=False):
stats = {
'events_ontime': {
'total': 0,
@ -119,17 +135,27 @@ class ReplayJobEvents():
}
try:
job = self.get_job(job_id)
job_events = self.get_job_events(job)
job_events, job_event_count = self.get_job_events(job)
serializer = self.get_serializer(job)
except RuntimeError as e:
print("{}".format(e.message))
sys.exit(1)
je_previous = None
self.emit_job_status(job, 'pending')
self.emit_job_status(job, 'waiting')
self.emit_job_status(job, 'running')
finish_status_index = self.determine_job_event_finish_status_index(job_event_count, random_seed)
for n, je_current in enumerate(job_events):
if je_current.counter in skip_range:
continue
if debug:
raw_input("{} of {}:".format(n, job_event_count))
if not je_previous:
stats['recording_start'] = je_current.created
self.start(je_current.created)
@ -146,7 +172,7 @@ class ReplayJobEvents():
print("recording: next job in {} seconds".format(recording_diff))
if replay_offset >= 0:
replay_diff = recording_diff - replay_offset
if replay_diff > 0:
stats['events_ontime']['total'] += 1
if verbosity >= 3:
@ -167,6 +193,11 @@ class ReplayJobEvents():
stats['events_total'] += 1
je_previous = je_current
if n == finish_status_index:
if final_status_delay != 0:
self.sleep(final_status_delay)
self.emit_job_status(job, job.status)
if stats['events_total'] > 2:
stats['replay_end'] = self.now()
stats['replay_duration'] = (stats['replay_end'] - stats['replay_start']).total_seconds()
@ -206,16 +237,26 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--job_id', dest='job_id', type=int, metavar='j',
help='Id of the job to replay (job or adhoc)')
parser.add_argument('--speed', dest='speed', type=int, metavar='s',
parser.add_argument('--speed', dest='speed', type=float, metavar='s',
help='Speedup factor.')
parser.add_argument('--skip-range', dest='skip_range', type=str, metavar='k',
default='0:-1:1', help='Range of events to skip')
parser.add_argument('--random-seed', dest='random_seed', type=int, metavar='r',
default=0, help='Random number generator seed to use when determining job_event index to emit final job status')
parser.add_argument('--final-status-delay', dest='final_status_delay', type=float, metavar='f',
default=0, help='Delay between event and final status emit')
parser.add_argument('--debug', dest='debug', type=bool, metavar='d',
default=False, help='Enable step mode to control emission of job events one at a time.')
def handle(self, *args, **options):
job_id = options.get('job_id')
speed = options.get('speed') or 1
verbosity = options.get('verbosity') or 0
random_seed = options.get('random_seed')
final_status_delay = options.get('final_status_delay')
debug = options.get('debug')
skip = self._parse_slice_range(options.get('skip_range'))
replayer = ReplayJobEvents()
replayer.run(job_id, speed, verbosity, skip)
replayer.run(job_id, speed=speed, verbosity=verbosity, skip_range=skip, random_seed=random_seed,
final_status_delay=final_status_delay, debug=debug)

View File

@ -55,8 +55,9 @@ class TestReplayJobEvents():
r.get_serializer = lambda self: mock_serializer_fn
r.get_job = mocker.MagicMock(return_value=Job(id=3))
r.sleep = mocker.MagicMock()
r.get_job_events = lambda self: job_events
r.get_job_events = lambda self: (job_events, len(job_events))
r.replay_offset = lambda *args, **kwarg: 0
r.emit_job_status = lambda job, status: True
return r
@mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)