
Pipeline testing w/ Session window function throws in 0.7.0-alpha2

Open soldera opened this issue 7 years ago • 10 comments

Hi! Just updated a pipeline to the new 0.7.0-alpha2. The pipeline runs well on the direct runner and in Google Dataflow, but the test fails with the following message:

[info] java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey
[info]   at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:162)
[info]   at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:185)
[info]   at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:107)
[info]   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
[info]   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
[info]   at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
[info]   at com.spotify.scio.values.PCollectionWrapper.applyInternal(PCollectionWrapper.scala:38)
[info]   at com.spotify.scio.values.PCollectionWrapper.applyInternal$(PCollectionWrapper.scala:36)
[info]   at com.spotify.scio.values.SCollectionImpl.applyInternal(SCollection.scala:1138)
[info]   at com.spotify.scio.values.PCollectionWrapper.pApply(PCollectionWrapper.scala:49)

It happened only after bumping the version to 0.7.0; it didn't happen in 0.6.

Code:

input type: SCollection[TableRow] ...

      .withName("Remove ProcessingTime For Deduplication")
      .map(genericEventParserFunctions.removeProcessingTime)
      .withName("Map to Tuple With Id For Deduplicating Events").map(
        input =>
          (
            input.getOrDefault("Id", "NoId").asInstanceOf[String],
            input
          )
      )
      .withName("Apply Session Window Based on Id").withSessionWindows(
        gapDuration = Duration.standardMinutes(10),
        options = WindowOptions(
          allowedLateness = Duration.ZERO,
          trigger = AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(
              AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(12))
            ),
          accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
        )
      )
      .withName("Group By Id").groupBy(input => input._1)

soldera avatar Oct 16 '18 16:10 soldera

Can't repro with the following snippet and the local runner. Can you provide more details, or modify the snippet so that it reproduces the issue?

object Test {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.optionsAs[StreamingOptions].setStreaming(true)

    sc.customInput("In",
                   GenerateSequence
                     .from(0)
                     .withRate(1, Duration.standardSeconds(1))
                     .withTimestampFn(x => new Instant(x * 1000)))
      .flatMap(x => Seq("a", "a", "a", "b", "b", "c"))
      .withSessionWindows(
        gapDuration = Duration.standardMinutes(10),
        options = WindowOptions(
          allowedLateness = Duration.ZERO,
          trigger = AfterWatermark
            .pastEndOfWindow()
            .withEarlyFirings(
              AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(12))
            ),
          accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
        )
      )
      .debug()
      .groupBy(identity)
      .debug()

    sc.close()
  }
}

nevillelyh avatar Oct 16 '18 18:10 nevillelyh

Hi Neville, I hit the issue in the integration tests.

Can you send me your test code snippet, so I can adapt it to the scenario I had?

Thank you.

soldera avatar Oct 16 '18 19:10 soldera

It's the one I posted above.

nevillelyh avatar Oct 16 '18 19:10 nevillelyh

Hi Neville, I discovered what is causing the bug: the "saveAsTypedBigQuery" function downstream in my pipeline. When I remove it, the test passes. I've been able to reproduce the error scenario with the following, based on my own pipeline:

package scio_test

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.bigquery.TableRow
import com.spotify.scio.bigquery.types.BigQueryType
import com.spotify.scio.values.WindowOptions
import org.apache.beam.sdk.io.GenerateSequence
import org.apache.beam.sdk.options.StreamingOptions
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, AfterWatermark}
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}
import com.spotify.scio.bigquery._

import org.joda.time.{Duration, Instant}

object Test {

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.optionsAs[StreamingOptions].setStreaming(true)

    sc.customInput("In",
      GenerateSequence
        .from(0)
        .withRate(1, Duration.standardSeconds(5))
        .withTimestampFn(x => new Instant(x * 1000)))
      .map(x => (x.toString, TableRow("Id"->x.toString, "TestContent"->x.toString)))
      .withSessionWindows(
        gapDuration = Duration.standardSeconds(5),
        options = WindowOptions(
          allowedLateness = Duration.ZERO,
          trigger = AfterWatermark
            .pastEndOfWindow()
            .withEarlyFirings(
              AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))
            ),
          accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
        )
      )
      .groupBy(_._1)
      .withName("Deduplicate to Option of TableRow")
      .map { input =>
        val inputListDistinct = input._2.toList.distinct
        if (inputListDistinct.isEmpty) None
        else Some(inputListDistinct.map(_._2))
      }
      .withName("From Options to Values - After Mutation")
      .flatMap(element => element).flatMap(element => element)
      .map(Extra.parseToTyped)
      .saveAsTypedBigQuery(
        "arquiveidev:arquivei_streaming_parsed_events.test",
        writeDisposition = WriteDisposition.WRITE_APPEND,
        createDisposition = CreateDisposition.CREATE_NEVER
      )

    sc.close()
  }
}

object Extra {

  @BigQueryType.toTable
  case class TestRow
  (
    Id: Option[String],
    TestContent: Option[String]
  )

  def parseToTyped(input: TableRow): TestRow = {
    val id = input.getOrDefault("Id", null)
    val testContent = input.getOrDefault("TestContent", null)

    TestRow(
      Id = if(id == null) None else Some(id.toString),
      TestContent = if(testContent == null) None else Some(testContent.toString)
    )
  }

}
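
As an aside, the null checks in parseToTyped can be collapsed with Option.apply, which returns None for a null reference and Some otherwise. A minimal pure-Scala sketch (the object and method names here are mine, not from the pipeline):

```scala
// Sketch of the null-to-Option conversion done in parseToTyped above.
// Option(value) yields None for null and Some(value) otherwise, so the
// explicit if/else null checks are unnecessary.
object ParseSketch {
  def toOptionString(value: AnyRef): Option[String] =
    Option(value).map(_.toString)
}
```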

Test code:

package scio_test

import com.spotify.scio.bigquery.BigQueryIO
import com.spotify.scio.io.CustomIO
import com.spotify.scio.testing.PipelineSpec
import scio_test.Extra.TestRow

class TesteTest extends PipelineSpec {

  "Pipe" should "work" in {

    val out = List(
      TestRow(Some("1"), Some("1")),
      TestRow(Some("2"), Some("2"))
    )

    JobTest[scio_test.Test.type]
      .input(CustomIO("In"), List(new java.lang.Long(1), new java.lang.Long(2)))
      .output(BigQueryIO[TestRow]("arquiveidev:arquivei_streaming_parsed_events.test")) { result =>
        result should containInAnyOrder(out)
      }
      .run()
  }
}

soldera avatar Oct 17 '18 17:10 soldera

Also, if you change the window to a fixed window, the test passes.

soldera avatar Oct 17 '18 17:10 soldera

Thanks. I can verify that this reproduces the issue. Will look into it.

nevillelyh avatar Oct 17 '18 20:10 nevillelyh

Found this https://stackoverflow.com/questions/46983318/writing-via-textio-write-with-sessions-windowing-raises-groupbykey-consumption-e

I verified that .saveAsTextFile() causes the same error, and that adding .withGlobalWindow() before saveAs* fixes it. ~Are you sure this only affects 0.7.0? The bug is a year old.~ Also verified that 0.6.1 works for this case, and so does reverting Beam back to 2.6.0, so it's most likely a Beam change. We should report this to Apache if we can reproduce it with a Java snippet (which should be trivial).

Also, can you re-window before writing as a workaround? IMO writing session-windowed data (which is presumed to be keyed) doesn't make sense.
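
Concretely, the workaround amounts to re-windowing between the session-windowed aggregation and the sink. A sketch against the repro pipeline above (`sessioned` stands for the SCollection produced by withSessionWindows; this is untested wiring, not a verified fix):

```scala
// Sketch only: re-window after the session-windowed aggregation so the
// write transform no longer sees an already-consumed session WindowFn.
sessioned
  .groupBy(_._1)                 // consumes the session WindowFn
  .map { case (_, rows) => rows.toList.distinct.map(_._2) }
  .flatMap(identity)
  .withGlobalWindow()            // workaround: re-window before writing
  .map(Extra.parseToTyped)
  .saveAsTypedBigQuery(
    "arquiveidev:arquivei_streaming_parsed_events.test",
    writeDisposition = WriteDisposition.WRITE_APPEND,
    createDisposition = CreateDisposition.CREATE_NEVER
  )
```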

nevillelyh avatar Oct 17 '18 20:10 nevillelyh

Thanks Neville! The workaround using withGlobalWindow worked. I'm using the session window to deduplicate the events, so I'll switch to a global window after deduplication, since a session window doesn't make sense right before the write.
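
The deduplication itself is independent of windowing: what the groupBy + distinct step computes within each window can be modelled in plain Scala (object and method names here are hypothetical):

```scala
// Plain-Scala model of the per-window deduplication step: group events
// by id, then drop duplicate payloads within each group. In the real
// pipeline this runs per session window rather than over a whole Seq.
object DedupSketch {
  def dedupById(events: Seq[(String, String)]): Map[String, List[String]] =
    events.groupBy(_._1).map { case (id, kvs) =>
      id -> kvs.map(_._2).distinct.toList
    }
}
```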

soldera avatar Oct 17 '18 21:10 soldera

Link to BEAM issue: https://issues.apache.org/jira/browse/BEAM-3122

jto avatar Oct 18 '18 08:10 jto

@nevillelyh Can we close this? The issue is upstream and has had no activity since Oct 2018.

jto avatar Jun 25 '20 12:06 jto