diff --git a/awx/api/views.py b/awx/api/views.py index 495faa94df..3183ed2869 100644 --- a/awx/api/views.py +++ b/awx/api/views.py @@ -775,6 +775,7 @@ class UserList(ListCreateAPIView): model = User serializer_class = UserSerializer +@disallow_superuser_escalation class UserMeList(ListAPIView): model = User @@ -849,7 +850,7 @@ class UserActivityStreamList(SubListAPIView): return qs.filter(Q(actor=parent) | Q(user__in=[parent])) - +@disallow_superuser_escalation class UserDetail(RetrieveUpdateDestroyAPIView): model = User diff --git a/awx/main/management/commands/run_callback_receiver.py b/awx/main/management/commands/run_callback_receiver.py index 9f4ee9693f..304f3eefd7 100644 --- a/awx/main/management/commands/run_callback_receiver.py +++ b/awx/main/management/commands/run_callback_receiver.py @@ -35,6 +35,9 @@ class CallbackReceiver(object): def __init__(self): self.parent_mappings = {} + def print_log(self, message): + print("[%s] %s" % (now().isoformat(), message)) + def run_subscriber(self, use_workers=True): def shutdown_handler(active_workers): def _handler(signum, frame): @@ -61,10 +64,10 @@ class CallbackReceiver(object): signal.signal(signal.SIGINT, shutdown_handler([w])) signal.signal(signal.SIGTERM, shutdown_handler([w])) if settings.DEBUG: - print 'Started worker %s' % str(idx) + self.print_log('Started worker %s' % str(idx)) worker_queues.append([0, queue_actual, w]) elif settings.DEBUG: - print 'Started callback receiver (no workers)' + self.print_log('Started callback receiver (no workers)') main_process = Process( target=self.callback_handler, @@ -207,12 +210,12 @@ class CallbackReceiver(object): return job_event except DatabaseError as e: # Log the error and try again. - print('Database error saving job event, retrying in ' - '1 second (retry #%d): %s', retry_count + 1, e) + self.print_log('Database error saving job event, retrying in ' + '1 second (retry #%d): %s', retry_count + 1, e) time.sleep(1) # We failed too many times, and are giving up. - print('Failed to save job event after %d retries.', retry_count) + self.print_log('Failed to save job event after %d retries.', retry_count) return None def callback_worker(self, queue_actual): @@ -222,7 +225,7 @@ class CallbackReceiver(object): self.process_job_event(message) messages_processed += 1 if messages_processed >= MAX_REQUESTS: - print("Shutting down message receiver") + self.print_log("Shutting down message receiver") break class Command(NoArgsCommand): diff --git a/awx/main/management/commands/run_socketio_service.py b/awx/main/management/commands/run_socketio_service.py index e7bd1fd643..20afe8fc86 100644 --- a/awx/main/management/commands/run_socketio_service.py +++ b/awx/main/management/commands/run_socketio_service.py @@ -32,6 +32,9 @@ from socketio import socketio_manage from socketio.server import SocketIOServer from socketio.namespace import BaseNamespace +def print_log(message): + print("[%s] %s" % (now().isoformat(), message)) + class TowerBaseNamespace(BaseNamespace): def get_allowed_methods(self): @@ -67,7 +70,7 @@ class TowerBaseNamespace(BaseNamespace): class TestNamespace(TowerBaseNamespace): def recv_connect(self): - print("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR'])) + print_log("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR'])) self.emit('test', "If you see this then you are connected to the test socket endpoint") class JobNamespace(TowerBaseNamespace): @@ -76,7 +79,7 @@ class JobNamespace(TowerBaseNamespace): return ['summary_complete', 'status_changed'] def recv_connect(self): - print("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR'])) + print_log("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR'])) class JobEventNamespace(TowerBaseNamespace): @@ -87,11 +90,11 @@ class JobEventNamespace(TowerBaseNamespace): else: user_jobs = get_user_queryset(valid_user, Job).filter(finished__isnull=True) visible_jobs = set(['recv_connect'] + ["job_events-%s" % str(j.id) for j in user_jobs]) - print("Visible jobs: " + str(visible_jobs)) + print_log("Visible jobs: " + str(visible_jobs)) return visible_jobs def recv_connect(self): - print("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR'])) + print_log("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR'])) class ScheduleNamespace(TowerBaseNamespace): @@ -99,7 +102,7 @@ class ScheduleNamespace(TowerBaseNamespace): return ["schedule_changed"] def recv_connect(self): - print("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR'])) + print_log("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR'])) class TowerSocket(object): diff --git a/awx/main/management/commands/run_task_system.py b/awx/main/management/commands/run_task_system.py index d83371c150..e44a96a6bf 100644 --- a/awx/main/management/commands/run_task_system.py +++ b/awx/main/management/commands/run_task_system.py @@ -32,6 +32,11 @@ from celery.task.control import inspect queue = FifoQueue('tower_task_manager') + +def print_log(message): + print("[%s] %s" % (now().isoformat(), message)) + + class SimpleDAG(object): ''' A simple implementation of a directed acyclic graph ''' @@ -163,7 +168,7 @@ def rebuild_graph(message): if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'): active_task_queues = inspector.active() else: - print("Ignoring celery task inspector") + print_log("Ignoring celery task inspector") active_task_queues = None all_sorted_tasks = get_tasks() @@ -176,9 +181,9 @@ def rebuild_graph(message): active_tasks += [at['id'] for at in active_task_queues[queue]] else: if settings.DEBUG: - print("Could not communicate with celery!") - # TODO: Something needs to be done here to signal to the system - # as a whole that celery appears to be down. + print_log("Could not communicate with celery!") + # TODO: Something needs to be done here to signal to the system + # as a whole that celery appears to be down. if not hasattr(settings, 'CELERY_UNIT_TEST'): return None running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks) @@ -186,7 +191,7 @@ def rebuild_graph(message): new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks) # Check running tasks and make sure they are active in celery - print("Active celery tasks: " + str(active_tasks)) + print_log("Active celery tasks: " + str(active_tasks)) for task in list(running_tasks): if (task.celery_task_id not in active_tasks and not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')): @@ -200,15 +205,13 @@ def rebuild_graph(message): task.save() task.socketio_emit_status("failed") running_tasks.pop(running_tasks.index(task)) - print("Task %s appears orphaned... marking as failed" % task) + print_log("Task %s appears orphaned... marking as failed" % task) # Create and process dependencies for new tasks for task in new_tasks: - print("Checking dependencies for: %s" % str(task)) - #TODO: other 'new' tasks? Need to investigate this scenario - task_dependencies = task.generate_dependencies(running_tasks + - waiting_tasks) - print("New dependencies: %s" % str(task_dependencies)) + print_log("Checking dependencies for: %s" % str(task)) + task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario + print_log("New dependencies: %s" % str(task_dependencies)) for dep in task_dependencies: # We recalculate the created time for the moment to ensure the # dependencies are always sorted in the right order relative to @@ -247,11 +250,11 @@ def process_graph(graph, task_capacity): running_impact = sum([t['node_object'].task_impact for t in running_nodes]) ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes) remaining_volume = task_capacity - running_impact - print('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' - 'Remaining Capacity: %s' % - (str(running_nodes), str(task_capacity), - str(running_impact), str(remaining_volume))) - print("Ready Nodes: %s" % str(ready_nodes)) + print_log('Running Nodes: %s; Capacity: %s; Running Impact: %s; ' + 'Remaining Capacity: %s' % + (str(running_nodes), str(task_capacity), + str(running_impact), str(remaining_volume))) + print_log("Ready Nodes: %s" % str(ready_nodes)) for task_node in ready_nodes: node_obj = task_node['node_object'] node_args = task_node['metadata'] @@ -278,8 +281,9 @@ def process_graph(graph, task_capacity): continue remaining_volume -= impact running_impact += impact - print('Started Node: %s (capacity hit: %s) Remaining Capacity: %s' % - (str(node_obj), str(impact), str(remaining_volume))) + print_log('Started Node: %s (capacity hit: %s) ' + 'Remaining Capacity: %s' % + (str(node_obj), str(impact), str(remaining_volume))) def run_taskmanager(): """Receive task start and finish signals to rebuild a dependency graph @@ -311,7 +315,7 @@ def run_taskmanager(): # appropriate. if (datetime.datetime.now() - last_rebuild).seconds > 10: if 'pause' in message: - print("Pause command received: %s" % str(message)) + print_log("Pause command received: %s" % str(message)) paused = message['pause'] graph = rebuild_graph(message) if not paused and graph is not None: