Using Apache Spark for Real-Time Bidding

In this post, we demonstrate different use cases for optimizing real-time bidding using Apache Spark.

MapReduce Overview

There are many applications where we need to analyze large volumes of data in order to generate statistics or train a machine learning model. Google introduced the MapReduce programming model in 2004, which lets one parallelize such analyses on a large cluster of commodity machines.

MapReduce is a high-level programming system that makes it easy to parallelize many real-world tasks. The programmer only needs to write code for two functions: map and reduce. A master controller divides the input data into chunks and assigns different processors to execute the map function on each chunk. Other processors, perhaps the same ones, are then assigned to perform the reduce function on pieces of the output from the map function.

The run-time system takes care of partitioning the input data, scheduling the program's execution across a set of machines, handling machine failures, and also managing the required inter-machine communication. The reader can find more technical details on MapReduce in the original Google paper: MapReduce: Simplified Data Processing on Large Clusters.
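
To make the model concrete, here is a minimal single-machine word-count sketch in Python (an illustrative example, not MapReduce framework code): the map and reduce functions are what the programmer would supply, while the grouping step stands in for the shuffle that the run-time system performs across machines.

from collections import defaultdict

def map_fn(line):
    # map: emit a (word, 1) pair for every word on the input line
    return [(word, 1) for word in line.split()]

def reduce_fn(word, counts):
    # reduce: sum the partial counts collected for the same word
    return (word, sum(counts))

def word_count(lines):
    # a real framework shuffles map output by key across machines;
    # here we simply group the pairs in a dictionary
    grouped = defaultdict(list)
    for line in lines:
        for word, count in map_fn(line):
            grouped[word].append(count)
    return [reduce_fn(word, counts) for word, counts in grouped.items()]

print(word_count(["to be or not to be"]))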

Apache Hadoop offers an open-source implementation of MapReduce. To learn how to install, configure, and run code using Hadoop MapReduce, see the MapReduce Tutorial.

Resilient Distributed Datasets (RDD)

In 2011, a team from the Berkeley AMPLab developed another distributed framework, called Spark, which exploits in-memory computation. Spark is built on RDDs, a distributed memory abstraction that lets programmers perform in-memory computations on large clusters.

Using RDDs can improve the performance for two types of applications: iterative algorithms and interactive data mining tools. In both cases, keeping data in memory can improve performance by an order of magnitude. The reader can learn more about RDDs here: Spark RDD Paper.
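
As a small illustration of why keeping data in memory helps iterative and interactive workloads, the following standalone PySpark sketch (a hypothetical example, not taken from our bidder code) caches an RDD once and then reuses it across several actions instead of recomputing it from the input each time:

from pyspark import SparkContext

sc = SparkContext(appName="RDDCachingDemo")

# build an RDD of numbers and keep its partitions in cluster memory
numbers = sc.parallelize(range(1000000)).cache()

# each of these actions reuses the cached partitions instead of recomputing them
total = numbers.reduce(lambda a, b: a + b)
maximum = numbers.max()
evens = numbers.filter(lambda n: n % 2 == 0).count()

print(total, maximum, evens)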

Apache Spark Overview

To learn more about Apache Spark, you can visit its website: Apache Spark. Below we demonstrate how one can use Apache Spark to analyze the impression/click/conversion logs collected from an RTB bidder. We assume that your bidder logs have been shipped to S3 for post-processing.

In the following example, we show how one can use Spark to parse bid requests from the logs and then compute per-app bid-request counts.

The first step in the Spark script is to create a Spark context and specify your app name:

from pyspark import SparkContext

sc = SparkContext(appName="MopubAppsInsights")

RDDs can be created from the bidding logs stored in an S3 bucket. Let's make a new RDD from our bid logs on Amazon S3:

# AWS_S3 holds the URL prefix of the S3 bucket containing the bidder logs
rdd = sc.textFile(AWS_S3 + '/rtb-server/no-bidding-09-26-2015-12-*.log.gz')

RDDs have actions, which return values, and transformations, which return pointers to new RDDs.
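
As a quick hypothetical illustration (using the sc context created above): transformations such as map are lazy and merely describe a new RDD, while actions such as count and collect actually trigger the computation.

squares = sc.parallelize([1, 2, 3, 4]).map(lambda x: x * x)  # transformation: nothing is computed yet
print(squares.count())    # action: runs the job and returns 4
print(squares.collect())  # action: returns the values [1, 4, 9, 16]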

The no-bidding logs are in text format, with every bid request received from the ad exchange stored on a separate line. In the snippet shown below, we first sample 1% of the lines from each bidding log file.

Next, we extract the body of each bid request. Finally, we use the filter transformation to return a new RDD with a subset of the items in the file. In effect, we grab the bid requests from our logs and filter out the bidder's bid responses.

import time  # used to seed the sampler

# sample 1% of the log lines, extract the request body, and keep only bid-request lines
bid_req_rdd = (rdd.sample(False, 0.01, int(time.time()))
                  .map(lambda log: log.split('\n')[0])
                  .filter(lambda line: 'BidRequest' in line))

The next step is to take the bid-request RDD and run two more map transformations on it, as shown below. The first transformation generates a new RDD containing only the name of the app from which each bid request originated. The second emits an (appname, 1) pair for each app seen in a bid request.

appname_rdd = bid_req_rdd.map(get_appname).map(lambda appname: (appname, 1))
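
The get_appname helper is defined in the rtb-spark repo. As a rough, hypothetical sketch (the actual log format may differ), such a helper could pull the app name out of an OpenRTB-style JSON payload on each bid-request line:

import json

def get_appname(line):
    # hypothetical parser: assumes the line carries an OpenRTB-style JSON payload
    # with an app.name field; falls back to 'unknown' on malformed records
    try:
        payload = json.loads(line[line.index('{'):])
        return payload.get('app', {}).get('name', 'unknown')
    except ValueError:
        return 'unknown'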

Next, we use the reduceByKey transformation to count the number of times we have received bid requests for each app. Note that reduceByKey is itself a transformation; the computation is only triggered once an action is run on the result, which happens with the saveAsTextFile call further below.

reducedAppsList = appname_rdd.reduceByKey(lambda a, b: a + b)
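
Before writing the results out to S3, one can also inspect the most frequent apps on the driver. A small optional sketch (not part of the original script) using the takeOrdered action on the RDD computed above:

# print the 20 apps with the most bid requests, highest count first
top_apps = reducedAppsList.takeOrdered(20, key=lambda kv: -kv[1])
for appname, count in top_apps:
    print(appname, count)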

Finally, we store the per-app counts in an S3 bucket for post-processing:

reducedAppsList.saveAsTextFile(AWS_S3 + "/spark-logging/apps.insights")

To view the full source code of our Spark script, check its GitHub repo: rtb-spark.

Amazon EMR

To run Apache Spark, we use Amazon EMR. Below we show how to create several different cluster configurations with Amazon EMR.

Config I

To provision a cluster with 3 nodes (i.e., one master node and two core nodes), all of type m3.xlarge, run the following command:

aws emr create-cluster --ami-version 3.2 --name SparkCluster --ec2-attributes KeyName=Your_Key --applications Name=Hive --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m3.xlarge --use-default-roles --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark

Config II

To provision a 3-node cluster (dedicated EC2 instances) with instance type m3.xlarge, run the following command:

aws emr create-cluster --name SparkCluster --ami-version 3.2 --instance-type m3.xlarge --instance-count 3 --ec2-attributes KeyName=Your_Key --applications Name=Hive --use-default-roles --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark

Config III

To provision a cluster with three instance group types (master, core, and task) using spot EC2 instances, run the following command:

aws emr create-cluster --name "SpotSparkCluster" --ami-version 3.7 --applications Name=Hive --use-default-roles --ec2-attributes KeyName=Your_Key --instance-groups InstanceGroupType=MASTER,InstanceType=m3.xlarge,InstanceCount=1,BidPrice=0.1 InstanceGroupType=CORE,BidPrice=0.04,InstanceType=m3.xlarge,InstanceCount=2 InstanceGroupType=TASK,BidPrice=0.05,InstanceType=m3.xlarge,InstanceCount=3 --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark

Config IV

To provision a cluster with two instance group types (master and core) using spot instances, run the following command:

aws emr create-cluster --name "SparkSpotCluster" --ami-version 3.7 --applications Name=Hive --use-default-roles --ec2-attributes KeyName=Your_Key --instance-groups InstanceGroupType=MASTER,InstanceType=m3.xlarge,InstanceCount=1,BidPrice=0.07 InstanceGroupType=CORE,BidPrice=0.05,InstanceType=m3.xlarge,InstanceCount=2 --bootstrap-actions Path=s3://support.elasticmapreduce/spark/install-spark

After you have provisioned your cluster, you can check its status and wait until the cluster is in the WAITING state using the following command:

aws emr describe-cluster --cluster-id <cluster-id>
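
If you prefer to poll from a script instead of re-running the command by hand, here is a hypothetical sketch using boto3 (an assumption on our part: boto3 is installed and your AWS credentials are configured) that waits until the cluster reaches the WAITING state:

import time
import boto3

emr = boto3.client('emr')

def wait_until_waiting(cluster_id, poll_seconds=30):
    # poll the cluster status until EMR reports it as WAITING (i.e. ready)
    while True:
        state = emr.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']['State']
        print('cluster state:', state)
        if state == 'WAITING':
            return
        if state in ('TERMINATED', 'TERMINATED_WITH_ERRORS'):
            raise RuntimeError('cluster terminated before becoming ready')
        time.sleep(poll_seconds)

wait_until_waiting('j-XXXXXXXXXXXXX')  # replace with your cluster id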

When your cluster is ready, you can use the AWS console to get the public DNS name of your master node. Once you have it, you can SSH to the master node from your machine:

ssh -i Your_Key hadoop@master-node-dns-name

After SSHing to the master node, you need to pull the rtb-spark code from its GitHub repo.

To run the Spark Python script, you can use the pyspark command as follows:

MASTER=yarn-client spark/bin/pyspark run.py

Config V

For a while, we could not follow the example given on the Amazon EMR page for creating a Spark cluster (we kept getting errors). We finally solved the issue by upgrading the AWS CLI to its latest version, following the instructions here: AWS CLI. With the updated CLI, one can create an EMR cluster using the following command:

aws emr create-cluster --release-label emr-4.0.0 --applications Name=Spark --ec2-attributes KeyName=Your_Key --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m3.xlarge

To run the Spark script, run the following command:

spark-submit run.py

Instance Groups

Here the reader can find details on the three types of instance groups that can be created as part of an EMR cluster: Instance Groups.

Clean up

After your Spark computation is done, you should terminate your EMR cluster:

aws emr terminate-clusters --cluster-ids <cluster-id>