Wednesday, May 13, 2015

Python JSON Based Configuration

Given a JSON configuration file such as the following:

{
    "queue_args":{
        "host"                   :"localhost",
        "port"                   :"15672",
        "virtual_host"           :"/",
        "channel_max"            :"None",   /* Int of AMQP channel_max value*/
        "frame_max"              :"None",   /* Int of AMQP frame_max value*/
        "heartbeat_interval"     :"None",   /* Int of AMQP heartbeat_interval*/
        "ssl"                    :"None",   /* Bool to enable ssl*/
        "ssl_options"            :"None",   /* Dict of ssl options. See https://www.rabbitmq.com/ssl.html*/
        "connection_attempts"    :"1000",   /* Int maximum number of retry attempts*/
        "retry_delay"            :"0.25",   /* Float time to wait in seconds, before the next.*/
        "socket_timeout"         :"None",   /* Int socket timeout (in seconds?) for high latency networks*/
        "locale"                 :"None",  
        "backpressure_detection" :"None",   /* Bool to toggle backpressure detection*/
        "login"                  :"guest",
        "password"               :"guest",
        "exchange"               :"",
        "exchange_type"          :"fanout"
    },
    "daemon_args":{
        "daemon"     : "False",             /* Bool to run as a daemon rather than as an immediate process*/
        "pidfile"    : "StreamMessage.pid", /* the daemon PID file (default: %default)*/
        "working-dir": ".",                 /* the directory to run the daemon in*/
        "uid"        : "os.getuid()",       /* the userid to run the daemon as (default: inherited from parent process)*/
        "gid"        : "os.getgid()",       /* the groupid to run the daemon as (default: inherited from parent process)*/
        "umask"      : "0022",              /* the umask for files created by the daemon (default: 0022)*/
        "stdout"     : "False",             /* sends standard output to the file STDOUT if set*/
        "stderr"     : "False"              /* sends standard error to the file STDERR if set*/   
    },   
    "spark_args":{
        "connection":  "main",              /* The name of the connection configuration to host */
        "connector":   "connection",        /* Override the connector implementation entry point */
        "class": "MessageReceiver",         /* The entry point for your application (e.g. org.apache.spark.examples.SparkPi)*/
        "master": "spark://192.168.56.101", /* The master URL for the cluster. TODO: determine correct port  (e.g. spark://23.195.26.187:7077)*/
        "deploy-mode": "client",            /* Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client)*/
        "conf": "None",                     /* Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown).*/
        "application-jar": "None",          /* Path to jar including your application and all dependencies. Must be globally visible inside of your cluster, hdfs:// or file:// present on all nodes.*/
        "application-arguments": "None"     /* Arguments passed to the main method of your main class, if any*/
    }
}

You will need to minify the file (to strip the comments), parse the JSON, and unit test the result.

from jsmin import jsmin
import json

json_file = 'c:\\Temp\\config.json'
with open(json_file) as raw_data:
    mini_data = jsmin(raw_data.read())  # strip the /* */ comments so the text is valid JSON
json_data = json.loads(mini_data)
print(json_data['queue_args']['exchange_type'])

Tuesday, May 12, 2015

SQL on Hadoop Part 1 - Hive

I ended up taking a wrong turn on the path to building Spark applications. So to get a quick win on my project, we decided to delay the streaming work and move to SQL and tabular storage of the data for trend analysis.

In the Hadoop world you can use SQL with several engines against heterogeneous data sources. The easiest way to query data with ANSI-92-style SQL is Hive, which is a database layered on HDFS plus a query engine. The Hive engine differs wildly from a SQL Server or Oracle database engine. In SQL Server the query optimizer uses a cost-based approach to determine which physical operators (how a table or index is actually accessed) will implement the logical operators (algebraic operations like UNION) in the DML statement. In Hive the logical operators in the DML statement are rendered into MapReduce jobs, each of which spins up a JVM, a costly and unintuitive process to try to performance tune.



One major difference, and a selling point for Hadoop, is the idea that schema is not applied to data as it is written to Hadoop. Instead it is expected that you will serialize and deserialize the data as part of your process and infer the schema when reading it. This means there is no costly application of schema before you can start importing data, and you can restructure the same data several ways without reimporting and rewriting it to different data marts. The downside is that, for schema on read to be reusable, the deserialization libraries applying the schema must be a shared resource across the various data applications.
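To make schema on read concrete, here is a minimal HiveQL sketch (the table names, columns, and HDFS path are all hypothetical) that lays two different schemas over the same raw files without copying the data:

-- One view of the raw files: each line as a single string.
CREATE EXTERNAL TABLE events_as_text (raw_line STRING)
LOCATION '/data/raw/events';

-- A second view of the same files: tab-delimited fields parsed into columns at read time.
CREATE EXTERNAL TABLE events_parsed (
    event_ts STRING,
    user_id  INT,
    payload  STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/data/raw/events';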

The Hive metastore (Derby or MySQL) holds the metadata describing the Hive warehouse layout.

Databases in Hive are more of a namespace abstraction separating data stores. Hive supports the following storage objects:

HDFS files: the distributed file system managed by Hadoop, providing ubiquitous access across data nodes, redundancy, and durability. It differs from SAN and RAID type storage technologies in management, but not really in access or in the application of ACLs

Databases: namespace for lower level data objects

Tables: row and column structured data storage, just like tables in any database
Partitions: split a table based on the value of a column that determines where the data is physically stored. Partitioning a table changes how Hive structures the data storage on HDFS and creates subdirectories reflecting the partitioning structure (see the DDL sketch after this list). In SQL Server 2008 there was a limit of 1,000 partitions (Month 1, Month 2, ..., Month 1,000); later the limit was raised to 15,000 partitions. I am not sure what the limit is in Hadoop. Over-partitioning can create a large number of files and directories and adds thrashing / processing overhead on the NameNode (which manages the file system locations in memory).

Buckets: a partition physically organizes data horizontally (by row) based on the values of the partitioning columns that will be used in the WHERE clause; a Hive table can be PARTITIONED BY (PostalCode STRING, StartedDate STRING). Bucketing decomposes data sets based on a column as well, but a column used as a bucketing column is hashed into a user-defined number of buckets. This is coded as CLUSTERED BY (Col1) INTO 12 BUCKETS in the table DDL (see the sketch after this list) and creates 12 data groups per partition (say, per PostalCode) by the hash of Col1. The number of buckets does not change with data volume (row count or storage size). Bucketing is essential in performance tuning map-side joins, and probably other operations as well.
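As a rough sketch of how both concepts appear in the table DDL (the table and column names here are hypothetical, and the SET is only needed on older Hive versions when loading bucketed tables):

-- Partitioning creates one HDFS subdirectory per (PostalCode, StartedDate) value pair;
-- bucketing hashes Col1 into 12 files within each of those directories.
CREATE TABLE patient_visits (
    VisitId  BIGINT,
    Col1     STRING,
    Col2     STRING
)
PARTITIONED BY (PostalCode STRING, StartedDate STRING)
CLUSTERED BY (Col1) INTO 12 BUCKETS;

SET hive.enforce.bucketing = true;  -- older Hive versions need this before INSERTs so rows land in the correct bucket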

A warning on external tables: if you drop a table that is external to (not managed by) Hive, the metadata about it is removed from the Hive metastore, but the data itself is not removed.
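A minimal sketch of that behavior (the table name and path are hypothetical):

CREATE EXTERNAL TABLE staged_claims (claim_id STRING, body STRING)
LOCATION '/data/staged/claims';

DROP TABLE staged_claims;
-- The table definition disappears from the metastore,
-- but the files under /data/staged/claims are left untouched.
-- Dropping a managed (non-EXTERNAL) table would delete its data directory as well.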

Hive SQL is not actually ANSI-92 compliant, even though some say it is. HiveQL allows Java regex column specification, uses LIMIT in place of TOP, and allows a decidedly non-standard syntax alongside the ANSI-compliant one. Both of these are legal:
A. SELECT Col1, Col2 AS Total FROM Table1 WHERE 1=1 LIMIT 100;
B. FROM Table1 SELECT Col1, Col2 AS Total WHERE 1=1;

The terminating semicolon is required, not optional.

Hive supports subqueries, but only when they are nested in the FROM clause (not in the projected column list):

SELECT Tbl2.Col1, Tbl1.Col2
FROM (
    SELECT ColA + ColB AS Col1, FK
    FROM Table2
) Tbl2
JOIN Table1 Tbl1 ON (Tbl1.Pk = Tbl2.FK);

Hive supports UNION ALL (return duplicates) but not UNION (return unique values).
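If UNION (distinct) semantics are needed, one common workaround, sketched here with the hypothetical Table1 and Table2 from the earlier examples, is to wrap the UNION ALL in a DISTINCT:

SELECT DISTINCT Col1, Col2
FROM (
    SELECT Col1, Col2 FROM Table1
    UNION ALL
    SELECT Col1, Col2 FROM Table2
) unioned;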

Hive can access data in "Hive Managed Tables" on HDFS, or external tables in HBase or Cassandra. This may work with Mongo leveraging the Mongo-Hadoop Connector.
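For example, the Hive-HBase integration exposes an HBase table to Hive through a storage handler; the table and column family names below are hypothetical:

CREATE EXTERNAL TABLE hbase_messages (msg_key STRING, body STRING)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf:body")
TBLPROPERTIES ("hbase.table.name" = "messages");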

PRO TIP: Tables have custom/extended properties. These are not just descriptive, but can be leveraged by SerDes (serializer/deserializer) to determine data structure.
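As a sketch (the names are hypothetical, and this assumes a Hive build that ships the OpenCSVSerde), the delimiter below is read from SERDEPROPERTIES by the SerDe at query time, while the TBLPROPERTIES entry is a descriptive tag you can read back later:

CREATE TABLE obx_feed (field1 STRING, field2 STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES ("separatorChar" = "|")
TBLPROPERTIES ("source.system" = "HL7 OBX");

SHOW TBLPROPERTIES obx_feed;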

One of the biggest risks in Hive is the lazy evaluation of schema. You can define a table based on the input of a deserialized HL7 OBX segment and then insert a MISMO-based document into the same table; neither Hive nor Hadoop will warn you of the clash. Only when you go to retrieve a record from the second document will an error occur. So each application needs to be aware of potential issues caused by ETL not honoring the schema.