Introduce a new PageCache object

and split out get_post_fields into its own utility function.
This commit is contained in:
Jeff Bradberry
2020-04-13 15:40:47 -04:00
parent bb66e4633d
commit 1300d38e47
3 changed files with 68 additions and 59 deletions

View File

@@ -1,15 +1,13 @@
import itertools import itertools
import logging import logging
import re
from awxkit.api.resources import resources from awxkit.api.resources import resources
import awxkit.exceptions as exc import awxkit.exceptions as exc
from . import base from . import base
from . import page from . import page
from .. import utils
from ..mixins import has_create from ..mixins import has_create
descRE = re.compile(r'^[*] `(\w+)`: [^(]*\((\w+), ([^)]+)\)')
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@@ -60,59 +58,8 @@ class Api(base.Base):
page.register_page(resources.api, Api) page.register_page(resources.api, Api)
def parse_description(desc):
options = {}
for line in desc[desc.index('POST'):].splitlines():
match = descRE.match(line)
if not match:
continue
options[match.group(1)] = {'type': match.group(2),
'required': match.group(3) == 'required'}
return options
def remove_encrypted(value):
if value == '$encrypted$':
return ''
if isinstance(value, list):
return [remove_encrypted(item) for item in value]
if isinstance(value, dict):
return {k: remove_encrypted(v) for k, v in value.items()}
return value
class ApiV2(base.Base): class ApiV2(base.Base):
# Common import/export methods
def _get_options(self, _page):
if getattr(self, '_options', None) is None:
self._options = {}
url = _page.url if isinstance(_page, page.Page) else str(_page)
if url in self._options:
return self._options[url]
options = _page.options()
warning = options.r.headers.get('Warning', '')
if '299' in warning and 'deprecated' in warning:
return self._options.setdefault(url, None)
return self._options.setdefault(url, options)
def _get_post_fields(self, _page):
options_page = self._get_options(_page)
if options_page is None:
return None
if 'POST' not in options_page.r.headers.get('Allow', ''):
return None
if 'POST' in options_page.json['actions']:
return options_page.json['actions']['POST']
else:
return parse_description(options_page.json['description'])
# Export methods # Export methods
def _serialize_asset(self, asset, options): def _serialize_asset(self, asset, options):
@@ -147,7 +94,7 @@ class ApiV2(base.Base):
continue continue
rel = related_endpoint._create() rel = related_endpoint._create()
related_options = self._get_post_fields(related_endpoint) related_options = utils.get_post_fields(related_endpoint, self._cache)
if related_options is None: # This is a read-only endpoint. if related_options is None: # This is a read-only endpoint.
continue continue
is_attach = 'id' in related_options # This is not a create-only endpoint. is_attach = 'id' in related_options # This is not a create-only endpoint.
@@ -183,11 +130,11 @@ class ApiV2(base.Base):
return None return None
fields['natural_key'] = natural_key fields['natural_key'] = natural_key
return remove_encrypted(fields) return utils.remove_encrypted(fields)
def _get_assets(self, resource, value): def _get_assets(self, resource, value):
endpoint = getattr(self, resource) endpoint = getattr(self, resource)
options = self._get_post_fields(endpoint) options = utils.get_post_fields(endpoint, self._cache)
if options is None: if options is None:
return None return None
@@ -203,6 +150,8 @@ class ApiV2(base.Base):
return [asset for asset in assets if asset is not None] return [asset for asset in assets if asset is not None]
def export_assets(self, **kwargs): def export_assets(self, **kwargs):
self._cache = page.PageCache()
# If no resource kwargs are explicitly used, export everything. # If no resource kwargs are explicitly used, export everything.
all_resources = all(kwargs.get(resource) is None for resource in EXPORTABLE_RESOURCES) all_resources = all(kwargs.get(resource) is None for resource in EXPORTABLE_RESOURCES)
@@ -236,7 +185,7 @@ class ApiV2(base.Base):
def _register_existing_assets(self, resource): def _register_existing_assets(self, resource):
endpoint = getattr(self, resource) endpoint = getattr(self, resource)
options = self._get_post_fields(endpoint) options = utils.get_post_fields(endpoint, self._cache)
if options is None: if options is None:
return return
@@ -266,7 +215,7 @@ class ApiV2(base.Base):
return return
endpoint = getattr(self, resource) endpoint = getattr(self, resource)
options = self._get_post_fields(endpoint) options = utils.get_post_fields(endpoint, self._cache)
assets = data[resource] assets = data[resource]
for asset in assets: for asset in assets:
post_data = {} post_data = {}

View File

@@ -531,3 +531,24 @@ class TentativePage(str):
def __ne__(self, other): def __ne__(self, other):
return self.endpoint != other return self.endpoint != other
class PageCache(object):
def __init__(self):
self.options = {}
def get_options(self, page):
url = page.url if isinstance(page, Page) else str(page)
if url in self.options:
return self.options[url]
try:
options = page.options()
except exc.Common:
return self.options.setdefault(url, None)
warning = options.r.headers.get('Warning', '')
if '299' in warning and 'deprecated' in warning:
return self.options.setdefault(url, None)
return self.options.setdefault(url, options)

View File

@@ -0,0 +1,39 @@
import re
descRE = re.compile(r'^[*] `(\w+)`: [^(]*\((\w+), ([^)]+)\)')
def parse_description(desc):
options = {}
for line in desc[desc.index('POST'):].splitlines():
match = descRE.match(line)
if not match:
continue
options[match.group(1)] = {'type': match.group(2),
'required': match.group(3) == 'required'}
return options
def remove_encrypted(value):
if value == '$encrypted$':
return ''
if isinstance(value, list):
return [remove_encrypted(item) for item in value]
if isinstance(value, dict):
return {k: remove_encrypted(v) for k, v in value.items()}
return value
def get_post_fields(page, cache):
options_page = cache.get_options(page)
if options_page is None:
return None
if 'POST' not in options_page.r.headers.get('Allow', ''):
return None
if 'POST' in options_page.json['actions']:
return options_page.json['actions']['POST']
else:
return parse_description(options_page.json['description'])