Fixing flake8 errors, this should be almost all of them!

This commit is contained in:
Matthew Jones 2015-02-11 16:42:55 -05:00
parent 59d824a643
commit 6a18a50d99
38 changed files with 252 additions and 407 deletions

View File

@ -1,12 +1,12 @@
# Copyright (c) 2014 AnsibleWorks, Inc.
# All Rights Reserved.
__version__ = '2.2.0'
import os
import sys
import warnings
__version__ = '2.2.0'
__all__ = ['__version__']
# Check for the presence/absence of "devonly" module to determine if running

View File

@ -7,11 +7,8 @@ import logging
from optparse import make_option
# Django
from django.core.management.base import NoArgsCommand, CommandError
from django.db import transaction
from django.contrib.auth.models import User
from django.utils.dateparse import parse_datetime
from django.utils.timezone import now, is_aware, make_aware
from django.core.management.base import NoArgsCommand
from django.utils.timezone import now
# AWX
from awx.main.models import ActivityStream

View File

@ -7,14 +7,14 @@ import logging
from optparse import make_option
# Django
from django.core.management.base import BaseCommand, CommandError
from django.core.management.base import BaseCommand
from django.db import transaction
from django.contrib.auth.models import User
from django.utils.dateparse import parse_datetime
from django.utils.timezone import now, is_aware, make_aware
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
class Command(BaseCommand):
'''

View File

@ -9,9 +9,7 @@ from optparse import make_option
# Django
from django.core.management.base import NoArgsCommand, CommandError
from django.db import transaction
from django.contrib.auth.models import User
from django.utils.dateparse import parse_datetime
from django.utils.timezone import now, is_aware, make_aware
from django.utils.timezone import now
# AWX
from awx.main.models import Job, ProjectUpdate, InventoryUpdate, SystemJob
@ -131,7 +129,7 @@ class Command(NoArgsCommand):
self.dry_run = bool(options.get('dry_run', False))
try:
self.cutoff = now() - datetime.timedelta(days=self.days)
except OverflowError as e:
except OverflowError:
raise CommandError('--days specified is too large. Try something less than 99999 (about 270 years).')
self.only_jobs = bool(options.get('only_jobs', False))
self.only_project_updates = bool(options.get('only_project_updates', False))

View File

@ -22,12 +22,10 @@ import yaml
from django.conf import settings
from django.core.management.base import NoArgsCommand, CommandError
from django.db import connection, transaction
from django.db.models import Q
from django.contrib.auth.models import User
# AWX
from awx.main.models import *
from awx.main.utils import ignore_inventory_computed_fields, check_proot_installed, build_proot_temp_dir, wrap_args_with_proot
from awx.main.models import * # noqa
from awx.main.utils import ignore_inventory_computed_fields, check_proot_installed, wrap_args_with_proot
from awx.main.signals import disable_activity_stream
from awx.main.task_engine import TaskSerializer as LicenseReader

View File

@ -1,22 +1,18 @@
# Copyright (c) 2014 Ansible, Inc.
# All Rights Reserved
from optparse import make_option
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from awx.main.management.commands._base_instance import BaseCommandInstance
instance_str = BaseCommandInstance.instance_str
from awx.main.models import Instance
instance_str = BaseCommandInstance.instance_str
class Command(BaseCommandInstance):
"""List instances from the Tower database
"""
def handle(self, **options):
super(Command, self).__init__()
for instance in Instance.objects.all():
print("uuid: %s; hostname: %s; primary: %s; created: %s; modified: %s" %
(instance.uuid, instance.hostname, instance.primary, instance.created, instance.modified))

View File

@ -1,14 +1,12 @@
# Copyright (c) 2014 Ansible, Inc.
# All Rights Reserved
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.management.base import CommandError
from awx.main.management.commands._base_instance import BaseCommandInstance
instance_str = BaseCommandInstance.instance_str
from awx.main.models import Instance
instance_str = BaseCommandInstance.instance_str
class Command(BaseCommandInstance):
"""Internal tower command.

View File

@ -1,15 +1,12 @@
# Copyright (c) 2014 Ansible, Inc.
# All Rights Reserved
from optparse import make_option
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.management.base import CommandError
from awx.main.management.commands._base_instance import BaseCommandInstance
instance_str = BaseCommandInstance.instance_str
from awx.main.models import Instance
instance_str = BaseCommandInstance.instance_str
class Command(BaseCommandInstance):
"""Internal tower command.

View File

@ -6,25 +6,21 @@ import os
import sys
import datetime
import logging
import json
import signal
import time
from contextlib import closing
from optparse import make_option
from multiprocessing import Process, Queue
# Django
from django.conf import settings
from django.core.management.base import NoArgsCommand, CommandError
from django.core.management.base import NoArgsCommand
from django.db import transaction, DatabaseError
from django.contrib.auth.models import User
from django.utils.dateparse import parse_datetime
from django.utils.timezone import now, is_aware, make_aware
from django.utils.timezone import now
from django.utils.tzinfo import FixedOffset
from django.db import connection
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.socket import Socket
MAX_REQUESTS = 10000
@ -46,7 +42,8 @@ class CallbackReceiver(object):
active_worker.terminate()
signal.signal(signum, signal.SIG_DFL)
os.kill(os.getpid(), signum) # Rethrow signal, this time without catching it
except Exception, e:
except Exception:
# TODO: LOG
pass
return _handler
@ -99,7 +96,6 @@ class CallbackReceiver(object):
time.sleep(0.1)
def callback_handler(self, use_workers, worker_queues):
message_number = 0
total_messages = 0
last_parent_events = {}

View File

@ -3,31 +3,22 @@
# Python
import os
import datetime
import logging
import json
import signal
import time
import urllib
from optparse import make_option
from threading import Thread
# Django
from django.conf import settings
from django.core.management.base import NoArgsCommand, CommandError
from django.db import transaction, DatabaseError
from django.contrib.auth.models import User
from django.utils.dateparse import parse_datetime
from django.utils.timezone import now, is_aware, make_aware
from django.utils.tzinfo import FixedOffset
from django.core.management.base import NoArgsCommand
from django.utils.timezone import now
# AWX
import awx
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.socket import Socket
# gevent & socketio
import gevent
# socketio
from socketio import socketio_manage
from socketio.server import SocketIOServer
from socketio.namespace import BaseNamespace
@ -164,7 +155,6 @@ class Command(NoArgsCommand):
self.verbosity = int(options.get('verbosity', 1))
self.init_logging()
socketio_listen_port = settings.SOCKETIO_LISTEN_PORT
socketio_notification_port = settings.SOCKETIO_NOTIFICATION_PORT
try:
if os.path.exists('/etc/tower/tower.cert') and os.path.exists('/etc/tower/tower.key'):
@ -175,7 +165,6 @@ class Command(NoArgsCommand):
print 'Listening on port http://0.0.0.0:' + str(socketio_listen_port)
server = SocketIOServer(('0.0.0.0', socketio_listen_port), TowerSocket(), resource='socket.io')
#gevent.spawn(notification_handler, socketio_notification_port, server)
handler_thread = Thread(target=notification_handler, args=(server,))
handler_thread.daemon = True
handler_thread.start()

View File

@ -5,26 +5,19 @@
import os
import datetime
import logging
import json
import signal
import time
from optparse import make_option
from multiprocessing import Process
# Django
from django.conf import settings
from django.core.management.base import NoArgsCommand, CommandError
from django.db import transaction, DatabaseError
from django.contrib.auth.models import User
from django.utils.dateparse import parse_datetime
from django.utils.timezone import now, is_aware, make_aware
from django.utils.tzinfo import FixedOffset
from django.core.management.base import NoArgsCommand
from django.utils.timezone import now
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.queue import FifoQueue
from awx.main.tasks import handle_work_error
from awx.main.utils import get_system_task_capacity, decrypt_field
from awx.main.utils import get_system_task_capacity
# Celery
from celery.task.control import inspect
@ -260,9 +253,9 @@ def process_graph(graph, task_capacity):
print_log("Ready Nodes: %s" % str(ready_nodes))
for task_node in ready_nodes:
node_obj = task_node['node_object']
node_args = task_node['metadata']
# NOTE: This could be used to pass metadata through the task system
# node_args = task_node['metadata']
impact = node_obj.task_impact
node_is_job = graph.get_node_type(node_obj) == 'job'
if impact <= remaining_volume or running_impact == 0:
node_dependencies = graph.get_dependents(node_obj)
# Allow other tasks to continue if a job fails, even if they are

View File

@ -4,16 +4,10 @@
from optparse import make_option
# Django
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, DatabaseError
from django.contrib.auth.models import User
from django.utils.dateparse import parse_datetime
from django.utils.timezone import now, is_aware, make_aware
from django.utils.tzinfo import FixedOffset
from django.core.management.base import BaseCommand
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
class Command(BaseCommand):
'''

View File

@ -1,17 +1,15 @@
# Copyright (c) 2014 Ansible, Inc.
# All Rights Reserved
from optparse import make_option
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.core.management.base import CommandError
from django.db import transaction
from awx.main.management.commands._base_instance import BaseCommandInstance
instance_str = BaseCommandInstance.instance_str
from awx.main.models import Instance
instance_str = BaseCommandInstance.instance_str
class Command(BaseCommandInstance):
"""Set an already registered instance to primary or secondary for HA
tracking.
@ -38,7 +36,6 @@ class Command(BaseCommandInstance):
# Is there an existing record for this machine? If so, retrieve that record and look for issues.
try:
instance = Instance.objects.get(**self.get_unique_fields())
existing = True
except Instance.DoesNotExist:
raise CommandError('No matching instance found to update.')

View File

@ -1,7 +1,6 @@
# Copyright (c) 2014 Ansible, Inc.
# All Rights Reserved
import sys
from django.contrib.auth.models import User
from django.core.management.base import BaseCommand, CommandError

View File

@ -2,19 +2,19 @@
# All Rights Reserved.
# Django
from django.conf import settings
from django.conf import settings # noqa
# AWX
from awx.main.models.base import *
from awx.main.models.unified_jobs import *
from awx.main.models.organization import *
from awx.main.models.credential import *
from awx.main.models.projects import *
from awx.main.models.inventory import *
from awx.main.models.jobs import *
from awx.main.models.schedules import *
from awx.main.models.activity_stream import *
from awx.main.models.ha import *
from awx.main.models.base import * # noqa
from awx.main.models.unified_jobs import * # noqa
from awx.main.models.organization import * # noqa
from awx.main.models.credential import * # noqa
from awx.main.models.projects import * # noqa
from awx.main.models.inventory import * # noqa
from awx.main.models.jobs import * # noqa
from awx.main.models.schedules import * # noqa
from awx.main.models.activity_stream import * # noqa
from awx.main.models.ha import * # noqa
# Monkeypatch Django serializer to ignore django-taggit fields (which break
# the dumpdata command; see https://github.com/alex/django-taggit/issues/155).
@ -29,15 +29,15 @@ def _new_handle_m2m_field(self, obj, field):
_PythonSerializer.handle_m2m_field = _new_handle_m2m_field
# Add custom methods to User model for permissions checks.
from django.contrib.auth.models import User
from awx.main.access import *
from django.contrib.auth.models import User # noqa
from awx.main.access import * # noqa
User.add_to_class('get_queryset', get_user_queryset)
User.add_to_class('can_access', check_user_access)
# Import signal handlers only after models have been defined.
import awx.main.signals
import awx.main.signals # noqa
from awx.main.registrar import activity_stream_registrar
from awx.main.registrar import activity_stream_registrar # noqa
activity_stream_registrar.connect(Organization)
activity_stream_registrar.connect(Inventory)
activity_stream_registrar.connect(Host)

View File

@ -2,7 +2,6 @@
# All Rights Reserved.
# Django
from django.conf import settings
from django.db import models
from django.core.urlresolvers import reverse
from django.utils.translation import ugettext_lazy as _

View File

@ -9,25 +9,14 @@ import shlex
import yaml
# Django
from django.conf import settings
from django.db import models
from django.db.models import signals
from django.core.exceptions import ValidationError
from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import now
# Django-JSONField
from jsonfield import JSONField
# Django-Polymorphic
from polymorphic import PolymorphicModel
# Django-Taggit
from taggit.managers import TaggableManager
# Django-Celery
from djcelery.models import TaskMeta
# Django-CRUM
from crum import get_current_user

View File

@ -6,17 +6,15 @@ import base64
import re
# Django
from django.conf import settings
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError, NON_FIELD_ERRORS
from django.core.urlresolvers import reverse
# AWX
from awx.main import storage
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.utils import decrypt_field
from awx.main.models.base import *
from awx.main.models.base import * # noqa
__all__ = ['Credential']

View File

@ -3,37 +3,27 @@
# Python
import datetime
import hashlib
import hmac
import json
import logging
import os
import re
import shlex
import uuid
import copy
import random
# Django
from django.conf import settings
from django.db import models, connection
from django.db.models import Q
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.db import transaction
from django.core.exceptions import ValidationError, NON_FIELD_ERRORS
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.contrib.auth.models import User
from django.utils.timezone import now, make_aware, get_default_timezone
from django.core.cache import cache
from django.utils.timezone import now
# AWX
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.fields import AutoOneToOneField
from awx.main.managers import HostManager
from awx.main.models.base import *
from awx.main.models.base import * # noqa
from awx.main.models.jobs import Job
from awx.main.models.unified_jobs import *
from awx.main.utils import encrypt_field, ignore_inventory_computed_fields, _inventory_updates
from awx.main.models.unified_jobs import * # noqa
from awx.main.utils import ignore_inventory_computed_fields, _inventory_updates
__all__ = ['Inventory', 'Host', 'Group', 'InventorySource', 'InventoryUpdate', 'CustomInventoryScript']
@ -220,7 +210,7 @@ class Inventory(CommonModel):
group_hosts_map = self.get_group_hosts_map(active=True)
active_host_pks = set(self.hosts.filter(active=True).values_list('pk', flat=True))
failed_host_pks = set(self.hosts.filter(active=True, last_job_host_summary__job__active=True, last_job_host_summary__failed=True).values_list('pk', flat=True))
active_group_pks = set(self.groups.filter(active=True).values_list('pk', flat=True))
# active_group_pks = set(self.groups.filter(active=True).values_list('pk', flat=True))
failed_group_pks = set() # Update below as we check each group.
groups_with_cloud_pks = set(self.groups.filter(active=True, inventory_sources__active=True, inventory_sources__source__in=CLOUD_INVENTORY_SOURCES).values_list('pk', flat=True))
groups_to_update = {}
@ -539,7 +529,7 @@ class Group(CommonModelNameNotUnique):
@transaction.atomic
def mark_inactive_recursive(self):
from awx.main.tasks import update_inventory_computed_fields, bulk_inventory_element_delete
from awx.main.tasks import bulk_inventory_element_delete
from awx.main.utils import ignore_inventory_computed_fields
from awx.main.signals import disable_activity_stream

View File

@ -2,43 +2,28 @@
# All Rights Reserved.
# Python
import datetime
import hashlib
import hmac
import json
import logging
import os
import re
import shlex
import uuid
# Django
from django.conf import settings
from django.db import models
from django.db.models import Q
from django.db import transaction
from django.utils.translation import ugettext_lazy as _
from django.core.exceptions import ValidationError, NON_FIELD_ERRORS
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.contrib.auth.models import User
from django.utils.timezone import now, make_aware, get_default_timezone
# Django-JSONField
from jsonfield import JSONField
# Django-Polymorphic
from polymorphic import PolymorphicModel
# AWX
from awx.main.constants import CLOUD_PROVIDERS
from awx.main.models.base import *
from awx.main.models.unified_jobs import *
from awx.main.utils import encrypt_field, decrypt_field, ignore_inventory_computed_fields
from awx.main.models.base import * # noqa
from awx.main.models.unified_jobs import * # noqa
from awx.main.utils import decrypt_field, ignore_inventory_computed_fields
from awx.main.utils import emit_websocket_notification
# Celery
from celery import chain
logger = logging.getLogger('awx.main.models.jobs')
__all__ = ['JobTemplate', 'Job', 'JobHostSummary', 'JobEvent', 'SystemJobOptions', 'SystemJobTemplate', 'SystemJob']
@ -434,7 +419,7 @@ class Job(UnifiedJob, JobOptions):
# running this job (via callback inventory refresh).
try:
start_args = json.loads(decrypt_field(self, 'start_args'))
except Exception, e:
except Exception:
start_args = None
start_args = start_args or {}
inventory_sources_already_updated = start_args.get('inventory_sources_already_updated', [])

View File

@ -10,14 +10,13 @@ import uuid
# Django
from django.conf import settings
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.core.urlresolvers import reverse
from django.contrib.auth.models import User
from django.utils.timezone import now
# AWX
from awx.main.fields import AutoOneToOneField
from awx.main.models.base import *
from awx.main.models.base import * # noqa
__all__ = ['Organization', 'Team', 'Permission', 'Profile', 'AuthToken']

View File

@ -3,34 +3,25 @@
# Python
import datetime
import hashlib
import hmac
import json
import logging
import os
import re
import shlex
import urlparse
import uuid
# Django
from django.conf import settings
from django.db import models
from django.db.models import CASCADE, SET_NULL, PROTECT
from django.utils.translation import ugettext_lazy as _
from django.utils.encoding import smart_str
from django.core.exceptions import ValidationError, NON_FIELD_ERRORS
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.contrib.auth.models import User
from django.utils.timezone import now, make_aware, get_default_timezone
# AWX
from awx.lib.compat import slugify
from awx.main.models.base import *
from awx.main.models.base import * # noqa
from awx.main.models.jobs import Job
from awx.main.models.unified_jobs import *
from awx.main.models.unified_jobs import * # noqa
from awx.main.utils import update_scm_url
from awx.main.utils import encrypt_field
__all__ = ['Project', 'ProjectUpdate']

View File

@ -15,7 +15,7 @@ from django.utils.timezone import now, make_aware, get_default_timezone
from jsonfield import JSONField
# AWX
from awx.main.models.base import *
from awx.main.models.base import * # noqa
from awx.main.utils import ignore_inventory_computed_fields, emit_websocket_notification
from django.core.urlresolvers import reverse

View File

@ -6,7 +6,6 @@ import codecs
import json
import logging
import re
import shlex
import os
import os.path
from StringIO import StringIO
@ -14,9 +13,7 @@ from StringIO import StringIO
# Django
from django.conf import settings
from django.db import models
from django.db import transaction
from django.core.exceptions import ValidationError, NON_FIELD_ERRORS
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import NON_FIELD_ERRORS
from django.utils.datastructures import SortedDict
from django.utils.translation import ugettext_lazy as _
from django.utils.timezone import now
@ -31,9 +28,9 @@ from polymorphic import PolymorphicModel
from djcelery.models import TaskMeta
# AWX
from awx.main.models.base import *
from awx.main.models.base import * # noqa
from awx.main.models.schedules import Schedule
from awx.main.utils import decrypt_field, get_type_for_model, emit_websocket_notification, _inventory_updates
from awx.main.utils import decrypt_field, emit_websocket_notification, _inventory_updates
from awx.main.redact import UriCleaner
__all__ = ['UnifiedJobTemplate', 'UnifiedJob']
@ -597,7 +594,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
if self.result_stdout_file != "":
try:
os.remove(self.result_stdout_file)
except Exception, e:
except Exception:
pass
super(UnifiedJob, self).delete()
@ -713,7 +710,7 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
needed = self.get_passwords_needed_to_start()
try:
start_args = json.loads(decrypt_field(self, 'start_args'))
except Exception, e:
except Exception:
start_args = None
if start_args in (None, ''):
start_args = kwargs
@ -756,10 +753,10 @@ class UnifiedJob(PolymorphicModel, PasswordFieldsModel, CommonModelNameNotUnique
# Each type of unified job has a different Task class; get the
# appropirate one.
task_type = get_type_for_model(self)
# task_type = get_type_for_model(self)
# Actually tell the task runner to run this task.
# NOTE: This will deadlock the task runner
# FIXME: This will deadlock the task runner
#from awx.main.tasks import notify_task_runner
#notify_task_runner.delay({'id': self.id, 'metadata': kwargs,
# 'task_type': task_type})

View File

@ -1,16 +1,16 @@
# Copyright (c) 2014 AnsibleWorks, Inc.
# All Rights Reserved.
from awx.main.tests.organizations import OrganizationsTest
from awx.main.tests.users import *
from awx.main.tests.inventory import *
from awx.main.tests.projects import ProjectsTest, ProjectUpdatesTest
from awx.main.tests.commands import *
from awx.main.tests.scripts import *
from awx.main.tests.tasks import RunJobTest
from awx.main.tests.licenses import LicenseTests
from awx.main.tests.jobs import *
from awx.main.tests.activity_stream import *
from awx.main.tests.schedules import *
from awx.main.tests.redact import *
from awx.main.tests.views import *
from awx.main.tests.organizations import OrganizationsTest # noqa
from awx.main.tests.users import * # noqa
from awx.main.tests.inventory import * # noqa
from awx.main.tests.projects import ProjectsTest, ProjectUpdatesTest # noqa
from awx.main.tests.commands import * # noqa
from awx.main.tests.scripts import * # noqa
from awx.main.tests.tasks import RunJobTest # noqa
from awx.main.tests.licenses import LicenseTests # noqa
from awx.main.tests.jobs import * # noqa
from awx.main.tests.activity_stream import * # noqa
from awx.main.tests.schedules import * # noqa
from awx.main.tests.redact import * # noqa
from awx.main.tests.views import * # noqa

View File

@ -2,21 +2,10 @@
# All Rights Reserved.
# Python
import contextlib
import datetime
import json
import os
import shutil
import tempfile
from django.contrib.auth.models import User
import django.test
from django.test.client import Client
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
class ActivityStreamTest(BaseTest):

View File

@ -4,7 +4,6 @@
# Python
import base64
import contextlib
import datetime
import json
import os
import random
@ -23,10 +22,9 @@ from django.conf import settings, UserSettingsHolder
from django.contrib.auth.models import User
import django.test
from django.test.client import Client
from django.test.utils import override_settings
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.backend import LDAPSettings
from awx.main.management.commands.run_callback_receiver import CallbackReceiver
from awx.main.management.commands.run_task_system import run_taskmanager

View File

@ -12,8 +12,6 @@ import tempfile
import time
import urlparse
import unittest
if not hasattr(unittest, 'skipIf'):
import unittest2 as unittest
# Django
import django
@ -25,9 +23,12 @@ from django.utils.timezone import now
from django.test.utils import override_settings
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest, BaseLiveServerTest
if not hasattr(unittest, 'skipIf'):
import unittest2 as unittest
__all__ = ['DumpDataTest', 'CleanupDeletedTest', 'CleanupJobsTest',
'InventoryImportTest']
@ -186,7 +187,7 @@ class DumpDataTest(BaseCommandMixin, BaseTest):
def test_dumpdata(self):
result, stdout, stderr = self.run_command('dumpdata')
self.assertEqual(result, None)
data = json.loads(stdout)
json.loads(stdout)
class CleanupDeletedTest(BaseCommandMixin, BaseTest):
'''

View File

@ -11,13 +11,12 @@ import tempfile
# Django
from django.conf import settings
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
from django.test.utils import override_settings
from django.utils.timezone import now
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest, BaseTransactionTest
__all__ = ['InventoryTest', 'InventoryUpdatesTest']
@ -239,12 +238,12 @@ class InventoryTest(BaseTest):
# a user who is on a team who has read permissions on an inventory can
# see inventory records, but not delete.
with self.current_user(self.other_django_user):
data = self.get(url_b, expect=200)
self.get(url_b, expect=200)
self.delete(url_b, expect=403)
# an org admin can delete inventory records for his orgs only.
with self.current_user(self.normal_django_user):
data = self.get(url_a, expect=200)
self.get(url_a, expect=200)
self.delete(url_a, expect=204)
self.delete(url_b, expect=403)
@ -273,7 +272,7 @@ class InventoryTest(BaseTest):
temp_org.users.add(self.other_django_user)
temp_org.users.add(self.normal_django_user)
temp_inv = temp_org.inventories.create(name='Delete Org Inventory')
temp_group1 = temp_inv.groups.create(name='Delete Org Inventory Group')
temp_inv.groups.create(name='Delete Org Inventory Group')
temp_perm_read = Permission.objects.create(
inventory = temp_inv,
@ -281,7 +280,7 @@ class InventoryTest(BaseTest):
permission_type = 'read'
)
org_detail = reverse('api:organization_detail', args=(temp_org.pk,))
reverse('api:organization_detail', args=(temp_org.pk,))
inventory_detail = reverse('api:inventory_detail', args=(temp_inv.pk,))
permission_detail = reverse('api:permission_detail', args=(temp_perm_read.pk,))
@ -292,7 +291,7 @@ class InventoryTest(BaseTest):
def test_create_inventory_script(self):
inventory_scripts = reverse('api:inventory_script_list')
new_script = dict(name="Test", description="Test Script", script=TEST_SIMPLE_INVENTORY_SCRIPT, organization=self.organizations[0].id)
script_data = self.post(inventory_scripts, data=new_script, expect=201, auth=self.get_super_credentials())
self.post(inventory_scripts, data=new_script, expect=201, auth=self.get_super_credentials())
got = self.get(inventory_scripts, expect=200, auth=self.get_super_credentials())
self.assertEquals(got['count'], 1)
@ -305,11 +304,10 @@ class InventoryTest(BaseTest):
self.post(inventory_scripts, data=failed_no_shebang, expect=400, auth=self.get_super_credentials())
def test_main_line(self):
# some basic URLs...
inventories = reverse('api:inventory_list')
inventories_1 = reverse('api:inventory_detail', args=(self.inventory_a.pk,))
inventories_2 = reverse('api:inventory_detail', args=(self.inventory_b.pk,))
# some basic URLs...
reverse('api:inventory_list')
reverse('api:inventory_detail', args=(self.inventory_a.pk,))
reverse('api:inventory_detail', args=(self.inventory_b.pk,))
hosts = reverse('api:host_list')
groups = reverse('api:group_list')
self.create_test_license_file()
@ -328,22 +326,21 @@ class InventoryTest(BaseTest):
new_host_e = dict(name=u'asdf4.example.com:\u0162', inventory=inv.pk)
host_data0 = self.post(hosts, data=invalid, expect=400, auth=self.get_super_credentials())
host_data0 = self.post(hosts, data=new_host_a, expect=201, auth=self.get_super_credentials())
# Port should be split out into host variables.
host_a = Host.objects.get(pk=host_data0['id'])
self.assertEqual(host_a.name, u'asdf\u0162.example.com')
self.assertEqual(host_a.variables_dict, {'ansible_ssh_port': 1022})
# an org admin can add hosts (try first with invalid port #).
host_data1 = self.post(hosts, data=new_host_e, expect=400, auth=self.get_normal_credentials())
self.post(hosts, data=new_host_e, expect=400, auth=self.get_normal_credentials())
new_host_e['name'] = u'asdf4.example.com'
host_data1 = self.post(hosts, data=new_host_e, expect=201, auth=self.get_normal_credentials())
self.post(hosts, data=new_host_e, expect=201, auth=self.get_normal_credentials())
# a normal user cannot add hosts
host_data2 = self.post(hosts, data=new_host_b, expect=403, auth=self.get_nobody_credentials())
self.post(hosts, data=new_host_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions (on any inventory) can create hosts
edit_perm = Permission.objects.create(
Permission.objects.create(
user = self.other_django_user,
inventory = Inventory.objects.get(pk=inv.pk),
permission_type = PERM_INVENTORY_WRITE)
@ -355,7 +352,7 @@ class InventoryTest(BaseTest):
self.assertEqual(host_c.variables_dict, {'ansible_ssh_port': 2022, 'who': 'what?'})
# hostnames must be unique inside an organization
host_data4 = self.post(hosts, data=new_host_c, expect=400, auth=self.get_other_credentials())
self.post(hosts, data=new_host_c, expect=400, auth=self.get_other_credentials())
# Verify we can update host via PUT.
host_url3 = host_data3['url']
@ -363,7 +360,7 @@ class InventoryTest(BaseTest):
host_data3 = self.put(host_url3, data=host_data3, expect=200, auth=self.get_other_credentials())
self.assertEqual(Host.objects.get(id=host_data3['id']).variables, '')
self.assertEqual(Host.objects.get(id=host_data3['id']).variables_dict, {})
# Should reject invalid data.
host_data3['variables'] = 'foo: [bar'
self.put(host_url3, data=host_data3, expect=400, auth=self.get_other_credentials())
@ -373,7 +370,7 @@ class InventoryTest(BaseTest):
self.put(host_url3, data=host_data3, expect=200, auth=self.get_other_credentials())
self.assertEqual(Host.objects.get(id=host_data3['id']).variables, host_data3['variables'])
self.assertEqual(Host.objects.get(id=host_data3['id']).variables_dict, {'bad': 'monkey'})
host_data3['variables'] = '{"angry": "penguin"}'
self.put(host_url3, data=host_data3, expect=200, auth=self.get_other_credentials())
self.assertEqual(Host.objects.get(id=host_data3['id']).variables, host_data3['variables'])
@ -390,14 +387,14 @@ class InventoryTest(BaseTest):
new_group_e = dict(name='web6', inventory=inv.pk)
groups = reverse('api:group_list')
data0 = self.post(groups, data=invalid, expect=400, auth=self.get_super_credentials())
data0 = self.post(groups, data=new_group_a, expect=201, auth=self.get_super_credentials())
self.post(groups, data=invalid, expect=400, auth=self.get_super_credentials())
self.post(groups, data=new_group_a, expect=201, auth=self.get_super_credentials())
# an org admin can add groups
group_data1 = self.post(groups, data=new_group_e, expect=201, auth=self.get_normal_credentials())
self.post(groups, data=new_group_e, expect=201, auth=self.get_normal_credentials())
# a normal user cannot add groups
group_data2 = self.post(groups, data=new_group_b, expect=403, auth=self.get_nobody_credentials())
self.post(groups, data=new_group_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions (on any inventory) can create groups
# already done!
@ -405,23 +402,23 @@ class InventoryTest(BaseTest):
# user = self.other_django_user,
# inventory = Inventory.objects.get(pk=inv.pk),
# permission_type = PERM_INVENTORY_WRITE
#)
group_data3 = self.post(groups, data=new_group_c, expect=201, auth=self.get_other_credentials())
#)
self.post(groups, data=new_group_c, expect=201, auth=self.get_other_credentials())
# hostnames must be unique inside an organization
group_data4 = self.post(groups, data=new_group_c, expect=400, auth=self.get_other_credentials())
self.post(groups, data=new_group_c, expect=400, auth=self.get_other_credentials())
# Check that we don't allow creating reserved group names.
data = dict(name='all', inventory=inv.pk)
with self.current_user(self.super_django_user):
response = self.post(groups, data=data, expect=400)
self.post(groups, data=data, expect=400)
data = dict(name='_meta', inventory=inv.pk)
with self.current_user(self.super_django_user):
response = self.post(groups, data=data, expect=400)
self.post(groups, data=data, expect=400)
# A new group should not be able to be added a removed group
del_group = inv.groups.create(name='del')
undel_group = inv.groups.create(name='nondel')
inv.groups.create(name='nondel')
del_children_url = reverse('api:group_children_list', args=(del_group.pk,))
nondel_url = reverse('api:group_detail',
args=(Group.objects.get(name='nondel').pk,))
@ -432,7 +429,6 @@ class InventoryTest(BaseTest):
#################################################
# HOSTS->inventories POST via subcollection
url = reverse('api:inventory_hosts_list', args=(self.inventory_a.pk,))
new_host_a = dict(name='web100.example.com')
new_host_b = dict(name='web101.example.com')
@ -444,10 +440,10 @@ class InventoryTest(BaseTest):
added_by_collection_a = self.post(url, data=new_host_a, expect=201, auth=self.get_super_credentials())
# an org admin can associate hosts with inventories
added_by_collection_b = self.post(url, data=new_host_b, expect=201, auth=self.get_normal_credentials())
self.post(url, data=new_host_b, expect=201, auth=self.get_normal_credentials())
# a normal user cannot associate hosts with inventories
added_by_collection_c = self.post(url, data=new_host_c, expect=403, auth=self.get_nobody_credentials())
self.post(url, data=new_host_c, expect=403, auth=self.get_nobody_credentials())
# a normal user with edit permission on the inventory can associate hosts with inventories
url5 = reverse('api:inventory_hosts_list', args=(inv.pk,))
@ -455,7 +451,7 @@ class InventoryTest(BaseTest):
got = self.get(url5, expect=200, auth=self.get_other_credentials())
self.assertEquals(got['count'], 4)
# now remove the host from inventory (still keeps the record)
# now remove the host from inventory (still keeps the record)
added_by_collection_d['disassociate'] = 1
self.post(url5, data=added_by_collection_d, expect=204, auth=self.get_other_credentials())
got = self.get(url5, expect=200, auth=self.get_other_credentials())
@ -464,7 +460,7 @@ class InventoryTest(BaseTest):
##################################################
# GROUPS->inventories POST via subcollection
root_groups = reverse('api:inventory_root_groups_list', args=(self.inventory_a.pk,))
url = reverse('api:inventory_groups_list', args=(self.inventory_a.pk,))
@ -575,14 +571,14 @@ class InventoryTest(BaseTest):
# an org admin can associate variable objects with inventory
put = self.put(vdata_url, data=vars_b, expect=200, auth=self.get_normal_credentials())
# a normal user cannot associate variable objects with inventory
put = self.put(vdata_url, data=vars_b, expect=403, auth=self.get_nobody_credentials())
# a normal user with inventory edit permissions can associate variable objects with inventory
put = self.put(vdata_url, data=vars_c, expect=200, auth=self.get_normal_credentials())
self.assertEquals(put, vars_c)
# repeat but request variables in yaml
got = self.get(vdata_url, expect=200,
auth=self.get_normal_credentials(),
@ -604,10 +600,10 @@ class InventoryTest(BaseTest):
host2 = hosts[1]
host3 = hosts[2]
groups[0].hosts.add(host1)
groups[0].hosts.add(host3)
groups[0].hosts.add(host3)
groups[0].save()
# access
# access
url1 = reverse('api:group_hosts_list', args=(groups[0].pk,))
alt_group_hosts = reverse('api:group_hosts_list', args=(groups[1].pk,))
other_alt_group_hosts = reverse('api:group_hosts_list', args=(groups[2].pk,))
@ -621,15 +617,15 @@ class InventoryTest(BaseTest):
url = reverse('api:host_detail', args=(host2.pk,))
got = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(got['id'], host2.pk)
posted = self.post(url1, data=got, expect=204, auth=self.get_normal_credentials())
self.post(url1, data=got, expect=204, auth=self.get_normal_credentials())
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 3)
self.assertTrue(host2.pk in [x['id'] for x in data['results']])
# now add one new completely new host, to test creation+association in one go
new_host = dict(inventory=got['inventory'], name='completelynewhost.example.com', description='...')
posted = self.post(url1, data=new_host, expect=201, auth=self.get_normal_credentials())
self.post(url1, data=new_host, expect=201, auth=self.get_normal_credentials())
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 4)
@ -643,7 +639,7 @@ class InventoryTest(BaseTest):
# removal
got['disassociate'] = 1
posted = self.post(url1, data=got, expect=204, auth=self.get_normal_credentials())
self.post(url1, data=got, expect=204, auth=self.get_normal_credentials())
data = self.get(url1, expect=200, auth=self.get_normal_credentials())
self.assertEquals(data['count'], 3)
self.assertFalse(host2.pk in [x['id'] for x in data['results']])
@ -687,8 +683,8 @@ class InventoryTest(BaseTest):
args=(Group.objects.get(name='web6').pk,))
subgroups_url3 = reverse('api:group_children_list',
args=(Group.objects.get(name='web100').pk,))
subgroups_url4 = reverse('api:group_children_list',
args=(Group.objects.get(name='web101').pk,))
reverse('api:group_children_list',
args=(Group.objects.get(name='web101').pk,))
got = self.get(child_url, expect=200, auth=self.get_super_credentials())
self.post(subgroups_url, data=got, expect=204, auth=self.get_super_credentials())
kids = Group.objects.get(name='web2').children.all()
@ -697,7 +693,7 @@ class InventoryTest(BaseTest):
self.assertEquals(checked['count'], 1)
# an org admin can set subgroups
posted = self.post(subgroups_url2, data=got, expect=204, auth=self.get_normal_credentials())
self.post(subgroups_url2, data=got, expect=204, auth=self.get_normal_credentials())
# see if we can post a completely new subgroup
new_data = dict(inventory=inv.pk, name='completely new', description='blarg?')
@ -767,7 +763,7 @@ class InventoryTest(BaseTest):
'disassociate': 1,
}
with self.current_user(self.super_django_user):
response = self.post(url, data, expect=204)
self.post(url, data, expect=204)
gx3 = Group.objects.get(pk=gx3.pk)
#self.assertFalse(gx3.active) # FIXME: Disabled for now....
self.assertFalse(gx3 in gx2.children.all())
@ -778,13 +774,13 @@ class InventoryTest(BaseTest):
invalid_expect = 400 # hostname validation is disabled for now.
data = dict(name='', inventory=inv.pk)
with self.current_user(self.super_django_user):
response = self.post(hosts, data=data, expect=400)
self.post(hosts, data=data, expect=400)
#data = dict(name='not a valid host name', inventory=inv.pk)
#with self.current_user(self.super_django_user):
# response = self.post(hosts, data=data, expect=invalid_expect)
data = dict(name='validhost:99999', inventory=inv.pk)
with self.current_user(self.super_django_user):
response = self.post(hosts, data=data, expect=invalid_expect)
self.post(hosts, data=data, expect=invalid_expect)
#data = dict(name='123.234.345.456', inventory=inv.pk)
#with self.current_user(self.super_django_user):
# response = self.post(hosts, data=data, expect=invalid_expect)
@ -834,7 +830,7 @@ class InventoryTest(BaseTest):
h_d = i_a.hosts.create(name='d', variables=json.dumps({'d-vars': 'ddd'}))
h_d.groups.add(g_d)
# Add another host not in any groups.
h_z = i_a.hosts.create(name='z', variables=json.dumps({'z-vars': 'zzz'}))
i_a.hosts.create(name='z', variables=json.dumps({'z-vars': 'zzz'}))
# Old, slow 1.2 way.
url = reverse('api:inventory_script_view', args=(i_a.pk,))
@ -858,7 +854,7 @@ class InventoryTest(BaseTest):
self.assertEqual(response, h.variables_dict)
# Now add localhost to the inventory.
h_l = i_a.hosts.create(name='localhost', variables=json.dumps({'ansible_connection': 'local'}))
i_a.hosts.create(name='localhost', variables=json.dumps({'ansible_connection': 'local'}))
# New 1.3 way.
url = reverse('api:inventory_script_view', args=(i_a.pk,))
@ -1857,16 +1853,16 @@ class InventoryUpdatesTest(BaseTransactionTest):
custom_group = custom_inv.groups.create(name="Custom Script Group")
custom_inv_src = reverse('api:inventory_source_detail',
args=(custom_group.inventory_source.pk,))
custom_inv_update = reverse('api:inventory_source_update_view',
args=(custom_group.inventory_source.pk,))
reverse('api:inventory_source_update_view',
args=(custom_group.inventory_source.pk,))
inv_src_opts = {'source': 'custom',
'source_script': script_data["id"],
'source_vars': json.dumps({'HOME': 'no-place-like', 'USER': 'notme', '_': 'nope', 'INVENTORY_SOURCE_ID': -1})
}
with self.current_user(self.super_django_user):
response = self.put(custom_inv_src, inv_src_opts, expect=200)
self.put(custom_inv_src, inv_src_opts, expect=200)
self.check_inventory_source(custom_group.inventory_source)
# Delete script, verify that update fails.
inventory_source = InventorySource.objects.get(pk=custom_group.inventory_source.pk)
self.assertTrue(inventory_source.can_update)
@ -1883,11 +1879,11 @@ class InventoryUpdatesTest(BaseTransactionTest):
custom_group = custom_inv.groups.create(name="Unicode Script Group")
custom_inv_src = reverse('api:inventory_source_detail',
args=(custom_group.inventory_source.pk,))
custom_inv_update = reverse('api:inventory_source_update_view',
args=(custom_group.inventory_source.pk,))
reverse('api:inventory_source_update_view',
args=(custom_group.inventory_source.pk,))
inv_src_opts = {'source': 'custom', 'source_script': script_data["id"]}
with self.current_user(self.super_django_user):
response = self.put(custom_inv_src, inv_src_opts, expect=200)
self.put(custom_inv_src, inv_src_opts, expect=200)
self.check_inventory_source(custom_group.inventory_source)
# This shouldn't work because we are trying to use a custom script from one organization with
@ -1897,8 +1893,8 @@ class InventoryUpdatesTest(BaseTransactionTest):
other_group = other_inv.groups.create(name='A Different Org Group')
other_inv_src = reverse('api:inventory_source_detail',
args=(other_group.inventory_source.pk,))
other_inv_update = reverse('api:inventory_source_update_view',
args=(other_group.inventory_source.pk,))
reverse('api:inventory_source_update_view',
args=(other_group.inventory_source.pk,))
other_inv_src_opts = {'source': 'custom', 'source_script': script_data['id']}
with self.current_user(self.super_django_user):
self.put(other_inv_src, other_inv_src_opts, expect=400)

View File

@ -13,12 +13,9 @@ import uuid
# Django
import django.test
from django.contrib.auth.models import User as DjangoUser
from django.conf import settings
from django.core.urlresolvers import reverse
from django.db import transaction
from django.db.models import Q
from django.test.client import Client
from django.test.utils import override_settings
from django.utils.encoding import smart_str
@ -26,7 +23,7 @@ from django.utils.encoding import smart_str
import requests
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTestMixin
__all__ = ['JobTemplateTest', 'JobTest', 'JobStartCancelTest',
@ -721,27 +718,27 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
# Sue's credentials (superuser) == 200, full list
self.check_get_list(url, self.user_sue, qs, fields)
# Alex's credentials (admin of all orgs) == 200, full list
self.check_get_list(url, self.user_alex, qs, fields)
# Bob's credentials (admin of eng, user of ops) == 200, all from
# engineering and operations.
bob_qs = qs.filter(
qs.filter(
Q(project__organizations__admins__in=[self.user_bob]) |
Q(project__teams__users__in=[self.user_bob]),
)
#self.check_get_list(url, self.user_bob, bob_qs, fields)
# Chuck's credentials (admin of eng) == 200, all from engineering.
chuck_qs = qs.filter(
qs.filter(
Q(project__organizations__admins__in=[self.user_chuck]) |
Q(project__teams__users__in=[self.user_chuck]),
)
#self.check_get_list(url, self.user_chuck, chuck_qs, fields)
# Doug's credentials (user of eng) == 200, none?.
doug_qs = qs.filter(
qs.filter(
Q(project__organizations__admins__in=[self.user_doug]) |
Q(project__teams__users__in=[self.user_doug]),
)
@ -898,9 +895,7 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
with self.current_user(self.user_sue):
data = self.get(url)
data['name'] = '%s-updated' % data['name']
response = self.put(url, data)
#patch_data = dict(name='%s-changed' % data['name'])
#response = self.patch(url, patch_data)
self.put(url, data)
# FIXME: Check other credentials and optional fields.
@ -934,7 +929,7 @@ class JobTemplateTest(BaseJobTestMixin, django.test.TestCase):
# sue can create a new job from the template.
with self.current_user(self.user_sue):
response = self.post(url, data, expect=201)
self.post(url, data, expect=201)
# FIXME: Check other credentials and optional fields.
@ -1155,11 +1150,11 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
# sue can create a new job without a template.
with self.current_user(self.user_sue):
response = self.post(url, data, expect=201)
self.post(url, data, expect=201)
# alex can't create a job without a template, only super users can do that
with self.current_user(self.user_alex):
response = self.post(url, data, expect=403)
self.post(url, data, expect=403)
# sue can also create a job here from a template.
jt = self.jt_ops_east_run
@ -1168,7 +1163,7 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
job_template=jt.pk,
)
with self.current_user(self.user_sue):
response = self.post(url, data, expect=201)
self.post(url, data, expect=201)
# sue can't create a job when it is hidden due to inactive team
@ -1207,9 +1202,7 @@ class JobTest(BaseJobTestMixin, django.test.TestCase):
with self.current_user(self.user_sue):
data = self.get(url)
data['limit'] = '%s-updated' % data['limit']
response = self.put(url, data)
#patch_data = dict(limit='%s-changed' % data['limit'])
#response = self.patch(url, patch_data)
self.put(url, data)
# sue cannot update the job detail if it is in any other state.
for status in ('pending', 'running', 'successful', 'failed', 'error',

View File

@ -1,17 +1,11 @@
# Copyright (c) 2014 AnsibleWorks, Inc.
# All Rights Reserved.
import datetime
import json
from django.conf import settings
from django.contrib.auth.models import User as DjangoUser
import django.test
from django.test.client import Client
from django.core.urlresolvers import reverse
from awx.main.models import Host, Inventory, Organization
from awx.main.tests.base import BaseTest
from awx.main.task_engine import *
from awx.main.task_engine import * # noqa
class LicenseTests(BaseTest):
@ -23,18 +17,18 @@ class LicenseTests(BaseTest):
u = self.super_django_user
org = Organization.objects.create(name='o1', created_by=u)
inventory = Inventory.objects.create(name='hi', organization=org, created_by=u)
host = Host.objects.create(name='a1', inventory=inventory, created_by=u)
host = Host.objects.create(name='a2', inventory=inventory, created_by=u)
host = Host.objects.create(name='a3', inventory=inventory, created_by=u)
host = Host.objects.create(name='a4', inventory=inventory, created_by=u)
host = Host.objects.create(name='a5', inventory=inventory, created_by=u)
host = Host.objects.create(name='a6', inventory=inventory, created_by=u)
host = Host.objects.create(name='a7', inventory=inventory, created_by=u)
host = Host.objects.create(name='a8', inventory=inventory, created_by=u)
host = Host.objects.create(name='a9', inventory=inventory, created_by=u)
host = Host.objects.create(name='a10', inventory=inventory, created_by=u)
host = Host.objects.create(name='a11', inventory=inventory, created_by=u)
host = Host.objects.create(name='a12', inventory=inventory, created_by=u)
Host.objects.create(name='a1', inventory=inventory, created_by=u)
Host.objects.create(name='a2', inventory=inventory, created_by=u)
Host.objects.create(name='a3', inventory=inventory, created_by=u)
Host.objects.create(name='a4', inventory=inventory, created_by=u)
Host.objects.create(name='a5', inventory=inventory, created_by=u)
Host.objects.create(name='a6', inventory=inventory, created_by=u)
Host.objects.create(name='a7', inventory=inventory, created_by=u)
Host.objects.create(name='a8', inventory=inventory, created_by=u)
Host.objects.create(name='a9', inventory=inventory, created_by=u)
Host.objects.create(name='a10', inventory=inventory, created_by=u)
Host.objects.create(name='a11', inventory=inventory, created_by=u)
Host.objects.create(name='a12', inventory=inventory, created_by=u)
def tearDown(self):
super(LicenseTests, self).tearDown()
@ -62,7 +56,7 @@ class LicenseTests(BaseTest):
assert strdata_loaded == data
reader = TaskSerializer()
vdata = reader.from_string(strdata)
assert vdata['available_instances'] == 500

View File

@ -1,14 +1,8 @@
# Copyright (c) 2014 AnsibleWorks, Inc.
# All Rights Reserved.
import datetime
import json
from django.contrib.auth.models import User as DjangoUser
from django.core.urlresolvers import reverse
import django.test
from django.test.client import Client
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
class OrganizationsTest(BaseTest):
@ -129,31 +123,31 @@ class OrganizationsTest(BaseTest):
# first get all the orgs
orgs = self.get(self.collection(), expect=200, auth=self.get_super_credentials())
# find projects attached to the first org
projects0_url = orgs['results'][0]['related']['projects']
projects1_url = orgs['results'][1]['related']['projects']
projects9_url = orgs['results'][9]['related']['projects']
self.get(projects0_url, expect=401, auth=None)
self.get(projects0_url, expect=401, auth=self.get_invalid_credentials())
# normal user is just a member of the first org, so can see all projects under the org
projects0a = self.get(projects0_url, expect=200, auth=self.get_normal_credentials())
self.get(projects0_url, expect=200, auth=self.get_normal_credentials())
# however in the second org, he's an admin and should see all of them
projects1a = self.get(projects1_url, expect=200, auth=self.get_normal_credentials())
self.assertEquals(projects1a['count'], 5)
# but the non-admin cannot access the list of projects in the org. He should use /projects/ instead!
projects1b = self.get(projects1_url, expect=200, auth=self.get_other_credentials())
self.get(projects1_url, expect=200, auth=self.get_other_credentials())
# superuser should be able to read anything
projects9a = self.get(projects9_url, expect=200, auth=self.get_super_credentials())
self.assertEquals(projects9a['count'], 1)
# nobody user is not a member of any org, so can't see projects...
projects0a = self.get(projects0_url, expect=403, auth=self.get_nobody_credentials())
self.get(projects0_url, expect=403, auth=self.get_nobody_credentials())
projects1a = self.get(projects1_url, expect=403, auth=self.get_nobody_credentials())
def test_get_item_subobjects_users(self):
@ -220,7 +214,7 @@ class OrganizationsTest(BaseTest):
data1 = self.post(self.collection(), new_org, expect=201, auth=self.get_super_credentials())
# duplicate post results in 400
data2 = self.post(self.collection(), new_org, expect=400, auth=self.get_super_credentials())
self.post(self.collection(), new_org, expect=400, auth=self.get_super_credentials())
# look at what we got back from the post, make sure we added an org
last_org = Organization.objects.order_by('-pk')[0]
@ -295,7 +289,7 @@ class OrganizationsTest(BaseTest):
new_user = dict(username='NewUser9000')
which_org = self.normal_django_user.admin_of_organizations.all()[0]
url = reverse('api:organization_users_list', args=(which_org.pk,))
posted = self.post(url, new_user, expect=201, auth=self.get_normal_credentials())
self.post(url, new_user, expect=201, auth=self.get_normal_credentials())
all_users = self.get(url, expect=200, auth=self.get_normal_credentials())
self.assertEqual(all_users['count'], 3)
@ -337,7 +331,7 @@ class OrganizationsTest(BaseTest):
# first get some urls and data to put back to them
urls = self.get_urls(self.collection(), auth=self.get_super_credentials())
data0 = self.get(urls[0], expect=200, auth=self.get_super_credentials())
self.get(urls[0], expect=200, auth=self.get_super_credentials())
data1 = self.get(urls[1], expect=200, auth=self.get_super_credentials())
# test that an unauthenticated user cannot do a put
@ -346,9 +340,9 @@ class OrganizationsTest(BaseTest):
self.put(urls[0], new_data1, expect=401, auth=None)
self.put(urls[0], new_data1, expect=401, auth=self.get_invalid_credentials())
# user normal is an admin of org 0 and a member of org 1 so should be able to put only org 1
# user normal is an admin of org 0 and a member of org 1 so should be able to put only org 1
self.put(urls[0], new_data1, expect=403, auth=self.get_normal_credentials())
put_result = self.put(urls[1], new_data1, expect=200, auth=self.get_normal_credentials())
self.put(urls[1], new_data1, expect=200, auth=self.get_normal_credentials())
# get back org 1 and see if it changed
get_result = self.get(urls[1], expect=200, auth=self.get_normal_credentials())

View File

@ -2,7 +2,6 @@
# All Rights Reserved.
# Python
import datetime
import getpass
import json
import os
@ -15,15 +14,13 @@ import urlparse
# Django
from django.conf import settings
from django.contrib.auth.models import User
import django.test
from django.test.client import Client
from django.core.urlresolvers import reverse
from django.test.utils import override_settings
from django.utils.timezone import now
# AWX
from awx.main.models import *
from awx.main.tests.base import BaseTest, BaseTransactionTest
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTransactionTest
from awx.main.tests.tasks import TEST_SSH_KEY_DATA, TEST_SSH_KEY_DATA_LOCKED, TEST_SSH_KEY_DATA_UNLOCK
from awx.main.utils import decrypt_field, update_scm_url
@ -187,11 +184,11 @@ class ProjectsTest(BaseTransactionTest):
def test_dashboard(self):
url = reverse('api:dashboard_view')
# superuser can read dashboard.
response = self.get(url, expect=200, auth=self.get_super_credentials())
self.get(url, expect=200, auth=self.get_super_credentials())
# org admin can read dashboard.
response = self.get(url, expect=200, auth=self.get_normal_credentials())
self.get(url, expect=200, auth=self.get_normal_credentials())
# regular user can read dashboard.
response = self.get(url, expect=200, auth=self.get_nobody_credentials())
self.get(url, expect=200, auth=self.get_nobody_credentials())
# anonymous/invalid user can't access dashboard.
self.get(url, expect=401)
self.get(url, expect=401, auth=self.get_invalid_credentials())
@ -330,16 +327,16 @@ class ProjectsTest(BaseTransactionTest):
# can add teams
posted1 = self.post(all_teams, data=new_team, expect=201, auth=self.get_super_credentials())
posted2 = self.post(all_teams, data=new_team, expect=400, auth=self.get_super_credentials())
self.post(all_teams, data=new_team, expect=400, auth=self.get_super_credentials())
# normal user is not an admin of organizations[0], but is for [1].
posted3 = self.post(all_teams, data=new_team2, expect=403, auth=self.get_normal_credentials())
new_team2['organization'] = self.organizations[1].pk
posted3 = self.post(all_teams, data=new_team2, expect=201, auth=self.get_normal_credentials())
posted4 = self.post(all_teams, data=new_team2, expect=400, auth=self.get_normal_credentials())
posted5 = self.post(all_teams, data=new_team3, expect=403, auth=self.get_other_credentials())
url1 = posted1['url']
self.post(all_teams, data=new_team2, expect=400, auth=self.get_normal_credentials())
self.post(all_teams, data=new_team3, expect=403, auth=self.get_other_credentials())
posted1['url']
url3 = posted3['url']
url5 = posted1['url']
posted1['url']
new_team = Team.objects.create(name='newTeam4', organization=self.organizations[1])
url = reverse('api:team_detail', args=(new_team.pk,))
@ -356,11 +353,11 @@ class ProjectsTest(BaseTransactionTest):
# can list organization teams (filtered by user) -- this is an org admin function
org_teams = reverse('api:organization_teams_list', args=(self.organizations[1].pk,))
data1 = self.get(org_teams, expect=401)
self.get(org_teams, expect=401)
data2 = self.get(org_teams, expect=403, auth=self.get_nobody_credentials())
data3 = self.get(org_teams, expect=403, auth=self.get_other_credentials())
data4 = self.get(org_teams, expect=200, auth=self.get_normal_credentials())
data5 = self.get(org_teams, expect=200, auth=self.get_super_credentials())
self.get(org_teams, expect=403, auth=self.get_other_credentials())
self.get(org_teams, expect=200, auth=self.get_normal_credentials())
self.get(org_teams, expect=200, auth=self.get_super_credentials())
# can add teams to organizations
new_team1 = dict(name='super new team A')
@ -368,16 +365,16 @@ class ProjectsTest(BaseTransactionTest):
new_team2 = dict(name='super new team B', organization=34567)
new_team3 = dict(name='super new team C')
data1 = self.post(org_teams, new_team1, expect=401)
data1 = self.post(org_teams, new_team1, expect=403, auth=self.get_nobody_credentials())
data1 = self.post(org_teams, new_team1, expect=403, auth=self.get_other_credentials())
self.post(org_teams, new_team1, expect=401)
self.post(org_teams, new_team1, expect=403, auth=self.get_nobody_credentials())
self.post(org_teams, new_team1, expect=403, auth=self.get_other_credentials())
data2 = self.post(org_teams, new_team2, expect=201, auth=self.get_normal_credentials())
data3 = self.post(org_teams, new_team3, expect=201, auth=self.get_super_credentials())
self.post(org_teams, new_team3, expect=201, auth=self.get_super_credentials())
# can remove teams from organizations
data2['disassociate'] = 1
url = data2['url']
deleted = self.post(org_teams, data2, expect=204, auth=self.get_normal_credentials())
self.post(org_teams, data2, expect=204, auth=self.get_normal_credentials())
got = self.get(url, expect=404, auth=self.get_normal_credentials())
@ -1320,7 +1317,7 @@ class ProjectUpdatesTest(BaseTransactionTest):
scm_username=scm_username,
scm_password=scm_password,
)
should_error = bool('github.com' in scm_url and scm_username != 'git')
bool('github.com' in scm_url and scm_username != 'git')
self.check_project_update(project2, should_fail=None) # , should_error=should_error)
def test_scm_key_unlock_on_project_update(self):

View File

@ -3,20 +3,14 @@
# Python
import datetime
import json
import os
import re
# Django
from django.conf import settings
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse
from django.test.utils import override_settings
from django.utils.timezone import now
# AWX
from awx.main.models import *
from awx.main.tests.base import BaseTest, BaseTransactionTest
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
__all__ = ['ScheduleTest']
@ -76,7 +70,7 @@ class ScheduleTest(BaseTest):
self.first_inventory_source.source = 'ec2'
self.first_inventory_source.save()
inv_read = Permission.objects.create(
Permission.objects.create(
inventory = self.first_inventory,
user = self.other_django_user,
permission_type = 'read'
@ -123,7 +117,7 @@ class ScheduleTest(BaseTest):
def test_post_new_schedule(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
second_url = reverse('api:inventory_source_schedules_list', args=(self.second_inventory_source.pk,))
reverse('api:inventory_source_schedules_list', args=(self.second_inventory_source.pk,))
new_schedule = dict(name='newsched_1', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
@ -132,31 +126,31 @@ class ScheduleTest(BaseTest):
# Super user can post a new schedule
with self.current_user(self.super_django_user):
data = self.post(first_url, data=new_schedule, expect=201)
self.post(first_url, data=new_schedule, expect=201)
# #admin can post
# #admin can post
admin_schedule = dict(name='newsched_2', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
data = self.post(first_url, data=admin_schedule, expect=201, auth=self.get_normal_credentials())
self.post(first_url, data=admin_schedule, expect=201, auth=self.get_normal_credentials())
#normal user without write access can't post
unauth_schedule = dict(name='newsched_3', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
with self.current_user(self.other_django_user):
data = self.post(first_url, data=unauth_schedule, expect=403)
self.post(first_url, data=unauth_schedule, expect=403)
#give normal user write access and then they can post
inv_write = Permission.objects.create(
Permission.objects.create(
user = self.other_django_user,
inventory = self.first_inventory,
permission_type = PERM_INVENTORY_WRITE
)
auth_schedule = unauth_schedule
with self.current_user(self.other_django_user):
data = self.post(first_url, data=auth_schedule, expect=201)
self.post(first_url, data=auth_schedule, expect=201)
# another org user shouldn't be able to post a schedule to this org's schedule
diff_user_schedule = dict(name='newsched_4', description='newsched', enabled=True, rrule=GOOD_SCHEDULES[0])
with self.current_user(self.diff_org_user):
data = self.post(first_url, data=diff_user_schedule, expect=403)
self.post(first_url, data=diff_user_schedule, expect=403)
def test_post_schedule_to_non_cloud_source(self):
invalid_inv_url = reverse('api:inventory_source_schedules_list', args=(self.without_valid_source_inventory_source.pk,))
@ -190,9 +184,9 @@ class ScheduleTest(BaseTest):
long_schedule = dict(name='long_schedule', description='going for a long time', enabled=True, rrule=UNTIL_SCHEDULE)
with self.current_user(self.normal_django_user):
data = self.post(first_url, long_schedule, expect=201)
self.post(first_url, long_schedule, expect=201)
self.assertNotEquals(data['dtend'], None)
def test_schedule_filtering(self):
first_url = reverse('api:inventory_source_schedules_list', args=(self.first_inventory_source.pk,))
@ -200,14 +194,14 @@ class ScheduleTest(BaseTest):
dtstart_str = start_time.strftime("%Y%m%dT%H%M%SZ")
new_schedule = dict(name="filter_schedule_1", enabled=True, rrule="DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5" % dtstart_str)
with self.current_user(self.normal_django_user):
data = self.post(first_url, new_schedule, expect=201)
self.post(first_url, new_schedule, expect=201)
self.assertTrue(Schedule.objects.enabled().between(now(), now() + datetime.timedelta(minutes=10)).count(), 1)
start_time = now()
dtstart_str = start_time.strftime("%Y%m%dT%H%M%SZ")
new_schedule_middle = dict(name="runnable_schedule", enabled=True, rrule="DTSTART:%s RRULE:FREQ=MINUTELY;INTERVAL=10;COUNT=5" % dtstart_str)
with self.current_user(self.normal_django_user):
data = self.post(first_url, new_schedule_middle, expect=201)
self.post(first_url, new_schedule_middle, expect=201)
self.assertTrue(Schedule.objects.enabled().between(now() - datetime.timedelta(minutes=10), now() + datetime.timedelta(minutes=10)).count(), 1)
def test_rrule_validation(self):

View File

@ -4,18 +4,12 @@
# Python
import json
import os
import StringIO
import subprocess
import sys
import tempfile
import urlparse
# Django
from django.conf import settings
from django.utils.timezone import now
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
__all__ = ['InventoryScriptTest']

View File

@ -20,9 +20,8 @@ from django.utils.timezone import now
from crum import impersonate
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from awx.main.tasks import RunJob
TEST_PLAYBOOK = u'''
- name: test success
@ -724,7 +723,7 @@ class RunJobTest(BaseCeleryTest):
self.assertEqual(job.processed_hosts.count(), 1)
def test_update_has_active_failures_when_inventory_changes(self):
job = self.test_run_job_that_fails()
self.test_run_job_that_fails()
# Add host to new group (should set has_active_failures)
new_group = self.inventory.groups.create(name='new group')
self.assertFalse(new_group.has_active_failures)
@ -1363,7 +1362,7 @@ class RunJobTest(BaseCeleryTest):
try:
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result = proc.communicate()
proc.communicate()
has_proot = bool(proc.returncode == 0)
except (OSError, ValueError):
has_proot = False

View File

@ -3,19 +3,16 @@
# Python
import datetime
import json
import urllib
# Django
from django.conf import settings
from django.contrib.auth.models import User, Group
from django.db.models import Q
import django.test
from django.test.client import Client
from django.core.urlresolvers import reverse
# AWX
from awx.main.models import *
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
__all__ = ['AuthTokenProxyTest', 'UsersTest', 'LdapTest']
@ -95,7 +92,7 @@ class AuthTokenProxyTest(BaseTest):
# Verify we can access our own user information, from the remote address specified via HTTP_X_FORWARDED_FOR
client_kwargs = {'HTTP_X_FORWARDED_FOR': remote_addr_diff}
response = self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr, client_kwargs=client_kwargs)
self._get_me(expect=401, auth=auth_token, remote_addr=remote_addr_diff)
# should use ip address from other headers when HTTP_X_FORARDED_FOR is blank
@ -142,7 +139,6 @@ class UsersTest(BaseTest):
def test_only_super_user_can_use_superuser_flag(self):
url = reverse('api:user_list')
new_super_user = dict(username='nommy', is_superuser=True)
patch_new_super_user = dict(is_superuser=True)
self.post(url, expect=401, data=new_super_user, auth=self.get_invalid_credentials())
self.post(url, expect=403, data=new_super_user, auth=self.get_other_credentials())
self.post(url, expect=403, data=new_super_user, auth=self.get_normal_credentials())
@ -274,7 +270,7 @@ class UsersTest(BaseTest):
# if superuser, CAN change lastname and username and such
self.put(detail_url, data, expect=200, auth=self.get_super_credentials())
# and user can still login
creds = self.get_other_credentials()
creds = ('newUsername', creds[1])
@ -284,18 +280,18 @@ class UsersTest(BaseTest):
# and password is not stored as plaintext
data['password'] = 'newPassWord1234Changed'
changed = self.put(detail_url, data, expect=200, auth=creds)
self.put(detail_url, data, expect=200, auth=creds)
creds = (creds[0], data['password'])
self.get(detail_url, expect=200, auth=creds)
# make another nobody user, and make sure they can't send any edits
obj = User.objects.create(username='new_user')
obj.set_password('new_user')
obj.save()
hacked = dict(password='asdf')
changed = self.put(detail_url, hacked, expect=403, auth=('new_user', 'new_user'))
self.put(detail_url, hacked, expect=403, auth=('new_user', 'new_user'))
hacked = dict(username='asdf')
changed = self.put(detail_url, hacked, expect=403, auth=('new_user', 'new_user'))
self.put(detail_url, hacked, expect=403, auth=('new_user', 'new_user'))
# password is not stored in plaintext
self.assertTrue(User.objects.get(pk=self.normal_django_user.pk).password != data['password'])
@ -311,10 +307,10 @@ class UsersTest(BaseTest):
# verify that the login works...
self.get(url, expect=200, auth=('username', 'password'))
# but a regular user cannot
# but a regular user cannot
data = self.post(url, expect=403, data=data2, auth=self.get_other_credentials())
# a super user can also create new users
# a super user can also create new users
data = self.post(url, expect=201, data=data2, auth=self.get_super_credentials())
# verify that the login works
@ -325,7 +321,7 @@ class UsersTest(BaseTest):
data = self.post(url, expect=201, data=mod, auth=self.get_super_credentials())
orig = User.objects.get(pk=self.super_django_user.pk)
self.assertTrue(orig.username != 'change')
def test_password_not_shown_in_get_operations_for_list_or_detail(self):
url = reverse('api:user_detail', args=(self.super_django_user.pk,))
data = self.get(url, expect=200, auth=self.get_super_credentials())
@ -360,8 +356,8 @@ class UsersTest(BaseTest):
def test_super_user_can_delete_a_user_but_only_marked_inactive(self):
user_pk = self.normal_django_user.pk
url = reverse('api:user_detail', args=(user_pk,))
data = self.delete(url, expect=204, auth=self.get_super_credentials())
data = self.get(url, expect=404, auth=self.get_super_credentials())
self.delete(url, expect=204, auth=self.get_super_credentials())
self.get(url, expect=404, auth=self.get_super_credentials())
obj = User.objects.get(pk=user_pk)
self.assertEquals(obj.is_active, False)
@ -369,9 +365,9 @@ class UsersTest(BaseTest):
url1 = reverse('api:user_detail', args=(self.super_django_user.pk,))
url2 = reverse('api:user_detail', args=(self.normal_django_user.pk,))
url3 = reverse('api:user_detail', args=(self.other_django_user.pk,))
data = self.delete(url1, expect=403, auth=self.get_other_credentials())
data = self.delete(url2, expect=403, auth=self.get_other_credentials())
data = self.delete(url3, expect=403, auth=self.get_other_credentials())
self.delete(url1, expect=403, auth=self.get_other_credentials())
self.delete(url2, expect=403, auth=self.get_other_credentials())
self.delete(url3, expect=403, auth=self.get_other_credentials())
def test_there_exists_an_obvious_url_where_a_user_may_find_his_user_record(self):
url = reverse('api:user_me_list')
@ -393,8 +389,8 @@ class UsersTest(BaseTest):
data['username'] += '2'
data['first_name'] += ' Awesome'
data['last_name'] += ', Jr.'
response = self.put(url, data, expect=200,
auth=self.get_super_credentials())
self.put(url, data, expect=200,
auth=self.get_super_credentials())
# FIXME: Test if super user mark himself as no longer a super user, or
# delete himself.

View File

@ -17,5 +17,5 @@ ignore=E201,E203,E221,E225,E231,E241,E251,E261,E265,E302,E303,E501,W291,W391,W29
exclude=awx/lib/site-packages,awx/ui,awx/api/urls.py,awx/main/migrations,awx/main/tests/data
[flake8]
ignore=E201,E203,E221,E225,E231,E241,E251,E261,E265,E302,E303,E501,W291,W391,W293
ignore=E201,E203,E221,E225,E231,E241,E251,E261,E265,E302,E303,E501,W291,W391,W293,E731
exclude=awx/lib/site-packages,awx/ui,awx/api/urls.py,awx/main/migrations,awx/main/tests/data