Using Notebooks and Spark on Bluemix


In this demonstration we use the IBM Apache Spark and IBM Cloudant Bluemix services to process and persist data from the Meetup RSVP stream. On the backend, the IBM Apache Spark service uses the Spark Kernel, which provides an interface that allows clients to interact with a Spark cluster. Clients can send libraries and snippets of code that are interpreted and run against a Spark context. Along with this blog we include sample notebooks that can be run on IBM Bluemix using the IBM Apache Spark and IBM Cloudant services available in the IBM Bluemix Catalog.

The intent of this article is to walk through a case study of how to integrate IBM Cloudant as a primary data store in a data pipeline fueled by Spark Streaming. We will show you how to use Spark Streaming by implementing a custom WebSocket receiver to process events from the Meetup RSVP stream. We will filter this stream in real time, serialize it, store it in a DataFrame, and persist it to IBM Cloudant. The example below uses Scala.

Prerequisites

  1. IBM Bluemix account – Sign up for IBM Bluemix
  2. IBM Spark service instance – Create an IBM Apache Spark Service. Once the Spark service is created, use the create notebook option to upload both Bluemix-Spark-Cloudant notebooks, which are ready to run.
  3. IBM Cloudant service instance – Create an IBM Cloudant NoSQL DB Service. Once the Cloudant service is created, create the meetup_group database that our example will use.
  4. IBM Cloudant service credentials and Data Connection
    1. In the IBM Spark Bluemix service, use the Data option on the left side to create a connection to the data source, linking the IBM Spark Bluemix service to the IBM Cloudant database service created previously.
    2. Once the connection has been created, open the Spark service notebook uploaded previously and use the Data Source option in the right palette to add the data source.
    3. After the data source is configured and linked to the notebook, use the Insert to code link, which adds metadata about the data source to your notebook. Keep track of the hostname, username, and password; they are needed for the configuration below (a sketch of the generated credentials follows this list).
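
The Insert to code link typically generates a small credentials snippet. As a rough sketch only (the exact shape and field names vary by service version, so use whatever is generated in your own notebook):

    // Illustrative only: substitute the values produced by Insert to code.
    val credentials = Map(
      "host"     -> "HOST",      // e.g. <account>.cloudant.com
      "username" -> "USERNAME",
      "password" -> "PASSWORD"
    )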

IBM’s Bluemix Cloud Platform


IBM’s Bluemix is a cloud platform as a service that supports several programming languages and services. It is based on Cloud Foundry open technology and runs on SoftLayer infrastructure.

IBM’s Cloudant as a service


Cloudant, an IBM product, is a non-relational, distributed database service based on the open source CouchDB and BigCouch projects.

IBM’s Spark as a service


Apache Spark is a fast, general-purpose cluster computing framework. Often described as a successor to Hadoop MapReduce, Spark makes use of in-memory primitives rather than a disk-based MapReduce process. This in-memory processing has been shown to dramatically improve performance for applications that repeatedly query the same data.

Plan of Attack

  1. Add Dependencies/Jars
  2. Initialize spark context with Cloudant configurations
  3. Read/Write from the IBM Cloudant Bluemix service
  4. Create the WebSocketReceiver for our Streaming Context
  5. Persist the Meetup stream to IBM Cloudant
  6. Verify reading from IBM Cloudant and filter on our dataframe

Note that in our implementation we have split the Plan of Attack across two Bluemix-Spark-Cloudant notebooks. The first notebook, 1-Streaming-Meetups-to-IBM-Cloudant-using-Spark, covers steps one through five, whereas the second notebook, 2-Reading-Meetups-from-IBM-Cloudant-using-Spark, covers steps one, two, three, and six. We treat the first notebook as a long-running streaming job. The second notebook serves to analyze and interact with the filtered and persisted stream via IBM Cloudant in real time.

The following code snippets are from the provided notebooks that run on top of the IBM Bluemix Spark service.

Add Jars

In order to run this demonstration notebook we are using the cloudant-spark and scalawebsocket libraries. These Scala dependencies/jars are added to our environment using the AddJar magic from the Spark Kernel, which adds the specified jars to the Spark Kernel and Spark cluster.
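
As a minimal sketch, the AddJar cells look roughly like the following; the cloudant-spark URL is the one referenced in the comments below, while the scalawebsocket coordinates are illustrative, so substitute the jar locations used in the provided notebooks:

    %AddJar https://dl.dropboxusercontent.com/u/19043899/cloudant-spark.jar
    %AddJar http://central.maven.org/maven2/eu/piotrbuda/scalawebsocket_2.10/0.1.1/scalawebsocket_2.10-0.1.1.jar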

Create the Spark context

Note you need to update the HOST, USERNAME, and PASSWORD for the configuration below.
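
A minimal configuration sketch, assuming the cloudant.host, cloudant.username, and cloudant.password property names used by the cloudant-spark connector:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Property names assumed from the cloudant-spark connector; fill in the
    // credentials captured from the data source. In a notebook you may need to
    // stop the pre-created context before creating a new one.
    val conf = new SparkConf()
      .setAppName("Meetup-RSVPs-to-Cloudant")
      .set("cloudant.host", "HOST")          // e.g. <account>.cloudant.com
      .set("cloudant.username", "USERNAME")
      .set("cloudant.password", "PASSWORD")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)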

Read/Write the meetup_group Cloudant database

Using the cloudant-spark library we can seamlessly interact with the meetup_group database in our IBM Cloudant Bluemix service through the abstraction of Spark DataFrames.
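
An illustrative sketch of the read and write helpers, assuming the com.cloudant.spark data source name and the load/save signatures of the cloudant-spark connector:

    import org.apache.spark.sql.DataFrame

    // Read the given Cloudant database into a DataFrame.
    def readFromDatabase(database: String): DataFrame =
      sqlContext.read.format("com.cloudant.spark").load(database)

    // Persist a DataFrame to the meetup_group Cloudant database.
    def writeToDatabase(df: DataFrame): Unit =
      df.write.format("com.cloudant.spark").save("meetup_group")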

Process and Persist the meetup rsvp stream into a Cloudant database

This function creates a streaming context that operates on a 10-second window. Because our custom WebSocketReceiver transforms the text content of the WebSocket messages to JSON and extracts a MeetupRsvp from each one, we can filter every MeetupRsvp RDD in our stream for records where group_state equals Texas. Lastly, we convert the filtered MeetupRsvp records to a DataFrame and use our writeToDatabase function to persist them to IBM Cloudant.
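
A rough sketch of that streaming function, assuming the WebSocketReceiver, MeetupRsvp case class, and writeToDatabase helper defined in the notebook; the stream URL and the group.group_state field are assumptions about the Meetup RSVP schema:

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def startMeetupStream(): Unit = {
      // 10-second batch window over the Meetup RSVP WebSocket stream.
      val ssc = new StreamingContext(sc, Seconds(10))
      val stream = ssc.receiverStream(new WebSocketReceiver("ws://stream.meetup.com/2/rsvps"))

      stream.foreachRDD { rdd =>
        // Keep only RSVPs whose group is located in Texas (field name assumed).
        val texasRsvps = rdd.filter(_.group.group_state == "Texas")
        if (!texasRsvps.isEmpty()) {
          writeToDatabase(sqlContext.createDataFrame(texasRsvps))
        }
      }

      // Treated as a long-running job; stop it with ssc.stop() when finished.
      ssc.start()
    }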

Verify reading from Cloudant and filter on our dataframe
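
An illustrative verification using the same assumed connector options: re-read the persisted documents from the meetup_group database and filter the resulting DataFrame (column names are assumptions about the stored RSVP documents):

    // Re-read the persisted RSVPs from Cloudant and inspect/filter them.
    val persistedDF = sqlContext.read.format("com.cloudant.spark").load("meetup_group")

    persistedDF.printSchema()
    persistedDF.filter(persistedDF("response") === "yes")
      .select("member.member_name", "group.group_name", "response")
      .show()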

Summary

Using IBM Spark as a service with notebooks and IBM Cloudant on IBM Bluemix, a developer can effortlessly integrate Cloudant as the datastore underneath their Spark DataFrames.

Notebooks to be run on IBM’s Apache Spark Bluemix Service

Bluemix-Spark-Cloudant Notebooks

References

IBM Apache Spark Bluemix Service

IBM Cloudant NoSQL DB Bluemix Service

Bluemix Catalog

Spark Streaming Custom Receivers

Spark SQL Cloudant External Datasource

Spark SQL and DataFrame Guide

Michael Poplavski
Michael is a Software Engineer in IBM Emerging Technologies.
Comments

  • Holger Kache

    Great example of the Cloudant Spark integration! Just one important note: the Cloudant-Spark connector libraries are pre-installed with the Spark Service on Bluemix and you can remove this AddJar magic command from your example:

    %AddJar https://dl.dropboxusercontent.com/u/19043899/cloudant-spark.jar

    There are 2 advantages to removing this external dependency:

    1) The Cloudant team can update the .jar with their continuous delivery model and notebooks will automatically have the latest version available
    2) One can implement notebooks for both runtimes, Scala and Python where the AddJar command is not available


  • Shailender Gupta

    Thanks for posting this sample.

    When I try to view the filtered data, I get following error:

    Name: org.apache.spark.SparkException
    Message: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 172, localhost): java.lang.RuntimeException: sendReceive doesn’t support chunked responses, try sendTo instead
    at scala.sys.package$.error(package.scala:27)
    at spray.client.pipelining$$anonfun$sendReceive$1$$anonfun$apply$1.apply(pipelining.scala:40)
    at spray.client.pipelining$$anonfun$sendReceive$1$$anonfun$apply$1.apply(pipelining.scala:38)
    at scala.util.Success$$anonfun$map$1.apply(Try.scala:206)
    at scala.util.Try$.apply(Try.scala:161)
    at scala.util.Success.map(Try.scala:206)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

    Driver stacktrace:
    StackTrace: org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
    scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    scala.Option.foreach(Option.scala:236)
    org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
    org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
    org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
