python-redis-lock

予早 2025-02-21 01:08:23
Categories: Tags:

https://pypi.org/project/python-redis-lock/

pip install python-redis-lock
import threading
import unittest

import redis
from redis_lock import Lock, NotAcquired


class ReentrantLock(Lock):
    """
    可重入锁
    1. 分布式锁
    2. 自动续期
    3. 使用id解决锁误删问题
    4. 可重入
    5. 1)非阻塞,2)阻塞有限时间,3)禁止永久阻塞
    """

    def __init__(self, redis_client, name,
                 expire=5 * 60,
                 id=str(threading.current_thread().native_id),
                 auto_renewal=True,
                 strict=True,
                 signal_expire=1000
                 ):
        """
        注意,python-redis-lock 中锁的名称有一个公共前缀 lock:,自动续期间隔为 float(expire) * 2 / 3
        """
        super().__init__(redis_client, name, expire=expire, id=id, auto_renewal=auto_renewal, strict=strict,
                         signal_expire=signal_expire)
        self._counter_name = f"{self._name}:counter"

    def __enter__(self, blocking=True):
        acquired = self.acquire(blocking=blocking)
        if not acquired:
            raise NotAcquired(f"Lock({self._name}) can't acquired!")
        return self

    def acquire(self, blocking=True, timeout=3 * 60):
        if self._held:
            acquire_res = True
            self._client.incr(self._counter_name)
        else:
            acquire_res = super().acquire(blocking=blocking, timeout=timeout)
            if acquire_res:
                self._client.set(self._counter_name, 1)
        return acquire_res

    def release(self):
        if not self._held:
            return
        counter = self._client.decrby(self._counter_name)
        if counter <= 0:
            self._client.delete(*[self._counter_name])
            super().release()


class TestReentrantLock(unittest.TestCase):

    def locks(self):
        client = redis.Redis(
            host='192.168.1.211',
            port=6379,
            password='38c01e7ecce575ee99a0eb65337e405b3a5122bc2d72a00c1c69518adfb0dbee',
            db=5
        )
        a_lock = ReentrantLock(client, name='a')
        b_lock = ReentrantLock(client, name='b')
        return a_lock, b_lock

    def test_distribute_lock(self):
        a_lock, b_lock = self.locks()
        with a_lock:
            print(a_lock.id)

    def test_auto_renew(self):
        a_lock, b_lock = self.locks()
        with a_lock:
            print(a_lock.id)

    def test_check_own_id(self):
        ...

    def test_reentrant_lock(self):
        """
        可重入测试
        :return:
        """
        a_lock, b_lock = self.locks()
        with a_lock, a_lock, a_lock:
            print(a_lock.id)

    def test_not_blocking(self):
        ...

    def check_dead_lock(self):
        ...