Refactor canceling to work through messaging and signals, not database

If canceled attempted before, still allow attempting another cancel
in this case, attempt to send the sigterm signal again.
Keep clicking, you might help!

Replace other cancel_callbacks with sigterm watcher
  adapt special inventory mechanism for this too

Get rid of the cancel_watcher method with exception in main thread

Handle academic case of sigterm race condition

Process cancelation as control signal

Fully connect cancel method and run_dispatcher to control

Never transition workflows directly to canceled, add logs
This commit is contained in:
Alan Rominger
2021-11-08 16:57:12 -05:00
parent f9428c10b9
commit c59bbdecdb
13 changed files with 137 additions and 91 deletions

View File

@@ -37,18 +37,24 @@ class Control(object):
def running(self, *args, **kwargs):
return self.control_with_reply('running', *args, **kwargs)
def cancel(self, task_ids, *args, **kwargs):
return self.control_with_reply('cancel', *args, extra_data={'task_ids': task_ids}, **kwargs)
@classmethod
def generate_reply_queue_name(cls):
return f"reply_to_{str(uuid.uuid4()).replace('-','_')}"
def control_with_reply(self, command, timeout=5):
def control_with_reply(self, command, timeout=5, extra_data=None):
logger.warning('checking {} {} for {}'.format(self.service, command, self.queuename))
reply_queue = Control.generate_reply_queue_name()
self.result = None
with pg_bus_conn(new_connection=True) as conn:
conn.listen(reply_queue)
conn.notify(self.queuename, json.dumps({'control': command, 'reply_to': reply_queue}))
send_data = {'control': command, 'reply_to': reply_queue}
if extra_data:
send_data.update(extra_data)
conn.notify(self.queuename, json.dumps(send_data))
for reply in conn.events(select_timeout=timeout, yield_timeouts=True):
if reply is None: