第03讲:SpringCloudStream实现分布式事务
需求分析
本案例是通过一个发送短信验证码的功能来实验MQ发送消息时实现分布式事务,思路分析如下
-
消息生产者生产发送验证码的半消息
-
生产者执行本地事务(将验证码保存到数据库),并记录事务的ID,如果整个过程不出现异常,则提交事务,消息成功投递,否则进行事务的回滚操作
-
MQ二次确认消息是否成功投递,如果没成功(发生了异常),则丢弃消息
需求实现
一、创建项目
- 创建一个主工程(stream-mq-demo),目的是维护项目的版本号、一些必要的类库、集成SpringCloudAlibaba
- 子工程(producer),目的是生产发送验证码的消息,及使用事务将验证码保存到数据库
- 子工程(consumer),目的是消费消息
二、主工程
2.1、pom.xml
目的是维护项目的版本号、一些必要的类库、以及集成SpringCloudAlibaba
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.1.5.RELEASE</version></parent><groupId>org.example</groupId><artifactId>stream-mq-demo</artifactId><packaging>pom</packaging><version>1.0-SNAPSHOT</version><modules><module>producer</module><module>consumer</module></modules><properties><spring-cloud.version>Greenwich.SR1</spring-cloud.version><spring-cloud-alibaba.version>0.9.0.RELEASE</spring-cloud-alibaba.version><java.version>1.8</java.version><lombok.version>1.18.8</lombok.version><rocketmq.version>2.0.3</rocketmq.version><mybatis.plus.version>3.5.1</mybatis.plus.version><mysql.version>8.0.32</mysql.version></properties><dependencies><!-- RocketMQ坐标 --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${rocketmq.version}</version></dependency><!-- SpringCloudStream坐标 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency><!-- SpringWeb坐标 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- lombok坐标 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency><!-- test --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><!--整合spring cloud--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><!--整合spring cloud alibaba--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>${spring-cloud-alibaba.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>
</project>
三、Producer子工程
3.1、pom.xml
添加MyBatisPlus、MySQL、FastJSON类库
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>stream-mq-demo</artifactId><groupId>org.example</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>producer</artifactId><dependencies><!-- mybatis-plus --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>${mybatis.plus.version}</version></dependency><!-- mysql-connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql.version}</version></dependency><!-- fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.12</version></dependency></dependencies>
</project>
3.2、application.yml
- 配置数据源(application_druid.yml)
- 配置端口号为8081
- 配置MQ的name-server地址
- 配置SpringCloudStream的消费者模式并开启事务
- 配置MQ的topic
数据源application_druid.yml
spring:datasource:type: com.zaxxer.hikari.HikariDataSourcedriver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://192.168.0.3:3306/mq_demo?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8&allowPublicKeyRetrieval=trueusername: rootpassword: Aa123123.jackson:date-format: yyyy-MM-dd HH:mm:ss
mybatis-plus:type-aliases-package: demo.entityconfiguration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplglobal-config:db-config:table-prefix: t_
主配置文件application.yml
spring:profiles:include: druidcloud:stream:rocketmq:binder:name-server: 192.168.0.3:9876bindings:#消费者output:producer:#事务消息transactional: true#与AddBonusTransactionListener类中@RocketMQTransactionListener一致group: tx-captcha-groupbindings:output:#用来指定topic,要和content-center微服务的topic匹配destination: captcha-topic
server:port: 8081
3.3、启动类
使用@EnableBinding(Source.class)定义消息的推送管道
Source.class源代码
public interface Source {String OUTPUT = "output";@Output("output")MessageChannel output();
}
application.yml中配置的output属性
启动类
package demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;@SpringBootApplication
@EnableBinding(Source.class)
public class ProducerApplication {public static void main(String[] args) {SpringApplication.run(ProducerApplication.class, args);}
}
3.4、必要的实体类
验证码类
package demo.entity;import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import lombok.Builder;
import lombok.Data;import java.util.Date;@Data
@Builder
public class Captcha {@JsonSerialize(using = ToStringSerializer.class)@TableId(type = IdType.AUTO)private Integer id;private String captcha;private String phone;private Date publishTime;
}
事务日志类
package demo.entity;import lombok.Builder;
import lombok.Data;import java.util.Date;@Data
@Builder
public class TransactionLog {private String transactionId;private Date createTime;private String log;
}
3.5、本地事务类
- 发送半消息
- 保存验证码到数据库并记录日志
package demo.service;import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import demo.entity.Captcha;
import demo.entity.TransactionLog;
import demo.mapper.CaptchaMapper;
import demo.mapper.TransactionLogMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class SendCaptchaService {private final Source source;private final TransactionLogMapper transactionLogMapper;private final CaptchaMapper boundMapper;/** 发送半消息*/public void sendCaptchaMsg(Captcha captcha){// 发送半消息。。String transactionId = UUID.randomUUID().toString();Map<String, Object> msg = new HashMap<>();msg.put("phone", captcha.getPhone());msg.put("captcha", captcha.getCaptcha());this.source.output().send(MessageBuilder.withPayload(msg)// header也有妙用....setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader("dto", JSON.toJSONString(captcha)).build());}/**添加验证码到数据库,并记录事务日志*/@Transactional(rollbackFor = Exception.class)public void addBoundWithRocketMqLog(Captcha captcha, String transactionId) {//执行本地事务this.addBound(captcha);//记录MQ事务日志transactionLogMapper.insert(TransactionLog.builder().transactionId(transactionId).createTime(new Date()).log("发送短信验证码").build());}/**将验证码保存到数据库*/@Transactional(rollbackFor = Exception.class)public void addBound(Captcha captcha){captcha.setPublishTime(new Date());boundMapper.insert(captcha);}
}
3.6、MQ事务类
MQ事务类实现RocketMQLocalTransactionListener
接口
- 重写用于执行本地事务的方法
executeLocalTransaction
,在该方法中执行本地事务类的保存验证码到数据库并记录日志的方法addBoundWithRocketMqLog
- 重写本地事务的检查接口,检查本地事务是否执行成功,即:MQ没有收到执行本地事务后的二次确认
checkLocalTransaction
,在该方法中去查询事务日志表(t_transaction_log)是否存在相同事务ID的日志,如果不存在则将消息丢弃,否则标记为成功投递
package demo.mq;import com.alibaba.fastjson.JSON;
import demo.entity.Captcha;
import demo.entity.TransactionLog;
import demo.mapper.TransactionLogMapper;
import demo.service.SendCaptchaService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;@RocketMQTransactionListener(txProducerGroup = "tx-captcha-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class SendCaptchaTransactionListener implements RocketMQLocalTransactionListener {private final SendCaptchaService addBoundService;private final TransactionLogMapper transactionLogMapper;/** 用于执行本地事务的方法*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);String dtoString = (String) headers.get("dto");Captcha bound = JSON.parseObject(dtoString, Captcha.class);//本地事务(service层用@Transaction标注的方法)成功就提交,本地事务失败就回滚try {//执行本地事务addBoundService.addBoundWithRocketMqLog(bound, transactionId);return RocketMQLocalTransactionState.COMMIT; //本地事务执行成功就提交MQ} catch (Exception e) {return RocketMQLocalTransactionState.ROLLBACK; //本地事务执行失败就回滚MQ}}/** 本地事务的检查接口,检查本地事务是否执行成功,即:MQ没有收到执行本地事务后的二次确认*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info("MQ二次事务检查,transactionID={}", transactionId);// 从MQ事务日志表里查,看看对应的事务ID是否存在记录,如果存在则表示成功(COMMIT),否则表示执行本地事务失败(ROLLBACK)TransactionLog transactionLog = transactionLogMapper.selectById(transactionId);if (transactionLog != null) {return RocketMQLocalTransactionState.COMMIT;} else {return RocketMQLocalTransactionState.ROLLBACK;}}
}
3.7、测试
使用单元测试,创建测试方法调用本地事务类发送半消息
package demo;import demo.entity.Captcha;
import demo.service.SendCaptchaService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;@SpringBootTest(classes = {ProducerApplication.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestSendCaptcha {@Autowiredprivate SendCaptchaService sendCaptchaMsg;@Testpublic void test(){//随机生成一个4位的验证码String code = "";for(int i=0; i<4; i++){code += (int)(Math.random()*10);}//发送半消息sendCaptchaMsg.sendCaptchaMsg(Captcha.builder().captcha(code).phone("13843188848").build());}
}
运行单元测试方法之后浏览器访问MQ-Dashboard可以看到topic已经被创建
在Message中可以看到刚刚发送的消息
消息详情
数据库验证码表(t_captcha)插入了数据
数据库事务日志表(t_transaction_log)插入了数据
Tip:可以在本地事务中模拟一个运行时异常,可以发现事务日志表中并无法插入日志,在MQ事务二次确认消息的时候会讲消息丢弃
四、Consumer子工程
4.1、application.yml
- 配置端口号为8082
- 配置MQ的name-server地址
- 配置MQ的topic
- 配置group,如果使用的消息队列是RocketMQ,则该属性务必配置,内容可以是任意字符串
spring:cloud:stream:rocketmq:binder:name-server: 192.168.0.3:9876bindings:#消息消费者input:#用来指定topic,要和消息生产者的的topic匹配destination: captcha-topic#一定要设置,必填项,如果用其他MQ,该属性可以不设置group: test
server:port: 8082
4.2、启动类
- 使用@EnableBinding(Sink.class)定义消息的推送管道
Sink.class源代码
public interface Sink {String INPUT = "input";@Input("input")SubscribableChannel input();
}
application.yml中配置的input属性
- 使用@StreamListener(Sink.INPUT)注解监听消息
- 使用@StreamListener(“errorChannel”)统一处理MQ的异常
启动类
package demo;import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;import java.util.HashMap;@Slf4j
@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsumerApplication {/*** 消费消息监听器** @param message*/@StreamListener(Sink.INPUT)public void receive(HashMap<String, Object> message) {log.info("消费消息={}", message);}/*** 全局异常处理** @param message 发生异常的消息*/@StreamListener("errorChannel")public void error(Message<?> message) {ErrorMessage errorMessage = (ErrorMessage) message;log.warn("RocketMQ-SpringCloudStream发生异常,errorMessage={}", errorMessage);}public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}
}
4.3、测试
再次执行Producer子工程单元测试方法发送半消息,发现在Consumer子工程中成功监听到了消息
4.4、消息过滤器
在@StreamListener
注解中可以使用condition属性来定义要匹配(过滤)的消息,将消费者改造一下,只接收手机号为13843188848的消息
Tip:该方式只支持RoketMQ,不支持Kafka/RabbitMQ
/*** 消费消息监听器** condition的作用是消息过滤,当前案例是匹配消息中header属性phone的值为13843188848的消息*/@StreamListener(value = Sink.INPUT, condition = "headers['phone']=='13843188848'")public void receive(HashMap<String, Object> message) {log.info("消费消息={}", message);}
如果不满足匹配条件将会有提示
但是消息已经成功发送
相关文章:

第03讲:SpringCloudStream实现分布式事务
需求分析 本案例是通过一个发送短信验证码的功能来实验MQ发送消息时实现分布式事务,思路分析如下 消息生产者生产发送验证码的半消息 生产者执行本地事务(将验证码保存到数据库),并记录事务的ID,如果整个过程不出现异…...

【从零开始学Skynet】高级篇(一):Protobuf数据传输
1、什么是Protobuf Protobuf是谷歌发布的一套协议格式,它规定了一系列的编码和解 码方法,比如对于数字,它要求根据数字的大小选择存储空间,小于等于15的数字只用1个字节来表示,大于15的数用2个字节表示,以此…...

快速入门Lombok
Lombok是一个Java库,可以通过注解的方式来简化Java代码,它可以自动生成Getter、Setter、构造函数等代码,从而减少重复的模板代码。下面是Lombok的使用详情: 1. 添加Lombok依赖 在使用Lombok之前,我们需要先添加Lombo…...

Linux 常见命令与常见问题解决思路
Linux 常见命令 Linux 基础命令目录相关查看文件(日志)查看普通的文件查看压缩的文件 解压压缩Linux 系统调优topvmstatpidstatps vi/vim 编辑文件查找文件属性相关定时任务scp 复制文件和目录awk 分隔cutsort 与 uniq常见问题处理思路CPU 高系统平均负载…...

用GPT-4 写2022年天津高考作文能得多少分?
正文共 792 字,阅读大约需要 3 分钟 学生必备技巧,您将在3分钟后获得以下超能力: 积累作文素材 Beezy评级 :B级 *经过简单的寻找, 大部分人能立刻掌握。主要节省时间。 推荐人 | Kim 编辑者 | Linda ●图片由Lexica …...

Django如何把SQLite数据库转换为Mysql数据库
大部分新手刚学Django开发的时候默认用的都是SQLite数据库,上线部署的时候,大多用的却是Mysql。那么我们应该如何把数据库从SQLite迁移转换成Mysql呢? 之前我们默认使用的是SQLite数据库,我们开发完成之后,里面有许多数…...

使用apisix代理静态文件
前言 最近公司考虑用apisix作为公司网关并且部署到k8s上,我这边收到一个小任务:使用apisix代理静态文件 通过apisix官网了解到它构建于 NGINX ngx_lua 的技术基础之上,所以按理应该和nginx代理静态资源是一样的。因为是通过docker容器部署…...

[元带你学NVMe协议] NVMe1.4 多路径(Multipathing)
声明 主页:元存储的博客_CSDN博客 依公开知识及经验整理,如有误请留言。 个人辛苦整理,付费内容,禁止转载。 内容摘要 全文9100字, 主要内容 目录 前言 1 多路径(Multipathing)概念...

Elasticsearch:如何使用自定义的证书安装 Elastic Stack 8.x
在我之前的文章 “如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch”,我详细描述了如何在各个平台中安装 Elastic Stack 8.x。在其中的文章中,我们大多采用默认的证书来安装 Elasticsearch。在今天的文章中,我们用自己创…...

HADOOP--yarn ,, git
Yarn架构体系 主从架构 也是采用 master(Resource Manager)- slave (Node Manager)架构,Resource Manager 整个集群只有一个,一个可靠的节点。 1、 每个节点上可以负责该节点上的资源管理以及任务调度&am…...

IOS开发指南之UITableView控件使用
1.创建一个IOS单页应用 2.双击Main.storyboard然后拖放UITableView到视图中 3.添加TableViewCell 成功添加Table View Cell 4.修改Table View Cell属性 选中Table View Cell 在右边的Image栏输入default.png回车 到此布局设计完成,现在运行还是显示 空白,要在代码中做相关的实…...
C语言中的数据类型
目录 一、数据类型 1.基本类型 2.sizeof运算符 3.signed和unsigned 二、基本数据类型的取值范围 1.比特位 2.字节 3.符号位 4.补码 5.基本数据类型的取值范围 一、数据类型 1.基本类型 (1)整数类型 short intintlong intlong long int &…...

什么是微服务中的熔断器设计模式?
在本文中,我将解释什么是熔断器设计模式以及它解决了什么问题。 我们将仔细研究熔断器设计模式,并探讨如何使用Spring Cloud Netflix Hystrix在Java中实现它。到本文结束时,您将更好地了解如何使用熔断器设计模式提高微服务架构的弹性。 熔断…...

Ubuntu查看系统日志的几种方法
在 Ubuntu 22.10 中,你可以查看系统日志来排查错误。以下是几种查看日志的方法: 一、Journalctl 命令: 使用 journalctl 命令可以查看系统日志信息,包括引起闪退的错误信息。你可以运行以下命令来查看最新的系统日志:…...

【ubuntu】安装ZIP
【ubuntu】安装ZIP 输入如下命令安装zip $ sudo apt-get install zip 输出信息如下: Reading package lists... Done Building dependency tree Reading state information... Done The following additional packages will be installed: unzip The follo…...

DiffDock源码解析
DiffDock源码解析 数据预处理 数据输入方式 df pd.read_csv(args.protein_ligand_csv), 使用的是csv的方式输入, 格式: 不管受体还是配体, 输入可以是序列或者3维结构的文件 如果蛋白输入的是序列,需要计算蛋白的三维结构&am…...

1099 Build A Binary Search Tree(超详细注解+38行代码)
分数 30 全屏浏览题目 作者 CHEN, Yue 单位 浙江大学 A Binary Search Tree (BST) is recursively defined as a binary tree which has the following properties: The left subtree of a node contains only nodes with keys less than the nodes key.The right subtree…...

[刷题]贪心入门
文章目录 贪心区间问题区间选点区间合并区间覆盖 哈夫曼树(堆)合并果子 排序不等式排队打水 绝对值不等式货仓选址 推出来的不等式耍杂技的牛 以前的题 贪心 贪心:每一步行动总是按某种指标选取最优的操作来进行, 该指标只看眼前&…...

项目集战略一致性
项目集战略一致性是识别项目集输出和成果,以便与组织的目标和目的保持一致的绩效领域。 本章内容包括: 1 项目集商业论证 2 项目集章程 3 项目集路线图 4 环境评估 5 项目集风险管理战略 项目集应与组织战略保持一致,并促进组织效益的实现。为…...

Linux学习 Day3
目录 1. 时间相关的指令 2. cal指令 3. find指令:(灰常重要) -name 4. grep指令 5. zip/unzip指令 6. tar指令(重要):打包/解包,不打开它,直接看内容 7. bc指令 8. uname –…...

前端开发推荐vscode安装什么插件?
前言 可以参考一下下面我推荐的插件,注意:插件的目的是用于提高开发的效率,节约开发的时间,像类似检查一些bug、拼写错误等这些可以使用插件快速的识别,避免在查找错误上浪费过多的时间,但切记不要过度依赖…...

如何打造完整的客户服务体系?
对于企业来说,提供优质的客户服务是保持竞争力和赢得市场份额的关键因素之一。一个高效、专业、人性化的客户服务体系,对于企业吸引和留住客户,提升品牌声誉,甚至增加销售额都有着不可忽视的作用。本文将从多个方面来阐述如何打造…...

裸奔时代,隐私何处寻?
随着互联网的普及,人工智能时代的大幕初启,数据作为人工智能的重要支撑,数据之争成为“兵家必争之地”,随之而来的就是,各种花式手段“收割”个人信息,用户隐私暴露程度越来越高,隐私保护早已成…...

从期望最大化(EM)到变分自编码器(VAE)
本文主要记录了自己对变分自编码器论文的理解。 Kingma D P, Welling M. Auto-encoding variational bayes[J]. arXiv preprint arXiv:1312.6114, 2013. https://arxiv.org/abs/1312.6114 1 带有潜在变量的极大似然估计 假设我们有一个有限整数随机数发生器 z ∼ p θ ( z ) …...

【数学杂记】表达式中的 s.t. 是什么意思
今天写题的时候遇见了这个记号:s.t.,查了一下百度。 s.t.,全称 subject to,意思是“使得……满足”。 比如这个: 意思是存在 i i i,使得 i i i 满足 A i ≠ B i A_i\neq B_i AiBi. 运用这个记号…...

flink watermark介绍及watermark的窗口触发机制
Flink的三种时间 在谈watermark之前,首先需要了解flink的三种时间概念。在flink中,有三种时间戳概念:Event Time 、Processing Time 和 Ingestion Time。其中watermark只对Event Time类型的时间戳有用。这三种时间概念分别表示: …...

Spring Cloud: 云原生微服务实践
文章目录 1. Spring Cloud 简介2. Spring Cloud Eureka:服务注册与发现在Spring Cloud中使用Eureka 3. Spring Cloud Config:分布式配置中心在Spring Cloud中使用Config 4. Spring Cloud Hystrix:熔断器在Spring Cloud中使用Hystrix 5. Sprin…...

存bean和取bean
准备工作存bean获取bean三种方式 准备工作 bean:一个对象在多个地方使用。 spring和spring boot:spring和spring boot项目;spring相当于老版本 spring boot本质还是spring项目;为了方便spring项目的搭建;操作起来更加简单 spring…...

39. 组合总和
给你一个 无重复元素 的整数数组 candidates 和一个目标整数 target ,找出 candidates 中可以使数字和为目标数 target 的 所有 不同组合 ,并以列表形式返回。你可以按 任意顺序 返回这些组合。 candidates 中的 同一个 数字可以 无限制重复被选取 。如…...

100行以内Python能做那些事
Python100 找到一个很好的python教程分享出来---->非本人 B站视频连接 100行以内的Pyhton代码可以做哪些有意思的事 按照难度1-5颗星,分为五个文件夹 希望大家可以补充 关于运行环境的补充 Python3.7 Pycharm社区版2019 关于用到的Python库,有些是自带的&am…...