Mirror of https://github.com/ansible/awx.git (synced 2026-02-05 19:44:43 -03:30)

Compare commits (19 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 4cd90163fc | |
| | 8dc6ceffee | |
| | 2c7184f9d2 | |
| | 5cf93febaa | |
| | 284bd8377a | |
| | 14992cee17 | |
| | 6db663eacb | |
| | 87bb70bcc0 | |
| | c2d02841e8 | |
| | e5a6007bf1 | |
| | 6f9ea1892b | |
| | abc56305cc | |
| | 9bb6786a58 | |
| | aec9a9ca56 | |
| | 7e4cf859f5 | |
| | 90c3d8a275 | |
| | 6d1c8de4ed | |
| | 601b62deef | |
| | 131dd088cd | |
.github/workflows/pr_body_check_jira.yml (vendored, new file, +35 lines)
@@ -0,0 +1,35 @@
+---
+name: Check body for reference to jira
+on:
+  pull_request:
+    branches:
+      - release_**
+jobs:
+  pr-check:
+    if: github.repository_owner == 'ansible' && github.repository != 'awx'
+    name: Scan PR description for JIRA links
+    runs-on: ubuntu-latest
+    permissions:
+      packages: write
+      contents: read
+    steps:
+      - name: Check for JIRA lines
+        env:
+          PR_BODY: ${{ github.event.pull_request.body }}
+        run: |
+          echo "$PR_BODY" | grep "JIRA: None" > no_jira
+          echo "$PR_BODY" | grep "JIRA: https://.*[0-9]+" > jira
+          exit 0
+        # We exit 0 and set the shell to prevent the returns from the greps from failing this step
+        # See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#exit-codes-and-error-action-preference
+        shell: bash {0}
+
+      - name: Check for exactly one item
+        run: |
+          if [ $(cat no_jira jira | wc -l) != 1 ] ; then
+            echo "The PR body must contain exactly one of [ 'JIRA: None' or 'JIRA: <one or more links>' ]"
+            echo "We counted $(cat no_jira jira | wc -l)"
+            exit 255;
+          else
+            exit 0;
+          fi
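The two greps plus the line-count check implement an exclusive-or: the PR body must carry `JIRA: None` or a JIRA link, but not both and not neither. A minimal Python sketch of the same rule (the regex and the example URL are assumptions mirroring the greps above, not part of the workflow):

```python
import re


def pr_body_ok(body: str) -> bool:
    """True when the body carries exactly one of 'JIRA: None' or a JIRA link."""
    no_jira = re.findall(r"JIRA: None", body)
    jira = re.findall(r"JIRA: https://\S+[0-9]", body)
    # mirrors `cat no_jira jira | wc -l` being exactly 1 in the workflow
    return len(no_jira) + len(jira) == 1


assert pr_body_ok("JIRA: None")
assert pr_body_ok("JIRA: https://issues.example.com/browse/PROJ-123")  # hypothetical URL
assert not pr_body_ok("no reference at all")
```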
@@ -4,6 +4,6 @@
 
 Early versions of AWX did not support seamless upgrades between major versions and required the use of a backup and restore tool to perform upgrades.
 
-Users who wish to upgrade modern AWX installations should follow the instructions at:
+As of version 18.0, `awx-operator` is the preferred install/upgrade method. Users who wish to upgrade modern AWX installations should follow the instructions at:
 
-https://github.com/ansible/awx/blob/devel/INSTALL.md#upgrading-from-previous-versions
+https://github.com/ansible/awx-operator/blob/devel/docs/upgrade/upgrading.md
@@ -366,9 +366,9 @@ class BaseAccess(object):
             report_violation = lambda message: None
         else:
            report_violation = lambda message: logger.warning(message)
-        if validation_info.get('trial', False) is True or validation_info['instance_count'] == 10:  # basic 10 license
+        if validation_info.get('trial', False) is True:
 
-            def report_violation(message):
+            def report_violation(message):  # noqa
                 raise PermissionDenied(message)
 
         if check_expiration and validation_info.get('time_remaining', None) is None:
@@ -613,3 +613,20 @@ def host_metric_table(since, full_path, until, **kwargs):
         since.isoformat(), until.isoformat(), since.isoformat(), until.isoformat()
     )
     return _copy_table(table='host_metric', query=host_metric_query, path=full_path)
+
+
+@register('host_metric_summary_monthly_table', '1.0', format='csv', description=_('HostMetricSummaryMonthly export, full sync'), expensive=trivial_slicing)
+def host_metric_summary_monthly_table(since, full_path, **kwargs):
+    query = '''
+        COPY (SELECT main_hostmetricsummarymonthly.id,
+                     main_hostmetricsummarymonthly.date,
+                     main_hostmetricsummarymonthly.license_capacity,
+                     main_hostmetricsummarymonthly.license_consumed,
+                     main_hostmetricsummarymonthly.hosts_added,
+                     main_hostmetricsummarymonthly.hosts_deleted,
+                     main_hostmetricsummarymonthly.indirectly_managed_hosts
+              FROM main_hostmetricsummarymonthly
+              ORDER BY main_hostmetricsummarymonthly.id ASC) TO STDOUT WITH CSV HEADER
+    '''
+
+    return _copy_table(table='host_metric_summary_monthly', query=query, path=full_path)
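`_copy_table` is not shown in this diff; a `COPY ... TO STDOUT WITH CSV HEADER` query like the one above is typically streamed straight to a file. A hedged sketch of what such a helper might look like with psycopg2 (the helper name and file layout here are assumptions, not the real implementation):

```python
# Hypothetical sketch only; the real _copy_table is defined elsewhere in AWX.
def copy_table_sketch(cursor, table: str, query: str, path: str) -> str:
    file_path = f'{path}/{table}_table.csv'
    with open(file_path, 'w') as f:
        # psycopg2's copy_expert streams the COPY ... TO STDOUT output into f
        cursor.copy_expert(query, f)
    return file_path
```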
@@ -40,8 +40,12 @@ def get_task_queuename():
 
 
 class PubSub(object):
-    def __init__(self, conn):
+    def __init__(self, conn, select_timeout=None):
         self.conn = conn
+        if select_timeout is None:
+            self.select_timeout = 5
+        else:
+            self.select_timeout = select_timeout
 
     def listen(self, channel):
         with self.conn.cursor() as cur:
@@ -72,12 +76,12 @@ class PubSub(object):
             n = psycopg.connection.Notify(pgn.relname.decode(enc), pgn.extra.decode(enc), pgn.be_pid)
             yield n
 
-    def events(self, select_timeout=5, yield_timeouts=False):
+    def events(self, yield_timeouts=False):
         if not self.conn.autocommit:
             raise RuntimeError('Listening for events can only be done in autocommit mode')
 
         while True:
-            if select.select([self.conn], [], [], select_timeout) == NOT_READY:
+            if select.select([self.conn], [], [], self.select_timeout) == NOT_READY:
                 if yield_timeouts:
                     yield None
                 else:
@@ -90,7 +94,7 @@ class PubSub(object):
 
 
 @contextmanager
-def pg_bus_conn(new_connection=False):
+def pg_bus_conn(new_connection=False, select_timeout=None):
     '''
     Any listeners probably want to establish a new database connection,
     separate from the Django connection used for queries, because that will prevent
@@ -115,7 +119,7 @@ def pg_bus_conn(new_connection=False):
             raise RuntimeError('Unexpectedly could not connect to postgres for pg_notify actions')
         conn = pg_connection.connection
 
-    pubsub = PubSub(conn)
+    pubsub = PubSub(conn, select_timeout=select_timeout)
     yield pubsub
     if new_connection:
        conn.close()
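Moving the timeout onto the `PubSub` object means a caller (or the consumer loop later in this changeset) can adjust how long each `select()` blocks without re-entering `events()`. A hedged usage sketch of the modified API (the channel name is hypothetical):

```python
# Usage sketch assembled from the diff above; not verbatim AWX code.
from awx.main.dispatch import pg_bus_conn

with pg_bus_conn(new_connection=True, select_timeout=2) as conn:
    conn.listen('my_channel')  # hypothetical channel name
    for event in conn.events(yield_timeouts=True):
        if event is None:
            # select() returned NOT_READY after conn.select_timeout seconds
            print('no message within 2s')
            break
        print(event.payload)
```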
@@ -40,6 +40,9 @@ class Control(object):
     def cancel(self, task_ids, *args, **kwargs):
         return self.control_with_reply('cancel', *args, extra_data={'task_ids': task_ids}, **kwargs)
 
+    def schedule(self, *args, **kwargs):
+        return self.control_with_reply('schedule', *args, **kwargs)
+
     @classmethod
     def generate_reply_queue_name(cls):
         return f"reply_to_{str(uuid.uuid4()).replace('-','_')}"
@@ -52,14 +55,14 @@ class Control(object):
         if not connection.get_autocommit():
             raise RuntimeError('Control-with-reply messages can only be done in autocommit mode')
 
-        with pg_bus_conn() as conn:
+        with pg_bus_conn(select_timeout=timeout) as conn:
             conn.listen(reply_queue)
             send_data = {'control': command, 'reply_to': reply_queue}
             if extra_data:
                 send_data.update(extra_data)
             conn.notify(self.queuename, json.dumps(send_data))
 
-            for reply in conn.events(select_timeout=timeout, yield_timeouts=True):
+            for reply in conn.events(yield_timeouts=True):
                 if reply is None:
-                    logger.error(f'{self.service} did not reply within {timeout}s')
+                    raise RuntimeError(f"{self.service} did not reply within {timeout}s")
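A hedged usage sketch of the new control verb: the request travels over pg_notify, the timeout now bounds the wait via `pg_bus_conn(select_timeout=timeout)`, and a missing reply raises instead of silently logging. The reply payload is whatever the dispatcher's scheduler reports; the exact format is an assumption here.

```python
# Hypothetical usage sketch; Control and schedule() are shown in the diff above.
from awx.main.dispatch.control import Control

# Sends {'control': 'schedule', 'reply_to': ...} to the dispatcher queue and
# blocks on the reply queue; RuntimeError if no reply arrives in time.
print(Control('dispatcher').schedule())
```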
@@ -1,57 +1,142 @@
 import logging
-import os
 import time
-from multiprocessing import Process
+import yaml
+from datetime import datetime
 
 from django.conf import settings
-from django.db import connections
-from schedule import Scheduler
-from django_guid import set_guid
-from django_guid.utils import generate_guid
-
-from awx.main.dispatch.worker import TaskWorker
-from awx.main.utils.db import set_connection_name
 
 logger = logging.getLogger('awx.main.dispatch.periodic')
 
 
-class Scheduler(Scheduler):
-    def run_continuously(self):
-        idle_seconds = max(1, min(self.jobs).period.total_seconds() / 2)
+class ScheduledTask:
+    """
+    Class representing schedules, very loosely modeled after the python schedule library Job
+    the idea of this class is to:
+    - only deal in relative times (time since the scheduler global start)
+    - only deal in integer math for target runtimes, but float for current relative time
 
-        def run():
-            ppid = os.getppid()
-            logger.warning('periodic beat started')
+    Missed schedule policy:
+    Invariant target times are maintained, meaning that if interval=10s offset=0
+    and it runs at t=7s, then it calls for next run in 3s.
+    However, if a complete interval has passed, that is counted as a missed run,
+    and missed runs are abandoned (no catch-up runs).
+    """
 
-            set_connection_name('periodic')  # set application_name to distinguish from other dispatcher processes
+    def __init__(self, name: str, data: dict):
+        # parameters needed for schedule computation
+        self.interval = int(data['schedule'].total_seconds())
+        self.offset = 0  # offset relative to start time this schedule begins
+        self.index = 0  # number of periods of the schedule that has passed
 
-            while True:
-                if os.getppid() != ppid:
-                    # if the parent PID changes, this process has been orphaned
-                    # via e.g., segfault or sigkill, we should exit too
-                    pid = os.getpid()
-                    logger.warning(f'periodic beat exiting gracefully pid:{pid}')
-                    raise SystemExit()
-                try:
-                    for conn in connections.all():
-                        # If the database connection has a hiccup, re-establish a new
-                        # connection
-                        conn.close_if_unusable_or_obsolete()
-                    set_guid(generate_guid())
-                    self.run_pending()
-                except Exception:
-                    logger.exception('encountered an error while scheduling periodic tasks')
-                time.sleep(idle_seconds)
+        # parameters that do not affect scheduling logic
+        self.last_run = None  # time of last run, only used for debug
+        self.completed_runs = 0  # number of times schedule is known to run
+        self.name = name
+        self.data = data  # used by caller to know what to run
 
-        process = Process(target=run)
-        process.daemon = True
-        process.start()
+    @property
+    def next_run(self):
+        "Time until the next run with t=0 being the global_start of the scheduler class"
+        return (self.index + 1) * self.interval + self.offset
+
+    def due_to_run(self, relative_time):
+        return bool(self.next_run <= relative_time)
+
+    def expected_runs(self, relative_time):
+        return int((relative_time - self.offset) / self.interval)
+
+    def mark_run(self, relative_time):
+        self.last_run = relative_time
+        self.completed_runs += 1
+        new_index = self.expected_runs(relative_time)
+        if new_index > self.index + 1:
+            logger.warning(f'Missed {new_index - self.index - 1} schedules of {self.name}')
+        self.index = new_index
+
+    def missed_runs(self, relative_time):
+        "Number of times the job was supposed to run but failed to, only used for debug"
+        missed_ct = self.expected_runs(relative_time) - self.completed_runs
+        # if this is currently due to run do not count that as a missed run
+        if missed_ct and self.due_to_run(relative_time):
+            missed_ct -= 1
+        return missed_ct
 
 
-def run_continuously():
-    scheduler = Scheduler()
-    for task in settings.CELERYBEAT_SCHEDULE.values():
-        apply_async = TaskWorker.resolve_callable(task['task']).apply_async
-        total_seconds = task['schedule'].total_seconds()
-        scheduler.every(total_seconds).seconds.do(apply_async)
-    scheduler.run_continuously()
+class Scheduler:
+    def __init__(self, schedule):
+        """
+        Expects schedule in the form of a dictionary like
+        {
+            'job1': {'schedule': timedelta(seconds=50), 'other': 'stuff'}
+        }
+        Only the schedule nearest-second value is used for scheduling,
+        the rest of the data is for use by the caller to know what to run.
+        """
+        self.jobs = [ScheduledTask(name, data) for name, data in schedule.items()]
+        min_interval = min(job.interval for job in self.jobs)
+        num_jobs = len(self.jobs)
+
+        # this is intentionally opinionated against spammy schedules
+        # a core goal is to spread out the scheduled tasks (for worker management)
+        # and high-frequency schedules just do not work with that
+        if num_jobs > min_interval:
+            raise RuntimeError(f'Number of schedules ({num_jobs}) is more than the shortest schedule interval ({min_interval} seconds).')
+
+        # evenly space out jobs over the base interval
+        for i, job in enumerate(self.jobs):
+            job.offset = (i * min_interval) // num_jobs
+
+        # internally times are all referenced relative to startup time, add grace period
+        self.global_start = time.time() + 2.0
+
+    def get_and_mark_pending(self):
+        relative_time = time.time() - self.global_start
+        to_run = []
+        for job in self.jobs:
+            if job.due_to_run(relative_time):
+                to_run.append(job)
+                logger.debug(f'scheduler found {job.name} to run, {relative_time - job.next_run} seconds after target')
+                job.mark_run(relative_time)
+        return to_run
+
+    def time_until_next_run(self):
+        relative_time = time.time() - self.global_start
+        next_job = min(self.jobs, key=lambda j: j.next_run)
+        delta = next_job.next_run - relative_time
+        if delta <= 0.1:
+            # careful not to give 0 or negative values to the select timeout, which has unclear interpretation
+            logger.warning(f'Scheduler next run of {next_job.name} is {-delta} seconds in the past')
+            return 0.1
+        elif delta > 20.0:
+            logger.warning(f'Scheduler next run unexpectedly over 20 seconds in future: {delta}')
+            return 20.0
+        logger.debug(f'Scheduler next run is {next_job.name} in {delta} seconds')
+        return delta
+
+    def debug(self, *args, **kwargs):
+        data = dict()
+        data['title'] = 'Scheduler status'
+
+        now = datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S UTC')
+        start_time = datetime.fromtimestamp(self.global_start).strftime('%Y-%m-%d %H:%M:%S UTC')
+        relative_time = time.time() - self.global_start
+        data['started_time'] = start_time
+        data['current_time'] = now
+        data['current_time_relative'] = round(relative_time, 3)
+        data['total_schedules'] = len(self.jobs)
+
+        data['schedule_list'] = dict(
+            [
+                (
+                    job.name,
+                    dict(
+                        last_run_seconds_ago=round(relative_time - job.last_run, 3) if job.last_run else None,
+                        next_run_in_seconds=round(job.next_run - relative_time, 3),
+                        offset_in_seconds=job.offset,
+                        completed_runs=job.completed_runs,
+                        missed_runs=job.missed_runs(relative_time),
+                    ),
+                )
+                for job in sorted(self.jobs, key=lambda job: job.interval)
+            ]
+        )
+        return yaml.safe_dump(data, default_flow_style=False, sort_keys=False)
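The offset math above spreads N jobs evenly across the shortest interval, and target times stay on a fixed grid so a late run does not shift the schedule. A small worked example in plain Python, mirroring the formulas in `ScheduledTask` and `Scheduler.__init__` (and the `[0, 4, 8, 12]` assertion in the tests later in this changeset):

```python
# Four jobs sharing a 16-second interval get offsets 0, 4, 8, 12,
# matching job.offset = (i * min_interval) // num_jobs above.
min_interval, num_jobs = 16, 4
offsets = [(i * min_interval) // num_jobs for i in range(num_jobs)]
assert offsets == [0, 4, 8, 12]

# next_run = (index + 1) * interval + offset keeps targets on a fixed grid.
interval, offset, index = 10, 0, 0
next_run = (index + 1) * interval + offset  # due at t=10s after global_start
# At t=7s the job is not yet due; at t=12s it is due, and mark_run() advances
# index via expected_runs(12) == int((12 - 0) / 10) == 1, so the next target
# stays at t=20s rather than drifting to t=22s.
```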
@@ -73,15 +73,15 @@ class task:
         return cls.apply_async(args, kwargs)
 
     @classmethod
-    def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
+    def get_async_body(cls, args=None, kwargs=None, uuid=None, **kw):
+        """
+        Get the python dict to become JSON data in the pg_notify message
+        This same message gets passed over the dispatcher IPC queue to workers
+        If a task is submitted to a multiprocessing pool, skipping pg_notify, this might be used directly
+        """
         task_id = uuid or str(uuid4())
         args = args or []
         kwargs = kwargs or {}
-        queue = queue or getattr(cls.queue, 'im_func', cls.queue)
-        if not queue:
-            msg = f'{cls.name}: Queue value required and may not be None'
-            logger.error(msg)
-            raise ValueError(msg)
         obj = {'uuid': task_id, 'args': args, 'kwargs': kwargs, 'task': cls.name, 'time_pub': time.time()}
         guid = get_guid()
         if guid:
@@ -89,6 +89,16 @@ class task:
         if bind_kwargs:
             obj['bind_kwargs'] = bind_kwargs
         obj.update(**kw)
+        return obj
+
+    @classmethod
+    def apply_async(cls, args=None, kwargs=None, queue=None, uuid=None, **kw):
+        queue = queue or getattr(cls.queue, 'im_func', cls.queue)
+        if not queue:
+            msg = f'{cls.name}: Queue value required and may not be None'
+            logger.error(msg)
+            raise ValueError(msg)
+        obj = cls.get_async_body(args=args, kwargs=kwargs, uuid=uuid, **kw)
         if callable(queue):
             queue = queue()
         if not is_testing():
@@ -116,4 +126,5 @@ class task:
         setattr(fn, 'name', cls.name)
         setattr(fn, 'apply_async', cls.apply_async)
         setattr(fn, 'delay', cls.delay)
+        setattr(fn, 'get_async_body', cls.get_async_body)
         return fn
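Splitting message construction out of `apply_async` lets a caller build the payload without publishing to pg_notify, which is exactly what the dispatcher's in-process scheduling uses later in this changeset. A hedged sketch of the two paths (`some_task` is a stand-in for any `@task`-decorated function, not a real AWX task):

```python
# Hypothetical sketch of the two submission paths enabled by the split.

# Path 1: normal submission; builds the body AND publishes via pg_notify.
some_task.apply_async(args=[1, 2], queue='tower_broadcast_all')

# Path 2: build the JSON-able body only, e.g. for in-process dispatch of
# scheduled tasks (self.dispatch_task(body) in the consumer diff below).
body = some_task.get_async_body(args=[1, 2])
# body is roughly {'uuid': ..., 'args': [1, 2], 'kwargs': {}, 'task': ..., 'time_pub': ...}
```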
@@ -11,11 +11,13 @@ import psycopg
 import time
 from uuid import UUID
 from queue import Empty as QueueEmpty
+from datetime import timedelta
 
 from django import db
 from django.conf import settings
 
 from awx.main.dispatch.pool import WorkerPool
+from awx.main.dispatch.periodic import Scheduler
 from awx.main.dispatch import pg_bus_conn
 from awx.main.utils.common import log_excess_runtime
 from awx.main.utils.db import set_connection_name
@@ -64,10 +66,12 @@ class AWXConsumerBase(object):
     def control(self, body):
         logger.warning(f'Received control signal:\n{body}')
         control = body.get('control')
-        if control in ('status', 'running', 'cancel'):
+        if control in ('status', 'schedule', 'running', 'cancel'):
             reply_queue = body['reply_to']
             if control == 'status':
                 msg = '\n'.join([self.listening_on, self.pool.debug()])
+            if control == 'schedule':
+                msg = self.scheduler.debug()
             elif control == 'running':
                 msg = []
                 for worker in self.pool.workers:
@@ -93,16 +97,11 @@ class AWXConsumerBase(object):
         else:
             logger.error('unrecognized control message: {}'.format(control))
 
-    def process_task(self, body):
+    def dispatch_task(self, body):
+        """This will place the given body into a worker queue to run method decorated as a task"""
         if isinstance(body, dict):
             body['time_ack'] = time.time()
 
-        if 'control' in body:
-            try:
-                return self.control(body)
-            except Exception:
-                logger.exception(f"Exception handling control message: {body}")
-            return
         if len(self.pool):
             if "uuid" in body and body['uuid']:
                 try:
@@ -116,6 +115,16 @@ class AWXConsumerBase(object):
             self.pool.write(queue, body)
             self.total_messages += 1
 
+    def process_task(self, body):
+        """Routes the task details in body as either a control task or a task-task"""
+        if 'control' in body:
+            try:
+                return self.control(body)
+            except Exception:
+                logger.exception(f"Exception handling control message: {body}")
+            return
+        self.dispatch_task(body)
+
     @log_excess_runtime(logger)
     def record_statistics(self):
         if time.time() - self.last_stats > 1:  # buffer stat recording to once per second
@@ -150,7 +159,7 @@ class AWXConsumerRedis(AWXConsumerBase):
 
 
 class AWXConsumerPG(AWXConsumerBase):
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args, schedule=None, **kwargs):
         super().__init__(*args, **kwargs)
         self.pg_max_wait = settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE
         # if no successful loops have ran since startup, then we should fail right away
@@ -161,27 +170,53 @@ class AWXConsumerPG(AWXConsumerBase):
         self.subsystem_metrics = s_metrics.Metrics(auto_pipe_execute=False)
         self.last_metrics_gather = init_time
         self.listen_cumulative_time = 0.0
+        if schedule:
+            schedule = schedule.copy()
+        else:
+            schedule = {}
+        # add control tasks to be run at regular schedules
+        # NOTE: if we run out of database connections, it is important to still run cleanup
+        # so that we scale down workers and free up connections
+        schedule['pool_cleanup'] = {'control': self.pool.cleanup, 'schedule': timedelta(seconds=60)}
+        # record subsystem metrics for the dispatcher
+        schedule['metrics_gather'] = {'control': self.record_metrics, 'schedule': timedelta(seconds=20)}
+        self.scheduler = Scheduler(schedule)
+
+    def record_metrics(self):
+        current_time = time.time()
+        self.pool.produce_subsystem_metrics(self.subsystem_metrics)
+        self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
+        self.subsystem_metrics.pipe_execute()
+        self.listen_cumulative_time = 0.0
+        self.last_metrics_gather = current_time
 
     def run_periodic_tasks(self):
-        self.record_statistics()  # maintains time buffer in method
+        """
+        Run general periodic logic, and return maximum time in seconds before
+        the next requested run
+        This may be called more often than that when events are consumed
+        so this should be very efficient in that
+        """
+        try:
+            self.record_statistics()  # maintains time buffer in method
+        except Exception as exc:
+            logger.warning(f'Failed to save dispatcher statistics {exc}')
 
-        current_time = time.time()
-        if current_time - self.last_cleanup > 60:  # same as cluster_node_heartbeat
-            # NOTE: if we run out of database connections, it is important to still run cleanup
-            # so that we scale down workers and free up connections
-            self.pool.cleanup()
-            self.last_cleanup = current_time
+        for job in self.scheduler.get_and_mark_pending():
+            if 'control' in job.data:
+                try:
+                    job.data['control']()
+                except Exception:
+                    logger.exception(f'Error running control task {job.data}')
+            elif 'task' in job.data:
+                body = self.worker.resolve_callable(job.data['task']).get_async_body()
+                # bypasses pg_notify for scheduled tasks
+                self.dispatch_task(body)
 
-        # record subsystem metrics for the dispatcher
-        if current_time - self.last_metrics_gather > 20:
-            try:
-                self.pool.produce_subsystem_metrics(self.subsystem_metrics)
-                self.subsystem_metrics.set('dispatcher_availability', self.listen_cumulative_time / (current_time - self.last_metrics_gather))
-                self.subsystem_metrics.pipe_execute()
-            except Exception:
-                logger.exception(f"encountered an error trying to store {self.name} metrics")
-            self.listen_cumulative_time = 0.0
-            self.last_metrics_gather = current_time
-        self.pg_is_down = False
-        self.listen_start = time.time()
+        return self.scheduler.time_until_next_run()
 
     def run(self, *args, **kwargs):
         super(AWXConsumerPG, self).run(*args, **kwargs)
@@ -197,14 +232,15 @@ class AWXConsumerPG(AWXConsumerBase):
                 if init is False:
                     self.worker.on_start()
                     init = True
-                self.listen_start = time.time()
+                # run_periodic_tasks runs scheduled actions and gives time until next scheduled action
+                # this is saved to the conn (PubSub) object in order to modify read timeout in-loop
+                conn.select_timeout = self.run_periodic_tasks()
+                # this is the main operational loop for awx-manage run_dispatcher
                 for e in conn.events(yield_timeouts=True):
-                    self.listen_cumulative_time += time.time() - self.listen_start
+                    self.listen_cumulative_time += time.time() - self.listen_start  # for metrics
                     if e is not None:
                         self.process_task(json.loads(e.payload))
-                    self.run_periodic_tasks()
+                    self.pg_is_down = False
+                    self.listen_start = time.time()
+                    conn.select_timeout = self.run_periodic_tasks()
                     if self.should_stop:
                         return
             except psycopg.InterfaceError:
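The reworked loop ties the pg `select()` timeout to the schedule: every pass runs whatever is due, then sleeps at most until the next target time. A condensed sketch of the control flow, simplified from the diff above (helper names here are mine, not the real methods):

```python
import json


def run_periodic(scheduler):
    # run due jobs, then report how long the loop may sleep
    for job in scheduler.get_and_mark_pending():
        job.data['control']()  # or dispatch a task body, as in the diff
    return scheduler.time_until_next_run()


def consumer_loop(conn, scheduler, process_task):
    conn.select_timeout = run_periodic(scheduler)
    for event in conn.events(yield_timeouts=True):
        if event is not None:
            process_task(json.loads(event.payload))
        # whether a message arrived or select() timed out, run anything due
        # and re-arm the timeout so the next wakeup lands on schedule
        conn.select_timeout = run_periodic(scheduler)
```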
@@ -3,15 +3,13 @@
 import logging
 import yaml
 
-from django.core.cache import cache as django_cache
+from django.conf import settings
 from django.core.management.base import BaseCommand
-from django.db import connection as django_connection
 
 from awx.main.dispatch import get_task_queuename
 from awx.main.dispatch.control import Control
 from awx.main.dispatch.pool import AutoscalePool
 from awx.main.dispatch.worker import AWXConsumerPG, TaskWorker
-from awx.main.dispatch import periodic
 
 logger = logging.getLogger('awx.main.dispatch')
@@ -21,6 +19,7 @@ class Command(BaseCommand):
 
     def add_arguments(self, parser):
         parser.add_argument('--status', dest='status', action='store_true', help='print the internal state of any running dispatchers')
+        parser.add_argument('--schedule', dest='schedule', action='store_true', help='print the current status of schedules being run by dispatcher')
         parser.add_argument('--running', dest='running', action='store_true', help='print the UUIDs of any tasked managed by this dispatcher')
         parser.add_argument(
             '--reload',
@@ -42,6 +41,9 @@ class Command(BaseCommand):
         if options.get('status'):
             print(Control('dispatcher').status())
             return
+        if options.get('schedule'):
+            print(Control('dispatcher').schedule())
+            return
         if options.get('running'):
             print(Control('dispatcher').running())
             return
@@ -58,21 +60,11 @@ class Command(BaseCommand):
             print(Control('dispatcher').cancel(cancel_data))
             return
 
-        # It's important to close these because we're _about_ to fork, and we
-        # don't want the forked processes to inherit the open sockets
-        # for the DB and cache connections (that way lies race conditions)
-        django_connection.close()
-        django_cache.close()
-
-        # spawn a daemon thread to periodically enqueues scheduled tasks
-        # (like the node heartbeat)
-        periodic.run_continuously()
-
         consumer = None
 
         try:
             queues = ['tower_broadcast_all', 'tower_settings_change', get_task_queuename()]
-            consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4))
+            consumer = AWXConsumerPG('dispatcher', TaskWorker(), queues, AutoscalePool(min_workers=4), schedule=settings.CELERYBEAT_SCHEDULE)
             consumer.run()
         except KeyboardInterrupt:
             logger.debug('Terminating Task Dispatcher')
@@ -1,4 +1,4 @@
-# Generated by Django 4.2 on 2023-06-09 19:51
+# Generated by Django 4.2.3 on 2023-08-02 13:18
 
 import awx.main.models.notifications
 from django.db import migrations, models
@@ -11,16 +11,6 @@ class Migration(migrations.Migration):
     ]
 
     operations = [
-        migrations.AlterField(
-            model_name='activitystream',
-            name='deleted_actor',
-            field=models.JSONField(null=True),
-        ),
-        migrations.AlterField(
-            model_name='activitystream',
-            name='setting',
-            field=models.JSONField(blank=True, default=dict),
-        ),
         migrations.AlterField(
             model_name='instancegroup',
             name='policy_instance_list',
@@ -28,31 +18,11 @@ class Migration(migrations.Migration):
                 blank=True, default=list, help_text='List of exact-match Instances that will always be automatically assigned to this group'
             ),
         ),
-        migrations.AlterField(
-            model_name='job',
-            name='survey_passwords',
-            field=models.JSONField(blank=True, default=dict, editable=False),
-        ),
-        migrations.AlterField(
-            model_name='joblaunchconfig',
-            name='char_prompts',
-            field=models.JSONField(blank=True, default=dict),
-        ),
-        migrations.AlterField(
-            model_name='joblaunchconfig',
-            name='survey_passwords',
-            field=models.JSONField(blank=True, default=dict, editable=False),
-        ),
-        migrations.AlterField(
-            model_name='notification',
-            name='body',
-            field=models.JSONField(blank=True, default=dict),
-        ),
         migrations.AlterField(
             model_name='notificationtemplate',
             name='messages',
@@ -94,31 +64,6 @@ class Migration(migrations.Migration):
             name='survey_passwords',
             field=models.JSONField(blank=True, default=dict, editable=False),
         ),
-        migrations.AlterField(
-            model_name='unifiedjob',
-            name='job_env',
-            field=models.JSONField(blank=True, default=dict, editable=False),
-        ),
-        migrations.AlterField(
-            model_name='workflowjob',
-            name='char_prompts',
-            field=models.JSONField(blank=True, default=dict),
-        ),
-        migrations.AlterField(
-            model_name='workflowjob',
-            name='survey_passwords',
-            field=models.JSONField(blank=True, default=dict, editable=False),
-        ),
-        migrations.AlterField(
-            model_name='workflowjobnode',
-            name='char_prompts',
-            field=models.JSONField(blank=True, default=dict),
-        ),
-        migrations.AlterField(
-            model_name='workflowjobnode',
-            name='survey_passwords',
-            field=models.JSONField(blank=True, default=dict, editable=False),
-        ),
         migrations.AlterField(
             model_name='workflowjobtemplate',
             name='char_prompts',
@@ -139,4 +84,194 @@ class Migration(migrations.Migration):
             name='survey_passwords',
             field=models.JSONField(blank=True, default=dict, editable=False),
         ),
+        # These are potentially a problem. Move the existing fields
+        # aside while pretending like they've been deleted, then add
+        # in fresh empty fields. Make the old fields nullable where
+        # needed while we are at it, so that new rows don't hit
+        # IntegrityError. We'll do the data migration out-of-band
+        # using a task.
+        migrations.RunSQL(  # Already nullable
+            "ALTER TABLE main_activitystream RENAME deleted_actor TO deleted_actor_old;",
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='activitystream',
+                    name='deleted_actor',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='activitystream',
+            name='deleted_actor',
+            field=models.JSONField(null=True),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_activitystream RENAME setting TO setting_old;
+            ALTER TABLE main_activitystream ALTER COLUMN setting_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='activitystream',
+                    name='setting',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='activitystream',
+            name='setting',
+            field=models.JSONField(blank=True, default=dict),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_job RENAME survey_passwords TO survey_passwords_old;
+            ALTER TABLE main_job ALTER COLUMN survey_passwords_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='job',
+                    name='survey_passwords',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='job',
+            name='survey_passwords',
+            field=models.JSONField(blank=True, default=dict, editable=False),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_joblaunchconfig RENAME char_prompts TO char_prompts_old;
+            ALTER TABLE main_joblaunchconfig ALTER COLUMN char_prompts_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='joblaunchconfig',
+                    name='char_prompts',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='joblaunchconfig',
+            name='char_prompts',
+            field=models.JSONField(blank=True, default=dict),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_joblaunchconfig RENAME survey_passwords TO survey_passwords_old;
+            ALTER TABLE main_joblaunchconfig ALTER COLUMN survey_passwords_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='joblaunchconfig',
+                    name='survey_passwords',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='joblaunchconfig',
+            name='survey_passwords',
+            field=models.JSONField(blank=True, default=dict, editable=False),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_notification RENAME body TO body_old;
+            ALTER TABLE main_notification ALTER COLUMN body_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='notification',
+                    name='body',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='notification',
+            name='body',
+            field=models.JSONField(blank=True, default=dict),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_unifiedjob RENAME job_env TO job_env_old;
+            ALTER TABLE main_unifiedjob ALTER COLUMN job_env_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='unifiedjob',
+                    name='job_env',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='unifiedjob',
+            name='job_env',
+            field=models.JSONField(blank=True, default=dict, editable=False),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_workflowjob RENAME char_prompts TO char_prompts_old;
+            ALTER TABLE main_workflowjob ALTER COLUMN char_prompts_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='workflowjob',
+                    name='char_prompts',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='workflowjob',
+            name='char_prompts',
+            field=models.JSONField(blank=True, default=dict),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_workflowjob RENAME survey_passwords TO survey_passwords_old;
+            ALTER TABLE main_workflowjob ALTER COLUMN survey_passwords_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='workflowjob',
+                    name='survey_passwords',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='workflowjob',
+            name='survey_passwords',
+            field=models.JSONField(blank=True, default=dict, editable=False),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_workflowjobnode RENAME char_prompts TO char_prompts_old;
+            ALTER TABLE main_workflowjobnode ALTER COLUMN char_prompts_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='workflowjobnode',
+                    name='char_prompts',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='workflowjobnode',
+            name='char_prompts',
+            field=models.JSONField(blank=True, default=dict),
+        ),
+        migrations.RunSQL(
+            """
+            ALTER TABLE main_workflowjobnode RENAME survey_passwords TO survey_passwords_old;
+            ALTER TABLE main_workflowjobnode ALTER COLUMN survey_passwords_old DROP NOT NULL;
+            """,
+            state_operations=[
+                migrations.RemoveField(
+                    model_name='workflowjobnode',
+                    name='survey_passwords',
+                ),
+            ],
+        ),
+        migrations.AddField(
+            model_name='workflowjobnode',
+            name='survey_passwords',
+            field=models.JSONField(blank=True, default=dict, editable=False),
+        ),
     ]
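The comment block above describes a move-aside pattern: rename the column in raw SQL while telling Django's migration state that the field was removed, then add a fresh empty field under the original name. A minimal generic sketch of that pattern (the `mytable`/`mymodel`/`payload` names are hypothetical, not from this migration):

```python
# Hypothetical, minimal version of the move-aside pattern used above.
from django.db import migrations, models

rename_aside = migrations.RunSQL(
    # The database keeps the old data under payload_old...
    "ALTER TABLE mytable RENAME payload TO payload_old;",
    # ...while Django's model state believes the field is gone,
    state_operations=[
        migrations.RemoveField(model_name='mymodel', name='payload'),
    ],
)
# A fresh, empty jsonb column takes the original name; a background task can
# then copy payload_old into payload in batches and drop the old column.
add_fresh = migrations.AddField(
    model_name='mymodel',
    name='payload',
    field=models.JSONField(blank=True, default=dict),
)
```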
@@ -3,6 +3,7 @@
 
 # Django
 from django.conf import settings  # noqa
+from django.db import connection
 from django.db.models.signals import pre_delete  # noqa
 
 # AWX
@@ -99,6 +100,58 @@ User.add_to_class('can_access_with_errors', check_user_access_with_errors)
 User.add_to_class('accessible_objects', user_accessible_objects)
 
 
+def convert_jsonfields():
+    if connection.vendor != 'postgresql':
+        return
+
+    # fmt: off
+    fields = [
+        ('main_activitystream', 'id', (
+            'deleted_actor',
+            'setting',
+        )),
+        ('main_job', 'unifiedjob_ptr_id', (
+            'survey_passwords',
+        )),
+        ('main_joblaunchconfig', 'id', (
+            'char_prompts',
+            'survey_passwords',
+        )),
+        ('main_notification', 'id', (
+            'body',
+        )),
+        ('main_unifiedjob', 'id', (
+            'job_env',
+        )),
+        ('main_workflowjob', 'unifiedjob_ptr_id', (
+            'char_prompts',
+            'survey_passwords',
+        )),
+        ('main_workflowjobnode', 'id', (
+            'char_prompts',
+            'survey_passwords',
+        )),
+    ]
+    # fmt: on
+
+    with connection.cursor() as cursor:
+        for table, pkfield, columns in fields:
+            # Do the renamed old columns still exist? If so, run the task.
+            old_columns = ','.join(f"'{column}_old'" for column in columns)
+            cursor.execute(
+                f"""
+                select count(1) from information_schema.columns
+                where
+                    table_name = %s and column_name in ({old_columns});
+                """,
+                (table,),
+            )
+            if cursor.fetchone()[0]:
+                from awx.main.tasks.system import migrate_jsonfield
+
+                migrate_jsonfield.apply_async([table, pkfield, columns])
+
+
 def cleanup_created_modified_by(sender, **kwargs):
     # work around a bug in django-polymorphic that doesn't properly
     # handle cascades for reverse foreign keys on the polymorphic base model
@@ -31,6 +31,7 @@ class RunnerCallback:
         self.model = model
         self.update_attempts = int(settings.DISPATCHER_DB_DOWNTOWN_TOLLERANCE / 5)
         self.wrapup_event_dispatched = False
+        self.artifacts_processed = False
         self.extra_update_fields = {}
 
     def update_model(self, pk, _attempt=0, **updates):
@@ -211,6 +212,9 @@ class RunnerCallback:
         if result_traceback:
             self.delay_update(result_traceback=result_traceback)
 
+    def artifacts_handler(self, artifact_dir):
+        self.artifacts_processed = True
+
 
 class RunnerCallbackForProjectUpdate(RunnerCallback):
     def __init__(self, *args, **kwargs):
@@ -9,6 +9,7 @@ from django.conf import settings
 from django.db.models.query import QuerySet
 from django.utils.encoding import smart_str
 from django.utils.timezone import now
+from django.db import OperationalError
 
 # AWX
 from awx.main.utils.common import log_excess_runtime
@@ -57,6 +58,28 @@ def start_fact_cache(hosts, destination, log_data, timeout=None, inventory_id=No
     return None
 
 
+def raw_update_hosts(host_list):
+    Host.objects.bulk_update(host_list, ['ansible_facts', 'ansible_facts_modified'])
+
+
+def update_hosts(host_list, max_tries=5):
+    if not host_list:
+        return
+    for i in range(max_tries):
+        try:
+            raw_update_hosts(host_list)
+        except OperationalError as exc:
+            # Deadlocks can happen if this runs at the same time as another large query
+            # inventory updates and updating last_job_host_summary are candidates for conflict
+            # but these would resolve easily on a retry
+            if i + 1 < max_tries:
+                logger.info(f'OperationalError (suspected deadlock) saving host facts retry {i}, message: {exc}')
+                continue
+            else:
+                raise
+        break
+
+
 @log_excess_runtime(
     logger,
     debug_cutoff=0.01,
@@ -111,7 +134,6 @@ def finish_fact_cache(hosts, destination, facts_write_time, log_data, job_id=Non
|
||||
system_tracking_logger.info('Facts cleared for inventory {} host {}'.format(smart_str(host.inventory.name), smart_str(host.name)))
|
||||
log_data['cleared_ct'] += 1
|
||||
if len(hosts_to_update) > 100:
|
||||
Host.objects.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified'])
|
||||
update_hosts(hosts_to_update)
|
||||
hosts_to_update = []
|
||||
if hosts_to_update:
|
||||
Host.objects.bulk_update(hosts_to_update, ['ansible_facts', 'ansible_facts_modified'])
|
||||
update_hosts(hosts_to_update)
|
||||
|
||||
@@ -1094,7 +1094,7 @@ class RunJob(SourceControlMixin, BaseTask):
             # actual `run()` call; this _usually_ means something failed in
             # the pre_run_hook method
             return
-        if self.should_use_fact_cache():
+        if self.should_use_fact_cache() and self.runner_callback.artifacts_processed:
             job.log_lifecycle("finish_job_fact_cache")
             finish_fact_cache(
                 job.get_hosts_for_fact_cache(),
@@ -464,6 +464,7 @@ class AWXReceptorJob:
             event_handler=self.task.runner_callback.event_handler,
             finished_callback=self.task.runner_callback.finished_callback,
             status_handler=self.task.runner_callback.status_handler,
+            artifacts_handler=self.task.runner_callback.artifacts_handler,
             **self.runner_params,
         )
@@ -2,6 +2,7 @@
 from collections import namedtuple
 import functools
 import importlib
+import itertools
 import json
 import logging
 import os
@@ -14,7 +15,7 @@ from datetime import datetime
 
 # Django
 from django.conf import settings
-from django.db import transaction, DatabaseError, IntegrityError
+from django.db import connection, transaction, DatabaseError, IntegrityError
 from django.db.models.fields.related import ForeignKey
 from django.utils.timezone import now, timedelta
 from django.utils.encoding import smart_str
@@ -48,6 +49,7 @@ from awx.main.models import (
     SmartInventoryMembership,
     Job,
     HostMetric,
+    convert_jsonfields,
 )
 from awx.main.constants import ACTIVE_STATES
 from awx.main.dispatch.publish import task
@@ -86,6 +88,11 @@ def dispatch_startup():
     if settings.IS_K8S:
         write_receptor_config()
 
+    try:
+        convert_jsonfields()
+    except Exception:
+        logger.exception("Failed json field conversion, skipping.")
+
     startup_logger.debug("Syncing Schedules")
     for sch in Schedule.objects.all():
         try:
@@ -129,6 +136,52 @@ def inform_cluster_of_shutdown():
         logger.exception('Encountered problem with normal shutdown signal.')
 
 
+@task(queue=get_task_queuename)
+def migrate_jsonfield(table, pkfield, columns):
+    batchsize = 10000
+    with advisory_lock(f'json_migration_{table}', wait=False) as acquired:
+        if not acquired:
+            return
+
+        from django.db.migrations.executor import MigrationExecutor
+
+        # If Django is currently running migrations, wait until it is done.
+        while True:
+            executor = MigrationExecutor(connection)
+            if not executor.migration_plan(executor.loader.graph.leaf_nodes()):
+                break
+            time.sleep(120)
+
+        logger.warning(f"Migrating json fields for {table}: {', '.join(columns)}")
+
+        with connection.cursor() as cursor:
+            for i in itertools.count(0, batchsize):
+                # Are there even any rows in the table beyond this point?
+                cursor.execute(f"select count(1) from {table} where {pkfield} >= %s limit 1;", (i,))
+                if not cursor.fetchone()[0]:
+                    break
+
+                column_expr = ', '.join(f"{colname} = {colname}_old::jsonb" for colname in columns)
+                # If any of the old columns have non-null values, the data needs to be cast and copied over.
+                empty_expr = ' or '.join(f"{colname}_old is not null" for colname in columns)
+                cursor.execute(  # Only clobber the new fields if there is non-null data in the old ones.
+                    f"""
+                    update {table}
+                    set {column_expr}
+                    where {pkfield} >= %s and {pkfield} < %s
+                    and {empty_expr};
+                    """,
+                    (i, i + batchsize),
+                )
+                rows = cursor.rowcount
+                logger.debug(f"Batch {i} to {i + batchsize} copied on {table}, {rows} rows affected.")
+
+            column_expr = ', '.join(f"DROP COLUMN {column}_old" for column in columns)
+            cursor.execute(f"ALTER TABLE {table} {column_expr};")
+
+        logger.warning(f"Migration of {table} to jsonb is finished.")
+
+
 @task(queue=get_task_queuename)
 def apply_cluster_membership_policies():
     from awx.main.signals import disable_activity_stream
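The batching above walks the table in fixed primary-key windows rather than OFFSET pages, so each UPDATE touches a bounded range regardless of table size. A small illustration of how `itertools.count(0, batchsize)` produces those windows (the `max_pk` value is hypothetical):

```python
import itertools

batchsize = 10000
max_pk = 25000  # hypothetical largest primary key in the table
for i in itertools.count(0, batchsize):
    if i > max_pk:  # stands in for "select count(1) ... where pk >= i" hitting 0
        break
    print(f'update ... where pk >= {i} and pk < {i + batchsize}')
# prints windows [0, 10000), [10000, 20000), [20000, 30000)
```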
@@ -3,6 +3,7 @@ import multiprocessing
 import random
 import signal
 import time
+import yaml
 from unittest import mock
 
 from django.utils.timezone import now as tz_now
@@ -13,6 +14,7 @@ from awx.main.dispatch import reaper
 from awx.main.dispatch.pool import StatefulPoolWorker, WorkerPool, AutoscalePool
 from awx.main.dispatch.publish import task
 from awx.main.dispatch.worker import BaseWorker, TaskWorker
+from awx.main.dispatch.periodic import Scheduler
 
 
 '''
@@ -439,3 +441,76 @@ class TestJobReaper(object):
         assert job.started > ref_time
         assert job.status == 'running'
         assert job.job_explanation == ''
+
+
+@pytest.mark.django_db
+class TestScheduler:
+    def test_too_many_schedules_freak_out(self):
+        with pytest.raises(RuntimeError):
+            Scheduler({'job1': {'schedule': datetime.timedelta(seconds=1)}, 'job2': {'schedule': datetime.timedelta(seconds=1)}})
+
+    def test_spread_out(self):
+        scheduler = Scheduler(
+            {
+                'job1': {'schedule': datetime.timedelta(seconds=16)},
+                'job2': {'schedule': datetime.timedelta(seconds=16)},
+                'job3': {'schedule': datetime.timedelta(seconds=16)},
+                'job4': {'schedule': datetime.timedelta(seconds=16)},
+            }
+        )
+        assert [job.offset for job in scheduler.jobs] == [0, 4, 8, 12]
+
+    def test_missed_schedule(self, mocker):
+        scheduler = Scheduler({'job1': {'schedule': datetime.timedelta(seconds=10)}})
+        assert scheduler.jobs[0].missed_runs(time.time() - scheduler.global_start) == 0
+        mocker.patch('awx.main.dispatch.periodic.time.time', return_value=scheduler.global_start + 50)
+        scheduler.get_and_mark_pending()
+        assert scheduler.jobs[0].missed_runs(50) > 1
+
+    def test_advance_schedule(self, mocker):
+        scheduler = Scheduler(
+            {
+                'job1': {'schedule': datetime.timedelta(seconds=30)},
+                'joba': {'schedule': datetime.timedelta(seconds=20)},
+                'jobb': {'schedule': datetime.timedelta(seconds=20)},
+            }
+        )
+        for job in scheduler.jobs:
+            # HACK: the offsets automatically added make this a hard test to write... so remove offsets
+            job.offset = 0.0
+        mocker.patch('awx.main.dispatch.periodic.time.time', return_value=scheduler.global_start + 29)
+        to_run = scheduler.get_and_mark_pending()
+        assert set(job.name for job in to_run) == set(['joba', 'jobb'])
+        mocker.patch('awx.main.dispatch.periodic.time.time', return_value=scheduler.global_start + 39)
+        to_run = scheduler.get_and_mark_pending()
+        assert len(to_run) == 1
+        assert to_run[0].name == 'job1'
+
+    @staticmethod
+    def get_job(scheduler, name):
+        for job in scheduler.jobs:
+            if job.name == name:
+                return job
+
+    def test_scheduler_debug(self, mocker):
+        scheduler = Scheduler(
+            {
+                'joba': {'schedule': datetime.timedelta(seconds=20)},
+                'jobb': {'schedule': datetime.timedelta(seconds=50)},
+                'jobc': {'schedule': datetime.timedelta(seconds=500)},
+                'jobd': {'schedule': datetime.timedelta(seconds=20)},
+            }
+        )
+        rel_time = 119.9  # slightly under the 6th 20-second bin, to avoid offset problems
+        current_time = scheduler.global_start + rel_time
+        mocker.patch('awx.main.dispatch.periodic.time.time', return_value=current_time - 1.0e-8)
+        self.get_job(scheduler, 'jobb').mark_run(rel_time)
+        self.get_job(scheduler, 'jobd').mark_run(rel_time - 20.0)
+
+        output = scheduler.debug()
+        data = yaml.safe_load(output)
+        assert data['schedule_list']['jobc']['last_run_seconds_ago'] is None
+        assert data['schedule_list']['joba']['missed_runs'] == 4
+        assert data['schedule_list']['jobd']['missed_runs'] == 3
+        assert data['schedule_list']['jobd']['completed_runs'] == 1
+        assert data['schedule_list']['jobb']['next_run_in_seconds'] > 25.0
@@ -6,6 +6,7 @@ import json
 from awx.main.models import (
     Job,
     Instance,
+    Host,
     JobHostSummary,
     InventoryUpdate,
     InventorySource,
@@ -18,6 +19,9 @@ from awx.main.models import (
     ExecutionEnvironment,
 )
 from awx.main.tasks.system import cluster_node_heartbeat
+from awx.main.tasks.facts import update_hosts
 
+from django.db import OperationalError
 from django.test.utils import override_settings
 
 
@@ -112,6 +116,51 @@ def test_job_notification_host_data(inventory, machine_credential, project, job_
     }
 
 
+@pytest.mark.django_db
+class TestAnsibleFactsSave:
+    current_call = 0
+
+    def test_update_hosts_deleted_host(self, inventory):
+        hosts = [Host.objects.create(inventory=inventory, name=f'foo{i}') for i in range(3)]
+        for host in hosts:
+            host.ansible_facts = {'foo': 'bar'}
+        last_pk = hosts[-1].pk
+        assert inventory.hosts.count() == 3
+        Host.objects.get(pk=last_pk).delete()
+        assert inventory.hosts.count() == 2
+        update_hosts(hosts)
+        assert inventory.hosts.count() == 2
+        for host in inventory.hosts.all():
+            host.refresh_from_db()
+            assert host.ansible_facts == {'foo': 'bar'}
+
+    def test_update_hosts_forever_deadlock(self, inventory, mocker):
+        hosts = [Host.objects.create(inventory=inventory, name=f'foo{i}') for i in range(3)]
+        for host in hosts:
+            host.ansible_facts = {'foo': 'bar'}
+        db_mock = mocker.patch('awx.main.tasks.facts.Host.objects.bulk_update')
+        db_mock.side_effect = OperationalError('deadlock detected')
+        with pytest.raises(OperationalError):
+            update_hosts(hosts)
+
+    def fake_bulk_update(self, host_list):
+        if self.current_call > 2:
+            return Host.objects.bulk_update(host_list, ['ansible_facts', 'ansible_facts_modified'])
+        self.current_call += 1
+        raise OperationalError('deadlock detected')
+
+    def test_update_hosts_resolved_deadlock(self, inventory, mocker):
+        hosts = [Host.objects.create(inventory=inventory, name=f'foo{i}') for i in range(3)]
+        for host in hosts:
+            host.ansible_facts = {'foo': 'bar'}
+        self.current_call = 0
+        mocker.patch('awx.main.tasks.facts.raw_update_hosts', new=self.fake_bulk_update)
+        update_hosts(hosts)
+        for host in inventory.hosts.all():
+            host.refresh_from_db()
+            assert host.ansible_facts == {'foo': 'bar'}
+
+
 @pytest.mark.django_db
 class TestLaunchConfig:
     def test_null_creation_from_prompts(self):
@@ -97,8 +97,6 @@ class SpecialInventoryHandler(logging.Handler):
         self.event_handler(dispatch_data)
 
 
-ColorHandler = logging.StreamHandler
-
 if settings.COLOR_LOGS is True:
     try:
         from logutils.colorize import ColorizingStreamHandler
@@ -133,3 +131,5 @@ if settings.COLOR_LOGS is True:
     except ImportError:
         # logutils is only used for colored logs in the dev environment
        pass
+else:
+    ColorHandler = logging.StreamHandler
@@ -175,7 +175,12 @@ class Licenser(object):
                 license.setdefault('pool_id', sub['pool']['id'])
                 license.setdefault('product_name', sub['pool']['productName'])
                 license.setdefault('valid_key', True)
-                license.setdefault('license_type', 'enterprise')
+                if sub['pool']['productId'].startswith('S'):
+                    license.setdefault('trial', True)
+                    license.setdefault('license_type', 'trial')
+                else:
+                    license.setdefault('trial', False)
+                    license.setdefault('license_type', 'enterprise')
                 license.setdefault('satellite', False)
                 # Use the nearest end date
                 endDate = parse_date(sub['endDate'])
@@ -287,7 +292,7 @@ class Licenser(object):
             license['productId'] = sub['product_id']
             license['quantity'] = int(sub['quantity'])
             license['support_level'] = sub['support_level']
-            license['usage'] = sub['usage']
+            license['usage'] = sub.get('usage')
             license['subscription_name'] = sub['name']
             license['subscriptionId'] = sub['subscription_id']
             license['accountNumber'] = sub['account_number']
@@ -210,7 +210,7 @@ JOB_EVENT_WORKERS = 4
 
 # The number of seconds to buffer callback receiver bulk
 # writes in memory before flushing via JobEvent.objects.bulk_create()
-JOB_EVENT_BUFFER_SECONDS = 0.1
+JOB_EVENT_BUFFER_SECONDS = 1
 
 # The interval at which callback receiver statistics should be
 # recorded
@@ -28,8 +28,8 @@ SHELL_PLUS_PRINT_SQL = False
 
 # show colored logs in the dev environment
 # to disable this, set `COLOR_LOGS = False` in awx/settings/local_settings.py
-LOGGING['handlers']['console']['()'] = 'awx.main.utils.handlers.ColorHandler'  # noqa
 COLOR_LOGS = True
+LOGGING['handlers']['console']['()'] = 'awx.main.utils.handlers.ColorHandler'  # noqa
 
 ALLOWED_HOSTS = ['*']
@@ -77,7 +77,7 @@ function PromptModalForm({
   }
 
   if (launchConfig.ask_labels_on_launch) {
-    const { labelIds } = createNewLabels(
+    const { labelIds } = await createNewLabels(
      values.labels,
      resource.organization
    );
@@ -1,10 +1,10 @@
 import React, { useCallback, useEffect, useState } from 'react';
 import { useLocation } from 'react-router-dom';
 import { t } from '@lingui/macro';
-import { RolesAPI, TeamsAPI, UsersAPI, OrganizationsAPI } from 'api';
+import { RolesAPI, TeamsAPI, UsersAPI } from 'api';
 import { getQSConfig, parseQueryString } from 'util/qs';
 import useRequest, { useDeleteItems } from 'hooks/useRequest';
-import { useUserProfile, useConfig } from 'contexts/Config';
+import { useUserProfile } from 'contexts/Config';
 import AddResourceRole from '../AddRole/AddResourceRole';
 import AlertModal from '../AlertModal';
 import DataListToolbar from '../DataListToolbar';
@@ -25,8 +25,7 @@ const QS_CONFIG = getQSConfig('access', {
 });
 
 function ResourceAccessList({ apiModel, resource }) {
-  const { isSuperUser, isOrgAdmin } = useUserProfile();
-  const { me } = useConfig();
+  const { isSuperUser } = useUserProfile();
   const [submitError, setSubmitError] = useState(null);
   const [deletionRecord, setDeletionRecord] = useState(null);
   const [deletionRole, setDeletionRole] = useState(null);
@@ -34,42 +33,15 @@ function ResourceAccessList({ apiModel, resource }) {
   const [showDeleteModal, setShowDeleteModal] = useState(false);
   const location = useLocation();
 
-  const {
-    isLoading: isFetchingOrgAdmins,
-    error: errorFetchingOrgAdmins,
-    request: fetchOrgAdmins,
-    result: { isCredentialOrgAdmin },
-  } = useRequest(
-    useCallback(async () => {
-      if (
-        isSuperUser ||
-        resource.type !== 'credential' ||
-        !isOrgAdmin ||
-        !resource?.organization
-      ) {
-        return false;
-      }
-      const {
-        data: { count },
-      } = await OrganizationsAPI.readAdmins(resource.organization, {
-        id: me.id,
-      });
-      return { isCredentialOrgAdmin: !!count };
-    }, [me.id, isOrgAdmin, isSuperUser, resource.type, resource.organization]),
-    {
-      isCredentialOrgAdmin: false,
-    }
-  );
-
-  useEffect(() => {
-    fetchOrgAdmins();
-  }, [fetchOrgAdmins]);
-
   let canAddAdditionalControls = false;
   if (isSuperUser) {
     canAddAdditionalControls = true;
   }
-  if (resource.type === 'credential' && isOrgAdmin && isCredentialOrgAdmin) {
+  if (
+    resource.type === 'credential' &&
+    resource?.summary_fields?.user_capabilities?.edit &&
+    resource?.organization
+  ) {
     canAddAdditionalControls = true;
   }
   if (resource.type !== 'credential') {
@@ -195,8 +167,8 @@ function ResourceAccessList({ apiModel, resource }) {
   return (
     <>
      <PaginatedTable
-        error={contentError || errorFetchingOrgAdmins}
-        hasContentLoading={isLoading || isDeleteLoading || isFetchingOrgAdmins}
+        error={contentError}
+        hasContentLoading={isLoading || isDeleteLoading}
        items={accessRecords}
        itemCount={itemCount}
        pluralizedItemName={t`Roles`}
@@ -463,7 +463,7 @@ describe('<ResourceAccessList />', () => {
    expect(wrapper.find('ToolbarAddButton').length).toEqual(1);
  });

  test('should not show add button for non system admin & non org admin', async () => {
  test('should not show add button for a user without edit permissions on the credential', async () => {
    useUserProfile.mockImplementation(() => {
      return {
        isSuperUser: false,

@@ -476,7 +476,21 @@ describe('<ResourceAccessList />', () => {
    let wrapper;
    await act(async () => {
      wrapper = mountWithContexts(
        <ResourceAccessList resource={credential} apiModel={CredentialsAPI} />,
        <ResourceAccessList
          resource={{
            ...credential,
            summary_fields: {
              ...credential.summary_fields,
              user_capabilities: {
                edit: false,
                delete: false,
                copy: false,
                use: false,
              },
            },
          }}
          apiModel={CredentialsAPI}
        />,
        { context: { router: { credentialHistory } } }
      );
    });
@@ -47,7 +47,7 @@ export default function StatusLabel({ status, tooltipContent = '', children }) {
    unreachable: t`Unreachable`,
    running: t`Running`,
    pending: t`Pending`,
    skipped: t`Skipped'`,
    skipped: t`Skipped`,
    timedOut: t`Timed out`,
    waiting: t`Waiting`,
    disabled: t`Disabled`,
@@ -8722,8 +8722,8 @@ msgid "Skipped"
msgstr "Skipped"

#: components/StatusLabel/StatusLabel.js:50
msgid "Skipped'"
msgstr "Skipped'"
msgid "Skipped"
msgstr "Skipped"

#: components/NotificationList/NotificationList.js:200
#: screens/NotificationTemplate/NotificationTemplateList/NotificationTemplateList.js:141

@@ -8190,8 +8190,8 @@ msgid "Skipped"
msgstr "Omitido"

#: components/StatusLabel/StatusLabel.js:50
msgid "Skipped'"
msgstr "Omitido'"
msgid "Skipped"
msgstr "Omitido"

#: components/NotificationList/NotificationList.js:200
#: screens/NotificationTemplate/NotificationTemplateList/NotificationTemplateList.js:141

@@ -8078,7 +8078,7 @@ msgid "Skipped"
msgstr "Ignoré"

#: components/StatusLabel/StatusLabel.js:50
msgid "Skipped'"
msgid "Skipped"
msgstr "Ignoré"

#: components/NotificationList/NotificationList.js:200

@@ -8118,8 +8118,8 @@ msgid "Skipped"
msgstr "スキップ済"

#: components/StatusLabel/StatusLabel.js:50
msgid "Skipped'"
msgstr "スキップ済'"
msgid "Skipped"
msgstr "スキップ済"

#: components/NotificationList/NotificationList.js:200
#: screens/NotificationTemplate/NotificationTemplateList/NotificationTemplateList.js:141

@@ -8072,8 +8072,8 @@ msgid "Skipped"
msgstr "건너뜀"

#: components/StatusLabel/StatusLabel.js:50
msgid "Skipped'"
msgstr "건너뜀'"
msgid "Skipped"
msgstr "건너뜀"

#: components/NotificationList/NotificationList.js:200
#: screens/NotificationTemplate/NotificationTemplateList/NotificationTemplateList.js:141

@@ -8096,8 +8096,8 @@ msgid "Skipped"
msgstr "Overgeslagen"

#: components/StatusLabel/StatusLabel.js:50
msgid "Skipped'"
msgstr "Overgeslagen'"
msgid "Skipped"
msgstr "Overgeslagen"

#: components/NotificationList/NotificationList.js:200
#: screens/NotificationTemplate/NotificationTemplateList/NotificationTemplateList.js:141

@@ -8072,8 +8072,8 @@ msgid "Skipped"
msgstr "跳过"

#: components/StatusLabel/StatusLabel.js:50
msgid "Skipped'"
msgstr "跳过'"
msgid "Skipped"
msgstr "跳过"

#: components/NotificationList/NotificationList.js:200
#: screens/NotificationTemplate/NotificationTemplateList/NotificationTemplateList.js:141

@@ -8503,7 +8503,7 @@ msgid "Skipped"
msgstr ""

#: components/StatusLabel/StatusLabel.js:50
msgid "Skipped'"
msgid "Skipped"
msgstr ""

#: components/NotificationList/NotificationList.js:200
@@ -50,6 +50,11 @@ options:
      - If value not set, will try environment variable C(CONTROLLER_VERIFY_SSL) and then config files
    type: bool
    aliases: [ tower_verify_ssl ]
  request_timeout:
    description:
      - Specify the timeout Ansible should use in requests to the controller host.
      - Defaults to 10s, but this is handled by the shared module_utils code
    type: float
  controller_config_file:
    description:
      - Path to the controller config file.
@@ -68,6 +68,14 @@ options:
        why: Collection name change
        alternatives: 'CONTROLLER_VERIFY_SSL'
    aliases: [ validate_certs ]
  request_timeout:
    description:
      - Specify the timeout Ansible should use in requests to the controller host.
      - Defaults to 10 seconds
      - This will not work with the export or import modules.
    type: float
    env:
      - name: CONTROLLER_REQUEST_TIMEOUT

notes:
  - If no I(config_file) is provided we will attempt to use the tower-cli library
@@ -51,6 +51,7 @@ class ControllerModule(AnsibleModule):
        controller_username=dict(required=False, aliases=['tower_username'], fallback=(env_fallback, ['CONTROLLER_USERNAME', 'TOWER_USERNAME'])),
        controller_password=dict(no_log=True, aliases=['tower_password'], required=False, fallback=(env_fallback, ['CONTROLLER_PASSWORD', 'TOWER_PASSWORD'])),
        validate_certs=dict(type='bool', aliases=['tower_verify_ssl'], required=False, fallback=(env_fallback, ['CONTROLLER_VERIFY_SSL', 'TOWER_VERIFY_SSL'])),
        request_timeout=dict(type='float', required=False, fallback=(env_fallback, ['CONTROLLER_REQUEST_TIMEOUT'])),
        controller_oauthtoken=dict(
            type='raw', no_log=True, aliases=['tower_oauthtoken'], required=False, fallback=(env_fallback, ['CONTROLLER_OAUTH_TOKEN', 'TOWER_OAUTH_TOKEN'])
        ),
@@ -63,12 +64,14 @@ class ControllerModule(AnsibleModule):
        'username': 'controller_username',
        'password': 'controller_password',
        'verify_ssl': 'validate_certs',
        'request_timeout': 'request_timeout',
        'oauth_token': 'controller_oauthtoken',
    }
    host = '127.0.0.1'
    username = None
    password = None
    verify_ssl = True
    request_timeout = 10
    oauth_token = None
    oauth_token_id = None
    authenticated = False
@@ -304,7 +307,7 @@ class ControllerAPIModule(ControllerModule):
        kwargs['supports_check_mode'] = True

        super().__init__(argument_spec=argument_spec, direct_params=direct_params, error_callback=error_callback, warn_callback=warn_callback, **kwargs)
        self.session = Request(cookies=CookieJar(), validate_certs=self.verify_ssl)
        self.session = Request(cookies=CookieJar(), timeout=self.request_timeout, validate_certs=self.verify_ssl)

        if 'update_secrets' in self.params:
            self.update_secrets = self.params.pop('update_secrets')
@@ -500,7 +503,14 @@ class ControllerAPIModule(ControllerModule):
        data = dumps(kwargs.get('data', {}))

        try:
            response = self.session.open(method, url.geturl(), headers=headers, validate_certs=self.verify_ssl, follow_redirects=True, data=data)
            response = self.session.open(
                method, url.geturl(),
                headers=headers,
                timeout=self.request_timeout,
                validate_certs=self.verify_ssl,
                follow_redirects=True,
                data=data
            )
        except (SSLValidationError) as ssl_err:
            self.fail_json(msg="Could not establish a secure connection to your host ({1}): {0}.".format(url.netloc, ssl_err))
        except (ConnectionError) as con_err:
@@ -612,6 +622,7 @@ class ControllerAPIModule(ControllerModule):
            'POST',
            api_token_url,
            validate_certs=self.verify_ssl,
            timeout=self.request_timeout,
            follow_redirects=True,
            force_basic_auth=True,
            url_username=self.username,

@@ -988,6 +999,7 @@ class ControllerAPIModule(ControllerModule):
            'DELETE',
            api_token_url,
            validate_certs=self.verify_ssl,
            timeout=self.request_timeout,
            follow_redirects=True,
            force_basic_auth=True,
            url_username=self.username,
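To illustrate the timeout plumbing added above, here is a minimal sketch using ansible.module_utils.urls.Request directly; the controller URL is a placeholder, and the 0.001-second value mirrors the integration test that follows:

import socket

from ansible.module_utils.urls import Request

# A timeout set on the session becomes the default for every request;
# a per-call timeout (as in self.session.open(...) above) overrides it.
session = Request(timeout=10)

try:
    # Placeholder host; an unrealistically short timeout should fail fast.
    session.open('GET', 'https://controller.example.com/api/v2/ping/', timeout=0.001)
except (socket.timeout, OSError) as exc:
    print('Request timed out or failed: %s' % exc)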
@@ -53,6 +53,23 @@
    that:
      - result is not changed

- name: Create a git project and wait with short request timeout.
  project:
    name: "{{ project_name1 }}"
    organization: Default
    scm_type: git
    scm_url: https://github.com/ansible/test-playbooks
    wait: true
    state: exists
    request_timeout: .001
  register: result
  ignore_errors: true

- assert:
    that:
      - result is failed
      - "'timed out' in result.msg"

- name: Delete a git project without credentials and wait
  project:
    name: "{{ project_name1 }}"
@@ -13,30 +13,35 @@
apiVersion: v1
kind: ServiceAccount
metadata:
  name: awx
  name: containergroup-service-account
  namespace: containergroup-namespace
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: pod-manager
  name: role-containergroup-service-account
  namespace: containergroup-namespace
rules:
- apiGroups: [""] # "" indicates the core API group
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["pods/exec"]
  verbs: ["create"]
  resources: ["pods/log"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
- apiGroups: [""]
  resources: ["pods/attach"]
  verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: awx-pod-manager
  name: role-containergroup-service-account-binding
  namespace: containergroup-namespace
subjects:
- kind: ServiceAccount
  name: awx
  name: containergroup-service-account
  namespace: containergroup-namespace
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: pod-manager
  name: role-containergroup-service-account
  apiGroup: rbac.authorization.k8s.io
@@ -1,19 +0,0 @@
Copyright (c) 2013-2019 Python Charmers Pty Ltd, Australia

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
@@ -1,21 +0,0 @@
The MIT License (MIT)

Copyright (c) 2013 Daniel Bader (http://dbader.org)

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
@@ -40,12 +40,12 @@ psycopg
psutil
pygerduty
pyparsing==2.4.6 # Upgrading to v3 of pyparsing introduce errors on smart host filtering: Expected 'or' term, found 'or' (at char 15), (line:1, col:16)
python-daemon>3.0.0
python-dsv-sdk
python-tss-sdk==1.0.0
python-ldap
pyyaml>=6.0.1
receptorctl==1.3.0
schedule==0.6.0
social-auth-core[openidconnect]==4.3.0 # see UPGRADE BLOCKERs
social-auth-app-django==5.0.0 # see UPGRADE BLOCKERs
sqlparse >= 0.4.4 # Required by django https://github.com/ansible/awx/security/dependabot/96
@@ -155,8 +155,6 @@ frozenlist==1.3.3
    # via
    #   aiohttp
    #   aiosignal
future==0.18.3
    # via django-radius
gitdb==4.0.10
    # via gitpython
gitpython==3.1.30

@@ -315,8 +313,10 @@ pyrad==2.4
    # via django-radius
pyrsistent==0.19.2
    # via jsonschema
python-daemon==2.3.2
    # via ansible-runner
python-daemon==3.0.1
    # via
    #   -r /awx_devel/requirements/requirements.in
    #   ansible-runner
python-dateutil==2.8.2
    # via
    #   adal

@@ -380,8 +380,6 @@ rsa==4.9
    #   python-jose
s3transfer==0.6.0
    # via boto3
schedule==0.6.0
    # via -r /awx_devel/requirements/requirements.in
semantic-version==2.10.0
    # via setuptools-rust
service-identity==21.1.0

@@ -392,7 +390,6 @@ setuptools-scm[toml]==7.0.5
    # via -r /awx_devel/requirements/requirements.in
six==1.16.0
    # via
    #   ansible-runner
    #   automat
    #   azure-core
    #   django-pglocks