当前位置: 首页 > news >正文

kafka安装并测试

一. Linux下ZooKeeper的安装及使用

1、创建工作目录,下载安装包

#创建安装目录
mkdir -p /opt/zookeeper
#移动到目录
cd /opt/zookeepe   
#下载zookeeper安装包
wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
#解压缩
tar -zxvf zookeeper-3.4.14.tar.gz

2、配置文件

#移到配置目录
cd /opt/zookeeper/zookeeper-3.4.14/conf/
#复制配置文件
cp zoo_sample.cfg zoo.cfg
#修改及添加以下配置
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/zoodata
dataLogDir=/opt/zookeeper/zoodatalog
clientPort=2181
server.0=127.0.0.1:2888:3888
#多节点 集群
#server.1=127.0.0.1:4888:5888
#server.2=127.0.0.1:5888:6888
admin.serverPort=9099

#保存退出

#配置说明

tickTime:客户端会话超时时间,默认2000毫秒。
initLimit:配置客户端初始化可接受多少个心跳监测,默认10,即10*tickTime(默认2000),表示20s没有连接上集群的配置则连接失败。
syncLimit:配置Leader和follwer之间,允许多少个请求应答长度,默认5,即5*tickTime(默认2000),表示默认10sLeader和Follwer之间如果消息5次没有发送成功就不尝试了。
dataDir:配置存储快照文件的目录。
dataLogDir:配置事务日志存储的目录。
clientPort:服务默认端口,默认2181。
server.X=A:B:C 其中X是一个数字,表示这是第几号server,A是该server所在的IP地址,B配置该server和集群中的leader交换消息所使用的端口,C配置选举leader时所使用的端口。

3、创建节点的myid

#创建dataDir目录
mkdir -p /opt/zookeeper/zoodata
#移动到目录
cd /opt/zookeeper/zoodata
#把节点号写入myid文件(各个节点分别配置)
echo 0 > myid
#配置端口防火墙(各个节点分别配置)
firewall-cmd --zone=public --add-port=2181/tcp --permanent
firewall-cmd --reload

4、启动ZooKeeper

#重启
./zkServer.sh restart
#关闭
./zkServer.sh stop
#查看状态
./zkServer.sh staus
#启动的时候,查看后台信息
./zkServer.sh start-foreground &

没起来的可能报错

2023-10-27 14:15:16,975 [myid:0] - ERROR [main:ZooKeeperServerMain@85] - Unable to start AdminServer, exiting abnormally
原因: zk admin启动默认端口是8080,如果有其他服务在用8080,那启动时就报错了,端口已被绑定   配置文件中添加admin.serverPort=9099

5、客户端连接
#启动客户端

./zkCli.sh#创建节点
create /test test1
#获取节点数据
get /test
#更新节点
set /test  test2
#删除节点
delete /test
#递归删除数据,将子目录的数据也删除掉
rmr /test
#查看节点
ls /
#查看输入过的命令
history

二. Linux下搭建Kafka服务

1、安装JDK 1.8
java -version 命令查看JDK版本,如图安装成功

[root@localhost kafka]# java -version
java version "1.8.0_201"
Java(TM) SE Runtime Environment (build 1.8.0_201-b09)
Java HotSpot(TM) 64-Bit Server VM (build 25.201-b09, mixed mode)

2、安装kafka

#创建安装目录
mkdir -p /opt/kafka
#移动到目录
cd /opt/kafka
#下载kafka安装包
wget https://mirrors.aliyun.com/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz
#解压缩
tar -zxvf kafka_2.12-2.5.0.tgz

3、配置文件

#进入配置目录
cd kafka_2.12-2.5.0/config/
#备份配置文件
cp server.properties server.properties.bak
#修改配置文件
vim server.properties
#修改及添加以下配置
broker.id=1
listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
#其他自定义配置(根据实际修改)
zookeeper.connect=127.0.0.1:2181
zookeeper.connection.timeout.ms=18000

#保存退出

#配置说明

broker.id:当前机器在集群中的唯一标识。例如有三台Kafka主机,则分别配置为1,2,3。
listeners:服务监听端口。
advertised.listeners:提供给生产者,消费者的端口号,即外部访问地址。默认为listeners的值。
zookeeper.connect:zookeeper连接地址。如有集群配置,每台Kafka主机都需要连接全部zookeeper服务,实例如下:
zookeeper.connect=192.168.1.41:2181,192.168.1.42:2181,192.168.1.47:2181
zookeeper.connection.timeout.ms:zookeeper连接超时时间。

4、启动Kafka

#移到工作目录
cd /opt/kafka/kafka_2.12-2.5.0/bin/
#启动kafka
./kafka-server-start.sh -daemon ../config/server.properties
#关闭kafka服务
./kafka-server-stop.sh
查看端口已被监听,启动成功:
[root@localhost kafka]# netstat -antlp | grep 9092
tcp        0      0 127.0.0.1:34162         127.0.0.1:9092          ESTABLISHED 19313/./my_producer
tcp6       0      0 127.0.0.1:9092          :::*                    LISTEN      10101/java          
tcp6       1      0 127.0.0.1:34142         127.0.0.1:9092          CLOSE_WAIT  10101/java          
tcp6       0      0 127.0.0.1:9092          127.0.0.1:34162         ESTABLISHED 10101/java

5、测试创建一个topic

#移到工作目录
cd /opt/kafka/kafka_2.12-2.5.0/bin/
#创建topic
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic topic1
#查看topic信息
./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topic1

测试
#启动生产者控制台

[root@localhost bin]# ./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic t1
>test
>123456

#启动消费者控制台(新开一个窗口)

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic t1 --from-beginning
test
123456
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic t1
Created topic t1.
[root@localhost bin]#
[root@localhost bin]# ./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic t1
Topic: t1    PartitionCount: 1    ReplicationFactor: 1    Configs:Topic: t1    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
[root@localhost bin]#

三. c语言使用librdkafka库实现kafka的生产和消费实例

1. 生产者常用接口

1、创建kafka配置
rd_kafka_conf_t *rd_kafka_conf_new (void)
2、配置kafka各项参数
rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,const char *name,const char *value,char *errstr, size_t errstr_size)
3、设置发送回调函数
void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,void (*dr_msg_cb) (rd_kafka_t *rk,const rd_kafka_message_t *rkmessage,void *opaque))
4、创建producer实例
rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)
5、实例化topic
rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)
6、异步调用将消息发送到指定的topic
int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,int msgflags,void *payload, size_t len,const void *key, size_t keylen,void *msg_opaque)
7、阻塞等待消息发送完成
int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)
8、等待完成producer请求完成
rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)
9、销毁topic
void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)
10、销毁producer实例
void rd_kafka_destroy (rd_kafka_t *rk)

生产者实例实现:

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include "librdkafka/rdkafka.h"
//  gcc produce.c -o my_producer  -lrdkafka -lz -lpthread -lrt
static int run = 1;
static void stop(int sig){run = 0;fclose(stdin);
}
/*每条消息调用一次该回调函数,说明消息是传递成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR)还是传递失败(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)该回调函数由rd_kafka_poll()触发,在应用程序的线程上执行
*/
static void dr_msg_cb(rd_kafka_t *rk,const rd_kafka_message_t *rkmessage, void *opaque){if(rkmessage->err)fprintf(stderr, "%% Message delivery failed: %s\n",rd_kafka_err2str(rkmessage->err));elsefprintf(stderr,"%% Message delivered (%zd bytes, ""partition %"PRId32")\n",rkmessage->len, rkmessage->partition);/* rkmessage被librdkafka自动销毁*/
}
int main(int argc, char **argv){rd_kafka_t *rk;            /*Producer instance handle*/rd_kafka_topic_t *rkt;     /*topic对象*/rd_kafka_conf_t *conf;     /*临时配置对象*/char errstr[512];          char buf[512];             const char *brokers;       const char *topic;         if(argc != 3){fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);return 1;}brokers = argv[1];topic = argv[2];/* 创建一个kafka配置占位 */conf = rd_kafka_conf_new();/*创建broker集群*/if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK){fprintf(stderr, "%s\n", errstr);return 1;}/*设置发送报告回调函数,rd_kafka_produce()接收的每条消息都会调用一次该回调函数*应用程序需要定期调用rd_kafka_poll()来服务排队的发送报告回调函数*/rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);/*创建producer实例rd_kafka_new()获取conf对象的所有权,应用程序在此调用之后不得再次引用它*/rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));if(!rk){fprintf(stderr, "%% Failed to create new producer:%s\n", errstr);return 1;}/*实例化一个或多个topics(`rd_kafka_topic_t`)来提供生产或消费,topic对象保存topic特定的配置,并在内部填充所有可用分区和leader brokers,*/rkt = rd_kafka_topic_new(rk, topic, NULL);if (!rkt){fprintf(stderr, "%% Failed to create topic object: %s\n",rd_kafka_err2str(rd_kafka_last_error()));rd_kafka_destroy(rk);return 1;}/*用于中断的信号*/signal(SIGINT, stop);fprintf(stderr,"%% Type some text and hit enter to produce message\n""%% Or just hit enter to only serve delivery reports\n""%% Press Ctrl-C or Ctrl-D to exit\n");while(run && fgets(buf, sizeof(buf), stdin)){size_t len = strlen(buf);if(buf[len-1] == '\n')buf[--len] = '\0';if(len == 0){/*轮询用于事件的kafka handle,事件将导致应用程序提供的回调函数被调用第二个参数是最大阻塞时间,如果设为0,将会是非阻塞的调用*/rd_kafka_poll(rk, 0);continue;}retry:/*Send/Produce message.这是一个异步调用,在成功的情况下,只会将消息排入内部producer队列,对broker的实际传递尝试由后台线程处理,之前注册的传递回调函数(dr_msg_cb)用于在消息传递成功或失败时向应用程序发回信号*/if (rd_kafka_produce(/* Topic object */rkt,/*使用内置的分区来选择分区*/RD_KAFKA_PARTITION_UA,/*生成payload的副本*/RD_KAFKA_MSG_F_COPY,/*消息体和长度*/buf, len,/*可选键及其长度*/NULL, 0,NULL) == -1){fprintf(stderr,"%% Failed to produce to topic %s: %s\n",rd_kafka_topic_name(rkt),rd_kafka_err2str(rd_kafka_last_error()));if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){/*如果内部队列满,等待消息传输完成并retry,内部队列表示要发送的消息和已发送或失败的消息,内部队列受限于queue.buffering.max.messages配置项*/rd_kafka_poll(rk, 1000);goto retry;}   }else{fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",len, rd_kafka_topic_name(rkt));}/*producer应用程序应不断地通过以频繁的间隔调用rd_kafka_poll()来为传送报告队列提供服务。在没有生成消息以确定先前生成的消息已发送了其发送报告回调函数(和其他注册过的回调函数)期间,要确保rd_kafka_poll()仍然被调用*/rd_kafka_poll(rk, 0);}fprintf(stderr, "%% Flushing final message.. \n");/*rd_kafka_flush是rd_kafka_poll()的抽象化,等待所有未完成的produce请求完成,通常在销毁producer实例前完成以确保所有排列中和正在传输的produce请求在销毁前完成*/rd_kafka_flush(rk, 10*1000);/* Destroy topic object */rd_kafka_topic_destroy(rkt);/* Destroy the producer instance */rd_kafka_destroy(rk);return 0;
}

**2. 消费者常用接口 **

1、创建kafka配置
rd_kafka_conf_t *rd_kafka_conf_new (void)
2、创建kafka topic的配置
rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)
3、配置kafka各项参数
rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,const char *name,const char *value,char *errstr, size_t errstr_size)
4、配置kafka topic各项参数
rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,const char *name,const char *value,char *errstr, size_t errstr_size)
5、创建consumer实例
rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)
6、为consumer实例添加brokerlist
int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)
7、开启consumer订阅
rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)
8、轮询消息或事件,并调用回调函数
rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)
9、关闭consumer实例
rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)
10、释放topic list资源
rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)
11、销毁consumer实例
void rd_kafka_destroy (rd_kafka_t *rk)
12、等待consumer对象的销毁
int rd_kafka_wait_destroyed (int timeout_ms)

消费者实例实现

#include <string.h>
#include <stdlib.h>
#include <syslog.h>
#include <signal.h>
#include <error.h>
#include <getopt.h>
#include "librdkafka/rdkafka.h"//  gcc consume.c -o my_consumer  -lrdkafka -lz -lpthread -lrtstatic int run = 1;
//`rd_kafka_t`自带一个可选的配置API,如果没有调用API,Librdkafka将会使用CONFIGURATION.md中的默认配置。
static rd_kafka_t *rk;
static rd_kafka_topic_partition_list_t *topics;
static void stop (int sig) {if (!run)exit(1);run = 0;fclose(stdin); /* abort fgets() */
}
static void sig_usr1 (int sig) {rd_kafka_dump(stdout, rk);
}
/**
* 处理并打印已消费的消息
*/
static void msg_consume (rd_kafka_message_t *rkmessage,void *opaque) {if (rkmessage->err) {if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {fprintf(stderr,"%% Consumer reached end of %s [%"PRId32"] ""message queue at offset %"PRId64"\n",rd_kafka_topic_name(rkmessage->rkt),rkmessage->partition, rkmessage->offset);return;}if (rkmessage->rkt)fprintf(stderr, "%% Consume error for ""topic \"%s\" [%"PRId32"] ""offset %"PRId64": %s\n",rd_kafka_topic_name(rkmessage->rkt),rkmessage->partition,rkmessage->offset,rd_kafka_message_errstr(rkmessage));elsefprintf(stderr, "%% Consumer error: %s: %s\n",rd_kafka_err2str(rkmessage->err),rd_kafka_message_errstr(rkmessage));if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)run = 0;return;}fprintf(stdout, "%% Message (topic %s [%"PRId32"], ""offset %"PRId64", %zd bytes):\n",rd_kafka_topic_name(rkmessage->rkt),rkmessage->partition,rkmessage->offset, rkmessage->len);if (rkmessage->key_len) {printf("Key: %.*s\n",(int)rkmessage->key_len, (char *)rkmessage->key);}printf("%.*s\n",(int)rkmessage->len, (char *)rkmessage->payload);}
/*init all configuration of kafka
*/
int initKafka(char *brokers, char *group,char *topic){rd_kafka_conf_t *conf;rd_kafka_topic_conf_t *topic_conf;rd_kafka_resp_err_t err;char tmp[16];char errstr[512];/* Kafka configuration */conf = rd_kafka_conf_new();//quick terminationsnprintf(tmp, sizeof(tmp), "%i", SIGIO);rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);//topic configurationtopic_conf = rd_kafka_topic_conf_new();/* Consumer groups require a group id */if (!group)group = "rdkafka_consumer_example";if (rd_kafka_conf_set(conf, "group.id", group,errstr, sizeof(errstr)) !=RD_KAFKA_CONF_OK) {fprintf(stderr, "%% %s\n", errstr);return -1;}/* Consumer groups always use broker based offset storage */if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method","broker",errstr, sizeof(errstr)) !=RD_KAFKA_CONF_OK) {fprintf(stderr, "%% %s\n", errstr);return -1;}/* Set default topic config for pattern-matched topics. */rd_kafka_conf_set_default_topic_conf(conf, topic_conf);//实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));if(!rk){fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr);return -1;}//Librdkafka需要至少一个brokers的初始化listif (rd_kafka_brokers_add(rk, brokers) == 0){fprintf(stderr, "%% No valid brokers specified\n");return -1;}//重定向 rd_kafka_poll()队列到consumer_poll()队列rd_kafka_poll_set_consumer(rk);//创建一个Topic+Partition的存储空间(list/vector)topics = rd_kafka_topic_partition_list_new(1);//把Topic+Partition加入listrd_kafka_topic_partition_list_add(topics, topic, -1);//开启consumer订阅,匹配的topic将被添加到订阅列表中if((err = rd_kafka_subscribe(rk, topics))){fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));return -1;}return 1;
}
int main(int argc, char **argv){char *brokers = "localhost:9092";char *group = NULL;char *topic = NULL;int opt;rd_kafka_resp_err_t err;while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){switch (opt) {case 'b':brokers = optarg;break;case 'g':group = optarg;break;case 't':topic = optarg;break;default:break;}}signal(SIGINT, stop);signal(SIGUSR1, sig_usr1);if(!initKafka(brokers, group, topic)){fprintf(stderr, "kafka server initialize error\n");}else{while(run){rd_kafka_message_t *rkmessage;/*-轮询消费者的消息或事件,最多阻塞timeout_ms-应用程序应该定期调用consumer_poll(),即使没有预期的消息,以服务所有排队等待的回调函数,当注册过rebalance_cb,该操作尤为重要,因为它需要被正确地调用和处理以同步内部消费者状态 */rkmessage = rd_kafka_consumer_poll(rk, 1000);if(rkmessage){msg_consume(rkmessage, NULL);/*释放rkmessage的资源,并把所有权还给rdkafka*/rd_kafka_message_destroy(rkmessage);}}}
done:/*此调用将会阻塞,直到consumer撤销其分配,调用rebalance_cb(如果已设置),commit offset到broker,并离开consumer group最大阻塞时间被设置为session.timeout.ms*/err = rd_kafka_consumer_close(rk);if(err){fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));}else{fprintf(stderr, "%% Consumer closed\n");}//释放topics list使用的所有资源和它自己rd_kafka_topic_partition_list_destroy(topics);//destroy kafka handlerd_kafka_destroy(rk);run = 5;//等待所有rd_kafka_t对象销毁,所有kafka对象被销毁,返回0,超时返回-1while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){printf("Waiting for librdkafka to decommission\n");}if(run <= 0){//dump rdkafka内部状态到stdout流rd_kafka_dump(stdout, rk);}return 0;
}

运行:

[root@localhost kafka]# ./my_consumer -b localhost:9092 -t t1
%4|1592810248.073|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property offset.store.method is deprecated: Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires "group.id" to be configured and Apache Kafka 0.8.2 or later on the broker.).
% Message (topic t1 [0], offset 5, 6 bytes):
hellpo
% Message (topic t1 [0], offset 6, 6 bytes):
123456[root@localhost kafka]# ./my_producer localhost:9092 t1
% Type some text and hit enter to produce message
% Or just hit enter to only serve delivery reports
% Press Ctrl-C or Ctrl-D to exit
hellpo
% Enqueued message (6 bytes) for topic t1
123456

相关文章:

kafka安装并测试

一. Linux下ZooKeeper的安装及使用 1、创建工作目录&#xff0c;下载安装包 #创建安装目录 mkdir -p /opt/zookeeper #移动到目录 cd /opt/zookeepe #下载zookeeper安装包 wget https://mirrors.aliyun.com/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz #解…...

flutter路由跳转

Navigator.of(context).push(); //路由跳转(模块方式) Navigator.of(context).push(MaterialPageRoute(builder: (BuildContext context) {return const Page() ;//Page()指页面}, )) Navigator.pushNamed(context, "/") //路由跳转(路由方式) Navigator.pop(cont…...

微服务项目小结1

01.微服务的概念 单体、分布式、集群 (面试用到)微服务把之前的大的应用&#xff0c;按照业务功能拆分成若干个小的模块&#xff0c;每个模块都是独立的开发&#xff0c;测试&#xff0c;上线&#xff0c;维护缺点: 开发成本高&#xff0c;众多服务出错的处理(容错),分布式事务…...

【小熊猫 ide】更新支持mingw 支持c++20

没有format 头文件 GCC版本对C++的支持情况即使我使用11,也没有format 头文件小熊猫 ide https://wwe.lanzoui.com/b01os0mwd最新11可以自己更新https://royqh1979.gitee.io/redpandacpp/docsy/docs/gcc13 才支持format [7GCC 13 has added support for std::format.](https:/…...

ESD保护二极管ESD9B3.3ST5G 以更小的空间实现强大的保护 车规级TVS二极管更给力

什么是汽车级TVS二极管&#xff1f; TVS二极管是一种用于保护电子电路的电子元件。它主要用于电路中的过电压保护&#xff0c;防止电压过高而损坏其他部件。TVS二极管通常被称为“汽车级”是因为它们能够满足汽车电子系统的特殊要求。 在汽车电子系统中&#xff0c;由于车辆启…...

SAP BTP云上一个JVM与DB Connection纠缠的案例

前言 最近在CF (Cloud Foundry) 云平台上遇到一个比较经典的案例。因为牵扯到JVM &#xff08;app进程&#xff09;与数据库连接两大块&#xff0c;稍有不慎&#xff0c;很容易引起不快。 在云环境下&#xff0c;有时候相互扯皮的事蛮多。如果是DB的问题&#xff0c;就会找DB…...

Linux进程的基本概念

冯诺依曼体系结构 我们常见的计算机&#xff0c;如笔记本。我们不常见的计算机&#xff0c;如服务器&#xff0c;大部分都遵守冯诺依曼体系。 截至目前&#xff0c;我们所认识的计算机&#xff0c;都是有一个个的硬件组件组成 输入单元&#xff1a;包括键盘 , 鼠标&#xf…...

设计模式深度解析:AI如何影响装饰器模式与组合模式的选择与应用

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》《MYSQL应用》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;坚持默默的做事。 AI如何影响装饰器模式与组合模式的选择与应用 在今天这个快速发展的技术时代&#…...

JAVA面试大全之微服务篇

目录 1、Spring Cloud 1.1、什么是微服务?谈谈你对微服务的理解? 1.2、什么是Spring Cloud? 1.3、springcloud中的组件有那些? 1.4、具体说说SpringCloud主要项目...

WiFiSpoof for Mac wifi地址修改工具

WiFiSpoof for Mac&#xff0c;一款专为Mac用户打造的网络隐私守护神器&#xff0c;让您在畅游互联网的同时&#xff0c;轻松保护个人信息安全。 软件下载&#xff1a;WiFiSpoof for Mac下载 在这个信息爆炸的时代&#xff0c;网络安全问题日益凸显。WiFiSpoof通过伪装MAC地址&…...

14 - grace数据处理 - 泄露误差改正 - 空域滤波法(Mascon法)

@[TOC](grace数据处理 - 泄露误差改正 - 空域滤波法(Mascon法)) 空域法的基本思想是假设地面某区域的质量变化是由一系列位置已知、质量未知的质量块(小范围区域)引起的,那么将GRACE反演的结果归算到n个质量块上的过程就是泄露信号恢复的过程。个人理解是这样的:假定已知研…...

openGauss MySQL兼容性增强

MySQL兼容性增强 可获得性 本特性自openGauss 3.0.0版本开始引入。 特性简介 本特性主要从以下几方面增强openGauss与MySQL的兼容性&#xff08;只列举部分典型语法&#xff0c;详情请参见《数据迁移指南》中“MySQL兼容性说明”章节&#xff09;&#xff1a;。 支持用户锁…...

【跟小嘉学 Linux 系统架构与开发】二、Linux发型版介绍与基础常用命令介绍

系列文章目录 【跟小嘉学 Linux 系统架构与开发】一、学习环境的准备与Linux系统介绍 【跟小嘉学 Linux 系统架构与开发】二、Linux发型版介绍与基础常用命令介绍 文章目录 系列文章目录[TOC](文章目录) 前言一、 Linux 发行版(Linux distribution)介绍二、Centos 虚拟机初始化…...

EMD关于信号的重建,心率提取

关于EMD的俩个假设&#xff1a; IMF 有两个假设条件&#xff1a; 在整个数据段内&#xff0c;极值点的个数和过零点的个数必须相等或相差最多不能超过一 个&#xff1b;在任意时刻&#xff0c;由局部极大值点形成的上包络线和由局部极小值点形成的下包络线 的平均值为零&#x…...

HEVC的Profile和Level介绍

文章目录 HEVCProfile&#xff08;配置&#xff09;&#xff1a;Level&#xff08;级别&#xff09;&#xff1a;划分标准 HEVC HEVC&#xff08;High Efficiency Video Coding&#xff09;&#xff0c;也称为H.265&#xff0c;是一种视频压缩标准&#xff0c;旨在提供比先前的…...

Springboot Thymeleaf 实现数据添加、修改、查询、删除

1、引言 在Spring Boot中使用Thymeleaf模板引擎实现数据的添加、修改、查询和删除功能&#xff0c;通常步骤如下&#xff1a; 在Controller类中&#xff0c;定义处理HTTP请求的方法。创建Thymeleaf模板来处理表单的显示和数据的绑定。 2、用户数据添加 1、 在Controller类中…...

关于 UnityEditorWindow

想要使用UnityEditorWindow作为调试窗口吗&#xff1f; 这样做可以很方便的针对游戏中的重要对象做调试。 但是有一个很不方便的地方&#xff0c;OnGUI 的刷新频率不高&#xff0c;或者说需要鼠标点击之后才会重绘&#xff0c;如何解决这一问题&#xff1f; 可以如下操作&am…...

小狐狸JSON-RPC:wallet_addEthereumChain(添加指定链)

wallet_addethereumchain&#xff08;添加网络&#xff09; var res await window.ethereum.request({"method": "wallet_addEthereumChain","params": [{"chainId": "0x64", // 链 ID &#xff08;必填&#xff09;"…...

Pandas | value_counts() 的详细用法

value_counts() 函数得作用 用来统计数据表中&#xff0c;指定列里有多少个不同的数据值&#xff0c;并计算每个不同值有在该列中的个数&#xff0c;同时还能根据指定得参数返回排序后结果。 返回得是Series对象 value_counts(values,sortTrue, ascendingFalse, normalizeFal…...

上岸美团了!

Hello&#xff0c;大家好&#xff0c;最近春招正在如火如荼&#xff0c;给大家分享一份美团的面经&#xff0c;作者是一份某双非的硕&#xff08;只如初见668&#xff09;&#xff0c;刚刚通过了美团的3轮面试&#xff0c;已经拿到offer&#xff0c;以下是他的一些分享。 一面&…...

Gemma开源AI指南

近几个月来&#xff0c;谷歌推出了 Gemini 模型&#xff0c;在人工智能领域掀起了波澜。 现在&#xff0c;谷歌推出了 Gemma&#xff0c;再次引领创新潮流&#xff0c;这是向开源人工智能世界的一次变革性飞跃。 与前代产品不同&#xff0c;Gemma 是一款轻量级、小型模型&…...

LabVIEW智能家居安防系统

LabVIEW智能家居安防系统 随着科技的飞速发展和人们生活水平的不断提升&#xff0c;智能家居系统以其便利性和高效性&#xff0c;逐渐成为现代生活的新趋势。智能家居安防系统作为智能家居系统的重要组成部分&#xff0c;不仅能够提高家庭的安全性&#xff0c;还能为用户提供更…...

[蓝桥杯 2022 省 A] 求和

[蓝桥杯 2022 省 A] 求和 题目描述 给定 n n n 个整数 a 1 , a 2 , ⋯ , a n a_{1}, a_{2}, \cdots, a_{n} a1​,a2​,⋯,an​, 求它们两两相乘再相加的和&#xff0c;即 S a 1 ⋅ a 2 a 1 ⋅ a 3 ⋯ a 1 ⋅ a n a 2 ⋅ a 3 ⋯ a n − 2 ⋅ a n − 1 a n − 2 ⋅ a…...

【C++入门】输入输出、命名空间、缺省参数、函数重载、引用、内联函数、auto、基于范围的for循环

目录 命名空间 命名空间的定义 命名空间的使用 输入输出 缺省参数 函数重载 引用 常引用 引用的使用场景 内联函数 auto 基于范围的for循环 命名空间 请看一段C语言的代码&#xff1a; #include <stdio.h> #include <stdlib.h>int rand 10;int main…...

Docker + Nginx 安装

安装Docker 1.防火墙 2.yum源 3.安装基础软件 更新yum源 wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo yum clean all #清除yum源缓存 yu…...

UE RPC 外网联机(1)

技术&#xff1a;RPC TCP通信 设计&#xff1a;大厅服务<---TCP--->房间服务<---RPC--->客户端&#xff08;Creator / Participator&#xff09; 1. PlayerController 用于RPC通信控制 2.GameMode 用于数据同步 3.类图 4. 注意 &#xff08;1&#xff09;RPC&a…...

AI预测福彩3D第22弹【2024年3月31日预测--第5套算法开始计算第4次测试】

今天&#xff0c;咱们继续进行本套算法的测试&#xff0c;今天为第四次测试&#xff0c;仍旧是采用冷温热趋势结合AI模型进行预测。好了&#xff0c;废话不多说了。直接上结果~ 仍旧是分为两个方案&#xff0c;1大1小。 经过人工神经网络计算并进行权重赋值打分后&#xff0c;3…...

Django(二)-搭建第一个应用(1)

一、项目环境和结构 1、项目环境 2、项目结构 二、编写项目 1、创建模型 代码示例: import datetimefrom django.db import models from django.utils import timezone# Create your models here.class Question(models.Model):question_text models.CharField(max_length2…...

前端bugs

问题&#xff1a; Failed to load plugin typescript-eslint declared in package.json eslint-config-react-app#overrides[0]: Cannot find module eslint/package.json 解决&#xff1a; google了一晚上还得是chatgpt管用 运行以下命令【同时还要注意项目本身使用的Node版…...

MCGS学习——水位控制

要求 插入一个水罐&#xff0c;液位最大值为37插入一个滑动输入器&#xff0c;用来调节水罐水位&#xff0c;滑动输入器最大调节为液位最大值&#xff0c;并能清楚的显示出液位情况用仪表显示水位变化情况&#xff0c;仪表最大显示设置直观清楚方便读数&#xff0c;主划线为小…...

网站建设需求原型/hao123影视

文章来源&#xff1a;Python小例子告别枯燥&#xff0c;通过学习有趣的小例子&#xff0c;扎实而系统的入门Python&#xff0c;从菜鸟到大师&#xff0c;个人觉得这是很靠谱的一种方法。通过一个又一个的小例子&#xff0c;真正领悟Python之强大&#xff0c;之简洁&#xff0c;…...

网站建设策划 优帮云/友情链接检测

1.跨境电子商务运营模式包括&#xff1a;M2C模式&#xff0e;B2C模式&#xff0e;C2C模式&#xff0e;BBC保税区模式&#xff0e;海外电商直邮。该说法&#xff08; &#xff09; A.正确 B.错误 错误 正确答案&#xff1a;左边查询 学生答案&#xff1a;未作答 2.在按照服务类型…...

物联网技术是学什么的/seo薪资水平

FlatBuffers是一个高性能、跨平台的序列化库&#xff0c;支持C、C#、C、Go、Java、JavaScript、TypeScript、PHP和Python。由google开发&#xff0c;并用于游戏及其他对性能要求高的应用。 1、不用解析/解包&#xff0c;就可以直接访问序列化的数据 1&#xff09;写schema文件。…...

网站显示结算/自助建站系统源码

前言&#xff1a;前面讲完了一些并发编程的原理&#xff0c;现在我们要来学习的是线程之间的协作。通俗来说就是&#xff0c;当前线程在某个条件下需要等待&#xff0c;不需要使用太多系统资源。在某个条件下我们需要去唤醒它&#xff0c;分配给它一定的系统资源&#xff0c;让…...

那个网站可以做网站测速对比/百度推广后台登录页面

文章目录1.下载安装包2.上传安装包3.修改配置文件3.1修改zeppelin-site.xml文件3.2修改zeppelin-env.sh文件4.启动zeppelin5.配置hive解释器5.1环境和变量配置5.2 在web界面配置集成hive6.使用Zepplin的hive解释器1.下载安装包 官网直达 选择zeppelin-0.8.1-bin-all.tgz 2.上…...

家具制作网站/新型实体企业100强

这个错误通常是由于卷积层&#xff08;Convolutional layer&#xff09;的输入通道数与卷积核&#xff08;Convolutional kernel&#xff09;的通道数不匹配导致的。具体地说&#xff0c;卷积核的通道数应该与输入 tensor 的通道数相同。 在你的代码中&#xff0c;卷积层的卷积…...