From fccb6744f9864757a9a6bfe6a6dcc8ac5b0c7ccb Mon Sep 17 00:00:00 2001 From: Alan Rominger Date: Thu, 28 May 2026 16:17:13 -0400 Subject: [PATCH] Address even more pytest warnings, and migrate smart inventory tests (#16330) * Address even more pytest warnings, co-authored with Opus 4.6 * Upgrade pyparsing * Attempt to update smart inventory logic * Move smart inventory tests here * Fix some failing dev env tests Assisted-by: claude * Use shared fixture for teardown * Fix test goof assisted-by: claude Opus 4.6 --- .../tests/functional/api/test_host_filter.py | 225 +++++++++++- .../functional/api/test_smart_inventory.py | 191 +++++++++++ .../functional/rbac/test_rbac_inventory.py | 97 ++++++ .../tests/live/tests/test_smart_inventory.py | 320 ++++++++++++++++++ .../tests/unit/notifications/test_grafana.py | 28 +- awx/main/utils/filters.py | 71 ++-- pytest.ini | 13 - requirements/requirements.in | 2 +- requirements/requirements.txt | 2 +- 9 files changed, 871 insertions(+), 78 deletions(-) create mode 100644 awx/main/tests/functional/api/test_smart_inventory.py create mode 100644 awx/main/tests/live/tests/test_smart_inventory.py diff --git a/awx/main/tests/functional/api/test_host_filter.py b/awx/main/tests/functional/api/test_host_filter.py index f65f737fd7..fdbf9b6f79 100644 --- a/awx/main/tests/functional/api/test_host_filter.py +++ b/awx/main/tests/functional/api/test_host_filter.py @@ -1,5 +1,3 @@ -# TODO: As of writing this our only concern is ensuring that the fact feature is reflected in the Host endpoint. -# Other host tests should live here to make this test suite more complete. import pytest import urllib.parse @@ -20,6 +18,48 @@ def inventory_structure(): Group.objects.create(name="g3", inventory=inv) +@pytest.fixture +def host_filter_inventory(): + """Inventory with hosts and groups matching the tower-qa test_host_filter structure. + + Groups: groupA (contains groupAA as child), groupAA, groupB + Hosts: hostA (in groupA), hostAA (in groupAA), hostB (in groupB), hostDup (in all 3 groups) + """ + org = Organization.objects.create(name="hf-org") + inv = Inventory.objects.create(name="hf-inv", organization=org) + + groupA = Group.objects.create(name="groupA", inventory=inv) + groupAA = Group.objects.create(name="groupAA", inventory=inv) + groupB = Group.objects.create(name="groupB", inventory=inv) + + hostA = Host.objects.create(name="hostA", inventory=inv) + hostAA = Host.objects.create(name="hostAA", inventory=inv) + hostB = Host.objects.create(name="hostB", inventory=inv) + hostDup = Host.objects.create(name="hostDup", inventory=inv) + + groupA.hosts.add(hostA, hostDup) + groupAA.hosts.add(hostAA, hostDup) + groupB.hosts.add(hostB, hostDup) + groupA.children.add(groupAA) + + return { + 'org': org, + 'inv': inv, + 'hosts': {'hostA': hostA, 'hostAA': hostAA, 'hostB': hostB, 'hostDup': hostDup}, + 'groups': {'groupA': groupA, 'groupAA': groupAA, 'groupB': groupB}, + } + + +def get_host_names(response): + return sorted(h['name'] for h in response.data['results']) + + +def host_filter_get(get, user, host_filter): + url = reverse('api:host_list') + params = "?host_filter=%s" % urllib.parse.quote(host_filter, safe='') + return get(url + params, user) + + @pytest.mark.django_db def test_q1(inventory_structure, get, user): def evaluate_query(query, expected_hosts): @@ -50,3 +90,184 @@ def test_q1(inventory_structure, get, user): # The following test verifies if the search in host_filter is case insensitive. query = 'search="HOST1"' evaluate_query(query, [hosts[0]]) + + +# --- Host filter query tests (migrated from tower-qa test_host_filter.py) --- + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "host_filter, expected", + [ + ("name=hostA", ["hostA"]), + ("name=not_found", []), + ("name=hostDup", ["hostDup"]), + ], +) +def test_basic_host_name_search(host_filter_inventory, get, admin_user, host_filter, expected): + response = host_filter_get(get, admin_user, host_filter) + assert response.status_code == 200 + assert get_host_names(response) == sorted(expected) + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "host_filter, expected", + [ + ("name=hostA or name=hostB", ["hostA", "hostB"]), + ("name=hostA or name=not_found", ["hostA"]), + ("name=not_found or name=not_found", []), + ("name=hostA or name=hostA", ["hostA"]), + ("name=hostDup or name=hostDup", ["hostDup"]), + ("name=hostA or name=hostAA or name=not_found", ["hostA", "hostAA"]), + ], +) +def test_host_name_search_with_or(host_filter_inventory, get, admin_user, host_filter, expected): + response = host_filter_get(get, admin_user, host_filter) + assert response.status_code == 200 + assert get_host_names(response) == sorted(expected) + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "host_filter, expected", + [ + ("name=hostA and name=hostB", []), + ("name=hostA and name=hostA", ["hostA"]), + ("name=not_found and name=not_found", []), + ("name=hostDup and name=hostDup", ["hostDup"]), + ("name=hostA and name=hostB and name=not_found", []), + ], +) +def test_host_name_search_with_and(host_filter_inventory, get, admin_user, host_filter, expected): + response = host_filter_get(get, admin_user, host_filter) + assert response.status_code == 200 + assert get_host_names(response) == sorted(expected) + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "host_filter, expected", + [ + ("groups__name=groupA", ["hostA", "hostDup"]), + ("groups__name=groupAA", ["hostAA", "hostDup"]), + ("groups__name=not_found", []), + ], +) +def test_basic_group_search(host_filter_inventory, get, admin_user, host_filter, expected): + response = host_filter_get(get, admin_user, host_filter) + assert response.status_code == 200 + assert get_host_names(response) == sorted(expected) + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "host_filter, expected", + [ + ("groups__name=groupA or groups__name=groupB", ["hostA", "hostB", "hostDup"]), + ("groups__name=groupA or groups__name=not_found", ["hostA", "hostDup"]), + ("groups__name=not_found or groups__name=not_found", []), + ("groups__name=groupA or groups__name=groupA", ["hostA", "hostDup"]), + ( + "groups__name=groupA or groups__name=groupAA or groups__name=not_found", + ["hostA", "hostAA", "hostDup"], + ), + ], +) +def test_group_search_with_or(host_filter_inventory, get, admin_user, host_filter, expected): + response = host_filter_get(get, admin_user, host_filter) + assert response.status_code == 200 + assert get_host_names(response) == sorted(expected) + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "host_filter, expected", + [ + ("groups__name=groupA and groups__name=groupB", ["hostDup"]), + ("groups__name=groupA and groups__name=groupA", ["hostA", "hostDup"]), + ("groups__name=not_found and groups__name=not_found", []), + ("groups__name=groupA and groups__name=groupB and groups__name=not_found", []), + ], +) +def test_group_search_with_and(host_filter_inventory, get, admin_user, host_filter, expected): + response = host_filter_get(get, admin_user, host_filter) + assert response.status_code == 200 + assert get_host_names(response) == sorted(expected) + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "host_filter, expected", + [ + ("name=hostA or groups__name=groupB", ["hostA", "hostB", "hostDup"]), + ("name=hostA and groups__name=groupA", ["hostA"]), + ("name=hostA and groups__name=not_found", []), + ("name=not_found and groups__name=not_found", []), + ("name=hostDup and groups__name=groupA", ["hostDup"]), + ("name=hostDup and groups__name=groupB", ["hostDup"]), + ], +) +def test_basic_hybrid_search(host_filter_inventory, get, admin_user, host_filter, expected): + response = host_filter_get(get, admin_user, host_filter) + assert response.status_code == 200 + assert get_host_names(response) == sorted(expected) + + +@pytest.mark.django_db +def test_smart_search(get, admin_user): + org = Organization.objects.create(name="search-org") + inv = Inventory.objects.create(name="search-inv", organization=org) + host = Host.objects.create(name="unique_search_target", description="findme_description", inventory=inv) + + for search_term in ["unique_search_target", "findme_description"]: + response = host_filter_get(get, admin_user, "search=%s" % search_term) + assert response.status_code == 200 + names = get_host_names(response) + assert host.name in names + + +@pytest.mark.django_db +def test_password_field_filter_blocked(get, admin_user): + url = reverse('api:host_list') + filters = [ + "created_by__password__icontains=pas3w3rd", + "search=foo or created_by__password__icontains=pas3w3rd", + "created_by__password__icontains=passw3rd or search=foo", + ] + for f in filters: + params = "?host_filter=%s" % urllib.parse.quote(f, safe='') + response = get(url + params, admin_user) + assert response.status_code == 400, f"Expected 400 for filter: {f}" + + +@pytest.mark.django_db +def test_unicode_host_filter(get, admin_user): + org = Organization.objects.create(name="unicode-org") + inv = Inventory.objects.create(name="unicode-inv", organization=org) + host = Host.objects.create(name="ホスト", inventory=inv) + group = Group.objects.create(name="グループ", inventory=inv) + group.hosts.add(host) + + response = host_filter_get(get, admin_user, "name=ホスト") + assert response.status_code == 200 + assert len(response.data['results']) == 1 + assert response.data['results'][0]['id'] == host.id + + response = host_filter_get(get, admin_user, "groups__name=グループ") + assert response.status_code == 200 + assert len(response.data['results']) == 1 + assert response.data['results'][0]['id'] == host.id + + +@pytest.mark.django_db +@pytest.mark.parametrize( + "invalid_filter", + ["string_without_equals", "1", "1.0", "true"], + ids=["bare_string", "integer", "float", "bool"], +) +def test_invalid_host_filter(get, admin_user, invalid_filter): + url = reverse('api:host_list') + params = "?host_filter=%s" % urllib.parse.quote(invalid_filter, safe='') + response = get(url + params, admin_user) + assert response.status_code == 400 diff --git a/awx/main/tests/functional/api/test_smart_inventory.py b/awx/main/tests/functional/api/test_smart_inventory.py new file mode 100644 index 0000000000..b60b73064f --- /dev/null +++ b/awx/main/tests/functional/api/test_smart_inventory.py @@ -0,0 +1,191 @@ +import json + +import pytest + +from awx.api.versioning import reverse +from awx.main.models import Organization, Host, Group, Inventory + + +@pytest.fixture +def smart_inv_org(): + return Organization.objects.create(name="smart-org") + + +@pytest.fixture +def smart_inv_source(smart_inv_org): + inv = Inventory.objects.create(name="smart-source-inv", organization=smart_inv_org) + Host.objects.create(name="hostA", inventory=inv) + Host.objects.create(name="hostB", inventory=inv) + Host.objects.create(name="hostDup", inventory=inv) + groupA = Group.objects.create(name="groupA", inventory=inv) + groupB = Group.objects.create(name="groupB", inventory=inv) + groupA.hosts.add(*inv.hosts.filter(name__in=["hostA", "hostDup"])) + groupB.hosts.add(*inv.hosts.filter(name__in=["hostB", "hostDup"])) + return inv + + +@pytest.mark.django_db +def test_create_smart_inventory(post, admin_user, smart_inv_org): + resp = post( + reverse('api:inventory_list'), + { + 'name': 'my-smart-inv', + 'kind': 'smart', + 'organization': smart_inv_org.pk, + 'host_filter': 'name=hostA', + }, + admin_user, + expect=201, + ) + assert resp.data['kind'] == 'smart' + assert resp.data['host_filter'] == 'name=hostA' + + +@pytest.mark.django_db +def test_create_smart_inventory_requires_host_filter(post, admin_user, smart_inv_org): + resp = post( + reverse('api:inventory_list'), + { + 'name': 'no-filter-smart', + 'kind': 'smart', + 'organization': smart_inv_org.pk, + }, + admin_user, + expect=400, + ) + assert 'host_filter' in json.dumps(resp.data) + + +@pytest.mark.django_db +def test_unable_to_create_host_in_smart_inventory(post, admin_user, smart_inv_org): + smart_inv = Inventory.objects.create( + name="no-host-create", + kind="smart", + host_filter="name=hostA", + organization=smart_inv_org, + ) + url = reverse('api:inventory_hosts_list', kwargs={'pk': smart_inv.pk}) + resp = post(url, {'name': 'new-host'}, admin_user, expect=400) + assert 'Cannot create' in json.dumps(resp.data) + + +@pytest.mark.django_db +def test_unable_to_create_group_in_smart_inventory(post, admin_user, smart_inv_org): + smart_inv = Inventory.objects.create( + name="no-group-create", + kind="smart", + host_filter="name=hostA", + organization=smart_inv_org, + ) + url = reverse('api:inventory_groups_list', kwargs={'pk': smart_inv.pk}) + resp = post(url, {'name': 'new-group'}, admin_user, expect=400) + assert 'Cannot create' in json.dumps(resp.data) + + +@pytest.mark.django_db +def test_unable_to_create_inventory_source_in_smart_inventory(post, admin_user, smart_inv_org): + smart_inv = Inventory.objects.create( + name="no-src-create", + kind="smart", + host_filter="name=hostA", + organization=smart_inv_org, + ) + url = reverse('api:inventory_inventory_sources_list', kwargs={'pk': smart_inv.pk}) + resp = post(url, {'name': 'new-src', 'source': 'ec2'}, admin_user, expect=400) + assert 'Cannot create' in json.dumps(resp.data) + + +@pytest.mark.django_db +def test_convert_smart_to_regular_inventory(admin_user, smart_inv_org): + smart_inv = Inventory.objects.create( + name="convert-to-regular", + kind="smart", + host_filter="name=anything", + organization=smart_inv_org, + ) + assert smart_inv.kind == 'smart' + smart_inv.host_filter = '' + smart_inv.kind = '' + smart_inv.save() + smart_inv.refresh_from_db() + assert smart_inv.kind == '' + assert not smart_inv.host_filter + + +@pytest.mark.django_db +def test_smart_inventory_deletion_does_not_cascade(admin_user, smart_inv_source, smart_inv_org): + host = smart_inv_source.hosts.first() + smart_inv = Inventory.objects.create( + name="delete-no-cascade", + kind="smart", + host_filter="name=%s" % host.name, + organization=smart_inv_org, + ) + smart_inv.delete() + assert Host.objects.filter(pk=host.pk).exists() + + +@pytest.mark.django_db +def test_urlencode_host_filter(post, admin_user, smart_inv_org): + post( + reverse('api:inventory_list'), + data={ + 'name': 'url-encoded-smart', + 'kind': 'smart', + 'organization': smart_inv_org.pk, + 'host_filter': 'ansible_facts__ansible_distribution_version=%227.4%22', + }, + user=admin_user, + expect=201, + ) + si = Inventory.objects.get(name='url-encoded-smart') + assert si.host_filter == 'ansible_facts__ansible_distribution_version="7.4"' + + +@pytest.mark.django_db +def test_host_filter_unicode(post, admin_user, smart_inv_org): + post( + reverse('api:inventory_list'), + data={ + 'name': 'unicode-smart', + 'kind': 'smart', + 'organization': smart_inv_org.pk, + 'host_filter': u'ansible_facts__ansible_distribution=レッドハット', + }, + user=admin_user, + expect=201, + ) + si = Inventory.objects.get(name='unicode-smart') + assert si.host_filter == u'ansible_facts__ansible_distribution=レッドハット' + + +@pytest.mark.django_db +@pytest.mark.parametrize("lookup", ['icontains', 'has_keys']) +def test_host_filter_invalid_ansible_facts_lookup(post, admin_user, smart_inv_org, lookup): + resp = post( + reverse('api:inventory_list'), + data={ + 'name': 'invalid-lookup-smart', + 'kind': 'smart', + 'organization': smart_inv_org.pk, + 'host_filter': u'ansible_facts__ansible_distribution__{}=cent'.format(lookup), + }, + user=admin_user, + expect=400, + ) + assert 'ansible_facts does not support searching with __{}'.format(lookup) in json.dumps(resp.data) + + +@pytest.mark.django_db +def test_host_filter_ansible_facts_exact(post, admin_user, smart_inv_org): + post( + reverse('api:inventory_list'), + data={ + 'name': 'exact-smart', + 'kind': 'smart', + 'organization': smart_inv_org.pk, + 'host_filter': 'ansible_facts__ansible_distribution__exact="CentOS"', + }, + user=admin_user, + expect=201, + ) diff --git a/awx/main/tests/functional/rbac/test_rbac_inventory.py b/awx/main/tests/functional/rbac/test_rbac_inventory.py index 87804d1c74..a3a1b8be4e 100644 --- a/awx/main/tests/functional/rbac/test_rbac_inventory.py +++ b/awx/main/tests/functional/rbac/test_rbac_inventory.py @@ -1,7 +1,13 @@ +import urllib.parse + import pytest +from awx.api.versioning import reverse from awx.main.models import ( + Group, Host, + Inventory, + Organization, Schedule, ) from awx.main.access import ( @@ -128,3 +134,94 @@ class TestSmartInventory: assert InventoryAccess(org_admin).can_admin(smart_inventory, {'host_filter': 'search=foo'}) smart_inventory.admin_role.members.add(rando) assert not InventoryAccess(rando).can_admin(smart_inventory, {'host_filter': 'search=foo'}) + + def test_host_filter_edit_unprivileged(self, smart_inventory, user): + unprivileged = user('unprivileged', False) + assert not InventoryAccess(unprivileged).can_change(smart_inventory, None) + assert not InventoryAccess(unprivileged).can_admin(smart_inventory, {'host_filter': 'search=bar'}) + + def test_host_filter_edit_inventory_admin_role(self, smart_inventory, user): + inv_admin = user('inv_admin', False) + smart_inventory.admin_role.members.add(inv_admin) + assert InventoryAccess(inv_admin).can_change(smart_inventory, None) + assert not InventoryAccess(inv_admin).can_admin(smart_inventory, {'host_filter': 'search=bar'}) + + def test_host_filter_edit_org_admin_via_api(self, smart_inventory, patch, user): + oa = user('smart_oa', False) + smart_inventory.organization.admin_role.members.add(oa) + url = reverse('api:inventory_detail', kwargs={'pk': smart_inventory.pk}) + resp = patch(url, {'host_filter': 'search=bar'}, oa, expect=200) + assert resp.data['host_filter'] == 'search=bar' + + @pytest.mark.parametrize("role_field", ['admin_role', 'use_role', 'adhoc_role', 'read_role']) + def test_inventory_role_cannot_edit_host_filter(self, smart_inventory, patch, user, role_field): + u = user('role_test_user', False) + getattr(smart_inventory, role_field).members.add(u) + url = reverse('api:inventory_detail', kwargs={'pk': smart_inventory.pk}) + patch(url, {'host_filter': 'search=bar'}, u, expect=403) + + +@pytest.mark.django_db +class TestHostFilterRBAC: + @pytest.fixture + def two_org_inventories(self): + orgA = Organization.objects.create(name="rbac-orgA") + orgB = Organization.objects.create(name="rbac-orgB") + invA = Inventory.objects.create(name="rbac-invA", organization=orgA) + invB = Inventory.objects.create(name="rbac-invB", organization=orgB) + hostA = Host.objects.create(name="shared_name", inventory=invA) + hostB = Host.objects.create(name="shared_name", inventory=invB) + groupA = Group.objects.create(name="shared_group", inventory=invA) + groupB = Group.objects.create(name="shared_group", inventory=invB) + groupA.hosts.add(hostA) + groupB.hosts.add(hostB) + return { + 'orgA': orgA, + 'orgB': orgB, + 'invA': invA, + 'invB': invB, + 'hostA': hostA, + 'hostB': hostB, + } + + @pytest.mark.parametrize("host_filter", ["name=shared_name", "groups__name=shared_group"]) + def test_host_filter_scoped_to_inventory_read_role(self, two_org_inventories, get, user, host_filter): + data = two_org_inventories + userA = user('rbac_userA', False) + userB = user('rbac_userB', False) + data['invA'].read_role.members.add(userA) + data['invB'].read_role.members.add(userB) + + url = reverse('api:host_list') + params = "?host_filter=%s" % urllib.parse.quote(host_filter, safe='') + + respA = get(url + params, userA) + idsA = [h['id'] for h in respA.data['results']] + assert data['hostA'].id in idsA + assert data['hostB'].id not in idsA + + respB = get(url + params, userB) + idsB = [h['id'] for h in respB.data['results']] + assert data['hostB'].id in idsB + assert data['hostA'].id not in idsB + + @pytest.mark.parametrize("host_filter", ["name=shared_name", "groups__name=shared_group"]) + def test_host_filter_scoped_to_org_admin(self, two_org_inventories, get, user, host_filter): + data = two_org_inventories + adminA = user('rbac_adminA', False) + adminB = user('rbac_adminB', False) + data['orgA'].admin_role.members.add(adminA) + data['orgB'].admin_role.members.add(adminB) + + url = reverse('api:host_list') + params = "?host_filter=%s" % urllib.parse.quote(host_filter, safe='') + + respA = get(url + params, adminA) + idsA = [h['id'] for h in respA.data['results']] + assert data['hostA'].id in idsA + assert data['hostB'].id not in idsA + + respB = get(url + params, adminB) + idsB = [h['id'] for h in respB.data['results']] + assert data['hostB'].id in idsB + assert data['hostA'].id not in idsB diff --git a/awx/main/tests/live/tests/test_smart_inventory.py b/awx/main/tests/live/tests/test_smart_inventory.py new file mode 100644 index 0000000000..d5fc364bbc --- /dev/null +++ b/awx/main/tests/live/tests/test_smart_inventory.py @@ -0,0 +1,320 @@ +"""Smart inventory tests that require PostgreSQL. + +These tests exercise SmartFilter and smart inventory host resolution against +a real PostgreSQL database. Most are unit-style tests that set ansible_facts +directly on Host objects rather than running playbooks. + +The smart inventory HostManager uses DISTINCT ON which requires PostgreSQL, +so any test that reads smart inventory hosts must run here (not in functional/). +""" + +import pytest + +from awx.main.models import Organization, Inventory, Host, Group +from awx.main.utils.filters import SmartFilter + + +@pytest.fixture +def fact_org(): + org, _ = Organization.objects.get_or_create(name='smart-inv-fact-test-org') + return org + + +@pytest.fixture +def fact_inventory(fact_org): + inv, created = Inventory.objects.get_or_create(name='smart-inv-fact-test-inv', organization=fact_org) + if not created: + inv.hosts.all().delete() + inv.groups.all().delete() + + groupA = Group.objects.create(name='factGroupA', inventory=inv) + groupB = Group.objects.create(name='factGroupB', inventory=inv) + + hostA = Host.objects.create( + name='factHostA', + inventory=inv, + ansible_facts={ + 'ansible_system': 'Linux', + 'ansible_distribution': 'CentOS', + 'ansible_python': { + 'version': {'major': 3, 'minor': 9, 'micro': 7}, + 'version_info': [3, 9, 7, 'final', 0], + }, + 'ansible_env': {'HOME': '/root'}, + }, + ) + hostB = Host.objects.create( + name='factHostB', + inventory=inv, + ansible_facts={ + 'ansible_system': 'Linux', + 'ansible_distribution': 'Ubuntu', + 'ansible_python': { + 'version': {'major': 3, 'minor': 11, 'micro': 2}, + 'version_info': [3, 11, 2, 'final', 0], + }, + 'ansible_env': {'HOME': '/home/user'}, + }, + ) + hostC = Host.objects.create( + name='factHostC', + inventory=inv, + ansible_facts={ + 'ansible_system': 'Darwin', + 'ansible_distribution': 'MacOSX', + 'ansible_python': { + 'version': {'major': 3, 'minor': 10, 'micro': 0}, + 'version_info': [3, 10, 0, 'final', 0], + }, + 'ansible_env': {'HOME': '/Users/test'}, + }, + ) + + groupA.hosts.add(hostA, hostC) + groupB.hosts.add(hostB, hostC) + + yield { + 'org': fact_org, + 'inv': inv, + 'hosts': {'hostA': hostA, 'hostB': hostB, 'hostC': hostC}, + 'groups': {'groupA': groupA, 'groupB': groupB}, + } + + hostA.delete() + hostB.delete() + hostC.delete() + groupA.delete() + groupB.delete() + + +@pytest.fixture +def smart_inventory_factory(): + created = [] + + def _factory(name, host_filter, organization): + inv = Inventory.objects.create(name=name, kind='smart', host_filter=host_filter, organization=organization) + created.append(inv) + return inv + + yield _factory + for inv in reversed(created): + inv.delete() + + +@pytest.fixture +def host_factory(): + created = [] + + def _factory(**kwargs): + host = Host.objects.create(**kwargs) + created.append(host) + return host + + yield _factory + for host in reversed(created): + if host.pk is not None: + host.delete() + + +@pytest.fixture +def group_factory(): + created = [] + + def _factory(**kwargs): + group = Group.objects.create(**kwargs) + created.append(group) + return group + + yield _factory + for group in reversed(created): + group.delete() + + +def query_names(filter_string): + return sorted(SmartFilter.query_from_string(filter_string).distinct().values_list('name', flat=True)) + + +# --- Fact-based filter tests (require PostgreSQL for JSONField __contains) --- + + +def test_fact_based_host_filter(fact_inventory): + assert query_names('ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB'] + assert query_names('ansible_facts__ansible_distribution=CentOS') == ['factHostA'] + assert query_names('ansible_facts__ansible_distribution=Ubuntu') == ['factHostB'] + assert query_names('ansible_facts__ansible_system=Darwin') == ['factHostC'] + assert query_names('ansible_facts__ansible_system=Windows') == [] + + +def test_nested_fact_search(fact_inventory): + assert query_names('ansible_facts__ansible_python__version__major=3') == ['factHostA', 'factHostB', 'factHostC'] + assert query_names('ansible_facts__ansible_python__version__minor=9') == ['factHostA'] + assert query_names('ansible_facts__ansible_python__version__minor=11') == ['factHostB'] + assert query_names('ansible_facts__ansible_env__HOME=/root') == ['factHostA'] + + +def test_list_fact_search(fact_inventory): + assert query_names('ansible_facts__ansible_python__version_info[]=9') == ['factHostA'] + assert query_names('ansible_facts__ansible_python__version_info[]=11') == ['factHostB'] + assert query_names('ansible_facts__ansible_python__version_info[]=3') == ['factHostA', 'factHostB', 'factHostC'] + + +def test_fact_search_with_or(fact_inventory): + assert query_names('ansible_facts__ansible_system=Linux or ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB'] + assert query_names('ansible_facts__ansible_system=Linux or ansible_facts__ansible_system=not_found') == ['factHostA', 'factHostB'] + assert query_names('ansible_facts__ansible_system=not_found or ansible_facts__ansible_system=not_found') == [] + assert query_names('ansible_facts__ansible_system=Linux or ansible_facts__ansible_system=Darwin') == ['factHostA', 'factHostB', 'factHostC'] + + +def test_fact_search_with_and(fact_inventory): + assert query_names('ansible_facts__ansible_system=Linux and ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB'] + assert query_names('ansible_facts__ansible_system=Linux and ansible_facts__ansible_system=not_found') == [] + assert query_names('ansible_facts__ansible_system=Linux and ansible_facts__ansible_distribution=CentOS') == ['factHostA'] + + +def test_hybrid_fact_name_group_search(fact_inventory): + assert query_names('name=factHostA or groups__name=factGroupB or ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB', 'factHostC'] + + assert query_names('name=factHostA or groups__name=factGroupA or ansible_facts__ansible_system=not_found') == ['factHostA', 'factHostC'] + + assert query_names('name=factHostA and groups__name=factGroupA and ansible_facts__ansible_system=not_found') == [] + + assert query_names('name=factHostA and groups__name=factGroupA and ansible_facts__ansible_system=Linux') == ['factHostA'] + + +def test_advanced_hybrid_with_parentheses(fact_inventory): + assert query_names('name=factHostA or (groups__name=factGroupB and ansible_facts__ansible_system=not_found)') == ['factHostA'] + + assert query_names('name=not_found or (groups__name=factGroupB and ansible_facts__ansible_system=Linux)') == ['factHostB'] + + assert query_names('(name=factHostA or groups__name=factGroupB) and ansible_facts__ansible_system=not_found') == [] + + assert query_names('(name=factHostA or groups__name=factGroupB) and ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB'] + + assert query_names('(name=factHostC or groups__name=factGroupA) and ansible_facts__ansible_system=Darwin') == ['factHostC'] + + +# --- Smart inventory host resolution tests (require PostgreSQL for DISTINCT ON) --- + + +def test_smart_inventory_hosts_by_name(fact_inventory, smart_inventory_factory): + org = fact_inventory['org'] + smart_inv = smart_inventory_factory('smart-by-name', 'name=factHostA', org) + hosts = sorted(smart_inv.hosts.values_list('name', flat=True)) + assert hosts == ['factHostA'] + + +def test_smart_inventory_hosts_by_group(fact_inventory, smart_inventory_factory): + org = fact_inventory['org'] + smart_inv = smart_inventory_factory('smart-by-group', 'groups__name=factGroupA', org) + hosts = sorted(smart_inv.hosts.values_list('name', flat=True)) + assert hosts == ['factHostA', 'factHostC'] + + +def test_smart_inventory_with_facts(fact_inventory, smart_inventory_factory): + org = fact_inventory['org'] + smart_inv = smart_inventory_factory('fact-smart-inv', 'ansible_facts__ansible_system=Linux', org) + hosts = sorted(smart_inv.hosts.values_list('name', flat=True)) + assert hosts == ['factHostA', 'factHostB'] + assert smart_inv.total_hosts == 2 + + +def test_smart_inventory_with_nested_facts(fact_inventory, smart_inventory_factory): + org = fact_inventory['org'] + smart_inv = smart_inventory_factory( + 'nested-fact-smart-inv', + 'ansible_facts__ansible_distribution=CentOS and ansible_facts__ansible_python__version__minor=9', + org, + ) + hosts = list(smart_inv.hosts.values_list('name', flat=True)) + assert hosts == ['factHostA'] + + +def test_host_filter_is_organization_scoped(fact_inventory, smart_inventory_factory, host_factory): + """Smart inventory only includes hosts from its own organization.""" + org1 = fact_inventory['org'] + org2, _ = Organization.objects.get_or_create(name='smart-inv-other-org') + inv2, _ = Inventory.objects.get_or_create(name='other-org-inv', organization=org2) + Host.objects.filter(name='factHostA', inventory=inv2).delete() + host_factory(name='factHostA', inventory=inv2) + + smart_inv = smart_inventory_factory('scoped-smart', 'name=factHostA', org1) + hosts = list(smart_inv.hosts.all()) + assert len(hosts) == 1 + assert hosts[0].inventory_id == fact_inventory['inv'].id + + +def test_duplicate_hosts_deduplicated(smart_inventory_factory, host_factory): + """Same-name hosts across inventories in the same org yield only one smart inventory entry.""" + org, _ = Organization.objects.get_or_create(name='smart-inv-dedup-org') + inv1, _ = Inventory.objects.get_or_create(name='dedup-inv1', organization=org) + inv2, _ = Inventory.objects.get_or_create(name='dedup-inv2', organization=org) + Host.objects.filter(name='dedup_host', inventory__in=[inv1, inv2]).delete() + host1 = host_factory(name='dedup_host', inventory=inv1) + host2 = host_factory(name='dedup_host', inventory=inv2) + + smart_inv = smart_inventory_factory('dedup-smart', 'name=dedup_host', org) + hosts = list(smart_inv.hosts.all()) + assert len(hosts) == 1 + assert hosts[0].id == min(host1.id, host2.id) + + +def test_host_sources_original_inventory(fact_inventory, smart_inventory_factory): + """Hosts in a smart inventory still reference their source inventory.""" + org = fact_inventory['org'] + source_inv = fact_inventory['inv'] + + smart_inv = smart_inventory_factory('sources-original', 'name=factHostA', org) + host = smart_inv.hosts.first() + assert host.inventory_id == source_inv.id + + +def test_host_updates_reflected_in_smart_inventory(fact_inventory, smart_inventory_factory, host_factory): + """Editing or deleting a host is immediately reflected in a smart inventory.""" + org = fact_inventory['org'] + inv = fact_inventory['inv'] + host = host_factory(name='mutable_host', inventory=inv) + + smart_inv = smart_inventory_factory('updates-reflected', 'name=mutable_host', org) + assert smart_inv.hosts.count() == 1 + + host.description = 'updated' + host.save() + assert smart_inv.hosts.first().description == 'updated' + + host.delete() + assert smart_inv.hosts.count() == 0 + + +def test_smart_inventory_duplicate_hosts_matching_group_names(fact_inventory, smart_inventory_factory, host_factory, group_factory): + """A host in multiple groups whose names match an icontains filter appears only once.""" + org = fact_inventory['org'] + inv = fact_inventory['inv'] + g1 = group_factory(name='dedup_another_group', inventory=inv) + g2 = group_factory(name='dedup_yet_another_group', inventory=inv) + host = host_factory(name='dedup_grouped_host', inventory=inv) + g1.hosts.add(host) + g2.hosts.add(host) + + smart_inv = smart_inventory_factory('group-dedup-smart', 'groups__name__icontains=dedup_another', org) + assert smart_inv.hosts.count() == 1 + + +def test_smart_inventory_computed_fields(fact_inventory, smart_inventory_factory): + """Smart inventory total_hosts and related computed fields are accurate.""" + org = fact_inventory['org'] + smart_inv = smart_inventory_factory('computed-fields', 'name=factHostA or name=factHostB', org) + assert smart_inv.total_hosts == 2 + assert smart_inv.total_groups == 0 + assert smart_inv.total_inventory_sources == 0 + assert smart_inv.has_inventory_sources is False + + +def test_smart_inventory_matches_host_filter(fact_inventory, smart_inventory_factory): + """Smart inventory hosts should match the equivalent SmartFilter query.""" + org = fact_inventory['org'] + host_filter = 'groups__name=factGroupA or groups__name=factGroupB' + + smart_inv = smart_inventory_factory('match-filter', host_filter, org) + smart_names = sorted(smart_inv.hosts.values_list('name', flat=True)) + filter_names = sorted(SmartFilter.query_from_string(host_filter).distinct().values_list('name', flat=True)) + assert smart_names == filter_names diff --git a/awx/main/tests/unit/notifications/test_grafana.py b/awx/main/tests/unit/notifications/test_grafana.py index d4b9c31aa2..4e21e683ea 100644 --- a/awx/main/tests/unit/notifications/test_grafana.py +++ b/awx/main/tests/unit/notifications/test_grafana.py @@ -10,8 +10,8 @@ def test_send_messages(): with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock: requests_mock.post.return_value.status_code = 200 m = {} - m['started'] = dt.datetime.utcfromtimestamp(60).isoformat() - m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat() + m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat() + m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat() m['subject'] = "test subject" backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='') message = EmailMessage( @@ -40,8 +40,8 @@ def test_send_messages_with_no_verify_ssl(): with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock: requests_mock.post.return_value.status_code = 200 m = {} - m['started'] = dt.datetime.utcfromtimestamp(60).isoformat() - m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat() + m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat() + m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat() m['subject'] = "test subject" backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='', grafana_no_verify_ssl=True) message = EmailMessage( @@ -71,8 +71,8 @@ def test_send_messages_with_dashboardid(dashboardId): with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock: requests_mock.post.return_value.status_code = 200 m = {} - m['started'] = dt.datetime.utcfromtimestamp(60).isoformat() - m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat() + m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat() + m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat() m['subject'] = "test subject" backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=dashboardId, panelId='') message = EmailMessage( @@ -102,8 +102,8 @@ def test_send_messages_with_panelid(panelId): with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock: requests_mock.post.return_value.status_code = 200 m = {} - m['started'] = dt.datetime.utcfromtimestamp(60).isoformat() - m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat() + m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat() + m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat() m['subject'] = "test subject" backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId=panelId) message = EmailMessage( @@ -132,8 +132,8 @@ def test_send_messages_with_bothids(): with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock: requests_mock.post.return_value.status_code = 200 m = {} - m['started'] = dt.datetime.utcfromtimestamp(60).isoformat() - m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat() + m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat() + m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat() m['subject'] = "test subject" backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='42', panelId='42') message = EmailMessage( @@ -162,8 +162,8 @@ def test_send_messages_with_emptyids(): with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock: requests_mock.post.return_value.status_code = 200 m = {} - m['started'] = dt.datetime.utcfromtimestamp(60).isoformat() - m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat() + m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat() + m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat() m['subject'] = "test subject" backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='') message = EmailMessage( @@ -192,8 +192,8 @@ def test_send_messages_with_tags(): with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock: requests_mock.post.return_value.status_code = 200 m = {} - m['started'] = dt.datetime.utcfromtimestamp(60).isoformat() - m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat() + m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat() + m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat() m['subject'] = "test subject" backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='', annotation_tags=["ansible"]) message = EmailMessage( diff --git a/awx/main/utils/filters.py b/awx/main/utils/filters.py index 389b1f93c4..3b22f7e4ab 100644 --- a/awx/main/utils/filters.py +++ b/awx/main/utils/filters.py @@ -2,14 +2,7 @@ import re from functools import reduce from django.core.exceptions import FieldDoesNotExist -from pyparsing import ( - infixNotation, - opAssoc, - Optional, - Literal, - CharsNotIn, - ParseException, -) +import pyparsing as pp import logging from logging import Filter @@ -247,32 +240,19 @@ class SmartFilter(object): return (assembled_k, assembled_v) def _extract_key_value(self, t): - t_len = len(t) + k = t[0] + v = t[1] if len(t) > 1 else u"" - k = None - v = None + # Strip quotes from key + if isinstance(k, str) and k.startswith('"') and k.endswith('"'): + k = k[1:-1] - # key - # "something"= - v_offset = 2 - if t_len >= 2 and t[0] == "\"" and t[2] == "\"": - k = t[1] - v_offset = 4 - # something= + # For quoted values, keep the quotes (strip_quotes_* will handle them later). + # For unquoted values, convert to the appropriate Python type. + if isinstance(v, str) and v.startswith('"') and v.endswith('"'): + pass # keep as-is, e.g. '"true"', '""', '"null"' else: - k = t[0] - - # value - # ="something" - if t_len > (v_offset + 2) and t[v_offset] == "\"" and t[v_offset + 2] == "\"": - v = u'"' + str(t[v_offset + 1]) + u'"' - # v = t[v_offset + 1] - # empty "" - elif t_len > (v_offset + 1): - v = u"" - # no "" - else: - v = string_to_type(t[v_offset]) + v = string_to_type(v) return (k, v) @@ -288,7 +268,7 @@ class SmartFilter(object): try: model = get_model(relation) except LookupError: - raise ParseException('No related field named %s' % relation) + raise pp.ParseException('No related field named %s' % relation) search_kwargs = {} if model is not None: @@ -328,34 +308,31 @@ class SmartFilter(object): def query_from_string(cls, filter_string): """ TODO: - * handle values with " via: a.b.c.d="hello\"world" * handle keys with " via: a.\"b.c="yeah" * handle key with __ in it """ filter_string_raw = filter_string filter_string = str(filter_string) - unicode_spaces = list(set(str(c) for c in filter_string if c.isspace())) - unicode_spaces_other = unicode_spaces + [u'(', u')', u'=', u'"'] - atom = CharsNotIn(unicode_spaces_other) - atom_inside_quotes = CharsNotIn(u'"') - atom_quoted = Literal('"') + Optional(atom_inside_quotes) + Literal('"') - EQUAL = Literal('=') + unquoted = pp.CharsNotIn('()= \t\r\n"') + unquoted.skipWhitespace = True + quoted = pp.QuotedString('"', esc_char='\\', unquote_results=False) + token = quoted | unquoted - grammar = (atom_quoted | atom) + EQUAL + Optional((atom_quoted | atom)) - grammar.setParseAction(cls.BoolOperand) + operand = token + pp.Suppress("=") + pp.Optional(token, default="") + operand.set_parse_action(cls.BoolOperand) - boolExpr = infixNotation( - grammar, + bool_expr = pp.infix_notation( + operand, [ - ("and", 2, opAssoc.LEFT, cls.BoolAnd), - ("or", 2, opAssoc.LEFT, cls.BoolOr), + (pp.Keyword("and"), 2, pp.OpAssoc.LEFT, cls.BoolAnd), + (pp.Keyword("or"), 2, pp.OpAssoc.LEFT, cls.BoolOr), ], ) try: - res = boolExpr.parseString('(' + filter_string + ')') - except (ParseException, FieldDoesNotExist): + res = bool_expr.parse_string(filter_string, parse_all=True) + except (pp.ParseException, FieldDoesNotExist): raise RuntimeError(u"Invalid query %s" % filter_string_raw) if len(res) > 0: diff --git a/pytest.ini b/pytest.ini index ae8838c0bb..07dd958b2f 100644 --- a/pytest.ini +++ b/pytest.ini @@ -15,19 +15,6 @@ markers = filterwarnings = error - # FIXME: Upgrade python-dateutil https://github.com/dateutil/dateutil/issues/1340 - once:datetime.datetime.utcfromtimestamp\(\) is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC:DeprecationWarning - - # NOTE: the following are present using python 3.11 - # FIXME: Delete this entry once `pyparsing` is updated. - once:module 'sre_constants' is deprecated:DeprecationWarning:_pytest.assertion.rewrite - - # FIXME: Delete this entry once `polymorphic` is updated. - once:pkg_resources is deprecated as an API. - - # FIXME: Delete this entry once `zope` is updated. - once:Deprecated call to `pkg_resources.declare_namespace.'zope'.`.\nImplementing implicit namespace packages .as specified in PEP 420. is preferred to `pkg_resources.declare_namespace`. See https.//setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages:DeprecationWarning: - # FIXME: Delete this entry once the deprecation is acted upon. # Note: RemovedInDjango51Warning may not exist in newer Django versions ignore:'index_together' is deprecated. Use 'Meta.indexes' in 'main.\w+' instead. diff --git a/requirements/requirements.in b/requirements/requirements.in index 2e4f687c25..493c35ec17 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -50,7 +50,7 @@ pyasn1>=0.6.2 # CVE-2026-2349 pygerduty PyGithub pyopenssl -pyparsing==2.4.7 # Upgrading to v3 of pyparsing introduce errors on smart host filtering: Expected 'or' term, found 'or' (at char 15), (line:1, col:16) +pyparsing>3.0 # Upgraded to v3 and changed import patterns python-daemon python-dsv-sdk>=1.0.4 python-tss-sdk>=1.2.1 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 75729849bd..063fa4b953 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -392,7 +392,7 @@ pyopenssl==25.3.0 # via # -r /awx_devel/requirements/requirements.in # twisted -pyparsing==2.4.7 +pyparsing==3.3.2 # via -r /awx_devel/requirements/requirements.in python-daemon==3.1.2 # via