redispy

予早 2025-02-21 01:08:23
Categories: Tags:
"""
锁键值定义
服务名:业务模块:操作或字段
"""

# 基础数据更新或修改
# dsv2:company:basic:up_or_de:P000001
DATA_COMPANY_BASIC_UPDATE_OR_DELETE_CODE = 'data:company:basic:up_or_de:{company_code}'

# 基础数据插入
# dsv2:company:basic:insert:抖音集团
DATA_COMPANY_BASIC_INSERT_NAME = 'data:company:basic:insert:{company_name}'
"""
redis用lua脚本
"""

# 相同value批量加锁
# KEYS, [key1, key2, ..., key3]
# ARGV, [value, ttl]
BATCH_LOCK_FOR_SAME_VALUE = """
local keys_len = #KEYS
local lock_ttl = tonumber(ARGV[table.getn(ARGV)])
local value = ARGV[1]

local count = 0
for i = 1, keys_len do
    local result = redis.call('setnx', KEYS[i], value)
    if result == 0 then
        local token = redis.call('get', KEYS[i])
        if token ~= nil and token == value then
            count = count + 1
        end
    else
        redis.call('expire', KEYS[i], lock_ttl)
        count = count + 1
    end
end
return count
"""

# 相同value批量解锁
# KEYS, [key1, key2, ..., key3]
# ARGV, [value]
BATCH_UNLOCK_FOR_SAME_VALUE = """
local keys_len = #KEYS

local count = 0
for i = 1, keys_len do
    local token = redis.call('get', KEYS[i])
    if token ~= nil and token == ARGV[1] then
        count = count + redis.call('del', KEYS[i])
    end
end

return count
"""
import functools
import socket
import threading
import uuid
import redis
import lock_key
import lua_script_for_redis
from redis.backoff import ExponentialBackoff
from redis.exceptions import ConnectionError, TimeoutError
from redis.retry import Retry
from redis.exceptions import RedisError


class BatchLockError(RedisError):
    pass


redis_client: redis.Redis = redis.Redis(
    host="192.168.1.210", port=6379,
    password="38c01e7ecce575ee99a0eb65337e405b3a5122bc2d72a00c1c69518adfb0dbee")


def check_equal_for_exception(n1, n2, e):
    if n1 != n2:
        raise e


if __name__ == "__main__":
    v = str(uuid.uuid1().hex) + str(threading.get_native_id())
    codes = ["P000001", "P000002", "P000003", "P000004", "P000005", "P000006", "P000007"]
    keys = [lock_key.DATA_COMPANY_BASIC_INSERT_NAME.format(company_name=code) for code in codes]
    batch_unlocking_script = redis_client.register_script(lua_script_for_redis.BATCH_LOCK_FOR_SAME_VALUE)

    Retry(ExponentialBackoff(cap=30, base=0.25), retries=10,
          supported_errors=(ConnectionError, TimeoutError, socket.timeout, BatchLockError)).call_with_retry(
        functools.partial(
            check_equal_for_exception, n1=len(codes),
            n2=batch_unlocking_script(keys=keys, args=[v, 90 * 60], client=redis_client),
            e=BatchLockError()),
        lambda _: _
    )