mirror of
https://github.com/ansible/awx.git
synced 2026-03-13 23:17:32 -02:30
1908 lines
76 KiB
Python
1908 lines
76 KiB
Python
# Copyright (c) 2014 AnsibleWorks, Inc.
|
|
# All Rights Reserved.
|
|
|
|
# Python
|
|
import contextlib
|
|
import json
|
|
import socket
|
|
import struct
|
|
import threading
|
|
import time
|
|
import urlparse
|
|
import uuid
|
|
|
|
# Django
|
|
import django.test
|
|
from django.contrib.auth.models import User as DjangoUser
|
|
from django.conf import settings
|
|
from django.core.urlresolvers import reverse
|
|
from django.db import transaction
|
|
from django.db.models import Q
|
|
from django.test.client import Client
|
|
from django.test.utils import override_settings
|
|
|
|
# Requests
|
|
import requests
|
|
|
|
# AWX
|
|
from awx.main.models import *
|
|
from awx.main.tests.base import BaseTestMixin
|
|
|
|
__all__ = ['JobTemplateTest', 'JobTest', 'JobStartCancelTest',
|
|
'JobTemplateCallbackTest', 'JobTransactionTest']
|
|
|
|
TEST_PLAYBOOK = '''- hosts: all
|
|
gather_facts: false
|
|
tasks:
|
|
- name: woohoo
|
|
command: test 1 = 1
|
|
'''
|
|
|
|
TEST_ASYNC_PLAYBOOK = '''
|
|
- hosts: all
|
|
gather_facts: false
|
|
tasks:
|
|
- name: async task should pass
|
|
command: sleep 10
|
|
async: 20
|
|
poll: 1
|
|
'''
|
|
|
|
TEST_SIMPLE_REQUIRED_SURVEY = '''
|
|
{
|
|
"name": "Simple",
|
|
"description": "Description",
|
|
"spec": [
|
|
{
|
|
"type": "text",
|
|
"question_name": "favorite color",
|
|
"question_description": "What is your favorite color?",
|
|
"variable": "favorite_color",
|
|
"choices": "",
|
|
"min": "",
|
|
"max": "",
|
|
"required": true,
|
|
"default": "blue"
|
|
}
|
|
]
|
|
}
|
|
'''
|
|
|
|
TEST_SIMPLE_NONREQUIRED_SURVEY = '''
|
|
{
|
|
"name": "Simple",
|
|
"description": "Description",
|
|
"spec": [
|
|
{
|
|
"type": "text",
|
|
"question_name": "unladen swallow",
|
|
"question_description": "What is the airspeed velocity of an unladen swallow?",
|
|
"variable": "unladen_swallow",
|
|
"choices": "",
|
|
"min": "",
|
|
"max": "",
|
|
"required": false,
|
|
"default": "european"
|
|
}
|
|
]
|
|
}
|
|
'''
|
|
|
|
TEST_SURVEY_REQUIREMENTS = '''
|
|
{
|
|
"name": "Simple",
|
|
"description": "Description",
|
|
"spec": [
|
|
{
|
|
"type": "text",
|
|
"question_name": "cantbeshort",
|
|
"question_description": "What is a long answer",
|
|
"variable": "long_answer",
|
|
"choices": "",
|
|
"min": 5,
|
|
"max": "",
|
|
"required": false,
|
|
"default": "yes"
|
|
},
|
|
{
|
|
"type": "text",
|
|
"question_name": "cantbelong",
|
|
"question_description": "What is a short answer",
|
|
"variable": "short_answer",
|
|
"choices": "",
|
|
"min": "",
|
|
"max": 5,
|
|
"required": false,
|
|
"default": "yes"
|
|
},
|
|
{
|
|
"type": "text",
|
|
"question_name": "reqd",
|
|
"question_description": "I should be required",
|
|
"variable": "reqd_answer",
|
|
"choices": "",
|
|
"min": "",
|
|
"max": "",
|
|
"required": true,
|
|
"default": "yes"
|
|
},
|
|
{
|
|
"type": "multiplechoice",
|
|
"question_name": "achoice",
|
|
"question_description": "Need one of these",
|
|
"variable": "single_choice",
|
|
"choices": ["one", "two"],
|
|
"min": "",
|
|
"max": "",
|
|
"required": false,
|
|
"default": "yes"
|
|
},
|
|
{
|
|
"type": "multiselect",
|
|
"question_name": "mchoice",
|
|
"question_description": "Can have multiples of these",
|
|
"variable": "multi_choice",
|
|
"choices": ["one", "two", "three"],
|
|
"min": "",
|
|
"max": "",
|
|
"required": false,
|
|
"default": "yes"
|
|
},
|
|
{
|
|
"type": "integer",
|
|
"question_name": "integerchoice",
|
|
"question_description": "I need an int here",
|
|
"variable": "int_answer",
|
|
"choices": "",
|
|
"min": 1,
|
|
"max": 5,
|
|
"required": false,
|
|
"default": ""
|
|
},
|
|
{
|
|
"type": "integer",
|
|
"question_name": "integerchoicewithoutmax",
|
|
"question_description": "I need an int here without max",
|
|
"variable": "int_answer_no_max",
|
|
"choices": "",
|
|
"min": 1,
|
|
"required": false,
|
|
"default": ""
|
|
},
|
|
{
|
|
"type": "float",
|
|
"question_name": "float",
|
|
"question_description": "I need a float here",
|
|
"variable": "float_answer",
|
|
"choices": "",
|
|
"min": 2,
|
|
"max": 5,
|
|
"required": false,
|
|
"default": ""
|
|
},
|
|
{
|
|
"type": "json",
|
|
"question_name": "jsonanswer",
|
|
"question_description": "This answer should validate as json",
|
|
"variable": "json_answer",
|
|
"choices": "",
|
|
"min": "",
|
|
"max": "",
|
|
"required": false,
|
|
"default": ""
|
|
}
|
|
]
|
|
}
|
|
'''
|
|
|
|
class BaseJobTestMixin(BaseTestMixin):
|
|
|
|
|
|
def _create_inventory(self, name, organization, created_by,
|
|
groups_hosts_dict):
|
|
'''Helper method for creating inventory with groups and hosts.'''
|
|
inventory = organization.inventories.create(
|
|
name=name,
|
|
created_by=created_by,
|
|
)
|
|
for group_name, host_names in groups_hosts_dict.items():
|
|
group = inventory.groups.create(
|
|
name=group_name,
|
|
created_by=created_by,
|
|
)
|
|
for host_name in host_names:
|
|
host = inventory.hosts.create(
|
|
name=host_name,
|
|
created_by=created_by,
|
|
)
|
|
group.hosts.add(host)
|
|
return inventory
|
|
|
|
def make_job(self, job_template, created_by, inital_state='new'):
|
|
j_actual = job_template.create_job(created_by=created_by)
|
|
j_actual.status = inital_state
|
|
return j_actual
|
|
|
|
def populate(self):
|
|
# Here's a little story about the Ansible Bread Company, or ABC. They
|
|
# make machines that make bread - bakers, slicers, and packagers - and
|
|
# these machines are each controlled by a Linux boxes, which is in turn
|
|
# managed by Ansible Commander.
|
|
|
|
# Sue is the super user. You don't mess with Sue or you're toast. Ha.
|
|
self.user_sue = self.make_user('sue', super_user=True)
|
|
|
|
# There are three organizations in ABC using Ansible, since it's the
|
|
# best thing for dev ops automation since, well, sliced bread.
|
|
|
|
# Engineering - They design and build the machines.
|
|
self.org_eng = Organization.objects.create(
|
|
name='engineering',
|
|
created_by=self.user_sue,
|
|
)
|
|
# Support - They fix it when it's not working.
|
|
self.org_sup = Organization.objects.create(
|
|
name='support',
|
|
created_by=self.user_sue,
|
|
)
|
|
# Operations - They implement the production lines using the machines.
|
|
self.org_ops = Organization.objects.create(
|
|
name='operations',
|
|
created_by=self.user_sue,
|
|
)
|
|
|
|
# Alex is Sue's IT assistant who can also administer all of the
|
|
# organizations.
|
|
self.user_alex = self.make_user('alex')
|
|
self.org_eng.admins.add(self.user_alex)
|
|
self.org_sup.admins.add(self.user_alex)
|
|
self.org_ops.admins.add(self.user_alex)
|
|
|
|
# Bob is the head of engineering. He's an admin for engineering, but
|
|
# also a user within the operations organization (so he can see the
|
|
# results if things go wrong in production).
|
|
self.user_bob = self.make_user('bob')
|
|
self.org_eng.admins.add(self.user_bob)
|
|
self.org_ops.users.add(self.user_bob)
|
|
|
|
# Chuck is the lead engineer. He has full reign over engineering, but
|
|
# no other organizations.
|
|
self.user_chuck = self.make_user('chuck')
|
|
self.org_eng.admins.add(self.user_chuck)
|
|
|
|
# Doug is the other engineer working under Chuck. He can write
|
|
# playbooks and check them, but Chuck doesn't quite think he's ready to
|
|
# run them yet. Poor Doug.
|
|
self.user_doug = self.make_user('doug')
|
|
self.org_eng.users.add(self.user_doug)
|
|
|
|
# Juan is another engineer working under Chuck. He has a little more freedom
|
|
# to run playbooks but can't create job templates
|
|
self.user_juan = self.make_user('juan')
|
|
self.org_eng.users.add(self.user_juan)
|
|
|
|
# Hannibal is Chuck's right-hand man. Chuck usually has him create the job
|
|
# templates that the rest of the team will use
|
|
self.user_hannibal = self.make_user('hannibal')
|
|
self.org_eng.users.add(self.user_hannibal)
|
|
|
|
# Eve is the head of support. She can also see what goes on in
|
|
# operations to help them troubleshoot problems.
|
|
self.user_eve = self.make_user('eve')
|
|
self.org_sup.admins.add(self.user_eve)
|
|
self.org_ops.users.add(self.user_eve)
|
|
|
|
# Frank is the other support guy.
|
|
self.user_frank = self.make_user('frank')
|
|
self.org_sup.users.add(self.user_frank)
|
|
|
|
# Greg is the head of operations.
|
|
self.user_greg = self.make_user('greg')
|
|
self.org_ops.admins.add(self.user_greg)
|
|
|
|
# Holly is an operations engineer.
|
|
self.user_holly = self.make_user('holly')
|
|
self.org_ops.users.add(self.user_holly)
|
|
|
|
# Iris is another operations engineer.
|
|
self.user_iris = self.make_user('iris')
|
|
self.org_ops.users.add(self.user_iris)
|
|
|
|
# Randall and Billybob are new ops interns that ops uses to test
|
|
# their playbooks and inventory
|
|
self.user_randall = self.make_user('randall')
|
|
self.org_ops.users.add(self.user_randall)
|
|
|
|
# He works with Randall
|
|
self.user_billybob = self.make_user('billybob')
|
|
self.org_ops.users.add(self.user_billybob)
|
|
|
|
# Jim is the newest intern. He can login, but can't do anything quite yet
|
|
# except make everyone else fresh coffee.
|
|
self.user_jim = self.make_user('jim')
|
|
|
|
# There are three main projects, one each for the development, test and
|
|
# production branches of the playbook repository. All three orgs can
|
|
# use the production branch, support can use the production and testing
|
|
# branches, and operations can only use the production branch.
|
|
self.proj_dev = self.make_project('dev', 'development branch',
|
|
self.user_sue, TEST_PLAYBOOK)
|
|
self.org_eng.projects.add(self.proj_dev)
|
|
self.proj_test = self.make_project('test', 'testing branch',
|
|
self.user_sue, TEST_PLAYBOOK)
|
|
self.org_eng.projects.add(self.proj_test)
|
|
self.org_sup.projects.add(self.proj_test)
|
|
self.proj_prod = self.make_project('prod', 'production branch',
|
|
self.user_sue, TEST_PLAYBOOK)
|
|
self.org_eng.projects.add(self.proj_prod)
|
|
self.org_sup.projects.add(self.proj_prod)
|
|
self.org_ops.projects.add(self.proj_prod)
|
|
|
|
# Operations also has 2 additional projects specific to the east/west
|
|
# production environments.
|
|
self.proj_prod_east = self.make_project('prod-east',
|
|
'east production branch',
|
|
self.user_sue, TEST_PLAYBOOK)
|
|
self.org_ops.projects.add(self.proj_prod_east)
|
|
self.proj_prod_west = self.make_project('prod-west',
|
|
'west production branch',
|
|
self.user_sue, TEST_PLAYBOOK)
|
|
self.org_ops.projects.add(self.proj_prod_west)
|
|
|
|
# The engineering organization has a set of servers to use for
|
|
# development and testing (2 bakers, 1 slicer, 1 packager).
|
|
self.inv_eng = self._create_inventory(
|
|
name='engineering environment',
|
|
organization=self.org_eng,
|
|
created_by=self.user_sue,
|
|
groups_hosts_dict={
|
|
'bakers': ['eng-baker1', 'eng-baker2'],
|
|
'slicers': ['eng-slicer1'],
|
|
'packagers': ['eng-packager1'],
|
|
},
|
|
)
|
|
|
|
# The support organization has a set of servers to use for
|
|
# testing and reproducing problems from operations (1 baker, 1 slicer,
|
|
# 1 packager).
|
|
self.inv_sup = self._create_inventory(
|
|
name='support environment',
|
|
organization=self.org_sup,
|
|
created_by=self.user_sue,
|
|
groups_hosts_dict={
|
|
'bakers': ['sup-baker1'],
|
|
'slicers': ['sup-slicer1'],
|
|
'packagers': ['sup-packager1'],
|
|
},
|
|
)
|
|
|
|
# The operations organization manages multiple sets of servers for the
|
|
# east and west production facilities.
|
|
self.inv_ops_east = self._create_inventory(
|
|
name='east production environment',
|
|
organization=self.org_ops,
|
|
created_by=self.user_sue,
|
|
groups_hosts_dict={
|
|
'bakers': ['east-baker%d' % n for n in range(1, 4)],
|
|
'slicers': ['east-slicer%d' % n for n in range(1, 3)],
|
|
'packagers': ['east-packager%d' % n for n in range(1, 3)],
|
|
},
|
|
)
|
|
self.inv_ops_west = self._create_inventory(
|
|
name='west production environment',
|
|
organization=self.org_ops,
|
|
created_by=self.user_sue,
|
|
groups_hosts_dict={
|
|
'bakers': ['west-baker%d' % n for n in range(1, 6)],
|
|
'slicers': ['west-slicer%d' % n for n in range(1, 4)],
|
|
'packagers': ['west-packager%d' % n for n in range(1, 3)],
|
|
},
|
|
)
|
|
|
|
# Operations is divided into teams to work on the east/west servers.
|
|
# Greg and Holly work on east, Greg and iris work on west.
|
|
self.team_ops_east = self.org_ops.teams.create(
|
|
name='easterners',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.team_ops_east.projects.add(self.proj_prod)
|
|
self.team_ops_east.projects.add(self.proj_prod_east)
|
|
self.team_ops_east.users.add(self.user_greg)
|
|
self.team_ops_east.users.add(self.user_holly)
|
|
self.team_ops_west = self.org_ops.teams.create(
|
|
name='westerners',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.team_ops_west.projects.add(self.proj_prod)
|
|
self.team_ops_west.projects.add(self.proj_prod_west)
|
|
self.team_ops_west.users.add(self.user_greg)
|
|
self.team_ops_west.users.add(self.user_iris)
|
|
|
|
# The south team is no longer active having been folded into the east team
|
|
self.team_ops_south = self.org_ops.teams.create(
|
|
name='southerners',
|
|
created_by=self.user_sue,
|
|
active=False,
|
|
)
|
|
self.team_ops_south.projects.add(self.proj_prod)
|
|
self.team_ops_south.users.add(self.user_greg)
|
|
|
|
# The north team is going to be deleted
|
|
self.team_ops_north = self.org_ops.teams.create(
|
|
name='northerners',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.team_ops_north.projects.add(self.proj_prod)
|
|
self.team_ops_north.users.add(self.user_greg)
|
|
|
|
# The testers team are interns that can only check playbooks but can't
|
|
# run them
|
|
self.team_ops_testers = self.org_ops.teams.create(
|
|
name='testers',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.team_ops_testers.projects.add(self.proj_prod)
|
|
self.team_ops_testers.users.add(self.user_randall)
|
|
self.team_ops_testers.users.add(self.user_billybob)
|
|
|
|
# Each user has his/her own set of credentials.
|
|
from awx.main.tests.tasks import (TEST_SSH_KEY_DATA,
|
|
TEST_SSH_KEY_DATA_LOCKED,
|
|
TEST_SSH_KEY_DATA_UNLOCK)
|
|
self.cred_sue = self.user_sue.credentials.create(
|
|
username='sue',
|
|
password=TEST_SSH_KEY_DATA,
|
|
created_by=self.user_sue,
|
|
)
|
|
self.cred_bob = self.user_bob.credentials.create(
|
|
username='bob',
|
|
password='ASK',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.cred_chuck = self.user_chuck.credentials.create(
|
|
username='chuck',
|
|
ssh_key_data=TEST_SSH_KEY_DATA,
|
|
created_by=self.user_sue,
|
|
)
|
|
self.cred_doug = self.user_doug.credentials.create(
|
|
username='doug',
|
|
password='doug doesn\'t mind his password being saved. this '
|
|
'is why we dont\'t let doug actually run jobs.',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.cred_eve = self.user_eve.credentials.create(
|
|
username='eve',
|
|
password='ASK',
|
|
sudo_username='root',
|
|
sudo_password='ASK',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.cred_frank = self.user_frank.credentials.create(
|
|
username='frank',
|
|
password='fr@nk the t@nk',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.cred_greg = self.user_greg.credentials.create(
|
|
username='greg',
|
|
ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
|
|
ssh_key_unlock='ASK',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.cred_holly = self.user_holly.credentials.create(
|
|
username='holly',
|
|
password='holly rocks',
|
|
created_by=self.user_sue,
|
|
)
|
|
self.cred_iris = self.user_iris.credentials.create(
|
|
username='iris',
|
|
password='ASK',
|
|
created_by=self.user_sue,
|
|
)
|
|
|
|
# Each operations team also has shared credentials they can use.
|
|
self.cred_ops_east = self.team_ops_east.credentials.create(
|
|
username='east',
|
|
ssh_key_data=TEST_SSH_KEY_DATA_LOCKED,
|
|
ssh_key_unlock=TEST_SSH_KEY_DATA_UNLOCK,
|
|
created_by = self.user_sue,
|
|
)
|
|
self.cred_ops_west = self.team_ops_west.credentials.create(
|
|
username='west',
|
|
password='Heading270',
|
|
created_by = self.user_sue,
|
|
)
|
|
self.cred_ops_south = self.team_ops_south.credentials.create(
|
|
username='south',
|
|
password='Heading180',
|
|
created_by = self.user_sue,
|
|
)
|
|
|
|
self.cred_ops_north = self.team_ops_north.credentials.create(
|
|
username='north',
|
|
password='Heading0',
|
|
created_by = self.user_sue,
|
|
)
|
|
|
|
self.cred_ops_test = self.team_ops_testers.credentials.create(
|
|
username='testers',
|
|
password='HeadingNone',
|
|
created_by = self.user_sue,
|
|
)
|
|
|
|
self.ops_testers_permission = Permission.objects.create(
|
|
inventory = self.inv_ops_west,
|
|
project = self.proj_prod,
|
|
team = self.team_ops_testers,
|
|
permission_type = PERM_INVENTORY_CHECK,
|
|
created_by = self.user_sue
|
|
)
|
|
|
|
self.doug_check_permission = Permission.objects.create(
|
|
inventory = self.inv_eng,
|
|
project = self.proj_dev,
|
|
user = self.user_doug,
|
|
permission_type = PERM_INVENTORY_CHECK,
|
|
created_by = self.user_sue
|
|
)
|
|
|
|
self.juan_deploy_permission = Permission.objects.create(
|
|
inventory = self.inv_eng,
|
|
project = self.proj_dev,
|
|
user = self.user_juan,
|
|
permission_type = PERM_INVENTORY_DEPLOY,
|
|
created_by = self.user_sue
|
|
)
|
|
|
|
self.hannibal_create_permission = Permission.objects.create(
|
|
inventory = self.inv_eng,
|
|
project = self.proj_dev,
|
|
user = self.user_hannibal,
|
|
permission_type = PERM_JOBTEMPLATE_CREATE,
|
|
created_by = self.user_sue
|
|
)
|
|
|
|
# FIXME: Define explicit permissions for tests.
|
|
# other django user is on the project team and can deploy
|
|
#self.permission1 = Permission.objects.create(
|
|
# inventory = self.inventory,
|
|
# project = self.project,
|
|
# team = self.team,
|
|
# permission_type = PERM_INVENTORY_DEPLOY,
|
|
# created_by = self.normal_django_user
|
|
#)
|
|
# individual permission granted to other2 user, can run check mode
|
|
#self.permission2 = Permission.objects.create(
|
|
# inventory = self.inventory,
|
|
# project = self.project,
|
|
# user = self.other2_django_user,
|
|
# permission_type = PERM_INVENTORY_CHECK,
|
|
# created_by = self.normal_django_user
|
|
#)
|
|
|
|
# Engineering has job templates to check/run the dev project onto
|
|
# their own inventory.
|
|
self.jt_eng_check = JobTemplate.objects.create(
|
|
name='eng-dev-check',
|
|
job_type='check',
|
|
inventory= self.inv_eng,
|
|
project=self.proj_dev,
|
|
playbook=self.proj_dev.playbooks[0],
|
|
host_config_key=uuid.uuid4().hex,
|
|
created_by=self.user_sue,
|
|
)
|
|
# self.job_eng_check = self.jt_eng_check.create_job(
|
|
# created_by=self.user_sue,
|
|
# credential=self.cred_doug,
|
|
# )
|
|
self.jt_eng_run = JobTemplate.objects.create(
|
|
name='eng-dev-run',
|
|
job_type='run',
|
|
inventory= self.inv_eng,
|
|
project=self.proj_dev,
|
|
playbook=self.proj_dev.playbooks[0],
|
|
host_config_key=uuid.uuid4().hex,
|
|
created_by=self.user_sue,
|
|
)
|
|
# self.job_eng_run = self.jt_eng_run.create_job(
|
|
# created_by=self.user_sue,
|
|
# credential=self.cred_chuck,
|
|
# )
|
|
|
|
# Support has job templates to check/run the test project onto
|
|
# their own inventory.
|
|
self.jt_sup_check = JobTemplate.objects.create(
|
|
name='sup-test-check',
|
|
job_type='check',
|
|
inventory= self.inv_sup,
|
|
project=self.proj_test,
|
|
playbook=self.proj_test.playbooks[0],
|
|
host_config_key=uuid.uuid4().hex,
|
|
created_by=self.user_sue,
|
|
)
|
|
# self.job_sup_check = self.jt_sup_check.create_job(
|
|
# created_by=self.user_sue,
|
|
# credential=self.cred_frank,
|
|
# )
|
|
self.jt_sup_run = JobTemplate.objects.create(
|
|
name='sup-test-run',
|
|
job_type='run',
|
|
inventory= self.inv_sup,
|
|
project=self.proj_test,
|
|
playbook=self.proj_test.playbooks[0],
|
|
host_config_key=uuid.uuid4().hex,
|
|
credential=self.cred_eve,
|
|
created_by=self.user_sue,
|
|
)
|
|
# self.job_sup_run = self.jt_sup_run.create_job(
|
|
# created_by=self.user_sue,
|
|
# )
|
|
|
|
# Operations has job templates to check/run the prod project onto
|
|
# both east and west inventories, by default using the team credential.
|
|
self.jt_ops_east_check = JobTemplate.objects.create(
|
|
name='ops-east-prod-check',
|
|
job_type='check',
|
|
inventory= self.inv_ops_east,
|
|
project=self.proj_prod,
|
|
playbook=self.proj_prod.playbooks[0],
|
|
credential=self.cred_ops_east,
|
|
host_config_key=uuid.uuid4().hex,
|
|
created_by=self.user_sue,
|
|
)
|
|
# self.job_ops_east_check = self.jt_ops_east_check.create_job(
|
|
# created_by=self.user_sue,
|
|
# )
|
|
self.jt_ops_east_run = JobTemplate.objects.create(
|
|
name='ops-east-prod-run',
|
|
job_type='run',
|
|
inventory= self.inv_ops_east,
|
|
project=self.proj_prod,
|
|
playbook=self.proj_prod.playbooks[0],
|
|
credential=self.cred_ops_east,
|
|
host_config_key=uuid.uuid4().hex,
|
|
created_by=self.user_sue,
|
|
)
|
|
# self.job_ops_east_run = self.jt_ops_east_run.create_job(
|
|
# created_by=self.user_sue,
|
|
# )
|
|
self.jt_ops_west_check = JobTemplate.objects.create(
|
|
name='ops-west-prod-check',
|
|
job_type='check',
|
|
inventory= self.inv_ops_west,
|
|
project=self.proj_prod,
|
|
playbook=self.proj_prod.playbooks[0],
|
|
credential=self.cred_ops_west,
|
|
host_config_key=uuid.uuid4().hex,
|
|
created_by=self.user_sue,
|
|
)
|
|
# self.job_ops_west_check = self.jt_ops_west_check.create_job(
|
|
# created_by=self.user_sue,
|
|
# )
|
|
self.jt_ops_west_run = JobTemplate.objects.create(
|
|
name='ops-west-prod-run',
|
|
job_type='run',
|
|
inventory= self.inv_ops_west,
|
|
project=self.proj_prod,
|
|
playbook=self.proj_prod.playbooks[0],
|
|
credential=self.cred_ops_west,
|
|
host_config_key=uuid.uuid4().hex,
|
|
created_by=self.user_sue,
|
|
)
|
|
# self.job_ops_west_run = self.jt_ops_west_run.create_job(
|
|
# created_by=self.user_sue,
|
|
# )
|
|
|
|
def setUp(self):
|
|
super(BaseJobTestMixin, self).setUp()
|
|
self.start_redis()
|
|
self.setup_instances()
|
|
self.populate()
|
|
self.start_queue()
|
|
|
|
def tearDown(self):
|
|
super(BaseJobTestMixin, self).tearDown()
|
|
self.stop_redis()
|
|
self.terminate_queue()
|
|
|
|
class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
|
|
|
|
JOB_TEMPLATE_FIELDS = ('id', 'type', 'url', 'related', 'summary_fields',
|
|
'created', 'modified', 'name', 'description',
|
|
'job_type', 'inventory', 'project', 'playbook',
|
|
'credential', 'use_su_credential', 'sudo_su_flag',
|
|
'cloud_credential', 'force_handlers', 'forks',
|
|
'limit', 'verbosity', 'extra_vars',
|
|
'ask_variables_on_launch', 'job_tags', 'skip_tags',
|
|
'start_at_task', 'host_config_key', 'status',
|
|
'next_job_run', 'has_schedules', 'last_job_run',
|
|
'last_job_failed', 'survey_enabled')
|
|
|
|
def test_get_job_template_list(self):
|
|
url = reverse('api:job_template_list')
|
|
qs = JobTemplate.objects.distinct()
|
|
fields = self.JOB_TEMPLATE_FIELDS
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url)
|
|
|
|
# Sue's credentials (superuser) == 200, full list
|
|
self.check_get_list(url, self.user_sue, qs, fields)
|
|
|
|
# Alex's credentials (admin of all orgs) == 200, full list
|
|
self.check_get_list(url, self.user_alex, qs, fields)
|
|
|
|
# Bob's credentials (admin of eng, user of ops) == 200, all from
|
|
# engineering and operations.
|
|
bob_qs = qs.filter(
|
|
Q(project__organizations__admins__in=[self.user_bob]) |
|
|
Q(project__teams__users__in=[self.user_bob]),
|
|
)
|
|
#self.check_get_list(url, self.user_bob, bob_qs, fields)
|
|
|
|
# Chuck's credentials (admin of eng) == 200, all from engineering.
|
|
chuck_qs = qs.filter(
|
|
Q(project__organizations__admins__in=[self.user_chuck]) |
|
|
Q(project__teams__users__in=[self.user_chuck]),
|
|
)
|
|
#self.check_get_list(url, self.user_chuck, chuck_qs, fields)
|
|
|
|
# Doug's credentials (user of eng) == 200, none?.
|
|
doug_qs = qs.filter(
|
|
Q(project__organizations__admins__in=[self.user_doug]) |
|
|
Q(project__teams__users__in=[self.user_doug]),
|
|
)
|
|
#self.check_get_list(url, self.user_doug, doug_qs, fields)
|
|
|
|
# FIXME: Check with other credentials.
|
|
|
|
def test_credentials_list(self):
|
|
url = reverse('api:credential_list')
|
|
# Greg can't see the 'south' credential because the 'southerns' team is inactive
|
|
with self.current_user(self.user_greg):
|
|
all_credentials = self.get(url, expect=200)
|
|
self.assertFalse('south' in [x['username'] for x in all_credentials['results']])
|
|
|
|
url2 = reverse('api:team_detail', args=(self.team_ops_north.id,))
|
|
# Sue shouldn't be able to see the north credential once deleting its team
|
|
with self.current_user(self.user_sue):
|
|
self.delete(url2, expect=204)
|
|
all_credentials = self.get(url, expect=200)
|
|
self.assertFalse('north' in [x['username'] for x in all_credentials['results']])
|
|
|
|
def test_post_job_template_list(self):
|
|
url = reverse('api:job_template_list')
|
|
data = dict(
|
|
name = 'new job template',
|
|
job_type = PERM_INVENTORY_DEPLOY,
|
|
inventory = self.inv_eng.pk,
|
|
project = self.proj_dev.pk,
|
|
playbook = self.proj_dev.playbooks[0],
|
|
)
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url, data, methods=('post',))
|
|
|
|
# sue can always add job templates.
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, data, expect=201)
|
|
detail_url = reverse('api:job_template_detail',
|
|
args=(response['id'],))
|
|
self.assertEquals(response['url'], detail_url)
|
|
|
|
# Check that all fields provided were set.
|
|
jt = JobTemplate.objects.get(pk=response['id'])
|
|
self.assertEqual(jt.name, data['name'])
|
|
self.assertEqual(jt.job_type, data['job_type'])
|
|
self.assertEqual(jt.inventory.pk, data['inventory'])
|
|
self.assertEqual(jt.credential, None)
|
|
self.assertEqual(jt.project.pk, data['project'])
|
|
self.assertEqual(jt.playbook, data['playbook'])
|
|
|
|
# Test that all required fields are really required.
|
|
data['name'] = 'another new job template'
|
|
for field in ('name', 'job_type', 'inventory', 'project', 'playbook'):
|
|
with self.current_user(self.user_sue):
|
|
d = dict(data.items())
|
|
d.pop(field)
|
|
response = self.post(url, d, expect=400)
|
|
self.assertTrue(field in response,
|
|
'no error for field "%s" in response' % field)
|
|
|
|
# Test invalid value for job_type.
|
|
with self.current_user(self.user_sue):
|
|
d = dict(data.items())
|
|
d['job_type'] = 'world domination'
|
|
response = self.post(url, d, expect=400)
|
|
self.assertTrue('job_type' in response)
|
|
|
|
# Test playbook not in list of project playbooks.
|
|
with self.current_user(self.user_sue):
|
|
d = dict(data.items())
|
|
d['playbook'] = 'no_playbook_here.yml'
|
|
response = self.post(url, d, expect=400)
|
|
self.assertTrue('playbook' in response)
|
|
|
|
# Test unique constraint on names.
|
|
with self.current_user(self.user_sue):
|
|
d = dict(data.items())
|
|
d['name'] = 'new job template'
|
|
response = self.post(url, d, expect=400)
|
|
self.assertTrue('name' in response)
|
|
self.assertTrue('Job template with this Name already exists.' in response['name'])
|
|
self.assertTrue('__all__' not in response)
|
|
|
|
data = dict(
|
|
name = 'ops job template',
|
|
job_type = PERM_INVENTORY_DEPLOY,
|
|
inventory = self.inv_ops_west.pk,
|
|
project = self.proj_prod.pk,
|
|
playbook = self.proj_prod.playbooks[0],
|
|
)
|
|
|
|
# randall can't create a job template since his job is to check templates
|
|
# as per the ops testers team permission
|
|
with self.current_user(self.user_randall):
|
|
response = self.post(url, data, expect=403)
|
|
|
|
# greg can because he is the head of operations
|
|
with self.current_user(self.user_greg):
|
|
response = self.post(url, data, expect=201)
|
|
|
|
data = dict(
|
|
name = 'eng job template',
|
|
job_type = PERM_INVENTORY_DEPLOY,
|
|
inventory = self.inv_eng.pk,
|
|
project = self.proj_dev.pk,
|
|
playbook = self.proj_dev.playbooks[0],
|
|
)
|
|
|
|
# Neither Juan or Doug can create job templates as they have Permissions
|
|
# that only allow them to DEPLOY and CHECK respectively
|
|
with self.current_user(self.user_juan):
|
|
response = self.post(url, data, expect=403)
|
|
|
|
with self.current_user(self.user_doug):
|
|
response = self.post(url, data, expect=403)
|
|
|
|
# Hannibal, despite his questionable social habits has a user Permission
|
|
# that allows him to create playbooks
|
|
with self.current_user(self.user_hannibal):
|
|
response = self.post(url, data, expect=201)
|
|
|
|
# FIXME: Check other credentials and optional fields.
|
|
|
|
def test_get_job_template_detail(self):
|
|
jt = self.jt_eng_run
|
|
url = reverse('api:job_template_detail', args=(jt.pk,))
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url)
|
|
|
|
# sue can read the job template detail.
|
|
with self.current_user(self.user_sue):
|
|
self.options(url)
|
|
self.head(url)
|
|
response = self.get(url)
|
|
self.assertEqual(response['url'], url)
|
|
self.assertEqual(response['cloud_credential'], None)
|
|
|
|
# FIXME: Check other credentials and optional fields.
|
|
|
|
# TODO: add more tests that show
|
|
# the method used to START a JobTemplate follow the exact same permissions as those to create it ...
|
|
# and that jobs come back nicely serialized with related resources and so on ...
|
|
# that we can drill all the way down and can get at host failure lists, etc ...
|
|
|
|
def test_put_job_template_detail(self):
|
|
jt = self.jt_eng_run
|
|
url = reverse('api:job_template_detail', args=(jt.pk,))
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url, methods=('put',))# 'patch'))
|
|
|
|
# sue can update the job template detail.
|
|
with self.current_user(self.user_sue):
|
|
data = self.get(url)
|
|
data['name'] = '%s-updated' % data['name']
|
|
response = self.put(url, data)
|
|
#patch_data = dict(name='%s-changed' % data['name'])
|
|
#response = self.patch(url, patch_data)
|
|
|
|
# FIXME: Check other credentials and optional fields.
|
|
|
|
def test_get_job_template_job_list(self):
|
|
jt = self.jt_eng_run
|
|
url = reverse('api:job_template_jobs_list', args=(jt.pk,))
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url)
|
|
|
|
# sue can read the job template job list.
|
|
with self.current_user(self.user_sue):
|
|
self.options(url)
|
|
self.head(url)
|
|
response = self.get(url)
|
|
qs = jt.jobs.all()
|
|
self.check_pagination_and_size(response, qs.count())
|
|
self.check_list_ids(response, qs)
|
|
|
|
# FIXME: Check other credentials and optional fields.
|
|
|
|
def test_post_job_template_job_list(self):
|
|
jt = self.jt_eng_run
|
|
url = reverse('api:job_template_jobs_list', args=(jt.pk,))
|
|
data = dict(
|
|
credential=self.cred_bob.pk,
|
|
)
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url, data, methods=('post',))
|
|
|
|
# sue can create a new job from the template.
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, data, expect=201)
|
|
|
|
# FIXME: Check other credentials and optional fields.
|
|
|
|
def test_post_job_template_survey(self):
|
|
url = reverse('api:job_template_list')
|
|
data = dict(
|
|
name = 'launched job template',
|
|
job_type = PERM_INVENTORY_DEPLOY,
|
|
inventory = self.inv_eng.pk,
|
|
project = self.proj_dev.pk,
|
|
playbook = self.proj_dev.playbooks[0],
|
|
credential = self.cred_sue.pk,
|
|
survey_enabled = True,
|
|
)
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, data, expect=201)
|
|
new_jt_id = response['id']
|
|
detail_url = reverse('api:job_template_detail',
|
|
args=(new_jt_id,))
|
|
self.assertEquals(response['url'], detail_url)
|
|
url = reverse('api:job_template_survey_spec', args=(new_jt_id,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, json.loads(TEST_SIMPLE_REQUIRED_SURVEY), expect=200)
|
|
launch_url = reverse('api:job_template_launch', args=(new_jt_id,))
|
|
response = self.get(launch_url)
|
|
self.assertTrue('favorite_color' in response['variables_needed_to_start'])
|
|
response = self.post(launch_url, dict(favorite_color="green"), expect=202)
|
|
job = Job.objects.get(pk=response["job"])
|
|
job_extra = json.loads(job.extra_vars)
|
|
self.assertTrue("favorite_color" in job_extra)
|
|
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, json.loads(TEST_SIMPLE_NONREQUIRED_SURVEY), expect=200)
|
|
launch_url = reverse('api:job_template_launch', args=(new_jt_id,))
|
|
response = self.get(launch_url)
|
|
self.assertTrue(len(response['variables_needed_to_start']) == 0)
|
|
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, json.loads(TEST_SURVEY_REQUIREMENTS), expect=200)
|
|
launch_url = reverse('api:job_template_launch', args=(new_jt_id,))
|
|
# Just the required answer should work
|
|
self.post(launch_url, dict(reqd_answer="foo"), expect=202)
|
|
# Short answer but requires a long answer
|
|
self.post(launch_url, dict(long_answer='a', reqd_answer="foo"), expect=400)
|
|
# Long answer but requires a short answer
|
|
self.post(launch_url, dict(short_answer='thisissomelongtext', reqd_answer="foo"), expect=400)
|
|
# Long answer but missing required answer
|
|
self.post(launch_url, dict(long_answer='thisissomelongtext'), expect=400)
|
|
# Integer that's not big enough
|
|
self.post(launch_url, dict(int_answer=0, reqd_answer="foo"), expect=400)
|
|
# Integer that's too big
|
|
self.post(launch_url, dict(int_answer=10, reqd_answer="foo"), expect=400)
|
|
# Integer that's just riiiiight
|
|
self.post(launch_url, dict(int_answer=3, reqd_answer="foo"), expect=202)
|
|
# Integer bigger than min with no max defined
|
|
self.post(launch_url, dict(int_answer_no_max=3, reqd_answer="foo"), expect=202)
|
|
# Integer answer that's the wrong type
|
|
self.post(launch_url, dict(int_answer="test", reqd_answer="foo"), expect=400)
|
|
# Float that's too big
|
|
self.post(launch_url, dict(float_answer=10.5, reqd_answer="foo"), expect=400)
|
|
# Float that's too small
|
|
self.post(launch_url, dict(float_answer=1.995, reqd_answer="foo"), expect=400)
|
|
# float that's just riiiiight
|
|
self.post(launch_url, dict(float_answer=2.01, reqd_answer="foo"), expect=202)
|
|
# float answer that's the wrong type
|
|
self.post(launch_url, dict(float_answer="test", reqd_answer="foo"), expect=400)
|
|
# Wrong choice in single choice
|
|
self.post(launch_url, dict(reqd_answer="foo", single_choice="three"), expect=400)
|
|
# Wrong choice in multi choice
|
|
self.post(launch_url, dict(reqd_answer="foo", multi_choice=["four"]), expect=400)
|
|
# Wrong type for multi choicen
|
|
self.post(launch_url, dict(reqd_answer="foo", multi_choice="two"), expect=400)
|
|
# Right choice in single choice
|
|
self.post(launch_url, dict(reqd_answer="foo", single_choice="two"), expect=202)
|
|
# Right choices in multi choice
|
|
self.post(launch_url, dict(reqd_answer="foo", multi_choice=["one", "two"]), expect=202)
|
|
# Nested json
|
|
self.post(launch_url, dict(json_answer=dict(test="val", num=1), reqd_answer="foo"), expect=202)
|
|
|
|
def test_launch_job_template(self):
|
|
url = reverse('api:job_template_list')
|
|
data = dict(
|
|
name = 'launched job template',
|
|
job_type = PERM_INVENTORY_DEPLOY,
|
|
inventory = self.inv_eng.pk,
|
|
project = self.proj_dev.pk,
|
|
credential = self.cred_sue.pk,
|
|
playbook = self.proj_dev.playbooks[0],
|
|
)
|
|
data_no_cred = dict(
|
|
name = 'launched job template no credential',
|
|
job_type = PERM_INVENTORY_DEPLOY,
|
|
inventory = self.inv_eng.pk,
|
|
project = self.proj_dev.pk,
|
|
playbook = self.proj_dev.playbooks[0],
|
|
)
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, data, expect=201)
|
|
detail_url = reverse('api:job_template_detail',
|
|
args=(response['id'],))
|
|
self.assertEquals(response['url'], detail_url)
|
|
|
|
launch_url = reverse('api:job_template_launch',
|
|
args=(response['id'],))
|
|
|
|
# You can still post the job template without a credential, just can't launch it without one
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, data_no_cred, expect=201)
|
|
detail_url = reverse('api:job_template_detail',
|
|
args=(response['id'],))
|
|
self.assertEquals(response['url'], detail_url)
|
|
|
|
no_launch_url = reverse('api:job_template_launch',
|
|
args=(response['id'],))
|
|
# Invalid auth can't trigger the launch endpoint
|
|
self.check_invalid_auth(launch_url, {}, methods=('post',))
|
|
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(launch_url, {}, expect=202)
|
|
j = Job.objects.get(pk=response['job'])
|
|
self.assertTrue(j.status == 'new')
|
|
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(launch_url, {'credential': self.cred_doug.pk}, expect=202)
|
|
j = Job.objects.get(pk=response['job'])
|
|
self.assertTrue(j.status == 'new')
|
|
|
|
# Can't launch a job template without a credential defined
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(no_launch_url, {}, expect=400)
|
|
|
|
class JobTest(BaseJobTestMixin, django.test.TestCase):
|
|
|
|
def test_get_job_list(self):
|
|
url = reverse('api:job_list')
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url)
|
|
|
|
# sue's credentials (superuser) == 200, full list
|
|
with self.current_user(self.user_sue):
|
|
self.options(url)
|
|
self.head(url)
|
|
response = self.get(url)
|
|
qs = Job.objects.all()
|
|
self.check_pagination_and_size(response, qs.count())
|
|
self.check_list_ids(response, qs)
|
|
|
|
# FIXME: Check individual job result fields.
|
|
# FIXME: Check with other credentials.
|
|
|
|
def test_post_job_list(self):
|
|
url = reverse('api:job_list')
|
|
data = dict(
|
|
name='new job without template',
|
|
job_type=PERM_INVENTORY_DEPLOY,
|
|
inventory=self.inv_ops_east.pk,
|
|
project=self.proj_prod.pk,
|
|
playbook=self.proj_prod.playbooks[0],
|
|
credential=self.cred_ops_east.pk,
|
|
)
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url, data, methods=('post',))
|
|
|
|
# sue can create a new job without a template.
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, data, expect=201)
|
|
|
|
# alex can't create a job without a template, only super users can do that
|
|
with self.current_user(self.user_alex):
|
|
response = self.post(url, data, expect=403)
|
|
|
|
# sue can also create a job here from a template.
|
|
jt = self.jt_ops_east_run
|
|
data = dict(
|
|
name='new job from template',
|
|
job_template=jt.pk,
|
|
)
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, data, expect=201)
|
|
|
|
# sue can't create a job when it is hidden due to inactive team
|
|
|
|
# FIXME: Check with other credentials and optional fields.
|
|
|
|
def test_get_job_detail(self):
|
|
#job = self.job_ops_east_run
|
|
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
|
|
url = reverse('api:job_detail', args=(job.pk,))
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url)
|
|
|
|
# sue can read the job detail.
|
|
with self.current_user(self.user_sue):
|
|
self.options(url)
|
|
self.head(url)
|
|
response = self.get(url)
|
|
self.assertEqual(response['url'], url)
|
|
self.assertEqual(response['cloud_credential'], None)
|
|
|
|
# FIXME: Check with other credentials and optional fields.
|
|
|
|
def test_put_job_detail(self):
|
|
#job = self.job_ops_west_run
|
|
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'success')
|
|
url = reverse('api:job_detail', args=(job.pk,))
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url, methods=('put',))# 'patch'))
|
|
|
|
# sue can update the job detail only if the job is new.
|
|
job.status = 'new'
|
|
job.save()
|
|
self.assertEqual(job.status, 'new')
|
|
with self.current_user(self.user_sue):
|
|
data = self.get(url)
|
|
data['limit'] = '%s-updated' % data['limit']
|
|
response = self.put(url, data)
|
|
#patch_data = dict(limit='%s-changed' % data['limit'])
|
|
#response = self.patch(url, patch_data)
|
|
|
|
# sue cannot update the job detail if it is in any other state.
|
|
for status in ('pending', 'running', 'successful', 'failed', 'error',
|
|
'canceled'):
|
|
job.status = status
|
|
job.save()
|
|
with self.current_user(self.user_sue):
|
|
data = self.get(url)
|
|
data['limit'] = '%s-updated' % data['limit']
|
|
self.put(url, data, expect=405)
|
|
#patch_data = dict(limit='%s-changed' % data['limit'])
|
|
#self.patch(url, patch_data, expect=405)
|
|
|
|
# FIXME: Check with other credentials and readonly fields.
|
|
|
|
def _test_mainline(self):
|
|
url = reverse('api:job_list')
|
|
|
|
# job templates
|
|
data = self.get('/api/v1/job_templates/', expect=401)
|
|
data = self.get('/api/v1/job_templates/', expect=200, auth=self.get_normal_credentials())
|
|
self.assertTrue(data['count'], 2)
|
|
|
|
rec = dict(
|
|
name = 'job-foo',
|
|
credential = self.credential.pk,
|
|
inventory = self.inventory.pk,
|
|
project = self.project.pk,
|
|
job_type = PERM_INVENTORY_DEPLOY
|
|
)
|
|
|
|
# org admin can add job type
|
|
posted = self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_normal_credentials())
|
|
self.assertEquals(posted['url'], '/api/v1/job_templates/3/')
|
|
|
|
# other_django_user is on a team that can deploy, so can create both deploy and check type jobs
|
|
rec['name'] = 'job-foo2'
|
|
posted = self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_other_credentials())
|
|
rec['name'] = 'job-foo3'
|
|
rec['job_type'] = PERM_INVENTORY_CHECK
|
|
posted = self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_other_credentials())
|
|
|
|
# other2_django_user has individual permissions to run check mode, but not deploy
|
|
# nobody user can't even run check mode
|
|
rec['name'] = 'job-foo4'
|
|
self.post('/api/v1/job_templates/', rec, expect=403, auth=self.get_nobody_credentials())
|
|
rec['credential'] = self.credential2.pk
|
|
posted = self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_other2_credentials())
|
|
rec['name'] = 'job-foo5'
|
|
rec['job_type'] = PERM_INVENTORY_DEPLOY
|
|
self.post('/api/v1/job_templates/', rec, expect=403, auth=self.get_nobody_credentials())
|
|
self.post('/api/v1/job_templates/', rec, expect=201, auth=self.get_other2_credentials())
|
|
url = posted['url']
|
|
|
|
# verify we can also get the job template record
|
|
got = self.get(url, expect=200, auth=self.get_other2_credentials())
|
|
self.failUnlessEqual(got['url'], '/api/v1/job_templates/6/')
|
|
|
|
# TODO: add more tests that show
|
|
# the method used to START a JobTemplate follow the exact same permissions as those to create it ...
|
|
# and that jobs come back nicely serialized with related resources and so on ...
|
|
# that we can drill all the way down and can get at host failure lists, etc ...
|
|
|
|
|
|
@override_settings(CELERY_ALWAYS_EAGER=True,
|
|
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
|
|
ANSIBLE_TRANSPORT='local')
|
|
class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
|
|
'''Job API tests that need to use the celery task backend.'''
|
|
|
|
def setUp(self):
|
|
super(JobStartCancelTest, self).setUp()
|
|
settings.INTERNAL_API_URL = self.live_server_url
|
|
|
|
def tearDown(self):
|
|
super(JobStartCancelTest, self).tearDown()
|
|
|
|
def test_job_start(self):
|
|
#job = self.job_ops_east_run
|
|
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
|
|
url = reverse('api:job_start', args=(job.pk,))
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url)
|
|
self.check_invalid_auth(url, methods=('post',))
|
|
|
|
# Sue can start a job (when passwords are already saved) as long as the
|
|
# status is new. Reverse list so "new" will be last.
|
|
for status in reversed([x[0] for x in Job.STATUS_CHOICES]):
|
|
if status == 'waiting':
|
|
continue
|
|
job.status = status
|
|
job.save()
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
if status == 'new':
|
|
self.assertTrue(response['can_start'])
|
|
self.assertFalse(response['passwords_needed_to_start'])
|
|
# response = self.post(url, {}, expect=202)
|
|
# job = Job.objects.get(pk=job.pk)
|
|
# self.assertEqual(job.status, 'successful',
|
|
# job.result_stdout)
|
|
else:
|
|
self.assertFalse(response['can_start'])
|
|
response = self.post(url, {}, expect=405)
|
|
|
|
# Test with a job that prompts for SSH and sudo passwords.
|
|
#job = self.job_sup_run
|
|
job = self.make_job(self.jt_sup_run, self.user_sue, 'new')
|
|
url = reverse('api:job_start', args=(job.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
self.assertTrue(response['can_start'])
|
|
self.assertEqual(set(response['passwords_needed_to_start']),
|
|
set(['ssh_password', 'sudo_password']))
|
|
data = dict()
|
|
response = self.post(url, data, expect=400)
|
|
data['ssh_password'] = 'sshpass'
|
|
response = self.post(url, data, expect=400)
|
|
data2 = dict(sudo_password='sudopass')
|
|
response = self.post(url, data2, expect=400)
|
|
data.update(data2)
|
|
response = self.post(url, data, expect=202)
|
|
job = Job.objects.get(pk=job.pk)
|
|
# FIXME: Test run gets the following error in this case:
|
|
# fatal: [hostname] => sudo output closed while waiting for password prompt:
|
|
#self.assertEqual(job.status, 'successful')
|
|
|
|
# Test with a job that prompts for SSH unlock key, given the wrong key.
|
|
#job = self.jt_ops_west_run.create_job(
|
|
# credential=self.cred_greg,
|
|
# created_by=self.user_sue,
|
|
#)
|
|
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
|
|
job.credential = self.cred_greg
|
|
job.save()
|
|
|
|
url = reverse('api:job_start', args=(job.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
self.assertTrue(response['can_start'])
|
|
self.assertEqual(set(response['passwords_needed_to_start']),
|
|
set(['ssh_key_unlock']))
|
|
data = dict()
|
|
response = self.post(url, data, expect=400)
|
|
# The job should start but fail.
|
|
data['ssh_key_unlock'] = 'sshunlock'
|
|
response = self.post(url, data, expect=202)
|
|
# job = Job.objects.get(pk=job.pk)
|
|
# self.assertEqual(job.status, 'failed')
|
|
|
|
# Test with a job that prompts for SSH unlock key, given the right key.
|
|
from awx.main.tests.tasks import TEST_SSH_KEY_DATA_UNLOCK
|
|
# job = self.jt_ops_west_run.create_job(
|
|
# credential=self.cred_greg,
|
|
# created_by=self.user_sue,
|
|
# )
|
|
job = self.make_job(self.jt_ops_west_run, self.user_sue, 'new')
|
|
job.credential = self.cred_greg
|
|
job.save()
|
|
url = reverse('api:job_start', args=(job.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
self.assertTrue(response['can_start'])
|
|
self.assertEqual(set(response['passwords_needed_to_start']),
|
|
set(['ssh_key_unlock']))
|
|
data = dict()
|
|
response = self.post(url, data, expect=400)
|
|
data['ssh_key_unlock'] = TEST_SSH_KEY_DATA_UNLOCK
|
|
response = self.post(url, data, expect=202)
|
|
# job = Job.objects.get(pk=job.pk)
|
|
# self.assertEqual(job.status, 'successful')
|
|
|
|
# FIXME: Test with other users, test when passwords are required.
|
|
|
|
def test_job_relaunch(self):
|
|
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'success')
|
|
url = reverse('api:job_relaunch', args=(job.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.post(url, {}, expect=202)
|
|
j = Job.objects.get(pk=response['job'])
|
|
self.assertTrue(j.status == 'successful')
|
|
# Test with a job that prompts for SSH and sudo passwords.
|
|
job = self.make_job(self.jt_sup_run, self.user_sue, 'success')
|
|
url = reverse('api:job_start', args=(job.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
self.assertEqual(set(response['passwords_needed_to_start']),
|
|
set(['ssh_password', 'sudo_password']))
|
|
data = dict()
|
|
response = self.post(url, data, expect=400)
|
|
data['ssh_password'] = 'sshpass'
|
|
response = self.post(url, data, expect=400)
|
|
data2 = dict(sudo_password='sudopass')
|
|
response = self.post(url, data2, expect=400)
|
|
data.update(data2)
|
|
response = self.post(url, data, expect=202)
|
|
job = Job.objects.get(pk=job.pk)
|
|
|
|
def test_job_cancel(self):
|
|
#job = self.job_ops_east_run
|
|
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
|
|
url = reverse('api:job_cancel', args=(job.pk,))
|
|
|
|
# Test with no auth and with invalid login.
|
|
self.check_invalid_auth(url)
|
|
self.check_invalid_auth(url, methods=('post',))
|
|
|
|
# sue can cancel the job, but only when it is pending or running.
|
|
for status in [x[0] for x in Job.STATUS_CHOICES]:
|
|
if status == 'waiting':
|
|
continue
|
|
job.status = status
|
|
job.save()
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
if status in ('new', 'pending', 'waiting', 'running'):
|
|
self.assertTrue(response['can_cancel'])
|
|
response = self.post(url, {}, expect=202)
|
|
else:
|
|
self.assertFalse(response['can_cancel'])
|
|
response = self.post(url, {}, expect=405)
|
|
|
|
# FIXME: Test with other users.
|
|
|
|
def test_get_job_results(self):
|
|
# Start/run a job and then access its results via the API.
|
|
#job = self.job_ops_east_run
|
|
job = self.make_job(self.jt_ops_east_run, self.user_sue, 'new')
|
|
job.signal_start()
|
|
|
|
# Check that the job detail has been updated.
|
|
url = reverse('api:job_detail', args=(job.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
self.assertEqual(response['status'], 'successful',
|
|
response['result_traceback'])
|
|
self.assertTrue(response['result_stdout'])
|
|
|
|
# Test job events for completed job.
|
|
url = reverse('api:job_job_events_list', args=(job.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
qs = job.job_events.all()
|
|
self.assertTrue(qs.count())
|
|
self.check_pagination_and_size(response, qs.count())
|
|
self.check_list_ids(response, qs)
|
|
|
|
# Test individual job event detail records.
|
|
host_ids = set()
|
|
for job_event in job.job_events.all():
|
|
if job_event.host:
|
|
host_ids.add(job_event.host.pk)
|
|
url = reverse('api:job_event_detail', args=(job_event.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
|
|
# Also test job event list for each host.
|
|
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
|
|
for host in Host.objects.filter(pk__in=host_ids):
|
|
url = reverse('api:host_job_events_list', args=(host.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
qs = host.job_events.all()
|
|
self.assertTrue(qs.count())
|
|
self.check_pagination_and_size(response, qs.count())
|
|
self.check_list_ids(response, qs)
|
|
|
|
# Test job event list for groups.
|
|
for group in self.inv_ops_east.groups.all():
|
|
url = reverse('api:group_job_events_list', args=(group.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
qs = group.job_events.all()
|
|
self.assertTrue(qs.count(), group)
|
|
self.check_pagination_and_size(response, qs.count())
|
|
self.check_list_ids(response, qs)
|
|
|
|
# Test global job event list.
|
|
url = reverse('api:job_event_list')
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
qs = JobEvent.objects.all()
|
|
self.assertTrue(qs.count())
|
|
self.check_pagination_and_size(response, qs.count())
|
|
self.check_list_ids(response, qs)
|
|
|
|
# Test job host summaries for completed job.
|
|
url = reverse('api:job_job_host_summaries_list', args=(job.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
qs = job.job_host_summaries.all()
|
|
self.assertTrue(qs.count())
|
|
self.check_pagination_and_size(response, qs.count())
|
|
self.check_list_ids(response, qs)
|
|
# Every host referenced by a job_event should be present as a job
|
|
# host summary record.
|
|
self.assertEqual(host_ids,
|
|
set(qs.values_list('host__pk', flat=True)))
|
|
|
|
# Test individual job host summary records.
|
|
for job_host_summary in job.job_host_summaries.all():
|
|
url = reverse('api:job_host_summary_detail',
|
|
args=(job_host_summary.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
|
|
# Test job host summaries for each host.
|
|
for host in Host.objects.filter(pk__in=host_ids):
|
|
url = reverse('api:host_job_host_summaries_list', args=(host.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
qs = host.job_host_summaries.all()
|
|
self.assertTrue(qs.count())
|
|
self.check_pagination_and_size(response, qs.count())
|
|
self.check_list_ids(response, qs)
|
|
|
|
# Test job host summaries for groups.
|
|
for group in self.inv_ops_east.groups.all():
|
|
url = reverse('api:group_job_host_summaries_list', args=(group.pk,))
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url)
|
|
qs = group.job_host_summaries.all()
|
|
self.assertTrue(qs.count())
|
|
self.check_pagination_and_size(response, qs.count())
|
|
self.check_list_ids(response, qs)
|
|
|
|
@override_settings(CELERY_ALWAYS_EAGER=True,
|
|
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
|
|
ANSIBLE_TRANSPORT='local')
|
|
class JobTemplateCallbackTest(BaseJobTestMixin, django.test.LiveServerTestCase):
|
|
'''Job template callback tests for empheral hosts.'''
|
|
|
|
def setUp(self):
|
|
super(JobTemplateCallbackTest, self).setUp()
|
|
settings.INTERNAL_API_URL = self.live_server_url
|
|
# Monkeypatch socket module DNS lookup functions for testing.
|
|
self._original_gethostbyaddr = socket.gethostbyaddr
|
|
self._original_getaddrinfo = socket.getaddrinfo
|
|
socket.gethostbyaddr = self.gethostbyaddr
|
|
socket.getaddrinfo = self.getaddrinfo
|
|
|
|
def tearDown(self):
|
|
super(JobTemplateCallbackTest, self).tearDown()
|
|
socket.gethostbyaddr = self._original_gethostbyaddr
|
|
socket.getaddrinfo = self._original_getaddrinfo
|
|
|
|
def atoh(self, a):
|
|
'''Convert IP address to integer in host byte order.'''
|
|
return socket.ntohl(struct.unpack('I', socket.inet_aton(a))[0])
|
|
|
|
def htoa(self, n):
|
|
'''Convert integer in host byte order to IP address.'''
|
|
return socket.inet_ntoa(struct.pack('I', socket.htonl(n)))
|
|
|
|
def get_test_ips_for_host(self, host):
|
|
'''Return test IP address(es) for given test hostname.'''
|
|
ips = []
|
|
try:
|
|
h = Host.objects.exclude(name__endswith='-alias').get(name=host)
|
|
# Primary IP for host (both forward/reverse lookups work).
|
|
val = self.atoh('127.10.0.0') + h.pk
|
|
ips.append(self.htoa(val))
|
|
# Secondary IP for host (both forward/reverse lookups work).
|
|
if h.pk % 2 == 0:
|
|
val = self.atoh('127.20.0.0') + h.pk
|
|
ips.append(self.htoa(val))
|
|
# Additional IP for host (only forward lookups work).
|
|
if h.pk % 3 == 0:
|
|
val = self.atoh('127.30.0.0') + h.pk
|
|
ips.append(self.htoa(val))
|
|
# Additional IP for host (neither forward/reverse lookups work).
|
|
if h.pk % 3 == 1:
|
|
val = self.atoh('127.40.0.0') + h.pk
|
|
ips.append(self.htoa(val))
|
|
except Host.DoesNotExist:
|
|
pass
|
|
return ips
|
|
|
|
def get_test_host_for_ip(self, ip):
|
|
'''Return test hostname for given test IP address.'''
|
|
if not ip.startswith('127.10.') and not ip.startswith('127.20.'):
|
|
return None
|
|
val = self.atoh(ip)
|
|
try:
|
|
return Host.objects.get(pk=(val & 0x0ffff)).name
|
|
except Host.DoesNotExist:
|
|
return None
|
|
|
|
def test_dummy_host_ip_lookup(self):
|
|
all_ips = set()
|
|
for host in Host.objects.all():
|
|
ips = self.get_test_ips_for_host(host.name)
|
|
self.assertTrue(ips)
|
|
all_ips.update(ips)
|
|
ips = self.get_test_ips_for_host('invalid_host_name')
|
|
self.assertFalse(ips)
|
|
for ip in all_ips:
|
|
host = self.get_test_host_for_ip(ip)
|
|
if ip.startswith('127.30.') or ip.startswith('127.40.'):
|
|
continue
|
|
self.assertTrue(host)
|
|
ips = self.get_test_ips_for_host(host)
|
|
self.assertTrue(ip in ips)
|
|
host = self.get_test_host_for_ip('127.10.254.254')
|
|
self.assertFalse(host)
|
|
|
|
def gethostbyaddr(self, ip):
|
|
if not ip.startswith('127.'):
|
|
return self._original_gethostbyaddr(ip)
|
|
host = self.get_test_host_for_ip(ip)
|
|
if not host:
|
|
raise socket.herror('unknown test host')
|
|
raddr = '.'.join(list(reversed(ip.split('.'))) + ['in-addr', 'arpa'])
|
|
return (host, [raddr], [ip])
|
|
|
|
def getaddrinfo(self, host, port, family=0, socktype=0, proto=0, flags=0):
|
|
if family or socktype or proto or flags:
|
|
return self._original_getaddrinfo(host, port, family, socktype,
|
|
proto, flags)
|
|
port = port or 0
|
|
try:
|
|
socket.inet_aton(host)
|
|
addrs = [host]
|
|
except socket.error:
|
|
addrs = self.get_test_ips_for_host(host)
|
|
addrs = [x for x in addrs if not x.startswith('127.40.')]
|
|
if not addrs:
|
|
raise socket.gaierror('test host not found')
|
|
results = []
|
|
for addr in addrs:
|
|
results.append((socket.AF_INET, socket.SOCK_STREAM,
|
|
socket.IPPROTO_TCP, '', (addr, port)))
|
|
results.append((socket.AF_INET, socket.SOCK_DGRAM,
|
|
socket.IPPROTO_UDP, '', (addr, port)))
|
|
return results
|
|
|
|
def test_job_template_callback(self):
|
|
# Set ansible_ssh_host for certain hosts, update name to be an alias.
|
|
for host in Host.objects.all():
|
|
ips = self.get_test_ips_for_host(host.name)
|
|
for ip in ips:
|
|
if ip.startswith('127.40.'):
|
|
host.name = '%s-alias' % host.name
|
|
host_vars = host.variables_dict
|
|
host_vars['ansible_ssh_host'] = ip
|
|
host.variables = json.dumps(host_vars)
|
|
host.save()
|
|
|
|
# Find a valid job template to use to test the callback.
|
|
job_template = None
|
|
qs = JobTemplate.objects.filter(job_type='run',
|
|
credential__isnull=False)
|
|
qs = qs.exclude(host_config_key='')
|
|
for jt in qs:
|
|
if not jt.can_start_without_user_input():
|
|
continue
|
|
job_template = jt
|
|
break
|
|
self.assertTrue(job_template)
|
|
url = reverse('api:job_template_callback', args=(job_template.pk,))
|
|
data = dict(host_config_key=job_template.host_config_key)
|
|
|
|
# Test a POST to start a new job.
|
|
host_qs = job_template.inventory.hosts.order_by('pk')
|
|
host_qs = host_qs.exclude(variables__icontains='ansible_ssh_host')
|
|
host = host_qs[0]
|
|
host_ip = self.get_test_ips_for_host(host.name)[0]
|
|
jobs_qs = job_template.jobs.filter(launch_type='callback').order_by('-pk')
|
|
self.assertEqual(jobs_qs.count(), 0)
|
|
|
|
# Create the job itself.
|
|
result = self.post(url, data, expect=202, remote_addr=host_ip)
|
|
|
|
# Establish that we got back what we expect, and made the changes
|
|
# that we expect.
|
|
self.assertTrue('Location' in result.response, result.response)
|
|
self.assertEqual(jobs_qs.count(), 1)
|
|
job = jobs_qs[0]
|
|
self.assertEqual(urlparse.urlsplit(result.response['Location']).path,
|
|
job.get_absolute_url())
|
|
self.assertEqual(job.launch_type, 'callback')
|
|
self.assertEqual(job.limit, host.name)
|
|
self.assertEqual(job.hosts.count(), 1)
|
|
self.assertEqual(job.hosts.all()[0], host)
|
|
|
|
# GET as unauthenticated user will prompt for authentication.
|
|
self.get(url, expect=401, remote_addr=host_ip)
|
|
|
|
# Test GET (as super user) to validate host.
|
|
with self.current_user(self.user_sue):
|
|
response = self.get(url, expect=200, remote_addr=host_ip)
|
|
self.assertEqual(response['host_config_key'],
|
|
job_template.host_config_key)
|
|
self.assertEqual(response['matching_hosts'], [host.name])
|
|
|
|
# POST but leave out the host_config_key.
|
|
self.post(url, {}, expect=403, remote_addr=host_ip)
|
|
|
|
# Try with REMOTE_ADDR empty.
|
|
self.post(url, data, expect=400, remote_addr='')
|
|
|
|
# Try with REMOTE_ADDR set to an unknown address.
|
|
self.post(url, data, expect=400, remote_addr='127.127.0.1')
|
|
|
|
# Create a pending job that will block creation of another job
|
|
j = job_template.create_job(limit=job.limit, launch_type='callback')
|
|
j.status = 'pending'
|
|
j.save()
|
|
# This should fail since there is already a pending/waiting callback job on that job template involving that host
|
|
self.post(url, data, expect=400, remote_addr=host_ip)
|
|
j.delete() # Remove that so it's not hanging around
|
|
|
|
# Try using an alternate IP for the host (but one that also resolves
|
|
# via reverse lookup).
|
|
host = None
|
|
host_ip = None
|
|
host_qs = job_template.inventory.hosts.order_by('pk')
|
|
host_qs = host_qs.exclude(variables__icontains='ansible_ssh_host')
|
|
for h in host_qs:
|
|
ips = self.get_test_ips_for_host(h.name)
|
|
for ip in ips:
|
|
if ip.startswith('127.20.'):
|
|
host = h
|
|
host_ip = ip
|
|
break
|
|
if host_ip:
|
|
break
|
|
self.assertTrue(host)
|
|
self.assertEqual(jobs_qs.count(), 1)
|
|
self.post(url, data, expect=202, remote_addr=host_ip)
|
|
self.assertEqual(jobs_qs.count(), 2)
|
|
job = jobs_qs[0]
|
|
self.assertEqual(job.launch_type, 'callback')
|
|
self.assertEqual(job.limit, host.name)
|
|
self.assertEqual(job.hosts.count(), 1)
|
|
self.assertEqual(job.hosts.all()[0], host)
|
|
|
|
# Try using an IP for the host that doesn't resolve via reverse lookup,
|
|
# but can be found by doing a forward lookup on the host name.
|
|
host = None
|
|
host_ip = None
|
|
host_qs = job_template.inventory.hosts.order_by('pk')
|
|
host_qs = host_qs.exclude(variables__icontains='ansible_ssh_host')
|
|
for h in host_qs:
|
|
ips = self.get_test_ips_for_host(h.name)
|
|
for ip in ips:
|
|
if ip.startswith('127.30.'):
|
|
host = h
|
|
host_ip = ip
|
|
break
|
|
if host_ip:
|
|
break
|
|
self.assertTrue(host)
|
|
self.assertEqual(jobs_qs.count(), 2)
|
|
self.post(url, data, expect=202, remote_addr=host_ip)
|
|
self.assertEqual(jobs_qs.count(), 3)
|
|
job = jobs_qs[0]
|
|
self.assertEqual(job.launch_type, 'callback')
|
|
self.assertEqual(job.limit, host.name)
|
|
self.assertEqual(job.hosts.count(), 1)
|
|
self.assertEqual(job.hosts.all()[0], host)
|
|
|
|
# Try using address only specified via ansible_ssh_host.
|
|
host_qs = job_template.inventory.hosts.order_by('pk')
|
|
host_qs = host_qs.filter(variables__icontains='ansible_ssh_host')
|
|
host = host_qs[0]
|
|
host_ip = host.variables_dict['ansible_ssh_host']
|
|
self.assertEqual(jobs_qs.count(), 3)
|
|
self.post(url, data, expect=202, remote_addr=host_ip)
|
|
self.assertEqual(jobs_qs.count(), 4)
|
|
job = jobs_qs[0]
|
|
self.assertEqual(job.launch_type, 'callback')
|
|
self.assertEqual(job.limit, host.name)
|
|
self.assertEqual(job.hosts.count(), 1)
|
|
self.assertEqual(job.hosts.all()[0], host)
|
|
|
|
# Set a limit on the job template to verify the callback job limit is
|
|
# set to the intersection of this limit and the host name.
|
|
job_template.limit = 'bakers:slicers:packagers'
|
|
job_template.save(update_fields=['limit'])
|
|
|
|
# Try when hostname is also an IP address, even if a different one is
|
|
# specified via ansible_ssh_host.
|
|
host_qs = job_template.inventory.hosts.order_by('pk')
|
|
host_qs = host_qs.exclude(variables__icontains='ansible_ssh_host')
|
|
host = None
|
|
host_ip = None
|
|
for h in host_qs:
|
|
ips = self.get_test_ips_for_host(h.name)
|
|
if len(ips) > 1:
|
|
host = h
|
|
host.name = list(ips)[0]
|
|
host_vars = host.variables_dict
|
|
host_vars['ansible_ssh_host'] = list(ips)[1]
|
|
host.variables = json.dumps(host_vars)
|
|
host.save()
|
|
host_ip = list(ips)[0]
|
|
break
|
|
self.assertTrue(host)
|
|
self.assertEqual(jobs_qs.count(), 4)
|
|
self.post(url, data, expect=202, remote_addr=host_ip)
|
|
self.assertEqual(jobs_qs.count(), 5)
|
|
job = jobs_qs[0]
|
|
self.assertEqual(job.launch_type, 'callback')
|
|
self.assertEqual(job.limit, ':&'.join([job_template.limit, host.name]))
|
|
self.assertEqual(job.hosts.count(), 1)
|
|
self.assertEqual(job.hosts.all()[0], host)
|
|
|
|
# Find a new job template to use.
|
|
job_template = None
|
|
qs = JobTemplate.objects.filter(job_type='check',
|
|
credential__isnull=False)
|
|
qs = qs.exclude(host_config_key='')
|
|
for jt in qs:
|
|
if not jt.can_start_without_user_input():
|
|
continue
|
|
job_template = jt
|
|
break
|
|
self.assertTrue(job_template)
|
|
url = reverse('api:job_template_callback', args=(job_template.pk,))
|
|
data = dict(host_config_key=job_template.host_config_key)
|
|
|
|
# Should get an error when multiple hosts match to the same IP.
|
|
host_qs = job_template.inventory.hosts.order_by('pk')
|
|
host_qs = host_qs.exclude(name__endswith='-alias')
|
|
for host in host_qs:
|
|
host_vars = host.variables_dict
|
|
host_vars['ansible_ssh_host'] = '127.50.0.1'
|
|
host.variables = json.dumps(host_vars)
|
|
host.save()
|
|
host = host_qs[0]
|
|
host_ip = host.variables_dict['ansible_ssh_host']
|
|
self.post(url, data, expect=400, remote_addr=host_ip)
|
|
|
|
# Find a job template to run that doesn't have a credential.
|
|
job_template = None
|
|
qs = JobTemplate.objects.filter(job_type='run',
|
|
credential__isnull=True)
|
|
qs = qs.exclude(host_config_key='')
|
|
for jt in qs:
|
|
job_template = jt
|
|
break
|
|
self.assertTrue(job_template)
|
|
url = reverse('api:job_template_callback', args=(job_template.pk,))
|
|
data = dict(host_config_key=job_template.host_config_key)
|
|
|
|
# Test POST to start a new job when the template has no credential.
|
|
host_qs = job_template.inventory.hosts.order_by('pk')
|
|
host_qs = host_qs.exclude(variables__icontains='ansible_ssh_host')
|
|
host = host_qs[0]
|
|
host_ip = self.get_test_ips_for_host(host.name)[0]
|
|
self.post(url, data, expect=400, remote_addr=host_ip)
|
|
|
|
# Find a job template to run that has a credential but would require
|
|
# user input.
|
|
job_template = None
|
|
qs = JobTemplate.objects.filter(job_type='run',
|
|
credential__isnull=False)
|
|
qs = qs.exclude(host_config_key='')
|
|
for jt in qs:
|
|
if jt.can_start_without_user_input():
|
|
continue
|
|
job_template = jt
|
|
break
|
|
self.assertTrue(job_template)
|
|
url = reverse('api:job_template_callback', args=(job_template.pk,))
|
|
data = dict(host_config_key=job_template.host_config_key)
|
|
|
|
# Test POST to start a new job when the credential would require user
|
|
# input.
|
|
host_qs = job_template.inventory.hosts.order_by('pk')
|
|
host_qs = host_qs.exclude(variables__icontains='ansible_ssh_host')
|
|
host = host_qs[0]
|
|
host_ip = self.get_test_ips_for_host(host.name)[0]
|
|
self.post(url, data, expect=400, remote_addr=host_ip)
|
|
|
|
|
|
@override_settings(CELERY_ALWAYS_EAGER=True,
|
|
CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
|
|
ANSIBLE_TRANSPORT='local')
|
|
class JobTransactionTest(BaseJobTestMixin, django.test.LiveServerTestCase):
|
|
'''Job test of transaction locking using the celery task backend.'''
|
|
|
|
def setUp(self):
|
|
super(JobTransactionTest, self).setUp()
|
|
settings.INTERNAL_API_URL = self.live_server_url
|
|
|
|
def tearDown(self):
|
|
super(JobTransactionTest, self).tearDown()
|
|
|
|
def _job_detail_polling_thread(self, url, auth, errors):
|
|
time.sleep(1)
|
|
while True:
|
|
time.sleep(0.1)
|
|
try:
|
|
response = requests.get(url, auth=auth)
|
|
response.raise_for_status()
|
|
data = json.loads(response.content)
|
|
if data.get('status', '') not in ('new', 'pending', 'running'):
|
|
break
|
|
except Exception, e:
|
|
errors.append(e)
|
|
break
|
|
|
|
@contextlib.contextmanager
|
|
def poll_job_detail(self, url, auth, errors):
|
|
try:
|
|
t = threading.Thread(target=self._job_detail_polling_thread,
|
|
args=(url, auth, errors))
|
|
t.start()
|
|
yield
|
|
finally:
|
|
t.join(20)
|
|
|
|
def test_for_job_deadlocks(self):
|
|
if 'postgresql' not in settings.DATABASES['default']['ENGINE']:
|
|
self.skipTest('Not using PostgreSQL')
|
|
# Create lots of extra test hosts to trigger job event callbacks
|
|
#job = self.job_eng_run
|
|
job = self.make_job(self.jt_eng_run, self.user_sue, 'new')
|
|
inv = job.inventory
|
|
for x in xrange(50):
|
|
h = inv.hosts.create(name='local-%d' % x)
|
|
for g in inv.groups.all():
|
|
g.hosts.add(h)
|
|
|
|
job_detail_url = reverse('api:job_detail', args=(job.pk,))
|
|
job_detail_url = urlparse.urljoin(self.live_server_url, job_detail_url)
|
|
auth = ('sue', self._user_passwords['sue'])
|
|
errors = []
|
|
with self.poll_job_detail(job_detail_url, auth, errors):
|
|
with self.current_user(self.user_sue):
|
|
url = reverse('api:job_start', args=(job.pk,))
|
|
response = self.get(url)
|
|
self.assertTrue(response['can_start'])
|
|
self.assertFalse(response['passwords_needed_to_start'])
|
|
response = self.post(url, {}, expect=202)
|
|
job = Job.objects.get(pk=job.pk)
|
|
self.assertEqual(job.status, 'successful', job.result_stdout)
|
|
self.assertFalse(errors)
|
|
|