
flush_interval is not working

Open · rokytnice opened this issue 3 years ago · 1 comment

Describe the bug

Our configuration is in the "Your Configuration" section. We have a message rate of about 1,000 messages per second. Fluentd writes to Elasticsearch and also to S3. The Elasticsearch part is fine. The S3 part needs a lot of buffer memory (3 GB), but our main problem is that the buffer is only flushed after about 2 minutes, which is too late: we have configured a flush_interval of 1s.

The source is RabbitMQ.
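
For quick reference, these are the buffer settings on the S3 output that we expect to control flushing (excerpted from the full configuration below):

    <buffer time,objectkey_uuid>
        @type memory
        flush_mode interval
        flush_interval 1s
        flush_thread_count 2
    </buffer>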

To Reproduce

Feed the configuration below at a rate of about 1,000 messages per second.

Expected behavior

The buffer should be flushed after 1 second, as configured with flush_interval 1s.

Your Environment

- Fluentd version: fluentd 1.2.4
- TD Agent version:
- Operating system: linux 5.13.0-40-generic
- Kernel version: 5.13.0-40-generic

Your Configuration

    <source>
        @type rabbitmq
        tag ppm
        host "#{ENV['EXT_RABBITMQ_HOST']}"
        user "#{ENV['EXT_RABBITMQ_USERNAME']}"
        pass "#{ENV['EXT_RABBITMQ_PASSWORD']}"
        vhost /
        queue "#{ENV['EXT_RABBITMQ_QUEUE']}"
        durable true
        heartbeat 10 # integer as seconds or :server (interval specified by server)
        <parse>
            @type json
        </parse>
    </source>


    <source>
        @type monitor_agent
    </source>

    <source>
        @type prometheus
        bind 0.0.0.0
        port 24231
        metrics_path /status/metrics
    </source>

    <source>
        @type prometheus_output_monitor
        interval 10
        <labels>
            hostname ${hostname}
        </labels>
    </source>


    <system>
        log_level "#{ENV['EXT_FLUENTD_PPM_LOG_LEVEL']}"
    </system>

    <match ppm>
        @type copy
        copy_mode shallow
        <store>
            @type relabel
            @label @ppm-s3
        </store>
        <store>
            @type relabel
            @label @ppm-elastic
        </store>
    </match>

    <label @ppm-elastic>
        <filter ppm>
            @type record_modifier
            <record>
                storingTimestamp ${Time.now.strftime('%Y-%m-%dT%H:%M:%S.%N%:z')}
            </record>
            <record>
                loggingTimestamp ${Time.at(record['timestamp'] / 1000.0).strftime('%Y-%m-%dT%H:%M:%S.%L%:z')}
            </record>
            <record>
                namespace "#{ENV['EXT_FLUENTD_PPM_NAMESPACE']}"
            </record>
            <record>
                _dummy_ ${if record.has_key?('payload') && !record['payload'].nil? && record['payload'].has_key?('payloadData'); record['payload']['payloadSize'] = (record['payload']['payloadData']).size ; end; nil}
            </record>
            remove_keys timestamp, _dummy_, payload
        </filter>

        <filter ppm>
            @type prometheus
            <metric>
                name fluentd_input_status_num_records_total
                type counter
                desc The total number of incoming records
                <labels>
                    tag ${tag}
                    hostname ${hostname}
                </labels>
            </metric>
        </filter>

        <match ppm>
            @type copy
            <store>
                @id elasticsearch
                @type elasticsearch
                include_tag_key false
                host "#{ENV['EXT_ELASTICSEARCH_HOST']}"
                port "#{ENV['EXT_ELASTICSEARCH_PORT']}"
                scheme "#{ENV['EXT_ELASTICSEARCH_SCHEME']}"
                logstash_format true
                logstash_prefix ppm
                time_key loggingTimestamp
                time_key_format %Y-%m-%dT%H:%M:%S.%L%:z
                # https://github.com/atomita/fluent-plugin-aws-elasticsearch-service/issues/15#issuecomment-254793259
                reload_connections false
                reconnect_on_error true
                reload_on_failure true
                request_timeout 30s
                resurrect_after 5s
                type_name _doc
                <buffer>
                    # https://docs.fluentd.org/v1.0/articles/buffer-section
                    @type file
                    path /var/log/fluentd-buffers/ppm.es.buffer
                    flush_mode interval
                    flush_interval 6s
                    retry_forever true
                    retry_type exponential_backoff
                    retry_max_interval 3m
                    retry_wait 1s
                    flush_thread_count 2
                    overflow_action block
                    disable_chunk_backup true
                    chunk_limit_size "#{ENV['EXT_ELASTICSEARCH_CHUNK_LIMIT_SIZE']}"
                </buffer>
            </store>
            <store>
                @type prometheus
                <metric>
                    name fluentd_output_status_num_records_total
                    type counter
                    desc The total number of outgoing records
                    <labels>
                        tag ${tag}
                        hostname ${hostname}
                    </labels>
                </metric>
            </store>
        </match>
    </label>

    <label @ppm-s3>
        
        <filter ppm>
            @type adduuid
            key objectkey_uuid
        </filter>

        <match ppm>
            @type s3   
            store_as json
            #add_tag_prefix filtered
            s3_bucket "#{ENV['EXT_S3_BUCKET']}"
            s3_region eu-central-1
            path ${objectkey_uuid}

            s3_object_key_format  %{path}__%{index}.%{file_extension}
            # workaround to run with minio
            "#{ENV['S3_ENDPOINT_PARAMETER']}" "#{ENV['S3_ENDPOINT_VALUE']}"
            "#{ENV['AWS_KEY_ID_PARAMETER']}" "#{ENV['AWS_KEY_ID_VALUE']}"
            "#{ENV['AWS_SEC_KEY_PARAMETER']}" "#{ENV['AWS_SEC_KEY_VALUE']}"
            # workaround end
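            # for illustration, with hypothetical values S3_ENDPOINT_PARAMETER=s3_endpoint and
            # S3_ENDPOINT_VALUE=http://minio:9000, the first line above expands to
            #   s3_endpoint http://minio:9000
            # so the same config file can point at either AWS S3 or MinIO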
 
            <buffer time,objectkey_uuid>
                # https://docs.fluentd.org/v1.0/articles/buffer-section
                @type memory
                # note: path is only used by the file buffer type; it has no effect with @type memory
                path /var/log/fluentd-buffers/ppm.s3.buffer
                flush_mode interval
                flush_interval 1s
                retry_forever true
                retry_type exponential_backoff
                retry_max_interval 1s
                retry_wait 1s
                # note: chunking on objectkey_uuid means roughly one chunk (and one S3 object)
                # per message, which these two flush threads have to work through
                flush_thread_count 2
                overflow_action block
                disable_chunk_backup true
                # note: this reuses the Elasticsearch chunk size variable
                chunk_limit_size "#{ENV['EXT_ELASTICSEARCH_CHUNK_LIMIT_SIZE']}"
            </buffer>

 
            <format>
                @type json
            </format>
            <inject>
                time_key log_time
                time_type string
                time_format %Y-%m-%dT%H:%M:%S
                utc true
            </inject>

        </match>
    </label>

Your Error Log

We get a buffer overflow if we don't configure enough memory.
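
As a sketch of a possible mitigation (an assumption on our side, not yet tested): switching the S3 buffer from memory to the file buffer type, so backpressure spills to disk instead of RAM:

    <buffer time,objectkey_uuid>
        # file buffer: same flush settings, but chunks live on disk (assumed mitigation, untested)
        @type file
        path /var/log/fluentd-buffers/ppm.s3.buffer
        flush_mode interval
        flush_interval 1s
        flush_thread_count 2
        overflow_action block
    </buffer>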

Additional context

No response

rokytnice · May 05 '22 14:05

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove the stale label or comment, or this issue will be closed in 30 days.

github-actions[bot] · Aug 04 '22 10:08

This issue was automatically closed after remaining stale for 30 days.

github-actions[bot] · Sep 04 '22 10:09