WIP First pass

* started removing feature flags and adjusting logic
* WIP
This commit is contained in:
thedoubl3j
2025-12-17 14:29:00 -05:00
parent b34ee01fb3
commit e55578b64e
11 changed files with 56 additions and 589 deletions

View File

@@ -1,6 +1,5 @@
import inspect
import logging
import json
import time
from uuid import uuid4
@@ -9,9 +8,6 @@ from dispatcherd.processors.blocker import Blocker
from dispatcherd.utils import resolve_callable
from django_guid import get_guid
from django.conf import settings
from . import pg_bus_conn
logger = logging.getLogger('awx.main.dispatch')
@@ -101,43 +97,21 @@ class task:
@classmethod
def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
try:
from flags.state import flag_enabled
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
# At this point we have the import string, and submit_task wants the method, so back to that
actual_task = resolve_callable(cls.name)
processor_options = ()
if on_duplicate is not None:
processor_options = (Blocker.Params(on_duplicate=on_duplicate),)
return submit_task(
actual_task,
args=args,
kwargs=kwargs,
queue=queue,
uuid=uuid,
timeout=timeout,
processor_options=processor_options,
**kw,
)
except Exception:
logger.exception(f"[DISPATCHER] Failed to check for alternative dispatcherd implementation for {cls.name}")
# Continue with original implementation if anything fails
pass
# Original implementation follows
queue = queue or getattr(cls.queue, 'im_func', cls.queue)
if not queue:
msg = f'{cls.name}: Queue value required and may not be None'
logger.error(msg)
raise ValueError(msg)
obj = cls.get_async_body(args=args, kwargs=kwargs, uuid=uuid, **kw)
if callable(queue):
queue = queue()
if not settings.DISPATCHER_MOCK_PUBLISH:
with pg_bus_conn() as conn:
conn.notify(queue, json.dumps(obj))
return (obj, queue)
# At this point we have the import string, and submit_task wants the method, so back to that
actual_task = resolve_callable(cls.name)
processor_options = ()
if on_duplicate is not None:
processor_options = (Blocker.Params(on_duplicate=on_duplicate),)
return submit_task(
actual_task,
args=args,
kwargs=kwargs,
queue=queue,
uuid=uuid,
timeout=timeout,
processor_options=processor_options,
**kw,
)
# If the object we're wrapping *is* a class (e.g., RunJob), return
# a *new* class that inherits from the wrapped class *and* BaseTask

View File

@@ -12,18 +12,11 @@ from django.db import connection
from django.core.management.base import BaseCommand, CommandError
from django.core.cache import cache as django_cache
from flags.state import flag_enabled
from dispatcherd.factories import get_control_from_settings
from dispatcherd import run_service
from dispatcherd.config import setup as dispatcher_setup
from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.config import get_dispatcherd_config
from awx.main.dispatch.control import Control
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
from awx.main.analytics.subsystem_metrics import DispatcherMetricsServer
logger = logging.getLogger('awx.main.dispatch')
@@ -33,14 +26,7 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running dispatchers')
parser.add_argument('--schedule', dest='schedule', action='store_true', help='print the current status of schedules being ran by dispatcher')
parser.add_argument('--running', dest='running', action='store_true', help='print the UUIDs of any tasked managed by this dispatcher')
parser.add_argument(
'--reload',
dest='reload',
action='store_true',
help=('cause the dispatcher to recycle all of its worker processes; running jobs will run to completion first'),
)
parser.add_argument(
'--cancel',
dest='cancel',
@@ -53,38 +39,17 @@ class Command(BaseCommand):
def handle(self, *arg, **options):
if options.get('status'):
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('status')
if len(running_data) != 1:
raise CommandError('Did not receive expected number of replies')
print(yaml.dump(running_data[0], default_flow_style=False))
return
else:
print(Control('dispatcher').status())
return
if options.get('schedule'):
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
print('NOT YET IMPLEMENTED')
return
else:
print(Control('dispatcher').schedule())
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('status')
if len(running_data) != 1:
raise CommandError('Did not receive expected number of replies')
print(yaml.dump(running_data[0], default_flow_style=False))
return
if options.get('running'):
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('running')
print(yaml.dump(running_data, default_flow_style=False))
return
else:
print(Control('dispatcher').running())
return
if options.get('reload'):
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
print('NOT YET IMPLEMENTED')
return
else:
return Control('dispatcher').control({'control': 'reload'})
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('running')
print(yaml.dump(running_data, default_flow_style=False))
return
if options.get('cancel'):
cancel_str = options.get('cancel')
try:
@@ -94,56 +59,14 @@ class Command(BaseCommand):
if not isinstance(cancel_data, list):
cancel_data = [cancel_str]
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
ctl = get_control_from_settings()
results = []
for task_id in cancel_data:
# For each task UUID, send an individual cancel command
result = ctl.control_with_reply('cancel', data={'uuid': task_id})
results.append(result)
print(yaml.dump(results, default_flow_style=False))
return
else:
print(Control('dispatcher').cancel(cancel_data))
return
ctl = get_control_from_settings()
results = []
for task_id in cancel_data:
# For each task UUID, send an individual cancel command
result = ctl.control_with_reply('cancel', data={'uuid': task_id})
results.append(result)
print(yaml.dump(results, default_flow_style=False))
return
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
self.configure_dispatcher_logging()
# Close the connection, because the pg_notify broker will create new async connection
connection.close()
django_cache.close()
dispatcher_setup(get_dispatcherd_config(for_service=True))
run_service()
else:
consumer = None
try:
DispatcherMetricsServer().start()
except redis.exceptions.ConnectionError as exc:
raise CommandError(f'Dispatcher could not connect to redis, error: {exc}')
try:
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
consumer.run()
except KeyboardInterrupt:
logger.debug('Terminating Task Dispatcher')
if consumer:
consumer.stop()
def configure_dispatcher_logging(self):
# Apply special log rule for the parent process
special_logging = copy.deepcopy(settings.LOGGING)
for handler_name, handler_config in special_logging.get('handlers', {}).items():
filters = handler_config.get('filters', [])
if 'dynamic_level_filter' in filters:
handler_config['filters'] = [flt for flt in filters if flt != 'dynamic_level_filter']
logger.info(f'Dispatcherd main process replaced log level filter for {handler_name} handler')
# Apply the custom logging level here, before the asyncio code starts
special_logging.setdefault('loggers', {}).setdefault('dispatcherd', {})
special_logging['loggers']['dispatcherd']['level'] = settings.LOG_AGGREGATOR_LEVEL
logging.config.dictConfig(special_logging)
dispatcher_setup(get_dispatcherd_config(for_service=True))
run_service()

View File

@@ -39,7 +39,6 @@ from ansible_base.rbac.models import RoleEvaluation
# AWX
from awx.main.models.base import CommonModelNameNotUnique, PasswordFieldsModel, NotificationFieldsModel
from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.control import Control as ControlDispatcher
from awx.main.registrar import activity_stream_registrar
from awx.main.models.mixins import TaskManagerUnifiedJobMixin, ExecutionEnvironmentMixin
from awx.main.models.rbac import to_permissions
@@ -1497,43 +1496,32 @@ class UnifiedJob(
if not self.celery_task_id:
return False
canceled = []
# Special case for task manager (used during workflow job cancellation)
if not connection.get_autocommit():
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
try:
from dispatcherd.factories import get_control_from_settings
try:
from dispatcherd.factories import get_control_from_settings
ctl = get_control_from_settings()
ctl.control('cancel', data={'uuid': self.celery_task_id})
except Exception:
logger.exception("Error sending cancel command to new dispatcher")
else:
try:
ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id], with_reply=False)
except Exception:
logger.exception("Error sending cancel command to legacy dispatcher")
ctl = get_control_from_settings()
ctl.control('cancel', data={'uuid': self.celery_task_id})
except Exception:
logger.exception("Error sending cancel command to dispatcher")
return True # task manager itself needs to act under assumption that cancel was received
# Standard case with reply
try:
timeout = 5
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
from dispatcherd.factories import get_control_from_settings
from dispatcherd.factories import get_control_from_settings
ctl = get_control_from_settings()
results = ctl.control_with_reply('cancel', data={'uuid': self.celery_task_id}, expected_replies=1, timeout=timeout)
# Check if cancel was successful by checking if we got any results
return bool(results and len(results) > 0)
else:
# Original implementation
canceled = ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id])
ctl = get_control_from_settings()
results = ctl.control_with_reply('cancel', data={'uuid': self.celery_task_id}, expected_replies=1, timeout=timeout)
# Check if cancel was successful by checking if we got any results
return bool(results and len(results) > 0)
except socket.timeout:
logger.error(f'could not reach dispatcher on {self.controller_node} within {timeout}s')
except Exception:
logger.exception("error encountered when checking task status")
return bool(self.celery_task_id in canceled) # True or False, whether confirmation was obtained
return False # whether confirmation was obtained
def cancel(self, job_explanation=None, is_chain=False):
if self.can_cancel:

View File

@@ -19,9 +19,6 @@ from django.utils.timezone import now as tz_now
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
# django-flags
from flags.state import flag_enabled
from ansible_base.lib.utils.models import get_type_for_model
# django-ansible-base
@@ -523,19 +520,7 @@ class TaskManager(TaskBase):
task.save()
task.log_lifecycle("waiting")
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
self.control_nodes_to_notify.add(task.get_queue_name())
else:
# apply_async does a NOTIFY to the channel dispatcher is listening to
# postgres will treat this as part of the transaction, which is what we want
if task.status != 'failed' and type(task) is not WorkflowJob:
task_cls = task._get_task_class()
task_cls.apply_async(
[task.pk],
opts,
queue=task.get_queue_name(),
uuid=task.celery_task_id,
)
self.control_nodes_to_notify.add(task.get_queue_name())
# In exception cases, like a job failing pre-start checks, we send the websocket status message.
# For jobs going into waiting, we omit this because of performance issues, as it should go to running quickly
@@ -729,7 +714,6 @@ class TaskManager(TaskBase):
for workflow_approval in self.get_expired_workflow_approvals():
self.timeout_approval_node(workflow_approval)
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
for controller_node in self.control_nodes_to_notify:
logger.info(f'Notifying node {controller_node} of new waiting jobs.')
dispatch_waiting_jobs.apply_async(queue=controller_node)
for controller_node in self.control_nodes_to_notify:
logger.info(f'Notifying node {controller_node} of new waiting jobs.')
dispatch_waiting_jobs.apply_async(queue=controller_node)

View File

@@ -131,12 +131,6 @@ def _run_dispatch_startup_common():
m.reset_values()
def _legacy_dispatch_startup():
"""
Legacy branch for startup: simply performs reaping of waiting jobs with a zero grace period.
"""
logger.debug("Legacy dispatcher: calling reaper.reap_waiting with grace_period=0")
reaper.reap_waiting(grace_period=0)
def _dispatcherd_dispatch_startup():
@@ -153,21 +147,16 @@ def dispatch_startup():
"""
System initialization at startup.
First, execute the common logic.
Then, if FEATURE_DISPATCHERD_ENABLED is enabled, re-submit waiting jobs via the control API;
otherwise, fall back to legacy reaping of waiting jobs.
Then, re-submit waiting jobs via the control API.
"""
_run_dispatch_startup_common()
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
_dispatcherd_dispatch_startup()
else:
_legacy_dispatch_startup()
_dispatcherd_dispatch_startup()
def inform_cluster_of_shutdown():
"""
Clean system shutdown that marks the current instance offline.
In legacy mode, it also reaps waiting jobs.
In dispatcherd mode, it relies on dispatcherd's built-in cleanup.
Relies on dispatcherd's built-in cleanup.
"""
try:
inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
@@ -176,14 +165,7 @@ def inform_cluster_of_shutdown():
logger.exception("Cluster host not found: %s", settings.CLUSTER_HOST_ID)
return
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
logger.debug("Dispatcherd mode: no extra reaping required for instance %s", inst.hostname)
else:
try:
logger.debug("Legacy mode: reaping waiting jobs for instance %s", inst.hostname)
reaper.reap_waiting(inst, grace_period=0)
except Exception:
logger.exception("Failed to reap waiting jobs for %s", inst.hostname)
logger.debug("No extra reaping required for instance %s", inst.hostname)
logger.warning("Normal shutdown processed for instance %s; instance removed from capacity pool.", inst.hostname)

View File

@@ -21,7 +21,7 @@ def test_feature_flags_list_endpoint_override(get, flag_val):
bob = User.objects.create(username='bob', password='test_user', is_superuser=True)
AAPFlag.objects.all().delete()
flag_name = "FEATURE_DISPATCHERD_ENABLED"
flag_name = "FEATURE_INDIRECT_NODE_COUNTING_ENABLED"
setattr(settings, flag_name, flag_val)
seed_feature_flags()
url = "/api/v2/feature_flags/states/"

View File

@@ -1,20 +1,11 @@
import datetime
import multiprocessing
import random
import signal
import time
import yaml
from unittest import mock
from flags.state import disable_flag, enable_flag
from django.utils.timezone import now as tz_now
import pytest
from awx.main.models import Job, WorkflowJob, Instance
from awx.main.dispatch import reaper
from awx.main.dispatch.pool import StatefulPoolWorker, WorkerPool, AutoscalePool
from awx.main.dispatch.publish import task
from awx.main.dispatch.worker import BaseWorker, TaskWorker
from awx.main.dispatch.periodic import Scheduler
'''
Prevent logger.<warn, debug, error> calls from triggering database operations
@@ -57,294 +48,6 @@ def multiply(a, b):
return a * b
class SimpleWorker(BaseWorker):
def perform_work(self, body, *args):
pass
class ResultWriter(BaseWorker):
def perform_work(self, body, result_queue):
result_queue.put(body + '!!!')
class SlowResultWriter(BaseWorker):
def perform_work(self, body, result_queue):
time.sleep(3)
super(SlowResultWriter, self).perform_work(body, result_queue)
@pytest.mark.usefixtures("disable_database_settings")
class TestPoolWorker:
def setup_method(self, test_method):
self.worker = StatefulPoolWorker(1000, self.tick, tuple())
def tick(self):
self.worker.finished.put(self.worker.queue.get()['uuid'])
time.sleep(0.5)
def test_qsize(self):
assert self.worker.qsize == 0
for i in range(3):
self.worker.put({'task': 'abc123'})
assert self.worker.qsize == 3
def test_put(self):
assert len(self.worker.managed_tasks) == 0
assert self.worker.messages_finished == 0
self.worker.put({'task': 'abc123'})
assert len(self.worker.managed_tasks) == 1
assert self.worker.messages_sent == 1
def test_managed_tasks(self):
self.worker.put({'task': 'abc123'})
self.worker.calculate_managed_tasks()
assert len(self.worker.managed_tasks) == 1
self.tick()
self.worker.calculate_managed_tasks()
assert len(self.worker.managed_tasks) == 0
def test_current_task(self):
self.worker.put({'task': 'abc123'})
assert self.worker.current_task['task'] == 'abc123'
def test_quit(self):
self.worker.quit()
assert self.worker.queue.get() == 'QUIT'
def test_idle_busy(self):
assert self.worker.idle is True
assert self.worker.busy is False
self.worker.put({'task': 'abc123'})
assert self.worker.busy is True
assert self.worker.idle is False
@pytest.mark.django_db
class TestWorkerPool:
def setup_method(self, test_method):
self.pool = WorkerPool(min_workers=3)
def teardown_method(self, test_method):
self.pool.stop(signal.SIGTERM)
def test_worker(self):
self.pool.init_workers(SimpleWorker().work_loop)
assert len(self.pool) == 3
for worker in self.pool.workers:
assert worker.messages_sent == 0
assert worker.alive is True
def test_single_task(self):
self.pool.init_workers(SimpleWorker().work_loop)
self.pool.write(0, 'xyz')
assert self.pool.workers[0].messages_sent == 1 # worker at index 0 handled one task
assert self.pool.workers[1].messages_sent == 0
assert self.pool.workers[2].messages_sent == 0
def test_queue_preference(self):
self.pool.init_workers(SimpleWorker().work_loop)
self.pool.write(2, 'xyz')
assert self.pool.workers[0].messages_sent == 0
assert self.pool.workers[1].messages_sent == 0
assert self.pool.workers[2].messages_sent == 1 # worker at index 2 handled one task
def test_worker_processing(self):
result_queue = multiprocessing.Queue()
self.pool.init_workers(ResultWriter().work_loop, result_queue)
for i in range(10):
self.pool.write(random.choice(range(len(self.pool))), 'Hello, Worker {}'.format(i))
all_messages = [result_queue.get(timeout=1) for i in range(10)]
all_messages.sort()
assert all_messages == ['Hello, Worker {}!!!'.format(i) for i in range(10)]
total_handled = sum([worker.messages_sent for worker in self.pool.workers])
assert total_handled == 10
@pytest.mark.django_db
class TestAutoScaling:
def setup_method(self, test_method):
self.pool = AutoscalePool(min_workers=2, max_workers=10)
def teardown_method(self, test_method):
self.pool.stop(signal.SIGTERM)
def test_scale_up(self):
result_queue = multiprocessing.Queue()
self.pool.init_workers(SlowResultWriter().work_loop, result_queue)
# start with two workers, write an event to each worker and make it busy
assert len(self.pool) == 2
for i, w in enumerate(self.pool.workers):
w.put('Hello, Worker {}'.format(0))
assert len(self.pool) == 2
# wait for the subprocesses to start working on their tasks and be marked busy
time.sleep(1)
assert self.pool.should_grow
# write a third message, expect a new worker to spawn because all
# workers are busy
self.pool.write(0, 'Hello, Worker {}'.format(2))
assert len(self.pool) == 3
def test_scale_down(self):
self.pool.init_workers(ResultWriter().work_loop, multiprocessing.Queue())
# start with two workers, and scale up to 10 workers
assert len(self.pool) == 2
for i in range(8):
self.pool.up()
assert len(self.pool) == 10
# cleanup should scale down to 8 workers
self.pool.cleanup()
assert len(self.pool) == 2
def test_max_scale_up(self):
self.pool.init_workers(ResultWriter().work_loop, multiprocessing.Queue())
assert len(self.pool) == 2
for i in range(25):
self.pool.up()
assert self.pool.max_workers == 10
assert self.pool.full is True
assert len(self.pool) == 10
def test_equal_worker_distribution(self):
# if all workers are busy, spawn new workers *before* adding messages
# to an existing queue
self.pool.init_workers(SlowResultWriter().work_loop, multiprocessing.Queue)
# start with two workers, write an event to each worker and make it busy
assert len(self.pool) == 2
for i in range(10):
self.pool.write(0, 'Hello, World!')
assert len(self.pool) == 10
for w in self.pool.workers:
assert w.busy
assert len(w.managed_tasks) == 1
# the queue is full at 10, the _next_ write should put the message into
# a worker's backlog
assert len(self.pool) == 10
for w in self.pool.workers:
assert w.messages_sent == 1
self.pool.write(0, 'Hello, World!')
assert len(self.pool) == 10
assert self.pool.workers[0].messages_sent == 2
@pytest.mark.timeout(20)
def test_lost_worker_autoscale(self):
# if a worker exits, it should be replaced automatically up to min_workers
self.pool.init_workers(ResultWriter().work_loop, multiprocessing.Queue())
# start with two workers, kill one of them
assert len(self.pool) == 2
assert not self.pool.should_grow
alive_pid = self.pool.workers[1].pid
self.pool.workers[0].process.kill()
self.pool.workers[0].process.join() # waits for process to full terminate
# clean up and the dead worker
self.pool.cleanup()
assert len(self.pool) == 1
assert self.pool.workers[0].pid == alive_pid
# the next queue write should replace the lost worker
self.pool.write(0, 'Hello, Worker')
assert len(self.pool) == 2
@pytest.mark.usefixtures("disable_database_settings")
class TestTaskDispatcher:
@property
def tm(self):
return TaskWorker()
def test_function_dispatch(self):
result = self.tm.perform_work({'task': 'awx.main.tests.functional.test_dispatch.add', 'args': [2, 2]})
assert result == 4
def test_function_dispatch_must_be_decorated(self):
result = self.tm.perform_work({'task': 'awx.main.tests.functional.test_dispatch.restricted', 'args': [2, 2]})
assert isinstance(result, ValueError)
assert str(result) == 'awx.main.tests.functional.test_dispatch.restricted is not decorated with @task()' # noqa
def test_method_dispatch(self):
result = self.tm.perform_work({'task': 'awx.main.tests.functional.test_dispatch.Adder', 'args': [2, 2]})
assert result == 4
def test_method_dispatch_must_be_decorated(self):
result = self.tm.perform_work({'task': 'awx.main.tests.functional.test_dispatch.Restricted', 'args': [2, 2]})
assert isinstance(result, ValueError)
assert str(result) == 'awx.main.tests.functional.test_dispatch.Restricted is not decorated with @task()' # noqa
def test_python_function_cannot_be_imported(self):
result = self.tm.perform_work(
{
'task': 'os.system',
'args': ['ls'],
}
)
assert isinstance(result, ValueError)
assert str(result) == 'os.system is not a valid awx task' # noqa
def test_undefined_function_cannot_be_imported(self):
result = self.tm.perform_work({'task': 'awx.foo.bar'})
assert isinstance(result, ModuleNotFoundError)
assert str(result) == "No module named 'awx.foo'" # noqa
@pytest.mark.django_db
class TestTaskPublisher:
@pytest.fixture(autouse=True)
def _disable_dispatcherd(self):
flag_name = "FEATURE_DISPATCHERD_ENABLED"
disable_flag(flag_name)
yield
enable_flag(flag_name)
def test_function_callable(self):
assert add(2, 2) == 4
def test_method_callable(self):
assert Adder().run(2, 2) == 4
def test_function_apply_async(self):
message, queue = add.apply_async([2, 2], queue='foobar')
assert message['args'] == [2, 2]
assert message['kwargs'] == {}
assert message['task'] == 'awx.main.tests.functional.test_dispatch.add'
assert queue == 'foobar'
def test_method_apply_async(self):
message, queue = Adder.apply_async([2, 2], queue='foobar')
assert message['args'] == [2, 2]
assert message['kwargs'] == {}
assert message['task'] == 'awx.main.tests.functional.test_dispatch.Adder'
assert queue == 'foobar'
def test_apply_async_queue_required(self):
with pytest.raises(ValueError) as e:
message, queue = add.apply_async([2, 2])
assert "awx.main.tests.functional.test_dispatch.add: Queue value required and may not be None" == e.value.args[0]
def test_queue_defined_in_task_decorator(self):
message, queue = multiply.apply_async([2, 2])
assert queue == 'hard-math'
def test_queue_overridden_from_task_decorator(self):
message, queue = multiply.apply_async([2, 2], queue='not-so-hard')
assert queue == 'not-so-hard'
def test_apply_with_callable_queuename(self):
message, queue = add.apply_async([2, 2], queue=lambda: 'called')
assert queue == 'called'
yesterday = tz_now() - datetime.timedelta(days=1)
minute = tz_now() - datetime.timedelta(seconds=120)
now = tz_now()
@@ -448,76 +151,3 @@ class TestJobReaper(object):
assert job.started > ref_time
assert job.status == 'running'
assert job.job_explanation == ''
@pytest.mark.django_db
class TestScheduler:
def test_too_many_schedules_freak_out(self):
with pytest.raises(RuntimeError):
Scheduler({'job1': {'schedule': datetime.timedelta(seconds=1)}, 'job2': {'schedule': datetime.timedelta(seconds=1)}})
def test_spread_out(self):
scheduler = Scheduler(
{
'job1': {'schedule': datetime.timedelta(seconds=16)},
'job2': {'schedule': datetime.timedelta(seconds=16)},
'job3': {'schedule': datetime.timedelta(seconds=16)},
'job4': {'schedule': datetime.timedelta(seconds=16)},
}
)
assert [job.offset for job in scheduler.jobs] == [0, 4, 8, 12]
def test_missed_schedule(self, mocker):
scheduler = Scheduler({'job1': {'schedule': datetime.timedelta(seconds=10)}})
assert scheduler.jobs[0].missed_runs(time.time() - scheduler.global_start) == 0
mocker.patch('awx.main.dispatch.periodic.time.time', return_value=scheduler.global_start + 50)
scheduler.get_and_mark_pending()
assert scheduler.jobs[0].missed_runs(50) > 1
def test_advance_schedule(self, mocker):
scheduler = Scheduler(
{
'job1': {'schedule': datetime.timedelta(seconds=30)},
'joba': {'schedule': datetime.timedelta(seconds=20)},
'jobb': {'schedule': datetime.timedelta(seconds=20)},
}
)
for job in scheduler.jobs:
# HACK: the offsets automatically added make this a hard test to write... so remove offsets
job.offset = 0.0
mocker.patch('awx.main.dispatch.periodic.time.time', return_value=scheduler.global_start + 29)
to_run = scheduler.get_and_mark_pending()
assert set(job.name for job in to_run) == set(['joba', 'jobb'])
mocker.patch('awx.main.dispatch.periodic.time.time', return_value=scheduler.global_start + 39)
to_run = scheduler.get_and_mark_pending()
assert len(to_run) == 1
assert to_run[0].name == 'job1'
@staticmethod
def get_job(scheduler, name):
for job in scheduler.jobs:
if job.name == name:
return job
def test_scheduler_debug(self, mocker):
scheduler = Scheduler(
{
'joba': {'schedule': datetime.timedelta(seconds=20)},
'jobb': {'schedule': datetime.timedelta(seconds=50)},
'jobc': {'schedule': datetime.timedelta(seconds=500)},
'jobd': {'schedule': datetime.timedelta(seconds=20)},
}
)
rel_time = 119.9 # slightly under the 6th 20-second bin, to avoid offset problems
current_time = scheduler.global_start + rel_time
mocker.patch('awx.main.dispatch.periodic.time.time', return_value=current_time - 1.0e-8)
self.get_job(scheduler, 'jobb').mark_run(rel_time)
self.get_job(scheduler, 'jobd').mark_run(rel_time - 20.0)
output = scheduler.debug()
data = yaml.safe_load(output)
assert data['schedule_list']['jobc']['last_run_seconds_ago'] is None
assert data['schedule_list']['joba']['missed_runs'] == 4
assert data['schedule_list']['jobd']['missed_runs'] == 3
assert data['schedule_list']['jobd']['completed_runs'] == 1
assert data['schedule_list']['jobb']['next_run_in_seconds'] > 25.0

View File

@@ -10,7 +10,6 @@ LOCAL_SETTINGS = (
'NAMED_URL_GRAPH',
'DISPATCHER_MOCK_PUBLISH',
# Platform flags are managed by the platform flags system and have environment-specific defaults
'FEATURE_DISPATCHERD_ENABLED',
'FEATURE_INDIRECT_NODE_COUNTING_ENABLED',
)
@@ -87,12 +86,9 @@ def test_development_defaults_feature_flags(monkeypatch):
spec.loader.exec_module(development_defaults)
# Also import through the development settings to ensure both paths are tested
from awx.settings.development import FEATURE_INDIRECT_NODE_COUNTING_ENABLED, FEATURE_DISPATCHERD_ENABLED
from awx.settings.development import FEATURE_INDIRECT_NODE_COUNTING_ENABLED
# Verify the feature flags are set correctly in both the module and settings
assert hasattr(development_defaults, 'FEATURE_INDIRECT_NODE_COUNTING_ENABLED')
assert development_defaults.FEATURE_INDIRECT_NODE_COUNTING_ENABLED is True
assert hasattr(development_defaults, 'FEATURE_DISPATCHERD_ENABLED')
assert development_defaults.FEATURE_DISPATCHERD_ENABLED is True
assert FEATURE_INDIRECT_NODE_COUNTING_ENABLED is True
assert FEATURE_DISPATCHERD_ENABLED is True

View File

@@ -1149,7 +1149,6 @@ OPA_REQUEST_RETRIES = 2 # The number of retry attempts for connecting to the OP
# feature flags
FEATURE_INDIRECT_NODE_COUNTING_ENABLED = False
FEATURE_DISPATCHERD_ENABLED = False
# Dispatcher worker lifetime. If set to None, workers will never be retired
# based on age. Note workers will finish their last task before retiring if

View File

@@ -69,4 +69,3 @@ AWX_DISABLE_TASK_MANAGERS = False
# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!=================================
FEATURE_INDIRECT_NODE_COUNTING_ENABLED = True
FEATURE_DISPATCHERD_ENABLED = True

View File

@@ -145,14 +145,6 @@ This outputs running and queued task UUIDs handled by a specific dispatcher
['eb3b0a83-86da-413d-902a-16d7530a6b25', 'f447266a-23da-42b4-8025-fe379d2db96f']
```
Additionally, you can tell the local running dispatcher to recycle all of the
workers in its pool. It will wait for any running jobs to finish and exit when
work has completed, spinning up replacement workers.
```
awx-manage run_dispatcher --reload
```
* * *
In the following sections, we will go further into the details regarding AWX tasks. They are all decorated by `@task()` in [awx/awx/main/tasks.py](https://github.com/ansible/awx/blob/devel/awx/main/tasks.py)