ditto icon indicating copy to clipboard operation
ditto copied to clipboard

Connection extraField Enrichment.

Open VETRIVEL001 opened this issue 1 year ago • 7 comments

I have given this RQL query in kafka connection topics field which is inside targets "_/_/things/twin/events?filter=and(like(thingId,"org.eclipse.ditto:myThing*"),exists(features/lamp))&extraFields=thingId,features/water-tank/properties,attributes/location". Now i need to send events on kafka only if features/ lamp was modified but when i mention &extraFields= in that query, its sending events if anything gets changed in thing. can anyone help me out in this, when particular field was modified in thing, send events with extraField.

VETRIVEL001 avatar Apr 29 '24 07:04 VETRIVEL001

@VETRIVEL001 as long as you don't specify features or features/lamp in the extraFields, the behavior should be as you expect it to be:

  • only if the lamp feature was modified, an event should be published

If this is not the case, please share a reproducer, e.g. as complete connection configuration and as the change you expect to "pass" the filter - and one change to a Thing you expect to "not pass" the filter.

thjaeckle avatar Apr 29 '24 13:04 thjaeckle

This is my thing { "thingId": "org.eclipse.ditto:myThing12345", "policyId": "org.eclipse.ditto:myThing12345", "attributes": { "manufacturer": "ACME demo corp.", "location": "Berlin, main floor", "serialno": "42", "model": "Speaking coffee machine" }, "features": { "water-tank": { "properties": { }, "desiredProperties": {} }, "lamp": {
"properties": {}, "desiredProperties": {} } } and this is my connection { "id": "werkjhgf", "name": "KAFKA", "connectionType": "kafka", "connectionStatus": "open", "uri": "tcp://ip:9092", "sources": [ { "addresses": [ "topic-123" ], "consumerCount": 1, "qos": 0, "authorizationContext": [ "nginx:ditto" ], "headerMapping": {}, "payloadMapping": [ "Ditto" ], "replyTarget": { "address": "topic-Reply-123", "headerMapping": {}, "expectedResponseTypes": [ "response", "error" ], "enabled": true } } ], "targets": [ { "address": "eventTopic", "topics": [ "//things/twin/events?filter=and(like(thingId,'org.eclipse.ditto:myThing:*'),exists(features/lamp))&extraFields=thingId,features/water-tank/properties,attributes/location" ], "qos": 0, "authorizationContext": [ "nginx:ditto" ], "headerMapping": {} } ], "clientCount": 1, "failoverEnabled": true, "validateCertificates": false, "processorPoolSize": 1, "specificConfig": { "saslMechanism": "plain", "bootstrapServers": "kafka_ip:port" }, "mappingDefinitions": { "LexiMapper": { "mappingEngine": "", "options": { "incomingScript": "", "outgoingScript": "" } } }, "tags": [] } this is how I created the connection in my ditto. I am expecting it should publish the event with these extraFields "thingId,features/water-tank/properties,attributes/location" only if I changed anything under the lamp feature but it was publishing the event on the topic if anything gets changed in thing. I want to publish event with extraFields only specified 'feature or field' gets changed. can anyone tell me how to do it?

VETRIVEL001 avatar Apr 30 '24 19:04 VETRIVEL001

@VETRIVEL001 I cannot reproduce ..

What I did:

  • Created the connection like you did with:
"topics": [
                "_/_/things/twin/events?filter=and(like(thingId,'org.eclipse.ditto:myThing:*'),exists(features/lamp))&extraFields=thingId,features/water-tank/properties,attributes/location"
            ],
  • then I created the Thing (notice that you require another : in your filter like(thingId,'org.eclipse.ditto:myThing:*'):
PUT /api/2/things/org.eclipse.ditto:myThing:123

with payload:

{
  "attributes": {
    "manufacturer": "ACME demo corp.",
    "location": "Berlin, main floor",
    "serialno": "42",
    "model": "Speaking coffee machine"
  },
  "features": {
    "water-tank": {
      "properties": {},
      "desiredProperties": {}
    },
    "lamp": {
      "properties": {},
      "desiredProperties": {}
    }
  }
}
  • That triggered the connection to publish an event (because features/lamp) was contained in the payload.
  • Then I updated only the feature water-tank with:
PUT /api/2/things/org.eclipse.ditto:myThing:123/features/water-tank

Any payload:

{
   "properties": {
      "foo": "bar"
   }
}
  • Which did - as expected - not send out an event via the connection - as the lamp feature was not updated.

thjaeckle avatar May 02 '24 13:05 thjaeckle

I also followed the same way you did. connection: "topics": [ "//things/twin/events?filter=and(like(thingId,'org.eclipse.ditto:myThing:*'),exists(features/lamp))&extraFields=thingId,features/water-tank/properties,attributes/location" ],

PUT /api/2/things/org.eclipse.ditto:myThing:123/features/water-tank I tried both put and patch payload: { "properties": { "foo": "bar" } } but it sending out an event on kafka. still it persists the same issue.

VETRIVEL001 avatar May 03 '24 06:05 VETRIVEL001

Do you maybe still have another Kafka connection in Ditto which does not have the same filter?

thjaeckle avatar May 03 '24 06:05 thjaeckle

No, I have only one connection.

VETRIVEL001 avatar May 03 '24 06:05 VETRIVEL001

Hello @VETRIVEL001

I tried to replicate the issue, but I wasn't able to. I use Ditto 3.5.4.

I basically did the same steps as you:

  • created thing with lamp feature provided
  • updated (with PUT) water-tank feature
  • updated (with PUT) lamp feature

Before doing that I enabled logs for the connection and here is what I see in Ditto UI: 2024-05-03-122852_912x454_scrot Creating thing and updating lamp feature provided series of events with types dispatched-filtered-mapped-published while updating water-tank feature produced only event of dispatched type, which is expected behavior.

Connection logs as JSON
{
  "connectionId": "1929-kafka",
  "connectionLogs": [
    {
      "correlationId": "8dda0473-9ea8-43ff-bf28-d4a476ffae11",
      "timestamp": "2024-05-03T09:23:13.366877149Z",
      "category": "connection",
      "type": "other",
      "level": "success",
      "message": "Successfully reset the logs.",
      "entityType": "connection",
      "entityId": "1929-kafka"
    },
    {
      "correlationId": "24767649-4f65-4874-9f21-ad232a19361a",
      "timestamp": "2024-05-03T09:23:17.151495994Z",
      "category": "target",
      "type": "dispatched",
      "level": "success",
      "message": "Successfully dispatched signal. - Message headers: [referer=http://localhost:8080/ui/, sec-fetch-site=same-origin, correlation-id=24767649-4f65-4874-9f21-ad232a19361a, origin=http://localhost:8080, ditto-ackgregator-address=pekko://[email protected]:2551/user/$Yb/ackr0-24767649-4f65-4874-9f21-ad232a19361a#16254017, requested-acks=[\"twin-persisted\"], authorization=Basic ZGl0dG86ZGl0dG8=, ditto-originator=nginx:ditto, response-required=false, host=localhost:8080, ditto-read-subjects=[\"nginx:ditto\"], sec-fetch-mode=cors, if-none-match=*, x-ditto-pre-authenticated=nginx:ditto, cookie=.AspNetCore.Antiforgery.P_Dp5CLNiMg=CfDJ8EsrvQfSA0lDjqCxsYQfCHfsIwLKzeW0GSHtFQ-_km-beQOug9wZEm5ctF2ZoQNQ-FypJc2ZWPbI7HQQK_McYdFGOlry7K8LzOkGvQjSO971P9wxTb_pVvG0o0u1NR1i3ljZqJEpEA6SEpI278_Dhl0; .AspNetCore.Antiforgery.8vFUR3_kimI=CfDJ8EsrvQfSA0lDjqCxsYQfCHfO5jmjvCr-hw6-WX5H38gaDdbRs9B-QivzCQjvik0fzWhOh0PVL0SYDqrNr8wIxLJSismJPCv3iocH6K16UkDU6iGpsTHR18J8opgm3GD8-rCqZIslFupzOx1VZ0Mlxog, accept-language=en-US, en;q=0.5, dnt=1, x-forwarded-for=172.18.0.1, pragma=no-cache, accept=*/*, x-real-ip=172.18.0.1, x-forwarded-user=ditto, ditto-auth-context={\"type\":\"pre-authenticated-http\",\"subjects\":[\"nginx:ditto\"]}, sec-fetch-dest=empty, user-agent=Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0] - Message payload: {\"type\":\"things.events:thingCreated\",\"_timestamp\":\"2024-05-03T09:23:17.128058725Z\",\"_metadata\":null,\"revision\":9,\"thingId\":\"org.eclipse.ditto:myThing:123\",\"thing\":{\"thingId\":\"org.eclipse.ditto:myThing:123\",\"policyId\":\"org.eclipse.ditto:myThing:123\",\"attributes\":{\"manufacturer\":\"ACME demo corp.\",\"location\":\"Berlin, main floor\",\"serialno\":\"42\",\"model\":\"Speaking coffee machine\"},\"features\":{\"water-tank\":{\"properties\":{},\"desiredProperties\":{}},\"lamp\":{\"properties\":{},\"desiredProperties\":{}}}}}",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "24767649-4f65-4874-9f21-ad232a19361a",
      "timestamp": "2024-05-03T09:23:17.168057238Z",
      "category": "target",
      "type": "filtered",
      "level": "success",
      "message": "Signal successfully passed possible filters.",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "24767649-4f65-4874-9f21-ad232a19361a",
      "timestamp": "2024-05-03T09:23:17.201507206Z",
      "category": "target",
      "type": "mapped",
      "level": "success",
      "message": "Mapped outgoing signal with mapper <default>",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "24767649-4f65-4874-9f21-ad232a19361a",
      "timestamp": "2024-05-03T09:23:17.254721551Z",
      "category": "target",
      "type": "published",
      "level": "success",
      "message": "Successfully published signal. - Message headers: [content-type=application/vnd.eclipse.ditto+json, correlation-id=24767649-4f65-4874-9f21-ad232a19361a] - Message payload: {\"topic\":\"org.eclipse.ditto/myThing:123/things/twin/events/created\",\"headers\":{\"sec-fetch-mode\":\"cors\",\"referer\":\"http://localhost:8080/ui/\",\"x-ditto-pre-authenticated\":\"nginx:ditto\",\"sec-fetch-site\":\"same-origin\",\"cookie\":\".AspNetCore.Antiforgery.P_Dp5CLNiMg=CfDJ8EsrvQfSA0lDjqCxsYQfCHfsIwLKzeW0GSHtFQ-_km-beQOug9wZEm5ctF2ZoQNQ-FypJc2ZWPbI7HQQK_McYdFGOlry7K8LzOkGvQjSO971P9wxTb_pVvG0o0u1NR1i3ljZqJEpEA6SEpI278_Dhl0; .AspNetCore.Antiforgery.8vFUR3_kimI=CfDJ8EsrvQfSA0lDjqCxsYQfCHfO5jmjvCr-hw6-WX5H38gaDdbRs9B-QivzCQjvik0fzWhOh0PVL0SYDqrNr8wIxLJSismJPCv3iocH6K16UkDU6iGpsTHR18J8opgm3GD8-rCqZIslFupzOx1VZ0Mlxog\",\"accept-language\":\"en-US, en;q=0.5\",\"origin\":\"http://localhost:8080\",\"dnt\":\"1\",\"x-forwarded-for\":\"172.18.0.1\",\"pragma\":\"no-cache\",\"accept\":\"*/*\",\"authorization\":\"Basic ZGl0dG86ZGl0dG8=\",\"x-real-ip\":\"172.18.0.1\",\"x-forwarded-user\":\"ditto\",\"host\":\"localhost:8080\",\"sec-fetch-dest\":\"empty\",\"user-agent\":\"Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0\",\"ditto-originator\":\"nginx:ditto\",\"response-required\":false,\"requested-acks\":[],\"content-type\":\"application/json\",\"correlation-id\":\"24767649-4f65-4874-9f21-ad232a19361a\"},\"path\":\"/\",\"value\":{\"thingId\":\"org.eclipse.ditto:myThing:123\",\"policyId\":\"org.eclipse.ditto:myThing:123\",\"attributes\":{\"manufacturer\":\"ACME demo corp.\",\"location\":\"Berlin, main floor\",\"serialno\":\"42\",\"model\":\"Speaking coffee machine\"},\"features\":{\"water-tank\":{\"properties\":{},\"desiredProperties\":{}},\"lamp\":{\"properties\":{},\"desiredProperties\":{}}}},\"extra\":{\"thingId\":\"org.eclipse.ditto:myThing:123\",\"features\":{\"water-tank\":{\"properties\":{}}},\"attributes\":{\"location\":\"Berlin, main floor\"}},\"revision\":9,\"timestamp\":\"2024-05-03T09:23:17.128058725Z\"}",
      "address": "eventTopic"
    },
    {
      "correlationId": "fabbefdc-2d9a-4ffe-b772-87df269cdf90",
      "timestamp": "2024-05-03T09:24:16.594616743Z",
      "category": "target",
      "type": "dispatched",
      "level": "success",
      "message": "Successfully dispatched signal. - Message headers: [x-ditto-pre-authenticated=nginx:ditto, correlation-id=fabbefdc-2d9a-4ffe-b772-87df269cdf90, x-forwarded-for=172.18.0.1, ditto-ackgregator-address=pekko://[email protected]:2551/user/$3b/ackr0-fabbefdc-2d9a-4ffe-b772-87df269cdf90#-907028526, accept=*/*, requested-acks=[\"twin-persisted\"], authorization=Basic ZGl0dG86ZGl0dG8=, x-real-ip=172.18.0.1, x-forwarded-user=ditto, ditto-originator=nginx:ditto, response-required=false, ditto-auth-context={\"type\":\"pre-authenticated-http\",\"subjects\":[\"nginx:ditto\"]}, host=localhost:8080, ditto-read-subjects=[\"nginx:ditto\"], user-agent=curl/8.7.1] - Message payload: {\"type\":\"things.events:featureModified\",\"_timestamp\":\"2024-05-03T09:24:16.573186811Z\",\"_metadata\":null,\"revision\":10,\"thingId\":\"org.eclipse.ditto:myThing:123\",\"featureId\":\"water-tank\",\"feature\":{\"properties\":{\"foo\":\"bar\"}}}",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "a2306b00-9c37-4334-a285-bdd520017c7d",
      "timestamp": "2024-05-03T09:24:20.508087532Z",
      "category": "target",
      "type": "dispatched",
      "level": "success",
      "message": "Successfully dispatched signal. - Message headers: [x-ditto-pre-authenticated=nginx:ditto, correlation-id=a2306b00-9c37-4334-a285-bdd520017c7d, x-forwarded-for=172.18.0.1, ditto-ackgregator-address=pekko://[email protected]:2551/user/$4b/ackr0-a2306b00-9c37-4334-a285-bdd520017c7d#-1885240064, accept=*/*, requested-acks=[\"twin-persisted\"], authorization=Basic ZGl0dG86ZGl0dG8=, x-real-ip=172.18.0.1, x-forwarded-user=ditto, ditto-originator=nginx:ditto, response-required=false, ditto-auth-context={\"type\":\"pre-authenticated-http\",\"subjects\":[\"nginx:ditto\"]}, host=localhost:8080, ditto-read-subjects=[\"nginx:ditto\"], user-agent=curl/8.7.1] - Message payload: {\"type\":\"things.events:featureModified\",\"_timestamp\":\"2024-05-03T09:24:20.493875969Z\",\"_metadata\":null,\"revision\":11,\"thingId\":\"org.eclipse.ditto:myThing:123\",\"featureId\":\"lamp\",\"feature\":{\"properties\":{\"foo\":\"bar\"}}}",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "a2306b00-9c37-4334-a285-bdd520017c7d",
      "timestamp": "2024-05-03T09:24:20.516700844Z",
      "category": "target",
      "type": "filtered",
      "level": "success",
      "message": "Signal successfully passed possible filters.",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "a2306b00-9c37-4334-a285-bdd520017c7d",
      "timestamp": "2024-05-03T09:24:20.586274286Z",
      "category": "target",
      "type": "mapped",
      "level": "success",
      "message": "Mapped outgoing signal with mapper <default>",
      "address": "eventTopic",
      "entityType": "thing",
      "entityId": "org.eclipse.ditto:myThing:123"
    },
    {
      "correlationId": "a2306b00-9c37-4334-a285-bdd520017c7d",
      "timestamp": "2024-05-03T09:24:20.606415085Z",
      "category": "target",
      "type": "published",
      "level": "success",
      "message": "Successfully published signal. - Message headers: [content-type=application/vnd.eclipse.ditto+json, correlation-id=a2306b00-9c37-4334-a285-bdd520017c7d] - Message payload: {\"topic\":\"org.eclipse.ditto/myThing:123/things/twin/events/modified\",\"headers\":{\"authorization\":\"Basic ZGl0dG86ZGl0dG8=\",\"x-real-ip\":\"172.18.0.1\",\"x-forwarded-user\":\"ditto\",\"x-ditto-pre-authenticated\":\"nginx:ditto\",\"host\":\"localhost:8080\",\"x-forwarded-for\":\"172.18.0.1\",\"accept\":\"*/*\",\"user-agent\":\"curl/8.7.1\",\"ditto-originator\":\"nginx:ditto\",\"response-required\":false,\"requested-acks\":[],\"content-type\":\"application/json\",\"correlation-id\":\"a2306b00-9c37-4334-a285-bdd520017c7d\"},\"path\":\"/features/lamp\",\"value\":{\"properties\":{\"foo\":\"bar\"}},\"extra\":{\"thingId\":\"org.eclipse.ditto:myThing:123\",\"features\":{\"water-tank\":{\"properties\":{\"foo\":\"bar\"}}},\"attributes\":{\"location\":\"Berlin, main floor\"}},\"revision\":11,\"timestamp\":\"2024-05-03T09:24:20.493875969Z\"}",
      "address": "eventTopic"
    }
  ],
  "enabledSince": "2024-05-03T09:33:01.831675300Z",
  "enabledUntil": "2024-05-03T10:33:01.833136688Z"
}

Here are the events I see in kafka (I'm using command kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic eventTopic in kafka container):

{"topic":"org.eclipse.ditto/myThing:123/things/twin/events/created","headers":{"sec-fetch-mode":"cors","referer":"http://localhost:8080/ui/","x-ditto-pre-authenticated":"nginx:ditto","sec-fetch-site":"same-origin","cookie":".AspNetCore.Antiforgery.P_Dp5CLNiMg=CfDJ8EsrvQfSA0lDjqCxsYQfCHfsIwLKzeW0GSHtFQ-_km-beQOug9wZEm5ctF2ZoQNQ-FypJc2ZWPbI7HQQK_McYdFGOlry7K8LzOkGvQjSO971P9wxTb_pVvG0o0u1NR1i3ljZqJEpEA6SEpI278_Dhl0; .AspNetCore.Antiforgery.8vFUR3_kimI=CfDJ8EsrvQfSA0lDjqCxsYQfCHfO5jmjvCr-hw6-WX5H38gaDdbRs9B-QivzCQjvik0fzWhOh0PVL0SYDqrNr8wIxLJSismJPCv3iocH6K16UkDU6iGpsTHR18J8opgm3GD8-rCqZIslFupzOx1VZ0Mlxog","accept-language":"en-US, en;q=0.5","origin":"http://localhost:8080","dnt":"1","x-forwarded-for":"172.18.0.1","pragma":"no-cache","accept":"*/*","authorization":"Basic ZGl0dG86ZGl0dG8=","x-real-ip":"172.18.0.1","x-forwarded-user":"ditto","host":"localhost:8080","sec-fetch-dest":"empty","user-agent":"Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0","ditto-originator":"nginx:ditto","response-required":false,"requested-acks":[],"content-type":"application/json","correlation-id":"24767649-4f65-4874-9f21-ad232a19361a"},"path":"/","value":{"thingId":"org.eclipse.ditto:myThing:123","policyId":"org.eclipse.ditto:myThing:123","attributes":{"manufacturer":"ACME demo corp.","location":"Berlin, main floor","serialno":"42","model":"Speaking coffee machine"},"features":{"water-tank":{"properties":{},"desiredProperties":{}},"lamp":{"properties":{},"desiredProperties":{}}}},"extra":{"thingId":"org.eclipse.ditto:myThing:123","features":{"water-tank":{"properties":{}}},"attributes":{"location":"Berlin, main floor"}},"revision":9,"timestamp":"2024-05-03T09:23:17.128058725Z"}
{"topic":"org.eclipse.ditto/myThing:123/things/twin/events/modified","headers":{"authorization":"Basic ZGl0dG86ZGl0dG8=","x-real-ip":"172.18.0.1","x-forwarded-user":"ditto","x-ditto-pre-authenticated":"nginx:ditto","host":"localhost:8080","x-forwarded-for":"172.18.0.1","accept":"*/*","user-agent":"curl/8.7.1","ditto-originator":"nginx:ditto","response-required":false,"requested-acks":[],"content-type":"application/json","correlation-id":"a2306b00-9c37-4334-a285-bdd520017c7d"},"path":"/features/lamp","value":{"properties":{"foo":"bar"}},"extra":{"thingId":"org.eclipse.ditto:myThing:123","features":{"water-tank":{"properties":{"foo":"bar"}}},"attributes":{"location":"Berlin, main floor"}},"revision":11,"timestamp":"2024-05-03T09:24:20.493875969Z"}

Could you, please, provide following:

  • Ditto version you use
  • connection logs
    Please, enable connection logs, do the test and send the logs here. Logs can be enabled using HTTP API or Ditto UI under "Connections" tab at the top.
    Logs can be viewed on Ditto UI as well, or requested using HTTP API. I believe, requesting them using HTTP API will be better here as UI does not show whole logs at once and JSON can be more easily checked.
  • events published to Kafka

dimabarbul avatar May 03 '24 09:05 dimabarbul