在 Kubernetes 上部署和使用 Chunjun
纯钧(Chunjun, 原名 FlinkX)是一个国产的、流批一体的、分布式数据同步工具,支持多种数据源和数据目的地,具有高性能、低延迟、高可靠性等特点。本文记录一下如何在 Kubernetes 集群上部署一个包含纯钧的 Flink 集群(Standalone session 模式),以及如何使用。
前言
本篇文章使用的软件版本如下:
软件 | 版本 |
---|---|
纯钧 | release 1.12 |
Flink | 1.12.7 |
Kubernetes | 1.32 |
部署
1. 准备
纯钧自身在源码内已经提供了 Kubernetes Session 模式和 application 模式部署的入口,这里我们使用 Session 模式部署。由于我们需要使用纯钧自身提供的任务提交方式,因此我们需要自己打一个包含纯钧的 Flink 镜像。
1.1 Docker 镜像
我们无需对 Flink 做任何修改,Flink 官方提供的镜像已经满足需要了,只需要将纯钧 Lib 复制进去,在此提供我自己使用的 Dockerfile 参考,略过此节。如果有需要对 Flink 镜像进行修改(制作自定义 Flink 镜像),可以参考Flink 官方Docker 库。
自行按需从源码构建纯钧本体,或下载官方 release 版本。
创建 Dockerfile
## From base Flink image
FROM --platform=linux/amd64 apache/flink:1.12.7-scala_2.12
WORKDIR /opt
## Copy Chunjun distribution
COPY chunjun-dist-1.12-SNAPSHOT.tar.gz .
RUN set -ex; \
mkdir -p /opt/chunjun-dist-1.12-SNAPSHOT; \
tar -zxf /opt/chunjun-dist-1.12-SNAPSHOT.tar.gz -C /opt/chunjun-dist-1.12-SNAPSHOT; \
chown -R flink: /opt/chunjun-dist-1.12-SNAPSHOT; \
rm -rf /opt/chunjun-dist-1.12-SNAPSHOT.tar.gz; \
mkdir -p /opt/flink/lib/chunjun; \
chown -R flink: /opt/flink/lib/chunjun; \
cp -R /opt/chunjun-dist-1.12-SNAPSHOT/chunjun-dist/* /opt/flink/lib/chunjun/
ENV CHUNJUN_HOME=/opt/chunjun-dist-1.12-SNAPSHOT
提示
chunjun-dist-1.12-SNAPSHOT.tar.gz 是纯钧 package 后的完整安装包,包含 bin
目录,而不只是 chunjun-dist
目录。
- 构建镜像并推送到私有库
docker buildx build --platform linux/amd64 -t 169.254.71.10:40600/flink:1.12.7 .
docker push 169.254.71.10:40600/flink:1.12.7
注意构建的镜像的架构一致性(指的就是你——Apple M芯片)。如果构建了 ARM 架构的镜像,在 x86 架构的 K8s 集群是无法运行的。
1.2 Kubernetes 集群
在此略过 K8s 集群的部署,总之你需要有一个搭建好的 Kubernetes 集群(使用 Minikube 也是可以的)。后续会单独写一篇简单的文章记录 K8s 集群的部署。
演示的 K8s 集群配置如下:
主机名 | IP 地址 | 角色 | host 解析 | 硬件配置 |
---|---|---|---|---|
master | 192.168.46.134 | 主节点(控制面所在) | master | 4c6g |
node1 | 192.168.46.135 | 工作节点 | node1 | 2c4g |
node2 | 192.168.46.136 | 工作节点 | node2 | 2c4g |
其中主节点是默认污点配置(不允许分配工作 Pod)。
2. 部署
2.1 YAML 配置
首先给出使用到的所有 YAML 文件,后续会逐一解释。本次搭建的是一个 Flink Standalone 集群,运行在 Session 模式,并且没有配置高可用。
Flink 配置
flink-configuration-configmap.yamlapiVersion: v1 kind: ConfigMap metadata: name: flink-config labels: app: flink data: flink-conf.yaml: |+ jobmanager.rpc.address: flink-jobmanager taskmanager.numberOfTaskSlots: 200 blob.server.port: 6124 jobmanager.rpc.port: 6123 taskmanager.rpc.port: 6122 queryable-state.proxy.ports: 6125 jobmanager.memory.process.size: 1600m taskmanager.memory.process.size: 1728m taskmanager.memory.managed.size: 96m taskmanager.memory.network.max: 256m taskmanager.memory.jvm-overhead.max: 256m taskmanager.memory.jvm-overhead.min: 128m taskmanager.memory.jvm-metaspace.size: 256m taskmanager.memory.framework.off-heap.size: 128m classloader.resolve-order: parent-first restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10s rest.bind-address: 0.0.0.0 rest.address: 0.0.0.0 parallelism.default: 1 log4j-console.properties: |+ # This affects logging for both user code and Flink rootLogger.level = INFO rootLogger.appenderRef.console.ref = ConsoleAppender rootLogger.appenderRef.rolling.ref = RollingFileAppender # Uncomment this if you want to _only_ change Flink's logging #logger.flink.name = org.apache.flink #logger.flink.level = INFO # The following lines keep the log level of common libraries/connectors on # log level INFO. The root logger does not override this. You have to manually # change the log levels here. logger.akka.name = akka logger.akka.level = INFO logger.kafka.name= org.apache.kafka logger.kafka.level = INFO logger.hadoop.name = org.apache.hadoop logger.hadoop.level = INFO logger.zookeeper.name = org.apache.zookeeper logger.zookeeper.level = INFO # Log all infos to the console appender.console.name = ConsoleAppender appender.console.type = CONSOLE appender.console.layout.type = PatternLayout appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n # Log all infos in the given rolling file appender.rolling.name = RollingFileAppender appender.rolling.type = RollingFile appender.rolling.append = false appender.rolling.fileName = ${sys:log.file} appender.rolling.filePattern = ${sys:log.file}.%i appender.rolling.layout.type = PatternLayout appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n appender.rolling.policies.type = Policies appender.rolling.policies.size.type = SizeBasedTriggeringPolicy appender.rolling.policies.size.size=100MB appender.rolling.strategy.type = DefaultRolloverStrategy appender.rolling.strategy.max = 10 # Suppress the irrelevant (wrong) warnings from the Netty channel handler logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline logger.netty.level = OFF
提示
flink-conf.yaml
jobmanager.rpc.address
:Job Manager 的地址,这里使用 Service 名称,K8s 会自动解析为 Cluster IP。rest.bind-address
和rest.address
:配置外网访问 REST API。classloader.resolve-order
:类加载顺序,避免类加载冲突。- 其余配置按需调整。对于纯钧同步任务场景,是用不上 Flink Managed Memory 的,可以将内存尽量分配到 Task Heap 去。
log4j-console.properties
- 日志配置,这里只是简单配置了一下,可以根据需要调整。
两个服务(内部 Cluster IP 和对外暴露 REST Port 的 Node Port)
jobmanager-service.yamlapiVersion: v1 kind: Service metadata: name: flink-jobmanager spec: type: ClusterIP ports: - name: rpc port: 6123 - name: blob-server port: 6124 selector: app: flink component: jobmanager
提示
内部的 Cluster IP 配置,和上方配置的 Config Map 的配置对照。
jobmanager-service-rest.yamlapiVersion: v1 kind: Service metadata: name: flink-jobmanager-rest spec: type: NodePort ports: - name: rest port: 8081 targetPort: 8081 nodePort: 30081 selector: app: flink component: jobmanager
提示
可选的对外暴露 REST 端点,便于访问管理。按自己的端口需求调整。
Job Manager Deployment
jobmanager-session-deployment-non-ha.yamlapiVersion: apps/v1 kind: Deployment metadata: name: flink-jobmanager spec: replicas: 1 selector: matchLabels: app: flink component: jobmanager template: metadata: labels: app: flink component: jobmanager spec: containers: - name: jobmanager resources: requests: memory: "1600Mi" cpu: "300m" limits: memory: "1800Mi" cpu: "500m" image: 169.254.71.10:40600/flink:1.12.7 args: [ "jobmanager" ] ports: - containerPort: 6123 name: rpc - containerPort: 6124 name: blob-server - containerPort: 8081 name: webui livenessProbe: tcpSocket: port: 6123 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: - name: flink-config-volume mountPath: /opt/flink/conf securityContext: runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary volumes: - name: flink-config-volume configMap: name: flink-config items: - key: flink-conf.yaml path: flink-conf.yaml - key: log4j-console.properties path: log4j-console.properties
提示
replicas
:Job Manager 的副本数,这里是 1。超过 1 是高可用模式的配置,需要设置对应的高可用配置才会使额外的 JM 有用。resources
:资源分配限制配置,根据实际情况调整。如果不设置则不受限(受限于 Node 可用资源)。
Task Manager Deployment
taskmanager-session-deployment.yamlapiVersion: apps/v1 kind: Deployment metadata: name: flink-taskmanager spec: replicas: 3 selector: matchLabels: app: flink component: taskmanager template: metadata: labels: app: flink component: taskmanager spec: containers: - name: taskmanager resources: requests: memory: "1728Mi" cpu: "500m" limits: memory: "1800Mi" cpu: "1000m" image: 169.254.71.10:40600/flink:1.12.7 args: ["taskmanager"] ports: - containerPort: 6122 name: rpc - containerPort: 6125 name: query-state livenessProbe: tcpSocket: port: 6122 initialDelaySeconds: 30 periodSeconds: 60 volumeMounts: - name: flink-config-volume mountPath: /opt/flink/conf/ securityContext: runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary volumes: - name: flink-config-volume configMap: name: flink-config items: - key: flink-conf.yaml path: flink-conf.yaml - key: log4j-console.properties path: log4j-console.properties
提示
replicas
:Task Manager 的副本数,这里是 3。根据实际情况调整。resources
:资源分配限制配置,根据实际情况调整。如果不设置则不受限(受限于 Node 可用资源)。
2.2 部署集群
按照 ConfigMap -> Service -> Deployment(先 JM 后 TM)
的顺序使用 kubectl 部署所有组件。
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-service-rest.yaml
kubectl create -f jobmanager-session-deployment-non-ha.yaml
kubectl create -f taskmanager-session-deployment.yaml
部署完之后检查一下 Pod 状态:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-jobmanager-679b597656-bdm5j 1/1 Running 0 59m
flink-taskmanager-9b566bfc-7s76k 1/1 Running 0 59m
flink-taskmanager-9b566bfc-s28xp 1/1 Running 0 59m
flink-taskmanager-9b566bfc-zcwnv 1/1 Running 0 59m
2.3 提交纯钧任务
在这里我们使用纯钧自带的任务提交方式提交一个任务,这里以一个简单的 Oracle 到 PostgreSQL 的同步任务为例。
来到安装了 kubectl 并且配置好了 Kube Config 的任意机器上,确保机器上有纯钧运行所需的基础环境:
- 纯钧本体可执行文件
- 本地 Flink(非容器内)
- Java
FLINK_HOME
等环境变量
使用纯钧自带的 chunjun-kubernetes-session.sh
提交一个 Session 模式集群任务:
./chunjun-kubernetes-session.sh \
-job test.json \
-jobName kubernetes-job \
-flinkConfDir /home/s/flink-1.12.7/conf \
-flinkLibDir /home/s/flink-1.12.7/lib \
-chunjunDistDir /home/s/chunjun-1.12/chunjun-dist \
-confProp "{\"kubernetes.config.file\":\"/home/s/.kube/config\",\"kubernetes.cluster-id\":\"flink-jobmanager\"}"
提交指令参数说明
提交指令分为两部分,前半部分是纯钧提交任务的基本参数,你至少需要指定:
-job
:任务 JSON 文件-flinkConfDir
:本机本地的 Flink 配置文件目录-flinkLibDir
:本机本地的 Flink Lib 目录-chunjunDistDir
:本机本地的纯钧 Dist 目录(目录下有chunjun-core.jar
)
后半部分 confProp
是 Flink 集群的配置参数(Json 格式),你至少需要指定:
kubernetes.config.file
:本机的 Kube Config 文件路径(通常在安装集群时使用的用户的home/.kube/
内)kubernetes.cluster-id
:Flink 集群的 Job Manager Deployment 名称- 如果你在创建集群的时候指定了自定义的 Kubernetes 命名空间,还需要指定
kubernetes.namespace
参数
如果提交成功,访问暴露的 REST 端点,通过 WebUI 应该能够看到任务正确运行。至此一个不包含高可用的 Flink Session 集群就部署完成了。后续可以根据需要自行调整 Flink 集群的配置(ConfigMap)以及配置高可用。
如果需要更新纯钧 Lib,可以重新构建镜像并更新 Deployment。
踩坑
集群启动失败,Pod 无法启动,Pod 日志报找不到主类:
Error: Could not find or load main class org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
通常来说这是因为你使用了 官方 Flink 镜像,如果你使用的是 DockerHub 中的 Official 镜像,在部分版本可能会出现这个问题。将你的基础镜像更换到 apache/flink 这个镜像即可。或按需自己从头构建镜像。
集群启动失败,Pod 无法启动。
检查你的 Docker 镜像架构是否与 K8s 集群架构一致,如果不一致,需要重新构建镜像。使用 Apple M 芯片进行开发尤其需要注意,M 系列芯片是 ARM 架构,默认构建的镜像也是 ARM 架构的。
集群已经搭建好了,也能运行 Flink 官方的示例任务,但是提交纯钧任务直接失败。错误一般是
java.lang.ClassCastException
。这是一个类加载顺序问题。修改集群的 Config Map,在
flink-conf.yaml
中添加classloader.resolve-order: parent-first
,重启集群即可。