Pipeline testing w/ Session window function throws in 0.7.0-alpha2
Hi! Just updated a pipeline to the new 0.7.0-alpha2. The pipeline runs well on the direct runner and in Google Dataflow, but the test fails with the following message:
[info] java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey
[info]   at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:162)
[info]   at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:185)
[info]   at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:107)
[info]   at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
[info]   at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
[info]   at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
[info]   at com.spotify.scio.values.PCollectionWrapper.applyInternal(PCollectionWrapper.scala:38)
[info]   at com.spotify.scio.values.PCollectionWrapper.applyInternal$(PCollectionWrapper.scala:36)
[info]   at com.spotify.scio.values.SCollectionImpl.applyInternal(SCollection.scala:1138)
[info]   at com.spotify.scio.values.PCollectionWrapper.pApply(PCollectionWrapper.scala:49)
It only started happening after bumping the version to 0.7.0; it didn't happen in 0.6.
Code:
input type: SCollection[TableRow] ...
.withName("Remove ProcessingTime For Deduplication")
.map(genericEventParserFunctions.removeProcessingTime)
.withName("Map to Tuple With Id For Deduplicating Events")
.map { input =>
  (
    input.getOrDefault("Id", "NoId").asInstanceOf[String],
    input
  )
}
.withName("Apply Session Window Based on Id")
.withSessionWindows(
  gapDuration = Duration.standardMinutes(10),
  options = WindowOptions(
    allowedLateness = Duration.ZERO,
    trigger = AfterWatermark.pastEndOfWindow()
      .withEarlyFirings(
        AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardMinutes(12))
      ),
    accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
  )
)
.withName("Group By Id")
.groupBy(input => input._1)
Can't repro with the following snippet and the local runner. Can you provide more details, or rework the snippet into a state that reproduces the issue?
import com.spotify.scio.ContextAndArgs
import com.spotify.scio.values.WindowOptions
import org.apache.beam.sdk.io.GenerateSequence
import org.apache.beam.sdk.options.StreamingOptions
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, AfterWatermark}
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.joda.time.{Duration, Instant}

object Test {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.optionsAs[StreamingOptions].setStreaming(true)
    sc.customInput("In",
      GenerateSequence
        .from(0)
        .withRate(1, Duration.standardSeconds(1))
        .withTimestampFn(x => new Instant(x * 1000)))
      .flatMap(x => Seq("a", "a", "a", "b", "b", "c"))
      .withSessionWindows(
        gapDuration = Duration.standardMinutes(10),
        options = WindowOptions(
          allowedLateness = Duration.ZERO,
          trigger = AfterWatermark
            .pastEndOfWindow()
            .withEarlyFirings(
              AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(12))
            ),
          accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
        )
      )
      .debug()
      .groupBy(identity)
      .debug()
    sc.close()
  }
}
Hi Neville, I hit the issue while running integration tests.
Can you send me the code snippet of your test, so I can work it toward the scenario I had?
Thank you.
It's the one I posted above.
Hi Neville, I discovered what is causing the bug. It's the `saveAsTypedBigQuery` function downstream in my pipeline. When I turn it off, the test passes. I've been able to reproduce the error scenario, based on my own pipeline:
package scio_test

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.bigquery.TableRow
import com.spotify.scio.bigquery.types.BigQueryType
import com.spotify.scio.values.WindowOptions
import org.apache.beam.sdk.io.GenerateSequence
import org.apache.beam.sdk.options.StreamingOptions
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, AfterWatermark}
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}
import com.spotify.scio.bigquery._
import org.joda.time.{Duration, Instant}

object Test {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.optionsAs[StreamingOptions].setStreaming(true)
    sc.customInput("In",
      GenerateSequence
        .from(0)
        .withRate(1, Duration.standardSeconds(5))
        .withTimestampFn(x => new Instant(x * 1000)))
      .map(x => (x.toString, TableRow("Id" -> x.toString, "TestContent" -> x.toString)))
      .withSessionWindows(
        gapDuration = Duration.standardSeconds(5),
        options = WindowOptions(
          allowedLateness = Duration.ZERO,
          trigger = AfterWatermark
            .pastEndOfWindow()
            .withEarlyFirings(
              AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(10))
            ),
          accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
        )
      )
      .groupBy(_._1)
      .withName("Deduplicate to Option of TableRow")
      .map { input =>
        val inputListDistinct = input._2.toList.distinct
        if (inputListDistinct.isEmpty) None
        else Some(inputListDistinct.map(_._2))
      }
      .withName("From Options to Values - After Mutation")
      .flatMap(element => element)
      .flatMap(element => element)
      .map(Extra.parseToTyped)
      .saveAsTypedBigQuery(
        "arquiveidev:arquivei_streaming_parsed_events.test",
        writeDisposition = WriteDisposition.WRITE_APPEND,
        createDisposition = CreateDisposition.CREATE_NEVER
      )
    sc.close()
  }
}

object Extra {
  @BigQueryType.toTable
  case class TestRow(
    Id: Option[String],
    TestContent: Option[String]
  )

  def parseToTyped(input: TableRow): TestRow = {
    val id = input.getOrDefault("Id", null)
    val testContent = input.getOrDefault("TestContent", null)
    TestRow(
      Id = if (id == null) None else Some(id.toString),
      TestContent = if (testContent == null) None else Some(testContent.toString)
    )
  }
}
Test code:
package scio_test

import com.spotify.scio.bigquery.BigQueryIO
import com.spotify.scio.io.CustomIO
import com.spotify.scio.testing._
import scio_test.Extra.TestRow

class TesteTest extends PipelineSpec {
  "Pipe" should "work" in {
    val out = List(
      TestRow(Some("1"), Some("1")),
      TestRow(Some("2"), Some("2"))
    )
    JobTest[scio_test.Test.type]
      .input(CustomIO("In"), List(new java.lang.Long(1), new java.lang.Long(2)))
      .output(BigQueryIO[TestRow]("arquiveidev:arquivei_streaming_parsed_events.test")) { result =>
        result should containInAnyOrder(out)
      }
      .run()
  }
}
Also, if you change the session window to a fixed window, the test passes.
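That is consistent with the error message: fixed windows are non-merging in Beam, so the sink's GroupByKey never needs the already-consumed merging WindowFn. For reference, the passing variant only swaps the windowing step in the repro above (a sketch; durations and WindowOptions kept as in the repro, everything else unchanged):

```
// Replacing .withSessionWindows(...) in the repro with a fixed window
// makes the test pass; only this step changes.
.withFixedWindows(
  duration = Duration.standardSeconds(5),
  options = WindowOptions(
    allowedLateness = Duration.ZERO,
    trigger = AfterWatermark
      .pastEndOfWindow()
      .withEarlyFirings(
        AfterProcessingTime
          .pastFirstElementInPane()
          .plusDelayOf(Duration.standardSeconds(10))
      ),
    accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES
  )
)
```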
Thanks. I can verify that this reproduces the issue. Will look into it.
Found this https://stackoverflow.com/questions/46983318/writing-via-textio-write-with-sessions-windowing-raises-groupbykey-consumption-e
I verified that .saveAsTextFile() causes the same error. Adding .withGlobalWindow() before saveAs* fixes the issue. ~Are you sure this only affects 0.7.0? The bug is a year old.~ Also verified that 0.6.1 works for this case. So does reverting Beam back to 2.6.0. So it's most likely a Beam change. We should report this to Apache if we can reproduce it with a Java snippet (should be trivial).
Also, can you re-window before writing as a workaround? IMO writing session-windowed data (which is presumed to be keyed) doesn't make much sense.
Thanks Neville! The workaround using `withGlobalWindow` worked. I'm using the session window to deduplicate the events, so I'll switch to a global window after deduplication, since a session window doesn't make sense right before the write anyway.
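For anyone landing here later, the workaround looks roughly like this (a sketch, not the exact pipeline: `deduplicated` is an illustrative name for the session-windowed, deduplicated SCollection, and the table spec is a placeholder):

```
import com.spotify.scio.bigquery._
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.{CreateDisposition, WriteDisposition}

// Sketch of the workaround: deduplicate inside session windows upstream,
// then drop back to the global window before the BigQuery sink, so the
// sink's internal GroupByKey doesn't hit the already-merged session WindowFn.
def writeDeduplicated(deduplicated: SCollection[TableRow]): Unit =
  deduplicated
    .withGlobalWindow() // re-window before any saveAs* sink
    .map(Extra.parseToTyped)
    .saveAsTypedBigQuery(
      "project:dataset.table", // placeholder table spec
      writeDisposition = WriteDisposition.WRITE_APPEND,
      createDisposition = CreateDisposition.CREATE_NEVER
    )
```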
Link to BEAM issue: https://issues.apache.org/jira/browse/BEAM-3122
@nevillelyh Can we close this? The issue is upstream and has had no activity since Oct 2018.