【Redis】Redis高级客户端Lettuce详解
文章目录
- 前提
- Lettuce简介
- 连接Redis
- 定制的连接URI语法
- 基本使用
- API
- 同步API
- 异步API
- 反应式API
- 发布和订阅
- 事务和批量命令执行
- Lua脚本执行
- 高可用和分片
- 普通主从模式
- 哨兵模式
- 集群模式
- 动态命令和自定义命令
- 高阶特性
- 配置客户端资源
- 使用连接池
- 几个常见的渐进式删除例子
- 在SpringBoot中使用Lettuce
- 小结
- 链接
前提
Lettuce,读音[ˈletɪs],是一个Redis的Java驱动包,初识她的时候是使用RedisTemplate的时候遇到点问题Debug到底层的一些源码,发现spring-data-redis的驱动包在某个版本之后替换为Lettuce。Lettuce翻译为生菜,没错,就是吃的那种生菜,所以它的Logo长这样:
既然能被Spring生态所认可,Lettuce想必有过人之处,于是笔者花时间阅读她的官方文档,整理测试示例,写下这篇文章。编写本文时所使用的版本为Lettuce 5.1.8.RELEASE,SpringBoot 2.1.8.RELEASE,JDK [8,11]。超长警告:这篇文章断断续续花了两周完成,超过4万字…
Lettuce简介
Lettuce是一个高性能基于Java编写的Redis驱动框架,底层集成了Project Reactor提供天然的反应式编程,通信框架集成了Netty使用了非阻塞IO,5.x版本之后融合了JDK1.8的异步编程特性,在保证高性能的同时提供了十分丰富易用的API,5.1版本的新特性如下:
- 支持Redis的新增命令ZPOPMIN, ZPOPMAX, BZPOPMIN, BZPOPMAX。
- 支持通过Brave模块跟踪Redis命令执行。
- 支持Redis Streams。
- 支持异步的主从连接。
- 支持异步连接池。
- 新增命令最多执行一次模式(禁止自动重连)。
- 全局命令超时设置(对异步和反应式命令也有效)。
- …等等
注意一点:Redis的版本至少需要2.6,当然越高越好,API的兼容性比较强大。
只需要引入单个依赖就可以开始愉快地使用Lettuce:
- Maven
<dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>5.1.8.RELEASE</version>
</dependency>
- Gradle
dependencies {compile 'io.lettuce:lettuce-core:5.1.8.RELEASE'
}
连接Redis
单机、哨兵、集群模式下连接Redis需要一个统一的标准去表示连接的细节信息,在Lettuce中这个统一的标准是RedisURI。可以通过三种方式构造一个RedisURI实例:
- 定制的字符串URI语法:
RedisURI uri = RedisURI.create("redis://localhost/");
- 使用建造器(RedisURI.Builder):
RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();
- 直接通过构造函数实例化:
RedisURI uri = new RedisURI("localhost", 6379, 60, TimeUnit.SECONDS);
定制的连接URI语法
- 单机(前缀为redis://)
格式:redis://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:redis://mypassword@127.0.0.1:6379/0?timeout=10s
简单:redis://localhost
- 单机并且使用SSL(前缀为rediss://) <== 注意后面多了个s
格式:rediss://[password@]host[:port][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]
完整:rediss://mypassword@127.0.0.1:6379/0?timeout=10s
简单:rediss://localhost
- 单机Unix Domain Sockets模式(前缀为redis-socket://)
格式:redis-socket://path[?[timeout=timeout[d|h|m|s|ms|us|ns]][&_database=database_]]
完整:redis-socket:///tmp/redis?timeout=10s&_database=0
- 哨兵(前缀为redis-sentinel://)
格式:redis-sentinel://[password@]host[:port][,host2[:port2]][/databaseNumber][?[timeout=timeout[d|h|m|s|ms|us|ns]]#sentinelMasterId
完整:redis-sentinel://mypassword@127.0.0.1:6379,127.0.0.1:6380/0?timeout=10s#mymaster
超时时间单位:
- d 天
- h 小时
- m 分钟
- s 秒钟
- ms 毫秒
- us 微秒
- ns 纳秒
个人建议使用RedisURI提供的建造器,毕竟定制的URI虽然简洁,但是比较容易出现人为错误。鉴于笔者没有SSL和Unix Domain Socket的使用场景,下面不对这两种连接方式进行列举。
基本使用
Lettuce使用的时候依赖于四个主要组件:
- RedisURI:连接信息。
- RedisClient:Redis客户端,特殊地,集群连接有一个定制的RedisClusterClient。
- Connection:Redis连接,主要是StatefulConnection或者StatefulRedisConnection的子类,连接的类型主要由连接的具体方式(单机、哨兵、集群、订阅发布等等)选定,比较重要。
- RedisCommands:Redis命令API接口,基本上覆盖了Redis发行版本的所有命令,提供了同步(sync)、异步(async)、反应式(reative)的调用方式,对于使用者而言,会经常跟RedisCommands系列接口打交道。
一个基本使用例子如下:
@Test
public void testSetGet() throws Exception {RedisURI redisUri = RedisURI.builder() // <1> 创建单机连接的连接信息.withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri); // <2> 创建客户端StatefulRedisConnection<String, String> connection = redisClient.connect(); // <3> 创建线程安全的连接RedisCommands<String, String> redisCommands = connection.sync(); // <4> 创建同步命令SetArgs setArgs = SetArgs.Builder.nx().ex(5);String result = redisCommands.set("name", "throwable", setArgs);Assertions.assertThat(result).isEqualToIgnoringCase("OK");result = redisCommands.get("name");Assertions.assertThat(result).isEqualTo("throwable");// ... 其他操作connection.close(); // <5> 关闭连接redisClient.shutdown(); // <6> 关闭客户端
}
注意:
- <5>:关闭连接一般在应用程序停止之前操作,一个应用程序中的一个Redis驱动实例不需要太多的连接(一般情况下只需要一个连接实例就可以,如果有多个连接的需要可以考虑使用连接池,其实Redis目前处理命令的模块是单线程,在客户端多个连接多线程调用理论上没有效果)。
- <6>:关闭客户端一般应用程序停止之前操作,如果条件允许的话,基于后开先闭原则,客户端关闭应该在连接关闭之后操作。
API
Lettuce主要提供三种API:
- 同步(sync):RedisCommands。
- 异步(async):RedisAsyncCommands。
- 反应式(reactive):RedisReactiveCommands。
先准备好一个单机Redis连接备用:
private static StatefulRedisConnection<String, String> CONNECTION;
private static RedisClient CLIENT;@BeforeClass
public static void beforeClass() {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();CLIENT = RedisClient.create(redisUri);CONNECTION = CLIENT.connect();
}@AfterClass
public static void afterClass() throws Exception {CONNECTION.close();CLIENT.shutdown();
}
Redis命令API的具体实现可以直接从StatefulRedisConnection实例获取,见其接口定义:
public interface StatefulRedisConnection<K, V> extends StatefulConnection<K, V> {boolean isMulti();RedisCommands<K, V> sync();RedisAsyncCommands<K, V> async();RedisReactiveCommands<K, V> reactive();
}
值得注意的是,在不指定编码解码器RedisCodec的前提下,RedisClient创建的StatefulRedisConnection实例一般是泛型实例StatefulRedisConnection<String,String>,也就是所有命令API的KEY和VALUE都是String类型,这种使用方式能满足大部分的使用场景。当然,必要的时候可以定制编码解码器RedisCodec<K,V>。
同步API
先构建RedisCommands实例:
private static RedisCommands<String, String> COMMAND;@BeforeClass
public static void beforeClass() {COMMAND = CONNECTION.sync();
}
基本使用:
@Test
public void testSyncPing() throws Exception {String pong = COMMAND.ping();Assertions.assertThat(pong).isEqualToIgnoringCase("PONG");
}@Test
public void testSyncSetAndGet() throws Exception {SetArgs setArgs = SetArgs.Builder.nx().ex(5);COMMAND.set("name", "throwable", setArgs);String value = COMMAND.get("name");log.info("Get value: {}", value);
}// Get value: throwable
同步API在所有命令调用之后会立即返回结果。如果熟悉Jedis的话,RedisCommands的用法其实和它相差不大。
异步API
先构建RedisAsyncCommands实例:
private static RedisAsyncCommands<String, String> ASYNC_COMMAND;@BeforeClass
public static void beforeClass() {ASYNC_COMMAND = CONNECTION.async();
}
基本使用:
@Test
public void testAsyncPing() throws Exception {RedisFuture<String> redisFuture = ASYNC_COMMAND.ping();log.info("Ping result:{}", redisFuture.get());
}
// Ping result:PONG
RedisAsyncCommands所有方法执行返回结果都是RedisFuture实例,而RedisFuture接口的定义如下:
public interface RedisFuture<V> extends CompletionStage<V>, Future<V> {String getError();boolean await(long timeout, TimeUnit unit) throws InterruptedException;
}
也就是,RedisFuture可以无缝使用Future或者JDK1.8中引入的CompletableFuture提供的方法。举个例子:
@Test
public void testAsyncSetAndGet1() throws Exception {SetArgs setArgs = SetArgs.Builder.nx().ex(5);RedisFuture<String> future = ASYNC_COMMAND.set("name", "throwable", setArgs);// CompletableFuture#thenAccept()future.thenAccept(value -> log.info("Set命令返回:{}", value));// Future#get()future.get();
}
// Set命令返回:OK@Test
public void testAsyncSetAndGet2() throws Exception {SetArgs setArgs = SetArgs.Builder.nx().ex(5);CompletableFuture<Void> result =(CompletableFuture<Void>) ASYNC_COMMAND.set("name", "throwable", setArgs).thenAcceptBoth(ASYNC_COMMAND.get("name"),(s, g) -> {log.info("Set命令返回:{}", s);log.info("Get命令返回:{}", g);});result.get();
}
// Set命令返回:OK
// Get命令返回:throwable
如果能熟练使用CompletableFuture和函数式编程技巧,可以组合多个RedisFuture完成一些列复杂的操作。
反应式API
Lettuce引入的反应式编程框架是Project Reactor,如果没有反应式编程经验可以先自行了解一下Project Reactor。
构建RedisReactiveCommands实例:
private static RedisReactiveCommands<String, String> REACTIVE_COMMAND;@BeforeClass
public static void beforeClass() {REACTIVE_COMMAND = CONNECTION.reactive();
}
根据Project Reactor,RedisReactiveCommands的方法如果返回的结果只包含0或1个元素,那么返回值类型是Mono,如果返回的结果包含0到N(N大于0)个元素,那么返回值是Flux。举个例子:
@Test
public void testReactivePing() throws Exception {Mono<String> ping = REACTIVE_COMMAND.ping();ping.subscribe(v -> log.info("Ping result:{}", v));Thread.sleep(1000);
}
// Ping result:PONG@Test
public void testReactiveSetAndGet() throws Exception {SetArgs setArgs = SetArgs.Builder.nx().ex(5);REACTIVE_COMMAND.set("name", "throwable", setArgs).block();REACTIVE_COMMAND.get("name").subscribe(value -> log.info("Get命令返回:{}", value));Thread.sleep(1000);
}
// Get命令返回:throwable@Test
public void testReactiveSet() throws Exception {REACTIVE_COMMAND.sadd("food", "bread", "meat", "fish").block();Flux<String> flux = REACTIVE_COMMAND.smembers("food");flux.subscribe(log::info);REACTIVE_COMMAND.srem("food", "bread", "meat", "fish").block();Thread.sleep(1000);
}
// meat
// bread
// fish
举个更加复杂的例子,包含了事务、函数转换等:
@Test
public void testReactiveFunctional() throws Exception {REACTIVE_COMMAND.multi().doOnSuccess(r -> {REACTIVE_COMMAND.set("counter", "1").doOnNext(log::info).subscribe();REACTIVE_COMMAND.incr("counter").doOnNext(c -> log.info(String.valueOf(c))).subscribe();}).flatMap(s -> REACTIVE_COMMAND.exec()).doOnNext(transactionResult -> log.info("Discarded:{}", transactionResult.wasDiscarded())).subscribe();Thread.sleep(1000);
}
// OK
// 2
// Discarded:false
这个方法开启一个事务,先把counter设置为1,再将counter自增1。
发布和订阅
非集群模式下的发布订阅依赖于定制的连接StatefulRedisPubSubConnection,集群模式下的发布订阅依赖于定制的连接StatefulRedisClusterPubSubConnection,两者分别来源于RedisClient#connectPubSub()系列方法和RedisClusterClient#connectPubSub():
- 非集群模式:
// 可能是单机、普通主从、哨兵等非集群模式的客户端
RedisClient client = ...
StatefulRedisPubSubConnection<String, String> connection = client.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });// 同步命令
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");// 异步命令
RedisPubSubAsyncCommands<String, String> async = connection.async();
RedisFuture<Void> future = async.subscribe("channel");// 反应式命令
RedisPubSubReactiveCommands<String, String> reactive = connection.reactive();
reactive.subscribe("channel").subscribe();reactive.observeChannels().doOnNext(patternMessage -> {...}).subscribe()
- 集群模式:
// 使用方式其实和非集群模式基本一致
RedisClusterClient clusterClient = ...
StatefulRedisClusterPubSubConnection<String, String> connection = clusterClient.connectPubSub();
connection.addListener(new RedisPubSubListener<String, String>() { ... });
RedisPubSubCommands<String, String> sync = connection.sync();
sync.subscribe("channel");
// ...
这里用单机同步命令的模式举一个Redis键空间通知(Redis Keyspace Notifications)的例子:
@Test
public void testSyncKeyspaceNotification() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379)// 注意这里只能是0号库.withDatabase(0).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);StatefulRedisConnection<String, String> redisConnection = redisClient.connect();RedisCommands<String, String> redisCommands = redisConnection.sync();// 只接收键过期的事件redisCommands.configSet("notify-keyspace-events", "Ex");StatefulRedisPubSubConnection<String, String> connection = redisClient.connectPubSub();connection.addListener(new RedisPubSubAdapter<>() {@Overridepublic void psubscribed(String pattern, long count) {log.info("pattern:{},count:{}", pattern, count);}@Overridepublic void message(String pattern, String channel, String message) {log.info("pattern:{},channel:{},message:{}", pattern, channel, message);}});RedisPubSubCommands<String, String> commands = connection.sync();commands.psubscribe("__keyevent@0__:expired");redisCommands.setex("name", 2, "throwable");Thread.sleep(10000);redisConnection.close();connection.close();redisClient.shutdown();
}
// pattern:__keyevent@0__:expired,count:1
// pattern:__keyevent@0__:expired,channel:__keyevent@0__:expired,message:name
实际上,在实现RedisPubSubListener的时候可以单独抽离,尽量不要设计成匿名内部类的形式。
事务和批量命令执行
事务相关的命令就是WATCH、UNWATCH、EXEC、MULTI和DISCARD,在RedisCommands系列接口中有对应的方法。举个例子:
// 同步模式
@Test
public void testSyncMulti() throws Exception {COMMAND.multi();COMMAND.setex("name-1", 2, "throwable");COMMAND.setex("name-2", 2, "doge");TransactionResult result = COMMAND.exec();int index = 0;for (Object r : result) {log.info("Result-{}:{}", index, r);index++;}
}
// Result-0:OK
// Result-1:OK
Redis的Pipeline也就是管道机制可以理解为把多个命令打包在一次请求发送到Redis服务端,然后Redis服务端把所有的响应结果打包好一次性返回,从而节省不必要的网络资源(最主要是减少网络请求次数)。Redis对于Pipeline机制如何实现并没有明确的规定,也没有提供特殊的命令支持Pipeline机制。Jedis中底层采用BIO(阻塞IO)通讯,所以它的做法是客户端缓存将要发送的命令,最后需要触发然后同步发送一个巨大的命令列表包,再接收和解析一个巨大的响应列表包。Pipeline在Lettuce中对使用者是透明的,由于底层的通讯框架是Netty,所以网络通讯层面的优化Lettuce不需要过多干预,换言之可以这样理解:Netty帮Lettuce从底层实现了Redis的Pipeline机制。但是,Lettuce的异步API也提供了手动Flush的方法:
@Test
public void testAsyncManualFlush() {// 取消自动flushASYNC_COMMAND.setAutoFlushCommands(false);List<RedisFuture<?>> redisFutures = Lists.newArrayList();int count = 5000;for (int i = 0; i < count; i++) {String key = "key-" + (i + 1);String value = "value-" + (i + 1);redisFutures.add(ASYNC_COMMAND.set(key, value));redisFutures.add(ASYNC_COMMAND.expire(key, 2));}long start = System.currentTimeMillis();ASYNC_COMMAND.flushCommands();boolean result = LettuceFutures.awaitAll(10, TimeUnit.SECONDS, redisFutures.toArray(new RedisFuture[0]));Assertions.assertThat(result).isTrue();log.info("Lettuce cost:{} ms", System.currentTimeMillis() - start);
}
// Lettuce cost:1302 ms
上面只是从文档看到的一些理论术语,但是现实是骨感的,对比了下Jedis的Pipeline提供的方法,发现了Jedis的Pipeline执行耗时比较低:
@Test
public void testJedisPipeline() throws Exception {Jedis jedis = new Jedis();Pipeline pipeline = jedis.pipelined();int count = 5000;for (int i = 0; i < count; i++) {String key = "key-" + (i + 1);String value = "value-" + (i + 1);pipeline.set(key, value);pipeline.expire(key, 2);}long start = System.currentTimeMillis();pipeline.syncAndReturnAll();log.info("Jedis cost:{} ms", System.currentTimeMillis() - start);
}
// Jedis cost:9 ms
个人猜测Lettuce可能底层并非合并所有命令一次发送(甚至可能是单条发送),具体可能需要抓包才能定位。依此来看,如果真的有大量执行Redis命令的场景,不妨可以使用Jedis的Pipeline。
注意:由上面的测试推断RedisTemplate的executePipelined()方法是假的Pipeline执行方法,使用RedisTemplate的时候请务必注意这一点。
Lua脚本执行
Lettuce中执行Redis的Lua命令的同步接口如下:
public interface RedisScriptingCommands<K, V> {<T> T eval(String var1, ScriptOutputType var2, K... var3);<T> T eval(String var1, ScriptOutputType var2, K[] var3, V... var4);<T> T evalsha(String var1, ScriptOutputType var2, K... var3);<T> T evalsha(String var1, ScriptOutputType var2, K[] var3, V... var4);List<Boolean> scriptExists(String... var1);String scriptFlush();String scriptKill();String scriptLoad(V var1);String digest(V var1);
}
异步和反应式的接口方法定义差不多,不同的地方就是返回值类型,一般我们常用的是eval()、evalsha()和scriptLoad()方法。举个简单的例子:
private static RedisCommands<String, String> COMMANDS;
private static String RAW_LUA = "local key = KEYS[1]\n" +"local value = ARGV[1]\n" +"local timeout = ARGV[2]\n" +"redis.call('SETEX', key, tonumber(timeout), value)\n" +"local result = redis.call('GET', key)\n" +"return result;";
private static AtomicReference<String> LUA_SHA = new AtomicReference<>();@Test
public void testLua() throws Exception {LUA_SHA.compareAndSet(null, COMMANDS.scriptLoad(RAW_LUA));String[] keys = new String[]{"name"};String[] args = new String[]{"throwable", "5000"};String result = COMMANDS.evalsha(LUA_SHA.get(), ScriptOutputType.VALUE, keys, args);log.info("Get value:{}", result);
}
// Get value:throwable
高可用和分片
为了Redis的高可用,一般会采用普通主从(Master/Replica,这里笔者称为普通主从模式,也就是仅仅做了主从复制,故障需要手动切换)、哨兵和集群。普通主从模式可以独立运行,也可以配合哨兵运行,只是哨兵提供自动故障转移和主节点提升功能。普通主从和哨兵都可以使用MasterSlave,通过入参包括RedisClient、编码解码器以及一个或者多个RedisURI获取对应的Connection实例。
这里注意一点,MasterSlave中提供的方法如果只要求传入一个RedisURI实例,那么Lettuce会进行拓扑发现机制,自动获取Redis主从节点信息;如果要求传入一个RedisURI集合,那么对于普通主从模式来说所有节点信息是静态的,不会进行发现和更新。
拓扑发现的规则如下:
- 对于普通主从(Master/Replica)模式,不需要感知RedisURI指向从节点还是主节点,只会进行一次性的拓扑查找所有节点信息,此后节点信息会保存在静态缓存中,不会更新。
- 对于哨兵模式,会订阅所有哨兵实例并侦听订阅/发布消息以触发拓扑刷新机制,更新缓存的节点信息,也就是哨兵天然就是动态发现节点信息,不支持静态配置。
拓扑发现机制的提供API为TopologyProvider,需要了解其原理的可以参考具体的实现。
对于集群(Cluster)模式,Lettuce提供了一套独立的API。
另外,如果Lettuce连接面向的是非单个Redis节点,连接实例提供了数据读取节点偏好(ReadFrom)设置,可选值有:
- MASTER:只从Master节点中读取。
- MASTER_PREFERRED:优先从Master节点中读取。
- SLAVE_PREFERRED:优先从Slavor节点中读取。
- SLAVE:只从Slavor节点中读取。
- NEAREST:使用最近一次连接的Redis实例读取。
普通主从模式
假设现在有三个Redis服务形成树状主从关系如下:
- 节点一:localhost:6379,角色为Master。
- 节点二:localhost:6380,角色为Slavor,节点一的从节点。
- 节点三:localhost:6381,角色为Slavor,节点二的从节点。
首次动态节点发现主从模式的节点信息需要如下构建连接:
@Test
public void testDynamicReplica() throws Exception {// 这里只需要配置一个节点的连接信息,不一定需要是主节点的信息,从节点也可以RedisURI uri = RedisURI.builder().withHost("localhost").withPort(6379).build();RedisClient redisClient = RedisClient.create(uri);StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), uri);// 只从从节点读取数据connection.setReadFrom(ReadFrom.SLAVE);// 执行其他Redis命令connection.close();redisClient.shutdown();
}
如果需要指定静态的Redis主从节点连接属性,那么可以这样构建连接:
@Test
public void testStaticReplica() throws Exception {List<RedisURI> uris = new ArrayList<>();RedisURI uri1 = RedisURI.builder().withHost("localhost").withPort(6379).build();RedisURI uri2 = RedisURI.builder().withHost("localhost").withPort(6380).build();RedisURI uri3 = RedisURI.builder().withHost("localhost").withPort(6381).build();uris.add(uri1);uris.add(uri2);uris.add(uri3);RedisClient redisClient = RedisClient.create();StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient,new Utf8StringCodec(), uris);// 只从主节点读取数据connection.setReadFrom(ReadFrom.MASTER);// 执行其他Redis命令connection.close();redisClient.shutdown();
}
哨兵模式
由于Lettuce自身提供了哨兵的拓扑发现机制,所以只需要随便配置一个哨兵节点的RedisURI实例即可:
@Test
public void testDynamicSentinel() throws Exception {RedisURI redisUri = RedisURI.builder().withPassword("你的密码").withSentinel("localhost", 26379).withSentinelMasterId("哨兵Master的ID").build();RedisClient redisClient = RedisClient.create();StatefulRedisMasterSlaveConnection<String, String> connection = MasterSlave.connect(redisClient, new Utf8StringCodec(), redisUri);// 只允许从从节点读取数据connection.setReadFrom(ReadFrom.SLAVE);RedisCommands<String, String> command = connection.sync();SetArgs setArgs = SetArgs.Builder.nx().ex(5);command.set("name", "throwable", setArgs);String value = command.get("name");log.info("Get value:{}", value);
}
// Get value:throwable
集群模式
鉴于笔者对Redis集群模式并不熟悉,Cluster模式下的API使用本身就有比较多的限制,所以这里只简单介绍一下怎么用。先说几个特性:
下面的API提供跨槽位(Slot)调用的功能:
- RedisAdvancedClusterCommands。
- RedisAdvancedClusterAsyncCommands。
- RedisAdvancedClusterReactiveCommands。
静态节点选择功能:
- masters:选择所有主节点执行命令。
- slaves:选择所有从节点执行命令,其实就是只读模式。
- all nodes:命令可以在所有节点执行。
集群拓扑视图动态更新功能:
- 手动更新,主动调用RedisClusterClient#reloadPartitions()。
- 后台定时更新。
- 自适应更新,基于连接断开和MOVED/ASK命令重定向自动更新。
Redis集群搭建详细过程可以参考官方文档,假设已经搭建好集群如下(192.168.56.200是笔者的虚拟机Host):
- 192.168.56.200:7001 => 主节点,槽位0-5460。
- 192.168.56.200:7002 => 主节点,槽位5461-10922。
- 192.168.56.200:7003 => 主节点,槽位10923-16383。
- 192.168.56.200:7004 => 7001的从节点。
- 192.168.56.200:7005 => 7002的从节点。
- 192.168.56.200:7006 => 7003的从节点。
简单的集群连接和使用方式如下:
@Test
public void testSyncCluster(){RedisURI uri = RedisURI.builder().withHost("192.168.56.200").build();RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();RedisAdvancedClusterCommands<String, String> commands = connection.sync();commands.setex("name",10, "throwable");String value = commands.get("name");log.info("Get value:{}", value);
}
// Get value:throwable
节点选择:
@Test
public void testSyncNodeSelection() {RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();RedisAdvancedClusterCommands<String, String> commands = connection.sync();
// commands.all(); // 所有节点
// commands.masters(); // 主节点// 从节点只读NodeSelection<String, String> replicas = commands.slaves();NodeSelectionCommands<String, String> nodeSelectionCommands = replicas.commands();// 这里只是演示,一般应该禁用keys *命令Executions<List<String>> keys = nodeSelectionCommands.keys("*");keys.forEach(key -> log.info("key: {}", key));connection.close();redisClusterClient.shutdown();
}
定时更新集群拓扑视图(每隔十分钟更新一次,这个时间自行考量,不能太频繁):
@Test
public void testPeriodicClusterTopology() throws Exception {RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder().enablePeriodicRefresh(Duration.of(10, ChronoUnit.MINUTES)).build();redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();RedisAdvancedClusterCommands<String, String> commands = connection.sync();commands.setex("name", 10, "throwable");String value = commands.get("name");log.info("Get value:{}", value);Thread.sleep(Integer.MAX_VALUE);connection.close();redisClusterClient.shutdown();
}
自适应更新集群拓扑视图:
@Test
public void testAdaptiveClusterTopology() throws Exception {RedisURI uri = RedisURI.builder().withHost("192.168.56.200").withPort(7001).build();RedisClusterClient redisClusterClient = RedisClusterClient.create(uri);ClusterTopologyRefreshOptions options = ClusterTopologyRefreshOptions.builder().enableAdaptiveRefreshTrigger(ClusterTopologyRefreshOptions.RefreshTrigger.MOVED_REDIRECT,ClusterTopologyRefreshOptions.RefreshTrigger.PERSISTENT_RECONNECTS).adaptiveRefreshTriggersTimeout(Duration.of(30, ChronoUnit.SECONDS)).build();redisClusterClient.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(options).build());StatefulRedisClusterConnection<String, String> connection = redisClusterClient.connect();RedisAdvancedClusterCommands<String, String> commands = connection.sync();commands.setex("name", 10, "throwable");String value = commands.get("name");log.info("Get value:{}", value);Thread.sleep(Integer.MAX_VALUE);connection.close();redisClusterClient.shutdown();
}
动态命令和自定义命令
自定义命令是Redis命令有限集,不过可以更细粒度指定KEY、ARGV、命令类型、编码解码器和返回值类型,依赖于dispatch()方法:
// 自定义实现PING方法
@Test
public void testCustomPing() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);StatefulRedisConnection<String, String> connect = redisClient.connect();RedisCommands<String, String> sync = connect.sync();RedisCodec<String, String> codec = StringCodec.UTF8;String result = sync.dispatch(CommandType.PING, new StatusOutput<>(codec));log.info("PING:{}", result);connect.close();redisClient.shutdown();
}
// PING:PONG// 自定义实现Set方法
@Test
public void testCustomSet() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);StatefulRedisConnection<String, String> connect = redisClient.connect();RedisCommands<String, String> sync = connect.sync();RedisCodec<String, String> codec = StringCodec.UTF8;sync.dispatch(CommandType.SETEX, new StatusOutput<>(codec),new CommandArgs<>(codec).addKey("name").add(5).addValue("throwable"));String result = sync.get("name");log.info("Get value:{}", result);connect.close();redisClient.shutdown();
}
// Get value:throwable
动态命令是基于Redis命令有限集,并且通过注解和动态代理完成一些复杂命令组合的实现。主要注解在io.lettuce.core.dynamic.annotation包路径下。简单举个例子:
public interface CustomCommand extends Commands {// SET [key] [value]@Command("SET ?0 ?1")String setKey(String key, String value);// SET [key] [value]@Command("SET :key :value")String setKeyNamed(@Param("key") String key, @Param("value") String value);// MGET [key1] [key2]@Command("MGET ?0 ?1")List<String> mGet(String key1, String key2);/*** 方法名作为命令*/@CommandNaming(strategy = CommandNaming.Strategy.METHOD_NAME)String mSet(String key1, String value1, String key2, String value2);
}@Test
public void testCustomDynamicSet() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);StatefulRedisConnection<String, String> connect = redisClient.connect();RedisCommandFactory commandFactory = new RedisCommandFactory(connect);CustomCommand commands = commandFactory.getCommands(CustomCommand.class);commands.setKey("name", "throwable");commands.setKeyNamed("throwable", "doge");log.info("MGET ===> " + commands.mGet("name", "throwable"));commands.mSet("key1", "value1","key2", "value2");log.info("MGET ===> " + commands.mGet("key1", "key2"));connect.close();redisClient.shutdown();
}
// MGET ===> [throwable, doge]
// MGET ===> [value1, value2]
高阶特性
Lettuce有很多高阶使用特性,这里只列举个人认为常用的两点:
- 配置客户端资源。
- 使用连接池。
更多其他特性可以自行参看官方文档。
配置客户端资源
客户端资源的设置与Lettuce的性能、并发和事件处理相关。线程池或者线程组相关配置占据客户端资源配置的大部分(EventLoopGroups和EventExecutorGroup),这些线程池或者线程组是连接程序的基础组件。一般情况下,客户端资源应该在多个Redis客户端之间共享,并且在不再使用的时候需要自行关闭。笔者认为,客户端资源是面向Netty的。注意:除非特别熟悉或者花长时间去测试调整下面提到的参数,否则在没有经验的前提下凭直觉修改默认值,有可能会踩坑。
客户端资源接口是ClientResources,实现类是DefaultClientResources。
构建DefaultClientResources实例:
// 默认
ClientResources resources = DefaultClientResources.create();// 建造器
ClientResources resources = DefaultClientResources.builder().ioThreadPoolSize(4).computationThreadPoolSize(4).build()
使用:
ClientResources resources = DefaultClientResources.create();
// 非集群
RedisClient client = RedisClient.create(resources, uri);
// 集群
RedisClusterClient clusterClient = RedisClusterClient.create(resources, uris);
// ......
client.shutdown();
clusterClient.shutdown();
// 关闭资源
resources.shutdown();
客户端资源基本配置:
属性 | 描述 | 默认值 |
---|---|---|
ioThreadPoolSize | I/O线程数 | Runtime.getRuntime().availableProcessors() |
computationThreadPoolSize | 任务线程数 | Runtime.getRuntime().availableProcessors() |
客户端资源高级配置:
属性 | 描述 | 默认值 |
---|---|---|
eventLoopGroupProvider | EventLoopGroup提供商 | - |
eventExecutorGroupProvider | EventExecutorGroup提供商 | - |
eventBus | 事件总线 | DefaultEventBus |
commandLatencyCollectorOptions | 命令延时收集器配置 | DefaultCommandLatencyCollectorOptions |
commandLatencyCollector | 命令延时收集器 | DefaultCommandLatencyCollector |
commandLatencyPublisherOptions | 命令延时发布器配置 | DefaultEventPublisherOptions |
dnsResolver | DNS处理器 | JDK或者Netty提供 |
reconnectDelay | 重连延时配置 | Delay.exponential() |
nettyCustomizer | Netty自定义配置器 | - |
tracing | 轨迹记录器 | - |
非集群客户端RedisClient的属性配置:
Redis非集群客户端RedisClient本身提供了配置属性方法:
RedisClient client = RedisClient.create(uri);
client.setOptions(ClientOptions.builder().autoReconnect(false).pingBeforeActivateConnection(true).build());
非集群客户端的配置属性列表:
属性 | 描述 | 默认值 |
---|---|---|
pingBeforeActivateConnection | 连接激活之前是否执行PING命令 | false |
autoReconnect | 是否自动重连 | true |
cancelCommandsOnReconnectFailure | 重连失败是否拒绝命令执行 | false |
suspendReconnectOnProtocolFailure | 底层协议失败是否挂起重连操作 | false |
requestQueueSize | 请求队列容量 | 2147483647(Integer#MAX_VALUE) |
disconnectedBehavior | 失去连接时候的行为 | DEFAULT |
sslOptions | SSL配置 | - |
socketOptions | Socket配置 | 10 seconds Connection-Timeout, no keep-alive, no TCP noDelay |
timeoutOptions | 超时配置 | - |
publishOnScheduler | 发布反应式信号数据的调度器 | 使用I/O线程 |
集群客户端属性配置:
Redis集群客户端RedisClusterClient本身提供了配置属性方法:
RedisClusterClient client = RedisClusterClient.create(uri);
ClusterTopologyRefreshOptions topologyRefreshOptions = ClusterTopologyRefreshOptions.builder().enablePeriodicRefresh(refreshPeriod(10, TimeUnit.MINUTES)).enableAllAdaptiveRefreshTriggers().build();client.setOptions(ClusterClientOptions.builder().topologyRefreshOptions(topologyRefreshOptions).build());
集群客户端的配置属性列表:
属性 | 描述 | 默认值 |
---|---|---|
enablePeriodicRefresh | 是否允许周期性更新集群拓扑视图 | false |
refreshPeriod | 更新集群拓扑视图周期 | 60秒 |
enableAdaptiveRefreshTrigger | 设置自适应更新集群拓扑视图触发器RefreshTrigger | - |
adaptiveRefreshTriggersTimeout | 自适应更新集群拓扑视图触发器超时设置 | 30秒 |
refreshTriggersReconnectAttempts 自 | 适应更新集群拓扑视图触发重连次数 | 5 |
dynamicRefreshSources | 是否允许动态刷新拓扑资源 | true |
closeStaleConnections | 是否允许关闭陈旧的连接 | true |
maxRedirects | 集群重定向次数上限 | 5 |
validateClusterNodeMembership | 是否校验集群节点的成员关系 | true |
使用连接池
引入连接池依赖commons-pool2:
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.7.0</version>
</dependency
基本使用如下:
@Test
public void testUseConnectionPool() throws Exception {RedisURI redisUri = RedisURI.builder().withHost("localhost").withPort(6379).withTimeout(Duration.of(10, ChronoUnit.SECONDS)).build();RedisClient redisClient = RedisClient.create(redisUri);GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();GenericObjectPool<StatefulRedisConnection<String, String>> pool= ConnectionPoolSupport.createGenericObjectPool(redisClient::connect, poolConfig);try (StatefulRedisConnection<String, String> connection = pool.borrowObject()) {RedisCommands<String, String> command = connection.sync();SetArgs setArgs = SetArgs.Builder.nx().ex(5);command.set("name", "throwable", setArgs);String n = command.get("name");log.info("Get value:{}", n);}pool.close();redisClient.shutdown();
}
其中,同步连接的池化支持需要用ConnectionPoolSupport,异步连接的池化支持需要用AsyncConnectionPoolSupport(Lettuce5.1之后才支持)。
几个常见的渐进式删除例子
渐进式删除Hash中的域-属性:
@Test
public void testDelBigHashKey() throws Exception {// SCAN参数ScanArgs scanArgs = ScanArgs.Builder.limit(2);// TEMP游标ScanCursor cursor = ScanCursor.INITIAL;// 目标KEYString key = "BIG_HASH_KEY";prepareHashTestData(key);log.info("开始渐进式删除Hash的元素...");int counter = 0;do {MapScanCursor<String, String> result = COMMAND.hscan(key, cursor, scanArgs);// 重置TEMP游标cursor = ScanCursor.of(result.getCursor());cursor.setFinished(result.isFinished());Collection<String> fields = result.getMap().values();if (!fields.isEmpty()) {COMMAND.hdel(key, fields.toArray(new String[0]));}counter++;} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));log.info("渐进式删除Hash的元素完毕,迭代次数:{} ...", counter);
}private void prepareHashTestData(String key) throws Exception {COMMAND.hset(key, "1", "1");COMMAND.hset(key, "2", "2");COMMAND.hset(key, "3", "3");COMMAND.hset(key, "4", "4");COMMAND.hset(key, "5", "5");
}
渐进式删除集合中的元素:
@Test
public void testDelBigSetKey() throws Exception {String key = "BIG_SET_KEY";prepareSetTestData(key);// SCAN参数ScanArgs scanArgs = ScanArgs.Builder.limit(2);// TEMP游标ScanCursor cursor = ScanCursor.INITIAL;log.info("开始渐进式删除Set的元素...");int counter = 0;do {ValueScanCursor<String> result = COMMAND.sscan(key, cursor, scanArgs);// 重置TEMP游标cursor = ScanCursor.of(result.getCursor());cursor.setFinished(result.isFinished());List<String> values = result.getValues();if (!values.isEmpty()) {COMMAND.srem(key, values.toArray(new String[0]));}counter++;} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));log.info("渐进式删除Set的元素完毕,迭代次数:{} ...", counter);
}private void prepareSetTestData(String key) throws Exception {COMMAND.sadd(key, "1", "2", "3", "4", "5");
}
渐进式删除有序集合中的元素:
@Test
public void testDelBigZSetKey() throws Exception {// SCAN参数ScanArgs scanArgs = ScanArgs.Builder.limit(2);// TEMP游标ScanCursor cursor = ScanCursor.INITIAL;// 目标KEYString key = "BIG_ZSET_KEY";prepareZSetTestData(key);log.info("开始渐进式删除ZSet的元素...");int counter = 0;do {ScoredValueScanCursor<String> result = COMMAND.zscan(key, cursor, scanArgs);// 重置TEMP游标cursor = ScanCursor.of(result.getCursor());cursor.setFinished(result.isFinished());List<ScoredValue<String>> scoredValues = result.getValues();if (!scoredValues.isEmpty()) {COMMAND.zrem(key, scoredValues.stream().map(ScoredValue<String>::getValue).toArray(String[]::new));}counter++;} while (!(ScanCursor.FINISHED.getCursor().equals(cursor.getCursor()) && ScanCursor.FINISHED.isFinished() == cursor.isFinished()));log.info("渐进式删除ZSet的元素完毕,迭代次数:{} ...", counter);
}private void prepareZSetTestData(String key) throws Exception {COMMAND.zadd(key, 0, "1");COMMAND.zadd(key, 0, "2");COMMAND.zadd(key, 0, "3");COMMAND.zadd(key, 0, "4");COMMAND.zadd(key, 0, "5");
}
在SpringBoot中使用Lettuce
个人认为,spring-data-redis中的API封装并不是很优秀,用起来比较重,不够灵活,这里结合前面的例子和代码,在SpringBoot脚手架项目中配置和整合Lettuce。先引入依赖:
<dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>2.1.8.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.lettuce</groupId><artifactId>lettuce-core</artifactId><version>5.1.8.RELEASE</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.10</version><scope>provided</scope></dependency>
</dependencies>
一般情况下,每个应用应该使用单个Redis客户端实例和单个连接实例,这里设计一个脚手架,适配单机、普通主从、哨兵和集群四种使用场景。对于客户端资源,采用默认的实现即可。对于Redis的连接属性,比较主要的有Host、Port和Password,其他可以暂时忽略。基于约定大于配置的原则,先定制一系列属性配置类(其实有些配置是可以完全共用,但是考虑到要清晰描述类之间的关系,这里拆分多个配置属性类和多个配置方法):
@Data
@ConfigurationProperties(prefix = "lettuce")
public class LettuceProperties {private LettuceSingleProperties single;private LettuceReplicaProperties replica;private LettuceSentinelProperties sentinel;private LettuceClusterProperties cluster;}@Data
public class LettuceSingleProperties {private String host;private Integer port;private String password;
}@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceReplicaProperties extends LettuceSingleProperties {}@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceSentinelProperties extends LettuceSingleProperties {private String masterId;
}@EqualsAndHashCode(callSuper = true)
@Data
public class LettuceClusterProperties extends LettuceSingleProperties {}
配置类如下,主要使用@ConditionalOnProperty做隔离,一般情况下,很少有人会在一个应用使用一种以上的Redis连接场景:
@RequiredArgsConstructor
@Configuration
@ConditionalOnClass(name = "io.lettuce.core.RedisURI")
@EnableConfigurationProperties(value = LettuceProperties.class)
public class LettuceAutoConfiguration {private final LettuceProperties lettuceProperties;@Bean(destroyMethod = "shutdown")public ClientResources clientResources() {return DefaultClientResources.create();}@Bean@ConditionalOnProperty(name = "lettuce.single.host")public RedisURI singleRedisUri() {LettuceSingleProperties singleProperties = lettuceProperties.getSingle();return RedisURI.builder().withHost(singleProperties.getHost()).withPort(singleProperties.getPort()).withPassword(singleProperties.getPassword()).build();}@Bean(destroyMethod = "shutdown")@ConditionalOnProperty(name = "lettuce.single.host")public RedisClient singleRedisClient(ClientResources clientResources, @Qualifier("singleRedisUri") RedisURI redisUri) {return RedisClient.create(clientResources, redisUri);}@Bean(destroyMethod = "close")@ConditionalOnProperty(name = "lettuce.single.host")public StatefulRedisConnection<String, String> singleRedisConnection(@Qualifier("singleRedisClient") RedisClient singleRedisClient) {return singleRedisClient.connect();}@Bean@ConditionalOnProperty(name = "lettuce.replica.host")public RedisURI replicaRedisUri() {LettuceReplicaProperties replicaProperties = lettuceProperties.getReplica();return RedisURI.builder().withHost(replicaProperties.getHost()).withPort(replicaProperties.getPort()).withPassword(replicaProperties.getPassword()).build();}@Bean(destroyMethod = "shutdown")@ConditionalOnProperty(name = "lettuce.replica.host")public RedisClient replicaRedisClient(ClientResources clientResources, @Qualifier("replicaRedisUri") RedisURI redisUri) {return RedisClient.create(clientResources, redisUri);}@Bean(destroyMethod = "close")@ConditionalOnProperty(name = "lettuce.replica.host")public StatefulRedisMasterSlaveConnection<String, String> replicaRedisConnection(@Qualifier("replicaRedisClient") RedisClient replicaRedisClient,@Qualifier("replicaRedisUri") RedisURI redisUri) {return MasterSlave.connect(replicaRedisClient, new Utf8StringCodec(), redisUri);}@Bean@ConditionalOnProperty(name = "lettuce.sentinel.host")public RedisURI sentinelRedisUri() {LettuceSentinelProperties sentinelProperties = lettuceProperties.getSentinel();return RedisURI.builder().withPassword(sentinelProperties.getPassword()).withSentinel(sentinelProperties.getHost(), sentinelProperties.getPort()).withSentinelMasterId(sentinelProperties.getMasterId()).build();}@Bean(destroyMethod = "shutdown")@ConditionalOnProperty(name = "lettuce.sentinel.host")public RedisClient sentinelRedisClient(ClientResources clientResources, @Qualifier("sentinelRedisUri") RedisURI redisUri) {return RedisClient.create(clientResources, redisUri);}@Bean(destroyMethod = "close")@ConditionalOnProperty(name = "lettuce.sentinel.host")public StatefulRedisMasterSlaveConnection<String, String> sentinelRedisConnection(@Qualifier("sentinelRedisClient") RedisClient sentinelRedisClient,@Qualifier("sentinelRedisUri") RedisURI redisUri) {return MasterSlave.connect(sentinelRedisClient, new Utf8StringCodec(), redisUri);}@Bean@ConditionalOnProperty(name = "lettuce.cluster.host")public RedisURI clusterRedisUri() {LettuceClusterProperties clusterProperties = lettuceProperties.getCluster();return RedisURI.builder().withHost(clusterProperties.getHost()).withPort(clusterProperties.getPort()).withPassword(clusterProperties.getPassword()).build();}@Bean(destroyMethod = "shutdown")@ConditionalOnProperty(name = "lettuce.cluster.host")public RedisClusterClient redisClusterClient(ClientResources clientResources, @Qualifier("clusterRedisUri") RedisURI redisUri) {return RedisClusterClient.create(clientResources, redisUri);}@Bean(destroyMethod = "close")@ConditionalOnProperty(name = "lettuce.cluster")public StatefulRedisClusterConnection<String, String> clusterConnection(RedisClusterClient clusterClient) {return clusterClient.connect();}
}
最后为了让IDE识别我们的配置,可以添加IDE亲缘性,/META-INF文件夹下新增一个文件spring-configuration-metadata.json,内容如下:
{"properties": [{"name": "lettuce.single","type": "club.throwable.spring.lettuce.LettuceSingleProperties","description": "单机配置","sourceType": "club.throwable.spring.lettuce.LettuceProperties"},{"name": "lettuce.replica","type": "club.throwable.spring.lettuce.LettuceReplicaProperties","description": "主从配置","sourceType": "club.throwable.spring.lettuce.LettuceProperties"},{"name": "lettuce.sentinel","type": "club.throwable.spring.lettuce.LettuceSentinelProperties","description": "哨兵配置","sourceType": "club.throwable.spring.lettuce.LettuceProperties"},{"name": "lettuce.single","type": "club.throwable.spring.lettuce.LettuceClusterProperties","description": "集群配置","sourceType": "club.throwable.spring.lettuce.LettuceProperties"}]
}
如果想IDE亲缘性做得更好,可以添加/META-INF/additional-spring-configuration-metadata.json进行更多细节定义。简单使用如下:
@Slf4j
@Component
public class RedisCommandLineRunner implements CommandLineRunner {@Autowired@Qualifier("singleRedisConnection")private StatefulRedisConnection<String, String> connection;@Overridepublic void run(String... args) throws Exception {RedisCommands<String, String> redisCommands = connection.sync();redisCommands.setex("name", 5, "throwable");log.info("Get value:{}", redisCommands.get("name"));}
}
// Get value:throwable
小结
本文算是基于Lettuce的官方文档,对它的使用进行全方位的分析,包括主要功能、配置都做了一些示例,限于篇幅部分特性和配置细节没有分析。Lettuce已经被spring-data-redis接纳作为官方的Redis客户端驱动,所以值得信赖,它的一些API设计确实比较合理,扩展性高的同时灵活性也高。个人建议,基于Lettuce包自行添加配置到SpringBoot应用用起来会得心应手,毕竟RedisTemplate实在太笨重,而且还屏蔽了Lettuce一些高级特性和灵活的API。
参考资料:
Lettuce Reference Guide
链接
Github Page:http://www.throwable.club/2019/09/28/redis-client-driver-lettuce-usage
Coding Page:http://throwable.coding.me/2019/09/28/redis-client-driver-lettuce-usage
相关文章:
【Redis】Redis高级客户端Lettuce详解
文章目录前提Lettuce简介连接Redis定制的连接URI语法基本使用API同步API异步API反应式API发布和订阅事务和批量命令执行Lua脚本执行高可用和分片普通主从模式哨兵模式集群模式动态命令和自定义命令高阶特性配置客户端资源使用连接池几个常见的渐进式删除例子在SpringBoot中使用…...
Qt——自定义界面之QStyle
1. Qt控件结构简介 首先我们要来讲讲GUI控件结构,这里以QComboBox为例: 一个完整的控件由一种或多种GUI元素构成: Complex Control Element。Primitive Element。Control Element。 1.1 Complex Control Element Complex control elements …...
指针和数组面试题(逐题分析,完善你可能遗漏的知识)
人生不是一种享乐,而是一桩十分沉重的工作。 —— 列夫托尔斯泰 前言:之前我们就学习了数组和指针的知识。 数组:数组就是能够存放一组相同类型的元素,数组的大小取决于数组的元素个数和元素类型。 指针:…...
centos7搭建nfs挂载日志目录完整步骤
NFS服务器配置 1.安装NFS服务 首先使用yum安装nfs服务: yum -y install rpcbind nfs-utils 2.创建共享目录 在服务器上创建共享目录,并设置权限。 mkdir /data/share/ chmod 755 -R /data/share/ 3.配置NFS nfs的配置文件是 /etc/exports &…...
三、JavaScript
目录 一、JavaScript和html代码的结合方式 二、javascript和java的区别 1、变量 2、运算 3、数组(重点) 4、函数 5、重载 6、隐形参数arguments 7、js中的自定义对象 三、js中的事件 四、DOM模型 五、正则表达式 一、JavaScript和html代码的结合方…...
深圳大学计软《面向对象的程序设计》实验11 多继承
A. 在职研究生(多重继承) 题目描述 1、建立如下的类继承结构: 1)定义一个人员类CPeople,其属性(保护类型)有:姓名、性别、年龄; 2)从CPeople类派生出学生类CStudent,…...
并发变成实战-原子变量与非阻塞同步机制
文章目录1.锁的劣势2.硬件对并发的支持2.1 比较并交换2.2 非阻塞的计数器3.原子变量类3.1 原子变量是一种“更好的volatile”3.2 性能比较:锁与原子变量4.非阻塞算法4.1 非阻塞的栈4.2 非阻塞的链表4.3 ABA问题非阻塞算法设计和实现上要复杂的多,但在可伸…...
sql数据库常用操作指令
一、操作库-- 创建库create database db1;-- 创建库是否存在,不存在则创建create database if not exists db1;-- 查看所有数据库show databases;-- 查看某个数据库的定义信息 show create database db1; -- 修改数据库字符信息alter database db1 character set ut…...
4-1 定时任务的示例10个
文章目录前言基本命令与格式示例前言 Linux crontab 是用来定期执行程序的命令。当安装完成操作系统之后,默认都已经安装,并启动此任务调度命令。 crond 命令每分钟会定期检查是否有要执行的工作,如果有要执行的工作便会自动执行该工作。 基…...
外贸建站多少钱才能达到预期效果?
外贸建站多少钱才能达到预期效果?这是每个外贸企业都会问的问题。作为一个做外贸建站多年的人,我有一些个人的操盘感想。 首先,我认为外贸建站的投资是非常必要的。 因为在现代社会,网站已经成为外贸企业开展业务的必要工具之一…...
【Java学习笔记】5.Java 基本数据类型
Java 基本数据类型 变量就是申请内存来存储值。也就是说,当创建变量的时候,需要在内存中申请空间。 内存管理系统根据变量的类型为变量分配存储空间,分配的空间只能用来储存该类型数据。 因此,通过定义不同类型的变量…...
InnoDB 死锁和问题排查
文章目录死锁(dead lock)示例 1问题排查查看连接的线程查看相关的表查看最近一次的死锁信息查看服务器的锁信息查看正在使用的表如何尽可能地避免死锁死锁(dead lock) 两个及以上的事务各自持有对方需要的锁,导致双方…...
tensorflow07——使用tf.keras搭建神经网络(Sequential顺序神经网络)——六步法——鸢尾花数据集分类
使用tf.keras搭建顺序神经网络 六步法——鸢尾花数据集分类 01 导入相关包 02 导入数据集,打乱顺序 03 建立Sequential模型 04 编译——确定优化器,损失函数,评测指标(用哪一种准确率) 05 训练模型——把各项参入填入…...
关于Java连接Hive,Spark等服务的Kerberos工具类封装
关于Java连接Hive,Spark等服务的Kerberos工具类封装 idea连接服务器的hive等相关服务的kerberos认证注意事项 idea 本地配置,连接服务器;进行kerberos认证,连接hive、HDFS、Spark等服务注意事项: 本地idea连接Hadoo…...
大数据框架之Hadoop:MapReduce(五)Yarn资源调度器
Apache YARN (Yet Another Resource Negotiator) 是 hadoop 2.0 引入的集群资源管理系统。用户可以将各种服务框架部署在 YARN 上,由 YARN 进行统一地管理和资源分配。 简言之,Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源&…...
uniapp实现地图点聚合功能
前言 在工作中接到的一个任务,在app端实现如下功能: 地图点聚合地图页面支持tab切换(设备、劳务、人员)支持人员搜索显示分布 但是uniapp原有的map标签不支持点聚合功能(最新的版本支持了点聚合功能)&am…...
经典分类模型回顾2—GoogleNet实现图像分类(matlab版)
GoogleNet是深度学习领域的一种经典的卷积神经网络,其在ImageNet图像分类任务上的表现十分优秀。下面是使用Matlab实现GoogleNet的图像分类示例。 1. 数据准备 在开始之前,需要准备一些图像数据用来训练和测试模型,可以从ImageNet等数据集中…...
Java经典面试题——谈谈 final、finally、finalize 有什么不同?
典型回答 final 可以用来修饰类、方法、变量,分别有不同的意义,final 修饰的 class 代表不可以继承扩展, final 的变量是不可以修改的,而 final 的方法也是不可以重写的(override)。 finally 则是 Java 保…...
C#的Version类型值与SQL Server中二进制binary类型转换
使用C#语言编写的应用程序可以通过.NET Framework框架提供的Version类来控制每次发布的版本号,以便更好控制每次版本更新迭代。 版本号由两到四个组件组成:主要、次要、内部版本和修订。 版本号的格式如下所示, 可选组件显示在方括号 ([ 和…...
软测入门(五)接口测试Postman
Postman 一款Http接口收工测试工具。如果做自动化测试会使用jemter做。 安装 去官网下载即可。 https://www.postman.com/downloads/?utm_sourcepostman-home 功能介绍 页面上的单词基本上都能了解,不多介绍。 转代码&注释 可将接口的访问转为其他语言的…...
UWB通道选择、信号阻挡和反射对UWB定位范围和定位精度的影响
(一)介绍检查NLOS操作时需要考虑三个方面:(1)由于整体信号衰减,通信范围减小。(2)由于直接路径信号的衰减,导致直接路径检测范围的减小。(3)由于阻…...
linux基本功之列之wget命令实战
文章目录前言一. wget命令介绍二. 语法格式及常用选项三. 参考案例3.1 下载单个文件3.2 使用wget -o 下载文件并改名3.3 -c 参数,下载断开链接时,可以恢复下载3.4 wget后台下载3.5 使用wget下载整个网站四. 补充与汇总常见用法总结前言 大家好ÿ…...
学习ROS时针对gazebo相关的问题(重装与卸载是永远的神)
ResourceNotFound:gazebo_ros 错误解决 参考:https://blog.csdn.net/weixin_42591529/article/details/123869969 当将机器人加载到gazebo时,运行launch文件出现如下错误 这是由于缺少gazebo包所导致的。 解决办法:...
几个C语言容易忽略的问题
1 取模符号自增问题 我们不妨尝试写这样的程序 #include<stdio.h> int main(){int n,t5;printf("%d\n",7%(-3));//1printf("%d\n",(-7)%3);//-1while(--t)printf("%d\n",t);t5;while(t--)printf("%d\n",t);return 0; } 运行…...
CentOS 7.9安装Zabbix 4.4《保姆级教程》
CentOS 7.9安装Zabbix 4.4一、配置一览二、环境准备设置Selinux和firewalld设置软件源1.配置ustc CentOS-Base源2.安装zabbix 4.4官方源3.安装并更换epel源4.清除并生成缓存三、安装并配置Zabbix Server安装zabbix组件安装php安装mariadb并创建数据库修改zabbix_server.conf设置…...
路由器与交换机的区别(基础知识)
文章目录交换机路由器路由器和交换机的区别(1)工作层次不同(2)数据转发所依据的对象不同(3)传统的交换机只能分割冲突域,不能分割广播域;而路由器可以分割广播域(4&#…...
Python基础学习9——函数
基本概念 函数是一种能够完成某项任务的封装工具。在数学中,函数是自变量到因变量的一种映射,通过某种方式能够使自变量的值变成因变量的值。其实本质上也是实现了某种值的转换的任务。 函数的定义 在python中,函数是利用def来进行定义&am…...
项目中的MD5、盐值加密
首先介绍一下MD5,而项目中用的是MD5和盐值来确保密码的安全性; 1. md5简介 md5的全称是md5信息摘要算法(英文:MD5 Message-Digest Algorithm ),一种被广泛使用的密码散列函数,可以产生一个128位…...
电商项目后端框架SpringBoot、MybatisPlus
后端框架基础 1.代码自动生成工具 mybatis-plus (1)首先需要添加依赖文件 <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.2.2</version></dependency><de…...
2023年03月IDE流行度最新排名
点击查看最新IDE流行度最新排名(每月更新) 2023年03月IDE流行度最新排名 顶级IDE排名是通过分析在谷歌上搜索IDE下载页面的频率而创建的 一个IDE被搜索的次数越多,这个IDE就被认为越受欢迎。原始数据来自谷歌Trends 如果您相信集体智慧&am…...
银川网站推广/seo狂人
我们经常需要写代码,查看代码,查看代码时一般会用记事本,但是记事本的功能太弱,而写代码时有的编译器大的编辑功能太弱,像IAR keil之类的,因此需要一个小巧好用的替代记事本的编辑软件。Notepad我用了很久了…...
东莞网站建设服务/营销新闻
在函数内部,可以调用其他函数。如果一个函数在内部调用自身本身,这个函数就是递归函数。举个例子,我们来计算阶乘n! 1 * 2 * 3 * ... * n,用函数fact(n)表示,可以看出:fact(n) n! 1 x 2 x 3 x ... x (n-…...
网站开发的最后五个阶段/网站建设方案及报价
有朋友会问“为什么我的在每天晚上10点钟运行的非常缓慢?”。有经验的朋友会想到为CBO提供了自动收集数据库对象统计信息的功能,称之为“”。与之对应的Job正是在周一至周五的晚上10:00到第二天早上的6:00以及周六周日全天这个中来完成的。我们探索一下A…...
wordpress无法评论/seo网站优化方法
项目背景和意义 目的:本课题主要目标是设计并能够实现一个基于微信小程序运动场地预约系统,前台用户使用小程序,后台管理使用基PHPMySql的B/S架构;通过后台添加开放的场地类型(比如羽毛球、篮球、网球等)、…...
贵阳做网站开发科技有限公司/有没有免费的广告平台
1. 检测对象是不是数组 instanceof操作符 Array.isArray()方法 var color new Array("red", "green");console.log(Array.isArray(color)); //true 2. 转换方法 toString() 该方法会输出每一项,并以,连接,实际上该方法会调用数组…...
邢台网站建设03191688/哪里有网页设计公司
你的linux减肥了吗我们知道linux系统稳定,而忽视系统管理,日积月累,系统不堪任负,系统就会出现莫明其妙的问题,其中我们维护之一,就需要对系统减肥,我们使用到的命令有:find1. 我们删除账户之后,就会存在一些无用垃圾文件及目录,我们要找出属于这个用户的垃圾东西find / -user …...