mirror of
https://github.com/ansible/awx.git
synced 2026-03-27 05:45:02 -02:30
Compare commits
7 Commits
21.4.0
...
mesh-scali
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ae9156a4a | ||
|
|
4890c15eeb | ||
|
|
bc6b8fc4ae | ||
|
|
03c70077f9 | ||
|
|
dab8c3ef55 | ||
|
|
d2a6be7ca9 | ||
|
|
170795ab76 |
2
.github/ISSUE_TEMPLATE.md
vendored
2
.github/ISSUE_TEMPLATE.md
vendored
@@ -25,7 +25,7 @@ Instead use the bug or feature request.
|
|||||||
<!--- Pick one below and delete the rest: -->
|
<!--- Pick one below and delete the rest: -->
|
||||||
- Breaking Change
|
- Breaking Change
|
||||||
- New or Enhanced Feature
|
- New or Enhanced Feature
|
||||||
- Bug, Docs Fix or other nominal change
|
- Bug or Docs Fix
|
||||||
|
|
||||||
|
|
||||||
##### COMPONENT NAME
|
##### COMPONENT NAME
|
||||||
|
|||||||
2
.github/PULL_REQUEST_TEMPLATE.md
vendored
2
.github/PULL_REQUEST_TEMPLATE.md
vendored
@@ -11,7 +11,7 @@ the change does.
|
|||||||
<!--- Pick one below and delete the rest: -->
|
<!--- Pick one below and delete the rest: -->
|
||||||
- Breaking Change
|
- Breaking Change
|
||||||
- New or Enhanced Feature
|
- New or Enhanced Feature
|
||||||
- Bug, Docs Fix or other nominal change
|
- Bug or Docs Fix
|
||||||
|
|
||||||
##### COMPONENT NAME
|
##### COMPONENT NAME
|
||||||
<!--- Name of the module/plugin/module/task -->
|
<!--- Name of the module/plugin/module/task -->
|
||||||
|
|||||||
3
.github/triage_replies.md
vendored
3
.github/triage_replies.md
vendored
@@ -93,9 +93,6 @@ The Ansible Community is looking at building an EE that corresponds to all of th
|
|||||||
- AWX: https://github.com/ansible/awx/blob/devel/CONTRIBUTING.md
|
- AWX: https://github.com/ansible/awx/blob/devel/CONTRIBUTING.md
|
||||||
- AWX-Operator: https://github.com/ansible/awx-operator/blob/devel/CONTRIBUTING.md
|
- AWX-Operator: https://github.com/ansible/awx-operator/blob/devel/CONTRIBUTING.md
|
||||||
|
|
||||||
### Oracle AWX
|
|
||||||
We'd be happy to help if you can reproduce this with AWX since we do not have Oracle's Linux Automation Manager. If you need help with this specific version of Oracles Linux Automation Manager you will need to contact your Oracle for support.
|
|
||||||
|
|
||||||
### AWX Release
|
### AWX Release
|
||||||
Subject: Announcing AWX Xa.Ya.za and AWX-Operator Xb.Yb.zb
|
Subject: Announcing AWX Xa.Ya.za and AWX-Operator Xb.Yb.zb
|
||||||
|
|
||||||
|
|||||||
45
.github/workflows/pr_body_check.yml
vendored
45
.github/workflows/pr_body_check.yml
vendored
@@ -1,45 +0,0 @@
|
|||||||
---
|
|
||||||
name: PR Check
|
|
||||||
env:
|
|
||||||
BRANCH: ${{ github.base_ref || 'devel' }}
|
|
||||||
on:
|
|
||||||
pull_request:
|
|
||||||
types: [opened, edited, reopened, synchronize]
|
|
||||||
jobs:
|
|
||||||
pr-check:
|
|
||||||
name: Scan PR description for semantic versioning keywords
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
permissions:
|
|
||||||
packages: write
|
|
||||||
contents: read
|
|
||||||
steps:
|
|
||||||
- name: Write PR body to a file
|
|
||||||
run: |
|
|
||||||
cat >> pr.body << __SOME_RANDOM_PR_EOF__
|
|
||||||
${{ github.event.pull_request.body }}
|
|
||||||
__SOME_RANDOM_PR_EOF__
|
|
||||||
|
|
||||||
- name: Display the received body for troubleshooting
|
|
||||||
run: cat pr.body
|
|
||||||
|
|
||||||
# We want to write these out individually just incase the options were joined on a single line
|
|
||||||
- name: Check for each of the lines
|
|
||||||
run: |
|
|
||||||
grep "Bug, Docs Fix or other nominal change" pr.body > Z
|
|
||||||
grep "New or Enhanced Feature" pr.body > Y
|
|
||||||
grep "Breaking Change" pr.body > X
|
|
||||||
exit 0
|
|
||||||
# We exit 0 and set the shell to prevent the returns from the greps from failing this step
|
|
||||||
# See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#exit-codes-and-error-action-preference
|
|
||||||
shell: bash {0}
|
|
||||||
|
|
||||||
- name: Check for exactly one item
|
|
||||||
run: |
|
|
||||||
if [ $(cat X Y Z | wc -l) != 1 ] ; then
|
|
||||||
echo "The PR body must contain exactly one of [ 'Bug, Docs Fix or other nominal change', 'New or Enhanced Feature', 'Breaking Change' ]"
|
|
||||||
echo "We counted $(cat X Y Z | wc -l)"
|
|
||||||
echo "See the default PR body for examples"
|
|
||||||
exit 255;
|
|
||||||
else
|
|
||||||
exit 0;
|
|
||||||
fi
|
|
||||||
@@ -4753,7 +4753,7 @@ class ScheduleSerializer(LaunchConfigurationBaseSerializer, SchedulePreviewSeria
|
|||||||
class InstanceLinkSerializer(BaseSerializer):
|
class InstanceLinkSerializer(BaseSerializer):
|
||||||
class Meta:
|
class Meta:
|
||||||
model = InstanceLink
|
model = InstanceLink
|
||||||
fields = ('source', 'target')
|
fields = ('source', 'target', 'link_state')
|
||||||
|
|
||||||
source = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
|
source = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
|
||||||
target = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
|
target = serializers.SlugRelatedField(slug_field="hostname", read_only=True)
|
||||||
@@ -4762,31 +4762,25 @@ class InstanceLinkSerializer(BaseSerializer):
|
|||||||
class InstanceNodeSerializer(BaseSerializer):
|
class InstanceNodeSerializer(BaseSerializer):
|
||||||
class Meta:
|
class Meta:
|
||||||
model = Instance
|
model = Instance
|
||||||
fields = ('id', 'hostname', 'node_type', 'node_state')
|
fields = ('id', 'hostname', 'node_type', 'node_state', 'enabled')
|
||||||
|
|
||||||
node_state = serializers.SerializerMethodField()
|
|
||||||
|
|
||||||
def get_node_state(self, obj):
|
|
||||||
if not obj.enabled:
|
|
||||||
return "disabled"
|
|
||||||
return "error" if obj.errors else "healthy"
|
|
||||||
|
|
||||||
|
|
||||||
class InstanceSerializer(BaseSerializer):
|
class InstanceSerializer(BaseSerializer):
|
||||||
|
|
||||||
consumed_capacity = serializers.SerializerMethodField()
|
consumed_capacity = serializers.SerializerMethodField()
|
||||||
percent_capacity_remaining = serializers.SerializerMethodField()
|
percent_capacity_remaining = serializers.SerializerMethodField()
|
||||||
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that ' 'are targeted for this instance'), read_only=True)
|
jobs_running = serializers.IntegerField(help_text=_('Count of jobs in the running or waiting state that are targeted for this instance'), read_only=True)
|
||||||
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True)
|
jobs_total = serializers.IntegerField(help_text=_('Count of all jobs that target this instance'), read_only=True)
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
model = Instance
|
model = Instance
|
||||||
read_only_fields = ('uuid', 'hostname', 'version', 'node_type')
|
read_only_fields = ('uuid', 'hostname', 'version', 'node_type', 'node_state')
|
||||||
fields = (
|
fields = (
|
||||||
"id",
|
"id",
|
||||||
"type",
|
"type",
|
||||||
"url",
|
"url",
|
||||||
"related",
|
"related",
|
||||||
|
"summary_fields",
|
||||||
"uuid",
|
"uuid",
|
||||||
"hostname",
|
"hostname",
|
||||||
"created",
|
"created",
|
||||||
@@ -4808,6 +4802,7 @@ class InstanceSerializer(BaseSerializer):
|
|||||||
"enabled",
|
"enabled",
|
||||||
"managed_by_policy",
|
"managed_by_policy",
|
||||||
"node_type",
|
"node_type",
|
||||||
|
"node_state",
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_related(self, obj):
|
def get_related(self, obj):
|
||||||
@@ -4819,6 +4814,14 @@ class InstanceSerializer(BaseSerializer):
|
|||||||
res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk})
|
res['health_check'] = self.reverse('api:instance_health_check', kwargs={'pk': obj.pk})
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
def get_summary_fields(self, obj):
|
||||||
|
summary = super().get_summary_fields(obj)
|
||||||
|
|
||||||
|
if self.is_detail_view:
|
||||||
|
summary['links'] = InstanceLinkSerializer(InstanceLink.objects.select_related('target', 'source').filter(source=obj), many=True).data
|
||||||
|
|
||||||
|
return summary
|
||||||
|
|
||||||
def get_consumed_capacity(self, obj):
|
def get_consumed_capacity(self, obj):
|
||||||
return obj.consumed_capacity
|
return obj.consumed_capacity
|
||||||
|
|
||||||
|
|||||||
@@ -440,6 +440,7 @@ class InstanceHealthCheck(GenericAPIView):
|
|||||||
def post(self, request, *args, **kwargs):
|
def post(self, request, *args, **kwargs):
|
||||||
obj = self.get_object()
|
obj = self.get_object()
|
||||||
|
|
||||||
|
# Note: hop nodes are already excluded by the get_queryset method
|
||||||
if obj.node_type == 'execution':
|
if obj.node_type == 'execution':
|
||||||
from awx.main.tasks.system import execution_node_health_check
|
from awx.main.tasks.system import execution_node_health_check
|
||||||
|
|
||||||
|
|||||||
@@ -27,7 +27,9 @@ class Command(BaseCommand):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def handle(self, **options):
|
def handle(self, **options):
|
||||||
|
# provides a mapping of hostname to Instance objects
|
||||||
nodes = Instance.objects.in_bulk(field_name='hostname')
|
nodes = Instance.objects.in_bulk(field_name='hostname')
|
||||||
|
|
||||||
if options['source'] not in nodes:
|
if options['source'] not in nodes:
|
||||||
raise CommandError(f"Host {options['source']} is not a registered instance.")
|
raise CommandError(f"Host {options['source']} is not a registered instance.")
|
||||||
if not (options['peers'] or options['disconnect'] or options['exact'] is not None):
|
if not (options['peers'] or options['disconnect'] or options['exact'] is not None):
|
||||||
@@ -57,7 +59,9 @@ class Command(BaseCommand):
|
|||||||
|
|
||||||
results = 0
|
results = 0
|
||||||
for target in options['peers']:
|
for target in options['peers']:
|
||||||
_, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target])
|
_, created = InstanceLink.objects.update_or_create(
|
||||||
|
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
|
||||||
|
)
|
||||||
if created:
|
if created:
|
||||||
results += 1
|
results += 1
|
||||||
|
|
||||||
@@ -80,7 +84,9 @@ class Command(BaseCommand):
|
|||||||
links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True))
|
links = set(InstanceLink.objects.filter(source=nodes[options['source']]).values_list('target__hostname', flat=True))
|
||||||
removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=links - peers).delete()
|
removals, _ = InstanceLink.objects.filter(source=nodes[options['source']], target__hostname__in=links - peers).delete()
|
||||||
for target in peers - links:
|
for target in peers - links:
|
||||||
_, created = InstanceLink.objects.get_or_create(source=nodes[options['source']], target=nodes[target])
|
_, created = InstanceLink.objects.update_or_create(
|
||||||
|
source=nodes[options['source']], target=nodes[target], defaults={'link_state': InstanceLink.States.ESTABLISHED}
|
||||||
|
)
|
||||||
if created:
|
if created:
|
||||||
additions += 1
|
additions += 1
|
||||||
|
|
||||||
|
|||||||
@@ -129,10 +129,13 @@ class InstanceManager(models.Manager):
|
|||||||
# if instance was not retrieved by uuid and hostname was, use the hostname
|
# if instance was not retrieved by uuid and hostname was, use the hostname
|
||||||
instance = self.filter(hostname=hostname)
|
instance = self.filter(hostname=hostname)
|
||||||
|
|
||||||
|
from awx.main.models import Instance
|
||||||
|
|
||||||
# Return existing instance
|
# Return existing instance
|
||||||
if instance.exists():
|
if instance.exists():
|
||||||
instance = instance.first() # in the unusual occasion that there is more than one, only get one
|
instance = instance.first() # in the unusual occasion that there is more than one, only get one
|
||||||
update_fields = []
|
instance.node_state = Instance.States.INSTALLED # Wait for it to show up on the mesh
|
||||||
|
update_fields = ['node_state']
|
||||||
# if instance was retrieved by uuid and hostname has changed, update hostname
|
# if instance was retrieved by uuid and hostname has changed, update hostname
|
||||||
if instance.hostname != hostname:
|
if instance.hostname != hostname:
|
||||||
logger.warning("passed in hostname {0} is different from the original hostname {1}, updating to {0}".format(hostname, instance.hostname))
|
logger.warning("passed in hostname {0} is different from the original hostname {1}, updating to {0}".format(hostname, instance.hostname))
|
||||||
@@ -141,6 +144,7 @@ class InstanceManager(models.Manager):
|
|||||||
# if any other fields are to be updated
|
# if any other fields are to be updated
|
||||||
if instance.ip_address != ip_address:
|
if instance.ip_address != ip_address:
|
||||||
instance.ip_address = ip_address
|
instance.ip_address = ip_address
|
||||||
|
update_fields.append('ip_address')
|
||||||
if instance.node_type != node_type:
|
if instance.node_type != node_type:
|
||||||
instance.node_type = node_type
|
instance.node_type = node_type
|
||||||
update_fields.append('node_type')
|
update_fields.append('node_type')
|
||||||
@@ -151,12 +155,12 @@ class InstanceManager(models.Manager):
|
|||||||
return (False, instance)
|
return (False, instance)
|
||||||
|
|
||||||
# Create new instance, and fill in default values
|
# Create new instance, and fill in default values
|
||||||
create_defaults = dict(capacity=0)
|
create_defaults = {'node_state': Instance.States.INSTALLED, 'capacity': 0}
|
||||||
if defaults is not None:
|
if defaults is not None:
|
||||||
create_defaults.update(defaults)
|
create_defaults.update(defaults)
|
||||||
uuid_option = {}
|
uuid_option = {}
|
||||||
if uuid is not None:
|
if uuid is not None:
|
||||||
uuid_option = dict(uuid=uuid)
|
uuid_option = {'uuid': uuid}
|
||||||
if node_type == 'execution' and 'version' not in create_defaults:
|
if node_type == 'execution' and 'version' not in create_defaults:
|
||||||
create_defaults['version'] = RECEPTOR_PENDING
|
create_defaults['version'] = RECEPTOR_PENDING
|
||||||
instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults, **uuid_option)
|
instance = self.create(hostname=hostname, ip_address=ip_address, node_type=node_type, **create_defaults, **uuid_option)
|
||||||
|
|||||||
79
awx/main/migrations/0165_node_and_link_state.py
Normal file
79
awx/main/migrations/0165_node_and_link_state.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
# Generated by Django 3.2.13 on 2022-08-02 17:53
|
||||||
|
|
||||||
|
import django.core.validators
|
||||||
|
from django.db import migrations, models
|
||||||
|
|
||||||
|
|
||||||
|
def forwards(apps, schema_editor):
|
||||||
|
# All existing InstanceLink objects need to be in the state
|
||||||
|
# 'Established', which is the default, so nothing needs to be done
|
||||||
|
# for that.
|
||||||
|
|
||||||
|
Instance = apps.get_model('main', 'Instance')
|
||||||
|
for instance in Instance.objects.all():
|
||||||
|
instance.node_state = 'ready' if not instance.errors else 'unavailable'
|
||||||
|
instance.save(update_fields=['node_state'])
|
||||||
|
|
||||||
|
|
||||||
|
class Migration(migrations.Migration):
|
||||||
|
|
||||||
|
dependencies = [
|
||||||
|
('main', '0164_remove_inventorysource_update_on_project_update'),
|
||||||
|
]
|
||||||
|
|
||||||
|
operations = [
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='instance',
|
||||||
|
name='listener_port',
|
||||||
|
field=models.PositiveIntegerField(
|
||||||
|
blank=True,
|
||||||
|
default=27199,
|
||||||
|
help_text='Port that Receptor will listen for incoming connections on.',
|
||||||
|
validators=[django.core.validators.MinValueValidator(1), django.core.validators.MaxValueValidator(65535)],
|
||||||
|
),
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='instance',
|
||||||
|
name='node_state',
|
||||||
|
field=models.CharField(
|
||||||
|
choices=[
|
||||||
|
('provisioning', 'Provisioning'),
|
||||||
|
('provision-fail', 'Provisioning Failure'),
|
||||||
|
('installed', 'Installed'),
|
||||||
|
('ready', 'Ready'),
|
||||||
|
('unavailable', 'Unavailable'),
|
||||||
|
('deprovisioning', 'De-provisioning'),
|
||||||
|
('deprovision-fail', 'De-provisioning Failure'),
|
||||||
|
],
|
||||||
|
default='ready',
|
||||||
|
help_text='Indicates the current life cycle stage of this instance.',
|
||||||
|
max_length=16,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
migrations.AddField(
|
||||||
|
model_name='instancelink',
|
||||||
|
name='link_state',
|
||||||
|
field=models.CharField(
|
||||||
|
choices=[('adding', 'Adding'), ('established', 'Established'), ('removing', 'Removing')],
|
||||||
|
default='established',
|
||||||
|
help_text='Indicates the current life cycle stage of this peer link.',
|
||||||
|
max_length=16,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
migrations.AlterField(
|
||||||
|
model_name='instance',
|
||||||
|
name='node_type',
|
||||||
|
field=models.CharField(
|
||||||
|
choices=[
|
||||||
|
('control', 'Control plane node'),
|
||||||
|
('execution', 'Execution plane node'),
|
||||||
|
('hybrid', 'Controller and execution'),
|
||||||
|
('hop', 'Message-passing node, no execution capability'),
|
||||||
|
],
|
||||||
|
default='hybrid',
|
||||||
|
help_text='Role that this node plays in the mesh.',
|
||||||
|
max_length=16,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
migrations.RunPython(forwards, reverse_code=migrations.RunPython.noop),
|
||||||
|
]
|
||||||
@@ -5,7 +5,7 @@ from decimal import Decimal
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from django.core.validators import MinValueValidator
|
from django.core.validators import MinValueValidator, MaxValueValidator
|
||||||
from django.db import models, connection
|
from django.db import models, connection
|
||||||
from django.db.models.signals import post_save, post_delete
|
from django.db.models.signals import post_save, post_delete
|
||||||
from django.dispatch import receiver
|
from django.dispatch import receiver
|
||||||
@@ -58,6 +58,15 @@ class InstanceLink(BaseModel):
|
|||||||
source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+')
|
source = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='+')
|
||||||
target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers')
|
target = models.ForeignKey('Instance', on_delete=models.CASCADE, related_name='reverse_peers')
|
||||||
|
|
||||||
|
class States(models.TextChoices):
|
||||||
|
ADDING = 'adding', _('Adding')
|
||||||
|
ESTABLISHED = 'established', _('Established')
|
||||||
|
REMOVING = 'removing', _('Removing')
|
||||||
|
|
||||||
|
link_state = models.CharField(
|
||||||
|
choices=States.choices, default=States.ESTABLISHED, max_length=16, help_text=_("Indicates the current life cycle stage of this peer link.")
|
||||||
|
)
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
unique_together = ('source', 'target')
|
unique_together = ('source', 'target')
|
||||||
|
|
||||||
@@ -126,13 +135,33 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
|||||||
default=0,
|
default=0,
|
||||||
editable=False,
|
editable=False,
|
||||||
)
|
)
|
||||||
NODE_TYPE_CHOICES = [
|
|
||||||
("control", "Control plane node"),
|
class Types(models.TextChoices):
|
||||||
("execution", "Execution plane node"),
|
CONTROL = 'control', _("Control plane node")
|
||||||
("hybrid", "Controller and execution"),
|
EXECUTION = 'execution', _("Execution plane node")
|
||||||
("hop", "Message-passing node, no execution capability"),
|
HYBRID = 'hybrid', _("Controller and execution")
|
||||||
]
|
HOP = 'hop', _("Message-passing node, no execution capability")
|
||||||
node_type = models.CharField(default='hybrid', choices=NODE_TYPE_CHOICES, max_length=16)
|
|
||||||
|
node_type = models.CharField(default=Types.HYBRID, choices=Types.choices, max_length=16, help_text=_("Role that this node plays in the mesh."))
|
||||||
|
|
||||||
|
class States(models.TextChoices):
|
||||||
|
PROVISIONING = 'provisioning', _('Provisioning')
|
||||||
|
PROVISION_FAIL = 'provision-fail', _('Provisioning Failure')
|
||||||
|
INSTALLED = 'installed', _('Installed')
|
||||||
|
READY = 'ready', _('Ready')
|
||||||
|
UNAVAILABLE = 'unavailable', _('Unavailable')
|
||||||
|
DEPROVISIONING = 'deprovisioning', _('De-provisioning')
|
||||||
|
DEPROVISION_FAIL = 'deprovision-fail', _('De-provisioning Failure')
|
||||||
|
|
||||||
|
node_state = models.CharField(
|
||||||
|
choices=States.choices, default=States.READY, max_length=16, help_text=_("Indicates the current life cycle stage of this instance.")
|
||||||
|
)
|
||||||
|
listener_port = models.PositiveIntegerField(
|
||||||
|
blank=True,
|
||||||
|
default=27199,
|
||||||
|
validators=[MinValueValidator(1), MaxValueValidator(65535)],
|
||||||
|
help_text=_("Port that Receptor will listen for incoming connections on."),
|
||||||
|
)
|
||||||
|
|
||||||
peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'))
|
peers = models.ManyToManyField('self', symmetrical=False, through=InstanceLink, through_fields=('source', 'target'))
|
||||||
|
|
||||||
@@ -209,15 +238,18 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
|||||||
return self.last_seen < ref_time - timedelta(seconds=grace_period)
|
return self.last_seen < ref_time - timedelta(seconds=grace_period)
|
||||||
|
|
||||||
def mark_offline(self, update_last_seen=False, perform_save=True, errors=''):
|
def mark_offline(self, update_last_seen=False, perform_save=True, errors=''):
|
||||||
if self.cpu_capacity == 0 and self.mem_capacity == 0 and self.capacity == 0 and self.errors == errors and (not update_last_seen):
|
if self.node_state not in (Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||||
return
|
return
|
||||||
|
if self.node_state == Instance.States.UNAVAILABLE and self.errors == errors and (not update_last_seen):
|
||||||
|
return
|
||||||
|
self.node_state = Instance.States.UNAVAILABLE
|
||||||
self.cpu_capacity = self.mem_capacity = self.capacity = 0
|
self.cpu_capacity = self.mem_capacity = self.capacity = 0
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
if update_last_seen:
|
if update_last_seen:
|
||||||
self.last_seen = now()
|
self.last_seen = now()
|
||||||
|
|
||||||
if perform_save:
|
if perform_save:
|
||||||
update_fields = ['capacity', 'cpu_capacity', 'mem_capacity', 'errors']
|
update_fields = ['node_state', 'capacity', 'cpu_capacity', 'mem_capacity', 'errors']
|
||||||
if update_last_seen:
|
if update_last_seen:
|
||||||
update_fields += ['last_seen']
|
update_fields += ['last_seen']
|
||||||
self.save(update_fields=update_fields)
|
self.save(update_fields=update_fields)
|
||||||
@@ -274,6 +306,9 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
|||||||
if not errors:
|
if not errors:
|
||||||
self.refresh_capacity_fields()
|
self.refresh_capacity_fields()
|
||||||
self.errors = ''
|
self.errors = ''
|
||||||
|
if self.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||||
|
self.node_state = Instance.States.READY
|
||||||
|
update_fields.append('node_state')
|
||||||
else:
|
else:
|
||||||
self.mark_offline(perform_save=False, errors=errors)
|
self.mark_offline(perform_save=False, errors=errors)
|
||||||
update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity'])
|
update_fields.extend(['cpu_capacity', 'mem_capacity', 'capacity'])
|
||||||
@@ -292,7 +327,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
|||||||
# playbook event data; we should consider this a zero capacity event
|
# playbook event data; we should consider this a zero capacity event
|
||||||
redis.Redis.from_url(settings.BROKER_URL).ping()
|
redis.Redis.from_url(settings.BROKER_URL).ping()
|
||||||
except redis.ConnectionError:
|
except redis.ConnectionError:
|
||||||
errors = _('Failed to connect ot Redis')
|
errors = _('Failed to connect to Redis')
|
||||||
|
|
||||||
self.save_health_data(awx_application_version, get_cpu_count(), get_mem_in_bytes(), update_last_seen=True, errors=errors)
|
self.save_health_data(awx_application_version, get_cpu_count(), get_mem_in_bytes(), update_last_seen=True, errors=errors)
|
||||||
|
|
||||||
|
|||||||
@@ -38,7 +38,9 @@ class TaskManagerInstances:
|
|||||||
self.instances_by_hostname = dict()
|
self.instances_by_hostname = dict()
|
||||||
if instances is None:
|
if instances is None:
|
||||||
instances = (
|
instances = (
|
||||||
Instance.objects.filter(hostname__isnull=False, enabled=True).exclude(node_type='hop').only('node_type', 'capacity', 'hostname', 'enabled')
|
Instance.objects.filter(hostname__isnull=False, node_state=Instance.States.READY, enabled=True)
|
||||||
|
.exclude(node_type='hop')
|
||||||
|
.only('node_type', 'node_state', 'capacity', 'hostname', 'enabled')
|
||||||
)
|
)
|
||||||
for instance in instances:
|
for instance in instances:
|
||||||
self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance)
|
self.instances_by_hostname[instance.hostname] = TaskManagerInstance(instance)
|
||||||
|
|||||||
@@ -114,7 +114,7 @@ def inform_cluster_of_shutdown():
|
|||||||
try:
|
try:
|
||||||
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
|
this_inst = Instance.objects.get(hostname=settings.CLUSTER_HOST_ID)
|
||||||
this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
|
this_inst.mark_offline(update_last_seen=True, errors=_('Instance received normal shutdown signal'))
|
||||||
logger.warning('Normal shutdown signal for instance {}, ' 'removed self from capacity pool.'.format(this_inst.hostname))
|
logger.warning('Normal shutdown signal for instance {}, removed self from capacity pool.'.format(this_inst.hostname))
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.exception('Encountered problem with normal shutdown signal.')
|
logger.exception('Encountered problem with normal shutdown signal.')
|
||||||
|
|
||||||
@@ -341,9 +341,13 @@ def _cleanup_images_and_files(**kwargs):
|
|||||||
logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}')
|
logger.info(f'Performed local cleanup with kwargs {kwargs}, output:\n{stdout}')
|
||||||
|
|
||||||
# if we are the first instance alphabetically, then run cleanup on execution nodes
|
# if we are the first instance alphabetically, then run cleanup on execution nodes
|
||||||
checker_instance = Instance.objects.filter(node_type__in=['hybrid', 'control'], enabled=True, capacity__gt=0).order_by('-hostname').first()
|
checker_instance = (
|
||||||
|
Instance.objects.filter(node_type__in=['hybrid', 'control'], node_state=Instance.States.READY, enabled=True, capacity__gt=0)
|
||||||
|
.order_by('-hostname')
|
||||||
|
.first()
|
||||||
|
)
|
||||||
if checker_instance and this_inst.hostname == checker_instance.hostname:
|
if checker_instance and this_inst.hostname == checker_instance.hostname:
|
||||||
for inst in Instance.objects.filter(node_type='execution', enabled=True, capacity__gt=0):
|
for inst in Instance.objects.filter(node_type='execution', node_state=Instance.States.READY, enabled=True, capacity__gt=0):
|
||||||
runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs)
|
runner_cleanup_kwargs = inst.get_cleanup_task_kwargs(**kwargs)
|
||||||
if not runner_cleanup_kwargs:
|
if not runner_cleanup_kwargs:
|
||||||
continue
|
continue
|
||||||
@@ -399,6 +403,9 @@ def execution_node_health_check(node):
|
|||||||
if instance.node_type != 'execution':
|
if instance.node_type != 'execution':
|
||||||
raise RuntimeError(f'Execution node health check ran against {instance.node_type} node {instance.hostname}')
|
raise RuntimeError(f'Execution node health check ran against {instance.node_type} node {instance.hostname}')
|
||||||
|
|
||||||
|
if instance.node_state not in (Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||||
|
raise RuntimeError(f"Execution node health check ran against node {instance.hostname} in state {instance.node_state}")
|
||||||
|
|
||||||
data = worker_info(node)
|
data = worker_info(node)
|
||||||
|
|
||||||
prior_capacity = instance.capacity
|
prior_capacity = instance.capacity
|
||||||
@@ -432,6 +439,7 @@ def inspect_execution_nodes(instance_list):
|
|||||||
|
|
||||||
nowtime = now()
|
nowtime = now()
|
||||||
workers = mesh_status['Advertisements']
|
workers = mesh_status['Advertisements']
|
||||||
|
|
||||||
for ad in workers:
|
for ad in workers:
|
||||||
hostname = ad['NodeID']
|
hostname = ad['NodeID']
|
||||||
|
|
||||||
@@ -445,9 +453,7 @@ def inspect_execution_nodes(instance_list):
|
|||||||
if instance.node_type in ('control', 'hybrid'):
|
if instance.node_type in ('control', 'hybrid'):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
was_lost = instance.is_lost(ref_time=nowtime)
|
|
||||||
last_seen = parse_date(ad['Time'])
|
last_seen = parse_date(ad['Time'])
|
||||||
|
|
||||||
if instance.last_seen and instance.last_seen >= last_seen:
|
if instance.last_seen and instance.last_seen >= last_seen:
|
||||||
continue
|
continue
|
||||||
instance.last_seen = last_seen
|
instance.last_seen = last_seen
|
||||||
@@ -455,12 +461,12 @@ def inspect_execution_nodes(instance_list):
|
|||||||
|
|
||||||
# Only execution nodes should be dealt with by execution_node_health_check
|
# Only execution nodes should be dealt with by execution_node_health_check
|
||||||
if instance.node_type == 'hop':
|
if instance.node_type == 'hop':
|
||||||
if was_lost and (not instance.is_lost(ref_time=nowtime)):
|
if instance.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||||
logger.warning(f'Hop node {hostname}, has rejoined the receptor mesh')
|
logger.warning(f'Hop node {hostname}, has rejoined the receptor mesh')
|
||||||
instance.save_health_data(errors='')
|
instance.save_health_data(errors='')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if was_lost:
|
if instance.node_state in (Instance.States.UNAVAILABLE, Instance.States.INSTALLED):
|
||||||
# if the instance *was* lost, but has appeared again,
|
# if the instance *was* lost, but has appeared again,
|
||||||
# attempt to re-establish the initial capacity and version
|
# attempt to re-establish the initial capacity and version
|
||||||
# check
|
# check
|
||||||
@@ -479,7 +485,7 @@ def inspect_execution_nodes(instance_list):
|
|||||||
def cluster_node_heartbeat():
|
def cluster_node_heartbeat():
|
||||||
logger.debug("Cluster node heartbeat task.")
|
logger.debug("Cluster node heartbeat task.")
|
||||||
nowtime = now()
|
nowtime = now()
|
||||||
instance_list = list(Instance.objects.all())
|
instance_list = list(Instance.objects.filter(node_state__in=(Instance.States.READY, Instance.States.UNAVAILABLE, Instance.States.INSTALLED)))
|
||||||
this_inst = None
|
this_inst = None
|
||||||
lost_instances = []
|
lost_instances = []
|
||||||
|
|
||||||
@@ -530,9 +536,9 @@ def cluster_node_heartbeat():
|
|||||||
try:
|
try:
|
||||||
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
if settings.AWX_AUTO_DEPROVISION_INSTANCES:
|
||||||
deprovision_hostname = other_inst.hostname
|
deprovision_hostname = other_inst.hostname
|
||||||
other_inst.delete()
|
other_inst.delete() # FIXME: what about associated inbound links?
|
||||||
logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
|
logger.info("Host {} Automatically Deprovisioned.".format(deprovision_hostname))
|
||||||
elif other_inst.capacity != 0 or (not other_inst.errors):
|
elif other_inst.node_state == Instance.States.READY:
|
||||||
other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))
|
other_inst.mark_offline(errors=_('Another cluster node has determined this instance to be unresponsive'))
|
||||||
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))
|
logger.error("Host {} last checked in at {}, marked as lost.".format(other_inst.hostname, other_inst.last_seen))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user