Permission Denied when writing to HDFS

Open yuriy-davygora opened this issue 7 years ago • 12 comments

I am trying out a basic distributed "Hello World" job using Dask on a YARN cluster. I read some data from HDFS, map some columns, and then write the result to a different HDFS folder. However, the last step fails with the following error:

Traceback (most recent call last):
  File "./mgmt_score_dist.py", line 52, in <module>
    main()
  File "./mgmt_score_dist.py", line 47, in main
    'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist_flat.parquet',
  File "./mgmt_score_dist.py", line 33, in mgmt_score_dist
    source_df.to_csv(output, header=False, sep='\t')
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/core.py", line 1091, in to_csv
    return to_csv(self, filename, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 577, in to_csv
    delayed(values).compute(get=get, scheduler=scheduler)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 402, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 2159, in get
    direct=direct)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1562, in gather
    asynchronous=asynchronous)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 652, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 275, in sync
    six.reraise(*error[0])
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 260, in f
    result[0] = yield make_coro()
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/concurrent.py", line 260, in result
    raise_exc_info(self._exc_info)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1439, in _gather
    traceback)
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 438, in _to_csv_chunk
    
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/bytes/core.py", line 166, in __enter__
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/bytes/hdfs3.py", line 24, in open
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/hdfs3/core.py", line 237, in open
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/hdfs3/core.py", line 709, in __init__
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45487/container_1512662857247_45487_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/hdfs3/core.py", line 719, in _set_handle
IOError: Could not open file: /user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist.tsv/0.part, mode: wb Permission denied: user=yarn, access=WRITE, inode="/user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist.tsv/0.part":yuriyd:yuriyd:drwxr-xr-x

I have googled this error and, apparently, it occurs because the write to HDFS is performed as the 'yarn' user and not as my own user. I haven't found anything in the documentation or in the source code about setting the user. I tried initializing DaskYARNCluster with user='yuriyd', but I still got the same error.
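The last line of the traceback can be read as an ordinary owner/group/other permission check. The following is a minimal sketch of that check, not HDFS's actual implementation: it ignores ACLs and the superuser, the function name hdfs_can_write is hypothetical, and the group list given for 'yarn' is an assumption.

```python
def hdfs_can_write(user, user_groups, owner, group, mode):
    """Return True if `user` has write access under a POSIX-style mode string.

    `mode` is the 10-character string from the error message, e.g. 'drwxr-xr-x'.
    Simplified sketch: ACLs and superuser privileges are not considered.
    """
    perms = mode[1:]  # drop the leading file-type character ('d' or '-')
    if user == owner:
        triplet = perms[0:3]   # owner bits
    elif group in user_groups:
        triplet = perms[3:6]   # group bits
    else:
        triplet = perms[6:9]   # "other" bits
    return triplet[1] == 'w'


# Values taken from the error message; ['hadoop'] is an assumed group list.
print(hdfs_can_write('yarn', ['hadoop'], 'yuriyd', 'yuriyd', 'drwxr-xr-x'))
print(hdfs_can_write('yuriyd', ['yuriyd'], 'yuriyd', 'yuriyd', 'drwxr-xr-x'))
```

Under these assumptions 'yarn' is neither the owner nor in the owning group, so only the "other" bits (r-x) apply and the write is refused, while the same request made as 'yuriyd' would pass.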

Any assistance or advice will be greatly appreciated.

Here is my code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import dask.dataframe as dd
from dask.distributed import Client
from dask_yarn import DaskYARNCluster
from json import loads


def load_respondable_sources(input_sources):
    source_df = dd.read_table("%s/*" % input_sources, usecols=[0, 1], header=None, names=['source_id', 'data'],
                              converters={'data': loads})
    source_df['source_respondable'] = source_df['data'].map(lambda x: x[1])
    source_df['response_min_date'] = source_df['data'].map(lambda x: x[2])
    # source_df['response_max_date'] = source_df['data'].map(lambda x: x[3])
    # drop() returns a new dataframe, so the result has to be assigned
    source_df = source_df.drop('data', axis=1)
    return source_df.loc[source_df['source_respondable']]


def mgmt_score_dist(
        input_mgmt_response,  # type: str
        input_reviews,  # type: str
        input_sources,  # type: str
        output,  # type: str
        flat_output,  # type: str
):
    cluster = DaskYARNCluster(env='mgmt_score_dist_dask.zip')
    client = Client(cluster)

    cluster.start(2, cpus=2, memory=1024, envvars={'KNIT_LANG': 'C.UTF-8', 'DASK_CONFIG': '/tmp'})

    source_df = load_respondable_sources(input_sources)
    source_df.to_csv(output, header=False, sep='\t')

    # TODO

    client.close()
    cluster.close()


def main():
    mgmt_score_dist(
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dump/mgmt_response.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dump/dump_reviews.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dump/source.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist.tsv',
        'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist_flat.parquet',
    )


if __name__ == '__main__':
    main()

UPDATE: I have temporarily granted every user write permissions (hadoop fs -chmod -R 777 ...), but now I get a different error:

Traceback (most recent call last):
  File "./mgmt_score_dist.py", line 53, in <module>
    main()
  File "./mgmt_score_dist.py", line 48, in main
    'hdfs:///user/yuriyd/daily_staging/2018-06-20-spark/group/dask/mgmt_score_dist_flat.parquet',
  File "./mgmt_score_dist.py", line 34, in mgmt_score_dist
    source_df.to_csv(output, header=False, sep='\t')
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/core.py", line 1091, in to_csv
    return to_csv(self, filename, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 577, in to_csv
    delayed(values).compute(get=get, scheduler=scheduler)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 156, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/base.py", line 402, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 2159, in get
    direct=direct)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1562, in gather
    asynchronous=asynchronous)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 652, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 275, in sync
    six.reraise(*error[0])
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/utils.py", line 260, in f
    result[0] = yield make_coro()
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/concurrent.py", line 260, in result
    raise_exc_info(self._exc_info)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/usr/local/trustyou/home/yuriyd/miniconda3/envs/mgmt_score_dist_dask/lib/python2.7/site-packages/distributed/client.py", line 1439, in _gather
    traceback)
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 439, in _to_csv_chunk
    # finally, compare results against first row and "vote"
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/pandas/core/frame.py", line 1745, in to_csv
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/pandas/io/formats/csvs.py", line 161, in save
  File "/mnt/disk4/hadoop/yarn/local/usercache/yuriyd/appcache/application_1512662857247_45493/container_1512662857247_45493_01_000003/mgmt_score_dist_dask.zip/mgmt_score_dist_dask/lib/python2.7/site-packages/dask/bytes/utils.py", line 136, in __getattr__
AttributeError: 'HDFile' object has no attribute 'getvalue'

yuriy-davygora avatar Jun 28 '18 12:06 yuriy-davygora

@jcrist , do containers created by skein have credentials to write to HDFS as the initiating user?

martindurant avatar Jun 28 '18 12:06 martindurant

Yes. The delegation token for the default filesystem is provided in each container, and picked up automatically by libhdfs (the backend for pyarrow's hdfs reader), and maybe libhdfs3 (haven't tested). Currently skein doesn't handle delegation token renewal, so this will stop working after it expires, but that should only matter for long running jobs (> 1 day).

jcrist avatar Jun 28 '18 15:06 jcrist

Thanks @jcrist . @yuriy-davygora , would you like to try using skein ?

martindurant avatar Jun 28 '18 15:06 martindurant

@martindurant

The only reason I tried it with knit was that I wanted to have a quick and easy test, but if it does not work, then I'll try skein.

One more question, though: when I set the permission 777 for everything, I received another error (see the update to my original question). I was using hdfs3 at the time. I also tried pyarrow, but it complained about not being able to load libhdfs, and I could not get it to run with libhdfs3. Is this again a knit-related issue, or is this something else?

yuriy-davygora avatar Jun 28 '18 15:06 yuriy-davygora

Oops, actually, upon reading your error message it looks like you're using simple authentication instead of Kerberos. In that case, no, skein has the same bug under simple authentication. I've been putting off fixing it in favor of other things, but I'll track that down today. Should be a simple fix.

I also tried pyarrow, but it complained about not being able to load libhdfs

Pyarrow should work fine on any system, but you may need to set some environment variables, see: https://arrow.apache.org/docs/python/filesystems.html#hadoop-file-system-hdfs
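As a rough illustration of the environment variables mentioned above, here is a minimal sketch that collects the ones the linked pyarrow page describes (HADOOP_HOME, JAVA_HOME and, optionally, ARROW_LIBHDFS_DIR). The helper name libhdfs_env and all paths are hypothetical and depend on the cluster. CLASSPATH is left out because it is usually taken from the output of `hadoop classpath --glob`.

```python
import os


def libhdfs_env(hadoop_home, java_home, libhdfs_dir=None):
    """Build a dict of environment variables that pyarrow's libhdfs driver reads.

    All paths are placeholders; adjust them to the local Hadoop installation.
    """
    env = {
        'HADOOP_HOME': hadoop_home,
        'JAVA_HOME': java_home,
    }
    # Only needed when libhdfs.so is not under $HADOOP_HOME/lib/native
    if libhdfs_dir is not None:
        env['ARROW_LIBHDFS_DIR'] = libhdfs_dir
    return env


# Hypothetical installation paths, for illustration only
env = libhdfs_env('/opt/hadoop', '/usr/lib/jvm/java-8-openjdk')
os.environ.update(env)  # must happen before pyarrow first connects to HDFS
```

With knit, the same dict could presumably be handed to the workers through the envvars argument of cluster.start, as the original script already does for KNIT_LANG, since the workers are the processes that open the files.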

jcrist avatar Jun 28 '18 15:06 jcrist

@TomAugspurger , did something change in pandas to_csv? Calling .getvalue() seems to mean it's assuming the file-like object is some kind of BytesIO. This looks like an hdfs3 issue, but I think it has also shown up in one of the other file-system backends.
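To make the failure mode concrete, here is a minimal standard-library sketch of what happens when code that expects an in-memory buffer receives a plain writable file object. WriteOnlyFile and copy_out are hypothetical stand-ins for illustration; they are not the hdfs3 or pandas code.

```python
import io


class WriteOnlyFile(object):
    """Hypothetical stand-in for a remote file handle: it has write() only."""

    def __init__(self):
        self.chunks = []

    def write(self, data):
        self.chunks.append(data)
        return len(data)


def copy_out(buf):
    # Mimics code that assumes `buf` is an in-memory buffer like BytesIO
    return buf.getvalue()


# Works: BytesIO keeps its whole content in memory and exposes getvalue()
assert copy_out(io.BytesIO(b'a\tb\n')) == b'a\tb\n'

# Fails: a generic writable file object has no getvalue()
try:
    copy_out(WriteOnlyFile())
except AttributeError as exc:
    print(exc)
```

The second call raises an AttributeError of the same kind as the one in the updated traceback, which is consistent with the writer taking a code path meant for in-memory buffers.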

martindurant avatar Jun 28 '18 15:06 martindurant

What version of pandas? 0.23.0 introduced a couple of bugs when using compression: https://github.com/pandas-dev/pandas/issues/21144 and https://github.com/pandas-dev/pandas/issues/17778, and I think the fixes for those introduced another one, whose fix is being included in 0.23.2.

TomAugspurger avatar Jun 28 '18 15:06 TomAugspurger

Ah, correct, this seems to be in the ZIP branch (https://github.com/pandas-dev/pandas/blob/master/pandas/io/formats/csvs.py#L176), but the dask file object also handles compression internally. Not sure what to do about that. @yuriy-davygora , you could try passing compression= to to_csv and trying various parameters.

martindurant avatar Jun 28 '18 15:06 martindurant

@TomAugspurger: conda installed 0.23.1 automatically; I did not specify the pandas version explicitly. I will try 0.23.2 tomorrow.

@jcrist Thank you for your answer, I will give pyarrow another go tomorrow.

yuriy-davygora avatar Jun 28 '18 15:06 yuriy-davygora

FYI 0.23.2 isn't released yet. Probably a few days.

TomAugspurger avatar Jun 28 '18 15:06 TomAugspurger

@yuriy-davygora, dask-yarn (https://dask-yarn.readthedocs.io/en/latest/) has been released and now uses Skein (https://jcrist.github.io/skein/index.html), a more robust library for python/yarn interaction. The above permissions issue has been addressed there.

As for the to_csv issue, I'm not sure what needs to be done here. This has nothing to do with hadoop stuff necessarily, and is more a bug(?) in dask's bytes handling/to_csv functions. @TomAugspurger, @martindurant any ideas on what if anything needs to be done here?

jcrist avatar Jul 03 '18 19:07 jcrist

I haven't looked closely at the error.

Pandas 0.23.2 will be out shortly (next few days) and will hopefully fix this class of bugs.

TomAugspurger avatar Jul 03 '18 20:07 TomAugspurger