
Error when drop_remainder=True using rebatch API


Current behavior

Using the rebatch API with drop_remainder=True makes the program exit with a segmentation fault.
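
The failing transform is hb.data.rebatch with drop_remainder=True applied to the output of hb.data.read_parquet. A minimal sketch of the call pattern (the field list is abbreviated here; the full reproduction is below):

import tensorflow as tf
import hybridbackend.tensorflow as hb

fields = [hb.data.DataFrame.Field("int_feature", tf.int64, ragged_rank=0)]  # abbreviated schema
ds = tf.data.Dataset.from_tensor_slices(["parquet_sample_file.parquet"])
ds = ds.apply(hb.data.read_parquet(1000, fields))
ds = ds.apply(hb.data.rebatch(2000, fields=fields, drop_remainder=True))  # reported to segfault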

Expected behavior

No error

System information

  • GPU model and memory:
  • OS Platform: Ubuntu 18
  • Docker version:
  • GCC/CUDA/cuDNN version:
  • Python/conda version: Python 3.6
  • TensorFlow/PyTorch version: 1.5.0
  • HybridBackend version: 0.6.0a0

Code to reproduce

(1) First, generate a random parquet file:

import pandas as pd
import random

# Build ~100k rows: a scalar int feature, a fixed-length (50) int list feature and a float label.
data_list = []
for i in range(1, 100000):
    int_feature = random.randint(1, 1000)
    array_feature = [random.randint(1, 1000) for x in range(0, 50)]
    data_list.append([int_feature, array_feature, 0.8])

df = pd.DataFrame(data_list, columns=["int_feature", "array_feature", "label"])
df['label'] = pd.to_numeric(df["label"], downcast="float")  # store the label as float32
df.to_parquet("parquet_sample_file.parquet")

(2) Then read the data:

import tensorflow as tf
import hybridbackend.tensorflow as hb

BATCH_SIZE = 1000


def get_parquet_ds():
    filenames_ds = tf.data.Dataset.from_tensor_slices([
        'parquet_sample_file.parquet'
    ])
    hb_fields = []

    def _map(elem):
        features = {
            "int_feature": tf.cast(tf.reshape(elem["int_feature"], [-1, 1]), dtype=tf.float32),
            "array_feature": tf.cast(tf.reshape(elem["array_feature"].values, [-1, 50]),
                                              dtype=tf.float32)
        }
        labels = tf.reshape(elem["label"], [-1, 1])
        return features, labels

    # Field declarations must match the schema of the parquet file generated above.
    hb_fields.append(hb.data.DataFrame.Field("int_feature", tf.int64, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field("array_feature", tf.int64, ragged_rank=1))
    hb_fields.append(hb.data.DataFrame.Field("label", tf.float32, ragged_rank=0))
    ds = filenames_ds.apply(
        hb.data.read_parquet(BATCH_SIZE, hb_fields,
                             num_parallel_reads=tf.data.experimental.AUTOTUNE))
    # Rebatching with drop_remainder=True is what triggers the reported segmentation fault.
    ds = ds.apply(
        hb.data.rebatch(BATCH_SIZE * 2, fields=hb_fields, drop_remainder=True)).map(_map)

    return ds


def train():
    global_init_op = tf.compat.v1.global_variables_initializer()

    ds = get_parquet_ds()
    iterator = ds.make_one_shot_iterator()
    get_data_op = iterator.get_next()

    with tf.compat.v1.Session() as sess:
        sess.run(global_init_op)
        i = 1
        while True:
            try:
                # The reported segmentation fault occurs while the batches are consumed here.
                sample = sess.run([get_data_op])

                f_category = sample[0][0]["int_feature"]
                f_list = sample[0][0]["array_feature"]
                labels_ = sample[0][1]

                if i % 100 == 0:
                    print(f"step={i}")
                i += 1

            except tf.errors.OutOfRangeError:
                break


if __name__ == '__main__':
    train()
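
Not part of the original report, but to see where the crash happens, the script can be run with Python's faulthandler enabled so that the interpreter dumps a Python-level traceback when the SIGSEGV arrives:

import faulthandler
faulthandler.enable()  # dump tracebacks of all threads on SIGSEGV

# Equivalently, launch the reproduction script (file name is just a placeholder) with:
#   python -X faulthandler repro.py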

Willing to contribute

Yes

liurcme · May 30 '22 08:05