Merge pull request #4039 from wwitzel3/workflow-channels

Workflow Event Channels Integration
This commit is contained in:
Wayne Witzel III
2016-11-18 14:41:40 -05:00
committed by GitHub
7 changed files with 57 additions and 17 deletions

View File

@@ -15,6 +15,11 @@ class Migration(migrations.Migration):
] ]
operations = [ operations = [
migrations.AlterField(
model_name='unifiedjob',
name='launch_type',
field=models.CharField(default=b'manual', max_length=20, editable=False, choices=[(b'manual', 'Manual'), (b'relaunch', 'Relaunch'), (b'callback', 'Callback'), (b'scheduled', 'Scheduled'), (b'dependency', 'Dependency'), (b'workflow', 'Workflow')]),
),
migrations.CreateModel( migrations.CreateModel(
name='WorkflowJob', name='WorkflowJob',
fields=[ fields=[
@@ -34,7 +39,7 @@ class Migration(migrations.Migration):
('modified', models.DateTimeField(default=None, editable=False)), ('modified', models.DateTimeField(default=None, editable=False)),
('always_nodes', models.ManyToManyField(related_name='workflowjobnodes_always', to='main.WorkflowJobNode', blank=True)), ('always_nodes', models.ManyToManyField(related_name='workflowjobnodes_always', to='main.WorkflowJobNode', blank=True)),
('failure_nodes', models.ManyToManyField(related_name='workflowjobnodes_failure', to='main.WorkflowJobNode', blank=True)), ('failure_nodes', models.ManyToManyField(related_name='workflowjobnodes_failure', to='main.WorkflowJobNode', blank=True)),
('job', models.ForeignKey(related_name='unified_job_nodes', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJob', null=True)), ('job', models.OneToOneField(related_name='unified_job_node', on_delete=django.db.models.deletion.SET_NULL, default=None, blank=True, to='main.UnifiedJob', null=True)),
('success_nodes', models.ManyToManyField(related_name='workflowjobnodes_success', to='main.WorkflowJobNode', blank=True)), ('success_nodes', models.ManyToManyField(related_name='workflowjobnodes_success', to='main.WorkflowJobNode', blank=True)),
], ],
options={ options={

View File

@@ -394,6 +394,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
('callback', _('Callback')), # Job was started via host callback. ('callback', _('Callback')), # Job was started via host callback.
('scheduled', _('Scheduled')), # Job was started from a schedule. ('scheduled', _('Scheduled')), # Job was started from a schedule.
('dependency', _('Dependency')), # Job was started as a dependency of another job. ('dependency', _('Dependency')), # Job was started as a dependency of another job.
('workflow', _('Workflow')), # Job was started from a workflow job.
] ]
PASSWORD_FIELDS = ('start_args',) PASSWORD_FIELDS = ('start_args',)
@@ -757,6 +758,16 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def result_stdout_limited(self, start_line=0, end_line=None, redact_sensitive=False): def result_stdout_limited(self, start_line=0, end_line=None, redact_sensitive=False):
return self._result_stdout_raw_limited(start_line, end_line, redact_sensitive, escape_ascii=True) return self._result_stdout_raw_limited(start_line, end_line, redact_sensitive, escape_ascii=True)
@property
def spawned_by_workflow(self):
return self.launch_type == 'workflow'
@property
def workflow_job_id(self):
if self.spawned_by_workflow:
return self.unified_job_node.workflow_job.pk
return None
@property @property
def celery_task(self): def celery_task(self):
try: try:
@@ -781,7 +792,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
def websocket_emit_data(self): def websocket_emit_data(self):
''' Return extra data that should be included when submitting data to the browser over the websocket connection ''' ''' Return extra data that should be included when submitting data to the browser over the websocket connection '''
return {} return {'workflow_job_id': self.workflow_job_id}
def websocket_emit_status(self, status): def websocket_emit_status(self, status):
status_data = dict(unified_job_id=self.id, status=status) status_data = dict(unified_job_id=self.id, status=status)
@@ -789,6 +800,11 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
status_data['group_name'] = 'jobs' status_data['group_name'] = 'jobs'
emit_channel_notification('jobs-status_changed', status_data) emit_channel_notification('jobs-status_changed', status_data)
if self.spawned_by_workflow:
status_data['group_name'] = "workflow_events"
emit_channel_notification('workflow_events-' + str(self.workflow_job_id), status_data)
def notification_data(self): def notification_data(self):
return dict(id=self.id, return dict(id=self.id,
name=self.name, name=self.name,

View File

@@ -187,9 +187,9 @@ class WorkflowJobTemplateNode(WorkflowNodeBase):
class WorkflowJobNode(WorkflowNodeBase): class WorkflowJobNode(WorkflowNodeBase):
job = models.ForeignKey( job = models.OneToOneField(
'UnifiedJob', 'UnifiedJob',
related_name='unified_job_nodes', related_name='unified_job_node',
blank=True, blank=True,
null=True, null=True,
default=None, default=None,
@@ -258,6 +258,8 @@ class WorkflowJobNode(WorkflowNodeBase):
extra_vars.update(functional_aa_dict) extra_vars.update(functional_aa_dict)
if extra_vars: if extra_vars:
data['extra_vars'] = extra_vars data['extra_vars'] = extra_vars
# ensure that unified jobs created by WorkflowJobs are marked
data['launch_type'] = 'workflow'
return data return data

View File

@@ -0,0 +1,16 @@
import mock
from awx.main.models import (
UnifiedJob,
WorkflowJob,
WorkflowJobNode,
)
def test_unified_job_workflow_attributes():
with mock.patch('django.db.ConnectionRouter.db_for_write'):
job = UnifiedJob(id=1, name="job-1", launch_type="workflow")
job.unified_job_node = WorkflowJobNode(workflow_job=WorkflowJob(pk=1))
assert job.spawned_by_workflow is True
assert job.workflow_job_id == 1

View File

@@ -215,6 +215,9 @@ export default
if(state.data && state.data.socket && state.data.socket.groups.hasOwnProperty( "ad_hoc_command_events")){ if(state.data && state.data.socket && state.data.socket.groups.hasOwnProperty( "ad_hoc_command_events")){
state.data.socket.groups.ad_hoc_command_events = [id]; state.data.socket.groups.ad_hoc_command_events = [id];
} }
if(state.data && state.data.socket && state.data.socket.groups.hasOwnProperty( "workflow_events")){
state.data.socket.groups.workflow_events = [id];
}
self.subscribe(state); self.subscribe(state);
} }
return true; return true;

View File

@@ -18,11 +18,7 @@ export default {
data: { data: {
socket: { socket: {
"groups":{ "groups":{
"jobs": ["status_changed", "summary"], "workflow_events": []
// not sure if you're gonna need to use job_events
// or if y'all will come up w/ a new socket group specifically
// for workflows
// "job_events": []
} }
} }
}, },

View File

@@ -21,8 +21,9 @@ Once you''ve connected you are not subscribed to any event groups. You subscribe
'groups': { 'groups': {
'jobs': ['status_changed', 'summary'], 'jobs': ['status_changed', 'summary'],
'schedules': ['changed'], 'schedules': ['changed'],
'ad_hoc_command_events': [ids,], 'ad_hoc_command_events': [ids...],
'job_events': [ids,], 'job_events': [ids...],
'workflow_events': [ids...]
'control': ['limit_reached'], 'control': ['limit_reached'],
} }
@@ -39,12 +40,13 @@ production and development deployments that I will point out, but the actual ser
between the two environments. between the two environments.
### Services ### Services
| Name | Details | | Name | Details |
|:---------:|:-----------------------------------------------------------------------------------------------------------:| |:-----------:|:-----------------------------------------------------------------------------------------------------------:|
| nginx | listens on ports 80/443, handles HTTPS proxying, serves static assets, routes requests for daphne and uwsgi | | nginx | listens on ports 80/443, handles HTTPS proxying, serves static assets, routes requests for daphne and uwsgi |
| uwsgi | listens on port 8050, handles API requests | | uwsgi | listens on port 8050, handles API requests |
| daphne | listens on port 8051, handles Websocket requests | | daphne | listens on port 8051, handles Websocket requests |
| runworker | no listening port, watches and processes the message queue | | runworker | no listening port, watches and processes the message queue |
| supervisord | (production-only) handles the process management of all the services except nginx |
When a request comes in to *nginx* and have the `Upgrade` header and is for the path `/websocket`, then *nginx* knows that it should When a request comes in to *nginx* and have the `Upgrade` header and is for the path `/websocket`, then *nginx* knows that it should
be routing that request to our *daphne* service. be routing that request to our *daphne* service.