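This guide combines an overview of Sparkling with a quick tutorial that helps you get started with it. It should take about 20 minutes to read and study the provided code examples. This guide covers: setting up the companion project, initializing Sparkling (SparkConf and SparkContext), creating RDDs from Clojure collections and external datasets, RDD operations (transformations, actions, passing functions, working with key-value pairs, and destructuring), and RDD persistence.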
There is a companion project to this getting started guide. Use it as a starting point to explore Sparkling: it contains a ready-made project.clj for Leiningen. There’s no need to install Apache Spark, or even to run a cluster of any kind. Just get going on your laptop.
Clone that repo by executing the following in your shell:
git clone https://github.com/gorillalabs/sparkling-getting-started.git
cd sparkling-getting-started
Start up your REPL (in your favourite tool). After downloading the required dependencies, you should see something like this:
$ lein do clean, repl
Compiling tf-idf.core
nREPL server started ...
REPL-y 0.3.1
Clojure 1.6.0
    Docs: (doc function-name-here)
          (find-doc "part-of-name-here")
  Source: (source function-name-here)
 Javadoc: (javadoc java-object-or-class-here)
    Exit: Control+D or (exit) or (quit)
 Results: Stored in vars *1, *2, *3, an exception in *e

sparkling.example.tfidf=>
Require the sparkling namespaces you will need for this guide.
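A minimal sketch of those requires. The aliases conf and spark are a choice made here to match the spark/... prefix used throughout this guide (for example spark/collect and spark/take).

```clojure
(require '[sparkling.conf :as conf]   ; helpers for building a SparkConf
         '[sparkling.core :as spark]) ; SparkContext, RDD creation, transformations, actions
```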
The first step is to create a Spark configuration object, SparkConf, which contains information about your application. This is used to construct a SparkContext object which tells Spark how to access a cluster.
Here we create a SparkConf object with the master set to the string local, to run in local mode:
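A sketch of this configuration step. The application name "sparkling-example" is an arbitrary placeholder (Spark will not start a context without an app name), and the var names c and sc are choices made here; the rest of this guide refers to the context as sc.

```clojure
(require '[sparkling.conf :as conf]
         '[sparkling.core :as spark])

;; A SparkConf that runs Spark inside this JVM ("local" master).
;; "sparkling-example" is a placeholder application name.
(def c (-> (conf/spark-conf)
           (conf/master "local")
           (conf/app-name "sparkling-example")))

;; The SparkContext tells Spark how to access the (local) cluster.
(def sc (spark/spark-context c))
```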
The local master gives you a standalone system: you do not need to install any software beyond your Clojure environment (JDK, Leiningen, etc.). This makes it great for in-REPL development and unit testing.
The main abstraction Spark provides is a resilient distributed dataset (RDD): a fault-tolerant collection of elements, partitioned across the nodes of the cluster, that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program (with parallelize or parallelize-pairs), or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, any data source offering a Hadoop InputFormat, or even JDBC.
RDDs basically come in two flavors. Plain RDDs simply hold a collection of arbitrary objects. PairRDDs hold a key-value collection, but unlike a map, a PairRDD can contain the same key multiple times. Internally, PairRDDs are constructed as collections of Scala Tuple2 objects, but Sparkling provides clojuresque ways to access these.
Plain RDDs in Sparkling are created by calling the parallelize function on your Clojure data structure. You can then check out the contents of your newly created RDD with the collect action:
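A sketch of both steps, assuming a SparkContext bound to sc as set up earlier in this guide; the var name data is illustrative.

```clojure
(require '[sparkling.core :as spark])

;; Distribute a local Clojure vector as a plain RDD.
(def data (spark/parallelize sc [1 2 3 4 5]))

;; collect is an action: it returns every element to the driver.
(spark/collect data)
```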
PairRDDs in Sparkling are created by calling the parallelize-pairs function on your Clojure data structure:
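A sketch, assuming sc from the setup step. spark/tuple is assumed here to be Sparkling's helper for building a scala.Tuple2; the Java interop form (scala.Tuple2. k v) builds the same object. The repeated key "a" illustrates that, unlike a map, a PairRDD may hold a key more than once.

```clojure
(require '[sparkling.core :as spark])

;; Each element of a PairRDD is a scala.Tuple2 of key and value.
(def pairs
  (spark/parallelize-pairs sc [(spark/tuple "a" 1)
                               (spark/tuple "b" 2)
                               (spark/tuple "a" 3)]))   ; duplicate key is allowed
```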
Once initialized, the distributed dataset or RDD can be operated on in parallel.
An important parameter for parallel collections is the number of slices to cut the dataset into. Spark runs one task for each slice. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually in sparkling by passing it as a third parameter to parallelize:
Spark can create RDDs from any storage source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
Text file RDDs can be created in sparkling using the text-file function from the sparkling.core namespace. This function takes a URI for the file (either a local path on the machine, or an s3n://... URI, etc.) and reads it as a collection of lines. Note that text-file supports S3 and HDFS globs.
The following example refers to the README.md file in the current directory. Make sure one exists.
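A sketch, assuming sc from the setup step; the var name readme-lines is illustrative. Creating the RDD does not read the file yet. Only the count action does.

```clojure
(require '[sparkling.core :as spark])

;; An RDD with one element per line of README.md (nothing is read yet).
(def readme-lines (spark/text-file sc "README.md"))

;; count is an action, so this is where the file is actually read.
(spark/count readme-lines)
```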
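RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.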
To illustrate RDD basics in sparkling, consider the following simple application, which uses a sample file called data.txt:
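A sketch of such an application, assuming sc from the setup step and a data.txt file in the current directory. The first binding is the base RDD, the second the lazily mapped line lengths, and reduce is the action. The var name total-length is illustrative.

```clojure
(require '[sparkling.core :as spark])

(def total-length
  (let [lines        (spark/text-file sc "data.txt")  ; base RDD: only a pointer to the file
        line-lengths (spark/map count lines)]          ; transformation: lazy, nothing computed yet
    (spark/reduce + line-lengths)))                    ; action: sums the lengths, returns one number
```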
The first line defines a base RDD from an external file. The dataset is not loaded into memory; it is merely a pointer to the file. The second line defines an RDD of the lengths of the lines as a result of the map transformation. Note that the lengths are not computed immediately, due to laziness. Finally, we run reduce on the transformed RDD, which is an action, returning only a value to the driver program.
If we also wanted to reuse the resulting RDD of line lengths in later steps, we could insert a call to spark/cache before the reduce action, which would cause the line-lengths RDD to be saved to memory after the first time it is realized. See RDD Persistence for more on persisting and caching RDDs in sparkling.
Spark’s API relies heavily on passing functions in the driver program to run on the cluster. Sparkling makes it easy and natural to define serializable Spark functions/operations and provides two ways to do this. However, for your functions to be available on the cluster, the namespaces containing them need to be AOT-compiled. That’s usually no problem, because you should build an uberjar of your project to deploy it to the cluster anyway.
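A sketch of the Leiningen side of AOT compilation. The project name my-spark-app and the namespace my-spark-app.core are hypothetical placeholders, and the dependencies are left out on purpose; copy them from the companion project's project.clj. :aot and :main are standard Leiningen keys.

```clojure
;; project.clj sketch: names are hypothetical placeholders.
(defproject my-spark-app "0.1.0-SNAPSHOT"
  ;; :dependencies omitted here; take them from the companion project's project.clj
  :aot [my-spark-app.core]   ; AOT-compile the namespace that defines your Spark functions
  :main my-spark-app.core)   ; uberjar entry point; the ns needs (:gen-class)
```

Running lein uberjar then produces a jar that contains the compiled function classes, which is what the cluster needs to load them.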
When we evaluate this map transformation on the initial RDD, the result is another RDD. The result of this transformation can be seen using the spark/collect action, which returns all of the elements of the RDD. The following example only works in an AOT-compiled environment, so it will not work in your REPL:
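A sketch of such an example, assuming sc from the setup step. As noted, it has to live in an AOT-compiled namespace, because the inline (fn [x] (* x x)) must be loadable on the executors. The var name squares is illustrative.

```clojure
(require '[sparkling.core :as spark])

;; Requires an AOT-compiled namespace: the inline fn is shipped to the executors.
(def squares
  (->> (spark/parallelize sc [1 2 3 4 5])
       (spark/map (fn [x] (* x x)))   ; transformation: returns a new RDD
       spark/collect))                ; action: brings the results to the driver
```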
We can also use spark/take to return just a subset of the data.
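A sketch, assuming sc from the setup step. It also assumes that spark/take follows the same convention as spark/map and spark/reduce, with the RDD as the last argument, so that it threads with ->>.

```clojure
(require '[sparkling.core :as spark])

;; take is an action that returns only the first n elements to the driver.
(def first-three
  (->> (spark/parallelize sc [1 2 3 4 5])
       (spark/take 3)))
```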
Some transformations in Spark operate on Key-Value-Tuples, e.g. joins, reduce-by-key, etc. In sparkling, these operations are available on PairRDDs.
You do not need to deal with the internal data structures of Apache Spark (like scala.Tuple2) if you use the functions from the sparkling.destructuring namespace.
So, first require that namespace:
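The alias s-de matches the prefix this guide uses later (s-de/key-value-fn, s-de/fn):

```clojure
(require '[sparkling.destructuring :as s-de]) ; helpers for unwrapping Tuple2, Optional, iterables
```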
We deal with strings, so require clojure.string also:
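The alias string is a choice made here; any alias works as long as the later code uses it consistently.

```clojure
(require '[clojure.string :as string]) ; used below to split lines into words
```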
The following code uses the reduce-by-key operation on key-value pairs to count how many times each word occurs in a file:
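A sketch of this word count, assuming sc from the setup step and the spark/tuple helper for building a scala.Tuple2. As with the map example, the inline fns need an AOT-compiled namespace. The var name word-counts is illustrative.

```clojure
(require '[sparkling.core :as spark]
         '[clojure.string :as string])

(def word-counts
  (->> (spark/text-file sc "data.txt")
       (spark/flat-map (fn [line] (string/split line #" ")))  ; one element per word
       (spark/map-to-pair (fn [word] (spark/tuple word 1)))   ; PairRDD of (word, 1)
       (spark/reduce-by-key +)))                               ; add up the 1s for each word
```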
After the reduce-by-key operation, we can sort the pairs alphabetically using spark/sort-by-key. To collect the word counts as an array of objects in the REPL, or to write them to a filesystem, we can use the spark/collect action:
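A sketch of the complete pipeline under the same assumptions as the previous word count (sc, spark/tuple, AOT-compiled namespace). s-de/key-value-fn turns each sorted scala.Tuple2 into a plain [word count] vector before collecting.

```clojure
(require '[sparkling.core :as spark]
         '[sparkling.destructuring :as s-de]
         '[clojure.string :as string])

(def sorted-word-counts
  (->> (spark/text-file sc "data.txt")
       (spark/flat-map (fn [line] (string/split line #" ")))
       (spark/map-to-pair (fn [word] (spark/tuple word 1)))
       (spark/reduce-by-key +)
       spark/sort-by-key                                  ; alphabetical by word
       (spark/map (s-de/key-value-fn (fn [k v] [k v])))   ; unwrap each Tuple2 into a vector
       spark/collect))                                    ; bring the results to the driver
```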
This section describes a syntactic convenience. It can safely be skipped.
In the example above, spark/sort-by-key produces a PairRDD whose elements are scala.Tuple2 instances. We then need to unwrap these to get the raw values inside. This is what s-de/key-value-fn does; it is one of several such wrappers provided.
In addition to tuples, there are two other “wrapper” classes which can be produced by RDD operations:
scala.collection.convert.Wrappers$IterableWrapper for collections, for example in the result of group-by-key
com.google.common.base.Optional for potentially absent values, for example in the result of left-outer-join
For this reason, the sparkling.destructuring/fn macro implements a destructuring binding form specialized for these data types.
In the last example above, you could replace
(s-de/key-value-fn (fn [k v] [k v]))
with
(s-de/fn [(k v)] [k v])
…which is not much different. However, s-de/fn supports arbitrary nesting of tuples, delegates to clojure.core’s destructuring of maps and vectors, and treats symbols specially when they begin with - (unwrap an iterable) or ? (unwrap an Optional). Some examples will clarify:
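A small sketch of tuple nesting that needs no cluster: s-de/fn produces an ordinary Clojure fn, so it can be tried on hand-built tuples directly in the REPL. The (k (a b)) binding is assumed to match a Tuple2 whose value is itself a Tuple2, following the nesting rule described above; the name flatten-nested is illustrative.

```clojure
(require '[sparkling.core :as spark]
         '[sparkling.destructuring :as s-de])

;; Destructure (key, (a, b)) into a flat vector.
(def flatten-nested
  (s-de/fn [(k (a b))] [k a b]))

;; Called locally on a nested Tuple2, no RDD involved.
(flatten-nested (spark/tuple "x" (spark/tuple 1 2)))
```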
Notice that the left-outer-join results in a PairRDD where each member is a Tuple2 containing a name and a nested Tuple2, which in turn contains a collection of events and an optional state. The s-de/fn macro can make the code to process this RDD cleaner:
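A self-contained sketch of this pattern with made-up sample data, assuming sc from the setup step. It assumes that spark/left-outer-join takes the left RDD first and the right RDD second, and that an absent Optional unwraps to nil. The inline fn again needs an AOT-compiled namespace.

```clojure
(require '[sparkling.core :as spark]
         '[sparkling.destructuring :as s-de])

;; Hypothetical sample data: events per person, and a state for some people.
(def events (spark/parallelize-pairs sc [(spark/tuple "alice" "login")
                                         (spark/tuple "alice" "logout")
                                         (spark/tuple "bob" "login")]))
(def states (spark/parallelize-pairs sc [(spark/tuple "alice" "NY")]))

;; Each joined element is (person, (iterable of events, Optional state)).
;; -events binds the unwrapped iterable to events; ?state binds the unwrapped Optional to state.
(def summary
  (->> (spark/left-outer-join (spark/group-by-key events) states)
       (spark/map (s-de/fn [(person (-events ?state))]
                    {:person person :event-count (count events) :state state}))
       spark/collect))
```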
The binding form [(-events ?rep)] means: “Expect a single Tuple2 argument, containing an iterable and an optional.” The symbols events and rep are bound inside the fn, with their values already “unwrapped”. Arbitrarily nested tuples are supported. You can mix this with Clojure’s built-in destructuring forms (maps and vectors).
Sparkling's test suite provides many more examples.
Sparkling supports the following RDD transformations:
map: returns a new RDD formed by passing each element of the source through a function.
map-to-pair: returns a new JavaPairRDD of (K, V) pairs by applying a function to all elements of an RDD.
reduce-by-key: when called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs where the values for each key are aggregated using a reduce function.
flat-map: similar to map, but each input item can be mapped to 0 or more output items (so the function should return a collection rather than a single item).
filter: returns a new RDD containing only the elements of the source RDD that satisfy a predicate function.
join: when called on an RDD of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
left-outer-join: performs a left outer join of a pair of RDDs. For each element (K, V) in the first RDD, the resulting RDD will either contain all pairs (K, (V, W)) for W in the second RDD, or the pair (K, (V, absent)) if no elements in the second RDD have key K. The W side is wrapped in an Optional, which is absent in the latter case.
cogroup: groups data from different RDDs based upon their key. Can be called with two or three RDDs, and results in an RDD of (K, (Iterable[V], Iterable[W])) or (K, (Iterable[V], Iterable[W], Iterable[X])).
sample: returns a ‘fraction’ sample of an RDD, with or without replacement, using a random number generator ‘seed’.
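combine-by-key: combines the elements for each key using a custom set of aggregation functions. Turns an RDD of (K, V) pairs into a result of type (K, C), for a ‘combined type’ C. Note that V and C can be different; for example, one might group an RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users must provide three functions (createCombiner, mergeValue and mergeCombiners in Spark’s API): one that turns a V into a C (e.g., creates a one-element list), one that merges a V into a C (e.g., adds it to the end of a list), and one that combines two Cs into a single one.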
sort-by-key: when called on an RDD of (K, V) pairs where K is ordered (e.g., implements Comparable), returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified by the optional boolean ascending argument.
coalesce: decreases the number of partitions in an RDD to ‘n’. Useful for running operations more efficiently after filtering down a large dataset.
group-by: returns an RDD of items grouped by the return value of a function.
group-by-key: groups the values for each key in an RDD into a single sequence.
flat-map-to-pair: returns a new JavaPairRDD by first applying a function to all elements of the RDD, and then flattening the results.
zip-with-index: when called on an RDD of type T, returns a new JavaPairRDD of (T, index) pairs where the index is a long element index, starting from 0.
zip-with-unique-id: when called on an RDD of type T, returns a new JavaPairRDD of (T, id) pairs where the id is a unique long. Items in the kth partition will get ids k, n+k, 2*n+k, …, where n is the number of partitions. So there may be gaps, but this method won’t trigger a Spark job.
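A short sketch chaining two of these transformations, assuming sc from the setup step and assuming that spark/filter, like spark/map, takes the function first and the RDD last. It passes the clojure.core functions even? and inc rather than inline fns, much as the persistence example at the end of this guide passes count. The var name is illustrative.

```clojure
(require '[sparkling.core :as spark])

(def evens-plus-one
  (->> (spark/parallelize sc [1 2 3 4 5 6])
       (spark/filter even?)   ; keeps 2, 4, 6
       (spark/map inc)        ; yields 3, 5, 7
       spark/collect))        ; the only step that triggers a Spark job
```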
Sparkling supports the following RDD actions (the list also includes distinct, glom and cache, which return an RDD rather than a value):
reduce: aggregates the elements of an RDD using a function which takes two arguments and returns one. The function should be commutative and associative so that it can be computed correctly in parallel.
count-by-key: only available on RDDs of type (K, V). Returns a map of (K, Int) pairs with the count of each key.
foreach: applies a function to all elements of an RDD.
fold: aggregates the elements of each partition, and then the results for all the partitions using an associative function and a neutral ‘zero value’.
first: returns the first element of an RDD.
count: returns the number of elements in an RDD.
collect: returns all the elements of an RDD as an array at the driver process.
distinct: returns a new RDD that contains the distinct elements of the source RDD.
take: returns an array with the first n elements of the RDD.
glom: returns an RDD created by coalescing all elements of the source RDD within each partition into a list.
cache: persists an RDD with the default storage level (‘MEMORY_ONLY’).
min: returns the minimum element of an RDD in the ordering defined by a comparator fn.
max: returns the maximum element of an RDD in the ordering defined by a comparator fn.
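A sketch exercising a few of these actions on one small RDD, assuming sc from the setup step and the RDD-last argument order used elsewhere in this guide. Each call triggers its own Spark job and returns a plain value to the driver.

```clojure
(require '[sparkling.core :as spark])

(def action-results
  (let [rdd (spark/parallelize sc [5 3 8 1])]
    {:count (spark/count rdd)      ; number of elements
     :first (spark/first rdd)      ; first element, in parallelized order
     :sum   (spark/reduce + rdd)   ; + is commutative and associative
     :some  (spark/take 2 rdd)}))  ; first two elements
```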
Spark provides the ability to persist (or cache) a dataset in memory across operations. Spark’s cache is fault-tolerant: if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. Caching is a key tool for iterative algorithms and fast interactive use. Like Spark, sparkling provides the functions spark/persist and spark/cache to persist RDDs.
spark/persist sets the storage level of an RDD to persist its values across operations after the first time it is computed. Storage levels are available in the sparkling.core/STORAGE-LEVELS map. This can only be used to assign a new storage level if the RDD does not have a storage level set already.
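A sketch, assuming sc from the setup step and a data.txt file. The code lists the keywords available in STORAGE-LEVELS. It then persists an RDD by calling persist on the underlying JavaRDD via Java interop. This avoids guessing the argument order of spark/persist, so check its docstring before using it. The var name persisted-lines is illustrative.

```clojure
(require '[sparkling.core :as spark])
(import 'org.apache.spark.storage.StorageLevel)

;; Which storage-level keywords does Sparkling provide?
(keys spark/STORAGE-LEVELS)

;; Persist via Java interop on the JavaRDD (spark/persist wraps this idea;
;; verify its argument order in its docstring).
(def persisted-lines (spark/text-file sc "data.txt"))
(.persist persisted-lines StorageLevel/MEMORY_AND_DISK)
```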
cache is a convenience function for using the default storage level, ‘MEMORY_ONLY’.
(let [line-lengths (->> (spark/text-file sc "data.txt")
                        (spark/map count)
                        spark/cache)]
  (->> line-lengths
       (spark/reduce +)))
;; 1406
In the future, we will provide further guides, e.g. on deploying your project to a Spark cluster, so please stay tuned and check our guides section from time to time.
Star / watch the Sparkling Github Repo to keep up to date.
Let us know what was unclear or what has not been covered. Reader feedback is key to making the documentation better. If we know people are reading the documentation, we'll be much more inclined to make the documentation that much better. Please speak up!