The code used in this blog post can be found on GitHub.
Apache Spark is a data processing framework that supports building projects in Python and comes with MLlib, a distributed machine learning library. I was excited at the possibilities this software offered when I first read a guide to creating a movie recommendation engine. I found some code snippets and helpful gists, but I couldn't find an end-to-end tutorial for a recommendation engine using Spark that was written in Python. So I set about building the engine, and I've documented my steps below.
Getting an environment set up
First, set up a fresh Ubuntu 14.04.2 machine and install Java, Scala, Git, Unzip and some Python dependencies:
    $ sudo add-apt-repository ppa:webupd8team/java
    $ sudo apt update
    $ sudo apt install \
        git \
        oracle-java7-installer \
        python-dev \
        python-virtualenv \
        scala \
        unzip
Spark then needs to be downloaded and built using the sbt build tool:
    $ curl -O http://apache.cs.utah.edu/spark/spark-1.3.0/spark-1.3.0.tgz
    $ tar xvf spark-1.3.0.tgz
    $ cd spark-1.3.0/
    $ build/sbt assembly
I've included the code for this tutorial in a repo. Create a virtual environment, clone the repo and install the requirements:
    $ virtualenv spark_venv
    $ source spark_venv/bin/activate
    $ git clone https://github.com/marklit/recommend.git
    $ cd recommend
    $ pip install -r requirements.txt
User-submitted film ratings data supplied by MovieLens will be used to train our collaborative filtering model.
    $ curl -O http://files.grouplens.org/papers/ml-1m.zip
    $ unzip -j ml-1m.zip "*.dat"
Note: the ratings themselves seem to be only on films released before the turn of the century.
Anatomy of the recommendation engine
There are two main parts to the engine: the first is the model trainer and the second is the recommendation generator.
pyspark is used to communicate with Spark. It requires a context that is kept open while you work and closed when you no longer need it. For this reason I put together a context manager:
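    import contextlib

    from pyspark import SparkConf, SparkContext


    @contextlib.contextmanager
    def spark_manager():
        # SPARK_MASTER, SPARK_APP_NAME and SPARK_EXECUTOR_MEMORY are settings
        # defined elsewhere in the script.
        conf = SparkConf().setMaster(SPARK_MASTER) \
                          .setAppName(SPARK_APP_NAME) \
                          .set("spark.executor.memory", SPARK_EXECUTOR_MEMORY)
        spark_context = SparkContext(conf=conf)

        try:
            yield spark_context
        finally:
            # Always shut the context down, even if the with-body raises.
            spark_context.stop()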
Now Spark-related parts of the code can be wrapped in a with statement, which simplifies context management.
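To see why the try/finally in the context manager matters, here is a minimal sketch that runs without Spark. FakeContext and fake_manager are hypothetical stand-ins for SparkContext and spark_manager; they only record whether stop() was called.

```python
import contextlib


class FakeContext(object):
    """Hypothetical stand-in for SparkContext; it only records whether stop() ran."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


@contextlib.contextmanager
def fake_manager(context):
    try:
        yield context
    finally:
        # Runs on normal exit and also when the with-body raises.
        context.stop()


context = FakeContext()

try:
    with fake_manager(context) as ctx:
        raise RuntimeError("job failed")
except RuntimeError:
    pass

print(context.stopped)  # True
```

Even though the body of the with statement raised, the context was still stopped, which is the behaviour you want for a real SparkContext.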
The film ratings file is loaded into a Resilient Distributed Dataset (RDD) where its elements can be operated on in a fault-tolerant fashion.
    with spark_manager() as context:
        ratings = context.textFile(training_data_file) \
                         .filter(lambda x: x and len(x.split('::')) == 4) \
                         .map(parse_rating)
The parse_rating function keys each row with a value between 0 and 9, which is the rating's timestamp modulo 10. This key is used to break the rows up into three sets: a training set (keys 0 to 5), a validation set (keys 6 and 7) and a test set (keys 8 and 9).
    # Each element is a (key, value) pair, so the filters test the key, x[0].
    training = ratings.filter(lambda x: x[0] < 6) \
                      .values() \
                      .repartition(numPartitions) \
                      .cache()

    validation = ratings.filter(lambda x: x[0] >= 6 and x[0] < 8) \
                        .values() \
                        .repartition(numPartitions) \
                        .cache()

    test = ratings.filter(lambda x: x[0] >= 8) \
                  .values() \
                  .cache()
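The parsing and splitting steps can be tried out on plain Python lists without Spark. This is only a sketch: the parse_rating below is my guess at what such a function looks like, assuming the MovieLens UserID::MovieID::Rating::Timestamp line format, and the sample lines are made up for illustration. If the last digits of the timestamps are evenly spread, the split comes out at roughly 60% training, 20% validation and 20% test.

```python
def parse_rating(line):
    """Turn 'UserID::MovieID::Rating::Timestamp' into (key, (user, movie, rating))."""
    user_id, movie_id, rating, timestamp = line.strip().split('::')
    key = int(timestamp) % 10  # last digit of the timestamp, 0 to 9
    return key, (int(user_id), int(movie_id), float(rating))


# Illustrative lines in the MovieLens format (not real data).
lines = [
    '1::1193::5::978300760',
    '1::661::3::978300763',
    '2::914::4::978300766',
    '2::3408::2::978300767',
    '3::2355::5::978300769',
    'malformed line',
]

# Same guard as the Spark filter: skip empty lines and lines without four fields.
keyed = [parse_rating(line) for line in lines
         if line and len(line.split('::')) == 4]

training = [value for key, value in keyed if key < 6]
validation = [value for key, value in keyed if 6 <= key < 8]
test = [value for key, value in keyed if key >= 8]

print(keyed[0])  # (0, (1, 1193, 5.0))
print('%d %d %d' % (len(training), len(validation), len(test)))  # 2 2 1
```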
The recommendation engine uses pyspark.mllib.recommendation.ALS to train its model on the training set. Various combinations of ranks, lambdas and iteration counts are tried to see which has the lowest RMSE (Root Mean Squared Error) against the validation set. The model with the lowest RMSE is then evaluated against the test set.
    for rank, lmbda, numIter in itertools.product(ranks, lambdas, iterations):
        model = ALS.train(ratings=training,
                          rank=rank,
                          iterations=numIter,
                          lambda_=lmbda)

        validationRmse = compute_rmse(model, validation, numValidation)

        if validationRmse < bestValidationRmse:
            bestModel, bestValidationRmse = model, validationRmse
            bestRank, bestLambda, bestNumIter = rank, lmbda, numIter

    testRmse = compute_rmse(bestModel, test, numTest)
Note: lambda is a reserved keyword in Python, so the variable is spelt lmbda and the ALS.train parameter is named lambda_ to avoid conflicts.
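For reference, RMSE itself is a small calculation. The sketch below is not the repo's compute_rmse, which works on a model and an RDD; it is a plain-Python version that assumes you already have (predicted, actual) rating pairs, and the pairs used here are made up.

```python
import math


def rmse(pairs):
    """Root mean squared error over (predicted, actual) pairs."""
    squared_errors = [(predicted - actual) ** 2 for predicted, actual in pairs]
    return math.sqrt(sum(squared_errors) / float(len(squared_errors)))


# Made-up (predicted, actual) ratings; the squared errors are 1, 0, 4 and 1.
pairs = [(4.0, 5.0), (3.0, 3.0), (2.0, 4.0), (5.0, 4.0)]

print(rmse(pairs))  # sqrt(6 / 4) = sqrt(1.5), about 1.2247
```

A lower value means the predicted ratings sit closer to the ratings users actually gave, which is why the model with the lowest validation RMSE is kept.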
The best resulting combination of rank, lambda and iteration count is reported back. Here is an example of trying a few different combinations of ranks, lambdas and iteration counts:
    $ ../bin/spark-submit recommend.py train ratings.dat \
        --ranks=8,9,10 --lambdas=0.31,0.32,0.33 --iterations=3
    The best model was trained with:
        Rank: 10
        Lambda: 0.320000
        Iterations: 3
        RMSE on test set: 0.931992
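The command above asks for three ranks, three lambdas and one iteration count, so nine models are trained. Here is a rough sketch of how the comma-separated values could be expanded into combinations. parse_list is a hypothetical helper; the repo may well parse its options differently.

```python
import itertools


def parse_list(value, cast):
    """Hypothetical helper: split a comma-separated option value and cast each item."""
    return [cast(item) for item in value.split(',')]


ranks = parse_list('8,9,10', int)
lambdas = parse_list('0.31,0.32,0.33', float)
iterations = parse_list('3', int)

# Every (rank, lambda, iteration count) triple that would be trained.
combinations = list(itertools.product(ranks, lambdas, iterations))

print(len(combinations))  # 9
print(combinations[0])    # (8, 0.31, 3)
```

Each extra value multiplies the number of models to train, so it pays to keep the lists short.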
The recommendation engine needs to know your opinion on films which have been rated by a lot of other users. The metrics command shows which films have the largest number of user ratings:
    $ ../bin/spark-submit recommend.py metrics ratings.dat movies.dat
    10 most rated films:
    3,428 #2858 American Beauty (1999)
    2,991 #260 Star Wars: Episode IV - A New Hope (1977)
    2,990 #1196 Star Wars: Episode V - The Empire Strikes Back (1980)
    2,883 #1210 Star Wars: Episode VI - Return of the Jedi (1983)
    2,672 #480 Jurassic Park (1993)
    2,653 #2028 Saving Private Ryan (1998)
    2,649 #589 Terminator 2: Judgment Day (1991)
    2,590 #2571 Matrix, The (1999)
    2,583 #1270 Back to the Future (1985)
    2,578 #593 Silence of the Lambs, The (1991)
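The counting behind a report like this can be sketched in plain Python with collections.Counter. This is not the repo's implementation, which presumably does the counting in Spark; the rating lines below are made up and only the film IDs and titles come from the output above.

```python
from collections import Counter

# Made-up rating lines in the 'UserID::MovieID::Rating::Timestamp' format.
lines = [
    '1::2858::5::978300760',
    '2::2858::4::978300761',
    '3::260::5::978300762',
    '4::2858::3::978300763',
    '5::260::4::978300764',
    '6::480::4::978300765',
]

titles = {
    2858: 'American Beauty (1999)',
    260: 'Star Wars: Episode IV - A New Hope (1977)',
    480: 'Jurassic Park (1993)',
}

# Count how many ratings each film ID received.
counts = Counter(int(line.split('::')[1]) for line in lines)

for movie_id, count in counts.most_common(2):
    # '{:,}' adds thousands separators, matching the report format.
    print('{:,} #{} {}'.format(count, movie_id, titles[movie_id]))
```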
I've picked five films which have a lot of ratings and added a --ratings parameter to the recommend command which lets you rate each of them from 1 (a poor film) to 5 (the best), or 0 if you haven't seen it. The films are American Beauty (1999), Jurassic Park (1993), Terminator 2: Judgment Day (1991), The Matrix (1999) and Back to the Future (1985). In the command below, --ratings=5.0,3.0,5.0,5.0,4.0 rates them 5, 3, 5, 5 and 4 respectively.
With the ratings, rank, lambda and iteration count picked, you can now see which films are recommended viewing.
    $ ../bin/spark-submit recommend.py recommend ratings.dat movies.dat \
        --ratings=5.0,3.0,5.0,5.0,4.0 \
        --rank=10 --lambda=0.32 --iteration=3
    His Girl Friday (1940)
    New Jersey Drive (1995)
    Breakfast at Tiffany's (1961)
    Halloween 5: The Revenge of Michael Myers (1989)
    Just the Ticket (1999)
    I'll Be Home For Christmas (1998)
    Goya in Bordeaux (Goya en Bodeos) (1999)
    For the Moment (1994)
    Thomas and the Magic Railroad (2000)
    Message in a Bottle (1999)
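The recommendation step can be pictured as two small pieces of plain Python: turning the --ratings value into rating tuples for a new user, and picking the best-scoring films that user hasn't rated. This is a sketch under assumptions rather than the repo's code. The user ID of 0 is my choice (MovieLens user IDs start at 1), the helper names are hypothetical, and the predicted scores are invented stand-ins for model output. In the real engine the new user's ratings would presumably be added to the training data before training, since ALS can only predict for users it has seen.

```python
import heapq

# Film IDs from the metrics output: American Beauty, Jurassic Park,
# Terminator 2, The Matrix and Back to the Future.
MOVIE_IDS = [2858, 480, 589, 2571, 1270]
NEW_USER_ID = 0  # assumption: an ID no MovieLens user has


def build_user_ratings(ratings_arg):
    """Turn '5.0,3.0,...' into (user_id, movie_id, rating) tuples."""
    ratings = [float(value) for value in ratings_arg.split(',')]
    return [(NEW_USER_ID, movie_id, rating)
            for movie_id, rating in zip(MOVIE_IDS, ratings)
            if rating > 0]  # 0 means "not seen", so it is left out


def top_unseen(predictions, rated_ids, count):
    """Highest-scoring (movie_id, score) pairs for films the user has not rated."""
    unseen = [(movie_id, score) for movie_id, score in predictions
              if movie_id not in rated_ids]
    return heapq.nlargest(count, unseen, key=lambda pair: pair[1])


user_ratings = build_user_ratings('5.0,3.0,5.0,5.0,4.0')
rated_ids = set(movie_id for _, movie_id, _ in user_ratings)

# Invented (movie_id, predicted_rating) pairs standing in for model output.
predictions = [(2858, 4.9), (1, 3.8), (2, 4.4), (480, 4.1), (3, 2.7), (4, 4.6)]

top = top_unseen(predictions, rated_ids, 2)
print(top)  # [(4, 4.6), (2, 4.4)]
```

Films the user has already rated are filtered out first, otherwise the list would mostly echo back the films they said they liked.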