mirror of
https://github.com/ansible/awx.git
synced 2026-03-05 18:51:06 -03:30
emit job status lifecycle in event replayer
This commit is contained in:
@@ -4,6 +4,7 @@
|
|||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
import json
|
import json
|
||||||
|
import random
|
||||||
|
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.core.management.base import BaseCommand
|
from django.core.management.base import BaseCommand
|
||||||
@@ -26,7 +27,18 @@ from awx.api.serializers import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class ReplayJobEvents():
|
class JobStatusLifeCycle():
|
||||||
|
def emit_job_status(self, job, status):
|
||||||
|
# {"status": "successful", "project_id": 13, "unified_job_id": 659, "group_name": "jobs"}
|
||||||
|
job.websocket_emit_status(status)
|
||||||
|
|
||||||
|
def determine_job_event_finish_status_index(self, job_event_count, random_seed=7):
|
||||||
|
random.seed(random_seed)
|
||||||
|
job_event_index = random.randint(0, job_event_count - 1)
|
||||||
|
return job_event_index
|
||||||
|
|
||||||
|
|
||||||
|
class ReplayJobEvents(JobStatusLifeCycle):
|
||||||
|
|
||||||
recording_start = None
|
recording_start = None
|
||||||
replay_start = None
|
replay_start = None
|
||||||
@@ -76,9 +88,10 @@ class ReplayJobEvents():
|
|||||||
job_events = job.inventory_update_events.order_by('created')
|
job_events = job.inventory_update_events.order_by('created')
|
||||||
elif type(job) is SystemJob:
|
elif type(job) is SystemJob:
|
||||||
job_events = job.system_job_events.order_by('created')
|
job_events = job.system_job_events.order_by('created')
|
||||||
if job_events.count() == 0:
|
count = job_events.count()
|
||||||
|
if count == 0:
|
||||||
raise RuntimeError("No events for job id {}".format(job.id))
|
raise RuntimeError("No events for job id {}".format(job.id))
|
||||||
return job_events
|
return job_events, count
|
||||||
|
|
||||||
def get_serializer(self, job):
|
def get_serializer(self, job):
|
||||||
if type(job) is Job:
|
if type(job) is Job:
|
||||||
@@ -95,7 +108,7 @@ class ReplayJobEvents():
|
|||||||
raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job)))
|
raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job)))
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
def run(self, job_id, speed=1.0, verbosity=0, skip_range=[]):
|
def run(self, job_id, speed=1.0, verbosity=0, skip_range=[], random_seed=7):
|
||||||
stats = {
|
stats = {
|
||||||
'events_ontime': {
|
'events_ontime': {
|
||||||
'total': 0,
|
'total': 0,
|
||||||
@@ -119,17 +132,30 @@ class ReplayJobEvents():
|
|||||||
}
|
}
|
||||||
try:
|
try:
|
||||||
job = self.get_job(job_id)
|
job = self.get_job(job_id)
|
||||||
job_events = self.get_job_events(job)
|
job_events, job_event_count = self.get_job_events(job)
|
||||||
serializer = self.get_serializer(job)
|
serializer = self.get_serializer(job)
|
||||||
except RuntimeError as e:
|
except RuntimeError as e:
|
||||||
print("{}".format(e.message))
|
print("{}".format(e.message))
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
|
if random_seed == -1:
|
||||||
|
random_seed = 7
|
||||||
|
|
||||||
je_previous = None
|
je_previous = None
|
||||||
|
|
||||||
|
self.emit_job_status(job, 'pending')
|
||||||
|
self.emit_job_status(job, 'waiting')
|
||||||
|
self.emit_job_status(job, 'running')
|
||||||
|
|
||||||
|
finish_status_index = self.determine_job_event_finish_status_index(job_event_count, random_seed)
|
||||||
|
|
||||||
for n, je_current in enumerate(job_events):
|
for n, je_current in enumerate(job_events):
|
||||||
if je_current.counter in skip_range:
|
if je_current.counter in skip_range:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if n == finish_status_index:
|
||||||
|
self.emit_job_status(job, job.status)
|
||||||
|
|
||||||
if not je_previous:
|
if not je_previous:
|
||||||
stats['recording_start'] = je_current.created
|
stats['recording_start'] = je_current.created
|
||||||
self.start(je_current.created)
|
self.start(je_current.created)
|
||||||
@@ -146,7 +172,7 @@ class ReplayJobEvents():
|
|||||||
print("recording: next job in {} seconds".format(recording_diff))
|
print("recording: next job in {} seconds".format(recording_diff))
|
||||||
if replay_offset >= 0:
|
if replay_offset >= 0:
|
||||||
replay_diff = recording_diff - replay_offset
|
replay_diff = recording_diff - replay_offset
|
||||||
|
|
||||||
if replay_diff > 0:
|
if replay_diff > 0:
|
||||||
stats['events_ontime']['total'] += 1
|
stats['events_ontime']['total'] += 1
|
||||||
if verbosity >= 3:
|
if verbosity >= 3:
|
||||||
@@ -210,12 +236,15 @@ class Command(BaseCommand):
|
|||||||
help='Speedup factor.')
|
help='Speedup factor.')
|
||||||
parser.add_argument('--skip-range', dest='skip_range', type=str, metavar='k',
|
parser.add_argument('--skip-range', dest='skip_range', type=str, metavar='k',
|
||||||
default='0:-1:1', help='Range of events to skip')
|
default='0:-1:1', help='Range of events to skip')
|
||||||
|
parser.add_argument('--random-seed', dest='random_seed', type=int, metavar='r',
|
||||||
|
default=7, help='Seed to use for random number generation')
|
||||||
|
|
||||||
def handle(self, *args, **options):
|
def handle(self, *args, **options):
|
||||||
job_id = options.get('job_id')
|
job_id = options.get('job_id')
|
||||||
speed = options.get('speed') or 1
|
speed = options.get('speed') or 1
|
||||||
verbosity = options.get('verbosity') or 0
|
verbosity = options.get('verbosity') or 0
|
||||||
|
random_seed = options.get('random_seed')
|
||||||
skip = self._parse_slice_range(options.get('skip_range'))
|
skip = self._parse_slice_range(options.get('skip_range'))
|
||||||
|
|
||||||
replayer = ReplayJobEvents()
|
replayer = ReplayJobEvents()
|
||||||
replayer.run(job_id, speed, verbosity, skip)
|
replayer.run(job_id, speed=speed, verbosity=verbosity, skip_range=skip, random_seed=random_seed)
|
||||||
|
|||||||
Reference in New Issue
Block a user