mirror of
https://github.com/ansible/awx.git
synced 2026-01-20 22:18:01 -03:30
Add timestamp to logs for prints in management commands so we pick them
up the supervisor logs
This commit is contained in:
parent
c4e2463db1
commit
ba1b9b9136
@ -36,6 +36,9 @@ class CallbackReceiver(object):
|
||||
def __init__(self):
|
||||
self.parent_mappings = {}
|
||||
|
||||
def print_log(self, message):
|
||||
print("[%s] %s" % (now().isoformat(), message))
|
||||
|
||||
def run_subscriber(self, consumer_port, queue_port, use_workers=True):
|
||||
def shutdown_handler(active_workers):
|
||||
def _handler(signum, frame):
|
||||
@ -62,10 +65,10 @@ class CallbackReceiver(object):
|
||||
signal.signal(signal.SIGINT, shutdown_handler([w]))
|
||||
signal.signal(signal.SIGTERM, shutdown_handler([w]))
|
||||
if settings.DEBUG:
|
||||
print 'Started worker %s' % str(idx)
|
||||
self.print_log('Started worker %s' % str(idx))
|
||||
worker_queues.append([0, queue_actual, w])
|
||||
elif settings.DEBUG:
|
||||
print 'Started callback receiver (no workers)'
|
||||
self.print_log('Started callback receiver (no workers)')
|
||||
|
||||
main_process = Process(target=self.callback_handler, args=(use_workers, consumer_port, worker_queues,))
|
||||
main_process.daemon = True
|
||||
@ -209,12 +212,12 @@ class CallbackReceiver(object):
|
||||
return job_event
|
||||
except DatabaseError as e:
|
||||
# Log the error and try again.
|
||||
print('Database error saving job event, retrying in '
|
||||
'1 second (retry #%d): %s', retry_count + 1, e)
|
||||
self.print_log('Database error saving job event, retrying in '
|
||||
'1 second (retry #%d): %s', retry_count + 1, e)
|
||||
time.sleep(1)
|
||||
|
||||
# We failed too many times, and are giving up.
|
||||
print('Failed to save job event after %d retries.', retry_count)
|
||||
self.print_log('Failed to save job event after %d retries.', retry_count)
|
||||
return None
|
||||
|
||||
def callback_worker(self, queue_actual):
|
||||
@ -224,7 +227,7 @@ class CallbackReceiver(object):
|
||||
self.process_job_event(message)
|
||||
messages_processed += 1
|
||||
if messages_processed >= MAX_REQUESTS:
|
||||
print("Shutting down message receiver")
|
||||
self.print_log("Shutting down message receiver")
|
||||
break
|
||||
|
||||
class Command(NoArgsCommand):
|
||||
|
||||
@ -34,6 +34,9 @@ from socketio import socketio_manage
|
||||
from socketio.server import SocketIOServer
|
||||
from socketio.namespace import BaseNamespace
|
||||
|
||||
def print_log(message):
|
||||
print("[%s] %s" % (now().isoformat(), message))
|
||||
|
||||
class TowerBaseNamespace(BaseNamespace):
|
||||
|
||||
def get_allowed_methods(self):
|
||||
@ -69,7 +72,7 @@ class TowerBaseNamespace(BaseNamespace):
|
||||
class TestNamespace(TowerBaseNamespace):
|
||||
|
||||
def recv_connect(self):
|
||||
print("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||
print_log("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||
self.emit('test', "If you see this then you are connected to the test socket endpoint")
|
||||
|
||||
class JobNamespace(TowerBaseNamespace):
|
||||
@ -78,7 +81,7 @@ class JobNamespace(TowerBaseNamespace):
|
||||
return ['summary_complete', 'status_changed']
|
||||
|
||||
def recv_connect(self):
|
||||
print("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||
print_log("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||
|
||||
class JobEventNamespace(TowerBaseNamespace):
|
||||
|
||||
@ -89,11 +92,11 @@ class JobEventNamespace(TowerBaseNamespace):
|
||||
else:
|
||||
user_jobs = get_user_queryset(valid_user, Job).filter(finished__isnull=True)
|
||||
visible_jobs = set(['recv_connect'] + ["job_events-%s" % str(j.id) for j in user_jobs])
|
||||
print("Visible jobs: " + str(visible_jobs))
|
||||
print_log("Visible jobs: " + str(visible_jobs))
|
||||
return visible_jobs
|
||||
|
||||
def recv_connect(self):
|
||||
print("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||
print_log("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||
|
||||
class ScheduleNamespace(TowerBaseNamespace):
|
||||
|
||||
@ -101,7 +104,7 @@ class ScheduleNamespace(TowerBaseNamespace):
|
||||
return ["schedule_changed"]
|
||||
|
||||
def recv_connect(self):
|
||||
print("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||
print_log("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||
|
||||
class TowerSocket(object):
|
||||
|
||||
|
||||
@ -31,6 +31,9 @@ import zmq
|
||||
# Celery
|
||||
from celery.task.control import inspect
|
||||
|
||||
def print_log(message):
|
||||
print("[%s] %s" % (now().isoformat(), message))
|
||||
|
||||
class SimpleDAG(object):
|
||||
''' A simple implementation of a directed acyclic graph '''
|
||||
|
||||
@ -148,7 +151,7 @@ def rebuild_graph(message):
|
||||
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
|
||||
active_task_queues = inspector.active()
|
||||
else:
|
||||
print("Ignoring celery task inspector")
|
||||
print_log("Ignoring celery task inspector")
|
||||
active_task_queues = None
|
||||
|
||||
all_sorted_tasks = get_tasks()
|
||||
@ -161,7 +164,7 @@ def rebuild_graph(message):
|
||||
active_tasks += [at['id'] for at in active_task_queues[queue]]
|
||||
else:
|
||||
if settings.DEBUG:
|
||||
print("Could not communicate with celery!")
|
||||
print_log("Could not communicate with celery!")
|
||||
# TODO: Something needs to be done here to signal to the system as a whole that celery appears to be down
|
||||
if not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||
return None
|
||||
@ -170,7 +173,7 @@ def rebuild_graph(message):
|
||||
new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks)
|
||||
|
||||
# Check running tasks and make sure they are active in celery
|
||||
print("Active celery tasks: " + str(active_tasks))
|
||||
print_log("Active celery tasks: " + str(active_tasks))
|
||||
for task in list(running_tasks):
|
||||
if task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
|
||||
# NOTE: Pull status again and make sure it didn't finish in the meantime?
|
||||
@ -179,13 +182,13 @@ def rebuild_graph(message):
|
||||
task.save()
|
||||
task.socketio_emit_status("failed")
|
||||
running_tasks.pop(running_tasks.index(task))
|
||||
print("Task %s appears orphaned... marking as failed" % task)
|
||||
print_log("Task %s appears orphaned... marking as failed" % task)
|
||||
|
||||
# Create and process dependencies for new tasks
|
||||
for task in new_tasks:
|
||||
print("Checking dependencies for: %s" % str(task))
|
||||
print_log("Checking dependencies for: %s" % str(task))
|
||||
task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario
|
||||
print("New dependencies: %s" % str(task_dependencies))
|
||||
print_log("New dependencies: %s" % str(task_dependencies))
|
||||
for dep in task_dependencies:
|
||||
# We recalculate the created time for the moment to ensure the dependencies are always sorted in the right order relative to the dependent task
|
||||
time_delt = len(task_dependencies) - task_dependencies.index(dep)
|
||||
@ -220,11 +223,11 @@ def process_graph(graph, task_capacity):
|
||||
running_impact = sum([t['node_object'].task_impact for t in running_nodes])
|
||||
ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes)
|
||||
remaining_volume = task_capacity - running_impact
|
||||
print("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes),
|
||||
print_log("Running Nodes: %s; Capacity: %s; Running Impact: %s; Remaining Capacity: %s" % (str(running_nodes),
|
||||
str(task_capacity),
|
||||
str(running_impact),
|
||||
str(remaining_volume)))
|
||||
print("Ready Nodes: %s" % str(ready_nodes))
|
||||
print_log("Ready Nodes: %s" % str(ready_nodes))
|
||||
for task_node in ready_nodes:
|
||||
node_obj = task_node['node_object']
|
||||
node_args = task_node['metadata']
|
||||
@ -248,7 +251,7 @@ def process_graph(graph, task_capacity):
|
||||
continue
|
||||
remaining_volume -= impact
|
||||
running_impact += impact
|
||||
print("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume)))
|
||||
print_log("Started Node: %s (capacity hit: %s) Remaining Capacity: %s" % (str(node_obj), str(impact), str(remaining_volume)))
|
||||
|
||||
def run_taskmanager(command_port):
|
||||
''' Receive task start and finish signals to rebuild a dependency graph and manage the actual running of tasks '''
|
||||
@ -264,7 +267,7 @@ def run_taskmanager(command_port):
|
||||
command_context = zmq.Context()
|
||||
command_socket = command_context.socket(zmq.PULL)
|
||||
command_socket.bind(command_port)
|
||||
print("Listening on %s" % command_port)
|
||||
print_log("Listening on %s" % command_port)
|
||||
last_rebuild = datetime.datetime.fromtimestamp(0)
|
||||
while True:
|
||||
try:
|
||||
@ -273,7 +276,7 @@ def run_taskmanager(command_port):
|
||||
message = None
|
||||
if message is not None or (datetime.datetime.now() - last_rebuild).seconds > 10:
|
||||
if message is not None and 'pause' in message:
|
||||
print("Pause command received: %s" % str(message))
|
||||
print_log("Pause command received: %s" % str(message))
|
||||
paused = message['pause']
|
||||
graph = rebuild_graph(message)
|
||||
if not paused and graph is not None:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user