Error: One of the request inputs is not valid. on mv or cp function
What happened:
Error occurred on move.
Checked the data lake and the files did copy to new location successfully but were not removed from previous location.
---------------------------------------------------------------------------
HttpResponseError Traceback (most recent call last)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/aio/_blob_client_async.py in start_copy_from_url(self, source_url, metadata, incremental_copy, **kwargs)
1184 return await self._client.page_blob.copy_incremental(**options)
-> 1185 return await self._client.blob.start_copy_from_url(**options)
1186 except HttpResponseError as error:
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/_generated/aio/operations/_blob_operations.py in start_copy_from_url(self, copy_source, timeout, metadata, tier, rehydrate_priority, request_id_parameter, blob_tags_string, seal_blob, source_modified_access_conditions, modified_access_conditions, lease_access_conditions, **kwargs)
2258 error = self._deserialize(_models.StorageError, response)
-> 2259 raise HttpResponseError(response=response, model=error)
2260
HttpResponseError: Operation returned an invalid status 'One of the request inputs is not valid.'
During handling of the above exception, another exception occurred:
HttpResponseError Traceback (most recent call last)
<command-2840456655888337> in <module>
10 fs.mkdirs(f'{container_name}/RAW/Runs/{job_run_id}', exist_ok=True)
11 # fs.mv(files, f'{container_name}/RAW/Runs/{job_run_id}/*', recursive=True)
---> 12 await fs.cp(f'{container_name}/RAW/Config/', f'{container_name}/RAW/Runs/{job_run_id}/', recursive=True)
13 print(fs.ls(f'{container_name}/RAW/Config/'))
14 print(fs.ls(f'{container_name}/RAW/Runs/{job_run_id}/'))
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/spec.py in cp(self, path1, path2, **kwargs)
1163 def cp(self, path1, path2, **kwargs):
1164 """Alias of :ref:`FilesystemSpec.copy`."""
-> 1165 return self.copy(path1, path2, **kwargs)
1166
1167 def move(self, path1, path2, **kwargs):
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
86 def wrapper(*args, **kwargs):
87 self = obj or args[0]
---> 88 return sync(self.loop, func, *args, **kwargs)
89
90 return wrapper
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
67 raise FSTimeoutError
68 if isinstance(result[0], BaseException):
---> 69 raise result[0]
70 return result[0]
71
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/asyn.py in _runner(event, coro, result, timeout)
23 coro = asyncio.wait_for(coro, timeout=timeout)
24 try:
---> 25 result[0] = await coro
26 except Exception as ex:
27 result[0] = ex
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/fsspec/asyn.py in _copy(self, path1, path2, recursive, on_error, maxdepth, **kwargs)
295 if on_error == "ignore" and isinstance(ex, FileNotFoundError):
296 continue
--> 297 raise ex
298
299 async def _pipe(self, path, value=None, **kwargs):
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/adlfs/spec.py in _cp_file(self, path1, path2, **kwargs)
1548 cc2 = self.service_client.get_container_client(container2)
1549 blobclient2 = cc2.get_blob_client(blob=path2)
-> 1550 await blobclient2.start_copy_from_url(blobclient1.url)
1551 self.invalidate_cache(container1)
1552 self.invalidate_cache(container2)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/core/tracing/decorator_async.py in wrapper_use_tracer(*args, **kwargs)
72 span_impl_type = settings.tracing_implementation()
73 if span_impl_type is None:
---> 74 return await func(*args, **kwargs)
75
76 # Merge span is parameter is set, but only if no explicit parent are passed
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/aio/_blob_client_async.py in start_copy_from_url(self, source_url, metadata, incremental_copy, **kwargs)
1185 return await self._client.blob.start_copy_from_url(**options)
1186 except HttpResponseError as error:
-> 1187 process_storage_error(error)
1188
1189 @distributed_trace_async
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/_shared/response_handlers.py in process_storage_error(storage_error)
148 error.error_code = error_code
149 error.additional_info = additional_data
--> 150 error.raise_with_traceback()
151
152
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/core/exceptions.py in raise_with_traceback(self)
245 def raise_with_traceback(self):
246 try:
--> 247 raise super(AzureError, self).with_traceback(self.exc_traceback)
248 except AttributeError:
249 self.__traceback__ = self.exc_traceback
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/aio/_blob_client_async.py in start_copy_from_url(self, source_url, metadata, incremental_copy, **kwargs)
1183 if incremental_copy:
1184 return await self._client.page_blob.copy_incremental(**options)
-> 1185 return await self._client.blob.start_copy_from_url(**options)
1186 except HttpResponseError as error:
1187 process_storage_error(error)
/local_disk0/.ephemeral_nfs/envs/pythonEnv-a76d3706-ed54-4a01-9bb0-e59d474c9690/lib/python3.8/site-packages/azure/storage/blob/_generated/aio/operations/_blob_operations.py in start_copy_from_url(self, copy_source, timeout, metadata, tier, rehydrate_priority, request_id_parameter, blob_tags_string, seal_blob, source_modified_access_conditions, modified_access_conditions, lease_access_conditions, **kwargs)
2257 map_error(status_code=response.status_code, response=response, error_map=error_map)
2258 error = self._deserialize(_models.StorageError, response)
-> 2259 raise HttpResponseError(response=response, model=error)
2260
2261 response_headers = {}
HttpResponseError: One of the request inputs is not valid.
RequestId:c8d8f1d3-601e-0006-1bab-79e110000000
Time:2021-07-15T19:01:25.5107678Z
ErrorCode:InvalidInput
Error:None
What you expected to happen:
The files within that directory should be moved to the new directory and then removed from the old directory.
Minimal Complete Verifiable Example:
fs = adlfs.AzureBlobFileSystem(
account_name = storage_account_name,
account_key = storage_account_access_key
)
container_name = "container1"
files = fs.ls(f'{container_name}/RAW/Config/')
fs.mkdirs(f'{container_name}/RAW/Runs/{job_run_id}', exist_ok=True)
fs.mv(f'{container_name}/RAW/Config/', f'{container_name}/RAW/Runs/{job_run_id}/', recursive=True)
Anything else we need to know?:
Environment: Databricks, adlfs==2021.7.0
- Python version: 3.8.8
- Operating System: Linux 5.4.0-1051-azure
- Install method (conda, pip, source): pip
I'm facing the same issue with adlfs 2021.10.0. It seems to be caused by Azure Blob Storage not supporting copying of directories (even if they are empty).
This is what seems to be happening:
fs.mv() calls fs.copy(), which in turn calls fs.expand_path() to get the list of files to copy. When recursive=True, this returns all the files under the path. The last line of fs.expand_path() sorts the list of paths before returning it:
return list(sorted(out))
https://github.com/intake/filesystem_spec/blob/0cc4518fd9141c8894b022aad16acdc5049cc671/fsspec/spec.py#L907
So given this set of files and directories:
container/
folder/
sub1/
file1.txt
file2.txt
sub2/
file3.txt
and we call fs.mv('container/folder', 'container/folder2', recursive=True), then fs.expand_path() returns:
[
'container/folder',
'container/folder/sub1',
'container/folder/sub1/file1.txt',
'container/folder/sub1/file2.txt',
'container/folder/sub2',
'container/folder/sub2/file3.txt'
]
fs.copy() then iterates through this list to perform the copy operation:
for p1, p2 in zip(paths, path2):
try:
self.cp_file(p1, p2, **kwargs)
except FileNotFoundError:
if on_error == "raise":
raise
https://github.com/intake/filesystem_spec/blob/0cc4518fd9141c8894b022aad16acdc5049cc671/fsspec/spec.py#L871-L876
The problem is that Azure Blob Storage does not support copying of directories. If we attempt the copy on each of the individual paths listed above, the copy succeeds on the files (e.g. container/folder/sub1/file1.txt) but fails on the directories.
@mattc-eostar @aloysius-lim
Whenever you create a Storage Account on Microsoft Azure, you have three kinds of Azure Storage Account available. I encountered the same issue when I was writing to kind = "StorageV2". This kind does not support move and copy methods for moving data from temporary storage. I created another Storage Account with kind = "BlobStorage" and tried the same operation, this underlying StorageKind worked fine for me and supported move and copy methods.