HAWQ InputFormat for MapReduce

MapReduce is a programming model developed by Google for processing and generating large data sets on an array of commodity servers. You can use the HAWQ InputFormat option to enable MapReduce jobs to access HAWQ data stored in HDFS.

To use HAWQ InputFormat, you need only provide the URL of the database to connect to, along with the name of the table you want to access. HAWQ InputFormat fetches only the metadata of the database and table of interest, which is much less data than the table data itself. After getting the metadata, HAWQ InputFormat determines where and how the table data is stored in HDFS, reads and parses those HDFS files, and processes the parsed table tuples directly inside a Map task.
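
For example, configuring a job to read a HAWQ table takes only two calls. The following is a minimal sketch distilled from the full example later in this section; the database URL, credentials, and table name are illustrative placeholders.

  // Read input through HAWQ InputFormat, then point the job at the database
  // and table to access (URL, credentials, and table name are placeholders).
  job.setInputFormatClass(HAWQInputFormat.class);
  HAWQInputFormat.setInput(job.getConfiguration(),
      "localhost:5432/postgres",   // database URL
      "gpadmin", null,             // username, password (may be null)
      "employees");                // table name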

This chapter describes how to define and run MapReduce jobs that use HAWQ InputFormat.

Supported Data Types

HAWQ InputFormat supports the following data types:

SQL/HAWQ | JDBC/Java type | setXXX | getXXX
DECIMAL/NUMERIC | java.math.BigDecimal | setBigDecimal | getBigDecimal
FLOAT8/DOUBLE PRECISION | double | setDouble | getDouble
INT8/BIGINT | long | setLong | getLong
INTEGER/INT4/INT | int | setInt | getInt
FLOAT4/REAL | float | setFloat | getFloat
SMALLINT/INT2 | short | setShort | getShort
BOOL/BOOLEAN | boolean | setBoolean | getBoolean
VARCHAR/CHAR/TEXT | String | setString | getString
DATE | java.sql.Date | setDate | getDate
TIME/TIMETZ | java.sql.Time | setTime | getTime
TIMESTAMP/TIMESTAMPTZ | java.sql.Timestamp | setTimestamp | getTimestamp
ARRAY | java.sql.Array | setArray | getArray
BIT/VARBIT | com.pivotal.hawq.mapreduce.datatype. | setVarbit | getVarbit
BYTEA | byte[] | setByte | getByte
INTERVAL | com.pivotal.hawq.mapreduce.datatype.HAWQInterval | setInterval | getInterval
POINT | com.pivotal.hawq.mapreduce.datatype.HAWQPoint | setPoint | getPoint
LSEG | com.pivotal.hawq.mapreduce.datatype.HAWQLseg | setLseg | getLseg
BOX | com.pivotal.hawq.mapreduce.datatype.HAWQBox | setBox | getBox
CIRCLE | com.pivotal.hawq.mapreduce.datatype.HAWQCircle | setCircle | getCircle
PATH | com.pivotal.hawq.mapreduce.datatype.HAWQPath | setPath | getPath
POLYGON | com.pivotal.hawq.mapreduce.datatype.HAWQPolygon | setPolygon | getPolygon
MACADDR | com.pivotal.hawq.mapreduce.datatype.HAWQMacaddr | setMacaddr | getMacaddr
INET | com.pivotal.hawq.mapreduce.datatype.HAWQInet | setInet | getInet
CIDR | com.pivotal.hawq.mapreduce.datatype.HAWQCIDR | setCIDR | getCIDR
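
Inside a map task, each table row is presented as a HAWQRecord, and you read its columns with the getXXX accessors listed above, using 1-based column indexes. The following fragment is a minimal sketch assuming a table whose first three columns are INTEGER, TEXT, and TIMESTAMP; the classes it references (HAWQRecord, HAWQException, IntWritable, Text) appear in the full example below.

  public void map(Void key, HAWQRecord value, Context context)
      throws IOException, InterruptedException {
      try {
          int id = value.getInt(1);                         // INTEGER/INT4/INT
          String name = value.getString(2);                 // VARCHAR/CHAR/TEXT
          java.sql.Timestamp hired = value.getTimestamp(3); // TIMESTAMP
          context.write(new IntWritable(id), new Text(name + "\t" + hired));
      } catch (HAWQException e) {
          throw new IOException(e.getMessage());
      }
  }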

HAWQ InputFormat Example

The following example shows how you can use the HAWQ InputFormat to access HAWQ table data from MapReduce jobs.

package com.mycompany.app;
import com.pivotal.hawq.mapreduce.HAWQException;
import com.pivotal.hawq.mapreduce.HAWQInputFormat;
import com.pivotal.hawq.mapreduce.HAWQRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.io.IntWritable;

import java.io.IOException;
public class HAWQInputFormatDemoDriver extends Configured
    implements Tool {

    // The mapper expects the sample table:
    // CREATE TABLE employees (
    //     id INTEGER NOT NULL, name VARCHAR(32) NOT NULL);
    public static class DemoMapper extends
        Mapper<Void, HAWQRecord, IntWritable, Text> {
        int id = 0;
        String name = null;

        public void map(Void key, HAWQRecord value, Context context)
            throws IOException, InterruptedException {
            try {
                // HAWQRecord columns are indexed from 1.
                id = value.getInt(1);
                name = value.getString(2);
            } catch (HAWQException hawqE) {
                throw new IOException(hawqE.getMessage());
            }
            context.write(new IntWritable(id), new Text(name));
        }
    }

    private static int printUsage() {
        System.out.println("HAWQInputFormatDemoDriver "
            + "<database_url> <table_name> <output_path> "
            + "[username] [password]");
        ToolRunner.printGenericCommandUsage(System.out);
        return 2;
    }

    public int run(String[] args) throws Exception {
        if (args.length < 3) {
            return printUsage();
        }
        Job job = new Job(getConf());
        job.setJobName("hawq-inputformat-demo");
        job.setJarByClass(HAWQInputFormatDemoDriver.class);
        job.setMapperClass(DemoMapper.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputValueClass(Text.class);
        String db_url = args[0];
        String table_name = args[1];
        String output_path = args[2];
        String user_name = null;
        if (args.length > 3) {
            user_name = args[3];
        }
        String password = null;
        if (args.length > 4) {
            password = args[4];
        }
        job.setInputFormatClass(HAWQInputFormat.class);
        HAWQInputFormat.setInput(job.getConfiguration(), db_url,
            user_name, password, table_name);
        FileOutputFormat.setOutputPath(job, new Path(output_path));
        // Map-only job: mapper output goes straight to the output path.
        job.setNumReduceTasks(0);
        int res = job.waitForCompletion(true) ? 0 : 1;
        return res;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
            new HAWQInputFormatDemoDriver(), args);
        System.exit(res);
    }
}
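
The job is map-only: job.setNumReduceTasks(0) removes the reduce phase, so each mapper's (id, name) pairs are written directly to the output path by the default TextOutputFormat.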

To compile and run the example:

  1. Add the following dependencies to the project for compilation (a sample compile command appears after these steps):

    1. HAWQInputFormat jars (located in the $GPHOME/lib/postgresql/hawq-mr-io directory):
      • hawq-mapreduce-common.jar
      • hawq-mapreduce-ao.jar
      • hawq-mapreduce-parquet.jar
      • hawq-mapreduce-tool.jar
    2. Required 3rd party jars (located in the $GPHOME/lib/postgresql/hawq-mr-io/lib directory):
      • parquet-column-1.1.0.jar
      • parquet-common-1.1.0.jar
      • parquet-encoding-1.1.0.jar
      • parquet-format-1.1.0.jar
      • parquet-hadoop-1.1.0.jar
      • postgresql-jdbc.jar
      • snakeyaml.jar
    3. Hadoop MapReduce-related jars (located in the installation directory of your Hadoop distribution).
  2. Verify that HAWQ, HDFS, and YARN are installed.

  3. Create a sample table:

    1. Log in to HAWQ:

       $ psql -d postgres 
      
    2. Create the sample table:

      CREATE TABLE employees (
      id INTEGER NOT NULL,
      name TEXT NOT NULL);
      

      Or a Parquet table:

      CREATE TABLE employees (
      id INTEGER NOT NULL,
      name TEXT NOT NULL)
      WITH (appendonly=true, orientation=parquet);
      
    3. Insert one tuple:

      INSERT INTO employees VALUES (1, 'Paul');
      
  4. Run the MapReduce job using a shell script like the following snippet:

      # Suppose all five needed jars are under ./lib
      export LIBJARS=lib/hawq-mapreduce-common.jar,lib/hawq-mapreduce-ao.jar,lib/hawq-mapreduce-tool.jar,lib/postgresql-9.2-1003-jdbc4.jar,lib/snakeyaml-1.12.jar
      export HADOOP_CLASSPATH=lib/hawq-mapreduce-common.jar:lib/hawq-mapreduce-ao.jar:lib/hawq-mapreduce-tool.jar:lib/postgresql-9.2-1003-jdbc4.jar:lib/snakeyaml-1.12.jar
      # Suppose the built application jar is my-app.jar
      hadoop jar my-app.jar com.mycompany.app.HAWQInputFormatDemoDriver \
          -libjars ${LIBJARS} localhost:5432/postgres employees /tmp/employees
      
  5. Use the following command to check the result of the MapReduce job:

      $ hadoop fs -cat /tmp/employees/*
      

The output will appear as follows:

1 Paul
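
For reference, the following is a compile-and-package sketch under the assumptions of step 1: the HAWQ InputFormat and third-party jars have been copied to ./lib, the source file is under ./src, and the Hadoop jars are available through hadoop classpath. Adjust the paths to match your environment.

  # Compile the driver against the HAWQ InputFormat, third-party, and Hadoop jars
  mkdir -p classes
  javac -cp "lib/*:$(hadoop classpath)" -d classes \
      src/com/mycompany/app/HAWQInputFormatDemoDriver.java
  # Package the classes as my-app.jar, the name used by the hadoop jar command above
  jar cf my-app.jar -C classes .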

Accessing HAWQ Data

You can access HAWQ data using the following interfaces:

  • HAWQInputFormat.setInput API: Use this when HAWQ is running.
  • Metadata Export Tool: Use this when HAWQ is not running.

HAWQInputFormat.setInput

  /**
    * Initializes the map part of the job with the appropriate input settings
    * by connecting to the database.
    *
    * @param conf
    * The MapReduce job configuration
    * @param db_url
    * The database URL to connect to
    * @param username
    * The username for setting up a connection to the database
    * @param password
    * The password for setting up a connection to the database
    * @param tableName
    * The name of the table to access
    * @throws Exception
    */
public static void setInput(Configuration conf, String db_url,
    String username, String password, String tableName)
throws Exception;

Metadata Export Tool

Use the metadata export tool, hawq extract, to export the metadata of the target table into a local YAML file:

$ hawq extract [-h hostname] [-p port] [-U username] [-d database] [-o output_file] [-W] <tablename>

Using the extracted metadata, access HAWQ data through the following interface:

 /**
   * Initializes the map part of the job with the appropriate input settings
   * by reading a metadata file stored in the local filesystem.
   *
   * To generate the metadata file, run hawq extract first.
   *
   * @param conf
   * The MapReduce job configuration
   * @param pathStr
   * The path of the metadata file in the local filesystem, e.g.
   * /home/gpadmin/metadata/postgres_test
   * @throws Exception
   */
public static void setInput(Configuration conf, String pathStr)
   throws Exception;
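
For example, the following is a minimal sketch of using the file-based overload in a driver. It assumes the table metadata was previously exported with something like hawq extract -d postgres -o /home/gpadmin/metadata/postgres_test employees; the path shown is illustrative.

  // HAWQ does not need to be running here: the input split and schema
  // information come from the extracted metadata file, not from a live
  // database connection.
  job.setInputFormatClass(HAWQInputFormat.class);
  HAWQInputFormat.setInput(job.getConfiguration(),
      "/home/gpadmin/metadata/postgres_test");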