Rebasing for initial task system work. Current work towards actual task

running flow
This commit is contained in:
Matthew Jones
2014-03-10 16:07:20 -04:00
parent 8960f17f28
commit 85b6aa2262
8 changed files with 366 additions and 62 deletions

View File

@@ -31,6 +31,7 @@ from jsonfield import JSONField
# AWX
from awx.main.models.base import *
from awx.main.utils import encrypt_field
# Celery
from celery import chain
@@ -298,7 +299,7 @@ class Job(CommonTask):
def _get_task_class(self):
from awx.main.tasks import RunJob
return RunJob
def _get_passwords_needed_to_start(self):
return self.passwords_needed_to_start
@@ -307,6 +308,28 @@ class Job(CommonTask):
kwargs['job_host_summaries__job__pk'] = self.pk
return Host.objects.filter(**kwargs)
def is_blocked_by(self, obj):
from awx.main.models import InventoryUpdate, ProjectUpdate
if type(obj) == Job:
if obj.job_template == self.job_template:
return True
return False
if type(obj) == InventoryUpdate:
for i_s in self.inventory.inventory_sources.filter(active=True):
if i_s == obj.inventory_source:
return True
return False
if type(obj) == ProjectUpdate:
if obj.project == self.project:
return True
return False
return False
@property
def task_impact(self):
# NOTE: We sorta have to assume the host count matches and that forks default to 5
return min(self._get_hosts().count(), 5 if self.forks == 0 else self.forks) * 10
@property
def successful_hosts(self):
return self._get_hosts(job_host_summaries__ok__gt=0)
@@ -335,64 +358,57 @@ class Job(CommonTask):
def processed_hosts(self):
return self._get_hosts(job_host_summaries__processed__gt=0)
def start(self, **kwargs):
def generate_dependencies(self, active_tasks):
from awx.main.models import InventoryUpdate, ProjectUpdate
inventory_sources = self.inventory.inventory_sources.filter(active=True, update_on_launch=True)
project_found = False
inventory_sources_found = []
dependencies = []
for obj in active_tasks:
if type(obj) == ProjectUpdate:
if obj.project == self.project:
project_found = True
if type(obj) == InventoryUpdate:
if obj.inventory_source in inventory_sources:
inventory_sources_found.append(obj.inventory_source)
if not project_found and self.project.scm_update_on_launch::
dependencies.append(self.project.project_updates.create())
if inventory_sources.count(): # and not has_setup_failures? Probably handled as an error scenario in the task runner
for source in inventory_sources:
if not source in inventory_sources_found:
dependencies.append(source.inventory_updates.create())
return dependencies
def signal_start(self, **kwargs):
json_args = json.dumps(kwargs)
self.start_args = json_args
self.save()
self.start_args = encrypt_field(self, 'start_args')
self.save()
signal_context = zmq.Context()
signal_socket = signal_context.socket(zmq.REQ)
signal_socket.connect(settings.TASK_COMMAND_PORT)
signal_socket.send_json(dict(task_type="ansible_playbook", id=self.id))
self.socket.recv()
return True
def start(self, error_callback, **kwargs):
from awx.main.tasks import handle_work_error
task_class = self._get_task_class()
if not self.can_start:
return False
needed = self._get_passwords_needed_to_start()
opts = dict([(field, kwargs.get(field, '')) for field in needed])
try:
stored_args = json.loads(decrypt_field(self, 'start_args'))
except Exception, e:
stored_args = None
if stored_args is None or stored_args == '':
opts = dict([(field, kwargs.get(field, '')) for field in needed])
else:
opts = stored_args
if not all(opts.values()):
return False
self.status = 'waiting'
self.save(update_fields=['status'])
transaction.commit()
runnable_tasks = []
run_tasks = []
inventory_updates_actual = []
project_update_actual = None
has_setup_failures = False
setup_failure_message = ""
project = self.project
inventory = self.inventory
is_qs = inventory.inventory_sources.filter(active=True, update_on_launch=True)
if project.scm_update_on_launch:
project_update_details = project.update_signature()
if not project_update_details:
has_setup_failures = True
setup_failure_message = "Failed to check dependent project update task"
else:
runnable_tasks.append({'obj': project_update_details[0],
'sig': project_update_details[1],
'type': 'project_update'})
if is_qs.count() and not has_setup_failures:
for inventory_source in is_qs:
inventory_update_details = inventory_source.update_signature()
if not inventory_update_details:
has_setup_failures = True
setup_failure_message = "Failed to check dependent inventory update task"
break
else:
runnable_tasks.append({'obj': inventory_update_details[0],
'sig': inventory_update_details[1],
'type': 'inventory_update'})
if has_setup_failures:
for each_task in runnable_tasks:
obj = each_task['obj']
obj.status = 'error'
obj.result_traceback = setup_failure_message
obj.save()
self.status = 'error'
self.result_traceback = setup_failure_message
self.save()
thisjob = {'type': 'job', 'id': self.id}
for idx in xrange(len(runnable_tasks)):
dependent_tasks = [{'type': r['type'], 'id': r['obj'].id} for r in runnable_tasks[idx:]] + [thisjob]
run_tasks.append(runnable_tasks[idx]['sig'].set(link_error=handle_work_error.s(subtasks=dependent_tasks)))
run_tasks.append(task_class().si(self.pk, **opts).set(link_error=handle_work_error.s(subtasks=[thisjob])))
res = chain(run_tasks)()
task_class().apply_async((self.pk, **opts), link_error=error_callback)
return True
class JobHostSummary(BaseModel):