宝丰网站制作公司/seo代码优化
前言
1. 高吞吐量(High Throughput)
Kafka 设计的一个核心特性是高吞吐量。它能够每秒处理百万级别的消息,适合需要高频次、低延迟消息传递的场景。即使在大规模分布式环境下,它也能保持很高的吞吐量和性能,支持低延迟的数据传输。
2. 可扩展性(Scalability)
Kafka 具有强大的可扩展性。它支持水平扩展,可以轻松增加更多的节点来处理更多的数据流量。Kafka 通过分区机制(Partitioning)和分布式架构,确保即使数据量剧增,也能够平滑地扩展。
-
分区:消息被划分成多个分区(partition),每个分区可以独立存储和读取数据,从而支持并行处理。
-
副本:Kafka 会将每个分区的数据复制到多个节点上,以确保高可用性和容错性。
3. 高可用性和容错性(Fault Tolerance)
Kafka 提供了内建的高可用性和容错机制。它通过将消息复制到多个代理(broker)上来确保数据的可靠性,即使部分节点出现故障,数据仍然可以从其他副本中恢复。这种机制使得 Kafka 能够承受硬件故障而不会丢失数据。
4. 持久化和日志存储(Durability and Log Storage)
Kafka 的消息是持久化存储的,意味着消息会被写入磁盘并按日志顺序存储。这使得 Kafka 不仅可以作为一个消息队列,还能用作长期的日志存储系统,能够回溯历史数据。消息默认保留在 Kafka 中一段时间,消费者可以根据需要按需读取。
5. 低延迟(Low Latency)
Kafka 的设计能够提供低延迟的数据传输,特别适合于实时流处理的应用场景。Kafka 能够以毫秒级的延迟来传输大量数据,这对许多实时数据处理应用至关重要,比如实时分析、监控系统等。
6. 强大的消费模型(Consumer Model)
Kafka 提供了灵活的消费模型,允许消费者以不同的方式读取消息:
-
发布/订阅模式:多个消费者可以同时订阅相同的主题(Topic),消息将被广播给所有消费者。
-
点对点模式:消费者组(Consumer Group)机制允许多个消费者协调工作,处理不同的消息分区。
-
消息偏移量:Kafka 维护每个消费者的消息偏移量(offset),消费者可以从任意位置开始读取消息,支持灵活的消息消费和重播。
7. 分布式和横向扩展(Distributed and Horizontal Scaling)
Kafka 本身是一个分布式系统,设计上支持横向扩展。随着系统需求的增加,可以简单地增加更多的 Kafka 节点,分区和副本会在节点之间自动重新平衡,确保负载均匀分布和高可用性。
8. 支持流处理(Stream Processing)
Kafka 不仅是一个消息队列,也可以作为流处理平台。通过 Kafka Streams API,可以对流数据进行实时处理(例如聚合、过滤、连接等)。此外,Kafka 可以与其他流处理框架(如 Apache Flink、Apache Spark)集成,构建复杂的数据处理管道。
9. 灵活的集成能力(Integration Capabilities)
Kafka 被广泛集成到各种技术栈中,包括传统数据库、数据仓库、大数据系统、微服务架构等。Kafka 的强大支持让它成为企业架构中的核心组件。
-
Kafka Connect:Kafka Connect 是 Kafka 官方提供的一个框架,用于将 Kafka 与外部系统(如数据库、文件系统、Hadoop、Elasticsearch 等)进行连接。它简化了系统之间的数据同步和集成。
10. 广泛的社区支持和生态系统
Kafka 拥有活跃的开源社区和庞大的生态系统。它得到了许多企业的广泛使用,拥有大量的插件和扩展工具,可以满足各种需求。Kafka 的生态系统包括但不限于:
-
Kafka Streams:用于实时数据流处理。
-
Kafka Connect:用于与外部系统集成。
-
KSQL:用于在 Kafka 上执行 SQL 查询的流处理工具。
11. 使用场景:
Kafka 被广泛应用于以下场景:
-
日志收集与传输:Kafka 可以作为一个日志聚合平台,将来自不同服务或系统的日志收集起来并传输到中央日志分析平台。
-
实时数据处理:Kafka 用于支持实时数据处理和流计算,例如监控系统、推荐系统、用户行为分析等。
-
事件驱动架构:Kafka 非常适合于事件驱动的微服务架构,帮助微服务之间实现解耦和异步通信。
-
数据管道:Kafka 作为一个高效的数据管道,广泛用于将不同系统中的数据实时传输到数据仓库、数据湖或分析平台。
总结:
Kafka 是一个高性能、可扩展、容错且持久化的消息队列系统,非常适合处理大规模的实时数据流。它在大数据流处理、日志聚合、微服务架构、事件驱动架构等多个领域都有广泛应用。如果你的系统需要处理高吞吐量、低延迟、可扩展的消息传递和实时数据流处理,那么 Kafka 是一个非常合适的选择。
使用教程
1.导入依赖
<!-- Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
2.导入配置
# Spring
spring:kafka:producer:# Kafka服务器bootstrap-servers: 你自己的kafka服务器地址# 开启事务,必须在开启了事务的方法中发送,否则报错transaction-id-prefix: kafkaTx-# 发生错误后,消息重发的次数,开启事务必须设置大于0。retries: 3# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。# 开启事务时,必须设置为allacks: all# 当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 生产者内存缓冲区的大小。buffer-memory: 1024000# 键的序列化方式key-serializer: org.springframework.kafka.support.serializer.JsonSerializer# 值的序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)value-serializer: org.springframework.kafka.support.serializer.JsonSerializerconsumer:# Kafka服务器bootstrap-servers: 你自己的kafka服务器地址group-id: firstGroup# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D#auto-commit-interval: 2s# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)# none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: latest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: false# 键的反序列化方式#key-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 值的反序列化方式(建议使用Json,这种序列化方式可以无需额外配置传输实体类)value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer# 配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要properties:spring:json:trusted:packages: "*"# 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。# 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,# 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,# 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。# 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数# 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况max-poll-records: 3properties:# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancemax:poll:interval:ms: 600000# 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10ssession:timeout:ms: 10000listener:# 在侦听器容器中运行的线程数,一般设置为 机器数*分区数concurrency: 4# 自动提交关闭,需要设置手动消息确认ack-mode: manual_immediate# 消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误missing-topics-fatal: false# 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepoll-timeout: 600000
3.config文件(一共4个,已经封装好直接使用即可)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;/*** kafka配置,也可以写在yml,这个文件会覆盖yml*/
@SpringBootConfiguration
public class KafkaConsumerConfig {@Value("${spring.kafka.consumer.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.consumer.enable-auto-commit}")private boolean enableAutoCommit;@Value("${spring.kafka.properties.session.timeout.ms}")private String sessionTimeout;@Value("${spring.kafka.properties.max.poll.interval.ms}")private String maxPollIntervalTime;@Value("${spring.kafka.consumer.max-poll-records}")private String maxPollRecords;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;@Value("${spring.kafka.listener.concurrency}")private Integer concurrency;@Value("${spring.kafka.listener.missing-topics-fatal}")private boolean missingTopicsFatal;@Value("${spring.kafka.listener.poll-timeout}")private long pollTimeout;@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>(16);propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);//是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);//自动提交的时间间隔,自动提交开启时生效propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000");//该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理://earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费分区的记录//latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据(在消费者启动之后生成的记录)//none:当各分区都存在已提交的offset时,从提交的offset开始消费;只要有一个分区不存在已提交的offset,则抛出异常propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);//两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalancepropsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalTime);//这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。//这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,//如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,//然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。//要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数//注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);//当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10spropsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);//序列化(建议使用Json,这种序列化方式可以无需额外配置传输实体类)propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return propsMap;}@Beanpublic ConsumerFactory<Object, Object> consumerFactory() {//配置消费者的 Json 反序列化的可信赖包,反序列化实体类需要try (JsonDeserializer<Object> deserializer = new JsonDeserializer<>()) {deserializer.trustedPackages("*");return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new JsonDeserializer<>(), deserializer);}}/*** KafkaListenerContainerFactory 用来做消费者的配置* @return*/@Beanpublic KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());//在侦听器容器中运行的线程数,一般设置为 机器数*分区数factory.setConcurrency(concurrency);//消费监听接口监听的主题不存在时,默认会报错,所以设置为false忽略错误factory.setMissingTopicsFatal(missingTopicsFatal);// 自动提交关闭,需要设置手动消息确认factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);factory.getContainerProperties().setPollTimeout(pollTimeout);// 设置为批量监听,需要用List接收// factory.setBatchListener(true);return factory;}
}
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.transaction.KafkaTransactionManager;import java.util.HashMap;
import java.util.Map;@SpringBootConfiguration
public class KafkaProviderConfig {@Value("${spring.kafka.producer.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.transaction-id-prefix}")private String transactionIdPrefix;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.buffer-memory}")private String bufferMemory;@Beanpublic Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>(16);props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);//acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。//acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。//acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。//开启事务必须设为allprops.put(ProducerConfig.ACKS_CONFIG, acks);//发生错误后,消息重发的次数,开启事务必须大于0props.put(ProducerConfig.RETRIES_CONFIG, retries);//当多个消息发送到相同分区时,生产者会将消息打包到一起,以减少请求交互. 而不是一条条发送//批次的大小可以通过batch.size 参数设置.默认是16KB//较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。//比如说,kafka里的消息5秒钟Batch才凑满了16KB,才能发送出去。那这些消息的延迟就是5秒钟//实测batchSize这个参数没有用props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);//有的时刻消息比较少,过了很久,比如5min也没有凑够16KB,这样延时就很大,所以需要一个参数. 再设置一个时间,到了这个时间,//即使数据没达到16KB,也将这个批次发送出去props.put(ProducerConfig.LINGER_MS_CONFIG, "5000");//生产者内存缓冲区的大小props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);//反序列化,和生产者的序列化方式对应props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return props;}@Beanpublic ProducerFactory<Object, Object> producerFactory() {DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(producerConfigs());//开启事务,会导致 LINGER_MS_CONFIG 配置失效factory.setTransactionIdPrefix(transactionIdPrefix);return factory;}@Beanpublic KafkaTransactionManager<Object, Object> kafkaTransactionManager(ProducerFactory<Object, Object> producerFactory) {return new KafkaTransactionManager<>(producerFactory);}@Beanpublic KafkaTemplate<Object, Object> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Component;/*** kafka消息发送回调*/
@Component
public class KafkaSendResultHandler implements ProducerListener<Object, Object> {@Overridepublic void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {System.out.println("消息发送成功:" + producerRecord.toString());}@Overridepublic void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {System.out.println("消息发送失败:" + producerRecord.toString() + exception.getMessage());}
}
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;/*** 异常处理*/
@Component
public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler {@Override@NonNullpublic Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception) {return new Object();}@Override@NonNullpublic Object handleError(@NonNull Message<?> message, @NonNull ListenerExecutionFailedException exception,Consumer<?, ?> consumer) {System.out.println("消息详情:" + message);System.out.println("异常信息::" + exception);System.out.println("消费者详情::" + consumer.groupMetadata());System.out.println("监听主题::" + consumer.listTopics());// TODO 消费记录return KafkaListenerErrorHandler.super.handleError(message, exception, consumer);}
}
4.代码实现
生产者:
@Autowiredprivate KafkaTemplate<Object,Object> kafkaTemplate;public void test() {// 生成一个随机的 UUID 字符串String message = UUID.randomUUID().toString();// 使用 KafkaTemplate 将消息发送到 Kafka 中的 "test" 主题// KafkaTemplate.send() 方法的第一个参数是目标主题名,第二个参数是要发送的消息内容kafkaTemplate.send("test", message);// 发送成功后,Kafka 会异步处理消息,返回一个 Future 对象, // 如果需要进一步处理发送成功与否的回调,可以通过该对象的回调接口进行处理}
消费者:
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;@Component
@Log4j2
public class Consumer {@Autowiredprivate StringRedisTemplate redisTemplate;private static final String REDIS_SET_KEY = "test";// Kafka 监听器配置@KafkaListener(topics = "test",containerFactory = "kafkaListenerContainerFactory", errorHandler = "myKafkaListenerErrorHandler")public void consumer(ConsumerRecord<Object, Object> consumerRecord, Acknowledgment acknowledgment) {String key = (String) consumerRecord.key();Object value = consumerRecord.value();// 记录消费的基本信息,便于追踪log.info("开始消费消息,Topic: {}, Partition: {}, Offset: {}, 消费内容: {}", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset(), value);try {// Redis 去重,确保消息没有重复消费Long result = redisTemplate.opsForSet().add(REDIS_SET_KEY, key);// 如果 Redis 返回 1,表示该 key 尚未消费,才进行后续处理if (result != null && result == 1) {// 处理业务逻辑// 手动提交偏移量acknowledgment.acknowledge();log.info("消费成功,消费的信息: {}", value);} else {log.info("消息已消费过,跳过处理。key: {}", key);acknowledgment.acknowledge();}} catch (Exception e) {log.error("消费失败,错误信息: {}", e.getMessage(), e);// 如果发生异常,进行重试处理acknowledgment.nack(1000); // 可配置重试时间}}// 可选:定时清理 Redis 集合中的已消费记录@Scheduled(fixedDelay = 3600000) // 每小时清理一次public void cleanUpRedis() {// 可以根据消息处理的需要调整清理策略redisTemplate.delete(REDIS_SET_KEY);log.info("已清理 Redis 中的消费记录");}
}
相关文章:

SprinBoot整合KafKa的使用(详解)
前言 1. 高吞吐量(High Throughput) Kafka 设计的一个核心特性是高吞吐量。它能够每秒处理百万级别的消息,适合需要高频次、低延迟消息传递的场景。即使在大规模分布式环境下,它也能保持很高的吞吐量和性能,支持低延…...

【机器学习】CatBoost 模型实践:回归与分类的全流程解析
一. 引言 本篇博客首发于掘金 https://juejin.cn/post/7441027173430018067。 PS:转载自己的文章也算原创吧。 在机器学习领域,CatBoost 是一款强大的梯度提升框架,特别适合处理带有类别特征的数据。本篇博客以脱敏后的保险数据集为例&#x…...

PyTorch 实现动态输入
使用 PyTorch 实现动态输入:支持训练和推理输入维度不一致的 CNN 和 LSTM/GRU 模型 在深度学习中,处理不同大小的输入数据是一个常见的挑战。许多实际应用需要模型能够灵活地处理可变长度的输入。本文将介绍如何使用 PyTorch 实现支持动态输入的 CNN 和…...

【Linux相关】查看conda路径和conda和cudnn版本、安装cudnn、cuDNN无需登录官方下载链接
【Linux相关】 查看conda路径和conda和cudnn版本 安装cudnn cuDNN无需登录官方下载链接 文章目录 1. 查看信息1.1 查看 Conda 路径1.2 查看 Conda 版本1.3 查看 cuDNN 版本1.4 总结 2. 安装cudnn2.1 安装cudnn步骤2.2 cuDNN无需登录官方下载链接 1. 查看信息 查看Conda 路径、C…...

基于Java Springboot环境保护生活App且微信小程序
一、作品包含 源码数据库设计文档万字PPT全套环境和工具资源部署教程 二、项目技术 前端技术:Html、Css、Js、Vue、Element-ui 数据库:MySQL 后端技术:Java、Spring Boot、MyBatis 三、运行环境 开发工具:IDEA/eclipse 微信…...

简单的springboot使用sse功能
什么是sse? 1、SSE 是Server-Sent Events(服务器发送事件) 2、SSE是一种允许服务器主动向客户端推送实时更新的技术。 3、它基于HTTP协议,并使用了其长连接特性,在客户端与服务器之间建立一条持久化的连接。 通过这条连接&am…...

【服务器问题】xshell 登录远程服务器卡住( 而 vscode 直接登录不上)
打开 xshell ssh 登录远程服务器:卡在下面这里,迟迟不继续 当 SSH 连接卡在 Connection established. 之后,但没有显示远程终端提示符时,这通常意味着连接已经成功建立,说明不是网络连接和服务器连接问题,…...

AI×5G 市场前瞻及应用现状
本文为《5GAI时代:生活方式和市场的裂变》一书读后总结及研究。 本书的上架建议是“经营”,内容也更偏向于市场分析。书出版于2021年,现在是2024年,可以收集整理一些例子,看看书里的前瞻性5GAI应用预测,到…...

利用 Redis 与 Lua 脚本解决秒杀系统中的高并发与库存超卖问题
1. 前言 1.1 秒杀系统中的库存超卖问题 在电商平台上,秒杀活动是吸引用户参与并提升销量的一种常见方式。秒杀通常会以极低的价格限量出售某些商品,目的是制造紧迫感,吸引大量用户参与。然而,这种活动的特殊性也带来了许多技术挑…...

【MySQL】创建数据库、用户和密码
创建数据库、用户和密码参考sql语句 drop database if exists demoshop; drop user if exists demoshop%; -- 支持emoji:需要mysql数据库参数: character_set_serverutf8mb4 create database demoshop default character set utf8mb4 collate utf8mb4_un…...

leetcode hot100【Leetcode 72.编辑距离】java实现
Leetcode 72.编辑距离 题目描述 给定两个单词 word1 和 word2,返回将 word1 转换为 word2 所使用的最少操作数。 你可以对一个单词执行以下三种操作之一: 插入一个字符删除一个字符替换一个字符 示例 1: 输入: word1 "horse", word2 &…...

腾讯阅文集团Java后端开发面试题及参考答案
Java 的基本数据类型有哪些?Byte 的数值范围是多少? Java 的基本数据类型共有 8 种,可分为 4 类: 整数类型:包括 byte、short、int 和 long。byte 占 1 个字节,其数值范围是 - 128 到 127,用于表示较小范围的整数,节省内存空间,在处理一些底层的字节流数据或对内存要求…...

protobuf实现Hbase数据压缩
目录 前置HBase数据压缩效果获取数据(反序列化) 前置 安装说明 使用说明 HBaseDDL和DML操作 HBase数据压缩 问题 在上文的datain中原文 每次写入数据会写入4个单元格的内容,现在希望能对其进行筛减,合并成1格,减少存储空间(序列…...

论文阅读之方法: Single-cell transcriptomics of 20 mouse organs creates a Tabula Muris
The Tabula Muris Consortium., Overall coordination., Logistical coordination. et al. Single-cell transcriptomics of 20 mouse organs creates a Tabula Muris. Nature 562, 367–372 (2018). 论文地址:https://doi.org/10.1038/s41586-018-0590-4 代码地址…...

PHP语法学习(第三天)
老规矩,先回顾一下昨天学习的内容 PHP语法学习(第二天) 主要学习了PHP变量、变量的作用域、以及参数作用域。 今天由Tom来打开新的篇章 文章目录 echo 和 print 区别PHP echo 语句实例 PHP print 语句实例 PHP 数组创建数组利用array() 函数 数组的类型索引数组关联…...

PostgreSQL添加PostGIS扩展和存储坐标
一、安装 1、PostGIS安装:Getting Started | PostGIS 2、安装好后,执行下面sql CREATE EXTENSION postgis;SELECT PostGIS_Full_Version(); 二、使用 PostGIS文档:PostGIS 简介 — Introduction to PostGIS 建表: CREATE TAB…...

Flink四大基石之State(状态) 的使用详解
目录 一、有状态计算与无状态计算 (一)概念差异 (二)应用场景 二、有状态计算中的状态分类 (一)托管状态(Managed State)与原生状态(Raw State) 两者的…...

Linux中dos2unix详解
dos2unix 是一个用于将文本文件从DOS/Windows格式转换为Unix/Linux格式的工具。在不同的操作系统中,文本文件中的换行符表示方式是不一样的。具体来说: 在DOS和Windows系统中,换行由两个字符组成:回车(Carriage Retur…...

MySQL MVCC 介绍
MVCC(Multi-Version Concurrency Control)是一种并发控制机制,用于在多个并发事务同时读写数据库时保持数据的一致性和隔离性。MVCC通过在每个数据行上维护多个版本的数据来实现。当一个事务要对数据库中的数据进行修改时,MVCC不会…...

Linux篇之日志管理工具Logrotate介绍并结合crontab使用
1. Logrotate介绍 logrotate 是一个用于管理和轮换日志文件的工具,通常用于 Unix 和 Linux 系统。它可以自动化日志文件的轮换、压缩、删除和邮寄等操作,确保日志文件不会无限制地增长,占用过多的磁盘空间。 2. 主要功能 轮换:定期将日志文件移动到备份目录,并生成新的…...

Vulnhub靶场 Matrix-Breakout: 2 Morpheus 练习
目录 0x00 准备0x01 主机信息收集0x02 站点信息收集0x03 漏洞查找与利用1. 文件上传2. 提权 0x04 总结 0x00 准备 下载连接:https://download.vulnhub.com/matrix-breakout/matrix-breakout-2-morpheus.ova 介绍: This is the second in the Matrix-Br…...

秒杀项目 超卖问题 详解
秒杀项目中的超卖问题详解 秒杀场景是一种高并发场景,用户在短时间内大量涌入抢购有限的商品。超卖问题指的是由于系统设计不合理,导致实际售出的商品数量超过库存数量。 1. 为什么会出现超卖问题? 超卖问题通常由以下原因引发:…...

Linux系统编程之进程控制
概述 在Linux系统中,创建一个新的进程后,如何对该进程进行有效的控制,是一项非常重要的操作。控制进程状态的操作主要包括:进程的执行、进程的等待、进程的终止等。下面,我们将逐个进行介绍。 进程的执行 创建进程后&a…...

集合的相关性质与定义
集合 集合 集合描述了一组对象的集合,而映射描述了集合之间的对应关系。 集合 集合是由一组无序的,互不相同的对象组成的整体,集合中的对象称为元素或成员。集合可以用大括号{}表示,元素之间用逗号进行分隔。 定义: 集合 A …...

pytest自定义命令行参数
实际使用场景:pytest运行用例的时候,启动mitmdump进程试试抓包,pytest命令行启动的时候,传入mitmdump需要的参数(1)抓包生成的文件地址 (2)mitm的proxy设置 # 在pytest的固定文件中…...

c++预编译头文件
文章目录 c预编译头文件1.使用g编译预编译头文件2.使用visual studio进行预编译头文件2.1visual studio如何设置输出预处理文件(.i文件)2.2visual studio 如何设置预编译(初始创建空项目的情况下)2.3 visual studio打开输出编译时…...

YOLOv8模型pytorch格式转为onnx格式
一、YOLOv8的Pytorch网络结构 model DetectionModel((model): Sequential((0): Conv((conv): Conv2d(3, 64, kernel_size(3, 3), stride(2, 2), padding(1, 1))(act): SiLU(inplaceTrue))(1): Conv((conv): Conv2d(64, 128, kernel_size(3, 3), stride(2, 2), padding(1, 1))(a…...

电子课程开发中的典型误区
创建一个有效的电子课程需要仔细的规划和执行,但常见的错误可能会破坏其成功。以下是开发人员应该避免的一些典型陷阱: 1.缺乏明确的目标 如果没有明确的学习目标,课程可能会缺乏重点,让学习者不确定自己应该实现什么。明确、可衡…...

Docker 逃逸突破边界
免责声明 本博客文章仅供教育和研究目的使用。本文中提到的所有信息和技术均基于公开来源和合法获取的知识。本文不鼓励或支持任何非法活动,包括但不限于未经授权访问计算机系统、网络或数据。 作者对于读者使用本文中的信息所导致的任何直接或间接后果不承担任何…...

残差连接,就是当某一偏导等于0时,加上x偏导就是1,这样乘以1保证不失效
目录 残差连接,就是当某一偏导等于0时,加上x偏导就是1,这样乘以1保证不失效 残差连接中F(x)一般代表什么,将F(x)变为F(x) +x,这样不是改变了函数 本身的性质 F(x)=F(x) +x F(x)偏导若==0;偏导连乘就是0,这样就梯度消失了 F(x) +x;求偏导时x导数是1,保证不丢失F(x)…...