Lettuce学习笔记
Lettuce简介
Lettuce是一个开源的redis client,由javg编写。其主要的宣传特性是线程安全、支持同步异步以及reactive api、支持单实例redis、redis sentinel、redis cluster等。
Lettuce的实现主要是基于netty实现了网络层逻辑,通过专门的设计支持在同一个连接上并发的处理多个请求响应。
使用实例
1、 添加依赖
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>${lettuce.version}</version>
</dependency>
2、创建RedisClient并执行命令
//创建客户端,也可以与Spring集成
RedisClient redisClient = RedisClient.create("redis://password@localhost:6379/0");
StatefulRedisConnection<String, String> connection = redisClient.connect();
//获取同步API
RedisCommands<String, String> syncCommands = connection.sync();
syncCommands.set("key", "Hello, Redis!");
//获取异步API
RedisAsyncCommands<String, String> asyncCommands = connection.async();
asyncCommands.get("key").thenAccept(result -> System.out.println("the result is " + result));
//获取reactive API
RedisReactiveCommands<String, String> reactiveCommands = connection.reactive();
reactiveCommands.get("key").subscribe(result -> System.out.println("the result is " + result));
//关闭连接和客户端
connection.close();
redisClient.shutdown();
StatefulRedisConnection
StatefulRedisConnection是一个接口,它代表客户端与服务端的一个连接,同步、异步和reactive三种api底层都是通过StatefulRedisConnection与服务端通信。
连接的创建
连接的创建由RedisClient的connect方法执行,connect方法需要传入编解码所需的codec与服务端的URI。codec可以不需要指定,默认是StringCodec.UTF8。connect方法最终会调用到connectStandaloneAsync方法,在这里执行真正的连接创建逻辑,主要包含三个部分:
- 创建DefaultEndpoint对象,DefaultEndpoint包含了连接自身的管理以及向对端发送命令的逻辑。
- 创建StatefulRedisConnectionImpl对象,它是StatefulRedisConnection的实际实现。
- 调用connectStatefulAsync方法进行实际的连接。
connectStatefulAsync方法中首先会创建ConnectionBuilder对象,其中包含了连接的各个选项,比如连接地址、netty相关的配置等等。接着会通过解析redis uri获得一个socketAddress,然后触发initializeChannelAsync0方法中的netty的connect操作,于是连接就建立了。
连接状态的管理
lettuce是基于netty实现的,所以它在netty的channel上添加了几个自定义的handler,其中具体包括:
- ChannelGroupHandler,用于维护channelGroup的状态,主要逻辑就是在连接建立或者断开时将对应的连接添加到channelGroup或者从channelGroup中移除
- CommandEncoder,用于redis command的编码
- CommandHandler,lettuce的核心handler,主要负责redis command发送以及服务端响应的解析
- ConnectionWatchdog,用于监控连接状态和连接的自动重连
- ConnectionEventTrigger,用于转发channel的状态事件,对外暴露出listener,用户可以借此监听连接变化
连接状态的管理主要有ChannelGroupHandler与ConnectionWatchDog实现,其中ChannelGroupHandler的逻辑比较简单,ConnectionWatchDog的实现主要是监听channelInactive事件,然后提交一个重连任务到线程池。
此外,DefaultEndpoint也会监听连接状态的变化,实际上整个通信过程中与服务端的连接由DefaultEndpoint对象持有,当发生连接变化(比如断开或者重连)时,DefaultEndpoint会根据连接的状态执行不同的逻辑,比如:
- 当连接建立时,将持有的连接对象替换成新建立的连接
- 当发送数据时,先判断连接的状态,如果连接正常则直接通过连接发送;否则将数据添加到自身的缓冲里
- 当连接重连时,将缓冲的数据发送出去,并将持有的连接对象替换成新建立的连接
CommandHandler
CommandHandler是lettuce的核心部分,它自身也是一个channel handler,拦截了redis command的write和服务端响应的read等逻辑。
发送command
CommandHandler内部维护了一个command queue,所有发出的command都在这个queue中保存。当执行命令时,CommandHandler会拦截write方法,将其保存到queue中。
读取响应
当收到来自服务端的响应时,CommandHandler就从queue head取出command然后进行解析,解析成功后command就执行完毕,这时将其从queue中移除即可,如果还有剩余的数据,就继续从queue中取出command进行解析。值得注意的是CommandHandler采用了buffer的方案来处理可能的TCP粘包拆包问题,其内部维护了一个单独的buffer,每次接收到服务端的数据时并不直接进行解析,而是先将其写入到自身的buffer中,然后对buffer的数据进行解析,而每次解析时会尽可能的消费buffer中的所有数据。
command的编解码
command的编码在CommandEncoder中触发,其逻辑也比较简单,就是直接调用了command的encode方法。对于服务端响应的解析则直接位于CommandHandler的channelRead方法中。command的编解码也就是直接实现了redis的通信协议,具体可以参考:redis通信协议
API的实现
异步API
在对外API的实现方面,主要逻辑由以下几步构成:
- 组装命令对应的Command对象,包括命令的类型、命令的参数、命令的返回类型等
- 通过StatefulRedisConnection的dispatch方法发送命令,dispatch方法实际上也就是调用了channel的write方法
- 返回对应的Command对象
在异步API的实现中,返回的是一个AsyncCommand对象,AsyncCommand继承了CompletableFuture类型,所以可以作为一个Future返回。AsyncCommand内维护了原始的命令以及返回值的占位符,当解析完成后AsyncCommand就会调用其自身的compelte方法,调用就完成了。
同步API
同步API的实现实际上就是封装了异步调用的API,在返回前阻塞调用Future的get方法。
reactive API
reactive API的实现底层也是调用了StatefulRedisConnection的dispatch方法发送命令,然后将返回包装成一个Publisher。