Category Archives: Hadoop

Data processing with Spark in R & Python

I recently gave a talk on data processing with Apache Spark using R and Python. tl;dr – the slides and presentation can be accessed below:

https://www.brighttalk.com/webcast/9059/172833

As noted in my previous post, Spark has become the de facto standard for big data applications and has been adopted quickly by the industry. See the One Platform initiative blog post by Cloudera's Mike Olson for their commitment to Spark.

In data science, R has seen rapid adoption, not only because it is open source and free compared to costly SAS, but also because of the huge number of statistical and graphical packages it provides, the most popular being Hadley Wickham's (dplyr, ggplot2, reshape2, tidyr and more). Python, on the other hand, has seen rapid adoption among developers and engineers because it is well suited both to scripting big data tasks and to data analysis, with packages like pandas, scikit-learn, NumPy, SciPy and matplotlib, and the popular IPython and later Jupyter notebooks.

There are numerous posts on the net picking fights between R and Python. In practice, however, any big data and data science shop will have developers and data scientists who use either or both of these tools. Spark makes it easy for both communities to leverage the power of Hadoop and distributed processing systems through its own APIs, such as DataFrames, which can be used in a polyglot fashion. It is therefore essential for any data enthusiast to learn how data processing in Spark can be done using R or Python.
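
As a taste of what the talk covers, below is a minimal, hedged PySpark sketch of DataFrame-style processing. The input file, column names and the Spark 1.4-style SQLContext setup are illustrative assumptions, not taken from the talk.

# A minimal PySpark sketch of DataFrame-based processing (Spark 1.4+ style).
# Assumes a local Spark installation; "people.json" and its columns are hypothetical.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[*]", "dataframe-demo")
sqlContext = SQLContext(sc)

# Read a JSON file into a DataFrame and run a typical filter-plus-aggregation.
people = sqlContext.read.json("people.json")
(people.filter(people.age > 21)
       .groupBy("country")
       .count()
       .show())

sc.stop()

The SparkR API exposes roughly the same DataFrame operations (reading JSON, filter, groupBy, count) from R, which is what makes the polyglot workflow practical.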

Why Spark is the big data platform of the future

Apache Spark has created a lot of buzz recently. In fact, beyond the buzz, Apache Spark has seen phenomenal adoption and has been marked out as the successor to Hadoop MapReduce.

Google Trends confirms the hockey-stick-like growth in interest in Apache Spark. All leading Hadoop vendors, including Cloudera, now include Apache Spark in their Hadoop distributions.

Google Trends – Interest in Apache Spark

So what exactly is Spark, and why has it generated such enthusiasm? Apache Spark is an open-source big data processing framework designed for speed and ease of use. Spark is well known for its in-memory performance, but that has also given rise to misconceptions about its on-disk abilities. Spark is in fact a general execution engine, with greatly improved performance both in memory and on disk when compared with older frameworks like MapReduce. With its advanced DAG (directed acyclic graph) execution engine, Spark can run programs up to 100x faster than MapReduce in memory, or 10x faster on disk. Why is Spark faster than MapReduce?

  • A key step in MapReduce operations is the synchronization or "shuffle" step between the "map" and "reduce" steps. Apache Spark implements a sort-based shuffle design, which improves performance.
  • Apache Spark builds a DAG (directed acyclic graph) of operations, which allows developers to execute entire DAGs at once rather than step by step. This eliminates the costly synchronization required by MapReduce. Note that DAGs are also used by Storm and Tez.
  • Spark supports in-memory data sharing across DAGs, so different jobs can work with the same data at very high speed (see the sketch following this list).
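
To make the last two points concrete, here is a small, hedged PySpark sketch (the paths and filter conditions are hypothetical): transformations only build up the DAG, and caching lets several actions share the same in-memory data.

# Illustrative sketch: transformations are lazy and only build the DAG; work
# happens when an action runs. cache() keeps the intermediate RDD in memory so
# that several jobs can reuse it. Paths and filter strings are hypothetical.
from pyspark import SparkContext

sc = SparkContext("local[*]", "dag-demo")

logs = sc.textFile("hdfs:///data/logs")                      # no work happens yet
errors = logs.filter(lambda line: "ERROR" in line).cache()   # still lazy, marked for caching

# Two separate actions share the cached intermediate result in memory.
print(errors.count())
print(errors.filter(lambda line: "timeout" in line).count())

sc.stop()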

It’s important to remember that Hadoop is a decade-old technology, developed at a time when memory was still relatively expensive, which is why it took the design approach of persisting to disk to maintain state between execution steps. Spark, on the other hand, was developed at UC Berkeley's AMPLab in 2009 and open-sourced in 2010, when memory had become much cheaper. Spark therefore stores data in memory and transparently persists it to disk only when needed, achieving better performance. The core concept in Spark is its programming abstraction over data storage, the RDD (Resilient Distributed Dataset). Under the hood, Spark automatically distributes the data contained in RDDs across the cluster and parallelizes the operations performed on them.

Word-count code in Spark
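
For reference, here is a minimal PySpark version of the classic word count shown in the figure above; the input and output HDFS paths are hypothetical.

# Word count in PySpark: split lines into words, pair each word with 1,
# and sum the counts per word across the cluster.
from pyspark import SparkContext

sc = SparkContext("local[*]", "word-count")

counts = (sc.textFile("hdfs:///data/input.txt")
            .flatMap(lambda line: line.split())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b))

counts.saveAsTextFile("hdfs:///data/word-counts")
sc.stop()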

The end result is that, on average, the number of lines of code required to develop a distributed data processing program is much smaller in Spark than in MapReduce. See more details on why Spark is the fastest open-source engine for sorting a petabyte. Clearly, faster execution has been one of the key reasons for the uptake of Spark, but Spark provides further advantages. Like YARN, the upgrade of the Hadoop framework beyond the MapReduce-only version, Spark supports a wide range of workloads, from batch to interactive and streaming. It reduces the burden of maintaining separate tools as in Hadoop, and provides APIs in Scala, Java, Python and SQL. Spark can run over a variety of cluster managers, including Hadoop YARN, Apache Mesos, and Spark's own standalone scheduler.

Spark components

Spark Core – provides the basic functionality of Apache Spark, including RDDs and the APIs to manipulate them.

Spark SQL – a newer component which replaces the older Shark (SQL on Spark) project and provides better integration with Spark Core. It allows querying data through SQL and HiveQL, and supports many data sources, from Hive tables to Parquet and JSON. Spark SQL also lets developers intermix SQL queries with code that manipulates RDDs in Python, Java and Scala, and it provides fast SQL connectivity to BI tools like Tableau or QlikView.

Spark SQL
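
A small, hedged Spark SQL sketch in Python illustrating the intermixing described above; the Parquet file, table and column names are hypothetical.

# Register a DataFrame as a temporary table and mix SQL with regular Python code.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[*]", "spark-sql-demo")
sqlContext = SQLContext(sc)

orders = sqlContext.read.parquet("hdfs:///warehouse/orders.parquet")
orders.registerTempTable("orders")

top_customers = sqlContext.sql("""
    SELECT customer_id, SUM(amount) AS total
    FROM orders
    GROUP BY customer_id
    ORDER BY total DESC
    LIMIT 10
""")
top_customers.show()

sc.stop()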

Spark Streaming – based on micro-batching, this component enables processing of real-time streaming data. It uses DStreams, which are series of RDDs, to process real-time data (a minimal sketch follows at the end of this overview). The Spark Streaming API is very similar to the Spark Core RDD API, making it easy for developers to reuse and adapt code written for batch processing in interactive or real-time applications.

MLlib – provides a library of machine learning algorithms including classification, regression, clustering and collaborative filtering, as well as model evaluation and data import.

GraphX – provides an API for graphs and graph-parallel computation, operators for manipulating graphs and a library of graph algorithms.

The SparkR project aims to provide a lightweight front-end for using Apache Spark from R, and work is underway to integrate SparkR into Spark. Recently, Spark has introduced a DataFrame library with R/pandas-style syntax for use across all of the Spark language APIs, and an ML pipeline API which also integrates with data frames.

Spark adoption is increasing manifold, boosted by growing third-party vendor support. Databricks, the company spun out of AMPLab by the creators of Apache Spark, now provides Spark as a service in the cloud with its own Databricks Cloud, currently in private beta. The Databricks Cloud is designed to support data science in the lab as well as in the factory, with polyglot notebooks (mixing Scala, Java, Python and SQL) and production pipelines for ETL and analytics jobs. Tableau and MemSQL have provided Spark connectors, Altiscale now provides Spark in the cloud, and machine learning vendors like Nube are building products like Reifier to perform entity resolution and de-duplication using Spark. ClearStory Data provides Spark-based data processing and analytics. There is also a fledgling community of packages for Apache Spark.

Big data and data science projects are complex, with an increasingly diverse toolset that requires massive integration effort. Greater flexibility than MapReduce, the capability to support a variety of workloads, and a simpler, more unified ecosystem of tools that work out of the box on a general execution engine make Apache Spark far simpler than the complex zoo of Hadoop MapReduce projects. Together with Spark SQL and the DataFrame library, Spark democratizes access to distributed data processing beyond MapReduce programmers, extending it to other developers and business analysts. Add to that Spark's fast performance, and it is no wonder that Apache Spark continues to gain traction and looks set to become the default framework for Big Data processing in the near future.
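
As referenced in the Spark Streaming description above, here is a hedged, minimal streaming word count in Python; the socket host, port and batch interval are illustrative assumptions, not from the original post.

# Streaming word count: DStreams are sequences of RDDs, so the operations
# mirror the batch RDD API used earlier. Source host/port are hypothetical.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "streaming-demo")
ssc = StreamingContext(sc, batchDuration=5)    # 5-second micro-batches

lines = ssc.socketTextStream("localhost", 9999)
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()                                # print each micro-batch's counts

ssc.start()
ssc.awaitTermination()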

More info: read the series on Big Data: Part-1 : Basics, Part-2 : Hadoop, Part-3 : Hadoop data warehouse and Part-4 : NoSQL

Set up a Hadoop Spark cluster in 10 minutes with Vagrant

With each of the big 3 Hadoop vendors – Cloudera, Hortonworks and MapR – providing their own Hadoop sandbox virtual machines (VMs), trying out Hadoop has become extremely easy. For a developer, it is extremely useful to download one of these VMs, get started with Hadoop and practice data science right away.

Vagrant Hadoop Spark Cluster

However, on top of core Apache Hadoop, these vendors package their own software into their distributions, mostly for orchestration and management, which can be a pain given the multiple scattered open-source projects within the Hadoop ecosystem. For example, Hortonworks includes the open-source Ambari, while Cloudera includes its own Cloudera Manager for orchestrating Hadoop installations and managing multi-node clusters.

Moreover, most of these distributions today require a 64-bit machine and sometimes a large amount of memory (for a laptop); for example, running Cloudera Manager with a full-blown Cloudera Hadoop Distribution (CDH) 5.x requires at least 10 GB of RAM. For a developer with a laptop, RAM is always at a premium, so it may seem easier to try out the vanilla Apache Hadoop downloads instead. The Hadoop documentation for installing a single-node cluster, and even a multi-node cluster, is much improved nowadays, but with the hassle of downloading the distributions and setting up SSH it can easily take a long time to set up a useful multi-node cluster. The overhead of setting up and running multiple VMs can also be a challenge. The vanilla distributions also require separate installations for a UI (Cloudera Hue being a nice one), job scheduling (Oozie) or orchestration (Ambari). Unfortunately, Ambari works only with select versions of HDP (Hortonworks' distribution of Hadoop), and configuring Oozie with disparate versions of Hadoop, Java and other libraries can be a real pain.

One of the solutions to this problem is to use a container-based approach to installation: Hadoop clusters can be set up with LXC (Linux containers), e.g. with the very popular Docker. There are also approaches using Puppet, Ansible, Chef and Salt which allow easy installations. One of the simpler approaches I tried, apart from vanilla Hadoop, is Vagrant. Setting up VMs with Vagrant is a breeze, and with a Vagrant script (written in Ruby) setting up a multi-node cluster is very quick. In fact, you can get started with a Hadoop and Spark multi-node cluster in less than 10 minutes.

Check out the project on Github – it's adapted from Jee Vang's excellent Vagrant project to allow for 32-bit machines, to speed up setup with pre-downloads of Hadoop, Spark and Java, and it includes an updated Readme detailing where the scripts need to be changed.

Basics of Big Data – Building a Hadoop data warehouse

This is the 3rd part of a series of posts on Big Data. Read Part-1 (What is Big Data) and Part-2 (Hadoop).

Traditionally, data warehouses have been built with relational databases as the backbone. With the new challenges (3Vs) of Big Data, relational databases have been falling short of the requirements of handling:

  • New data types (unstructured data)
  • Extended analytic processing
  • Throughput (TB/hour loading) with immediate query access

The industry has turned to Hadoop as a disruptive solution for these very challenges.

The new Hadoop architecture (courtesy Hortonworks):

Hadoop 2.0 with YARN

Comparing the RDBMS and Hadoop data warehousing stack:

The heavy-lifting for any data warehouse is the ETL (Extract-Transform-Load) or ELT processing. The processing layers involved in the data warehousing ETL are different across conventional RDBMS and Hadoop.

Layer    | Conventional RDBMS | Hadoop                             | Advantage of Hadoop over conventional RDBMS
Storage  | Database tables    | HDFS file system                   | HDFS is purpose-built for extreme IO speeds
Metadata | System tables      | HCatalog                           | All clients can use HCatalog to read files
Query    | SQL query engine   | Multiple engines (SQL and non-SQL) | Multiple query engines like Hive or Impala are available

The Hadoop USP: Exploratory analytics

All 3 layers in a conventional RDBMS are glued together into a proprietary bundle, unlike Hadoop, where each layer is independent and separate, allowing multiple modes of access. Apart from these 3 layers, Hadoop provides an important advantage for exploratory BI: a single step from data load to query, which is not available in a conventional RDBMS.

The data-load-to-query in one step involves:

1. Copy data into HDFS as standard HDFS files (write once) using an ETL tool (e.g. Informatica), Sqoop or Flume. This registers the metadata with HCatalog.

2. Declare the query schema in Hive or Impala, which doesn't require data copying or re-loading, thanks to the schema-on-read advantage of Hadoop compared with the schema-on-write constraint of an RDBMS.

3. Explore with SQL queries and launch BI tools (e.g. Tableau, BusinessObjects) for exploratory analytics.
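
As an illustration of this one-step, schema-on-read flow, here is a hedged sketch using Spark's HiveContext (assuming a Spark build with Hive support); the same CREATE EXTERNAL TABLE / SELECT statements could equally be run from the Hive or Impala shells. Paths, columns and table names are hypothetical.

# Declare a schema over files already sitting in HDFS; no data copying or
# re-loading is needed, and queries can start immediately.
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext("local[*]", "schema-on-read-demo")
hive = HiveContext(sc)

hive.sql("""
    CREATE EXTERNAL TABLE IF NOT EXISTS web_logs (
        ts STRING, user_id STRING, url STRING, status INT
    )
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t'
    LOCATION 'hdfs:///data/raw/web_logs'
""")

# Exploratory query straight away; a BI tool could issue the same SQL via JDBC/ODBC.
hive.sql("SELECT status, COUNT(*) AS hits FROM web_logs GROUP BY status").show()

sc.stop()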

Other data warehousing use cases with Hadoop:

1. High performance DW using Parquet columnar file:

The Parquet file format has a columnar storage layout with flexible compression options, optimized for queries that process large volumes of data. It is accessible to multiple query and analysis applications and supports schema evolution. A high-performance DW can be created by copying raw data from HDFS into Parquet files, which can then be queried interactively using Hive or Cloudera Impala.
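
A hedged PySpark sketch of this pattern: convert raw text files in HDFS into Parquet, then query the columnar files. The paths and schema are illustrative assumptions.

# Parse raw CSV-ish lines into Rows, persist as columnar Parquet, then query.
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local[*]", "parquet-demo")
sqlContext = SQLContext(sc)

raw = sc.textFile("hdfs:///data/raw/sales")
rows = raw.map(lambda l: l.split(",")) \
          .map(lambda f: Row(product=f[0], region=f[1], amount=float(f[2])))
sqlContext.createDataFrame(rows).write.parquet("hdfs:///warehouse/sales_parquet")

# The same Parquet files can now be queried from Spark SQL, Hive or Impala.
sales = sqlContext.read.parquet("hdfs:///warehouse/sales_parquet")
sales.groupBy("region").sum("amount").show()

sc.stop()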

2. Platform for transforming data or ETL:

The ETL (extract-transform-load) or ELT (extract-load-transform) use case for Hadoop is well established. The reasons why Hadoop is a popular ETL platform are:

  • Hadoop itself is a general-purpose massively parallel processing (MPP) platform.
  • Hadoop’s NoSQL storage, with its flexible schema-on-read, offers a relaxed alternative to the rigid, strongly-typed schema-on-write model of a relational data warehouse.
  • Hadoop is highly cost-effective (up to 50 to 100 times cheaper) compared to a conventional relational data warehouse on a per-terabyte basis, thanks to its commodity-hardware distributed architecture (low-cost scale-out) versus the relatively high-end infrastructure required for conventional systems (high-cost scale-up).
  • Higher performance on a per-core basis (CPU processing power) allows Hadoop to beat most conventional ETL systems.
  • Most ETL vendors already market versions of their software which leverage Hadoop, whether this is through using Hadoop connectors (e.g. Oracle Data Integrator) or ETL-optimized libraries for MapReduce (e.g. Syncsort DMX-h).

Most such ETL tools allow existing developers to build Hadoop ETL without having to code or script MapReduce in Java or Pig. Several vendors also provide ETL-on-Hadoop automation, a rich user experience (UX) with drag-and-drop design of jobs and workflows, and even native integration taking advantage of YARN (Yet Another Resource Negotiator) in Hadoop 2.0.
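
For developers who do want to hand-roll such a job, here is a hedged sketch of a simple ETL flow expressed in PySpark rather than raw MapReduce; all paths, delimiters and field positions are hypothetical.

# Extract raw records from HDFS, transform (clean, filter, derive columns),
# and load the result back as Parquet for the warehouse layer.
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext("local[*]", "etl-demo")
sqlContext = SQLContext(sc)

def parse(line):
    f = line.split("|")
    return Row(order_id=f[0], customer=f[1].strip().lower(), amount=float(f[2]))

clean = (sc.textFile("hdfs:///data/staging/orders")
           .filter(lambda l: l and not l.startswith("#"))   # drop blanks/comments
           .map(parse))

sqlContext.createDataFrame(clean) \
          .filter("amount > 0") \
          .write.parquet("hdfs:///warehouse/orders_clean")

sc.stop()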

3. Advanced analytics:

Predictive analytics, statistics and other categories of advanced analytics produce insights that traditional BI approaches like query and reporting are unlikely to discover. Advanced analytics platforms and tools become even more important with big data volumes, due to issues of scalability and performance.

Strategic applications like data mining, machine learning and analytics on new types of data, including unstructured data, benefit from much improved performance thanks to Hadoop's distributed architecture, and they also allow new types of integrated analysis previously not possible on relational database platforms. Several newer tools and vendors support using statistical packages and languages like R (as well as SAS or SPSS) for big data analytics.

4. DW offloading:

Due to low storage costs (50 to 100 times less on a per-TB basis), Hadoop is well suited to keeping data online for an indefinite period of time. This is also known as a data lake or an enterprise data hub, where various types of data can be kept until new use cases are discovered for them. Such storage also allows "active archival" of infrequently used data from the enterprise data warehouse (EDW), so that only the most valuable (and usually most recent) data is kept in the EDW.

With ELT loads driving up to 80% of database capacity, Hadoop can also be used as a staging area for data preparation and ELT, offloading data processing from the EDW.

With the big data deluge and its three equally challenging dimensions of volume, velocity and variety, existing conventional platforms are finding it difficult to meet all of an organization's data warehousing needs along with ETL processing times and availability SLAs. The balance of power keeps tilting towards Hadoop as newer tools and appliances extend its capabilities, and with its superior price/performance ratio, building a data warehouse that leverages Hadoop deserves serious consideration.

Read the series on Big Data: Part-1 : Basics, Part-2 : Hadoop, Part-3 : Hadoop data warehouse and Part-4 : NoSQL

Basics of Big Data – Part 2 – Hadoop

As discussed in Part 1 of this series, Hadoop is the foremost among the tools currently being used for deriving value out of Big Data. The process of gaining insights from data through Business Intelligence and analytics essentially remains the same. However, with the huge variety, volume and velocity (the 3Vs of Big Data), it has become necessary to rethink the data management infrastructure. Hadoop, originally designed to be used with the MapReduce algorithm to solve the parallel processing constraints of distributed architectures (e.g. web indexing) at web giants like Yahoo or Google, has become the de facto standard for Big Data (large-scale, data-intensive) analytics platforms.

What is Hadoop?

Think of Hadoop as an operating system for Big Data. It is essentially a flexible and available architecture for large scale computation and data processing on a network of commodity hardware.

Conceptually, the key components of the Java-based Hadoop framework are a file store and a distributed processing system:

1. Hadoop Distributed File System (HDFS): provides reliable (fault-tolerant), scalable, low-cost storage

2. MapReduce: Batch-oriented, distributed (parallel) data processing system with resource management and scheduling

As of October 2013, the 2.x GA release of Apache Hadoop also included an enhancement – a key third component:

3. YARN: a general purpose resource management system for Hadoop to allow MapReduce and other data processing services

Hadoop architecture stack

Open-source Hadoop is an Apache project. There are, however, commercial distributions of Hadoop (similar to UNIX distros), most notably from Cloudera, Hortonworks, MapR, IBM and Amazon. The Hadoop ecosystem has several projects in development seeking to enhance the Hadoop framework to make it better suited to performing Big Data tasks, including ETL and analytics.

The key components of the Hadoop distribution:

1. Distributed file system and storage – HDFS

HDFS – a Java based file system providing scalable and reliable data storage, designed to span large clusters of commodity servers

2. Data integration – Flume, Sqoop

Flume – service for integrating large amounts of streaming data (e.g. logs) into HDFS

Sqoop – tool for transferring bulk data between Hadoop and structured databases e.g. RDBMSes

3. Data access – HBase, Hive, Pig, Impala (CDH version for interactive SQL query), Storm, MapReduce jobs in Java/Python etc.

HBase – a non-relational (NoSQL) columnar database running on top of HDFS.

Hive – a data warehouse infrastructure built on Hadoop, providing a mechanism to project structure onto the data and query it using a SQL-like language, HiveQL

Pig – allows writing complex MapReduce jobs using a scripting language, Pig Latin

Impala – SQL query engine running natively in Hadoop, allows querying data in HDFS and HBase. It is part of Cloudera’s CDH distribution.

Storm – provides real-time data processing capabilities to Hadoop which is traditionally batch oriented (based on MapReduce).

4. Operations– Oozie, Ambari, ZooKeeper

Oozie – Java web application used to schedule Hadoop jobs

Ambari – Framework and tools to provision, manage and monitor Hadoop clusters

ZooKeeper – provides operational services for Hadoop – e.g. distributed configuration service, named registry, synchronization service etc.

5. Resource management – YARN

YARN – separates resource management from the processing components in Hadoop 2.x; in Hadoop 1.x both were handled within the MapReduce packages

A schematic of Cloudera’s Hadoop distribution (CDH) is shown below:

CDH

Why Hadoop?

Hadoop has gained immense traction in a very short amount of time and is proving useful in a range of applications, including deriving insights from Big Data analytics.

The key advantages of Hadoop as a data processing platform are:

1. Scalability and availability – Due to its ability to store and distribute extremely large datasets across hundreds of inexpensive servers operating in parallel, Hadoop offers extreme scalability. With the high-availability HDFS feature in Hadoop 2.0 providing redundant namenodes for standby and failover, Hadoop now also provides high availability.

2. Cost-effectiveness – Due to its design incorporating fault-tolerance and scale-out architecture, Hadoop clusters can be built with relatively inexpensive commodity hardware instead of costly blade servers, thereby providing great savings for storage and computing abilities on a per TB basis.

3. Resilience – With built-in fault tolerance, e.g. multiple copies of data replicated on cluster nodes, and with high availability HDFS in version 2.0, Hadoop provides cost-effective resilience to faults and data loss.

4. Flexibility and performance – The ability to access and store various types of data, both structured and unstructured, with no schema-on-write constraints, along with the emergence of new ways of accessing and processing data, e.g. Storm for real-time/streaming data and SQL-like tools including Impala, Hadapt and Stinger.

Due to these key advantages, Hadoop lends itself to several data processing use cases. Key use cases are:

1. Data store / Enterprise data warehouse (EDW) – cost-effective storage for all of an organization’s ever expanding data

2. Active archive – allowing cost-effective querying on historical data from archival systems

3. Transformation – executing data transformations (T step of ETL/ELT) for improved throughput and performance

4. Exploration – allows fast exploration and quicker insights from new questions and use cases, taking advantage of Hadoop’s schema-on-read model instead of schema-on-write models of traditional relational databases

5. Real-time applications – usage of flexible add-ons like Storm to provide dynamic data mashups

6. Machine learning, data mining, predictive modeling and advanced statistics

The early adopters of Hadoop are the web giants like Facebook, Yahoo, Google, LinkedIn, Twitter etc.

Facebook uses Hadoop – Hive and HBase – for data warehousing (over 300 PB in aggregate, with over 600 TB of daily data inflow) and for real-time applications, serving up dynamic pages customized for each of its over 1.2 billion users.

Yahoo uses Hadoop and Pig for data processing and analytics, web search, email antispam and ad serving, with more than 100,000 CPUs in over 40,000 servers running Hadoop and 170 PB of storage.

Google had used MapReduce to create its web index from crawl data, and it also supports running Hadoop clusters on its cloud platform with Google Compute Engine (GCE).

LinkedIn uses Hadoop for data storage and analytics driving personalized recommendations like “People you may know” and ad targeting.

Twitter uses Hadoop – Pig and HBase for data visualization, social graph analysis and machine learning.

Limitations of Hadoop

While Hadoop is the most well-known Big Data solution, it is just one of the components in the Big Data landscape. In theory, Hadoop is infinitely scalable and resilient and allows a great deal of flexibility in storing structured and unstructured data; in practice, there are several considerations to take care of when architecting Hadoop clusters, due to Hadoop's inherent limitations.

1. Workloads – Hadoop is suitable for various types of workloads; however, mixed workloads, or situations where the workload varies widely or is not known ahead of time, make it difficult to optimize the Hadoop architecture.

2. Integration – Hadoop should not be a stand-alone solution, or it will quickly become a data silo unconnected to the rest of the data management infrastructure. The Hadoop strategy needs to fit into the organization's overall data management and processing framework to allow for growth and maintenance without sacrificing flexibility and agility.

3. Security – In the enterprise, security is a big deal. Hadoop was originally built without a security model, and the ecosystem is still evolving, with various security projects including Kerberos authentication, the Sentry offering from Cloudera, Project Rhino from Intel, Apache Knox as a reverse proxy (with contributions from Hortonworks) and Apache Accumulo for cell-level security. However, most of these are complex to set up, and there is still no reference standard across deployments.

4. Complexity – The complexity of Hadoop as a Big Data platform lies in its evolving ecosystem of newer technologies. Most data warehousing and analytics specialists are skilled in traditional relational databases and SQL, techniques which are difficult to apply on Hadoop due to the lack of mature tools (e.g. still-evolving SQL access) and the need for additional skills, including data mining and advanced statistical techniques.

5. Availability – Up until the 2.0 release, Hadoop's single master nodes in HDFS and MapReduce made it subject to a single point of failure.

6. Inefficiency – HDFS is inefficient at handling small files, making analysis on smaller datasets extremely inefficient. This is especially painful when designing models or finding patterns in smaller datasets. MapReduce is also a batch-oriented architecture not suitable for real-time access, though this is being addressed with tools like Storm. Tools like Impala provide interactive SQL-like querying on HDFS, which helps with quick ad hoc analysis on smaller datasets.

7. Processing framework – Not all data processing problems or analytic questions can be expressed within the MapReduce framework. Hadoop is therefore ill suited to problems which cannot be decomposed into Map and Reduce steps and need other data processing paradigms. Improvements are being developed to get around these issues, with Storm for real-time access and Spark improving data analytics performance through in-memory distributed computing.

In the next parts of this series, I will explore building a Hadoop data warehouse, big data analytics with tools like R, other Big Data solutions, Hadoop enhancements and alternatives to Hadoop.

Read the series on Big Data: Part-1 : Basics, Part-2 : Hadoop, Part-3 : Hadoop data warehouse and Part-4 : NoSQL