From fed4262ee2dcd928e73b181b1095b1fb4659795d Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Thu, 19 Feb 2015 11:27:40 -0500 Subject: [PATCH 1/6] Some light implimentation details for basic fact caching and collection --- .../commands/run_callback_receiver.py | 4 +- .../commands/run_fact_cache_receiver.py | 27 ++++++++++++++ awx/main/tasks.py | 4 ++ awx/playbooks/scan_facts.yml | 5 +++ awx/plugins/fact_caching/tower.py | 27 ++++++++++++++ awx/plugins/library/scan_packages.py | 37 +++++++++++++++++++ 6 files changed, 102 insertions(+), 2 deletions(-) create mode 100644 awx/main/management/commands/run_fact_cache_receiver.py create mode 100644 awx/playbooks/scan_facts.yml create mode 100644 awx/plugins/fact_caching/tower.py create mode 100755 awx/plugins/library/scan_packages.py diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index e883a1963c..6bad56ef03 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -89,7 +89,7 @@ class CallbackReceiver(object): queue_worker[2] = w if workers_changed: signal.signal(signal.SIGINT, shutdown_handler([p[2] for p in worker_queues] + [main_process])) - signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) + signal.signal(signal.SIGTERM, shutdown_handler([p[2] for p in worker_queues] + [main_process])) if not main_process.is_alive(): sys.exit(1) time.sleep(0.1) @@ -241,7 +241,7 @@ class Command(NoArgsCommand): Save Job Callback receiver (see awx.plugins.callbacks.job_event_callback) Runs as a management command and receives job save events. It then hands them off to worker processors (see Worker) which writes them to the database - ''' + ''' help = 'Launch the job callback receiver' def handle_noargs(self, **options): diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py new file mode 100644 index 0000000000..0ed5b1e921 --- /dev/null +++ b/awx/main/management/commands/run_fact_cache_receiver.py @@ -0,0 +1,27 @@ +# Copyright (c) 2015 Ansible, Inc. +# All Rights Reserved + +import logging + +from django.core.management.base import NoArgsCommand + +from awx.main.models import * # noqa + +logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver') + +class FactCacheReceiver(object): + pass + +class Command(NoArgsCommand): + ''' + blah blah + ''' + help = 'Launch the Fact Cache Receiver' + + def handle_noargs(self, **options): + fcr = FactCacheReceiver() + try: + fcr.run_receiver() + except KeyboardInterrupt: + pass + diff --git a/awx/main/tasks.py b/awx/main/tasks.py index dde73b2159..5e3393f417 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -550,6 +550,10 @@ class RunJob(BaseTask): env['JOB_ID'] = str(job.pk) env['INVENTORY_ID'] = str(job.inventory.pk) env['ANSIBLE_CALLBACK_PLUGINS'] = plugin_dir + # TODO: env['ANSIBLE_LIBRARY'] # plugins/library + # TODO: env['ANSIBLE_CACHE_PLUGINS'] # plugins/fact_caching + # TODD: env['ANSIBLE_CACHE_PLUGIN'] # tower + # TODO: env['ANSIBLE_CACHE_PLUGIN_CONNECTION'] # connection to tower service env['REST_API_URL'] = settings.INTERNAL_API_URL env['REST_API_TOKEN'] = job.task_auth_token or '' env['CALLBACK_CONSUMER_PORT'] = str(settings.CALLBACK_CONSUMER_PORT) diff --git a/awx/playbooks/scan_facts.yml b/awx/playbooks/scan_facts.yml new file mode 100644 index 0000000000..252c284631 --- /dev/null +++ b/awx/playbooks/scan_facts.yml @@ -0,0 +1,5 @@ +- hosts: all + gather_facts: no + tasks: + - scan_packages: + diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py new file mode 100644 index 0000000000..1f4122d493 --- /dev/null +++ b/awx/plugins/fact_caching/tower.py @@ -0,0 +1,27 @@ +from ansible.cache.base import BaseCacheModule + +class TowerCacheModule(BaseCacheModule): + + def __init__(self, *args, **kwargs): + pass + + def get(self, key): + pass + + def set(self, key, value): + pass + + def keys(self): + pass + + def contains(self, key): + pass + + def delete(self, key): + pass + + def flush(self): + pass + + def copy(self): + pass diff --git a/awx/plugins/library/scan_packages.py b/awx/plugins/library/scan_packages.py new file mode 100755 index 0000000000..00188d686f --- /dev/null +++ b/awx/plugins/library/scan_packages.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python + +import os +from ansible.module_utils.basic import * + +def rpm_package_list(): + import rpm + trans_set = rpm.TransactionSet() + installed_packages = [] + for package in trans_set.dbMatch(): + installed_packages.append({'name': package['name'], + 'version': "%s" % (package['version'])}) + return installed_packages + +def deb_package_list(): + import apt + apt_cache = apt.Cache() + installed_packages = [] + apt_installed_packages = [pk for pk in apt_cache.keys() if apt_cache[pk].is_installed] + for package in apt_installed_packages: + installed_packages.append({'name': package, + 'version': apt_cache[package].installed.version}) + return installed_packages + +def main(): + module = AnsibleModule( + argument_spec = dict()) + + packages = [] + if os.path.exists("/etc/redhat-release"): + packages = rpm_package_list() + elif os.path.exists("/etc/os-release"): + packages = deb_package_list() + results = dict(ansible_facts=dict(packages=packages)) + module.exit_json(**results) + +main() From fd1668dfd837e2808b3558e5bc6d5c919411b210 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 20 Feb 2015 16:56:35 -0500 Subject: [PATCH 2/6] Implement fact caching service and mongo reference handler --- .../commands/run_fact_cache_receiver.py | 24 ++++++- awx/main/socket.py | 1 + awx/playbooks/scan_facts.yml | 1 - awx/plugins/fact_caching/tower.py | 63 ++++++++++++++++--- awx/settings/defaults.py | 2 + 5 files changed, 82 insertions(+), 9 deletions(-) mode change 100644 => 100755 awx/plugins/fact_caching/tower.py diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py index 0ed5b1e921..1526062c47 100644 --- a/awx/main/management/commands/run_fact_cache_receiver.py +++ b/awx/main/management/commands/run_fact_cache_receiver.py @@ -4,13 +4,35 @@ import logging from django.core.management.base import NoArgsCommand +from django.utils.timezone import now from awx.main.models import * # noqa +from awx.main.socket import Socket + +from pymongo import MongoClient logger = logging.getLogger('awx.main.commands.run_fact_cache_receiver') class FactCacheReceiver(object): - pass + + def __init__(self): + self.client = MongoClient('localhost', 27017) + + def process_fact_message(self, message): + host = message['host'] + facts = message['facts'] + host_db = self.client.host_facts + host_collection = host_db[host] + facts.update(dict(tower_host=host, datetime=now())) + host_collection.insert(facts) + + def run_receiver(self): + with Socket('fact_cache', 'r') as facts: + for message in facts.listen(): + print("Message received: " + str(message)) + if 'host' not in message or 'facts' not in message: + continue + self.process_fact_message(message) class Command(NoArgsCommand): ''' diff --git a/awx/main/socket.py b/awx/main/socket.py index 679b5f5fd0..39b252ae26 100644 --- a/awx/main/socket.py +++ b/awx/main/socket.py @@ -63,6 +63,7 @@ class Socket(object): settings.CALLBACK_CONSUMER_PORT), 'task_commands': settings.TASK_COMMAND_PORT, 'websocket': settings.SOCKETIO_NOTIFICATION_PORT, + 'fact_cache': settings.FACT_CACHE_PORT, }[self._bucket] def connect(self): diff --git a/awx/playbooks/scan_facts.yml b/awx/playbooks/scan_facts.yml index 252c284631..8e12ba75a3 100644 --- a/awx/playbooks/scan_facts.yml +++ b/awx/playbooks/scan_facts.yml @@ -1,5 +1,4 @@ - hosts: all - gather_facts: no tasks: - scan_packages: diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py old mode 100644 new mode 100755 index 1f4122d493..73a44b33a0 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -1,21 +1,70 @@ +# Copyright (c) 2015 Ansible, Inc. +# This file is a utility Ansible plugin that is not part of the AWX or Ansible +# packages. It does not import any code from either package, nor does its +# license apply to Ansible or AWX. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# Neither the name of the nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import sys +from ansible import constants as C from ansible.cache.base import BaseCacheModule -class TowerCacheModule(BaseCacheModule): +try: + import zmq +except Import: + print("pyzmq is required") + sys.exit(1) + +class CacheModule(BaseCacheModule): def __init__(self, *args, **kwargs): - pass + + # This is the local tower zmq connection + self._tower_connection = C.CACHE_PLUGIN_CONNECTION + try: + self.context = zmq.Context() + self.socket = self.context.socket(zmq.REQ) + self.socket.connect(self._tower_connection) + except Exception, e: + print("Connection to zeromq failed at %s" % str(self._tower_connection)) + sys.exit(1) def get(self, key): - pass + return {} # Temporary until we have some tower retrieval endpoints def set(self, key, value): - pass + self.socket.send_json(dict(host=key, facts=value)) + self.socket.recv() def keys(self): - pass + return [] def contains(self, key): - pass + return False def delete(self, key): pass @@ -24,4 +73,4 @@ class TowerCacheModule(BaseCacheModule): pass def copy(self): - pass + return dict() diff --git a/awx/settings/defaults.py b/awx/settings/defaults.py index fed14e4fa5..9f8a3b923e 100644 --- a/awx/settings/defaults.py +++ b/awx/settings/defaults.py @@ -510,6 +510,8 @@ TASK_COMMAND_PORT = 6559 SOCKETIO_NOTIFICATION_PORT = 6557 SOCKETIO_LISTEN_PORT = 8080 +FACT_CACHE_PORT = 6564 + ORG_ADMINS_CAN_SEE_ALL_USERS = True # Logging configuration. From 70f2e5ea1d849fcd1416ee6b5344b4f48d00c2f0 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Fri, 20 Feb 2015 17:19:19 -0500 Subject: [PATCH 3/6] Handle unifying the datetime value as a key that we can update on the tower side --- .../management/commands/run_fact_cache_receiver.py | 14 ++++++++++---- awx/plugins/fact_caching/tower.py | 4 +++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/awx/main/management/commands/run_fact_cache_receiver.py b/awx/main/management/commands/run_fact_cache_receiver.py index 1526062c47..29742cae3e 100644 --- a/awx/main/management/commands/run_fact_cache_receiver.py +++ b/awx/main/management/commands/run_fact_cache_receiver.py @@ -4,7 +4,6 @@ import logging from django.core.management.base import NoArgsCommand -from django.utils.timezone import now from awx.main.models import * # noqa from awx.main.socket import Socket @@ -21,16 +20,23 @@ class FactCacheReceiver(object): def process_fact_message(self, message): host = message['host'] facts = message['facts'] + date_key = message['date_key'] host_db = self.client.host_facts host_collection = host_db[host] - facts.update(dict(tower_host=host, datetime=now())) - host_collection.insert(facts) + facts.update(dict(tower_host=host, datetime=date_key)) + rec = host_collection.find({"datetime": date_key}) + if rec.count(): + this_fact = rec.next() + this_fact.update(facts) + host_collection.save(this_fact) + else: + host_collection.insert(facts) def run_receiver(self): with Socket('fact_cache', 'r') as facts: for message in facts.listen(): print("Message received: " + str(message)) - if 'host' not in message or 'facts' not in message: + if 'host' not in message or 'facts' not in message or 'date_key' not in message: continue self.process_fact_message(message) diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index 73a44b33a0..0d9a5297b5 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -30,6 +30,7 @@ # POSSIBILITY OF SUCH DAMAGE. import sys +import datetime from ansible import constants as C from ansible.cache.base import BaseCacheModule @@ -45,6 +46,7 @@ class CacheModule(BaseCacheModule): # This is the local tower zmq connection self._tower_connection = C.CACHE_PLUGIN_CONNECTION + self.date_key = datetime.datetime.utcnow() try: self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) @@ -57,7 +59,7 @@ class CacheModule(BaseCacheModule): return {} # Temporary until we have some tower retrieval endpoints def set(self, key, value): - self.socket.send_json(dict(host=key, facts=value)) + self.socket.send_json(dict(host=key, facts=value, date_key=self.date_key)) self.socket.recv() def keys(self): From 0db749ea19af95ef883fd42afe9e70cf2174fbe8 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Mon, 23 Feb 2015 10:11:10 -0500 Subject: [PATCH 4/6] Fix up date formatting when emitting facts to the fact service --- awx/plugins/fact_caching/tower.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index 0d9a5297b5..bb35ebaec4 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -30,13 +30,14 @@ # POSSIBILITY OF SUCH DAMAGE. import sys +import time import datetime from ansible import constants as C from ansible.cache.base import BaseCacheModule try: import zmq -except Import: +except ImportError: print("pyzmq is required") sys.exit(1) @@ -46,7 +47,7 @@ class CacheModule(BaseCacheModule): # This is the local tower zmq connection self._tower_connection = C.CACHE_PLUGIN_CONNECTION - self.date_key = datetime.datetime.utcnow() + self.date_key = time.mktime(datetime.datetime.utcnow().timetuple()) try: self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) From 3615f8a634cfc103723c1027370737a338431900 Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 25 Feb 2015 13:31:14 -0500 Subject: [PATCH 5/6] Update package fact format after discussions with core team --- awx/plugins/fact_caching/tower.py | 3 ++- awx/plugins/library/scan_packages.py | 23 +++++++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/awx/plugins/fact_caching/tower.py b/awx/plugins/fact_caching/tower.py index bb35ebaec4..1072ae90f8 100755 --- a/awx/plugins/fact_caching/tower.py +++ b/awx/plugins/fact_caching/tower.py @@ -53,7 +53,8 @@ class CacheModule(BaseCacheModule): self.socket = self.context.socket(zmq.REQ) self.socket.connect(self._tower_connection) except Exception, e: - print("Connection to zeromq failed at %s" % str(self._tower_connection)) + print("Connection to zeromq failed at %s with error: %s" % (str(self._tower_connection), + str(e))) sys.exit(1) def get(self, key): diff --git a/awx/plugins/library/scan_packages.py b/awx/plugins/library/scan_packages.py index 00188d686f..5d5737d5c4 100755 --- a/awx/plugins/library/scan_packages.py +++ b/awx/plugins/library/scan_packages.py @@ -6,20 +6,31 @@ from ansible.module_utils.basic import * def rpm_package_list(): import rpm trans_set = rpm.TransactionSet() - installed_packages = [] + installed_packages = {} for package in trans_set.dbMatch(): - installed_packages.append({'name': package['name'], - 'version': "%s" % (package['version'])}) + package_details = dict(name=package[rpm.RPMTAG_NAME], + version=package[rpm.RPMTAG_VERSION], + release=package[rpm.RPMTAG_RELEASE], + epoch=package[rpm.RPMTAG_EPOCH], + arch=package[rpm.RPMTAG_ARCH], + source='rpm') + if package['name'] not in installed_packages: + installed_packages[package['name']] = [package_details] + else: + installed_packages[package['name']].append(package_details) return installed_packages def deb_package_list(): import apt apt_cache = apt.Cache() - installed_packages = [] + installed_packages = {} apt_installed_packages = [pk for pk in apt_cache.keys() if apt_cache[pk].is_installed] for package in apt_installed_packages: - installed_packages.append({'name': package, - 'version': apt_cache[package].installed.version}) + ac_pkg = apt_cache[package].installed + package_details = dict(name=package, + version=ac_pkg.version, + architecture=ac_pkg.architecture) + installed_packages[package] = [package_details] return installed_packages def main(): From b179517564ec744ee0b02d54e3da26fa2f4993ae Mon Sep 17 00:00:00 2001 From: Matthew Jones Date: Wed, 25 Feb 2015 13:44:54 -0500 Subject: [PATCH 6/6] "source" for apt --- awx/plugins/library/scan_packages.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/awx/plugins/library/scan_packages.py b/awx/plugins/library/scan_packages.py index 5d5737d5c4..620b3dc36f 100755 --- a/awx/plugins/library/scan_packages.py +++ b/awx/plugins/library/scan_packages.py @@ -29,7 +29,8 @@ def deb_package_list(): ac_pkg = apt_cache[package].installed package_details = dict(name=package, version=ac_pkg.version, - architecture=ac_pkg.architecture) + architecture=ac_pkg.architecture, + source='apt') installed_packages[package] = [package_details] return installed_packages