
Issue when using SparkCluster: workers are unable to communicate with each other when running the example jar

jeynesrya opened this issue 6 years ago • 14 comments

Description:

I am using this Spark operator in OpenShift to create a Spark cluster. The cluster is created successfully and I can see all the workers connected to the master via the Spark UI. However, when I exec onto the master and run an example jar, the executors crash after 4 seconds with an UnknownHostException.

Steps:

  1. Deployed the Spark Operator (version 1.0.2) onto OpenShift, using the openshift-spark:v2.4.0 Docker image from quay.io
  2. Created a cluster using the following yaml:
apiVersion: radanalytics.io/v1
kind: SparkCluster
metadata:
  name: spark-cluster
  namespace: testing
spec:
  master:
    instances: "1"
  worker:
    instances: "4"
  env:
  - name: SPARK_WORKER_MEMORY
    value: 16g
  - name: SPARK_WORKER_CORES
    value: "8"
  3. Cluster is created and workers have connected to the master, as shown in the Spark UI
  4. Exec onto the master: oc exec <master> -ti -- /bin/bash
  5. Run the examples jar: spark-submit --deploy-mode cluster --master spark://<ip-of-master>:7077 --class org.apache.spark.examples.SparkPi --executor-memory 512M /opt/spark/examples/jars/spark-examples_2.11-2.4.0.jar 30
  6. Driver and executors are created
  7. The executors crash after 4 seconds with errors like:
...
Caused by: java.io.IOException: Failed to connect to spark-cluster-w-xxxxx:42123
...
Caused by: java.net.UnknownHostException: spark-cluster-w-xxxxx
...

Happy to give any more information if needed.

jeynesrya avatar Oct 03 '19 18:10 jeynesrya

thanks for opening this issue @jeynesrya

i have a feeling this is happening because of ambiguity in how the workers connect back to the master. i don't think we have tested this style of execution. usually we create driver applications that connect to the master and then perform the work that way.

there might be a way to make this work by using the service associated with the master pod as the driver host. i will try to replicate this issue to see if there might be a fix.

elmiko avatar Oct 03 '19 18:10 elmiko

@elmiko are there any updates on this?

jeynesrya avatar Oct 13 '19 19:10 jeynesrya

not yet @jeynesrya, i am going to try replicating your issue to see if i can find a resolution for this.

my best recommendation in the meantime is to use an external driver process if possible. that is to say, don't shell into the master pod and attempt to run drivers from there. i think that is a use case we haven't explored much.

elmiko avatar Oct 14 '19 13:10 elmiko

@elmiko so I have a little bit of an update on this. I did as you suggested and attempted to run drivers externally, and it seems I get the same error. The pods can ping each other through their IP addresses, but there are no services (e.g. for spark-cluster-w-4hbej7) to communicate with. Hopefully this makes sense; I can explain further if needed.
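
For reference, one way to confirm which services actually exist in the namespace is sketched below. The spark-cluster service name is assumed from the manifest above, while the spark-cluster-w-* names are worker pod names which, as noted, have no matching services:

oc get svc -n testing            # operator-created services (e.g. spark-cluster); no per-worker entries
oc get pods -n testing -o wide   # the worker pod names and IPs that show up in the UnknownHostException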

jeynesrya avatar Oct 18 '19 22:10 jeynesrya

@jeynesrya hmm, sounds very odd indeed.

~could you share the manifest you are using to spawn the cluster, also how are you creating the driver to connect?~ sorry, forgot you posted it earlier XD

i will try to mirror what you are doing to see if i can reproduce the error.

elmiko avatar Oct 21 '19 12:10 elmiko

hey @jeynesrya , so i have done some testing and i do see the type of error you are seeing. i am not sure if it is the configuration of your driver or the cluster, but i have put together a little demo that can show you how to get this working. you will have to extrapolate how these techniques can be used with your specific use case though.

so, here's what i did.

i took this tutorial[0] and made an image that produces a sparkpi service; you can use this image to test on your end. then i deployed a cluster like yours into my openshift, deployed the image connected to the cluster, and curl'd the sparkpi service to confirm it works.

here's how to reproduce

  1. oc project testing
  2. deploy spark cluster with your manifest from above
  3. oc apply -f issue-252-deployment.yaml (included below)
  4. oc expose svc/sparkpi-service
  5. curl http://`oc get routes/sparkpi-service --template='{{.spec.host}}'`/sparkpi

the last curl command should exercise the spark cluster and return an estimate for pi. i think the magic is all wrapped up in the services, how they are attached to the driver, and how the DRIVER_HOST is specified to the spark-submit command. i'm happy to answer any questions about this, i hope it works on your end ;)

issue-252-deployment.yaml


kind: List
apiVersion: v1
metadata: {}

items:
- apiVersion: v1
  kind: DeploymentConfig
  metadata:
    name: sparkpi-service
    labels:
      deploymentConfig: sparkpi-service
      app: sparkpi-service
  spec:
    replicas: 1
    selector:
      deploymentConfig: sparkpi-service
    strategy:
      type: Rolling
    template:
      metadata:
        labels:
          deploymentConfig: sparkpi-service
          app: sparkpi-service
      spec:
        containers:
        - env:
          - name: DRIVER_HOST
            value: sparkpi-service-headless
          - name: APP_EXIT
            value: "true"
          - name: SPARK_URL_OVERRIDE
            value: "spark://spark-cluster:7077"
          - name: POD_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          image: quay.io/elmiko/sparkpi-service
          imagePullPolicy: Always
          name: sparkpi-service
          resources: {}
          terminationMessagePath: /dev/termination-log
          volumeMounts:
          - mountPath: /etc/podinfo
            name: podinfo
            readOnly: false
        dnsPolicy: ClusterFirst
        restartPolicy: Always
        volumes:
        - downwardAPI:
            items:
            - fieldRef:
                fieldPath: metadata.labels
              path: labels
          name: podinfo
    triggers:
    - type: ConfigChange
- apiVersion: v1
  kind: Service
  metadata:
    name: sparkpi-service
    labels:
      app: sparkpi-service
  spec:
    ports:
    - name: 8080-tcp
      port: 8080
      protocol: TCP
      targetPort: 8080
    selector:
      deploymentConfig: sparkpi-service
- apiVersion: v1
  kind: Service
  metadata:
    name: sparkpi-service-headless
    labels:
      app: sparkpi-service
  spec:
    clusterIP: None
    ports:
    - name: driver-rpc-port
      port: 7078
      protocol: TCP
      targetPort: 7078
    - name: blockmanager
      port: 7079
      protocol: TCP
      targetPort: 7079
    selector:
      deploymentConfig: sparkpi-service

[0] https://radanalytics.io/my-first-radanalytics-app.html
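
As a rough illustration of how the DRIVER_HOST and SPARK_URL_OVERRIDE values above could be wired into Spark, here is a hypothetical entrypoint sketch. It is an assumption for illustration only, not the actual entrypoint of quay.io/elmiko/sparkpi-service, and the fixed ports are chosen to match the sparkpi-service-headless ports in the manifest:

# hypothetical entrypoint sketch; /opt/app/sparkpi.py is a placeholder for the real application
exec spark-submit \
  --master "${SPARK_URL_OVERRIDE}" \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.driver.host="${DRIVER_HOST}" \
  --conf spark.driver.port=7078 \
  --conf spark.driver.blockManager.port=7079 \
  /opt/app/sparkpi.py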

elmiko avatar Oct 21 '19 14:10 elmiko

Hi @elmiko, I am struggling with the same issue. I have followed the same approach as above:

  1. oc project testing
  2. deploy spark cluster with your manifest from above
  3. oc apply -f issue-252-deployment.yaml (from above)
  4. oc expose svc/sparkpi-service
  5. curl http://`oc get routes/sparkpi-service --template='{{.spec.host}}'`/sparkpi

And I have copied my custom jar into the master pod at /tmp/xxx/, and tried the following command.

spark-submit \
--master spark://<IP>:7077 \
--deploy-mode cluster \
--packages "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5" \
--class JavaConsumer /tmp/xxx/JavaConsumer-1.0-SNAPSHOT-jar-with-dependencies.jar \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.namespace=testing \
local:///tmp/xxx/JavaConsumer-1.0-SNAPSHOT-jar-with-dependencies.jar

It is still failing with:

Caused by: java.io.IOException: Failed to connect to spark-cluster-m-p8b7v:36303
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
    at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
    at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: spark-cluster-m-p8b7v

Can you please help.

anilkumarpanditi avatar Aug 16 '21 11:08 anilkumarpanditi

i have not looked at this code in a few years, but from what you posted i would wonder about this

--master spark://:7077

i think you will need a name in there for the spark cluster (eg --master spark://my-master:7077)

elmiko avatar Aug 16 '21 13:08 elmiko

@elmiko correct, sorry, I made it blank here only; it is --master spark://Master-IP:7077. Could you help? I am literally stuck here and have not been able to move forward for 3 days.

spark-submit \
--master spark://Master-IP:7077 \
--deploy-mode client \
--packages "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5" \
--class JavaConsumer /tmp/xx/JavaConsumer-1.0-SNAPSHOT-jar-with-dependencies.jar \
--conf spark.executor.instances=2 \
--conf spark.kubernetes.namespace=testing \
local:///tmp/xx/JavaConsumer-1.0-SNAPSHOT-jar-with-dependencies.jar

I came across one solution here: https://stackoverflow.com/questions/57205463/error-in-workers-when-running-spark-on-kubernetes

with the following conf parameters: --conf spark.driver.bindAddress=0.0.0.0 --conf spark.driver.host=$MY_NODE_NAME --conf spark.driver.port=value --conf spark.driver.blockManager.port=value

Not sure what values need to be given for these params, based on the issue-252-deployment.yaml above.

anilkumarpanditi avatar Aug 16 '21 15:08 anilkumarpanditi

as i mentioned earlier, i don't really use this code base anymore so my knowledge is limited. but, with that said, you will need to figure out why the workers are not connecting to spark-cluster-m-p8b7v. usually this means that the workers are trying to connect directly to the master pod, but they should be connecting to a service that is associated with the master. you might need to use the name of the spark service you exposed as the master FQDN.

this is ultimately a problem where the workers think they should be talking to the master on spark-cluster-m-p8b7v, but this is not allowing connections.

as for the stackoverflow solution: --conf spark.driver.bindAddress=0.0.0.0 is fine

--conf spark.driver.host=$MY_NODE_NAME should be set to the service that is exposed on the driver application (the business logic part of the spark app)

--conf spark.driver.port=value --conf spark.driver.blockManager.port=value are usually selected randomly, so you would need to set them explicitly on the driver and in the cluster

you can see some examples of how i have used these values in the past in this blog post about python3 on radanalytics. hope this helps =)
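
To make that concrete, a minimal sketch using the headless service from issue-252-deployment.yaml above, assuming the driver process runs inside the pod selected by sparkpi-service-headless (whose ports are 7078 and 7079):

--conf spark.driver.bindAddress=0.0.0.0
--conf spark.driver.host=sparkpi-service-headless
--conf spark.driver.port=7078
--conf spark.driver.blockManager.port=7079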

elmiko avatar Aug 16 '21 18:08 elmiko

Hi @elmiko, thanks for keeping the radanalyticsio community vibrant and supportive. Regarding your suggestion to use a (k8s/oc) svc over the pod name/IP, I have a query. Let me quote from your response:

usually this means that the workers are trying to connect directly to the master pod, but they should be connecting to a service that is associated with the master.

Why are workers unable to connect directly to the master pod? As per the docs, we can access pods using <pod-name>.<namespace>.pod.cluster.local, so exposing the master pod via a service should be optional.

singh-abhijeet avatar Feb 08 '22 06:02 singh-abhijeet

@singh-abhijeet correct, k8s creates these domain names for each pod, where all the dots in the IP are replaced with dashes + the suffix you've mentioned. The issue with that is that relying on pod IPs isn't a good idea in kubernetes because they are not stable, but Spark was designed way before k8s so there is some discrepancy. Those names don't have to be that long, because /etc/resolv.conf contains the suffix as a search domain, so 1-2-3-4.$ns.pod would do.

I was able to get the original issue/question working on a lightweight k3d cluster by:

$ hostname -i
10.42.0.24
spark-submit --master spark://spark-cluster:7077 --conf spark.driver.host=10-42-0-24.default.pod.cluster.local --class org.apache.spark.examples.SparkPi --executor-memory 512M /opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar 30
....
3.14
full log
bash-4.2$ hostname -i
10.42.0.24
bash-4.2$
bash-4.2$ spark-submit --master spark://spark-cluster:7077 --conf spark.driver.host=10-42-0-24.default.pod.cluster.local --class org.apache.spark.examples.SparkPi --executor-memory 512M /opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar 30
22/02/08 11:06:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/08 11:06:03 INFO SparkContext: Running Spark version 2.4.5
22/02/08 11:06:03 INFO SparkContext: Submitted application: Spark Pi
22/02/08 11:06:03 INFO SecurityManager: Changing view acls to: 185
22/02/08 11:06:03 INFO SecurityManager: Changing modify acls to: 185
22/02/08 11:06:03 INFO SecurityManager: Changing view acls groups to:
22/02/08 11:06:03 INFO SecurityManager: Changing modify acls groups to:
22/02/08 11:06:03 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(185); groups with view permissions: Set(); users  with modify permissions: Set(185); groups with modify permissions: Set()
22/02/08 11:06:04 INFO Utils: Successfully started service 'sparkDriver' on port 39607.
22/02/08 11:06:04 INFO SparkEnv: Registering MapOutputTracker
22/02/08 11:06:04 INFO SparkEnv: Registering BlockManagerMaster
22/02/08 11:06:04 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
22/02/08 11:06:04 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
22/02/08 11:06:04 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-92143cfc-2f72-49b0-b2a7-f76b49f42c1e
22/02/08 11:06:04 INFO MemoryStore: MemoryStore started with capacity 413.9 MB
22/02/08 11:06:04 INFO SparkEnv: Registering OutputCommitCoordinator
22/02/08 11:06:05 INFO Utils: Successfully started service 'SparkUI' on port 4040.
22/02/08 11:06:05 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10-42-0-24.default.pod.cluster.local:4040
22/02/08 11:06:05 INFO SparkContext: Added JAR file:/opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar at spark://10-42-0-24.default.pod.cluster.local:39607/jars/spark-examples_2.11-2.4.5.jar with timestamp 1644318365200
22/02/08 11:06:05 INFO StandaloneAppClient$ClientEndpoint: Connecting to master spark://spark-cluster:7077...
22/02/08 11:06:05 INFO TransportClientFactory: Successfully created connection to spark-cluster/10.43.218.239:7077 after 79 ms (0 ms spent in bootstraps)
22/02/08 11:06:05 INFO StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20220208110605-0005
22/02/08 11:06:05 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20220208110605-0005/0 on worker-20220208105128-10.42.0.25-35277 (10.42.0.25:35277) with 1 core(s)
22/02/08 11:06:05 INFO StandaloneSchedulerBackend: Granted executor ID app-20220208110605-0005/0 on hostPort 10.42.0.25:35277 with 1 core(s), 512.0 MB RAM
22/02/08 11:06:05 INFO StandaloneAppClient$ClientEndpoint: Executor added: app-20220208110605-0005/1 on worker-20220208105128-10.42.0.26-34819 (10.42.0.26:34819) with 1 core(s)
22/02/08 11:06:05 INFO StandaloneSchedulerBackend: Granted executor ID app-20220208110605-0005/1 on hostPort 10.42.0.26:34819 with 1 core(s), 512.0 MB RAM
22/02/08 11:06:05 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 32829.
22/02/08 11:06:05 INFO NettyBlockTransferService: Server created on 10-42-0-24.default.pod.cluster.local:32829
22/02/08 11:06:05 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
22/02/08 11:06:05 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220208110605-0005/0 is now RUNNING
22/02/08 11:06:05 INFO StandaloneAppClient$ClientEndpoint: Executor updated: app-20220208110605-0005/1 is now RUNNING
22/02/08 11:06:05 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10-42-0-24.default.pod.cluster.local, 32829, None)
22/02/08 11:06:05 INFO BlockManagerMasterEndpoint: Registering block manager 10-42-0-24.default.pod.cluster.local:32829 with 413.9 MB RAM, BlockManagerId(driver, 10-42-0-24.default.pod.cluster.local, 32829, None)
22/02/08 11:06:05 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10-42-0-24.default.pod.cluster.local, 32829, None)
22/02/08 11:06:05 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10-42-0-24.default.pod.cluster.local, 32829, None)
22/02/08 11:06:06 INFO StandaloneSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
22/02/08 11:06:07 INFO SparkContext: Starting job: reduce at SparkPi.scala:38
22/02/08 11:06:07 INFO DAGScheduler: Got job 0 (reduce at SparkPi.scala:38) with 30 output partitions
22/02/08 11:06:07 INFO DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:38)
22/02/08 11:06:07 INFO DAGScheduler: Parents of final stage: List()
22/02/08 11:06:07 INFO DAGScheduler: Missing parents: List()
22/02/08 11:06:07 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no missing parents
22/02/08 11:06:07 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.0 KB, free 413.9 MB)
22/02/08 11:06:07 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1381.0 B, free 413.9 MB)
22/02/08 11:06:07 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10-42-0-24.default.pod.cluster.local:32829 (size: 1381.0 B, free: 413.9 MB)
22/02/08 11:06:07 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1163
22/02/08 11:06:07 INFO DAGScheduler: Submitting 30 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
22/02/08 11:06:07 INFO TaskSchedulerImpl: Adding task set 0.0 with 30 tasks
22/02/08 11:06:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.42.0.25:41764) with ID 0
22/02/08 11:06:10 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.42.0.25, executor 0, partition 0, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:10 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Registered executor NettyRpcEndpointRef(spark-client://Executor) (10.42.0.26:53944) with ID 1
22/02/08 11:06:10 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.42.0.26, executor 1, partition 1, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.42.0.25:35629 with 117.0 MB RAM, BlockManagerId(0, 10.42.0.25, 35629, None)
22/02/08 11:06:10 INFO BlockManagerMasterEndpoint: Registering block manager 10.42.0.26:36881 with 117.0 MB RAM, BlockManagerId(1, 10.42.0.26, 36881, None)
22/02/08 11:06:11 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.42.0.25:35629 (size: 1381.0 B, free: 117.0 MB)
22/02/08 11:06:11 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.42.0.26:36881 (size: 1381.0 B, free: 117.0 MB)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 10.42.0.26, executor 1, partition 2, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, 10.42.0.25, executor 0, partition 3, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1219 ms on 10.42.0.25 (executor 0) (1/30)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 1160 ms on 10.42.0.26 (executor 1) (2/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 4.0 in stage 0.0 (TID 4, 10.42.0.26, executor 1, partition 4, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 49 ms on 10.42.0.26 (executor 1) (3/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 5.0 in stage 0.0 (TID 5, 10.42.0.25, executor 0, partition 5, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 110 ms on 10.42.0.25 (executor 0) (4/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 6.0 in stage 0.0 (TID 6, 10.42.0.26, executor 1, partition 6, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 82 ms on 10.42.0.26 (executor 1) (5/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 7.0 in stage 0.0 (TID 7, 10.42.0.25, executor 0, partition 7, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 5.0 in stage 0.0 (TID 5) in 90 ms on 10.42.0.25 (executor 0) (6/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 8.0 in stage 0.0 (TID 8, 10.42.0.26, executor 1, partition 8, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 6.0 in stage 0.0 (TID 6) in 34 ms on 10.42.0.26 (executor 1) (7/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 9.0 in stage 0.0 (TID 9, 10.42.0.25, executor 0, partition 9, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 7.0 in stage 0.0 (TID 7) in 41 ms on 10.42.0.25 (executor 0) (8/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 10.0 in stage 0.0 (TID 10, 10.42.0.26, executor 1, partition 10, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 8.0 in stage 0.0 (TID 8) in 35 ms on 10.42.0.26 (executor 1) (9/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 11.0 in stage 0.0 (TID 11, 10.42.0.25, executor 0, partition 11, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 9.0 in stage 0.0 (TID 9) in 25 ms on 10.42.0.25 (executor 0) (10/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 12.0 in stage 0.0 (TID 12, 10.42.0.26, executor 1, partition 12, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 24 ms on 10.42.0.26 (executor 1) (11/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 13.0 in stage 0.0 (TID 13, 10.42.0.25, executor 0, partition 13, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 26 ms on 10.42.0.25 (executor 0) (12/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 14.0 in stage 0.0 (TID 14, 10.42.0.26, executor 1, partition 14, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 12.0 in stage 0.0 (TID 12) in 38 ms on 10.42.0.26 (executor 1) (13/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 15.0 in stage 0.0 (TID 15, 10.42.0.25, executor 0, partition 15, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 13.0 in stage 0.0 (TID 13) in 27 ms on 10.42.0.25 (executor 0) (14/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 16.0 in stage 0.0 (TID 16, 10.42.0.26, executor 1, partition 16, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 14.0 in stage 0.0 (TID 14) in 29 ms on 10.42.0.26 (executor 1) (15/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 17.0 in stage 0.0 (TID 17, 10.42.0.25, executor 0, partition 17, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 15.0 in stage 0.0 (TID 15) in 33 ms on 10.42.0.25 (executor 0) (16/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 18.0 in stage 0.0 (TID 18, 10.42.0.26, executor 1, partition 18, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 16.0 in stage 0.0 (TID 16) in 25 ms on 10.42.0.26 (executor 1) (17/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 19.0 in stage 0.0 (TID 19, 10.42.0.25, executor 0, partition 19, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 17.0 in stage 0.0 (TID 17) in 41 ms on 10.42.0.25 (executor 0) (18/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 20.0 in stage 0.0 (TID 20, 10.42.0.26, executor 1, partition 20, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 18.0 in stage 0.0 (TID 18) in 33 ms on 10.42.0.26 (executor 1) (19/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 21.0 in stage 0.0 (TID 21, 10.42.0.25, executor 0, partition 21, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 19.0 in stage 0.0 (TID 19) in 34 ms on 10.42.0.25 (executor 0) (20/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 22.0 in stage 0.0 (TID 22, 10.42.0.26, executor 1, partition 22, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 20.0 in stage 0.0 (TID 20) in 37 ms on 10.42.0.26 (executor 1) (21/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 23.0 in stage 0.0 (TID 23, 10.42.0.25, executor 0, partition 23, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 21.0 in stage 0.0 (TID 21) in 39 ms on 10.42.0.25 (executor 0) (22/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 24.0 in stage 0.0 (TID 24, 10.42.0.26, executor 1, partition 24, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 22.0 in stage 0.0 (TID 22) in 33 ms on 10.42.0.26 (executor 1) (23/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 25.0 in stage 0.0 (TID 25, 10.42.0.26, executor 1, partition 25, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 24.0 in stage 0.0 (TID 24) in 26 ms on 10.42.0.26 (executor 1) (24/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 26.0 in stage 0.0 (TID 26, 10.42.0.25, executor 0, partition 26, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 23.0 in stage 0.0 (TID 23) in 45 ms on 10.42.0.25 (executor 0) (25/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 27.0 in stage 0.0 (TID 27, 10.42.0.26, executor 1, partition 27, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 25.0 in stage 0.0 (TID 25) in 27 ms on 10.42.0.26 (executor 1) (26/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 28.0 in stage 0.0 (TID 28, 10.42.0.25, executor 0, partition 28, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 26.0 in stage 0.0 (TID 26) in 25 ms on 10.42.0.25 (executor 0) (27/30)
22/02/08 11:06:11 INFO TaskSetManager: Starting task 29.0 in stage 0.0 (TID 29, 10.42.0.26, executor 1, partition 29, PROCESS_LOCAL, 7870 bytes)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 27.0 in stage 0.0 (TID 27) in 23 ms on 10.42.0.26 (executor 1) (28/30)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 28.0 in stage 0.0 (TID 28) in 32 ms on 10.42.0.25 (executor 0) (29/30)
22/02/08 11:06:11 INFO TaskSetManager: Finished task 29.0 in stage 0.0 (TID 29) in 28 ms on 10.42.0.26 (executor 1) (30/30)
22/02/08 11:06:11 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
22/02/08 11:06:11 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 4.493 s
22/02/08 11:06:11 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 4.640001 s
Pi is roughly 3.141907713969238
22/02/08 11:06:11 INFO SparkUI: Stopped Spark web UI at http://10-42-0-24.default.pod.cluster.local:4040
22/02/08 11:06:11 INFO StandaloneSchedulerBackend: Shutting down all executors
22/02/08 11:06:11 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
22/02/08 11:06:11 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/02/08 11:06:12 INFO MemoryStore: MemoryStore cleared
22/02/08 11:06:12 INFO BlockManager: BlockManager stopped
22/02/08 11:06:12 INFO BlockManagerMaster: BlockManagerMaster stopped
22/02/08 11:06:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/02/08 11:06:12 INFO SparkContext: Successfully stopped SparkContext
22/02/08 11:06:12 INFO ShutdownHookManager: Shutdown hook called
22/02/08 11:06:12 INFO ShutdownHookManager: Deleting directory /tmp/spark-6c20d3ae-ee78-4018-bf61-1f4bd5aa614d
22/02/08 11:06:12 INFO ShutdownHookManager: Deleting directory /tmp/spark-03c55c0a-2f18-4188-bf81-89aaa1b64c98

so the spark.driver.host needs to be set to something reachable from the workers, and unfortunately the service for the master doesn't work for that; on the other hand it works for the URL that's used for submit (weirdly enough). If you want to stay on the safe side you can use something more general like this:

spark-submit --master spark://spark-cluster:7077 --conf spark.driver.host=$(hostname -i | tr '.' '-').$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).pod --class org.apache.spark.examples.SparkPi --executor-memory 512M /opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar 30

so full repro steps:

  1. k3d cluster create
  2. kubectl apply -f http://bit.ly/sparkop
  3. apply the SparkCluster manifest:

cat <<CLUSTER | kubectl apply -f -
apiVersion: radanalytics.io/v1
kind: SparkCluster
metadata:
  name: spark-cluster
spec:
  master:
     instances: "1"
     memory: "1Gi"
  worker:
     instances: "2"
     memory: "1Gi"
CLUSTER

  4. k exec $(k get pods -lradanalytics.io/podType=master -oname) -ti -- bash
  5. run the example job:

spark-submit --master spark://spark-cluster:7077 --conf spark.driver.host=$(hostname -i | tr '.' '-').$(cat /var/run/secrets/kubernetes.io/serviceaccount/namespace).pod --class org.apache.spark.examples.SparkPi --executor-memory 512M /opt/spark/examples/jars/spark-examples_2.11-2.4.5.jar 30

  6. ...
  7. profit :)

jkremser avatar Feb 08 '22 11:02 jkremser

thanks for adding some details Jirka :wave:

Why are workers unable to connect directly to the master pod? As per the docs, we can access pods using <pod-name>.<namespace>.pod.cluster.local, so exposing the master pod via a service should be optional.

this really depends on your configuration. in some cases the URL that is received by the spark workers does not match any address in the kube DNS, and sometimes it is an incorrect IP address. i have seen both happen. so, as per Jirka's example, you can make this work with the cluster DNS if you match all the values properly. but, my suggestion to use services for these things is to help make this pattern more repeatable in the future by using well known interface names.

there are several ways to make these connections work; you could even go so far as to get the raw IP addresses for the pods and plumb those values through, but this would be very brittle to maintain.

elmiko avatar Feb 08 '22 18:02 elmiko

@jkremser Yes, a Pod IP can change, and relying on a DNS name based on the IP defeats the purpose of DNS in the first place!

K8s also provides DNS based on the pod name; I was able to use <pod_name>.<deployment_name>.<namespace> as the driver.host. Since I'm using the SparkLauncher API for submitting Spark jobs, using Linux commands as part of the spark-submit args is not an option.

My working spark props (client mode):

spark.driver.host=spark-app-driver-pod.spark-deployment.default
spark.kubernetes.driver.pod.name=spark-app-driver-pod

singh-abhijeet avatar Feb 17 '22 06:02 singh-abhijeet