Add single-step versions of ExecutionPlan::execute() and PhysicalExpr::evaluate() methods
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We would like to implement new ExecutionPlans and PhysicalExprs that are very similar to existing ones in DataFusion but have some side effects. This requires modifying the ExecutionPlan::execute() and PhysicalExpr::evaluate() methods. The problem is that we end up re-evaluating and re-executing many nodes just so that our additional side effects happen.
Describe the solution you'd like
We need public, single-step versions of the ExecutionPlan::execute() and PhysicalExpr::evaluate() methods that take the execution/evaluation results of a node's children as parameters.
Describe alternatives you've considered
I have considered copying the execute and evaluate code en masse, for example by implementing our own slightly tweaked version of datafusion::physical_plan::ProjectionStream, which would fix the problem in the short term. However, this would make upgrading almost impossible.
Additional context
I will file a PR to contribute this feature if accepted.
@iajoiner I don't quite follow what you want to do...
Is it that you want a way to override PhysicalExpr::evaluate for all nodes?
Have you considered a wrapper that does your side effect and then calls into the node, something like:
struct PhysicalExprWithEffects {
    inner: Arc<dyn PhysicalExpr>,
}
impl PhysicalExpr for PhysicalExprWithEffects {
    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
        // TODO: add your side effects here
        // then call the inner expr:
        self.inner.evaluate(batch)
    }
    ...
}
Then you could use the same operators, etc
One challenge with this approach is that at the moment there is no good way to traverse down a tree of PhysicalExpr so you may have to add something like
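pub trait PhysicalExpr {
    ....
    // returns owned handles to the child expressions (a bare slice cannot be returned by value)
    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
}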
to the trait
@alamb I'm already using wrappers on my side. The issue I currently have is that self.inner.evaluate(batch) evaluates all the children of the raw PhysicalExpr, but none of the side effects for the children actually take place. So right now I have to manually call evaluate on the wrapped children, which leads to evaluate being called far too many times for inner nodes.
What I want to achieve is to add one public method per non-leaf node.
E.g. for IsNullExpr:
https://github.com/apache/arrow-datafusion/blob/master/datafusion/physical-expr/src/expressions/is_null.rs
pub fn single_step_evaluate(&self, input: ColumnarValue) -> Result<ColumnarValue> {
    match input {
        ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new(
            compute::is_null(array.as_ref())?,
        ))),
        ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar(
            ScalarValue::Boolean(Some(scalar.is_null())),
        )),
    }
}
Then evaluate() is simply
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
    let arg = self.arg.evaluate(batch)?;
    self.single_step_evaluate(arg)
}
That way I will simply call single_step_evaluate in IsNullExprWithSideEffects::evaluate() and handle the issue of evaluating children myself, avoiding repeated evaluation of nodes.
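To make the intent concrete, here is a minimal, self-contained sketch of the idea. It deliberately does not use DataFusion: Batch, Value, Expr, Column, and the two *WithEffects wrappers are hypothetical toy stand-ins for RecordBatch, ColumnarValue, PhysicalExpr, and real expression nodes, and the "side effect" is just appending to a shared log. It only illustrates that the wrapper can evaluate its wrapped child once and then hand the result to single_step_evaluate.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

/// Toy stand-in for `RecordBatch`: a single nullable integer column.
type Batch = Vec<Option<i64>>;
/// Shared log used to record the (hypothetical) side effects.
type Log = Rc<RefCell<Vec<&'static str>>>;

/// Toy stand-in for `ColumnarValue`.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Ints(Vec<Option<i64>>),
    Bools(Vec<bool>),
}

/// Toy stand-in for `PhysicalExpr`.
trait Expr {
    fn evaluate(&self, batch: &Batch) -> Value;
}

/// Leaf node that counts how many times it is evaluated.
struct Column {
    evaluations: Cell<usize>,
}

impl Expr for Column {
    fn evaluate(&self, batch: &Batch) -> Value {
        self.evaluations.set(self.evaluations.get() + 1);
        Value::Ints(batch.clone())
    }
}

struct IsNull {
    arg: Rc<dyn Expr>,
}

impl IsNull {
    /// Single-step version: the child's result is passed in as a parameter.
    fn single_step_evaluate(&self, input: Value) -> Value {
        match input {
            Value::Ints(v) => Value::Bools(v.iter().map(|x| x.is_none()).collect()),
            // Booleans in this toy model are never null.
            Value::Bools(v) => Value::Bools(vec![false; v.len()]),
        }
    }
}

impl Expr for IsNull {
    /// The regular `evaluate` evaluates the child, then delegates.
    fn evaluate(&self, batch: &Batch) -> Value {
        let arg = self.arg.evaluate(batch);
        self.single_step_evaluate(arg)
    }
}

/// Wrapper around the leaf: record the side effect, then evaluate the leaf.
struct ColumnWithEffects {
    inner: Rc<Column>,
    log: Log,
}

impl Expr for ColumnWithEffects {
    fn evaluate(&self, batch: &Batch) -> Value {
        self.log.borrow_mut().push("column");
        self.inner.evaluate(batch)
    }
}

/// Wrapper around `IsNull`: evaluates the *wrapped* child exactly once and
/// passes the result to `single_step_evaluate`, bypassing `IsNull::evaluate`.
struct IsNullWithEffects {
    inner: IsNull,
    wrapped_arg: Rc<dyn Expr>,
    log: Log,
}

impl Expr for IsNullWithEffects {
    fn evaluate(&self, batch: &Batch) -> Value {
        let arg = self.wrapped_arg.evaluate(batch);
        self.log.borrow_mut().push("is_null");
        self.inner.single_step_evaluate(arg)
    }
}

/// Builds `is_null(column)` with wrappers and evaluates it on one batch.
/// Returns the result, the side-effect log, and the leaf evaluation count.
fn demo() -> (Value, Vec<&'static str>, usize) {
    let log: Log = Rc::new(RefCell::new(Vec::new()));
    let column = Rc::new(Column { evaluations: Cell::new(0) });
    let wrapped_column: Rc<dyn Expr> = Rc::new(ColumnWithEffects {
        inner: column.clone(),
        log: log.clone(),
    });
    let expr = IsNullWithEffects {
        inner: IsNull { arg: column.clone() },
        wrapped_arg: wrapped_column,
        log: log.clone(),
    };
    let batch: Batch = vec![Some(1), None, Some(3)];
    let result = expr.evaluate(&batch);
    let entries = log.borrow().clone();
    (result, entries, column.evaluations.get())
}
```

In this toy setup, demo() evaluates the leaf once and records one side effect per node, which is the behavior I am after in the real wrappers.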
@iajoiner I see -- you want to annotate the entire tree
Here is what I recommend:
- add children() and new_with_children to the PhysicalExpr trait that will allow you to walk the tree, following the model of ExecutionPlan:
https://docs.rs/datafusion/10.0.0/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.children https://docs.rs/datafusion/10.0.0/datafusion/physical_plan/trait.ExecutionPlan.html#tymethod.with_new_children
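pub trait PhysicalExpr {
    ....
    // returns owned handles to the child expressions (a bare slice cannot be returned by value)
    fn children(&self) -> Vec<Arc<dyn PhysicalExpr>>;
}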
- write a function that recursively wraps all nodes in a tree with your wrapper (bonus points for writing something like ExprRewriter but for PhysicalExprs - see https://docs.rs/datafusion/10.0.0/datafusion/physical_plan/trait.ExecutionPlanVisitor.html and https://docs.rs/datafusion/10.0.0/datafusion/logical_plan/trait.ExprRewriter.html for inspiration)
fn wrap_one_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
    // make whatever wrapper type you want here
    Arc::new(MyWrapper(expr))
}
/// Calls `wrap_one_expr` on all nodes in a tree of `PhysicalExpr`:
fn wrap_all_nodes(root: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
    // recursively wrap all children:
    let wrapped_children = root
        .children()
        .into_iter()
        .map(wrap_all_nodes)
        .collect::<Vec<_>>();
    // create newly wrapped root node
    wrap_one_expr(root.new_with_children(wrapped_children))
}
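For reference, the same recursion can be tried out without DataFusion. The sketch below is only an illustration under stated assumptions: Expr is a hypothetical toy trait standing in for PhysicalExpr with the two proposed methods, Column and Not are made-up nodes, and name() exists solely so the shape of the wrapped tree can be inspected.

```rust
use std::sync::Arc;

/// Toy stand-in for `PhysicalExpr` with the two proposed tree-walking methods.
trait Expr {
    /// Renders the subtree as text so the result can be inspected.
    fn name(&self) -> String;
    fn children(&self) -> Vec<Arc<dyn Expr>>;
    fn new_with_children(&self, children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr>;
}

/// Hypothetical leaf node.
struct Column(&'static str);

impl Expr for Column {
    fn name(&self) -> String {
        self.0.to_string()
    }
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        vec![]
    }
    fn new_with_children(&self, _children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        Arc::new(Column(self.0))
    }
}

/// Hypothetical unary node.
struct Not {
    arg: Arc<dyn Expr>,
}

impl Expr for Not {
    fn name(&self) -> String {
        format!("not({})", self.arg.name())
    }
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        vec![self.arg.clone()]
    }
    fn new_with_children(&self, mut children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        // This sketch assumes exactly one child is supplied.
        Arc::new(Not { arg: children.remove(0) })
    }
}

/// Wrapper node; a real one would perform its side effect in `evaluate`.
struct MyWrapper(Arc<dyn Expr>);

impl Expr for MyWrapper {
    fn name(&self) -> String {
        format!("wrapped[{}]", self.0.name())
    }
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        self.0.children()
    }
    fn new_with_children(&self, children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        Arc::new(MyWrapper(self.0.new_with_children(children)))
    }
}

fn wrap_one_expr(expr: Arc<dyn Expr>) -> Arc<dyn Expr> {
    Arc::new(MyWrapper(expr))
}

/// Wraps every node of the tree, children first, then the root.
fn wrap_all_nodes(root: Arc<dyn Expr>) -> Arc<dyn Expr> {
    let wrapped_children = root
        .children()
        .into_iter()
        .map(wrap_all_nodes)
        .collect::<Vec<_>>();
    wrap_one_expr(root.new_with_children(wrapped_children))
}

/// Wraps `not(not(a))` and returns the rendered name of the wrapped tree.
fn demo_name() -> String {
    let tree: Arc<dyn Expr> = Arc::new(Not {
        arg: Arc::new(Not {
            arg: Arc::new(Column("a")),
        }),
    });
    wrap_all_nodes(tree).name()
}
```

With these toy nodes, demo_name() yields wrapped[not(wrapped[not(wrapped[a])])], i.e. every node, including the leaf, ends up inside exactly one wrapper.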
What do you think?