mirror of
https://github.com/ansible/awx.git
synced 2026-01-15 03:40:42 -03:30
Removal of OAuth2 stuff from CLI
also from awxkit generally Remove login command
This commit is contained in:
parent
670b7e7754
commit
6599f3f827
@ -34,7 +34,8 @@ class ControllerAWXKitModule(ControllerModule):
|
||||
def authenticate(self):
|
||||
try:
|
||||
if self.oauth_token:
|
||||
self.connection.login(None, None, token=self.oauth_token)
|
||||
# MERGE: fix conflicts with removal of OAuth2 token from collection branch
|
||||
self.connection.login(None, None)
|
||||
self.authenticated = True
|
||||
elif self.username:
|
||||
self.connection.login(username=self.username, password=self.password)
|
||||
|
||||
@ -86,11 +86,6 @@ options:
|
||||
- workflow names, IDs, or named URLs to export
|
||||
type: list
|
||||
elements: str
|
||||
applications:
|
||||
description:
|
||||
- OAuth2 application names, IDs, or named URLs to export
|
||||
type: list
|
||||
elements: str
|
||||
schedules:
|
||||
description:
|
||||
- schedule names, IDs, or named URLs to export
|
||||
|
||||
@ -23,7 +23,6 @@ ASSETS = set([
|
||||
"job_templates",
|
||||
"workflow_job_templates",
|
||||
"execution_environments",
|
||||
"applications",
|
||||
"schedules",
|
||||
])
|
||||
|
||||
|
||||
@ -13,15 +13,6 @@ class ConnectionException(exc.Common):
|
||||
pass
|
||||
|
||||
|
||||
class Token_Auth(requests.auth.AuthBase):
|
||||
def __init__(self, token):
|
||||
self.token = token
|
||||
|
||||
def __call__(self, request):
|
||||
request.headers['Authorization'] = 'Bearer {0.token}'.format(self)
|
||||
return request
|
||||
|
||||
|
||||
def log_elapsed(r, *args, **kwargs): # requests hook to display API elapsed time
|
||||
log.debug('"{0.request.method} {0.url}" elapsed: {0.elapsed}'.format(r))
|
||||
|
||||
@ -47,7 +38,7 @@ class Connection(object):
|
||||
self.get(config.api_base_path) # this causes a cookie w/ the CSRF token to be set
|
||||
return dict(next=next)
|
||||
|
||||
def login(self, username=None, password=None, token=None, **kwargs):
|
||||
def login(self, username=None, password=None, **kwargs):
|
||||
if username and password:
|
||||
_next = kwargs.get('next')
|
||||
if _next:
|
||||
@ -62,8 +53,6 @@ class Connection(object):
|
||||
self.uses_session_cookie = True
|
||||
else:
|
||||
self.session.auth = (username, password)
|
||||
elif token:
|
||||
self.session.auth = Token_Auth(token)
|
||||
else:
|
||||
self.session.auth = None
|
||||
|
||||
|
||||
@ -4,13 +4,11 @@ from .base import * # NOQA
|
||||
from .bulk import * # NOQA
|
||||
from .access_list import * # NOQA
|
||||
from .api import * # NOQA
|
||||
from .authtoken import * # NOQA
|
||||
from .roles import * # NOQA
|
||||
from .organizations import * # NOQA
|
||||
from .notifications import * # NOQA
|
||||
from .notification_templates import * # NOQA
|
||||
from .users import * # NOQA
|
||||
from .applications import * # NOQA
|
||||
from .teams import * # NOQA
|
||||
from .credentials import * # NOQA
|
||||
from .unified_jobs import * # NOQA
|
||||
|
||||
@ -25,7 +25,6 @@ EXPORTABLE_RESOURCES = [
|
||||
'job_templates',
|
||||
'workflow_job_templates',
|
||||
'execution_environments',
|
||||
'applications',
|
||||
'schedules',
|
||||
]
|
||||
|
||||
|
||||
@ -1,82 +0,0 @@
|
||||
from awxkit.utils import random_title, update_payload, filter_by_class, PseudoNamespace
|
||||
from awxkit.api.resources import resources
|
||||
from awxkit.api.pages import Organization
|
||||
from awxkit.api.mixins import HasCreate, DSAdapter
|
||||
|
||||
from . import page
|
||||
from . import base
|
||||
|
||||
|
||||
class OAuth2Application(HasCreate, base.Base):
|
||||
dependencies = [Organization]
|
||||
NATURAL_KEY = ('organization', 'name')
|
||||
|
||||
def payload(self, **kwargs):
|
||||
payload = PseudoNamespace(
|
||||
name=kwargs.get('name') or 'OAuth2Application - {}'.format(random_title()),
|
||||
description=kwargs.get('description') or random_title(10),
|
||||
client_type=kwargs.get('client_type', 'public'),
|
||||
authorization_grant_type=kwargs.get('authorization_grant_type', 'password'),
|
||||
)
|
||||
if kwargs.get('organization'):
|
||||
payload.organization = kwargs['organization'].id
|
||||
|
||||
optional_fields = ('redirect_uris', 'skip_authorization')
|
||||
update_payload(payload, optional_fields, kwargs)
|
||||
return payload
|
||||
|
||||
def create_payload(self, organization=Organization, **kwargs):
|
||||
self.create_and_update_dependencies(*filter_by_class((organization, Organization)))
|
||||
organization = self.ds.organization if organization else None
|
||||
payload = self.payload(organization=organization, **kwargs)
|
||||
payload.ds = DSAdapter(self.__class__.__name__, self._dependency_store)
|
||||
return payload
|
||||
|
||||
def create(self, organization=Organization, **kwargs):
|
||||
payload = self.create_payload(organization=organization, **kwargs)
|
||||
return self.update_identity(OAuth2Applications(self.connection).post(payload))
|
||||
|
||||
|
||||
page.register_page((resources.application, (resources.applications, 'post')), OAuth2Application)
|
||||
|
||||
|
||||
class OAuth2Applications(page.PageList, OAuth2Application):
|
||||
pass
|
||||
|
||||
|
||||
page.register_page(resources.applications, OAuth2Applications)
|
||||
|
||||
|
||||
class OAuth2AccessToken(HasCreate, base.Base):
|
||||
optional_dependencies = [OAuth2Application]
|
||||
|
||||
def payload(self, **kwargs):
|
||||
payload = PseudoNamespace(description=kwargs.get('description') or random_title(10), scope=kwargs.get('scope', 'write'))
|
||||
|
||||
if kwargs.get('oauth_2_application'):
|
||||
payload.application = kwargs['oauth_2_application'].id
|
||||
|
||||
optional_fields = ('expires',)
|
||||
update_payload(payload, optional_fields, kwargs)
|
||||
return payload
|
||||
|
||||
def create_payload(self, oauth_2_application=None, **kwargs):
|
||||
self.create_and_update_dependencies(*filter_by_class((oauth_2_application, OAuth2Application)))
|
||||
oauth_2_application = self.ds.oauth_2_application if oauth_2_application else None
|
||||
payload = self.payload(oauth_2_application=oauth_2_application, **kwargs)
|
||||
payload.ds = DSAdapter(self.__class__.__name__, self._dependency_store)
|
||||
return payload
|
||||
|
||||
def create(self, oauth_2_application=None, **kwargs):
|
||||
payload = self.create_payload(oauth_2_application=oauth_2_application, **kwargs)
|
||||
return self.update_identity(OAuth2AccessTokens(self.connection).post(payload))
|
||||
|
||||
|
||||
page.register_page((resources.token, (resources.tokens, 'post')), OAuth2AccessToken)
|
||||
|
||||
|
||||
class OAuth2AccessTokens(page.PageList, OAuth2AccessToken):
|
||||
pass
|
||||
|
||||
|
||||
page.register_page(resources.tokens, OAuth2AccessTokens)
|
||||
@ -1,10 +0,0 @@
|
||||
from awxkit.api.resources import resources
|
||||
from . import base
|
||||
from . import page
|
||||
|
||||
|
||||
class AuthToken(base.Base):
|
||||
pass
|
||||
|
||||
|
||||
page.register_page(resources.authtoken, AuthToken)
|
||||
@ -1,23 +1,13 @@
|
||||
import collections
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from requests.auth import HTTPBasicAuth
|
||||
|
||||
from awxkit.api.pages import Page, get_registered_page, exception_from_status_code
|
||||
from awxkit.api.pages import Page
|
||||
from awxkit.config import config
|
||||
from awxkit.api.resources import resources
|
||||
import awxkit.exceptions as exc
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AuthUrls(typing.TypedDict):
|
||||
access_token: str
|
||||
personal_token: str
|
||||
|
||||
|
||||
class Base(Page):
|
||||
def silent_delete(self):
|
||||
"""Delete the object. If it's already deleted, ignore the error"""
|
||||
@ -135,66 +125,6 @@ class Base(Page):
|
||||
for obj_role in Roles(self.connection, endpoint=url).get().json.results:
|
||||
yield Role(self.connection, endpoint=obj_role.url).get()
|
||||
|
||||
def get_authtoken(self, username='', password=''):
|
||||
default_cred = config.credentials.default
|
||||
payload = dict(username=username or default_cred.username, password=password or default_cred.password)
|
||||
auth_url = resources.authtoken
|
||||
return get_registered_page(auth_url)(self.connection, endpoint=auth_url).post(payload).token
|
||||
|
||||
def load_authtoken(self, username='', password=''):
|
||||
self.connection.login(token=self.get_authtoken(username, password))
|
||||
return self
|
||||
|
||||
load_default_authtoken = load_authtoken
|
||||
|
||||
def _request_token(self, auth_urls, username, password, client_id, description, client_secret, scope):
|
||||
req = collections.namedtuple('req', 'headers')({})
|
||||
if client_id and client_secret:
|
||||
HTTPBasicAuth(client_id, client_secret)(req)
|
||||
req.headers['Content-Type'] = 'application/x-www-form-urlencoded'
|
||||
resp = self.connection.post(
|
||||
auth_urls["access_token"],
|
||||
data={"grant_type": "password", "username": username, "password": password, "scope": scope},
|
||||
headers=req.headers,
|
||||
)
|
||||
elif client_id:
|
||||
req.headers['Content-Type'] = 'application/x-www-form-urlencoded'
|
||||
resp = self.connection.post(
|
||||
auth_urls["access_token"],
|
||||
data={"grant_type": "password", "username": username, "password": password, "client_id": client_id, "scope": scope},
|
||||
headers=req.headers,
|
||||
)
|
||||
else:
|
||||
HTTPBasicAuth(username, password)(req)
|
||||
resp = self.connection.post(
|
||||
auth_urls['personal_token'],
|
||||
json={"description": description, "application": None, "scope": scope},
|
||||
headers=req.headers,
|
||||
)
|
||||
if resp.ok:
|
||||
result = resp.json()
|
||||
if client_id:
|
||||
return result.pop('access_token', None)
|
||||
else:
|
||||
return result.pop('token', None)
|
||||
else:
|
||||
raise exception_from_status_code(resp.status_code)
|
||||
|
||||
def get_oauth2_token(self, username='', password='', client_id=None, description='AWX CLI', client_secret=None, scope='write'):
|
||||
default_cred = config.credentials.default
|
||||
username = username or default_cred.username
|
||||
password = password or default_cred.password
|
||||
# Try gateway first, fallback to controller
|
||||
urls: AuthUrls = {"access_token": "/o/token/", "personal_token": f"{config.gateway_base_path}v1/tokens/"}
|
||||
try:
|
||||
return self._request_token(urls, username, password, client_id, description, client_secret, scope)
|
||||
except exc.NotFound:
|
||||
urls = {
|
||||
"access_token": f"{config.api_base_path}o/token/",
|
||||
"personal_token": f"{config.api_base_path}v2/users/{username}/personal_tokens/",
|
||||
}
|
||||
return self._request_token(urls, username, password, client_id, description, client_secret, scope)
|
||||
|
||||
def load_session(self, username='', password=''):
|
||||
default_cred = config.credentials.default
|
||||
self.connection.login(
|
||||
|
||||
@ -12,10 +12,7 @@ class Resources(object):
|
||||
_ad_hoc_related_cancel = r'ad_hoc_commands/\d+/cancel/'
|
||||
_ad_hoc_relaunch = r'ad_hoc_commands/\d+/relaunch/'
|
||||
_ansible_facts = r'hosts/\d+/ansible_facts/'
|
||||
_application = r'applications/\d+/'
|
||||
_applications = 'applications/'
|
||||
_auth = 'auth/'
|
||||
_authtoken = 'authtoken/'
|
||||
_bulk = 'bulk/'
|
||||
_bulk_job_launch = 'bulk/job_launch/'
|
||||
_config = 'config/'
|
||||
@ -233,8 +230,6 @@ class Resources(object):
|
||||
_team_permissions = r'teams/\d+/permissions/'
|
||||
_team_users = r'teams/\d+/users/'
|
||||
_teams = 'teams/'
|
||||
_token = r'tokens/\d+/'
|
||||
_tokens = 'tokens/'
|
||||
_unified_job_template = r'unified_job_templates/\d+/'
|
||||
_unified_job_templates = 'unified_job_templates/'
|
||||
_unified_jobs = 'unified_jobs/'
|
||||
@ -282,12 +277,7 @@ class Resources(object):
|
||||
def __getattr__(self, resource):
|
||||
if resource[:3] == '___':
|
||||
raise AttributeError('No existing resource: {}'.format(resource))
|
||||
# Currently we don't handle anything under:
|
||||
# /api/o/
|
||||
# /api/login/
|
||||
# /api/logout/
|
||||
# If/when we do we will probably need to modify this __getattr__ method
|
||||
# Also, if we add another API version, this would be handled here
|
||||
# If/when we add another API version, this would be handled here
|
||||
prefix = 'v2'
|
||||
resource = '_' + resource
|
||||
return '{0}{1}'.format(getattr(self, prefix), getattr(self, resource))
|
||||
|
||||
@ -73,7 +73,6 @@ def check_related(resource):
|
||||
@contextmanager
|
||||
def as_user(v, username, password=None):
|
||||
"""Context manager to allow running tests as an alternative login user."""
|
||||
access_token = False
|
||||
if not isinstance(v, api.client.Connection):
|
||||
connection = v.connection
|
||||
else:
|
||||
@ -83,11 +82,6 @@ def as_user(v, username, password=None):
|
||||
password = username.password
|
||||
username = username.username
|
||||
|
||||
if isinstance(username, api.OAuth2AccessToken):
|
||||
access_token = username.token
|
||||
username = None
|
||||
password = None
|
||||
|
||||
try:
|
||||
if config.use_sessions:
|
||||
session_id = None
|
||||
@ -101,10 +95,7 @@ def as_user(v, username, password=None):
|
||||
break
|
||||
if session_id:
|
||||
del connection.session.cookies[connection.session_cookie_name]
|
||||
if access_token:
|
||||
kwargs = dict(token=access_token)
|
||||
else:
|
||||
kwargs = connection.get_session_requirements()
|
||||
kwargs = connection.get_session_requirements()
|
||||
else:
|
||||
previous_auth = connection.session.auth
|
||||
kwargs = dict()
|
||||
@ -112,8 +103,6 @@ def as_user(v, username, password=None):
|
||||
yield
|
||||
finally:
|
||||
if config.use_sessions:
|
||||
if access_token:
|
||||
connection.session.auth = None
|
||||
del connection.session.cookies[connection.session_cookie_name]
|
||||
if session_id:
|
||||
connection.session.cookies.set(connection.session_cookie_name, session_id, domain=domain)
|
||||
|
||||
@ -82,17 +82,9 @@ class CLI(object):
|
||||
return '--help' in self.argv or '-h' in self.argv
|
||||
|
||||
def authenticate(self):
|
||||
"""Configure the current session (or OAuth2.0 token)"""
|
||||
token = self.get_config('token')
|
||||
if token:
|
||||
self.root.connection.login(
|
||||
None,
|
||||
None,
|
||||
token=token,
|
||||
)
|
||||
else:
|
||||
config.use_sessions = True
|
||||
self.root.load_session().get()
|
||||
"""Configure the current session for basic auth"""
|
||||
config.use_sessions = True
|
||||
self.root.load_session().get()
|
||||
|
||||
def connect(self):
|
||||
"""Fetch top-level resources from /api/v2"""
|
||||
@ -141,7 +133,7 @@ class CLI(object):
|
||||
"""Attempt to parse the <resource> (e.g., jobs) specified on the CLI
|
||||
|
||||
If a valid resource is discovered, the user will be authenticated
|
||||
(either via an OAuth2.0 token or session-based auth) and the remaining
|
||||
(via session-based auth) and the remaining
|
||||
CLI arguments will be processed (to determine the requested action
|
||||
e.g., list, create, delete)
|
||||
|
||||
|
||||
@ -88,6 +88,3 @@ A few of the most important ones are:
|
||||
|
||||
``--conf.password, CONTROLLER_PASSWORD``
|
||||
the AWX password to use for authentication
|
||||
|
||||
``--conf.token, CONTROLLER_OAUTH_TOKEN``
|
||||
an OAuth2.0 token to use for authentication
|
||||
|
||||
@ -30,12 +30,6 @@ def add_authentication_arguments(parser, env):
|
||||
default=env.get('CONTROLLER_HOST', env.get('TOWER_HOST', 'https://127.0.0.1:443')),
|
||||
metavar='https://example.awx.org',
|
||||
)
|
||||
auth.add_argument(
|
||||
'--conf.token',
|
||||
default=env.get('CONTROLLER_OAUTH_TOKEN', env.get('CONTROLLER_TOKEN', env.get('TOWER_OAUTH_TOKEN', env.get('TOWER_TOKEN', '')))),
|
||||
help='an OAuth2.0 token (get one by using `awx login`)',
|
||||
metavar='TEXT',
|
||||
)
|
||||
|
||||
config_username, config_password = get_config_credentials()
|
||||
# options configured via cli args take higher precedence than those from the config
|
||||
|
||||
@ -1,14 +1,12 @@
|
||||
import yaml
|
||||
import json
|
||||
import os
|
||||
|
||||
from awxkit import api, config, yaml_file
|
||||
from awxkit import config, yaml_file
|
||||
from awxkit.exceptions import ImportExportError
|
||||
from awxkit.utils import to_str
|
||||
from awxkit.api.pages import Page
|
||||
from awxkit.api.pages.api import EXPORTABLE_RESOURCES
|
||||
from awxkit.cli.format import FORMATTERS, format_response, add_authentication_arguments, add_formatting_import_export
|
||||
from awxkit.cli.utils import CustomRegistryMeta, cprint
|
||||
from awxkit.cli.format import format_response, add_formatting_import_export
|
||||
from awxkit.cli.utils import CustomRegistryMeta
|
||||
|
||||
|
||||
CONTROL_RESOURCES = ['ping', 'config', 'me', 'metrics', 'mesh_visualizer']
|
||||
@ -66,44 +64,6 @@ class CustomCommand(metaclass=CustomRegistryMeta):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class Login(CustomCommand):
|
||||
name = 'login'
|
||||
help_text = 'authenticate and retrieve an OAuth2 token'
|
||||
|
||||
def print_help(self, parser):
|
||||
add_authentication_arguments(parser, os.environ)
|
||||
parser.print_help()
|
||||
|
||||
def handle(self, client, parser):
|
||||
auth = parser.add_argument_group('OAuth2.0 Options')
|
||||
auth.add_argument('--description', help='description of the generated OAuth2.0 token', metavar='TEXT')
|
||||
auth.add_argument('--conf.client_id', metavar='TEXT')
|
||||
auth.add_argument('--conf.client_secret', metavar='TEXT')
|
||||
auth.add_argument('--conf.scope', choices=['read', 'write'], default='write')
|
||||
if client.help:
|
||||
self.print_help(parser)
|
||||
raise SystemExit()
|
||||
parsed = parser.parse_known_args()[0]
|
||||
kwargs = {
|
||||
'client_id': getattr(parsed, 'conf.client_id', None),
|
||||
'client_secret': getattr(parsed, 'conf.client_secret', None),
|
||||
'scope': getattr(parsed, 'conf.scope', None),
|
||||
}
|
||||
if getattr(parsed, 'description', None):
|
||||
kwargs['description'] = parsed.description
|
||||
try:
|
||||
token = api.Api().get_oauth2_token(**kwargs)
|
||||
except Exception as e:
|
||||
self.print_help(parser)
|
||||
cprint('Error retrieving an OAuth2.0 token ({}).'.format(e.__class__), 'red')
|
||||
else:
|
||||
fmt = client.get_config('format')
|
||||
if fmt == 'human':
|
||||
print('export CONTROLLER_OAUTH_TOKEN={}'.format(token))
|
||||
else:
|
||||
print(to_str(FORMATTERS[fmt]({'token': token}, '.')).strip())
|
||||
|
||||
|
||||
class Config(CustomCommand):
|
||||
name = 'config'
|
||||
help_text = 'print current configuration values'
|
||||
@ -114,7 +74,6 @@ class Config(CustomCommand):
|
||||
raise SystemExit()
|
||||
return {
|
||||
'base_url': config.base_url,
|
||||
'token': client.get_config('token'),
|
||||
'use_sessions': config.use_sessions,
|
||||
'credentials': config.credentials,
|
||||
}
|
||||
|
||||
@ -37,7 +37,7 @@ class WSClient(object):
|
||||
'control': ['limit_reached']}
|
||||
e.x:
|
||||
```
|
||||
ws = WSClient(token, port=8013, secure=False).connect()
|
||||
ws = WSClient(port=8013, secure=False, session_id='xyz', csrftoken='abc').connect()
|
||||
ws.job_details()
|
||||
... # launch job
|
||||
job_messages = [msg for msg in ws]
|
||||
@ -52,7 +52,6 @@ class WSClient(object):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
token=None,
|
||||
hostname='',
|
||||
port=443,
|
||||
secure=True,
|
||||
@ -80,15 +79,12 @@ class WSClient(object):
|
||||
self.suffix = ws_suffix
|
||||
self._use_ssl = secure
|
||||
self.hostname = hostname
|
||||
self.token = token
|
||||
self.session_id = session_id
|
||||
self.csrftoken = csrftoken
|
||||
self._recv_queue = Queue()
|
||||
self._ws_closed = False
|
||||
self._ws_connected_flag = threading.Event()
|
||||
if self.token is not None:
|
||||
auth_cookie = 'token="{0.token}";'.format(self)
|
||||
elif self.session_id is not None:
|
||||
if self.session_id is not None:
|
||||
auth_cookie = '{1}="{0.session_id}"'.format(self, session_cookie_name)
|
||||
if self.csrftoken:
|
||||
auth_cookie += ';csrftoken={0.csrftoken}'.format(self)
|
||||
|
||||
@ -1,9 +1,6 @@
|
||||
from http.client import NOT_FOUND
|
||||
import pytest
|
||||
from pytest_mock import MockerFixture
|
||||
from requests import Response
|
||||
|
||||
from awxkit.api.pages import Base
|
||||
from awxkit.config import config
|
||||
|
||||
|
||||
@ -22,38 +19,3 @@ def response(mocker):
|
||||
"access_token": "my_token",
|
||||
}
|
||||
return r
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("auth_creds", "url", "token"),
|
||||
[
|
||||
({"client_id": "foo", "client_secret": "bar"}, "/o/token/", "my_token"),
|
||||
({"client_id": "foo"}, "/o/token/", "my_token"),
|
||||
({}, "/api/gateway/v1/tokens/", "my_personal_token"),
|
||||
],
|
||||
)
|
||||
def test_get_oauth2_token_from_gateway(mocker: MockerFixture, response: Response, auth_creds, url, token):
|
||||
post = mocker.patch("requests.Session.post", return_value=response)
|
||||
base = Base()
|
||||
ret = base.get_oauth2_token(**auth_creds)
|
||||
assert post.call_count == 1
|
||||
assert post.call_args.args[0] == url
|
||||
assert ret == token
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("auth_creds", "url", "token"),
|
||||
[
|
||||
({"client_id": "foo", "client_secret": "bar"}, "/api/o/token/", "my_token"),
|
||||
({"client_id": "foo"}, "/api/o/token/", "my_token"),
|
||||
({}, "/api/v2/users/foo/personal_tokens/", "my_personal_token"),
|
||||
],
|
||||
)
|
||||
def test_get_oauth2_token_from_controller(mocker: MockerFixture, response: Response, auth_creds, url, token):
|
||||
type(response).ok = mocker.PropertyMock(side_effect=[False, True])
|
||||
post = mocker.patch("requests.Session.post", return_value=response)
|
||||
base = Base()
|
||||
ret = base.get_oauth2_token(**auth_creds)
|
||||
assert post.call_count == 2
|
||||
assert post.call_args.args[0] == url
|
||||
assert ret == token
|
||||
|
||||
@ -10,11 +10,10 @@ ParseResult = namedtuple("ParseResult", ["port", "hostname", "secure"])
|
||||
|
||||
|
||||
def test_explicit_hostname():
|
||||
client = WSClient("token", "some-hostname", 556, False)
|
||||
client = WSClient(hostname="some-hostname", port=556, secure=False)
|
||||
assert client.port == 556
|
||||
assert client.hostname == "some-hostname"
|
||||
assert client._use_ssl == False
|
||||
assert client.token == "token"
|
||||
|
||||
|
||||
def test_websocket_suffix():
|
||||
@ -35,7 +34,7 @@ def test_urlparsing(url, result):
|
||||
with patch("awxkit.ws.config") as mock_config:
|
||||
mock_config.base_url = url
|
||||
|
||||
client = WSClient("token")
|
||||
client = WSClient(hostname=None)
|
||||
assert client.port == result.port
|
||||
assert client.hostname == result.hostname
|
||||
assert client._use_ssl == result.secure
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user