Wednesday, 13 September 2017

A Brief Introduction to the Architecture of Cloudera Impala

In this era of technological advancements, time and velocity are the parameters which describe the effectiveness of any product. When we are talking about Big Data, we not only deal with the huge volume of data but also the velocity. Big data processing is done on Hadoop which provides batch and real time processing.

 Apache Hive is a tool which facilitates a SQL-like interface to query the data present in HDFS. Though hive fulfills the requirements of managing large datasets, it comes at a cost of low latency. Queries running on Hive triggers a MapReduce job which takes a lot to time(sometimes from minutes to hours). To overcome this drawback of Hive, Cloudera built a distributed system, also known as Cloudera Impala, which executes on the top of  Hive warehouse and produces quick results. 

Impala is a massively parallel processing database engine which is widely used to execute analytic queries on huge data sets. Impala is compatible with HiveQL syntax. One can use Impala or Hive to read, write and manage data.

Let us dive into Impala Architecture:

At a high level, Impala consists of the following:
  • The Impala Daemon
  • The Impala Statestore
  • The Impala Catalog Service
The Impala Daemon: Impala consists of a daemon process. This daemon process runs on each DataNode of a cluster. These processes are represented by the impalad process.

Impala daemon is responsible for the following:
  1. Read and write to data files.
  2. Accept the queries from the Impala-shell, JDBC or ODBC.
  3. Parallelize the queries and distribute work across the cluster.
  4. Transmit the intermediate query result back to the central coordinator node.
Note - The central coordinator node is always the daemon instance where the query is submitted. When other nodes complete the computation, results are sent back to the central coordinator node.
  • Impala daemons are in constant communication with the statestore to get the details about the condition of the nodes.
  • Impala daemons also communicate with the catalogd daemons to get the latest updates about the ongoing activities in the cluster like create, alter or drop statements. Also, the load and insert updates are communicated to the Impala daemons. This minimizes the need of a frequent refresh or invalidate metadata statements. This facility was not available prior to Impala 1.2.


The Impala Statestore: Impala statestore is represented by a daemon process named statestored. It checks the health of all the datanodes in a cluster. If an Impala daemon goes offline due to some hardware failures or other issues, the statestore informs all the other Impala daemons. This is useful as Impala daemons avoid making requests to the unavailable node. 


The Impala Catalog Service: It is represented by a daemon process named catalogd. It conveys the metadata changes from an Impala SQL statement to all the datanodes in a cluster. Such process is required only on one host in a cluster.

Facts about Catalog and Statestore daemons:
  • They don't have a special requirement or high availability as it doesn't result in the data lost when offline.
  • If Catalog daemon and Statestore daemons are unavailable:
    1. Stop Impala Service.
    2. Delete Impala Statestore and Impala Catalog server roles.
    3. Add a role on a different host.
    4. Restart the impala services.


This is all about the impala architecture. In the next blog, I would be sharing a quick start guide to Impala that will contain all the basic functions and commands that are frequently used in Impala.

Stay tuned for future posts. :)





Saturday, 22 April 2017

An Introduction to Spark Streaming : Beginner's Guide (Included a working example on Eclipse IDE)

Before diving into Spark Streaming, let us have a quick overview of what streaming is all about? Streaming data is referred to the continuously generating data from multiple sources. These sources send the records continuously in small sizes(~ thousands of KBs).

Some of the streaming data sources are:-
  • Sensors data generating continuously from vehicles, industrial equipment, etc. are sent to the streaming applications.
  • Social media generates a huge volume of data in real time which is used in a real-time analysis.
  • Changes in the stock market in real-time is analyzed and computations are done on those data.
  • Clickstream analysis is done on the ads posted on different websites. 
Though the volume of the streaming data is low, but the real-time processing of the data should be fast. There are many tools that deal with the streaming data, Spark Streaming is one of them. Spark streaming provides a fast, fault tolerant, scalable stream processing of the real-time data streams.

Data is ingested to spark from various sources like Kafka, Flume, TCP Sockets, file systems, twitter, etc. Data ingested to spark streaming is processed using complex algorithms and the results can be stored to file systems, databases, and live dashboards.

Let us start with a basic Spark Streaming example. In this example, we are going to use TCP socket as our data source, a java program is used to generate the data and send it to spark. We are reading the data from the socket and printing it in the console. Before proceeding, make sure that you fulfill the minimum requirement for running a streaming application, i.e., a cluster !! This is where the problem starts, how we are going to practice spark without cluster??  

There is a solution to this problem. :)  We are going to run spark streaming application on our own eclipse!! For those who face difficulty in integrating spark with eclipse, I have written a blog which will help you out.  Have a look:-



Now that we have spark integrated with eclipse, time to write our first basic spark streaming application. We have to just follow these steps:- 

Step I-  Write a code that sends data to Spark Streaming application.
  •  Create a java project, I gave it the name SocketStreaming.
  •  Create a package, 'server' is my package name.
  •  Create a class, 'SendDatatoSpark' is my class name.
  •  Write the following code to this class:
----------------------------------------------------------------------------------
package server;

import java.io.OutputStream;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class SendDatatoSpark {
public static void main(String[] args) throws Exception {
try {
// Create a new socket
System.out.println("--->   Creating a new Socket with port number : 9999 and host : localhost");
@SuppressWarnings("resource")
ServerSocket ss = new ServerSocket(9999);
// Waiting to establish a new connection
Socket s = ss.accept();
System.out.println("Connection established");
OutputStream obj = s.getOutputStream();
PrintStream ps = new PrintStream(obj);
// Using Random to create random integer
Random rnd = new Random();
// Creating the data in an infinite loop which will be consumed by
// Spark Streaming.
// Generate the data for userID with IP Adress
int i = 1;
while (true) {
String ip = "192.168.2." + rnd.nextInt(255);
ps.println("User_U0" + i + " -> " + ip);
// Generating the data after every 100 milliseconds.
Thread.sleep(100);
i++;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
----------------------------------------------------------------------------------

Step II - Write a Spark Streaming application to receive the data and do the further processing on that data (in this case, we are justing printing the streaming data to the console). 

  • Create a Maven project with Scala. In my previous blog, I have described the simple steps to create a maven project with Scala. I have created the project with following details:-
    group id - com.ssc (Package Name)
    artifact id - SparkSocketStreaming (Project Name)
    Class - ReceiveDataFromSocket
  • Open pom.xml and copy the following lines:-
----------------------------------------------------------------------------------

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.ssc</groupId>
<artifactId>SparkSocketStreaming</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
</dependency>
</dependencies>
</project>
----------------------------------------------------------------------------------
  • Write the following line of code to your scala class. 
----------------------------------------------------------------------------------
package com.ssc
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel._

object ReceiveDataFromSocket {

  def main(args: Array[String]): Unit = {
    //Creating an object of SparkConf
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketStream")
    
    //Creating an object of streaming context from SparkConf with a batch interval of 1 Second.

    val ssc = new StreamingContext(conf, Seconds(1))
    
    //Defining the stream type. In this case we are using SocketTextStream.
    //Host type : "localhost" and port : 9999
    //Storage level is MEMORY_AND_DISK_SER_2
    val lines = ssc.socketTextStream("localhost", 9999, MEMORY_AND_DISK_SER_2)
    
    //operating on Dstream which contains a series of RDD for a certain interval.
    //collecting the results of each RDD and printing it.
    lines.foreachRDD(x => x.collect().foreach(println))
    
    // to start a streaming application, ssc.start() is the first step. It starts the computation
    ssc.start()

    //ssc.awaitTermination waits for the computation to terminate.
    ssc.awaitTermination()
  }
}
----------------------------------------------------------------------------------

Step III - Time to execute whatever we have written.
  • Run the Java code that sends data to the spark application.

  • Run the spark code that receives the data from socket and prints it to the console.



Kudos!! You have executed your first spark streaming program successfully. :) 
Stay connected for future posts.

Friday, 14 April 2017

Running Apache Spark on Eclipse IDE : Beginners Guide

Nowadays, whenever we hear about Big Data, there is always a mention about Apache Spark. Being a keen learner there are plenty of questions that come to our mind. Some of the most frequent questions are jotted down:-

  •  Why is Spark trending nowadays when we have Hadoop?
  •  Why is Hadoop being replaced by Spark?
  •  What should I do to learn Spark?
  •  Where to start?
We can find many answers once we google the right thing.  Some of the suggestions are:-
  • Start learning at least one of the three languages: Scala, Python or Java.
  • Increase your awareness to the concept of distributed computing.
  • Start reading Apache Spark Documentation.
Once we are done with this, there is only one thing left to do which is by far the most important thing, i.e., how to gain hands-on experience of developing a spark application? Because thinking about spark, we start thinking about huge clusters. This demotivates our aspiration to master the most trending technology in Big Data.

I here come up with a solution which will help you to run spark on local, i.e., on your own Eclipse-IDE. We can develop Spark application in any of three languages: Scala, Python, and Java. As Scala is the first choice with Spark in the production environment and so is my experience with Spark-Scala, I am going to share steps to set up Spark-Scala application on Eclipse.

STEPS :-


1. Download Eclipse: - You can download Eclipse for Java or Scala. If your PC is a 32-bit system then download eclipse for 32 bit and if it is a 64-bit system then download it for 64 bit.

Link  - http://www.eclipse.org/downloads/

2. Go to Eclipse folder and open eclipse application.

3. Set WorkSpace.

4. If you have downloaded a Scala IDE, then it is fine. If you have downloaded a Java IDE install scala from the marketplace.





5. Create a Maven Project with group id(package) - com.demo and artifact id(project) - MyFirstProject


6. Go to ,m2 folder, it is by default created to your user. For me it is : C:\Users\Mohit\.m2 . Create a settings.xml file in this folder and write the following XML =>

-----------------------------------------------------------------------------------
<settings xmlns="http://maven.apache.org/SETTINGS/1.0.0"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/SETTINGS/1.0.0
                          https://maven.apache.org/xsd/settings-1.0.0.xsd">
  <proxies>
    <proxy>
      <id>p1</id>
      <active>true</active>
      <protocol>http</protocol>
      <host>your_host</host>
      <port>your_port</port>
      <nonProxyHosts>10.*</nonProxyHosts>
    </proxy>
  </proxies>

</settings>
-----------------------------------------------------------------------------------

7.  Open pom.xml of your project.

Replace the content with this:-

----------------------------------------------------------------------------------
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.demo</groupId>
  <artifactId>MyFirstProject</artifactId>
  <version>0.0.1-SNAPSHOT</version>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>

<dependencies>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.6.0</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.10</artifactId>
<version>1.6.0</version>
</dependency>

</dependencies>

</project>
----------------------------------------------------------------------------------

8. If you are using Java Eclipse IDE, then add scala nature.
Right click on project ->  Configure -> Add Scala Nature



9. Change scala version to 2.10.
Right click on project -> Properties ->  Scala Compiler -> Tick on use project settings -> Select Latest 2.10 bundle(dynamic) -> Click OK -> Click OK on Compiler settings changed pop up.



10. Right Click on Project -> Run As Maven install

Once maven is successfully installed, BUILD SUCCESS is printed in the console.

11. Create a HADOOP_HOME folder in your Workspace. Create bin folder inside HADOOP_HOME and copy the following files there.

For 32 bit system -> https://drive.google.com/open?id=0B0w0JKH3mMG-ZmQ1dlFaNC1RM2s
For64 bit system ->  https://drive.google.com/open?id=0B0w0JKH3mMG-cDBkUk5vSk03Vnc

Unzip the file -> Copy the file content to the path =>  C:\Users\Mohit\workspace\HADOOP_HOME\bin folder(your workspace path in your case)

12. In Eclipse add HADOOP_HOME path in the following location =>

Window -> preferences -> java -> build path ->classpath variable -> new -> name: hadoop.home.dir -> path : C:\Users\Mohit\workspace\HADOOP_HOME



13. Add VM Arguments in Run Configurations as =>
 Run -> Run configurations -> Scala application -> Arguments -> VM arguments -> -Xms256m -Xmx1024m




Congratulations!!! You have successfully set up spark on your ECLIPSE IDE. Let us run a small spark program to check whether we are able to run Spark.


Sample Spark Program =>





----------------------------------------------------------------------------------------
Code =>
package com.demo.Processor

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object Test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyFirstSparkProject").setMaster("local")
    val sc = new SparkContext(conf)
    val myArray = Array(1,2,3,4,5)
    val myRDD = sc.parallelize(myArray)
    myRDD.collect.foreach(println)
  }
}
----------------------------------------------------------------------------------------

Right Click on Test.scala -> Run AS -> Scala Application




                                           It's working!!! Have fun with Spark :)

Thank you for patiently reading my blog. Hope you have successfully configured Spark on your Eclipse IDE.

                                           Stay connected for future posts!!


Sunday, 9 April 2017

Performance Tuning in Spark SQL

Thinking about Apache Spark, things that come on everyone's mind is:-
  • It's going to be a lightning fast in-memory computing.
  • It's 100 times faster than MapReduce.

But sometimes, we find that the spark application is not performing to the expected level. What would be the possible reasons for it? The solution to it is very simple:

                       "You might have not tune your Spark Application properly."

Based on my experience in tuning the Spark Application for a live project, I would like to share some of the findings that helped me improve the performance of my Spark SQL job. Following are the configurations that must be taken into consideration while tuning your Spark SQL job:-


Auto broadcast Join: Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join.  This can be achieved by adding the configuration in SqlContext as:

Configuration : 
        sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10485760")

Default value is  “10485760  ”(10MB)
Here the value “ 10485760 ” is in bytes which could be set accordingly. This specifies the maximum size of the table to be broadcasted. All the tables under this size will be broadcasted.

As per observation, it is seen that by setting this parameter all the tables created from the parquet files are broadcasted but the in-memory table(intermediate tables) are not broadcasted. To overcome this issue, there are two options:

1.      Save those tables in hive meta store by using the function : df.write().mode(mode).saveAsTable(tableName).  

After saving the data frame as a table when accessed, the table will be automatically broadcasted.

2.     Save the data frame as a parquet file and then read that parquet. After reading the parquet, register it as a temporary table.  When the join operation will be performed in that table, it will automatically get broadcasted.

The second option is better because there is no need to connect to the hive meta store.

 Note : - If autoBroadcastJoin is enabled,  the cache should not be used, as it changes the BroadcastHash join to SortMerged join.


Shuffle partitions: Configures the number of partitions to use when shuffling data for joins or aggregations.

  Configuration : 
           sqlContext.setConf("spark.sql.shuffle.partitions", "200")

 The default value for partition is 200. The number of partitions should vary with the size of the dataset. If the dataset is small, then no. of partitions should be kept minimum and when the dataset is huge, the number of partitions should be increased accordingly. This can be managed by passing arguments to the application.

Shuffle partitions should be configured as per the requirement. One value of shuffle
partition should not be configured for the whole application, this should be explicitly managed. For small size data, the value of shuffle partition should be small and for the large partition, the value should be large. The Too large value of shuffle partition is also not required.


Data Serialization: Serialization plays an important role in the performance of any distributed application. Formats that are slow to serialize objects into, or consume a large number of bytes, will greatly slow down the computation. Often, this will be the first thing you should tune to optimize a Spark application. 

         By default, Spark serializes objects using Java’s ObjectOutputStream framework, and can work with any class you create that implements java.io.Serializable. Java serialization is flexible but often quite slow.

         Kryo is significantly faster and more compact than Java serialization (often as much as 10x).

Configuration :   sqlContext.setConf("spark.serializer","org.apache.spark.serializer.KryoSerializer")

These are the few configurations that can be used to tune the performance of apache-spark application. In my next post, I would be sharing another set of configurations.

Thank you for reading my post. Hope it helps you to tune your spark application.
Stay connected for future posts. 


A Brief Introduction to the Architecture of Cloudera Impala

In this era of technological advancements, time and velocity are the parameters which describe the effectiveness of any product. When we a...