Version 1.10: WATERMARK cannot be used; a NullPointerException is reported.
Sample log record: {"time":"2020-06-17 23:00:01.211","pushid":"pushback_send-BC110-12392212311","app":"mm"}
Because the event-time column has to be of type bigint, UNIX_TIMESTAMP is used for the conversion. The Flink runtime log reports:
ERROR com.dtstack.flink.sql.watermarker.CustomerWaterMarkerForLong - java.lang.NullPointerException
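For reference, the conversion relied on here can be sketched as follows. This is only an illustration against the MyTable source declared below, and it assumes the standard Flink SQL behaviour that UNIX_TIMESTAMP(string, format) returns epoch seconds as a BIGINT, so multiplying by 1000 yields the epoch milliseconds a long-typed watermark column needs:

select
    -- illustrative only: epoch seconds * 1000 = epoch milliseconds
    UNIX_TIMESTAMP(time, 'yyyy-MM-dd HH:mm:ss') * 1000 as xctime
from MyTable;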
Table creation statements:
CREATE TABLE MyTable (
    time varchar,
    pushid varchar,
    app varchar,
    UNIX_TIMESTAMP(time, 'yyyy-MM-dd HH:mm:ss')*1000 bigint AS xctime,
    WATERMARK FOR xctime AS withOffset(xctime, 1000)
) WITH (
    type = 'kafka11',
    bootstrapServers = 'kafka:9092',
    offsetReset = 'latest',
    topic = 'test_1',
    groupId = 'flink_sql',
    parallelism = '4',
    timezone = 'Asia/Shanghai',
    topicIsPattern = 'false',
    sourcedatatype = 'dt_nest'
);
CREATE TABLE MyResult (
    app VARCHAR,
    cnt BIGINT,
    wStart timestamp
) WITH (
    type = 'elasticsearch6',
    address = 'eshost:9200',
    cluster = 'bigdata-es5.6',
    estype = 'date',
    index = 'MyResult',
    parallelism = '1',
    id = '0'
);
insert into MyResult
select
    d.app,
    count(d.pushid) as cnt,
    TUMBLE_START(d.ROWTIME, INTERVAL '3' SECOND) as wStart
from
    MyTable as d
group by
    d.app,
    TUMBLE(d.ROWTIME, INTERVAL '3' SECOND);
Could you provide a complete WATERMARK syntax example for version 1.10? In actual testing it could not be made to work.
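For reference, a complete source-table sketch in the flinkStreamSQL dialect might look like the following. This is a hedged illustration rather than a verified 1.10 answer: it assumes the computed-column form is expression AS alias (the stray bigint keyword in the DDL above is dropped), keeps the original field names and format string, and assumes withOffset(column, offsetMs) expects the event-time column as epoch milliseconds:

CREATE TABLE MyTable (
    time varchar,
    pushid varchar,
    app varchar,
    -- computed column: expression AS alias (assumed convention, no type keyword in between)
    UNIX_TIMESTAMP(time, 'yyyy-MM-dd HH:mm:ss') * 1000 AS xctime,
    -- watermark on the computed epoch-millisecond column, with 1000 ms of allowed lateness
    WATERMARK FOR xctime AS withOffset(xctime, 1000)
) WITH (
    type = 'kafka11',
    bootstrapServers = 'kafka:9092',
    offsetReset = 'latest',
    topic = 'test_1',
    groupId = 'flink_sql',
    parallelism = '4',
    timezone = 'Asia/Shanghai',
    topicIsPattern = 'false',
    sourcedatatype = 'dt_nest'
);

The INSERT query above would stay unchanged; whether the window column should be referenced as d.ROWTIME or as xctime is worth confirming against the flinkStreamSQL documentation.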
2020-06-19 14:19:21,873 ERROR com.dtstack.flink.sql.watermarker.CustomerWaterMarkerForLong - java.lang.NullPointerException
    at com.dtstack.flink.sql.watermarker.AbstractCustomerWaterMarker.getExtractTimestamp(AbstractCustomerWaterMarker.java:105)
    at com.dtstack.flink.sql.watermarker.CustomerWaterMarkerForLong.extractTimestamp(CustomerWaterMarkerForLong.java:53)
    at com.dtstack.flink.sql.watermarker.CustomerWaterMarkerForLong.extractTimestamp(CustomerWaterMarkerForLong.java:38)
    at org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor.extractTimestamp(BoundedOutOfOrdernessTimestampExtractor.java:81)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:64)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at SinkConversion$3.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at SourceConversion$1.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at com.dtstack.flink.sql.source.kafka.KafkaConsumer011.run(KafkaConsumer011.java:68)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
The issue has been recorded and will be fixed as soon as possible.