Several Ways to Deploy Flink on k8s
Before starting, deploy a private Docker registry (see the tutorial "搭建 Docker 镜像仓库" / Setting up a Docker Image Registry).
Note: every node's daemon.json must include "insecure-registries": ["http://<registry-host>:<port>"] (e.g. http://192.168.20.62:2333, the registry address used throughout this article), and Docker must be restarted afterwards.
1. Session Mode
In Session mode, a shared Flink cluster (one JobManager plus multiple TaskManagers) is started on Kubernetes, and multiple Flink jobs are then submitted to that shared cluster. The cluster keeps running until it is stopped manually. This mode suits scenarios where many jobs start and stop frequently and high utilization of cluster resources matters.
A Flink Session cluster deployment on Kubernetes consists of at least three components:

- a Deployment that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
1.1 Native Kubernetes Mode
Flink's Native Kubernetes mode integrates Apache Flink directly into a Kubernetes environment, so Flink jobs and applications run on Kubernetes natively. Its main advantage is that Flink itself can use Kubernetes' resource orchestration and management capabilities, which simplifies deploying and operating Flink clusters.
In Native Kubernetes mode, the cluster is deployed and managed either through Flink's Kubernetes Operator or directly with the kubectl command-line tool, and every Flink component is managed as a Kubernetes resource (Pods, Services, and so on).
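For example, once the session cluster from the steps below is up, the resources Flink created can be inspected with plain kubectl. This is a sketch: it assumes the cluster-id my-first-flink-cluster used later in this article, and that native-mode Flink labels its pods with app: <cluster-id>.

```bash
# Deployment and services created by Flink for the session cluster
kubectl get deployment,svc -n flink
# Pods belonging to the cluster, selected by the app label
kubectl get pods -n flink -l app=my-first-flink-cluster
```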
1.1.1 Build the image (Dockerfile)

1. Create the Dockerfile:

```dockerfile
FROM flink:1.16.2
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
```

2. Build the image:

```bash
docker build -t 192.168.20.62:2333/bigdata/flink-session:1.16.2 .
```

3. Push the image:

```bash
docker push 192.168.20.62:2333/bigdata/flink-session:1.16.2
```
1.1.2 创建命名空间和 serviceaccount
# 创建namespace
kubectl create ns flink# 创建serviceaccount
kubectl create serviceaccount flink-service-account -n flink# 用户授权
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
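To confirm the binding took effect, you can ask the API server whether the new service account may manage pods; kubectl can impersonate a service account for this check:

```bash
# Should print "yes" if the edit role was bound correctly
kubectl auth can-i create pods \
    --as=system:serviceaccount:flink:flink-service-account \
    -n flink
```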
1.1.3 Create the Flink cluster

```bash
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-session:1.16.2 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dkubernetes.rest-service.exposed.type=NodePort
```
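kubernetes-session.sh prints the JobManager web interface URL at the end of its output. Because the REST service is exposed as a NodePort, the address can also be recovered later from the Service Flink creates (named <cluster-id>-rest):

```bash
# Look up the NodePort assigned to the JobManager REST/UI service
kubectl get svc my-first-flink-cluster-rest -n flink
```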
1.1.4 提交任务
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
./examples/streaming/TopSpeedWindowing.jar \
-Dkubernetes.taskmanager.cpu=2000m \
-Dexternal-resource.limits.kubernetes.cpu=4000m \
-Dexternal-resource.limits.kubernetes.memory=10Gi \
-Dexternal-resource.requests.kubernetes.cpu=2000m \
-Dexternal-resource.requests.kubernetes.memory=8Gi \
-Dkubernetes.taskmanager.cpu=2000m \
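Jobs on the session cluster can then be listed or cancelled with the same CLI, addressed by cluster-id (<jobId> is a placeholder for the JobID printed by `flink list`):

```bash
# List running jobs on the session cluster
./bin/flink list \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink

# Cancel a job by its JobID
./bin/flink cancel \
    --target kubernetes-session \
    -Dkubernetes.cluster-id=my-first-flink-cluster \
    -Dkubernetes.namespace=flink \
    <jobId>
```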
1.1.5 Delete the Flink cluster

```bash
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
```
1.2 Standalone Mode

Standalone mode here means running a self-contained Flink cluster on Kubernetes without Kubernetes-specific integration. You set up the Flink cluster (JobManager and TaskManagers) manually rather than relying on a Kubernetes Operator or Kubernetes-native resource scheduling. In other words, the Flink components run inside Kubernetes Pods, but their lifecycle is not managed through Kubernetes-native support; the deployment works much like a traditional Flink installation in any other environment.
1.2.1 Create the docker-entrypoint.sh script
```bash
#!/usr/bin/env bash
###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
    if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
        return 0
    fi

    echo "Enabling required built-in plugins"
    for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
        echo "Linking ${target_plugin} to plugin directory"
        plugin_name=${target_plugin%.jar}

        mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
        if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
            echo "Plugin ${target_plugin} does not exist. Exiting."
            exit 1
        else
            ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
            echo "Successfully enabled ${target_plugin}"
        fi
    done
}

set_config_option() {
    local option=$1
    local value=$2

    # escape periods for usage in regular expressions
    local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

    # either override an existing entry, or append a new one
    if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
    else
        echo "${option}: ${value}" >> "${CONF_FILE}"
    fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")

    echo "Starting History Server"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Task Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
```
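The script drives everything through environment variables (FLINK_PROPERTIES, TASK_MANAGER_NUMBER_OF_TASK_SLOTS, ENABLE_BUILT_IN_PLUGINS) before handing off to the requested Flink daemon, so the image built in the next section can be smoke-tested locally before going to Kubernetes. A sketch, using the image tag from this article:

```bash
# Start a TaskManager in the foreground; the entrypoint appends
# FLINK_PROPERTIES to flink-conf.yaml and sets the slot count
docker run --rm \
    -e FLINK_PROPERTIES='jobmanager.rpc.address: flink-jobmanager' \
    -e TASK_MANAGER_NUMBER_OF_TASK_SLOTS=4 \
    192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 taskmanager
```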
1.2.2 Write the Dockerfile

```dockerfile
FROM centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (the default is UTC)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u231-linux-x64.tar.gz /opt/apache/
ADD flink-1.16.2-bin-scala_2.12.tgz /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.16.2
ENV JAVA_HOME /opt/apache/jdk1.8.0_231
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/
#RUN mkdir home

COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin
RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# The entrypoint script runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
```
1.2.3 Build the image

```bash
docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2 . --no-cache
# Push the image
docker push 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
```
1.2.4 Create the namespace and serviceaccount

```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant the serviceaccount permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
1.2.5 Write the YAML files

1.2.5.1 flink-configuration-configmap.yaml
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
1.2.5.2 jobmanager-service.yaml
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
```
1.2.5.3 jobmanager-rest-service.yaml
Exposes the JobManager REST port as a NodePort on the Kubernetes nodes.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
```
1.2.5.4 taskmanager-query-state-service.yaml
Exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
```
1.2.5.5 jobmanager-session-deployment-non-ha.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the Dockerfile, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
1.2.5.6 taskmanager-session-deployment.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin:1.16.2
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf/
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the Dockerfile, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
```
1.2.6 Create the Flink cluster

```bash
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
```
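With replicas: 1 for the JobManager and replicas: 2 for the TaskManagers, three Running pods should appear before any job is submitted:

```bash
# Verify the pods and the three services
kubectl get pods,svc -n flink
```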
1.2.7 Submit a job

```bash
./bin/flink run -m 192.168.20.62:30081 ./examples/streaming/TopSpeedWindowing.jar
```
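The same NodePort also serves Flink's REST API, so the submission can be verified without opening the web UI (the node IP and port 30081 come from jobmanager-rest-service.yaml above):

```bash
# Jobs known to the cluster
curl http://192.168.20.62:30081/jobs
# Cluster overview: slots, TaskManagers, running jobs
curl http://192.168.20.62:30081/overview
```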
1.2.8 Delete the cluster

```bash
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
```
2. Application Mode (Recommended)

In Application mode, each application gets its own dedicated cluster: the JobManager runs the application's main() method, and the cluster lives only as long as the job. A basic Flink Application cluster deployment on Kubernetes consists of three components:

- an application that runs the JobManager
- a Deployment for a pool of TaskManagers
- a Service exposing the JobManager's REST and UI ports
2.1 Native Kubernetes Mode (Commonly Used)

2.1.1 Build the image (Dockerfile)

```dockerfile
FROM flink:1.16.2
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
RUN export LANG=zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY ./flink-1.16.2/examples/streaming/TopSpeedWindowing.jar /opt/flink/usrlib/
```

Build and push the image:

```bash
docker build -t 192.168.20.62:2333/bigdata/flink-application:1.16.2 . --no-cache
docker push 192.168.20.62:2333/bigdata/flink-application:1.16.2
```
2.1.2 Create the namespace and serviceaccount

```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant the serviceaccount permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
2.1.3 Create the Flink cluster and submit the job

```bash
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=192.168.20.62:2333/bigdata/flink-application:1.16.2 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
```

local:// is the only scheme supported in Application mode. It refers to the filesystem inside the pod/container, not the host machine.
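The running application can be managed with the same CLI, addressed by its cluster-id. In Application mode, cancelling the job also tears the cluster down, since the cluster exists only for that job (<jobId> is the ID printed by `flink list`):

```bash
# List the running job
./bin/flink list \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink

# Cancel it (this shuts down the whole cluster)
./bin/flink cancel \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.namespace=flink \
    <jobId>
```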
2.1.4 Delete the Flink cluster

```bash
kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force
```
2.2 Standalone Mode

Before starting, set up an NFS shared directory; the configuration files below assume the shared directory is /mnt/bigdata/flink/usrlib. See the tutorial: NFS共享存储服务 (NFS shared storage service).
2.2.1 Create the docker-entrypoint.sh startup script

The script is identical to the docker-entrypoint.sh from section 1.2.1; reuse it as-is.
2.2.2 Write the Dockerfile

The Dockerfile is the same as in section 1.2.2. Build and push the image under a new tag:

```bash
docker build -t 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2 . --no-cache
docker push 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
```
2.2.3 Create the namespace and serviceaccount

```bash
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant the serviceaccount permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
```
2.2.4 flink-configuration-configmap.yaml

Identical to the ConfigMap in section 1.2.5.1.
2.2.5 jobmanager-service.yaml

Identical to the Service in section 1.2.5.2.
2.2.6 jobmanager-rest-service.yaml

Identical to the Service in section 1.2.5.3; it exposes the JobManager REST port as a NodePort on the Kubernetes nodes.
2.2.7 taskmanager-query-state-service.yaml

Identical to the Service in section 1.2.5.4; it exposes the TaskManager queryable-state port as a NodePort on the Kubernetes nodes.
2.2.8 jobmanager-application-non-ha.yaml
```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
        # The main class must be on the classpath: here it comes from the jar
        # placed in usrlib, mounted from the NFS share below
        args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing", "--output", "/tmp/result"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.16.2/usrlib
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the Dockerfile, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/bigdata/flink/usrlib
```
2.2.9 taskmanager-job-deployment.yaml
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: 192.168.20.62:2333/bigdata/flink-centos-admin-application:1.16.2
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.16.2/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.16.2/usrlib
        securityContext:
          runAsUser: 9999  # the admin user (uid 9999) created in the Dockerfile, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/bigdata/flink/usrlib
```
2.2.10 Create the Flink cluster and submit the job

```bash
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink
# Services
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink
# Create the job and the deployment for the cluster
kubectl create -f jobmanager-application-non-ha.yaml -n flink
kubectl create -f taskmanager-job-deployment.yaml -n flink
```
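Because the JobManager runs as a Kubernetes Job, its completion status mirrors the Flink job's outcome, and the driver log is the quickest way to check the TopSpeedWindowing output:

```bash
# Follow the JobManager (driver) logs
kubectl logs -f job/flink-jobmanager -n flink
# Job completion status and TaskManager pods
kubectl get jobs,pods -n flink
```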
2.2.11 Delete the Flink cluster

```bash
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink
kubectl delete ns flink --force
```