Permission Denied when writing to HDFS
I am trying out a basic distributed "Hello World" job using Dask on a YARN cluster. I read some data from HDFS, map some columns, and then write the result to a different HDFS folder. However, the last step does not work; I receive the following error:
Traceback (most recent call last):
File "./mgmt_score_dist.py", line 52, in <module>
main()
File "./mgmt_score_dist.py", line 47, in main
'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist_flat.parquet',
File "./mgmt_score_dist.py", line 33, in mgmt_score_dist
source_df.to_csv(output, header=False, sep='\t')
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/core.py", line 1091, in to_csv
return to_csv(self, filename, **kwargs)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 577, in to_csv
delayed(values).compute(get=get, scheduler=scheduler)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 156, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 402, in compute
results = schedule(dsk, keys, **kwargs)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 2159, in get
direct=direct)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1562, in gather
asynchronous=asynchronous)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 652, in sync
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 275, in sync
six.reraise(*error[0])
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 260, in f
result[0] = yield make_coro()
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1099, in run
value = future.result()
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/concurrent.py", line 260, in result
raise_exc_info(self._exc_info)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1439, in _gather
traceback)
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 438, in _to_csv_chunk
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/bytes/core.py", line 166, in __enter__
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/bytes/hdfs3.py", line 24, in open
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/hdfs3/core.py", line 237, in open
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/hdfs3/core.py", line 709, in __init__
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/hdfs3/core.py", line 719, in _set_handle
IOError: Could not open file: /user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist.tsv/0.part, mode: wb Permission denied: user=yarn, access=WRITE, inode="/user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist.tsv/0.part":yuriyd:yuriyd:drwxr-xr-x
I have googled for this error, and, apparently, it occurs when I try to write to HDFS as 'yarn' user and not as my own user. I haven't found anything in the documentation or in the source code about setting the user. I tried initializing DaskYARNCluster with user='yuriyd', but I still got the same error.
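To make the error message above easier to read, here is a minimal sketch that parses the "Permission denied" detail and checks it against the usual owner/group/other permission classes. The helper name `explain_denial` and the `user_groups` parameter are hypothetical, and the sketch assumes the `yarn` user is not a member of the `yuriyd` group; it only interprets the text of the message and does not talk to HDFS.

```python
import re

# Pattern for the detail HDFS appends to a permission error (format taken from
# the message in the traceback above).
_DENIED = re.compile(
    r'user=(?P<user>[^,]+), access=(?P<access>\w+), '
    r'inode="(?P<path>[^"]+)":(?P<owner>[^:]+):(?P<group>[^:]+):'
    r'(?P<mode>[d-][rwx-]{9})'
)

_FLAGS = {"READ": "r", "WRITE": "w", "EXECUTE": "x"}


def explain_denial(message, user_groups=()):
    """Parse an HDFS permission error and report which permission class applied.

    `user_groups` is an assumption supplied by the caller: the groups the
    requesting user belongs to (unknown from the message itself).
    """
    match = _DENIED.search(message)
    if match is None:
        raise ValueError("not an HDFS permission-denied message")
    info = match.groupdict()
    mode = info["mode"][1:]  # drop the leading file-type character
    if info["user"] == info["owner"]:
        cls, bits = "owner", mode[0:3]
    elif info["group"] in user_groups:
        cls, bits = "group", mode[3:6]
    else:
        cls, bits = "other", mode[6:9]
    # Access strings such as READ_EXECUTE name several flags at once.
    needed = [_FLAGS[part] for part in info["access"].split("_")]
    info["class"] = cls
    info["bits"] = bits
    info["allowed"] = all(flag in bits for flag in needed)
    return info


msg = ('Permission denied: user=yarn, access=WRITE, '
       'inode="/user/yuriyd/daily_staging/2018-06-20-spark/group/dask/'
       'mgmt_score_dist.tsv/0.part":yuriyd:yuriyd:drwxr-xr-x')
result = explain_denial(msg)
print(result["user"], result["class"], result["bits"], result["allowed"])
```

Under those assumptions, the request is evaluated as user `yarn` against the "other" bits `r-x` of an inode owned by `yuriyd`, which carry no write permission. That matches the reading that the write is attempted as `yarn` rather than as the submitting user.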
Any assistance or advice will be greatly appreciated.
Here is my code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import dask.dataframe as dd
from dask.distributed import Client
from dask_yarn import DaskYARNCluster
from json import loads


def load_respondable_sources(input_sources):
    source_df = dd.read_table("%s/*" % input_sources, usecols=[0, 1], header=None, names=['source_id', 'data'],
                              converters={'data': loads})
    source_df['source_respondable'] = source_df['data'].map(lambda x: x[1])
    source_df['response_min_date'] = source_df['data'].map(lambda x: x[2])
    # source_df['response_max_date'] = source_df['data'].map(lambda x: x[3])
    # drop() returns a new dataframe, so the result has to be assigned back
    source_df = source_df.drop('data', axis=1)
    return source_df.loc[source_df['source_respondable']]


def mgmt_score_dist(
        input_mgmt_response,  # type: str
        input_reviews,  # type: str
        input_sources,  # type: str
        output,  # type: str
        flat_output,  # type: str
):
    cluster = DaskYARNCluster(env='mgmt_score_dist_dask.zip')
    client = Client(cluster)
    cluster.start(2, cpus=2, memory=1024, envvars={'KNIT_LANG': 'C.UTF-8', 'DASK_CONFIG': '/tmp'})
    source_df = load_respondable_sources(input_sources)
    source_df.to_csv(output, header=False, sep='\t')
    # TODO
    client.close()
    cluster.close()


def main():
    mgmt_score_dist(
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dump/mgmt_response.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dump/dump_reviews.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dump/source.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist_flat.parquet',
    )


if __name__ == '__main__':
    main()
UPDATE: I have temporarily granted every user write permissions (hadoop fs -chmod -R 777 ...), but now I get a different error:
Traceback (most recent call last):
File "./mgmt_score_dist.py", line 53, in <module>
main()
File "./mgmt_score_dist.py", line 48, in main
'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist_flat.parquet',
File "./mgmt_score_dist.py", line 34, in mgmt_score_dist
source_df.to_csv(output, header=False, sep='\t')
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/core.py", line 1091, in to_csv
return to_csv(self, filename, **kwargs)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 577, in to_csv
delayed(values).compute(get=get, scheduler=scheduler)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 156, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 402, in compute
results = schedule(dsk, keys, **kwargs)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 2159, in get
direct=direct)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1562, in gather
asynchronous=asynchronous)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 652, in sync
return sync(self.loop, func, *args, **kwargs)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 275, in sync
six.reraise(*error[0])
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 260, in f
result[0] = yield make_coro()
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1099, in run
value = future.result()
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/concurrent.py", line 260, in result
raise_exc_info(self._exc_info)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1439, in _gather
traceback)
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 439, in _to_csv_chunk
# finally, compare results against first row and "vote"
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/pandas/core/frame.py", line 1745, in to_csv
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/pandas/io/formats/csvs.py", line 161, in save
File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/bytes/utils.py", line 136, in __getattr__
AttributeError: 'HDFile' object has no attribute 'getvalue'
@jcrist , do containers created by skein have credentials to write to HDFS as the initiating user?
Yes. The delegation token for the default filesystem is provided in each container, and picked up automatically by libhdfs (the backend for pyarrow's hdfs reader), and maybe libhdfs3 (haven't tested). Currently skein doesn't handle delegation token renewal, so this will stop working after it expires, but that should only matter for long running jobs (> 1 day).
Thanks @jcrist . @yuriy-davygora , would you like to try using skein ?
@martindurant
The only reason I tried it with knit was that I wanted to have a quick and easy test, but if it does not work, then I'll try skein.
One more question, though: when I set the permission 777 for everything, I received another error (see the update to my original question). I was using hdfs3 at the time. I also tried pyarrow, but it complained about not being able to load libhdfs, and I could not get it to run with libhdfs3. Is this again a knit-related issue, or is this something else?
Oops, actually, upon reading your error message, it looks like you're using simple authentication instead of Kerberos. In that case, no, skein has the same bug under simple authentication. I've been putting off fixing it in favor of other things, but I'll track that down today. Should be a simple fix.
> I also tried pyarrow, but it complained about not being able to load libhdfs
Pyarrow should work fine on any system, but you may need to set some environment variables, see: https://arrow.apache.org/docs/python/filesystems.html#hadoop-file-system-hdfs
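As a rough sketch of the environment setup the linked page describes, the helper below collects the variables that pyarrow's libhdfs driver is documented to consult (`HADOOP_HOME`, `JAVA_HOME`, `CLASSPATH`, and optionally `ARROW_LIBHDFS_DIR`). The function name `libhdfs_env` and every path shown are hypothetical placeholders; the right values depend on the cluster.

```python
def libhdfs_env(hadoop_home, java_home, classpath, libhdfs_dir=None):
    """Return the environment variables pyarrow's libhdfs driver looks for."""
    env = {
        "HADOOP_HOME": hadoop_home,
        "JAVA_HOME": java_home,
        # Typically the output of `hadoop classpath --glob`.
        "CLASSPATH": classpath,
    }
    if libhdfs_dir is not None:
        # Only needed when libhdfs.so is not in the default location
        # under $HADOOP_HOME.
        env["ARROW_LIBHDFS_DIR"] = libhdfs_dir
    return env


example = libhdfs_env(
    hadoop_home="/opt/hadoop",            # hypothetical path
    java_home="/usr/lib/jvm/default",     # hypothetical path
    classpath="/opt/hadoop/etc/hadoop:/opt/hadoop/share/hadoop/common/*",  # hypothetical
)
print(sorted(example))
```

A dictionary like this could be merged into `os.environ` before pyarrow is imported, or passed to the workers through whatever mechanism the cluster launcher offers for environment variables, so that the containers see the same settings as the client.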
@TomAugspurger , did something change in pandas to_csv? Calling .getvalue() seems to mean it's assuming the file-like object is some kind of BytesIO. This looks like an hdfs3 issue, but I think it has also shown up in one of the other file-system backends.
What version of pandas? 0.23.0 introduced a couple of bugs when using compression: https://github.com/pandas-dev/pandas/issues/21144 and https://github.com/pandas-dev/pandas/issues/17778, and I think the fixes for those introduced another one, whose fix is being included in 0.23.2.
Ah, correct, this seems to be in the ZIP branch (https://github.com/pandas-dev/pandas/blob/master/pandas/io/formats/csvs.py#L176), but the dask file object also handles compression internally. Not sure what to do about that. @yuriy-davygora , you could try passing compression= to to_csv and trying various parameters.
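The failure mode discussed here can be reproduced in miniature without pandas or HDFS. The sketch below is a simplified illustration, not pandas' actual code: `WriteOnlyFile` and `save_assuming_buffer` are hypothetical names standing in for a remote file handle and for writer code that assumes an in-memory buffer.

```python
import io


class WriteOnlyFile(object):
    """Stand-in for a remote file handle: accepts writes, keeps no buffer."""

    def __init__(self):
        self.bytes_written = 0

    def write(self, data):
        self.bytes_written += len(data)
        return len(data)


def save_assuming_buffer(fileobj, payload):
    """Write payload, then read it back with getvalue(), as code written
    with BytesIO in mind might do."""
    fileobj.write(payload)
    return fileobj.getvalue()


# Works for an in-memory buffer, which does provide getvalue().
ok = save_assuming_buffer(io.BytesIO(), b"a\tb\n")

# Fails for a handle that only supports writing.
try:
    save_assuming_buffer(WriteOnlyFile(), b"a\tb\n")
    error = None
except AttributeError as exc:
    error = str(exc)

print(ok, error)
```

The second call raises an `AttributeError` that names `getvalue`, the same shape as the `'HDFile' object has no attribute 'getvalue'` error in the update to the question.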
@TomAugspurger: conda installed 0.23.1 automatically; I did not specify the pandas version explicitly. I will try 0.23.2 tomorrow.
@jcrist Thank you for your answer, I will give pyarrow another go tomorrow.
FYI 0.23.2 isn't released yet. Probably a few days.
@yuriy-davygora, dask-yarn (https://dask-yarn.readthedocs.io/en/latest/) has been released and now uses Skein (https://jcrist.github.io/skein/index.html), a more robust library for python/yarn interaction. The above permissions issue has been addressed there.
As for the to_csv issue, I'm not sure what needs to be done here. This has nothing to do with hadoop stuff necessarily, and is more a bug(?) in dask's bytes handling/to_csv functions. @TomAugspurger, @martindurant any ideas on what if anything needs to be done here?
I haven't looked closely at the error.
Pandas 0.23.2 will be out shortly (next few days) and will hopefully fix this class of bugs.