Compare commits

...

5 Commits

Author SHA1 Message Date
Lila
98dc60f9d6 Update tests to expect batch_size to agree with changes 2025-04-18 16:08:18 -04:00
Lila
29addb6ad0 Remove del load_credentials to resolve CI issue 2025-04-18 15:22:35 -04:00
Lila
c2f8acebb1 Sort both bulk updates and add batch size to facts bulk update to resolve deadlock issue 2025-04-18 15:19:16 -04:00
Alan Rominger
65d309f44a Comment out actual fix 2025-04-18 15:16:41 -04:00
Alan Rominger
8bcc65fe80 Demo of sorting hosts 2025-04-18 15:16:08 -04:00
4 changed files with 84 additions and 4 deletions

View File

@@ -602,7 +602,7 @@ class JobEvent(BasePlaybookEvent):
h.last_job_host_summary_id = host_mapping[h.id]
updated_hosts.add(h)
Host.objects.bulk_update(list(updated_hosts), ['last_job_id', 'last_job_host_summary_id'], batch_size=100)
Host.objects.bulk_update(sorted(updated_hosts, key=lambda host: host.id), ['last_job_id', 'last_job_host_summary_id'], batch_size=100)
# Create/update Host Metrics
self._update_host_metrics(updated_hosts_list)

View File

@@ -62,7 +62,8 @@ def start_fact_cache(hosts, destination, log_data, timeout=None, inventory_id=No
def raw_update_hosts(host_list):
Host.objects.bulk_update(host_list, ['ansible_facts', 'ansible_facts_modified'])
host_list = sorted(host_list, key=lambda host: host.id)
Host.objects.bulk_update(host_list, ['ansible_facts', 'ansible_facts_modified'], batch_size=100)
def update_hosts(host_list, max_tries=5):

View File

@@ -0,0 +1,79 @@
import multiprocessing
import random
from django.db import connection
from django.utils.timezone import now
from awx.main.models import Inventory, Host
def worker_delete_target(ready_event, continue_event, field_name):
"""Runs the bulk update, will be called in duplicate, in parallel"""
inv = Inventory.objects.get(organization__name='Default', name='test_host_update_contention')
host_list = list(inv.hosts.all())
random.shuffle(host_list)
for i, host in enumerate(host_list):
setattr(host, field_name, f'my_var: {i}')
# ready to do the bulk_update
print('worker has loaded all the hosts needed')
ready_event.set()
# wait for the coordination message
continue_event.wait()
# # presumed fix
# host_list = sorted(host_list, key=lambda host: host.id)
# NOTE: did not reproduce the bug without batch_size
Host.objects.bulk_update(host_list, [field_name], batch_size=100)
print('finished doing the bulk update in worker')
def test_host_update_contention(default_org):
inv_kwargs = dict(organization=default_org, name='test_host_update_contention')
if Inventory.objects.filter(**inv_kwargs).exists():
inv = Inventory.objects.get(**inv_kwargs).delete()
inv = Inventory.objects.create(**inv_kwargs)
right_now = now()
hosts = [Host(inventory=inv, name=f'host-{i}', created=right_now, modified=right_now) for i in range(1000)]
print('bulk creating hosts')
Host.objects.bulk_create(hosts)
# sanity check
for host in hosts:
assert not host.variables
# Force our worker pool to make their own connection
connection.close()
ready_events = [multiprocessing.Event() for _ in range(2)]
continue_event = multiprocessing.Event()
print('spawning processes for concurrent bulk updates')
processes = []
fields = ['variables', 'ansible_facts']
for i in range(2):
p = multiprocessing.Process(target=worker_delete_target, args=(ready_events[i], continue_event, fields[i]))
processes.append(p)
p.start()
# Assure both processes are connected and have loaded their host list
for e in ready_events:
print('waiting on subprocess ready event')
e.wait()
# Begin the bulk_update queries
print('setting the continue event for the workers')
continue_event.set()
# if a Deadloack happens it will probably be surfaced by result here
print('waiting on the workers to finish the bulk_update')
for p in processes:
p.join()
print('checking workers have variables set')
for host in inv.hosts.all():
assert host.variables.startswith('my_var:')
assert host.ansible_facts.startswith('my_var:')

View File

@@ -103,7 +103,7 @@ def test_finish_job_fact_cache_with_existing_data(hosts, mocker, tmpdir, ref_tim
assert host.ansible_facts_modified == ref_time
assert hosts[1].ansible_facts == ansible_facts_new
assert hosts[1].ansible_facts_modified > ref_time
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'])
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'], batch_size=100)
def test_finish_job_fact_cache_with_bad_data(hosts, mocker, tmpdir):
@@ -139,4 +139,4 @@ def test_finish_job_fact_cache_clear(hosts, mocker, ref_time, tmpdir):
assert host.ansible_facts_modified == ref_time
assert hosts[1].ansible_facts == {}
assert hosts[1].ansible_facts_modified > ref_time
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'])
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'], batch_size=100)