mirror of
https://github.com/ansible/awx.git
synced 2026-03-01 16:58:46 -03:30
Move some more tests out of root functional folder (#15753)
This commit is contained in:
27
awx/main/tests/functional/tasks/test_tasks_jobs.py
Normal file
27
awx/main/tests/functional/tasks/test_tasks_jobs.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
import pytest
|
||||||
|
import os
|
||||||
|
|
||||||
|
from awx.main.tasks.jobs import RunJob
|
||||||
|
from awx.main.models import Job
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def scm_revision_file(tmpdir_factory):
|
||||||
|
# Returns path to temporary testing revision file
|
||||||
|
revision_file = tmpdir_factory.mktemp('revisions').join('revision.txt')
|
||||||
|
with open(str(revision_file), 'w') as f:
|
||||||
|
f.write('1234567890123456789012345678901234567890')
|
||||||
|
return os.path.join(revision_file.dirname, 'revision.txt')
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
def test_does_not_run_reaped_job(mocker, mock_me):
|
||||||
|
job = Job.objects.create(status='failed', job_explanation='This job has been reaped.')
|
||||||
|
mock_run = mocker.patch('awx.main.tasks.jobs.ansible_runner.interface.run')
|
||||||
|
try:
|
||||||
|
RunJob().run(job.id)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
job.refresh_from_db()
|
||||||
|
assert job.status == 'failed'
|
||||||
|
mock_run.assert_not_called()
|
||||||
@@ -1,28 +1,37 @@
|
|||||||
import pytest
|
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
from awx.main.tasks.jobs import RunJob
|
import pytest
|
||||||
from awx.main.tasks.system import CleanupImagesAndFiles, execution_node_health_check
|
|
||||||
from awx.main.models import Instance, Job
|
|
||||||
|
|
||||||
|
from awx.main.tasks.system import CleanupImagesAndFiles, execution_node_health_check, inspect_established_receptor_connections
|
||||||
@pytest.fixture
|
from awx.main.models import Instance, Job, ReceptorAddress, InstanceLink
|
||||||
def scm_revision_file(tmpdir_factory):
|
|
||||||
# Returns path to temporary testing revision file
|
|
||||||
revision_file = tmpdir_factory.mktemp('revisions').join('revision.txt')
|
|
||||||
with open(str(revision_file), 'w') as f:
|
|
||||||
f.write('1234567890123456789012345678901234567890')
|
|
||||||
return os.path.join(revision_file.dirname, 'revision.txt')
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@pytest.mark.parametrize('node_type', ('control. hybrid'))
|
class TestLinkState:
|
||||||
def test_no_worker_info_on_AWX_nodes(node_type):
|
@pytest.fixture(autouse=True)
|
||||||
hostname = 'us-south-3-compute.invalid'
|
def configure_settings(self, settings):
|
||||||
Instance.objects.create(hostname=hostname, node_type=node_type)
|
settings.IS_K8S = True
|
||||||
assert execution_node_health_check(hostname) is None
|
|
||||||
|
def test_inspect_established_receptor_connections(self):
|
||||||
|
'''
|
||||||
|
Change link state from ADDING to ESTABLISHED
|
||||||
|
if the receptor status KnownConnectionCosts field
|
||||||
|
has an entry for the source and target node.
|
||||||
|
'''
|
||||||
|
hop1 = Instance.objects.create(hostname='hop1')
|
||||||
|
hop2 = Instance.objects.create(hostname='hop2')
|
||||||
|
hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2', port=5678)
|
||||||
|
InstanceLink.objects.create(source=hop1, target=hop2addr, link_state=InstanceLink.States.ADDING)
|
||||||
|
|
||||||
|
# calling with empty KnownConnectionCosts should not change the link state
|
||||||
|
inspect_established_receptor_connections({"KnownConnectionCosts": {}})
|
||||||
|
assert InstanceLink.objects.get(source=hop1, target=hop2addr).link_state == InstanceLink.States.ADDING
|
||||||
|
|
||||||
|
mesh_state = {"KnownConnectionCosts": {"hop1": {"hop2": 1}}}
|
||||||
|
inspect_established_receptor_connections(mesh_state)
|
||||||
|
assert InstanceLink.objects.get(source=hop1, target=hop2addr).link_state == InstanceLink.States.ESTABLISHED
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
@@ -46,6 +55,14 @@ def mock_job_folder(job_folder_factory):
|
|||||||
return job_folder_factory()
|
return job_folder_factory()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.django_db
|
||||||
|
@pytest.mark.parametrize('node_type', ('control. hybrid'))
|
||||||
|
def test_no_worker_info_on_AWX_nodes(node_type):
|
||||||
|
hostname = 'us-south-3-compute.invalid'
|
||||||
|
Instance.objects.create(hostname=hostname, node_type=node_type)
|
||||||
|
assert execution_node_health_check(hostname) is None
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
def test_folder_cleanup_stale_file(mock_job_folder, mock_me):
|
def test_folder_cleanup_stale_file(mock_job_folder, mock_me):
|
||||||
CleanupImagesAndFiles.run()
|
CleanupImagesAndFiles.run()
|
||||||
@@ -81,16 +98,3 @@ def test_folder_cleanup_multiple_running_jobs(job_folder_factory, me_inst):
|
|||||||
CleanupImagesAndFiles.run(grace_period=0)
|
CleanupImagesAndFiles.run(grace_period=0)
|
||||||
|
|
||||||
assert [os.path.exists(d) for d in dirs] == [True for i in range(num_jobs)]
|
assert [os.path.exists(d) for d in dirs] == [True for i in range(num_jobs)]
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
def test_does_not_run_reaped_job(mocker, mock_me):
|
|
||||||
job = Job.objects.create(status='failed', job_explanation='This job has been reaped.')
|
|
||||||
mock_run = mocker.patch('awx.main.tasks.jobs.ansible_runner.interface.run')
|
|
||||||
try:
|
|
||||||
RunJob().run(job.id)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
job.refresh_from_db()
|
|
||||||
assert job.status == 'failed'
|
|
||||||
mock_run.assert_not_called()
|
|
||||||
@@ -1,30 +0,0 @@
|
|||||||
import pytest
|
|
||||||
|
|
||||||
from awx.main.models import Instance, ReceptorAddress, InstanceLink
|
|
||||||
from awx.main.tasks.system import inspect_established_receptor_connections
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
|
||||||
class TestLinkState:
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def configure_settings(self, settings):
|
|
||||||
settings.IS_K8S = True
|
|
||||||
|
|
||||||
def test_inspect_established_receptor_connections(self):
|
|
||||||
'''
|
|
||||||
Change link state from ADDING to ESTABLISHED
|
|
||||||
if the receptor status KnownConnectionCosts field
|
|
||||||
has an entry for the source and target node.
|
|
||||||
'''
|
|
||||||
hop1 = Instance.objects.create(hostname='hop1')
|
|
||||||
hop2 = Instance.objects.create(hostname='hop2')
|
|
||||||
hop2addr = ReceptorAddress.objects.create(instance=hop2, address='hop2', port=5678)
|
|
||||||
InstanceLink.objects.create(source=hop1, target=hop2addr, link_state=InstanceLink.States.ADDING)
|
|
||||||
|
|
||||||
# calling with empty KnownConnectionCosts should not change the link state
|
|
||||||
inspect_established_receptor_connections({"KnownConnectionCosts": {}})
|
|
||||||
assert InstanceLink.objects.get(source=hop1, target=hop2addr).link_state == InstanceLink.States.ADDING
|
|
||||||
|
|
||||||
mesh_state = {"KnownConnectionCosts": {"hop1": {"hop2": 1}}}
|
|
||||||
inspect_established_receptor_connections(mesh_state)
|
|
||||||
assert InstanceLink.objects.get(source=hop1, target=hop2addr).link_state == InstanceLink.States.ESTABLISHED
|
|
||||||
Reference in New Issue
Block a user