Compare commits

3 Commits

Author         SHA1        Message                                                                                Date
jainnikhil30   e5635657d0  add the migration for changing the default capacity adjustment                        2025-05-22 14:36:22 +05:30
jainnikhil30   df8d2740d7  set the memory value to 64 in test so that the overall capacity is not a float value  2025-05-15 16:16:55 +05:30
jainnikhil30   c8efa82aca  adjust the default capacity_adjustment to 0.75                                         2025-05-15 16:03:36 +05:30
82 changed files with 678 additions and 1624 deletions

View File

@@ -2,7 +2,7 @@
codecov:
notify:
after_n_builds: 9 # Number of test matrix+lint jobs uploading coverage
after_n_builds: 6 # Number of test matrix+lint jobs uploading coverage
wait_for_ci: false
require_ci_to_pass: false

View File

@@ -17,23 +17,6 @@ exclude_also =
[run]
branch = True
# NOTE: `disable_warnings` is needed when `pytest-cov` runs in tandem
# NOTE: with `pytest-xdist`. These warnings are false negative in this
# NOTE: context.
#
# NOTE: It's `coveragepy` that emits the warnings and previously they
# NOTE: wouldn't get on the radar of `pytest`'s `filterwarnings`
# NOTE: mechanism. This changed, however, with `pytest >= 8.4`. And
# NOTE: since we set `filterwarnings = error`, those warnings are being
# NOTE: raised as exceptions, cascading into `pytest`'s internals and
# NOTE: causing tracebacks and crashes of the test sessions.
#
# Ref:
# * https://github.com/pytest-dev/pytest-cov/issues/693
# * https://github.com/pytest-dev/pytest-cov/pull/695
# * https://github.com/pytest-dev/pytest-cov/pull/696
disable_warnings =
module-not-measured
omit =
awx/main/migrations/*
awx/settings/defaults.py
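
For context on the NOTE block above: "filterwarnings = error" makes pytest promote matching warnings to exceptions, which is how a coveragepy "module-not-measured" warning could cascade into test-session crashes under pytest >= 8.4. A minimal sketch of that promotion (illustrative names, not part of this diff):

import warnings

import pytest


def emit_coverage_style_warning():
    # stand-in for the warning coveragepy emits when a module is not measured
    warnings.warn("module-not-measured", UserWarning)


def test_warning_is_promoted_to_error():
    # equivalent in spirit to `filterwarnings = error` in pytest configuration
    with warnings.catch_warnings():
        warnings.simplefilter("error")
        with pytest.raises(UserWarning):
            emit_coverage_style_warning()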

View File

@@ -335,7 +335,6 @@ jobs:
with:
name: coverage-${{ matrix.target-regex.name }}
path: ~/.ansible/collections/ansible_collections/awx/awx/tests/output/coverage/
retention-days: 1
- uses: ./.github/actions/upload_awx_devel_logs
if: always()
@@ -353,7 +352,6 @@ jobs:
steps:
- uses: actions/checkout@v4
with:
persist-credentials: false
show-progress: false
- uses: ./.github/actions/setup-python
@@ -363,12 +361,23 @@ jobs:
- name: Upgrade ansible-core
run: python3 -m pip install --upgrade ansible-core
- name: Download coverage artifacts
- name: Download coverage artifacts A to H
uses: actions/download-artifact@v4
with:
merge-multiple: true
name: coverage-a-h
path: coverage
- name: Download coverage artifacts I to P
uses: actions/download-artifact@v4
with:
name: coverage-i-p
path: coverage
- name: Download coverage artifacts R to Z
uses: actions/download-artifact@v4
with:
name: coverage-r-z0-9
path: coverage
pattern: coverage-*
- name: Combine coverage
run: |
@@ -386,6 +395,46 @@ jobs:
echo '## AWX Collection Integration Coverage HTML' >> $GITHUB_STEP_SUMMARY
echo 'Download the HTML artifacts to view the coverage report.' >> $GITHUB_STEP_SUMMARY
# This is a huge hack, there's no official action for removing artifacts currently.
# Also ACTIONS_RUNTIME_URL and ACTIONS_RUNTIME_TOKEN aren't available in normal run
# steps, so we have to use github-script to get them.
#
# The advantage of doing this, though, is that we save on artifact storage space.
- name: Get secret artifact runtime URL
uses: actions/github-script@v6
id: get-runtime-url
with:
result-encoding: string
script: |
const { ACTIONS_RUNTIME_URL } = process.env;
return ACTIONS_RUNTIME_URL;
- name: Get secret artifact runtime token
uses: actions/github-script@v6
id: get-runtime-token
with:
result-encoding: string
script: |
const { ACTIONS_RUNTIME_TOKEN } = process.env;
return ACTIONS_RUNTIME_TOKEN;
- name: Remove intermediary artifacts
env:
ACTIONS_RUNTIME_URL: ${{ steps.get-runtime-url.outputs.result }}
ACTIONS_RUNTIME_TOKEN: ${{ steps.get-runtime-token.outputs.result }}
run: |
echo "::add-mask::${ACTIONS_RUNTIME_TOKEN}"
artifacts=$(
curl -H "Authorization: Bearer $ACTIONS_RUNTIME_TOKEN" \
${ACTIONS_RUNTIME_URL}_apis/pipelines/workflows/${{ github.run_id }}/artifacts?api-version=6.0-preview \
| jq -r '.value | .[] | select(.name | startswith("coverage-")) | .url'
)
for artifact in $artifacts; do
curl -i -X DELETE -H "Accept: application/json;api-version=6.0-preview" -H "Authorization: Bearer $ACTIONS_RUNTIME_TOKEN" "$artifact"
done
- name: Upload coverage report as artifact
uses: actions/upload-artifact@v4
with:
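
The removal step above lists the run's artifacts over the undocumented runner API and deletes the intermediary coverage-* ones. The same logic, sketched in Python with the requests library for readability (endpoint shape copied from the curl loop; this is not an official API):

import os

import requests


def delete_coverage_artifacts(run_id: str) -> None:
    # ACTIONS_RUNTIME_URL/TOKEN are the runner internals the comment above warns about
    base_url = os.environ["ACTIONS_RUNTIME_URL"]
    token = os.environ["ACTIONS_RUNTIME_TOKEN"]
    headers = {"Authorization": f"Bearer {token}"}

    listing = requests.get(
        f"{base_url}_apis/pipelines/workflows/{run_id}/artifacts",
        params={"api-version": "6.0-preview"},
        headers=headers,
        timeout=30,
    ).json()
    for artifact in listing.get("value", []):
        if artifact["name"].startswith("coverage-"):
            requests.delete(
                artifact["url"],
                headers={**headers, "Accept": "application/json;api-version=6.0-preview"},
                timeout=30,
            )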

View File

@@ -19,12 +19,6 @@ COLLECTION_VERSION ?= $(shell $(PYTHON) tools/scripts/scm_version.py | cut -d .
COLLECTION_SANITY_ARGS ?= --docker
# collection unit testing directories
COLLECTION_TEST_DIRS ?= awx_collection/test/awx
# pytest added args to collect coverage
COVERAGE_ARGS ?= --cov --cov-report=xml --junitxml=reports/junit.xml
# pytest test directories
TEST_DIRS ?= awx/main/tests/unit awx/main/tests/functional awx/conf/tests
# pytest args to run tests in parallel
PARALLEL_TESTS ?= -n auto
# collection integration test directories (defaults to all)
COLLECTION_TEST_TARGET ?=
# args for collection install
@@ -315,14 +309,14 @@ black: reports
@chmod +x .git/hooks/pre-commit
genschema: reports
$(MAKE) swagger PYTEST_ADDOPTS="--genschema --create-db "
$(MAKE) swagger PYTEST_ARGS="--genschema --create-db "
mv swagger.json schema.json
swagger: reports
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
(set -o pipefail && py.test $(COVERAGE_ARGS) $(PARALLEL_TESTS) awx/conf/tests/functional awx/main/tests/functional/api awx/main/tests/docs | tee reports/$@.report)
(set -o pipefail && py.test --cov --cov-report=xml --junitxml=reports/junit.xml $(PYTEST_ARGS) awx/conf/tests/functional awx/main/tests/functional/api awx/main/tests/docs | tee reports/$@.report)
@if [ "${GITHUB_ACTIONS}" = "true" ]; \
then \
echo 'cov-report-files=reports/coverage.xml' >> "${GITHUB_OUTPUT}"; \
@@ -340,12 +334,14 @@ api-lint:
awx-link:
[ -d "/awx_devel/awx.egg-info" ] || $(PYTHON) /awx_devel/tools/scripts/egg_info_dev
TEST_DIRS ?= awx/main/tests/unit awx/main/tests/functional awx/conf/tests
PYTEST_ARGS ?= -n auto
## Run all API unit tests.
test:
if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
PYTHONDONTWRITEBYTECODE=1 py.test -p no:cacheprovider $(PARALLEL_TESTS) $(TEST_DIRS)
PYTHONDONTWRITEBYTECODE=1 py.test -p no:cacheprovider $(PYTEST_ARGS) $(TEST_DIRS)
cd awxkit && $(VENV_BASE)/awx/bin/tox -re py3
awx-manage check_migrations --dry-run --check -n 'missing_migration_file'
@@ -354,7 +350,7 @@ live_test:
## Run all API unit tests with coverage enabled.
test_coverage:
$(MAKE) test PYTEST_ADDOPTS="--create-db $(COVERAGE_ARGS)"
$(MAKE) test PYTEST_ARGS="--create-db --cov --cov-report=xml --junitxml=reports/junit.xml"
@if [ "${GITHUB_ACTIONS}" = "true" ]; \
then \
echo 'cov-report-files=awxkit/coverage.xml,reports/coverage.xml' >> "${GITHUB_OUTPUT}"; \
@@ -362,7 +358,7 @@ test_coverage:
fi
test_migrations:
PYTHONDONTWRITEBYTECODE=1 py.test -p no:cacheprovider --migrations -m migration_test --create-db $(PARALLEL_TESTS) $(COVERAGE_ARGS) $(TEST_DIRS)
PYTHONDONTWRITEBYTECODE=1 py.test -p no:cacheprovider --migrations -m migration_test --create-db --cov=awx --cov-report=xml --junitxml=reports/junit.xml $(PYTEST_ARGS) $(TEST_DIRS)
@if [ "${GITHUB_ACTIONS}" = "true" ]; \
then \
echo 'cov-report-files=reports/coverage.xml' >> "${GITHUB_OUTPUT}"; \
@@ -380,7 +376,7 @@ test_collection:
fi && \
if ! [ -x "$(shell command -v ansible-playbook)" ]; then pip install ansible-core; fi
ansible --version
py.test $(COLLECTION_TEST_DIRS) $(COVERAGE_ARGS) -v
py.test $(COLLECTION_TEST_DIRS) --cov --cov-report=xml --junitxml=reports/junit.xml -v
@if [ "${GITHUB_ACTIONS}" = "true" ]; \
then \
echo 'cov-report-files=reports/coverage.xml' >> "${GITHUB_OUTPUT}"; \

View File

@@ -46,6 +46,9 @@ from ansible_base.lib.utils.models import get_type_for_model
from ansible_base.rbac.models import RoleEvaluation, ObjectRole
from ansible_base.rbac import permission_registry
# django-flags
from flags.state import flag_enabled
# AWX
from awx.main.access import get_user_capabilities
from awx.main.constants import ACTIVE_STATES, org_role_to_permission
@@ -734,10 +737,13 @@ class EmptySerializer(serializers.Serializer):
pass
class OpaQueryPathMixin(serializers.Serializer):
class OpaQueryPathEnabledMixin(serializers.Serializer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not flag_enabled("FEATURE_POLICY_AS_CODE_ENABLED") and 'opa_query_path' in self.fields:
self.fields.pop('opa_query_path')
def validate_opa_query_path(self, value):
# Decode the URL and re-encode it
decoded_value = urllib.parse.unquote(value)
@@ -749,7 +755,7 @@ class OpaQueryPathMixin(serializers.Serializer):
return value
class UnifiedJobTemplateSerializer(BaseSerializer, OpaQueryPathMixin):
class UnifiedJobTemplateSerializer(BaseSerializer, OpaQueryPathEnabledMixin):
# As a base serializer, the capabilities prefetch is not used directly,
# instead they are derived from the Workflow Job Template Serializer and the Job Template Serializer, respectively.
capabilities_prefetch = []
@@ -1182,7 +1188,7 @@ class UserActivityStreamSerializer(UserSerializer):
fields = ('*', '-is_system_auditor')
class OrganizationSerializer(BaseSerializer, OpaQueryPathMixin):
class OrganizationSerializer(BaseSerializer, OpaQueryPathEnabledMixin):
show_capabilities = ['edit', 'delete']
class Meta:
@@ -1541,7 +1547,7 @@ class LabelsListMixin(object):
return res
class InventorySerializer(LabelsListMixin, BaseSerializerWithVariables, OpaQueryPathMixin):
class InventorySerializer(LabelsListMixin, BaseSerializerWithVariables, OpaQueryPathEnabledMixin):
show_capabilities = ['edit', 'delete', 'adhoc', 'copy']
capabilities_prefetch = ['admin', 'adhoc', {'copy': 'organization.inventory_admin'}]
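
Besides the rename, OpaQueryPathEnabledMixin gates the field on the feature flag: when FEATURE_POLICY_AS_CODE_ENABLED is off, opa_query_path is removed from the serializer's fields and disappears from the API surface. A standalone sketch of that pattern (the flag check is a stand-in for flags.state.flag_enabled):

from rest_framework import serializers


def flag_enabled(name: str) -> bool:
    # stand-in for flags.state.flag_enabled; hardcoded off for illustration
    return False


class OpaAwareSerializer(serializers.Serializer):
    name = serializers.CharField()
    opa_query_path = serializers.CharField(required=False)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # with the flag disabled, the field never reaches the rendered API schema
        if not flag_enabled("FEATURE_POLICY_AS_CODE_ENABLED") and "opa_query_path" in self.fields:
            self.fields.pop("opa_query_path")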

View File

@@ -38,7 +38,6 @@ class SettingsRegistry(object):
if setting in self._registry:
raise ImproperlyConfigured('Setting "{}" is already registered.'.format(setting))
category = kwargs.setdefault('category', None)
kwargs.setdefault('required', False) # No setting is ordinarily required
category_slug = kwargs.setdefault('category_slug', slugify(category or '') or None)
if category_slug in {'all', 'changed', 'user-defaults'}:
raise ImproperlyConfigured('"{}" is a reserved category slug.'.format(category_slug))

View File

@@ -3,13 +3,13 @@ import logging
# AWX
from awx.main.analytics.subsystem_metrics import DispatcherMetrics, CallbackReceiverMetrics
from awx.main.dispatch.publish import task as task_awx
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
logger = logging.getLogger('awx.main.scheduler')
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def send_subsystem_metrics():
DispatcherMetrics().send_metrics()
CallbackReceiverMetrics().send_metrics()

View File

@@ -142,7 +142,7 @@ def config(since, **kwargs):
return {
'platform': {
'system': platform.system(),
'dist': (distro.name(), distro.version(), distro.codename()),
'dist': distro.linux_distribution(),
'release': platform.release(),
'type': install_type,
},

View File

@@ -324,10 +324,10 @@ def gather(dest=None, module=None, subset=None, since=None, until=None, collecti
settings.AUTOMATION_ANALYTICS_LAST_ENTRIES = json.dumps(last_entries, cls=DjangoJSONEncoder)
if collection_type != 'dry-run':
for fpath in tarfiles:
if os.path.exists(fpath):
os.remove(fpath)
if succeeded:
for fpath in tarfiles:
if os.path.exists(fpath):
os.remove(fpath)
with disable_activity_stream():
if not settings.AUTOMATION_ANALYTICS_LAST_GATHER or until > settings.AUTOMATION_ANALYTICS_LAST_GATHER:
# `AUTOMATION_ANALYTICS_LAST_GATHER` is set whether collection succeeds or fails;

View File

@@ -128,7 +128,6 @@ def metrics():
registry=REGISTRY,
)
LICENSE_EXPIRY = Gauge('awx_license_expiry', 'Time before license expires', registry=REGISTRY)
LICENSE_INSTANCE_TOTAL = Gauge('awx_license_instance_total', 'Total number of managed hosts provided by your license', registry=REGISTRY)
LICENSE_INSTANCE_FREE = Gauge('awx_license_instance_free', 'Number of remaining managed hosts provided by your license', registry=REGISTRY)
@@ -149,7 +148,6 @@ def metrics():
}
)
LICENSE_EXPIRY.set(str(license_info.get('time_remaining', 0)))
LICENSE_INSTANCE_TOTAL.set(str(license_info.get('instance_count', 0)))
LICENSE_INSTANCE_FREE.set(str(license_info.get('free_instances', 0)))

View File

@@ -1,9 +1,6 @@
import os
from dispatcherd.config import setup as dispatcher_setup
from django.apps import AppConfig
from django.db import connection
from django.utils.translation import gettext_lazy as _
from awx.main.utils.common import bypass_in_test, load_all_entry_points_for
from awx.main.utils.migration import is_database_synchronized
@@ -79,28 +76,9 @@ class MainConfig(AppConfig):
cls = entry_point.load()
InventorySourceOptions.injectors[entry_point_name] = cls
def configure_dispatcherd(self):
"""This implements the default configuration for dispatcherd
If running the tasking service like awx-manage run_dispatcher,
some additional config will be applied on top of this.
This configuration provides the minimum such that code can submit
tasks to pg_notify to run those tasks.
"""
from awx.main.dispatch.config import get_dispatcherd_config
if connection.vendor != 'postgresql':
config_dict = get_dispatcherd_config(mock_publish=True)
else:
config_dict = get_dispatcherd_config()
dispatcher_setup(config_dict)
def ready(self):
super().ready()
self.configure_dispatcherd()
"""
Credential loading triggers database operations. There are cases we want to call
awx-manage collectstatic without a database. All management commands invoke the ready() code

View File

@@ -4,6 +4,7 @@ import logging
# Django
from django.core.checks import Error
from django.utils.translation import gettext_lazy as _
from django.conf import settings
# Django REST Framework
from rest_framework import serializers
@@ -91,6 +92,7 @@ register(
),
category=_('System'),
category_slug='system',
required=False,
)
register(
@@ -237,6 +239,7 @@ register(
help_text=_('List of modules allowed to be used by ad-hoc jobs.'),
category=_('Jobs'),
category_slug='jobs',
required=False,
)
register(
@@ -247,6 +250,7 @@ register(
('never', _('Never')),
('template', _('Only On Job Template Definitions')),
],
required=True,
label=_('When can extra variables contain Jinja templates?'),
help_text=_(
'Ansible allows variable substitution via the Jinja2 templating '
@@ -271,6 +275,7 @@ register(
register(
'AWX_ISOLATION_SHOW_PATHS',
field_class=fields.StringListIsolatedPathField,
required=False,
label=_('Paths to expose to isolated jobs'),
help_text=_(
'List of paths that would otherwise be hidden to expose to isolated jobs. Enter one path per line. '
@@ -436,6 +441,7 @@ register(
register(
'AWX_ANSIBLE_CALLBACK_PLUGINS',
field_class=fields.StringListField,
required=False,
label=_('Ansible Callback Plugins'),
help_text=_('List of paths to search for extra callback plugins to be used when running jobs. Enter one path per line.'),
category=_('Jobs'),
@@ -549,6 +555,7 @@ register(
help_text=_('Port on Logging Aggregator to send logs to (if required and not provided in Logging Aggregator).'),
category=_('Logging'),
category_slug='logging',
required=False,
)
register(
'LOG_AGGREGATOR_TYPE',
@@ -570,6 +577,7 @@ register(
help_text=_('Username for external log aggregator (if required; HTTP/s only).'),
category=_('Logging'),
category_slug='logging',
required=False,
)
register(
'LOG_AGGREGATOR_PASSWORD',
@@ -581,6 +589,7 @@ register(
help_text=_('Password or authentication token for external log aggregator (if required; HTTP/s only).'),
category=_('Logging'),
category_slug='logging',
required=False,
)
register(
'LOG_AGGREGATOR_LOGGERS',
@@ -767,6 +776,7 @@ register(
allow_null=True,
category=_('System'),
category_slug='system',
required=False,
hidden=True,
)
register(
@@ -974,122 +984,123 @@ def csrf_trusted_origins_validate(serializer, attrs):
register_validate('system', csrf_trusted_origins_validate)
register(
'OPA_HOST',
field_class=fields.CharField,
label=_('OPA server hostname'),
default='',
help_text=_('The hostname used to connect to the OPA server. If empty, policy enforcement will be disabled.'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
)
if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to AppRegistryNotReady error
register(
'OPA_HOST',
field_class=fields.CharField,
label=_('OPA server hostname'),
default='',
help_text=_('The hostname used to connect to the OPA server. If empty, policy enforcement will be disabled.'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
)
register(
'OPA_PORT',
field_class=fields.IntegerField,
label=_('OPA server port'),
default=8181,
help_text=_('The port used to connect to the OPA server. Defaults to 8181.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_PORT',
field_class=fields.IntegerField,
label=_('OPA server port'),
default=8181,
help_text=_('The port used to connect to the OPA server. Defaults to 8181.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_SSL',
field_class=fields.BooleanField,
label=_('Use SSL for OPA connection'),
default=False,
help_text=_('Enable or disable the use of SSL to connect to the OPA server. Defaults to false.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_SSL',
field_class=fields.BooleanField,
label=_('Use SSL for OPA connection'),
default=False,
help_text=_('Enable or disable the use of SSL to connect to the OPA server. Defaults to false.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_AUTH_TYPE',
field_class=fields.ChoiceField,
label=_('OPA authentication type'),
choices=[OPA_AUTH_TYPES.NONE, OPA_AUTH_TYPES.TOKEN, OPA_AUTH_TYPES.CERTIFICATE],
default=OPA_AUTH_TYPES.NONE,
help_text=_('The authentication type that will be used to connect to the OPA server: "None", "Token", or "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_AUTH_TYPE',
field_class=fields.ChoiceField,
label=_('OPA authentication type'),
choices=[OPA_AUTH_TYPES.NONE, OPA_AUTH_TYPES.TOKEN, OPA_AUTH_TYPES.CERTIFICATE],
default=OPA_AUTH_TYPES.NONE,
help_text=_('The authentication type that will be used to connect to the OPA server: "None", "Token", or "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_AUTH_TOKEN',
field_class=fields.CharField,
label=_('OPA authentication token'),
default='',
help_text=_(
'The token for authentication to the OPA server. Required when OPA_AUTH_TYPE is "Token". If an authorization header is defined in OPA_AUTH_CUSTOM_HEADERS, it will be overridden by OPA_AUTH_TOKEN.'
),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
encrypted=True,
)
register(
'OPA_AUTH_TOKEN',
field_class=fields.CharField,
label=_('OPA authentication token'),
default='',
help_text=_(
'The token for authentication to the OPA server. Required when OPA_AUTH_TYPE is "Token". If an authorization header is defined in OPA_AUTH_CUSTOM_HEADERS, it will be overridden by OPA_AUTH_TOKEN.'
),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
encrypted=True,
)
register(
'OPA_AUTH_CLIENT_CERT',
field_class=fields.CharField,
label=_('OPA client certificate content'),
default='',
help_text=_('The content of the client certificate file for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
)
register(
'OPA_AUTH_CLIENT_CERT',
field_class=fields.CharField,
label=_('OPA client certificate content'),
default='',
help_text=_('The content of the client certificate file for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
)
register(
'OPA_AUTH_CLIENT_KEY',
field_class=fields.CharField,
label=_('OPA client key content'),
default='',
help_text=_('The content of the client key for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
encrypted=True,
)
register(
'OPA_AUTH_CLIENT_KEY',
field_class=fields.CharField,
label=_('OPA client key content'),
default='',
help_text=_('The content of the client key for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
encrypted=True,
)
register(
'OPA_AUTH_CA_CERT',
field_class=fields.CharField,
label=_('OPA CA certificate content'),
default='',
help_text=_('The content of the CA certificate for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
)
register(
'OPA_AUTH_CA_CERT',
field_class=fields.CharField,
label=_('OPA CA certificate content'),
default='',
help_text=_('The content of the CA certificate for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
)
register(
'OPA_AUTH_CUSTOM_HEADERS',
field_class=fields.DictField,
label=_('OPA custom authentication headers'),
default={},
help_text=_('Optional custom headers included in requests to the OPA server. Defaults to empty dictionary ({}).'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_AUTH_CUSTOM_HEADERS',
field_class=fields.DictField,
label=_('OPA custom authentication headers'),
default={},
help_text=_('Optional custom headers included in requests to the OPA server. Defaults to empty dictionary ({}).'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_REQUEST_TIMEOUT',
field_class=fields.FloatField,
label=_('OPA request timeout'),
default=1.5,
help_text=_('The number of seconds after which the connection to the OPA server will time out. Defaults to 1.5 seconds.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_REQUEST_TIMEOUT',
field_class=fields.FloatField,
label=_('OPA request timeout'),
default=1.5,
help_text=_('The number of seconds after which the connection to the OPA server will time out. Defaults to 1.5 seconds.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_REQUEST_RETRIES',
field_class=fields.IntegerField,
label=_('OPA request retry count'),
default=2,
help_text=_('The number of retry attempts for connecting to the OPA server. Default is 2.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_REQUEST_RETRIES',
field_class=fields.IntegerField,
label=_('OPA request retry count'),
default=2,
help_text=_('The number of retry attempts for connecting to the OPA server. Default is 2.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)

View File

@@ -77,8 +77,6 @@ LOGGER_BLOCKLIST = (
'awx.main.utils.log',
# loggers that may be called getting logging settings
'awx.conf',
# dispatcherd should only use 1 database connection
'dispatcherd',
)
# Reported version for node seen in receptor mesh but for which capacity check

View File

@@ -1,53 +0,0 @@
from django.conf import settings
from ansible_base.lib.utils.db import get_pg_notify_params
from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.pool import get_auto_max_workers
def get_dispatcherd_config(for_service: bool = False, mock_publish: bool = False) -> dict:
"""Return a dictionary config for dispatcherd
Parameters:
for_service: if True, include dynamic options needed for running the dispatcher service;
this requires database access, so delay evaluation until after app setup
"""
config = {
"version": 2,
"service": {
"pool_kwargs": {
"min_workers": settings.JOB_EVENT_WORKERS,
"max_workers": get_auto_max_workers(),
},
"main_kwargs": {"node_id": settings.CLUSTER_HOST_ID},
"process_manager_cls": "ForkServerManager",
"process_manager_kwargs": {"preload_modules": ['awx.main.dispatch.hazmat']},
},
"brokers": {
"socket": {"socket_path": settings.DISPATCHERD_DEBUGGING_SOCKFILE},
},
"publish": {"default_control_broker": "socket"},
"worker": {"worker_cls": "awx.main.dispatch.worker.dispatcherd.AWXTaskWorker"},
}
if mock_publish:
config["brokers"]["noop"] = {}
config["publish"]["default_broker"] = "noop"
else:
config["brokers"]["pg_notify"] = {
"config": get_pg_notify_params(),
"sync_connection_factory": "ansible_base.lib.utils.db.psycopg_connection_from_django",
"default_publish_channel": settings.CLUSTER_HOST_ID, # used for debugging commands
}
config["publish"]["default_broker"] = "pg_notify"
if for_service:
config["producers"] = {
"ScheduledProducer": {"task_schedule": settings.DISPATCHER_SCHEDULE},
"OnStartProducer": {"task_list": {"awx.main.tasks.system.dispatch_startup": {}}},
"ControlProducer": {},
}
config["brokers"]["pg_notify"]["channels"] = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
return config
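
For orientation, this builder was consumed by the (also removed) MainConfig.configure_dispatcherd shown earlier: build the dict, falling back to the no-op publish broker when off PostgreSQL, then hand it to dispatcherd. Condensed from that hunk, illustrative only:

from django.db import connection
from dispatcherd.config import setup as dispatcher_setup

from awx.main.dispatch.config import get_dispatcherd_config

# Publish-only default config; mock publishing when the database is not PostgreSQL.
config_dict = get_dispatcherd_config(mock_publish=(connection.vendor != 'postgresql'))
dispatcher_setup(config_dict)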

View File

@@ -1,36 +0,0 @@
import django
# dispatcherd publisher logic is likely to be used, but needs manual preload
from dispatcherd.brokers import pg_notify # noqa
# Cache may not be initialized until we are in the worker, so preload here
from channels_redis import core # noqa
from awx import prepare_env
from dispatcherd.utils import resolve_callable
prepare_env()
django.setup() # noqa
from django.conf import settings
# Preload all periodic tasks so their imports will be in shared memory
for name, options in settings.CELERYBEAT_SCHEDULE.items():
resolve_callable(options['task'])
# Preload in-line import from tasks
from awx.main.scheduler.kubernetes import PodManager # noqa
from django.core.cache import cache as django_cache
from django.db import connection
connection.close()
django_cache.close()

View File

@@ -37,9 +37,6 @@ else:
logger = logging.getLogger('awx.main.dispatch')
RETIRED_SENTINEL_TASK = "[retired]"
class NoOpResultQueue(object):
def put(self, item):
pass
@@ -84,17 +81,11 @@ class PoolWorker(object):
self.queue = MPQueue(queue_size)
self.process = Process(target=target, args=(self.queue, self.finished) + args)
self.process.daemon = True
self.creation_time = time.monotonic()
self.retiring = False
def start(self):
self.process.start()
def put(self, body):
if self.retiring:
uuid = body.get('uuid', 'N/A') if isinstance(body, dict) else 'N/A'
logger.info(f"Worker pid:{self.pid} is retiring. Refusing new task {uuid}.")
raise QueueFull("Worker is retiring and not accepting new tasks") # AutoscalePool.write handles QueueFull
uuid = '?'
if isinstance(body, dict):
if not body.get('uuid'):
@@ -113,11 +104,6 @@ class PoolWorker(object):
"""
self.queue.put('QUIT')
@property
def age(self):
"""Returns the current age of the worker in seconds."""
return time.monotonic() - self.creation_time
@property
def pid(self):
return self.process.pid
@@ -164,8 +150,6 @@ class PoolWorker(object):
# the purpose of self.managed_tasks is to just track internal
# state of which events are *currently* being processed.
logger.warning('Event UUID {} appears to have been duplicated.'.format(uuid))
if self.retiring:
self.managed_tasks[RETIRED_SENTINEL_TASK] = {'task': RETIRED_SENTINEL_TASK}
@property
def current_task(self):
@@ -281,8 +265,6 @@ class WorkerPool(object):
'{% for w in workers %}'
'. worker[pid:{{ w.pid }}]{% if not w.alive %} GONE exit={{ w.exitcode }}{% endif %}'
' sent={{ w.messages_sent }}'
' age={{ "%.0f"|format(w.age) }}s'
' retiring={{ w.retiring }}'
'{% if w.messages_finished %} finished={{ w.messages_finished }}{% endif %}'
' qsize={{ w.managed_tasks|length }}'
' rss={{ w.mb }}MB'
@@ -374,9 +356,6 @@ class AutoscalePool(WorkerPool):
def __init__(self, *args, **kwargs):
self.max_workers = kwargs.pop('max_workers', None)
self.max_worker_lifetime_seconds = kwargs.pop(
'max_worker_lifetime_seconds', getattr(settings, 'WORKER_MAX_LIFETIME_SECONDS', 14400)
) # Default to 4 hours
super(AutoscalePool, self).__init__(*args, **kwargs)
if self.max_workers is None:
@@ -436,7 +415,6 @@ class AutoscalePool(WorkerPool):
"""
orphaned = []
for w in self.workers[::]:
is_retirement_age = self.max_worker_lifetime_seconds is not None and w.age > self.max_worker_lifetime_seconds
if not w.alive:
# the worker process has exited
# 1. take the task it was running and enqueue the error
@@ -445,10 +423,6 @@ class AutoscalePool(WorkerPool):
# send them to another worker
logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode))
if w.current_task:
if w.current_task == {'task': RETIRED_SENTINEL_TASK}:
logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age))
self.workers.remove(w)
continue
if w.current_task != 'QUIT':
try:
for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']):
@@ -459,7 +433,6 @@ class AutoscalePool(WorkerPool):
logger.warning(f'Worker was told to quit but has not, pid={w.pid}')
orphaned.extend(w.orphaned_tasks)
self.workers.remove(w)
elif w.idle and len(self.workers) > self.min_workers:
# the process has an empty queue (it's idle) and we have
# more processes in the pool than we need (> min)
@@ -468,22 +441,6 @@ class AutoscalePool(WorkerPool):
logger.debug('scaling down worker pid:{}'.format(w.pid))
w.quit()
self.workers.remove(w)
elif w.idle and is_retirement_age:
logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age))
w.quit()
self.workers.remove(w)
elif is_retirement_age and not w.retiring and not w.idle:
logger.info(
f"Worker pid:{w.pid} (age: {w.age:.0f}s) exceeded max lifetime ({self.max_worker_lifetime_seconds:.0f}s). "
"Signaling for graceful retirement."
)
# Send QUIT signal; worker will finish current task then exit.
w.quit()
# mark as retiring to reject any future tasks that might be assigned in meantime
w.retiring = True
if w.alive:
# if we discover a task manager invocation that's been running
# too long, reap it (because otherwise it'll just hold the postgres

View File

@@ -4,9 +4,6 @@ import json
import time
from uuid import uuid4
from dispatcherd.publish import submit_task
from dispatcherd.utils import resolve_callable
from django_guid import get_guid
from django.conf import settings
@@ -96,19 +93,6 @@ class task:
@classmethod
def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
try:
from flags.state import flag_enabled
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
# At this point we have the import string, and submit_task wants the method, so back to that
actual_task = resolve_callable(cls.name)
return submit_task(actual_task, args=args, kwargs=kwargs, queue=queue, uuid=uuid, **kw)
except Exception:
logger.exception(f"[DISPATCHER] Failed to check for alternative dispatcherd implementation for {cls.name}")
# Continue with original implementation if anything fails
pass
# Original implementation follows
queue = queue or getattr(cls.queue, 'im_func', cls.queue)
if not queue:
msg = f'{cls.name}: Queue value required and may not be None'
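
The removed branch above round-trips the registered dotted name back into a callable before handing it to submit_task. A tiny illustration of that round-trip, using a task name that appears elsewhere in this diff:

from dispatcherd.utils import resolve_callable

task_name = 'awx.main.tasks.host_metrics.cleanup_host_metrics'
actual_task = resolve_callable(task_name)  # import string -> the actual function
assert callable(actual_task)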

View File

@@ -1,14 +0,0 @@
from dispatcherd.worker.task import TaskWorker
from django.db import connection
class AWXTaskWorker(TaskWorker):
def on_start(self) -> None:
"""Get worker connected so that first task it gets will be worked quickly"""
connection.ensure_connection()
def pre_task(self, message) -> None:
"""This should remedy bad connections that can not fix themselves"""
connection.close_if_unusable_or_obsolete()

View File

@@ -2,21 +2,13 @@
# All Rights Reserved.
import logging
import yaml
import os
import redis
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from flags.state import flag_enabled
from dispatcherd.factories import get_control_from_settings
from dispatcherd import run_service
from dispatcherd.config import setup as dispatcher_setup
from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.config import get_dispatcherd_config
from awx.main.dispatch.control import Control
from awx.main.dispatch.pool import AutoscalePool
from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
@@ -48,44 +40,18 @@ class Command(BaseCommand):
),
)
def verify_dispatcherd_socket(self):
if not os.path.exists(settings.DISPATCHERD_DEBUGGING_SOCKFILE):
raise CommandError('Dispatcher is not running locally')
def handle(self, *arg, **options):
if options.get('status'):
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('status')
if len(running_data) != 1:
raise CommandError('Did not receive expected number of replies')
print(yaml.dump(running_data[0], default_flow_style=False))
return
else:
print(Control('dispatcher').status())
return
print(Control('dispatcher').status())
return
if options.get('schedule'):
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
print('NOT YET IMPLEMENTED')
return
else:
print(Control('dispatcher').schedule())
print(Control('dispatcher').schedule())
return
if options.get('running'):
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('running')
print(yaml.dump(running_data, default_flow_style=False))
return
else:
print(Control('dispatcher').running())
return
print(Control('dispatcher').running())
return
if options.get('reload'):
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
print('NOT YET IMPLEMENTED')
return
else:
return Control('dispatcher').control({'control': 'reload'})
return Control('dispatcher').control({'control': 'reload'})
if options.get('cancel'):
cancel_str = options.get('cancel')
try:
@@ -94,36 +60,21 @@ class Command(BaseCommand):
cancel_data = [cancel_str]
if not isinstance(cancel_data, list):
cancel_data = [cancel_str]
print(Control('dispatcher').cancel(cancel_data))
return
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
ctl = get_control_from_settings()
results = []
for task_id in cancel_data:
# For each task UUID, send an individual cancel command
result = ctl.control_with_reply('cancel', data={'uuid': task_id})
results.append(result)
print(yaml.dump(results, default_flow_style=False))
return
else:
print(Control('dispatcher').cancel(cancel_data))
return
consumer = None
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
dispatcher_setup(get_dispatcherd_config(for_service=True))
run_service()
else:
consumer = None
try:
DispatcherMetricsServer().start()
except redis.exceptions.ConnectionError as exc:
raise CommandError(f'Dispatcher could not connect to redis, error: {exc}')
try:
DispatcherMetricsServer().start()
except redis.exceptions.ConnectionError as exc:
raise CommandError(f'Dispatcher could not connect to redis, error: {exc}')
try:
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
consumer.run()
except KeyboardInterrupt:
logger.debug('Terminating Task Dispatcher')
if consumer:
consumer.stop()
try:
queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
consumer.run()
except KeyboardInterrupt:
logger.debug('Terminating Task Dispatcher')
if consumer:
consumer.stop()

View File

@@ -1,3 +1,5 @@
# Generated by Django 4.2.10 on 2024-09-16 10:22
from django.db import migrations
@@ -6,4 +8,8 @@ class Migration(migrations.Migration):
('main', '0200_template_name_constraint'),
]
operations = []
operations = [
migrations.DeleteModel(
name='Profile',
),
]

View File

@@ -0,0 +1,26 @@
# Generated by Django 4.2.10 on 2024-09-16 15:21
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0201_delete_profile'),
]
operations = [
# delete all sso application migrations
migrations.RunSQL("DELETE FROM django_migrations WHERE app = 'sso';"),
# delete all sso application content group permissions
migrations.RunSQL(
"DELETE FROM auth_group_permissions "
"WHERE permission_id IN "
"(SELECT id FROM auth_permission WHERE content_type_id in (SELECT id FROM django_content_type WHERE app_label = 'sso'));"
),
# delete all sso application content permissions
migrations.RunSQL("DELETE FROM auth_permission " "WHERE content_type_id IN (SELECT id FROM django_content_type WHERE app_label = 'sso');"),
# delete sso application content type
migrations.RunSQL("DELETE FROM django_content_type WHERE app_label = 'sso';"),
# drop sso application created table
migrations.RunSQL("DROP TABLE IF EXISTS sso_userenterpriseauth;"),
]

View File

@@ -1,100 +0,0 @@
# Generated by Django 4.2.10 on 2024-09-16 10:22
from django.db import migrations, models
from awx.main.migrations._create_system_jobs import delete_clear_tokens_sjt
class Migration(migrations.Migration):
dependencies = [
('main', '0201_create_managed_creds'),
]
operations = [
migrations.DeleteModel(
name='Profile',
),
# Remove SSO app content
# delete all sso application migrations
migrations.RunSQL("DELETE FROM django_migrations WHERE app = 'sso';"),
# delete all sso application content group permissions
migrations.RunSQL(
"DELETE FROM auth_group_permissions "
"WHERE permission_id IN "
"(SELECT id FROM auth_permission WHERE content_type_id in (SELECT id FROM django_content_type WHERE app_label = 'sso'));"
),
# delete all sso application content permissions
migrations.RunSQL("DELETE FROM auth_permission " "WHERE content_type_id IN (SELECT id FROM django_content_type WHERE app_label = 'sso');"),
# delete sso application content type
migrations.RunSQL("DELETE FROM django_content_type WHERE app_label = 'sso';"),
# drop sso application created table
migrations.RunSQL("DROP TABLE IF EXISTS sso_userenterpriseauth;"),
# Alter inventory source source field
migrations.AlterField(
model_name='inventorysource',
name='source',
field=models.CharField(default=None, max_length=32),
),
migrations.AlterField(
model_name='inventoryupdate',
name='source',
field=models.CharField(default=None, max_length=32),
),
# Alter OAuth2Application unique together
migrations.AlterUniqueTogether(
name='oauth2application',
unique_together=None,
),
migrations.RemoveField(
model_name='oauth2application',
name='organization',
),
migrations.RemoveField(
model_name='oauth2application',
name='user',
),
migrations.RemoveField(
model_name='activitystream',
name='o_auth2_access_token',
),
migrations.RemoveField(
model_name='activitystream',
name='o_auth2_application',
),
migrations.DeleteModel(
name='OAuth2AccessToken',
),
migrations.DeleteModel(
name='OAuth2Application',
),
# Delete system token cleanup jobs, because tokens were deleted
migrations.RunPython(delete_clear_tokens_sjt, migrations.RunPython.noop),
migrations.AlterField(
model_name='systemjob',
name='job_type',
field=models.CharField(
blank=True,
choices=[
('cleanup_jobs', 'Remove jobs older than a certain number of days'),
('cleanup_activitystream', 'Remove activity stream entries older than a certain number of days'),
('cleanup_sessions', 'Removes expired browser sessions from the database'),
],
default='',
max_length=32,
),
),
migrations.AlterField(
model_name='systemjobtemplate',
name='job_type',
field=models.CharField(
blank=True,
choices=[
('cleanup_jobs', 'Remove jobs older than a certain number of days'),
('cleanup_activitystream', 'Remove activity stream entries older than a certain number of days'),
('cleanup_sessions', 'Removes expired browser sessions from the database'),
],
default='',
max_length=32,
),
),
]

View File

@@ -0,0 +1,23 @@
# Generated by Django 4.2.10 on 2024-10-22 15:58
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0202_remove_sso_app_content'),
]
operations = [
migrations.AlterField(
model_name='inventorysource',
name='source',
field=models.CharField(default=None, max_length=32),
),
migrations.AlterField(
model_name='inventoryupdate',
name='source',
field=models.CharField(default=None, max_length=32),
),
]

View File

@@ -0,0 +1,39 @@
# Generated by Django 4.2.10 on 2024-10-24 14:06
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('main', '0203_alter_inventorysource_source_and_more'),
]
operations = [
migrations.AlterUniqueTogether(
name='oauth2application',
unique_together=None,
),
migrations.RemoveField(
model_name='oauth2application',
name='organization',
),
migrations.RemoveField(
model_name='oauth2application',
name='user',
),
migrations.RemoveField(
model_name='activitystream',
name='o_auth2_access_token',
),
migrations.RemoveField(
model_name='activitystream',
name='o_auth2_application',
),
migrations.DeleteModel(
name='OAuth2AccessToken',
),
migrations.DeleteModel(
name='OAuth2Application',
),
]

View File

@@ -0,0 +1,44 @@
# Generated by Django 4.2.16 on 2024-12-18 16:05
from django.db import migrations, models
from awx.main.migrations._create_system_jobs import delete_clear_tokens_sjt
class Migration(migrations.Migration):
dependencies = [
('main', '0204_alter_oauth2application_unique_together_and_more'),
]
operations = [
migrations.RunPython(delete_clear_tokens_sjt, migrations.RunPython.noop),
migrations.AlterField(
model_name='systemjob',
name='job_type',
field=models.CharField(
blank=True,
choices=[
('cleanup_jobs', 'Remove jobs older than a certain number of days'),
('cleanup_activitystream', 'Remove activity stream entries older than a certain number of days'),
('cleanup_sessions', 'Removes expired browser sessions from the database'),
],
default='',
max_length=32,
),
),
migrations.AlterField(
model_name='systemjobtemplate',
name='job_type',
field=models.CharField(
blank=True,
choices=[
('cleanup_jobs', 'Remove jobs older than a certain number of days'),
('cleanup_activitystream', 'Remove activity stream entries older than a certain number of days'),
('cleanup_sessions', 'Removes expired browser sessions from the database'),
],
default='',
max_length=32,
),
),
]

View File

@@ -0,0 +1,22 @@
# Generated by Django 4.2.20 on 2025-05-22 08:57
from decimal import Decimal
import django.core.validators
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0205_delete_token_cleanup_job'),
]
operations = [
migrations.AlterField(
model_name='instance',
name='capacity_adjustment',
field=models.DecimalField(
decimal_places=2, default=Decimal('0.75'), max_digits=3, validators=[django.core.validators.MinValueValidator(Decimal('0'))]
),
),
]

View File

@@ -160,7 +160,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
default=100,
editable=False,
)
capacity_adjustment = models.DecimalField(default=Decimal(1.0), max_digits=3, decimal_places=2, validators=[MinValueValidator(Decimal(0.0))])
capacity_adjustment = models.DecimalField(default=Decimal(0.75), max_digits=3, decimal_places=2, validators=[MinValueValidator(Decimal(0.0))])
enabled = models.BooleanField(default=True)
managed_by_policy = models.BooleanField(default=True)

View File

@@ -24,7 +24,6 @@ from django.utils.translation import gettext_lazy as _
from django.utils.timezone import now
from django.utils.encoding import smart_str
from django.contrib.contenttypes.models import ContentType
from flags.state import flag_enabled
# REST Framework
from rest_framework.exceptions import ParseError
@@ -1370,30 +1369,7 @@ class UnifiedJob(
traceback=self.result_traceback,
)
def get_start_kwargs(self):
needed = self.get_passwords_needed_to_start()
decrypted_start_args = decrypt_field(self, 'start_args')
if not decrypted_start_args or decrypted_start_args == '{}':
return None
try:
start_args = json.loads(decrypted_start_args)
except Exception:
logger.exception(f'Unexpected malformed start_args on unified_job={self.id}')
return None
opts = dict([(field, start_args.get(field, '')) for field in needed])
if not all(opts.values()):
missing_fields = ', '.join([k for k, v in opts.items() if not v])
self.job_explanation = u'Missing needed fields: %s.' % missing_fields
self.save(update_fields=['job_explanation'])
return opts
def pre_start(self):
def pre_start(self, **kwargs):
if not self.can_start:
self.job_explanation = u'%s is not in a startable state: %s, expecting one of %s' % (self._meta.verbose_name, self.status, str(('new', 'waiting')))
self.save(update_fields=['job_explanation'])
@@ -1414,11 +1390,26 @@ class UnifiedJob(
self.save(update_fields=['job_explanation'])
return (False, None)
opts = self.get_start_kwargs()
needed = self.get_passwords_needed_to_start()
try:
start_args = json.loads(decrypt_field(self, 'start_args'))
except Exception:
start_args = None
if opts and (not all(opts.values())):
if start_args in (None, ''):
start_args = kwargs
opts = dict([(field, start_args.get(field, '')) for field in needed])
if not all(opts.values()):
missing_fields = ', '.join([k for k, v in opts.items() if not v])
self.job_explanation = u'Missing needed fields: %s.' % missing_fields
self.save(update_fields=['job_explanation'])
return (False, None)
if 'extra_vars' in kwargs:
self.handle_extra_data(kwargs['extra_vars'])
# remove any job_explanations that may have been set while job was in pending
if self.job_explanation != "":
self.job_explanation = ""
@@ -1479,44 +1470,21 @@ class UnifiedJob(
def cancel_dispatcher_process(self):
"""Returns True if dispatcher running this job acknowledged request and sent SIGTERM"""
if not self.celery_task_id:
return False
return
canceled = []
# Special case for task manager (used during workflow job cancellation)
if not connection.get_autocommit():
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
try:
from dispatcherd.factories import get_control_from_settings
ctl = get_control_from_settings()
ctl.control('cancel', data={'uuid': self.celery_task_id})
except Exception:
logger.exception("Error sending cancel command to new dispatcher")
else:
try:
ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id], with_reply=False)
except Exception:
logger.exception("Error sending cancel command to legacy dispatcher")
# this condition is purpose-written for the task manager, when it cancels jobs in workflows
ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id], with_reply=False)
return True # task manager itself needs to act under assumption that cancel was received
# Standard case with reply
try:
# Use control and reply mechanism to cancel and obtain confirmation
timeout = 5
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
from dispatcherd.factories import get_control_from_settings
ctl = get_control_from_settings()
results = ctl.control_with_reply('cancel', data={'uuid': self.celery_task_id}, expected_replies=1, timeout=timeout)
# Check if cancel was successful by checking if we got any results
return bool(results and len(results) > 0)
else:
# Original implementation
canceled = ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id])
canceled = ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id])
except socket.timeout:
logger.error(f'could not reach dispatcher on {self.controller_node} within {timeout}s')
except Exception:
logger.exception("error encountered when checking task status")
return bool(self.celery_task_id in canceled) # True or False, whether confirmation was obtained
def cancel(self, job_explanation=None, is_chain=False):

View File

@@ -19,9 +19,6 @@ from django.utils.timezone import now as tz_now
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
# django-flags
from flags.state import flag_enabled
from ansible_base.lib.utils.models import get_type_for_model
# django-ansible-base
@@ -51,7 +48,6 @@ from awx.main.signals import disable_activity_stream
from awx.main.constants import ACTIVE_STATES
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerModels
from awx.main.tasks.jobs import dispatch_waiting_jobs
import awx.main.analytics.subsystem_metrics as s_metrics
from awx.main.utils import decrypt_field
@@ -435,7 +431,6 @@ class TaskManager(TaskBase):
# 5 minutes to start pending jobs. If this limit is reached, pending jobs
# will no longer be started and will be started on the next task manager cycle.
self.time_delta_job_explanation = timedelta(seconds=30)
self.control_nodes_to_notify: set[str] = set()
super().__init__(prefix="task_manager")
def after_lock_init(self):
@@ -524,19 +519,16 @@ class TaskManager(TaskBase):
task.save()
task.log_lifecycle("waiting")
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
self.control_nodes_to_notify.add(task.get_queue_name())
else:
# apply_async does a NOTIFY to the channel dispatcher is listening to
# postgres will treat this as part of the transaction, which is what we want
if task.status != 'failed' and type(task) is not WorkflowJob:
task_cls = task._get_task_class()
task_cls.apply_async(
[task.pk],
opts,
queue=task.get_queue_name(),
uuid=task.celery_task_id,
)
# apply_async does a NOTIFY to the channel dispatcher is listening to
# postgres will treat this as part of the transaction, which is what we want
if task.status != 'failed' and type(task) is not WorkflowJob:
task_cls = task._get_task_class()
task_cls.apply_async(
[task.pk],
opts,
queue=task.get_queue_name(),
uuid=task.celery_task_id,
)
# In exception cases, like a job failing pre-start checks, we send the websocket status message.
# For jobs going into waiting, we omit this because of performance issues, as it should go to running quickly
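
The apply_async comment above relies on a PostgreSQL property worth spelling out: a NOTIFY issued inside a transaction is delivered only at COMMIT, so the dispatcher can never observe a task for a row whose update rolled back. A minimal psycopg2 sketch (connection string, row update, and payload are illustrative):

import psycopg2

conn = psycopg2.connect("dbname=awx")
with conn:  # opens a transaction; commits on clean exit, rolls back on error
    with conn.cursor() as cur:
        cur.execute("UPDATE main_unifiedjob SET status = %s WHERE id = %s", ('waiting', 42))
        cur.execute("SELECT pg_notify('tower_broadcast_all', 'job 42 waiting')")
# A LISTEN-ing dispatcher receives the message only after the COMMIT above.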
@@ -729,8 +721,3 @@ class TaskManager(TaskBase):
for workflow_approval in self.get_expired_workflow_approvals():
self.timeout_approval_node(workflow_approval)
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
for controller_node in self.control_nodes_to_notify:
logger.info(f'Notifying node {controller_node} of new waiting jobs.')
dispatch_waiting_jobs.apply_async(queue=controller_node)

View File

@@ -7,7 +7,7 @@ from django.conf import settings
# AWX
from awx import MODE
from awx.main.scheduler import TaskManager, DependencyManager, WorkflowManager
from awx.main.dispatch.publish import task as task_awx
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
logger = logging.getLogger('awx.main.scheduler')
@@ -20,16 +20,16 @@ def run_manager(manager, prefix):
manager().schedule()
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def task_manager():
run_manager(TaskManager, "task")
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def dependency_manager():
run_manager(DependencyManager, "dependency")
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def workflow_manager():
run_manager(WorkflowManager, "workflow")

View File

@@ -1 +1 @@
from . import callback, facts, helpers, host_indirect, host_metrics, jobs, receptor, system # noqa
from . import host_metrics, jobs, receptor, system # noqa

View File

@@ -12,7 +12,7 @@ from django.db import transaction
# Django flags
from flags.state import flag_enabled
from awx.main.dispatch.publish import task as task_awx
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
from awx.main.models.event_query import EventQuery
@@ -159,7 +159,7 @@ def cleanup_old_indirect_host_entries() -> None:
IndirectManagedNodeAudit.objects.filter(created__lt=limit).delete()
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None:
try:
job = Job.objects.get(id=job_id)
@@ -201,7 +201,7 @@ def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> Non
logger.exception(f'Error processing indirect host data for job_id={job_id}')
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def cleanup_and_save_indirect_host_entries_fallback() -> None:
if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
return

View File

@@ -7,7 +7,7 @@ from django.db.models import Count, F
from django.db.models.functions import TruncMonth
from django.utils.timezone import now
from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.publish import task as task_awx
from awx.main.dispatch.publish import task
from awx.main.models.inventory import HostMetric, HostMetricSummaryMonthly
from awx.main.tasks.helpers import is_run_threshold_reached
from awx.conf.license import get_license
@@ -18,7 +18,7 @@ from awx.main.utils.db import bulk_update_sorted_by_id
logger = logging.getLogger('awx.main.tasks.host_metrics')
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def cleanup_host_metrics():
if is_run_threshold_reached(getattr(settings, 'CLEANUP_HOST_METRICS_LAST_TS', None), getattr(settings, 'CLEANUP_HOST_METRICS_INTERVAL', 30) * 86400):
logger.info(f"Executing cleanup_host_metrics, last ran at {getattr(settings, 'CLEANUP_HOST_METRICS_LAST_TS', '---')}")
@@ -29,7 +29,7 @@ def cleanup_host_metrics():
logger.info("Finished cleanup_host_metrics")
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def host_metric_summary_monthly():
"""Run cleanup host metrics summary monthly task each week"""
if is_run_threshold_reached(getattr(settings, 'HOST_METRIC_SUMMARY_TASK_LAST_TS', None), getattr(settings, 'HOST_METRIC_SUMMARY_TASK_INTERVAL', 7) * 86400):

View File

@@ -17,7 +17,6 @@ import urllib.parse as urlparse
# Django
from django.conf import settings
from django.db import transaction
# Shared code for the AWX platform
from awx_plugins.interfaces._temporary_private_container_api import CONTAINER_ROOT, get_incontainer_path
@@ -29,12 +28,8 @@ import ansible_runner
import git
from gitdb.exc import BadName as BadGitName
# Dispatcherd
from dispatcherd.publish import task
from dispatcherd.utils import serialize_task
# AWX
from awx.main.dispatch.publish import task as task_awx
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
from awx.main.constants import (
PRIVILEGE_ESCALATION_METHODS,
@@ -42,13 +37,13 @@ from awx.main.constants import (
JOB_FOLDER_PREFIX,
MAX_ISOLATED_PATH_COLON_DELIMITER,
CONTAINER_VOLUMES_MOUNT_TYPES,
ACTIVE_STATES,
HOST_FACTS_FIELDS,
)
from awx.main.models import (
Instance,
Inventory,
InventorySource,
UnifiedJob,
Job,
AdHocCommand,
ProjectUpdate,
@@ -115,15 +110,6 @@ def with_path_cleanup(f):
return _wrapped
@task(on_duplicate='queue_one', bind=True, queue=get_task_queuename)
def dispatch_waiting_jobs(binder):
for uj in UnifiedJob.objects.filter(status='waiting', controller_node=settings.CLUSTER_HOST_ID).only('id', 'status', 'polymorphic_ctype', 'celery_task_id'):
kwargs = uj.get_start_kwargs()
if not kwargs:
kwargs = {}
binder.control('run', data={'task': serialize_task(uj._get_task_class()), 'args': [uj.id], 'kwargs': kwargs, 'uuid': uj.celery_task_id})
class BaseTask(object):
model = None
event_model = None
@@ -131,7 +117,6 @@ class BaseTask(object):
callback_class = RunnerCallback
def __init__(self):
self.instance = None
self.cleanup_paths = []
self.update_attempts = int(getattr(settings, 'DISPATCHER_DB_DOWNTOWN_TOLLERANCE', settings.DISPATCHER_DB_DOWNTIME_TOLERANCE) / 5)
self.runner_callback = self.callback_class(model=self.model)
@@ -319,8 +304,6 @@ class BaseTask(object):
# Add ANSIBLE_* settings to the subprocess environment.
for attr in dir(settings):
if attr == attr.upper() and attr.startswith('ANSIBLE_') and not attr.startswith('ANSIBLE_BASE_'):
if attr == 'ANSIBLE_STANDARD_SETTINGS_FILES':
continue # special case intended only for dynaconf use
env[attr] = str(getattr(settings, attr))
# Also set environment variables configured in AWX_TASK_ENV setting.
for key, value in settings.AWX_TASK_ENV.items():
@@ -459,21 +442,6 @@ class BaseTask(object):
"""
instance.log_lifecycle("finalize_run")
artifact_dir = os.path.join(private_data_dir, 'artifacts', str(self.instance.id))
collections_info = os.path.join(artifact_dir, 'collections.json')
ansible_version_file = os.path.join(artifact_dir, 'ansible_version.txt')
if os.path.exists(collections_info):
with open(collections_info) as ee_json_info:
ee_collections_info = json.loads(ee_json_info.read())
instance.installed_collections = ee_collections_info
instance.save(update_fields=['installed_collections'])
if os.path.exists(ansible_version_file):
with open(ansible_version_file) as ee_ansible_info:
ansible_version_info = ee_ansible_info.readline()
instance.ansible_version = ansible_version_info
instance.save(update_fields=['ansible_version'])
# Run task manager appropriately for speculative dependencies
if instance.unifiedjob_blocked_jobs.exists():
ScheduleTaskManager().schedule()
@@ -483,48 +451,27 @@ class BaseTask(object):
def should_use_fact_cache(self):
return False
def transition_status(self, pk: int) -> bool:
"""Atomically transition status to running, if False returned, another process got it"""
with transaction.atomic():
# Explanation of parts for the fetch:
# .values - avoid loading a full object, this is known to lead to deadlocks due to signals
# the signals load other related rows which another process may be locking, and happens in practice
# of=('self',) - keeps FK tables out of the lock list, another way deadlocks can happen
# .get - just load the single job
instance_data = UnifiedJob.objects.select_for_update(of=('self',)).values('status', 'cancel_flag').get(pk=pk)
# If the status is not 'waiting' (obtained under lock), this process does not have clearance to run
if instance_data['status'] == 'waiting':
if instance_data['cancel_flag']:
updated_status = 'canceled'
else:
updated_status = 'running'
# Explanation of the update:
# .filter - again, do not load the full object
# .update - a bulk update on just that one row, avoid loading unintended data
UnifiedJob.objects.filter(pk=pk).update(status=updated_status, start_args='')
elif instance_data['status'] == 'running':
logger.info(f'Job {pk} is being run by another process, exiting')
return False
return True
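The comments above describe a reusable claim-under-lock pattern; a minimal sketch of it in isolation (`QueuedThing` is a hypothetical model with a `status` field, and this assumes a configured Django app), not AWX's actual implementation:

```python
from django.db import transaction

def claim(pk: int) -> bool:
    """Return True if this process claimed the row; QueuedThing is hypothetical."""
    with transaction.atomic():
        # .values() avoids loading a full object (full loads were observed to
        # deadlock via signals touching related rows another process may lock);
        # of=('self',) keeps FK tables out of the lock list.
        row = QueuedThing.objects.select_for_update(of=('self',)).values('status').get(pk=pk)
        if row['status'] != 'waiting':
            return False  # another process already transitioned it
        # Bulk update of the single row, again without loading the object.
        QueuedThing.objects.filter(pk=pk).update(status='running')
    return True
```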
@with_path_cleanup
@with_signal_handling
def run(self, pk, **kwargs):
"""
Run the job/task and capture its output.
"""
if not self.instance: # Used to skip fetch for local runs
if not self.transition_status(pk):
logger.info(f'Job {pk} is being run by another process, exiting')
return
self.instance = self.model.objects.get(pk=pk)
if self.instance.status != 'canceled' and self.instance.cancel_flag:
self.instance = self.update_model(self.instance.pk, start_args='', status='canceled')
if self.instance.status not in ACTIVE_STATES:
# Prevent starting the job if it has been reaped or handled by another process.
raise RuntimeError(f'Not starting task pk={pk} because its status "{self.instance.status}" is not a valid active state')
# Load the instance
self.instance = self.update_model(pk)
if self.instance.status != 'running':
logger.error(f'Not starting {self.instance.status} task pk={pk} because its status "{self.instance.status}" is not expected')
return
if self.instance.execution_environment_id is None:
from awx.main.signals import disable_activity_stream
with disable_activity_stream():
self.instance = self.update_model(self.instance.pk, execution_environment=self.instance.resolve_execution_environment())
# self.instance because of the update_model pattern and when it's used in callback handlers
self.instance = self.update_model(pk, status='running', start_args='') # blank field to remove encrypted passwords
self.instance.websocket_emit_status("running")
status, rc = 'error', None
self.runner_callback.event_ct = 0
@@ -537,12 +484,6 @@ class BaseTask(object):
private_data_dir = None
try:
if self.instance.execution_environment_id is None:
from awx.main.signals import disable_activity_stream
with disable_activity_stream():
self.instance = self.update_model(self.instance.pk, execution_environment=self.instance.resolve_execution_environment())
self.instance.send_notification_templates("running")
private_data_dir = self.build_private_data_dir(self.instance)
self.pre_run_hook(self.instance, private_data_dir)
@@ -550,7 +491,6 @@ class BaseTask(object):
self.build_project_dir(self.instance, private_data_dir)
self.instance.log_lifecycle("preparing_playbook")
if self.instance.cancel_flag or signal_callback():
logger.debug(f'detected pre-run cancel flag for {self.instance.log_format}')
self.instance = self.update_model(self.instance.pk, status='canceled')
if self.instance.status != 'running':
@@ -673,9 +613,12 @@ class BaseTask(object):
elif status == 'canceled':
self.instance = self.update_model(pk)
cancel_flag_value = getattr(self.instance, 'cancel_flag', False)
if cancel_flag_value is False:
if (cancel_flag_value is False) and signal_callback():
self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="Task was canceled due to receiving a shutdown signal.")
status = 'failed'
elif cancel_flag_value is False:
self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="The running ansible process received a shutdown signal.")
status = 'failed'
except PolicyEvaluationError as exc:
self.runner_callback.delay_update(job_explanation=str(exc), result_traceback=str(exc))
except ReceptorNodeNotFound as exc:
@@ -703,9 +646,6 @@ class BaseTask(object):
# Field host_status_counts is used as a metric to check if event processing is finished
# we send notifications if it is, if not, callback receiver will send them
if not self.instance:
logger.error(f'Unified job pk={pk} appears to be deleted while running')
return
if (self.instance.host_status_counts is not None) or (not self.runner_callback.wrapup_event_dispatched):
events_processed_hook(self.instance)
@@ -802,7 +742,6 @@ class SourceControlMixin(BaseTask):
try:
# the job private_data_dir is passed so sync can download roles and collections there
sync_task = RunProjectUpdate(job_private_data_dir=private_data_dir)
sync_task.instance = local_project_sync # avoids "waiting" status check, performance
sync_task.run(local_project_sync.id)
local_project_sync.refresh_from_db()
self.instance = self.update_model(self.instance.pk, scm_revision=local_project_sync.scm_revision)
@@ -866,7 +805,7 @@ class SourceControlMixin(BaseTask):
self.release_lock(project)
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
class RunJob(SourceControlMixin, BaseTask):
"""
Run a job using ansible-playbook.
@@ -1189,7 +1128,7 @@ class RunJob(SourceControlMixin, BaseTask):
update_inventory_computed_fields.delay(inventory.id)
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
class RunProjectUpdate(BaseTask):
model = ProjectUpdate
event_model = ProjectUpdateEvent
@@ -1528,7 +1467,7 @@ class RunProjectUpdate(BaseTask):
return []
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
class RunInventoryUpdate(SourceControlMixin, BaseTask):
model = InventoryUpdate
event_model = InventoryUpdateEvent
@@ -1791,7 +1730,7 @@ class RunInventoryUpdate(SourceControlMixin, BaseTask):
raise PostRunError('Error occurred while saving inventory data; see traceback or server logs', status='error', tb=traceback.format_exc())
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
class RunAdHocCommand(BaseTask):
"""
Run an ad hoc command using ansible.
@@ -1944,7 +1883,7 @@ class RunAdHocCommand(BaseTask):
return d
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
class RunSystemJob(BaseTask):
model = SystemJob
event_model = SystemJobEvent

View File

@@ -8,6 +8,7 @@ from typing import Optional, Union
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from flags.state import flag_enabled
from opa_client import OpaClient
from opa_client.base import BaseClient
from requests import HTTPError
@@ -363,6 +364,9 @@ def opa_client(headers=None):
def evaluate_policy(instance):
# Policy evaluation for Policy as Code feature
if not flag_enabled("FEATURE_POLICY_AS_CODE_ENABLED"):
return
if not settings.OPA_HOST:
return
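Read together, these two guards mean policy evaluation is a no-op unless both the feature flag and an OPA host are configured. A minimal sketch of the pattern (the helper name is hypothetical, and the client construction assumes opa-python-client's host/port/ssl keyword arguments):

```python
from django.conf import settings
from flags.state import flag_enabled
from opa_client import OpaClient

def policy_enforcement_ready() -> bool:
    """Hypothetical helper mirroring the two early returns above."""
    if not flag_enabled("FEATURE_POLICY_AS_CODE_ENABLED"):
        return False  # feature flag off: evaluation is skipped entirely
    if not settings.OPA_HOST:
        return False  # no OPA server configured: enforcement disabled
    # Assumes opa-python-client's host/port/ssl keyword arguments.
    client = OpaClient(host=settings.OPA_HOST, port=settings.OPA_PORT, ssl=settings.OPA_SSL)
    client.check_connection()  # assumed to raise if the server is unreachable
    return True
```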

View File

@@ -32,7 +32,7 @@ from awx.main.constants import MAX_ISOLATED_PATH_COLON_DELIMITER
from awx.main.tasks.signals import signal_state, signal_callback, SignalExit
from awx.main.models import Instance, InstanceLink, UnifiedJob, ReceptorAddress
from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.publish import task as task_awx
from awx.main.dispatch.publish import task
# Receptorctl
from receptorctl.socket_interface import ReceptorControl
@@ -852,7 +852,7 @@ def reload_receptor():
raise RuntimeError("Receptor reload failed")
@task_awx()
@task()
def write_receptor_config():
"""
This task runs async on each control node, K8S only.
@@ -875,7 +875,7 @@ def write_receptor_config():
reload_receptor()
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def remove_deprovisioned_node(hostname):
InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
InstanceLink.objects.filter(target__instance__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)

View File

@@ -14,21 +14,16 @@ class SignalExit(Exception):
class SignalState:
# SIGTERM: Sent by supervisord to process group on shutdown
# SIGUSR1: The dispatcherd cancel signal
signals = (signal.SIGTERM, signal.SIGINT, signal.SIGUSR1)
def reset(self):
for for_signal in self.signals:
self.signal_flags[for_signal] = False
self.original_methods[for_signal] = None
self.sigterm_flag = False
self.sigint_flag = False
self.is_active = False # for nested context managers
self.original_sigterm = None
self.original_sigint = None
self.raise_exception = False
def __init__(self):
self.signal_flags = {}
self.original_methods = {}
self.reset()
def raise_if_needed(self):
@@ -36,28 +31,31 @@ class SignalState:
self.raise_exception = False # so it is not raised a second time in error handling
raise SignalExit()
def set_signal_flag(self, *args, for_signal=None):
self.signal_flags[for_signal] = True
logger.info(f'Processed signal {for_signal}, set exit flag')
def set_sigterm_flag(self, *args):
self.sigterm_flag = True
self.raise_if_needed()
def set_sigint_flag(self, *args):
self.sigint_flag = True
self.raise_if_needed()
def connect_signals(self):
for for_signal in self.signals:
self.original_methods[for_signal] = signal.getsignal(for_signal)
signal.signal(for_signal, lambda *args, for_signal=for_signal: self.set_signal_flag(*args, for_signal=for_signal))
self.original_sigterm = signal.getsignal(signal.SIGTERM)
self.original_sigint = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGTERM, self.set_sigterm_flag)
signal.signal(signal.SIGINT, self.set_sigint_flag)
self.is_active = True
def restore_signals(self):
for for_signal in self.signals:
original_method = self.original_methods[for_signal]
signal.signal(for_signal, original_method)
# if we got a signal while context manager was active, call parent methods.
if self.signal_flags[for_signal]:
if callable(original_method):
try:
original_method()
except Exception as exc:
logger.info(f'Error processing original {for_signal} signal, error: {str(exc)}')
signal.signal(signal.SIGTERM, self.original_sigterm)
signal.signal(signal.SIGINT, self.original_sigint)
# if we got a signal while context manager was active, call parent methods.
if self.sigterm_flag:
if callable(self.original_sigterm):
self.original_sigterm()
if self.sigint_flag:
if callable(self.original_sigint):
self.original_sigint()
self.reset()
@@ -65,7 +63,7 @@ signal_state = SignalState()
def signal_callback():
return any(signal_state.signal_flags[for_signal] for for_signal in signal_state.signals)
return bool(signal_state.sigterm_flag or signal_state.sigint_flag)
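As a usage sketch, a task wrapped in `with_signal_handling` can poll `signal_callback()` to exit cleanly once any handled signal has set its flag (the task body below is hypothetical):

```python
import time

from awx.main.tasks.signals import signal_callback, with_signal_handling

@with_signal_handling
def long_running_task():
    # Hypothetical work loop: a flagged SIGTERM/SIGINT (or SIGUSR1 in the
    # generalized version) makes signal_callback() return True.
    for _ in range(100):
        if signal_callback():
            return  # stop cleanly instead of dying mid-step
        time.sleep(0.1)  # stand-in for a unit of real work
```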
def with_signal_handling(f):

View File

@@ -1,77 +1,78 @@
# Python
from collections import namedtuple
import functools
import importlib
import itertools
import json
import logging
import os
import psycopg
from io import StringIO
from contextlib import redirect_stdout
import shutil
import time
from collections import namedtuple
from contextlib import redirect_stdout
from datetime import datetime
from distutils.version import LooseVersion as Version
from io import StringIO
from datetime import datetime
# Runner
import ansible_runner.cleanup
import psycopg
from ansible_base.lib.utils.db import advisory_lock
# django-ansible-base
from ansible_base.resource_registry.tasks.sync import SyncExecutor
# Django
from django.conf import settings
from django.db import connection, transaction, DatabaseError, IntegrityError
from django.db.models.fields.related import ForeignKey
from django.utils.timezone import now, timedelta
from django.utils.encoding import smart_str
from django.contrib.auth.models import User
from django.utils.translation import gettext_lazy as _
from django.utils.translation import gettext_noop
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django.db.models.query import QuerySet
# Django-CRUM
from crum import impersonate
# Django flags
from flags.state import flag_enabled
# Runner
import ansible_runner.cleanup
# dateutil
from dateutil.parser import parse as parse_date
# Django
from django.conf import settings
from django.contrib.auth.models import User
from django.core.cache import cache
from django.core.exceptions import ObjectDoesNotExist
from django.db import DatabaseError, IntegrityError, connection, transaction
from django.db.models.fields.related import ForeignKey
from django.db.models.query import QuerySet
from django.utils.encoding import smart_str
from django.utils.timezone import now, timedelta
from django.utils.translation import gettext_lazy as _
from django.utils.translation import gettext_noop
# Django flags
from flags.state import flag_enabled
from rest_framework.exceptions import PermissionDenied
# django-ansible-base
from ansible_base.resource_registry.tasks.sync import SyncExecutor
from ansible_base.lib.utils.db import advisory_lock
# AWX
from awx import __version__ as awx_application_version
from awx.conf import settings_registry
from awx.main import analytics
from awx.main.access import access_registry
from awx.main.analytics.subsystem_metrics import DispatcherMetrics
from awx.main.constants import ACTIVE_STATES, ERROR_STATES
from awx.main.consumers import emit_channel_notification
from awx.main.dispatch import get_task_queuename, reaper
from awx.main.dispatch.publish import task as task_awx
from awx.main.models import (
Schedule,
TowerScheduleState,
Instance,
InstanceGroup,
Inventory,
Job,
Notification,
Schedule,
SmartInventoryMembership,
TowerScheduleState,
UnifiedJob,
Notification,
Inventory,
SmartInventoryMembership,
Job,
convert_jsonfields,
)
from awx.main.constants import ACTIVE_STATES, ERROR_STATES
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename, reaper
from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inventory_group_removal
from awx.main.utils.reload import stop_local_services
from awx.main.tasks.helpers import is_run_threshold_reached
from awx.main.tasks.host_indirect import save_indirect_host_entries
from awx.main.tasks.receptor import administrative_workunit_reaper, get_receptor_ctl, worker_cleanup, worker_info, write_receptor_config
from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inventory_group_removal
from awx.main.utils.reload import stop_local_services
from dispatcherd.publish import task
from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanup, administrative_workunit_reaper, write_receptor_config
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
from awx.conf import settings_registry
from awx.main.analytics.subsystem_metrics import DispatcherMetrics
from rest_framework.exceptions import PermissionDenied
logger = logging.getLogger('awx.main.tasks.system')
@@ -82,12 +83,7 @@ Try upgrading OpenSSH or providing your private key in an different format. \
'''
def _run_dispatch_startup_common():
"""
Execute the common startup initialization steps.
This includes updating schedules, syncing instance membership, running
local reaping, and resetting metrics.
"""
def dispatch_startup():
startup_logger = logging.getLogger('awx.main.tasks')
# TODO: Enable this on VM installs
@@ -97,14 +93,14 @@ def _run_dispatch_startup_common():
try:
convert_jsonfields()
except Exception:
logger.exception("Failed JSON field conversion, skipping.")
logger.exception("Failed json field conversion, skipping.")
startup_logger.debug("Syncing schedules")
startup_logger.debug("Syncing Schedules")
for sch in Schedule.objects.all():
try:
sch.update_computed_fields()
except Exception:
logger.exception("Failed to rebuild schedule %s.", sch)
logger.exception("Failed to rebuild schedule {}.".format(sch))
#
# When the dispatcher starts, if the instance cannot be found in the database,
@@ -124,67 +120,25 @@ def _run_dispatch_startup_common():
apply_cluster_membership_policies()
cluster_node_heartbeat()
reaper.startup_reaping()
reaper.reap_waiting(grace_period=0)
m = DispatcherMetrics()
m.reset_values()
def _legacy_dispatch_startup():
"""
Legacy branch for startup: simply performs reaping of waiting jobs with a zero grace period.
"""
logger.debug("Legacy dispatcher: calling reaper.reap_waiting with grace_period=0")
reaper.reap_waiting(grace_period=0)
def _dispatcherd_dispatch_startup():
"""
New dispatcherd branch for startup: uses the control API to re-submit waiting jobs.
"""
logger.debug("Dispatcherd enabled: dispatching waiting jobs via control channel")
from awx.main.tasks.jobs import dispatch_waiting_jobs
dispatch_waiting_jobs.apply_async(queue=get_task_queuename())
def dispatch_startup():
"""
System initialization at startup.
First, execute the common logic.
Then, if FEATURE_DISPATCHERD_ENABLED is enabled, re-submit waiting jobs via the control API;
otherwise, fall back to legacy reaping of waiting jobs.
"""
_run_dispatch_startup_common()
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
_dispatcherd_dispatch_startup()
else:
_legacy_dispatch_startup()
def inform_cluster_of_shutdown():
"""
Clean system shutdown that marks the current instance offline.
In legacy mode, it also reaps waiting jobs.
In dispatcherd mode, it relies on dispatcherd's built-in cleanup.
"""
try:
inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
except Instance.DoesNotExist:
logger.exception("Cluster host not found: %s", settings.CLUSTER_HOST_ID)
return
if flag_enabled('FEATURE_DISPATCHERD_ENABLED'):
logger.debug("Dispatcherd mode: no extra reaping required for instance %s", inst.hostname)
else:
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
try:
logger.debug("Legacy mode: reaping waiting jobs for instance %s", inst.hostname)
reaper.reap_waiting(inst, grace_period=0)
reaper.reap_waiting(this_inst, grace_period=0)
except Exception:
logger.exception("Failed to reap waiting jobs for %s", inst.hostname)
logger.warning("Normal shutdown processed for instance %s; instance removed from capacity pool.", inst.hostname)
logger.exception('failed to reap waiting jobs for {}'.format(this_inst.hostname))
logger.warning('Normal shutdown signal for instance {}, removed self from capacity pool.'.format(this_inst.hostname))
except Exception:
logger.exception('Encountered problem with normal shutdown signal.')
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def migrate_jsonfield(table, pkfield, columns):
batchsize = 10000
with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
@@ -230,7 +184,7 @@ def migrate_jsonfield(table, pkfield, columns):
logger.warning(f"Migration of {table} to jsonb is finished.")
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def apply_cluster_membership_policies():
from awx.main.signals import disable_activity_stream
@@ -342,7 +296,7 @@ def apply_cluster_membership_policies():
logger.debug('Cluster policy computation finished in {} seconds'.format(time.time() - started_compute))
@task_awx(queue='tower_settings_change')
@task(queue='tower_settings_change')
def clear_setting_cache(setting_keys):
# log that cache is being cleared
logger.info(f"clear_setting_cache of keys {setting_keys}")
@@ -355,7 +309,7 @@ def clear_setting_cache(setting_keys):
cache.delete_many(cache_keys)
@task_awx(queue='tower_broadcast_all')
@task(queue='tower_broadcast_all')
def delete_project_files(project_path):
# TODO: possibly implement some retry logic
lock_file = project_path + '.lock'
@@ -373,7 +327,7 @@ def delete_project_files(project_path):
logger.exception('Could not remove lock file {}'.format(lock_file))
@task_awx(queue='tower_broadcast_all')
@task(queue='tower_broadcast_all')
def profile_sql(threshold=1, minutes=1):
if threshold <= 0:
cache.delete('awx-profile-sql-threshold')
@@ -383,7 +337,7 @@ def profile_sql(threshold=1, minutes=1):
logger.error('SQL QUERIES >={}s ENABLED FOR {} MINUTE(S)'.format(threshold, minutes))
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def send_notifications(notification_list, job_id=None):
if not isinstance(notification_list, list):
raise TypeError("notification_list should be of type list")
@@ -428,13 +382,13 @@ def events_processed_hook(unified_job):
save_indirect_host_entries.delay(unified_job.id)
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def gather_analytics():
if is_run_threshold_reached(getattr(settings, 'AUTOMATION_ANALYTICS_LAST_GATHER', None), settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
analytics.gather()
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def purge_old_stdout_files():
nowtime = time.time()
for f in os.listdir(settings.JOBOUTPUT_ROOT):
@@ -496,18 +450,18 @@ class CleanupImagesAndFiles:
cls.run_remote(this_inst, **kwargs)
@task_awx(queue='tower_broadcast_all')
@task(queue='tower_broadcast_all')
def handle_removed_image(remove_images=None):
"""Special broadcast invocation of this method to handle case of deleted EE"""
CleanupImagesAndFiles.run(remove_images=remove_images, file_pattern='')
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def cleanup_images_and_files():
CleanupImagesAndFiles.run(image_prune=True)
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def cluster_node_health_check(node):
"""
Used for the health check endpoint; refreshes the status of the instance, but must be run on the target node
@@ -526,7 +480,7 @@ def cluster_node_health_check(node):
this_inst.local_health_check()
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def execution_node_health_check(node):
if node == '':
logger.warning('Remote health check incorrectly called with blank string')
@@ -594,16 +548,8 @@ def inspect_established_receptor_connections(mesh_status):
def inspect_execution_and_hop_nodes(instance_list):
with advisory_lock('inspect_execution_and_hop_nodes_lock', wait=False):
node_lookup = {inst.hostname: inst for inst in instance_list}
try:
ctl = get_receptor_ctl()
except FileNotFoundError:
logger.error('Receptor daemon not running, skipping execution node check')
return
try:
mesh_status = ctl.simple_command('status')
except ValueError as exc:
logger.error(f'Error running receptorctl status command, error: {str(exc)}')
return
ctl = get_receptor_ctl()
mesh_status = ctl.simple_command('status')
inspect_established_receptor_connections(mesh_status)
@@ -651,109 +597,8 @@ def inspect_execution_and_hop_nodes(instance_list):
execution_node_health_check.apply_async([hostname])
@task_awx(queue=get_task_queuename, bind_kwargs=['dispatch_time', 'worker_tasks'])
@task(queue=get_task_queuename, bind_kwargs=['dispatch_time', 'worker_tasks'])
def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
"""
Original implementation for AWX dispatcher.
Uses worker_tasks from bind_kwargs to track running tasks.
"""
# Run common instance management logic
this_inst, instance_list, lost_instances = _heartbeat_instance_management()
if this_inst is None:
return # Early return case from instance management
# Check versions
_heartbeat_check_versions(this_inst, instance_list)
# Handle lost instances
_heartbeat_handle_lost_instances(lost_instances, this_inst)
# Run local reaper - original implementation using worker_tasks
if worker_tasks is not None:
active_task_ids = []
for task_list in worker_tasks.values():
active_task_ids.extend(task_list)
# Convert dispatch_time to datetime
ref_time = datetime.fromisoformat(dispatch_time) if dispatch_time else now()
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time)
if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time)
@task(queue=get_task_queuename, bind=True)
def adispatch_cluster_node_heartbeat(binder):
"""
Dispatcherd implementation.
Uses Control API to get running tasks.
"""
# Run common instance management logic
this_inst, instance_list, lost_instances = _heartbeat_instance_management()
if this_inst is None:
return # Early return case from instance management
# Check versions
_heartbeat_check_versions(this_inst, instance_list)
# Handle lost instances
_heartbeat_handle_lost_instances(lost_instances, this_inst)
# Get running tasks using dispatcherd API
active_task_ids = _get_active_task_ids_from_dispatcherd(binder)
if active_task_ids is None:
logger.warning("No active task IDs retrieved from dispatcherd, skipping reaper")
return # Failed to get task IDs, don't attempt reaping
# Run local reaper using tasks from dispatcherd
ref_time = now() # No dispatch_time in dispatcherd version
logger.debug(f"Running reaper with {len(active_task_ids)} excluded UUIDs")
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time)
# If waiting jobs are hanging out, resubmit them
if UnifiedJob.objects.filter(controller_node=settings.CLUSTER_HOST_ID, status='waiting').exists():
from awx.main.tasks.jobs import dispatch_waiting_jobs
dispatch_waiting_jobs.apply_async(queue=get_task_queuename())
def _get_active_task_ids_from_dispatcherd(binder):
"""
Retrieve active task IDs from the dispatcherd control API.
Returns:
list: List of active task UUIDs
None: If there was an error retrieving the data
"""
active_task_ids = []
try:
logger.debug("Querying dispatcherd API for running tasks")
data = binder.control('running')
# Extract UUIDs from the running data
# Process running data: first item is a dict with node_id and task entries
data.pop('node_id', None)
# Extract task UUIDs from data structure
for task_key, task_value in data.items():
if isinstance(task_value, dict) and 'uuid' in task_value:
active_task_ids.append(task_value['uuid'])
logger.debug(f"Found active task with UUID: {task_value['uuid']}")
elif isinstance(task_key, str):
# Handle case where UUID might be the key
active_task_ids.append(task_key)
logger.debug(f"Found active task with key: {task_key}")
logger.debug(f"Retrieved {len(active_task_ids)} active task IDs from dispatcherd")
return active_task_ids
except Exception:
logger.exception("Failed to get running tasks from dispatcherd")
return None
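For orientation, the parsing above assumes a reply shaped roughly like the sketch below (all values hypothetical): one `node_id` entry plus one entry per in-flight task.

```python
# Hypothetical example of a dispatcherd 'running' control reply.
reply = {
    'node_id': 'awx-1',
    'worker-0': {'uuid': 'aaaa-1111', 'task': 'awx.main.tasks.system.gather_analytics'},
    'worker-1': {'uuid': 'bbbb-2222', 'task': 'awx.main.tasks.jobs.RunJob'},
}
reply.pop('node_id', None)
uuids = [v['uuid'] for v in reply.values() if isinstance(v, dict) and 'uuid' in v]
assert uuids == ['aaaa-1111', 'bbbb-2222']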
def _heartbeat_instance_management():
"""Common logic for heartbeat instance management."""
logger.debug("Cluster node heartbeat task.")
nowtime = now()
instance_list = list(Instance.objects.filter(node_state__in=(Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED)))
@@ -780,7 +625,7 @@ def _heartbeat_instance_management():
this_inst.local_health_check()
if startup_event and this_inst.capacity != 0:
logger.warning(f'Rejoining the cluster as instance {this_inst.hostname}. Prior last_seen {last_last_seen}')
return None, None, None # Early return case
return
elif not last_last_seen:
logger.warning(f'Instance does not have recorded last_seen, updating to {nowtime}')
elif (nowtime - last_last_seen) > timedelta(seconds=settings.CLUSTER_NODE_HEARTBEAT_PERIOD + 2):
@@ -792,14 +637,8 @@ def _heartbeat_instance_management():
logger.warning(f'Recreated instance record {this_inst.hostname} after unexpected removal')
this_inst.local_health_check()
else:
logger.error("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
return None, None, None
return this_inst, instance_list, lost_instances
def _heartbeat_check_versions(this_inst, instance_list):
"""Check versions across instances and determine if shutdown is needed."""
raise RuntimeError("Cluster Host Not Found: {}".format(settings.CLUSTER_HOST_ID))
# IFF any node has a greater version than we do, then we'll shutdown services
for other_inst in instance_list:
if other_inst.node_type in ('execution', 'hop'):
continue
@@ -816,9 +655,6 @@ def _heartbeat_check_versions(this_inst, instance_list):
stop_local_services(communicate=False)
raise RuntimeError("Shutting down.")
def _heartbeat_handle_lost_instances(lost_instances, this_inst):
"""Handle lost instances by reaping their jobs and marking them offline."""
for other_inst in lost_instances:
try:
explanation = "Job reaped due to instance shutdown"
@@ -849,8 +685,17 @@ def _heartbeat_handle_lost_instances(lost_instances, this_inst):
else:
logger.exception('No SQL state available. Error marking {} as lost'.format(other_inst.hostname))
# Run local reaper
if worker_tasks is not None:
active_task_ids = []
for task_list in worker_tasks.values():
active_task_ids.extend(task_list)
reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=datetime.fromisoformat(dispatch_time))
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def awx_receptor_workunit_reaper():
"""
When an AWX job is launched via receptor, files such as status, stdin, and stdout are created
@@ -873,16 +718,8 @@ def awx_receptor_workunit_reaper():
if not settings.RECEPTOR_RELEASE_WORK:
return
logger.debug("Checking for unreleased receptor work units")
try:
receptor_ctl = get_receptor_ctl()
except FileNotFoundError:
logger.info('Receptorctl sockfile not found for workunit reaper, doing nothing')
return
try:
receptor_work_list = receptor_ctl.simple_command("work list")
except ValueError as exc:
logger.info(f'Error getting work list for workunit reaper, error: {str(exc)}')
return
receptor_ctl = get_receptor_ctl()
receptor_work_list = receptor_ctl.simple_command("work list")
unit_ids = [id for id in receptor_work_list]
jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES)
@@ -896,7 +733,7 @@ def awx_receptor_workunit_reaper():
administrative_workunit_reaper(receptor_work_list)
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def awx_k8s_reaper():
if not settings.RECEPTOR_RELEASE_WORK:
return
@@ -919,7 +756,7 @@ def awx_k8s_reaper():
logger.exception("Failed to delete orphaned pod {} from {}".format(job.log_format, group))
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def awx_periodic_scheduler():
lock_session_timeout_milliseconds = settings.TASK_MANAGER_LOCK_TIMEOUT * 1000
with advisory_lock('awx_periodic_scheduler_lock', lock_session_timeout_milliseconds=lock_session_timeout_milliseconds, wait=False) as acquired:
@@ -978,7 +815,7 @@ def awx_periodic_scheduler():
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def handle_failure_notifications(task_ids):
"""A task-ified version of the method that sends notifications."""
found_task_ids = set()
@@ -993,7 +830,7 @@ def handle_failure_notifications(task_ids):
logger.warning(f'Could not send notifications for {deleted_tasks} because they were not found in the database')
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def update_inventory_computed_fields(inventory_id):
"""
Signal handler and wrapper around inventory.update_computed_fields to
@@ -1043,7 +880,7 @@ def update_smart_memberships_for_inventory(smart_inventory):
return False
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def update_host_smart_inventory_memberships():
smart_inventories = Inventory.objects.filter(kind='smart', host_filter__isnull=False, pending_deletion=False)
changed_inventories = set([])
@@ -1059,7 +896,7 @@ def update_host_smart_inventory_memberships():
smart_inventory.update_computed_fields()
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def delete_inventory(inventory_id, user_id, retries=5):
# Delete inventory as user
if user_id is None:
@@ -1121,7 +958,7 @@ def _reconstruct_relationships(copy_mapping):
new_obj.save()
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, permission_check_func=None):
logger.debug('Deep copy {} from {} to {}.'.format(model_name, obj_pk, new_obj_pk))
@@ -1176,7 +1013,7 @@ def deep_copy_model_obj(model_module, model_name, obj_pk, new_obj_pk, user_pk, p
update_inventory_computed_fields.delay(new_obj.id)
@task_awx(queue=get_task_queuename)
@task(queue=get_task_queuename)
def periodic_resource_sync():
if not getattr(settings, 'RESOURCE_SERVER', None):
logger.debug("Skipping periodic resource_sync, RESOURCE_SERVER not configured")

View File

@@ -8,12 +8,5 @@
"CONTROLLER_PASSWORD": "fooo",
"CONTROLLER_USERNAME": "fooo",
"CONTROLLER_OAUTH_TOKEN": "",
"CONTROLLER_VERIFY_SSL": "False",
"AAP_HOSTNAME": "https://foo.invalid",
"AAP_PASSWORD": "fooo",
"AAP_USERNAME": "fooo",
"AAP_VALIDATE_CERTS": "False",
"CONTROLLER_REQUEST_TIMEOUT": "fooo",
"AAP_REQUEST_TIMEOUT": "fooo",
"AAP_TOKEN": ""
"CONTROLLER_VERIFY_SSL": "False"
}

View File

@@ -1,9 +0,0 @@
---
- hosts: all
gather_facts: false
connection: local
vars:
sleep_interval: 5
tasks:
- name: sleep for a specified interval
command: sleep '{{ sleep_interval }}'

View File

@@ -1,57 +1,17 @@
import time
import logging
from dispatcherd.publish import task
from django.db import connection
from awx.main.dispatch import get_task_queuename
from awx.main.dispatch.publish import task as old_task
from ansible_base.lib.utils.db import advisory_lock
from awx.main.dispatch.publish import task
logger = logging.getLogger(__name__)
@old_task(queue=get_task_queuename)
@task(queue=get_task_queuename)
def sleep_task(seconds=10, log=False):
if log:
logger.info('starting sleep_task')
time.sleep(seconds)
if log:
logger.info('finished sleep_task')
@task()
def sleep_break_connection(seconds=0.2):
"""
Interact with the database in an intentionally breaking way.
After this finishes, queries made by this connection are expected to error
with "the connection is closed"
This is obviously a problem for any task that comes afterwards.
So this is used to break things so that the fixes may be demonstrated.
"""
with connection.cursor() as cursor:
cursor.execute(f"SET idle_session_timeout = '{seconds / 2}s';")
logger.info(f'sleeping for {seconds}s > {seconds / 2}s session timeout')
time.sleep(seconds)
for i in range(1, 3):
logger.info(f'\nRunning query number {i}')
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1;")
logger.info(' query worked, not expected')
except Exception as exc:
logger.info(f' query errored as expected\ntype: {type(exc)}\nstr: {str(exc)}')
logger.info(f'Connection present: {bool(connection.connection)}, reports closed: {getattr(connection.connection, "closed", "not_found")}')
@task()
def advisory_lock_exception():
time.sleep(0.2) # so it can fill up all the workers... hacky for now
with advisory_lock('advisory_lock_exception', lock_session_timeout_milliseconds=20):
raise RuntimeError('this is an intentional error')

View File

@@ -150,24 +150,3 @@ def test_ship_credential(setting_map, expected_result, expected_auth, temp_analy
assert mock_analytic_post.call_args[1]['auth'] == expected_auth
else:
mock_analytic_post.assert_not_called()
@pytest.mark.django_db
def test_gather_cleanup_on_auth_failure(mock_valid_license, temp_analytic_tar):
settings.INSIGHTS_TRACKING_STATE = True
settings.AUTOMATION_ANALYTICS_URL = 'https://example.com/api'
settings.REDHAT_USERNAME = 'test_user'
settings.REDHAT_PASSWORD = 'test_password'
with tempfile.NamedTemporaryFile(delete=False, suffix='.tar.gz') as temp_file:
temp_file_path = temp_file.name
try:
with mock.patch('awx.main.analytics.core.ship', return_value=False):
with mock.patch('awx.main.analytics.core.package', return_value=temp_file_path):
gather(module=importlib.import_module(__name__), collection_type='scheduled')
assert not os.path.exists(temp_file_path), "Temp file was not cleaned up after ship failure"
finally:
if os.path.exists(temp_file_path):
os.remove(temp_file_path)

View File

@@ -30,7 +30,6 @@ EXPECTED_VALUES = {
'awx_license_instance_free': 0,
'awx_pending_jobs_total': 0,
'awx_database_connections_total': 1,
'awx_license_expiry': 0,
}

View File

@@ -39,7 +39,7 @@ def test_dispatcher_max_workers_reserve(settings, fake_redis):
plus reserve worker count
"""
with override_settings(**settings):
i = Instance.objects.create(hostname='test-1', node_type='hybrid')
i = Instance.objects.create(hostname='test-1', node_type='hybrid', capacity_adjustment=1.0)
i.local_health_check()
assert get_auto_max_workers() == i.capacity + 7, (i.cpu, i.memory, i.cpu_capacity, i.mem_capacity)
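The assertion encodes a simple rule; a minimal sketch of that arithmetic (the reserve constant of 7 is inferred from the assertion itself, and the names here are hypothetical):

```python
RESERVE_WORKERS = 7  # inferred from the assertion above

def expected_max_workers(capacity: int) -> int:
    # Auto-scaled worker ceiling: instance capacity plus a fixed reserve.
    return capacity + RESERVE_WORKERS
```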

View File

@@ -15,17 +15,3 @@ def test_does_not_run_reaped_job(mocker, mock_me):
job.refresh_from_db()
assert job.status == 'failed'
mock_run.assert_not_called()
@pytest.mark.django_db
def test_cancel_flag_on_start(jt_linked, caplog):
job = jt_linked.create_unified_job()
job.status = 'waiting'
job.cancel_flag = True
job.save()
task = RunJob()
task.run(job.id)
job = Job.objects.get(id=job.id)
assert job.status == 'canceled'

View File

@@ -5,11 +5,8 @@ import signal
import time
import yaml
from unittest import mock
from copy import deepcopy
from django.utils.timezone import now as tz_now
from django.conf import settings
from django.test.utils import override_settings
import pytest
from awx.main.models import Job, WorkflowJob, Instance
@@ -303,13 +300,6 @@ class TestTaskDispatcher:
class TestTaskPublisher:
@pytest.fixture(autouse=True)
def _disable_dispatcherd(self):
ffs = deepcopy(settings.FLAGS)
ffs['FEATURE_DISPATCHERD_ENABLED'][0]['value'] = False
with override_settings(FLAGS=ffs):
yield
def test_function_callable(self):
assert add(2, 2) == 4

View File

@@ -209,7 +209,7 @@ def test_inventory_update_injected_content(product_name, this_kind, inventory, f
source_vars=src_vars,
)
inventory_source.credentials.add(fake_credential_factory(this_kind))
inventory_update = inventory_source.create_unified_job(_eager_fields={'status': 'waiting'})
inventory_update = inventory_source.create_unified_job()
task = RunInventoryUpdate()
def substitute_run(awx_receptor_job):

View File

@@ -39,13 +39,13 @@ def test_orphan_unified_job_creation(instance, inventory):
@pytest.mark.django_db
@mock.patch('awx.main.tasks.system.inspect_execution_and_hop_nodes', lambda *args, **kwargs: None)
@mock.patch('awx.main.models.ha.get_cpu_effective_capacity', lambda cpu, is_control_node: 8)
@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem, is_control_node: 62)
@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem, is_control_node: 64)
def test_job_capacity_and_with_inactive_node():
i = Instance.objects.create(hostname='test-1')
i.save_health_data('18.0.1', 2, 8000)
assert i.enabled is True
assert i.capacity_adjustment == 1.0
assert i.capacity == 62
assert i.capacity_adjustment == 0.75
assert i.capacity == 50
i.enabled = False
i.save()
with override_settings(CLUSTER_HOST_ID=i.hostname):
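The updated expectations follow from interpolating between CPU and memory capacity with the new 0.75 default; a worked sketch of that arithmetic (assuming the `lower + (higher - lower) * capacity_adjustment` rule):

```python
def capacity_value(cpu_capacity: int, mem_capacity: int, capacity_adjustment: float) -> float:
    # Interpolate between the smaller and larger of the two capacity figures.
    lower, higher = sorted((cpu_capacity, mem_capacity))
    return lower + (higher - lower) * capacity_adjustment

# With the mocked values above: 8 + (64 - 8) * 0.75 == 50.0, a whole number,
# which is why the mocked mem capacity moved from 62 to 64 (62 gives 48.5).
assert capacity_value(8, 64, 0.75) == 50
```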

View File

@@ -47,7 +47,6 @@ def index_licenses(path):
def parse_requirement(reqt):
parsed_requirement = parse_req_from_line(reqt.requirement, None)
assert parsed_requirement.requirement, reqt.__dict__
name = parsed_requirement.requirement.name
version = str(parsed_requirement.requirement.specifier)
if version.startswith('=='):

View File

@@ -36,9 +36,11 @@ def _parse_exception_message(exception: PolicyEvaluationError):
@pytest.fixture(autouse=True)
def setup_opa_settings():
def enable_flag():
with override_settings(
OPA_HOST='opa.example.com',
FLAGS={"FEATURE_POLICY_AS_CODE_ENABLED": [("boolean", True)]},
FLAG_SOURCES=('flags.sources.SettingsFlagsSource',),
):
yield

View File

@@ -175,7 +175,7 @@ def project_factory(post, default_org, admin):
@pytest.fixture
def run_job_from_playbook(demo_inv, post, admin, project_factory):
def _rf(test_name, playbook, local_path=None, scm_url=None, jt_params=None, proj=None, wait=True):
def _rf(test_name, playbook, local_path=None, scm_url=None, jt_params=None, proj=None):
jt_name = f'{test_name} JT: {playbook}'
if not proj:
@@ -206,9 +206,9 @@ def run_job_from_playbook(demo_inv, post, admin, project_factory):
job = jt.create_unified_job()
job.signal_start()
if wait:
wait_for_job(job)
assert job.status == 'successful'
wait_for_job(job)
assert job.status == 'successful'
return {'job': job, 'job_template': jt, 'project': proj}
return _rf

View File

@@ -1,74 +0,0 @@
import time
from dispatcherd.config import settings
from dispatcherd.factories import get_control_from_settings
from dispatcherd.utils import serialize_task
from awx.main.models import JobTemplate
from awx.main.tests.data.sleep_task import sleep_break_connection, advisory_lock_exception
from awx.main.tests.live.tests.conftest import wait_for_job
def poll_for_task_finish(task_name):
running_tasks = [1]
start = time.monotonic()
ctl = get_control_from_settings()
while running_tasks:
responses = ctl.control_with_reply('running')
assert len(responses) == 1
response = responses[0]
response.pop('node_id')
running_tasks = [task_data for task_data in response.values() if task_data['task'] == task_name]
if time.monotonic() - start > 5.0:
assert False, f'Never finished working through tasks: {running_tasks}'
def check_jobs_work():
jt = JobTemplate.objects.get(name='Demo Job Template')
job = jt.create_unified_job()
job.signal_start()
wait_for_job(job)
def test_advisory_lock_error_clears():
"""Run a task that has an exception while holding advisory_lock
This is regression testing for a bug in its exception handling
expected to be fixed by
https://github.com/ansible/django-ansible-base/pull/713
This is an "easier" test case than the next,
because it passes just by fixing the DAB case,
and passing this does not generally guarantee that
workers will not be left with a connection in a bad state.
"""
min_workers = settings.service['pool_kwargs']['min_workers']
for i in range(min_workers):
advisory_lock_exception.delay()
task_name = serialize_task(advisory_lock_exception)
poll_for_task_finish(task_name)
# Jobs should still work even after the breaking task has run
check_jobs_work()
def test_can_recover_connection():
"""Run a task that intentionally times out the worker connection
If no connection fixing is implemented outside of that task scope,
then subsequent tasks will all error, thus checking that jobs run,
after running the sleep_break_connection task.
"""
min_workers = settings.service['pool_kwargs']['min_workers']
for i in range(min_workers):
sleep_break_connection.delay()
task_name = serialize_task(sleep_break_connection)
poll_for_task_finish(task_name)
# Jobs should still work even after the breaking task has run
check_jobs_work()

View File

@@ -1,40 +0,0 @@
import time
from awx.api.versioning import reverse
from awx.main.models import Job
from awx.main.tests.live.tests.conftest import wait_for_events
def test_cancel_and_delete_job(live_tmp_folder, run_job_from_playbook, post, delete, admin):
res = run_job_from_playbook('test_cancel_and_delete_job', 'sleep.yml', scm_url=f'file://{live_tmp_folder}/debug', wait=False)
job = res['job']
assert job.status == 'pending'
# Wait for first event so that we can be sure the job is in-progress first
start = time.time()
timeout = 10.0
while not job.job_events.exists():
time.sleep(0.2)
if time.time() - start > timeout:
assert False, f'Did not receive first event for job_id={job.id} in {timeout} seconds'
# Now cancel the job
url = reverse("api:job_cancel", kwargs={'pk': job.pk})
post(url, user=admin, expect=202)
# Job status should change to expected status before infinity
start = time.time()
timeout = 5.0
job.refresh_from_db()
while job.status != 'canceled':
time.sleep(0.05)
job.refresh_from_db(fields=['status'])
if time.time() - start > timeout:
assert False, f'job_id={job.id} still status={job.status} after {timeout} seconds'
wait_for_events(job)
url = reverse("api:job_detail", kwargs={'pk': job.pk})
delete(url, user=admin, expect=204)
assert not Job.objects.filter(id=job.id).exists()

View File

@@ -50,7 +50,7 @@ def test_outer_inner_signal_handling():
@with_signal_handling
def f1():
assert signal_callback() is False
signal_state.set_signal_flag(for_signal=signal.SIGTERM)
signal_state.set_sigterm_flag()
assert signal_callback()
f2()
@@ -74,7 +74,7 @@ def test_inner_outer_signal_handling():
@with_signal_handling
def f2():
assert signal_callback() is False
signal_state.set_signal_flag(for_signal=signal.SIGINT)
signal_state.set_sigint_flag()
assert signal_callback()
@with_signal_handling

View File

@@ -107,7 +107,7 @@ def job():
@pytest.fixture
def adhoc_job():
return AdHocCommand(pk=1, id=1, inventory=Inventory(), status='waiting')
return AdHocCommand(pk=1, id=1, inventory=Inventory())
@pytest.fixture
@@ -481,6 +481,26 @@ class TestGenericRun:
assert update_model_call['status'] == 'error'
assert update_model_call['emitted_events'] == 0
def test_cancel_flag(self, job, update_model_wrapper, execution_environment, mock_me, mock_create_partition):
job.status = 'running'
job.cancel_flag = True
job.websocket_emit_status = mock.Mock()
job.send_notification_templates = mock.Mock()
job.execution_environment = execution_environment
task = jobs.RunJob()
task.instance = job
task.update_model = mock.Mock(wraps=update_model_wrapper)
task.model.objects.get = mock.Mock(return_value=job)
task.build_private_data_files = mock.Mock()
with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
with pytest.raises(Exception):
task.run(1)
for c in [mock.call(1, start_args='', status='canceled')]:
assert c in task.update_model.call_args_list
def test_event_count(self, mock_me):
task = jobs.RunJob()
task.runner_callback.dispatcher = mock.MagicMock()
@@ -569,8 +589,6 @@ class TestAdhocRun(TestJobExecution):
adhoc_job.send_notification_templates = mock.Mock()
task = jobs.RunAdHocCommand()
adhoc_job.status = 'running' # to bypass status flip
task.instance = adhoc_job # to bypass fetch
task.update_model = mock.Mock(wraps=adhoc_update_model_wrapper)
task.model.objects.get = mock.Mock(return_value=adhoc_job)
task.build_inventory = mock.Mock()

View File

@@ -1,8 +1,8 @@
# Copyright (c) 2017 Ansible by Red Hat
# All Rights Reserved.
from awx.settings.application_name import set_application_name
from awx.settings.application_name import set_application_name
from django.conf import settings

View File

@@ -6,7 +6,7 @@ import urllib.parse as urlparse
from django.conf import settings
from awx.main.utils.reload import supervisor_service_command
from awx.main.dispatch.publish import task as task_awx
from awx.main.dispatch.publish import task
def construct_rsyslog_conf_template(settings=settings):
@@ -139,7 +139,7 @@ def construct_rsyslog_conf_template(settings=settings):
return tmpl
@task_awx(queue='rsyslog_configurer')
@task(queue='rsyslog_configurer')
def reconfigure_rsyslog():
tmpl = construct_rsyslog_conf_template()
# Write config to a temp file then move it to preserve atomicity

View File

@@ -243,23 +243,14 @@ class Licenser(object):
return []
def get_rhsm_subs(self, host, client_id, client_secret):
try:
client = OIDCClient(client_id, client_secret)
subs = client.make_request(
'GET',
host,
verify=True,
timeout=(5, 20),
)
except requests.RequestException:
logger.warning("Failed to connect to console.redhat.com using Service Account credentials. Falling back to basic auth.")
subs = requests.request(
'GET',
host,
auth=(client_id, client_secret),
verify=True,
timeout=(5, 20),
)
client = OIDCClient(client_id, client_secret)
subs = client.make_request(
'GET',
host,
verify=True,
timeout=(31, 31),
)
subs.raise_for_status()
subs_formatted = []
for sku in subs.json()['body']:

View File

@@ -422,9 +422,6 @@ DISPATCHER_DB_DOWNTIME_TOLERANCE = 40
# sqlite3 based tests will use this
DISPATCHER_MOCK_PUBLISH = False
# Debugging sockfile for the --status command
DISPATCHERD_DEBUGGING_SOCKFILE = os.path.join(BASE_DIR, 'dispatcherd.sock')
BROKER_URL = 'unix:///var/run/redis/redis.sock'
CELERYBEAT_SCHEDULE = {
'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},
@@ -449,17 +446,6 @@ CELERYBEAT_SCHEDULE = {
},
}
DISPATCHER_SCHEDULE = {}
for options in CELERYBEAT_SCHEDULE.values():
new_options = options.copy()
task_name = options['task']
# Handle the only one exception case of the heartbeat which has a new implementation
if task_name == 'awx.main.tasks.system.cluster_node_heartbeat':
task_name = 'awx.main.tasks.system.adispatch_cluster_node_heartbeat'
new_options['task'] = task_name
new_options['schedule'] = options['schedule'].total_seconds()
DISPATCHER_SCHEDULE[task_name] = new_options
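The loop above translates Celery-style entries into dispatcherd's format: timedelta schedules become plain seconds, and the heartbeat task name is swapped for its dispatcherd-specific implementation. A worked sketch with a hypothetical entry:

```python
from datetime import timedelta

beat_entry = {'task': 'awx.main.tasks.system.cluster_node_heartbeat', 'schedule': timedelta(seconds=60)}
new_entry = beat_entry.copy()
if new_entry['task'] == 'awx.main.tasks.system.cluster_node_heartbeat':
    new_entry['task'] = 'awx.main.tasks.system.adispatch_cluster_node_heartbeat'
new_entry['schedule'] = beat_entry['schedule'].total_seconds()
assert new_entry['schedule'] == 60.0
```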
# Django Caching Configuration
DJANGO_REDIS_IGNORE_EXCEPTIONS = True
CACHES = {'default': {'BACKEND': 'awx.main.cache.AWXRedisCache', 'LOCATION': 'unix:///var/run/redis/redis.sock?db=1'}}
@@ -809,7 +795,6 @@ LOGGING = {
'social': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'},
'system_tracking_migrations': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'},
'rbac_migrations': {'handlers': ['console', 'file', 'tower_warnings'], 'level': 'DEBUG'},
'dispatcherd': {'handlers': ['dispatcher', 'console'], 'level': 'INFO'},
},
}
@@ -1009,7 +994,7 @@ HOST_METRIC_SUMMARY_TASK_INTERVAL = 7 # days
# projects can take advantage.
METRICS_SERVICE_CALLBACK_RECEIVER = 'callback_receiver'
METRICS_SERVICE_DISPATCHER = 'dispatcherd'
METRICS_SERVICE_DISPATCHER = 'dispatcher'
METRICS_SERVICE_WEBSOCKETS = 'websockets'
METRICS_SUBSYSTEM_CONFIG = {
@@ -1092,6 +1077,10 @@ INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS = 3
# Older records will be cleaned up
INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS = 7
# setting for Policy as Code feature
FEATURE_POLICY_AS_CODE_ENABLED = False
OPA_HOST = '' # The hostname used to connect to the OPA server. If empty, policy enforcement will be disabled.
OPA_PORT = 8181 # The port used to connect to the OPA server. Defaults to 8181.
OPA_SSL = False # Enable or disable the use of SSL to connect to the OPA server. Defaults to false.
@@ -1109,10 +1098,5 @@ OPA_REQUEST_RETRIES = 2 # The number of retry attempts for connecting to the OP
FLAG_SOURCES = ('flags.sources.SettingsFlagsSource',)
FLAGS = {
'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}],
'FEATURE_DISPATCHERD_ENABLED': [{'condition': 'boolean', 'value': False}],
'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}],
}
# Dispatcher worker lifetime. If set to None, workers will never be retired
# based on age. Note workers will finish their last task before retiring if
# they are busy when they reach retirement age.
WORKER_MAX_LIFETIME_SECONDS = 14400 # seconds

View File

@@ -73,5 +73,4 @@ AWX_DISABLE_TASK_MANAGERS = False
def set_dev_flags(settings):
defaults_flags = settings.get("FLAGS", {})
defaults_flags['FEATURE_INDIRECT_NODE_COUNTING_ENABLED'] = [{'condition': 'boolean', 'value': True}]
defaults_flags['FEATURE_DISPATCHERD_ENABLED'] = [{'condition': 'boolean', 'value': True}]
return {'FLAGS': defaults_flags}

View File

@@ -23,13 +23,8 @@ ALLOWED_HOSTS = []
# only used for deprecated fields and management commands for them
BASE_VENV_PATH = os.path.realpath("/var/lib/awx/venv")
# Switch to a writable location for the dispatcher sockfile location
DISPATCHERD_DEBUGGING_SOCKFILE = os.path.realpath('/var/run/tower/dispatcherd.sock')
# Very important that this is editable (not read_only) in the API
AWX_ISOLATION_SHOW_PATHS = [
'/etc/pki/ca-trust:/etc/pki/ca-trust:O',
'/usr/share/pki:/usr/share/pki:O',
]
del os

View File

@@ -87,7 +87,7 @@ ui/src/webpack: $(UI_DIR)/src/node_modules/webpack
## True target for ui/src/webpack.
$(UI_DIR)/src/node_modules/webpack:
@echo "=== Installing webpack ==="
@cd $(UI_DIR)/src && n 18 && npm install webpack
@cd $(UI_DIR)/src && npm install webpack
.PHONY: clean/ui
## Clean ui

View File

@@ -263,7 +263,3 @@ plugin_routing:
removal_date: '2022-01-23'
warning_text: The tower_* modules have been deprecated, use awx.awx.workflow_node_wait instead.
redirect: awx.awx.workflow_node_wait
role:
deprecation:
removal_version: '25.0.0'
warning_text: This is replaced by the DAB role system, via the role_definition module.

View File

@@ -19,28 +19,19 @@ options:
- If value not set, will try environment variable C(CONTROLLER_HOST) and then config files
- If value not specified by any means, the value of C(127.0.0.1) will be used
type: str
aliases: [ tower_host, aap_hostname ]
aliases: [ tower_host ]
controller_username:
description:
- Username for your controller instance.
- If value not set, will try environment variable C(CONTROLLER_USERNAME) and then config files
type: str
aliases: [ tower_username, aap_username ]
aliases: [ tower_username ]
controller_password:
description:
- Password for your controller instance.
- If value not set, will try environment variable C(CONTROLLER_PASSWORD) and then config files
type: str
aliases: [ tower_password , aap_password ]
aap_token:
description:
- The OAuth token to use.
- This value can be in one of two formats.
- A string which is the token itself. (i.e. bqV5txm97wqJqtkxlMkhQz0pKhRMMX)
- A dictionary structure as returned by the token module.
- If value not set, will try environment variable C(CONTROLLER_OAUTH_TOKEN) and then config files
type: raw
version_added: "3.7.0"
aliases: [ tower_password ]
validate_certs:
description:
- Whether to allow insecure connections to AWX.
@@ -48,13 +39,12 @@ options:
- This should only be used on personally controlled sites using self-signed certificates.
- If value not set, will try environment variable C(CONTROLLER_VERIFY_SSL) and then config files
type: bool
aliases: [ tower_verify_ssl, aap_validate_certs ]
aliases: [ tower_verify_ssl ]
request_timeout:
description:
- Specify the timeout Ansible should use in requests to the controller host.
- Defaults to 10s, but this is handled by the shared module_utils code
type: float
aliases: [ aap_request_timeout ]
controller_config_file:
description:
- Path to the controller config file.

View File

@@ -17,38 +17,32 @@ options:
description: The network address of your Automation Platform Controller host.
env:
- name: CONTROLLER_HOST
- name: TOWER_HOST
deprecated:
collection_name: 'awx.awx'
version: '4.0.0'
why: Collection name change
alternatives: 'TOWER_HOST, AAP_HOSTNAME'
alternatives: 'CONTROLLER_HOST'
username:
description: The user that you plan to use to access inventories on the controller.
env:
- name: CONTROLLER_USERNAME
- name: TOWER_USERNAME
deprecated:
collection_name: 'awx.awx'
version: '4.0.0'
why: Collection name change
alternatives: 'TOWER_USERNAME, AAP_USERNAME'
alternatives: 'CONTROLLER_USERNAME'
password:
description: The password for your controller user.
env:
- name: CONTROLLER_PASSWORD
- name: TOWER_PASSWORD
deprecated:
collection_name: 'awx.awx'
version: '4.0.0'
why: Collection name change
alternatives: 'TOWER_PASSWORD, AAP_PASSWORD'
aap_token:
description:
- The OAuth token to use.
env:
- name: AAP_TOKEN
deprecated:
collection_name: 'awx.awx'
version: '4.0.0'
why: Collection name change
alternatives: 'CONTROLLER_PASSWORD'
verify_ssl:
description:
- Specify whether Ansible should verify the SSL certificate of the controller host.
@@ -56,11 +50,12 @@ options:
type: bool
env:
- name: CONTROLLER_VERIFY_SSL
- name: TOWER_VERIFY_SSL
deprecated:
collection_name: 'awx.awx'
version: '4.0.0'
why: Collection name change
alternatives: 'TOWER_VERIFY_SSL, AAP_VALIDATE_CERTS'
alternatives: 'CONTROLLER_VERIFY_SSL'
aliases: [ validate_certs ]
request_timeout:
description:
@@ -70,12 +65,7 @@ options:
type: float
env:
- name: CONTROLLER_REQUEST_TIMEOUT
deprecated:
collection_name: 'awx.awx'
version: '4.0.0'
why: Support for AAP variables
alternatives: 'AAP_REQUEST_TIMEOUT'
aliases: [ aap_request_timeout ]
notes:
- If no I(config_file) is provided we will attempt to use the tower-cli library
defaults to find your host information.

View File

@@ -15,6 +15,7 @@ from ansible.module_utils.six.moves.configparser import ConfigParser, NoOptionEr
from base64 import b64encode
from socket import getaddrinfo, IPPROTO_TCP
import time
import re
from json import loads, dumps
from os.path import isfile, expanduser, split, join, exists, isdir
from os import access, R_OK, getcwd, environ, getenv
@@ -49,40 +50,12 @@ class ItemNotDefined(Exception):
class ControllerModule(AnsibleModule):
url = None
AUTH_ARGSPEC = dict(
controller_host=dict(
required=False,
aliases=['tower_host', 'aap_hostname'],
fallback=(env_fallback, ['CONTROLLER_HOST', 'TOWER_HOST', 'AAP_HOSTNAME'])),
controller_username=dict(
required=False,
aliases=['tower_username', 'aap_username'],
fallback=(env_fallback, ['CONTROLLER_USERNAME', 'TOWER_USERNAME', 'AAP_USERNAME'])),
controller_password=dict(
no_log=True,
aliases=['tower_password', 'aap_password'],
required=False,
fallback=(env_fallback, ['CONTROLLER_PASSWORD', 'TOWER_PASSWORD', 'AAP_PASSWORD'])),
validate_certs=dict(
type='bool',
aliases=['tower_verify_ssl', 'aap_validate_certs'],
required=False,
fallback=(env_fallback, ['CONTROLLER_VERIFY_SSL', 'TOWER_VERIFY_SSL', 'AAP_VALIDATE_CERTS'])),
request_timeout=dict(
type='float',
aliases=['aap_request_timeout'],
required=False,
fallback=(env_fallback, ['CONTROLLER_REQUEST_TIMEOUT', 'AAP_REQUEST_TIMEOUT'])),
aap_token=dict(
type='raw',
no_log=True,
required=False,
fallback=(env_fallback, ['CONTROLLER_OAUTH_TOKEN', 'TOWER_OAUTH_TOKEN', 'AAP_TOKEN'])
),
controller_config_file=dict(
type='path',
aliases=['tower_config_file'],
required=False,
default=None),
controller_host=dict(required=False, aliases=['tower_host'], fallback=(env_fallback, ['CONTROLLER_HOST', 'TOWER_HOST'])),
controller_username=dict(required=False, aliases=['tower_username'], fallback=(env_fallback, ['CONTROLLER_USERNAME', 'TOWER_USERNAME'])),
controller_password=dict(no_log=True, aliases=['tower_password'], required=False, fallback=(env_fallback, ['CONTROLLER_PASSWORD', 'TOWER_PASSWORD'])),
validate_certs=dict(type='bool', aliases=['tower_verify_ssl'], required=False, fallback=(env_fallback, ['CONTROLLER_VERIFY_SSL', 'TOWER_VERIFY_SSL'])),
request_timeout=dict(type='float', required=False, fallback=(env_fallback, ['CONTROLLER_REQUEST_TIMEOUT'])),
controller_config_file=dict(type='path', aliases=['tower_config_file'], required=False, default=None),
)
# Associations of these types are ordered and have special consideration in the modified associations function
ordered_associations = ['instance_groups', 'galaxy_credentials', 'input_inventories']
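Both versions of the argspec above lean on `env_fallback` to source values from the environment when a module argument is omitted. As background, here is a minimal sketch mirroring `ansible.module_utils.basic.env_fallback` (the exception class is a simplified stand-in for Ansible's own):

```python
import os

class AnsibleFallbackNotFound(Exception):
    # Stand-in for the exception Ansible raises when no fallback matches.
    pass

def env_fallback(*args, **kwargs):
    # Return the value of the first environment variable in `args` that
    # is set; the argspec machinery treats AnsibleFallbackNotFound as
    # "no value supplied".
    for arg in args:
        if arg in os.environ:
            return os.environ[arg]
    raise AnsibleFallbackNotFound

# With the argspec above, CONTROLLER_HOST is consulted before
# TOWER_HOST, so the newer variable wins when both are set.
```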
@@ -133,7 +106,7 @@ class ControllerModule(AnsibleModule):
setattr(self, short_param, direct_value)
# Perform some basic validation
if not self.host.startswith(("https://", "http://")): # NOSONAR
if not re.match('^https{0,1}://', self.host):
self.host = "https://{0}".format(self.host)
# Try to parse the hostname as a url
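The two prefix checks swapped in this hunk are interchangeable for well-formed input: `{0,1}` is the `?` quantifier spelled out, and `re.match` already anchors at the start of the string, making the `^` redundant. A standalone sketch of the scheme-defaulting behavior (hostnames here are hypothetical):

```python
import re

def ensure_scheme(host):
    # Mirrors the module_utils logic: prepend https:// when no
    # http/https scheme is present at the start of the string.
    if not re.match('^https{0,1}://', host):
        host = "https://{0}".format(host)
    return host

# Hypothetical examples, not from the source:
assert ensure_scheme("awx.example.com") == "https://awx.example.com"
assert ensure_scheme("http://awx.example.com") == "http://awx.example.com"
assert ensure_scheme("https://awx.example.com") == "https://awx.example.com"
```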

View File

@@ -230,11 +230,7 @@ def main():
inventory_object = module.get_one('inventories', name_or_id=inventory, data=lookup_data)
if not inventory_object:
# if the inventory does not exist, then it can't have sources.
if state == 'absent':
module.exit_json(**module.json_output)
else:
module.fail_json(msg='The specified inventory, {0}, was not found.'.format(lookup_data))
module.fail_json(msg='The specified inventory, {0}, was not found.'.format(lookup_data))
inventory_source_object = module.get_one(
'inventory_sources',

View File

@@ -17,10 +17,6 @@ DOCUMENTATION = '''
module: role
author: "Wayne Witzel III (@wwitzel3)"
short_description: grant or revoke an Automation Platform Controller role.
deprecated:
removed_in: '25.0.0'
why: Endpoints corresponding to module will be removed in the API
alternative: Use the role_user_assignment and role_team_assignment modules instead.
description:
- Roles are used for access control; this module is for managing user access to server resources.
- Grant or revoke Automation Platform Controller roles to users. See U(https://www.ansible.com/tower) for an overview.

View File

@@ -166,7 +166,7 @@ EXAMPLES = '''
name: "{{ sched1 }}"
state: present
unified_job_template: "Demo Job Template"
rrule: "{{ query('awx.awx.schedule_rrule', 'week', start_date='2019-12-19 13:05:51') | first }}"
rrule: "{{ query('awx.awx.schedule_rrule', 'week', start_date='2019-12-19 13:05:51') }}"
register: result
- name: Build a complex schedule for every day except sunday using the rruleset plugin
@@ -174,14 +174,14 @@ EXAMPLES = '''
name: "{{ sched1 }}"
state: present
unified_job_template: "Demo Job Template"
rrule: "{{ query(awx.awx.schedule_rruleset, '2022-04-30 10:30:45', rules=rrules, timezone='UTC' ) | first }}"
rrule: "{{ query(awx.awx.schedule_rruleset, '2022-04-30 10:30:45', rules=rrules, timezone='UTC' ) }}"
vars:
rrules:
- frequency: 'day'
interval: 1
every: 1
- frequency: 'day'
interval: 1
byweekday: 'sunday'
every: 1
on_days: 'sunday'
include: false
- name: Delete 'my_schedule' schedule for my_workflow
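Background on the `| first` filter appearing and disappearing in these examples: Ansible's `query()` always returns a list, while `lookup()` returns a string, so `| first` is what reduces the single-element query result to the plain rrule string the schedule's `rrule` field expects. A trivial sketch of the shapes involved (the value is hypothetical):

```python
# What query('awx.awx.schedule_rrule', ...) hands back: a list.
query_result = ["DTSTART;TZID=UTC:20191219T130551 RRULE:FREQ=WEEKLY;INTERVAL=1"]

# What `| first` selects from it: the bare rrule string.
rrule = query_result[0]
assert isinstance(rrule, str)
```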

View File

@@ -320,13 +320,9 @@ def main():
wfjt_search_fields['organization'] = organization_id
wfjt_data = module.get_one('workflow_job_templates', name_or_id=workflow_job_template, **{'data': wfjt_search_fields})
if wfjt_data is None:
if state == 'absent':
# if the workflow doesn't exist, it can't have workflow nodes.
module.exit_json(**module.json_output)
else:
module.fail_json(
msg="The workflow {0} in organization {1} was not found on the controller instance server".format(workflow_job_template, organization)
)
module.fail_json(
msg="The workflow {0} in organization {1} was not found on the controller instance server".format(workflow_job_template, organization)
)
workflow_job_template_id = wfjt_data['id']
search_fields['workflow_job_template'] = new_fields['workflow_job_template'] = workflow_job_template_id

View File

@@ -93,9 +93,6 @@ needs_development = ['inventory_script', 'instance']
needs_param_development = {
'host': ['instance_id'],
'workflow_approval': ['description', 'execution_environment'],
'inventory': ['opa_query_path'],
'job_template': ['opa_query_path'],
'organization': ['opa_query_path'],
}
# -----------------------------------------------------------------------------------------------------------

View File

@@ -121,22 +121,6 @@
that:
- "result is changed"
- name: Attempt to delete an inventory source from an inventory that does not exist
inventory_source:
name: "{{ inv_source3 }}"
source: scm
state: absent
source_project: "{{ project_name }}"
source_path: inventories/create_100_hosts.ini
description: Source for Test inventory
organization: Default
inventory: Does not exist
register: result
- assert:
that:
- "result is not changed"
always:
- name: Delete Inventory
inventory:

View File

@@ -1,12 +0,0 @@
---
- name: Perform an action with a different hostname via aap_hostname
inventory:
name: "Demo Inventory"
organization: Default
aap_hostname: https://foohostbar.invalid
ignore_errors: true
register: result
- assert:
that:
- "'foohostbar' in result.msg"

View File

@@ -428,18 +428,6 @@
that:
- "results is changed"
- name: Remove a node from a workflow that does not exist.
workflow_job_template_node:
identifier: root
unified_job_template: "{{ jt1_name }}"
workflow: Does not exist
state: absent
register: results
- assert:
that:
- "results is not changed"
- name: Create root node
workflow_job_template_node:
identifier: root

View File

@@ -40,6 +40,14 @@ class HelpfulArgumentParser(ArgumentParser):
self._print_message('\n')
self.exit(2, '%s: %s\n' % (self.prog, message))
def _parse_known_args(self, args, ns):
for arg in ('-h', '--help'):
# the -h argument is extraneous; if you leave it off,
# awx-cli will just print usage info
if arg in args:
args.remove(arg)
return super(HelpfulArgumentParser, self)._parse_known_args(args, ns)
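A self-contained sketch of the same override pattern (the class name is hypothetical): discard `-h`/`--help` before the stock argparse machinery sees them, so parsing falls through to the parser's normal missing-argument handling, which prints the short usage text instead of argparse's long-form help.

```python
from argparse import ArgumentParser

class StripHelpParser(ArgumentParser):
    # Hypothetical stand-in for HelpfulArgumentParser: drop -h/--help
    # from the raw argument list before delegating to argparse.
    # *extra absorbs the `intermixed` parameter newer Pythons pass.
    def _parse_known_args(self, args, ns, *extra):
        for arg in ('-h', '--help'):
            if arg in args:
                args.remove(arg)
        return super(StripHelpParser, self)._parse_known_args(args, ns, *extra)

parser = StripHelpParser(prog='awx-cli-demo')
parser.add_argument('resource')
# `awx-cli-demo --help` now behaves like `awx-cli-demo` with no
# arguments: parsing fails on the missing positional and the parser
# prints usage through its error() path.
```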
def color_enabled():
return _color.enabled

View File

@@ -20,20 +20,6 @@ In this document, we will go into a bit of detail about how and when AWX runs Py
- Every node in an AWX cluster runs a periodic task that serves as
a heartbeat and capacity check
Transition to dispatcherd Library
---------------------------------
The task system logic is being split out into a new library:
https://github.com/ansible/dispatcherd
Currently AWX is in a transitional period where this is gated behind a feature flag.
The difference can be seen in how the task decorator is imported.
- old `from awx.main.dispatch.publish import task`
- transition `from awx.main.dispatch.publish import task as task_awx`
- new `from dispatcherd.publish import task`
Tasks, Queues and Workers
-------------------------
@@ -74,7 +60,7 @@ Defining and Running Tasks
Tasks are defined in AWX's source code and generally live in the
`awx.main.tasks` module. Tasks can be defined as simple functions:
from awx.main.dispatch.publish import task as task_awx
from awx.main.dispatch.publish import task
@task()
def add(a, b):
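The hunk cuts the example off at the function signature; a minimal completed sketch follows. The body and the `apply_async` call are assumptions based on the surrounding task-system docs, not part of this hunk:

```python
from awx.main.dispatch.publish import task

@task()
def add(a, b):
    # Any function registered this way can run in a dispatcher worker.
    return a + b

# Called directly, it behaves like a plain function:
add(1, 1)

# Published to the dispatcher queue instead (assumed API per these docs):
add.apply_async([1, 1])
```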

View File

@@ -1,202 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Binary file not shown.

Binary file not shown.

View File

@@ -14,7 +14,7 @@ cryptography<42.0.0 # investigation is needed for 42+ to work with OpenSSL v3.0
Cython
daphne
distro
django==4.2.21 # CVE-2025-32873
django==4.2.20 # CVE-2025-26699
django-cors-headers
django-crum
django-extensions
@@ -70,4 +70,3 @@ setuptools_scm[toml] # see UPGRADE BLOCKERs, xmlsec build dep
setuptools-rust>=0.11.4 # cryptography build dep
pkgconfig>=1.5.1 # xmlsec build dep - needed for offline build
django-flags>=5.0.13
dispatcherd # tasking system, previously part of AWX code base

View File

@@ -128,11 +128,9 @@ deprecated==1.2.15
# opentelemetry-exporter-otlp-proto-http
# opentelemetry-semantic-conventions
# pygithub
dispatcherd==2025.5.21
# via -r /awx_devel/requirements/requirements.in
distro==1.9.0
# via -r /awx_devel/requirements/requirements.in
django==4.2.21
django==4.2.20
# via
# -r /awx_devel/requirements/requirements.in
# channels
@@ -368,7 +366,7 @@ protobuf==5.29.3
# opentelemetry-proto
psutil==6.1.1
# via -r /awx_devel/requirements/requirements.in
psycopg==3.2.6
psycopg==3.2.3
# via -r /awx_devel/requirements/requirements.in
ptyprocess==0.7.0
# via pexpect
@@ -427,7 +425,6 @@ pyyaml==6.0.2
# via
# -r /awx_devel/requirements/requirements.in
# ansible-runner
# dispatcherd
# djangorestframework-yaml
# kubernetes
# receptorctl

View File

@@ -1,12 +1,12 @@
#!/bin/bash
set -euo pipefail
set +x
cd /awx_devel
make clean
make awx-link
if [[ $# -eq 0 ]]; then
if [[ ! $@ ]]; then
make test
else
make "$@"
make $@
fi