在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

flink中如何实现基于k8s的环境搭建

153次阅读
没有评论

共计 9812 个字符,预计需要花费 25 分钟才能阅读完成。

这篇文章主要介绍了 flink 中如何实现基于 k8s 的环境搭建,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让丸趣 TV 小编带着大家一起了解一下。

前面写了一些 flink 的基础组件,但是还没有说过 flink 的环境搭建,现在我们来说下基本的环境搭建
1. 使用 StatefulSet 的原因
对于 Flink 来说,使用 sts 的最大的原因是 pod 的 hostname 是有序的;这样潜在的好处有
hostname 为 - 0 和 - 1 的 pod 可以直接指定为 jobmanager;可以使用一个 statefulset 启动一个 cluster,而 deployment 必须 2 个;Jobmanager 和 TaskManager 分别独立的 deployment
pod 由于各种原因 fail 后,由于 StatefulSet 重新拉起的 pod 的 hostname 不变,集群 recover 的速度理论上可以比 deployment 更快(deployment 每次主机名随机)
2. 使用 StatefulSet 部署 Flink
2.1 docker 的 entrypoint
由于要由主机名来判断是启动 jobmanager 还是 taskmanager,因此需要在 entrypoint 中去匹配设置的 jobmanager 的主机名是否有一致
传入参数为:cluster ha;则自动根据主机名判断启动那个角色;也可以直接指定角色名称
docker-entrypoint.sh 的脚本内容如下:

#!/bin/sh
 
# If unspecified, the hostname of the container is taken as the JobManager address
ACTION_CMD= $1 
# if use cluster model, pod ${JOB_CLUSTER_NAME}-0,${JOB_CLUSTER_NAME}-1 as jobmanager
if [ ${ACTION_CMD} ==  cluster  ]; then
 jobmanagers=(${JOB_MANGER_HOSTS//,/ })
 ACTION_CMD= taskmanager 
 for i in ${!jobmanagers[@]}
 do
 if [  $(hostname -s)  ==  ${jobmanagers[i]}  ]; then
 ACTION_CMD= jobmanager 
 echo  pod hostname match jobmanager config host, change action to jobmanager. 
 fi
 done
 
# if ha model, replace ha configuration
if [  $2  ==  ha  ]; then
 sed -i -e  s|high-availability.cluster-id: cluster-id|high-availability.cluster-id: ${FLINK_CLUSTER_IDENT}|g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s|high-availability.zookeeper.quorum: localhost:2181|high-availability.zookeeper.quorum: ${FLINK_ZK_QUORUM}|g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s|state.backend.fs.checkpointdir: checkpointdir|state.backend.fs.checkpointdir: hdfs:///user/flink/flink-checkpoints/${FLINK_CLUSTER_IDENT}|g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s|high-availability.storageDir: hdfs:///flink/ha/|high-availability.storageDir: hdfs:///user/flink/ha/${FLINK_CLUSTER_IDENT}|g   $FLINK_CONF_DIR/flink-conf.yaml 
 
if [ ${ACTION_CMD} ==  help  ]; then
 echo  Usage: $(basename  $0) (cluster ha|jobmanager|taskmanager|local|help) 
 exit 0
elif [ ${ACTION_CMD} ==  jobmanager  ]; then
 JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
 echo  Starting Job Manager 
 sed -i -e  s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: ${JOB_MANAGER_RPC_ADDRESS}/g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s/jobmanager.heap.mb: 1024/jobmanager.heap.mb: ${JOB_MANAGER_HEAP_MB}/g   $FLINK_CONF_DIR/flink-conf.yaml 
 
 echo  config file:     grep  ^[^\n#]   $FLINK_CONF_DIR/flink-conf.yaml 
 exec  $FLINK_HOME/bin/jobmanager.sh  start-foreground cluster
 
elif [ ${ACTION_CMD} ==  taskmanager  ]; then
 TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
 echo  Starting Task Manager 
 
 sed -i -e  s/taskmanager.heap.mb: 1024/taskmanager.heap.mb: ${TASK_MANAGER_HEAP_MB}/g   $FLINK_CONF_DIR/flink-conf.yaml 
 sed -i -e  s/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: $TASK_MANAGER_NUMBER_OF_TASK_SLOTS/g   $FLINK_CONF_DIR/flink-conf.yaml 
 
 echo  config file:     grep  ^[^\n#]   $FLINK_CONF_DIR/flink-conf.yaml 
 exec  $FLINK_HOME/bin/taskmanager.sh  start-foreground
elif [ ${ACTION_CMD} ==  local  ]; then
 echo  Starting local cluster 
 exec  $FLINK_HOME/bin/jobmanager.sh  start-foreground local
 
exec  $@

2.2. 使用 ConfigMap 分发 hdfs 和 flink 配置文件
ConfigMap 介绍参考:
https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#create-configmaps-from-files
Q:为什么使用 ConfigMap
A:由于 hadoop 配置文件在不同的环境不一样,不方便打包到镜像里面;因此合适的方式就只有 2 种,使用 ConfigMap 和 Pod 的 InitContainer。使用 InitContainer 的话,可以 wget 获取远程的一个配置文件,但是这样还需要依赖一个配置服务。相比而已,ConfigMap 更简单。
创建 ConfigMap
[root@rc-mzgjg ~]# kubectl create configmap hdfs-conf –from-file=hdfs-site.xml –from-file=core-site.xml
[root@rc-mzgjg ~]# kubectl create configmap flink-conf –from-file=flink-conf/log4j-console.properties –from-file=flink-conf/flink-conf.yaml
使用 describe 命令查看创建的名词为 hdfs-conf 的 ConfigMap,会显示文件的内容到控制台
[root@rc-mzgjg ~]# kubectl describe configmap hdfs-conf
Name:         hdfs-conf
Namespace:    default
Labels:       none
Annotations:   none
Data
====
core-site.xml:
通过 volumeMounts 使用 ConfigMap
Pod 的 Container 要使用配置文件,则可以通过 volumeMounts 方式挂载到 Container 中。如下 demo 所示,将配置文件挂载到 /home/xxxx/conf/hadoop 目录下

apiVersion: apps/v1
kind: StatefulSet
metadata:
 name: flink-jm
spec:
 selector:
 matchLabels:
 app: flink-jm
 serviceName: flink-jm
 replicas: 2
 podManagementPolicy: Parallel
 template:
 metadata:
 labels:
 app: flink-jm
 spec:
 terminationGracePeriodSeconds: 2
 containers:
 - name: test
 imagePullPolicy: Always
 image: ip:5000/test:latest
 args: [sleep ,  1d]
 volumeMounts:
 - name: hdfs-conf
 mountPath: /home/xxxx/conf/hadoop
 volumes:
 - name: hdfs-conf
 configMap:
 # Provide the name of the ConfigMap containing the files you want to add to the container
 name: hdfs-conf

创建好 Pod 后,查看配置文件的挂载
[hadoop@flink-jm-0 hadoop]$ ll /home/xxxx/conf/hadoop
total 0
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 core-site.xml – ..data/core-site.xml
lrwxrwxrwx. 1 root root 20 Apr  9 06:54 hdfs-site.xml – ..data/hdfs-site.xml
配置文件是链接到了..data 目录
1.10 才能支持 Pod 多 Container 的 namespace 共享
最初的想法是一个 Pod 里面多个 Container,然后配置文件是其中一个 Container;测试验证起数据目录并不能互相访问;如预想的配置,其中一个 Container 里面的 image 是 hdfs-conf 的配置文件

containers:
 - name: hdfs-conf
 imagePullPolicy: Always
 image: ip:5000/hdfs-dev:2.6
 args: [sleep ,  1d]
 - name: flink-jm
 imagePullPolicy: Always
 image: ip:5000/flink:1.4.2

实际验证,两个 Container 的只能共享网络,文件目录彼此看不见
“Share Process Namespace between Containers in a Pod”这个是 Kubernates 1.10 才开始支持,参考
https://kubernetes.io/docs/tasks/configure-pod-container/share-process-namespace/
2.3 StatefulSet 的配置
Flink 的配置文件和 hadoop 的配置文件,依赖 ConfigMap 来分发

环境变量名称

参数

内容

  说明

 

FLINK_CLUSTER_IDENT

namespace/StatefulSet.name

default/flink-cluster

用来做 zk ha 设置和 hdfs checkpiont 的根目录  

FLINK_ZK_QUORUM

env:FLINK_ZK_QUORUM

ip:2181

HA ZK 的地址  

JOB_MANAGER_HEAP_MB

env:JOB_MANAGER_HEAP_MB

value:containers.resources.memory.limit -1024

512JM 的 Heap 大小,由于存在 Netty 的堆外内存,需要小于 container.resources.memory.limits;否则容易 OOM kill 

JOB_MANGER_HOSTS
StatefulSet.name-0,StatefulSet.name-1
flink-cluster-0,flink-cluster-1

JM 的主机名,短主机名;可以不用 FQDN 

TASK_MANAGER_HEAP_MB

env:TASK_MANAGER_HEAP_MB 

value: containers.resources.memory.limit -1024

512TM 的 Heap 大小,由于存在 Netty 的堆外内存,需要小于 container.resources.memory.limits;否则容易 OOM kill 

TASK_MANAGER_NUMBER_OF_TASK_SLOTS

containers.resources.cpu.limits

2TM 的 slot 数量,根据 resources.cpu.limits 来设置  

Pod 的 imagePullPolicy 策略,测试环境 Always,每次都 pull,方便验证;线上则是 IfNotPresent;线上如果对 images 做了变更,必须更改 images 的 tag
完整的内容可以参考如下:

# headless service for statefulset
apiVersion: v1
kind: Service
metadata:
 name: flink-cluster
 labels:
 app: flink-cluster
spec:
 clusterIP: None
 ports:
 - port: 8080
 name: ui
 selector:
 app: flink-cluster
# create flink statefulset
apiVersion: apps/v1
kind: StatefulSet
metadata:
 name: flink-cluster
spec:
 selector:
 matchLabels:
 app: flink-cluster
 serviceName: flink-cluster
 replicas: 4
 podManagementPolicy: Parallel
 template:
 metadata:
 labels:
 app: flink-cluster
 spec:
 terminationGracePeriodSeconds: 2
 containers:
 - name: flink-cluster
 imagePullPolicy: Always
 image: ip:5000/flink:1.4.2
 args: [cluster ,  ha]
 volumeMounts:
 - name: hdfs-conf
 mountPath: /home/xxxx/conf/hadoop
 - name: flink-conf
 mountPath: /home/xxxx/conf/flink
 - name: flink-log
 mountPath: /home/xxxx/logs
 resources:
 requests:
 memory:  1536Mi 
 cpu: 1
 limits:
 memory:  1536Mi 
 cpu: 2
 env:
 - name: JOB_MANGER_HOSTS
 value:  flink-cluster-0,flink-cluster-1 
 - name: FLINK_CLUSTER_IDENT
 value:  default/flink-cluster 
 - name: TASK_MANAGER_NUMBER_OF_TASK_SLOTS
 value:  2 
 - name: FLINK_ZK_QUORUM
 value:  ip:2181 
 - name: JOB_MANAGER_HEAP_MB
 value:  512 
 - name: TASK_MANAGER_HEAP_MB
 value:  512 
 ports:
 - containerPort: 6124
 name: blob
 - containerPort: 6125
 name: query
 - containerPort: 8080
 name: flink-ui
 volumes:
 - name: hdfs-conf
 configMap:
 # Provide the name of the ConfigMap containing the files you want to add to the container
 name: hdfs-conf
 - name: flink-conf
 configMap:
 name: flink-conf
 - name: flink-log
 hostPath:
 # directory location on host
 path: /tmp
 # this field is optional
 type: Directory

 
3. 测试环境对外暴露 Flink UI
由于测试环境使用 Flannel 进行网络通信,在 K8S 集群外部无法访问到 Flink UI 的 IP 和端口,因此需要通过 NodePort 方式将内部 IP 映射出来。配置如下:

# only for test k8s cluster
# use service to expose flink jobmanager 0 s web port
apiVersion: v1
kind: Service
metadata:
 labels:
 app: flink-cluster
 statefulset.kubernetes.io/pod-name: flink-cluster-0
 name: flink-web-0
 namespace: default
spec:
 ports:
 - port: 8080
 protocol: TCP
 targetPort: 8080
 selector:
 app: flink-cluster
 statefulset.kubernetes.io/pod-name: flink-cluster-0
 type: NodePort
# use service to expose flink jobmanager 1 s web port
apiVersion: v1
kind: Service
metadata:
 labels:
 app: flink-cluster
 statefulset.kubernetes.io/pod-name: flink-cluster-1
 name: flink-web-1
 namespace: default
spec:
 ports:
 - port: 8080
 protocol: TCP
 targetPort: 8080
 selector:
 app: flink-cluster
 statefulset.kubernetes.io/pod-name: flink-cluster-1
 type: NodePort

 
4. 服务部署状态
执行完前面操作后,可以查看到当前的 StatefulSet 状态
[root@rc-mzgjg ~]# kubectl get sts flink-cluster -o wide
NAME            DESIRED   CURRENT   AGE       CONTAINERS      IMAGES
flink-cluster   4         4         1h        flink-cluster   ip:5000/flink:1.4.2
容器的 Pod 状态
[root@rc-mzgjg ~]# kubectl get pod -l app=flink-cluster  -o wide
NAME              READY     STATUS    RESTARTS   AGE       IP            NODE
flink-cluster-0   1/1       Running   0          1h        ip1   ip5
flink-cluster-1   1/1       Running   0          1h        ip2   ip6
flink-cluster-2   1/1       Running   0          1h        ip3   ip7
flink-cluster-3   1/1       Running   0          1h        ip4   ip8
相关的 Service 信息
[root@rc-mzgjg ~]# kubectl get svc -l app=flink-cluster  -o wide
NAME            TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)          AGE       SELECTOR
flink-cluster   ClusterIP   None             none        8080/TCP         2h        app=flink-cluster
flink-web-0     NodePort    10.254.8.103     none        8080:30495/TCP   1h        app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-0
flink-web-1     NodePort    10.254.172.158   none        8080:30158/TCP   1h        app=flink-cluster,statefulset.kubernetes.io/pod-name=flink-cluster-1
根据 Service 的信息;可以通过任何一个 k8s node 的 ip 地址加 PORT 来访问 Flink UI

这里主要说一下,在搭建的过程中遇到了一个和权限相关的问题
错误日志如下
ERROR setFile(null,true) call failed
FileNotFoundException:no such file or directory
原因:是因为 flink 服务缺少日志目录的权限
修改方式:
1.adduser flink 添加相应的用户
2.chown -R flink:flink /home/xxxx/logs

感谢你能够认真阅读完这篇文章,希望丸趣 TV 小编分享的“flink 中如何实现基于 k8s 的环境搭建”这篇文章对大家有帮助,同时也希望大家多多支持丸趣 TV,关注丸趣 TV 行业资讯频道,更多相关知识等着你来学习!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-25发表,共计9812字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 亚洲精品无码永久电影在线 | 男男h黄漫画啪啪无遮挡 | 欧美与黑人午夜性猛交久久久 | 亚洲中文字幕久久无码精品 | 成人精品一区二区久久久 | 亚洲色图欧美色 | a性片| 九色视频在线看 | 免费无码专区毛片高潮喷水 | 无码夜色一区二区三区 | 美女与男人对肌免费网站 | 免费一本色道久久一区 | 久久久久中文字幕 | 中国少妇的bbb真爽 中国少妇内射xxxhd | 黄色毛片在线看 | 国产精品成人免费 | 日本人视频jzzjzz | 成人av.com | 欧美性大战久久久久久久 | 国产精品亚洲片在线牛牛影视 | 少妇高潮毛片免费看 | 台湾佬视频在线偷拍 | 亚洲国产精品第一页 | 人妻无码熟妇乱又伦精品视频 | 午夜在线视频 | 成人福利网站 | 国产主播第一页 | 亚洲人成色99999在线观看 | 韩国一级特黄清高免费大片 | 99爱免费观看视频在线 | 禁断看护妇中文字幕在线视频 | 欧洲熟妇色xxxx欧美老妇免费 | a级毛片无码免费真人 | 九九九精品成人免费视频 | 天堂素人在线 | 日本一本视频 | 91一区二区三区四区五区 | 日韩av无码成人无码免费 | 好吊妞视频这里有精品 | 伊人久久大香线焦av综合影院 | 国产私密|