filter_watermark: initial commit for the watermark filter
Signed-off-by: Fenggang [email protected]
- This is the initial commit for the watermark filter feature.
- The idea comes from Google's paper "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing".
- From a stream-processing perspective, the current functionality is limited: it uses a fixed watermark, handles out-of-order data, groups records in a batch window, and tolerates some data latency.
- A documentation PR for the watermark filter will be submitted soon.
- The short demo and Valgrind output below are included for reference.
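The behaviour summarized in the list above can be sketched as follows. This is a hypothetical illustration, not the plugin's code: it assumes the watermark is "largest event time seen so far minus a fixed lag" (20 seconds in the demo) and that buffered records with an event time at or below the watermark are released in event-time order. Under that assumption the model is consistent with the demo output below, where 18:30:00 to 18:30:07 come out sorted while 18:31:00 is never printed. The names `wm_buffer`, `wm_push` and `wm_release`, the fixed capacity, and the use of insertion sort (instead of the heap this PR uses) are all choices made for this sketch; late records get no special treatment here.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical sketch, not the plugin's implementation.
 * Assumption: watermark = max event time seen - fixed lag (seconds). */
#define WM_CAPACITY 64

struct wm_buffer {
    time_t events[WM_CAPACITY]; /* buffered event times, sorted ascending */
    size_t count;               /* number of buffered records */
    time_t max_seen;            /* largest event time observed so far */
    time_t lag;                 /* fixed watermark lag, e.g. 20 seconds */
};

static void wm_init(struct wm_buffer *b, time_t lag)
{
    assert(b != NULL);
    b->count = 0;
    b->max_seen = 0;
    b->lag = lag;
}

/* Insert one event time, keeping the buffer sorted; equal times stay in
 * arrival order. Returns -1 when the buffer is full. */
static int wm_push(struct wm_buffer *b, time_t t)
{
    size_t i;

    if (b->count == WM_CAPACITY) {
        return -1;
    }
    i = b->count;
    while (i > 0 && b->events[i - 1] > t) {
        b->events[i] = b->events[i - 1];
        i--;
    }
    b->events[i] = t;
    b->count++;
    if (t > b->max_seen) {
        b->max_seen = t;
    }
    return 0;
}

/* Copy every buffered record whose event time is <= the watermark into
 * out[] (in event-time order), drop them from the buffer, return count. */
static size_t wm_release(struct wm_buffer *b, time_t *out)
{
    time_t watermark = b->max_seen - b->lag;
    size_t n = 0;
    size_t i;

    while (n < b->count && b->events[n] <= watermark) {
        out[n] = b->events[n];
        n++;
    }
    /* move the records that are still held to the front */
    for (i = n; i < b->count; i++) {
        b->events[i - n] = b->events[i];
    }
    b->count -= n;
    return n;
}
```

Replaying the demo input as seconds after 18:30:00 (0, 1, 2, 7, 6, 4, 5, then 60) with a lag of 20 releases nothing until 60 arrives; at that point the watermark is 40, so 0, 1, 2, 4, 5, 6, 7 are released in order and 60 stays buffered.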
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
[root@localhost ~]# nc 127.0.0.1 5170
{"time":"2021-11-12 18:30:00"}
{"time":"2021-11-12 18:30:01"}
{"time":"2021-11-12 18:30:02"}
{"time":"2021-11-12 18:30:07"}
{"time":"2021-11-12 18:30:06"}
{"time":"2021-11-12 18:30:04"}
{"time":"2021-11-12 18:30:05"}
{"time":"2021-11-12 18:31:00"}
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
[root@localhost build]# valgrind --leak-check=full bin/fluent-bit -i tcp -F watermark -p 'time_field=time' 'watermark=20' -m '*' -o stdout
==3805== Memcheck, a memory error detector
==3805== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==3805== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==3805== Command: bin/fluent-bit -i tcp -F watermark -p time_field=time watermark=20 -m * -o stdout
==3805==
Fluent Bit v1.7.4
* Copyright (C) 2019-2021 The Fluent Bit Authors
* Copyright (C) 2015-2018 Treasure Data
* Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
* https://fluentbit.io

[2021/05/08 22:12:14] [ info] [engine] started (pid=3805)
[2021/05/08 22:12:14] [ info] [storage] version=1.1.1, initializing...
[2021/05/08 22:12:14] [ info] [storage] in-memory
[2021/05/08 22:12:14] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/05/08 22:12:14] [ info] [input:tcp:tcp.0] listening on 0.0.0.0:5170
[2021/05/08 22:12:14] [ info] [sp] stream processor started
==3805== Warning: client switching stacks? SP change: 0x9bc2788 --> 0xa48a850
==3805== to suppress, use: --max-stackframe=9208008 or greater
==3805== Warning: client switching stacks? SP change: 0xa48a7b8 --> 0x9bc2788
==3805== to suppress, use: --max-stackframe=9207856 or greater
==3805== Warning: client switching stacks? SP change: 0x9bc2788 --> 0xa48a7b8
==3805== to suppress, use: --max-stackframe=9207856 or greater
==3805== further instances of this message will not be shown.
[0] tcp.0: [1620483146.702023972, {"time"=>"2021-11-12 18:30:00"}]
[1] tcp.0: [1620483150.354546260, {"time"=>"2021-11-12 18:30:01"}]
[2] tcp.0: [1620483153.771547131, {"time"=>"2021-11-12 18:30:02"}]
[3] tcp.0: [1620483176.174986006, {"time"=>"2021-11-12 18:30:04"}]
[4] tcp.0: [1620483180.376079757, {"time"=>"2021-11-12 18:30:05"}]
[5] tcp.0: [1620483169.903685948, {"time"=>"2021-11-12 18:30:06"}]
[6] tcp.0: [1620483163.781975807, {"time"=>"2021-11-12 18:30:07"}]
^C[2021/05/08 22:13:19] [engine] caught signal (SIGINT)
[2021/05/08 22:13:19] [ warn] [engine] service will stop in 5 seconds
[2021/05/08 22:13:24] [ info] [engine] service stopped
==3805==
==3805== HEAP SUMMARY:
==3805== in use at exit: 99,816 bytes in 3,380 blocks
==3805== total heap usage: 4,229 allocs, 849 frees, 5,092,444 bytes allocated
==3805==
==3805== LEAK SUMMARY:
==3805== definitely lost: 0 bytes in 0 blocks
==3805== indirectly lost: 0 bytes in 0 blocks
==3805== possibly lost: 0 bytes in 0 blocks
==3805== still reachable: 99,816 bytes in 3,380 blocks
==3805== suppressed: 0 bytes in 0 blocks
==3805== Reachable blocks (those to which a pointer was found) are not shown.
==3805== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==3805==
==3805== For lists of detected and suppressed errors, rerun with: -s
==3805== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
[root@localhost build]#
Hi @koleini & @edsiper, this is the feature I discussed with you earlier. It is now ready for review.
https://github.com/fluent/fluent-bit-docs/pull/530
Is there a well-known and tested third-party heap implementation that could be used as an alternative?
Most implementations are customized versions, and for this plugin I customized one as well.
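For context on the heap being discussed, here is a minimal array-backed binary min-heap keyed by event time. It is a generic textbook sketch, not the customized heap in this PR; the names `ev_heap`, `heap_push` and `heap_pop` and the fixed capacity are hypothetical. It shows that a heap accepts duplicate keys without any extra work: equal timestamps are simply stored side by side, and pops come out in non-decreasing order.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Generic binary min-heap sketch (not the PR's implementation).
 * Duplicate keys are allowed. */
#define HEAP_CAPACITY 128

struct ev_heap {
    time_t keys[HEAP_CAPACITY];
    size_t size;
};

static void heap_swap(time_t *a, time_t *b)
{
    time_t tmp = *a;

    *a = *b;
    *b = tmp;
}

/* O(log n) insert; returns -1 when the heap is full. */
static int heap_push(struct ev_heap *h, time_t key)
{
    size_t i;

    assert(h != NULL);
    if (h->size == HEAP_CAPACITY) {
        return -1;
    }
    i = h->size++;
    h->keys[i] = key;
    /* sift up; strict '<' leaves equal keys where they are */
    while (i > 0 && h->keys[i] < h->keys[(i - 1) / 2]) {
        heap_swap(&h->keys[i], &h->keys[(i - 1) / 2]);
        i = (i - 1) / 2;
    }
    return 0;
}

/* O(log n) removal of the smallest key; returns -1 when empty. */
static int heap_pop(struct ev_heap *h, time_t *out)
{
    size_t i = 0;
    size_t left;
    size_t right;
    size_t smallest;

    if (h->size == 0) {
        return -1;
    }
    *out = h->keys[0];
    h->keys[0] = h->keys[--h->size];
    /* sift down until both children are >= the current node */
    for (;;) {
        left = 2 * i + 1;
        right = left + 1;
        smallest = i;
        if (left < h->size && h->keys[left] < h->keys[smallest]) {
            smallest = left;
        }
        if (right < h->size && h->keys[right] < h->keys[smallest]) {
            smallest = right;
        }
        if (smallest == i) {
            break;
        }
        heap_swap(&h->keys[i], &h->keys[smallest]);
        i = smallest;
    }
    return 0;
}
```

A plain heap does not preserve arrival order among equal keys; if that matters, the key can be extended with an arrival sequence number.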
This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.
@edsiper @koleini, could you please review this PR?
@koleini ping
@koleini, could you help review this PR?
This pull request requires documentation and unit tests to be included. In addition, there are indentation problems in the code that need to be fixed.
Please see https://github.com/fluent/fluent-bit/blob/master/CONTRIBUTING.md for the guidelines.
Hi @koleini, this is the documentation PR: https://github.com/fluent/fluent-bit-docs/pull/530
I will fix the formatting issues and add unit tests.
There are still some guidelines that are not followed, for instance:
- Conditionals with single-line bodies don't have braces. See Braces usage on conditionals for the details.
- Some variable definitions are still in the middle of functions. See Variable definitions for the details.
Would be good to add tests if possible.
Sorry, please ignore the review request; I clicked it by accident. I tried to add some test cases but could not get them working, so I will ping you separately about it.
@koleini, please review the latest code if possible.
In addition to the fixes, proper documentation for this plugin is needed. For instance, users need to know the definition of the batch window and how it works with watermarking.
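To illustrate the kind of definition being asked for, here is one common reading of a "batch window" in the Dataflow model: fixed-size (tumbling) windows aligned to the epoch, where a window counts as complete once the watermark reaches its end. This is an assumption for illustration only; the plugin's actual batch-window semantics are not described in this thread, and `window_start` and `window_closed` are hypothetical names.

```c
#include <assert.h>
#include <time.h>

/* Hypothetical tumbling-window helpers (assumed semantics, not the
 * plugin's). A window covers [start, start + size) seconds. */

/* Start of the window containing event_time (floor semantics). */
static time_t window_start(time_t event_time, time_t size)
{
    time_t rem;

    assert(size > 0);
    rem = event_time % size;
    if (rem < 0) {
        rem += size; /* C '%' truncates toward zero; fix up negatives */
    }
    return event_time - rem;
}

/* A window is complete once the watermark has reached its end. */
static int window_closed(time_t start, time_t size, time_t watermark)
{
    return watermark >= start + size;
}
```

With 60-second windows, an event at second 65 falls in the window starting at 60, and the window starting at 0 closes only when the watermark reaches 60.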
I will continue working on this PR soon; I have been tied up with other matters.
Hi @ginobiliwang, when you resume, please make sure to apply the changes required to comply with the coding style. It's really important for improving the quality and resilience of the code, which is why it's a hard requirement for the code to be merged.
Side question: is the reheap data structure an ordered sliding-window data store? If so, I'd like to know what the benefit of using it would be compared to a binary tree with a thin wrapper.
I'm asking because it sounds to me like a binary tree would be faster, and we already have a binary tree implementation that I'd rather reuse than introduce a new data structure.
Once you are done, there is one more thing we'll need to do: update the code to use the log event abstraction layer, which you should use to decode the input chunk and encode the records your filter produces.
Once you get to that stage, you can either use other filters for reference or message me directly, and I'll send you some reference links and documentation (if we still haven't published it).
Hi @leonardo-albertovich, sure. Thanks so much for the comments.
As for the data structure, it is just a simple heap. I do not think a binary tree can replace it, but could you share the binary tree implementation with me? I could use it as a reference.
Thanks for the information about using the log event abstraction layer; I will ping you on Slack for details.
The binary tree we use is this well known red/black binary tree implementation https://github.com/fluent/fluent-bit/tree/master/lib/rbtree
I had not noticed that we already have an rbtree available. It is definitely fine to use the rbtree for the current feature.
I'm glad you find it fitting. I agree, and I would really appreciate it if at some point you updated your code to use it.
Thanks a lot!
Sorry for the delay; I will catch up ASAP.
Hi @leonardo-albertovich, I tried to use the rbtree in the Fluent Bit repo but could not make it work. The main issue is that the rbtree cannot handle duplicate keys, and this PR has to deal with records that share the same key. So I reverted to the heap data structure; I think a heap is the best data structure for the job.
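For readers weighing the two options, the duplicate-key constraint mentioned here is commonly handled in ordered containers by extending the key with a monotonically increasing arrival sequence number, so two records with the same timestamp never compare equal. The sketch below shows only such a comparator, exercised with `qsort`; it is not what this PR does, the `ev_key` names are hypothetical, and whether Fluent Bit's bundled rbtree can accept a key like this has not been checked here.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>

/* Hypothetical composite key: event time plus arrival sequence. */
struct ev_key {
    time_t event_time;
    uint64_t seq; /* assigned in arrival order, unique per record */
};

/* Three-way compare: order by event time, then by arrival order. */
static int ev_key_cmp(const struct ev_key *a, const struct ev_key *b)
{
    if (a->event_time != b->event_time) {
        return (a->event_time < b->event_time) ? -1 : 1;
    }
    if (a->seq != b->seq) {
        return (a->seq < b->seq) ? -1 : 1;
    }
    return 0;
}

/* Adapter with the signature qsort() expects. */
static int ev_key_qsort_cmp(const void *a, const void *b)
{
    return ev_key_cmp((const struct ev_key *) a, (const struct ev_key *) b);
}
```

Because the sequence number breaks every tie, records with equal timestamps also keep their arrival order, which a plain heap does not guarantee.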
BTW, I accidentally closed this PR; could you please help reopen it?