Saturday, 22 April 2017

An Introduction to Spark Streaming: A Beginner's Guide (Includes a working example in the Eclipse IDE)

Before diving into Spark Streaming, let us take a quick look at what streaming is all about. Streaming data refers to data that is generated continuously by multiple sources. These sources send records continuously and in small sizes (on the order of kilobytes).

Some common sources of streaming data are:-
  • Sensor data generated continuously by vehicles, industrial equipment, etc., and sent to streaming applications.
  • Social media, which generates a huge volume of data in real time that is used for real-time analysis.
  • Stock market changes, which are analyzed in real time so that computations can be done on that data.
  • Clickstreams from ads posted on different websites, on which clickstream analysis is done.
Though the individual records in a stream are small, they must be processed in real time, and that processing should be fast. There are many tools that deal with streaming data, and Spark Streaming is one of them. Spark Streaming provides fast, fault-tolerant, scalable processing of real-time data streams.

Data is ingested into Spark from various sources like Kafka, Flume, TCP sockets, file systems, Twitter, etc. The ingested data is processed using complex algorithms, and the results can be stored in file systems and databases or pushed to live dashboards.
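To make this source → transformation → sink pattern concrete, here is a minimal sketch (the word-count logic and the /tmp/wordcounts output path are illustrative assumptions, not part of the example we build below) of a DStream pipeline that reads lines from a socket, counts words per batch, and writes each batch's counts to the file system:
----------------------------------------------------------------------------------
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("WordCountSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Source: lines arriving on a TCP socket.
    val lines = ssc.socketTextStream("localhost", 9999)

    // Transformation: classic word count over each 1-second batch.
    val counts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    // Sink: write each batch's result under the given path prefix.
    counts.saveAsTextFiles("/tmp/wordcounts")

    ssc.start()
    ssc.awaitTermination()
  }
}
----------------------------------------------------------------------------------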

Let us start with a basic Spark Streaming example. In this example, we are going to use a TCP socket as our data source: a Java program generates the data and sends it to Spark, and we read the data from the socket and print it to the console. Before proceeding, make sure that you fulfill the minimum requirement for running a streaming application, i.e., a cluster!! This is where the problem starts: how are we going to practice Spark without a cluster??

There is a solution to this problem. :)  We are going to run the Spark Streaming application from our own Eclipse!! For those who face difficulty in integrating Spark with Eclipse, I have written a blog post that will help you out. Have a look:-



Now that we have Spark integrated with Eclipse, it is time to write our first basic Spark Streaming application. We just have to follow these steps:-

Step I - Write a program that sends data to the Spark Streaming application.
  •  Create a Java project; I gave it the name SocketStreaming.
  •  Create a package; 'server' is my package name.
  •  Create a class; 'SendDatatoSpark' is my class name.
  •  Write the following code in this class:
----------------------------------------------------------------------------------
package server;

import java.io.OutputStream;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class SendDatatoSpark {
    public static void main(String[] args) throws Exception {
        try {
            // Create a new server socket
            System.out.println("--->   Creating a new Socket with port number : 9999 and host : localhost");
            @SuppressWarnings("resource")
            ServerSocket ss = new ServerSocket(9999);
            // Wait for a client (the Spark application) to establish a connection
            Socket s = ss.accept();
            System.out.println("Connection established");
            OutputStream obj = s.getOutputStream();
            PrintStream ps = new PrintStream(obj);
            // Using Random to create random integers
            Random rnd = new Random();
            // Create the data in an infinite loop; it will be consumed by
            // Spark Streaming.
            // Generate one record per user ID with a random IP address
            int i = 1;
            while (true) {
                String ip = "192.168.2." + rnd.nextInt(255);
                ps.println("User_U0" + i + " -> " + ip);
                // Generate a record every 100 milliseconds.
                Thread.sleep(100);
                i++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
----------------------------------------------------------------------------------
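(Optional) Before involving Spark, you can sanity-check the generator with any TCP client. Here is a minimal Scala client sketch (the class name CheckSender is my own illustrative choice, not part of the project above) that connects to the same port and prints the first few records:
----------------------------------------------------------------------------------
import java.net.Socket
import scala.io.Source

object CheckSender {
  def main(args: Array[String]): Unit = {
    // Connect to the generator (it must already be running and waiting on port 9999).
    val socket = new Socket("localhost", 9999)
    // Print the first five records, then close the connection.
    Source.fromInputStream(socket.getInputStream).getLines().take(5).foreach(println)
    socket.close()
  }
}
----------------------------------------------------------------------------------
Note that the generator calls ss.accept() only once, so it serves a single connection; if you run this check, restart the Java program before starting the Spark application.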

Step II - Write a Spark Streaming application to receive the data and do further processing on it (in this case, we are just printing the streaming data to the console).

  • Create a Maven project with Scala. In my previous blog post, I have described the simple steps to create a Maven project with Scala. I have created the project with the following details:-
    group id - com.ssc (Package Name)
    artifact id - SparkSocketStreaming (Project Name)
    Class - ReceiveDataFromSocket
  • Open pom.xml and copy the following lines:-
----------------------------------------------------------------------------------

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.ssc</groupId>
  <artifactId>SparkSocketStreaming</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
  </dependencies>
</project>
----------------------------------------------------------------------------------
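A quick note on versions: the _2.10 suffix in the artifact IDs means these Spark 1.6.0 builds are compiled against Scala 2.10, so the Scala library configured for your project should be a 2.10.x release. A Scala version mismatch typically shows up only at runtime, as confusing NoSuchMethodError or AbstractMethodError failures.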
  • Write the following code in your Scala class.
----------------------------------------------------------------------------------
package com.ssc

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel._

object ReceiveDataFromSocket {

  def main(args: Array[String]): Unit = {
    // Creating an object of SparkConf.
    // "local[2]" runs Spark locally with two threads: one for the receiver
    // and at least one for processing the batches.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SocketStream")

    // Creating a StreamingContext from the SparkConf with a batch interval of 1 second.
    val ssc = new StreamingContext(conf, Seconds(1))

    // Defining the stream type. In this case we are using socketTextStream:
    // host "localhost", port 9999, storage level MEMORY_AND_DISK_SER_2.
    val lines = ssc.socketTextStream("localhost", 9999, MEMORY_AND_DISK_SER_2)

    // Operating on the DStream, which contains a series of RDDs, one per batch interval.
    // Collecting the contents of each RDD to the driver and printing them.
    lines.foreachRDD(x => x.collect().foreach(println))

    // ssc.start() starts the streaming computation.
    ssc.start()

    // ssc.awaitTermination() waits for the computation to terminate.
    ssc.awaitTermination()
  }
}
----------------------------------------------------------------------------------
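One detail worth calling out: the master is local[2], not local[1]. A socket receiver permanently occupies one thread, so at least one more thread is needed to process the batches; with local[1] the application would receive data but never get around to processing it. Also, printing raw lines is only a placeholder for real processing. As an illustrative extension (my own addition, not part of the example above), you could count the records in each 1-second batch by adding this line before ssc.start():
----------------------------------------------------------------------------------
    // Count how many records arrived in each batch and print the result.
    lines.count().print()
----------------------------------------------------------------------------------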

Step III - Time to execute whatever we have written.
  • Run the Java code that sends data to the Spark application first; it blocks on accept() until Spark connects.

  • Run the Spark code that receives the data from the socket and prints it to the console. You should see output like the sample below.
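If everything is wired up correctly, the Spark console prints the generated records, roughly ten per 1-second batch given the 100-millisecond send interval. The last IP octet is random, so your values will differ, but the output should look something like this:
----------------------------------------------------------------------------------
User_U01 -> 192.168.2.114
User_U02 -> 192.168.2.7
User_U03 -> 192.168.2.201
...
----------------------------------------------------------------------------------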



Kudos!! You have successfully executed your first Spark Streaming program. :)
Stay connected for future posts.

