diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index a93a347da1..59e5e4759e 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -1224,7 +1224,7 @@ class InventoryUpdate(UnifiedJob, InventorySourceOptions): update_fields = kwargs.get('update_fields', []) if bool(('license' in self.result_stdout or 'licensed' in self.result_stdout) and 'exceeded' in self.result_stdout and not self.license_error) or \ - bool('License has expired' in self.result_stdout or 'License count exceeded'): + bool(any(x in self.result_stdout for x in ('License has expired', 'License count exceeded'))): self.license_error = True if 'license_error' not in update_fields: update_fields.append('license_error') diff --git a/awx/main/tests/base.py b/awx/main/tests/base.py index a7f83ace09..c33c6fa8e8 100644 --- a/awx/main/tests/base.py +++ b/awx/main/tests/base.py @@ -177,12 +177,12 @@ class BaseTestMixin(QueueTestMixin): rnd_str = '____' + str(random.randint(1, 9999999)) return __name__ + '-generated-' + string + rnd_str - def create_test_license_file(self, instance_count=10000): + def create_test_license_file(self, instance_count=10000, license_date=int(time.time() + 3600)): writer = LicenseWriter( company_name='AWX', contact_name='AWX Admin', contact_email='awx@example.com', - license_date=int(time.time() + 3600), + license_date=license_date, instance_count=instance_count) handle, license_path = tempfile.mkstemp(suffix='.json') os.close(handle) diff --git a/awx/main/tests/inventory.py b/awx/main/tests/inventory.py index ef35c407ae..62741c1678 100644 --- a/awx/main/tests/inventory.py +++ b/awx/main/tests/inventory.py @@ -8,6 +8,7 @@ import json import os import re import tempfile +import time # Django from django.conf import settings @@ -1899,6 +1900,29 @@ class InventoryUpdatesTest(BaseTransactionTest): with self.current_user(self.super_django_user): self.put(other_inv_src, other_inv_src_opts, expect=400) + def test_update_expired_license(self): + self.create_test_license_file(license_date=int(time.time() - 3600)) + inventory_scripts = reverse('api:inventory_script_list') + new_script = dict(name="Test", description="Test Script", script=TEST_SIMPLE_INVENTORY_SCRIPT, organization=self.organization.id) + script_data = self.post(inventory_scripts, data=new_script, expect=201, auth=self.get_super_credentials()) + + custom_inv = self.organization.inventories.create(name='Custom Script Inventory') + custom_group = custom_inv.groups.create(name="Custom Script Group") + custom_inv_src = reverse('api:inventory_source_detail', + args=(custom_group.inventory_source.pk,)) + reverse('api:inventory_source_update_view', + args=(custom_group.inventory_source.pk,)) + inv_src_opts = {'source': 'custom', + 'source_script': script_data["id"], + 'source_vars': json.dumps({'HOME': 'no-place-like', 'USER': 'notme', '_': 'nope', 'INVENTORY_SOURCE_ID': -1}) + } + with self.current_user(self.super_django_user): + response = self.put(custom_inv_src, inv_src_opts, expect=200) + + inventory_source = InventorySource.objects.get(pk=response['id']) + inventory_update = inventory_source.update(inventory_source=inventory_source) + self.assertFalse(inventory_update.license_error) + def test_update_from_openstack(self): api_url = getattr(settings, 'TEST_OPENSTACK_HOST', '') api_user = getattr(settings, 'TEST_OPENSTACK_USER', '')