DataprocCreateClusterOperator doesn't read softwareConfig properties as it should
Apache Airflow Provider(s)
Versions of Apache Airflow Providers
apache-airflow-providers-google==10.14.0
Apache Airflow version
2.6.3
Operating System
PRETTY_NAME="Ubuntu 22.04.3 LTS" NAME="Ubuntu" VERSION_ID="22.04" VERSION="22.04.3 LTS (Jammy Jellyfish)" VERSION_CODENAME=jammy ID=ubuntu ID_LIKE=debian HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="https://help.ubuntu.com/" BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/" PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy" UBUNTU_CODENAME=jammy
Deployment
Google Cloud Composer
Deployment details
No response
What happened
DataprocCreateClusterOperator works fine as long as you do not specify software_config.properties in cluster_config when creating a Dataproc cluster. But because cluster_config is a template_field, template rendering converts numeric-looking strings into numbers, so
"properties": {
"capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent": "0.5",
"capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair",
...
"core:fs.gs.metadata.cache.enable": "false",
}
becomes
"properties": {
"capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent": 0.5,
"capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair",
...
"core:fs.gs.metadata.cache.enable": "false",
}
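For context, here is a minimal sketch of one way this conversion can happen (this is my assumption about the mechanism, not something confirmed above): when a DAG is created with render_template_as_native_obj=True, Airflow renders templated strings through Jinja's NativeEnvironment, which literal-evaluates numeric-looking strings back into Python numbers while leaving other strings untouched.
# Minimal sketch (assumption): Jinja native rendering, which Airflow uses when
# render_template_as_native_obj=True, turns "0.5" into the float 0.5.
from jinja2.nativetypes import NativeEnvironment

env = NativeEnvironment()

value = env.from_string("0.5").render()
print(type(value), value)   # <class 'float'> 0.5  -> no longer a string

value = env.from_string("fair").render()
print(type(value), value)   # <class 'str'> fair   -> unchanged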
Here is what I get when the DataprocCreateClusterOperator gets executed:
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 744, in execute
operation = self._create_cluster(hook)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 655, in _create_cluster
return hook.create_cluster(
^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 482, in inner_wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 327, in create_cluster
result = client.create_cluster(
^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/google/cloud/dataproc_v1/services/cluster_controller/client.py", line 606, in create_cluster
request = clusters.CreateClusterRequest(request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/message.py", line 581, in __init__
pb_value = marshal.to_proto(pb_type, value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/marshal.py", line 228, in to_proto
pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/rules/message.py", line 41, in to_proto
return self._wrapper(value)._pb
^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/message.py", line 581, in __init__
pb_value = marshal.to_proto(pb_type, value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/marshal.py", line 228, in to_proto
pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/rules/message.py", line 41, in to_proto
return self._wrapper(value)._pb
^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/message.py", line 581, in __init__
pb_value = marshal.to_proto(pb_type, value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/marshal.py", line 228, in to_proto
pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/rules/message.py", line 41, in to_proto
return self._wrapper(value)._pb
^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/proto/message.py", line 615, in __init__
super().__setattr__("_pb", self._meta.pb(**params))
^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen _collections_abc>", line 949, in update
TypeError: bad argument type for built-in operation
This fails because properties is meant to be a MutableMapping[str, str] according to the documentation, so the non-string values break the protobuf marshalling.
I dirty-patched it with my own custom operator that drops cluster_config from the templated fields:
template_fields = tuple(field for field in DataprocCreateClusterOperator.template_fields if field != "cluster_config")
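For completeness, the full workaround looks roughly like this (the subclass name is mine for illustration; only the template_fields line comes from my actual patch):
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

class DataprocCreateClusterOperatorUntemplatedConfig(DataprocCreateClusterOperator):
    # Drop cluster_config from the templated fields so Jinja never renders it
    # and the string values in software_config.properties pass through untouched.
    template_fields = tuple(
        field
        for field in DataprocCreateClusterOperator.template_fields
        if field != "cluster_config"
    )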
What you think should happen instead
Strings in properties should remain strings and not be converted into numerical values.
How to reproduce
Create a DataprocCreateClusterOperator and use a cluster_config such as the one below (an illustrative instantiation sketch follows the config):
{
"gce_cluster_config": {
"zone_uri": "https://www.googleapis.com/compute/v1/projects/dedge-stg-datafusion/zones/europe-west1-d",
"subnetwork_uri": "https://www.googleapis.com/compute/v1/projects/dedge-stg-shared-backbone/regions/europe-west1/subnetworks/shared-default-subnet",
"internal_ip_only": True,
"service_account_scopes": [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/cloud.useraccounts.readonly",
"https://www.googleapis.com/auth/devstorage.read_write",
"https://www.googleapis.com/auth/logging.write",
],
"metadata": {
"VmDnsSetting": "ZonalPreferred",
},
"shielded_instance_config": {
"enable_secure_boot": False,
"enable_vtpm": False,
"enable_integrity_monitoring": False,
}
},
"master_config": {
"num_instances": 1,
"image_uri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-0-deb10-20240306-155100-rc01",
"machine_type_uri": "https://www.googleapis.com/compute/v1/projects/dedge-stg-datafusion/zones/europe-west1-d/machineTypes/e2-custom-2-8192",
"disk_config": {
"boot_disk_type": "pd-standard",
"boot_disk_size_gb": 100
},
"preemptibility": "NON_PREEMPTIBLE",
"min_cpu_platform": "AUTOMATIC",
},
"worker_config": {
"num_instances": 2,
"image_uri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-0-deb10-20240306-155100-rc01",
"machine_type_uri": "https://www.googleapis.com/compute/v1/projects/dedge-stg-datafusion/zones/europe-west1-d/machineTypes/e2-custom-2-8192",
"disk_config": {
"boot_disk_type": "pd-standard",
"boot_disk_size_gb": 200
},
"preemptibility": "NON_PREEMPTIBLE",
"min_cpu_platform": "AUTOMATIC"
},
"software_config": {
"image_version": "2.0.95-debian10",
"properties": {
"capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent": "0.5",
"capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair",
"core:fs.gs.block.size": "134217728",
"core:fs.gs.metadata.cache.enable": "false",
"core:hadoop.ssl.enabled.protocols": "TLSv1,TLSv1.1,TLSv1.2",
"dataproc:dataproc.allow.zero.workers": "true",
"dataproc:dataproc.conscrypt.provider.enable": "false",
"dataproc:dataproc.logging.stackdriver.enable": "false",
"dataproc:dataproc.monitoring.stackdriver.enable": "false",
"distcp:mapreduce.map.java.opts": "-Xmx576m",
"distcp:mapreduce.map.memory.mb": "768",
"distcp:mapreduce.reduce.java.opts": "-Xmx576m",
"distcp:mapreduce.reduce.memory.mb": "768",
"hadoop-env:HADOOP_DATANODE_OPTS": "-Xmx512m",
"hdfs:dfs.datanode.address": "0.0.0.0:9866",
"hdfs:dfs.datanode.http.address": "0.0.0.0:9864",
"hdfs:dfs.datanode.https.address": "0.0.0.0:9865",
"hdfs:dfs.datanode.ipc.address": "0.0.0.0:9867",
"hdfs:dfs.namenode.handler.count": "20",
"hdfs:dfs.namenode.http-address": "0.0.0.0:9870",
"hdfs:dfs.namenode.https-address": "0.0.0.0:9871",
"hdfs:dfs.namenode.secondary.http-address": "0.0.0.0:9868",
"hdfs:dfs.namenode.secondary.https-address": "0.0.0.0:9869",
"hdfs:dfs.namenode.service.handler.count": "10",
"hive:hive.fetch.task.conversion": "none",
"mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE": "2048",
"mapred:mapreduce.job.maps": "9",
"mapred:mapreduce.job.reduce.slowstart.completedmaps": "0.95",
"mapred:mapreduce.job.reduces": "3",
"mapred:mapreduce.jobhistory.recovery.store.class": "org.apache.hadoop.mapreduce.v2.hs.HistoryServerLeveldbStateStoreService",
"mapred:mapreduce.map.cpu.vcores": "1",
"mapred:mapreduce.map.java.opts": "-Xmx2621m",
"mapred:mapreduce.map.memory.mb": "3277",
"mapred:mapreduce.reduce.cpu.vcores": "1",
"mapred:mapreduce.reduce.java.opts": "-Xmx2621m",
"mapred:mapreduce.reduce.memory.mb": "3277",
"mapred:mapreduce.task.io.sort.mb": "256",
"mapred:yarn.app.mapreduce.am.command-opts": "-Xmx2621m",
"mapred:yarn.app.mapreduce.am.resource.cpu-vcores": "1",
"mapred:yarn.app.mapreduce.am.resource.mb": "3277",
"spark-env:SPARK_DAEMON_MEMORY": "2048m",
"spark:spark.default.parallelism": "32",
"spark:spark.driver.maxResultSize": "1024m",
"spark:spark.driver.memory": "2048m",
"spark:spark.executor.cores": "1",
"spark:spark.executor.instances": "2",
"spark:spark.executor.memory": "2893m",
"spark:spark.executorEnv.OPENBLAS_NUM_THREADS": "1",
"spark:spark.metrics.namespace": "spark",
"spark:spark.scheduler.mode": "FAIR",
"spark:spark.sql.adaptive.coalescePartitions.initialPartitionNum": "128",
"spark:spark.sql.cbo.enabled": "true",
"spark:spark.ui.port": "0",
"spark:spark.yarn.am.memory": "640m",
"yarn-env:YARN_NODEMANAGER_HEAPSIZE": "819",
"yarn-env:YARN_RESOURCEMANAGER_HEAPSIZE": "2048",
"yarn-env:YARN_TIMELINESERVER_HEAPSIZE": "2048",
"yarn:yarn.nodemanager.address": "0.0.0.0:8026",
"yarn:yarn.nodemanager.delete.debug-delay-sec": "86400",
"yarn:yarn.nodemanager.pmem-check-enabled": "false",
"yarn:yarn.nodemanager.resource.cpu-vcores": "2",
"yarn:yarn.nodemanager.resource.memory-mb": "6554",
"yarn:yarn.nodemanager.vmem-check-enabled": "false",
"yarn:yarn.resourcemanager.decommissioning-nodes-watcher.decommission-if-no-shuffle-data": "true",
"yarn:yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs": "86400",
"yarn:yarn.scheduler.maximum-allocation-mb": "6554",
"yarn:yarn.scheduler.minimum-allocation-mb": "1"
}
}
}
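An illustrative instantiation, assuming the dict above is bound to a variable named CLUSTER_CONFIG (the project, region and cluster name below are placeholders, not values from my setup):
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

create_cluster = DataprocCreateClusterOperator(
    task_id="create_dataproc_cluster",
    project_id="my-project",        # placeholder
    region="europe-west1",          # placeholder
    cluster_name="repro-cluster",   # placeholder
    cluster_config=CLUSTER_CONFIG,  # the dict shown above
)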
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
Hi @Tchopane, I tried to reproduce your issue, both in Airflow Composer and locally from a dev environment, with the following config, into which I copied your software_config values.
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
},
"secondary_worker_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {
"boot_disk_type": "pd-standard",
"boot_disk_size_gb": 32,
},
"is_preemptible": True,
"preemptibility": "PREEMPTIBLE",
},
"software_config": {
"image_version": "2.0.95-debian10",
"properties": {
"capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent": "0.5",
"capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair",
"core:fs.gs.block.size": "134217728",
"core:fs.gs.metadata.cache.enable": "false",
"core:hadoop.ssl.enabled.protocols": "TLSv1,TLSv1.1,TLSv1.2",
"dataproc:dataproc.allow.zero.workers": "true",
"dataproc:dataproc.conscrypt.provider.enable": "false",
"dataproc:dataproc.logging.stackdriver.enable": "false",
"dataproc:dataproc.monitoring.stackdriver.enable": "false",
"distcp:mapreduce.map.java.opts": "-Xmx576m",
"distcp:mapreduce.map.memory.mb": "768",
"distcp:mapreduce.reduce.java.opts": "-Xmx576m",
"distcp:mapreduce.reduce.memory.mb": "768",
"hadoop-env:HADOOP_DATANODE_OPTS": "-Xmx512m",
"hdfs:dfs.datanode.address": "0.0.0.0:9866",
"hdfs:dfs.datanode.http.address": "0.0.0.0:9864",
"hdfs:dfs.datanode.https.address": "0.0.0.0:9865",
"hdfs:dfs.datanode.ipc.address": "0.0.0.0:9867",
"hdfs:dfs.namenode.handler.count": "20",
"hdfs:dfs.namenode.http-address": "0.0.0.0:9870",
"hdfs:dfs.namenode.https-address": "0.0.0.0:9871",
"hdfs:dfs.namenode.secondary.http-address": "0.0.0.0:9868",
"hdfs:dfs.namenode.secondary.https-address": "0.0.0.0:9869",
"hdfs:dfs.namenode.service.handler.count": "10",
"hive:hive.fetch.task.conversion": "none",
"mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE": "2048",
"mapred:mapreduce.job.maps": "9",
"mapred:mapreduce.job.reduce.slowstart.completedmaps": "0.95",
"mapred:mapreduce.job.reduces": "3",
"mapred:mapreduce.jobhistory.recovery.store.class": "org.apache.hadoop.mapreduce.v2.hs.HistoryServerLeveldbStateStoreService",
"mapred:mapreduce.map.cpu.vcores": "1",
"mapred:mapreduce.map.java.opts": "-Xmx2621m",
"mapred:mapreduce.map.memory.mb": "3277",
"mapred:mapreduce.reduce.cpu.vcores": "1",
"mapred:mapreduce.reduce.java.opts": "-Xmx2621m",
"mapred:mapreduce.reduce.memory.mb": "3277",
"mapred:mapreduce.task.io.sort.mb": "256",
"mapred:yarn.app.mapreduce.am.command-opts": "-Xmx2621m",
"mapred:yarn.app.mapreduce.am.resource.cpu-vcores": "1",
"mapred:yarn.app.mapreduce.am.resource.mb": "3277",
"spark-env:SPARK_DAEMON_MEMORY": "2048m",
"spark:spark.default.parallelism": "32",
"spark:spark.driver.maxResultSize": "1024m",
"spark:spark.driver.memory": "2048m",
"spark:spark.executor.cores": "1",
"spark:spark.executor.instances": "2",
"spark:spark.executor.memory": "2893m",
"spark:spark.executorEnv.OPENBLAS_NUM_THREADS": "1",
"spark:spark.metrics.namespace": "spark",
"spark:spark.scheduler.mode": "FAIR",
"spark:spark.sql.adaptive.coalescePartitions.initialPartitionNum": "128",
"spark:spark.sql.cbo.enabled": "true",
"spark:spark.ui.port": "0",
"spark:spark.yarn.am.memory": "640m",
"yarn-env:YARN_NODEMANAGER_HEAPSIZE": "819",
"yarn-env:YARN_RESOURCEMANAGER_HEAPSIZE": "2048",
"yarn-env:YARN_TIMELINESERVER_HEAPSIZE": "2048",
"yarn:yarn.nodemanager.address": "0.0.0.0:8026",
"yarn:yarn.nodemanager.delete.debug-delay-sec": "86400",
"yarn:yarn.nodemanager.pmem-check-enabled": "false",
"yarn:yarn.nodemanager.resource.cpu-vcores": "2",
"yarn:yarn.nodemanager.resource.memory-mb": "6554",
"yarn:yarn.nodemanager.vmem-check-enabled": "false",
"yarn:yarn.resourcemanager.decommissioning-nodes-watcher.decommission-if-no-shuffle-data": "true",
"yarn:yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs": "86400",
"yarn:yarn.scheduler.maximum-allocation-mb": "6554",
"yarn:yarn.scheduler.minimum-allocation-mb": "1"
}
}
}
I am not getting any error when creating the Dataproc cluster. Also, looking at the underlying code logic of the operator, I do not see a reason why the string values would be converted to numbers.
Please let me know if I have correctly understood the issue.
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.