mirror of
https://github.com/ansible/awx.git
synced 2026-04-11 04:59:22 -02:30
add speedup support to event replay and stats
* add tests * add verbosity support
This commit is contained in:
@@ -1,9 +1,12 @@
|
|||||||
# Copyright (c) 2015 Ansible, Inc.
|
# Copyright (c) 2017 Ansible by Red Hat
|
||||||
# All Rights Reserved
|
# All Rights Reserved.
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
|
import json
|
||||||
from optparse import make_option
|
from optparse import make_option
|
||||||
|
|
||||||
|
from django.utils import timezone
|
||||||
from django.core.management.base import NoArgsCommand
|
from django.core.management.base import NoArgsCommand
|
||||||
|
|
||||||
from awx.main.models import (
|
from awx.main.models import (
|
||||||
@@ -18,37 +21,162 @@ from awx.api.serializers import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class Command(NoArgsCommand):
|
class ReplayJobEvents():
|
||||||
|
|
||||||
help = 'Replay job events over websockets'
|
recording_start = None
|
||||||
|
replay_start = None
|
||||||
|
|
||||||
option_list = NoArgsCommand.option_list + (
|
def now(self):
|
||||||
make_option('--job_id', dest='job_id', type='int', metavar='j',
|
return timezone.now()
|
||||||
help='Id of the job to replay (job or adhoc)'),
|
|
||||||
)
|
|
||||||
|
|
||||||
def replay_job_events(self, job_id):
|
def start(self, first_event_created):
|
||||||
|
self.recording_start = first_event_created
|
||||||
|
self.replay_start = self.now()
|
||||||
|
|
||||||
|
def lateness(self, now, created):
|
||||||
|
time_passed = now - self.recording_start
|
||||||
|
job_event_time = created - self.replay_start
|
||||||
|
|
||||||
|
return (time_passed - job_event_time).total_seconds()
|
||||||
|
|
||||||
|
def get_job(self, job_id):
|
||||||
try:
|
try:
|
||||||
unified_job = UnifiedJob.objects.get(id=job_id)
|
unified_job = UnifiedJob.objects.get(id=job_id)
|
||||||
except UnifiedJob.DoesNotExist:
|
except UnifiedJob.DoesNotExist:
|
||||||
print("UnifiedJob {} not found.".format(job_id))
|
print("UnifiedJob {} not found.".format(job_id))
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
job = unified_job.get_real_instance()
|
return unified_job.get_real_instance()
|
||||||
|
|
||||||
|
def sleep(self, seconds):
|
||||||
|
time.sleep(seconds)
|
||||||
|
|
||||||
|
def replay_elapsed(self):
|
||||||
|
return (self.now() - self.replay_start)
|
||||||
|
|
||||||
|
def recording_elapsed(self, created):
|
||||||
|
return (created - self.recording_start)
|
||||||
|
|
||||||
|
def replay_offset(self, created, speed):
|
||||||
|
return self.replay_elapsed().total_seconds() - (self.recording_elapsed(created).total_seconds() * (1.0 / speed))
|
||||||
|
|
||||||
|
def get_job_events(self, job):
|
||||||
job_events = job.job_events.order_by('created')
|
job_events = job.job_events.order_by('created')
|
||||||
serializer = None
|
if job_events.count() == 0:
|
||||||
|
raise RuntimeError("No events for job id {}".format(job.id))
|
||||||
|
return job_events
|
||||||
|
|
||||||
|
def get_serializer(self, job):
|
||||||
if type(job) is Job:
|
if type(job) is Job:
|
||||||
serializer = JobEventWebSocketSerializer
|
return JobEventWebSocketSerializer
|
||||||
elif type(job) is AdHocCommand:
|
elif type(job) is AdHocCommand:
|
||||||
serializer = AdHocCommandEventWebSocketSerializer
|
return AdHocCommandEventWebSocketSerializer
|
||||||
else:
|
else:
|
||||||
print("Job is of type {} and replay is not yet supported.".format(type(job)))
|
raise RuntimeError("Job is of type {} and replay is not yet supported.".format(type(job)))
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
for je in job_events:
|
def run(self, job_id, speed=1.0, verbosity=0):
|
||||||
je_serialized = serializer(je).data
|
stats = {
|
||||||
|
'events_ontime': {
|
||||||
|
'total': 0,
|
||||||
|
'percentage': 0,
|
||||||
|
},
|
||||||
|
'events_late': {
|
||||||
|
'total': 0,
|
||||||
|
'percentage': 0,
|
||||||
|
'lateness_total': 0,
|
||||||
|
'lateness_average': 0,
|
||||||
|
},
|
||||||
|
'events_total': 0,
|
||||||
|
'events_distance_total': 0,
|
||||||
|
'events_distance_average': 0,
|
||||||
|
'recording_start': 0,
|
||||||
|
'recording_end': 0,
|
||||||
|
'recording_duration': 0,
|
||||||
|
'replay_start': 0,
|
||||||
|
'replay_end': 0,
|
||||||
|
'replay_duration': 0,
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
job = self.get_job(job_id)
|
||||||
|
job_events = self.get_job_events(job)
|
||||||
|
serializer = self.get_serializer(job)
|
||||||
|
except RuntimeError as e:
|
||||||
|
print("{}".format(e.message))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
je_previous = None
|
||||||
|
for je_current in job_events:
|
||||||
|
if not je_previous:
|
||||||
|
stats['recording_start'] = je_current.created
|
||||||
|
self.start(je_current.created)
|
||||||
|
stats['replay_start'] = self.replay_start
|
||||||
|
je_previous = je_current
|
||||||
|
|
||||||
|
je_serialized = serializer(je_current).data
|
||||||
emit_channel_notification('{}-{}'.format(je_serialized['group_name'], job.id), je_serialized)
|
emit_channel_notification('{}-{}'.format(je_serialized['group_name'], job.id), je_serialized)
|
||||||
|
|
||||||
|
replay_offset = self.replay_offset(je_previous.created, speed)
|
||||||
|
recording_diff = (je_current.created - je_previous.created).total_seconds() * (1.0 / speed)
|
||||||
|
stats['events_distance_total'] += recording_diff
|
||||||
|
if verbosity >= 3:
|
||||||
|
print("recording: next job in {} seconds".format(recording_diff))
|
||||||
|
if replay_offset >= 0:
|
||||||
|
replay_diff = recording_diff - replay_offset
|
||||||
|
|
||||||
|
if replay_diff > 0:
|
||||||
|
stats['events_ontime']['total'] += 1
|
||||||
|
if verbosity >= 3:
|
||||||
|
print("\treplay: sleep for {} seconds".format(replay_diff))
|
||||||
|
self.sleep(replay_diff)
|
||||||
|
else:
|
||||||
|
stats['events_late']['total'] += 1
|
||||||
|
stats['events_late']['lateness_total'] += (replay_diff * -1)
|
||||||
|
if verbosity >= 3:
|
||||||
|
print("\treplay: too far behind to sleep {} seconds".format(replay_diff))
|
||||||
|
else:
|
||||||
|
replay_offset = self.replay_offset(je_current.created, speed)
|
||||||
|
stats['events_late']['lateness_total'] += (replay_offset * -1)
|
||||||
|
stats['events_late']['total'] += 1
|
||||||
|
if verbosity >= 3:
|
||||||
|
print("\treplay: behind by {} seconds".format(replay_offset))
|
||||||
|
|
||||||
|
stats['events_total'] += 1
|
||||||
|
je_previous = je_current
|
||||||
|
|
||||||
|
stats['replay_end'] = self.now()
|
||||||
|
stats['replay_duration'] = (stats['replay_end'] - stats['replay_start']).total_seconds()
|
||||||
|
stats['replay_start'] = stats['replay_start'].isoformat()
|
||||||
|
stats['replay_end'] = stats['replay_end'].isoformat()
|
||||||
|
|
||||||
|
stats['recording_end'] = je_current.created
|
||||||
|
stats['recording_duration'] = (stats['recording_end'] - stats['recording_start']).total_seconds()
|
||||||
|
stats['recording_start'] = stats['recording_start'].isoformat()
|
||||||
|
stats['recording_end'] = stats['recording_end'].isoformat()
|
||||||
|
|
||||||
|
stats['events_ontime']['percentage'] = (stats['events_ontime']['total'] / float(stats['events_total'])) * 100.00
|
||||||
|
stats['events_late']['percentage'] = (stats['events_late']['total'] / float(stats['events_total'])) * 100.00
|
||||||
|
stats['events_distance_average'] = stats['events_distance_total'] / stats['events_total']
|
||||||
|
stats['events_late']['lateness_average'] = stats['events_late']['lateness_total'] / stats['events_late']['total']
|
||||||
|
if verbosity >= 2:
|
||||||
|
print(json.dumps(stats, indent=4, sort_keys=True))
|
||||||
|
|
||||||
|
|
||||||
|
class Command(NoArgsCommand):
|
||||||
|
|
||||||
|
help = 'Replay job events over websockets ordered by created on date.'
|
||||||
|
|
||||||
|
option_list = NoArgsCommand.option_list + (
|
||||||
|
make_option('--job_id', dest='job_id', type='int', metavar='j',
|
||||||
|
help='Id of the job to replay (job or adhoc)'),
|
||||||
|
make_option('--speed', dest='speed', type='int', metavar='s',
|
||||||
|
help='Speedup factor.'),
|
||||||
|
)
|
||||||
|
|
||||||
def handle_noargs(self, **options):
|
def handle_noargs(self, **options):
|
||||||
self.job_id = options.get('job_id')
|
job_id = options.get('job_id')
|
||||||
self.replay_job_events(self.job_id)
|
speed = options.get('speed') or 1
|
||||||
|
verbosity = options.get('verbosity') or 0
|
||||||
|
|
||||||
|
replayer = ReplayJobEvents()
|
||||||
|
replayer.run(job_id, speed, verbosity)
|
||||||
|
|||||||
91
awx/main/tests/unit/commands/test_replay_job_events.py
Normal file
91
awx/main/tests/unit/commands/test_replay_job_events.py
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
# Copyright (c) 2017 Ansible by Red Hat
|
||||||
|
# All Rights Reserved
|
||||||
|
|
||||||
|
# Python
|
||||||
|
import pytest
|
||||||
|
import mock
|
||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
|
# Django
|
||||||
|
from django.utils import timezone
|
||||||
|
|
||||||
|
# AWX
|
||||||
|
from awx.main.models import (
|
||||||
|
Job,
|
||||||
|
JobEvent,
|
||||||
|
)
|
||||||
|
from awx.main.management.commands.replay_job_events import (
|
||||||
|
ReplayJobEvents,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestReplayJobEvents():
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def epoch(self):
|
||||||
|
return timezone.now()
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def job_events(self, epoch):
|
||||||
|
return [
|
||||||
|
JobEvent(created=epoch),
|
||||||
|
JobEvent(created=epoch + timedelta(seconds=10)),
|
||||||
|
JobEvent(created=epoch + timedelta(seconds=20)),
|
||||||
|
JobEvent(created=epoch + timedelta(seconds=30)),
|
||||||
|
JobEvent(created=epoch + timedelta(seconds=31)),
|
||||||
|
JobEvent(created=epoch + timedelta(seconds=31, milliseconds=1)),
|
||||||
|
JobEvent(created=epoch + timedelta(seconds=31, microseconds=1, milliseconds=1)),
|
||||||
|
]
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_serializer_fn(self):
|
||||||
|
class MockSerializer():
|
||||||
|
data = dict()
|
||||||
|
|
||||||
|
|
||||||
|
def fn(job_event):
|
||||||
|
serialized = MockSerializer()
|
||||||
|
serialized.data['group_name'] = 'foobar'
|
||||||
|
return serialized
|
||||||
|
return fn
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def replayer(self, mocker, job_events, mock_serializer_fn):
|
||||||
|
r = ReplayJobEvents()
|
||||||
|
r.get_serializer = lambda self: mock_serializer_fn
|
||||||
|
r.get_job = mocker.MagicMock(return_value=Job(id=3))
|
||||||
|
r.sleep = mocker.MagicMock()
|
||||||
|
r.get_job_events = lambda self: job_events
|
||||||
|
r.replay_offset = lambda *args, **kwarg: 0
|
||||||
|
return r
|
||||||
|
|
||||||
|
@mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)
|
||||||
|
def test_sleep(self, mocker, replayer):
|
||||||
|
replayer.run(3, 1)
|
||||||
|
|
||||||
|
assert replayer.sleep.call_count == 6
|
||||||
|
replayer.sleep.assert_has_calls([
|
||||||
|
mock.call(10.0),
|
||||||
|
mock.call(10.0),
|
||||||
|
mock.call(10.0),
|
||||||
|
mock.call(1.0),
|
||||||
|
mock.call(0.001),
|
||||||
|
mock.call(0.000001),
|
||||||
|
])
|
||||||
|
|
||||||
|
@mock.patch('awx.main.management.commands.replay_job_events.emit_channel_notification', lambda *a, **kw: None)
|
||||||
|
def test_speed(self, mocker, replayer):
|
||||||
|
replayer.run(3, 2)
|
||||||
|
|
||||||
|
assert replayer.sleep.call_count == 6
|
||||||
|
replayer.sleep.assert_has_calls([
|
||||||
|
mock.call(5.0),
|
||||||
|
mock.call(5.0),
|
||||||
|
mock.call(5.0),
|
||||||
|
mock.call(0.5),
|
||||||
|
mock.call(0.0005),
|
||||||
|
mock.call(0.0000005),
|
||||||
|
])
|
||||||
|
|
||||||
|
# TODO: Test replay_offset()
|
||||||
|
# TODO: Test stat generation
|
||||||
Reference in New Issue
Block a user