portalocker package

Module contents

exception portalocker.AlreadyLocked(*args: Any, fh: IO | None = None, **kwargs: Any)

Bases: LockException

class portalocker.BoundedSemaphore(maximum: int, name: str = 'bounded_semaphore', filename_pattern: str = '{name}.{number:02d}.lock', directory: str = '/usr/src/tmp', timeout: float | None = 5, check_interval: float | None = 0.25, fail_when_locked: bool | None = True)

Bases: LockBase

Bounded semaphore that prevents too many parallel processes from running at once.

It’s also possible to specify a timeout when acquiring the lock to wait for a resource to become available. This is very similar to threading.BoundedSemaphore but works across multiple processes and across multiple operating systems.

>>> from portalocker import BoundedSemaphore
>>> semaphore = BoundedSemaphore(2, directory='')
>>> str(semaphore.get_filenames()[0])
'bounded_semaphore.00.lock'
>>> str(sorted(semaphore.get_random_filenames())[1])
'bounded_semaphore.01.lock'

acquire(timeout: float = None, check_interval: float = None, fail_when_locked: bool = None) → Lock | None

get_filename(number) → Path

get_filenames() → Sequence[Path]

get_random_filenames() → Sequence[Path]

lock: Lock | None

release()

try_lock(filenames: Sequence[str | Path]) → bool

portalocker.LOCK_EX: LockFlags = LockFlags.EXCLUSIVE

Place an exclusive lock. Only one process may hold an exclusive lock for a given file at a given time.

portalocker.LOCK_NB: LockFlags = LockFlags.NON_BLOCKING

Acquire the lock in a non-blocking fashion.

portalocker.LOCK_SH: LockFlags = LockFlags.SHARED

Place a shared lock. More than one process may hold a shared lock for a given file at a given time.

portalocker.LOCK_UN: LockFlags = LockFlags.UNBLOCK

Remove an existing lock held by this process.

class portalocker.Lock(filename: str | Path, mode: str = 'a', timeout: float = None, check_interval: float = 0.25, fail_when_locked: bool = False, flags: LockFlags = LockFlags.EXCLUSIVE | NON_BLOCKING, **file_open_kwargs)

Bases: LockBase

Lock manager with built-in timeout

Parameters:
  • filename – path of the file to open and lock

  • mode – the open mode; ‘a’ or ‘ab’ should be used for writing. When the mode contains ‘w’, the file is truncated to 0 bytes.

  • timeout – maximum time, in seconds, to keep trying to acquire the lock

  • check_interval – interval, in seconds, between lock attempts while waiting

  • fail_when_locked – whether to raise an error as soon as the initial lock attempt fails instead of retrying. When set, the timeout is not waited for.

  • **file_open_kwargs – The kwargs for the open(...) call

fail_when_locked is useful when multiple threads or processes can race to create a file. If set to true, the call does not keep retrying: once the file is found to be locked, an AlreadyLocked exception is raised.

Note that the file is opened first and locked afterwards, so using ‘w’ as the mode truncates the file _BEFORE_ the lock is checked.

acquire(timeout: float = None, check_interval: float = None, fail_when_locked: bool = None) → IO

Acquire the lock and return the locked file handle.

release()

Release the currently locked file handle.

exception portalocker.LockException(*args: Any, fh: IO | None = None, **kwargs: Any)

Bases: BaseLockException

class portalocker.LockFlags(value, names=None, *values, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: IntFlag

EXCLUSIVE = 2

exclusive lock

NON_BLOCKING = 4

non-blocking

SHARED = 1

shared lock

UNBLOCK = 8

unlock

class portalocker.RLock(filename, mode='a', timeout=5, check_interval=0.25, fail_when_locked=False, flags=LockFlags.EXCLUSIVE | NON_BLOCKING)

Bases: Lock

A reentrant lock that functions similarly to threading.RLock in that it can be acquired multiple times. Once the corresponding number of release() calls has been made, the underlying file lock is released.

acquire(timeout: float = None, check_interval: float = None, fail_when_locked: bool = None) → IO

Acquire the lock and return the locked file handle.

release()

Release the currently locked file handle.

portalocker.lock(file_: IO, flags: LockFlags)

Lock a file. Note that this is an advisory lock on Linux/Unix systems.

portalocker.open_atomic(filename: str | Path, binary: bool = True) → Iterator[IO]

Open a file for atomic writing. Instead of locking, this function lets you write the entire file and then move it to its actual location. This assumes that a rename is atomic on your platform, which is generally the case but not guaranteed.

http://docs.python.org/library/os.html#os.rename

>>> import os
>>> from portalocker import open_atomic
>>> filename = 'test_file.txt'
>>> if os.path.exists(filename):
...     os.remove(filename)
>>> with open_atomic(filename) as fh:
...     written = fh.write(b'test')
>>> assert os.path.exists(filename)
>>> os.remove(filename)
>>> import pathlib
>>> path_filename = pathlib.Path('test_file.txt')
>>> with open_atomic(path_filename) as fh:
...     written = fh.write(b'test')
>>> assert path_filename.exists()
>>> path_filename.unlink()
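
The write-then-rename idea described above can be sketched with the standard library alone. This is not portalocker's implementation, only an illustration of the general technique; `write_atomically` is a hypothetical helper name, and the sketch relies on the same assumption that a rename is atomic on the platform.

```python
import os
import tempfile


def write_atomically(path, data):
    """Hypothetical helper: write bytes to a temp file, then rename it into place."""
    # Create the temp file next to the target so the rename stays on one filesystem.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, 'wb') as tmp_file:
            tmp_file.write(data)
        # Readers see either the old file or the complete new one.
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the partial temp file if anything went wrong.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, 'settings.bin')
    write_atomically(target, b'version 1')
    write_atomically(target, b'version 2')
    with open(target, 'rb') as fh:
        final_contents = fh.read()
    leftover_files = os.listdir(tmp_dir)
```
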

portalocker.unlock(file_: IO)

Unlock a file.