flush_interval is not working for the S3 output
Describe the bug
Our configuration is in the "Your Configuration" section below. We ingest roughly 1000 messages per second from a RabbitMQ source, and Fluentd writes them to both Elasticsearch and S3. The Elasticsearch output works fine. The S3 output needs a lot of buffer memory (about 3 GB), but our main problem is that its buffer is only flushed after about 2 minutes, which is far too late: we have configured flush_interval 1s.
To Reproduce
Run the configuration below with an input rate of about 1000 messages per second.
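The delay can be observed with the monitor_agent source that is already part of the configuration: since no port is set, it should listen on its default endpoint http://localhost:24220/api/plugins.json, and buffer_queue_length / buffer_total_queued_size for the s3 output keep growing while chunks wait to be flushed.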
Expected behavior
The S3 buffer should be flushed after 1 second, as configured with flush_interval 1s.
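For reference, here is a minimal sketch of a buffer section that flushes purely on an interval, following the Fluentd buffer documentation; it uses no time or per-record chunk keys, and the values are illustrative rather than our production settings:
<buffer>
  @type memory
  flush_mode interval   # enqueue chunks on a fixed timer
  flush_interval 1s     # target flush cadence
  flush_thread_count 2  # number of parallel flush threads
  overflow_action block # apply backpressure instead of raising an error
</buffer>
With no chunk keys, all records share a single staged chunk per flush window, so the flush cadence is governed only by flush_interval.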
Your Environment
- Fluentd version: fluentd 1.2.4
- TD Agent version:
- Operating system: linux 5.13.0-40-generic
- Kernel version: 5.13.0-40-generic
Your Configuration
<source>
  @type rabbitmq
  tag ppm
  host "#{ENV['EXT_RABBITMQ_HOST']}"
  user "#{ENV['EXT_RABBITMQ_USERNAME']}"
  pass "#{ENV['EXT_RABBITMQ_PASSWORD']}"
  vhost /
  queue "#{ENV['EXT_RABBITMQ_QUEUE']}"
  durable true
  heartbeat 10 # integer as seconds or :server (interval specified by server)
  <parse>
    @type json
  </parse>
</source>
<source>
  @type monitor_agent
</source>
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /status/metrics
</source>
<source>
  @type prometheus_output_monitor
  interval 10
  <labels>
    hostname ${hostname}
  </labels>
</source>
<system>
  log_level "#{ENV['EXT_FLUENTD_PPM_LOG_LEVEL']}"
</system>
<match ppm>
  @type copy
  copy_mode shallow
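  # note: per the out_copy docs, shallow copy duplicates only the top-level
  # record; nested values such as record['payload'] stay shared between stores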
  <store>
    @type relabel
    @label @ppm-s3
  </store>
  <store>
    @type relabel
    @label @ppm-elastic
  </store>
</match>
<label @ppm-elastic>
  <filter ppm>
    @type record_modifier
    <record>
      storingTimestamp ${Time.now.strftime('%Y-%m-%dT%H:%M:%S.%N%:z')}
    </record>
    <record>
      loggingTimestamp ${Time.at(record['timestamp'] / 1000.0).strftime('%Y-%m-%dT%H:%M:%S.%L%:z')}
    </record>
    <record>
      namespace "#{ENV['EXT_FLUENTD_PPM_NAMESPACE']}"
    </record>
    <record>
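      # side effect only: sets payload['payloadSize'] from payloadData;
      # the _dummy_ key itself is removed via remove_keys below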
      _dummy_ ${if record.has_key?('payload') && !record['payload'].nil? && record['payload'].has_key?('payloadData'); record['payload']['payloadSize'] = (record['payload']['payloadData']).size ; end; nil}
    </record>
    remove_keys timestamp, _dummy_, payload
  </filter>
  <filter ppm>
    @type prometheus
    <metric>
      name fluentd_input_status_num_records_total
      type counter
      desc The total number of incoming records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </filter>
  <match ppm>
    @type copy
    <store>
      @id elasticsearch
      @type elasticsearch
      include_tag_key false
      host "#{ENV['EXT_ELASTICSEARCH_HOST']}"
      port "#{ENV['EXT_ELASTICSEARCH_PORT']}"
      scheme "#{ENV['EXT_ELASTICSEARCH_SCHEME']}"
      logstash_format true
      logstash_prefix ppm
      time_key loggingTimestamp
      time_key_format %Y-%m-%dT%H:%M:%S.%L%:z
      # https://github.com/atomita/fluent-plugin-aws-elasticsearch-service/issues/15#issuecomment-254793259
      reload_connections false
      reconnect_on_error true
      reload_on_failure true
      request_timeout 30s
      resurrect_after 5s
      type_name _doc
      <buffer>
        # https://docs.fluentd.org/v1.0/articles/buffer-section
        @type file
        path /var/log/fluentd-buffers/ppm.es.buffer
        flush_mode interval
        flush_interval 6s
        retry_forever true
        retry_type exponential_backoff
        retry_max_interval 3m
        retry_wait 1s
        flush_thread_count 2
        overflow_action block
        disable_chunk_backup true
        chunk_limit_size "#{ENV['EXT_ELASTICSEARCH_CHUNK_LIMIT_SIZE']}"
      </buffer>
    </store>
    <store>
      @type prometheus
      <metric>
        name fluentd_output_status_num_records_total
        type counter
        desc The total number of outgoing records
        <labels>
          tag ${tag}
          hostname ${hostname}
        </labels>
      </metric>
    </store>
  </match>
</label>
<label @ppm-s3>
  <filter ppm>
    @type adduuid
    key objectkey_uuid
  </filter>
  <match ppm>
    @type s3
    store_as json
    #add_tag_prefix filtered
    s3_bucket "#{ENV['EXT_S3_BUCKET']}"
    s3_region eu-central-1
    path ${objectkey_uuid}
    s3_object_key_format %{path}__%{index}.%{file_extension}
    # workaround to run with minio
    "#{ENV['S3_ENDPOINT_PARAMETER']}" "#{ENV['S3_ENDPOINT_VALUE']}"
    "#{ENV['AWS_KEY_ID_PARAMETER']}" "#{ENV['AWS_KEY_ID_VALUE']}"
    "#{ENV['AWS_SEC_KEY_PARAMETER']}" "#{ENV['AWS_SEC_KEY_VALUE']}"
    # workaround end
    <buffer time,objectkey_uuid>
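      # note: objectkey_uuid is generated per record by the adduuid filter
      # above, so every record lands in its own chunk; with "time" as a chunk
      # key, out_s3's default timekey settings apply unless overridden here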
      # https://docs.fluentd.org/v1.0/articles/buffer-section
      @type memory
      path /var/log/fluentd-buffers/ppm.s3.buffer
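      # note: path is only used by the file buffer type; a memory buffer
      # should ignore it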
      flush_mode interval
      flush_interval 1s
      retry_forever true
      retry_type exponential_backoff
      retry_max_interval 1s
      retry_wait 1s
      flush_thread_count 2
      overflow_action block
      disable_chunk_backup true
      chunk_limit_size "#{ENV['EXT_ELASTICSEARCH_CHUNK_LIMIT_SIZE']}"
    </buffer>
    <format>
      @type json
    </format>
    <inject>
      time_key log_time
      time_type string
      time_format %Y-%m-%dT%H:%M:%S
      utc true
    </inject>
  </match>
</label>
Your Error Log
We get buffer overflow errors if we do not configure enough memory.
Additional context
No response
This issue has been automatically marked as stale because it has been open for 90 days with no activity. Remove the stale label or add a comment, or this issue will be closed in 30 days.
This issue was automatically closed because it remained stale for 30 days.