mirror of
https://github.com/ansible/awx.git
synced 2026-03-24 04:15:02 -02:30
Merge pull request #12694 from AlanCoding/whoami
Shortcut Instance.objects.me when possible
This commit is contained in:
@@ -166,11 +166,7 @@ class Metrics:
|
|||||||
elif settings.IS_TESTING():
|
elif settings.IS_TESTING():
|
||||||
self.instance_name = "awx_testing"
|
self.instance_name = "awx_testing"
|
||||||
else:
|
else:
|
||||||
try:
|
self.instance_name = Instance.objects.my_hostname()
|
||||||
self.instance_name = Instance.objects.me().hostname
|
|
||||||
except Exception as e:
|
|
||||||
self.instance_name = settings.CLUSTER_HOST_ID
|
|
||||||
logger.info(f'Instance {self.instance_name} seems to be unregistered, error: {e}')
|
|
||||||
|
|
||||||
# metric name, help_text
|
# metric name, help_text
|
||||||
METRICSLIST = [
|
METRICSLIST = [
|
||||||
|
|||||||
@@ -16,12 +16,7 @@ def startup_reaping():
|
|||||||
If this particular instance is starting, then we know that any running jobs are invalid
|
If this particular instance is starting, then we know that any running jobs are invalid
|
||||||
so we will reap those jobs as a special action here
|
so we will reap those jobs as a special action here
|
||||||
"""
|
"""
|
||||||
try:
|
jobs = UnifiedJob.objects.filter(status='running', controller_node=Instance.objects.my_hostname())
|
||||||
me = Instance.objects.me()
|
|
||||||
except RuntimeError as e:
|
|
||||||
logger.warning(f'Local instance is not registered, not running startup reaper: {e}')
|
|
||||||
return
|
|
||||||
jobs = UnifiedJob.objects.filter(status='running', controller_node=me.hostname)
|
|
||||||
job_ids = []
|
job_ids = []
|
||||||
for j in jobs:
|
for j in jobs:
|
||||||
job_ids.append(j.id)
|
job_ids.append(j.id)
|
||||||
@@ -62,16 +57,13 @@ def reap_waiting(instance=None, status='failed', job_explanation=None, grace_per
|
|||||||
if grace_period is None:
|
if grace_period is None:
|
||||||
grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT
|
grace_period = settings.JOB_WAITING_GRACE_PERIOD + settings.TASK_MANAGER_TIMEOUT
|
||||||
|
|
||||||
me = instance
|
if instance is None:
|
||||||
if me is None:
|
hostname = Instance.objects.my_hostname()
|
||||||
try:
|
else:
|
||||||
me = Instance.objects.me()
|
hostname = instance.hostname
|
||||||
except RuntimeError as e:
|
|
||||||
logger.warning(f'Local instance is not registered, not running reaper: {e}')
|
|
||||||
return
|
|
||||||
if ref_time is None:
|
if ref_time is None:
|
||||||
ref_time = tz_now()
|
ref_time = tz_now()
|
||||||
jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=ref_time - timedelta(seconds=grace_period), controller_node=me.hostname)
|
jobs = UnifiedJob.objects.filter(status='waiting', modified__lte=ref_time - timedelta(seconds=grace_period), controller_node=hostname)
|
||||||
if excluded_uuids:
|
if excluded_uuids:
|
||||||
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
|
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
|
||||||
for j in jobs:
|
for j in jobs:
|
||||||
@@ -82,16 +74,13 @@ def reap(instance=None, status='failed', job_explanation=None, excluded_uuids=No
|
|||||||
"""
|
"""
|
||||||
Reap all jobs in running for this instance.
|
Reap all jobs in running for this instance.
|
||||||
"""
|
"""
|
||||||
me = instance
|
if instance is None:
|
||||||
if me is None:
|
hostname = Instance.objects.my_hostname()
|
||||||
try:
|
else:
|
||||||
me = Instance.objects.me()
|
hostname = instance.hostname
|
||||||
except RuntimeError as e:
|
|
||||||
logger.warning(f'Local instance is not registered, not running reaper: {e}')
|
|
||||||
return
|
|
||||||
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
|
workflow_ctype_id = ContentType.objects.get_for_model(WorkflowJob).id
|
||||||
jobs = UnifiedJob.objects.filter(
|
jobs = UnifiedJob.objects.filter(
|
||||||
Q(status='running') & (Q(execution_node=me.hostname) | Q(controller_node=me.hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
|
Q(status='running') & (Q(execution_node=hostname) | Q(controller_node=hostname)) & ~Q(polymorphic_ctype_id=workflow_ctype_id)
|
||||||
)
|
)
|
||||||
if excluded_uuids:
|
if excluded_uuids:
|
||||||
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
|
jobs = jobs.exclude(celery_task_id__in=excluded_uuids)
|
||||||
|
|||||||
@@ -53,7 +53,7 @@ class Command(BaseCommand):
|
|||||||
return lines
|
return lines
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_connection_status(cls, me, hostnames, data):
|
def get_connection_status(cls, hostnames, data):
|
||||||
host_stats = [('hostname', 'state', 'start time', 'duration (sec)')]
|
host_stats = [('hostname', 'state', 'start time', 'duration (sec)')]
|
||||||
for h in hostnames:
|
for h in hostnames:
|
||||||
connection_color = '91' # red
|
connection_color = '91' # red
|
||||||
@@ -78,7 +78,7 @@ class Command(BaseCommand):
|
|||||||
return host_stats
|
return host_stats
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_connection_stats(cls, me, hostnames, data):
|
def get_connection_stats(cls, hostnames, data):
|
||||||
host_stats = [('hostname', 'total', 'per minute')]
|
host_stats = [('hostname', 'total', 'per minute')]
|
||||||
for h in hostnames:
|
for h in hostnames:
|
||||||
h_safe = safe_name(h)
|
h_safe = safe_name(h)
|
||||||
@@ -119,8 +119,8 @@ class Command(BaseCommand):
|
|||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
me = Instance.objects.me()
|
my_hostname = Instance.objects.my_hostname()
|
||||||
logger.info('Active instance with hostname {} is registered.'.format(me.hostname))
|
logger.info('Active instance with hostname {} is registered.'.format(my_hostname))
|
||||||
except RuntimeError as e:
|
except RuntimeError as e:
|
||||||
# the CLUSTER_HOST_ID in the task, and web instance must match and
|
# the CLUSTER_HOST_ID in the task, and web instance must match and
|
||||||
# ensure network connectivity between the task and web instance
|
# ensure network connectivity between the task and web instance
|
||||||
@@ -145,19 +145,19 @@ class Command(BaseCommand):
|
|||||||
else:
|
else:
|
||||||
data[family.name] = family.samples[0].value
|
data[family.name] = family.samples[0].value
|
||||||
|
|
||||||
me = Instance.objects.me()
|
my_hostname = Instance.objects.my_hostname()
|
||||||
hostnames = [i.hostname for i in Instance.objects.exclude(hostname=me.hostname)]
|
hostnames = [i.hostname for i in Instance.objects.exclude(hostname=my_hostname)]
|
||||||
|
|
||||||
host_stats = Command.get_connection_status(me, hostnames, data)
|
host_stats = Command.get_connection_status(hostnames, data)
|
||||||
lines = Command._format_lines(host_stats)
|
lines = Command._format_lines(host_stats)
|
||||||
|
|
||||||
print(f'Broadcast websocket connection status from "{me.hostname}" to:')
|
print(f'Broadcast websocket connection status from "{my_hostname}" to:')
|
||||||
print('\n'.join(lines))
|
print('\n'.join(lines))
|
||||||
|
|
||||||
host_stats = Command.get_connection_stats(me, hostnames, data)
|
host_stats = Command.get_connection_stats(hostnames, data)
|
||||||
lines = Command._format_lines(host_stats)
|
lines = Command._format_lines(host_stats)
|
||||||
|
|
||||||
print(f'\nBroadcast websocket connection stats from "{me.hostname}" to:')
|
print(f'\nBroadcast websocket connection stats from "{my_hostname}" to:')
|
||||||
print('\n'.join(lines))
|
print('\n'.join(lines))
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -99,9 +99,12 @@ class InstanceManager(models.Manager):
|
|||||||
instance or role.
|
instance or role.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def my_hostname(self):
|
||||||
|
return settings.CLUSTER_HOST_ID
|
||||||
|
|
||||||
def me(self):
|
def me(self):
|
||||||
"""Return the currently active instance."""
|
"""Return the currently active instance."""
|
||||||
node = self.filter(hostname=settings.CLUSTER_HOST_ID)
|
node = self.filter(hostname=self.my_hostname())
|
||||||
if node.exists():
|
if node.exists():
|
||||||
return node[0]
|
return node[0]
|
||||||
raise RuntimeError("No instance found with the current cluster host id")
|
raise RuntimeError("No instance found with the current cluster host id")
|
||||||
|
|||||||
@@ -700,7 +700,7 @@ class SourceControlMixin(BaseTask):
|
|||||||
|
|
||||||
def spawn_project_sync(self, project, sync_needs, scm_branch=None):
|
def spawn_project_sync(self, project, sync_needs, scm_branch=None):
|
||||||
pu_ig = self.instance.instance_group
|
pu_ig = self.instance.instance_group
|
||||||
pu_en = Instance.objects.me().hostname
|
pu_en = Instance.objects.my_hostname()
|
||||||
|
|
||||||
sync_metafields = dict(
|
sync_metafields = dict(
|
||||||
launch_type="sync",
|
launch_type="sync",
|
||||||
|
|||||||
@@ -35,7 +35,7 @@ def unwrap_broadcast_msg(payload: dict):
|
|||||||
def get_broadcast_hosts():
|
def get_broadcast_hosts():
|
||||||
Instance = apps.get_model('main', 'Instance')
|
Instance = apps.get_model('main', 'Instance')
|
||||||
instances = (
|
instances = (
|
||||||
Instance.objects.exclude(hostname=Instance.objects.me().hostname)
|
Instance.objects.exclude(hostname=Instance.objects.my_hostname())
|
||||||
.exclude(node_type='execution')
|
.exclude(node_type='execution')
|
||||||
.exclude(node_type='hop')
|
.exclude(node_type='hop')
|
||||||
.order_by('hostname')
|
.order_by('hostname')
|
||||||
@@ -47,7 +47,7 @@ def get_broadcast_hosts():
|
|||||||
|
|
||||||
def get_local_host():
|
def get_local_host():
|
||||||
Instance = apps.get_model('main', 'Instance')
|
Instance = apps.get_model('main', 'Instance')
|
||||||
return Instance.objects.me().hostname
|
return Instance.objects.my_hostname()
|
||||||
|
|
||||||
|
|
||||||
class WebsocketTask:
|
class WebsocketTask:
|
||||||
|
|||||||
Reference in New Issue
Block a user