diff --git a/Makefile b/Makefile index fb7f1509d4..ec03524e99 100644 --- a/Makefile +++ b/Makefile @@ -289,7 +289,7 @@ dispatcher: @if [ "$(VENV_BASE)" ]; then \ . $(VENV_BASE)/awx/bin/activate; \ fi; \ - $(PYTHON) manage.py run_dispatcher + $(PYTHON) manage.py dispatcherd ## Run to start the zeromq callback receiver receiver: diff --git a/awx/main/apps.py b/awx/main/apps.py index 16c76e6455..acb6c8ea93 100644 --- a/awx/main/apps.py +++ b/awx/main/apps.py @@ -82,7 +82,7 @@ class MainConfig(AppConfig): def configure_dispatcherd(self): """This implements the default configuration for dispatcherd - If running the tasking service like awx-manage run_dispatcher, + If running the tasking service like awx-manage dispatcherd, some additional config will be applied on top of this. This configuration provides the minimum such that code can submit tasks to pg_notify to run those tasks. diff --git a/awx/main/management/commands/dispatcherctl.py b/awx/main/management/commands/dispatcherctl.py new file mode 100644 index 0000000000..b7283c9b2a --- /dev/null +++ b/awx/main/management/commands/dispatcherctl.py @@ -0,0 +1,88 @@ +import argparse +import inspect +import logging +import os +import sys + +import yaml + +from django.core.management.base import BaseCommand, CommandError +from django.db import connection + +from dispatcherd.cli import ( + CONTROL_ARG_SCHEMAS, + DEFAULT_CONFIG_FILE, + _base_cli_parent, + _control_common_parent, + _register_control_arguments, + _build_command_data_from_args, +) +from dispatcherd.config import setup as dispatcher_setup +from dispatcherd.factories import get_control_from_settings +from dispatcherd.service import control_tasks + +from awx.main.dispatch.config import get_dispatcherd_config +from awx.main.management.commands.dispatcherd import ensure_no_dispatcherd_env_config + +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = 'Dispatcher control operations' + + def add_arguments(self, parser): + parser.description = 
'Run dispatcherd control commands using awx-manage.' + base_parent = _base_cli_parent() + control_parent = _control_common_parent() + parser._add_container_actions(base_parent) + parser._add_container_actions(control_parent) + + subparsers = parser.add_subparsers(dest='command', metavar='command') + subparsers.required = True + shared_parents = [base_parent, control_parent] + for command in control_tasks.__all__: + func = getattr(control_tasks, command, None) + doc = inspect.getdoc(func) or '' + summary = doc.splitlines()[0] if doc else None + command_parser = subparsers.add_parser( + command, + help=summary, + description=doc, + parents=shared_parents, + ) + _register_control_arguments(command_parser, CONTROL_ARG_SCHEMAS.get(command)) + + def handle(self, *args, **options): + command = options.pop('command', None) + if not command: + raise CommandError('No dispatcher control command specified') + + for django_opt in ('verbosity', 'traceback', 'no_color', 'force_color', 'skip_checks'): + options.pop(django_opt, None) + + log_level = options.pop('log_level', 'DEBUG') + config_path = os.path.abspath(options.pop('config', DEFAULT_CONFIG_FILE)) + expected_replies = options.pop('expected_replies', 1) + + logging.basicConfig(level=getattr(logging, log_level), stream=sys.stdout) + logger.debug(f"Configured standard out logging at {log_level} level") + + default_config = os.path.abspath(DEFAULT_CONFIG_FILE) + ensure_no_dispatcherd_env_config() + if config_path != default_config: + raise CommandError('The config path CLI option is not allowed for the awx-manage command') + if connection.vendor == 'sqlite': + raise CommandError('dispatcherctl is not supported with sqlite3; use a PostgreSQL database') + else: + logger.info('Using config generated from awx.main.dispatch.config.get_dispatcherd_config') + dispatcher_setup(get_dispatcherd_config()) + + schema_namespace = argparse.Namespace(**options) + data = _build_command_data_from_args(schema_namespace, command) + + ctl = 
get_control_from_settings() + returned = ctl.control_with_reply(command, data=data, expected_replies=expected_replies) + self.stdout.write(yaml.dump(returned, default_flow_style=False)) + if len(returned) < expected_replies: + logger.error(f'Obtained only {len(returned)} of {expected_replies}, exiting with non-zero code') + raise CommandError('dispatcherctl returned fewer replies than expected') diff --git a/awx/main/management/commands/dispatcherd.py b/awx/main/management/commands/dispatcherd.py new file mode 100644 index 0000000000..69a7625571 --- /dev/null +++ b/awx/main/management/commands/dispatcherd.py @@ -0,0 +1,85 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved +import copy +import hashlib +import json +import logging +import logging.config +import os + +from django.conf import settings +from django.core.cache import cache as django_cache +from django.core.management.base import BaseCommand, CommandError +from django.db import connection + +from dispatcherd.config import setup as dispatcher_setup + +from awx.main.dispatch.config import get_dispatcherd_config + +logger = logging.getLogger('awx.main.dispatch') + + +from dispatcherd import run_service + + +def _json_default(value): + if isinstance(value, set): + return sorted(value) + if isinstance(value, tuple): + return list(value) + return str(value) + + +def _hash_config(config): + serialized = json.dumps(config, sort_keys=True, separators=(',', ':'), default=_json_default) + return hashlib.sha256(serialized.encode('utf-8')).hexdigest() + + +def ensure_no_dispatcherd_env_config(): + if os.getenv('DISPATCHERD_CONFIG_FILE'): + raise CommandError('DISPATCHERD_CONFIG_FILE is set but awx-manage dispatcherd uses dynamic config from code') + + +class Command(BaseCommand): + help = ( + 'Run the background task service, this is the supported entrypoint since the introduction of dispatcherd as a library. 
' + 'This replaces the prior awx-manage run_dispatcher service, and control actions are at awx-manage dispatcherctl.' + ) + + def add_arguments(self, parser): + return + + def handle(self, *arg, **options): + ensure_no_dispatcherd_env_config() + + self.configure_dispatcher_logging() + config = get_dispatcherd_config(for_service=True) + config_hash = _hash_config(config) + logger.info( + 'Using dispatcherd config generated from awx.main.dispatch.config.get_dispatcherd_config (sha256=%s)', + config_hash, + ) + + # Close the connection, because the pg_notify broker will create new async connection + connection.close() + django_cache.close() + dispatcher_setup(config) + + run_service() + + def configure_dispatcher_logging(self): + # Apply special log rule for the parent process + special_logging = copy.deepcopy(settings.LOGGING) + changed_handlers = [] + for handler_name, handler_config in special_logging.get('handlers', {}).items(): + filters = handler_config.get('filters', []) + if 'dynamic_level_filter' in filters: + handler_config['filters'] = [flt for flt in filters if flt != 'dynamic_level_filter'] + changed_handlers.append(handler_name) + logger.info(f'Dispatcherd main process replaced log level filter for handlers: {changed_handlers}') + + # Apply the custom logging level here, before the asyncio code starts + special_logging.setdefault('loggers', {}).setdefault('dispatcherd', {}) + special_logging['loggers']['dispatcherd']['level'] = settings.LOG_AGGREGATOR_LEVEL + + logging.config.dictConfig(special_logging) diff --git a/awx/main/management/commands/run_dispatcher.py b/awx/main/management/commands/run_dispatcher.py index a5e85abe33..1acd30c7bb 100644 --- a/awx/main/management/commands/run_dispatcher.py +++ b/awx/main/management/commands/run_dispatcher.py @@ -1,26 +1,20 @@ # Copyright (c) 2015 Ansible, Inc. # All Rights Reserved. 
import logging -import logging.config -import yaml -import copy -from django.conf import settings -from django.core.management.base import BaseCommand, CommandError -from django.core.cache import cache as django_cache -from django.db import connection +import yaml + +from django.core.management.base import CommandError from dispatcherd.factories import get_control_from_settings -from dispatcherd import run_service -from dispatcherd.config import setup as dispatcher_setup -from awx.main.dispatch.config import get_dispatcherd_config +from awx.main.management.commands.dispatcherd import Command as DispatcherdCommand logger = logging.getLogger('awx.main.dispatch') -class Command(BaseCommand): - help = 'Launch the task dispatcher' +class Command(DispatcherdCommand): + help = 'Launch the task dispatcher (deprecated; use awx-manage dispatcherd)' def add_arguments(self, parser): parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running dispatchers') @@ -34,8 +28,10 @@ class Command(BaseCommand): 'Only running tasks can be canceled, queued tasks must be started before they can be canceled.' 
), ) + super().add_arguments(parser) - def handle(self, *arg, **options): + def handle(self, *args, **options): + logger.warning('awx-manage run_dispatcher is deprecated; use awx-manage dispatcherd') if options.get('status'): ctl = get_control_from_settings() running_data = ctl.control_with_reply('status') @@ -65,27 +61,4 @@ class Command(BaseCommand): results.append(result) print(yaml.dump(results, default_flow_style=False)) return - - self.configure_dispatcher_logging() - # Close the connection, because the pg_notify broker will create new async connection - connection.close() - django_cache.close() - dispatcher_setup(get_dispatcherd_config(for_service=True)) - run_service() - - def configure_dispatcher_logging(self): - # Apply special log rule for the parent process - special_logging = copy.deepcopy(settings.LOGGING) - changed_handlers = [] - for handler_name, handler_config in special_logging.get('handlers', {}).items(): - filters = handler_config.get('filters', []) - if 'dynamic_level_filter' in filters: - handler_config['filters'] = [flt for flt in filters if flt != 'dynamic_level_filter'] - changed_handlers.append(handler_name) - logger.info(f'Dispatcherd main process replaced log level filter for handlers: {changed_handlers}') - - # Apply the custom logging level here, before the asyncio code starts - special_logging.setdefault('loggers', {}).setdefault('dispatcherd', {}) - special_logging['loggers']['dispatcherd']['level'] = settings.LOG_AGGREGATOR_LEVEL - - logging.config.dictConfig(special_logging) + return super().handle(*args, **options) diff --git a/awx/main/tests/functional/management/test_dispatcherd.py b/awx/main/tests/functional/management/test_dispatcherd.py new file mode 100644 index 0000000000..92f1d208d9 --- /dev/null +++ b/awx/main/tests/functional/management/test_dispatcherd.py @@ -0,0 +1,17 @@ +import pytest + +from awx.main.dispatch.config import get_dispatcherd_config +from awx.main.management.commands.dispatcherd import _hash_config + + 
+@pytest.mark.django_db +def test_dispatcherd_config_hash_is_stable(settings, monkeypatch): + monkeypatch.setenv('AWX_COMPONENT', 'dispatcher') + settings.CLUSTER_HOST_ID = 'test-node' + settings.JOB_EVENT_WORKERS = 1 + settings.DISPATCHER_SCHEDULE = {} + + config_one = get_dispatcherd_config(for_service=True) + config_two = get_dispatcherd_config(for_service=True) + + assert _hash_config(config_one) == _hash_config(config_two) diff --git a/awx/main/tests/functional/tasks/test_tasks_system.py b/awx/main/tests/functional/tasks/test_tasks_system.py index 73f751ee51..96ce7aa413 100644 --- a/awx/main/tests/functional/tasks/test_tasks_system.py +++ b/awx/main/tests/functional/tasks/test_tasks_system.py @@ -9,7 +9,7 @@ from unittest import mock import pytest from awx.main.tasks.system import CleanupImagesAndFiles, execution_node_health_check, inspect_established_receptor_connections, clear_setting_cache -from awx.main.management.commands.run_dispatcher import Command +from awx.main.management.commands.dispatcherd import Command from awx.main.models import Instance, Job, ReceptorAddress, InstanceLink diff --git a/awx/main/tests/unit/commands/test_dispatcherctl.py b/awx/main/tests/unit/commands/test_dispatcherctl.py new file mode 100644 index 0000000000..50804577c3 --- /dev/null +++ b/awx/main/tests/unit/commands/test_dispatcherctl.py @@ -0,0 +1,92 @@ +import io + +import pytest + +from django.core.management.base import CommandError + +from awx.main.management.commands import dispatcherctl + + +@pytest.fixture(autouse=True) +def clear_dispatcher_env(monkeypatch, mocker): + monkeypatch.delenv('DISPATCHERD_CONFIG_FILE', raising=False) + mocker.patch.object(dispatcherctl.logging, 'basicConfig') + mocker.patch.object(dispatcherctl, 'connection', mocker.Mock(vendor='postgresql')) + + +def test_dispatcherctl_runs_control_with_generated_config(mocker): + command = dispatcherctl.Command() + command.stdout = io.StringIO() + + data = {'foo': 'bar'} + 
mocker.patch.object(dispatcherctl, '_build_command_data_from_args', return_value=data) + dispatcher_setup = mocker.patch.object(dispatcherctl, 'dispatcher_setup') + config_data = {'setting': 'value'} + mocker.patch.object(dispatcherctl, 'get_dispatcherd_config', return_value=config_data) + + control = mocker.Mock() + control.control_with_reply.return_value = [{'status': 'ok'}] + mocker.patch.object(dispatcherctl, 'get_control_from_settings', return_value=control) + mocker.patch.object(dispatcherctl.yaml, 'dump', return_value='payload\n') + + command.handle( + command='running', + config=dispatcherctl.DEFAULT_CONFIG_FILE, + expected_replies=1, + log_level='INFO', + ) + + dispatcher_setup.assert_called_once_with(config_data) + control.control_with_reply.assert_called_once_with('running', data=data, expected_replies=1) + assert command.stdout.getvalue() == 'payload\n' + + +def test_dispatcherctl_rejects_custom_config_path(): + command = dispatcherctl.Command() + command.stdout = io.StringIO() + + with pytest.raises(CommandError): + command.handle( + command='running', + config='/tmp/dispatcher.yml', + expected_replies=1, + log_level='INFO', + ) + + +def test_dispatcherctl_rejects_sqlite_db(mocker): + command = dispatcherctl.Command() + command.stdout = io.StringIO() + + mocker.patch.object(dispatcherctl, 'connection', mocker.Mock(vendor='sqlite')) + + with pytest.raises(CommandError, match='sqlite3'): + command.handle( + command='running', + config=dispatcherctl.DEFAULT_CONFIG_FILE, + expected_replies=1, + log_level='INFO', + ) + + +def test_dispatcherctl_raises_when_replies_missing(mocker): + command = dispatcherctl.Command() + command.stdout = io.StringIO() + + mocker.patch.object(dispatcherctl, '_build_command_data_from_args', return_value={}) + mocker.patch.object(dispatcherctl, 'dispatcher_setup') + mocker.patch.object(dispatcherctl, 'get_dispatcherd_config', return_value={}) + control = mocker.Mock() + control.control_with_reply.return_value = [{'status': 'ok'}] 
+ mocker.patch.object(dispatcherctl, 'get_control_from_settings', return_value=control) + mocker.patch.object(dispatcherctl.yaml, 'dump', return_value='- status: ok\n') + + with pytest.raises(CommandError): + command.handle( + command='running', + config=dispatcherctl.DEFAULT_CONFIG_FILE, + expected_replies=2, + log_level='INFO', + ) + + control.control_with_reply.assert_called_once_with('running', data={}, expected_replies=2) diff --git a/docs/tasks.md b/docs/tasks.md index 6c3b7e3c71..5796bed730 100644 --- a/docs/tasks.md +++ b/docs/tasks.md @@ -110,7 +110,7 @@ associated Python code: Dispatcher Implementation ------------------------- -Every node in an AWX install runs `awx-manage run_dispatcher`, a Python process +Every node in an AWX install runs `awx-manage dispatcherd`, a Python process that uses the `kombu` library to consume messages from the appropriate queues for that node (the default shared queue, a queue specific to the node's hostname, and the broadcast queue). The Dispatcher process manages a pool of @@ -121,11 +121,11 @@ the associated Python code. Debugging --------- -`awx-manage run_dispatcher` includes a few flags that allow interaction and +`awx-manage dispatcherctl` provides subcommands that allow interaction and debugging: ``` -[root@awx /]# awx-manage run_dispatcher --status +[root@awx /]# awx-manage dispatcherctl status 2018-09-14 18:39:22,223 WARNING awx.main.dispatch checking dispatcher status for awx awx[pid:9610] workers total=4 min=4 max=60 . 
worker[pid:9758] sent=12 finished=12 qsize=0 rss=106.730MB [IDLE] @@ -139,7 +139,7 @@ This outputs running and queued task UUIDs handled by a specific dispatcher (which corresponds to `main_unifiedjob.celery_task_id` in the database): ``` -[root@awx /]# awx-manage run_dispatcher --running +[root@awx /]# awx-manage dispatcherctl running 2018-09-14 18:39:22,223 WARNING awx.main.dispatch checking dispatcher running for awx ['eb3b0a83-86da-413d-902a-16d7530a6b25', 'f447266a-23da-42b4-8025-fe379d2db96f'] ``` diff --git a/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 b/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 index b102a50977..2f75a26124 100644 --- a/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 +++ b/tools/ansible/roles/dockerfile/templates/supervisor_task.conf.j2 @@ -10,7 +10,7 @@ pidfile = /var/run/supervisor/supervisor.task.pid command = make dispatcher directory = /awx_devel {% else %} -command = awx-manage run_dispatcher +command = awx-manage dispatcherd directory = /var/lib/awx {% endif %} autorestart = true diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 9541b54531..b670af40ae 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -4,7 +4,7 @@ minfds = 4096 nodaemon=true [program:awx-dispatcher] -command = awx-manage run_dispatcher +command = awx-manage dispatcherd autorestart = true stopasgroup=true killasgroup=true diff --git a/tools/grafana/dashboards/services_dashboard.json b/tools/grafana/dashboards/services_dashboard.json index bcf9140724..51694cee12 100644 --- a/tools/grafana/dashboards/services_dashboard.json +++ b/tools/grafana/dashboards/services_dashboard.json @@ -102,7 +102,7 @@ "-b", "provision_instance", "run_callback_receiver", - "run_dispatcher", + "dispatcherd", "run_rsyslog_configurer", "run_ws_heartbeat", "run_wsrelay", @@ -112,7 +112,7 @@ "-b", "provision_instance", "run_callback_receiver", - 
"run_dispatcher", + "dispatcherd", "run_rsyslog_configurer", "run_ws_heartbeat", "run_wsrelay", diff --git a/tools/sosreport/controller.py b/tools/sosreport/controller.py index 318e2cc7ef..f369b71fea 100644 --- a/tools/sosreport/controller.py +++ b/tools/sosreport/controller.py @@ -9,7 +9,7 @@ except ImportError: SOSREPORT_CONTROLLER_COMMANDS = [ "awx-manage --version", # controller version "awx-manage list_instances", # controller cluster configuration - "awx-manage run_dispatcher --status", # controller dispatch worker status + "awx-manage dispatcherctl status", # controller dispatch comprehensive status "awx-manage run_callback_receiver --status", # controller callback worker status "awx-manage check_license --data", # controller license status "awx-manage run_wsrelay --status", # controller websocket relay status