Redisson—分布式集合
7.1. 映射(Map)
基于Redis的Redisson的分布式映射结构的RMap Java对象实现了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。与HashMap不同的是,RMap保持了元素的插入顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
除了同步接口外,还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。如果你想用Redis Map来保存你的POJO的话,可以考虑使用分布式实时对象(Live Object)服务。
在特定的场景下,映射缓存(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,可以使用Redisson提供的带有本地缓存功能的映射。
RMap<String, SomeObject> map = redisson.getMap("anyMap");
SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");map.fastPut("321", new SomeObject());
map.fastRemove("321");RFuture<SomeObject> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");
映射的字段锁的用法:
RMap<MyKey, MyValue> map = redisson.getMap("anyMap");
MyKey k = new MyKey();
RLock keyLock = map.getLock(k);
keyLock.lock();
try {MyValue v = map.get(k);// 其他业务逻辑
} finally {keyLock.unlock();
}RReadWriteLock rwLock = map.getReadWriteLock(k);
rwLock.readLock().lock();
try {MyValue v = map.get(k);// 其他业务逻辑
} finally {keyLock.readLock().unlock();
}
7.1.1. 映射(Map)的元素淘汰(Eviction),本地缓存(LocalCache)和数据分片(Sharding)
Redisson提供了一系列的映射类型的数据结构,这些结构按特性主要分为三大类:
- 元素淘汰(Eviction) 类 -- 带有元素淘汰(Eviction)机制的映射类允许针对一个映射中每个元素单独设定 有效时间 和 最长闲置时间 。
- 本地缓存(LocalCache) 类 -- 本地缓存(Local Cache)也叫就近缓存(Near Cache)。这类映射的使用主要用于在特定的场景下,映射缓存(MapCache)上的高度频繁的读取操作,使网络通信都被视为瓶颈的情况。Redisson与Redis通信的同时,还将部分数据保存在本地内存里。这样的设计的好处是它能将读取速度提高最多 45倍 。 所有同名的本地缓存共用一个订阅发布话题,所有更新和过期消息都将通过该话题共享。
- 数据分片(Sharding) 类 -- 数据分片(Sharding)类仅适用于Redis集群环境下,因此带有数据分片(Sharding)功能的映射也叫集群分布式映射。它利用分库的原理,将单一一个映射结构切分为若干个小的映射,并均匀的分布在集群中的各个槽里。这样的设计能使一个单一映射结构突破Redis自身的容量限制,让其容量随集群的扩大而增长。在扩容的同时,还能够使读写性能和元素淘汰处理能力随之成线性增长。
以下列表是Redisson提供的所有映射的名称及其特性:
接口名称 | RedissonClient | 本地缓存功能 | 数据分片功能 | 元素淘汰功能 |
RMap | getMap() | No | No | No |
RMapCache | getMapCache() | No | No | Yes |
RLocalCachedMap | getLocalCachedMap() | Yes | No | No |
RLocalCachedMap | getLocalCachedMapCache() | Yes | No | Yes |
RClusteredMap | getClusteredMap() | No | Yes | No |
RClusteredMapCache | getClusteredMapCache() | No | Yes | Yes |
RClusteredLocal | getClusteredLocal | Yes | Yes | No |
RClusteredLocal | getClusteredLocal | Yes | Yes | Yes |
除此以外,Redisson还提供了Spring Cache和JCache的实现。
元素淘汰功能(Eviction)
Redisson的分布式的RMapCache Java对象在基于RMap的前提下实现了针对单个元素的淘汰机制。同时仍然保留了元素的插入顺序。由于RMapCache是基于RMap实现的,使它同时继承了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。Redisson提供的Spring Cache整合以及JCache正是基于这样的功能来实现的。
目前的Redis自身并不支持散列(Hash)当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理300个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到1小时之间。比如该次清理时删除了300条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
RMapCache<String, SomeObject> map = redisson.getMapCache("anyMap");
// 有效时间 ttl = 10分钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES);
// 有效时间 ttl = 10分钟, 最长闲置时间 maxIdleTime = 10秒钟
map.put("key1", new SomeObject(), 10, TimeUnit.MINUTES, 10, TimeUnit.SECONDS);// 有效时间 = 3 秒钟
map.putIfAbsent("key2", new SomeObject(), 3, TimeUnit.SECONDS);
// 有效时间 ttl = 40秒钟, 最长闲置时间 maxIdleTime = 10秒钟
map.putIfAbsent("key2", new SomeObject(), 40, TimeUnit.SECONDS, 10, TimeUnit.SECONDS);
本地缓存功能(Local Cache)
在特定的场景下,映射(Map)上的高度频繁的读取操作,使网络通信都被视为瓶颈时,使用Redisson提供的带有本地缓存功能的分布式本地缓存映射RLocalCachedMapJava对象会是一个很好的选择。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。本地缓存功能充分的利用了JVM的自身内存空间,对部分常用的元素实行就地缓存,这样的设计让读取操作的性能较分布式映射相比提高最多 45倍 。以下配置参数可以用来创建这个实例:
LocalCachedMapOptions options = LocalCachedMapOptions.defaults()// 用于淘汰清除本地缓存内的元素// 共有以下几种选择:// LFU - 统计元素的使用频率,淘汰用得最少(最不常用)的。// LRU - 按元素使用时间排序比较,淘汰最早(最久远)的。// SOFT - 元素用Java的WeakReference来保存,缓存元素通过GC过程清除。// WEAK - 元素用Java的SoftReference来保存, 缓存元素通过GC过程清除。// NONE - 永不淘汰清除缓存元素。.evictionPolicy(EvictionPolicy.NONE)// 如果缓存容量值为0表示不限制本地缓存容量大小.cacheSize(1000)// 以下选项适用于断线原因造成了未收到本地缓存更新消息的情况。// 断线重连的策略有以下几种:// CLEAR - 如果断线一段时间以后则在重新建立连接以后清空本地缓存// LOAD - 在服务端保存一份10分钟的作废日志// 如果10分钟内重新建立连接,则按照作废日志内的记录清空本地缓存的元素// 如果断线时间超过了这个时间,则将清空本地缓存中所有的内容// NONE - 默认值。断线重连时不做处理。.reconnectionStrategy(ReconnectionStrategy.NONE)// 以下选项适用于不同本地缓存之间相互保持同步的情况// 缓存同步策略有以下几种:// INVALIDATE - 默认值。当本地缓存映射的某条元素发生变动时,同时驱逐所有相同本地缓存映射内的该元素// UPDATE - 当本地缓存映射的某条元素发生变动时,同时更新所有相同本地缓存映射内的该元素// NONE - 不做任何同步处理.syncStrategy(SyncStrategy.INVALIDATE)// 每个Map本地缓存里元素的有效时间,默认毫秒为单位.timeToLive(10000)// 或者.timeToLive(10, TimeUnit.SECONDS)// 每个Map本地缓存里元素的最长闲置时间,默认毫秒为单位.maxIdle(10000)// 或者.maxIdle(10, TimeUnit.SECONDS);
RLocalCachedMap<String, Integer> map = redisson.getLocalCachedMap("test", options);String prevObject = map.put("123", 1);
String currentObject = map.putIfAbsent("323", 2);
String obj = map.remove("123");// 在不需要旧值的情况下可以使用fast为前缀的类似方法
map.fastPut("a", 1);
map.fastPutIfAbsent("d", 32);
map.fastRemove("b");RFuture<String> putAsyncFuture = map.putAsync("321");
RFuture<Void> fastPutAsyncFuture = map.fastPutAsync("321");map.fastPutAsync("321", new SomeObject());
map.fastRemoveAsync("321");
当不再使用Map本地缓存对象的时候应该手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
RLocalCachedMap<String, Integer> map = ...
map.destroy();
如何通过加载数据的方式来降低过期淘汰事件发布信息对网络的影响
代码范例:
public void loadData(String cacheName, Map<String, String> data) {RLocalCachedMap<String, String> clearMap = redisson.getLocalCachedMap(cacheName, LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.INVALIDATE));RLocalCachedMap<String, String> loadMap = redisson.getLocalCachedMap(cacheName, LocalCachedMapOptions.defaults().cacheSize(1).syncStrategy(SyncStrategy.NONE));loadMap.putAll(data);clearMap.clearLocalCache();
}
数据分片功能(Sharding)
Map数据分片是Redis集群模式下的一个功能。Redisson提供的分布式集群映射RClusteredMap Java对象也是基于RMap实现的。它同时实现了java.util.concurrent.ConcurrentMap和java.util.Map两个接口。在这里可以获取更多的内部信息。
RClusteredMap<String, SomeObject> map = redisson.getClusteredMap("anyMap");SomeObject prevObject = map.put("123", new SomeObject());
SomeObject currentObject = map.putIfAbsent("323", new SomeObject());
SomeObject obj = map.remove("123");map.fastPut("321", new SomeObject());
map.fastRemove("321");
7.1.2. 映射持久化方式(缓存策略)
Redisson供了将映射中的数据持久化到外部储存服务的功能。主要场景有一下几种:
- 将Redisson的分布式映射类型作为业务和外部储存媒介之间的缓存。
- 或是用来增加Redisson映射类型中数据的持久性,或是用来增加已被驱逐的数据的寿命。
- 或是用来缓存数据库,Web服务或其他数据源的数据。
Read-through策略
通俗的讲,如果一个被请求的数据不存在于Redisson的映射中的时候,Redisson将通过预先配置好的MapLoader对象加载数据。
Write-through(数据同步写入)策略
在遇到映射中某条数据被更改时,Redisson会首先通过预先配置好的MapWriter对象写入到外部储存系统,然后再更新Redis内的数据。
Write-behind(数据异步写入)策略
对映射的数据的更改会首先写入到Redis,然后再使用异步的方式,通过MapWriter对象写入到外部储存系统。在并发环境下可以通过writeBehindThreads参数来控制写入线程的数量,已达到对外部储存系统写入并发量的控制。
以上策略适用于所有实现了RMap、RMapCache、RLocalCachedMap和RLocalCachedMapCache接口的对象。
配置范例:
MapOptions<K, V> options = MapOptions.<K, V>defaults().writer(myWriter).loader(myLoader);RMap<K, V> map = redisson.getMap("test", options);
// 或
RMapCache<K, V> map = redisson.getMapCache("test", options);
// 或
RLocalCachedMap<K, V> map = redisson.getLocalCachedMap("test", options);
// 或
RLocalCachedMapCache<K, V> map = redisson.getLocalCachedMapCache("test", options);
7.1.3. 映射监听器(Map Listener)
Redisson为所有实现了RMapCache或RLocalCachedMapCache接口的对象提供了监听以下事件的监听器:
事件 | 监听器 元素 添加 事件 | org.redisson.api.map.event.EntryCreatedListener
元素 过期 事件 | org.redisson.api.map.event.EntryExpiredListener
元素 删除 事件 | org.redisson.api.map.event.EntryRemovedListener
元素 更新 事件 | org.redisson.api.map.event.EntryUpdatedListener
使用范例:
RMapCache<String, Integer> map = redisson.getMapCache("myMap");
// 或
RLocalCachedMapCache<String, Integer> map = redisson.getLocalCachedMapCache("myMap", options);int updateListener = map.addListener(new EntryUpdatedListener<Integer, Integer>() {@Overridepublic void onUpdated(EntryEvent<Integer, Integer> event) {event.getKey(); // 字段名event.getValue() // 新值event.getOldValue() // 旧值// ...}
});int createListener = map.addListener(new EntryCreatedListener<Integer, Integer>() {@Overridepublic void onCreated(EntryEvent<Integer, Integer> event) {event.getKey(); // 字段名event.getValue() // 值// ...}
});int expireListener = map.addListener(new EntryExpiredListener<Integer, Integer>() {@Overridepublic void onExpired(EntryEvent<Integer, Integer> event) {event.getKey(); // 字段名event.getValue() // 值// ...}
});int removeListener = map.addListener(new EntryRemovedListener<Integer, Integer>() {@Overridepublic void onRemoved(EntryEvent<Integer, Integer> event) {event.getKey(); // 字段名event.getValue() // 值// ...}
});map.removeListener(updateListener);
map.removeListener(createListener);
map.removeListener(expireListener);
map.removeListener(removeListener);
7.1.4. LRU有界映射
Redisson提供了基于Redis的以LRU为驱逐策略的分布式LRU有界映射对象。顾名思义,分布式LRU有界映射允许通过对其中元素按使用时间排序处理的方式,主动移除超过规定容量限制的元素。
RMapCache<String, String> map = redisson.getMapCache("map");
// 尝试将该映射的最大容量限制设定为10
map.trySetMaxSize(10);// 将该映射的最大容量限制设定或更改为10
map.setMaxSize(10);map.put("1", "2");
map.put("3", "3", 1, TimeUnit.SECONDS);
7.2. 多值映射(Multimap)
基于Redis的Redisson的分布式RMultimap Java对象允许Map中的一个字段值包含多个元素。 字段总数受Redis限制,每个Multimap最多允许有4 294 967 295个不同字段。
7.2.1. 基于集(Set)的多值映射(Multimap)
基于Set的Multimap不允许一个字段值包含有重复的元素。
RSetMultimap<SimpleKey, SimpleValue> map = redisson.getSetMultimap("myMultimap");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("3"), new SimpleValue("4"));Set<SimpleValue> allValues = map.get(new SimpleKey("0"));List<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
Set<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);Set<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
7.2.2. 基于列表(List)的多值映射(Multimap)
基于List的Multimap在保持插入顺序的同时允许一个字段下包含重复的元素。
RListMultimap<SimpleKey, SimpleValue> map = redisson.getListMultimap("test1");
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("0"), new SimpleValue("2"));
map.put(new SimpleKey("0"), new SimpleValue("1"));
map.put(new SimpleKey("3"), new SimpleValue("4"));List<SimpleValue> allValues = map.get(new SimpleKey("0"));Collection<SimpleValue> newValues = Arrays.asList(new SimpleValue("7"), new SimpleValue("6"), new SimpleValue("5"));
List<SimpleValue> oldValues = map.replaceValues(new SimpleKey("0"), newValues);List<SimpleValue> removedValues = map.removeAll(new SimpleKey("0"));
7.2.3. 多值映射(Multimap)淘汰机制(Eviction)
Multimap对象的淘汰机制是通过不同的接口来实现的。它们是RSetMultimapCache接口和RListMultimapCache接口,分别对应的是Set和List的Multimaps。
所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
RSetMultimapCache的使用范例:
RSetMultimapCache<String, String> multimap = redisson.getSetMultimapCache("myMultimap");
multimap.put("1", "a");
multimap.put("1", "b");
multimap.put("1", "c");multimap.put("2", "e");
multimap.put("2", "f");multimap.expireKey("2", 10, TimeUnit.MINUTES);
7.3. 集(Set)
基于Redis的Redisson的分布式Set结构的RSet Java对象实现了java.util.Set接口。通过元素的相互状态比较保证了每个元素的唯一性。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
RSet<SomeObject> set = redisson.getSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());
Redisson PRO版本中的Set对象还可以在Redis集群环境下支持单集合数据分片。
7.3.1. 集(Set)淘汰机制(Eviction)
基于Redis的Redisson的分布式RSetCache Java对象在基于RSet的前提下实现了针对单个元素的淘汰机制。由于RSetCache是基于RSet实现的,使它还集成了java.util.Set接口。
目前的Redis自身并不支持Set当中的元素淘汰,因此所有过期元素都是通过org.redisson.EvictionScheduler实例来实现定期清理的。为了保证资源的有效利用,每次运行最多清理100个过期元素。任务的启动时间将根据上次实际清理数量自动调整,间隔时间趋于1秒到2小时之间。比如该次清理时删除了100条元素,那么下次执行清理的时间将在1秒以后(最小间隔时间)。一旦该次清理数量少于上次清理数量,时间间隔将增加1.5倍。
RSetCache<SomeObject> set = redisson.getSetCache("anySet");
// ttl = 10 seconds
set.add(new SomeObject(), 10, TimeUnit.SECONDS);
7.3.2. 集(Set)数据分片(Sharding)
Set数据分片是Redis集群模式下的一个功能。Redisson提供的分布式RClusteredSet Java对象也是基于RSet实现的。在这里可以获取更多的信息。
RClusteredSet<SomeObject> set = redisson.getClusteredSet("anySet");
set.add(new SomeObject());
set.remove(new SomeObject());
除了RClusteredSet以外,Redisson还提供了另一种集群模式下的分布式集(Set),它不仅提供了透明的数据分片功能,还为每个元素提供了淘汰机制。RClusteredSetCache 类分别同时提供了RClusteredSet 和RSetCache 这两个接口的实现。当然这些都是基于java.util.Set的接口实现上的。
该功能仅限于Redisson PRO版本。
7.4. 有序集(SortedSet)
基于Redis的Redisson的分布式RSortedSet Java对象实现了java.util.SortedSet接口。在保证元素唯一性的前提下,通过比较器(Comparator)接口实现了对元素的排序。
RSortedSet<Integer> set = redisson.getSortedSet("anySet");
set.trySetComparator(new MyComparator()); // 配置元素比较器
set.add(3);
set.add(1);
set.add(2);set.removeAsync(0);
set.addAsync(5);
7.5. 计分排序集(ScoredSortedSet)
基于Redis的Redisson的分布式RScoredSortedSet Java对象是一个可以按插入时指定的元素评分排序的集合。它同时还保证了元素的唯一性。
RScoredSortedSet<SomeObject> set = redisson.getScoredSortedSet("simple");set.add(0.13, new SomeObject(a, b));
set.addAsync(0.251, new SomeObject(c, d));
set.add(0.302, new SomeObject(g, d));set.pollFirst();
set.pollLast();int index = set.rank(new SomeObject(g, d)); // 获取元素在集合中的位置
Double score = set.getScore(new SomeObject(g, d)); // 获取元素的评分
7.6. 字典排序集(LexSortedSet)
基于Redis的Redisson的分布式RLexSortedSet Java对象在实现了java.util.Set<String>接口的同时,将其中的所有字符串元素按照字典顺序排列。它公式还保证了字符串元素的唯一性。
RLexSortedSet set = redisson.getLexSortedSet("simple");
set.add("d");
set.addAsync("e");
set.add("f");set.lexRangeTail("d", false);
set.lexCountHead("e");
set.lexRange("d", true, "z", false);
7.7. 列表(List)
基于Redis的Redisson分布式列表(List)结构的RList Java对象在实现了java.util.List接口的同时,确保了元素插入时的顺序。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
RList<SomeObject> list = redisson.getList("anyList");
list.add(new SomeObject());
list.get(0);
list.remove(new SomeObject());
7.8. 队列(Queue)
基于Redis的Redisson分布式无界队列(Queue)结构的RQueue Java对象实现了java.util.Queue接口。尽管RQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
RQueue<SomeObject> queue = redisson.getQueue("anyQueue");
queue.add(new SomeObject());
SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
7.9. 双端队列(Deque)
基于Redis的Redisson分布式无界双端队列(Deque)结构的RDeque Java对象实现了java.util.Deque接口。尽管RDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
RDeque<SomeObject> queue = redisson.getDeque("anyDeque");
queue.addFirst(new SomeObject());
queue.addLast(new SomeObject());
SomeObject obj = queue.removeFirst();
SomeObject someObj = queue.removeLast();
7.10. 阻塞队列(Blocking Queue)
基于Redis的Redisson分布式无界阻塞队列(Blocking Queue)结构的RBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。尽管RBlockingQueue对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
RBlockingQueue<SomeObject> queue = redisson.getBlockingQueue("anyQueue");
queue.offer(new SomeObject());SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。
7.11. 有界阻塞队列(Bounded Blocking Queue)
基于Redis的Redisson分布式有界阻塞队列(Bounded Blocking Queue)结构的RBoundedBlockingQueue Java对象实现了java.util.concurrent.BlockingQueue接口。该对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。队列的初始容量(边界)必须在使用前设定好。
RBoundedBlockingQueue<SomeObject> queue = redisson.getBoundedBlockingQueue("anyQueue");
// 如果初始容量(边界)设定成功则返回`真(true)`,
// 如果初始容量(边界)已近存在则返回`假(false)`。
queue.trySetCapacity(2);queue.offer(new SomeObject(1));
queue.offer(new SomeObject(2));
// 此时容量已满,下面代码将会被阻塞,直到有空闲为止。
queue.put(new SomeObject());SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。
7.12. 阻塞双端队列(Blocking Deque)
基于Redis的Redisson分布式无界阻塞双端队列(Blocking Deque)结构的RBlockingDeque Java对象实现了java.util.concurrent.BlockingDeque接口。尽管RBlockingDeque对象无初始大小(边界)限制,但对象的最大容量受Redis限制,最大元素数量是4 294 967 295个。
RBlockingDeque<Integer> deque = redisson.getBlockingDeque("anyDeque");
deque.putFirst(1);
deque.putLast(2);
Integer firstValue = queue.takeFirst();
Integer lastValue = queue.takeLast();
Integer firstValue = queue.pollFirst(10, TimeUnit.MINUTES);
Integer lastValue = queue.pollLast(3, TimeUnit.MINUTES);
poll, pollFromAny, pollLastAndOfferFirstTo和take方法内部采用话题订阅发布实现,在Redis节点故障转移(主从切换)或断线重连以后,内置的相关话题监听器将自动完成话题的重新订阅。
7.13. 阻塞公平队列(Blocking Fair Queue)
基于Redis的Redisson分布式无界阻塞公平队列(Blocking Fair Queue)结构的RBlockingFairQueue Java对象在实现Redisson分布式无界阻塞队列(Blocking Queue)结构RBlockingQueue接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。
以分布式无界阻塞队列为基础,采用公平获取消息的机制,不仅保证了poll、pollFromAny、pollLastAndOfferFirstTo和take方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。
RBlockingFairQueue queue = redisson.getBlockingFairQueue("myQueue");
queue.offer(new SomeObject());SomeObject obj = queue.peek();
SomeObject someObj = queue.poll();
SomeObject ob = queue.poll(10, TimeUnit.MINUTES);
该功能仅限于Redisson PRO版本。
7.14. 阻塞公平双端队列(Blocking Fair Deque)
基于Redis的Redisson分布式无界阻塞公平双端队列(Blocking Fair Deque)结构的RBlockingFairDeque Java对象在实现Redisson分布式无界阻塞双端队列(Blocking Deque)结构RBlockingDeque接口的基础上,解决了多个队列消息的处理者在复杂的网络环境下,网络延时的影响使“较远”的客户端最终收到消息数量低于“较近”的客户端的问题。从而解决了这种现象引发的个别处理节点过载的情况。
以分布式无界阻塞双端队列为基础,采用公平获取消息的机制,不仅保证了poll、take、pollFirst、takeFirst、pollLast和takeLast方法获取消息的先入顺序,还能让队列里的消息被均匀的发布到处在复杂分布式环境中的各个处理节点里。
RBlockingFairDeque deque = redisson.getBlockingFairDeque("myDeque");
deque.offer(new SomeObject());SomeObject firstElement = queue.peekFirst();
SomeObject firstElement = queue.pollFirst();
SomeObject firstElement = queue.pollFirst(10, TimeUnit.MINUTES);
SomeObject firstElement = queue.takeFirst();SomeObject lastElement = queue.peekLast();
SomeObject lastElement = queue.pollLast();
SomeObject lastElement = queue.pollLast(10, TimeUnit.MINUTES);
SomeObject lastElement = queue.takeLast();
该功能仅限于Redisson PRO版本。
7.15. 延迟队列(Delayed Queue)
基于Redis的Redisson分布式延迟队列(Delayed Queue)结构的RDelayedQueue Java对象在实现了RQueue接口的基础上提供了向队列按要求延迟添加项目的功能。该功能可以用来实现消息传送延迟按几何增长或几何衰减的发送策略。
RQueue<String> distinationQueue = ...
RDelayedQueue<String> delayedQueue = getDelayedQueue(distinationQueue);
// 10秒钟以后将消息发送到指定队列
delayedQueue.offer("msg1", 10, TimeUnit.SECONDS);
// 一分钟以后将消息发送到指定队列
delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
在该对象不再需要的情况下,应该主动销毁。仅在相关的Redisson对象也需要关闭的时候可以不用主动销毁。
RDelayedQueue<String> delayedQueue = ...
delayedQueue.destroy();
7.16. 优先队列(Priority Queue)
基于Redis的Redisson分布式优先队列(Priority Queue)Java对象实现了java.util.Queue的接口。可以通过比较器(Comparator)接口来对元素排序。
RPriorityQueue<Integer> queue = redisson.getPriorityQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);queue.removeAsync(0);
queue.addAsync(5);queue.poll();
7.17. 优先双端队列(Priority Deque)
基于Redis的Redisson分布式优先双端队列(Priority Deque)Java对象实现了java.util.Deque的接口。可以通过比较器(Comparator)接口来对元素排序。
RPriorityDeque<Integer> queue = redisson.getPriorityDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.addLast(3);
queue.addFirst(1);
queue.add(2);queue.removeAsync(0);
queue.addAsync(5);queue.pollFirst();
queue.pollLast();
7.18. 优先阻塞队列(Priority Blocking Queue)
基于Redis的分布式无界优先阻塞队列(Priority Blocking Queue)Java对象的结构与java.util.concurrent.PriorityBlockingQueue类似。可以通过比较器(Comparator)接口来对元素排序。PriorityBlockingQueue的最大容量是4 294 967 295个元素。
RPriorityBlockingQueue<Integer> queue = redisson.getPriorityBlockingQueue("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(3);
queue.add(1);
queue.add(2);queue.removeAsync(0);
queue.addAsync(5);queue.take();
当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。
7.19. 优先阻塞双端队列(Priority Blocking Deque)
基于Redis的分布式无界优先阻塞双端队列(Priority Blocking Deque)Java对象实现了java.util.Deque的接口。addLast、 addFirst、push方法不能再这个对里使用。PriorityBlockingDeque的最大容量是4 294 967 295个元素。
RPriorityBlockingDeque<Integer> queue = redisson.getPriorityBlockingDeque("anyQueue");
queue.trySetComparator(new MyComparator()); // 指定对象比较器
queue.add(2);queue.removeAsync(0);
queue.addAsync(5);queue.pollFirst();
queue.pollLast();
queue.takeFirst();
queue.takeLast();
当Redis服务端断线重连以后,或Redis服务端发生主从切换,并重新建立连接后,断线时正在使用poll,pollLastAndOfferFirstTo或take方法的对象Redisson将自动再次为其订阅相关的话题。
相关文章:
Redisson—分布式集合
7.1. 映射(Map) 基于Redis的Redisson的分布式映射结构的RMap Java对象实现了java.util.concurrent.ConcurrentMap接口和java.util.Map接口。与HashMap不同的是,RMap保持了元素的插入顺序。该对象的最大容量受Redis限制,最大元素数…...
93、Redis 之 使用连接池管理Redis6.0以上的连接 及 消息的订阅与发布
★ 使用连接池管理Redis连接 从Redis 6.0开始,Redis可支持使用多线程来接收、处理客户端命令,因此应用程序可使用连接池来管理Redis连接。 上一章讲的是创建单个连接来操作redis数据库,这次使用连接池来操作redis数据库 Lettuce连接池 支持…...
doris动态分区开启历史分区
举例说明: CREATE TABLE tbl1 (k1 DATE,... ) PARTITION BY RANGE(k1) () DISTRIBUTED BY HASH(k1) PROPERTIES ("dynamic_partition.enable" "true","dynamic_partition.time_unit" "DAY","dynamic_partition.sta…...
Linux用户与权限(认知root用户、修改权限控制 - chmod、修改权限控制 - chown)
目录 1. 认知root用户 1.1 什么是root用户(超级管理员) 1.2 用户切换命令 1.3 sudo命令 1.3.1 为普通用户配置sudo认证 2. 用户、用户组管理 2.1 理解用户、用户组的概念 2.2 掌握用户、用户组管理的相关命令 2.2.1 用户组管理 2.2.2 …...
处理conda安装工具的动态库问题——解决记录 libssl.1.0.0 系统中所有openssl位置全览 whereis openssl
处理conda安装工具的动态库问题——解决记录 处理conda安装工具的动态库问题——解决记录 - 简书 解决libssl.so.1.0.0: cannot open shared object file: No such file or directory问题 - 简书 openssl 默认版本问题(Anaconda相关)_anaconda openssl-…...
如何在Go中格式化字符串
由于字符串通常由书面文本组成,在很多情况下,我们可能希望通过标点符号、换行和缩进来更好地控制字符串的外观,以使其更易于阅读。 在本教程中,我们将介绍一些处理go字符串的方法,以确保所有输出文本的格式正确。 字…...
C程序设计内容与例题讲解 -- 第四章--选择结构程序设计第二部分(第五版)谭浩强
前言:在前面我们学习了选择结构和条件判断,用if语句实现选择结构,关系运算符和关系表达式,逻辑运算符和逻辑表达式等知识。今天我们将接着上一篇未讲完的继续讲解。 鸡汤:种一棵树最好的时间是十年前,其次是现在!加油各…...
接雨水问题
接雨水问题 问题背景 LeetCode 42. 接雨水 接雨水问题是一个经典的计算雨水滞留量的问题,通常使用柱状图来表示不同高度的柱子。在下雨的情况下,柱子之间的凹陷部分能够存储雨水,问题的目标是计算这些柱子所能接收的雨水总量。 相关知识 …...
小谈设计模式(9)—工厂方法模式
小谈设计模式(9)—工厂方法模式 专栏介绍专栏地址专栏介绍 工厂方法模式角色分类抽象产品(Abstract Product)具体产品(Concrete Product)抽象工厂(Abstract Factory)具体工厂&#x…...
Android etc1tool之png图片转换pkm 和 zipalign简介
关于作者:CSDN内容合伙人、技术专家, 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 ,擅长java后端、移动开发、商业变现、人工智能等,希望大家多多支持。 目录 一、导读二、etc1tool2.1、用法 三、zipalign3.1 使用 四…...
Spring Boot快速入门:构建简单的Web应用
SpringBoot Spring Boot是一个用于简化Spring应用程序开发的框架,它通过提供开箱即用的配置和一组常用的功能,使得构建高效、可维护的应用变得非常容易。在本篇博客中,我们将一步步地介绍如何快速入门Spring Boot,并构建一个简单的…...
JAVA 泛型、序列化和复制
泛型提供了编译时类型安全检测机制,该机制允许程序员在编译时检测到非法的类型。泛型的本质是参数化类型,也就是说所操作的数据类型被指定为一个参数。比如我们要写一个排序方法,能够对整型数组、字符串数组甚至其他任何类型的数组进行排序&a…...
以太网基础学习(二)——ARP协议
一、什么是MAC地址 MAC地址(英语:Media Access Control Address),直译为媒体访问控制位址,也称为局域网地址(LAN Address),MAC位址,以太网地址(Ethernet Addr…...
【Java-LangChain:使用 ChatGPT API 搭建系统-4】评估输入-分类
第三章,评估输入-分类 如果您正在构建一个允许用户输入信息的系统,首先要确保人们在负责任地使用系统,以及他们没有试图以某种方式滥用系统,这是非常重要的。 在本章中,我们将介绍几种策略来实现这一目标。 我们将学习…...
嵌入式Linux应用开发-驱动大全-第一章同步与互斥③
嵌入式Linux应用开发-驱动大全-第一章同步与互斥③ 第一章 同步与互斥③1.4 Linux锁的介绍与使用1.4.1 锁的类型1.4.1.1 自旋锁1.4.1.2 睡眠锁 1.4.2 锁的内核函数1.4.2.1 自旋锁1.4.2.2 信号量1.4.2.3 互斥量1.4.2.4 semaphore和 mutex的区别 1.4.3 何时用何种锁1.4.4 内核抢占…...
树的存储结构以及树,二叉树,森林之间的转换
目录 1.双亲表示法 2.孩子链表 3.孩子兄弟表示法 4.树与二叉树的转换 (1)树转换为二叉树 (2)二叉树转换成树 5.二叉树与森林的转化 (1)森林转换为二叉树 以下树为例 1.双亲表示法 双亲表示法定义了…...
【AI视野·今日NLP 自然语言处理论文速览 第四十二期】Wed, 27 Sep 2023
AI视野今日CS.NLP 自然语言处理论文速览 Wed, 27 Sep 2023 Totally 50 papers 👉上期速览✈更多精彩请移步主页 Daily Computation and Language Papers Attention Satisfies: A Constraint-Satisfaction Lens on Factual Errors of Language Models Authors Mert …...
华为云云耀云服务器L实例评测|部署个人在线电子书库 calibre
华为云云耀云服务器L实例评测|部署个人在线电子书库 calibre 一、云耀云服务器L实例介绍1.1 云服务器介绍1.2 应用场景1.3 支持镜像 二、云耀云服务器L实例配置2.1 重置密码2.2 服务器连接2.3 安全组配置 三、部署 calibre3.1 calibre 介绍3.2 Docker 环境搭建3.3 c…...
代码随想录刷题 Day28
216.组合总和III 和前一个题一样,照着自己就能写出来,就多了一个判断结果是不是等于n的逻辑。有两个地方可以剪纸,一个是当和已经大于要找的时候直接返回,另一个是当剩余元素少于三个的时候直接返回(第一层递归是少于…...
【生命周期】
生命周期 1 引出生命周期2 分析生命周期3 总结生命周期 1 引出生命周期 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta …...
【C语言 模拟实现memcpy函数、memcpy函数】
C语言程序设计笔记---027 C语言之模拟实现memcpy函数、memcpy函数1、介绍memcpy函数1.1、模拟实现memcpy函数 2、介绍memmove函数2.1、模拟实现memmove函数 3、结语 C语言之模拟实现memcpy函数、memcpy函数 前言: 通过C语言内存函数的知识,这篇将对memc…...
opencv视频文件的读取,处理与保存
文章目录 opencv视频文件的读取,处理与保存一、视频文件的读取:1、cv::VideoCapture是OpenCV库中用于处理视频输入的类,它提供了一种简单的方法来从摄像头,视频文件、或图像序列中读取帧;(1)打开…...
java - 七大比较排序 - 详解
前言 本篇介绍了七大比较排序,直接插入排序,希尔排序,冒泡排序,堆排序,选择排序,快速排序,归并排序,一些简单思想代码实现,如有错误,请在评论区指正…...
项目集成七牛云存储sdk
以PHP为例 第一步:下载sdk PHP SDK_SDK 下载_对象存储 - 七牛开发者中心 sdk下载成功之后,将sdk放入项目中,目录选择以自己项目实际情况而定。 注意:在examples目录中有各种上传文件的参考示例,这里我们主要参考的是…...
docker-compose一键启动neo4j
下载镜像 docker pull neo4j:3.5.22-community 编写配置文件 参考文档 编写docker-compose.yml文件 version: "3"services:neo4j:image: neo4j:3.5.22-communitycontainer_name: neo4j restart: alwaysports:- 7474:7474- 7687:7687environment:- NEO4J_AUTH:ne…...
深入剖析@ConfigurationProperties注解
当我们构建Spring Boot应用程序时,配置属性通常是不可或缺的一部分。Spring Boot提供了多种方式来管理这些属性,其中之一是使用ConfigurationProperties注解。这篇博客将详细解释ConfigurationProperties注解以及如何使用它来管理和映射配置属性。 什么…...
北京开发APP需要多少钱
北京开发一个移动应用(APP)的费用因多种因素而异,包括项目的规模、复杂性、所需功能、设计要求、技术选择、开发团队的经验和地理位置等。一般来说,北京的APP开发费用通常较高,因为这是中国的主要技术和创新中心之一&a…...
self-attention、transformer、bert理解
参考李宏毅老师的视频 https://www.bilibili.com/video/BV1LP411b7zS?p2&spm_id_frompageDriver&vd_sourcec67a2725ac3ca01c38eb3916d221e708 一个输入,一个输出,未考虑输入之间的关系!!! self-attention…...
junit @ExcludePackages排除多个包
在JUnit中,可以使用ExcludePackages注解来排除多个包。该注解可以用在测试类或测试方法上。 如果要排除多个包,可以在ExcludePackages注解的value属性中使用数组来指定要排除的包名。例如,要排除包com.example.package1和com.example.packag…...
Explain执行计划字段解释说明---select_type、table、patitions字段说明
1、select_type的类型有哪些 2、select_type的查询类型说明 1、SIMPLE 简单的 select 查询,查询中不包含子查询或者UNION 2、PRIMARY 查询中若包含任何复杂的子部分,最外层查询则被标记为Primary 3、DERIVED 在FROM列表中包含的子查询被标记为DERIVED(衍生)&…...
响应式网站制作流程图/中国百强城市榜单
首先声明,默认jdk指我们安装完CentOS后系统自带jdk,自己下载安装的jdk只需要下载,解压即可,之后步骤与此文一致 1.查看我们默认jdk的位置 指令: [html] view plaincopy which java 我们去看一下发现是一个超链接&…...
权威的营销单页网站/个人博客网页设计html
转载于:https://www.cnblogs.com/fanBlog/p/9525312.html...
帮诈骗公司做网站/拓客软件哪个好用
T3用友通销售发货单审核时提示“xx单据审核失败,保存失败”T3用友通销售发货单审核时提示“xx单据审核失败,保存失败”在销售发货单审核时,系统提示“xx单据审核失败,单据体保存失败。vouchtype表有问题。vouchtype表有问题&#…...
cf刷枪网站怎么做的/开发一个app需要多少钱
转自:https://www.pinlue.com/article/2020/03/2703/5710058193135.html...
哪里有网站建设哪家好/seo报告
主要的加载顺序是 servletcontext-------->context-param---------->listener---->filter----->servlet 而同一个类别之间的实际情况调用顺序要根据鬼影的mapping的顺序进行调用。 转载于:https://www.cnblogs.com/mengzhongyunying/p/8668311.html...
黄页网站推广app软件/涟源网站seo
http://www.cnblogs.com/tianzhiliang/archive/2011/01/06/1927691.html...