Using PXF to Query External System Data

PXF is an extensible framework that allows HAWQ to query external system data. PXF includes built-in connectors for accessing data that exists inside HDFS files, Hive tables, and HBase tables. Users can also create their own connectors to other parallel data stores or processing engines. To create these connectors using Java plugins, see PXF External Tables and API.

Configuring PXF

Setting up the Java Classpath

The classpath of the PXF service is set during the installation process. Administrators should modify it only when adding new PXF connectors. The classpath is defined in two files:

  1. /etc/gphd/pxf/conf/pxf-private.classpath - contains all the resources required to run the service, including the pxf-hdfs, pxf-hbase, and pxf-hive plugins. This file must not be edited or removed.
  2. /etc/gphd/pxf/conf/pxf-public.classpath - user-defined resources can be added here, for example for running a user-defined plugin. Classpath resources should be defined one per line. Wildcard characters can be used in the name of the resource, but not in the full path.

After changing the classpath files, the PXF service must be restarted. Resources can also be added to the staging /usr/lib/gphd/publicstage directory (see About the Public Directory below).

Setting up the JVM Command Line Options for the PXF Service

The PXF service JVM command-line options can be added or modified for each pxf-service instance in the /var/gphd/pxf/pxf-service/bin/setenv.sh file.

Currently, JVM_OPTS is set with the following values for the maximum Java heap size and thread stack size:
JVM_OPTS="-Xmx512M -Xss256K"

After adding/modifying the JVM command line options, the PXF service must be restarted.

Secure PXF

PXF can be used on a secure HDFS cluster. Read, write, and analyze operations are enabled for PXF tables on HDFS files. No changes are required to pre-existing PXF tables from a previous version.

Requirements

  • Both HDFS and YARN principals are created and are properly configured.
  • HDFS uses port 8020 (see the note below).
  • HAWQ is correctly configured to work in secure mode, according to the instructions in the HAWQ guide.
    Note:
    • The HDFS Namenode port must be 8020. This is a limitation that will be fixed in the next PXF version.
    • Please refer to the troubleshooting section for common errors related to PXF security, and their meaning.

Reading and Writing Data with PXF

PXF comes with a number of built-in connectors for reading data that exists inside HDFS files, Hive tables, HBase tables, and for writing data into HDFS files. These built-in connectors use the PXF extensible API. You can also use the extensible API to create your own connectors to any other type of parallel data store or processing engine. See PXF External Tables and API for more information about the API.

This topic contains the following information:

  • Accessing HDFS File Data with PXF (Read + Write)
  • Accessing Hive Data with PXF (Read only)
  • Accessing HBase Data with PXF (Read only)

Built-in Profiles

A profile is a collection of common metadata attributes. Using a profile provides a convenient, simplified PXF syntax: instead of specifying the Fragmenter, Accessor, and Resolver classes individually, you specify a single profile name.

PXF comes with a number of built-in profiles that group together a collection of metadata attributes to achieve a common goal:

Each profile is listed below with its description and the Fragmenter, Accessor, and Resolver classes it groups together.

HdfsTextSimple - Read or write delimited, single-line records from or to plain text files on HDFS.
  • com.pivotal.pxf.plugins.hdfs.HdfsDataFragmenter
  • com.pivotal.pxf.plugins.hdfs.LineBreakAccessor
  • com.pivotal.pxf.plugins.hdfs.StringPassResolver
HdfsTextMulti - Read delimited single- or multi-line records (with quoted linefeeds) from plain text files on HDFS. This profile is not splittable (not parallel); therefore reading is slower than reading with HdfsTextSimple.
  • com.pivotal.pxf.plugins.hdfs.HdfsDataFragmenter
  • com.pivotal.pxf.plugins.hdfs.QuotedLineBreakAccessor
  • com.pivotal.pxf.plugins.hdfs.StringPassResolver
Hive - Use this when connecting to Hive. The Hive table can use any of the available storage formats: text, RC, ORC, Sequence, or Parquet.
  • com.pivotal.pxf.plugins.hive.HiveDataFragmenter
  • com.pivotal.pxf.plugins.hive.HiveAccessor
  • com.pivotal.pxf.plugins.hive.HiveResolver
HiveRC - Use this when connecting to a Hive table where each partition is stored as an RCFile; this profile is optimized for that format. Note: The DELIMITER parameter is mandatory.
  • com.pivotal.pxf.plugins.hive.HiveInputFormatFragmenter
  • com.pivotal.pxf.plugins.hive.HiveRCFileAccessor
  • com.pivotal.pxf.plugins.hive.HiveColumnarSerdeResolver
HiveText - Use this when connecting to a Hive table where each partition is stored as a text file; this profile is optimized for that format. Note: The DELIMITER parameter is mandatory.
  • com.pivotal.pxf.plugins.hive.HiveInputFormatFragmenter
  • com.pivotal.pxf.plugins.hive.HiveLineBreakAccessor
  • com.pivotal.pxf.plugins.hive.HiveStringPassResolver
HBase - Use this when connecting to an HBase data store engine.
  • com.pivotal.pxf.plugins.hbase.HBaseDataFragmenter
  • com.pivotal.pxf.plugins.hbase.HBaseAccessor
  • com.pivotal.pxf.plugins.hbase.HBaseResolver
Avro - Read Avro files (for example, fileName.avro).
  • com.pivotal.pxf.plugins.hdfs.HdfsDataFragmenter
  • com.pivotal.pxf.plugins.hdfs.AvroFileAccessor
  • com.pivotal.pxf.plugins.hdfs.AvroResolver
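
For example, a minimal sketch of a readable table that uses the HdfsTextSimple profile; the host, port, path, table, and column names below are hypothetical placeholders:

-- Hypothetical example: read comma-delimited text files under /data/example
-- using the built-in HdfsTextSimple profile.
CREATE EXTERNAL TABLE example_hdfs_text (id int, name text)
LOCATION ('pxf://namenode_host:51200/data/example?Profile=HdfsTextSimple')
FORMAT 'TEXT' (DELIMITER ',');

SELECT * FROM example_hdfs_text;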

Adding and Updating Profiles

Administrators can add new profiles or edit the built-in profiles in pxf-profiles.xml (and apply the changes with the Pivotal Hadoop (HD) Enterprise Command Line Interface). You can then use any of the profiles defined in pxf-profiles.xml.

Each profile has a mandatory unique name and an optional description.

In addition, each profile contains a set of plugins, which form an extensible set of metadata attributes.

Custom Profile Example

<profile> 
 <name>MyCustomProfile</name>
 <description>A Custom Profile Example</description>
 <plugins>
 	<fragmenter>package.name.CustomProfileFragmenter</fragmenter>
 	<accessor>package.name.CustomProfileAccessor</accessor>
 	<customPlugin1>package.name.MyCustomPluginValue1</customPlugin1>
 	<customPlugin2>package.name.MyCustomPluginValue2</customPlugin2>
 </plugins>
</profile>
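
Once the custom profile is added to pxf-profiles.xml and deployed, it can be referenced by name in the LOCATION URI just like a built-in profile. The sketch below is hypothetical and assumes the custom Resolver emits pxfwritable records:

-- Hypothetical table using the custom profile defined above
CREATE EXTERNAL TABLE custom_profile_example (id int, value text)
LOCATION ('pxf://namenode_host:51200/data/custom?PROFILE=MyCustomProfile')
FORMAT 'CUSTOM' (formatter='pxfwritable_import');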

Deprecated Classnames

In past versions of PXF, connector class names could be used without their package names, e.g. HdfsDataFragmenter instead of com.pivotal.pxf.plugins.hdfs.HdfsDataFragmenter.

This has changed in the latest version. While package-less class names can still be used, a warning is issued when any table that uses them is created or queried:

WARNING:  Use of HdfsDataFragmenter is deprecated and it will be removed on the next major version
DETAIL:  Please use the appropriate PXF profile for forward compatibility (e.g. profile=HdfsTextSimple)

Please note that the next major release will not support the old names; tables that use them will fail with a "class not found" error message.

To avoid future deprecation issues, use PXF profiles. The following table maps old class name combinations to the recommended built-in profiles:

Old names -> Profile
HdfsDataFragmenter, TextFileAccessor, TextResolver -> HdfsTextSimple
HdfsDataFragmenter, QuotedLineBreakAccessor, TextResolver -> HdfsTextMulti
HdfsDataFragmenter, AvroFileAccessor, AvroResolver -> Avro
HdfsDataFragmenter, SequenceFileAccessor, CustomWritable -> SequenceWritable
HBaseDataFragmenter, HBaseAccessor, HBaseResolver -> HBase
HiveDataFragmenter, HiveAccessor, HiveResolver -> Hive
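
For example, a table that was defined with the deprecated, package-less class names can be recreated with the equivalent profile. The host, path, and columns below are placeholders:

-- Deprecated: individual class names without packages
CREATE EXTERNAL TABLE old_style (id int, name text)
LOCATION ('pxf://namenode_host:51200/data/example?FRAGMENTER=HdfsDataFragmenter&ACCESSOR=TextFileAccessor&RESOLVER=TextResolver')
FORMAT 'TEXT' (DELIMITER ',');

-- Recommended: the equivalent built-in profile
CREATE EXTERNAL TABLE new_style (id int, name text)
LOCATION ('pxf://namenode_host:51200/data/example?PROFILE=HdfsTextSimple')
FORMAT 'TEXT' (DELIMITER ',');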

The following table shows old versus new class names:

Old Name -> New Name
TextFileAccessor, LineBreakAccessor, LineReaderAccessor -> com.pivotal.pxf.plugins.hdfs.LineBreakAccessor
QuotedLineBreakAccessor -> com.pivotal.pxf.plugins.hdfs.QuotedLineBreakAccessor
AvroFileAccessor -> com.pivotal.pxf.plugins.hdfs.AvroFileAccessor
SequenceFileAccessor -> com.pivotal.pxf.plugins.hdfs.SequenceFileAccessor
TextResolver, StringPassResolver -> com.pivotal.pxf.plugins.hdfs.StringPassResolver
AvroResolver -> com.pivotal.pxf.plugins.hdfs.AvroResolver
WritableResolver -> com.pivotal.pxf.plugins.hdfs.WritableResolver
HdfsDataFragmenter -> com.pivotal.pxf.plugins.hdfs.HdfsDataFragmenter

Accessing HDFS File Data with PXF

Installing the PXF HDFS plugin

Install the PXF HDFS plugin jar file on all nodes in the cluster:

sudo rpm -i pxf-hdfs-2.5.1.0-x.rpm
  • PXF RPMs reside in the Pivotal ADS/HAWQ stack file. 
  • The script installs the JAR file at the default location, /usr/lib/gphd/pxf-2.5.1.0. The symlink pxf-hdfs.jar is created in /usr/lib/gphd/pxf.
Note:
  • Pivotal recommends that you test PXF on HDFS before connecting to Hive or HBase.
  • PXF on secure HDFS clusters requires NameNode to be configured on port 8020.
  • HBase/Hive configurations requiring user authentication are not supported.

About the Public Directory

PXF provides a space on the filesystem to store your customized serializers and schema files. You must add schema files to all the DataNodes and restart the cluster. The RPM creates the directory at the default location: /usr/lib/gphd/publicstage.

Ensure that all HDFS users have read permissions to HDFS services and limit write permissions to specific users.

Syntax

The syntax for accessing an HDFS file is as follows: 

CREATE [READABLE|WRITABLE] EXTERNAL TABLE <tbl_name> (<attr list>)
LOCATION ('pxf://<name_node_hostname:51200>/<path_to_file_or_directory>?Profile=<chosen_profile>[&<additional_options>=<value>]')
FORMAT '[TEXT | CSV | CUSTOM]' (<formatting_properties>)
[ [LOG ERRORS INTO <error_table>] SEGMENT REJECT LIMIT <count> [ROWS | PERCENT] ];

SELECT ... FROM <tbl_name>; -- to read from HDFS with a READABLE table.
INSERT INTO <tbl_name> ...; -- to write to HDFS with a WRITABLE table.

To read data in the files, or to write data based on the existing format, you need to select the appropriate FORMAT clause and either a Profile or the individual plugin classes.

This topic describes the following:

  • FORMAT clause
  • Profile
  • Accessor
  • Resolver
  • Avro
Note: For more details about the API and classes, see PXF External Tables and API.

FORMAT clause

To read data, use the following formats with any PXF connector:

  • FORMAT 'TEXT': Use with plain delimited text files on HDFS.
  • FORMAT 'CSV': Use with comma-separated value files on HDFS.
  • FORMAT 'CUSTOM': Use with all other files, including Avro format and binary formats. Must always be used with the built-in formatter 'pxfwritable_import' (for read) or 'pxfwritable_export' (for write).
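
As an illustration (the hosts, paths, tables, and columns are placeholders), a delimited CSV file and an Avro file could be declared as follows:

-- CSV file: plain text format, read with the HdfsTextSimple profile
CREATE EXTERNAL TABLE sales_csv (id int, amount float8)
LOCATION ('pxf://namenode_host:51200/data/sales.csv?Profile=HdfsTextSimple')
FORMAT 'CSV';

-- Avro (binary) file: CUSTOM format with the built-in import formatter
CREATE EXTERNAL TABLE sales_avro (id int, amount float8)
LOCATION ('pxf://namenode_host:51200/data/sales.avro?Profile=Avro')
FORMAT 'CUSTOM' (formatter='pxfwritable_import');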

Profile

For plain or comma-separated text files, use either the HdfsTextSimple or HdfsTextMulti profile, or the com.pivotal.pxf.plugins.hdfs.HdfsDataFragmenter class, for HDFS file data. Use the Avro profile for Avro files.

Note: For read tables, you must include a Profile or a Fragmenter in the table definition.

Accessor

The choice of an Accessor depends on the HDFS data file type. 

Note: You must include a Profile or an Accessor in the table definition.
Each file type is listed below with its Accessor, FORMAT clause, and usage notes.

  • Plain Text (delimited): com.pivotal.pxf.plugins.hdfs.LineBreakAccessor with FORMAT 'TEXT' (<format param list>). Read + Write.
  • Plain Text (CSV): com.pivotal.pxf.plugins.hdfs.LineBreakAccessor with FORMAT 'CSV' (<format param list>). LineBreakAccessor is parallel and faster; use it if each logical data row is a physical data line. Read + Write.
  • Plain Text (CSV): com.pivotal.pxf.plugins.hdfs.QuotedLineBreakAccessor with FORMAT 'CSV' (<format param list>). QuotedLineBreakAccessor is slower and not parallel; use it if the data includes embedded (quoted) linefeed characters. Read Only.
  • SequenceFile: com.pivotal.pxf.plugins.hdfs.SequenceFileAccessor with FORMAT 'CUSTOM' (formatter='pxfwritable_import'). Read + Write (use formatter='pxfwritable_export' for write).
  • AvroFile: com.pivotal.pxf.plugins.hdfs.AvroFileAccessor with FORMAT 'CUSTOM' (formatter='pxfwritable_import'). Read Only.

Resolver

Choose a Resolver based on how the data records are serialized in the HDFS file.

Note: You must include a Profile or a Resolver in the table definition.
Each record serialization option is listed below with its Resolver and usage notes.

Avro - com.pivotal.pxf.plugins.hdfs.AvroResolver
  • Avro files include the record schema; Avro serialization can also be used in other file types (e.g., Sequence File).
  • For Avro serialized records outside of an Avro file, include a schema file name (.avsc) in the URL under the optional Schema-Data option.
  • The schema file name must exist in the public stage directory.
  • Deserialize Only (Read).
Java Writable - com.pivotal.pxf.plugins.hdfs.WritableResolver
  • Include the name of the Java class that uses Writable serialization in the URL under the optional Schema-Data option.
  • The class file must exist in the public stage directory (or in Hadoop's classpath).
  • Deserialize and Serialize (Read + Write).
  • See Schema File Guidelines for WritableResolver.
None (plain text) - com.pivotal.pxf.plugins.hdfs.StringPassResolver
  • Does not serialize plain text records; the database parses plain records. Passes records as they are.
  • Deserialize and Serialize (Read + Write).

Schema File Guidelines for WritableResolver

When using a WritableResolver, a schema file needs to be defined. The file needs to be a Java class file and must be on the class path of PXF.

The class must meet the following requirements:

  1. It must implement the org.apache.hadoop.io.Writable interface.
  2. WritableResolver uses reflection to recreate the schema and populate its fields (for both read and write), and then uses the Writable interface functions to read and write. Therefore, fields must be public to enable access to them; private fields are ignored.
  3. Fields are accessed and populated by the order in which they are declared in the class file.
  4. Supported field types:
    • boolean
    • byte array
    • double
    • float
    • int
    • long
    • short
    • string

    Arrays of any of the above types are supported, but the constructor must define the array size so that reflection will work.

Additional Options

Table 1. Additional PXF Options
Option Name Description
COLLECTION_DELIM (Avro or Hive profiles only.) The delimiter character(s) to place between entries in a top-level array, map, or record field when PXF maps a Hive or Avro complex data type to a text column. The default is a "," character.
COMPRESSION_CODEC
  • Useful for WRITABLE PXF tables.
  • Specifies the compression codec class name for compressing the written data. The class must implement the org.apache.hadoop.io.compress.CompressionCodec interface.
  • Some valid values are org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, and org.apache.hadoop.io.compress.BZip2Codec.
  • Note: org.apache.hadoop.io.compress.BZip2Codec runs in a single thread and can be slow.
  • This option has no default value. 
  • When the option is not defined, no compression will be done.
COMPRESSION_TYPE
  • Useful for WRITABLE PXF tables with SequenceFileAccessor.
  • Ignored when COMPRESSION_CODEC is not defined.
  • Specifies the compression type for the sequence file.
  • Valid options are: 
    • RECORD - only the value part of each row is compressed.
    • BLOCK - both keys and values are collected in 'blocks' separately and compressed.
  • Default value: RECORD.
MAPKEY_DELIM (Avro or Hive profiles only.) The delimiter character(s) to place between the key and value of a map entry when PXF maps a Hive or Avro complex data type to a text column. The default is a ":" character.
RECORDKEY_DELIM (Avro profile only.) The delimiter character(s) to place between the field name and value of a record entry when PXF maps an Avro complex data type to a text column. The default is a ":" character.
SCHEMA-DATA The data schema file used to create and read the HDFS file. For example, you could create an avsc (for Avro), or a Java class (for Writable Serialization) file. Check that the file exists on the public directory (see About the Public Directory).

This option has no default value.

THREAD-SAFE Determines whether the table query can run in multi-threaded mode. When set to FALSE, requests are handled in a single thread.

Set this option to FALSE when the table uses a plugin or other element that is not thread-safe (for example, a compression codec).

Allowed values: TRUE, FALSE. The default value is TRUE - requests can run in multi-threaded mode.

 <custom> Any option added to the pxf URI string will be accepted and passed, along with its value, to the Fragmenter, Accessor, Analyzer and Resolver implementations.
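
For instance, a writable table might combine several of these options. This is a sketch; the host, path, output directory, and source table are hypothetical, and GzipCodec is one of the valid codec values listed above:

-- Writable table that gzips its output; THREAD-SAFE=FALSE because a compression codec is used
CREATE WRITABLE EXTERNAL TABLE compressed_out (id int, name text)
LOCATION ('pxf://namenode_host:51200/data/out?Profile=HdfsTextSimple&COMPRESSION_CODEC=org.apache.hadoop.io.compress.GzipCodec&THREAD-SAFE=FALSE')
FORMAT 'TEXT' (DELIMITER ',');

-- some_source_table is a hypothetical source of rows
INSERT INTO compressed_out SELECT id, name FROM some_source_table;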

Accessing Data on a High Availability HDFS Cluster

To access data on a High Availability HDFS cluster, change the authority in the LOCATION URI: use <HA nameservice> instead of <name node host>:51200.

CREATE [READABLE|WRITABLE] EXTERNAL TABLE <tbl name> (<attr list>)
LOCATION ('pxf://<HA nameservice>/<path to file or directory>?Profile=<chosen profile>[&<additional options>=<value>]')
FORMAT '[TEXT | CSV | CUSTOM]' (<formatting properties>);

The opposite applies when a highly available HDFS cluster is reverted to a single NameNode configuration. In that case, any table definition that specifies the nameservice should be changed to use the <NN host>:<NN rest port> syntax.
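
For example, if the cluster's HA nameservice were named hdfs-ha (a placeholder, as are the path, table, and columns), the table definition would reference it in place of the host and port:

-- Hypothetical readable table on an HA cluster; no NameNode host or port is specified
CREATE EXTERNAL TABLE ha_example (id int, name text)
LOCATION ('pxf://hdfs-ha/data/example?Profile=HdfsTextSimple')
FORMAT 'TEXT' (DELIMITER ',');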

Using a Record Key with Key-Value File Formats

For sequence files and other file formats that store rows in a key-value format, the key value can be accessed through HAWQ by using the reserved keyword 'recordkey' as a field name.

The field type must correspond to the key type, much as the other fields must match the HDFS data. 

WritableResolver supports read and write of recordkey, which can be of the following Writable Hadoop types:

  • BooleanWritable
  • ByteWritable
  • DoubleWritable
  • FloatWritable
  • IntWritable
  • LongWritable
  • Text

If the recordkey field is not defined, the key is ignored when reading, and a default value (the segment ID as a LongWritable) is written when writing.

Example

Let's say we have a data schema Babies.class containing 3 fields: (name text, birthday text, weight float). 

An external table must include these three fields, and can either include or ignore the recordkey.

-- writable table with recordkey
CREATE WRITABLE EXTERNAL TABLE babies_registry (recordkey int, name text, birthday text, weight float)
LOCATION ('pxf://namenode_host:51200/babies_1940s?
ACCESSOR=com.pivotal.pxf.plugins.hdfs.SequenceFileAccessor&RESOLVER=com.pivotal.pxf.plugins.hdfs.WritableResolver&DATA-SCHEMA=Babies')
FORMAT 'CUSTOM' (formatter='pxfwritable_export');
INSERT INTO babies_registry VALUES (123456, 'James Paul McCartney', 'June 18, 1942', 3.800);
-- writable table without recordkey
CREATE WRITABLE EXTERNAL TABLE babies_registry2 (name text, birthday text, weight float)
LOCATION ('pxf://namenode_host:51200/babies_1940s?ACCESSOR=com.pivotal.pxf.plugins.hdfs.SequenceFileAccessor&RESOLVER=com.pivotal.pxf.plugins.hdfs.WritableResolver&DATA-SCHEMA=Babies')
FORMAT 'CUSTOM' (formatter='pxfwritable_export');
INSERT INTO babies_registry2 VALUES ('Richard Starkey', 'July 7, 1940', 4.0); -- this record's key will have some default value

The same goes for reading data from an existing file with a key-value format, e.g. a Sequence file.

-- readable table with recordkey
CREATE EXTERNAL TABLE babies_1940 (recordkey int, name text, birthday text, weight float)
LOCATION ('pxf://namenode_host:51200/babies_1940s?ACCESSOR=com.pivotal.pxf.plugins.hdfs.SequenceFileAccessor&RESOLVER=com.pivotal.pxf.plugins.hdfs.WritableResolver&DATA-SCHEMA=Babies')
FORMAT 'CUSTOM' (formatter='pxfwritable_import');
SELECT * FROM babies_1940; -- retrieves each record's key
-- readable table without recordkey
CREATE EXTERNAL TABLE babies_1940_2 (name text, birthday text, weight float)
LOCATION ('pxf://namenode_host:51200/babies_1940s?ACCESSOR=com.pivotal.pxf.plugins.hdfs.SequenceFileAccessor&RESOLVER=com.pivotal.pxf.plugins.hdfs.WritableResolver&DATA-SCHEMA=Babies')
FORMAT 'CUSTOM' (formatter='pxfwritable_import');
SELECT * FROM babies_1940_2; -- ignores the records' key

Working with Avro Files

Avro files combine their data with a schema, and can support complex data types such as arrays, maps, records, enumerations, and fixed types. When you create a PXF external table to represent Avro data, map top-level fields in the schema that use a primitive data type to HAWQ columns of the same type. Map top-level fields that use a complex data type to a TEXT column in the external table. The PXF Avro profile automatically separates components of a complex type by inserting delimiters in the text column. You can then use functions or application code to further process components of the complex data.

The following table summarizes external table mapping rules for Avro data.

Table 2. Avro Data Type Mapping
Avro Data Type PXF Type
Primitive type (int, double, float, long, string, bytes, boolean) Corresponding HAWQ data type. See Data Types.
Complex type: Array, Map, Record, or Enum TEXT, with default delimiters inserted between collection items, mapped key-value pairs, and record data.
Complex type: Fixed BYTEA
Union Follows the above conventions for primitive or complex data types, depending on the union. Null values are supported in Unions.

For complex types, the PXF Avro profile inserts default delimiters between collection items and values. You can use non-default delimiter characters by including the COLLECTION_DELIM, MAPKEY_DELIM, and/or RECORDKEY_DELIM optional parameters to the Avro profile. See Table 1 for a description of the parameters.

Example

The following example uses the Avro schema shown in Sample Avro Schema and the associated data file shown in Sample Avro Data (JSON).

Sample Avro Schema
{
  "type" : "record",
  "name" : "example_schema",
  "namespace" : "com.example",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "doc" : "Id of the user account"
  }, {
    "name" : "username",
    "type" : "string",
    "doc" : "Name of the user account"
  }, {
    "name" : "followers",
    "type" : {"type": "array", "items": "string"},
    "doc" : "Users followers"
  }, {
    "name": "rank",
    "type": ["null", "int"],
    "default": null
  }, {
    "name": "fmap",
    "type": {"type": "map", "values": "long"}
  }, {
    "name": "address",
    "type": {
        "type": "record",
        "name": "addressRecord",
        "fields": [
            {"name":"number", "type":"int"},
            {"name":"street", "type":"string"},
            {"name":"city", "type":"string"}]
    }
  }, {
   "name": "relationship",
    "type": {
        "type": "enum",
        "name": "relationshipEnum",
        "symbols": ["MARRIED","LOVE","FRIEND","COLLEAGUE","STRANGER","ENEMY"]
    }
  }, {
    "name" : "md5",
    "type": {
        "type" : "fixed",
        "name" : "md5Fixed",
        "size" : 4
    }
  } ],
  "doc" : "A basic schema for storing messages"
}
Sample Avro Data (JSON)
{"id":1, "username":"john","followers":["kate", "santosh"], "rank":null, "relationship": "FRIEND", "fmap": {"kate":10,"santosh":4},
"address":{"street":"renaissance drive", "number":1,"city":"san jose"}, "md5":\u3F00\u007A\u0073\u0074}

{"id":2, "username":"jim","followers":["john", "pam"], "rank":3, "relationship": "COLLEAGUE", "fmap": {"john":3,"pam":3}, 
"address":{"street":"deer creek", "number":9,"city":"palo alto"}, "md5":\u0010\u0021\u0003\u0004}
To map this Avro file to an external table, the top-level primitive fields ("id" of type long and "username" of type string) are mapped to their equivalent HAWQ types (bigint and text). The remaining complex fields are mapped to text columns:
gpadmin=# CREATE EXTERNAL TABLE avro_complex 
  (id bigint, 
  username text, 
  followers text, 
  rank int, 
  fmap text, 
  address text, 
  relationship text,
  md5 bytea) 
LOCATION ('pxf://localhost:51200/tmp/avro_complex?PROFILE=Avro')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
The above command uses default delimiters for separating components of the complex types. The following command is equivalent, but it explicitly sets the delimiters using the Avro profile parameters:
gpadmin=# CREATE EXTERNAL TABLE avro_complex 
  (id bigint, 
  username text, 
  followers text, 
  rank int, 
  fmap text, 
  address text, 
  relationship text,
  md5 bytea) 
LOCATION ('pxf://localhost:51200/tmp/avro_complex?PROFILE=Avro&COLLECTION_DELIM=,&MAPKEY_DELIM=:&RECORDKEY_DELIM=:')
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
A simple query of the external table shows the components of the complex type data separated with delimiters:
gpadmin=# select * from avro_complex;
id | username |  followers  |    rank     |  fmap   |    address  |  relationship  |  md5
1| john | [kate,santosh] |   | {kate:10,santosh:4} | {number:1,street:renaissance drive,city:san jose} | FRIEND | ?zst
2| jim | [john,pam] | 3 | {john:3,pam:3} | {number:9,street:deer creek,city:palo alto} | COLLEAGUE | \020!\003\004
You can process the delimited components in the text columns as necessary for your application. For example, the following command uses the string_to_array function to convert entries in the "followers" field to a text array column in a new view. The view is then queried to filter rows based on whether a particular follower appears in the array:
gpadmin=# create view followers_view as 
  select username, address, string_to_array(substring(followers from 2 for (char_length(followers) - 2)), ',')::text[] as followers from avro_complex;

gpadmin=# select username, address from followers_view where john = ANY(followers);
username | address
jim | {number:9,street:deer creek,city:palo alto}

Accessing Hive Data with PXF

Installing the PXF HIVE Plugin

Install the PXF HIVE plugin on all nodes in the cluster: 

sudo rpm -i pxf-hive-2.5.1.0-x.rpm
  • PXF RPMs reside in the Pivotal ADS/HAWQ stack file. 
  • The script installs the JAR file at the default location, /usr/lib/gphd/pxf-2.5.1.0. The symlink pxf-hive.jar is created in /usr/lib/gphd/pxf.
Note: PXF HIVE Prerequisites

Check the following before adding PXF support on Hive:

  • PXF HDFS plugin is installed on the cluster nodes.
  • You are running the Hive Metastore service on a machine in your cluster. 
  • Check that you have set the hive.metastore.uris property in the hive-site.xml on the Namenode.
  • The Hive JAR files and conf directory are installed on the cluster nodes.

Syntax

Hive tables are defined in the same way in PXF regardless of the underlying file storage format. The PXF Hive plugin automatically detects the storage format of the source table:

  • Text based
  • SequenceFile
  • RCFile
  • ORCFile
  • Parquet
  • Avro

The source table can also be a combination of these types. The PXF Hive plugin uses this information to query the data at runtime. The following PXF table definition is valid for any of these storage types.

CREATE EXTERNAL TABLE hivetest(id int, newid int) 
LOCATION ('pxf://<NN host>:51200/<hive table name>?PROFILE=Hive')
FORMAT 'custom' (formatter='pxfwritable_import');

SELECT * FROM hivetest;
Note: 51200, as noted in the example above, is the REST server port on the HDFS NameNode. If a different port is assigned in your installation, use that port.

Hive Command Line

To start the Hive command line and work directly on a Hive table:

${HIVE_HOME}/bin/hive

Here's an example of how to create and load data into a sample Hive table from an existing file.

Hive> CREATE TABLE test (name string, type string, supplier_key int, full_price double) row format delimited fields terminated by ',';
Hive> LOAD DATA local inpath '/local/path/data.txt' into table test; 

Mapping Hive Complex Types

PXF supports Hive data types that are not primitive types. The supported Hive complex data types are array, struct, map, and union. For example:

CREATE TABLE sales_collections (
  item STRING, 
  price FLOAT, 
  properties ARRAY<STRING>, 
  hash MAP<STRING,FLOAT>, 
  delivery_address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>,
  others UNIONTYPE<FLOAT, BOOLEAN, STRING>
)  
ROW FORMAT DELIMITED  FIELDS
TERMINATED BY '\001' COLLECTION ITEMS TERMINATED BY '\002' MAP KEYS TERMINATED BY '\003' LINES TERMINATED BY '\n' STORED AS TEXTFILE;
LOAD DATA LOCAL INPATH '/local/path/<some data file>' INTO TABLE sales_collections;

To query a Hive table with a schema similar to the one in the example, define the PXF external table with a text column for each corresponding complex type. PXF automatically inserts default delimiter characters between components of the complex type, or you can specify the delimiters to use with parameters to the Hive profile (see Table 1). For example:

CREATE EXTERNAL TABLE gp_sales_collections(
  item_name TEXT,
  item_price REAL,
  properties TEXT,
  hash TEXT,
  delivery_addresses TEXT,
  others TEXT
)
LOCATION ('pxf://<namenode_host>:51200/sales_collections?PROFILE=Hive&COLLECTION_DELIM=,&MAPKEY_DELIM=:')
FORMAT 'custom' (FORMATTER='pxfwritable_import');

You can use functions or application code to extract the components of the complex data columns as necessary.
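
For example, paralleling the Avro followers example earlier, the delimited properties column could be split into a text array with string_to_array. This is a sketch that assumes the column arrives as a plain comma-delimited string; adjust the parsing if the profile wraps collection values in brackets:

-- Split the comma-delimited properties column into a text array (hypothetical query)
SELECT item_name,
       string_to_array(properties, ',')::text[] AS property_list
FROM gp_sales_collections;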

Partition Filtering

The PXF Hive plugin uses the Hive partitioning feature and directory structure. This enables partition exclusion on HDFS files that contain the Hive table. To use the Partition Filtering feature to reduce network traffic and I/O, run a PXF query using a WHERE clause that refers to a specific partition in the partitioned Hive table.

To take advantage of PXF Partition filtering push-down, name the partition fields in the external table. These names must be the same as those stored in the Hive table. Otherwise, PXF ignores Partition filtering and the filtering is performed on the HAWQ side, impacting performance.

Note: The Hive plugin only filters on partition columns, not on other table attributes.

Example

Create a Hive table sales_part with 2 partition columns - delivery_state and delivery_city:

CREATE TABLE sales_part (name STRING, type STRING, supplier_key INT, price DOUBLE)
PARTITIONED BY (delivery_state STRING, delivery_city STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

Load data into this Hive table and add some partitions:

LOAD DATA LOCAL INPATH '/local/path/data1.txt' INTO TABLE sales_part PARTITION(delivery_state = 'CALIFORNIA', delivery_city = 'San Francisco');
LOAD DATA LOCAL INPATH '/local/path/data2.txt' INTO TABLE sales_part PARTITION(delivery_state = 'CALIFORNIA', delivery_city = 'Sacramento');
LOAD DATA LOCAL INPATH '/local/path/data3.txt' INTO TABLE sales_part PARTITION(delivery_state = 'NEVADA'    , delivery_city = 'Reno');
LOAD DATA LOCAL INPATH '/local/path/data4.txt' INTO TABLE sales_part PARTITION(delivery_state = 'NEVADA'    , delivery_city = 'Las Vegas');

The Hive storage directory should appear as follows:

/hive/warehouse/sales_part/delivery_state=CALIFORNIA/delivery_city='San Francisco'/data1.txt
/hive/warehouse/sales_part/delivery_state=CALIFORNIA/delivery_city=Sacramento/data2.txt
/hive/warehouse/sales_part/delivery_state=NEVADA/delivery_city=Reno/data3.txt
/hive/warehouse/sales_part/delivery_state=NEVADA/delivery_city='Las Vegas'/data4.txt

To define a PXF table to read this Hive table and take advantage of partition filter push-down, define the fields corresponding to the Hive partition fields at the end of the attribute list. 

When defining an external table, check that the fields corresponding to the Hive partition fields are at the end of the column list. In HiveQL, issuing a SELECT * statement on a partitioned table shows the partition fields at the end of the record.

CREATE EXTERNAL TABLE pxf_sales_part(
  item_name TEXT, 
  item_type TEXT, 
  supplier_key INTEGER, 
  item_price DOUBLE PRECISION, 
  delivery_state TEXT, 
  delivery_city TEXT
)
LOCATION ('pxf://namenode_host:51200/sales_part?Profile=Hive')
FORMAT 'custom' (FORMATTER='pxfwritable_import');

SELECT * FROM pxf_sales_part;

Example

In the following example, the HAWQ query filters on the delivery_city partition Sacramento. The filter on item_name is not pushed down, since it is not a partition column; it is performed on the HAWQ side after all the data for Sacramento is transferred for processing.

SELECT * FROM pxf_sales_part WHERE delivery_city = 'Sacramento' AND item_name = 'shirt';

Example

The following HAWQ query reads all the data under the delivery_state partition CALIFORNIA, regardless of the city partition.

SELECT * FROM pxf_sales_part WHERE delivery_state = 'CALIFORNIA';

Using PXF with Hive Default Partitions

This topic describes a difference in query results between Hive and PXF queries when Hive tables use a default partition.

When dynamic partitioning is enabled in Hive, a partitioned table may store data in a default partition. Hive creates a default partition when the value of a partitioning column does not match the defined type of the column (for example, when a NULL value is used for any partitioning column). In Hive, any query that includes a filter on a partition column excludes any data that is stored in the table's default partition.

Similar to Hive, PXF represents a table's partitioning columns as columns that are appended to the end of the table. However, PXF translates any column value in a default partition to a NULL value. This means that a HAWQ query that includes an IS NULL filter on a partitioning column can return different results as compared to the same Hive query.

Example

Consider a Hive partitioned table that is created with the statement:

CREATE TABLE sales (order_id bigint, order_amount float) PARTITIONED BY (date date);

The table is loaded with five rows that contain the following data:

1    1.0    1900-01-01
2    2.2    1994-04-14
3    3.3    2011-03-31
4    4.5    NULL
5    5.0    2013-12-06

In this case, the insertion of row 4 creates a Hive default partition, because the partition column "date" contains a null value.

In Hive, any query that filters on the partition column omits data in the default partition. For example, the following query returns no rows:

hive> select * from sales where date is null;

However, if you map this table as a PXF external table in HAWQ, all default partition values are translated into actual NULL values. In HAWQ, executing the same query against the PXF table returns row 4 as the result, because the filter matches the NULL value.
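
A minimal sketch of the HAWQ side, assuming the Hive table above is mapped through a hypothetical PXF external table named pxf_sales:

-- Hypothetical PXF external table over the Hive sales table
CREATE EXTERNAL TABLE pxf_sales (order_id bigint, order_amount real, date date)
LOCATION ('pxf://namenode_host:51200/sales?Profile=Hive')
FORMAT 'custom' (formatter='pxfwritable_import');

-- Unlike the Hive query above, this returns row 4, because PXF translates
-- the default partition value into an actual NULL.
SELECT * FROM pxf_sales WHERE date IS NULL;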

Keep this behavior in mind when executing IS NULL queries on Hive partitioned tables.

Accessing Hive Tables in Parquet Format

The PXF Hive profile supports both non-partitioned and partitioned Hive tables that use the Parquet storage format in HDFS. Simply map the table columns using equivalent HAWQ data types. For example, if a Hive table is created using:

create table hive_parquet_table (fname string, lname string, custid int, acctbalance double) stored as parquet;

Then you would define the HAWQ external table using:

create external table pxf_parquet_table (fname text, lname text, custid int, acctbalance double precision)
location ('pxf://localhost:51200/hive_parquet_table?profile=Hive')
format 'custom' (formatter='pxfwritable_import');

Accessing HBase Data with PXF

Installing the PXF HBase Plugin

Install the PXF HBase plugin on all nodes in the cluster: 

sudo rpm -i pxf-hbase-2.5.1.0-x.rpm
  • PXF RPMs reside in the Pivotal ADS/HAWQ stack file. 
  • The script installs the JAR file at the default location, /usr/lib/gphd/pxf-2.5.1.0. The symlink pxf-hbase.jar is created in /usr/lib/gphd/pxf.
Note: PXF HBase Prerequisites

Before using the PXF HBase plugin, verify the following:

  • PXF HDFS plugin is installed on the cluster nodes.
  • HBase and zookeeper jars are installed on the cluster nodes.
  • The /etc/hbase/hbase-env.sh configuration file must reference pxf-hbase.jar. For example, /etc/hbase/hbase-env.sh should include a line such as:
    export HBASE_CLASSPATH=${HBASE_CLASSPATH}:/usr/lib/gphd/pxf/pxf-hbase.jar
    Note: You must restart HBase after making any changes to the HBase configuration.

Syntax

  To query an HBase table, use the following syntax:

CREATE EXTERNAL TABLE <pxf tblname> (<col list - see details below>)
LOCATION ('pxf://<NN REST host>:<NN REST port>/<HBase table name>?PROFILE=HBase') 
FORMAT 'CUSTOM' (FORMATTER='pxfwritable_import');
 
SELECT * FROM <pxf tblname>;

Column Mapping

Most HAWQ external tables (PXF or others) require that the HAWQ table attributes match the source data record layout and include all the available attributes. With the PXF HBase plugin, however, you can specify a subset of the HBase qualifiers that define the HAWQ PXF table. To set up a clear mapping between each attribute in the PXF table and a specific qualifier in the HBase table, you can use either:

  • Direct mapping
  • Indirect mapping

In addition, the HBase row key is handled in a special way.

Row Key

You can use the HBase table row key in several ways. For example, you can include it in query results, or filter on a range of row key values in a WHERE clause. To use the row key in the HAWQ query, define the HAWQ table with the reserved PXF attribute recordkey. This attribute name tells PXF to return the record key of any key-value based system, including HBase.

Note: Since HBase is byte-based and not character-based, Pivotal recommends that you define the recordkey as type bytea. This can result in better data filtering and improved performance.
CREATE EXTERNAL TABLE <tname> (recordkey bytea, ... ) LOCATION ('pxf:// ...')

Direct Mapping

Use Direct Mapping to map HAWQ table attributes to HBase qualifiers. You can specify the HBase qualifier names of interest, with column family names included, as quoted values. 

For example, suppose you have defined an HBase table called hbase_sales with multiple column families and many qualifiers, and you want the following in the resulting attribute section of the CREATE EXTERNAL TABLE statement:

  • rowkey
  • qualifier saleid in the column family cf1
  • qualifier comments in the column family cf8 
CREATE EXTERNAL TABLE hbase_sales (
  recordkey bytea,
  "cf1:saleid" int,
  "cf8:comments" varchar
) ...

The PXF HBase plugin uses these attribute names as-is and returns the values of these HBase qualifiers.

Indirect Mapping (via Lookup Table)

The direct mapping method is fast and intuitive, but indirect mapping helps to reconcile HBase qualifier names with HAWQ behavior:

  • HBase qualifier names can be longer than 32 characters. HAWQ has a 32-character limit on attribute name size.
  • HBase qualifier names can be binary or non-printable. HAWQ attribute names are character-based.

In either case, indirect mapping uses a lookup table in HBase. You create the lookup table to store all the necessary mapping information; it serves as a template for any future queries. The lookup table must be named pxflookup and must include a column family named mapping.

Continuing the sales example from Direct Mapping, the row key represents the HBase table name, and the mapping column family holds the actual attribute mapping in the key-value form <hawq attr name>=<hbase cf:qualifier>.

Example

(row key)  mapping
sales      id=cf1:saleid
sales      cmts=cf8:comments

Note: The mapping assigns a new name to each qualifier. You can use these names in your HAWQ table definition:
CREATE EXTERNAL TABLE hbase_sales (
  recordkey bytea,
  id int,
  cmts varchar
) ...

PXF automatically matches HAWQ to HBase column names when a pxflookup table exists in HBase.
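
With the lookup table in place, queries against the external table can reference the short attribute names directly; for example (the data and filter value are hypothetical):

-- Query the mapped qualifiers by their short HAWQ names
SELECT recordkey, id, cmts
FROM hbase_sales
WHERE id > 100;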

Troubleshooting

The following table describes some common errors while using PXF:

Table 3. PXF Errors and Explanation
Error Common Explanation
ERROR:  invalid URI pxf://localhost:51200/demo/file1: missing options section LOCATION does not include options after the file name: <path>?<key>=<value>&<key>=<value>...
ERROR:  protocol "pxf" does not exist HAWQ is not compiled with the PXF protocol. It requires the GPSQL version of HAWQ.
ERROR:  remote component error (0) from '<x>': There is no pxf servlet listening on the host and port specified in the external table url. Wrong server or port or the service is not started.
ERROR:  Missing FRAGMENTER option in the pxf uri: pxf://localhost:51200/demo/file1?a=a

No FRAGMENTER option was specified in LOCATION.

ERROR:  remote component error (500) from '<x>':   type  Exception report   message   org.apache.hadoop.mapred.InvalidInputException:

Input path does not exist: hdfs://0.0.0.0:8020/demo/file1  

File or pattern given in LOCATION doesn't exist on specified path. 
ERROR: remote component error (500) from '<x>':   type  Exception report   message   org.apache.hadoop.mapred.InvalidInputException : Input Pattern hdfs://0.0.0.0:8020/demo/file* matches 0 files  File or pattern given in LOCATION doesn't exist on specified path.
ERROR:  remote component error (500) from '<x>': PXF not correctly installed in CLASSPATH Cannot find PXF Jar
ERROR:  GPHD component not found Either the required DataNode does not exist, the PXF service (tcServer) on the DataNode is not started, or the PXF webapp was not started.
ERROR:  remote component error (500) from '<x>':  type  Exception report   message   java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/HTableInterface One of the classes required for running PXF or one of its plugins is missing. Check that all resources in the PXF classpath files exist on the cluster nodes.
ERROR: remote component error (500) from '<x>':   type  Exception report   message   java.io.IOException: Can't get Master Kerberos principal for use as renewer Secure PXF: YARN isn't properly configured for secure (Kerberized) HDFS installs
ERROR: fail to get filesystem credential for uri hdfs://<namenode>:8020/ Secure PXF: Wrong HDFS host or port is not 8020 (this is a limitation that will be removed in the next release)
ERROR: remote component error (413) from '<x>': HTTP status code is 413 but HTTP response string is empty The number of PXF table attributes and their name sizes are too large for tcServer to accommodate in its request buffer. The solution is to increase the values of the maxHeaderCount and maxHttpHeaderSize parameters in server.xml on the tcServer instance on all nodes, and then restart PXF:

<Connector acceptCount="100" connectionTimeout="20000" executor="tomcatThreadPool" maxKeepAliveRequests="15" maxHeaderCount="<some larger value>" maxHttpHeaderSize="<some larger value in bytes>" port="${bio.http.port}" protocol="org.apache.coyote.http11.Http11Protocol" redirectPort="${bio.https.port}"/>

HBase Specific Errors
ERROR:  remote component error (500) from '<x>':   type  Exception report   message    org.apache.hadoop.hbase.client.NoServerForRegionException: Unable to find region for t1,,99999999999999 after 10 tries. HBase service is down, probably HRegionServer.
ERROR:  remote component error (500) from '<x>':  type  Exception report   message   org.apache.hadoop.hbase.TableNotFoundException: nosuch HBase cannot find the requested table
ERROR:  remote component error (500) from '<x>':  type  Exception report   message   java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/client/HTableInterface PXF cannot find a required JAR file, probably HBase's
ERROR:  remote component error (500) from '<x>':   type  Exception report   message   java.lang.NoClassDefFoundError: org/apache/zookeeper/KeeperException PXF cannot find Zookeeper's JAR
ERROR:  remote component error (500) from '<x>':  type  Exception report   message   java.lang.Exception: java.lang.IllegalArgumentException: Illegal HBase column name a, missing : PXF table has an illegal field name. Each field name must correspond to an HBase column in the syntax <column family>:<field name>
ERROR: remote component error (500) from '<x>': type Exception report message org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException: Column family a does not exist in region t1,,1405517248353.85f4977bfa88f4d54211cb8ac0f4e644. in table 't1', {NAME => 'cf', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '1', TTL => '2147483647', MIN_VERSIONS => '0', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', ENCODE_ON_DISK => 'true', IN_MEMORY => 'false', BLOCKCACHE => 'true'} Required HBase table does not contain the requested column.
Hive Specific Errors
ERROR:  remote component error (500) from '<x>':  type  Exception report   message   java.lang.RuntimeException: Failed to connect to Hive metastore: java.net.ConnectException: Connection refused Hive Metastore service is down.
ERROR:  remote component error (500) from '<x>': type  Exception report   message

NoSuchObjectException(message:default.players table not found)

Table doesn't exist in Hive.