
"Invalid JSON given in the body of the request - expected a map" when using reset_job method

Open georgikoemdzhiev opened this issue 2 years ago • 2 comments

Hello,

I am trying to change an existing job's settings using the CLI, but when I invoke the reset_job method I get this error:

Traceback (most recent call last):
  File "/home/vsts/work/1/s/DataPlatform.DR/main.py", line 78, in <module>
    dr.experiment(host,token)
  File "/home/vsts/work/1/s/DataPlatform.DR/main.py", line 58, in experiment
    jobs.reset_job(job_json)
  File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/jobs/api.py", line 49, in reset_job
    return self.client.client.perform_query('POST', '/jobs/reset', data=json, headers=headers,
  File "/home/vsts/.local/lib/python3.10/site-packages/databricks_cli/sdk/api_client.py", line 174, in perform_query
    raise requests.exceptions.HTTPError(message, response=e.response)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://spg-sustainable1-qa.cloud.databricks.com/api/2.0/jobs/reset
 Response from server: 
 { 'error_code': 'MALFORMED_REQUEST',
  'message': 'Invalid JSON given in the body of the request - expected a map'}

Here is the sample Python code I am using:

import json

from databricks_cli.jobs.api import JobsApi
from databricks_cli.sdk.api_client import ApiClient

...
api_client = ApiClient(host=databricks_host, token=databricks_token)
jobs = JobsApi(api_client)

job_list = jobs.list_jobs()["jobs"]

job_name = "DP DataSync Job"
result_list = [job for job in job_list if job["settings"]["name"] == job_name]

job = result_list[0]
job_id = job["job_id"]
job["settings"]["schedule"]["pause_status"] = "UNPAUSED"

print(f"Resetting job with id: {job_id}")

job_json = json.dumps(job)

jobs.reset_job(job_json)

Here is the JSON that gets passed to reset_job:

{
	"job_id": 217841321277199,
	"creator_user_name": "...",
	"settings": {
		"name": "DP DataSync Job",
		"new_cluster": {
			"cluster_name": "",
			"spark_version": "10.4.x-scala2.12",
			"aws_attributes": {
				"first_on_demand": 1,
				"availability": "SPOT_WITH_FALLBACK",
				"zone_id": "us-east-1a",
				"spot_bid_price_percent": 100,
				"ebs_volume_count": 0
			},
			"node_type_id": "d3.4xlarge",
			"custom_tags": {
				"Owner": "[email protected]",
				"AppID": "appidhere",
				"Environment": ""
			},
			"spark_env_vars": {
				"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
			},
			"enable_elastic_disk": false,
			"runtime_engine": "STANDARD",
			"autoscale": {
				"min_workers": 2,
				"max_workers": 16
			}
		},
		"libraries": [
			{
				"jar": "DataSync-1.0-all.jar"
			}
		],
		"email_notifications": {
			"on_start": [
				"[email protected]"
			],
			"on_success": [
				"[email protected]"
			],
			"on_failure": [
				"[email protected]"
			],
			"no_alert_for_skipped_runs": false
		},
		"timeout_seconds": 0,
		"schedule": {
			"quartz_cron_expression": "35 0 21 * * ?",
			"timezone_id": "America/New_York",
			"pause_status": "UNPAUSED"
		},
		"spark_jar_task": {
			"jar_uri": "",
			"main_class_name": "com.company.s.dp.datasync",
			"parameters": [
				"Config.json"
			],
			"run_as_repl": true
		},
		"max_concurrent_runs": 1,
		"format": "SINGLE_TASK"
	},
	"created_time": 1678272261985
}
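One thing worth double-checking in the snippet above: `job_json = json.dumps(job)` hands `reset_job` a string. If the CLI client passes that value straight through as the request body and serializes it again (as `requests` does with its `json=` parameter), the server receives a JSON-encoded string instead of a JSON map, which is exactly what the error says. That the client re-serializes is an assumption about its internals, but the double-encoding effect itself is easy to demonstrate:

```python
import json

job = {"job_id": 217841321277199, "new_settings": {"name": "DP DataSync Job"}}

# What the server sees if the client JSON-encodes whatever it is given:
body_from_dict = json.dumps(job)              # dict -> '{"job_id": ...}'   (a map)
body_from_str  = json.dumps(json.dumps(job))  # str  -> '"{\\"job_id\\": ..."' (a string)

print(type(json.loads(body_from_dict)).__name__)  # dict -> the API sees a map
print(type(json.loads(body_from_str)).__name__)   # str  -> "expected a map"
```

If that assumption holds, passing the plain dict (`jobs.reset_job(job)`) rather than the `json.dumps` output would avoid the double encoding.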

Databricks CLI version: 17.4

georgikoemdzhiev avatar Mar 15 '23 15:03 georgikoemdzhiev

If you look at the API documentation, the JSON key for the settings in the /jobs/reset endpoint needs to be "new_settings", rather than "settings".

mroy-seedbox avatar Mar 15 '23 17:03 mroy-seedbox
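For reference, the payload shape that suggestion implies keeps only job_id at the top level and moves the settings under new_settings; read-only fields returned by list_jobs (such as creator_user_name and created_time) are not part of the reset request. A sketch with an abridged job object:

```python
# Abridged job object, as returned by jobs.list_jobs()["jobs"][i]
job = {
    "job_id": 217841321277199,
    "creator_user_name": "...",
    "created_time": 1678272261985,
    "settings": {
        "name": "DP DataSync Job",
        "schedule": {
            "quartz_cron_expression": "35 0 21 * * ?",
            "timezone_id": "America/New_York",
            "pause_status": "PAUSED",
        },
    },
}

settings = job["settings"]
settings["schedule"]["pause_status"] = "UNPAUSED"

# Body shape for POST /api/2.0/jobs/reset: job_id plus new_settings only;
# read-only fields from list_jobs are dropped.
reset_payload = {"job_id": job["job_id"], "new_settings": settings}
```

This is only the request shape described in the comment above, not a confirmed fix for the 400 error.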

Thank you for your suggestion. I changed my existing logic so that I am passing new_settings (see below), but I am getting the same error - 'Invalid JSON given in the body of the request - expected a map':

Here is the updated Python code:

import json

from databricks_cli.jobs.api import JobsApi
from databricks_cli.sdk.api_client import ApiClient

api_client = ApiClient(host=databricks_host, token=databricks_token)
jobs = JobsApi(api_client)

job_list = jobs.list_jobs()["jobs"]

job_name = "DP DataSync Job"
result_list = [job for job in job_list if job["settings"]["name"] == job_name]

job = result_list[0]
job_id = job["job_id"]
job["settings"]["schedule"]["pause_status"] = "UNPAUSED"

# change settings key to new_settings
job["new_settings"] = job["settings"]
del job["settings"]

job_json = json.dumps(job, indent=4)
print(job_json)

jobs.reset_job(job_json)

Here is the JSON I am passing to the reset_job method:

{
	"job_id": 217841321277199,
	"creator_user_name": "creator_name_here",
	"created_time": 1678272261985,
	"new_settings": {
		"name": "DP DataSync Job",
		"new_cluster": {
			"cluster_name": "",
			"spark_version": "10.4.x-scala2.12",
			"aws_attributes": {
				"first_on_demand": 1,
				"availability": "SPOT_WITH_FALLBACK",
				"zone_id": "us-east-1a",
				"spot_bid_price_percent": 100,
				"ebs_volume_count": 0
			},
			"node_type_id": "d3.4xlarge",
			"spark_env_vars": {
				"PYSPARK_PYTHON": "/databricks/python3/bin/python3"
			},
			"enable_elastic_disk": false,
			"runtime_engine": "STANDARD",
			"autoscale": {
				"min_workers": 2,
				"max_workers": 16
			}
		},
		"libraries": [
			{
				"jar": "DataSync-1.0-all.jar"
			}
		],
		"email_notifications": {
			"on_start": [
				"[email protected]"
			],
			"on_success": [
				"[email protected]"
			],
			"on_failure": [
				"[email protected]"
			],
			"no_alert_for_skipped_runs": false
		},
		"timeout_seconds": 0,
		"schedule": {
			"quartz_cron_expression": "35 0 21 * * ?",
			"timezone_id": "America/New_York",
			"pause_status": "UNPAUSED"
		},
		"spark_jar_task": {
			"jar_uri": "",
			"main_class_name": "com.company.dataplatform.datasync",
			"parameters": [
				"Config.json"
			],
			"run_as_repl": true
		},
		"max_concurrent_runs": 1,
		"format": "SINGLE_TASK"
	}
}

georgikoemdzhiev avatar Mar 15 '23 17:03 georgikoemdzhiev