From 546281d435f2ca65296a5aeb906279fe37c0427d Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Fri, 29 Mar 2019 10:31:12 -0400 Subject: [PATCH 1/4] work around a bug where runner doesn't provide atomic event writes --- awx/main/expect/isolated_manager.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/awx/main/expect/isolated_manager.py b/awx/main/expect/isolated_manager.py index 0830b390c7..a1135c7eab 100644 --- a/awx/main/expect/isolated_manager.py +++ b/awx/main/expect/isolated_manager.py @@ -198,9 +198,20 @@ class IsolatedManager(object): for event in set(os.listdir(events_path)) - self.handled_events: path = os.path.join(events_path, event) if os.path.exists(path): - event_data = json.load( - open(os.path.join(events_path, event), 'r') - ) + try: + event_data = json.load( + open(os.path.join(events_path, event), 'r') + ) + except json.decoder.JSONDecodeError: + # This means the event we got back isn't valid JSON + # that can happen if runner is still partially + # writing an event file while it's rsyncing + # these event writes are _supposed_ to be atomic + # but it doesn't look like they actually are in + # practice + # in this scenario, just ignore this event and try it + # again on the next sync + pass event_data.setdefault(self.event_data_key, self.instance.id) dispatcher.dispatch(event_data) self.handled_events.add(event) From d663d397f8eeeeb85e9db91885ac79a50190c05c Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Fri, 29 Mar 2019 10:31:56 -0400 Subject: [PATCH 2/4] clean up some isolated adhoc code if it's adhoc, there's not project directory to copy --- awx/main/tasks.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index c21b4ae554..50c7509187 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1187,14 +1187,17 @@ class BaseTask(object): if self.instance.is_isolated() is True: module_args = None if 'module_args' in params: + # if it's adhoc, copy the module args module_args = ansible_runner.utils.args2cmdline( params.get('module_args'), ) + else: + # otherwise, it's a playbook, so copy the project dir + copy_tree(cwd, os.path.join(private_data_dir, 'project')) shutil.move( params.pop('inventory'), os.path.join(private_data_dir, 'inventory') ) - copy_tree(cwd, os.path.join(private_data_dir, 'project')) ansible_runner.utils.dump_artifacts(params) manager_instance = isolated_manager.IsolatedManager( cancelled_callback=lambda: self.update_model(self.instance.pk).cancel_flag From 8f089c02a5131653ae6f564aac882dcc5f35603f Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Fri, 29 Mar 2019 10:32:24 -0400 Subject: [PATCH 3/4] fix some faulty logic in isolated syncs that caused SSH keys to not work --- awx/playbooks/run_isolated.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awx/playbooks/run_isolated.yml b/awx/playbooks/run_isolated.yml index 8e3ed15cc8..157d561635 100644 --- a/awx/playbooks/run_isolated.yml +++ b/awx/playbooks/run_isolated.yml @@ -18,7 +18,7 @@ src: "{{src}}" dest: "{{dest}}" - - stat: path="{{src}}/env/ssh_key" + - local_action: stat path="{{src}}/env/ssh_key" register: key - name: create a named pipe for secret environment data From a59bc332802abc2865600e08b7a98e2d9bea91c3 Mon Sep 17 00:00:00 2001 From: Ryan Petrello Date: Fri, 29 Mar 2019 10:48:42 -0400 Subject: [PATCH 4/4] slightly refactor isolated event consumption --- awx/main/expect/isolated_manager.py | 73 ++++++++++++++++------------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/awx/main/expect/isolated_manager.py b/awx/main/expect/isolated_manager.py index a1135c7eab..a5f64d6e57 100644 --- a/awx/main/expect/isolated_manager.py +++ b/awx/main/expect/isolated_manager.py @@ -187,39 +187,7 @@ class IsolatedManager(object): self.private_data_dir, extravars=extravars) status, rc = runner_obj.status, runner_obj.rc - - # discover new events and ingest them - events_path = self.path_to('artifacts', self.ident, 'job_events') - - # it's possible that `events_path` doesn't exist *yet*, because runner - # hasn't actually written any events yet (if you ran e.g., a sleep 30) - # only attempt to consume events if any were rsynced back - if os.path.exists(events_path): - for event in set(os.listdir(events_path)) - self.handled_events: - path = os.path.join(events_path, event) - if os.path.exists(path): - try: - event_data = json.load( - open(os.path.join(events_path, event), 'r') - ) - except json.decoder.JSONDecodeError: - # This means the event we got back isn't valid JSON - # that can happen if runner is still partially - # writing an event file while it's rsyncing - # these event writes are _supposed_ to be atomic - # but it doesn't look like they actually are in - # practice - # in this scenario, just ignore this event and try it - # again on the next sync - pass - event_data.setdefault(self.event_data_key, self.instance.id) - dispatcher.dispatch(event_data) - self.handled_events.add(event) - - # handle artifacts - if event_data.get('event_data', {}).get('artifact_data', {}): - self.instance.artifacts = event_data['event_data']['artifact_data'] - self.instance.save(update_fields=['artifacts']) + self.consume_events(dispatcher) last_check = time.time() @@ -231,6 +199,10 @@ class IsolatedManager(object): with open(rc_path, 'r') as f: rc = int(f.readline()) + # consume events one last time just to be sure we didn't miss anything + # in the final sync + self.consume_events(dispatcher) + # emit an EOF event event_data = { 'event': 'EOF', @@ -241,6 +213,41 @@ class IsolatedManager(object): return status, rc + def consume_events(self, dispatcher): + # discover new events and ingest them + events_path = self.path_to('artifacts', self.ident, 'job_events') + + # it's possible that `events_path` doesn't exist *yet*, because runner + # hasn't actually written any events yet (if you ran e.g., a sleep 30) + # only attempt to consume events if any were rsynced back + if os.path.exists(events_path): + for event in set(os.listdir(events_path)) - self.handled_events: + path = os.path.join(events_path, event) + if os.path.exists(path): + try: + event_data = json.load( + open(os.path.join(events_path, event), 'r') + ) + except json.decoder.JSONDecodeError: + # This means the event we got back isn't valid JSON + # that can happen if runner is still partially + # writing an event file while it's rsyncing + # these event writes are _supposed_ to be atomic + # but it doesn't look like they actually are in + # practice + # in this scenario, just ignore this event and try it + # again on the next sync + pass + event_data.setdefault(self.event_data_key, self.instance.id) + dispatcher.dispatch(event_data) + self.handled_events.add(event) + + # handle artifacts + if event_data.get('event_data', {}).get('artifact_data', {}): + self.instance.artifacts = event_data['event_data']['artifact_data'] + self.instance.save(update_fields=['artifacts']) + + def cleanup(self): # If the job failed for any reason, make a last-ditch effort at cleanup extravars = {