mirror of
https://github.com/ansible/awx.git
synced 2026-02-21 05:00:07 -03:30
Rework the callback emitter a little more, purge tower internals from
the module and fallback to the older codebase's mechanism. Make sure we are passing the callback port as a string otherwise it seems to corrupt the shell
This commit is contained in:
@@ -585,7 +585,7 @@ class RunJob(BaseTask):
|
|||||||
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir
|
env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir
|
||||||
env['REST_API_URL'] = settings.INTERNAL_API_URL
|
env['REST_API_URL'] = settings.INTERNAL_API_URL
|
||||||
env['REST_API_TOKEN'] = job.task_auth_token or ''
|
env['REST_API_TOKEN'] = job.task_auth_token or ''
|
||||||
#env['CALLBACK_CONSUMER_PORT'] = settings.CALLBACK_CONSUMER_PORT
|
env['CALLBACK_CONSUMER_PORT'] = str(settings.CALLBACK_CONSUMER_PORT)
|
||||||
if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
|
if getattr(settings, 'JOB_CALLBACK_DEBUG', False):
|
||||||
env['JOB_CALLBACK_DEBUG'] = '2'
|
env['JOB_CALLBACK_DEBUG'] = '2'
|
||||||
elif settings.DEBUG:
|
elif settings.DEBUG:
|
||||||
|
|||||||
@@ -43,8 +43,7 @@ from contextlib import closing
|
|||||||
# Requests
|
# Requests
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
# Tower
|
import zmq
|
||||||
from awx.main.socket import Socket
|
|
||||||
|
|
||||||
class TokenAuth(requests.auth.AuthBase):
|
class TokenAuth(requests.auth.AuthBase):
|
||||||
|
|
||||||
@@ -80,6 +79,9 @@ class CallbackModule(object):
|
|||||||
self.job_id = int(os.getenv('JOB_ID'))
|
self.job_id = int(os.getenv('JOB_ID'))
|
||||||
self.base_url = os.getenv('REST_API_URL', '')
|
self.base_url = os.getenv('REST_API_URL', '')
|
||||||
self.auth_token = os.getenv('REST_API_TOKEN', '')
|
self.auth_token = os.getenv('REST_API_TOKEN', '')
|
||||||
|
self.callback_consumer_port = os.getenv('CALLBACK_CONSUMER_PORT', 5556)
|
||||||
|
self.context = None
|
||||||
|
self.socket = None
|
||||||
self._init_logging()
|
self._init_logging()
|
||||||
self._init_connection()
|
self._init_connection()
|
||||||
self.counter = 0
|
self.counter = 0
|
||||||
@@ -106,6 +108,11 @@ class CallbackModule(object):
|
|||||||
self.context = None
|
self.context = None
|
||||||
self.socket = None
|
self.socket = None
|
||||||
|
|
||||||
|
def _start_connection(self):
|
||||||
|
self.context = zmq.Context()
|
||||||
|
self.socket = self.context.socket(zmq.REQ)
|
||||||
|
self.socket.connect("tcp://127.0.0.1:%s" % str(self.callback_consumer_port))
|
||||||
|
|
||||||
def _post_job_event_queue_msg(self, event, event_data):
|
def _post_job_event_queue_msg(self, event, event_data):
|
||||||
self.counter += 1
|
self.counter += 1
|
||||||
msg = {
|
msg = {
|
||||||
@@ -116,10 +123,29 @@ class CallbackModule(object):
|
|||||||
'created': datetime.datetime.utcnow().isoformat(),
|
'created': datetime.datetime.utcnow().isoformat(),
|
||||||
}
|
}
|
||||||
|
|
||||||
# Publish the callback.
|
active_pid = os.getpid()
|
||||||
with Socket('callbacks', 'w', debug=self.job_callback_debug,
|
if self.job_callback_debug:
|
||||||
logger=self.logger) as callbacks:
|
msg.update({
|
||||||
callbacks.publish(msg)
|
'pid': active_pid,
|
||||||
|
})
|
||||||
|
for retry_count in xrange(4):
|
||||||
|
try:
|
||||||
|
if not hasattr(self, 'connection_pid'):
|
||||||
|
self.connection_pid = active_pid
|
||||||
|
if self.connection_pid != active_pid:
|
||||||
|
self._init_connection()
|
||||||
|
if self.context is None:
|
||||||
|
self._start_connection()
|
||||||
|
|
||||||
|
self.socket.send_json(msg)
|
||||||
|
self.socket.recv()
|
||||||
|
return
|
||||||
|
except Exception, e:
|
||||||
|
self.logger.info('Publish Exception: %r, retry=%d', e,
|
||||||
|
retry_count, exc_info=True)
|
||||||
|
# TODO: Maybe recycle connection here?
|
||||||
|
if retry_count >= 3:
|
||||||
|
raise
|
||||||
|
|
||||||
def _post_rest_api_event(self, event, event_data):
|
def _post_rest_api_event(self, event, event_data):
|
||||||
data = json.dumps({
|
data = json.dumps({
|
||||||
|
|||||||
Reference in New Issue
Block a user