[Bug]: Side input singleton view throws an error when the impulse period is short: PCollection with more than one element accessed as a singleton view
What happened?
I am writing a pipeline to consume message from pubsub, do some validation, transform and sink to bigquery.
I need to load some data from an external API call to be used in the pipeline's validation stage, for which I followed the "Slowly updating global window side inputs" pattern to load the config into a side input. However, I keep getting the following error, even though I already applied Latest.globally():
Caused by: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view. Consider using Combine.globally().asSingleton() to combine the PCollection into a single value
Going through online resources didn't really help. However, I found that this happens only when the impulse duration is short, e.g. < 5s: GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L))
Is this expected?
This bothers me, because I am not sure whether the short period is the root cause, or whether the error will show up again once the pipeline traffic becomes large, even if I use an impulse period of one minute in the production environment.
Does anyone have the same issue, or any suggestions?
Beam version: 2.46.0
Here is a simplified version of my code that can reproduce the error:
import static com.applovin.array.silk.pipeline.common.Constants.CONFIG_DEV;
import static com.applovin.array.silk.pipeline.common.Constants.CONFIG_PROD;
import static org.apache.beam.sdk.options.SdkHarnessOptions.LogLevel;
import com.applovin.array.silk.pipeline.EventTransformer.EventTransform;
import com.applovin.array.silk.pipeline.EventTransformer.ExtractMessage;
import com.applovin.array.silk.pipeline.coders.FailureCoder;
import com.applovin.array.silk.pipeline.coders.JsonObjectCoder;
import com.applovin.array.silk.pipeline.coders.MessageContainerCoder;
import com.applovin.array.silk.pipeline.common.UncaughtExceptionLogger;
import com.applovin.array.silk.pipeline.models.Failure;
import com.applovin.array.silk.pipeline.models.MessageContainer;
import com.applovin.array.silk.pipeline.models.StartOptions;
import com.applovin.array.silk.pipeline.models.SystemConfigs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Streaming pipeline that reads Pub/Sub messages and validates them against schemas that are
 * periodically fetched over HTTP and exposed to the validation step as a side input.
 *
 * <p>The side input is built as {@code Latest.globally()} followed by {@code View.asSingleton()}
 * on a repeatedly triggered global window. This chain is intentionally left as-is because it is
 * the construct that raises "PCollection with more than one element accessed as a singleton view".
 */
public class TestSideInputPipeline {

  private static final Logger logger = LoggerFactory.getLogger(TestSideInputPipeline.class);
  private static final FailureCoder<MessageContainer> FAILURE_CODER = FailureCoder.of(
      MessageContainerCoder.of());
  private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
  // NOTE(review): populated only by loadOptions(); confirm it is also initialised in every JVM
  // that executes the DoFns below (e.g. remote workers) before relying on it there.
  public static SystemConfigs systemConfigs;
  private static final CloseableHttpClient httpClient = HttpClients.createDefault();

  /**
   * Entry point: registers the uncaught-exception logger, loads the options and runs the pipeline.
   *
   * @param args command-line arguments forwarded to {@link #loadOptions(String[])}
   * @throws IOException if the configuration resource cannot be found or parsed
   */
  public static void main(String[] args) throws IOException {
    UncaughtExceptionLogger.register();
    StartOptions options = loadOptions(args);
    run(options);
  }

  /**
   * Parses the pipeline options and loads the environment-specific YAML configuration into
   * {@link #systemConfigs}.
   *
   * @param args command-line arguments
   * @return the validated options, with the SDK harness log level taken from the configuration
   * @throws IOException if the configuration resource is missing from the classpath or unreadable
   */
  public static StartOptions loadOptions(String[] args) throws IOException {
    StartOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(
        StartOptions.class);
    String configResource =
        "prod".equalsIgnoreCase(options.getEnv()) ? CONFIG_PROD : CONFIG_DEV;
    ClassLoader classLoader = TestSideInputPipeline.class.getClassLoader();
    // getResourceAsStream returns null for a missing resource; fail with a clear message and make
    // sure the stream is always closed.
    try (InputStream configStream = classLoader.getResourceAsStream(configResource)) {
      if (configStream == null) {
        throw new IOException("Config resource not found on classpath: " + configResource);
      }
      systemConfigs = mapper.readValue(configStream, SystemConfigs.class);
    }
    // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
    options.setDefaultSdkHarnessLogLevel(LogLevel.valueOf(
        systemConfigs.getLogLevel().toUpperCase(java.util.Locale.ROOT)
    ));
    return options;
  }

  /**
   * Builds and runs the pipeline: Pub/Sub source, periodically refreshed schema side input, and a
   * validation step that reads the side input.
   *
   * @param options pipeline options used to create the {@link Pipeline}
   */
  public static void run(StartOptions options) {
    Pipeline pipeline = Pipeline.create(options);
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForClass(JsonObject.class, JsonObjectCoder.of());
    coderRegistry.registerCoderForType(FAILURE_CODER.getEncodedTypeDescriptor(), FAILURE_CODER);
    coderRegistry.registerCoderForClass(MessageContainer.class, MessageContainerCoder.of());

    // Pull Pubsub message with PubsubIO, which automatically acknowledge message after complete processing.
    // https://cloud.google.com/dataflow/docs/concepts/streaming-with-cloud-pubsub
    PCollection<PubsubMessage> messages = pipeline.apply("ReadPubSubMessages",
        PubsubIO.readMessagesWithAttributes()
            .fromSubscription(systemConfigs.getIngestSubscription()));

    // // extract message from pubsub structure
    // ExtractMessage extractMessage = new ExtractMessage();
    // PCollectionTuple extractMessageResult = messages.apply("ExtractMessage",
    // ParDo.of(extractMessage).withOutputTags(extractMessage.getOutputTag(),
    // TupleTagList.of(extractMessage.getFailuresTag())));
    //
    // // do some transform
    // EventTransform eventTransform = new EventTransform();
    // PCollectionTuple transformResult = extractMessageResult.get(extractMessage.getOutputTag())
    // .apply("EventTransform", ParDo.of(eventTransform)
    // .withOutputTags(eventTransform.getOutputTag(),
    // TupleTagList.of(eventTransform.getFailuresTag())));
    //
    // PCollection<MessageContainer> transformedEvents = transformResult.get(
    // eventTransform.getOutputTag());

    // get side input view (json schemas), and validate
    PCollectionView<Map<String, String>> map = pipeline.apply("Impulse",
            GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
        .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
          @ProcessElement
          public void process(OutputReceiver<Map<String, String>> o) {
            // try-with-resources releases the HTTP connection after every fetch; without it each
            // impulse leaks a response.
            try (CloseableHttpResponse response = httpClient.execute(
                new HttpGet(systemConfigs.getSchemaUrl()))) {
              String body = EntityUtils.toString(response.getEntity());
              // Jackson returns a raw Map for Map.class; the payload is expected to be a flat
              // string-to-string mapping.
              @SuppressWarnings("unchecked")
              Map<String, String> schema = mapper.readValue(body, Map.class);
              logger.info("Loader got schema: {}", schema);
              o.output(schema);
            } catch (Exception e) {
              // Best effort: log with the full cause and fall back to an empty schema map.
              logger.error("Failed, {}", e.getMessage(), e);
              o.output(Map.of());
            }
          }
        }))
        .apply(Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        // NOTE: Latest.globally() and View.asSingleton() are two separately triggered global
        // combines. If Latest fires more than once before the view is computed, the singleton
        // view can observe more than one element and fail. A known workaround is to use
        // View.asIterable() and take the last element when consuming the side input.
        .apply(Latest.globally())
        .apply(View.asSingleton());

    PCollection<PubsubMessage> validateEventResult = messages.apply("ValidateEvent",
        ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
          @ProcessElement
          public void processElement(ProcessContext ctx) {
            PubsubMessage message = ctx.element();
            Map<String, String> schemas = ctx.sideInput(map);
            logger.info("Side input schemas: {}", schemas);
            // validate message and save result.
            ctx.output(message);
          }
        }).withSideInputs(map));
    //more logic
    pipeline.run();
  }
}
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- [ ] Component: Python SDK
- [X] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner
Label examples-java cannot be managed because it does not exist in the repo. Please check your spelling.
Label cannot be managed because it does not exist in the repo. Please check your spelling.
Label cannot be managed because it does not exist in the repo. Please check your spelling.
The issue here is that Latest.globally() is a separate triggered global Combine from the global Combine contained in View.asSingleton. Thus, if Latest triggers multiple times, the subsequent singleton combine may observe more than one element when firing and fail.
See https://www.mail-archive.com/[email protected]/msg02129.html
A workaround is to use View.asIterable and take the last element of the iterable when consuming the side input. There will be more than 1 element only if Latest triggers multiple times before the side input combine processes the output.
@kennknowles What do you think about adding a new transform View.asLatest() that is logically the same as Latest.globally() + View.asSingleton() but is implemented with a single combine and thus the side input view will always be a single latest value each time it is calculated?
Just skimming this last comment, it sounds like this is related to the impetus to https://s.apache.org/beam-triggered-side-inputs
Right now we have a combination of
- "definition is whatever happens when you run it" situation with side inputs, always precarious
- side inputs pretty much unaware of triggers (as most of the model is) so they don't intelligently respect panes (they see two firings are just two separate elements rather than a revision to the element)
- no clear way to really manage deltas versus full replacement values on triggering; presumably side inputs would do this intelligently transparently
I am totally happy with a View.latest(). In my doc I propose that as the semantics for View.asSingleton :-)
If you can implement it without runner changes, or have the bandwidth to make the necessary runner changes, I would just replace View.asSingleton with that. Otherwise, having it as a stopgap until View.asSingleton can be adjusted is great.
https://s.apache.org/beam-side-inputs-1-pager
@ballooncross Looking at your code I can suggest an alternative approach by using a custom global combiner that will eliminate subsequent Latest.globally() and View.asSingleton() and will do everything in one step.
// Build the side input with a single global combine so the view always holds exactly one value.
PCollectionView<Map<String, String>> map = pipeline.apply("Impulse",
        GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1)))
    // GenerateSequence emits Long elements, so the window transform must be typed on Long
    // (the external data is only read later, inside the combiner's extractOutput).
    .apply(Window.<Long>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .discardingFiredPanes())
    .apply(Combine.globally(new MaxImpulseFn<>(functionToRead, coderOfFunctionOutput))
        .withoutDefaults()
        .asSingletonView());
MaxImpulseFn is a custom combiner (even though quite simple) that finds the max impulse and reads the external data in extractOutput
/**
 * Context-aware combiner over {@code Long} impulses whose output is produced by
 * {@code functionToRead}.
 *
 * <p>NOTE(review): only the two overrides below are shown in this snippet; the accumulator
 * methods and the {@code functionToRead} / {@code coderOfFunctionOutput} members they rely on are
 * not shown here and must be supplied by the full class.
 *
 * @param <T> type of the value produced for the side input
 */
private static class MaxImpulseFn<T>
    extends CombineWithContext.CombineFnWithContext<Long, Long, T> {

  /**
   * Turns the accumulated impulse into the output value by invoking {@code functionToRead} with
   * the accumulator and the pipeline options taken from the combine context.
   */
  @Override
  public T extractOutput(Long accumulator, Context context) {
    final T loadedValue = functionToRead.apply(accumulator, context.getPipelineOptions());
    return loadedValue;
  }

  /** Always reports {@code coderOfFunctionOutput} as the output coder, ignoring its arguments. */
  @Override
  public Coder<T> getDefaultOutputCoder(CoderRegistry registry, Coder<Long> inputCoder) {
    final Coder<T> outputCoder = coderOfFunctionOutput;
    return outputCoder;
  }
}
Hi, is there a plan to continue working on this? Similarly to this issue, we are trying to use the side inputs pattern described in the https://beam.apache.org/documentation/patterns/side-inputs/ documentation on Google Dataflow, and we get the same "java.lang.IllegalArgumentException: ...." after the Dataflow job has been running for some time, in the transformation that uses the side input. Confusingly, it occurs only in those of our Dataflow jobs in which we use several workers in parallel.
Can someone clarify what the plans to make this usable are?
Kind regards, Ivan Fröhlich SAP SE
@IvanFroehlich are the workarounds working for you?
@liferoad No, that's the issue. In the meantime we have tried the following:
v2: replace the latest globally with custom combine function
...
// v2 attempt: a custom "last element" combine followed by a separate singleton view.
pipeline
    .apply(GenerateSequence.from(0)
        .withRate(1, Duration.standardSeconds(envConfig.getConfigTtlInSeconds())))
    .apply(ParDo.of(doFunction)) // here we read SideInputDestinations (external data)
    .apply(
        Window.<SideInputDestinations>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
    // The closing parenthesis of this apply(...) call was missing in the original snippet.
    .apply(Combine.globally(new CombineSerializableFunction()))
    .apply(View.asSingleton());
...
/**
 * Combine function that keeps only the last element of the iterable it is given.
 *
 * <p>Every visited element is logged by its hash code. An empty iterable yields {@code null}.
 */
static class CombineSerializableFunction implements
    SerializableFunction<Iterable<SideInputDestinations>, SideInputDestinations> {

  /**
   * @param input the elements to combine
   * @return the element encountered last while iterating {@code input}, or {@code null} if
   *     {@code input} has no elements
   */
  @Override
  public @UnknownKeyFor @Nullable @Initialized SideInputDestinations apply(
      Iterable<SideInputDestinations> input) {
    SideInputDestinations latest = null;
    for (SideInputDestinations candidate : input) {
      latest = candidate;
      log.info("SideInput: processing iteration for object {}", latest.hashCode());
    }
    return latest;
  }
}
=> same Exception
v3: replace the View.asSingleton() with asSingletonView():
...
// v3 attempt: same source and windowing as v2, but the combine and the singleton view are
// requested in one step through Combine.globally(...).asSingletonView().
pipeline
// Emit one element per configured TTL period (in seconds).
.apply(GenerateSequence.from(0)
.withRate(1, Duration.standardSeconds(envConfig.getConfigTtlInSeconds())))
.apply(ParDo.of(doFunction)) //here we read SideInputDestinations(external Data)
// Global window with a repeating processing-time trigger; fired panes are discarded.
.apply(
Window.<SideInputDestinations>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(Combine.globally(new CombineSerializableFunction()).asSingletonView());
...
/**
 * Combine function that keeps only the last element of the iterable it is given.
 *
 * <p>Every visited element is logged by its hash code. An empty iterable yields {@code null}.
 */
static class CombineSerializableFunction implements
    SerializableFunction<Iterable<SideInputDestinations>, SideInputDestinations> {

  /**
   * @param input the elements to combine
   * @return the element encountered last while iterating {@code input}, or {@code null} if
   *     {@code input} has no elements
   */
  @Override
  public @UnknownKeyFor @Nullable @Initialized SideInputDestinations apply(
      Iterable<SideInputDestinations> input) {
    SideInputDestinations latest = null;
    for (SideInputDestinations candidate : input) {
      latest = candidate;
      log.info("SideInput: processing iteration for object {}", latest.hashCode());
    }
    return latest;
  }
}
=> same Exception in using ParDo:
Caused by: java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.
Should we try another version?
Kind Regards, Ivan
I believe the workaround is "use View.asIterable and take the last element of the iterable when consuming the side input."
Hi @IvanFroehlich. Have you tried the solution I posted above?