Tuesday, March 31, 2015

Building Apache Spark Applications In Visual Studio

There are several applications named Spark. This post refers to Apache Spark, the in-memory data processing and streaming suite that is part of the Hadoop reference architecture. Generally speaking, the documentation is oriented toward developers coding on a Linux laptop and using sbt or Maven for build and CI support. I work on a mixed team that prefers the Microsoft tooling (as do I). Our goal was to find a language with built-in Visual Studio and TFS support that could also deploy to HDFS using the deployment tooling Hadoop and Spark support (Ambari and spark-submit) with only minor environment reconfiguration.

Tooling:
Team Foundation Server 2013
Visual Studio 2013
Python Tools for Visual Studio (UnitTest, pip, python environment support, REPL integration, and MSBuild support for Python setup tools)
Spark 1.3
Python 2.7 (3.N, PyPy, and Anaconda are not tested with Spark yet)
CentOS
Windows 8.1
Hortonworks magical sandbox.
Powershell 3
GitHub for pulling Spark


Windows Setup
Install VS, PTVS, Python 2.7, IPython, and GitHub to the default paths.
Install Spark in C:\Spark
Add a ton of environment variables (super important).

SPARK_HOME      C:\Spark
PYSPARK_HOME C:\Spark\Python
PY4J_HOME          C:\Spark\python\lib\py4j-0.8.2.1-src.zip
PYTHONPATH       C:\python27;C:\python27\scripts;%SPARK_HOME%;%PYSPARK_HOME%;%PY4J_HOME%

These are optional
PYTHON2      C:\python27\python
PYTHON3      C:\python3\python
ANACONDA C:\Users\ealdinger\AppData\Local\Continuum\Anaconda
GIT_HOME    <wherever you dump your files>

You can set these from PowerShell or from System - Advanced - Environment Variables.

Testing Setup
Open Powershell
# The Spark entries were added to PYTHONPATH above, so that is the variable to inspect
$py = $env:PYTHONPATH|select-string -pattern "c:\\python27"
$spark = $env:PYTHONPATH|select-string -pattern "c:\\spark"
$pyspark = $env:PYTHONPATH|select-string -pattern "c:\\spark\\python"
$py4j = $env:PYTHONPATH|select-string -pattern "C:\\Spark\\python\\lib\\py4j-0.8.2.1-src.zip"
# Each of the four checks should print True
$py -ne $null;$spark -ne $null;$pyspark -ne $null;$py4j -ne $null;
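
The same check can be sketched in Python, which is convenient once IPython is open. This is only a minimal sketch and not part of the original setup: the helper name missing_entries and the sample string are assumptions made for illustration.

```python
import os

def missing_entries(path_value, required, sep=";"):
    """Return the required directories that are absent from a PATH-style string."""
    # Windows paths are case-insensitive, so compare lower-cased entries.
    present = {entry.strip().lower().rstrip("\\") for entry in path_value.split(sep)}
    return [r for r in required if r.lower().rstrip("\\") not in present]

# Hypothetical values mirroring the variables listed above.
required = [r"C:\python27", r"C:\Spark", r"C:\Spark\python"]
sample = r"C:\python27;C:\python27\scripts;C:\Spark;C:\Spark\Python"

print(missing_entries(sample, required))   # prints []
# Check the real environment; anything printed here still needs to be added.
print(missing_entries(os.environ.get("PYTHONPATH", ""), required))
```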

Start IPython and test that pyspark can be imported
Paste the lines below
from pyspark import SparkContext
logFile = r"c:\spark\README.md"  # Should be some file on your system; raw string keeps the backslashes literal
sc = SparkContext("local", "SimpleApp")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "_-_-_-_-_-_-_-_-_-_-_-_-_-_-"
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
print "_-_-_-_-_-_-_-_-_-_-_-_-_-_-"
sc.stop()

Look for
In [32]: print "_-_-_-_-_-_-_-_-_-_-_-_-_-_-"
_-_-_-_-_-_-_-_-_-_-_-_-_-_-
In [33]: print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 60, lines with b: 29
In [34]: print "_-_-_-_-_-_-_-_-_-_-_-_-_-_-"
_-_-_-_-_-_-_-_-_-_-_-_-_-_-

Start Visual Studio - Create a new Python Application project
Right-click Search Paths in Solution Explorer and add PYTHONPATH to the search paths. You should see spark, spark\python, and py4j.
Add a file to test with, containing:
import re
for test_string in ['555-1212', 'ILL-EGAL']:
    if re.match(r'^\d{3}-\d{4}$', test_string):
        print test_string, 'is a valid US local phone number'
    else:
        print test_string, 'rejected'
print 'end of test'
Save and Start with Debugging
Add another file, or change the first one, so that it contains:
from pyspark import SparkContext

logFile = r"c:\spark\README.md"  # Should be some file on your system; raw string keeps the backslashes literal
sc = SparkContext("local", "Simple App")

logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "_-_-_-_-_-_-_-_-_-_-_-_-_-_-"
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
print "_-_-_-_-_-_-_-_-_-_-_-_-_-_-"

sc.stop()
Save and Start with Debugging. The script should run with a lot of output. The final output should be:
Lines with a: 60, lines with b: 29
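
To cross-check the counts without Spark, the same filter can be sketched in plain Python. This is an illustrative sketch rather than part of the original walkthrough; the function name count_lines_containing is an assumption, and the numbers depend on which README your Spark version ships.

```python
import os

def count_lines_containing(path, needle):
    """Count the lines of a text file that contain the given substring."""
    with open(path) as handle:
        return sum(1 for line in handle if needle in line)

if __name__ == "__main__":
    readme = r"c:\spark\README.md"  # same file as the Spark example
    if os.path.exists(readme):
        print("Lines with a: %i, lines with b: %i" % (
            count_lines_containing(readme, "a"),
            count_lines_containing(readme, "b")))
```

If the two sets of numbers disagree, the Spark job is probably reading a different file than you expect.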

REF:
http://mund-consulting.com/Blog/using-ipython-and-visual-studio-with-apache-spark/

Saturday, March 14, 2015

Some Generic Python Ideas

I am getting a chance to work with Python again, and I am finding some features that I was unaware of and like.

EXAMPLE 0

Slicing a list with a step
>>> a = [1,2,3,4,5]
>>> a[::3]
[1, 4]
>>> a[::2]
[1, 3, 5]
>>> a[::5]
[1]
>>> a[::-1]
[5, 4, 3, 2, 1]   # reverse order!
>>> b =["this", a, 234.0, 2012-12-12]
>>> b[::-1]
[1988, 234.0, [1, 2, 3, 4, 5], 'this']
>>> b[::2]
['this', 234.0]

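The examples above are extended slices (list[start:stop:step]) rather than generator expressions; a negative step walks the list in reverse. In b, 2012-12-12 is integer subtraction, which is why 1988 appears in the output.
Generator expressions and list comprehensions are described in the Python 2 functional programming HOWTO (see REFS).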
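
A short sketch of the start:stop:step form may help here. The variable names are illustrative only.

```python
a = [1, 2, 3, 4, 5]

# a[start:stop:step]; any of the three parts may be omitted.
every_other = a[::2]        # [1, 3, 5]
middle = a[1:4]             # [2, 3, 4]
backwards = a[::-1]         # [5, 4, 3, 2, 1]

# A slice object names the same kind of selection so it can be reused.
evens = slice(1, None, 2)
print(a[evens])             # [2, 4]

# Slicing copies: changing the copy leaves the original list alone.
backwards[0] = 99
print(a)                    # [1, 2, 3, 4, 5]
```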

Iterating using a list comprehension
>>> words = "This is a set of fabulant vociferications".split()
>>> words
['This', 'is', 'a', 'set', 'of', 'fabulant', 'vociferications']
>>> letters = [len(word) for word in words]
>>> letters
[4, 2, 1, 3, 2, 8, 15]
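
A list comprehension builds the whole list at once. Wrapping the same expression in parentheses instead of brackets gives a generator expression, which produces values on demand. A minimal sketch of the difference, reusing the words list from above:

```python
words = "This is a set of fabulant vociferications".split()

lengths_list = [len(word) for word in words]   # built immediately
lengths_gen = (len(word) for word in words)    # values produced on demand

print(sum(lengths_list))     # 35
print(next(lengths_gen))     # 4, the length of 'This'
print(list(lengths_gen))     # the remaining lengths: [2, 1, 3, 2, 8, 15]
print(list(lengths_gen))     # [] because a generator can be consumed only once
```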


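EXAMPLE 1
Nested iteration in a list comprehension. Because ('1,2,3,4') is a string and not a tuple, the commas are iterated as well.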
>>> seq1 = 'abc'
>>> seq2 = ('1,2,3,4')
>>> [(x,y) for x in seq1 for y in seq2]

[('a', '1'), ('a', ','), ('a', '2'), ('a', ','), ('a', '3'), ('a', ','), ('a', '4'), ('b', '1'), ('b', ','), ('b', '2'), ('b', ','), ('b', '3'), ('b', ','), ('b', '4'), ('c', '1'), ('c', ','), ('c', '2'), ('c', ','), ('c', '3'), ('c', ','), ('c', '4')]

EXAMPLE 2
>>> seq1 = 'abc'
>>> seq2 = (1,2,3,4)
>>> seq3 = (5,6)
>>> [(x,y,z) for x in seq1 for y in seq2 for z in seq3]
[('a', 1, 5), ('a', 1, 6), ('a', 2, 5), ('a', 2, 6), ('a', 3, 5), ('a', 3, 6), ('a', 4, 5), ('a', 4, 6), ('b', 1, 5), ('b', 1, 6), ('b', 2, 5), ('b', 2, 6), ('b', 3, 5), ('b', 3, 6), ('b', 4, 5), ('b', 4, 6), ('c', 1, 5), ('c', 1, 6), ('c', 2, 5), ('c', 2, 6), ('c', 3, 5), ('c', 3, 6), ('c', 4, 5), ('c', 4, 6)]
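
The nested comprehension in EXAMPLE 2 is a Cartesian product, so the standard library's itertools.product should give the same result. A small sketch to confirm it:

```python
from itertools import product

seq1 = 'abc'
seq2 = (1, 2, 3, 4)
seq3 = (5, 6)

nested = [(x, y, z) for x in seq1 for y in seq2 for z in seq3]
via_product = list(product(seq1, seq2, seq3))

print(nested == via_product)   # True
print(len(via_product))        # 24, i.e. 3 * 4 * 2
```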


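EXAMPLE 3
Different ternary styles can be used.
These two evaluate to the same value, although the tuple form builds both candidates before picking one.
x = (2,3)[y==3]
x = 3 if (y==3) else 2
You can also choose which function to call
(func1 if y==2 else func2)(arg1,arg2)
or which class to instantiate
x = (class1 if y==2 else class2)(arg1,arg2)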
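
One practical difference between the two styles is worth a sketch: tuple indexing evaluates both candidates, while the conditional expression evaluates only the chosen one. The function names below are made up for illustration.

```python
def pick_eager(y):
    # Tuple indexing: both elements are built before one is chosen.
    return (2, 3)[y == 3]

def pick_lazy(y):
    # Conditional expression: only the chosen branch is evaluated.
    return 3 if y == 3 else 2

def safe_ratio(n, d):
    # Works because the division is skipped when d is zero.
    return n / d if d != 0 else 0

def unsafe_ratio(n, d):
    # Raises ZeroDivisionError for d == 0: n / d is evaluated regardless.
    return (0, n / d)[d != 0]
```

For plain constants the two are interchangeable; as soon as a branch has a cost or a side effect, the conditional expression is the safer choice.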

EXAMPLE 4
A while True loop with a break statement, the usual Python substitute for a do/until loop. The loop ends when the number entered is divisible by 7.
>>> while True:
...     response = input()
...     if int(response) %7==0:
...             break
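
The interactive loop is hard to test, so here is a sketch of the same break logic as a function that takes its responses from any iterable. The function name and the choice to skip non-numeric input are assumptions, not part of the original example.

```python
def first_multiple_of_seven(responses):
    """Return the first response divisible by 7, or None if there is none."""
    pending = iter(responses)
    while True:
        try:
            response = next(pending)
        except StopIteration:
            return None          # ran out of input without a match
        try:
            value = int(response)
        except ValueError:
            continue             # ignore anything that is not an integer
        if value % 7 == 0:
            break                # same exit condition as the loop above
    return value

print(first_multiple_of_seven(["3", "ten", "14", "21"]))   # 14
```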

EXAMPLE 5
Simple inheritance
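class Reader(object):
    def __init__(self, fileName):
        self.fileName = fileName   # was FileName, an undefined name
        self.line = None           # a bare self.line raises AttributeError

class Reader2(Reader):
    def reader(self, fileName):
        line = None                # defined even if open() fails
        try:
            # the with block closes the file for us
            with open(fileName) as f:
                s = f.readline()
                line = s.strip('a')   # strips leading and trailing 'a' characters
        except IOError as e:
            print("Error: {0}:{1}".format(e.errno, e.strerror))
        return line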
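
The subclass above adds a method but never reuses anything from its parent. A sketch of inheritance that does, using super() in the form that works on both Python 2 and 3, might look like this. The class and method names are hypothetical.

```python
class Reader(object):
    def __init__(self, file_name):
        self.file_name = file_name

    def read_first_line(self):
        with open(self.file_name) as handle:
            return handle.readline()


class StrippingReader(Reader):
    def __init__(self, file_name, chars=None):
        # Reuse the base initializer instead of repeating it.
        super(StrippingReader, self).__init__(file_name)
        self.chars = chars

    def read_first_line(self):
        # Extend the inherited behaviour rather than replace it.
        line = super(StrippingReader, self).read_first_line()
        return line.strip(self.chars)   # chars=None strips whitespace
```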

EXAMPLE 6
Shows how to give a parameter a default value, use a regex to check whether an argument is a URL, and report an error when a required argument is missing. This example uses Python 3 (urllib.request). TODO: add an argparse example.

import sys
import re
from urllib.request import urlopen
urlregx = re.compile(r'(https?:\/\/)([\da-z\.-]+)\.([a-z\.]{2,6})([\/\w \.-]*)*')  # raw string, so the backslashes reach the regex engine intact

def fetch_words(_url):
    """Fetch the list of words from a url

    Args:
        _url: a url to a utf-8 text

    Returns:
        A list of strings

    Tests:
        use 'http://sixty-north.com/c/t.txt' as url
    """
    with urlopen(_url) as story:
        story_words = []
        for line in story:
            line_words = line.decode('utf-8').split()
            for word in line_words:
                story_words.append(word)
    return story_words


def print_items(items):
    """print the items

        Args:
            An iterable series of non-object items
    """
    for item in items:
        print(item)


def main(_url = None):
    if _url is None:
        url = 'http://textfiles.com/anarchy/JOLLYROGER/001.jrc'
    else:
        url = _url
    words = fetch_words(url)
    print_items(words)

if __name__ == '__main__':
    _a = sys.argv
    try:
        x = urlregx.match(_a[1])
        if x is None:
            print(_a[1]+' is not a valid url')
            sys.exit()
        main(_a[1])
    except IndexError: 
        print("from the commandline include a parameter like 'http://textfiles.com/anarchy/JOLLYROGER/003.jrc'")
        sys.exit()
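
For the argparse TODO, a minimal sketch could look like the following. It is not the author's implementation: the helper names build_parser and parse_url are assumptions, and the URL check is simplified to a prefix test instead of the regex above.

```python
import argparse

DEFAULT_URL = 'http://textfiles.com/anarchy/JOLLYROGER/001.jrc'

def build_parser():
    parser = argparse.ArgumentParser(description='Print the words found at a URL.')
    # nargs='?' makes the positional argument optional so the default can apply.
    parser.add_argument('url', nargs='?', default=DEFAULT_URL,
                        help='address of a UTF-8 text file')
    return parser

def parse_url(argv=None):
    """Return the URL from argv (or sys.argv when argv is None)."""
    parser = build_parser()
    args = parser.parse_args(argv)
    if not args.url.startswith(('http://', 'https://')):
        # parser.error prints the usage message and exits with status 2.
        parser.error(args.url + ' is not a valid url')
    return args.url
```

With this in place, the main block reduces to main(parse_url()), and argparse supplies the usage message that the IndexError handler prints by hand.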

REFS
http://stackoverflow.com/questions/101268/hidden-features-of-python#112303
https://github.com/gregmalcolm/python_koans/wiki

http://ocw.mit.edu/courses/electrical-engineering-and-computer-science/6-189-a-gentle-introduction-to-programming-using-python-january-iap-2011/
https://docs.python.org/2/howto/functional.html

http://www.python-course.eu/lambda.php

FURTHER
How would this C# class be written in Python?
    [BsonSerializer(typeof(DynamicMongoBsonSerializer))]
    public class DynamicMongoEntity : MongoEntityBase
    {
        public DynamicMongoEntity()
        {
            FieldsAsDynamic = new BsonDocument().ConvertBsonDocumentToDynamicObject();
        }

        /// <summary>
        /// BsonDocument representation of the FieldsAsDynamic property
        ///
        /// GETTER -> Will parse the BsonExtraElements document to json if it hasnt already
        /// SETTER -> Will set the local holder to the updated dynamic object and then deserize the dynamic json back into a BsonDocument for the BsonExtraElements
        /// </summary>
        public override BsonDocument CatchAll
        {
            get { return BsonSerializer.Deserialize<BsonDocument>(JsonConvert.SerializeObject(FieldsAsDynamic)); }
            set { FieldsAsDynamic = value.ConvertBsonDocumentToDynamicObject(); }
        }

        /// <summary>
        /// a c# dynamic representation of the BsonExtraElements document
        /// </summary>
        [BsonIgnore]
        [JsonIgnore]
        public dynamic FieldsAsDynamic { get; set; }

        /// <summary>
        /// This will find a dynamic mongo object in a give collection based on a key and value
        /// </summary>
        /// <param name="collection"></param>
        /// <param name="key"></param>
        /// <param name="value"></param>
        /// <returns></returns>
        public static DynamicMongoEntity FindByDynamicField(MongoCollection collection, string key, object value)
        {
            var dynamicQuery = Query.EQ(key, BsonValue.Create(value));

            var dynamicObject = collection.FindOneAs<DynamicMongoEntity>(dynamicQuery);

            return dynamicObject;
        }

        public static List<DynamicMongoEntity> FindAllByDynamicField(MongoCollection collection, string key, object value)
        {
            var dynamicQuery = Query.EQ(key, BsonValue.Create(value));

            var dynamicObjectList = collection.FindAs<DynamicMongoEntity>(dynamicQuery).ToList();

            return dynamicObjectList;
        }

        public static List<DynamicMongoEntity> FindAllByDynamicFields(MongoCollection collection, string[] keys, object[] values, Func<IEnumerable<IMongoQuery>, IMongoQuery> operatorFunc )
        {
            if (keys.Length != values.Length)
                throw new ArgumentException("Expected equal number of keys and values.");

            IEnumerable<IMongoQuery> queryExpressions = keys.Select((key, i) => Query.EQ(key, BsonValue.Create(values[i]))).ToList();

            var dynamicQuery = operatorFunc(queryExpressions);

            var dynamicObjectList = collection.FindAs<DynamicMongoEntity>(dynamicQuery).ToList();

            return dynamicObjectList;
        }

        public static IMongoQuery QueryDynamicField(string key, object value, Func<string, BsonValue, IMongoQuery> func)
        {
            return func(key, BsonValue.Create(value));
        }
    }
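
One possible direction, offered only as a rough sketch: Python objects are already dynamic, so the catch-all can be a plain dict exposed through __getattr__. No MongoDB driver is used here. An in-memory list stands in for the collection, and every name below (DynamicEntity, find_all_by_fields, combine) is hypothetical.

```python
class DynamicEntity(object):
    """Stores arbitrary fields in a dict and exposes them as attributes."""

    def __init__(self, **fields):
        self.fields = dict(fields)

    def __getattr__(self, name):
        # Called only when normal attribute lookup fails.
        try:
            return self.__dict__['fields'][name]
        except KeyError:
            raise AttributeError(name)

    def to_document(self):
        """Plain dict, the shape a document store would typically accept."""
        return dict(self.fields)


def find_all_by_fields(entities, criteria, combine=all):
    """Filter entities by key/value pairs; combine=any gives OR semantics."""
    return [e for e in entities
            if combine(e.fields.get(k) == v for k, v in criteria.items())]


people = [DynamicEntity(name='Ann', city='Oslo'),
          DynamicEntity(name='Bob', city='Rome')]
print(people[0].name)                                       # Ann
print(len(find_all_by_fields(people, {'city': 'Rome'})))    # 1
```

The combine argument plays the role of operatorFunc in the C# version: all behaves like an AND of the conditions and any like an OR.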

EXCEPTION HANDLING

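#! python
# code page
"""module for exception testing"""

# lets print(..., file=...) work on Python 2 as well as 3
from __future__ import print_function

import sys

def convert(s):
    """converts to int; returns -1 when s cannot be converted"""
    x = -1
    try:
        x = int(s)
    except (ValueError, TypeError) as e:
        print("Conversion error: {}".format(e), file=sys.stderr)
    except Exception:
        # anything else, e.g. OverflowError from int(float('inf'))
        print("Life sucks then you die")
    return x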
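
A try statement can also carry else and finally clauses. The sketch below is a variation on convert, not a replacement for it; the name to_int and the default parameter are assumptions.

```python
import sys

def to_int(text, default=None):
    """Return int(text), or default when the text cannot be converted."""
    try:
        value = int(text)
    except (ValueError, TypeError) as error:
        sys.stderr.write("Conversion error: {}\n".format(error))
        return default
    else:
        # Runs only when no exception was raised.
        return value
    finally:
        # Runs on every path, including the early return above.
        sys.stderr.flush()
```

Returning a caller-supplied default avoids the ambiguity of -1, which is also a perfectly valid conversion result.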

ITERATION EXAMPLES
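iterable = ['Summer','Spring','Autumn','Winter']
iterator = iter(iterable)
next(iterator)            # 'Summer'
for each in iterator:     # carries on from 'Spring'
    print(each)
Here next and iter are built-ins, and iterable and iterator are ordinary variables. Calling next(iterator) inside a for loop over the same iterator consumes an extra item on every pass, so every other season would be skipped.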
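
A slightly longer sketch of the iterator protocol, showing what happens when the iterator runs out and how the skipping behaviour can be used on purpose. The variable names are illustrative.

```python
seasons = ['Summer', 'Spring', 'Autumn', 'Winter']
iterator = iter(seasons)

print(next(iterator))            # Summer
print(list(iterator))            # ['Spring', 'Autumn', 'Winter']

# The iterator is now exhausted; a default value avoids StopIteration.
print(next(iterator, 'done'))    # done

try:
    next(iterator)
except StopIteration:
    print('no more seasons')

# Consuming two items per pass groups the seasons in pairs.
pairs = iter(seasons)
grouped = [(first, next(pairs)) for first in pairs]
print(grouped)                   # [('Summer', 'Spring'), ('Autumn', 'Winter')]
```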