Packaging your application using SBT

We showed how to run Spark in interactive mode. Now we will explain how to build applications that can be submitted using the spark-submit command.

First, we will explain how to structure a Scala project, using the SBT build tool. The typical project structure is

├── build.sbt
├── project
│   └── build.properties
└── src
    └── main
        └── scala
            └── example.scala

This layout is typical for JVM languages. More directories are added under the scala folder to mirror the package structure.
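For instance, a source file declaring a hypothetical package example.lab would live in a matching directory:

src
└── main
    └── scala
        └── example
            └── lab
                └── Example.scala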

The project's name, dependencies, and versioning are defined in the build.sbt file. An example build.sbt file is

name := "Example"
version := "0.1.0"
scalaVersion := "2.12.14"

This specifies the name of the project, its version, and the Scala version it is built with (2.12.14).

If you run sbt in this folder, it will generate the project directory and the build.properties file inside it. build.properties pins the SBT version that is used to build the project, for backwards compatibility.
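For example, with the SBT version used in the output further below, the generated project/build.properties consists of a single line:

sbt.version=1.5.5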

Open example.scala and add the following

package example


object Example {
  def main(args: Array[String]) {
    println("Hello world!")
  }
}

Start the sbt container in the root folder (the one where build.sbt is located). This drops you into SBT's interactive shell, from which we can compile the sources using the compile command.

docker run -it --rm -v "`pwd`":/root sbt sbt
copying runtime jar...
[info] welcome to sbt 1.5.5 (Oracle Corporation Java 11.0.12)
[info] loading project definition from /root/project
[info] loading settings for project root from build.sbt ...
[info] set current project to Example (in build file:/root/)
[info] sbt server started at local:///root/.sbt/1.0/server/27dc1aa3fdf4049b492d/sock
[info] started sbt server
sbt:Example>

We can now type compile.

sbt:Example> compile
[info] compiling 1 Scala source to /root/target/scala-2.12/classes ...
...
[info] Non-compiled module 'compiler-bridge_2.12' for Scala 2.12.14. Compiling...
[info]   Compilation completed in 10.128s.
[success] Total time: 14 s, completed Aug 26, 2021, 3:03:34 PM

We can try to run the application by typing run.

sbt:Example> run
[info] running example.Example
Hello world!
[success] Total time: 1 s, completed Aug 26, 2021, 3:05:29 PM

Now let's add a function to example.scala.

object Example {
  def addOne(tuple: (Char, Int)) : (Char, Int) = tuple match {
    case (chr, int) => (chr, int+1)
  }
  def main(args: Array[String]) {
    println("Hello world!")
    println(addOne('a', 1))
  }
}

In the SBT session, you can prefix any command with a tilde (~) to make it rerun automatically whenever the sources change.

sbt:Example> ~run
[info] Compiling 1 Scala source to /root/target/scala-2.12/classes ...
[info] running example.Example
Hello world!
(a,2)
[success] Total time: 0 s, completed Sep 7, 2020 10:40:56 AM
[info] 1. Monitoring source files for root/run...
[info]    Press <enter> to interrupt or '?' for more options.

We can also open an interactive Scala session (REPL) from SBT using the console command.

sbt:Example> console
[info] Starting scala interpreter...
Welcome to Scala 2.12.14 (OpenJDK 64-Bit Server VM, Java 11.0.12).
Type in expressions for evaluation. Or try :help.

scala> example.Example.addOne('a', 1)
res1: (Char, Int) = (a,2)

scala> println("Interactive environment")
Interactive environment

To build Spark applications with SBT, we need to add the required dependencies (most notably Spark itself) to the project. Modify your build.sbt file like so

name := "Example"
version := "0.1.0"
scalaVersion := "2.12.14"

val sparkVersion = "3.1.2"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion
)

If you still have the SBT shell open, you must run the reload command to pick up the changes to build.sbt.
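In the running shell this is simply:

sbt:Example> reload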

We can now use Spark in our code (after running compile).

Let's implement a Spark application. Modify example.scala as follows, but don't run the code yet!

package example

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import java.sql.Timestamp


object ExampleSpark {
  case class SensorData (
    sensorName: String,
    timestamp: Timestamp,
    numA: Double,
    numB: Double,
    numC: Long,
    numD: Double,
    numE: Long,
    numF: Double
  )
  def main(args: Array[String]) {
    val schema =
      StructType(
        Array(
          StructField("sensorname", StringType, nullable=false),
          StructField("timestamp", TimestampType, nullable=false),
          StructField("numA", DoubleType, nullable=false),
          StructField("numB", DoubleType, nullable=false),
          StructField("numC", LongType, nullable=false),
          StructField("numD", DoubleType, nullable=false),
          StructField("numE", LongType, nullable=false),
          StructField("numF", DoubleType, nullable=false)
        )
      )

    val spark = SparkSession
      .builder
      .appName("Example")
      .getOrCreate()
    val sc = spark.sparkContext // If you need SparkContext object

    import spark.implicits._

    val ds = spark.read
                  .schema(schema)
                  .option("timestampFormat", "M/d/yy:H:m")
                  .csv("./sensordata.csv")
                  .as[SensorData]

    val dsFilter = ds.filter(a => a.timestamp ==
        Timestamp.valueOf("2014-03-10 01:01:00"))

    dsFilter.collect.foreach(println)

    spark.stop
  }
}

We will not run this code from SBT, but submit it to a local Spark "cluster" (on your machine). To do so, we need a JAR. You can build a JAR using SBT's package command (or assembly, to also include all dependencies). The JAR will be located at target/scala-<scala version>/<project name>_<scala version>-<version>.jar.
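Note that the assembly command is not part of SBT itself; it comes from the sbt-assembly plugin. A minimal sketch of enabling it is to add a single line to project/plugins.sbt (the version number here is only an example, check for a recent release):

// project/plugins.sbt: enables the sbt-assembly plugin (version is an example)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")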

You can run the JAR via a spark-submit container (which runs Spark in local mode). By mounting the spark-events directory, the event log of the application run is stored so that it can be inspected later with the Spark history server.

docker run -it --rm -v "`pwd`":/io -v "`pwd`"/spark-events:/spark-events spark-submit target/scala-2.12/example_2.12-0.1.0.jar

The output should look as follows:

2020-09-07 11:07:28,890 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2020-09-07 11:07:29,068 INFO spark.SparkContext: Running Spark version 3.1.2
2020-09-07 11:07:29,087 INFO spark.SparkContext: Submitted application: Example

...

SensorData(COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94)
SensorData(NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78)
SensorData(THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89)
SensorData(BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67)
SensorData(CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44)
SensorData(LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78)
SensorData(CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57)
SensorData(ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66)
SensorData(MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75)
SensorData(BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28)

...

2020-09-07 11:07:33,694 INFO util.ShutdownHookManager: Shutdown hook called
2020-09-07 11:07:33,694 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-757daa7c-c317-428e-934f-aaa9e74bf808
2020-09-07 11:07:33,696 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a38554ba-18fc-46aa-aa1e-0972e24a4cb0

By default, Spark's logging is quite verbose. You can change the log level to WARN to reduce the output.
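One way to do this from inside your application is through the SparkContext; a minimal sketch, assuming the spark session from the example above:

// Reduce Spark's log output; WARN keeps only warnings and errors
spark.sparkContext.setLogLevel("WARN")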

For development purposes, you can also try running the application from SBT using the run command. Make sure to set the Spark master to local in your code. You might run into some trouble with threads here; this can be solved by running the application in a forked JVM, which you enable by setting fork in run := true in build.sbt (see the sketch after the snippet below). If desired, you can also change the log levels programmatically:

import org.apache.log4j.{Level, Logger}
...


def main(args: Array[String]) {
    ...
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    ...
}
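As a sketch, the forking setting and a local master could look like this; the local[*] master URL (use all local cores) is an assumption about how you want to run locally:

// build.sbt: run the application in a separate JVM when using the run command
fork in run := true

// example.scala: set a local master when running from SBT instead of spark-submit
val spark = SparkSession
  .builder
  .appName("Example")
  .master("local[*]") // run locally using all available cores
  .getOrCreate()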

You can also use this logger in your own application code, which might be helpful for debugging on the AWS cluster later on.
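For instance, you could create a logger for your own code alongside the Spark one (a minimal sketch; the logger name example.Example is just an example):

import org.apache.log4j.{Level, Logger}
...

def main(args: Array[String]) {
    ...
    // Logger for your own application code
    val log = Logger.getLogger("example.Example")
    log.setLevel(Level.INFO)
    log.info("Starting the sensor data job")
    log.warn("Messages at WARN level stay visible after lowering Spark's log level")
    ...
}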

You can inspect the event log from the application run using the Spark history server. Start a spark-history-server container from the project root folder and mount the spark-events folder in the container.

docker run -it --rm -v "`pwd`"/spark-events/:/spark-events -p 18080:18080 spark-history-server

The output will look as follows:

starting org.apache.spark.deploy.history.HistoryServer, logging to /spark/logs/spark--org.apache.spark.deploy.history.HistoryServer-1-5b5de5805769.out

...

2020-09-07 11:10:23,020 INFO history.FsHistoryProvider: Parsing file:/spark-events/local-1599477015931 for listing data...
2020-09-07 11:10:23,034 INFO history.FsHistoryProvider: Finished parsing file:/spark-events/local-1599477015931

Navigate to http://localhost:18080 to view detailed information about your jobs. When you are done, you can shut down the Spark history server with Ctrl+C.

^C
2020-09-07 11:13:21,619 ERROR history.HistoryServer: RECEIVED SIGNAL INT
2020-09-07 11:13:21,630 INFO server.AbstractConnector: Stopped Spark@70219bf{HTTP/1.1,[http/1.1]}{0.0.0.0:18080}
2020-09-07 11:13:21,633 INFO util.ShutdownHookManager: Shutdown hook called

Be sure to explore the history server thoroughly! You can use it to gain an understanding of how Spark executes your application, as well as to debug and time your code, which is important for both lab 1 and lab 2.