Implement support for ad hoc commands.

This commit is contained in:
Chris Church
2015-03-30 13:04:19 -04:00
parent d9aa35566b
commit f7b8d510dc
26 changed files with 2530 additions and 221 deletions

View File

@@ -117,7 +117,9 @@ class CallbackReceiver(object):
with Socket('callbacks', 'r') as callbacks:
for message in callbacks.listen():
total_messages += 1
if not use_workers:
if 'ad_hoc_command_id' in message:
self.process_ad_hoc_event(message)
elif not use_workers:
self.process_job_event(message)
else:
job_parent_events = last_parent_events.get(message['job_id'], {})
@@ -216,10 +218,68 @@ class CallbackReceiver(object):
# Retrun the job event object.
return job_event
except DatabaseError as e:
# Log the error and try again.
# Log the error and bail out.
logger.error('Database error saving job event: %s', e)
return None
@transaction.atomic
def process_ad_hoc_event(self, data):
# Sanity check: Do we need to do anything at all?
event = data.get('event', '')
if not event or 'ad_hoc_command_id' not in data:
return
# Get the correct "verbose" value from the job.
# If for any reason there's a problem, just use 0.
try:
verbose = AdHocCommand.objects.get(id=data['ad_hoc_command_id']).verbosity
except Exception, e:
verbose = 0
# Convert the datetime for the job event's creation appropriately,
# and include a time zone for it.
#
# In the event of any issue, throw it out, and Django will just save
# the current time.
try:
if not isinstance(data['created'], datetime.datetime):
data['created'] = parse_datetime(data['created'])
if not data['created'].tzinfo:
data['created'] = data['created'].replace(tzinfo=FixedOffset(0))
except (KeyError, ValueError):
data.pop('created', None)
# Print the data to stdout if we're in DEBUG mode.
if settings.DEBUG:
print data
# Sanity check: Don't honor keys that we don't recognize.
for key in data.keys():
if key not in ('ad_hoc_command_id', 'event', 'event_data',
'created', 'counter'):
data.pop(key)
# Save any modifications to the ad hoc command event to the database.
# If we get a database error of some kind, bail out.
try:
# If we're not in verbose mode, wipe out any module
# arguments. FIXME: Needed for adhoc?
res = data['event_data'].get('res', {})
if isinstance(res, dict):
i = res.get('invocation', {})
if verbose == 0 and 'module_args' in i:
i['module_args'] = ''
# Create a new AdHocCommandEvent object.
ad_hoc_command_event = AdHocCommandEvent.objects.create(**data)
# Retrun the ad hoc comamnd event object.
return ad_hoc_command_event
except DatabaseError as e:
# Log the error and bail out.
logger.error('Database error saving ad hoc command event: %s', e)
return None
def callback_worker(self, queue_actual, idx):
messages_processed = 0
while True:

View File

@@ -90,6 +90,12 @@ class JobEventNamespace(TowerBaseNamespace):
logger.info("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR']))
super(JobEventNamespace, self).recv_connect()
class AdHocCommandEventNamespace(TowerBaseNamespace):
def recv_connect(self):
logger.info("Received client connect for ad hoc command event namespace from %s" % str(self.environ['REMOTE_ADDR']))
super(AdHocCommandEventNamespace, self).recv_connect()
class ScheduleNamespace(TowerBaseNamespace):
def get_allowed_methods(self):
@@ -107,6 +113,7 @@ class TowerSocket(object):
socketio_manage(environ, {'/socket.io/test': TestNamespace,
'/socket.io/jobs': JobNamespace,
'/socket.io/job_events': JobEventNamespace,
'/socket.io/ad_hoc_command_events': AdHocCommandEventNamespace,
'/socket.io/schedules': ScheduleNamespace})
else:
logger.warn("Invalid connect path received: " + path)

View File

@@ -48,6 +48,8 @@ class SimpleDAG(object):
def short_string_obj(obj):
if type(obj) == Job:
type_str = "Job"
if type(obj) == AdHocCommand:
type_str = "AdHocCommand"
elif type(obj) == InventoryUpdate:
type_str = "Inventory"
elif type(obj) == ProjectUpdate:
@@ -100,6 +102,8 @@ class SimpleDAG(object):
def get_node_type(self, obj):
if type(obj) == Job:
return "job"
elif type(obj) == AdHocCommand:
return "ad_hoc_command"
elif type(obj) == InventoryUpdate:
return "inventory_update"
elif type(obj) == ProjectUpdate:
@@ -136,13 +140,14 @@ def get_tasks():
RELEVANT_JOBS = ('pending', 'waiting', 'running')
# TODO: Replace this when we can grab all objects in a sane way.
graph_jobs = [j for j in Job.objects.filter(status__in=RELEVANT_JOBS)]
graph_ad_hoc_commands = [ahc for ahc in AdHocCommand.objects.filter(status__in=RELEVANT_JOBS)]
graph_inventory_updates = [iu for iu in
InventoryUpdate.objects.filter(status__in=RELEVANT_JOBS)]
graph_project_updates = [pu for pu in
ProjectUpdate.objects.filter(status__in=RELEVANT_JOBS)]
graph_system_jobs = [sj for sj in
SystemJob.objects.filter(status__in=RELEVANT_JOBS)]
all_actions = sorted(graph_jobs + graph_inventory_updates +
all_actions = sorted(graph_jobs + graph_ad_hoc_commands + graph_inventory_updates +
graph_project_updates + graph_system_jobs,
key=lambda task: task.created)
return all_actions