refactor DML plan
What type of PR is this?
- [ ] API-change
- [ ] BUG
- [ ] Improvement
- [ ] Documentation
- [x] Feature
- [ ] Test and CI
- [x] Code Refactoring
Which issue(s) this PR fixes:
issue #17540 #17539
What this PR does / why we need it:
implement dedup_join
@badboynt1
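For readers unfamiliar with the operator, here is a minimal sketch of what a dedup join is assumed to do, based only on the plans and the `Duplicate entry` error shown in this description: build a hash set from the incoming rows, probe it with the existing rows, and fail on the first key seen twice. The function `dedupJoin`, its signature, and the error text are hypothetical and are not taken from the MatrixOne code base.

```go
package main

import "fmt"

// dedupJoin is a hypothetical, single-threaded sketch of dedup-join semantics.
// Assumption: the build side holds the incoming rows, the probe side holds the
// rows already in the table, and any repeated key is an error.
func dedupJoin(existing, incoming []int64) ([]int64, error) {
	// Build phase: hash the incoming keys and reject duplicates among them.
	seen := make(map[int64]struct{}, len(incoming))
	for _, k := range incoming {
		if _, dup := seen[k]; dup {
			return nil, fmt.Errorf("duplicate entry %d", k)
		}
		seen[k] = struct{}{}
	}
	// Probe phase: any existing key that hits the hash set is a conflict.
	for _, k := range existing {
		if _, dup := seen[k]; dup {
			return nil, fmt.Errorf("duplicate entry %d", k)
		}
	}
	// No conflicts: the incoming rows pass through unchanged.
	return incoming, nil
}

func main() {
	out, err := dedupJoin([]int64{1, 2, 3}, []int64{4, 5})
	fmt.Println(out, err)

	_, err = dedupJoin([]int64{1, 2, 3}, []int64{3, 4})
	fmt.Println(err)
}
```

In the plans below the same idea appears to run per shuffle bucket, with `shuffle build` feeding the hash side and `dedup join` consuming the probe side.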
pipeline for ap query! current cn 127.0.0.1:18000
sql: insert into t1 select result+999999 from generate_series(1000000) tf
Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [1])
  Pipeline: merge -> dedup join -> lockop -> projection -> multi update
  PreScopes: {
    Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [0])
      Pipeline: merge -> shuffle build
      PreScopes: {
        Scope 1 (Magic: Merge, addr:127.0.0.1:18000, mcpu: 1, Receiver: [])
          DataSource: []
          Pipeline: valuescan -> tablefunction -> projection -> projection -> shuffle -> dispatch shuffle to all of MergeReceiver [0, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown]
      }
    Scope 2 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 8, Receiver: [])
      DataSource: test.t1[a]
      Pipeline: tablescan -> projection -> shuffle -> dispatch shuffle to all of MergeReceiver [1, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown, unknown]
  }
pipeline for ap query! current cn 127.0.0.1:18000
sql: select * from t1 dedup join (select cast(result+999999 as int) result from generate_series(1000000) tmp) tf on a = result
Scope 1 (Magic: Merge, addr:127.0.0.1:18000, mcpu: 1, Receiver: [48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71])
  Pipeline: merge -> output
  PreScopes: {
    Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [1])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [48]
      PreScopes: {
        Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [0])
          Pipeline: merge -> shuffle build
          PreScopes: {
            Scope 1 (Magic: Merge, addr:127.0.0.1:18000, mcpu: 1, Receiver: [])
              DataSource: []
              Pipeline: valuescan -> tablefunction -> projection -> projection -> shuffle -> dispatch shuffle to all of MergeReceiver [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46]
          }
        Scope 2 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 8, Receiver: [])
          DataSource: test.t1[a]
          Pipeline: tablescan -> projection -> shuffle -> dispatch shuffle to all of MergeReceiver [1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47]
      }
    Scope 2 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [3])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [49]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [2]) Pipeline: merge -> shuffle build }
    Scope 3 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [5])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [50]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [4]) Pipeline: merge -> shuffle build }
    Scope 4 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [7])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [51]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [6]) Pipeline: merge -> shuffle build }
    Scope 5 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [9])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [52]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [8]) Pipeline: merge -> shuffle build }
    Scope 6 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [11])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [53]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [10]) Pipeline: merge -> shuffle build }
    Scope 7 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [13])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [54]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [12]) Pipeline: merge -> shuffle build }
    Scope 8 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [15])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [55]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [14]) Pipeline: merge -> shuffle build }
    Scope 9 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [17])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [56]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [16]) Pipeline: merge -> shuffle build }
    Scope 10 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [19])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [57]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [18]) Pipeline: merge -> shuffle build }
    Scope 11 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [21])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [58]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [20]) Pipeline: merge -> shuffle build }
    Scope 12 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [23])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [59]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [22]) Pipeline: merge -> shuffle build }
    Scope 13 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [25])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [60]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [24]) Pipeline: merge -> shuffle build }
    Scope 14 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [27])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [61]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [26]) Pipeline: merge -> shuffle build }
    Scope 15 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [29])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [62]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [28]) Pipeline: merge -> shuffle build }
    Scope 16 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [31])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [63]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [30]) Pipeline: merge -> shuffle build }
    Scope 17 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [33])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [64]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [32]) Pipeline: merge -> shuffle build }
    Scope 18 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [35])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [65]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [34]) Pipeline: merge -> shuffle build }
    Scope 19 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [37])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [66]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [36]) Pipeline: merge -> shuffle build }
    Scope 20 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [39])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [67]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [38]) Pipeline: merge -> shuffle build }
    Scope 21 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [41])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [68]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [40]) Pipeline: merge -> shuffle build }
    Scope 22 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [43])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [69]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [42]) Pipeline: merge -> shuffle build }
    Scope 23 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [45])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [70]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [44]) Pipeline: merge -> shuffle build }
    Scope 24 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [47])
      Pipeline: merge -> dedup join -> projection -> connect to MergeReceiver [71]
      PreScopes: { Scope 1 (Magic: Remote, addr:127.0.0.1:18000, mcpu: 1, Receiver: [46]) Pipeline: merge -> shuffle build }
  }
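The receiver numbering in the `select` plan above follows a regular pattern: with 24 shuffle buckets, bucket `i` (0-based) reads its build side from receiver `2i`, its probe side from receiver `2i+1`, and writes to receiver `48+i`. The sketch below only restates that observed pattern as a helper for reading the dump. `bucketReceivers` is a hypothetical name, and nothing here claims that MatrixOne allocates receiver indexes this way in general.

```go
package main

import "fmt"

// receivers holds the three receiver indexes one shuffle bucket touches.
type receivers struct {
	Build  int // fed by the "shuffle build" pre-scope
	Probe  int // fed by the table-scan side of the join
	Output int // the final merge receiver the bucket connects to
}

// bucketReceivers reproduces the numbering seen in the dump above for
// bucket i out of n buckets. This is a reading aid, not engine logic.
func bucketReceivers(i, n int) receivers {
	return receivers{Build: 2 * i, Probe: 2*i + 1, Output: 2*n + i}
}

func main() {
	const buckets = 24
	for _, i := range []int{0, 1, 23} {
		fmt.Printf("Scope %d: %+v\n", i+1, bucketReceivers(i, buckets))
	}
}
```

For example, bucket 23 corresponds to `Scope 24` in the dump, which uses receivers 46 and 47 and connects to MergeReceiver 71.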
[ERROR]
[SCRIPT FILE]: /home/aunjbr/codebase/work/matrixone/test/distributed/cases/dml/delete/delete_index.test
[ROW NUMBER]: 184
[SQL STATEMENT]: insert into t1 values(NULL, 5, 5);
[EXPECT RESULT]: Duplicate entry '(5,5)' for key '(b,c)'
[ACTUAL RESULT]: Duplicate entry '(5,5)' for key '(b,c)'
create table t1(a int);
insert into t1 select result from generate_series(10000000) g; -- run this 10 times in a row
delete from t1 where a < 9900000;
The statements above execute successfully. A few seconds later, the following panic appears in the log.
2024/10/24 14:14:36.473459 [ants]: worker exits from panic: runtime error: index out of range [0] with length 0
goroutine 4792 [running]:
runtime/debug.Stack()
    /usr/lib/go/src/runtime/debug/stack.go:26 +0x5e
github.com/panjf2000/ants/v2.(*goWorker).run.func1.1()
    /home/aunjbr/go/pkg/mod/github.com/panjf2000/ants/[email protected]/worker.go:60 +0xd4
panic({0x4d86ca0?, 0xc0365994e8?})
    /usr/lib/go/src/runtime/panic.go:785 +0x132
github.com/matrixorigin/matrixone/pkg/container/types.DecodeUint32(...)
    /home/aunjbr/codebase/work/matrixone/pkg/container/types/encoding.go:166
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index.(*hybridFilter).Unmarshal(0x5a3fcd8?, {0xc0352a4484?, 0xc00b604050?, 0x4f?})
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/index/hybrid_filter.go:120 +0x2b9
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index.DecodeBloomFilter(...)
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/index/filter.go:34
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables.(*persistedNode).FillBlockTombstones(0xc0a07ecd08, {0x5a3fcd8, 0xc010672780}, {0x7bbd605c9fe8?, 0xc0ad5fb250?}, 0xc036599488, 0xc036dae490, 0x0, 0xc001120fc0)
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/tables/pnode.go:300 +0x452
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables.(*baseObject).FillBlockTombstones(0xc005f03d10, {0x5a3fcd8, 0xc010672780}, {0x7bbd605c9fe8, 0xc0ad5fb250}, 0xc036599488, 0xc036dae490, 0x0, 0xc001120fc0)
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/tables/base.go:459 +0x10b
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables.HybridScanByBlock({0x5a3fcd8, 0xc010672780}, 0xc031b4eaa0, {0x7bbd605c9fe8, 0xc0ad5fb250}, 0xc05a5ff0c0, 0xc031b56d20, {0xc024aa0eb0, 0x2, 0x2}, ...)
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/tables/table_scan.go:62 +0x2f2
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txn/txnimpl.(*txnObject).HybridScan(0xc0712e5740, {0x5a3fcd8, 0xc010672780}, 0xc05a5ff0c0, 0xc938?, {0xc024aa0eb0, 0x2, 0x2}, 0xc001120fc0)
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/txn/txnimpl/object.go:241 +0x171
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs.(*mergeObjectsTask).LoadNextBatch(0xc0365f18c0, {0x5a3fcd8, 0xc010672780}, 0x0?)
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/tables/jobs/mergeobjects.go:217 +0x26d
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort.reshape({0x5a3fcd8, 0xc010672780}, {0x5ae6bd0, 0xc0365f18c0})
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/mergesort/reshaper.go:58 +0x2c5
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/mergesort.DoMergeAndWrite({0x5a3fcd8, 0xc010672780}, {0xc036431a40, 0x6b}, 0xffffffffffffffff, {0x5ae6bd0, 0xc0365f18c0}, 0x0)
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/mergesort/task.go:136 +0xb28
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/jobs.(*mergeObjectsTask).Execute(0xc0365f18c0, {0x5a3fcd8, 0xc010672780})
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/tables/jobs/mergeobjects.go:358 +0x473
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops.(*Op).OnExec(0xc03627eee0, {0x5a3fcd8, 0xc010672780})
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/tasks/ops/ops.go:103 +0x83
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db.(*ScheduledTxnTask).Execute(0xc07076e390, {0x5a3fcd8, 0xc010672780})
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/db/task.go:64 +0x16c
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/ops.(*Op).OnExec(0xc036bc40e0, {0x5a3fcd8, 0xc010672780})
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/tasks/ops/ops.go:103 +0x83
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks/worker.(*OpWorker).onOp(0xc00b660800, {0x7bbd61d424a8, 0xc07076e390})
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/tasks/worker/worker.go:210 +0x37
github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tasks.(*poolHandler).doHandle.(*poolHandler).doHandle.func1.func2()
    /home/aunjbr/codebase/work/matrixone/pkg/vm/engine/tae/tasks/poolhandler.go:60 +0x2c
github.com/panjf2000/ants/v2.(*goWorker).run.func1()
    /home/aunjbr/go/pkg/mod/github.com/panjf2000/ants/[email protected]/worker.go:71 +0x8a
created by github.com/panjf2000/ants/v2.(*goWorker).run in goroutine 1932
    /home/aunjbr/go/pkg/mod/github.com/panjf2000/ants/[email protected]/worker.go:48 +0x5c
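The panic message reports an index into a zero-length slice inside `DecodeUint32`, reached from `hybridFilter.Unmarshal`, so an empty buffer evidently reached a fixed-width decoder. The sketch below illustrates that class of failure and one possible guard. Both functions are hypothetical stand-ins written for this note, not the MatrixOne implementations, and the sketch does not claim to identify the root cause of the empty buffer.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errShortBuffer = errors.New("buffer too short for uint32")

// decodeUint32Unchecked reads a little-endian uint32 with no length check.
// On an empty slice the first index expression triggers a runtime panic.
func decodeUint32Unchecked(b []byte) uint32 {
	return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24
}

// decodeUint32Checked validates the length first and returns an error
// instead of panicking when fewer than 4 bytes are available.
func decodeUint32Checked(b []byte) (uint32, error) {
	if len(b) < 4 {
		return 0, errShortBuffer
	}
	return binary.LittleEndian.Uint32(b), nil
}

// panics reports whether calling f results in a panic.
func panics(f func()) (p bool) {
	defer func() { p = recover() != nil }()
	f()
	return false
}

func main() {
	fmt.Println("unchecked panics on empty input:", panics(func() { decodeUint32Unchecked(nil) }))

	_, err := decodeUint32Checked(nil)
	fmt.Println("checked returns error:", err)

	v, _ := decodeUint32Checked([]byte{1, 0, 0, 0})
	fmt.Println("checked decodes value:", v)
}
```

Whether the right fix is a guard at the decoder or preventing an empty bloom-filter buffer from being persisted in the first place depends on code not shown in this description.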