es_out: support Upstream Servers with configuration overriding
Implementation of Upstream feature for the Elasticsearch output plugin.
This pull request is based on pull request #1560 and Forward output plugin.
It was tested in a local setup with:
- Fluent Bit without Upstream feature connected to a single node of Elasticsearch cluster consisting of 3 master-eligible/data and 1 coordinating nodes.
  - Refer to the elastic-cluster directory of the mabrarov/elastic-stack repository for the Docker Compose project used to create the target Elasticsearch cluster and Kibana.
  - Fluent Bit configuration file used for the test: refer to fluent-bit-es/fluent-bit.conf and (the same in YAML format) fluent-bit-es/fluent-bit.yaml in the mabrarov/elastic-stack repository.
  - Debug log is available at flb_es.log.
- Fluent Bit with Upstream feature connected to all Elasticsearch data nodes of Elasticsearch cluster consisting of 3 master-eligible/data and 1 coordinating nodes.
  - Refer to the elastic-cluster directory of the mabrarov/elastic-stack repository for the Docker Compose project used to create the target Elasticsearch cluster and Kibana.
  - Fluent Bit configuration file used for the test: refer to fluent-bit-es-cluster/fluent-bit.conf and (the same in YAML format) fluent-bit-es-cluster/fluent-bit.yaml in the mabrarov/elastic-stack repository.
  - Debug log is available at flb_es_upstream.log.
Testing
- [x] Example configuration files for the change can be found in mabrarov/elastic-stack repository under fluent-bit-es-cluster directory.
- [x] Debug log output from testing the change - see above.
- [x] Attached Valgrind output that shows no leaks or memory corruption was found - refer to flb_run_code_analysis.log for the output of the command TEST_PRESET=valgrind SKIP_TESTS='flb-rt-out_td flb-it-network' ./run_code_analysis.sh
- [N/A] Run local packaging test showing all targets (including any new ones) build.
- [N/A] Set ok-package-test label to test for all targets (requires maintainer to do).
Documentation
- [x] Documentation required for this feature - refer to https://github.com/fluent/fluent-bit-docs/pull/1143.
Backporting
- [N/A] Backport to latest stable release.
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Hi reviewers,
Is it possible to approve only workflow for this pull request, so that automated checks and build can start?
Thank you.
@mabrarov sure
Hi @PettitWesley,
It looks like all failed checks are around run-macos-unit-tests jobs and caused by the following failed unit tests:
- flb-rt-in_event_test
- flb-rt-out_tcp
I feel like other pull requests have the same issues, i.e. it doesn't seem that the failed checks are caused by this pull request changes.
Help of maintainers is appreciated.
Thank you.
Hi @PettitWesley,
Is it possible to trigger automated workflow (build) for this pull request one more time? I found & fixed one issue and added tests for the new code since last build happened.
Thank you.
Hi dear reviewers,
Is it possible to get this pull request reviewed / accepted sooner? Is there something pending / waiting from my side to start review?
Thank you.
Hi @PettitWesley and @edsiper,
It feels like you are code owners for Elasticsearch output plugin. Is there something pending / waiting from my side to start review of this pull request? This new feature was requested 4 years ago and I feel it is something which multiple users of Fluent Bit (not just my team) would like to have.
Thank you.
Hi @PettitWesley and @edsiper,
Is there a chance to get this pull request reviewed (and eventually accepted)? Please let me know if something is missing and blocking that.
Thank you.
Could someone please review this? I also want this feature to be merged and released. Thank you!
+1
Hello reviewers, could you please review this PR?
This feature will also be very useful in our project, in particular for supporting HA.
Thank you!
Just FYI, my team successfully tested replacement of following log shipping solution:
Fluent Bit DaemonSet in K8s (Forward output plugin with Upstream Servers, Upstream node per Fluentd instance) → Fluentd (2 instances with fluent-plugin-elasticsearch configured to connect to all nodes of Elasticsearch cluster) → Elasticsearch cluster (3+ nodes)
with:
Fluent Bit DaemonSet in K8s (Docker image built from source branch of this pull request, Elasticsearch output plugin with Upstream Servers, Upstream node per Elasticsearch node) → Elasticsearch cluster (3+ nodes)
For my team, this pull request makes it possible to get rid of 7-9 Fluentd instances (where each Fluentd instance occupies a single VM).
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
This pull request is still relevant and up-to-date (ready for merging, tested with changes coming from the target branch).
Hello, @PettitWesley and @edsiper.
I hope you can take a look at this PR and merge it soon. It's a highly requested feature, and it is mentioned in the official docs as well.
Hi @PettitWesley and @edsiper,
It looks like this pull request brings one more enhancement compared to the approach "Fluent Bit (Forward output plugin with Upstream Servers) → multiple Fluentd instances → multiple Elasticsearch instances of a single Elasticsearch cluster":
- Fluentd can generate unique ID of Elasticsearch document before inserting data into Elasticsearch. This approach can help to avoid duplicating records (at Elasticsearch side) when re-sending data to Elasticsearch at Fluentd side. Refer to https://github.com/uken/fluent-plugin-elasticsearch?tab=readme-ov-file#generate-hash-id.
- Unfortunately, the elasticsearch_genid Fluentd filter plugin won't help to avoid duplicated records when Fluent Bit re-sends data to Fluentd.
- With this pull request we can generate a unique ID of the Elasticsearch document at the Fluent Bit side - refer to the Generate_ID key of the Elasticsearch Fluent Bit output plugin configuration. This approach (Fluent Bit built from this pull request, Fluent Bit configured with Elasticsearch output plugin with Upstream Servers and Generate_ID configuration key set to On → multiple Elasticsearch instances of a single Elasticsearch cluster) fully eliminates the possibility of duplicated records at the Elasticsearch side due to re-sending of data.
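The de-duplication argument above relies on the document ID being derived from the record content, so that a re-sent record maps to the same ID rather than a new document. The following is a minimal sketch of that idea only. It assumes an FNV-1a hash as a stand-in (this is not claimed to be the hash the plugin uses), and sketch_record_hash and sketch_generate_id are hypothetical names.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* FNV-1a 64-bit hash: a stand-in chosen for brevity, NOT the plugin's hash. */
uint64_t sketch_record_hash(const void *data, size_t len)
{
    const unsigned char *p = data;
    uint64_t hash = 14695981039346656037ULL;  /* FNV offset basis */
    size_t i;

    assert(data != NULL || len == 0);
    for (i = 0; i < len; i++) {
        hash ^= p[i];
        hash *= 1099511628211ULL;             /* FNV prime */
    }
    return hash;
}

/*
 * Hypothetical helper: format a content-derived document ID as 16 hex digits.
 * Identical record bytes always give the same ID, so a retried request
 * addresses the same document instead of creating a new one.
 * 'out' must have room for 17 bytes (16 digits plus the terminator).
 */
void sketch_generate_id(const void *record, size_t len, char out[17])
{
    assert(out != NULL);
    snprintf(out, 17, "%016llx",
             (unsigned long long) sketch_record_hash(record, len));
}
```

Calling sketch_generate_id twice on the same serialized record yields the same string, which is the property that makes a retry overwrite rather than duplicate. An ID generated randomly per send attempt would not have this property.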
Hi @PettitWesley,
Could you (or someone from maintainers / reviewers) please approve workflow for this pull request, so that the automated checks and build of this pull request can start?
Thank you.
FYI, Vector - a competitor of Fluent Bit - supports multiple Elasticsearch hosts in its elasticsearch sink. Refer to https://vector.dev/docs/reference/configuration/sinks/elasticsearch/#endpoints. Note that Vector appears to use a smarter approach than this pull request when deciding which Elasticsearch endpoint to send data to, because its sink configuration supports a health check, just like the Fluentd Elasticsearch plugin.
Hi @cosmo0920,
Checking whether initialized or not with own_xxx is bad idea
own_xxx is not about initialization, but is about ownership. Some data (to simplify the code and to reduce memory footprint) is shared b/w output plugin configuration and upstream node configuration, so it is required to know what entity owns the data to deallocate that data just once.
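The ownership idea described above can be sketched as follows. This is a simplified illustration, not the code from this pull request: struct sketch_es_config and the sketch_config_* functions are hypothetical, and plain malloc/free stand in for the Fluent Bit allocators.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical, reduced stand-in for a plugin-level or node-level config. */
struct sketch_es_config {
    char *index;     /* may point to memory owned by another config */
    int own_index;   /* non-zero if this config must free 'index' */
};

/* Plugin-level config: allocates and owns its copy of the value. */
int sketch_config_set_owned(struct sketch_es_config *c, const char *value)
{
    size_t len;

    assert(c != NULL && value != NULL);
    len = strlen(value) + 1;
    c->index = malloc(len);
    if (c->index == NULL) {
        c->own_index = 0;
        return -1;
    }
    memcpy(c->index, value, len);
    c->own_index = 1;
    return 0;
}

/* Node-level config without an override: shares the plugin's pointer. */
void sketch_config_borrow(struct sketch_es_config *node,
                          const struct sketch_es_config *plugin)
{
    assert(node != NULL && plugin != NULL);
    node->index = plugin->index;
    node->own_index = 0;
}

/* Frees 'index' only when owned; returns 1 if memory was released. */
int sketch_config_destroy(struct sketch_es_config *c)
{
    int released = 0;

    assert(c != NULL);
    if (c->own_index) {
        free(c->index);
        released = 1;
    }
    c->index = NULL;
    c->own_index = 0;
    return released;
}
```

Both configs hold the same address, so a NULL check alone cannot tell them apart at teardown. The flag is what keeps the destroy of the borrowing config from freeing memory that the owning config frees later.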
Initializing planning to be allocated variable should be initialized with NULL and checking whether it was allocated or not with if (ec->xxx) { ... } pattern. NULL checking pattern should be enough to release allocated resources.
NULL check is not required for flb_free function, because it uses C standard library free which accepts NULL:
The function accepts (and does nothing with) the null pointer to reduce the amount of special-casing.
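As a small illustration of the point about free and NULL, here is a cleanup routine that releases optional buffers without any NULL checks. The struct and function names are hypothetical, and the standard free is used directly rather than flb_free.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical holder of two optional heap buffers. */
struct sketch_buffers {
    char *header;   /* NULL if never allocated */
    char *body;     /* NULL if never allocated */
};

/* No per-field NULL checks: free(NULL) is defined to do nothing. */
void sketch_buffers_destroy(struct sketch_buffers *b)
{
    assert(b != NULL);
    free(b->header);
    free(b->body);
    b->header = NULL;   /* makes a repeated destroy harmless */
    b->body = NULL;
}
```

This covers the "was it allocated" question only. It does not answer "who should free it" when two structures hold the same pointer, which is the separate concern raised above.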
Thank you.
Hi @cosmo0920,
Regarding your comment - the ownership concept has no relationship with language. If we remove all own_xxx fields and run Fluent Bit with upstream servers (refer to fluent-bit-es-cluster/fluent-bit.conf in the description of this pull request) under Valgrind, then we will get a double deallocation error, because the same value (same address) is stored in multiple pointers. Please check the code one more time, and thank you for the time you spend on the review of these changes.
In our code base, there are no own_xxx fields, even in plugins that use flb_upstream such as out_forward. This indicates that your code style does not fit ours. Thanks.
Hi @cosmo0920,
Regarding your comment.
The only Fluent Bit output plugin supporting upstream servers (before this pull request) is the Forward output plugin, and that plugin does not seem to support overriding the common plugin configuration per upstream node. That's the reason the Forward output plugin doesn't need to care about ownership of configuration data - each upstream node just uses its own set of configuration data. A user of the Forward output plugin has to specify (duplicate) plugin options at the upstream node level, otherwise the default value is used (e.g. shared_key, username and password options), i.e. there is no place to specify configuration which is common for all upstream nodes of a specific Forward output plugin instance. This approach is not going to work well with the Elasticsearch output plugin, because it has too many options to duplicate per upstream node.
Refer to the linked https://github.com/fluent/fluent-bit-docs/pull/1143 for the changes in documentation for Elasticsearch output plugin which clearly state what options can be overridden at upstream node level (or configuration of output plugin is used otherwise).
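The override behavior described above (a node-level value wins, otherwise the plugin-level value applies) can be sketched as a simple fallback lookup. The struct, its two fields and the function names are illustrative assumptions, not the layout used in this pull request.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical subset of options; NULL means "not set at this level". */
struct sketch_options {
    const char *index;
    const char *http_user;
};

/* Resolve one option: the node-level value wins, else the plugin-level value. */
const char *sketch_resolve(const char *node_value, const char *plugin_value)
{
    return node_value != NULL ? node_value : plugin_value;
}

/* Build the effective options for one upstream node. */
struct sketch_options sketch_effective(const struct sketch_options *plugin,
                                       const struct sketch_options *node)
{
    struct sketch_options out;

    assert(plugin != NULL && node != NULL);
    out.index = sketch_resolve(node->index, plugin->index);
    out.http_user = sketch_resolve(node->http_user, plugin->http_user);
    return out;
}
```

The effective options mix pointers from both levels, which is where the question of which level frees which value comes from.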
Thank you.
Hi, if what you describe is true, we need to implement an overriding mechanism for configuration on upstream nodes. The current implementation deviates too much from our coding rules. The own_xxx pattern is a code smell.
After recent rebase tests failed. It looks like it is caused by upgraded version of CMake - refer to https://github.com/actions/runner-images/issues/11926
Fixed CMake version in CI configuration in https://github.com/fluent/fluent-bit/pull/10178 which (with https://github.com/fluent/fluent-bit/pull/10180) was cherry-picked into this PR.
@mabrarov / @cosmo0920 are the issues covered here? There's a lot of stuff in the conversation so not sure if the changes have been actioned.
Hi @patrick-stephens,
Answering your question about the status of this pull request (my understanding): the review by @cosmo0920 concluded that the ownership tracking (please note that it is not reference counting) implemented in this pull request doesn't follow Fluent Bit code style / architecture. That is true; IMHO, the reason is that existing Fluent Bit code doesn't have the case which this pull request has. Unfortunately, I failed to find a better solution, because the only alternative - duplication of configuration per each upstream node - drastically complicates the code without real benefit. I'd like Fluent Bit maintainers and community to review this pull request one more time. Maybe this is a case where it is acceptable to introduce changes into configuration data ownership handling.
Thank you.
Hi @cosmo0920,
Regarding:
We might want to use slightly different approach for achieving this.
This is because own_* flags appear frequently in this PR. So, we want to wrap them with a struct like the following:
/* A wrapper for flb_sds_t that tracks memory ownership */
typedef struct {
    flb_sds_t sds; /* The actual string data */
    int owned;     /* A flag indicating if this struct owns the memory */
} flb_es_sds_t;
I like this approach, but flb_sds_t may not be the only type that needs to be wrapped (I'm looking at the elasticsearch_config_destroy function in plugins/out_es/es_conf.c) if we decide to use this approach (C does not have C++ templates 😄). Let me check and try during this week.
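A minimal sketch of how the proposed wrapper could hide the flag behind helper functions is shown below. It assumes a plain char pointer as a stand-in for flb_sds_t so that it compiles without Fluent Bit headers, and the sketch_es_sds_* helper names are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for flb_sds_t so the sketch builds without Fluent Bit headers. */
typedef char *sketch_sds_t;

/* Same shape as the proposed flb_es_sds_t wrapper. */
struct sketch_es_sds {
    sketch_sds_t sds;   /* the actual string data */
    int owned;          /* non-zero if this wrapper owns the memory */
};

/* Store a freshly allocated copy of 'value' and take ownership of it. */
int sketch_es_sds_copy(struct sketch_es_sds *dst, const char *value)
{
    size_t len;

    assert(dst != NULL && value != NULL);
    len = strlen(value) + 1;
    dst->sds = malloc(len);
    dst->owned = (dst->sds != NULL);
    if (dst->sds == NULL) {
        return -1;
    }
    memcpy(dst->sds, value, len);
    return 0;
}

/* Shallow copy: share the pointer, never free it through 'dst'. */
void sketch_es_sds_share(struct sketch_es_sds *dst,
                         const struct sketch_es_sds *src)
{
    assert(dst != NULL && src != NULL);
    dst->sds = src->sds;
    dst->owned = 0;
}

/* Release the memory only if owned; calling it twice is harmless. */
void sketch_es_sds_destroy(struct sketch_es_sds *s)
{
    assert(s != NULL);
    if (s->owned) {
        free(s->sds);
    }
    s->sds = NULL;
    s->owned = 0;
}
```

With helpers like these, a config destroy function would call one destroy helper per field and would not test any flag itself. Each additional wrapped type would need its own trio of helpers, which is the cost mentioned above.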
Thank you.
Hi, thanks for the response. I mean, I have no objection to defining each of the functions necessary for wrapping shallow copies. I just wanted to conceal the own_* flags.
Hi @cosmo0920,
Here is one more thing I found.
Some of configuration options - like flb_elasticsearch_config::index defined in plugins/out_es/es.h - are loaded using flb_config_map, which instance is defined at plugins/out_es/es.c - refer to https://github.com/fluent/fluent-bit/blob/42e29d6807c450e6f4a263aa51e54be150d35796/plugins/out_es/es.c#L1077
If I use custom type for such configuration options (fields of flb_elasticsearch_config struct) - like flb_es_char_ptr struct for index:
struct flb_es_char_ptr {
    char *ptr;
    int owned;
};
struct flb_elasticsearch_config {
    /* Elasticsearch index (database) and type (table) */
    struct flb_es_char_ptr index;
    ...
};
then it will complicate usage of flb_config_map too. Like (I'm not sure this code is correct):
static struct flb_config_map config_map[] = {
    {
     FLB_CONFIG_MAP_STR, FLB_ES_CONFIG_PROPERTY_INDEX, FLB_ES_DEFAULT_INDEX,
     0, FLB_TRUE, offsetof(struct flb_elasticsearch_config, index) + offsetof(struct flb_es_char_ptr, ptr),
     "Set an index name"
    },
    ...
Anyway, I need to try it first. Just some thoughts.
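The offset arithmetic in question can be checked in isolation with a small sketch. It assumes only that a loader writes a plain pointer at the given byte offset; whether flb_config_map does anything else with the field is not verified here. All names are hypothetical, and the flag is deliberately placed first so that the inner offset is non-zero.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical wrapper; 'owned' comes first so 'ptr' has a non-zero offset. */
struct sketch_char_ptr {
    int owned;
    char *ptr;
};

/* Hypothetical config holding the wrapper by value. */
struct sketch_config {
    int port;
    struct sketch_char_ptr index;
};

/* Offset of index.ptr measured from the start of struct sketch_config. */
#define SKETCH_INDEX_PTR_OFFSET \
    (offsetof(struct sketch_config, index) + \
     offsetof(struct sketch_char_ptr, ptr))

/* Write a string pointer at a byte offset, as an offset-driven loader might. */
void sketch_store_str(void *base, size_t offset, char *value)
{
    char **slot;

    assert(base != NULL);
    slot = (char **) ((char *) base + offset);
    *slot = value;
}
```

Storing through SKETCH_INDEX_PTR_OFFSET lands in index.ptr and leaves index.owned untouched. The sum of the two offsets is therefore sound as an address computation, but the loader still knows nothing about the flag, so the flag would have to be set in a separate step.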
Thank you.
Walkthrough
Adds a formatter test flush-context API and callback; refactors Elasticsearch output to support per-config and HA/node-aware routing with cloud ID parsing and AWS credential providers; introduces per-node config lifecycle, new parsing helpers, updated tests, and integrates flush-context into engine dispatch.
Changes
| Cohort / File(s) | Change Summary |
|---|---|
| Test API & engine dispatch: include/fluent-bit/flb_lib.h, include/fluent-bit/flb_output.h, src/flb_lib.c, src/flb_engine_dispatch.c | Added forward declaration struct flb_input_instance; renamed formatter test context param to flush_ctx; added flb_output_set_test_flush_ctx_callback(...); struct flb_test_out_formatter gains flush_ctx and flush_ctx_callback; test_run_formatter computes flush_ctx via stored value or callback. |
| Elasticsearch core & public headers: plugins/out_es/es.c, plugins/out_es/es.h | Added struct flb_elasticsearch_config (per-config) and HA/upstream fields; added flb_elasticsearch_target() for node-aware routing; migrated plugin logic and configuration map to use per-config properties and new property macros; adjusted default port and forward declarations. |
| Elasticsearch config parsing & helpers: plugins/out_es/es_conf.c, plugins/out_es/es_conf.h, plugins/out_es/es_conf_parse.c, plugins/out_es/es_conf_parse.h, plugins/out_es/es_conf_prop.h | Added per-node config creation/teardown and HA vs simple paths (es_config_ha, es_config_simple); Cloud ID parsing and cloud_auth handling; AWS provider, STS and unsigned headers setup (FLB_HAVE_AWS guarded); added property macros and new public APIs (flb_es_upstream_conf, cloud/AWS setters); changed flb_es_conf_destroy return type. |
| Elasticsearch build file: plugins/out_es/CMakeLists.txt | Included es_conf_parse.c in plugin sources. |
| Elasticsearch tests: tests/runtime/out_elasticsearch.c | Added upstream helper and multiple upstream-focused tests; register per-test flush-context callback (cb_flush_context) and use new flush-context APIs in test setup. |
Sequence Diagram(s)
sequenceDiagram
participant TestAPI as Test API
participant Engine as Engine
participant Output as Output Instance
participant FlushCB as FlushCtxCallback (opt)
participant Formatter as Formatter Callback
Note right of TestAPI `#E8F8F5`: register formatter test + optional flush_ctx_callback
TestAPI->>Output: register formatter test + optional flush_ctx_callback
Engine->>Output: start formatter test run (test_run_formatter)
alt flush_ctx_callback provided
Output->>FlushCB: call flush_ctx_callback(config, i_ins, plugin_ctx, flush_ctx)
FlushCB-->>Output: return per-run flush_ctx
Note right of Output `#DDEBF7`: computed flush_ctx used for formatter
else no flush_ctx_callback
Note right of Output `#F7F1DD`: use stored flush_ctx
end
Output->>Formatter: invoke formatter callback with computed flush_ctx
Formatter-->>Output: return formatted result
Output-->>Engine: deliver result
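The branch in the diagram above (use the callback result when a callback is registered, otherwise the stored value) can be sketched as follows. The types and the two-argument callback signature are simplified assumptions; the walkthrough describes a callback that also receives config and i_ins.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified callback type: derives a per-run flush context. */
typedef void *(*sketch_flush_ctx_cb)(void *plugin_ctx, void *stored_flush_ctx);

/* Hypothetical, reduced version of the formatter test settings. */
struct sketch_test_formatter {
    void *flush_ctx;                         /* stored context, may be NULL */
    sketch_flush_ctx_cb flush_ctx_callback;  /* optional, may be NULL */
};

/* Use the callback result if a callback is set, else the stored value. */
void *sketch_resolve_flush_ctx(const struct sketch_test_formatter *t,
                               void *plugin_ctx)
{
    assert(t != NULL);
    if (t->flush_ctx_callback != NULL) {
        return t->flush_ctx_callback(plugin_ctx, t->flush_ctx);
    }
    return t->flush_ctx;
}

/* Example callback: ignores the stored value and returns the plugin context. */
void *sketch_use_plugin_ctx(void *plugin_ctx, void *stored_flush_ctx)
{
    (void) stored_flush_ctx;
    return plugin_ctx;
}
```

A callback is useful when the context has to be computed from the plugin context at run time, for example to select a per-node configuration, which a value stored at registration time cannot do.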
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~60 minutes
- Pay extra attention to:
  - HA/upstream selection and node-targeting in plugins/out_es/es.c (flb_elasticsearch_target).
  - Ownership, allocation and free paths across es_conf.c / es_conf_parse.c.
  - AWS credential/provider and TLS lifecycle (FLB_HAVE_AWS conditional code).
  - Integration of flush_ctx callback APIs with engine dispatch (src/flb_engine_dispatch.c, src/flb_lib.c).
  - Tests that register flush-context callbacks (tests/runtime/out_elasticsearch.c).
Suggested reviewers
- PettitWesley
- cosmo0920
- edsiper
- fujimotos
Poem
I'm a rabbit in a code-lined hat,
I hop through nodes and map each path.
Flush contexts bind where formatters chat,
Upstreams steer routes — no more stray math.
Tests thump their paws; the patch finds its craft. 🐇✨
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 11.48% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
✅ Passed checks (2 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title directly and clearly describes the main feature being implemented: upstream server support with per-node configuration overriding for the Elasticsearch output plugin. |
@mabrarov can you address the changes requested?