telegraph.cs.berkeley.edu


Data Acquisition in TelegraphCQ


TelegraphCQ interacts with data sources via its wrapper clearinghouse process.  The wrapper clearinghouse manages connections with external data sources and the user-defined wrappers that read data from them.  The sole function of the wrapper clearinghouse is to set up, schedule, and clean up wrapper instances.

What is a wrapper?


A wrapper consists of a set of functions which are defined using the Postgres CREATE FUNCTION command.  A wrapper will typically define an initialization function, a next data tuple function, a done function, and possibly a rescan function.  The init, next, and done functions are used by the wrapper clearinghouse to set up wrapper state, retrieve data rows from the data source, and clean up when the wrapper is done.
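The init/next/done life cycle can be pictured with a small Python analogy.  This is only a sketch of the calling order, not the TelegraphCQ wrapper API: the ListWrapper class, its in-memory row list, and the run_wrapper driver are all hypothetical names introduced for illustration.

```python
from typing import Iterator, List, Optional


class ListWrapper:
    """Hypothetical wrapper exposing the three life-cycle hooks."""

    def __init__(self, rows: List[str]) -> None:
        self._source = rows  # stands in for an external data source
        self._cursor: Optional[Iterator[str]] = None

    def init(self) -> None:
        # Set up wrapper state before any rows are read.
        self._cursor = iter(self._source)

    def next(self) -> Optional[str]:
        # Return the next data row, or None when the source is exhausted.
        if self._cursor is None:
            raise RuntimeError("init() must be called before next()")
        return next(self._cursor, None)

    def done(self) -> None:
        # Release wrapper state.
        self._cursor = None


def run_wrapper(wrapper: ListWrapper) -> List[str]:
    """Drive one wrapper instance: init, repeated next, then done."""
    rows: List[str] = []
    wrapper.init()
    try:
        while True:
            row = wrapper.next()
            if row is None:
                break
            rows.append(row)
    finally:
        # Cleanup runs even if reading a row fails.
        wrapper.done()
    return rows
```

The driver calls done in a finally clause so that cleanup happens even when a read fails, which mirrors the clearinghouse's responsibility for cleaning up wrapper instances.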

TelegraphCQ supports two types of data sources: PUSH and PULL.  A PUSH data source initiates a connection to TelegraphCQ and declares both the wrapper that should handle the data and the stream that will receive the data once it has been turned into data tuples.

A PULL type source is not aware of TelegraphCQ, and thus cannot initiate a connection to it.  For such sources, the wrapper must set up a connection to the data source in its init function.  In addition, some PULL sources may require that TelegraphCQ send some information to the source before it will produce data results.  An example of such a source would be a data provider that returns search results in response to a set of keywords.  The last wrapper function, the rescan function, is required for PULL sources which require input values in order to produce data items.

Wrappers can also have a set of KEY/VALUE pairs associated with them when they are created.  This information, which is typically used for wrapper or wrapper clearinghouse configuration, can be retrieved via an API from within a wrapper function.

Specifying the wrapper type


A wrapper can be either a PUSH or PULL type wrapper.  Setting the WRAPPERTYPE configuration parameter to either PUSH or PULL specifies the type of the wrapper.  By default wrappers are PUSH type wrappers.
 

The CSV Wrapper


Many TelegraphCQ users may not want to worry about the details of wrapper implementation.  We have developed a flexible wrapper that will serve the needs of many users.

The wrapper supports the input of data that arrives at the server in a delimited format.  By default the per-column delimiters are commas, and data tuples are delimited by a newline, but these defaults can be overridden using wrapper configuration parameters.  Further, the number of columns in a data item need not match the number of columns in the target stream exactly: data tuples with too many items will be truncated by the wrapper to fit into the target, and data tuples with too few items will be padded with NULLs for the columns that are missing.
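The truncate-or-pad rule can be illustrated with a short Python sketch.  It is not the wrapper's actual code: the function names split_row and fit_to_stream are hypothetical, None stands in for NULL, and the sketch ignores the stream's timestamp attribute.

```python
from typing import List, Optional


def split_row(line: str, delimiters: str = ",") -> List[str]:
    """Split one data row on any of the given single-character delimiters."""
    fields: List[str] = []
    current: List[str] = []
    for ch in line.rstrip("\n"):  # a newline ends the data tuple
        if ch in delimiters:
            fields.append("".join(current))
            current = []
        else:
            current.append(ch)
    fields.append("".join(current))
    return fields


def fit_to_stream(fields: List[str], n_columns: int) -> List[Optional[str]]:
    """Truncate extra items, or pad missing columns with None (NULL)."""
    kept: List[Optional[str]] = list(fields[:n_columns])
    return kept + [None] * (n_columns - len(kept))
```

For a three-column target, the row "1,2,3,4" keeps its first three items, while the row "1,2" gains one trailing NULL.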

Recall that each TelegraphCQ data stream must have a special timestamp attribute which the executor uses for stream operations.  This timestamp attribute must be the last attribute in any stream that will be used with the CSV wrapper.

The data conversion for a particular column uses the Postgres data input function for the type of the column in the target stream.  If this function is unable to do the conversion, that column will have a NULL value.
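As a rough analogy for this per-column conversion, the Python sketch below applies one converter per column and substitutes None (standing in for NULL) when a conversion fails.  The convert_row function is hypothetical, and Python's int and float are used here only as stand-ins for Postgres type input functions.

```python
from typing import Any, Callable, List, Optional, Sequence


def convert_row(
    fields: Sequence[Optional[str]],
    converters: Sequence[Callable[[str], Any]],
) -> List[Any]:
    """Convert each text field with its column's converter; None on failure."""
    row: List[Any] = []
    for text, convert in zip(fields, converters):
        if text is None:
            # A column that was already missing stays NULL.
            row.append(None)
            continue
        try:
            row.append(convert(text))
        except ValueError:
            # The input function could not parse the text: store NULL.
            row.append(None)
    return row
```

A failed conversion affects only its own column; the remaining columns of the row are still converted.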


Setting up the CSV wrapper


The CSV wrapper is predefined in the system catalogs.  A wrapper named csvwrapper is available immediately upon installation.  Users may create additional instances of the CSV wrapper with different configuration parameters using the CREATE WRAPPER DDL statement. 

The predefined wrapper functions for the CSV wrapper are called:
csv_init -- initialization function
csv_next -- next tuple
csv_done -- cleanup

Providing data to the CSV wrapper


By default, the wrapper clearinghouse listens for connections from PUSH data sources on port 5533.  A simple Perl script called source.pl is provided with the distribution to send data to the wrapper clearinghouse.  It functions much like the Unix "tee" command, with its input going both to TelegraphCQ and to the standard output.

The script is invoked as follows:

source.pl hostname port name

The hostname and port specify the location where the wrapper clearinghouse listens for connections.

The name string is actually composed of two parts separated by a comma.  The first is the name of the wrapper that will be used to process the data stream (in this example, csvwrapper).  The second is the fully qualified name of the stream that will receive the data (this, for instance, might be myschema.csvstream).
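The two-part name string can be composed and checked with a few lines of Python.  This sketch covers only the string format described above; the helper names build_name and parse_name are hypothetical, and how source.pl transmits the string to the wrapper clearinghouse is not shown here.

```python
from typing import Tuple


def build_name(wrapper: str, stream: str) -> str:
    """Join a wrapper name and a fully qualified stream name with a comma."""
    if not wrapper or not stream:
        raise ValueError("both a wrapper name and a stream name are required")
    if "," in wrapper or "," in stream:
        raise ValueError("names must not contain commas")
    return f"{wrapper},{stream}"


def parse_name(name: str) -> Tuple[str, str]:
    """Split a name string back into (wrapper, stream)."""
    wrapper, sep, stream = name.partition(",")
    if not sep or not wrapper or not stream:
        raise ValueError("expected 'wrapper,stream'")
    return wrapper, stream
```

With the example names used above, the name argument passed to source.pl would be csvwrapper,myschema.csvstream.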


Configuring the CSV wrapper


By default, the CSV wrapper expects columns to be delimited by commas.  These per-column delimiters can be overridden by creating a new wrapper object.  The following wrapper attributes are recognized by the CSV wrapper, listed as KEY -- VALUE:

DELIMITERS# -- A string containing all the possible delimiters for a particular column, where # is the column number, starting from 0.
DEFAULT_DELIMITERS -- The delimiters for columns not explicitly specified using a DELIMITERS# key.  The default is a comma (,).
ERROR_INDICATOR -- When the CSV wrapper looks for a new data row, it first checks whether the row begins with the text specified in this key.  If so, the error message is extracted, and the wrapper passes the error back to the wrapper clearinghouse.  The default for this key is "ERROR:".
DONE_INDICATOR -- This key is used only for wrappers which retrieve rows in response to some query that has been sent via the rescan function.  For such wrappers, the text in this key is used by the csv_next function to determine when all of the results for a particular query have been processed.  The default is "DONE WITH PROBE".
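How these keys might interact can be sketched in Python.  This is an illustration under stated assumptions rather than the wrapper's implementation: it assumes that the key for column 0 is spelled DELIMITERS0, that the done indicator is matched as a row prefix, and that the configuration is available as a plain dictionary.  The function names are hypothetical.

```python
from typing import Dict, List, Tuple

# Defaults as documented for the CSV wrapper.
DEFAULTS: Dict[str, str] = {
    "DEFAULT_DELIMITERS": ",",
    "ERROR_INDICATOR": "ERROR:",
    "DONE_INDICATOR": "DONE WITH PROBE",
}


def setting(config: Dict[str, str], key: str) -> str:
    """Look up a key, falling back to the documented default."""
    return config.get(key, DEFAULTS[key])


def delimiters_for(config: Dict[str, str], column: int) -> str:
    """Delimiters for one column (assumed key spelling: DELIMITERS<n>)."""
    fallback = setting(config, "DEFAULT_DELIMITERS")
    return config.get(f"DELIMITERS{column}", fallback)


def split_row(line: str, config: Dict[str, str]) -> List[str]:
    """Split a row, switching delimiter sets as each column ends."""
    fields: List[str] = []
    current: List[str] = []
    delims = delimiters_for(config, 0)
    for ch in line.rstrip("\n"):
        if ch in delims:
            fields.append("".join(current))
            current = []
            # The next column may use a different delimiter set.
            delims = delimiters_for(config, len(fields))
        else:
            current.append(ch)
    fields.append("".join(current))
    return fields


def classify_row(line: str, config: Dict[str, str]) -> Tuple[str, str]:
    """Return ("error", message), ("done", ""), or ("data", row)."""
    row = line.rstrip("\n")
    error = setting(config, "ERROR_INDICATOR")
    if row.startswith(error):
        return "error", row[len(error):].strip()
    # Assumption: the done indicator is matched as a prefix of the row.
    if row.startswith(setting(config, "DONE_INDICATOR")):
        return "done", ""
    return "data", row
```

For example, a configuration of DELIMITERS0 set to ";" and DELIMITERS1 set to "|" would split the row "a;b|c,d" into four fields, with the third column falling back to the default comma.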




SEE ALSO:

TelegraphCQ Wrapper Writers guide