mirror of
https://github.com/ansible/awx.git
synced 2026-01-11 18:09:57 -03:30
Fix flake8 E302 errors second round.
This commit is contained in:
parent
9e4655419e
commit
2d1a5425d1
@ -480,12 +480,12 @@ class SubListCreateAttachDetachAPIView(SubListCreateAPIView):
|
||||
return self.attach(request, *args, **kwargs)
|
||||
|
||||
|
||||
'''
|
||||
Models for which you want the last instance to be deleted from the database
|
||||
when the last disassociate is called should inherit from this class. Further,
|
||||
the model should implement is_detached()
|
||||
'''
|
||||
class DeleteLastUnattachLabelMixin(object):
|
||||
'''
|
||||
Models for which you want the last instance to be deleted from the database
|
||||
when the last disassociate is called should inherit from this class. Further,
|
||||
the model should implement is_detached()
|
||||
'''
|
||||
def unattach(self, request, *args, **kwargs):
|
||||
(sub_id, res) = super(DeleteLastUnattachLabelMixin, self).unattach_validate(request)
|
||||
if res:
|
||||
|
||||
@ -1295,6 +1295,7 @@ class GroupVariableDataSerializer(BaseVariableDataSerializer):
|
||||
class Meta:
|
||||
model = Group
|
||||
|
||||
|
||||
class CustomInventoryScriptSerializer(BaseSerializer):
|
||||
|
||||
script = serializers.CharField(trim_whitespace=False)
|
||||
|
||||
@ -9,6 +9,7 @@ from rest_framework.response import Response
|
||||
from rest_framework.settings import api_settings
|
||||
from rest_framework import status
|
||||
|
||||
|
||||
def paginated(method):
|
||||
"""Given an method with a Django REST Framework API method signature
|
||||
(e.g. `def get(self, request, ...):`), abstract out boilerplate pagination
|
||||
|
||||
@ -87,6 +87,7 @@ def api_exception_handler(exc, context):
|
||||
exc = ParseError(exc.args[0])
|
||||
return exception_handler(exc, context)
|
||||
|
||||
|
||||
class ApiRootView(APIView):
|
||||
|
||||
authentication_classes = []
|
||||
@ -1816,19 +1817,19 @@ class GroupList(ListCreateAPIView):
|
||||
capabilities_prefetch = ['inventory.admin', 'inventory.adhoc', 'inventory.update']
|
||||
|
||||
|
||||
'''
|
||||
Useful when you have a self-refering ManyToManyRelationship.
|
||||
* Tower uses a shallow (2-deep only) url pattern. For example:
|
||||
|
||||
When an object hangs off of a parent object you would have the url of the
|
||||
form /api/v1/parent_model/34/child_model. If you then wanted a child of the
|
||||
child model you would NOT do /api/v1/parent_model/34/child_model/87/child_child_model
|
||||
Instead, you would access the child_child_model via /api/v1/child_child_model/87/
|
||||
and you would create child_child_model's off of /api/v1/child_model/87/child_child_model_set
|
||||
Now, when creating child_child_model related to child_model you still want to
|
||||
link child_child_model to parent_model. That's what this class is for
|
||||
'''
|
||||
class EnforceParentRelationshipMixin(object):
|
||||
'''
|
||||
Useful when you have a self-refering ManyToManyRelationship.
|
||||
* Tower uses a shallow (2-deep only) url pattern. For example:
|
||||
|
||||
When an object hangs off of a parent object you would have the url of the
|
||||
form /api/v1/parent_model/34/child_model. If you then wanted a child of the
|
||||
child model you would NOT do /api/v1/parent_model/34/child_model/87/child_child_model
|
||||
Instead, you would access the child_child_model via /api/v1/child_child_model/87/
|
||||
and you would create child_child_model's off of /api/v1/child_model/87/child_child_model_set
|
||||
Now, when creating child_child_model related to child_model you still want to
|
||||
link child_child_model to parent_model. That's what this class is for
|
||||
'''
|
||||
enforce_parent_relationship = ''
|
||||
|
||||
def update_raw_data(self, data):
|
||||
@ -2849,12 +2850,15 @@ class WorkflowJobTemplateNodeChildrenBaseList(EnforceParentRelationshipMixin, Su
|
||||
class WorkflowJobTemplateNodeSuccessNodesList(WorkflowJobTemplateNodeChildrenBaseList):
|
||||
relationship = 'success_nodes'
|
||||
|
||||
|
||||
class WorkflowJobTemplateNodeFailureNodesList(WorkflowJobTemplateNodeChildrenBaseList):
|
||||
relationship = 'failure_nodes'
|
||||
|
||||
|
||||
class WorkflowJobTemplateNodeAlwaysNodesList(WorkflowJobTemplateNodeChildrenBaseList):
|
||||
relationship = 'always_nodes'
|
||||
|
||||
|
||||
class WorkflowJobNodeChildrenBaseList(SubListAPIView):
|
||||
|
||||
model = WorkflowJobNode
|
||||
@ -2876,9 +2880,11 @@ class WorkflowJobNodeChildrenBaseList(SubListAPIView):
|
||||
class WorkflowJobNodeSuccessNodesList(WorkflowJobNodeChildrenBaseList):
|
||||
relationship = 'success_nodes'
|
||||
|
||||
|
||||
class WorkflowJobNodeFailureNodesList(WorkflowJobNodeChildrenBaseList):
|
||||
relationship = 'failure_nodes'
|
||||
|
||||
|
||||
class WorkflowJobNodeAlwaysNodesList(WorkflowJobNodeChildrenBaseList):
|
||||
relationship = 'always_nodes'
|
||||
|
||||
@ -3190,6 +3196,7 @@ class JobLabelList(SubListAPIView):
|
||||
class WorkflowJobLabelList(JobLabelList):
|
||||
parent_model = WorkflowJob
|
||||
|
||||
|
||||
class JobActivityStreamList(SubListAPIView):
|
||||
|
||||
model = ActivityStream
|
||||
|
||||
@ -327,6 +327,7 @@ class ActivityStreamEnabled(threading.local):
|
||||
|
||||
activity_stream_enabled = ActivityStreamEnabled()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def disable_activity_stream():
|
||||
'''
|
||||
|
||||
@ -140,14 +140,17 @@ def test_get_inventory_ad_hoc_command_list(admin, alice, post_adhoc, get, invent
|
||||
def test_bad_data1(admin, post_adhoc):
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'module_name': 'command', 'module_args': None}, admin, expect=400)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_bad_data2(admin, post_adhoc):
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'job_type': 'baddata'}, admin, expect=400)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_bad_data3(admin, post_adhoc):
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'verbosity': -1}, admin, expect=400)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_bad_data4(admin, post_adhoc):
|
||||
post_adhoc(reverse('api:ad_hoc_command_list'), {'forks': -1}, admin, expect=400)
|
||||
|
||||
@ -12,11 +12,11 @@ from django.core.urlresolvers import reverse
|
||||
from awx.conf.models import Setting
|
||||
|
||||
|
||||
'''
|
||||
Ensures that tests don't pick up dev container license file
|
||||
'''
|
||||
@pytest.fixture
|
||||
def mock_no_license_file(mocker):
|
||||
'''
|
||||
Ensures that tests don't pick up dev container license file
|
||||
'''
|
||||
os.environ['AWX_LICENSE_FILE'] = '/does_not_exist'
|
||||
return None
|
||||
|
||||
|
||||
@ -39,11 +39,11 @@ def test_cleanup_granularity(fact_scans, hosts):
|
||||
assert 60 == deleted_count
|
||||
|
||||
|
||||
'''
|
||||
Delete half of the scans
|
||||
'''
|
||||
@pytest.mark.django_db
|
||||
def test_cleanup_older_than(fact_scans, hosts):
|
||||
'''
|
||||
Delete half of the scans
|
||||
'''
|
||||
epoch = timezone.now()
|
||||
hosts(5)
|
||||
fact_scans(28, timestamp_epoch=epoch)
|
||||
@ -70,11 +70,11 @@ def test_cleanup_older_than_granularity_module(fact_scans, hosts):
|
||||
assert 20 == deleted_count
|
||||
|
||||
|
||||
'''
|
||||
Reduce the granularity of half of the facts scans, by half.
|
||||
'''
|
||||
@pytest.mark.django_db
|
||||
def test_cleanup_logic(fact_scans, hosts):
|
||||
'''
|
||||
Reduce the granularity of half of the facts scans, by half.
|
||||
'''
|
||||
epoch = timezone.now()
|
||||
hosts = hosts(5)
|
||||
fact_scans(60, timestamp_epoch=epoch)
|
||||
|
||||
@ -62,15 +62,15 @@ def test_process_fact_message_services(fact_msg_services):
|
||||
check_process_fact_message_module(fact_returned, fact_msg_services, 'services')
|
||||
|
||||
|
||||
'''
|
||||
We pickypack our fact sending onto the Ansible fact interface.
|
||||
The interface is <hostname, facts>. Where facts is a json blob of all the facts.
|
||||
This makes it hard to decipher what facts are new/changed.
|
||||
Because of this, we handle the same fact module data being sent multiple times
|
||||
and just keep the newest version.
|
||||
'''
|
||||
@pytest.mark.django_db
|
||||
def test_process_facts_message_ansible_overwrite(fact_scans, fact_msg_ansible):
|
||||
'''
|
||||
We pickypack our fact sending onto the Ansible fact interface.
|
||||
The interface is <hostname, facts>. Where facts is a json blob of all the facts.
|
||||
This makes it hard to decipher what facts are new/changed.
|
||||
Because of this, we handle the same fact module data being sent multiple times
|
||||
and just keep the newest version.
|
||||
'''
|
||||
#epoch = timezone.now()
|
||||
epoch = datetime.fromtimestamp(fact_msg_ansible['date_key'])
|
||||
fact_scans(fact_scans=1, timestamp_epoch=epoch)
|
||||
|
||||
@ -44,23 +44,23 @@ from awx.main.models.notifications import (
|
||||
)
|
||||
|
||||
|
||||
'''
|
||||
Disable all django model signals.
|
||||
'''
|
||||
@pytest.fixture(scope="session", autouse=False)
|
||||
def disable_signals():
|
||||
'''
|
||||
Disable all django model signals.
|
||||
'''
|
||||
mocked = mock.patch('django.dispatch.Signal.send', autospec=True)
|
||||
mocked.start()
|
||||
|
||||
|
||||
'''
|
||||
FIXME: Not sure how "far" just setting the BROKER_URL will get us.
|
||||
We may need to incluence CELERY's configuration like we do in the old unit tests (see base.py)
|
||||
|
||||
Allows django signal code to execute without the need for redis
|
||||
'''
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def celery_memory_broker():
|
||||
'''
|
||||
FIXME: Not sure how "far" just setting the BROKER_URL will get us.
|
||||
We may need to incluence CELERY's configuration like we do in the old unit tests (see base.py)
|
||||
|
||||
Allows django signal code to execute without the need for redis
|
||||
'''
|
||||
settings.BROKER_URL='memory://localhost/'
|
||||
|
||||
|
||||
|
||||
@ -22,18 +22,18 @@ def test_newest_scan_exact(hosts, fact_scans):
|
||||
assert fact_found == fact_known
|
||||
|
||||
|
||||
'''
|
||||
Show me the most recent state of the sytem at any point of time.
|
||||
or, said differently
|
||||
For any timestamp, get the first scan that is <= the timestamp.
|
||||
'''
|
||||
|
||||
'''
|
||||
Ensure most recent scan run is the scan returned.
|
||||
Query by future date.
|
||||
'''
|
||||
@pytest.mark.django_db
|
||||
def test_newest_scan_less_than(hosts, fact_scans):
|
||||
'''
|
||||
Show me the most recent state of the sytem at any point of time.
|
||||
or, said differently
|
||||
For any timestamp, get the first scan that is <= the timestamp.
|
||||
'''
|
||||
|
||||
'''
|
||||
Ensure most recent scan run is the scan returned.
|
||||
Query by future date.
|
||||
'''
|
||||
epoch = timezone.now()
|
||||
timestamp_future = epoch + timedelta(days=10)
|
||||
hosts = hosts(host_count=2)
|
||||
@ -51,11 +51,11 @@ def test_newest_scan_less_than(hosts, fact_scans):
|
||||
assert fact_found == fact_known
|
||||
|
||||
|
||||
'''
|
||||
Tests query Fact that is in the middle of the fact scan timeline, but not an exact timestamp.
|
||||
'''
|
||||
@pytest.mark.django_db
|
||||
def test_query_middle_of_timeline(hosts, fact_scans):
|
||||
'''
|
||||
Tests query Fact that is in the middle of the fact scan timeline, but not an exact timestamp.
|
||||
'''
|
||||
epoch = timezone.now()
|
||||
timestamp_middle = epoch + timedelta(days=1, hours=3)
|
||||
hosts = hosts(host_count=2)
|
||||
@ -73,11 +73,11 @@ def test_query_middle_of_timeline(hosts, fact_scans):
|
||||
assert fact_found == fact_known
|
||||
|
||||
|
||||
'''
|
||||
Query time less than any fact scan. Should return None
|
||||
'''
|
||||
@pytest.mark.django_db
|
||||
def test_query_result_empty(hosts, fact_scans):
|
||||
'''
|
||||
Query time less than any fact scan. Should return None
|
||||
'''
|
||||
epoch = timezone.now()
|
||||
timestamp_less = epoch - timedelta(days=1)
|
||||
hosts = hosts(host_count=2)
|
||||
@ -88,11 +88,11 @@ def test_query_result_empty(hosts, fact_scans):
|
||||
assert fact_found is None
|
||||
|
||||
|
||||
'''
|
||||
Query by fact module other than 'ansible'
|
||||
'''
|
||||
@pytest.mark.django_db
|
||||
def test_by_module(hosts, fact_scans):
|
||||
'''
|
||||
Query by fact module other than 'ansible'
|
||||
'''
|
||||
epoch = timezone.now()
|
||||
hosts = hosts(host_count=2)
|
||||
facts = fact_scans(fact_scans=3, timestamp_epoch=epoch)
|
||||
|
||||
@ -108,7 +108,6 @@ def test_cant_delete_role(delete, admin):
|
||||
assert response.status_code == 405
|
||||
|
||||
|
||||
|
||||
#
|
||||
# /user/<id>/roles
|
||||
#
|
||||
@ -260,6 +259,7 @@ def test_get_role(get, admin, role):
|
||||
assert response.status_code == 200
|
||||
assert response.data['id'] == role.id
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_put_role_405(put, admin, role):
|
||||
url = reverse('api:role_detail', args=(role.id,))
|
||||
@ -434,7 +434,6 @@ def test_role_children(get, team, admin, role):
|
||||
assert response.data['results'][0]['id'] == role.id or response.data['results'][1]['id'] == role.id
|
||||
|
||||
|
||||
|
||||
#
|
||||
# Generics
|
||||
#
|
||||
@ -458,6 +457,7 @@ def test_ensure_rbac_fields_are_present(organization, get, admin):
|
||||
role = org_role_response.data
|
||||
assert role['related']['organization'] == url
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_ensure_role_summary_is_present(organization, get, user):
|
||||
url = reverse('api:organization_detail', args=(organization.id,))
|
||||
|
||||
@ -14,6 +14,7 @@ from awx.main.access import (
|
||||
)
|
||||
from django.apps import apps
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
def test_custom_inv_script_access(organization, user):
|
||||
u = user('user', False)
|
||||
|
||||
@ -13,6 +13,7 @@ from django.core.management import call_command
|
||||
from awx.main.models import * # noqa
|
||||
from awx.main.tests.base import BaseTestMixin
|
||||
|
||||
|
||||
class BaseCommandMixin(BaseTestMixin):
|
||||
def create_test_inventories(self):
|
||||
self.setup_users()
|
||||
|
||||
@ -76,10 +76,10 @@ class AuthTokenLimitTest(BaseTest):
|
||||
self.assertEqual(AuthToken.reason_long('limit_reached'), response['detail'])
|
||||
|
||||
|
||||
'''
|
||||
Ensure ips from the X-Forwarded-For get honored and used in auth tokens
|
||||
'''
|
||||
class AuthTokenProxyTest(BaseTest):
|
||||
'''
|
||||
Ensure ips from the X-Forwarded-For get honored and used in auth tokens
|
||||
'''
|
||||
def check_token_and_expires_exist(self, response):
|
||||
self.assertTrue('token' in response)
|
||||
self.assertTrue('expires' in response)
|
||||
|
||||
@ -17,6 +17,7 @@ from awx.main.models import (
|
||||
def mock_JT_resource_data():
|
||||
return ({}, [])
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def job_template(mocker):
|
||||
mock_jt = mocker.MagicMock(pk=5)
|
||||
|
||||
@ -93,6 +93,7 @@ def workflow_job_unit():
|
||||
def workflow_job_template_unit():
|
||||
return WorkflowJobTemplate(name='workflow')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def jt_ask(job_template_factory):
|
||||
# note: factory sets ask_xxxx_on_launch to true for inventory & credential
|
||||
@ -116,6 +117,7 @@ example_prompts = dict(job_type='check', job_tags='quack', limit='duck', skip_ta
|
||||
def job_node_no_prompts(workflow_job_unit, jt_ask):
|
||||
return WorkflowJobNode(workflow_job=workflow_job_unit, unified_job_template=jt_ask)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def job_node_with_prompts(job_node_no_prompts):
|
||||
job_node_no_prompts.char_prompts = example_prompts
|
||||
|
||||
@ -152,6 +152,7 @@ def inventory_update_latest_factory(epoch):
|
||||
def inventory_update_latest(inventory_update_latest_factory):
|
||||
return inventory_update_latest_factory()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def successful_inventory_update_latest(inventory_update_latest_factory):
|
||||
iu = inventory_update_latest_factory()
|
||||
@ -203,11 +204,11 @@ def successful_inventory_update(epoch, inventory_update_factory):
|
||||
return inventory_update
|
||||
|
||||
|
||||
'''
|
||||
Job
|
||||
'''
|
||||
@pytest.fixture
|
||||
def job_factory(epoch):
|
||||
'''
|
||||
Job
|
||||
'''
|
||||
def fn(id=1, project__scm_update_on_launch=True, inventory__inventory_sources=[], allow_simultaneous=False):
|
||||
return JobDict({
|
||||
'id': id,
|
||||
@ -240,11 +241,11 @@ def running_job(job_factory):
|
||||
return job
|
||||
|
||||
|
||||
'''
|
||||
Inventory id -> [InventorySourceDict, ...]
|
||||
'''
|
||||
@pytest.fixture
|
||||
def inventory_source_factory():
|
||||
'''
|
||||
Inventory id -> [InventorySourceDict, ...]
|
||||
'''
|
||||
def fn(id=1):
|
||||
return InventorySourceDict({
|
||||
'id': id,
|
||||
|
||||
@ -186,6 +186,7 @@ def workflow_dag_always(factory_node):
|
||||
def workflow_dag(request):
|
||||
return request.getfuncargvalue(request.param)
|
||||
|
||||
|
||||
class TestWorkflowDAG():
|
||||
def test_bfs_nodes_to_run(self, workflow_dag):
|
||||
dag, expected, is_done = workflow_dag
|
||||
|
||||
@ -498,6 +498,7 @@ def cache_list_capabilities(page, prefetch_list, model, user):
|
||||
if obj.pk in ids_with_role:
|
||||
obj.capabilities_cache[display_method] = True
|
||||
|
||||
|
||||
def parse_yaml_or_json(vars_str):
|
||||
'''
|
||||
Attempt to parse a string with variables, and if attempt fails,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user