当前位置: 首页 > news >正文

【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql(根据对应操作类型进行增、删、改操作)

【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql(根据对应操作类型进行增、删、改操作)

  • 1)导入依赖
  • 2)resources
    • 2.1.appconfig.yml
    • 2.2.application.properties
    • 2.3.log4j.properties
    • 2.4.log4j2.xml
  • 3)util
    • 3.1.KafkaMysqlUtils
    • 3.2.CustomDeSerializationSchema
  • 4)po
    • 4.1.TableBean
  • 5)kafkacdc2mysql
    • 5.1.Kafka2MysqlApp

需求描述:

1、数据从 Kafka 写入 Mysql。

2、相关配置存放于 Mysql 中,通过 Mysql 进行动态读取。

3、此案例中的 Kafka 是进行了 Kerberos 安全认证的,如果不需要自行修改。

4、Kafka 数据为 Json 格式,获取到的数据根据操作类型字段进行增删改操作。

5、读取时使用自定义 Source,写入时使用自定义 Sink。

6、消费 Kafka 数据时自定义反序列化。

1)导入依赖

这里的依赖比较冗余,大家可以根据各自需求做删除或保留。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>gaei.cn.x5l</groupId><artifactId>x8vbusiness</artifactId><version>1.0.0</version><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><target.java.version>1.8</target.java.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><scala.binary.version>2.12</scala.binary.version><scala.version>2.12.10</scala.version><flink.version>1.14.0</flink.version><log4j.version>2.17.2</log4j.version><hadoop.version>3.1.2</hadoop.version><hive.version>3.1.2</hive.version><mongo.driver.version>3.12.6</mongo.driver.version><mongo.driver.core.version>4.3.1</mongo.driver.core.version></properties><dependencies><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version><!--            <exclusions>--><!--                <exclusion>--><!--                    <groupId>mysql</groupId>--><!--                    <artifactId>mysql-connector-java</artifactId>--><!--                </exclusion>--><!--            </exclusions>--></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency><!-- 基础依赖  开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 基础依赖  结束--><!-- TABLE  开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>1.14.0</version><scope>provided</scope></dependency><!-- 使用 hive sql时注销,其他时候可以放开 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- TABLE  结束--><!-- sql  开始--><!-- sql解析 开始 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- sql解析 结束 --><!-- sql连接 kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- sql  结束--><!-- 检查点 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency><!-- 本地监控任务 开始 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 本地监控任务 结束 --><!-- DataStream 开始 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><!-- hdfs --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version><exclusions><exclusion><groupId>org.apache.avro</groupId><artifactId>avro</artifactId></exclusion></exclusions></dependency><!-- 重点,容易被忽略的jar --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency><!-- rocksdb_2 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 其他 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency><dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency><!-- TABLE  开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><!--            <version>${flink.version}</version>--><version>1.13.5</version><scope>provided</scope></dependency><!-- TABLE  结束--><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.3</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><version>2.3.0</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><!--            <version>5.1.44</version>--><version>8.0.27</version><scope>runtime</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.2.8</version></dependency><dependency><groupId>org.mongodb</groupId><artifactId>bson</artifactId><version>${mongo.driver.core.version}</version></dependency><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver-core</artifactId><version>${mongo.driver.core.version}</version></dependency><!--    使用 mongodb-driver 重新打包成的 custom-mongo-core  --><dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.6</version></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude><exclude>org.apache.flink:flink-runtime-web_2.11</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass></transformer><!-- flink sql 需要  --><!-- The service transformer is needed to merge META-INF/services files --><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><!-- ... --></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.0.0,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build></project>

2)resources

2.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"

2.2.application.properties

url=mongodb://test:test123456@10.1.1.1:34516/?authSource=admin
#database=diagnosis
#collection=diagnosisEntiry
maxConnectionIdleTime=1000000
batchSize=1# flink
checkpoint.interval=300000
checkpoint.minPauseBetweenCheckpoints=10000
checkpoint.checkpointTimeout=400000
maxConcurrentCheckpoints=1
restartInterval=120
restartStrategy=3
checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongomysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false
mysql.username=test
mysql.password=123456#envType=PRE
envType=PRD# mysql  druid 连接池生产环境连接池配置
druid.driverClassName=com.mysql.jdbc.Driver
#生产
druid.url=jdbc:mysql://1.1.1.1:3306/test
druid.username=test
druid.password=123456
# 初始化连接数
druid.initialSize=1
# 最大连接数
druid.maxActive=5
# 最大等待时间
druid.maxWait=3000

2.3.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5"><Properties><property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" /><property name="LOG_LEVEL" value="ERROR" /></Properties><appenders><console name="console" target="SYSTEM_OUT"><PatternLayout pattern="${LOG_PATTERN}"/><ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/></console><File name="log" fileName="tmp/log/job.log" append="false"><PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/></File></appenders><loggers><root level="${LOG_LEVEL}"><appender-ref ref="console"/><appender-ref ref="log"/></root></loggers>
</configuration>

3)util

3.1.KafkaMysqlUtils

public class KafkaUtils {public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(List<String> topic) throws IOException {Properties prop1 = confFromYaml();//认证环境String envType = prop1.getProperty("envType");Properties prop = new Properties();System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");prop.put("security.protocol", "SASL_PLAINTEXT");prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "+ "useTicketCache=false  "+ "serviceName=\"" + "kafka" + "\" "+ "useKeyTab=true "+ "keyTab=\"" + "/opt/conf/test.keytab" + "\" "+ "principal=\"" + getKafkaKerberos(envType).get("principal") + "\";");prop.put("bootstrap.servers", getKafkaKerberos(envType).get("bootstrap.servers"));prop.put("group.id", "Kafka2Mysql_test");prop.put("auto.offset.reset", "earliest");prop.put("enable.auto.commit", "false");prop.put("max.poll.interval.ms", "60000");prop.put("max.poll.records", "3000");prop.put("session.timeout.ms", "600000");//        List<String> topics = Stream.of(prop.getProperty("topics").split(",", -1))
//                .collect(Collectors.toList());prop.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");prop.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(topic, new CustomDeSerializationSchema(), prop);consumer.setStartFromGroupOffsets();consumer.setCommitOffsetsOnCheckpoints(true);return consumer;}public static void main(String[] args) throws Exception {Properties druidConf = KafkaUtils.getDruidConf();if (druidConf == null) {throw new RuntimeException("缺少druid相关配置信息,请检查");}DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);Connection connection = dataSource.getConnection();PreparedStatement showDatabases = connection.prepareStatement("\n" +"select count(*) from tab_factory");ResultSet resultSet = showDatabases.executeQuery();while (resultSet.next()) {String string = resultSet.getString(1);System.out.println(string);}resultSet.close();showDatabases.close();connection.close();}public static Properties getDruidConf() {try {Properties prop = confFromYaml();String driverClassName = prop.get("druid.driverClassName").toString();String url = prop.get("druid.url").toString();String username = prop.get("druid.username").toString();String password = prop.get("druid.password").toString();String initialSize = prop.get("druid.initialSize").toString();String maxActive = prop.get("druid.maxActive").toString();String maxWait = prop.get("druid.maxWait").toString();Properties p = new Properties();p.put("driverClassName", driverClassName);p.put("url", url);p.put("username", username);p.put("password", password);p.put("initialSize", initialSize);p.put("maxActive", maxActive);p.put("maxWait", maxWait);
//            p.forEach((k,v)-> System.out.println("连接池属性 "+k+"="+v));return p;} catch (Exception e) {e.printStackTrace();}return null;}// envType     PRE  PRDpublic static Map<String, String> getKafkaKerberos(String envType) {Map<String, String> map = new HashMap<>();if ("PRD".equalsIgnoreCase(envType)) {map.put("principal", "prd@PRD.PRD.COM");map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");} else if ("PRE".equalsIgnoreCase(envType)) {map.put("principal", "pre@PRE.PRE.COM");map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");} /*else if ("TEST".equalsIgnoreCase(envType)) {map.put("principal","test@TEST.TEST.COM");map.put("bootstrap.servers","test@TEST.TEST.COM");} */ else {System.out.println("没有该" + envType + "环境");throw new RuntimeException("没有该" + envType + "环境");}return map;}public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {Properties prop = confFromYaml();env.enableCheckpointing(Long.valueOf(prop.getProperty("checkpoint.interval")), CheckpointingMode.EXACTLY_ONCE);//这里会造成offset提交的延迟env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Long.valueOf(prop.getProperty("checkpoint.minPauseBetweenCheckpoints")));env.getCheckpointConfig().setCheckpointTimeout(Long.valueOf(prop.getProperty("checkpoint.checkpointTimeout")));env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.valueOf(prop.getProperty("maxConcurrentCheckpoints")));env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.valueOf(prop.getProperty("restartStrategy")), // 尝试重启的次数,不宜过小,分布式任务很容易出问题(正常情况),建议3-5次Time.of(Integer.valueOf(prop.getProperty("restartInterval")), TimeUnit.SECONDS) // 延时));// 设置状态后端存储方式
//        env.setStateBackend(new RocksDBStateBackend((String) prop.getProperty("checkPointPath"), true));
//        env.setStateBackend(new MemoryStateBackend());env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));return env;}public static Properties confFromYaml() {Properties prop = new Properties();InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");try {prop.load(resourceStream);} catch (Exception e) {e.printStackTrace();} finally {try {if (resourceStream != null) {resourceStream.close();}} catch (Exception ex) {ex.printStackTrace();}}return prop;}
}

3.2.CustomDeSerializationSchema

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {private static String encoding = "UTF8";//是否表示l流的最后一条元素,设置为false,表示数据会源源不断的到来@Overridepublic boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {return false;}//这里返回一个ConsumerRecord<String,String>类型的数据,除了原数据还包括topic,offset,partition等信息@Overridepublic ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {byte[] key = (record.key() == null ? "".getBytes() : record.key());return new ConsumerRecord<String, String>(record.topic(),record.partition(),record.offset(),record.timestamp(),record.timestampType(),record.checksum(),record.serializedKeySize(),record.serializedValueSize(),/*这里我没有进行空值判断,生产一定记得处理*/new  String(key, encoding),new  String(record.value(), encoding));}//指定数据的输入类型@Overridepublic TypeInformation<ConsumerRecord<String, String>> getProducedType() {return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});}
}

4)po

4.1.TableBean

@Data
public class TableBean {private String database;private String table;private String primaryKey;private TableBean() {}public TableBean(String database, String table, String primaryKey) {this.database = '`' + database + '`';this.table = '`' + table + '`';this.primaryKey = primaryKey;}
}

5)kafkacdc2mysql

5.1.Kafka2MysqlApp

public class Kafka2MysqlApp {// key 是 topic 名,value是对应数据库表中的主键列名private static final Map<String, TableBean> map = new HashMap<>();static {//表名这里没有进行配置,后面根据实际业务进行配置即可map.put("mysql_tab1", new TableBean("db1", "", "alarm_id"));map.put("mysql_tab2", new TableBean("db2", "", "id"));}public static void main(String[] args) throws Exception {ArrayList<String> topicList = new ArrayList<>(map.keySet());StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();KafkaUtils.setupFlinkEnv(env);RichSinkFunction<ConsumerRecord<String, String>> sinkFunction =new RichSinkFunction<ConsumerRecord<String, String>>() {DataSource dataSource = null;@Overridepublic void open(Configuration parameters) throws Exception {initDruidDataSource();}private void initDruidDataSource() throws Exception {Properties druidConf = KafkaUtils.getDruidConf();if (druidConf == null) {throw new RuntimeException("缺少druid相关配置信息,请检查");}dataSource = DruidDataSourceFactory.createDataSource(druidConf);}@Overridepublic void close() throws Exception {}@Overridepublic void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {if (dataSource == null) {throw new RuntimeException("连接池未初始化");}String operationType = "";String keyId = "";String sql = "";try (Connection connection = dataSource.getConnection()) {//定义表名String table_name = record.topic();JSONObject jsonObject = JSONObject.parseObject(record.value());operationType = jsonObject.getString("operationType");jsonObject.remove("operationType");String primaryKey = map.get(record.topic()).getPrimaryKey();String database = map.get(record.topic()).getDatabase();keyId = jsonObject.getString(primaryKey);List<String> columns = new ArrayList<>();List<String> columnValues = new ArrayList<>();jsonObject.forEach((k, v) -> {columns.add(k);columnValues.add(v.toString());});if ("INSERT".equals(operationType)) {try {sql = "delete from " + database + "." + table_name + " where " + primaryKey + "= ?";PreparedStatement preparedStatement = connection.prepareStatement(sql);preparedStatement.setObject(1, keyId);preparedStatement.executeUpdate();preparedStatement.close();} catch (Exception ignore) {}StringBuilder sb = new StringBuilder();sb.append("insert into ").append(database).append(".").append(table_name).append("(");for (String column : columns) {sb.append("`").append(column).append("`,");}sb.append(") values(");for (String columnValue : columnValues) {sb.append("?,");}sb.append(")");//去除最后一个逗号sql = sb.toString().replace(",)", ")");PreparedStatement preparedStatement = connection.prepareStatement(sql);for (int i = 0; i < columnValues.size(); i++) {preparedStatement.setObject(i + 1, columnValues.get(i));}preparedStatement.executeUpdate();preparedStatement.close();} else if ("UPDATE".equals(operationType)) {StringBuilder sb = new StringBuilder();sb.append("update ").append(database).append(".").append(table_name).append(" set ");for (String column : columns) {sb.append("`").append(column).append("`= ?,");}String sqlPre = sb.substring(0, sb.length() - 1);sql = sqlPre + " where " + primaryKey + "='" + keyId + "'";PreparedStatement preparedStatement = connection.prepareStatement(sql);for (int i = 0; i < columnValues.size(); i++) {preparedStatement.setObject(i + 1, columnValues.get(i));}preparedStatement.executeUpdate();preparedStatement.close();} else if ("DELETE".equals(operationType)) {sql = "delete from " + database + "." + table_name + " where " + primaryKey + "= ?";PreparedStatement preparedStatement = connection.prepareStatement(sql);preparedStatement.setObject(1, keyId);preparedStatement.executeUpdate();preparedStatement.close();}} catch (Exception e) {System.out.printf("mysql同步操作(%s)有误,主键是%s,原因是%s,对应topic数据是%s%n", operationType, keyId, e.getMessage(), record);System.out.println("执行sql语句为 " + sql);throw new RuntimeException(e);}}};env.addSource(KafkaUtils.getKafkaConsumer(topicList)).addSink(sinkFunction);env.execute("kafka2mysql synchronization " + topicList.toString());}
}

相关文章:

【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql(根据对应操作类型进行增、删、改操作)

【Flink-Kafka-To-Mysql】使用 Flink 实现 Kafka 数据写入 Mysql&#xff08;根据对应操作类型进行增、删、改操作&#xff09; 1&#xff09;导入依赖2&#xff09;resources2.1.appconfig.yml2.2.application.properties2.3.log4j.properties2.4.log4j2.xml 3&#xff09;uti…...

SpringMVC学习与开发(四)

注&#xff1a;此为笔者学习狂神说SpringMVC的笔记&#xff0c;其中包含个人的笔记和理解&#xff0c;仅做学习笔记之用&#xff0c;更多详细资讯请出门左拐B站&#xff1a;狂神说!!! 11、Ajax初体验 1、伪造Ajax 结果&#xff1a;并未有xhr异步请求 <!DOCTYPE html> &…...

odoo17核心概念view7——listview总体框架分析

这是view系列的第七篇文章&#xff0c;今天主要介绍我们最常用的list视图。 1、先看list_view,这是主文件 /** odoo-module */import { registry } from "web/core/registry"; import { RelationalModel } from "web/model/relational_model/relational_mode…...

大创项目推荐 深度学习交通车辆流量分析 - 目标检测与跟踪 - python opencv

文章目录 0 前言1 课题背景2 实现效果3 DeepSORT车辆跟踪3.1 Deep SORT多目标跟踪算法3.2 算法流程 4 YOLOV5算法4.1 网络架构图4.2 输入端4.3 基准网络4.4 Neck网络4.5 Head输出层 5 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; *…...

数字图像处理——亚像素边缘的轮廓提取

像素 像素是图像处理中的基本单位&#xff0c;一个像素是图像中最小的离散化单位&#xff0c;具有特定的位置和颜色信息。在数字图像中&#xff0c;每个像素都有一个特定的坐标&#xff0c;通常以行和列的形式表示。每个像素的颜色信息可以通过不同的表示方式&#xff0c;如灰…...

【六袆 - Framework】vue3入门;vue框架的特点矩阵列举;Vue.js 工作原理

vue框架的特点 Vue.js的特点展开叙述Vue.js的工作原理展开叙述 官方文档&#xff1a; https://cn.vuejs.org/guide/introduction.html Vue.js的特点 ┌────────────────────┬────────────────────────────────────…...

GO学习记录 —— 创建一个GO项目

文章目录 前言一、项目介绍二、目录介绍三、创建过程1.引入Gin框架、创建main2.加载配置文件3.连接MySQL、redis4.创建结构体5.错误处理、返回响应处理 前言 代码地址 下载地址&#xff1a;https://github.com/Lee-ZiMu/Golang-Init.git 一、项目介绍 1、使用Gin框架来创建项…...

C语言中的goto语句:使用、争议与最佳实践

各位少年&#xff1a; 引言&#xff1a; 在C语言编程中&#xff0c;goto语句是一个历史悠久且颇具争议的控制流结构。作为无条件跳转指令&#xff0c;它允许程序执行从当前点直接跳转到同一函数内的任意位置&#xff0c;由一个标签&#xff08;label&#xff09;来指定目标。尽…...

wpf-动态设置组件【按钮为例】样式

文章速览 解决方案具体实现Converter 部分创建样式Binding样式 坚持记录实属不易&#xff0c;希望友善多金的码友能够随手点一个赞。 共同创建氛围更加良好的开发者社区&#xff01; 谢谢~ 解决方案 创建一个Converter&#xff0c;返回对应的style实现对应的修改 创建多个样式…...

40道MyBatis面试题带答案(很全)

1. 什么是MyBatis &#xff08;1&#xff09;Mybatis是一个半ORM&#xff08;对象关系映射&#xff09;框架&#xff0c;它内部封装了JDBC&#xff0c;开发时只需要关注SQL语句本身&#xff0c;不需要花费精力去处理加载驱动、创建连接、创建statement等繁杂的过程。程序员直接…...

python:PyCharm更改.PyCharm配置文件夹存储位置

关联账号文章&#xff1a;另外的账号 在启动 PyCharm 后选择 Help -> Edit Custom Properties 的选项&#xff0c;弹出&#xff1a; 选择 Create &#xff0c;之后在文件中添加配置文件新的存储位置即可&#xff0c;例如&#xff1a; idea.config.pathD:/Program Files/.Py…...

Centos安装Kafka(KRaft模式)

1. KRaft引入 Kafka是一种高吞吐量的分布式发布订阅消息系统&#xff0c;它可以处理消费者在网站中的所有动作流数据。其核心组件包含Producer、Broker、Consumer&#xff0c;以及依赖的Zookeeper集群。其中Zookeeper集群是Kafka用来负责集群元数据的管理、控制器的选举等。 由…...

学习笔记13——Spring整合Mybatis、junit、AOP、事务

学习笔记系列开头惯例发布一些寻亲消息 链接&#xff1a;https://baobeihuijia.com/bbhj/ Mybatis - Spring&#xff08;使用第三方包new一个对象bean&#xff09; 原始的Mybatis与数据库交互【通过sqlmapconfig来配置和连接】 初始化SqlSessionFactory获得连接获取数据层接口…...

【12月比赛合集】4场可报名的「创新应用」、「数据分析」和「程序设计」大奖赛,任君挑选!

CompHub[1] 实时聚合多平台的数据类(Kaggle、天池…)和OJ类(Leetcode、牛客…&#xff09;比赛。本账号会推送最新的比赛消息&#xff0c;欢迎关注&#xff01; 以下信息仅供参考&#xff0c;以比赛官网为准 目录 数据分析赛&#xff08;1场比赛&#xff09;程序设计赛&#…...

Cisco模拟器-企业网络部署

某企业园区网有&#xff1a;2个分厂&#xff08;分别是&#xff1a;零件分厂、总装分厂&#xff09;1个总厂网络中心 1个总厂会议室&#xff1b; &#xff08;1&#xff09;每个分厂有自己的路由器&#xff0c;均各有&#xff1a;1个楼宇分厂网络中心 每个楼宇均包含&#x…...

WPF+Halcon 培训项目实战(12):WPF导出匹配模板

文章目录 前言相关链接项目专栏运行环境匹配图片WPF导出匹配模板如何了解Halcon和C#代码的对应关系逻辑分析&#xff1a;添加截取ROI功能基类矩形圆形 生成导出模板运行结果&#xff1a;可能的报错你的文件路径不存在你选择的区域的内容有效信息过少 前言 为了更好地去学习WPF…...

uniapp中uview组件库的丰富Upload 上传上午用法

目录 基础用法 #上传视频 #文件预览 #隐藏上传按钮 #限制上传数量 #自定义上传样式 API #Props #Methods #Slot #Events 基础用法 可以通过设置fileList参数(数组&#xff0c;元素为对象)&#xff0c;显示预置的图片。其中元素的url属性为图片路径 <template>…...

Unity关于动画混合树(Blend Tree)的使用

在动画与动画的切换过程中&#xff0c;常因为两个动画之间的差距过大&#xff0c;而显得动画的切换很不自然。 这时候就需要动画混合树Blend Tree这个功能。使用混合树可以将多个动画混合在一起&#xff0c;例如在处理角色的移动中&#xff0c;走动画与跑动画切换的时候&#x…...

怎么下载landsat 8影像并在ArcGIS Pro中进行波段组合

Landsat 8&#xff08;前身为Landsat数据连续性任务&#xff0c;或 LDCM&#xff09;于2013年2月11日由 Atlas-V火箭从加利福尼亚州范登堡空军基地发射升空&#xff0c;这里为大家介绍一下该数据的下载的方法&#xff0c;希望能对你有所帮助。 注册账号 如果之前已经注册过的…...

编程新手IDE

身为一个前端开发者&#xff0c;我深知一个好的开发环境对于编程体验的重要性。对于新手来说&#xff0c;选择一个合适的IDE&#xff08;集成开发环境&#xff09;更是至关重要。一个好的IDE可以提高编程效率&#xff0c;减少错误&#xff0c;让新手更专注于学习编程本身。 今…...

如何将一个JSON字符串解析为JavaScript对象或值

JSON.parse(JSON.stringify(data)) 将后端传入的JSON数据data放入该方法的参数中&#xff0c;返回的结果就是JavaScript对象 比如将后端传入的对象key作为对象&#xff0c;而不是字符串双引号格式 {"path": "/home","name": "home",…...

idea配置docker推送本地镜像到远程私有仓库

目录 1&#xff0c;搭建远程Docker 私有仓库 Docker registry 2&#xff0c;Windows10/11系统上安装Docker Desktop 3&#xff0c;idea 配置远程私有仓库地址 4&#xff0c;idea 配置Docker 5&#xff0c;idea在本地构建镜像 6&#xff0c;推送本地Docker镜像到远程 Dock…...

Spring Boot学习随笔- 集成MyBatis-Plus(二)条件查询QueryWrapper、聚合函数的使用、Lambda条件查询

学习视频&#xff1a;【编程不良人】Mybatis-Plus整合SpringBoot实战教程,提高的你开发效率,后端人员必备! 查询方法详解 普通查询 // 根据主键id去查询单个结果的。 Test public void selectById() {User user userMapper.selectById(1739970502337392641L);System.out.print…...

十二、K8S之污点和容忍

污点和容忍 一、概念 k8s 集群中可能管理着非常庞大的服务器&#xff0c;这些服务器可能是各种各样不同类型的&#xff0c;比如机房、地理位置、配置等&#xff0c;有些是计算型节点&#xff0c;有些是存储型节点&#xff0c;此时我们希望能更好的将 pod 调度到与之需求更匹配…...

llvm后端之指令选择源码分析

llvm后端之指令选择源码分析 引言1 主要流程1.1 参数降级1.2 构建DAG1.3 类型合法化1.4 向量合法化1.5 DAG合法化1.6 DAG合并 2 目标实现2.1 TargetLowering2.2 SelectionDAGISel 引言 llvm后端指令选择主要是class SelectionDAGISel的子类实现。整个过程将llvm IR转为有向无环…...

【消息中间件】Rabbitmq消息可靠性、持久化机制、各种消费

原文作者&#xff1a;我辈李想 版权声明&#xff1a;文章原创&#xff0c;转载时请务必加上原文超链接、作者信息和本声明。 文章目录 前言一、常见用法1.消息可靠性2.持久化机制3.消息积压批量消费&#xff1a;增加 prefetch 的数量,提高单次连接的消息数并发消费&#xff1a;…...

aws-sdk-cpp通过bazel构建的S3_client轮子

感觉时间过得很快&#xff0c;又是很久没有更新了 哎&#xff0c;主要原因还是很久都没有学什么东西了&#xff0c;进入社会后不知不觉间倦怠了许多 没什么办法&#xff0c;上班了之后做的很多东西都是调用api&#xff0c;越来越像一个工具人了&#xff0c;虽然说本身也大差不…...

关于WPF MVVM 的详细使用过程以及注意的问题

WPF MVVM 是一种常用的设计模式&#xff0c;在 WPF 应用程序中使用它可以更好地分离界面逻辑和业务逻辑&#xff0c;并且更容易进行单元测试和重构。下面是深入理解 WPF MVVM 的详细使用过程以及注意的问题。 一、MVVM 的基本概念 MVVM 是 Model-View-ViewModel 的缩写&#…...

计算机视觉 全教程目录

1、OpenCV 图像处理框架 实战系列 总目录 OpenCV 图像处理框架 实战系列 总目录 2、现代卷积网络实战系列 总目录 现代卷积网络实战系列 总目录 3、YOLO 物体检测 系列教程 总目录 YOLO 物体检测 系列教程 总目录 4、图像分割实战-系列教程 总目录 图像分割实战-系列教程 总目录…...

油猴脚本开发,之如何添加html和css

简介 油猴是一个脚本管理器,让我们能够方便的使用js脚本&#xff0c;以实现对页面内容的修改、功能增强或其他定制化操作。 常见脚本管理器 Tampermonkey 应该是各位见得最多的也是最知名的&#xff0c;好用又稳定&#xff0c;多浏览器支持Greasemonkey 用户脚本始祖&#x…...

手机网站seo怎么做/太原百度推广开户

问题 开始播放视频的时候&#xff0c;有一个闪动 原因 没有播放的时候&#xff0c;有封面图&#xff0c;开始播放之后&#xff0c;封面图消失&#xff0c;开始播放&#xff0c;这时候视频画面还没有展示&#xff0c;这个小的间隙就造成了闪动 修复 开始展示视频画面的时候…...

政府类网站建设费用/怎样开网站

Curl是Linux下一个非常强大的http命令行工具&#xff0c;其功能十分强大。一、CURL对HTTP的常规訪问1. 訪问站点$ curl http://www.linuxidc.com回车之后。www.linuxidc.com 的html 显示在屏幕上了 2. 保存页面用curl option: -o$ curl -o page.html http://www.linuxidc.com能…...

wordpress批量换网址/竞价托管哪家效果好

这篇很水&#xff0c;因为就只有一行代码&#xff1b; Camera.main.transform.Translate (Vector3.forward); 这行代码控制主摄像头向前移动&#xff0c;其它的如下&#xff1a; up&#xff1a;向上 down&#xff1a;向下 left&#xff1a;相左 right&#xff1a;向右 back&…...

做电脑网站手机能显示不出来怎么办/营销策略4p

转载&#xff1a;http://blog.sina.com.cn/s/blog_675dc44b0100rcqg.html1. 确定已经把xdict32.api 拷贝到Adobe Reader X安装目录下的plug_ins文件夹中(如X:/Program Files/Adobe/Reader 10.0/Reader/plug_ins/)。2. 启动Adobe Reader X后按ctrlk&#xff0c;在弹出的设置窗口…...

哈尔滨网站建设网站/百度大数据

Python四种逐行读取文件内容的方法 下面四种Python逐行读取文件内容的方法&#xff0c; 分析了各种方法的优缺点及应用场景&#xff0c;以下代码在python3中测试通过&#xff0c; python2中运行部分代码已注释&#xff0c;稍加修改即可。 方法一&#xff1a;readline函数 # -*-…...

成都医院手机网站建设/兰州seo优化公司

无意间发现之前搭建MySQL的时候&#xff0c;有将操作过程记录下来&#xff0c;涉及主机系统配置修改&#xff0c;MySQL用户和目录创建&#xff0c;初始化等操作。未免日后遗忘&#xff0c;特保存在这里。有些步骤可能会有错&#xff0c;需要甄别。注&#xff1a;mysql-5.7.91、…...