Kafka 之事务消息
前言:
在分布式消息系统中,事务消息也是一个热门课题,在项目的实际业务场景中,如果用到事务消息的场景也不少见,那 Kafka 作为一个高性能的分布式消息中间件,同样也支持事务消息,本篇我们将对 Kafka 的事务消息展开讨论。
Kafka 系列文章传送门
Kafka 简介及核心概念讲解
Spring Boot 整合 Kafka 详解
Kafka @KafkaListener 注解的详解及使用
Kafka 客户端工具使用分享【offsetexplorer】
Kafka 之消息同步/异步发送
Kafka 之批量消息发送消费
Kafka 之消息广播消费
Kafka 之消息并发消费
Kafka 之顺序消息
事务消息的使用场景
事务消息的使用场景众多,这里我简单列举几个如下:
- 金融交易处理:在金融领域,每笔交易都必须具备原子性,确保不发生不一致或重复的交易,事务性消息可用于金融交易的场景,来保证交易的完整性。
- 订单处理:用户订单处理必须是完整的,需要保订单的创建、支付和发货不会出现问题,事务性消息可以用于订单场景,来保证整个流程的完整性。
- 。。。。等等等场景。
什么是 Kafka 的事务消息?
Kafka 事务性消息是一种机制,用于确保消息的可靠性传递和处理,与非事务性消息相比,它们在数据处理中提供了额外的保证,一旦消息被写入Kafka 集群,它们将被认为是已经处理,无论发生了什么,Kafka 的事务不同于 RocketMQ,RocketMQ 是保障本地事务与 RocketMQ 消息发送的事务一致性,而 Kafka 的事务消息主要是保障一次发送多条消息的事务一致性,发送的消息要么同时成功要么同时失败。
Kafka 事务消息的特性?
- 原子性:Kafka 事务性消息要么完全成功,要么完全失败,保障了消息不会被部分处理。
- 可靠性:Kafka 事务性消息一旦写入Kafka,它们将被视为已经处理,即使发生了应用程序或系统故障。
- 幂等性:Kafka 生产者可以配置为幂等,确保相同的消息不会被重复发送,对于 Kafka 事务消息通常会配置为幂等。
- Exactly Once 语义:仅一次,Kafka 事务性消息支持仅一次,即消息要么完全到达一次,要么不到达。
Kafka的消息传输保障
Kafka 的消息传输保障分为 3个层级:
at most once:至多一次,消息可能丢失,但绝不会重复。
at least once:最少一次,消息绝不丢失,但可能重复传输。
exactly once:恰好一次,每条消息肯定会被传输一次且仅传输一次。
Kafka 事务消息的使用
Kafka 事务消息的配置
Kafka 的事务消息需要对生产者和消费者进行一些配置来启用 Kafka 对事务消息的支持。
生产者配置
- acks:这个参数我们前面有见到过,是有关生产者接收到确认之后才认为消息发送成功的设置,对于事务性消息,通常将其设置为acks=all(表示需要等到消息写入到所有 ISR 同步副本中,设置为 -1 也是一样的效果),以确保消息在事务完全提交后才被视为成功发送。
- transactional.id:事务id,这是用于标识生产者实例的唯一ID,在配置文件中设置 transactional.id 是启用事务性消息的核心步骤。
消费者配置
- isolation.level:Kafka 消费者隔离级别的设置,对于事务性消息,通常将其设置为isolation.level=read_committed,确保只读取已经提交的事务消息。
- auto.offset.reset:这是消费者启动时从哪里开始读取消息的设置,通常将其设置为auto.offset.reset=earliest(如果分区下有提交的 offset 从提交的 offset 开始消费,如果没有提交的 offset,从头开始消费),以确保不会错过任何已提交的消息。
具体配置如下:
#消息应答
spring.kafka.producer.acks = -1
#事务id配置
spring.kafka.producer.transaction-id-prefix=my-transaction-
#读取已提交的消息
spring.kafka.consumer.isolation-level=read_committed
Kafka 事务消息案例演示
Producer 代码案例
对于生产者我们两种类型可以选择,使用 KafkaTemplate 进行事务消息发送和使用原生 API 进行事务消息发送。
使用 KafkaTemplate 完成事务消息发送代码如下:
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;/*** @ClassName: TransactionalProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息生产者*/
@Slf4j
@Component
public class TransactionalProducer {@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;//发送事务消息(编程式)public void sendTransactionalMessage(int number) {try {kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, String, Object>() {@Overridepublic Object doInOperations(KafkaOperations<String, String> kafkaOperations) {kafkaOperations.send("transactional-topic", "我是第一条事务消息");if (number == 0) {//模拟异常int a = 1 / 0;}kafkaOperations.send("transactional-topic", "我是第二条事务消息");return true;}});} catch (Exception e) {e.printStackTrace();}}//发送事务消息(注解方式)@Transactional(rollbackFor = RuntimeException.class)public void sendTransactionalMessageByAnnotation(int number) {kafkaTemplate.send("transactional-topic", "我是第一条事务消息");if (number == 0) {//模拟异常int a = 1 / 0;}kafkaTemplate.send("transactional-topic", "我是第二条事务消息");}}
上述消息生产者代码中我们使用了两种方式实现,分别是编程式事务和注解式事务消息,大家根据自己的喜好来选择,两种方式都是可以的。
Consumer 代码案例
事务消息的 Consumer 代码没有什么特殊之处,我们使用 Manual ACK 即可。
package com.order.service.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** @ClassName: TransactionalConsumer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息消费者*/
@Slf4j
@Component
public class TransactionalConsumer {@KafkaListener(id = "transactional-consumer",groupId = "transactional-consumer-groupId",topics = "transactional-topic",containerFactory = "myContainerFactory")public void listen(String message, Acknowledgment acknowledgment) {log.info("事务消息消费成功,消息内容:{}", message);//手动提交 ACKacknowledgment.acknowledge();}
}
事务消息结果验证
我们触发注解式事务消息发送,numer 传值 1,得到如下结果:
2024-11-05 19:48:32.649 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
2024-11-05 19:48:32.655 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
结果符合预期
2024-11-05 19:52:27.444 INFO 20652 --- [nio-8086-exec-8] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-my-transaction-0, transactionalId=my-transaction-0] Aborting incomplete transaction
2024-11-05 19:52:27.451 ERROR 20652 --- [y-transaction-0] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='我是注解式事务第一条事务消息' to topic transactional-topic:org.apache.kafka.common.KafkaException: Failing batch since transaction was abortedat org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:423) [kafka-clients-2.6.0.jar:na]at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:313) [kafka-clients-2.6.0.jar:na]at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240) [kafka-clients-2.6.0.jar:na]at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_121]2024-11-05 19:52:27.456 ERROR 20652 --- [nio-8086-exec-8] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.ArithmeticException: / by zero] with root causejava.lang.ArithmeticException: / by zeroat com.order.service.kafka.producer.TransactionalProducer.sendTransactionalMessageByAnnotation(TransactionalProducer.java:49) ~[classes/:na]at com.order.service.kafka.producer.TransactionalProducer$$FastClassBySpringCGLIB$$dbc0d44d.invoke(<generated>) ~[classes/:na]at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-5.3.6.jar:5.3.6]at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750) ~[spring-aop-5.3.6.jar:5.3.6]at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692) ~[spring-aop-5.3.6.jar:5.3.6]at com.order.service.kafka.producer.TransactionalProducer$$EnhancerBySpringCGLIB$$e4350198.sendTransactionalMessageByAnnotation(<generated>) ~[classes/:na]at com.order.service.controller.KafkaController.sendTransactionalMessageByAnnotation(KafkaController.java:96) ~[classes/:na]at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.6.jar:5.3.6]at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:626) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.6.jar:5.3.6]at javax.servlet.http.HttpServlet.service(HttpServlet.java:733) ~[tomcat-embed-core-9.0.45.jar:4.0.FR]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53) ~[tomcat-embed-websocket-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93) ~[spring-boot-actuator-2.4.5.jar:2.4.5]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.6.jar:5.3.6]at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119) ~[spring-web-5.3.6.jar:5.3.6]at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202) ~[tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707) [tomcat-embed-core-9.0.45.jar:9.0.45]at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_121]at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_121]at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.45.jar:9.0.45]at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
从日志结果我们看到消费者没有消费到任何一条消息(这里我们是在第一条消息发送结束后模拟的异常),结果符合预期。
编程式事务消息这里就不在验证了,有兴趣的朋友可以自己去测试一下。
Producer 使用原生 API 发送事务消息
使用原生 API 发送事务消息,需要有一个 KafkaProducer 对象,我这里使用注入 Bean 的方式,具体如下:
@Bean
public KafkaProducer<String, String> kafkaProducer() {return new KafkaProducer<>(getMyKafkaProps());
}
有了 KafkaProducer 只有我们调用其 API 即可完成事务消息发送,KafkaProducer 有几个重要的 API 如下:
//初始化事务
public void initTransactions();//开启事务
public void beginTransaction() throws ProducerFencedException ;//在事务内提交已经消费的偏移量
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException ;//提交事务
public void commitTransaction() throws ProducerFencedException;//丢弃事务
public void abortTransaction() throws ProducerFencedException ;
使用原生 API 进行事务消息发送代码如下:
package com.order.service.kafka.producer;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @ClassName: NativeTransactionalProducer* @Author: Author* @Date: 2024/10/22 19:22* @Description: 事务消息生产者 原生API*/
@Slf4j
@Component
public class NativeTransactionalProducer {@Autowiredprivate KafkaProducer<String, String> kafkaProducer;//发送事务消息(原生API)public void sendTransactionalMessageByNativeApi(int number) {try {//初始化一个事务kafkaProducer.initTransactions();//开启事务kafkaProducer.beginTransaction();//发送消息kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第一条事务消息"));if (number == 0) {//模拟异常int a = 1 / 0;}kafkaProducer.send(new ProducerRecord<>("transactional-topic", "我是原生API发送出的第二条事务消息"));//提交事务消息kafkaProducer.commitTransaction();} catch (ProducerFencedException e) {//关闭资源kafkaProducer.close();} finally {//关闭资源kafkaProducer.close();}}}
原生 API 发送事务消息结果验证
我们触发原生 API 发送事务消息发送,numer 传值 1,得到如下结果:
2024-11-05 19:48:32.649 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第一条事务消息
2024-11-05 19:48:32.655 INFO 20652 --- [-consumer-2-C-1] c.o.s.k.consumer.TransactionalConsumer : 事务消息消费成功,消息内容:我是注解式事务第二条事务消息
结果符合预期,异常的情况我这里就不演示了,有兴趣的朋友可以自己去验证。
事务消息注意事项
只要开启了事务消息功能,不管是 KafkaProducer 还是 KafkaTemplate 发送的所有消息,都必须处于 Kafka 事务之中,否则会抛出如下异常:
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a recordat org.springframework.util.Assert.state(Assert.java:76)at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:640)at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:552)at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)at com.order.service.kafka.producer.ManualKafkaProducer.sendManualMessage(ManualKafkaProducer.java:34)at com.order.service.controller.KafkaController.sendManualMessage(KafkaController.java:87)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)at java.lang.reflect.Method.invoke(Method.java:498)at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898)at javax.servlet.http.HttpServlet.service(HttpServlet.java:626)at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.boot.actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal(WebMvcMetricsFilter.java:93)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201)at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)at java.lang.Thread.run(Thread.java:745)
如果业务中,存在需要事务的情况,也存在不需要事务的情况,那么则需要分别定义两个 KafkaTemplate(Kafka Producer),按需使用。
总结:本篇分享了 Kafka 的事务消息的使用,可以感受到 Kafka 的事务消息和 RocketMQ 是完全不一样的,希望可以帮助到需要用到的伙伴们。
如有不正确的地方欢迎各位指出纠正。
相关文章:
Kafka 之事务消息
前言: 在分布式消息系统中,事务消息也是一个热门课题,在项目的实际业务场景中,如果用到事务消息的场景也不少见,那 Kafka 作为一个高性能的分布式消息中间件,同样也支持事务消息,本篇我们将对 …...
小菜家教平台(四):基于SpringBoot+Vue打造一站式学习管理系统
前言 昨天配置完了过滤器,权限检验,基本的SpringSecurity功能已经配置的差不多了,今天继续开发,明天可能会暂停一天整理一下需求,然后就进行CRUD了。 今日进度 补充SpringSecurity异常处理和全局异常处理器 详细操作…...
解决 Vue3、Vite 和 TypeScript 开发环境下跨域的问题,实现前后端数据传递
引言 本文介绍如何在开发环境下解决 Vite 前端(端口 3000)和后端(端口 80)之间的跨域问题: 在开发环境中,前端使用的 Vite 端口与后端端口不一致,会产生跨域错误提示: Access to X…...
量化交易系统开发-实时行情自动化交易-3.3.数据采集流程
19年创业做过一年的量化交易但没有成功,作为交易系统的开发人员积累了一些经验,最近想重新研究交易系统,一边整理一边写出来一些思考供大家参考,也希望跟做量化的朋友有更多的交流和合作。 接下来说说数据采集流程,后…...
探索PyAV:Python中的多媒体处理利器
文章目录 探索PyAV:Python中的多媒体处理利器第一部分:背景介绍第二部分:PyAV是什么?第三部分:如何安装PyAV?第四部分:简单的库函数使用方法1. 打开文件2. 查看流3. 遍历帧4. 编码帧5. 关闭输出…...
SpringBoot源码解析(三):启动开始阶段
SpringBoot源码系列文章 SpringBoot源码解析(一):SpringApplication构造方法 SpringBoot源码解析(二):引导上下文DefaultBootstrapContext SpringBoot源码解析(三):启动开始阶段 目录 前言一、入口二、SpringApplicationRunListener1、作用…...
C# const与readonly关键字的区别
在C#中,readonly关键字用于定义在对象创建后不能更改的字段。它可以与常量(const)有些相似,但也有显著不同。以下是readonly关键字的一些关键点: 定义与用法: readonly字段可以在类的构造函数中初始化,而const字段必须…...
【数据分享】1901-2023年我国省市县镇四级的逐年降水数据(免费获取/Shp/Excel格式)
之前我们分享过1901-2023年1km分辨率逐月降水栅格数据和Shp和Excel格式的省市县四级逐月降水数据,原始的逐月降水栅格数据来源于彭守璋学者在国家青藏高原科学数据中心平台上分享的数据!基于逐月数据我们采用求年累计值的方法得到逐年降水栅格数据&#…...
hhdb数据库介绍(9-4)
访问安全 权限体系 计算节点有两类用户,一类是计算节点数据库用户,用于操作数据,执行SELECT,UPDATE,DELETE,INSERT等SQL语句。另一类是关系集群数据库可视化管理平台用户,用于管理配置信息。此…...
苍穹外卖的分层所用到的技术以及工具+jwt令牌流程图(jwt验证)
分层用到的技术以及工具: jwt令牌流程图:...
Python——数列1/2,2/3,3/4,···,n/(n+1)···的一般项为Xn=n/(n+1),当n—>∞时,判断数列{Xn}是否收敛
没注释的源代码 from sympy import * n symbols(n) s n/(n1) print(数列的极限为:,limit(s,n,oo))...
css:还是语法
emmet的使用 emmet是一个插件,Emmet 是 Zen Coding 的升级版,由 Zen Coding 的原作者进行开发,可以快速的编写 HTML、CSS 以及实现其他的功能。很多文本编辑器都支持,我们只是学会使用它: 生成html结构 <!-- emme…...
关于 el-table 的合计行问题
目录 一.自定义合计行 二.合计行不展示,只有缩放/变大窗口或者F12弹出后台时才展示 三.合计行出现了表格滚动条下方 四.合计行整体样式的修改 五.合计行单元格样式修改 1.css 2.jsx方式 六.合计行单元格合并 一.自定义合计行 通过 show-summary 属性开启合计…...
解决SVN更新,提交错误乱码
执行清理操作,没有菜单的情况 1.点击TortoiseSVN-设置-如图勾选 注意:下图没有点击上下文菜单勾选清理 选择对应文件目录,执行【清理】操作 2.如果还是乱码,如上操作勾选解除文件锁定, 执行【破除锁定】后再次执行【…...
《Python网络安全项目实战》项目4 编写网络扫描程序
《Python网络安全项目实战》项目4 编写网络扫描程序 项目4 编写网络扫描程序任务4.1 扫描内网有效IP地址任务描述任务分析任务实施任务拓展 任务4.2 编写端口扫描工具任务描述任务分析任务实施相关知识任务评价任务拓展项目评价 项目4 编写网络扫描程序 许多扫描工具是由Pytho…...
Python金融大数据分析概述
💂 个人网站:【 摸鱼游戏】【神级代码资源网站】【海拥导航】💅 想寻找共同学习交流,摸鱼划水的小伙伴,请点击【全栈技术交流群】 金融大数据分析在金融科技领域越来越重要,它涉及从海量数据中提取洞察,为金…...
黑马产品经理
1、合格的产品经理 什么是产品? 什么是产品经理? 想清楚产品怎么做的人。 合格的产品经理 2、产品经理的分类 为什么会有不同的分类? 按服务对象划分 按产品平台划分 公司所属行业不同(不限于以下) 工作内容划分 …...
机器学习——损失函数、代价函数、KL散度
🌺历史文章列表🌺 机器学习——损失函数、代价函数、KL散度机器学习——特征工程、正则化、强化学习机器学习——常见算法汇总机器学习——感知机、MLP、SVM机器学习——KNN机器学习——贝叶斯机器学习——决策树机器学习——随机森林、Bagging、Boostin…...
首次超越扩散模型和非自回归Transformer模型!字节开源RAR:自回归生成最新SOTA!
文章链接:https://arxiv.org/pdf/2411.00776 项目链接:https://yucornetto.github.io/projects/rar.html 代码&模型链接:https://github.com/bytedance/1d-tokenizer 亮点直击 RAR(随机排列自回归训练策略)&#x…...
C语言最简单的扫雷实现(解析加原码)
头文件 #define ROW 9 #define COL 9 #define ROWS ROW2 #define COLS COL2 #include <stdio.h> #include <stdlib.h> #include <time.h> #define numlei 10do while可以循环玩 两个板子,内板子放0,外板子放* set函数初始化两个板子 …...
20. 类模板
一、什么是类模板 类模板用于建立一个通用类,类中的成员数据类型可以不具体指定,用一个虚拟的类型来代替。它的语法格式如下: template<typename T>类模板与函数模板相比主要有两点区别:1) 类模板没有自动类型推导的方式。…...
SSL证书以及实现HTTP反向代理
注意: 本文内容于 2024-11-09 19:20:07 创建,可能不会在此平台上进行更新。如果您希望查看最新版本或更多相关内容,请访问原文地址:SSL证书以及实现HTTP反向代理。感谢您的关注与支持! 之前写的HTTP反向代理工具&…...
多种算法解决组合优化问题平台
🏡作者主页:点击! 🤖编程探索专栏:点击! ⏰️创作时间:2024年11月11日7点12分 点击开启你的论文编程之旅https://www.aspiringcode.com/content?id17302099790265&uidef7618fa204346ff9…...
【笔记】LLC电路工作频点选择 2-1 输出稳定性的限制
LLC工作模式的分析参考了:现代电力电子学,电力出版社,李永东 1.LLC电路可以选择VCS也可以选择ZVS 1.1选择ZCS时,开关管与谐振电感串联后,与谐振电容并联: 1.2选择ZVS时,开关管仅仅安装在谐振电…...
Linux系统程序设计--2. 文件I/O
文件I/O 标准C的I/O FILE结构体 下面只列出了5个成员 可以观察到,有些函数没有FILE类型的结构体指针例如printf主要是一些标准输出,因为其内部用到了stdin,stdout,stderr查找文件所在的位置:find \ -name stat.h查找头文件所…...
右值引用——C++11新特性(一)
目录 一、右值引用与移动语义 1.左值引用与右值引用 2.移动构造和移动赋值 二、引用折叠 三、完美转发 一、右值引用与移动语义 1.左值引用与右值引用 左值:可以取到地址的值,比如一些变量名,指针等。右值:不能取到地址的值…...
JavaScript 观察者设计模式
观察者模式:观察者模式(Observer mode)指的是函数自动观察数据对象,一旦对象有变化,函数就会自动执行。而js中最常见的观察者模式就是事件触发机制。 ES5/ES6实现观察者模式(自定义事件) - 简书 先搭架子 要有一个对象ÿ…...
鸿蒙进阶篇-网格布局 Grid/GridItem(二)
hello大家好,这里是鸿蒙开天组,今天让我们来继续学习鸿蒙进阶篇-网格布局 Grid/GridItem,上一篇博文我们已经学习了固定行列、合并行列和设置滚动,这一篇我们将继续学习Grid的用法,实现翻页滚动、自定义滚动条样式&…...
数据仓库之 Atlas 血缘分析:揭示数据流奥秘
Atlas血缘分析在数据仓库中的实战案例 在数据仓库领域,数据血缘分析是一个重要的环节。血缘分析通过确定数据源之间的关系,以及数据在处理过程中的变化,帮助我们更好地理解数据生成的过程,提高数据的可靠性和准确性。在这篇文章中…...
AndroidStudio-滚动视图ScrollView
滚动视图 滚动视图有两种: 1.ScrollView,它是垂直方向的滚动视图;垂直方向滚动时,layout_width属性值设置为match_parent,layout_height属性值设置为wrap_content。 例如: (1)XML文件中: <?xml ve…...
使用经典wordpress编辑器使用手册/合肥网站关键词排名
1.多线程 1.1 线程安全 如果有多个线程在同时运行,而这些线程可能会同时运行这段代码。程序每次运行结果和单线程运行的结果是一样的,而且其他的变量的值也和预期的是一样的,就是线程安全的。线程安全问题都是由全局变量及静态变量引起的。若…...
做网站的硬件和软件环境/培训学校招生营销方案
Android Wear开发中文教程...
网站建站域名解析最后做/北京网络营销招聘
选择目标编译固件平台 cd ~/source 编辑前检查 make defconfig make prereq 设置固件默认大小及自定义新硬件 想生成自定义名称固件、机器型号需要修改8处地方 主板CPU是MT7620N 硬件是和ZBT WR8305RT的硬件一样 板子上的真实型号是JGX-X5 固件容量: 16M 十六进制&a…...
口碑营销的产品/班级优化大师下载安装app
课 程 设 计 说 明 书题目:基于矩阵键盘、1602液晶屏的简易计算器的设计系统学院(系): 年级专业: 学 号: 学生姓名: 指导教师: 教师职称: xxx大学课程设计(论文)任务书院(系): 理 基…...
做网站有骗子/天津搜索引擎优化
很多学员有以下这些问题,其实除去厂家自行定义的通讯方式外,我们在做项目的时分最常用的通讯方式,通讯不上无非就是这几个缘由能否选择正确通讯协议(modbus、RS232自在协议、CAN0pen、EtherCAT)通讯线能否制造OK&#…...
福州做网站多少钱/seo关键词推广方式
blog.sina.com.cn/s/blog_76a8d80b01018jnf.html Jquery hover 事件中在 IE 中存在的 BUG December 15th 2011评论(11) Jquery 也有 BUG ? 是的!Jquery hover 事件在 IE 中就存在 Bug ! 因为帮朋友做一个在 elegant-box 主题中的特效要用到 J…...