Change zmq socket characterstics to ack response. Fix up unit tests

This commit is contained in:
Matthew Jones
2014-02-21 10:40:53 -05:00
parent 3231f966db
commit 00676c95e0
3 changed files with 10 additions and 9 deletions

View File

@@ -7,6 +7,7 @@ import datetime
import logging import logging
import json import json
import signal import signal
import time
from optparse import make_option from optparse import make_option
from multiprocessing import Process from multiprocessing import Process
@@ -35,7 +36,7 @@ def run_subscriber(consumer_port, queue_port, use_workers=True):
return _handler return _handler
consumer_context = zmq.Context() consumer_context = zmq.Context()
consumer_subscriber = consumer_context.socket(zmq.PULL) consumer_subscriber = consumer_context.socket(zmq.REP)
consumer_subscriber.bind(consumer_port) consumer_subscriber.bind(consumer_port)
queue_context = zmq.Context() queue_context = zmq.Context()
@@ -57,6 +58,7 @@ def run_subscriber(consumer_port, queue_port, use_workers=True):
queue_publisher.send_json(message) queue_publisher.send_json(message)
else: else:
process_job_event(message) process_job_event(message)
consumer_subscriber.send("1")
@transaction.commit_on_success @transaction.commit_on_success
@@ -89,12 +91,12 @@ def process_job_event(data):
break break
except DatabaseError as e: except DatabaseError as e:
transaction.rollback() transaction.rollback()
logger.debug('Database error saving job event, retrying in ' print('Database error saving job event, retrying in '
'1 second (retry #%d): %s', retry_count + 1, e) '1 second (retry #%d): %s', retry_count + 1, e)
time.sleep(1) time.sleep(1)
else: else:
logger.error('Failed to save job event after %d retries.', print('Failed to save job event after %d retries.',
retry_count) retry_count)
def callback_worker(port): def callback_worker(port):
pool_context = zmq.Context() pool_context = zmq.Context()

View File

@@ -911,9 +911,6 @@ class JobStartCancelTest(BaseJobTestMixin, django.test.LiveServerTestCase):
job = self.job_ops_east_run job = self.job_ops_east_run
job.start() job.start()
# Wait for events to filter in since we are using a single consumer
time.sleep(30)
# Check that the job detail has been updated. # Check that the job detail has been updated.
url = reverse('api:job_detail', args=(job.pk,)) url = reverse('api:job_detail', args=(job.pk,))
with self.current_user(self.user_sue): with self.current_user(self.user_sue):

View File

@@ -37,6 +37,7 @@ import os
import sys import sys
import urllib import urllib
import urlparse import urlparse
import time
import requests import requests
@@ -109,7 +110,7 @@ class CallbackModule(object):
def _start_connection(self): def _start_connection(self):
self.context = zmq.Context() self.context = zmq.Context()
self.socket = self.context.socket(zmq.PUSH) self.socket = self.context.socket(zmq.REQ)
self.socket.connect("tcp://127.0.0.1:5556") self.socket.connect("tcp://127.0.0.1:5556")
def _post_job_event_queue_msg(self, event, event_data): def _post_job_event_queue_msg(self, event, event_data):
@@ -134,6 +135,7 @@ class CallbackModule(object):
self._start_connection() self._start_connection()
self.socket.send_json(msg) self.socket.send_json(msg)
self.socket.recv()
return return
except Exception, e: except Exception, e:
self.logger.info('Publish Exception: %r, retry=%d', e, self.logger.info('Publish Exception: %r, retry=%d', e,