filter_watermark: The initial commit for filter watermark

Open ginobiliwang opened this issue 4 years ago • 34 comments

Signed-off-by: Fenggang [email protected]

  1. This is initial commit for watermark filter feature.
  2. The idea comes from Google's paper "The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing".
  3. From a stream-processing perspective, the current functionality is limited: it uses a fixed watermark, handles out-of-order data, provides a batch window, and tolerates data latency.
  4. A documentation PR for this watermark filter will be submitted soon.
  5. The short demo and Valgrind output below are provided for reference.

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

[root@localhost ~]# nc 127.0.0.1 5170
{"time":"2021-11-12 18:30:00"}
{"time":"2021-11-12 18:30:01"}
{"time":"2021-11-12 18:30:02"}
{"time":"2021-11-12 18:30:07"}
{"time":"2021-11-12 18:30:06"}
{"time":"2021-11-12 18:30:04"}
{"time":"2021-11-12 18:30:05"}
{"time":"2021-11-12 18:31:00"}

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

[root@localhost build]# valgrind --leak-check=full bin/fluent-bit -i tcp -F watermark -p 'time_field=time' 'watermark=20' -m '*' -o stdout
==3805== Memcheck, a memory error detector
==3805== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==3805== Using Valgrind-3.15.0 and LibVEX; rerun with -h for copyright info
==3805== Command: bin/fluent-bit -i tcp -F watermark -p time_field=time watermark=20 -m * -o stdout
==3805==
Fluent Bit v1.7.4

  • Copyright (C) 2019-2021 The Fluent Bit Authors
  • Copyright (C) 2015-2018 Treasure Data
  • Fluent Bit is a CNCF sub-project under the umbrella of Fluentd
  • https://fluentbit.io

[2021/05/08 22:12:14] [ info] [engine] started (pid=3805)
[2021/05/08 22:12:14] [ info] [storage] version=1.1.1, initializing...
[2021/05/08 22:12:14] [ info] [storage] in-memory
[2021/05/08 22:12:14] [ info] [storage] normal synchronization mode, checksum disabled, max_chunks_up=128
[2021/05/08 22:12:14] [ info] [input:tcp:tcp.0] listening on 0.0.0.0:5170
[2021/05/08 22:12:14] [ info] [sp] stream processor started
==3805== Warning: client switching stacks? SP change: 0x9bc2788 --> 0xa48a850
==3805== to suppress, use: --max-stackframe=9208008 or greater
==3805== Warning: client switching stacks? SP change: 0xa48a7b8 --> 0x9bc2788
==3805== to suppress, use: --max-stackframe=9207856 or greater
==3805== Warning: client switching stacks? SP change: 0x9bc2788 --> 0xa48a7b8
==3805== to suppress, use: --max-stackframe=9207856 or greater
==3805== further instances of this message will not be shown.
[0] tcp.0: [1620483146.702023972, {"time"=>"2021-11-12 18:30:00"}]
[1] tcp.0: [1620483150.354546260, {"time"=>"2021-11-12 18:30:01"}]
[2] tcp.0: [1620483153.771547131, {"time"=>"2021-11-12 18:30:02"}]
[3] tcp.0: [1620483176.174986006, {"time"=>"2021-11-12 18:30:04"}]
[4] tcp.0: [1620483180.376079757, {"time"=>"2021-11-12 18:30:05"}]
[5] tcp.0: [1620483169.903685948, {"time"=>"2021-11-12 18:30:06"}]
[6] tcp.0: [1620483163.781975807, {"time"=>"2021-11-12 18:30:07"}]
^C[2021/05/08 22:13:19] [engine] caught signal (SIGINT)
[2021/05/08 22:13:19] [ warn] [engine] service will stop in 5 seconds
[2021/05/08 22:13:24] [ info] [engine] service stopped
==3805==
==3805== HEAP SUMMARY:
==3805== in use at exit: 99,816 bytes in 3,380 blocks
==3805== total heap usage: 4,229 allocs, 849 frees, 5,092,444 bytes allocated
==3805==
==3805== LEAK SUMMARY:
==3805== definitely lost: 0 bytes in 0 blocks
==3805== indirectly lost: 0 bytes in 0 blocks
==3805== possibly lost: 0 bytes in 0 blocks
==3805== still reachable: 99,816 bytes in 3,380 blocks
==3805== suppressed: 0 bytes in 0 blocks
==3805== Reachable blocks (those to which a pointer was found) are not shown.
==3805== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==3805==
==3805== For lists of detected and suppressed errors, rerun with: -s
==3805== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
[root@localhost build]#

ginobiliwang, May 08 '21

Hi @koleini & @edsiper, this is the feature I discussed with you before. It is now ready for review.

ginobiliwang, May 08 '21

https://github.com/fluent/fluent-bit-docs/pull/530

ginobiliwang, May 13 '21

Is there a well-known, tested third-party heap implementation that could be used as an alternative?

nigels-com, May 28 '21

Most implementations are customized versions. I have customized the heap for this plugin as well.

ginobiliwang, May 31 '21

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], Jul 01 '21

@edsiper @koleini, could you please review this PR?

ginobiliwang, Jul 11 '21

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], Aug 11 '21

@koleini ping

edsiper, Dec 12 '21

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], Mar 15 '22

@koleini, can you help review this PR?

ginobiliwang, Jul 21 '22

This pull request requires documentation and unit tests to be included. In addition, there are indentation problems in the code that need to be fixed.

Please see https://github.com/fluent/fluent-bit/blob/master/CONTRIBUTING.md for the guidelines.

koleini, Jul 29 '22

Hi @koleini, here is the documentation PR: https://github.com/fluent/fluent-bit-docs/pull/530

I will fix the formatting issues and add unit tests.

ginobiliwang, Aug 01 '22

There are still some guidelines that are not followed, for instance:

  1. Conditionals with single-line bodies don't have braces. See "Braces usage on conditionals" for details.
  2. Some variable definitions are still in the middle of functions. See "Variable definitions" for details.

koleini, Oct 24 '22

Would be good to add tests if possible.

koleini, Oct 24 '22

Sorry, please ignore the review request; I clicked it by accident. I tried to add some test cases but could not get them to work, so I will ping you separately about it.

ginobiliwang, Oct 26 '22

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], Jan 25 '23

@koleini please review latest code if possible.

ginobiliwang, Jan 30 '23

In addition to the fixes, proper documentation for this plugin is needed. For instance, users need to know the definition of the batch window and how it works with watermarking.

koleini, Feb 01 '23

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], May 03 '23

I will continue working on this PR soon; I have been tied up with other matters.

ginobiliwang, May 03 '23

Hi @ginobiliwang, when you do resume, please be sure to make the required changes to comply with the coding style. It's really important for improving the quality and resilience of the code, which is why it's a hard requirement for the code to be merged.

Side question: is the reheap data structure an ordered sliding-window data store? If that's the case, I'd like to ask what the benefit of using it would be compared to a binary tree with a thin wrapper.

I'm asking because it seems to me that a binary tree would be faster, and we already have a binary tree implementation that I'd rather reuse than introduce a new data structure.

Once you are done, there is one more thing we'll need to do: update the code to use the log event abstraction layer, which you should use both to decode the input chunk and to encode the records your filter produces.

Once you get to that stage you can either use other filters for reference or message me directly and I'll send you some reference links and documentation (if we still haven't published it).

leonardo-albertovich, May 03 '23

Hi @leonardo-albertovich, sure. Thanks so much for comments.

As for the data structure, it is just a simple heap. I do not think a binary tree can replace it, but could you share the binary tree implementation with me? I can use it as a reference.

Thanks for the information about using the log event abstraction layer; I will ping you on Slack for details.

ginobiliwang, May 03 '23

The binary tree we use is this well-known red-black tree implementation: https://github.com/fluent/fluent-bit/tree/master/lib/rbtree

leonardo-albertovich, May 03 '23

I had not noticed that an rbtree implementation was already available. Using rbtree for the current feature is definitely fine.

ginobiliwang, May 03 '23

I'm glad you find it fitting, I agree and would really appreciate it if at some point you updated your code to use it.

Thanks a lot!

leonardo-albertovich, May 03 '23

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], Aug 03 '23

I will continue working on this PR soon; I have been tied up with other matters.

ginobiliwang, Aug 06 '23

This PR is stale because it has been open 45 days with no activity. Remove stale label or comment or this will be closed in 10 days.

github-actions[bot], Nov 06 '23

Sorry for the delay; I will catch up ASAP.

ginobiliwang, Nov 06 '23

Hi @leonardo-albertovich, I tried to use the rbtree in the Fluent Bit repo but could not make it work. The main issue is that rbtree cannot handle duplicate keys, and this PR has to deal with records that share the same key. So I reverted to a heap, which I think is the best data structure for the job.

BTW, I accidentally closed this PR; could you please help reopen it?

ginobiliwang, Dec 19 '23