mirror of
https://github.com/ansible/awx.git
synced 2026-04-06 18:49:21 -02:30
Implement fact receiver thread worker
Optionally allow processing of fact receiver messages in a worker thread. This works around an issue where data could take a while to page into Mongo and cross the zmq socket timeout.
This commit is contained in:
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
# Python
|
# Python
|
||||||
import logging
|
import logging
|
||||||
|
from threading import Thread
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
# Django
|
# Django
|
||||||
@@ -76,14 +77,18 @@ class FactCacheReceiver(object):
|
|||||||
(fact_obj, version_obj) = Fact.add_fact(self.timestamp, facts, host, module)
|
(fact_obj, version_obj) = Fact.add_fact(self.timestamp, facts, host, module)
|
||||||
logger.info('Created new fact <fact, fact_version> <%s, %s>' % (fact_obj.id, version_obj.id))
|
logger.info('Created new fact <fact, fact_version> <%s, %s>' % (fact_obj.id, version_obj.id))
|
||||||
|
|
||||||
def run_receiver(self):
|
def run_receiver(self, use_processing_threads=True):
|
||||||
with Socket('fact_cache', 'r') as facts:
|
with Socket('fact_cache', 'r') as facts:
|
||||||
for message in facts.listen():
|
for message in facts.listen():
|
||||||
if 'host' not in message or 'facts' not in message or 'date_key' not in message:
|
if 'host' not in message or 'facts' not in message or 'date_key' not in message:
|
||||||
logger.warn('Received invalid message %s' % message)
|
logger.warn('Received invalid message %s' % message)
|
||||||
continue
|
continue
|
||||||
logger.info('Received message %s' % message)
|
logger.info('Received message %s' % message)
|
||||||
self.process_fact_message(message)
|
if use_processing_threads:
|
||||||
|
wt = Thread(target=self.process_fact_message, args=(message,))
|
||||||
|
wt.start()
|
||||||
|
else:
|
||||||
|
self.process_fact_message(message)
|
||||||
|
|
||||||
class Command(NoArgsCommand):
|
class Command(NoArgsCommand):
|
||||||
'''
|
'''
|
||||||
|
|||||||
@@ -142,7 +142,7 @@ class RunFactCacheReceiverUnitTest(BaseTest, MongoDBRequired):
|
|||||||
|
|
||||||
receiver = FactCacheReceiver()
|
receiver = FactCacheReceiver()
|
||||||
receiver.process_fact_message = MagicMock(name='process_fact_message')
|
receiver.process_fact_message = MagicMock(name='process_fact_message')
|
||||||
receiver.run_receiver()
|
receiver.run_receiver(use_processing_threads=False)
|
||||||
|
|
||||||
receiver.process_fact_message.assert_called_once_with(TEST_MSG)
|
receiver.process_fact_message.assert_called_once_with(TEST_MSG)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user