fluent-plugin-opensearch

OpenSearch Output Data Stream - id_key ignored

Open toby181 opened this issue 2 years ago • 1 comments

Hi all, we're using the opensearch_data_stream output. To avoid duplicate logs in OpenSearch, we want to use the opensearch_genid filter in combination with the "id_key" parameter. Unfortunately, "id_key" seems to be ignored when writing to data streams.

Steps to replicate

Our dummy config:

  <filter **>
    @type opensearch_genid
    hash_id_key "myhash"
  </filter>
  <match **>
    @type opensearch_data_stream
    @id fd_out_os_ds
    @log_level "info"
    id_key "myhash"
    with_transporter_log false
    data_stream_name "log-dummy"
    data_stream_template_name ""
    include_tag_key true
    host "opensearch-staging"
    port 9200
    path ""
    scheme https
    ssl_verify false
    ssl_version TLSv1_2
    user "fluentd"
    password xxxxxx
    reload_connections false
    reconnect_on_error true
    reload_on_failure true
    log_os_400_reason true
    logstash_format true
    include_timestamp true
    sniffer_class_name "Fluent::Plugin::OpenSearchSimpleSniffer"
    request_timeout 5s
    suppress_type_name false
    <buffer tag>
      flush_thread_count 8
      flush_interval 5s
      chunk_limit_size 4M
      total_limit_size 512MB
      retry_max_interval 30
      retry_forever true
    </buffer>
  </match>
 

When I check my logs in OpenSearch, the field 'myhash' is filled with hash values, so the problem is not with the generation of the hashes.
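For context, deduplication by ID only works if the same record always yields the same ID. The sketch below illustrates that idea with a generic content hash. It is not the opensearch_genid filter's actual algorithm; the helper name `content_id` and the choice of SHA-1 plus Base64 are assumptions made purely for illustration.

```python
import base64
import hashlib
import json
from typing import Optional


def content_id(record: dict, keys: Optional[list] = None) -> str:
    """Derive a deterministic ID from a record (hypothetical helper).

    If `keys` is given, only those fields feed the hash; otherwise the
    whole record does. Keys are sorted so field order does not matter.
    """
    source = record if keys is None else {k: record.get(k) for k in keys}
    seed = json.dumps(source, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha1(seed.encode("utf-8")).digest()
    return base64.b64encode(digest).decode("ascii")


first = {"message": "disk full", "host": "node-1"}
resent = {"host": "node-1", "message": "disk full"}  # same event, sent twice
other = {"message": "disk ok", "host": "node-1"}

# Identical content gives identical IDs; different content does not.
print(content_id(first) == content_id(resent))
print(content_id(first) == content_id(other))
```

If the ID is instead generated randomly per event, only retransmissions of an already-tagged event would share an ID, so which seed feeds the hash determines what kind of duplicates can be caught.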

Expected Behavior or What you need to ask

Based on the example in https://github.com/fluent/fluent-plugin-opensearch#generate-hash-id, the configuration seems OK. I'd expect the value of the "_id" field to be replaced with the value of the "myhash" field (or whichever field is configured).
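To make the expectation concrete: data streams are append-only, so bulk requests against them use the `create` action, and the expected result is that each action line carries an `_id` taken from the configured key. The following is a minimal sketch of such a bulk body, not the plugin's actual code; `build_bulk_body` and its parameters are hypothetical names.

```python
import json


def build_bulk_body(records, target, id_key=None):
    """Return an NDJSON bulk body that uses the `create` action.

    When `id_key` is set and present in a record, its value is copied
    into the action's `_id`; otherwise `_id` is omitted and the server
    assigns one.
    """
    lines = []
    for record in records:
        action = {"_index": target}
        if id_key is not None and id_key in record:
            action["_id"] = record[id_key]
        lines.append(json.dumps({"create": action}))  # action line
        lines.append(json.dumps(record))              # document line
    return "\n".join(lines) + "\n"  # bulk bodies end with a newline


body = build_bulk_body(
    [{"@timestamp": "2023-11-22T17:11:00Z", "message": "hello", "myhash": "abc123"}],
    target="log-dummy",
    id_key="myhash",
)
print(body)
```

With `id_key` unset, the same call produces action lines without `_id`, which matches the behavior reported in this issue.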

Using Fluentd and OpenSearch plugin versions

  • Bare Metal or within Docker or Kubernetes or others?
    • Kubernetes
  • Fluentd v1.0 or later
    • paste result of fluentd --version or td-agent --version
    • fluentd 1.16.3
  • OpenSearch plugin version
    • paste boot log of fluentd or td-agent
    • paste result of fluent-gem list, td-agent-gem list or your Gemfile.lock
*** LOCAL GEMS ***

abbrev (default: 0.1.0)
async (1.31.0)
async-http (0.60.2)
async-io (1.37.0)
async-pool (0.4.0)
aws-eventstream (1.2.0)
aws-partitions (1.854.0)
aws-sdk-core (3.187.1)
aws-sigv4 (1.6.1)
base64 (default: 0.1.1)
benchmark (default: 0.2.0)
bigdecimal (default: 3.1.1)
bundler (default: 2.3.26)
cgi (default: 0.3.6)
concurrent-ruby (1.2.2)
console (1.23.2)
cool.io (1.8.0)
csv (default: 3.2.5)
date (default: 3.2.2)
debug (1.6.3)
delegate (default: 0.2.0)
did_you_mean (default: 1.6.1)
digest (default: 3.1.0)
digest-crc (0.6.5)
drb (default: 2.1.0)
english (default: 0.7.1)
erb (default: 2.2.3)
error_highlight (default: 0.3.0)
etc (default: 1.3.0)
excon (0.104.0)
faraday (2.7.11)
faraday-excon (2.1.0)
faraday-net_http (3.0.2)
faraday_middleware-aws-sigv4 (1.0.1)
fcntl (default: 1.0.1)
fiber-annotation (0.2.0)
fiber-local (1.0.0)
fiddle (default: 1.1.0)
fileutils (default: 1.6.0)
find (default: 0.1.1)
fluent-plugin-kafka (0.19.2)
fluent-plugin-multi-format-parser (1.0.0)
fluent-plugin-opensearch (1.1.4)
fluent-plugin-prometheus (2.1.0)
fluent-plugin-record-modifier (2.1.1)
fluentd (1.16.3)
forwardable (default: 1.3.2)
getoptlong (default: 0.1.1)
http_parser.rb (0.8.0)
io-console (default: 0.5.11)
io-nonblock (default: 0.1.0)
io-wait (default: 0.2.1)
ipaddr (default: 1.2.4)
irb (default: 1.4.1)
jmespath (1.6.2)
json (2.6.3, default: 2.6.1)
logger (default: 1.5.0)
ltsv (0.1.2)
matrix (0.4.2)
minitest (5.15.0)
msgpack (1.7.2)
multi_json (1.15.0)
mutex_m (default: 0.1.1)
net-ftp (0.1.3)
net-http (default: 0.3.0)
net-imap (0.2.3)
net-pop (0.1.1)
net-protocol (default: 0.1.2)
net-smtp (0.3.1)
nio4r (2.5.9)
nkf (default: 0.1.1)
observer (default: 0.1.1)
oj (3.16.1)
open-uri (default: 0.2.0)
open3 (default: 0.1.1)
opensearch-ruby (3.0.1)
openssl (default: 3.0.1)
optparse (default: 0.2.0)
ostruct (default: 0.5.2)
pathname (default: 0.2.0)
power_assert (2.0.1)
pp (default: 0.3.0)
prettyprint (default: 0.1.1)
prime (0.1.2)
prometheus-client (4.2.2)
protocol-hpack (1.4.2)
protocol-http (0.24.7)
protocol-http1 (0.15.1)
protocol-http2 (0.15.1)
pstore (default: 0.1.1)
psych (default: 4.0.4)
racc (default: 1.6.0)
rake (13.0.6)
rbs (2.7.0)
rdoc (default: 6.4.0)
readline (default: 0.0.3)
readline-ext (default: 0.1.4)
reline (default: 0.3.1)
resolv (default: 0.2.1)
resolv-replace (default: 0.1.0)
rexml (3.2.6, 3.2.5)
rinda (default: 0.1.1)
rss (0.2.9)
ruby-kafka (1.5.0)
ruby2_keywords (default: 0.0.5)
securerandom (default: 0.2.0)
serverengine (2.3.2)
set (default: 1.0.2)
shellwords (default: 0.1.0)
sigdump (0.2.5)
singleton (default: 0.1.1)
stringio (default: 3.0.1)
strptime (0.2.5)
strscan (default: 3.0.1)
syslog (default: 0.1.0)
tempfile (default: 0.1.2)
test-unit (3.5.3)
time (default: 0.2.2)
timeout (default: 0.2.0)
timers (4.3.5)
tmpdir (default: 0.1.2)
traces (0.11.1)
tsort (default: 0.1.0)
typeprof (0.21.3)
tzinfo (2.0.6)
tzinfo-data (1.2023.3)
un (default: 0.2.0)
uri (0.12.2, default: 0.12.1)
weakref (default: 0.1.1)
webrick (1.8.1)
yajl-ruby (1.4.3)
yaml (default: 0.2.0)
zlib (default: 2.1.1)
  • OpenSearch version (optional)
  • 2.11
  • OpenSearch template(s) (optional)
    {
      "name": "log_template",
      "index_template": {
        "index_patterns": [
          "log-*"
        ],
        "template": {
          "settings": {
            "index": {
              "number_of_shards": "1",
              "number_of_replicas": "1",
              "mapping": {
                "total_fields": {
                  "limit": "2000"
                }
              }
            }
          },
          "mappings": {
            "properties": {
              "timestamp": {
                "type": "date"
              }
            }
          }
        },
        "composed_of": [
          
        ],
        "priority": 200,
        "data_stream": {
          "timestamp_field": {
            "name": "@timestamp"
          }
        }
      }
    }

toby181 • Nov 22 '23 17:11

I did some further testing, this time using the 'opensearch' plugin instead of 'opensearch_data_stream'. ... I always thought opensearch_data_stream had to be used when it comes to data streams. Maybe someone can correct me or provide further details.

My current configuration looks like this:

  <filter **>
    @type opensearch_genid
    hash_id_key _hash
  </filter>
  <label @FLUENT_LOG>
    <match fluent.*>
      @type null
    </match>
  </label>
  <match **>
    @type opensearch
    @id fd_out_os
    @log_level "debug"
    with_transporter_log false
    include_tag_key true
    id_key _hash
    remove_keys _hash
    host "opensearch-staging"
    port 9200
    path ""
    scheme https
    ssl_verify false
    ssl_version TLSv1_2
    user "fluentd"
    password xxxxxx
    reload_connections false
    reconnect_on_error true
    reload_on_failure true
    log_os_400_reason true
    logstash_prefix "null"
    logstash_dateformat "null"
    logstash_format false
    index_name "log-1"
    include_timestamp true
    sniffer_class_name "Fluent::Plugin::OpenSearchSimpleSniffer"
    request_timeout 4s
    suppress_type_name false
    write_operation "create"
    <buffer tag>
      ....
    </buffer>
  </match>
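The `write_operation "create"` setting matters for deduplication: `create` rejects a document whose `_id` already exists, whereas `index` overwrites it. A toy in-memory model of that difference follows; the `ToyIndex` class is purely illustrative and does not talk to OpenSearch.

```python
class ToyIndex:
    """In-memory stand-in for a single index, keyed by _id."""

    def __init__(self):
        self.docs = {}

    def index(self, doc_id, doc):
        """`index` semantics: insert, or overwrite an existing _id."""
        status = 200 if doc_id in self.docs else 201
        self.docs[doc_id] = doc
        return status

    def create(self, doc_id, doc):
        """`create` semantics: insert only; conflict if _id exists."""
        if doc_id in self.docs:
            return 409
        self.docs[doc_id] = doc
        return 201


idx = ToyIndex()
first_status = idx.create("abc123", {"message": "hello"})
duplicate_status = idx.create("abc123", {"message": "hello again"})
print(first_status, duplicate_status, idx.docs["abc123"])
```

In this model the second `create` is rejected and the stored document is unchanged, which is the duplicate-suppression effect the configuration above relies on.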

toby181 • Nov 23 '23 13:11