mirror of
https://github.com/ansible/awx.git
synced 2026-03-08 05:01:09 -02:30
Move InstanceLink creation and updating to the async tasks
So that they get applied in situations that do not go through the API.
This commit is contained in:
@@ -368,11 +368,6 @@ class InstanceList(ListCreateAPIView):
|
|||||||
search_fields = ('hostname',)
|
search_fields = ('hostname',)
|
||||||
ordering = ('id',)
|
ordering = ('id',)
|
||||||
|
|
||||||
def perform_create(self, serializer):
|
|
||||||
obj = serializer.save(node_state=models.Instance.States.INSTALLED)
|
|
||||||
for instance in models.Instance.objects.filter(node_type__in=[models.Instance.Types.CONTROL, models.Instance.Types.HYBRID]):
|
|
||||||
models.InstanceLink.objects.create(source=instance, target=obj, link_state=models.InstanceLink.States.ADDING)
|
|
||||||
|
|
||||||
|
|
||||||
class InstanceDetail(RetrieveUpdateAPIView):
|
class InstanceDetail(RetrieveUpdateAPIView):
|
||||||
|
|
||||||
@@ -384,9 +379,6 @@ class InstanceDetail(RetrieveUpdateAPIView):
|
|||||||
r = super(InstanceDetail, self).update(request, *args, **kwargs)
|
r = super(InstanceDetail, self).update(request, *args, **kwargs)
|
||||||
if status.is_success(r.status_code):
|
if status.is_success(r.status_code):
|
||||||
obj = self.get_object()
|
obj = self.get_object()
|
||||||
if obj.node_state == models.Instance.States.DEPROVISIONING:
|
|
||||||
models.InstanceLink.objects.filter(target=obj).update(link_state=models.InstanceLink.States.REMOVING)
|
|
||||||
models.InstanceLink.objects.filter(source=obj).update(link_state=models.InstanceLink.States.REMOVING)
|
|
||||||
obj.set_capacity_value()
|
obj.set_capacity_value()
|
||||||
obj.save(update_fields=['capacity'])
|
obj.save(update_fields=['capacity'])
|
||||||
r.data = serializers.InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj)
|
r.data = serializers.InstanceSerializer(obj, context=self.get_serializer_context()).to_representation(obj)
|
||||||
|
|||||||
@@ -427,11 +427,11 @@ def on_instance_group_saved(sender, instance, created=False, raw=False, **kwargs
|
|||||||
def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
|
def on_instance_saved(sender, instance, created=False, raw=False, **kwargs):
|
||||||
if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION,):
|
if settings.IS_K8S and instance.node_type in (Instance.Types.EXECUTION,):
|
||||||
if instance.node_state == Instance.States.DEPROVISIONING:
|
if instance.node_state == Instance.States.DEPROVISIONING:
|
||||||
from awx.main.tasks.receptor import wait_for_jobs # prevents circular import
|
from awx.main.tasks.receptor import remove_deprovisioned_node # prevents circular import
|
||||||
|
|
||||||
# wait for jobs on the node to complete, then delete the
|
# wait for jobs on the node to complete, then delete the
|
||||||
# node and kick off write_receptor_config
|
# node and kick off write_receptor_config
|
||||||
connection.on_commit(lambda: wait_for_jobs.apply_async(instance.hostname))
|
connection.on_commit(lambda: remove_deprovisioned_node.apply_async([instance.hostname]))
|
||||||
|
|
||||||
if instance.node_state == Instance.States.INSTALLED:
|
if instance.node_state == Instance.States.INSTALLED:
|
||||||
from awx.main.tasks.receptor import write_receptor_config # prevents circular import
|
from awx.main.tasks.receptor import write_receptor_config # prevents circular import
|
||||||
|
|||||||
@@ -646,9 +646,15 @@ def write_receptor_config():
|
|||||||
|
|
||||||
this_inst = Instance.objects.me()
|
this_inst = Instance.objects.me()
|
||||||
instances = Instance.objects.filter(node_type=Instance.Types.EXECUTION)
|
instances = Instance.objects.filter(node_type=Instance.Types.EXECUTION)
|
||||||
|
existing_peers = {link.target_id for link in InstanceLink.objects.filter(source=this_inst)}
|
||||||
|
new_links = []
|
||||||
for instance in instances:
|
for instance in instances:
|
||||||
peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
|
peer = {'tcp-peer': {'address': f'{instance.hostname}:{instance.listener_port}', 'tls': 'tlsclient'}}
|
||||||
receptor_config.append(peer)
|
receptor_config.append(peer)
|
||||||
|
if instance.id not in existing_peers:
|
||||||
|
new_links.append(InstanceLink(source=this_inst, target=instance, link_state=InstanceLink.States.ADDING))
|
||||||
|
|
||||||
|
InstanceLink.objects.bulk_create(new_links)
|
||||||
|
|
||||||
with open(__RECEPTOR_CONF, 'w') as file:
|
with open(__RECEPTOR_CONF, 'w') as file:
|
||||||
yaml.dump(receptor_config, file, default_flow_style=False)
|
yaml.dump(receptor_config, file, default_flow_style=False)
|
||||||
@@ -672,7 +678,10 @@ def write_receptor_config():
|
|||||||
|
|
||||||
|
|
||||||
@task(queue=get_local_queuename)
|
@task(queue=get_local_queuename)
|
||||||
def wait_for_jobs(hostname):
|
def remove_deprovisioned_node(hostname):
|
||||||
|
InstanceLink.objects.filter(source__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
|
||||||
|
InstanceLink.objects.filter(target__hostname=hostname).update(link_state=InstanceLink.States.REMOVING)
|
||||||
|
|
||||||
node_jobs = UnifiedJob.objects.filter(
|
node_jobs = UnifiedJob.objects.filter(
|
||||||
execution_node=hostname,
|
execution_node=hostname,
|
||||||
status__in=(
|
status__in=(
|
||||||
|
|||||||
Reference in New Issue
Block a user