fides
Validate that the graph is a DAG
Is your feature request related to a specific problem?
We already validate that collections can't reference themselves and that every node can be reached, but we don't validate that the graph is actually a DAG. We ran into a case where collection A referenced collection B and collection B referenced collection A. This causes an error deep inside Dask that is tricky to debug:
```
    v = dask.delayed(get(dsk, TERMINATOR_ADDRESS, num_workers=1))
  File "/usr/local/lib/python3.9/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 438, in get_async
    keyorder = order(dsk)
  File "/usr/local/lib/python3.9/site-packages/dask/order.py", line 117, in order
    metrics = graph_metrics(dependencies, dependents, total_dependencies)
  File "/usr/local/lib/python3.9/site-packages/dask/order.py", line 888, in graph_metrics
    val = total_dependencies[key]
KeyError: __TERMINATE__:__TERMINATE__
```
This can be reproduced with the following minimal working example:
```python
from dask import delayed
from dask.threaded import get

inc = lambda a: a + 1

# Acyclic graph: TERMINATE depends on b, which depends on a. Computes 3.
good_dsk = {'a': 1, 'b': (inc, 'a'), 'TERMINATE': (inc, 'b')}
delayed(get(good_dsk, 'TERMINATE')).compute()

# Collection b references collection c, and collection c references collection b.
# The cycle makes Dask raise an error instead of computing a result.
bad_dsk = {'a': 1, 'b': (inc, 'a', 'c'), 'c': (inc, 'b'), 'TERMINATE': (inc, 'c')}
delayed(get(bad_dsk, 'TERMINATE')).compute()
```
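The cycle in `bad_dsk` can be spotted statically, before handing the graph to Dask. The sketch below uses hypothetical helpers (`task_dependencies` and `find_cycle` are not part of Dask or fides). It assumes the flat task form used in the example above: a tuple whose first element is the function and whose string arguments that are also graph keys count as dependencies. Nested tasks and other Dask task forms are ignored. A depth-first search then returns the first cycle it finds.

```python
from typing import Any, Dict, List, Optional


def task_dependencies(dsk: Dict[str, Any], key: str) -> List[str]:
    """Return the keys referenced by the task stored under `key`.

    Assumption: only flat tuples like (func, 'a', 'c') are inspected; plain
    values such as 1 are treated as having no dependencies.
    """
    task = dsk[key]
    if not isinstance(task, tuple):
        return []
    return [arg for arg in task[1:] if isinstance(arg, str) and arg in dsk]


def find_cycle(dsk: Dict[str, Any]) -> Optional[List[str]]:
    """Return one cycle as a list of keys (first key repeated at the end), or None."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished
    color = {key: WHITE for key in dsk}
    path: List[str] = []

    def visit(key: str) -> Optional[List[str]]:
        color[key] = GRAY
        path.append(key)
        for dep in task_dependencies(dsk, key):
            if color[dep] == GRAY:
                # dep is already on the current path, so we have closed a loop.
                return path[path.index(dep):] + [dep]
            if color[dep] == WHITE:
                cycle = visit(dep)
                if cycle:
                    return cycle
        path.pop()
        color[key] = BLACK
        return None

    for key in dsk:
        if color[key] == WHITE:
            cycle = visit(key)
            if cycle:
                return cycle
    return None


if __name__ == "__main__":
    inc = lambda a: a + 1
    good_dsk = {'a': 1, 'b': (inc, 'a'), 'TERMINATE': (inc, 'b')}
    bad_dsk = {'a': 1, 'b': (inc, 'a', 'c'), 'c': (inc, 'b'), 'TERMINATE': (inc, 'c')}
    print(find_cycle(good_dsk))  # None
    print(find_cycle(bad_dsk))   # ['b', 'c', 'b']
```

Reporting the actual loop (`b -> c -> b`) is what makes this more useful than the `KeyError` Dask surfaces. The recursive search is fine for graphs of this size; very deep graphs would need an iterative version to stay under Python's recursion limit.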
Describe the solution you'd like
Add validation that the graph is a DAG. We could probably use an existing Dask utility for this, and the check might fit in `__verify_traversal`.
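One possible shape for the check, sketched under assumptions: the traversal's edges are available as a mapping from each collection address to the set of addresses it depends on, written here as plain strings. `verify_acyclic` and `GraphCycleError` are hypothetical names, not existing fides code. Instead of a Dask helper, this sketch swaps in the standard library's `graphlib.TopologicalSorter` (Python 3.9+, which the traceback shows is in use); it raises `CycleError` with the offending cycle in `args[1]`. Dask's `dask.core` module also contains helpers such as `isdag` and `getcycle` that may be reusable here instead.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List, Set


class GraphCycleError(ValueError):
    """Hypothetical error raised when the collection graph is not a DAG."""


def verify_acyclic(upstream: Dict[str, Set[str]]) -> List[str]:
    """Check that the dependency graph is a DAG.

    `upstream` maps each node to the nodes it depends on. Returns the nodes in
    a valid execution order, or raises GraphCycleError naming one cycle.
    """
    try:
        # static_order() is a generator; list() forces the cycle check to run here.
        return list(TopologicalSorter(upstream).static_order())
    except CycleError as err:
        # err.args[1] lists the nodes of one cycle; the first and last are the same node.
        cycle = err.args[1]
        raise GraphCycleError(
            "Graph is not a DAG; cycle found: " + " -> ".join(map(str, cycle))
        ) from err


if __name__ == "__main__":
    # root -> A -> B is fine.
    print(verify_acyclic({"A": {"root"}, "B": {"A"}}))  # ['root', 'A', 'B']
    # Collection A references B and B references A: raises GraphCycleError.
    verify_acyclic({"A": {"root", "B"}, "B": {"A"}})
```

Raising early with the cycle spelled out turns the opaque Dask failure into an error that points straight at the collections involved.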