Contents
  1 Background
  2 Job Submission
    2.1 Submitting a Job Natively
    2.2 Submitting a Job via the Operator
  3 Driver Startup
  4 Executor Startup
  5 Job Exit
  References

1 Background

Starting with Spark 2.3, Spark officially supports Kubernetes as a new resource-scheduling mode (alongside Standalone, Mesos, and YARN). Running Spark on K8s lowers cluster operations costs and allows mixed deployment with other applications to raise machine utilization, so more and more users are trying to run their Spark jobs on Kubernetes.

Right up until Spark 3.1, Spark on Kubernetes was still officially labeled experimental, i.e., not yet mature. Unlike the other modes, there are no long-running daemons on the nodes, so users need new approaches to dependency management, environment configuration, logging, and monitoring. Spark on k8s is still a small fraction of production deployments today, but with kubernetes as the cornerstone of cloud native, running Spark on k8s is clearly where things are headed.

This article takes a wordCount job (org.apache.spark.examples.JavaWordCount) as an example and walks through how a Spark job executes on Kubernetes. The source code referenced is from Spark 2.4.5.

The walkthrough quotes a fair amount of source code, which can get dry; it helps to follow along with the diagram I drew below for the overall picture.

2 Job Submission

Spark officially supports submitting k8s jobs through spark-submit [1]. In practice, however, many people choose Google's open-source spark-on-k8s-operator [2], which wraps the official mechanism in an extra layer that submits jobs via a kubernetes CRD. Both are covered below.

2.1 Submitting a Job Natively

The submission method in the official Spark on Kubernetes documentation is to run bin/spark-submit from a client with a local Spark environment. The client can sit outside the k8s cluster or be a pod inside it, and is usually kept as a dedicated gateway for job submission. This looks much like the other scheduling modes, except that the parameters must carry a series of k8s-specific settings such as the apiserver address and the image location.

[figure: Spark job execution flow on Kubernetes]

For example, we submit a wordCount job as follows (replace the placeholders with real values), executed on the client node:

$ bin/spark-submit \
--master k8s://https://{k8s-apiserver-host}:6443 \
--deploy-mode cluster \
--name spark-wordcount-example \
--class org.apache.spark.examples.JavaWordCount \
local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar \
oss://{wordcount-file-oss-bucket}/

This starts a java process on the client node whose main class is org.apache.spark.deploy.SparkSubmit:

$ ps -ef
UID PID PPID C STIME TTY STAT TIME CMD
root 216 7 53 16:24 pts/0 Sl+ 0:03 /usr/lib/jvm/java-1.8.0-openjdk-1.8.0/jre/bin/java -cp /opt/spark/conf/:/opt/spark/jars/* org.apache.spark.deploy.SparkSubmit --master k8s://https://{k8s-apiserver-host}:6443 --deploy-mode cluster --class org.apache.spark.examples.JavaWordCount --name spark-wordcount-example local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar oss://{wordcount-file-oss-bucket}/

At this point you can already see the pods running in the kubernetes cluster:

$ kubectl get pod
NAME READY STATUS RESTARTS AGE
spark-wordcount-example-1628927658927-driver 1/1 Running 0 7s
spark-wordcount-example-1628927658927-exec-1 1/1 Running 0 2s
spark-wordcount-example-1628927658927-exec-2 1/1 Running 0 2s

The Driver Pod is created by the SparkSubmit process above. To dig into the startup details in the source, we can jstack the process and look at the call stack:

"main" #1 prio=5 os_prio=0 tid=0x00007f0e28051800 nid=0x8c waiting on condition [0x00007f0e2ee81000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000000b2282088> (a java.util.concurrent.CountDownLatch$Sync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
at org.apache.spark.deploy.k8s.submit.LoggingPodStatusWatcherImpl.awaitCompletion(LoggingPodStatusWatcher.scala:138)
at org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:155)
at org.apache.spark.deploy.k8s.submit.Client$$anonfun$run$2.apply(KubernetesClientApplication.scala:140)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2545)
at org.apache.spark.deploy.k8s.submit.Client.run(KubernetesClientApplication.scala:140)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:250)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication$$anonfun$run$5.apply(KubernetesClientApplication.scala:241)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2545)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.run(KubernetesClientApplication.scala:241)
at org.apache.spark.deploy.k8s.submit.KubernetesClientApplication.start(KubernetesClientApplication.scala:204)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:856)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:931)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:940)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

From the call stack, the classes worth reading are org.apache.spark.deploy.k8s.submit.KubernetesClientApplication, its inner submission client Client, and LoggingPodStatusWatcherImpl, which blocks until the driver pod completes.
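
To save a trip to the repo, here is a heavily condensed sketch of what Client.run does in Spark 2.4.5. The field names are close to the real ones, but the signatures are abbreviated and error handling is dropped, so read it as an outline rather than the literal source:

// Condensed outline of org.apache.spark.deploy.k8s.submit.Client#run (Spark 2.4.5);
// signatures abbreviated, error handling omitted.
private[spark] class Client(
    builder: KubernetesDriverBuilder,       // assembles the driver pod from "feature steps"
    kubernetesConf: KubernetesConf[KubernetesDriverSpecificConf],
    kubernetesClient: KubernetesClient,     // fabric8 client talking to the apiserver
    waitForAppCompletion: Boolean,
    kubernetesResourceNamePrefix: String,
    watcher: LoggingPodStatusWatcher) {

  def run(): Unit = {
    // 1. Build the driver pod spec: container args, env, ports, volumes, service account ...
    val resolvedDriverSpec = builder.buildFromFeatures(kubernetesConf)

    // 2. Put the effective Spark config into a ConfigMap; the pod mounts it under
    //    /opt/spark/conf and the driver reads it via --properties-file
    val configMap = buildConfigMap(s"$kubernetesResourceNamePrefix-driver-conf-map",
      resolvedDriverSpec.systemProperties)

    // 3. Create the driver pod, then point an ownerReference from every auxiliary
    //    resource (ConfigMap, service, ...) back at it, so that deleting the driver
    //    pod garbage-collects the rest
    val createdDriverPod = kubernetesClient.pods().create(resolvedDriverSpec.pod)
    addDriverOwnerReference(createdDriverPod,
      resolvedDriverSpec.driverKubernetesResources :+ configMap)

    // 4. In cluster mode, block here watching pod status until a terminal state --
    //    this is the awaitCompletion frame visible in the jstack above
    if (waitForAppCompletion) {
      watcher.awaitCompletion()
    }
  }
}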

Finally, let's look at the full manifest of the Driver Pod:

apiVersion: v1
kind: Pod
metadata:
  labels:
    spark-app-selector: spark-4977cde55f33498d8cc34dcf80589ba0
    spark-role: driver
  name: spark-wordcount-example-1628999880800-driver
spec:
  containers:
  - args:
    - driver
    - '--properties-file'
    - /opt/spark/conf/spark.properties
    - '--class'
    - org.apache.spark.examples.JavaWordCount
    - spark-internal
    - 'oss://{wordcount-file-oss-bucket}/'
    env:
    - name: SPARK_DRIVER_BIND_ADDRESS
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    - name: SPARK_CONF_DIR
      value: /opt/spark/conf
    image: '{spark image address}'
    imagePullPolicy: IfNotPresent
    name: spark-kubernetes-driver
    ports:
    - containerPort: 7078
      name: driver-rpc-port
      protocol: TCP
    - containerPort: 7079
      name: blockmanager
      protocol: TCP
    - containerPort: 4040
      name: spark-ui
      protocol: TCP
    resources:
      limits:
        memory: 1408Mi
      requests:
        cpu: '1'
        memory: 1408Mi
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: spark-token-nq9p4
      readOnly: true
  serviceAccount: spark
  serviceAccountName: spark
  volumes:
  - configMap:
      defaultMode: 420
      name: spark-wordcount-example-1628999880800-driver-conf-map
    name: spark-conf-volume
  - name: spark-token-nq9p4
    secret:
      defaultMode: 420
      secretName: spark-token-nq9p4

2.2 Submitting a Job via the Operator

spark-on-k8s-operator [2] lets users submit and manage Spark jobs as CRDs (CustomResourceDefinitions) [4]. This approach makes better use of k8s-native capabilities and is more extensible, and on top of it the project layers scheduled jobs, retries, monitoring, and a series of other features. The full feature list is in the official documentation on github.

This operator pattern [5] is also the deployment model kubernetes officially recommends for complex applications: each application designs its own CRD and implements its deployment logic in a custom Controller that watches the CRD for changes.
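
As a mental model (a toy illustration in Scala, not the operator's actual Go code), a controller's core is just a watch feeding a work queue that drives a reconcile loop:

// Toy illustration of the controller/reconcile pattern -- not the operator's real code.
import java.util.concurrent.LinkedBlockingQueue

// Stand-in for a watched CRD object such as a SparkApplication
case class SparkAppEvent(name: String, spec: Map[String, String])

object ToyController {
  private val workQueue = new LinkedBlockingQueue[SparkAppEvent]()

  // The watch side: the apiserver pushes CRD add/update/delete events here
  def onCrdEvent(event: SparkAppEvent): Unit = workQueue.put(event)

  // The worker side: take items off the queue and drive the cluster's actual
  // state toward the desired state described by the CRD
  def runWorker(): Unit = while (true) {
    val app = workQueue.take()
    reconcile(app)
  }

  private def reconcile(app: SparkAppEvent): Unit = {
    // For spark-on-k8s-operator, this is roughly where spark-submit gets invoked
    println(s"reconciling SparkApplication ${app.name}")
  }
}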

To use spark-on-k8s-operator it must first be installed in the k8s cluster, which brings up a pod named sparkoperator. Submitting a job then requires no client with a Spark environment; you submit directly through kubectl or the kubernetes api. spark-on-k8s-operator defines two CRDs, SparkApplication and ScheduledSparkApplication, corresponding to Spark jobs and scheduled jobs respectively. Submitting, inspecting, and deleting a job thus become apply, get, and delete operations on the CRD.

For example, to submit the same wordCount job as above, prepare a wordcount.yaml:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-wordcount-example
  namespace: default
spec:
  type: Java
  sparkVersion: 2.4.5
  mainClass: org.apache.spark.examples.JavaWordCount
  image: {spark image address}
  mainApplicationFile: "local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar"
  arguments:
    - "oss://{wordcount-file-oss-bucket}/"
  driver:
    cores: 1
    coreLimit: 1000m
    memory: 4g
  executor:
    cores: 1
    coreLimit: 1000m
    memory: 4g
    memoryOverhead: 1g
    instances: 2

Then apply it with kubectl:

$ kubectl apply -f wordcount.yaml

After this, spark-on-k8s-operator notices the newly created SparkApplication and invokes spark-submit to create the Driver Pod, and the job runs from there.

To see the operator's internal flow clearly, we add a debug.PrintStack() at the point where the operator runs spark-submit (the operator is written in golang, so this looks slightly different from java):

goroutine 113 [running]:
runtime/debug.Stack(0x15, 0x0, 0x0)
/usr/local/go/src/runtime/debug/stack.go:24 +0x9d
runtime/debug.PrintStack()
/usr/local/go/src/runtime/debug/stack.go:16 +0x22
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.runSparkSubmit()
/workspace/pkg/controller/sparkapplication/submission.go:69 +0x25e
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).submitSparkApplication()
/workspace/pkg/controller/sparkapplication/controller.go:700 +0xc3f
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).syncSparkApplication()
/workspace/pkg/controller/sparkapplication/controller.go:555 +0xa2c
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).processNextItem()
/workspace/pkg/controller/sparkapplication/controller.go:264 +0x1e8
github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).runWorker()
/workspace/pkg/controller/sparkapplication/controller.go:248 +0x63
k8s.io/apimachinery/pkg/util/wait.JitterUntil.func1()
/go/pkg/mod/k8s.io/apimachinery@v0.16.6/pkg/util/wait/wait.go:152 +0x5f
k8s.io/apimachinery/pkg/util/wait.JitterUntil()
/go/pkg/mod/k8s.io/apimachinery@v0.16.6/pkg/util/wait/wait.go:153 +0xf8
k8s.io/apimachinery/pkg/util/wait.Until()
/go/pkg/mod/k8s.io/apimachinery@v0.16.6/pkg/util/wait/wait.go:88 +0x4d
created by github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/controller/sparkapplication.(*Controller).Start
/workspace/pkg/controller/sparkapplication/controller.go:166 +0xfe

Reading the stack bottom-up gives the submission flow: a controller worker loop (runWorker → processNextItem) takes a SparkApplication off the work queue, syncSparkApplication reconciles its state, submitSparkApplication translates the CRD spec into spark-submit arguments, and runSparkSubmit finally shells out to spark-submit, which lands us back in the native flow from section 2.1.

One more feature is worth attention. The SparkApplication yaml defines plenty of k8s-level specs such as env, volumeMounts, podAntiAffinity, tolerations, and so on, which the native spark-submit path cannot express (particularly in Spark 2.x). To support them, the spark operator also implements a Mutating Admission Webhook, a form of Kubernetes Dynamic Admission Control [7]: it intercepts Kubernetes API requests and calls back to a designated http endpoint, and inside that callback the mutating webhook may freely modify the resource object being processed. This is how the custom settings reach the Driver and Executor Pods.

To implement this webhook, the helm installation exposes the operator behind a service and registers a MutatingWebhookConfiguration pointing at its /webhook path. Matching k8s resource requests are then called back to that endpoint, where the operator patches the objects as configured.

3 Driver Startup

The previous section established the yaml that starts the Driver Pod; so what happens once the pod is up? Recall that when a Docker image runs, its ENTRYPOINT or CMD command becomes the container's main process. The Spark image's ENTRYPOINT is /opt/entrypoint.sh, and in driver mode it essentially forwards the container args to /bin/spark-submit, launching another SparkSubmit process, this time in client mode.

## ps -ef inside the Driver Pod shows the Java process was started with
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0/jre/bin/java -cp /opt/spark/jars/* -Xmx4g org.apache.spark.deploy.SparkSubmit --deploy-mode client --conf spark.driver.bindAddress=xx.xx.xx.xx --properties-file /opt/spark/conf/spark.properties --class org.apache.spark.examples.JavaWordCount local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.5.jar oss://{wordcount-file-oss-bucket}/

The SparkSubmit startup path is the same as analyzed above, except that this submission is in client mode, so it no longer routes into org.apache.spark.deploy.k8s.submit.KubernetesClientApplication; instead it directly invokes the main method of the job class given after --class. In our example that means executing org.apache.spark.examples.JavaWordCount.
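
How "directly" works can be seen in the JavaMainApplication frame that shows up in the next section's stack trace: it reflectively invokes the user's static main. A lightly trimmed sketch of it from SparkApplication.scala (Spark 2.4.5), where SparkApplication is the trait SparkSubmit dispatches to:

// Lightly trimmed from org.apache.spark.deploy.SparkApplication.scala (Spark 2.4.5)
import java.lang.reflect.Modifier
import org.apache.spark.SparkConf

private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // Look up the user's static main(String[]) -- JavaWordCount.main in our example
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    // Propagate the resolved Spark config through system properties
    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) => sys.props(k) = v }

    // From here on, the driver process is just the user program running
    mainMethod.invoke(null, args)
  }
}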

4 Executor Startup

Executors are created while the SparkContext initializes. Sticking with the call-stack approach, here is where the Driver triggers Executor creation:

"main" #1 prio=5 os_prio=0 tid=0x00007f9258051800 nid=0x376 in Object.wait() [0x00007f925e88f000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000000d5b23770> (a org.apache.spark.scheduler.TaskSchedulerImpl)
at org.apache.spark.scheduler.TaskSchedulerImpl.waitBackendReady(TaskSchedulerImpl.scala:821)
- locked <0x00000000d5b23770> (a org.apache.spark.scheduler.TaskSchedulerImpl)
at org.apache.spark.scheduler.TaskSchedulerImpl.postStartHook(TaskSchedulerImpl.scala:196)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:562)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2537)
- locked <0x00000000d5950988> (a java.lang.Object)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:959)
at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:950)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:950)
- locked <0x00000000d59509e8> (a org.apache.spark.sql.SparkSession$)
- locked <0x00000000d5950a08> (a org.apache.spark.sql.SparkSession$Builder)
at org.apache.spark.examples.JavaWordCount.main(JavaWordCount.java:43)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:856)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:931)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:940)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
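
The main thread above parks in waitBackendReady because the executors come up asynchronously. In k8s mode that background work is done by org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator, which compares the executor pods the apiserver reports against the requested total and creates the difference. A condensed sketch (Spark 2.4.5; signatures are approximate, and the two small helpers at the bottom are invented for brevity):

// Condensed from org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator (Spark 2.4.5);
// batching, timeouts, and snapshot bookkeeping are omitted, signatures approximate.
private[spark] class ExecutorPodsAllocator(
    conf: SparkConf,
    executorBuilder: KubernetesExecutorBuilder,   // assembles executor pod specs
    kubernetesClient: KubernetesClient,
    snapshotsStore: ExecutorPodsSnapshotsStore) { // pod state observed via watch/poll

  private val totalExpectedExecutors = new java.util.concurrent.atomic.AtomicInteger(0)

  def start(applicationId: String): Unit =
    // React to every snapshot of executor-pod state coming from the apiserver
    snapshotsStore.addSubscriber { snapshots => onNewSnapshots(applicationId, snapshots) }

  // KubernetesClusterSchedulerBackend.start() sets this to spark.executor.instances
  def setTotalExpectedExecutors(total: Int): Unit = totalExpectedExecutors.set(total)

  private def onNewSnapshots(applicationId: String,
      snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
    val knownExecutors = countPendingAndRunning(snapshots)   // illustrative helper
    val toCreate = totalExpectedExecutors.get() - knownExecutors
    for (_ <- 0 until toCreate) {
      // Build a pod like the yaml below, with an ownerReference pointing at the
      // driver pod, and create it through the apiserver
      val executorPod = executorBuilder.buildFromFeatures(newExecutorConf(applicationId))
      kubernetesClient.pods().create(executorPod.pod)       // illustrative helper above
    }
  }
}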

Now let's look at the Executor Pod's yaml, along with the JVM process parameters it starts with:

apiVersion: v1
kind: Pod
metadata:
  labels:
    spark-app-selector: spark-2480ba0e4b544209bafa0ddd553700f3
    spark-exec-id: '1'
    spark-role: executor
  name: javawordcount-1629452540505-exec-1
  namespace: c-18eecfc5d78446c7
  ownerReferences:
  - apiVersion: v1
    controller: true
    kind: Pod
    name: spark-word-count-driver
spec:
  containers:
  - args:
    - executor
    env:
    - name: SPARK_DRIVER_URL
      value: >-
        spark://CoarseGrainedScheduler@spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc:7078
    - name: SPARK_EXECUTOR_CORES
      value: '1'
    - name: SPARK_EXECUTOR_MEMORY
      value: 8g
    - name: SPARK_APPLICATION_ID
      value: spark-2480ba0e4b544209bafa0ddd553700f3
    - name: SPARK_EXECUTOR_ID
      value: '1'
    - name: SPARK_EXECUTOR_POD_IP
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    image: '{spark image address}'
    imagePullPolicy: IfNotPresent
    name: executor
    ports:
    - containerPort: 7079
      name: blockmanager
      protocol: TCP
    resources:
      limits:
        cpu: '1'
        memory: 9Gi
      requests:
        cpu: '1'
        memory: 9Gi
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: default-token-bvgh7
      readOnly: true
  serviceAccount: default
  serviceAccountName: default
  volumes:
  - name: default-token-bvgh7
    secret:
      defaultMode: 420
      secretName: default-token-bvgh7
$ ps -ef
UID PID PPID C STIME TTY STAT TIME CMD
root 15 1 53 17:51 ? Sl 0:07 /bin/java -Xms8g -Xmx8g -cp :/opt/spark/jars/* org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc:7078 --executor-id 2 --cores 1 --app-id spark-d79090c58b514cc2b7112ea319983b5c --hostname 10.100.3.67

The entrypoint script shows that in executor mode the main class launched is org.apache.spark.executor.CoarseGrainedExecutorBackend (the same as in Standalone/YARN mode). Its startup logic runs roughly as follows.
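
Below is a simplified sketch of that startup logic from CoarseGrainedExecutorBackend.scala (Spark 2.4.5); argument parsing and error handling are dropped, and fetchDriverConf stands in for a few lines of real RPC code:

// Simplified from org.apache.spark.executor.CoarseGrainedExecutorBackend (Spark 2.4.5).
// main() parses --driver-url, --executor-id, --hostname, --cores and --app-id
// (exactly the flags visible in the ps output above) and hands them to run().
private[spark] object CoarseGrainedExecutorBackend {

  private def run(driverUrl: String, executorId: String, hostname: String,
      cores: Int, appId: String): Unit = {

    // 1. Open a temporary RPC connection to the driver (port 7078 in this example)
    //    and fetch the application's SparkConf from it
    val driverConf = fetchDriverConf(driverUrl)   // illustrative helper

    // 2. Create the executor-side SparkEnv: RpcEnv, block manager (port 7079), ...
    val env = SparkEnv.createExecutorEnv(
      driverConf, executorId, hostname, cores, ioEncryptionKey = None, isLocal = false)

    // 3. Register the "Executor" endpoint. Its onStart() sends a RegisterExecutor
    //    message to the driver; on the RegisteredExecutor reply it instantiates the
    //    Executor object that actually runs tasks
    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
      env.rpcEnv, driverUrl, executorId, hostname, cores, Nil, env))
    env.rpcEnv.awaitTermination()
  }
}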

To see the Executor's network activity directly, we run netstat inside the Executor Pod while the job is executing. In this example, 7078 is the Driver's rpc port and 7079 is the Driver's blockManager port; the Executor's own server had no port configured and was assigned 39311.

$ netstat -at -W
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address Foreign Address State
tcp6 0 0 javawordcount-1629538339101-exec-1:39311 [::]:* LISTEN
tcp6 0 0 javawordcount-1629538339101-exec-1:51370 10-100-3-70.spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc.cluster.local:7078 TIME_WAIT
tcp6 0 0 javawordcount-1629538339101-exec-1:47044 10-100-3-70.spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc.cluster.local:7079 ESTABLISHED
tcp6 0 0 javawordcount-1629538339101-exec-1:51374 10-100-3-70.spark-word-count-279b297b62f07ab8-driver-svc.c-18eecfc5d78446c7.svc.cluster.local:7078 ESTABLISHED

5 Job Exit

When a Spark job finishes and the Driver process exits normally, the Driver Pod's status is set to Succeeded; on an abnormal exit (exitCode > 0) it becomes Failed. In either state the Driver Pod is not actually deleted, so its status and logs stay visible while no cluster resources are held. Executors, by contrast, are actively deleted by the Driver when the SparkContext shuts down, so to keep Executor logs around without deleting the pods you must set spark.kubernetes.executor.deleteOnTermination=false (e.g., via --conf at submit time). Auxiliary resources such as the Driver's service and configmap were bound to the Driver Pod with an ownerReference at creation time, and are cleaned up only when the Driver Pod itself is finally deleted.

To terminate a job that is still running, one option is to delete the Driver Pod directly through k8s; the other is spark-submit --kill (supported only from Spark 3). Deleting the Driver Pod reclaims everything the job created, since all of the job's resources, including the Executor Pods, carry an ownerReference to it.

References

[1] Running Spark on Kubernetes - Spark 3.1.2 Documentation

[2] GoogleCloudPlatform/spark-on-k8s-operator: Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes.

[3] Using Watch with the Kubernetes API | Baeldung

[4] Custom Resources | Kubernetes

[5] Operator pattern | Kubernetes

[6] Bitnami Engineering: A deep dive into Kubernetes controllers

[7] Dynamic Admission Control | Kubernetes