If we can generate the results of a query as we scan the table, without needing unbounded memory, we say that the query can be run on unbounded data. In this document, we analyze how to determine whether a given query has this property.
Observation: To support streaming window queries such as the following
SELECT
SUM(inc_col) OVER(ORDER BY inc_col ASC RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING)
FROM annotated_data
we need a mechanism for getting information about table columns (via annotations on column Fields). For instance, if we know that column inc_col is already sorted, we can leverage this information to support streaming queries that work on unbounded data. In the rest of this document, we assume that inc_col is already physically sorted in ascending order and desc_col in descending order. (This lets us remove Sort expressions for ORDER BY clauses involving these columns.)
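To make the observation concrete, here is a minimal sketch of how a RANGE frame can be evaluated incrementally once the input is known to be ascending. It is an illustration only, not the engine's implementation: the function name streaming_range_sum and its parameters are hypothetical, and it assumes non-negative offsets and non-null numeric values. The result for a row is emitted as soon as a later row exceeds the row's upper frame bound, and rows below the lower bound are evicted, so the buffer only ever holds rows that a pending frame can still reach.

```python
from collections import deque
from itertools import count, islice
from typing import Iterable, Iterator


def streaming_range_sum(
    values: Iterable[int], preceding: int, following: int
) -> Iterator[int]:
    """Sketch of SUM(x) OVER (ORDER BY x ASC RANGE BETWEEN
    `preceding` PRECEDING AND `following` FOLLOWING).

    Assumes `values` is already ascending and both offsets are >= 0.
    """
    buf = deque()  # rows that some not-yet-emitted frame may still need
    emitted = 0    # number of rows at the front of buf already emitted

    def emit_front() -> int:
        nonlocal emitted
        cur = buf[emitted]
        # Rows below cur's lower bound are also below the lower bound of
        # every later row (input is ascending), so they can be dropped.
        while buf[0] < cur - preceding:
            buf.popleft()
            emitted -= 1
        emitted += 1
        # Everything left is >= cur - preceding; apply the upper bound.
        return sum(x for x in buf if x <= cur + following)

    for v in values:
        buf.append(v)
        # A frame is complete once an incoming value exceeds its upper
        # bound: no later row can fall inside it.
        while emitted < len(buf) and buf[emitted] + following < v:
            yield emit_front()

    # End of input (bounded case only): flush the remaining frames.
    while emitted < len(buf):
        yield emit_front()


# Works on a finite, sorted column ...
print(list(streaming_range_sum([1, 2, 3, 5, 8, 13, 21], 1, 10)))
# ... and on an unbounded ascending source, taking only the first results.
print(list(islice(streaming_range_sum(count(), 1, 10), 5)))
```

The same loop would need an unbounded buffer if the input were not sorted, since any future row could still fall inside an earlier frame; this is why the sortedness annotation matters.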
In the following, we analyze streaming window support under 4 cases:
1. ORDER BY clauses, e.g. OVER (ORDER BY a …)
2. ORDER BY clauses, e.g. OVER (ORDER BY a, b …)
3. PARTITION BY clauses where columns are physically sorted, e.g. OVER (PARTITION BY sorted_col ..)
4. PARTITION BY clauses where columns are not sorted, e.g. OVER (PARTITION BY unsorted_col ..)
When a query contains both PARTITION BY and ORDER BY clauses, the PARTITION BY and ORDER BY clauses can be analyzed independently according to the cases above.
ORDER BY clauses
Consider the query below:
SELECT
SUM(inc_col) OVER(ORDER BY inc_col ASC RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING)
FROM annotated_data
The physical plan of the above query is shown below:
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ |
| physical_plan | ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.inc_col ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@0 as SUM(annotated_data.inc_col)] |
| | WindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: "SUM(annotated_data.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })] |
| | SortExec: [inc_col@0 ASC NULLS LAST] |
| | MemoryExec: partitions=1, partition_sizes=[51] |
| | |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
All operations in the above physical plan except SortExec can be performed without seeing the entire table. Since we know that inc_col is already sorted in ascending order, we can remove SortExec from the physical plan. Assuming we have a StreamWindowAggExec operator, we can turn the above physical plan into the one below.