[4.6 backport] Feature indirect host counting (#15802) (#6858)

* Feature indirect host counting (#15802)

* AAP-37282 Add parse JQ data and test it for a `job` object in isolation (#15774)

* Add jq dependency

* Add file in progress

* Add license for jq

* Write test and get it passing

* Successfully test collection of `event_query.yml` data (#15761)

* Callback plugin method from cmeyers adapted to global collection list

Get tests passing

Mild rebranding

Put behind feature flag, flip true in dev

Add noqa flag

* Add missing wait_for_events

* feat: try grabbing query files from artifacts directory (#15776)

* Contract changes for the event_query collection callback plugin (#15785)

* Minor import changes to collection processing in callback plugin

* Move agreed location of event_query file

* feat: remaining schema changes for indirect host audits (#15787)

* Re-organize test file and move artifacts processing logic to callback (#15784)

* Rename the indirect host counting test file

* Combine artifacts saving logic

* Connect host audit model to jq logic via new task

* Add unit tests for indirect host counting (#15792)

* Do not get django flags from database (#15794)

* Document, implement, and test remaining indirect host audit fields (#15796)

* Document, implement, and test remaining indirect host audit fields

* Fix hashing

* AAP-39559 Wait for all event processing to finish, add fallback task (#15798)

* Wait for all event processing to finish, add fallback task

* Add flag check to periodic task

* feat: cleanup of old indirect host audit records (#15800)

* By default, do not count indirect hosts (#15801)

* By default, do not count indirect hosts

* Fix copy paste goof

* Fix linter issue from base branch

* prevent multiple tasks from processing the same job events, prevent p… (#15805)

prevent multiple tasks from processing the same job events, prevent periodic task from spawning another task per job

* Fix typos and other bugs found by Pablo review

* fix: rely on resolved_action instead of task, adapt to proposed query… (#15815)

* fix: rely on resolved_action instead of task, adapt to proposed query structure

* tests: update indirect host tests

* update remaining queries to new format

* update live test

* Remove polling loop for job finishing event processing (#15811)

* Remove polling loop for job finishing event processing

* Make awx/main/tests/live dramatically faster (#15780)

* AAP-37282 Add parse JQ data and test it for a `job` object in isolation (#15774)

* Add jq dependency

* Add file in progress

* Add license for jq

* Write test and get it passing

* Successfully test collection of `event_query.yml` data (#15761)

* Callback plugin method from cmeyers adapted to global collection list

Get tests passing

Mild rebranding

Put behind feature flag, flip true in dev

Add noqa flag

* Add missing wait_for_events

* feat: try grabbing query files from artifacts directory (#15776)

* Contract changes for the event_query collection callback plugin (#15785)

* Minor import changes to collection processing in callback plugin

* Move agreed location of event_query file

* feat: remaining schema changes for indirect host audits (#15787)

* Re-organize test file and move artifacts processing logic to callback (#15784)

* Rename the indirect host counting test file

* Combine artifacts saving logic

* Connect host audit model to jq logic via new task

* Document, implement, and test remaining indirect host audit fields (#15796)

* AAP-39559 Wait for all event processing to finish, add fallback task (#15798)

* Wait for all event processing to finish, add fallback task

* Add flag check to periodic task

* feat: cleanup of old indirect host audit records (#15800)

* prevent multiple tasks from processing the same job events, prevent p… (#15805)

prevent multiple tasks from processing the same job events, prevent periodic task from spawning another task per job

* Remove polling loop for job finishing event processing (#15811)

* Make awx/main/tests/live dramatically faster (#15780)

* reorder migrations to allow indirect instances backport

* cleanup for rebase and merge into devel

---------

Co-authored-by: Peter Braun <pbraun@redhat.com>
Co-authored-by: jessicamack <jmack@redhat.com>
Co-authored-by: Peter Braun <pbranu@redhat.com>
This commit is contained in:
Alan Rominger
2025-02-24 16:55:44 -05:00
committed by GitHub
parent 2d648d1225
commit b502a9444a
24 changed files with 1278 additions and 22 deletions

@@ -20,6 +20,7 @@ from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, Inv
from awx.main.constants import ACTIVE_STATES
from awx.main.models.events import emit_event_detail
from awx.main.utils.profiling import AWXProfiler
from awx.main.tasks.system import events_processed_hook
import awx.main.analytics.subsystem_metrics as s_metrics
from .base import BaseWorker
@@ -46,7 +47,7 @@ def job_stats_wrapup(job_identifier, event=None):
# If the status was a finished state before this update was made, send notifications
# If not, we will send notifications when the status changes
if uj.status not in ACTIVE_STATES:
-uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
+events_processed_hook(uj)
except Exception:
logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))

@@ -0,0 +1,85 @@
# Generated by Django 4.2.16 on 2025-01-29 20:13
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('main', '0195_EE_permissions'),
]
operations = [
migrations.AddField(
model_name='job',
name='event_queries_processed',
field=models.BooleanField(default=True, help_text='Events of this job have been queried for indirect host information, or do not need processing.'),
),
migrations.CreateModel(
name='EventQuery',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('fqcn', models.CharField(help_text='Fully-qualified collection name.', max_length=255)),
('collection_version', models.CharField(help_text='Version of the collection this data applies to.', max_length=32)),
('event_query', models.JSONField(default=dict, help_text='The extensions/audit/event_query.yml file content scraped from the collection.')),
],
options={
'unique_together': {('fqcn', 'collection_version')},
},
),
migrations.CreateModel(
name='IndirectManagedNodeAudit',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateTimeField(auto_now_add=True)),
('name', models.CharField(help_text='The Ansible name of the host that this audit record is for.', max_length=255)),
('canonical_facts', models.JSONField(default=dict, help_text='Facts about the host that will be used for managed node deduplication.')),
('facts', models.JSONField(default=dict, help_text='Non canonical facts having additional info about the managed node.')),
('events', models.JSONField(default=list, help_text='List of fully-qualified names of modules that ran against the host in the job.')),
('count', models.PositiveIntegerField(default=0, help_text='Counter of how many times registered modules were invoked on the host.')),
(
'host',
models.ForeignKey(
help_text='The host this audit record is for.',
null=True,
on_delete=django.db.models.deletion.DO_NOTHING,
related_name='host_indirect_host_audits',
to='main.host',
),
),
(
'inventory',
models.ForeignKey(
help_text='The inventory the related job ran against, and which the related host is in.',
null=True,
on_delete=django.db.models.deletion.DO_NOTHING,
related_name='inventory_indirect_host_audits',
to='main.inventory',
),
),
(
'job',
models.ForeignKey(
editable=False,
help_text='Data saved in this record only applies to this specified job.',
on_delete=django.db.models.deletion.DO_NOTHING,
related_name='job_indirect_host_audits',
to='main.job',
),
),
(
'organization',
models.ForeignKey(
help_text='Applicable organization, inferred from the related job.',
on_delete=django.db.models.deletion.DO_NOTHING,
related_name='organization_indirect_host_audits',
to='main.organization',
),
),
],
options={
'unique_together': {('name', 'job')},
},
),
]

@@ -0,0 +1,28 @@
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.translation import gettext_lazy as _
from awx.main.models import BaseModel
class EventQuery(BaseModel):
"""
Event queries are jq expressions shipped with some collections. They are used to
filter job events for indirectly created resources.
"""
class Meta:
app_label = 'main'
unique_together = ['fqcn', 'collection_version']
fqcn = models.CharField(max_length=255, help_text=_('Fully-qualified collection name.'))
collection_version = models.CharField(max_length=32, help_text=_('Version of the collection this data applies to.'))
event_query = models.JSONField(default=dict, help_text=_('The extensions/audit/event_query.yml file content scraped from the collection.'))
def validate_unique(self, exclude=None):
try:
EventQuery.objects.get(fqcn=self.fqcn, collection_version=self.collection_version)
except EventQuery.DoesNotExist:
return
raise ValidationError(f'an event query for collection {self.fqcn}, version {self.collection_version} already exists')

@@ -0,0 +1,54 @@
from django.db.models.deletion import DO_NOTHING
from django.db.models.fields import DateTimeField, CharField, PositiveIntegerField
from django.db.models.fields.json import JSONField
from django.db.models.fields.related import ForeignKey
from django.utils.translation import gettext_lazy as _
from awx.main.models import BaseModel
class IndirectManagedNodeAudit(BaseModel):
"""
IndirectManagedNodeAudit stores information about indirectly created or managed hosts
"""
class Meta:
app_label = 'main'
unique_together = [('name', 'job')]
created = DateTimeField(auto_now_add=True)
job = ForeignKey(
'Job',
related_name='job_indirect_host_audits',
on_delete=DO_NOTHING,
editable=False,
help_text=_('Data saved in this record only applies to this specified job.'),
)
organization = ForeignKey(
'Organization',
related_name='organization_indirect_host_audits',
on_delete=DO_NOTHING,
help_text=_('Applicable organization, inferred from the related job.'),
)
inventory = ForeignKey(
'Inventory',
related_name='inventory_indirect_host_audits',
null=True,
on_delete=DO_NOTHING,
help_text=_('The inventory the related job ran against, and which the related host is in.'),
)
host = ForeignKey('Host', related_name='host_indirect_host_audits', null=True, on_delete=DO_NOTHING, help_text=_('The host this audit record is for.'))
name = CharField(max_length=255, help_text=_('The Ansible name of the host that this audit record is for.'))
canonical_facts = JSONField(default=dict, help_text=_('Facts about the host that will be used for managed node deduplication.'))
facts = JSONField(default=dict, help_text=_('Non canonical facts having additional info about the managed node.'))
events = JSONField(default=list, help_text=_('List of fully-qualified names of modules that ran against the host in the job.'))
count = PositiveIntegerField(default=0, help_text=_('Counter of how many times registered modules were invoked on the host.'))

@@ -607,6 +607,10 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana
default=1,
help_text=_("If ran as part of sliced jobs, the total number of slices. If 1, job is not part of a sliced job."),
)
event_queries_processed = models.BooleanField(
default=True,
help_text=_("Events of this job have been queried for indirect host information, or do not need processing."),
)
def _get_parent_field_name(self):
return 'job_template'

@@ -1,10 +1,15 @@
import json
import os.path
import time
import logging
from collections import deque
from typing import Tuple, Optional
from awx.main.models.event_query import EventQuery
# Django
from django.conf import settings
from django.core.exceptions import ValidationError
from django_guid import get_guid
from django.utils.functional import cached_property
from django.db import connections
@@ -15,11 +20,67 @@ from awx.main.constants import MINIMAL_EVENTS, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSA
from awx.main.utils.update_model import update_model
from awx.main.queue import CallbackQueueDispatcher
from flags.state import flag_enabled
logger = logging.getLogger('awx.main.tasks.callback')
def collect_queries(query_file_contents) -> dict:
"""
collect_queries extracts host queries from the contents of
ansible_data.json
"""
result = {}
try:
installed_collections = query_file_contents['installed_collections']
except KeyError:
logger.error("installed_collections missing in callback response")
return result
for key, value in installed_collections.items():
if 'host_query' in value and 'version' in value:
result[key] = value
return result
COLLECTION_FILENAME = "ansible_data.json"
def try_load_query_file(artifact_dir) -> Tuple[bool, Optional[dict]]:
"""
try_load_query_file checks the artifact directory after job completion and
returns the contents of ansible_data.json if present
"""
if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
return False, None
queries_path = os.path.join(artifact_dir, COLLECTION_FILENAME)
if not os.path.isfile(queries_path):
logger.info(f"no query file found: {queries_path}")
return False, None
try:
f = open(queries_path, "r")
except OSError as e:
logger.error(f"error opening query file {queries_path}: {e}")
return False, None
with f:
try:
queries = json.load(f)
except ValueError as e:
logger.error(f"error parsing query file {queries_path}: {e}")
return False, None
return True, queries
class RunnerCallback:
def __init__(self, model=None):
self.instance = None
self.parent_workflow_job_id = None
self.host_map = {}
self.guid = get_guid()
@@ -214,6 +275,32 @@ class RunnerCallback:
self.delay_update(**{field_name: field_value})
def artifacts_handler(self, artifact_dir):
success, query_file_contents = try_load_query_file(artifact_dir)
if success:
self.delay_update(event_queries_processed=False)
collections_info = collect_queries(query_file_contents)
for collection, data in collections_info.items():
version = data['version']
event_query = data['host_query']
instance = EventQuery(fqcn=collection, collection_version=version, event_query=event_query)
try:
instance.validate_unique()
instance.save()
logger.info(f"event query for collection {collection}, version {version} created")
except ValidationError as e:
logger.info(e)
if 'installed_collections' in query_file_contents:
self.delay_update(installed_collections=query_file_contents['installed_collections'])
else:
logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain installed_collections')
if 'ansible_version' in query_file_contents:
self.delay_update(ansible_version=query_file_contents['ansible_version'])
else:
logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain ansible_version')
self.artifacts_processed = True
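As a rough illustration of what `collect_queries` keeps and drops, the sketch below applies the same filter to a hand-written payload. The function is a standalone copy written for illustration only, and the collection names, versions, and query string are invented sample data, not output from a real job.

```python
import logging

logger = logging.getLogger(__name__)


def collect_queries(query_file_contents: dict) -> dict:
    """Keep only collections that ship both a host_query and a version."""
    installed = query_file_contents.get("installed_collections")
    if installed is None:
        logger.error("installed_collections missing in callback response")
        return {}
    return {
        fqcn: data
        for fqcn, data in installed.items()
        if "host_query" in data and "version" in data
    }


# Hypothetical ansible_data.json contents (sample data, not from a real job)
sample = {
    "ansible_version": "2.16.0",
    "installed_collections": {
        "demo.query": {
            "version": "1.0.1",
            "host_query": "demo.query.example:\n  query: '{canonical_facts: {host_name: .direct_host_name}}'\n",
        },
        # No host_query key, so this collection is skipped
        "community.general": {"version": "9.0.0"},
    },
}

queries = collect_queries(sample)
print(sorted(queries))
```

Only collections that carry both keys would end up as `EventQuery` candidates in `artifacts_handler`; everything else is still recorded through `installed_collections` on the job.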

@@ -0,0 +1,200 @@
import logging
from typing import Tuple, Union
import yaml
import jq
from django.utils.timezone import now, timedelta
from django.conf import settings
from django.db import transaction
# Django flags
from flags.state import flag_enabled
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
from awx.main.models.event_query import EventQuery
from awx.main.models import Job
logger = logging.getLogger(__name__)
class UnhashableFacts(RuntimeError):
pass
def get_hashable_form(input_data: Union[dict, list, Tuple, int, float, str, bool]) -> Tuple[Union[Tuple, int, float, str, bool]]:
"Given a dictionary of JSON types, return something that can be hashed and is the same data"
if isinstance(input_data, (int, float, str, bool)):
return input_data # return scalars as-is
if isinstance(input_data, dict):
# Can't hash because we got a dict? Make the dict a tuple of tuples.
# Can't hash the data in the tuple in the tuple? We'll make tuples out of them too.
return tuple(sorted(((get_hashable_form(k), get_hashable_form(v)) for k, v in input_data.items())))
elif isinstance(input_data, (list, tuple)):
# Nested list data might not be hashable, and lists were never hashable in the first place
return tuple(get_hashable_form(item) for item in input_data)
raise UnhashableFacts(f'Canonical facts contain a {type(input_data)} type which cannot be hashed.')
def build_indirect_host_data(job: Job, job_event_queries: dict[str, dict[str, str]]) -> list[IndirectManagedNodeAudit]:
results = {}
compiled_jq_expressions = {} # Cache for compiled jq expressions
facts_missing_logged = False
unhashable_facts_logged = False
for event in job.job_events.filter(event_data__isnull=False).iterator():
if 'res' not in event.event_data:
continue
if 'resolved_action' not in event.event_data or event.event_data['resolved_action'] not in job_event_queries.keys():
continue
resolved_action = event.event_data['resolved_action']
# We expect a dict with a 'query' key for the resolved_action
if 'query' not in job_event_queries[resolved_action]:
continue
# Recall from cache, or process the jq expression, and loop over the jq results
jq_str_for_event = job_event_queries[resolved_action]['query']
if resolved_action not in compiled_jq_expressions:
compiled_jq_expressions[resolved_action] = jq.compile(jq_str_for_event)
compiled_jq = compiled_jq_expressions[resolved_action]
for data in compiled_jq.input(event.event_data['res']).all():
# From this jq result (specific to a single Ansible module), get index information about this host record
if not data.get('canonical_facts'):
if not facts_missing_logged:
logger.error(f'jq output missing canonical_facts for module {resolved_action} on event {event.id} using jq:{jq_str_for_event}')
facts_missing_logged = True
continue
canonical_facts = data['canonical_facts']
try:
hashable_facts = get_hashable_form(canonical_facts)
except UnhashableFacts:
if not unhashable_facts_logged:
logger.info(f'Could not hash canonical_facts {canonical_facts}, skipping')
unhashable_facts_logged = True
continue
# Obtain the record based on the hashable canonical_facts now determined
facts = data.get('facts')
if hashable_facts in results:
audit_record = results[hashable_facts]
else:
audit_record = IndirectManagedNodeAudit(
canonical_facts=canonical_facts,
facts=facts,
job=job,
organization=job.organization,
name=event.host_name,
)
results[hashable_facts] = audit_record
# Increment rolling count fields
if resolved_action not in audit_record.events:
audit_record.events.append(resolved_action)
audit_record.count += 1
return list(results.values())
def fetch_job_event_query(job: Job) -> dict[str, dict[str, str]]:
"""Returns the following data structure
{
"demo.query.example": {"query": {canonical_facts: {host_name: .direct_host_name}}}
}
The keys are fully-qualified Ansible module names, and the values are dicts containing jq expressions.
This contains all event query expressions that pertain to the given job
"""
net_job_data = {}
for fqcn, collection_data in job.installed_collections.items():
event_query = EventQuery.objects.filter(fqcn=fqcn, collection_version=collection_data['version']).first()
if event_query:
collection_data = yaml.safe_load(event_query.event_query)
net_job_data.update(collection_data)
return net_job_data
def save_indirect_host_entries_of_job(job: Job) -> None:
"Once we have a job and we know that we want to do indirect host processing, this is called"
job_event_queries = fetch_job_event_query(job)
records = build_indirect_host_data(job, job_event_queries)
IndirectManagedNodeAudit.objects.bulk_create(records)
job.event_queries_processed = True
def cleanup_old_indirect_host_entries() -> None:
"""
We assume that indirect host audit results older than settings.INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS days
have already been collected for analysis and can be cleaned up
"""
limit = now() - timedelta(days=settings.INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS)
IndirectManagedNodeAudit.objects.filter(created__lt=limit).delete()
@task(queue=get_task_queuename)
def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None:
try:
job = Job.objects.get(id=job_id)
except Job.DoesNotExist:
logger.debug(f'Job {job_id} seems to be deleted, bailing from save_indirect_host_entries')
return
if wait_for_events:
# Gate running this task on the job having all events processed, not just EOF or playbook_on_stats
current_events = job.job_events.count()
if current_events < job.emitted_events:
logger.info(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
return
job.log_lifecycle(f'finished processing {current_events} events, running save_indirect_host_entries')
with transaction.atomic():
"""
Pre-emptively set the job marker to 'events processed'. This prevents other instances from running the
same task.
"""
try:
job = Job.objects.select_for_update().get(id=job_id)
except Job.DoesNotExist:
logger.debug(f'Job {job_id} seems to be deleted, bailing from save_indirect_host_entries')
return
if job.event_queries_processed is True:
# this can mean one of two things:
# 1. another instance has already processed the events of this job
# 2. the artifacts_handler has not yet been called for this job
return
job.event_queries_processed = True
job.save(update_fields=['event_queries_processed'])
try:
save_indirect_host_entries_of_job(job)
except Exception:
logger.exception(f'Error processing indirect host data for job_id={job_id}')
@task(queue=get_task_queuename)
def cleanup_and_save_indirect_host_entries_fallback() -> None:
if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
return
try:
cleanup_old_indirect_host_entries()
except Exception as e:
logger.error(f"error cleaning up indirect host audit records: {e}")
job_ct = 0
right_now_time = now()
window_end = right_now_time - timedelta(minutes=settings.INDIRECT_HOST_QUERY_FALLBACK_MINUTES)
window_start = right_now_time - timedelta(days=settings.INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS)
for job in Job.objects.filter(event_queries_processed=False, finished__lte=window_end, finished__gte=window_start).iterator():
save_indirect_host_entries(job.id, wait_for_events=True)
job_ct += 1
if job_ct:
logger.info(f'Restarted event processing for {job_ct} jobs')
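The deduplication step in `build_indirect_host_data` depends on turning `canonical_facts` into something usable as a dict key. The following is a minimal standalone sketch of that idea, assuming JSON-like input. It raises a plain `TypeError` instead of `UnhashableFacts`, and the jq results are invented sample data.

```python
from typing import Any


def get_hashable_form(data: Any):
    """Recursively turn JSON-like data into nested tuples so it can be a dict key."""
    if isinstance(data, (int, float, str, bool)):
        return data  # scalars are already hashable
    if isinstance(data, dict):
        # Sort the items so that key order does not affect the result
        return tuple(sorted((get_hashable_form(k), get_hashable_form(v)) for k, v in data.items()))
    if isinstance(data, (list, tuple)):
        return tuple(get_hashable_form(item) for item in data)
    raise TypeError(f"cannot hash {type(data)}")


# Hypothetical jq outputs for three events (sample data)
jq_results = [
    {"canonical_facts": {"host_name": "foo_host", "ips": ["10.0.0.1"]}},
    {"canonical_facts": {"ips": ["10.0.0.1"], "host_name": "foo_host"}},  # same facts, different key order
    {"canonical_facts": {"host_name": "bar_host", "ips": []}},
]

counts = {}
for result in jq_results:
    key = get_hashable_form(result["canonical_facts"])
    counts[key] = counts.get(key, 0) + 1

print(len(counts))  # number of distinct hosts seen
```

Because dict items are sorted before being frozen into tuples, two events that report the same facts in a different key order collapse into one record, which is what lets `count` accumulate per host.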

@@ -66,6 +66,7 @@ from awx.main.tasks.policy import evaluate_policy
from awx.main.tasks.signals import with_signal_handling, signal_callback
from awx.main.tasks.receptor import AWXReceptorJob
from awx.main.tasks.facts import start_fact_cache, finish_fact_cache
from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields, events_processed_hook
from awx.main.exceptions import AwxTaskError, PolicyEvaluationError, PostRunError, ReceptorNodeNotFound
from awx.main.utils.ansible import read_ansible_config
from awx.main.utils.execution_environments import CONTAINER_ROOT, to_container_path
@@ -80,9 +81,11 @@ from awx.main.utils.common import (
)
from awx.conf.license import get_license
from awx.main.utils.handlers import SpecialInventoryHandler
-from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields
from awx.main.utils.update_model import update_model
# Django flags
from flags.state import flag_enabled
logger = logging.getLogger('awx.main.tasks.jobs')
@@ -436,20 +439,6 @@ class BaseTask(object):
Hook for any steps to run after job/task is marked as complete.
"""
instance.log_lifecycle("finalize_run")
-artifact_dir = os.path.join(private_data_dir, 'artifacts', str(self.instance.id))
-collections_info = os.path.join(artifact_dir, 'collections.json')
-ansible_version_file = os.path.join(artifact_dir, 'ansible_version.txt')
-if os.path.exists(collections_info):
-with open(collections_info) as ee_json_info:
-ee_collections_info = json.loads(ee_json_info.read())
-instance.installed_collections = ee_collections_info
-instance.save(update_fields=['installed_collections'])
-if os.path.exists(ansible_version_file):
-with open(ansible_version_file) as ee_ansible_info:
-ansible_version_info = ee_ansible_info.readline()
-instance.ansible_version = ansible_version_info
-instance.save(update_fields=['ansible_version'])
# Run task manager appropriately for speculative dependencies
if instance.unifiedjob_blocked_jobs.exists():
@@ -652,7 +641,7 @@ class BaseTask(object):
# Field host_status_counts is used as a metric to check if event processing is finished
# we send notifications if it is, if not, callback receiver will send them
if (self.instance.host_status_counts is not None) or (not self.runner_callback.wrapup_event_dispatched):
-self.instance.send_notification_templates('succeeded' if status == 'successful' else 'failed')
+events_processed_hook(self.instance)
try:
self.final_run_hook(self.instance, status, private_data_dir)
@@ -927,11 +916,16 @@ class RunJob(SourceControlMixin, BaseTask):
if authorize:
env['ANSIBLE_NET_AUTH_PASS'] = network_cred.get_input('authorize_password', default='')
-path_vars = (
+path_vars = [
('ANSIBLE_COLLECTIONS_PATHS', 'collections_paths', 'requirements_collections', '~/.ansible/collections:/usr/share/ansible/collections'),
('ANSIBLE_ROLES_PATH', 'roles_path', 'requirements_roles', '~/.ansible/roles:/usr/share/ansible/roles:/etc/ansible/roles'),
('ANSIBLE_COLLECTIONS_PATH', 'collections_path', 'requirements_collections', '~/.ansible/collections:/usr/share/ansible/collections'),
-)
+]
if flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
path_vars.append(
('ANSIBLE_CALLBACK_PLUGINS', 'callback_plugins', 'plugins_path', '~/.ansible/plugins:/plugins/callback:/usr/share/ansible/plugins/callback'),
)
config_values = read_ansible_config(os.path.join(private_data_dir, 'project'), list(map(lambda x: x[1], path_vars)))
@@ -948,6 +942,11 @@ class RunJob(SourceControlMixin, BaseTask):
paths = [os.path.join(CONTAINER_ROOT, folder)] + paths
env[env_key] = os.pathsep.join(paths)
if flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
env['ANSIBLE_CALLBACKS_ENABLED'] = 'indirect_instance_count'
if 'callbacks_enabled' in config_values:
env['ANSIBLE_CALLBACKS_ENABLED'] += ':' + config_values['callbacks_enabled']
return env
def build_args(self, job, private_data_dir, passwords):
@@ -1388,6 +1387,17 @@ class RunProjectUpdate(BaseTask):
shutil.copytree(cache_subpath, dest_subpath, symlinks=True)
logger.debug('{0} {1} prepared {2} from cache'.format(type(project).__name__, project.pk, dest_subpath))
if flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
# copy the special callback (not stdout type) plugin to get list of collections
pdd_plugins_path = os.path.join(job_private_data_dir, 'plugins_path')
if not os.path.exists(pdd_plugins_path):
os.mkdir(pdd_plugins_path)
from awx.playbooks import library
plugin_file_source = os.path.join(library.__path__._path[0], 'indirect_instance_count.py')
plugin_file_dest = os.path.join(pdd_plugins_path, 'indirect_instance_count.py')
shutil.copyfile(plugin_file_source, plugin_file_dest)
def post_run_hook(self, instance, status):
super(RunProjectUpdate, self).post_run_hook(instance, status)
# To avoid hangs, very important to release lock even if errors happen here

@@ -29,6 +29,8 @@ from django.core.exceptions import ObjectDoesNotExist
# Django-CRUM
from crum import impersonate
# Django flags
from flags.state import flag_enabled
# Runner
import ansible_runner.cleanup
@@ -62,6 +64,7 @@ from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inven
from awx.main.utils.reload import stop_local_services
from awx.main.utils.pglock import advisory_lock
from awx.main.tasks.helpers import is_run_threshold_reached
from awx.main.tasks.host_indirect import save_indirect_host_entries
from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanup, administrative_workunit_reaper, write_receptor_config
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
@@ -364,6 +367,20 @@ def send_notifications(notification_list, job_id=None):
logger.exception('Error saving notification {} result.'.format(notification.id))
def events_processed_hook(unified_job):
"""This method is intended to be called for every unified job
after the playbook_on_stats/EOF event is processed and final status is saved
Either one of these events could happen before the other, or there may be no events"""
unified_job.send_notification_templates('succeeded' if unified_job.status == 'successful' else 'failed')
if isinstance(unified_job, Job) and flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
if unified_job.event_queries_processed is True:
# If this is called from the callback receiver, the in-memory job likely has stale model data,
# so refresh the flag from the database before deciding whether to dispatch
unified_job.refresh_from_db(fields=['event_queries_processed'])
if unified_job.event_queries_processed is False:
save_indirect_host_entries.delay(unified_job.id)
@task(queue=get_task_queuename)
def gather_analytics():
if is_run_threshold_reached(getattr(settings, 'AUTOMATION_ANALYTICS_LAST_GATHER', None), settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
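The `event_queries_processed` flag acts as a small state machine across `artifacts_handler`, `events_processed_hook`, and `save_indirect_host_entries`. The sketch below models only that flag with a stand-in class. `FakeJob` and its `dispatched` list are hypothetical helpers for illustration, and the real code uses database rows, `select_for_update`, and a dispatched task instead.

```python
from dataclasses import dataclass, field


@dataclass
class FakeJob:
    """Stand-in for the Job model; only the flag matters here."""

    event_queries_processed: bool = True  # default: nothing to process
    dispatched: list = field(default_factory=list)


def artifacts_handler(job: FakeJob, query_file_found: bool) -> None:
    # Finding a query file marks the job as needing indirect host processing
    if query_file_found:
        job.event_queries_processed = False


def events_processed_hook(job: FakeJob) -> None:
    # Dispatch the counting task only for jobs still waiting to be processed
    if job.event_queries_processed is False:
        job.dispatched.append("save_indirect_host_entries")
        # The task claims the job by flipping the flag before doing the work
        job.event_queries_processed = True


plain_job = FakeJob()
events_processed_hook(plain_job)  # flag is still True, nothing happens

counted_job = FakeJob()
artifacts_handler(counted_job, query_file_found=True)
events_processed_hook(counted_job)
events_processed_hook(counted_job)  # a second call is a no-op

print(plain_job.dispatched, counted_job.dispatched)
```

Defaulting the flag to `True` means jobs that never produce a query file are never picked up, either by the hook or by the periodic fallback task that filters on `event_queries_processed=False`.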

@@ -0,0 +1,4 @@
---
demo.query.example:
query: >-
{canonical_facts: {host_name: .direct_host_name}, facts: {device_type: .device_type}}

@@ -0,0 +1,77 @@
#!/usr/bin/python
# Same licensing as AWX
from __future__ import absolute_import, division, print_function
__metaclass__ = type
DOCUMENTATION = r'''
---
module: example
short_description: Module for specific live tests
version_added: "2.0.0"
description: This module is part of a test collection in local source.
options:
host_name:
description: Name to return as the host name.
required: false
type: str
author:
- AWX Live Tests
'''
EXAMPLES = r'''
- name: Test with defaults
demo.query.example:
- name: Test with custom host name
demo.query.example:
host_name: foo_host
'''
RETURN = r'''
direct_host_name:
description: The name of the host, this will be collected with the feature.
type: str
returned: always
sample: 'foo_host'
'''
from ansible.module_utils.basic import AnsibleModule
def run_module():
module_args = dict(
host_name=dict(type='str', required=False, default='foo_host_default'),
)
result = dict(
changed=False,
other_data='sample_string',
)
module = AnsibleModule(argument_spec=module_args, supports_check_mode=True)
if module.check_mode:
module.exit_json(**result)
result['direct_host_name'] = module.params['host_name']
result['nested_host_name'] = {'host_name': module.params['host_name']}
# non-canonical facts
result['device_type'] = 'Fake Host'
module.exit_json(**result)
def main():
run_module()
if __name__ == '__main__':
main()

@@ -0,0 +1,223 @@
import yaml

from unittest import mock

import pytest

from django.utils.timezone import now, timedelta

from awx.main.tasks.host_indirect import (
    build_indirect_host_data,
    fetch_job_event_query,
    save_indirect_host_entries,
    cleanup_and_save_indirect_host_entries_fallback,
)
from awx.main.models.event_query import EventQuery
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit

"""These are unit tests, similar to test_indirect_host_counting in the live tests"""


TEST_JQ = "{canonical_facts: {host_name: .direct_host_name}, facts: {another_host_name: .direct_host_name}}"


@pytest.fixture
def bare_job(job_factory):
    job = job_factory()
    job.installed_collections = {'demo.query': {'version': '1.0.1'}, 'demo2.query': {'version': '1.0.1'}}
    job.event_queries_processed = False
    job.save(update_fields=['installed_collections', 'event_queries_processed'])
    return job


def create_registered_event(job, task_name='demo.query.example'):
    return job.job_events.create(event_data={'resolved_action': task_name, 'res': {'direct_host_name': 'foo_host'}})


@pytest.fixture
def job_with_counted_event(bare_job):
    create_registered_event(bare_job)
    return bare_job


def create_event_query(fqcn='demo.query'):
    module_name = f'{fqcn}.example'
    return EventQuery.objects.create(fqcn=fqcn, collection_version='1.0.1', event_query=yaml.dump({module_name: {'query': TEST_JQ}}, default_flow_style=False))


def create_audit_record(name, job, organization, created=None):
    # Resolve the default at call time; a `created=now()` default is evaluated only once, at import
    record = IndirectManagedNodeAudit.objects.create(name=name, job=job, organization=organization)
    record.created = created if created is not None else now()
    record.save()
    return record


@pytest.fixture
def event_query():
    "This is ordinarily created by the artifacts callback"
    return create_event_query()


@pytest.fixture
def old_audit_record(bare_job, organization):
    created_at = now() - timedelta(days=10)
    return create_audit_record(name="old_job", job=bare_job, organization=organization, created=created_at)


@pytest.fixture
def new_audit_record(bare_job, organization):
    return IndirectManagedNodeAudit.objects.create(name="new_job", job=bare_job, organization=organization)


# ---- end fixtures ----
@pytest.mark.django_db
def test_build_with_no_results(bare_job):
    # never filled in events, should do nothing
    assert build_indirect_host_data(bare_job, {}) == []


@pytest.mark.django_db
def test_collect_an_event(job_with_counted_event):
    records = build_indirect_host_data(job_with_counted_event, {'demo.query.example': {'query': TEST_JQ}})
    assert len(records) == 1


@pytest.mark.django_db
def test_fetch_job_event_query(bare_job, event_query):
    assert fetch_job_event_query(bare_job) == {'demo.query.example': {'query': TEST_JQ}}


@pytest.mark.django_db
def test_fetch_multiple_job_event_query(bare_job):
    create_event_query(fqcn='demo.query')
    create_event_query(fqcn='demo2.query')
    assert fetch_job_event_query(bare_job) == {'demo.query.example': {'query': TEST_JQ}, 'demo2.query.example': {'query': TEST_JQ}}


@pytest.mark.django_db
def test_save_indirect_host_entries(job_with_counted_event, event_query):
    assert job_with_counted_event.event_queries_processed is False
    save_indirect_host_entries(job_with_counted_event.id)
    job_with_counted_event.refresh_from_db()
    assert job_with_counted_event.event_queries_processed is True
    assert IndirectManagedNodeAudit.objects.filter(job=job_with_counted_event).count() == 1
    host_audit = IndirectManagedNodeAudit.objects.filter(job=job_with_counted_event).first()
    assert host_audit.count == 1
    assert host_audit.canonical_facts == {'host_name': 'foo_host'}
    assert host_audit.facts == {'another_host_name': 'foo_host'}
    assert host_audit.organization == job_with_counted_event.organization
@pytest.mark.django_db
def test_multiple_events_same_module_same_host(bare_job, event_query):
    "This tests that the count field gives correct answers"
    create_registered_event(bare_job)
    create_registered_event(bare_job)
    create_registered_event(bare_job)

    save_indirect_host_entries(bare_job.id)

    assert IndirectManagedNodeAudit.objects.filter(job=bare_job).count() == 1
    host_audit = IndirectManagedNodeAudit.objects.filter(job=bare_job).first()

    assert host_audit.count == 3
    assert host_audit.events == ['demo.query.example']


@pytest.mark.django_db
def test_multiple_registered_modules(bare_job):
    "This tests that the events will list multiple modules if more than 1 module from different collections is registered and used"
    create_registered_event(bare_job, task_name='demo.query.example')
    create_registered_event(bare_job, task_name='demo2.query.example')

    # These take the place of using the event_query fixture
    create_event_query(fqcn='demo.query')
    create_event_query(fqcn='demo2.query')

    save_indirect_host_entries(bare_job.id)

    assert IndirectManagedNodeAudit.objects.filter(job=bare_job).count() == 1
    host_audit = IndirectManagedNodeAudit.objects.filter(job=bare_job).first()

    assert host_audit.count == 2
    assert set(host_audit.events) == {'demo.query.example', 'demo2.query.example'}


@pytest.mark.django_db
def test_multiple_registered_modules_same_collection(bare_job):
    "This tests that the events will list multiple modules if more than 1 module in same collection is registered and used"
    create_registered_event(bare_job, task_name='demo.query.example')
    create_registered_event(bare_job, task_name='demo.query.example2')

    # Takes place of event_query fixture, doing manually here
    EventQuery.objects.create(
        fqcn='demo.query',
        collection_version='1.0.1',
        event_query=yaml.dump(
            {
                'demo.query.example': {'query': TEST_JQ},
                'demo.query.example2': {'query': TEST_JQ},
            },
            default_flow_style=False,
        ),
    )

    save_indirect_host_entries(bare_job.id)

    assert IndirectManagedNodeAudit.objects.filter(job=bare_job).count() == 1
    host_audit = IndirectManagedNodeAudit.objects.filter(job=bare_job).first()

    assert host_audit.count == 2
    assert set(host_audit.events) == {'demo.query.example', 'demo.query.example2'}
@pytest.mark.django_db
def test_events_not_fully_processed_no_op(bare_job):
    # I have a job that produced 12 events, but those are not saved
    bare_job.emitted_events = 12
    bare_job.finished = now()
    bare_job.save(update_fields=['emitted_events', 'finished'])

    # Running the normal post-run task will do nothing at this point
    assert bare_job.event_queries_processed is False
    with mock.patch('time.sleep'):  # for test speedup
        save_indirect_host_entries(bare_job.id)
    bare_job.refresh_from_db()
    assert bare_job.event_queries_processed is False

    # Right away, the fallback processing will not run either
    cleanup_and_save_indirect_host_entries_fallback()
    bare_job.refresh_from_db()
    assert bare_job.event_queries_processed is False

    # After 3 hours have passed...
    bare_job.finished = now() - timedelta(hours=3)

    # Create the expected job events
    for _ in range(12):
        create_registered_event(bare_job)

    bare_job.save(update_fields=['finished'])

    # The fallback task will now process indirect host query data for this job
    cleanup_and_save_indirect_host_entries_fallback()

    # Test code to process anyway, events collected or not
    save_indirect_host_entries(bare_job.id, wait_for_events=False)
    bare_job.refresh_from_db()
    assert bare_job.event_queries_processed is True


@pytest.mark.django_db
def test_job_id_does_not_exist():
    save_indirect_host_entries(10000001)


@pytest.mark.django_db
def test_cleanup_old_audit_records(old_audit_record, new_audit_record):
    count_before_cleanup = IndirectManagedNodeAudit.objects.count()
    assert count_before_cleanup == 2
    cleanup_and_save_indirect_host_entries_fallback()
    count_after_cleanup = IndirectManagedNodeAudit.objects.count()
    assert count_after_cleanup == 1
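
The unit tests above depend on two behaviours: the `TEST_JQ` expression projects each event's `res` payload into `canonical_facts` and `facts`, and repeated events for the same host collapse into a single audit record whose `count` and `events` fields accumulate. The sketch below reproduces that behaviour in plain Python, without jq or the Django models. `project_event` and `aggregate_events` are hypothetical names chosen for illustration, and this is not the code in `awx.main.tasks.host_indirect`.

```python
import json


def project_event(res):
    """Plain-Python equivalent of TEST_JQ applied to one event's 'res' dict."""
    host_name = res.get('direct_host_name')  # jq yields null for a missing key
    return {
        'canonical_facts': {'host_name': host_name},
        'facts': {'another_host_name': host_name},
    }


def aggregate_events(events):
    """Collapse events into one record per distinct canonical_facts value."""
    records = {}
    for event in events:
        projected = project_event(event['res'])
        # Serialize with sorted keys so equal dicts map to the same lookup key
        key = json.dumps(projected['canonical_facts'], sort_keys=True)
        record = records.setdefault(key, {**projected, 'count': 0, 'events': []})
        record['count'] += 1
        if event['resolved_action'] not in record['events']:
            record['events'].append(event['resolved_action'])
    return list(records.values())


# Three events from the same module for the same host, as in
# test_multiple_events_same_module_same_host
events = [{'resolved_action': 'demo.query.example', 'res': {'direct_host_name': 'foo_host'}}] * 3
records = aggregate_events(events)
print(records[0]['count'], records[0]['events'])
```

Keying on the serialized canonical facts is only one way to deduplicate; the sketch uses it because it keeps the example free of any helper from the code under test.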


@@ -0,0 +1,182 @@
import subprocess
import time
import os
import shutil
import tempfile

import pytest

from django.conf import settings

from awx.api.versioning import reverse

# These tests are invoked from the awx/main/tests/live/ subfolder
# so any fixtures from higher-up conftest files must be explicitly included
from awx.main.tests.functional.conftest import *  # noqa
from awx.main.tests.conftest import load_all_credentials  # noqa: F401; pylint: disable=unused-import
from awx.main.tests import data

from awx.main.models import Project, JobTemplate, Organization, Inventory


PROJ_DATA = os.path.join(os.path.dirname(data.__file__), 'projects')


def _copy_folders(source_path, dest_path, clear=False):
    "folder-by-folder, copy dirs in the source root dir to the destination root dir"
    for dirname in os.listdir(source_path):
        source_dir = os.path.join(source_path, dirname)
        expected_dir = os.path.join(dest_path, dirname)
        if clear and os.path.exists(expected_dir):
            shutil.rmtree(expected_dir)
        if (not os.path.isdir(source_dir)) or os.path.exists(expected_dir):
            continue
        shutil.copytree(source_dir, expected_dir)


GIT_COMMANDS = (
    'git config --global init.defaultBranch devel; '
    'git init; '
    'git config user.email jenkins@ansible.com; '
    'git config user.name DoneByTest; '
    'git add .; '
    'git commit -m "initial commit"'
)
@pytest.fixture(scope='session')
def live_tmp_folder():
    path = os.path.join(tempfile.gettempdir(), 'live_tests')
    if os.path.exists(path):
        shutil.rmtree(path)
    os.mkdir(path)
    _copy_folders(PROJ_DATA, path)
    for dirname in os.listdir(path):
        source_dir = os.path.join(path, dirname)
        subprocess.run(GIT_COMMANDS, cwd=source_dir, shell=True)
    if path not in settings.AWX_ISOLATION_SHOW_PATHS:
        settings.AWX_ISOLATION_SHOW_PATHS = settings.AWX_ISOLATION_SHOW_PATHS + [path]
    return path


def wait_to_leave_status(job, status, timeout=30, sleep_time=0.1):
    """Wait until the job does NOT have the specified status with some timeout

    the default timeout is based on the task manager running a 20 second
    schedule, and the API does not guarantee working jobs faster than this
    """
    start = time.time()
    while time.time() - start < timeout:
        job.refresh_from_db()
        if job.status != status:
            return
        time.sleep(sleep_time)
    raise RuntimeError(f'Job failed to exit {status} in {timeout} seconds. job_explanation={job.job_explanation} tb={job.result_traceback}')


def wait_for_events(uj, timeout=2):
    start = time.time()
    while uj.event_processing_finished is False:
        time.sleep(0.2)
        uj.refresh_from_db()
        if time.time() - start > timeout:
            break


def unified_job_stdout(uj):
    wait_for_events(uj)
    return '\n'.join([event.stdout for event in uj.get_event_queryset().order_by('created')])


def wait_for_job(job, final_status='successful', running_timeout=800):
    wait_to_leave_status(job, 'pending')
    wait_to_leave_status(job, 'waiting')
    wait_to_leave_status(job, 'running', timeout=running_timeout)

    assert job.status == final_status, f'Job was not successful id={job.id} status={job.status} tb={job.result_traceback} output=\n{unified_job_stdout(job)}'
@pytest.fixture(scope='session')
def default_org():
    org = Organization.objects.filter(name='Default').first()
    if org is None:
        raise Exception('Tests expect Default org to already be created and it is not')
    return org


@pytest.fixture(scope='session')
def demo_inv(default_org):
    inventory, _ = Inventory.objects.get_or_create(name='Demo Inventory', defaults={'organization': default_org})
    return inventory


@pytest.fixture
def podman_image_generator():
    """
    Generate a tagless podman image from awx base EE
    """

    def fn():
        dockerfile = """
        FROM quay.io/ansible/awx-ee:latest
        RUN echo "Hello, Podman!" > /tmp/hello.txt
        """
        cmd = ['podman', 'build', '-f', '-']  # Create an image without a tag
        subprocess.run(cmd, capture_output=True, input=dockerfile, text=True, check=True)

    return fn
@pytest.fixture
def run_job_from_playbook(default_org, demo_inv, post, admin):
    def _rf(test_name, playbook, local_path=None, scm_url=None):
        project_name = f'{test_name} project'
        jt_name = f'{test_name} JT: {playbook}'

        old_proj = Project.objects.filter(name=project_name).first()
        if old_proj:
            old_proj.delete()

        old_jt = JobTemplate.objects.filter(name=jt_name).first()
        if old_jt:
            old_jt.delete()

        proj_kwargs = {'name': project_name, 'organization': default_org.id}
        if local_path:
            # manual path
            proj_kwargs['scm_type'] = ''
            proj_kwargs['local_path'] = local_path
        elif scm_url:
            proj_kwargs['scm_type'] = 'git'
            proj_kwargs['scm_url'] = scm_url
        else:
            raise RuntimeError('Need to provide scm_url or local_path')

        result = post(
            reverse('api:project_list'),
            proj_kwargs,
            admin,
            expect=201,
        )
        proj = Project.objects.get(id=result.data['id'])

        if proj.current_job:
            wait_for_job(proj.current_job)

        assert proj.get_project_path()
        assert playbook in proj.playbooks

        result = post(
            reverse('api:job_template_list'),
            {'name': jt_name, 'project': proj.id, 'playbook': playbook, 'inventory': demo_inv.id},
            admin,
            expect=201,
        )
        jt = JobTemplate.objects.get(id=result.data['id'])
        job = jt.create_unified_job()
        job.signal_start()

        wait_for_job(job)
        assert job.status == 'successful'

    return _rf


@@ -0,0 +1,14 @@
import pytest
import os

from django.conf import settings

from awx.main.tests.live.tests.conftest import _copy_folders, PROJ_DATA


@pytest.fixture(scope='session')
def copy_project_folders():
    proj_root = settings.PROJECTS_ROOT
    if not os.path.exists(proj_root):
        os.mkdir(proj_root)
    _copy_folders(PROJ_DATA, proj_root, clear=True)


@@ -0,0 +1,65 @@
import yaml
import time

from awx.main.tests.live.tests.conftest import wait_for_events
from awx.main.tasks.host_indirect import build_indirect_host_data, save_indirect_host_entries
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
from awx.main.models import Job


def test_indirect_host_counting(live_tmp_folder, run_job_from_playbook):
    run_job_from_playbook('test_indirect_host_counting', 'run_task.yml', scm_url=f'file://{live_tmp_folder}/test_host_query')
    job = Job.objects.filter(name__icontains='test_indirect_host_counting').order_by('-created').first()
    wait_for_events(job)  # We must wait for events because system tasks iterate on job.job_events.filter(...)

    # Data matches to awx/main/tests/data/projects/host_query/extensions/audit/event_query.yml
    # this just does things in-line to be a more localized test for the immediate testing
    module_jq_str = '{canonical_facts: {host_name: .direct_host_name}, facts: {device_type: .device_type}}'
    event_query = {'demo.query.example': {'query': module_jq_str}}

    # Run the task logic directly with local data
    results = build_indirect_host_data(job, event_query)
    assert len(results) == 1
    host_audit_entry = results[0]

    canonical_facts = {'host_name': 'foo_host_default'}
    facts = {'device_type': 'Fake Host'}

    # Asserts on data that will match to the input jq string from above
    assert host_audit_entry.canonical_facts == canonical_facts
    assert host_audit_entry.facts == facts

    # Test collection of data
    assert 'demo.query' in job.installed_collections
    assert 'host_query' in job.installed_collections['demo.query']
    hq_text = job.installed_collections['demo.query']['host_query']
    hq_data = yaml.safe_load(hq_text)
    assert hq_data == {'demo.query.example': {'query': module_jq_str}}

    assert job.ansible_version

    # Poll for events finishing processing, because background task requires this
    for _ in range(10):
        if job.job_events.count() >= job.emitted_events:
            break
        time.sleep(0.2)
    else:
        raise RuntimeError(f'job id={job.id} never processed events')

    # Task might not run due to race condition, so make it run here
    job.refresh_from_db()
    if job.event_queries_processed is False:
        save_indirect_host_entries.delay(job.id, wait_for_events=False)

    # This will poll for the background task to finish
    for _ in range(10):
        if IndirectManagedNodeAudit.objects.filter(job=job).exists():
            break
        time.sleep(0.2)
    else:
        raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')

    assert IndirectManagedNodeAudit.objects.filter(job=job).count() == 1
    host_audit = IndirectManagedNodeAudit.objects.filter(job=job).first()

    assert host_audit.canonical_facts == canonical_facts
    assert host_audit.facts == facts
    assert host_audit.organization == job.organization
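
The live test polls twice with the same shape: check a condition up to 10 times, sleep 0.2 seconds between checks, and raise if the condition never holds. A minimal sketch of that pattern as a reusable helper follows. `poll_until` is a hypothetical name that does not appear in this change, and the injectable `sleep` parameter is an assumption added so the example runs without real delays.

```python
import time


def poll_until(predicate, attempts=10, interval=0.2, sleep=time.sleep):
    """Call predicate up to `attempts` times, sleeping between tries.

    Returns True as soon as predicate() is truthy, False if attempts run out.
    """
    for _ in range(attempts):
        if predicate():
            return True
        sleep(interval)
    return False


# Stand-in condition that becomes true on the third check
calls = {'n': 0}


def ready():
    calls['n'] += 1
    return calls['n'] >= 3


found = poll_until(ready, sleep=lambda seconds: None)  # skip real sleeping in the demo
print(found, calls['n'])
```

A caller wanting the test's behaviour would raise `RuntimeError` when the helper returns `False`, which keeps the failure message next to the condition it describes.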


@@ -0,0 +1,56 @@
import copy

import pytest

from awx.main.tasks.host_indirect import get_hashable_form


class TestHashableForm:
    @pytest.mark.parametrize(
        'data',
        [
            {'a': 'b'},
            ['a', 'b'],
            ('a', 'b'),
            {'a': {'b': 'c'}},
            {'a': ['b', 'c']},
            {'a': ('b', 'c')},
            ['a', ['b', 'c']],
            ['a', ('b', 'c')],
            ['a', {'b': 'c'}],
        ],
    )
    def test_compare_equal_data(self, data):
        other_data = copy.deepcopy(data)
        # A tuple of scalars may be cached so ids could legitimately be the same
        if data != ('a', 'b'):
            assert id(data) != id(other_data)  # sanity
            assert id(get_hashable_form(data)) != id(get_hashable_form(data))

        assert get_hashable_form(data) == get_hashable_form(data)
        assert hash(get_hashable_form(data)) == hash(get_hashable_form(data))

        assert get_hashable_form(data) in {get_hashable_form(data): 1}  # test lookup hit

    @pytest.mark.parametrize(
        'data, other_data',
        [
            [{'a': 'b'}, {'a': 'c'}],
            [{'a': 'b'}, {'a': 'b', 'c': 'd'}],
            [['a', 'b'], ['a', 'c']],
            [('a', 'b'), ('a', 'c')],
            [{'a': {'b': 'c'}}, {'a': {'b': 'd'}}],
            [{'a': ['b', 'c']}, {'a': ['b', 'd']}],
            [{'a': ('b', 'c')}, {'a': ('b', 'd')}],
            [['a', ['b', 'c']], ['a', ['b', 'd']]],
            [['a', ('b', 'c')], ['a', ('b', 'd')]],
            [['a', {'b': 'c'}], ['a', {'b': 'd'}]],
        ],
    )
    def test_compare_different_data(self, data, other_data):
        assert data != other_data  # sanity, otherwise why test this?

        assert get_hashable_form(data) != get_hashable_form(other_data)
        assert hash(get_hashable_form(data)) != hash(get_hashable_form(other_data))

        assert get_hashable_form(other_data) not in {get_hashable_form(data): 1}  # test lookup miss
        assert get_hashable_form(data) not in {get_hashable_form(other_data): 1}
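
These tests pin down a contract: nested dicts, lists, and tuples must convert to a value that is hashable, compares equal for equal input, and can serve as a dictionary key. The body of `get_hashable_form` is not shown in this part of the diff, so the sketch below is only one way such a conversion could be written. `to_hashable` is a hypothetical stand-in, not the AWX implementation, and it assumes every leaf value is already hashable.

```python
def to_hashable(data):
    """Recursively convert dicts, lists and tuples into hashable equivalents."""
    if isinstance(data, dict):
        # frozenset ignores key order, matching dict equality semantics
        return frozenset((key, to_hashable(value)) for key, value in data.items())
    if isinstance(data, (list, tuple)):
        # Sequences keep their order, so they become tuples
        return tuple(to_hashable(item) for item in data)
    return data  # assumed to be a hashable scalar such as str or int


nested = {'a': ['b', {'c': 'd'}]}
seen = {to_hashable(nested): 1}
hit = to_hashable({'a': ['b', {'c': 'd'}]}) in seen
miss = to_hashable({'a': ['b', {'c': 'e'}]}) in seen
print(hit, miss)
```

In this sketch a list and a tuple with the same items map to the same value. The tests above never compare a list against a tuple, so they do not say whether the real function makes that distinction.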


@@ -0,0 +1,94 @@
# (C) 2012, Michael DeHaan, <michael.dehaan@gmail.com>
# (c) 2017 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function

__metaclass__ = type


DOCUMENTATION = '''
    callback: host_query
    type: notification
    short_description: for demo of indirect host data and counting, this produces collection data
    version_added: historical
    description:
      - Saves collection data to artifacts folder
    requirements:
      - Whitelist in configuration
      - Set AWX_ISOLATED_DATA_DIR, AWX will do this
'''

import os
import json
from importlib.resources import files

from ansible.plugins.callback import CallbackBase


# NOTE: in Ansible 1.2 or later general logging is available without
# this plugin, just set ANSIBLE_LOG_PATH as an environment variable
# or log_path in the DEFAULTS section of your ansible configuration
# file. This callback is an example of per hosts logging for those
# that want it.


# Taken from https://github.com/ansible/ansible/blob/devel/lib/ansible/cli/galaxy.py#L1624

from ansible.cli.galaxy import with_collection_artifacts_manager
from ansible.release import __version__

from ansible.galaxy.collection import find_existing_collections
from ansible.utils.collection_loader import AnsibleCollectionConfig
import ansible.constants as C


@with_collection_artifacts_manager
def list_collections(artifacts_manager=None):
    artifacts_manager.require_build_metadata = False

    default_collections_path = set(C.COLLECTIONS_PATHS)
    collections_search_paths = default_collections_path | set(AnsibleCollectionConfig.collection_paths)
    collections = list(find_existing_collections(list(collections_search_paths), artifacts_manager, dedupe=False))
    return collections


class CallbackModule(CallbackBase):
    """
    Writes installed collection data, including any event_query.yml content, to ansible_data.json in the artifacts directory
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'indirect_instance_count'
    CALLBACK_NEEDS_WHITELIST = True

    TIME_FORMAT = "%b %d %Y %H:%M:%S"
    MSG_FORMAT = "%(now)s - %(category)s - %(data)s\n\n"

    def v2_playbook_on_stats(self, stats):
        artifact_dir = os.getenv('AWX_ISOLATED_DATA_DIR')
        if not artifact_dir:
            raise RuntimeError('Only suitable in AWX, did not find private_data_dir')

        collections_print = {}
        # Loop over collections, from ansible-core these are Candidate objects
        for candidate in list_collections():
            collection_print = {
                'version': candidate.ver,
            }

            query_file = files(f'ansible_collections.{candidate.namespace}.{candidate.name}') / 'extensions' / 'audit' / 'event_query.yml'
            if query_file.exists():
                with query_file.open('r') as f:
                    collection_print['host_query'] = f.read()

            collections_print[candidate.fqcn] = collection_print

        ansible_data = {'installed_collections': collections_print, 'ansible_version': __version__}

        write_path = os.path.join(artifact_dir, 'ansible_data.json')
        with open(write_path, "w") as fd:
            fd.write(json.dumps(ansible_data, indent=2))

        super().v2_playbook_on_stats(stats)


@@ -501,6 +501,10 @@ CELERYBEAT_SCHEDULE = {
    'cleanup_host_metrics': {'task': 'awx.main.tasks.host_metrics.cleanup_host_metrics', 'schedule': timedelta(hours=3, minutes=30)},
    'host_metric_summary_monthly': {'task': 'awx.main.tasks.host_metrics.host_metric_summary_monthly', 'schedule': timedelta(hours=4)},
    'periodic_resource_sync': {'task': 'awx.main.tasks.system.periodic_resource_sync', 'schedule': timedelta(minutes=15)},
    'cleanup_and_save_indirect_host_entries_fallback': {
        'task': 'awx.main.tasks.host_indirect.cleanup_and_save_indirect_host_entries_fallback',
        'schedule': timedelta(minutes=60),
    },
}
# Django Caching Configuration
@@ -1213,6 +1217,18 @@ ANSIBLE_BASE_ALLOW_SINGLETON_ROLES_API = False # Do not allow creating user-def
# system username for django-ansible-base
SYSTEM_USERNAME = None
# For indirect host query processing
# if a job is not immediately confirmed to have all events processed
# it will be eligible for processing after this number of minutes
INDIRECT_HOST_QUERY_FALLBACK_MINUTES = 60
# If an error happens in event collection, give up after this time
INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS = 3
# Maximum age for indirect host audit records
# Older records will be cleaned up
INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS = 7
# setting for Policy as Code feature
FEATURE_POLICY_AS_CODE_ENABLED = False
@@ -1232,4 +1248,7 @@ OPA_REQUEST_RETRIES = 2 # Number of retries to connect to OPA service, defaults
# feature flags
FLAG_SOURCES = ('flags.sources.SettingsFlagsSource',)
FLAGS = {'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}]}
FLAGS = {
    'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}],
    'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}],
}
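
The three `INDIRECT_HOST_*` settings in this file describe time windows: a job becomes eligible for fallback processing some minutes after it finishes, is abandoned after a number of days, and audit records past a maximum age are cleaned up. The sketch below shows how such windows could be evaluated. The function names and the inclusive or exclusive boundary choices are assumptions made for illustration; the actual checks live in `awx.main.tasks.host_indirect` and may differ.

```python
from datetime import datetime, timedelta, timezone

# Values copied from the settings in this diff
INDIRECT_HOST_QUERY_FALLBACK_MINUTES = 60
INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS = 3
INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS = 7


def eligible_for_fallback(finished, current_time):
    """True when the job finished long enough ago to retry, but not so long ago that we give up."""
    age = current_time - finished
    earliest = timedelta(minutes=INDIRECT_HOST_QUERY_FALLBACK_MINUTES)
    latest = timedelta(days=INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS)
    return earliest <= age <= latest


def audit_record_expired(created, current_time):
    """True when an audit record is older than the maximum age."""
    return current_time - created > timedelta(days=INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS)


current = datetime(2025, 1, 10, 12, 0, tzinfo=timezone.utc)
print(eligible_for_fallback(current, current))                       # just finished
print(eligible_for_fallback(current - timedelta(hours=3), current))  # three hours ago
print(audit_record_expired(current - timedelta(days=10), current))   # ten-day-old record
```

These three cases mirror the unit tests in this change: a job that just finished is skipped, a job that finished three hours ago is processed, and a ten-day-old audit record is removed.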


@@ -67,6 +67,9 @@ CLUSTER_HOST_ID = socket.gethostname()
AWX_CALLBACK_PROFILE = True
# this modifies FLAGS set by defaults
FLAGS['FEATURE_INDIRECT_NODE_COUNTING_ENABLED'] = [{'condition': 'boolean', 'value': True}] # noqa
# ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!=================================
# Disable normal scheduled/triggered task managers (DependencyManager, TaskManager, WorkflowManager).
# Allows user to trigger task managers directly for debugging and profiling purposes.