Fixing up more pep8 issues

This commit is contained in:
Matthew Jones 2015-02-04 16:32:04 -05:00
parent fb9e231189
commit 9e97783b13
16 changed files with 178 additions and 186 deletions

View File

@ -348,7 +348,7 @@ class HostAccess(BaseAccess):
return obj and self.user.can_access(Inventory, 'read', obj.inventory)
def can_add(self, data):
if not data or not 'inventory' in data:
if not data or 'inventory' not in data:
return False
# Checks for admin or change permission on inventory.
@ -419,7 +419,7 @@ class GroupAccess(BaseAccess):
return obj and self.user.can_access(Inventory, 'read', obj.inventory)
def can_add(self, data):
if not data or not 'inventory' in data:
if not data or 'inventory' not in data:
return False
# Checks for admin or change permission on inventory.
inventory_pk = get_pk_from_dict(data, 'inventory')

View File

@ -38,8 +38,7 @@ from django.utils.timezone import now
# AWX
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models import * # Job, JobEvent, ProjectUpdate, InventoryUpdate,
# Schedule, UnifiedJobTemplate, Instance
from awx.main.models import *
from awx.main.queue import FifoQueue
from awx.main.utils import (get_ansible_version, decrypt_field, update_scm_url,
ignore_inventory_computed_fields, emit_websocket_notification,

View File

@ -13,4 +13,4 @@ from awx.main.tests.jobs import *
from awx.main.tests.activity_stream import *
from awx.main.tests.schedules import *
from awx.main.tests.redact import *
from awx.main.tests.views import *
from awx.main.tests.views import *

View File

@ -34,13 +34,13 @@ class ActivityStreamTest(BaseTest):
self.org_created = self.post(reverse('api:organization_list'), dict(name='test org', description='test descr'), expect=201, auth=self.get_super_credentials())
def test_get_activity_stream_list(self):
url = self.collection()
url = self.collection()
with self.current_user(self.super_django_user):
self.options(url, expect=200)
self.head(url, expect=200)
response = self.get(url, expect=200)
self.check_pagination_and_size(response, 1, previous=None, next=None)
with self.current_user(self.super_django_user):
self.options(url, expect=200)
self.head(url, expect=200)
response = self.get(url, expect=200)
self.check_pagination_and_size(response, 1, previous=None, next=None)
def test_basic_fields(self):
item_id = ActivityStream.objects.order_by('pk')[0].pk

View File

@ -166,12 +166,11 @@ class BaseTestMixin(QueueTestMixin):
def create_test_license_file(self, instance_count=10000):
writer = LicenseWriter(
company_name='AWX',
contact_name='AWX Admin',
contact_email='awx@example.com',
license_date=int(time.time() + 3600),
instance_count=instance_count,
)
company_name='AWX',
contact_name='AWX Admin',
contact_email='awx@example.com',
license_date=int(time.time() + 3600),
instance_count=instance_count)
handle, license_path = tempfile.mkstemp(suffix='.json')
os.close(handle)
writer.write_file(license_path)
@ -416,7 +415,7 @@ class BaseTestMixin(QueueTestMixin):
self.assertFalse(response.content)
#if return_response_object:
# return response
if response.status_code not in [ 204, 405 ] and method_name != 'head' and response.content:
if response.status_code not in [204, 405] and method_name != 'head' and response.content:
# no JSON responses in these at least for now, 409 should probably return some (FIXME)
if response['Content-Type'].startswith('application/json'):
obj = json.loads(response.content)
@ -540,7 +539,7 @@ class BaseTestMixin(QueueTestMixin):
total = qs.count()
if limit is not None:
if limit > 0:
qs = qs[offset:offset+limit]
qs = qs[offset:offset + limit]
else:
qs = qs.none()
self.check_pagination_and_size(response, total, offset > 0,
@ -633,6 +632,7 @@ class URI(object):
def __string__(self):
return self.get_uri()
def __repr__(self):
return self.get_uri()

View File

@ -579,7 +579,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
for host in new_inv.hosts.all():
if host.name == 'web1.example.com':
self.assertEqual(host.variables_dict,
{'ansible_ssh_host': 'w1.example.net'})
{'ansible_ssh_host': 'w1.example.net'})
elif host.name in ('db1.example.com', 'db2.example.com') and source and os.path.isdir(source):
self.assertEqual(host.variables_dict, {'test_host_name': host.name})
elif host.name in ('web3.example.com', 'fe80::1610:9fff:fedd:b654'):
@ -670,7 +670,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
for host in new_inv.hosts.filter(active=True):
if host.name == 'web1.example.com':
self.assertEqual(host.variables_dict,
{'ansible_ssh_host': 'w1.example.net'})
{'ansible_ssh_host': 'w1.example.net'})
elif host.name in ('web3.example.com', 'fe80::1610:9fff:fedd:b654'):
self.assertEqual(host.variables_dict, {'ansible_ssh_port': 1022})
elif host.name == '10.12.14.16':
@ -899,7 +899,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
self.assertEqual(new_inv.hosts.count(), 0)
self.assertEqual(new_inv.groups.count(), 0)
inv_file = os.path.join(os.path.dirname(__file__), 'data',
'splunk_inventory.py')
'splunk_inventory.py')
result, stdout, stderr = self.run_command('inventory_import',
inventory_id=new_inv.pk,
source=inv_file, verbosity=0)

View File

@ -58,10 +58,9 @@ class InventoryTest(BaseTest):
# create a permission here on the 'other' user so they have edit access on the org
# we may add another permission type later.
self.perm_read = Permission.objects.create(
inventory = self.inventory_b,
user = self.other_django_user,
permission_type = 'read'
)
inventory = self.inventory_b,
user = self.other_django_user,
permission_type = 'read')
def tearDown(self):
super(InventoryTest, self).tearDown()
@ -344,10 +343,9 @@ class InventoryTest(BaseTest):
# a normal user with inventory edit permissions (on any inventory) can create hosts
edit_perm = Permission.objects.create(
user = self.other_django_user,
inventory = Inventory.objects.get(pk=inv.pk),
permission_type = PERM_INVENTORY_WRITE
)
user = self.other_django_user,
inventory = Inventory.objects.get(pk=inv.pk),
permission_type = PERM_INVENTORY_WRITE)
host_data3 = self.post(hosts, data=new_host_c, expect=201, auth=self.get_other_credentials())
# Port should be split out into host variables, other variables kept intact.
@ -425,7 +423,7 @@ class InventoryTest(BaseTest):
undel_group = inv.groups.create(name='nondel')
del_children_url = reverse('api:group_children_list', args=(del_group.pk,))
nondel_url = reverse('api:group_detail',
args=(Group.objects.get(name='nondel').pk,))
args=(Group.objects.get(name='nondel').pk,))
del_group.mark_inactive()
nondel_detail = self.get(nondel_url, expect=200, auth=self.get_normal_credentials())
self.post(del_children_url, data=nondel_detail, expect=403, auth=self.get_normal_credentials())
@ -681,7 +679,7 @@ class InventoryTest(BaseTest):
# a super user can set subgroups
subgroups_url = reverse('api:group_children_list',
args=(Group.objects.get(name='web2').pk,))
args=(Group.objects.get(name='web2').pk,))
child_url = reverse('api:group_detail',
args=(Group.objects.get(name='web4').pk,))
subgroups_url2 = reverse('api:group_children_list',
@ -1143,15 +1141,15 @@ class InventoryUpdatesTest(BaseTransactionTest):
#print inventory_update.result_stdout
if should_error:
self.assertEqual(inventory_update.status, 'error',
inventory_update.result_stdout + \
inventory_update.result_stdout +
inventory_update.result_traceback)
elif should_fail:
self.assertEqual(inventory_update.status, 'failed',
inventory_update.result_stdout + \
inventory_update.result_stdout +
inventory_update.result_traceback)
elif should_fail is False:
self.assertEqual(inventory_update.status, 'successful',
inventory_update.result_stdout + \
inventory_update.result_stdout +
inventory_update.result_traceback)
else:
pass # If should_fail is None, we don't care.
@ -1510,8 +1508,8 @@ class InventoryUpdatesTest(BaseTransactionTest):
cache_path = tempfile.mkdtemp(prefix='awx_ec2_')
self._temp_paths.append(cache_path)
inventory_source = self.update_inventory_source(self.group,
source='ec2', credential=credential, source_regions=source_regions,
source_vars='---\n\nnested_groups: false\ncache_path: %s\n' % cache_path)
source='ec2', credential=credential, source_regions=source_regions,
source_vars='---\n\nnested_groups: false\ncache_path: %s\n' % cache_path)
# Check first without instance_id set (to import by name only).
with self.settings(EC2_INSTANCE_ID_VAR=''):
self.check_inventory_source(inventory_source)
@ -1593,8 +1591,8 @@ class InventoryUpdatesTest(BaseTransactionTest):
cache_path_pattern = os.path.join(tempfile.gettempdir(), 'awx_ec2_*')
old_cache_paths = set(glob.glob(cache_path_pattern))
inventory_source = self.update_inventory_source(self.group,
source='ec2', credential=credential, source_regions=source_regions,
source_vars='---') # nested_groups is true by default.
source='ec2', credential=credential, source_regions=source_regions,
source_vars='---') # nested_groups is true by default.
self.check_inventory_source(inventory_source)
# Verify that main group is in top level groups (hasn't been added as
# its own child).
@ -1673,12 +1671,13 @@ class InventoryUpdatesTest(BaseTransactionTest):
return
# Print out group/host tree for debugging.
print
def draw_tree(g, d=0):
print (' ' * d) + '+ ' + g.name
for h in g.hosts.order_by('name'):
print (' ' * d) + ' - ' + h.name
for c in g.children.order_by('name'):
draw_tree(c, d+1)
draw_tree(c, d + 1)
for g in self.inventory.root_groups.order_by('name'):
draw_tree(g)
@ -1699,7 +1698,7 @@ class InventoryUpdatesTest(BaseTransactionTest):
group.save()
self.group = group
inventory_source = self.update_inventory_source(self.group,
source='rax', credential=credential, source_regions=source_regions)
source='rax', credential=credential, source_regions=source_regions)
# Check first without instance_id set (to import by name only).
with self.settings(RAX_INSTANCE_ID_VAR=''):
self.check_inventory_source(inventory_source)
@ -1728,7 +1727,7 @@ class InventoryUpdatesTest(BaseTransactionTest):
# If test source regions is given, test again with empty string.
if source_regions:
inventory_source2 = self.update_inventory_source(self.group2,
source='rax', credential=credential, source_regions='')
source='rax', credential=credential, source_regions='')
self.check_inventory_source(inventory_source2)
# Verify that main group is in top level groups (hasn't been added as
# its own child).
@ -1747,7 +1746,7 @@ class InventoryUpdatesTest(BaseTransactionTest):
password=source_password,
host=source_host)
inventory_source = self.update_inventory_source(self.group,
source='vmware', credential=credential)
source='vmware', credential=credential)
# Check first without instance_id set (to import by name only).
with self.settings(VMWARE_INSTANCE_ID_VAR=''):
self.check_inventory_source(inventory_source)

View File

@ -55,15 +55,15 @@ TEST_SIMPLE_REQUIRED_SURVEY = '''
"description": "Description",
"spec": [
{
"type": "text",
"question_name": "favorite color",
"question_description": "What is your favorite color?",
"variable": "favorite_color",
"choices": "",
"min": "",
"max": "",
"required": true,
"default": "blue"
"type": "text",
"question_name": "favorite color",
"question_description": "What is your favorite color?",
"variable": "favorite_color",
"choices": "",
"min": "",
"max": "",
"required": true,
"default": "blue"
}
]
}
@ -75,15 +75,15 @@ TEST_SIMPLE_NONREQUIRED_SURVEY = '''
"description": "Description",
"spec": [
{
"type": "text",
"question_name": "unladen swallow",
"question_description": "What is the airspeed velocity of an unladen swallow?",
"variable": "unladen_swallow",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "european"
"type": "text",
"question_name": "unladen swallow",
"question_description": "What is the airspeed velocity of an unladen swallow?",
"variable": "unladen_swallow",
"choices": "",
"min": "",
"max": "",
"required": false,
"default": "european"
}
]
}
@ -95,59 +95,59 @@ TEST_SURVEY_REQUIREMENTS = '''
"description": "Description",
"spec": [
{
"type": "text",
"question_name": "cantbeshort",
"question_description": "What is a long answer",
"variable": "long_answer",
"choices": "",
"min": 5,
"max": "",
"required": false,
"default": "yes"
"type": "text",
"question_name": "cantbeshort",
"question_description": "What is a long answer",
"variable": "long_answer",
"choices": "",
"min": 5,
"max": "",
"required": false,
"default": "yes"
},
{
"type": "text",
"question_name": "cantbelong",
"question_description": "What is a short answer",
"variable": "short_answer",
"choices": "",
"min": "",
"max": 5,
"required": false,
"default": "yes"
"type": "text",
"question_name": "cantbelong",
"question_description": "What is a short answer",
"variable": "short_answer",
"choices": "",
"min": "",
"max": 5,
"required": false,
"default": "yes"
},
{
"type": "text",
"question_name": "reqd",
"question_description": "I should be required",
"variable": "reqd_answer",
"choices": "",
"min": "",
"max": "",
"required": true,
"default": "yes"
"type": "text",
"question_name": "reqd",
"question_description": "I should be required",
"variable": "reqd_answer",
"choices": "",
"min": "",
"max": "",
"required": true,
"default": "yes"
},
{
"type": "multiplechoice",
"question_name": "achoice",
"question_description": "Need one of these",
"variable": "single_choice",
"choices": ["one", "two"],
"min": "",
"max": "",
"required": false,
"default": "yes"
"type": "multiplechoice",
"question_name": "achoice",
"question_description": "Need one of these",
"variable": "single_choice",
"choices": ["one", "two"],
"min": "",
"max": "",
"required": false,
"default": "yes"
},
{
"type": "multiselect",
"question_name": "mchoice",
"question_description": "Can have multiples of these",
"variable": "multi_choice",
"choices": ["one", "two", "three"],
"min": "",
"max": "",
"required": false,
"default": "yes"
"type": "multiselect",
"question_name": "mchoice",
"question_description": "Can have multiples of these",
"variable": "multi_choice",
"choices": ["one", "two", "three"],
"min": "",
"max": "",
"required": false,
"default": "yes"
},
{
"type": "integer",
@ -398,17 +398,15 @@ class BaseJobTestMixin(BaseTestMixin):
# Operations is divided into teams to work on the east/west servers.
# Greg and Holly work on east, Greg and iris work on west.
self.team_ops_east = self.org_ops.teams.create(
name='easterners',
created_by=self.user_sue,
)
name='easterners',
created_by=self.user_sue)
self.team_ops_east.projects.add(self.proj_prod)
self.team_ops_east.projects.add(self.proj_prod_east)
self.team_ops_east.users.add(self.user_greg)
self.team_ops_east.users.add(self.user_holly)
self.team_ops_west = self.org_ops.teams.create(
name='westerners',
created_by=self.user_sue,
)
name='westerners',
created_by=self.user_sue)
self.team_ops_west.projects.add(self.proj_prod)
self.team_ops_west.projects.add(self.proj_prod_west)
self.team_ops_west.users.add(self.user_greg)
@ -463,7 +461,7 @@ class BaseJobTestMixin(BaseTestMixin):
self.cred_doug = self.user_doug.credentials.create(
username='doug',
password='doug doesn\'t mind his password being saved. this '
'is why we dont\'t let doug actually run jobs.',
'is why we dont\'t let doug actually run jobs.',
created_by=self.user_sue,
)
self.cred_eve = self.user_eve.credentials.create(

View File

@ -43,12 +43,11 @@ class LicenseTests(BaseTest):
def test_license_writer(self):
writer = TaskEngager(
company_name='acmecorp',
contact_name='Michael DeHaan',
contact_email='michael@ansibleworks.com',
license_date=25000, # seconds since epoch
instance_count=500
)
company_name='acmecorp',
contact_name='Michael DeHaan',
contact_email='michael@ansibleworks.com',
license_date=25000, # seconds since epoch
instance_count=500)
data = writer.get_data()
@ -69,12 +68,12 @@ class LicenseTests(BaseTest):
assert vdata['available_instances'] == 500
assert vdata['current_instances'] == 12
assert vdata['free_instances'] == 488
assert vdata['date_warning'] == True
assert vdata['date_expired'] == True
assert vdata['date_warning'] is True
assert vdata['date_expired'] is True
assert vdata['license_date'] == 25000
assert vdata['time_remaining'] < 0
assert vdata['valid_key'] == True
assert vdata['compliant'] == False
assert vdata['valid_key'] is True
assert vdata['compliant'] is False
def test_expired_licenses(self):
reader = TaskSerializer()
@ -88,7 +87,7 @@ class LicenseTests(BaseTest):
strdata = writer.get_string()
vdata = reader.from_string(strdata)
assert vdata['compliant'] == False
assert vdata['compliant'] is False
assert vdata['grace_period_remaining'] < 0
writer = TaskEngager(
@ -101,7 +100,7 @@ class LicenseTests(BaseTest):
strdata = writer.get_string()
vdata = reader.from_string(strdata)
assert vdata['compliant'] == False
assert vdata['compliant'] is False
assert vdata['grace_period_remaining'] < 0
writer = TaskEngager(
@ -114,5 +113,5 @@ class LicenseTests(BaseTest):
strdata = writer.get_string()
vdata = reader.from_string(strdata)
assert vdata['compliant'] == False
assert vdata['compliant'] is False
assert vdata['grace_period_remaining'] > 0

View File

@ -109,7 +109,7 @@ class OrganizationsTest(BaseTest):
# make sure super user can fetch records
data = self.get(urls[0], expect=200, auth=self.get_super_credentials())
[self.assertTrue(key in data) for key in ['name', 'description', 'url' ]]
[self.assertTrue(key in data) for key in ['name', 'description', 'url']]
# make sure invalid user cannot
data = self.get(urls[0], expect=401, auth=self.get_invalid_credentials())

View File

@ -420,7 +420,7 @@ class ProjectsTest(BaseTransactionTest):
for x in all_users['results']:
self.post(team_users, data=x, expect=403, auth=self.get_nobody_credentials())
self.post(team_users, data=dict(x, is_superuser=False),
expect=204, auth=self.get_normal_credentials())
expect=204, auth=self.get_normal_credentials())
# The normal admin user can't create a super user vicariously through the team/project
self.post(team_users, data=dict(username='attempted_superuser_create', is_superuser=True),
expect=403, auth=self.get_normal_credentials())
@ -932,6 +932,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
# FIXME: Add some invalid URLs.
]
def is_exception(e):
return bool(isinstance(e, Exception) or
(isinstance(e, type) and issubclass(e, Exception)))
@ -951,15 +952,15 @@ class ProjectUpdatesTest(BaseTransactionTest):
url, username='testuser')
else:
updated_url = update_scm_url(scm_type, url,
username='testuser')
username='testuser')
self.assertEqual(new_url_u, updated_url)
if is_exception(new_url_up):
self.assertRaises(new_url_up, update_scm_url, scm_type,
url, username='testuser', password='testpass')
else:
updated_url = update_scm_url(scm_type, url,
username='testuser',
password='testpass')
username='testuser',
password='testpass')
self.assertEqual(new_url_up, updated_url)
def is_public_key_in_authorized_keys(self):
@ -1175,7 +1176,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
project.scm_password = 'not a\\ valid\' "password'
project.save()
if project.scm_type == 'svn':
self.check_project_update(project, should_fail=True)#should_still_fail)
self.check_project_update(project, should_fail=True) # should_still_fail)
else:
self.check_project_update(project, should_fail=should_still_fail)
# Test that we can delete project updates.
@ -1300,7 +1301,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_url = getattr(settings, 'TEST_GIT_PRIVATE_SSH', '')
scm_key_data = getattr(settings, 'TEST_GIT_KEY_DATA', '')
scm_username = getattr(settings, 'TEST_GIT_USERNAME', '')
scm_password = 'blahblahblah'#getattr(settings, 'TEST_GIT_PASSWORD', '')
scm_password = 'blahblahblah' # getattr(settings, 'TEST_GIT_PASSWORD', '')
if not all([scm_url, scm_key_data, scm_username, scm_password]):
self.skipTest('no private git repo defined for ssh!')
project = self.create_project(
@ -1320,8 +1321,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_password=scm_password,
)
should_error = bool('github.com' in scm_url and scm_username != 'git')
self.check_project_update(project2, should_fail=None)#,
#should_error=should_error)
self.check_project_update(project2, should_fail=None) # , should_error=should_error)
def test_scm_key_unlock_on_project_update(self):
scm_url = 'git@github.com:ansible/ansible.github.com.git'
@ -1611,40 +1611,39 @@ class ProjectUpdatesTest(BaseTransactionTest):
# TODO: We need to test this another way due to concurrency conflicts
# Will add some tests for the task runner system
def _test_update_on_launch(self):
scm_url = getattr(settings, 'TEST_GIT_PUBLIC_HTTPS',
'https://github.com/ansible/ansible.github.com.git')
if not all([scm_url]):
self.skipTest('no public git repo defined for https!')
self.organization = self.make_organizations(self.super_django_user, 1)[0]
self.inventory = Inventory.objects.create(name='test-inventory',
description='description for test-inventory',
organization=self.organization)
self.host = self.inventory.hosts.create(name='host.example.com',
inventory=self.inventory)
self.group = self.inventory.groups.create(name='test-group',
inventory=self.inventory)
self.group.hosts.add(self.host)
self.credential = Credential.objects.create(name='test-creds',
user=self.super_django_user)
self.project = self.create_project(
name='my public git project over https',
scm_type='git',
scm_url=scm_url,
scm_update_on_launch=True,
)
# First update triggered by saving a new project with SCM.
self.assertEqual(self.project.project_updates.count(), 1)
self.check_project_update(self.project)
self.assertEqual(self.project.project_updates.count(), 2)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.signal_start())
time.sleep(10) # Need some time to wait for the dependency to run
job = Job.objects.get(pk=job.pk)
self.assertTrue(job.status in ('successful', 'failed'))
self.assertEqual(self.project.project_updates.count(), 3)
scm_url = getattr(settings, 'TEST_GIT_PUBLIC_HTTPS',
'https://github.com/ansible/ansible.github.com.git')
if not all([scm_url]):
self.skipTest('no public git repo defined for https!')
self.organization = self.make_organizations(self.super_django_user, 1)[0]
self.inventory = Inventory.objects.create(name='test-inventory',
description='description for test-inventory',
organization=self.organization)
self.host = self.inventory.hosts.create(name='host.example.com',
inventory=self.inventory)
self.group = self.inventory.groups.create(name='test-group',
inventory=self.inventory)
self.group.hosts.add(self.host)
self.credential = Credential.objects.create(name='test-creds',
user=self.super_django_user)
self.project = self.create_project(
name='my public git project over https',
scm_type='git',
scm_url=scm_url,
scm_update_on_launch=True)
# First update triggered by saving a new project with SCM.
self.assertEqual(self.project.project_updates.count(), 1)
self.check_project_update(self.project)
self.assertEqual(self.project.project_updates.count(), 2)
job_template = self.create_test_job_template()
job = self.create_test_job(job_template=job_template)
self.assertEqual(job.status, 'new')
self.assertFalse(job.passwords_needed_to_start)
self.assertTrue(job.signal_start())
time.sleep(10) # Need some time to wait for the dependency to run
job = Job.objects.get(pk=job.pk)
self.assertTrue(job.status in ('successful', 'failed'))
self.assertEqual(self.project.project_updates.count(), 3)
def _test_update_on_launch_with_project_passwords(self):
scm_url = getattr(settings, 'TEST_GIT_PRIVATE_HTTPS', '')

View File

@ -36,12 +36,11 @@ GOOD_SCHEDULES = ["DTSTART:20500331T055000Z RRULE:FREQ=MINUTELY;INTERVAL=10;COUN
"DTSTART:20140331T075000Z RRULE:FREQ=YEARLY;INTERVAL=1;BYSETPOS=-1;BYMONTH=8;BYDAY=SU",
"DTSTART:20140331T075000Z RRULE:FREQ=WEEKLY;INTERVAL=1;UNTIL=20230401T075000Z;BYDAY=MO,WE,FR",
"DTSTART:20140331T075000Z RRULE:FREQ=HOURLY;INTERVAL=1;UNTIL=20230610T075000Z",
"DTSTART:20140411T040000Z RRULE:FREQ=WEEKLY;INTERVAL=1;UNTIL=20140411T040000Z;BYDAY=WE",
]
"DTSTART:20140411T040000Z RRULE:FREQ=WEEKLY;INTERVAL=1;UNTIL=20140411T040000Z;BYDAY=WE"]
BAD_SCHEDULES = ["", "DTSTART:20140331T055000 RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"DTSTART:20240331T075000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=10000000",
"DTSTART:20240331T075000Z RRULE:FREQ=DAILY;INTERVAL=1;COUNT=10000000",
"DTSTART;TZID=US-Eastern:19961105T090000 RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5",
"DTSTART:20140331T055000Z RRULE:FREQ=SECONDLY;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=SECONDLY",
@ -51,8 +50,7 @@ BAD_SCHEDULES = ["", "DTSTART:20140331T055000 RRULE:FREQ=MINUTELY;INTERVAL=10;CO
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYYEARDAY=120;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=YEARLY;BYWEEKNO=10;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1 DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1",
"DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1 RRULE:FREQ=HOURLY;INTERVAL=1",
]
"DTSTART:20140331T055000Z RRULE:FREQ=HOURLY;INTERVAL=1 RRULE:FREQ=HOURLY;INTERVAL=1"]
class ScheduleTest(BaseTest):
def setUp(self):

View File

@ -52,7 +52,7 @@ class BaseScriptTest(BaseLiveServerTest):
pargs = [name]
for k,v in options.items():
pargs.append('%s%s' % ('-' if len(k) == 1 else '--', k))
if not v is True:
if v is not True:
pargs.append(str(v))
for arg in args:
pargs.append(str(arg))
@ -176,7 +176,7 @@ class InventoryScriptTest(BaseScriptTest):
hostnames = hosts.values_list('name', flat=True)
self.assertEqual(set(v['hosts']), set(hostnames))
else:
self.assertTrue(v['hosts'] == [ 'localhost' ])
self.assertTrue(v['hosts'] == ['localhost'])
for group in inventory.groups.filter(active=False):
self.assertFalse(group.name in data.keys(),

View File

@ -471,8 +471,8 @@ class RunJobTest(BaseCeleryTest):
self.assertTrue(job.result_stdout)
else:
self.assertTrue(job.result_stdout in ('', 'stdout capture is missing'),
u'expected no stdout, got:\n%s' %
job.result_stdout)
u'expected no stdout, got:\n%s' %
job.result_stdout)
if expect_traceback:
self.assertTrue(job.result_traceback)
else:
@ -580,7 +580,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(evt.host, self.host)
self.assertTrue(evt.play, evt)
self.assertTrue(evt.task, evt)
self.assertEqual(evt.failed, False)#should_be_failed)
self.assertEqual(evt.failed, False) # should_be_failed)
if getattr(settings, 'CAPTURE_JOB_EVENT_HOSTS', False):
self.assertEqual(set(evt.hosts.values_list('pk', flat=True)),
host_pks)

View File

@ -59,7 +59,7 @@ class AuthTokenProxyTest(BaseTest):
def _request_auth_token(self, remote_addr):
auth_token_url = reverse('api:auth_token_view')
client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr }
client_kwargs = {'HTTP_X_FORWARDED_FOR': remote_addr}
# Request a new auth token from the remote address specified via 'HTTP_X_FORWARDED_FOR'
data = dict(zip(('username', 'password'), self.get_super_credentials()))
@ -79,7 +79,7 @@ class AuthTokenProxyTest(BaseTest):
auth_token = self._request_auth_token(remote_addr)
# Verify we can access our own user information, from the remote address specified via HTTP_X_FORWARDED_FOR
client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr }
client_kwargs = {'HTTP_X_FORWARDED_FOR': remote_addr}
response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
self.check_me_is_admin(response)
@ -94,7 +94,7 @@ class AuthTokenProxyTest(BaseTest):
auth_token = self._request_auth_token(remote_addr)
# Verify we can access our own user information, from the remote address specified via HTTP_X_FORWARDED_FOR
client_kwargs = { 'HTTP_X_FORWARDED_FOR': remote_addr_diff }
client_kwargs = {'HTTP_X_FORWARDED_FOR': remote_addr_diff}
response = self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr_diff)
@ -104,7 +104,7 @@ class AuthTokenProxyTest(BaseTest):
auth_token = self._request_auth_token(remote_addr)
client_kwargs = { 'HTTP_X_FORARDED_FOR': '' }
client_kwargs = {'HTTP_X_FORARDED_FOR': ''}
response = self._get_me(expect=200, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
self.check_me_is_admin(response)
@ -810,14 +810,14 @@ class UsersTest(BaseTest):
# Check list view with page size of 1, remaining pages.
qs = base_qs.order_by('username')
for n in xrange(1, base_qs.count()):
url = '%s?order_by=username&page_size=1&page=%d' % (base_url, n+1)
url = '%s?order_by=username&page_size=1&page=%d' % (base_url, n + 1)
self.check_get_list(url, self.super_django_user, qs,
check_order=True, offset=n, limit=1)
# Check list view with page size of 2.
qs = base_qs.order_by('username')
for n in xrange(0, base_qs.count(), 2):
url = '%s?order_by=username&page_size=2&page=%d' % (base_url, (n/2)+1)
url = '%s?order_by=username&page_size=2&page=%d' % (base_url, (n / 2) + 1)
self.check_get_list(url, self.super_django_user, qs,
check_order=True, offset=n, limit=2)

View File

@ -14,4 +14,4 @@
# W391 - Blank line at end of file
# W293 - Blank line contains whitespace
ignore=E201,E203,E221,E225,E231,E241,E251,E261,E265,E302,E303,E501,W291,W391,W293
exclude=awx/lib/site-packages,awx/ui,awx/api/urls.py,awx/main/migrations
exclude=awx/lib/site-packages,awx/ui,awx/api/urls.py,awx/main/migrations,awx/main/tests/data