From 648aa470d7c033c3c1ec9546520e97333e15afff Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Tue, 11 Apr 2017 14:58:30 -0400 Subject: [PATCH 1/4] lock projects on project sync * Use filesystem, blocking, locks to prevent two project syncs for the same project from running the project update playbook at the same time. --- awx/main/models/projects.py | 6 ++++++ awx/main/tasks.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index bf60e5b77c..79c2183964 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -208,6 +208,12 @@ class ProjectOptions(models.Model): results.append(smart_text(playbook)) return sorted(results, key=lambda x: smart_str(x).lower()) + def get_lock_file(self): + proj_path = self.get_project_path() + if proj_path: + return os.path.join(proj_path, 'tower_sync.lock') + return None + class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin): ''' diff --git a/awx/main/tasks.py b/awx/main/tasks.py index d60d936531..b0e8a5b698 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -22,6 +22,7 @@ import urlparse import uuid from distutils.version import LooseVersion as Version import yaml +import fcntl try: import psutil except: @@ -1312,7 +1313,37 @@ class RunProjectUpdate(BaseTask): instance_actual.save() return OutputEventFilter(stdout_handle, raw_callback=raw_callback) + def release_lock(self, instance): + # May raise IOError + fcntl.flock(self.lock_fd, fcntl.LOCK_UN) + + os.close(self.lock_fd) + self.lock_fd = None + + ''' + Note: We don't support blocking=False + ''' + def acquire_lock(self, instance, blocking=True): + lock_path = instance.get_lock_file() + if lock_path is None: + raise RuntimeError(u'Invalid file %s' % instance.get_lock_file()) + + # May raise IOError + self.lock_fd = os.open(lock_path, os.O_RDONLY | os.O_CREAT) + + # May raise IOError + fcntl.flock(self.lock_fd, fcntl.LOCK_EX) + + def pre_run_hook(self, instance, **kwargs): + if instance.launch_type == 'sync': + #from celery.contrib import rdb + #rdb.set_trace() + self.acquire_lock(instance) + def post_run_hook(self, instance, status, **kwargs): + if instance.launch_type == 'sync': + self.release_lock(instance) + if instance.job_type == 'check' and status not in ('failed', 'canceled',): p = instance.project fd = open(self.revision_path, 'r') From 7919c47370f55c8d756e0a3d9baf9d200cc75d72 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Tue, 11 Apr 2017 16:53:13 -0400 Subject: [PATCH 2/4] put lock file in project root dir --- awx/main/models/projects.py | 6 +++--- awx/main/tasks.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/awx/main/models/projects.py b/awx/main/models/projects.py index 79c2183964..a49046e0eb 100644 --- a/awx/main/models/projects.py +++ b/awx/main/models/projects.py @@ -210,9 +210,9 @@ class ProjectOptions(models.Model): def get_lock_file(self): proj_path = self.get_project_path() - if proj_path: - return os.path.join(proj_path, 'tower_sync.lock') - return None + if not proj_path: + return None + return proj_path + '.lock' class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin): diff --git a/awx/main/tasks.py b/awx/main/tasks.py index b0e8a5b698..1078b6a76f 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1326,7 +1326,7 @@ class RunProjectUpdate(BaseTask): def acquire_lock(self, instance, blocking=True): lock_path = instance.get_lock_file() if lock_path is None: - raise RuntimeError(u'Invalid file %s' % instance.get_lock_file()) + raise RuntimeError(u'Invalid lock file path') # May raise IOError self.lock_fd = os.open(lock_path, os.O_RDONLY | os.O_CREAT) From d1eba4b607d5a7abe906ca7fe351fcdcf90b75a0 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Wed, 12 Apr 2017 11:41:11 -0400 Subject: [PATCH 3/4] log lock errors and test it --- awx/main/tests/unit/test_tasks.py | 63 +++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/awx/main/tests/unit/test_tasks.py b/awx/main/tests/unit/test_tasks.py index 16b9bc6b14..6465512da0 100644 --- a/awx/main/tests/unit/test_tasks.py +++ b/awx/main/tests/unit/test_tasks.py @@ -1,6 +1,9 @@ from contextlib import contextmanager +import os +import fcntl import pytest +import mock import yaml from awx.main.models import ( @@ -143,3 +146,63 @@ def test_openstack_client_config_generation_with_private_source_vars(mocker, sou 'private': expected } } + + +def test_os_open_oserror(): + with pytest.raises(OSError): + os.open('this_file_does_not_exist', os.O_RDONLY) + + +def test_fcntl_ioerror(): + with pytest.raises(IOError): + fcntl.flock(99999, fcntl.LOCK_EX) + + +@mock.patch('os.open') +@mock.patch('logging.getLogger') +def test_aquire_lock_open_fail_logged(logging_getLogger, os_open): + err = OSError() + err.errno = 3 + err.strerror = 'dummy message' + + instance = mock.Mock() + instance.get_lock_file.return_value = 'this_file_does_not_exist' + + os_open.side_effect = err + + logger = mock.Mock() + logging_getLogger.return_value = logger + + ProjectUpdate = tasks.RunProjectUpdate() + + with pytest.raises(OSError, errno=3, strerror='dummy message'): + ProjectUpdate.acquire_lock(instance) + assert logger.err.called_with("I/O error({0}) while trying to open lock file [{1}]: {2}".format(3, 'this_file_does_not_exist', 'dummy message')) + + +@mock.patch('os.open') +@mock.patch('os.close') +@mock.patch('logging.getLogger') +@mock.patch('fcntl.flock') +def test_aquire_lock_acquisition_fail_logged(fcntl_flock, logging_getLogger, os_close, os_open): + err = IOError() + err.errno = 3 + err.strerror = 'dummy message' + + instance = mock.Mock() + instance.get_lock_file.return_value = 'this_file_does_not_exist' + + os_open.return_value = 3 + + logger = mock.Mock() + logging_getLogger.return_value = logger + + fcntl_flock.side_effect = err + + ProjectUpdate = tasks.RunProjectUpdate() + + with pytest.raises(IOError, errno=3, strerror='dummy message'): + ProjectUpdate.acquire_lock(instance) + os_close.assert_called_with(3) + assert logger.err.called_with("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(3, 'this_file_does_not_exist', 'dummy message')) + From 70fafe75d2af1f4ccb33431b2f125c36526a4758 Mon Sep 17 00:00:00 2001 From: Chris Meyers Date: Wed, 12 Apr 2017 11:42:31 -0400 Subject: [PATCH 4/4] clean up commented code --- awx/main/tasks.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/awx/main/tasks.py b/awx/main/tasks.py index 1078b6a76f..7d09ee87d1 100644 --- a/awx/main/tasks.py +++ b/awx/main/tasks.py @@ -1314,8 +1314,12 @@ class RunProjectUpdate(BaseTask): return OutputEventFilter(stdout_handle, raw_callback=raw_callback) def release_lock(self, instance): - # May raise IOError - fcntl.flock(self.lock_fd, fcntl.LOCK_UN) + try: + fcntl.flock(self.lock_fd, fcntl.LOCK_UN) + except IOError as e: + logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, instance.get_lock_file(), e.strerror)) + os.close(self.lock_fd) + raise os.close(self.lock_fd) self.lock_fd = None @@ -1328,16 +1332,21 @@ class RunProjectUpdate(BaseTask): if lock_path is None: raise RuntimeError(u'Invalid lock file path') - # May raise IOError - self.lock_fd = os.open(lock_path, os.O_RDONLY | os.O_CREAT) + try: + self.lock_fd = os.open(lock_path, os.O_RDONLY | os.O_CREAT) + except OSError as e: + logger.error("I/O error({0}) while trying to open lock file [{1}]: {2}".format(e.errno, lock_path, e.strerror)) + raise - # May raise IOError - fcntl.flock(self.lock_fd, fcntl.LOCK_EX) + try: + fcntl.flock(self.lock_fd, fcntl.LOCK_EX) + except IOError as e: + os.close(self.lock_fd) + logger.error("I/O error({0}) while trying to aquire lock on file [{1}]: {2}".format(e.errno, lock_path, e.strerror)) + raise def pre_run_hook(self, instance, **kwargs): if instance.launch_type == 'sync': - #from celery.contrib import rdb - #rdb.set_trace() self.acquire_lock(instance) def post_run_hook(self, instance, status, **kwargs):