mirror of
https://github.com/ansible/awx.git
synced 2026-05-15 05:17:36 -02:30
convert py2 -> py3
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import mock
|
||||
from unittest import mock
|
||||
import pytest
|
||||
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
from mock import PropertyMock
|
||||
from unittest import mock
|
||||
from unittest.mock import PropertyMock
|
||||
|
||||
# AWX
|
||||
from awx.api.serializers import (
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
# Python
|
||||
from collections import namedtuple
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
import json
|
||||
|
||||
from six.moves import xrange
|
||||
|
||||
# AWX
|
||||
from awx.api.serializers import (
|
||||
JobDetailSerializer,
|
||||
@@ -47,12 +45,12 @@ def job(mocker, job_template, project_update):
|
||||
|
||||
@pytest.fixture
|
||||
def labels(mocker):
|
||||
return [Label(id=x, name='label-%d' % x) for x in xrange(0, 25)]
|
||||
return [Label(id=x, name='label-%d' % x) for x in range(0, 25)]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def jobs(mocker):
|
||||
return [Job(id=x, name='job-%d' % x) for x in xrange(0, 25)]
|
||||
return [Job(id=x, name='job-%d' % x) for x in range(0, 25)]
|
||||
|
||||
|
||||
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
|
||||
from six.moves import xrange
|
||||
from unittest import mock
|
||||
|
||||
# AWX
|
||||
from awx.api.serializers import (
|
||||
@@ -41,7 +39,7 @@ def job(mocker, job_template):
|
||||
|
||||
@pytest.fixture
|
||||
def jobs(mocker):
|
||||
return [Job(id=x, name='job-%d' % x) for x in xrange(0, 25)]
|
||||
return [Job(id=x, name='job-%d' % x) for x in range(0, 25)]
|
||||
|
||||
|
||||
@mock.patch('awx.api.serializers.UnifiedJobTemplateSerializer.get_related', lambda x,y: {})
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
# AWX
|
||||
from awx.api.serializers import (
|
||||
|
||||
@@ -66,7 +66,7 @@ def test_invalid_field():
|
||||
field_lookup = FieldLookupBackend()
|
||||
with pytest.raises(ValueError) as excinfo:
|
||||
field_lookup.value_to_python(WorkflowJobTemplate, invalid_field, 'foo')
|
||||
assert 'is not an allowed field name. Must be ascii encodable.' in excinfo.value.message
|
||||
assert 'is not an allowed field name. Must be ascii encodable.' in str(excinfo.value)
|
||||
|
||||
|
||||
@pytest.mark.parametrize('lookup_suffix', ['', 'contains', 'startswith', 'in'])
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
# DRF
|
||||
from rest_framework import status
|
||||
@@ -12,7 +12,6 @@ from rest_framework.exceptions import PermissionDenied
|
||||
from awx.api.generics import (
|
||||
ParentMixin,
|
||||
SubListCreateAttachDetachAPIView, SubListAttachDetachAPIView,
|
||||
DeleteLastUnattachLabelMixin,
|
||||
ResourceAccessList,
|
||||
ListAPIView
|
||||
)
|
||||
@@ -30,13 +29,6 @@ def get_object_or_400(mocker):
|
||||
return mocker.patch('awx.api.generics.get_object_or_400')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_response_new(mocker):
|
||||
m = mocker.patch('awx.api.generics.Response.__new__')
|
||||
m.return_value = m
|
||||
return m
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_organization():
|
||||
return Organization(pk=4, name="Unsaved Org")
|
||||
@@ -76,33 +68,26 @@ class TestSubListCreateAttachDetachAPIView:
|
||||
|
||||
assert type(res) is Response
|
||||
|
||||
def test_attach_create_and_associate(self, mocker, get_object_or_400, parent_relationship_factory, mock_response_new):
|
||||
def test_attach_create_and_associate(self, mocker, get_object_or_400, parent_relationship_factory):
|
||||
(serializer, mock_parent_relationship) = parent_relationship_factory(SubListCreateAttachDetachAPIView, 'wife')
|
||||
create_return_value = mocker.MagicMock(status_code=status.HTTP_201_CREATED)
|
||||
serializer.create = mocker.Mock(return_value=create_return_value)
|
||||
|
||||
mock_request = mocker.MagicMock(data=dict())
|
||||
ret = serializer.attach(mock_request, None, None)
|
||||
serializer.attach(mock_request, None, None)
|
||||
|
||||
assert ret == mock_response_new
|
||||
serializer.create.assert_called_with(mock_request, None, None)
|
||||
mock_parent_relationship.wife.add.assert_called_with(get_object_or_400.return_value)
|
||||
mock_response_new.assert_called_with(
|
||||
Response, create_return_value.data, status=status.HTTP_201_CREATED,
|
||||
headers={'Location': create_return_value['Location']}
|
||||
)
|
||||
|
||||
def test_attach_associate_only(self, mocker, get_object_or_400, parent_relationship_factory, mock_response_new):
|
||||
def test_attach_associate_only(self, mocker, get_object_or_400, parent_relationship_factory):
|
||||
(serializer, mock_parent_relationship) = parent_relationship_factory(SubListCreateAttachDetachAPIView, 'wife')
|
||||
serializer.create = mocker.Mock(return_value=mocker.MagicMock())
|
||||
|
||||
mock_request = mocker.MagicMock(data=dict(id=1))
|
||||
ret = serializer.attach(mock_request, None, None)
|
||||
serializer.attach(mock_request, None, None)
|
||||
|
||||
assert ret == mock_response_new
|
||||
serializer.create.assert_not_called()
|
||||
mock_parent_relationship.wife.add.assert_called_with(get_object_or_400.return_value)
|
||||
mock_response_new.assert_called_with(Response, status=status.HTTP_204_NO_CONTENT)
|
||||
|
||||
def test_unattach_validate_ok(self, mocker):
|
||||
mock_request = mocker.MagicMock(data=dict(id=1))
|
||||
@@ -183,44 +168,6 @@ def test_attach_detatch_only(mocker):
|
||||
assert 'field is missing' in resp.data['msg']
|
||||
|
||||
|
||||
class TestDeleteLastUnattachLabelMixin:
|
||||
@mock.patch('__builtin__.super')
|
||||
def test_unattach_ok(self, super, mocker):
|
||||
mock_request = mocker.MagicMock()
|
||||
mock_sub_id = mocker.MagicMock()
|
||||
super.return_value = super
|
||||
super.unattach_validate = mocker.MagicMock(return_value=(mock_sub_id, None))
|
||||
super.unattach_by_id = mocker.MagicMock()
|
||||
|
||||
mock_model = mocker.MagicMock()
|
||||
mock_model.objects.get.return_value = mock_model
|
||||
mock_model.is_detached.return_value = True
|
||||
|
||||
view = DeleteLastUnattachLabelMixin()
|
||||
view.model = mock_model
|
||||
|
||||
view.unattach(mock_request, None, None)
|
||||
|
||||
super.unattach_validate.assert_called_with(mock_request)
|
||||
super.unattach_by_id.assert_called_with(mock_request, mock_sub_id)
|
||||
mock_model.is_detached.assert_called_with()
|
||||
mock_model.objects.get.assert_called_with(id=mock_sub_id)
|
||||
mock_model.delete.assert_called_with()
|
||||
|
||||
@mock.patch('__builtin__.super')
|
||||
def test_unattach_fail(self, super, mocker):
|
||||
mock_request = mocker.MagicMock()
|
||||
mock_response = mocker.MagicMock()
|
||||
super.return_value = super
|
||||
super.unattach_validate = mocker.MagicMock(return_value=(None, mock_response))
|
||||
view = DeleteLastUnattachLabelMixin()
|
||||
|
||||
res = view.unattach(mock_request, None, None)
|
||||
|
||||
super.unattach_validate.assert_called_with(mock_request)
|
||||
assert mock_response == res
|
||||
|
||||
|
||||
class TestParentMixin:
|
||||
def test_get_parent_object(self, mocker, get_object_or_404):
|
||||
parent_mixin = ParentMixin()
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
import StringIO
|
||||
from io import StringIO
|
||||
|
||||
# AWX
|
||||
from awx.api.parsers import JSONParser
|
||||
@@ -16,14 +16,14 @@ from rest_framework.exceptions import ParseError
|
||||
]
|
||||
)
|
||||
def test_jsonparser_valid_input(input_, output):
|
||||
input_stream = StringIO.StringIO(input_)
|
||||
input_stream = StringIO(input_)
|
||||
assert JSONParser().parse(input_stream) == output
|
||||
input_stream.close()
|
||||
|
||||
|
||||
@pytest.mark.parametrize('invalid_input', ['1', '"foobar"', '3.14', '{"foo": "bar",}'])
|
||||
def test_json_parser_invalid_input(invalid_input):
|
||||
input_stream = StringIO.StringIO(invalid_input)
|
||||
input_stream = StringIO(invalid_input)
|
||||
with pytest.raises(ParseError):
|
||||
JSONParser().parse(input_stream)
|
||||
input_stream.close()
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import re
|
||||
|
||||
import mock
|
||||
import pytest
|
||||
import requests
|
||||
from copy import deepcopy
|
||||
from unittest import mock
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
@@ -24,13 +23,6 @@ from awx.main.views import handle_error
|
||||
from rest_framework.test import APIRequestFactory
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_response_new(mocker):
|
||||
m = mocker.patch('awx.api.views.Response.__new__')
|
||||
m.return_value = m
|
||||
return m
|
||||
|
||||
|
||||
def test_handle_error():
|
||||
# Assure that templating of error does not raise errors
|
||||
request = APIRequestFactory().get('/fooooo/')
|
||||
@@ -38,7 +30,7 @@ def test_handle_error():
|
||||
|
||||
|
||||
class TestApiRootView:
|
||||
def test_get_endpoints(self, mocker, mock_response_new):
|
||||
def test_get_endpoints(self, mocker):
|
||||
endpoints = [
|
||||
'ping',
|
||||
'config',
|
||||
@@ -72,11 +64,9 @@ class TestApiRootView:
|
||||
]
|
||||
view = ApiVersionRootView()
|
||||
ret = view.get(mocker.MagicMock())
|
||||
|
||||
assert ret == mock_response_new
|
||||
data_arg = mock_response_new.mock_calls[0][1][1]
|
||||
assert ret.status_code == 200
|
||||
for endpoint in endpoints:
|
||||
assert endpoint in data_arg
|
||||
assert endpoint in ret.data
|
||||
|
||||
|
||||
class TestJobTemplateLabelList:
|
||||
|
||||
@@ -20,8 +20,8 @@ class TestInvalidOptions:
|
||||
cmd = Command()
|
||||
with pytest.raises(CommandError) as err:
|
||||
cmd.handle()
|
||||
assert 'inventory-id' in err.value.message
|
||||
assert 'required' in err.value.message
|
||||
assert 'inventory-id' in str(err.value)
|
||||
assert 'required' in str(err.value)
|
||||
|
||||
def test_invalid_options_name_and_id(self):
|
||||
# You can not specify both name and if of the inventory
|
||||
@@ -30,8 +30,8 @@ class TestInvalidOptions:
|
||||
cmd.handle(
|
||||
inventory_id=42, inventory_name='my-inventory'
|
||||
)
|
||||
assert 'inventory-id' in err.value.message
|
||||
assert 'exclusive' in err.value.message
|
||||
assert 'inventory-id' in str(err.value)
|
||||
assert 'exclusive' in str(err.value)
|
||||
|
||||
def test_invalid_options_id_and_keep_vars(self):
|
||||
# You can't overwrite and keep_vars at the same time, that wouldn't make sense
|
||||
@@ -40,8 +40,8 @@ class TestInvalidOptions:
|
||||
cmd.handle(
|
||||
inventory_id=42, overwrite=True, keep_vars=True
|
||||
)
|
||||
assert 'overwrite-vars' in err.value.message
|
||||
assert 'exclusive' in err.value.message
|
||||
assert 'overwrite-vars' in str(err.value)
|
||||
assert 'exclusive' in str(err.value)
|
||||
|
||||
def test_invalid_options_id_but_no_source(self):
|
||||
# Need a source to import
|
||||
@@ -50,19 +50,19 @@ class TestInvalidOptions:
|
||||
cmd.handle(
|
||||
inventory_id=42, overwrite=True, keep_vars=True
|
||||
)
|
||||
assert 'overwrite-vars' in err.value.message
|
||||
assert 'exclusive' in err.value.message
|
||||
assert 'overwrite-vars' in str(err.value)
|
||||
assert 'exclusive' in str(err.value)
|
||||
with pytest.raises(CommandError) as err:
|
||||
cmd.handle(
|
||||
inventory_id=42, overwrite_vars=True, keep_vars=True
|
||||
)
|
||||
assert 'overwrite-vars' in err.value.message
|
||||
assert 'exclusive' in err.value.message
|
||||
assert 'overwrite-vars' in str(err.value)
|
||||
assert 'exclusive' in str(err.value)
|
||||
|
||||
def test_invalid_options_missing_source(self):
|
||||
cmd = Command()
|
||||
with pytest.raises(CommandError) as err:
|
||||
cmd.handle(inventory_id=42)
|
||||
assert '--source' in err.value.message
|
||||
assert 'required' in err.value.message
|
||||
assert '--source' in str(err.value)
|
||||
assert 'required' in str(err.value)
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
from datetime import timedelta
|
||||
|
||||
# Django
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import pytest
|
||||
import logging
|
||||
|
||||
from mock import PropertyMock
|
||||
from unittest.mock import PropertyMock
|
||||
|
||||
from awx.api.urls import urlpatterns as api_patterns
|
||||
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import StringIO
|
||||
import mock
|
||||
import os
|
||||
import pytest
|
||||
import re
|
||||
@@ -10,10 +8,13 @@ import stat
|
||||
import tempfile
|
||||
import time
|
||||
from collections import OrderedDict
|
||||
from io import StringIO
|
||||
from unittest import mock
|
||||
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from django.utils.encoding import smart_str, smart_bytes
|
||||
|
||||
from awx.main.expect import run, isolated_manager
|
||||
|
||||
@@ -32,11 +33,11 @@ def rsa_key(request):
|
||||
backend=default_backend()
|
||||
)
|
||||
return (
|
||||
key.private_bytes(
|
||||
smart_str(key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=serialization.BestAvailableEncryption(passphrase)
|
||||
),
|
||||
encryption_algorithm=serialization.BestAvailableEncryption(smart_bytes(passphrase))
|
||||
)),
|
||||
passphrase
|
||||
)
|
||||
|
||||
@@ -58,7 +59,7 @@ def mock_sleep(request):
|
||||
|
||||
|
||||
def test_simple_spawn():
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
status, rc = run.run_pexpect(
|
||||
['ls', '-la'],
|
||||
HERE,
|
||||
@@ -72,7 +73,7 @@ def test_simple_spawn():
|
||||
|
||||
|
||||
def test_error_rc():
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
status, rc = run.run_pexpect(
|
||||
['ls', '-nonsense'],
|
||||
HERE,
|
||||
@@ -86,7 +87,7 @@ def test_error_rc():
|
||||
|
||||
|
||||
def test_cancel_callback_error():
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
|
||||
def bad_callback():
|
||||
raise Exception('unique exception')
|
||||
@@ -108,7 +109,7 @@ def test_cancel_callback_error():
|
||||
@pytest.mark.timeout(3) # https://github.com/ansible/tower/issues/2391#issuecomment-401946895
|
||||
@pytest.mark.parametrize('value', ['abc123', six.u('Iñtërnâtiônàlizætiøn')])
|
||||
def test_env_vars(value):
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
status, rc = run.run_pexpect(
|
||||
['python', '-c', 'import os; print os.getenv("X_MY_ENV")'],
|
||||
HERE,
|
||||
@@ -122,7 +123,7 @@ def test_env_vars(value):
|
||||
|
||||
|
||||
def test_password_prompt():
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
expect_passwords = OrderedDict()
|
||||
expect_passwords[re.compile(r'Password:\s*?$', re.M)] = 'secret123'
|
||||
status, rc = run.run_pexpect(
|
||||
@@ -139,7 +140,7 @@ def test_password_prompt():
|
||||
|
||||
|
||||
def test_job_timeout():
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
extra_update_fields={}
|
||||
status, rc = run.run_pexpect(
|
||||
['python', '-c', 'import time; time.sleep(5)'],
|
||||
@@ -156,7 +157,7 @@ def test_job_timeout():
|
||||
|
||||
|
||||
def test_manual_cancellation():
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
status, rc = run.run_pexpect(
|
||||
['python', '-c', 'print raw_input("Password: ")'],
|
||||
HERE,
|
||||
@@ -172,7 +173,7 @@ def test_manual_cancellation():
|
||||
def test_build_isolated_job_data(private_data_dir, rsa_key):
|
||||
pem, passphrase = rsa_key
|
||||
mgr = isolated_manager.IsolatedManager(
|
||||
['ls', '-la'], HERE, {}, StringIO.StringIO(), ''
|
||||
['ls', '-la'], HERE, {}, StringIO(), ''
|
||||
)
|
||||
mgr.private_data_dir = private_data_dir
|
||||
mgr.build_isolated_job_data()
|
||||
@@ -209,7 +210,7 @@ def test_run_isolated_job(private_data_dir, rsa_key):
|
||||
env = {'JOB_ID': '1'}
|
||||
pem, passphrase = rsa_key
|
||||
mgr = isolated_manager.IsolatedManager(
|
||||
['ls', '-la'], HERE, env, StringIO.StringIO(), ''
|
||||
['ls', '-la'], HERE, env, StringIO(), ''
|
||||
)
|
||||
mgr.private_data_dir = private_data_dir
|
||||
secrets = {
|
||||
@@ -220,7 +221,7 @@ def test_run_isolated_job(private_data_dir, rsa_key):
|
||||
'ssh_key_data': pem
|
||||
}
|
||||
mgr.build_isolated_job_data()
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
# Mock environment variables for callback module
|
||||
with mock.patch('os.getenv') as env_mock:
|
||||
env_mock.return_value = '/path/to/awx/lib'
|
||||
@@ -239,7 +240,7 @@ def test_run_isolated_adhoc_command(private_data_dir, rsa_key):
|
||||
env = {'AD_HOC_COMMAND_ID': '1'}
|
||||
pem, passphrase = rsa_key
|
||||
mgr = isolated_manager.IsolatedManager(
|
||||
['pwd'], HERE, env, StringIO.StringIO(), ''
|
||||
['pwd'], HERE, env, StringIO(), ''
|
||||
)
|
||||
mgr.private_data_dir = private_data_dir
|
||||
secrets = {
|
||||
@@ -250,7 +251,7 @@ def test_run_isolated_adhoc_command(private_data_dir, rsa_key):
|
||||
'ssh_key_data': pem
|
||||
}
|
||||
mgr.build_isolated_job_data()
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
# Mock environment variables for callback module
|
||||
with mock.patch('os.getenv') as env_mock:
|
||||
env_mock.return_value = '/path/to/awx/lib'
|
||||
@@ -270,7 +271,7 @@ def test_run_isolated_adhoc_command(private_data_dir, rsa_key):
|
||||
|
||||
def test_check_isolated_job(private_data_dir, rsa_key):
|
||||
pem, passphrase = rsa_key
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
mgr = isolated_manager.IsolatedManager(['ls', '-la'], HERE, {}, stdout, '')
|
||||
mgr.private_data_dir = private_data_dir
|
||||
mgr.instance = mock.Mock(id=123, pk=123, verbosity=5, spec_set=['id', 'pk', 'verbosity'])
|
||||
@@ -318,51 +319,9 @@ def test_check_isolated_job(private_data_dir, rsa_key):
|
||||
)
|
||||
|
||||
|
||||
def test_check_isolated_job_with_multibyte_unicode(private_data_dir):
|
||||
"""
|
||||
Ensure that multibyte unicode is properly synced when stdout only
|
||||
contains the first part of the multibyte character
|
||||
|
||||
see: https://github.com/ansible/tower/issues/2315
|
||||
"""
|
||||
def raw_output():
|
||||
yield ('failed', '\xe8\xb5\xb7\xe5') # 起 <partial byte>
|
||||
yield ('successful', '\xe8\xb5\xb7\xe5\x8b\x95') # 起動
|
||||
raw_output = raw_output()
|
||||
stdout = StringIO.StringIO()
|
||||
mgr = isolated_manager.IsolatedManager(['ls', '-la'], HERE, {}, stdout, '')
|
||||
mgr.private_data_dir = private_data_dir
|
||||
mgr.instance = mock.Mock(id=123, pk=123, verbosity=5, spec_set=['id', 'pk', 'verbosity'])
|
||||
mgr.started_at = time.time()
|
||||
mgr.host = 'isolated-host'
|
||||
|
||||
os.mkdir(os.path.join(private_data_dir, 'artifacts'))
|
||||
with mock.patch('awx.main.expect.run.run_pexpect') as run_pexpect:
|
||||
|
||||
def _synchronize_job_artifacts(args, cwd, env, buff, **kw):
|
||||
buff.write('checking job status...')
|
||||
status, out = next(raw_output)
|
||||
for filename, data in (
|
||||
['status', status],
|
||||
['rc', '0'],
|
||||
['stdout', out]
|
||||
):
|
||||
with open(os.path.join(private_data_dir, 'artifacts', filename), 'w') as f:
|
||||
f.write(data)
|
||||
f.flush()
|
||||
return (status, 0)
|
||||
|
||||
run_pexpect.side_effect = _synchronize_job_artifacts
|
||||
with mock.patch.object(mgr, '_missing_artifacts') as missing_artifacts:
|
||||
missing_artifacts.return_value = False
|
||||
status, rc = mgr.check(interval=0)
|
||||
|
||||
assert stdout.getvalue() == '起動'
|
||||
|
||||
|
||||
def test_check_isolated_job_timeout(private_data_dir, rsa_key):
|
||||
pem, passphrase = rsa_key
|
||||
stdout = StringIO.StringIO()
|
||||
stdout = StringIO()
|
||||
extra_update_fields = {}
|
||||
mgr = isolated_manager.IsolatedManager(['ls', '-la'], HERE, {}, stdout, '',
|
||||
job_timeout=1,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from datetime import datetime
|
||||
from django.utils.timezone import utc
|
||||
import mock
|
||||
from unittest import mock
|
||||
import pytest
|
||||
|
||||
from awx.main.models import (JobEvent, ProjectUpdateEvent, AdHocCommandEvent,
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import pytest
|
||||
import mock
|
||||
from mock import Mock
|
||||
from unittest import mock
|
||||
from unittest.mock import Mock
|
||||
|
||||
from awx.main.models import (
|
||||
Job,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
import json
|
||||
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
@@ -3,7 +3,7 @@ import pytest
|
||||
# AWX
|
||||
from awx.main.models.jobs import JobTemplate
|
||||
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
|
||||
def test_missing_project_error(job_template_factory):
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
from awx.main.models.label import Label
|
||||
from awx.main.models.unified_jobs import UnifiedJobTemplate, UnifiedJob
|
||||
|
||||
@@ -85,6 +85,7 @@ def job(mocker):
|
||||
'pk': 1, 'job_template.pk': 1, 'job_template.name': '',
|
||||
'created_by.pk': 1, 'created_by.username': 'admin',
|
||||
'launch_type': 'manual',
|
||||
'verbosity': 1,
|
||||
'awx_meta_vars.return_value': {},
|
||||
'inventory.get_script_data.return_value': {}})
|
||||
ret.project = mocker.MagicMock(scm_revision='asdf1234')
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
from awx.main.models import (
|
||||
UnifiedJob,
|
||||
|
||||
@@ -6,7 +6,7 @@ from awx.main.models.workflow import (
|
||||
WorkflowJobTemplate, WorkflowJobTemplateNode, WorkflowJobOptions,
|
||||
WorkflowJob, WorkflowJobNode
|
||||
)
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -236,4 +236,4 @@ class TestWorkflowJobNodeJobKWARGS:
|
||||
|
||||
|
||||
def test_get_ask_mapping_integrity():
|
||||
assert WorkflowJobTemplate.get_ask_mapping().keys() == ['extra_vars', 'inventory']
|
||||
assert list(WorkflowJobTemplate.get_ask_mapping().keys()) == ['extra_vars', 'inventory']
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import mock
|
||||
import json
|
||||
|
||||
from unittest import mock
|
||||
from django.core.mail.message import EmailMessage
|
||||
|
||||
import awx.main.notifications.rocketchat_backend as rocketchat_backend
|
||||
@@ -6,6 +8,7 @@ import awx.main.notifications.rocketchat_backend as rocketchat_backend
|
||||
|
||||
def test_send_messages():
|
||||
with mock.patch('awx.main.notifications.rocketchat_backend.requests') as requests_mock:
|
||||
requests_mock.post.return_value.status_code = 201
|
||||
backend = rocketchat_backend.RocketChatBackend()
|
||||
message = EmailMessage('test subject', 'test body', [], ['http://example.com', ])
|
||||
sent_messages = backend.send_messages([message, ])
|
||||
@@ -15,24 +18,41 @@ def test_send_messages():
|
||||
|
||||
def test_send_messages_with_username():
|
||||
with mock.patch('awx.main.notifications.rocketchat_backend.requests') as requests_mock:
|
||||
requests_mock.post.return_value.status_code = 201
|
||||
backend = rocketchat_backend.RocketChatBackend(rocketchat_username='testuser')
|
||||
message = EmailMessage('test subject', 'test body', [], ['http://example.com', ])
|
||||
sent_messages = backend.send_messages([message, ])
|
||||
requests_mock.post.assert_called_once_with('http://example.com', data='{"username": "testuser", "text": "test subject"}', verify=True)
|
||||
|
||||
calls = requests_mock.post.call_args_list
|
||||
assert len(calls) == 1
|
||||
args, kwargs = calls[0]
|
||||
assert args[0] == 'http://example.com'
|
||||
assert json.loads(kwargs['data'])['text'] == 'test subject'
|
||||
assert json.loads(kwargs['data'])['username'] == 'testuser'
|
||||
assert kwargs['verify'] is True
|
||||
assert sent_messages == 1
|
||||
|
||||
|
||||
def test_send_messages_with_icon_url():
|
||||
with mock.patch('awx.main.notifications.rocketchat_backend.requests') as requests_mock:
|
||||
requests_mock.post.return_value.status_code = 201
|
||||
backend = rocketchat_backend.RocketChatBackend(rocketchat_icon_url='http://example.com')
|
||||
message = EmailMessage('test subject', 'test body', [], ['http://example.com', ])
|
||||
sent_messages = backend.send_messages([message, ])
|
||||
requests_mock.post.assert_called_once_with('http://example.com', data='{"text": "test subject", "icon_url": "http://example.com"}', verify=True)
|
||||
|
||||
calls = requests_mock.post.call_args_list
|
||||
assert len(calls) == 1
|
||||
args, kwargs = calls[0]
|
||||
assert args[0] == 'http://example.com'
|
||||
assert json.loads(kwargs['data'])['text'] == 'test subject'
|
||||
assert json.loads(kwargs['data'])['icon_url'] == 'http://example.com'
|
||||
assert kwargs['verify'] is True
|
||||
assert sent_messages == 1
|
||||
|
||||
|
||||
def test_send_messages_with_no_verify_ssl():
|
||||
with mock.patch('awx.main.notifications.rocketchat_backend.requests') as requests_mock:
|
||||
requests_mock.post.return_value.status_code = 201
|
||||
backend = rocketchat_backend.RocketChatBackend(rocketchat_no_verify_ssl=True)
|
||||
message = EmailMessage('test subject', 'test body', [], ['http://example.com', ])
|
||||
sent_messages = backend.send_messages([message, ])
|
||||
|
||||
@@ -14,7 +14,8 @@ def node_generator():
|
||||
def simple_cycle_1(node_generator):
|
||||
g = SimpleDAG()
|
||||
nodes = [node_generator() for i in range(4)]
|
||||
map(lambda n: g.add_node(n), nodes)
|
||||
for n in nodes:
|
||||
g.add_node(n)
|
||||
|
||||
r'''
|
||||
0
|
||||
|
||||
@@ -36,7 +36,8 @@ def wf_node_generator(mocker):
|
||||
def workflow_dag_1(wf_node_generator):
|
||||
g = WorkflowDAG()
|
||||
nodes = [wf_node_generator() for i in range(4)]
|
||||
map(lambda n: g.add_node(n), nodes)
|
||||
for n in nodes:
|
||||
g.add_node(n)
|
||||
|
||||
r'''
|
||||
0
|
||||
@@ -67,7 +68,8 @@ class TestWorkflowDAG():
|
||||
wf_root_nodes = [wf_node_generator() for i in range(0, 10)]
|
||||
wf_leaf_nodes = [wf_node_generator() for i in range(0, 10)]
|
||||
|
||||
map(lambda n: g.add_node(n), wf_root_nodes + wf_leaf_nodes)
|
||||
for n in wf_root_nodes + wf_leaf_nodes:
|
||||
g.add_node(n)
|
||||
|
||||
'''
|
||||
Pair up a root node with a single child via an edge
|
||||
@@ -77,7 +79,8 @@ class TestWorkflowDAG():
|
||||
| | |
|
||||
C1 C2 Cx
|
||||
'''
|
||||
map(lambda (i, n): g.add_edge(wf_root_nodes[i], n, 'label'), enumerate(wf_leaf_nodes))
|
||||
for i, n in enumerate(wf_leaf_nodes):
|
||||
g.add_edge(wf_root_nodes[i], n, 'label')
|
||||
return (g, wf_root_nodes, wf_leaf_nodes)
|
||||
|
||||
|
||||
@@ -185,7 +188,8 @@ class TestIsWorkflowDone():
|
||||
def workflow_dag_canceled(self, wf_node_generator):
|
||||
g = WorkflowDAG()
|
||||
nodes = [wf_node_generator() for i in range(1)]
|
||||
map(lambda n: g.add_node(n), nodes)
|
||||
for n in nodes:
|
||||
g.add_node(n)
|
||||
r'''
|
||||
F0
|
||||
'''
|
||||
@@ -252,7 +256,8 @@ class TestBFSNodesToRun():
|
||||
def workflow_dag_canceled(self, wf_node_generator):
|
||||
g = WorkflowDAG()
|
||||
nodes = [wf_node_generator() for i in range(4)]
|
||||
map(lambda n: g.add_node(n), nodes)
|
||||
for n in nodes:
|
||||
g.add_node(n)
|
||||
r'''
|
||||
C0
|
||||
/ | \
|
||||
@@ -279,7 +284,8 @@ class TestDocsExample():
|
||||
def complex_dag(self, wf_node_generator):
|
||||
g = WorkflowDAG()
|
||||
nodes = [wf_node_generator() for i in range(10)]
|
||||
map(lambda n: g.add_node(n), nodes)
|
||||
for n in nodes:
|
||||
g.add_node(n)
|
||||
|
||||
g.add_edge(nodes[0], nodes[1], "failure_nodes")
|
||||
g.add_edge(nodes[0], nodes[2], "success_nodes")
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.forms.models import model_to_dict
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
# Copyright (c) 2016 Ansible, Inc.
|
||||
|
||||
# Python
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
# AWX
|
||||
from awx.main.ha import is_ha_environment
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
from contextlib import contextmanager
|
||||
from datetime import datetime
|
||||
from functools import partial
|
||||
import ConfigParser
|
||||
import configparser
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
@@ -12,7 +12,7 @@ import tempfile
|
||||
|
||||
from backports.tempfile import TemporaryDirectory
|
||||
import fcntl
|
||||
import mock
|
||||
from unittest import mock
|
||||
import pytest
|
||||
import six
|
||||
import yaml
|
||||
@@ -427,11 +427,11 @@ class TestExtraVarSanitation(TestJobExecution):
|
||||
class TestGenericRun(TestJobExecution):
|
||||
|
||||
def test_generic_failure(self):
|
||||
self.task.build_private_data_files = mock.Mock(side_effect=IOError())
|
||||
self.task.build_private_data_files = mock.Mock(side_effect=OSError())
|
||||
with pytest.raises(Exception):
|
||||
self.task.run(self.pk)
|
||||
update_model_call = self.task.update_model.call_args[1]
|
||||
assert 'IOError' in update_model_call['result_traceback']
|
||||
assert 'OSError' in update_model_call['result_traceback']
|
||||
assert update_model_call['status'] == 'error'
|
||||
assert update_model_call['emitted_events'] == 0
|
||||
|
||||
@@ -1131,7 +1131,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
shade_config = open(env['OS_CLIENT_CONFIG_FILE'], 'rb').read()
|
||||
shade_config = open(env['OS_CLIENT_CONFIG_FILE'], 'r').read()
|
||||
assert shade_config == '\n'.join([
|
||||
'clouds:',
|
||||
' devstack:',
|
||||
@@ -1167,7 +1167,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(env['OVIRT_INI_PATH'])
|
||||
assert config.get('ovirt', 'ovirt_url') == 'some-ovirt-host.example.org'
|
||||
assert config.get('ovirt', 'ovirt_username') == 'bob'
|
||||
@@ -1175,7 +1175,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
if ca_file:
|
||||
assert config.get('ovirt', 'ovirt_ca_file') == ca_file
|
||||
else:
|
||||
with pytest.raises(ConfigParser.NoOptionError):
|
||||
with pytest.raises(configparser.NoOptionError):
|
||||
config.get('ovirt', 'ovirt_ca_file')
|
||||
return ['successful', 0]
|
||||
|
||||
@@ -1209,7 +1209,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
assert env['ANSIBLE_NET_AUTHORIZE'] == expected_authorize
|
||||
if authorize:
|
||||
assert env['ANSIBLE_NET_AUTH_PASS'] == 'authorizeme'
|
||||
assert open(env['ANSIBLE_NET_SSH_KEYFILE'], 'rb').read() == self.EXAMPLE_PRIVATE_KEY
|
||||
assert open(env['ANSIBLE_NET_SSH_KEYFILE'], 'r').read() == self.EXAMPLE_PRIVATE_KEY
|
||||
return ['successful', 0]
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
@@ -1549,7 +1549,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
assert open(env['MY_CLOUD_INI_FILE'], 'rb').read() == '[mycloud]\nABC123'
|
||||
assert open(env['MY_CLOUD_INI_FILE'], 'r').read() == '[mycloud]\nABC123'
|
||||
return ['successful', 0]
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
@@ -1576,7 +1576,7 @@ class TestJobCredentials(TestJobExecution):
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
assert open(env['MY_CLOUD_INI_FILE'], 'rb').read() == value.encode('utf-8')
|
||||
assert open(env['MY_CLOUD_INI_FILE'], 'r').read() == value
|
||||
return ['successful', 0]
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
@@ -1619,8 +1619,8 @@ class TestJobCredentials(TestJobExecution):
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
assert open(env['MY_CERT_INI_FILE'], 'rb').read() == '[mycert]\nCERT123'
|
||||
assert open(env['MY_KEY_INI_FILE'], 'rb').read() == '[mykey]\nKEY123'
|
||||
assert open(env['MY_CERT_INI_FILE'], 'r').read() == '[mycert]\nCERT123'
|
||||
assert open(env['MY_KEY_INI_FILE'], 'r').read() == '[mykey]\nKEY123'
|
||||
return ['successful', 0]
|
||||
|
||||
self.run_pexpect.side_effect = run_pexpect_side_effect
|
||||
@@ -1820,7 +1820,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
assert 'AWS_SECRET_ACCESS_KEY' not in env
|
||||
assert 'EC2_INI_PATH' in env
|
||||
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(env['EC2_INI_PATH'])
|
||||
assert 'ec2' in config.sections()
|
||||
return ['successful', 0]
|
||||
@@ -1895,7 +1895,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
assert env['AWS_SECRET_ACCESS_KEY'] == 'secret'
|
||||
assert 'EC2_INI_PATH' in env
|
||||
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(env['EC2_INI_PATH'])
|
||||
assert 'ec2' in config.sections()
|
||||
return ['successful', 0]
|
||||
@@ -1921,7 +1921,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(env['VMWARE_INI_PATH'])
|
||||
assert config.get('vmware', 'username') == 'bob'
|
||||
assert config.get('vmware', 'password') == 'secret'
|
||||
@@ -1963,7 +1963,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
assert env['AZURE_SUBSCRIPTION_ID'] == 'some-subscription'
|
||||
assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
|
||||
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(env['AZURE_INI_PATH'])
|
||||
assert config.get('azure', 'include_powerstate') == 'yes'
|
||||
assert config.get('azure', 'group_by_resource_group') == 'no'
|
||||
@@ -2008,7 +2008,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
assert env['AZURE_PASSWORD'] == 'secret'
|
||||
assert env['AZURE_CLOUD_ENVIRONMENT'] == 'foobar'
|
||||
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(env['AZURE_INI_PATH'])
|
||||
assert config.get('azure', 'include_powerstate') == 'yes'
|
||||
assert config.get('azure', 'group_by_resource_group') == 'no'
|
||||
@@ -2054,7 +2054,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
assert json_data['client_email'] == 'bob'
|
||||
assert json_data['project_id'] == 'some-project'
|
||||
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(env['GCE_INI_PATH'])
|
||||
assert 'cache' in config.sections()
|
||||
assert config.getint('cache', 'cache_max_age') == 0
|
||||
@@ -2091,7 +2091,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
shade_config = open(env['OS_CLIENT_CONFIG_FILE'], 'rb').read()
|
||||
shade_config = open(env['OS_CLIENT_CONFIG_FILE'], 'r').read()
|
||||
assert '\n'.join([
|
||||
'clouds:',
|
||||
' devstack:',
|
||||
@@ -2131,7 +2131,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(env['FOREMAN_INI_PATH'])
|
||||
assert config.get('foreman', 'url') == 'https://example.org'
|
||||
assert config.get('foreman', 'user') == 'bob'
|
||||
@@ -2168,7 +2168,7 @@ class TestInventoryUpdateCredentials(TestJobExecution):
|
||||
|
||||
def run_pexpect_side_effect(*args, **kwargs):
|
||||
args, cwd, env, stdout = args
|
||||
config = ConfigParser.ConfigParser()
|
||||
config = configparser.ConfigParser()
|
||||
config.read(env['CLOUDFORMS_INI_PATH'])
|
||||
assert config.get('cloudforms', 'url') == 'https://example.org'
|
||||
assert config.get('cloudforms', 'username') == 'bob'
|
||||
@@ -2276,7 +2276,7 @@ def test_os_open_oserror():
|
||||
|
||||
|
||||
def test_fcntl_ioerror():
|
||||
with pytest.raises(IOError):
|
||||
with pytest.raises(OSError):
|
||||
fcntl.flock(99999, fcntl.LOCK_EX)
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
# Django REST Framework
|
||||
from rest_framework import exceptions
|
||||
|
||||
@@ -7,7 +7,7 @@ import pytest
|
||||
from uuid import uuid4
|
||||
import json
|
||||
import yaml
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
from backports.tempfile import TemporaryDirectory
|
||||
from django.conf import settings
|
||||
|
||||
@@ -10,14 +10,14 @@ def test_encrypt_field():
|
||||
field = Setting(pk=123, value='ANSIBLE')
|
||||
encrypted = field.value = encryption.encrypt_field(field, 'value')
|
||||
assert encryption.decrypt_field(field, 'value') == 'ANSIBLE'
|
||||
assert encrypted.startswith('$encrypted$AESCBC$')
|
||||
assert encrypted.startswith('$encrypted$UTF8$AESCBC$')
|
||||
|
||||
|
||||
def test_encrypt_field_without_pk():
|
||||
field = Setting(value='ANSIBLE')
|
||||
encrypted = field.value = encryption.encrypt_field(field, 'value')
|
||||
assert encryption.decrypt_field(field, 'value') == 'ANSIBLE'
|
||||
assert encrypted.startswith('$encrypted$AESCBC$')
|
||||
assert encrypted.startswith('$encrypted$UTF8$AESCBC$')
|
||||
|
||||
|
||||
def test_encrypt_field_with_unicode_string():
|
||||
@@ -28,19 +28,11 @@ def test_encrypt_field_with_unicode_string():
|
||||
assert encrypted.startswith('$encrypted$UTF8$AESCBC$')
|
||||
|
||||
|
||||
def test_encrypt_field_force_disable_unicode():
|
||||
value = u"NothingSpecial"
|
||||
field = Setting(value=value)
|
||||
encrypted = field.value = encryption.encrypt_field(field, 'value', skip_utf8=True)
|
||||
assert "UTF8" not in encrypted
|
||||
assert encryption.decrypt_field(field, 'value') == value
|
||||
|
||||
|
||||
def test_encrypt_subfield():
|
||||
field = Setting(value={'name': 'ANSIBLE'})
|
||||
encrypted = field.value = encryption.encrypt_field(field, 'value', subfield='name')
|
||||
assert encryption.decrypt_field(field, 'value', subfield='name') == 'ANSIBLE'
|
||||
assert encrypted.startswith('$encrypted$AESCBC$')
|
||||
assert encrypted.startswith('$encrypted$UTF8$AESCBC$')
|
||||
|
||||
|
||||
def test_encrypt_field_with_ask():
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
import pytest
|
||||
import base64
|
||||
import json
|
||||
from StringIO import StringIO
|
||||
|
||||
from six.moves import xrange
|
||||
from io import StringIO
|
||||
|
||||
from django.utils.encoding import smart_bytes, smart_str
|
||||
from awx.main.utils import OutputEventFilter, OutputVerboseFilter
|
||||
|
||||
MAX_WIDTH = 78
|
||||
@@ -12,14 +11,14 @@ EXAMPLE_UUID = '890773f5-fe6d-4091-8faf-bdc8021d65dd'
|
||||
|
||||
|
||||
def write_encoded_event_data(fileobj, data):
|
||||
b64data = base64.b64encode(json.dumps(data))
|
||||
b64data = smart_str(base64.b64encode(smart_bytes(json.dumps(data))))
|
||||
# pattern corresponding to OutputEventFilter expectation
|
||||
fileobj.write(u'\x1b[K')
|
||||
for offset in xrange(0, len(b64data), MAX_WIDTH):
|
||||
fileobj.write('\x1b[K')
|
||||
for offset in range(0, len(b64data), MAX_WIDTH):
|
||||
chunk = b64data[offset:offset + MAX_WIDTH]
|
||||
escaped_chunk = u'{}\x1b[{}D'.format(chunk, len(chunk))
|
||||
escaped_chunk = '{}\x1b[{}D'.format(chunk, len(chunk))
|
||||
fileobj.write(escaped_chunk)
|
||||
fileobj.write(u'\x1b[K')
|
||||
fileobj.write('\x1b[K')
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
|
||||
# Python
|
||||
import pytest
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
# AWX
|
||||
from awx.main.utils.filters import SmartFilter, ExternalLoggerEnabled
|
||||
@@ -120,7 +120,7 @@ class TestSmartFilterQueryFromString():
|
||||
def test_invalid_filter_strings(self, mock_get_host_model, filter_string):
|
||||
with pytest.raises(RuntimeError) as e:
|
||||
SmartFilter.query_from_string(filter_string)
|
||||
assert e.value.message == u"Invalid query " + filter_string
|
||||
assert str(e.value) == u"Invalid query " + filter_string
|
||||
|
||||
@pytest.mark.parametrize("filter_string", [
|
||||
'created_by__password__icontains=pbkdf2'
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import base64
|
||||
import cStringIO
|
||||
import logging
|
||||
import socket
|
||||
import datetime
|
||||
from dateutil.tz import tzutc
|
||||
from io import StringIO
|
||||
from uuid import uuid4
|
||||
|
||||
import mock
|
||||
from unittest import mock
|
||||
|
||||
from django.conf import LazySettings
|
||||
from django.utils.encoding import smart_str
|
||||
import pytest
|
||||
import requests
|
||||
from requests_futures.sessions import FuturesSession
|
||||
@@ -52,7 +53,7 @@ def connection_error_adapter():
|
||||
|
||||
@pytest.fixture
|
||||
def fake_socket(tmpdir_factory, request):
|
||||
sok = socket._socketobject
|
||||
sok = socket.socket
|
||||
sok.send = mock.MagicMock()
|
||||
sok.connect = mock.MagicMock()
|
||||
sok.setblocking = mock.MagicMock()
|
||||
@@ -255,7 +256,7 @@ def test_https_logging_handler_connection_error(connection_error_adapter,
|
||||
handler.setFormatter(LogstashFormatter())
|
||||
handler.session.mount('http://', connection_error_adapter)
|
||||
|
||||
buff = cStringIO.StringIO()
|
||||
buff = StringIO()
|
||||
logging.getLogger('awx.main.utils.handlers').addHandler(
|
||||
logging.StreamHandler(buff)
|
||||
)
|
||||
@@ -308,7 +309,7 @@ def test_https_logging_handler_emit_logstash_with_creds(https_adapter,
|
||||
|
||||
assert len(https_adapter.requests) == 1
|
||||
request = https_adapter.requests[0]
|
||||
assert request.headers['Authorization'] == 'Basic %s' % base64.b64encode("user:pass")
|
||||
assert request.headers['Authorization'] == 'Basic %s' % smart_str(base64.b64encode(b"user:pass"))
|
||||
|
||||
|
||||
def test_https_logging_handler_emit_splunk_with_creds(https_adapter,
|
||||
@@ -331,7 +332,7 @@ def test_https_logging_handler_emit_splunk_with_creds(https_adapter,
|
||||
({u'测试键': u'测试值'}, '{"测试键": "测试值"}'),
|
||||
])
|
||||
def test_encode_payload_for_socket(payload, encoded_payload):
|
||||
assert _encode_payload_for_socket(payload) == encoded_payload
|
||||
assert _encode_payload_for_socket(payload).decode('utf-8') == encoded_payload
|
||||
|
||||
|
||||
def test_udp_handler_create_socket_at_init():
|
||||
|
||||
Reference in New Issue
Block a user