Files
awx/awx/main/constants.py
Alan Rominger 94764a1f17 AAP-42649 Flag-gated use of "dispatcherd" as its own library (#15981)
Use dynamic AWX max_workers value

Make basic --status and --running commands work

Make feature flag enabled true by default for development

* [dispatcherd] Dispatcher socket-based `--status` demo working (#15908)

* Fix Task Decorator to Work With and Without Feature Flag (AAP-41775) (#15911)

* refactor(system): extract common heartbeat helpers and split cluster_node_heartbeat

Extract common heartbeat logic into helper functions:  _heartbeat_instance_management: consolidates instance management, health checks, and lost-instance detection.  _heartbeat_check_versions: compares instance versions and initiates shutdown when necessary.  _heartbeat_handle_lost_instances: reaps jobs and marks lost instances offline.

Refactor the original cluster_node_heartbeat to use these helpers and retain legacy behavior (using bind_kwargs).

Introduce adispatch_cluster_node_heartbeat for dispatcherd: uses the control API to retrieve running tasks and reaps them.

Link the two implementations by attaching adispatch_cluster_node_heartbeat as the _new_method on cluster_node_heartbeat.

* feat(publish): delegate heartbeat task submission to new dispatcherd implementation

Update apply_async to check at runtime if FEATURE_NEW_DISPATCHER is enabled.

When the task is cluster_node_heartbeat and a _new_method is attached, delegate the task submission to the new dispatcherd implementation.

Preserve the original behavior for all other tasks and fallback on error.

* refactor(system): extract task ID retrieval from dispatcherd into helper function

Improves readability of adispatch_cluster_node_heartbeat by extracting
the complex UUID parsing logic into a dedicated helper function.
Adds clearer error handling and follows established code patterns.

* fix(dispatcher): Enable task decorator to work with and without feature flag

Implemented a new approach for handling task execution with feature flags
by attaching alternative implementations to apply_async._new_method. This
allows cluster_node_heartbeat to work correctly with both the legacy and
new dispatcher systems without modifying core decorator logic.

AAP-41775

* fix(dispatcher): Improve error handling and logging in feature flag implementation

- Add error handling when attaching alternative dispatcher implementation
- Fix method self-reference in apply_async to properly use cls.apply_async
- Document limitations of this targeted approach for specific tasks
- Add logging for better debugging of dispatcher selection
- Ensure decorator timing by keeping method attachment after function definitions

This completes the robust implementation for switching between dispatcher
implementations based on feature flags.

AAP-41775

* fix(dispatcher): Implement registry pattern for dispatcher feature flag compatibility

Replaces direct method attribute assignment with a global registry for
alternative implementations. The original approach tried to attach new
methods directly to apply_async bound methods, which fails because bound
methods don't support attribute assignment in Python.

The registry pattern:
- Creates a global ALTERNATIVE_TASK_IMPLEMENTATIONS dict in publish.py
- Registers alternative implementations by task name
- Modifies apply_async to check the registry when feature flag is enabled
- Adds extensive logging throughout the process for debugging

This enables cluster_node_heartbeat to work correctly with both the legacy
and new dispatcher implementations based on the FEATURE_NEW_DISPATCHER flag.

AAP-41775

* refactor(dispatcher): Remove excessive logging from dispatcher implementation

Reduces verbose debugging logs while maintaining essential logging for critical
operations. Preserves:
- Task implementation selection based on feature flag
- Registration success/failure messages
- Critical error reporting

Removed:
- Registry content debugging messages
- Repetitive task diagnostics
- Non-essential information logging

AAP-41775

* fix(dispatcher): Fix shallow copy in dispatcher schedule conversion

This resolves "AttributeError: 'float' object has no attribute 'total_seconds'"
errors when the dispatcher is restarted.

Refs: AAP-41775

* Use IPC mechanism to get running tasks (#15926)
* Allow tasks from tasks
* Fix failure to limit to waiting jobs
* Get job record with lock
* Fix failures in dispatcherd feature branch (#15930)
* Fully handle DispatcherCancel
* Complete rest of preload import work
* Complete dispatcherd integration & job cancellation (AAP-43033) (#15941)
* feat(dispatcher): Implement job cancellation for new dispatcher

Adds feature-flag-aware job cancellation that routes cancel requests to either
the legacy dispatcher or the new dispatcherd library based on the
FEATURE_NEW_DISPATCHER flag.

- Updates cancel_dispatcher_process() to use dispatcherd's control API when enabled
- Handles both direct cancellation and task manager workflow cancellation cases
- Works with DispatcherCancel exception handling to properly handle SIGUSR1 signals

AAP-43033

* fix(dispatcher): Update run_dispatcher.py to properly handle task cancellation

Modifies the cancel command in run_dispatcher.py to properly cancel tasks
when the FEATURE_NEW_DISPATCHER flag is enabled, rather than just listing
running tasks.

The implementation translates each task UUID to the appropriate
filter format expected by the dispatcherd control API, maintaining the same
behavior as the original implementation.

Part of: AAP-43033

* refactor(system): Refactor dispatch_startup() to extract common startup logic and branch based on feature flag

This commit refactors the dispatch_startup() function to improve clarity and consistency across the legacy
and new dispatcherd flows.

No dispatcher-specific functionality is needed beyond the changes made, so this refactoring improves robustness without
altering core behavior.

* refactor(system): Refactor inform_cluster_of_shutdown() for clarity

* refactor(tasks): Replace @task with @task_awx across 22 tasks for dispatcher compatibility

- Migrated all task decorators to use @task_awx, ensuring dispatcher-aware behavior.
- Tested each task with the new dispatcherd, verifying that tasks using the registry pattern execute correctly without needing binder‐based alternative implementations.
- Removed redundant logging and outdated comments.
- Legacy tasks that do not require special parameter extraction continue to use their original logic.
- This commit reflects our complete journey of testing and verifying dispatcherd compatibility across all 22 tasks.

* refactor(publish): fix linter

* Fix bug from the branch rebase

* AAP-43763 Add tests for connection management in dispatcherd workers (#15949)

* Add test for job cancel in live tests
* Fix bug from the branch rebase
* Add test for connection recovery after connection broke
* Add test for breaking connection

* Fix dispatcherd bugs: schedule aliases, job kwargs handling, cancel handling (#15960)

* Put in job kwargs handling, not done before

* AAP-44382 [dispatcherd] Fixes for running with feature flag off (#15973)

* Use correct decorator for test of tasks

* Finalize dispatcherd feature branch (#15975)

* Work dispatcherd into dependency management system

* Use util methods from DAB

* Rename the dispatcherd feature flag, and flip default to not-enabled

* Move to new submit_task method

* Update the location of the sock file

* AAP-44381 Make dispatcherd config loading more lazy (#15979)

* Make dispatcherd config loading more lazy

* Make submission error more obvious

* Fix signal handling gap, hijack SIGUSR1 from dispatcherd (#15983)

* Fix signal handling gap, hijack SIGUSR1 from dispatcherd

* Minor adjustments to dispatcherd status command

* [dispatcherd] Get rid of alternative task registry (#15984)

Get rid of alternative task registry

* Fix deadlock error and other cleanup errors (#15987)
* Move to proper error handling location

---------

Co-authored-by: artem_tiupin <70763601+art-tapin@users.noreply.github.com>
2025-05-16 09:39:22 -04:00

143 lines
4.9 KiB
Python

# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import re
from django.utils.translation import gettext_lazy as _
__all__ = [
'PRIVILEGE_ESCALATION_METHODS',
'ANSI_SGR_PATTERN',
'CAN_CANCEL',
'ACTIVE_STATES',
'STANDARD_INVENTORY_UPDATE_ENV',
]
PRIVILEGE_ESCALATION_METHODS = [
('sudo', _('Sudo')),
('su', _('Su')),
('pbrun', _('Pbrun')),
('pfexec', _('Pfexec')),
('dzdo', _('DZDO')),
('pmrun', _('Pmrun')),
('runas', _('Runas')),
('enable', _('Enable')),
('doas', _('Doas')),
('ksu', _('Ksu')),
('machinectl', _('Machinectl')),
('sesu', _('Sesu')),
]
CHOICES_PRIVILEGE_ESCALATION_METHODS = [('', _('None'))] + PRIVILEGE_ESCALATION_METHODS
ANSI_SGR_PATTERN = re.compile(r'\x1b\[[0-9;]*m')
STANDARD_INVENTORY_UPDATE_ENV = {
# Failure to parse inventory should always be fatal
'ANSIBLE_INVENTORY_UNPARSED_FAILED': 'True',
# Always use the --export option for ansible-inventory
'ANSIBLE_INVENTORY_EXPORT': 'True',
# Redirecting output to stderr allows JSON parsing to still work with -vvv
'ANSIBLE_VERBOSE_TO_STDERR': 'True',
# if ansible-inventory --limit is used for an inventory import, unmatched should be a failure
'ANSIBLE_HOST_PATTERN_MISMATCH': 'error',
}
CAN_CANCEL = ('new', 'pending', 'waiting', 'running')
ACTIVE_STATES = CAN_CANCEL
ERROR_STATES = ('error',)
MINIMAL_EVENTS = set(['playbook_on_play_start', 'playbook_on_task_start', 'playbook_on_stats', 'EOF'])
CENSOR_VALUE = '************'
ENV_BLOCKLIST = frozenset(
(
'VIRTUAL_ENV',
'PATH',
'PYTHONPATH',
'JOB_ID',
'INVENTORY_ID',
'INVENTORY_SOURCE_ID',
'INVENTORY_UPDATE_ID',
'AD_HOC_COMMAND_ID',
'REST_API_URL',
'REST_API_TOKEN',
'MAX_EVENT_RES',
'CALLBACK_QUEUE',
'CALLBACK_CONNECTION',
'CACHE',
'JOB_CALLBACK_DEBUG',
'INVENTORY_HOSTVARS',
'AWX_HOST',
'PROJECT_REVISION',
'SUPERVISOR_CONFIG_PATH',
)
)
# loggers that may be called in process of emitting a log
LOGGER_BLOCKLIST = (
'awx.main.utils.handlers',
'awx.main.utils.formatters',
'awx.main.utils.filters',
'awx.main.utils.encryption',
'awx.main.utils.log',
# loggers that may be called getting logging settings
'awx.conf',
# dispatcherd should only use 1 database connection
'dispatcherd',
)
# Reported version for node seen in receptor mesh but for which capacity check
# failed or is in progress
RECEPTOR_PENDING = 'ansible-runner-???'
# Naming pattern for AWX jobs in /tmp folder, like /tmp/awx_42_xiwm
# also update awxkit.api.pages.unified_jobs if changed
JOB_FOLDER_PREFIX = 'awx_%s_'
# :z option tells Podman that two containers share the volume content with r/w
# :O option tells Podman to mount the directory from the host as a temporary storage using the overlay file system.
# :ro or :rw option to mount a volume in read-only or read-write mode, respectively. By default, the volumes are mounted read-write.
# see podman-run manpage for further details
# /HOST-DIR:/CONTAINER-DIR:OPTIONS
CONTAINER_VOLUMES_MOUNT_TYPES = ['z', 'O', 'ro', 'rw']
MAX_ISOLATED_PATH_COLON_DELIMITER = 2
SURVEY_TYPE_MAPPING = {'text': str, 'textarea': str, 'password': str, 'multiplechoice': str, 'multiselect': str, 'integer': int, 'float': (float, int)}
JOB_VARIABLE_PREFIXES = [
'awx',
'tower',
]
# Note, the \u001b[... are ansi color codes. We don't currenly import any of the python modules which define the codes.
# Importing a library just for this message seemed like overkill
ANSIBLE_RUNNER_NEEDS_UPDATE_MESSAGE = (
'\u001b[31m \u001b[1m This can be caused if the version of ansible-runner in your execution environment is out of date.\u001b[0m'
)
# Values for setting SUBSCRIPTION_USAGE_MODEL
SUBSCRIPTION_USAGE_MODEL_UNIQUE_HOSTS = 'unique_managed_hosts'
# Shared prefetch to use for creating a queryset for the purpose of writing or saving facts
HOST_FACTS_FIELDS = ('name', 'ansible_facts', 'ansible_facts_modified', 'modified', 'inventory_id')
# Data for RBAC compatibility layer
role_name_to_perm_mapping = {
'adhoc_role': ['adhoc_'],
'approval_role': ['approve_'],
'auditor_role': ['audit_'],
'admin_role': ['change_', 'add_', 'delete_'],
'execute_role': ['execute_'],
'read_role': ['view_'],
'update_role': ['update_'],
'member_role': ['member_'],
'use_role': ['use_'],
}
org_role_to_permission = {
'notification_admin_role': 'add_notificationtemplate',
'project_admin_role': 'add_project',
'execute_role': 'execute_jobtemplate',
'inventory_admin_role': 'add_inventory',
'credential_admin_role': 'add_credential',
'workflow_admin_role': 'add_workflowjobtemplate',
'job_template_admin_role': 'change_jobtemplate', # TODO: this doesnt really work, solution not clear
'execution_environment_admin_role': 'add_executionenvironment',
'auditor_role': 'view_project', # TODO: also doesnt really work
}