AC-620 Change to minimize queries when adding a new job event, post process events after job has completed.

This commit is contained in:
Chris Church
2013-11-25 11:17:38 -05:00
parent 9038784bb6
commit 6ec1160b0f
3 changed files with 25 additions and 13 deletions

View File

@@ -618,7 +618,10 @@ class JobEvent(BaseModel):
# Skip normal checks on save if we're only updating failed/changed
# flags triggered from a child event.
from_parent_update = kwargs.pop('from_parent_update', False)
if not from_parent_update:
# Only update job event hierarchy and related models during post
# processing (after running job).
post_process = kwargs.pop('post_process', False)
if post_process and not from_parent_update:
res = self.event_data.get('res', None)
# Workaround for Ansible 1.2, where the runner_on_async_ok event is
# created even when the async task failed. Change the event to be
@@ -664,7 +667,7 @@ class JobEvent(BaseModel):
self.parent = self._find_parent()
update_fields.extend(['play', 'task', 'parent'])
super(JobEvent, self).save(*args, **kwargs)
if not from_parent_update:
if post_process and not from_parent_update:
self.update_parent_failed_and_changed()
self.update_hosts()
self.update_host_summary_from_stats()

View File

@@ -56,7 +56,7 @@ class BaseTask(Task):
transaction.commit()
instance = self.model.objects.get(pk=pk)
if updates:
update_fields = []
update_fields = ['modified']
for field, value in updates.items():
if field in ('result_stdout', 'result_traceback'):
for srch, repl in output_replacements:
@@ -166,7 +166,7 @@ class BaseTask(Task):
passwords when requested.
'''
status, stdout = 'error', ''
logfile = task_stdout_handle#cStringIO.StringIO()
logfile = task_stdout_handle
logfile_pos = logfile.tell()
child = pexpect.spawn(args[0], args[1:], cwd=cwd, env=env)
child.logfile_read = logfile
@@ -183,15 +183,10 @@ class BaseTask(Task):
self.update_model(instance.pk, status='running', output_replacements=output_replacements)
while child.isalive():
result_id = child.expect(expect_list, timeout=pexpect_timeout)
#print 'pexpect result_id', result_id, expect_list[result_id], expect_passwords.get(result_id, None)
if result_id in expect_passwords:
child.sendline(expect_passwords[result_id])
if logfile_pos != logfile.tell():
#old_logfile_pos = logfile_pos
logfile_pos = logfile.tell()
#updates['result_stdout'] = logfile.getvalue()
#task_stdout_handle.write(logfile.getvalue()[old_logfile_pos:logfile_pos])
#task_stdout_handle.flush()
last_stdout_update = time.time()
# Update instance status here (also updates modified timestamp, so
# we have a way to know the task is still running, otherwise the
@@ -211,7 +206,6 @@ class BaseTask(Task):
status = 'successful'
else:
status = 'failed'
#stdout = logfile.getvalue()
return status, stdout
def pre_run_check(self, instance, **kwargs):
@@ -255,7 +249,7 @@ class BaseTask(Task):
@transaction.commit_on_success
def run(self, pk, **kwargs):
'''
Run the job/task using ansible-playbook and capture its output.
Run the job/task and capture its output.
'''
instance = self.update_model(pk)
status, stdout, tb = 'error', '', ''
@@ -525,6 +519,16 @@ class RunJob(BaseTask):
else:
return False
def post_run_hook(self, job, **kwargs):
'''
Hook for actions to run after job/task has completed.
'''
super(RunJob, self).post_run_hook(job, **kwargs)
# Update job event fields after job has completed.
for job_event in job.job_events.order_by('pk'):
job_event.save(post_process=True)
class RunProjectUpdate(BaseTask):
name = 'run_project_update'

View File

@@ -7,6 +7,7 @@ import json
import socket
import struct
import threading
import time
import urlparse
import uuid
@@ -1331,7 +1332,8 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
@override_settings(CELERY_ALWAYS_EAGER=True,
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
ANSIBLE_TRANSPORT='local')
ANSIBLE_TRANSPORT='local')#,
#MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES)
class JobTransactionTest(BaseJobTestMixin, django.test.LiveServerTestCase):
'''Job test of transaction locking using the celery task backend.'''
@@ -1343,7 +1345,9 @@ class JobTransactionTest(BaseJobTestMixin, django.test.LiveServerTestCase):
super(JobTransactionTest, self).tearDown()
def _job_detail_polling_thread(self, url, auth, errors):
time.sleep(1)
while True:
time.sleep(0.1)
try:
response = requests.get(url, auth=auth)
response.raise_for_status()
@@ -1370,7 +1374,7 @@ class JobTransactionTest(BaseJobTestMixin, django.test.LiveServerTestCase):
# Create lots of extra test hosts to trigger job event callbacks
job = self.job_eng_run
inv = job.inventory
for x in xrange(100):
for x in xrange(50):
h = inv.hosts.create(name='local-%d' % x)
for g in inv.groups.all():
g.hosts.add(h)
@@ -1389,3 +1393,4 @@ class JobTransactionTest(BaseJobTestMixin, django.test.LiveServerTestCase):
job = Job.objects.get(pk=job.pk)
self.assertEqual(job.status, 'successful', job.result_stdout)
self.assertFalse(errors)