diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py index 199302c76c..6df75595ba 100644 --- a/awx/main/dispatch/worker/callback.py +++ b/awx/main/dispatch/worker/callback.py @@ -20,6 +20,7 @@ from awx.main.models import JobEvent, AdHocCommandEvent, ProjectUpdateEvent, Inv from awx.main.constants import ACTIVE_STATES from awx.main.models.events import emit_event_detail from awx.main.utils.profiling import AWXProfiler +from awx.main.tasks.system import events_processed_hook import awx.main.analytics.subsystem_metrics as s_metrics from .base import BaseWorker @@ -46,7 +47,7 @@ def job_stats_wrapup(job_identifier, event=None): # If the status was a finished state before this update was made, send notifications # If not, we will send notifications when the status changes if uj.status not in ACTIVE_STATES: - uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed') + events_processed_hook(uj) except Exception: logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier)) diff --git a/awx/main/migrations/0196_indirect_managed_node_audit.py b/awx/main/migrations/0196_indirect_managed_node_audit.py new file mode 100644 index 0000000000..be9c5498a6 --- /dev/null +++ b/awx/main/migrations/0196_indirect_managed_node_audit.py @@ -0,0 +1,85 @@ +# Generated by Django 4.2.16 on 2025-01-29 20:13 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0195_EE_permissions'), + ] + + operations = [ + migrations.AddField( + model_name='job', + name='event_queries_processed', + field=models.BooleanField(default=True, help_text='Events of this job have been queried for indirect host information, or do not need processing.'), + ), + migrations.CreateModel( + name='EventQuery', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('fqcn', models.CharField(help_text='Fully-qualified collection name.', max_length=255)), + ('collection_version', models.CharField(help_text='Version of the collection this data applies to.', max_length=32)), + ('event_query', models.JSONField(default=dict, help_text='The extensions/audit/event_query.yml file content scraped from the collection.')), + ], + options={ + 'unique_together': {('fqcn', 'collection_version')}, + }, + ), + migrations.CreateModel( + name='IndirectManagedNodeAudit', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created', models.DateTimeField(auto_now_add=True)), + ('name', models.CharField(help_text='The Ansible name of the host that this audit record is for.', max_length=255)), + ('canonical_facts', models.JSONField(default=dict, help_text='Facts about the host that will be used for managed node deduplication.')), + ('facts', models.JSONField(default=dict, help_text='Non canonical facts having additional info about the managed node.')), + ('events', models.JSONField(default=list, help_text='List of fully-qualified names of modules that ran against the host in the job.')), + ('count', models.PositiveIntegerField(default=0, help_text='Counter of how many times registered modules were invoked on the host.')), + ( + 'host', + models.ForeignKey( + help_text='The host this audit record is for.', + null=True, + on_delete=django.db.models.deletion.DO_NOTHING, + related_name='host_indirect_host_audits', + to='main.host', + ), + ), + ( + 
'inventory',
+                        models.ForeignKey(
+                            help_text='The inventory the related job ran against, and which the related host is in.',
+                            null=True,
+                            on_delete=django.db.models.deletion.DO_NOTHING,
+                            related_name='inventory_indirect_host_audits',
+                            to='main.inventory',
+                        ),
+                    ),
+                    (
+                        'job',
+                        models.ForeignKey(
+                            editable=False,
+                            help_text='Data saved in this record only applies to this specified job.',
+                            on_delete=django.db.models.deletion.DO_NOTHING,
+                            related_name='job_indirect_host_audits',
+                            to='main.job',
+                        ),
+                    ),
+                    (
+                        'organization',
+                        models.ForeignKey(
+                            help_text='Applicable organization, inferred from the related job.',
+                            on_delete=django.db.models.deletion.DO_NOTHING,
+                            related_name='organization_indirect_host_audits',
+                            to='main.organization',
+                        ),
+                    ),
+                ],
+                options={
+                    'unique_together': {('name', 'job')},
+                },
+            ),
+    ]
diff --git a/awx/main/models/event_query.py b/awx/main/models/event_query.py
new file mode 100644
index 0000000000..b608fea2ea
--- /dev/null
+++ b/awx/main/models/event_query.py
@@ -0,0 +1,28 @@
+from django.core.exceptions import ValidationError
+from django.db import models
+from django.utils.translation import gettext_lazy as _
+
+from awx.main.models import BaseModel
+
+
+class EventQuery(BaseModel):
+    """
+    Event queries are jq expressions, present in some collections, that are used
+    to filter job events for indirectly created resources.
+    """
+
+    class Meta:
+        app_label = 'main'
+        unique_together = ['fqcn', 'collection_version']
+
+    fqcn = models.CharField(max_length=255, help_text=_('Fully-qualified collection name.'))
+    collection_version = models.CharField(max_length=32, help_text=_('Version of the collection this data applies to.'))
+    event_query = models.JSONField(default=dict, help_text=_('The extensions/audit/event_query.yml file content scraped from the collection.'))
+
+    def validate_unique(self, exclude=None):
+        try:
+            EventQuery.objects.get(fqcn=self.fqcn, collection_version=self.collection_version)
+        except EventQuery.DoesNotExist:
+            return
+
+        raise ValidationError(f'an event query for collection {self.fqcn}, version {self.collection_version} already exists')
diff --git a/awx/main/models/indirect_managed_node_audit.py b/awx/main/models/indirect_managed_node_audit.py
new file mode 100644
index 0000000000..08e18403ea
--- /dev/null
+++ b/awx/main/models/indirect_managed_node_audit.py
@@ -0,0 +1,54 @@
+from django.db.models.deletion import DO_NOTHING
+from django.db.models.fields import DateTimeField, CharField, PositiveIntegerField
+from django.db.models.fields.json import JSONField
+from django.db.models.fields.related import ForeignKey
+from django.utils.translation import gettext_lazy as _
+
+from awx.main.models import BaseModel
+
+
+class IndirectManagedNodeAudit(BaseModel):
+    """
+    IndirectManagedNodeAudit stores information about indirectly created or managed hosts
+    """
+
+    class Meta:
+        app_label = 'main'
+        unique_together = [('name', 'job')]
+
+    created = DateTimeField(auto_now_add=True)
+
+    job = ForeignKey(
+        'Job',
+        related_name='job_indirect_host_audits',
+        on_delete=DO_NOTHING,
+        editable=False,
+        help_text=_('Data saved in this record only applies to this specified job.'),
+    )
+
+    organization = ForeignKey(
+        'Organization',
+        related_name='organization_indirect_host_audits',
+        on_delete=DO_NOTHING,
+        help_text=_('Applicable organization, inferred from the related job.'),
+    )
+
+    inventory = ForeignKey(
+        'Inventory',
+        related_name='inventory_indirect_host_audits',
+        null=True,
+        on_delete=DO_NOTHING,
+        help_text=_('The inventory the related job 
ran against, and which the related host is in.'), + ) + + host = ForeignKey('Host', related_name='host_indirect_host_audits', null=True, on_delete=DO_NOTHING, help_text=_('The host this audit record is for.')) + + name = CharField(max_length=255, help_text=_('The Ansible name of the host that this audit record is for.')) + + canonical_facts = JSONField(default=dict, help_text=_('Facts about the host that will be used for managed node deduplication.')) + + facts = JSONField(default=dict, help_text=_('Non canonical facts having additional info about the managed node.')) + + events = JSONField(default=list, help_text=_('List of fully-qualified names of modules that ran against the host in the job.')) + + count = PositiveIntegerField(default=0, help_text=_('Counter of how many times registered modules were invoked on the host.')) diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index 307e49aaa6..e5f7864722 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -607,6 +607,10 @@ class Job(UnifiedJob, JobOptions, SurveyJobMixin, JobNotificationMixin, TaskMana default=1, help_text=_("If ran as part of sliced jobs, the total number of slices. If 1, job is not part of a sliced job."), ) + event_queries_processed = models.BooleanField( + default=True, + help_text=_("Events of this job have been queried for indirect host information, or do not need processing."), + ) def _get_parent_field_name(self): return 'job_template' diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py index 069bc408c9..c6d89d0b79 100644 --- a/awx/main/tasks/callback.py +++ b/awx/main/tasks/callback.py @@ -1,10 +1,15 @@ import json +import os.path import time import logging from collections import deque +from typing import Tuple, Optional + +from awx.main.models.event_query import EventQuery # Django from django.conf import settings +from django.core.exceptions import ValidationError from django_guid import get_guid from django.utils.functional import cached_property from django.db import connections @@ -15,11 +20,67 @@ from awx.main.constants import MINIMAL_EVENTS, ANSIBLE_RUNNER_NEEDS_UPDATE_MESSA from awx.main.utils.update_model import update_model from awx.main.queue import CallbackQueueDispatcher +from flags.state import flag_enabled + logger = logging.getLogger('awx.main.tasks.callback') +def collect_queries(query_file_contents) -> dict: + """ + collect_queries extracts host queries from the contents of + ansible_data.json + """ + result = {} + + try: + installed_collections = query_file_contents['installed_collections'] + except KeyError: + logger.error("installed_collections missing in callback response") + return result + + for key, value in installed_collections.items(): + if 'host_query' in value and 'version' in value: + result[key] = value + + return result + + +COLLECTION_FILENAME = "ansible_data.json" + + +def try_load_query_file(artifact_dir) -> Tuple[bool, Optional[dict]]: + """ + try_load_query_file checks the artifact directory after job completion and + returns the contents of ansible_data.json if present + """ + + if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"): + return False, None + + queries_path = os.path.join(artifact_dir, COLLECTION_FILENAME) + if not os.path.isfile(queries_path): + logger.info(f"no query file found: {queries_path}") + return False, None + + try: + f = open(queries_path, "r") + except OSError as e: + logger.error(f"error opening query file {queries_path}: {e}") + return False, None + + with f: + try: + queries = json.load(f) + 
except ValueError as e:
+            logger.error(f"error parsing query file {queries_path}: {e}")
+            return False, None
+
+    return True, queries
+
+
 class RunnerCallback:
     def __init__(self, model=None):
+        self.instance = None
         self.parent_workflow_job_id = None
         self.host_map = {}
         self.guid = get_guid()
@@ -214,6 +275,32 @@ class RunnerCallback:
             self.delay_update(**{field_name: field_value})
 
     def artifacts_handler(self, artifact_dir):
+        success, query_file_contents = try_load_query_file(artifact_dir)
+        if success:
+            self.delay_update(event_queries_processed=False)
+            collections_info = collect_queries(query_file_contents)
+            for collection, data in collections_info.items():
+                version = data['version']
+                event_query = data['host_query']
+                instance = EventQuery(fqcn=collection, collection_version=version, event_query=event_query)
+                try:
+                    instance.validate_unique()
+                    instance.save()
+
+                    logger.info(f"event query for collection {collection}, version {version} created")
+                except ValidationError as e:
+                    logger.info(e)
+
+            if 'installed_collections' in query_file_contents:
+                self.delay_update(installed_collections=query_file_contents['installed_collections'])
+            else:
+                logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain installed_collections')
+
+            if 'ansible_version' in query_file_contents:
+                self.delay_update(ansible_version=query_file_contents['ansible_version'])
+            else:
+                logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain ansible_version')
+
         self.artifacts_processed = True
diff --git a/awx/main/tasks/host_indirect.py b/awx/main/tasks/host_indirect.py
new file mode 100644
index 0000000000..d7e9edffaf
--- /dev/null
+++ b/awx/main/tasks/host_indirect.py
@@ -0,0 +1,200 @@
+import logging
+from typing import Tuple, Union
+
+import yaml
+
+import jq
+
+from django.utils.timezone import now, timedelta
+from django.conf import settings
+from django.db import transaction
+
+# Django flags
+from flags.state import flag_enabled
+
+from awx.main.dispatch.publish import task
+from awx.main.dispatch import get_task_queuename
+from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
+from awx.main.models.event_query import EventQuery
+from awx.main.models import Job
+
+logger = logging.getLogger(__name__)
+
+
+class UnhashableFacts(RuntimeError):
+    pass
+
+
+def get_hashable_form(input_data: Union[dict, list, Tuple, int, float, str, bool]) -> Tuple[Union[Tuple, int, float, str, bool]]:
+    "Given a dictionary of JSON types, return something that can be hashed and is the same data"
+    if isinstance(input_data, (int, float, str, bool)):
+        return input_data  # return scalars as-is
+    if isinstance(input_data, dict):
+        # Can't hash because we got a dict? Make the dict a tuple of tuples.
+        # Can't hash the data in the tuple in the tuple? We'll make tuples out of them too.
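+        # e.g. {'b': 1, 'a': {'c': 2}} becomes (('a', (('c', 2),)), ('b', 1));
+        # sorting the items keeps the result stable regardless of input key order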
+        return tuple(sorted(((get_hashable_form(k), get_hashable_form(v)) for k, v in input_data.items())))
+    elif isinstance(input_data, (list, tuple)):
+        # Nested list data might not be hashable, and lists were never hashable in the first place
+        return tuple(get_hashable_form(item) for item in input_data)
+    raise UnhashableFacts(f'Canonical facts contain a {type(input_data)} type which cannot be hashed.')
+
+
+def build_indirect_host_data(job: Job, job_event_queries: dict[str, dict[str, str]]) -> list[IndirectManagedNodeAudit]:
+    results = {}
+    compiled_jq_expressions = {}  # Cache for compiled jq expressions, keyed by the jq string
+    facts_missing_logged = False
+    unhashable_facts_logged = False
+
+    for event in job.job_events.filter(event_data__isnull=False).iterator():
+        if 'res' not in event.event_data:
+            continue
+
+        if 'resolved_action' not in event.event_data or event.event_data['resolved_action'] not in job_event_queries.keys():
+            continue
+
+        resolved_action = event.event_data['resolved_action']
+
+        # We expect a dict with a 'query' key for the resolved_action
+        if 'query' not in job_event_queries[resolved_action]:
+            continue
+
+        # Recall from cache, or compile the jq expression, and loop over the jq results
+        jq_str_for_event = job_event_queries[resolved_action]['query']
+
+        if jq_str_for_event not in compiled_jq_expressions:
+            compiled_jq_expressions[jq_str_for_event] = jq.compile(jq_str_for_event)
+        compiled_jq = compiled_jq_expressions[jq_str_for_event]
+        for data in compiled_jq.input(event.event_data['res']).all():
+            # From this jq result (specific to a single Ansible module), get index information about this host record
+            if not data.get('canonical_facts'):
+                if not facts_missing_logged:
+                    logger.error(f'jq output missing canonical_facts for module {resolved_action} on event {event.id} using jq:{jq_str_for_event}')
+                    facts_missing_logged = True
+                continue
+            canonical_facts = data['canonical_facts']
+            try:
+                hashable_facts = get_hashable_form(canonical_facts)
+            except UnhashableFacts:
+                if not unhashable_facts_logged:
+                    logger.info(f'Could not hash canonical_facts {canonical_facts}, skipping')
+                    unhashable_facts_logged = True
+                continue
+
+            # Obtain the record based on the hashable canonical_facts now determined
+            facts = data.get('facts')
+            if hashable_facts in results:
+                audit_record = results[hashable_facts]
+            else:
+                audit_record = IndirectManagedNodeAudit(
+                    canonical_facts=canonical_facts,
+                    facts=facts,
+                    job=job,
+                    organization=job.organization,
+                    name=event.host_name,
+                )
+                results[hashable_facts] = audit_record
+
+            # Increment rolling count fields
+            if resolved_action not in audit_record.events:
+                audit_record.events.append(resolved_action)
+            audit_record.count += 1
+
+    return list(results.values())
+
+
+def fetch_job_event_query(job: Job) -> dict[str, dict[str, str]]:
+    """Returns the following data structure
+    {
+        "demo.query.example": {"query": {canonical_facts: {host_name: .direct_host_name}}}
+    }
+    The keys are fully-qualified Ansible module names, and the values are dicts containing jq expressions.
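+    The 'query' values are jq programs that build_indirect_host_data evaluates
+    against each matching job event's 'res' dictionary.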
+
+    This contains all event query expressions that pertain to the given job
+    """
+    net_job_data = {}
+    for fqcn, collection_data in job.installed_collections.items():
+        event_query = EventQuery.objects.filter(fqcn=fqcn, collection_version=collection_data['version']).first()
+        if event_query:
+            queries_for_collection = yaml.safe_load(event_query.event_query)
+            net_job_data.update(queries_for_collection)
+    return net_job_data
+
+
+def save_indirect_host_entries_of_job(job: Job) -> None:
+    "Once we have a job and we know that we want to do indirect host processing, this is called"
+    job_event_queries = fetch_job_event_query(job)
+    records = build_indirect_host_data(job, job_event_queries)
+    IndirectManagedNodeAudit.objects.bulk_create(records)
+    job.event_queries_processed = True
+
+
+def cleanup_old_indirect_host_entries() -> None:
+    """
+    We assume that indirect host audit results older than one week have already been collected for analysis
+    and can be cleaned up
+    """
+    limit = now() - timedelta(days=settings.INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS)
+    IndirectManagedNodeAudit.objects.filter(created__lt=limit).delete()
+
+
+@task(queue=get_task_queuename)
+def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> None:
+    try:
+        job = Job.objects.get(id=job_id)
+    except Job.DoesNotExist:
+        logger.debug(f'Job {job_id} seems to be deleted, bailing from save_indirect_host_entries')
+        return
+
+    if wait_for_events:
+        # Gate running this task on the job having all events processed, not just EOF or playbook_on_stats
+        current_events = job.job_events.count()
+        if current_events < job.emitted_events:
+            logger.info(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
+            return
+        job.log_lifecycle(f'finished processing {current_events} events, running save_indirect_host_entries')
+
+    with transaction.atomic():
+        """
+        Pre-emptively set the job marker to 'events processed'. This prevents other instances from running the
+        same task.
+        """
+        try:
+            job = Job.objects.select_for_update().get(id=job_id)
+        except Job.DoesNotExist:
+            logger.debug(f'Job {job_id} seems to be deleted, bailing from save_indirect_host_entries')
+            return
+
+        if job.event_queries_processed is True:
+            # this can mean one of two things:
+            # 1. another instance has already processed the events of this job
+            # 2. 
the artifacts_handler has not yet been called for this job + return + + job.event_queries_processed = True + job.save(update_fields=['event_queries_processed']) + + try: + save_indirect_host_entries_of_job(job) + except Exception: + logger.exception(f'Error processing indirect host data for job_id={job_id}') + + +@task(queue=get_task_queuename) +def cleanup_and_save_indirect_host_entries_fallback() -> None: + if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"): + return + + try: + cleanup_old_indirect_host_entries() + except Exception as e: + logger.error(f"error cleaning up indirect host audit records: {e}") + + job_ct = 0 + right_now_time = now() + window_end = right_now_time - timedelta(minutes=settings.INDIRECT_HOST_QUERY_FALLBACK_MINUTES) + window_start = right_now_time - timedelta(days=settings.INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS) + for job in Job.objects.filter(event_queries_processed=False, finished__lte=window_end, finished__gte=window_start).iterator(): + save_indirect_host_entries(job.id, wait_for_events=True) + job_ct += 1 + if job_ct: + logger.info(f'Restarted event processing for {job_ct} jobs') diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py index d077bed816..ff4ddf606c 100644 --- a/awx/main/tasks/jobs.py +++ b/awx/main/tasks/jobs.py @@ -66,6 +66,7 @@ from awx.main.tasks.policy import evaluate_policy from awx.main.tasks.signals import with_signal_handling, signal_callback from awx.main.tasks.receptor import AWXReceptorJob from awx.main.tasks.facts import start_fact_cache, finish_fact_cache +from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields, events_processed_hook from awx.main.exceptions import AwxTaskError, PolicyEvaluationError, PostRunError, ReceptorNodeNotFound from awx.main.utils.ansible import read_ansible_config from awx.main.utils.execution_environments import CONTAINER_ROOT, to_container_path @@ -80,9 +81,11 @@ from awx.main.utils.common import ( ) from awx.conf.license import get_license from awx.main.utils.handlers import SpecialInventoryHandler -from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields from awx.main.utils.update_model import update_model +# Django flags +from flags.state import flag_enabled + logger = logging.getLogger('awx.main.tasks.jobs') @@ -436,20 +439,6 @@ class BaseTask(object): Hook for any steps to run after job/task is marked as complete. 
""" instance.log_lifecycle("finalize_run") - artifact_dir = os.path.join(private_data_dir, 'artifacts', str(self.instance.id)) - collections_info = os.path.join(artifact_dir, 'collections.json') - ansible_version_file = os.path.join(artifact_dir, 'ansible_version.txt') - - if os.path.exists(collections_info): - with open(collections_info) as ee_json_info: - ee_collections_info = json.loads(ee_json_info.read()) - instance.installed_collections = ee_collections_info - instance.save(update_fields=['installed_collections']) - if os.path.exists(ansible_version_file): - with open(ansible_version_file) as ee_ansible_info: - ansible_version_info = ee_ansible_info.readline() - instance.ansible_version = ansible_version_info - instance.save(update_fields=['ansible_version']) # Run task manager appropriately for speculative dependencies if instance.unifiedjob_blocked_jobs.exists(): @@ -652,7 +641,7 @@ class BaseTask(object): # Field host_status_counts is used as a metric to check if event processing is finished # we send notifications if it is, if not, callback receiver will send them if (self.instance.host_status_counts is not None) or (not self.runner_callback.wrapup_event_dispatched): - self.instance.send_notification_templates('succeeded' if status == 'successful' else 'failed') + events_processed_hook(self.instance) try: self.final_run_hook(self.instance, status, private_data_dir) @@ -927,11 +916,16 @@ class RunJob(SourceControlMixin, BaseTask): if authorize: env['ANSIBLE_NET_AUTH_PASS'] = network_cred.get_input('authorize_password', default='') - path_vars = ( + path_vars = [ ('ANSIBLE_COLLECTIONS_PATHS', 'collections_paths', 'requirements_collections', '~/.ansible/collections:/usr/share/ansible/collections'), ('ANSIBLE_ROLES_PATH', 'roles_path', 'requirements_roles', '~/.ansible/roles:/usr/share/ansible/roles:/etc/ansible/roles'), ('ANSIBLE_COLLECTIONS_PATH', 'collections_path', 'requirements_collections', '~/.ansible/collections:/usr/share/ansible/collections'), - ) + ] + + if flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"): + path_vars.append( + ('ANSIBLE_CALLBACK_PLUGINS', 'callback_plugins', 'plugins_path', '~/.ansible/plugins:/plugins/callback:/usr/share/ansible/plugins/callback'), + ) config_values = read_ansible_config(os.path.join(private_data_dir, 'project'), list(map(lambda x: x[1], path_vars))) @@ -948,6 +942,11 @@ class RunJob(SourceControlMixin, BaseTask): paths = [os.path.join(CONTAINER_ROOT, folder)] + paths env[env_key] = os.pathsep.join(paths) + if flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"): + env['ANSIBLE_CALLBACKS_ENABLED'] = 'indirect_instance_count' + if 'callbacks_enabled' in config_values: + env['ANSIBLE_CALLBACKS_ENABLED'] += ':' + config_values['callbacks_enabled'] + return env def build_args(self, job, private_data_dir, passwords): @@ -1388,6 +1387,17 @@ class RunProjectUpdate(BaseTask): shutil.copytree(cache_subpath, dest_subpath, symlinks=True) logger.debug('{0} {1} prepared {2} from cache'.format(type(project).__name__, project.pk, dest_subpath)) + if flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"): + # copy the special callback (not stdout type) plugin to get list of collections + pdd_plugins_path = os.path.join(job_private_data_dir, 'plugins_path') + if not os.path.exists(pdd_plugins_path): + os.mkdir(pdd_plugins_path) + from awx.playbooks import library + + plugin_file_source = os.path.join(library.__path__._path[0], 'indirect_instance_count.py') + plugin_file_dest = os.path.join(pdd_plugins_path, 'indirect_instance_count.py') + 
shutil.copyfile(plugin_file_source, plugin_file_dest)
+
     def post_run_hook(self, instance, status):
         super(RunProjectUpdate, self).post_run_hook(instance, status)
         # To avoid hangs, very important to release lock even if errors happen here
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index e0bb0dfc9f..3e93b50b93 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -29,6 +29,8 @@ from django.core.exceptions import ObjectDoesNotExist
 
 # Django-CRUM
 from crum import impersonate
+# Django flags
+from flags.state import flag_enabled
 
 # Runner
 import ansible_runner.cleanup
@@ -62,6 +64,7 @@ from awx.main.utils.common import ignore_inventory_computed_fields, ignore_inven
 from awx.main.utils.reload import stop_local_services
 from awx.main.utils.pglock import advisory_lock
 from awx.main.tasks.helpers import is_run_threshold_reached
+from awx.main.tasks.host_indirect import save_indirect_host_entries
 from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanup, administrative_workunit_reaper, write_receptor_config
 from awx.main.consumers import emit_channel_notification
 from awx.main import analytics
@@ -364,6 +367,20 @@ def send_notifications(notification_list, job_id=None):
             logger.exception('Error saving notification {} result.'.format(notification.id))
 
 
+def events_processed_hook(unified_job):
+    """This method is intended to be called for every unified job
+    after the playbook_on_stats/EOF event is processed and the final status is saved.
+    Either one of these events could happen before the other, or there may be no events"""
+    unified_job.send_notification_templates('succeeded' if unified_job.status == 'successful' else 'failed')
+    if isinstance(unified_job, Job) and flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
+        if unified_job.event_queries_processed is True:
+            # If this is called from the callback receiver, it likely does not have
+            # updated model data, so a refresh now is formally robust
+            unified_job.refresh_from_db(fields=['event_queries_processed'])
+        if unified_job.event_queries_processed is False:
+            save_indirect_host_entries.delay(unified_job.id)
+
+
 @task(queue=get_task_queuename)
 def gather_analytics():
     if is_run_threshold_reached(getattr(settings, 'AUTOMATION_ANALYTICS_LAST_GATHER', None), settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
diff --git a/awx/main/tests/data/projects/host_query/extensions/audit/event_query.yml b/awx/main/tests/data/projects/host_query/extensions/audit/event_query.yml
new file mode 100644
index 0000000000..fb27540cec
--- /dev/null
+++ b/awx/main/tests/data/projects/host_query/extensions/audit/event_query.yml
@@ -0,0 +1,4 @@
+---
+demo.query.example:
+  query: >-
+    {canonical_facts: {host_name: .direct_host_name}, facts: {device_type: .device_type}}
diff --git a/awx/main/tests/data/projects/host_query/plugins/modules/example.py b/awx/main/tests/data/projects/host_query/plugins/modules/example.py
new file mode 100644
index 0000000000..fb1eb2314e
--- /dev/null
+++ b/awx/main/tests/data/projects/host_query/plugins/modules/example.py
@@ -0,0 +1,77 @@
+#!/usr/bin/python
+
+# Same licensing as AWX
+from __future__ import absolute_import, division, print_function
+
+__metaclass__ = type
+
+DOCUMENTATION = r'''
+---
+module: example
+
+short_description: Module for specific live tests
+
+version_added: "2.0.0"
+
+description: This module is part of a test collection in local source.
+
+options:
+    host_name:
+        description: Name to return as the host name.
+        required: false
+        type: str
+
+author:
+    - AWX Live Tests
+'''
+
+EXAMPLES = r'''
+- name: Test with defaults
+  demo.query.example:
+
+- name: Test with custom host name
+  demo.query.example:
+    host_name: foo_host
+'''
+
+RETURN = r'''
+direct_host_name:
+    description: The name of the host, this will be collected with the feature.
+    type: str
+    returned: always
+    sample: 'foo_host'
+'''

+from ansible.module_utils.basic import AnsibleModule
+
+
+def run_module():
+    module_args = dict(
+        host_name=dict(type='str', required=False, default='foo_host_default'),
+    )
+
+    result = dict(
+        changed=False,
+        other_data='sample_string',
+    )
+
+    module = AnsibleModule(argument_spec=module_args, supports_check_mode=True)
+
+    if module.check_mode:
+        module.exit_json(**result)
+
+    result['direct_host_name'] = module.params['host_name']
+    result['nested_host_name'] = {'host_name': module.params['host_name']}
+
+    # non-canonical facts
+    result['device_type'] = 'Fake Host'
+
+    module.exit_json(**result)
+
+
+def main():
+    run_module()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/awx/main/tests/functional/tasks/test_host_indirect.py b/awx/main/tests/functional/tasks/test_host_indirect.py
new file mode 100644
index 0000000000..bf9d3b5c32
--- /dev/null
+++ b/awx/main/tests/functional/tasks/test_host_indirect.py
@@ -0,0 +1,223 @@
+import yaml
+from unittest import mock
+
+import pytest
+
+from django.utils.timezone import now, timedelta
+
+from awx.main.tasks.host_indirect import (
+    build_indirect_host_data,
+    fetch_job_event_query,
+    save_indirect_host_entries,
+    cleanup_and_save_indirect_host_entries_fallback,
+)
+from awx.main.models.event_query import EventQuery
+from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
+
+"""These are unit tests, similar to test_indirect_host_counting in the live tests"""
+
+
+TEST_JQ = "{canonical_facts: {host_name: .direct_host_name}, facts: {another_host_name: .direct_host_name}}"
+
+
+@pytest.fixture
+def bare_job(job_factory):
+    job = job_factory()
+    job.installed_collections = {'demo.query': {'version': '1.0.1'}, 'demo2.query': {'version': '1.0.1'}}
+    job.event_queries_processed = False
+    job.save(update_fields=['installed_collections', 'event_queries_processed'])
+    return job
+
+
+def create_registered_event(job, task_name='demo.query.example'):
+    return job.job_events.create(event_data={'resolved_action': task_name, 'res': {'direct_host_name': 'foo_host'}})
+
+
+@pytest.fixture
+def job_with_counted_event(bare_job):
+    create_registered_event(bare_job)
+    return bare_job
+
+
+def create_event_query(fqcn='demo.query'):
+    module_name = f'{fqcn}.example'
+    return EventQuery.objects.create(fqcn=fqcn, collection_version='1.0.1', event_query=yaml.dump({module_name: {'query': TEST_JQ}}, default_flow_style=False))
+
+
+def create_audit_record(name, job, organization, created=None):
+    record = IndirectManagedNodeAudit.objects.create(name=name, job=job, organization=organization)
+    if created is not None:
+        record.created = created
+        record.save()
+    return record
+
+
+@pytest.fixture
+def event_query():
+    "This is ordinarily created by the artifacts callback"
+    return create_event_query()
+
+
+@pytest.fixture
+def old_audit_record(bare_job, organization):
+    created_at = now() - timedelta(days=10)
+    return create_audit_record(name="old_job", job=bare_job, organization=organization, created=created_at)
+
+
+@pytest.fixture
+def new_audit_record(bare_job, organization):
+    return IndirectManagedNodeAudit.objects.create(name="new_job", job=bare_job, 
organization=organization)
+
+
+# ---- end fixtures ----
+
+
+@pytest.mark.django_db
+def test_build_with_no_results(bare_job):
+    # never filled in events, should do nothing
+    assert build_indirect_host_data(bare_job, {}) == []
+
+
+@pytest.mark.django_db
+def test_collect_an_event(job_with_counted_event):
+    records = build_indirect_host_data(job_with_counted_event, {'demo.query.example': {'query': TEST_JQ}})
+    assert len(records) == 1
+
+
+@pytest.mark.django_db
+def test_fetch_job_event_query(bare_job, event_query):
+    assert fetch_job_event_query(bare_job) == {'demo.query.example': {'query': TEST_JQ}}
+
+
+@pytest.mark.django_db
+def test_fetch_multiple_job_event_query(bare_job):
+    create_event_query(fqcn='demo.query')
+    create_event_query(fqcn='demo2.query')
+    assert fetch_job_event_query(bare_job) == {'demo.query.example': {'query': TEST_JQ}, 'demo2.query.example': {'query': TEST_JQ}}
+
+
+@pytest.mark.django_db
+def test_save_indirect_host_entries(job_with_counted_event, event_query):
+    assert job_with_counted_event.event_queries_processed is False
+    save_indirect_host_entries(job_with_counted_event.id)
+    job_with_counted_event.refresh_from_db()
+    assert job_with_counted_event.event_queries_processed is True
+    assert IndirectManagedNodeAudit.objects.filter(job=job_with_counted_event).count() == 1
+    host_audit = IndirectManagedNodeAudit.objects.filter(job=job_with_counted_event).first()
+    assert host_audit.count == 1
+    assert host_audit.canonical_facts == {'host_name': 'foo_host'}
+    assert host_audit.facts == {'another_host_name': 'foo_host'}
+    assert host_audit.organization == job_with_counted_event.organization
+
+
+@pytest.mark.django_db
+def test_multiple_events_same_module_same_host(bare_job, event_query):
+    "This tests that the count field gives correct answers"
+    create_registered_event(bare_job)
+    create_registered_event(bare_job)
+    create_registered_event(bare_job)
+
+    save_indirect_host_entries(bare_job.id)
+
+    assert IndirectManagedNodeAudit.objects.filter(job=bare_job).count() == 1
+    host_audit = IndirectManagedNodeAudit.objects.filter(job=bare_job).first()
+
+    assert host_audit.count == 3
+    assert host_audit.events == ['demo.query.example']
+
+
+@pytest.mark.django_db
+def test_multiple_registered_modules(bare_job):
+    "This tests that the events will list multiple modules if more than 1 module from different collections is registered and used"
+    create_registered_event(bare_job, task_name='demo.query.example')
+    create_registered_event(bare_job, task_name='demo2.query.example')
+
+    # These take the place of using the event_query fixture
+    create_event_query(fqcn='demo.query')
+    create_event_query(fqcn='demo2.query')
+
+    save_indirect_host_entries(bare_job.id)
+
+    assert IndirectManagedNodeAudit.objects.filter(job=bare_job).count() == 1
+    host_audit = IndirectManagedNodeAudit.objects.filter(job=bare_job).first()
+
+    assert host_audit.count == 2
+    assert set(host_audit.events) == {'demo.query.example', 'demo2.query.example'}
+
+
+@pytest.mark.django_db
+def test_multiple_registered_modules_same_collection(bare_job):
+    "This tests that the events will list multiple modules if more than 1 module in same collection is registered and used"
+    create_registered_event(bare_job, task_name='demo.query.example')
+    create_registered_event(bare_job, task_name='demo.query.example2')
+
+    # Takes the place of the event_query fixture, doing it manually here
+    EventQuery.objects.create(
+        fqcn='demo.query',
+        collection_version='1.0.1',
+        event_query=yaml.dump(
+            {
+                'demo.query.example': {'query': 
TEST_JQ}, + 'demo.query.example2': {'query': TEST_JQ}, + }, + default_flow_style=False, + ), + ) + + save_indirect_host_entries(bare_job.id) + + assert IndirectManagedNodeAudit.objects.filter(job=bare_job).count() == 1 + host_audit = IndirectManagedNodeAudit.objects.filter(job=bare_job).first() + + assert host_audit.count == 2 + assert set(host_audit.events) == {'demo.query.example', 'demo.query.example2'} + + +@pytest.mark.django_db +def test_events_not_fully_processed_no_op(bare_job): + # I have a job that produced 12 events, but those are not saved + bare_job.emitted_events = 12 + bare_job.finished = now() + bare_job.save(update_fields=['emitted_events', 'finished']) + + # Running the normal post-run task will do nothing at this point + assert bare_job.event_queries_processed is False + with mock.patch('time.sleep'): # for test speedup + save_indirect_host_entries(bare_job.id) + bare_job.refresh_from_db() + assert bare_job.event_queries_processed is False + + # Right away, the fallback processing will not run either + cleanup_and_save_indirect_host_entries_fallback() + bare_job.refresh_from_db() + assert bare_job.event_queries_processed is False + + # After 3 hours have passed... + bare_job.finished = now() - timedelta(hours=3) + + # Create the expected job events + for _ in range(12): + create_registered_event(bare_job) + + bare_job.save(update_fields=['finished']) + + # The fallback task will now process indirect host query data for this job + cleanup_and_save_indirect_host_entries_fallback() + + # Test code to process anyway, events collected or not + save_indirect_host_entries(bare_job.id, wait_for_events=False) + bare_job.refresh_from_db() + assert bare_job.event_queries_processed is True + + +@pytest.mark.django_db +def test_job_id_does_not_exist(): + save_indirect_host_entries(10000001) + + +@pytest.mark.django_db +def test_cleanup_old_audit_records(old_audit_record, new_audit_record): + count_before_cleanup = IndirectManagedNodeAudit.objects.count() + assert count_before_cleanup == 2 + cleanup_and_save_indirect_host_entries_fallback() + count_after_cleanup = IndirectManagedNodeAudit.objects.count() + assert count_after_cleanup == 1 diff --git a/awx/main/tests/live/tests/conftest.py b/awx/main/tests/live/tests/conftest.py new file mode 100644 index 0000000000..5210fd601a --- /dev/null +++ b/awx/main/tests/live/tests/conftest.py @@ -0,0 +1,182 @@ +import subprocess +import time +import os +import shutil +import tempfile + +import pytest + +from django.conf import settings + +from awx.api.versioning import reverse + +# These tests are invoked from the awx/main/tests/live/ subfolder +# so any fixtures from higher-up conftest files must be explicitly included +from awx.main.tests.functional.conftest import * # noqa +from awx.main.tests.conftest import load_all_credentials # noqa: F401; pylint: disable=unused-import +from awx.main.tests import data + +from awx.main.models import Project, JobTemplate, Organization, Inventory + + +PROJ_DATA = os.path.join(os.path.dirname(data.__file__), 'projects') + + +def _copy_folders(source_path, dest_path, clear=False): + "folder-by-folder, copy dirs in the source root dir to the destination root dir" + for dirname in os.listdir(source_path): + source_dir = os.path.join(source_path, dirname) + expected_dir = os.path.join(dest_path, dirname) + if clear and os.path.exists(expected_dir): + shutil.rmtree(expected_dir) + if (not os.path.isdir(source_dir)) or os.path.exists(expected_dir): + continue + shutil.copytree(source_dir, expected_dir) + + 
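+# Run in each copied project folder so it can serve as a file:// scm_url
+# target for test projects; a single local commit is all that is needed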
+GIT_COMMANDS = (
+    'git config --global init.defaultBranch devel; '
+    'git init; '
+    'git config user.email jenkins@ansible.com; '
+    'git config user.name DoneByTest; '
+    'git add .; '
+    'git commit -m "initial commit"'
+)
+
+
+@pytest.fixture(scope='session')
+def live_tmp_folder():
+    path = os.path.join(tempfile.gettempdir(), 'live_tests')
+    if os.path.exists(path):
+        shutil.rmtree(path)
+    os.mkdir(path)
+    _copy_folders(PROJ_DATA, path)
+    for dirname in os.listdir(path):
+        source_dir = os.path.join(path, dirname)
+        subprocess.run(GIT_COMMANDS, cwd=source_dir, shell=True)
+    if path not in settings.AWX_ISOLATION_SHOW_PATHS:
+        settings.AWX_ISOLATION_SHOW_PATHS = settings.AWX_ISOLATION_SHOW_PATHS + [path]
+    return path
+
+
+def wait_to_leave_status(job, status, timeout=30, sleep_time=0.1):
+    """Wait until the job does NOT have the specified status, with some timeout
+
+    the default timeout is based on the task manager running on a 20 second
+    schedule, and the API does not guarantee that jobs progress faster than this
+    """
+    start = time.time()
+    while time.time() - start < timeout:
+        job.refresh_from_db()
+        if job.status != status:
+            return
+        time.sleep(sleep_time)
+    raise RuntimeError(f'Job failed to exit {status} in {timeout} seconds. job_explanation={job.job_explanation} tb={job.result_traceback}')
+
+
+def wait_for_events(uj, timeout=2):
+    start = time.time()
+    while uj.event_processing_finished is False:
+        time.sleep(0.2)
+        uj.refresh_from_db()
+        if time.time() - start > timeout:
+            break
+
+
+def unified_job_stdout(uj):
+    wait_for_events(uj)
+    return '\n'.join([event.stdout for event in uj.get_event_queryset().order_by('created')])
+
+
+def wait_for_job(job, final_status='successful', running_timeout=800):
+    wait_to_leave_status(job, 'pending')
+    wait_to_leave_status(job, 'waiting')
+    wait_to_leave_status(job, 'running', timeout=running_timeout)
+
+    assert job.status == final_status, f'Job was not successful id={job.id} status={job.status} tb={job.result_traceback} output=\n{unified_job_stdout(job)}'
+
+
+@pytest.fixture(scope='session')
+def default_org():
+    org = Organization.objects.filter(name='Default').first()
+    if org is None:
+        raise Exception('Tests expect Default org to already be created and it is not')
+    return org
+
+
+@pytest.fixture(scope='session')
+def demo_inv(default_org):
+    inventory, _ = Inventory.objects.get_or_create(name='Demo Inventory', defaults={'organization': default_org})
+    return inventory
+
+
+@pytest.fixture
+def podman_image_generator():
+    """
+    Generate a tagless podman image from awx base EE
+    """
+
+    def fn():
+        dockerfile = """
+        FROM quay.io/ansible/awx-ee:latest
+        RUN echo "Hello, Podman!" 
> /tmp/hello.txt + """ + cmd = ['podman', 'build', '-f', '-'] # Create an image without a tag + subprocess.run(cmd, capture_output=True, input=dockerfile, text=True, check=True) + + return fn + + +@pytest.fixture +def run_job_from_playbook(default_org, demo_inv, post, admin): + def _rf(test_name, playbook, local_path=None, scm_url=None): + project_name = f'{test_name} project' + jt_name = f'{test_name} JT: {playbook}' + + old_proj = Project.objects.filter(name=project_name).first() + if old_proj: + old_proj.delete() + + old_jt = JobTemplate.objects.filter(name=jt_name).first() + if old_jt: + old_jt.delete() + + proj_kwargs = {'name': project_name, 'organization': default_org.id} + if local_path: + # manual path + proj_kwargs['scm_type'] = '' + proj_kwargs['local_path'] = local_path + elif scm_url: + proj_kwargs['scm_type'] = 'git' + proj_kwargs['scm_url'] = scm_url + else: + raise RuntimeError('Need to provide scm_url or local_path') + + result = post( + reverse('api:project_list'), + proj_kwargs, + admin, + expect=201, + ) + proj = Project.objects.get(id=result.data['id']) + + if proj.current_job: + wait_for_job(proj.current_job) + + assert proj.get_project_path() + assert playbook in proj.playbooks + + result = post( + reverse('api:job_template_list'), + {'name': jt_name, 'project': proj.id, 'playbook': playbook, 'inventory': demo_inv.id}, + admin, + expect=201, + ) + jt = JobTemplate.objects.get(id=result.data['id']) + job = jt.create_unified_job() + job.signal_start() + + wait_for_job(job) + assert job.status == 'successful' + + return _rf diff --git a/awx/main/tests/live/tests/projects/conftest.py b/awx/main/tests/live/tests/projects/conftest.py new file mode 100644 index 0000000000..39c8b76fbf --- /dev/null +++ b/awx/main/tests/live/tests/projects/conftest.py @@ -0,0 +1,14 @@ +import pytest +import os + +from django.conf import settings + +from awx.main.tests.live.tests.conftest import _copy_folders, PROJ_DATA + + +@pytest.fixture(scope='session') +def copy_project_folders(): + proj_root = settings.PROJECTS_ROOT + if not os.path.exists(proj_root): + os.mkdir(proj_root) + _copy_folders(PROJ_DATA, proj_root, clear=True) diff --git a/awx/main/tests/live/tests/test_indirect_host_counting.py b/awx/main/tests/live/tests/test_indirect_host_counting.py new file mode 100644 index 0000000000..7c86eb8d31 --- /dev/null +++ b/awx/main/tests/live/tests/test_indirect_host_counting.py @@ -0,0 +1,65 @@ +import yaml +import time + +from awx.main.tests.live.tests.conftest import wait_for_events +from awx.main.tasks.host_indirect import build_indirect_host_data, save_indirect_host_entries +from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit +from awx.main.models import Job + + +def test_indirect_host_counting(live_tmp_folder, run_job_from_playbook): + run_job_from_playbook('test_indirect_host_counting', 'run_task.yml', scm_url=f'file://{live_tmp_folder}/test_host_query') + job = Job.objects.filter(name__icontains='test_indirect_host_counting').order_by('-created').first() + wait_for_events(job) # We must wait for events because system tasks iterate on job.job_events.filter(...) 
+
+    # Data matches awx/main/tests/data/projects/host_query/extensions/audit/event_query.yml
+    # the query is restated in-line here so the test stays self-contained
+    module_jq_str = '{canonical_facts: {host_name: .direct_host_name}, facts: {device_type: .device_type}}'
+    event_query = {'demo.query.example': {'query': module_jq_str}}
+
+    # Run the task logic directly with local data
+    results = build_indirect_host_data(job, event_query)
+    assert len(results) == 1
+    host_audit_entry = results[0]
+
+    canonical_facts = {'host_name': 'foo_host_default'}
+    facts = {'device_type': 'Fake Host'}
+
+    # Asserts on data that will match to the input jq string from above
+    assert host_audit_entry.canonical_facts == canonical_facts
+    assert host_audit_entry.facts == facts
+
+    # Test collection of data
+    assert 'demo.query' in job.installed_collections
+    assert 'host_query' in job.installed_collections['demo.query']
+    hq_text = job.installed_collections['demo.query']['host_query']
+    hq_data = yaml.safe_load(hq_text)
+    assert hq_data == {'demo.query.example': {'query': module_jq_str}}
+
+    assert job.ansible_version
+
+    # Poll for events finishing processing, because the background task requires this
+    for _ in range(10):
+        if job.job_events.count() >= job.emitted_events:
+            break
+        time.sleep(0.2)
+    else:
+        raise RuntimeError(f'job id={job.id} never processed events')
+
+    # Task might not run due to race condition, so make it run here
+    job.refresh_from_db()
+    if job.event_queries_processed is False:
+        save_indirect_host_entries.delay(job.id, wait_for_events=False)
+    # This will poll for the background task to finish
+    for _ in range(10):
+        if IndirectManagedNodeAudit.objects.filter(job=job).exists():
+            break
+        time.sleep(0.2)
+    else:
+        raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')
+
+    assert IndirectManagedNodeAudit.objects.filter(job=job).count() == 1
+    host_audit = IndirectManagedNodeAudit.objects.filter(job=job).first()
+    assert host_audit.canonical_facts == canonical_facts
+    assert host_audit.facts == facts
+    assert host_audit.organization == job.organization
diff --git a/awx/main/tests/unit/tasks/test_host_indirect_unit.py b/awx/main/tests/unit/tasks/test_host_indirect_unit.py
new file mode 100644
index 0000000000..2b128ca6fa
--- /dev/null
+++ b/awx/main/tests/unit/tasks/test_host_indirect_unit.py
@@ -0,0 +1,56 @@
+import copy
+
+import pytest
+
+from awx.main.tasks.host_indirect import get_hashable_form
+
+
+class TestHashableForm:
+    @pytest.mark.parametrize(
+        'data',
+        [
+            {'a': 'b'},
+            ['a', 'b'],
+            ('a', 'b'),
+            {'a': {'b': 'c'}},
+            {'a': ['b', 'c']},
+            {'a': ('b', 'c')},
+            ['a', ['b', 'c']],
+            ['a', ('b', 'c')],
+            ['a', {'b': 'c'}],
+        ],
+    )
+    def test_compare_equal_data(self, data):
+        other_data = copy.deepcopy(data)
+        # A tuple of scalars may be cached so ids could legitimately be the same
+        if data != ('a', 'b'):
+            assert id(data) != id(other_data)  # sanity
+            assert id(get_hashable_form(data)) != id(get_hashable_form(data))
+
+        assert get_hashable_form(data) == get_hashable_form(data)
+        assert hash(get_hashable_form(data)) == hash(get_hashable_form(data))
+
+        assert get_hashable_form(data) in {get_hashable_form(data): 1}  # test lookup hit
+
+    @pytest.mark.parametrize(
+        'data, other_data',
+        [
+            [{'a': 'b'}, {'a': 'c'}],
+            [{'a': 'b'}, {'a': 'b', 'c': 'd'}],
+            [['a', 'b'], ['a', 'c']],
+            [('a', 'b'), ('a', 'c')],
+            [{'a': {'b': 'c'}}, {'a': {'b': 'd'}}],
+            [{'a': ['b', 'c']}, {'a': ['b', 'd']}],
+            [{'a': ('b', 'c')}, {'a': ('b', 
'd')}], + [['a', ['b', 'c']], ['a', ['b', 'd']]], + [['a', ('b', 'c')], ['a', ('b', 'd')]], + [['a', {'b': 'c'}], ['a', {'b': 'd'}]], + ], + ) + def test_compare_different_data(self, data, other_data): + assert data != other_data # sanity, otherwise why test this? + assert get_hashable_form(data) != get_hashable_form(other_data) + assert hash(get_hashable_form(data)) != hash(get_hashable_form(other_data)) + + assert get_hashable_form(other_data) not in {get_hashable_form(data): 1} # test lookup miss + assert get_hashable_form(data) not in {get_hashable_form(other_data): 1} diff --git a/awx/playbooks/library/indirect_instance_count.py b/awx/playbooks/library/indirect_instance_count.py new file mode 100644 index 0000000000..27f84d92a5 --- /dev/null +++ b/awx/playbooks/library/indirect_instance_count.py @@ -0,0 +1,94 @@ +# (C) 2012, Michael DeHaan, +# (c) 2017 Ansible Project +# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) + +from __future__ import absolute_import, division, print_function + +__metaclass__ = type + + +DOCUMENTATION = ''' + callback: host_query + type: notification + short_description: for demo of indirect host data and counting, this produces collection data + version_added: historical + description: + - Saves collection data to artifacts folder + requirements: + - Whitelist in configuration + - Set AWX_ISOLATED_DATA_DIR, AWX will do this +''' + +import os +import json +from importlib.resources import files + +from ansible.plugins.callback import CallbackBase + + +# NOTE: in Ansible 1.2 or later general logging is available without +# this plugin, just set ANSIBLE_LOG_PATH as an environment variable +# or log_path in the DEFAULTS section of your ansible configuration +# file. This callback is an example of per hosts logging for those +# that want it. 
+
+
+# Taken from https://github.com/ansible/ansible/blob/devel/lib/ansible/cli/galaxy.py#L1624
+
+from ansible.cli.galaxy import with_collection_artifacts_manager
+from ansible.release import __version__
+
+from ansible.galaxy.collection import find_existing_collections
+from ansible.utils.collection_loader import AnsibleCollectionConfig
+import ansible.constants as C
+
+
+@with_collection_artifacts_manager
+def list_collections(artifacts_manager=None):
+    artifacts_manager.require_build_metadata = False
+
+    default_collections_path = set(C.COLLECTIONS_PATHS)
+    collections_search_paths = default_collections_path | set(AnsibleCollectionConfig.collection_paths)
+    collections = list(find_existing_collections(list(collections_search_paths), artifacts_manager, dedupe=False))
+    return collections
+
+
+class CallbackModule(CallbackBase):
+    """
+    writes installed collection data (and any event_query extensions) to ansible_data.json in the artifacts directory
+    """
+
+    CALLBACK_VERSION = 2.0
+    CALLBACK_TYPE = 'notification'
+    CALLBACK_NAME = 'indirect_instance_count'
+    CALLBACK_NEEDS_WHITELIST = True
+
+    def v2_playbook_on_stats(self, stats):
+        artifact_dir = os.getenv('AWX_ISOLATED_DATA_DIR')
+        if not artifact_dir:
+            raise RuntimeError('Only suitable in AWX, did not find private_data_dir')
+
+        collections_print = {}
+        # Loop over collections, from ansible-core these are Candidate objects
+        for candidate in list_collections():
+            collection_print = {
+                'version': candidate.ver,
+            }
+
+            query_file = files(f'ansible_collections.{candidate.namespace}.{candidate.name}') / 'extensions' / 'audit' / 'event_query.yml'
+            if query_file.exists():
+                with query_file.open('r') as f:
+                    collection_print['host_query'] = f.read()
+
+            collections_print[candidate.fqcn] = collection_print
+
+        ansible_data = {'installed_collections': collections_print, 'ansible_version': __version__}
+
+        write_path = os.path.join(artifact_dir, 'ansible_data.json')
+        with open(write_path, "w") as fd:
+            fd.write(json.dumps(ansible_data, indent=2))
+
+        super().v2_playbook_on_stats(stats)
diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py
index 661aed5adc..45f85da62f 100644
--- a/awx/settings/defaults.py
+++ b/awx/settings/defaults.py
@@ -501,6 +501,10 @@ CELERYBEAT_SCHEDULE = {
     'cleanup_host_metrics': {'task': 'awx.main.tasks.host_metrics.cleanup_host_metrics', 'schedule': timedelta(hours=3, minutes=30)},
     'host_metric_summary_monthly': {'task': 'awx.main.tasks.host_metrics.host_metric_summary_monthly', 'schedule': timedelta(hours=4)},
     'periodic_resource_sync': {'task': 'awx.main.tasks.system.periodic_resource_sync', 'schedule': timedelta(minutes=15)},
+    'cleanup_and_save_indirect_host_entries_fallback': {
+        'task': 'awx.main.tasks.host_indirect.cleanup_and_save_indirect_host_entries_fallback',
+        'schedule': timedelta(minutes=60),
+    },
 }
 
 # Django Caching Configuration
@@ -1213,6 +1217,18 @@ ANSIBLE_BASE_ALLOW_SINGLETON_ROLES_API = False  # Do not allow creating user-def
 # system username for django-ansible-base
 SYSTEM_USERNAME = None
 
+# For indirect host query processing
+# if a job is not immediately confirmed to have all events processed
+# it will be eligible for processing after this number of minutes
+INDIRECT_HOST_QUERY_FALLBACK_MINUTES = 60
+
+# If an error happens in event collection, give up after this time
+INDIRECT_HOST_QUERY_FALLBACK_GIVEUP_DAYS = 3
+
+# Maximum age for indirect host audit records
+# Older records will be cleaned up
+INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS = 7
+
# setting for Policy as Code feature FEATURE_POLICY_AS_CODE_ENABLED = False @@ -1232,4 +1248,7 @@ OPA_REQUEST_RETRIES = 2 # Number of retries to connect to OPA service, defaults # feature flags FLAG_SOURCES = ('flags.sources.SettingsFlagsSource',) -FLAGS = {'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}]} +FLAGS = { + 'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}], + 'FEATURE_INDIRECT_NODE_COUNTING_ENABLED': [{'condition': 'boolean', 'value': False}], +} diff --git a/awx/settings/development.py b/awx/settings/development.py index d38c2759e2..e4e58c7de2 100644 --- a/awx/settings/development.py +++ b/awx/settings/development.py @@ -67,6 +67,9 @@ CLUSTER_HOST_ID = socket.gethostname() AWX_CALLBACK_PROFILE = True +# this modifies FLAGS set by defaults +FLAGS['FEATURE_INDIRECT_NODE_COUNTING_ENABLED'] = [{'condition': 'boolean', 'value': True}] # noqa + # ======================!!!!!!! FOR DEVELOPMENT ONLY !!!!!!!================================= # Disable normal scheduled/triggered task managers (DependencyManager, TaskManager, WorkflowManager). # Allows user to trigger task managers directly for debugging and profiling purposes. diff --git a/licenses/jq.txt b/licenses/jq.txt new file mode 100644 index 0000000000..7e70661173 --- /dev/null +++ b/licenses/jq.txt @@ -0,0 +1,22 @@ +Copyright (c) 2013, Michael Williamson +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/requirements/requirements.in b/requirements/requirements.in index c8205ab3cd..50740aa7ef 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -1,6 +1,7 @@ aiohttp>=3.11.6 # CVE-2024-52304 ansiconv==1.0.0 # UPGRADE BLOCKER: from 2013, consider replacing instead of upgrading ansible-runner==2.4.0 +jq # used for indirect host counting feature asciichartpy asn1 azure-identity diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 233e2e1e59..ff44517535 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -30,6 +30,7 @@ asgiref==3.7.2 # channels-redis # daphne # django + # django-ansible-base # django-cors-headers asn1==2.7.0 # via -r /awx_devel/requirements/requirements.in @@ -164,7 +165,9 @@ django-crum==0.7.9 django-extensions==3.2.3 # via -r /awx_devel/requirements/requirements.in django-flags==5.0.13 - # via -r /awx_devel/requirements/requirements.in + # via + # -r /awx_devel/requirements/requirements.in + # django-ansible-base django-guid==3.2.1 # via -r /awx_devel/requirements/requirements.in django-oauth-toolkit==1.7.1 @@ -266,6 +269,8 @@ jmespath==1.0.1 # via # boto3 # botocore +jq==1.8.0 + # via -r /awx_devel/requirements/requirements.in json-log-formatter==0.5.2 # via -r /awx_devel/requirements/requirements.in jsonschema==4.21.1 @@ -412,6 +417,7 @@ pygerduty==0.38.3 pyjwt[crypto]==2.8.0 # via # adal + # django-ansible-base # msal # social-auth-core # twilio @@ -474,6 +480,7 @@ requests==2.32.3 # -r /awx_devel/requirements/requirements.in # adal # azure-core + # django-ansible-base # django-oauth-toolkit # kubernetes # msal @@ -533,6 +540,7 @@ sqlparse==0.5.0 # via # -r /awx_devel/requirements/requirements.in # django + # django-ansible-base tacacs-plus==1.0 # via -r /awx_devel/requirements/requirements.in tempora==5.5.1 @@ -572,6 +580,7 @@ urllib3==1.26.20 # via # -r /awx_devel/requirements/requirements.in # botocore + # django-ansible-base # kubernetes # requests uwsgi==2.0.28 diff --git a/requirements/requirements_dev.txt b/requirements/requirements_dev.txt index 0d88a477fe..7da864ef29 100644 --- a/requirements/requirements_dev.txt +++ b/requirements/requirements_dev.txt @@ -30,3 +30,4 @@ pip>=21.3,<=24.0 # PEP 660 – Editable installs for pyproject.toml based builds debugpy remote-pdb sdb + diff --git a/requirements/requirements_git.txt b/requirements/requirements_git.txt index 2c6e82524e..4b86683e26 100644 --- a/requirements/requirements_git.txt +++ b/requirements/requirements_git.txt @@ -1,4 +1,4 @@ git+https://github.com/ansible/system-certifi.git@devel#egg=certifi # Remove pbr from requirements.in when moving ansible-runner to requirements.in git+https://github.com/ansible/python3-saml.git@devel#egg=python3-saml -django-ansible-base @ git+ssh://git@github.com/ansible-automation-platform/django-ansible-base@stable-2.5#egg=django-ansible-base[rest-filters,jwt_consumer,resource-registry,rbac,feature-flags] +django-ansible-base @ git+https://github.com/alancoding/django-ansible-base@sqlparse#egg=django-ansible-base[rest-filters,jwt_consumer,resource-registry,rbac,feature-flags]
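
For illustration of the host-query mechanics above, here is a minimal standalone sketch. It uses only the `jq` package added in requirements.in; the expression and the sample module result mirror TEST_JQ and the demo.query.example test module, and the variable names are hypothetical rather than part of the patch:

```python
import jq

# jq program in the same shape as the event_query.yml entries shipped by collections
HOST_QUERY = '{canonical_facts: {host_name: .direct_host_name}, facts: {device_type: .device_type}}'

# a sample event_data['res'] payload, as the demo.query.example module would return it
module_result = {'direct_host_name': 'foo_host', 'device_type': 'Fake Host', 'changed': False}

# build_indirect_host_data compiles each distinct query once, then runs it per matching event
compiled = jq.compile(HOST_QUERY)
for data in compiled.input(module_result).all():
    print(data['canonical_facts'])  # {'host_name': 'foo_host'}
    print(data['facts'])            # {'device_type': 'Fake Host'}
```

Each jq result is keyed by get_hashable_form(data['canonical_facts']) to deduplicate audit records for the same managed node within a job, and a record is only created when the query emits canonical_facts. The whole pipeline is gated by the FEATURE_INDIRECT_NODE_COUNTING_ENABLED flag, which defaults to off in defaults.py and is switched on for development in development.py.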