diff --git a/awx/api/serializers.py b/awx/api/serializers.py index 12799e0d94..bcee4ba225 100644 --- a/awx/api/serializers.py +++ b/awx/api/serializers.py @@ -768,6 +768,7 @@ class UnifiedJobSerializer(BaseSerializer): 'result_traceback', 'event_processing_finished', 'launched_by', + 'work_unit_id', ) extra_kwargs = { diff --git a/awx/main/migrations/0148_unifiedjob_receptor_unit_id.py b/awx/main/migrations/0148_unifiedjob_receptor_unit_id.py new file mode 100644 index 0000000000..9938daa691 --- /dev/null +++ b/awx/main/migrations/0148_unifiedjob_receptor_unit_id.py @@ -0,0 +1,20 @@ +# Generated by Django 2.2.16 on 2021-06-11 04:50 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0147_validate_ee_image_field'), + ] + + operations = [ + migrations.AddField( + model_name='unifiedjob', + name='work_unit_id', + field=models.CharField( + blank=True, default=None, editable=False, help_text='The Receptor work unit ID associated with this job.', max_length=255, null=True + ), + ), + ] diff --git a/awx/main/models/unified_jobs.py b/awx/main/models/unified_jobs.py index 2b2f05a51a..fbbede899d 100644 --- a/awx/main/models/unified_jobs.py +++ b/awx/main/models/unified_jobs.py @@ -717,6 +717,9 @@ class UnifiedJob( editable=False, help_text=_("The version of Ansible Core installed in the execution environment."), ) + work_unit_id = models.CharField( + max_length=255, blank=True, default=None, editable=False, null=True, help_text=_("The Receptor work unit ID associated with this job.") + ) def get_absolute_url(self, request=None): RealClass = self.get_real_instance_class() diff --git a/awx/main/tasks.py b/awx/main/tasks.py index e2eab4c3c3..ba8a61e9dd 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -472,6 +472,33 @@ def cluster_node_heartbeat(): logger.exception('Error marking {} as lost'.format(other_inst.hostname)) +@task(queue=get_local_queuename) +def awx_receptor_workunit_reaper(): + """ + When an AWX job is launched via receptor, files such as status, stdin, and stdout are created + in a specific receptor directory. This directory on disk is a random 8 character string, e.g. qLL2JFNT + This is also called the work Unit ID in receptor, and is used in various receptor commands, + e.g. "work results qLL2JFNT" + After an AWX job executes, the receptor work unit directory is cleaned up by + issuing the work release command. In some cases the release process might fail, or + if AWX crashes during a job's execution, the work release command is never issued to begin with. + As such, this periodic task will obtain a list of all receptor work units, and find which ones + belong to AWX jobs that are in a completed state (status is canceled, error, or succeeded). + This task will call "work release" on each of these work units to clean up the files on disk. + """ + if not settings.RECEPTOR_RELEASE_WORK: + return + logger.debug("Checking for unreleased receptor work units") + receptor_ctl = get_receptor_ctl() + receptor_work_list = receptor_ctl.simple_command("work list") + + unit_ids = [id for id in receptor_work_list] + jobs_with_unreleased_receptor_units = UnifiedJob.objects.filter(work_unit_id__in=unit_ids).exclude(status__in=ACTIVE_STATES) + for job in jobs_with_unreleased_receptor_units: + logger.debug(f"{job.log_format} is not active, reaping receptor work unit {job.work_unit_id}") + receptor_ctl.simple_command(f"work release {job.work_unit_id}") + + @task(queue=get_local_queuename) def awx_k8s_reaper(): if not settings.RECEPTOR_RELEASE_WORK: @@ -729,6 +756,10 @@ def with_path_cleanup(f): return _wrapped +def get_receptor_ctl(): + return ReceptorControl('/var/run/receptor/receptor.sock') + + class BaseTask(object): model = None event_model = None @@ -1370,8 +1401,8 @@ class BaseTask(object): ) else: receptor_job = AWXReceptorJob(self, params) - self.unit_id = receptor_job.unit_id res = receptor_job.run() + self.unit_id = receptor_job.unit_id if not res: return @@ -2890,7 +2921,7 @@ class AWXReceptorJob: def run(self): # We establish a connection to the Receptor socket - receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock') + receptor_ctl = get_receptor_ctl() try: return self._run_internal(receptor_ctl) @@ -2912,6 +2943,7 @@ class AWXReceptorJob: # in the right side of our socketpair for reading. result = receptor_ctl.submit_work(worktype=self.work_type, payload=sockout.makefile('rb'), params=self.receptor_params) self.unit_id = result['unitid'] + self.task.update_model(self.task.instance.pk, work_unit_id=result['unitid']) sockin.close() sockout.close() @@ -3026,10 +3058,6 @@ class AWXReceptorJob: result = namedtuple('result', ['status', 'rc']) return result('canceled', 1) - if hasattr(self, 'unit_id') and 'RECEPTOR_UNIT_ID' not in self.task.instance.job_env: - self.task.instance.job_env['RECEPTOR_UNIT_ID'] = self.unit_id - self.task.update_model(self.task.instance.pk, job_env=self.task.instance.job_env) - time.sleep(1) @property diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index d1b38a2c1e..413899a894 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -427,6 +427,7 @@ CELERYBEAT_SCHEDULE = { 'gather_analytics': {'task': 'awx.main.tasks.gather_analytics', 'schedule': timedelta(minutes=5)}, 'task_manager': {'task': 'awx.main.scheduler.tasks.run_task_manager', 'schedule': timedelta(seconds=20), 'options': {'expires': 20}}, 'k8s_reaper': {'task': 'awx.main.tasks.awx_k8s_reaper', 'schedule': timedelta(seconds=60), 'options': {'expires': 50}}, + 'receptor_reaper': {'task': 'awx.main.tasks.awx_receptor_workunit_reaper', 'schedule': timedelta(seconds=60)}, 'send_subsystem_metrics': {'task': 'awx.main.analytics.analytics_tasks.send_subsystem_metrics', 'schedule': timedelta(seconds=20)}, 'cleanup_images': {'task': 'awx.main.tasks.cleanup_execution_environment_images', 'schedule': timedelta(hours=3)}, } diff --git a/docs/tasks.md b/docs/tasks.md index 7ff5847052..584ea243a1 100644 --- a/docs/tasks.md +++ b/docs/tasks.md @@ -156,6 +156,11 @@ One of the most important tasks in a clustered AWX installation is the periodic If a node in an AWX cluster discovers that one of its peers has not updated its heartbeat within a certain grace period, it is assumed to be offline, and its capacity is set to zero to avoid scheduling new tasks on that node. Additionally, jobs allegedly running or scheduled to run on that node are assumed to be lost, and "reaped", or marked as failed. +## Reaping Receptor Work Units +When an AWX job is launched via receptor, files such as status, stdin, and stdout are created in a specific receptor directory. This directory on disk is a random 8 character string, e.g. qLL2JFNT +This is also called the work Unit ID in receptor, and is used in various receptor commands, e.g. "work results qLL2JFNT" +After an AWX job executes, the receptor work unit directory is cleaned up by issuing the work release command. In some cases the release process might fail, or if AWX crashes during a job's execution, the work release command is never issued to begin with. +As such, there is a periodic task that will obtain a list of all receptor work units, and find which ones belong to AWX jobs that are in a completed state (status is canceled, error, or succeeded). This task will call "work release" on each of these work units to clean up the files on disk. ## AWX Jobs