mirror of
https://github.com/ansible/awx.git
synced 2026-02-04 19:18:13 -03:30
Compare commits
5 Commits
fix/redis-
...
devel_bug_
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
98dc60f9d6 | ||
|
|
29addb6ad0 | ||
|
|
c2f8acebb1 | ||
|
|
65d309f44a | ||
|
|
8bcc65fe80 |
@@ -602,7 +602,7 @@ class JobEvent(BasePlaybookEvent):
|
||||
h.last_job_host_summary_id = host_mapping[h.id]
|
||||
updated_hosts.add(h)
|
||||
|
||||
Host.objects.bulk_update(list(updated_hosts), ['last_job_id', 'last_job_host_summary_id'], batch_size=100)
|
||||
Host.objects.bulk_update(sorted(updated_hosts, key=lambda host: host.id), ['last_job_id', 'last_job_host_summary_id'], batch_size=100)
|
||||
|
||||
# Create/update Host Metrics
|
||||
self._update_host_metrics(updated_hosts_list)
|
||||
|
||||
@@ -62,7 +62,8 @@ def start_fact_cache(hosts, destination, log_data, timeout=None, inventory_id=No
|
||||
|
||||
|
||||
def raw_update_hosts(host_list):
|
||||
Host.objects.bulk_update(host_list, ['ansible_facts', 'ansible_facts_modified'])
|
||||
host_list = sorted(host_list, key=lambda host: host.id)
|
||||
Host.objects.bulk_update(host_list, ['ansible_facts', 'ansible_facts_modified'], batch_size=100)
|
||||
|
||||
|
||||
def update_hosts(host_list, max_tries=5):
|
||||
|
||||
79
awx/main/tests/live/tests/test_host_update_contention.py
Normal file
79
awx/main/tests/live/tests/test_host_update_contention.py
Normal file
@@ -0,0 +1,79 @@
|
||||
import multiprocessing
|
||||
import random
|
||||
|
||||
from django.db import connection
|
||||
from django.utils.timezone import now
|
||||
|
||||
from awx.main.models import Inventory, Host
|
||||
|
||||
|
||||
def worker_delete_target(ready_event, continue_event, field_name):
|
||||
"""Runs the bulk update, will be called in duplicate, in parallel"""
|
||||
inv = Inventory.objects.get(organization__name='Default', name='test_host_update_contention')
|
||||
host_list = list(inv.hosts.all())
|
||||
random.shuffle(host_list)
|
||||
for i, host in enumerate(host_list):
|
||||
setattr(host, field_name, f'my_var: {i}')
|
||||
|
||||
# ready to do the bulk_update
|
||||
print('worker has loaded all the hosts needed')
|
||||
ready_event.set()
|
||||
# wait for the coordination message
|
||||
continue_event.wait()
|
||||
|
||||
# # presumed fix
|
||||
# host_list = sorted(host_list, key=lambda host: host.id)
|
||||
|
||||
# NOTE: did not reproduce the bug without batch_size
|
||||
Host.objects.bulk_update(host_list, [field_name], batch_size=100)
|
||||
print('finished doing the bulk update in worker')
|
||||
|
||||
|
||||
def test_host_update_contention(default_org):
|
||||
inv_kwargs = dict(organization=default_org, name='test_host_update_contention')
|
||||
|
||||
if Inventory.objects.filter(**inv_kwargs).exists():
|
||||
inv = Inventory.objects.get(**inv_kwargs).delete()
|
||||
|
||||
inv = Inventory.objects.create(**inv_kwargs)
|
||||
right_now = now()
|
||||
hosts = [Host(inventory=inv, name=f'host-{i}', created=right_now, modified=right_now) for i in range(1000)]
|
||||
print('bulk creating hosts')
|
||||
Host.objects.bulk_create(hosts)
|
||||
|
||||
# sanity check
|
||||
for host in hosts:
|
||||
assert not host.variables
|
||||
|
||||
# Force our worker pool to make their own connection
|
||||
connection.close()
|
||||
|
||||
ready_events = [multiprocessing.Event() for _ in range(2)]
|
||||
continue_event = multiprocessing.Event()
|
||||
|
||||
print('spawning processes for concurrent bulk updates')
|
||||
processes = []
|
||||
fields = ['variables', 'ansible_facts']
|
||||
for i in range(2):
|
||||
p = multiprocessing.Process(target=worker_delete_target, args=(ready_events[i], continue_event, fields[i]))
|
||||
processes.append(p)
|
||||
p.start()
|
||||
|
||||
# Assure both processes are connected and have loaded their host list
|
||||
for e in ready_events:
|
||||
print('waiting on subprocess ready event')
|
||||
e.wait()
|
||||
|
||||
# Begin the bulk_update queries
|
||||
print('setting the continue event for the workers')
|
||||
continue_event.set()
|
||||
|
||||
# if a Deadloack happens it will probably be surfaced by result here
|
||||
print('waiting on the workers to finish the bulk_update')
|
||||
for p in processes:
|
||||
p.join()
|
||||
|
||||
print('checking workers have variables set')
|
||||
for host in inv.hosts.all():
|
||||
assert host.variables.startswith('my_var:')
|
||||
assert host.ansible_facts.startswith('my_var:')
|
||||
@@ -103,7 +103,7 @@ def test_finish_job_fact_cache_with_existing_data(hosts, mocker, tmpdir, ref_tim
|
||||
assert host.ansible_facts_modified == ref_time
|
||||
assert hosts[1].ansible_facts == ansible_facts_new
|
||||
assert hosts[1].ansible_facts_modified > ref_time
|
||||
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'])
|
||||
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'], batch_size=100)
|
||||
|
||||
|
||||
def test_finish_job_fact_cache_with_bad_data(hosts, mocker, tmpdir):
|
||||
@@ -139,4 +139,4 @@ def test_finish_job_fact_cache_clear(hosts, mocker, ref_time, tmpdir):
|
||||
assert host.ansible_facts_modified == ref_time
|
||||
assert hosts[1].ansible_facts == {}
|
||||
assert hosts[1].ansible_facts_modified > ref_time
|
||||
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'])
|
||||
bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'], batch_size=100)
|
||||
|
||||
Reference in New Issue
Block a user