当前位置: 首页 > news >正文

Redisson—分布式服务

一、 分布式远程服务(Remote Service)

基于Redis的Java分布式远程服务,可以用来通过共享接口执行存在于另一个Redisson实例里的对象方法。换句话说就是通过Redis实现了Java的远程过程调用(RPC)。分布式远程服务基于可以用POJO对象,方法的参数和返回类不受限制,可以是任何类型。

分布式远程服务(Remote Service)提供了两种类型的RRemoteService实例:

  • 服务端(远端)实例 - 用来执行远程方法(工作者实例即worker instance). 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceImpl someServiceImpl = new SomeServiceImpl();// 在调用远程方法以前,应该首先注册远程服务
// 只注册了一个服务端工作者实例,只能同时执行一个并发调用
remoteService.register(SomeServiceInterface.class, someServiceImpl);// 注册了12个服务端工作者实例,可以同时执行12个并发调用
remoteService.register(SomeServiceInterface.class, someServiceImpl, 12);
  • 客户端(本地)实例 - 用来请求远程方法. 例如:
RRemoteService remoteService = redisson.getRemoteService();
SomeServiceInterface service = remoteService.get(SomeServiceInterface.class);String result = service.doSomeStuff(1L, "secondParam", new AnyParam());

客户端和服务端必须使用一样的共享接口,生成两者的Redisson实例必须采用相同的连接配置。客户端和服务端实例可以运行在同一个JVM里,也可以是不同的。客户端和服务端的数量不收限制。(注意:尽管Redisson不做任何限制,但是Redis的限制仍然有效。)

在服务端工作者可用实例数量 大于1 的时候,将并行执行并发调用的远程方法。

并行执行工作者数量计算方法如下: T = R * N

T - 并行执行工作者总数 R - Redisson服务端数量 N - 注册服务端时指定的执行工作者数量

超过该数量的并发请求将在列队中等候执行。

在服务端工作者实例可用数量为 1 时,远程过程调用将会按 顺序执行。这种情况下,每次只有一个请求将会被执行,其他请求将在列队中等候执行。

1. 分布式远程服务工作流程

分布式远程服务为每个注册接口建立了两个列队。一个列队用于请求,由服务端监听,另一个列队用于应答回执和结果回复,由客户端监听。应答回执用于判定该请求是否已经被接受。如果在指定的超时时间内没有被执行工作者执行将会抛出RemoteServiceAckTimeoutException错误。

下图描述了每次发起远程过程调用请求的工作流程。

2. 发送即不管(Fire-and-Forget)模式和应答回执(Ack-Response)模式

分布式远程服务通过org.redisson.core.RemoteInvocationOptions类,为每个远程过程调用提供了一些可配置选项。这些选项可以用来指定和修改请求超时和选择跳过应答回执或结果的发送模式。例如:

// 应答回执超时1秒钟,远程执行超时30秒钟
RemoteInvocationOptions options = RemoteInvocationOptions.defaults();// 无需应答回执,远程执行超时30秒钟
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck();// 应答回执超时1秒钟,不等待执行结果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noResult();// 应答回执超时1分钟,不等待执行结果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().expectAckWithin(1, TimeUnit.MINUTES).noResult();// 发送即不管(Fire-and-Forget)模式,无需应答回执,不等待结果
RemoteInvocationOptions options = RemoteInvocationOptions.defaults().noAck().noResult();RRemoteService remoteService = redisson.getRemoteService();
YourService service = remoteService.get(YourService.class, options);

3. 异步调用

远程过程调用也可以采用异步的方式执行。异步调用需要单独提交一个带有@RRemoteAsync注解(annotation)的异步接口类。异步接口方法签名必须与远程接口的方法签名相符。异步接口的返回类必须是org.redisson.api.RFuture对象或其子对象。在调用RRemoteService.get方法时将对异步接口的方法进行验证。异步接口无须包含所有的远程接口里的方法,只需要包含要求异步执行的方法即可。

// 远程接口
public interface RemoteInterface {Long someMethod1(Long param1, String param2);void someMethod2(MyObject param);MyObject someMethod3();}// 匹配远程接口的异步接口
@RRemoteAsync(RemoteInterface.class)
public interface RemoteInterfaceAsync {RFuture<Long> someMethod1(Long param1, String param2);RFuture<Void> someMethod2(MyObject param);}RRemoteService remoteService = redisson.getRemoteService();
RemoteInterfaceAsync asyncService = remoteService.get(RemoteInterfaceAsync.class);

4. 取消异步调用

通过调用Future.cancel()方法可以非常方便的取消一个异步调用。分布式远程服务允许在三个阶段中任何一个阶段取消异步调用:

  1. 远程调用请求在列队中排队阶段
  2. 远程调用请求已经被分布式远程服务接受,还未发送应答回执,执行尚未开始。
  3. 远程调用请求已经在执行阶段

想要正确的处理第三个阶段,在服务端代码里应该检查Thread.currentThread().isInterrupted()的返回状态。范例如下:

// 远程接口
public interface MyRemoteInterface {Long myBusyMethod(Long param1, String param2);}// 匹配远程接口的异步接口
@RRemoteAsync(MyRemoteInterface.class)
public interface MyRemoteInterfaceAsync {RFuture<Long> myBusyMethod(Long param1, String param2);}// 远程接口的实现
public class MyRemoteServiceImpl implements MyRemoteInterface {public Long myBusyMethod(Long param1, String param2) {for (long i = 0; i < Long.MAX_VALUE; i++) {iterations.incrementAndGet();if (Thread.currentThread().isInterrupted()) {System.out.println("interrupted! " + i);return;}}}}RRemoteService remoteService = redisson.getRemoteService();
ExecutorService executor = Executors.newFixedThreadPool(5);
// 注册远程服务的服务端的同时,通过单独指定的ExecutorService来配置执行线程池
MyRemoteInterface serviceImpl = new MyRemoteServiceImpl();
remoteService.register(MyRemoteInterface.class, serviceImpl, 5, executor);// 异步调用方法
MyRemoteInterfaceAsync asyncService = remoteService.get(MyRemoteInterfaceAsync.class);
RFuture<Long> future = asyncService.myBusyMethod(1L, "someparam");
// 取消异步调用
future.cancel(true);

二、分布式实时对象(Live Object)服务

1. 介绍

一个 分布式实时对象(Live Object) 可以被理解为一个功能强化后的Java对象。该对象不仅可以被一个JVM里的各个线程相引用,还可以被多个位于不同JVM里的线程同时引用。Wikipedia对这种特殊对象的概述是:

Live distributed object (also abbreviated as live object) refers to a running instance of a distributed multi-party (or peer-to-peer) protocol, viewed from the object-oriented perspective, as an entity that has a distinct identity, may encapsulate internal state and threads of execution, and that exhibits a well-defined externally visible behavior.

Redisson分布式实时对象(Redisson Live Object,简称RLO)运用即时生成的代理类(Proxy),将一个指定的普通Java类里的所有字段,以及针对这些字段的操作全部映射到一个Redis Hash的数据结构,实现这种理念。每个字段的get和set方法最终被转译为针对同一个Redis Hash的hget和hset命令,从而使所有连接到同一个Redis节点的所有可以客户端同时对一个指定的对象进行操作。众所周知,一个对象的状态是由其内部的字段所赋的值来体现的,通过将这些值保存在一个像Redis这样的远程共享的空间的过程,把这个对象强化成了一个分布式对象。这个分布式对象就叫做Redisson分布式实时对象(Redisson Live Object,简称RLO)。

通过使用RLO,运行在不同服务器里的多个程序之间,共享一个对象实例变得和在单机程序里共享一个对象实例一样了。同时还避免了针对任何一个字段操作都需要将整个对象序列化和反序列化的繁琐,进而降低了程序开发的复杂性和其数据模型的复杂性:从任何一个客户端修改一个字段的值,处在其他服务器上的客户端(几乎^)即刻便能查看到。而且实现代码与单机程序代码无异。(^连接到从节点的客户端仍然受Redis的最终一致性的特性限制)

鉴于Redis是一个单线程的程序,针对实时对象的所有的字段操作可以理解为全部是原子性操作,也就是说在读取一个字段的过程不会担心被其他线程所修改。

通过使用RLO,可以把Redis当作一个允许被多个JVM同时操作且不受GC影响的共享堆(Heap Space)。

2. 使用方法

Redisson为分布式实时对象提供了一系列不同功能的注解,其中@REntity和@RId两个注解是分布式实时对象的必要条件。

@REntity
public class MyObject {@RIdprivate String id;@RIndexprivate String value;private MyObject parent;public MyObject(String id) {this.id = id;}public MyObject() {}// getters and setters}

在开始使用分布式实时对象以前,需要先通过Redisson服务将指定的对象连接(attach),合并(merge)或持久化(persist)到Redis里。

RLiveObjectService service = redisson.getLiveObjectService();
MyLiveObject myObject = new MyLiveObject();
myObject.setId("1");
// 将myObject对象当前的状态持久化到Redis里并与之保持同步。
myObject = service.persist(myObject);MyLiveObject myObject = new MyLiveObject("1");
// 抛弃myObject对象当前的状态,并与Redis里的数据建立连接并保持同步。
myObject = service.attach(myObject);MyLiveObject myObject = new MyLiveObject();
myObject.setId("1");
// 将myObject对象当前的状态与Redis里的数据合并之后与之保持同步。
myObject = service.merge(myObject);
myObject.setValue("somevalue");// 通过ID获取分布式实时对象
MyLiveObject myObject = service.get(MyLiveObject.class, "1");// 通过索引查找分布式实时对象
Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.in("value", "somevalue", "somevalue2"));Collection<MyLiveObject> myObjects = service.find(MyLiveObject.class, Conditions.and(Conditions.in("value", "somevalue", "somevalue2"), Conditions.eq("secondfield", "test")));

“parent”字段中包含了指向到另一个分布式实时对象的引用,它可以与包含类是同一类型也可以不同。Redisson内部采用了与Java的引用类似的方式保存这个关系,而非将全部对象序列化,可视为与普通的引用同等效果。

//RLO对象:
MyObject myObject = service.get(MyObject.class, "1");
MyObject myParentObject = service.get(MyObject.class, "2");
myObject.setValue(myParentObject);

RLO的字段类型基本上无限制,可以是任何类型。比如Java util包里的集合类,Map类等,也可以是自定义的对象。只要指定的编码解码器能够对其进行编码和解码操作便可。关于编码解码器的详细信息请查阅高级使用方法章节。

尽管RLO的字段类型基本上无限制,个别类型还是受限。注解了RId的字段类型不能是数组类(Array),比如int[],long[],double[],byte[]等等。更多关于限制有关的介绍和原理解释请查阅使用限制 章节。

为了保证RLO的用法和普通Java对象的用法尽可能一直,Redisson分布式实时对象服务自动将以下普通Java对象转换成与之匹配的Redisson分布式对象RObject。

普通Java类

转换后的Redisson类

SortedSet.class

RedissonSortedSet.class

Set.class

RedissonSet.class

ConcurrentMap.class

RedissonMap.class

Map.class

RedissonMap.class

BlockingDeque.class

RedissonBlockingDeque.class

Deque.class

RedissonDeque.class

BlockingQueue.class

RedissonBlockingQueue.class

Queue.class

RedissonQueue.class

List.class

RedissonList.class

类型转换将按照从上至下的顺序匹配类型,例如LinkedList类同时实现了Deque,List和Queue,由于Deque排在靠上的位置,因此它将会被转换成一个RedissonDeque类型。

Redisson的分布式对象也采用类似的方式,将自身的状态储存于Redis当中,(几乎^)所有的状态改变都直接映射到Redis里,不在本地JVM中保留任何赋值。(^本地缓存对象除外,比如RLocalCachedMap)

3. 高级使用方法

正如上述介绍,RLO类其实都是按需实时生成的代理(Proxy)类。生成的代理类和原类都一同缓存Redisson实例里。这个过程会消耗一些时间,在对耗时比较敏感的情况下,建议通过RedissonLiveObjectService提前注册所有的RLO类。这个服务也可以用来注销不再需要的RLO类,也可以用来查询一个类是否已经注册了。

RLiveObjectService service = redisson.getLiveObjectService();
service.registerClass(MyClass.class);
service.unregisterClass(MyClass.class);
Boolean registered = service.isClassRegistered(MyClass.class);

4. 注解(Annotation)使用方法

@REntity

仅适用于类。通过指定@REntity的各个参数,可以详细的对每个RLO类实现特殊定制,以达到改变RLO对象的行为。

  • namingScheme - 命名方案。命名方案规定了每个实例在Redis中对应key的名称。它不仅被用来与已存在的RLO建立关联,还被用来储存新建的RLO实例。默认采用Redisson自带的DefaultNamingScheme对象。
  • codec - 编码解码器。在运行当中,Redisson用编码解码器来对RLO中的每个字段进行编码解码。Redisson内部采用了实例池管理不同类型的编码解码器实例。Redisson提供了多种不同的编码解码器,默认使用JsonJacksonCodec。
  • fieldTransformation - 字段转换模式。如上所述,为了尽可能的保证RLO的用法和普通Java对象一致,Redisson会自动将常用的普通Java对象转换成与其匹配的Redisson分布式对象。这是由于字段转换模式的默认值是ANNOTATION_BASED,修改为IMPLEMENTATION_BASED就可以不转换。

@RId

仅适用于字段。@RId注解只能用在具备区分实例的字段上,这类字段可以理解为一个类的id字段或主键字段。这个字段的值将被命名方案namingScheme用来与事先存在的RLO建立引用。加了该注解的字段是唯一在本地JVM里同时保存赋值的字段。一个类只能有一个字段包含@RId注解。

可以通过指定一个生成器generator策略来实现自动生成这个字段的值。默认不提供生成器。

@RIndex

仅适用于字段。用来指定可用于搜索的字段。可以通过RLiveObjectService.find方法来根据条件精细查找分布式实时对象。查询条件可以是含(IN),或(OR),和(AND)或相等(EQ)以及它们的任意组合。

使用范例如下:

public class MyObject {@RIndexString field1;@RIndexString field2;@RIndexString field3;
}Collection<MyObject> objects = RLiveObjectService.find(MyObject.class, Conditions.or(Conditions.and(Conditions.eq("field1", "value"), Conditions.eq("field2", "value")), Conditions.in("field3", "value1", "value2"));

@RObjectField

仅适用于字段。允许通过该注解中的namingScheme或codec来改变该字段的命名或编码方式,用来区别于@REntity中指定的预设方式。

@RCascade

仅适用于字段。用来指定包含于分布式实时对象字段内其它对象的级联操作方式。

可选的级联操作方式为如下:

RCascadeType.ALL - 执行所有级联操作
RCascadeType.PERSIST - 仅在执行RLiveObjectService.persist()方法时进行级联操作 RCascadeType.DETACH - 仅在执行RLiveObjectService.detach()方法时进行级联操作 RCascadeType.MERGE - 仅在执行RLiveObjectService.merge()方法时进行级联操作 RCascadeType.DELETE - 仅在执行RLiveObjectService.delete()方法时进行级联操作

5. 使用限制

如上所述,带有RId注解字段的类型不能使数组类,这是因为目前默认的命名方案类DefaultNamingScheme还不能正确地将数组类序列化和反序列化。在改善了DefaultNamingScheme类的不足以后会考虑取消这个限制。另外由于带有RId注解的字段是用来指定Redis中映射的key的名称,因此组建一个只含有唯一一个字段的RLO类是毫无意义的。选用RBucket会更适合这样的场景。

三、分布式执行服务(Executor Service)

1. 分布式执行服务概述

Redisson的分布式执行服务实现了java.util.concurrent.ExecutorService接口,支持在不同的独立节点里执行基于java.util.concurrent.Callable接口或java.lang.Runnable接口或Lambda的任务。这样的任务也可以通过使用Redisson实例,实现对储存在Redis里的数据进行操作。Redisson分布式执行服务是最快速和有效执行分布式运算的方法。

2. 任务

Redisson独立节点不要求任务的类在类路径里。他们会自动被Redisson独立节点的ClassLoader加载。因此每次执行一个新任务时,不需要重启Redisson独立节点。

采用Callable任务的范例:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;for (Integer value : map.values()) {result += value;}return result;}}

采用Runnable任务的范例:

public class RunnableTask implements Runnable {@RInjectprivate RedissonClient redissonClient;private long param;public RunnableTask() {}public RunnableTask(long param) {this.param = param;}@Overridepublic void run() {RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");atomic.addAndGet(param);}}

在创建ExecutorService时可以配置以下参数:

ExecutorOptions options = ExecutorOptions.defaults()// 指定重新尝试执行任务的时间间隔。
// ExecutorService的工作节点将等待10分钟后重新尝试执行任务
//
// 设定为0则不进行重试
//
// 默认值为5分钟
options.taskRetryInterval(10, TimeUnit.MINUTES);
RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
executorService.submit(new RunnableTask(123));RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit(new CallableTask());
Long result = future.get();

使用Lambda任务的范例:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
Future<Long> future = executorService.submit((Callable & Serializable)() -> {System.out.println("task has been executed!");
});
Long result = future.get();

可以通过@RInject注解来为任务实时注入Redisson实例依赖。

3. 取消任务

通过Future.cancel()方法可以很方便的取消所有已提交的任务。通过对Thread.currentThread().isInterrupted()方法的调用可以在已经处于运行状态的任务里实现任务中断:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;// map里包含了许多的元素for (Integer value : map.values()) {if (Thread.currentThread().isInterrupted()) {// 任务被取消了return null;}result += value;}return result;}}RExecutorService executorService = redisson.getExecutorService("myExecutor");
Future<Long> future = executorService.submit(new CallableTask());
// 或
RFuture<Long> future = executorService.submitAsync(new CallableTask());
// ...
future.cancel(true);

四、 分布式调度任务服务(Scheduler Service)

1. 分布式调度任务服务概述

Redisson的分布式调度任务服务实现了java.util.concurrent.ScheduledExecutorService接口,支持在不同的独立节点里执行基于java.util.concurrent.Callable接口或java.lang.Runnable接口的任务。Redisson独立节点按顺序运行Redis列队里的任务。调度任务是一种需要在未来某个指定时间运行一次或多次的特殊任务。

2. 设定任务计划

Redisson独立节点不要求任务的类在类路径里。他们会自动被Redisson独立节点的ClassLoader加载。因此每次执行一个新任务时,不需要重启Redisson独立节点。

采用Callable任务的范例:

public class CallableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;for (Integer value : map.values()) {result += value;}return result;}}

在创建ExecutorService时可以配置以下参数:

ExecutorOptions options = ExecutorOptions.defaults()// 指定重新尝试执行任务的时间间隔。
// ExecutorService的工作节点将等待10分钟后重新尝试执行任务
//
// 设定为0则不进行重试
//
// 默认值为5分钟
options.taskRetryInterval(10, TimeUnit.MINUTES);
RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<Long> future = executorService.schedule(new CallableTask(), 10, TimeUnit.MINUTES);
Long result = future.get();

使用Lambda任务的范例:

RExecutorService executorService = redisson.getExecutorService("myExecutor", options);
ScheduledFuture<Long> future = executorService.schedule((Callable & Serializable)() -> {System.out.println("task has been executed!");
}, 10, TimeUnit.MINUTES);
Long result = future.get();

采用Runnable任务的范例:

public class RunnableTask implements Runnable {@RInjectprivate RedissonClient redissonClient;private long param;public RunnableTask() {}public RunnableTask(long param) {this.param= param;}@Overridepublic void run() {RAtomicLong atomic = redissonClient.getAtomicLong("myAtomic");atomic.addAndGet(param);}}RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
ScheduledFuture<?> future1 = executorService.schedule(new RunnableTask(123), 10, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future2 = executorService.scheduleAtFixedRate(new RunnableTask(123), 10, 25, TimeUnit.HOURS);
// ...
ScheduledFuture<?> future3 = executorService.scheduleWithFixedDelay(new RunnableTask(123), 5, 10, TimeUnit.HOURS);

3. 通过CRON表达式设定任务计划

在分布式调度任务中,可以通过CRON表达式来为任务设定一个更复杂的计划。表达式与Quartz的CRON格式完全兼容。

例如:

RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
executorService.schedule(new RunnableTask(), CronSchedule.of("10 0/5 * * * ?"));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
executorService.schedule(new RunnableTask(), CronSchedule.weeklyOnDayAndHourAndMinute(12, 4, Calendar.MONDAY, Calendar.FRIDAY));

4. 取消计划任务

分布式调度任务服务提供了两张取消任务的方式:通过调用ScheduledFuture.cancel()方法或调用RScheduledExecutorService.cancelScheduledTask方法。通过对Thread.currentThread().isInterrupted()方法的调用可以在已经处于运行状态的任务里实现任务中断:

public class RunnableTask implements Callable<Long> {@RInjectprivate RedissonClient redissonClient;@Overridepublic Long call() throws Exception {RMap<String, Integer> map = redissonClient.getMap("myMap");Long result = 0;// map里包含了许多的元素for (Integer value : map.values()) {if (Thread.currentThread().isInterrupted()) {// 任务被取消了return null;}result += value;}return result;}}RScheduledExecutorService executorService = redisson.getExecutorService("myExecutor");
RScheduledFuture<Long> future = executorService.scheduleAsync(new RunnableTask(), CronSchedule.dailyAtHourAndMinute(10, 5));
// ...
future.cancel(true);
// 或
String taskId = future.getTaskId();
// ...
executorService.cancelScheduledTask(taskId);

五、分布式映射归纳服务(MapReduce)

1. 介绍

Redisson提供了通过映射归纳(MapReduce)编程模式来处理储存在Redis环境里的大量数据的服务。这个想法来至于其他的类似实现方式和谷歌发表的研究。所有 映射(Map)归纳(Reduce) 阶段中的任务都是被分配到各个独立节点(Redisson Node)里并行执行的。以下所有接口均支持映射归纳(MapReduce)功能: RMap、 RMapCache、 RLocalCachedMap、 RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue 和 RPriorityDeque

映射归纳(MapReduce)的功能是通过RMapper、 RCollectionMapper、 RReducer 和 RCollator 这几个接口实现的。

1.RMapper映射器接口适用于映射(Map)类,它用来把映射(Map)中的每个元素转换为另一个作为归纳(Reduce)处理用的键值对。

public interface RMapper<KIn, VIn, KOut, VOut> extends Serializable {void map(KIn key, VIn value, RCollector<KOut, VOut> collector);}

2.RCollectionMapper 映射器接口仅适用于集合(Collection)类型的对象,它用来把集合(Collection)中的元素转换成一组作为归纳(Reduce)处理用的键值对。

public interface RCollectionMapper<VIn, KOut, VOut> extends Serializable {void map(VIn value, RCollector<KOut, VOut> collector);}

3.RReducer归纳器接口用来将上面这些,由映射器生成的键值对列表进行归纳整理。

public interface RReducer<K, V> extends Serializable {V reduce(K reducedKey, Iterator<V> values);}

4.RCollator收集器接口用来把归纳整理以后的结果化简为单一一个对象。

public interface RCollator<K, V, R> extends Serializable {R collate(Map<K, V> resultMap);}

以上每个阶段的任务都可以用@RInject注解的方式来获取RedissonClient实例:

    public class WordMapper implements RMapper<String, String, String, Integer> {@RInjectprivate RedissonClient redissonClient;@Overridepublic void map(String key, String value, RCollector<String, Integer> collector) {// ...redissonClient.getAtomicLong("mapInvocations").incrementAndGet();}}

2. 映射(Map)类型的使用范例

Redisson提供的RMap、 RMapCache和RLocalCachedMap这三种映射(Map)类型的对象均可以使用这种分布式映射归纳(MapReduce)服务。

以下是在映射(Map)类型的基础上采用映射归纳(MapReduce)来实现字数统计的范例:

  1. Mapper对象将每个映射的值用空格且分开。
    public class WordMapper implements RMapper<String, String, String, Integer> {@Overridepublic void map(String key, String value, RCollector<String, Integer> collector) {String[] words = value.split("[^a-zA-Z]");for (String word : words) {collector.emit(word, 1);}}}
  1. Reducer对象计算统计所有单词的使用情况。
    public class WordReducer implements RReducer<String, Integer> {@Overridepublic Integer reduce(String reducedKey, Iterator<Integer> iter) {int sum = 0;while (iter.hasNext()) {Integer i = (Integer) iter.next();sum += i;}return sum;}}
  1. Collator对象统计所有单词的使用情况。
    public class WordCollator implements RCollator<String, Integer, Integer> {@Overridepublic Integer collate(Map<String, Integer> resultMap) {int result = 0;for (Integer count : resultMap.values()) {result += count;}return result;}}
  1. 把上面的各个对象串起来使用:
    RMap<String, String> map = redisson.getMap("wordsMap");map.put("line1", "Alice was beginning to get very tired");map.put("line2", "of sitting by her sister on the bank and");map.put("line3", "of having nothing to do once or twice she");map.put("line4", "had peeped into the book her sister was reading");map.put("line5", "but it had no pictures or conversations in it");map.put("line6", "and what is the use of a book");map.put("line7", "thought Alice without pictures or conversation");RMapReduce<String, String, String, Integer> mapReduce= map.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());// 统计词频Map<String, Integer> mapToNumber = mapReduce.execute();// 统计字数Integer totalWordsAmount = mapReduce.execute(new WordCollator());

3. 集合(Collection)类型的使用范例

Redisson提供的RSet、 RSetCache、 RList、 RSortedSet、 RScoredSortedSet、 RQueue、 RBlockingQueue、 RDeque、 RBlockingDeque、 RPriorityQueue和RPriorityDeque这几种集合(Collection)类型的对象均可以使用这种分布式映射归纳(MapReduce)服务。

以下是在集合(Collection)类型的基础上采用映射归纳(MapReduce)来实现字数统计的范例:

    public class WordMapper implements RCollectionMapper<String, String, Integer> {@Overridepublic void map(String value, RCollector<String, Integer> collector) {String[] words = value.split("[^a-zA-Z]");for (String word : words) {collector.emit(word, 1);}}}
    public class WordReducer implements RReducer<String, Integer> {@Overridepublic Integer reduce(String reducedKey, Iterator<Integer> iter) {int sum = 0;while (iter.hasNext()) {Integer i = (Integer) iter.next();sum += i;}return sum;}}
    public class WordCollator implements RCollator<String, Integer, Integer> {@Overridepublic Integer collate(Map<String, Integer> resultMap) {int result = 0;for (Integer count : resultMap.values()) {result += count;}return result;}}
    RList<String> list = redisson.getList("myList");list.add("Alice was beginning to get very tired");list.add("of sitting by her sister on the bank and");list.add("of having nothing to do once or twice she");list.add("had peeped into the book her sister was reading");list.add("but it had no pictures or conversations in it");list.add("and what is the use of a book");list.add("thought Alice without pictures or conversation");RCollectionMapReduce<String, String, Integer> mapReduce= list.<String, Integer>mapReduce().mapper(new WordMapper()).reducer(new WordReducer());// 统计词频Map<String, Integer> mapToNumber = mapReduce.execute();// 统计字数Integer totalWordsAmount = mapReduce.execute(new WordCollator());

相关文章:

Redisson—分布式服务

一、 分布式远程服务&#xff08;Remote Service&#xff09; 基于Redis的Java分布式远程服务&#xff0c;可以用来通过共享接口执行存在于另一个Redisson实例里的对象方法。换句话说就是通过Redis实现了Java的远程过程调用&#xff08;RPC&#xff09;。分布式远程服务基于可…...

volatile使用方法

volatile使用方法 编译优化。使用等级3的话&#xff0c;可能将优化了一些变量。 这为什么会开启等第三呢&#xff1f;这是关于单片机的内存容量比较小&#xff0c;所以开启优化的话&#xff0c;可以可以省一些空间&#xff0c;但是如果。会出现些变量的问题&#xff0c;需要通过…...

提升您的 Go 应用性能的 6 种方法

优化您的 Go 应用程序 1. 如果您的应用程序在 Kubernetes 中运行&#xff0c;请自动设置 GOMAXPROCS 以匹配 Linux 容器的 CPU 配额 Go 调度器 可以具有与运行设备的核心数量一样多的线程。由于我们的应用程序在 Kubernetes 环境中的节点上运行&#xff0c;当我们的 Go 应用程…...

计算摄像技术02 - 颜色空间

一些计算摄像技术知识内容的整理&#xff1a;颜色视觉与感知特性、颜色空间和基于彩色滤镜阵列的彩色感知。 文章目录 一、颜色视觉与感知特性 &#xff08;1&#xff09;色调 &#xff08;2&#xff09;饱和度 &#xff08;3&#xff09;明度 二、颜色空间 &#xff08;1&…...

Pytorch笔记之分类

文章目录 前言一、导入库二、数据处理三、构建模型四、迭代训练五、模型评估总结 前言 使用Pytorch进行MNIST分类&#xff0c;使用TensorDataset与DataLoader封装、加载本地数据集。 一、导入库 import numpy as np import torch from torch import nn, optim from torch.uti…...

【目标检测】——PE-YOLO精读

yolo&#xff0c;暗光目标检测 论文&#xff1a;PE-YOLO 1. 简介 卷积神经网络&#xff08;CNNs&#xff09;在近年来如何推动了物体检测的发展。许多检测器已经被提出&#xff0c;而且在许多基准数据集上的性能正在不断提高。然而&#xff0c;大多数现有的检测器都是在正常条…...

Java 数组转集合

数组转集合 如果仅仅这样转化Arrays.asList(数组)&#xff0c;导致集合只能查询&#xff0c;无法进行其他操作&#xff0c;因此&#xff0c;对该方法进行优化&#xff1a; List<实体> list1 new ArrayList<>(Arrays.asList(数组))以上方法就可以使用集合的所有操…...

Elasticsearch:ES|QL 查询语言简介

警告&#xff1a;此功能处于技术预览阶段&#xff0c;可能会在未来版本中更改或删除。 Elastic 将尽最大努力解决任何问题&#xff0c;但技术预览版中的功能不受官方 GA 功能的支持 SLA 的约束。在目前的 Elastic Stack 8.10 中此功能还没有提供。 Elasticsearch 查询语言 (ES|…...

qt qml中listview出现卡顿情况时的常用处理方法

如果在qt QML中使用ListView时出现卡顿情况&#xff0c;可能是因为渲染大量的数据或者在模型中进行复杂的数据处理。以下是常用的解决方法&#xff1a; 1. 设置ListView的缓存策略&#xff1a;通过设置ListView的cacheBuffer属性为适当的值&#xff0c;可以提高滚动的流畅性。…...

Elasticsearch基础操作演示总结

一、索引操作 &#xff08;一&#xff09;创建索引 创建Elasticsearch&#xff08;ES&#xff09;索引是在ES中存储和管理数据的重要操作之一。索引是用于组织和检索文档的结构化数据存储。 当创建Elasticsearch索引时&#xff0c;通常需要同时指定索引的设置&#xff08;Se…...

Spring 作用域解析器AnnotationScopeMetadataResolver

博主介绍&#xff1a;✌全网粉丝近5W&#xff0c;全栈开发工程师&#xff0c;从事多年软件开发&#xff0c;在大厂呆过。持有软件中级、六级等证书。可提供微服务项目搭建与毕业项目实战&#xff0c;博主也曾写过优秀论文&#xff0c;查重率极低&#xff0c;在这方面有丰富的经…...

如何发布一个 NPM 包

首先初始化: npm init 文件夹结构 .gitignore Git 库忽略文件清单.npmignore 不包括在 npm 注册库中的文件清单LECENSE 模块的授权文件README.md 说明文档bin 保存模块可执行文件的文件夹doc 保存模块文档的文件夹example 保存模块实际示例lib 保存模块代码man 保存模块的手册…...

Flask小项目教程(含MySQL与前端部分)

CONTENTS 1. 环境配置2. 快速搭建Flask应用程序 1. 环境配置 首先我们在项目的根目录下创建一个 Python 虚拟环境&#xff0c;打开命令行输入以下指令&#xff1a; python -m venv venv启动虚拟环境&#xff1a; .\venv\Scripts\Activate.ps1如果遇到报错&#xff1a;.\venv…...

Eureka

大家好我是苏麟今天带来Eureka的使用 . 提供者和消费者 在服务调用关系中&#xff0c;会有两个不同的角色&#xff1a; 服务提供者&#xff1a;一次业务中&#xff0c;被其它微服务调用的服务。&#xff08;提供接口给其它微服务&#xff09; 服务消费者&#xff1a;一次业务…...

STM32G070RBT6-MCU温度测量(ADC)

1、借助STM32CubeMX生成系统及外设相关初始化代码。 在以上配置后就可以生成相关初始化代码了。 /* ADC1 init function */ void MX_ADC1_Init(void) {/* USER CODE BEGIN ADC1_Init 0 *//* USER CODE END ADC1_Init 0 */ADC_ChannelConfTypeDef sConfig {0};/* USER COD…...

数据结构之带头双向循环链表

目录 链表的分类 带头双向循环链表的实现 带头双向循环链表的结构 带头双向循环链表的结构示意图 空链表结构示意图 单结点链表结构示意图 多结点链表结构示意图 链表创建结点 双向链表初始化 销毁双向链表 打印双向链表 双向链表尾插 尾插函数测试 双向链表头插 …...

adb详细教程(四)-使用adb启动应用、关闭应用、清空应用数据、获取设备已安装应用列表

adb对于安卓移动端来说&#xff0c;是个非常重要的调试工具。本篇介绍常用的adb指令 文章目录 一、启动应用&#xff1a;adb shell am start二、使用浏览器打开指定网址&#xff1a;adb shell am start三、杀死应用进程adb shell am force-stop/adb shell am kill四、删除应用所…...

【Spring Boot】日志文件

日志文件 一. 日志文件有什么用二. 日志怎么用三. ⾃定义⽇志打印1. 在程序中得到⽇志对象2. 使⽤⽇志对象打印⽇志3. ⽇志格式说明 四. 日志级别1. ⽇志级别有什么⽤2. ⽇志级别的分类与使⽤ 五. 日志持久化六. 更简单的⽇志输出—lombok1. 添加 lombok 依赖2. 输出⽇志3. lom…...

图像处理与计算机视觉--第五章-图像分割-Canny算子

文章目录 1.边缘检测算子分类2.Canny算子核心理论2.1.Canny算子简单介绍2.2.Canny算子边缘检测指标2.3.Canny算子基本原理 3.Canny算子处理流程3.1.高斯滤波去噪声化3.2.图像梯度搜寻3.3.非极大值抑制处理3.4.双阈值边界处理3.5.边界滞后技术跟踪3.6.Canny算子边缘检测的特点 4…...

LabVIEW开发教学实验室自动化INL和DNL测试系统

LabVIEW开发教学实验室自动化INL和DNL测试系统 如今&#xff0c;几乎所有的测量仪器都是基于微处理器的设备。模拟输入量在进行数字处理之前被转换为数字量。对于参加电气和电子测量课程的学生来说&#xff0c;了解ADC以及如何欣赏其性能至关重要。ADC的不确定性可以根据其传输…...

数据结构: 数组与链表

目录 1 数组 1.1 数组常用操作 1. 初始化数组 2. 访问元素 3. 插入元素 4. 删除元素 5. 遍历数组 6. 查找元素 7. 扩容数组 1.2 数组优点与局限性 1.3 数组典型应用 2 链表 2.1 链表常用操作 1. 初始化链表 2. 插入节点 3. 删除…...

unity 控制玩家物体

创建场景 放上一个plane&#xff0c;放上一个球 sphere&#xff0c;假定我们的球就是我们的玩家&#xff0c;使用控制键w a s d 来控制球也就是玩家移动。增加一个材质&#xff0c;把颜色改成绿色&#xff0c;把材质赋给plane&#xff0c;区分我们增加的白球。 增加组件和脚…...

指数分布优化器(EDO)(含MATLAB代码)

先做一个声明&#xff1a;文章是由我的个人公众号中的推送直接复制粘贴而来&#xff0c;因此对智能优化算法感兴趣的朋友&#xff0c;可关注我的个人公众号&#xff1a;启发式算法讨论。我会不定期在公众号里分享不同的智能优化算法&#xff0c;经典的&#xff0c;或者是近几年…...

Java 时间的加减处理

时间的加减处理 Date date new Date(操作时间&#xff08;类型Date&#xff09;-(60000*60*1));600001分钟 60000*60*1 1小时...

基于A4988/DRV8825的四路步进电机驱动器

概述 简化板的CNC sheild V3.0&#xff0c;仅保留步进电机速度与方向的控制引脚STEP/DIR、使能端EN、芯片供电VCC\GND&#xff0c;共计11个引脚。PCB四周开设四个M3通孔&#xff0c;以便于安装固定。此外&#xff0c;将板载的焊死的保险丝更改为可更换的保险座保险丝&#xff…...

万字总结网络原理

目录 一、网络基础 1.1认识IP地址 1.2子网掩码 1.3认识MAC地址 1.4一跳一跳的网络数据传输 1.5总结IP地址和MAC地址 二、网络设备及相关技术 2.1集线器:转发所有端口 2.2交换机:MAC地址转换表+转发对应端口 2.3主机:网络分层从上到下封装 2.4主机&路由器:ARP…...

【AI视野·今日CV 计算机视觉论文速览 第262期】Fri, 6 Oct 2023

AI视野今日CS.CV 计算机视觉论文速览 Fri, 6 Oct 2023 Totally 73 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Computer Vision Papers Improved Baselines with Visual Instruction Tuning Authors Haotian Liu, Chunyuan Li, Yuheng Li, Yong Jae Lee大型多模…...

一文搞懂Jenkins持续集成解决的是什么问题

1、持续集成的定义 大师 Martin Fowler 是这样定义持续集成的: 持续集成是一种软件开发实战, 即团队开发成员经常集成他们的工作. 通常, 每个成员每天至少集成一次, 也就意味着每天可能发生多次集成. 持续集成并不能消除Bug, 而是让它们非常容易发现和改正. 根据对项目实战的理…...

微信小程序去除默认滚动条展示

一、微信小程序改版框架升级后&#xff0c;滚动条默认展示了。 在实际应用中效果不好&#xff0c;如果想默认隐藏掉&#xff0c;代码段如下&#xff1a; /* 去除默认滚动条效果 */ ::-webkit-scrollbar {display:none;width:0;height:0;color:transparent; } 设置成全局样式…...

3.02 创建订单操作详细-订单创建与回滚 (创建订单操作详细)

步骤1&#xff1a; 创建orders订单表,子订单表和订单状态表对应的pojo和mappperOrders和OrderItemsMapperOrderItems和OrderItemsMapperOrderStatus和OrderStatusMapper步骤2&#xff1a;创建OrderService和对应的实现类 public interface OrderService {/*** 用于创建订单相关…...

“一个”网站/宁波seo公司推荐

我争取过一切&#xff0c;看淡了世事&#xff0c;放下了执念。 我心静如水&#xff0c;人事物态都扰不了我心。 曾想过探求世上的一切本质&#xff0c;以及大道规则。 过去&#xff0c;我自命天纵神武&#xff0c;命运要我改变世界。 如今&#xff0c;低调低调&#xff0c;…...

影视公司网站模板/北京百度推广代理公司

目录简介安装初次使用如何创建一个类图泛化&#xff08;Generalization&#xff09;实现&#xff08;Realization&#xff09;关联&#xff08;Association)聚合&#xff08;Aggregation&#xff09;组合(Composition)依赖(Dependency)简介 所有的面向对象&#xff08;Object …...

建设银行网站储蓄账户查询密码/域名是什么意思

思维导图 文字版&#xff1a; 5 结构化架构&#xff1a;游戏AI开发的常用技巧 本章目的 让读者对游戏AI的全局框架有充分的了解&#xff0c;并针对开发问题提供思路和解决方案 不会说深入具体方法和技术细节&#xff0c;这些内容可以参考文后文献资料 定义 AI架构 实现AI评…...

做网站最大的公司/网站维护工程师

你的代码足够有效。我想justhalf的答案还不错。在简而言之&#xff0c;这个代码怎么样&#xff1f;在def randvector(n):theta 2 * np.pi * np.random.random_sample((n))return np.vstack((np.cos(theta), np.sin(theta))).T更新追加cProfile结果。在仅一半的^{pr2}$操作5 fu…...

服装公司网站结构/百度产品大全入口

今天机房突然断电&#xff0c;DB连不上了&#xff0c;提示 无法打开数据库MyDB。恢复操作已将该数据库标记为 SUSPECT。 原因是断电导致DB文件损坏 通过SQL Server Management Studio链接到故障数据库的服务器 执行下面的操作 -- 执行前先把下面数据库的名字MyDB更改为“可疑”…...

网站建设 论文/2345网址导航怎么样

前言&#xff1a;大多数项目中都需要后台对传过来的对象进行校验&#xff0c;所以经常需要写一些字段校验的代码&#xff0c;比如特殊字段非空、字段长度限制和邮箱格式验证等等。之前我们可能都是使用if…else…,写这些与业务逻辑关系不大的代码&#xff0c;不仅验证代码繁琐而…...