mirror of
https://github.com/ansible/awx.git
synced 2026-03-22 19:35:02 -02:30
Send signals when jobs/tasks change states
This commit is contained in:
@@ -23,7 +23,7 @@ from django.utils.tzinfo import FixedOffset
|
|||||||
# AWX
|
# AWX
|
||||||
from awx.main.models import *
|
from awx.main.models import *
|
||||||
from awx.main.tasks import handle_work_error
|
from awx.main.tasks import handle_work_error
|
||||||
from awx.main.utils import get_system_task_capacity, decrypt_field
|
from awx.main.utils import get_system_task_capacity, decrypt_field, emit_websocket_notification
|
||||||
|
|
||||||
# ZeroMQ
|
# ZeroMQ
|
||||||
import zmq
|
import zmq
|
||||||
@@ -175,6 +175,7 @@ def rebuild_graph(message):
|
|||||||
task.status = 'failed'
|
task.status = 'failed'
|
||||||
task.job_explanation += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed"
|
task.job_explanation += "Task was marked as running in Tower but was not present in Celery so it has been marked as failed"
|
||||||
task.save()
|
task.save()
|
||||||
|
emit_websocket_notification('/socket.io/jobs', 'job_canceled', dict(unified_job_id=task.id))
|
||||||
running_tasks.pop(running_tasks.index(task))
|
running_tasks.pop(running_tasks.index(task))
|
||||||
print("Task %s appears orphaned... marking as failed" % task)
|
print("Task %s appears orphaned... marking as failed" % task)
|
||||||
|
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ from djcelery.models import TaskMeta
|
|||||||
# AWX
|
# AWX
|
||||||
from awx.main.models.base import *
|
from awx.main.models.base import *
|
||||||
from awx.main.models.schedules import Schedule
|
from awx.main.models.schedules import Schedule
|
||||||
from awx.main.utils import decrypt_field, get_type_for_model
|
from awx.main.utils import decrypt_field, get_type_for_model, emit_websocket_notification
|
||||||
|
|
||||||
__all__ = ['UnifiedJobTemplate', 'UnifiedJob']
|
__all__ = ['UnifiedJobTemplate', 'UnifiedJob']
|
||||||
|
|
||||||
@@ -583,6 +583,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
if not all(opts.values()):
|
if not all(opts.values()):
|
||||||
return False
|
return False
|
||||||
self.update_fields(start_args=json.dumps(kwargs), status='pending')
|
self.update_fields(start_args=json.dumps(kwargs), status='pending')
|
||||||
|
emit_websocket_notification('/socket.io/jobs', 'job_started', dict(unified_job_id=self.id))
|
||||||
task_type = get_type_for_model(self)
|
task_type = get_type_for_model(self)
|
||||||
# notify_task_runner.delay(dict(task_type=task_type, id=self.id, metadata=kwargs))
|
# notify_task_runner.delay(dict(task_type=task_type, id=self.id, metadata=kwargs))
|
||||||
return True
|
return True
|
||||||
@@ -631,6 +632,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
|
|||||||
if not self.cancel_flag:
|
if not self.cancel_flag:
|
||||||
self.cancel_flag = True
|
self.cancel_flag = True
|
||||||
self.save(update_fields=['cancel_flag'])
|
self.save(update_fields=['cancel_flag'])
|
||||||
|
emit_websocket_notification('/socket.io/jobs', 'job_canceled', dict(unified_job_id=self.id))
|
||||||
if settings.BROKER_URL.startswith('amqp://'):
|
if settings.BROKER_URL.startswith('amqp://'):
|
||||||
self._force_cancel()
|
self._force_cancel()
|
||||||
return self.cancel_flag
|
return self.cancel_flag
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ from django.utils.timezone import now
|
|||||||
|
|
||||||
# AWX
|
# AWX
|
||||||
from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate
|
from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate, Schedule, UnifiedJobTemplate
|
||||||
from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url
|
from awx.main.utils import get_ansible_version, decrypt_field, update_scm_url, emit_websocket_notification
|
||||||
|
|
||||||
__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error']
|
__all__ = ['RunJob', 'RunProjectUpdate', 'RunInventoryUpdate', 'handle_work_error']
|
||||||
|
|
||||||
@@ -123,9 +123,10 @@ def handle_work_error(self, task_id, subtasks=None):
|
|||||||
instance.job_explanation = "Previous Task Failed: %s for %s with celery task id: %s" % \
|
instance.job_explanation = "Previous Task Failed: %s for %s with celery task id: %s" % \
|
||||||
(first_task_type, first_task_name, task_id)
|
(first_task_type, first_task_name, task_id)
|
||||||
instance.save()
|
instance.save()
|
||||||
|
emit_websocket_notification('/socket.io/jobs', 'job_error', dict(unified_job_id=instance.id))
|
||||||
|
|
||||||
class BaseTask(Task):
|
class BaseTask(Task):
|
||||||
|
|
||||||
name = None
|
name = None
|
||||||
model = None
|
model = None
|
||||||
abstract = True
|
abstract = True
|
||||||
@@ -333,6 +334,7 @@ class BaseTask(Task):
|
|||||||
'''
|
'''
|
||||||
Run the job/task and capture its output.
|
Run the job/task and capture its output.
|
||||||
'''
|
'''
|
||||||
|
emit_websocket_notification('/socket.io/jobs', 'job_running', dict(unified_job_id=pk))
|
||||||
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
|
instance = self.update_model(pk, status='running', celery_task_id=self.request.id)
|
||||||
status, tb = 'error', ''
|
status, tb = 'error', ''
|
||||||
output_replacements = []
|
output_replacements = []
|
||||||
@@ -381,6 +383,7 @@ class BaseTask(Task):
|
|||||||
instance = self.update_model(pk, status=status, result_traceback=tb,
|
instance = self.update_model(pk, status=status, result_traceback=tb,
|
||||||
output_replacements=output_replacements)
|
output_replacements=output_replacements)
|
||||||
self.post_run_hook(instance, **kwargs)
|
self.post_run_hook(instance, **kwargs)
|
||||||
|
emit_websocket_notification('/socket.io/jobs', 'job_finished', dict(unified_job_id=pk))
|
||||||
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
|
if status != 'successful' and not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||||
# Raising an exception will mark the job as 'failed' in celery
|
# Raising an exception will mark the job as 'failed' in celery
|
||||||
# and will stop a task chain from continuing to execute
|
# and will stop a task chain from continuing to execute
|
||||||
@@ -581,7 +584,7 @@ class RunJob(BaseTask):
|
|||||||
|
|
||||||
|
|
||||||
class RunProjectUpdate(BaseTask):
|
class RunProjectUpdate(BaseTask):
|
||||||
|
|
||||||
name = 'awx.main.tasks.run_project_update'
|
name = 'awx.main.tasks.run_project_update'
|
||||||
model = ProjectUpdate
|
model = ProjectUpdate
|
||||||
|
|
||||||
|
|||||||
@@ -16,6 +16,9 @@ from rest_framework.exceptions import ParseError, PermissionDenied
|
|||||||
# PyCrypto
|
# PyCrypto
|
||||||
from Crypto.Cipher import AES
|
from Crypto.Cipher import AES
|
||||||
|
|
||||||
|
# ZeroMQ
|
||||||
|
import zmq
|
||||||
|
|
||||||
__all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
|
__all__ = ['get_object_or_400', 'get_object_or_403', 'camelcase_to_underscore',
|
||||||
'get_ansible_version', 'get_awx_version', 'update_scm_url',
|
'get_ansible_version', 'get_awx_version', 'update_scm_url',
|
||||||
'get_type_for_model', 'get_model_for_type']
|
'get_type_for_model', 'get_model_for_type']
|
||||||
@@ -162,7 +165,7 @@ def update_scm_url(scm_type, url, username=True, password=True,
|
|||||||
elif scm_type == 'git' and ':' in url:
|
elif scm_type == 'git' and ':' in url:
|
||||||
if url.count(':') > 1:
|
if url.count(':') > 1:
|
||||||
raise ValueError('Invalid %s URL' % scm_type)
|
raise ValueError('Invalid %s URL' % scm_type)
|
||||||
|
|
||||||
modified_url = '/'.join(url.split(':', 1))
|
modified_url = '/'.join(url.split(':', 1))
|
||||||
parts = urlparse.urlsplit('ssh://%s' % modified_url)
|
parts = urlparse.urlsplit('ssh://%s' % modified_url)
|
||||||
# Handle local paths specified without file scheme (e.g. /path/to/foo).
|
# Handle local paths specified without file scheme (e.g. /path/to/foo).
|
||||||
@@ -325,6 +328,9 @@ def get_model_for_type(type):
|
|||||||
return ct_model
|
return ct_model
|
||||||
|
|
||||||
def get_system_task_capacity():
|
def get_system_task_capacity():
|
||||||
|
'''
|
||||||
|
Measure system memory and use it as a baseline for determining the system's capacity
|
||||||
|
'''
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
if hasattr(settings, 'SYSTEM_TASK_CAPACITY'):
|
if hasattr(settings, 'SYSTEM_TASK_CAPACITY'):
|
||||||
return settings.SYSTEM_TASK_CAPACITY
|
return settings.SYSTEM_TASK_CAPACITY
|
||||||
@@ -334,3 +340,11 @@ def get_system_task_capacity():
|
|||||||
if int(total_mem_value) <= 2048:
|
if int(total_mem_value) <= 2048:
|
||||||
return 50
|
return 50
|
||||||
return 50 + ((int(total_mem_value) / 1024) - 2) * 75
|
return 50 + ((int(total_mem_value) / 1024) - 2) * 75
|
||||||
|
|
||||||
|
def emit_websocket_notification(endpoint, event, payload):
|
||||||
|
emit_context = zmq.Context()
|
||||||
|
emit_socket = emit_context.socket(zmq.PUSH)
|
||||||
|
emit_socket.connect(settings.SOCKETIO_NOTIFICATION_PORT)
|
||||||
|
payload['event'] = event
|
||||||
|
payload['endpoint'] = endpoint
|
||||||
|
emit_socket.send_json(payload);
|
||||||
|
|||||||
Reference in New Issue
Block a user