Data Science, amongst other things.


Building a Recommendation Service on AWS with Mahout

Hi all,

Those (two) of my regular readers will know I frequently curse my fool-of-a-took laptop. With this in mind I’ve been meaning to write a tutorial on working with Amazon’s web services for when you don’t have the hardware you need. This isn’t that post but I will assume you’ve already got a cluster to hand with all the bits and bobs you need installed.

If you don’t, don’t worry about it (for now) – I’ll do a post on how to get one of those set up at a later date. For now I’d like to talk about collaborative filtering. This is one of the most prevalent techniques for generating recommendations and with good reason – it’s pretty good. It’s quite a common sense method:

I know that person A likes items 1, 2 and 3 but not item 4.
I know that person B likes item 4 and 3 but not 1 or 2.
I know that person C likes item 1 but not 2, 3 or 4.
I know that person D doesn’t like item 4 – which of the items 1, 2 or 3 should I recommend them?

I’m sure most people reading this will say we should recommend them item 1, then probably item 2 then 3? Certainly item 1 with highest preference. Now if you deconstruct your thought process in coming up with that recommendation I’d wager (not a lot) that it’d be akin to the collaborative filtering algorithm.

Based on the behaviour of people A, B and C we can start to get an idea about which items are rated favourably by the same people versus those that aren’t. User-based collaborative filtering is about finding similar users to person D and looking at what they liked. Item-based collaborative filtering is about finding similar items to the ones person D has rated positively and recommending those.
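
To make the user-based idea concrete, here's a toy sketch of the four-person example above. It is not what Mahout does under the hood: the +1/-1 encoding of likes and dislikes, the agreement-counting similarity and the function names are all my own assumptions for illustration.

```python
# Toy user-based collaborative filtering on the four-person example.
# Assumed encoding: +1 = likes, -1 = dislikes; unrated items are simply absent.
ratings = {
    "A": {1: 1, 2: 1, 3: 1, 4: -1},
    "B": {1: -1, 2: -1, 3: 1, 4: 1},
    "C": {1: 1, 2: -1, 3: -1, 4: -1},
    "D": {4: -1},
}


def similarity(u, v):
    """Agreements minus disagreements over the items both users have rated."""
    shared = set(ratings[u]) & set(ratings[v])
    return sum(ratings[u][i] * ratings[v][i] for i in shared)


def recommend(target):
    """Score every item the target hasn't rated by similarity-weighted votes."""
    scores = {}
    for other in ratings:
        if other == target:
            continue
        weight = similarity(target, other)
        for item, rating in ratings[other].items():
            if item not in ratings[target]:
                scores[item] = scores.get(item, 0) + weight * rating
    # Highest score first
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)


print(recommend("D"))  # [(1, 3), (2, 1), (3, -1)]
```

Person D agrees with A and C about item 4 and disagrees with B, so A and C's likes count for an item while B's likes count against it. That gives item 1, then 2, then 3, which matches the gut-feel ordering.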

There are lots of nice articles on collaborative filtering – more often than not when it comes to machine learning I end up recommending Andrew Ng material and this is no exception – check out his machine learning course on Coursera. There’s a whole lecture on collaborative filtering and it’s boss. If my explanation wasn’t enough (if you’re serious about this sort of thing, it shouldn’t be) check out his lectures on the subject and read around on the mathematical details of it all. Make sure you polish up on your linear algebra – you’ll have very large sparse matrices heading your way!

Also, please note that this whole post is just a re-imagining of this post by Amazon: Recommendations using EMR – I liked it and thought I’d give it a go myself with a different data set and technologies I was more familiar with. Now on with the show…

The Data Set

I’m going to be using a publicly available data set containing user ratings of books gathered on Amazon – dataset available here: Book Data Set

There are three files in there – one with all the details of the books, one with all the details of the users and finally one with all the user ratings. Don’t worry about downloading them yet if you’re following this as a tutorial/example – we’ll stick that bit in a script later on.

The Cluster Set-Up

Given that I’m on the free tier and I don’t use AWS at work (we’ve got a nice cluster that handles all our data just fine thank you) I’m a bit ‘over-compute’ happy at the moment and so I’m going to use a 5 node cluster (1 master node and 4 slaves). This isn’t really necessary but then nor is this as a piece of work so I’m not that bothered by practicality at this point. The benefit of using Amazon’s Elastic Mapreduce is that the nodes all come with Hadoop (& Hive/Pig/Mahout/whatever) installed and set up so you don’t have to bother with any of that.

Building a Service

I want to demonstrate how easy it is to turn this sort of thing into a recommendations service (instead of a one-off bit of analysis). Now I’m going to stress that I’m setting this up as an offline batch machine learning method and that there are a few steps I’d change if I were actually setting this up as a real, ‘production’, service.

I’m going to use a Redis database because I’ve got a reasonable amount of memory on these boxes (Redis is an in-memory key-value store), because Redis is really really quick and because I’m used to using Redis with Python. Redis will store the output of my collaborative filtering and I’ll stick a lightweight web framework (Flask) on top of it. This will allow a client to retrieve recommendations for a given user as well as delete them or overwrite them. I’ve decided to add the latter two functions because we don’t live in a perfect world.

I’ll also write a script that’ll update the books, users and ratings files and stick it on a cronjob to run every day. That way, by copying this method you’ll end up with a reasonably scalable, reasonably fast and reasonably up-to-date recommendation service. It’s not the best one out there (heck, it might be the worst) but it goes to show how simple these things are to set up given all the tools that are already written…

The Code

Firstly, we’ll create the cluster using the Elastic Mapreduce CLI:

./elastic-mapreduce --create --alive --name recommendation-service --num-instances 5 --master-instance-type m1.large --slave-instance-type m2.xlarge --ami-version 3.1 --ssh

The --ssh means that once the cluster has been created and all the necessary bits and bobs installed we ssh into the master node.

Extracting the data set

sed '1d;' BX-Book-Ratings.csv | tr ';' ',' | tr -d '"' > Book-Ratings.csv

Unfortunately for us, we’ve got a bit of work to do – the ‘out of the box’ Mahout recommendation algorithm requires user and book ids to be integers and ISBNs are not. To get around this, we use this nifty little script:


# Maps each ISBN to an integer id so Mahout can use it as an item id
isbn_dictionary = {}

counter = 0

with open('Book-Ratings.csv', 'r') as f:
    with open('New-Book-Ratings.csv', 'w') as g:
        with open('Book-Mappings.csv', 'w') as h:
            for line in f:
                user, book, rating = line.strip().split(',')
                try:
                    # Seen this ISBN before - reuse its id
                    index = isbn_dictionary[book]
                except KeyError:
                    # New ISBN - give it the next id and record the mapping
                    isbn_dictionary[book] = counter
                    index = counter
                    h.write("%s,%d\n" % (book, counter))
                    counter += 1
                g.write("%d,%d,%d\n" % (int(user), index, int(rating)))

Now we’ve got data in the format our Mahout recommender is expecting let’s put it in HDFS and run our algorithm

hadoop fs -put New-Book-Ratings.csv /New-Book-Ratings.csv
mahout recommenditembased --input /New-Book-Ratings.csv --output recommendations --numRecommendations 10 --similarityClassname SIMILARITY_COSINE

Now we play the waiting game…
Actually, while that’s running, open up another SSH session to that terminal and run these commands

sudo easy_install redis flask
tar xzf redis-2.8.13.tar.gz
cd redis-2.8.13
./src/redis-server &

Now Redis is installed and running and we’ve got the necessary Python libraries we can create the following server…

import json
import os

import redis

# Rebuild the integer id -> ISBN lookup written out by the mapping script
isbn_dictionary = {}

with open('Book-Mappings.csv', 'r') as f:
    for line in f:
        isbn, counter = line.strip().split(',')
        isbn_dictionary[int(counter)] = isbn

r = redis.StrictRedis(host='localhost', port=6379, db=0)

# Stream the Mahout output straight out of HDFS
p = os.popen('hadoop fs -cat recommendations/part*')

for recommendation in p:
    # Each line looks like: user<TAB>[item:score,item:score,...]
    user, recommendations = recommendation.strip().split('\t')
    recommendations = [entry.split(':')[0] for entry in recommendations.replace('[', '').replace(']', '').split(',')]
    recommendations = [isbn_dictionary[int(entry)] for entry in recommendations]
    # Redis values need to be strings, so store the list as JSON
    r.set(int(user), json.dumps(recommendations))
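
The loader above leans on each line of the Mahout output looking like a user id, a tab, and then a bracketed list of item:score pairs. If you want to see that parsing on its own, here's a small sketch. The function name and the sample values are made up for illustration, and unlike the loader it keeps the scores.

```python
def parse_recommendation_line(line):
    """Turn 'user<TAB>[item:score,item:score]' into (user_id, [(item_id, score), ...])."""
    user, raw = line.rstrip("\n").split("\t")
    raw = raw.strip("[]")
    pairs = []
    if raw:  # an empty list ('[]') yields no pairs
        for entry in raw.split(","):
            item, score = entry.split(":")
            pairs.append((int(item), float(score)))
    return int(user), pairs


# Made-up sample line, purely for illustration
sample = "277965\t[1042:4.5,17:4.0,963:3.5]"
print(parse_recommendation_line(sample))
```

Keeping the scores around is handy if you ever want to filter out weak recommendations before they go into Redis.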

Now we’ve populated our database with all the recommendations we generated with our Mahout job. Next we’ll set something up to return these items to a client (as well as allowing modifications to be made).

import json

import redis
from flask import Flask, request

r = redis.StrictRedis(host='localhost', port=6379, db=0)

app = Flask(__name__)

@app.route('/user/<user_id>', methods = ['GET', 'POST', 'PUT', 'DELETE'])
def restful_api(user_id):
    if request.method == 'GET':
        recommendations = r.get(int(user_id))
        return recommendations if recommendations else "User %d does not exist" % int(user_id)
    elif request.method == 'POST':
        if request.headers.get('Content-Type') == 'application/json':
            # Redis values need to be strings, so serialise the JSON body
            r.set(int(user_id), json.dumps(request.json))
            return "Successfully set recommendations for user %d" % int(user_id)
        else:
            return "Require JSON post"
    elif request.method == 'DELETE':
        if r.exists(int(user_id)):
            r.delete(int(user_id))
            return "Successfully deleted recommendations for user %d" % int(user_id)
        else:
            return "User %d does not exist" % int(user_id)
    elif request.method == 'PUT':
        if request.headers.get('Content-Type') == 'application/json':
            r.set(int(user_id), json.dumps(request.json))
            return "Successfully set recommendations for user %d" % int(user_id)
        else:
            return "Require JSON put"
    else:
        return "No idea what you've done here, but don't do it again"

if __name__ == '__main__':
    # Flask's default port is 5000, which is what the curl tests expect
    app.run()

Now we’ve built our client-facing recommendations service let’s ‘deploy’ it and give it a test

python &
curl -X GET localhost:5000/user/277965
curl -X DELETE localhost:5000/user/277965
curl -X GET localhost:5000/user/277965


We’ve not done anything that tricky but in a fairly short time we’ve managed to build something that’d be at least part way useful to a small business somewhere looking for recommendations for their users. It’s not a great leap from this to a genuine production-ready system, though a few things would change – you don’t need 5 Hadoop nodes running all the time for a 20 minute Mahout job and I’m uncomfortable using Flask and local files for a serious system. As such, you’d likely put together a couple of nodes, each with their own copy of the recommendations set in memory. One of them would create a cluster, get any new data and generate the data set as often as you saw fit – communicating it to the other node. Finally, you’d add a few more actions to the server and make sure it could be called by whichever clients need it (and block everybody else).

As I say, nothing earth-shattering but a solid base on which to build a recommendations system.

I am become death.


Hadoop wordcount in Python

Hi all,

There’ll be a follow up post to this detailing how to run a mapreduce using Eclipse and Java but, as I’ve found myself in permissions hell in running that, I’ll go with the easy one first. Hadoop comes with a streaming jar that allows you to write your mappers and reducers in any language you like – just take input from stdin and output to stdout and you’re laughing. I’ll show you how to achieve this using Python.

Cluster Set-up

I’m going to assume you’ve followed a tutorial and have got Hadoop installed and working – if you haven’t, follow one (maybe even mine) and then come back here. Make sure you’ve got HDFS and Yarn running by executing the following commands:

su - hduser ## Only need this if you created a user called hduser to interface with Hadoop
cd /usr/local/hadoop ## If you followed the tutorial - otherwise, wherever your Hadoop home directory is

Let’s see about putting a text file into HDFS for us to perform a word count on – I’m going to use The Count of Monte Cristo because it’s amazing. Honestly, get it read if you haven’t. It’s really really good. Anywho, enough fandom – this little command will download the whole book and stick it into whichever directory you happen to be in when you run the command.

 cd ~
wget -O 'count_of_monte_cristo.txt'

Now we’ve got the file in our home directory (really, it was that easy, check it out if you don’t believe me – then read the book). However, that’s not in HDFS – we need to explicitly put it there. I’m going to create a directory in HDFS called input and then put the file in there:

/usr/local/hadoop/bin/hadoop fs -mkdir /input
/usr/local/hadoop/bin/hadoop fs -put ~/count_of_monte_cristo.txt /input

Has it worked?

Run this command:

 /usr/local/hadoop/bin/hadoop fs -ls /input | grep count_of_monte_cristo | awk -F '/' '{print $3}' | cut -d '.' -f1 

If it returns a warning followed by ‘count_of_monte_cristo’ then you’re in the money. If you don’t understand the commands above, don’t worry. But do find out about them.

Otherwise, drop me a comment and I’ll see what can be done.

The Mapper

With this bit of code we’re going to go over every line in the text file and output the word and the number of instances of that word (one, for now) – easy does it:


#!/usr/bin/env python
import sys

# Emit "word<TAB>1" for every word on every line of input
for line in sys.stdin:
    for word in line.strip().split():
        print("%s\t%d" % (word, 1))

Save that file as something sensible at a sensible location – I’m going to use /home/hduser/
Also, make sure it’s executable:

chmod +x /home/hduser/

Has it worked?
Run this little beaut’ of a command:

 /usr/local/hadoop/bin/hadoop fs -cat /input/count_of_monte_cristo.txt | /home/hduser/ 

If you’ve gone maverick and used a different filename or file location then that’s fine – just substitute that in where I’ve used mine. If you’ve gone maverick but don’t really know what you’re doing and don’t know what I’ve just said, that’s basically on you. Keep trooping on, you’ll get there.

Either way, don’t stop until that code outputs a stream of words followed by the number 1. Don’t worry – you can stop it by pressing Ctrl and C together.

The Reducer

We’ve got ourselves a nice stream of words. The Hadoop streaming jar will take care of the sorting for us (though we can override the default behaviour should we choose) so we just need to decide what to do with that stream of words. I’m going to propose this:


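#!/usr/bin/env python
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    word, count = line.strip().split('\t')
    if current_word is not None and word != current_word:
        # The word has changed, so the previous word's total is complete
        print("%s\t%d" % (current_word, current_count))
        current_count = 0

    current_count += int(count)
    current_word = word

# Flush the final word - the loop only prints when the word changes
if current_word is not None:
    print("%s\t%d" % (current_word, current_count))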

Follow the code through and try to think of the different cases it’s trying to catch. The first and last lines are tricky but play around with it – what happens if I just feed a file containing one word? What about a file with no duplicate words? Think about all the different cases and hopefully – the above code handles them all as you’d expect. If not, please let me know. That’d be real embarrassing.
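
If you’d rather poke at those cases without piping files around, one option is to wrap the same running-count idea in a function and throw small lists at it. This is only a sketch: reduce_counts is a name I’ve made up, and it assumes the input is already sorted by word, just as the streaming jar would hand it over.

```python
def reduce_counts(lines):
    """Sum the counts of consecutive identical words in sorted 'word<TAB>count' lines."""
    results = []
    current_word, current_count = None, 0
    for line in lines:
        word, count = line.strip().split("\t")
        if word == current_word:
            current_count += int(count)
        else:
            # Word changed: the previous word (if there was one) is finished
            if current_word is not None:
                results.append((current_word, current_count))
            current_word, current_count = word, int(count)
    # Don't forget the last word in the stream
    if current_word is not None:
        results.append((current_word, current_count))
    return results


print(reduce_counts([]))                                    # no input at all
print(reduce_counts(["dantes\t1"]))                         # a single word
print(reduce_counts(["abbe\t1", "count\t1", "dantes\t1"]))  # no duplicates
print(reduce_counts(["count\t1", "count\t1", "monte\t1"]))  # duplicates at the start
```

The empty input and the single-word input are the ones most likely to catch a reducer out, since neither ever triggers the "word has changed" branch.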

Has it worked?

Make sure that file is executable:

 chmod +x /home/hduser/ 

Run this:

 /usr/local/hadoop/bin/hadoop fs -cat /input/count_of_monte_cristo.txt | /home/hduser/ | head -n 100 | sort | /home/hduser/ 

If everything’s gone to plan you should see a bunch of lines and associated counts – some of them should be non-one.
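
That mapper, sort, reducer pipeline is the whole shape of the job, so it can be handy to see it in one place. Here’s a minimal in-memory sketch using sorted() to stand in for Hadoop’s shuffle and sort, and itertools.groupby for the reduce. The function names and sample text are mine, and nothing here touches Hadoop.

```python
from itertools import groupby


def mapper(lines):
    """Yield a (word, 1) pair for every word on every line."""
    for line in lines:
        for word in line.strip().split():
            yield word, 1


def reducer(sorted_pairs):
    """Sum the counts for each run of identical words (input must be sorted)."""
    for word, group in groupby(sorted_pairs, key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


text = ["the count of monte cristo", "the count"]
pairs = sorted(mapper(text))  # stands in for the shuffle/sort phase
print(list(reducer(pairs)))
# [('count', 2), ('cristo', 1), ('monte', 1), ('of', 1), ('the', 2)]
```

groupby only merges neighbouring keys, which is exactly why the sort step matters: feed the reducer unsorted pairs and the same word can turn up more than once.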


Run the Mapreduce

This is what you’ve been waiting for. Well – it’s what I’ve been waiting for at least. Run this command and you’ll basically be a Hadoop hero:

 cd /usr/local/hadoop
bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar -files /home/hduser/,/home/hduser/ -mapper /home/hduser/ -reducer /home/hduser/ -input /input/count_of_monte_cristo.txt -output /output

And off it goes – enjoy watching your mapreduce race through at what I’m sure is a barely tolerable crawl.

Has it worked?

Run this beauty:

 /usr/local/hadoop/bin/hadoop fs -cat /output/part-00000 

If you see a stream of likely looking results – you’re golden. If you want to get the file out of HDFS for inspection run something like this:

 /usr/local/hadoop/bin/hadoop fs -get /output/part-00000 /home/hduser/monte_cristo_counted.txt
less /home/hduser/monte_cristo_counted.txt 

Hope that’s worked well for you – it’s not the most glamorous of Hadoop jobs but it’s a good stepping stone. In a post coming to you soon I should be able to show you how to get Eclipse set up to run Hadoop jobs and give you an example or two in Java.

(Pseudo) Distributed Wishes

Installing Hadoop 2.4 on Ubuntu 14.04

Hey all,

Another of my ‘getting my new operating system set up with all the bits of kit I use’ – this time we’ll be on Hadoop (and HDFS). There’s a very strong chance that this post will end up a lot like Sean’s post – Hadoop from spare-change. If there are any differences it’ll be for these reasons three:
1.) He was using Ubuntu Server 13.04 not Ubuntu Desktop 14.04
2.) He was using Hadoop 2.2 not Hadoop 2.4
3.) He was setting up a whole bunch of nodes – I’m stuck with this oft-abused laptop

Anywho – on with the show.

Step 1:

Download Hadoop from Apache: I’ll be using this mirror but I trust that if you’re not in England, you can likely find a more suitable one:

If you’re trying to stick to the terminal/don’t have a GUI then go with this:


Find your way to wherever you downloaded the tar.gz file and untar it using the following command:

tar -xzf hadoop-2.4.0.tar.gz

Sorry if I’m teaching you to suck eggs – everybody has to start somewhere right?

Has it worked up till here?

Run the following command in the same directory you ran the above tar command:

ls | grep hadoop | grep -v '\.gz$'

If there’s at least one line returned (ideally hadoop-2.4.0) then you’re good up till here.

Step 2:

Let’s move everything into a more appropriate directory:

sudo mv hadoop-2.4.0/ /usr/local
cd /usr/local
sudo ln -s hadoop-2.4.0/ hadoop

We create that link to allow us to write scripts/programs that interact with Hadoop that won’t need changing if we upgrade our Hadoop version. All we’ll do is install the new version and point the Hadoop folder to the new version instead. Ace.

Has it worked up to here?

Run this command anywhere:

whereis hadoop

If the output is:
hadoop: /usr/local/hadoop
you may proceed.

Step 3:

Righty, now we’ll be setting up a new user and permissions and all that guff. I’ll steal directly from Michael Noll’s tutorial here and go with:

sudo addgroup hadoop
sudo adduser --ingroup hadoop hduser
sudo adduser hduser sudo
sudo chown -R hduser:hadoop /usr/local/hadoop/

Has it worked up to here?


ls -l /home/ | grep hadoop

If you see a line then you’re in the money.

Step 4:

SSH is a biggy – possibly not so much for the single node tutorial but when we were setting up our first cluster, SSH problems probably accounted for about 90% of all head-scratching with the remaining 10% being nits.

su - hduser
sudo apt-get install ssh
ssh-keygen -t rsa -P ""
cat ~/.ssh/ >> ~/.ssh/authorized_keys

So we switch to our newly created user, generate an SSH key and get it added to our authorized keys. Unfortunately, Hadoop and ipv6 don’t play nice so we’ll have to disable it – to do this you’ll need to open up /etc/sysctl.conf and add the following lines to the end:

net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1

Fair warning – you’ll need sudo privileges to modify the file so might want to open up your file editor like this:

sudo apt-get install gksu
gksu gedit /etc/sysctl.conf

If you’re set on using terminal then this’ll do it:

echo "net.ipv6.conf.all.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
echo "net.ipv6.conf.default.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
echo "net.ipv6.conf.lo.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf

Rumour has it that at this point you can run
sudo service networking restart
and kapeesh – ipv6 is gone. However, Atheros and Ubuntu seem to have a strange sort of ‘not working’ thing going on and so that command doesn’t work with my wireless driver. If the restart fails, just restart the computer and you should be good.

(if you’re terminal only : sudo shutdown -r now )

Has it worked up to here?

If you’re stout of heart, attempt the following:

su - hduser
ssh localhost

If that’s worked you’ll be greeted with a message along the lines of ‘Are you sure you want to continue connecting?’ The answer you’re looking for at this point is ‘yes’.

If it hasn’t worked at this point run the following command:
cat /proc/sys/net/ipv6/conf/all/disable_ipv6

If the value returned is 0 then you’ve still not got ipv6 disabled – have a re-read of that section and see if you’ve missed anything.
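
If you’d like to double check the file itself rather than squint at it, something like the sketch below will tell you which of the three settings are missing or not set to 1. The function name is my own invention and the sample text is made up; point it at the real contents of /etc/sysctl.conf when you use it.

```python
REQUIRED = [
    "net.ipv6.conf.all.disable_ipv6",
    "net.ipv6.conf.default.disable_ipv6",
    "net.ipv6.conf.lo.disable_ipv6",
]


def missing_ipv6_settings(sysctl_text):
    """Return the required keys that are not set to 1 in the given sysctl.conf text."""
    settings = {}
    for line in sysctl_text.splitlines():
        line = line.split("#", 1)[0].strip()  # ignore comments and blank lines
        if "=" in line:
            key, value = line.split("=", 1)
            settings[key.strip()] = value.strip()
    return [key for key in REQUIRED if settings.get(key) != "1"]


# Made-up sample: one setting present, one missing, one still 0
sample = """
# disable ipv6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 0
"""
print(missing_ipv6_settings(sample))
```

To run it against the real thing, pass in open('/etc/sysctl.conf').read(). An empty list means all three lines are in place, though you’ll still need the restart for them to take effect.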

Step 5:
I’m going to assume a clean install of Ubuntu on your machine (because that’s what I’ve got) – if this isn’t the case, it’s entirely likely you’ll already have Java installed. If so, find your JAVA_HOME (lots of tutorials on this online) and use that for the upcoming instructions. I’m going to be installing Java from scratch:

sudo apt-get update
sudo apt-get install default-jdk

Given a bit of luck, you’ll now have Java on your computer (I do on mine) and you’ll be able to set your environment variables. Open up your bashrc file:

su - hduser
gksu gedit .bashrc

and add the following lines:

export HADOOP_HOME=/usr/local/hadoop
export JAVA_HOME=/usr

and follow up with this command:
source ~/.bashrc

If you’ve deviated from any of the instructions above, those lines are likely to be different. You can find what your java home should be by running the following command:
which java | sed -e 's/\(.*\)\/bin\/java/\1/g'

Your Hadoop home will be wherever you put it in step 2.
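
All that command does is chop /bin/java off the end of wherever your java binary lives. The same idea as a Python sketch, in case sed isn’t your thing (the function name is made up, and it doesn’t follow symlinks, so what you get is only as good as the path you feed it):

```python
def java_home_from_binary(java_path):
    """Strip a trailing '/bin/java' from the path of the java binary."""
    suffix = "/bin/java"
    if java_path.endswith(suffix):
        return java_path[:-len(suffix)]
    # Not a path we recognise, so hand it back untouched
    return java_path


print(java_home_from_binary("/usr/bin/java"))  # /usr
```

Feed it the output of which java and you should get the value to use for JAVA_HOME.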

Has it worked up to here?

So many different ways to test – let’s run our first Hadoop command:

/usr/local/hadoop/bin/hadoop version

If that worked with no error (and gave you your Hadoop version) then you’re laughing.

Step 6:

Configuration of Hadoop (and associated bits and bobs) – we’re going to be editing a bunch of files so pick your favourite file editor and get to work. First things first though, you’re going to want some place for HDFS to save your files. If you’re going to be storing anything big, or have bought external storage for this purpose, now is the time to deviate from this tutorial. Otherwise, this should do it:

su - hduser
mkdir /usr/local/hadoop/data

Now for the file editing:

(only necessary when running a multi-node cluster, but let’s do it in case we ever get more nodes to add)
1.) /usr/local/hadoop/etc/hadoop/
Change export JAVA_HOME=${JAVA_HOME} to match the JAVA_HOME you set in your bashrc (for us JAVA_HOME=/usr).
Also, change this line:
to be

export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_PREFIX/lib"

And finally, add the following line:

2.) /usr/local/hadoop/etc/hadoop/
Add the following lines:

export HADOOP_OPTS="-Djava.library.path=$HADOOP_PREFIX/lib"

3.) /usr/local/hadoop/etc/hadoop/core-site.xml
Change the whole file so it looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

4.) /usr/local/hadoop/etc/hadoop/mapred-site.xml
Change the whole file so it looks like this:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


5.) /usr/local/hadoop/etc/hadoop/hdfs-site.xml
Change the whole file so it looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>


6.) /usr/local/hadoop/etc/hadoop/yarn-site.xml
Change the whole file so it looks like this:

<?xml version="1.0"?>

Annnd we’re done 🙂 Sorry about that – if I could guarantee that you’d be using the same file paths and OS as me then I’d let you wget those files from a Github somewhere but alas, I think that’s likely to cause more headaches than it solves. Don’t worry, we’re nearly there now 🙂

Has it worked up to here?

Run the following command:

/usr/local/hadoop/bin/hadoop namenode -format

If that works, you’re 20% of the way there.

Then, run:


If that seems to work without throwing up a bunch of errors:


If that’s worked, you can safely say you’ve got Hadoop running on your computer 🙂 Get it on the LinkedIn as a strength as soon as possible 😉

Now you’ve got Hadoop up and running on your computer, what can you do? Well, unfortunately with that single node and single hard disk, not much you couldn’t have done without it. However, if you’re just getting started with Linux and Hadoop you’ll have hopefully learnt a bit on the way to setting up your cluster.

Hadoop From Spare Change

A Data Scientist happened upon a load of stuff – junk, at first glance – and wondered, as was his wont, “what can I get out of this?”

Conceded: as an opening line this is less suited to a tech blog than an old-fashioned yarn. In an (arguably) funny way, this isn’t far from the truth. My answer: get some holism down your neck. Make it into a modest, non-production Hadoop cluster and enjoy a large amount of fault-tolerant storage, faster processing of large files than you’d get on a single high-spec machine, the safety of not having placed all your data-eggs in one basket, and an interesting challenge. Squeeze the final, and not inconsiderable, bit of business value out of it.

To explain, when I say “stuff”, what I mean is 6 reasonable but no longer DC-standard rack servers, and more discarded dev desktops than you can shake a duster at. News of the former reached my Data Scientist colleague and me by way of a last call before they went into the skip; I found the latter buried in the boiler room when looking for bonus cabling. As a northerner with a correspondingly traumatic upbringing, instinct won out and, being unable to see it thrown away, I requested to use the hardware.

I’m not gonna lie. They were literally dumped at my feet, “unsupported”. Fortunately, the same qualities of character that refused to see the computers go to waste saw me through the backbreaking physical labour of racking and cabling them up. Having installed Ubuntu Server 13 on each of the boxes, I had soon pinged my desktop upstairs successfully and could flee the freezing server room to administrate from upstairs. Things picked up from here, generally speaking.

The hurdle immediately ahead was the formality of installing and correctly configuring Hadoop on all of the boxes, and this, you may be glad to know, brings me to the point of this blog post. Those making their first tentative steps into the world of Hadoop may be interested to know how exactly this was achieved, and indeed, I defy anyone to point me towards a comprehensive Hadoop-from-scratch quick start which leaves you with a working version of a recent release of Hadoop. Were it not for the fact that Hadoop 2.x has significant configuration differences to Hadoop 1.x, Michael Noll’s excellently put-together page would be ideal. It’s still a superb pointer in itself and was valuable to me during my first youthful fumblings with Hadoop 18 months ago. The inclusion of important lines of bash neatly quashes the sorts of ambiguity that may arise from instructions like “move the file to HDFS” which you sometimes find.

In any case, motivated by the keenness to see cool technology adopted as easily and widely as possible, I propose in this post to briefly explain the configuration steps necessary to get me into a state of reverse cartography. (Acknowledged irony: there will probably be a time when someone reads this and it’s out of date. Apologies in advance.) Having set up a single node, it’s actually more of a hassle to backtrack over your configuration to add more nodes than to just go straight to a multi-node cluster. Here’s how to do the latter.

Setting the Scene

The Hadoop architecture can be summarised by saying that it elegantly facilitates doing two things in a distributed manner: storing files, and processing files. The two poles of the Hadoop world which respectively deal with these are known as the DFS (distributed file system) layer, and the MapReduce layer. Each layer knows about the other, but can, and indeed must, be configured, administrated and used fairly independently across a cluster. There’s an interesting history to both of these computing paradigms, and many papers written by the likes of Google and Facebook describing their inner workings. A quick YouTube search yields some equally illuminating talks. My personal favourites on the two layers are this for HDFS and this for MapReduce.

Typically a cluster of computers (nodes) will have 1 master node, which coordinates the distribution of storage and processing, and (n-1) slave nodes which do the actual stuff. The modules (daemons) in Hadoop 2.x which control all of these are as below.

      Master            Slaves
DFS   Namenode          Datanode
MR    Resourcemanager   Nodemanager

Obligatory diagram: [Figure: Hadoop Architecture]

Based on your current cluster setup, Hadoop makes a bunch of intelligent decisions about where to put files, and which machines to do certain bits of processing on, motivated by maximising redundancy and fault tolerance by clever replication choices, minimising network overhead, optimally leveraging each bit of hardware, and so on. The way that the architecture makes these decisions in such a way that you, the Hadoop developer, don’t have to worry about them, is where the real beauty and power of Hadoop lies. We’ll see later in this blog how, whilst HDFS and MapReduce are breathtakingly complex and scalable under the bonnet, leveraging their power is no more difficult than performing normal straightforward file system operations and file processing in Linux.

So. At this stage, all you have is a collection of virginal, disparate machines that can see each other on the network, but beyond that share no particular sense of togetherness. Each must undergo the same setup procedure before it’s ready to pull its weight in the cluster. In a production environment, this would be achieved by means of an automated deployment script, so that nodes could be added easily and arbitrarily, but that is both overkill and an unnecessary complication here. Good old-fashioned Bash elbow grease will see us through.

Having said that, one expedient whose virtues I will extol is a little gem of software called SuperPutty, which will send the same command from any single Windows PC to all the Linux boxes simultaneously, in so doing greatly reducing repetitiveness and cutting out chances for human error:

Using SuperPutty to send commands en-masse is only the same as doing the same thing on each box in sequence.

Connect to all the boxes and make sure you’re at the same bash prompt on all of them. SuperPutty will let you store connection authentication details to save you even more time in swiftly connecting to every machine in your cluster. (Disclaimer: if you do store passwords, anyone with Linux knowledge who finds your unattended, unlocked PC could connect to your cluster and perform wild-rogue Hadoop operations on your data. Think carefully.)

Masters and Slaves

One of your computers will be the master node, and the rest slaves. The master’s disks are the only ones that need to have an appropriate RAID configuration, since Hadoop itself handles replication in a better way in HDFS: choose JBOD for the slaves. If one of your machines stands above the rest in terms of RAM and/or processing power, choose this as the master.

Since Hadoop juggles data around amongst nodes like there’s no tomorrow, there are a few networking prerequisites to sort, to make sure it can do this unimpeded and all nodes can communicate freely with each other.


Working with IPs is a lot like teaching cats to read: it quickly becomes tedious. The file /etc/hosts enables you to specify names for IP addresses, then you can just use the names. Every node needs to know about every other node. You’ll want your hosts file on each of the boxes to look something like this so you can refer to (eg) slave 11 without having to know (or calculate!) slave 11’s IP: master slave001 slave002 slave003 slave004
... etc
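
If you’ve got more than a handful of boxes, typing those entries out by hand gets old quickly. Here’s a small sketch that generates them, on the assumption (mine, not necessarily true of your network) that the master and slaves sit on consecutive addresses. The function name is made up and the 192.0.2.x addresses come from the range reserved for documentation, so swap in your own.

```python
import ipaddress


def hosts_lines(first_ip, slave_count):
    """Yield '<ip> <name>' lines: master first, then slave001..slaveNNN on consecutive IPs."""
    base = ipaddress.ip_address(first_ip)
    yield "%s master" % base
    for n in range(1, slave_count + 1):
        # Adding an integer to an address gives the address n places along
        yield "%s slave%03d" % (base + n, n)


# Hypothetical addresses, for illustration only
print("\n".join(hosts_lines("192.0.2.10", 4)))
```

The output can be appended to /etc/hosts on every box, which is just the sort of thing SuperPutty is good for.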

It’s also a good idea to disable IPv6 on the Hadoop boxes to avoid potential confusion regarding localhost addresses… Fire every box the below commands to append the necessary lines to /etc/sysctl.conf…
sean@node:~$ echo "#disable ipv6" | sudo tee -a /etc/sysctl.conf
sean@node:~$ echo "net.ipv6.conf.all.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
sean@node:~$ echo "net.ipv6.conf.default.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
sean@node:~$ echo "net.ipv6.conf.lo.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf

The machines need to be rebooted for the changes to come into effect…
sean@node:~$ sudo shutdown -r now

Once they come back up, run the following to check whether IPv6 has indeed been disabled. A value of 1 would indicate that all is well.
sean@node:~$ cat /proc/sys/net/ipv6/conf/all/disable_ipv6
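
If rebooting every box is a pain, sudo sysctl -p re-reads /etc/sysctl.conf on the spot. The check itself can be wrapped up so it gives a yes/no answer on each machine; ipv6_status below is just an illustrative helper name, not anything that ships with Ubuntu:

```shell
# Report whether IPv6 is disabled, based on the kernel's flag file.
# The optional argument lets you point it at another file for a rehearsal.
ipv6_status() {
    flag_file=${1:-/proc/sys/net/ipv6/conf/all/disable_ipv6}
    if [ "$(cat "$flag_file" 2>/dev/null)" = "1" ]; then
        echo "IPv6 disabled"
    else
        echo "IPv6 still enabled"
        return 1
    fi
}

# To apply /etc/sysctl.conf without a reboot (needs root), uncomment:
# sudo sysctl -p
ipv6_status || true
```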

Setting up the Hadoop User

For uniformity across your cluster, you’ll want to have a dedicated Hadoop user with which to connect and do work…
sean@node:~$ sudo addgroup hadoop
sean@node:~$ sudo adduser --ingroup hadoop hduser
sean@node:~$ sudo adduser hduser sudo

We’ll now switch users and work as the new Hadoop user…
sean@node:~$ su - hduser

SSH Promiscuity

Communication between nodes takes place by way of the secure shell (SSH) protocol. The idea is to enable every box to passwordlessly use an SSH connection to itself, and then copy those authentication details to every other box in the cluster, so that any given box is on familiar terms with any other and Hadoop is unshackled to work its magic!

Firstly, send every box the instruction to make a passwordless SSH key to itself for hduser:
hduser@node:~$ ssh-keygen -t rsa -P ""

Bash will prompt you for a location in which to store this newly-created key. Just press enter for default:
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hduser/.ssh/id_rsa):
Created directory '/home/hduser/.ssh'.
Your identification has been saved in /home/hduser/.ssh/id_rsa.
Your public key has been saved in /home/hduser/.ssh/
The key fingerprint is: 9b:82...................:0e:d2 hduser@ubuntu
The key's randomart image is: [weird ascii image]

Copy this new key into the local list of authorised keys:
hduser@node:~$ cat $HOME/.ssh/ >> $HOME/.ssh/authorized_keys

The final step in enabling local SSH is to connect – this will save the fingerprint of the host to the list of familiar hosts.
hduser@node:~$ ssh hduser@localhost
The authenticity of host 'localhost (::1)' can't be established.
RSA key fingerprint is d7:87...............:36:26
Are you sure you want to continue connecting? yes
Warning: permanently added 'localhost' (RSA) to the list of known hosts.

Now, to allow all the boxes to enjoy the same level of familiarity with each other, fire them all this command:
hduser@node:~$ ssh-copy-id -i $HOME/.ssh/ hduser@master

This will make every box send its SSH key to the master node. Unfortunately, you have to repeat this to tell every box to send its key to every node…
hduser@node:~$ ssh-copy-id -i $HOME/.ssh/ hduser@slave001
hduser@node:~$ ssh-copy-id -i $HOME/.ssh/ hduser@slave002
hduser@node:~$ ssh-copy-id -i $HOME/.ssh/ hduser@slave003
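
A loop saves typing one line per node. This is only a sketch: it assumes the public key is the default id_rsa.pub that ssh-keygen writes next to id_rsa, and that your node names are the ones in NODES. It prints the commands rather than running them until you set DRY_RUN=0; copy_key_everywhere is an illustrative name.

```shell
NODES=${NODES:-"master slave001 slave002 slave003"}
PUBKEY=${PUBKEY:-$HOME/.ssh/id_rsa.pub}   # assumption: ssh-keygen's default public key file
DRY_RUN=${DRY_RUN:-1}                     # 1 = only print what would be run

copy_key_everywhere() {
    for node in $NODES; do
        if [ "$DRY_RUN" = "1" ]; then
            echo "ssh-copy-id -i $PUBKEY hduser@$node"
        else
            ssh-copy-id -i "$PUBKEY" "hduser@$node"
        fi
    done
}

copy_key_everywhere
```

You will still be asked for hduser’s password once per target node, since that is the whole point of copying the key over.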

Finally, and this is also a bit tedious, via SuperPutty make every box SSH to each box in turn and check that all’s well. Ie, send them all:
hduser@node:~$ ssh master

…check that they all have…
hduser@node:~$ ssh slave001

… check that they all have… etc.

This is a one-time thing; after any box has connected to any other one time, the link between them remains.
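
The round of visits can also be looped. A rough sketch, assuming the same node names as above; visit_nodes and the SSH override are illustrative, the latter existing only so the loop can be rehearsed without a cluster. On the first visit to each host you will still need to answer “yes” to the fingerprint question.

```shell
NODES=${NODES:-"master slave001 slave002 slave003"}
SSH=${SSH:-ssh}   # overridable so the loop can be rehearsed without a cluster

# Connect to every node in turn, run a no-op remotely, and report the result.
visit_nodes() {
    failed=0
    for node in $NODES; do
        if $SSH "hduser@$node" true; then
            echo "$node: OK"
        else
            echo "$node: FAILED"
            failed=1
        fi
    done
    return "$failed"
}
```

Calling visit_nodes on every box should end with a column of OKs; any FAILED line tells you which pairing still needs attention.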


The next prerequisite to sort is a Java environment, as the Hadoop core is written in Java (although you can harness the power of MapReduce in any language you please, as we shall see). If you’re fortunate, your machines will have internet access, in which case fire the following command to them all using SuperPutty:
hduser@node:~$ sudo apt-get install openjdk-6-jre
If like mine, however, your machines were considered ticking chemical time bombs by infrastructure and hence weren’t granted internet access, what you’ll want to do is download a JDK to a computer that does have internet access and can also see your Hadoop boxes on the network, and fire the files over from there. So on your internet-connected box:
32 bit version:
hduser@node:~$ wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24="

64 bit version:
hduser@node:~$ wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24="

Now then! Each of your Hadoop nodes wants to connect to this box and pull over the Java files. Find its IP by typing ifconfig, and then fire this command to all of your Hadoop nodes:
hduser@node:~$ scp user@internetbox:/locationoffile/rightarchitecturefile.bin $HOME

Be careful to get the edition matching the machine, be it 32bit or 64bit.

Now execute the following on the Hadoop machines to install Java…

32 bit machines:
hduser@node:~$ chmod u+x jre-6u34-linux-i586.bin
hduser@node:~$ ./jre-6u34-linux-i586.bin
hduser@node:~$ sudo mkdir -p /usr/lib/jvm
hduser@node:~$ sudo mv jre1.6.0_34 /usr/lib/jvm/
hduser@node:~$ sudo update-alternatives --install "/usr/bin/java" "java" "/usr/lib/jvm/jre1.6.0_34/bin/java" 1
hduser@node:~$ sudo update-alternatives --install "/usr/lib/mozilla/plugins/" "" "/usr/lib/jvm/jre1.6.0_34/lib/i386/" 1
hduser@node:~$ sudo update-alternatives --install "/usr/bin/javaws" "javaws" "/usr/lib/jvm/jre1.6.0_34/bin/javaws" 1
hduser@node:~$ sudo update-alternatives --config java
hduser@node:~$ sudo update-alternatives --config javac
hduser@node:~$ export JAVA_HOME=/usr/lib/jvm/jre1.6.0_34/

64 bit machines:
hduser@node:~$ chmod u+x jdk-6u45-linux-x64.bin
hduser@node:~$ ./jdk-6u45-linux-x64.bin
hduser@node:~$ sudo mv jdk1.6.0_45 /opt
hduser@node:~$ sudo update-alternatives --install "/usr/bin/java" "java" "/opt/jdk1.6.0_45/bin/java" 1
hduser@node:~$ sudo update-alternatives --install "/usr/bin/javac" "javac" "/opt/jdk1.6.0_45/bin/javac" 1
hduser@node:~$ sudo update-alternatives --install "/usr/lib/mozilla/plugins/" "" "/opt/jdk1.6.0_45/jre/lib/amd64/" 1
hduser@node:~$ sudo update-alternatives --install "/usr/bin/javaws" "javaws" "/opt/jdk1.6.0_45/bin/javaws" 1
hduser@node:~$ sudo update-alternatives --config java
hduser@node:~$ sudo update-alternatives --config javac
hduser@node:~$ export JAVA_HOME=/opt/jdk1.6.0_45/

Finally, test by firing all machines
hduser@node:~$ java -version

You should see something like this:
hduser@node:~$ java -version
java version "1.6.0_45"
Java(TM) SE Runtime Environment (build 1.6.0_45-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.45-b01, mixed mode)

Installing Hadoop

Download Hadoop 2.2.0 into the directory /usr/local from the best possible source:
hduser@node:~$ cd /usr/local
hduser@node:~$ wget

If your boxes don’t have internet connectivity, use the same workaround we used above to circuitously get Java.

Unzip, tidy up and make appropriate ownership changes:
hduser@node:~$ sudo tar xzf hadoop-2.2.0.tar.gz
hduser@node:~$ sudo mv hadoop-2.2.0 hadoop
hduser@node:~$ sudo chown -R hduser:hadoop hadoop

Finally, append the appropriate environment variable settings and aliases to the bash configuration file:
hduser@node:~$ echo "" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "export HADOOP_HOME=/usr/local/hadoop" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "" | sudo tee -a $HOME/.bashrc

#32 bit version:
hduser@node:~$ echo "export JAVA_HOME=/usr/lib/jvm/jre1.6.0_34" | sudo tee -a $HOME/.bashrc

#64 bit version:
hduser@node:~$ echo "export JAVA_HOME=/opt/jdk1.6.0_45" | sudo tee -a $HOME/.bashrc

hduser@node:~$ echo "" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "unalias fs &> /dev/null" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo 'alias fs="hadoop fs"' | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "unalias hls &> /dev/null" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo 'alias hls="fs -ls"' | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo 'export PATH=$PATH:$HADOOP_HOME/bin' | sudo tee -a $HOME/.bashrc

There are a few changes that must be made to the configuration files in /usr/local/hadoop/etc/hadoop which inform the HDFS and MapReduce layers. Editing these on every machine at once via SuperPutty requires skill, especially when, having made the changes, you realise that you can’t send an “escape” character to every machine at once. There’s a solution involving mapping other, sendable, characters to the escape key, but that’s “out of scope” here 😉 Here’s what the files should look like.


It needs to look like this on all machines, master and slave alike:

[code language=”xml”]
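
As a rough sketch of what such a file might contain, assuming the file in question is core-site.xml: the NameNode port 54310 is an assumption (pick your own), and pointing hadoop.tmp.dir at /app/hadoop/tmp assumes that the local directory created further down is meant for this purpose. Writing the file with a heredoc sidesteps the editor altogether, which helps when you are typing into every machine at once. By default it lands in a scratch directory; set CONF_DIR=/usr/local/hadoop/etc/hadoop for the real thing.

```shell
CONF_DIR=${CONF_DIR:-$(mktemp -d)}   # scratch directory unless you say otherwise

cat > "$CONF_DIR/core-site.xml" <<'EOF'
<?xml version="1.0"?>
<configuration>
  <!-- Where every node finds the NameNode. The port is an assumption. -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://master:54310</value>
  </property>
  <!-- Base directory for Hadoop's local working files (assumed location). -->
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/app/hadoop/tmp</value>
  </property>
</configuration>
EOF
echo "wrote $CONF_DIR/core-site.xml"
```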

There’s only one change that needs to be made to this mofo; locate the line which specifies JAVA_HOME (helpfully commented with “the Java implementation to use”). Assuming a Java setup like that described above, this should read

32 bit machines:

export JAVA_HOME=/usr/lib/jvm/jre1.6.0_34/

64 bit machines:
export JAVA_HOME=/opt/jdk1.6.0_45/


This specifies the replication level of file blocks. Note that your physical storage size will be divided by this number to give the storage you’ll have in HDFS.

[code language=”xml”]
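
A minimal sketch, assuming the file in question is hdfs-site.xml and a replication level of 3 (the figure that shows up in the file listings later in this post). dfs.replication is the stock Hadoop property name; CONF_DIR defaults to a scratch directory so nothing real gets overwritten by accident.

```shell
CONF_DIR=${CONF_DIR:-$(mktemp -d)}   # real location: /usr/local/hadoop/etc/hadoop

cat > "$CONF_DIR/hdfs-site.xml" <<'EOF'
<?xml version="1.0"?>
<configuration>
  <!-- Number of copies HDFS keeps of every block. 3 is an assumption. -->
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
</configuration>
EOF
echo "wrote $CONF_DIR/hdfs-site.xml"
```

With a replication level of 3, for instance, 12 TB of raw disk across the cluster leaves you roughly 4 TB of HDFS space.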

Additionally, it’s necessary to create a local directory on each box for Hadoop to use:
hduser@node:~$ sudo mkdir -p /app/hadoop/tmp
hduser@node:~$ sudo chown hduser:hadoop /app/hadoop/tmp


Which MapReduce implementation to use. At the moment we’re on YARN (“Yet Another Resource Negotiator”…………).

[code language=”xml”]
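
A minimal sketch, assuming the file in question is mapred-site.xml. mapreduce.framework.name is the stock property that selects the MapReduce runtime, and yarn is the value that matches the setup described here. As before, CONF_DIR defaults to a scratch directory.

```shell
CONF_DIR=${CONF_DIR:-$(mktemp -d)}   # real location: /usr/local/hadoop/etc/hadoop

cat > "$CONF_DIR/mapred-site.xml" <<'EOF'
<?xml version="1.0"?>
<configuration>
  <!-- Run MapReduce jobs on YARN rather than locally. -->
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
EOF
echo "wrote $CONF_DIR/mapred-site.xml"
```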


Controls the actual MapReduce configuration. Without further ado, this is what you want:

[code language=”xml”]
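
A rough sketch, assuming the file in question is yarn-site.xml and that the ResourceManager lives on the box called master. The two properties shown are the ones I would expect a small Hadoop 2.2.0 cluster to need; treat the selection as an assumption rather than the definitive list.

```shell
CONF_DIR=${CONF_DIR:-$(mktemp -d)}   # real location: /usr/local/hadoop/etc/hadoop

cat > "$CONF_DIR/yarn-site.xml" <<'EOF'
<?xml version="1.0"?>
<configuration>
  <!-- Tell every NodeManager where the ResourceManager runs (assumed: master). -->
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>master</value>
  </property>
  <!-- Enable the shuffle service that MapReduce jobs rely on. -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
</configuration>
EOF
echo "wrote $CONF_DIR/yarn-site.xml"
```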


In short, the master needs to consider itself and every other node a slave. Each slave needs to consider itself, and itself only, a slave. The entirety of your slaves file ought to look like this:


Slave xyz:

Formatting the Filesystem

Much like manually deleting your data, formatting an HDFS filesystem containing data will delete any data you might have in it, so don’t do that if you don’t want to delete your data. Warnings notwithstanding, execute the following on the master node to format the HDFS namespace:
hduser@master:~$ cd /usr/local/hadoop
hduser@master:~$ bin/hadoop namenode -format

Bringing up the Cluster

This is the moment that the band strikes up. If you’re not already there, switch to the Hadoop directory…
hduser@master:~$ cd /usr/local/hadoop

Fire this shizz to start the DFS layer:
hduser@master:/usr/local/hadoop$ sbin/

You should see this kind of thing:
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
starting namenodes on [master]
master: starting namenode, logging to /usr/local/hadoop/logs/hadoop-hduser-namenode-master.out
slave001: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hduser-datanode-slave001.out
slave002: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hduser-datanode-slave002.out
slave003: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hduser-datanode-slave003.out

Now start the MapReduce layer:
hduser@master:/usr/local/hadoop$ sbin/

Expect to be greeted by something like this:
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn-hduser-resourcemanager-master.out
slave001: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hduser-nodemanager-slave001.out
slave002: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hduser-nodemanager-slave002.out
slave003: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hduser-nodemanager-slave003.out

Also start the job history server…
hduser@master:/usr/local/hadoop$ sbin/ start historyserver
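
The three start-up steps can be bundled into one function. This sketch assumes the scripts involved are the stock ones shipped in Hadoop 2.x’s sbin directory (start-dfs.sh, start-yarn.sh and mr-jobhistory-daemon.sh); start_cluster is an illustrative name. It refuses to do anything if one of them is missing, and stops at the first step that fails.

```shell
HADOOP_HOME=${HADOOP_HOME:-/usr/local/hadoop}

start_cluster() {
    # Check everything is present before starting anything.
    for script in start-dfs.sh start-yarn.sh mr-jobhistory-daemon.sh; do
        if [ ! -x "$HADOOP_HOME/sbin/$script" ]; then
            echo "missing $HADOOP_HOME/sbin/$script" >&2
            return 1
        fi
    done
    # DFS layer first, then YARN, then the job history server.
    "$HADOOP_HOME/sbin/start-dfs.sh" &&
        "$HADOOP_HOME/sbin/start-yarn.sh" &&
        "$HADOOP_HOME/sbin/mr-jobhistory-daemon.sh" start historyserver
}
```

Run start_cluster on the master only; the scripts themselves take care of reaching out to the slaves over SSH.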

Surveying One’s Empire

By this stage your Hadoop cluster is humming like a dynamo. There are several web interfaces which provide a tangible window into the specification of the cluster as a whole…

For the DFS layer, have a look at http://master:50070.

[Image: DFS Interface]

And for a breakdown of the exact condition of each node in your DFS layer,

[Image: DFS Interface 2]

And for the MapReduce layer, look at http://master:8088.

[Image: YARN Interface]

The First Distributed MapReduce

MapReduce is nothing more than a certain way to phrase a script to process a file, which is friendly to distributed computing. There’s a mapper, and a reducer. The “mapper” must be able to process any arbitrary fragment of the file (eg, count the number of occurrences of something within that fragment), independently and obliviously of the contents of the rest of the file. This is why it’s so scalable. The “reducer” aggregates the outputs of the mappers to give the final result (eg, sum up the occurrences of something reported by each of the mappers to give the total number of occurrences). Again, the way that you only have to write the mapper and reducer, and Hadoop handles the rest (deploying a copy of the mapper to every worker node, “shuffling” the mapper outputs for the reducer, re-allocating failed maps etc), is why Hadoop is well good. Indeed, a well-maintained cluster is much like American dance/rap duo LMFAO: every day it’s shuffling.
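
To make the division of labour concrete before involving Hadoop at all, here is a local imitation built from ordinary Unix pipes. It is only a sketch: mapper and reducer are illustrative function names, sort stands in for the shuffle, and nothing here is distributed.

```shell
# Mapper: emit "word<TAB>1" for every whitespace-separated word on stdin.
# It never needs to see more than the fragment it is handed.
mapper() {
    tr -s '[:space:]' '\n' | awk 'NF { print $0 "\t1" }'
}

# Reducer: add up the counts for each word, then print the totals in order.
reducer() {
    awk -F '\t' '{ count[$1] += $2 } END { for (w in count) print w "\t" count[w] }' |
        LC_ALL=C sort
}

# The sort in the middle plays the part of Hadoop's shuffle.
printf 'Example text file\nContains example text\n' | mapper | LC_ALL=C sort | reducer
```

For those two lines of input it counts “text” twice and every other word once. On a real cluster Hadoop runs many mappers side by side, each on its own fragment, and does the sorting and routing in between for you.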

Later in this blog we’ll address how to write MapReduces; for now let’s just perform one and let the cluster stretch its legs for the first time.

Make a cheeky text file (example.txt):
Example text file
Contains example text

Make a directory in HDFS, lob the new file in there, and check that it’s there:
hduser@master:/usr/local/hadoop$ bin/hadoop fs -mkdir /test
hduser@master:/usr/local/hadoop$ bin/hadoop fs -put example.txt /test
hduser@master:/usr/local/hadoop$ bin/hadoop fs -ls /test
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rw-r--r-- 3 hduser supergroup 50 2013-12-23 09:09 /test/example.txt

As you can see, the Hadoop file system commands are very similar to the normal Linux ones. Now run the example MapReduce:
hduser@master:/usr/local/hadoop$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar wordcount /test /testout

Hadoop will immediately inform you that a load of things are now deprecated – ignore these warnings, it seems that deprecation is the final stage in creating new Hadoop modules – and then more interestingly keep you posted on the progress of the job…
INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1387739551023_0001
INFO impl.YarnClientImpl: Submitted application application_1387739551023_0001 to ResourceManager at master/
INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1387739551023_0001/
INFO mapreduce.Job: Running job: job_1387739551023_0001
INFO mapreduce.Job: Job job_1387739551023_0001 running in uber mode : false
INFO mapreduce.Job: map 0% reduce 0%
INFO mapreduce.Job: map 100% reduce 0%
INFO mapreduce.Job: map 100% reduce 100%
INFO mapreduce.Job: Job job_1387739551023_0001 completed successfully
INFO mapreduce.Job: Counters: 43
File System Counters
FILE: Number of bytes read=173
FILE: Number of bytes written=158211
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=202
HDFS: Number of bytes written=123
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=7683
Total time spent by all reduces in occupied slots (ms)=11281
Map-Reduce Framework
Map input records=2
Map output records=11
Map output bytes=145
Map output materialized bytes=173
Input split bytes=101
Combine input records=11
Combine output records=11
Reduce input groups=11
Reduce shuffle bytes=173
Reduce input records=11
Reduce output records=11
Spilled Records=22
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=127
CPU time spent (ms)=2570
Physical memory (bytes) snapshot=291241984
Virtual memory (bytes) snapshot=1030144000
Total committed heap usage (bytes)=181075968
Shuffle Errors
File Input Format Counters
Bytes Read=101
File Output Format Counters
Bytes Written=123

GLORY. We can examine the output thus:
hduser@master:/usr/local/hadoop$ bin/hadoop fs -ls /testout
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r-- 3 hduser supergroup 50 2013-12-23 09:10 /testout/_SUCCESS
-rw-r--r-- 3 hduser supergroup 50 2013-12-23 09:10 /testout/part-r-00000
hduser@master:/usr/local/hadoop$ bin/hadoop fs -cat /testout/part-r-00000
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Contains 1
Example 1
example 1
file 1
text 2

Business value, delivered. If you want to retrieve the output file from HDFS back to your local filesystem, run
hduser@master:/usr/local/hadoop$ bin/hadoop fs -get /testout
hduser@master:/usr/local/hadoop$ ls | grep testout

And there it is! Now that your Hadoop cluster is essentially a self-aware beacon of supercomputing, stay tuned for further posts on using Hadoop to do interesting/lucrative things! 🙂

© 2024 DogDogFish
