当前位置: 首页 > news >正文

分布式锁—2.Redisson的可重入锁一

大纲

1.Redisson可重入锁RedissonLock概述

2.可重入锁源码之创建RedissonClient实例

3.可重入锁源码之lua脚本加锁逻辑

4.可重入锁源码之WatchDog维持加锁逻辑

5.可重入锁源码之可重入加锁逻辑

6.可重入锁源码之锁的互斥阻塞逻辑

7.可重入锁源码之释放锁逻辑

8.可重入锁源码之获取锁超时与锁超时自动释放逻辑

9.可重入锁源码总结

1.Redisson可重入锁RedissonLock概述

(1)在pom.xml里引入依赖

(2)构建RedissonClient并使用Redisson

(3)Redisson可重入锁RedissonLock简单使用

(1)在pom.xml里引入依赖

<dependencies>
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.16.8</version>
     </dependency> 
</dependencies>

(2)构建RedissonClient并使用Redisson

参考官网中文文档,连接上3主3从的Redis Cluster。

//https://github.com/redisson/redisson/wiki/目录
public class Application {
    public static void main(String[] args) throws Exception {
        //连接3主3从的Redis CLuster
        Config config = new Config();
        config.useClusterServers()
            .addNodeAddress("redis://192.168.1.110:7001")
            .addNodeAddress("redis://192.168.1.110:7002")
            .addNodeAddress("redis://192.168.1.110:7003")
            .addNodeAddress("redis://192.168.1.111:7001")
            .addNodeAddress("redis://192.168.1.111:7002")
            .addNodeAddress("redis://192.168.1.111:7003");

        //创建RedissonClient实例
        RedissonClient redisson = Redisson.create(config);

        //获取可重入锁
        RLock lock = redisson.getLock("myLock");
        lock.lock();
        lock.unlock();
      
        RMap<String, Object> map = redisson.getMap("myMap");
        map.put("foo", "bar");  
      
        map = redisson.getMap("myMap");
        System.out.println(map.get("foo"));   
    }
}

(3)Redisson可重入锁RedissonLock简单使用

Redisson可重入锁RLock实现了java.util.concurrent.locks.Lock接口,同时还提供了异步(Async)、响应式(Reactive)和RxJava2标准的接口。

RLock lock = redisson.getLock("myLock");
//最常见的使用方法
lock.lock();

如果设置锁的超时时间不合理,导致超时时间已到时锁还没能主动释放,但实际上锁却被Redis节点通过过期时间释放了,这会有问题。

为了避免这种情况,Redisson内部提供了一个用来监控锁的WatchDog。WatchDog的作用是在Redisson实例被关闭前,不断地延长锁的有效期。

WatchDog检查锁的默认超时时间是30秒,可通过Config.lockWatchdogTimeout来指定。

RLock的tryLock方法提供了leaseTime参数来指定加锁的超时时间,超过这个时间后锁便自动被释放。

//如果没有主动释放锁的话,10秒后将会自动释放锁
lock.lock(10, TimeUnit.SECONDS);

//加锁等待最多是100秒;加锁成功后如果没有主动释放锁的话,锁会在10秒后自动释放
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
    try {
        ...
    } finally {
        lock.unlock();
    }
}

RLock完全符合Java的Lock规范,即只有拥有锁的进程才能解锁,其他进程解锁则会抛出IllegalMonitorStateException错误。如果需要其他进程也能解锁,那么可以使用分布式信号量Semaphore。

2.可重入锁源码之创建RedissonClient实例

(1)初始化与Redis的连接管理器ConnectionManager

(2)初始化Redis的命令执行器CommandExecutor

使用Redisson.create()方法可以根据配置创建一个RedissonClient实例,因为Redisson类会实现RedissonClient接口,而创建RedissonClient实例的主要工作其实就是:

一.初始化与Redis的连接管理器ConnectionManager

二.初始化Redis的命令执行器CommandExecutor

(1)初始化与Redis的连接管理器ConnectionManager

Redis的配置类Config会被封装在连接管理器ConnectionManager中,后续可以通过连接管理器ConnectionManager获取Redis的配置类Config。

public class Application {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
        //创建RedissonClient实例
        RedissonClient redisson = Redisson.create(config);
        ...
    }
}

//创建RedissonClient实例的源码
public class Redisson implements RedissonClient {
    protected final Config config;//Redis配置类
    protected final ConnectionManager connectionManager;//Redis的连接管理器
    protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器
  
    ...
    public static RedissonClient create(Config config) {
        return new Redisson(config);
    }
    
    protected Redisson(Config config) {
        this.config = config;
        Config configCopy = new Config(config);
        //根据Redis配置类Config实例创建和Redis的连接管理器
        connectionManager = ConfigSupport.createConnectionManager(configCopy);
        RedissonObjectBuilder objectBuilder = null;
        if (config.isReferenceEnabled()) {
            objectBuilder = new RedissonObjectBuilder(this);
        }
        //创建Redis的命令执行器
        commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
        evictionScheduler = new EvictionScheduler(commandExecutor);
        writeBehindService = new WriteBehindService(commandExecutor);
    }
    ...
}

public class ConfigSupport {
    ...
    //创建Redis的连接管理器
    public static ConnectionManager createConnectionManager(Config configCopy) {
        //生成UUID
        UUID id = UUID.randomUUID();
        ...
        if (configCopy.getClusterServersConfig() != null) {
            validate(configCopy.getClusterServersConfig());
            //返回ClusterConnectionManager实例
            return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
        }
        ...
    }
    ...
}

public class ClusterConnectionManager extends MasterSlaveConnectionManager {
    public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
        super(config, id);
        ...
        this.natMapper = cfg.getNatMapper();
        //将Redis的配置类Config封装在ConnectionManager中
        this.config = create(cfg);
        initTimer(this.config);
    
        Throwable lastException = null;
        List<String> failedMasters = new ArrayList<String>();
        for (String address : cfg.getNodeAddresses()) {
            RedisURI addr = new RedisURI(address);
            //异步连接Redis节点
            CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, addr, addr.getHost());
            ...
            //通过connectionFuture阻塞获取建立好的连接
            RedisConnection connection = connectionFuture.toCompletableFuture().join();
            ...
            List<ClusterNodeInfo> nodes = connection.sync(clusterNodesCommand);
            ...
            CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
            Collection<ClusterPartition> partitions = partitionsFuture.join();
            List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
            for (ClusterPartition partition : partitions) {
                if (partition.isMasterFail()) {
                    failedMasters.add(partition.getMasterAddress().toString());
                    continue;
                }
                if (partition.getMasterAddress() == null) {
                    throw new IllegalStateException("Master node: " + partition.getNodeId() + " doesn't have address.");
                }
                CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);
                masterFutures.add(masterFuture);
            }
            CompletableFuture<Void> masterFuture = CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0]));
            masterFuture.join();
            ...
        }
        ...
    }
    ...
}

public class MasterSlaveConnectionManager implements ConnectionManager {
    protected final String id;//初始化时为UUID
    private final Map<RedisURI, RedisConnection> nodeConnections = new ConcurrentHashMap<>();
    ...
    protected MasterSlaveConnectionManager(Config cfg, UUID id) {
        this.id = id.toString();//传入的是UUID
        this.cfg = cfg;
        ...
    }
    
    protected final CompletionStage<RedisConnection> connectToNode(NodeType type, BaseConfig<?> cfg, RedisURI addr, String sslHostname) {
        RedisConnection conn = nodeConnections.get(addr);
        if (conn != null) {
            if (!conn.isActive()) {
                closeNodeConnection(conn);
            } else {
                return CompletableFuture.completedFuture(conn);
            }
        }
        //创建Redis客户端连接实例
        RedisClient client = createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);
        //向Redis服务端发起异步连接请求,这个future会层层往外返回
        CompletionStage<RedisConnection> future = client.connectAsync();
        return future.thenCompose(connection -> {
            if (connection.isActive()) {
                if (!addr.isIP()) {
                    RedisURI address = new RedisURI(addr.getScheme() + "://" + connection.getRedisClient().getAddr().getAddress().getHostAddress() + ":" + connection.getRedisClient().getAddr().getPort());
                    nodeConnections.put(address, connection);
                }
                nodeConnections.put(addr, connection);
                return CompletableFuture.completedFuture(connection);
            } else {
                connection.closeAsync();
                CompletableFuture<RedisConnection> f = new CompletableFuture<>();
                f.completeExceptionally(new RedisException("Connection to " + connection.getRedisClient().getAddr() + " is not active!"));
                return f;
            }
        });
    }
    
    //创建Redis客户端连接实例
    @Override
    public RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {
        RedisClientConfig redisConfig = createRedisConfig(type, address, timeout, commandTimeout, sslHostname);
        return RedisClient.create(redisConfig);
    }
    ...
}

//Redisson主要使用Netty去和Redis服务端建立连接
public final class RedisClient {
    private final Bootstrap bootstrap;
    private final Bootstrap pubSubBootstrap;
    ...
    public static RedisClient create(RedisClientConfig config) {
        return new RedisClient(config);
    }
    
    private RedisClient(RedisClientConfig config) {
        ...
        bootstrap = createBootstrap(copy, Type.PLAIN);
        pubSubBootstrap = createBootstrap(copy, Type.PUBSUB);
        this.commandTimeout = copy.getCommandTimeout();
    }
    
    private Bootstrap createBootstrap(RedisClientConfig config, Type type) {
        Bootstrap bootstrap = new Bootstrap()
            .resolver(config.getResolverGroup())
            .channel(config.getSocketChannelClass())
            .group(config.getGroup());
        bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());
        bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());
        bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());
        config.getNettyHook().afterBoostrapInitialization(bootstrap);
        return bootstrap;
    }
    
    //向Redis服务端发起异步连接请求
    public RFuture<RedisConnection> connectAsync() {
        CompletableFuture<InetSocketAddress> addrFuture = resolveAddr();
        CompletableFuture<RedisConnection> f = addrFuture.thenCompose(res -> {
            CompletableFuture<RedisConnection> r = new CompletableFuture<>();
            //Netty的Bootstrap发起连接
            ChannelFuture channelFuture = bootstrap.connect(res);
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(final ChannelFuture future) throws Exception {
                    if (bootstrap.config().group().isShuttingDown()) {
                        IllegalStateException cause = new IllegalStateException("RedisClient is shutdown");
                        r.completeExceptionally(cause);
                        return;
                    }
                    if (future.isSuccess()) {
                        RedisConnection c = RedisConnection.getFrom(future.channel());
                        c.getConnectionPromise().whenComplete((res, e) -> {
                            bootstrap.config().group().execute(new Runnable() {
                                @Override
                                public void run() {
                                    if (e == null) {
                                        if (!r.complete(c)) {
                                            c.closeAsync();
                                        }
                                    } else {
                                        r.completeExceptionally(e);
                                        c.closeAsync();
                                    }
                                }
                            });
                        });
                    } else {
                        bootstrap.config().group().execute(new Runnable() {
                            public void run() {
                                r.completeExceptionally(future.cause());
                            }
                        });
                    }
                }
            });
            return r;
        });
        return new CompletableFutureWrapper<>(f);
    }
    ...
}

(2)初始化Redis的命令执行器CommandExecutor

首先,CommandSyncService继承自CommandAsyncService类。

而CommandAsyncService类实现了CommandExecutor接口。

然后,ConnectionManager连接管理器会封装在命令执行器CommandExecutor中。

所以,通过CommandExecutor命令执行器可以获取连接管理器ConnectionManager。

//Redis命令的同步执行器CommandSyncService
public class CommandSyncService extends CommandAsyncService implements CommandExecutor {
    //初始化CommandExecutor
    public CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {
        super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT);
    }
    
    public <T, R> R read(String key, RedisCommand<T> command, Object... params) {
        return read(key, connectionManager.getCodec(), command, params);
    }
    
    public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
        RFuture<R> res = readAsync(key, codec, command, params);
        return get(res);
    }
    
    public <T, R> R evalRead(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
        return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
    }
    
    public <T, R> R evalRead(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
        RFuture<R> res = evalReadAsync(key, codec, evalCommandType, script, keys, params);
        return get(res);
    }
    
    public <T, R> R evalWrite(String key, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
        return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);
    }
    
    public <T, R> R evalWrite(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
        RFuture<R> res = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
        return get(res);
    }
}

//Redis命令的异步执行器CommandAsyncService
public class CommandAsyncService implements CommandAsyncExecutor {
    //Redis连接管理器
    final ConnectionManager connectionManager;
    final RedissonObjectBuilder objectBuilder;
    final RedissonObjectBuilder.ReferenceType referenceType;

    public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
        this.connectionManager = connectionManager;
        this.objectBuilder = objectBuilder;
        this.referenceType = referenceType;
    }
    
    @Override
    public <V> V getNow(CompletableFuture<V> future) {
        try {
            return future.getNow(null);
        } catch (Exception e) {
            return null;
        }
    }
    
    @Override
    public <T, R> R read(String key, Codec codec, RedisCommand<T> command, Object... params) {
        RFuture<R> res = readAsync(key, codec, command, params);
        return get(res);
    }
    
    @Override
    public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
        NodeSource source = getNodeSource(key);
        return async(true, source, codec, command, params, false, false);
    }
    
    private NodeSource getNodeSource(String key) {
        int slot = connectionManager.calcSlot(key);
        return new NodeSource(slot);
    }
    
    public <V, R> RFuture<R> async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommand<V> command, Object[] params, boolean ignoreRedirect, boolean noRetry) {
        CompletableFuture<R> mainPromise = createPromise();
        RedisExecutor<V, R> executor = new RedisExecutor<>(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);
        executor.execute();
        return new CompletableFutureWrapper<>(mainPromise);
    }
    
    @Override
    public <V> V get(RFuture<V> future) {
        if (Thread.currentThread().getName().startsWith("redisson-netty")) {
            throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
        }
        try {
            return future.toCompletableFuture().get();
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            throw new RedisException(e);
        } catch (ExecutionException e) {
            throw convertException(e);
        }
    }
    ...
}

3.可重入锁源码之lua脚本加锁逻辑

(1)通过Redisson.getLock()方法获取一个RedissonLock实例

(2)加锁时的执行流程

(3)加锁时执行的lua脚本

(4)执行加锁lua脚本的命令执行器逻辑

(5)如何根据slot值获取对应的节点

(1)通过Redisson.getLock()方法获取一个RedissonLock实例

在Redisson.getLock()方法中,会传入命令执行器CommandExecutor来创建一个RedissonLock实例,而命令执行器CommandExecutor是在执行Redisson.create()方法时初始化好的,所以命令执行器CommandExecutor会被封装在RedissonLock实例中。

因此,通过RedissonLock实例可以获取一个命令执行器CommandExecutor,通过命令执行器CommandExecutor可获取连接管理器ConnectionManager,通过连接管理器ConnectionManager可获取Redis的配置信息类Config,通过Redis的配置信息类Config可以获取各种配置信息。

RedissonLock类继承自实现了RLock接口的RedissonBaseLock类。在RedissonLock的构造方法里面,有个internalLockLeaseTime变量,这个internalLockLeaseTime变量与WatchDog看门狗有关系。interlnalLockLeaseTime的默认值是30000毫秒,即30秒;

public class Application {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
        //创建RedissonClient实例
        RedissonClient redisson = Redisson.create(config);
        //获取可重入锁
        RLock lock = redisson.getLock("myLock");
        lock.lock();
        ...
    }
}

//创建Redisson实例
public class Redisson implements RedissonClient {
    protected final Config config;//Redis配置类
    protected final ConnectionManager connectionManager;//Redis的连接管理器
    protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器
    ...
    
    public static RedissonClient create(Config config) {
        return new Redisson(config);
    }
    
    protected Redisson(Config config) {
        ...
        //根据Redis配置类Config实例创建和Redis的连接管理器
        connectionManager = ConfigSupport.createConnectionManager(configCopy);
        //创建Redis的命令执行器
        commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
        ...
    }
    ...
    @Override
    public RLock getLock(String name) {
        return new RedissonLock(commandExecutor, name);
    }
    ...
}

//创建RedissonLock实例
//通过RedissonLock实例可以获取一个命令执行器CommandExecutor;
public class RedissonLock extends RedissonBaseLock {
    protected long internalLockLeaseTime;
    protected final LockPubSub pubSub;
    final CommandAsyncExecutor commandExecutor;
    
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        //与WatchDog有关的internalLockLeaseTime
        //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
        //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
        //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    ...
}

//创建Redis的命令执行器
//通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
public class CommandAsyncService implements CommandAsyncExecutor {
    final ConnectionManager connectionManager;
    ...
    public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
        this.connectionManager = connectionManager;
        this.objectBuilder = objectBuilder;
        this.referenceType = referenceType;
    }
    
    @Override
    public ConnectionManager getConnectionManager() {
        return connectionManager;
    }
    ...
}

//创建Redis的连接管理器
//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
    ...
    public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
        super(config, id);
        ...
    }
    ...
}

//创建Redis的连接管理器
//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
public class MasterSlaveConnectionManager implements ConnectionManager {
    private final Config cfg;
    protected final String id;//初始化时为UUID
    ...
    protected MasterSlaveConnectionManager(Config cfg, UUID id) {
        this.id = id.toString();//传入的是UUID
        this.cfg = cfg;
        ...
    }
    
    @Override
    public Config getCfg() {
        return cfg;
    }
    ...
}

//配置信息类Config中的lockWatchdogTimeout变量初始化为30秒,该变量与WatchDog有关
public class Config {
    private long lockWatchdogTimeout = 30 * 1000;
    ...
    //This parameter is only used if lock has been acquired without leaseTimeout parameter definition. 
    //Lock expires after "lockWatchdogTimeout" if watchdog didn't extend it to next "lockWatchdogTimeout" time interval.
    //This prevents against infinity locked locks due to Redisson client crush or any other reason when lock can't be released in proper way.
    //Default is 30000 milliseconds
    public Config setLockWatchdogTimeout(long lockWatchdogTimeout) {
        this.lockWatchdogTimeout = lockWatchdogTimeout;
        return this;
    }
    
    public long getLockWatchdogTimeout() {
        return lockWatchdogTimeout;
    }
}

默认情况下,调用RedissonLock.lock()方法加锁时,传入的leaseTime为-1。此时锁的超时时间会设为lockWatchdogTimeout默认的30秒,从而避免出现死锁的情况。

public class RedissonLock extends RedissonBaseLock {
    ...
    //加锁
    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
    
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        ...
    }
    
    //解锁
    @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
    }
    ...
}

(2)加锁时的执行流程

首先会调用RedissonLock的tryAcquire()方法处理异步RFuture相关,然后调用RedissonLock的tryAcquireAsync()方法对执行脚本的结果进行处理,接着调用RedissonLock.tryLockInnerAsync方法执行加锁的lua脚本。

public class RedissonLock extends RedissonBaseLock {
    protected long internalLockLeaseTime;
    protected final LockPubSub pubSub;
    final CommandAsyncExecutor commandExecutor;
    
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        //与WatchDog有关的internalLockLeaseTime
        //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
        //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
        //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    ...
    //加锁
    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
    
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //线程ID,用来生成设置Hash的值
        long threadId = Thread.currentThread().getId();
        //尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        //ttl为null说明加锁成功
        if (ttl == null) {
            return;
        }
        //加锁失败时的处理
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }
        try {
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }
                // waiting for message
                if (ttl >= 0) {
                    try {
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        commandExecutor.getNow(future).getLatch().acquire();
                    } else {
                        commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(future), threadId);
        }
    }
    ...
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //默认下waitTime和leaseTime都是-1,下面调用的get方法是来自于RedissonObject的get()方法
        //可以理解为异步转同步:将异步的tryAcquireAsync通过get转同步
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
    
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            //默认情况下,由于leaseTime=-1,所以会使用初始化RedissonLock实例时的internalLockLeaseTime
            //internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值,30秒
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            //加锁返回的ttlRemaining为null表示加锁成功
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }
    
    //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()),//锁的名字:KEYS[1]
            unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒
            getLockName(threadId)//ARGV[2],值为UUID + 线程ID
        );
    }
    ...
}

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
    final String id;
    final String entryName;
    final CommandAsyncExecutor commandExecutor;
    
    public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
    }
    
    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }
    ...
}

abstract class RedissonExpirable extends RedissonObject implements RExpirable {
    ...
    RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
        super(connectionManager, name);
    }
    ...
}

public abstract class RedissonObject implements RObject {
    protected final CommandAsyncExecutor commandExecutor;
    protected String name;
    protected final Codec codec;
    
    public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
        this.codec = codec;
        this.commandExecutor = commandExecutor;
        if (name == null) {
            throw new NullPointerException("name can't be null");
        }
        setName(name);
    }
    ...
    protected final <V> V get(RFuture<V> future) {
        //下面会调用CommandAsyncService.get()方法
        return commandExecutor.get(future);
    }
    ...
}

public class CommandAsyncService implements CommandAsyncExecutor {
    ...
    @Override
    public <V> V get(RFuture<V> future) {
        if (Thread.currentThread().getName().startsWith("redisson-netty")) {
            throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
        }
        try {
            return future.toCompletableFuture().get();
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            throw new RedisException(e);
        } catch (ExecutionException e) {
            throw convertException(e);
        }
    }
    ...
}

(3)加锁时执行的lua脚本

public class RedissonLock extends RedissonBaseLock {
    //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()),//锁的名字:KEYS[1],比如"myLock"
            unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒
            getLockName(threadId)//ARGV[2],值为UUID + 线程ID
        );
    }
    ...
}

首先执行Redis的exists命令,判断key为锁名的Hash值是否不存在,也就是判断key为锁名myLock的Hash值是否存在。

一.如果key为锁名的Hash值不存在,那么就进行如下加锁处理

首先通过Redis的hset命令设置一个key为锁名的Hash值。该Hash值的key为锁名,value是一个映射。也就是在value值中会有一个field为UUID + 线程ID,value为1的映射。比如:hset myLock UUID:ThreadID 1,lua脚本中的ARGV[2]就是由UUID + 线程ID组成的唯一值。

然后通过Redis的pexpire命令设置key为锁名的Hash值的过期时间,也就是设置key为锁名的Hash值的过期时间为30秒。比如:pexpire myLock 30000。所以默认情况下,myLock这个锁在30秒后就会自动过期。

二.如果key为锁名的Hash值存在,那么就执行如下判断处理

首先通过Redis的hexists命令判断在key为锁名的Hash值里,field为UUID + 线程ID的映射是否已经存在。

如果在key为锁名的Hash值里,field为UUID + 线程ID的映射存在,那么就通过Redis的hincrby命令,对field为UUID + 线程ID的value值进行递增1。比如:hincrby myLock UUID:ThreadID 1。也就是在key为myLock的Hash值里,把field为UUID:ThreadID的value值从1累加到2,发生这种情况的时候往往就是当前线程对锁进行了重入。接着执行:pexpire myLock 30000,再次将myLock的有效期设置为30秒。

如果在key为锁名的Hash值里,field为UUID + 线程ID的映射不存在,发生这种情况的时候往往就是其他线程获取不到这把锁而产生互斥。那么就通过Redis的pttl命令,返回key为锁名的Hash值的剩余存活时间,因为不同线程的ARGV[2]是不一样的,ARGV[2] = UUID + 线程ID。

(4)执行加锁lua脚本的命令执行器逻辑

在RedissonLock的tryLockInnerAsync()方法中,会通过RedissonBaseLock的evalWriteAsync()方法执行lua脚本,即通过CommandAsyncService的evalWriteAsync()方法执行lua脚本。

在CommandAsyncService的evalWriteAsync()方法中,首先会执行CommandAsyncService的getNodeSource()方法获取对应的节点。然后执行CommandAsyncService的evalAsync()方法来执行lua脚本。

在CommandAsyncService的getNodeSource()方法中,会根据key进行CRC16运算,然后再对16384取模,计算出key的slot值。然后根据这个slot值创建一个NodeSource实例进行返回。

在CommandAsyncService的evalAsync()方法中,会将获得的NodeSource实例封装到Redis执行器RedisExecutor里。然后执行RedisExecutor,实现将脚本请求发送给对应的Redis节点处理。

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
    //从外部传入的:在创建实现了RedissonClient的Redisson实例时,初始化的命令执行器CommandExecutor
    final CommandAsyncExecutor commandExecutor;
    public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        this.id = commandExecutor.getConnectionManager().getId();
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.entryName = id + ":" + name;
    }
    ...
    protected <T> RFuture<T> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
        //获取可用的节点,并继续封装一个命令执行器CommandBatchService
        MasterSlaveEntry entry = commandExecutor.getConnectionManager().getEntry(getRawName());
        int availableSlaves = entry.getAvailableSlaves();
        CommandBatchService executorService = createCommandBatchService(availableSlaves);
        //通过CommandAsyncService.evalWriteAsync方法执行lua脚本
        RFuture<T> result = executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);
        if (commandExecutor instanceof CommandBatchService) {
            return result;
        }
        //异步执行然后获取结果
        RFuture<BatchResult<?>> future = executorService.executeAsync();
        CompletionStage<T> f = future.handle((res, ex) -> {
            if (ex == null && res.getSyncedSlaves() != availableSlaves) {
                throw new CompletionException(new IllegalStateException("Only " + res.getSyncedSlaves() + " of " + availableSlaves + " slaves were synced"));
            }
            return result.getNow();
        });
        return new CompletableFutureWrapper<>(f);
    }
    
    private CommandBatchService createCommandBatchService(int availableSlaves) {
        if (commandExecutor instanceof CommandBatchService) {
            return (CommandBatchService) commandExecutor;
        }
        BatchOptions options = BatchOptions.defaults().syncSlaves(availableSlaves, 1, TimeUnit.SECONDS);
        return new CommandBatchService(commandExecutor, options);
    }
    ...
}

public class CommandBatchService extends CommandAsyncService {
    ...
    public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options) {
        this(executor.getConnectionManager(), options, executor.getObjectBuilder(), RedissonObjectBuilder.ReferenceType.DEFAULT);
    }
    
    private CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
        super(connectionManager, objectBuilder, referenceType);
        this.options = options;
    }
    ...
}

public class CommandAsyncService implements CommandAsyncExecutor {
    final ConnectionManager connectionManager;
    final RedissonObjectBuilder objectBuilder;
    final RedissonObjectBuilder.ReferenceType referenceType;
    
    public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {
        this.connectionManager = connectionManager;
        this.objectBuilder = objectBuilder;
        this.referenceType = referenceType;
    }
    ...
    @Override
    public <T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params) {
        //获取key对应的节点
        NodeSource source = getNodeSource(key);
        //让对应的节点执行lua脚本请求
        return evalAsync(source, false, codec, evalCommandType, script, keys, false, params);
    }
    
    //获取key对应的Redis Cluster节点
    private NodeSource getNodeSource(String key) {
        //先计算key对应的slot值
        int slot = connectionManager.calcSlot(key);
        //返回节点实例
        return new NodeSource(slot);
    }
    
    //执行lua脚本
    private <T, R> RFuture<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, boolean noRetry, Object... params) {
        if (isEvalCacheActive() && evalCommandType.getName().equals("EVAL")) {
            CompletableFuture<R> mainPromise = new CompletableFuture<>();
            Object[] pps = copy(params);
            CompletableFuture<R> promise = new CompletableFuture<>();
            String sha1 = calcSHA(script);
            RedisCommand cmd = new RedisCommand(evalCommandType, "EVALSHA");
            List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
            args.add(sha1);
            args.add(keys.size());
            args.addAll(keys);
            args.addAll(Arrays.asList(params));
            //将根据key进行CRC16运算然后对16384取模获取到的NodeSource实例,封装到Redis执行器RedisExecutor中
            RedisExecutor<T, R> executor = new RedisExecutor<>(readOnlyMode, nodeSource, codec, cmd, args.toArray(), promise, false, connectionManager, objectBuilder, referenceType, noRetry);
            //通过执行Redis执行器RedisExecutor,来实现将lua脚本请求发送给对应的Redis节点进行处理
            executor.execute();
            ...
        }
        ...
    }
    ...
}

public class ClusterConnectionManager extends MasterSlaveConnectionManager {
    public static final int MAX_SLOT = 16384;//Redis Cluster默认有16384个slot
    ...
    //对key进行CRC16运算,然后再对16384取模
    @Override
    public int calcSlot(String key) {
        if (key == null) {
            return 0;
        }

        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}');
            if (end != -1 && start + 1 < end) {
                key = key.substring(start + 1, end);
            }
        }

        int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
        return result;
    }
    ...
}

(5)如何根据slot值获取对应的节点

因为最后会执行封装了NodeSource实例的RedisExecutor的excute()方法,而NodeSource实例中又会封装了锁名key对应的slot值,所以RedisExecutor的excute()方法可以通过getConnection()方法获取对应节点的连接。

其中RedisExecutor的getConnection()方法会调用到MasterSlaveConnectionManager的connectionWriteOp()方法,该方法又会通过调用ConnectionManager的getEntry()方法根据slot值获取节点,也就是由ClusterConnectionManager的getEntry()方法去获取Redis的主节点。

其实在初始化连接管理器ClusterConnectionManager时,就已经根据配置初始化好哪些slot映射到那个Redis主节点了。

public class RedisExecutor<V, R> {
    NodeSource source;
    ...
    public void execute() {
        ...
        //异步获取建立好的Redis连接
        CompletableFuture<RedisConnection> connectionFuture = getConnection().toCompletableFuture();
        ...
    }
    
    protected CompletableFuture<RedisConnection> getConnection() {
        ...
        connectionFuture = connectionManager.connectionWriteOp(source, command);
        return connectionFuture;
    }
    ...
}

public class MasterSlaveConnectionManager implements ConnectionManager {
    ...
    @Override
    public CompletableFuture<RedisConnection> connectionWriteOp(NodeSource source, RedisCommand<?> command) {
        MasterSlaveEntry entry = getEntry(source);
        ...
    }
    
    private MasterSlaveEntry getEntry(NodeSource source) {
        if (source.getRedirect() != null) {
            return getEntry(source.getAddr());
        }
        MasterSlaveEntry entry = source.getEntry();
        if (source.getRedisClient() != null) {
            entry = getEntry(source.getRedisClient());
        }
        if (entry == null && source.getSlot() != null) {
            //根据slot获取Redis的主节点
            entry = getEntry(source.getSlot());
        }
        return entry;
    }
    ...
}

public class ClusterConnectionManager extends MasterSlaveConnectionManager {
    //slot和Redis主节点的原子映射数组
    private final AtomicReferenceArray<MasterSlaveEntry> slot2entry = new AtomicReferenceArray<>(MAX_SLOT);
    //Redis客户端连接和Redis主节点的映射关系
    private final Map<RedisClient, MasterSlaveEntry> client2entry = new ConcurrentHashMap<>();
    ...
    @Override
    public MasterSlaveEntry getEntry(int slot) {
        //根据slot获取Redis的主节点
        return slot2entry.get(slot);
    }
    ...
    //初始化连接管理器ClusterConnectionManager时
    //就已经根据配置初始化好那些slot映射到那个Redis主节点了
    public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
        ...
        for (String address : cfg.getNodeAddresses()) {
            ...
            CompletableFuture<Collection<ClusterPartition>> partitionsFuture = parsePartitions(nodes);
            Collection<ClusterPartition> partitions = partitionsFuture.join();
            List<CompletableFuture<Void>> masterFutures = new ArrayList<>();
            for (ClusterPartition partition : partitions) {
                ...
                CompletableFuture<Void> masterFuture = addMasterEntry(partition, cfg);
                masterFutures.add(masterFuture);
            }
            ...
        }
        ...
    }
    
    private CompletableFuture<Void> addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
        ...
        CompletionStage<RedisConnection> connectionFuture = connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName);
        connectionFuture.whenComplete((connection, ex1) -> {
            //成功连接时的处理
            if (ex1 != null) {
                log.error("Can't connect to master: {} with slot ranges: {}", partition.getMasterAddress(), partition.getSlotRanges());
                result.completeExceptionally(ex1);
                return;
            }
            MasterSlaveServersConfig config = create(cfg);
            config.setMasterAddress(partition.getMasterAddress().toString());
            //创建Redis的主节点
            MasterSlaveEntry entry;
            if (config.checkSkipSlavesInit()) {
                entry = new SingleEntry(ClusterConnectionManager.this, config);
            } else {
                Set<String> slaveAddresses = partition.getSlaveAddresses().stream().map(r -> r.toString()).collect(Collectors.toSet());
                config.setSlaveAddresses(slaveAddresses);
                entry = new MasterSlaveEntry(ClusterConnectionManager.this, config);
            }


            CompletableFuture<RedisClient> f = entry.setupMasterEntry(new RedisURI(config.getMasterAddress()), configEndpointHostName);
            f.whenComplete((masterClient, ex3) -> {
                if (ex3 != null) {
                    log.error("Can't add master: " + partition.getMasterAddress() + " for slot ranges: " + partition.getSlotRanges(), ex3);
                    result.completeExceptionally(ex3);
                    return;
                }
                //为创建的Redis的主节点添加slot值
                for (Integer slot : partition.getSlots()) {
                    addEntry(slot, entry);
                    lastPartitions.put(slot, partition);
                }
                ...
            });
        });
        ...
    }
        
    //添加slot到对应节点的映射关系
    private void addEntry(Integer slot, MasterSlaveEntry entry) {
        MasterSlaveEntry oldEntry = slot2entry.getAndSet(slot, entry);
        if (oldEntry != entry) {
            entry.incReference();
            shutdownEntry(oldEntry);
        }
        client2entry.put(entry.getClient(), entry);
    }
    ...
}

相关文章:

  • 机器学习(五)
  • 线程相关八股
  • 【论文分析】语义驱动+迁移强化学习:无人机自主视觉导航的高效解决方案(语义驱动的无人机自主视觉导航)
  • 基于开源库编写MQTT通讯
  • Unity 内置渲染管线各个Shader的用途和性能分析,以及如何修改Shader(build in shader 源码下载)
  • Spring(二)容器
  • 2025年能源工作指导意见
  • 6.C#对接微信Native支付(退款申请、退款回调通知)
  • 分布式中间件:Redis介绍
  • Linux驱动开发之串口驱动移植
  • Android Studio 新版本Gradle发布本地Maven仓库示例
  • The Rust Programming Language 学习 (二)
  • jupyter汉化、修改默认路径详细讲解
  • STM32标准库之编码器接口示例代码
  • Flutter管理项目实战
  • 蓝桥试题:斐波那契数列
  • 【Leetcode 每日一题】1278. 分割回文串 III
  • SpringBoot系列之Spring AI+DeekSeek创建AI应用
  • 【每日八股】计算机网络篇(二):TCP 和 UDP
  • 虚拟机配置
  • 王毅:携手做世界和平与发展事业的中流砥柱
  • 体坛联播|巴萨“三杀”皇马夺国王杯,陈妤颉破亚洲少年纪录
  • 欢迎回家!日本和歌山县4只大熊猫将于6月底送返中国
  • 民生访谈|公共数据如何既开放又安全?政务领域如何适度运用人工智能?
  • 现场|贝聿铭上海大展:回到他建筑梦的初始之地
  • 石磊当选河北秦皇岛市市长