Validate that the graph is a DAG

Open pattisdr opened this issue 3 years ago • 0 comments

Is your feature request related to a specific problem?

We validate that collections can't be self-referencing and that every node can be reached, but we don't validate that the graph is actually a DAG. We ran into a case where collection A referenced collection B and collection B referenced collection A. Dask then fails with a KeyError that is tricky to debug:

  v = dask.delayed(get(dsk, TERMINATOR_ADDRESS, num_workers=1))
  File "/usr/local/lib/python3.9/site-packages/dask/threaded.py", line 89, in get
    results = get_async(
  File "/usr/local/lib/python3.9/site-packages/dask/local.py", line 438, in get_async
    keyorder = order(dsk)
  File "/usr/local/lib/python3.9/site-packages/dask/order.py", line 117, in order
    metrics = graph_metrics(dependencies, dependents, total_dependencies)
  File "/usr/local/lib/python3.9/site-packages/dask/order.py", line 888, in graph_metrics
    val = total_dependencies[key]
KeyError: __TERMINATE__:__TERMINATE__

This can be reproduced with a minimal working example:

import dask
from dask.threaded import get
from dask import delayed

inc = lambda a: a + 1

good_dsk = {'a': 1, 'b': (inc, 'a'), 'TERMINATE': (inc, 'b')}
delayed(get(good_dsk, 'TERMINATE')).compute()

# Collection b references collection c, and collection c references collection b
bad_dsk = {'a': 1, 'b': (inc, 'a', 'c'), 'c': (inc, 'b'), 'TERMINATE': (inc, 'c')}
delayed(get(bad_dsk, 'TERMINATE')).compute()

Describe the solution you'd like

Add validation that the graph is a DAG, so that a cycle between collections is reported before the graph is handed to Dask. We could probably use a Dask method for this. One possible place for the check is __verify_traversal.
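As a rough sketch of what such a check could look like, the snippet below detects cycles with Python's standard-library graphlib module (available from Python 3.9) rather than a Dask helper. The function names find_dependencies and validate_is_dag are hypothetical and not part of fides or Dask. It assumes flat task tuples of the form (func, arg, ...), as in the example above, and does not handle nested tasks or key aliases.

```python
from graphlib import CycleError, TopologicalSorter


def _is_key(value, dsk):
    """Return True if value is hashable and names a key in the graph."""
    try:
        return value in dsk
    except TypeError:  # unhashable arguments such as lists cannot be keys
        return False


def find_dependencies(dsk):
    """Map each key to the set of graph keys its task refers to.

    Hypothetical helper: only flat task tuples (func, arg1, arg2, ...)
    are inspected; any other value is treated as plain data.
    """
    deps = {}
    for key, task in dsk.items():
        is_task = isinstance(task, tuple) and len(task) > 0 and callable(task[0])
        args = task[1:] if is_task else ()
        deps[key] = {arg for arg in args if _is_key(arg, dsk)}
    return deps


def validate_is_dag(dsk):
    """Raise ValueError that names one cycle if the graph is not acyclic."""
    sorter = TopologicalSorter(find_dependencies(dsk))
    try:
        sorter.prepare()  # raises CycleError when a cycle exists
    except CycleError as exc:
        cycle = exc.args[1]  # list of nodes; first and last are the same
        path = " -> ".join(str(node) for node in cycle)
        raise ValueError(f"Graph is not a DAG; cycle found: {path}") from exc


if __name__ == "__main__":
    inc = lambda a: a + 1

    good_dsk = {'a': 1, 'b': (inc, 'a'), 'TERMINATE': (inc, 'b')}
    validate_is_dag(good_dsk)  # passes silently

    bad_dsk = {'a': 1, 'b': (inc, 'a', 'c'), 'c': (inc, 'b'), 'TERMINATE': (inc, 'c')}
    try:
        validate_is_dag(bad_dsk)
    except ValueError as err:
        print(err)  # the message names the collections involved in the cycle
```

Running a check like this before calling get would turn the KeyError from dask/order.py into an error message that names the offending collections.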

pattisdr, Oct 13 '22 23:10