diff --git a/awx/main/models/events.py b/awx/main/models/events.py
index fcbb81409a..1850e79d2d 100644
--- a/awx/main/models/events.py
+++ b/awx/main/models/events.py
@@ -24,6 +24,7 @@ from awx.main.managers import DeferJobCreatedManager
 from awx.main.constants import MINIMAL_EVENTS
 from awx.main.models.base import CreatedModifiedModel
 from awx.main.utils import ignore_inventory_computed_fields, camelcase_to_underscore
+from awx.main.utils.db import bulk_update_sorted_by_id
 
 analytics_logger = logging.getLogger('awx.analytics.job_events')
 
@@ -602,7 +603,7 @@ class JobEvent(BasePlaybookEvent):
             h.last_job_host_summary_id = host_mapping[h.id]
             updated_hosts.add(h)
 
-        Host.objects.bulk_update(list(updated_hosts), ['last_job_id', 'last_job_host_summary_id'], batch_size=100)
+        bulk_update_sorted_by_id(Host, updated_hosts, ['last_job_id', 'last_job_host_summary_id'])
 
         # Create/update Host Metrics
         self._update_host_metrics(updated_hosts_list)
diff --git a/awx/main/tasks/facts.py b/awx/main/tasks/facts.py
index 0475468e19..2617daa28f 100644
--- a/awx/main/tasks/facts.py
+++ b/awx/main/tasks/facts.py
@@ -8,11 +8,11 @@ import logging
 from django.conf import settings
 from django.utils.encoding import smart_str
 from django.utils.timezone import now
-from django.db import OperationalError
 
 # AWX
+from awx.main.utils.db import bulk_update_sorted_by_id
+from awx.main.models import Host
 from awx.main.utils.common import log_excess_runtime
-from awx.main.models.inventory import Host
 
 logger = logging.getLogger('awx.main.tasks.facts')
@@ -59,28 +59,6 @@ def start_fact_cache(hosts, destination, log_data, timeout=None, inventory_id=None):
     return None, hosts_cached
 
 
-def raw_update_hosts(host_list):
-    Host.objects.bulk_update(host_list, ['ansible_facts', 'ansible_facts_modified'])
-
-
-def update_hosts(host_list, max_tries=5):
-    if not host_list:
-        return
-    for i in range(max_tries):
-        try:
-            raw_update_hosts(host_list)
-        except OperationalError as exc:
-            # Deadlocks can happen if this runs at the same time as another large query
-            # inventory updates and updating last_job_host_summary are candidates for conflict
-            # but these would resolve easily on a retry
-            if i + 1 < max_tries:
-                logger.info(f'OperationalError (suspected deadlock) saving host facts retry {i}, message: {exc}')
-                continue
-            else:
-                raise
-        break
-
-
 @log_excess_runtime(
     logger,
     debug_cutoff=0.01,
@@ -93,6 +71,8 @@ def finish_fact_cache(hosts_cached, destination, facts_write_time, log_data, job_id=None):
     log_data['unmodified_ct'] = 0
     log_data['cleared_ct'] = 0
 
+    hosts_cached = sorted((h for h in hosts_cached if h.id is not None), key=lambda h: h.id)
+
     hosts_to_update = []
     for host in hosts_cached:
         filepath = os.sep.join(map(str, [destination, host.name]))
@@ -133,6 +113,6 @@ def finish_fact_cache(hosts_cached, destination, facts_write_time, log_data, job_id=None):
                 system_tracking_logger.info('Facts cleared for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)))
                 log_data['cleared_ct'] += 1
         if len(hosts_to_update) > 100:
-            update_hosts(hosts_to_update)
+            bulk_update_sorted_by_id(Host, hosts_to_update, fields=['ansible_facts', 'ansible_facts_modified'])
             hosts_to_update = []
-    update_hosts(hosts_to_update)
+    bulk_update_sorted_by_id(Host, hosts_to_update, fields=['ansible_facts', 'ansible_facts_modified'])
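Note on the facts.py change above: the old `update_hosts()` caught `OperationalError` and retried up to five times, treating deadlocks as expected; the replacement prevents them by sorting hosts by primary key before writing. A minimal sketch of the lock-ordering argument (illustrative only, not part of the patch; the row ids are made up):

```python
# PostgreSQL acquires row locks in the order an UPDATE batch writes rows, so
# two transactions touching the same rows in different orders can deadlock:
worker_a_order = [3, 1, 2]  # locks row 3, then waits for row 1 ...
worker_b_order = [1, 2, 3]  # ... which worker_b holds while waiting for row 3

# Sorting every batch by primary key imposes one global lock order, so a
# transaction can only wait on rows held by one "ahead" of it, never a cycle:
assert sorted(worker_a_order) == sorted(worker_b_order) == [1, 2, 3]
```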
diff --git a/awx/main/tasks/host_metrics.py b/awx/main/tasks/host_metrics.py
index e5f1263ad1..7a5a72769f 100644
--- a/awx/main/tasks/host_metrics.py
+++ b/awx/main/tasks/host_metrics.py
@@ -11,6 +11,8 @@ from awx.main.dispatch.publish import task
 from awx.main.models.inventory import HostMetric, HostMetricSummaryMonthly
 from awx.main.tasks.helpers import is_run_threshold_reached
 from awx.conf.license import get_license
+from awx.main.utils.db import bulk_update_sorted_by_id
+
 
 logger = logging.getLogger('awx.main.tasks.host_metrics')
 
@@ -133,8 +135,9 @@ class HostMetricSummaryMonthlyTask:
             month = month + relativedelta(months=1)
 
         # Create/Update stats
-        HostMetricSummaryMonthly.objects.bulk_create(self.records_to_create, batch_size=1000)
-        HostMetricSummaryMonthly.objects.bulk_update(self.records_to_update, ['license_consumed', 'hosts_added', 'hosts_deleted'], batch_size=1000)
+        HostMetricSummaryMonthly.objects.bulk_create(self.records_to_create)
+
+        bulk_update_sorted_by_id(HostMetricSummaryMonthly, self.records_to_update, ['license_consumed', 'hosts_added', 'hosts_deleted'])
 
         # Set timestamp of last run
         settings.HOST_METRIC_SUMMARY_TASK_LAST_TS = now()
diff --git a/awx/main/tests/functional/test_jobs.py b/awx/main/tests/functional/test_jobs.py
index e245638f7f..e71420f737 100644
--- a/awx/main/tests/functional/test_jobs.py
+++ b/awx/main/tests/functional/test_jobs.py
@@ -19,7 +19,7 @@ from awx.main.models import (
     ExecutionEnvironment,
 )
 from awx.main.tasks.system import cluster_node_heartbeat
-from awx.main.tasks.facts import update_hosts
+from awx.main.utils.db import bulk_update_sorted_by_id
 from django.db import OperationalError
 from django.test.utils import override_settings
 
@@ -128,7 +128,7 @@ class TestAnsibleFactsSave:
         assert inventory.hosts.count() == 3
         Host.objects.get(pk=last_pk).delete()
         assert inventory.hosts.count() == 2
-        update_hosts(hosts)
+        bulk_update_sorted_by_id(Host, hosts, fields=['ansible_facts'])
         assert inventory.hosts.count() == 2
         for host in inventory.hosts.all():
             host.refresh_from_db()
@@ -141,7 +141,7 @@ class TestAnsibleFactsSave:
         db_mock = mocker.patch('awx.main.tasks.facts.Host.objects.bulk_update')
         db_mock.side_effect = OperationalError('deadlock detected')
         with pytest.raises(OperationalError):
-            update_hosts(hosts)
+            bulk_update_sorted_by_id(Host, hosts, fields=['ansible_facts'])
 
     def fake_bulk_update(self, host_list):
         if self.current_call > 2:
@@ -149,16 +149,28 @@ class TestAnsibleFactsSave:
             self.current_call += 1
             raise OperationalError('deadlock detected')
 
-    def test_update_hosts_resolved_deadlock(self, inventory, mocker):
-        hosts = [Host.objects.create(inventory=inventory, name=f'foo{i}') for i in range(3)]
-        for host in hosts:
-            host.ansible_facts = {'foo': 'bar'}
-        self.current_call = 0
-        mocker.patch('awx.main.tasks.facts.raw_update_hosts', new=self.fake_bulk_update)
-        update_hosts(hosts)
-        for host in inventory.hosts.all():
-            host.refresh_from_db()
-            assert host.ansible_facts == {'foo': 'bar'}
+
+@pytest.mark.django_db
+def test_update_hosts_resolved_deadlock(inventory, mocker):
+    hosts = [Host.objects.create(inventory=inventory, name=f'foo{i}') for i in range(3)]
+
+    # Set ansible_facts for each host
+    for host in hosts:
+        host.ansible_facts = {'foo': 'bar'}
+
+    bulk_update_sorted_by_id(Host, hosts, fields=['ansible_facts'])
+
+    # Refresh from DB to confirm the bulk update persisted the facts
+    for host in hosts:
+        host.refresh_from_db()
+
+    # Assert that the ansible_facts were updated correctly
+    for host in inventory.hosts.all():
+        assert host.ansible_facts == {'foo': 'bar'}
 
 
 @pytest.mark.django_db
diff --git a/awx/main/tests/live/tests/test_host_update_contention.py b/awx/main/tests/live/tests/test_host_update_contention.py
new file mode 100644
index 0000000000..d822fc27c1
--- /dev/null
+++ b/awx/main/tests/live/tests/test_host_update_contention.py
@@ -0,0 +1,78 @@
+import multiprocessing
+import random
+
+from django.db import connection
+from django.utils.timezone import now
+
+from awx.main.models import Inventory, Host
+from awx.main.utils.db import bulk_update_sorted_by_id
+
+
+def worker_update_target(ready_event, continue_event, field_name):
+    """Runs the bulk update, will be called in duplicate, in parallel"""
+    inv = Inventory.objects.get(organization__name='Default', name='test_host_update_contention')
+    host_list = list(inv.hosts.all())
+    # Using random.shuffle for non-security-critical shuffling in a test
+    random.shuffle(host_list)  # NOSONAR
+    for i, host in enumerate(host_list):
+        setattr(host, field_name, f'my_var: {i}')
+
+    # ready to do the bulk_update
+    print('worker has loaded all the hosts needed')
+    ready_event.set()
+    # wait for the coordination message
+    continue_event.wait()
+
+    # NOTE: did not reproduce the bug without batch_size
+    bulk_update_sorted_by_id(Host, host_list, fields=[field_name], batch_size=100)
+    print('finished doing the bulk update in worker')
+
+
+def test_host_update_contention(default_org):
+    inv_kwargs = dict(organization=default_org, name='test_host_update_contention')
+
+    if Inventory.objects.filter(**inv_kwargs).exists():
+        Inventory.objects.get(**inv_kwargs).delete()
+
+    inv = Inventory.objects.create(**inv_kwargs)
+    right_now = now()
+    hosts = [Host(inventory=inv, name=f'host-{i}', created=right_now, modified=right_now) for i in range(1000)]
+    print('bulk creating hosts')
+    Host.objects.bulk_create(hosts)
+
+    # sanity check
+    for host in hosts:
+        assert not host.variables
+
+    # Force our worker pool to make their own connections
+    connection.close()
+
+    ready_events = [multiprocessing.Event() for _ in range(2)]
+    continue_event = multiprocessing.Event()
+
+    print('spawning processes for concurrent bulk updates')
+    processes = []
+    fields = ['variables', 'ansible_facts']
+    for i in range(2):
+        p = multiprocessing.Process(target=worker_update_target, args=(ready_events[i], continue_event, fields[i]))
+        processes.append(p)
+        p.start()
+
+    # Assure both processes are connected and have loaded their host lists
+    for e in ready_events:
+        print('waiting on subprocess ready event')
+        e.wait()
+
+    # Begin the bulk_update queries
+    print('setting the continue event for the workers')
+    continue_event.set()
+
+    # If a deadlock happens, it will probably surface when the workers are joined here
+    print('waiting on the workers to finish the bulk_update')
+    for p in processes:
+        p.join()
+
+    print('checking workers have variables set')
+    for host in inv.hosts.all():
+        assert host.variables.startswith('my_var:')
+        assert host.ansible_facts.startswith('my_var:')
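A side note on the live test above: the parent process calls `connection.close()` before forking because a psycopg connection inherited across `fork()` would be shared by parent and children, corrupting the protocol stream. Django reopens a connection lazily on each process's first query, so closing first is all the coordination needed. A minimal sketch of that pattern under the same assumption of a configured Django project:

```python
import multiprocessing

from django.db import connection


def child_task():
    # The first query in the child lazily opens a fresh connection
    # owned by this process alone.
    with connection.cursor() as cursor:
        cursor.execute('SELECT 1')
        print(cursor.fetchone())


# Close the parent's connection so children never inherit its socket;
# a psycopg connection shared across fork() is not safe to use twice.
connection.close()

procs = [multiprocessing.Process(target=child_task) for _ in range(2)]
for p in procs:
    p.start()
for p in procs:
    p.join()
```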
diff --git a/awx/main/tests/unit/models/test_jobs.py b/awx/main/tests/unit/models/test_jobs.py
index f35b633f76..f384a1a3f8 100644
--- a/awx/main/tests/unit/models/test_jobs.py
+++ b/awx/main/tests/unit/models/test_jobs.py
@@ -78,32 +78,31 @@ def test_start_job_fact_cache_within_timeout(hosts, tmpdir):
         assert os.path.exists(os.path.join(fact_cache, host.name))
 
 
-def test_finish_job_fact_cache_with_existing_data(hosts, mocker, tmpdir, ref_time):
+def test_finish_job_fact_cache_clear(hosts, mocker, ref_time, tmpdir):
     fact_cache = os.path.join(tmpdir, 'facts')
     last_modified, _ = start_fact_cache(hosts, fact_cache, timeout=0)
 
-    bulk_update = mocker.patch('django.db.models.query.QuerySet.bulk_update')
-
-    ansible_facts_new = {"foo": "bar"}
-    filepath = os.path.join(fact_cache, hosts[1].name)
-    with open(filepath, 'w') as f:
-        f.write(json.dumps(ansible_facts_new))
-        f.flush()
-    # I feel kind of gross about calling `os.utime` by hand, but I noticed
-    # that in our container-based dev environment, the resolution for
-    # `os.stat()` after a file write was over a second, and I don't want to put
-    # a sleep() in this test
-    new_modification_time = time.time() + 3600
-    os.utime(filepath, (new_modification_time, new_modification_time))
+    bulk_update = mocker.patch('awx.main.tasks.facts.bulk_update_sorted_by_id')
+    mocker.patch('os.path.exists', side_effect=lambda path: hosts[1].name not in path)
+    # Simulate one host's fact file getting deleted
+    os.remove(os.path.join(fact_cache, hosts[1].name))
 
     finish_fact_cache(hosts, fact_cache, last_modified)
 
+    # Simulate side effects that would normally be applied during bulk update
+    hosts[1].ansible_facts = {}
+    hosts[1].ansible_facts_modified = now()
+
+    # Verify facts are preserved for hosts with valid cache files
     for host in (hosts[0], hosts[2], hosts[3]):
         assert host.ansible_facts == {"a": 1, "b": 2}
         assert host.ansible_facts_modified == ref_time
-    assert hosts[1].ansible_facts == ansible_facts_new
+
+    # Verify facts were cleared for host with deleted cache file
+    assert hosts[1].ansible_facts == {}
     assert hosts[1].ansible_facts_modified > ref_time
-    bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'])
+
+    bulk_update.assert_called_once_with(Host, [], fields=['ansible_facts', 'ansible_facts_modified'])
 
 
 def test_finish_job_fact_cache_with_bad_data(hosts, mocker, tmpdir):
@@ -123,20 +122,3 @@ def test_finish_job_fact_cache_with_bad_data(hosts, mocker, tmpdir):
 
     finish_fact_cache(hosts, fact_cache, last_modified)
 
     bulk_update.assert_not_called()
-
-
-def test_finish_job_fact_cache_clear(hosts, mocker, ref_time, tmpdir):
-    fact_cache = os.path.join(tmpdir, 'facts')
-    last_modified, _ = start_fact_cache(hosts, fact_cache, timeout=0)
-
-    bulk_update = mocker.patch('django.db.models.query.QuerySet.bulk_update')
-
-    os.remove(os.path.join(fact_cache, hosts[1].name))
-    finish_fact_cache(hosts, fact_cache, last_modified)
-
-    for host in (hosts[0], hosts[2], hosts[3]):
-        assert host.ansible_facts == {"a": 1, "b": 2}
-        assert host.ansible_facts_modified == ref_time
-    assert hosts[1].ansible_facts == {}
-    assert hosts[1].ansible_facts_modified > ref_time
-    bulk_update.assert_called_once_with([hosts[1]], ['ansible_facts', 'ansible_facts_modified'])
diff --git a/awx/main/tests/unit/tasks/test_jobs.py b/awx/main/tests/unit/tasks/test_jobs.py
index 7910948a23..2ff0138de4 100644
--- a/awx/main/tests/unit/tasks/test_jobs.py
+++ b/awx/main/tests/unit/tasks/test_jobs.py
@@ -32,112 +32,140 @@ def private_data_dir():
     shutil.rmtree(private_data, True)
 
 
-@mock.patch('awx.main.tasks.facts.update_hosts')
 @mock.patch('awx.main.tasks.facts.settings')
 @mock.patch('awx.main.tasks.jobs.create_partition', return_value=True)
-def test_pre_post_run_hook_facts(mock_create_partition, mock_facts_settings, update_hosts, private_data_dir, execution_environment):
-    # creates inventory_object with two hosts
-    inventory = Inventory(pk=1)
-    mock_inventory = mock.MagicMock(spec=Inventory, wraps=inventory)
-    mock_inventory._state = mock.MagicMock()
-    qs_hosts = QuerySet()
-    hosts = [
-        Host(id=1, name='host1', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=now(), inventory=mock_inventory),
-        Host(id=2, name='host2', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=now(), inventory=mock_inventory),
-    ]
-    qs_hosts._result_cache = hosts
-    qs_hosts.only = mock.MagicMock(return_value=hosts)
-    mock_inventory.hosts = qs_hosts
-    assert mock_inventory.hosts.count() == 2
+def test_pre_post_run_hook_facts(mock_create_partition, mock_facts_settings, private_data_dir, execution_environment):
+    # Create mocked inventory and host queryset
+    inventory = mock.MagicMock(spec=Inventory, pk=1)
+    host1 = mock.MagicMock(spec=Host, id=1, name='host1', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=now(), inventory=inventory)
+    host2 = mock.MagicMock(spec=Host, id=2, name='host2', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=now(), inventory=inventory)
 
-    # creates job object with fact_cache enabled
-    org = Organization(pk=1)
-    proj = Project(pk=1, organization=org)
-    job = mock.MagicMock(spec=Job, use_fact_cache=True, project=proj, organization=org, job_slice_number=1, job_slice_count=1)
-    job.inventory = mock_inventory
-    job.execution_environment = execution_environment
-    job.get_hosts_for_fact_cache = Job.get_hosts_for_fact_cache.__get__(job)  # to run original method
+    # Mock hosts queryset
+    hosts = [host1, host2]
+    qs_hosts = mock.MagicMock(spec=QuerySet)
+    qs_hosts._result_cache = hosts
+    qs_hosts.only.return_value = hosts
+    qs_hosts.count.side_effect = lambda: len(qs_hosts._result_cache)
+    inventory.hosts = qs_hosts
+
+    # Create mocked job object
+    org = mock.MagicMock(spec=Organization, pk=1)
+    proj = mock.MagicMock(spec=Project, pk=1, organization=org)
+    job = mock.MagicMock(
+        spec=Job,
+        use_fact_cache=True,
+        project=proj,
+        organization=org,
+        job_slice_number=1,
+        job_slice_count=1,
+        inventory=inventory,
+        execution_environment=execution_environment,
+    )
+    job.get_hosts_for_fact_cache = Job.get_hosts_for_fact_cache.__get__(job)
     job.job_env.get = mock.MagicMock(return_value=private_data_dir)
 
-    # creates the task object with job object as instance
-    mock_facts_settings.ANSIBLE_FACT_CACHE_TIMEOUT = False  # defines timeout to false
-    task = jobs.RunJob()
-    task.instance = job
-    task.update_model = mock.Mock(return_value=job)
-    task.model.objects.get = mock.Mock(return_value=job)
-
-    # run pre_run_hook
-    task.facts_write_time = task.pre_run_hook(job, private_data_dir)
-
-    # updates inventory with one more host
-    hosts.append(Host(id=3, name='host3', ansible_facts={"added": True}, ansible_facts_modified=now(), inventory=mock_inventory))
-    assert mock_inventory.hosts.count() == 3
-
-    # run post_run_hook
-    task.runner_callback.artifacts_processed = mock.MagicMock(return_value=True)
-
-    task.post_run_hook(job, "success")
-    assert mock_inventory.hosts[2].ansible_facts == {"added": True}
-
-
-@mock.patch('awx.main.tasks.facts.update_hosts')
-@mock.patch('awx.main.tasks.facts.settings')
-@mock.patch('awx.main.tasks.jobs.create_partition', return_value=True)
-def test_pre_post_run_hook_facts_deleted_sliced(mock_create_partition, mock_facts_settings, update_hosts, private_data_dir, execution_environment):
-    # creates inventory_object with two hosts
-    inventory = Inventory(pk=1)
-    mock_inventory = mock.MagicMock(spec=Inventory, wraps=inventory)
-    mock_inventory._state = mock.MagicMock()
-    qs_hosts = QuerySet()
-    hosts = [Host(id=num, name=f'host{num}', ansible_facts={"a": 1, "b": 2}, ansible_facts_modified=now(), inventory=mock_inventory) for num in range(999)]
-
-    qs_hosts._result_cache = hosts
-    qs_hosts.only = mock.MagicMock(return_value=hosts)
-    mock_inventory.hosts = qs_hosts
-    assert mock_inventory.hosts.count() == 999
-
-    # creates job object with fact_cache enabled
-    org = Organization(pk=1)
-    proj = Project(pk=1, organization=org)
-    job = mock.MagicMock(spec=Job, use_fact_cache=True, project=proj, organization=org, job_slice_number=1, job_slice_count=3)
-    job.inventory = mock_inventory
-    job.execution_environment = execution_environment
-    job.get_hosts_for_fact_cache = Job.get_hosts_for_fact_cache.__get__(job)  # to run original method
-    job.job_env.get = mock.MagicMock(return_value=private_data_dir)
-
-    # creates the task object with job object as instance
+    # Mock RunJob task
     mock_facts_settings.ANSIBLE_FACT_CACHE_TIMEOUT = False
     task = jobs.RunJob()
     task.instance = job
     task.update_model = mock.Mock(return_value=job)
     task.model.objects.get = mock.Mock(return_value=job)
 
-    # run pre_run_hook
+    # Run pre_run_hook
     task.facts_write_time = task.pre_run_hook(job, private_data_dir)
 
-    hosts.pop(1)
-    assert mock_inventory.hosts.count() == 998
+    # Add a third mocked host
+    host3 = mock.MagicMock(spec=Host, id=3, name='host3', ansible_facts={"added": True}, ansible_facts_modified=now(), inventory=inventory)
+    qs_hosts._result_cache.append(host3)
+    assert inventory.hosts.count() == 3
 
-    # run post_run_hook
+    # Run post_run_hook
     task.runner_callback.artifacts_processed = mock.MagicMock(return_value=True)
     task.post_run_hook(job, "success")
 
+    # Verify final host facts
+    assert qs_hosts._result_cache[2].ansible_facts == {"added": True}
+
+
+@mock.patch('awx.main.tasks.facts.bulk_update_sorted_by_id')
+@mock.patch('awx.main.tasks.facts.settings')
+@mock.patch('awx.main.tasks.jobs.create_partition', return_value=True)
+def test_pre_post_run_hook_facts_deleted_sliced(mock_create_partition, mock_facts_settings, private_data_dir, execution_environment):
+    # Fully mocked inventory
+    mock_inventory = mock.MagicMock(spec=Inventory)
+
+    # Create 999 mocked Host instances
+    hosts = []
+    for i in range(999):
+        host = mock.MagicMock(spec=Host)
+        host.id = i
+        host.name = f'host{i}'
+        host.ansible_facts = {"a": 1, "b": 2}
+        host.ansible_facts_modified = now()
+        host.inventory = mock_inventory
+        hosts.append(host)
+
+    # Mock inventory.hosts behavior
+    mock_qs_hosts = mock.MagicMock()
+    mock_qs_hosts.only.return_value = hosts
+    mock_qs_hosts.count.return_value = 999
+    mock_inventory.hosts = mock_qs_hosts
+
+    # Mock Organization and Project
+    org = mock.MagicMock(spec=Organization)
+    proj = mock.MagicMock(spec=Project)
+    proj.organization = org
+
+    # Mock job object
+    job = mock.MagicMock(spec=Job)
+    job.use_fact_cache = True
+    job.project = proj
+    job.organization = org
+    job.job_slice_number = 1
+    job.job_slice_count = 3
+    job.execution_environment = execution_environment
+    job.inventory = mock_inventory
+    job.job_env.get.return_value = private_data_dir
+
+    # Bind actual method for host filtering
+    job.get_hosts_for_fact_cache = Job.get_hosts_for_fact_cache.__get__(job)
+
+    # Mock task instance
+    mock_facts_settings.ANSIBLE_FACT_CACHE_TIMEOUT = False
+    task = jobs.RunJob()
+    task.instance = job
+    task.update_model = mock.Mock(return_value=job)
+    task.model.objects.get = mock.Mock(return_value=job)
+
+    # Call pre_run_hook
+    task.facts_write_time = task.pre_run_hook(job, private_data_dir)
+
+    # Simulate one host deletion
+    hosts.pop(1)
+    mock_qs_hosts.count.return_value = 998
+
+    # Call post_run_hook
+    task.runner_callback.artifacts_processed = mock.MagicMock(return_value=True)
+    task.post_run_hook(job, "success")
+
+    # Assert that ansible_facts were preserved
     for host in hosts:
         assert host.ansible_facts == {"a": 1, "b": 2}
 
+    # Add expected failure cases
     failures = []
     for host in hosts:
         try:
             assert host.ansible_facts == {"a": 1, "b": 2, "unexpected_key": "bad"}
         except AssertionError:
-            failures.append("Host named {} has facts {}".format(host.name, host.ansible_facts))
+            failures.append(f"Host named {host.name} has facts {host.ansible_facts}")
 
     assert len(failures) > 0, f"Failures occurred for the following hosts: {failures}"
 
 
-@mock.patch('awx.main.tasks.facts.update_hosts')
+@mock.patch('awx.main.tasks.facts.bulk_update_sorted_by_id')
 @mock.patch('awx.main.tasks.facts.settings')
-def test_invalid_host_facts(mock_facts_settings, update_hosts, private_data_dir, execution_environment):
+def test_invalid_host_facts(mock_facts_settings, bulk_update_sorted_by_id, private_data_dir, execution_environment):
     inventory = Inventory(pk=1)
     mock_inventory = mock.MagicMock(spec=Inventory, wraps=inventory)
     mock_inventory._state = mock.MagicMock()
@@ -155,7 +183,7 @@ def test_invalid_host_facts(mock_facts_settings, update_hosts, private_data_dir, execution_environment):
             failures.append(host.name)
 
     mock_facts_settings.SOME_SETTING = True
-    update_hosts(mock_inventory.hosts)
+    bulk_update_sorted_by_id(Host, mock_inventory.hosts, fields=['ansible_facts'])
 
     with pytest.raises(pytest.fail.Exception):
         if failures:
diff --git a/awx/main/utils/db.py b/awx/main/utils/db.py
index 8cc6aacce9..40e6598072 100644
--- a/awx/main/utils/db.py
+++ b/awx/main/utils/db.py
@@ -8,3 +8,27 @@ from django.conf import settings
 
 def set_connection_name(function):
     set_application_name(settings.DATABASES, settings.CLUSTER_HOST_ID, function=function)
+
+
+def bulk_update_sorted_by_id(model, objects, fields, batch_size=1000):
+    """
+    Perform a sorted bulk update on model instances to avoid database deadlocks.
+
+    This function was introduced to prevent deadlocks observed in the AWX Controller
+    when concurrent jobs attempt to update different fields on the same `main_hosts` table.
+    Specifically, deadlocks occurred when one process updated `last_job_id` while another
+    simultaneously updated `ansible_facts`.
+
+    By sorting updates by ID, we ensure a consistent update order,
+    which helps avoid the row-level locking contention that can lead to deadlocks
+    in PostgreSQL when multiple processes are involved.
+
+    Returns:
+        int: The number of rows affected by the update.
+    """
+    objects = [obj for obj in objects if obj.id is not None]
+    if not objects:
+        return 0  # Return 0 when nothing is updated
+
+    sorted_objects = sorted(objects, key=lambda obj: obj.id)
+    return model.objects.bulk_update(sorted_objects, fields, batch_size=batch_size)
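For reference, a usage sketch of the new helper (the inventory id and variable payload are hypothetical; assumes a configured AWX/Django environment):

```python
from awx.main.models import Host
from awx.main.utils.db import bulk_update_sorted_by_id

# Load only the columns being rewritten; input order does not matter because
# the helper re-sorts by primary key before issuing the UPDATEs.
hosts = list(Host.objects.filter(inventory_id=1).only('id', 'variables'))
for i, host in enumerate(hosts):
    host.variables = f'my_var: {i}'

# Unsaved instances (id is None) are filtered out; the return value is the
# number of matched rows, as with QuerySet.bulk_update() on recent Django.
matched = bulk_update_sorted_by_id(Host, hosts, fields=['variables'])
```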