flink 1.12.2 standalone application cluster on k8s: GitLab CI/CD integration with automatic savepoint save and restore

cicd

For CI/CD I use GitLab CI/CD, with a runner that uses the shell executor.

.gitlab-ci.yml

sbt, docker, git and kubectl must be installed and configured on the runner instance beforehand. Here is an excerpt of the CI file:

```yaml
stages:
  - xx
  - xx

Job_xx:
  stage: xx
  tags:
    - xx-runner
  script:
    # Build the fat jar and move it next to the Dockerfile
    - sbt clean assembly
    - mv ./target/scala-2.12/*.jar ./ops/xx.jar
    - cd ops
    # Build and push an image tagged with the pipeline id
    - docker build -t xx/xx/xx:xx-$CI_PIPELINE_ID .
    - docker push xx/xx/xx:xx-$CI_PIPELINE_ID
    # Point both manifests at the new image and apply them
    - git clone git@xx:xx.git
    - cd xx/application-cluster
    - sed -i "s!xx/xx/xx:.*!xx/xx/xx:xx-$CI_PIPELINE_ID!" jobmanager-deployment.yaml
    - sed -i "s!xx/xx/xx:.*!xx/xx/xx:xx-$CI_PIPELINE_ID!" taskmanager-deployment.yaml
    - kubectl apply -f .
    # Commit the updated manifests back to the manifest repository
    - git -c core.quotepath=false -c log.showSignature=false commit -a -m "*** gitlab-auto-deploy ***"
    - git -c core.quotepath=false -c log.showSignature=false fetch origin --recurse-submodules=no --progress --prune
    - git -c core.quotepath=false -c log.showSignature=false rebase origin/master
    - git -c core.quotepath=false -c log.showSignature=false push --progress --porcelain origin refs/heads/master:master --follow-tags
```
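The two sed lines only take effect if the image name in each manifest matches the pattern exactly (xx/xx/xx here). Below is a minimal sketch of that step as a reusable function. It assumes GNU sed, as found on a typical Linux shell runner. bump_image_tag is a hypothetical helper name and is not part of the original pipeline.

```shell
#!/bin/sh
# Hypothetical helper mirroring the two sed lines in the CI job above.
# Assumes GNU sed, whose -i flag edits the file in place without a backup suffix.
set -eu

# bump_image_tag FILE IMAGE TAG
# Rewrites "IMAGE:<old tag>" to "IMAGE:TAG" on every line of FILE that contains
# "IMAGE:". "!" is the sed delimiter, so the slashes in IMAGE need no escaping;
# IMAGE is still a regex, so a "." in a registry host matches any character.
bump_image_tag() {
  local file image tag
  file=$1
  image=$2
  tag=$3
  sed -i "s!${image}:.*!${image}:${tag}!" "$file"
}
```

In the job above, this would be called as bump_image_tag jobmanager-deployment.yaml xx/xx/xx xx-$CI_PIPELINE_ID, and the same way for taskmanager-deployment.yaml. A manifest whose image name differs from the pattern, even by one path segment, is silently left unchanged.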

Savepoint integration on k8s

The checkpoint directory is /data/flink/backend and the savepoint directory is /data/flink/savepoint.

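Restoring from a savepoint at startup is done by overriding the container's command and args.

Taking the savepoint at shutdown is done in a preStop hook. Note that the hook uses stop, not cancel: flink stop suspends the job with a savepoint, so there is no need to cancel it explicitly, and the rest of the teardown is left to Kubernetes.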
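The restore half can be sketched as a few small POSIX shell functions. They are a hedged alternative to the ls -lt | grep | awk pipeline in the manifest. The sketch assumes that every sub-directory of /data/flink/savepoint is a savepoint and reuses the fixed job id from this post. The function names are hypothetical.

```shell
#!/bin/sh
# Sketch of the JobManager start-up decision: restore from the newest savepoint
# directory if one exists, otherwise start fresh.
# Assumptions: every sub-directory of the savepoint dir is a savepoint, and the
# job id is the fixed one used throughout this post.
set -eu

JOB_ID=00000000000000000000000000000000

# newest_savepoint DIR
# Prints the most recently modified sub-directory of DIR (without the trailing
# slash), or nothing when DIR has no sub-directories.
newest_savepoint() {
  # DIR/*/ expands to directories only; ls -td sorts them newest first.
  # If nothing matches, the glob stays literal, ls fails quietly, output is empty.
  ls -td -- "$1"/*/ 2>/dev/null | head -n 1 | sed 's!/$!!'
}

# prune_old_savepoints DIR
# Deletes every savepoint directory in DIR except the newest one.
prune_old_savepoints() {
  ls -td -- "$1"/*/ 2>/dev/null | tail -n +2 | while IFS= read -r old; do
    rm -rf -- "$old"
  done
}

# start_args DIR JOB_CLASS
# Prints the standalone-job arguments for /docker-entrypoint.sh, adding
# --fromSavepoint only when a savepoint directory exists.
start_args() {
  local sp args
  sp=$(newest_savepoint "$1")
  args="standalone-job --allowNonRestoredState --job-classname $2 --job-id $JOB_ID"
  if [ -n "$sp" ]; then
    args="$args --fromSavepoint $sp"
  fi
  printf '%s\n' "$args"
}
```

In the container, the output would be word-split into the entrypoint call, for example /docker-entrypoint.sh $(start_args /data/flink/savepoint xx). This is fine as long as the paths contain no spaces. Listing DIR/*/ with ls -td restricts the candidates to directories and sorts them by modification time, newest first. Passing an absolute DIR also yields an absolute savepoint path.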

The jobmanager and taskmanager manifests are as follows:

jobmanager-deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-ac-jobmanager
  namespace: xx
spec:
  # Rolling updates are disabled (Recreate) so the old JobManager takes its
  # savepoint and exits before the new one starts and restores from it.
  strategy:
    type: Recreate
  replicas: 1
  selector:
    matchLabels:
      app: flink-ac
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink-ac
        component: jobmanager
    spec:
      # The checkpoint timeout in the job code is 1 minute; 2 minutes here gives the
      # preStop savepoint time to finish during graceful shutdown before the pod is
      # killed. In practice a checkpoint usually completes within 1 second.
      terminationGracePeriodSeconds: 120
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              # Match the pod labels above (app: flink-ac, component: jobmanager)
              matchExpressions:
              - key: app
                operator: In
                values:
                - flink-ac
              - key: component
                operator: In
                values:
                - jobmanager
            topologyKey: "kubernetes.io/hostname"
      imagePullSecrets:
      - name: docker
      restartPolicy: Always
      containers:
      - name: jobmanager
        # Must match the image pattern used by the sed commands in the CI job
        image: xx/xx/xx:xx-22045
        command:
        - "/bin/sh"
        - "-c"
        args:
        - |
          # Clear old checkpoint data (this step is no longer really needed)
          mkdir -p /data/flink/backend;
          cd /data/flink/backend && rm -rf ./*

          # Keep only the newest savepoint directory ('^d' = directories in ls -l output)
          mkdir -p /data/flink/savepoint;
          cd /data/flink/savepoint;
          ls -lt | grep '^d' | awk '{if(NR>1) print $9}' | xargs rm -rf;

          # Get the newest savepoint (relative to /data/flink/savepoint)
          savepoint_path=$(ls -lt | grep '^d' | awk '{if(NR==1) print $9}')
          # The job id must be fixed so that the preStop hook can stop this job
          if [ "$savepoint_path" != "" ]; then
            echo '>>> start with savepoint <<<'
            /docker-entrypoint.sh standalone-job --allowNonRestoredState --job-classname xx --job-id 00000000000000000000000000000000 --fromSavepoint "$savepoint_path"
          else
            echo '>>> start without savepoint <<<'
            /docker-entrypoint.sh standalone-job --allowNonRestoredState --job-classname xx --job-id 00000000000000000000000000000000
          fi
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        envFrom:
        - configMapRef:
            name: flink
        env:
        # Only used to test the CI/CD flow
        - name: CICD_VERSION
          value: "20210317193502"
        lifecycle:
          preStop:
            exec:
              # Stop (not cancel) the job with a savepoint before the container is terminated
              command:
              - "/bin/sh"
              - "-c"
              - |
                rm -rf /data/flink/savepoint/*;
                /opt/flink/bin/flink stop -m 127.0.0.1:8081 -p file:///data/flink/savepoint 00000000000000000000000000000000 > /data/flink/k8s_pre_stop.log 2>&1;
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-ac-config-volume
          mountPath: /opt/flink/conf
        - name: flink-data-volume
          mountPath: /data/flink
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-ac-config-volume
        configMap:
          name: flink-ac-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-data-volume
        persistentVolumeClaim:
          claimName: flink-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: flink-ac-jobmanager-rest
  namespace: xx
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    # 10012 is outside the default NodePort range (30000-32767); this assumes the
    # cluster's --service-node-port-range has been widened to include it.
    nodePort: 10012
  selector:
    app: flink-ac
    component: jobmanager
---
apiVersion: v1
kind: Service
metadata:
  name: flink-ac-jobmanager
  namespace: xx
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink-ac
    component: jobmanager
```
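The preStop hook redirects the output of flink stop to /data/flink/k8s_pre_stop.log. That file is therefore where you can check whether the savepoint was actually written before the old pod went away. Below is a minimal sketch. It assumes the Flink 1.12 CLI prints a line beginning with "Savepoint completed. Path: " on success, which is worth confirming against your own log. The function names are hypothetical.

```shell
#!/bin/sh
# Sketch: read back the preStop log and report where the savepoint was written.
# ASSUMPTION: on success `flink stop` prints a line starting with
# "Savepoint completed. Path: "; verify this against your Flink version's output.
set -eu

# savepoint_from_log LOGFILE
# Prints the savepoint path from the log, or nothing if no such line exists.
savepoint_from_log() {
  # Strip the prefix from matching lines only; keep the last match.
  sed -n 's/^Savepoint completed\. Path: //p' "$1" | tail -n 1
}

# check_prestop_log LOGFILE
# Exit status 0 (and prints the path) if the log reports a completed savepoint,
# status 1 with a message on stderr otherwise.
check_prestop_log() {
  local sp
  sp=$(savepoint_from_log "$1")
  if [ -n "$sp" ]; then
    printf 'savepoint written to %s\n' "$sp"
  else
    echo "no completed savepoint in $1" >&2
    return 1
  fi
}
```

The log sits on the shared flink-pvc volume, so the check could be run from the new JobManager pod with check_prestop_log /data/flink/k8s_pre_stop.log before relying on the restore.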

taskmanager-deployment.yaml

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-ac-taskmanager
  namespace: xx
spec:
  # Recreate rather than rolling update, to prevent TaskManagers from failing
  # to connect to the JobManager
  strategy:
    type: Recreate
  replicas: 2
  selector:
    matchLabels:
      app: flink-ac
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink-ac
        component: taskmanager
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              # Match the pod labels above so the TaskManagers land on different nodes
              matchExpressions:
              - key: app
                operator: In
                values:
                - flink-ac
              - key: component
                operator: In
                values:
                - taskmanager
            topologyKey: "kubernetes.io/hostname"
      imagePullSecrets:
      - name: docker
      restartPolicy: Always
      containers:
      - name: taskmanager
        image: xx/xx/xx:xx-22045
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        envFrom:
        - configMapRef:
            name: flink
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-ac-config-volume
          mountPath: /opt/flink/conf
        - name: flink-data-volume
          mountPath: /data/flink
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-ac-config-volume
        configMap:
          name: flink-ac-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-data-volume
        persistentVolumeClaim:
          claimName: flink-pvc
---
apiVersion: v1
kind: Service
metadata:
  name: flink-ac-taskmanager-query-state
  namespace: xx
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    # 10013 is outside the default NodePort range (30000-32767); this assumes the
    # cluster's --service-node-port-range has been widened to include it.
    nodePort: 10013
  selector:
    app: flink-ac
    component: taskmanager
```