Address even more pytest warnings, and migrate smart inventory tests (#16330)

* Address even more pytest warnings, co-authored with Opus 4.6

* Upgrade pyparsing

* Attempt to update smart inventory logic

* Move smart inventory tests here

* Fix some failing dev env tests

Assisted-by: claude

* Use shared fixture for teardown

* Fix test goof

assisted-by: claude Opus 4.6
This commit is contained in:
Alan Rominger
2026-05-28 16:17:13 -04:00
committed by GitHub
parent 200a68aefa
commit fccb6744f9
9 changed files with 871 additions and 78 deletions

View File

@@ -1,5 +1,3 @@
# TODO: As of writing this our only concern is ensuring that the fact feature is reflected in the Host endpoint.
# Other host tests should live here to make this test suite more complete.
import pytest
import urllib.parse
@@ -20,6 +18,48 @@ def inventory_structure():
Group.objects.create(name="g3", inventory=inv)
@pytest.fixture
def host_filter_inventory():
"""Inventory with hosts and groups matching the tower-qa test_host_filter structure.
Groups: groupA (contains groupAA as child), groupAA, groupB
Hosts: hostA (in groupA), hostAA (in groupAA), hostB (in groupB), hostDup (in all 3 groups)
"""
org = Organization.objects.create(name="hf-org")
inv = Inventory.objects.create(name="hf-inv", organization=org)
groupA = Group.objects.create(name="groupA", inventory=inv)
groupAA = Group.objects.create(name="groupAA", inventory=inv)
groupB = Group.objects.create(name="groupB", inventory=inv)
hostA = Host.objects.create(name="hostA", inventory=inv)
hostAA = Host.objects.create(name="hostAA", inventory=inv)
hostB = Host.objects.create(name="hostB", inventory=inv)
hostDup = Host.objects.create(name="hostDup", inventory=inv)
groupA.hosts.add(hostA, hostDup)
groupAA.hosts.add(hostAA, hostDup)
groupB.hosts.add(hostB, hostDup)
groupA.children.add(groupAA)
return {
'org': org,
'inv': inv,
'hosts': {'hostA': hostA, 'hostAA': hostAA, 'hostB': hostB, 'hostDup': hostDup},
'groups': {'groupA': groupA, 'groupAA': groupAA, 'groupB': groupB},
}
def get_host_names(response):
return sorted(h['name'] for h in response.data['results'])
def host_filter_get(get, user, host_filter):
url = reverse('api:host_list')
params = "?host_filter=%s" % urllib.parse.quote(host_filter, safe='')
return get(url + params, user)
@pytest.mark.django_db
def test_q1(inventory_structure, get, user):
def evaluate_query(query, expected_hosts):
@@ -50,3 +90,184 @@ def test_q1(inventory_structure, get, user):
# The following test verifies if the search in host_filter is case insensitive.
query = 'search="HOST1"'
evaluate_query(query, [hosts[0]])
# --- Host filter query tests (migrated from tower-qa test_host_filter.py) ---
@pytest.mark.django_db
@pytest.mark.parametrize(
"host_filter, expected",
[
("name=hostA", ["hostA"]),
("name=not_found", []),
("name=hostDup", ["hostDup"]),
],
)
def test_basic_host_name_search(host_filter_inventory, get, admin_user, host_filter, expected):
response = host_filter_get(get, admin_user, host_filter)
assert response.status_code == 200
assert get_host_names(response) == sorted(expected)
@pytest.mark.django_db
@pytest.mark.parametrize(
"host_filter, expected",
[
("name=hostA or name=hostB", ["hostA", "hostB"]),
("name=hostA or name=not_found", ["hostA"]),
("name=not_found or name=not_found", []),
("name=hostA or name=hostA", ["hostA"]),
("name=hostDup or name=hostDup", ["hostDup"]),
("name=hostA or name=hostAA or name=not_found", ["hostA", "hostAA"]),
],
)
def test_host_name_search_with_or(host_filter_inventory, get, admin_user, host_filter, expected):
response = host_filter_get(get, admin_user, host_filter)
assert response.status_code == 200
assert get_host_names(response) == sorted(expected)
@pytest.mark.django_db
@pytest.mark.parametrize(
"host_filter, expected",
[
("name=hostA and name=hostB", []),
("name=hostA and name=hostA", ["hostA"]),
("name=not_found and name=not_found", []),
("name=hostDup and name=hostDup", ["hostDup"]),
("name=hostA and name=hostB and name=not_found", []),
],
)
def test_host_name_search_with_and(host_filter_inventory, get, admin_user, host_filter, expected):
response = host_filter_get(get, admin_user, host_filter)
assert response.status_code == 200
assert get_host_names(response) == sorted(expected)
@pytest.mark.django_db
@pytest.mark.parametrize(
"host_filter, expected",
[
("groups__name=groupA", ["hostA", "hostDup"]),
("groups__name=groupAA", ["hostAA", "hostDup"]),
("groups__name=not_found", []),
],
)
def test_basic_group_search(host_filter_inventory, get, admin_user, host_filter, expected):
response = host_filter_get(get, admin_user, host_filter)
assert response.status_code == 200
assert get_host_names(response) == sorted(expected)
@pytest.mark.django_db
@pytest.mark.parametrize(
"host_filter, expected",
[
("groups__name=groupA or groups__name=groupB", ["hostA", "hostB", "hostDup"]),
("groups__name=groupA or groups__name=not_found", ["hostA", "hostDup"]),
("groups__name=not_found or groups__name=not_found", []),
("groups__name=groupA or groups__name=groupA", ["hostA", "hostDup"]),
(
"groups__name=groupA or groups__name=groupAA or groups__name=not_found",
["hostA", "hostAA", "hostDup"],
),
],
)
def test_group_search_with_or(host_filter_inventory, get, admin_user, host_filter, expected):
response = host_filter_get(get, admin_user, host_filter)
assert response.status_code == 200
assert get_host_names(response) == sorted(expected)
@pytest.mark.django_db
@pytest.mark.parametrize(
"host_filter, expected",
[
("groups__name=groupA and groups__name=groupB", ["hostDup"]),
("groups__name=groupA and groups__name=groupA", ["hostA", "hostDup"]),
("groups__name=not_found and groups__name=not_found", []),
("groups__name=groupA and groups__name=groupB and groups__name=not_found", []),
],
)
def test_group_search_with_and(host_filter_inventory, get, admin_user, host_filter, expected):
response = host_filter_get(get, admin_user, host_filter)
assert response.status_code == 200
assert get_host_names(response) == sorted(expected)
@pytest.mark.django_db
@pytest.mark.parametrize(
"host_filter, expected",
[
("name=hostA or groups__name=groupB", ["hostA", "hostB", "hostDup"]),
("name=hostA and groups__name=groupA", ["hostA"]),
("name=hostA and groups__name=not_found", []),
("name=not_found and groups__name=not_found", []),
("name=hostDup and groups__name=groupA", ["hostDup"]),
("name=hostDup and groups__name=groupB", ["hostDup"]),
],
)
def test_basic_hybrid_search(host_filter_inventory, get, admin_user, host_filter, expected):
response = host_filter_get(get, admin_user, host_filter)
assert response.status_code == 200
assert get_host_names(response) == sorted(expected)
@pytest.mark.django_db
def test_smart_search(get, admin_user):
org = Organization.objects.create(name="search-org")
inv = Inventory.objects.create(name="search-inv", organization=org)
host = Host.objects.create(name="unique_search_target", description="findme_description", inventory=inv)
for search_term in ["unique_search_target", "findme_description"]:
response = host_filter_get(get, admin_user, "search=%s" % search_term)
assert response.status_code == 200
names = get_host_names(response)
assert host.name in names
@pytest.mark.django_db
def test_password_field_filter_blocked(get, admin_user):
url = reverse('api:host_list')
filters = [
"created_by__password__icontains=pas3w3rd",
"search=foo or created_by__password__icontains=pas3w3rd",
"created_by__password__icontains=passw3rd or search=foo",
]
for f in filters:
params = "?host_filter=%s" % urllib.parse.quote(f, safe='')
response = get(url + params, admin_user)
assert response.status_code == 400, f"Expected 400 for filter: {f}"
@pytest.mark.django_db
def test_unicode_host_filter(get, admin_user):
org = Organization.objects.create(name="unicode-org")
inv = Inventory.objects.create(name="unicode-inv", organization=org)
host = Host.objects.create(name="ホスト", inventory=inv)
group = Group.objects.create(name="グループ", inventory=inv)
group.hosts.add(host)
response = host_filter_get(get, admin_user, "name=ホスト")
assert response.status_code == 200
assert len(response.data['results']) == 1
assert response.data['results'][0]['id'] == host.id
response = host_filter_get(get, admin_user, "groups__name=グループ")
assert response.status_code == 200
assert len(response.data['results']) == 1
assert response.data['results'][0]['id'] == host.id
@pytest.mark.django_db
@pytest.mark.parametrize(
"invalid_filter",
["string_without_equals", "1", "1.0", "true"],
ids=["bare_string", "integer", "float", "bool"],
)
def test_invalid_host_filter(get, admin_user, invalid_filter):
url = reverse('api:host_list')
params = "?host_filter=%s" % urllib.parse.quote(invalid_filter, safe='')
response = get(url + params, admin_user)
assert response.status_code == 400

View File

@@ -0,0 +1,191 @@
import json
import pytest
from awx.api.versioning import reverse
from awx.main.models import Organization, Host, Group, Inventory
@pytest.fixture
def smart_inv_org():
return Organization.objects.create(name="smart-org")
@pytest.fixture
def smart_inv_source(smart_inv_org):
inv = Inventory.objects.create(name="smart-source-inv", organization=smart_inv_org)
Host.objects.create(name="hostA", inventory=inv)
Host.objects.create(name="hostB", inventory=inv)
Host.objects.create(name="hostDup", inventory=inv)
groupA = Group.objects.create(name="groupA", inventory=inv)
groupB = Group.objects.create(name="groupB", inventory=inv)
groupA.hosts.add(*inv.hosts.filter(name__in=["hostA", "hostDup"]))
groupB.hosts.add(*inv.hosts.filter(name__in=["hostB", "hostDup"]))
return inv
@pytest.mark.django_db
def test_create_smart_inventory(post, admin_user, smart_inv_org):
resp = post(
reverse('api:inventory_list'),
{
'name': 'my-smart-inv',
'kind': 'smart',
'organization': smart_inv_org.pk,
'host_filter': 'name=hostA',
},
admin_user,
expect=201,
)
assert resp.data['kind'] == 'smart'
assert resp.data['host_filter'] == 'name=hostA'
@pytest.mark.django_db
def test_create_smart_inventory_requires_host_filter(post, admin_user, smart_inv_org):
resp = post(
reverse('api:inventory_list'),
{
'name': 'no-filter-smart',
'kind': 'smart',
'organization': smart_inv_org.pk,
},
admin_user,
expect=400,
)
assert 'host_filter' in json.dumps(resp.data)
@pytest.mark.django_db
def test_unable_to_create_host_in_smart_inventory(post, admin_user, smart_inv_org):
smart_inv = Inventory.objects.create(
name="no-host-create",
kind="smart",
host_filter="name=hostA",
organization=smart_inv_org,
)
url = reverse('api:inventory_hosts_list', kwargs={'pk': smart_inv.pk})
resp = post(url, {'name': 'new-host'}, admin_user, expect=400)
assert 'Cannot create' in json.dumps(resp.data)
@pytest.mark.django_db
def test_unable_to_create_group_in_smart_inventory(post, admin_user, smart_inv_org):
smart_inv = Inventory.objects.create(
name="no-group-create",
kind="smart",
host_filter="name=hostA",
organization=smart_inv_org,
)
url = reverse('api:inventory_groups_list', kwargs={'pk': smart_inv.pk})
resp = post(url, {'name': 'new-group'}, admin_user, expect=400)
assert 'Cannot create' in json.dumps(resp.data)
@pytest.mark.django_db
def test_unable_to_create_inventory_source_in_smart_inventory(post, admin_user, smart_inv_org):
smart_inv = Inventory.objects.create(
name="no-src-create",
kind="smart",
host_filter="name=hostA",
organization=smart_inv_org,
)
url = reverse('api:inventory_inventory_sources_list', kwargs={'pk': smart_inv.pk})
resp = post(url, {'name': 'new-src', 'source': 'ec2'}, admin_user, expect=400)
assert 'Cannot create' in json.dumps(resp.data)
@pytest.mark.django_db
def test_convert_smart_to_regular_inventory(admin_user, smart_inv_org):
smart_inv = Inventory.objects.create(
name="convert-to-regular",
kind="smart",
host_filter="name=anything",
organization=smart_inv_org,
)
assert smart_inv.kind == 'smart'
smart_inv.host_filter = ''
smart_inv.kind = ''
smart_inv.save()
smart_inv.refresh_from_db()
assert smart_inv.kind == ''
assert not smart_inv.host_filter
@pytest.mark.django_db
def test_smart_inventory_deletion_does_not_cascade(admin_user, smart_inv_source, smart_inv_org):
host = smart_inv_source.hosts.first()
smart_inv = Inventory.objects.create(
name="delete-no-cascade",
kind="smart",
host_filter="name=%s" % host.name,
organization=smart_inv_org,
)
smart_inv.delete()
assert Host.objects.filter(pk=host.pk).exists()
@pytest.mark.django_db
def test_urlencode_host_filter(post, admin_user, smart_inv_org):
post(
reverse('api:inventory_list'),
data={
'name': 'url-encoded-smart',
'kind': 'smart',
'organization': smart_inv_org.pk,
'host_filter': 'ansible_facts__ansible_distribution_version=%227.4%22',
},
user=admin_user,
expect=201,
)
si = Inventory.objects.get(name='url-encoded-smart')
assert si.host_filter == 'ansible_facts__ansible_distribution_version="7.4"'
@pytest.mark.django_db
def test_host_filter_unicode(post, admin_user, smart_inv_org):
post(
reverse('api:inventory_list'),
data={
'name': 'unicode-smart',
'kind': 'smart',
'organization': smart_inv_org.pk,
'host_filter': u'ansible_facts__ansible_distribution=レッドハット',
},
user=admin_user,
expect=201,
)
si = Inventory.objects.get(name='unicode-smart')
assert si.host_filter == u'ansible_facts__ansible_distribution=レッドハット'
@pytest.mark.django_db
@pytest.mark.parametrize("lookup", ['icontains', 'has_keys'])
def test_host_filter_invalid_ansible_facts_lookup(post, admin_user, smart_inv_org, lookup):
resp = post(
reverse('api:inventory_list'),
data={
'name': 'invalid-lookup-smart',
'kind': 'smart',
'organization': smart_inv_org.pk,
'host_filter': u'ansible_facts__ansible_distribution__{}=cent'.format(lookup),
},
user=admin_user,
expect=400,
)
assert 'ansible_facts does not support searching with __{}'.format(lookup) in json.dumps(resp.data)
@pytest.mark.django_db
def test_host_filter_ansible_facts_exact(post, admin_user, smart_inv_org):
post(
reverse('api:inventory_list'),
data={
'name': 'exact-smart',
'kind': 'smart',
'organization': smart_inv_org.pk,
'host_filter': 'ansible_facts__ansible_distribution__exact="CentOS"',
},
user=admin_user,
expect=201,
)

View File

@@ -1,7 +1,13 @@
import urllib.parse
import pytest
from awx.api.versioning import reverse
from awx.main.models import (
Group,
Host,
Inventory,
Organization,
Schedule,
)
from awx.main.access import (
@@ -128,3 +134,94 @@ class TestSmartInventory:
assert InventoryAccess(org_admin).can_admin(smart_inventory, {'host_filter': 'search=foo'})
smart_inventory.admin_role.members.add(rando)
assert not InventoryAccess(rando).can_admin(smart_inventory, {'host_filter': 'search=foo'})
def test_host_filter_edit_unprivileged(self, smart_inventory, user):
unprivileged = user('unprivileged', False)
assert not InventoryAccess(unprivileged).can_change(smart_inventory, None)
assert not InventoryAccess(unprivileged).can_admin(smart_inventory, {'host_filter': 'search=bar'})
def test_host_filter_edit_inventory_admin_role(self, smart_inventory, user):
inv_admin = user('inv_admin', False)
smart_inventory.admin_role.members.add(inv_admin)
assert InventoryAccess(inv_admin).can_change(smart_inventory, None)
assert not InventoryAccess(inv_admin).can_admin(smart_inventory, {'host_filter': 'search=bar'})
def test_host_filter_edit_org_admin_via_api(self, smart_inventory, patch, user):
oa = user('smart_oa', False)
smart_inventory.organization.admin_role.members.add(oa)
url = reverse('api:inventory_detail', kwargs={'pk': smart_inventory.pk})
resp = patch(url, {'host_filter': 'search=bar'}, oa, expect=200)
assert resp.data['host_filter'] == 'search=bar'
@pytest.mark.parametrize("role_field", ['admin_role', 'use_role', 'adhoc_role', 'read_role'])
def test_inventory_role_cannot_edit_host_filter(self, smart_inventory, patch, user, role_field):
u = user('role_test_user', False)
getattr(smart_inventory, role_field).members.add(u)
url = reverse('api:inventory_detail', kwargs={'pk': smart_inventory.pk})
patch(url, {'host_filter': 'search=bar'}, u, expect=403)
@pytest.mark.django_db
class TestHostFilterRBAC:
@pytest.fixture
def two_org_inventories(self):
orgA = Organization.objects.create(name="rbac-orgA")
orgB = Organization.objects.create(name="rbac-orgB")
invA = Inventory.objects.create(name="rbac-invA", organization=orgA)
invB = Inventory.objects.create(name="rbac-invB", organization=orgB)
hostA = Host.objects.create(name="shared_name", inventory=invA)
hostB = Host.objects.create(name="shared_name", inventory=invB)
groupA = Group.objects.create(name="shared_group", inventory=invA)
groupB = Group.objects.create(name="shared_group", inventory=invB)
groupA.hosts.add(hostA)
groupB.hosts.add(hostB)
return {
'orgA': orgA,
'orgB': orgB,
'invA': invA,
'invB': invB,
'hostA': hostA,
'hostB': hostB,
}
@pytest.mark.parametrize("host_filter", ["name=shared_name", "groups__name=shared_group"])
def test_host_filter_scoped_to_inventory_read_role(self, two_org_inventories, get, user, host_filter):
data = two_org_inventories
userA = user('rbac_userA', False)
userB = user('rbac_userB', False)
data['invA'].read_role.members.add(userA)
data['invB'].read_role.members.add(userB)
url = reverse('api:host_list')
params = "?host_filter=%s" % urllib.parse.quote(host_filter, safe='')
respA = get(url + params, userA)
idsA = [h['id'] for h in respA.data['results']]
assert data['hostA'].id in idsA
assert data['hostB'].id not in idsA
respB = get(url + params, userB)
idsB = [h['id'] for h in respB.data['results']]
assert data['hostB'].id in idsB
assert data['hostA'].id not in idsB
@pytest.mark.parametrize("host_filter", ["name=shared_name", "groups__name=shared_group"])
def test_host_filter_scoped_to_org_admin(self, two_org_inventories, get, user, host_filter):
data = two_org_inventories
adminA = user('rbac_adminA', False)
adminB = user('rbac_adminB', False)
data['orgA'].admin_role.members.add(adminA)
data['orgB'].admin_role.members.add(adminB)
url = reverse('api:host_list')
params = "?host_filter=%s" % urllib.parse.quote(host_filter, safe='')
respA = get(url + params, adminA)
idsA = [h['id'] for h in respA.data['results']]
assert data['hostA'].id in idsA
assert data['hostB'].id not in idsA
respB = get(url + params, adminB)
idsB = [h['id'] for h in respB.data['results']]
assert data['hostB'].id in idsB
assert data['hostA'].id not in idsB

View File

@@ -0,0 +1,320 @@
"""Smart inventory tests that require PostgreSQL.
These tests exercise SmartFilter and smart inventory host resolution against
a real PostgreSQL database. Most are unit-style tests that set ansible_facts
directly on Host objects rather than running playbooks.
The smart inventory HostManager uses DISTINCT ON which requires PostgreSQL,
so any test that reads smart inventory hosts must run here (not in functional/).
"""
import pytest
from awx.main.models import Organization, Inventory, Host, Group
from awx.main.utils.filters import SmartFilter
@pytest.fixture
def fact_org():
org, _ = Organization.objects.get_or_create(name='smart-inv-fact-test-org')
return org
@pytest.fixture
def fact_inventory(fact_org):
inv, created = Inventory.objects.get_or_create(name='smart-inv-fact-test-inv', organization=fact_org)
if not created:
inv.hosts.all().delete()
inv.groups.all().delete()
groupA = Group.objects.create(name='factGroupA', inventory=inv)
groupB = Group.objects.create(name='factGroupB', inventory=inv)
hostA = Host.objects.create(
name='factHostA',
inventory=inv,
ansible_facts={
'ansible_system': 'Linux',
'ansible_distribution': 'CentOS',
'ansible_python': {
'version': {'major': 3, 'minor': 9, 'micro': 7},
'version_info': [3, 9, 7, 'final', 0],
},
'ansible_env': {'HOME': '/root'},
},
)
hostB = Host.objects.create(
name='factHostB',
inventory=inv,
ansible_facts={
'ansible_system': 'Linux',
'ansible_distribution': 'Ubuntu',
'ansible_python': {
'version': {'major': 3, 'minor': 11, 'micro': 2},
'version_info': [3, 11, 2, 'final', 0],
},
'ansible_env': {'HOME': '/home/user'},
},
)
hostC = Host.objects.create(
name='factHostC',
inventory=inv,
ansible_facts={
'ansible_system': 'Darwin',
'ansible_distribution': 'MacOSX',
'ansible_python': {
'version': {'major': 3, 'minor': 10, 'micro': 0},
'version_info': [3, 10, 0, 'final', 0],
},
'ansible_env': {'HOME': '/Users/test'},
},
)
groupA.hosts.add(hostA, hostC)
groupB.hosts.add(hostB, hostC)
yield {
'org': fact_org,
'inv': inv,
'hosts': {'hostA': hostA, 'hostB': hostB, 'hostC': hostC},
'groups': {'groupA': groupA, 'groupB': groupB},
}
hostA.delete()
hostB.delete()
hostC.delete()
groupA.delete()
groupB.delete()
@pytest.fixture
def smart_inventory_factory():
created = []
def _factory(name, host_filter, organization):
inv = Inventory.objects.create(name=name, kind='smart', host_filter=host_filter, organization=organization)
created.append(inv)
return inv
yield _factory
for inv in reversed(created):
inv.delete()
@pytest.fixture
def host_factory():
created = []
def _factory(**kwargs):
host = Host.objects.create(**kwargs)
created.append(host)
return host
yield _factory
for host in reversed(created):
if host.pk is not None:
host.delete()
@pytest.fixture
def group_factory():
created = []
def _factory(**kwargs):
group = Group.objects.create(**kwargs)
created.append(group)
return group
yield _factory
for group in reversed(created):
group.delete()
def query_names(filter_string):
return sorted(SmartFilter.query_from_string(filter_string).distinct().values_list('name', flat=True))
# --- Fact-based filter tests (require PostgreSQL for JSONField __contains) ---
def test_fact_based_host_filter(fact_inventory):
assert query_names('ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB']
assert query_names('ansible_facts__ansible_distribution=CentOS') == ['factHostA']
assert query_names('ansible_facts__ansible_distribution=Ubuntu') == ['factHostB']
assert query_names('ansible_facts__ansible_system=Darwin') == ['factHostC']
assert query_names('ansible_facts__ansible_system=Windows') == []
def test_nested_fact_search(fact_inventory):
assert query_names('ansible_facts__ansible_python__version__major=3') == ['factHostA', 'factHostB', 'factHostC']
assert query_names('ansible_facts__ansible_python__version__minor=9') == ['factHostA']
assert query_names('ansible_facts__ansible_python__version__minor=11') == ['factHostB']
assert query_names('ansible_facts__ansible_env__HOME=/root') == ['factHostA']
def test_list_fact_search(fact_inventory):
assert query_names('ansible_facts__ansible_python__version_info[]=9') == ['factHostA']
assert query_names('ansible_facts__ansible_python__version_info[]=11') == ['factHostB']
assert query_names('ansible_facts__ansible_python__version_info[]=3') == ['factHostA', 'factHostB', 'factHostC']
def test_fact_search_with_or(fact_inventory):
assert query_names('ansible_facts__ansible_system=Linux or ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB']
assert query_names('ansible_facts__ansible_system=Linux or ansible_facts__ansible_system=not_found') == ['factHostA', 'factHostB']
assert query_names('ansible_facts__ansible_system=not_found or ansible_facts__ansible_system=not_found') == []
assert query_names('ansible_facts__ansible_system=Linux or ansible_facts__ansible_system=Darwin') == ['factHostA', 'factHostB', 'factHostC']
def test_fact_search_with_and(fact_inventory):
assert query_names('ansible_facts__ansible_system=Linux and ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB']
assert query_names('ansible_facts__ansible_system=Linux and ansible_facts__ansible_system=not_found') == []
assert query_names('ansible_facts__ansible_system=Linux and ansible_facts__ansible_distribution=CentOS') == ['factHostA']
def test_hybrid_fact_name_group_search(fact_inventory):
assert query_names('name=factHostA or groups__name=factGroupB or ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB', 'factHostC']
assert query_names('name=factHostA or groups__name=factGroupA or ansible_facts__ansible_system=not_found') == ['factHostA', 'factHostC']
assert query_names('name=factHostA and groups__name=factGroupA and ansible_facts__ansible_system=not_found') == []
assert query_names('name=factHostA and groups__name=factGroupA and ansible_facts__ansible_system=Linux') == ['factHostA']
def test_advanced_hybrid_with_parentheses(fact_inventory):
assert query_names('name=factHostA or (groups__name=factGroupB and ansible_facts__ansible_system=not_found)') == ['factHostA']
assert query_names('name=not_found or (groups__name=factGroupB and ansible_facts__ansible_system=Linux)') == ['factHostB']
assert query_names('(name=factHostA or groups__name=factGroupB) and ansible_facts__ansible_system=not_found') == []
assert query_names('(name=factHostA or groups__name=factGroupB) and ansible_facts__ansible_system=Linux') == ['factHostA', 'factHostB']
assert query_names('(name=factHostC or groups__name=factGroupA) and ansible_facts__ansible_system=Darwin') == ['factHostC']
# --- Smart inventory host resolution tests (require PostgreSQL for DISTINCT ON) ---
def test_smart_inventory_hosts_by_name(fact_inventory, smart_inventory_factory):
org = fact_inventory['org']
smart_inv = smart_inventory_factory('smart-by-name', 'name=factHostA', org)
hosts = sorted(smart_inv.hosts.values_list('name', flat=True))
assert hosts == ['factHostA']
def test_smart_inventory_hosts_by_group(fact_inventory, smart_inventory_factory):
org = fact_inventory['org']
smart_inv = smart_inventory_factory('smart-by-group', 'groups__name=factGroupA', org)
hosts = sorted(smart_inv.hosts.values_list('name', flat=True))
assert hosts == ['factHostA', 'factHostC']
def test_smart_inventory_with_facts(fact_inventory, smart_inventory_factory):
org = fact_inventory['org']
smart_inv = smart_inventory_factory('fact-smart-inv', 'ansible_facts__ansible_system=Linux', org)
hosts = sorted(smart_inv.hosts.values_list('name', flat=True))
assert hosts == ['factHostA', 'factHostB']
assert smart_inv.total_hosts == 2
def test_smart_inventory_with_nested_facts(fact_inventory, smart_inventory_factory):
org = fact_inventory['org']
smart_inv = smart_inventory_factory(
'nested-fact-smart-inv',
'ansible_facts__ansible_distribution=CentOS and ansible_facts__ansible_python__version__minor=9',
org,
)
hosts = list(smart_inv.hosts.values_list('name', flat=True))
assert hosts == ['factHostA']
def test_host_filter_is_organization_scoped(fact_inventory, smart_inventory_factory, host_factory):
"""Smart inventory only includes hosts from its own organization."""
org1 = fact_inventory['org']
org2, _ = Organization.objects.get_or_create(name='smart-inv-other-org')
inv2, _ = Inventory.objects.get_or_create(name='other-org-inv', organization=org2)
Host.objects.filter(name='factHostA', inventory=inv2).delete()
host_factory(name='factHostA', inventory=inv2)
smart_inv = smart_inventory_factory('scoped-smart', 'name=factHostA', org1)
hosts = list(smart_inv.hosts.all())
assert len(hosts) == 1
assert hosts[0].inventory_id == fact_inventory['inv'].id
def test_duplicate_hosts_deduplicated(smart_inventory_factory, host_factory):
"""Same-name hosts across inventories in the same org yield only one smart inventory entry."""
org, _ = Organization.objects.get_or_create(name='smart-inv-dedup-org')
inv1, _ = Inventory.objects.get_or_create(name='dedup-inv1', organization=org)
inv2, _ = Inventory.objects.get_or_create(name='dedup-inv2', organization=org)
Host.objects.filter(name='dedup_host', inventory__in=[inv1, inv2]).delete()
host1 = host_factory(name='dedup_host', inventory=inv1)
host2 = host_factory(name='dedup_host', inventory=inv2)
smart_inv = smart_inventory_factory('dedup-smart', 'name=dedup_host', org)
hosts = list(smart_inv.hosts.all())
assert len(hosts) == 1
assert hosts[0].id == min(host1.id, host2.id)
def test_host_sources_original_inventory(fact_inventory, smart_inventory_factory):
"""Hosts in a smart inventory still reference their source inventory."""
org = fact_inventory['org']
source_inv = fact_inventory['inv']
smart_inv = smart_inventory_factory('sources-original', 'name=factHostA', org)
host = smart_inv.hosts.first()
assert host.inventory_id == source_inv.id
def test_host_updates_reflected_in_smart_inventory(fact_inventory, smart_inventory_factory, host_factory):
"""Editing or deleting a host is immediately reflected in a smart inventory."""
org = fact_inventory['org']
inv = fact_inventory['inv']
host = host_factory(name='mutable_host', inventory=inv)
smart_inv = smart_inventory_factory('updates-reflected', 'name=mutable_host', org)
assert smart_inv.hosts.count() == 1
host.description = 'updated'
host.save()
assert smart_inv.hosts.first().description == 'updated'
host.delete()
assert smart_inv.hosts.count() == 0
def test_smart_inventory_duplicate_hosts_matching_group_names(fact_inventory, smart_inventory_factory, host_factory, group_factory):
"""A host in multiple groups whose names match an icontains filter appears only once."""
org = fact_inventory['org']
inv = fact_inventory['inv']
g1 = group_factory(name='dedup_another_group', inventory=inv)
g2 = group_factory(name='dedup_yet_another_group', inventory=inv)
host = host_factory(name='dedup_grouped_host', inventory=inv)
g1.hosts.add(host)
g2.hosts.add(host)
smart_inv = smart_inventory_factory('group-dedup-smart', 'groups__name__icontains=dedup_another', org)
assert smart_inv.hosts.count() == 1
def test_smart_inventory_computed_fields(fact_inventory, smart_inventory_factory):
"""Smart inventory total_hosts and related computed fields are accurate."""
org = fact_inventory['org']
smart_inv = smart_inventory_factory('computed-fields', 'name=factHostA or name=factHostB', org)
assert smart_inv.total_hosts == 2
assert smart_inv.total_groups == 0
assert smart_inv.total_inventory_sources == 0
assert smart_inv.has_inventory_sources is False
def test_smart_inventory_matches_host_filter(fact_inventory, smart_inventory_factory):
"""Smart inventory hosts should match the equivalent SmartFilter query."""
org = fact_inventory['org']
host_filter = 'groups__name=factGroupA or groups__name=factGroupB'
smart_inv = smart_inventory_factory('match-filter', host_filter, org)
smart_names = sorted(smart_inv.hosts.values_list('name', flat=True))
filter_names = sorted(SmartFilter.query_from_string(host_filter).distinct().values_list('name', flat=True))
assert smart_names == filter_names

View File

@@ -10,8 +10,8 @@ def test_send_messages():
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
m = {}
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat()
m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='')
message = EmailMessage(
@@ -40,8 +40,8 @@ def test_send_messages_with_no_verify_ssl():
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
m = {}
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat()
m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='', grafana_no_verify_ssl=True)
message = EmailMessage(
@@ -71,8 +71,8 @@ def test_send_messages_with_dashboardid(dashboardId):
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
m = {}
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat()
m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId=dashboardId, panelId='')
message = EmailMessage(
@@ -102,8 +102,8 @@ def test_send_messages_with_panelid(panelId):
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
m = {}
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat()
m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId=panelId)
message = EmailMessage(
@@ -132,8 +132,8 @@ def test_send_messages_with_bothids():
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
m = {}
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat()
m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='42', panelId='42')
message = EmailMessage(
@@ -162,8 +162,8 @@ def test_send_messages_with_emptyids():
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
m = {}
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat()
m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='')
message = EmailMessage(
@@ -192,8 +192,8 @@ def test_send_messages_with_tags():
with mock.patch('awx.main.notifications.grafana_backend.requests') as requests_mock:
requests_mock.post.return_value.status_code = 200
m = {}
m['started'] = dt.datetime.utcfromtimestamp(60).isoformat()
m['finished'] = dt.datetime.utcfromtimestamp(120).isoformat()
m['started'] = dt.datetime.fromtimestamp(60, tz=dt.timezone.utc).isoformat()
m['finished'] = dt.datetime.fromtimestamp(120, tz=dt.timezone.utc).isoformat()
m['subject'] = "test subject"
backend = grafana_backend.GrafanaBackend("testapikey", dashboardId='', panelId='', annotation_tags=["ansible"])
message = EmailMessage(

View File

@@ -2,14 +2,7 @@ import re
from functools import reduce
from django.core.exceptions import FieldDoesNotExist
from pyparsing import (
infixNotation,
opAssoc,
Optional,
Literal,
CharsNotIn,
ParseException,
)
import pyparsing as pp
import logging
from logging import Filter
@@ -247,32 +240,19 @@ class SmartFilter(object):
return (assembled_k, assembled_v)
def _extract_key_value(self, t):
t_len = len(t)
k = t[0]
v = t[1] if len(t) > 1 else u""
k = None
v = None
# Strip quotes from key
if isinstance(k, str) and k.startswith('"') and k.endswith('"'):
k = k[1:-1]
# key
# "something"=
v_offset = 2
if t_len >= 2 and t[0] == "\"" and t[2] == "\"":
k = t[1]
v_offset = 4
# something=
# For quoted values, keep the quotes (strip_quotes_* will handle them later).
# For unquoted values, convert to the appropriate Python type.
if isinstance(v, str) and v.startswith('"') and v.endswith('"'):
pass # keep as-is, e.g. '"true"', '""', '"null"'
else:
k = t[0]
# value
# ="something"
if t_len > (v_offset + 2) and t[v_offset] == "\"" and t[v_offset + 2] == "\"":
v = u'"' + str(t[v_offset + 1]) + u'"'
# v = t[v_offset + 1]
# empty ""
elif t_len > (v_offset + 1):
v = u""
# no ""
else:
v = string_to_type(t[v_offset])
v = string_to_type(v)
return (k, v)
@@ -288,7 +268,7 @@ class SmartFilter(object):
try:
model = get_model(relation)
except LookupError:
raise ParseException('No related field named %s' % relation)
raise pp.ParseException('No related field named %s' % relation)
search_kwargs = {}
if model is not None:
@@ -328,34 +308,31 @@ class SmartFilter(object):
def query_from_string(cls, filter_string):
"""
TODO:
* handle values with " via: a.b.c.d="hello\"world"
* handle keys with " via: a.\"b.c="yeah"
* handle key with __ in it
"""
filter_string_raw = filter_string
filter_string = str(filter_string)
unicode_spaces = list(set(str(c) for c in filter_string if c.isspace()))
unicode_spaces_other = unicode_spaces + [u'(', u')', u'=', u'"']
atom = CharsNotIn(unicode_spaces_other)
atom_inside_quotes = CharsNotIn(u'"')
atom_quoted = Literal('"') + Optional(atom_inside_quotes) + Literal('"')
EQUAL = Literal('=')
unquoted = pp.CharsNotIn('()= \t\r\n"')
unquoted.skipWhitespace = True
quoted = pp.QuotedString('"', esc_char='\\', unquote_results=False)
token = quoted | unquoted
grammar = (atom_quoted | atom) + EQUAL + Optional((atom_quoted | atom))
grammar.setParseAction(cls.BoolOperand)
operand = token + pp.Suppress("=") + pp.Optional(token, default="")
operand.set_parse_action(cls.BoolOperand)
boolExpr = infixNotation(
grammar,
bool_expr = pp.infix_notation(
operand,
[
("and", 2, opAssoc.LEFT, cls.BoolAnd),
("or", 2, opAssoc.LEFT, cls.BoolOr),
(pp.Keyword("and"), 2, pp.OpAssoc.LEFT, cls.BoolAnd),
(pp.Keyword("or"), 2, pp.OpAssoc.LEFT, cls.BoolOr),
],
)
try:
res = boolExpr.parseString('(' + filter_string + ')')
except (ParseException, FieldDoesNotExist):
res = bool_expr.parse_string(filter_string, parse_all=True)
except (pp.ParseException, FieldDoesNotExist):
raise RuntimeError(u"Invalid query %s" % filter_string_raw)
if len(res) > 0:

View File

@@ -15,19 +15,6 @@ markers =
filterwarnings =
error
# FIXME: Upgrade python-dateutil https://github.com/dateutil/dateutil/issues/1340
once:datetime.datetime.utcfromtimestamp\(\) is deprecated and scheduled for removal in a future version. Use timezone-aware objects to represent datetimes in UTC:DeprecationWarning
# NOTE: the following are present using python 3.11
# FIXME: Delete this entry once `pyparsing` is updated.
once:module 'sre_constants' is deprecated:DeprecationWarning:_pytest.assertion.rewrite
# FIXME: Delete this entry once `polymorphic` is updated.
once:pkg_resources is deprecated as an API.
# FIXME: Delete this entry once `zope` is updated.
once:Deprecated call to `pkg_resources.declare_namespace.'zope'.`.\nImplementing implicit namespace packages .as specified in PEP 420. is preferred to `pkg_resources.declare_namespace`. See https.//setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages:DeprecationWarning:
# FIXME: Delete this entry once the deprecation is acted upon.
# Note: RemovedInDjango51Warning may not exist in newer Django versions
ignore:'index_together' is deprecated. Use 'Meta.indexes' in 'main.\w+' instead.

View File

@@ -50,7 +50,7 @@ pyasn1>=0.6.2 # CVE-2026-2349
pygerduty
PyGithub
pyopenssl
pyparsing==2.4.7 # Upgrading to v3 of pyparsing introduce errors on smart host filtering: Expected 'or' term, found 'or' (at char 15), (line:1, col:16)
pyparsing>3.0 # Upgraded to v3 and changed import patterns
python-daemon
python-dsv-sdk>=1.0.4
python-tss-sdk>=1.2.1

View File

@@ -392,7 +392,7 @@ pyopenssl==25.3.0
# via
# -r /awx_devel/requirements/requirements.in
# twisted
pyparsing==2.4.7
pyparsing==3.3.2
# via -r /awx_devel/requirements/requirements.in
python-daemon==3.1.2
# via