
Cannot use with DataFrame.write.partitionBy; it always outputs one partition.

Open legatoo opened this issue 6 years ago • 9 comments

I want to split my data evenly, so I added an index column to my dataframe, and I am pretty sure the column is added correctly. I printed some rows:

[screenshot: sample rows showing the added index_id column]

I first add the index using the code below:

val partitionCnt: Int = 12
// Append a round-robin partition id (0..partitionCnt-1) to every row.
val dataWithIndex = context.sc.createDataFrame(
    originData.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i % partitionCnt) },
    StructType(originData.schema.fields :+ StructField("index_id", LongType, nullable = false))
)

then I want to partition by index_id:

dataWithIndex.write.partitionBy("index_id")
    .format("tfrecords")
    .mode(SaveMode.Overwrite)
    .option("recordType", "Example")
    .save(hdfsPath)

but this code outputs only one partition every time. I thought it could be something wrong with the dataframe, but when I write to another format it works as expected, e.g. CSV:

dataWithIndex.write.partitionBy("index_id")
    .format("com.databricks.spark.csv")
    .mode(SaveMode.Overwrite)
    .save(params("hdfsPath"))

[screenshot: the CSV writer produces one index_id=<value> output directory per key]

legatoo avatar Jan 31 '19 09:01 legatoo

Hi there,

Not sure why you need .mode(SaveMode.Overwrite) when writing the dataframe to HDFS. Have you tried it without .mode(SaveMode.Overwrite)?

ghost avatar Feb 01 '19 04:02 ghost

@MajesticKhan I didn't think it would help, but I gave it a try: same problem. According to the docs, mode "specifies the behavior when data or a table already exists". When I use mode without partitionBy to produce tfrecords, it works.
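For reference, the write that does work is the same call minus partitionBy (a sketch reconstructed from the snippets above, using the same dataWithIndex and hdfsPath names):

// Works, but produces a single unpartitioned dataset.
dataWithIndex.write
    .format("tfrecords")
    .mode(SaveMode.Overwrite)
    .option("recordType", "Example")
    .save(hdfsPath)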

legatoo avatar Feb 01 '19 06:02 legatoo

The only other idea I have is to first partition the data and then persist or cache the dataframe. From there, try writing it to the stated path (a sketch follows below).
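A minimal sketch of that suggestion, assuming the dataWithIndex, partitionCnt, and hdfsPath names from the original post. Note that repartition controls the in-memory partitioning of the dataframe, which is not the same thing as the index_id=<value> directory layout that partitionBy is supposed to create on disk:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Repartition by the key so each in-memory partition holds one index_id,
// cache the result, then write it out.
val repartitioned = dataWithIndex.repartition(partitionCnt, col("index_id")).persist()

repartitioned.write
    .format("tfrecords")
    .mode(SaveMode.Overwrite)
    .option("recordType", "Example")
    .save(hdfsPath)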

ghost avatar Feb 02 '19 02:02 ghost

At the very least, partitionBy should fail loudly. Currently it's a silent bug.

nicholas-leonard avatar Nov 05 '19 20:11 nicholas-leonard

Encountering the same problem.

jerome-eyespage avatar Nov 09 '19 04:11 jerome-eyespage

Any update on this from the developers? I see the same issue with 1.15.0. Does anyone have a workaround? I suppose I could partition the data and then save the partitions sequentially, but I am wondering if there are better approaches.

junshi15 avatar Jan 04 '20 18:01 junshi15

I've run into this issue as well. Using partitionBy with all the suggested workarounds still results in one partition. Is anyone working on this issue?

boarder7395 avatar Feb 28 '20 13:02 boarder7395

+1 @boarder7395

I suppose the TensorFlow data source takes the whole dataframe as input and doesn't respect the partitionBy clause.

Is there a workaround other than writing each partition's data separately in a loop (as sketched below)?
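For what it's worth, the loop workaround looks roughly like this (a sketch assuming the dataWithIndex, partitionCnt, and hdfsPath names from the original post). It writes each key's subset into its own index_id=<value> subdirectory to mimic the layout partitionBy should have produced:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

(0 until partitionCnt).foreach { id =>
  // One write job per key; each goes to its own subdirectory.
  dataWithIndex.filter(col("index_id") === id)
    .write
    .format("tfrecords")
    .mode(SaveMode.Overwrite)
    .option("recordType", "Example")
    .save(s"$hdfsPath/index_id=$id")
}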

priyabratapatnaik avatar Mar 20 '20 19:03 priyabratapatnaik

We open-sourced a similar package to address this issue. You can try it out here: https://github.com/linkedin/spark-tfrecord https://engineering.linkedin.com/blog/2020/spark-tfrecord
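Based on the spark-tfrecord README, the write from the original post would look like this with the new connector; note the short name is "tfrecord" (singular) and partitionBy is supported:

import org.apache.spark.sql.SaveMode

dataWithIndex.write
    .mode(SaveMode.Overwrite)
    .partitionBy("index_id")
    .format("tfrecord")
    .option("recordType", "Example")
    .save(hdfsPath)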

junshi15 avatar May 05 '20 00:05 junshi15