Mirror of https://github.com/ansible/awx.git
fix scheduled jobs race condition
* The periodic scheduler that runs and spawns jobs from Schedules can end up spawning more jobs than intended for a single Schedule, specifically when Tower clustering is involved. This change adds a "global" database lock around this critical code. If another process is already doing the scheduling, short circuit.
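The guard is a Postgres advisory lock taken as a context manager; with wait=False the second contender does not block on the lock, it simply sees acquired == False and returns. A minimal sketch of the same pattern, assuming the django-pglocks package, which provides an advisory_lock context manager with this signature (the exact import AWX uses is not shown in this hunk):

# Minimal sketch, not AWX code. Assumes `pip install django-pglocks`
# and a configured Django/PostgreSQL connection.
from django_pglocks import advisory_lock


def run_once_across_cluster():
    # wait=False makes acquisition non-blocking: rather than queueing up
    # behind the current holder and re-running the work when it finishes,
    # this invocation just skips the cycle.
    with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
        if not acquired:
            return  # another node is already scheduling; short circuit
        # ... critical section: query due Schedules and spawn jobs ...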
@@ -435,53 +435,59 @@ def awx_isolated_heartbeat():
 @task()
 def awx_periodic_scheduler():
-    run_now = now()
-    state = TowerScheduleState.get_solo()
-    last_run = state.schedule_last_run
-    logger.debug("Last scheduler run was: %s", last_run)
-    state.schedule_last_run = run_now
-    state.save()
-
-    old_schedules = Schedule.objects.enabled().before(last_run)
-    for schedule in old_schedules:
-        schedule.save()
-    schedules = Schedule.objects.enabled().between(last_run, run_now)
-
-    invalid_license = False
-    try:
-        access_registry[Job](None).check_license()
-    except PermissionDenied as e:
-        invalid_license = e
-
-    for schedule in schedules:
-        template = schedule.unified_job_template
-        schedule.save() # To update next_run timestamp.
-        if template.cache_timeout_blocked:
-            logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
-            continue
-        try:
-            job_kwargs = schedule.get_job_kwargs()
-            new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
-            logger.info('Spawned {} from schedule {}-{}.'.format(
-                new_unified_job.log_format, schedule.name, schedule.pk))
-
-            if invalid_license:
-                new_unified_job.status = 'failed'
-                new_unified_job.job_explanation = str(invalid_license)
-                new_unified_job.save(update_fields=['status', 'job_explanation'])
-                new_unified_job.websocket_emit_status("failed")
-                raise invalid_license
-            can_start = new_unified_job.signal_start()
-        except Exception:
-            logger.exception('Error spawning scheduled job.')
-            continue
-        if not can_start:
-            new_unified_job.status = 'failed'
-            new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
-            new_unified_job.save(update_fields=['status', 'job_explanation'])
-            new_unified_job.websocket_emit_status("failed")
-        emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
-    state.save()
+    with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
+        if acquired is False:
+            logger.debug("Not running periodic scheduler, another task holds lock")
+            return
+        logger.debug("Starting periodic scheduler")
+
+        run_now = now()
+        state = TowerScheduleState.get_solo()
+        last_run = state.schedule_last_run
+        logger.debug("Last scheduler run was: %s", last_run)
+        state.schedule_last_run = run_now
+        state.save()
+
+        old_schedules = Schedule.objects.enabled().before(last_run)
+        for schedule in old_schedules:
+            schedule.save()
+        schedules = Schedule.objects.enabled().between(last_run, run_now)
+
+        invalid_license = False
+        try:
+            access_registry[Job](None).check_license()
+        except PermissionDenied as e:
+            invalid_license = e
+
+        for schedule in schedules:
+            template = schedule.unified_job_template
+            schedule.save() # To update next_run timestamp.
+            if template.cache_timeout_blocked:
+                logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
+                continue
+            try:
+                job_kwargs = schedule.get_job_kwargs()
+                new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
+                logger.info(six.text_type('Spawned {} from schedule {}-{}.').format(
+                    new_unified_job.log_format, schedule.name, schedule.pk))
+
+                if invalid_license:
+                    new_unified_job.status = 'failed'
+                    new_unified_job.job_explanation = str(invalid_license)
+                    new_unified_job.save(update_fields=['status', 'job_explanation'])
+                    new_unified_job.websocket_emit_status("failed")
+                    raise invalid_license
+                can_start = new_unified_job.signal_start()
+            except Exception:
+                logger.exception('Error spawning scheduled job.')
+                continue
+            if not can_start:
+                new_unified_job.status = 'failed'
+                new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
+                new_unified_job.save(update_fields=['status', 'job_explanation'])
+                new_unified_job.websocket_emit_status("failed")
+            emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
+        state.save()
 
 
 @task()