当前位置: 首页 > news >正文

golang kafka sarama源码分析

一些理论

1.topic支持多分区,每个分区只能被组内的一个消费者消费,一个消费者可能消费多个分区的数据;
2.消费者组重平衡的分区策略,是由消费者自己决定的,具体是从消费者组中选一个作为leader进行分区方案分配;
3.每条消息都有一个唯一的offset,kafka保证单个分区的消息有序,因为每个分区的消息是按顺序写入的,消费者是按offset拉取;
4.自动提交和手动提交,自动提交是指 sdk 开启了一个协程,定时自动提交已经标记处理的消息的offset,而不是说拉到消息就自动提交;手动提交则需要业务代码手动提交offset;如果是每消费一条消息再手动提交一次,这样是同步操作,会降低消费者消费速度,可以考虑批量处理多个消息再一起提交;
5.消费模式,kafka是拉模式,消费者定时从kafka拉取消息;
6.服务发布更新、重启、k8s中pod扩缩容 会导致消费者组内消费者成员数量发生变化,进而发生消费者组重平衡,重平衡期间stw消费者组短暂停止拉取拉取 会导致消息堆积,这种重平衡无法避免,stw时间取决于服务升级期间的耗时

源码分析

消费者组接口

type ConsumerGroup interface {Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) errorErrors() <-chan errorClose() error
}type ConsumerGroupHandler interface {Setup(ConsumerGroupSession) errorCleanup(ConsumerGroupSession) errorConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}type ConsumerGroupClaim interface {Topic() stringPartition() int32InitialOffset() int64HighWaterMarkOffset() int64Messages() <-chan *ConsumerMessage
}type ConsumerGroupSession interface {Claims() map[string][]int32MemberID() stringGenerationID() int32MarkOffset(topic string, partition int32, offset int64, metadata string)Commit()ResetOffset(topic string, partition int32, offset int64, metadata string)MarkMessage(msg *ConsumerMessage, metadata string)Context() context.Context
}

消息拉取

可以在一个请求拉多个分区的数据,然后按分区分类

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {// 。。。go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc
}response, err := bc.fetchNewMessages()func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {request := &FetchRequest{MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),}// 。。。for child := range bc.subscriptions {request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)fmt.Printf("fetchNewMessages topic=%s partition=%d offset=%d fetchSize=%d\n",child.topic, child.partition, child.offset, child.fetchSize)}return bc.broker.Fetch(request)
}//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
func (bc *brokerConsumer) subscriptionConsumer() {<-bc.wait // wait for our first piece of workfor newSubscriptions := range bc.newSubscriptions {bc.updateSubscriptions(newSubscriptions)if len(bc.subscriptions) == 0 {// We're about to be shut down or we're about to receive more subscriptions.// Either way, the signal just hasn't propagated to our goroutine yet.<-bc.waitcontinue}response, err := bc.fetchNewMessages()fmt.Printf("[%s]subscriptionConsumer.fetchNewMessages...\n", time.Now())if err != nil {Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)bc.abort(err)return}bc.acks.Add(len(bc.subscriptions))for child := range bc.subscriptions {// 每个分区处理器都写入fmt.Printf("subscriptionConsumer write %s %d %+v\n", child.topic, child.partition, response)child.feeder <- response}bc.acks.Wait()bc.handleResponses()}
}

消息解析处理

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {child := &partitionConsumer{consumer:  c,conf:      c.conf,topic:     topic,partition: partition,messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),feeder:    make(chan *FetchResponse, 1),trigger:   make(chan none, 1),dying:     make(chan none),fetchSize: c.conf.Consumer.Fetch.Default,}if err := child.chooseStartingOffset(offset); err != nil {return nil, err}var leader *Brokervar err errorif leader, err = c.client.Leader(child.topic, child.partition); err != nil {return nil, err}if err := c.addChild(child); err != nil {return nil, err}go withRecover(child.dispatcher)// 每个分区起一个协程处理go withRecover(child.responseFeeder)fmt.Printf("\nConsumePartition go %s %d %d\n", topic, partition, offset)child.broker = c.refBrokerConsumer(leader)child.broker.input <- childreturn child, nil
}

自动提交offset


func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {// Check that we are not dealing with a closed Client before processing any other argumentsif client.Closed() {return nil, ErrClosedClient}conf := client.Config()om := &offsetManager{client: client,conf:   conf,group:  group,poms:   make(map[string]map[int32]*partitionOffsetManager),memberID:   memberID,generation: generation,closing: make(chan none),closed:  make(chan none),}if conf.Consumer.Offsets.AutoCommit.Enable {om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)go withRecover(om.mainLoop)}return om, nil
}
func (om *offsetManager) mainLoop() {defer om.ticker.Stop()defer close(om.closed)for {select {case <-om.ticker.C:om.Commit()case <-om.closing:return}}
}func (om *offsetManager) Commit() {om.flushToBroker()om.releasePOMs(false)
}func (om *offsetManager) flushToBroker() {req := om.constructRequest()if req == nil {return}broker, err := om.coordinator()if err != nil {om.handleError(err)return}resp, err := broker.CommitOffset(req)if err != nil {fmt.Printf("broker.CommitOffset fail %v\n", err)om.handleError(err)om.releaseCoordinator(broker)_ = broker.Close()return}om.handleResponse(broker, req, resp)
}

标记位移

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}
func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {pom.lock.Lock()defer pom.lock.Unlock()if offset > pom.offset {pom.offset = offsetpom.metadata = metadatapom.dirty = true}
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {var r *OffsetCommitRequestvar perPartitionTimestamp int64if om.conf.Consumer.Offsets.Retention == 0 {perPartitionTimestamp = ReceiveTimer = &OffsetCommitRequest{Version:                 1,ConsumerGroup:           om.group,ConsumerID:              om.memberID,ConsumerGroupGeneration: om.generation,}} else {r = &OffsetCommitRequest{Version:                 2,RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),ConsumerGroup:           om.group,ConsumerID:              om.memberID,ConsumerGroupGeneration: om.generation,}}om.pomsLock.RLock()defer om.pomsLock.RUnlock()for _, topicManagers := range om.poms {for _, pom := range topicManagers {pom.lock.Lock()if pom.dirty {r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)}pom.lock.Unlock()}}// 有处理的才提交if len(r.blocks) > 0 {return r}return nil
}

消费者重平衡

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err := c.client.Coordinator(c.groupID)if err != nil {if retries <= 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err := c.joinGroupRequest(coordinator, topics)if err != nil {_ = coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID = join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader,进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId == join.MemberId {members, err := join.GetMembers()if err != nil {return nil, err}// 分配分区plan, err = c.balance(members)if err != nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)if err != nil {_ = coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic->partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) > 0 {members, err := groupRequest.GetMemberAssignment()if err != nil {return nil, err}claims = members.Topicsc.userData = members.UserDatafor _, partitions := range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

使用例子

消费者-自动提交

package mainimport ("context""fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Version = sarama.V2_0_0_0config.Consumer.Offsets.Initial = sarama.OffsetNewestconfig.Consumer.Offsets.AutoCommit.Enable = true // 自动提交config.Consumer.Return.Errors = truevar (brokers = []string{"localhost:9092"}groupID = "g1"topics  = []string{"test3"})group, err := sarama.NewConsumerGroup(brokers, groupID, config)if err != nil {panic(err)}defer func() { _ = group.Close() }()// Track errorsgo func() {for err := range group.Errors() {fmt.Println("ERROR", err)}}()// Iterate over consumer sessions.ctx := context.Background()for {handler := exampleConsumerGroupHandler{}// `Consume` should be called inside an infinite loop, when a// server-side rebalance happens, the consumer session will need to be// recreated to get the new claimserr := group.Consume(ctx, topics, handler)if err != nil {panic(err)}}
}type exampleConsumerGroupHandler struct{}func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {fmt.Printf("Setup %q %+v", se.MemberID(), se.Claims())return nil
}
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("Message topic:%q partition:%d offset:%d ts:%s val:%s\n",msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)//time.Sleep(time.Second * 10)sess.MarkMessage(msg, "")//sess.Commit()//fmt.Printf("\n\nafter commit\n")}return nil
}

消费者-手动提交

package mainimport ("context""fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Version = sarama.V2_0_0_0config.Consumer.Offsets.Initial = sarama.OffsetNewestconfig.Consumer.Offsets.AutoCommit.Enable = falseconfig.Consumer.Return.Errors = truevar (brokers = []string{"localhost:9092"}groupID = "g1"topics  = []string{"test3"})group, err := sarama.NewConsumerGroup(brokers, groupID, config)if err != nil {panic(err)}defer func() { _ = group.Close() }()// Track errorsgo func() {for err := range group.Errors() {fmt.Println("ERROR", err)}}()// Iterate over consumer sessions.ctx := context.Background()for {handler := exampleConsumerGroupHandler{}// `Consume` should be called inside an infinite loop, when a// server-side rebalance happens, the consumer session will need to be// recreated to get the new claimserr := group.Consume(ctx, topics, handler)if err != nil {panic(err)}}
}type exampleConsumerGroupHandler struct{}func (exampleConsumerGroupHandler) Setup(se sarama.ConsumerGroupSession) error {fmt.Printf("Setup %q %+v", se.MemberID(), se.Claims())return nil
}
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {for msg := range claim.Messages() {fmt.Printf("Message topic:%q partition:%d offset:%d ts:%s val:%s\n",msg.Topic, msg.Partition, msg.Offset, msg.Timestamp, msg.Value)//time.Sleep(time.Second * 10)sess.MarkMessage(msg, "")sess.Commit()}return nil
}

生产者 同步发送

package mainimport ("fmt""log""os""github.com/Shopify/sarama"
)var (logger = log.New(os.Stderr, "", log.LstdFlags)
)func main() {var (brokers = []string{"localhost:9092"}topic   = "test3")config := sarama.NewConfig()config.Producer.Return.Successes = true/*=0 不发送任何响应,TCP ACK就是你得到的全部WaitForLocal RequiredAcks=1 只等待本地提交成功后再进行响应。WaitForAll RequiredAcks=-1 等待所有同步副本提交后再响应。*/config.Producer.RequiredAcks = sarama.WaitForAll // WaitForAll等待所有同步副本提交后再响应。producer, err := sarama.NewSyncProducer(brokers, config)if err != nil {fmt.Printf("Failed to open Kafka producer: %s", err)return}defer func() {if err := producer.Close(); err != nil {logger.Println("Failed to close Kafka producer cleanly:", err)}}()message := &sarama.ProducerMessage{Topic: topic,Key:   sarama.StringEncoder("k1"),Value: sarama.StringEncoder("v1"),}partition, offset, err := producer.SendMessage(message)if err != nil {fmt.Printf("Failed to produce message: %s", err)}fmt.Printf("produce %d/%d\n", partition, offset)
}

shell

生产者
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test3创建topic 分区数3
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test查看堆积情况,位移差值越大,堆积越严重
[root@localhost kafka_2.12-2.5.1] # [kube:] ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group g1GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST             CLIENT-ID
g1              test3           0          4               4               0               sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama
g1              test3           1          4               4               0               sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama
g1              test3           2          3               3               0               sarama-c0869d1a-9a8e-400d-a6f5-887ca95587d5 /0:0:0:0:0:0:0:1 sarama
g1              test            0          4               4               0               -                                           -                -

相关文章:

golang kafka sarama源码分析

一些理论 1.topic支持多分区&#xff0c;每个分区只能被组内的一个消费者消费&#xff0c;一个消费者可能消费多个分区的数据&#xff1b; 2.消费者组重平衡的分区策略&#xff0c;是由消费者自己决定的&#xff0c;具体是从消费者组中选一个作为leader进行分区方案分配&#…...

计算机组成原理【CO】Ch2 数据的表示和应用

文章目录 大纲2.1 数制与编码2.2 运算方法和运算电路2.3 浮点数的表示和运算 【※】带标志加法器OFSFZFCF计算机怎么区分有符号数无符号数? 【※】存储排列和数据类型转换数据类型大小数据类型转换 进位计数制进制转换2的次幂 各种码的基本特性无符号整数的表示和运算带符号整…...

dfs回溯 -- Leetcode46. 全排列

题目链接&#xff1a;46. 全排列 题目描述 给定一个不含重复数字的数组 nums &#xff0c;返回其 所有可能的全排列 。你可以 按任意顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1a;[[1,2,3],[1,3,2],[2,1,3],[2,3,1],[3,1,2],[3,2,1]]示…...

设计模式-接口隔离原则

基本介绍 客户端不应该依赖它不需要的接口&#xff0c;即一个类对另一个类的依赖应该建立在最小的接口上先看一张图: 类A通过接口Interface1 依赖类B&#xff0c;类C通过接口Interface1 依赖类D&#xff0c;如果接口Interface1对于类A和类C来说不是最小接口&#xff0c;那么类…...

BD202311夏日漫步(最少步数,BFS或者 Dijstra)

本题链接&#xff1a;码蹄集 题目&#xff1a; 夏日夜晚&#xff0c;小度看着庭院中长长的走廊&#xff0c;萌发出想要在上面散步的欲望&#xff0c;小度注意到月光透过树荫落在地砖上&#xff0c;并且由于树荫的遮蔽度不通&#xff0c;所以月光的亮度不同&#xff0c;为了直…...

React - 你知道props和state之间深层次的区别吗

难度级别:初级及以上 提问概率:60% 如果把React组件看做一个函数的话,props更像是外部传入的参数,而state更像是函数内部定义的变量。那么他们还有哪些更深层次的区别呢,我们来看一下。 首先说props,他是组件外部传入的参数,我们知道…...

mysql 查询实战-变量方式-解答

对mysql 查询实战-变量方式-题目&#xff0c;进行一个解答。&#xff08;先看题&#xff0c;先做&#xff0c;再看解答&#xff09; 1、查询表中⾄少连续三次的数字 1&#xff0c;处理思路 要计算连续出现的数字&#xff0c;加个前置变量&#xff0c;记录上一个的值&#xff0c…...

SpringBoot3配置SpringSecurity6

访问1&#xff1a;localhost:8080/security&#xff0c;返回&#xff1a;需要先认证才能访问&#xff08;说明没有权限&#xff09; 访问2&#xff1a;localhost:8080/anonymous&#xff0c;返回&#xff1a;anonymous&#xff08;说明正常访问&#xff09; 相关文件如下&…...

Unity之Unity面试题(三)

内容将会持续更新&#xff0c;有错误的地方欢迎指正&#xff0c;谢谢! Unity之Unity面试题&#xff08;三&#xff09; TechX 坚持将创新的科技带给世界&#xff01; 拥有更好的学习体验 —— 不断努力&#xff0c;不断进步&#xff0c;不断探索 TechX —— 心探索、心进取…...

Linux命令-dos2unix命令(将DOS格式文本文件转换成Unix格式)

说明 dos2unix命令 用来将DOS格式的文本文件转换成UNIX格式的&#xff08;DOS/MAC to UNIX text file format converter&#xff09;。DOS下的文本文件是以 \r\n 作为断行标志的&#xff0c;表示成十六进制就是0D0A。而Unix下的文本文件是以\n作为断行标志的&#xff0c;表示成…...

企业怎么做数据分析

数据分析在当今信息化时代扮演着至关重要的角色。能够准确地收集、分析和利用数据&#xff0c;对企业的决策和发展都具有重要意义。数聚将介绍企业如何合理地利用数据分析&#xff0c;如何协助企业在竞争激烈的市场中取得优势。 一、建立完善的数据收集系统 在进行数据分析之…...

1111111111

c语言中的小小白-CSDN博客c语言中的小小白关注算法,c,c语言,贪心算法,链表,mysql,动态规划,后端,线性回归,数据结构,排序算法领域.https://blog.csdn.net/bhbcdxb123?spm1001.2014.3001.5343 给大家分享一句我很喜欢我话&#xff1a; 知不足而奋进&#xff0c;望远山而前行&am…...

[面向对象] 单例模式与工厂模式

单例模式 是一种创建模式&#xff0c;保证一个类只有一个实例&#xff0c;且提供访问实例的全局节点。 工厂模式 面向对象其中的三大原则&#xff1a; 单一职责&#xff1a;一个类只有一个职责&#xff08;Game类负责什么时候创建英雄机&#xff0c;而不需要知道创建英雄机要…...

《前端防坑》- JS基础 - 你觉得typeof nullValue === null 么?

问题 JS原始类型有6种Undefined, Null, Number, String, Boolean, Symbol共6种。 在对原始类型使用typeof进行判断时, typeof stringValue string typeof numberValue number 如果一个变量(nullValue)的值为null&#xff0c;那么typeof nullValue "?" const u …...

【项目实战经验】DataKit迁移MySQL到openGauss(下)

上一篇我们分享了安装、设置、链接、启动等步骤&#xff0c;本篇我们将继续分享迁移、启动~ 目录 9. 离线迁移 9.1. 迁移插件安装 中断安装&#xff0c;比如 kill 掉java进程&#xff08;安装失败也要等待300s&#xff09; 下载安装包准备上传 缺少mysqlclient lib包 mysq…...

AI预测体彩排3第2弹【2024年4月13日预测--第1套算法开始计算第2次测试】

各位小伙伴&#xff0c;今天实在抱歉&#xff0c;周末回了趟老家&#xff0c;回来比较晚了&#xff0c;数据今天上午跑完后就回老家了&#xff0c;晚上8点多才回来&#xff0c;赶紧把预测结果发出来吧&#xff0c;虽然有点晚了&#xff0c;但是咱们前面说过了&#xff0c;目前的…...

【13137】质量管理(一)2024年4月串讲题组一

目录 1.选择题 2.多选题 3.简答题 4.论述题 5.计算题 6.论述题 【13137】质量管理-速 记 宝 典【全国通用】</...

Go语言中工作负载类型对并发的影响

在实际工作开发中我们需要根据工作负载是CPU密集型还是I/O密集型,使用不同的方式来解决问题。下面我们先来看这些概念,然后再讨论其影响。 在程序执行时,工作负载的执行时间会受以下因素限制: CPU的速度--例如,运行归并排序算法。工作负载被称为CPU密集型。I/O速度--例如…...

常用的Python内置函数

目录 1. getattr() 函数: 2. setattr() 函数: 3. id():返回对象的唯一标识符(内存地址)。 4. type():返回对象的类型。 5. isinstance(obj, classinfo):判断对象是否是某种类型或其子类的实例。 6. issubclass(class1, class2):判断一个类是否是另一个类的子类。 …...

MAC(M1芯片)编译Java项目慢且发热严重问题解决方案

目录 一、背景二、排查三、解决四、效果以及结果展示五、总结 一、背景 使用idea编译项目等操作&#xff0c;经常性发热严重&#xff0c;并且时间慢。直到昨天编译一个项目用时30分钟&#xff0c;电脑温度很高&#xff0c;并且有烧灼的味道&#xff0c;于是有了此篇文章。 二、…...

如何循环pandas格式的数据

如何循环pandas格式的数据 要循环处理 Pandas 格式的数据&#xff0c;可以使用 iterrows() 方法或者 iteritems() 方法。 iterrows() 方法&#xff1a; import pandas as pd# 假设 df 是你的 Pandas DataFrame for index, row in df.iterrows():# 在这里处理每一行的数据&am…...

新零售SaaS架构:客户管理系统架构设计(万字图文总结)

什么是客户管理系统&#xff1f; 客户管理系统&#xff0c;也称为CRM&#xff08;Customer Relationship Management&#xff09;&#xff0c;主要目标是建立、发展和维护好客户关系。 CRM系统围绕客户全生命周期的管理&#xff0c;吸引和留存客户&#xff0c;实现缩短销售周…...

Apache Spark

Apache Spark是一种开源的分布式计算系统&#xff0c;主要用于大数据处理和分析。Spark提供了一个高效的计算引擎&#xff0c;可以在分布式环境中处理大规模数据集。它支持多种编程语言&#xff0c;包括Scala、Java、Python和R。 Spark的核心概念是弹性分布式数据集&#xff0…...

CentOS7编译ZLMediaKit并使能WebRTC

使能WebRTC需要libsrtp库, libsrtp库需要openssl, 所以第一步先安装openssl, 系统自带的版本是1.0.2的, libsrtp需要1.1.1以上版本, 需要使用源码进行编译; GCC准备 需要安装gcc7以上版本, 并切换到gcc7的编译环境 yum install centos-release-scl yum install devtoolset-7…...

【数据交换格式】网络socket编程温度采集智能存储与上报项目技术------JSON、TLV

作者简介&#xff1a; 一个平凡而乐于分享的小比特&#xff0c;中南民族大学通信工程专业研究生在读&#xff0c;研究方向无线联邦学习 擅长领域&#xff1a;驱动开发&#xff0c;嵌入式软件开发&#xff0c;BSP开发 作者主页&#xff1a;一个平凡而乐于分享的小比特的个人主页…...

IP地址定位技术在各领域的作用

IP地址定位是通过确定IP地址的物理位置来定位一个设备的技术&#xff0c;它在现代社会的多个领域中都有着广泛的应用。以下将详细探讨IP地址定位的应用场景&#xff0c;以期对读者有所启发。 首先&#xff0c;在网络安全领域&#xff0c;IP地址定位发挥着至关重要的作用。网络…...

代码随想录 538. 把二叉搜索树转换为累加树

题目 给出二叉 搜索 树的根节点&#xff0c;该树的节点值各不相同&#xff0c;请你将其转换为累加树&#xff08;Greater Sum Tree&#xff09;&#xff0c;使每个节点 node 的新值等于原树中大于或等于 node.val 的值之和。 提醒一下&#xff0c;二叉搜索树满足下列约束条件&a…...

JavaWeb--前端--01HTML和CSS

文章目录 1 前端开发介绍2 开发工具3 文档查阅4 VSCode的插件 1 前端开发介绍 Web标准也称为网页标准&#xff0c;由一系列的标准组成&#xff0c;大部分由W3C&#xff08; World Wide Web Consortium&#xff0c;万维网联盟&#xff09;负责制定。由三个组成部分&#xff1a;…...

Oracle SQL中的DECODE函数与NVL函数:区别与应用场景详析

Oracle SQL中的DECODE函数与NVL函数&#xff1a;区别与应用场景详析 引言1. NVL函数简介与使用示例2. DECODE函数简介与使用示例3. NVL与DECODE函数的区别4. 使用场景举例结论 引言 在Oracle数据库开发和数据分析过程中&#xff0c;DECODE函数和NVL函数都是非常实用且常见的工具…...

算法设计与分析实验报告c++实现(N皇后问题、卫兵布置问题、求解填字游戏问题、图的m着色问题)

一&#xff0e;N皇后问题 基本原理和思路&#xff1a; 从一条路往前走&#xff0c;能进则进&#xff0c;不能进则退回来&#xff0c;换一条路再试。在包含问题的所有解的解空间树中&#xff0c;按照深度优先搜索的策略&#xff0c;从根结点出发深度探索解空间树。当探索到某一…...

修改wordpress媒体url/网络营销专业是干什么的

打开本地git bash,使用如下命令生成ssh公钥和私钥对 ssh-keygen -t rsa -C ‘xxxxxx.com’ 然后一路回车(-C 参数是你的邮箱地址) ssh-keygen -t rsa -C “morgan.zhudotonlink.com”然后打开/.ssh/id_rsa.pub文件(表示用户目录&#xff0c;比如我的windows就是C:\Users\Admi…...

公司做网站费用会计处理/新冠疫情最新情况最新消息

将已有项目代码加入svn版本控制 - TortoiseSVN入门篇Windows下SVN实用教程(以TortoiseSVN作为客户端(client)) 翻译&#xff1a; Bravo Young Next: 版本库的备份与存储 目录 导引安装Subversion安装TortoiseSVN一步步地操作 步骤0. 设置全局忽略文件类型(此步骤为可选)步骤1. …...

wordpress 七牛云加速/快速seo软件

动态加载 对动态库的加载分为自动加载和动态加载两种 自动加载 程序在开始执行的时候, 将依赖的动态库文件加载到内存中, 再进行函数的链接, 称为自动加载 (之前讲动态库讲过) 动态加载 程序在执行期间, 需要使用到某个动态库中的文件的时候, 可以向动态链接器发出请求, 请求…...

佛山网站建设app/一年的百度指数

在家自制手工曲奇&#xff0c;因为在制作过程中没有添加防腐剂和添加剂&#xff0c;所以保质期多半为15-30天&#xff0c;而且必须是密封好保存在阴凉干燥处&#xff0c;否则时间会缩短。为了保证手工曲奇的口感&#xff0c;曲奇应尽量在短时间内吃完&#xff0c;那如果做的太多…...

抖音排名优化/seo黑帽培训骗局

何时使用领域驱动设计转载自&#xff1a;https://www.cnblogs.com/daxnet/p/15177443.html何时使用领域驱动设计&#xff1f;其实当你的应用程序架构设计是面向业务的时候&#xff0c;你已经开始使用领域驱动设计了。领域驱动设计既不是架构风格&#xff08;Architecture Style…...

直播网站怎么做的/郑州网站建设公司哪家好

Bean的自动装配 自动装配说明 自动装配是使用spring满足bean依赖的一种方法 spring会在应用上下文中为某个bean寻找其依赖的bean。 Spring中bean有三种装配机制&#xff0c;分别是&#xff1a; 在xml中显式配置&#xff1b; 在java中显式配置&#xff1b; 隐式的bean发现机…...