From bec301c2a8c213c273f7b2676a0061b4a5b8b62d Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Sat, 22 Mar 2014 11:18:25 -0400 Subject: [PATCH] Shift task start signal into an asynchronous task so we don't deadlock trying to update the same record from the task runner while waiting for the signal to be received from the signaler --- awx/main/management/commands/run_task_system.py | 9 +++++---- awx/main/models/inventory.py | 7 ++----- awx/main/models/jobs.py | 7 ++----- awx/main/models/projects.py | 7 ++----- awx/main/tasks.py | 14 +++++++++----- 5 files changed, 20 insertions(+), 24 deletions(-) diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index 422c0134da..fc69ef0323 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -142,6 +142,10 @@ def get_tasks(): def rebuild_graph(message): ''' Regenerate the task graph by refreshing known tasks from Tower, purging orphaned running tasks, and creatingdependencies for new tasks before generating directed edge relationships between those tasks ''' + all_sorted_tasks = get_tasks() + if not len(all_sorted_tasks): + return None + inspector = inspect() if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): active_task_queues = inspector.active() @@ -159,9 +163,6 @@ def rebuild_graph(message): # TODO: Something needs to be done here to signal to the system as a whole that celery appears to be down if not hasattr(settings, 'CELERY_UNIT_TEST'): return None - all_sorted_tasks = get_tasks() - if not len(all_sorted_tasks): - return None running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) waiting_tasks = filter(lambda t: t.status != 'running', all_sorted_tasks) new_tasks = filter(lambda t: t.status == 'new', all_sorted_tasks) @@ -264,7 +265,7 @@ def run_taskmanager(command_port): command_socket.send("1") except zmq.ZMQError,e: message = None - if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 60: + if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 10: if message is not None and 'pause' in message: print("Pause command received: %s" % str(message)) paused = message['pause'] diff --git a/awx/main/models/inventory.py b/awx/main/models/inventory.py index 7ffdeba14c..5e053cc424 100644 --- a/awx/main/models/inventory.py +++ b/awx/main/models/inventory.py @@ -768,6 +768,7 @@ class InventoryUpdate(CommonTask): return 50 def signal_start(self, **kwargs): + from awx.main.tasks import notify_task_runner if not self.can_start: return False needed = self._get_passwords_needed_to_start() @@ -780,9 +781,5 @@ class InventoryUpdate(CommonTask): self.save() self.start_args = encrypt_field(self, 'start_args') self.save() - signal_context = zmq.Context() - signal_socket = signal_context.socket(zmq.REQ) - signal_socket.connect(settings.TASK_COMMAND_PORT) - signal_socket.send_json(dict(task_type="inventory_update", id=self.id, metadata=kwargs)) - signal_socket.recv() + notify_task_runner.delay(dict(task_type="inventory_update", id=self.id, metadata=kwargs)) return True diff --git a/awx/main/models/jobs.py b/awx/main/models/jobs.py index a5fba271ae..d7118cf3f4 100644 --- a/awx/main/models/jobs.py +++ b/awx/main/models/jobs.py @@ -385,6 +385,7 @@ class Job(CommonTask): return dependencies def signal_start(self, **kwargs): + from awx.main.tasks import notify_task_runner if hasattr(settings, 'CELERY_UNIT_TEST'): return self.start(None, **kwargs) if not self.can_start: @@ -399,11 +400,7 @@ class Job(CommonTask): self.save() self.start_args = encrypt_field(self, 'start_args') self.save() - signal_context = zmq.Context() - signal_socket = signal_context.socket(zmq.REQ) - signal_socket.connect(settings.TASK_COMMAND_PORT) - signal_socket.send_json(dict(task_type="ansible_playbook", id=self.id)) - signal_socket.recv() + notify_task_runner.delay(dict(task_type="ansible_playbook", id=self.id)) return True def start(self, error_callback, **kwargs): diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index c8586bd02a..93c45f6cd5 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -380,6 +380,7 @@ class ProjectUpdate(CommonTask): return 20 def signal_start(self, **kwargs): + from awx.main.tasks import notify_task_runner if not self.can_start: return False needed = self._get_passwords_needed_to_start() @@ -392,11 +393,7 @@ class ProjectUpdate(CommonTask): self.save() self.start_args = encrypt_field(self, 'start_args') self.save() - signal_context = zmq.Context() - signal_socket = signal_context.socket(zmq.REQ) - signal_socket.connect(settings.TASK_COMMAND_PORT) - signal_socket.send_json(dict(task_type="project_update", id=self.id, metadata=kwargs)) - signal_socket.recv() + notify_task_runner.delay(dict(task_type="project_update", id=self.id, metadata=kwargs)) return True def _update_parent_instance(self): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 8da8a30309..c05f4b8031 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -51,6 +51,14 @@ logger = logging.getLogger('awx.main.tasks') # FIXME: Cleanly cancel task when celery worker is stopped. +@task() +def notify_task_runner(metadata_dict): + time.sleep(1) + signal_context = zmq.Context() + signal_socket = signal_context.socket(zmq.PUSH) + signal_socket.connect(settings.TASK_COMMAND_PORT) + signal_socket.send_json(metadata_dict) + @task(bind=True) def handle_work_error(self, task_id, subtasks=None): print('Executing error task id %s, subtasks: %s' % (str(self.request.id), str(subtasks))) @@ -124,11 +132,7 @@ class BaseTask(Task): self.model._meta.object_name, retry_count) def signal_finished(self, pk): - signal_context = zmq.Context() - signal_socket = signal_context.socket(zmq.REQ) - signal_socket.connect(settings.TASK_COMMAND_PORT) - signal_socket.send_json(dict(complete=pk)) - signal_socket.recv() + notify_task_runner(dict(complete=pk)) def get_model(self, pk): return self.model.objects.get(pk=pk)