If we can generate the results of a query while scanning the table, without needing unbounded memory, we say that the query can run on unbounded data. In this document, we analyze how to determine whether a given query has this property.

Observation: To support streaming window queries such as the following:

SELECT
    SUM(inc_col) OVER(ORDER BY inc_col ASC RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING)
FROM annotated_data

we need a mechanism for getting information about table columns (via annotations on column Fields). For instance, if we know that column inc_col is already sorted, we can use this information to support streaming queries on unbounded data. In the rest of this document, we assume that inc_col is physically sorted in ascending order and desc_col is physically sorted in descending order. This lets us remove Sort operators for ORDER BY clauses involving these columns.
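The decision that a column annotation enables can be sketched as a prefix check: if the input is already ordered by some list of sort keys, any ORDER BY that is a prefix of that list is already satisfied, so the Sort in front of the window operator is redundant. This is a minimal sketch, not DataFusion's API; `SortKey` and `sort_is_redundant` are hypothetical names, and the model deliberately ignores null ordering (e.g. NULLS LAST) and equivalent-expression handling.

```rust
/// Hypothetical, simplified sort annotation for one column.
/// Null ordering (NULLS FIRST/LAST) is ignored in this sketch.
#[derive(Debug, Clone, PartialEq)]
struct SortKey {
    column: String,
    descending: bool,
}

/// Returns true if data already ordered by `existing` is also ordered by
/// `required`, i.e. `required` is a prefix of `existing`. In that case a
/// Sort operator producing `required` can be removed from the plan.
fn sort_is_redundant(existing: &[SortKey], required: &[SortKey]) -> bool {
    required.len() <= existing.len()
        && required.iter().zip(existing).all(|(r, e)| r == e)
}
```

With inc_col annotated as ascending, `ORDER BY inc_col ASC` passes this check, while `ORDER BY inc_col DESC` does not and would still need a Sort in this simplified model.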

In the rest of this document, we analyze streaming window support under 4 cases:

When a query contains both PARTITION BY and ORDER BY clauses, the PARTITION BY and ORDER BY clauses can be analyzed independently according to the cases above.

ORDER BY clauses


Single ORDER BY clauses

Consider the query below:

SELECT
    SUM(inc_col) OVER(ORDER BY inc_col ASC RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING)
FROM annotated_data

The physical plan of this query is shown below:

+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                        |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+                                                                                                                                       |
| physical_plan | ProjectionExec: expr=[SUM(annotated_data.inc_col) ORDER BY [annotated_data.inc_col ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@0 as SUM(annotated_data.inc_col)]             |
|               |   WindowAggExec: wdw=[SUM(annotated_data.inc_col): Ok(Field { name: "SUM(annotated_data.inc_col)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: None })] |
|               |     SortExec: [inc_col@0 ASC NULLS LAST]                                                                                                                                                    |
|               |       MemoryExec: partitions=1, partition_sizes=[51]                                                                                                                                        |
|               |                                                                                                                                                                                             |
+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Every operator in the physical plan above except SortExec can run without seeing the entire table. Since we know that inc_col is already sorted in ascending order, SortExec can be removed from the physical plan. Assuming we have a StreamWindowAggExec operator, we can turn the above physical plan into the one below.
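To make the bounded-memory claim concrete, here is a minimal sketch of how a streaming operator could evaluate `SUM(x) OVER (ORDER BY x ASC RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING)` once the input is known to be ascending. It is not DataFusion's StreamWindowAggExec; the type `StreamingRangeSum` and its `push`/`finish` methods are hypothetical. It assumes non-null `i64` values and non-negative offsets, and it recomputes each frame sum naively instead of maintaining it incrementally. The key points are that a row's result can be emitted as soon as a row with value greater than `v + following` arrives, and that rows below `v - preceding` of the earliest pending row can be evicted.

```rust
use std::collections::VecDeque;

/// Hypothetical streaming evaluator for
/// SUM(x) OVER (ORDER BY x ASC RANGE BETWEEN `preceding` PRECEDING AND `following` FOLLOWING)
/// over input that is already sorted ascending (no nulls).
struct StreamingRangeSum {
    preceding: i64,
    following: i64,
    buffer: VecDeque<i64>, // rows that may still fall inside some unfinished frame
    next_out: usize,       // index in `buffer` of the first row whose result is not yet emitted
}

impl StreamingRangeSum {
    fn new(preceding: i64, following: i64) -> Self {
        assert!(preceding >= 0 && following >= 0, "frame offsets must be non-negative");
        Self { preceding, following, buffer: VecDeque::new(), next_out: 0 }
    }

    /// Consumes one input row and returns the results of every row whose frame is now complete.
    fn push(&mut self, x: i64) -> Vec<i64> {
        if let Some(&last) = self.buffer.back() {
            assert!(last <= x, "input must be sorted ascending");
        }
        self.buffer.push_back(x);
        let mut out = Vec::new();
        // A pending row with value v is complete once a row with value > v + following arrives.
        while self.next_out < self.buffer.len() && self.buffer[self.next_out] + self.following < x {
            out.push(self.frame_sum(self.next_out));
            self.next_out += 1;
        }
        self.evict();
        out
    }

    /// Called at end of input: every remaining frame is now complete.
    fn finish(&mut self) -> Vec<i64> {
        let out: Vec<i64> = (self.next_out..self.buffer.len()).map(|i| self.frame_sum(i)).collect();
        self.buffer.clear();
        self.next_out = 0;
        out
    }

    /// Sum of all buffered rows in [v - preceding, v + following] (RANGE frames include peers).
    fn frame_sum(&self, i: usize) -> i64 {
        let v = self.buffer[i];
        let (lo, hi) = (v - self.preceding, v + self.following);
        self.buffer.iter().filter(|&&y| lo <= y && y <= hi).sum()
    }

    /// Drops rows below the lower frame bound of the earliest pending row; because the
    /// input is ascending, no later frame can include them either.
    fn evict(&mut self) {
        let lower_bound = match self.buffer.get(self.next_out) {
            Some(&v) => v - self.preceding,
            None => return,
        };
        while let Some(&front) = self.buffer.front() {
            if front >= lower_bound {
                break;
            }
            self.buffer.pop_front();
            self.next_out -= 1; // evicted rows always precede the first pending row
        }
    }
}
```

Memory use is bounded by the number of rows whose values fall within a window of width `preceding + following` plus the rows still waiting for their frame end, not by the table size. A production operator would also handle nulls, peers in batches, and incremental aggregation.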