From fcc7d3d7b199839bbacf6af65713761b9d5bedf6 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 3 Dec 2014 11:26:51 -0500 Subject: [PATCH] Rework the callback emitter a little more, purge tower internals from the module and fallback to the older codebase's mechanism. Make sure we are passing the callback port as a string otherwise it seems to corrupt the shell --- awx/main/tasks.py | 2 +- awx/plugins/callback/job_event_callback.py | 38 ++++++++++++++++++---- 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 42e1087c6b..43da92bc25 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -585,7 +585,7 @@ class RunJob(BaseTask): env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' - #env['CALLBACK_CONSUMER_PORT'] = settings.CALLBACK_CONSUMER_PORT + env['CALLBACK_CONSUMER_PORT'] = str(settings.CALLBACK_CONSUMER_PORT) if getattr(settings, 'JOB_CALLBACK_DEBUG', False): env['JOB_CALLBACK_DEBUG'] = '2' elif settings.DEBUG: diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 5d985f9bd0..55e4d37dc1 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -43,8 +43,7 @@ from contextlib import closing # Requests import requests -# Tower -from awx.main.socket import Socket +import zmq class TokenAuth(requests.auth.AuthBase): @@ -80,6 +79,9 @@ class CallbackModule(object): self.job_id = int(os.getenv('JOB_ID')) self.base_url = os.getenv('REST_API_URL', '') self.auth_token = os.getenv('REST_API_TOKEN', '') + self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', 5556) + self.context = None + self.socket = None self._init_logging() self._init_connection() self.counter = 0 @@ -106,6 +108,11 @@ class CallbackModule(object): self.context = None self.socket = None + def _start_connection(self): + self.context = zmq.Context() + self.socket = self.context.socket(zmq.REQ) + self.socket.connect("tcp://127.0.0.1:%s" % str(self.callback_consumer_port)) + def _post_job_event_queue_msg(self, event, event_data): self.counter += 1 msg = { @@ -116,10 +123,29 @@ class CallbackModule(object): 'created': datetime.datetime.utcnow().isoformat(), } - # Publish the callback. - with Socket('callbacks', 'w', debug=self.job_callback_debug, - logger=self.logger) as callbacks: - callbacks.publish(msg) + active_pid = os.getpid() + if self.job_callback_debug: + msg.update({ + 'pid': active_pid, + }) + for retry_count in xrange(4): + try: + if not hasattr(self, 'connection_pid'): + self.connection_pid = active_pid + if self.connection_pid != active_pid: + self._init_connection() + if self.context is None: + self._start_connection() + + self.socket.send_json(msg) + self.socket.recv() + return + except Exception, e: + self.logger.info('Publish Exception: %r, retry=%d', e, + retry_count, exc_info=True) + # TODO: Maybe recycle connection here? + if retry_count >= 3: + raise def _post_rest_api_event(self, event, event_data): data = json.dumps({