DogDogFish

Data Science, amongst other things.

Tag: Hadoop

Building a Recommendation Service on AWS with Mahout

Hi all,

Those (two) of my regular readers will know I frequently curse my fool-of-a-took laptop. With this in mind I’ve been meaning to write a tutorial on working with Amazon’s web services for when you don’t have the hardware you need. This isn’t that post but I will assume you’ve already got a cluster to hand with all the bits and bobs you need installed.

If you don’t, don’t worry about it (for now) – I’ll do a post on how to get one of those set up at a later date. For now I’d like to talk about collaborative filtering. This is one of the most prevalent techniques for generating recommendations and with good reason – it’s pretty good. It’s quite a common sense method:

I know that person A likes items 1, 2 and 3 but not item 4.
I know that person B likes item 4 and 3 but not 1 or 2.
I know that person C likes item 1  but not 2, 3 or 4.
I know that person D doesn’t like item 4 – which of the items 1, 2 or 3 should I recommend them?

I’m sure most people reading this will say we should recommend them item 1, then probably item 2 then 3? Certainly item 1 with highest preference. Now if you deconstruct your thought process in coming up with that recommendation I’d wager (not a lot) that it’d be akin to the collaborative filtering algorithm.

Based on the behaviour of people A, B and C we can start to get an idea about which items are rated favourably by the same people versus those that aren’t. User-based collaborative filtering is about finding similar users to person D and looking at what they liked. Item-based collaborative filtering is about finding similar items to the ones person D has rated positively and recommending those.

There are lots of nice articles on collaborative filtering – more often than not when it comes to machine learning I end up recommending Andrew Ng material and this is no exception – check out his machine learning course on Coursera. There’s a whole lecture on collaborative filtering and it’s boss. If my explanation wasn’t enough (if you’re serious about this sort of thing, it shouldn’t be) check out his lectures on the subject and read around on the mathematical details of it all. Make sure you polish up on your linear algebra – you’ll have very large sparse matrices heading your way!

Also, please note that this whole post is just a re-imagining of this post by Amazon: Recommendations using EMR – I liked it and thought I’d give it a go myself with a different data set and technologies I was more familiar with. Now on with the show…

The Data Set

I’m going to be using a publicly available data set containing user ratings of books gathered on Amazon – dataset available here: Book Data Set

There are three files in there – one with all the details of the books, one with all the details of the users and finally one with all the user ratings. Don’t worry about downloading them yet if you’re following this as a tutorial/example – we’ll stick that bit in a script later on.

The Cluster Set-Up

Given that I’m on the free tier and I don’t use AWS at work (we’ve got a nice cluster that handles all our data just fine thank you) I’m a bit ‘over-compute’ happy at the moment and so I’m going to use a 5 node cluster (1 master node and 4 slaves). This isn’t really necessary but then nor is this as a piece of work so I’m not that bothered by practicality at this point. The benefit of using Amazon’s Elastic Mapreduce is that the nodes all come with Hadoop (& Hive/Pig/Mahout/whatever) installed and set up so you don’t have to bother with any of that.

Building a Service

I want to demonstrate how easy it is to turn this sort of thing into a recommendations service (instead of a one-off bit of analysis). Now I’m going to stress that I’m setting this up as an offline batch machine learning method and that there are a few steps I’d change if I were actually setting this up as a real, ‘production’, service.

I’m going to use a Redis database because I’ve got a reasonable amount of memory on these boxes (Redis is an in-memory key-value store), because Redis is really really quick and because I’m used to using Redis with Python. Redis will store the output of my collaborative filtering and I’ll stick a lightweight web framework (Flask) on top of it. This will allow a client to retrieve recommendations for a given user as well as delete them or overwrite them. I’ve decided to add the latter two functions because we don’t live in a perfect world.

I’ll also write a script that’ll update the books, users and ratings files and stick it on a cronjob to run every day. That way, by copying this method you’ll end up with a reasonably scalable, reasonably fast and reasonably up-to-date recommendation service. It’s not the best one out there (heck, it might be the worst) but it goes to show how simple these things are to set up given all the tools that are already written…

The Code

Firstly, we’ll create the cluster using the Elastic Mapreduce CLI:

./elastic-mapreduce --create --alive --name recommendation-service --num-instances 5 --master-instance-type m1.large --slave-instance-type m2.xlarge --ami-version 3.1 --ssh

The --ssh means that once the cluster has been created and all the necessary bits and bobs installed we ssh into the master node

Extracting the data set

 
wget http://www2.informatik.uni-freiburg.de/~cziegler/BX/BX-CSV-Dump.zip
unzip BX-CSV-Dump.zip
sed '1d;' BX-Book-Ratings.csv | tr ';' ',' | tr -d '"' > Book-Ratings.csv}

Unfortunately for us, we’ve got a bit of work to do – the ‘out of the box’ Mahout recommendation algorithm requires user and book ids to be integers and ISBNs are not. To get around this, we use this nifty little script:

#!/usr/bin/python

import sys
import re

isbn_dictionary = {}

counter = 0

with open('Book-Ratings.csv', 'rb') as f:
    with open('New-Book-Ratings.csv', 'w') as g:
        with open('Book-Mappings.csv', 'w') as h:
            for line in f:
                try:
                    user, book, rating = line.split(',')
                except:
                    continue
                try:
                    index = isbn_dictionary[book]
                except:
                    isbn_dictionary[book] = counter
                    index = counter
                    h.write("%s,%dn" % (book, counter))
                    counter += 1
                g.write("%d,%d,%dn" % (int(user), index, int(rating)))

Now we’ve got data in the format our Mahout recommender is expecting let’s put it in HDFS and run our algorithm

hadoop fs -put New-Book-Ratings.csv /New-Book-Ratings.csv
mahout recommenditembased --input /New-Book-Ratings.csv --output recommendations --numRecommendations 10 --similarityClassname SIMILARITY_COSINE

Now we play the waiting game…
Actually, while that’s running, open up another SSH session to that terminal and run these commands

sudo easy_install redis flask
wget http://download.redis.io/releases/redis-2.8.13.tar.gz
tar xzf redis-2.8.13.tar.gz
cd redis-2.8.13
make
./src/redis-server &
cd

Now Redis is installed and running and we’ve got the necessary Python libraries we can create the following server…

import redis
import os

isbn_dictionary = {}

with open('Book-Mappings.csv', 'rb') as f:
    for line in f:
        isbn, counter = line.split(',')
        isbn_dictionary[int(counter)] = isbn

r = redis.StrictRedis(host='localhost', port=6379, db=0)

p = os.popen('hadoop fs -cat recommendations/part*')

for recommendation in p:
    user, recommendations = recommendation.split('t')
    recommendations = [entry.split(':')[0] for entry in recommendations.replace('[', '').replace(']','').split(',')]
    recommendations = [isbn_dictionary[int(entry)] for entry in recommendations]
    r.set(int(user), recommendations)

Now we’ve populated our database with all the recommendations we generated with our Mahout job. Now we’ll set something up (called webserver.py) to return these items to a client (as well as allow modifications to be made).

import redis
from flask import Flask
r = redis.StrictRedis(host='localhost', port=6379, db=0)

app = Flask(__name__)

@app.route('/user/<user_id>', methods = ['GET', 'POST', 'PUT', 'DELETE'])
def restful_api(user_id):
    if request.method == 'GET':
        recommendations = r.get(int(user_id))
        return recommendations if recommendations else "User %d does not exist" % int(user_id)
    elif request.method == 'POST':
        if request.headers['Content-Type'] == 'application/json':
            r.set(int(user_id), request.json)
            return "Successfully set recommendations for user %d" % int(user_id)
        else:
            return False
    elif request.method == 'DELETE':
        if r.exists(int(user_id)):
            r.delete(int(user_id))
            return "Successfully deleted recommendations for user %d" % int(user_id)
        else:
            return "User %d does not exist" % int(user_id)
    elif request.method == 'PUT':
        if request.headers['Content-Type'] == 'application/json':
            r.set(int(user_id), request.json)
            return "Successfully set recommendations for user %d" % int(user_id)
        else:
            return "Require JSON put"
    else:
        return "No idea what you've done here, but don't do it again"

if __name__ == '__main__':
    app.run()

Now we’ve built our client-facing recommendations service let’s ‘deploy’ it and give it a test

python webserver.py &
curl -X GET localhost:5000/user/277965
curl -X DELETE localhost:5000/user/277965
curl -X GET localhost:5000/user/277965

Summary

We’ve not done anything that tricky but in a fairly short time we’ve managed to build something that’d be at least part way useful to a small business somewhere looking for recommendations for their users. It’s not a great leap from this and a genuine production ready system – you don’t need 5 Hadoop nodes running all the time for a 20 minute Mahout job and I’m uncomfortable using Flask and local files for a serious system. As such, you’d likely put together a couple of nodes, each with their own copy of the recommendations set in memory. One of them would create a cluster, get any new data a generate the data set as often as you saw fit – communicating it to the other node. Finally, you’d add a few more actions to the server and make sure it could be called by whichever clients need it (and block everybody else).

As I say, nothing earth-shattering but a solid base on which to build a recommendations system.

I am become death.

 

Hadoop wordcount in Python

Hi all,

There’ll be a follow up post to this detailing how to run a mapreduce using Eclipse and Java but, as I’ve found myself in permissions hell in running that, I’ll go with the easy one first. Hadoop comes with a streaming jar that allows you to write your mappers and reducers in any language you like – just take input from stdin and output to stdout and you’re laughing. I’ll show you how to achieve this using Python.

Cluster Set-up

I’m going to assume you’ve followed a tutorial and have got Hadoop installed and working – if you haven’t, follow one (maybe even mine) and then come back here. Make sure you’ve got HDFS and Yarn running by executing the following commands:

su - hduser ## Only need this if you created a user called hduser to interface with Hadoop
cd /usr/local/hadoop ## If you followed the tutorial - otherwise, wherever your Hadoop home directory is
sbin/start-all.sh

Let’s see about putting a text file into HDFS for us to perform a word count on – I’m going to use The Count of Monte Cristo because it’s amazing. Honestly, get it read if you haven’t. It’s really really good. Anywho, enough fandom – this little command will download the whole book and stick it into whichever directory you happen to be in when you run the command.

 cd ~
wget -O 'count_of_monte_cristo.txt' http://www.gutenberg.org/cache/epub/1184/pg1184.txt

Now we’ve got the file in our home directory (really, it was that easy, check it out if you don’t believe me – then read the book). However, that’s not in HDFS – we need to explicitly put it there. I’m going to create a directory in HDFS called input and then put the file in there:

/usr/local/hadoop/bin/hadoop fs -mkdir /input
/usr/local/hadoop/bin/hadoop fs -put ~/count_of_monte_cristo.txt /input

Has it worked?

Run this command:

 /usr/local/hadoop/bin/hadoop fs -ls /input | grep count_of_monte_cristo | awk -F '/' '{print $3}' | cut -d '.' -f1 

If it returns a warning followed by ‘count_of_monte_cristo’ then you’re in the money. If you don’t understand the commands above, don’t worry. But do find out about them.

Otherwise, drop me a comment and I’ll see what can be done.

The Mapper

With this bit of code we’re going to go over every line in the text file and output the word and the number of instances of that word (one, for now) – easy does it:

#!/usr/bin/python

import sys

for line in sys.stdin:
    for word in line.strip().split():
        print "%st%d" % (word, 1)

Save that file as something sensible at a sensible location – I’m going to use /home/hduser/word_mapper.py.
Also, make sure it’s executable:

chmod +x /home/hduser/word_mapper.py

Has it worked?
Run this little beaut’ of a command:

 /usr/local/hadoop/bin/hadoop fs -cat /input/count_of_monte_cristo.txt | /home/hduser/word_mapper.py 

If you’ve gone maverick and used a different filename or file location then that’s fine – just substitute that in where I’ve used

/home/hduser/word_mapper.py

. If you’ve gone maverick but don’t really know what you’re doing and don’t know what I’ve just said, that’s basically on you. Keep trooping on, you’ll get there.

Either way, don’t stop until that code outputs a stream of words followed by the number 1. Don’t worry – you can stop it by pressing Ctrl and C together.

The Reducer

We’ve got ourselves a nice stream of words. The Hadoop streaming jar will take care of the sorting for us (though we can override the default behaviour should we choose) so we just need to decide what to do with that stream of words. I’m going to propose this:

#!/usr/bin/python

import sys

current_word = None
current_count = 1

for line in sys.stdin:
    word, count = line.strip().split('t')
    if current_word:
        if word == current_word:
            current_count += int(count)
        else:
            print "%st%d" % (current_word, current_count)
            current_count = 1

    current_word = word

if current_count > 1:
    print "%st%d" % (current_word, current_count)

Follow the code through and try to think of the different cases it’s trying to catch. The first and last lines are tricky but play around with it – what happens if I just feed a file containing one word? What about a file with no duplicate words? Think about all the different cases and hopefully – the above code handles them all as you’d expect. If not, please let me know. That’d be real embarrassing.

Has it worked?

Make sure that file is executable:

 chmod +x /home/hduser/word_reducer.py 

Run this:

 /usr/local/hadoop/bin/hadoop fs -cat /input/count_of_monte_cristo.txt | /home/hduser/word_mapper.py | head -n 100 | sort | /home/hduser/word_reducer.py 

If everything’s gone to plan you should see a bunch of lines and associated counts – some of them should be non-one.

Super.

Run the Mapreduce

This is what you’ve been waiting for. Well – it’s what I’ve been waiting for at least. Run this command and you’ll basically be a Hadoop hero:

 cd /usr/local/hadoop
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar -files /home/hduser/word_mapper.py,/home/hduser/word_reducer.py -mapper /home/hduser/word_mapper.py -reducer /home/hduser/word_reducer.py -input /input/count_of_monte_cristo.txt -output /output

And off it goes – enjoy watching your mapreduce race through at what I’m sure is a barely tolerable crawl.

Has it worked?

Run this beauty:

 /usr/local/hadoop/bin/hadoop fs -cat /output/part-00000 

If you see a stream of likely looking results – you’re golden. If you want to get the file out of HDFS for inspection run something like this:

 /usr/local/hadoop/bin/hadoop fs -get /output/part-00000 /home/hduser/monte_cristo_counted.txt
less /home/hduser/monte_cristo_counted.txt 

Hope that’s worked well for you – it’s not the most glamorous of Hadoop jobs but it’s a good stepping stone. In a post coming to you soon I should be able to show you how to get Eclipse set up to run Hadoop jobs and give you an example or two in Java.

(Pseudo) Distributed Wishes

Installing Hadoop 2.4 on Ubuntu 14.04

Hey all,

Another of my ‘getting my new operating system set up with all the bits of kit I use’ – this time we’ll be on Hadoop (and HDFS). There’s a very strong chance that this post will end up a lot like Sean’s post – Hadoop from spare-change. If there are any differences it’ll be for these reasons three:
1.) He was using Ubuntu Server 13.04 not Ubuntu Desktop 14.04
2.) He was using Hadoop 2.2 not Hadoop 2.4
3.) He was setting up a whole bunch of nodes – I’m stuck with this oft-abused laptop

Anywho – on with the show.

Step 1:

Download Hadoop from Apache: I’ll be using this mirror but I trust that if you’re not in England, you can likely find a more suitable one:
http://mirror.ox.ac.uk/sites/rsync.apache.org/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz

If you’re trying to stick to the terminal/don’t have a GUI then go with this:

wget http://mirror.ox.ac.uk/sites/rsync.apache.org/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz

Find your way to wherever you downloaded the tar.gz file and untar it using the following command:

tar -xzf hadoop-2.4.0.tar.gz

Sorry if I’m teaching you to suck eggs – everybody has to start somewhere right?

Has it worked up till here?

Run the following command in the same directory you ran the above tar command:

ls | grep hadoop | grep -v *.gz

If there’s at least one line returned (ideally hadoop-2.4.0) then you’re good up till here.

Step 2:

Let’s move everything into a more appropriate directory:

sudo mv hadoop-2.4.0/ /usr/local
cd /usr/local
sudo ln -s hadoop-2.4.0/ hadoop

We create that link to allow us to write scripts/programs that interact with Hadoop that won’t need changing if we upgrade our Hadoop version. All we’ll do is install the new version and point the Hadoop folder to the new version instead. Ace.

Has it worked up to here?

Run this command anywhere:

whereis hadoop

If the output is:
hadoop: /usr/local/hadoop
you may proceed.

Step 3:

Righty, now we’ll be setting up a new user and permissions and all that guff. I’ll steal directly from Michael Noll’s tutorial here and go with:

sudo addgroup hadoop
sudo adduser --ingroup hadoop hduser
sudo adduser hduser sudo
sudo chown -R hduser:hadoop /usr/local/hadoop/

Has it worked up to here?

Type:

ls -l /home/ | grep hadoop

If you see a line then you’re in the money.

Step 4:

SSH is a biggy – possibly not so much for the single node tutorial but when we were setting up our first cluster, SSH problems probably accounted for about 90% of all head-scratching with the remaining 10% being nits.


su - hduser
sudo apt-get install ssh
ssh-keygen -t rsa -P ""
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

So we switch to our newly created user, generate an SSH key and get it added to our authorized keys. Unfortunately, Hadoop and ipv6 don’t play nice so we’ll have to disable it – to do this you’ll need to open up /etc/sysctl.conf and add the following lines to the end:


net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1

Fair warning – you’ll need sudo privileges to modify the file so might want to open up your file editor like this:

sudo apt-get install gksu
gksu gedit /etc/sysctl.conf

If you’re set on using terminal then this’ll do it:

echo "net.ipv6.conf.all.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
echo "net.ipv6.conf.default.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
echo "net.ipv6.conf.lo.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf

Rumour has it that at this point you can run
sudo service networking restart
and kapeesh – ipv6 is gone. However, Atheros and Ubuntu seem to have a strange sort of ‘not working’ thing going on and so that command doesn’t work with my wireless driver. If the restart fails, just restart the computer and you should be good.

(if you’re terminal only : sudo shutdown -r now )

Has it worked up to here?

If you’re stout of heart, attempt the following:

su - hduser
ssh localhost

If that’s worked you be greeted with a message along the lines of ‘Are you sure you want to continue connecting?’ The answer you’re looking for at this point is ‘yes’.

If it hasn’t worked at this point run the following command:
cat /proc/sys/net/ipv6/conf/all/disable_ipv6

If the value returned is 0 then you’ve still not got ipv6 disabled – have a re-read of that section and see if you’ve missed anything.

Step 5:
I’m going to assume a clean install of Ubuntu on your machine (because that’s what I’ve got) – if this isn’t the case, it’s entirely likely you’ll already have Java installed. If so, find your JAVA_HOME (lots of tutorials on this online) and use that for the upcoming instructions. I’m going to be installing Java from scratch:

sudo apt-get update
sudo apt-get install default-jdk

Given a bit of luck, you’ll now have Java on your computer (I do on mine) and you’ll be able to set your environment variables. Open up your bashrc file:

su - hduser
gksu gedit .bashrc

and add the following lines:

export HADOOP_HOME=/usr/local/hadoop
export JAVA_HOME=/usr

and follow up with this command:
source ~/.bashrc

If you’ve deviated from any of the instructions above, those lines are likely to be different. You can find what your java home should be by running the following command:
which java | sed -e 's/(.*)/bin/java/1/g'

Your Hadoop home will be wherever you put it in step 2.

Has it worked up to here?

So many different ways to test – let’s run our first Hadoop command:

/usr/local/hadoop/bin/hadoop version

If that worked with no error (and gave you your Hadoop version) then you’re laughing.

Step 6:

Configuration of Hadoop (and associated bits and bobs) – we’re going to be editing a bunch of files so pick your favourite file editor and get to work. First things first though, you’re going to want some place for HDFS to save your files. If you’ve going to be storing anything big/bought external storage for this purpose now is the time to deviate from this tutorial. Otherwise, this should do it:


su - hduser
mkdir /usr/local/hadoop/data

Now for the file editing:

(only necessary when running a multi-node cluster, but let’s do it in case we ever get more nodes to add)
1.) /usr/local/hadoop/etc/hadoop/hadoop-env.sh
Change export JAVA_HOME=${JAVA_HOME} to match the JAVA_HOME you set in your bashrc (for us JAVA_HOME=/usr).
Also, change this line:
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true
to be

export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true -Djava.library.path=$HADOOP_PREFIX/lib"

And finally, add the following line:
export HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_PREFIX}/lib/native

2.) /usr/local/hadoop/etc/hadoop/yarn-env.sh
Add the following lines:

export HADOOP_CONF_LIB_NATIVE_DIR=${HADOOP_PREFIX:-"/lib/native"}
export HADOOP_OPTS="-Djava.library.path=$HADOOP_PREFIX/lib"

3.) /usr/local/hadoop/etc/hadoop/core-site.xml
Change the whole file so it looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/usr/local/hadoop/data</value>
</property>
</configuration>

4.) /usr/local/hadoop/etc/hadoop/mapred-site.xml
Change the whole file so it looks like this:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

5.) /usr/local/hadoop/etc/hadoop/hdfs-site.xml
Change the whole file so it looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
</configuration>

6.) /usr/local/hadoop/etc/hadoop/yarn-site.xml
Change the whole file so it looks like this:

<?xml version="1.0"?>
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>localhost:8025</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>localhost:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>localhost:8050</value>
    </property>
</configuration>

Annnd we’re done 🙂 Sorry about that – if I could guarantee that you’d be using the same file paths and OS as me then I’d let you wget those files from a Github somewhere but alas, I think that’s likely to cause more headaches than it solves. Don’t worry, we’re nearly there now 🙂

Has it worked up to here?

Run the following command:

/usr/local/hadoop/bin/hadoop namenode -format

If that works, you’re 20% of the way there.

Then, run:

/usr/local/hadoop/sbin/start-dfs.sh

If that seems to work without throwing up a bunch of errors:

/usr/local/hadoop/sbin/start-yarn.sh

If that’s worked, you can safely say you’ve got Hadoop running on your computer 🙂 Get it on the LinkedIn as a strength as soon as possible 😉

Conclusion
Now you’ve got Hadoop up and running on your computer, what can you do? Well, unfortunately with that single node and single hard disk, not much you couldn’t have done without it. However, if you’re just getting started with Linux and Hadoop you’ll have hopefully learnt a bit on the way to setting up your cluster.

© 2017 DogDogFish

Theme by Anders NorenUp ↑