
[Flink-Kafka-To-ClickHouse] Writing Kafka Data into ClickHouse with Flink

  • 1) Import dependencies
  • 2) Code implementation
    • 2.1.resources
      • 2.1.1.appconfig.yml
      • 2.1.2.log4j.properties
      • 2.1.3.log4j2.xml
      • 2.1.4.flink_backup_local.yml
    • 2.2.utils
      • 2.2.1.DBConn
      • 2.2.2.CommonUtils
      • 2.2.3.RemoteConfigUtil
      • 2.2.4.ClickhouseUtil
    • 2.3.flatmap
      • 2.3.1.FlatMapFunction
    • 2.4.sink
      • 2.4.1.ClickHouseCatalog
    • 2.5.Kafka2ClickHouse
      • 2.5.1.Kafka2chApp
      • 2.5.2.Kafka2Ck-ODS

Requirements:

1. Data is read from Kafka and written into ClickHouse.

2. The job configuration is stored in MySQL and read dynamically at startup.

3. The Kafka cluster in this example uses Kerberos authentication; remove that part yourself if you do not need it.

4. The target table is created in ClickHouse first, and its schema is then fetched dynamically at runtime.

5. The Kafka messages are JSON; a FlatMap function flattens them and packs the fields into a Row according to the table schema before writing.

6. The stream is registered as a temporary view, and the actual write is done with Flink SQL.

7. For local testing you can edit resources/flink_backup_local.yml and load it through ConfigTools.initConf.

1) Import dependencies

The dependency list below is fairly redundant; trim or keep entries according to your own needs.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>gaei.cn.x5l</groupId><artifactId>kafka2ch</artifactId><version>1.0.0</version><properties><hbase.version>2.3.3</hbase.version><hadoop.version>3.1.1</hadoop.version><spark.version>3.0.2</spark.version><scala.version>2.12.10</scala.version><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.0</flink.version><scala.binary.version>2.12</scala.binary.version><target.java.version>1.8</target.java.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.2</log4j.version><hadoop.version>3.1.2</hadoop.version><hive.version>3.1.2</hive.version></properties><dependencies><!-- 基础依赖  开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 基础依赖  结束--><!-- TABLE  开始--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>1.14.0</version><scope>provided</scope></dependency><!-- 使用 hive sql时注销,其他时候可以放开 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- TABLE  结束--><!-- sql  开始--><!-- sql解析 开始 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- sql解析 结束 --><!-- sql连接 kafka -->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>--><!-- sql  结束--><!-- 检查点 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 有状态的函数依赖 开始 --><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>statefun-sdk-java</artifactId>--><!--            <version>3.0.0</version>--><!--        </dependency>--><!-- 有状态的函数依赖 结束 --><!-- 连接Kafka -->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>--><dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency><!-- DataStream 开始 --><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>statefun-flink-datastream</artifactId>--><!--            <version>3.0.0</version>--><!--        </dependency>--><!-- DataStream 结束 --><!-- 本地监控任务 开始 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 本地监控任务 结束 --><!-- DataStream 开始 --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><!-- hdfs --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.3.1</version><!--            <exclusions>--><!--                <exclusion>--><!--                    <groupId>org.apache.curator</groupId>--><!--                    <artifactId>curator-client</artifactId>--><!--                </exclusion>--><!--            </exclusions>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.curator/curator-client --><!--        <dependency>--><!--            <groupId>org.apache.curator</groupId>--><!--            <artifactId>curator-client</artifactId>--><!--            <version>5.3.0</version>--><!--        </dependency>--><!-- 重点,容易被忽略的jar --><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency><!-- rocksdb_2 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!-- 其他 --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency><dependency><groupId>gaei.cn.x5l.bigdata.common</groupId><artifactId>x5l-bigdata-common</artifactId><version>1.3-SNAPSHOT</version><exclusions><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId></exclusion><exclusion><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId></exclusion></exclusions></dependency><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>--><!--            <version>${flink.version}</version>--><!--        </dependency>--><!--   将  flink-connector-kafka_2.12  改为  flink-sql-connector-kafka_2.12 
--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-clickhouse</artifactId><version>1.14.3-SNAPSHOT</version><!--<systemPath>${project.basedir}/lib/flink-connector-clickhouse-1.12.0-SNAPSHOT.jar</systemPath>--><!--<scope>system</scope>--></dependency><dependency><groupId>gaei.cn.x5l</groupId><artifactId>tsp-gb-decode</artifactId><version>1.0.0</version></dependency><dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version><scope>runtime</scope></dependency><dependency><groupId>gaei.cn.x5l.flink.common</groupId><artifactId>x5l-flink-common</artifactId><version>1.4-SNAPSHOT</version></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.0.0</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude><exclude>org.apache.flink:flink-runtime-web_2.11</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass></transformer><!-- flink sql 需要  --><!-- The service transformer is needed to merge META-INF/services files --><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><!-- ... --></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. 
--><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.0.0,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build></project>

2) Code implementation

2.1.resources

2.1.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF8&connectTimeout=60000&socketTimeout=60000"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"
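
For reference, appconfig.yml is read from the classpath before the job can reach MySQL. The project does this through a LocalConfigUtil helper that is not shown in this article; the snippet below is only a minimal stand-in built on the jyaml dependency declared in the pom, and the class name is hypothetical.

import org.ho.yaml.Yaml;

import java.io.InputStream;
import java.util.Map;

// Hypothetical replacement for the project's LocalConfigUtil (not shown in this article).
public class AppConfigDemo {
    @SuppressWarnings("unchecked")
    public static Map<String, String> loadAppConfig() throws Exception {
        // appconfig.yml sits in src/main/resources, i.e. on the classpath at runtime
        try (InputStream in = AppConfigDemo.class.getResourceAsStream("/appconfig.yml")) {
            return (Map<String, String>) Yaml.load(in);
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> conf = loadAppConfig();
        System.out.println(conf.get("mysql.url"));    // JDBC URL of the config database
        System.out.println(conf.get("mysql.driver")); // com.mysql.jdbc.Driver
    }
}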

2.1.2.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.1.3.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5"><Properties><property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" /><property name="LOG_LEVEL" value="ERROR" /></Properties><appenders><console name="console" target="SYSTEM_OUT"><PatternLayout pattern="${LOG_PATTERN}"/><ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/></console><File name="log" fileName="tmp/log/job.log" append="false"><PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/></File></appenders><loggers><root level="${LOG_LEVEL}"><appender-ref ref="console"/><appender-ref ref="log"/></root></loggers>
</configuration>

2.1.4.flink_backup_local.yml

clickhouse:
  connector: 'clickhouse'
  database-name: 'dwd'
  driver: 'ru.yandex.clickhouse.ClickHouseDriver'
  jdbcurl: 'jdbc:clickhouse://10.1.1.1:8123/dwd?socket_timeout=480000'
  password: 'X8v@123456!%$'
  reissueInterval: 3
  sink.batch-size: '200000'
  sink.flush-interval: '3000000'
  sink.ignore-delete: 'true'
  sink.max-retries: '3'
  sink.partition-key: 'toYYYYMMDD(sample_date_time)'
  sink.partition-strategy: 'balanced'
  table-name: 'test_local'
  url: 'clickhouse://10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123,10.1.1.1:8123'
  username: 'test'
hdfs:
  checkPointPath: 'hdfs://nameserver/user/flink/rocksdbcheckpoint'
  checkpointTimeout: 360000
  checkpointing: 300000
  maxConcurrentCheckpoints: 1
  minPauseBetweenCheckpoints: 10000
  restartInterval: 60
  restartStrategy: 3
kafka-consumer:
  prop:
    auto.offset.reset: 'earliest'
    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
    enable.auto.commit: 'false'
    fetch.max.bytes: '52428700'
    group.id: 'test'
    isKerberized: '1'
    keytab: 'D:/keytab/test.keytab'
    krb5Conf: 'D:/keytab/krb5.conf'
    max.poll.interval.ms: '300000'
    max.poll.records: '1000'
    principal: 'test@PRE.TEST.COM'
    security_protocol: 'SASL_PLAINTEXT'
    serviceName: 'kafka'
    session.timeout.ms: '600000'
    useTicketCache: 'false'
  topics: 'topicA,topicB'
kafka-producer:
  defaultTopic: 'kafka2hive_error'
  prop:
    acks: 'all'
    batch.size: '1048576'
    bootstrap.servers: 'kfk01:9092,kfk02:9092,kfk03:9092'
    compression.type: 'lz4'
    key.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
    retries: '3'
    value.serializer: 'org.apache.kafka.common.serialization.StringSerializer'
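
Requirement 7 above mentions that this local file is loaded through ConfigTools.initConf. ConfigTools lives in the internal x5l-flink-common dependency and is not reproduced in this article, so the lines below are only a sketch of how the nested blocks are consumed afterwards, mirroring the calls that appear in CommonUtils and Kafka2chApp.

// Sketch only: ConfigTools comes from the internal x5l-flink-common library (not shown here).
ConfigTools.initConf("local"); // use flink_backup_local.yml instead of the MySQL-backed config

Map<String, Object> hdfsConf = (Map<String, Object>) ConfigTools.mapConf.get("hdfs");
Map<String, Object> kafkaConsumerConf = (Map<String, Object>) ConfigTools.mapConf.get("kafka-consumer");

System.out.println("checkpoint path: " + hdfsConf.get("checkPointPath"));
System.out.println("topics: " + kafkaConsumerConf.get("topics"));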

2.2.utils

2.2.1.DBConn

import java.sql.*;

public class DBConn {
    private static final String driver = "com.mysql.jdbc.Driver";   // MySQL JDBC driver
    private static Connection conn = null;
    private static PreparedStatement ps = null;
    private static ResultSet rs = null;
    private static final CallableStatement cs = null;

    /**
     * Open a database connection.
     * Note: the returned connection is not stored in the static field above,
     * so callers should close it themselves; close() only guards the static field.
     */
    public static Connection conn(String url, String username, String password) {
        Connection conn = null;
        try {
            Class.forName(driver);                                            // load the JDBC driver
            try {
                conn = DriverManager.getConnection(url, username, password);  // connect to the database
            } catch (SQLException e) {
                e.printStackTrace();
            }
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Close the static database connection, if any.
     */
    public static void close() {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
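
A short usage sketch for DBConn: it opens a connection to the MySQL instance described in appconfig.yml and reads the config_context column of the base_app_config table that RemoteConfigUtil queries further down. The URL and credentials are placeholders, and the app/config names are made up for the example.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DBConnDemo {
    public static void main(String[] args) throws Exception {
        // In the real job these values come from appconfig.yml.
        Connection conn = DBConn.conn(
                "jdbc:mysql://1.1.1.1:3306/test?useSSL=false&useUnicode=true&characterEncoding=UTF8",
                "test", "123456");
        String sql = "select config_context from base_app_config where app_name = ? and config_name = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, "Kafka2CkOds");
            ps.setString(2, "ods-config");
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    System.out.println(rs.getString("config_context")); // the JSON job configuration
                }
            }
        } finally {
            conn.close();
        }
    }
}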

2.2.2.CommonUtils

@Slf4j
public class CommonUtils {public static StreamExecutionEnvironment setCheckpoint(StreamExecutionEnvironment env) throws IOException {
//        ConfigTools.initConf("local");Map hdfsMap = (Map) ConfigTools.mapConf.get("hdfs");env.enableCheckpointing(((Integer) hdfsMap.get("checkpointing")).longValue(), CheckpointingMode.EXACTLY_ONCE);//这里会造成offset提交的延迟env.getCheckpointConfig().setMinPauseBetweenCheckpoints(((Integer) hdfsMap.get("minPauseBetweenCheckpoints")).longValue());env.getCheckpointConfig().setCheckpointTimeout(((Integer) hdfsMap.get("checkpointTimeout")).longValue());env.getCheckpointConfig().setMaxConcurrentCheckpoints((Integer) hdfsMap.get("maxConcurrentCheckpoints"));env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.setRestartStrategy(RestartStrategies.fixedDelayRestart((Integer) hdfsMap.get("restartStrategy"), // 尝试重启的次数,不宜过小,分布式任务很容易出问题(正常情况),建议3-5次Time.of(((Integer) hdfsMap.get("restartInterval")).longValue(), TimeUnit.SECONDS) // 延时));//设置可容忍的检查点失败数,默认值为0表示不允许容忍任何检查点失败env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);//设置状态后端存储方式env.setStateBackend(new RocksDBStateBackend((String) hdfsMap.get("checkPointPath"), true));
//        env.setStateBackend(new FsStateBackend((String) hdfsMap.get("checkPointPath"), true));
//        env.setStateBackend(new HashMapStateBackend(());return env;}public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(Map<String, Object> kafkaConf) throws IOException {String[] topics = ((String) kafkaConf.get("topics")).split(",");log.info("监听的topic: {}", topics);Properties properties = new Properties();Map<String, String> kafkaProp = (Map<String, String>) kafkaConf.get("prop");for (String key : kafkaProp.keySet()) {properties.setProperty(key, kafkaProp.get(key).toString());}if (!StringUtils.isBlank((String) kafkaProp.get("isKerberized")) && "1".equals(kafkaProp.get("isKerberized"))) {System.setProperty("java.security.krb5.conf", kafkaProp.get("krb5Conf"));properties.put("security.protocol", kafkaProp.get("security_protocol"));properties.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "+ "useTicketCache=" + kafkaProp.get("useTicketCache") + " "+ "serviceName=\"" + kafkaProp.get("serviceName") + "\" "+ "useKeyTab=true "+ "keyTab=\"" + kafkaProp.get("keytab").toString() + "\" "+ "principal=\"" + kafkaProp.get("principal").toString() + "\";");}properties.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");properties.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");FlinkKafkaConsumer<ConsumerRecord<String, String>> consumerRecordFlinkKafkaConsumer = new FlinkKafkaConsumer<ConsumerRecord<String, String>>(Arrays.asList(topics), new KafkaDeserializationSchema<ConsumerRecord<String, String>>() {@Overridepublic TypeInformation<ConsumerRecord<String, String>> getProducedType() {return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {});}@Overridepublic boolean isEndOfStream(ConsumerRecord<String, String> stringStringConsumerRecord) {return false;}@Overridepublic ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {return new ConsumerRecord<String, String>(record.topic(),record.partition(),record.offset(),record.timestamp(),record.timestampType(),record.checksum(),record.serializedKeySize(),record.serializedValueSize(),new String(record.key() == null ? "".getBytes(StandardCharsets.UTF_8) : record.key(), StandardCharsets.UTF_8),new String(record.value() == null ? "{}".getBytes(StandardCharsets.UTF_8) : record.value(), StandardCharsets.UTF_8));}}, properties);return consumerRecordFlinkKafkaConsumer;}
}
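
The two helpers are usually wired together as sketched below: setCheckpoint applies the hdfs block of the configuration to the environment, and getKafkaConsumer builds the (optionally Kerberized) source from the kafka-consumer block. The sketch assumes ConfigTools.initConf has already been called, as in the commented-out line at the top of setCheckpoint.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
CommonUtils.setCheckpoint(env); // checkpointing, RocksDB state backend and restart strategy from the "hdfs" block

Map<String, Object> kafkaConsumerConf = (Map<String, Object>) ConfigTools.mapConf.get("kafka-consumer");
DataStreamSource<ConsumerRecord<String, String>> source =
        env.addSource(CommonUtils.getKafkaConsumer(kafkaConsumerConf));

// Smoke test: print topic and payload instead of writing to ClickHouse.
source.map(r -> r.topic() + " -> " + r.value()).print();
env.execute("kafka-consumer-smoke-test");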

2.2.3.RemoteConfigUtil

public class RemoteConfigUtil {private static final Logger log = LoggerFactory.getLogger(RemoteConfigUtil.class);private static Connection conn = null;private static PreparedStatement ps = null;private static ResultSet rs = null;public static Map<String, Object> mapConf;public RemoteConfigUtil() {}public static Map<String, Object> getByAppNameAndConfigName(String appName, String ConfigName) throws SQLException {if (mapConf != null && mapConf.size() > 0) {return mapConf;} else {Map<String, String> ymlMap = LocalConfigUtil.getYmlMap("/appconfig");String username = (String)ymlMap.get("mysql.username");String password = (String)ymlMap.get("mysql.password");String url = (String)ymlMap.get("mysql.url");String driver = (String)ymlMap.get("mysql.driver");Connection conn = JdbcUtil.getConnection(url, username, password, driver);PreparedStatement preparedStatement = null;Map var14;try {String sql = "select config_context from base_app_config where app_name = '%s' and config_name = '%s'";preparedStatement = conn.prepareStatement(String.format(sql, appName, ConfigName));ResultSet rs = preparedStatement.executeQuery();String config_context;for(config_context = ""; rs.next(); config_context = rs.getString("config_context")) {}rs.close();log.info("配置信息config_context: {}", config_context);if (StringUtils.isNotBlank(config_context)) {System.out.println(JSONObject.toJSONString(JSONObject.parseObject(config_context), new SerializerFeature[]{SerializerFeature.PrettyFormat}));}mapConf = (Map)JSON.parseObject(config_context, Map.class);var14 = mapConf;} finally {if (preparedStatement != null) {preparedStatement.close();}if (conn != null) {conn.close();}}return var14;}}
}
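
Usage is a single static call. The config_context column is expected to hold one JSON document whose top-level keys (clickhouse, kafka-consumer, hdfs) mirror the YAML shown earlier; the method caches the parsed map in mapConf, so repeated calls return the cached copy. The app and config names below are placeholders.

Map<String, Object> mapConf = RemoteConfigUtil.getByAppNameAndConfigName("Kafka2CkOds", "ods-config");
Map<String, Object> clickhouseConf = (Map<String, Object>) mapConf.get("clickhouse");
Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
System.out.println("sink table: " + clickhouseConf.get("table-name"));
System.out.println("topics: " + kafkaConsumerConf.get("topics"));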

2.2.4.ClickhouseUtil

public class ClickhouseUtil {public ClickhouseUtil() {}public static List<SchemaPo> getSchemaPoList(Map<String, Object> chMapConf) throws SQLException {List schemaPos = new ArrayList();Connection connection = null;try {String jdbcurl = (String) chMapConf.get("jdbcurl");String driver = (String) chMapConf.get("driver");String userName = (String) chMapConf.get("username");String password = (String) chMapConf.get("password");String databaseName = (String) chMapConf.get("database-name");String tableName = (String) chMapConf.get("table-name");connection = JdbcUtil.getConnection(jdbcurl, userName, password, driver);DatabaseMetaData metaData = connection.getMetaData();ResultSet colRet = metaData.getColumns((String) null, databaseName, tableName, "%");System.out.println("表字段信息:");while (colRet.next()) {String columnName = colRet.getString("COLUMN_NAME");String columnType = colRet.getString("TYPE_NAME");schemaPos.add(new SchemaPo(columnName, columnType));System.out.println(columnName + "   " + columnType);}} finally {try {if (connection != null) {connection.close();}} catch (SQLException var18) {var18.printStackTrace();}}return schemaPos;}public static String getCreateSinkTableSql(Map<String, Object> clickhouse, String sinkTableName, List<SchemaPo> schemaPos) {StringBuilder sinkTableSql = new StringBuilder();String userName = (String) clickhouse.get("username");String password = (String) clickhouse.get("password");String connector = (String) clickhouse.get("connector");String databaseName = (String) clickhouse.get("database-name");String url = (String) clickhouse.get("url");String tableName = (String) clickhouse.get("table-name");String sinkBatchSize = (String) clickhouse.get("sink.batch-size");String sinkFlushInterval = (String) clickhouse.get("sink.flush-interval");String sinkMaxRetries = (String) clickhouse.get("sink.max-retries");String sinkPartitionStrategy = (String) clickhouse.get("sink.partition-strategy");String sinkPartitionKey = (String) clickhouse.get("sink.partition-key");String sinkIgnoreDelete = (String) clickhouse.get("sink.ignore-delete");sinkTableSql.append(String.format("CREATE TABLE %s (\n", sinkTableName));int i = 0;Iterator var17 = schemaPos.iterator();while (var17.hasNext()) {SchemaPo schemaPo = (SchemaPo) var17.next();++i;String signal = schemaPo.getSignal();String type = schemaPo.getType();if ("UInt64".equalsIgnoreCase(type)) {type = "BIGINT";} else if ("Map(String,String)".equalsIgnoreCase(type)) {type = "Map<String,String>";} else if ("Datetime".equalsIgnoreCase(type)) {type = "Timestamp(0)";} else {type = "String";}sinkTableSql.append(String.format("    `%s` %s", signal, type));sinkTableSql.append(i == schemaPos.size() ? 
")" : ",\n");}sinkTableSql.append("WITH(\n");sinkTableSql.append(String.format("'connector' = '%s',\n", connector));sinkTableSql.append(String.format("'url' = '%s',\n", url));sinkTableSql.append(String.format("'username' = '%s',\n", userName));sinkTableSql.append(String.format("'password' = '%s',\n", password));sinkTableSql.append(String.format("'url' = '%s',\n", url));sinkTableSql.append(String.format("'database-name' = '%s',\n", databaseName));sinkTableSql.append(String.format("'table-name' = '%s',\n", tableName));sinkTableSql.append(String.format("'sink.batch-size' = '%s',\n", sinkBatchSize));sinkTableSql.append(String.format("'sink.flush-interval' = '%s',\n", sinkFlushInterval));sinkTableSql.append(String.format("'sink.max-retries' = '%s',\n", sinkMaxRetries));sinkTableSql.append(String.format("'sink.partition-strategy' = 'hash',\n"));sinkTableSql.append(String.format("'sink.partition-key' = 'sample_date_time',\n"));sinkTableSql.append(String.format("'sink.ignore-delete' = '%s'\n", sinkIgnoreDelete));sinkTableSql.append(" )");return sinkTableSql.toString();}//转换成ck需要的格式public static Row convertRow(Map<String, String> resultMap, List<SchemaPo> schemaPos) {Row row = new Row(schemaPos.size());for (int i = 0; i < schemaPos.size(); i++) {SchemaPo schemaPo = schemaPos.get(i);String valueStr = resultMap.get(schemaPo.getSignal());if (StringUtils.isBlank(valueStr)) {row.setField(i, null);continue;}if ("UInt64".equalsIgnoreCase(schemaPo.getType())) {Long svalue = Long.valueOf(valueStr);row.setField(i, Math.abs(svalue));} else if ("Int64".equalsIgnoreCase(schemaPo.getType())) {Long svalue = Long.valueOf(valueStr);row.setField(i, Math.abs(svalue));} else if ("Int32".equalsIgnoreCase(schemaPo.getType())) {Integer svalue = Integer.valueOf(valueStr);row.setField(i, svalue);} else if ("datetime".equalsIgnoreCase(schemaPo.getType())) {try {Date svalue = (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")).parse(valueStr);Timestamp timestamp = new Timestamp(svalue.getTime());row.setField(i, timestamp);} catch (Exception ex) {System.out.println(ex.getMessage());System.out.println(Arrays.toString(ex.getStackTrace()));}} else {row.setField(i, valueStr);}}return row;}}

2.3.flatmap

2.3.1.FlatMapFunction

/**
 * Factory that binds a Flink FlatMapFunction to the ClickHouse schema fetched at runtime.
 * Note: this custom interface shares its simple name with Flink's
 * org.apache.flink.api.common.functions.FlatMapFunction, so the Flink type is referenced
 * by its fully qualified name here.
 */
public interface FlatMapFunction {
    org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> newInstance(List<SchemaPo> schemaPos);
}

2.4.sink

2.4.1.ClickHouseCatalog

public class ClickHouseCatalog extends AbstractCatalog {private static final Logger LOG = LoggerFactory.getLogger(ClickHouseCatalog.class);public static final String DEFAULT_DATABASE = "default";private final String baseUrl;private final String username;private final String password;private final boolean ignorePrimaryKey;private final Map<String, String> properties;private ClickHouseConnection connection;public ClickHouseCatalog(String catalogName, Map<String, String> properties) {this(catalogName, (String)properties.get("database-name"), (String)properties.get("url"), (String)properties.get("username"), (String)properties.get("password"), properties);}public ClickHouseCatalog(String catalogName, @Nullable String defaultDatabase, String baseUrl, String username, String password) {this(catalogName, defaultDatabase, baseUrl, username, password, Collections.emptyMap());}public ClickHouseCatalog(String catalogName, @Nullable String defaultDatabase, String baseUrl, String username, String password, Map<String, String> properties) {super(catalogName, defaultDatabase == null ? "default" : defaultDatabase);Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl), "baseUrl cannot be null or empty");Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(username), "username cannot be null or empty");Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(password), "password cannot be null or empty");this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";this.username = username;this.password = password;this.ignorePrimaryKey = properties.get("catalog.ignore-primary-key") == null || Boolean.parseBoolean((String)properties.get("catalog.ignore-primary-key"));this.properties = Collections.unmodifiableMap(properties);}public void open() throws CatalogException {try {Properties configuration = new Properties();configuration.putAll(this.properties);configuration.setProperty(ClickHouseQueryParam.USER.getKey(), this.username);configuration.setProperty(ClickHouseQueryParam.PASSWORD.getKey(), this.password);configuration.setProperty("socket_timeout", "600000");String jdbcUrl = ClickHouseUtil.getJdbcUrl(this.baseUrl, this.getDefaultDatabase());BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource(jdbcUrl, configuration);dataSource.actualize();this.connection = dataSource.getConnection();LOG.info("Created catalog {}, established connection to {}", this.getName(), jdbcUrl);} catch (Exception var4) {throw new CatalogException(String.format("Opening catalog %s failed.", this.getName()), var4);}}public synchronized void close() throws CatalogException {try {this.connection.close();LOG.info("Closed catalog {} ", this.getName());} catch (Exception var2) {throw new CatalogException(String.format("Closing catalog %s failed.", this.getName()), var2);}}public Optional<Factory> getFactory() {return Optional.of(new ClickHouseDynamicTableFactory());}public synchronized List<String> listDatabases() throws CatalogException {try {PreparedStatement stmt = this.connection.prepareStatement("SELECT name from `system`.databases");Throwable var2 = null;try {ResultSet rs = stmt.executeQuery();Throwable var4 = null;try {List<String> databases = new ArrayList();while(rs.next()) {databases.add(rs.getString(1));}return databases;} catch (Throwable var31) {var4 = var31;throw var31;} finally {if (rs != null) {if (var4 != null) {try {rs.close();} catch (Throwable var30) {var4.addSuppressed(var30);}} else {rs.close();}}}} catch (Throwable var33) {var2 = var33;throw var33;} finally 
{if (stmt != null) {if (var2 != null) {try {stmt.close();} catch (Throwable var29) {var2.addSuppressed(var29);}} else {stmt.close();}}}} catch (Exception var35) {throw new CatalogException(String.format("Failed listing database in catalog %s", this.getName()), var35);}}public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {if (this.listDatabases().contains(databaseName)) {return new CatalogDatabaseImpl(Collections.emptyMap(), (String)null);} else {throw new DatabaseNotExistException(this.getName(), databaseName);}}public boolean databaseExists(String databaseName) throws CatalogException {Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));return this.listDatabases().contains(databaseName);}public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {throw new UnsupportedOperationException();}public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotEmptyException, CatalogException {throw new UnsupportedOperationException();}public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {throw new UnsupportedOperationException();}public synchronized List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {if (!this.databaseExists(databaseName)) {throw new DatabaseNotExistException(this.getName(), databaseName);} else {try {PreparedStatement stmt = this.connection.prepareStatement(String.format("SELECT name from `system`.tables where database = '%s'", databaseName));Throwable var3 = null;try {ResultSet rs = stmt.executeQuery();Throwable var5 = null;try {List<String> tables = new ArrayList();while(rs.next()) {tables.add(rs.getString(1));}return tables;} catch (Throwable var32) {var5 = var32;throw var32;} finally {if (rs != null) {if (var5 != null) {try {rs.close();} catch (Throwable var31) {var5.addSuppressed(var31);}} else {rs.close();}}}} catch (Throwable var34) {var3 = var34;throw var34;} finally {if (stmt != null) {if (var3 != null) {try {stmt.close();} catch (Throwable var30) {var3.addSuppressed(var30);}} else {stmt.close();}}}} catch (Exception var36) {throw new CatalogException(String.format("Failed listing tables in catalog %s database %s", this.getName(), databaseName), var36);}}}public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {throw new UnsupportedOperationException();}public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {if (!this.tableExists(tablePath)) {throw new TableNotExistException(this.getName(), tablePath);} else {Map<String, String> configuration = new HashMap(this.properties);configuration.put("url", this.baseUrl);configuration.put("database-name", tablePath.getDatabaseName());configuration.put("table-name", tablePath.getObjectName());configuration.put("username", this.username);configuration.put("password", this.password);String databaseName = tablePath.getDatabaseName();String tableName = tablePath.getObjectName();try {DistributedEngineFullSchema engineFullSchema = ClickHouseUtil.getAndParseDistributedEngineSchema(this.connection, tablePath.getDatabaseName(), tablePath.getObjectName());if (engineFullSchema != null) {databaseName = engineFullSchema.getDatabase();tableName = engineFullSchema.getTable();}} catch (Exception var6) {throw new 
CatalogException(String.format("Failed getting engine full of %s.%s.%s", this.getName(), databaseName, tableName), var6);}return new CatalogTableImpl(this.createTableSchema(databaseName, tableName), this.getPartitionKeys(databaseName, tableName), configuration, "");}}private synchronized TableSchema createTableSchema(String databaseName, String tableName) {try {PreparedStatement stmt = this.connection.prepareStatement(String.format("SELECT * from `%s`.`%s` limit 0", databaseName, tableName));Throwable var4 = null;TableSchema var24;try {ClickHouseResultSetMetaData metaData = (ClickHouseResultSetMetaData)stmt.getMetaData().unwrap(ClickHouseResultSetMetaData.class);Method getColMethod = metaData.getClass().getDeclaredMethod("getCol", Integer.TYPE);getColMethod.setAccessible(true);List<String> primaryKeys = this.getPrimaryKeys(databaseName, tableName);TableSchema.Builder builder = TableSchema.builder();for(int idx = 1; idx <= metaData.getColumnCount(); ++idx) {ClickHouseColumnInfo columnInfo = (ClickHouseColumnInfo)getColMethod.invoke(metaData, idx);String columnName = columnInfo.getColumnName();DataType columnType = ClickHouseTypeUtil.toFlinkType(columnInfo);if (primaryKeys.contains(columnName)) {columnType = (DataType)columnType.notNull();}builder.field(columnName, columnType);}if (!primaryKeys.isEmpty()) {builder.primaryKey((String[])primaryKeys.toArray(new String[0]));}var24 = builder.build();} catch (Throwable var21) {var4 = var21;throw var21;} finally {if (stmt != null) {if (var4 != null) {try {stmt.close();} catch (Throwable var20) {var4.addSuppressed(var20);}} else {stmt.close();}}}return var24;} catch (Exception var23) {throw new CatalogException(String.format("Failed getting columns in catalog %s database %s table %s", this.getName(), databaseName, tableName), var23);}}private List<String> getPrimaryKeys(String databaseName, String tableName) {if (this.ignorePrimaryKey) {return Collections.emptyList();} else {try {PreparedStatement stmt = this.connection.prepareStatement(String.format("SELECT name from `system`.columns where `database` = '%s' and `table` = '%s' and is_in_primary_key = 1", databaseName, tableName));Throwable var4 = null;try {ResultSet rs = stmt.executeQuery();Throwable var6 = null;try {List<String> primaryKeys = new ArrayList();while(rs.next()) {primaryKeys.add(rs.getString(1));}return primaryKeys;} catch (Throwable var33) {var6 = var33;throw var33;} finally {if (rs != null) {if (var6 != null) {try {rs.close();} catch (Throwable var32) {var6.addSuppressed(var32);}} else {rs.close();}}}} catch (Throwable var35) {var4 = var35;throw var35;} finally {if (stmt != null) {if (var4 != null) {try {stmt.close();} catch (Throwable var31) {var4.addSuppressed(var31);}} else {stmt.close();}}}} catch (Exception var37) {throw new CatalogException(String.format("Failed getting primary keys in catalog %s database %s table %s", this.getName(), databaseName, tableName), var37);}}}private List<String> getPartitionKeys(String databaseName, String tableName) {try {PreparedStatement stmt = this.connection.prepareStatement(String.format("SELECT name from `system`.columns where `database` = '%s' and `table` = '%s' and is_in_partition_key = 1", databaseName, tableName));Throwable var4 = null;try {ResultSet rs = stmt.executeQuery();Throwable var6 = null;try {List<String> partitionKeys = new ArrayList();while(rs.next()) {partitionKeys.add(rs.getString(1));}return partitionKeys;} catch (Throwable var33) {var6 = var33;throw var33;} finally {if (rs != null) {if (var6 != null) {try {rs.close();} 
catch (Throwable var32) {var6.addSuppressed(var32);}} else {rs.close();}}}} catch (Throwable var35) {var4 = var35;throw var35;} finally {if (stmt != null) {if (var4 != null) {try {stmt.close();} catch (Throwable var31) {var4.addSuppressed(var31);}} else {stmt.close();}}}} catch (Exception var37) {throw new CatalogException(String.format("Failed getting partition keys of %s.%s.%s", this.getName(), databaseName, tableName), var37);}}public boolean tableExists(ObjectPath tablePath) throws CatalogException {try {return this.databaseExists(tablePath.getDatabaseName()) && this.listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());} catch (DatabaseNotExistException var3) {return false;}}public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {throw new UnsupportedOperationException();}public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException {throw new UnsupportedOperationException();}public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {throw new UnsupportedOperationException();}public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {throw new UnsupportedOperationException();}public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {return Collections.emptyList();}public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, CatalogException {return Collections.emptyList();}public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException {return Collections.emptyList();}public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {throw new PartitionNotExistException(this.getName(), tablePath, partitionSpec);}public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {throw new UnsupportedOperationException();}public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {throw new UnsupportedOperationException();}public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {throw new UnsupportedOperationException();}public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {throw new UnsupportedOperationException();}public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {return Collections.emptyList();}public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {throw new 
FunctionNotExistException(this.getName(), functionPath);}public boolean functionExists(ObjectPath functionPath) throws CatalogException {return false;}public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {throw new UnsupportedOperationException();}public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {throw new UnsupportedOperationException();}public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {throw new UnsupportedOperationException();}public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {return CatalogTableStatistics.UNKNOWN;}public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {return CatalogColumnStatistics.UNKNOWN;}public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {return CatalogTableStatistics.UNKNOWN;}public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {return CatalogColumnStatistics.UNKNOWN;}public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {throw new UnsupportedOperationException();}public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {throw new UnsupportedOperationException();}public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {throw new UnsupportedOperationException();}public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {throw new UnsupportedOperationException();}
}
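
Registering this catalog is what lets the job write with a plain INSERT INTO instead of declaring the sink table by hand; the property keys below are the same ones Kafka2chApp passes in, with placeholder values.

Map<String, String> props = new HashMap<>();
props.put("database-name", "dwd");
props.put("url", "clickhouse://10.1.1.1:8123");
props.put("username", "test");
props.put("password", "******");

Catalog clickHouseCatalog = new ClickHouseCatalog("clickhouse", props);
tableEnv.registerCatalog("clickhouse", clickHouseCatalog);
tableEnv.useCatalog("clickhouse");

// Every existing ClickHouse table in 'dwd' is now visible to Flink SQL.
Arrays.stream(tableEnv.listTables()).forEach(t -> System.out.println("table: " + t));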

2.5.Kafka2ClickHouse

2.5.1.Kafka2chApp

public class Kafka2chApp {
    private static final Logger log = LoggerFactory.getLogger(Kafka2chApp.class);
    private static String SINK_TABLE = "sinkTable";
    private static String KAFKA_TEMP_VIEW = "kafkaTempView";

    /**
     * @param appName    app_name column of the MySQL config table
     * @param configName config_name column of the MySQL config table
     * @throws Exception
     */
    public static void run(String appName, String configName, FlatMapFunction x5lFlatMapFunction) throws Exception {
        log.info("Kafka2chApp.run called with appName:{}, configName:{}", appName, configName);
        // Load the job configuration from MySQL
        Map<String, Object> mapConf = RemoteConfigUtil.getByAppNameAndConfigName(appName, configName);
        if (mapConf == null || mapConf.size() == 0) return;
        Map<String, Object> clickhouseConf = (Map<String, Object>) mapConf.get("clickhouse");
        Map<String, Object> kafkaConsumerConf = (Map<String, Object>) mapConf.get("kafka-consumer");
        Map<String, Object> hdfsConf = (Map<String, Object>) mapConf.get("hdfs");
        // long beforeTime2Dropout = System.currentTimeMillis() - (Long) mapConf.get("before2DropoutHourStep") * 3600;
        // long after2DropoutTime = System.currentTimeMillis();

        // Initialize the stream and table environments
        StreamExecutionEnvironment streamEnv = StreamEnv.getStreamEnv(hdfsConf);
        // streamEnv.setParallelism(ckP); // "ckP" (the job parallelism) is not defined in this excerpt; set it from your own config if required
        StreamTableEnvironment tableEnv = TableEnv.getTableEnv();

        // Fetch the ClickHouse table schema and derive the Row type
        List<SchemaPo> schemaPos = ClickhouseUtil.getSchemaPoList(clickhouseConf);
        TypeInformation[] types = getTypeInformationArray(schemaPos);
        String[] fieldNames = SchemaPoUtil.getFieldLists(schemaPos);
        org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> flatMapFunction =
                x5lFlatMapFunction.newInstance(schemaPos);

        // Kafka source -> flatten JSON into Row
        DataStreamSource<ConsumerRecord<String, String>> stream =
                streamEnv.addSource(CommonUtils.getKafkaConsumer(kafkaConsumerConf));
        System.out.println("Source parallelism: " + streamEnv.getParallelism());
        SingleOutputStreamOperator<Row> infos = stream.flatMap(flatMapFunction);
        infos = infos.map(e -> e, new RowTypeInfo(types, fieldNames));
        System.out.println("Map parallelism: " + streamEnv.getParallelism());

        // Register the Kafka stream as a temporary view
        tableEnv.createTemporaryView(KAFKA_TEMP_VIEW, infos);

        // Register the ClickHouse catalog so the target table is visible to Flink SQL.
        // (Alternative: build a DDL statement with ClickhouseUtil.getCreateSinkTableSql(clickhouseConf, SINK_TABLE, schemaPos).)
        Map<String, String> props = new HashMap<>();
        props.put(ClickHouseConfig.DATABASE_NAME, (String) clickhouseConf.get("database-name"));
        props.put(ClickHouseConfig.URL, (String) clickhouseConf.get("url"));
        props.put(ClickHouseConfig.USERNAME, (String) clickhouseConf.get("username"));
        props.put(ClickHouseConfig.PASSWORD, (String) clickhouseConf.get("password"));
        props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, (String) clickhouseConf.get("sink.flush-interval"));
        props.put(ClickHouseConfig.SINK_BATCH_SIZE, (String) clickhouseConf.get("sink.batch-size"));
        Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
        tableEnv.registerCatalog("clickhouse", cHcatalog);
        tableEnv.useCatalog("clickhouse");
        // Debugging helpers:
        // Arrays.stream(tableEnv.listCatalogs()).forEach(e -> System.out.println("catalog: " + e));
        // Arrays.stream(tableEnv.listDatabases()).forEach(e -> System.out.println("database: " + e));
        // Arrays.stream(tableEnv.listTables()).forEach(e -> System.out.println("table: " + e));
        // tableEnv.executeSql(createSinkTableSql);

        // Build the insert statement; the temporary view lives in the default catalog,
        // so it must be fully qualified once the ClickHouse catalog is the current one.
        String insertSql = "insert into `" + clickhouseConf.get("table-name")
                + "` select * from default_catalog.default_database." + KAFKA_TEMP_VIEW;
        // System.out.println("insertSql: " + insertSql);
        // log.info("insertSql: {}", insertSql);

        // Execute the insert
        tableEnv.executeSql(insertSql);

        // For debugging: print the rows instead of writing them
        /* infos.print(); streamEnv.executeAsync(); */
    }

    public static TypeInformation[] getTypeInformationArray(List<SchemaPo> schemaPos) {
        TypeInformation[] types = new TypeInformation[schemaPos.size()];
        int i = 0;
        for (SchemaPo po : schemaPos) {
            if ("String".equalsIgnoreCase(po.getType())) {
                types[i] = Types.STRING;
            } else if ("Int64".equalsIgnoreCase(po.getType())) {
                types[i] = Types.LONG;
            } else if ("UInt64".equalsIgnoreCase(po.getType())) {
                types[i] = Types.LONG;
            } else if ("Int32".equalsIgnoreCase(po.getType())) {
                types[i] = Types.INT;
            } else if ("Int8".equalsIgnoreCase(po.getType())) {
                types[i] = Types.INT;
            } else if ("datetime".equalsIgnoreCase(po.getType())) {
                types[i] = Types.SQL_TIMESTAMP;
            } else if ("Map(String,String)".equalsIgnoreCase(po.getType())) {
                types[i] = Types.MAP(Types.STRING, Types.STRING);
            } else {
                types[i] = Types.STRING;
            }
            i++;
        }
        return types;
    }
}

2.5.2.Kafka2Ck-ODS

// The original class was published as "Kafka2Ck-ODS"; hyphens are not legal in Java class
// names, so it is written as Kafka2CkOds here.
public class Kafka2CkOds implements FlatMapFunction {
    private static Logger logger = Logger.getLogger(Kafka2CkOds.class);

    public static void main(String[] args) throws Exception {
        // args[0] is the config_name used to look up the job configuration in MySQL
        Kafka2chApp.run(Kafka2CkOds.class.getName(), args[0], new Kafka2CkOds());
    }

    @Override
    public org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row> newInstance(List<SchemaPo> schemaPos) {
        return new org.apache.flink.api.common.functions.FlatMapFunction<ConsumerRecord<String, String>, Row>() {
            @Override
            public void flatMap(ConsumerRecord<String, String> record, Collector<Row> out) throws Exception {
                String value = record.value();
                try {
                    // Parse the JSON message. The original code then copied a (not shown) dataListMap
                    // derived from it into resultMap with lower-cased keys so that the field names
                    // match the ClickHouse columns; the loop below does that directly on the parsed map.
                    HashMap<String, Object> infoMap = JSON.parseObject(value, HashMap.class);
                    Map<String, String> resultMap = new HashMap<>();
                    for (Map.Entry<String, Object> entry : infoMap.entrySet()) {
                        Object v = entry.getValue();
                        resultMap.put(entry.getKey().toLowerCase(), v == null ? null : v.toString());
                    }
                    Row row = TableEnv.getRowBySchemaPo1(resultMap, schemaPos);
                    out.collect(row);
                } catch (Exception e) {
                    e.printStackTrace();
                    System.out.printf("Bad record: %s, topic=%s, key=%s, value=%s%n",
                            e.getMessage(), record.topic(), record.key(), record.value());
                }
            }
        };
    }
}
