From ba1b9b913615cc2064810a0a81efbf39330d80d9 Mon Sep 17 00:00:00 2001
From: Matthew Jones
Date: Tue, 11 Nov 2014 11:46:51 -0500
Subject: [PATCH] Add timestamp to logs for prints in management commands so
 we pick them up in the supervisor logs

---
 .../commands/run_callback_receiver.py      | 15 ++++++-----
 .../commands/run_socketio_service.py       | 13 ++++++----
 .../management/commands/run_task_system.py | 25 +++++++++++--------
 3 files changed, 31 insertions(+), 22 deletions(-)

diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py
index 3eeadaa8b0..e3a9822629 100644
--- a/awx/main/management/commands/run_callback_receiver.py
+++ b/awx/main/management/commands/run_callback_receiver.py
@@ -36,6 +36,9 @@ class CallbackReceiver(object):
     def __init__(self):
         self.parent_mappings = {}
 
+    def print_log(self, message):
+        print("[%s] %s" % (now().isoformat(), message))
+
     def run_subscriber(self, consumer_port, queue_port, use_workers=True):
         def shutdown_handler(active_workers):
             def _handler(signum, frame):
@@ -62,10 +65,10 @@ class CallbackReceiver(object):
                 signal.signal(signal.SIGINT, shutdown_handler([w]))
                 signal.signal(signal.SIGTERM, shutdown_handler([w]))
                 if settings.DEBUG:
-                    print 'Started worker %s' % str(idx)
+                    self.print_log('Started worker %s' % str(idx))
                 worker_queues.append([0, queue_actual, w])
         elif settings.DEBUG:
-            print 'Started callback receiver (no workers)'
+            self.print_log('Started callback receiver (no workers)')
         main_process = Process(target=self.callback_handler,
                                args=(use_workers, consumer_port, worker_queues,))
         main_process.daemon = True
@@ -209,12 +212,12 @@ class CallbackReceiver(object):
                 return job_event
             except DatabaseError as e:
                 # Log the error and try again.
-                print('Database error saving job event, retrying in '
-                      '1 second (retry #%d): %s', retry_count + 1, e)
+                self.print_log('Database error saving job event, retrying in '
+                               '1 second (retry #%d): %s' % (retry_count + 1, e))
                 time.sleep(1)
 
         # We failed too many times, and are giving up.
-        print('Failed to save job event after %d retries.', retry_count)
+        self.print_log('Failed to save job event after %d retries.' % retry_count)
         return None
 
     def callback_worker(self, queue_actual):
@@ -224,7 +227,7 @@ class CallbackReceiver(object):
                 self.process_job_event(message)
                 messages_processed += 1
                 if messages_processed >= MAX_REQUESTS:
-                    print("Shutting down message receiver")
+                    self.print_log("Shutting down message receiver")
                     break
 
 class Command(NoArgsCommand):
diff --git a/awx/main/management/commands/run_socketio_service.py b/awx/main/management/commands/run_socketio_service.py
index c2b0869227..a77ec84aff 100644
--- a/awx/main/management/commands/run_socketio_service.py
+++ b/awx/main/management/commands/run_socketio_service.py
@@ -34,6 +34,9 @@ from socketio import socketio_manage
 from socketio.server import SocketIOServer
 from socketio.namespace import BaseNamespace
 
+def print_log(message):
+    print("[%s] %s" % (now().isoformat(), message))
+
 class TowerBaseNamespace(BaseNamespace):
 
     def get_allowed_methods(self):
@@ -69,7 +72,7 @@ class TowerBaseNamespace(BaseNamespace):
 class TestNamespace(TowerBaseNamespace):
 
     def recv_connect(self):
-        print("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR']))
+        print_log("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR']))
         self.emit('test', "If you see this then you are connected to the test socket endpoint")
 
 class JobNamespace(TowerBaseNamespace):
@@ -78,7 +81,7 @@ class JobNamespace(TowerBaseNamespace):
         return ['summary_complete', 'status_changed']
 
     def recv_connect(self):
-        print("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR']))
+        print_log("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR']))
 
 
 class JobEventNamespace(TowerBaseNamespace):
@@ -89,11 +92,11 @@ class JobEventNamespace(TowerBaseNamespace):
         else:
             user_jobs = get_user_queryset(valid_user, Job).filter(finished__isnull=True)
             visible_jobs = set(['recv_connect'] + ["job_events-%s" % str(j.id) for j in user_jobs])
-            print("Visible jobs: " + str(visible_jobs))
+            print_log("Visible jobs: " + str(visible_jobs))
             return visible_jobs
 
     def recv_connect(self):
-        print("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR']))
+        print_log("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR']))
 
 
 class ScheduleNamespace(TowerBaseNamespace):
@@ -101,7 +104,7 @@ class ScheduleNamespace(TowerBaseNamespace):
         return ["schedule_changed"]
 
     def recv_connect(self):
-        print("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR']))
+        print_log("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR']))
 
 
 class TowerSocket(object):
diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py
index c3cf1a1452..b6d97e2636 100644
--- a/awx/main/management/commands/run_task_system.py
+++ b/awx/main/management/commands/run_task_system.py
@@ -31,6 +31,9 @@ import zmq
 # Celery
 from celery.task.control import inspect
 
+def print_log(message):
+    print("[%s] %s" % (now().isoformat(), message))
+
 class SimpleDAG(object):
     ''' A simple implementation of a directed acyclic graph '''
 
@@ -148,7 +151,7 @@ def rebuild_graph(message):
     if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
         active_task_queues = inspector.active()
     else:
-        print("Ignoring celery task inspector")
+        print_log("Ignoring celery task inspector")
inspector") active_task_queues = None all_sorted_tasks = get_tasks() @@ -161,7 +164,7 @@ def rebuild_graph(message): active_tasks += [at['id'] for at in active_task_queues[queue]] else: if settings.DEBUG: - print("Could not communicate with celery!") + print_log("Could not communicate with celery!") # TODO: Something needs to be done here to signal to the system as a whole that celery appears to be down if not hasattr(settings, 'CELERY_UNIT_TEST'): return None @@ -170,7 +173,7 @@ def rebuild_graph(message): new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) # Check running tasks and make sure they are active in celery - print("Active celery tasks: " + str(active_tasks)) + print_log("Active celery tasks: " + str(active_tasks)) for task in list(running_tasks): if task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): # NOTE: Pull status again and make sure it didn't finish in the meantime? @@ -179,13 +182,13 @@ def rebuild_graph(message): task.save() task.socketio_emit_status("failed") running_tasks.pop(running_tasks.index(task)) - print("Task %s appears orphaned... marking as failed" % task) + print_log("Task %s appears orphaned... marking as failed" % task) # Create and process dependencies for new tasks for task in new_tasks: - print("Checking dependencies for: %s" % str(task)) + print_log("Checking dependencies for: %s" % str(task)) task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario - print("New dependencies: %s" % str(task_dependencies)) + print_log("New dependencies: %s" % str(task_dependencies)) for dep in task_dependencies: # We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task time_delt = len(task_dependencies) - task_dependencies.index(dep) @@ -220,11 +223,11 @@ def process_graph(graph, task_capacity): running_impact = sum([t['node_object'].task_impact for t in running_nodes]) ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) remaining_volume = task_capacity - running_impact - print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes), + print_log("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes), str(task_capacity), str(running_impact), str(remaining_volume))) - print("Ready Nodes: %s" % str(ready_nodes)) + print_log("Ready Nodes: %s" % str(ready_nodes)) for task_node in ready_nodes: node_obj = task_node['node_object'] node_args = task_node['metadata'] @@ -248,7 +251,7 @@ def process_graph(graph, task_capacity): continue remaining_volume -= impact running_impact += impact - print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume))) + print_log("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume))) def run_taskmanager(command_port): ''' Receive task start and finish signals to rebuild a dependency graph and manage the actual running of tasks ''' @@ -264,7 +267,7 @@ def run_taskmanager(command_port): command_context = zmq.Context() command_socket = command_context.socket(zmq.PULL) command_socket.bind(command_port) - print("Listening on %s" % command_port) + print_log("Listening on %s" % command_port) last_rebuild = datetime.datetime.fromtimestamp(0) while True: try: @@ -273,7 +276,7 @@ def 
             message = None
         if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 10:
             if message is not None and 'pause' in message:
-                print("Pause command received: %s" % str(message))
+                print_log("Pause command received: %s" % str(message))
                 paused = message['pause']
             graph = rebuild_graph(message)
             if not paused and graph is not None:
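
Note: for reference, a minimal standalone sketch of the print_log helper this patch adds to each management command, with an illustrative usage. The patched modules already import Django's timezone-aware now(); outside of a configured Django process, datetime.datetime.utcnow() stands in for it here, and the sample message is illustrative only.

    # Sketch of the timestamped print helper introduced by this patch.
    # Assumption: datetime.datetime.utcnow() substitutes for
    # django.utils.timezone.now(), which the real modules import.
    import datetime

    def print_log(message):
        # Prefix each message with an ISO 8601 timestamp so lines captured by
        # supervisor's stdout logging can be ordered and correlated.
        print("[%s] %s" % (datetime.datetime.utcnow().isoformat(), message))

    if __name__ == '__main__':
        print_log("Started worker 0")
        # e.g. [2014-11-11T16:46:51.123456] Started worker 0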