
Since Vector 0.32.0, the output of the loki sink is empty.

Open 1123183721 opened this issue 2 years ago • 5 comments

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

After upgrading Vector to version 0.32.0+, I cannot find the message in Loki when the sink is configured with encoding.codec = "text".

Configuration

[sinks.out_loki]
type = "loki"
inputs = ["remap_kafka"]
endpoint = "http://loki:3100"
encoding.codec = "text"
out_of_order_action = "accept"
remove_label_fields = true

Version

0.32.0+

Debug Output

No response

Example Data

No response

Additional Context

No response

References

No response

1123183721 avatar Dec 11 '23 02:12 1123183721

Hi @1123183721,

In order to help us investigate, could you please provide your full configuration and clarify what you mean by "the output of the loki sink is empty"? Does Vector log any errors?

dsmith3197 avatar Dec 12 '23 22:12 dsmith3197

Thanks for the reply @dsmith3197. The full configuration:

data_dir = "/var/lib/vector"
schema.log_namespace = true
api.enabled = true
api.address = "0.0.0.0:8686"

[sources.in_kafka]
type = "kafka"
bootstrap_servers = "xx:9092"
group_id = "forwarder-vector"
topics = ["log-analyzer-forwarder-pressure-test"]
decoding.codec = "json"

[sources.in_kafka.librdkafka_options]
"max.partition.fetch.bytes" = "1048576"
"fetch.max.bytes" = "52428800"
"message.max.bytes" = "1000000"

[sinks.out_loki]
type = "loki"
inputs = ["in_kafka"]
endpoint = "http://loki:3100"
encoding.codec = "text"
compression = "snappy"
encoding.only_fields = ["message"]
# accept, drop, rewrite_timestamp
out_of_order_action = "accept"
remove_label_fields = true

[sinks.out_loki.labels]
job = "log_analyzer_forwarder"
log_analyzer_forwarder = "vector"
"filename" = "{{ filename }}"
"hostname" = "{{ hostname }}"

Sample Kafka message:

{"hostname": "test22","message": "Fail to read file.","filename": "/var/log/ceph/ceph-mgr.test22.log","@collect_time": "2023-12-08T01:42:41.272816048Z"}

Response of the Loki query API:

{
    "status": "success",
    "data": {
        "resultType": "streams",
        "result": [
            {
                "stream": {
                    "filename": "/var/log/ceph/ceph-mgr.test22.log",
                    "hostname": "test22",
                    "job": "log_analyzer_forwarder",
                    "log_analyzer_forwarder": "vector"
                },
                "values": [
                    [
                        "1702259523150000000",
                        ""
                    ]
                ]
            }
        ],
        "stats": {
            "summary": {
                "bytesProcessedPerSecond": 104894934,
                "linesProcessedPerSecond": 2299531,
                "totalBytesProcessed": 202544313,
                "totalLinesProcessed": 4440224,
                "execTime": 1.930926,
                "queueTime": 0.000026,
                "subqueries": 0,
                "totalEntriesReturned": 1,
                "splits": 0,
                "shards": 1
            },
            "querier": {
                "store": {
                    "totalChunksRef": 0,
                    "totalChunksDownloaded": 0,
                    "chunksDownloadTime": 0,
                    "chunk": {
                        "headChunkBytes": 0,
                        "headChunkLines": 0,
                        "decompressedBytes": 0,
                        "decompressedLines": 0,
                        "compressedBytes": 0,
                        "totalDuplicates": 0
                    }
                }
            },
            "ingester": {
                "totalReached": 1,
                "totalChunksMatched": 451,
                "totalBatches": 1,
                "totalLinesSent": 1,
                "store": {
                    "totalChunksRef": 0,
                    "totalChunksDownloaded": 0,
                    "chunksDownloadTime": 0,
                    "chunk": {
                        "headChunkBytes": 57948846,
                        "headChunkLines": 1592892,
                        "decompressedBytes": 144595467,
                        "decompressedLines": 2847332,
                        "compressedBytes": 63206352,
                        "totalDuplicates": 0
                    }
                }
            },
            "cache": {
                "chunk": {
                    "entriesFound": 0,
                    "entriesRequested": 0,
                    "entriesStored": 0,
                    "bytesReceived": 0,
                    "bytesSent": 0,
                    "requests": 0,
                    "downloadTime": 0
                },
                "index": {
                    "entriesFound": 0,
                    "entriesRequested": 0,
                    "entriesStored": 0,
                    "bytesReceived": 0,
                    "bytesSent": 0,
                    "requests": 0,
                    "downloadTime": 0
                },
                "result": {
                    "entriesFound": 0,
                    "entriesRequested": 0,
                    "entriesStored": 0,
                    "bytesReceived": 0,
                    "bytesSent": 0,
                    "requests": 0,
                    "downloadTime": 0
                }
            }
        }
    }
}

And I don't see any error logs.

1123183721 avatar Dec 13 '23 09:12 1123183721

I've been able to confirm this behavior, specifically for the text codec.

Repro:

Start Loki on localhost:3100 using Docker (guide).

Run Vector with the following config:

data_dir = "/var/lib/vector"
schema.log_namespace = true
api.enabled = true
api.address = "0.0.0.0:8686"

[sources.in_kafka]
type = "stdin"
decoding.codec = "json"

[sinks.out_loki]
type = "loki"
inputs = ["in_kafka"]
endpoint = "http://localhost:3100"
encoding.codec = "text"
compression = "snappy"
encoding.only_fields = ["message"]
# accept, drop, rewrite_timestamp
out_of_order_action = "accept"
remove_label_fields = true

[sinks.out_loki.labels]
job = "log_analyzer_forwarder"
log_analyzer_forwarder = "vector"
"filename" = "{{ filename }}"
"hostname" = "{{ hostname }}"

Paste the following into the Vector console (the stdin source):

{"hostname": "test22","message": "Fail to read file.","filename": "/var/log/ceph/ceph-mgr.test22.log","@collect_time": "2023-12-08T01:42:41.272816048Z"}

Query loki:

curl -G -s "http://localhost:3100/loki/api/v1/query_range" \
    --data-urlencode "query={hostname=~\".+\"}" \
    --data-urlencode "limit=1000" \
    --data-urlencode "step=60" | jq

dsmith3197 avatar Dec 20 '23 21:12 dsmith3197

Note that this behavior only occurs when log namespacing is enabled. In that case, you need to manually set the semantic meaning of the message field, like so:

data_dir = "/var/lib/vector"
schema.log_namespace = true
api.enabled = true
api.address = "0.0.0.0:8686"

[sources.in_kafka]
type = "stdin"
decoding.codec = "json"

[transforms.set_meaning]
type = "remap"
inputs = ["in_kafka"]
source = """set_semantic_meaning(.message, "message")"""

[sinks.console]
type = "console"
encoding.codec = "text"
inputs = ["set_meaning"]

See the log namespacing guide for more info.
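Applying the same fix to the original Kafka-to-Loki pipeline from earlier in this thread, a corrected config might look like the sketch below. The only addition relative to the reporter's config is the set_meaning remap transform, which the loki sink now reads from instead of the source directly:

```toml
data_dir = "/var/lib/vector"
schema.log_namespace = true

[sources.in_kafka]
type = "kafka"
bootstrap_servers = "xx:9092"
group_id = "forwarder-vector"
topics = ["log-analyzer-forwarder-pressure-test"]
decoding.codec = "json"

# With log namespacing enabled, tell Vector which field carries the
# log message so the "text" codec knows what to serialize.
[transforms.set_meaning]
type = "remap"
inputs = ["in_kafka"]
source = """set_semantic_meaning(.message, "message")"""

[sinks.out_loki]
type = "loki"
inputs = ["set_meaning"]
endpoint = "http://loki:3100"
encoding.codec = "text"
out_of_order_action = "accept"
remove_label_fields = true
```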

I agree that this is confusing from a UX perspective and something we should consider improving.

dsmith3197 avatar Dec 21 '23 18:12 dsmith3197

@dsmith3197 Thanks, I get it.

1123183721 avatar Jan 29 '24 08:01 1123183721