Compare commits

..

16 Commits

Author SHA1 Message Date
thedoubl3j
0377b3830b Update operator timeout
* updated the operator timeout to be near the healthy run time
2026-01-23 10:39:55 -05:00
Jake Jackson
331ae92475 Merge branch 'devel' into move_to_dispatcherd 2026-01-23 10:06:30 -05:00
Jake Jackson
e355df6cc6 Merge branch 'devel' into move_to_dispatcherd 2026-01-22 11:03:45 -05:00
thedoubl3j
806ef7c345 Fix attribute error in server logs
* on a secret hunt to find the hidden attribute error in the server logs
2026-01-20 16:08:46 -05:00
thedoubl3j
8acdd0cbf4 Fix imports and linter findings
* add back more missing things
2026-01-20 15:41:33 -05:00
thedoubl3j
381c7fdc5d Adjust heartbeat arg and more formatting
* fixed the call to cluster_node_heartbeat that was missing its binder
* formatting/linter fixes
2026-01-20 15:21:23 -05:00
thedoubl3j
d75fcc13f6 Fix dispatcher run call and remove dispatch setting
* added back some code that was lost in the merge conflict
* remove dispatcher mock publish setting
2026-01-20 14:38:54 -05:00
thedoubl3j
bb8ecc5919 Add back hazmat for config and remove baseworker
* added back hazmat per @alancoding feedback around config
* removed baseworker completely and refactored it into the callback
  worker
2026-01-19 20:33:23 -05:00
thedoubl3j
1019ac0439 Update function comments 2026-01-19 20:30:41 -05:00
thedoubl3j
cddee29f23 More chainsaw work
* fixed imports and addressed the cluster_node_heartbeat test
* took a chainsaw to task.py as well
2026-01-19 20:30:41 -05:00
thedoubl3j
3b896a00a9 Clean up imports and fix some tests
* removed unused imports
* adjusted test import to pull correct method
2026-01-19 20:30:41 -05:00
thedoubl3j
e386326498 Remove control and hazmat (squash this, not done)
* moved status out and deleted control as no longer needed
* removed hazmat
2026-01-19 20:30:41 -05:00
thedoubl3j
5209bfcf82 Add back auto_max_workers
* added back get_auto_max_workers into common utils
* formatting edits
2026-01-19 20:30:07 -05:00
thedoubl3j
ebd51cd074 Keep callback receiver working
* remove any code that is not used by the callback receiver
2026-01-19 20:26:04 -05:00
thedoubl3j
f9f4bf2d1a Add decorator
* moved to the dispatcherd task decorator
* updated as many tasks as I could find
2026-01-19 20:26:04 -05:00
thedoubl3j
e55578b64e WIP First pass
* started removing feature flags and adjusting logic
* WIP
2026-01-19 20:26:04 -05:00
76 changed files with 427 additions and 1494 deletions
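
For orientation before the per-file diffs: both entry points touched in this comparison (awx-manage run_dispatcher and awx-manage dispatcherd) build a dynamic config and hand it to the dispatcherd library. A minimal sketch of that shared startup flow, assembled only from calls that appear in the diffs below and assuming an initialized AWX/Django environment (the wrapper function name is illustrative):

# Illustrative wrapper; both management commands shown further down inline this flow.
from django.core.cache import cache as django_cache
from django.db import connection

from dispatcherd import run_service
from dispatcherd.config import setup as dispatcher_setup

from awx.main.dispatch.config import get_dispatcherd_config


def start_task_service():
    # Build the dynamic config (brokers, worker pool, pg_notify channels) from settings.
    config = get_dispatcherd_config(for_service=True)
    # Close sync DB/cache handles; the pg_notify broker opens its own async connection.
    connection.close()
    django_cache.close()
    dispatcher_setup(config)
    run_service()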

View File

@@ -112,27 +112,25 @@ jobs:
path: reports/coverage.xml
retention-days: 5
- name: >-
Upload ${{
matrix.tests.coverage-upload-name || 'awx'
}} jUnit test reports to the unified dashboard
- name: Upload awx jUnit test reports
if: >-
!cancelled()
&& steps.make-run.outputs.test-result-files != ''
&& github.event_name == 'push'
&& env.UPSTREAM_REPOSITORY_ID == github.repository_id
&& github.ref_name == github.event.repository.default_branch
uses: ansible/gh-action-record-test-results@cd5956ead39ec66351d0779470c8cff9638dd2b8
with:
aggregation-server-url: ${{ vars.PDE_ORG_RESULTS_AGGREGATOR_UPLOAD_URL }}
http-auth-password: >-
${{ secrets.PDE_ORG_RESULTS_UPLOAD_PASSWORD }}
http-auth-username: >-
${{ vars.PDE_ORG_RESULTS_AGGREGATOR_UPLOAD_USER }}
project-component-name: >-
${{ matrix.tests.coverage-upload-name || 'awx' }}
test-result-files: >-
${{ steps.make-run.outputs.test-result-files }}
run: |
for junit_file in $(echo '${{ steps.make-run.outputs.test-result-files }}' | sed 's/,/ /')
do
curl \
-v \
--user "${{ vars.PDE_ORG_RESULTS_AGGREGATOR_UPLOAD_USER }}:${{ secrets.PDE_ORG_RESULTS_UPLOAD_PASSWORD }}" \
--form "xunit_xml=@${junit_file}" \
--form "component_name=${{ matrix.tests.coverage-upload-name || 'awx' }}" \
--form "git_commit_sha=${{ github.sha }}" \
--form "git_repository_url=https://github.com/${{ github.repository }}" \
"${{ vars.PDE_ORG_RESULTS_AGGREGATOR_UPLOAD_URL }}/api/results/upload/"
done
dev-env:
runs-on: ubuntu-latest
@@ -296,16 +294,18 @@ jobs:
&& github.event_name == 'push'
&& env.UPSTREAM_REPOSITORY_ID == github.repository_id
&& github.ref_name == github.event.repository.default_branch
uses: ansible/gh-action-record-test-results@cd5956ead39ec66351d0779470c8cff9638dd2b8
with:
aggregation-server-url: ${{ vars.PDE_ORG_RESULTS_AGGREGATOR_UPLOAD_URL }}
http-auth-password: >-
${{ secrets.PDE_ORG_RESULTS_UPLOAD_PASSWORD }}
http-auth-username: >-
${{ vars.PDE_ORG_RESULTS_AGGREGATOR_UPLOAD_USER }}
project-component-name: awx
test-result-files: >-
${{ steps.make-run.outputs.test-result-files }}
run: |
for junit_file in $(echo '${{ steps.make-run.outputs.test-result-files }}' | sed 's/,/ /')
do
curl \
-v \
--user "${{ vars.PDE_ORG_RESULTS_AGGREGATOR_UPLOAD_USER }}:${{ secrets.PDE_ORG_RESULTS_UPLOAD_PASSWORD }}" \
--form "xunit_xml=@${junit_file}" \
--form "component_name=awx" \
--form "git_commit_sha=${{ github.sha }}" \
--form "git_repository_url=https://github.com/${{ github.repository }}" \
"${{ vars.PDE_ORG_RESULTS_AGGREGATOR_UPLOAD_URL }}/api/results/upload/"
done
collection-integration:
name: awx_collection integration

View File

@@ -1,6 +1,6 @@
-include awx/ui/Makefile
PYTHON := $(notdir $(shell for i in python3.12 python3.11 python3; do command -v $$i; done|sed 1q))
PYTHON := $(notdir $(shell for i in python3.12 python3; do command -v $$i; done|sed 1q))
SHELL := bash
DOCKER_COMPOSE ?= docker compose
OFFICIAL ?= no
@@ -289,7 +289,7 @@ dispatcher:
@if [ "$(VENV_BASE)" ]; then \
. $(VENV_BASE)/awx/bin/activate; \
fi; \
$(PYTHON) manage.py dispatcherd
$(PYTHON) manage.py run_dispatcher
## Run to start the zeromq callback receiver
receiver:

View File

@@ -111,7 +111,7 @@ class UnifiedJobEventPagination(Pagination):
def __init__(self, *args, **kwargs):
self.use_limit_paginator = False
self.limit_pagination = LimitPagination()
super().__init__(*args, **kwargs)
return super().__init__(*args, **kwargs)
def paginate_queryset(self, queryset, request, view=None):
if 'limit' in request.query_params:

View File

@@ -9,50 +9,6 @@ from drf_spectacular.views import (
)
def filter_credential_type_schema(
result,
generator, # NOSONAR
request, # NOSONAR
public, # NOSONAR
):
"""
Postprocessing hook to filter CredentialType kind enum values.
For CredentialTypeRequest and PatchedCredentialTypeRequest schemas (POST/PUT/PATCH),
filter the 'kind' enum to only show 'cloud' and 'net' values.
This ensures the OpenAPI schema accurately reflects that only 'cloud' and 'net'
credential types can be created or modified via the API, matching the validation
in CredentialTypeSerializer.validate().
Args:
result: The OpenAPI schema dict to be modified
generator, request, public: Required by drf-spectacular interface (unused)
Returns:
The modified OpenAPI schema dict
"""
schemas = result.get('components', {}).get('schemas', {})
# Filter CredentialTypeRequest (POST/PUT) - field is required
if 'CredentialTypeRequest' in schemas:
kind_prop = schemas['CredentialTypeRequest'].get('properties', {}).get('kind', {})
if 'enum' in kind_prop:
# Filter to only cloud and net (no None - field is required)
kind_prop['enum'] = ['cloud', 'net']
kind_prop['description'] = "* `cloud` - Cloud\\n* `net` - Network"
# Filter PatchedCredentialTypeRequest (PATCH) - field is optional
if 'PatchedCredentialTypeRequest' in schemas:
kind_prop = schemas['PatchedCredentialTypeRequest'].get('properties', {}).get('kind', {})
if 'enum' in kind_prop:
# Filter to only cloud and net (None allowed - field can be omitted in PATCH)
kind_prop['enum'] = ['cloud', 'net', None]
kind_prop['description'] = "* `cloud` - Cloud\\n* `net` - Network"
return result
class CustomAutoSchema(AutoSchema):
"""Custom AutoSchema to add swagger_topic to tags and handle deprecated endpoints."""

View File

@@ -1230,7 +1230,7 @@ class OrganizationSerializer(BaseSerializer, OpaQueryPathMixin):
# to a team. This provides a hint to the ui so it can know to not
# display these roles for team role selection.
for key in ('admin_role', 'member_role'):
if summary_dict and key in summary_dict.get('object_roles', {}):
if key in summary_dict.get('object_roles', {}):
summary_dict['object_roles'][key]['user_only'] = True
return summary_dict
@@ -2165,13 +2165,13 @@ class BulkHostDeleteSerializer(serializers.Serializer):
attrs['hosts_data'] = attrs['host_qs'].values()
if len(attrs['host_qs']) == 0:
error_hosts = dict.fromkeys(attrs['hosts'], "Hosts do not exist or you lack permission to delete it")
error_hosts = {host: "Hosts do not exist or you lack permission to delete it" for host in attrs['hosts']}
raise serializers.ValidationError({'hosts': error_hosts})
if len(attrs['host_qs']) < len(attrs['hosts']):
hosts_exists = [host['id'] for host in attrs['hosts_data']]
failed_hosts = list(set(attrs['hosts']).difference(hosts_exists))
error_hosts = dict.fromkeys(failed_hosts, "Hosts do not exist or you lack permission to delete it")
error_hosts = {host: "Hosts do not exist or you lack permission to delete it" for host in failed_hosts}
raise serializers.ValidationError({'hosts': error_hosts})
# Getting all inventories that the hosts can be in
@@ -3527,7 +3527,7 @@ class JobRelaunchSerializer(BaseSerializer):
choices=NEW_JOB_TYPE_CHOICES,
write_only=True,
)
credential_passwords = VerbatimField(required=False, write_only=True)
credential_passwords = VerbatimField(required=True, write_only=True)
class Meta:
model = Job

View File

@@ -1,6 +1,6 @@
{% if content_only %}<div class="nocode ansi_fore ansi_back{% if dark %} ansi_dark{% endif %}">{% else %}
<!DOCTYPE HTML>
<html lang="en">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8">
<title>{{ title }}</title>

View File

@@ -52,7 +52,6 @@ from ansi2html import Ansi2HTMLConverter
from datetime import timezone as dt_timezone
from wsgiref.util import FileWrapper
from drf_spectacular.utils import extend_schema_view, extend_schema
# django-ansible-base
from ansible_base.lib.utils.requests import get_remote_hosts
@@ -379,10 +378,6 @@ class DashboardJobsGraphView(APIView):
class InstanceList(ListCreateAPIView):
"""
Creates an instance if used on a Kubernetes or OpenShift deployment of Ansible Automation Platform.
"""
name = _("Instances")
model = models.Instance
serializer_class = serializers.InstanceSerializer
@@ -1459,7 +1454,7 @@ class CredentialList(ListCreateAPIView):
@extend_schema_if_available(
extensions={
"x-ai-description": "Create a new credential. The `inputs` field contain type-specific input fields. The required fields depend on related `credential_type`. Use GET /v2/credential_types/{id}/ (tool name: controller.credential_types_retrieve) and inspect `inputs` field for the specific credential type's expected schema. The fields `user` and `team` are deprecated and should not be included in the payload."
"x-ai-description": "Create a new credential. The `inputs` field contain type-specific input fields. The required fields depend on related `credential_type`. Use GET /v2/credential_types/{id}/ (tool name: controller.credential_types_retrieve) and inspect `inputs` field for the specific credential type's expected schema."
}
)
def post(self, request, *args, **kwargs):
@@ -1608,11 +1603,7 @@ class CredentialExternalTest(SubDetailAPIView):
obj_permission_type = 'use'
resource_purpose = 'test external credential'
@extend_schema_if_available(extensions={"x-ai-description": """Test update the input values and metadata of an external credential.
This endpoint supports testing credentials that connect to external secret management systems
such as CyberArk AIM, CyberArk Conjur, HashiCorp Vault, AWS Secrets Manager, Azure Key Vault,
Centrify Vault, Thycotic DevOps Secrets Vault, and GitHub App Installation Access Token Lookup.
It does not support standard credential types such as Machine, SCM, and Cloud."""})
@extend_schema_if_available(extensions={"x-ai-description": "Test update the input values and metadata of an external credential"})
def post(self, request, *args, **kwargs):
obj = self.get_object()
backend_kwargs = {}
@@ -1626,16 +1617,13 @@ class CredentialExternalTest(SubDetailAPIView):
with set_environ(**settings.AWX_TASK_ENV):
obj.credential_type.plugin.backend(**backend_kwargs)
return Response({}, status=status.HTTP_202_ACCEPTED)
except requests.exceptions.HTTPError:
message = """Test operation is not supported for credential type {}.
This endpoint only supports credentials that connect to
external secret management systems such as CyberArk, HashiCorp
Vault, or cloud-based secret managers.""".format(obj.credential_type.kind)
return Response({'detail': message}, status=status.HTTP_400_BAD_REQUEST)
except requests.exceptions.HTTPError as exc:
message = 'HTTP {}'.format(exc.response.status_code)
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
except Exception as exc:
message = exc.__class__.__name__
exc_args = getattr(exc, 'args', [])
for a in exc_args:
args = getattr(exc, 'args', [])
for a in args:
if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
message = str(a.reason)
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
@@ -1693,8 +1681,8 @@ class CredentialTypeExternalTest(SubDetailAPIView):
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
except Exception as exc:
message = exc.__class__.__name__
args_exc = getattr(exc, 'args', [])
for a in args_exc:
args = getattr(exc, 'args', [])
for a in args:
if isinstance(getattr(a, 'reason', None), ConnectTimeoutError):
message = str(a.reason)
return Response({'inputs': message}, status=status.HTTP_400_BAD_REQUEST)
@@ -2481,11 +2469,6 @@ class JobTemplateDetail(RelatedJobsPreventDeleteMixin, RetrieveUpdateDestroyAPIV
resource_purpose = 'job template detail'
@extend_schema_view(
retrieve=extend_schema(
extensions={'x-ai-description': 'List job template launch criteria'},
)
)
class JobTemplateLaunch(RetrieveAPIView):
model = models.JobTemplate
obj_permission_type = 'start'
@@ -2494,9 +2477,6 @@ class JobTemplateLaunch(RetrieveAPIView):
resource_purpose = 'launch a job from a job template'
def update_raw_data(self, data):
"""
Use the ID of a job template to retrieve its launch details.
"""
try:
obj = self.get_object()
except PermissionDenied:
@@ -3330,11 +3310,6 @@ class WorkflowJobTemplateLabelList(JobTemplateLabelList):
resource_purpose = 'labels of a workflow job template'
@extend_schema_view(
retrieve=extend_schema(
extensions={'x-ai-description': 'List workflow job template launch criteria.'},
)
)
class WorkflowJobTemplateLaunch(RetrieveAPIView):
model = models.WorkflowJobTemplate
obj_permission_type = 'start'
@@ -3343,9 +3318,6 @@ class WorkflowJobTemplateLaunch(RetrieveAPIView):
resource_purpose = 'launch a workflow job from a workflow job template'
def update_raw_data(self, data):
"""
Use the ID of a workflow job template to retrieve its launch details.
"""
try:
obj = self.get_object()
except PermissionDenied:
@@ -3738,11 +3710,6 @@ class JobCancel(GenericCancelView):
return super().post(request, *args, **kwargs)
@extend_schema_view(
retrieve=extend_schema(
extensions={'x-ai-description': 'List job relaunch criteria'},
)
)
class JobRelaunch(RetrieveAPIView):
model = models.Job
obj_permission_type = 'start'
@@ -3750,7 +3717,6 @@ class JobRelaunch(RetrieveAPIView):
resource_purpose = 'relaunch a job'
def update_raw_data(self, data):
"""Use the ID of a job to retrieve data on retry attempts and necessary passwords."""
data = super(JobRelaunch, self).update_raw_data(data)
try:
obj = self.get_object()

View File

@@ -133,7 +133,7 @@ class WebhookReceiverBase(APIView):
@csrf_exempt
@extend_schema_if_available(extensions={"x-ai-description": "Receive a webhook event and trigger a job"})
def post(self, request, *args, **kwargs_in):
def post(self, request, *args, **kwargs):
# Ensure that the full contents of the request are captured for multiple uses.
request.body

View File

@@ -1,41 +0,0 @@
import http.client
import socket
import urllib.error
import urllib.request
import logging
from django.conf import settings
logger = logging.getLogger(__name__)
def get_dispatcherd_metrics(request):
metrics_cfg = settings.METRICS_SUBSYSTEM_CONFIG.get('server', {}).get(settings.METRICS_SERVICE_DISPATCHER, {})
host = metrics_cfg.get('host', 'localhost')
port = metrics_cfg.get('port', 8015)
metrics_filter = []
if request is not None and hasattr(request, "query_params"):
try:
nodes_filter = request.query_params.getlist("node")
except Exception:
nodes_filter = []
if nodes_filter and settings.CLUSTER_HOST_ID not in nodes_filter:
return ''
try:
metrics_filter = request.query_params.getlist("metric")
except Exception:
metrics_filter = []
if metrics_filter:
# Right now we have no way of filtering the dispatcherd metrics
# so just avoid getting in the way if another metric is filtered for
return ''
url = f"http://{host}:{port}/metrics"
try:
with urllib.request.urlopen(url, timeout=1.0) as response:
payload = response.read()
if not payload:
return ''
return payload.decode('utf-8')
except (urllib.error.URLError, UnicodeError, socket.timeout, TimeoutError, http.client.HTTPException) as exc:
logger.debug(f"Failed to collect dispatcherd metrics from {url}: {exc}")
return ''

View File

@@ -15,7 +15,6 @@ from rest_framework.request import Request
from awx.main.consumers import emit_channel_notification
from awx.main.utils import is_testing
from awx.main.utils.redis import get_redis_client
from .dispatcherd_metrics import get_dispatcherd_metrics
root_key = settings.SUBSYSTEM_METRICS_REDIS_KEY_PREFIX
logger = logging.getLogger('awx.main.analytics')
@@ -399,6 +398,11 @@ class DispatcherMetrics(Metrics):
SetFloatM('workflow_manager_recorded_timestamp', 'Unix timestamp when metrics were last recorded'),
SetFloatM('workflow_manager_spawn_workflow_graph_jobs_seconds', 'Time spent spawning workflow tasks'),
SetFloatM('workflow_manager_get_tasks_seconds', 'Time spent loading workflow tasks from db'),
# dispatcher subsystem metrics
SetIntM('dispatcher_pool_scale_up_events', 'Number of times local dispatcher scaled up a worker since startup'),
SetIntM('dispatcher_pool_active_task_count', 'Number of active tasks in the worker pool when last task was submitted'),
SetIntM('dispatcher_pool_max_worker_count', 'Highest number of workers in worker pool in last collection interval, about 20s'),
SetFloatM('dispatcher_availability', 'Fraction of time (in last collection interval) dispatcher was able to receive messages'),
]
def __init__(self, *args, **kwargs):
@@ -426,12 +430,8 @@ class CallbackReceiverMetrics(Metrics):
def metrics(request):
output_text = ''
output_text += DispatcherMetrics().generate_metrics(request)
output_text += CallbackReceiverMetrics().generate_metrics(request)
dispatcherd_metrics = get_dispatcherd_metrics(request)
if dispatcherd_metrics:
output_text += dispatcherd_metrics
for m in [DispatcherMetrics(), CallbackReceiverMetrics()]:
output_text += m.generate_metrics(request)
return output_text
@@ -481,6 +481,13 @@ class CallbackReceiverMetricsServer(MetricsServer):
super().__init__(settings.METRICS_SERVICE_CALLBACK_RECEIVER, registry)
class DispatcherMetricsServer(MetricsServer):
def __init__(self):
registry = CollectorRegistry(auto_describe=True)
registry.register(CustomToPrometheusMetricsCollector(DispatcherMetrics(metrics_have_changed=False)))
super().__init__(settings.METRICS_SERVICE_DISPATCHER, registry)
class WebsocketsMetricsServer(MetricsServer):
def __init__(self):
registry = CollectorRegistry(auto_describe=True)

View File

@@ -82,7 +82,7 @@ class MainConfig(AppConfig):
def configure_dispatcherd(self):
"""This implements the default configuration for dispatcherd
If running the tasking service like awx-manage dispatcherd,
If running the tasking service like awx-manage run_dispatcher,
some additional config will be applied on top of this.
This configuration provides the minimum such that code can submit
tasks to pg_notify to run those tasks.

View File

@@ -30,7 +30,7 @@ def get_dispatcherd_config(for_service: bool = False, mock_publish: bool = False
},
"main_kwargs": {"node_id": settings.CLUSTER_HOST_ID},
"process_manager_cls": "ForkServerManager",
"process_manager_kwargs": {"preload_modules": ['awx.main.dispatch.prefork']},
"process_manager_kwargs": {"preload_modules": ['awx.main.dispatch.hazmat']},
},
"brokers": {},
"publish": {},
@@ -38,8 +38,8 @@ def get_dispatcherd_config(for_service: bool = False, mock_publish: bool = False
}
if mock_publish:
config["brokers"]["dispatcherd.testing.brokers.noop"] = {}
config["publish"]["default_broker"] = "dispatcherd.testing.brokers.noop"
config["brokers"]["noop"] = {}
config["publish"]["default_broker"] = "noop"
else:
config["brokers"]["pg_notify"] = {
"config": get_pg_notify_params(),
@@ -56,11 +56,5 @@ def get_dispatcherd_config(for_service: bool = False, mock_publish: bool = False
}
config["brokers"]["pg_notify"]["channels"] = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
metrics_cfg = settings.METRICS_SUBSYSTEM_CONFIG.get('server', {}).get(settings.METRICS_SERVICE_DISPATCHER)
if metrics_cfg:
config["service"]["metrics_kwargs"] = {
"host": metrics_cfg.get("host", "localhost"),
"port": metrics_cfg.get("port", 8015),
}
return config
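
A hedged sketch of how the mock_publish branch above can be exercised, assuming the dispatcherd.testing.brokers.noop naming shown on one side of the diff (the assertions simply restate the dict built above):

# Sketch: configuring dispatcherd with the no-op broker for tests.
from dispatcherd.config import setup as dispatcher_setup
from awx.main.dispatch.config import get_dispatcherd_config

config = get_dispatcherd_config(mock_publish=True)
assert "pg_notify" not in config["brokers"]                  # no real database broker
assert config["publish"]["default_broker"].endswith("noop")  # holds for either spelling in the diff
dispatcher_setup(config)  # apply it; tasks published afterwards go to the no-op broker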

View File

@@ -18,7 +18,7 @@ django.setup() # noqa
from django.conf import settings
# Preload all periodic tasks so their imports will be in shared memory
for name, options in settings.DISPATCHER_SCHEDULE.items():
for name, options in settings.CELERYBEAT_SCHEDULE.items():
resolve_callable(options['task'])
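
The preload loop above only consumes the dotted 'task' path of each schedule entry so the defining module is imported once in the forkserver parent (see the preload_modules entry in the config diff below) and shared by all workers. A minimal illustration of that effect using stdlib importlib instead of resolve_callable; the example entry and task path are illustrative:

# Illustration only: importing the module named by each schedule entry's 'task' key.
import importlib

DISPATCHER_SCHEDULE = {
    # example entry; real entries carry more keys than shown here
    'cluster_node_heartbeat': {'task': 'awx.main.tasks.system.cluster_node_heartbeat'},
}

for name, options in DISPATCHER_SCHEDULE.items():
    module_path, _, func_name = options['task'].rpartition('.')
    importlib.import_module(module_path)  # the import side effect is the point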

View File

@@ -1,4 +1,6 @@
import logging
import os
import time
from multiprocessing import Process
@@ -13,12 +15,13 @@ class PoolWorker(object):
"""
A simple wrapper around a multiprocessing.Process that tracks a worker child process.
The worker process runs the provided target function.
The worker process runs the provided target function and tracks its creation time.
"""
def __init__(self, target, args):
def __init__(self, target, args, **kwargs):
self.process = Process(target=target, args=args)
self.process.daemon = True
self.creation_time = time.monotonic()
def start(self):
self.process.start()
@@ -35,20 +38,44 @@ class WorkerPool(object):
pool = WorkerPool(workers_num=4) # spawn four worker processes
"""
def __init__(self, workers_num=None):
self.workers_num = workers_num or settings.JOB_EVENT_WORKERS
pool_cls = PoolWorker
debug_meta = ''
def init_workers(self, target):
def __init__(self, workers_num=None):
self.name = settings.CLUSTER_HOST_ID
self.pid = os.getpid()
self.workers_num = workers_num or settings.JOB_EVENT_WORKERS
self.workers = []
def __len__(self):
return len(self.workers)
def init_workers(self, target, *target_args):
self.target = target
self.target_args = target_args
for idx in range(self.workers_num):
# It's important to close these because we're _about_ to fork, and we
# don't want the forked processes to inherit the open sockets
# for the DB and cache connections (that way lies race conditions)
django_connection.close()
django_cache.close()
worker = PoolWorker(target, (idx,))
try:
worker.start()
except Exception:
logger.exception('could not fork')
else:
logger.debug('scaling up worker pid:{}'.format(worker.process.pid))
self.up()
def up(self):
idx = len(self.workers)
# It's important to close these because we're _about_ to fork, and we
# don't want the forked processes to inherit the open sockets
# for the DB and cache connections (that way lies race conditions)
django_connection.close()
django_cache.close()
worker = self.pool_cls(self.target, (idx,) + self.target_args)
self.workers.append(worker)
try:
worker.start()
except Exception:
logger.exception('could not fork')
else:
logger.debug('scaling up worker pid:{}'.format(worker.process.pid))
return idx, worker
def stop(self, signum):
try:
for worker in self.workers:
os.kill(worker.pid, signum)
except Exception:
logger.exception('could not kill {}'.format(worker.pid))
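
The docstring above already shows construction; a slightly fuller sketch of driving the pool, which works against either version of init_workers shown in this diff and assumes an initialized AWX settings environment (the import path and the work_loop body are illustrative):

# Sketch: wiring a worker function into the pool, as the callback receiver does later
# in this compare; each forked worker receives its index as the only argument.
from awx.main.dispatch.pool import WorkerPool  # module path is an assumption

def work_loop(idx):
    print(f"worker {idx} started")

pool = WorkerPool(workers_num=4)  # spawn four worker processes
pool.init_workers(work_loop)      # closes DB/cache handles, forks, and starts the children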

View File

@@ -1,6 +1,9 @@
from datetime import timedelta
import logging
from django.db.models import Q
from django.conf import settings
from django.utils.timezone import now as tz_now
from django.contrib.contenttypes.models import ContentType
from awx.main.models import Instance, UnifiedJob, WorkflowJob
@@ -47,6 +50,26 @@ def reap_job(j, status, job_explanation=None):
logger.error(f'{j.log_format} is no longer {status_before}; reaping')
def reap_waiting(instance=None, status='failed', job_explanation=None, grace_period=None, excluded_uuids=None, ref_time=None):
"""
Reap all jobs in waiting for this instance.
"""
if grace_period is None:
grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT
if instance is None:
hostname = Instance.objects.my_hostname()
else:
hostname = instance.hostname
if ref_time is None:
ref_time = tz_now()
jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=ref_time - timedelta(seconds=grace_period), controller_node=hostname)
if excluded_uuids:
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
for j in jobs:
reap_job(j, status, job_explanation=job_explanation)
def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=None, ref_time=None):
"""
Reap all jobs in running for this instance.
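
A hedged sketch of how reap and the new reap_waiting above are called together for a lost instance, mirroring the system.py heartbeat change later in this compare (the hostname lookup is illustrative):

# Sketch: reaping running and waiting jobs for a lost instance.
from awx.main.dispatch import reaper
from awx.main.models import Instance

lost = Instance.objects.get(hostname='lost-node')  # illustrative hostname
explanation = "Job reaped due to instance shutdown"
reaper.reap(lost, job_explanation=explanation)
# grace_period=0 skips the default JOB_WAITING_GRACE_PERIOD + TASK_MANAGER_TIMEOUT wait
reaper.reap_waiting(lost, grace_period=0, job_explanation=explanation)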

View File

@@ -19,24 +19,49 @@ def signame(sig):
return dict((k, v) for v, k in signal.__dict__.items() if v.startswith('SIG') and not v.startswith('SIG_'))[sig]
class AWXConsumerRedis(object):
class WorkerSignalHandler:
def __init__(self):
self.kill_now = False
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, self.exit_gracefully)
def exit_gracefully(self, *args, **kwargs):
self.kill_now = True
class AWXConsumerBase(object):
last_stats = time.time()
def __init__(self, name, worker, queues=[], pool=None):
self.should_stop = False
def __init__(self, name, worker):
self.name = name
self.pool = WorkerPool()
self.pool.init_workers(worker.work_loop)
self.total_messages = 0
self.queues = queues
self.worker = worker
self.pool = pool
if pool is None:
self.pool = WorkerPool()
self.pool.init_workers(self.worker.work_loop)
self.redis = get_redis_client()
def run(self):
def run(self, *args, **kwargs):
signal.signal(signal.SIGINT, self.stop)
signal.signal(signal.SIGTERM, self.stop)
# Child should implement other things here
def stop(self, signum, frame):
self.should_stop = True
logger.warning('received {}, stopping'.format(signame(signum)))
raise SystemExit()
class AWXConsumerRedis(AWXConsumerBase):
def run(self, *args, **kwargs):
super(AWXConsumerRedis, self).run(*args, **kwargs)
logger.info(f'Callback receiver started with pid={os.getpid()}')
db.connection.close() # logs use database, so close connection
while True:
time.sleep(60)
def stop(self, signum, frame):
logger.warning('received {}, stopping'.format(signame(signum)))
raise SystemExit()

View File

@@ -26,6 +26,7 @@ from awx.main.models.events import emit_event_detail
from awx.main.utils.profiling import AWXProfiler
from awx.main.tasks.system import events_processed_hook
import awx.main.analytics.subsystem_metrics as s_metrics
from .base import WorkerSignalHandler
logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -56,16 +57,6 @@ def job_stats_wrapup(job_identifier, event=None):
logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))
class WorkerSignalHandler:
def __init__(self):
self.kill_now = False
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, self.exit_gracefully)
def exit_gracefully(self, *args, **kwargs):
self.kill_now = True
class CallbackBrokerWorker:
"""
A worker implementation that deserializes callback event data and persists

View File

@@ -1,3 +1,4 @@
import inspect
import logging
import importlib
import time
@@ -36,13 +37,18 @@ def run_callable(body):
if 'guid' in body:
set_guid(body.pop('guid'))
_call = resolve_callable(task)
if inspect.isclass(_call):
# the callable is a class, e.g., RunJob; instantiate and
# return its `run()` method
_call = _call().run
log_extra = ''
logger_method = logger.debug
if 'time_pub' in body:
time_publish = time.time() - body['time_pub']
if time_publish > 5.0:
if ('time_ack' in body) and ('time_pub' in body):
time_publish = body['time_ack'] - body['time_pub']
time_waiting = time.time() - body['time_ack']
if time_waiting > 5.0 or time_publish > 5.0:
# If a task took a very long time to process, add this information to the log
log_extra = f' took {time_publish:.4f} to send message'
log_extra = f' took {time_publish:.4f} to ack, {time_waiting:.4f} in local dispatcher'
logger_method = logger.info
# don't print kwargs, they often contain launch-time secrets
logger_method(f'task {uuid} starting {task}(*{args}){log_extra}')
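
A worked example of the timing bookkeeping above, with made-up numbers, to show when the log line gets promoted to INFO:

# Worked example of the time_pub / time_ack fields (illustrative values only).
body = {'time_pub': 100.0, 'time_ack': 103.5}
now = 110.0                                          # stand-in for time.time()
time_publish = body['time_ack'] - body['time_pub']   # 3.5s from publish to broker ack
time_waiting = now - body['time_ack']                # 6.5s waiting in the local dispatcher
assert time_waiting > 5.0                            # crosses the threshold, so log at INFO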

View File

@@ -1,88 +0,0 @@
import argparse
import inspect
import logging
import os
import sys
import yaml
from django.core.management.base import BaseCommand, CommandError
from django.db import connection
from dispatcherd.cli import (
CONTROL_ARG_SCHEMAS,
DEFAULT_CONFIG_FILE,
_base_cli_parent,
_control_common_parent,
_register_control_arguments,
_build_command_data_from_args,
)
from dispatcherd.config import setup as dispatcher_setup
from dispatcherd.factories import get_control_from_settings
from dispatcherd.service import control_tasks
from awx.main.dispatch.config import get_dispatcherd_config
from awx.main.management.commands.dispatcherd import ensure_no_dispatcherd_env_config
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Dispatcher control operations'
def add_arguments(self, parser):
parser.description = 'Run dispatcherd control commands using awx-manage.'
base_parent = _base_cli_parent()
control_parent = _control_common_parent()
parser._add_container_actions(base_parent)
parser._add_container_actions(control_parent)
subparsers = parser.add_subparsers(dest='command', metavar='command')
subparsers.required = True
shared_parents = [base_parent, control_parent]
for command in control_tasks.__all__:
func = getattr(control_tasks, command, None)
doc = inspect.getdoc(func) or ''
summary = doc.splitlines()[0] if doc else None
command_parser = subparsers.add_parser(
command,
help=summary,
description=doc,
parents=shared_parents,
)
_register_control_arguments(command_parser, CONTROL_ARG_SCHEMAS.get(command))
def handle(self, *args, **options):
command = options.pop('command', None)
if not command:
raise CommandError('No dispatcher control command specified')
for django_opt in ('verbosity', 'traceback', 'no_color', 'force_color', 'skip_checks'):
options.pop(django_opt, None)
log_level = options.pop('log_level', 'DEBUG')
config_path = os.path.abspath(options.pop('config', DEFAULT_CONFIG_FILE))
expected_replies = options.pop('expected_replies', 1)
logging.basicConfig(level=getattr(logging, log_level), stream=sys.stdout)
logger.debug(f"Configured standard out logging at {log_level} level")
default_config = os.path.abspath(DEFAULT_CONFIG_FILE)
ensure_no_dispatcherd_env_config()
if config_path != default_config:
raise CommandError('The config path CLI option is not allowed for the awx-manage command')
if connection.vendor == 'sqlite':
raise CommandError('dispatcherctl is not supported with sqlite3; use a PostgreSQL database')
else:
logger.info('Using config generated from awx.main.dispatch.config.get_dispatcherd_config')
dispatcher_setup(get_dispatcherd_config())
schema_namespace = argparse.Namespace(**options)
data = _build_command_data_from_args(schema_namespace, command)
ctl = get_control_from_settings()
returned = ctl.control_with_reply(command, data=data, expected_replies=expected_replies)
self.stdout.write(yaml.dump(returned, default_flow_style=False))
if len(returned) < expected_replies:
logger.error(f'Obtained only {len(returned)} of {expected_replies}, exiting with non-zero code')
raise CommandError('dispatcherctl returned fewer replies than expected')
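
The dispatcherctl command above is essentially a CLI wrapper over the dispatcherd control API; a minimal sketch of the same round trip from code, using only calls that appear in this compare ('status' is one of the control commands visible here):

# Sketch: issuing a dispatcherd control command programmatically.
import yaml
from dispatcherd.config import setup as dispatcher_setup
from dispatcherd.factories import get_control_from_settings
from awx.main.dispatch.config import get_dispatcherd_config

dispatcher_setup(get_dispatcherd_config())
ctl = get_control_from_settings()
replies = ctl.control_with_reply('status', expected_replies=1)
print(yaml.dump(replies, default_flow_style=False))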

View File

@@ -1,85 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
import copy
import hashlib
import json
import logging
import logging.config
import os
from django.conf import settings
from django.core.cache import cache as django_cache
from django.core.management.base import BaseCommand, CommandError
from django.db import connection
from dispatcherd.config import setup as dispatcher_setup
from awx.main.dispatch.config import get_dispatcherd_config
logger = logging.getLogger('awx.main.dispatch')
from dispatcherd import run_service
def _json_default(value):
if isinstance(value, set):
return sorted(value)
if isinstance(value, tuple):
return list(value)
return str(value)
def _hash_config(config):
serialized = json.dumps(config, sort_keys=True, separators=(',', ':'), default=_json_default)
return hashlib.sha256(serialized.encode('utf-8')).hexdigest()
def ensure_no_dispatcherd_env_config():
if os.getenv('DISPATCHERD_CONFIG_FILE'):
raise CommandError('DISPATCHERD_CONFIG_FILE is set but awx-manage dispatcherd uses dynamic config from code')
class Command(BaseCommand):
help = (
'Run the background task service, this is the supported entrypoint since the introduction of dispatcherd as a library. '
'This replaces the prior awx-manage run_dispatcher service, and control actions are at awx-manage dispatcherctl.'
)
def add_arguments(self, parser):
return
def handle(self, *arg, **options):
ensure_no_dispatcherd_env_config()
self.configure_dispatcher_logging()
config = get_dispatcherd_config(for_service=True)
config_hash = _hash_config(config)
logger.info(
'Using dispatcherd config generated from awx.main.dispatch.config.get_dispatcherd_config (sha256=%s)',
config_hash,
)
# Close the connection, because the pg_notify broker will create new async connection
connection.close()
django_cache.close()
dispatcher_setup(config)
run_service()
def configure_dispatcher_logging(self):
# Apply special log rule for the parent process
special_logging = copy.deepcopy(settings.LOGGING)
changed_handlers = []
for handler_name, handler_config in special_logging.get('handlers', {}).items():
filters = handler_config.get('filters', [])
if 'dynamic_level_filter' in filters:
handler_config['filters'] = [flt for flt in filters if flt != 'dynamic_level_filter']
changed_handlers.append(handler_name)
logger.info(f'Dispatcherd main process replaced log level filter for handlers: {changed_handlers}')
# Apply the custom logging level here, before the asyncio code starts
special_logging.setdefault('loggers', {}).setdefault('dispatcherd', {})
special_logging['loggers']['dispatcherd']['level'] = settings.LOG_AGGREGATOR_LEVEL
logging.config.dictConfig(special_logging)

View File

@@ -3,6 +3,7 @@
import redis
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
import redis.exceptions
@@ -35,7 +36,11 @@ class Command(BaseCommand):
raise CommandError(f'Callback receiver could not connect to redis, error: {exc}')
try:
consumer = AWXConsumerRedis('callback_receiver', CallbackBrokerWorker())
consumer = AWXConsumerRedis(
'callback_receiver',
CallbackBrokerWorker(),
queues=[getattr(settings, 'CALLBACK_QUEUE', '')],
)
consumer.run()
except KeyboardInterrupt:
print('Terminating Callback Receiver')

View File

@@ -1,20 +1,26 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import logging
import logging.config
import yaml
import copy
from django.core.management.base import CommandError
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.cache import cache as django_cache
from django.db import connection
from dispatcherd.factories import get_control_from_settings
from dispatcherd import run_service
from dispatcherd.config import setup as dispatcher_setup
from awx.main.management.commands.dispatcherd import Command as DispatcherdCommand
from awx.main.dispatch.config import get_dispatcherd_config
logger = logging.getLogger('awx.main.dispatch')
class Command(DispatcherdCommand):
help = 'Launch the task dispatcher (deprecated; use awx-manage dispatcherd)'
class Command(BaseCommand):
help = 'Launch the task dispatcher'
def add_arguments(self, parser):
parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running dispatchers')
@@ -28,10 +34,8 @@ class Command(DispatcherdCommand):
'Only running tasks can be canceled, queued tasks must be started before they can be canceled.'
),
)
super().add_arguments(parser)
def handle(self, *args, **options):
logger.warning('awx-manage run_dispatcher is deprecated; use awx-manage dispatcherd')
def handle(self, *arg, **options):
if options.get('status'):
ctl = get_control_from_settings()
running_data = ctl.control_with_reply('status')
@@ -61,4 +65,28 @@ class Command(DispatcherdCommand):
results.append(result)
print(yaml.dump(results, default_flow_style=False))
return
return super().handle(*args, **options)
self.configure_dispatcher_logging()
# Close the connection, because the pg_notify broker will create new async connection
connection.close()
django_cache.close()
dispatcher_setup(get_dispatcherd_config(for_service=True))
run_service()
dispatcher_setup(get_dispatcherd_config(for_service=True))
run_service()
def configure_dispatcher_logging(self):
# Apply special log rule for the parent process
special_logging = copy.deepcopy(settings.LOGGING)
for handler_name, handler_config in special_logging.get('handlers', {}).items():
filters = handler_config.get('filters', [])
if 'dynamic_level_filter' in filters:
handler_config['filters'] = [flt for flt in filters if flt != 'dynamic_level_filter']
logger.info(f'Dispatcherd main process replaced log level filter for {handler_name} handler')
# Apply the custom logging level here, before the asyncio code starts
special_logging.setdefault('loggers', {}).setdefault('dispatcherd', {})
special_logging['loggers']['dispatcherd']['level'] = settings.LOG_AGGREGATOR_LEVEL
logging.config.dictConfig(special_logging)

View File

@@ -386,6 +386,7 @@ class gce(PluginFileInjector):
# auth related items
ret['auth_kind'] = "serviceaccount"
filters = []
# TODO: implement gce group_by options
# gce never processed the group_by field, if it had, we would selectively
# apply those options here, but it did not, so all groups are added here
@@ -419,6 +420,8 @@ class gce(PluginFileInjector):
if keyed_groups:
ret['keyed_groups'] = keyed_groups
if filters:
ret['filters'] = filters
if compose_dict:
ret['compose'] = compose_dict
if inventory_source.source_regions and 'all' not in inventory_source.source_regions:

View File

@@ -315,11 +315,12 @@ class PrimordialModel(HasEditsMixin, CreatedModifiedModel):
)
def __init__(self, *args, **kwargs):
super(PrimordialModel, self).__init__(*args, **kwargs)
r = super(PrimordialModel, self).__init__(*args, **kwargs)
if self.pk:
self._prior_values_store = self._get_fields_snapshot()
else:
self._prior_values_store = {}
return r
def save(self, *args, **kwargs):
update_fields = kwargs.get('update_fields', [])

View File

@@ -50,8 +50,9 @@ class HasPolicyEditsMixin(HasEditsMixin):
abstract = True
def __init__(self, *args, **kwargs):
super(BaseModel, self).__init__(*args, **kwargs)
r = super(BaseModel, self).__init__(*args, **kwargs)
self._prior_values_store = self._get_fields_snapshot()
return r
def save(self, *args, **kwargs):
super(BaseModel, self).save(*args, **kwargs)

View File

@@ -10,6 +10,7 @@ import json
import logging
import os
import re
import socket
import subprocess
import tempfile
from collections import OrderedDict
@@ -918,7 +919,7 @@ class UnifiedJob(
# If we have a start and finished time, and haven't already calculated
# out the time that elapsed, do so.
if self.started and self.finished and self.elapsed == decimal.Decimal(0):
if self.started and self.finished and self.elapsed == 0.0:
td = self.finished - self.started
elapsed = decimal.Decimal(td.total_seconds())
self.elapsed = elapsed.quantize(dq)
@@ -1354,6 +1355,8 @@ class UnifiedJob(
status_data['instance_group_name'] = None
elif status in ['successful', 'failed', 'canceled'] and self.finished:
status_data['finished'] = datetime.datetime.strftime(self.finished, "%Y-%m-%dT%H:%M:%S.%fZ")
elif status == 'running':
status_data['started'] = datetime.datetime.strftime(self.finished, "%Y-%m-%dT%H:%M:%S.%fZ")
status_data.update(self.websocket_emit_data())
status_data['group_name'] = 'jobs'
if getattr(self, 'unified_job_template_id', None):
@@ -1485,17 +1488,40 @@ class UnifiedJob(
return 'Previous Task Canceled: {"job_type": "%s", "job_name": "%s", "job_id": "%s"}' % (self.model_to_str(), self.name, self.id)
return None
def fallback_cancel(self):
if not self.celery_task_id:
self.refresh_from_db(fields=['celery_task_id'])
self.cancel_dispatcher_process()
def cancel_dispatcher_process(self):
"""Returns True if dispatcher running this job acknowledged request and sent SIGTERM"""
if not self.celery_task_id:
return False
# Special case for task manager (used during workflow job cancellation)
if not connection.get_autocommit():
try:
ctl = get_control_from_settings()
ctl.control('cancel', data={'uuid': self.celery_task_id})
except Exception:
logger.exception("Error sending cancel command to dispatcher")
return True # task manager itself needs to act under assumption that cancel was received
# Standard case with reply
try:
logger.info(f'Sending cancel message to pg_notify channel {self.controller_node} for task {self.celery_task_id}')
ctl = get_control_from_settings(default_publish_channel=self.controller_node)
ctl.control('cancel', data={'uuid': self.celery_task_id})
timeout = 5
ctl = get_control_from_settings()
results = ctl.control_with_reply('cancel', data={'uuid': self.celery_task_id}, expected_replies=1, timeout=timeout)
# Check if cancel was successful by checking if we got any results
return bool(results and len(results) > 0)
except socket.timeout:
logger.error(f'could not reach dispatcher on {self.controller_node} within {timeout}s')
except Exception:
logger.exception("Error sending cancel command to dispatcher")
logger.exception("error encountered when checking task status")
return False # whether confirmation was obtained
def cancel(self, job_explanation=None, is_chain=False):
if self.can_cancel:
@@ -1518,13 +1544,19 @@ class UnifiedJob(
# the job control process will use the cancel_flag to distinguish a shutdown from a cancel
self.save(update_fields=cancel_fields)
# Be extra sure we have the task id, in case job is transitioning into running right now
if not self.celery_task_id:
self.refresh_from_db(fields=['celery_task_id', 'controller_node'])
# send pg_notify message to cancel, will not send until transaction completes
controller_notified = False
if self.celery_task_id:
self.cancel_dispatcher_process()
controller_notified = self.cancel_dispatcher_process()
# If a SIGTERM signal was sent to the control process, and acked by the dispatcher
# then we want to let its own cleanup change status, otherwise change status now
if not controller_notified:
if self.status != 'canceled':
self.status = 'canceled'
self.save(update_fields=['status'])
# Avoid race condition where we have stale model from pending state but job has already started,
# its checking signal but not cancel_flag, so re-send signal after updating cancel fields
self.fallback_cancel()
return self.cancel_flag
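
A hedged sketch of the reply-based cancel round trip that one side of the diff above performs in cancel_dispatcher_process; the timeout, payload shape, and socket.timeout handling come from the diff, while the standalone wrapper function is illustrative:

# Sketch: asking the dispatcher running a task to cancel it and waiting for an ack.
import socket
from dispatcherd.factories import get_control_from_settings

def cancel_task(celery_task_id, timeout=5):
    try:
        ctl = get_control_from_settings()
        results = ctl.control_with_reply('cancel', data={'uuid': celery_task_id},
                                         expected_replies=1, timeout=timeout)
        return bool(results and len(results) > 0)  # True means a dispatcher acked and sent SIGTERM
    except socket.timeout:
        return False  # no dispatcher answered within the timeout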

View File

@@ -785,7 +785,7 @@ class WorkflowJob(UnifiedJob, WorkflowJobOptions, SurveyJobMixin, JobNotificatio
def cancel_dispatcher_process(self):
# WorkflowJobs don't _actually_ run anything in the dispatcher, so
# there's no point in asking the dispatcher if it knows about this task
return
return True
class WorkflowApprovalTemplate(UnifiedJobTemplate, RelatedJobsMixin):

View File

@@ -76,12 +76,10 @@ class GrafanaBackend(AWXBaseEmailBackend, CustomNotificationBase):
grafana_headers = {}
if 'started' in m.body:
try:
epoch = datetime.datetime.fromtimestamp(0, tz=datetime.timezone.utc)
grafana_data['time'] = grafana_data['timeEnd'] = int(
(dp.parse(m.body['started']).replace(tzinfo=datetime.timezone.utc) - epoch).total_seconds() * 1000
)
epoch = datetime.datetime.utcfromtimestamp(0)
grafana_data['time'] = grafana_data['timeEnd'] = int((dp.parse(m.body['started']).replace(tzinfo=None) - epoch).total_seconds() * 1000)
if m.body.get('finished'):
grafana_data['timeEnd'] = int((dp.parse(m.body['finished']).replace(tzinfo=datetime.timezone.utc) - epoch).total_seconds() * 1000)
grafana_data['timeEnd'] = int((dp.parse(m.body['finished']).replace(tzinfo=None) - epoch).total_seconds() * 1000)
except ValueError:
logger.error(smart_str(_("Error converting time {} or timeEnd {} to int.").format(m.body['started'], m.body['finished'])))
if not self.fail_silently:
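
A small worked example of the epoch-milliseconds conversion above, using the timezone-aware variant from one side of the diff (the dp alias is assumed to be dateutil's parser, and the sample timestamp is made up):

# Worked example: converting a job 'started' string to Grafana epoch milliseconds.
import datetime
from dateutil import parser as dp  # assumed to match the `dp` alias used above

epoch = datetime.datetime.fromtimestamp(0, tz=datetime.timezone.utc)
started = dp.parse('2026-01-19T20:30:41Z').replace(tzinfo=datetime.timezone.utc)
time_ms = int((started - epoch).total_seconds() * 1000)  # 1768854641000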

View File

@@ -1,7 +1,6 @@
# Copyright (c) 2016 Ansible, Inc.
# All Rights Reserved.
import base64
import json
import logging
import requests
@@ -85,25 +84,20 @@ class WebhookBackend(AWXBaseEmailBackend, CustomNotificationBase):
if resp.status_code not in [301, 307]:
break
# convert the url to a base64 encoded string for safe logging
url_log_safe = base64.b64encode(url.encode('UTF-8'))
# get the next URL to try
url_next = resp.headers.get("Location", None)
url_next_log_safe = base64.b64encode(url_next.encode('UTF-8')) if url_next else b'None'
# we've hit a redirect. extract the redirect URL out of the first response header and try again
logger.warning(f"Received a {resp.status_code} from {url_log_safe}, trying to reach redirect url {url_next_log_safe}; attempt #{retries+1}")
logger.warning(
f"Received a {resp.status_code} from {url}, trying to reach redirect url {resp.headers.get('Location', None)}; attempt #{retries+1}"
)
# take the first redirect URL in the response header and try that
url = url_next
url = resp.headers.get("Location", None)
if url is None:
err = f"Webhook notification received redirect to a blank URL from {url_log_safe}. Response headers={resp.headers}"
err = f"Webhook notification received redirect to a blank URL from {url}. Response headers={resp.headers}"
break
else:
# no break condition in the loop encountered; therefore we have hit the maximum number of retries
err = f"Webhook notification max number of retries [{self.MAX_RETRIES}] exceeded. Failed to send webhook notification to {url_log_safe}"
err = f"Webhook notification max number of retries [{self.MAX_RETRIES}] exceeded. Failed to send webhook notification to {url}"
if resp.status_code >= 400:
err = f"Error sending webhook notification: {resp.status_code}"

View File

@@ -84,7 +84,6 @@ from awx.main.utils.common import (
create_partition,
ScheduleWorkflowManager,
ScheduleTaskManager,
getattr_dne,
)
from awx.conf.license import get_license
from awx.main.utils.handlers import SpecialInventoryHandler
@@ -93,76 +92,9 @@ from awx.main.utils.update_model import update_model
# Django flags
from flags.state import flag_enabled
# Workload Identity
from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
logger = logging.getLogger('awx.main.tasks.jobs')
def populate_claims_for_workload(unified_job) -> dict:
"""
Extract JWT claims from a Controller workload for the aap_controller_automation_job scope.
"""
# Related objects in the UnifiedJob model, applies to all job types
organization = getattr_dne(unified_job, 'organization')
ujt = getattr_dne(unified_job, 'unified_job_template')
instance_group = getattr_dne(unified_job, 'instance_group')
claims = {
AutomationControllerJobScope.CLAIM_JOB_ID: unified_job.id,
AutomationControllerJobScope.CLAIM_JOB_NAME: unified_job.name,
AutomationControllerJobScope.CLAIM_LAUNCH_TYPE: unified_job.launch_type,
}
# Related objects in the UnifiedJob model, applies to all job types
# null cases are omitted because of OIDC
if organization := getattr_dne(unified_job, 'organization'):
claims[AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME] = organization.name
claims[AutomationControllerJobScope.CLAIM_ORGANIZATION_ID] = organization.id
if ujt := getattr_dne(unified_job, 'unified_job_template'):
claims[AutomationControllerJobScope.CLAIM_UNIFIED_JOB_TEMPLATE_NAME] = ujt.name
claims[AutomationControllerJobScope.CLAIM_UNIFIED_JOB_TEMPLATE_ID] = ujt.id
if instance_group := getattr_dne(unified_job, 'instance_group'):
claims[AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_NAME] = instance_group.name
claims[AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_ID] = instance_group.id
# Related objects on concrete models, may not be valid for type of unified_job
if inventory := getattr_dne(unified_job, 'inventory', None):
claims[AutomationControllerJobScope.CLAIM_INVENTORY_NAME] = inventory.name
claims[AutomationControllerJobScope.CLAIM_INVENTORY_ID] = inventory.id
if execution_environment := getattr_dne(unified_job, 'execution_environment', None):
claims[AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_NAME] = execution_environment.name
claims[AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_ID] = execution_environment.id
if project := getattr_dne(unified_job, 'project', None):
claims[AutomationControllerJobScope.CLAIM_PROJECT_NAME] = project.name
claims[AutomationControllerJobScope.CLAIM_PROJECT_ID] = project.id
if jt := getattr_dne(unified_job, 'job_template', None):
claims[AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME] = jt.name
claims[AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID] = jt.id
# Only valid for job templates
if hasattr(unified_job, 'playbook'):
claims[AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME] = unified_job.playbook
# Not valid for inventory updates and system jobs
if hasattr(unified_job, 'job_type'):
claims[AutomationControllerJobScope.CLAIM_JOB_TYPE] = unified_job.job_type
launched_by: dict = unified_job.launched_by
if 'name' in launched_by:
claims[AutomationControllerJobScope.CLAIM_LAUNCHED_BY_NAME] = launched_by['name']
if 'id' in launched_by:
claims[AutomationControllerJobScope.CLAIM_LAUNCHED_BY_ID] = launched_by['id']
return claims
def with_path_cleanup(f):
@functools.wraps(f)
def _wrapped(self, *args, **kwargs):
@@ -1396,6 +1328,7 @@ class RunProjectUpdate(BaseTask):
'local_path': os.path.basename(project_update.project.local_path),
'project_path': project_update.get_project_path(check_if_exists=False), # deprecated
'insights_url': settings.INSIGHTS_URL_BASE,
'oidc_endpoint': settings.INSIGHTS_OIDC_ENDPOINT,
'awx_license_type': get_license().get('license_type', 'UNLICENSED'),
'awx_version': get_awx_version(),
'scm_url': scm_url,

View File

@@ -69,7 +69,7 @@ def signal_callback():
def with_signal_handling(f):
"""
Change signal handling to make signal_callback return True in event of SIGTERM, SIGINT, or SIGUSR1.
Change signal handling to make signal_callback return True in event of SIGTERM or SIGINT.
"""
@functools.wraps(f)

View File

@@ -93,10 +93,7 @@ def _run_dispatch_startup_common():
# TODO: Enable this on VM installs
if settings.IS_K8S:
try:
write_receptor_config()
except Exception:
logger.exception("Failed to write receptor config, skipping.")
write_receptor_config()
try:
convert_jsonfields()
@@ -760,16 +757,14 @@ def _heartbeat_check_versions(this_inst, instance_list):
def _heartbeat_handle_lost_instances(lost_instances, this_inst):
"""Handle lost instances by reaping their running jobs and marking them offline."""
"""Handle lost instances by reaping their jobs and marking them offline."""
for other_inst in lost_instances:
try:
# Any jobs marked as running will be marked as error
explanation = "Job reaped due to instance shutdown"
reaper.reap(other_inst, job_explanation=explanation)
# Any jobs that were waiting to be processed by this node will be handed back to task manager
UnifiedJob.objects.filter(status='waiting', controller_node=other_inst.hostname).update(status='pending', controller_node='', execution_node='')
reaper.reap_waiting(other_inst, grace_period=0, job_explanation=explanation)
except Exception:
logger.exception('failed to re-process jobs for lost instance {}'.format(other_inst.hostname))
logger.exception('failed to reap jobs for {}'.format(other_inst.hostname))
try:
if settings.AWX_AUTO_DEPROVISION_INSTANCES and other_inst.node_type == "control":
deprovision_hostname = other_inst.hostname

View File

@@ -1,11 +1,8 @@
import pytest
from django.test import RequestFactory
from prometheus_client.parser import text_string_to_metric_families
from rest_framework.request import Request
from awx.main import models
from awx.main.analytics.metrics import metrics
from awx.main.analytics.dispatcherd_metrics import get_dispatcherd_metrics
from awx.api.versioning import reverse
EXPECTED_VALUES = {
@@ -80,55 +77,3 @@ def test_metrics_http_methods(get, post, patch, put, options, admin):
assert patch(get_metrics_view_db_only(), user=admin).status_code == 405
assert post(get_metrics_view_db_only(), user=admin).status_code == 405
assert options(get_metrics_view_db_only(), user=admin).status_code == 200
class DummyMetricsResponse:
def __init__(self, payload):
self._payload = payload
def read(self):
return self._payload
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def test_dispatcherd_metrics_node_filter_match(mocker, settings):
settings.CLUSTER_HOST_ID = "awx-1"
payload = b'# HELP test_metric A test metric\n# TYPE test_metric gauge\ntest_metric 1\n'
def fake_urlopen(url, timeout=1.0):
return DummyMetricsResponse(payload)
mocker.patch('urllib.request.urlopen', fake_urlopen)
request = Request(RequestFactory().get('/api/v2/metrics/', {'node': 'awx-1'}))
assert get_dispatcherd_metrics(request) == payload.decode('utf-8')
def test_dispatcherd_metrics_node_filter_excludes_local(mocker, settings):
settings.CLUSTER_HOST_ID = "awx-1"
def fake_urlopen(*args, **kwargs):
raise AssertionError("urlopen should not be called when node filter excludes local node")
mocker.patch('urllib.request.urlopen', fake_urlopen)
request = Request(RequestFactory().get('/api/v2/metrics/', {'node': 'awx-2'}))
assert get_dispatcherd_metrics(request) == ''
def test_dispatcherd_metrics_metric_filter_excludes_unrelated(mocker):
def fake_urlopen(*args, **kwargs):
raise AssertionError("urlopen should not be called when metric filter excludes dispatcherd metrics")
mocker.patch('urllib.request.urlopen', fake_urlopen)
request = Request(RequestFactory().get('/api/v2/metrics/', {'metric': 'awx_system_info'}))
assert get_dispatcherd_metrics(request) == ''

View File

@@ -1,17 +0,0 @@
import pytest
from awx.main.dispatch.config import get_dispatcherd_config
from awx.main.management.commands.dispatcherd import _hash_config
@pytest.mark.django_db
def test_dispatcherd_config_hash_is_stable(settings, monkeypatch):
monkeypatch.setenv('AWX_COMPONENT', 'dispatcher')
settings.CLUSTER_HOST_ID = 'test-node'
settings.JOB_EVENT_WORKERS = 1
settings.DISPATCHER_SCHEDULE = {}
config_one = get_dispatcherd_config(for_service=True)
config_two = get_dispatcherd_config(for_service=True)
assert _hash_config(config_one) == _hash_config(config_two)

View File

@@ -1,6 +1,5 @@
import itertools
import pytest
from uuid import uuid4
# CRUM
from crum import impersonate
@@ -34,64 +33,6 @@ def test_soft_unique_together(post, project, admin_user):
assert 'combination already exists' in str(r.data)
@pytest.mark.django_db
class TestJobCancel:
"""
Coverage for UnifiedJob.cancel, focused on interaction with dispatcherd objects.
Using mocks for the dispatcherd objects, because tests by default use a no-op broker.
"""
def test_cancel_sets_flag_and_clears_start_args(self, mocker):
job = Job.objects.create(status='running', name='foo-job', celery_task_id=str(uuid4()), controller_node='foo', start_args='{"secret": "value"}')
job.websocket_emit_status = mocker.MagicMock()
assert job.can_cancel is True
assert job.cancel_flag is False
job.cancel()
job.refresh_from_db()
assert job.cancel_flag is True
assert job.start_args == ''
def test_cancel_sets_job_explanation(self, mocker):
job = Job.objects.create(status='running', name='foo-job', celery_task_id=str(uuid4()), controller_node='foo')
job.websocket_emit_status = mocker.MagicMock()
job_explanation = 'giggity giggity'
job.cancel(job_explanation=job_explanation)
job.refresh_from_db()
assert job.job_explanation == job_explanation
def test_cancel_sends_control_message(self, mocker):
celery_task_id = str(uuid4())
job = Job.objects.create(status='running', name='foo-job', celery_task_id=celery_task_id, controller_node='foo')
job.websocket_emit_status = mocker.MagicMock()
control = mocker.MagicMock()
get_control = mocker.patch('awx.main.models.unified_jobs.get_control_from_settings', return_value=control)
job.cancel()
get_control.assert_called_once_with(default_publish_channel='foo')
control.control.assert_called_once_with('cancel', data={'uuid': celery_task_id})
def test_cancel_refreshes_task_id_before_sending_control(self, mocker):
job = Job.objects.create(status='pending', name='foo-job', celery_task_id='', controller_node='bar')
job.websocket_emit_status = mocker.MagicMock()
celery_task_id = str(uuid4())
Job.objects.filter(pk=job.pk).update(status='running', celery_task_id=celery_task_id)
control = mocker.MagicMock()
get_control = mocker.patch('awx.main.models.unified_jobs.get_control_from_settings', return_value=control)
refresh_spy = mocker.spy(job, 'refresh_from_db')
job.cancel()
refresh_spy.assert_called_once_with(fields=['celery_task_id', 'controller_node'])
get_control.assert_called_once_with(default_publish_channel='bar')
control.control.assert_called_once_with('cancel', data={'uuid': celery_task_id})
@pytest.mark.django_db
class TestCreateUnifiedJob:
"""

View File

@@ -9,7 +9,7 @@ from unittest import mock
import pytest
from awx.main.tasks.system import CleanupImagesAndFiles, execution_node_health_check, inspect_established_receptor_connections, clear_setting_cache
from awx.main.management.commands.dispatcherd import Command
from awx.main.management.commands.run_dispatcher import Command
from awx.main.models import Instance, Job, ReceptorAddress, InstanceLink

View File

@@ -5,7 +5,6 @@ import pytest
from awx.main.models import Job, WorkflowJob, Instance
from awx.main.dispatch import reaper
from awx.main.tasks import system
from dispatcherd.publish import task
'''
@@ -62,6 +61,11 @@ class TestJobReaper(object):
('running', '', '', None, False), # running, not assigned to the instance
('running', 'awx', '', None, True), # running, has the instance as its execution_node
('running', '', 'awx', None, True), # running, has the instance as its controller_node
('waiting', '', '', None, False), # waiting, not assigned to the instance
('waiting', 'awx', '', None, False), # waiting, was edited less than a minute ago
('waiting', '', 'awx', None, False), # waiting, was edited less than a minute ago
('waiting', 'awx', '', yesterday, False), # waiting, managed by another node, ignore
('waiting', '', 'awx', yesterday, True), # waiting, assigned to the controller_node, stale
],
)
def test_should_reap(self, status, fail, execution_node, controller_node, modified):
@@ -79,6 +83,7 @@ class TestJobReaper(object):
# (because .save() overwrites it to _now_)
Job.objects.filter(id=j.id).update(modified=modified)
reaper.reap(i)
reaper.reap_waiting(i)
job = Job.objects.first()
if fail:
assert job.status == 'failed'
@@ -87,20 +92,6 @@ class TestJobReaper(object):
else:
assert job.status == status
def test_waiting_job_sent_back_to_pending(self):
this_inst = Instance(hostname='awx')
this_inst.save()
lost_inst = Instance(hostname='lost', node_type=Instance.Types.EXECUTION, node_state=Instance.States.UNAVAILABLE)
lost_inst.save()
job = Job.objects.create(status='waiting', controller_node=lost_inst.hostname, execution_node='lost')
system._heartbeat_handle_lost_instances([lost_inst], this_inst)
job.refresh_from_db()
assert job.status == 'pending'
assert job.controller_node == ''
assert job.execution_node == ''
@pytest.mark.parametrize(
'excluded_uuids, fail, started',
[

View File

@@ -69,7 +69,7 @@ def live_tmp_folder():
settings._awx_conf_memoizedcache.clear()
# cache is cleared in this test process, but it still needs to be cleared on the worker processes
clear_setting_cache.delay(['AWX_ISOLATION_SHOW_PATHS'])
time.sleep(5.0) # for _awx_conf_memoizedcache to expire on all workers
time.sleep(0.2) # allow task to finish, we have no real metric to know
else:
logger.info(f'Believed that {path} is already in settings.AWX_ISOLATION_SHOW_PATHS: {settings.AWX_ISOLATION_SHOW_PATHS}')
return path

View File

@@ -1,4 +1,3 @@
import copy
import warnings
from unittest.mock import Mock, patch
@@ -9,7 +8,6 @@ from awx.api.schema import (
AuthenticatedSpectacularAPIView,
AuthenticatedSpectacularSwaggerView,
AuthenticatedSpectacularRedocView,
filter_credential_type_schema,
)
@@ -273,152 +271,3 @@ class TestAuthenticatedSchemaViews:
def test_authenticated_spectacular_redoc_view_requires_authentication(self):
"""Test that AuthenticatedSpectacularRedocView requires authentication."""
assert IsAuthenticated in AuthenticatedSpectacularRedocView.permission_classes
class TestFilterCredentialTypeSchema:
"""Unit tests for filter_credential_type_schema postprocessing hook."""
def test_filters_both_schemas_correctly(self):
"""Test that both CredentialTypeRequest and PatchedCredentialTypeRequest schemas are filtered."""
result = {
'components': {
'schemas': {
'CredentialTypeRequest': {
'properties': {
'kind': {
'enum': [
'ssh',
'vault',
'net',
'scm',
'cloud',
'registry',
'token',
'insights',
'external',
'kubernetes',
'galaxy',
'cryptography',
None,
],
'type': 'string',
}
}
},
'PatchedCredentialTypeRequest': {
'properties': {
'kind': {
'enum': [
'ssh',
'vault',
'net',
'scm',
'cloud',
'registry',
'token',
'insights',
'external',
'kubernetes',
'galaxy',
'cryptography',
None,
],
'type': 'string',
}
}
},
}
}
}
returned = filter_credential_type_schema(result, None, None, None)
# POST/PUT schema: no None (required field)
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net']
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['description'] == "* `cloud` - Cloud\\n* `net` - Network"
# PATCH schema: includes None (optional field)
assert result['components']['schemas']['PatchedCredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net', None]
assert result['components']['schemas']['PatchedCredentialTypeRequest']['properties']['kind']['description'] == "* `cloud` - Cloud\\n* `net` - Network"
# Other properties should be preserved
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['type'] == 'string'
# Function should return the result
assert returned is result
def test_handles_empty_result(self):
"""Test graceful handling when result dict is empty."""
result = {}
original = copy.deepcopy(result)
returned = filter_credential_type_schema(result, None, None, None)
assert result == original
assert returned is result
def test_handles_missing_enum(self):
"""Test that schemas without enum key are not modified."""
result = {'components': {'schemas': {'CredentialTypeRequest': {'properties': {'kind': {'type': 'string', 'description': 'Some description'}}}}}}
original = copy.deepcopy(result)
filter_credential_type_schema(result, None, None, None)
assert result == original
def test_filters_only_target_schemas(self):
"""Test that only CredentialTypeRequest schemas are modified, not others."""
result = {
'components': {
'schemas': {
'CredentialTypeRequest': {'properties': {'kind': {'enum': ['ssh', 'cloud', 'net', None]}}},
'OtherSchema': {'properties': {'kind': {'enum': ['option1', 'option2']}}},
}
}
}
other_schema_before = copy.deepcopy(result['components']['schemas']['OtherSchema'])
filter_credential_type_schema(result, None, None, None)
# CredentialTypeRequest should be filtered (no None for required field)
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net']
# OtherSchema should be unchanged
assert result['components']['schemas']['OtherSchema'] == other_schema_before
def test_handles_only_one_schema_present(self):
"""Test that function works when only one target schema is present."""
result = {'components': {'schemas': {'CredentialTypeRequest': {'properties': {'kind': {'enum': ['ssh', 'cloud', 'net', None]}}}}}}
filter_credential_type_schema(result, None, None, None)
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net']
def test_handles_missing_properties(self):
"""Test graceful handling when schema has no properties key."""
result = {'components': {'schemas': {'CredentialTypeRequest': {}}}}
original = copy.deepcopy(result)
filter_credential_type_schema(result, None, None, None)
assert result == original
def test_differentiates_required_vs_optional_fields(self):
"""Test that CredentialTypeRequest excludes None but PatchedCredentialTypeRequest includes it."""
result = {
'components': {
'schemas': {
'CredentialTypeRequest': {'properties': {'kind': {'enum': ['ssh', 'vault', 'net', 'scm', 'cloud', 'registry', None]}}},
'PatchedCredentialTypeRequest': {'properties': {'kind': {'enum': ['ssh', 'vault', 'net', 'scm', 'cloud', 'registry', None]}}},
}
}
}
filter_credential_type_schema(result, None, None, None)
# POST/PUT schema: no None (required field)
assert result['components']['schemas']['CredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net']
# PATCH schema: includes None (optional field)
assert result['components']['schemas']['PatchedCredentialTypeRequest']['properties']['kind']['enum'] == ['cloud', 'net', None]

View File

@@ -1,92 +0,0 @@
import io
import pytest
from django.core.management.base import CommandError
from awx.main.management.commands import dispatcherctl
@pytest.fixture(autouse=True)
def clear_dispatcher_env(monkeypatch, mocker):
monkeypatch.delenv('DISPATCHERD_CONFIG_FILE', raising=False)
mocker.patch.object(dispatcherctl.logging, 'basicConfig')
mocker.patch.object(dispatcherctl, 'connection', mocker.Mock(vendor='postgresql'))
def test_dispatcherctl_runs_control_with_generated_config(mocker):
command = dispatcherctl.Command()
command.stdout = io.StringIO()
data = {'foo': 'bar'}
mocker.patch.object(dispatcherctl, '_build_command_data_from_args', return_value=data)
dispatcher_setup = mocker.patch.object(dispatcherctl, 'dispatcher_setup')
config_data = {'setting': 'value'}
mocker.patch.object(dispatcherctl, 'get_dispatcherd_config', return_value=config_data)
control = mocker.Mock()
control.control_with_reply.return_value = [{'status': 'ok'}]
mocker.patch.object(dispatcherctl, 'get_control_from_settings', return_value=control)
mocker.patch.object(dispatcherctl.yaml, 'dump', return_value='payload\n')
command.handle(
command='running',
config=dispatcherctl.DEFAULT_CONFIG_FILE,
expected_replies=1,
log_level='INFO',
)
dispatcher_setup.assert_called_once_with(config_data)
control.control_with_reply.assert_called_once_with('running', data=data, expected_replies=1)
assert command.stdout.getvalue() == 'payload\n'
def test_dispatcherctl_rejects_custom_config_path():
command = dispatcherctl.Command()
command.stdout = io.StringIO()
with pytest.raises(CommandError):
command.handle(
command='running',
config='/tmp/dispatcher.yml',
expected_replies=1,
log_level='INFO',
)
def test_dispatcherctl_rejects_sqlite_db(mocker):
command = dispatcherctl.Command()
command.stdout = io.StringIO()
mocker.patch.object(dispatcherctl, 'connection', mocker.Mock(vendor='sqlite'))
with pytest.raises(CommandError, match='sqlite3'):
command.handle(
command='running',
config=dispatcherctl.DEFAULT_CONFIG_FILE,
expected_replies=1,
log_level='INFO',
)
def test_dispatcherctl_raises_when_replies_missing(mocker):
command = dispatcherctl.Command()
command.stdout = io.StringIO()
mocker.patch.object(dispatcherctl, '_build_command_data_from_args', return_value={})
mocker.patch.object(dispatcherctl, 'dispatcher_setup')
mocker.patch.object(dispatcherctl, 'get_dispatcherd_config', return_value={})
control = mocker.Mock()
control.control_with_reply.return_value = [{'status': 'ok'}]
mocker.patch.object(dispatcherctl, 'get_control_from_settings', return_value=control)
mocker.patch.object(dispatcherctl.yaml, 'dump', return_value='- status: ok\n')
with pytest.raises(CommandError):
command.handle(
command='running',
config=dispatcherctl.DEFAULT_CONFIG_FILE,
expected_replies=2,
log_level='INFO',
)
control.control_with_reply.assert_called_once_with('running', data={}, expected_replies=2)
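Taken together, these tests sketch the command's happy path: build the control payload from the CLI args, generate the dispatcherd config, initialize dispatcherd with it, send a control-with-reply, and dump the replies as YAML, raising CommandError when fewer replies than expected come back. A rough outline of that flow, with `dispatcher_setup` and `get_control_from_settings` passed in as callables to avoid guessing their real import paths:

```
# Hedged outline of the flow mocked out in test_dispatcherctl_runs_control_with_generated_config.
import yaml

from awx.main.dispatch.config import get_dispatcherd_config  # import shown in the tests above


def run_control_command(command, data, expected_replies, dispatcher_setup, get_control_from_settings):
    dispatcher_setup(get_dispatcherd_config())
    control = get_control_from_settings()
    replies = control.control_with_reply(command, data=data, expected_replies=expected_replies)
    if len(replies) < expected_replies:
        raise RuntimeError(f'Expected {expected_replies} replies, got {len(replies)}')
    return yaml.dump(replies)
```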

View File

@@ -1,3 +1,4 @@
import pytest
from unittest import mock
from awx.main.models import UnifiedJob, UnifiedJobTemplate, WorkflowJob, WorkflowJobNode, WorkflowApprovalTemplate, Job, User, Project, JobTemplate, Inventory
@@ -21,6 +22,52 @@ def test_unified_job_workflow_attributes():
assert job.workflow_job_id == 1
def mock_on_commit(f):
f()
@pytest.fixture
def unified_job(mocker):
mocker.patch.object(UnifiedJob, 'can_cancel', return_value=True)
j = UnifiedJob()
j.status = 'pending'
j.cancel_flag = None
j.save = mocker.MagicMock()
j.websocket_emit_status = mocker.MagicMock()
j.fallback_cancel = mocker.MagicMock()
return j
def test_cancel(unified_job):
with mock.patch('awx.main.models.unified_jobs.connection.on_commit', wraps=mock_on_commit):
unified_job.cancel()
assert unified_job.cancel_flag is True
assert unified_job.status == 'canceled'
assert unified_job.job_explanation == ''
# Note: the websocket emit status check simply reflects the current behavior of the code.
# More thought may be needed about only emitting 'canceled' if/when the job record's
# status actually changes to canceled, rather than emitting it unconditionally as it is now.
unified_job.websocket_emit_status.assert_called_with("canceled")
assert [(args, kwargs) for args, kwargs in unified_job.save.call_args_list] == [
((), {'update_fields': ['cancel_flag', 'start_args']}),
((), {'update_fields': ['status']}),
]
def test_cancel_job_explanation(unified_job):
job_explanation = 'giggity giggity'
with mock.patch('awx.main.models.unified_jobs.connection.on_commit'):
unified_job.cancel(job_explanation=job_explanation)
assert unified_job.job_explanation == job_explanation
assert [(args, kwargs) for args, kwargs in unified_job.save.call_args_list] == [
((), {'update_fields': ['cancel_flag', 'start_args', 'job_explanation']}),
((), {'update_fields': ['status']}),
]
def test_organization_copy_to_jobs():
"""
All unified job types should infer their organization from their template organization

View File

@@ -226,140 +226,3 @@ def test_send_messages_with_additional_headers():
allow_redirects=False,
)
assert sent_messages == 1
def test_send_messages_with_redirects_ok():
with mock.patch('awx.main.notifications.webhook_backend.requests') as requests_mock, mock.patch(
'awx.main.notifications.webhook_backend.get_awx_http_client_headers'
) as version_mock:
# First two calls return redirects, third call returns 200
requests_mock.post.side_effect = [
mock.Mock(status_code=301, headers={"Location": "http://redirect1.com"}),
mock.Mock(status_code=307, headers={"Location": "http://redirect2.com"}),
mock.Mock(status_code=200),
]
version_mock.return_value = {'Content-Type': 'application/json', 'User-Agent': 'AWX 0.0.1.dev (open)'}
backend = webhook_backend.WebhookBackend('POST', None)
message = EmailMessage(
'test subject',
{'text': 'test body'},
[],
[
'http://example.com',
],
)
sent_messages = backend.send_messages(
[
message,
]
)
assert requests_mock.post.call_count == 3
requests_mock.post.assert_called_with(
url='http://redirect2.com',
auth=None,
data=json.dumps({'text': 'test body'}, ensure_ascii=False).encode('utf-8'),
headers={'Content-Type': 'application/json', 'User-Agent': 'AWX 0.0.1.dev (open)'},
verify=True,
allow_redirects=False,
)
assert sent_messages == 1
def test_send_messages_with_redirects_blank():
with mock.patch('awx.main.notifications.webhook_backend.requests') as requests_mock, mock.patch(
'awx.main.notifications.webhook_backend.get_awx_http_client_headers'
) as version_mock, mock.patch('awx.main.notifications.webhook_backend.logger') as logger_mock:
# First call returns a redirect with Location header, second call returns 301 but NO Location header
requests_mock.post.side_effect = [
mock.Mock(status_code=301, headers={"Location": "http://redirect1.com"}),
mock.Mock(status_code=301, headers={}), # 301 with no Location header
]
version_mock.return_value = {'Content-Type': 'application/json', 'User-Agent': 'AWX 0.0.1.dev (open)'}
backend = webhook_backend.WebhookBackend('POST', None, fail_silently=True)
message = EmailMessage(
'test subject',
{'text': 'test body'},
[],
[
'http://example.com',
],
)
sent_messages = backend.send_messages(
[
message,
]
)
# Should make 2 requests (initial + 1 redirect attempt)
assert requests_mock.post.call_count == 2
# The error message should be logged
logger_mock.error.assert_called_once()
error_call_args = logger_mock.error.call_args[0][0]
assert "redirect to a blank URL" in error_call_args
assert sent_messages == 0
def test_send_messages_with_redirects_max_retries_exceeded():
with mock.patch('awx.main.notifications.webhook_backend.requests') as requests_mock, mock.patch(
'awx.main.notifications.webhook_backend.get_awx_http_client_headers'
) as version_mock, mock.patch('awx.main.notifications.webhook_backend.logger') as logger_mock:
# Return MAX_RETRIES (5) redirect responses to exceed the retry limit
requests_mock.post.side_effect = [
mock.Mock(status_code=301, headers={"Location": "http://redirect1.com"}),
mock.Mock(status_code=301, headers={"Location": "http://redirect2.com"}),
mock.Mock(status_code=307, headers={"Location": "http://redirect3.com"}),
mock.Mock(status_code=301, headers={"Location": "http://redirect4.com"}),
mock.Mock(status_code=307, headers={"Location": "http://redirect5.com"}),
]
version_mock.return_value = {'Content-Type': 'application/json', 'User-Agent': 'AWX 0.0.1.dev (open)'}
backend = webhook_backend.WebhookBackend('POST', None, fail_silently=True)
message = EmailMessage(
'test subject',
{'text': 'test body'},
[],
[
'http://example.com',
],
)
sent_messages = backend.send_messages(
[
message,
]
)
# Should make exactly 5 requests (MAX_RETRIES)
assert requests_mock.post.call_count == 5
# The error message should be logged for exceeding max retries
logger_mock.error.assert_called_once()
error_call_args = logger_mock.error.call_args[0][0]
assert "max number of retries" in error_call_args
assert "[5]" in error_call_args
assert sent_messages == 0
def test_send_messages_with_error_status_code():
with mock.patch('awx.main.notifications.webhook_backend.requests') as requests_mock, mock.patch(
'awx.main.notifications.webhook_backend.get_awx_http_client_headers'
) as version_mock, mock.patch('awx.main.notifications.webhook_backend.logger') as logger_mock:
# Return a 404 error status code
requests_mock.post.return_value = mock.Mock(status_code=404)
version_mock.return_value = {'Content-Type': 'application/json', 'User-Agent': 'AWX 0.0.1.dev (open)'}
backend = webhook_backend.WebhookBackend('POST', None, fail_silently=True)
message = EmailMessage(
'test subject',
{'text': 'test body'},
[],
[
'http://example.com',
],
)
sent_messages = backend.send_messages(
[
message,
]
)
# Should make exactly 1 request
assert requests_mock.post.call_count == 1
# The error message should be logged
logger_mock.error.assert_called_once()
error_call_args = logger_mock.error.call_args[0][0]
assert "Error sending webhook notification: 404" in error_call_args
assert sent_messages == 0
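The redirect tests above pin down a specific pattern: the backend posts with `allow_redirects=False`, follows `Location` headers for 301/307 responses itself, gives up after a blank `Location` or five attempts, and logs an error for non-2xx results. A generic sketch of that manual-redirect loop (not the WebhookBackend's actual code) looks roughly like this:

```
# Generic illustration of the manual-redirect pattern exercised by the tests above;
# names and structure are assumptions, not the WebhookBackend implementation.
import logging

import requests

logger = logging.getLogger(__name__)
MAX_RETRIES = 5


def post_with_manual_redirects(url, **kwargs):
    for attempt in range(MAX_RETRIES):
        response = requests.post(url=url, allow_redirects=False, **kwargs)
        if response.status_code not in (301, 307):
            return response
        url = response.headers.get('Location')
        if not url:
            logger.error('Webhook notification received a redirect to a blank URL')
            return None
    logger.error('Webhook notification exceeded max number of retries [%d] following redirects' % MAX_RETRIES)
    return None
```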

View File

@@ -1,19 +1,20 @@
import pytest
from django.conf import settings
from datetime import timedelta
@pytest.mark.parametrize(
"task_name",
"job_name,function_path",
[
'awx.main.tasks.system.awx_periodic_scheduler',
('tower_scheduler', 'awx.main.tasks.system.awx_periodic_scheduler'),
],
)
def test_DISPATCHER_SCHEDULE(mocker, task_name):
assert task_name in settings.DISPATCHER_SCHEDULE
assert 'schedule' in settings.DISPATCHER_SCHEDULE[task_name]
assert type(settings.DISPATCHER_SCHEDULE[task_name]['schedule']) in (int, float)
assert settings.DISPATCHER_SCHEDULE[task_name]['task'] == task_name
def test_CELERYBEAT_SCHEDULE(mocker, job_name, function_path):
assert job_name in settings.CELERYBEAT_SCHEDULE
assert 'schedule' in settings.CELERYBEAT_SCHEDULE[job_name]
assert type(settings.CELERYBEAT_SCHEDULE[job_name]['schedule']) is timedelta
assert settings.CELERYBEAT_SCHEDULE[job_name]['task'] == function_path
# Ensures that the function exists
mocker.patch(task_name)
mocker.patch(function_path)

View File

@@ -18,17 +18,8 @@ from awx.main.models import (
Job,
Organization,
Project,
JobTemplate,
UnifiedJobTemplate,
InstanceGroup,
ExecutionEnvironment,
ProjectUpdate,
InventoryUpdate,
InventorySource,
AdHocCommand,
)
from awx.main.tasks import jobs
from ansible_base.lib.workload_identity.controller import AutomationControllerJobScope
@pytest.fixture
@@ -197,233 +188,3 @@ def test_invalid_host_facts(mock_facts_settings, bulk_update_sorted_by_id, priva
with pytest.raises(pytest.fail.Exception):
if failures:
pytest.fail(f" {len(failures)} facts cleared failures : {','.join(failures)}")
@pytest.mark.parametrize(
"job_attrs,expected_claims",
[
(
{
'id': 100,
'name': 'Test Job',
'job_type': 'run',
'launch_type': 'manual',
'playbook': 'site.yml',
'organization': Organization(id=1, name='Test Org'),
'inventory': Inventory(id=2, name='Test Inventory'),
'project': Project(id=3, name='Test Project'),
'execution_environment': ExecutionEnvironment(id=4, name='Test EE'),
'job_template': JobTemplate(id=5, name='Test Job Template'),
'unified_job_template': UnifiedJobTemplate(pk=6, id=6, name='Test Unified Job Template'),
'instance_group': InstanceGroup(id=7, name='Test Instance Group'),
},
{
AutomationControllerJobScope.CLAIM_JOB_ID: 100,
AutomationControllerJobScope.CLAIM_JOB_NAME: 'Test Job',
AutomationControllerJobScope.CLAIM_JOB_TYPE: 'run',
AutomationControllerJobScope.CLAIM_LAUNCH_TYPE: 'manual',
AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: 'site.yml',
AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: 'Test Org',
AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: 1,
AutomationControllerJobScope.CLAIM_INVENTORY_NAME: 'Test Inventory',
AutomationControllerJobScope.CLAIM_INVENTORY_ID: 2,
AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_NAME: 'Test EE',
AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_ID: 4,
AutomationControllerJobScope.CLAIM_PROJECT_NAME: 'Test Project',
AutomationControllerJobScope.CLAIM_PROJECT_ID: 3,
AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_NAME: 'Test Job Template',
AutomationControllerJobScope.CLAIM_JOB_TEMPLATE_ID: 5,
AutomationControllerJobScope.CLAIM_UNIFIED_JOB_TEMPLATE_NAME: 'Test Unified Job Template',
AutomationControllerJobScope.CLAIM_UNIFIED_JOB_TEMPLATE_ID: 6,
AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_NAME: 'Test Instance Group',
AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_ID: 7,
},
),
(
{'id': 100, 'name': 'Test', 'job_type': 'run', 'launch_type': 'manual', 'organization': Organization(id=1, name='')},
{
AutomationControllerJobScope.CLAIM_JOB_ID: 100,
AutomationControllerJobScope.CLAIM_JOB_NAME: 'Test',
AutomationControllerJobScope.CLAIM_JOB_TYPE: 'run',
AutomationControllerJobScope.CLAIM_LAUNCH_TYPE: 'manual',
AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: 1,
AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: '',
AutomationControllerJobScope.CLAIM_PLAYBOOK_NAME: '',
},
),
],
)
def test_populate_claims_for_workload(job_attrs, expected_claims):
job = Job()
for attr, value in job_attrs.items():
setattr(job, attr, value)
claims = jobs.populate_claims_for_workload(job)
assert claims == expected_claims
@pytest.mark.parametrize(
"workload_attrs,expected_claims",
[
(
{
'id': 200,
'name': 'Git Sync',
'job_type': 'check',
'launch_type': 'sync',
'organization': Organization(id=1, name='Test Org'),
'project': Project(pk=3, id=3, name='Test Project'),
'unified_job_template': Project(pk=3, id=3, name='Test Project'),
'execution_environment': ExecutionEnvironment(id=4, name='Test EE'),
'instance_group': InstanceGroup(id=7, name='Test Instance Group'),
},
{
AutomationControllerJobScope.CLAIM_JOB_ID: 200,
AutomationControllerJobScope.CLAIM_JOB_NAME: 'Git Sync',
AutomationControllerJobScope.CLAIM_JOB_TYPE: 'check',
AutomationControllerJobScope.CLAIM_LAUNCH_TYPE: 'sync',
AutomationControllerJobScope.CLAIM_LAUNCHED_BY_NAME: 'Test Project',
AutomationControllerJobScope.CLAIM_LAUNCHED_BY_ID: 3,
AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: 'Test Org',
AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: 1,
AutomationControllerJobScope.CLAIM_PROJECT_NAME: 'Test Project',
AutomationControllerJobScope.CLAIM_PROJECT_ID: 3,
AutomationControllerJobScope.CLAIM_UNIFIED_JOB_TEMPLATE_NAME: 'Test Project',
AutomationControllerJobScope.CLAIM_UNIFIED_JOB_TEMPLATE_ID: 3,
AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_NAME: 'Test EE',
AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_ID: 4,
AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_NAME: 'Test Instance Group',
AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_ID: 7,
},
),
(
{
'id': 201,
'name': 'Minimal Project Update',
'job_type': 'run',
'launch_type': 'manual',
},
{
AutomationControllerJobScope.CLAIM_JOB_ID: 201,
AutomationControllerJobScope.CLAIM_JOB_NAME: 'Minimal Project Update',
AutomationControllerJobScope.CLAIM_JOB_TYPE: 'run',
AutomationControllerJobScope.CLAIM_LAUNCH_TYPE: 'manual',
},
),
],
)
def test_populate_claims_for_project_update(workload_attrs, expected_claims):
project_update = ProjectUpdate()
for attr, value in workload_attrs.items():
setattr(project_update, attr, value)
claims = jobs.populate_claims_for_workload(project_update)
assert claims == expected_claims
@pytest.mark.parametrize(
"workload_attrs,expected_claims",
[
(
{
'id': 300,
'name': 'AWS Sync',
'launch_type': 'scheduled',
'organization': Organization(id=1, name='Test Org'),
'inventory': Inventory(id=2, name='AWS Inventory'),
'unified_job_template': InventorySource(pk=8, id=8, name='AWS Source'),
'execution_environment': ExecutionEnvironment(id=4, name='Test EE'),
'instance_group': InstanceGroup(id=7, name='Test Instance Group'),
},
{
AutomationControllerJobScope.CLAIM_JOB_ID: 300,
AutomationControllerJobScope.CLAIM_JOB_NAME: 'AWS Sync',
AutomationControllerJobScope.CLAIM_LAUNCH_TYPE: 'scheduled',
AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: 'Test Org',
AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: 1,
AutomationControllerJobScope.CLAIM_INVENTORY_NAME: 'AWS Inventory',
AutomationControllerJobScope.CLAIM_INVENTORY_ID: 2,
AutomationControllerJobScope.CLAIM_UNIFIED_JOB_TEMPLATE_NAME: 'AWS Source',
AutomationControllerJobScope.CLAIM_UNIFIED_JOB_TEMPLATE_ID: 8,
AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_NAME: 'Test EE',
AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_ID: 4,
AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_NAME: 'Test Instance Group',
AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_ID: 7,
},
),
(
{
'id': 301,
'name': 'Minimal Inventory Update',
'launch_type': 'manual',
},
{
AutomationControllerJobScope.CLAIM_JOB_ID: 301,
AutomationControllerJobScope.CLAIM_JOB_NAME: 'Minimal Inventory Update',
AutomationControllerJobScope.CLAIM_LAUNCH_TYPE: 'manual',
},
),
],
)
def test_populate_claims_for_inventory_update(workload_attrs, expected_claims):
inventory_update = InventoryUpdate()
for attr, value in workload_attrs.items():
setattr(inventory_update, attr, value)
claims = jobs.populate_claims_for_workload(inventory_update)
assert claims == expected_claims
@pytest.mark.parametrize(
"workload_attrs,expected_claims",
[
(
{
'id': 400,
'name': 'Ping All Hosts',
'job_type': 'run',
'launch_type': 'manual',
'organization': Organization(id=1, name='Test Org'),
'inventory': Inventory(id=2, name='Test Inventory'),
'execution_environment': ExecutionEnvironment(id=4, name='Test EE'),
'instance_group': InstanceGroup(id=7, name='Test Instance Group'),
},
{
AutomationControllerJobScope.CLAIM_JOB_ID: 400,
AutomationControllerJobScope.CLAIM_JOB_NAME: 'Ping All Hosts',
AutomationControllerJobScope.CLAIM_JOB_TYPE: 'run',
AutomationControllerJobScope.CLAIM_LAUNCH_TYPE: 'manual',
AutomationControllerJobScope.CLAIM_ORGANIZATION_NAME: 'Test Org',
AutomationControllerJobScope.CLAIM_ORGANIZATION_ID: 1,
AutomationControllerJobScope.CLAIM_INVENTORY_NAME: 'Test Inventory',
AutomationControllerJobScope.CLAIM_INVENTORY_ID: 2,
AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_NAME: 'Test EE',
AutomationControllerJobScope.CLAIM_EXECUTION_ENVIRONMENT_ID: 4,
AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_NAME: 'Test Instance Group',
AutomationControllerJobScope.CLAIM_INSTANCE_GROUP_ID: 7,
},
),
(
{
'id': 401,
'name': 'Minimal Ad Hoc',
'job_type': 'run',
'launch_type': 'manual',
},
{
AutomationControllerJobScope.CLAIM_JOB_ID: 401,
AutomationControllerJobScope.CLAIM_JOB_NAME: 'Minimal Ad Hoc',
AutomationControllerJobScope.CLAIM_JOB_TYPE: 'run',
AutomationControllerJobScope.CLAIM_LAUNCH_TYPE: 'manual',
},
),
],
)
def test_populate_claims_for_adhoc_command(workload_attrs, expected_claims):
adhoc_command = AdHocCommand()
for attr, value in workload_attrs.items():
setattr(adhoc_command, attr, value)
claims = jobs.populate_claims_for_workload(adhoc_command)
assert claims == expected_claims

View File

@@ -12,10 +12,6 @@ def pytest_sigterm():
pytest_sigterm.called_count += 1
def pytest_sigusr1():
pytest_sigusr1.called_count += 1
def tmp_signals_for_test(func):
"""
When we run our internal signal handlers, it will call the original signal
@@ -30,17 +26,13 @@ def tmp_signals_for_test(func):
def wrapper():
original_sigterm = signal.getsignal(signal.SIGTERM)
original_sigint = signal.getsignal(signal.SIGINT)
original_sigusr1 = signal.getsignal(signal.SIGUSR1)
signal.signal(signal.SIGTERM, pytest_sigterm)
signal.signal(signal.SIGINT, pytest_sigint)
signal.signal(signal.SIGUSR1, pytest_sigusr1)
pytest_sigterm.called_count = 0
pytest_sigint.called_count = 0
pytest_sigusr1.called_count = 0
func()
signal.signal(signal.SIGTERM, original_sigterm)
signal.signal(signal.SIGINT, original_sigint)
signal.signal(signal.SIGUSR1, original_sigusr1)
return wrapper
@@ -66,13 +58,11 @@ def test_outer_inner_signal_handling():
assert signal_callback() is False
assert pytest_sigterm.called_count == 0
assert pytest_sigint.called_count == 0
assert pytest_sigusr1.called_count == 0
f1()
assert signal_callback() is False
assert signal.getsignal(signal.SIGTERM) is original_sigterm
assert pytest_sigterm.called_count == 1
assert pytest_sigint.called_count == 0
assert pytest_sigusr1.called_count == 0
@tmp_signals_for_test
@@ -97,31 +87,8 @@ def test_inner_outer_signal_handling():
assert signal_callback() is False
assert pytest_sigterm.called_count == 0
assert pytest_sigint.called_count == 0
assert pytest_sigusr1.called_count == 0
f1()
assert signal_callback() is False
assert signal.getsignal(signal.SIGTERM) is original_sigterm
assert pytest_sigterm.called_count == 0
assert pytest_sigint.called_count == 1
assert pytest_sigusr1.called_count == 0
@tmp_signals_for_test
def test_sigusr1_signal_handling():
@with_signal_handling
def f1():
assert signal_callback() is False
signal_state.set_signal_flag(for_signal=signal.SIGUSR1)
assert signal_callback()
original_sigusr1 = signal.getsignal(signal.SIGUSR1)
assert signal_callback() is False
assert pytest_sigterm.called_count == 0
assert pytest_sigint.called_count == 0
assert pytest_sigusr1.called_count == 0
f1()
assert signal_callback() is False
assert signal.getsignal(signal.SIGUSR1) is original_sigusr1
assert pytest_sigterm.called_count == 0
assert pytest_sigint.called_count == 0
assert pytest_sigusr1.called_count == 1

View File

@@ -132,25 +132,6 @@ def test_cert_with_key():
assert not pem_objects[1]['key_enc']
def test_ssh_key_with_whitespace():
# Test that SSH keys with leading/trailing whitespace/newlines are properly sanitized
# This addresses issue #14219 where copy-paste can introduce hidden newlines
valid_key_with_whitespace = "\n\n" + TEST_SSH_KEY_DATA + "\n\n"
pem_objects = validate_ssh_private_key(valid_key_with_whitespace)
assert pem_objects[0]['key_type'] == 'rsa'
assert not pem_objects[0]['key_enc']
# Test with just leading whitespace
valid_key_leading = "\n\n\n" + TEST_SSH_KEY_DATA
pem_objects = validate_ssh_private_key(valid_key_leading)
assert pem_objects[0]['key_type'] == 'rsa'
# Test with just trailing whitespace
valid_key_trailing = TEST_SSH_KEY_DATA + "\n\n\n"
pem_objects = validate_ssh_private_key(valid_key_trailing)
assert pem_objects[0]['key_type'] == 'rsa'
@pytest.mark.parametrize(
"var_str",
[

View File

@@ -1000,15 +1000,9 @@ def getattrd(obj, name, default=NoDefaultProvided):
raise
empty = object()
def getattr_dne(obj, name, default=empty, notfound=ObjectDoesNotExist):
def getattr_dne(obj, name, notfound=ObjectDoesNotExist):
try:
if default is empty:
return getattr(obj, name)
else:
return getattr(obj, name, default)
return getattr(obj, name)
except notfound:
return None
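For reference, the surviving (default-less) form is the one callers typically want when dereferencing a related object that may no longer exist; a hypothetical call site:

```
# Hypothetical usage — getattr_dne returns None instead of raising when the
# related accessor raises ObjectDoesNotExist.
def organization_name_or_none(unified_job):
    organization = getattr_dne(unified_job, 'organization')
    return organization.name if organization is not None else None
```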

View File

@@ -257,7 +257,8 @@ class LogstashFormatter(LogstashFormatterBase):
return fields
def format(self, record):
stamp = datetime.fromtimestamp(record.created, tz=tzutc())
stamp = datetime.utcfromtimestamp(record.created)
stamp = stamp.replace(tzinfo=tzutc())
message = {
# Field not included, but exist in related logs
# 'path': record.pathname
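Both constructions in this hunk yield the same timezone-aware UTC datetime; the single-call `fromtimestamp(..., tz=...)` form also avoids `datetime.utcfromtimestamp`, which is deprecated as of Python 3.12. A quick equivalence check:

```
# Equivalence check for the two timestamp constructions shown in this hunk.
from datetime import datetime

from dateutil.tz import tzutc

ts = 1700000000.0
aware = datetime.fromtimestamp(ts, tz=tzutc())
naive_then_replaced = datetime.utcfromtimestamp(ts).replace(tzinfo=tzutc())  # deprecated in Python 3.12+
assert aware == naive_then_replaced
```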

View File

@@ -181,8 +181,6 @@ def validate_ssh_private_key(data):
certificates; should handle any valid options for ssh_private_key on a
credential.
"""
# Strip leading and trailing whitespace/newlines to handle common copy-paste issues
data = data.strip()
return validate_pem(data, min_keys=1)

View File

@@ -94,7 +94,7 @@ class WebsocketRelayConnection:
except asyncio.CancelledError:
# TODO: Check if connected and disconnect
# Possibly use run_until_complete() if disconnect is async
logger.warning(f"Connection from {self.name} to {self.remote_host} canceled.")
logger.warning(f"Connection from {self.name} to {self.remote_host} cancelled.")
except client_exceptions.ClientConnectorError as e:
logger.warning(f"Connection from {self.name} to {self.remote_host} failed: '{e}'.", exc_info=True)
except asyncio.TimeoutError:
@@ -291,7 +291,7 @@ class WebSocketRelayManager(object):
except asyncio.TimeoutError:
logger.warning(f"Tried to cancel relay connection for {hostname} but it timed out during cleanup.")
except asyncio.CancelledError:
# Handle the case where the task was already canceled by the time we got here.
# Handle the case where the task was already cancelled by the time we got here.
pass
del self.relay_connections[hostname]

View File

@@ -83,6 +83,7 @@ class ActionModule(ActionBase):
password = self._task.args.get('password', None)
client_id = self._task.args.get('client_id', None)
client_secret = self._task.args.get('client_secret', None)
oidc_endpoint = self._task.args.get('oidc_endpoint', DEFAULT_OIDC_ENDPOINT)
session.headers.update(
{
@@ -92,7 +93,7 @@ class ActionModule(ActionBase):
)
if authentication == 'service_account' or (client_id and client_secret):
data = self._obtain_auth_token(DEFAULT_OIDC_ENDPOINT, client_id, client_secret)
data = self._obtain_auth_token(oidc_endpoint, client_id, client_secret)
if 'token' not in data:
result['failed'] = data['failed']
result['msg'] = data['msg']

View File

@@ -7,6 +7,7 @@ import os
import re # noqa
import tempfile
import socket
from datetime import timedelta
DEBUG = True
SQL_DEBUG = DEBUG
@@ -419,30 +420,40 @@ BROKER_URL = 'unix:///var/run/redis/redis.sock'
REDIS_RETRY_COUNT = 3 # Number of retries for Redis connection errors
REDIS_BACKOFF_CAP = 1.0 # Maximum backoff delay in seconds for Redis retries
REDIS_BACKOFF_BASE = 0.5 # Base for exponential backoff calculation for Redis retries
DISPATCHER_SCHEDULE = {
'awx.main.tasks.system.awx_periodic_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': 30, 'options': {'expires': 20}},
'awx.main.tasks.system.cluster_node_heartbeat': {
CELERYBEAT_SCHEDULE = {
'tower_scheduler': {'task': 'awx.main.tasks.system.awx_periodic_scheduler', 'schedule': timedelta(seconds=30), 'options': {'expires': 20}},
'cluster_heartbeat': {
'task': 'awx.main.tasks.system.cluster_node_heartbeat',
'schedule': CLUSTER_NODE_HEARTBEAT_PERIOD,
'schedule': timedelta(seconds=CLUSTER_NODE_HEARTBEAT_PERIOD),
'options': {'expires': 50},
},
'awx.main.tasks.system.gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': 300},
'awx.main.scheduler.tasks.task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': 20, 'options': {'expires': 20}},
'awx.main.scheduler.tasks.dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': 20, 'options': {'expires': 20}},
'awx.main.tasks.system.awx_k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': 60, 'options': {'expires': 50}},
'awx.main.tasks.system.awx_receptor_workunit_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': 60},
'awx.main.analytics.analytics_tasks.send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': 20},
'awx.main.tasks.system.cleanup_images_and_files': {'task': 'awx.main.tasks.system.cleanup_images_and_files', 'schedule': 10800},
'awx.main.tasks.host_metrics.cleanup_host_metrics': {'task': 'awx.main.tasks.host_metrics.cleanup_host_metrics', 'schedule': 12600},
'awx.main.tasks.host_metrics.host_metric_summary_monthly': {'task': 'awx.main.tasks.host_metrics.host_metric_summary_monthly', 'schedule': 14400},
'awx.main.tasks.system.periodic_resource_sync': {'task': 'awx.main.tasks.system.periodic_resource_sync', 'schedule': 900},
'awx.main.tasks.host_indirect.cleanup_and_save_indirect_host_entries_fallback': {
'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)},
'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
'k8s_reaper': {'task': 'awx.main.tasks.system.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}},
'receptor_reaper': {'task': 'awx.main.tasks.system.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)},
'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)},
'cleanup_images': {'task': 'awx.main.tasks.system.cleanup_images_and_files', 'schedule': timedelta(hours=3)},
'cleanup_host_metrics': {'task': 'awx.main.tasks.host_metrics.cleanup_host_metrics', 'schedule': timedelta(hours=3, minutes=30)},
'host_metric_summary_monthly': {'task': 'awx.main.tasks.host_metrics.host_metric_summary_monthly', 'schedule': timedelta(hours=4)},
'periodic_resource_sync': {'task': 'awx.main.tasks.system.periodic_resource_sync', 'schedule': timedelta(minutes=15)},
'cleanup_and_save_indirect_host_entries_fallback': {
'task': 'awx.main.tasks.host_indirect.cleanup_and_save_indirect_host_entries_fallback',
'schedule': 3600,
'schedule': timedelta(minutes=60),
},
}
DISPATCHER_SCHEDULE = {}
for options in CELERYBEAT_SCHEDULE.values():
new_options = options.copy()
task_name = options['task']
# Handle the single exception case: the heartbeat task, which has a new implementation
if task_name == 'awx.main.tasks.system.cluster_node_heartbeat':
task_name = 'awx.main.tasks.system.cluster_node_heartbeat'
new_options['task'] = task_name
new_options['schedule'] = options['schedule'].total_seconds()
DISPATCHER_SCHEDULE[task_name] = new_options
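# Illustration (not part of the settings file): after this loop, a CELERYBEAT_SCHEDULE
# entry such as
#     'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager',
#                      'schedule': timedelta(seconds=20), 'options': {'expires': 20}}
# is re-keyed by its dotted task path with the schedule converted to seconds:
#     'awx.main.scheduler.tasks.task_manager': {'task': 'awx.main.scheduler.tasks.task_manager',
#                                               'schedule': 20.0, 'options': {'expires': 20}}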
# Django Caching Configuration
DJANGO_REDIS_IGNORE_EXCEPTIONS = True
CACHES = {'default': {'BACKEND': 'awx.main.cache.AWXRedisCache', 'LOCATION': 'unix:///var/run/redis/redis.sock?db=1'}}
@@ -700,6 +711,7 @@ DISABLE_LOCAL_AUTH = False
TOWER_URL_BASE = "https://platformhost"
INSIGHTS_URL_BASE = "https://example.org"
INSIGHTS_OIDC_ENDPOINT = "https://sso.example.org/"
INSIGHTS_AGENT_MIME = 'application/example'
# See https://github.com/ansible/awx-facts-playbooks
INSIGHTS_SYSTEM_ID_FILE = '/etc/redhat-access-insights/machine-id'
@@ -1026,14 +1038,12 @@ SPECTACULAR_SETTINGS = {
'SCHEMA_PATH_PREFIX': r'/api/v[0-9]',
'DEFAULT_GENERATOR_CLASS': 'drf_spectacular.generators.SchemaGenerator',
'SCHEMA_COERCE_PATH_PK_SUFFIX': True,
'CONTACT': {'email': 'ansible-community@redhat.com'},
'CONTACT': {'email': 'controller-eng@redhat.com'},
'LICENSE': {'name': 'Apache License'},
'TERMS_OF_SERVICE': 'https://www.google.com/policies/terms/',
# Use our custom schema class that handles swagger_topic and deprecated views
'DEFAULT_SCHEMA_CLASS': 'awx.api.schema.CustomAutoSchema',
'COMPONENT_SPLIT_REQUEST': True,
# Postprocessing hook to filter CredentialType enum values
'POSTPROCESSING_HOOKS': ['awx.api.schema.filter_credential_type_schema'],
'SWAGGER_UI_SETTINGS': {
'deepLinking': True,
'persistAuthorization': True,

View File

@@ -14,7 +14,7 @@ $(function() {
$('span.str').each(function() {
var s = $(this).html();
if (s.match(/^\"\/.+\/\"$/) || s.match(/^\"\/.+\/\?.*\"$/)) {
$(this).html('"<a href=' + s + '>' + s.replaceAll('"', '') + '</a>"');
$(this).html('"<a href=' + s + '>' + s.replace(/\"/g, '') + '</a>"');
}
});
@@ -27,7 +27,7 @@ $(function() {
}).each(function() {
$(this).nextUntil('span.pun:contains("]")').filter('span.str').each(function() {
if ($(this).text().match(/^\".+\"$/)) {
var s = $(this).text().replaceAll('"', '');
var s = $(this).text().replace(/\"/g, '');
$(this).html('"<a href="' + '?host=' + s + '">' + s + '</a>"');
}
else if ($(this).text() !== '"') {

View File

@@ -1,5 +1,4 @@
<!DOCTYPE html>
<html lang="en">
<html>
<head>
<title>Redirecting</title>
<meta http-equiv="refresh" content="0;URL='/#'"/>

View File

@@ -1,5 +1,5 @@
<!DOCTYPE html>
<html lang="en">
<html>
<head>
<title>On Break...</title>
<meta http-equiv="refresh" content="2">
@@ -8,7 +8,7 @@
<body>
<div class="container">
<div class="upper_div">
<img class="main_image" src="/static/awx-spud-reading.svg" alt="AWX mascot reading a book"/>
<img class="main_image" src="/static/awx-spud-reading.svg"/>
<span class="error_number">502</span>
</div>
<div class="message_div">

View File

@@ -1,5 +1,5 @@
<!DOCTYPE html>
<html lang="en">
<html>
<head>
<title>On Break...</title>
<meta http-equiv="refresh" content="2">
@@ -8,7 +8,7 @@
<body>
<div class="container">
<div class="upper_div">
<img class="main_image" src="/static/awx-spud-reading.svg" alt="AWX mascot reading a book"/>
<img class="main_image" src="/static/awx-spud-reading.svg"/>
<span class="error_number">504</span>
</div>
<div class="message_div">

View File

@@ -28,6 +28,7 @@ body {
.upper_div {
background-color: #F8EBA7;
justify-content: center;
align-items: center;
text-align: center;
height: 50%;
align-items: flex-end;
@@ -47,7 +48,7 @@ body {
right: 90px;
font-size:200px;
color: #FDBA48;
font-family: Impact, Haettenschweiler, "Franklin Gothic Bold", Charcoal, "Helvetica Inserat", "Bitstream Vera Sans Bold", "Arial Black", sans-serif;
font-family: Impact, Haettenschweiler, "Franklin Gothic Bold", Charcoal, "Helvetica Inserat", "Bitstream Vera Sans Bold", "Arial Black", "sans serif";
}
.message_div {
@@ -61,7 +62,7 @@ body {
.m1,.m2,.m3 {
color: #151515;
width: 100%;
font-family: redhat-display-medium, sans-serif;
font-family: redhat-display-medium;
}
.m1 {
@@ -77,5 +78,5 @@ body {
.m3 {
font-size: 16px;
padding-top: 20px;
font-family: redhat-display-regular, sans-serif;
font-family: redhat-display-regular;
}

View File

@@ -18,7 +18,7 @@ div.response-info span.meta {
<div class="container">
<div class="navbar-header">
<a class="navbar-brand" href="/">
<img class="logo" src="{% static 'media/logo-header.svg' %}" alt="AWX">
<img class="logo" src="{% static 'media/logo-header.svg' %}">
</a>
<a class="navbar-title" href="{{ request.get_full_path }}">
<span>&nbsp;&mdash; {{name}}</span>

View File

@@ -9,7 +9,7 @@
<div class="well" style="width: 320px; margin-left: auto; margin-right: auto">
<div class="row-fluid">
<form action="{% url 'api:login' %}" method="post">
<form action="{% url 'api:login' %}" role="form" method="post">
{% csrf_token %}
<input type="hidden" name="next" value={% if request.GET.next %}"{{ request.GET.next }}"{% elif request.POST.next %}"{{ request.POST.next }}"{% else %}"{% url 'api:api_root_view' %}"{% endif %} />
<div class="clearfix control-group {% if form.username.errors %}error{% endif %}"

View File

@@ -1,7 +1,6 @@
import sys
import os
import shlex
import warnings
from datetime import datetime
from importlib import import_module
@@ -9,10 +8,10 @@ from importlib import import_module
sys.path.insert(0, os.path.abspath('./rst/rest_api/_swagger'))
project = u'Ansible AWX'
copyright = u'2026, Red Hat'
copyright = u'2024, Red Hat'
author = u'Red Hat'
pubdateshort = '2026-01-07'
pubdateshort = '2024-11-22'
pubdate = datetime.strptime(pubdateshort, '%Y-%m-%d').strftime('%B %d, %Y')
# The name for this set of Sphinx documents. If None, it defaults to
@@ -36,7 +35,6 @@ extensions = [
'sphinx.ext.coverage',
'sphinx.ext.ifconfig',
'sphinx_ansible_theme',
'sphinxcontrib.redoc',
'notfound.extension',
'swagger',
]
@@ -63,27 +61,6 @@ language = 'en'
locale_dirs = ['locale/'] # path is example but recommended.
gettext_compact = False # optional.
redoc = [
{
'name': 'AWX OpenAPI Reference',
'page': 'open_api/explorer',
'spec': 'rst/open_api/schema.json',
'embed': True,
'opts': {
'suppress-warnings': True,
'hide-hostname': True,
}
}
]
# Suppress pkg_resources deprecation from sphinxcontrib-redoc
warnings.filterwarnings(
'ignore',
message='pkg_resources is deprecated',
category=UserWarning,
module='sphinxcontrib.redoc',
)
rst_epilog = """
.. |atapi| replace:: *AWX API Guide*
.. |atrn| replace:: *AWX Release Notes*
@@ -112,4 +89,3 @@ rst_epilog = """
pubdateshort,
pubdate,
)

View File

@@ -1,19 +0,0 @@
import requests
downloads = [
{"url": "https://awx-public-ci-files.s3.amazonaws.com/community-docs/swagger.json", "path": "./docs/docsite/rst/rest_api/_swagger/swagger.json"},
{"url": "https://s3.amazonaws.com/awx-public-ci-files/awx/devel/schema.json", "path": "./docs/docsite/rst/open_api/schema.json"},
]
for item in downloads:
url = item["url"]
filepath = item["path"]
response = requests.get(url)
if response.status_code == 200:
with open(filepath, 'wb') as file:
file.write(response.content)
print(f"JSON file downloaded to {filepath}")
else:
print(f"Request failed with status code: {response.status_code}")

View File

@@ -3,8 +3,6 @@
sphinx # Tooling to build HTML from RST source.
sphinx-ansible-theme # Ansible community theme for Sphinx doc builds.
sphinx-notfound-page # Sphinx extension for custom 404 page.
sphinxcontrib-redoc # Renders OpenAPI spec in human readable format.
setuptools >= 65.0 # Provides pkg_resources module for compatibility. Needed by sphinxcontrib-redoc.
docutils # Tooling for RST processing and the swagger extension.
Jinja2 # Requires investigation. Possibly inherited from previous repo with a custom theme.
PyYaml # Requires investigation. Possibly used as tooling for swagger API reference content.

View File

@@ -8,10 +8,6 @@ alabaster==1.0.0
# via sphinx
ansible-pygments==0.1.1
# via sphinx-ansible-theme
attrs==25.4.0
# via
# jsonschema
# referencing
babel==2.16.0
# via sphinx
certifi==2024.8.30
@@ -31,11 +27,6 @@ jinja2==3.1.4
# via
# -r docs/docsite/requirements.in
# sphinx
# sphinxcontrib-redoc
jsonschema==4.26.0
# via sphinxcontrib-redoc
jsonschema-specifications==2025.9.1
# via jsonschema
markupsafe==3.0.2
# via jinja2
packaging==24.2
@@ -45,21 +36,9 @@ pygments==2.18.0
# ansible-pygments
# sphinx
pyyaml==6.0.2
# via
# -r docs/docsite/requirements.in
# sphinxcontrib-redoc
referencing==0.37.0
# via
# jsonschema
# jsonschema-specifications
# via -r docs/docsite/requirements.in
requests==2.32.3
# via sphinx
rpds-py==0.30.0
# via
# jsonschema
# referencing
six==1.17.0
# via sphinxcontrib-redoc
snowballstemmer==2.2.0
# via sphinx
sphinx==8.1.3
@@ -69,7 +48,6 @@ sphinx==8.1.3
# sphinx-notfound-page
# sphinx-rtd-theme
# sphinxcontrib-jquery
# sphinxcontrib-redoc
sphinx-ansible-theme==0.10.3
# via -r docs/docsite/requirements.in
sphinx-notfound-page==1.0.4
@@ -88,15 +66,7 @@ sphinxcontrib-jsmath==1.0.1
# via sphinx
sphinxcontrib-qthelp==2.0.0
# via sphinx
sphinxcontrib-redoc==1.6.0
# via -r docs/docsite/requirements.in
sphinxcontrib-serializinghtml==2.0.0
# via sphinx
typing-extensions==4.15.0
# via referencing
urllib3==2.2.3
# via requests
# The following packages are considered to be unsafe in a requirements file:
setuptools==80.9.0
# via -r docs/docsite/requirements.in

View File

@@ -18,11 +18,9 @@ Ansible AWX helps teams manage complex multi-tier deployments by adding control,
contributor/DJANGO_REQUIREMENTS
contributor/API_REQUIREMENTS
.. toctree::
:maxdepth: 2
:caption: Developers
rest_api/index
open_api/index

View File

@@ -1 +0,0 @@
:orphan:

View File

@@ -1,10 +0,0 @@
===================
AWX OpenAPI Schema
===================
This document describes the OpenAPI 3.0.3 specification for the AWX API (version v2).
This schema serves as the complete API documentation and contract for interacting programmatically with AWX, which is used for managing Ansible automation workflows, inventories, credentials, and job execution.
* `Explore the AWX OpenAPI Schema <explorer.html>`_
* `Download the AWX OpenAPI Schema <https://s3.amazonaws.com/awx-public-ci-files/awx/devel/schema.json>`_

View File

@@ -0,0 +1,13 @@
import requests
url = "https://awx-public-ci-files.s3.amazonaws.com/community-docs/swagger.json"
swagger_json = "./docs/docsite/rst/rest_api/_swagger/swagger.json"
response = requests.get(url)
if response.status_code == 200:
with open(swagger_json, 'wb') as file:
file.write(response.content)
print(f"JSON file downloaded to {swagger_json}")
else:
print(f"Request failed with status code: {response.status_code}")

View File

@@ -110,7 +110,7 @@ associated Python code:
Dispatcher Implementation
-------------------------
Every node in an AWX install runs `awx-manage dispatcherd`, a Python process
Every node in an AWX install runs `awx-manage run_dispatcher`, a Python process
that uses the `kombu` library to consume messages from the appropriate queues
for that node (the default shared queue, a queue specific to the node's
hostname, and the broadcast queue). The Dispatcher process manages a pool of
@@ -121,11 +121,11 @@ the associated Python code.
Debugging
---------
`awx-manage dispatcherctl` includes a few flags that allow interaction and
`awx-manage run_dispatcher` includes a few flags that allow interaction and
debugging:
```
[root@awx /]# awx-manage dispatcherctl status
[root@awx /]# awx-manage run_dispatcher --status
2018-09-14 18:39:22,223 WARNING awx.main.dispatch checking dispatcher status for awx
awx[pid:9610] workers total=4 min=4 max=60
. worker[pid:9758] sent=12 finished=12 qsize=0 rss=106.730MB [IDLE]
@@ -139,7 +139,7 @@ This outputs running and queued task UUIDs handled by a specific dispatcher
(which corresponds to `main_unifiedjob.celery_task_id` in the database):
```
[root@awx /]# awx-manage dispatcherctl running
[root@awx /]# awx-manage run_dispatcher --running
2018-09-14 18:39:22,223 WARNING awx.main.dispatch checking dispatcher running for awx
['eb3b0a83-86da-413d-902a-16d7530a6b25', 'f447266a-23da-42b4-8025-fe379d2db96f']
```

View File

@@ -46,7 +46,6 @@ pexpect
prometheus-client
psycopg
psutil
pyasn1>=0.6.2 # CVE-2026-2349
pygerduty
PyGithub
pyopenssl

View File

@@ -116,7 +116,7 @@ cython==3.1.3
# via -r /awx_devel/requirements/requirements.in
daphne==4.2.1
# via -r /awx_devel/requirements/requirements.in
dispatcherd[pg-notify]==2026.01.27
dispatcherd[pg-notify]==2025.12.12
# via -r /awx_devel/requirements/requirements.in
distro==1.9.0
# via -r /awx_devel/requirements/requirements.in
@@ -363,9 +363,8 @@ psycopg==3.2.10
# dispatcherd
ptyprocess==0.7.0
# via pexpect
pyasn1==0.6.2
pyasn1==0.6.1
# via
# -r /awx_devel/requirements/requirements.in
# pyasn1-modules
# service-identity
pyasn1-modules==0.4.2

View File

@@ -77,9 +77,7 @@ sonar.exclusions=\
**/*.pyd,\
**/build/**,\
**/dist/**,\
**/*.egg-info/**,\
**/download-json.py,\
docs/docsite/conf.py
**/*.egg-info/**
# =============================================================================
# COVERAGE EXCLUSIONS

View File

@@ -10,7 +10,7 @@ pidfile = /var/run/supervisor/supervisor.task.pid
command = make dispatcher
directory = /awx_devel
{% else %}
command = awx-manage dispatcherd
command = awx-manage run_dispatcher
directory = /var/lib/awx
{% endif %}
autorestart = true

View File

@@ -4,7 +4,7 @@ minfds = 4096
nodaemon=true
[program:awx-dispatcher]
command = awx-manage dispatcherd
command = awx-manage run_dispatcher
autorestart = true
stopasgroup=true
killasgroup=true

View File

@@ -102,7 +102,7 @@
"-b",
"provision_instance",
"run_callback_receiver",
"dispatcherd",
"run_dispatcher",
"run_rsyslog_configurer",
"run_ws_heartbeat",
"run_wsrelay",
@@ -112,7 +112,7 @@
"-b",
"provision_instance",
"run_callback_receiver",
"dispatcherd",
"run_dispatcher",
"run_rsyslog_configurer",
"run_ws_heartbeat",
"run_wsrelay",

View File

@@ -9,7 +9,7 @@ except ImportError:
SOSREPORT_CONTROLLER_COMMANDS = [
"awx-manage --version", # controller version
"awx-manage list_instances", # controller cluster configuration
"awx-manage dispatcherctl status", # controller dispatch comprehensive status
"awx-manage run_dispatcher --status", # controller dispatch worker status
"awx-manage run_callback_receiver --status", # controller callback worker status
"awx-manage check_license --data", # controller license status
"awx-manage run_wsrelay --status", # controller websocket relay status

View File

@@ -36,5 +36,5 @@ deps =
-r{toxinidir}/docs/docsite/requirements.in
-c{toxinidir}/docs/docsite/requirements.txt
commands =
python {toxinidir}/docs/docsite/download-json.py
python {toxinidir}/docs/docsite/rst/rest_api/_swagger/download-json.py
sphinx-build -T -E -W -n --keep-going {tty:--color} -j auto -c docs/docsite -d docs/docsite/build/doctrees -b html docs/docsite/rst docs/docsite/build/html