mirror of
https://github.com/ansible/awx.git
synced 2026-05-17 06:17:36 -02:30
Do not change link state if Removing
inspect_established_receptor_connections should not change link state is current state is Removing. Other changes: - rename inspect_execution_nodes to inspect_execution_and_hop_nodes - Default link state is Adding - Set min listener_port value to 1024 - inspect_established_receptor_connections now runs as part of cluster_node_heartbeat task
This commit is contained in:
@@ -37,7 +37,7 @@ class Migration(migrations.Migration):
|
|||||||
name='link_state',
|
name='link_state',
|
||||||
field=models.CharField(
|
field=models.CharField(
|
||||||
choices=[('adding', 'Adding'), ('established', 'Established'), ('disconnected', 'Disconnected'), ('removing', 'Removing')],
|
choices=[('adding', 'Adding'), ('established', 'Established'), ('disconnected', 'Disconnected'), ('removing', 'Removing')],
|
||||||
default='disconnected',
|
default='adding',
|
||||||
help_text='Indicates the current life cycle stage of this peer link.',
|
help_text='Indicates the current life cycle stage of this peer link.',
|
||||||
max_length=16,
|
max_length=16,
|
||||||
),
|
),
|
||||||
@@ -54,7 +54,7 @@ class Migration(migrations.Migration):
|
|||||||
default=None,
|
default=None,
|
||||||
help_text='Port that Receptor will listen for incoming connections on.',
|
help_text='Port that Receptor will listen for incoming connections on.',
|
||||||
null=True,
|
null=True,
|
||||||
validators=[django.core.validators.MinValueValidator(1), django.core.validators.MaxValueValidator(65535)],
|
validators=[django.core.validators.MinValueValidator(1024), django.core.validators.MaxValueValidator(65535)],
|
||||||
),
|
),
|
||||||
),
|
),
|
||||||
migrations.RunPython(automatically_peer_from_control_plane),
|
migrations.RunPython(automatically_peer_from_control_plane),
|
||||||
|
|||||||
@@ -72,7 +72,7 @@ class InstanceLink(BaseModel):
|
|||||||
REMOVING = 'removing', _('Removing')
|
REMOVING = 'removing', _('Removing')
|
||||||
|
|
||||||
link_state = models.CharField(
|
link_state = models.CharField(
|
||||||
choices=States.choices, default=States.DISCONNECTED, max_length=16, help_text=_("Indicates the current life cycle stage of this peer link.")
|
choices=States.choices, default=States.ADDING, max_length=16, help_text=_("Indicates the current life cycle stage of this peer link.")
|
||||||
)
|
)
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
@@ -181,7 +181,7 @@ class Instance(HasPolicyEditsMixin, BaseModel):
|
|||||||
blank=True,
|
blank=True,
|
||||||
null=True,
|
null=True,
|
||||||
default=None,
|
default=None,
|
||||||
validators=[MinValueValidator(1), MaxValueValidator(65535)],
|
validators=[MinValueValidator(1024), MaxValueValidator(65535)],
|
||||||
help_text=_("Port that Receptor will listen for incoming connections on."),
|
help_text=_("Port that Receptor will listen for incoming connections on."),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -688,8 +688,7 @@ def should_update_config(instances):
|
|||||||
if key.endswith('-peer'):
|
if key.endswith('-peer'):
|
||||||
current_peers.append(value['address'])
|
current_peers.append(value['address'])
|
||||||
intended_peers = [f"{i.hostname}:{i.listener_port}" for i in instances]
|
intended_peers = [f"{i.hostname}:{i.listener_port}" for i in instances]
|
||||||
# TODO remove this logging line
|
logger.debug(f"Peers current {current_peers} intended {intended_peers}")
|
||||||
logger.warning(f"current {current_peers} intended {intended_peers}")
|
|
||||||
if set(current_peers) == set(intended_peers):
|
if set(current_peers) == set(intended_peers):
|
||||||
return False # config file is already update to date
|
return False # config file is already update to date
|
||||||
|
|
||||||
@@ -700,7 +699,7 @@ def generate_config_data():
|
|||||||
# returns two values
|
# returns two values
|
||||||
# receptor config - based on current database peers
|
# receptor config - based on current database peers
|
||||||
# should_update - If True, receptor_config differs from the receptor conf file on disk
|
# should_update - If True, receptor_config differs from the receptor conf file on disk
|
||||||
instances = Instance.objects.filter(node_type__in=(Instance.Types.EXECUTION, Instance.Types.HOP), peers_from_control_nodes=True)
|
instances = Instance.objects.me().peers.all()
|
||||||
|
|
||||||
receptor_config = list(RECEPTOR_CONFIG_STARTER)
|
receptor_config = list(RECEPTOR_CONFIG_STARTER)
|
||||||
for instance in instances:
|
for instance in instances:
|
||||||
|
|||||||
@@ -512,37 +512,38 @@ def execution_node_health_check(node):
|
|||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
@task(queue=get_task_queuename)
|
def inspect_established_receptor_connections(mesh_status):
|
||||||
def inspect_receptor_connections():
|
'''
|
||||||
ctl = get_receptor_ctl()
|
detect active/inactive receptor links
|
||||||
mesh_status = ctl.simple_command('status')
|
'''
|
||||||
|
|
||||||
# detect active/inactive receptor links
|
|
||||||
from awx.main.models import InstanceLink
|
from awx.main.models import InstanceLink
|
||||||
|
|
||||||
all_links = InstanceLink.objects.all()
|
all_links = InstanceLink.objects.all()
|
||||||
active_receptor_conns = mesh_status['KnownConnectionCosts']
|
active_receptor_conns = mesh_status['KnownConnectionCosts']
|
||||||
update_links = []
|
update_links = []
|
||||||
for link in all_links:
|
for link in all_links:
|
||||||
if link.target.hostname in active_receptor_conns.get(link.source.hostname, {}):
|
if link.link_state != InstanceLink.States.REMOVING:
|
||||||
if link.link_state is not InstanceLink.States.ESTABLISHED:
|
if link.target.hostname in active_receptor_conns.get(link.source.hostname, {}):
|
||||||
link.link_state = InstanceLink.States.ESTABLISHED
|
if link.link_state is not InstanceLink.States.ESTABLISHED:
|
||||||
update_links.append(link)
|
link.link_state = InstanceLink.States.ESTABLISHED
|
||||||
else:
|
update_links.append(link)
|
||||||
if link.link_state is not InstanceLink.States.DISCONNECTED:
|
else:
|
||||||
link.link_state = InstanceLink.States.DISCONNECTED
|
if link.link_state is not InstanceLink.States.DISCONNECTED:
|
||||||
update_links.append(link)
|
link.link_state = InstanceLink.States.DISCONNECTED
|
||||||
|
update_links.append(link)
|
||||||
|
|
||||||
InstanceLink.objects.bulk_update(update_links, ['link_state'])
|
InstanceLink.objects.bulk_update(update_links, ['link_state'])
|
||||||
|
|
||||||
|
|
||||||
def inspect_execution_nodes(instance_list):
|
def inspect_execution_and_hop_nodes(instance_list):
|
||||||
with advisory_lock('inspect_execution_nodes_lock', wait=False):
|
with advisory_lock('inspect_execution_and_hop_nodes_lock', wait=False):
|
||||||
node_lookup = {inst.hostname: inst for inst in instance_list}
|
node_lookup = {inst.hostname: inst for inst in instance_list}
|
||||||
|
|
||||||
ctl = get_receptor_ctl()
|
ctl = get_receptor_ctl()
|
||||||
mesh_status = ctl.simple_command('status')
|
mesh_status = ctl.simple_command('status')
|
||||||
|
|
||||||
|
inspect_established_receptor_connections(mesh_status)
|
||||||
|
|
||||||
nowtime = now()
|
nowtime = now()
|
||||||
workers = mesh_status['Advertisements']
|
workers = mesh_status['Advertisements']
|
||||||
|
|
||||||
@@ -600,7 +601,7 @@ def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
|
|||||||
this_inst = inst
|
this_inst = inst
|
||||||
break
|
break
|
||||||
|
|
||||||
inspect_execution_nodes(instance_list)
|
inspect_execution_and_hop_nodes(instance_list)
|
||||||
|
|
||||||
for inst in list(instance_list):
|
for inst in list(instance_list):
|
||||||
if inst == this_inst:
|
if inst == this_inst:
|
||||||
|
|||||||
@@ -37,7 +37,7 @@ def test_orphan_unified_job_creation(instance, inventory):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.mark.django_db
|
@pytest.mark.django_db
|
||||||
@mock.patch('awx.main.tasks.system.inspect_execution_nodes', lambda *args, **kwargs: None)
|
@mock.patch('awx.main.tasks.system.inspect_execution_and_hop_nodes', lambda *args, **kwargs: None)
|
||||||
@mock.patch('awx.main.models.ha.get_cpu_effective_capacity', lambda cpu: 8)
|
@mock.patch('awx.main.models.ha.get_cpu_effective_capacity', lambda cpu: 8)
|
||||||
@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem: 62)
|
@mock.patch('awx.main.models.ha.get_mem_effective_capacity', lambda mem: 62)
|
||||||
def test_job_capacity_and_with_inactive_node():
|
def test_job_capacity_and_with_inactive_node():
|
||||||
|
|||||||
@@ -463,7 +463,6 @@ CELERYBEAT_SCHEDULE = {
|
|||||||
'schedule': timedelta(seconds=CLUSTER_NODE_HEARTBEAT_PERIOD),
|
'schedule': timedelta(seconds=CLUSTER_NODE_HEARTBEAT_PERIOD),
|
||||||
'options': {'expires': 50},
|
'options': {'expires': 50},
|
||||||
},
|
},
|
||||||
'inspect_receptor_connections': {'task': 'awx.main.tasks.system.inspect_receptor_connections', 'schedule': timedelta(seconds=20)},
|
|
||||||
'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)},
|
'gather_analytics': {'task': 'awx.main.tasks.system.gather_analytics', 'schedule': timedelta(minutes=5)},
|
||||||
'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
|
'task_manager': {'task': 'awx.main.scheduler.tasks.task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
|
||||||
'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
|
'dependency_manager': {'task': 'awx.main.scheduler.tasks.dependency_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}},
|
||||||
|
|||||||
Reference in New Issue
Block a user