join_asof out-of-order error for big sorted tables
Describe the usage question you have. Please include as many useful details as possible.
With pyarrow 16.0.0, join_asof fails even though the input tables are sorted by the "on" key. I noticed this when merging larger sorted tables: for example, it fails for tables with 1061753 and 994046 rows, but succeeds if I reduce the row counts to 1048178 and 975257.
I think this behavior can be reproduced with the example below:
import numpy as np
import pyarrow as pa
ts0 = 0
nticks = 2_000_000 # it's OK for nticks = 1_000_000
ncats = 10
ticks = np.arange(ts0, ts0 + nticks)
# Integer division keeps the repeat count an int
cats = np.arange(0, ncats).repeat(nticks // ncats)
t1 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
t2 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
t1.join_asof(t2, on="ts", tolerance=-10, by="cats")
# The last line fails with this error:
---------------------------------------------------------------------------
ArrowInvalid Traceback (most recent call last)
Cell In[273], line 10
8 t1 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
9 t2 = pa.Table.from_pydict({"ts": ticks, "cats": cats})
---> 10 t1.join_asof(t2, on="ts", tolerance=-10, by="cats")
File /lib/python3.10/site-packages/pyarrow/table.pxi:5528, in pyarrow.lib.Table.join_asof()
File /lib/python3.10/site-packages/pyarrow/acero.py:333, in _perform_join_asof(left_operand, left_on, left_by, right_operand, right_on, right_by, tolerance, use_threads, output_type)
326 join_opts = AsofJoinNodeOptions(
327 left_on, left_by, right_on, right_by, tolerance
328 )
329 decl = Declaration(
330 "asofjoin", options=join_opts, inputs=[left_source, right_source]
331 )
--> 333 result_table = decl.to_table(use_threads=use_threads)
335 if output_type == Table:
336 return result_table
File /lib/python3.10/site-packages/pyarrow/_acero.pyx:590, in pyarrow._acero.Declaration.to_table()
File /lib/python3.10/site-packages/pyarrow/error.pxi:154, in pyarrow.lib.pyarrow_internal_check_status()
File /lib/python3.10/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()
ArrowInvalid: AsofJoin does not allow out-of-order on-key values
So I suspect the issue has nothing to do with the order of the on-key values, but rather with the input size. Is this a bug that can be fixed, or a fundamental limitation? Is there any workaround other than limiting the input size?
Component(s)
Python
I wondered whether pre-joining on the by-key might reorder the on-key values, as discussed here: https://lists.apache.org/[email protected]:2024-4:join
However, the above example still fails even with empty by-key: t1.join_asof(t2, on="ts", tolerance=-10, by=[])
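One observation that may help narrow this down: the row counts reported above sit on either side of 2**20. Whether Acero actually splits inputs into batches at that size is an assumption here, not something confirmed in this thread, so the sketch below only does the arithmetic. The names `threshold` and `exceeds` are made up for illustration.

```python
# Row counts reported above, as (left table, right table).
failing = (1_061_753, 994_046)
passing = (1_048_178, 975_257)

# Hypothetical threshold: 2**20 rows. That the join input is split
# into batches at this size is an assumption, not a confirmed fact.
threshold = 2**20


def exceeds(counts, limit):
    """Return, per table, whether its row count is above the limit."""
    return [n > limit for n in counts]


print(threshold)                    # 1048576
print(exceeds(failing, threshold))  # [True, False]
print(exceeds(passing, threshold))  # [False, False]
```

If the assumption holds, the failing case is the only one where a table would span more than one batch, which would fit an ordering problem between batches rather than within the data.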
I have a similar issue with a smaller table.
It only happens if I have a lot of small chunks in the table.
Here's an example:
import pyarrow as pa
import pytest
from pandas import Timestamp
LEFT = [
{"left_on": Timestamp("2023-09-07 12:00:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 12:15:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 12:30:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 12:45:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 13:00:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 13:15:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 13:30:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 13:45:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 14:00:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 14:15:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 14:30:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 14:45:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 15:00:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 15:15:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 15:30:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 15:45:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 16:00:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 16:15:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 16:30:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 16:45:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 17:00:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 17:15:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 17:30:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 17:45:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 18:00:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 18:15:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 18:30:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 18:45:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 19:00:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 19:15:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 19:30:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 19:45:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 20:00:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 20:15:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 20:30:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 20:45:00+0000", tz="UTC"), "left_by": "SYM1"},
{"left_on": Timestamp("2023-09-07 21:00:00+0000", tz="UTC"), "left_by": "SYM1"},
]
RIGHT = [
{
"right_on": Timestamp("2023-09-07 15:00:00+0000", tz="UTC"),
"right_by": "SYM1",
}
]
def test_asofjoin_order():
    left: pa.Table = pa.Table.from_pylist(LEFT)
    right = pa.Table.from_pylist(RIGHT)
    left = pa.concat_tables(left[i : i + 1] for i in range(left.num_rows))
    assert left[left.column_names[0]] == left[left.column_names[0]].sort()
    assert right[right.column_names[0]] == right[right.column_names[0]].sort()
    with pytest.raises(
        pa.ArrowInvalid, match="AsofJoin does not allow out-of-order on-key values"
    ):
        left.join_asof(
            right,
            on=left.column_names[0],
            by=left.column_names[1],
            right_on=right.column_names[0],
            right_by=right.column_names[1],
            tolerance=-9_223_372_036_854_775_808,
        )
It took a while to make a reproducible example. I can't pin down exactly what is causing the issue.
This also happened to me. I am using the Acero C++ API.
I think I narrowed down the problem. The as-of-join node does not sequence incoming ExecBatches. Here is a patch I created. It is probably not the optimal solution. Maybe the sequencer should be incorporated into BackpressureConcurrentQueue?
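To illustrate what "sequencing" means here, below is a toy model in plain Python. It is not the patch and not Acero code, only a sketch of the idea: each batch carries its original position, batches may arrive in any order, and a small buffer releases them in position order. All names (`sequence`, `is_monotonic`, `arrival`) are made up for the example.

```python
import heapq


def sequence(batches):
    """Yield (index, payload) pairs in index order, buffering early arrivals."""
    pending = []      # min-heap keyed by batch index
    next_index = 0    # the index we are allowed to release next
    for index, payload in batches:
        heapq.heappush(pending, (index, payload))
        while pending and pending[0][0] == next_index:
            yield heapq.heappop(pending)
            next_index += 1


def is_monotonic(values):
    """True if the values never decrease."""
    return all(a <= b for a, b in zip(values, values[1:]))


# Five sorted batches of on-key values, each tagged with its position.
batches = [(i, list(range(i * 3, i * 3 + 3))) for i in range(5)]
# Simulate batches reaching the join node out of order.
arrival = [batches[i] for i in (1, 0, 3, 2, 4)]

unsequenced = [v for _, payload in arrival for v in payload]
resequenced = [v for _, payload in sequence(arrival) for v in payload]

print(is_monotonic(unsequenced))  # False
print(is_monotonic(resequenced))  # True
```

Every batch is sorted internally and the data as a whole is sorted, yet a consumer that reads batches in arrival order sees the on-key go backwards. That would match an out-of-order error on input that really is sorted.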
I created a PR with a test that triggers the failure with specific parameters.
Issue resolved by pull request 44083 https://github.com/apache/arrow/pull/44083