
Support for reading Kafka topics from any startReadTime in Java

damccorm opened this issue 3 years ago · 4 comments

https://github.com/apache/beam/blob/fd8546355523f67eaddc22249606fdb982fe4938/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java#L180-L198

 

Right now the 'startReadTime' config for KafkaIO.Read looks up, in every topic partition, an offset whose timestamp is newer than or equal to that timestamp. The problem is that the timestamp may be so new that no newer/equal message exists in a partition, in which case the code fails with an exception. In certain cases that makes no sense, as we could actually make it work.

If we don't get an offset from calling consumer.offsetsForTimes, we should call endOffsets and use the returned offset. That is actually the offset we will have to read next time.

Even if endOffsets can't return an offset, we could use 0 as the offset to read from.
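A minimal sketch of that fallback order (offsetsForTimes, then endOffsets, then 0), using only the standard Kafka consumer API; the helper name resolveStartOffset is illustrative, not part of KafkaIO:

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class StartOffsetFallback {

    // Hypothetical helper showing the proposed lookup order:
    // offsetsForTimes -> endOffsets -> 0.
    static long resolveStartOffset(Consumer<?, ?> consumer, TopicPartition tp, long timestampMillis) {
        OffsetAndTimestamp byTime =
                consumer.offsetsForTimes(Collections.singletonMap(tp, timestampMillis)).get(tp);
        if (byTime != null) {
            return byTime.offset(); // a message at or after the timestamp exists
        }
        // No newer/equal message: endOffsets returns the offset that the next
        // message written to the partition will get, i.e. the offset we will read next.
        Long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
        return endOffset != null ? endOffset : 0L; // last resort: start from the beginning
    }
}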

 

Am I missing something here? Is it okay to contribute this?

Imported from Jira BEAM-14518. Original Jira may contain additional context. Reported by: bnemeth.

— damccorm, Jun 05 '22

I'm not sure that simply using endOffsets would work in all cases. It may make sense to instead have the caller of this method try to handle getting a correct offset.

— johnjcasey, Jul 11 '22

So basically: support a provider that calculates the offset to use when none is found, with the default provider being the current implementation that throws an exception?

— nbali, Jul 20 '22

Essentially. This method is public, so we shouldn't change the general pattern here. I would add a new method that calls this but falls back to the end offset if it gets the exception, and then use that utility in the relevant places in the Kafka readers.
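A sketch of what such a utility could look like (the wrapper name is hypothetical, and the offsetForTime signature and the RuntimeException it throws are assumptions based on the ConsumerSpEL source linked above):

import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Instant;

class OffsetForTimeUtil {

    // Keeps the public offsetForTime contract untouched; only this wrapper
    // falls back to the end offset when no matching offset exists.
    static long offsetForTimeOrEnd(ConsumerSpEL consumerSpEL, Consumer<?, ?> consumer,
                                   TopicPartition topicPartition, Instant time) {
        try {
            return consumerSpEL.offsetForTime(consumer, topicPartition, time);
        } catch (RuntimeException e) {
            // No message at or after `time`: read from the end of the partition instead.
            return consumer.endOffsets(Collections.singleton(topicPartition)).get(topicPartition);
        }
    }
}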

— johnjcasey, Aug 04 '22

I'm not sure I will have time in the near future to implement this, but given how slowly the discussion went, I created a working solution for reading the whole Kafka stream in a batch pipeline. So for whoever needs a quicker workaround, this is even more customizable:

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

import lombok.extern.slf4j.Slf4j;

/**
 * Using {@link KafkaIO.Read#withStopReadTime(org.joda.time.Instant)} will try to acquire an offset for the given timestamp.<br>
 * If all offsets are older than the provided timestamp, the default implementation fails with an exception.<br>
 * This implementation falls back to the newest available offset instead - essentially reading up to the newest available message.
 */
@Slf4j
public class MyKafkaConsumer<K, V> extends KafkaConsumer<K, V> {

    public MyKafkaConsumer(Map<String, Object> configs) {
        super(configs);
    }

    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        final Map<TopicPartition, OffsetAndTimestamp> result = super.offsetsForTimes(timestampsToSearch, timeout);

        // Partitions where no message with a timestamp >= the searched one exists.
        final List<TopicPartition> topicPartitionsWithoutProperOffset =
                result.keySet().stream()
                        .filter(topicPartition -> result.get(topicPartition) == null)
                        .collect(Collectors.toList());

        // Fall back to the end offset for those partitions.
        endOffsets(topicPartitionsWithoutProperOffset).forEach((topicPartition, endOffset) -> {
            final Long timestampToSearch = timestampsToSearch.get(topicPartition);
            log.warn("Offset for topicPartition: {}, timestamp: {} was not found, replaced by endOffset: {}",
                    topicPartition, timestampToSearch, endOffset);
            result.put(topicPartition, new OffsetAndTimestamp(endOffset, timestampToSearch));
        });

        return result;
    }

}
And the usage:

KafkaIO.readBytes()
	.withStopReadTime(Instant.now())
	.withConsumerFactoryFn(MyKafkaConsumer::new) // required so stopReadTime resolution goes through the overridden offsetsForTimes

— nbali, Aug 09 '22