From e55578b64e756bc6612ae94a337f69c248254a0a Mon Sep 17 00:00:00 2001
From: thedoubl3j
Date: Wed, 17 Dec 2025 14:29:00 -0500
Subject: [PATCH] WIP First pass

* started removing feature flags and adjusting logic
* WIP
---
 awx/main/dispatch/publish.py                  |  56 +--
 .../management/commands/run_dispatcher.py     | 115 +-----
 awx/main/models/unified_jobs.py               |  36 +-
 awx/main/scheduler/task_manager.py            |  24 +-
 awx/main/tasks/system.py                      |  26 +-
 .../test_feature_flags_api.py                 |   2 +-
 awx/main/tests/functional/test_dispatch.py    | 370 ------
 awx/main/tests/unit/test_settings.py          |   6 +-
 awx/settings/defaults.py                      |   1 -
 awx/settings/development_defaults.py          |   1 -
 docs/tasks.md                                 |   8 -
 11 files changed, 56 insertions(+), 589 deletions(-)

diff --git a/awx/main/dispatch/publish.py b/awx/main/dispatch/publish.py
index 4aef040a88..451915afde 100644
--- a/awx/main/dispatch/publish.py
+++ b/awx/main/dispatch/publish.py
@@ -1,6 +1,5 @@
 import inspect
 import logging
-import json
 import time
 
 from uuid import uuid4
@@ -9,9 +8,6 @@
 from dispatcherd.processors.blocker import Blocker
 from dispatcherd.utils import resolve_callable
 from django_guid import get_guid
-from django.conf import settings
-
-from . import pg_bus_conn
 
 logger = logging.getLogger('awx.main.dispatch')
@@ -101,43 +97,21 @@ class task:
 
     @classmethod
     def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
-        try:
-            from flags.state import flag_enabled
-
-            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-                # At this point we have the import string, and submit_task wants the method, so back to that
-                actual_task = resolve_callable(cls.name)
-                processor_options = ()
-                if on_duplicate is not None:
-                    processor_options = (Blocker.Params(on_duplicate=on_duplicate),)
-                return submit_task(
-                    actual_task,
-                    args=args,
-                    kwargs=kwargs,
-                    queue=queue,
-                    uuid=uuid,
-                    timeout=timeout,
-                    processor_options=processor_options,
-                    **kw,
-                )
-        except Exception:
-            logger.exception(f"[DISPATCHER] Failed to check for alternative dispatcherd implementation for {cls.name}")
-            # Continue with original implementation if anything fails
-            pass
-
-        # Original implementation follows
-        queue = queue or getattr(cls.queue, 'im_func', cls.queue)
-        if not queue:
-            msg = f'{cls.name}: Queue value required and may not be None'
-            logger.error(msg)
-            raise ValueError(msg)
-        obj = cls.get_async_body(args=args, kwargs=kwargs, uuid=uuid, **kw)
-        if callable(queue):
-            queue = queue()
-        if not settings.DISPATCHER_MOCK_PUBLISH:
-            with pg_bus_conn() as conn:
-                conn.notify(queue, json.dumps(obj))
-        return (obj, queue)
+        # cls.name is the task's import string; submit_task wants the callable, so resolve it back
+        actual_task = resolve_callable(cls.name)
+        processor_options = ()
+        if on_duplicate is not None:
+            processor_options = (Blocker.Params(on_duplicate=on_duplicate),)
+        return submit_task(
+            actual_task,
+            args=args,
+            kwargs=kwargs,
+            queue=queue,
+            uuid=uuid,
+            timeout=timeout,
+            processor_options=processor_options,
+            **kw,
+        )
 
     # If the object we're wrapping *is* a class (e.g., RunJob), return
     # a *new* class that inherits from the wrapped class *and* BaseTask
diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py
index 5571b56a0b..63b42c6d64 100644
--- a/awx/main/management/commands/run_dispatcher.py
+++ b/awx/main/management/commands/run_dispatcher.py
@@ -12,18 +12,11 @@
 from django.db import connection
 from django.core.management.base import BaseCommand, CommandError
 from django.core.cache import cache as django_cache
 
-from flags.state import flag_enabled
-
 from dispatcherd.factories import get_control_from_settings
 from dispatcherd import run_service
 from dispatcherd.config import setup as dispatcher_setup
 
-from awx.main.dispatch import get_task_queuename
 from awx.main.dispatch.config import get_dispatcherd_config
-from awx.main.dispatch.control import Control
-from awx.main.dispatch.pool import AutoscalePool
-from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
-from awx.main.analytics.subsystem_metrics import DispatcherMetricsServer
 
 logger = logging.getLogger('awx.main.dispatch')
@@ -33,14 +26,7 @@ class Command(BaseCommand):
 
     def add_arguments(self, parser):
         parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running dispatchers')
-        parser.add_argument('--schedule', dest='schedule', action='store_true', help='print the current status of schedules being ran by dispatcher')
         parser.add_argument('--running', dest='running', action='store_true', help='print the UUIDs of any tasked managed by this dispatcher')
-        parser.add_argument(
-            '--reload',
-            dest='reload',
-            action='store_true',
-            help=('cause the dispatcher to recycle all of its worker processes; running jobs will run to completion first'),
-        )
         parser.add_argument(
             '--cancel',
             dest='cancel',
@@ -53,38 +39,17 @@ class Command(BaseCommand):
 
     def handle(self, *arg, **options):
         if options.get('status'):
-            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-                ctl = get_control_from_settings()
-                running_data = ctl.control_with_reply('status')
-                if len(running_data) != 1:
-                    raise CommandError('Did not receive expected number of replies')
-                print(yaml.dump(running_data[0], default_flow_style=False))
-                return
-            else:
-                print(Control('dispatcher').status())
-                return
-        if options.get('schedule'):
-            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-                print('NOT YET IMPLEMENTED')
-                return
-            else:
-                print(Control('dispatcher').schedule())
+            ctl = get_control_from_settings()
+            running_data = ctl.control_with_reply('status')
+            if len(running_data) != 1:
+                raise CommandError('Did not receive expected number of replies')
+            print(yaml.dump(running_data[0], default_flow_style=False))
             return
         if options.get('running'):
-            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-                ctl = get_control_from_settings()
-                running_data = ctl.control_with_reply('running')
-                print(yaml.dump(running_data, default_flow_style=False))
-                return
-            else:
-                print(Control('dispatcher').running())
-                return
-        if options.get('reload'):
-            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-                print('NOT YET IMPLEMENTED')
-                return
-            else:
-                return Control('dispatcher').control({'control': 'reload'})
+            ctl = get_control_from_settings()
+            running_data = ctl.control_with_reply('running')
+            print(yaml.dump(running_data, default_flow_style=False))
+            return
         if options.get('cancel'):
             cancel_str = options.get('cancel')
             try:
@@ -94,56 +59,14 @@ class Command(BaseCommand):
             if not isinstance(cancel_data, list):
                 cancel_data = [cancel_str]
 
-            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-                ctl = get_control_from_settings()
-                results = []
-                for task_id in cancel_data:
-                    # For each task UUID, send an individual cancel command
-                    result = ctl.control_with_reply('cancel', data={'uuid': task_id})
-                    results.append(result)
-                print(yaml.dump(results, default_flow_style=False))
-                return
-            else:
-                print(Control('dispatcher').cancel(cancel_data))
-                return
+            ctl = get_control_from_settings()
+            results = []
+            for task_id in cancel_data:
+                # For each task UUID, send an individual cancel command
+                result = ctl.control_with_reply('cancel', data={'uuid': task_id})
+                results.append(result)
+            print(yaml.dump(results, default_flow_style=False))
+            return
 
-        if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-            self.configure_dispatcher_logging()
-
-            # Close the connection, because the pg_notify broker will create new async connection
-            connection.close()
-            django_cache.close()
-
-            dispatcher_setup(get_dispatcherd_config(for_service=True))
-            run_service()
-        else:
-            consumer = None
-
-            try:
-                DispatcherMetricsServer().start()
-            except redis.exceptions.ConnectionError as exc:
-                raise CommandError(f'Dispatcher could not connect to redis, error: {exc}')
-
-            try:
-                queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
-                consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
-                consumer.run()
-            except KeyboardInterrupt:
-                logger.debug('Terminating Task Dispatcher')
-                if consumer:
-                    consumer.stop()
-
-    def configure_dispatcher_logging(self):
-        # Apply special log rule for the parent process
-        special_logging = copy.deepcopy(settings.LOGGING)
-        for handler_name, handler_config in special_logging.get('handlers', {}).items():
-            filters = handler_config.get('filters', [])
-            if 'dynamic_level_filter' in filters:
-                handler_config['filters'] = [flt for flt in filters if flt != 'dynamic_level_filter']
-                logger.info(f'Dispatcherd main process replaced log level filter for {handler_name} handler')
-
-        # Apply the custom logging level here, before the asyncio code starts
-        special_logging.setdefault('loggers', {}).setdefault('dispatcherd', {})
-        special_logging['loggers']['dispatcherd']['level'] = settings.LOG_AGGREGATOR_LEVEL
-
-        logging.config.dictConfig(special_logging)
+        dispatcher_setup(get_dispatcherd_config(for_service=True))
+        run_service()
diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py
index 3a3ce545a5..bfffee49f9 100644
--- a/awx/main/models/unified_jobs.py
+++ b/awx/main/models/unified_jobs.py
@@ -39,7 +39,6 @@ from ansible_base.rbac.models import RoleEvaluation
 # AWX
 from awx.main.models.base import CommonModelNameNotUnique, PasswordFieldsModel, NotificationFieldsModel
 from awx.main.dispatch import get_task_queuename
-from awx.main.dispatch.control import Control as ControlDispatcher
 from awx.main.registrar import activity_stream_registrar
 from awx.main.models.mixins import TaskManagerUnifiedJobMixin, ExecutionEnvironmentMixin
 from awx.main.models.rbac import to_permissions
@@ -1497,43 +1496,32 @@ class UnifiedJob(
         if not self.celery_task_id:
             return False
 
-        canceled = []
         # Special case for task manager (used during workflow job cancellation)
         if not connection.get_autocommit():
-            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-                try:
-                    from dispatcherd.factories import get_control_from_settings
+            try:
+                from dispatcherd.factories import get_control_from_settings
 
-                    ctl = get_control_from_settings()
-                    ctl.control('cancel', data={'uuid': self.celery_task_id})
-                except Exception:
-                    logger.exception("Error sending cancel command to new dispatcher")
-            else:
-                try:
-                    ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id], with_reply=False)
-                except Exception:
-                    logger.exception("Error sending cancel command to legacy dispatcher")
+                ctl = get_control_from_settings()
+                ctl.control('cancel', data={'uuid': self.celery_task_id})
+            except Exception:
+                logger.exception("Error sending cancel command to dispatcher")
             return True  # task manager itself needs to act under assumption that cancel was received
 
         # Standard case with reply
         try:
             timeout = 5
-            if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-                from dispatcherd.factories import get_control_from_settings
+            from dispatcherd.factories import get_control_from_settings
 
-                ctl = get_control_from_settings()
-                results = ctl.control_with_reply('cancel', data={'uuid': self.celery_task_id}, expected_replies=1, timeout=timeout)
-                # Check if cancel was successful by checking if we got any results
-                return bool(results and len(results) > 0)
-            else:
-                # Original implementation
-                canceled = ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id])
+            ctl = get_control_from_settings()
+            results = ctl.control_with_reply('cancel', data={'uuid': self.celery_task_id}, expected_replies=1, timeout=timeout)
+            # Treat the cancel as successful if we received any replies
+            return bool(results and len(results) > 0)
         except socket.timeout:
             logger.error(f'could not reach dispatcher on {self.controller_node} within {timeout}s')
         except Exception:
             logger.exception("error encountered when checking task status")
-        return bool(self.celery_task_id in canceled)  # True or False, whether confirmation was obtained
+        return False  # no confirmation was obtained
 
     def cancel(self, job_explanation=None, is_chain=False):
         if self.can_cancel:
diff --git a/awx/main/scheduler/task_manager.py b/awx/main/scheduler/task_manager.py
index 5904c47d57..5fc1c0b51c 100644
--- a/awx/main/scheduler/task_manager.py
+++ b/awx/main/scheduler/task_manager.py
@@ -19,9 +19,6 @@
 from django.utils.timezone import now as tz_now
 from django.conf import settings
 from django.contrib.contenttypes.models import ContentType
 
-# django-flags
-from flags.state import flag_enabled
-
 from ansible_base.lib.utils.models import get_type_for_model
 
 # django-ansible-base
@@ -523,19 +520,7 @@ class TaskManager(TaskBase):
 
                 task.save()
                 task.log_lifecycle("waiting")
 
-                if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-                    self.control_nodes_to_notify.add(task.get_queue_name())
-                else:
-                    # apply_async does a NOTIFY to the channel dispatcher is listening to
-                    # postgres will treat this as part of the transaction, which is what we want
-                    if task.status != 'failed' and type(task) is not WorkflowJob:
-                        task_cls = task._get_task_class()
-                        task_cls.apply_async(
-                            [task.pk],
-                            opts,
-                            queue=task.get_queue_name(),
-                            uuid=task.celery_task_id,
-                        )
+                self.control_nodes_to_notify.add(task.get_queue_name())
 
         # In exception cases, like a job failing pre-start checks, we send the websocket status message.
        # For jobs going into waiting, we omit this because of performance issues, as it should go to running quickly
@@ -729,7 +714,6 @@ class TaskManager(TaskBase):
         for workflow_approval in self.get_expired_workflow_approvals():
             self.timeout_approval_node(workflow_approval)
 
-        if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-            for controller_node in self.control_nodes_to_notify:
-                logger.info(f'Notifying node {controller_node} of new waiting jobs.')
-                dispatch_waiting_jobs.apply_async(queue=controller_node)
+        for controller_node in self.control_nodes_to_notify:
+            logger.info(f'Notifying node {controller_node} of new waiting jobs.')
+            dispatch_waiting_jobs.apply_async(queue=controller_node)
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index 6d9656346a..ae2c6a0d52 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -131,12 +131,6 @@ def _run_dispatch_startup_common():
     m.reset_values()
 
 
-def _legacy_dispatch_startup():
-    """
-    Legacy branch for startup: simply performs reaping of waiting jobs with a zero grace period.
-    """
-    logger.debug("Legacy dispatcher: calling reaper.reap_waiting with grace_period=0")
-    reaper.reap_waiting(grace_period=0)
 
 
 def _dispatcherd_dispatch_startup():
@@ -153,21 +147,16 @@ def dispatch_startup():
     """
     System initialization at startup.
     First, execute the common logic.
-    Then, if FEATURE_DISPATCHERD_ENABLED is enabled, re-submit waiting jobs via the control API;
-    otherwise, fall back to legacy reaping of waiting jobs.
+    Then, re-submit waiting jobs via the control API.
     """
     _run_dispatch_startup_common()
-    if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
-        _dispatcherd_dispatch_startup()
-    else:
-        _legacy_dispatch_startup()
+    _dispatcherd_dispatch_startup()
 
 
 def inform_cluster_of_shutdown():
     """
     Clean system shutdown that marks the current instance offline.
-    In legacy mode, it also reaps waiting jobs.
-    In dispatcherd mode, it relies on dispatcherd's built-in cleanup.
+    Relies on dispatcherd's built-in cleanup.
""" try: inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID) @@ -176,14 +165,7 @@ def inform_cluster_of_shutdown(): logger.exception("Cluster host not found: %s", settings.CLUSTER_HOST_ID) return - if flag_enabled('FEATURE_DISPATCHERD_ENABLED'): - logger.debug("Dispatcherd mode: no extra reaping required for instance %s", inst.hostname) - else: - try: - logger.debug("Legacy mode: reaping waiting jobs for instance %s", inst.hostname) - reaper.reap_waiting(inst, grace_period=0) - except Exception: - logger.exception("Failed to reap waiting jobs for %s", inst.hostname) + logger.debug("No extra reaping required for instance %s", inst.hostname) logger.warning("Normal shutdown processed for instance %s; instance removed from capacity pool.", inst.hostname) diff --git a/awx/main/tests/functional/dab_feature_flags/test_feature_flags_api.py b/awx/main/tests/functional/dab_feature_flags/test_feature_flags_api.py index 8007ff7b84..fb483000fb 100644 --- a/awx/main/tests/functional/dab_feature_flags/test_feature_flags_api.py +++ b/awx/main/tests/functional/dab_feature_flags/test_feature_flags_api.py @@ -21,7 +21,7 @@ def test_feature_flags_list_endpoint_override(get, flag_val): bob = User.objects.create(username='bob', password='test_user', is_superuser=True) AAPFlag.objects.all().delete() - flag_name = "FEATURE_DISPATCHERD_ENABLED" + flag_name = "FEATURE_INDIRECT_NODE_COUNTING_ENABLED" setattr(settings, flag_name, flag_val) seed_feature_flags() url = "/api/v2/feature_flags/states/" diff --git a/awx/main/tests/functional/test_dispatch.py b/awx/main/tests/functional/test_dispatch.py index 382f858c28..2c3a665ad9 100644 --- a/awx/main/tests/functional/test_dispatch.py +++ b/awx/main/tests/functional/test_dispatch.py @@ -1,20 +1,11 @@ import datetime -import multiprocessing -import random -import signal -import time -import yaml from unittest import mock -from flags.state import disable_flag, enable_flag from django.utils.timezone import now as tz_now import pytest from awx.main.models import Job, WorkflowJob, Instance from awx.main.dispatch import reaper -from awx.main.dispatch.pool import StatefulPoolWorker, WorkerPool, AutoscalePool from awx.main.dispatch.publish import task -from awx.main.dispatch.worker import BaseWorker, TaskWorker -from awx.main.dispatch.periodic import Scheduler ''' Prevent logger. 
 Prevent logger. calls from triggering database operations
@@ -57,294 +48,6 @@ def multiply(a, b):
     return a * b
 
 
-class SimpleWorker(BaseWorker):
-    def perform_work(self, body, *args):
-        pass
-
-
-class ResultWriter(BaseWorker):
-    def perform_work(self, body, result_queue):
-        result_queue.put(body + '!!!')
-
-
-class SlowResultWriter(BaseWorker):
-    def perform_work(self, body, result_queue):
-        time.sleep(3)
-        super(SlowResultWriter, self).perform_work(body, result_queue)
-
-
-@pytest.mark.usefixtures("disable_database_settings")
-class TestPoolWorker:
-    def setup_method(self, test_method):
-        self.worker = StatefulPoolWorker(1000, self.tick, tuple())
-
-    def tick(self):
-        self.worker.finished.put(self.worker.queue.get()['uuid'])
-        time.sleep(0.5)
-
-    def test_qsize(self):
-        assert self.worker.qsize == 0
-        for i in range(3):
-            self.worker.put({'task': 'abc123'})
-        assert self.worker.qsize == 3
-
-    def test_put(self):
-        assert len(self.worker.managed_tasks) == 0
-        assert self.worker.messages_finished == 0
-        self.worker.put({'task': 'abc123'})
-
-        assert len(self.worker.managed_tasks) == 1
-        assert self.worker.messages_sent == 1
-
-    def test_managed_tasks(self):
-        self.worker.put({'task': 'abc123'})
-        self.worker.calculate_managed_tasks()
-        assert len(self.worker.managed_tasks) == 1
-
-        self.tick()
-        self.worker.calculate_managed_tasks()
-        assert len(self.worker.managed_tasks) == 0
-
-    def test_current_task(self):
-        self.worker.put({'task': 'abc123'})
-        assert self.worker.current_task['task'] == 'abc123'
-
-    def test_quit(self):
-        self.worker.quit()
-        assert self.worker.queue.get() == 'QUIT'
-
-    def test_idle_busy(self):
-        assert self.worker.idle is True
-        assert self.worker.busy is False
-        self.worker.put({'task': 'abc123'})
-        assert self.worker.busy is True
-        assert self.worker.idle is False
-
-
-@pytest.mark.django_db
-class TestWorkerPool:
-    def setup_method(self, test_method):
-        self.pool = WorkerPool(min_workers=3)
-
-    def teardown_method(self, test_method):
-        self.pool.stop(signal.SIGTERM)
-
-    def test_worker(self):
-        self.pool.init_workers(SimpleWorker().work_loop)
-        assert len(self.pool) == 3
-        for worker in self.pool.workers:
-            assert worker.messages_sent == 0
-            assert worker.alive is True
-
-    def test_single_task(self):
-        self.pool.init_workers(SimpleWorker().work_loop)
-        self.pool.write(0, 'xyz')
-        assert self.pool.workers[0].messages_sent == 1  # worker at index 0 handled one task
-        assert self.pool.workers[1].messages_sent == 0
-        assert self.pool.workers[2].messages_sent == 0
-
-    def test_queue_preference(self):
-        self.pool.init_workers(SimpleWorker().work_loop)
-        self.pool.write(2, 'xyz')
-        assert self.pool.workers[0].messages_sent == 0
-        assert self.pool.workers[1].messages_sent == 0
-        assert self.pool.workers[2].messages_sent == 1  # worker at index 2 handled one task
-
-    def test_worker_processing(self):
-        result_queue = multiprocessing.Queue()
-        self.pool.init_workers(ResultWriter().work_loop, result_queue)
-        for i in range(10):
-            self.pool.write(random.choice(range(len(self.pool))), 'Hello, Worker {}'.format(i))
-        all_messages = [result_queue.get(timeout=1) for i in range(10)]
-        all_messages.sort()
-        assert all_messages == ['Hello, Worker {}!!!'.format(i) for i in range(10)]
-
-        total_handled = sum([worker.messages_sent for worker in self.pool.workers])
-        assert total_handled == 10
-
-
-@pytest.mark.django_db
-class TestAutoScaling:
-    def setup_method(self, test_method):
-        self.pool = AutoscalePool(min_workers=2, max_workers=10)
-
-    def teardown_method(self, test_method):
-        self.pool.stop(signal.SIGTERM)
-
-    def test_scale_up(self):
-        result_queue = multiprocessing.Queue()
-        self.pool.init_workers(SlowResultWriter().work_loop, result_queue)
-
-        # start with two workers, write an event to each worker and make it busy
-        assert len(self.pool) == 2
-        for i, w in enumerate(self.pool.workers):
-            w.put('Hello, Worker {}'.format(0))
-        assert len(self.pool) == 2
-
-        # wait for the subprocesses to start working on their tasks and be marked busy
-        time.sleep(1)
-        assert self.pool.should_grow
-
-        # write a third message, expect a new worker to spawn because all
-        # workers are busy
-        self.pool.write(0, 'Hello, Worker {}'.format(2))
-        assert len(self.pool) == 3
-
-    def test_scale_down(self):
-        self.pool.init_workers(ResultWriter().work_loop, multiprocessing.Queue())
-
-        # start with two workers, and scale up to 10 workers
-        assert len(self.pool) == 2
-        for i in range(8):
-            self.pool.up()
-        assert len(self.pool) == 10
-
-        # cleanup should scale down to 8 workers
-        self.pool.cleanup()
-        assert len(self.pool) == 2
-
-    def test_max_scale_up(self):
-        self.pool.init_workers(ResultWriter().work_loop, multiprocessing.Queue())
-
-        assert len(self.pool) == 2
-        for i in range(25):
-            self.pool.up()
-        assert self.pool.max_workers == 10
-        assert self.pool.full is True
-        assert len(self.pool) == 10
-
-    def test_equal_worker_distribution(self):
-        # if all workers are busy, spawn new workers *before* adding messages
-        # to an existing queue
-        self.pool.init_workers(SlowResultWriter().work_loop, multiprocessing.Queue)
-
-        # start with two workers, write an event to each worker and make it busy
-        assert len(self.pool) == 2
-        for i in range(10):
-            self.pool.write(0, 'Hello, World!')
-        assert len(self.pool) == 10
-        for w in self.pool.workers:
-            assert w.busy
-            assert len(w.managed_tasks) == 1
-
-        # the queue is full at 10, the _next_ write should put the message into
-        # a worker's backlog
-        assert len(self.pool) == 10
-        for w in self.pool.workers:
-            assert w.messages_sent == 1
-        self.pool.write(0, 'Hello, World!')
-        assert len(self.pool) == 10
-        assert self.pool.workers[0].messages_sent == 2
-
-    @pytest.mark.timeout(20)
-    def test_lost_worker_autoscale(self):
-        # if a worker exits, it should be replaced automatically up to min_workers
-        self.pool.init_workers(ResultWriter().work_loop, multiprocessing.Queue())
-
-        # start with two workers, kill one of them
-        assert len(self.pool) == 2
-        assert not self.pool.should_grow
-        alive_pid = self.pool.workers[1].pid
-        self.pool.workers[0].process.kill()
-        self.pool.workers[0].process.join()  # waits for process to full terminate
-
-        # clean up and the dead worker
-        self.pool.cleanup()
-        assert len(self.pool) == 1
-        assert self.pool.workers[0].pid == alive_pid
-
-        # the next queue write should replace the lost worker
-        self.pool.write(0, 'Hello, Worker')
-        assert len(self.pool) == 2
-
-
-@pytest.mark.usefixtures("disable_database_settings")
-class TestTaskDispatcher:
-    @property
-    def tm(self):
-        return TaskWorker()
-
-    def test_function_dispatch(self):
-        result = self.tm.perform_work({'task': 'awx.main.tests.functional.test_dispatch.add', 'args': [2, 2]})
-        assert result == 4
-
-    def test_function_dispatch_must_be_decorated(self):
-        result = self.tm.perform_work({'task': 'awx.main.tests.functional.test_dispatch.restricted', 'args': [2, 2]})
-        assert isinstance(result, ValueError)
-        assert str(result) == 'awx.main.tests.functional.test_dispatch.restricted is not decorated with @task()'  # noqa
-
-    def test_method_dispatch(self):
-        result = self.tm.perform_work({'task': 'awx.main.tests.functional.test_dispatch.Adder', 'args': [2, 2]})
-        assert result == 4
-
-    def test_method_dispatch_must_be_decorated(self):
-        result = self.tm.perform_work({'task': 'awx.main.tests.functional.test_dispatch.Restricted', 'args': [2, 2]})
-        assert isinstance(result, ValueError)
-        assert str(result) == 'awx.main.tests.functional.test_dispatch.Restricted is not decorated with @task()'  # noqa
-
-    def test_python_function_cannot_be_imported(self):
-        result = self.tm.perform_work(
-            {
-                'task': 'os.system',
-                'args': ['ls'],
-            }
-        )
-        assert isinstance(result, ValueError)
-        assert str(result) == 'os.system is not a valid awx task'  # noqa
-
-    def test_undefined_function_cannot_be_imported(self):
-        result = self.tm.perform_work({'task': 'awx.foo.bar'})
-        assert isinstance(result, ModuleNotFoundError)
-        assert str(result) == "No module named 'awx.foo'"  # noqa
-
-
-@pytest.mark.django_db
-class TestTaskPublisher:
-    @pytest.fixture(autouse=True)
-    def _disable_dispatcherd(self):
-        flag_name = "FEATURE_DISPATCHERD_ENABLED"
-        disable_flag(flag_name)
-        yield
-        enable_flag(flag_name)
-
-    def test_function_callable(self):
-        assert add(2, 2) == 4
-
-    def test_method_callable(self):
-        assert Adder().run(2, 2) == 4
-
-    def test_function_apply_async(self):
-        message, queue = add.apply_async([2, 2], queue='foobar')
-        assert message['args'] == [2, 2]
-        assert message['kwargs'] == {}
-        assert message['task'] == 'awx.main.tests.functional.test_dispatch.add'
-        assert queue == 'foobar'
-
-    def test_method_apply_async(self):
-        message, queue = Adder.apply_async([2, 2], queue='foobar')
-        assert message['args'] == [2, 2]
-        assert message['kwargs'] == {}
-        assert message['task'] == 'awx.main.tests.functional.test_dispatch.Adder'
-        assert queue == 'foobar'
-
-    def test_apply_async_queue_required(self):
-        with pytest.raises(ValueError) as e:
-            message, queue = add.apply_async([2, 2])
-        assert "awx.main.tests.functional.test_dispatch.add: Queue value required and may not be None" == e.value.args[0]
-
-    def test_queue_defined_in_task_decorator(self):
-        message, queue = multiply.apply_async([2, 2])
-        assert queue == 'hard-math'
-
-    def test_queue_overridden_from_task_decorator(self):
-        message, queue = multiply.apply_async([2, 2], queue='not-so-hard')
-        assert queue == 'not-so-hard'
-
-    def test_apply_with_callable_queuename(self):
-        message, queue = add.apply_async([2, 2], queue=lambda: 'called')
-        assert queue == 'called'
-
-
 yesterday = tz_now() - datetime.timedelta(days=1)
 minute = tz_now() - datetime.timedelta(seconds=120)
 now = tz_now()
@@ -448,76 +151,3 @@ class TestJobReaper(object):
         assert job.started > ref_time
         assert job.status == 'running'
         assert job.job_explanation == ''
-
-
-@pytest.mark.django_db
-class TestScheduler:
-    def test_too_many_schedules_freak_out(self):
-        with pytest.raises(RuntimeError):
-            Scheduler({'job1': {'schedule': datetime.timedelta(seconds=1)}, 'job2': {'schedule': datetime.timedelta(seconds=1)}})
-
-    def test_spread_out(self):
-        scheduler = Scheduler(
-            {
-                'job1': {'schedule': datetime.timedelta(seconds=16)},
-                'job2': {'schedule': datetime.timedelta(seconds=16)},
-                'job3': {'schedule': datetime.timedelta(seconds=16)},
-                'job4': {'schedule': datetime.timedelta(seconds=16)},
-            }
-        )
-        assert [job.offset for job in scheduler.jobs] == [0, 4, 8, 12]
-
-    def test_missed_schedule(self, mocker):
-        scheduler = Scheduler({'job1': {'schedule': datetime.timedelta(seconds=10)}})
-        assert scheduler.jobs[0].missed_runs(time.time() - scheduler.global_start) == 0
-        mocker.patch('awx.main.dispatch.periodic.time.time', return_value=scheduler.global_start + 50)
-        scheduler.get_and_mark_pending()
-        assert scheduler.jobs[0].missed_runs(50) > 1
-
-    def test_advance_schedule(self, mocker):
-        scheduler = Scheduler(
-            {
-                'job1': {'schedule': datetime.timedelta(seconds=30)},
-                'joba': {'schedule': datetime.timedelta(seconds=20)},
-                'jobb': {'schedule': datetime.timedelta(seconds=20)},
-            }
-        )
-        for job in scheduler.jobs:
-            # HACK: the offsets automatically added make this a hard test to write... so remove offsets
-            job.offset = 0.0
-        mocker.patch('awx.main.dispatch.periodic.time.time', return_value=scheduler.global_start + 29)
-        to_run = scheduler.get_and_mark_pending()
-        assert set(job.name for job in to_run) == set(['joba', 'jobb'])
-        mocker.patch('awx.main.dispatch.periodic.time.time', return_value=scheduler.global_start + 39)
-        to_run = scheduler.get_and_mark_pending()
-        assert len(to_run) == 1
-        assert to_run[0].name == 'job1'
-
-    @staticmethod
-    def get_job(scheduler, name):
-        for job in scheduler.jobs:
-            if job.name == name:
-                return job
-
-    def test_scheduler_debug(self, mocker):
-        scheduler = Scheduler(
-            {
-                'joba': {'schedule': datetime.timedelta(seconds=20)},
-                'jobb': {'schedule': datetime.timedelta(seconds=50)},
-                'jobc': {'schedule': datetime.timedelta(seconds=500)},
-                'jobd': {'schedule': datetime.timedelta(seconds=20)},
-            }
-        )
-        rel_time = 119.9  # slightly under the 6th 20-second bin, to avoid offset problems
-        current_time = scheduler.global_start + rel_time
-        mocker.patch('awx.main.dispatch.periodic.time.time', return_value=current_time - 1.0e-8)
-        self.get_job(scheduler, 'jobb').mark_run(rel_time)
-        self.get_job(scheduler, 'jobd').mark_run(rel_time - 20.0)
-
-        output = scheduler.debug()
-        data = yaml.safe_load(output)
-        assert data['schedule_list']['jobc']['last_run_seconds_ago'] is None
-        assert data['schedule_list']['joba']['missed_runs'] == 4
-        assert data['schedule_list']['jobd']['missed_runs'] == 3
-        assert data['schedule_list']['jobd']['completed_runs'] == 1
-        assert data['schedule_list']['jobb']['next_run_in_seconds'] > 25.0
diff --git a/awx/main/tests/unit/test_settings.py b/awx/main/tests/unit/test_settings.py
index 42ad771b1a..5103d0759e 100644
--- a/awx/main/tests/unit/test_settings.py
+++ b/awx/main/tests/unit/test_settings.py
@@ -10,7 +10,6 @@ LOCAL_SETTINGS = (
     'NAMED_URL_GRAPH',
     'DISPATCHER_MOCK_PUBLISH',
     # Platform flags are managed by the platform flags system and have environment-specific defaults
-    'FEATURE_DISPATCHERD_ENABLED',
     'FEATURE_INDIRECT_NODE_COUNTING_ENABLED',
 )
@@ -87,12 +86,9 @@ def test_development_defaults_feature_flags(monkeypatch):
     spec.loader.exec_module(development_defaults)
 
     # Also import through the development settings to ensure both paths are tested
-    from awx.settings.development import FEATURE_INDIRECT_NODE_COUNTING_ENABLED, FEATURE_DISPATCHERD_ENABLED
+    from awx.settings.development import FEATURE_INDIRECT_NODE_COUNTING_ENABLED
 
     # Verify the feature flags are set correctly in both the module and settings
     assert hasattr(development_defaults, 'FEATURE_INDIRECT_NODE_COUNTING_ENABLED')
     assert development_defaults.FEATURE_INDIRECT_NODE_COUNTING_ENABLED is True
-    assert hasattr(development_defaults, 'FEATURE_DISPATCHERD_ENABLED')
-    assert development_defaults.FEATURE_DISPATCHERD_ENABLED is True
     assert FEATURE_INDIRECT_NODE_COUNTING_ENABLED is True
-    assert FEATURE_DISPATCHERD_ENABLED is True
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 9a2bc5204d..1f4a472bc9 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -1149,7 +1149,6 @@ OPA_REQUEST_RETRIES = 2  # The number of retry attempts for connecting to the OP
 
 # feature flags
 FEATURE_INDIRECT_NODE_COUNTING_ENABLED = False
-FEATURE_DISPATCHERD_ENABLED = False
 
 # Dispatcher worker lifetime. If set to None, workers will never be retired
 # based on age. Note workers will finish their last task before retiring if
diff --git a/awx/settings/development_defaults.py b/awx/settings/development_defaults.py
index c7f43e880e..49cb1a68b6 100644
--- a/awx/settings/development_defaults.py
+++ b/awx/settings/development_defaults.py
@@ -69,4 +69,3 @@ AWX_DISABLE_TASK_MANAGERS = False
 
 # ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!=================================
 FEATURE_INDIRECT_NODE_COUNTING_ENABLED = True
-FEATURE_DISPATCHERD_ENABLED = True
diff --git a/docs/tasks.md b/docs/tasks.md
index aa91b90339..c5134fc785 100644
--- a/docs/tasks.md
+++ b/docs/tasks.md
@@ -145,14 +145,6 @@ This outputs running and queued task UUIDs handled by a specific dispatcher
 ['eb3b0a83-86da-413d-902a-16d7530a6b25', 'f447266a-23da-42b4-8025-fe379d2db96f']
 ```
 
-Additionally, you can tell the local running dispatcher to recycle all of the
-workers in its pool. It will wait for any running jobs to finish and exit when
-work has completed, spinning up replacement workers.
-
-```
-awx-manage run_dispatcher --reload
-```
-
 * * *
 
 In the following sections, we will go further into the details regarding AWX tasks. They are all decorated by `@task()` in [awx/awx/main/tasks.py](https://github.com/ansible/awx/blob/devel/awx/main/tasks.py)
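
For quick reference, the dispatcherd control channel that `run_dispatcher.py` and `UnifiedJob` now share can be exercised directly from a Django shell. This is a minimal sketch based only on the calls visible in this patch; it assumes Django settings are loaded and a dispatcherd service is running, and the task UUID is a placeholder:

```python
# Sketch of the control-channel calls used in this patch; assumes a running
# dispatcherd service reachable via the configured broker.
from dispatcherd.factories import get_control_from_settings

ctl = get_control_from_settings()

# 'status' is expected to yield exactly one reply
# (run_dispatcher.py raises CommandError on any other count)
status = ctl.control_with_reply('status')

# 'running' returns the UUIDs of tasks managed by the dispatcher
running = ctl.control_with_reply('running')

# cancel is sent per task UUID, mirroring the --cancel handling above;
# the UUID here is a placeholder
result = ctl.control_with_reply('cancel', data={'uuid': '00000000-0000-0000-0000-000000000000'})
```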
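Mirroring the now-removed functional tests, declaring and dispatching a task through the rewritten publisher in `awx/main/dispatch/publish.py` looks like the sketch below; the `multiply` function and `hard-math` queue are illustrative names taken from those tests:

```python
# Hedged sketch of task declaration and dispatch via the dispatcherd-backed
# publisher; names are borrowed from the removed functional tests.
from awx.main.dispatch.publish import task


@task(queue='hard-math')
def multiply(a, b):
    return a * b


# apply_async resolves the registered import string back to the callable and
# hands it to dispatcherd's submit_task, along with any Blocker options.
multiply.apply_async([2, 2], queue='hard-math')
```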