diff --git a/awx/main/tasks.py b/awx/main/tasks.py
index d51f3cb1a7..7a408265d9 100644
--- a/awx/main/tasks.py
+++ b/awx/main/tasks.py
@@ -23,6 +23,9 @@ import fcntl
 from pathlib import Path
 from uuid import uuid4
 import urllib.parse as urlparse
+import socket
+import threading
+import concurrent.futures
 
 # Django
 from django.conf import settings
@@ -49,6 +52,9 @@ from gitdb.exc import BadName as BadGitName
 # Runner
 import ansible_runner
 
+# Receptor
+from receptorctl.socket_interface import ReceptorControl
+
 # AWX
 from awx import __version__ as awx_application_version
 from awx.main.constants import PRIVILEGE_ESCALATION_METHODS, STANDARD_INVENTORY_UPDATE_ENV
@@ -1453,15 +1459,10 @@ class BaseTask(object):
         params = {
             'ident': self.instance.id,
             'private_data_dir': private_data_dir,
-            'project_dir': cwd,
             'playbook': self.build_playbook_path_relative_to_cwd(self.instance, private_data_dir),
             'inventory': self.build_inventory(self.instance, private_data_dir),
             'passwords': expect_passwords,
             'envvars': env,
-            'event_handler': self.event_handler,
-            'cancel_callback': self.cancel_callback,
-            'finished_callback': self.finished_callback,
-            'status_handler': self.status_handler,
             'settings': {
                 'job_timeout': self.get_instance_timeout(self.instance),
                 'suppress_ansible_output': True,
@@ -1473,10 +1474,7 @@ class BaseTask(object):
             # We don't want HOME passed through to container groups.
             # TODO: remove this conditional after everything is containerized
             params['envvars'].pop('HOME', None)
-        else:
-            # TODO: container group jobs will not work with container isolation settings
-            # but both will run with same settings when worker_in and worker_out are added
-            params['settings'].update(execution_environment_params)
+
         if isinstance(self.instance, AdHocCommand):
             params['module'] = self.build_module_name(self.instance)
 
@@ -1497,39 +1495,85 @@ class BaseTask(object):
                     del params[v]
 
         self.dispatcher = CallbackQueueDispatcher()
-        if self.instance.is_isolated() or containerized:
-            module_args = None
-            if 'module_args' in params:
-                # if it's adhoc, copy the module args
-                module_args = ansible_runner.utils.args2cmdline(
-                    params.get('module_args'),
-                )
-            # TODO on merge: delete if https://github.com/ansible/awx/pull/8185 is merged
-            if not os.path.exists(os.path.join(private_data_dir, 'inventory')):
-                shutil.move(
-                    params.pop('inventory'),
-                    os.path.join(private_data_dir, 'inventory')
-                )
-            ansible_runner.utils.dump_artifacts(params)
-            isolated_manager_instance = isolated_manager.IsolatedManager(
-                self.event_handler,
-                canceled_callback=lambda: self.update_model(self.instance.pk).cancel_flag,
-                check_callback=self.check_handler,
-                pod_manager=pod_manager
-            )
-            status, rc = isolated_manager_instance.run(self.instance,
-                                                       private_data_dir,
-                                                       params.get('playbook'),
-                                                       params.get('module'),
-                                                       module_args,
-                                                       ident=str(self.instance.pk))
-            self.finished_callback(None)
+        if not isinstance(self.instance, ProjectUpdate):
+            worktype = 'worker'
+            # TODO: container group jobs will not work with container isolation settings
+            # but both will run with same settings when worker_in and worker_out are added
+            params['settings'].update(execution_environment_params)
         else:
-            res = ansible_runner.interface.run(**params)
-            status = res.status
-            rc = res.rc
+            worktype = 'worker'
+            params['settings'].update(execution_environment_params)
+
+        # Create a socketpair, where the left side will be used for writing our payload
+        # (private data dir, kwargs) and the right side will be passed to Receptor for
+        # reading.
+        sockin, sockout = socket.socketpair()
+
+        # The transmit is spawned in a thread so Receptor can start reading before we
+        # finish writing our payload to the left side of our socketpair.
+        def transmit(_socket):
+            ansible_runner.interface.run(streamer='transmit',
+                                         _output=_socket.makefile('wb'),
+                                         **params)
+
+            # The socket must be shut down here, or the reader will hang forever.
+            _socket.shutdown(socket.SHUT_WR)
+
+        threading.Thread(target=transmit, args=[sockin]).start()
+
+        self.instance.log_lifecycle("running_playbook")
+        # We establish a connection to the Receptor control socket and submit our work,
+        # passing in the right side of our socketpair for reading.
+        receptor_ctl = ReceptorControl('/var/run/receptor/receptor.sock')
+        result = receptor_ctl.submit_work(worktype=worktype,
+                                          payload=sockout.makefile('rb'))
+        sockin.close()
+        sockout.close()
+
+        resultsock, resultfile = receptor_ctl.get_work_results(result['unitid'],
+                                                               return_socket=True,
+                                                               return_sockfile=True)
+
+        def processor():
+            return ansible_runner.interface.run(streamer='process',
+                                                quiet=True,
+                                                _input=resultfile,
+                                                event_handler=self.event_handler,
+                                                finished_callback=self.finished_callback,
+                                                status_handler=self.status_handler)
+
+        def cancel_watcher(processor_future):
+            while True:
+                if processor_future.done():
+                    return
+
+                if self.cancel_callback():
+                    result = namedtuple('result', ['status', 'rc'])
+                    return result('canceled', 1)
+                time.sleep(1)
+
+        # Both "processor" and "cancel_watcher" are spawned in separate threads, and we
+        # wait for the first one to return. If cancel_watcher returns first, we yank the
+        # socket out from underneath the processor, which will cause it to exit. A
+        # reference to the processor_future is passed into cancel_watcher, which exits
+        # if the job has finished normally. The context manager ensures we do not leave
+        # any threads lying around.
+        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
+            processor_future = executor.submit(processor)
+            cancel_watcher_future = executor.submit(cancel_watcher, processor_future)
+            futures = [processor_future, cancel_watcher_future]
+            first_future = concurrent.futures.wait(futures,
+                                                   return_when=concurrent.futures.FIRST_COMPLETED)
+
+            res = list(first_future.done)[0].result()
+            if res.status == 'canceled':
+                resultsock.shutdown(socket.SHUT_RDWR)
+                resultfile.close()
+
+        status = res.status
+        rc = res.rc
 
         if status == 'timeout':
             self.instance.job_explanation = "Job terminated due to timeout"
diff --git a/awx/main/tests/functional/test_inventory_source_injectors.py b/awx/main/tests/functional/test_inventory_source_injectors.py
index bf1a4002c5..c4f7e6a17d 100644
--- a/awx/main/tests/functional/test_inventory_source_injectors.py
+++ b/awx/main/tests/functional/test_inventory_source_injectors.py
@@ -206,6 +206,10 @@ def test_inventory_update_injected_content(this_kind, inventory, fake_credential
         It will make assertions that the contents are correct
         If MAKE_INVENTORY_REFERENCE_FILES is set, it will produce reference files
         """
+        if _kw.get('streamer') != 'transmit':
+            Res = namedtuple('Result', ['status', 'rc'])
+            return Res('successful', 0)
+
         private_data_dir = envvars.pop('AWX_PRIVATE_DATA_DIR')
         assert envvars.pop('ANSIBLE_INVENTORY_ENABLED') == 'auto'
         set_files = bool(os.getenv("MAKE_INVENTORY_REFERENCE_FILES", 'false').lower()[0] not in ['f', '0'])
diff --git a/requirements/requirements_dev.txt b/requirements/requirements_dev.txt
index fe51fff164..4788e153a2 100644
--- a/requirements/requirements_dev.txt
+++ b/requirements/requirements_dev.txt
@@ -20,5 +20,6 @@ matplotlib
 backports.tempfile  # support in unit tests for py32+ tempfile.TemporaryDirectory
 mockldap
 sdb
+remote-pdb
 gprof2dot
 atomicwrites==1.4.0
diff --git a/tools/ansible/roles/dockerfile/templates/Dockerfile.j2 b/tools/ansible/roles/dockerfile/templates/Dockerfile.j2
index 838b0ef7c6..0d48f2ea56 100644
--- a/tools/ansible/roles/dockerfile/templates/Dockerfile.j2
+++ b/tools/ansible/roles/dockerfile/templates/Dockerfile.j2
@@ -143,11 +143,6 @@ RUN ansible-galaxy collection install --collections-path /usr/share/ansible/coll
 
 RUN rm -rf /root/.cache && rm -rf /tmp/*
 
-# Install Receptor
-RUN cd /usr/local/bin && \
-    curl -L http://nightlies.testing.ansible.com/receptor/receptor --output receptor && \
-    chmod a+x receptor
-
 # Install OpenShift CLI
 RUN cd /usr/local/bin && \
     curl -L https://github.com/openshift/origin/releases/download/v3.11.0/openshift-origin-client-tools-v3.11.0-0cbc58b-linux-64bit.tar.gz | \
@@ -190,6 +185,7 @@ COPY --from=builder /var/lib/awx /var/lib/awx
 RUN ln -s /var/lib/awx/venv/awx/bin/awx-manage /usr/bin/awx-manage
 
 {%if build_dev|bool %}
+COPY --from=quay.io/shanemcd/receptor /usr/bin/receptor /usr/bin/receptor
 RUN openssl req -nodes -newkey rsa:2048 -keyout /etc/nginx/nginx.key -out /etc/nginx/nginx.csr \
     -subj "/C=US/ST=North Carolina/L=Durham/O=Ansible/OU=AWX Development/CN=awx.localhost" && \
     openssl x509 -req -days 365 -in /etc/nginx/nginx.csr -signkey /etc/nginx/nginx.key -out /etc/nginx/nginx.crt && \
diff --git a/tools/docker-compose/ansible/roles/sources/templates/docker-compose.yml.j2 b/tools/docker-compose/ansible/roles/sources/templates/docker-compose.yml.j2
index eae187cea6..db81b4ba39 100644
--- a/tools/docker-compose/ansible/roles/sources/templates/docker-compose.yml.j2
+++ b/tools/docker-compose/ansible/roles/sources/templates/docker-compose.yml.j2
@@ -35,6 +35,8 @@ services:
       - "redis_socket:/var/run/redis/:rw"
"redis_socket:/var/run/redis/:rw" - "receptor:/var/run/receptor/" - "/sys/fs/cgroup:/sys/fs/cgroup" + - "./docker-compose/receptor.conf:/etc/receptor/receptor.conf" + - "~/.kube/config:/var/lib/awx/.kube/config" privileged: true tty: true # A useful container that simply passes through log messages to the console @@ -43,16 +45,6 @@ services: # build: # context: ./docker-compose # dockerfile: Dockerfile-logstash - ee: - image: quay.io/ansible/awx-ee - user: ${CURRENT_UID} - volumes: - - "./docker-compose/receptor.cfg:/receptor.cfg" - - "receptor:/var/run/receptor/" - command: - - receptor - - --config - - /receptor.cfg postgres: image: postgres:12 container_name: tools_postgres_1 diff --git a/tools/docker-compose/receptor.cfg b/tools/docker-compose/receptor.conf similarity index 59% rename from tools/docker-compose/receptor.cfg rename to tools/docker-compose/receptor.conf index 137d15cdf6..7df861f6ca 100644 --- a/tools/docker-compose/receptor.cfg +++ b/tools/docker-compose/receptor.conf @@ -5,11 +5,15 @@ service: control filename: /var/run/receptor/receptor.sock -- tcp-listener: - port: 2222 +- local-only: - work-command: worktype: worker command: ansible-runner params: worker - allowruntimeparams: true + +- work-kubernetes: + worktype: ocp + namespace: receptor + image: quay.io/shanemcd/ee + authmethod: kubeconfig diff --git a/tools/docker-compose/supervisor.conf b/tools/docker-compose/supervisor.conf index 04ddb66838..1a71b8018e 100644 --- a/tools/docker-compose/supervisor.conf +++ b/tools/docker-compose/supervisor.conf @@ -84,7 +84,7 @@ stdout_logfile=/dev/fd/1 stdout_logfile_maxbytes=0 [program:awx-receptor] -command = receptor --node id=%(ENV_HOSTNAME)s --control-service filename=/var/run/receptor/receptor.sock --tcp-listener port=2222 +command = receptor --config /etc/receptor/receptor.conf autostart = true autorestart = true stopsignal = KILL