mirror of https://github.com/ansible/awx.git
Keep callback receiver working
* remove any code that is not used by the callback receiver
@@ -33,45 +33,7 @@ class Control(object):
             workers.append(r.get(key).decode('utf-8'))
         return '\n'.join(workers)
-
-    def running(self, *args, **kwargs):
-        return self.control_with_reply('running', *args, **kwargs)
-
-    def cancel(self, task_ids, with_reply=True):
-        if with_reply:
-            return self.control_with_reply('cancel', extra_data={'task_ids': task_ids})
-        else:
-            self.control({'control': 'cancel', 'task_ids': task_ids, 'reply_to': None}, extra_data={'task_ids': task_ids})
-
-    def schedule(self, *args, **kwargs):
-        return self.control_with_reply('schedule', *args, **kwargs)
 
     @classmethod
     def generate_reply_queue_name(cls):
         return f"reply_to_{str(uuid.uuid4()).replace('-','_')}"
 
-    def control_with_reply(self, command, timeout=5, extra_data=None):
-        logger.warning('checking {} {} for {}'.format(self.service, command, self.queuename))
-        reply_queue = Control.generate_reply_queue_name()
-        self.result = None
-
-        if not connection.get_autocommit():
-            raise RuntimeError('Control-with-reply messages can only be done in autocommit mode')
-
-        with pg_bus_conn(select_timeout=timeout) as conn:
-            conn.listen(reply_queue)
-            send_data = {'control': command, 'reply_to': reply_queue}
-            if extra_data:
-                send_data.update(extra_data)
-            conn.notify(self.queuename, json.dumps(send_data))
-
-            for reply in conn.events(yield_timeouts=True):
-                if reply is None:
-                    logger.error(f'{self.service} did not reply within {timeout}s')
-                    raise RuntimeError(f"{self.service} did not reply within {timeout}s")
-                break
-
-            return json.loads(reply.payload)
-
-    def control(self, msg, **kwargs):
-        with pg_bus_conn() as conn:
-            conn.notify(self.queuename, json.dumps(msg))
@@ -1,13 +1,10 @@
 import logging
 import os
-import random
-import signal
 import sys
 import time
 import traceback
 from datetime import datetime, timezone
 from uuid import uuid4
-import json
 
 import collections
 from multiprocessing import Process
@@ -17,19 +14,9 @@ from queue import Full as QueueFull, Empty as QueueEmpty
 from django.conf import settings
 from django.db import connection as django_connection, connections
 from django.core.cache import cache as django_cache
-from django.utils.timezone import now as tz_now
-from django_guid import set_guid
 from jinja2 import Template
 import psutil
 
-from ansible_base.lib.logging.runtime import log_excess_runtime
-
-from awx.main.models import UnifiedJob
-from awx.main.dispatch import reaper
-from awx.main.utils.common import get_mem_effective_capacity, get_corrected_memory, get_corrected_cpu, get_cpu_effective_capacity
-
-# ansible-runner
-from ansible_runner.utils.capacity import get_mem_in_bytes, get_cpu_count
 
 if 'run_callback_receiver' in sys.argv:
     logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -37,8 +24,6 @@ else:
     logger = logging.getLogger('awx.main.dispatch')
 
 
-RETIRED_SENTINEL_TASK = "[retired]"
-
 
 class NoOpResultQueue(object):
     def put(self, item):
@@ -94,7 +79,7 @@ class PoolWorker(object):
         if self.retiring:
             uuid = body.get('uuid', 'N/A') if isinstance(body, dict) else 'N/A'
             logger.info(f"Worker pid:{self.pid} is retiring. Refusing new task {uuid}.")
-            raise QueueFull("Worker is retiring and not accepting new tasks") # AutoscalePool.write handles QueueFull
+            raise QueueFull("Worker is retiring and not accepting new tasks")
         uuid = '?'
         if isinstance(body, dict):
             if not body.get('uuid'):
@@ -164,8 +149,6 @@ class PoolWorker(object):
             # the purpose of self.managed_tasks is to just track internal
             # state of which events are *currently* being processed.
             logger.warning('Event UUID {} appears to be have been duplicated.'.format(uuid))
-        if self.retiring:
-            self.managed_tasks[RETIRED_SENTINEL_TASK] = {'task': RETIRED_SENTINEL_TASK}
 
     @property
     def current_task(self):
@@ -213,10 +196,6 @@ class PoolWorker(object):
         return not self.busy
 
 
-class StatefulPoolWorker(PoolWorker):
-    track_managed_tasks = True
-
-
 class WorkerPool(object):
     """
     Creates a pool of forked PoolWorkers.
@@ -328,256 +307,3 @@ class WorkerPool(object):
        except Exception:
            logger.exception('could not kill {}'.format(worker.pid))
 
-
-def get_auto_max_workers():
-    """Method we normally rely on to get max_workers
-
-    Uses almost same logic as Instance.local_health_check
-    The important thing is to be MORE than Instance.capacity
-    so that the task-manager does not over-schedule this node
-
-    Ideally we would just use the capacity from the database plus reserve workers,
-    but this poses some bootstrap problems where OCP task containers
-    register themselves after startup
-    """
-    # Get memory from ansible-runner
-    total_memory_gb = get_mem_in_bytes()
-
-    # This may replace memory calculation with a user override
-    corrected_memory = get_corrected_memory(total_memory_gb)
-
-    # Get same number as max forks based on memory, this function takes memory as bytes
-    mem_capacity = get_mem_effective_capacity(corrected_memory, is_control_node=True)
-
-    # Follow same process for CPU capacity constraint
-    cpu_count = get_cpu_count()
-    corrected_cpu = get_corrected_cpu(cpu_count)
-    cpu_capacity = get_cpu_effective_capacity(corrected_cpu, is_control_node=True)
-
-    # Here is what is different from health checks,
-    auto_max = max(mem_capacity, cpu_capacity)
-
-    # add magic number of extra workers to ensure
-    # we have a few extra workers to run the heartbeat
-    auto_max += 7
-
-    return auto_max
-
-
-class AutoscalePool(WorkerPool):
-    """
-    An extended pool implementation that automatically scales workers up and
-    down based on demand
-    """
-
-    pool_cls = StatefulPoolWorker
-
-    def __init__(self, *args, **kwargs):
-        self.max_workers = kwargs.pop('max_workers', None)
-        self.max_worker_lifetime_seconds = kwargs.pop(
-            'max_worker_lifetime_seconds', getattr(settings, 'WORKER_MAX_LIFETIME_SECONDS', 14400)
-        ) # Default to 4 hours
-        super(AutoscalePool, self).__init__(*args, **kwargs)
-
-        if self.max_workers is None:
-            self.max_workers = get_auto_max_workers()
-
-        # max workers can't be less than min_workers
-        self.max_workers = max(self.min_workers, self.max_workers)
-
-        # the task manager enforces settings.TASK_MANAGER_TIMEOUT on its own
-        # but if the task takes longer than the time defined here, we will force it to stop here
-        self.task_manager_timeout = settings.TASK_MANAGER_TIMEOUT + settings.TASK_MANAGER_TIMEOUT_GRACE_PERIOD
-
-        # initialize some things for subsystem metrics periodic gathering
-        # the AutoscalePool class does not save these to redis directly, but reports via produce_subsystem_metrics
-        self.scale_up_ct = 0
-        self.worker_count_max = 0
-
-        # last time we wrote current tasks, to avoid too much log spam
-        self.last_task_list_log = time.monotonic()
-
-    def produce_subsystem_metrics(self, metrics_object):
-        metrics_object.set('dispatcher_pool_scale_up_events', self.scale_up_ct)
-        metrics_object.set('dispatcher_pool_active_task_count', sum(len(w.managed_tasks) for w in self.workers))
-        metrics_object.set('dispatcher_pool_max_worker_count', self.worker_count_max)
-        self.worker_count_max = len(self.workers)
-
-    @property
-    def should_grow(self):
-        if len(self.workers) < self.min_workers:
-            # If we don't have at least min_workers, add more
-            return True
-        # If every worker is busy doing something, add more
-        return all([w.busy for w in self.workers])
-
-    @property
-    def full(self):
-        return len(self.workers) == self.max_workers
-
-    @property
-    def debug_meta(self):
-        return 'min={} max={}'.format(self.min_workers, self.max_workers)
-
-    @log_excess_runtime(logger, debug_cutoff=0.05, cutoff=0.2)
-    def cleanup(self):
-        """
-        Perform some internal account and cleanup. This is run on
-        every cluster node heartbeat:
-
-        1. Discover worker processes that exited, and recover messages they
-           were handling.
-        2. Clean up unnecessary, idle workers.
-
-        IMPORTANT: this function is one of the few places in the dispatcher
-        (aside from setting lookups) where we talk to the database. As such,
-        if there's an outage, this method _can_ throw various
-        django.db.utils.Error exceptions. Act accordingly.
-        """
-        orphaned = []
-        for w in self.workers[::]:
-            is_retirement_age = self.max_worker_lifetime_seconds is not None and w.age > self.max_worker_lifetime_seconds
-            if not w.alive:
-                # the worker process has exited
-                # 1. take the task it was running and enqueue the error
-                #    callbacks
-                # 2. take any pending tasks delivered to its queue and
-                #    send them to another worker
-                logger.error('worker pid:{} is gone (exit={})'.format(w.pid, w.exitcode))
-                if w.current_task:
-                    if w.current_task == {'task': RETIRED_SENTINEL_TASK}:
-                        logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age))
-                        self.workers.remove(w)
-                        continue
-                    if w.current_task != 'QUIT':
-                        try:
-                            for j in UnifiedJob.objects.filter(celery_task_id=w.current_task['uuid']):
-                                reaper.reap_job(j, 'failed')
-                        except Exception:
-                            logger.exception('failed to reap job UUID {}'.format(w.current_task['uuid']))
-                    else:
-                        logger.warning(f'Worker was told to quit but has not, pid={w.pid}')
-                orphaned.extend(w.orphaned_tasks)
-                self.workers.remove(w)
-
-            elif w.idle and len(self.workers) > self.min_workers:
-                # the process has an empty queue (it's idle) and we have
-                # more processes in the pool than we need (> min)
-                # send this process a message so it will exit gracefully
-                # at the next opportunity
-                logger.debug('scaling down worker pid:{}'.format(w.pid))
-                w.quit()
-                self.workers.remove(w)
-
-            elif w.idle and is_retirement_age:
-                logger.debug('scaling down worker pid:{} due to worker age: {}'.format(w.pid, w.age))
-                w.quit()
-                self.workers.remove(w)
-
-            elif is_retirement_age and not w.retiring and not w.idle:
-                logger.info(
-                    f"Worker pid:{w.pid} (age: {w.age:.0f}s) exceeded max lifetime ({self.max_worker_lifetime_seconds:.0f}s). "
-                    "Signaling for graceful retirement."
-                )
-                # Send QUIT signal; worker will finish current task then exit.
-                w.quit()
-                # mark as retiring to reject any future tasks that might be assigned in meantime
-                w.retiring = True
-
-            if w.alive:
-                # if we discover a task manager invocation that's been running
-                # too long, reap it (because otherwise it'll just hold the postgres
-                # advisory lock forever); the goal of this code is to discover
-                # deadlocks or other serious issues in the task manager that cause
-                # the task manager to never do more work
-                current_task = w.current_task
-                if current_task and isinstance(current_task, dict):
-                    endings = ('tasks.task_manager', 'tasks.dependency_manager', 'tasks.workflow_manager')
-                    current_task_name = current_task.get('task', '')
-                    if current_task_name.endswith(endings):
-                        if 'started' not in current_task:
-                            w.managed_tasks[current_task['uuid']]['started'] = time.time()
-                        age = time.time() - current_task['started']
-                        w.managed_tasks[current_task['uuid']]['age'] = age
-                        if age > self.task_manager_timeout:
-                            logger.error(f'{current_task_name} has held the advisory lock for {age}, sending SIGUSR1 to {w.pid}')
-                            os.kill(w.pid, signal.SIGUSR1)
-
-        for m in orphaned:
-            # if all the workers are dead, spawn at least one
-            if not len(self.workers):
-                self.up()
-            idx = random.choice(range(len(self.workers)))
-            self.write(idx, m)
-
-    def add_bind_kwargs(self, body):
-        bind_kwargs = body.pop('bind_kwargs', [])
-        body.setdefault('kwargs', {})
-        if 'dispatch_time' in bind_kwargs:
-            body['kwargs']['dispatch_time'] = tz_now().isoformat()
-        if 'worker_tasks' in bind_kwargs:
-            worker_tasks = {}
-            for worker in self.workers:
-                worker.calculate_managed_tasks()
-                worker_tasks[worker.pid] = list(worker.managed_tasks.keys())
-            body['kwargs']['worker_tasks'] = worker_tasks
-
-    def up(self):
-        if self.full:
-            # if we can't spawn more workers, just toss this message into a
-            # random worker's backlog
-            idx = random.choice(range(len(self.workers)))
-            return idx, self.workers[idx]
-        else:
-            self.scale_up_ct += 1
-            ret = super(AutoscalePool, self).up()
-            new_worker_ct = len(self.workers)
-            if new_worker_ct > self.worker_count_max:
-                self.worker_count_max = new_worker_ct
-            return ret
-
-    @staticmethod
-    def fast_task_serialization(current_task):
-        try:
-            return str(current_task.get('task')) + ' - ' + str(sorted(current_task.get('args', []))) + ' - ' + str(sorted(current_task.get('kwargs', {})))
-        except Exception:
-            # just make sure this does not make things worse
-            return str(current_task)
-
-    def write(self, preferred_queue, body):
-        if 'guid' in body:
-            set_guid(body['guid'])
-        try:
-            if isinstance(body, dict) and body.get('bind_kwargs'):
-                self.add_bind_kwargs(body)
-            if self.should_grow:
-                self.up()
-            # we don't care about "preferred queue" round robin distribution, just
-            # find the first non-busy worker and claim it
-            workers = self.workers[:]
-            random.shuffle(workers)
-            for w in workers:
-                if not w.busy:
-                    w.put(body)
-                    break
-            else:
-                task_name = 'unknown'
-                if isinstance(body, dict):
-                    task_name = body.get('task')
-                logger.warning(f'Workers maxed, queuing {task_name}, load: {sum(len(w.managed_tasks) for w in self.workers)} / {len(self.workers)}')
-                # Once every 10 seconds write out task list for debugging
-                if time.monotonic() - self.last_task_list_log >= 10.0:
-                    task_counts = {}
-                    for worker in self.workers:
-                        task_slug = self.fast_task_serialization(worker.current_task)
-                        task_counts.setdefault(task_slug, 0)
-                        task_counts[task_slug] += 1
-                    logger.info(f'Running tasks by count:\n{json.dumps(task_counts, indent=2)}')
-                    self.last_task_list_log = time.monotonic()
-                return super(AutoscalePool, self).write(preferred_queue, body)
-        except Exception:
-            for conn in connections.all():
-                # If the database connection has a hiccup, re-establish a new
-                # connection
-                conn.close_if_unusable_or_obsolete()
-            logger.exception('failed to write inbound message')
@@ -1,3 +1,3 @@
-from .base import AWXConsumerRedis, AWXConsumerPG, BaseWorker # noqa
+from .base import AWXConsumerRedis, BaseWorker # noqa
 from .callback import CallbackBrokerWorker # noqa
 from .task import TaskWorker # noqa
@@ -6,25 +6,17 @@ import logging
 import signal
 import sys
 import redis
-import json
-import psycopg
 import time
-from uuid import UUID
 from queue import Empty as QueueEmpty
-from datetime import timedelta
 
 from django import db
 from django.conf import settings
 import redis.exceptions
 
-from ansible_base.lib.logging.runtime import log_excess_runtime
 
 from awx.main.utils.redis import get_redis_client
 from awx.main.dispatch.pool import WorkerPool
-from awx.main.dispatch.periodic import Scheduler
-from awx.main.dispatch import pg_bus_conn
 from awx.main.utils.db import set_connection_name
-import awx.main.analytics.subsystem_metrics as s_metrics
 
 if 'run_callback_receiver' in sys.argv:
     logger = logging.getLogger('awx.main.commands.run_callback_receiver')
@@ -62,85 +54,6 @@ class AWXConsumerBase(object):
         self.pool.init_workers(self.worker.work_loop)
         self.redis = get_redis_client()
 
-    @property
-    def listening_on(self):
-        return f'listening on {self.queues}'
-
-    def control(self, body):
-        logger.warning(f'Received control signal:\n{body}')
-        control = body.get('control')
-        if control in ('status', 'schedule', 'running', 'cancel'):
-            reply_queue = body['reply_to']
-            if control == 'status':
-                msg = '\n'.join([self.listening_on, self.pool.debug()])
-            if control == 'schedule':
-                msg = self.scheduler.debug()
-            elif control == 'running':
-                msg = []
-                for worker in self.pool.workers:
-                    worker.calculate_managed_tasks()
-                    msg.extend(worker.managed_tasks.keys())
-            elif control == 'cancel':
-                msg = []
-                task_ids = set(body['task_ids'])
-                for worker in self.pool.workers:
-                    task = worker.current_task
-                    if task and task['uuid'] in task_ids:
-                        logger.warn(f'Sending SIGTERM to task id={task["uuid"]}, task={task.get("task")}, args={task.get("args")}')
-                        os.kill(worker.pid, signal.SIGTERM)
-                        msg.append(task['uuid'])
-                if task_ids and not msg:
-                    logger.info(f'Could not locate running tasks to cancel with ids={task_ids}')
-
-            if reply_queue is not None:
-                with pg_bus_conn() as conn:
-                    conn.notify(reply_queue, json.dumps(msg))
-        elif control == 'reload':
-            for worker in self.pool.workers:
-                worker.quit()
-        else:
-            logger.error('unrecognized control message: {}'.format(control))
-
-    def dispatch_task(self, body):
-        """This will place the given body into a worker queue to run method decorated as a task"""
-        if isinstance(body, dict):
-            body['time_ack'] = time.time()
-
-        if len(self.pool):
-            if "uuid" in body and body['uuid']:
-                try:
-                    queue = UUID(body['uuid']).int % len(self.pool)
-                except Exception:
-                    queue = self.total_messages % len(self.pool)
-            else:
-                queue = self.total_messages % len(self.pool)
-        else:
-            queue = 0
-        self.pool.write(queue, body)
-        self.total_messages += 1
-
-    def process_task(self, body):
-        """Routes the task details in body as either a control task or a task-task"""
-        if 'control' in body:
-            try:
-                return self.control(body)
-            except Exception:
-                logger.exception(f"Exception handling control message: {body}")
-            return
-        self.dispatch_task(body)
-
-    @log_excess_runtime(logger, debug_cutoff=0.05, cutoff=0.2)
-    def record_statistics(self):
-        if time.time() - self.last_stats > 1: # buffer stat recording to once per second
-            save_data = self.pool.debug()
-            try:
-                self.redis.set(f'awx_{self.name}_statistics', save_data)
-            except redis.exceptions.ConnectionError as exc:
-                logger.warning(f'Redis connection error saving {self.name} status data:\n{exc}\nmissed data:\n{save_data}')
-            except Exception:
-                logger.exception(f"Unknown redis error saving {self.name} status data:\nmissed data:\n{save_data}")
-            self.last_stats = time.time()
-
     def run(self, *args, **kwargs):
         signal.signal(signal.SIGINT, self.stop)
         signal.signal(signal.SIGTERM, self.stop)
@@ -165,140 +78,6 @@ class AWXConsumerRedis(AWXConsumerBase):
             time.sleep(60)
 
 
-class AWXConsumerPG(AWXConsumerBase):
-    def __init__(self, *args, schedule=None, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.pg_max_wait = getattr(settings, 'DISPATCHER_DB_DOWNTOWN_TOLLERANCE', settings.DISPATCHER_DB_DOWNTIME_TOLERANCE)
-        # if no successful loops have ran since startup, then we should fail right away
-        self.pg_is_down = True # set so that we fail if we get database errors on startup
-        init_time = time.time()
-        self.pg_down_time = init_time - self.pg_max_wait # allow no grace period
-        self.last_cleanup = init_time
-        self.subsystem_metrics = s_metrics.DispatcherMetrics(auto_pipe_execute=False)
-        self.last_metrics_gather = init_time
-        self.listen_cumulative_time = 0.0
-        if schedule:
-            schedule = schedule.copy()
-        else:
-            schedule = {}
-        # add control tasks to be ran at regular schedules
-        # NOTE: if we run out of database connections, it is important to still run cleanup
-        # so that we scale down workers and free up connections
-        schedule['pool_cleanup'] = {'control': self.pool.cleanup, 'schedule': timedelta(seconds=60)}
-        # record subsystem metrics for the dispatcher
-        schedule['metrics_gather'] = {'control': self.record_metrics, 'schedule': timedelta(seconds=20)}
-        self.scheduler = Scheduler(schedule)
-
-    @log_excess_runtime(logger, debug_cutoff=0.05, cutoff=0.2)
-    def record_metrics(self):
-        current_time = time.time()
-        self.pool.produce_subsystem_metrics(self.subsystem_metrics)
-        self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
-        try:
-            self.subsystem_metrics.pipe_execute()
-        except redis.exceptions.ConnectionError as exc:
-            logger.warning(f'Redis connection error saving dispatcher metrics, error:\n{exc}')
-        self.listen_cumulative_time = 0.0
-        self.last_metrics_gather = current_time
-
-    def run_periodic_tasks(self):
-        """
-        Run general periodic logic, and return maximum time in seconds before
-        the next requested run
-        This may be called more often than that when events are consumed
-        so this should be very efficient in that
-        """
-        try:
-            self.record_statistics() # maintains time buffer in method
-        except Exception as exc:
-            logger.warning(f'Failed to save dispatcher statistics {exc}')
-
-        # Everything benchmarks to the same original time, so that skews due to
-        # runtime of the actions, themselves, do not mess up scheduling expectations
-        reftime = time.time()
-
-        for job in self.scheduler.get_and_mark_pending(reftime=reftime):
-            if 'control' in job.data:
-                try:
-                    job.data['control']()
-                except Exception:
-                    logger.exception(f'Error running control task {job.data}')
-            elif 'task' in job.data:
-                body = self.worker.resolve_callable(job.data['task']).get_async_body()
-                # bypasses pg_notify for scheduled tasks
-                self.dispatch_task(body)
-
-        if self.pg_is_down:
-            logger.info('Dispatcher listener connection established')
-            self.pg_is_down = False
-
-        self.listen_start = time.time()
-
-        return self.scheduler.time_until_next_run(reftime=reftime)
-
-    def run(self, *args, **kwargs):
-        super(AWXConsumerPG, self).run(*args, **kwargs)
-
-        logger.info(f"Running {self.name}, workers min={self.pool.min_workers} max={self.pool.max_workers}, listening to queues {self.queues}")
-        init = False
-
-        while True:
-            try:
-                with pg_bus_conn(new_connection=True) as conn:
-                    for queue in self.queues:
-                        conn.listen(queue)
-                    if init is False:
-                        self.worker.on_start()
-                        init = True
-                    # run_periodic_tasks run scheduled actions and gives time until next scheduled action
-                    # this is saved to the conn (PubSub) object in order to modify read timeout in-loop
-                    conn.select_timeout = self.run_periodic_tasks()
-                    # this is the main operational loop for awx-manage run_dispatcher
-                    for e in conn.events(yield_timeouts=True):
-                        self.listen_cumulative_time += time.time() - self.listen_start # for metrics
-                        if e is not None:
-                            self.process_task(json.loads(e.payload))
-                        conn.select_timeout = self.run_periodic_tasks()
-                        if self.should_stop:
-                            return
-            except psycopg.InterfaceError:
-                logger.warning("Stale Postgres message bus connection, reconnecting")
-                continue
-            except (db.DatabaseError, psycopg.OperationalError):
-                # If we have attained stady state operation, tolerate short-term database hickups
-                if not self.pg_is_down:
-                    logger.exception(f"Error consuming new events from postgres, will retry for {self.pg_max_wait} s")
-                    self.pg_down_time = time.time()
-                    self.pg_is_down = True
-                current_downtime = time.time() - self.pg_down_time
-                if current_downtime > self.pg_max_wait:
-                    logger.exception(f"Postgres event consumer has not recovered in {current_downtime} s, exiting")
-                    # Sending QUIT to multiprocess queue to signal workers to exit
-                    for worker in self.pool.workers:
-                        try:
-                            worker.quit()
-                        except Exception:
-                            logger.exception(f"Error sending QUIT to worker {worker}")
-                    raise
-                # Wait for a second before next attempt, but still listen for any shutdown signals
-                for i in range(10):
-                    if self.should_stop:
-                        return
-                    time.sleep(0.1)
-                for conn in db.connections.all():
-                    conn.close_if_unusable_or_obsolete()
-            except Exception:
-                # Log unanticipated exception in addition to writing to stderr to get timestamps and other metadata
-                logger.exception('Encountered unhandled error in dispatcher main loop')
-                # Sending QUIT to multiprocess queue to signal workers to exit
-                for worker in self.pool.workers:
-                    try:
-                        worker.quit()
-                    except Exception:
-                        logger.exception(f"Error sending QUIT to worker {worker}")
-                raise
-
-
 class BaseWorker(object):
     def read(self, queue):
         return queue.get(block=True, timeout=1)
@@ -15,6 +15,9 @@ import subprocess
 import tempfile
 from collections import OrderedDict
 
+# Dispatcher
+from dispatcherd.factories import get_control_from_settings
+
 # Django
 from django.conf import settings
 from django.db import models, connection, transaction
@@ -1499,7 +1502,6 @@ class UnifiedJob(
         # Special case for task manager (used during workflow job cancellation)
         if not connection.get_autocommit():
             try:
-                from dispatcherd.factories import get_control_from_settings
 
                 ctl = get_control_from_settings()
                 ctl.control('cancel', data={'uuid': self.celery_task_id})
@@ -1510,7 +1512,6 @@ class UnifiedJob(
         # Standard case with reply
         try:
             timeout = 5
-            from dispatcherd.factories import get_control_from_settings
 
             ctl = get_control_from_settings()
             results = ctl.control_with_reply('cancel', data={'uuid': self.celery_task_id}, expected_replies=1, timeout=timeout)
@@ -622,40 +622,8 @@ def inspect_execution_and_hop_nodes(instance_list):
            execution_node_health_check.apply_async([hostname])
 
 
-@task(queue=get_task_queuename, bind_kwargs=['dispatch_time', 'worker_tasks'])
-def cluster_node_heartbeat(dispatch_time=None, worker_tasks=None):
-    """
-    Original implementation for AWX dispatcher.
-    Uses worker_tasks from bind_kwargs to track running tasks.
-    """
-    # Run common instance management logic
-    this_inst, instance_list, lost_instances = _heartbeat_instance_management()
-    if this_inst is None:
-        return # Early return case from instance management
-
-    # Check versions
-    _heartbeat_check_versions(this_inst, instance_list)
-
-    # Handle lost instances
-    _heartbeat_handle_lost_instances(lost_instances, this_inst)
-
-    # Run local reaper - original implementation using worker_tasks
-    if worker_tasks is not None:
-        active_task_ids = []
-        for task_list in worker_tasks.values():
-            active_task_ids.extend(task_list)
-
-        # Convert dispatch_time to datetime
-        ref_time = datetime.fromisoformat(dispatch_time) if dispatch_time else now()
-
-        reaper.reap(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time)
-
-        if max(len(task_list) for task_list in worker_tasks.values()) <= 1:
-            reaper.reap_waiting(instance=this_inst, excluded_uuids=active_task_ids, ref_time=ref_time)
-
-
 @task(queue=get_task_queuename, bind=True)
-def adispatch_cluster_node_heartbeat(binder):
+def cluster_node_heartbeat(binder):
     """
     Dispatcherd implementation.
     Uses Control API to get running tasks.
@@ -5,7 +5,7 @@ import pytest
 
 from awx.main.models import Job, WorkflowJob, Instance
 from awx.main.dispatch import reaper
-from awx.main.dispatch.publish import task
+from dispatcherd.publish import task
 
 '''
 Prevent logger.<warn, debug, error> calls from triggering database operations
@@ -454,7 +454,7 @@ for options in CELERYBEAT_SCHEDULE.values():
     task_name = options['task']
     # Handle the only one exception case of the heartbeat which has a new implementation
    if task_name == 'awx.main.tasks.system.cluster_node_heartbeat':
-        task_name = 'awx.main.tasks.system.adispatch_cluster_node_heartbeat'
+        task_name = 'awx.main.tasks.system.cluster_node_heartbeat'
     new_options['task'] = task_name
     new_options['schedule'] = options['schedule'].total_seconds()
     DISPATCHER_SCHEDULE[task_name] = new_options