123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- from __future__ import annotations
- import contextlib
- import logging
- import os
- import time
- import warnings
- from abc import ABC, abstractmethod
- from dataclasses import dataclass
- from threading import local
- from typing import TYPE_CHECKING, Any
- from ._error import Timeout
- if TYPE_CHECKING:
- from types import TracebackType
- _LOGGER = logging.getLogger("filelock")
# :meth:`BaseFileLock.acquire` hands back an instance of this small wrapper instead of the lock itself. Had it returned
# *self*, entering a ``with lock.acquire():`` statement would run ``BaseFileLock.__enter__`` and take the lock a second
# time, while leaving the ``with`` block would release it only once. See issue #37 (memory leak).
class AcquireReturnProxy:
    """A context aware object that gives back the wrapped lock on entry and releases it on exit."""

    def __init__(self, lock: BaseFileLock) -> None:
        # Nothing is acquired here; the proxy merely remembers which lock it has to release later.
        self.lock = lock

    def __enter__(self) -> BaseFileLock:
        wrapped = self.lock
        return wrapped

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        # The exception details are ignored and ``None`` is returned, so an exception raised inside the ``with``
        # body keeps propagating after the lock has been released.
        wrapped = self.lock
        wrapped.release()
@dataclass
class FileLockContext:
    """Holds the per-lock state (path, timeout, mode, descriptor, nesting counter) of a ``BaseFileLock`` object."""

    # This state lives in its own class, rather than directly on the lock, so that it can optionally be placed in
    # thread local storage through the ThreadLocalFileContext class.

    #: Path of the lock file.
    lock_file: str

    #: Default timeout value.
    timeout: float

    #: Mode used for the lock file.
    mode: int

    #: File descriptor of the lock file, as returned by os.open(); it is not None while the lock is held.
    lock_file_fd: int | None = None

    #: Counter backing the nested locking mechanism: it goes up on every acquire, and the lock is only released once
    #: it is back at 0.
    lock_counter: int = 0
class ThreadLocalFileContext(FileLockContext, local):
    """A ``FileLockContext`` that additionally derives from ``threading.local``, i.e. its thread local version."""
class BaseFileLock(ABC, contextlib.ContextDecorator):
    """
    Abstract base class for a file lock object.

    Concrete subclasses provide :meth:`_acquire` and :meth:`_release`. This class layers the nested-acquire counter,
    the polling/timeout loop and the ``with``-statement / decorator support on top of those two primitives.
    """

    def __init__(
        self,
        lock_file: str | os.PathLike[Any],
        timeout: float = -1,
        mode: int = 0o644,
        thread_local: bool = True,  # noqa: FBT001, FBT002
    ) -> None:
        """
        Create a new lock object.

        :param lock_file: path to the file
        :param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in
            the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it
            to a negative value. A timeout of 0 means, that there is exactly one attempt to acquire the file lock.
        :param mode: file permissions for the lockfile.
        :param thread_local: Whether this object's internal context should be thread local or not.
            If this is set to ``False`` then the lock will be reentrant across threads.
        """
        self._is_thread_local = thread_local

        # Create the context. Note that external code should not work with the context directly and should instead use
        # properties of this class.
        kwargs: dict[str, Any] = {
            "lock_file": os.fspath(lock_file),
            "timeout": timeout,
            "mode": mode,
        }
        self._context: FileLockContext = (ThreadLocalFileContext if thread_local else FileLockContext)(**kwargs)

    def is_thread_local(self) -> bool:
        """:return: a flag indicating if this lock is thread local or not"""
        return self._is_thread_local

    @property
    def lock_file(self) -> str:
        """:return: path to the lock file"""
        return self._context.lock_file

    @property
    def timeout(self) -> float:
        """
        :return: the default timeout value, in seconds

        .. versionadded:: 2.0.0
        """
        return self._context.timeout

    @timeout.setter
    def timeout(self, value: float | str) -> None:
        """
        Change the default timeout value.

        :param value: the new value, in seconds; it is converted with ``float()`` before being stored
        """
        self._context.timeout = float(value)

    @abstractmethod
    def _acquire(self) -> None:
        """If the file lock could be acquired, self._context.lock_file_fd holds the file descriptor of the lock file."""
        raise NotImplementedError

    @abstractmethod
    def _release(self) -> None:
        """Releases the lock and sets self._context.lock_file_fd to None."""
        raise NotImplementedError

    @property
    def is_locked(self) -> bool:
        """
        :return: A boolean indicating if the lock file is holding the lock currently.

        .. versionchanged:: 2.0.0

            This was previously a method and is now a property.
        """
        return self._context.lock_file_fd is not None

    @property
    def lock_counter(self) -> int:
        """:return: The number of times this lock has been acquired (but not yet released)."""
        return self._context.lock_counter

    def acquire(
        self,
        timeout: float | None = None,
        poll_interval: float = 0.05,
        *,
        poll_intervall: float | None = None,
        blocking: bool = True,
    ) -> AcquireReturnProxy:
        """
        Try to acquire the file lock.

        :param timeout: maximum wait time for acquiring the lock, in seconds. ``None`` means the default
            :attr:`~timeout` is used; if ``timeout < 0``, there is no timeout and this method will block until the
            lock could be acquired
        :param poll_interval: interval of trying to acquire the lock file
        :param poll_intervall: deprecated, kept for backwards compatibility, use ``poll_interval`` instead
        :param blocking: defaults to True. If False, function will return immediately if it cannot obtain a lock on the
            first attempt. Otherwise, this method will block until the timeout expires or the lock is acquired.
        :raises Timeout: if fails to acquire lock within the timeout period
        :return: a context object that will unlock the file when the context is exited

        .. code-block:: python

            # You can use this method in the context manager (recommended)
            with lock.acquire():
                pass

            # Or use an equivalent try-finally construct:
            lock.acquire()
            try:
                pass
            finally:
                lock.release()

        .. versionchanged:: 2.0.0

            This method returns now a *proxy* object instead of *self*,
            so that it can be used in a with statement without side effects.
        """
        # Use the default timeout, if no timeout is provided.
        if timeout is None:
            timeout = self._context.timeout

        # The misspelled keyword still wins when given, but its use is flagged as deprecated.
        if poll_intervall is not None:
            msg = "use poll_interval instead of poll_intervall"
            warnings.warn(msg, DeprecationWarning, stacklevel=2)
            poll_interval = poll_intervall

        # Increment the number right at the beginning. We can still undo it, if something fails.
        self._context.lock_counter += 1

        lock_id = id(self)
        lock_filename = self.lock_file
        start_time = time.perf_counter()
        try:
            while True:
                # A nested acquire finds the lock already held and skips straight to the success branch below.
                if not self.is_locked:
                    _LOGGER.debug("Attempting to acquire lock %s on %s", lock_id, lock_filename)
                    self._acquire()
                if self.is_locked:
                    _LOGGER.debug("Lock %s acquired on %s", lock_id, lock_filename)
                    break
                if blocking is False:
                    _LOGGER.debug("Failed to immediately acquire lock %s on %s", lock_id, lock_filename)
                    raise Timeout(lock_filename)  # noqa: TRY301
                # A negative timeout never satisfies ``0 <= timeout`` and therefore waits forever.
                if 0 <= timeout < time.perf_counter() - start_time:
                    _LOGGER.debug("Timeout on acquiring lock %s on %s", lock_id, lock_filename)
                    raise Timeout(lock_filename)  # noqa: TRY301
                msg = "Lock %s not acquired on %s, waiting %s seconds ..."
                _LOGGER.debug(msg, lock_id, lock_filename, poll_interval)
                time.sleep(poll_interval)
        except BaseException:  # Something did go wrong, so decrement the counter.
            self._context.lock_counter = max(0, self._context.lock_counter - 1)
            raise
        return AcquireReturnProxy(lock=self)

    def release(self, force: bool = False) -> None:  # noqa: FBT001, FBT002
        """
        Releases the file lock. Please note, that the lock is only completely released, if the lock counter is 0. Also
        note, that the lock file itself is not automatically deleted.

        :param force: If true, the lock counter is ignored and the lock is released in every case.
        """
        if self.is_locked:
            self._context.lock_counter -= 1

            if self._context.lock_counter == 0 or force:
                lock_id, lock_filename = id(self), self.lock_file

                _LOGGER.debug("Attempting to release lock %s on %s", lock_id, lock_filename)
                self._release()
                # A forced release may happen at any nesting depth, so the counter is reset explicitly.
                self._context.lock_counter = 0
                _LOGGER.debug("Lock %s released on %s", lock_id, lock_filename)

    def __enter__(self) -> BaseFileLock:
        """
        Acquire the lock.

        :return: the lock object
        """
        self.acquire()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """
        Release the lock.

        :param exc_type: the exception type if raised
        :param exc_value: the exception value if raised
        :param traceback: the exception traceback if raised
        """
        self.release()

    def __del__(self) -> None:
        """
        Called when the lock object is deleted; force-releases the lock if it is still held.

        Does nothing when ``__init__`` never got as far as creating the context (for example because ``os.fspath``
        rejected *lock_file*): such a half-built object is still finalized, and without this guard the lookup of
        ``_context`` would fail inside the finalizer.
        """
        if not hasattr(self, "_context"):
            return
        self.release(force=True)
# Names exported by a star-import of this module.
__all__ = [
    "BaseFileLock",
    "AcquireReturnProxy",
]
|