TCQ Window Semantics
By Fred Reiss
Syntax
Each stream in a query may be followed by a window clause of the form:
[ RANGE BY '<time>' SLIDE BY '<time>' START AT '<time>' ]
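Together, these three parameters specify a sliding, hopping, or jumping
time window over the stream: RANGE BY sets the width of each window, and
SLIDE BY sets how far each window advances relative to the one before it.
The START AT parameter is optional; if you don't specify a START AT value,
the first window starts at the end of the last whole second before the
query was submitted (in effect, the submission time rounded down to a
whole second).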
Inside the query, a special aggregate, wtime(*), allows you to retrieve
the rightmost endpoint of the current time window.
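The sequence of windows that a clause describes can be sketched in Python as follows. This is an illustration, not TelegraphCQ code: WindowSpec, first_window_start, windows, and wtime are hypothetical names, times are plain seconds, and window k is assumed to cover the half-open interval [start + k*SLIDE BY, start + k*SLIDE BY + RANGE BY), because this document does not say which window owns a shared endpoint. The default start follows the "rounded down to a whole second" reading given above.

```python
import math
from dataclasses import dataclass
from itertools import islice
from typing import Iterator, Optional


@dataclass(frozen=True)
class WindowSpec:
    """Hypothetical stand-in for one window clause; all values in seconds."""
    range_by: float                   # RANGE BY: width of each window
    slide_by: float                   # SLIDE BY: advance between windows
    start_at: Optional[float] = None  # START AT, or None if omitted


def first_window_start(spec: WindowSpec, submit_time: float) -> float:
    """Left endpoint of the first window."""
    if spec.start_at is not None:
        return spec.start_at
    # Default: the end of the last whole second before submission,
    # read here as the submission time rounded down to a whole second.
    return float(math.floor(submit_time))


def windows(spec: WindowSpec, submit_time: float) -> Iterator[tuple[float, float]]:
    """Yield (left, right) for windows 0, 1, 2, ... without end.

    Window k is assumed to cover [left, right) with
    left = first start + k * SLIDE BY and right = left + RANGE BY.
    """
    base = first_window_start(spec, submit_time)
    k = 0
    while True:
        left = base + k * spec.slide_by
        yield left, left + spec.range_by
        k += 1


def wtime(window: tuple[float, float]) -> float:
    """What wtime(*) reports for a window: its rightmost endpoint."""
    return window[1]


if __name__ == "__main__":
    spec = WindowSpec(range_by=10.0, slide_by=5.0)  # START AT omitted
    for window in islice(windows(spec, submit_time=103.7), 3):
        print(window, "wtime =", wtime(window))
    # first line printed: (103.0, 113.0) wtime = 113.0
```

With RANGE BY larger than SLIDE BY, as here, consecutive windows overlap; the generator is unbounded, so callers take only as many windows as they need.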
Semantics
Our current aggregate window semantics are as follows:
- There is a global "current time", defined as the lowest of the maximum
timestamps that the join processor has consumed from each of the streams
being joined. Note that the current time can only advance when a tuple
enters the query processor; we do not currently advance it based purely on
the system clock.
- For each base stream, the window whose right endpoint is closest to the
current time without going beyond it is considered the "current window"
for that stream. For example:
  - If START AT is 0 seconds and RANGE BY and SLIDE BY are both 1 second...
  - ...and window 1 goes from 0 seconds to 1 second, while window 2 goes
  from 1 to 2 seconds...
  - ...and the current time is 1.5 seconds...
  - ...then the current window is window 1.
- A base tuple is considered "live" if it is inside the current window for
its stream.
- A join tuple is considered "live" only if all of its component base
tuples are live.
- As the current time advances, the current windows of one or more streams
will change. Each of these changes constitutes a "slide event". If two
windows slide at exactly the same time, it is considered to be one slide
event.
- Every time a slide event occurs, the aggregation operator outputs the
value of the aggregate as applied to all join tuples that are currently
live.
- After outputting the aggregate value, the operator updates the set of
live tuples to correspond to the new set of current windows.
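The first few rules (current time, current window, and liveness) can be expressed as a minimal Python sketch. The function names and the per-stream (start, range_by, slide_by) tuple are hypothetical, and windows are assumed to be half-open, [left, right), since the rules above do not say which window owns a shared endpoint. With both streams using the example parameters above and a current time of 1.5 seconds, current_window returns (0.0, 1.0), which is window 1 in the example.

```python
import math
from typing import Optional

# Hypothetical per-stream window parameters: (start, range_by, slide_by), seconds.
WindowParams = tuple[float, float, float]


def current_time(max_ts_consumed: dict[str, float]) -> float:
    # Lowest, across the joined streams, of the maximum timestamp consumed
    # from each stream. Assumes every stream has delivered at least one tuple.
    return min(max_ts_consumed.values())


def current_window(now: float, params: WindowParams) -> Optional[tuple[float, float]]:
    """Latest window whose right endpoint does not go beyond `now`."""
    start, range_by, slide_by = params
    first_end = start + range_by
    if now < first_end:
        return None  # no window has closed yet
    k = math.floor((now - first_end) / slide_by)
    left = start + k * slide_by
    return left, left + range_by


def join_tuple_live(timestamps: dict[str, float], now: float,
                    params: dict[str, WindowParams]) -> bool:
    # A join tuple is live only if every component base tuple lies inside
    # the current window of its own stream ([left, right) assumed).
    for stream, ts in timestamps.items():
        window = current_window(now, params[stream])
        if window is None or not (window[0] <= ts < window[1]):
            return False
    return True
```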
Implementation
Currently, sliding windows for aggregates are implemented by the Fjord
Aggregate operator, located in src/backend/executor/nodeFAgg.c. This
operator maintains a buffer of "live" join tuples and reevaluates the
aggregation tree every time a window slides. Documentation for the
algorithm and data structures the operator uses can be found at the top
of nodeFAgg.c.
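The buffer-and-reevaluate strategy can be pictured with a toy Python class. This is not the nodeFAgg.c data structure: LiveTupleBuffer and BufferedTuple are hypothetical, each buffered join tuple is assumed to carry the time it becomes live and the time it stops being live (live on [birth, death)), and the aggregate defaults to a sum. on_slide follows the last two Semantics rules in the order they are listed: it outputs the aggregate over the tuples that are currently live, then moves the live set to the new current windows.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable


@dataclass
class BufferedTuple:
    birth: float   # time the join tuple becomes live
    death: float   # time the join tuple stops being live
    value: float   # hypothetical attribute being aggregated


@dataclass
class LiveTupleBuffer:
    """Toy buffer that recomputes the aggregate on every slide event."""
    aggregate: Callable[[Iterable[float]], float] = sum
    now: float = float("-inf")  # time of the most recent slide event
    tuples: list[BufferedTuple] = field(default_factory=list)

    def insert(self, t: BufferedTuple) -> None:
        # A tuple that is never live is not worth buffering.
        if t.birth < t.death:
            self.tuples.append(t)

    def live_values(self) -> list[float]:
        return [t.value for t in self.tuples if t.birth <= self.now < t.death]

    def on_slide(self, event_time: float) -> float:
        # 1. Output the aggregate over the currently live tuples,
        #    recomputed from scratch over the whole buffer.
        result = self.aggregate(self.live_values())
        # 2. Then update the live set to the new current windows and
        #    discard tuples that are dead from this point on.
        self.now = event_time
        self.tuples = [t for t in self.tuples if t.death > self.now]
        return result
```

Recomputing from the full buffer keeps the sketch simple at the cost of work proportional to the buffer size on every slide.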
An interesting aspect of the implementation is the computation of "birth"
and "death" times (the times when tuples become "live" and "dead,"
respectively) for join tuples. We compute the birth and death times as
follows:
- For each base tuple in the join tuple, do the following:
  - Compute the first and last time windows that the tuple appears in
  (also check whether the tuple appears in no windows at all).
  - The birth time of the base tuple is the end of the first window
  containing the tuple.
  - The death time of the base tuple is the end of the window immediately
  after the last window that contains the tuple.
- The birth time of the join tuple is the maximum of the birth times of
its base tuples.
- The death time of the join tuple is the minimum of the death times of
its base tuples.
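The birth and death rules above can be written out as a short Python sketch. It is an illustration under stated assumptions, not the nodeFAgg.c code: base_lifetime and join_lifetime are hypothetical names, one stream's windows are described by (start, range_by, slide_by), and window k is assumed to cover [start + k*slide_by, start + k*slide_by + range_by). A tuple is then live on the half-open interval [birth, death). The join step also returns None when the base lifetimes do not overlap, because such a join tuple is never live.

```python
import math
from typing import Optional

Span = tuple[float, float]  # (birth, death); live on [birth, death)


def base_lifetime(ts: float, start: float, range_by: float,
                  slide_by: float) -> Optional[Span]:
    """Birth and death of one base tuple, or None if no window holds it."""
    if ts < start:
        return None
    # First window whose right endpoint lies after ts.
    k_first = max(0, math.floor((ts - start - range_by) / slide_by) + 1)
    # Last window whose left endpoint is at or before ts.
    k_last = math.floor((ts - start) / slide_by)
    if k_first > k_last:
        return None  # ts falls in a gap between windows (RANGE BY < SLIDE BY)

    def end_of(k: int) -> float:
        return start + k * slide_by + range_by

    # Born at the end of the first window containing the tuple; dies at the
    # end of the window immediately after the last one containing it.
    return end_of(k_first), end_of(k_last + 1)


def join_lifetime(base_spans: list[Optional[Span]]) -> Optional[Span]:
    """Combine base-tuple lifetimes into a join-tuple lifetime."""
    if any(span is None for span in base_spans):
        return None
    birth = max(span[0] for span in base_spans)
    death = min(span[1] for span in base_spans)
    return (birth, death) if birth < death else None
```

For example, with 10-second windows sliding by 5 seconds from time 0, a tuple at 7 seconds lies in windows [0, 10) and [5, 15), so base_lifetime returns (10.0, 20.0).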
Windows and Shared Aggregates
Known Issues
The current implementation of sliding windows has several known issues:
- Windowed join queries without aggregates will produce incorrect answers,
because our STeM operators still use windows that slide on a per-tuple
basis.
- Due to the problems with joins, windows with a RANGE BY less than the
SLIDE BY can result in missing aggregate results.
- If the SLIDE BY parameters of the different windows are not multiples of
each other, you may see some strange-looking results. They are correct,
but strange. We may place constraints on SLIDE BY values in the future to
rectify this.
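One way to see why non-multiple SLIDE BY values can look odd: a slide event happens whenever any stream's current window changes, so the event times are the union of every stream's window end times, with coincident ends merged. The hypothetical helper below lists them, using the same assumed (start, range_by, slide_by) parameters per stream. With slides of 2 and 3 seconds, aggregate outputs arrive at 2, 3, 4, 6, 8, 9, 10, 12 seconds (gaps of 1, 1, 2, 2, 1, 1, 2), while slides of 2 and 4 seconds give evenly spaced events.

```python
def slide_event_times(specs: dict[str, tuple[float, float, float]],
                      until: float) -> list[float]:
    """Times up to `until` at which at least one current window changes.

    `specs` maps a stream name to hypothetical (start, range_by, slide_by).
    """
    events = set()  # a set merges windows that slide at the same instant
    for start, range_by, slide_by in specs.values():
        if slide_by <= 0:
            raise ValueError("SLIDE BY must be positive")
        k = 0
        # A new window becomes current each time the clock reaches its end.
        while (end := start + k * slide_by + range_by) <= until:
            events.add(end)
            k += 1
    # Exact equality decides coincidence, so use exactly representable times.
    return sorted(events)
```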