fix scheduled jobs race condition

* The periodic scheduler that runs and spawns jobs from Schedule objects can
end up spawning more jobs than intended for a single Schedule.
Specifically, this happens when Tower clustering is involved. This change adds a
"global" database lock around this critical code. If another process is
already doing the scheduling, short-circuit and return without scheduling.
This commit is contained in:
chris meyers
2019-02-07 16:57:06 -05:00
parent 1328fb80a0
commit d4c3c089df

View File

@@ -435,53 +435,59 @@ def awx_isolated_heartbeat():
@task() @task()
def awx_periodic_scheduler(): def awx_periodic_scheduler():
run_now = now() with advisory_lock('awx_periodic_scheduler_lock', wait=False) as acquired:
state = TowerScheduleState.get_solo() if acquired is False:
last_run = state.schedule_last_run logger.debug("Not running periodic scheduler, another task holds lock")
logger.debug("Last scheduler run was: %s", last_run) return
state.schedule_last_run = run_now logger.debug("Starting periodic scheduler")
state.save()
old_schedules = Schedule.objects.enabled().before(last_run) run_now = now()
for schedule in old_schedules: state = TowerScheduleState.get_solo()
schedule.save() last_run = state.schedule_last_run
schedules = Schedule.objects.enabled().between(last_run, run_now) logger.debug("Last scheduler run was: %s", last_run)
state.schedule_last_run = run_now
state.save()
invalid_license = False old_schedules = Schedule.objects.enabled().before(last_run)
try: for schedule in old_schedules:
access_registry[Job](None).check_license() schedule.save()
except PermissionDenied as e: schedules = Schedule.objects.enabled().between(last_run, run_now)
invalid_license = e
for schedule in schedules: invalid_license = False
template = schedule.unified_job_template
schedule.save() # To update next_run timestamp.
if template.cache_timeout_blocked:
logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
continue
try: try:
job_kwargs = schedule.get_job_kwargs() access_registry[Job](None).check_license()
new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs) except PermissionDenied as e:
logger.info('Spawned {} from schedule {}-{}.'.format( invalid_license = e
new_unified_job.log_format, schedule.name, schedule.pk))
if invalid_license: for schedule in schedules:
template = schedule.unified_job_template
schedule.save() # To update next_run timestamp.
if template.cache_timeout_blocked:
logger.warn("Cache timeout is in the future, bypassing schedule for template %s" % str(template.id))
continue
try:
job_kwargs = schedule.get_job_kwargs()
new_unified_job = schedule.unified_job_template.create_unified_job(**job_kwargs)
logger.info(six.text_type('Spawned {} from schedule {}-{}.').format(
new_unified_job.log_format, schedule.name, schedule.pk))
if invalid_license:
new_unified_job.status = 'failed'
new_unified_job.job_explanation = str(invalid_license)
new_unified_job.save(update_fields=['status', 'job_explanation'])
new_unified_job.websocket_emit_status("failed")
raise invalid_license
can_start = new_unified_job.signal_start()
except Exception:
logger.exception('Error spawning scheduled job.')
continue
if not can_start:
new_unified_job.status = 'failed' new_unified_job.status = 'failed'
new_unified_job.job_explanation = str(invalid_license) new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
new_unified_job.save(update_fields=['status', 'job_explanation']) new_unified_job.save(update_fields=['status', 'job_explanation'])
new_unified_job.websocket_emit_status("failed") new_unified_job.websocket_emit_status("failed")
raise invalid_license emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
can_start = new_unified_job.signal_start() state.save()
except Exception:
logger.exception('Error spawning scheduled job.')
continue
if not can_start:
new_unified_job.status = 'failed'
new_unified_job.job_explanation = "Scheduled job could not start because it was not in the right state or required manual credentials"
new_unified_job.save(update_fields=['status', 'job_explanation'])
new_unified_job.websocket_emit_status("failed")
emit_channel_notification('schedules-changed', dict(id=schedule.id, group_name="schedules"))
state.save()
@task() @task()