Unable to use event driven operations.
Component(s)
router
Component version
latest sha-627d542 0.240.3 main
wgc version
0.85.6
controlplane version
cloud
router version
latest sha-627d542 0.240.3 main
What happened?
Description
I already searched for implementing an event-driven graph using Kafka with these links:
- https://cosmo-docs.wundergraph.com/cli/subgraph/create#event-driven-graph
- https://cosmo-docs.wundergraph.com/router/event-driven-federated-subscriptions-edfs/kafka
When I call the mutation below:
curl -s http://localhost:8080/graphql -H 'Content-Type: application/json' -d '{
"query":"mutation($accountId: ID!){ publishChatMessageIncrement(accountId: $accountId){ success } }",
"variables": {
"accountId": "65c13016-05f5-449b-9472-7afca5de2d03",
"content": "x"
}
}'
The output is always like this:
{"errors":[{"message":"Failed to fetch from Subgraph 'b289de7f-225b-485b-b6de-c4fa343701e0'."}],"data":null}
It should have a value of { "success": true }.
Is it a bug from Cosmo, or am I using an event-driven graph in the wrong way?
Could you fix/give a correction about this problem?
Steps to Reproduce
Try to publish a similar edg schema file like this backend-edg-schema.graphqls
directive @edfs__kafkaSubscribe(topics: [String!]!, providerId: String! = "default") on FIELD_DEFINITION
directive @edfs__kafkaPublish(topic: String!, providerId: String! = "default") on FIELD_DEFINITION
type edfs__PublishResult {
success: Boolean!
}
type Account @key(fields: "id", resolvable: false) {
id: ID! @external
}
type Subscription {
postLikeIncremented: Account! @edfs__kafkaSubscribe(topics: ["postLike.increment"], providerId: "default")
postLikeDecremented: Account! @edfs__kafkaSubscribe(topics: ["postLike.decrement"], providerId: "default")
chatMessageIncremented: Account! @edfs__kafkaSubscribe(topics: ["chatMessage.increment"], providerId: "default")
}
type Mutation {
publishPostLikeIncrement(accountId: ID!): edfs__PublishResult! @edfs__kafkaPublish(topic: "postLike.increment", providerId: "default")
publishPostLikeDecrement(accountId: ID!): edfs__PublishResult! @edfs__kafkaPublish(topic: "postLike.decrement", providerId: "default")
publishChatMessageIncrement(accountId: ID!): edfs__PublishResult! @edfs__kafkaPublish(topic: "chatMessage.increment", providerId: "default")
}
For running all services:
- Clone infrastructure from https://github.com/muazhari/social-media-infrastructure-1.
- Clone backend-1 from https://github.com/muazhari/social-media-backend-1.
- Clone backend-2 from https://github.com/muazhari/social-media-backend-2.
- Adjust backend-1 and backend-2 path in .env infrastructure.
- Adjust networking in .env infrastructure.
- Fill the API token in .env infrastructure.
- Run
make run-dev create-subgraph publish-subgraphin infrastructure. - Run the curl command like above.
Expected Result
The subscription and mutation operations work normally.
Actual Result
The subscription and mutation operations are unable to work normally.
Environment information
Environment
OS: Ubuntu 24.04.1 LTS Package Manager: npm
Router configuration
# yaml-language-server: $schema=https://raw.githubusercontent.com/wundergraph/cosmo/main/router/pkg/config/config.schema.json
version: "1"
headers:
all:
request:
- op: "propagate"
named: "Authorization"
authentication:
jwt:
jwks:
- url: "${AUTH_JWKS_URL}"
header_name: Authorization
header_value_prefix: Bearer
events:
providers:
kafka:
- id: default
brokers:
- "${DATASTORE_5_HOST}:${DATASTORE_5_PORT}"
Router execution config
{
"engineConfig": {
"defaultFlushInterval": "500",
"datasourceConfigurations": [
{
"kind": "GRAPHQL",
"rootNodes": [
{
"typeName": "Account",
"fieldNames": [
"id",
"imageUrl",
"name",
"email",
"password",
"totalPostLike",
"totalChatMessage",
"scopes"
]
},
{
"typeName": "ChatMessage",
"fieldNames": [
"id",
"account"
],
"externalFieldNames": [
"accountId"
]
},
{
"typeName": "ChatRoomMember",
"fieldNames": [
"id",
"account"
],
"externalFieldNames": [
"accountId"
]
},
{
"typeName": "ChatRoom",
"fieldNames": [
"id"
]
},
{
"typeName": "Post",
"fieldNames": [
"id",
"account"
],
"externalFieldNames": [
"accountId"
]
},
{
"typeName": "PostLike",
"fieldNames": [
"id",
"account"
],
"externalFieldNames": [
"accountId"
]
},
{
"typeName": "Query",
"fieldNames": [
"accounts",
"account"
]
},
{
"typeName": "Mutation",
"fieldNames": [
"login",
"register",
"createAccount",
"updateAccount",
"updateMyAccount",
"deleteAccount"
]
}
],
"childNodes": [
{
"typeName": "Session",
"fieldNames": [
"account",
"accessToken",
"refreshToken"
]
}
],
"overrideFieldPathFromAlias": true,
"customGraphql": {
"fetch": {
"url": {
"staticVariableContent": "http://172.23.128.1:8081/graphql"
},
"method": "POST",
"body": {},
"baseUrl": {},
"path": {}
},
"subscription": {
"enabled": true,
"url": {
"staticVariableContent": "http://172.23.128.1:8081/graphql"
},
"protocol": "GRAPHQL_SUBSCRIPTION_PROTOCOL_WS",
"websocketSubprotocol": "GRAPHQL_WEBSOCKET_SUBPROTOCOL_AUTO"
},
"federation": {
"enabled": true,
"serviceSdl": "scalar Upload\n\ntype Account @key(fields: \"id\") {\n id: ID!\n imageUrl: String\n name: String!\n email: String!\n password: String! @requiresScopes(scopes: [[\"admin\"]])\n totalPostLike: Float!\n totalChatMessage: Float!\n scopes: [String!]!\n}\n\ntype ChatMessage @key(fields: \"id\") @extends {\n id: ID!\n accountId: ID! @external\n account: Account! @requires(fields: \"accountId\")\n}\n\ntype ChatRoomMember @key(fields: \"id\") @extends {\n id: ID!\n accountId: ID! @external\n account: Account! @requires(fields: \"accountId\")\n}\n\ntype ChatRoom @key(fields: \"id\") @extends {\n id: ID!\n}\n\ntype Post @key(fields: \"id\") @extends {\n id: ID!\n accountId: ID! @external\n account: Account! @requires(fields: \"accountId\")\n}\n\ntype PostLike @key(fields: \"id\") @extends {\n id: ID!\n accountId: ID! @external\n account: Account! @requires(fields: \"accountId\")\n}\n\ntype Session {\n account: Account!\n accessToken: String!\n refreshToken: String!\n}\n\ntype Query {\n accounts: [Account!]!\n account(id: ID!): Account\n}\n\ninput AccountInput {\n image: Upload\n name: String!\n email: String!\n password: String!\n scopes: [String!]\n}\n\ninput LoginInput {\n email: String!\n password: String!\n}\n\ninput RegisterInput {\n name: String!\n email: String!\n password: String!\n}\n\ntype Mutation {\n login(input: LoginInput!): Session!\n register(input: RegisterInput!): Account!\n createAccount(input: AccountInput!): Account! @requiresScopes(scopes: [[\"admin\"]])\n updateAccount(id: ID!, input: AccountInput!): Account! @requiresScopes(scopes: [[\"admin\"]])\n updateMyAccount(input: AccountInput!): Account! @authenticated\n deleteAccount(id: ID!): Account! @requiresScopes(scopes: [[\"admin\"]])\n}"
},
"upstreamSchema": {
"key": "35bab0c1e5ce233aab0f5623d08ab8ee0c9a431f"
}
},
"requestTimeoutSeconds": "10",
"id": "a8359ec1-658f-475e-b80c-b216e1264061",
"keys": [
{
"typeName": "Account",
"selectionSet": "id"
},
{
"typeName": "ChatMessage",
"selectionSet": "id"
},
{
"typeName": "ChatRoomMember",
"selectionSet": "id"
},
{
"typeName": "ChatRoom",
"selectionSet": "id"
},
{
"typeName": "Post",
"selectionSet": "id"
},
{
"typeName": "PostLike",
"selectionSet": "id"
}
],
"requires": [
{
"typeName": "ChatMessage",
"fieldName": "account",
"selectionSet": "accountId"
},
{
"typeName": "ChatRoomMember",
"fieldName": "account",
"selectionSet": "accountId"
},
{
"typeName": "Post",
"fieldName": "account",
"selectionSet": "accountId"
},
{
"typeName": "PostLike",
"fieldName": "account",
"selectionSet": "accountId"
}
]
},
{
"kind": "GRAPHQL",
"rootNodes": [
{
"typeName": "ChatMessage",
"fieldNames": [
"id",
"content",
"accountId",
"room"
]
},
{
"typeName": "ChatRoomMember",
"fieldNames": [
"id",
"accountId",
"room"
]
},
{
"typeName": "ChatRoom",
"fieldNames": [
"id",
"name",
"description",
"members",
"messages"
]
},
{
"typeName": "Post",
"fieldNames": [
"id",
"imageUrl",
"title",
"content",
"accountId",
"likes"
]
},
{
"typeName": "PostLike",
"fieldNames": [
"id",
"post",
"accountId"
]
},
{
"typeName": "Account",
"fieldNames": [
"id",
"messages",
"rooms",
"posts",
"postLikes"
]
},
{
"typeName": "Query",
"fieldNames": [
"chatRooms",
"chatRoomMembers",
"chatMessages",
"posts",
"myPosts",
"myChatRooms"
]
},
{
"typeName": "Mutation",
"fieldNames": [
"addChatRoom",
"addMemberToChatRoom",
"addMyChatMessage",
"addMyPost",
"likePost",
"unlikePost"
]
}
],
"overrideFieldPathFromAlias": true,
"customGraphql": {
"fetch": {
"url": {
"staticVariableContent": "http://172.23.128.1:8082/graphql"
},
"method": "POST",
"body": {},
"baseUrl": {},
"path": {}
},
"subscription": {
"enabled": true,
"url": {
"staticVariableContent": "http://172.23.128.1:8082/graphql"
},
"protocol": "GRAPHQL_SUBSCRIPTION_PROTOCOL_WS",
"websocketSubprotocol": "GRAPHQL_WEBSOCKET_SUBPROTOCOL_AUTO"
},
"federation": {
"enabled": true,
"serviceSdl": "scalar Upload\n\ntype ChatMessage @key(fields: \"id\") {\n id: ID!\n content: String!\n accountId: ID!\n room: ChatRoom!\n}\n\ntype ChatRoomMember @key(fields: \"id\") {\n id: ID!\n accountId: ID!\n room: ChatRoom!\n}\n\ntype ChatRoom @key(fields: \"id\") {\n id: ID!\n name: String!\n description: String\n members: [ChatRoomMember]\n messages: [ChatMessage]\n}\n\ntype Post @key(fields: \"id\") {\n id: ID!\n imageUrl: String\n title: String!\n content: String!\n accountId: ID!\n likes: [PostLike]\n}\n\ntype PostLike @key(fields: \"id\") {\n id: ID!\n post: Post!\n accountId: ID!\n}\n\ntype Account @key(fields: \"id\") @extends {\n id: ID!\n messages: [ChatMessage]\n rooms: [ChatRoom]\n posts: [Post]\n postLikes: [PostLike]\n}\n\ninput ChatRoomInput {\n name: String!\n description: String\n}\n\ninput ChatRoomMemberInput {\n accountId: ID\n chatRoomId: ID!\n}\n\ninput ChatMessageInput {\n chatRoomId: ID!\n accountId: ID\n content: String!\n}\n\ninput PostInput {\n accountId: ID\n title: String!\n content: String!\n image: Upload\n}\n\ninput PostLikeInput {\n postId: ID!\n}\n\ntype Query {\n chatRooms: [ChatRoom]\n chatRoomMembers(chatRoomId: ID!): [ChatRoomMember]\n chatMessages(chatRoomId: ID!): [ChatMessage]\n posts: [Post]\n myPosts: [Post]\n myChatRooms: [ChatRoom]\n}\n\ntype Mutation {\n addChatRoom(input: ChatRoomInput!): ChatRoom\n addMemberToChatRoom(input: ChatRoomMemberInput!): ChatRoomMember\n addMyChatMessage(input: ChatMessageInput!): ChatMessage\n addMyPost(input: PostInput!): Post\n likePost(input: PostLikeInput!): PostLike\n unlikePost(input: PostLikeInput!): PostLike\n}\n"
},
"upstreamSchema": {
"key": "c82fc423a14485cdf0395dce32e6d142a4593b0c"
}
},
"requestTimeoutSeconds": "10",
"id": "00dc55bf-85fc-4f34-9864-650bf2cb2cc3",
"keys": [
{
"typeName": "ChatMessage",
"selectionSet": "id"
},
{
"typeName": "ChatRoomMember",
"selectionSet": "id"
},
{
"typeName": "ChatRoom",
"selectionSet": "id"
},
{
"typeName": "Post",
"selectionSet": "id"
},
{
"typeName": "PostLike",
"selectionSet": "id"
},
{
"typeName": "Account",
"selectionSet": "id"
}
]
},
{
"kind": "PUBSUB",
"rootNodes": [
{
"typeName": "Subscription",
"fieldNames": [
"postLikeIncremented",
"postLikeDecremented",
"chatMessageIncremented"
]
},
{
"typeName": "Mutation",
"fieldNames": [
"publishPostLikeIncrement",
"publishPostLikeDecrement",
"publishChatMessageIncrement"
]
}
],
"childNodes": [
{
"typeName": "edfs__PublishResult",
"fieldNames": [
"success"
]
},
{
"typeName": "Account",
"fieldNames": [
"id"
],
"externalFieldNames": [
"id"
]
}
],
"overrideFieldPathFromAlias": true,
"requestTimeoutSeconds": "10",
"id": "b289de7f-225b-485b-b6de-c4fa343701e0",
"keys": [
{
"typeName": "Account",
"selectionSet": "id",
"disableEntityResolver": true
}
],
"customEvents": {
"kafka": [
{
"engineEventConfiguration": {
"providerId": "default",
"type": "SUBSCRIBE",
"typeName": "Subscription",
"fieldName": "postLikeIncremented"
},
"topics": [
"postLike.increment"
]
},
{
"engineEventConfiguration": {
"providerId": "default",
"type": "SUBSCRIBE",
"typeName": "Subscription",
"fieldName": "postLikeDecremented"
},
"topics": [
"postLike.decrement"
]
},
{
"engineEventConfiguration": {
"providerId": "default",
"type": "SUBSCRIBE",
"typeName": "Subscription",
"fieldName": "chatMessageIncremented"
},
"topics": [
"chatMessage.increment"
]
},
{
"engineEventConfiguration": {
"providerId": "default",
"typeName": "Mutation",
"fieldName": "publishPostLikeIncrement"
},
"topics": [
"postLike.increment"
]
},
{
"engineEventConfiguration": {
"providerId": "default",
"typeName": "Mutation",
"fieldName": "publishPostLikeDecrement"
},
"topics": [
"postLike.decrement"
]
},
{
"engineEventConfiguration": {
"providerId": "default",
"typeName": "Mutation",
"fieldName": "publishChatMessageIncrement"
},
"topics": [
"chatMessage.increment"
]
}
]
}
}
],
"fieldConfigurations": [
{
"typeName": "Query",
"fieldName": "account",
"argumentsConfiguration": [
{
"name": "id",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Query",
"fieldName": "chatRoomMembers",
"argumentsConfiguration": [
{
"name": "chatRoomId",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Query",
"fieldName": "chatMessages",
"argumentsConfiguration": [
{
"name": "chatRoomId",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "login",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "register",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "createAccount",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
],
"authorizationConfiguration": {
"requiresAuthentication": true,
"requiredOrScopes": [
{
"requiredAndScopes": [
"admin"
]
}
],
"requiredOrScopesByOr": [
{
"requiredAndScopes": [
"admin"
]
}
]
}
},
{
"typeName": "Mutation",
"fieldName": "updateAccount",
"argumentsConfiguration": [
{
"name": "id",
"sourceType": "FIELD_ARGUMENT"
},
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
],
"authorizationConfiguration": {
"requiresAuthentication": true,
"requiredOrScopes": [
{
"requiredAndScopes": [
"admin"
]
}
],
"requiredOrScopesByOr": [
{
"requiredAndScopes": [
"admin"
]
}
]
}
},
{
"typeName": "Mutation",
"fieldName": "updateMyAccount",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
],
"authorizationConfiguration": {
"requiresAuthentication": true
}
},
{
"typeName": "Mutation",
"fieldName": "deleteAccount",
"argumentsConfiguration": [
{
"name": "id",
"sourceType": "FIELD_ARGUMENT"
}
],
"authorizationConfiguration": {
"requiresAuthentication": true,
"requiredOrScopes": [
{
"requiredAndScopes": [
"admin"
]
}
],
"requiredOrScopesByOr": [
{
"requiredAndScopes": [
"admin"
]
}
]
}
},
{
"typeName": "Mutation",
"fieldName": "addChatRoom",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "addMemberToChatRoom",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "addMyChatMessage",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "addMyPost",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "likePost",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "unlikePost",
"argumentsConfiguration": [
{
"name": "input",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "publishPostLikeIncrement",
"argumentsConfiguration": [
{
"name": "accountId",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "publishPostLikeDecrement",
"argumentsConfiguration": [
{
"name": "accountId",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Mutation",
"fieldName": "publishChatMessageIncrement",
"argumentsConfiguration": [
{
"name": "accountId",
"sourceType": "FIELD_ARGUMENT"
}
]
},
{
"typeName": "Account",
"fieldName": "password",
"authorizationConfiguration": {
"requiresAuthentication": true,
"requiredOrScopes": [
{
"requiredAndScopes": [
"admin"
]
}
],
"requiredOrScopesByOr": [
{
"requiredAndScopes": [
"admin"
]
}
]
}
}
],
"graphqlSchema": "schema {\n query: Query\n mutation: Mutation\n subscription: Subscription\n}\n\ndirective @authenticated on ENUM | FIELD_DEFINITION | INTERFACE | OBJECT | SCALAR\n\ndirective @inaccessible on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\ndirective @requiresScopes(scopes: [[openfed__Scope!]!]!) on ENUM | FIELD_DEFINITION | INTERFACE | OBJECT | SCALAR\n\ndirective @tag(name: String!) repeatable on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\nscalar openfed__Scope\n\nscalar Upload\n\ntype Account {\n id: ID!\n imageUrl: String\n name: String!\n email: String!\n password: String! @requiresScopes(scopes: [[\"admin\"]])\n totalPostLike: Float!\n totalChatMessage: Float!\n scopes: [String!]!\n messages: [ChatMessage]\n rooms: [ChatRoom]\n posts: [Post]\n postLikes: [PostLike]\n}\n\ntype ChatMessage {\n id: ID!\n accountId: ID!\n account: Account!\n content: String!\n room: ChatRoom!\n}\n\ntype ChatRoomMember {\n id: ID!\n accountId: ID!\n account: Account!\n room: ChatRoom!\n}\n\ntype ChatRoom {\n id: ID!\n name: String!\n description: String\n members: [ChatRoomMember]\n messages: [ChatMessage]\n}\n\ntype Post {\n id: ID!\n accountId: ID!\n account: Account!\n imageUrl: String\n title: String!\n content: String!\n likes: [PostLike]\n}\n\ntype PostLike {\n id: ID!\n accountId: ID!\n account: Account!\n post: Post!\n}\n\ntype Session {\n account: Account!\n accessToken: String!\n refreshToken: String!\n}\n\ntype Query {\n accounts: [Account!]!\n account(id: ID!): Account\n chatRooms: [ChatRoom]\n chatRoomMembers(chatRoomId: ID!): [ChatRoomMember]\n chatMessages(chatRoomId: ID!): [ChatMessage]\n posts: [Post]\n myPosts: [Post]\n myChatRooms: [ChatRoom]\n}\n\ninput AccountInput {\n image: Upload\n name: String!\n email: String!\n password: String!\n scopes: [String!]\n}\n\ninput LoginInput {\n email: String!\n password: String!\n}\n\ninput RegisterInput {\n name: String!\n email: String!\n password: String!\n}\n\ntype Mutation {\n login(input: LoginInput!): Session!\n register(input: RegisterInput!): Account!\n createAccount(input: AccountInput!): Account! @requiresScopes(scopes: [[\"admin\"]])\n updateAccount(id: ID!, input: AccountInput!): Account! @requiresScopes(scopes: [[\"admin\"]])\n updateMyAccount(input: AccountInput!): Account! @authenticated\n deleteAccount(id: ID!): Account! @requiresScopes(scopes: [[\"admin\"]])\n addChatRoom(input: ChatRoomInput!): ChatRoom\n addMemberToChatRoom(input: ChatRoomMemberInput!): ChatRoomMember\n addMyChatMessage(input: ChatMessageInput!): ChatMessage\n addMyPost(input: PostInput!): Post\n likePost(input: PostLikeInput!): PostLike\n unlikePost(input: PostLikeInput!): PostLike\n publishPostLikeIncrement(accountId: ID!): edfs__PublishResult!\n publishPostLikeDecrement(accountId: ID!): edfs__PublishResult!\n publishChatMessageIncrement(accountId: ID!): edfs__PublishResult!\n}\n\ninput ChatRoomInput {\n name: String!\n description: String\n}\n\ninput ChatRoomMemberInput {\n accountId: ID\n chatRoomId: ID!\n}\n\ninput ChatMessageInput {\n chatRoomId: ID!\n accountId: ID\n content: String!\n}\n\ninput PostInput {\n accountId: ID\n title: String!\n content: String!\n image: Upload\n}\n\ninput PostLikeInput {\n postId: ID!\n}\n\ntype edfs__PublishResult {\n success: Boolean!\n}\n\ntype Subscription {\n postLikeIncremented: Account!\n postLikeDecremented: Account!\n chatMessageIncremented: Account!\n}",
"stringStorage": {
"35bab0c1e5ce233aab0f5623d08ab8ee0c9a431f": "schema {\n query: Query\n mutation: Mutation\n}\n\ndirective @authenticated on ENUM | FIELD_DEFINITION | INTERFACE | OBJECT | SCALAR\n\ndirective @composeDirective(name: String!) repeatable on SCHEMA\n\ndirective @extends on INTERFACE | OBJECT\n\ndirective @external on FIELD_DEFINITION | OBJECT\n\ndirective @inaccessible on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\ndirective @interfaceObject on OBJECT\n\ndirective @key(fields: openfed__FieldSet!, resolvable: Boolean = true) repeatable on INTERFACE | OBJECT\n\ndirective @override(from: String!) on FIELD_DEFINITION\n\ndirective @provides(fields: openfed__FieldSet!) on FIELD_DEFINITION\n\ndirective @requires(fields: openfed__FieldSet!) on FIELD_DEFINITION\n\ndirective @requiresScopes(scopes: [[openfed__Scope!]!]!) on ENUM | FIELD_DEFINITION | INTERFACE | OBJECT | SCALAR\n\ndirective @shareable repeatable on FIELD_DEFINITION | OBJECT\n\ndirective @tag(name: String!) repeatable on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\ntype Account @key(fields: \"id\") {\n email: String!\n id: ID!\n imageUrl: String\n name: String!\n password: String! @requiresScopes(scopes: [[\"admin\"]])\n scopes: [String!]!\n totalChatMessage: Float!\n totalPostLike: Float!\n}\n\ninput AccountInput {\n email: String!\n image: Upload\n name: String!\n password: String!\n scopes: [String!]\n}\n\ntype ChatMessage @key(fields: \"id\") @extends {\n account: Account! @requires(fields: \"accountId\")\n accountId: ID! @external\n id: ID!\n}\n\ntype ChatRoom @key(fields: \"id\") @extends {\n id: ID!\n}\n\ntype ChatRoomMember @key(fields: \"id\") @extends {\n account: Account! @requires(fields: \"accountId\")\n accountId: ID! @external\n id: ID!\n}\n\ninput LoginInput {\n email: String!\n password: String!\n}\n\ntype Mutation {\n createAccount(input: AccountInput!): Account! @requiresScopes(scopes: [[\"admin\"]])\n deleteAccount(id: ID!): Account! @requiresScopes(scopes: [[\"admin\"]])\n login(input: LoginInput!): Session!\n register(input: RegisterInput!): Account!\n updateAccount(id: ID!, input: AccountInput!): Account! @requiresScopes(scopes: [[\"admin\"]])\n updateMyAccount(input: AccountInput!): Account! @authenticated\n}\n\ntype Post @key(fields: \"id\") @extends {\n account: Account! @requires(fields: \"accountId\")\n accountId: ID! @external\n id: ID!\n}\n\ntype PostLike @key(fields: \"id\") @extends {\n account: Account! @requires(fields: \"accountId\")\n accountId: ID! @external\n id: ID!\n}\n\ntype Query {\n account(id: ID!): Account\n accounts: [Account!]!\n}\n\ninput RegisterInput {\n email: String!\n name: String!\n password: String!\n}\n\ntype Session {\n accessToken: String!\n account: Account!\n refreshToken: String!\n}\n\nscalar Upload\n\nscalar openfed__FieldSet\n\nscalar openfed__Scope",
"c82fc423a14485cdf0395dce32e6d142a4593b0c": "schema {\n query: Query\n mutation: Mutation\n}\n\ndirective @extends on INTERFACE | OBJECT\n\ndirective @external on FIELD_DEFINITION | OBJECT\n\ndirective @key(fields: openfed__FieldSet!, resolvable: Boolean = true) repeatable on INTERFACE | OBJECT\n\ndirective @provides(fields: openfed__FieldSet!) on FIELD_DEFINITION\n\ndirective @requires(fields: openfed__FieldSet!) on FIELD_DEFINITION\n\ndirective @tag(name: String!) repeatable on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\ntype Account @key(fields: \"id\") @extends {\n id: ID!\n messages: [ChatMessage]\n postLikes: [PostLike]\n posts: [Post]\n rooms: [ChatRoom]\n}\n\ntype ChatMessage @key(fields: \"id\") {\n accountId: ID!\n content: String!\n id: ID!\n room: ChatRoom!\n}\n\ninput ChatMessageInput {\n accountId: ID\n chatRoomId: ID!\n content: String!\n}\n\ntype ChatRoom @key(fields: \"id\") {\n description: String\n id: ID!\n members: [ChatRoomMember]\n messages: [ChatMessage]\n name: String!\n}\n\ninput ChatRoomInput {\n description: String\n name: String!\n}\n\ntype ChatRoomMember @key(fields: \"id\") {\n accountId: ID!\n id: ID!\n room: ChatRoom!\n}\n\ninput ChatRoomMemberInput {\n accountId: ID\n chatRoomId: ID!\n}\n\ntype Mutation {\n addChatRoom(input: ChatRoomInput!): ChatRoom\n addMemberToChatRoom(input: ChatRoomMemberInput!): ChatRoomMember\n addMyChatMessage(input: ChatMessageInput!): ChatMessage\n addMyPost(input: PostInput!): Post\n likePost(input: PostLikeInput!): PostLike\n unlikePost(input: PostLikeInput!): PostLike\n}\n\ntype Post @key(fields: \"id\") {\n accountId: ID!\n content: String!\n id: ID!\n imageUrl: String\n likes: [PostLike]\n title: String!\n}\n\ninput PostInput {\n accountId: ID\n content: String!\n image: Upload\n title: String!\n}\n\ntype PostLike @key(fields: \"id\") {\n accountId: ID!\n id: ID!\n post: Post!\n}\n\ninput PostLikeInput {\n postId: ID!\n}\n\ntype Query {\n chatMessages(chatRoomId: ID!): [ChatMessage]\n chatRoomMembers(chatRoomId: ID!): [ChatRoomMember]\n chatRooms: [ChatRoom]\n myChatRooms: [ChatRoom]\n myPosts: [Post]\n posts: [Post]\n}\n\nscalar Upload\n\nscalar openfed__FieldSet",
"378341c47b2980be51d57dc83d89997b4381354f": "schema {\n mutation: Mutation\n subscription: Subscription\n}\n\ndirective @edfs__kafkaPublish(providerId: String! = \"default\", topic: String!) on FIELD_DEFINITION\n\ndirective @edfs__kafkaSubscribe(providerId: String! = \"default\", topics: [String!]!) on FIELD_DEFINITION\n\ndirective @extends on INTERFACE | OBJECT\n\ndirective @external on FIELD_DEFINITION | OBJECT\n\ndirective @key(fields: openfed__FieldSet!, resolvable: Boolean = true) repeatable on INTERFACE | OBJECT\n\ndirective @provides(fields: openfed__FieldSet!) on FIELD_DEFINITION\n\ndirective @requires(fields: openfed__FieldSet!) on FIELD_DEFINITION\n\ndirective @tag(name: String!) repeatable on ARGUMENT_DEFINITION | ENUM | ENUM_VALUE | FIELD_DEFINITION | INPUT_FIELD_DEFINITION | INPUT_OBJECT | INTERFACE | OBJECT | SCALAR | UNION\n\ntype Account @key(fields: \"id\", resolvable: false) {\n id: ID! @external\n}\n\ntype Mutation {\n publishChatMessageIncrement(accountId: ID!): edfs__PublishResult! @edfs__kafkaPublish(topic: \"chatMessage.increment\", providerId: \"default\")\n publishPostLikeDecrement(accountId: ID!): edfs__PublishResult! @edfs__kafkaPublish(topic: \"postLike.decrement\", providerId: \"default\")\n publishPostLikeIncrement(accountId: ID!): edfs__PublishResult! @edfs__kafkaPublish(topic: \"postLike.increment\", providerId: \"default\")\n}\n\ntype Subscription {\n chatMessageIncremented: Account! @edfs__kafkaSubscribe(topics: [\"chatMessage.increment\"], providerId: \"default\")\n postLikeDecremented: Account! @edfs__kafkaSubscribe(topics: [\"postLike.decrement\"], providerId: \"default\")\n postLikeIncremented: Account! @edfs__kafkaSubscribe(topics: [\"postLike.increment\"], providerId: \"default\")\n}\n\ntype edfs__PublishResult {\n success: Boolean!\n}\n\nscalar openfed__FieldSet"
}
},
"version": "11cea3d4-e120-4070-a3e6-b28fa3a45b3f",
"subgraphs": [
{
"id": "a8359ec1-658f-475e-b80c-b216e1264061",
"name": "backend-1",
"routingUrl": "http://172.23.128.1:8081/graphql"
},
{
"id": "00dc55bf-85fc-4f34-9864-650bf2cb2cc3",
"name": "backend-2",
"routingUrl": "http://172.23.128.1:8082/graphql"
},
{
"id": "b289de7f-225b-485b-b6de-c4fa343701e0",
"name": "backend-edg"
}
],
"compatibilityVersion": "1:{{$COMPOSITION__VERSION}}"
}
Log output
None
Additional context
None
WunderGraph commits fully to Open Source and we want to make sure that we can help you as fast as possible. The roadmap is driven by our customers and we have to prioritize issues that are important to them. You can influence the priority by becoming a customer. Please contact us here.
Hey @muazhari, thanks for taking the time to give us a proper report.
The most common issue we see is a misconfiguration of the provider. It could be that kafka is not reachable by the router or that the topic where is message is being published doesn't exist yet (it has to be created manually).
If you're sure that neither of these two issues is the cause, could you please try reproducing it with a log_level set to debug level? (https://cosmo-docs.wundergraph.com/router/configuration#router).
Thanks, and let me know!
I verified that:
- Kafka is accessible from the current Docker network. (see curl command in terminal)
- The related topic is already created. (see Kafka UI panel)
- When the mutation curl is executed, the topic still has zero messages. (see curl command in terminal and message count in Kafka UI panel)
Proofs:
-
VS Code and Kafka UI
-
Simulated router connection. Somehow, I can't attach the terminal to the router container using bash, sh, or ash. So, I do a simulated approach using another Docker container in the same network.
Router debug log:
* Executing task: docker logs --tail 1000 -f 4f6b092dc435b66d3c2fa25a88b5e5f3ab2de27b60cb2e4502c614df8f095875
11:53:42 AM INFO cmd/main.go:83 Config file path provided. Values in the config file have higher priority than environment variables {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "config_file": "/app/router-config.yaml"}
11:53:42 AM DEBUG maxprocs/maxprocs.go:47 maxprocs: Leaving GOMAXPROCS=8: CPU quota undefined {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1"}
11:53:42 AM DEBUG core/router.go:466 Using default trace exporter {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "endpoint": "https://cosmo-otel.wundergraph.com"}
11:53:42 AM DEBUG core/router.go:481 Using default metrics exporter {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "endpoint": "https://cosmo-otel.wundergraph.com"}
11:53:42 AM WARN core/router.go:553 Development mode enabled. This should only be used for testing purposes {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1"}
11:53:42 AM INFO core/router.go:566 Kafka Event source enabled {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "provider_id": "default", "brokers": ["172.23.138.198:9094"]}
11:53:42 AM INFO core/router.go:780 Registering router with control plane because you opted in to send telemetry to Cosmo Cloud or advanced request tracing (ART) in production {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1"}
11:53:42 AM DEBUG selfregister/self_register.go:75 Registering router on controlplane {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1"}
11:53:42 AM DEBUG selfregister/self_register.go:85 Router self registered on controlplane {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "duration": 0.367754882}
11:53:42 AM WARN core/router.go:791 Trace sampling rate is higher than account limit. Using account limit instead. Please contact support to increase your account limit. {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "limit": 1, "account_limit": "0.10"}
11:53:42 AM INFO trace/meter.go:108 Tracer enabled {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "exporter": "http", "endpoint": "https://cosmo-otel.wundergraph.com", "path": "/v1/traces"}
11:53:42 AM INFO metric/prometheus_server.go:43 Prometheus metrics enabled {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "listen_addr": "127.0.0.1:8088", "endpoint": "/metrics"}
11:53:42 AM INFO metric/meter.go:253 Metrics enabled {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "exporter": "http", "endpoint": "https://cosmo-otel.wundergraph.com", "path": "/v1/metrics"}
11:53:42 AM INFO core/router.go:862 GraphQL schema coverage metrics enabled {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1"}
11:53:42 AM INFO core/router.go:891 Serving GraphQL playground {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "url": "http://0.0.0.0:3002/"}
11:53:42 AM DEBUG core/router.go:1007 Default to Cosmo CDN as persisted operations provider {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "url": "https://cosmo-cdn.wundergraph.com"}
11:53:42 AM DEBUG graphqlmetrics/exporter.go:294 Starting exporter {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "component": "graphqlmetrics_exporter"}
11:53:42 AM INFO core/init_config_poller.go:108 Polling for execution config updates from Cosmo CDN in the background {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "interval": "10s"}
11:53:43 AM DEBUG core/websocket.go:135 Net poller is available {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1"}
11:53:43 AM INFO core/router.go:1198 GraphQL endpoint {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "method": "POST", "url": "http://0.0.0.0:3002/graphql"}
11:53:43 AM INFO core/router.go:1209 localhost fallback enabled, connections that fail to connect to localhost will be retried using host.docker.internal {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1"}
11:53:43 AM INFO core/router.go:1241 Server initialized and ready to serve requests {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "listen_addr": "0.0.0.0:3002", "playground": true, "introspection": true, "config_version": "11cea3d4-e120-4070-a3e6-b28fa3a45b3f"}
11:53:52 AM DEBUG graphqlmetrics/exporter.go:309 Exporter.start: tick {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "component": "graphqlmetrics_exporter"}
11:53:55 AM DEBUG configpoller/config_poller.go:78 No new router config available. Trying again ... {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "poll_interval": "10s", "fetch_time": "269.901409ms"}
11:54:02 AM DEBUG graphqlmetrics/exporter.go:309 Exporter.start: tick {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "component": "graphqlmetrics_exporter"}
11:54:03 AM INFO requestlogger/requestlogger.go:181 /graphql {"hostname": "4f6b092dc435", "pid": 1, "log_type": "request", "method": "POST", "path": "/graphql", "query": "", "ip": "[REDACTED]", "user_agent": "curl/8.5.0", "config_version": "11cea3d4-e120-4070-a3e6-b28fa3a45b3f", "trace_id": "74f12c88bf4203fc61334486cd6e602c", "latency": 0.001330702, "status": 200, "request_id": "4f6b092dc435/rRQLwdFP10-000001"}
11:54:03 AM DEBUG graphqlmetrics/exporter.go:315 Exporter.start: item {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "component": "graphqlmetrics_exporter"}
11:54:05 AM DEBUG configpoller/config_poller.go:78 No new router config available. Trying again ... {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "poll_interval": "10s", "fetch_time": "229.537347ms"}
@alepane21 Any suggestion? The router acknowledges the Kafka as in the log:
11:53:42 AM INFO core/router.go:566 Kafka Event source enabled {"hostname": "4f6b092dc435", "pid": 1, "service": "@wundergraph/router", "service_version": "0.193.1", "provider_id": "default", "brokers": ["172.23.138.198:9094"]}
However, why is the mutation request returned with a 200 status code but isn't processed correctly?
11:54:03 AM INFO requestlogger/requestlogger.go:181 /graphql {"hostname": "4f6b092dc435", "pid": 1, "log_type": "request", "method": "POST", "path": "/graphql", "query": "", "ip": "[REDACTED]", "user_agent": "curl/8.5.0", "config_version": "11cea3d4-e120-4070-a3e6-b28fa3a45b3f", "trace_id": "74f12c88bf4203fc61334486cd6e602c", "latency": 0.001330702, "status": 200, "request_id": "4f6b092dc435/rRQLwdFP10-000001"}
Hi @muazhari, I tested the execution config that you sent us, and locally I got it working.
ale@Mac cosmo % curl -s http://localhost:3002/graphql -H 'Content-Type: application/json' -d '{
"query":"mutation($accountId: ID!){ publishChatMessageIncrement(accountId: $accountId){ success } }",
"variables": {
"accountId": "65c13016-05f5-449b-9472-7afca5de2d03",
"content": "x"
}
}'
{"data":{"publishChatMessageIncrement":{"success":true}}}
Is this the only mutation not working? The subscriptions are working?
Let me know!
Is this the only mutation not working? The subscriptions are working?
I only have a test on that one mutation and subscription handle. I think if that one succeeds, the other will succeed too.
What have you changed and done when reproducing it until success? Could I know related information that might be a potential/suspected culprit? Also, does the Wunder Graph Cosmo GraphQL federation support on WSL?
Hi @muazhari, sorry for the delayed reply.
I used the following router config:
version: "1"
execution_config:
file:
path: "./execution_config.json"
headers:
all:
request:
- op: "propagate"
named: "Authorization"
authentication:
jwt:
jwks:
- url: "http://localhost:8080/realms/cosmo/protocol/openid-connect/certs"
header_name: Authorization
events:
providers:
kafka:
- id: default
brokers:
- "localhost:9092"
No changes to the execution config were required; it worked on the first try with no modifications.
The most common issues with EDFS relate to provider configuration: networking problems, misconfiguration, etc.
We are not actively testing the cosmo router on a WSL environment, so it may work there, but we cannot guarantee that.