diff --git a/awx/main/dispatch/pool.py b/awx/main/dispatch/pool.py
index 8830a69af8..7c203eebce 100644
--- a/awx/main/dispatch/pool.py
+++ b/awx/main/dispatch/pool.py
@@ -7,6 +7,7 @@ import time
 import traceback
 from datetime import datetime
 from uuid import uuid4
+import json
 
 import collections
 from multiprocessing import Process
@@ -25,7 +26,10 @@
 from ansible_base.lib.logging.runtime import log_excess_runtime
 from awx.main.models import UnifiedJob
 from awx.main.dispatch import reaper
-from awx.main.utils.common import convert_mem_str_to_bytes, get_mem_effective_capacity
+from awx.main.utils.common import get_mem_effective_capacity, get_corrected_memory, get_corrected_cpu, get_cpu_effective_capacity
+
+# ansible-runner
+from ansible_runner.utils.capacity import get_mem_in_bytes, get_cpu_count
 
 if 'run_callback_receiver' in sys.argv:
     logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -307,6 +311,41 @@ class WorkerPool(object):
             logger.exception('could not kill {}'.format(worker.pid))
 
 
+def get_auto_max_workers():
+    """Method we normally rely on to get max_workers
+
+    Uses almost the same logic as Instance.local_health_check.
+    The important thing is to return MORE than Instance.capacity
+    so that the task manager does not over-schedule this node.
+
+    Ideally we would just use the capacity from the database plus reserve workers,
+    but that poses bootstrap problems on OCP, where task containers
+    register themselves only after startup.
+    """
+    # Get memory (in bytes) from ansible-runner
+    total_memory_bytes = get_mem_in_bytes()
+
+    # This may replace the detected memory with a user override (SYSTEM_TASK_ABS_MEM)
+    corrected_memory = get_corrected_memory(total_memory_bytes)
+
+    # Get same number as max forks based on memory, this function takes memory as bytes
+    mem_capacity = get_mem_effective_capacity(corrected_memory, is_control_node=True)
+
+    # Follow the same process for the CPU capacity constraint
+    cpu_count = get_cpu_count()
+    corrected_cpu = get_corrected_cpu(cpu_count)
+    cpu_capacity = get_cpu_effective_capacity(corrected_cpu, is_control_node=True)
+
+    # Here is what differs from health checks: take the larger of the two capacities
+    auto_max = max(mem_capacity, cpu_capacity)
+
+    # add a magic number of extra workers to ensure
+    # we always have a few free to run the heartbeat
+    auto_max += 7
+
+    return auto_max
+
+
 class AutoscalePool(WorkerPool):
     """
     An extended pool implementation that automatically scales workers up and
@@ -320,19 +359,7 @@ class AutoscalePool(WorkerPool):
         super(AutoscalePool, self).__init__(*args, **kwargs)
 
         if self.max_workers is None:
-            settings_absmem = getattr(settings, 'SYSTEM_TASK_ABS_MEM', None)
-            if settings_absmem is not None:
-                # There are 1073741824 bytes in a gigabyte. Convert bytes to gigabytes by dividing by 2**30
-                total_memory_gb = convert_mem_str_to_bytes(settings_absmem) // 2**30
-            else:
-                total_memory_gb = (psutil.virtual_memory().total >> 30) + 1  # noqa: round up
-
-            # Get same number as max forks based on memory, this function takes memory as bytes
-            self.max_workers = get_mem_effective_capacity(total_memory_gb * 2**30)
-
-            # add magic prime number of extra workers to ensure
-            # we have a few extra workers to run the heartbeat
-            self.max_workers += 7
+            self.max_workers = get_auto_max_workers()
 
         # max workers can't be less than min_workers
         self.max_workers = max(self.min_workers, self.max_workers)
@@ -346,6 +373,9 @@ class AutoscalePool(WorkerPool):
         self.scale_up_ct = 0
         self.worker_count_max = 0
 
+        # last time we logged the current task list, used to avoid log spam
+        self.last_task_list_log = time.monotonic()
+
     def produce_subsystem_metrics(self, metrics_object):
         metrics_object.set('dispatcher_pool_scale_up_events', self.scale_up_ct)
         metrics_object.set('dispatcher_pool_active_task_count', sum(len(w.managed_tasks) for w in self.workers))
@@ -463,6 +493,14 @@ class AutoscalePool(WorkerPool):
             self.worker_count_max = new_worker_ct
         return ret
 
+    @staticmethod
+    def fast_task_serialization(current_task):
+        try:
+            return str(current_task.get('task')) + ' - ' + str(sorted(current_task.get('args', []))) + ' - ' + str(sorted(current_task.get('kwargs', {})))
+        except Exception:
+            # just make sure this does not make things worse
+            return str(current_task)
+
     def write(self, preferred_queue, body):
         if 'guid' in body:
             set_guid(body['guid'])
@@ -484,6 +522,15 @@ class AutoscalePool(WorkerPool):
                 if isinstance(body, dict):
                     task_name = body.get('task')
                 logger.warning(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}')
+                # At most once every 10 seconds, write out the running task list for debugging
+                if time.monotonic() - self.last_task_list_log >= 10.0:
+                    task_counts = {}
+                    for worker in self.workers:
+                        task_slug = self.fast_task_serialization(worker.current_task)
+                        task_counts.setdefault(task_slug, 0)
+                        task_counts[task_slug] += 1
+                    logger.info(f'Running tasks by count:\n{json.dumps(task_counts, indent=2)}')
+                    self.last_task_list_log = time.monotonic()
             return super(AutoscalePool, self).write(preferred_queue, body)
         except Exception:
             for conn in connections.all():
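For illustration (the payload shapes here are assumed, not taken from the patch): fast_task_serialization() turns a worker's current task into a stable string key, so identical tasks collapse into a single counter in the periodic log, and sorting args/kwargs keeps the key order-independent:

    task = {'task': 'awx.main.tasks.system.cluster_node_heartbeat', 'args': [], 'kwargs': {}}
    AutoscalePool.fast_task_serialization(task)  # -> 'awx.main.tasks.system.cluster_node_heartbeat - [] - []'
    AutoscalePool.fast_task_serialization(None)  # -> 'None' (the except branch; an idle worker has no current task)
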
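As a usage sketch (assuming a Django-configured shell, e.g. awx-manage shell_plus), the new sleep_task can be queued with explicit arguments; .delay() comes from the @task publisher decorator and passes keyword arguments through to the task:

    from awx.main.tests.data.sleep_task import sleep_task

    # queue one 5-minute sleeper that logs its start and finish
    sleep_task.delay(seconds=300, log=True)
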
diff --git a/awx/main/tests/functional/commands/test_callback_receiver.py b/awx/main/tests/functional/commands/test_callback_receiver.py
index 7b9346fe73..145c48d605 100644
--- a/awx/main/tests/functional/commands/test_callback_receiver.py
+++ b/awx/main/tests/functional/commands/test_callback_receiver.py
@@ -34,40 +34,18 @@ def test_wrapup_does_send_notifications(mocker):
     mock.assert_called_once_with('succeeded')
 
 
-class FakeRedis:
-    def keys(self, *args, **kwargs):
-        return []
-
-    def set(self):
-        pass
-
-    def get(self):
-        return None
-
-    @classmethod
-    def from_url(cls, *args, **kwargs):
-        return cls()
-
-    def pipeline(self):
-        return self
-
-
 class TestCallbackBrokerWorker(TransactionTestCase):
     @pytest.fixture(autouse=True)
-    def turn_off_websockets(self):
+    def turn_off_websockets_and_redis(self, fake_redis):
         with mock.patch('awx.main.dispatch.worker.callback.emit_event_detail', lambda *a, **kw: None):
             yield
 
-    def get_worker(self):
-        with mock.patch('redis.Redis', new=FakeRedis):  # turn off redis stuff
-            return CallbackBrokerWorker()
-
     def event_create_kwargs(self):
         inventory_update = InventoryUpdate.objects.create(source='file', inventory_source=InventorySource.objects.create(source='file'))
         return dict(inventory_update=inventory_update, created=inventory_update.created)
 
     def test_flush_with_valid_event(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         events = [InventoryUpdateEvent(uuid=str(uuid4()), **self.event_create_kwargs())]
         worker.buff = {InventoryUpdateEvent: events}
         worker.flush()
@@ -75,7 +53,7 @@ class TestCallbackBrokerWorker(TransactionTestCase):
         assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 1
 
     def test_flush_with_invalid_event(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         kwargs = self.event_create_kwargs()
         events = [
             InventoryUpdateEvent(uuid=str(uuid4()), stdout='good1', **kwargs),
@@ -90,7 +68,7 @@ class TestCallbackBrokerWorker(TransactionTestCase):
         assert worker.buff == {InventoryUpdateEvent: [events[1]]}
 
     def test_duplicate_key_not_saved_twice(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         events = [InventoryUpdateEvent(uuid=str(uuid4()), **self.event_create_kwargs())]
         worker.buff = {InventoryUpdateEvent: events.copy()}
         worker.flush()
@@ -104,7 +82,7 @@ class TestCallbackBrokerWorker(TransactionTestCase):
         assert worker.buff.get(InventoryUpdateEvent, []) == []
 
     def test_give_up_on_bad_event(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         events = [InventoryUpdateEvent(uuid=str(uuid4()), counter=-2, **self.event_create_kwargs())]
         worker.buff = {InventoryUpdateEvent: events.copy()}
 
@@ -117,7 +95,7 @@ class TestCallbackBrokerWorker(TransactionTestCase):
         assert InventoryUpdateEvent.objects.filter(uuid=events[0].uuid).count() == 0  # sanity
 
     def test_flush_with_empty_buffer(self):
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         worker.buff = {InventoryUpdateEvent: []}
         with mock.patch.object(InventoryUpdateEvent.objects, 'bulk_create') as flush_mock:
             worker.flush()
@@ -127,7 +105,7 @@ class TestCallbackBrokerWorker(TransactionTestCase):
         # In postgres, text fields reject NUL character, 0x00
         # tests use sqlite3 which will not raise an error
         # but we can still test that it is sanitized before saving
-        worker = self.get_worker()
+        worker = CallbackBrokerWorker()
         kwargs = self.event_create_kwargs()
         events = [InventoryUpdateEvent(uuid=str(uuid4()), stdout="\x00", **kwargs)]
         assert "\x00" in events[0].stdout  # sanity
diff --git a/awx/main/tests/functional/conftest.py b/awx/main/tests/functional/conftest.py
index 13c2fd5338..6fbed69760 100644
--- a/awx/main/tests/functional/conftest.py
+++ b/awx/main/tests/functional/conftest.py
@@ -63,6 +63,33 @@ def swagger_autogen(requests=__SWAGGER_REQUESTS__):
     return requests
 
 
+class FakeRedis:
+    def keys(self, *args, **kwargs):
+        return []
+
+    def set(self):
+        pass
+
+    def get(self):
+        return None
+
+    @classmethod
+    def from_url(cls, *args, **kwargs):
+        return cls()
+
+    def pipeline(self):
+        return self
+
+    def ping(self):
+        return
+
+
+@pytest.fixture
+def fake_redis():
+    with mock.patch('redis.Redis', new=FakeRedis):  # turn off redis stuff
+        yield
+
+
 @pytest.fixture
 def user():
     def u(name, is_superuser=False):
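One detail worth noting about the fixture just added to conftest.py: FakeRedis.pipeline() returns self, so chained broker calls all land on the same no-op object and none of these tests needs a live Redis. A quick sketch of the resulting behavior (the URL is illustrative and ignored by from_url):

    conn = FakeRedis.from_url('redis://unused')
    assert conn.pipeline() is conn
    assert conn.get() is None and conn.keys() == []
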
diff --git a/awx/main/tests/functional/models/test_ha.py b/awx/main/tests/functional/models/test_ha.py
index 0b4ced53c2..bf8c5309c7 100644
--- a/awx/main/tests/functional/models/test_ha.py
+++ b/awx/main/tests/functional/models/test_ha.py
@@ -3,6 +3,10 @@ import pytest
 # AWX
 from awx.main.ha import is_ha_environment
 from awx.main.models.ha import Instance
+from awx.main.dispatch.pool import get_auto_max_workers
+
+# Django
+from django.test.utils import override_settings
 
 
 @pytest.mark.django_db
@@ -17,3 +21,25 @@ def test_db_localhost():
     Instance.objects.create(hostname='foo', node_type='hybrid')
     Instance.objects.create(hostname='bar', node_type='execution')
     assert is_ha_environment() is False
+
+
+@pytest.mark.django_db
+@pytest.mark.parametrize(
+    'settings',
+    [
+        dict(SYSTEM_TASK_ABS_MEM='16Gi', SYSTEM_TASK_ABS_CPU='24', SYSTEM_TASK_FORKS_MEM=400, SYSTEM_TASK_FORKS_CPU=4),
+        dict(SYSTEM_TASK_ABS_MEM='124Gi', SYSTEM_TASK_ABS_CPU='2', SYSTEM_TASK_FORKS_MEM=None, SYSTEM_TASK_FORKS_CPU=None),
+    ],
+    ids=['cpu_dominated', 'memory_dominated'],
+)
+def test_dispatcher_max_workers_reserve(settings, fake_redis):
+    """Test that the dispatcher max_workers matches instance capacity
+
+    plus the reserve worker count;
+    assumes capacity_adjustment is 1
+    """
+    with override_settings(**settings):
+        i = Instance.objects.create(hostname='test-1', node_type='hybrid')
+        i.local_health_check()
+
+        assert get_auto_max_workers() == i.capacity + 7, (i.cpu, i.memory, i.cpu_capacity, i.mem_capacity)
diff --git a/tools/scripts/firehose_tasks.py b/tools/scripts/firehose_tasks.py
new file mode 100755
index 0000000000..823cdb010f
--- /dev/null
+++ b/tools/scripts/firehose_tasks.py
@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+
+from django import setup
+
+from awx import prepare_env
+
+prepare_env()
+
+setup()
+
+# Keeping this in the tests folder allows it to be importable
+from awx.main.tests.data.sleep_task import sleep_task
+
+
+# arbitrary task count, large enough to keep the dispatcher pool saturated
+for i in range(634):
+    sleep_task.delay()
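As a back-of-the-envelope check of the 'cpu_dominated' case in test_ha.py above, assuming SYSTEM_TASK_FORKS_MEM is interpreted as megabytes per fork and SYSTEM_TASK_FORKS_CPU as forks per core (which is how the parametrization reads, not something this patch states):

    mem_capacity = (16 * 1024) // 400  # 16Gi at 400 MB per fork -> 40 forks
    cpu_capacity = 24 * 4              # 24 cores at 4 forks per core -> 96 forks
    expected_max_workers = max(mem_capacity, cpu_capacity) + 7  # 96 + 7 == 103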