
runFlink fails on fresh checkout repo

Open yz-chime opened this issue 3 years ago • 6 comments

Description The failure occurred when I checked out the repo and ran the basic command from the guide. It seems com.tests.TestMessage is somehow not registered in Stencil. I'm on macOS 12.5.1, with Java 8 and Kafka installed. Stack trace:

yunfanzhong | dagger $ ./gradlew dagger-core:runFlink

> Task :dagger-core:runFlink FAILED
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/yunfanzhong/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.7/58f588119ffd1702c77ccab6acb54bfb41bed8bd/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/yunfanzhong/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-simple/1.7.25/8dacf9514f0c707cbbcdd6fd699e8940d42fb54e/slf4j-simple-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/yunfanzhong/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.10/b3eeae7d1765f988a1f45ea81517191315c69c9e/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
09:23:11,421 INFO  io.odpf.dagger.core.config.ConfigurationProviderFactory       - Arguments are:
com.tests.TestMessage
io.odpf.dagger.common.exceptions.DescriptorNotFoundException: descriptor not found
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.createFieldDescriptor(ProtoType.java:59)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getProtoFieldDescriptor(ProtoType.java:50)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getRowType(ProtoType.java:44)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer.<init>(ProtoDeserializer.java:46)
        at io.odpf.dagger.core.deserializer.ProtoDeserializerProvider.getDaggerDeserializer(ProtoDeserializerProvider.java:40)
        at io.odpf.dagger.core.deserializer.DaggerDeserializerFactory.create(DaggerDeserializerFactory.java:28)
        at io.odpf.dagger.core.source.Stream$Builder.build(Stream.java:46)
        at io.odpf.dagger.core.source.StreamsFactory.getStreams(StreamsFactory.java:18)
        at io.odpf.dagger.core.StreamManager.getStreams(StreamManager.java:237)
        at io.odpf.dagger.core.StreamManager.registerSourceWithPreProcessors(StreamManager.java:103)
        at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:37)
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: io.odpf.dagger.common.exceptions.DescriptorNotFoundException: descriptor not found
        at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:43)
Caused by: io.odpf.dagger.common.exceptions.DescriptorNotFoundException: descriptor not found
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.createFieldDescriptor(ProtoType.java:59)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getProtoFieldDescriptor(ProtoType.java:50)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoType.getRowType(ProtoType.java:44)
        at io.odpf.dagger.common.serde.proto.deserialization.ProtoDeserializer.<init>(ProtoDeserializer.java:46)
        at io.odpf.dagger.core.deserializer.ProtoDeserializerProvider.getDaggerDeserializer(ProtoDeserializerProvider.java:40)
        at io.odpf.dagger.core.deserializer.DaggerDeserializerFactory.create(DaggerDeserializerFactory.java:28)
        at io.odpf.dagger.core.source.Stream$Builder.build(Stream.java:46)
        at io.odpf.dagger.core.source.StreamsFactory.getStreams(StreamsFactory.java:18)
        at io.odpf.dagger.core.StreamManager.getStreams(StreamManager.java:237)
        at io.odpf.dagger.core.StreamManager.registerSourceWithPreProcessors(StreamManager.java:103)
        at io.odpf.dagger.core.KafkaProtoSQLProcessor.main(KafkaProtoSQLProcessor.java:37)

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':dagger-core:runFlink'.
> Process 'command '/usr/local/Cellar/openjdk@8/1.8.0+345/libexec/openjdk.jdk/Contents/Home/bin/java'' finished with non-zero exit value 1

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.6.1/userguide/command_line_interface.html#sec:command_line_warnings

However, all tests pass when I run ./gradlew clean test, and I noticed that com.tests.TestMessage is referenced in many tests. I'm not sure why the behavior differs for the dagger-core:runFlink target.

To Reproduce Steps to reproduce the behavior: check out the repo and run ./gradlew dagger-core:runFlink.

Expected behavior I'm not sure what the expected behavior is; presumably the Dagger process keeps running until shutdown?

yz-chime avatar Aug 25 '22 16:08 yz-chime

@yz-chime Thanks for trying it out. For Stencil to work, you need to run a Stencil server; please check https://github.com/odpf/stencil for more info. You have to put the proto descriptor on the Stencil server. Alternatively, if you want to try it out locally, you can include the proto classes in the jar and disable Stencil. I hope this helps. TestMessage will not work because it's part of the test package and is not bundled in the main jar.

lavkesh avatar Aug 26 '22 04:08 lavkesh

Thanks. A few follow-up questions:

  1. How do I disable Stencil?
  2. I assume that if deployed to a Flink 1.9 cluster, we can also disable Stencil when the proto classes are built into the jar?
  3. Is there a guide on the common practice of building a container image and deploying to a Kubernetes cluster?

yz-chime avatar Aug 26 '22 18:08 yz-chime

@yz-chime Please go through the documentation here and here

lavkesh avatar Aug 27 '22 02:08 lavkesh

@yz-chime In addition to the references @lavkesh provided, answering your questions inline:

  1. Use this config:
SCHEMA_REGISTRY_STENCIL_ENABLE : false
  2. Yes
  3. Yes (https://github.com/odpf/dagger/blob/main/docs/docs/guides/deployment.md)
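
Putting those answers together, a minimal local override might look like the sketch below. Only the two keys are taken from this thread (SCHEMA_REGISTRY_STENCIL_ENABLE here, and INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX from a later comment); file location and any surrounding keys are assumptions, so check the linked docs for the authoritative layout.

```properties
# Sketch of a properties-style local override, not a complete config.
# Disable the Stencil schema registry and rely on proto classes
# bundled in the jar instead:
SCHEMA_REGISTRY_STENCIL_ENABLE=false
# Field index of the event timestamp in the input proto
# (6 for the TestNestedRepeatedMessage example in this thread):
INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX=6
```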

ravisuhag avatar Aug 27 '22 02:08 ravisuhag

I copied the generated classes of TestMessage.proto into src/main/java of dagger-common, and the runFlink target could then start Flink and listen to the Kafka topic. I chose io.odpf.dagger.consumer.TestNestedRepeatedMessage as the schema for the test Kafka topic, and set INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX to 6 according to the proto definition.

I generated the protobuf classes in Python and wrote a script that sends a serialized sample event to the topic. The code is straightforward:

from pb.TestMessage_pb2 import TestNestedRepeatedMessage
from kafka import KafkaProducer

# Populate the nested single_message field.
tnrm = TestNestedRepeatedMessage()
tm = tnrm.single_message
tm.order_number = "123"
tm.order_url = "https://www.example.com/page"
tm.order_details = "something"

tnrm.number_field = 123
tnrm.event_timestamp.GetCurrentTime()  # set the Timestamp field to now
# print(tnrm.ListFields())

data = tnrm.SerializeToString()

# Send the serialized event to the local Kafka broker.
producer = KafkaProducer(bootstrap_servers="localhost:9092")

producer.send("test-topic", data).get()
print(f"sent {len(data)} bytes to test-topic")
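
For reference, GetCurrentTime() on the well-known Timestamp type just stores the wall clock as a seconds/nanos pair, which is what INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX ultimately points at. A stdlib-only sketch of that split (the helper name is mine; no protobuf dependency):

```python
import time


def timestamp_parts(now=None):
    """Split a float epoch time into the (seconds, nanos) pair
    that google.protobuf.Timestamp stores internally."""
    if now is None:
        now = time.time()
    seconds = int(now)
    nanos = int(round((now - seconds) * 1e9))
    return seconds, nanos


# e.g. 1661970693.5 -> (1661970693, 500000000)
print(timestamp_parts(1661970693.5))
```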

However, the Dagger job instantly crashes when it receives the event. The relevant log lines are below:

11:31:33,168 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test-topic', partition=0}]
11:31:33,170 INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Custom Source -> Timestamps/Watermarks -> Filter -> SourceConversion(table=[default_catalog.default_database.data_stream], fields=[single_message, repeated_message, number_field, repeated_number_field, metadata, event_timestamp, repeated_long_field, __internal_validation_field__, rowtime]) -> SinkConversionToTuple2 -> Filter -> Map (1/1)#0 (02e81ee53e06b8a604da2703d593457d) switched from INITIALIZING to RUNNING.
11:31:33,171 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Timestamps/Watermarks -> Filter -> SourceConversion(table=[default_catalog.default_database.data_stream], fields=[single_message, repeated_message, number_field, repeated_number_field, metadata, event_timestamp, repeated_long_field, __internal_validation_field__, rowtime]) -> SinkConversionToTuple2 -> Filter -> Map (1/1) (02e81ee53e06b8a604da2703d593457d) switched from INITIALIZING to RUNNING.
11:31:33,174 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='test-topic', partition=0}=-915623761773}.
11:31:33,186 INFO  org.apache.kafka.clients.consumer.ConsumerConfig              - ConsumerConfig values: ... (omitted)
11:31:33,192 WARN  org.apache.kafka.clients.consumer.ConsumerConfig              - The configuration 'auto.commit.enable' was supplied but isn't a known config.
11:31:33,192 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version: 2.4.1
11:31:33,193 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId: c57222ae8cd7866b
11:31:33,193 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka startTimeMs: 1661970693192
11:31:33,196 INFO  org.apache.kafka.clients.consumer.KafkaConsumer               - [Consumer clientId=consumer-dummy-consumer-group-2, groupId=dummy-consumer-group] Subscribed to partition(s): test-topic-0
11:31:33,213 INFO  org.apache.kafka.clients.Metadata                             - [Consumer clientId=consumer-dummy-consumer-group-2, groupId=dummy-consumer-group] Cluster ID: 6Mx7Agf4SNmUXno6XX-7_w
11:31:33,215 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-dummy-consumer-group-2, groupId=dummy-consumer-group] Discovered group coordinator 10.0.0.3:9092 (id: 2147483647 rack: null)
11:31:33,224 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - [Consumer clientId=consumer-dummy-consumer-group-2, groupId=dummy-consumer-group] Setting offset for partition test-topic-0 to the committed offset FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=10.0.0.3:9092 (id: 0 rack: null), epoch=0}}
#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x000000010eedbd1d, pid=8572, tid=0x0000000000014f03
#
# JRE version: OpenJDK Runtime Environment (8.0_345) (build 1.8.0_345-bre_2022_08_04_23_35-b00)
# Java VM: OpenJDK 64-Bit Server VM (25.345-b00 mixed mode bsd-amd64 compressed oops)
# Problematic frame:
# V  [libjvm.dylib+0x539d1d]
#
# Failed to write core dump. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /Users/yunfanzhong/code/dagger/dagger-core/hs_err_pid8572.log
Compiled method (nm)   16202 1513     n 0       sun.misc.Unsafe::getInt (native)
 total in heap  [0x0000000114e1a010,0x0000000114e1a360] = 848
 relocation     [0x0000000114e1a138,0x0000000114e1a178] = 64
 main code      [0x0000000114e1a180,0x0000000114e1a360] = 480
#
# If you would like to submit a bug report, please visit:
#   https://github.com/Homebrew/homebrew-core/issues
#

> Task :dagger-core:runFlink FAILED

I'm not sure what caused this. Is it because the TestNestedRepeatedMessage object is too complex, and I should use a plain object with simple fields instead?

yz-chime avatar Aug 31 '22 18:08 yz-chime

I tried a simpler message definition and the job still crashes.

syntax = "proto3";

import "google/protobuf/timestamp.proto";

message TestSimpleEvent {
    string user_id = 1;
    string event_name = 2;
    google.protobuf.Timestamp event_timestamp = 3;
}

yz-chime avatar Aug 31 '22 19:08 yz-chime