当前位置: 首页 > news >正文

【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql(根据对应操作类型进行增、删、改操作)

【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql(根据对应操作类型进行增、删、改操作)

  • 1)导入依赖
  • 2)resources
    • 2.1.appconfig.yml
    • 2.2.application.properties
    • 2.3.log4j.properties
    • 2.4.log4j2.xml
  • 3)util
    • 3.1.KafkaMysqlUtils
    • 3.2.CustomDeSerializationSchema
  • 4)po
    • 4.1.TableBean
  • 5)kafkacdc2mysql
    • 5.1.Kafka2MysqlApp

需求描述:

1、数据从 Kafka 写入 Mysql。

2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。

3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。

4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。

5、读取时使用自定义 Source,写入时使用自定义 Sink。

6、消费 Kafka 数据时自定义反序列化。

1)导入依赖

这里的依赖比较冗余,大家可以根据各自需求做删除或保留。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>gaei.cn.x5l</groupId><artifactId>x8vbusiness</artifactId><version>1.0.0</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><target.java.version>1.8</target.java.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.10</scala.version><flink.version>1.14.0</flink.version><log4j.version>2.17.2</log4j.version><hadoop.version>3.1.2</hadoop.version><hive.version>3.1.2</hive.version><mongo.driver.version>3.12.6</mongo.driver.version><mongo.driver.core.version>4.3.1</mongo.driver.core.version></properties><dependencies><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version><!--            <exclusions>--><!--                <exclusion>--><!--                    <groupId>mysql</groupId>--><!--                    <artifactId>mysql-connector-java</artifactId>--><!--                </exclusion>--><!--            </exclusions>--></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency><!-- 基础依赖  开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 基础依赖  结束--><!-- TABLE  开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>1.14.0</version><scope>provided</scope></dependency><!-- 使用 hive sql时注销,其他时候可以放开 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- TABLE  结束--><!-- sql  开始--><!-- sql解析 开始 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- sql解析 结束 --><!-- sql连接 kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- sql  结束--><!-- 检查点 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency><!-- 本地监控任务 开始 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 本地监控任务 结束 --><!-- DataStream 开始 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><!-- hdfs --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version><exclusions><exclusion><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></exclusion></exclusions></dependency><!-- 重点,容易被忽略的jar --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency><!-- rocksdb_2 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 其他 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency><dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency><!-- TABLE  开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><!--            <version>${flink.version}</version>--><version>1.13.5</version><scope>provided</scope></dependency><!-- TABLE  结束--><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.3</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><version>2.3.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><!--            <version>5.1.44</version>--><version>8.0.27</version><scope>runtime</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.2.8</version></dependency><dependency><groupId>org.mongodb</groupId><artifactId>bson</artifactId><version>${mongo.driver.core.version}</version></dependency><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver-core</artifactId><version>${mongo.driver.core.version}</version></dependency><!--    使用 mongodb-driver 重新打包成的 custom-mongo-core  --><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.6</version></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude><exclude>org.apache.flink:flink-runtime-web_2.11</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass></transformer><!-- flink sql 需要  --><!-- The service transformer is needed to merge META-INF/services files --><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><!-- ... --></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.0.0,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build></project>

2)resources

2.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"

2.2.application.properties

url=mongodb://test:test123456@10.1.1.1:34516/?authSource=admin
#database=diagnosis
#collection=diagnosisEntiry
maxConnectionIdleTime=1000000
batchSize=1# flink
checkpoint.interval=300000
checkpoint.minPauseBetweenCheckpoints=10000
checkpoint.checkpointTimeout=400000
maxConcurrentCheckpoints=1
restartInterval=120
restartStrategy=3
checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongomysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false
mysql.username=test
mysql.password=123456#envType=PRE
envType=PRD# mysql  druid 连接池生产环境连接池配置
druid.driverClassName=com.mysql.jdbc.Driver
#生产
druid.url=jdbc:mysql://1.1.1.1:3306/test
druid.username=test
druid.password=123456
# 初始化连接数
druid.initialSize=1
# 最大连接数
druid.maxActive=5
# 最大等待时间
druid.maxWait=3000

2.3.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5"><Properties><property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" /><property name="LOG_LEVEL" value="ERROR" /></Properties><appenders><console name="console" target="SYSTEM_OUT"><PatternLayout pattern="${LOG_PATTERN}"/><ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/></console><File name="log" fileName="tmp/log/job.log" append="false"><PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/></File></appenders><loggers><root level="${LOG_LEVEL}"><appender-ref ref="console"/><appender-ref ref="log"/></root></loggers>
</configuration>

3)util

3.1.KafkaMysqlUtils

public class KafkaUtils {public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(List<String> topic) throws IOException {Properties prop1 = confFromYaml();//认证环境String envType = prop1.getProperty("envType");Properties prop = new Properties();System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");prop.put("security.protocol", "SASL_PLAINTEXT");prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "+ "useTicketCache=false  "+ "serviceName=\"" + "kafka" + "\" "+ "useKeyTab=true "+ "keyTab=\"" + "/opt/conf/test.keytab" + "\" "+ "principal=\"" + getKafkaKerberos(envType).get("principal") + "\";");prop.put("bootstrap.servers", getKafkaKerberos(envType).get("bootstrap.servers"));prop.put("group.id", "Kafka2Mysql_test");prop.put("auto.offset.reset", "earliest");prop.put("enable.auto.commit", "false");prop.put("max.poll.interval.ms", "60000");prop.put("max.poll.records", "3000");prop.put("session.timeout.ms", "600000");//        List<String> topics = Stream.of(prop.getProperty("topics").split(",", -1))
//                .collect(Collectors.toList());prop.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");prop.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(topic, new CustomDeSerializationSchema(), prop);consumer.setStartFromGroupOffsets();consumer.setCommitOffsetsOnCheckpoints(true);return consumer;}public static void main(String[] args) throws Exception {Properties druidConf = KafkaUtils.getDruidConf();if (druidConf == null) {throw new RuntimeException("缺少druid相关配置信息,请检查");}DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);Connection connection = dataSource.getConnection();PreparedStatement showDatabases = connection.prepareStatement("\n" +"select count(*) from tab_factory");ResultSet resultSet = showDatabases.executeQuery();while (resultSet.next()) {String string = resultSet.getString(1);System.out.println(string);}resultSet.close();showDatabases.close();connection.close();}public static Properties getDruidConf() {try {Properties prop = confFromYaml();String driverClassName = prop.get("druid.driverClassName").toString();String url = prop.get("druid.url").toString();String username = prop.get("druid.username").toString();String password = prop.get("druid.password").toString();String initialSize = prop.get("druid.initialSize").toString();String maxActive = prop.get("druid.maxActive").toString();String maxWait = prop.get("druid.maxWait").toString();Properties p = new Properties();p.put("driverClassName", driverClassName);p.put("url", url);p.put("username", username);p.put("password", password);p.put("initialSize", initialSize);p.put("maxActive", maxActive);p.put("maxWait", maxWait);
//            p.forEach((k,v)-> System.out.println("连接池属性 "+k+"="+v));return p;} catch (Exception e) {e.printStackTrace();}return null;}// envType     PRE  PRDpublic static Map<String, String> getKafkaKerberos(String envType) {Map<String, String> map = new HashMap<>();if ("PRD".equalsIgnoreCase(envType)) {map.put("principal", "prd@PRD.PRD.COM");map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");} else if ("PRE".equalsIgnoreCase(envType)) {map.put("principal", "pre@PRE.PRE.COM");map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");} /*else if ("TEST".equalsIgnoreCase(envType)) {map.put("principal","test@TEST.TEST.COM");map.put("bootstrap.servers","test@TEST.TEST.COM");} */ else {System.out.println("没有该" + envType + "环境");throw new RuntimeException("没有该" + envType + "环境");}return map;}public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {Properties prop = confFromYaml();env.enableCheckpointing(Long.valueOf(prop.getProperty("checkpoint.interval")), CheckpointingMode.EXACTLY_ONCE);//这里会造成offset提交的延迟env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Long.valueOf(prop.getProperty("checkpoint.minPauseBetweenCheckpoints")));env.getCheckpointConfig().setCheckpointTimeout(Long.valueOf(prop.getProperty("checkpoint.checkpointTimeout")));env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.valueOf(prop.getProperty("maxConcurrentCheckpoints")));env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.valueOf(prop.getProperty("restartStrategy")), // 尝试重启的次数,不宜过小,分布式任务很容易出问题(正常情况),建议3-5次Time.of(Integer.valueOf(prop.getProperty("restartInterval")), TimeUnit.SECONDS) // 延时));// 设置状态后端存储方式
//        env.setStateBackend(new RocksDBStateBackend((String) prop.getProperty("checkPointPath"), true));
//        env.setStateBackend(new MemoryStateBackend());env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));return env;}public static Properties confFromYaml() {Properties prop = new Properties();InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");try {prop.load(resourceStream);} catch (Exception e) {e.printStackTrace();} finally {try {if (resourceStream != null) {resourceStream.close();}} catch (Exception ex) {ex.printStackTrace();}}return prop;}
}

3.2.CustomDeSerializationSchema

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {private static String encoding = "UTF8";//是否表示l流的最后一条元素,设置为false,表示数据会源源不断的到来@Overridepublic boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {return false;}//这里返回一个ConsumerRecord<String,String>类型的数据,除了原数据还包括topic,offset,partition等信息@Overridepublic ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {byte[] key = (record.key() == null ? "".getBytes() : record.key());return new ConsumerRecord<String, String>(record.topic(),record.partition(),record.offset(),record.timestamp(),record.timestampType(),record.checksum(),record.serializedKeySize(),record.serializedValueSize(),/*这里我没有进行空值判断,生产一定记得处理*/new  String(key, encoding),new  String(record.value(), encoding));}//指定数据的输入类型@Overridepublic TypeInformation<ConsumerRecord<String, String>> getProducedType() {return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});}
}

4)po

4.1.TableBean

@Data
public class TableBean {private String database;private String table;private String primaryKey;private TableBean() {}public TableBean(String database, String table, String primaryKey) {this.database = '`' + database + '`';this.table = '`' + table + '`';this.primaryKey = primaryKey;}
}

5)kafkacdc2mysql

5.1.Kafka2MysqlApp

public class Kafka2MysqlApp {// key 是 topic 名,value是对应数据库表中的主键列名private static final Map<String, TableBean> map = new HashMap<>();static {//表名这里没有进行配置,后面根据实际业务进行配置即可map.put("mysql_tab1", new TableBean("db1", "", "alarm_id"));map.put("mysql_tab2", new TableBean("db2", "", "id"));}public static void main(String[] args) throws Exception {ArrayList<String> topicList = new ArrayList<>(map.keySet());StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();KafkaUtils.setupFlinkEnv(env);RichSinkFunction<ConsumerRecord<String, String>> sinkFunction =new RichSinkFunction<ConsumerRecord<String, String>>() {DataSource dataSource = null;@Overridepublic void open(Configuration parameters) throws Exception {initDruidDataSource();}private void initDruidDataSource() throws Exception {Properties druidConf = KafkaUtils.getDruidConf();if (druidConf == null) {throw new RuntimeException("缺少druid相关配置信息,请检查");}dataSource = DruidDataSourceFactory.createDataSource(druidConf);}@Overridepublic void close() throws Exception {}@Overridepublic void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {if (dataSource == null) {throw new RuntimeException("连接池未初始化");}String operationType = "";String keyId = "";String sql = "";try (Connection connection = dataSource.getConnection()) {//定义表名String table_name = record.topic();JSONObject jsonObject = JSONObject.parseObject(record.value());operationType = jsonObject.getString("operationType");jsonObject.remove("operationType");String primaryKey = map.get(record.topic()).getPrimaryKey();String database = map.get(record.topic()).getDatabase();keyId = jsonObject.getString(primaryKey);List<String> columns = new ArrayList<>();List<String> columnValues = new ArrayList<>();jsonObject.forEach((k, v) -> {columns.add(k);columnValues.add(v.toString());});if ("INSERT".equals(operationType)) {try {sql = "delete from " + database + "." + table_name + " where " + primaryKey + "= ?";PreparedStatement preparedStatement = connection.prepareStatement(sql);preparedStatement.setObject(1, keyId);preparedStatement.executeUpdate();preparedStatement.close();} catch (Exception ignore) {}StringBuilder sb = new StringBuilder();sb.append("insert into ").append(database).append(".").append(table_name).append("(");for (String column : columns) {sb.append("`").append(column).append("`,");}sb.append(") values(");for (String columnValue : columnValues) {sb.append("?,");}sb.append(")");//去除最后一个逗号sql = sb.toString().replace(",)", ")");PreparedStatement preparedStatement = connection.prepareStatement(sql);for (int i = 0; i < columnValues.size(); i++) {preparedStatement.setObject(i + 1, columnValues.get(i));}preparedStatement.executeUpdate();preparedStatement.close();} else if ("UPDATE".equals(operationType)) {StringBuilder sb = new StringBuilder();sb.append("update ").append(database).append(".").append(table_name).append(" set ");for (String column : columns) {sb.append("`").append(column).append("`= ?,");}String sqlPre = sb.substring(0, sb.length() - 1);sql = sqlPre + " where " + primaryKey + "='" + keyId + "'";PreparedStatement preparedStatement = connection.prepareStatement(sql);for (int i = 0; i < columnValues.size(); i++) {preparedStatement.setObject(i + 1, columnValues.get(i));}preparedStatement.executeUpdate();preparedStatement.close();} else if ("DELETE".equals(operationType)) {sql = "delete from " + database + "." + table_name + " where " + primaryKey + "= ?";PreparedStatement preparedStatement = connection.prepareStatement(sql);preparedStatement.setObject(1, keyId);preparedStatement.executeUpdate();preparedStatement.close();}} catch (Exception e) {System.out.printf("mysql同步操作(%s)有误,主键是%s,原因是%s,对应topic数据是%s%n", operationType, keyId, e.getMessage(), record);System.out.println("执行sql语句为 " + sql);throw new RuntimeException(e);}}};env.addSource(KafkaUtils.getKafkaConsumer(topicList)).addSink(sinkFunction);env.execute("kafka2mysql synchronization " + topicList.toString());}
}

相关文章:

【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql(根据对应操作类型进行增、删、改操作)

【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql&#xff08;根据对应操作类型进行增、删、改操作&#xff09; 1&#xff09;导入依赖2&#xff09;resources2.1.appconfig.yml2.2.application.properties2.3.log4j.properties2.4.log4j2.xml 3&#xff09;uti…...

SpringMVC学习与开发(四)

注&#xff1a;此为笔者学习狂神说SpringMVC的笔记&#xff0c;其中包含个人的笔记和理解&#xff0c;仅做学习笔记之用&#xff0c;更多详细资讯请出门左拐B站&#xff1a;狂神说!!! 11、Ajax初体验 1、伪造Ajax 结果&#xff1a;并未有xhr异步请求 <!DOCTYPE html> &…...

odoo17核心概念view7——listview总体框架分析

这是view系列的第七篇文章&#xff0c;今天主要介绍我们最常用的list视图。 1、先看list_view,这是主文件 /** odoo-module */import { registry } from "web/core/registry"; import { RelationalModel } from "web/model/relational_model/relational_mode…...

大创项目推荐 深度学习交通车辆流量分析 - 目标检测与跟踪 - python opencv

文章目录 0 前言1 课题背景2 实现效果3 DeepSORT车辆跟踪3.1 Deep SORT多目标跟踪算法3.2 算法流程 4 YOLOV5算法4.1 网络架构图4.2 输入端4.3 基准网络4.4 Neck网络4.5 Head输出层 5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; *…...

数字图像处理——亚像素边缘的轮廓提取

像素 像素是图像处理中的基本单位&#xff0c;一个像素是图像中最小的离散化单位&#xff0c;具有特定的位置和颜色信息。在数字图像中&#xff0c;每个像素都有一个特定的坐标&#xff0c;通常以行和列的形式表示。每个像素的颜色信息可以通过不同的表示方式&#xff0c;如灰…...

【六袆 - Framework】vue3入门;vue框架的特点矩阵列举;Vue.js 工作原理

vue框架的特点 Vue.js的特点展开叙述Vue.js的工作原理展开叙述 官方文档&#xff1a; https://cn.vuejs.org/guide/introduction.html Vue.js的特点 ┌────────────────────┬────────────────────────────────────…...

GO学习记录 —— 创建一个GO项目

文章目录 前言一、项目介绍二、目录介绍三、创建过程1.引入Gin框架、创建main2.加载配置文件3.连接MySQL、redis4.创建结构体5.错误处理、返回响应处理 前言 代码地址 下载地址&#xff1a;https://github.com/Lee-ZiMu/Golang-Init.git 一、项目介绍 1、使用Gin框架来创建项…...

C语言中的goto语句:使用、争议与最佳实践

各位少年&#xff1a; 引言&#xff1a; 在C语言编程中&#xff0c;goto语句是一个历史悠久且颇具争议的控制流结构。作为无条件跳转指令&#xff0c;它允许程序执行从当前点直接跳转到同一函数内的任意位置&#xff0c;由一个标签&#xff08;label&#xff09;来指定目标。尽…...

wpf-动态设置组件【按钮为例】样式

文章速览 解决方案具体实现Converter 部分创建样式Binding样式 坚持记录实属不易&#xff0c;希望友善多金的码友能够随手点一个赞。 共同创建氛围更加良好的开发者社区&#xff01; 谢谢~ 解决方案 创建一个Converter&#xff0c;返回对应的style实现对应的修改 创建多个样式…...

40道MyBatis面试题带答案(很全)

1. 什么是MyBatis &#xff08;1&#xff09;Mybatis是一个半ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;它内部封装了JDBC&#xff0c;开发时只需要关注SQL语句本身&#xff0c;不需要花费精力去处理加载驱动、创建连接、创建statement等繁杂的过程。程序员直接…...

python:PyCharm更改.PyCharm配置文件夹存储位置

关联账号文章&#xff1a;另外的账号 在启动 PyCharm 后选择 Help -> Edit Custom Properties 的选项&#xff0c;弹出&#xff1a; 选择 Create &#xff0c;之后在文件中添加配置文件新的存储位置即可&#xff0c;例如&#xff1a; idea.config.pathD:/Program Files/.Py…...

Centos安装Kafka(KRaft模式)

1. KRaft引入 Kafka是一种高吞吐量的分布式发布订阅消息系统&#xff0c;它可以处理消费者在网站中的所有动作流数据。其核心组件包含Producer、Broker、Consumer&#xff0c;以及依赖的Zookeeper集群。其中Zookeeper集群是Kafka用来负责集群元数据的管理、控制器的选举等。 由…...

学习笔记13——Spring整合Mybatis、junit、AOP、事务

学习笔记系列开头惯例发布一些寻亲消息 链接&#xff1a;https://baobeihuijia.com/bbhj/ Mybatis - Spring&#xff08;使用第三方包new一个对象bean&#xff09; 原始的Mybatis与数据库交互【通过sqlmapconfig来配置和连接】 初始化SqlSessionFactory获得连接获取数据层接口…...

【12月比赛合集】4场可报名的「创新应用」、「数据分析」和「程序设计」大奖赛,任君挑选!

CompHub[1] 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…&#xff09;比赛。本账号会推送最新的比赛消息&#xff0c;欢迎关注&#xff01; 以下信息仅供参考&#xff0c;以比赛官网为准 目录 数据分析赛&#xff08;1场比赛&#xff09;程序设计赛&#…...

Cisco模拟器-企业网络部署

某企业园区网有&#xff1a;2个分厂&#xff08;分别是&#xff1a;零件分厂、总装分厂&#xff09;1个总厂网络中心 1个总厂会议室&#xff1b; &#xff08;1&#xff09;每个分厂有自己的路由器&#xff0c;均各有&#xff1a;1个楼宇分厂网络中心 每个楼宇均包含&#x…...

WPF+Halcon 培训项目实战(12):WPF导出匹配模板

文章目录 前言相关链接项目专栏运行环境匹配图片WPF导出匹配模板如何了解Halcon和C#代码的对应关系逻辑分析&#xff1a;添加截取ROI功能基类矩形圆形 生成导出模板运行结果&#xff1a;可能的报错你的文件路径不存在你选择的区域的内容有效信息过少 前言 为了更好地去学习WPF…...

uniapp中uview组件库的丰富Upload 上传上午用法

目录 基础用法 #上传视频 #文件预览 #隐藏上传按钮 #限制上传数量 #自定义上传样式 API #Props #Methods #Slot #Events 基础用法 可以通过设置fileList参数(数组&#xff0c;元素为对象)&#xff0c;显示预置的图片。其中元素的url属性为图片路径 <template>…...

Unity关于动画混合树(Blend Tree)的使用

在动画与动画的切换过程中&#xff0c;常因为两个动画之间的差距过大&#xff0c;而显得动画的切换很不自然。 这时候就需要动画混合树Blend Tree这个功能。使用混合树可以将多个动画混合在一起&#xff0c;例如在处理角色的移动中&#xff0c;走动画与跑动画切换的时候&#x…...

怎么下载landsat 8影像并在ArcGIS Pro中进行波段组合

Landsat 8&#xff08;前身为Landsat数据连续性任务&#xff0c;或 LDCM&#xff09;于2013年2月11日由 Atlas-V火箭从加利福尼亚州范登堡空军基地发射升空&#xff0c;这里为大家介绍一下该数据的下载的方法&#xff0c;希望能对你有所帮助。 注册账号 如果之前已经注册过的…...

编程新手IDE

身为一个前端开发者&#xff0c;我深知一个好的开发环境对于编程体验的重要性。对于新手来说&#xff0c;选择一个合适的IDE&#xff08;集成开发环境&#xff09;更是至关重要。一个好的IDE可以提高编程效率&#xff0c;减少错误&#xff0c;让新手更专注于学习编程本身。 今…...

如何将一个JSON字符串解析为JavaScript对象或值

JSON.parse(JSON.stringify(data)) 将后端传入的JSON数据data放入该方法的参数中&#xff0c;返回的结果就是JavaScript对象 比如将后端传入的对象key作为对象&#xff0c;而不是字符串双引号格式 {"path": "/home","name": "home",…...

idea配置docker推送本地镜像到远程私有仓库

目录 1&#xff0c;搭建远程Docker 私有仓库 Docker registry 2&#xff0c;Windows10/11系统上安装Docker Desktop 3&#xff0c;idea 配置远程私有仓库地址 4&#xff0c;idea 配置Docker 5&#xff0c;idea在本地构建镜像 6&#xff0c;推送本地Docker镜像到远程 Dock…...

Spring Boot学习随笔- 集成MyBatis-Plus(二)条件查询QueryWrapper、聚合函数的使用、Lambda条件查询

学习视频&#xff1a;【编程不良人】Mybatis-Plus整合SpringBoot实战教程,提高的你开发效率,后端人员必备! 查询方法详解 普通查询 // 根据主键id去查询单个结果的。 Test public void selectById() {User user userMapper.selectById(1739970502337392641L);System.out.print…...

十二、K8S之污点和容忍

污点和容忍 一、概念 k8s 集群中可能管理着非常庞大的服务器&#xff0c;这些服务器可能是各种各样不同类型的&#xff0c;比如机房、地理位置、配置等&#xff0c;有些是计算型节点&#xff0c;有些是存储型节点&#xff0c;此时我们希望能更好的将 pod 调度到与之需求更匹配…...

llvm后端之指令选择源码分析

llvm后端之指令选择源码分析 引言1 主要流程1.1 参数降级1.2 构建DAG1.3 类型合法化1.4 向量合法化1.5 DAG合法化1.6 DAG合并 2 目标实现2.1 TargetLowering2.2 SelectionDAGISel 引言 llvm后端指令选择主要是class SelectionDAGISel的子类实现。整个过程将llvm IR转为有向无环…...

【消息中间件】Rabbitmq消息可靠性、持久化机制、各种消费

原文作者&#xff1a;我辈李想 版权声明&#xff1a;文章原创&#xff0c;转载时请务必加上原文超链接、作者信息和本声明。 文章目录 前言一、常见用法1.消息可靠性2.持久化机制3.消息积压批量消费&#xff1a;增加 prefetch 的数量,提高单次连接的消息数并发消费&#xff1a;…...

aws-sdk-cpp通过bazel构建的S3_client轮子

感觉时间过得很快&#xff0c;又是很久没有更新了 哎&#xff0c;主要原因还是很久都没有学什么东西了&#xff0c;进入社会后不知不觉间倦怠了许多 没什么办法&#xff0c;上班了之后做的很多东西都是调用api&#xff0c;越来越像一个工具人了&#xff0c;虽然说本身也大差不…...

关于WPF MVVM 的详细使用过程以及注意的问题

WPF MVVM 是一种常用的设计模式&#xff0c;在 WPF 应用程序中使用它可以更好地分离界面逻辑和业务逻辑&#xff0c;并且更容易进行单元测试和重构。下面是深入理解 WPF MVVM 的详细使用过程以及注意的问题。 一、MVVM 的基本概念 MVVM 是 Model-View-ViewModel 的缩写&#…...

计算机视觉 全教程目录

1、OpenCV 图像处理框架 实战系列 总目录 OpenCV 图像处理框架 实战系列 总目录 2、现代卷积网络实战系列 总目录 现代卷积网络实战系列 总目录 3、YOLO 物体检测 系列教程 总目录 YOLO 物体检测 系列教程 总目录 4、图像分割实战-系列教程 总目录 图像分割实战-系列教程 总目录…...

油猴脚本开发,之如何添加html和css

简介 油猴是一个脚本管理器,让我们能够方便的使用js脚本&#xff0c;以实现对页面内容的修改、功能增强或其他定制化操作。 常见脚本管理器 Tampermonkey 应该是各位见得最多的也是最知名的&#xff0c;好用又稳定&#xff0c;多浏览器支持Greasemonkey 用户脚本始祖&#x…...

【MATLAB】BiGRU神经网络时序预测算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~也可转原文链接获取~ 1 基本定义 BiGRU神经网络时序预测算法是一种基于双向门控循环单元&#xff08;GRU&#xff09;的多变量时间序列预测方法。该方法结合了双向模型和门控机制&#xff0c;旨在有效地捕捉时间序列数据中…...

57.0/初识 PhotoShopCS4(详细版)

目录 57.1 PhotoShop 概要 57.2.1 像素和分辨率 57.2.2 色彩模式 57.2.3 位图和矢量图 57.3 PhotoShop 基本操作 57.3.1 PhotoShop 界面的认识 57.3.2 PhotoShop 基本界面工具 57.3.3 移动选择工具(V) 57.3.4 选框工具(M)​编辑 ​编辑57.3.5 套索工具(L) 57.3…...

[C#]opencvsharp进行图像拼接普通拼接stitch算法拼接

介绍&#xff1a; opencvsharp进行图像拼一般有2种方式&#xff1a;一种是传统方法将2个图片上下或者左右拼接&#xff0c;还有一个方法就是融合拼接&#xff0c;stitch拼接就是一种非常好的算法。opencv里面已经有stitch拼接算法因此我们很容易进行拼接。 效果&#xff1a; …...

《妙趣横生的算法》(C语言实现)-第10章算法设计与数据结构面试题精粹

【10-1】输入一个字符串并将它输出&#xff0c;以ctrlz组合键表示输入完毕&#xff0c;要求将输入的字符串中多于1个的连续空格符合并为1个。 //10-1 2023年12月30日17点11分-17点18分 # include <stdio.h> int main() {char c;c getchar();//scanf("%c", &a…...

(JAVA)-(网络编程)-初始网络编程

网络编程就是在通信协议下&#xff0c;不同的计算机上运行的程序&#xff0c;进行的数据传输。 讲的通俗一点&#xff0c;就是以前我们写的代码是单机版的&#xff0c;网络编程就是联机版的。 应用场景&#xff1a;即时通信&#xff0c;网游对战&#xff0c;金融证券&#xf…...

Observer观察者模式(组件协作)

观察者模式&#xff08;组件协作&#xff09; 链接&#xff1a;观察者模式实例代码 解析 目的 在软件构建过程中&#xff0c;我们需要为某些对象建立一种“通知依赖关系” ——一个对象&#xff08;目标对象&#xff09;的状态发生改变&#xff0c;所有的依赖对象&#xff0…...

数据挖掘 聚类度量

格式化之前的代码&#xff1a; import numpy as np#计算 import pandas as pd#处理结构化表格 import matplotlib.pyplot as plt#绘制图表和可视化数据的函数&#xff0c;通常与numpy和pandas一起使用。 from sklearn import metrics#聚类算法的评估指标。 from sklearn.clust…...

[Angular] 笔记 24:ngContainer vs. ngTemplate vs. ngContent

请说明 Angular 中 ngContainer&#xff0c; ngTemplate 和 ngContent 这三者之间的区别。 chatgpt 回答&#xff1a; 这三个在 Angular 中的概念是关于处理和组织视图的。 1. ngContainer&#xff1a; ngContainer 是一个虚拟的 HTML 容器&#xff0c;它本身不会在最终渲染…...

❀My排序算法学习之插入排序❀

目录 插入排序(Insertion Sort):) 一、定义 二、基本思想 三、示例 时间复杂度 空间复杂度 bash C++ 四、稳定性分析...

【算法题】30. 串联所有单词的子串

题目 给定一个字符串 s 和一个字符串数组 words。 words 中所有字符串 长度相同。 s 中的 串联子串 是指一个包含 words 中所有字符串以任意顺序排列连接起来的子串。 例如&#xff0c;如果 words ["ab","cd","ef"]&#xff0c; 那么 "…...

SAP-FI模块 处理自动生成会计凭证增强

ENHANCEMENT 2 ZEHENC_SAPMF05A. "active version * FI 20221215&#xff1a;固定资产业务过渡科目摘要增强功能 WAIT UP TO 1 SECONDS.READ TABLE xbseg WITH KEY hkont 1601990001. IF sy-subrc 0.DATA: lt_bkdf TYPE TABLE OF bkdf,lt_bkpf TYPE TABLE OF bkpf,…...

Shell脚本-bin/bash: 解释器错误: 没有那个文件或目录-完整路径执行-“/”引发的脑裂

引起该不适的一种可能以及解决方案&#xff0c;网上较多&#xff0c;比如&#xff1a; 但按以上方式操作&#xff0c;并经过查看&#xff0c;发现仍然未能解决问题。 因为两种方式执行&#xff0c;有一种能成功&#xff0c;有一种不能&#xff0c;刚开始未怀疑是文件问题&…...

React MUI(版本v5.15.2)详细使用

使用React MUI&#xff08;版本v5.15.2&#xff09;的详细示例。请注意&#xff0c;由于版本可能会有所不同&#xff0c;因此建议您查阅官方文档以获取最新的信息和示例。但是&#xff0c;我将根据我的知识库为您提供一些基本示例。 首先&#xff0c;确保您已经按照之前的说明…...

用CSS中的动画效果做一个转动的表

<!DOCTYPE html> <html lang"en"><head><meta charset"utf-8"><title></title><style>*{margin:0;padding:0;} /*制作表的样式*/.clock{width: 500px;height: 500px;margin:0 auto;margin-top:100px;border-rad…...

【linux】Linux管道的原理与使用场景

Linux管道是Linux命令行界面中一种强大的工具&#xff0c;它允许用户将多个命令链接起来&#xff0c;使得一个命令的输出可以作为另一个命令的输入。这种机制使得我们可以创建复杂的命令链&#xff0c;并在处理数据时提供了极大的灵活性。在本文中&#xff0c;我们将详细介绍Li…...

nvidia jetson xavier nx developer kit version emmc版重装系统

一、将开发板上的外置硬盘取下来格式化 二、在双系统ubuntu安装SDK Manager&#xff08;.deb文件&#xff09; SDK Manager | NVIDIA Developer sudo apt install ./sdkmanager_1.9.2-10884_amd64.deb 报错直接百度错误&#xff0c;执行相应命令即可 三、 运行SDK Manager …...

命令模式-实例使用

未使用命令模式的UML 使用命令模式后的UML public abstract class Command {public abstract void execute(); }public class Invoker {private Command command;/*** 为功能键注入命令* param command*/public void setCommand(Command command) {this.command command;}/***…...

将网页变身移动应用:网址封装成App的完全指南

什么是网址封装&#xff1f; 网址封装是一个将你的网站或网页直接嵌入到一个原生应用容器中的过程。用户可以通过下载你的App来访问网站&#xff0c;而无需通过浏览器。这种方式不仅提升了用户体验&#xff0c;还可利用移动设备的功能&#xff0c;如推送通知和硬件集成。 小猪…...

探讨kernel32.dll文件是什么,有效解决kernel32.dll丢失

在使用电脑时&#xff0c;你是否遇到过kernel32.dll丢失的困扰&#xff1f;面对这个问题&#xff0c;我们需要及时去解决kernel32.dll丢失的问题。接下来&#xff0c;我们将深入探讨kernel32.dll的功能以及其在操作系统和应用程序中的具体应用领域&#xff0c;相信这将对你解决…...

LOAM: Lidar Odometry and Mapping in Real-time 论文阅读

论文链接 LOAM: Lidar Odometry and Mapping in Real-time 0. Abstract 提出了一种使用二维激光雷达在6自由度运动中的距离测量进行即时测距和建图的方法 距离测量是在不同的时间接收到的&#xff0c;并且运动估计中的误差可能导致生成的点云的错误配准 本文的方法在不需要高…...