From 00676c95e0d8f4d65b7eba705f7c731aa79c79d7 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 21 Feb 2014 10:40:53 -0500 Subject: [PATCH] Change zmq socket characterstics to ack response. Fix up unit tests --- .../management/commands/run_callback_receiver.py | 12 +++++++----- awx/main/tests/jobs.py | 3 --- awx/plugins/callback/job_event_callback.py | 4 +++- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index da04d82940..f514684477 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -7,6 +7,7 @@ import datetime import logging import json import signal +import time from optparse import make_option from multiprocessing import Process @@ -35,7 +36,7 @@ def run_subscriber(consumer_port, queue_port, use_workers=True): return _handler consumer_context = zmq.Context() - consumer_subscriber = consumer_context.socket(zmq.PULL) + consumer_subscriber = consumer_context.socket(zmq.REP) consumer_subscriber.bind(consumer_port) queue_context = zmq.Context() @@ -57,6 +58,7 @@ def run_subscriber(consumer_port, queue_port, use_workers=True): queue_publisher.send_json(message) else: process_job_event(message) + consumer_subscriber.send("1") @transaction.commit_on_success @@ -89,12 +91,12 @@ def process_job_event(data): break except DatabaseError as e: transaction.rollback() - logger.debug('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) + print('Database error saving job event, retrying in ' + '1 second (retry #%d): %s', retry_count + 1, e) time.sleep(1) else: - logger.error('Failed to save job event after %d retries.', - retry_count) + print('Failed to save job event after %d retries.', + retry_count) def callback_worker(port): pool_context = zmq.Context() diff --git a/awx/main/tests/jobs.py b/awx/main/tests/jobs.py index 6f21d3ba5e..b638528700 100644 --- a/awx/main/tests/jobs.py +++ b/awx/main/tests/jobs.py @@ -911,9 +911,6 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase): job = self.job_ops_east_run job.start() - # Wait for events to filter in since we are using a single consumer - time.sleep(30) - # Check that the job detail has been updated. url = reverse('api:job_detail', args=(job.pk,)) with self.current_user(self.user_sue): diff --git a/awx/plugins/callback/job_event_callback.py b/awx/plugins/callback/job_event_callback.py index 1f74042ff9..93a671d899 100644 --- a/awx/plugins/callback/job_event_callback.py +++ b/awx/plugins/callback/job_event_callback.py @@ -37,6 +37,7 @@ import os import sys import urllib import urlparse +import time import requests @@ -109,7 +110,7 @@ class CallbackModule(object): def _start_connection(self): self.context = zmq.Context() - self.socket = self.context.socket(zmq.PUSH) + self.socket = self.context.socket(zmq.REQ) self.socket.connect("tcp://127.0.0.1:5556") def _post_job_event_queue_msg(self, event, event_data): @@ -134,6 +135,7 @@ class CallbackModule(object): self._start_connection() self.socket.send_json(msg) + self.socket.recv() return except Exception, e: self.logger.info('Publish Exception: %r, retry=%d', e,