Include proper invocation for non-job tasks with error callback

This commit is contained in:
Matthew Jones
2014-03-10 17:00:35 -04:00
parent 85b6aa2262
commit d269dc1ecc
2 changed files with 3 additions and 18 deletions

View File

@@ -183,7 +183,7 @@ def process_graph(graph, task_capacity):
if impact <= remaining_volume or running_impact == 0: if impact <= remaining_volume or running_impact == 0:
dependent_nodes = [{'type': graph.get_node_type(n), 'id': n.id} for n in graph.get_dependents()] dependent_nodes = [{'type': graph.get_node_type(n), 'id': n.id} for n in graph.get_dependents()]
error_handler = handle_work_error.s(subtasks=dependent_nodes) error_handler = handle_work_error.s(subtasks=dependent_nodes)
node_obj.start(error_callback=error_handler) start_status = node_obj.start(error_callback=error_handler)
remaining_volume -= impact remaining_volume -= impact
running_impact += impact running_impact += impact

View File

@@ -395,9 +395,7 @@ class CommonTask(PrimordialModel):
''' Notify the task runner system to begin work on this task ''' ''' Notify the task runner system to begin work on this task '''
raise NotImplementedError raise NotImplementedError
def start_signature(self, **kwargs): def start(self, error_callback, **kwargs):
from awx.main.tasks import handle_work_error
task_class = self._get_task_class() task_class = self._get_task_class()
if not self.can_start: if not self.can_start:
return False return False
@@ -405,20 +403,7 @@ class CommonTask(PrimordialModel):
opts = dict([(field, kwargs.get(field, '')) for field in needed]) opts = dict([(field, kwargs.get(field, '')) for field in needed])
if not all(opts.values()): if not all(opts.values()):
return False return False
task_actual = task_class().si(self.pk, **opts) task_class().apply_async((self.pk, **opts), link_error=error_callback)
return task_actual
def start(self, error_callback, **kwargs):
task_actual = self.start_signature(**kwargs)
# TODO: Callback for status
task_result = task_actual.delay()
# Reload instance from database so we don't clobber results from task
# (mainly from tests when using Django 1.4.x).
instance = self.__class__.objects.get(pk=self.pk)
# The TaskMeta instance in the database isn't created until the worker
# starts processing the task, so we can only store the task ID here.
instance.celery_task_id = task_result.task_id
instance.save(update_fields=['celery_task_id'])
return True return True
@property @property