This guide assumes that you have basic knowledge of Sparkling, e.g. from reading the getting started guide. It walks you through a complete (albeit small) project: computing tf-idf for a set of documents. Tf-idf stands for term frequency / inverse document frequency and is used to determine the relevance of a document in a given corpus for a given query. Thus, given a set of documents, our project will yield a term vector for each document.
The code for this guide can be found in the master branch of our “getting started repo”. However, you should be able to set up the complete project simply by following this guide.
TF-IDF (term frequency-inverse document frequency) is a way to score the importance of terms in a document based on how frequently they appear across a collection of documents (corpus). The tf-idf weight of a term in a document is the product of its tf weight:

tf(t, d) = (number of times term t appears in document d) / (total number of terms in document d)

and its idf weight:

idf(t) = ln(total number of documents in corpus / (1 + number of documents with term t))
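As an illustrative calculation (not from the original corpus used below): in a corpus of 4 documents, a term appearing in 1 document gets idf = ln(4 / (1 + 1)) = ln 2 ≈ 0.69. If that term occurs once in a document of 3 terms, its tf is 1/3, so its tf-idf weight is about 0.23.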
## Setting up a new project using lein
Make sure you have Leiningen installed; I’m currently using version 2.5.1. My working directory is `tmp`, as you can see from my shell prompt `➜ tmp`.
Now, create a new project named “tf-idf”.
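Assuming the standard Leiningen workflow, this boils down to running something like `lein new tf-idf` in the working directory, which generates the default project skeleton (`project.clj`, `src/tf_idf/core.clj`, `test/tf_idf/core_test.clj`) that we edit below.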
Now, open that project in your IDE of choice. Personally, I use IntelliJ/Cursive, but for the sake of this tutorial, let’s use Light Table as an easy-going common denominator. We start by editing `project.clj`, adding a dependency on `[gorillalabs/sparkling "1.1.1"]`.
The file should now look like this:
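A sketch of the result (the Clojure version and the FIXME placeholders come from the default lein template and may differ on your machine):

```clojure
(defproject tf-idf "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [gorillalabs/sparkling "1.1.1"]])
```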
To check whether everything is fine, run `lein test`. With the untouched project template you’ll see the stock `a-test` fail with “FIXME, I fail.”; that’s expected at this point.
Now we can start working on our problem. Open `src/tf_idf/core.clj` in Light Table and turn it into an InstaREPL (use Ctrl+Space to open the command palette, type “insta” and hit Enter). You will see a little “live” tag in the upper right corner of your editor. Now delete the `foo` function definition, add a require statement to the namespace, and add our own functions, so the file looks like this:
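A minimal sketch of such a domain model, assuming a small hand-picked stopword set (the actual word list and function bodies may differ in your version):

```clojure
(ns tf-idf.core
  (:require [clojure.string :as string]))

;; an illustrative stopword set -- the real list may differ
(def stopwords #{"a" "all" "and" "any" "are" "is" "in" "of" "on"
                 "or" "our" "so" "this" "the" "that" "to" "we"})

(defn terms
  "Splits content on spaces and lower-cases every term."
  [content]
  (map string/lower-case (string/split content #" ")))

(def remove-stopwords
  "Removes all stopwords from a seq of terms."
  (partial remove stopwords))
```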
These functions provide a basic domain model for our problem.
As we’re in the InstaREPL: just add an expression like the one below and see the functionality explode ;)
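For example, using the helpers sketched above:

```clojure
;; with the stopword set sketched above:
(remove-stopwords (terms "A quick brown fox jumps over the lazy dog"))
;; => ("quick" "brown" "fox" "jumps" "over" "lazy" "dog")
```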
We’ll add this as a test right now. So, stay tuned!
Ok, it’s not quite big data right now, but a big pro of using Sparkling (or Spark in general) is the ability to unit test the code. So far we’re not running on the Spark engine, but we can test our Clojure code using the test framework of our choice. I usually prefer expectations, but for the sake of simplicity we’re using clojure.test here.
Open `test/tf_idf/core_test.clj` in Light Table and replace the existing `a-test` with this:
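A sketch of such a test against the helpers above (the test name is arbitrary):

```clojure
(ns tf-idf.core-test
  (:require [clojure.test :refer :all]
            [tf-idf.core :refer :all]))

(deftest domain-functions-test
  (testing "splitting and stopword removal"
    (is (= ["quick" "brown" "fox"]
           (remove-stopwords (terms "A quick brown fox"))))
    (is (= ["quick" "brown" "fox" "jumps"]
           (remove-stopwords (terms "A quick brown Fox jumps"))))))
```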
I added a keybinding to my Light Table to execute tests on a keystroke. You might want to have that too, because it makes things a lot easier here: just open your keybindings (Ctrl+Space to open the Commands, type “key”, select ‘Settings: User keymaps’) and add these lines inside the vector:
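One possible binding, using Light Table’s `:eval.custom` command to run `clojure.test` and report to the status bar; the exact syntax may vary with your Light Table version, and the key itself is up to you:

```clojure
[:editor "cmd-t" (:eval.custom "(clojure.test/run-tests)"
                               {:result-type :statusbar})]
```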
Upon saving the `user.keymap` file, keybindings are reloaded. Back in the `core_test.clj` editor tab I press Cmd+T to run my tests, which gives me info about them in the status bar.
For all others: just run `lein test` from the command line.
And yes, I know that we should test the `remove-stopwords` and `terms` functions separately, but this is just a tutorial, so I took the short route.
(Image: Tuple2, from Apache Spark)

Go back to `core.clj`, as we’re adding some more functionality.
For our next step we need to deal with some Spark internals: Spark uses Scala’s `Tuple2` instances for its PairRDDs, i.e. for its key-value semantics. Thus, we need to work with `Tuple2` as well, but Sparkling makes this as easy as possible.
First, require the `sparkling.core` namespace, so our namespace definition looks like this:
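A sketch, assuming the alias `spark` that the rest of this guide refers to:

```clojure
(ns tf-idf.core
  (:require [clojure.string :as string]
            [sparkling.core :as spark]))
```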
Second, add another function:
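A sketch of that function, consistent with how it is described below: per term it emits the term as key and the doc id, the term’s count and the document’s total term count as value (name and exact tuple shape are assumptions):

```clojure
(defn term-count-from-doc
  "Returns a stopword-filtered seq of tuples of
   term -> [doc-id term-count doc-terms-count] for one document."
  [doc-id content]
  (let [terms           (remove-stopwords (terms content))
        doc-terms-count (count terms)
        term-count      (frequencies terms)]
    (map (fn [term]
           (spark/tuple term [doc-id (term-count term) doc-terms-count]))
         (distinct terms))))
```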
This function can be tested in the InstaREPL by adding the following line at the end of `core.clj`:
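For example (document id and content are made up):

```clojure
;; => three key/value tuples, one per remaining term:
;;    "quick" -> ["doc1" 1 3], "brown" -> ["doc1" 1 3], "fox" -> ["doc1" 1 3]
(term-count-from-doc "doc1" "A quick brown fox")
```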
The InstaREPL unfolds the evaluation and shows the result right next to it.
So, doc1 has 1 appearance of “quick” from a total of 3 words. And yes, the count of words (doc-terms-count) is repeated in every term element. This simplifies things by “wasting” some memory.
As you can see, Sparkling brings its own tagged literal for the Clojure reader to help you cope with `Tuple2` without interfering with Scala too much. `#sparkling/tuple` turns the following two-element vector into a `Tuple2` instance for you while reading. There’s also a function for that: `sparkling.core/tuple`, referenced here with the namespace alias as `spark/tuple`. Remember: the `#sparkling/tuple` tagged literal reads the rest of the expression as is, so reading `#sparkling/tuple [(int 3) :foo]` will read a list (with the elements `'int` and `3`) as the key element. It will not evaluate the call to `int` you might have expected when writing this. Use the `tuple` function for this kind of thing.
As we now have everything necessary for the tf-part of our computation, we still need the idf thing. Just add those two functions:
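Since the sketches in the rest of this guide need plain tf and idf helpers, here is a plausible pair matching the formulas from the top of this guide (names and signatures are assumptions):

```clojure
(defn tf
  "tf(t, d): term count in the document divided by the document's term count."
  [term-count doc-terms-count]
  (/ term-count (double doc-terms-count)))

(defn idf
  "idf(t): ln(total documents / (1 + documents containing the term))."
  [doc-count doc-count-for-term]
  (Math/log (/ doc-count (+ 1.0 doc-count-for-term))))
```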
And add a bunch of tests to your test namespace `core-test`. Just add the following testing block to your existing deftest:
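A sketch of such a testing block, written against the `tf` and `idf` helpers above:

```clojure
(testing "tf and idf"
  (is (= 0.5 (tf 1 2)))              ; one of two terms
  (is (= (Math/log 2.0) (idf 4 1)))  ; 4 docs, term in 1 doc -> ln(4/2)
  (is (= 0.0 (idf 1 0))))            ; term in no document -> ln(1/1)
```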
Run `lein test` from the command line to make sure everything works.
We’re ready to target the Spark engine to really process large amounts of data using a distributed system… Naaaa, we’ll stay on a local Spark master for the moment, but that’s ok for the sake of this guide: we’re introducing the Spark context and the RDDs required.
To deal with an annoyance I stumbled over from time to time, especially in demos given without network connectivity, we start with a little hack: Spark binds to an IP found on your system, but that might not be the one you’re interested in. Thus, we configure that IP using the environment variable SPARK_LOCAL_IP. Luckily, there’s a Leiningen plugin for that. Open your `project.clj` and insert this `:dev` profile:
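A sketch of the profile, pulling in the lein-dotenv plugin mentioned below (the version coordinate is a placeholder):

```clojure
:profiles {:dev {:plugins [[lein-dotenv "RELEASE"]]}}
```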
And, while we’re at it, also add `:aot :all` and a `:main` directive. If you’re not sure, your `project.clj` file should look like this:
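Putting the pieces together, `project.clj` could now look roughly like this (versions and coordinates are assumptions):

```clojure
(defproject tf-idf "0.1.0-SNAPSHOT"
  :description "tf-idf example using Sparkling"
  :dependencies [[org.clojure/clojure "1.6.0"]
                 [gorillalabs/sparkling "1.1.1"]]
  :aot :all
  :main tf-idf.core
  :profiles {:dev {:plugins [[lein-dotenv "RELEASE"]]}})
```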
The lein-dotenv plugin reads from an `.env` file and sets environment variables for the subsequent lein tasks. Thus, create an `.env` file in your project root directory with this content:
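Given that we later expect `SPARK_LOCAL_IP` to evaluate to “127.0.0.1”, a single line like this should do:

```
SPARK_LOCAL_IP=127.0.0.1
```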
If you decided to not use Light Table but some other mechanism, make sure to make this environment variable available to your runtime.
Adding this requires you to reset the “connection” in Light Table. Go to Connections (menu “Views” > “Connections”) and “disconnect”. Then start a new connection: go to the editor tab for `core.clj`, open the command palette (Ctrl+Space) and select “Eval: Eval editor contents”, which will start a new Clojure REPL connection for you.
In my Light Table version that’s the point where the InstaREPL stops working, but evaluating the contents of the editor is perfectly possible. Remember the keystroke, you’ll need it from time to time; for me it’s Cmd+Shift+Enter.
Add a line like the one below to your `core.clj` file and evaluate it; it should show “127.0.0.1” right next to the expression.
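A line matching that behavior simply reads the environment variable:

```clojure
;; should evaluate to "127.0.0.1" if the .env file is picked up
(System/getenv "SPARK_LOCAL_IP")
```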
Now we’re ready to create a Spark context.
First, require the `sparkling.conf` namespace:
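A sketch of the extended namespace declaration (the `:gen-class` supports the `:main`/`:aot` setup added above):

```clojure
(ns tf-idf.core
  (:require [clojure.string :as string]
            [sparkling.conf :as conf]
            [sparkling.core :as spark])
  (:gen-class))
```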
Second, add this to `core.clj`:
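A sketch of a local Spark context plus a minimal `-main` (the function name `make-spark-context` and the app name are my choice):

```clojure
(defn make-spark-context
  "Creates a Spark context against a local master."
  []
  (-> (conf/spark-conf)
      (conf/master "local[*]")
      (conf/app-name "tf-idf")
      (spark/spark-context)))

(defn -main [& args]
  (let [sc (make-spark-context)]
    (println "Created Spark context" sc)))
```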
Test this from the command line using `lein run`; you should see Spark’s log output while the context is created and the program exits.
As your program exits, the services are also torn down, so going to the Spark WebUI under http://localhost:4040 is pretty pointless right now. You won’t find anything.
Remember: as Spark requires functions to be compiled, you need to AOT-compile the namespaces containing functions used in Spark transformations. This is done when running `lein run`, but it’s not done in your REPL automatically. Strange errors? Do a `lein do clean, run` to make sure nothing broke, cleaning your stuff first.
From here on, we go test driven. So open up `core_test.clj` and insert this in your ns definition:
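A sketch, also pulling in `sparkling.destructuring`, which the test helpers added later in this guide rely on:

```clojure
(ns tf-idf.core-test
  (:require [clojure.test :refer :all]
            [sparkling.conf :as conf]
            [sparkling.core :as spark]
            [sparkling.destructuring :as s-de]
            [tf-idf.core :refer :all]))
```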
as well as these functions (at the end of the file):
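Possible shapes for the two helpers (the fixture documents are made up; only their number matters for the assertions below):

```clojure
(defn make-test-context
  "A local, single-threaded Spark context for tests."
  []
  (-> (conf/spark-conf)
      (conf/master "local")
      (conf/app-name "tf-idf-test")
      (spark/spark-context)))

(def documents-fixture
  [(spark/tuple "doc1" "A quick brown fox jumps")
   (spark/tuple "doc2" "A lazy dog sleeps all day")
   (spark/tuple "doc3" "The fox and the dog are friends")
   (spark/tuple "doc4" "Quick quick said the brown fox")])
```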
Both prepare stuff for our tests: `make-test-context` creates a local, single-threaded Spark context to use in our tests, and `documents-fixture` provides a tuple set of documents with id and body as key/value pairs.
Now, we need the total number of documents to compute idf, so we start working on a `document-count` function in TDD fashion. We add some tests down here:
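A sketch of that test; the deftest name `spark-functions-test` is reused further down in this guide, and the expected count matches the four fixture documents above:

```clojure
(deftest spark-functions-test
  (let [sc (make-test-context)]
    (try
      (testing "counting the documents of the corpus"
        (is (= 4 (document-count (spark/parallelize-pairs sc documents-fixture)))))
      ;; more Spark-based tests will be added here later
      (finally
        (.stop sc)))))
```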
Running `lein test` now should result in an error, as we do not have a `document-count` function yet. So let’s add it to `core.clj`, just before the `-main` function:
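A sketch, simply delegating to Spark’s count:

```clojure
(defn document-count
  "Counts the documents in the given corpus RDD."
  [documents]
  (spark/count documents))
```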
Now, `lein test` should run 2 tests with 6 assertions and report back that everything is fine.
Congratulations, you wrote your first big data job using Sparkling, fully unit tested. ;) However, it just does nothing so far.
## Destructuring Spark’s objects using wrapper functions
We now put the pieces together. From our RDD (resilient distributed dataset) of documents, we create another RDD using our `term-count-from-doc` function. We apply this function to every document using one of Sparkling’s map functions. As `term-count-from-doc` returns a collection of tuples for one document and we’re interested in a collection of tuples over all documents, we use `flat-map-to-pair`, stitching together all the per-document collections into one for the whole corpus.
First, we create two small helper functions in our core_test.clj:
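Sketches matching the description below, assuming `sparkling.destructuring` is aliased as `s-de` in the test namespace:

```clojure
(defn tuple2vec
  "Converts a Scala Tuple2 into a Clojure vector [key value]."
  [tuple]
  ((s-de/key-value-fn vector) tuple))

(defn first2vec
  "Picks the first element of a pair RDD and returns it as a vector."
  [rdd]
  (tuple2vec (spark/first rdd)))
```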
`tuple2vec` converts a Scala `Tuple2` instance returned from Spark into a vector, using a wrapped version of the `vector` function. The `sparkling.destructuring` namespace contains a number of wrapper functions to destructure the different instances returned from Spark, depending on the situation. For example, a `(flat-)map-to-pair` function returns a `Tuple2` to store a key-value pair, while a join of two PairRDDs returns a `Tuple2` with another `Tuple2` as its value, so we need another wrapper function from `sparkling.destructuring` to wrap functions processing RDDs of that type. `first2vec` picks the first element from an RDD and returns it as a vector.
Second, we add a test for this inside the `spark-functions-test`:
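A sketch of such a testing block, placed next to the document-count test so the `sc` binding is in scope; the expected value follows from the fixture documents and the tuple shape sketched earlier (doc1 keeps four terms after stopword removal, each appearing once):

```clojure
(testing "term counts over the whole corpus"
  (is (= ["quick" ["doc1" 1 4]]
         (first2vec (term-count-from-docs
                      (spark/parallelize-pairs sc documents-fixture))))))
```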
It’s a bit oversimplifying to have this as the only test case, but it’s enough to show how to test your sparkling code.
Now let’s populate `core.clj` to make your tests pass: add `[sparkling.destructuring :as s-de]` to the `:require` part of the namespace definition, and add this function (before the `-main` function).
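A sketch of that function; the name `term-count-from-docs` is my own, and its result is what the text below calls the document-term-corpus:

```clojure
(defn term-count-from-docs
  "Applies term-count-from-doc to every document of the corpus, yielding
   one pair RDD of term -> [doc-id term-count doc-terms-count] tuples."
  [documents]
  (spark/flat-map-to-pair
    (s-de/key-value-fn term-count-from-doc)
    documents))
```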
Now your tests should be green again. Just check.
## Spark transformations for idf and tf computation
The next step is to compute idf. Remember, idf is defined like this:
idf(t) = ln(total number of documents in corpus / (1 + number of documents with term t))
We already have the total number of documents (`document-count`), thus we need the number of documents with term t, for each term t in our corpus. Sounds like a PairRDD from term to document-count-for-term, which we’ll get from our document-term-corpus. Let’s add that function:
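A sketch (again, the name is an assumption):

```clojure
(defn document-count-by-term
  "Counts, for every term, the number of documents containing it."
  [document-term-corpus]
  (->> document-term-corpus
       (spark/map-to-pair (s-de/key-value-fn
                            (fn [term _] (spark/tuple term 1))))
       (spark/reduce-by-key +)))
```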
This remaps the document-term-corpus to an RDD containing (term, 1) tuples, as each document/term combination appears only once in the document-term-corpus RDD. From there, we just need to count all documents per term.
I’ll skip tests for that for the sake of brevity, but you shouldn’t do that. Go on, write a test for it in `core_test.clj`. I’ll wait for you right here!
You’re back again? That’s fine!
Let’s go on to compute `idf-by-term` easily:
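A sketch, reusing the plain `idf` helper from above:

```clojure
(defn idf-by-term
  "Computes the idf weight for every term, keyed by term."
  [doc-count document-count-for-term-rdd]
  (spark/map-to-pair
    (s-de/key-value-fn
      (fn [term docs-with-term]
        (spark/tuple term (idf doc-count docs-with-term))))
    document-count-for-term-rdd))
```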
We do the same for tf. Remember tf’s definition? Neither do I. So let’s repeat:
tf(t, d) = (number of times term t appears in document d) / (total number of terms in document d)
Both numbers are in our term-count-from-doc-rdd for each document/term combination. So we map over this:
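A sketch of `tf-by-doc-term`, keyed by term as required for the join below:

```clojure
(defn tf-by-doc-term
  "Computes the tf weight per document/term combination, keyed by term."
  [term-count-rdd]
  (spark/map-to-pair
    (s-de/key-value-fn
      (fn [term [doc-id term-count doc-terms-count]]
        (spark/tuple term [doc-id (tf term-count doc-terms-count)])))
    term-count-rdd))
```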
## Joining idf and tf information to get to tf-idf
To combine tf and idf, we need to join both RDDs. That’s the reason we returned a tuple of (term -> [doc tf]) from `tf-by-doc-term`: Spark only joins over matching keys, so both `tf-by-doc-term` and `idf-by-term` need to be keyed by term.
With this said, it’s easy to develop `tf-idf-by-doc-term`: first we join, and then we map-to-pair to a new doc/term combination as key and the tf*idf as value.
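A sketch, using `sparkling.destructuring`’s wrapper for join results (a `Tuple2` whose value is another `Tuple2`):

```clojure
(defn tf-idf-by-doc-term
  "Joins tf and idf by term and yields [doc-id term] -> tf*idf."
  [tf-by-doc-term-rdd idf-by-term-rdd]
  (->> (spark/join tf-by-doc-term-rdd idf-by-term-rdd)
       (spark/map-to-pair
         (s-de/key-val-val-fn
           (fn [term [doc-id tf-weight] idf-weight]
             (spark/tuple [doc-id term] (* tf-weight idf-weight)))))))
```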
Did you enjoy following? Did you write tests for the new code? You could do so now, if you like!
Now, we just need to stitch everything together like this:
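A sketch wiring the pieces together (the name of the pipeline function is my own):

```clojure
(defn tf-idf
  "Computes tf-idf weights for a corpus given as a pair RDD of doc-id -> content."
  [corpus]
  (let [doc-count         (document-count corpus)
        term-count-by-doc (term-count-from-docs corpus)]
    (tf-idf-by-doc-term
      (tf-by-doc-term term-count-by-doc)
      (idf-by-term doc-count (document-count-by-term term-count-by-doc)))))
```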
Now populate your `-main` function like this:
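A sketch that replaces the earlier placeholder `-main`, feeding a tiny hard-coded corpus through the pipeline and printing the result (the documents are made up):

```clojure
(defn -main [& args]
  (let [sc     (make-spark-context)
        corpus (spark/parallelize-pairs
                 sc
                 [(spark/tuple "doc1" "A quick brown fox jumps")
                  (spark/tuple "doc2" "A lazy dog sleeps all day")
                  (spark/tuple "doc3" "The quick fox likes the lazy dog")])]
    (doseq [tuple (spark/collect (tf-idf corpus))]
      (println ((s-de/key-value-fn vector) tuple)))))
```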
There you are. `lein run` your project from the command line and you should see the computed tf-idf weights printed to the console.
And that’s how you develop your own Spark job using Sparkling. From here, you can go on to deploy your stuff to the cluster or run it from a local driver against the cluster, but beware of the traps lying around there. To get you going until I find time to write a separate guide on that topic: remember to use `spark-submit`, and read the docs on running Spark on a cluster.
And you can check out the whole source from our Github getting-started project.
Please take a moment to tell us what you think about this guide on Twitter or open an issue on Github.
Let us know what was unclear or what has not been covered. Reader feedback is key to making the documentation better. If we know people are reading the documentation, we'll be much more inclined to make the documentation that much better. Please speak up!