Generalize socketio sender from task and emit more data specific to the type of unified job sending the update

This commit is contained in:
Matthew Jones
2014-04-22 15:52:45 -04:00
parent 5904d2e1ee
commit 4810b9532d
5 changed files with 27 additions and 12 deletions

View File

@@ -23,7 +23,7 @@ from django.utils.tzinfo import FixedOffset
# AWX # AWX
from awx.main.models import * from awx.main.models import *
from awx.main.tasks import handle_work_error from awx.main.tasks import handle_work_error
from awx.main.utils import get_system_task_capacity, decrypt_field, emit_websocket_notification from awx.main.utils import get_system_task_capacity, decrypt_field
# ZeroMQ # ZeroMQ
import zmq import zmq
@@ -175,7 +175,7 @@ def rebuild_graph(message):
task.status = 'failed' task.status = 'failed'
task.job_explanation += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed" task.job_explanation += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed"
task.save() task.save()
emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=task.id, status='failed')) task.socketio_emit_status("failed")
running_tasks.pop(running_tasks.index(task)) running_tasks.pop(running_tasks.index(task))
print("Task %s appears orphaned... marking as failed" % task) print("Task %s appears orphaned... marking as failed" % task)

View File

@@ -741,6 +741,9 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions):
from awx.main.tasks import RunInventoryUpdate from awx.main.tasks import RunInventoryUpdate
return RunInventoryUpdate return RunInventoryUpdate
def socketio_emit_data(self):
return dict(group_id=self.inventory_source.group.id)
def save(self, *args, **kwargs): def save(self, *args, **kwargs):
update_fields = kwargs.get('update_fields', []) update_fields = kwargs.get('update_fields', [])
if bool('license' in self.result_stdout and if bool('license' in self.result_stdout and

View File

@@ -346,6 +346,9 @@ class ProjectUpdate(UnifiedJob, ProjectOptions):
return True return True
return False return False
def socketio_emit_data(self):
return dict(project_id=self.project.id)
@property @property
def task_impact(self): def task_impact(self):
return 20 return 20

View File

@@ -539,6 +539,15 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
''' Given another task object determine if this task would be blocked by it ''' ''' Given another task object determine if this task would be blocked by it '''
raise NotImplementedError # Implement in subclass. raise NotImplementedError # Implement in subclass.
def socketio_emit_data(self):
''' Return extra data that should be included when submitting data to the browser over the websocket connection '''
return {}
def socketio_emit_status(self, status):
status_data = dict(unified_job_id=self.id, status=status)
status_data.update(self.socketio_emit_data())
emit_websocket_notification('/socket.io/jobs', 'status_changed', status_data)
def generate_dependencies(self, active_tasks): def generate_dependencies(self, active_tasks):
''' Generate any tasks that the current task might be dependent on given a list of active ''' Generate any tasks that the current task might be dependent on given a list of active
tasks that might preclude creating one''' tasks that might preclude creating one'''
@@ -583,7 +592,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
if not all(opts.values()): if not all(opts.values()):
return False return False
self.update_fields(start_args=json.dumps(kwargs), status='pending') self.update_fields(start_args=json.dumps(kwargs), status='pending')
emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=self.id, status='pending')) self.socketio_emit_status("pending")
task_type = get_type_for_model(self) task_type = get_type_for_model(self)
# notify_task_runner.delay(dict(task_type=task_type, id=self.id, metadata=kwargs)) # notify_task_runner.delay(dict(task_type=task_type, id=self.id, metadata=kwargs))
return True return True
@@ -623,7 +632,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
instance.job_explanation = 'Forced cancel' instance.job_explanation = 'Forced cancel'
update_fields.append('job_explanation') update_fields.append('job_explanation')
instance.save(update_fields=update_fields) instance.save(update_fields=update_fields)
emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=instance.id, status='canceled')) self.socketio_emit_status("canceled")
except: # FIXME: Log this exception! except: # FIXME: Log this exception!
if settings.DEBUG: if settings.DEBUG:
raise raise
@@ -633,7 +642,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
if not self.cancel_flag: if not self.cancel_flag:
self.cancel_flag = True self.cancel_flag = True
self.save(update_fields=['cancel_flag']) self.save(update_fields=['cancel_flag'])
emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=self.id, status='canceled')) self.socketio_emit_status("canceled")
if settings.BROKER_URL.startswith('amqp://'): if settings.BROKER_URL.startswith('amqp://'):
self._force_cancel() self._force_cancel()
return self.cancel_flag return self.cancel_flag

View File

@@ -37,7 +37,7 @@ from django.utils.timezone import now
# AWX # AWX
from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate
from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url, emit_websocket_notification from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url
__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error'] __all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error']
@@ -84,7 +84,7 @@ def tower_periodic_scheduler(self):
new_unified_job.status = 'failed' new_unified_job.status = 'failed'
new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials" new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
new_unified_job.save(update_fields=['job_status', 'job_explanation']) new_unified_job.save(update_fields=['job_status', 'job_explanation'])
emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=new_unified_job.id, status='failed')) new_unified_job.socketio_emit_status("failed")
@task() @task()
def notify_task_runner(metadata_dict): def notify_task_runner(metadata_dict):
@@ -124,7 +124,7 @@ def handle_work_error(self, task_id, subtasks=None):
instance.job_explanation = "Previous Task Failed: %s for %s with celery task id: %s" % \ instance.job_explanation = "Previous Task Failed: %s for %s with celery task id: %s" % \
(first_task_type, first_task_name, task_id) (first_task_type, first_task_name, task_id)
instance.save() instance.save()
emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=instance.id, status='failed')) instance.socketio_emit_status("failed")
class BaseTask(Task): class BaseTask(Task):
@@ -335,8 +335,8 @@ class BaseTask(Task):
''' '''
Run the job/task and capture its output. Run the job/task and capture its output.
''' '''
emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=pk, status='running'))
instance = self.update_model(pk, status='running', celery_task_id=self.request.id) instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
instance.socketio_emit_status("running")
status, tb = 'error', '' status, tb = 'error', ''
output_replacements = [] output_replacements = []
try: try:
@@ -384,7 +384,7 @@ class BaseTask(Task):
instance = self.update_model(pk, status=status, result_traceback=tb, instance = self.update_model(pk, status=status, result_traceback=tb,
output_replacements=output_replacements) output_replacements=output_replacements)
self.post_run_hook(instance, **kwargs) self.post_run_hook(instance, **kwargs)
emit_websocket_notification('/socket.io/jobs', 'status_changed', dict(unified_job_id=pk, status=status)) instance.socketio_emit_status(status)
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'): if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
# Raising an exception will mark the job as 'failed' in celery # Raising an exception will mark the job as 'failed' in celery
# and will stop a task chain from continuing to execute # and will stop a task chain from continuing to execute