mirror of
https://github.com/ansible/awx.git
synced 2026-02-15 10:10:01 -03:30
Removed old comments/code, better test coverage.
This commit is contained in:
@@ -243,7 +243,7 @@ class AcomInventoryTest(BaseCommandTest):
|
||||
|
||||
def test_with_invalid_inventory_id(self):
|
||||
inventory_pks = set(map(lambda x: x.pk, self.inventories))
|
||||
invalid_id = [x for x in xrange(9999) if x not in inventory_pks][0]
|
||||
invalid_id = [x for x in xrange(1, 9999) if x not in inventory_pks][0]
|
||||
os.environ['ACOM_INVENTORY_ID'] = str(invalid_id)
|
||||
result, stdout, stderr = self.run_command('acom_inventory', list=True)
|
||||
self.assertTrue(isinstance(result, CommandError))
|
||||
|
||||
@@ -23,6 +23,13 @@ from django.test.client import Client
|
||||
from lib.main.models import *
|
||||
from lib.main.tests.base import BaseTest
|
||||
|
||||
TEST_PLAYBOOK = '''- hosts: mygroup
|
||||
gather_facts: false
|
||||
tasks:
|
||||
- name: woohoo
|
||||
command: test 1 = 1
|
||||
'''
|
||||
|
||||
class ProjectsTest(BaseTest):
|
||||
|
||||
# tests for users, projects, and teams
|
||||
@@ -93,6 +100,51 @@ class ProjectsTest(BaseTest):
|
||||
# here is a user without any permissions...
|
||||
return ('nobody', 'nobody')
|
||||
|
||||
def test_available_playbooks(self):
|
||||
def write_test_file(project, name, content):
|
||||
full_path = os.path.join(project.local_path, name)
|
||||
if not os.path.exists(os.path.dirname(full_path)):
|
||||
os.makedirs(os.path.dirname(full_path))
|
||||
f = file(os.path.join(project.local_path, name), 'wb')
|
||||
f.write(content)
|
||||
f.close()
|
||||
# Invalid local_path
|
||||
project = self.projects[0]
|
||||
project.local_path = os.path.join(project.local_path,
|
||||
'does_not_exist')
|
||||
project.save()
|
||||
self.assertEqual(len(project.available_playbooks), 0)
|
||||
# Simple playbook
|
||||
project = self.projects[1]
|
||||
write_test_file(project, 'foo.yml', TEST_PLAYBOOK)
|
||||
self.assertEqual(len(project.available_playbooks), 1)
|
||||
# Other files
|
||||
project = self.projects[2]
|
||||
write_test_file(project, 'foo.txt', 'not a playbook')
|
||||
self.assertEqual(len(project.available_playbooks), 0)
|
||||
# Empty playbook
|
||||
project = self.projects[3]
|
||||
write_test_file(project, 'blah.yml', '')
|
||||
self.assertEqual(len(project.available_playbooks), 0)
|
||||
# Invalid YAML
|
||||
project = self.projects[4]
|
||||
write_test_file(project, 'blah.yml', TEST_PLAYBOOK + '----')
|
||||
self.assertEqual(len(project.available_playbooks), 0)
|
||||
# No hosts or includes
|
||||
project = self.projects[5]
|
||||
playbook_content = TEST_PLAYBOOK.replace('hosts', 'hoists')
|
||||
write_test_file(project, 'blah.yml', playbook_content)
|
||||
self.assertEqual(len(project.available_playbooks), 0)
|
||||
# Playbook in roles folder
|
||||
project = self.projects[6]
|
||||
write_test_file(project, 'roles/blah.yml', TEST_PLAYBOOK)
|
||||
self.assertEqual(len(project.available_playbooks), 0)
|
||||
# Playbook in tasks folder
|
||||
project = self.projects[7]
|
||||
write_test_file(project, 'tasks/blah.yml', TEST_PLAYBOOK)
|
||||
self.assertEqual(len(project.available_playbooks), 0)
|
||||
|
||||
|
||||
def test_mainline(self):
|
||||
|
||||
# =====================================================================
|
||||
|
||||
@@ -135,9 +135,11 @@ class RunJobTest(BaseCeleryTest):
|
||||
# Monkeypatch RunJob to capture list of command line arguments.
|
||||
self.original_build_args = RunJob.build_args
|
||||
self.run_job_args = None
|
||||
self.build_args_callback = lambda: None
|
||||
def new_build_args(_self, job, **kw):
|
||||
args = self.original_build_args(_self, job, **kw)
|
||||
self.run_job_args = args
|
||||
self.build_args_callback()
|
||||
return args
|
||||
RunJob.build_args = new_build_args
|
||||
|
||||
@@ -218,6 +220,9 @@ class RunJobTest(BaseCeleryTest):
|
||||
self.assertEqual(job.status, 'successful')
|
||||
self.assertTrue(job.result_stdout)
|
||||
job_events = job.job_events.all()
|
||||
for job_event in job_events:
|
||||
unicode(job_event) # For test coverage.
|
||||
job_event.save()
|
||||
self.assertEqual(job_events.filter(event='playbook_on_start').count(), 1)
|
||||
self.assertEqual(job_events.filter(event='playbook_on_play_start').count(), 1)
|
||||
self.assertEqual(job_events.filter(event='playbook_on_task_start').count(), 2)
|
||||
@@ -225,6 +230,8 @@ class RunJobTest(BaseCeleryTest):
|
||||
for evt in job_events.filter(event='runner_on_ok'):
|
||||
self.assertEqual(evt.host, self.host)
|
||||
self.assertEqual(job_events.filter(event='playbook_on_stats').count(), 1)
|
||||
for job_host_summary in job.job_host_summaries.all():
|
||||
unicode(job_host_summary) # For test coverage.
|
||||
self.assertEqual(job.successful_hosts.count(), 1)
|
||||
self.assertEqual(job.failed_hosts.count(), 0)
|
||||
self.assertEqual(job.changed_hosts.count(), 1)
|
||||
@@ -310,18 +317,38 @@ class RunJobTest(BaseCeleryTest):
|
||||
self.assertEqual(job.skipped_hosts.count(), 1)
|
||||
self.assertEqual(job.processed_hosts.count(), 1)
|
||||
|
||||
def _cancel_job_callback(self):
|
||||
job = Job.objects.get(pk=self.job.pk)
|
||||
self.assertTrue(job.cancel())
|
||||
self.assertTrue(job.cancel()) # No change from calling again.
|
||||
|
||||
def test_cancel_job(self):
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
job_template = self.create_test_job_template()
|
||||
# The cancel_flag isn't checked until after the job is started, so
|
||||
# setting it here will allow the job to start, then interrupt it.
|
||||
job = self.create_test_job(job_template=job_template, cancel_flag=True)
|
||||
# Pass save=False just for the sake of test coverage.
|
||||
job = self.create_test_job(job_template=job_template, save=False)
|
||||
job.save()
|
||||
self.assertEqual(job.status, 'new')
|
||||
self.assertEqual(job.cancel_flag, False)
|
||||
# Calling cancel before start has no effect.
|
||||
self.assertFalse(job.cancel())
|
||||
self.assertEqual(job.cancel_flag, False)
|
||||
self.assertFalse(job.get_passwords_needed_to_start())
|
||||
self.build_args_callback = self._cancel_job_callback
|
||||
self.assertTrue(job.start())
|
||||
self.assertEqual(job.status, 'pending')
|
||||
job = Job.objects.get(pk=job.pk)
|
||||
self.assertEqual(job.status, 'canceled')
|
||||
self.assertEqual(job.cancel_flag, True)
|
||||
# Calling cancel afterwards just returns the cancel flag.
|
||||
self.assertTrue(job.cancel())
|
||||
# Read attribute for test coverage.
|
||||
job.celery_task
|
||||
job.celery_task_id = ''
|
||||
job.save()
|
||||
self.assertEqual(job.celery_task, None)
|
||||
# Unable to start job again.
|
||||
self.assertFalse(job.start())
|
||||
|
||||
def test_extra_job_options(self):
|
||||
self.create_test_project(TEST_PLAYBOOK)
|
||||
|
||||
Reference in New Issue
Block a user