An overwrite option for all sink types that write to HDFS
Proposal
Sink.withOverwrite
Affected Sinks
- Parquet
- AvroParquet
- Avro
- Orc
- Csv
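For illustration, a minimal usage sketch of the proposed option, reusing the builder style of the existing sinks (the import paths and the implicit Hadoop conf/fs are assumptions based on the current eel-sdk layout):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import io.eels.Row
import io.eels.datastream.DataStream
import io.eels.schema.{Field, StringType, StructType}
import io.eels.component.parquet.ParquetSink

implicit val conf = new Configuration()
implicit val fs = FileSystem.get(conf)

val schema = StructType(Field("a", StringType))
val ds = DataStream.fromRows(schema, Seq(Row(schema, Vector("x")), Row(schema, Vector("y"))))

// With overwrite enabled, any existing file at the target path is replaced.
ds.to(ParquetSink(new Path("hdfs://tmp/output.pq")).withOverwrite(true))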
Hello,
Is it possible to append to the content of the file? https://hadoop.apache.org/docs/r2.8.2/api/org/apache/hadoop/fs/FileSystem.html#append(org.apache.hadoop.fs.Path)
I tested withOverwrite(true) and withOverwrite(false) for the ParquetSink and CsvSink sinks; it seems it always overwrites the file anyway.
In comparison, JdbcSink just inserts rows into the table without deleting the previous ones.
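For context, this is roughly what a call to the linked FileSystem.append API looks like (the URI and path below are placeholders, and append only works if the underlying filesystem implementation supports it):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Appends bytes to an existing HDFS file via the FileSystem API.
// Throws IOException if the filesystem implementation does not support append.
val fs = FileSystem.get(new URI("hdfs://hdfsServer:8020"), new Configuration())
val out = fs.append(new Path("/tmp/myfile.csv"))
out.write("x,y\n".getBytes("UTF-8"))
out.close()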
@mycaule Thank you - I have reopened this issue and we will look into this.
@mycaule Unfortunately I cannot reproduce the problem - do you have a test case to reproduce the problem?
- I have enhanced the CSVSinkTest and ParquetSinkTest to be a bit more thorough...
it should "support overwrite" in {
val path = new Path(s"target/${UUID.randomUUID().toString}", s"${UUID.randomUUID().toString}.pq")
val schema = StructType(Field("a", StringType))
val ds = DataStream.fromRows(
schema,
Seq(
Row(schema, Vector("x")),
Row(schema, Vector("y"))
)
)
// Write twice to test overwrite
ds.to(ParquetSink(path))
ds.to(ParquetSink(path).withOverwrite(true))
var parentStatus = fs.listStatus(path.getParent)
println("Parquet Overwrite:")
parentStatus.foreach(p => println(p.getPath))
parentStatus.length shouldBe 1
parentStatus.head.getPath.getName shouldBe path.getName
// Write again without overwrite
val appendPath = new Path(path.getParent, s"${UUID.randomUUID().toString}.pq")
ds.to(ParquetSink(appendPath).withOverwrite(false))
parentStatus = fs.listStatus(path.getParent)
println("Parquet Append:")
parentStatus.foreach(p => println(p.getPath))
parentStatus.length shouldBe 2
}
- Snippet of the output
Parquet Overwrite:
file:/home/hannes/projects/eel-sdk/target/092fff6d-b6e9-40b1-b565-953911971376/f22c6618-e07b-4a56-9703-38d75ff26ba5.pq
Parquet Append:
file:/home/hannes/projects/eel-sdk/target/092fff6d-b6e9-40b1-b565-953911971376/f22c6618-e07b-4a56-9703-38d75ff26ba5.pq
file:/home/hannes/projects/eel-sdk/target/092fff6d-b6e9-40b1-b565-953911971376/1f0c810a-f788-46eb-b019-a17cceed1695.pq
Hello, my code looks like this. The file path is constant and I would like the file to be appended to each time writeCSV is called:
implicit val conf = new Configuration()
implicit val fs = FileSystem.get(new URI(s"hdfs://hdfsServer:8020"), conf)

def writeCSV(params...) = {
  val path = s"hdfs://tmp/myfile.csv"
  val ds = ... // Create ds based on params
  val sink = CsvSink(new Path(path), false)
  // Second argument true or false gives the same result: the file is overwritten
  ds.to(sink)
}
// first call
writeCSV(params1...)
// second call
writeCSV(params2...)
In my opinion, the sink may need to call the Hadoop fs.append method on this file.
https://github.com/51zero/eel-sdk/blob/master/eel-core/src/main/scala/io/eels/component/csv/CsvSink.scala#L34-L35
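Roughly, I would expect something like the following at that point - a sketch only, with a hypothetical append flag alongside the existing overwrite one:

import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

// Sketch: choose append over create when the file already exists.
// `append` is a hypothetical flag; the cluster also needs append support enabled.
def openOutput(fs: FileSystem, path: Path, overwrite: Boolean, append: Boolean): FSDataOutputStream =
  if (append && fs.exists(path)) fs.append(path)
  else fs.create(path, overwrite)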
I think the problem only shows up when testing against HDFS with a particular cluster configuration. https://stackoverflow.com/questions/22997137/append-data-to-existing-file-in-hdfs-java
I checked my HDFS cluster configuration and it does support appending:
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
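For what it's worth, the value the client sees can also be checked programmatically (the false fallback below is just a conservative default I chose):

import org.apache.hadoop.conf.Configuration

// Reads the effective client-side setting; falls back to false if unset.
val conf = new Configuration()
val appendSupported = conf.getBoolean("dfs.support.append", false)
println(s"dfs.support.append = $appendSupported")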
@mycaule
The CSV/Avro/Orc/Parquet sinks were not designed to append to existing files for a couple of reasons:
- For binary formats like Parquet and Orc it's not possible to append even though HDFS may allow it - this is because, for Parquet for example, stats (averages, counts, distinct values) are written to the footer of the file when it is closed - Orc has a similar concept (see the sketch after this list).
- The config option you mentioned is not typically enabled on most HDFS cluster installations I have worked on.
- This could in theory work for text-based formats like CSV, but there have hardly been any requests for this feature given the immutable nature of HDFS.
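To make the first point concrete: the statistics live in the Parquet footer, which is only written when the writer closes the file, so a later writer cannot simply continue it. A sketch of reading that footer back with the older parquet-hadoop API (the path is a placeholder):

import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

// Reads the footer metadata (row groups and per-column statistics) of a Parquet file.
val footer = ParquetFileReader.readFooter(new Configuration(), new Path("/tmp/output.pq"))
footer.getBlocks.asScala.foreach { block =>
  block.getColumns.asScala.foreach(col => println(s"${col.getPath}: ${col.getStatistics}"))
}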
@mycaule In addition, the Hadoop FileSystem API has an append method as opposed to create - I tried this out by adding a withAppend(true) method to the CsvSink; here's a snippet of my test:
// Now write to the same file in append mode and test that we have double the amount of rows
ds.to(CsvSink(path).withOverwrite(true))
ds.to(CsvSink(path).withAppend(true))

using(fs.open(path)) { inputStream =>
  using(new BufferedReader(new InputStreamReader(inputStream))) { reader =>
    val lines = reader.lines().toArray
    println(lines.mkString("\n"))
    lines.length shouldBe 4
  }
}
Which unfortunately yields the following exception:
Not supported
java.io.IOException: Not supported
at org.apache.hadoop.fs.ChecksumFileSystem.append(ChecksumFileSystem.java:357)
at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
The Hadoop code says:
@Override
public FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException {
  throw new IOException("Not supported");
}
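The "Not supported" comes from ChecksumFileSystem, which backs the local file:// filesystem used in the unit tests - HDFS itself does implement append. If someone wants to exercise append locally, one option (a sketch; the target file must already exist) is to go through RawLocalFileSystem, which skips the checksum layer:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

// Bypasses ChecksumFileSystem so that append() is actually supported locally.
// The file at the path must already exist.
val raw = new RawLocalFileSystem()
raw.initialize(raw.getUri, new Configuration())
val out = raw.append(new Path("/tmp/append-test.csv"))
out.write("a\n".getBytes("UTF-8"))
out.close()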
@mycaule I would like to close this issue if you have no objections?
OK, thanks for investigating anyway!
Please close this.