struct FilterDescription {
/// Expressions coming from the parent nodes
filters: Vec<Arc<dyn PhysicalExpr>>
}
/// Result of try_pushdown_filters(), storing the necessary information about a filter pushdown attempt
struct FilterPushdownResult {
// Filter predicates which can be pushed down through the operator.
// NOTE that these are not placed into any operator.
child_filters: Vec<FilterDescription>,
// Filters which cannot be pushed down through the operator.
// NOTE that the caller of try_pushdown_filters() should handle these remaining predicates,
// possibly introducing a FilterExec on top of this operator.
remaining_filters: FilterDescription,
// The possibly updated operator
operator: Arc<dyn ExecutionPlan>,
}
// try_pushdown_filters() is added as a new method of the ExecutionPlan trait
trait ExecutionPlan {
fn try_pushdown_filters(&self, fd: FilterDescription) -> Result<FilterPushdownResult> {
...
}
}
The snippet above shows the new structs and the API. Let me explain how they are defined and how they behave in the planning phase.
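Before the walkthrough, one note: the code below also uses two small helpers on FilterDescription that are not spelled out above (an empty default and an emptiness check). A minimal sketch of what is assumed:
impl Default for FilterDescription {
    fn default() -> Self {
        Self { filters: vec![] }
    }
}
impl FilterDescription {
    fn is_empty(&self) -> bool {
        self.filters.is_empty()
    }
}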
Initial plan:
- FilterExec: a@0 = foo
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
- FilterExec: a@0 = foo
- CoalesceBatchesExec: target_batch_size=1
- DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
Define the PlanContext type, and start the rule
pub type FilterDescriptionContext = PlanContext<FilterDescription>;
...
// Wrap the initial plan (`plan` here) into a context, starting from an empty FilterDescription
let context = FilterDescriptionContext::new_default(plan);
context.transform_down(|mut node| {
let FilterPushdownResult {
child_filters, remaining_filters, operator
} = node.plan.try_pushdown_filters(node.data)?;
// Here, the operator was FilterExec, so the fields are filled like this:
// child_filters: [[a@0 = foo]], remaining_filters: [], operator: AggregateExec...
if remaining_filters.is_empty() {
// No need to add a FilterExec, continue with the next operator
for (child, filters) in node.children.iter_mut().zip(child_filters) {
child.data = filters;
}
// Install the possibly updated operator; here this removes the FilterExec by replacing it with the AggregateExec
node.plan = operator;
return Ok(Transformed::yes(node)); // this closure returns here for the first node, FilterExec
} else {
...
}
})
The FilterExec at the root has now been removed, but its filter is still being pushed down. Let's move on to the next node.
context.transform_down(|mut node| {
let FilterPushdownResult {
child_filters, remaining_filters, operator
} = node.plan.try_pushdown_filters(node.data)?;
// node.data is [a@0 = foo] above, and plan is AggregateExec
// Fields are filled like this:
// child_filters: [[]], remaining_filters: [a@0 = foo], operator: AggregateExec...
if remaining_filters.is_empty() {
...
} else {
// Now, we have some filters which cannot be pushed down any further. So, let's add a FilterExec
// on top of this operator (pseudocode: the remaining predicates still need to be combined into a
// single expression, and the new plan wrapped in an Arc):
let child_node = node.clone().clear_node_data();
node.plan = FilterExec::try_new(remaining_filters, child_node.plan.clone());
node.children = vec![child_node];
node.data = Default::default();
// there is a more idiomatic way of doing these last lines, FYI -- see the sketch below
}
})
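For reference, one arguably more idiomatic way to express that wrapping step is sketched below. It assumes the remaining predicates have already been combined into a single `predicate: Arc<dyn PhysicalExpr>`, and uses PlanContext's plain new(plan, data, children) constructor:
// Keep the old node (the AggregateExec here) as the child of a new FilterExec node;
// its pending filters are cleared because the new FilterExec now carries them.
let child = FilterDescriptionContext::new(
    Arc::clone(&node.plan),
    FilterDescription::default(),
    node.children,
);
let filter_plan = Arc::new(FilterExec::try_new(predicate, Arc::clone(&child.plan))?);
let new_node = FilterDescriptionContext::new(filter_plan, FilterDescription::default(), vec![child]);
return Ok(Transformed::yes(new_node));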
Now the plan looks the same as it did at the start, but the rule continues with the AggregateExec, this time with an empty FilterDescription:
context.transform_down(|mut node| {
let FilterPushdownResult {
child_filters, remaining_filters, operator
} = node.plan.try_pushdown_filters(node.data)?;
// node.data is [] above, and plan is AggregateExec
// Fields are "child_filters: [[]], remaining_filters: [], operator: AggregateExec..."
if remaining_filters.is_empty() {
// No need to add a FilterExec, continue with the next operator
for (child, filters) in node.children.iter_mut().zip(child_filters) {
// the children node data is empty
child.data = filters;
}
node.plan = operator; // unchanged for AggregateExec
return Ok(Transformed::yes(node)); // returning "yes" here is harmless, but nothing actually changed
} else {
...
}
})
Now the rule works as if it had been freshly initialized on this sub-plan, without any accumulated filter knowledge:
- FilterExec: a@0 = foo
- CoalesceBatchesExec: target_batch_size=1
- DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
context.transform_down(|mut node| {
let FilterPushdownResult {
child_filters, remaining_filters, operator
} = node.plan.try_pushdown_filters(node.data)?;
// Here, the operator was FilterExec, so the fields are filled like this:
// child_filters: [[a@0 = foo]], remaining_filters: [], operator: CoalesceBatchesExec...
if remaining_filters.is_empty() {
// No need to add a FilterExec, continue with the next operator
for (child, filters) in node.children.iter_mut().zip(child_filters) {
child.data = filters;
}
node.plan = operator; // this FilterExec removes itself: CoalesceBatchesExec takes its place
return Ok(Transformed::yes(node));
} else {
...
}
})
context.transform_down(|mut node| {
let FilterPushdownResult {
child_filters, remaining_filters, operator
} = node.plan.try_pushdown_filters(node.data)?;
// Here, the operator was CoalesceBatchesExec, so the fields are filled like this:
// child_filters: [[a@0 = foo]], remaining_filters: [], operator: CoalesceBatchesExec...
if remaining_filters.is_empty() {
// No need to add a FilterExec, continue with the next operator
for (child, filters) in node.children.iter_mut().zip(child_filters) {
child.data = filters;
}
node.plan = operator; // unchanged for CoalesceBatchesExec
return Ok(Transformed::yes(node));
} else {
...
}
})
context.transform_down(|mut node| {
let FilterPushdownResult {
child_filters, remaining_filters, operator
} = node.plan.try_pushdown_filters(node.data)?;
// Here, the operator was DataSourceExec WITHOUT a filter, so the fields are filled like this:
// child_filters: [[]], remaining_filters: [], operator: DataSourceExec WITH a filter
if remaining_filters.is_empty() {
// No need to add a FilterExec, continue with the next operator
for (child, filters) in node.children.iter_mut().zip(child_filters) {
child.data = filters;
}
node.plan = operator; // the DataSourceExec that absorbed the filter replaces the old one
return Ok(Transformed::yes(node));
} else {
...
}
})
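Putting all the steps together, the walkthrough should end with a plan roughly like this (exactly how the absorbed predicate is displayed on DataSourceExec depends on the source implementation, so the last line is only a sketch):
- FilterExec: a@0 = foo
- AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
- CoalesceBatchesExec: target_batch_size=1
- DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, with the pushed-down filter a@0 = foo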
The rule part works more or less like that. Now, let's examine the try_pushdown_filters()
implementations for some operators. First, an operator through which nothing can be pushed (the conservative behavior):
fn try_pushdown_filters(&self, fd: FilterDescription) -> Result<FilterPushdownResult> {
// Parent filters can't be passed on to the children, so each child gets an empty description
let child_filters = self.children().iter().map(|_| FilterDescription::default()).collect();
// All of the parent filters remain for the caller to handle; this operator contributes none of its own
let remaining_filters = fd;
// The operator itself is unchanged (pseudocode: in practice, the existing Arc of this operator)
let operator = Arc::new(self.clone());
Ok(FilterPushdownResult {
child_filters, remaining_filters, operator
})
}
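For contrast, here is a sketch (an illustration of the intended behavior, not the actual implementation) of FilterExec's version, matching the walkthrough above: it forwards the incoming filters together with its own predicate to its child, and removes itself by handing back its input as the new operator.
// FilterExec's version (sketch): push everything down and remove itself from the plan
fn try_pushdown_filters(&self, fd: FilterDescription) -> Result<FilterPushdownResult> {
    // Forward the parent's filters together with this FilterExec's own predicate
    let mut filters = fd.filters;
    filters.push(Arc::clone(self.predicate()));
    Ok(FilterPushdownResult {
        // The single child receives all of the predicates
        child_filters: vec![FilterDescription { filters }],
        // Nothing is left for the caller to handle
        remaining_filters: FilterDescription::default(),
        // Returning the input as the new operator removes this FilterExec from the plan
        operator: Arc::clone(self.input()),
    })
}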