
An overwrite option for all sink types that write to HDFS

Open hannesmiller opened this issue 9 years ago • 10 comments

Proposal

Sink.withOverwrite

Affected Sinks

  • Parquet
  • AvroParquet
  • Avro
  • Orc
  • Csv
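
A rough sketch of how the option might look from the caller's side (a minimal sketch only; the package paths and the exact signature are assumptions until this is implemented):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}
  import io.eels.Row
  import io.eels.component.parquet.ParquetSink
  import io.eels.datastream.DataStream
  import io.eels.schema.{Field, StringType, StructType}

  implicit val conf = new Configuration()
  implicit val fs = FileSystem.get(conf)

  val schema = StructType(Field("a", StringType))
  val ds = DataStream.fromRows(schema, Seq(Row(schema, Vector("x"))))

  val path = new Path("/tmp/example.pq")
  ds.to(ParquetSink(path))                       // first write creates the file
  ds.to(ParquetSink(path).withOverwrite(true))   // second write replaces it instead of failing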

hannesmiller avatar Jan 23 '17 18:01 hannesmiller

Hello,

Is it possible to append to the content of the file instead? https://hadoop.apache.org/docs/r2.8.2/api/org/apache/hadoop/fs/FileSystem.html#append(org.apache.hadoop.fs.Path)

I tested withOverwrite(true) and withOverwrite(false) with the ParquetSink and CsvSink sinks; either way the file is always overwritten.

By comparison, JdbcSink just inserts rows into the table without deleting the previous ones.
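
For reference, this is roughly the raw Hadoop call I mean (a minimal sketch; the namenode URI, the path and the written bytes are placeholders):

  import java.net.URI
  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  val conf = new Configuration()
  val fs = FileSystem.get(new URI("hdfs://hdfsServer:8020"), conf)
  val path = new Path("/tmp/myfile.csv")

  // append() only works where the underlying FileSystem implementation supports it
  val out = if (fs.exists(path)) fs.append(path) else fs.create(path)
  try out.write("c,d\n".getBytes("UTF-8"))
  finally out.close()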

mycaule avatar Jun 19 '18 13:06 mycaule

@mycaule Thank you - I have reopened this issue and we will look into this.

hannesmiller avatar Jun 20 '18 09:06 hannesmiller

@mycaule Unfortunately I cannot reproduce the problem - do you have a test case that reproduces it?

  • I have enhanced the CSVSinkTest and ParquetSinkTest to be a bit more thorough...
  it should "support overwrite" in {
    val path = new Path(s"target/${UUID.randomUUID().toString}", s"${UUID.randomUUID().toString}.pq")
    val schema = StructType(Field("a", StringType))
    val ds = DataStream.fromRows(
      schema,
      Seq(
        Row(schema, Vector("x")),
        Row(schema, Vector("y"))
      )
    )

    // Write twice to test overwrite
    ds.to(ParquetSink(path))
    ds.to(ParquetSink(path).withOverwrite(true))

    var parentStatus = fs.listStatus(path.getParent)
    println("Parquet Overwrite:")
    parentStatus.foreach(p => println(p.getPath))
    parentStatus.length shouldBe 1
    parentStatus.head.getPath.getName shouldBe path.getName

    // Write again without overwrite
    val appendPath = new Path(path.getParent, s"${UUID.randomUUID().toString}.pq")
    ds.to(ParquetSink(appendPath).withOverwrite(false))
    parentStatus = fs.listStatus(path.getParent)
    println("Parquet Append:")
    parentStatus.foreach(p => println(p.getPath))
    parentStatus.length shouldBe 2
  }
  • Snippet of the output
Parquet Overwrite:
file:/home/hannes/projects/eel-sdk/target/092fff6d-b6e9-40b1-b565-953911971376/f22c6618-e07b-4a56-9703-38d75ff26ba5.pq
Parquet Append:
file:/home/hannes/projects/eel-sdk/target/092fff6d-b6e9-40b1-b565-953911971376/f22c6618-e07b-4a56-9703-38d75ff26ba5.pq
file:/home/hannes/projects/eel-sdk/target/092fff6d-b6e9-40b1-b565-953911971376/1f0c810a-f788-46eb-b019-a17cceed1695.pq

hannesmiller avatar Jun 20 '18 10:06 hannesmiller

Hello, my code looks like this. The file path is constant, and I would like the file to be appended to each time writeCSV is called:

  implicit val conf = new Configuration()
  implicit val fs = FileSystem.get(new URI(s"hdfs://hdfsServer:8020"), conf)

  def writeCSV(params...) = {
    val path = s"hdfs://tmp/myfile.csv"

    val ds = ... // Create ds based on params

    // Second argument true or false gives the same result: the file is overwritten
    val sink = CsvSink(new Path(path), false)
    ds.to(sink)
  }

  // first call
  writeCSV(params1...)

  // second call
  writeCSV(params2...)

In my opinion, writing this file may need to call the Hadoop fs.append method instead.

https://github.com/51zero/eel-sdk/blob/master/eel-core/src/main/scala/io/eels/component/csv/CsvSink.scala#L34-L35
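
Something along these lines inside the writer is what I have in mind (just an illustration of the idea, not the actual eel code; the method and flag names are hypothetical):

  import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

  // Choose append over create when an append flag is set and the target already exists
  def openOutput(fs: FileSystem, path: Path, overwrite: Boolean, append: Boolean): FSDataOutputStream =
    if (append && fs.exists(path)) fs.append(path)
    else fs.create(path, overwrite)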

mycaule avatar Jun 20 '18 12:06 mycaule

I think the problem occurs when testing against an HDFS cluster with a particular configuration. https://stackoverflow.com/questions/22997137/append-data-to-existing-file-in-hdfs-java

I checked my HDFS cluster configuration and it does support appending.

<property>
   <name>dfs.support.append</name>
   <value>true</value>
</property>
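
A quick way to check from the client side whether the filesystem you are writing to actually allows append (a diagnostic sketch only, not part of eel; the path must already exist):

  import java.io.IOException
  import org.apache.hadoop.fs.{FileSystem, Path}

  def appendSupported(fs: FileSystem, path: Path): Boolean =
    try { fs.append(path).close(); true }
    catch { case _: IOException | _: UnsupportedOperationException => false }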

mycaule avatar Jun 20 '18 12:06 mycaule

@mycaule The CSV/Avro/Orc/Parquet sinks were not designed to append to existing files, for a few reasons:

  1. For binary formats like Parquet and Orc it's not possible to append even though HDFS may allow it, because stats (averages, counts, distinct values) are written to the footer of the file when it is closed - Orc has a similar concept (see the footer sketch after this list).

  2. The config option you mentioned is not typically enabled on most HDFS cluster installations I have worked on.

  3. This could in theory work for text-based formats like CSV, but there have hardly been any requests for this feature given the immutable nature of HDFS.
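
To make point 1 concrete, here is a small sketch (assuming parquet-hadoop is on the classpath; the path is a placeholder) that prints the per-row-group statistics stored in a Parquet file's footer - because this metadata is only written when the file is closed, more rows cannot simply be appended to an existing file:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.parquet.hadoop.ParquetFileReader
  import org.apache.parquet.hadoop.util.HadoopInputFile
  import scala.collection.JavaConverters._

  val conf = new Configuration()
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path("/tmp/example.pq"), conf))
  try {
    reader.getFooter.getBlocks.asScala.foreach { block =>
      block.getColumns.asScala.foreach { col =>
        println(s"${col.getPath}: rows=${block.getRowCount}, stats=${col.getStatistics}")
      }
    }
  } finally reader.close()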

hannesmiller avatar Jun 21 '18 15:06 hannesmiller

@mycaule In addition, the Hadoop FileSystem API has an append method as opposed to create - I tried this out by adding a withAppend(true) method to the CsvSink; here's a snippet of my test:

      // Now write to the same file in append mode and test that we have double the amount of rows
      ds.to(CsvSink(path).withOverwrite(true))
      ds.to(CsvSink(path).withAppend(true))
      using(fs.open(path)) { inputStream =>
        using(new BufferedReader(new InputStreamReader(inputStream))) { reader =>
          val lines = reader.lines().toArray
          println(lines.mkString("\n"))
          lines.length shouldBe 4
        }
      }

Which unfortunately yields the following exception:

Not supported
java.io.IOException: Not supported
	at org.apache.hadoop.fs.ChecksumFileSystem.append(ChecksumFileSystem.java:357)
	at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)

The Hadoop source for ChecksumFileSystem shows:

  @Override
  public FSDataOutputStream append(Path f, int bufferSize,
      Progressable progress) throws IOException {
    throw new IOException("Not supported");
  }
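
That ChecksumFileSystem is the local filesystem wrapper used in the unit test, so one possible workaround for exercising append locally (an assumption, not something wired into the test suite) is to go through RawLocalFileSystem, which as far as I can tell does implement append:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{Path, RawLocalFileSystem}

  val rawFs = new RawLocalFileSystem()
  rawFs.initialize(rawFs.getUri, new Configuration())

  val path = new Path("target/append-test.csv")
  if (!rawFs.exists(path)) rawFs.create(path).close()   // append requires an existing file
  val out = rawFs.append(path)                          // ChecksumFileSystem would throw "Not supported" here
  try out.write("e,f\n".getBytes("UTF-8"))
  finally out.close()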

hannesmiller avatar Jun 21 '18 16:06 hannesmiller

@mycaule I would like to close this issue if you have no objections?

hannesmiller avatar Jun 21 '18 16:06 hannesmiller

Ok, thanks for investigating anyway!

mycaule avatar Jun 22 '18 12:06 mycaule

Please close this.

mycaule avatar Sep 05 '18 18:09 mycaule