Basic runtime enforcement of policy as code part 2 (#6875)

* Add `opa_query_path field` for Inventory, Organization and JobTemplate models (#6850)

Add `opa_query_path` model field to Inventory, Organizatio and JobTemplate. Add migration file and expose opa_query_path field in the related API serializers.

* Gather and evaluate `opa_query_path` fields and raise violation exceptions (#6864)

gather and evaluate all opa query related to a job execution during policy evaluation phase 

* Add OPA_AUTH_CUSTOM_HEADERS support (#6863)

* Extend policy input data serializers (#6890)

* Extend policy input data serializers

* Update help text for PaC related fields (#6891)

* Remove encrypted from OPA_AUTH_CUSTOMER_HEADER

Unable to encrypt a dict field

---------

Co-authored-by: Jiří Jeřábek (Jiri Jerabek) <Jerabekjirka@email.cz>
Co-authored-by: Alexander Saprykin <cutwatercore@gmail.com>
Co-authored-by: Tina Tien <98424339+tiyiprh@users.noreply.github.com>
This commit is contained in:
Hao Liu 2025-03-17 22:39:26 -04:00 committed by GitHub
parent e9f2a14ebd
commit bad4e630ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 472 additions and 95 deletions

View File

@ -7,6 +7,7 @@ import json
import logging
import re
import yaml
import urllib.parse
from collections import Counter, OrderedDict
from datetime import timedelta
from uuid import uuid4
@ -49,6 +50,9 @@ from ansible_base.lib.utils.models import get_type_for_model
from ansible_base.rbac.models import RoleEvaluation, ObjectRole
from ansible_base.rbac import permission_registry
# django-flags
from flags.state import flag_enabled
# AWX
from awx.main.access import get_user_capabilities
from awx.main.constants import ACTIVE_STATES, CENSOR_VALUE, org_role_to_permission
@ -688,7 +692,25 @@ class EmptySerializer(serializers.Serializer):
pass
class UnifiedJobTemplateSerializer(BaseSerializer):
class OpaQueryPathEnabledMixin(serializers.Serializer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not flag_enabled("FEATURE_POLICY_AS_CODE_ENABLED") and 'opa_query_path' in self.fields:
self.fields.pop('opa_query_path')
def validate_opa_query_path(self, value):
# Decode the URL and re-encode it
decoded_value = urllib.parse.unquote(value)
re_encoded_value = urllib.parse.quote(decoded_value, safe='/')
if value != re_encoded_value:
raise serializers.ValidationError(_("The URL must be properly encoded."))
return value
class UnifiedJobTemplateSerializer(BaseSerializer, OpaQueryPathEnabledMixin):
# As a base serializer, the capabilities prefetch is not used directly,
# instead they are derived from the Workflow Job Template Serializer and the Job Template Serializer, respectively.
capabilities_prefetch = []
@ -1322,12 +1344,12 @@ class OAuth2ApplicationSerializer(BaseSerializer):
return ret
class OrganizationSerializer(BaseSerializer):
class OrganizationSerializer(BaseSerializer, OpaQueryPathEnabledMixin):
show_capabilities = ['edit', 'delete']
class Meta:
model = Organization
fields = ('*', 'max_hosts', 'custom_virtualenv', 'default_environment')
fields = ('*', 'max_hosts', 'custom_virtualenv', 'default_environment', 'opa_query_path')
read_only_fields = ('*', 'custom_virtualenv')
def get_related(self, obj):
@ -1682,7 +1704,7 @@ class LabelsListMixin(object):
return res
class InventorySerializer(LabelsListMixin, BaseSerializerWithVariables):
class InventorySerializer(LabelsListMixin, BaseSerializerWithVariables, OpaQueryPathEnabledMixin):
show_capabilities = ['edit', 'delete', 'adhoc', 'copy']
capabilities_prefetch = ['admin', 'adhoc', {'copy': 'organization.inventory_admin'}]
@ -1703,6 +1725,7 @@ class InventorySerializer(LabelsListMixin, BaseSerializerWithVariables):
'inventory_sources_with_failures',
'pending_deletion',
'prevent_instance_group_fallback',
'opa_query_path',
)
def get_related(self, obj):
@ -3396,6 +3419,7 @@ class JobTemplateSerializer(JobTemplateMixin, UnifiedJobTemplateSerializer, JobO
'webhook_service',
'webhook_credential',
'prevent_instance_group_fallback',
'opa_query_path',
)
read_only_fields = ('*', 'custom_virtualenv')

View File

@ -1001,9 +1001,9 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_HOST',
field_class=fields.CharField,
label=_('OPA Server Hostname'),
label=_('OPA server hostname'),
default='',
help_text=_('Host to connect to OPA service, when set to the default value of "" policy enforcement will be disabled.'),
help_text=_('The hostname used to connect to the OPA server. If empty, policy enforcement will be disabled.'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
@ -1012,9 +1012,9 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_PORT',
field_class=fields.IntegerField,
label=_('OPA Server Port'),
label=_('OPA server port'),
default=8181,
help_text=_('Port to connect to OPA service, defaults to 8181.'),
help_text=_('The port used to connect to the OPA server. Defaults to 8181.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
@ -1022,9 +1022,9 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_SSL',
field_class=fields.BooleanField,
label=_('Use SSL for OPA Connection'),
label=_('Use SSL for OPA connection'),
default=False,
help_text=_('Use SSL to connect to OPA service, defaults to False.'),
help_text=_('Enable or disable the use of SSL to connect to the OPA server. Defaults to false.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
@ -1032,10 +1032,10 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_AUTH_TYPE',
field_class=fields.ChoiceField,
label=_('OPA Authentication Type'),
label=_('OPA authentication type'),
choices=[OPA_AUTH_TYPES.NONE, OPA_AUTH_TYPES.TOKEN, OPA_AUTH_TYPES.CERTIFICATE],
default=OPA_AUTH_TYPES.NONE,
help_text=_('Authentication type for OPA: "None", "Token", or "Certificate".'),
help_text=_('The authentication type that will be used to connect to the OPA server: "None", "Token", or "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
@ -1043,9 +1043,11 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_AUTH_TOKEN',
field_class=fields.CharField,
label=_('OPA Authentication Token'),
label=_('OPA authentication token'),
default='',
help_text=_('Token for OPA authentication, required when OPA_AUTH_TYPE is "Token".'),
help_text=_(
'The token for authentication to the OPA server. Required when OPA_AUTH_TYPE is "Token". If an authorization header is defined in OPA_AUTH_CUSTOM_HEADERS, it will be overridden by OPA_AUTH_TOKEN.'
),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
@ -1055,9 +1057,9 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_AUTH_CLIENT_CERT',
field_class=fields.CharField,
label=_('OPA Client Certificate Content'),
label=_('OPA client certificate content'),
default='',
help_text=_('Content of the client certificate file for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".'),
help_text=_('The content of the client certificate file for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
@ -1066,9 +1068,9 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_AUTH_CLIENT_KEY',
field_class=fields.CharField,
label=_('OPA Client Key Content'),
label=_('OPA client key content'),
default='',
help_text=_('Content of the client key for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".'),
help_text=_('The content of the client key for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
@ -1078,9 +1080,9 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_AUTH_CA_CERT',
field_class=fields.CharField,
label=_('OPA CA Certificate Content'),
label=_('OPA CA certificate content'),
default='',
help_text=_('Content of the CA certificate for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".'),
help_text=_('The content of the CA certificate for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
@ -1089,20 +1091,19 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_AUTH_CUSTOM_HEADERS',
field_class=fields.DictField,
label=_('OPA Custom Authentication Headers'),
label=_('OPA custom authentication headers'),
default={},
help_text=_('Custom headers for OPA authentication, defaults to {}, this will be added to the request headers. TODO: currently unimplemented.'),
help_text=_('Optional custom headers included in requests to the OPA server. Defaults to empty dictionary ({}).'),
category=('PolicyAsCode'),
category_slug='policyascode',
encrypted=True,
)
register(
'OPA_REQUEST_TIMEOUT',
field_class=fields.FloatField,
label=_('OPA Request Timeout'),
label=_('OPA request timeout'),
default=1.5,
help_text=_('Connection timeout in seconds, defaults to 1.5 seconds.'),
help_text=_('The number of seconds after which the connection to the OPA server will time out. Defaults to 1.5 seconds.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
@ -1110,9 +1111,9 @@ if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to
register(
'OPA_REQUEST_RETRIES',
field_class=fields.IntegerField,
label=_('OPA Request Retry Count'),
label=_('OPA request retry count'),
default=2,
help_text=_('Number of retries to connect to OPA service, defaults to 2.'),
help_text=_('The number of retry attempts for connecting to the OPA server. Default is 2.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)

View File

@ -0,0 +1,46 @@
# Generated by Django 4.2.18 on 2025-03-17 16:10
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0196_indirect_managed_node_audit'),
]
operations = [
migrations.AddField(
model_name='inventory',
name='opa_query_path',
field=models.CharField(
blank=True,
default=None,
help_text='The query path for the OPA policy to evaluate prior to job execution. The query path should be formatted as package/rule.',
max_length=128,
null=True,
),
),
migrations.AddField(
model_name='jobtemplate',
name='opa_query_path',
field=models.CharField(
blank=True,
default=None,
help_text='The query path for the OPA policy to evaluate prior to job execution. The query path should be formatted as package/rule.',
max_length=128,
null=True,
),
),
migrations.AddField(
model_name='organization',
name='opa_query_path',
field=models.CharField(
blank=True,
default=None,
help_text='The query path for the OPA policy to evaluate prior to job execution. The query path should be formatted as package/rule.',
max_length=128,
null=True,
),
),
]

View File

@ -1,6 +1,5 @@
import logging
from awx.main.models import Organization
logger = logging.getLogger('awx.main.migrations')
@ -8,6 +7,7 @@ logger = logging.getLogger('awx.main.migrations')
def migrate_org_admin_to_use(apps, schema_editor):
logger.info('Initiated migration from Org admin to use role')
roles_added = 0
Organization = apps.get_model('main', 'Organization')
for org in Organization.objects.prefetch_related('admin_role__members').iterator(chunk_size=1000):
igs = list(org.instance_groups.all())
if not igs:

View File

@ -47,6 +47,7 @@ from awx.main.models.mixins import (
TaskManagerInventoryUpdateMixin,
RelatedJobsMixin,
CustomVirtualEnvMixin,
OpaQueryPathMixin,
)
from awx.main.models.notifications import (
NotificationTemplate,
@ -74,7 +75,7 @@ class InventoryConstructedInventoryMembership(models.Model):
)
class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin):
class Inventory(CommonModelNameNotUnique, ResourceMixin, RelatedJobsMixin, OpaQueryPathMixin):
"""
an inventory source contains lists and hosts.
"""

View File

@ -51,6 +51,7 @@ from awx.main.models.mixins import (
RelatedJobsMixin,
WebhookMixin,
WebhookTemplateMixin,
OpaQueryPathMixin,
)
from awx.main.constants import JOB_VARIABLE_PREFIXES
@ -192,7 +193,9 @@ class JobOptions(BaseModel):
return needed
class JobTemplate(UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, ResourceMixin, CustomVirtualEnvMixin, RelatedJobsMixin, WebhookTemplateMixin):
class JobTemplate(
UnifiedJobTemplate, JobOptions, SurveyJobTemplateMixin, ResourceMixin, CustomVirtualEnvMixin, RelatedJobsMixin, WebhookTemplateMixin, OpaQueryPathMixin
):
"""
A job template is a reusable job definition for applying a project (with
playbook) to an inventory source with a given credential.

View File

@ -42,6 +42,7 @@ __all__ = [
'TaskManagerInventoryUpdateMixin',
'ExecutionEnvironmentMixin',
'CustomVirtualEnvMixin',
'OpaQueryPathMixin',
]
@ -692,3 +693,16 @@ class WebhookMixin(models.Model):
logger.debug("Webhook status update sent.")
else:
logger.error("Posting webhook status failed, code: {}\n" "{}\nPayload sent: {}".format(response.status_code, response.text, json.dumps(data)))
class OpaQueryPathMixin(models.Model):
class Meta:
abstract = True
opa_query_path = models.CharField(
max_length=128,
blank=True,
null=True,
default=None,
help_text=_("The query path for the OPA policy to evaluate prior to job execution. The query path should be formatted as package/rule."),
)

View File

@ -22,12 +22,12 @@ from awx.main.models.rbac import (
ROLE_SINGLETON_SYSTEM_AUDITOR,
)
from awx.main.models.unified_jobs import UnifiedJob
from awx.main.models.mixins import ResourceMixin, CustomVirtualEnvMixin, RelatedJobsMixin
from awx.main.models.mixins import ResourceMixin, CustomVirtualEnvMixin, RelatedJobsMixin, OpaQueryPathMixin
__all__ = ['Organization', 'Team', 'Profile', 'UserSessionMembership']
class Organization(CommonModel, NotificationFieldsModel, ResourceMixin, CustomVirtualEnvMixin, RelatedJobsMixin):
class Organization(CommonModel, NotificationFieldsModel, ResourceMixin, CustomVirtualEnvMixin, RelatedJobsMixin, OpaQueryPathMixin):
"""
An organization is the basic unit of multi-tenancy divisions
"""

View File

@ -2,6 +2,8 @@ import json
import tempfile
import contextlib
from pprint import pformat
from typing import Optional, Union
from django.conf import settings
@ -40,10 +42,22 @@ def _opa_base_client_init_fix(
BaseClient.__init__ = _opa_base_client_init_fix
class _TeamSerializer(serializers.ModelSerializer):
class Meta:
model = models.Team
fields = ('id', 'name')
class _UserSerializer(serializers.ModelSerializer):
teams = serializers.SerializerMethodField()
class Meta:
model = models.User
fields = ('id', 'username', 'is_superuser')
fields = ('id', 'username', 'is_superuser', 'teams')
def get_teams(self, user: models.User):
teams = models.Team.access_qs(user, 'member')
return _TeamSerializer(many=True).to_representation(teams)
class _ExecutionEnvironmentSerializer(serializers.ModelSerializer):
@ -74,7 +88,7 @@ class _InstanceGroupSerializer(serializers.ModelSerializer):
class _InventorySourceSerializer(serializers.ModelSerializer):
class Meta:
model = models.InventorySource
fields = ('id', 'name', 'type', 'kind')
fields = ('id', 'name', 'source', 'status')
class _InventorySerializer(serializers.ModelSerializer):
@ -86,8 +100,13 @@ class _InventorySerializer(serializers.ModelSerializer):
'id',
'name',
'description',
'kind',
'total_hosts',
'total_groups',
'has_inventory_sources',
'total_inventory_sources',
'has_active_failures',
'hosts_with_active_failures',
'inventory_sources',
)
@ -112,6 +131,15 @@ class _WorkflowJobTemplateSerializer(serializers.ModelSerializer):
)
class _WorkflowJobSerializer(serializers.ModelSerializer):
class Meta:
model = models.WorkflowJob
fields = (
'id',
'name',
)
class _OrganizationSerializer(serializers.ModelSerializer):
class Meta:
model = models.Organization
@ -138,15 +166,45 @@ class _ProjectSerializer(serializers.ModelSerializer):
)
class _CredentialSerializer(serializers.ModelSerializer):
organization = _OrganizationSerializer()
class Meta:
model = models.Credential
fields = (
'id',
'name',
'description',
'organization',
'credential_type',
'managed',
'kind',
'cloud',
'kubernetes',
)
class _LabelSerializer(serializers.ModelSerializer):
organization = _OrganizationSerializer()
class Meta:
model = models.Label
fields = ('id', 'name', 'organization')
class JobSerializer(serializers.ModelSerializer):
created_by = _UserSerializer()
credentials = _CredentialSerializer(many=True)
execution_environment = _ExecutionEnvironmentSerializer()
instance_group = _InstanceGroupSerializer()
inventory = _InventorySerializer()
job_template = _JobTemplateSerializer()
labels = _LabelSerializer(many=True)
organization = _OrganizationSerializer()
project = _ProjectSerializer()
extra_vars = fields.SerializerMethodField()
hosts_count = fields.SerializerMethodField()
workflow_job = fields.SerializerMethodField()
workflow_job_template = fields.SerializerMethodField()
class Meta:
@ -156,6 +214,7 @@ class JobSerializer(serializers.ModelSerializer):
'name',
'created',
'created_by',
'credentials',
'execution_environment',
'extra_vars',
'forks',
@ -165,6 +224,7 @@ class JobSerializer(serializers.ModelSerializer):
'job_template',
'job_type',
'job_type_name',
'labels',
'launch_type',
'limit',
'launched_by',
@ -173,8 +233,7 @@ class JobSerializer(serializers.ModelSerializer):
'project',
'scm_branch',
'scm_revision',
'workflow_job_id',
'workflow_node_id',
'workflow_job',
'workflow_job_template',
)
@ -184,6 +243,12 @@ class JobSerializer(serializers.ModelSerializer):
def get_hosts_count(self, obj: models.Job):
return obj.hosts.count()
def get_workflow_job(self, obj: models.Job):
workflow_job: models.WorkflowJob = obj.get_workflow_job()
if workflow_job is None:
return None
return _WorkflowJobSerializer().to_representation(workflow_job)
def get_workflow_job_template(self, obj: models.Job):
workflow_job: models.WorkflowJob = obj.get_workflow_job()
if workflow_job is None:
@ -255,11 +320,13 @@ def evaluate_policy(instance):
if not isinstance(instance, models.Job):
return
instance.log_lifecycle("evaluate_policy")
input_data = JobSerializer(instance=instance).data
headers = None
headers = settings.OPA_AUTH_CUSTOM_HEADERS
if settings.OPA_AUTH_TYPE == OPA_AUTH_TYPES.TOKEN:
headers = {'Authorization': 'Bearer {}'.format(settings.OPA_AUTH_TOKEN)}
headers.update({'Authorization': 'Bearer {}'.format(settings.OPA_AUTH_TOKEN)})
if settings.OPA_AUTH_TYPE == OPA_AUTH_TYPES.CERTIFICATE and not settings.OPA_SSL:
raise PolicyEvaluationError(_('OPA_AUTH_TYPE=Certificate requires OPA_SSL to be enabled.'))
@ -277,36 +344,66 @@ def evaluate_policy(instance):
if cert_settings_missing:
raise PolicyEvaluationError(_('Following certificate settings are missing for OPA_AUTH_TYPE=Certificate: {}').format(cert_settings_missing))
query_paths = [
('Organization', instance.organization.opa_query_path),
('Inventory', instance.inventory.opa_query_path),
('Job template', instance.job_template.opa_query_path),
]
violations = dict()
errors = dict()
try:
with opa_client(headers=headers) as client:
try:
response = client.query_rule(input_data=input_data, package_path='job_template', rule_name='response')
except HTTPError as e:
message = _('Call to OPA failed. Exception: {}').format(e)
for path_type, query_path in query_paths:
response = dict()
try:
error_data = e.response.json()
except ValueError:
raise PolicyEvaluationError(message)
if not query_path:
continue
error_code = error_data.get("code")
error_message = error_data.get("message")
if error_code or error_message:
message = _('Call to OPA failed. Code: {}, Message: {}').format(error_code, error_message)
raise PolicyEvaluationError(message)
except Exception as e:
raise PolicyEvaluationError(_('Call to OPA failed. Exception: {}').format(e))
response = client.query_rule(input_data=input_data, package_path=query_path)
result = response.get('result')
if result is None:
raise PolicyEvaluationError(_('Call to OPA did not return a "result" property. The path refers to an undefined document.'))
except HTTPError as e:
message = _('Call to OPA failed. Exception: {}').format(e)
try:
error_data = e.response.json()
except ValueError:
errors[path_type] = message
continue
result_serializer = OPAResultSerializer(data=result)
if not result_serializer.is_valid():
raise PolicyEvaluationError(_('OPA policy returned invalid result.'))
error_code = error_data.get("code")
error_message = error_data.get("message")
if error_code or error_message:
message = _('Call to OPA failed. Code: {}, Message: {}').format(error_code, error_message)
errors[path_type] = message
continue
except Exception as e:
errors[path_type] = _('Call to OPA failed. Exception: {}').format(e)
continue
result = response.get('result')
if result is None:
errors[path_type] = _('Call to OPA did not return a "result" property. The path refers to an undefined document.')
continue
result_serializer = OPAResultSerializer(data=result)
if not result_serializer.is_valid():
errors[path_type] = _('OPA policy returned invalid result.')
continue
result_data = result_serializer.validated_data
if not result_data.get("allowed") and (result_violations := result_data.get("violations")):
violations[path_type] = result_violations
format_results = dict()
if any(errors[e] for e in errors):
format_results["Errors"] = errors
if any(violations[v] for v in violations):
format_results["Violations"] = violations
if violations or errors:
raise PolicyEvaluationError(pformat(format_results, width=80))
result_data = result_serializer.validated_data
if not result_data["allowed"]:
violations = result_data.get("violations", [])
raise PolicyEvaluationError(_('OPA policy denied the request, Violations: {}').format(violations))
except Exception as e:
raise PolicyEvaluationError(_('Policy evaluation failed, Exception: {}').format(e))
raise PolicyEvaluationError(_('This job cannot be executed due to a policy violation or error. See the following details:\n{}').format(e))

View File

@ -1,17 +1,39 @@
import json
import re
from unittest import mock
import pytest
import requests.exceptions
from django.test import override_settings
from awx.main.models import Job, Inventory, Project, Organization
from awx.main.models import (
Job,
Inventory,
Project,
Organization,
JobTemplate,
Credential,
CredentialType,
User,
Team,
Label,
WorkflowJob,
WorkflowJobNode,
InventorySource,
)
from awx.main.exceptions import PolicyEvaluationError
from awx.main.tasks import policy
from awx.main.tasks.policy import JobSerializer
def _parse_exception_message(exception: PolicyEvaluationError):
pe_plain = str(exception.value)
assert "This job cannot be executed due to a policy violation or error. See the following details:" in pe_plain
violation_message = "This job cannot be executed due to a policy violation or error. See the following details:"
return eval(pe_plain.split(violation_message)[1].strip())
@pytest.fixture(autouse=True)
def enable_flag():
with override_settings(
@ -35,18 +57,41 @@ def opa_client():
@pytest.fixture
def job():
project: Project = Project.objects.create(name='proj1', scm_type='git', scm_branch='main', scm_url='https://git.example.com/proj1')
inventory: Inventory = Inventory.objects.create(name='inv1')
job: Job = Job.objects.create(name='job1', extra_vars="{}", inventory=inventory, project=project)
inventory: Inventory = Inventory.objects.create(name='inv1', opa_query_path="inventory/response")
org: Organization = Organization.objects.create(name="org1", opa_query_path="organization/response")
jt: JobTemplate = JobTemplate.objects.create(name="jt1", opa_query_path="job_template/response")
job: Job = Job.objects.create(name='job1', extra_vars="{}", inventory=inventory, project=project, organization=org, job_template=jt)
return job
@pytest.mark.django_db
def test_job_serializer():
user: User = User.objects.create(username='user1')
org: Organization = Organization.objects.create(name='org1')
team: Team = Team.objects.create(name='team1', organization=org)
team.admin_role.members.add(user)
project: Project = Project.objects.create(name='proj1', scm_type='git', scm_branch='main', scm_url='https://git.example.com/proj1')
inventory: Inventory = Inventory.objects.create(name='inv1')
inventory: Inventory = Inventory.objects.create(name='inv1', description='Demo inventory')
inventory_source: InventorySource = InventorySource.objects.create(name='inv-src1', source='file', inventory=inventory)
extra_vars = {"FOO": "value1", "BAR": "value2"}
job: Job = Job.objects.create(name='job1', extra_vars=json.dumps(extra_vars), inventory=inventory, project=project, organization=org)
CredentialType.setup_tower_managed_defaults()
cred_type_ssh: CredentialType = CredentialType.objects.get(kind='ssh')
cred: Credential = Credential.objects.create(name="cred1", description='Demo credential', credential_type=cred_type_ssh, organization=org)
label: Label = Label.objects.create(name='label1', organization=org)
job: Job = Job.objects.create(
name='job1', extra_vars=json.dumps(extra_vars), inventory=inventory, project=project, organization=org, created_by=user, launch_type='workflow'
)
# job.unified_job_node.workflow_job = workflow_job
job.credentials.add(cred)
job.labels.add(label)
workflow_job: WorkflowJob = WorkflowJob.objects.create(name='wf-job1')
WorkflowJobNode.objects.create(job=job, workflow_job=workflow_job)
serializer = JobSerializer(instance=job)
@ -54,17 +99,69 @@ def test_job_serializer():
'id': job.id,
'name': 'job1',
'created': job.created.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
'created_by': None,
'created_by': {
'id': user.id,
'username': 'user1',
'is_superuser': False,
'teams': [
{'id': team.id, 'name': 'team1'},
],
},
'credentials': [
{
'id': cred.id,
'name': 'cred1',
'description': 'Demo credential',
'organization': {
'id': org.id,
'name': 'org1',
},
'credential_type': cred_type_ssh.id,
'kind': 'ssh',
'managed': False,
'kubernetes': False,
'cloud': False,
},
],
'execution_environment': None,
'extra_vars': extra_vars,
'forks': 0,
'hosts_count': 0,
'instance_group': None,
'inventory': inventory.id,
'inventory': {
'id': inventory.id,
'name': 'inv1',
'description': 'Demo inventory',
'kind': '',
'total_hosts': 0,
'total_groups': 0,
'has_inventory_sources': False,
'total_inventory_sources': 0,
'has_active_failures': False,
'hosts_with_active_failures': 0,
'inventory_sources': [
{
'id': inventory_source.id,
'name': 'inv-src1',
'source': 'file',
'status': 'never updated',
}
],
},
'job_template': None,
'job_type': 'run',
'job_type_name': 'job',
'launch_type': 'manual',
'labels': [
{
'id': label.id,
'name': 'label1',
'organization': {
'id': org.id,
'name': 'org1',
},
},
],
'launch_type': 'workflow',
'limit': '',
'launched_by': {},
'organization': {
@ -86,17 +183,21 @@ def test_job_serializer():
},
'scm_branch': '',
'scm_revision': '',
'workflow_job_id': None,
'workflow_node_id': None,
'workflow_job': {
'id': workflow_job.id,
'name': 'wf-job1',
},
'workflow_job_template': None,
}
@pytest.mark.django_db
def test_evaluate_policy(opa_client):
def test_evaluate_policy_missing_opa_query_path_field(opa_client):
project: Project = Project.objects.create(name='proj1', scm_type='git', scm_branch='main', scm_url='https://git.example.com/proj1')
inventory: Inventory = Inventory.objects.create(name='inv1')
job: Job = Job.objects.create(name='job1', extra_vars="{}", inventory=inventory, project=project)
org: Organization = Organization.objects.create(name="org1")
jt: JobTemplate = JobTemplate.objects.create(name="jt1")
job: Job = Job.objects.create(name='job1', extra_vars="{}", inventory=inventory, project=project, organization=org, job_template=jt)
response = {
"result": {
@ -110,7 +211,32 @@ def test_evaluate_policy(opa_client):
except PolicyEvaluationError as e:
pytest.fail(f"Must not raise PolicyEvaluationError: {e}")
opa_client.query_rule.assert_called_once_with(input_data=mock.ANY, package_path='job_template', rule_name='response')
assert opa_client.query_rule.call_count == 0
@pytest.mark.django_db
def test_evaluate_policy(opa_client, job):
response = {
"result": {
"allowed": True,
"violations": [],
}
}
opa_client.query_rule.return_value = response
try:
policy.evaluate_policy(job)
except PolicyEvaluationError as e:
pytest.fail(f"Must not raise PolicyEvaluationError: {e}")
opa_client.query_rule.assert_has_calls(
[
mock.call(input_data=mock.ANY, package_path='organization/response'),
mock.call(input_data=mock.ANY, package_path='inventory/response'),
mock.call(input_data=mock.ANY, package_path='job_template/response'),
],
any_order=False,
)
assert opa_client.query_rule.call_count == 3
@pytest.mark.django_db
@ -127,7 +253,7 @@ def test_evaluate_policy_allowed(opa_client, job):
except PolicyEvaluationError as e:
pytest.fail(f"Must not raise PolicyEvaluationError: {e}")
opa_client.query_rule.assert_called_once()
assert opa_client.query_rule.call_count == 3
@pytest.mark.django_db
@ -140,10 +266,19 @@ def test_evaluate_policy_not_allowed(opa_client, job):
}
opa_client.query_rule.return_value = response
with pytest.raises(PolicyEvaluationError, match=re.escape("OPA policy denied the request, Violations: ['Access not allowed.']")):
with pytest.raises(PolicyEvaluationError) as pe:
policy.evaluate_policy(job)
opa_client.query_rule.assert_called_once()
pe_plain = str(pe.value)
assert "Errors:" not in pe_plain
exception = _parse_exception_message(pe)
assert exception["Violations"]["Organization"] == ["Access not allowed."]
assert exception["Violations"]["Inventory"] == ["Access not allowed."]
assert exception["Violations"]["Job template"] == ["Access not allowed."]
assert opa_client.query_rule.call_count == 3
@pytest.mark.django_db
@ -151,10 +286,17 @@ def test_evaluate_policy_not_found(opa_client, job):
response = {}
opa_client.query_rule.return_value = response
with pytest.raises(PolicyEvaluationError, match=re.escape('Call to OPA did not return a "result" property. The path refers to an undefined document.')):
with pytest.raises(PolicyEvaluationError) as pe:
policy.evaluate_policy(job)
opa_client.query_rule.assert_called_once()
missing_result_property = 'Call to OPA did not return a "result" property. The path refers to an undefined document.'
exception = _parse_exception_message(pe)
assert exception["Errors"]["Organization"] == missing_result_property
assert exception["Errors"]["Inventory"] == missing_result_property
assert exception["Errors"]["Job template"] == missing_result_property
assert opa_client.query_rule.call_count == 3
@pytest.mark.django_db
@ -176,7 +318,56 @@ def test_evaluate_policy_server_error(opa_client, job):
opa_client.query_rule.side_effect = requests.exceptions.HTTPError(http_error_msg, response=response)
with pytest.raises(PolicyEvaluationError, match=re.escape(f'Call to OPA failed. Code: internal_error, Message: {error_response["message"]}')):
with pytest.raises(PolicyEvaluationError) as pe:
policy.evaluate_policy(job)
opa_client.query_rule.assert_called_once()
exception = _parse_exception_message(pe)
assert exception["Errors"]["Organization"] == f'Call to OPA failed. Code: internal_error, Message: {error_response["message"]}'
assert exception["Errors"]["Inventory"] == f'Call to OPA failed. Code: internal_error, Message: {error_response["message"]}'
assert exception["Errors"]["Job template"] == f'Call to OPA failed. Code: internal_error, Message: {error_response["message"]}'
assert opa_client.query_rule.call_count == 3
@pytest.mark.django_db
def test_evaluate_policy_invalid_result(opa_client, job):
response = {
"result": {
"absolutely": "no!",
}
}
opa_client.query_rule.return_value = response
with pytest.raises(PolicyEvaluationError) as pe:
policy.evaluate_policy(job)
invalid_result = 'OPA policy returned invalid result.'
exception = _parse_exception_message(pe)
assert exception["Errors"]["Organization"] == invalid_result
assert exception["Errors"]["Inventory"] == invalid_result
assert exception["Errors"]["Job template"] == invalid_result
assert opa_client.query_rule.call_count == 3
@pytest.mark.django_db
def test_evaluate_policy_failed_exception(opa_client, job):
error_response = {}
response = mock.Mock()
response.status_code = requests.codes.internal_server_error
response.json.return_value = error_response
opa_client.query_rule.side_effect = ValueError("Invalid JSON")
with pytest.raises(PolicyEvaluationError) as pe:
policy.evaluate_policy(job)
opa_failed_exception = 'Call to OPA failed. Exception: Invalid JSON'
exception = _parse_exception_message(pe)
assert exception["Errors"]["Organization"] == opa_failed_exception
assert exception["Errors"]["Inventory"] == opa_failed_exception
assert exception["Errors"]["Job template"] == opa_failed_exception
assert opa_client.query_rule.call_count == 3

View File

@ -1238,18 +1238,18 @@ INDIRECT_HOST_AUDIT_RECORD_MAX_AGE_DAYS = 7
FEATURE_POLICY_AS_CODE_ENABLED = False
OPA_POLICY_EVALUATION_DEFAULT_RESULT = {'allowed': True} # Default policy enforcement decision if policy evaluation fail for any reason.
OPA_HOST = '' # Host to connect to OPA service, defaults to ''. When this value is set to '', policy enforcement will be disabled.
OPA_PORT = 8181 # Port to connect to OPA service, defaults to 8181.
OPA_SSL = False # Use SSL to connect to OPA service, defaults to False.
OPA_HOST = '' # The hostname used to connect to the OPA server. If empty, policy enforcement will be disabled.
OPA_PORT = 8181 # The port used to connect to the OPA server. Defaults to 8181.
OPA_SSL = False # Enable or disable the use of SSL to connect to the OPA server. Defaults to false.
OPA_AUTH_TYPE = 'None' # 'None', 'Token', 'Certificate'
OPA_AUTH_TOKEN = '' # Token for OPA authentication, defaults to '', required when OPA_AUTH_TYPE = 'Token'.
OPA_AUTH_CLIENT_CERT = '' # Content of the client certificate file for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".
OPA_AUTH_CLIENT_KEY = '' # Content of the client key file for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".
OPA_AUTH_CA_CERT = '' # Content of the CA certificate file for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".
OPA_AUTH_CUSTOM_HEADERS = {} # Custom header for OPA authentication, defaults to {}, this will be added to the request headers. TODO: currently unimplemented.
OPA_REQUEST_TIMEOUT = 1.5 # Connection timeout in seconds, defaults to 1.5 seconds.
OPA_REQUEST_RETRIES = 2 # Number of retries to connect to OPA service, defaults to 2.
OPA_AUTH_TYPE = 'None' # The authentication type that will be used to connect to the OPA server: "None", "Token", or "Certificate".
OPA_AUTH_TOKEN = '' # The token for authentication to the OPA server. Required when OPA_AUTH_TYPE is "Token". If an authorization header is defined in OPA_AUTH_CUSTOM_HEADERS, it will be overridden by OPA_AUTH_TOKEN.
OPA_AUTH_CLIENT_CERT = '' # The content of the client certificate file for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".
OPA_AUTH_CLIENT_KEY = '' # The content of the client key for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".
OPA_AUTH_CA_CERT = '' # The content of the CA certificate for mTLS authentication to the OPA server. Required when OPA_AUTH_TYPE is "Certificate".
OPA_AUTH_CUSTOM_HEADERS = {} # Optional custom headers included in requests to the OPA server. Defaults to empty dictionary ({}).
OPA_REQUEST_TIMEOUT = 1.5 # The number of seconds after which the connection to the OPA server will time out. Defaults to 1.5 seconds.
OPA_REQUEST_RETRIES = 2 # The number of retry attempts for connecting to the OPA server. Default is 2.
# feature flags
FLAG_SOURCES = ('flags.sources.SettingsFlagsSource',)