"""
锁键值定义
服务名:业务模块:操作或字段
"""
# 基础数据更新或修改
# dsv2:company:basic:up_or_de:P000001
DATA_COMPANY_BASIC_UPDATE_OR_DELETE_CODE = 'data:company:basic:up_or_de:{company_code}'
# 基础数据插入
# dsv2:company:basic:insert:抖音集团
DATA_COMPANY_BASIC_INSERT_NAME = 'data:company:basic:insert:{company_name}'
"""
redis用lua脚本
"""
# 相同value批量加锁
# KEYS, [key1, key2, ..., key3]
# ARGV, [value, ttl]
BATCH_LOCK_FOR_SAME_VALUE = """
local keys_len = #KEYS
local lock_ttl = tonumber(ARGV[table.getn(ARGV)])
local value = ARGV[1]
local count = 0
for i = 1, keys_len do
local result = redis.call('setnx', KEYS[i], value)
if result == 0 then
local token = redis.call('get', KEYS[i])
if token ~= nil and token == value then
count = count + 1
end
else
redis.call('expire', KEYS[i], lock_ttl)
count = count + 1
end
end
return count
"""
# 相同value批量解锁
# KEYS, [key1, key2, ..., key3]
# ARGV, [value]
BATCH_UNLOCK_FOR_SAME_VALUE = """
local keys_len = #KEYS
local count = 0
for i = 1, keys_len do
local token = redis.call('get', KEYS[i])
if token ~= nil and token == ARGV[1] then
count = count + redis.call('del', KEYS[i])
end
end
return count
"""
import functools
import socket
import threading
import uuid
import redis
import lock_key
import lua_script_for_redis
from redis.backoff import ExponentialBackoff
from redis.exceptions import ConnectionError, TimeoutError
from redis.retry import Retry
from redis.exceptions import RedisError
class BatchLockError(RedisError):
pass
redis_client: redis.Redis = redis.Redis(
host="192.168.1.210", port=6379,
password="38c01e7ecce575ee99a0eb65337e405b3a5122bc2d72a00c1c69518adfb0dbee")
def check_equal_for_exception(n1, n2, e):
if n1 != n2:
raise e
if __name__ == "__main__":
v = str(uuid.uuid1().hex) + str(threading.get_native_id())
codes = ["P000001", "P000002", "P000003", "P000004", "P000005", "P000006", "P000007"]
keys = [lock_key.DATA_COMPANY_BASIC_INSERT_NAME.format(company_name=code) for code in codes]
batch_unlocking_script = redis_client.register_script(lua_script_for_redis.BATCH_LOCK_FOR_SAME_VALUE)
Retry(ExponentialBackoff(cap=30, base=0.25), retries=10,
supported_errors=(ConnectionError, TimeoutError, socket.timeout, BatchLockError)).call_with_retry(
functools.partial(
check_equal_for_exception, n1=len(codes),
n2=batch_unlocking_script(keys=keys, args=[v, 90 * 60], client=redis_client),
e=BatchLockError()),
lambda _: _
)