telegraph.cs.berkeley.edu

TelegraphCQ Wrapper Writers Guide

Overview

Wrapper writers should first consider whether their data can be imported using the CSV wrapper example provided with the distribution, as this is by far the easiest (and a quite flexible) way to get data into the system.

Wrapper types

PULL wrappers are instantiated as a result of a query's arrival in the system, and as such are responsible for establishing a connection with the data source as part of their initialization. PULL wrappers may also receive data values, or "probe tuples", as inputs to the data retrieval process. The optional rescan function takes a probe tuple as its input and communicates with the remote data source to initiate the retrieval of the data items that correspond to that probe tuple.

 

A wrapper writer will need to implement the init, next, and done functions for a PUSH wrapper. A PULL wrapper will also need to implement a rescan function.

A few words on memory management

Each instance of a wrapper has two Postgres memory contexts. The first context, the per-wrapper context, is always the active context when a wrapper function is called. The lifetime of this context is the lifetime of the wrapper instance. The second context is a per-tuple context. This context is cleared by the wrapper clearinghouse each time it receives a new data tuple from the wrapper's next function.
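The difference between the two lifetimes can be pictured with a toy model. The sketch below does not use the Postgres memory context API: ToyContext, toy_alloc, toy_reset, and toy_run are hypothetical names invented for this illustration, and malloc stands in for the real allocator. It only shows that per-tuple allocations disappear after each tuple while per-wrapper allocations survive.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Toy stand-in for a memory context: it remembers every allocation so that
 * all of them can be released together. Not the Postgres implementation. */
#define TOY_MAX_CHUNKS 64

typedef struct ToyContext {
    void  *chunks[TOY_MAX_CHUNKS];
    size_t nchunks;
} ToyContext;

/* Allocate from a context; returns NULL if the toy context is full. */
void *toy_alloc(ToyContext *ctx, size_t size)
{
    void *p;

    if (ctx->nchunks == TOY_MAX_CHUNKS)
        return NULL;
    p = malloc(size);
    if (p != NULL)
        ctx->chunks[ctx->nchunks++] = p;
    return p;
}

/* Release everything allocated from the context so far. */
void toy_reset(ToyContext *ctx)
{
    size_t i;

    for (i = 0; i < ctx->nchunks; i++)
        free(ctx->chunks[i]);
    ctx->nchunks = 0;
}

/* Simulate delivering ntuples tuples. Scratch data for each tuple lives in
 * the per-tuple context; the wrapper's own state stays in the per-wrapper
 * context. Returns the number of per-wrapper chunks still allocated. */
size_t toy_run(ToyContext *perWrapper, ToyContext *perTuple, int ntuples)
{
    int i;

    (void) toy_alloc(perWrapper, 128);   /* e.g. the wrapper's read buffer */
    for (i = 0; i < ntuples; i++) {
        (void) toy_alloc(perTuple, 32);  /* e.g. one converted column value */
        toy_reset(perTuple);             /* cleared once the tuple is received */
    }
    return perWrapper->nchunks;
}
```

The practical consequence is that anything a wrapper needs across calls, such as its read buffer, belongs in the per-wrapper context, and anything that only backs a single returned tuple belongs in the per-tuple context.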

Arguments and return values

Each time a wrapper function is called, the wrapper clearinghouse will pass it the same instance of the wrapperstate structure defined below. During the course of its execution, the wrapper may modify its own wrapper-defined state, and can also modify certain wrapperstate fields in order to communicate data items, errors, or state transitions to the wrapper clearinghouse. The wrapper function returns TRUE if it completed successfully and FALSE if it encountered a fatal error.

The wrapperstate argument

Each wrapper is passed the same instance of a wrapperstate structure each time it is called. The wrapperstate structure contains the following data members:

 

int fd;

The socket descriptor for the connection to the data source. This socket is always in non-blocking mode.

TupleDesc tdesc;

The postgres tuple descriptor for the target stream.

void *udfstate;

A pointer that can be used to hold wrapper specific state. Allocation of the memory for this state should occur in the per-wrapper memory context.

FunctionMode mode;

bool doneWithProbe;

The mode field should be set by the wrapper to indicate the next wrapper function to be called. Valid values are:

MODE_INIT: call the initialization function

MODE_NEXT: call the next function

MODE_DONE: call the done function

MODE_END: the wrapper cleanup has finished, delete the wrapper

 

NOTE: the next function should indicate it has finished processing all tuples for a probe tuple by setting the doneWithProbe variable to true.
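The mode field effectively drives a small state machine. The following sketch shows one way a caller could dispatch on it. Everything here is a stand-in written for illustration: the reduced wrapperstate structure, the calls counter, and the demo_* functions are hypothetical, only the four mode values listed above are modelled, and C's bool/true/false are used in place of the TRUE/FALSE values the guide refers to. It is not the wrapper clearinghouse's actual scheduling code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Stand-in declarations: only the mode values listed above are modelled. */
typedef enum { MODE_INIT, MODE_NEXT, MODE_DONE, MODE_END } FunctionMode;

typedef struct wrapperstate {
    FunctionMode mode;
    int          calls;   /* hypothetical counter, for illustration only */
} wrapperstate;

typedef bool (*wrapper_fn)(wrapperstate *s);

/* Hypothetical wrapper functions: each one names the function to run next. */
bool demo_init(wrapperstate *s) { s->calls++; s->mode = MODE_NEXT; return true; }
bool demo_next(wrapperstate *s) { s->calls++; s->mode = MODE_DONE; return true; }
bool demo_done(wrapperstate *s) { s->calls++; s->mode = MODE_END;  return true; }

/* Hypothetical dispatcher: call whichever function the mode names until the
 * wrapper reports MODE_END or a function fails. Returns false on failure. */
bool demo_dispatch(wrapperstate *s)
{
    while (s->mode != MODE_END) {
        wrapper_fn fn = NULL;

        switch (s->mode) {
        case MODE_INIT: fn = demo_init; break;
        case MODE_NEXT: fn = demo_next; break;
        case MODE_DONE: fn = demo_done; break;
        case MODE_END:  break;
        }
        if (fn == NULL || !fn(s))
            return false;
    }
    return true;
}
```

In this toy run the next function moves straight to MODE_DONE; a real next function would normally leave the mode at MODE_NEXT for as long as the source keeps producing data.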

Datum *returnTuple;

char *returnTupleIsNull;

int returnTupleLength;

bool hasTuple;

These fields are used by the wrapper to return a data tuple to the wrapper clearinghouse.

returnTuple is the array which holds the actual data elements as Datums.

returnTupleIsNull is an array of chars, each of which is either ' ' or 'n', indicating that the corresponding column has a value or is null, respectively.

hasTuple is set to true by the wrapper to indicate to the wrapper clearinghouse that a tuple has been returned.

bool hasBufferedData;

This Boolean is set by the wrapper to indicate that it has read some data from the network socket, but has not yet had the opportunity to process it. Setting this flag tells the wrapper clearinghouse to call the wrapper functions even if there is no new data available from the network.

bool connectionReady;

 

The wrapper clearinghouse sets this flag prior to calling a wrapper function to let the wrapper know if there is data available on the socket.

void * wrapper_info;

This structure member should be passed to the get_entry() utility function to retrieve wrapper attributes.

char * error_message;

Wrappers should set this field to an error message and return FALSE to indicate an error to the client.

AttInMetadata * aim;

AttOutMetadata * aom;

These data structures are passed to the DatumFromTypeInstance and CstringFromDatum utility functions.

MemoryContext perTupleContext;

The memory context in which per tuple data should be allocated.

Input and Output utility functions

Wrapper functions may call the following utility functions to convert data and access wrapper attributes. For functions related to binding values, please see the section on the rescan function.

 

Datum DatumFromTypeInstance(AttInMetadata *aim, int colno, char *typeInstance, MemoryContext ctx);

Convert a string representing a data value into the internal representation of that type. The return value of this function may be placed in the returnTuple array of the wrapper state structure. If for some reason the typeInstance cannot be converted, a null value will be returned.

char * CStringFromTuple(AttOutMetadata *aom, HeapTuple tup, int attno, bool * isNull, MemoryContext ctx);

Given a heap tuple, return the value of a particular column as a C string.

key_value_entry * getEntry(char * key);

Look up a wrapper attribute and return a structure containing its value. The key_value_entry structure is declared as follows:

 

struct key_value_entry

{

char * key;

char * value;

};

 

 

Error reporting

A wrapper function may set the error_message field of the wrapperstate structure and return FALSE to communicate an error condition.
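As a minimal sketch of this convention, the function below rejects a malformed input record. The reduced wrapperstate structure, the function name demo_check_record, the message text, and the rule that a record must contain a comma are all assumptions made for the example; C's false stands in for FALSE.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Stand-in for the real structure: only the field used here is declared. */
typedef struct wrapperstate {
    char *error_message;
} wrapperstate;

/* Hypothetical check: reject an input record with no column separator. */
bool demo_check_record(wrapperstate *s, const char *record)
{
    /* Static storage, so the message outlives this call. */
    static char msg[] = "malformed record: no column separator found";

    if (strchr(record, ',') == NULL) {
        s->error_message = msg;
        return false;            /* FALSE in the guide's terms: fatal error */
    }
    return true;
}
```

The message is kept in static storage here so that the pointer stays valid after the function returns; a real wrapper might instead allocate it in the per-wrapper context.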

Wrapper functions and their responsibilities

The initialization function

An initialization function for a wrapper will typically allocate state used during execution and place a pointer to that state in the udfstate field of the wrapperstate structure. The state information maintained by a wrapper typically includes a buffer for data that has been read off of the network, but not yet processed.

 

Pull-type wrappers must also initiate a network connection with the data source as part of the initialization function. A utility function called establish_connection() will take care of setting up the network connection, provided that the HOSTNAME and PORT wrapper parameters were specified when the wrapper was created.
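A minimal sketch of an initialization function along these lines follows. It is not taken from the distribution: the reduced wrapperstate structure, demo_state, DEMO_BUFSIZE, and demo_init are hypothetical, calloc stands in for an allocation in the per-wrapper memory context, and setting mode to MODE_NEXT on success is an assumption about what a typical wrapper wants to happen next. The call to establish_connection() that a pull wrapper would make is omitted because its signature is not given in this guide.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

typedef enum { MODE_INIT, MODE_NEXT, MODE_DONE, MODE_END } FunctionMode;

/* Stand-in for the real structure: only the fields used here are declared. */
typedef struct wrapperstate {
    void        *udfstate;
    FunctionMode mode;
    char        *error_message;
} wrapperstate;

/* Hypothetical private state: bytes read from the network, not yet parsed. */
#define DEMO_BUFSIZE 4096

typedef struct demo_state {
    char   buf[DEMO_BUFSIZE];
    size_t used;
} demo_state;

bool demo_init(wrapperstate *s)
{
    static char oom[] = "could not allocate wrapper state";
    /* calloc stands in for an allocation in the per-wrapper memory context. */
    demo_state *st = calloc(1, sizeof(*st));

    if (st == NULL) {
        s->error_message = oom;
        return false;
    }
    s->udfstate = st;      /* later calls find the state here */
    s->mode = MODE_NEXT;   /* ask for the next function to be called next */
    return true;
}
```

A matching done function would release whatever the initialization function set up.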

The next function

This is the workhorse of a wrapper. The next function must process data from the source and turn it into a set of Postgres Datums. It may be called under either or both of two circumstances: 1) the wrapper has data buffered in its internal structures that it has not yet processed; 2) there is data available on the wrapper's network socket. The wrapper can check the connectionReady structure member to see whether there is data ready to read.

 

If the wrapper has read a complete data item it should:

 

1)  convert the data item for each column into a Postgres Datum of the correct type, according to the type specified in the tuple descriptor (tdesc). Place the resulting Datums in the returnTuple array. Also, fill in the returnTupleIsNull array with a ' ' character if the column is not null and an 'n' character if the column is NULL

2)  set the hasTuple flag to true

3)  set hasBufferedData to indicate if the wrapper has buffered data or must wait for data to arrive

4)  return
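The four steps above can be sketched for a toy stream of two integer columns that arrives as newline-terminated "a,b" records. This is an illustration under several assumptions, not the CSV wrapper from the distribution: the reduced wrapperstate structure, demo_state, and demo_next are hypothetical, Datum is declared as a plain integer type, strtol stands in for DatumFromTypeInstance, and reading from the socket into the buffer is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uintptr_t Datum;   /* stand-in for the Postgres Datum type */

#define DEMO_NCOLS 2

/* Stand-in for the real structure: only the fields used here are declared. */
typedef struct wrapperstate {
    void  *udfstate;
    Datum *returnTuple;
    char  *returnTupleIsNull;
    bool   hasTuple;
    bool   hasBufferedData;
} wrapperstate;

/* Hypothetical private state: input bytes that have not been processed. */
typedef struct demo_state {
    char   buf[256];
    size_t used;
} demo_state;

/* Hypothetical next function. An empty field is treated as NULL. */
bool demo_next(wrapperstate *s)
{
    demo_state *st = s->udfstate;
    char *eol = memchr(st->buf, '\n', st->used);
    char *field;
    size_t reclen;
    int col;

    s->hasTuple = false;               /* defensive default */
    if (eol == NULL) {                 /* no complete record yet */
        s->hasBufferedData = false;    /* must wait for more input */
        return true;
    }
    *eol = '\0';
    reclen = (size_t) (eol - st->buf) + 1;

    /* Step 1: one Datum and one null marker per column. */
    field = st->buf;
    for (col = 0; col < DEMO_NCOLS; col++) {
        char *comma = strchr(field, ',');

        if (comma != NULL)
            *comma = '\0';
        if (*field == '\0') {
            s->returnTuple[col] = (Datum) 0;
            s->returnTupleIsNull[col] = 'n';
        } else {
            /* strtol stands in for DatumFromTypeInstance. */
            s->returnTuple[col] = (Datum) strtol(field, NULL, 10);
            s->returnTupleIsNull[col] = ' ';
        }
        field = (comma != NULL) ? comma + 1 : field + strlen(field);
    }

    /* Drop the consumed record from the front of the buffer. */
    memmove(st->buf, st->buf + reclen, st->used - reclen);
    st->used -= reclen;

    s->hasTuple = true;                                            /* step 2 */
    s->hasBufferedData = memchr(st->buf, '\n', st->used) != NULL;  /* step 3 */
    return true;                                                   /* step 4 */
}
```

One design choice worth noting: this sketch sets hasBufferedData only when another complete record is already buffered, on the reasoning that a trailing partial record cannot be processed until more data arrives on the socket anyway.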

 

Pull wrappers will typically send requests for data via their rescan functions. The next function will then need to determine when (if ever) the results for that probe have finished being returned. This is typically accomplished by having the source send a special marker in the data stream that would not otherwise appear. When the wrapper detects one of these done-with-probe markers, it should set the doneWithProbe field in the wrapperstate structure to true and return. The wrapper clearinghouse will then schedule the rescan function to be called next.
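A minimal sketch of the marker check, as a helper that a next function might call once it has isolated a complete record. The marker string "#END", the helper name demo_handle_record, and the reduced wrapperstate structure are assumptions made for the example; a real source and its wrapper would have to agree on their own marker.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Stand-in for the real structure: only the fields used here are declared. */
typedef struct wrapperstate {
    bool hasTuple;
    bool doneWithProbe;
} wrapperstate;

/* Hypothetical marker; it must never appear as ordinary data. */
#define DEMO_PROBE_DONE "#END"

/* Returns true if the record was the end-of-probe marker, in which case it
 * carries no tuple. Returns false if the caller should go on to convert the
 * record into a tuple as usual. */
bool demo_handle_record(wrapperstate *s, const char *record)
{
    if (strcmp(record, DEMO_PROBE_DONE) == 0) {
        s->hasTuple = false;
        s->doneWithProbe = true;
        return true;
    }
    return false;
}
```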

The done function

The done function gives the wrapper an opportunity to clean up any state.

The rescan function

A pull wrapper may have 0 or more binding columns associated with it. A binding column is a special column which must have a value provided for it before the wrapper may be triggered to produce tuples. The rescan function receives these values as "probe tuples", or tuples which have values for all binding columns and NULLs in all other columns.

 

The rescan function is invoked when all of the data tuples corresponding to the previous probe tuple have been received. The job of the rescan function is to examine the new probe tuple and turn it into a request appropriate for the remote source. Once such a request is formed, the rescan function must send it to the remote source. Once the rescan function has finished its work, the wrapper must be returned to the NEXT state, so that the results returned as a consequence of the newly sent request can be consumed and processed by the wrapper.

 

The input probe tuples are delivered to the wrapper clearinghouse from the TelegraphCQ backend process, and are accessible to the rescan function via a number of utility functions described in the following table:

 

bool hasNewProbeTuple(wrapperstate *s);

Is there a probe tuple waiting to be processed by this wrapper?

int getNumBindings(wrapperstate *s);

Returns the number of binding values there are in this probe tuple. A binding value is the value associated with a column that was flagged to be used as input for this wrapper when the wrapper was associated with the target stream.

bool isBindingColumn(wrapperstate *s, int colno);

Returns true if the indicated stream column is a column which holds a binding value, false if the column is one for which the wrapper will produce data.

char * getBindingValueByNumber(wrapperstate *s, int bindingno);

Get a binding value according to the order in which input (binding) columns were specified when the wrapper was associated with a stream.

char * getBindingValueByStreamCol(wrapperstate *s, int streamcol);

Check a particular stream column, and if it is a binding column, return the bound value. Otherwise return NULL.
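The sketch below shows how a rescan function might combine these utilities to build a request. It is self-contained, so the utility functions are replaced by stubs that carry the signatures listed above but read from hypothetical probe_* fields of a stand-in wrapperstate. Other assumptions: binding numbers start at 0, the request format "GET v0 v1 ..." is invented, and the request is written to a hypothetical request field rather than sent on the socket. The example in src/test/examples/web.c remains the authoritative reference.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

typedef enum { MODE_INIT, MODE_NEXT, MODE_DONE, MODE_END } FunctionMode;

/* Stand-in wrapperstate: the probe_* and request fields are hypothetical
 * and exist only so that the stubs and the sketch have something to use. */
typedef struct wrapperstate {
    FunctionMode mode;
    bool         probe_waiting;
    int          probe_nbindings;
    char       **probe_values;
    char         request[128];
} wrapperstate;

/* Stubs with the signatures listed above (zero-based numbering assumed). */
bool hasNewProbeTuple(wrapperstate *s) { return s->probe_waiting; }
int  getNumBindings(wrapperstate *s)   { return s->probe_nbindings; }
char *getBindingValueByNumber(wrapperstate *s, int bindingno)
{
    return s->probe_values[bindingno];
}

/* Hypothetical rescan: join the binding values into "GET v0 v1 ...\n". */
bool demo_rescan(wrapperstate *s)
{
    size_t cap = sizeof(s->request);
    size_t len;
    int i, n;

    if (!hasNewProbeTuple(s))
        return true;                 /* nothing to send yet */

    n = snprintf(s->request, cap, "GET");
    if (n < 0 || (size_t) n >= cap)
        return false;
    len = (size_t) n;

    for (i = 0; i < getNumBindings(s); i++) {
        n = snprintf(s->request + len, cap - len, " %s",
                     getBindingValueByNumber(s, i));
        if (n < 0 || (size_t) n >= cap - len)
            return false;            /* request does not fit */
        len += (size_t) n;
    }
    if (len + 2 > cap)
        return false;
    s->request[len] = '\n';
    s->request[len + 1] = '\0';

    /* A real wrapper would now write the request to the socket (fd). */
    s->mode = MODE_NEXT;             /* consume the results in next */
    return true;
}
```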

 

Example Wrapper

The source code for the CSV wrapper can be found in src/backend/utils/telegraphcq/csv.c. It contains example implementations of the initialization, next, and done functions.

 

An example rescan function that communicates with the WebQueryServer can be found in src/test/examples/web.c

SEE ALSO

Data Acquisition