Add support for Payload Kafka Key Types

Open Sonic-Rage opened this issue 2 years ago • 3 comments

We use an object as the key when sending events with Kafka. It would be great if the scanner could recognize objects used as Kafka keys and add their schemas to the message binding automatically.

Here is a sample of a listener that we use with Spring Kafka:

    @KafkaHandler
    public void consumeMessage(
            @Headers MessageHeaders headers,
            @Header(KafkaHeaders.RECEIVED_KEY) KeyObject key,
            @Payload(required = false) PayloadObject payload) {
        // handle the event
    }

Sonic-Rage avatar Jan 25 '24 16:01 Sonic-Rage

Welcome to Springwolf. Thanks a lot for reporting your first issue. Please check out our contributors guide and feel free to join us on discord.

github-actions[bot] avatar Jan 25 '24 16:01 github-actions[bot]

Hi @Sonic-Rage, I was looking into this ticket. Would it be fair to assume that, given something like

    @KafkaHandler
    public void receiveExamplePayload(
            @Headers MessageHeaders headers,
            @Header(KafkaHeaders.RECEIVED_KEY) String key,
            @Payload(required = false) ExamplePayloadDto payload) {
        log.info("Received new message in {}: {}", TOPIC, payload.toString());
    }

it should generate

        "message": {
          "schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0",
          "name": "io.github.stavshamir.springwolf.example.kafka.dtos.AnotherPayloadDto",
          "title": "AnotherPayloadDto",
          "payload": {
            "$ref": "#/components/schemas/io.github.stavshamir.springwolf.example.kafka.dtos.AnotherPayloadDto"
          },
          "headers": {
            "type": "object",
            "properties": {
              "id": {
                "type": "string"
              },
              "kafka_receivedMessageKey": {
                "type": "string"
              },
              "timestamp": {
                "type": "string"
              }
            }
          },
          "bindings": {
            "kafka": {
              "bindingVersion": "0.4.0"
            }
          }
        }

We are working hard on the next release, but we can look into improving this case. Can you confirm that this is what you would expect?

ctasada avatar Jan 28 '24 12:01 ctasada

@ctasada Sorry, it took me a little while to go through the AsyncAPI spec. The Kafka key doesn't appear to be well documented from what I can tell, so hopefully I'm not making too many assumptions. My use case involves all three (Kafka headers, key, and payload): headers for traceability, key objects for filtering and partitioning Kafka events, and payload objects.

The bindings README at https://github.com/asyncapi/bindings/blob/master/kafka/README.md#message-binding-object shows the key being configured as part of the message binding.

To keep the key separate from the actual Kafka headers, I would expect the key to go into the message bindings. Instead of requiring a manually defined KafkaAsyncOperationBinding with a key definition, the scanner could add the key to the message bindings whenever it finds a @Header(KafkaHeaders.RECEIVED_KEY) Object key parameter.
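
To illustrate the detection step described above, here is a minimal sketch of how a scanner might find the key type by reflection. It is not Springwolf's actual implementation. The `Header` annotation is a hypothetical stand-in for Spring's `@Header` so the example compiles without Spring on the classpath, `RECEIVED_KEY` is assumed to hold the header name shown in the generated document earlier in this thread, and `KeyObject`, `PayloadObject`, `ExampleListener`, and `findKeyType` are illustrative names only.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.Optional;

public class KafkaKeyTypeSketch {

    // Hypothetical stand-in for Spring's @Header, so the sketch runs without Spring.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    public @interface Header {
        String value();
    }

    // Assumption: the header name behind KafkaHeaders.RECEIVED_KEY,
    // as it appears in the generated document earlier in this thread.
    public static final String RECEIVED_KEY = "kafka_receivedMessageKey";

    // Illustrative key and payload types mirroring the listener in this issue.
    public static class KeyObject {}
    public static class PayloadObject {}

    public static class ExampleListener {
        public void consumeMessage(@Header(RECEIVED_KEY) KeyObject key, PayloadObject payload) {
            // handle the event
        }
    }

    // Returns the type of the first parameter marked as the received Kafka key, if any.
    public static Optional<Class<?>> findKeyType(Method method) {
        for (Parameter parameter : method.getParameters()) {
            Header header = parameter.getAnnotation(Header.class);
            if (header != null && RECEIVED_KEY.equals(header.value())) {
                return Optional.of(parameter.getType());
            }
        }
        return Optional.empty();
    }

    // Scans every method declared on the listener class for a key parameter.
    public static Optional<Class<?>> findKeyType(Class<?> listenerClass) {
        for (Method method : listenerClass.getDeclaredMethods()) {
            Optional<Class<?>> keyType = findKeyType(method);
            if (keyType.isPresent()) {
                return keyType;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String result = findKeyType(ExampleListener.class)
                .map(Class::getSimpleName)
                .orElse("no key parameter found");
        System.out.println(result);
    }
}
```

A scanner could then pass the discovered type to its schema generation and reference the resulting schema from the `key` field of the Kafka message binding, which is the shape the sample document below aims for.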

Here is a sample I tossed together using studio.asyncapi.com:

{
    "asyncapi": "3.0.0",
    "info": {
        "title": "SpringWolf Kafka Object Key Payload",
        "version": "1.0.0",
        "contact": {
            "name": "foo",
            "url": "https://foo.com",
            "email": "[email protected]"
        },
        "tags": [],
        "x-generator": "springwolf"
    },
    "defaultContentType": "application/json",
    "servers": {
        "kafka": {
            "host": "localhost:9092",
            "protocol": "kafka"
        }
    },
    "channels": {
        "topic1": {
            "address": "Sample topic",
            "messages": {
                "topic1.message": {
                    "name": "Topic name",
                    "title": "Topic title",
                    "description": "Sample json for Kafka Key and Payload Objects Spring wolf",
                    "$ref": "#/components/messages/object1",
                    "headers": {
                        "$ref": "#/components/schemas/HeadersNotDocumented"
                    },
                    "bindings": {},
                    "payload": {
                        "schemaFormat": "application/vnd.oai.openapi+json;version=3.0.0"
                    }
                }
            }
        }
    },
    "operations": {
        "topic1": {
            "action": "send",
            "channel": {
                "$ref": "#/channels/topic1"
            },
            "description": "Sample topic with Kafka Key and payload as objects",
            "bindings": {},
            "messages": [
                {
                    "$ref": "#/channels/topic1/messages/topic1.message"
                }
            ]
        }
    },
    "components": {
        "messages": {
            "object1": {
                "name": "Topic 1 message",
                "title": "Topic 1 message",
                "headers": {
                    "$ref": "#/components/schemas/MessageHeaders"
                },
                "payload": {
                    "$ref": "#/components/schemas/MessagePayload"
                },
                "bindings": {
                    "kafka": {
                        "key": {
                            "$ref": "#/components/schemas/MessageKey"
                        }
                    }
                }
            }
        },
        "schemas": {
            "MessageHeaders": {
                "type": "object",
                "properties": {
                    "header1": {
                        "type": "string"
                    },
                    "header2": {
                        "type": "string"
                    }
                }
            },
            "MessagePayload": {
                "type": "object",
                "properties": {
                    "intField": {
                        "type": "integer",
                        "description": "some integer"
                    },
                    "stringField": {
                        "type": "string",
                        "description": "some string field"
                    }
                },
                "description": "Payload object",
                "example": {
                    "intField": 500,
                    "stringField": "foo"
                }
            },
            "MessageKey": {
                "type": "object",
                "properties": {
                    "keyField1": {
                        "type": "string",
                        "description": "Unique id that ensures equal distribution across partitions",
                        "example": "ea251adf-a778-4491-becb-f3ca9fd886a1"
                    },
                    "keyField2": {
                        "type": "string",
                        "description": "key field 2"
                    }
                },
                "description": "The key for the event",
                "example": {
                    "keyField1": "ea251adf-a778-4491-becb-f3ca9fd886a1",
                    "keyField2": "foo"
                }
            }
        }
    }
}

Sonic-Rage avatar Jan 31 '24 18:01 Sonic-Rage

The change is staged for release and will be part of the next release.

If you want to try and verify it in your application today, use the latest 1.X.0-SNAPSHOT build as described in our README.md > Testing SNAPSHOT version

Thank you for the report/contribution!

github-actions[bot] avatar Jun 21 '24 14:06 github-actions[bot]

The change is available in the latest release. 🎉

Thank you for the report/contribution and making Springwolf better!

github-actions[bot] avatar Jun 29 '24 11:06 github-actions[bot]