mirror of
https://github.com/ansible/awx.git
synced 2026-05-15 13:27:40 -02:30
Merge branch 'master' into expunge-zeromq-unstable
Conflicts: awx/main/management/commands/run_callback_receiver.py awx/main/management/commands/run_task_system.py
This commit is contained in:
@@ -775,6 +775,7 @@ class UserList(ListCreateAPIView):
|
|||||||
model = User
|
model = User
|
||||||
serializer_class = UserSerializer
|
serializer_class = UserSerializer
|
||||||
|
|
||||||
|
@disallow_superuser_escalation
|
||||||
class UserMeList(ListAPIView):
|
class UserMeList(ListAPIView):
|
||||||
|
|
||||||
model = User
|
model = User
|
||||||
@@ -849,7 +850,7 @@ class UserActivityStreamList(SubListAPIView):
|
|||||||
return qs.filter(Q(actor=parent) | Q(user__in=[parent]))
|
return qs.filter(Q(actor=parent) | Q(user__in=[parent]))
|
||||||
|
|
||||||
|
|
||||||
|
@disallow_superuser_escalation
|
||||||
class UserDetail(RetrieveUpdateDestroyAPIView):
|
class UserDetail(RetrieveUpdateDestroyAPIView):
|
||||||
|
|
||||||
model = User
|
model = User
|
||||||
|
|||||||
@@ -35,6 +35,9 @@ class CallbackReceiver(object):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.parent_mappings = {}
|
self.parent_mappings = {}
|
||||||
|
|
||||||
|
def print_log(self, message):
|
||||||
|
print("[%s] %s" % (now().isoformat(), message))
|
||||||
|
|
||||||
def run_subscriber(self, use_workers=True):
|
def run_subscriber(self, use_workers=True):
|
||||||
def shutdown_handler(active_workers):
|
def shutdown_handler(active_workers):
|
||||||
def _handler(signum, frame):
|
def _handler(signum, frame):
|
||||||
@@ -61,10 +64,10 @@ class CallbackReceiver(object):
|
|||||||
signal.signal(signal.SIGINT, shutdown_handler([w]))
|
signal.signal(signal.SIGINT, shutdown_handler([w]))
|
||||||
signal.signal(signal.SIGTERM, shutdown_handler([w]))
|
signal.signal(signal.SIGTERM, shutdown_handler([w]))
|
||||||
if settings.DEBUG:
|
if settings.DEBUG:
|
||||||
print 'Started worker %s' % str(idx)
|
self.print_log('Started worker %s' % str(idx))
|
||||||
worker_queues.append([0, queue_actual, w])
|
worker_queues.append([0, queue_actual, w])
|
||||||
elif settings.DEBUG:
|
elif settings.DEBUG:
|
||||||
print 'Started callback receiver (no workers)'
|
self.print_log('Started callback receiver (no workers)')
|
||||||
|
|
||||||
main_process = Process(
|
main_process = Process(
|
||||||
target=self.callback_handler,
|
target=self.callback_handler,
|
||||||
@@ -207,12 +210,12 @@ class CallbackReceiver(object):
|
|||||||
return job_event
|
return job_event
|
||||||
except DatabaseError as e:
|
except DatabaseError as e:
|
||||||
# Log the error and try again.
|
# Log the error and try again.
|
||||||
print('Database error saving job event, retrying in '
|
self.print_log('Database error saving job event, retrying in '
|
||||||
'1 second (retry #%d): %s', retry_count + 1, e)
|
'1 second (retry #%d): %s', retry_count + 1, e)
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
# We failed too many times, and are giving up.
|
# We failed too many times, and are giving up.
|
||||||
print('Failed to save job event after %d retries.', retry_count)
|
self.print_log('Failed to save job event after %d retries.', retry_count)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def callback_worker(self, queue_actual):
|
def callback_worker(self, queue_actual):
|
||||||
@@ -222,7 +225,7 @@ class CallbackReceiver(object):
|
|||||||
self.process_job_event(message)
|
self.process_job_event(message)
|
||||||
messages_processed += 1
|
messages_processed += 1
|
||||||
if messages_processed >= MAX_REQUESTS:
|
if messages_processed >= MAX_REQUESTS:
|
||||||
print("Shutting down message receiver")
|
self.print_log("Shutting down message receiver")
|
||||||
break
|
break
|
||||||
|
|
||||||
class Command(NoArgsCommand):
|
class Command(NoArgsCommand):
|
||||||
|
|||||||
@@ -32,6 +32,9 @@ from socketio import socketio_manage
|
|||||||
from socketio.server import SocketIOServer
|
from socketio.server import SocketIOServer
|
||||||
from socketio.namespace import BaseNamespace
|
from socketio.namespace import BaseNamespace
|
||||||
|
|
||||||
|
def print_log(message):
|
||||||
|
print("[%s] %s" % (now().isoformat(), message))
|
||||||
|
|
||||||
class TowerBaseNamespace(BaseNamespace):
|
class TowerBaseNamespace(BaseNamespace):
|
||||||
|
|
||||||
def get_allowed_methods(self):
|
def get_allowed_methods(self):
|
||||||
@@ -67,7 +70,7 @@ class TowerBaseNamespace(BaseNamespace):
|
|||||||
class TestNamespace(TowerBaseNamespace):
|
class TestNamespace(TowerBaseNamespace):
|
||||||
|
|
||||||
def recv_connect(self):
|
def recv_connect(self):
|
||||||
print("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
print_log("Received client connect for test namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||||
self.emit('test', "If you see this then you are connected to the test socket endpoint")
|
self.emit('test', "If you see this then you are connected to the test socket endpoint")
|
||||||
|
|
||||||
class JobNamespace(TowerBaseNamespace):
|
class JobNamespace(TowerBaseNamespace):
|
||||||
@@ -76,7 +79,7 @@ class JobNamespace(TowerBaseNamespace):
|
|||||||
return ['summary_complete', 'status_changed']
|
return ['summary_complete', 'status_changed']
|
||||||
|
|
||||||
def recv_connect(self):
|
def recv_connect(self):
|
||||||
print("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
print_log("Received client connect for job namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||||
|
|
||||||
class JobEventNamespace(TowerBaseNamespace):
|
class JobEventNamespace(TowerBaseNamespace):
|
||||||
|
|
||||||
@@ -87,11 +90,11 @@ class JobEventNamespace(TowerBaseNamespace):
|
|||||||
else:
|
else:
|
||||||
user_jobs = get_user_queryset(valid_user, Job).filter(finished__isnull=True)
|
user_jobs = get_user_queryset(valid_user, Job).filter(finished__isnull=True)
|
||||||
visible_jobs = set(['recv_connect'] + ["job_events-%s" % str(j.id) for j in user_jobs])
|
visible_jobs = set(['recv_connect'] + ["job_events-%s" % str(j.id) for j in user_jobs])
|
||||||
print("Visible jobs: " + str(visible_jobs))
|
print_log("Visible jobs: " + str(visible_jobs))
|
||||||
return visible_jobs
|
return visible_jobs
|
||||||
|
|
||||||
def recv_connect(self):
|
def recv_connect(self):
|
||||||
print("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
print_log("Received client connect for job event namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||||
|
|
||||||
class ScheduleNamespace(TowerBaseNamespace):
|
class ScheduleNamespace(TowerBaseNamespace):
|
||||||
|
|
||||||
@@ -99,7 +102,7 @@ class ScheduleNamespace(TowerBaseNamespace):
|
|||||||
return ["schedule_changed"]
|
return ["schedule_changed"]
|
||||||
|
|
||||||
def recv_connect(self):
|
def recv_connect(self):
|
||||||
print("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
print_log("Received client connect for schedule namespace from %s" % str(self.environ['REMOTE_ADDR']))
|
||||||
|
|
||||||
class TowerSocket(object):
|
class TowerSocket(object):
|
||||||
|
|
||||||
|
|||||||
@@ -32,6 +32,11 @@ from celery.task.control import inspect
|
|||||||
|
|
||||||
queue = FifoQueue('tower_task_manager')
|
queue = FifoQueue('tower_task_manager')
|
||||||
|
|
||||||
|
|
||||||
|
def print_log(message):
|
||||||
|
print("[%s] %s" % (now().isoformat(), message))
|
||||||
|
|
||||||
|
|
||||||
class SimpleDAG(object):
|
class SimpleDAG(object):
|
||||||
''' A simple implementation of a directed acyclic graph '''
|
''' A simple implementation of a directed acyclic graph '''
|
||||||
|
|
||||||
@@ -163,7 +168,7 @@ def rebuild_graph(message):
|
|||||||
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
|
if not hasattr(settings, 'IGNORE_CELERY_INSPECTOR'):
|
||||||
active_task_queues = inspector.active()
|
active_task_queues = inspector.active()
|
||||||
else:
|
else:
|
||||||
print("Ignoring celery task inspector")
|
print_log("Ignoring celery task inspector")
|
||||||
active_task_queues = None
|
active_task_queues = None
|
||||||
|
|
||||||
all_sorted_tasks = get_tasks()
|
all_sorted_tasks = get_tasks()
|
||||||
@@ -176,9 +181,9 @@ def rebuild_graph(message):
|
|||||||
active_tasks += [at['id'] for at in active_task_queues[queue]]
|
active_tasks += [at['id'] for at in active_task_queues[queue]]
|
||||||
else:
|
else:
|
||||||
if settings.DEBUG:
|
if settings.DEBUG:
|
||||||
print("Could not communicate with celery!")
|
print_log("Could not communicate with celery!")
|
||||||
# TODO: Something needs to be done here to signal to the system
|
# TODO: Something needs to be done here to signal to the system
|
||||||
# as a whole that celery appears to be down.
|
# as a whole that celery appears to be down.
|
||||||
if not hasattr(settings, 'CELERY_UNIT_TEST'):
|
if not hasattr(settings, 'CELERY_UNIT_TEST'):
|
||||||
return None
|
return None
|
||||||
running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
|
running_tasks = filter(lambda t: t.status == 'running', all_sorted_tasks)
|
||||||
@@ -186,7 +191,7 @@ def rebuild_graph(message):
|
|||||||
new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks)
|
new_tasks = filter(lambda t: t.status == 'pending', all_sorted_tasks)
|
||||||
|
|
||||||
# Check running tasks and make sure they are active in celery
|
# Check running tasks and make sure they are active in celery
|
||||||
print("Active celery tasks: " + str(active_tasks))
|
print_log("Active celery tasks: " + str(active_tasks))
|
||||||
for task in list(running_tasks):
|
for task in list(running_tasks):
|
||||||
if (task.celery_task_id not in active_tasks and
|
if (task.celery_task_id not in active_tasks and
|
||||||
not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
not hasattr(settings, 'IGNORE_CELERY_INSPECTOR')):
|
||||||
@@ -200,15 +205,13 @@ def rebuild_graph(message):
|
|||||||
task.save()
|
task.save()
|
||||||
task.socketio_emit_status("failed")
|
task.socketio_emit_status("failed")
|
||||||
running_tasks.pop(running_tasks.index(task))
|
running_tasks.pop(running_tasks.index(task))
|
||||||
print("Task %s appears orphaned... marking as failed" % task)
|
print_log("Task %s appears orphaned... marking as failed" % task)
|
||||||
|
|
||||||
# Create and process dependencies for new tasks
|
# Create and process dependencies for new tasks
|
||||||
for task in new_tasks:
|
for task in new_tasks:
|
||||||
print("Checking dependencies for: %s" % str(task))
|
print_log("Checking dependencies for: %s" % str(task))
|
||||||
#TODO: other 'new' tasks? Need to investigate this scenario
|
task_dependencies = task.generate_dependencies(running_tasks + waiting_tasks) #TODO: other 'new' tasks? Need to investigate this scenario
|
||||||
task_dependencies = task.generate_dependencies(running_tasks +
|
print_log("New dependencies: %s" % str(task_dependencies))
|
||||||
waiting_tasks)
|
|
||||||
print("New dependencies: %s" % str(task_dependencies))
|
|
||||||
for dep in task_dependencies:
|
for dep in task_dependencies:
|
||||||
# We recalculate the created time for the moment to ensure the
|
# We recalculate the created time for the moment to ensure the
|
||||||
# dependencies are always sorted in the right order relative to
|
# dependencies are always sorted in the right order relative to
|
||||||
@@ -247,11 +250,11 @@ def process_graph(graph, task_capacity):
|
|||||||
running_impact = sum([t['node_object'].task_impact for t in running_nodes])
|
running_impact = sum([t['node_object'].task_impact for t in running_nodes])
|
||||||
ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes)
|
ready_nodes = filter(lambda x: x['node_object'].status != 'running', leaf_nodes)
|
||||||
remaining_volume = task_capacity - running_impact
|
remaining_volume = task_capacity - running_impact
|
||||||
print('Running Nodes: %s; Capacity: %s; Running Impact: %s; '
|
print_log('Running Nodes: %s; Capacity: %s; Running Impact: %s; '
|
||||||
'Remaining Capacity: %s' %
|
'Remaining Capacity: %s' %
|
||||||
(str(running_nodes), str(task_capacity),
|
(str(running_nodes), str(task_capacity),
|
||||||
str(running_impact), str(remaining_volume)))
|
str(running_impact), str(remaining_volume)))
|
||||||
print("Ready Nodes: %s" % str(ready_nodes))
|
print_log("Ready Nodes: %s" % str(ready_nodes))
|
||||||
for task_node in ready_nodes:
|
for task_node in ready_nodes:
|
||||||
node_obj = task_node['node_object']
|
node_obj = task_node['node_object']
|
||||||
node_args = task_node['metadata']
|
node_args = task_node['metadata']
|
||||||
@@ -278,8 +281,9 @@ def process_graph(graph, task_capacity):
|
|||||||
continue
|
continue
|
||||||
remaining_volume -= impact
|
remaining_volume -= impact
|
||||||
running_impact += impact
|
running_impact += impact
|
||||||
print('Started Node: %s (capacity hit: %s) Remaining Capacity: %s' %
|
print_log('Started Node: %s (capacity hit: %s) '
|
||||||
(str(node_obj), str(impact), str(remaining_volume)))
|
'Remaining Capacity: %s' %
|
||||||
|
(str(node_obj), str(impact), str(remaining_volume)))
|
||||||
|
|
||||||
def run_taskmanager():
|
def run_taskmanager():
|
||||||
"""Receive task start and finish signals to rebuild a dependency graph
|
"""Receive task start and finish signals to rebuild a dependency graph
|
||||||
@@ -311,7 +315,7 @@ def run_taskmanager():
|
|||||||
# appropriate.
|
# appropriate.
|
||||||
if (datetime.datetime.now() - last_rebuild).seconds > 10:
|
if (datetime.datetime.now() - last_rebuild).seconds > 10:
|
||||||
if 'pause' in message:
|
if 'pause' in message:
|
||||||
print("Pause command received: %s" % str(message))
|
print_log("Pause command received: %s" % str(message))
|
||||||
paused = message['pause']
|
paused = message['pause']
|
||||||
graph = rebuild_graph(message)
|
graph = rebuild_graph(message)
|
||||||
if not paused and graph is not None:
|
if not paused and graph is not None:
|
||||||
|
|||||||
Reference in New Issue
Block a user