source: tos.lib.TinyDB.TupleRouterM.nc

Component: TupleRouterM

The TupleRouter is the core of the TinyDB system -- it receives queries from the network, creates local state for them (converts them from Queries to ParsedQueries), and then collects results from local sensors and neighboring nodes and feeds them through local queries.

Queries consist of selections and aggregates. Results from queries without aggregates are simply forwarded to the root of the tree to be handled by the query processor.

Queries with aggregates are processed according to the TAG approach: each node collects partial aggregates from its children, combines those aggregates with its own sensor readings, and forwards a partial aggregate on to its parents.
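
As an illustration of the TAG approach, a partial aggregate for an AVERAGE query can be carried as a (sum, count) pair that each node merges before forwarding to its parent. The C sketch below is not taken from the TinyDB source; the PartialAvg type and the function names are hypothetical.

```c
#include <stdint.h>

/* Hypothetical partial state for an AVERAGE aggregate: running sum and count. */
typedef struct {
    int32_t sum;
    uint16_t count;
} PartialAvg;

/* Fold one local sensor reading into the partial aggregate. */
static void partial_avg_add_reading(PartialAvg *p, int16_t reading) {
    p->sum += reading;
    p->count += 1;
}

/* Merge a child's partial aggregate into ours before forwarding to the parent. */
static void partial_avg_merge(PartialAvg *mine, const PartialAvg *child) {
    mine->sum += child->sum;
    mine->count += child->count;
}

/* Only the root needs the final value; returns 0 when no readings were seen. */
static int32_t partial_avg_finalize(const PartialAvg *p) {
    return p->count == 0 ? 0 : p->sum / p->count;
}
```

Carrying the pair instead of a finished average lets every node forward a fixed-size record regardless of how many descendants contributed to it.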

There are three main execution paths within TUPLE_ROUTER: one for accepting new queries, one for accepting results from neighboring nodes, and one for generating local results and delivering data to parent nodes.

QUERY ARRIVAL

------------

1) New queries arrive in a TUPLE_ROUTER_QUERY_MESSAGE. Each query is assumed to be identified by a globally unique ID. Query messages contain a part of a query: either a single field (attribute) to retrieve, a single selection predicate to apply, or a single aggregation predicate to apply. All the QUERY_MESSAGEs describing a single query must arrive before the router will begin routing tuples for that query.

2) Once all the QUERY_MESSAGEs have arrived, the router calls parseQuery() to generate a compact representation of the query in which field names have been replaced with field ids that can be used as offsets into the sensor's local catalog (SCHEMA).

3) Given a parsedQuery, the tuple router allocates space at the end of the query to hold a single, "in-flight" tuple for that query -- this tuple will be filled in with the appropriate data fields as the query executes.

4) TupleRouter then calls setSampleRate() to start (or restart) the mote's 32 kHz clock to fire at the appropriate data-delivery rate for all of the queries currently in the system. If there is only one query, it will fire once per "epoch" -- if there are multiple queries, it will fire at the GCD of the delivery intervals of all the queries.
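
The GCD rule in step 4 can be sketched in C as follows. This is a minimal illustration, not the body of setSampleRate(); the function names and the choice of uint16_t for intervals are assumptions.

```c
#include <stddef.h>
#include <stdint.h>

/* Euclid's algorithm; gcd16(0, x) yields x, which makes 0 a neutral start value. */
static uint16_t gcd16(uint16_t a, uint16_t b) {
    while (b != 0) {
        uint16_t t = (uint16_t)(a % b);
        a = b;
        b = t;
    }
    return a;
}

/* Timer interval shared by all queries: the GCD of their delivery intervals.
 * Returns 0 when there are no queries (hypothetical convention for "timer off"). */
static uint16_t timer_interval(const uint16_t *epochs, size_t n) {
    uint16_t g = 0;
    for (size_t i = 0; i < n; i++) {
        g = gcd16(g, epochs[i]);
    }
    return g;
}
```

With delivery intervals of 2048, 3072 and 5120 the timer would fire every 1024 units, and each query would deliver on every 2nd, 3rd and 5th firing respectively.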

TUPLE DELIVERY

--------------

1) Whenever a clock event occurs (TUPLE_ROUTER_TIMER_EVENT), the router must perform four actions:

a) Deliver tuples which were completed on the previous clock event (deliverTuplesTask). If the query contains an aggregate, deliver the aggregate data from the aggregate operator; if not, deliver the tuple filled out during the last iteration. Reset the counters that indicate when these queries should be fired again.

b) Decrement the counters for all queries. Any queries whose counters reach 0 need to have data delivered. Reset the expression-specific state for these queries (this is specific to the expressions in the queries -- MAX aggregates, for instance, will want to reset the current maximum aggregate to some large negative number.)

c) Fetch data fields for each query firing this epoch. Loop through all fields of all queries, fetch them (using the SCHEMA interface), and fill in the appropriate values in the tuples on the appropriate queries.

d) Route filled-in tuples to query operators. First route to selections, then to the aggregate (if it exists). If any selection rejects a tuple, stop routing it.
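
The countdown in steps (a) and (b) can be sketched as a per-query counter that is decremented on every clock event. The QueryState type and its fields are hypothetical, and the sketch simplifies the scheme above by reloading the counter at the moment the query fires, whereas the text reloads it during delivery on the following clock event.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical per-query timing and aggregate state. */
typedef struct {
    uint16_t ticksPerEpoch; /* clock events per epoch for this query */
    uint16_t ticksLeft;     /* clock events remaining until the query fires */
    int16_t curMax;         /* running state of a MAX aggregate */
} QueryState;

/* Called once per clock event. Returns true when the query fires. */
static bool query_tick(QueryState *q) {
    if (q->ticksLeft > 0) {
        q->ticksLeft--;
    }
    if (q->ticksLeft != 0) {
        return false;
    }
    q->ticksLeft = q->ticksPerEpoch; /* simplification: reload immediately */
    q->curMax = INT16_MIN;           /* reset MAX to a large negative number */
    return true;
}
```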

NEIGHBOR RESULT ARRIVAL

-----------------------

When a result arrives from a neighbor (TUPLE_ROUTER_RESULT_MESSAGE), it needs to be integrated into the aggregate values being computed locally. If the result corresponds to an aggregate query, it is forwarded into the AGG_OPERATOR component; otherwise, it is simply forwarded up the routing tree towards the root.
Author: Sam Madden (madden@cs.berkeley.edu)

Function Descriptions

forwardQuery

TinyDBError forwardQuery(TOS_MsgPtr msg)

Forward out a query message, setting errors as appropriate if the radio is already busy. Note, this uses the mMsg module variable.

Parameters:

msg - The message to send (a copy is made into mMsg, so the application can overwrite after this call)

Returns: err_MSF_ForwardKnownQuery if message send failed, err_MSG_ForwardKnownQueryBusy if radio was busy

parsedCallback

void parsedCallback(Handle *memory)

Continuation after the parsed query is successfully allocated. NOTE: after we set up the query, we need to resize it to hold the tuple at the end of the query.

Parameters:

memory - Newly allocated handle for the parsed query


removeQuery

TinyDBError removeQuery(uint8_t qid, BoolPtr success)

Remove a query from the tuple router

Parameters:

qid - The query to remove

success - Set to TRUE if the query was successfully removed, FALSE if the query couldn't be found or an error occurred.

Returns: err_RemoveRouterFailed if the router is in a state such that the query may be in use, err_NoError otherwise.

Network.snoopedSub

event TOS_MsgPtr Network.snoopedSub (TOS_MsgPtr msg, uint8_t amId, bool isParent)

A message not directly addressed to us that we overheard. Use this for time synchronization with our parent, and to snoop on queries.

Parameters:

msg - The message

amId - The amid of the message

isParent - If the message is from our parent


SelOperator.processedTuple

event TinyDBError SelOperator.processedTuple (Tuple *t, ParsedQuery *q, Expr *e, bool passed)

Continue processing a tuple after a selection operator. Basically, if the tuple passed the selection, we continue routing it to additional operators. Otherwise, we move on to the next query for routing.

Parameters:

t - The tuple that has been processed by the operator

q - The query that this tuple belongs to

e - The expression that processed the tuple

passed - Indicates whether the tuple passed the operator -- if not, the tuple should not be output.

Returns: err_NoError
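
The pass-or-stop behavior described for selections can be sketched in plain C. In the component itself routing is split-phase (the operator signals processedTuple when it is done); the synchronous loop, the Selection type and the two example predicates below are assumptions made for illustration.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical selection predicate over a single attribute value. */
typedef bool (*Selection)(int16_t value);

/* Example predicates, roughly "light > 100" and "light < 500". */
static bool light_above_100(int16_t v) { return v > 100; }
static bool light_below_500(int16_t v) { return v < 500; }

/* Route a value through the selections in order and stop at the first
 * rejection. Returns how many selections passed; a result equal to n
 * means the tuple survived every selection and may be output. */
static size_t route_through_selections(const Selection *sels, size_t n,
                                       int16_t value) {
    size_t i;
    for (i = 0; i < n; i++) {
        if (!sels[i](value)) {
            break;
        }
    }
    return i;
}
```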

AggOperator.processedResult

event TinyDBError AggOperator.processedResult (QueryResult *qr, ParsedQuery *q, Expr *e)

Called every time we route a query result through an aggregate operator.

Parameters:

qr - The query result we processed

q - The query it belongs to

e - The expression that processed it

Note: the result still needs to be routed to the next aggregation operator.


aggregateResult

void aggregateResult(ParsedQuery *q, QueryResult *qr, char exprId)

Apply all aggregate operators to this result. Apply them one after another, starting with exprId.

This is called from TUPLE_ROUTER_RESULT_MESSAGE and from AGGREGATED_RESULT_EVENT.

Parameters:

q - The query that the result applies to

qr - The query result

exprId - The first expression to apply to qr


Timer.fired

event result_t Timer.fired (void)

Clock fired event. Works as follows:
1) Output tuples from previous epochs
2) Determine which queries fire in this epoch
3) Collect samples for those queries
4) Fill in the tuples in those queries
5) Apply operators to those tuples

While this is happening, results may arrive from other sensor nodes representing results from the last epoch. Those results need to be forwarded (if we're just selecting), or stored (if we're aggregating).

Question: What to do if the next timer event goes off before this one is complete? Right now, we simply ignore later events if this one hasn't finished processing.

As of 10/5/2002, moved main body into a task.
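
The policy of ignoring a timer event that arrives while the previous one is still being processed can be sketched with a busy flag. The TimerState type and both function names are hypothetical, and the sketch is single-threaded: it leaves out the interrupt-safety a real mote implementation would need.

```c
#include <stdbool.h>

/* Hypothetical bookkeeping for the timer handler. */
typedef struct {
    bool busy;        /* true while an earlier event is still being processed */
    unsigned handled; /* events accepted for processing */
    unsigned dropped; /* events ignored because processing was in progress */
} TimerState;

/* Called when the clock fires. Returns true if processing should start. */
static bool on_timer_fired(TimerState *s) {
    if (s->busy) {
        s->dropped++; /* previous event not finished: ignore this one */
        return false;
    }
    s->busy = true;
    s->handled++;
    return true;
}

/* Called when the work started by an accepted event has completed. */
static void on_processing_done(TimerState *s) {
    s->busy = false;
}
```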


deliverTuplesTask

task void deliverTuplesTask(void)

Walk through queries, finding ones that have gone off (timer reached 0), and where the tuples are complete. Output said tuples to the appropriate output buffers.

mCurRouteQuery contains the last query routed, or NULL if starting at the first query (it's not a parameter, since this task needs to be rescheduled as successive tuples are delivered)


sendWaitingMessages

void sendWaitingMessages(void)

Called from the main timer loop -- task that drains the internal message queue. Uses mOutputCount to track the time until the next message should be sent.

fetchNextAttr

bool fetchNextAttr(void)

Fetch the next needed attribute. Does this by scanning the current queries, finding those that have fired and need fields filled in, and filling in those fields. Uses mLastQuery to track the last query that results were fetched for, and mFetchingField to track the last field that was filled in. Note that a return value of TRUE doesn't mean the most recent attribute was actually found.

Returns: TRUE if an attribute was found and (possibly) more attributes exist.

routeTask

task void routeTask(void)

routeTask does the heavy lifting of routing tuples to queries. It assumes the tuples stored in each query that needs to be routed during this epoch have been initialized. It then iterates through the operators in each query, routing tuples to them in succession. Tuples are routed through one query at a time, and always in a fixed order. mCurRouteQuery is set to the query for which tuples are currently being routed.

nextQueryToRoute

TupleRouterM.QueryListHandle nextQueryToRoute(TupleRouterM.QueryListHandle curQuery)

Assumes that all attributes have already been filled out (i.e. fetchNextAttr() returned FALSE). mCurTuple is changed to point at the tuple corresponding to the returned query.

Returns: the next query in the query list that needs to be output

routeToQuery

bool routeToQuery(ParsedQuery *q, Tuple *t)

Route the specified tuple to the first operator of the specified query. This will send the tuple to an operator, which will return the tuple when it is done.

Parameters:

q - The query that t should be routed to

t - The tuple to route

Returns: TRUE if the tuple was routed, FALSE otherwise

nextExpr

Expr nextExpr(ParsedQuery *q)

Uses mCurExpr to track the current expression in q that is being applied.
mCurExpr should be set to -1 to get the first expression in q.
The expression id is not an explicit parameter since expression routing needs to be resumed after the previous split phase expression application.
Returns: the next expression in q, or null if no such expression exists.
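
The cursor convention (start at -1, advance on each call, return null at the end) can be sketched as below. The Expr and Query layouts are hypothetical stand-ins, and the cursor is passed explicitly to keep the sketch self-contained, whereas the component keeps it in the module variable mCurExpr.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, heavily reduced stand-ins for Expr and ParsedQuery. */
typedef struct {
    uint8_t opcode;
} Expr;

typedef struct {
    uint8_t numExprs;
    Expr exprs[4];
} Query;

/* Advance the cursor and return the next expression, or NULL when none is
 * left. Set *cur to -1 before the first call to get the first expression. */
static Expr *next_expr(Query *q, int8_t *cur) {
    if (*cur + 1 >= q->numExprs) {
        return NULL; /* cursor stays on the last expression returned */
    }
    *cur = (int8_t)(*cur + 1);
    return &q->exprs[*cur];
}
```

Because the position lives outside the call, routing can resume from the same place after a split-phase operator returns.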

getQuery

bool getQuery(uint8_t qid, ParsedQuery **q)

Returns: TRUE if the query exists.

Parameters:

qid - The query to fetch

q - Will point to the query pointer upon return if the return value is TRUE.


QueryProcessor.msgToQueryRoot

command short QueryProcessor.msgToQueryRoot (TOS_MsgPtr msg)

Given a processor message, return the owner (originating node) of the query, or -1 if the query is unknown or the message is a query processor message.

Parameters:

msg - The query for which the root is sought


parseQuery

bool parseQuery(Query *q, ParsedQuery *pq)

Given a query, parse it into pq

Parameters:

q - The query to convert

pq - The parsed query to fill in. Assumes that pq has been allocated with ParsedQueryIntf.pqSize(q) bytes.

Returns: TRUE if successful
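
The core of parsing, replacing a field name with an id that indexes the local catalog, can be sketched as a simple lookup. The catalog representation (an array of names), the function name and the FIELD_NOT_FOUND sentinel are assumptions; the real component resolves names through its schema interface.

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical sentinel for a name that is not in the local catalog. */
#define FIELD_NOT_FOUND 0xFF

/* Return the index of name in the catalog, which then serves as the
 * field id, or FIELD_NOT_FOUND if this node does not have the attribute. */
static uint8_t lookup_field_id(const char *const *catalog, uint8_t n,
                               const char *name) {
    for (uint8_t i = 0; i < n; i++) {
        if (strcmp(catalog[i], name) == 0) {
            return i;
        }
    }
    return FIELD_NOT_FOUND;
}
```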

allocPendingQuery

bool allocPendingQuery(TupleRouterM.MemoryCallback callback, Query *q)

Allocates space for a query, given a query with numExprs and numFields filled in

Parameters:

q - The query to allocate a query data structure for

callback - The callback to fire when the allocation is complete

Returns: TRUE if the allocation was successfully initiated (e.g. a callback is expected)

allocQuery

bool allocQuery(TupleRouterM.MemoryCallback callback, Query *q)

Allocate space for a parsed query. After the request completes, add the result to the qs linked list, and then call callback.

Parameters:

callback - Callback fired once allocation is complete

q - The query to use to initialize the parsed query

Returns: TRUE if the request was successfully made (e.g. a callback is expected)

RadioQueue.enqueue

command TinyDBError RadioQueue.enqueue (const char *msg, uint16_t len)

Copy the specified bytes into the message queue. Messages are always data (tuple) messages. Messages of more than DATA_LENGTH - sizeof(DbMsgHdr) bytes will be truncated.

Parameters:

msg - The data payload of the message to enqueue

len - The length (in bytes) of the data

Returns: err_MessageSendFailed if the queue is full
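
The truncate-then-copy behavior and the queue-full error can be sketched with a small ring buffer. All SKETCH_ constants are assumed values chosen for illustration (they are not the component's DATA_LENGTH or sizeof(DbMsgHdr)), and the integer return codes stand in for TinyDBError values.

```c
#include <stdint.h>
#include <string.h>

/* Assumed sizes for illustration only. */
#define SKETCH_DATA_LENGTH 29
#define SKETCH_HDR_SIZE 5
#define SKETCH_MAX_PAYLOAD (SKETCH_DATA_LENGTH - SKETCH_HDR_SIZE)
#define SKETCH_QUEUE_SLOTS 4

typedef struct {
    uint8_t len;
    char data[SKETCH_MAX_PAYLOAD];
} Slot;

typedef struct {
    Slot slots[SKETCH_QUEUE_SLOTS];
    uint8_t head;  /* index of the oldest queued message */
    uint8_t count; /* number of queued messages */
} MsgQueue;

/* Copy msg into the next free slot, truncating to the payload limit.
 * Returns 0 on success and -1 if the queue is full. */
static int queue_enqueue(MsgQueue *q, const char *msg, uint16_t len) {
    if (q->count == SKETCH_QUEUE_SLOTS) {
        return -1;
    }
    if (len > SKETCH_MAX_PAYLOAD) {
        len = SKETCH_MAX_PAYLOAD; /* oversized messages are truncated */
    }
    Slot *s = &q->slots[(q->head + q->count) % SKETCH_QUEUE_SLOTS];
    memcpy(s->data, msg, len);
    s->len = (uint8_t)len;
    q->count++;
    return 0;
}
```

Copying the bytes into the queue means the caller's buffer can be reused as soon as the call returns.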