[Feature][release_4.6] Policy as Code MVP part 1 (#6848)

This commit is contained in:
Hao Liu
2025-02-24 15:58:57 -05:00
committed by GitHub
parent b8a1e90b06
commit 2d648d1225
15 changed files with 922 additions and 12 deletions

View File

@@ -10,7 +10,7 @@ from django.core.validators import URLValidator, _lazy_re_compile
from django.utils.translation import gettext_lazy as _
# Django REST Framework
from rest_framework.fields import BooleanField, CharField, ChoiceField, DictField, DateTimeField, EmailField, IntegerField, ListField # noqa
from rest_framework.fields import BooleanField, CharField, ChoiceField, DictField, DateTimeField, EmailField, IntegerField, ListField, FloatField # noqa
from rest_framework.serializers import PrimaryKeyRelatedField # noqa
# AWX

View File

@@ -4,6 +4,7 @@ import logging
# Django
from django.core.checks import Error
from django.utils.translation import gettext_lazy as _
from django.conf import settings
# Django REST Framework
from rest_framework import serializers
@@ -12,6 +13,7 @@ from rest_framework import serializers
from awx.conf import fields, register, register_validate
from awx.main.models import ExecutionEnvironment
from awx.main.constants import SUBSCRIPTION_USAGE_MODEL_UNIQUE_HOSTS
from awx.main.tasks.policy import OPA_AUTH_TYPES
logger = logging.getLogger('awx.main.conf')
@@ -993,3 +995,124 @@ def csrf_trusted_origins_validate(serializer, attrs):
register_validate('system', csrf_trusted_origins_validate)
if settings.FEATURE_POLICY_AS_CODE_ENABLED: # Unable to use flag_enabled due to AppRegistryNotReady error
register(
'OPA_HOST',
field_class=fields.CharField,
label=_('OPA Server Hostname'),
default='',
help_text=_('Host to connect to OPA service, when set to the default value of "" policy enforcement will be disabled.'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
)
register(
'OPA_PORT',
field_class=fields.IntegerField,
label=_('OPA Server Port'),
default=8181,
help_text=_('Port to connect to OPA service, defaults to 8181.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_SSL',
field_class=fields.BooleanField,
label=_('Use SSL for OPA Connection'),
default=False,
help_text=_('Use SSL to connect to OPA service, defaults to False.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_AUTH_TYPE',
field_class=fields.ChoiceField,
label=_('OPA Authentication Type'),
choices=[OPA_AUTH_TYPES.NONE, OPA_AUTH_TYPES.TOKEN, OPA_AUTH_TYPES.CERTIFICATE],
default=OPA_AUTH_TYPES.NONE,
help_text=_('Authentication type for OPA: "None", "Token", or "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_AUTH_TOKEN',
field_class=fields.CharField,
label=_('OPA Authentication Token'),
default='',
help_text=_('Token for OPA authentication, required when OPA_AUTH_TYPE is "Token".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
encrypted=True,
)
register(
'OPA_AUTH_CLIENT_CERT',
field_class=fields.CharField,
label=_('OPA Client Certificate Content'),
default='',
help_text=_('Content of the client certificate file for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
)
register(
'OPA_AUTH_CLIENT_KEY',
field_class=fields.CharField,
label=_('OPA Client Key Content'),
default='',
help_text=_('Content of the client key for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
encrypted=True,
)
register(
'OPA_AUTH_CA_CERT',
field_class=fields.CharField,
label=_('OPA CA Certificate Content'),
default='',
help_text=_('Content of the CA certificate for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".'),
category=('PolicyAsCode'),
category_slug='policyascode',
allow_blank=True,
)
register(
'OPA_AUTH_CUSTOM_HEADERS',
field_class=fields.DictField,
label=_('OPA Custom Authentication Headers'),
default={},
help_text=_('Custom headers for OPA authentication, defaults to {}, this will be added to the request headers. TODO: currently unimplemented.'),
category=('PolicyAsCode'),
category_slug='policyascode',
encrypted=True,
)
register(
'OPA_REQUEST_TIMEOUT',
field_class=fields.FloatField,
label=_('OPA Request Timeout'),
default=1.5,
help_text=_('Connection timeout in seconds, defaults to 1.5 seconds.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)
register(
'OPA_REQUEST_RETRIES',
field_class=fields.IntegerField,
label=_('OPA Request Retry Count'),
default=2,
help_text=_('Number of retries to connect to OPA service, defaults to 2.'),
category=('PolicyAsCode'),
category_slug='policyascode',
)

View File

@@ -38,5 +38,12 @@ class PostRunError(Exception):
super(PostRunError, self).__init__(msg)
class PolicyEvaluationError(Exception):
def __init__(self, msg, status='failed', tb=''):
self.status = status
self.tb = tb
super(PolicyEvaluationError, self).__init__(msg)
class ReceptorNodeNotFound(RuntimeError):
pass

View File

@@ -17,7 +17,8 @@ import urllib.parse as urlparse
# Django
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import PermissionDenied
# Runner
import ansible_runner
@@ -26,7 +27,6 @@ import ansible_runner
import git
from gitdb.exc import BadName as BadGitName
# AWX
from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
@@ -62,10 +62,11 @@ from awx.main.tasks.callback import (
RunnerCallbackForProjectUpdate,
RunnerCallbackForSystemJob,
)
from awx.main.tasks.policy import evaluate_policy
from awx.main.tasks.signals import with_signal_handling, signal_callback
from awx.main.tasks.receptor import AWXReceptorJob
from awx.main.tasks.facts import start_fact_cache, finish_fact_cache
from awx.main.exceptions import AwxTaskError, PostRunError, ReceptorNodeNotFound
from awx.main.exceptions import AwxTaskError, PolicyEvaluationError, PostRunError, ReceptorNodeNotFound
from awx.main.utils.ansible import read_ansible_config
from awx.main.utils.execution_environments import CONTAINER_ROOT, to_container_path
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
@@ -81,8 +82,6 @@ from awx.conf.license import get_license
from awx.main.utils.handlers import SpecialInventoryHandler
from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields
from awx.main.utils.update_model import update_model
from rest_framework.exceptions import PermissionDenied
from django.utils.translation import gettext_lazy as _
logger = logging.getLogger('awx.main.tasks.jobs')
@@ -497,6 +496,7 @@ class BaseTask(object):
self.instance.send_notification_templates("running")
private_data_dir = self.build_private_data_dir(self.instance)
self.pre_run_hook(self.instance, private_data_dir)
evaluate_policy(self.instance)
self.build_project_dir(self.instance, private_data_dir)
self.instance.log_lifecycle("preparing_playbook")
if self.instance.cancel_flag or signal_callback():
@@ -624,6 +624,8 @@ class BaseTask(object):
elif cancel_flag_value is False:
self.runner_callback.delay_update(skip_if_already_set=True, job_explanation="The running ansible process received a shutdown signal.")
status = 'failed'
except PolicyEvaluationError as exc:
self.runner_callback.delay_update(job_explanation=str(exc), result_traceback=str(exc))
except ReceptorNodeNotFound as exc:
self.runner_callback.delay_update(job_explanation=str(exc))
except Exception:

286
awx/main/tasks/policy.py Normal file
View File

@@ -0,0 +1,286 @@
import json
import tempfile
import contextlib
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from flags.state import flag_enabled
from opa_client import OpaClient
from requests import HTTPError
from rest_framework import serializers
from rest_framework import fields
from awx.main import models
from awx.main.exceptions import PolicyEvaluationError
class _UserSerializer(serializers.ModelSerializer):
class Meta:
model = models.User
fields = ('id', 'username', 'is_superuser')
class _ExecutionEnvironmentSerializer(serializers.ModelSerializer):
class Meta:
model = models.ExecutionEnvironment
fields = (
'id',
'name',
'image',
'pull',
)
class _InstanceGroupSerializer(serializers.ModelSerializer):
class Meta:
model = models.InstanceGroup
fields = (
'id',
'name',
'capacity',
'jobs_running',
'jobs_total',
'max_concurrent_jobs',
'max_forks',
)
class _InventorySourceSerializer(serializers.ModelSerializer):
class Meta:
model = models.InventorySource
fields = ('id', 'name', 'type', 'kind')
class _InventorySerializer(serializers.ModelSerializer):
inventory_sources = _InventorySourceSerializer(many=True)
class Meta:
model = models.Inventory
fields = (
'id',
'name',
'description',
'total_hosts',
'total_groups',
'inventory_sources',
)
class _JobTemplateSerializer(serializers.ModelSerializer):
class Meta:
model = models.JobTemplate
fields = (
'id',
'name',
'job_type',
)
class _WorkflowJobTemplateSerializer(serializers.ModelSerializer):
class Meta:
model = models.WorkflowJobTemplate
fields = (
'id',
'name',
'job_type',
)
class _OrganizationSerializer(serializers.ModelSerializer):
class Meta:
model = models.Organization
fields = (
'id',
'name',
)
class _ProjectSerializer(serializers.ModelSerializer):
class Meta:
model = models.Project
fields = (
'id',
'name',
'status',
'scm_type',
'scm_url',
'scm_branch',
'scm_refspec',
'scm_clean',
'scm_track_submodules',
'scm_delete_on_update',
)
class JobSerializer(serializers.ModelSerializer):
created_by = _UserSerializer()
execution_environment = _ExecutionEnvironmentSerializer()
instance_group = _InstanceGroupSerializer()
job_template = _JobTemplateSerializer()
organization = _OrganizationSerializer()
project = _ProjectSerializer()
extra_vars = fields.SerializerMethodField()
hosts_count = fields.SerializerMethodField()
workflow_job_template = fields.SerializerMethodField()
class Meta:
model = models.Job
fields = (
'id',
'name',
'created',
'created_by',
'execution_environment',
'extra_vars',
'forks',
'hosts_count',
'instance_group',
'inventory',
'job_template',
'job_type',
'job_type_name',
'launch_type',
'limit',
'launched_by',
'organization',
'playbook',
'project',
'scm_branch',
'scm_revision',
'workflow_job_id',
'workflow_node_id',
'workflow_job_template',
)
def get_extra_vars(self, obj: models.Job):
return json.loads(obj.display_extra_vars())
def get_hosts_count(self, obj: models.Job):
return obj.hosts.count()
def get_workflow_job_template(self, obj: models.Job):
workflow_job: models.WorkflowJob = obj.get_workflow_job()
if workflow_job is None:
return None
workflow_job_template: models.WorkflowJobTemplate = workflow_job.workflow_job_template
if workflow_job_template is None:
return None
return _WorkflowJobTemplateSerializer().to_representation(workflow_job_template)
class OPAResultSerializer(serializers.Serializer):
allowed = fields.BooleanField(required=True)
violations = fields.ListField(child=fields.CharField())
class OPA_AUTH_TYPES:
NONE = 'None'
TOKEN = 'Token'
CERTIFICATE = 'Certificate'
@contextlib.contextmanager
def opa_cert_file():
if settings.OPA_AUTH_TYPE == OPA_AUTH_TYPES.CERTIFICATE:
with tempfile.NamedTemporaryFile(delete=True, mode='w', suffix=".pem") as cert_temp:
cert_temp.write(settings.OPA_AUTH_CA_CERT)
cert_temp.write("\n")
cert_temp.write(settings.OPA_AUTH_CLIENT_CERT)
cert_temp.write("\n")
cert_temp.write(settings.OPA_AUTH_CLIENT_KEY)
cert_temp.write("\n")
cert_temp.flush()
yield cert_temp.name
elif settings.OPA_SSL and settings.OPA_AUTH_CA_CERT:
with tempfile.NamedTemporaryFile(delete=True, mode='w', suffix=".pem") as cert_temp:
cert_temp.write(settings.OPA_AUTH_CA_CERT)
cert_temp.write("\n")
cert_temp.flush()
yield cert_temp.name
else:
yield None
@contextlib.contextmanager
def opa_client(headers=None):
with opa_cert_file() as cert_temp_file_name:
with OpaClient(
host=settings.OPA_HOST,
port=settings.OPA_PORT,
headers=headers,
ssl=settings.OPA_SSL,
cert=cert_temp_file_name,
retries=settings.OPA_REQUEST_RETRIES,
timeout=settings.OPA_REQUEST_TIMEOUT,
) as client:
yield client
def evaluate_policy(instance):
# Policy evaluation for Policy as Code feature
if not flag_enabled("FEATURE_POLICY_AS_CODE_ENABLED"):
return
if not settings.OPA_HOST:
return
if not isinstance(instance, models.Job):
return
input_data = JobSerializer(instance=instance).data
headers = None
if settings.OPA_AUTH_TYPE == OPA_AUTH_TYPES.TOKEN:
headers = {'Authorization': 'Bearer {}'.format(settings.OPA_AUTH_TOKEN)}
if settings.OPA_AUTH_TYPE == OPA_AUTH_TYPES.CERTIFICATE and not settings.OPA_SSL:
raise PolicyEvaluationError(_('OPA_AUTH_TYPE=Certificate requires OPA_SSL to be enabled.'))
cert_settings_missing = []
if settings.OPA_AUTH_TYPE == OPA_AUTH_TYPES.CERTIFICATE:
if not settings.OPA_AUTH_CLIENT_CERT:
cert_settings_missing += ['OPA_AUTH_CLIENT_CERT']
if not settings.OPA_AUTH_CLIENT_KEY:
cert_settings_missing += ['OPA_AUTH_CLIENT_KEY']
if not settings.OPA_AUTH_CA_CERT:
cert_settings_missing += ['OPA_AUTH_CA_CERT']
if cert_settings_missing:
raise PolicyEvaluationError(_('Following certificate settings are missing for OPA_AUTH_TYPE=Certificate: {}').format(cert_settings_missing))
try:
with opa_client(headers=headers) as client:
try:
response = client.query_rule(input_data=input_data, package_path='job_template', rule_name='response')
except HTTPError as e:
message = _('Call to OPA failed. Exception: {}').format(e)
try:
error_data = e.response.json()
except ValueError:
raise PolicyEvaluationError(message)
error_code = error_data.get("code")
error_message = error_data.get("message")
if error_code or error_message:
message = _('Call to OPA failed. Code: {}, Message: {}').format(error_code, error_message)
raise PolicyEvaluationError(message)
except Exception as e:
raise PolicyEvaluationError(_('Call to OPA failed. Exception: {}').format(e))
result = response.get('result')
if result is None:
raise PolicyEvaluationError(_('Call to OPA did not return a "result" property. The path refers to an undefined document.'))
result_serializer = OPAResultSerializer(data=result)
if not result_serializer.is_valid():
raise PolicyEvaluationError(_('OPA policy returned invalid result.'))
result_data = result_serializer.validated_data
if not result_data["allowed"]:
violations = result_data.get("violations", [])
raise PolicyEvaluationError(_('OPA policy denied the request, Violations: {}').format(violations))
except Exception as e:
raise PolicyEvaluationError(_('Policy evaluation failed, Exception: {}').format(e))

View File

@@ -0,0 +1,182 @@
import json
import re
from unittest import mock
import pytest
import requests.exceptions
from django.test import override_settings
from awx.main.models import Job, Inventory, Project, Organization
from awx.main.exceptions import PolicyEvaluationError
from awx.main.tasks import policy
from awx.main.tasks.policy import JobSerializer
@pytest.fixture(autouse=True)
def enable_flag():
with override_settings(
OPA_HOST='opa.example.com',
FLAGS={"FEATURE_POLICY_AS_CODE_ENABLED": [("boolean", True)]},
FLAG_SOURCES=('flags.sources.SettingsFlagsSource',),
):
yield
@pytest.fixture
def opa_client():
cls_mock = mock.MagicMock(name='OpaClient')
instance_mock = cls_mock.return_value
instance_mock.__enter__.return_value = instance_mock
with mock.patch('awx.main.tasks.policy.OpaClient', cls_mock):
yield instance_mock
@pytest.fixture
def job():
project: Project = Project.objects.create(name='proj1', scm_type='git', scm_branch='main', scm_url='https://git.example.com/proj1')
inventory: Inventory = Inventory.objects.create(name='inv1')
job: Job = Job.objects.create(name='job1', extra_vars="{}", inventory=inventory, project=project)
return job
@pytest.mark.django_db
def test_job_serializer():
org: Organization = Organization.objects.create(name='org1')
project: Project = Project.objects.create(name='proj1', scm_type='git', scm_branch='main', scm_url='https://git.example.com/proj1')
inventory: Inventory = Inventory.objects.create(name='inv1')
extra_vars = {"FOO": "value1", "BAR": "value2"}
job: Job = Job.objects.create(name='job1', extra_vars=json.dumps(extra_vars), inventory=inventory, project=project, organization=org)
serializer = JobSerializer(instance=job)
assert serializer.data == {
'id': job.id,
'name': 'job1',
'created': job.created.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
'created_by': None,
'execution_environment': None,
'extra_vars': extra_vars,
'forks': 0,
'hosts_count': 0,
'instance_group': None,
'inventory': inventory.id,
'job_template': None,
'job_type': 'run',
'job_type_name': 'job',
'launch_type': 'manual',
'limit': '',
'launched_by': {},
'organization': {
'id': org.id,
'name': 'org1',
},
'playbook': '',
'project': {
'id': project.id,
'name': 'proj1',
'status': 'pending',
'scm_type': 'git',
'scm_url': 'https://git.example.com/proj1',
'scm_branch': 'main',
'scm_refspec': '',
'scm_clean': False,
'scm_track_submodules': False,
'scm_delete_on_update': False,
},
'scm_branch': '',
'scm_revision': '',
'workflow_job_id': None,
'workflow_node_id': None,
'workflow_job_template': None,
}
@pytest.mark.django_db
def test_evaluate_policy(opa_client):
project: Project = Project.objects.create(name='proj1', scm_type='git', scm_branch='main', scm_url='https://git.example.com/proj1')
inventory: Inventory = Inventory.objects.create(name='inv1')
job: Job = Job.objects.create(name='job1', extra_vars="{}", inventory=inventory, project=project)
response = {
"result": {
"allowed": True,
"violations": [],
}
}
opa_client.query_rule.return_value = response
try:
policy.evaluate_policy(job)
except PolicyEvaluationError as e:
pytest.fail(f"Must not raise PolicyEvaluationError: {e}")
opa_client.query_rule.assert_called_once_with(input_data=mock.ANY, package_path='job_template', rule_name='response')
@pytest.mark.django_db
def test_evaluate_policy_allowed(opa_client, job):
response = {
"result": {
"allowed": True,
"violations": [],
}
}
opa_client.query_rule.return_value = response
try:
policy.evaluate_policy(job)
except PolicyEvaluationError as e:
pytest.fail(f"Must not raise PolicyEvaluationError: {e}")
opa_client.query_rule.assert_called_once()
@pytest.mark.django_db
def test_evaluate_policy_not_allowed(opa_client, job):
response = {
"result": {
"allowed": False,
"violations": ["Access not allowed."],
}
}
opa_client.query_rule.return_value = response
with pytest.raises(PolicyEvaluationError, match=re.escape("OPA policy denied the request, Violations: ['Access not allowed.']")):
policy.evaluate_policy(job)
opa_client.query_rule.assert_called_once()
@pytest.mark.django_db
def test_evaluate_policy_not_found(opa_client, job):
response = {}
opa_client.query_rule.return_value = response
with pytest.raises(PolicyEvaluationError, match=re.escape('Call to OPA did not return a "result" property. The path refers to an undefined document.')):
policy.evaluate_policy(job)
opa_client.query_rule.assert_called_once()
@pytest.mark.django_db
def test_evaluate_policy_server_error(opa_client, job):
http_error_msg = '500 Server Error: Internal Server Error for url: https://opa.example.com:8181/v1/data/job_template/response/invalid'
error_response = {
'code': 'internal_error',
'message': (
'1 error occurred: 1:1: rego_type_error: undefined ref: data.job_template.response.invalid\n\t'
'data.job_template.response.invalid\n\t'
' ^\n\t'
' have: "invalid"\n\t'
' want (one of): ["allowed" "violations"]'
),
}
response = mock.Mock()
response.status_code = requests.codes.internal_server_error
response.json.return_value = error_response
opa_client.query_rule.side_effect = requests.exceptions.HTTPError(http_error_msg, response=response)
with pytest.raises(PolicyEvaluationError, match=re.escape(f'Call to OPA failed. Code: internal_error, Message: {error_response["message"]}')):
policy.evaluate_policy(job)
opa_client.query_rule.assert_called_once()

View File

@@ -473,7 +473,7 @@ class TestGenericRun:
task.model.objects.get = mock.Mock(return_value=job)
task.build_private_data_files = mock.Mock(side_effect=OSError())
with mock.patch('awx.main.tasks.jobs.shutil.copytree'):
with mock.patch('awx.main.tasks.jobs.shutil.copytree'), mock.patch('awx.main.tasks.jobs.evaluate_policy'):
with pytest.raises(Exception):
task.run(1)

View File

@@ -1213,7 +1213,23 @@ ANSIBLE_BASE_ALLOW_SINGLETON_ROLES_API = False # Do not allow creating user-def
# system username for django-ansible-base
SYSTEM_USERNAME = None
# feature flags
FLAGS = {}
# setting for Policy as Code feature
FEATURE_POLICY_AS_CODE_ENABLED = False
OPA_POLICY_EVALUATION_DEFAULT_RESULT = {'allowed': True} # Default policy enforcement decision if policy evaluation fail for any reason.
OPA_HOST = '' # Host to connect to OPA service, defaults to ''. When this value is set to '', policy enforcement will be disabled.
OPA_PORT = 8181 # Port to connect to OPA service, defaults to 8181.
OPA_SSL = False # Use SSL to connect to OPA service, defaults to False.
OPA_AUTH_TYPE = 'None' # 'None', 'Token', 'Certificate'
OPA_AUTH_TOKEN = '' # Token for OPA authentication, defaults to '', required when OPA_AUTH_TYPE = 'Token'.
OPA_AUTH_CLIENT_CERT = '' # Content of the client certificate file for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".
OPA_AUTH_CLIENT_KEY = '' # Content of the client key file for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".
OPA_AUTH_CA_CERT = '' # Content of the CA certificate file for mTLS authentication, required when OPA_AUTH_TYPE is "Certificate".
OPA_AUTH_CUSTOM_HEADERS = {} # Custom header for OPA authentication, defaults to {}, this will be added to the request headers. TODO: currently unimplemented.
OPA_REQUEST_TIMEOUT = 1.5 # Connection timeout in seconds, defaults to 1.5 seconds.
OPA_REQUEST_RETRIES = 2 # Number of retries to connect to OPA service, defaults to 2.
# feature flags
FLAG_SOURCES = ('flags.sources.SettingsFlagsSource',)
FLAGS = {'FEATURE_POLICY_AS_CODE_ENABLED': [{'condition': 'boolean', 'value': False}]}