mirror of
https://github.com/ansible/awx.git
synced 2026-05-19 14:57:39 -02:30
Update timeout implementation, placeholder code for possible websocket support
This commit is contained in:
@@ -121,8 +121,8 @@ SUMMARIZABLE_FK_FIELDS = {
|
|||||||
'job_template': DEFAULT_SUMMARY_FIELDS,
|
'job_template': DEFAULT_SUMMARY_FIELDS,
|
||||||
'workflow_job_template': DEFAULT_SUMMARY_FIELDS,
|
'workflow_job_template': DEFAULT_SUMMARY_FIELDS,
|
||||||
'workflow_job': DEFAULT_SUMMARY_FIELDS,
|
'workflow_job': DEFAULT_SUMMARY_FIELDS,
|
||||||
'workflow_approval_template': DEFAULT_SUMMARY_FIELDS,
|
'workflow_approval_template': DEFAULT_SUMMARY_FIELDS + ('timeout',),
|
||||||
'workflow_approval': DEFAULT_SUMMARY_FIELDS,
|
'workflow_approval': DEFAULT_SUMMARY_FIELDS + ('timeout',),
|
||||||
'schedule': DEFAULT_SUMMARY_FIELDS + ('next_run',),
|
'schedule': DEFAULT_SUMMARY_FIELDS + ('next_run',),
|
||||||
'unified_job_template': DEFAULT_SUMMARY_FIELDS + ('unified_job_type',),
|
'unified_job_template': DEFAULT_SUMMARY_FIELDS + ('unified_job_type',),
|
||||||
'last_job': DEFAULT_SUMMARY_FIELDS + ('finished', 'status', 'failed', 'license_error'),
|
'last_job': DEFAULT_SUMMARY_FIELDS + ('finished', 'status', 'failed', 'license_error'),
|
||||||
@@ -3413,10 +3413,16 @@ class WorkflowApprovalViewSerializer(UnifiedJobSerializer):
|
|||||||
class WorkflowApprovalSerializer(UnifiedJobSerializer):
|
class WorkflowApprovalSerializer(UnifiedJobSerializer):
|
||||||
|
|
||||||
can_approve_or_deny = serializers.SerializerMethodField()
|
can_approve_or_deny = serializers.SerializerMethodField()
|
||||||
|
approval_expiration = serializers.SerializerMethodField()
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
model = WorkflowApproval
|
model = WorkflowApproval
|
||||||
fields = (['*', '-controller_node', '-execution_node', 'can_approve_or_deny'])
|
fields = (['*', '-controller_node', '-execution_node', 'can_approve_or_deny', 'approval_expiration'])
|
||||||
|
|
||||||
|
def get_approval_expiration(self, obj):
|
||||||
|
if obj.status != 'pending' or obj.timeout == 0:
|
||||||
|
return None
|
||||||
|
return now() + timedelta(seconds=obj.timeout)
|
||||||
|
|
||||||
def get_can_approve_or_deny(self, obj):
|
def get_can_approve_or_deny(self, obj):
|
||||||
request = self.context.get('request', None)
|
request = self.context.get('request', None)
|
||||||
@@ -3437,9 +3443,15 @@ class WorkflowApprovalSerializer(UnifiedJobSerializer):
|
|||||||
class WorkflowApprovalListSerializer(WorkflowApprovalSerializer, UnifiedJobListSerializer):
|
class WorkflowApprovalListSerializer(WorkflowApprovalSerializer, UnifiedJobListSerializer):
|
||||||
|
|
||||||
can_approve_or_deny = serializers.SerializerMethodField()
|
can_approve_or_deny = serializers.SerializerMethodField()
|
||||||
|
approval_expiration = serializers.SerializerMethodField()
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
fields = ('*', '-execution_node', '-controller_node', 'can_approve_or_deny')
|
fields = ('*', '-execution_node', '-controller_node', 'can_approve_or_deny', 'approval_expiration')
|
||||||
|
|
||||||
|
def get_approval_expiration(self, obj):
|
||||||
|
if obj.status != 'pending' or obj.timeout == 0:
|
||||||
|
return None
|
||||||
|
return now() + timedelta(seconds=obj.timeout)
|
||||||
|
|
||||||
def get_can_approve_or_deny(self, obj):
|
def get_can_approve_or_deny(self, obj):
|
||||||
request = self.context.get('request', None)
|
request = self.context.get('request', None)
|
||||||
|
|||||||
@@ -4457,7 +4457,8 @@ class WorkflowApprovalApprove(RetrieveAPIView):
|
|||||||
|
|
||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
obj = self.get_object()
|
obj = self.get_object()
|
||||||
request.user.can_access(models.WorkflowApproval, 'approve_or_deny', obj)
|
if not request.user.can_access(models.WorkflowApproval, 'approve_or_deny', obj):
|
||||||
|
raise PermissionDenied(detail=_("User does not have permission to approve or deny this workflow."))
|
||||||
if obj.status != 'pending':
|
if obj.status != 'pending':
|
||||||
return Response("This workflow step has already been approved or denied.", status=status.HTTP_400_BAD_REQUEST)
|
return Response("This workflow step has already been approved or denied.", status=status.HTTP_400_BAD_REQUEST)
|
||||||
obj.approve()
|
obj.approve()
|
||||||
@@ -4471,7 +4472,8 @@ class WorkflowApprovalDeny(RetrieveAPIView):
|
|||||||
|
|
||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
obj = self.get_object()
|
obj = self.get_object()
|
||||||
request.user.can_access(models.WorkflowApproval, 'approve_or_deny', obj)
|
if not request.user.can_access(models.WorkflowApproval, 'approve_or_deny', obj):
|
||||||
|
raise PermissionDenied(detail=_("User does not have permission to approve or deny this workflow."))
|
||||||
if obj.status != 'pending':
|
if obj.status != 'pending':
|
||||||
return Response("This workflow step has already been approved or denied.", status=status.HTTP_400_BAD_REQUEST)
|
return Response("This workflow step has already been approved or denied.", status=status.HTTP_400_BAD_REQUEST)
|
||||||
obj.deny()
|
obj.deny()
|
||||||
|
|||||||
@@ -2379,7 +2379,7 @@ class UnifiedJobTemplateAccess(BaseAccess):
|
|||||||
Q(pk__in=self.model.accessible_pk_qs(self.user, 'read_role')) |
|
Q(pk__in=self.model.accessible_pk_qs(self.user, 'read_role')) |
|
||||||
Q(inventorysource__inventory__id__in=Inventory._accessible_pk_qs(
|
Q(inventorysource__inventory__id__in=Inventory._accessible_pk_qs(
|
||||||
Inventory, self.user, 'read_role'))
|
Inventory, self.user, 'read_role'))
|
||||||
).exclude(polymorphic_ctype__model='workflowapprovaltemplate') # &&&&&&
|
) # &&&&&& (filter out approvals from UJT endpoint here...?)
|
||||||
|
|
||||||
def can_start(self, obj, validate_license=True):
|
def can_start(self, obj, validate_license=True):
|
||||||
access_class = access_registry[obj.__class__]
|
access_class = access_registry[obj.__class__]
|
||||||
@@ -2429,7 +2429,7 @@ class UnifiedJobAccess(BaseAccess):
|
|||||||
Q(adhoccommand__inventory__id__in=inv_pk_qs) |
|
Q(adhoccommand__inventory__id__in=inv_pk_qs) |
|
||||||
Q(job__inventory__organization__in=org_auditor_qs) |
|
Q(job__inventory__organization__in=org_auditor_qs) |
|
||||||
Q(job__project__organization__in=org_auditor_qs)
|
Q(job__project__organization__in=org_auditor_qs)
|
||||||
).exclude(polymorphic_ctype__model='workflowapproval') # &&&&&&
|
) # &&&&&& (for filtering out approvals from UJ endpoint...?)
|
||||||
return qs
|
return qs
|
||||||
|
|
||||||
|
|
||||||
@@ -2632,7 +2632,6 @@ class ActivityStreamAccess(BaseAccess):
|
|||||||
app_set = OAuth2ApplicationAccess(self.user).filtered_queryset()
|
app_set = OAuth2ApplicationAccess(self.user).filtered_queryset()
|
||||||
token_set = OAuth2TokenAccess(self.user).filtered_queryset()
|
token_set = OAuth2TokenAccess(self.user).filtered_queryset()
|
||||||
|
|
||||||
# &&&&&& Activity Stream + RBAC here??
|
|
||||||
return qs.filter(
|
return qs.filter(
|
||||||
Q(ad_hoc_command__inventory__in=inventory_set) |
|
Q(ad_hoc_command__inventory__in=inventory_set) |
|
||||||
Q(o_auth2_application__in=app_set) |
|
Q(o_auth2_application__in=app_set) |
|
||||||
|
|||||||
@@ -70,4 +70,9 @@ class Migration(migrations.Migration):
|
|||||||
name='read_role',
|
name='read_role',
|
||||||
field=awx.main.fields.ImplicitRoleField(editable=False, null='True', on_delete=django.db.models.deletion.CASCADE, parent_role=['singleton:system_auditor', 'organization.auditor_role', 'execute_role', 'admin_role', 'approval_role'], related_name='+', to='main.Role'),
|
field=awx.main.fields.ImplicitRoleField(editable=False, null='True', on_delete=django.db.models.deletion.CASCADE, parent_role=['singleton:system_auditor', 'organization.auditor_role', 'execute_role', 'admin_role', 'approval_role'], related_name='+', to='main.Role'),
|
||||||
),
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='workflowapproval',
|
||||||
|
name='timeout',
|
||||||
|
field=models.IntegerField(blank=True, default=0, help_text='The amount of time (in seconds) before the approval node expires and fails.'),
|
||||||
|
),
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -632,7 +632,7 @@ class WorkflowApprovalTemplate(UnifiedJobTemplate):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _get_unified_job_field_names(cls):
|
def _get_unified_job_field_names(cls):
|
||||||
return ['name', 'description']
|
return ['name', 'description', 'timeout']
|
||||||
|
|
||||||
def get_absolute_url(self, request=None):
|
def get_absolute_url(self, request=None):
|
||||||
return reverse('api:workflow_approval_template_detail', kwargs={'pk': self.pk}, request=request)
|
return reverse('api:workflow_approval_template_detail', kwargs={'pk': self.pk}, request=request)
|
||||||
@@ -650,6 +650,11 @@ class WorkflowApproval(UnifiedJob):
|
|||||||
default=None,
|
default=None,
|
||||||
on_delete=models.SET_NULL,
|
on_delete=models.SET_NULL,
|
||||||
)
|
)
|
||||||
|
timeout = models.IntegerField(
|
||||||
|
blank=True,
|
||||||
|
default=0,
|
||||||
|
help_text=_("The amount of time (in seconds) before the approval node expires and fails."),
|
||||||
|
)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def _get_unified_job_template_class(cls):
|
def _get_unified_job_template_class(cls):
|
||||||
@@ -676,3 +681,9 @@ class WorkflowApproval(UnifiedJob):
|
|||||||
self.save()
|
self.save()
|
||||||
schedule_task_manager()
|
schedule_task_manager()
|
||||||
return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request)
|
return reverse('api:workflow_approval_deny', kwargs={'pk': self.pk}, request=request)
|
||||||
|
|
||||||
|
# &&&&&& Possible placeholder for websocket support
|
||||||
|
# def websocket_emit_data(self):
|
||||||
|
# websocket_data = super(WorkflowApproval, self).websocket_emit_data()
|
||||||
|
# websocket_data.update(dict(project_id=self.project.id)) # ?????
|
||||||
|
# return websocket_data
|
||||||
|
|||||||
@@ -519,20 +519,27 @@ class TaskManager():
|
|||||||
if not found_acceptable_queue:
|
if not found_acceptable_queue:
|
||||||
logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
|
logger.debug("{} couldn't be scheduled on graph, waiting for next cycle".format(task.log_format))
|
||||||
|
|
||||||
def timeout_approval_node(self):
|
def timeout_approval_node(self): # Add websocket stuff for when it transitions to "timed out" (maybe a websocket_emit_status() call)
|
||||||
workflow_approvals = WorkflowApproval.objects.filter(status='pending').prefetch_related('workflow_approval_template')
|
workflow_approvals = WorkflowApproval.objects.filter(status='pending')
|
||||||
now = tz_now()
|
now = tz_now()
|
||||||
for task in workflow_approvals:
|
for task in workflow_approvals:
|
||||||
# TODO: copy the timeout to the job itself at launch time, not the template
|
# TODO: copy the timeout to the job itself at launch time, not the template <---- Started to implement steps to do this, but unsure...
|
||||||
approval_timeout_seconds = timedelta(seconds=task.workflow_approval_template.timeout)
|
approval_timeout_seconds = timedelta(seconds=task.timeout)
|
||||||
if task.workflow_approval_template.timeout == 0:
|
if task.timeout == 0:
|
||||||
continue
|
continue
|
||||||
if (now - task.created) >= approval_timeout_seconds:
|
if (now - task.created) >= approval_timeout_seconds:
|
||||||
logger.info("The approval node {} ({}) has expired after {} seconds.".format(task.name, task.id, task.workflow_approval_template.timeout))
|
logger.info("The approval node {} ({}) has expired after {} seconds.".format(task.name, task.pk, task.timeout))
|
||||||
task.status = 'failed'
|
task.status = 'failed'
|
||||||
task.job_explanation = _("This approval node has timed out.")
|
task.job_explanation = _("This approval node has timed out.")
|
||||||
task.save(update_fields=['status', 'job_explanation'])
|
task.save(update_fields=['status', 'job_explanation'])
|
||||||
|
|
||||||
|
# &&&&&& Placeholder for websocket support
|
||||||
|
# def detect_pending_approval(self):
|
||||||
|
# workflow_approvals = WorkflowApproval.objects.filter(status='pending').prefetch_related('workflow_approval_template')
|
||||||
|
# for task in workflow_approvals:
|
||||||
|
# if task.status == 'pending':
|
||||||
|
# workflow_approvals.websocket_emit_status(task.status)
|
||||||
|
|
||||||
def calculate_capacity_consumed(self, tasks):
|
def calculate_capacity_consumed(self, tasks):
|
||||||
self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
|
self.graph = InstanceGroup.objects.capacity_values(tasks=tasks, graph=self.graph)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user