/// The set of filter predicates that a parent is trying to push into an operator.
///
/// `Default` yields an empty description (no filters), which is what the rule
/// starts with at the root and resets to after it materializes a `FilterExec`.
/// `Clone` is cheap: it only bumps the reference count of each expression.
#[derive(Clone, Default)]
struct FilterDescription {
  /// Expressions coming from the parent nodes
  filters: Vec<Arc<dyn PhysicalExpr>>,
}

/// Result of try_pushdown_filters(), storing the necessary information about a filter pushdown trial
struct FilterPushdownResult {
  /// Filter predicates which can be pushed down through the operator, one
  /// `FilterDescription` per child.
  /// NOTE that these are not placed into any operator.
  child_filters: Vec<FilterDescription>,
  /// Filters which cannot be pushed down through the operator.
  /// NOTE that the caller of try_pushdown_filters() should handle these remaining predicates,
  /// possibly introducing a FilterExec on top of this operator.
  remaining_filters: FilterDescription,
  /// Possibly updated new operator
  operator: Arc<dyn ExecutionPlan>,
}

impl ExecutionPlan {
    /// Tries to push the filters in `fd`, which come from the parent, through this operator.
    ///
    /// The returned `FilterPushdownResult` reports the filters handed to the children,
    /// the filters that could not be pushed down (the caller must handle those, possibly
    /// by adding a FilterExec on top of this operator), and the possibly updated operator.
    fn try_pushdown_filters(&self, fd: FilterDescription) -> Result<FilterPushdownResult> {
    ...
    }
}

The snippet above shows the new structs and API. Let me explain how these are defined and behave in the planning phase:

Initial plan:

- FilterExec: a@0 = foo
-   AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
-     FilterExec: a@0 = foo
-       CoalesceBatchesExec: target_batch_size=1
-         DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test

Define the PlanContext type, and start the rule

/// Plan tree node whose payload (`data`) is the `FilterDescription` being pushed into that node.
pub type FilterDescriptionContext = PlanContext<FilterDescription>;

...

// Create an empty FilterDescription for the root operator
let context = FilterDescriptionContext::new_default();

context.transform_down(|mut node| {
    // Offer the filters accumulated on this node (`node.data`) to its operator.
    let FilterPushdownResult {
        child_filters, remaining_filters, operator
    } = node.plan.try_pushdown_filters(node.data)?;

    // Here, the operator was FilterExec, so the fields are filled like this:
    // child_filters: [[a@0 = foo]], remaining_filters: [], operator: AggregateExec...
    if remaining_filters.is_empty() {
        // No need to add a FilterExec, continue with the next operator.
        // `child_filters` holds one entry per child, in child order, so pair them up
        // and move each description into the matching child node.
        for (child, filters) in node.children.iter_mut().zip(child_filters) {
            child.data = filters;
        }
        return Ok(Transformed::yes(node)); // this closure returns here for the first node FilterExec
    } else {
        ...
    }
})

The FilterExec at the root is now removed, but its predicate is carried along as a filter that we are still trying to push down. Let’s continue with the next node:

context.transform_down(|node| {
    // Offer the filters accumulated on this node (`node.data`) to its operator.
    let FilterPushdownResult {
        child_filters, remaining_filters, operator
    } = node.plan.try_pushdown_filters(node.data)?;
    // node.data is [a@0 = foo] above, and plan is AggregateExec
    // Fields are filled like this:
    // child_filters: [[]], remaining_filters: [a@0 = foo], operator: AggregateExec...
    if remaining_filters.is_empty() {
        ...
    } else {
        // Now, we have some filters which cannot be pushed down any further. So, let's add a FilterExec:
        // NOTE(review): `predicate:` / `input:` are written as labels for readability only;
        // they are not valid Rust call syntax.
        node.plan = FilterExec::try_new(predicate: remaining_filters, input: node.plan);
        // NOTE(review): `node.plan` was already replaced on the previous line, so this clone
        // appears to carry the new FilterExec rather than the original operator — confirm
        // the intended order of these two statements.
        node.children = [node.clone().clear_node_data()];
        // The filters are now materialized in the new FilterExec, so nothing is pending here.
        node.data = Default::default();
        // there is a more idiomatic way of doing these last 3 lines, FYI
    }
})

Now the plan is the same as it was at the start, but the rule is now working on AggregateExec, with an empty FilterDescription:

context.transform_down(|mut node| {
    // Offer the filters accumulated on this node (`node.data`) to its operator.
    let FilterPushdownResult {
        child_filters, remaining_filters, operator
    } = node.plan.try_pushdown_filters(node.data)?;
    // node.data is [] above, and plan is AggregateExec
    // Fields are "child_filters: [[]], remaining_filters: [], operator: AggregateExec..."
    if remaining_filters.is_empty() {
        // No need to add a FilterExec, continue with the next operator.
        // `child_filters` holds one entry per child, in child order; here every entry is
        // empty, so the children node data ends up empty as well.
        for (child, filters) in node.children.iter_mut().zip(child_filters) {
            child.data = filters;
        }
        return Ok(Transformed::yes(node)); // no problem to return "yes" here, but it actually doesn't change
    } else {
        ...
    }
})

Now, the rule works as if it’s initialized freshly on this plan without any accumulated filter knowledge:

-     FilterExec: a@0 = foo
-       CoalesceBatchesExec: target_batch_size=1
-         DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test
context.transform_down(|mut node| {
    // Offer the filters accumulated on this node (`node.data`) to its operator.
    let FilterPushdownResult {
        child_filters, remaining_filters, operator
    } = node.plan.try_pushdown_filters(node.data)?;

    // Here, the operator was FilterExec, so the fields are filled like this:
    // child_filters: [[a@0 = foo]], remaining_filters: [], operator: CoalesceBatchesExec...
    if remaining_filters.is_empty() {
        // No need to add a FilterExec, continue with the next operator.
        // `child_filters` holds one entry per child, in child order, so pair them up
        // and move each description into the matching child node.
        for (child, filters) in node.children.iter_mut().zip(child_filters) {
            child.data = filters;
        }
        return Ok(Transformed::yes(node));
    } else {
        ...
    }
})
context.transform_down(|mut node| {
    // Offer the filters accumulated on this node (`node.data`) to its operator.
    let FilterPushdownResult {
        child_filters, remaining_filters, operator
    } = node.plan.try_pushdown_filters(node.data)?;

    // Here, the operator was CoalesceBatchesExec, so the fields are filled like this:
    // child_filters: [[a@0 = foo]], remaining_filters: [], operator: CoalesceBatchesExec...
    if remaining_filters.is_empty() {
        // No need to add a FilterExec, continue with the next operator.
        // `child_filters` holds one entry per child, in child order, so pair them up
        // and move each description into the matching child node.
        for (child, filters) in node.children.iter_mut().zip(child_filters) {
            child.data = filters;
        }
        return Ok(Transformed::yes(node));
    } else {
        ...
    }
})
context.transform_down(|mut node| {
    // Offer the filters accumulated on this node (`node.data`) to its operator.
    let FilterPushdownResult {
        child_filters, remaining_filters, operator
    } = node.plan.try_pushdown_filters(node.data)?;

    // Here, the operator was DataSourceExec WITHOUT a filter, so the fields are filled like this:
    // child_filters: [[]], remaining_filters: [], operator: DataSourceExec WITH a filter
    if remaining_filters.is_empty() {
        // No need to add a FilterExec, continue with the next operator.
        // Pairing children with `child_filters` assigns one description per child and
        // does nothing at all when the node has no children.
        for (child, filters) in node.children.iter_mut().zip(child_filters) {
            child.data = filters;
        }
        return Ok(Transformed::yes(node));
    } else {
        ...
    }
})

That is more or less how the rule works. Now, let’s examine the try_pushdown_filters() implementations for some operators:

  1. Default Implementation
/// Default implementation: the operator neither lets parent filters through nor
/// absorbs them, so everything the parent sent is handed back as remaining.
fn try_pushdown_filters(&self, fd: FilterDescription) -> Result<FilterPushdownResult> {
    let child_filters = [[]]; // Parent filters can't be passed onto children.
    // We have no filters to contribute, so the parent's description is returned untouched.
    // `remaining_filters` is a single FilterDescription, so `fd` is not wrapped in a list.
    let remaining_filters = fd;
    // The operator itself is unchanged; the result field is named `operator`.
    let operator = self;
    Ok(FilterPushdownResult {
        child_filters, remaining_filters, operator
    })
}
  2. Transparent Ones