当前位置: 首页 > news >正文

一文上手Kafka【中】

一、发送消息细节

在发送消息的特别注意: 在版本 3.0 中,以前返回 ListenableFuture 的方法已更改为返回 CompletableFuture。为了便于迁移,2.9 版本添加了一个方法 usingCompletableFuture(),该方法为 CompletableFuture 返回类型提供了相同的方法;此方法不再可用。

1.1 ProducerConfig

在spring kafka项目当中,提供了Kafka 生产者的相关配置.在类ProducerConfig当中,其值分别定义在不同的常量类当中. 结合上篇当中发送消息的时候控制台输出的日志,具体字段含义如下所示:

通用配置:

  • acks = 1:生产者要求领导者在确认消息写入之前收到的最少同步副本数量。设置为 1
    表示领导者成功写入消息后即确认,不等待副本同步完成。
  • auto.include.jmx.reporter = true:自动包含用于Java Management Extensions(JMX)的报告器,以便通过 JMX 监控生产者的指标。
  • batch.size =16384:当多个消息发送到同一分区时,生产者将尝试在单个请求中发送消息的批量大小(以字节为单位)。
  • bootstrap.servers =[ip:9092]:Kafka 集群的服务器地址列表,用于建立初始连接。
  • buffer.memory =33554432:生产者可以用来缓冲等待发送到服务器的记录的总内存大小(以字节为单位)。
  • client.id =rj-spring-kafka-demo-producer-1:生产者的客户端 ID,用于在 Kafka 服务器端标识此生产者。
  • compression.type = none:消息的压缩类型,可以是 none、gzip、snappy 等。
  • connections.max.idle.ms = 540000:在关闭不活动的连接之前,连接可以保持空闲的最长时间(以毫秒为单位)。
  • delivery.timeout.ms = 120000:消息发送的超时时间,包括所有可能的重试。
  • enable.idempotence =false:是否启用生产者的幂等性,确保在重试时不会产生重复的消息。 >
  • enable.metrics.push =true:是否启用推送生产者的指标数据到外部系统。
  • interceptor.classes =[]:生产者拦截器的类列表,用于在发送消息之前或之后执行自定义逻辑。
  • key.serializer = class.org.apache.kafka.common.serialization.StringSerializer:用于序列化消息键的类。
  • linger.ms = 0:生产者在发送批次之前等待的额外时间(以毫秒为单位),以允许更多消息积累在批次中。
  • max.block.ms =60000:生产者在发送消息或获取元数据等操作中阻塞的最长时间。
  • max.in.flight.requests.per.connection = 5:在单个连接上允许的最大未确认请求数量。
  • max.request.size = 1048576:生产者请求的最大大小(以字节为单位)。
  • metadata.max.age.ms = 300000:元数据(如主题分区信息)的过期时间(以毫秒为单位),之后将强制刷新元数据。
  • metadata.max.idle.ms = 300000:元数据在没有任何更新的情况下保持有效的最长时间。
  • metric.reporters = []:自定义的指标报告器类列表。 metrics.num.samples =2:用于计算指标的样本数量。
  • metrics.recording.level = INFO:指标记录的级别,例如 INFO、DEBUG 等。
  • metrics.sample.window.ms = 30000:用于计算指标的时间窗口大小(以毫秒为单位)。
  • partitioner.adaptive.partitioning.enable =true:是否启用自适应分区功能,根据负载动态调整分区分配。
  • partitioner.availability.timeout.ms =0:分区器在确定分区不可用时等待的时间(以毫秒为单位)。 >
  • partitioner.class =null:自定义分区器的类,如果未设置则使用默认分区器。
  • partitioner.ignore.keys = false:是否忽略消息的键,不基于键进行分区。
  • receive.buffer.bytes = 32768:套接字接收缓冲区的大小(以字节为单位)。
  • reconnect.backoff.max.ms =1000:重新连接的最大退避时间(以毫秒为单位)。
  • reconnect.backoff.ms =50:重新连接的初始退避时间(以毫秒为单位)。
  • request.timeout.ms =30000:生产者请求的超时时间,包括发送请求和接收响应的时间。
  • retries = 3:生产者在发送消息失败时的重试次数。
  • retry.backoff.max.ms = 1000:重试之间的最大退避时间(以毫秒为单位)。
  • retry.backoff.ms =100:重试之间的初始退避时间(以毫秒为单位)。

SASL 相关配置(用于安全认证):

  • sasl.client.callback.handler.class = null:SASL 客户端回调处理程序的类。
  • sasl.jaas.config = null:Java Authentication and Authorization Service(JAAS)配置,用于 SASL 认证。
  • sasl.kerberos.kinit.cmd = /usr/bin/kinit:Kerberos 的 kinit 命令路径。
  • sasl.kerberos.min.time.before.relogin = 60000:Kerberos 重新登录之前的最小时间(以毫秒为单位)。
  • sasl.kerberos.service.name = null:Kerberos 服务名称。
  • sasl.kerberos.ticket.renew.jitter = 0.05:Kerberos 票证更新的抖动因子。
  • sasl.kerberos.ticket.renew.window.factor = 0.8:Kerberos 票证更新的窗口因子。
  • sasl.login.callback.handler.class = null:SASL 登录回调处理程序的类。
  • sasl.login.class = null:SASL 登录机制的类。
  • sasl.login.connect.timeout.ms = null:SASL 登录连接超时时间(以毫秒为单位)。
  • sasl.login.read.timeout.ms = null:SASL 登录读取超时时间(以毫秒为单位)。
  • sasl.login.refresh.buffer.seconds = 300:SASL 登录刷新缓冲区时间(以秒为单位)。
  • sasl.login.refresh.min.period.seconds = 60:SASL 登录刷新的最小周期(以秒为单位)。
  • sasl.login.refresh.window.factor = 0.8:SASL 登录刷新的窗口因子。
  • sasl.login.refresh.window.jitter = 0.05:SASL 登录刷新的抖动因子。
  • sasl.login.retry.backoff.max.ms = 10000:SASL 登录重试的最大退避时间(以毫秒为单位)。
  • sasl.login.retry.backoff.ms = 100:SASL 登录重试的初始退避时间(以毫秒为单位)。
  • sasl.mechanism = GSSAPI:SASL 认证机制,如 GSSAPI、PLAIN 等。
  • sasl.oauthbearer.clock.skew.seconds = 30:OAuth Bearer 令牌的时钟偏差时间(以秒为单位)。
  • sasl.oauthbearer.expected.audience = null:预期的 OAuth Bearer 令牌受众。
  • sasl.oauthbearer.expected.issuer = null:预期的 OAuth Bearer 令牌发行者。
  • sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000:OAuth Bearer JWKS 端点的刷新时间(以毫秒为单位)。
  • sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000:OAuth Bearer JWKS 端点重试的最大退避时间(以毫秒为单位)。
  • sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100:OAuth Bearer JWKS 端点重试的初始退避时间(以毫秒为单位)。
  • sasl.oauthbearer.jwks.endpoint.url = null:OAuth Bearer JWKS 端点的 URL。
  • sasl.oauthbearer.scope.claim.name = scope:OAuth Bearer 令牌范围声明的名称。
  • sasl.oauthbearer.sub.claim.name = sub:OAuth Bearer 令牌主题声明的名称。
  • sasl.oauthbearer.token.endpoint.url = null:OAuth Bearer 令牌端点的 URL。

安全协议相关配置:

  • security.protocol = PLAINTEXT:生产者使用的安全协议,如 PLAINTEXT、SSL、SASL_PLAINTEXT 等。
  • security.providers = null:安全提供程序的类列表。

网络相关配置:

  • send.buffer.bytes = 131072:套接字发送缓冲区的大小(以字节为单位)。
  • socket.connection.setup.timeout.max.ms = 30000:套接字连接设置的最大超时时间(以毫秒为单位)。
  • socket.connection.setup.timeout.ms = 10000:套接字连接设置的初始超时时间(以毫秒为单位)。

SSL 相关配置(用于加密连接):

  • ssl.cipher.suites = null:SSL 加密套件列表。
  • ssl.enabled.protocols = [TLSv1.2, TLSv1.3]:启用的 SSL 协议版本列表。
  • ssl.endpoint.identification.algorithm = https:SSL 端点标识算法。
  • ssl.engine.factory.class = null:SSL 引擎工厂的类。
  • ssl.key.password = null:SSL 密钥密码。
  • ssl.keymanager.algorithm = SunX509:SSL 密钥管理器算法。
  • ssl.keystore.certificate.chain = null:SSL 密钥库证书链。
  • ssl.keystore.key = null:SSL 密钥库的密钥。
  • ssl.keystore.location = null:SSL 密钥库的位置。
  • ssl.keystore.password = null:SSL 密钥库的密码。
  • ssl.keystore.type = JKS:SSL 密钥库的类型。
  • ssl.protocol = TLSv1.3:SSL 协议版本。
  • ssl.provider = null:SSL 提供程序。
  • ssl.secure.random.implementation = null:SSL 安全随机数生成器的实现。
  • ssl.trustmanager.algorithm = PKIX:SSL 信任管理器算法。
  • ssl.truststore.certificates = null:SSL 信任库证书。
  • ssl.truststore.location = null:SSL 信任库的位置。
  • ssl.truststore.password = null:SSL 信任库的密码。
  • ssl.truststore.type = JKS:SSL 信任库的类型。

事务相关配置:

  • transaction.timeout.ms = 60000:事务的超时时间(以毫秒为单位)。
  • transactional.id = null:事务 ID,用于标识一个事务性生产者。

序列化相关配置:

  • value.serializer = class org.apache.kafka.common.serialization.StringSerializer:用于序列化消息值的类。

1.2 sendDefault

CompletableFuture<SendResult<K, V>> sendDefault(V data);

该api要求向模板提供的默认主题发送消息.要使用该模板,您可以配置生产者工厂并在模板的构造函数中提供它.

@Configuration
public class KafkaConfig {@Beanpublic Map<String, Object> producerConfig(){Map<String, Object> map = new HashMap<>();map.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:9092");map.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);map.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);map.put(ProducerConfig.RETRIES_CONFIG, "3");return map;}@Beanpublic ProducerFactory<Integer, Object> producerFactory(){return new DefaultKafkaProducerFactory<>(producerConfig());}@Beanpublic KafkaTemplate<Integer,Object> kafkaTemplate(){KafkaTemplate<Integer, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());// 设置默认主题kafkaTemplate.setDefaultTopic("rj-default-topic");return kafkaTemplate;}
}

此时发送的时候,可以不必指定主题了,而直接将消息发送到我们自己定义的默认的主题当中了

    @GetMapping("/default")public String sendDefaultMsg(String msg) throws ExecutionException, InterruptedException {CompletableFuture<SendResult<Integer, Object>> completableFuture = kafkaTemplate.sendDefault(msg);SendResult<Integer, Object> sendResult = completableFuture.get();log.info("sendResult:{}", sendResult);return "向默认主题发送消息";}

从版本 2.5 开始,您现在可以覆盖工厂的 ProducerConfig 属性,以从同一工厂创建具有不同生产者配置的模板。

    @Beanpublic KafkaTemplate<Integer,Object> kafkaTemplate(){KafkaTemplate<Integer, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());// 设置默认主题kafkaTemplate.setDefaultTopic("rj-default-topic");return kafkaTemplate;}/*** 从同一个工厂创建具有不同生产者的配置的模块* @param producerFactory* @return*/@Beanpublic KafkaTemplate<String, String> stringKafkaTemplate(ProducerFactory<String, String> producerFactory){return new KafkaTemplate<>(producerFactory, Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));}// 如果要重用ProducerFactory,则必须修改一下ProducerFactory的初始的泛型,修改为如下的格式@Beanpublic ProducerFactory<?, ?> producerFactory(){return new DefaultKafkaProducerFactory<>(producerConfig());}

当然以上的ProducerFactory相关配置的属性,也可以在application.yml配置文件当中进行配置.

1.3 Message接口

在使用KafkaTemplate发送数据的时候,可以直接发送一个Message.方法定义如下所示:

	@Overridepublic CompletableFuture<SendResult<K, V>> send(Message<?> message) {ProducerRecord<?, ?> producerRecord = this.messageConverter.fromMessage(message, this.defaultTopic);if (!producerRecord.headers().iterator().hasNext()) { // possibly no Jacksonbyte[] correlationId = message.getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);if (correlationId != null) {producerRecord.headers().add(KafkaHeaders.CORRELATION_ID, correlationId);}}return observeSend((ProducerRecord<K, V>) producerRecord);}

这里需要注意, Message是在org.springframework.messaging包当中定义的,定义接口如下所示:

// 带有 headers 和 body 的通用消息表示形式。
public interface Message<T> {/*** Return the message payload.* 消息体*/T getPayload();/*** Return message headers for the message (never {@code null} but may be empty).* 消息头,可以在发送的时候指定*/MessageHeaders getHeaders();}

继承关系
这里我们使用实现类GenericMessage即可:

// 推荐使用这个构造方法,简单方便
public GenericMessage(T payload, Map<String, Object> headers) {this(payload, new MessageHeaders(headers));
}public GenericMessage(T payload, MessageHeaders headers) {Assert.notNull(payload, "Payload must not be null");Assert.notNull(headers, "MessageHeaders must not be null");this.payload = payload;this.headers = headers;
}

测试代码:

 @GetMapping("/message")public String sendMessage(){Map<String, Object> map = new HashMap<>();// 向 Kafka 发送数据时包含主题的headermap.put(KafkaHeaders.TOPIC, Constants.Kafka.TOPIC_NAME);map.put(KafkaHeaders.KEY, "rj");// 包含从中接收消息的主题的header。map.put(KafkaHeaders.RECEIVED_TOPIC, Constants.Kafka.TOPIC_NAME);// 创建MessageHeaders对象MessageHeaders messageHeaders = new MessageHeaders(map);// 构建Message对象Message<String> message = new GenericMessage<>("hello, message!", messageHeaders);// 将Message发送到指定的topicCompletableFuture<SendResult<Integer, Object>> completableFuture = kafkaTemplate.send(message);completableFuture.whenComplete((result, ex) -> {if (ex == null) {System.out.println("发送成功");} else {System.out.println("发送失败");}});return "发送成功!";}

上述Map集合的key直接使用定义好的即可: KafkaHeaders, 用的时候,需要啥就添加啥.
header定义
注意事项:

  • 使用的KafkaTemplate发送消息的时候,要注意泛型匹配的问题.这里步及到key、value的序列化与反序列化操作.
  • 如果重用了ProducerFactory则需要注意使用的泛型和发送消息的类型是否能匹配得上.

如上所示: 我们使用的的是private final KafkaTemplate<String, String> kafkaTemplate,我们通过配置文件,注入到容器的类型是:

  @Bean
public KafkaTemplate<String, String> stringKafkaTemplate(ProducerFactory<String, String> producerFactory){return new KafkaTemplate<>(producerFactory,// 【注意】: 这里重新设置了value的序列化,而对于key的序列化是在构建ProducerFactory的时候,传入的. Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}

而注入到容器当中ProducerFactory对象,在构建的时候,分别设置了key、value的序列化规则.
序列化
以上的泛型必须得能匹配上,或者可以直接转换,否则会抛出异常.

1.4 ProducerListener

使用ProducerListener配置KafkaTemplate,以获取带有发送结果(成功或失败)的异步回调,而不是等待Future完成。下面的清单显示了ProducerListener接口的定义:

public interface ProducerListener<K, V> {// 发送成功void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);// 发送失败void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,Exception exception);
}

默认情况下,模板配置了 LoggingProducerListener,它会记录错误,并且在发送成功时不执行任何操作。

@Slf4j
public class CustomProducerListener implements ProducerListener<String, String> {@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {String topic = producerRecord.topic();String key = producerRecord.key();String value = producerRecord.value();Long timestamp = producerRecord.timestamp();int partition = recordMetadata.partition();long offset = recordMetadata.offset();log.info("消息发送成功,topic:{},key:{},value:{},timestamp:{},partition:{},offset:{}", topic, key, value, timestamp, partition, offset);}@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {String topic = producerRecord.topic();String key = producerRecord.key();String value = producerRecord.value();Long timestamp = producerRecord.timestamp();int partition = recordMetadata.partition();long offset = recordMetadata.offset();log.error("消息发送失败,topic:{},key:{},value:{},timestamp:{},partition:{},offset:{}, exception message: {}", topic, key, value, timestamp, partition, offset, exception.getMessage());}
}

将自定义ProducerListener的对象,配置到KafkaTemplate当中

/*** 从同一个工厂创建具有不同生产者的配置的模块* @param producerFactory* @return*/
@Bean
public KafkaTemplate<String, String> stringKafkaTemplate(ProducerFactory<String, String> producerFactory){KafkaTemplate<String, String> stringStringKafkaTemplate = new KafkaTemplate<>(producerFactory, Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));// 配置ProducerListenerstringStringKafkaTemplate.setProducerListener(new CustomProducerListener());// 配置默认主题stringStringKafkaTemplate.setDefaultTopic("rj-string-topic");return stringStringKafkaTemplate;
}

发送消息:

@GetMapping("/listener")
public String sendMsgProducerListener(String msg) {CompletableFuture<SendResult<String, String>> completableFuture = stringKafkaTemplate.sendDefault(msg);completableFuture.whenComplete((result, ex) -> {if (ex == null) {System.out.println("发送成功");} else {System.out.println("发送失败");}});return "发送成功!";
}

控制台日志输出如下所示:

2024-09-24T19:20:07.061+08:00  INFO 16436 --- [rj-spring-kafka-demo] [demo-producer-1] c.r.k.listener.CustomProducerListener    : 消息发送成功,topic:rj-string-topic,key:null,value:listener,timestamp:null,partition:0,offset:0

1.5 发送结果监听CompletableFuture

发送消息的send 方法返回 CompletableFuture<SendResult>。您可以向侦听器注册回调,以异步接收发送的结果.

CompletableFuture<SendResult<Integer, String>> future = template.send("topic-name", "msg data");
future.whenComplete((result, ex) -> {...
});

如果你希望阻塞发送线程等待结果,你可以调用 future 的 get() 方法;建议使用带有 timeout 的方法。如果你已经设置了 linger.ms,你可能希望在等待之前调用 flush(),或者为了方便起见,模板有一个带有 autoFlush 参数的构造函数,该参数会导致模板在每次发送时 flush()

  • 在使用 Kafka 生产者发送消息时,通常会得到一个表示发送任务的Future对象。如果调用future.get()方法,发送线程会被阻塞,直到发送结果返回。这意味着发送线程会暂停执行,等待消息成功发送到 Kafka 集群并获取结果。然而,直接使用get()方法可能会导致线程长时间阻塞,在实际应用中可能不太理想,所以建议使用带有超时参数的get(long timeout, TimeUnit unit)方法,这样可以在一定时间后如果还未获取到结果就不再等待,避免无限期阻塞。
  • linger.ms和flush
    • linger.ms属性
      • 当设置了linger.ms生产者属性时,生产者会在发送消息时等待一段时间,让更多的消息积累在一个批次中,以提高发送效率。如果在这段时间内积累了足够多的消息,生产者会将这些消息作为一个批次发送出去。
    • flush()方法
      • 如果希望立即发送部分批处理的消息而不是等待linger.ms指定的时间,可以调用flush()方法。这个方法会强制生产者立即发送当前缓冲区内的消息,而不管是否满足批次大小或等待时间的条件。
    • 带有autoFlush参数的构造函数
      • 为了方便起见,KafkaTemplate有一个带有autoFlush参数的构造函数。当设置autoFlushtrue时,每次发送消息后,模板会自动调用flush()方法,确保消息立即发送出去。这在需要立即确认消息发送的场景中非常有用,但可能会降低发送效率,因为每次发送都不会等待批次积累。

在使用 Kafka 生产者时,需要根据实际需求合理选择是否阻塞发送线程等待结果,以及是否使用flush()方法或带有autoFlush参数的构造函数来控制消息的发送时机。如果设置了linger.ms属性,并且需要在特定情况下立即发送部分批处理的消息,可以考虑调用flush()方法或使用带有autoFlush参数的构造函数。

linger.ms属性可以在配置文件当中进行配置.

@Bean
public Map<String, Object> producerConfig(){// ...// 配置linger.msmap.put(ProducerConfig.LINGER_MS_CONFIG, "500");return map;
}

SendResult 有两个属性:ProducerRecordRecordMetadata

public class SendResult<K, V> {// ProducerRecord是生产者发送消息时使用的数据结构private final ProducerRecord<K, V> producerRecord;// 当生产者成功发送消息后,会返回一个RecordMetadata对象private final RecordMetadata recordMetadata;public SendResult(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {this.producerRecord = producerRecord;this.recordMetadata = recordMetadata;}public ProducerRecord<K, V> getProducerRecord() {return this.producerRecord;}public RecordMetadata getRecordMetadata() {return this.recordMetadata;}@Overridepublic String toString() {return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";}}

1.5.1 ProducerRecord

  • 主题(Topic)

    • 确定消息的归属主题。Kafka 中的不同主题用于区分不同类型的消息流。例如,一个电商系统可能有 “订单主题”、“用户行为主题” 等。

    • 生产者通过指定主题将消息发送到 Kafka 集群中的相应主题

  • 分区(Partition)

    • 分区的目的是为了实现可扩展性和并行处理。Kafka 将一个主题的数据分布在多个分区上,不同的分区可以由不同的消费者或消费者组同时消费。

    • 可以手动指定消息发送到特定分区,通常根据消息的键或者特定的业务规则来决定分区。如果不指定,Kafka 会根据默认的分区策略来分配分区。

  • 键(Key)

    • 键在消息处理中有多种用途。一方面,它可以用于确定消息的分区。例如,基于键的哈希值来决定消息发送到哪个分区,这样可以确保具有相同键的消息被发送到同一个分区,方便后续的有序处理。

    • 键也可以在消费者端用于消息的分组和聚合。例如,在处理订单数据时,可以根据订单 ID 作为键,将同一订单的不同状态更新消息发送到同一个分区,方便消费者对同一订单的消息进行有序处理。

  • 值(Value)

    • 这是消息的实际内容,可以是任何可序列化的对象。例如,在一个日志系统中,值可以是一条日志记录;在电商系统中,值可以是一个订单对象或者用户行为事件。
  • 消息头部(Headers)

    • 消息头部提供了一种在消息中添加额外元数据的方式。这些元数据可以用于传递特定的业务信息或者用于消息的路由和处理。

    • 例如,可以在头部添加消息的来源系统、消息的类型、处理优先级等信息

1.5.2 RecordMetadata

  • 主题(Topic)

    • 确认消息最终被发送到的主题,与ProducerRecord中的主题相对应。这可以用于验证消息是否被发送到了正确的主题。
  • 分区(Partition)

    • 指示消息存储在哪个分区。在消费者端,可以根据分区信息来确定从哪个分区读取消息。

    • 对于需要对特定分区进行监控或管理的场景,分区信息非常重要。

  • 偏移量(Offset)

    • 偏移量是消息在分区中的唯一标识,它代表了消息在分区中的位置顺序。每个分区中的消息都有一个连续的偏移量。

    • 消费者通过偏移量来确定已经消费到了哪个位置,以便在下次消费时从正确的位置继续读取消息。

    • 偏移量也可以用于数据恢复和重新处理消息的场景。例如,如果消费者出现故障,在恢复时可以根据存储的偏移量重新开始消费。

  • 时间戳(Timestamp)

    • 时间戳可以由生产者在发送消息时指定,也可以由 Kafka 自动生成。时间戳可以用于基于时间的消息处理和查询。

    • 例如,可以根据时间戳来查询特定时间段内的消息,或者对消息进行时间序列分析

二、接收消息细节

本小节,我们着重介绍一下@KafkaListener这个注解的使用.

2.1 字段解释

@KafkaListener是 Spring Kafka 提供的用于监听 Kafka 主题消息的注解。

字段名称类型说明
idString为监听器指定一个唯一的标识符。如果不指定,会自动生成一个 ID。这个 ID 可以用于在代码中通过KafkaListenerEndpointRegistry来获取特定的监听器容器。此外,如果设置了这个值,并且idIsGroup()false或者同时设置了groupId(),那么这个 ID 将覆盖消费者工厂配置中的groupId属性。支持 SpEL(Spring Expression Language)表达式#{...}和属性占位符${...}
containerFactoryString指定用于创建消息监听器容器的KafkaListenerContainerFactory的 bean 名称。如果不指定,则使用默认的容器工厂(如果存在)。SpEL 表达式可以评估为容器工厂实例或 bean 名称。
topicsString[]指定监听器要监听的一个或多个 Kafka 主题名称。可以是主题名称、属性占位符键或 SpEL 表达式,表达式必须解析为主题名称。使用组管理,Kafka 会将分区分配给组内成员。与topicPattern()topicPartitions()互斥。
topicPatternString使用正则表达式指定要监听的 Kafka 主题模式。可以是主题模式、属性占位符键或 SpEL 表达式,表达式必须解析为主题模式(支持字符串或java.util.regex.Pattern结果类型)。使用组管理,Kafka 会将分区分配给组内成员。与topics()topicPartitions()互斥。
topicPartitionsTopicPartition[]当使用手动主题/分区分配时,指定监听器要监听的主题和分区。与topicPattern()topics()互斥。
containerGroupString如果提供了这个值,监听器容器将被添加到一个以这个值为名称、类型为Collection<MessageListenerContainer>的 bean 中。这允许例如遍历这个集合来启动/停止一部分容器。从版本 2.7.3 开始,这种集合 beans 已被弃用,在 2.8 版本中将被移除。取而代之的是,应该使用名称为containerGroup + ".group"且类型为org.springframework.kafka.listener.ContainerGroup的 bean。支持 SpEL 表达式和属性占位符。
errorHandlerString设置一个KafkaListenerErrorHandler bean 的名称,在监听器方法抛出异常时调用。如果是 SpEL 表达式,可以评估为KafkaListenerErrorHandler实例或 bean 名称。
groupIdString覆盖消费者工厂的group.id属性,仅针对这个监听器。支持 SpEL 表达式和属性占位符。
idIsGroupboolean如果groupId()未提供,当这个值为true时,使用id()(如果提供了)作为消费者的group.id属性;当这个值为false时,使用消费者工厂中的group.id属性。
clientIdPrefixString如果提供了这个值,将覆盖消费者工厂配置中的客户端 ID 属性。对于每个容器实例,会添加一个后缀(‘-n’)以确保在使用并发时的唯一性。支持 SpEL 表达式和属性占位符。
beanRefString一个伪 bean 名称,在这个注解中的 SpEL 表达式中用于引用定义这个监听器的当前 bean。这允许访问封闭 bean 中的属性和方法。默认值为’__listener’。
concurrencyString覆盖容器工厂的concurrency设置,针对这个监听器。可以是属性占位符或 SpEL 表达式,评估为一个Number,然后使用Number#intValue()获取值。
autoStartupString设置为truefalse,以覆盖容器工厂中的默认自动启动设置。可以是属性占位符或 SpEL 表达式,评估为BooleanString,然后使用Boolean#parseBoolean(String)获取值。
propertiesString[]Kafka 消费者属性,它们将覆盖消费者工厂中具有相同名称的任何属性(如果消费者工厂支持属性覆盖)。支持的语法与 JavaProperties文件中的键值对语法相同。group.idclient.id将被忽略。支持 SpEL 表达式和属性占位符。SpEL 表达式必须解析为StringString[]Collection<String>,其中数组或集合的每个成员是一个属性名+值,格式与上述语法相同。
splitIterablesboolean当为false且返回类型是Iterable时,将结果作为单个回复记录的值返回,而不是为每个元素创建单独的记录。默认值为true。如果回复类型是Iterable<Message<?>>,则忽略此设置。
contentTypeConverterString设置一个SmartMessageConverter(如CompositeMessageConverter)的 bean 名称,结合org.springframework.messaging.MessageHeaders#CONTENT_TYPE头来执行转换到所需类型。如果是 SpEL 表达式,可以评估为SmartMessageConverter实例或 bean 名称。
batchString覆盖容器工厂的batchListener属性。监听器方法签名应该接收一个List<?>。如果不设置,将使用容器工厂的设置。不支持 SpEL 和属性占位符,因为监听器类型不能是可变的。
filterString设置一个RecordFilterStrategy bean 名称,以覆盖在容器工厂上配置的策略。如果是 SpEL 表达式,可以评估为RecordFilterStrategy实例或 bean 名称。
infoString静态信息将作为一个头添加,键为org.springframework.kafka.support.KafkaHeaders#LISTENER_INFO。例如,可以在RecordInterceptorRecordFilterStrategy或监听器本身中用于任何目的。支持 SpEL 表达式和属性占位符,但必须解析为Stringbyte[]。如果使用输入记录的头创建出站记录,这个头将被剥离。
containerPostProcessorString设置一个ContainerPostProcessor的 bean 名称,允许在创建和配置监听器容器后对其进行自定义。这个后处理器仅应用于当前监听器容器,与ContainerCustomizer不同,后者应用于所有监听器容器。这个后处理器在容器自定义器(如果存在)之后应用。

2.2 获取Header的值

在监听消息的时候,可以通过Header(headers)的方式读取发送消息设置的headers的值.

 @KafkaListener(groupId = "rj", topics = Constants.Kafka.TOPIC_NAME)public void listen2(String msg, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){log.info("receive msg: {}, topic: {}", msg, topic);}

日志输出:

2024-09-24T20:20:57.501+08:00  INFO 1692 --- [rj-spring-kafka-demo] [ntainer#0-0-C-1] com.rj.kafka.listener.UserListener       : receive msg: hello, message!, topic: rj-topic

2.3 获取ConsumerRecord

ConsumerRecord是消费者从 Kafka 主题中读取消息时所使用的数据结构, 具体包含如下细节:

主题和分区信息

  • topic():
    • 返回消息所属的 Kafka 主题名称。
    • 主题是消息的逻辑分类,用于区分不同类型的消息流。例如,一个电商系统可能有 “订单主题”、“用户行为主题” 等。
    • 通过这个方法可以确定消息来自哪个主题,方便消费者根据不同的主题进行不同的处理逻辑。
  • partition()
    • 返回消息所在的分区编号。
    • Kafka 将一个主题的数据分布在多个分区上,不同的分区可以由不同的消费者或消费者组同时消费,以实现可扩展性和并行处理。
    • 了解消息所在的分区可以用于一些特定的场景,例如在需要对特定分区进行监控或管理时,或者在一些需要根据分区进行数据处理的情况下。

偏移量信息

  • offset()
    • 返回消息在所属分区中的偏移量。
    • 偏移量是消息在分区中的唯一标识,代表消息在分区中的位置顺序。随着新消息的不断写入,偏移量会不断递增。
    • 消费者通过偏移量来确定已经消费到了哪个位置,以便在下次消费时从正确的位置继续读取消息。偏移量也可以用于数据恢复和重新处理消息的场景。例如,如果消费者出现故障,在恢复时可以根据存储的偏移量重新开始消费。

键和值信息

  • key():
    • 返回消息的键。
    • 消息的键可以用于分区策略以及在一些场景下可以帮助消费者进行消息的有序处理。例如,基于键的哈希值来决定消息发送到哪个分区,这样可以确保具有相同键的消息被发送到同一个分区,方便后续的有序处理。
    • 键的类型通常是可序列化的对象,可以根据具体的业务需求来设置和使用。
  • value():
    • 返回消息的实际内容。
    • 这是消费者真正关心的消息数据,可以是任何可序列化的对象。例如,在一个日志系统中,值可以是一条日志记录;在电商系统中,值可以是一个订单对象或者用户行为事件。

时间戳信息

  • timestamp()
    • 返回消息的时间戳。
    • 时间戳可以由生产者在发送消息时指定,也可以由 Kafka 自动生成。时间戳可以用于基于时间的消息处理和查询。
    • 例如,可以根据时间戳来查询特定时间段内的消息,或者对消息进行时间序列分析。

headers

  • headers()
    • 返回一个Headers对象,其中包含了消息的头部信息。
    • 消息头部提供了一种在消息中添加额外元数据的方式。这些元数据可以用于传递特定的业务信息或者用于消息的路由和处理。
    • 例如,可以在头部添加消息的来源系统、消息的类型、处理优先级等信息。消费者可以通过读取头部信息来进行更加灵活的消息处理。

同样的操作,也可以在获取方法的参数内传入ConsumerRecord,从中获取需要的信息.操作如下所示:

   @KafkaListener(groupId = "rj", topics = Constants.Kafka.TOPIC_NAME)public void listen2(ConsumerRecord<String, String> record, String msg, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){log.info("receive record: {}", record);log.info("receive msg: {}, topic: {}", msg, topic);}

日志输出:

2024-09-24T20:34:47.759+08:00  INFO 11296 --- [rj-spring-kafka-demo] [ntainer#0-0-C-1] com.rj.kafka.listener.UserListener       : receive record: ConsumerRecord(topic = rj-topic, partition = 0, leaderEpoch = 0, offset = 10, CreateTime = 1727181287120, serialized key size = 2, serialized value size = 15, headers = RecordHeaders(headers = [], isReadOnly = false), key = rj, value = hello, message!)
2024-09-24T20:34:47.759+08:00  INFO 11296 --- [rj-spring-kafka-demo] [ntainer#0-0-C-1] com.rj.kafka.listener.UserListener       : receive msg: hello, message!, topic: rj-topic

2.4 Acknowledgment

在 Kafka 中,Acknowledgment(确认机制)主要用于确保消息被正确处理和持久化,以保证数据的可靠性。
在 Kafka 生产者发送消息和消费者处理消息的过程中,确认机制起着关键作用。它确保消息在不同阶段的成功处理,从而提高系统的可靠性和数据的完整性。如果没有适当的确认机制,可能会出现消息丢失或重复处理的情况,这在许多关键业务场景中是不可接受的。

生产者端的确认机制

  • acks
    • acks是生产者的一个重要配置参数,用于控制生产者在发送消息后等待多少个副本的确认。
      • 设置acks=0, 表示 生产者在将消息发送到 Kafka 服务器后,不会等待任何确认。这种设置提供了最低的延迟,但也意味着如果在消息发送后但在 Kafka 服务器成功存储之前发生故障,消息可能会丢失。适用于对延迟要求极高且可以容忍消息丢失的场景,例如实时日志收集,其中丢失少量日志不会对系统产生重大影响。
      • 设置acks=1表示只要领导者副本成功接收到消息并写入磁盘,生产者就会认为消息发送成功。这种设置提供了较低的延迟,但如果领导者副本在确认后出现故障,可能会导致消息丢失。如果领导者副本在确认后但在其他副本同步之前出现故障,可能会导致消息丢失。适用于对延迟有一定要求,但也需要一定程度可靠性的场景,例如一些实时数据分析系统,其中少量数据丢失可以通过后续的数据处理进行补偿。
      • 设置acks=all(或acks=-1)表示生产者需要等待所有同步副本都成功接收到消息并写入磁盘后才认为消息发送成功。这提供了最高的可靠性,但会增加延迟,因为生产者需要等待多个副本的确认。确保了即使领导者副本出现故障,消息也不会丢失,因为其他同步副本中仍然存在该消息。适用于对数据可靠性要求极高的场景,例如金融交易系统或关键业务数据的处理。

消费者端的确认机制

  • 自动提交和手动提交:
    • Kafka 消费者可以选择自动提交偏移量或手动提交偏移量。
    • 自动提交:消费者会定期自动提交已经消费的消息的偏移量。这种方式比较方便,但可能会导致在处理消息过程中出现故障时,已经消费的消息被重复处理。例如,如果消费者在自动提交偏移量后但在完成消息处理之前出现故障,重新启动后会从上次提交的偏移量开始消费,导致已经处理过的消息被再次处理。
    • 手动提交:消费者可以在处理完一批消息后手动提交偏移量。这提供了更精细的控制,可以确保在消息被正确处理后才提交偏移量,避免重复处理。手动提交可以在代码中通过调用commitSync()(同步提交)或commitAsync()(异步提交)方法来实现。
  • 提交偏移量的时机:
    • 在手动提交时,需要谨慎选择提交偏移量的时机。一般来说,可以在确认消息已经被成功处理并持久化到外部系统(如果需要)后再提交偏移量。
    • 例如,在一个数据处理管道中,消费者从 Kafka 读取消息,进行数据转换和存储到数据库中。只有在数据库存储成功后,才提交偏移量,以确保消息不会被重复处理。

确认机制与性能的权衡

  • 延迟和吞吐量
    • 较高的确认级别(如acks=all)和手动提交偏移量通常会增加延迟,因为生产者和消费者需要等待更多的确认。这可能会影响系统的整体性能和吞吐量。
    • 在设计系统时,需要根据具体的业务需求和性能要求来权衡可靠性和性能。如果业务对数据的可靠性要求非常高,可以选择更严格的确认机制,但可能需要接受较低的吞吐量和较高的延迟。如果性能是关键因素,可以适当降低确认级别或使用自动提交偏移量,但需要注意可能出现的数据丢失和重复处理的风险。

在接口 Acknowledgment当中的acknowledge()十分重要, 这个方法用于消费者确认已经成功处理了一条消息。当消费者调用这个方法时,Kafka 会记录该消费者对特定消息的处理确认,并且可以根据配置决定是否更新消费者的偏移量。
使用场景: 在手动提交偏移量的情况下,消费者通常在确认消息已经被成功处理后调用这个方法。例如,在一个数据处理管道中,消费者从 Kafka 读取消息,进行数据转换和存储到数据库中。只有在数据库存储成功后,才调用acknowledge()方法来确认消息的处理。正确使用这个方法可以确保消息不会被重复处理,同时也可以保证在出现故障时能够正确地恢复处理进度。如果消费者在处理消息后没有正确地确认,可能会导致消息被重复处理,或者在消费者出现故障后无法正确地恢复处理进度。

  @KafkaListener(groupId = "rj", topics = Constants.Kafka.TOPIC_NAME)public void listen2(ConsumerRecord<String, String> record, String msg, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Acknowledgment acknowledgment){log.info("receive record: {}", record);log.info("receive msg: {}, topic: {}", msg, topic);// 处理各种各样的业务逻辑之后,再进行消息确认.acknowledgment.acknowledge();}

三、总结

本文对Kafka的消息发送、接收当中常用的功能做了一些较为详细的分析.这些在实际开发当中较为常用. 下一篇着重介绍一下kafka当中有关集群的知识.

相关文章:

一文上手Kafka【中】

一、发送消息细节 在发送消息的特别注意: 在版本 3.0 中&#xff0c;以前返回 ListenableFuture 的方法已更改为返回 CompletableFuture。为了便于迁移&#xff0c;2.9 版本添加了一个方法 usingCompletableFuture&#xff08;&#xff09;&#xff0c;该方法为 CompletableFu…...

Ubuntu如何如何安装tcpdump

在Ubuntu上安装tcpdump非常简单&#xff0c;可以通过以下步骤完成&#xff1a; 打开终端。 更新包列表&#xff1a; 首先&#xff0c;更新你的包管理器的包列表&#xff1a; sudo apt update 安装tcpdump&#xff1a; 使用以下命令安装tcpdump&#xff1a; sudo apt install …...

3-3 AUTOSAR RTE 对SR Port的作用

返回总目录->返回总目录<- 一、前言 RTE作为SWC和BSW之间的通信机构,支持Sender-Receiver方式实现ECU内及ECU间的通信。 对于Sender-Receiver Port支持三种模式: 显式访问:若运行实体采用显示模式的S/R通信方式,数据读写是即时的;隐式访问:当多个运行实体需要读取…...

hive/impala/mysql几种数据库的sql常用写法和函数说明

做大数据开发的时候&#xff0c;会在几种库中来回跳&#xff0c;同一个需求&#xff0c;不同库函数和写法会有出入&#xff0c;在此做汇总沉淀。 1. hive 1. 日期差 DATEDIFF(CURRENT_DATE(),wdjv.creation_date) < 30 30天内的数据 2.impala 3. spark 4. mysql 1.时间差…...

论文阅读:LM-Cocktail: Resilient Tuning of Language Models via Model Merging

论文链接 代码链接 Abstract 预训练的语言模型不断进行微调,以更好地支持下游应用。然而,此操作可能会导致目标领域之外的通用任务的性能显著下降。为了克服这个问题,我们提出了LM Cocktail,它使微调后的模型在总体上保持弹性。我们的方法以模型合并(Model Merging)的形…...

8640 希尔(shell)排序

### 思路 希尔排序是一种基于插入排序的排序算法&#xff0c;通过将待排序数组分割成多个子序列分别进行插入排序来提高效率。初始增量d为n/2&#xff0c;之后每次减半&#xff0c;直到d为1。 ### 伪代码 1. 读取输入的待排序关键字个数n。 2. 读取n个待排序关键字并存储在数组…...

Linux 安装redis主从模式+哨兵模式3台节点

下载 https://download.redis.io/releases/ 解压 tar -zxvf redis-7.2.4.tar.gz -C /opt chmod 777 -R /opt/redis-7.2.4/安装 # 编译 make # 安装&#xff0c; 一定是大写PREFIX make PREFIX/opt/redis-7.2.4/redis/ install配置为系统服务 cd /etc/systemd/system/主服务…...

[BCSP-X2024.小高3] 学习计划

题目描述 暑假共有 n 天&#xff0c;第 i 天的精力指数为 a[i]&#xff0c;你想要利用假期依次&#xff08;按 1,2,...,m 顺序&#xff09;复习 m 门功课&#xff0c;第 i 门功课的重要程度为 b[i]&#xff0c;且每门的复习时段必须连 续&#xff0c;并且不能有某天不干事。 …...

Android Debug Bridge(ADB)完全指南

文章目录 前言一、什么是ADB&#xff1f;二、ADB的工作原理ADB由三个部分组成&#xff1a; 三、如何安装ADBWindows系统&#xff1a;macOS和Linux系统&#xff1a; 四、ADB常用指令大全设备相关操作1. 查看连接的设备&#xff1a;2. 重启设备&#xff1a;3. 进入Bootloader模式…...

再次重逢,愿遍地繁花

再次重逢&#xff0c;愿遍地繁花 我并不是一个对最终幻想7很热衷的粉丝&#xff0c;也并没有像那些评论区的大佬&#xff0c;能够轻易地说出整部世界的全貌。说到底&#xff0c;我只是一个看完了《最终幻想7&#xff1a;重制版》和《最终幻想7&#xff1a;重生》的爱好者罢了。…...

数据结构和算法基础(一)

文章目录 链表反转链表合并删除链表倒数第 n 个结点找链表的中间结点链表中环的检测排序算法递归 趁空闲时间刷一遍极客时间上王争的《数据结构与算法之美》课程&#xff0c;个人觉得写的很好&#xff0c;每章节由浅入深且从基础到引入设计类问题&#xff0c;如果写过很多代码想…...

【超长好文】网络安全从业者面试指南

文章为笔者偶然看到的github项目《网络安全面试指南》&#xff0c;作者FeeiCN&#xff0c;读完内容深感作者的用心&#xff0c;尽管一些观点因为时间原因与当下行情存在差异&#xff0c;但仍旧值得大家参考&#xff0c;希望能给大家在这行业寒冬带来一些启发&#xff0c;愿正在…...

基于大数据的高校新生数据可视化分析系统

作者&#xff1a;计算机学姐 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等&#xff0c;“文末源码”。 专栏推荐&#xff1a;前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏&#xff1a;…...

【cache】浅析四种常用的缓存淘汰算法 FIFO/LRU/LFU/W-TinyLFU

本文浅析淘汰策略与工作中结合使用、选取&#xff0c;并非针对算法本身如何实现的 文章目录 FIFOLFULRUW-TinyLFU实践与优化监控与调整 FIFO first input first output &#xff0c; 先进先出&#xff0c;即最早存入的元素最先取出&#xff0c; 典型数据结构代表&#xff1a;…...

STM32的DMA技术介绍

DMA&#xff08;Direct Memory Access&#xff0c;直接内存访问&#xff09; 是一种允许外设直接与系统内存进行数据传输&#xff0c;而无需经过CPU的技术。在STM32微控制器中&#xff0c;DMA技术极大地提高了数据传输效率&#xff0c;降低了CPU的负担&#xff0c;从而提升系统…...

C++11 多线程编程-小白零基础到手撕线程池

提示&#xff1a;文章 文章目录 前言一、背景二、 2.1 2.2 总结 前言 前期疑问&#xff1a; 本文目标&#xff1a; 一、背景 来源于b站视频 C11 多线程编程-小白零基础到手撕线程池 学习来源&#xff1a;https://www.bilibili.com/video/BV1d841117SH/?p2&spm_id_f…...

智源研究院与百度达成战略合作 共建AI产研协同生态

2024年9月24日&#xff0c;北京智源人工智能研究院&#xff08;简称“智源研究院”&#xff09;与北京百度网讯科技有限公司&#xff08;简称“百度”&#xff09;正式签署战略合作协议&#xff0c;双方将充分发挥互补优势&#xff0c;在大模型等领域展开深度合作&#xff0c;共…...

Flask-SQLAlchemy:在Flask应用中优雅地操作数据库

在Python的Web开发领域&#xff0c;Flask是一个备受欢迎的轻量级Web框架&#xff0c;它以简洁、灵活而著称。而当我们需要在Flask应用中与数据库进行交互时&#xff0c;Flask-SQLAlchemy就成为了一个强大而便捷的工具。它将Flask的简洁性与SQLAlchemy的强大数据库抽象能力完美结…...

智能巡检机器人 数据库

智能巡检机器人AI智能识别。无需人工。只需后台监控结果即可&#xff01;...

Spring AOP异步操作实现

在Spring框架中&#xff0c;AOP&#xff08;面向切面编程&#xff09;提供了一种非常灵活的方式来增强应用程序的功能。异步操作是现代应用程序中常见的需求&#xff0c;尤其是在处理耗时任务时&#xff0c;它可以帮助我们提高应用程序的响应性和吞吐量。Spring提供了一种简单的…...

【2006.07】UMLS工具——MetaMap原理深度解析

文献&#xff1a;《MetaMap: Mapping Text to the UMLS Metathesaurus》2006 年 7 月 14 日 https://lhncbc.nlm.nih.gov/ii/information/Papers/metamap06.pdf MetaMap&#xff1a;将文本映射到 UMLS 元数据库 总结 解决的问题 自动概念映射问题&#xff1a;解决如何将文本…...

ros2 colcon build 构建后,install中的local_setup.bash 和setup.bash有什么区别

功能概述 在 ROS2 中&#xff0c;colcon build是用于构建软件包的工具。构建完成后会生成install文件夹&#xff0c;其中的setup.bash和local_setup.bash文件都与环境设置相关&#xff0c;但存在一些区别。setup.bash 作用范围 setup.bash文件用于设置整个工作空间的环境变量。…...

Thymeleaf基础语法

Thymeleaf 是一种用于 Web 和非 Web 环境的现代服务器端 Java 模板引擎。它能够处理 HTML、XML、JavaScript、CSS 甚至纯文本。以下是 Thymeleaf 的一些基础语法&#xff1a; 1. 变量表达式 <!-- 显示变量的值 --> <p th:text"${name}">Default Name&l…...

spring cloud alibaba学习路线

以下是一条学习Spring Cloud Alibaba的路线&#xff1a; 一、基础前置知识 1. Java基础 熟练掌握Java语言特性&#xff0c;包括面向对象编程、集合框架、多线程等知识。 2. Spring和Spring Boot基础深入理解Spring框架&#xff0c;如依赖注入&#xff08;DI&#xff09;、控…...

基于 Seq2Seq 的中英文翻译项目(pytorch)

项目简介 本项目旨在使用 PyTorch 构建一个基于 Seq2Seq(编码器-解码器架构)的中英文翻译模型。我们将使用双语句子对的数据进行训练,最终实现一个能够将英文句子翻译为中文的模型。项目的主要步骤包括: 数据预处理:从数据集中提取英文和中文句子,并进行初步清洗和保存。…...

部标主动安全(ADAS+DMS)对接说明

1.前言 上一篇介绍了部标&#xff08;JT/T1078&#xff09;流媒体对接说明&#xff0c;这里说一下如何对接主动安全附件服务器。 流媒体的对接主要牵扯到4个方面&#xff1a; &#xff08;1&#xff09;平台端&#xff1a;业务端系统&#xff0c;包含前端呈现界面。 &#x…...

C++ STL(1)迭代器

文章目录 一、迭代器详解1、迭代器的定义与功能2、迭代器类型3、示例4、迭代器失效4.1、vector 迭代器失效分析4.2、list 迭代器失效分析4.3、set 与 map 迭代器失效分析 5、总结 前言&#xff1a; 在C标准模板库&#xff08;STL&#xff09;中&#xff0c;迭代器是一个核心概念…...

uview表单校验不生效问题

最近几次使用发现有时候会不生效&#xff0c;具体还没排查出来什么原因&#xff0c;先记录一下解决使用方法 <u--formlabelPosition"top"labelWidth"auto":model"form":rules"rules"ref"uForm" ><view class"…...

前端开发设计模式——单例模式

目录 一、单例模式的定义和特点&#xff1a; 1.定义&#xff1a; 2.特点&#xff1a; 二、单例模式的实现方式&#xff1a; 1.立即执行函数结合闭包实现&#xff1a; 2.ES6类实现&#xff1a; 三、单例模式的应用场景 1.全局状态管理&#xff1a; 2.日志记录器&#xff1a; …...

行情叠加量化,占据市场先机!

A股久违的3000点&#xff0c;最近都没有更新&#xff0c;现在终于对我们的市场又来点信息。相信在座的朋友这几天都是喜笑颜开&#xff0c;对A股又充满信心。当前行情好起来了&#xff0c;很多朋友又开始重回市场&#xff0c;研究股票学习量化&#xff0c;今天我们给大家重温下…...

wordpress仿链家/郑州seo技术顾问

我浏览了一个关于使用Tkinter的教程&#xff0c;看到了以下代码&#xff1a;>>> from Tkinter import *>>> winTk()这将生成一个标题为Tk的框&#xff0c;而没有其他内容。但是&#xff0c;当我尝试此代码时&#xff0c;不会出现这样的框。我没有发现任何错误…...

微信公众号网站导航怎么做/天津网站排名提升多少钱

我们团队开发了一些小巧有用的工具来满足客户的某些特殊需求。现在我们把这些工具广而告之&#xff0c;希望对大家能有用。说明一下&#xff0c;这些工具不是产品的功能&#xff0c;风险自担。基于Horizon虚拟桌面的盲水印更改全屏虚拟桌面的分辨率清除空闲的horizon桌面会话Wr…...

手机营销型网站建设/手游代理加盟哪个平台最强大

我正在为数据库实现一个漂亮的基本搜索引擎&#xff0c;用户可能会在其中包含不同类型的信息。搜索本身由几个联合组成&#xff0c;选择结果总是合并到3列中。然而&#xff0c;返回的数据正在从不同的表中获取。每个查询使用$ term进行匹配&#xff0c;我将其绑定到“&#xff…...

公司网站建设需要提供什么材料/百度输入法

需求是催生项目和推进项目的不竭动力。 背景&#xff1a; 最近&#xff0c;因为媳妇要做个B超检查&#xff0c;想着去大医院查查应该更放心&#xff0c;所以就把目标瞄准在A医院。早已耳闻A院一号难求万人空巷&#xff0c;所以把所有能接触到的机会都看了一遍&#xff0c;线下听…...

江苏常州武进区建设局网站/网站seo标题是什么意思

只是把随时随地所思所想赶快记录下来&#xff0c;没有别的用意和价值一、大数据有哪些我们过去常用的数据存储是关系型数据库&#xff0c;因而也诞生了三大关系型数据库巨头&#xff1a;MSSQL、Oracle、MySQL。至于DB2、informix、Sybase另外说。大数据是从NoSQL兴起的。NoSQL最…...

做网站所需要的代码6/流量平台有哪些

前面在“数字调制系列&#xff1a;IQ 基本理论”一文中介绍了 IQ 的概念、常用数字调制方式及映射星座图等内容&#xff0c;当完成数字比特流到 IQ 坐标系的映射后&#xff0c;便可以得到数字 I 和 Q 信号&#xff0c;然后分别经过 DAC 变换为模拟 I 和 Q 信号&#xff0c;最后…...