From 3267a988f8be35d012686e04761f36888609f0aa Mon Sep 17 00:00:00 2001 From: Luke Sneeringer Date: Wed, 10 Sep 2014 12:17:35 -0500 Subject: [PATCH] Shift Django to 1.6.x (>= 1.6.7). --- Makefile | 6 +- awx/api/authentication.py | 1 + awx/api/views.py | 22 ++++-- .../management/commands/cleanup_deleted.py | 2 +- awx/main/management/commands/cleanup_jobs.py | 2 +- .../management/commands/inventory_import.py | 71 ++++++++++--------- .../commands/run_callback_receiver.py | 54 +++++++++----- awx/main/models/unified_jobs.py | 2 +- awx/main/tasks.py | 50 +++++++------ awx/main/tests/commands.py | 8 +-- awx/main/tests/jobs.py | 25 +++---- awx/main/tests/tasks.py | 2 - awx/settings/defaults.py | 10 +-- 13 files changed, 142 insertions(+), 113 deletions(-) diff --git a/Makefile b/Makefile index dafca409e8..06a76bbf06 100644 --- a/Makefile +++ b/Makefile @@ -157,11 +157,11 @@ pyflakes: # Run all API unit tests. test: - $(PYTHON) manage.py test -v2 main + $(PYTHON) manage.py test -v2 awx.main.tests # Run all API unit tests with coverage enabled. test_coverage: - coverage run manage.py test -v2 main + coverage run manage.py test -v2 awx.main.tests # Output test coverage as HTML (into htmlcov directory). coverage_html: @@ -169,7 +169,7 @@ coverage_html: # Run UI unit tests using Selenium. test_ui: - $(PYTHON) manage.py test -v2 ui + $(PYTHON) manage.py test -v2 awx.ui.tests # Run API unit tests across multiple Python/Django versions with Tox. test_tox: diff --git a/awx/api/authentication.py b/awx/api/authentication.py index b1e124702d..bbe96dbacd 100644 --- a/awx/api/authentication.py +++ b/awx/api/authentication.py @@ -9,6 +9,7 @@ from rest_framework import HTTP_HEADER_ENCODING # AWX from awx.main.models import Job, AuthToken + class TokenAuthentication(authentication.TokenAuthentication): ''' Custom token authentication using tokens that expire and are associated diff --git a/awx/api/views.py b/awx/api/views.py index b678fd48c1..1aac81d750 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -1414,6 +1414,10 @@ class JobTemplateCallback(GenericAPIView): model = JobTemplate permission_classes = (JobTemplateCallbackPermission,) + @transaction.non_atomic_requests + def dispatch(self, *args, **kwargs): + return super(JobTemplateCallback, self).dispatch(*args, **kwargs) + def find_matching_hosts(self): ''' Find the host(s) in the job template's inventory that match the remote @@ -1535,14 +1539,22 @@ class JobTemplateCallback(GenericAPIView): # FIXME: Log! return Response(data, status=status.HTTP_400_BAD_REQUEST) limit = ':&'.join(filter(None, [job_template.limit, host.name])) - job = job_template.create_job(limit=limit, launch_type='callback') - result = job.signal_start(inventory_sources_already_updated=inventory_sources_already_updated) + + # Everything is fine; actually create the job. + with transaction.atomic(): + job = job_template.create_job(limit=limit, launch_type='callback') + + # Send a signal to celery that the job should be started. + isau = inventory_sources_already_updated + result = job.signal_start(inventory_sources_already_updated=isau) if not result: data = dict(msg='Error starting job!') return Response(data, status=status.HTTP_400_BAD_REQUEST) - else: - headers = {'Location': job.get_absolute_url()} - return Response(status=status.HTTP_202_ACCEPTED, headers=headers) + + # Return the location of the new job. + headers = {'Location': job.get_absolute_url()} + return Response(status=status.HTTP_202_ACCEPTED, headers=headers) + class JobTemplateJobsList(SubListCreateAPIView): diff --git a/awx/main/management/commands/cleanup_deleted.py b/awx/main/management/commands/cleanup_deleted.py index d17bd73dc5..199fc1cded 100644 --- a/awx/main/management/commands/cleanup_deleted.py +++ b/awx/main/management/commands/cleanup_deleted.py @@ -88,7 +88,7 @@ class Command(BaseCommand): self.logger.addHandler(handler) self.logger.propagate = False - @transaction.commit_on_success + @transaction.atomic def handle(self, *args, **options): self.verbosity = int(options.get('verbosity', 1)) self.init_logging() diff --git a/awx/main/management/commands/cleanup_jobs.py b/awx/main/management/commands/cleanup_jobs.py index f508442a1e..fab02cafa5 100644 --- a/awx/main/management/commands/cleanup_jobs.py +++ b/awx/main/management/commands/cleanup_jobs.py @@ -105,7 +105,7 @@ class Command(NoArgsCommand): self.logger.addHandler(handler) self.logger.propagate = False - @transaction.commit_on_success + @transaction.atomic def handle_noargs(self, **options): self.verbosity = int(options.get('verbosity', 1)) self.init_logging() diff --git a/awx/main/management/commands/inventory_import.py b/awx/main/management/commands/inventory_import.py index d63f7b425c..d78fe93358 100644 --- a/awx/main/management/commands/inventory_import.py +++ b/awx/main/management/commands/inventory_import.py @@ -1125,7 +1125,6 @@ class Command(NoArgsCommand): self.logger.error(LICENSE_MESSAGE % d) raise CommandError('License count exceeded!') - @transaction.commit_on_success def handle_noargs(self, **options): self.verbosity = int(options.get('verbosity', 1)) self.init_logging() @@ -1171,10 +1170,11 @@ class Command(NoArgsCommand): # Update inventory update for this command line invocation. with ignore_inventory_computed_fields(): - if self.inventory_update: - self.inventory_update.status = 'running' - self.inventory_update.save() - transaction.commit() + iu = self.inventory_update + if iu and iu.status != 'running': + with transaction.atomic(): + self.inventory_update.status = 'running' + self.inventory_update.save() # Load inventory from source. self.all_group = load_inventory_source(self.source, None, @@ -1183,35 +1183,41 @@ class Command(NoArgsCommand): self.exclude_empty_groups) self.all_group.debug_tree() - # Merge/overwrite inventory into database. - if settings.SQL_DEBUG: - self.logger.warning('loading into database...') - with ignore_inventory_computed_fields(): - if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): - self.load_into_database() - else: - with disable_activity_stream(): + # Ensure that this is managed as an atomic SQL transaction, + # and thus properly rolled back if there is an issue. + with transaction.atomic(): + # Merge/overwrite inventory into database. + if settings.SQL_DEBUG: + self.logger.warning('loading into database...') + with ignore_inventory_computed_fields(): + if getattr(settings, 'ACTIVITY_STREAM_ENABLED_FOR_INVENTORY_SYNC', True): self.load_into_database() + else: + with disable_activity_stream(): + self.load_into_database() + if settings.SQL_DEBUG: + queries_before2 = len(connection.queries) + self.inventory.update_computed_fields() + if settings.SQL_DEBUG: + self.logger.warning('update computed fields took %d queries', + len(connection.queries) - queries_before2) + self.check_license() + + if self.inventory_source.group: + inv_name = 'group "%s"' % (self.inventory_source.group.name) + else: + inv_name = '"%s" (id=%s)' % (self.inventory.name, + self.inventory.id) if settings.SQL_DEBUG: - queries_before2 = len(connection.queries) - self.inventory.update_computed_fields() - if settings.SQL_DEBUG: - self.logger.warning('update computed fields took %d queries', - len(connection.queries) - queries_before2) - self.check_license() - - if self.inventory_source.group: - inv_name = 'group "%s"' % (self.inventory_source.group.name) - else: - inv_name = '"%s" (id=%s)' % (self.inventory.name, - self.inventory.id) - if settings.SQL_DEBUG: - self.logger.warning('Inventory import completed for %s in %0.1fs', - inv_name, time.time() - begin) - else: - self.logger.info('Inventory import completed for %s in %0.1fs', - inv_name, time.time() - begin) - status = 'successful' + self.logger.warning('Inventory import completed for %s in %0.1fs', + inv_name, time.time() - begin) + else: + self.logger.info('Inventory import completed for %s in %0.1fs', + inv_name, time.time() - begin) + status = 'successful' + + # If we're in debug mode, then log the queries and time + # used to do the operation. if settings.SQL_DEBUG: queries_this_import = connection.queries[queries_before:] sqltime = sum(float(x['time']) for x in queries_this_import) @@ -1236,7 +1242,6 @@ class Command(NoArgsCommand): self.inventory_update.result_traceback = tb self.inventory_update.status = status self.inventory_update.save(update_fields=['status', 'result_traceback']) - transaction.commit() if exc and isinstance(exc, CommandError): sys.exit(1) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 677cee5b40..038a0af25e 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -130,16 +130,25 @@ class CallbackReceiver(object): last_parent_events[message['job_id']] = job_parent_events consumer_subscriber.send("1") - @transaction.commit_on_success def process_job_event(self, data): + # Sanity check: Do we need to do anything at all? event = data.get('event', '') parent_id = data.get('parent', None) if not event or 'job_id' not in data: return + + # Get the correct "verbose" value from the job. + # If for any reason there's a problem, just use 0. try: verbose = Job.objects.get(id=data['job_id']).verbosity except Exception, e: verbose = 0 + + # Convert the datetime for the job event's creation appropriately, + # and include a time zone for it. + # + # In the event of any issue, throw it out, and Django will just save + # the current time. try: if not isinstance(data['created'], datetime.datetime): data['created'] = parse_datetime(data['created']) @@ -147,31 +156,44 @@ class CallbackReceiver(object): data['created'] = data['created'].replace(tzinfo=FixedOffset(0)) except (KeyError, ValueError): data.pop('created', None) + + # Print the data to stdout if we're in DEBUG mode. if settings.DEBUG: print data + + # Sanity check: Don't honor keys that we don't recognize. for key in data.keys(): - if key not in ('job_id', 'event', 'event_data', 'created', 'counter'): + if key not in ('job_id', 'event', 'event_data', + 'created', 'counter'): data.pop(key) + + # Save any modifications to the job event to the database. + # If we get a database error of some kind, try again. for retry_count in xrange(11): try: - if event == 'playbook_on_stats': - transaction.commit() - if verbose == 0 and 'res' in data['event_data'] and 'invocation' in data['event_data']['res'] and \ - 'module_args' in data['event_data']['res']['invocation']: - data['event_data']['res']['invocation']['module_args'] = "" - job_event = JobEvent(**data) - if parent_id is not None: - job_event.parent = JobEvent.objects.get(id=parent_id) - job_event.save(post_process=True) - return job_event + with transaction.atomic(): + # If we're not in verbose mode, wipe out any module + # arguments. + i = data['event_data'].get('res', {}).get('invocation', {}) + if verbose == 0 and 'module_args' in i: + i['module_args'] = '' + + # Create a new JobEvent object. + job_event = JobEvent(**data) + if parent_id is not None: + job_event.parent = JobEvent.objects.get(id=parent_id) + job_event.save(post_process=True) + + # Retrun the job event object. + return job_event except DatabaseError as e: - transaction.rollback() + # Log the error and try again. print('Database error saving job event, retrying in ' '1 second (retry #%d): %s', retry_count + 1, e) time.sleep(1) - else: - print('Failed to save job event after %d retries.', - retry_count) + + # We failed too many times, and are giving up. + print('Failed to save job event after %d retries.', retry_count) return None def callback_worker(self, queue_actual): diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index d7786f0af8..29b9e8839c 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -623,7 +623,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique Notify the task runner system to begin work on this task. ''' from awx.main.tasks import notify_task_runner - if hasattr(settings, 'CELERY_UNIT_TEST'): + if getattr(settings, 'CELERY_UNIT_TEST', False): return self.start(None, **kwargs) if not self.can_start: return False diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 52ac26d5be..beeba7844c 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -154,24 +154,25 @@ def update_inventory_computed_fields(inventory_id, should_update_hosts=True): i = i[0] i.update_computed_fields(update_hosts=should_update_hosts) -class BaseTask(Task): +class BaseTask(Task): name = None model = None abstract = True - @transaction.commit_on_success - def update_model(self, pk, **updates): - ''' - Reload model from database and update the given fields. - ''' + def update_model(self, pk, _attempt=0, **updates): + """Reload the model instance from the database and update the + given fields. + """ output_replacements = updates.pop('output_replacements', None) or [] - # Commit outstanding transaction so that we fetch the latest object - # from the database. - transaction.commit() - for retry_count in xrange(5): - try: + + try: + with transaction.atomic(): + # Retrieve the model instance. instance = self.model.objects.get(pk=pk) + + # Update the appropriate fields and save the model + # instance, then return the new instance. if updates: update_fields = ['modified'] for field, value in updates.items(): @@ -183,17 +184,25 @@ class BaseTask(Task): if field == 'status': update_fields.append('failed') instance.save(update_fields=update_fields) - transaction.commit() return instance - except DatabaseError as e: - transaction.rollback() - logger.debug('Database error updating %s, retrying in 5 ' - 'seconds (retry #%d): %s', - self.model._meta.object_name, retry_count + 1, e) + except DatabaseError as e: + # Log out the error to the debug logger. + logger.debug('Database error updating %s, retrying in 5 ' + 'seconds (retry #%d): %s', + self.model._meta.object_name, retry_count + 1, e) + + # Attempt to retry the update, assuming we haven't already + # tried too many times. + if _attempt < 5: time.sleep(5) - else: - logger.error('Failed to update %s after %d retries.', - self.model._meta.object_name, retry_count) + return self.update_model(pk, + _attempt=_attempt + 1, + output_replacements=output_replacements, + **updates + ) + else: + logger.error('Failed to update %s after %d retries.', + self.model._meta.object_name, retry_count) def signal_finished(self, pk): pass @@ -375,6 +384,7 @@ class BaseTask(Task): Run the job/task and capture its output. ''' instance = self.update_model(pk, status='running', celery_task_id=self.request.id) + instance.socketio_emit_status("running") status, tb = 'error', '' output_replacements = [] diff --git a/awx/main/tests/commands.py b/awx/main/tests/commands.py index a600e52f68..be5f9f4f9b 100644 --- a/awx/main/tests/commands.py +++ b/awx/main/tests/commands.py @@ -156,9 +156,7 @@ class BaseCommandMixin(object): result = None try: result = command_runner(name, *args, **options) - except Exception, e: - result = e - except SystemExit, e: + except Exception as e: result = e finally: captured_stdout = sys.stdout.getvalue() @@ -166,10 +164,6 @@ class BaseCommandMixin(object): sys.stdin = original_stdin sys.stdout = original_stdout sys.stderr = original_stderr - # For Django 1.4.x, convert sys.exit(1) and stderr message to the - # CommandError(msg) exception used by Django 1.5 and later. - if isinstance(result, SystemExit) and captured_stderr: - result = CommandError(captured_stderr) return result, captured_stdout, captured_stderr class DumpDataTest(BaseCommandMixin, BaseTest): diff --git a/awx/main/tests/jobs.py b/awx/main/tests/jobs.py index 199aeda3ad..c137368c39 100644 --- a/awx/main/tests/jobs.py +++ b/awx/main/tests/jobs.py @@ -12,11 +12,12 @@ import urlparse import uuid # Django +import django.test from django.contrib.auth.models import User as DjangoUser from django.conf import settings from django.core.urlresolvers import reverse +from django.db import transaction from django.db.models import Q -import django.test from django.test.client import Client from django.test.utils import override_settings @@ -998,20 +999,11 @@ class JobTest(BaseJobTestMixin, django.test.TestCase): # and that jobs come back nicely serialized with related resources and so on ... # that we can drill all the way down and can get at host failure lists, etc ... -# Need to disable transaction middleware for testing so that the callback -# management command will be able to read the database changes made to start -# the job. It won't be an issue normally, because the task will be running -# asynchronously; the start API call will update the database, queue the task, -# then return immediately (committing the transaction) before celery has even -# woken up to run the new task. -MIDDLEWARE_CLASSES = filter(lambda x: not x.endswith('TransactionMiddleware'), - settings.MIDDLEWARE_CLASSES) @override_settings(CELERY_ALWAYS_EAGER=True, CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, CALLBACK_CONSUMER_PORT='', - ANSIBLE_TRANSPORT='local', - MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES) + ANSIBLE_TRANSPORT='local') class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase): '''Job API tests that need to use the celery task backend.''' @@ -1274,8 +1266,7 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase): @override_settings(CELERY_ALWAYS_EAGER=True, CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, - ANSIBLE_TRANSPORT='local', - MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES) + ANSIBLE_TRANSPORT='local') class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase): '''Job template callback tests for empheral hosts.''' @@ -1416,7 +1407,12 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase): host_ip = self.get_test_ips_for_host(host.name)[0] jobs_qs = job_template.jobs.filter(launch_type='callback').order_by('-pk') self.assertEqual(jobs_qs.count(), 0) + + # Create the job itself. result = self.post(url, data, expect=202, remote_addr=host_ip) + + # Establish that we got back what we expect, and made the changes + # that we expect. self.assertTrue('Location' in result.response, result.response) self.assertEqual(jobs_qs.count(), 1) job = jobs_qs[0] @@ -1613,8 +1609,7 @@ class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase): @override_settings(CELERY_ALWAYS_EAGER=True, CELERY_EAGER_PROPAGATES_EXCEPTIONS=True, - ANSIBLE_TRANSPORT='local')#, - #MIDDLEWARE_CLASSES=MIDDLEWARE_CLASSES) + ANSIBLE_TRANSPORT='local') class JobTransactionTest(BaseJobTestMixin, django.test.LiveServerTestCase): '''Job test of transaction locking using the celery task backend.''' diff --git a/awx/main/tests/tasks.py b/awx/main/tests/tasks.py index 91dcb2d8c3..4bf4e7affa 100644 --- a/awx/main/tests/tasks.py +++ b/awx/main/tests/tasks.py @@ -498,8 +498,6 @@ class RunJobTest(BaseCeleryTest): host_pks) if async: qs = job_events.filter(event='runner_on_async_poll') - if not async_nowait: - self.assertTrue(qs.count()) for evt in qs: self.assertEqual(evt.host, self.host) self.assertTrue(evt.play, evt) diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index dbde4ee81c..5adaeb0eda 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -32,6 +32,7 @@ DATABASES = { 'NAME': os.path.join(BASE_DIR, 'awx.sqlite3'), # Test database cannot be :memory: for celery/inventory tests to work. 'TEST_NAME': os.path.join(BASE_DIR, 'awx_test.sqlite3'), + 'ATOMIC_REQUESTS': True, } } @@ -114,8 +115,6 @@ TEMPLATE_CONTEXT_PROCESSORS += ( ) MIDDLEWARE_CLASSES += ( - 'django.middleware.transaction.TransactionMiddleware', - # Middleware loaded after this point will be subject to transactions. 'awx.main.middleware.ActivityStreamMiddleware', 'crum.CurrentRequestUserMiddleware', ) @@ -238,13 +237,6 @@ EMAIL_USE_TLS = False try: import debug_toolbar INSTALLED_APPS += ('debug_toolbar',) - # Add debug toolbar middleware before Transaction middleware. - new_mc = [] - for mc in MIDDLEWARE_CLASSES: - if mc == 'django.middleware.transaction.TransactionMiddleware': - new_mc.append('debug_toolbar.middleware.DebugToolbarMiddleware') - new_mc.append(mc) - MIDDLEWARE_CLASSES = tuple(new_mc) except ImportError: pass