Compare commits


1 Commit

Author SHA1 Message Date
Luiz Costa 9e7486b024 WIP Makefile 2022-11-16 16:04:12 -03:00
437 changed files with 218672 additions and 15840 deletions


@@ -2,7 +2,6 @@
name: CI
env:
BRANCH: ${{ github.base_ref || 'devel' }}
LC_ALL: "C.UTF-8" # prevent ERROR: Ansible could not initialize the preferred locale: unsupported locale setting
on:
pull_request:
jobs:


@@ -1,7 +1,5 @@
---
name: Build/Push Development Images
env:
LC_ALL: "C.UTF-8" # prevent ERROR: Ansible could not initialize the preferred locale: unsupported locale setting
on:
push:
branches:


@@ -1,8 +1,5 @@
---
name: E2E Tests
env:
LC_ALL: "C.UTF-8" # prevent ERROR: Ansible could not initialize the preferred locale: unsupported locale setting
on:
pull_request_target:
types: [labeled]


@@ -1,7 +1,5 @@
---
name: Feature branch deletion cleanup
env:
LC_ALL: "C.UTF-8" # prevent ERROR: Ansible could not initialize the preferred locale: unsupported locale setting
on:
delete:
branches:


@@ -1,9 +1,5 @@
---
name: Promote Release
env:
LC_ALL: "C.UTF-8" # prevent ERROR: Ansible could not initialize the preferred locale: unsupported locale setting
on:
release:
types: [published]


@@ -1,9 +1,5 @@
---
name: Stage Release
env:
LC_ALL: "C.UTF-8" # prevent ERROR: Ansible could not initialize the preferred locale: unsupported locale setting
on:
workflow_dispatch:
inputs:


@@ -1,9 +1,5 @@
---
name: Upload API Schema
env:
LC_ALL: "C.UTF-8" # prevent ERROR: Ansible could not initialize the preferred locale: unsupported locale setting
on:
push:
branches:


@@ -12,7 +12,7 @@ recursive-include awx/plugins *.ps1
recursive-include requirements *.txt
recursive-include requirements *.yml
recursive-include config *
recursive-include licenses *
recursive-include docs/licenses *
recursive-exclude awx devonly.py*
recursive-exclude awx/api/tests *
recursive-exclude awx/main/tests *

157 Makefile

@@ -34,7 +34,7 @@ RECEPTOR_IMAGE ?= quay.io/ansible/receptor:devel
SRC_ONLY_PKGS ?= cffi,pycparser,psycopg2,twilio
# These should be upgraded in the AWX and Ansible venv before attempting
# to install the actual requirements
VENV_BOOTSTRAP ?= pip==21.2.4 setuptools==65.6.3 setuptools_scm[toml]==7.0.5 wheel==0.38.4
VENV_BOOTSTRAP ?= pip==21.2.4 setuptools==58.2.0 setuptools_scm[toml]==6.4.2 wheel==0.36.2
NAME ?= awx
@@ -54,47 +54,6 @@ I18N_FLAG_FILE = .i18n_built
VERSION PYTHON_VERSION docker-compose-sources \
.git/hooks/pre-commit
clean-tmp:
rm -rf tmp/
clean-venv:
rm -rf venv/
clean-dist:
rm -rf dist
clean-schema:
rm -rf swagger.json
rm -rf schema.json
rm -rf reference-schema.json
clean-languages:
rm -f $(I18N_FLAG_FILE)
find ./awx/locale/ -type f -regex ".*\.mo$" -delete
## Remove temporary build files, compiled Python files.
clean: clean-ui clean-api clean-awxkit clean-dist
rm -rf awx/public
rm -rf awx/lib/site-packages
rm -rf awx/job_status
rm -rf awx/job_output
rm -rf reports
rm -rf tmp
rm -rf $(I18N_FLAG_FILE)
mkdir tmp
clean-api:
rm -rf build $(NAME)-$(VERSION) *.egg-info
rm -rf .tox
find . -type f -regex ".*\.py[co]$$" -delete
find . -type d -name "__pycache__" -delete
rm -f awx/awx_test.sqlite3*
rm -rf requirements/vendor
rm -rf awx/projects
clean-awxkit:
rm -rf awxkit/*.egg-info awxkit/.tox awxkit/build/*
## convenience target to assert environment variables are defined
guard-%:
@if [ "$${$*}" = "" ]; then \
@@ -372,15 +331,6 @@ bulk_data:
UI_BUILD_FLAG_FILE = awx/ui/.ui-built
clean-ui:
rm -rf node_modules
rm -rf awx/ui/node_modules
rm -rf awx/ui/build
rm -rf awx/ui/src/locales/_build
rm -rf $(UI_BUILD_FLAG_FILE)
# the collectstatic command doesn't like it if this dir doesn't exist.
mkdir -p awx/ui/build/static
awx/ui/node_modules:
NODE_OPTIONS=--max-old-space-size=6144 $(NPM_BIN) --prefix awx/ui --loglevel warn --force ci
@@ -503,15 +453,6 @@ detect-schema-change: genschema
# Ignore differences in whitespace with -b
diff -u -b reference-schema.json schema.json
docker-compose-clean: awx/projects
docker-compose -f tools/docker-compose/_sources/docker-compose.yml rm -sf
docker-compose-container-group-clean:
@if [ -f "tools/docker-compose-minikube/_sources/minikube" ]; then \
tools/docker-compose-minikube/_sources/minikube delete; \
fi
rm -rf tools/docker-compose-minikube/_sources/
## Base development image build
docker-compose-build:
ansible-playbook tools/ansible/dockerfile.yml -e build_dev=True -e receptor_image=$(RECEPTOR_IMAGE)
@@ -519,15 +460,6 @@ docker-compose-build:
--build-arg BUILDKIT_INLINE_CACHE=1 \
--cache-from=$(DEV_DOCKER_TAG_BASE)/awx_devel:$(COMPOSE_TAG) .
docker-clean:
$(foreach container_id,$(shell docker ps -f name=tools_awx -aq && docker ps -f name=tools_receptor -aq),docker stop $(container_id); docker rm -f $(container_id);)
if [ "$(shell docker images | grep awx_devel)" ]; then \
docker images | grep awx_devel | awk '{print $$3}' | xargs docker rmi --force; \
fi
docker-clean-volumes: docker-compose-clean docker-compose-container-group-clean
docker volume rm -f tools_awx_db tools_grafana_storage tools_prometheus_storage $(docker volume ls --filter name=tools_redis_socket_ -q)
docker-refresh: docker-clean docker-compose
## Docker Development Environment with Elastic Stack Connected
@@ -540,14 +472,6 @@ docker-compose-cluster-elk: awx/projects docker-compose-sources
docker-compose-container-group:
MINIKUBE_CONTAINER_GROUP=true make docker-compose
clean-elk:
docker stop tools_kibana_1
docker stop tools_logstash_1
docker stop tools_elasticsearch_1
docker rm tools_logstash_1
docker rm tools_elasticsearch_1
docker rm tools_kibana_1
psql-container:
docker run -it --net tools_default --rm postgres:12 sh -c 'exec psql -h "postgres" -p "5432" -U postgres'
@@ -593,6 +517,7 @@ pot: $(UI_BUILD_FLAG_FILE)
po: $(UI_BUILD_FLAG_FILE)
$(NPM_BIN) --prefix awx/ui --loglevel warn run extract-strings -- --clean
LANG = "en_us"
## generate API django .pot .po
messages:
@if [ "$(VENV_BASE)" ]; then \
@@ -603,6 +528,84 @@ messages:
print-%:
@echo $($*)
# Cleaning
# --------------------------------------
## Remove temporary build files, compiled Python files.
clean: clean-ui clean-api clean-awxkit clean-dist
rm -rf awx/public
rm -rf awx/lib/site-packages
rm -rf awx/job_status
rm -rf awx/job_output
rm -rf reports
rm -rf tmp
rm -rf $(I18N_FLAG_FILE)
mkdir tmp
clean-elk:
docker stop tools_kibana_1
docker stop tools_logstash_1
docker stop tools_elasticsearch_1
docker rm tools_logstash_1
docker rm tools_elasticsearch_1
docker rm tools_kibana_1
clean-ui:
rm -rf node_modules
rm -rf awx/ui/node_modules
rm -rf awx/ui/build
rm -rf awx/ui/src/locales/_build
rm -rf $(UI_BUILD_FLAG_FILE)
# the collectstatic command doesn't like it if this dir doesn't exist.
mkdir -p awx/ui/build/static
clean-tmp:
rm -rf tmp/
clean-venv:
rm -rf venv/
clean-dist:
rm -rf dist
clean-schema:
rm -rf swagger.json
rm -rf schema.json
rm -rf reference-schema.json
clean-languages:
rm -f $(I18N_FLAG_FILE)
find ./awx/locale/ -type f -regex ".*\.mo$" -delete
clean-api:
rm -rf build $(NAME)-$(VERSION) *.egg-info
rm -rf .tox
find . -type f -regex ".*\.py[co]$$" -delete
find . -type d -name "__pycache__" -delete
rm -f awx/awx_test.sqlite3*
rm -rf requirements/vendor
rm -rf awx/projects
clean-awxkit:
rm -rf awxkit/*.egg-info awxkit/.tox awxkit/build/*
docker-compose-clean: awx/projects
docker-compose -f tools/docker-compose/_sources/docker-compose.yml rm -sf
docker-compose-container-group-clean:
@if [ -f "tools/docker-compose-minikube/_sources/minikube" ]; then \
tools/docker-compose-minikube/_sources/minikube delete; \
fi
rm -rf tools/docker-compose-minikube/_sources/
docker-clean:
$(foreach container_id,$(shell docker ps -f name=tools_awx -aq && docker ps -f name=tools_receptor -aq),docker stop $(container_id); docker rm -f $(container_id);)
if [ "$(shell docker images | grep awx_devel)" ]; then \
docker images | grep awx_devel | awk '{print $$3}' | xargs docker rmi --force; \
fi
docker-clean-volumes: docker-compose-clean docker-compose-container-group-clean
docker volume rm -f tools_awx_db tools_grafana_storage tools_prometheus_storage $(docker volume ls --filter name=tools_redis_socket_ -q)
# HELP related targets
# --------------------------------------


@@ -113,7 +113,7 @@ from awx.main.utils import (
)
from awx.main.utils.filters import SmartFilter
from awx.main.utils.named_url_graph import reset_counters
from awx.main.scheduler.task_manager_models import TaskManagerModels
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups, TaskManagerInstances
from awx.main.redact import UriCleaner, REPLACE_STR
from awx.main.validators import vars_validate_or_raise
@@ -5040,10 +5040,12 @@ class InstanceHealthCheckSerializer(BaseSerializer):
class InstanceGroupSerializer(BaseSerializer):
show_capabilities = ['edit', 'delete']
capacity = serializers.SerializerMethodField()
consumed_capacity = serializers.SerializerMethodField()
percent_capacity_remaining = serializers.SerializerMethodField()
jobs_running = serializers.SerializerMethodField()
jobs_running = serializers.IntegerField(
help_text=_('Count of jobs in the running or waiting state that ' 'are targeted for this instance group'), read_only=True
)
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance group'), read_only=True)
instances = serializers.SerializerMethodField()
is_container_group = serializers.BooleanField(
@@ -5069,22 +5071,6 @@ class InstanceGroupSerializer(BaseSerializer):
label=_('Policy Instance Minimum'),
help_text=_("Static minimum number of Instances that will be automatically assign to " "this group when new instances come online."),
)
max_concurrent_jobs = serializers.IntegerField(
default=0,
min_value=0,
required=False,
initial=0,
label=_('Max Concurrent Jobs'),
help_text=_("Maximum number of concurrent jobs to run on a group. When set to zero, no maximum is enforced."),
)
max_forks = serializers.IntegerField(
default=0,
min_value=0,
required=False,
initial=0,
label=_('Max Forks'),
help_text=_("Maximum number of forks to execute concurrently on a group. When set to zero, no maximum is enforced."),
)
policy_instance_list = serializers.ListField(
child=serializers.CharField(),
required=False,
@@ -5106,8 +5092,6 @@ class InstanceGroupSerializer(BaseSerializer):
"consumed_capacity",
"percent_capacity_remaining",
"jobs_running",
"max_concurrent_jobs",
"max_forks",
"jobs_total",
"instances",
"is_container_group",
@@ -5189,39 +5173,28 @@ class InstanceGroupSerializer(BaseSerializer):
# Store capacity values (globally computed) in the context
if 'task_manager_igs' not in self.context:
instance_groups_queryset = None
jobs_qs = UnifiedJob.objects.filter(status__in=('running', 'waiting'))
if self.parent: # Is ListView:
instance_groups_queryset = self.parent.instance
tm_models = TaskManagerModels.init_with_consumed_capacity(
instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled'],
instance_groups_queryset=instance_groups_queryset,
)
instances = TaskManagerInstances(jobs_qs)
instance_groups = TaskManagerInstanceGroups(instances_by_hostname=instances, instance_groups_queryset=instance_groups_queryset)
self.context['task_manager_igs'] = tm_models.instance_groups
self.context['task_manager_igs'] = instance_groups
return self.context['task_manager_igs']
def get_consumed_capacity(self, obj):
ig_mgr = self.get_ig_mgr()
return ig_mgr.get_consumed_capacity(obj.name)
def get_capacity(self, obj):
ig_mgr = self.get_ig_mgr()
return ig_mgr.get_capacity(obj.name)
def get_percent_capacity_remaining(self, obj):
capacity = self.get_capacity(obj)
if not capacity:
if not obj.capacity:
return 0.0
consumed_capacity = self.get_consumed_capacity(obj)
return float("{0:.2f}".format(((float(capacity) - float(consumed_capacity)) / (float(capacity))) * 100))
ig_mgr = self.get_ig_mgr()
return float("{0:.2f}".format((float(ig_mgr.get_remaining_capacity(obj.name)) / (float(obj.capacity))) * 100))
def get_instances(self, obj):
ig_mgr = self.get_ig_mgr()
return len(ig_mgr.get_instances(obj.name))
def get_jobs_running(self, obj):
ig_mgr = self.get_ig_mgr()
return ig_mgr.get_jobs_running(obj.name)
return obj.instances.count()
class ActivityStreamSerializer(BaseSerializer):
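
Both variants of get_percent_capacity_remaining above reduce to the same arithmetic: remaining capacity divided by total capacity, times 100, rounded to two decimal places. A minimal standalone sketch of that computation (not AWX code; the numbers are made up for illustration):

capacity = 120.0
consumed_capacity = 43.0  # illustrative, e.g. one running job with task_impact 43
remaining = capacity - consumed_capacity
percent_remaining = float("{0:.2f}".format((remaining / capacity) * 100))
assert percent_remaining == 64.17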


@@ -7,7 +7,7 @@ receptor_work_commands:
command: ansible-runner
params: worker
allowruntimeparams: true
verifysignature: true
verifysignature: {{ sign_work }}
custom_worksign_public_keyfile: receptor/work-public-key.pem
custom_tls_certfile: receptor/tls/receptor.crt
custom_tls_keyfile: receptor/tls/receptor.key


@@ -6237,5 +6237,4 @@ msgstr "%s se está actualizando."
#: awx/ui/urls.py:24
msgid "This page will refresh when complete."
msgstr "Esta página se actualizará cuando se complete."
msgstr "Esta página se actualizará cuando se complete."


@@ -721,7 +721,7 @@ msgstr "DTSTART valide obligatoire dans rrule. La valeur doit commencer par : DT
#: awx/api/serializers.py:4657
msgid ""
"DTSTART cannot be a naive datetime. Specify ;TZINFO= or YYYYMMDDTHHMMSSZZ."
msgstr "DTSTART ne peut correspondre à une date-heure naïve. Spécifier ;TZINFO= ou YYYYMMDDTHHMMSSZZ."
msgstr "DTSTART ne peut correspondre à une DateHeure naïve. Spécifier ;TZINFO= ou YYYYMMDDTHHMMSSZZ."
#: awx/api/serializers.py:4659
msgid "Multiple DTSTART is not supported."
@@ -6239,5 +6239,4 @@ msgstr "%s est en cours de mise à niveau."
#: awx/ui/urls.py:24
msgid "This page will refresh when complete."
msgstr "Cette page sera rafraîchie une fois terminée."
msgstr "Cette page sera rafraîchie une fois terminée."


@@ -6237,5 +6237,4 @@ msgstr "Er wordt momenteel een upgrade van%s geïnstalleerd."
#: awx/ui/urls.py:24
msgid "This page will refresh when complete."
msgstr "Deze pagina wordt vernieuwd als hij klaar is."
msgstr "Deze pagina wordt vernieuwd als hij klaar is."

12 file diffs suppressed because they are too large.


@@ -1,6 +1,7 @@
import datetime
import asyncio
import logging
import aioredis
import redis
import re
@@ -81,7 +82,7 @@ class BroadcastWebsocketStatsManager:
async def run_loop(self):
try:
redis_conn = await redis.asyncio.create_redis_pool(settings.BROKER_URL)
redis_conn = await aioredis.create_redis_pool(settings.BROKER_URL)
while True:
stats_data_str = ''.join(stat.serialize() for stat in self._stats.values())
await redis_conn.set(self._redis_key, stats_data_str)


@@ -16,7 +16,7 @@ from awx.conf.license import get_license
from awx.main.utils import get_awx_version, camelcase_to_underscore, datetime_hook
from awx.main import models
from awx.main.analytics import register
from awx.main.scheduler.task_manager_models import TaskManagerModels
from awx.main.scheduler.task_manager_models import TaskManagerInstances
"""
This module is used to define metrics collected by awx.main.analytics.gather()
@@ -237,8 +237,11 @@ def projects_by_scm_type(since, **kwargs):
def instance_info(since, include_hostnames=False, **kwargs):
info = {}
# Use same method that the TaskManager does to compute consumed capacity without querying all running jobs for each Instance
tm_models = TaskManagerModels.init_with_consumed_capacity(instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled'])
for tm_instance in tm_models.instances.instances_by_hostname.values():
active_tasks = models.UnifiedJob.objects.filter(status__in=['running', 'waiting']).only('task_impact', 'controller_node', 'execution_node')
tm_instances = TaskManagerInstances(
active_tasks, instance_fields=['uuid', 'version', 'capacity', 'cpu', 'memory', 'managed_by_policy', 'enabled', 'node_type']
)
for tm_instance in tm_instances.instances_by_hostname.values():
instance = tm_instance.obj
instance_info = {
'uuid': instance.uuid,


@@ -38,14 +38,7 @@ class Command(BaseCommand):
(changed, instance) = Instance.objects.register(ip_address=os.environ.get('MY_POD_IP'), node_type='control', uuid=settings.SYSTEM_UUID)
RegisterQueue(settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, 100, 0, [], is_container_group=False).register()
RegisterQueue(
settings.DEFAULT_EXECUTION_QUEUE_NAME,
100,
0,
[],
is_container_group=True,
pod_spec_override=settings.DEFAULT_EXECUTION_QUEUE_POD_SPEC_OVERRIDE,
max_forks=settings.DEFAULT_EXECUTION_QUEUE_MAX_FORKS,
max_concurrent_jobs=settings.DEFAULT_EXECUTION_QUEUE_MAX_CONCURRENT_JOBS,
settings.DEFAULT_EXECUTION_QUEUE_NAME, 100, 0, [], is_container_group=True, pod_spec_override=settings.DEFAULT_EXECUTION_QUEUE_POD_SPEC_OVERRIDE
).register()
else:
(changed, instance) = Instance.objects.register(hostname=hostname, node_type=node_type, uuid=uuid)


@@ -17,9 +17,7 @@ class InstanceNotFound(Exception):
class RegisterQueue:
def __init__(
self, queuename, instance_percent, inst_min, hostname_list, is_container_group=None, pod_spec_override=None, max_forks=None, max_concurrent_jobs=None
):
def __init__(self, queuename, instance_percent, inst_min, hostname_list, is_container_group=None, pod_spec_override=None):
self.instance_not_found_err = None
self.queuename = queuename
self.instance_percent = instance_percent
@@ -27,8 +25,6 @@ class RegisterQueue:
self.hostname_list = hostname_list
self.is_container_group = is_container_group
self.pod_spec_override = pod_spec_override
self.max_forks = max_forks
self.max_concurrent_jobs = max_concurrent_jobs
def get_create_update_instance_group(self):
created = False
@@ -49,14 +45,6 @@ class RegisterQueue:
ig.pod_spec_override = self.pod_spec_override
changed = True
if self.max_forks and (ig.max_forks != self.max_forks):
ig.max_forks = self.max_forks
changed = True
if self.max_concurrent_jobs and (ig.max_concurrent_jobs != self.max_concurrent_jobs):
ig.max_concurrent_jobs = self.max_concurrent_jobs
changed = True
if changed:
ig.save()


@@ -1,14 +1,24 @@
# Generated by Django 3.2.13 on 2022-06-21 21:29
from django.db import migrations
import logging
logger = logging.getLogger("awx")
def forwards(apps, schema_editor):
InventorySource = apps.get_model('main', 'InventorySource')
InventorySource.objects.filter(update_on_project_update=True).update(update_on_launch=True)
Project = apps.get_model('main', 'Project')
Project.objects.filter(scm_inventory_sources__update_on_project_update=True).update(scm_update_on_launch=True)
sources = InventorySource.objects.filter(update_on_project_update=True)
for src in sources:
if src.update_on_launch == False:
src.update_on_launch = True
src.save(update_fields=['update_on_launch'])
logger.info(f"Setting update_on_launch to True for {src}")
proj = src.source_project
if proj and proj.scm_update_on_launch is False:
proj.scm_update_on_launch = True
proj.save(update_fields=['scm_update_on_launch'])
logger.warning(f"Setting scm_update_on_launch to True for {proj}")
class Migration(migrations.Migration):


@@ -1,23 +0,0 @@
# Generated by Django 3.2.13 on 2022-10-24 18:22
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('main', '0172_prevent_instance_fallback'),
]
operations = [
migrations.AddField(
model_name='instancegroup',
name='max_concurrent_jobs',
field=models.IntegerField(default=0, help_text='Maximum number of concurrent jobs to run on this group. Zero means no limit.'),
),
migrations.AddField(
model_name='instancegroup',
name='max_forks',
field=models.IntegerField(default=0, help_text='Max forks to execute on this group. Zero means no limit.'),
),
]


@@ -379,8 +379,6 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
default='',
)
)
max_concurrent_jobs = models.IntegerField(default=0, help_text=_("Maximum number of concurrent jobs to run on this group. Zero means no limit."))
max_forks = models.IntegerField(default=0, help_text=_("Max forks to execute on this group. Zero means no limit."))
policy_instance_percentage = models.IntegerField(default=0, help_text=_("Percentage of Instances to automatically assign to this group"))
policy_instance_minimum = models.IntegerField(default=0, help_text=_("Static minimum number of Instances to automatically assign to this group"))
policy_instance_list = JSONBlob(
@@ -394,8 +392,6 @@ class InstanceGroup(HasPolicyEditsMixin, BaseModel, RelatedJobsMixin):
@property
def capacity(self):
if self.is_container_group:
return self.max_forks
return sum(inst.capacity for inst in self.instances.all())
@property


@@ -1351,12 +1351,12 @@ class UnifiedJob(
if required in defined_fields and not credential.has_input(required):
missing_credential_inputs.append(required)
if missing_credential_inputs:
self.job_explanation = '{} cannot start because Credential {} does not provide one or more required fields ({}).'.format(
self._meta.verbose_name.title(), credential.name, ', '.join(sorted(missing_credential_inputs))
)
self.save(update_fields=['job_explanation'])
return (False, None)
if missing_credential_inputs:
self.job_explanation = '{} cannot start because Credential {} does not provide one or more required fields ({}).'.format(
self._meta.verbose_name.title(), credential.name, ', '.join(sorted(missing_credential_inputs))
)
self.save(update_fields=['job_explanation'])
return (False, None)
needed = self.get_passwords_needed_to_start()
try:


@@ -27,8 +27,8 @@ class AWXProtocolTypeRouter(ProtocolTypeRouter):
websocket_urlpatterns = [
re_path(r'websocket/$', consumers.EventConsumer.as_asgi()),
re_path(r'websocket/broadcast/$', consumers.BroadcastConsumer.as_asgi()),
re_path(r'websocket/$', consumers.EventConsumer),
re_path(r'websocket/broadcast/$', consumers.BroadcastConsumer),
]
application = AWXProtocolTypeRouter(


@@ -43,7 +43,8 @@ from awx.main.utils.common import task_manager_bulk_reschedule, is_testing
from awx.main.signals import disable_activity_stream
from awx.main.constants import ACTIVE_STATES
from awx.main.scheduler.dependency_graph import DependencyGraph
from awx.main.scheduler.task_manager_models import TaskManagerModels
from awx.main.scheduler.task_manager_models import TaskManagerInstances
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
import awx.main.analytics.subsystem_metrics as s_metrics
from awx.main.utils import decrypt_field
@@ -70,12 +71,7 @@ class TaskBase:
# is called later.
self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
self.start_time = time.time()
# We want to avoid calling settings in loops, so cache these settings at init time
self.start_task_limit = settings.START_TASK_LIMIT
self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT
self.control_task_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT
for m in self.subsystem_metrics.METRICS:
if m.startswith(self.prefix):
self.subsystem_metrics.set(m, 0)
@@ -83,7 +79,7 @@ class TaskBase:
def timed_out(self):
"""Return True/False if we have met or exceeded the timeout for the task manager."""
elapsed = time.time() - self.start_time
if elapsed >= self.task_manager_timeout:
if elapsed >= settings.TASK_MANAGER_TIMEOUT:
logger.warning(f"{self.prefix} manager has run for {elapsed} which is greater than TASK_MANAGER_TIMEOUT of {settings.TASK_MANAGER_TIMEOUT}.")
return True
return False
@@ -475,8 +471,9 @@ class TaskManager(TaskBase):
Init AFTER we know this instance of the task manager will run because the lock is acquired.
"""
self.dependency_graph = DependencyGraph()
self.tm_models = TaskManagerModels()
self.controlplane_ig = self.tm_models.instance_groups.controlplane_ig
self.instances = TaskManagerInstances(self.all_tasks)
self.instance_groups = TaskManagerInstanceGroups(instances_by_hostname=self.instances)
self.controlplane_ig = self.instance_groups.controlplane_ig
def job_blocked_by(self, task):
# TODO: I'm not happy with this, I think blocking behavior should be decided outside of the dependency graph
@@ -508,15 +505,7 @@ class TaskManager(TaskBase):
@timeit
def start_task(self, task, instance_group, dependent_tasks=None, instance=None):
# Just like for process_running_tasks, add the job to the dependency graph and
# ask the TaskManagerInstanceGroups object to update consumed capacity on all
# implicated instances and container groups.
self.dependency_graph.add_job(task)
if instance_group is not None:
task.instance_group = instance_group
# We need the instance group assigned to correctly account for container group max_concurrent_jobs and max_forks
self.tm_models.consume_capacity(task)
self.subsystem_metrics.inc(f"{self.prefix}_tasks_started", 1)
self.start_task_limit -= 1
if self.start_task_limit == 0:
@@ -524,6 +513,12 @@ class TaskManager(TaskBase):
ScheduleTaskManager().schedule()
from awx.main.tasks.system import handle_work_error, handle_work_success
# update capacity for control node and execution node
if task.controller_node:
self.instances[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT)
if task.execution_node:
self.instances[task.execution_node].consume_capacity(task.task_impact)
dependent_tasks = dependent_tasks or []
task_actual = {
@@ -551,6 +546,7 @@ class TaskManager(TaskBase):
ScheduleWorkflowManager().schedule()
# at this point we already have control/execution nodes selected for the following cases
else:
task.instance_group = instance_group
execution_node_msg = f' and execution node {task.execution_node}' if task.execution_node else ''
logger.debug(
f'Submitting job {task.log_format} controlled by {task.controller_node} to instance group {instance_group.name}{execution_node_msg}.'
@@ -584,7 +580,6 @@ class TaskManager(TaskBase):
if type(task) is WorkflowJob:
ScheduleWorkflowManager().schedule()
self.dependency_graph.add_job(task)
self.tm_models.consume_capacity(task)
@timeit
def process_pending_tasks(self, pending_tasks):
@@ -616,11 +611,11 @@ class TaskManager(TaskBase):
# Determine if there is control capacity for the task
if task.capacity_type == 'control':
control_impact = task.task_impact + self.control_task_impact
control_impact = task.task_impact + settings.AWX_CONTROL_NODE_TASK_IMPACT
else:
control_impact = self.control_task_impact
control_instance = self.tm_models.instance_groups.fit_task_to_most_remaining_capacity_instance(
task, instance_group_name=self.controlplane_ig.name, impact=control_impact, capacity_type='control'
control_impact = settings.AWX_CONTROL_NODE_TASK_IMPACT
control_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance(
task, instance_group_name=settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME, impact=control_impact, capacity_type='control'
)
if not control_instance:
self.task_needs_capacity(task, tasks_to_update_job_explanation)
@@ -631,19 +626,15 @@ class TaskManager(TaskBase):
# All task.capacity_type == 'control' jobs should run on control plane, no need to loop over instance groups
if task.capacity_type == 'control':
if not self.tm_models.instance_groups[self.controlplane_ig.name].has_remaining_capacity(control_impact=True):
continue
task.execution_node = control_instance.hostname
execution_instance = self.tm_models.instances[control_instance.hostname].obj
execution_instance = self.instances[control_instance.hostname].obj
task.log_lifecycle("controller_node_chosen")
task.log_lifecycle("execution_node_chosen")
self.start_task(task, self.controlplane_ig, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
continue
for instance_group in self.tm_models.instance_groups.get_instance_groups_from_task_cache(task):
if not self.tm_models.instance_groups[instance_group.name].has_remaining_capacity(task):
continue
for instance_group in self.instance_groups.get_instance_groups_from_task_cache(task):
if instance_group.is_container_group:
self.start_task(task, instance_group, task.get_jobs_fail_chain(), None)
found_acceptable_queue = True
@@ -651,9 +642,9 @@ class TaskManager(TaskBase):
# at this point we know the instance group is NOT a container group
# because if it was, it would have started the task and broke out of the loop.
execution_instance = self.tm_models.instance_groups.fit_task_to_most_remaining_capacity_instance(
execution_instance = self.instance_groups.fit_task_to_most_remaining_capacity_instance(
task, instance_group_name=instance_group.name, add_hybrid_control_cost=True
) or self.tm_models.instance_groups.find_largest_idle_instance(instance_group_name=instance_group.name, capacity_type=task.capacity_type)
) or self.instance_groups.find_largest_idle_instance(instance_group_name=instance_group.name, capacity_type=task.capacity_type)
if execution_instance:
task.execution_node = execution_instance.hostname
@@ -669,7 +660,7 @@ class TaskManager(TaskBase):
task.log_format, instance_group.name, execution_instance.hostname, execution_instance.remaining_capacity
)
)
execution_instance = self.tm_models.instances[execution_instance.hostname].obj
execution_instance = self.instances[execution_instance.hostname].obj
self.start_task(task, instance_group, task.get_jobs_fail_chain(), execution_instance)
found_acceptable_queue = True
break


@@ -15,18 +15,15 @@ logger = logging.getLogger('awx.main.scheduler')
class TaskManagerInstance:
"""A class representing minimal data the task manager needs to represent an Instance."""
def __init__(self, obj, **kwargs):
def __init__(self, obj):
self.obj = obj
self.node_type = obj.node_type
self.consumed_capacity = 0
self.capacity = obj.capacity
self.hostname = obj.hostname
self.jobs_running = 0
def consume_capacity(self, impact, job_impact=False):
def consume_capacity(self, impact):
self.consumed_capacity += impact
if job_impact:
self.jobs_running += 1
@property
def remaining_capacity(self):
@@ -36,106 +33,9 @@ class TaskManagerInstance:
return remaining
class TaskManagerInstanceGroup:
"""A class representing minimal data the task manager needs to represent an InstanceGroup."""
def __init__(self, obj, task_manager_instances=None, **kwargs):
self.name = obj.name
self.is_container_group = obj.is_container_group
self.container_group_jobs = 0
self.container_group_consumed_forks = 0
_instances = obj.instances.all()
# We want the list of TaskManagerInstance objects because these are shared across the TaskManagerInstanceGroup objects.
# This way when we consume capacity on an instance that is in multiple groups, we tabulate across all the groups correctly.
self.instances = [task_manager_instances[instance.hostname] for instance in _instances if instance.hostname in task_manager_instances]
self.instance_hostnames = tuple([instance.hostname for instance in _instances if instance.hostname in task_manager_instances])
self.max_concurrent_jobs = obj.max_concurrent_jobs
self.max_forks = obj.max_forks
self.control_task_impact = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT)
def consume_capacity(self, task):
"""We only consume capacity on an instance group level if it is a container group. Otherwise we consume capacity on an instance level."""
if self.is_container_group:
self.container_group_jobs += 1
self.container_group_consumed_forks += task.task_impact
else:
raise RuntimeError("We only track capacity for container groups at the instance group level. Otherwise, consume capacity on instances.")
def get_remaining_instance_capacity(self):
return sum(inst.remaining_capacity for inst in self.instances)
def get_instance_capacity(self):
return sum(inst.capacity for inst in self.instances)
def get_consumed_instance_capacity(self):
return sum(inst.consumed_capacity for inst in self.instances)
def get_instance_jobs_running(self):
return sum(inst.jobs_running for inst in self.instances)
def get_jobs_running(self):
if self.is_container_group:
return self.container_group_jobs
return sum(inst.jobs_running for inst in self.instances)
def get_capacity(self):
"""This reports any type of capacity, including that of container group jobs.
Container groups don't really have capacity, but if they have max_forks set,
we can interperet that as how much capacity the user has defined them to have.
"""
if self.is_container_group:
return self.max_forks
return self.get_instance_capacity()
def get_consumed_capacity(self):
if self.is_container_group:
return self.container_group_consumed_forks
return self.get_consumed_instance_capacity()
def get_remaining_capacity(self):
return self.get_capacity() - self.get_consumed_capacity()
def has_remaining_capacity(self, task=None, control_impact=False):
"""Pass either a task or control_impact=True to determine if the IG has capacity to run the control task or job task."""
task_impact = self.control_task_impact if control_impact else task.task_impact
job_impact = 0 if control_impact else 1
task_string = f"task {task.log_format} with impact of {task_impact}" if task else f"control task with impact of {task_impact}"
# We only want to loop over instances if self.max_concurrent_jobs is set
if self.max_concurrent_jobs == 0:
# Override the calculated remaining capacity, because when max_concurrent_jobs == 0 we don't enforce any max
remaining_jobs = 0
else:
remaining_jobs = self.max_concurrent_jobs - self.get_jobs_running() - job_impact
# We only want to loop over instances if self.max_forks is set
if self.max_forks == 0:
# Override the calculated remaining capacity, because when max_forks == 0 we don't enforce any max
remaining_forks = 0
else:
remaining_forks = self.max_forks - self.get_consumed_capacity() - task_impact
if remaining_jobs < 0 or remaining_forks < 0:
# A value less than zero means the task will not fit on the group
if remaining_jobs < 0:
logger.debug(f"{task_string} cannot fit on instance group {self.name} with {remaining_jobs} remaining jobs")
if remaining_forks < 0:
logger.debug(f"{task_string} cannot fit on instance group {self.name} with {remaining_forks} remaining forks")
return False
# Returning true means there is enough remaining capacity on the group to run the task (or no instance group level limits are being set)
logger.debug(f"{task_string} can fit on instance group {self.name} with {remaining_forks} remaining forks and {remaining_jobs}")
return True
class TaskManagerInstances:
def __init__(self, instances=None, instance_fields=('node_type', 'capacity', 'hostname', 'enabled'), **kwargs):
def __init__(self, active_tasks, instances=None, instance_fields=('node_type', 'capacity', 'hostname', 'enabled')):
self.instances_by_hostname = dict()
self.instance_groups_container_group_jobs = dict()
self.instance_groups_container_group_consumed_forks = dict()
self.control_task_impact = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT)
if instances is None:
instances = (
Instance.objects.filter(hostname__isnull=False, node_state=Instance.States.READY, enabled=True)
@@ -143,15 +43,18 @@ class TaskManagerInstances:
.only('node_type', 'node_state', 'capacity', 'hostname', 'enabled')
)
for instance in instances:
self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance, **kwargs)
self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance)
def consume_capacity(self, task):
control_instance = self.instances_by_hostname.get(task.controller_node, '')
execution_instance = self.instances_by_hostname.get(task.execution_node, '')
if execution_instance and execution_instance.node_type in ('hybrid', 'execution'):
self.instances_by_hostname[task.execution_node].consume_capacity(task.task_impact, job_impact=True)
if control_instance and control_instance.node_type in ('hybrid', 'control'):
self.instances_by_hostname[task.controller_node].consume_capacity(self.control_task_impact)
# initialize remaining capacity based on currently waiting and running tasks
for task in active_tasks:
if task.status not in ['waiting', 'running']:
continue
control_instance = self.instances_by_hostname.get(task.controller_node, '')
execution_instance = self.instances_by_hostname.get(task.execution_node, '')
if execution_instance and execution_instance.node_type in ('hybrid', 'execution'):
self.instances_by_hostname[task.execution_node].consume_capacity(task.task_impact)
if control_instance and control_instance.node_type in ('hybrid', 'control'):
self.instances_by_hostname[task.controller_node].consume_capacity(settings.AWX_CONTROL_NODE_TASK_IMPACT)
def __getitem__(self, hostname):
return self.instances_by_hostname.get(hostname)
@@ -161,57 +64,42 @@ class TaskManagerInstances:
class TaskManagerInstanceGroups:
"""A class representing minimal data the task manager needs to represent all the InstanceGroups."""
"""A class representing minimal data the task manager needs to represent an InstanceGroup."""
def __init__(self, task_manager_instances=None, instance_groups=None, instance_groups_queryset=None, **kwargs):
def __init__(self, instances_by_hostname=None, instance_groups=None, instance_groups_queryset=None):
self.instance_groups = dict()
self.task_manager_instances = task_manager_instances if task_manager_instances is not None else TaskManagerInstances()
self.controlplane_ig = None
self.pk_ig_map = dict()
self.control_task_impact = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT)
self.controlplane_ig_name = kwargs.get('controlplane_ig_name', settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME)
if instance_groups is not None: # for testing
self.instance_groups = {ig.name: TaskManagerInstanceGroup(ig, self.task_manager_instances, **kwargs) for ig in instance_groups}
self.pk_ig_map = {ig.pk: ig for ig in instance_groups}
self.instance_groups = instance_groups
else:
if instance_groups_queryset is None:
instance_groups_queryset = InstanceGroup.objects.prefetch_related('instances').only(
'name', 'instances', 'max_concurrent_jobs', 'max_forks', 'is_container_group'
)
instance_groups_queryset = InstanceGroup.objects.prefetch_related('instances').only('name', 'instances')
for instance_group in instance_groups_queryset:
if instance_group.name == self.controlplane_ig_name:
if instance_group.name == settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME:
self.controlplane_ig = instance_group
self.instance_groups[instance_group.name] = TaskManagerInstanceGroup(instance_group, self.task_manager_instances, **kwargs)
self.instance_groups[instance_group.name] = dict(
instances=[
instances_by_hostname[instance.hostname] for instance in instance_group.instances.all() if instance.hostname in instances_by_hostname
],
)
self.pk_ig_map[instance_group.pk] = instance_group
def __getitem__(self, ig_name):
return self.instance_groups.get(ig_name)
def __contains__(self, ig_name):
return ig_name in self.instance_groups
def get_remaining_capacity(self, group_name):
return self.instance_groups[group_name].get_remaining_instance_capacity()
instances = self.instance_groups[group_name]['instances']
return sum(inst.remaining_capacity for inst in instances)
def get_consumed_capacity(self, group_name):
return self.instance_groups[group_name].get_consumed_capacity()
def get_jobs_running(self, group_name):
return self.instance_groups[group_name].get_jobs_running()
def get_capacity(self, group_name):
return self.instance_groups[group_name].get_capacity()
def get_instances(self, group_name):
return self.instance_groups[group_name].instances
instances = self.instance_groups[group_name]['instances']
return sum(inst.consumed_capacity for inst in instances)
def fit_task_to_most_remaining_capacity_instance(self, task, instance_group_name, impact=None, capacity_type=None, add_hybrid_control_cost=False):
impact = impact if impact else task.task_impact
capacity_type = capacity_type if capacity_type else task.capacity_type
instance_most_capacity = None
most_remaining_capacity = -1
instances = self.instance_groups[instance_group_name].instances
instances = self.instance_groups[instance_group_name]['instances']
for i in instances:
if i.node_type not in (capacity_type, 'hybrid'):
@@ -219,7 +107,7 @@ class TaskManagerInstanceGroups:
would_be_remaining = i.remaining_capacity - impact
# hybrid nodes _always_ control their own tasks
if add_hybrid_control_cost and i.node_type == 'hybrid':
would_be_remaining -= self.control_task_impact
would_be_remaining -= settings.AWX_CONTROL_NODE_TASK_IMPACT
if would_be_remaining >= 0 and (instance_most_capacity is None or would_be_remaining > most_remaining_capacity):
instance_most_capacity = i
most_remaining_capacity = would_be_remaining
@@ -227,13 +115,10 @@ class TaskManagerInstanceGroups:
def find_largest_idle_instance(self, instance_group_name, capacity_type='execution'):
largest_instance = None
instances = self.instance_groups[instance_group_name].instances
instances = self.instance_groups[instance_group_name]['instances']
for i in instances:
if i.node_type not in (capacity_type, 'hybrid'):
continue
if i.capacity <= 0:
# We don't want to select an idle instance with 0 capacity
continue
if (hasattr(i, 'jobs_running') and i.jobs_running == 0) or i.remaining_capacity == i.capacity:
if largest_instance is None:
largest_instance = i
@@ -254,56 +139,3 @@ class TaskManagerInstanceGroups:
logger.warn(f"No instance groups in cache exist, defaulting to global instance groups for task {task}")
return task.global_instance_groups
return igs
class TaskManagerModels:
def __init__(self, **kwargs):
# We want to avoid calls to settings over and over in loops, so cache this information here
kwargs['control_task_impact'] = kwargs.get('control_task_impact', settings.AWX_CONTROL_NODE_TASK_IMPACT)
kwargs['controlplane_ig_name'] = kwargs.get('controlplane_ig_name', settings.DEFAULT_CONTROL_PLANE_QUEUE_NAME)
self.instances = TaskManagerInstances(**kwargs)
self.instance_groups = TaskManagerInstanceGroups(task_manager_instances=self.instances, **kwargs)
@classmethod
def init_with_consumed_capacity(cls, **kwargs):
tmm = cls(**kwargs)
tasks = kwargs.get('tasks', None)
if tasks is None:
instance_group_queryset = kwargs.get('instance_groups_queryset', None)
# No tasks were provided, so we will fetch them from the database
task_status_filter_list = kwargs.get('task_status_filter_list', ['running', 'waiting'])
task_fields = kwargs.get('task_fields', ('task_impact', 'controller_node', 'execution_node', 'instance_group'))
from awx.main.models import UnifiedJob
if instance_group_queryset is not None:
logger.debug("******************INSTANCE GROUP QUERYSET PASSED -- FILTERING TASKS ****************************")
# Sometimes things like the serializer pass a queryset that looks at not all instance groups. in this case,
# we also need to filter the tasks we look at
tasks = UnifiedJob.objects.filter(status__in=task_status_filter_list, instance_group__in=[ig.id for ig in instance_group_queryset]).only(
*task_fields
)
else:
# No instance group query set, look at all tasks in whole system
tasks = UnifiedJob.objects.filter(status__in=task_status_filter_list).only(*task_fields)
for task in tasks:
tmm.consume_capacity(task)
return tmm
def consume_capacity(self, task):
# Consume capacity on instances, which bubbles up to instance groups they are a member of
self.instances.consume_capacity(task)
# For container group jobs, additionally we must account for capacity consumed since
# The container groups have no instances to look at to track how many jobs/forks are consumed
if task.instance_group_id:
if not task.instance_group_id in self.instance_groups.pk_ig_map.keys():
logger.warn(
f"Task {task.log_format} assigned {task.instance_group_id} but this instance group not present in map of instance groups{self.instance_groups.pk_ig_map.keys()}"
)
else:
ig = self.instance_groups.pk_ig_map[task.instance_group_id]
if ig.is_container_group:
self.instance_groups[ig.name].consume_capacity(task)
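
The TaskManagerInstanceGroup.has_remaining_capacity method shown above enforces two optional per-group limits, max_concurrent_jobs and max_forks, where a value of 0 means no limit is enforced. A minimal standalone sketch of that check (not AWX code; the function name and numbers are illustrative):

def fits_on_group(task_impact, jobs_running, consumed_forks, max_concurrent_jobs=0, max_forks=0):
    # A limit of 0 disables the corresponding check.
    remaining_jobs = 0 if max_concurrent_jobs == 0 else max_concurrent_jobs - jobs_running - 1
    remaining_forks = 0 if max_forks == 0 else max_forks - consumed_forks - task_impact
    # Any negative remainder means the task does not fit on the group.
    return remaining_jobs >= 0 and remaining_forks >= 0

# Mirrors the scenario in test_max_forks_blocks_start_of_new_jobs: max_forks=10, task_impact=6.
assert fits_on_group(task_impact=6, jobs_running=0, consumed_forks=0, max_forks=10) is True
assert fits_on_group(task_impact=6, jobs_running=1, consumed_forks=6, max_forks=10) is False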


@@ -61,15 +61,10 @@ def read_receptor_config():
return yaml.safe_load(f)
def work_signing_enabled(config_data):
for section in config_data:
if 'work-verification' in section:
return True
return False
def get_receptor_sockfile():
data = read_receptor_config()
def get_receptor_sockfile(config_data):
for section in config_data:
for section in data:
for entry_name, entry_data in section.items():
if entry_name == 'control-service':
if 'filename' in entry_data:
@@ -80,11 +75,12 @@ def get_receptor_sockfile(config_data):
raise RuntimeError(f'Receptor conf {__RECEPTOR_CONF} does not have control-service entry needed to get sockfile')
def get_tls_client(config_data, use_stream_tls=None):
def get_tls_client(use_stream_tls=None):
if not use_stream_tls:
return None
for section in config_data:
data = read_receptor_config()
for section in data:
for entry_name, entry_data in section.items():
if entry_name == 'tls-client':
if 'name' in entry_data:
@@ -92,12 +88,10 @@ def get_tls_client(config_data, use_stream_tls=None):
return None
def get_receptor_ctl(config_data=None):
if config_data is None:
config_data = read_receptor_config()
receptor_sockfile = get_receptor_sockfile(config_data)
def get_receptor_ctl():
receptor_sockfile = get_receptor_sockfile()
try:
return ReceptorControl(receptor_sockfile, config=__RECEPTOR_CONF, tlsclient=get_tls_client(config_data, True))
return ReceptorControl(receptor_sockfile, config=__RECEPTOR_CONF, tlsclient=get_tls_client(True))
except RuntimeError:
return ReceptorControl(receptor_sockfile)
@@ -165,18 +159,15 @@ def run_until_complete(node, timing_data=None, **kwargs):
"""
Runs an ansible-runner work_type on remote node, waits until it completes, then returns stdout.
"""
config_data = read_receptor_config()
receptor_ctl = get_receptor_ctl(config_data)
receptor_ctl = get_receptor_ctl()
use_stream_tls = getattr(get_conn_type(node, receptor_ctl), 'name', None) == "STREAMTLS"
kwargs.setdefault('tlsclient', get_tls_client(config_data, use_stream_tls))
kwargs.setdefault('tlsclient', get_tls_client(use_stream_tls))
kwargs.setdefault('ttl', '20s')
kwargs.setdefault('payload', '')
if work_signing_enabled(config_data):
kwargs['signwork'] = True
transmit_start = time.time()
result = receptor_ctl.submit_work(worktype='ansible-runner', node=node, **kwargs)
result = receptor_ctl.submit_work(worktype='ansible-runner', node=node, signwork=True, **kwargs)
unit_id = result['unitid']
run_start = time.time()
@@ -311,8 +302,7 @@ class AWXReceptorJob:
def run(self):
# We establish a connection to the Receptor socket
self.config_data = read_receptor_config()
receptor_ctl = get_receptor_ctl(self.config_data)
receptor_ctl = get_receptor_ctl()
res = None
try:
@@ -337,7 +327,7 @@ class AWXReceptorJob:
if self.work_type == 'ansible-runner':
work_submit_kw['node'] = self.task.instance.execution_node
use_stream_tls = get_conn_type(work_submit_kw['node'], receptor_ctl).name == "STREAMTLS"
work_submit_kw['tlsclient'] = get_tls_client(self.config_data, use_stream_tls)
work_submit_kw['tlsclient'] = get_tls_client(use_stream_tls)
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
transmitter_future = executor.submit(self.transmit, sockin)
@@ -487,9 +477,7 @@ class AWXReceptorJob:
@property
def sign_work(self):
if self.work_type in ('ansible-runner', 'local'):
return work_signing_enabled(self.config_data)
return False
return True if self.work_type in ('ansible-runner', 'local') else False
@property
def work_type(self):
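
For context: read_receptor_config returns the YAML-parsed receptor configuration, a list of one-key mappings, and work_signing_enabled (shown above) simply looks for a work-verification section in that list. A self-contained illustration with a made-up config; the keys inside each section are illustrative, not taken from the diff:

import yaml

# Made-up receptor config for illustration only; real section contents will differ.
sample_receptor_conf = """
- control-service:
    filename: /var/run/awx-receptor/receptor.sock
- work-verification:
    publickey: /etc/receptor/work_public_key.pem
"""

def work_signing_enabled(config_data):
    for section in config_data:
        if 'work-verification' in section:
            return True
    return False

config_data = yaml.safe_load(sample_receptor_conf)
assert work_signing_enabled(config_data) is True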


@@ -4,7 +4,7 @@ from awx.main.models import (
Instance,
InstanceGroup,
)
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups, TaskManagerInstances
class TestInstanceGroupInstanceMapping(TransactionTestCase):
@@ -23,10 +23,11 @@ class TestInstanceGroupInstanceMapping(TransactionTestCase):
def test_mapping(self):
self.sample_cluster()
with self.assertNumQueries(3):
instance_groups = TaskManagerInstanceGroups()
instances = TaskManagerInstances([]) # empty task list
instance_groups = TaskManagerInstanceGroups(instances_by_hostname=instances)
ig_instance_map = instance_groups.instance_groups
assert set(i.hostname for i in ig_instance_map['ig_small'].instances) == set(['i1'])
assert set(i.hostname for i in ig_instance_map['ig_large'].instances) == set(['i2', 'i3'])
assert set(i.hostname for i in ig_instance_map['default'].instances) == set(['i2'])
assert set(i.hostname for i in ig_instance_map['ig_small']['instances']) == set(['i1'])
assert set(i.hostname for i in ig_instance_map['ig_large']['instances']) == set(['i2', 'i3'])
assert set(i.hostname for i in ig_instance_map['default']['instances']) == set(['i2'])


@@ -10,10 +10,6 @@ from awx.main.utils import (
create_temporary_fifo,
)
from awx.main.scheduler import TaskManager
from . import create_job
@pytest.fixture
def containerized_job(default_instance_group, kube_credential, job_template_factory):
@@ -38,50 +34,6 @@ def test_containerized_job(containerized_job):
assert containerized_job.instance_group.credential.kubernetes
@pytest.mark.django_db
def test_max_concurrent_jobs_blocks_start_of_new_jobs(controlplane_instance_group, containerized_job, mocker):
"""Construct a scenario where only 1 job will fit within the max_concurrent_jobs of the container group.
Since max_concurrent_jobs is set to 1, even though 2 jobs are in pending
and would be launched into the container group, only one will be started.
"""
containerized_job.unified_job_template.allow_simultaneous = True
containerized_job.unified_job_template.save()
default_instance_group = containerized_job.instance_group
default_instance_group.max_concurrent_jobs = 1
default_instance_group.save()
task_impact = 1
# Create a second job that should not be scheduled at first, blocked by the other
create_job(containerized_job.unified_job_template)
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = task_impact
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once()
@pytest.mark.django_db
def test_max_forks_blocks_start_of_new_jobs(controlplane_instance_group, containerized_job, mocker):
"""Construct a scenario where only 1 job will fit within the max_forks of the container group.
In this case, we set the container_group max_forks to 10, and make the task_impact of a job 6.
Therefore, only 1 job will fit within the max of 10.
"""
containerized_job.unified_job_template.allow_simultaneous = True
containerized_job.unified_job_template.save()
default_instance_group = containerized_job.instance_group
default_instance_group.max_forks = 10
# Create a second job that should not be scheduled
create_job(containerized_job.unified_job_template)
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = 6
with mock.patch("awx.main.scheduler.TaskManager.start_task"):
tm.schedule()
tm.start_task.assert_called_once()
@pytest.mark.django_db
def test_kubectl_ssl_verification(containerized_job, default_job_execution_environment):
containerized_job.execution_environment = default_job_execution_environment


@@ -248,76 +248,6 @@ def test_multi_jt_capacity_blocking(hybrid_instance, job_template_factory, mocke
mock_job.assert_called_once_with(j2, controlplane_instance_group, [], instance)
@pytest.mark.django_db
def test_max_concurrent_jobs_ig_capacity_blocking(hybrid_instance, job_template_factory, mocker):
"""When max_concurrent_jobs of an instance group is more restrictive than capacity of instances, enforce max_concurrent_jobs."""
instance = hybrid_instance
controlplane_instance_group = instance.rampart_groups.first()
# We will expect only 1 job to be started
controlplane_instance_group.max_concurrent_jobs = 1
controlplane_instance_group.save()
num_jobs = 3
jobs = []
for i in range(num_jobs):
jobs.append(
create_job(job_template_factory(f'jt{i}', organization=f'org{i}', project=f'proj{i}', inventory=f'inv{i}', credential=f'cred{i}').job_template)
)
tm = TaskManager()
task_impact = 1
# Sanity check that multiple jobs would run if not for the max_concurrent_jobs setting.
assert task_impact * num_jobs < controlplane_instance_group.capacity
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = task_impact
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once()
jobs[0].status = 'running'
jobs[0].controller_node = instance.hostname
jobs[0].execution_node = instance.hostname
jobs[0].instance_group = controlplane_instance_group
jobs[0].save()
# while that job is running, we should not start another job
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = task_impact
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_not_called()
# now job is done, we should start one of the two other jobs
jobs[0].status = 'successful'
jobs[0].save()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = task_impact
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once()
@pytest.mark.django_db
def test_max_forks_ig_capacity_blocking(hybrid_instance, job_template_factory, mocker):
"""When max_forks of an instance group is less than the capacity of instances, enforce max_forks."""
instance = hybrid_instance
controlplane_instance_group = instance.rampart_groups.first()
controlplane_instance_group.max_forks = 15
controlplane_instance_group.save()
task_impact = 10
num_jobs = 2
# Sanity check that 2 jobs would run if not for the max_forks setting.
assert controlplane_instance_group.max_forks < controlplane_instance_group.capacity
assert task_impact * num_jobs > controlplane_instance_group.max_forks
assert task_impact * num_jobs < controlplane_instance_group.capacity
for i in range(num_jobs):
create_job(job_template_factory(f'jt{i}', organization=f'org{i}', project=f'proj{i}', inventory=f'inv{i}', credential=f'cred{i}').job_template)
tm = TaskManager()
with mock.patch('awx.main.models.Job.task_impact', new_callable=mock.PropertyMock) as mock_task_impact:
mock_task_impact.return_value = task_impact
with mock.patch.object(TaskManager, "start_task", wraps=tm.start_task) as mock_job:
tm.schedule()
mock_job.assert_called_once()
@pytest.mark.django_db
def test_single_job_dependencies_project_launch(controlplane_instance_group, job_template_factory, mocker):
objects = job_template_factory('jt', organization='org1', project='proj', inventory='inv', credential='cred')


@@ -121,8 +121,8 @@ def test_python_and_js_licenses():
return errors
base_dir = settings.BASE_DIR
api_licenses = index_licenses('%s/../licenses' % base_dir)
ui_licenses = index_licenses('%s/../licenses/ui' % base_dir)
api_licenses = index_licenses('%s/../docs/licenses' % base_dir)
ui_licenses = index_licenses('%s/../docs/licenses/ui' % base_dir)
api_requirements = read_api_requirements('%s/../requirements' % base_dir)
ui_requirements = read_ui_requirements('%s/ui' % base_dir)


@@ -1,7 +1,10 @@
import pytest
from unittest import mock
from unittest.mock import Mock
from decimal import Decimal
from awx.main.models import Instance
from awx.main.models import InstanceGroup, Instance
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups
@pytest.mark.parametrize('capacity_adjustment', [0.0, 0.25, 0.5, 0.75, 1, 1.5, 3])
@@ -14,6 +17,83 @@ def test_capacity_adjustment_no_save(capacity_adjustment):
assert inst.capacity == (float(inst.capacity_adjustment) * abs(inst.mem_capacity - inst.cpu_capacity) + min(inst.mem_capacity, inst.cpu_capacity))
def T(impact):
j = mock.Mock(spec_set=['task_impact', 'capacity_type'])
j.task_impact = impact
j.capacity_type = 'execution'
return j
def Is(param):
"""
param:
[remaining_capacity1, remaining_capacity2, remaining_capacity3, ...]
[(jobs_running1, capacity1), (jobs_running2, capacity2), (jobs_running3, capacity3), ...]
"""
instances = []
if isinstance(param[0], tuple):
for (jobs_running, capacity) in param:
inst = Mock()
inst.capacity = capacity
inst.jobs_running = jobs_running
inst.node_type = 'execution'
instances.append(inst)
else:
for i in param:
inst = Mock()
inst.remaining_capacity = i
inst.node_type = 'execution'
instances.append(inst)
return instances
class TestInstanceGroup(object):
@pytest.mark.parametrize(
'task,instances,instance_fit_index,reason',
[
(T(100), Is([100]), 0, "Only one, pick it"),
(T(100), Is([100, 100]), 0, "Two equally good fits, pick the first"),
(T(100), Is([50, 100]), 1, "First instance not as good as second instance"),
(T(100), Is([50, 0, 20, 100, 100, 100, 30, 20]), 3, "Pick Instance [3] as it is the first that the task fits in."),
(T(100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
],
)
def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
InstanceGroup(id=10)
tm_igs = TaskManagerInstanceGroups(instance_groups={'controlplane': {'instances': instances}})
instance_picked = tm_igs.fit_task_to_most_remaining_capacity_instance(task, 'controlplane')
if instance_fit_index is None:
assert instance_picked is None, reason
else:
assert instance_picked == instances[instance_fit_index], reason
@pytest.mark.parametrize(
'instances,instance_fit_index,reason',
[
(Is([(0, 100)]), 0, "One idle instance, pick it"),
(Is([(1, 100)]), None, "One un-idle instance, pick nothing"),
(Is([(0, 100), (0, 200), (1, 500), (0, 700)]), 3, "Pick the largest idle instance"),
(Is([(0, 100), (0, 200), (1, 10000), (0, 700), (0, 699)]), 3, "Pick the largest idle instance"),
(Is([(0, 0)]), None, "One idle but down instance, don't pick it"),
],
)
def test_find_largest_idle_instance(self, instances, instance_fit_index, reason):
def filter_offline_instances(*args):
return filter(lambda i: i.capacity > 0, instances)
InstanceGroup(id=10)
instances_online_only = filter_offline_instances(instances)
tm_igs = TaskManagerInstanceGroups(instance_groups={'controlplane': {'instances': instances_online_only}})
if instance_fit_index is None:
assert tm_igs.find_largest_idle_instance('controlplane') is None, reason
else:
assert tm_igs.find_largest_idle_instance('controlplane') == instances[instance_fit_index], reason
def test_cleanup_params_defaults():
inst = Instance(hostname='foobar')
assert inst.get_cleanup_task_kwargs(exclude_strings=['awx_423_']) == {'exclude_strings': ['awx_423_'], 'file_pattern': '/tmp/awx_*_*', 'grace_period': 60}

View File

@@ -1,6 +1,6 @@
import pytest
from awx.main.scheduler.task_manager_models import TaskManagerModels
from awx.main.scheduler.task_manager_models import TaskManagerInstanceGroups, TaskManagerInstances
class FakeMeta(object):
@@ -16,64 +16,38 @@ class FakeObject(object):
class Job(FakeObject):
def __init__(self, **kwargs):
self.task_impact = kwargs.get('task_impact', 43)
self.is_container_group_task = kwargs.get('is_container_group_task', False)
self.controller_node = kwargs.get('controller_node', '')
self.execution_node = kwargs.get('execution_node', '')
self.instance_group = kwargs.get('instance_group', None)
self.instance_group_id = self.instance_group.id if self.instance_group else None
self.capacity_type = kwargs.get('capacity_type', 'execution')
task_impact = 43
is_container_group_task = False
controller_node = ''
execution_node = ''
def log_format(self):
return 'job 382 (fake)'
class Instances(FakeObject):
def add(self, *args):
for instance in args:
self.obj.instance_list.append(instance)
def all(self):
return self.obj.instance_list
class InstanceGroup(FakeObject):
def __init__(self, **kwargs):
super(InstanceGroup, self).__init__(**kwargs)
self.instance_list = []
self.pk = self.id = kwargs.get('id', 1)
@property
def instances(self):
mgr = Instances(obj=self)
return mgr
@property
def is_container_group(self):
return False
@property
def max_concurrent_jobs(self):
return 0
@property
def max_forks(self):
return 0
class Instance(FakeObject):
def __init__(self, **kwargs):
self.node_type = kwargs.get('node_type', 'hybrid')
self.capacity = kwargs.get('capacity', 0)
self.hostname = kwargs.get('hostname', 'fakehostname')
self.consumed_capacity = 0
self.jobs_running = 0
@pytest.fixture
def sample_cluster():
def stand_up_cluster():
class Instances(FakeObject):
def add(self, *args):
for instance in args:
self.obj.instance_list.append(instance)
def all(self):
return self.obj.instance_list
class InstanceGroup(FakeObject):
def __init__(self, **kwargs):
super(InstanceGroup, self).__init__(**kwargs)
self.instance_list = []
@property
def instances(self):
mgr = Instances(obj=self)
return mgr
class Instance(FakeObject):
pass
ig_small = InstanceGroup(name='ig_small')
ig_large = InstanceGroup(name='ig_large')
@@ -92,12 +66,14 @@ def sample_cluster():
@pytest.fixture
def create_ig_manager():
def _rf(ig_list, tasks):
tm_models = TaskManagerModels.init_with_consumed_capacity(
tasks=tasks,
instances=set(inst for ig in ig_list for inst in ig.instance_list),
instance_groups=ig_list,
)
return tm_models.instance_groups
instances = TaskManagerInstances(tasks, instances=set(inst for ig in ig_list for inst in ig.instance_list))
seed_igs = {}
for ig in ig_list:
seed_igs[ig.name] = {'instances': [instances[inst.hostname] for inst in ig.instance_list]}
instance_groups = TaskManagerInstanceGroups(instance_groups=seed_igs)
return instance_groups
return _rf
@@ -150,75 +126,3 @@ def test_RBAC_reduced_filter(sample_cluster, create_ig_manager):
# Cross-links between groups not visible to current user,
# so a naive accounting of capacities is returned instead
assert instance_groups_mgr.get_consumed_capacity('default') == 43
def Is(param):
"""
param:
[remaining_capacity1, remaining_capacity2, remaining_capacity3, ...]
[(jobs_running1, capacity1), (jobs_running2, capacity2), (jobs_running3, capacity3), ...]
"""
instances = []
if isinstance(param[0], tuple):
for index, (jobs_running, capacity) in enumerate(param):
inst = Instance(capacity=capacity, node_type='execution', hostname=f'fakehost-{index}')
inst.jobs_running = jobs_running
instances.append(inst)
else:
for index, capacity in enumerate(param):
inst = Instance(capacity=capacity, node_type='execution', hostname=f'fakehost-{index}')
inst.node_type = 'execution'
instances.append(inst)
return instances
class TestSelectBestInstanceForTask(object):
@pytest.mark.parametrize(
'task,instances,instance_fit_index,reason',
[
(Job(task_impact=100), Is([100]), 0, "Only one, pick it"),
(Job(task_impact=100), Is([100, 100]), 0, "Two equally good fits, pick the first"),
(Job(task_impact=100), Is([50, 100]), 1, "First instance not as good as second instance"),
(Job(task_impact=100), Is([50, 0, 20, 100, 100, 100, 30, 20]), 3, "Pick Instance [3] as it is the first that the task fits in."),
(Job(task_impact=100), Is([50, 0, 20, 99, 11, 1, 5, 99]), None, "The task don't a fit, you must a quit!"),
],
)
def test_fit_task_to_most_remaining_capacity_instance(self, task, instances, instance_fit_index, reason):
ig = InstanceGroup(id=10, name='controlplane')
tasks = []
for instance in instances:
ig.instances.add(instance)
for _ in range(instance.jobs_running):
tasks.append(Job(execution_node=instance.hostname, controller_node=instance.hostname, instance_group=ig))
tm_models = TaskManagerModels.init_with_consumed_capacity(tasks=tasks, instances=instances, instance_groups=[ig])
instance_picked = tm_models.instance_groups.fit_task_to_most_remaining_capacity_instance(task, 'controlplane')
if instance_fit_index is None:
assert instance_picked is None, reason
else:
assert instance_picked.hostname == instances[instance_fit_index].hostname, reason
@pytest.mark.parametrize(
'instances,instance_fit_index,reason',
[
(Is([(0, 100)]), 0, "One idle instance, pick it"),
(Is([(1, 100)]), None, "One un-idle instance, pick nothing"),
(Is([(0, 100), (0, 200), (1, 500), (0, 700)]), 3, "Pick the largest idle instance"),
(Is([(0, 100), (0, 200), (1, 10000), (0, 700), (0, 699)]), 3, "Pick the largest idle instance"),
(Is([(0, 0)]), None, "One idle but down instance, don't pick it"),
],
)
def test_find_largest_idle_instance(self, instances, instance_fit_index, reason):
ig = InstanceGroup(id=10, name='controlplane')
tasks = []
for instance in instances:
ig.instances.add(instance)
for _ in range(instance.jobs_running):
tasks.append(Job(execution_node=instance.hostname, controller_node=instance.hostname, instance_group=ig))
tm_models = TaskManagerModels.init_with_consumed_capacity(tasks=tasks, instances=instances, instance_groups=[ig])
if instance_fit_index is None:
assert tm_models.instance_groups.find_largest_idle_instance('controlplane') is None, reason
else:
assert tm_models.instance_groups.find_largest_idle_instance('controlplane').hostname == instances[instance_fit_index].hostname, reason

View File

@@ -304,13 +304,11 @@ INSTALLED_APPS = [
'django.contrib.messages',
'django.contrib.sessions',
'django.contrib.sites',
# daphne has to be installed before django.contrib.staticfiles for the app to startup
# According to channels 4.0 docs you install daphne instead of channels now
'daphne',
'django.contrib.staticfiles',
'oauth2_provider',
'rest_framework',
'django_extensions',
'channels',
'polymorphic',
'taggit',
'social_django',
@@ -985,13 +983,6 @@ DJANGO_GUID = {'GUID_HEADER_NAME': 'X-API-Request-Id'}
DEFAULT_EXECUTION_QUEUE_NAME = 'default'
# pod spec used when the default execution queue is a container group, e.g. when deploying on k8s/ocp with the operator
DEFAULT_EXECUTION_QUEUE_POD_SPEC_OVERRIDE = ''
# Max number of concurrently consumed forks for the default execution queue
# Zero means no limit
DEFAULT_EXECUTION_QUEUE_MAX_FORKS = 0
# Max number of concurrently running jobs for the default execution queue
# Zero means no limit
DEFAULT_EXECUTION_QUEUE_MAX_CONCURRENT_JOBS = 0
# Name of the default controlplane queue
DEFAULT_CONTROL_PLANE_QUEUE_NAME = 'controlplane'

View File

@@ -1,49 +0,0 @@
import React from 'react';
import { arrayOf, bool, number, shape, string } from 'prop-types';
import { Label, LabelGroup } from '@patternfly/react-core';
import { Link } from 'react-router-dom';
function InstanceGroupLabels({ labels, isLinkable }) {
const buildLinkURL = (isContainerGroup) =>
isContainerGroup
? '/instance_groups/container_group/'
: '/instance_groups/';
return (
<LabelGroup numLabels={5}>
{labels.map(({ id, name, is_container_group }) =>
isLinkable ? (
<Label
color="blue"
key={id}
render={({ className, content, componentRef }) => (
<Link
className={className}
innerRef={componentRef}
to={`${buildLinkURL(is_container_group)}${id}/details`}
>
{content}
</Link>
)}
>
{name}
</Label>
) : (
<Label color="blue" key={id}>
{name}
</Label>
)
)}
</LabelGroup>
);
}
InstanceGroupLabels.propTypes = {
labels: arrayOf(shape({ id: number.isRequired, name: string.isRequired }))
.isRequired,
isLinkable: bool,
};
InstanceGroupLabels.defaultProps = { isLinkable: false };
export default InstanceGroupLabels;

View File

@@ -1 +0,0 @@
export { default } from './InstanceGroupLabels';

View File

@@ -6,7 +6,6 @@ import { Link } from 'react-router-dom';
import styled from 'styled-components';
import { Chip, Divider, Title } from '@patternfly/react-core';
import { toTitleCase } from 'util/strings';
import InstanceGroupLabels from 'components/InstanceGroupLabels';
import CredentialChip from '../CredentialChip';
import ChipGroup from '../ChipGroup';
import { DetailList, Detail, UserDateDetail } from '../DetailList';
@@ -228,7 +227,21 @@ function PromptDetail({
label={t`Instance Groups`}
rows={4}
value={
<InstanceGroupLabels labels={overrides.instance_groups} />
<ChipGroup
numChips={5}
totalChips={overrides.instance_groups.length}
ouiaId="prompt-instance-groups-chips"
>
{overrides.instance_groups.map((instance_group) => (
<Chip
key={instance_group.id}
ouiaId={`instance-group-${instance_group.id}-chip`}
isReadOnly
>
{instance_group.name}
</Chip>
))}
</ChipGroup>
}
/>
)}

View File

@@ -10,7 +10,6 @@ import useRequest, { useDismissableError } from 'hooks/useRequest';
import { JobTemplatesAPI, SchedulesAPI, WorkflowJobTemplatesAPI } from 'api';
import { parseVariableField, jsonToYaml } from 'util/yaml';
import { useConfig } from 'contexts/Config';
import InstanceGroupLabels from 'components/InstanceGroupLabels';
import parseRuleObj from '../shared/parseRuleObj';
import FrequencyDetails from './FrequencyDetails';
import AlertModal from '../../AlertModal';
@@ -28,6 +27,11 @@ import { VariablesDetail } from '../../CodeEditor';
import { VERBOSITY } from '../../VerbositySelectField';
import getHelpText from '../../../screens/Template/shared/JobTemplate.helptext';
const buildLinkURL = (instance) =>
instance.is_container_group
? '/instance_groups/container_group/'
: '/instance_groups/';
const PromptDivider = styled(Divider)`
margin-top: var(--pf-global--spacer--lg);
margin-bottom: var(--pf-global--spacer--lg);
@@ -494,7 +498,26 @@ function ScheduleDetail({ hasDaysToKeepField, schedule, surveyConfig }) {
fullWidth
label={t`Instance Groups`}
value={
<InstanceGroupLabels labels={instanceGroups} isLinkable />
<ChipGroup
numChips={5}
totalChips={instanceGroups.length}
ouiaId="instance-group-chips"
>
{instanceGroups.map((ig) => (
<Link
to={`${buildLinkURL(ig)}${ig.id}/details`}
key={ig.id}
>
<Chip
key={ig.id}
ouiaId={`instance-group-${ig.id}-chip`}
isReadOnly
>
{ig.name}
</Chip>
</Link>
))}
</ChipGroup>
}
isEmpty={instanceGroups.length === 0}
/>

View File

@@ -24,10 +24,12 @@ function WorkflowOutputNavigation({ relatedJobs, parentRef }) {
const { id } = useParams();
const relevantResults = relatedJobs.filter(
({ job: jobId, summary_fields }) =>
jobId &&
`${jobId}` !== id &&
summary_fields.job.type !== 'workflow_approval'
({
job: jobId,
summary_fields: {
unified_job_template: { unified_job_type },
},
}) => jobId && `${jobId}` !== id && unified_job_type !== 'workflow_approval'
);
const [isOpen, setIsOpen] = useState(false);
@@ -99,14 +101,16 @@ function WorkflowOutputNavigation({ relatedJobs, parentRef }) {
{sortedJobs?.map((node) => (
<SelectOption
key={node.id}
to={`/jobs/${JOB_URL_SEGMENT_MAP[node.summary_fields.job.type]}/${
node.summary_fields.job?.id
}/output`}
to={`/jobs/${
JOB_URL_SEGMENT_MAP[
node.summary_fields.unified_job_template.unified_job_type
]
}/${node.summary_fields.job?.id}/output`}
component={Link}
value={node.summary_fields.job.name}
value={node.summary_fields.unified_job_template.name}
>
{stringIsUUID(node.identifier)
? node.summary_fields.job.name
? node.summary_fields.unified_job_template.name
: node.identifier}
</SelectOption>
))}

View File

@@ -1,85 +0,0 @@
import React from 'react';
import { within, render, screen, waitFor } from '@testing-library/react';
import userEvent from '@testing-library/user-event';
import WorkflowOutputNavigation from './WorkflowOutputNavigation';
import { createMemoryHistory } from 'history';
import { I18nProvider } from '@lingui/react';
import { i18n } from '@lingui/core';
import { en } from 'make-plural/plurals';
import english from '../../../src/locales/en/messages';
import { Router } from 'react-router-dom';
jest.mock('react-router-dom', () => ({
...jest.requireActual('react-router-dom'),
useParams: () => ({
id: 1,
}),
}));
const jobs = [
{
id: 1,
summary_fields: {
job: {
name: 'Ansible',
type: 'project_update',
id: 1,
status: 'successful',
},
},
job: 4,
},
{
id: 2,
summary_fields: {
job: {
name: 'Durham',
type: 'job',
id: 2,
status: 'successful',
},
},
job: 3,
},
{
id: 3,
summary_fields: {
job: {
name: 'Red hat',
type: 'job',
id: 3,
status: 'successful',
},
},
job: 2,
},
];
describe('<WorkflowOutputNavigation/>', () => {
test('Should open dropdown and list related workflow jobs', async () => {
i18n.loadLocaleData({ en: { plurals: en } });
i18n.load({ en: english });
i18n.activate('en');
const user = userEvent.setup();
const ref = jest
.spyOn(React, 'useRef')
.mockReturnValueOnce({ current: 'div' });
const history = createMemoryHistory({
initialEntries: ['jobs/playbook/2/output'],
});
render(
<I18nProvider i18n={i18n}>
<Router history={history}>
<WorkflowOutputNavigation relatedJobs={jobs} parentRef={ref} />
</Router>
</I18nProvider>
);
const button = screen.getByRole('button');
await user.click(button);
await waitFor(() => screen.getByText('Workflow Nodes'));
await waitFor(() => screen.getByText('Red hat'));
await waitFor(() => screen.getByText('Durham'));
await waitFor(() => screen.getByText('Ansible'));
});
});

18 file diffs suppressed because they are too large
View File

@@ -29,10 +29,6 @@ function ContainerGroupAdd() {
try {
const { data: response } = await InstanceGroupsAPI.create({
name: values.name,
max_forks: values.max_forks ? values.max_forks : 0,
max_concurrent_jobs: values.max_concurrent_jobs
? values.max_concurrent_jobs
: 0,
credential: values?.credential?.id,
pod_spec_override: values.override
? getPodSpecValue(values.pod_spec_override)

View File

@@ -33,8 +33,6 @@ const initialPodSpec = {
const instanceGroupCreateData = {
name: 'Fuz',
credential: { id: 71, name: 'CG' },
max_concurrent_jobs: 0,
max_forks: 0,
pod_spec_override:
'apiVersion: v1\nkind: Pod\nmetadata:\n namespace: default\nspec:\n containers:\n - image: ansible/ansible-runner\n tty: true\n stdin: true\n imagePullPolicy: Always\n args:\n - sleep\n - infinity\n - test',
};

View File

@@ -9,12 +9,7 @@ import AlertModal from 'components/AlertModal';
import ErrorDetail from 'components/ErrorDetail';
import { CardBody, CardActionsRow } from 'components/Card';
import DeleteButton from 'components/DeleteButton';
import {
Detail,
DetailList,
UserDateDetail,
DetailBadge,
} from 'components/DetailList';
import { Detail, DetailList, UserDateDetail } from 'components/DetailList';
import useRequest, { useDismissableError } from 'hooks/useRequest';
import { jsonToYaml, isJsonString } from 'util/yaml';
import { InstanceGroupsAPI } from 'api';
@@ -52,20 +47,6 @@ function ContainerGroupDetails({ instanceGroup }) {
value={t`Container group`}
dataCy="container-group-type"
/>
<DetailBadge
label={t`Max concurrent jobs`}
dataCy="instance-group-max-concurrent-jobs"
helpText={t`Maximum number of jobs to run concurrently on this group.
Zero means no limit will be enforced.`}
content={instanceGroup.max_concurrent_jobs}
/>
<DetailBadge
label={t`Max forks`}
dataCy="instance-group-max-forks"
helpText={t`Maximum number of forks to allow across all jobs running concurrently on this group.
Zero means no limit will be enforced.`}
content={instanceGroup.max_forks}
/>
{instanceGroup.summary_fields.credential && (
<Detail
label={t`Credential`}

View File

@@ -23,8 +23,6 @@ const instanceGroup = {
created: '2020-09-03T18:26:47.113934Z',
modified: '2020-09-03T19:34:23.244694Z',
capacity: 0,
max_concurrent_jobs: 0,
max_forks: 0,
committed_capacity: 0,
consumed_capacity: 0,
percent_capacity_remaining: 0.0,

View File

@@ -39,10 +39,6 @@ function ContainerGroupEdit({ instanceGroup }) {
name: values.name,
credential: values.credential ? values.credential.id : null,
pod_spec_override: values.override ? values.pod_spec_override : null,
max_forks: values.max_forks ? values.max_forks : 0,
max_concurrent_jobs: values.max_concurrent_jobs
? values.max_concurrent_jobs
: 0,
is_container_group: true,
});
history.push(detailsIUrl);

View File

@@ -34,8 +34,6 @@ const instanceGroup = {
policy_instance_percentage: 0,
policy_instance_minimum: 0,
policy_instance_list: [],
max_concurrent_jobs: 0,
max_forks: 0,
pod_spec_override: '',
summary_fields: {
credential: {
@@ -146,8 +144,6 @@ describe('<ContainerGroupEdit/>', () => {
...updatedInstanceGroup,
credential: 12,
pod_spec_override: null,
max_concurrent_jobs: 0,
max_forks: 0,
is_container_group: true,
});
expect(history.location.pathname).toEqual(

View File

@@ -42,8 +42,6 @@ const instanceGroup = {
credential: null,
policy_instance_percentage: 100,
policy_instance_minimum: 0,
max_concurrent_jobs: 0,
max_forks: 0,
policy_instance_list: ['receptor-1', 'receptor-2'],
pod_spec_override: '',
summary_fields: {

View File

@@ -73,20 +73,6 @@ function InstanceGroupDetails({ instanceGroup }) {
dataCy="instance-group-policy-instance-percentage"
content={`${instanceGroup.policy_instance_percentage} %`}
/>
<DetailBadge
label={t`Max concurrent jobs`}
dataCy="instance-group-max-concurrent-jobs"
helpText={t`Maximum number of jobs to run concurrently on this group.
Zero means no limit will be enforced.`}
content={instanceGroup.max_concurrent_jobs}
/>
<DetailBadge
label={t`Max forks`}
dataCy="instance-group-max-forks"
helpText={t`Maximum number of forks to allow across all jobs running concurrently on this group.
Zero means no limit will be enforced.`}
content={instanceGroup.max_forks}
/>
{instanceGroup.capacity ? (
<DetailBadge
label={t`Used capacity`}

View File

@@ -19,8 +19,6 @@ const instanceGroups = [
policy_instance_minimum: 10,
policy_instance_percentage: 50,
percent_capacity_remaining: 60,
max_concurrent_jobs: 0,
max_forks: 0,
is_container_group: false,
created: '2020-07-21T18:41:02.818081Z',
modified: '2020-07-24T20:32:03.121079Z',
@@ -40,8 +38,6 @@ const instanceGroups = [
policy_instance_minimum: 0,
policy_instance_percentage: 0,
percent_capacity_remaining: 0,
max_concurrent_jobs: 0,
max_forks: 0,
is_container_group: true,
created: '2020-07-21T18:41:02.818081Z',
modified: '2020-07-24T20:32:03.121079Z',

View File

@@ -11,7 +11,7 @@ import FormField, {
CheckboxField,
} from 'components/FormField';
import FormActionGroup from 'components/FormActionGroup';
import { required, minMaxValue } from 'util/validators';
import { required } from 'util/validators';
import {
FormColumnLayout,
FormFullWidthLayout,
@@ -57,26 +57,6 @@ function ContainerGroupFormFields({ instanceGroup }) {
tooltip={t`Credential to authenticate with Kubernetes or OpenShift. Must be of type "Kubernetes/OpenShift API Bearer Token". If left blank, the underlying Pod's service account will be used.`}
autoPopulate={!instanceGroup?.id}
/>
<FormField
id="instance-group-max-concurrent-jobs"
label={t`Max concurrent jobs`}
name="max_concurrent_jobs"
type="number"
min="0"
validate={minMaxValue(0, 2147483647)}
tooltip={t`Maximum number of jobs to run concurrently on this group.
Zero means no limit will be enforced.`}
/>
<FormField
id="instance-group-max-forks"
label={t`Max forks`}
name="max_forks"
type="number"
min="0"
validate={minMaxValue(0, 2147483647)}
tooltip={t`Maximum number of forks to allow across all jobs running concurrently on this group.
Zero means no limit will be enforced.`}
/>
<FormGroup fieldId="container-groups-option-checkbox" label={t`Options`}>
<FormCheckboxLayout>
@@ -117,8 +97,6 @@ function ContainerGroupForm({
const initialValues = {
name: instanceGroup?.name || '',
max_concurrent_jobs: instanceGroup.max_concurrent_jobs || 0,
max_forks: instanceGroup.max_forks || 0,
credential: instanceGroup?.summary_fields?.credential,
pod_spec_override: isCheckboxChecked
? instanceGroup?.pod_spec_override

View File

@@ -42,26 +42,6 @@ function InstanceGroupFormFields() {
assigned to this group when new instances come online.`}
validate={minMaxValue(0, 100)}
/>
<FormField
id="instance-group-max-concurrent-jobs"
label={t`Max concurrent jobs`}
name="max_concurrent_jobs"
type="number"
min="0"
validate={minMaxValue(0, 2147483647)}
tooltip={t`Maximum number of jobs to run concurrently on this group.
Zero means no limit will be enforced.`}
/>
<FormField
id="instance-group-max-forks"
label={t`Max forks`}
name="max_forks"
type="number"
min="0"
validate={minMaxValue(0, 2147483647)}
tooltip={t`Maximum number of forks to allow across all jobs running concurrently on this group.
Zero means no limit will be enforced.`}
/>
</>
);
}
@@ -77,8 +57,6 @@ function InstanceGroupForm({
name: instanceGroup.name || '',
policy_instance_minimum: instanceGroup.policy_instance_minimum || 0,
policy_instance_percentage: instanceGroup.policy_instance_percentage || 0,
max_concurrent_jobs: instanceGroup.max_concurrent_jobs || 0,
max_forks: instanceGroup.max_forks || 0,
};
return (
<Formik

View File

@@ -1,6 +1,6 @@
import React, { useCallback, useEffect, useState } from 'react';
import { useHistory, useParams } from 'react-router-dom';
import { Link, useHistory, useParams } from 'react-router-dom';
import { t, Plural } from '@lingui/macro';
import {
Button,
@@ -11,6 +11,7 @@ import {
CodeBlockCode,
Tooltip,
Slider,
Label,
} from '@patternfly/react-core';
import { DownloadIcon, OutlinedClockIcon } from '@patternfly/react-icons';
import styled from 'styled-components';
@@ -33,7 +34,6 @@ import useRequest, {
useDismissableError,
} from 'hooks/useRequest';
import HealthCheckAlert from 'components/HealthCheckAlert';
import InstanceGroupLabels from 'components/InstanceGroupLabels';
import RemoveInstanceButton from '../Shared/RemoveInstanceButton';
const Unavailable = styled.span`
@@ -156,6 +156,11 @@ function InstanceDetail({ setBreadcrumb, isK8s }) {
</>
);
const buildLinkURL = (inst) =>
inst.is_container_group
? '/instance_groups/container_group/'
: '/instance_groups/';
const { error, dismissError } = useDismissableError(
updateInstanceError || healthCheckError
);
@@ -220,9 +225,25 @@ function InstanceDetail({ setBreadcrumb, isK8s }) {
label={t`Instance Groups`}
dataCy="instance-groups"
helpText={t`The Instance Groups to which this instance belongs.`}
value={
<InstanceGroupLabels labels={instanceGroups} isLinkable />
}
value={instanceGroups.map((ig) => (
<React.Fragment key={ig.id}>
<Label
color="blue"
isTruncated
render={({ className, content, componentRef }) => (
<Link
to={`${buildLinkURL(ig)}${ig.id}/details`}
className={className}
innerRef={componentRef}
>
{content}
</Link>
)}
>
{ig.name}
</Label>{' '}
</React.Fragment>
))}
isEmpty={instanceGroups.length === 0}
/>
)}

View File

@@ -23,7 +23,6 @@ import { InventoriesAPI } from 'api';
import useRequest, { useDismissableError } from 'hooks/useRequest';
import { Inventory } from 'types';
import { relatedResourceDeleteRequests } from 'util/getRelatedResourceDeleteDetails';
import InstanceGroupLabels from 'components/InstanceGroupLabels';
import getHelpText from '../shared/Inventory.helptext';
function InventoryDetail({ inventory }) {
@@ -106,7 +105,23 @@ function InventoryDetail({ inventory }) {
<Detail
fullWidth
label={t`Instance Groups`}
value={<InstanceGroupLabels labels={instanceGroups} isLinkable />}
value={
<ChipGroup
numChips={5}
totalChips={instanceGroups?.length}
ouiaId="instance-group-chips"
>
{instanceGroups?.map((ig) => (
<Chip
key={ig.id}
isReadOnly
ouiaId={`instance-group-${ig.id}-chip`}
>
{ig.name}
</Chip>
))}
</ChipGroup>
}
isEmpty={instanceGroups.length === 0}
/>
)}

View File

@@ -131,8 +131,9 @@ describe('<InventoryDetail />', () => {
expect(InventoriesAPI.readInstanceGroups).toHaveBeenCalledWith(
mockInventory.id
);
const label = wrapper.find('Label').at(0);
expect(label.prop('children')).toEqual('Foo');
const chip = wrapper.find('Chip').at(0);
expect(chip.prop('isReadOnly')).toEqual(true);
expect(chip.prop('children')).toEqual('Foo');
});
test('should not load instance groups', async () => {

View File

@@ -2,7 +2,7 @@ import React, { useCallback, useEffect } from 'react';
import { Link, useHistory } from 'react-router-dom';
import { t } from '@lingui/macro';
import { Button, Label } from '@patternfly/react-core';
import { Button, Chip, Label } from '@patternfly/react-core';
import { Inventory } from 'types';
import { InventoriesAPI, UnifiedJobsAPI } from 'api';
@@ -10,6 +10,7 @@ import useRequest, { useDismissableError } from 'hooks/useRequest';
import AlertModal from 'components/AlertModal';
import { CardBody, CardActionsRow } from 'components/Card';
import ChipGroup from 'components/ChipGroup';
import { VariablesDetail } from 'components/CodeEditor';
import ContentError from 'components/ContentError';
import ContentLoading from 'components/ContentLoading';
@@ -17,7 +18,6 @@ import DeleteButton from 'components/DeleteButton';
import { DetailList, Detail, UserDateDetail } from 'components/DetailList';
import ErrorDetail from 'components/ErrorDetail';
import Sparkline from 'components/Sparkline';
import InstanceGroupLabels from 'components/InstanceGroupLabels';
function SmartInventoryDetail({ inventory }) {
const history = useHistory();
@@ -120,7 +120,23 @@ function SmartInventoryDetail({ inventory }) {
<Detail
fullWidth
label={t`Instance groups`}
value={<InstanceGroupLabels labels={instanceGroups} />}
value={
<ChipGroup
numChips={5}
totalChips={instanceGroups.length}
ouiaId="instance-group-chips"
>
{instanceGroups.map((ig) => (
<Chip
key={ig.id}
isReadOnly
ouiaId={`instance-group-${ig.id}-chip`}
>
{ig.name}
</Chip>
))}
</ChipGroup>
}
isEmpty={instanceGroups.length === 0}
/>
<VariablesDetail

View File

@@ -4,7 +4,6 @@ import { getJobModel } from 'util/jobs';
export default function useWsJob(initialJob) {
const [job, setJob] = useState(initialJob);
const [pendingMessages, setPendingMessages] = useState([]);
const lastMessage = useWebsocket({
jobs: ['status_changed'],
control: ['limit_reached_1'],
@@ -14,48 +13,30 @@ export default function useWsJob(initialJob) {
setJob(initialJob);
}, [initialJob]);
const processMessage = (message) => {
if (message.unified_job_id !== job.id) {
return;
}
if (
['successful', 'failed', 'error', 'cancelled'].includes(message.status)
) {
fetchJob();
}
setJob(updateJob(job, message));
};
async function fetchJob() {
const { data } = await getJobModel(job.type).readDetail(job.id);
setJob(data);
}
useEffect(
() => {
if (!lastMessage) {
async function fetchJob() {
const { data } = await getJobModel(job.type).readDetail(job.id);
setJob(data);
}
if (!job || lastMessage?.unified_job_id !== job.id) {
return;
}
if (job) {
processMessage(lastMessage);
} else if (lastMessage.unified_job_id) {
setPendingMessages(pendingMessages.concat(lastMessage));
if (
['successful', 'failed', 'error', 'cancelled'].includes(
lastMessage.status
)
) {
fetchJob();
} else {
setJob(updateJob(job, lastMessage));
}
},
[lastMessage] // eslint-disable-line react-hooks/exhaustive-deps
);
useEffect(() => {
if (!job || !pendingMessages.length) {
return;
}
pendingMessages.forEach((message) => {
processMessage(message);
});
setPendingMessages([]);
}, [job, pendingMessages]); // eslint-disable-line react-hooks/exhaustive-deps
return job;
}

View File

@@ -2,7 +2,7 @@ import React, { useEffect, useState, useCallback } from 'react';
import { Link, useHistory, useRouteMatch } from 'react-router-dom';
import { t } from '@lingui/macro';
import { Button } from '@patternfly/react-core';
import { Button, Chip } from '@patternfly/react-core';
import { OrganizationsAPI } from 'api';
import { DetailList, Detail, UserDateDetail } from 'components/DetailList';
import { CardBody, CardActionsRow } from 'components/Card';
@@ -16,7 +16,6 @@ import ErrorDetail from 'components/ErrorDetail';
import useRequest, { useDismissableError } from 'hooks/useRequest';
import { useConfig } from 'contexts/Config';
import ExecutionEnvironmentDetail from 'components/ExecutionEnvironmentDetail';
import InstanceGroupLabels from 'components/InstanceGroupLabels';
import { relatedResourceDeleteRequests } from 'util/getRelatedResourceDeleteDetails';
function OrganizationDetail({ organization }) {
@@ -80,6 +79,11 @@ function OrganizationDetail({ organization }) {
return <ContentError error={contentError} />;
}
const buildLinkURL = (instance) =>
instance.is_container_group
? '/instance_groups/container_group/'
: '/instance_groups/';
return (
<CardBody>
<DetailList>
@@ -122,7 +126,25 @@ function OrganizationDetail({ organization }) {
fullWidth
label={t`Instance Groups`}
helpText={t`The Instance Groups for this Organization to run on.`}
value={<InstanceGroupLabels labels={instanceGroups} isLinkable />}
value={
<ChipGroup
numChips={5}
totalChips={instanceGroups.length}
ouiaId="instance-group-chips"
>
{instanceGroups.map((ig) => (
<Link to={`${buildLinkURL(ig)}${ig.id}/details`} key={ig.id}>
<Chip
key={ig.id}
isReadOnly
ouiaId={`instance-group-${ig.id}-chip`}
>
{ig.name}
</Chip>
</Link>
))}
</ChipGroup>
}
isEmpty={instanceGroups.length === 0}
/>
)}

View File

@@ -90,7 +90,7 @@ describe('<OrganizationDetail />', () => {
await waitForElement(component, 'ContentLoading', (el) => el.length === 0);
expect(
component
.find('Label')
.find('Chip')
.findWhere((el) => el.text() === 'One')
.exists()
).toBe(true);

View File

@@ -34,7 +34,6 @@ import useRequest, { useDismissableError } from 'hooks/useRequest';
import useBrandName from 'hooks/useBrandName';
import ExecutionEnvironmentDetail from 'components/ExecutionEnvironmentDetail';
import { relatedResourceDeleteRequests } from 'util/getRelatedResourceDeleteDetails';
import InstanceGroupLabels from 'components/InstanceGroupLabels';
import getHelpText from '../shared/JobTemplate.helptext';
function JobTemplateDetail({ template }) {
@@ -168,6 +167,11 @@ function JobTemplateDetail({ template }) {
);
};
const buildLinkURL = (instance) =>
instance.is_container_group
? '/instance_groups/container_group/'
: '/instance_groups/';
if (instanceGroupsError) {
return <ContentError error={instanceGroupsError} />;
}
@@ -418,7 +422,25 @@ function JobTemplateDetail({ template }) {
label={t`Instance Groups`}
dataCy="jt-detail-instance-groups"
helpText={helpText.instanceGroups}
value={<InstanceGroupLabels labels={instanceGroups} isLinkable />}
value={
<ChipGroup
numChips={5}
totalChips={instanceGroups.length}
ouiaId="instance-group-chips"
>
{instanceGroups.map((ig) => (
<Link to={`${buildLinkURL(ig)}${ig.id}/details`} key={ig.id}>
<Chip
key={ig.id}
ouiaId={`instance-group-${ig.id}-chip`}
isReadOnly
>
{ig.name}
</Chip>
</Link>
))}
</ChipGroup>
}
isEmpty={instanceGroups.length === 0}
/>
{job_tags && (

View File

@@ -28,64 +28,52 @@ options:
default: 'False'
organizations:
description:
- organization names to export
type: list
elements: str
- organization name to export
type: str
users:
description:
- user names to export
type: list
elements: str
- user name to export
type: str
teams:
description:
- team names to export
type: list
elements: str
- team name to export
type: str
credential_types:
description:
- credential type names to export
type: list
elements: str
- credential type name to export
type: str
credentials:
description:
- credential names to export
type: list
elements: str
- credential name to export
type: str
execution_environments:
description:
- execution environment names to export
type: list
elements: str
- execution environment name to export
type: str
notification_templates:
description:
- notification template names to export
type: list
elements: str
- notification template name to export
type: str
inventory_sources:
description:
- inventory sources to export
type: list
elements: str
- inventory source to export
type: str
inventory:
description:
- inventory names to export
type: list
elements: str
- inventory name to export
type: str
projects:
description:
- project names to export
type: list
elements: str
- project name to export
type: str
job_templates:
description:
- job template names to export
type: list
elements: str
- job template name to export
type: str
workflow_job_templates:
description:
- workflow names to export
type: list
elements: str
- workflow name to export
type: str
requirements:
- "awxkit >= 9.3.0"
notes:
@@ -106,10 +94,6 @@ EXAMPLES = '''
export:
job_templates: "My Template"
credential: 'all'
- name: Export a list of inventories
export:
inventory: ['My Inventory 1', 'My Inventory 2']
'''
import logging
@@ -127,12 +111,24 @@ except ImportError:
def main():
argument_spec = dict(
all=dict(type='bool', default=False),
credential_types=dict(type='str'),
credentials=dict(type='str'),
execution_environments=dict(type='str'),
inventory=dict(type='str'),
inventory_sources=dict(type='str'),
job_templates=dict(type='str'),
notification_templates=dict(type='str'),
organizations=dict(type='str'),
projects=dict(type='str'),
teams=dict(type='str'),
users=dict(type='str'),
workflow_job_templates=dict(type='str'),
)
# We are not going to raise an error here because the __init__ method of ControllerAWXKitModule will do that for us
if HAS_EXPORTABLE_RESOURCES:
for resource in EXPORTABLE_RESOURCES:
argument_spec[resource] = dict(type='list', elements='str')
argument_spec[resource] = dict(type='str')
module = ControllerAWXKitModule(argument_spec=argument_spec)

View File

@@ -54,18 +54,6 @@ options:
required: False
type: int
default: '0'
max_concurrent_jobs:
description:
- Maximum number of concurrent jobs to run on this group. Zero means no limit.
required: False
type: int
default: '0'
max_forks:
description:
- Max forks to execute on this group. Zero means no limit.
required: False
type: int
default: '0'
policy_instance_list:
description:
- List of exact-match Instances that will be assigned to this group
@@ -107,8 +95,6 @@ def main():
is_container_group=dict(type='bool', default=False),
policy_instance_percentage=dict(type='int', default='0'),
policy_instance_minimum=dict(type='int', default='0'),
max_concurrent_jobs=dict(type='int', default='0'),
max_forks=dict(type='int', default='0'),
policy_instance_list=dict(type='list', elements='str'),
pod_spec_override=dict(),
instances=dict(required=False, type="list", elements='str', default=None),
@@ -125,8 +111,6 @@ def main():
is_container_group = module.params.get('is_container_group')
policy_instance_percentage = module.params.get('policy_instance_percentage')
policy_instance_minimum = module.params.get('policy_instance_minimum')
max_concurrent_jobs = module.params.get('max_concurrent_jobs')
max_forks = module.params.get('max_forks')
policy_instance_list = module.params.get('policy_instance_list')
pod_spec_override = module.params.get('pod_spec_override')
instances = module.params.get('instances')
@@ -160,10 +144,6 @@ def main():
new_fields['policy_instance_percentage'] = policy_instance_percentage
if policy_instance_minimum is not None:
new_fields['policy_instance_minimum'] = policy_instance_minimum
if max_concurrent_jobs is not None:
new_fields['max_concurrent_jobs'] = max_concurrent_jobs
if max_forks is not None:
new_fields['max_forks'] = max_forks
if policy_instance_list is not None:
new_fields['policy_instance_list'] = policy_instance_list
if pod_spec_override is not None:

View File

@@ -61,40 +61,6 @@
- mixed_export['assets']['organizations'] | length() == 1
- "'workflow_job_templates' not in mixed_export['assets']"
- name: Export list of organizations
export:
organizations: "{{[org_name1, org_name2]}}"
register: list_asserts
- assert:
that:
- list_asserts is not changed
- list_asserts is successful
- list_asserts['assets']['organizations'] | length() >= 2
- name: Export list with one organization
export:
organizations: "{{[org_name1]}}"
register: list_asserts
- assert:
that:
- list_asserts is not changed
- list_asserts is successful
- list_asserts['assets']['organizations'] | length() >= 1
- "org_name1 in (list_asserts['assets']['organizations'] | map(attribute='name') )"
- name: Export one organization as string
export:
organizations: "{{org_name2}}"
register: string_asserts
- assert:
that:
- string_asserts is not changed
- string_asserts is successful
- string_asserts['assets']['organizations'] | length() >= 1
- "org_name2 in (string_asserts['assets']['organizations'] | map(attribute='name') )"
always:
- name: Remove our inventory
inventory:

View File

@@ -213,23 +213,11 @@ class ApiV2(base.Base):
assets = (self._export(asset, post_fields) for asset in endpoint.results)
return [asset for asset in assets if asset is not None]
def _check_for_int(self, value):
return isinstance(value, int) or (isinstance(value, str) and value.isdecimal())
def _filtered_list(self, endpoint, value):
if isinstance(value, list) and len(value) == 1:
value = value[0]
if self._check_for_int(value):
if isinstance(value, int) or value.isdecimal():
return endpoint.get(id=int(value))
options = self._cache.get_options(endpoint)
identifier = next(field for field in options['search_fields'] if field in ('name', 'username', 'hostname'))
if isinstance(value, list):
if all(self._check_for_int(item) for item in value):
identifier = 'or__id'
else:
identifier = 'or__' + identifier
return endpoint.get(**{identifier: value}, all_pages=True)
def export_assets(self, **kwargs):

View File

@@ -19,7 +19,7 @@ class InstanceGroup(HasCreate, base.Base):
def payload(self, **kwargs):
payload = PseudoNamespace(name=kwargs.get('name') or 'Instance Group - {}'.format(random_title()))
fields = ('policy_instance_percentage', 'policy_instance_minimum', 'policy_instance_list', 'is_container_group', 'max_forks', 'max_concurrent_jobs')
fields = ('policy_instance_percentage', 'policy_instance_minimum', 'policy_instance_list', 'is_container_group')
update_payload(payload, fields, kwargs)
set_payload_foreign_key_args(payload, ('credential',), kwargs)

View File

@@ -333,6 +333,7 @@ class InventorySource(HasCreate, HasNotifications, UnifiedJobTemplate):
'overwrite_vars',
'update_cache_timeout',
'update_on_launch',
'update_on_project_update',
'verbosity',
)

View File

@@ -161,7 +161,7 @@ class Export(CustomCommand):
# 1) the resource flag is not used at all, which will result in the attr being None
# 2) the resource flag is used with no argument, which will result in the attr being ''
# 3) the resource flag is used with an argument, and the attr will be that argument's value
resources.add_argument('--{}'.format(resource), nargs='*')
resources.add_argument('--{}'.format(resource), nargs='?', const='')
def handle(self, client, parser):
self.extend_parser(parser)
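As a hypothetical standalone sketch of the three cases the comment above describes (an illustrative --organizations flag on a throwaway parser, not the real awx CLI wiring):

import argparse

# Illustrates how the parsed attribute differs when the flag is absent,
# bare, or given an argument, for the nargs='?', const='' form.
parser = argparse.ArgumentParser()
parser.add_argument('--organizations', nargs='?', const='')

print(parser.parse_args([]).organizations)                              # None: flag not used at all
print(parser.parse_args(['--organizations']).organizations)             # '': flag used with no argument
print(parser.parse_args(['--organizations', 'Default']).organizations)  # 'Default': flag used with an argument

With the nargs='*' variant on the other side of the hunk, the bare flag would instead parse to an empty list and any arguments would be collected into a list.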

View File

@@ -124,15 +124,3 @@ be selected. If set to a value of `0.0` then the smallest value will be used. A
be `18`:
16 + (20 - 16) * 0.5 == 18
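As a small illustration of that calculation (a standalone sketch assuming the example's cpu_capacity of 16 and mem_capacity of 20; the helper name is made up, not AWX's API):

def adjusted_capacity(cpu_capacity, mem_capacity, capacity_adjustment):
    # Interpolate between the smaller and the larger of the two capacity values.
    low, high = min(cpu_capacity, mem_capacity), max(cpu_capacity, mem_capacity)
    return low + (high - low) * capacity_adjustment

print(adjusted_capacity(16, 20, 0.0))   # 16.0, the smallest value
print(adjusted_capacity(16, 20, 0.5))   # 18.0, i.e. 16 + (20 - 16) * 0.5
print(adjusted_capacity(16, 20, 1.0))   # 20.0, the largest value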
### Max forks and Max Concurrent jobs on Instance Groups and Container Groups
By default, only Instances have capacity and we only track capacity consumed per instance. With the max_forks and max_concurrent_jobs fields now available on Instance Groups, we can additionally limit how many jobs or forks are allowed to be concurrently consumed across an entire Instance Group or Container Group.
This is especially useful for Container Groups, where previously there was no limit to how many jobs we would submit to a Container Group. That made it impossible to "overflow" job loads from one Container Group to another Container Group, which may be on a different Kubernetes cluster or namespace.
One way to calculate a desirable max_concurrent_jobs for a Container Group is to consider the pod_spec for that Container Group. In the pod_spec we indicate the resource requests and limits for the automation job pod. If your pod_spec indicates that a pod with 100MB of memory will be provisioned, and you know your Kubernetes cluster has 1 worker node with 8GB of RAM, then the maximum number of jobs you would ideally start is around 81, calculated as (8GB of memory on the node * 1024 MB) // 100 MB of memory per job pod, which with floor division comes out to 81.
Alternatively, instead of considering the number of job pods and the resources they request, we can consider the memory consumption of the forks in the jobs. We normally assume that 100MB of memory will be used by each fork of ansible. Therefore we also know that our 8GB worker node should only run 81 forks of ansible at a time, which, depending on the forks and inventory settings of the job templates, could be consumed by anywhere from 1 job to 81 jobs. So we can also set max_forks = 81. This way, either 39 jobs with 1 fork can run (task impact is always forks + 1), or 2 jobs with forks set to 39 can run.
While this feature is most useful for Container Groups, where there is no other way to limit job execution, it is available for use on any Instance Group. This can be useful if, for other business reasons, you want to set an Instance Group-wide limit on concurrent jobs. For example, if you have a job template that you only want 10 copies of running at a time, you could create a dedicated Instance Group for that job template and set max_concurrent_jobs to 10.
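A quick sketch of that sizing arithmetic (values taken from the example above; nothing here is an AWX default):

# Back-of-the-envelope sizing; the 8GB node and the 100MB-per-pod /
# 100MB-per-fork figures are the example's assumptions.
node_memory_mb = 8 * 1024   # one worker node with 8GB of RAM
pod_memory_mb = 100         # memory requested per automation job pod in the pod_spec
fork_memory_mb = 100        # rule of thumb for memory used by one ansible fork

max_concurrent_jobs = node_memory_mb // pod_memory_mb   # 81 job pods fit on the node
max_forks = node_memory_mb // fork_memory_mb            # 81 forks fit on the node

print(max_concurrent_jobs, max_forks)                   # 81 81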

Some files were not shown because too many files have changed in this diff