diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py
index 0578a4ff97..b0588265a4 100644
--- a/awx/main/dispatch/worker/callback.py
+++ b/awx/main/dispatch/worker/callback.py
@@ -3,14 +3,12 @@ import logging
 import os
 import signal
 import time
-import traceback
 import datetime
 
 from django.conf import settings
 from django.utils.functional import cached_property
 from django.utils.timezone import now as tz_now
-from django.db import DatabaseError, OperationalError, transaction, connection as django_connection
-from django.db.utils import InterfaceError, InternalError
+from django.db import transaction, connection as django_connection
 from django_guid import set_guid
 
 import psutil
@@ -64,6 +62,7 @@ class CallbackBrokerWorker(BaseWorker):
     """
 
     MAX_RETRIES = 2
+    INDIVIDUAL_EVENT_RETRIES = 3
     last_stats = time.time()
     last_flush = time.time()
     total = 0
@@ -164,38 +163,48 @@ class CallbackBrokerWorker(BaseWorker):
                     else:  # only calculate the seconds if the created time already has been set
                         metrics_total_job_event_processing_seconds += e.modified - e.created
                 metrics_duration_to_save = time.perf_counter()
+                saved_events = []
                 try:
                     cls.objects.bulk_create(events)
                     metrics_bulk_events_saved += len(events)
+                    saved_events = events
+                    self.buff[cls] = []
                 except Exception as exc:
-                    logger.warning(f'Error in events bulk_create, will try indiviually up to 5 errors, error {str(exc)}')
+                    # If the database is flaking, let ensure_connection throw a general exception that
+                    # will be caught by the outer loop, which goes into a proper sleep and retry loop
+                    django_connection.ensure_connection()
+                    logger.warning(f'Error in events bulk_create, will try individually, error: {str(exc)}')
                     # if an exception occurs, we should re-attempt to save the
                     # events one-by-one, because something in the list is
                     # broken/stale
-                    consecutive_errors = 0
-                    events_saved = 0
                     metrics_events_batch_save_errors += 1
-                    for e in events:
+                    for e in events.copy():
                         try:
                             e.save()
-                            events_saved += 1
-                            consecutive_errors = 0
+                            metrics_singular_events_saved += 1
+                            events.remove(e)
+                            saved_events.append(e)  # Importantly, remove successfully saved events from the buffer
                         except Exception as exc_indv:
-                            consecutive_errors += 1
-                            logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}')
-                            if consecutive_errors >= 5:
-                                raise
-                    metrics_singular_events_saved += events_saved
-                    if events_saved == 0:
-                        raise
+                            retry_count = getattr(e, '_retry_count', 0) + 1
+                            e._retry_count = retry_count
+
+                            # special sanitization logic for postgres treatment of NUL 0x00 char
+                            if (retry_count == 1) and isinstance(exc_indv, ValueError) and ("\x00" in e.stdout):
+                                e.stdout = e.stdout.replace("\x00", "")
+
+                            if retry_count >= self.INDIVIDUAL_EVENT_RETRIES:
+                                logger.error(f'Hit max retries ({retry_count}) saving individual Event error: {str(exc_indv)}\ndata:\n{e.__dict__}')
+                                events.remove(e)
+                            else:
+                                logger.info(f'Database Error Saving individual Event uuid={e.uuid} try={retry_count}, error: {str(exc_indv)}')
+
                 metrics_duration_to_save = time.perf_counter() - metrics_duration_to_save
-                for e in events:
+                for e in saved_events:
                     if not getattr(e, '_skip_websocket_message', False):
                         metrics_events_broadcast += 1
                         emit_event_detail(e)
                     if getattr(e, '_notification_trigger_event', False):
                         job_stats_wrapup(getattr(e, e.JOB_REFERENCE), event=e)
-            self.buff = {}
             self.last_flush = time.time()
             # only update metrics if we saved events
             if (metrics_bulk_events_saved + metrics_singular_events_saved) > 0:
@@ -267,20 +276,16 @@ class CallbackBrokerWorker(BaseWorker):
                 try:
                     self.flush(force=flush)
                     break
-                except (OperationalError, InterfaceError, InternalError) as exc:
+                except Exception as exc:
+                    # Aside from bugs, exceptions here are assumed to be due to database flake
                     if retries >= self.MAX_RETRIES:
                         logger.exception('Worker could not re-establish database connectivity, giving up on one or more events.')
+                        self.buff = {}
                         return
                     delay = 60 * retries
                     logger.warning(f'Database Error Flushing Job Events, retry #{retries + 1} in {delay} seconds: {str(exc)}')
                     django_connection.close()
                     time.sleep(delay)
                     retries += 1
-                except DatabaseError:
-                    logger.exception('Database Error Flushing Job Events')
-                    django_connection.close()
-                    break
-        except Exception as exc:
-            tb = traceback.format_exc()
-            logger.error('Callback Task Processor Raised Exception: %r', exc)
-            logger.error('Detail: {}'.format(tb))
+        except Exception:
+            logger.exception(f'Callback Task Processor Raised Unexpected Exception processing event data:\n{body}')
diff --git a/awx/main/tests/functional/commands/test_callback_receiver.py b/awx/main/tests/functional/commands/test_callback_receiver.py
index 389edf6ffb..234392fb44 100644
--- a/awx/main/tests/functional/commands/test_callback_receiver.py
+++ b/awx/main/tests/functional/commands/test_callback_receiver.py
@@ -1,7 +1,15 @@
 import pytest
+import time
+from unittest import mock
+from uuid import uuid4
+
+from django.test import TransactionTestCase
+
+from awx.main.dispatch.worker.callback import job_stats_wrapup, CallbackBrokerWorker
 
-from awx.main.dispatch.worker.callback import job_stats_wrapup
 from awx.main.models.jobs import Job
+from awx.main.models.inventory import InventoryUpdate, InventorySource
+from awx.main.models.events import InventoryUpdateEvent
 
 
 @pytest.mark.django_db
@@ -24,3 +32,108 @@ def test_wrapup_does_send_notifications(mocker):
     job.refresh_from_db()
     assert job.host_status_counts == {}
     mock.assert_called_once_with('succeeded')
+
+
+class FakeRedis:
+    def keys(self, *args, **kwargs):
+        return []
+
+    def set(self):
+        pass
+
+    def get(self):
+        return None
+
+    @classmethod
+    def from_url(cls, *args, **kwargs):
+        return cls()
+
+    def pipeline(self):
+        return self
+
+
+class TestCallbackBrokerWorker(TransactionTestCase):
+    @pytest.fixture(autouse=True)
+    def turn_off_websockets(self):
+        with mock.patch('awx.main.dispatch.worker.callback.emit_event_detail', lambda *a, **kw: None):
+            yield
+
+    def get_worker(self):
+        with mock.patch('redis.Redis', new=FakeRedis):  # turn off redis stuff
+            return CallbackBrokerWorker()
+
+    def event_create_kwargs(self):
+        inventory_update = InventoryUpdate.objects.create(source='file', inventory_source=InventorySource.objects.create(source='file'))
+        return dict(inventory_update=inventory_update, created=inventory_update.created)
+
+    def test_flush_with_valid_event(self):
+        worker = self.get_worker()
+        events = [InventoryUpdateEvent(uuid=str(uuid4()), **self.event_create_kwargs())]
+        worker.buff = {InventoryUpdateEvent: events}
+        worker.flush()
+        assert worker.buff.get(InventoryUpdateEvent, []) == []
+        assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 1
+
+    def test_flush_with_invalid_event(self):
+        worker = self.get_worker()
+        kwargs = self.event_create_kwargs()
+        events = [
+            InventoryUpdateEvent(uuid=str(uuid4()), stdout='good1', **kwargs),
+            InventoryUpdateEvent(uuid=str(uuid4()), stdout='bad', counter=-2, **kwargs),
+            InventoryUpdateEvent(uuid=str(uuid4()), stdout='good2', **kwargs),
+        ]
+        worker.buff = {InventoryUpdateEvent: events.copy()}
+        worker.flush()
+        assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 1
+        assert InventoryUpdateEvent.objects.filter(uuid=events[1].uuid).count() == 0
+        assert InventoryUpdateEvent.objects.filter(uuid=events[2].uuid).count() == 1
+        assert worker.buff == {InventoryUpdateEvent: [events[1]]}
+
+    def test_duplicate_key_not_saved_twice(self):
+        worker = self.get_worker()
+        events = [InventoryUpdateEvent(uuid=str(uuid4()), **self.event_create_kwargs())]
+        worker.buff = {InventoryUpdateEvent: events.copy()}
+        worker.flush()
+
+        # put current saved event in buffer (error case)
+        worker.buff = {InventoryUpdateEvent: [InventoryUpdateEvent.objects.get(uuid=events[0].uuid)]}
+        worker.last_flush = time.time() - 2.0
+        # here, the bulk_create will fail with UNIQUE constraint violation, but individual saves should resolve it
+        worker.flush()
+        assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 1
+        assert worker.buff.get(InventoryUpdateEvent, []) == []
+
+    def test_give_up_on_bad_event(self):
+        worker = self.get_worker()
+        events = [InventoryUpdateEvent(uuid=str(uuid4()), counter=-2, **self.event_create_kwargs())]
+        worker.buff = {InventoryUpdateEvent: events.copy()}
+
+        for i in range(5):
+            worker.last_flush = time.time() - 2.0
+            worker.flush()
+
+        # Could not save, should be logged, and buffer should be cleared
+        assert worker.buff.get(InventoryUpdateEvent, []) == []
+        assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 0  # sanity
+
+    def test_postgres_invalid_NUL_char(self):
+        # In postgres, text fields reject NUL character, 0x00
+        # tests use sqlite3 which will not raise an error
+        # but we can still test that it is sanitized before saving
+        worker = self.get_worker()
+        kwargs = self.event_create_kwargs()
+        events = [InventoryUpdateEvent(uuid=str(uuid4()), stdout="\x00", **kwargs)]
+        assert "\x00" in events[0].stdout  # sanity
+        worker.buff = {InventoryUpdateEvent: events.copy()}
+
+        with mock.patch.object(InventoryUpdateEvent.objects, 'bulk_create', side_effect=ValueError):
+            with mock.patch.object(events[0], 'save', side_effect=ValueError):
+                worker.flush()
+
+        assert "\x00" not in events[0].stdout
+
+        worker.last_flush = time.time() - 2.0
+        worker.flush()
+
+        event = InventoryUpdateEvent.objects.get(uuid=events[0].uuid)
+        assert "\x00" not in event.stdout