Spark 1.4 Released

On June 11, the Spark team announced availability of Release 1.4.  More than 210 contributors from 70 different organizations contributed more than 1,000 patches.  Spark continues to expand its contributor base, the best measure of health for an open source project.


Spark Core

The Spark team continues to improve Spark operability, performance and compatibility.  Key enhancements include:

  • The first phase in Project Tungsten performance improvements, a cache-friendly sort algorithm
  • Also for improved performance, serialized shuffle output
  • For the Spark UI, visualization for Spark DAGs and operational monitoring
  • A REST API for application information, such as job, stage, task and storage status
  • For Python users, support for Python 3.x, plus external spilling for Python groupByKey operations
  • Two YARN enhancements: support for YARN on EC2 and security for long-running YARN applications
  • Two Mesos enhancements: Docker support and cluster mode.
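
For readers who want to try the new REST API, here is a minimal Python sketch. It assumes the JSON endpoints are served under /api/v1 on the application UI (port 4040 on the driver by default). The helper names rest_endpoints and fetch_json are my own and are not part of Spark, and the application ID is a placeholder you would read from the applications listing.

```python
import json
from urllib.request import urlopen


def rest_endpoints(base_url, app_id):
    """Build URLs for a few status endpoints (hypothetical helper, not a Spark API)."""
    root = f"{base_url.rstrip('/')}/api/v1/applications"
    return {
        "applications": root,                        # list of applications
        "jobs": f"{root}/{app_id}/jobs",             # job status
        "stages": f"{root}/{app_id}/stages",         # stage status
        "storage": f"{root}/{app_id}/storage/rdd",   # cached RDD status
    }


def fetch_json(url, timeout=5):
    """Fetch one endpoint and decode the JSON body."""
    with urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Against a running application, something like fetch_json(rest_endpoints("http://localhost:4040", app_id)["jobs"]) should return the job list; the host and port here are assumptions about a local driver.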

DataFrames and SQL

This release includes extensions of analytic functions for DataFrames, operational utilities for Spark SQL and support for ORCFile format.

A complete list of enhancements to the DataFrame API is here.

R Interface

AMPLab released a developer version of SparkR in January 2014.  In June 2014, Alteryx and Databricks announced a partnership to lead development of this component.  In March 2015, SparkR officially merged into Spark.

SparkR offers an interface to use Apache Spark from R.  In Spark 1.4, SparkR supports operations like selection, filtering and aggregation on large datasets.  As of this release, SparkR does not support an interface to MLlib, Streaming or GraphX.

Machine Learning

In Spark 1.4, ML pipelines graduate from alpha release, add feature transformations (Vector Assembler, String Indexer, Bucketizer etc.) and a Python API.  Additional enhancements to ML include:

There appears to be an effort under way to rebuild MLlib’s supervised learning algorithms in ML.

Enhancements to MLlib include:

There is a single enhancement to GraphX in Spark 1.4, a personalized PageRank.  Spark’s graph analytics capabilities are comparatively static.


The enhancements to Spark Streaming include improvements to the UI, enhanced support for Kafka and Kinesis, and a pluggable interface for write-ahead logs.  Enhanced support for Kafka includes better error reporting, support for Kafka and Kafka with Scala 2.11, input rate tracking and a Python API for Kafka direct mode.

Spark Summit East: A Report (Updated)

Updated with links to slides where available.  Some links are broken; conference organizers have been notified.

Spark Summit East 2015 met on March 18 and 19 at the Sheraton Times Square in New York City.  Conference organizers announced another sellout (like the last two Spark Summits on the West Coast).

Competition for speaking slots at Spark events is heating up.  There were 170 submissions for 30 speaking slots at this event, compared to 85 submissions for 50 slots at Spark Summit 2014.  Compared to the last Spark Summit, presentations in the Applications Track, which I attended, were more polished and demonstrated real progress in putting Spark to work.

The “father” of Spark, Matei Zaharia, kicked off the conference with a review of Spark progress in 2014 and planned enhancements for 2015.  Highlights of 2014 include:

  • Growth in contributors, from 150 to 500
  • Growth in the code base, from 190K lines to 370K lines
  • More than 500 known production instances at the close of 2014

Spark remains the most active project in the Hadoop ecosystem.

Also in 2014, a team at Databricks smashed the Daytona GraySort record for petabyte-scale sorting.  The previous record, set in 2013, used MapReduce running on 2,100 machines to complete the task in 72 minutes.  The new record, set by Databricks with Spark running in the cloud, used 207 machines to complete the task in 23 minutes.

Key enhancements projected for 2015 include:

  • DataFrames, which are similar to frames in R, already released in Spark 1.3
  • R interface, which currently exists as SparkR, an independent project, targeted to be merged into Spark 1.4 in June
  • Enhancements to machine learning pipelines, which are sequences of tasks linked together into a process
  • Continued expansion of smart interfaces to external data sources, pushing logic into the sources
  • Spark packages — a repository for third-party packages (comparable to CRAN)

Databricks CEO Ion Stoica followed with a pitch for Databricks Cloud, which included brief testimonials from myfitnesspal, Automatic, Zoomdata, Uncharted Software and Tresata.

Additional keynoters included Brian Schimpf of Palantir, Matthew Glickman of Goldman Sachs and Peter Wang of Continuum Analytics.

Spark contributors presented detailed views on the current state of Spark:

  • Michael Armbrust, Spark SQL lead developer, presented on the new DataFrames API and other enhancements to Spark SQL.
  • Tathagata Das delivered a talk on the current state and future of Spark Streaming.
  • Joseph Bradley covered MLlib, focusing on the Pipelines capability added in Spark 1.2.
  • Ankur Dave offered an overview of GraphX, Spark’s graph engine.

Several observations from the Applications track:

(1) Geospatial applications had a strong presence.

  • Automatic, Tresata and Uncharted all showed live demonstrations of marketable products with geospatial components running on Spark
  • Mansour Raad of ESRI followed his boffo performance at Strata/Hadoop World last October with a virtuoso demonstration of Spark with massive spatial and temporal datasets and the ESRI open source GIS stack

(2) Spark provides a great platform for recommendation engines.

  • Comcast uses Spark to serve personalized recommendations based on analysis of billions of machine-generated events
  • Gilt Groupe uses Spark for a similar real-time application supporting flash sale events, where products are available for a limited time and in limited quantities
  • Leah McGuire of Salesforce described her work building a recommendation system using Spark

(3) Spark is gaining credibility in retail banking.

  • Sandy Ryza of Cloudera presented on Value at Risk (VaR) computations in Spark, a critical element in Basel reporting and stress testing
  • Startup Tresata demonstrated its application for Anti Money Laundering, which is built on a social graph built in Spark

(4) Spark has traction in the life sciences.

  • Jeremy Freeman of HHMI Janelia Research Center, a regular presenter at Spark Summits, covered Spark’s unique capability for streaming machine learning.
  • David Tester of Novartis presented plans to build a trillion-edge graph for genomic integration
  • Timothy Danford of Berkeley’s AMPLab delivered a presentation on next-generation genomics with Spark and ADAM
  • Kevin Mader of ETH Zurich spoke about turning big hairy 3D images into simple, robust, reproducible numbers without resorting to black boxes or magic

Also in the applications track: presenters from Baidu, myfitnesspal and Shopify.

Distributed Analytics: A Primer

Can we leverage distributed computing for machine learning and predictive analytics? The question keeps surfacing in different contexts, so I thought I’d take a few minutes to write an overview of the topic.

The question is important for four reasons:

  • Source data for analytics frequently resides in distributed data platforms, such as MPP appliances or Hadoop;
  • In many cases, the volume of data needed for analysis is too large to fit into memory on a single machine;
  • Growing computational volume and complexity require more throughput than we can achieve with single-threaded processing;
  • Vendors make misleading claims about distributed analytics in the platforms they promote.

First, a quick definition of terms.  We use the term parallel computing to mean the general practice of dividing a task into smaller units and performing them in parallel; multi-threaded processing means the ability of a software program to run multiple threads (where resources are available); and distributed computing means the ability to spread processing across multiple physical or virtual machines.

The principal benefits of parallel computing are speed and scalability; if it takes a worker one hour to make one hundred widgets, one hundred workers can make ten thousand widgets in an hour (ceteris paribus, as economists like to say).  Multi-threaded processing is better than single-threaded processing, but shared memory and machine architecture impose a constraint on potential speedup and scalability.  In principle, distributed computing can scale out without limit.

The ability to parallelize a task is inherent in the definition of the task itself.  Some tasks are easy to parallelize, because computations performed by each worker are independent of all other workers, and the desired result set is a simple combination of the results from each worker; we call these tasks embarrassingly parallel.   A SQL Select query is embarrassingly parallel; so is model scoring; so are many of the tasks in a text mining process, such as word filtering and stemming.
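
To make the idea concrete, here is a minimal Python sketch of embarrassingly parallel model scoring. Everything in it is illustrative: the scoring function and its weights are made up, and local threads stand in for distributed workers. The point is only that each worker scores its own chunk without looking at any other, and the final result is a plain concatenation.

```python
from concurrent.futures import ThreadPoolExecutor


def score(row, weights=(0.5, 0.25), intercept=1.0):
    """Hypothetical linear scoring function; the weights are invented."""
    return intercept + sum(w * x for w, x in zip(weights, row))


def score_chunk(chunk):
    # Each worker scores its own rows and needs nothing from other workers.
    return [score(row) for row in chunk]


def parallel_score(rows, n_workers=4):
    # Split the rows into contiguous chunks, roughly one per worker.
    size = max(1, -(-len(rows) // n_workers))  # ceiling division
    chunks = [rows[i:i + size] for i in range(0, len(rows), size)]
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        results = pool.map(score_chunk, chunks)  # results keep chunk order
        # The combination step is simple concatenation.
        return [s for chunk_scores in results for s in chunk_scores]


rows = [(float(i), float(2 * i)) for i in range(10)]
scores = parallel_score(rows)
```

The parallel result matches what a single worker would produce row by row, which is what makes the task embarrassingly parallel.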

A second class of tasks requires a little more effort to parallelize.  For these tasks, computations performed by each worker are independent of all other workers, and the desired result set is a linear combination of the results from each worker.  For example, we can parallelize computation of the mean of a distributed database by computing the mean and row count independently for each worker, then computing the grand mean as the weighted mean of the worker means.  We call these tasks linear parallel.
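
The grand mean calculation can be sketched in a few lines of Python. This is an illustration under simple assumptions (in-memory lists stand in for worker partitions, and every partition is non-empty); the function names are mine.

```python
def local_summary(values):
    """Run on each worker: return (mean, row count) for its partition."""
    n = len(values)
    return (sum(values) / n, n)


def grand_mean(summaries):
    """Combine worker results as a count-weighted mean of the worker means."""
    total_rows = sum(n for _, n in summaries)
    return sum(mean * n for mean, n in summaries) / total_rows


# Three workers holding partitions of unequal size (made-up data).
partitions = [[1.0, 2.0, 3.0], [10.0], [4.0, 6.0]]
summaries = [local_summary(p) for p in partitions]
overall = grand_mean(summaries)

# An unweighted average of the worker means gives a different, wrong answer
# whenever the partitions differ in size.
naive = sum(mean for mean, _ in summaries) / len(summaries)
```

The weighting by row count is the whole trick: without it, the single-row partition would count as much as the three-row partition.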

There is a third class of tasks, which is harder to parallelize because the data must be organized in a meaningful way.  We call a task data parallel if computations performed by each worker are independent of all other workers so long as each worker has a “meaningful” chunk of the data.  For example, suppose that we want to build independent time series forecasts for each of three hundred retail stores, and our model includes no cross-effects among stores; if we can organize the data so that each worker has all of the data for one and only one store, the problem will be embarrassingly parallel and we can distribute computing to as many as three hundred workers.
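
A small Python sketch of the retail example follows. It assumes records arrive as (store, week, sales) tuples in arbitrary order, and it uses a deliberately trivial placeholder forecast (the mean of the last two weeks); a real model would replace naive_forecast. The regrouping step is the part that matters.

```python
from collections import defaultdict


def partition_by_store(records):
    """Regroup (store_id, week, sales) rows so each store's rows sit together."""
    by_store = defaultdict(list)
    for store_id, week, sales in records:
        by_store[store_id].append((week, sales))
    return by_store


def naive_forecast(history):
    """Placeholder model: next week's forecast is the mean of the last two weeks."""
    ordered = [sales for _, sales in sorted(history)]
    recent = ordered[-2:]
    return sum(recent) / len(recent)


# Made-up rows, interleaved across stores as they might be in storage.
records = [
    ("A", 1, 100.0), ("B", 1, 20.0), ("A", 2, 110.0),
    ("B", 2, 30.0), ("A", 3, 130.0), ("B", 3, 10.0),
]

# Once the data is grouped, each store can be handled by a separate worker.
forecasts = {
    store: naive_forecast(history)
    for store, history in partition_by_store(records).items()
}
```

After the regrouping, each call to naive_forecast touches one store only, so the forecasting step itself needs no communication between workers.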

While data parallel problems may seem to be a natural application for processing inside an MPP database or Hadoop, there are two constraints to consider.  For a task to be data parallel, the data must be organized in chunks that align with the business problem.  Data stored in distributed databases rarely meets this requirement, so the data must be shuffled and reorganized prior to analytic processing, a process that adds latency.  The second constraint is that the optimal number of workers depends on the problem; in the retail forecasting problem cited above, the optimal number of workers is three hundred.  This rarely aligns with the number of nodes in a distributed database or Hadoop cluster.

There is no generally agreed label for tasks that are the opposite of embarrassingly parallel; for convenience, I use the term orthogonal to describe a task that cannot be parallelized at all.  In analytics, case-based reasoning is the best example of this, as the method works by examining individual cases in a sequence.  Most machine learning and predictive analytics algorithms fall into a middle ground of complex parallelism; it is possible to divide the data into “chunks” for processing by distributed workers, but workers must communicate with one another, multiple iterations may be required and the desired result is a complex combination of results from individual workers.

Software for complex machine learning tasks must be expressly designed and coded to support distributed processing.  While it is physically possible to install open source R or Python in a distributed environment (such as Hadoop), machine learning packages for these languages run locally on each node in the cluster.  For example, if you install open source R on each node in a twenty-four node Hadoop cluster and try to run logistic regression you will end up with twenty-four logistic regression models developed separately for each node.  You may be able to use those results in some way, but you will have to program the combination yourself.
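
The sketch below illustrates the point with a simpler stand-in for logistic regression: ordinary least squares on one predictor, written in plain Python. The data and the two-node split are invented. Fitting locally on each node yields one model per node, and neither local model (nor their average) matches the model fit on the pooled data.

```python
def fit_line(points):
    """Ordinary least squares for y = a + b*x; returns (a, b)."""
    n = len(points)
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    sxx = sum((x - mean_x) ** 2 for x, _ in points)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in points)
    slope = sxy / sxx
    return (mean_y - slope * mean_x, slope)


# Made-up data as it might be split across two nodes.
node_data = [
    [(0.0, 0.0), (1.0, 1.0), (2.0, 2.0)],    # node 1 sees a slope of 1
    [(3.0, 9.0), (4.0, 12.0), (5.0, 15.0)],  # node 2 sees a slope of 3
]

# Running the fit locally produces one model per node.
per_node_models = [fit_line(chunk) for chunk in node_data]

# The model we actually wanted is fit on all of the data together.
pooled_model = fit_line([p for chunk in node_data for p in chunk])

average_slope = sum(b for _, b in per_node_models) / len(per_node_models)
```

Here the pooled slope is about 3.29, while the per-node slopes are 1 and 3 and their average is 2. Combining local models correctly takes deliberate design, which is what distributed algorithms supply.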

Legacy commercial tools for advanced analytics provide only limited support for parallel and distributed processing.  SAS has more than 300 procedures in its legacy Base and STAT software packages; only a handful of these support multi-threaded (SMP) operations on a single machine;  nine PROCs can support distributed processing (but only if the customer licenses an additional product, SAS High-Performance Statistics).  IBM SPSS Modeler Server supports multi-threaded processing but not distributed processing; the same is true for Statistica.

The table below shows currently available distributed platforms for predictive analytics; the table is complete as of this writing (to the best of my knowledge).

Distributed Analytics Software, May 2014

Several observations about the contents of this table:

(1) There is currently no software for distributed analytics that runs on all distributed platforms.

(2) SAS can deploy its proprietary framework on a number of different platforms, but it is co-located and does not run inside MPP databases.  Although SAS claims to support HPA in Hadoop, it seems to have some difficulty executing on this claim, and is unable to describe even generic customer success stories.

(3) Some products, such as Netezza and Oracle, aren’t portable at all.

(4) In theory, MADlib should run in any SQL environment, but the Pivotal database appears to be the primary platform.

To summarize key points:

  • The ability to parallelize a task is inherent in the definition of the task itself.

  • Most “learning” tasks in advanced analytics are not embarrassingly parallel.

  • Running a piece of software on a distributed platform is not the same as running it in distributed mode.  Unless the software is expressly written to support distributed processing, it will run locally, and the user will have to figure out how to combine the results from distributed workers.

Vendors who claim that their distributed data platform can perform advanced analytics with open source R or Python packages without extra programming are confusing predictive model “learning” with simpler tasks, such as scoring or SQL queries.