AAP-60470 Add dispatcherctl and dispatcherd commands as updated interface to dispatcherd lib (#16206)

* Add dispatcherctl command

* Add tests for dispatcherctl command

* Exit early if sqlite3

* Switch to dispatcherd mgmt cmd

* Move unwanted command options to run_dispatcher

* Add test for new stuff

* Update the SOS report status command

* make docs always reference new command

* Consistently error if given config file
This commit is contained in:
Alan Rominger
2026-01-27 15:57:23 -05:00
committed by GitHub
parent 1128ad5a57
commit 271383d018
13 changed files with 304 additions and 49 deletions

View File

@@ -289,7 +289,7 @@ dispatcher:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(PYTHON) manage.py run_dispatcher
$(PYTHON) manage.py dispatcherd
## Run to start the zeromq callback receiver
receiver:

View File

@@ -82,7 +82,7 @@ class MainConfig(AppConfig):
def configure_dispatcherd(self):
"""This implements the default configuration for dispatcherd
If running the tasking service like awx-manage run_dispatcher,
If running the tasking service like awx-manage dispatcherd,
some additional config will be applied on top of this.
This configuration provides the minimum such that code can submit
tasks to pg_notify to run those tasks.

View File

@@ -0,0 +1,88 @@
import argparse
import inspect
import logging
import os
import sys
import yaml
from django.core.management.base import BaseCommand, CommandError
from django.db import connection
from dispatcherd.cli import (
CONTROL_ARG_SCHEMAS,
DEFAULT_CONFIG_FILE,
_base_cli_parent,
_control_common_parent,
_register_control_arguments,
_build_command_data_from_args,
)
from dispatcherd.config import setup as dispatcher_setup
from dispatcherd.factories import get_control_from_settings
from dispatcherd.service import control_tasks
from awx.main.dispatch.config import get_dispatcherd_config
from awx.main.management.commands.dispatcherd import ensure_no_dispatcherd_env_config
logger = logging.getLogger(__name__)
class Command(BaseCommand):
    """Management command that sends dispatcherd control requests and prints the replies as YAML."""

    help = 'Dispatcher control operations'

    def add_arguments(self, parser):
        """Register the shared dispatcherd CLI options and one sub-command per control task.

        The sub-command names come from ``control_tasks.__all__``; each task's
        docstring supplies the sub-command description, and its first line the
        short help text.
        """
        parser.description = 'Run dispatcherd control commands using awx-manage.'

        # The same parent parsers are applied to the top-level parser and to
        # every sub-command parser.
        parent_parsers = [_base_cli_parent(), _control_common_parent()]
        for parent in parent_parsers:
            parser._add_container_actions(parent)

        subcommands = parser.add_subparsers(dest='command', metavar='command')
        subcommands.required = True

        for task_name in control_tasks.__all__:
            docstring = inspect.getdoc(getattr(control_tasks, task_name, None)) or ''
            first_line = docstring.splitlines()[0] if docstring else None
            task_parser = subcommands.add_parser(
                task_name,
                help=first_line,
                description=docstring,
                parents=parent_parsers,
            )
            _register_control_arguments(task_parser, CONTROL_ARG_SCHEMAS.get(task_name))

    def handle(self, *args, **options):
        """Run one control command against dispatcherd and write the replies to stdout.

        Raises CommandError when no command is given, when a non-default config
        path is passed, when the database vendor is sqlite, or when fewer
        replies than ``expected_replies`` come back. The DISPATCHERD_CONFIG_FILE
        environment check is delegated to ensure_no_dispatcherd_env_config().
        """
        command = options.pop('command', None)
        if not command:
            raise CommandError('No dispatcher control command specified')

        # Drop the standard Django options so that only the remaining entries
        # are handed to the dispatcherd argument schema below.
        django_option_names = ('verbosity', 'traceback', 'no_color', 'force_color', 'skip_checks')
        for option_name in django_option_names:
            options.pop(option_name, None)

        log_level = options.pop('log_level', 'DEBUG')
        requested_config = os.path.abspath(options.pop('config', DEFAULT_CONFIG_FILE))
        expected_replies = options.pop('expected_replies', 1)

        logging.basicConfig(level=getattr(logging, log_level), stream=sys.stdout)
        logger.debug(f"Configured standard out logging at {log_level} level")

        builtin_config = os.path.abspath(DEFAULT_CONFIG_FILE)
        ensure_no_dispatcherd_env_config()
        if requested_config != builtin_config:
            raise CommandError('The config path CLI option is not allowed for the awx-manage command')
        if connection.vendor == 'sqlite':
            raise CommandError('dispatcherctl is not supported with sqlite3; use a PostgreSQL database')

        logger.info('Using config generated from awx.main.dispatch.config.get_dispatcherd_config')
        dispatcher_setup(get_dispatcherd_config())

        # Whatever options remain are interpreted as arguments of the control command.
        command_data = _build_command_data_from_args(argparse.Namespace(**options), command)

        control = get_control_from_settings()
        replies = control.control_with_reply(command, data=command_data, expected_replies=expected_replies)
        self.stdout.write(yaml.dump(replies, default_flow_style=False))

        reply_count = len(replies)
        if reply_count < expected_replies:
            logger.error(f'Obtained only {reply_count} of {expected_replies}, exiting with non-zero code')
            raise CommandError('dispatcherctl returned fewer replies than expected')

View File

@@ -0,0 +1,85 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
import copy
import hashlib
import json
import logging
import logging.config
import os
from django.conf import settings
from django.core.cache import cache as django_cache
from django.core.management.base import BaseCommand, CommandError
from django.db import connection
from dispatcherd.config import setup as dispatcher_setup
from awx.main.dispatch.config import get_dispatcherd_config
logger = logging.getLogger('awx.main.dispatch')
from dispatcherd import run_service
def _json_default(value):
if isinstance(value, set):
return sorted(value)
if isinstance(value, tuple):
return list(value)
return str(value)
def _hash_config(config):
    """Return the SHA-256 hex digest of ``config`` serialized as compact, key-sorted JSON.

    Values that ``json`` cannot serialize natively are converted by
    ``_json_default``.
    """
    canonical_json = json.dumps(config, sort_keys=True, separators=(',', ':'), default=_json_default)
    digest = hashlib.sha256()
    digest.update(canonical_json.encode('utf-8'))
    return digest.hexdigest()
def ensure_no_dispatcherd_env_config():
    """Raise CommandError if the DISPATCHERD_CONFIG_FILE environment variable is set and non-empty."""
    env_config_file = os.environ.get('DISPATCHERD_CONFIG_FILE')
    if not env_config_file:
        return
    raise CommandError('DISPATCHERD_CONFIG_FILE is set but awx-manage dispatcherd uses dynamic config from code')
class Command(BaseCommand):
    """Management command that starts the dispatcherd background task service."""

    help = (
        'Run the background task service, this is the supported entrypoint since the introduction of dispatcherd as a library. '
        'This replaces the prior awx-manage run_dispatcher service, and control actions are at awx-manage dispatcherctl.'
    )

    def add_arguments(self, parser):
        """Add no options of its own."""
        return

    def handle(self, *arg, **options):
        """Configure logging, build the service config, log its hash, and run the service."""
        ensure_no_dispatcherd_env_config()
        self.configure_dispatcher_logging()

        service_config = get_dispatcherd_config(for_service=True)
        logger.info(
            'Using dispatcherd config generated from awx.main.dispatch.config.get_dispatcherd_config (sha256=%s)',
            _hash_config(service_config),
        )

        # Close the connection, because the pg_notify broker will create new async connection
        connection.close()
        django_cache.close()

        dispatcher_setup(service_config)
        run_service()

    def configure_dispatcher_logging(self):
        """Apply a modified copy of settings.LOGGING for this process.

        'dynamic_level_filter' is removed from every handler that lists it, and
        the 'dispatcherd' logger level is set to settings.LOG_AGGREGATOR_LEVEL.
        settings.LOGGING itself is left untouched because a deep copy is edited.
        """
        # Apply special log rule for the parent process
        logging_config = copy.deepcopy(settings.LOGGING)

        stripped_handlers = []
        for name, handler in logging_config.get('handlers', {}).items():
            current_filters = handler.get('filters', [])
            if 'dynamic_level_filter' not in current_filters:
                continue
            handler['filters'] = [entry for entry in current_filters if entry != 'dynamic_level_filter']
            stripped_handlers.append(name)
        logger.info(f'Dispatcherd main process replaced log level filter for handlers: {stripped_handlers}')

        # Apply the custom logging level here, before the asyncio code starts
        logger_configs = logging_config.setdefault('loggers', {})
        logger_configs.setdefault('dispatcherd', {})
        logger_configs['dispatcherd']['level'] = settings.LOG_AGGREGATOR_LEVEL

        logging.config.dictConfig(logging_config)

View File

@@ -1,26 +1,20 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import logging
import logging.config
import yaml
import copy
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.cache import cache as django_cache
from django.db import connection
import yaml
from django.core.management.base import CommandError
from dispatcherd.factories import get_control_from_settings
from dispatcherd import run_service
from dispatcherd.config import setup as dispatcher_setup
from awx.main.dispatch.config import get_dispatcherd_config
from awx.main.management.commands.dispatcherd import Command as DispatcherdCommand
logger = logging.getLogger('awx.main.dispatch')
class Command(BaseCommand):
help = 'Launch the task dispatcher'
class Command(DispatcherdCommand):
help = 'Launch the task dispatcher (deprecated; use awx-manage dispatcherd)'
def add_arguments(self, parser):
parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running dispatchers')
@@ -34,8 +28,10 @@ class Command(BaseCommand):
'Only running tasks can be canceled, queued tasks must be started before they can be canceled.'
),
)
super().add_arguments(parser)
def handle(self, *arg, **options):
def handle(self, *args, **options):
logger.warning('awx-manage run_dispatcher is deprecated; use awx-manage dispatcherd')
if options.get('status'):
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('status')
@@ -65,27 +61,4 @@ class Command(BaseCommand):
results.append(result)
print(yaml.dump(results, default_flow_style=False))
return
self.configure_dispatcher_logging()
# Close the connection, because the pg_notify broker will create new async connection
connection.close()
django_cache.close()
dispatcher_setup(get_dispatcherd_config(for_service=True))
run_service()
def configure_dispatcher_logging(self):
# Apply special log rule for the parent process
special_logging = copy.deepcopy(settings.LOGGING)
changed_handlers = []
for handler_name, handler_config in special_logging.get('handlers', {}).items():
filters = handler_config.get('filters', [])
if 'dynamic_level_filter' in filters:
handler_config['filters'] = [flt for flt in filters if flt != 'dynamic_level_filter']
changed_handlers.append(handler_name)
logger.info(f'Dispatcherd main process replaced log level filter for handlers: {changed_handlers}')
# Apply the custom logging level here, before the asyncio code starts
special_logging.setdefault('loggers', {}).setdefault('dispatcherd', {})
special_logging['loggers']['dispatcherd']['level'] = settings.LOG_AGGREGATOR_LEVEL
logging.config.dictConfig(special_logging)
return super().handle(*args, **options)

View File

@@ -0,0 +1,17 @@
import pytest
from awx.main.dispatch.config import get_dispatcherd_config
from awx.main.management.commands.dispatcherd import _hash_config
@pytest.mark.django_db
def test_dispatcherd_config_hash_is_stable(settings, monkeypatch):
    """Two configs generated under identical settings must hash to the same value."""
    monkeypatch.setenv('AWX_COMPONENT', 'dispatcher')
    settings.CLUSTER_HOST_ID = 'test-node'
    settings.JOB_EVENT_WORKERS = 1
    settings.DISPATCHER_SCHEDULE = {}

    # Build both configs before hashing either of them.
    first_config, second_config = [get_dispatcherd_config(for_service=True) for _ in range(2)]
    first_hash = _hash_config(first_config)
    second_hash = _hash_config(second_config)

    assert first_hash == second_hash

View File

@@ -9,7 +9,7 @@ from unittest import mock
import pytest
from awx.main.tasks.system import CleanupImagesAndFiles, execution_node_health_check, inspect_established_receptor_connections, clear_setting_cache
from awx.main.management.commands.run_dispatcher import Command
from awx.main.management.commands.dispatcherd import Command
from awx.main.models import Instance, Job, ReceptorAddress, InstanceLink

View File

@@ -0,0 +1,92 @@
import io
import pytest
from django.core.management.base import CommandError
from awx.main.management.commands import dispatcherctl
@pytest.fixture(autouse=True)
def clear_dispatcher_env(monkeypatch, mocker):
    """Give every test in this module the same baseline.

    DISPATCHERD_CONFIG_FILE is unset, logging.basicConfig is patched out, and
    the module's database connection is replaced by a mock reporting the
    'postgresql' vendor.
    """
    monkeypatch.delenv('DISPATCHERD_CONFIG_FILE', raising=False)
    mocker.patch.object(dispatcherctl.logging, 'basicConfig')
    postgres_connection = mocker.Mock(vendor='postgresql')
    mocker.patch.object(dispatcherctl, 'connection', postgres_connection)
def test_dispatcherctl_runs_control_with_generated_config(mocker):
    """handle() sets up dispatcherd with the generated config and writes the dumped reply."""
    command_data = {'foo': 'bar'}
    generated_config = {'setting': 'value'}

    mocker.patch.object(dispatcherctl, '_build_command_data_from_args', return_value=command_data)
    setup_mock = mocker.patch.object(dispatcherctl, 'dispatcher_setup')
    mocker.patch.object(dispatcherctl, 'get_dispatcherd_config', return_value=generated_config)

    fake_control = mocker.Mock()
    fake_control.control_with_reply.return_value = [{'status': 'ok'}]
    mocker.patch.object(dispatcherctl, 'get_control_from_settings', return_value=fake_control)
    mocker.patch.object(dispatcherctl.yaml, 'dump', return_value='payload\n')

    cmd = dispatcherctl.Command()
    cmd.stdout = io.StringIO()
    handle_options = {
        'command': 'running',
        'config': dispatcherctl.DEFAULT_CONFIG_FILE,
        'expected_replies': 1,
        'log_level': 'INFO',
    }
    cmd.handle(**handle_options)

    setup_mock.assert_called_once_with(generated_config)
    fake_control.control_with_reply.assert_called_once_with('running', data=command_data, expected_replies=1)
    assert cmd.stdout.getvalue() == 'payload\n'
def test_dispatcherctl_rejects_custom_config_path():
    """A config path other than the dispatcherd default is rejected.

    The ``match`` pins the failure to the config-path check, so the test
    cannot pass because some other guard in handle() raised CommandError.
    """
    command = dispatcherctl.Command()
    command.stdout = io.StringIO()

    with pytest.raises(CommandError, match='config path CLI option is not allowed'):
        command.handle(
            command='running',
            config='/tmp/dispatcher.yml',
            expected_replies=1,
            log_level='INFO',
        )
def test_dispatcherctl_rejects_sqlite_db(mocker):
    """handle() raises a CommandError mentioning sqlite3 when the DB vendor is sqlite."""
    sqlite_connection = mocker.Mock(vendor='sqlite')
    mocker.patch.object(dispatcherctl, 'connection', sqlite_connection)

    cmd = dispatcherctl.Command()
    cmd.stdout = io.StringIO()
    handle_options = {
        'command': 'running',
        'config': dispatcherctl.DEFAULT_CONFIG_FILE,
        'expected_replies': 1,
        'log_level': 'INFO',
    }

    with pytest.raises(CommandError, match='sqlite3'):
        cmd.handle(**handle_options)
def test_dispatcherctl_raises_when_replies_missing(mocker):
    """handle() fails when fewer replies arrive than ``expected_replies``.

    The ``match`` pins the failure to the reply-count check rather than to
    any earlier CommandError raised by handle().
    """
    command = dispatcherctl.Command()
    command.stdout = io.StringIO()

    mocker.patch.object(dispatcherctl, '_build_command_data_from_args', return_value={})
    mocker.patch.object(dispatcherctl, 'dispatcher_setup')
    mocker.patch.object(dispatcherctl, 'get_dispatcherd_config', return_value={})
    control = mocker.Mock()
    # Only one reply is returned while two are requested below.
    control.control_with_reply.return_value = [{'status': 'ok'}]
    mocker.patch.object(dispatcherctl, 'get_control_from_settings', return_value=control)
    mocker.patch.object(dispatcherctl.yaml, 'dump', return_value='- status: ok\n')

    with pytest.raises(CommandError, match='fewer replies than expected'):
        command.handle(
            command='running',
            config=dispatcherctl.DEFAULT_CONFIG_FILE,
            expected_replies=2,
            log_level='INFO',
        )

    control.control_with_reply.assert_called_once_with('running', data={}, expected_replies=2)

View File

@@ -110,7 +110,7 @@ associated Python code:
Dispatcher Implementation
-------------------------
Every node in an AWX install runs `awx-manage run_dispatcher`, a Python process
Every node in an AWX install runs `awx-manage dispatcherd`, a Python process
that uses the `kombu` library to consume messages from the appropriate queues
for that node (the default shared queue, a queue specific to the node's
hostname, and the broadcast queue). The Dispatcher process manages a pool of
@@ -121,11 +121,11 @@ the associated Python code.
Debugging
---------
`awx-manage run_dispatcher` includes a few flags that allow interaction and
`awx-manage dispatcherctl` provides a few subcommands that allow interaction and
debugging:
```
[root@awx /]# awx-manage run_dispatcher --status
[root@awx /]# awx-manage dispatcherctl status
2018-09-14 18:39:22,223 WARNING awx.main.dispatch checking dispatcher status for awx
awx[pid:9610] workers total=4 min=4 max=60
. worker[pid:9758] sent=12 finished=12 qsize=0 rss=106.730MB [IDLE]
@@ -139,7 +139,7 @@ This outputs running and queued task UUIDs handled by a specific dispatcher
(which corresponds to `main_unifiedjob.celery_task_id` in the database):
```
[root@awx /]# awx-manage run_dispatcher --running
[root@awx /]# awx-manage dispatcherctl running
2018-09-14 18:39:22,223 WARNING awx.main.dispatch checking dispatcher running for awx
['eb3b0a83-86da-413d-902a-16d7530a6b25', 'f447266a-23da-42b4-8025-fe379d2db96f']
```

View File

@@ -10,7 +10,7 @@ pidfile = /var/run/supervisor/supervisor.task.pid
command = make dispatcher
directory = /awx_devel
{% else %}
command = awx-manage run_dispatcher
command = awx-manage dispatcherd
directory = /var/lib/awx
{% endif %}
autorestart = true

View File

@@ -4,7 +4,7 @@ minfds = 4096
nodaemon=true
[program:awx-dispatcher]
command = awx-manage run_dispatcher
command = awx-manage dispatcherd
autorestart = true
stopasgroup=true
killasgroup=true

View File

@@ -102,7 +102,7 @@
"-b",
"provision_instance",
"run_callback_receiver",
"run_dispatcher",
"dispatcherd",
"run_rsyslog_configurer",
"run_ws_heartbeat",
"run_wsrelay",
@@ -112,7 +112,7 @@
"-b",
"provision_instance",
"run_callback_receiver",
"run_dispatcher",
"dispatcherd",
"run_rsyslog_configurer",
"run_ws_heartbeat",
"run_wsrelay",

View File

@@ -9,7 +9,7 @@ except ImportError:
SOSREPORT_CONTROLLER_COMMANDS = [
"awx-manage --version", # controller version
"awx-manage list_instances", # controller cluster configuration
"awx-manage run_dispatcher --status", # controller dispatch worker status
"awx-manage dispatcherctl status", # controller dispatch comprehensive status
"awx-manage run_callback_receiver --status", # controller callback worker status
"awx-manage check_license --data", # controller license status
"awx-manage run_wsrelay --status", # controller websocket relay status