
join_asof out-of-order error for big sorted tables

Open • bepec opened this issue 1 year ago • 1 comment

Describe the usage question you have. Please include as many useful details as possible.

With pyarrow 16.0.0, I can't apply join_asof even though both input tables are sorted by the "on" key. I noticed this when merging larger sorted tables: for example, it fails for tables with 1,061,753 and 994,046 rows, but succeeds if I reduce the row counts to 1,048,178 and 975,257.

I think this behavior can be reproduced with the example below:

import numpy as np
import pyarrow as pa

ts0 = 0
nticks = 2_000_000  # it's OK for nticks = 1_000_000
ncats = 10
ticks = np.arange(ts0, ts0 + nticks)  # strictly increasing on-key
# repeat() needs an integer count, so use floor division
cats = np.arange(0, ncats).repeat(nticks // ncats)
t1 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
t2 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
t1.join_asof(t2, on="ts", tolerance=-10, by="cats")

# Last line fails with error:
---------------------------------------------------------------------------
ArrowInvalid                              Traceback (most recent call last)
Cell In[273], line 10
      8 t1 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
      9 t2 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
---> 10 t1.join_asof(t2, on="ts", tolerance=-10, by="cats")

File /lib/python3.10/site-packages/pyarrow/table.pxi:5528, in pyarrow.lib.Table.join_asof()

File /lib/python3.10/site-packages/pyarrow/acero.py:333, in _perform_join_asof(left_operand, left_on, left_by, right_operand, right_on, right_by, tolerance, use_threads, output_type)
    326 join_opts = AsofJoinNodeOptions(
    327     left_on, left_by, right_on, right_by, tolerance
    328 )
    329 decl = Declaration(
    330     "asofjoin", options=join_opts, inputs=[left_source, right_source]
    331 )
--> 333 result_table = decl.to_table(use_threads=use_threads)
    335 if output_type == Table:
    336     return result_table

File /lib/python3.10/site-packages/pyarrow/_acero.pyx:590, in pyarrow._acero.Declaration.to_table()

File /lib/python3.10/site-packages/pyarrow/error.pxi:154, in pyarrow.lib.pyarrow_internal_check_status()

File /lib/python3.10/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()

ArrowInvalid: AsofJoin does not allow out-of-order on-key values

So I suspect the issue has nothing to do with the order of the on-key values, but rather with the input size. Is this a bug that can be fixed, or a fundamental limitation? Is there any workaround other than limiting the input size?
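One observation that may help: the reported row counts line up with a power of two. Assuming Acero's table source splits its input into batches of at most 2**20 = 1,048,576 rows (its default maximum batch size, to my knowledge; this is an assumption, not something stated in this thread), every size reported as working fits in a single batch, while every failing case needs at least two batches for the left table. A quick check of that arithmetic:

```python
import math

# ASSUMPTION: Acero's table source emits batches of at most 2**20 rows.
# This value is not stated in the thread; it is used here only to test the idea.
ASSUMED_MAX_BATCH_ROWS = 2**20  # 1_048_576


def num_batches(rows: int, max_rows: int = ASSUMED_MAX_BATCH_ROWS) -> int:
    """Number of batches a table with `rows` rows would be split into."""
    return math.ceil(rows / max_rows)


# (left rows, right rows, outcome reported in this issue)
cases = [
    (1_061_753, 994_046, "fails"),
    (1_048_178, 975_257, "works"),
    (2_000_000, 2_000_000, "fails"),  # nticks = 2_000_000
    (1_000_000, 1_000_000, "works"),  # nticks = 1_000_000
]

for left_rows, right_rows, outcome in cases:
    print(f"{left_rows:>9} x {right_rows:>9}: {outcome}, "
          f"left batches={num_batches(left_rows)}, "
          f"right batches={num_batches(right_rows)}")
```

If the assumption holds, this would point to a problem with how multiple batches are ordered rather than a hard size limit.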

Component(s)

Python

bepec, May 17 '24 10:05

I suspected that pre-joining on the by-key might reorder the on-key values, as discussed here: https://lists.apache.org/[email protected]:2024-4:join

However, the example above still fails even with an empty by-key:

t1.join_asof(t2, on="ts", tolerance=-10, by=[])

bepec, May 17 '24 10:05

I have a similar issue with a smaller table.

It only happens if I have a lot of small chunks in the table.

Here's an example:

import pyarrow as pa
import pytest
from pandas import Timestamp

LEFT = [
    {"left_on": Timestamp("2023-09-07 12:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 12:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 12:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 12:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 13:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 13:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 13:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 13:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 14:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 14:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 14:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 14:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 15:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 15:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 15:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 15:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 16:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 16:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 16:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 16:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 17:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 17:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 17:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 17:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 18:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 18:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 18:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 18:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 19:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 19:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 19:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 19:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 20:00:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 20:15:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 20:30:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 20:45:00+0000", tz="UTC"), "left_by": "SYM1"},
    {"left_on": Timestamp("2023-09-07 21:00:00+0000", tz="UTC"), "left_by": "SYM1"},
]
RIGHT = [
    {
        "right_on": Timestamp("2023-09-07 15:00:00+0000", tz="UTC"),
        "right_by": "SYM1",
    }
]


def test_asofjoin_order():
    left: pa.Table = pa.Table.from_pylist(LEFT)
    right = pa.Table.from_pylist(RIGHT)

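    # Split `left` into one-row chunks; the error only appears with many small chunks.
    left = pa.concat_tables(left[i : i + 1] for i in range(left.num_rows))
    # Both on-key columns are sorted, so the join should not complain about ordering.
    assert left[left.column_names[0]] == left[left.column_names[0]].sort()
    assert right[right.column_names[0]] == right[right.column_names[0]].sort()
    # On affected pyarrow versions the join nevertheless raises this error.
    with pytest.raises(
        pa.ArrowInvalid, match="AsofJoin does not allow out-of-order on-key values"
    ):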
        left.join_asof(
            right,
            on=left.column_names[0],
            by=left.column_names[1],
            right_on=right.column_names[0],
            right_by=right.column_names[1],
            tolerance=-9_223_372_036_854_775_808,
        )

It took a while to make a reproducible example, and I can't pin down exactly what is causing the issue.

0x26res, May 23 '24 15:05

This also happens for me. I am using the Acero C++ API.

gitmodimo, Jun 12 '24 17:06

I think I have narrowed down the problem: the as-of join node does not sequence incoming ExecBatches. Here is a patch I created; it is probably not the optimal solution. Maybe a sequencer should be incorporated into BackpressureConcurrentQueue?
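The idea of sequencing incoming batches can be illustrated with a small reorder buffer: batches that arrive out of order (for example, from parallel producers) are held until every earlier batch has arrived, and are then released strictly by their original index. This is a hypothetical pure-Python sketch of the concept only; `BatchSequencer` and `push` are illustrative names, not the patch and not Acero's API.

```python
from typing import Any, Dict, List


class BatchSequencer:
    """Hypothetical reorder buffer that releases batches in index order."""

    def __init__(self) -> None:
        self._next_index = 0               # index of the next batch we may release
        self._pending: Dict[int, Any] = {}  # early batches waiting for their turn

    def push(self, index: int, batch: Any) -> List[Any]:
        """Accept one batch; return all batches that are now ready, in order."""
        if index < self._next_index or index in self._pending:
            raise ValueError(f"duplicate batch index {index}")
        self._pending[index] = batch
        ready = []
        # Drain the buffer while the next expected index is present.
        while self._next_index in self._pending:
            ready.append(self._pending.pop(self._next_index))
            self._next_index += 1
        return ready


# Usage: batches arrive as 2, 0, 3, 1 but are released as 0, 1, 2, 3.
seq = BatchSequencer()
released = []
for idx, batch in [(2, "c"), (0, "a"), (3, "d"), (1, "b")]:
    released.extend(seq.push(idx, batch))
print(released)
```

A consumer such as an as-of join could then check on-key ordering only on the released stream, where batch order matches the input order.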

gitmodimo, Jun 20 '24 15:06

I created a PR with a test that triggers the failure with specific parameters.

mroz45, Sep 16 '24 11:09

Issue resolved by pull request 44083 https://github.com/apache/arrow/pull/44083

westonpace, Oct 29 '24 21:10