mirror of
https://github.com/ansible/awx.git
synced 2026-05-07 17:37:37 -02:30
Implement max event websocket rate as setting
This commit is contained in:
@@ -344,6 +344,17 @@ register(
|
|||||||
category_slug='jobs',
|
category_slug='jobs',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
register(
|
||||||
|
'MAX_WEBSOCKET_EVENT_RATE',
|
||||||
|
field_class=fields.IntegerField,
|
||||||
|
min_value=0,
|
||||||
|
default=30,
|
||||||
|
label=_('Job Event Maximum Websocket Messages Per Second'),
|
||||||
|
help_text=_('Maximum number of messages to update the UI live job output with per second. Value of 0 means no limit.'),
|
||||||
|
category=_('Jobs'),
|
||||||
|
category_slug='jobs',
|
||||||
|
)
|
||||||
|
|
||||||
register(
|
register(
|
||||||
'SCHEDULE_MAX_JOBS',
|
'SCHEDULE_MAX_JOBS',
|
||||||
field_class=fields.IntegerField,
|
field_class=fields.IntegerField,
|
||||||
|
|||||||
@@ -729,10 +729,6 @@ def with_path_cleanup(f):
|
|||||||
return _wrapped
|
return _wrapped
|
||||||
|
|
||||||
|
|
||||||
# TODO: move to CTiT settings if we ever get serious about this
|
|
||||||
MAX_WEBSOCKET_EVENT_RATE = 30
|
|
||||||
|
|
||||||
|
|
||||||
class BaseTask(object):
|
class BaseTask(object):
|
||||||
model = None
|
model = None
|
||||||
event_model = None
|
event_model = None
|
||||||
@@ -744,7 +740,7 @@ class BaseTask(object):
|
|||||||
self.host_map = {}
|
self.host_map = {}
|
||||||
self.guid = GuidMiddleware.get_guid()
|
self.guid = GuidMiddleware.get_guid()
|
||||||
self.job_created = None
|
self.job_created = None
|
||||||
self.recent_event_timings = deque(maxlen=MAX_WEBSOCKET_EVENT_RATE)
|
self.recent_event_timings = deque(maxlen=settings.MAX_WEBSOCKET_EVENT_RATE)
|
||||||
|
|
||||||
def update_model(self, pk, _attempt=0, **updates):
|
def update_model(self, pk, _attempt=0, **updates):
|
||||||
"""Reload the model instance from the database and update the
|
"""Reload the model instance from the database and update the
|
||||||
@@ -1157,8 +1153,8 @@ class BaseTask(object):
|
|||||||
event_data['event_data']['guid'] = self.guid
|
event_data['event_data']['guid'] = self.guid
|
||||||
|
|
||||||
# To prevent overwhelming the broadcast queue, skip some websocket messages
|
# To prevent overwhelming the broadcast queue, skip some websocket messages
|
||||||
cpu_time = time.time()
|
|
||||||
if self.recent_event_timings:
|
if self.recent_event_timings:
|
||||||
|
cpu_time = time.time()
|
||||||
first_window_time = self.recent_event_timings[0]
|
first_window_time = self.recent_event_timings[0]
|
||||||
inverse_effective_rate = cpu_time - first_window_time
|
inverse_effective_rate = cpu_time - first_window_time
|
||||||
# if last 30 events came in under 1 second ago
|
# if last 30 events came in under 1 second ago
|
||||||
@@ -1175,8 +1171,8 @@ class BaseTask(object):
|
|||||||
if self.recent_event_timings[0] == self.recent_event_timings[-1]:
|
if self.recent_event_timings[0] == self.recent_event_timings[-1]:
|
||||||
logger.info('Starting a window of event emission, will pause if I see too many')
|
logger.info('Starting a window of event emission, will pause if I see too many')
|
||||||
self.recent_event_timings.append(cpu_time)
|
self.recent_event_timings.append(cpu_time)
|
||||||
else:
|
elif self.recent_event_timings.maxlen:
|
||||||
self.recent_event_timings.append(cpu_time)
|
self.recent_event_timings.append(time.time())
|
||||||
|
|
||||||
event_data.setdefault(self.event_data_key, self.instance.id)
|
event_data.setdefault(self.event_data_key, self.instance.id)
|
||||||
self.dispatcher.dispatch(event_data)
|
self.dispatcher.dispatch(event_data)
|
||||||
|
|||||||
Reference in New Issue
Block a user