hdfs write fails when using AsyncWriter

Open yujiapingyu opened this issue 4 years ago • 2 comments

import hashlib

# get_client and RunState are defined elsewhere (not shown here).
def hdfs_copy_stream(src, dst, namenode=None):
    try:
        md5 = hashlib.md5()
        offset = 0
        clt = get_client(src, namenode)
        # Stream the source in 64 KiB chunks and forward each chunk to the writer.
        with clt.read(src, offset=offset, chunk_size=2 ** 16) as reader:
            with clt.write(dst, overwrite=True) as writer:
                for chunk in reader:
                    md5.update(chunk)
                    offset += len(chunk)  # running total of bytes copied
                    writer.write(chunk)
        md5_value = md5.hexdigest()
        print('md5 = {}, length = {}'.format(md5_value, offset))
        return RunState.Done, (md5_value, offset)
    except Exception as e:
        print("copy file {} to {} failed: {}".format(src, dst, e))
        return RunState.Error, None

Hi, I implemented a copy method like this. It works well for large files, but for small files of a few tens of KB, the function sometimes returns successfully while the target file on HDFS has a length of 0. I spent a long time checking and could not find the problem. When I add time.sleep like this:

offset += len(chunk)
writer.write(chunk)
time.sleep(0.001)

With that change the problem goes away. I really don't know why, so I'm asking here for help.
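
Since the copy reports success even when the target ends up empty, one way to at least detect this case is to compare the number of bytes written with the length HDFS reports afterwards. This is only a rough sketch: it assumes `client` is an hdfs (HdfsCLI) client whose `status()` call returns the WebHDFS FileStatus dict with a `length` field, and the helper name `copied_length_matches` is made up for illustration.

```python
def copied_length_matches(client, dst, expected_length):
    """Return True if the HDFS file at dst has exactly expected_length bytes.

    Assumption: client.status(dst) returns the WebHDFS FileStatus dict,
    which carries the file size in its 'length' field.
    """
    status = client.status(dst)
    return status['length'] == expected_length
```

In the function above this could be called after both `with` blocks have closed, for example `copied_length_matches(clt, dst, offset)`, returning `RunState.Error` when it is False. It does not explain the empty file, it only stops the failure from being silent.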

yujiapingyu avatar Jan 19 '22 13:01 yujiapingyu

Also, I am using version 2.26.0 of the Requests library.

yujiapingyu avatar Jan 19 '22 15:01 yujiapingyu

I tried debugging and found that the consumer is consuming a generator. But Requests has retry logic, and when the send is retried, there is no data left in the generator.

I'm not sure if that's the reason.
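
To illustrate the suspicion, here is a small standalone simulation in plain Python. It does not use Requests or hdfs at all; `chunks`, `send`, and `send_with_retry` are made-up names, and whether Requests really retries in this situation is not confirmed here. It only shows that a generator body that was drained by a failed first attempt has nothing left for the second attempt, while a re-readable body does.

```python
def chunks():
    """Stand-in for the data generator handed to the HTTP layer."""
    yield b"hello "
    yield b"world"


def send(body, fail_after_reading=False):
    """Simulated send: drains the body, then optionally fails."""
    payload = b"".join(body)
    if fail_after_reading:
        raise ConnectionError("simulated connection reset")
    return payload


def send_with_retry(body):
    """Retry once with the SAME body object, as a naive retry would."""
    try:
        return send(body, fail_after_reading=True)
    except ConnectionError:
        return send(body)


# Generator body: the first attempt consumed it, so the retry sends nothing.
stream_result = send_with_retry(chunks())
# List body: it can be iterated again, so the retry sends the full payload.
bytes_result = send_with_retry([b"hello ", b"world"])

print(len(stream_result), len(bytes_result))  # prints: 0 11
```

If this is what happens, the retried request would succeed with an empty body, which would match a 0-length file with no error raised.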

yujiapingyu avatar Jan 19 '22 16:01 yujiapingyu