Workaround for events with NUL char, touch up error loop (#13398)

* Workaround for events with NUL char, touch up error loop

This fixes an error where some events would not save
  because they contained the 0x00 (NUL) character, which PostgreSQL
  rejects in text fields; this change replaces it with empty text.
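
Roughly, the sanitization looks like the sketch below (illustrative only;
  the helper name is hypothetical, and the real change applies the
  replacement inside the per-event retry path):

      # Sketch: postgres text columns reject the NUL (0x00) byte, so strip
      # it from event stdout before retrying the save.
      def sanitize_event_stdout(event):
          if event.stdout and "\x00" in event.stdout:
              event.stdout = event.stdout.replace("\x00", "")
          return event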

Hitting that kind of event put us in an infinite error loop,
  so this commit makes several changes to prevent similar loops.
  The showcase example is a negative counter, which is not realistic
  in the real world but works for unit tests.
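
For reference, the new unit tests inject the failure roughly like this
  (assuming the event model's counter field rejects negative values,
  which is what makes the save raise):

      from uuid import uuid4
      from awx.main.models.events import InventoryUpdateEvent

      # Assumed: counter must be non-negative, so a negative value makes
      # save()/bulk_create raise and exercises the error-handling path.
      bad_event = InventoryUpdateEvent(uuid=str(uuid4()), stdout='bad', counter=-2)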

These error loop fixes seek to establish the cases where we clear the buffer.
Some logic is removed from the outer loop, with the idea that
ensure_connection will better distinguish database flake from bad event data.
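
The intended split, sketched (not the exact diff): if the connection itself
  is unhealthy, ensure_connection raises and the outer sleep/retry loop takes
  over; otherwise the failure is blamed on the event data and retried per event:

      from django.db import connection as django_connection

      def try_bulk_then_individual(cls, events):
          # Sketch of the flake-vs-bad-data split; the function name is illustrative.
          try:
              cls.objects.bulk_create(events)
          except Exception:
              # If the database is down this raises, deferring to the outer
              # sleep/retry loop; otherwise fall back to per-event saves.
              django_connection.ensure_connection()
              for e in events.copy():
                  e.save()
                  events.remove(e)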

* From review comments, delay NUL char sanitization to later

Use pop to make list operations more clear

* Fix incorrect use of pop
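
The pitfall being avoided, for context: removing (or popping) elements from
  a list while iterating over that same list skips items, which is why the
  final version iterates over a copy:

      # Mutating a list while iterating over it skips elements:
      events = ['a', 'bad', 'b', 'c']
      for e in events:
          if e != 'bad':
              events.remove(e)   # indices shift underneath the loop
      # events is now ['bad', 'c'] -- 'bad' was skipped and 'c' never visited

      # Iterating over a copy, as the final code does, avoids this:
      events = ['a', 'bad', 'b', 'c']
      for e in events.copy():
          if e != 'bad':
              events.remove(e)
      # events is now ['bad']
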
Alan Rominger 2023-01-19 13:36:23 -05:00 committed by GitHub
parent 01a7076267
commit 8a4059d266
2 changed files with 146 additions and 28 deletions

View File

@@ -3,14 +3,12 @@ import logging
import os
import signal
import time
import traceback
import datetime
from django.conf import settings
from django.utils.functional import cached_property
from django.utils.timezone import now as tz_now
from django.db import DatabaseError, OperationalError, transaction, connection as django_connection
from django.db.utils import InterfaceError, InternalError
from django.db import transaction, connection as django_connection
from django_guid import set_guid
import psutil
@@ -64,6 +62,7 @@ class CallbackBrokerWorker(BaseWorker):
"""
MAX_RETRIES = 2
INDIVIDUAL_EVENT_RETRIES = 3
last_stats = time.time()
last_flush = time.time()
total = 0
@@ -164,38 +163,48 @@ class CallbackBrokerWorker(BaseWorker):
else: # only calculate the seconds if the created time already has been set
metrics_total_job_event_processing_seconds += e.modified - e.created
metrics_duration_to_save = time.perf_counter()
saved_events = []
try:
cls.objects.bulk_create(events)
metrics_bulk_events_saved += len(events)
saved_events = events
self.buff[cls] = []
except Exception as exc:
logger.warning(f'Error in events bulk_create, will try individually up to 5 errors, error {str(exc)}')
# If the database is flaking, let ensure_connection throw a general exception;
# it will be caught by the outer loop, which goes into a proper sleep and retry loop
django_connection.ensure_connection()
logger.warning(f'Error in events bulk_create, will try individually, error: {str(exc)}')
# if an exception occurs, we should re-attempt to save the
# events one-by-one, because something in the list is
# broken/stale
consecutive_errors = 0
events_saved = 0
metrics_events_batch_save_errors += 1
for e in events:
for e in events.copy():
try:
e.save()
events_saved += 1
consecutive_errors = 0
metrics_singular_events_saved += 1
events.remove(e)
saved_events.append(e) # Importantly, remove successfully saved events from the buffer
except Exception as exc_indv:
consecutive_errors += 1
logger.info(f'Database Error Saving individual Job Event, error {str(exc_indv)}')
if consecutive_errors >= 5:
raise
metrics_singular_events_saved += events_saved
if events_saved == 0:
raise
retry_count = getattr(e, '_retry_count', 0) + 1
e._retry_count = retry_count
# special sanitization logic for postgres treatment of NUL 0x00 char
if (retry_count == 1) and isinstance(exc_indv, ValueError) and ("\x00" in e.stdout):
e.stdout = e.stdout.replace("\x00", "")
if retry_count >= self.INDIVIDUAL_EVENT_RETRIES:
logger.error(f'Hit max retries ({retry_count}) saving individual Event error: {str(exc_indv)}\ndata:\n{e.__dict__}')
events.remove(e)
else:
logger.info(f'Database Error Saving individual Event uuid={e.uuid} try={retry_count}, error: {str(exc_indv)}')
metrics_duration_to_save = time.perf_counter() - metrics_duration_to_save
for e in events:
for e in saved_events:
if not getattr(e, '_skip_websocket_message', False):
metrics_events_broadcast += 1
emit_event_detail(e)
if getattr(e, '_notification_trigger_event', False):
job_stats_wrapup(getattr(e, e.JOB_REFERENCE), event=e)
self.buff = {}
self.last_flush = time.time()
# only update metrics if we saved events
if (metrics_bulk_events_saved + metrics_singular_events_saved) > 0:
@@ -267,20 +276,16 @@ class CallbackBrokerWorker(BaseWorker):
try:
self.flush(force=flush)
break
except (OperationalError, InterfaceError, InternalError) as exc:
except Exception as exc:
# Aside from bugs, exceptions here are assumed to be due to database flake
if retries >= self.MAX_RETRIES:
logger.exception('Worker could not re-establish database connectivity, giving up on one or more events.')
self.buff = {}
return
delay = 60 * retries
logger.warning(f'Database Error Flushing Job Events, retry #{retries + 1} in {delay} seconds: {str(exc)}')
django_connection.close()
time.sleep(delay)
retries += 1
except DatabaseError:
logger.exception('Database Error Flushing Job Events')
django_connection.close()
break
except Exception as exc:
tb = traceback.format_exc()
logger.error('Callback Task Processor Raised Exception: %r', exc)
logger.error('Detail: {}'.format(tb))
except Exception:
logger.exception(f'Callback Task Processor Raised Unexpected Exception processing event data:\n{body}')

View File

@@ -1,7 +1,15 @@
import pytest
import time
from unittest import mock
from uuid import uuid4
from django.test import TransactionTestCase
from awx.main.dispatch.worker.callback import job_stats_wrapup, CallbackBrokerWorker
from awx.main.dispatch.worker.callback import job_stats_wrapup
from awx.main.models.jobs import Job
from awx.main.models.inventory import InventoryUpdate, InventorySource
from awx.main.models.events import InventoryUpdateEvent
@pytest.mark.django_db
@@ -24,3 +32,108 @@ def test_wrapup_does_send_notifications(mocker):
job.refresh_from_db()
assert job.host_status_counts == {}
mock.assert_called_once_with('succeeded')
class FakeRedis:
def keys(self, *args, **kwargs):
return []
def set(self):
pass
def get(self):
return None
@classmethod
def from_url(cls, *args, **kwargs):
return cls()
def pipeline(self):
return self
class TestCallbackBrokerWorker(TransactionTestCase):
@pytest.fixture(autouse=True)
def turn_off_websockets(self):
with mock.patch('awx.main.dispatch.worker.callback.emit_event_detail', lambda *a, **kw: None):
yield
def get_worker(self):
with mock.patch('redis.Redis', new=FakeRedis): # turn off redis stuff
return CallbackBrokerWorker()
def event_create_kwargs(self):
inventory_update = InventoryUpdate.objects.create(source='file', inventory_source=InventorySource.objects.create(source='file'))
return dict(inventory_update=inventory_update, created=inventory_update.created)
def test_flush_with_valid_event(self):
worker = self.get_worker()
events = [InventoryUpdateEvent(uuid=str(uuid4()), **self.event_create_kwargs())]
worker.buff = {InventoryUpdateEvent: events}
worker.flush()
assert worker.buff.get(InventoryUpdateEvent, []) == []
assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 1
def test_flush_with_invalid_event(self):
worker = self.get_worker()
kwargs = self.event_create_kwargs()
events = [
InventoryUpdateEvent(uuid=str(uuid4()), stdout='good1', **kwargs),
InventoryUpdateEvent(uuid=str(uuid4()), stdout='bad', counter=-2, **kwargs),
InventoryUpdateEvent(uuid=str(uuid4()), stdout='good2', **kwargs),
]
worker.buff = {InventoryUpdateEvent: events.copy()}
worker.flush()
assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 1
assert InventoryUpdateEvent.objects.filter(uuid=events[1].uuid).count() == 0
assert InventoryUpdateEvent.objects.filter(uuid=events[2].uuid).count() == 1
assert worker.buff == {InventoryUpdateEvent: [events[1]]}
def test_duplicate_key_not_saved_twice(self):
worker = self.get_worker()
events = [InventoryUpdateEvent(uuid=str(uuid4()), **self.event_create_kwargs())]
worker.buff = {InventoryUpdateEvent: events.copy()}
worker.flush()
# put the already-saved event back into the buffer (error case)
worker.buff = {InventoryUpdateEvent: [InventoryUpdateEvent.objects.get(uuid=events[0].uuid)]}
worker.last_flush = time.time() - 2.0
# here, the bulk_create will fail with UNIQUE constraint violation, but individual saves should resolve it
worker.flush()
assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 1
assert worker.buff.get(InventoryUpdateEvent, []) == []
def test_give_up_on_bad_event(self):
worker = self.get_worker()
events = [InventoryUpdateEvent(uuid=str(uuid4()), counter=-2, **self.event_create_kwargs())]
worker.buff = {InventoryUpdateEvent: events.copy()}
for i in range(5):
worker.last_flush = time.time() - 2.0
worker.flush()
# Could not save, should be logged, and buffer should be cleared
assert worker.buff.get(InventoryUpdateEvent, []) == []
assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 0 # sanity
def test_postgres_invalid_NUL_char(self):
# In postgres, text fields reject NUL character, 0x00
# tests use sqlite3 which will not raise an error
# but we can still test that it is sanitized before saving
worker = self.get_worker()
kwargs = self.event_create_kwargs()
events = [InventoryUpdateEvent(uuid=str(uuid4()), stdout="\x00", **kwargs)]
assert "\x00" in events[0].stdout # sanity
worker.buff = {InventoryUpdateEvent: events.copy()}
with mock.patch.object(InventoryUpdateEvent.objects, 'bulk_create', side_effect=ValueError):
with mock.patch.object(events[0], 'save', side_effect=ValueError):
worker.flush()
assert "\x00" not in events[0].stdout
worker.last_flush = time.time() - 2.0
worker.flush()
event = InventoryUpdateEvent.objects.get(uuid=events[0].uuid)
assert "\x00" not in event.stdout