Lettuce简介

Lettuce是一个开源的redis client,由javg编写。其主要的宣传特性是线程安全、支持同步异步以及reactive api、支持单实例redis、redis sentinel、redis cluster等。

Lettuce的实现主要是基于netty实现了网络层逻辑,通过专门的设计支持在同一个连接上并发的处理多个请求响应。

使用实例

1、 添加依赖

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>${lettuce.version}</version>
</dependency>

2、创建RedisClient并执行命令

//创建客户端,也可以与Spring集成
RedisClient redisClient = RedisClient.create("redis://password@localhost:6379/0");
StatefulRedisConnection<String, String> connection = redisClient.connect();
//获取同步API
RedisCommands<String, String> syncCommands = connection.sync();
syncCommands.set("key", "Hello, Redis!");
//获取异步API
RedisAsyncCommands<String, String> asyncCommands = connection.async();
asyncCommands.get("key").thenAccept(result -> System.out.println("the result is " + result));  
//获取reactive API
RedisReactiveCommands<String, String> reactiveCommands = connection.reactive();
reactiveCommands.get("key").subscribe(result -> System.out.println("the result is " + result));
//关闭连接和客户端
connection.close();
redisClient.shutdown();

StatefulRedisConnection

StatefulRedisConnection是一个接口,它代表客户端与服务端的一个连接,同步、异步和reactive三种api底层都是通过StatefulRedisConnection与服务端通信。

连接的创建

连接的创建由RedisClient的connect方法执行,connect方法需要传入编解码所需的codec与服务端的URI。codec可以不需要指定,默认是StringCodec.UTF8。connect方法最终会调用到connectStandaloneAsync方法,在这里执行真正的连接创建逻辑,主要包含三个部分:

  1. 创建DefaultEndpoint对象,DefaultEndpoint包含了连接自身的管理以及向对端发送命令的逻辑。
  2. 创建StatefulRedisConnectionImpl对象,它是StatefulRedisConnection的实际实现。
  3. 调用connectStatefulAsync方法进行实际的连接。

connectStatefulAsync方法中首先会创建ConnectionBuilder对象,其中包含了连接的各个选项,比如连接地址、netty相关的配置等等。接着会通过解析redis uri获得一个socketAddress,然后触发initializeChannelAsync0方法中的netty的connect操作,于是连接就建立了。

连接状态的管理

lettuce是基于netty实现的,所以它在netty的channel上添加了几个自定义的handler,其中具体包括:

  1. ChannelGroupHandler,用于维护channelGroup的状态,主要逻辑就是在连接建立或者断开时将对应的连接添加到channelGroup或者从channelGroup中移除
  2. CommandEncoder,用于redis command的编码
  3. CommandHandler,lettuce的核心handler,主要负责redis command发送以及服务端响应的解析
  4. ConnectionWatchdog,用于监控连接状态和连接的自动重连
  5. ConnectionEventTrigger,用于转发channel的状态事件,对外暴露出listener,用户可以借此监听连接变化

连接状态的管理主要有ChannelGroupHandler与ConnectionWatchDog实现,其中ChannelGroupHandler的逻辑比较简单,ConnectionWatchDog的实现主要是监听channelInactive事件,然后提交一个重连任务到线程池。

此外,DefaultEndpoint也会监听连接状态的变化,实际上整个通信过程中与服务端的连接由DefaultEndpoint对象持有,当发生连接变化(比如断开或者重连)时,DefaultEndpoint会根据连接的状态执行不同的逻辑,比如:

  1. 当连接建立时,将持有的连接对象替换成新建立的连接
  2. 当发送数据时,先判断连接的状态,如果连接正常则直接通过连接发送;否则将数据添加到自身的缓冲里
  3. 当连接重连时,将缓冲的数据发送出去,并将持有的连接对象替换成新建立的连接

CommandHandler

CommandHandler是lettuce的核心部分,它自身也是一个channel handler,拦截了redis command的write和服务端响应的read等逻辑。

发送command

CommandHandler内部维护了一个command queue,所有发出的command都在这个queue中保存。当执行命令时,CommandHandler会拦截write方法,将其保存到queue中。

读取响应

当收到来自服务端的响应时,CommandHandler就从queue head取出command然后进行解析,解析成功后command就执行完毕,这时将其从queue中移除即可,如果还有剩余的数据,就继续从queue中取出command进行解析。值得注意的是CommandHandler采用了buffer的方案来处理可能的TCP粘包拆包问题,其内部维护了一个单独的buffer,每次接收到服务端的数据时并不直接进行解析,而是先将其写入到自身的buffer中,然后对buffer的数据进行解析,而每次解析时会尽可能的消费buffer中的所有数据。

command的编解码

command的编码在CommandEncoder中触发,其逻辑也比较简单,就是直接调用了command的encode方法。对于服务端响应的解析则直接位于CommandHandler的channelRead方法中。command的编解码也就是直接实现了redis的通信协议,具体可以参考:redis通信协议

API的实现

异步API

在对外API的实现方面,主要逻辑由以下几步构成:

  1. 组装命令对应的Command对象,包括命令的类型、命令的参数、命令的返回类型等
  2. 通过StatefulRedisConnection的dispatch方法发送命令,dispatch方法实际上也就是调用了channel的write方法
  3. 返回对应的Command对象

在异步API的实现中,返回的是一个AsyncCommand对象,AsyncCommand继承了CompletableFuture类型,所以可以作为一个Future返回。AsyncCommand内维护了原始的命令以及返回值的占位符,当解析完成后AsyncCommand就会调用其自身的compelte方法,调用就完成了。

同步API

同步API的实现实际上就是封装了异步调用的API,在返回前阻塞调用Future的get方法。

reactive API

reactive API的实现底层也是调用了StatefulRedisConnection的dispatch方法发送命令,然后将返回包装成一个Publisher。