Notification serializers, views, and tasks

* Implement concrete Notification model for notification runs
* Implement NotificationTemplate and Notification serializers and views
* Implement ancillary views
* Implement NotificationTemplate trigger m2m fields on all job templates
  via a fields mixin
* Link NotificationTemplates with an org
* Link notifications with the activity stream
* Implement Notification celery tasks
* Extend Backend field parameters to identify sender and receiver as
  parameters needed by the message and not the backend itself
* Updates to backends to better fit the django email backend model as it
  relates to Messages
* Implement success job chain task + notifications
* Implement notifications in error job chain task
This commit is contained in:
Matthew Jones
2016-02-09 23:12:55 -05:00
parent 319deffc18
commit 8db2f60405
18 changed files with 502 additions and 20 deletions

View File

@@ -1496,6 +1496,19 @@ class NotificationTemplateAccess(BaseAccess):
return qs
return qs
class NotificationAccess(BaseAccess):
'''
I can see/use a notification if I have permission to
'''
model = Notification
def get_queryset(self):
qs = self.model.objects.distinct()
if self.user.is_superuser:
return qs
return qs
class ActivityStreamAccess(BaseAccess):
'''
I can see activity stream events only when I have permission on all objects included in the event
@@ -1696,3 +1709,4 @@ register_access(ActivityStream, ActivityStreamAccess)
register_access(CustomInventoryScript, CustomInventoryScriptAccess)
register_access(TowerSettings, TowerSettingsAccess)
register_access(NotificationTemplate, NotificationTemplateAccess)
register_access(Notification, NotificationAccess)

View File

@@ -61,3 +61,5 @@ activity_stream_registrar.connect(AdHocCommand)
activity_stream_registrar.connect(Schedule)
activity_stream_registrar.connect(CustomInventoryScript)
activity_stream_registrar.connect(TowerSettings)
activity_stream_registrar.connect(NotificationTemplate)
activity_stream_registrar.connect(Notification)

View File

@@ -53,6 +53,8 @@ class ActivityStream(models.Model):
ad_hoc_command = models.ManyToManyField("AdHocCommand", blank=True)
schedule = models.ManyToManyField("Schedule", blank=True)
custom_inventory_script = models.ManyToManyField("CustomInventoryScript", blank=True)
notification_template = models.ManyToManyField("NotificationTemplate", blank=True)
notification = models.ManyToManyField("Notification", blank=True)
def get_absolute_url(self):
return reverse('api:activity_stream_detail', args=(self.pk,))

View File

@@ -25,7 +25,7 @@ from awx.main.utils import encrypt_field
__all__ = ['VarsDictProperty', 'BaseModel', 'CreatedModifiedModel',
'PasswordFieldsModel', 'PrimordialModel', 'CommonModel',
'CommonModelNameNotUnique',
'CommonModelNameNotUnique', 'NotificationFieldsModel',
'PERM_INVENTORY_ADMIN', 'PERM_INVENTORY_READ',
'PERM_INVENTORY_WRITE', 'PERM_INVENTORY_DEPLOY', 'PERM_INVENTORY_SCAN',
'PERM_INVENTORY_CHECK', 'PERM_JOBTEMPLATE_CREATE', 'JOB_TYPE_CHOICES',
@@ -337,3 +337,26 @@ class CommonModelNameNotUnique(PrimordialModel):
max_length=512,
unique=False,
)
class NotificationFieldsModel(BaseModel):
class Meta:
abstract = True
notification_errors = models.ManyToManyField(
"NotificationTemplate",
blank=True,
related_name='%(class)s_notifications_for_errors'
)
notification_success = models.ManyToManyField(
"NotificationTemplate",
blank=True,
related_name='%(class)s_notifications_for_success'
)
notification_any = models.ManyToManyField(
"NotificationTemplate",
blank=True,
related_name='%(class)s_notifications_for_any'
)

View File

@@ -23,6 +23,7 @@ from awx.main.managers import HostManager
from awx.main.models.base import * # noqa
from awx.main.models.jobs import Job
from awx.main.models.unified_jobs import * # noqa
from awx.main.models.notifications import NotificationTemplate
from awx.main.utils import ignore_inventory_computed_fields, _inventory_updates
__all__ = ['Inventory', 'Host', 'Group', 'InventorySource', 'InventoryUpdate', 'CustomInventoryScript']
@@ -1180,6 +1181,15 @@ class InventorySource(UnifiedJobTemplate, InventorySourceOptions):
return True
return False
@property
def notifiers(self):
# Return all notifiers defined on the Project, and on the Organization for each trigger type
base_notifiers = NotificationTemplate.objects.filter(active=True)
error_notifiers = list(base_notifiers.filter(organization_notifications_for_errors__in=self))
success_notifiers = list(base_notifiers.filter(organization_notifications_for_success__in=self))
any_notifiers = list(base_notifiers.filter(organization_notifications_for_any__in=self))
return dict(error=error_notifiers, success=success_notifiers, any=any_notifiers)
def clean_source(self):
source = self.source
if source and self.group:

View File

@@ -22,6 +22,7 @@ from jsonfield import JSONField
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models.base import * # noqa
from awx.main.models.unified_jobs import * # noqa
from awx.main.models.notifications import NotificationTemplate
from awx.main.utils import decrypt_field, ignore_inventory_computed_fields
from awx.main.utils import emit_websocket_notification
from awx.main.redact import PlainTextCleaner
@@ -330,6 +331,16 @@ class JobTemplate(UnifiedJobTemplate, JobOptions):
def _can_update(self):
return self.can_start_without_user_input()
@property
def notifiers(self):
# Return all notifiers defined on the Job Template, on the Project, and on the Organization for each trigger type
# TODO: Currently there is no org fk on project so this will need to be added once that is
# available after the rbac pr
base_notifiers = NotificationTemplate.objects.filter(active=True)
error_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifications_for_errors__in=[self, self.project]))
success_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifications_for_success__in=[self, self.project]))
any_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifications_for_any__in=[self, self.project]))
return dict(error=error_notifiers, success=success_notifiers, any=any_notifiers)
class Job(UnifiedJob, JobOptions):
'''

View File

@@ -5,7 +5,9 @@ import logging
from django.db import models
from django.core.urlresolvers import reverse
from django.core.mail.message import EmailMessage
from django.utils.translation import ugettext_lazy as _
from django.utils.encoding import smart_str
from awx.main.models.base import * # noqa
from awx.main.notifications.email_backend import CustomEmailBackend
@@ -17,7 +19,7 @@ from jsonfield import JSONField
logger = logging.getLogger('awx.main.models.notifications')
__all__ = ['NotificationTemplate']
__all__ = ['NotificationTemplate', 'Notification']
class NotificationTemplate(CommonModel):
@@ -30,6 +32,14 @@ class NotificationTemplate(CommonModel):
class Meta:
app_label = 'main'
organization = models.ForeignKey(
'Organization',
blank=False,
null=True,
on_delete=models.SET_NULL,
related_name='notification_templates',
)
notification_type = models.CharField(
max_length = 32,
choices=NOTIFICATION_TYPE_CHOICES,
@@ -42,4 +52,83 @@ class NotificationTemplate(CommonModel):
@property
def notification_class(self):
return CLASS_FOR_NOTIFICATION_TYPE[self.notification_type]
return self.CLASS_FOR_NOTIFICATION_TYPE[self.notification_type]
@property
def recipients(self):
return self.notification_configuration[self.notification_class.recipient_parameter]
def generate_notification(self, subject, message):
notification = Notification(notifier=self,
notification_type=self.notification_type,
recipients=smart_str(self.recipients),
subject=subject,
body=message)
notification.save()
return notification
def send(self, subject, body):
recipients = self.notification_configuration.pop(self.notification_class.recipient_parameter)
sender = self.notification_configuration.pop(self.notification_class.sender_parameter, None)
backend_obj = self.notification_class(**self.notification_configuration)
notification_obj = EmailMessage(subject, body, sender, recipients)
return backend_obj.send_messages([notification_obj])
class Notification(CreatedModifiedModel):
'''
A notification event emitted when a Notifier is run
'''
NOTIFICATION_STATE_CHOICES = [
('pending', _('Pending')),
('successful', _('Successful')),
('failed', _('Failed')),
]
class Meta:
app_label = 'main'
ordering = ('pk',)
notifier = models.ForeignKey(
'NotificationTemplate',
related_name='notifications',
on_delete=models.CASCADE,
editable=False
)
status = models.CharField(
max_length=20,
choices=NOTIFICATION_STATE_CHOICES,
default='pending',
editable=False,
)
error = models.TextField(
blank=True,
default='',
editable=False,
)
notifications_sent = models.IntegerField(
default=0,
editable=False,
)
notification_type = models.CharField(
max_length = 32,
choices=NotificationTemplate.NOTIFICATION_TYPE_CHOICES,
)
recipients = models.TextField(
blank=True,
default='',
editable=False,
)
subject = models.TextField(
blank=True,
default='',
editable=False,
)
body = models.TextField(
blank=True,
default='',
editable=False,
)
def get_absolute_url(self):
return reverse('api:notification_detail', args=(self.pk,))

View File

@@ -23,7 +23,7 @@ from awx.main.conf import tower_settings
__all__ = ['Organization', 'Team', 'Permission', 'Profile', 'AuthToken']
class Organization(CommonModel):
class Organization(CommonModel, NotificationFieldsModel):
'''
An organization is the basic unit of multi-tenancy divisions
'''

View File

@@ -10,6 +10,7 @@ import urlparse
# Django
from django.conf import settings
from django.db import models
from django.db.models import Q
from django.utils.translation import ugettext_lazy as _
from django.utils.encoding import smart_str, smart_text
from django.core.exceptions import ValidationError
@@ -20,6 +21,7 @@ from django.utils.timezone import now, make_aware, get_default_timezone
from awx.lib.compat import slugify
from awx.main.models.base import * # noqa
from awx.main.models.jobs import Job
from awx.main.models.notifications import NotificationTemplate
from awx.main.models.unified_jobs import * # noqa
from awx.main.utils import update_scm_url
@@ -309,6 +311,23 @@ class Project(UnifiedJobTemplate, ProjectOptions):
return True
return False
@property
def notifiers(self):
# Return all notifiers defined on the Project, and on the Organization for each trigger type
# TODO: Currently there is no org fk on project so this will need to be added back once that is
# available after the rbac pr
base_notifiers = NotificationTemplate.objects.filter(active=True)
# error_notifiers = list(base_notifiers.filter(Q(project_notifications_for_errors__in=self) |
# Q(organization_notifications_for_errors__in=self.organization)))
# success_notifiers = list(base_notifiers.filter(Q(project_notifications_for_success__in=self) |
# Q(organization_notifications_for_success__in=self.organization)))
# any_notifiers = list(base_notifiers.filter(Q(project_notifications_for_any__in=self) |
# Q(organization_notifications_for_any__in=self.organization)))
error_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifications_for_errors=self))
success_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifications_for_success=self))
any_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifications_for_any=self))
return dict(error=error_notifiers, success=success_notifiers, any=any_notifiers)
def get_absolute_url(self):
return reverse('api:project_detail', args=(self.pk,))

View File

@@ -30,6 +30,7 @@ from djcelery.models import TaskMeta
# AWX
from awx.main.models.base import * # noqa
from awx.main.models.schedules import Schedule
from awx.main.models.notifications import Notification
from awx.main.utils import decrypt_field, emit_websocket_notification, _inventory_updates
from awx.main.redact import UriCleaner
@@ -40,7 +41,7 @@ logger = logging.getLogger('awx.main.models.unified_jobs')
CAN_CANCEL = ('new', 'pending', 'waiting', 'running')
class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique):
class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique, NotificationFieldsModel):
'''
Concrete base class for unified job templates.
'''
@@ -297,6 +298,14 @@ class UnifiedJobTemplate(PolymorphicModel, CommonModelNameNotUnique):
'''
return kwargs # Override if needed in subclass.
@property
def notifiers(self):
'''
Return notifiers relevant to this Unified Job Template
'''
# NOTE: Derived classes should implement
return NotificationTemplate.objects.none()
def create_unified_job(self, **kwargs):
'''
Create a new unified job based on this unified job template.
@@ -385,6 +394,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
editable=False,
related_name='%(class)s_blocked_jobs+',
)
notifications = models.ManyToManyField(
'Notification',
editable=False,
related_name='%(class)s_notifications',
)
cancel_flag = models.BooleanField(
blank=True,
default=False,

View File

@@ -12,5 +12,9 @@ class CustomEmailBackend(EmailBackend):
"username": {"label": "Username", "type": "string"},
"password": {"label": "Password", "type": "password"},
"use_tls": {"label": "Use TLS", "type": "bool"},
"use_ssl": {"label": "Use SSL", "type": "bool"}}
"use_ssl": {"label": "Use SSL", "type": "bool"},
"sender": {"label": "Sender Email", "type": "string"},
"recipients": {"label": "Recipient List", "type": "list"}}
recipient_parameter = "recipients"
sender_parameter = "sender"

View File

@@ -10,7 +10,10 @@ logger = logging.getLogger('awx.main.notifications.slack_backend')
class SlackBackend(BaseEmailBackend):
init_parameters = {"token": {"label": "Token", "type": "password"}}
init_parameters = {"token": {"label": "Token", "type": "password"},
"channels": {"label": "Destination Channels", "type": "list"}}
recipient_parameter = "channels"
sender_parameter = None
def __init__(self, token, fail_silently=False, **kwargs):
super(SlackBackend, self).__init__(fail_silently=fail_silently)
@@ -37,8 +40,9 @@ class SlackBackend(BaseEmailBackend):
sent_messages = 0
for m in messages:
try:
self.connection.rtm_send_message(m.to, m.body)
sent_messages += 1
for r in m.recipients():
self.connection.rtm_send_message(r, m.body)
sent_messages += 1
except Exception as e:
if not self.fail_silently:
raise

View File

@@ -13,7 +13,10 @@ class TwilioBackend(BaseEmailBackend):
init_parameters = {"account_sid": {"label": "Account SID", "type": "string"},
"account_token": {"label": "Account Token", "type": "password"},
"from_phone": {"label": "Source Phone Number", "type": "string"}}
"from_number": {"label": "Source Phone Number", "type": "string"},
"to_numbers": {"label": "Destination SMS Numbers", "type": "list"}}
recipient_parameter = "to_numbers"
sender_parameter = "from_number"
def __init__(self, account_sid, account_token, from_phone, fail_silently=False, **kwargs):
super(TwilioBackend, self).__init__(fail_silently=fail_silently)
@@ -34,7 +37,7 @@ class TwilioBackend(BaseEmailBackend):
try:
connection.messages.create(
to=m.to,
from_=self.from_phone,
from_=m.from_email,
body=m.body)
sent_messages += 1
except Exception as e:

View File

@@ -307,6 +307,8 @@ model_serializer_mapping = {
Job: JobSerializer,
AdHocCommand: AdHocCommandSerializer,
TowerSettings: TowerSettingsSerializer,
NotificationTemplate: NotificationTemplateSerializer,
Notification: NotificationSerializer,
}
def activity_stream_create(sender, instance, created, **kwargs):

View File

@@ -53,7 +53,7 @@ from awx.fact.utils.connection import test_mongo_connection
__all__ = ['RunJob', 'RunSystemJob', 'RunProjectUpdate', 'RunInventoryUpdate',
'RunAdHocCommand', 'handle_work_error', 'handle_work_success',
'update_inventory_computed_fields']
'update_inventory_computed_fields', 'send_notifications']
HIDDEN_PASSWORD = '**********'
@@ -65,6 +65,26 @@ Try upgrading OpenSSH or providing your private key in an different format. \
logger = logging.getLogger('awx.main.tasks')
@task()
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list")
for notification_id in notification_list:
notification = Notification.objects.get(id=notification_id)
try:
sent = notification.notifier.send(notification.subject, notification.body)
notification.status = "successful"
notification.notifications_sent = sent
except Exception as e:
logger.error("Send Notification Failed {}".format(e))
notification.status = "failed"
notification.error = str(e)
finally:
notification.save()
if job_id is not None:
j = UnifiedJob.objects.get(id=job_id)
j.notifications.add(notification)
@task()
def bulk_inventory_element_delete(inventory, hosts=[], groups=[]):
from awx.main.signals import disable_activity_stream
@@ -162,12 +182,41 @@ def mongodb_control(cmd):
@task(bind=True)
def handle_work_success(self, result, task_actual):
# TODO: Perform Notification tasks
pass
if task_actual['type'] == 'project_update':
instance = ProjectUpdate.objects.get(id=task_actual['id'])
instance_name = instance.name
notifiers = instance.project.notifiers
friendly_name = "Project Update"
elif task_actual['type'] == 'inventory_update':
instance = InventoryUpdate.objects.get(id=task_actual['id'])
instance_name = instance.name
notifiers = instance.inventory_source.notifiers
friendly_name = "Inventory Update"
elif task_actual['type'] == 'job':
instance = Job.objects.get(id=task_actual['id'])
instance_name = instance.job_template.name
notifiers = instance.job_template.notifiers
friendly_name = "Job"
elif task_actual['type'] == 'ad_hoc_command':
instance = AdHocCommand.objects.get(id=task_actual['id'])
instance_name = instance.module_name
notifiers = [] # TODO: Ad-hoc commands need to notify someone
friendly_name = "AdHoc Command"
else:
return
notification_subject = "{} #{} '{}' succeeded on Ansible Tower".format(friendly_name,
task_actual['id'],
instance_name)
notification_body = "{} #{} '{}' succeeded on Ansible Tower\nTo view the output: {}".format(friendly_name,
task_actual['id'],
instance_name,
instance.get_absolute_url())
send_notifications.delay([n.generate_notification(notification_subject, notification_body)
for n in notifiers.get('success', []) + notifiers.get('any', [])],
job_id=task_actual['id'])
@task(bind=True)
def handle_work_error(self, task_id, subtasks=None):
# TODO: Perform Notification tasks
print('Executing error task id %s, subtasks: %s' %
(str(self.request.id), str(subtasks)))
first_task = None
@@ -180,15 +229,23 @@ def handle_work_error(self, task_id, subtasks=None):
if each_task['type'] == 'project_update':
instance = ProjectUpdate.objects.get(id=each_task['id'])
instance_name = instance.name
notifiers = instance.project.notifiers
friendly_name = "Project Update"
elif each_task['type'] == 'inventory_update':
instance = InventoryUpdate.objects.get(id=each_task['id'])
instance_name = instance.name
notifiers = instance.inventory_source.notifiers
friendly_name = "Inventory Update"
elif each_task['type'] == 'job':
instance = Job.objects.get(id=each_task['id'])
instance_name = instance.job_template.name
notifiers = instance.job_template.notifiers
friendly_name = "Job"
elif each_task['type'] == 'ad_hoc_command':
instance = AdHocCommand.objects.get(id=each_task['id'])
instance_name = instance.module_name
notifiers = []
friendly_name = "AdHoc Command"
else:
# Unknown task type
break
@@ -197,6 +254,7 @@ def handle_work_error(self, task_id, subtasks=None):
first_task_id = instance.id
first_task_type = each_task['type']
first_task_name = instance_name
first_task_friendly_name = friendly_name
if instance.celery_task_id != task_id:
instance.status = 'failed'
instance.failed = True
@@ -204,6 +262,17 @@ def handle_work_error(self, task_id, subtasks=None):
(first_task_type, first_task_name, first_task_id)
instance.save()
instance.socketio_emit_status("failed")
notification_subject = "{} #{} '{}' failed on Ansible Tower".format(first_task_friendly_name,
first_task_id,
first_task_name)
notification_body = "{} #{} '{}' failed on Ansible Tower\nTo view the output: {}".format(first_task_friendly_name,
first_task_id,
first_task_name,
first_task.get_absolute_url())
send_notifications.delay([n.generate_notification(notification_subject, notification_body).id
for n in notifiers.get('error', []) + notifiers.get('any', [])],
job_id=first_task_id)
@task()
def update_inventory_computed_fields(inventory_id, should_update_hosts=True):