Data Science, amongst other things.

Category: Big Data

Real-Time Analytics with Elasticsearch

When you are running a website used by thousands of people, it should go without saying that very valuable data can be collected about your users and the way they interact with your site. This usually comes in the form of click-stream data – effectively logs written by your webserver on every interaction (or click) made by a user navigating your site. An e-retailer can use this data for many purposes, for example: analysing how users interact with the site and what causes them to place an order, monitoring the performance of a new advertising campaign, seeing how users respond to a new feature and detecting problems such as pages which result in a 404 error. For many of these purposes, raw data can be crunched through a batch processing system such as Hadoop, but for some it is beneficial to have real-time information and statistics about how users are interacting with the site right now.

Imagine you are an e-retailer in the process of rolling out a brand new feature, such as an improved search engine. As soon as this feature goes live, it will have an immediate impact on the user experience and the products people are interacting with. It may even change the way people search and the terms they search for. Having real-time statistics such as the number of page views, searches, product visits from searches and product orders from searches gives confidence that the new experience is having a positive impact, or at least not causing any serious problems. This article discusses our experience with building a data store to persist and query over click-stream data in real-time.

For the backing datastore of our analytics framework there were a huge number of possible solutions. We began by considering SQL, but as we are dealing with millions of click-stream events every day, SQL can quickly become an expensive and not-so-scalable option. Ultimately we made a choice between MongoDB and Elasticsearch. Both are open source, distributed NoSQL document stores – Elasticsearch is built on top of Apache Lucene (a high-performance JVM-based search engine) while MongoDB is written entirely in C++. Elasticsearch provides a very clean and compact RESTful API which enables you to use JSON over HTTP to specify your configuration, index documents and perform queries. We found that the query API provided by Elasticsearch offers considerably more flexibility when it comes to faceting and search. An example of this is when you want to calculate a count, total or average and see how it changes over time. For instance, if we want to count the number of searches performed on our site by hour, the date histogram facet makes this easy. The following Elasticsearch query will return a set of key-value pairs, where each key is an hourly timestamp and each value is a count of search events:

{
  "query": {
    "match": {
      "eventType": "search"
    }
  },
  "facets": {
    "siteId": {
      "date_histogram": {
        "field": "timestamp",
        "interval": "hour"
      }
    }
  }
}
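If you want to try this out from the command line, the query can be posted straight to the cluster with curl. A minimal sketch, assuming a local node on the default port 9200 and a wildcard index pattern of ‘searches-*’ (both assumptions – adjust to your own setup):

# Hedged example: run the date histogram query above against a local node
curl -s -XPOST 'http://localhost:9200/searches-*/_search?pretty' -d '{
  "query": { "match": { "eventType": "search" } },
  "facets": {
    "siteId": {
      "date_histogram": { "field": "timestamp", "interval": "hour" }
    }
  }
}'

The response should contain, under the facet name, an entries array of timestamp/count pairs – one per hour.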

Doing this kind of aggregation with MongoDB is of course possible, but we found these queries are simply quicker and easier to write with Elasticsearch. Plus, the API makes it very easy to query the store with a REST client. Of course there are advantages to using MongoDB – for instance, at the time of writing there is far more documentation available, MongoDB provides more flexibility as the document structure can be changed at any time, and MongoDB is particularly good at facilitating backup and recovery. Since we were not looking for a primary data store, these advantages didn’t outweigh Elasticsearch’s cleaner API.

One of the first decisions to make when using Elasticsearch is how to structure your indices. In our case, we wanted to capture several different types of events, each of which would have different associated attributes. Page visits, searches and basket adds, among other events, are all logged by our web servers in real-time. This data then needs to be sent to Elasticsearch and indexed. We decided to use time-based Elasticsearch indices: every day, a new index is created and all data for that day is stored within that index. The index name takes the format ‘EVENT_TYPE-YYYYMMDD’. There are a few advantages to doing this:

  • It’s easy to query over only the events and days we want. Elasticsearch supports wildcard matching on index names – so if we send a query to ‘searches-201401*’, it will be executed over all the search events which took place in January.
  • When we want to delete old data, we simply need to delete old indices. Clearing out old data can then be done easily and automatically using a daily cron job – no need to delete individual documents or give them a time-to-live, which could add significant overhead.
  • Routing and sharding can be configured on a per index basis. This makes it possible for us to configure which servers should handle which days. For example, if a server is running out of disk space, we can just configure shards for future indices to avoid that server.
  • Using Elasticsearch templates, we can configure a mapping and settings which are automatically applied to new indices. For example, any index created with a name matching ‘searches-*’ will have a specific mapping and settings applied to it.
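As an illustration of that last point, here is a hedged sketch of what such a template might look like – the field names, shard count and type name are illustrative assumptions rather than our production mapping:

# Hypothetical template, applied automatically to any new index named searches-YYYYMMDD
curl -XPUT 'http://localhost:9200/_template/searches' -d '{
  "template": "searches-*",
  "settings": { "number_of_shards": 2 },
  "mappings": {
    "search": {
      "properties": {
        "timestamp":  { "type": "date" },
        "searchTerm": { "type": "string", "index": "not_analyzed" }
      }
    }
  }
}'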

With a good idea of how the data store would be structured, the next step was to get data from our web logs into Elasticsearch, in real-time. This process involves aggregating log files, and then parsing and indexing the log events into Elasticsearch documents. We developed a Java application to serve this purpose – sending bulk updates to our Elasticsearch cluster every few seconds. The simplified data pipeline is pictured below, although we have since replaced log aggregation with Kafka, a distributed messaging framework.

The flow of data from web servers to Elasticsearch
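Rather than indexing one document per request, the indexer batches events up and sends them with Elasticsearch’s bulk API. A hand-rolled sketch of the same idea (index names, types and fields here are purely illustrative):

# Hedged example: two events sent in a single bulk request (newline-delimited action/source pairs)
curl -s -XPOST 'http://localhost:9200/_bulk' --data-binary '{ "index": { "_index": "searches-20140110", "_type": "search" } }
{ "timestamp": "2014-01-10T10:00:00", "eventType": "search", "searchTerm": "xbox" }
{ "index": { "_index": "product_visits-20140110", "_type": "product_visit" } }
{ "timestamp": "2014-01-10T10:00:05", "eventType": "productVisit", "productId": "12345" }
'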

We soon encountered some difficulties when writing queries over our data. One of the biggest difficulties comes when running queries over multiple indices. Each of our event types has different attributes, for example: search terms and search engine information appear on search events, but not on product visit events. What if we want to count the number of product visits resulting from searches for the term ‘xbox’? Well, we want to perform a ‘join’ operation, something which would be quite easy to achieve in SQL:

SELECT * FROM product_visits p INNER JOIN searches s ON (p.userId = s.userId) WHERE p.referrer_url = s.url AND s.term = 'xbox'

With Elasticsearch, things are not so simple. We can easily query several indices at once, but how can we specify that we want to find all product visit documents which have a matching search document?

Solution 1: Use parent-child relationships
Elasticsearch enables us to specify a parent-child relationship between document types. For instance, we could have two types of document: product_interaction documents (which contain event types product_visit and product_order) and cause documents (which contain event types like search). We could then configure a parent-child relationship between causes and product_interactions, such that each product_interaction is the child of a single cause. This structure enables us to use the built-in ‘has_parent’ query to find all product visits that resulted from a specific search:

POST searches-201401*/_search

{
  "query": {
    "has_parent": {
      "parent_type": "cause",
      "query": {
        "match": {
          "searchTerm": "xbox"
        }
      }
    }
  }
}
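For completeness, this only works if the child type declares its parent in the mapping, and parent and child documents live in the same index. A hedged sketch of the sort of mapping we mean, with illustrative index and type names:

# Hypothetical mapping: product_interaction documents declare cause as their parent type
curl -XPUT 'http://localhost:9200/searches-20140110/product_interaction/_mapping' -d '{
  "product_interaction": {
    "_parent": { "type": "cause" }
  }
}'

Each child document is then indexed with a parent=<id of its cause document> request parameter so that Elasticsearch can route it to the right shard.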

This solution is flexible. The two document types are completely independent of each other – if we want, we can update one without affecting the other. To aid performance, Elasticsearch ensures that children are physically stored in the same shard as their parent. However, there is still a performance cost to using the parent-child relation and we found that our query speed sometimes suffered. There is also a big limitation to this design – there is no support for more complex relationships, such as grandparent->parent->child. So, we would be unable to add further relationships (e.g. between product_orders and product_interactions) and model the full chain of events.

Solution 2: Add extra information at indexing time
Our eventual solution, which solves both of the above problems, is simply to move the logic surrounding relationships into the application layer. State corresponding to recent user activity (for example, searches in the last ten minutes) is stored in memory in our indexing application. When a product visit event is received, the in-memory state is checked and the ‘cause’ of the product visit can be found. Then, when indexing the product visit document, extra attributes are indexed within it, such as the search terms which led to the product being visited. If we want to track more events in the chain, such as product orders, the attributes that were added to the product visit document are copied into the product order document. This means that at query time we don’t need to search over multiple document types and perform a join – all the information we need is contained in a single document type. This solution gives us very fast query performance at the cost of higher storage and memory requirements, since document size is greater and some information is duplicated.
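To make that concrete, here is a hedged sketch of what an enriched product visit document might look like once the indexer has stitched the search attributes in – the field names are illustrative, not our actual schema:

# Hypothetical enriched document: the search that caused the visit is denormalised into the visit itself
curl -XPOST 'http://localhost:9200/product_visits-20140110/product_visit' -d '{
  "timestamp": "2014-01-10T10:00:05",
  "userId": "u-42",
  "productId": "12345",
  "causeType": "search",
  "causeSearchTerm": "xbox",
  "causeSearchTimestamp": "2014-01-10T10:00:00"
}'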

With this setup, we are able to comfortably query over a month’s worth of data within just a couple of seconds – and that’s with a single-node cluster. One problem we encountered is that it is possible to write incorrect or very computationally demanding queries, and no matter how complex or demanding the query is, Elasticsearch will attempt the computation. If it does not have sufficient resources to complete the query it will fail with an OutOfMemory error, and the only way to recover is to restart Elasticsearch on the failed node(s). Aside from this, our experience with Elasticsearch for near real-time analytics has been very positive.

Installing Hadoop 2.4 on Ubuntu 14.04

Hey all,

Another of my ‘getting my new operating system set up with all the bits of kit I use’ posts – this time we’ll be looking at Hadoop (and HDFS). There’s a very strong chance that this post will end up a lot like Sean’s post – Hadoop From Spare Change. If there are any differences it’ll be for these reasons three:
1.) He was using Ubuntu Server 13.04 not Ubuntu Desktop 14.04
2.) He was using Hadoop 2.2 not Hadoop 2.4
3.) He was setting up a whole bunch of nodes – I’m stuck with this oft-abused laptop

Anywho – on with the show.

Step 1:

Download Hadoop from Apache: I’ll be using this mirror but I trust that if you’re not in England, you can likely find a more suitable one:
http://mirror.ox.ac.uk/sites/rsync.apache.org/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz

If you’re trying to stick to the terminal/don’t have a GUI then go with this:

wget http://mirror.ox.ac.uk/sites/rsync.apache.org/hadoop/common/hadoop-2.4.0/hadoop-2.4.0.tar.gz

Find your way to wherever you downloaded the tar.gz file and untar it using the following command:

tar -xzf hadoop-2.4.0.tar.gz

Sorry if I’m teaching you to suck eggs – everybody has to start somewhere right?

Has it worked up till here?

Run the following command in the same directory you ran the above tar command:

ls | grep hadoop | grep -v '\.gz$'

If there’s at least one line returned (ideally hadoop-2.4.0) then you’re good up till here.

Step 2:

Let’s move everything into a more appropriate directory:

sudo mv hadoop-2.4.0/ /usr/local
cd /usr/local
sudo ln -s hadoop-2.4.0/ hadoop

We create that link so that we can write scripts and programs that interact with Hadoop without needing to change them whenever we upgrade our Hadoop version. All we’ll do is install the new version and point the hadoop link at it instead. Ace.
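For example, when a newer release comes along (2.5.0 here is a hypothetical stand-in), the upgrade would look something like this:

# Hedged sketch of a future upgrade – drop the new version alongside, then swap the symlink over
sudo mv hadoop-2.5.0/ /usr/local
cd /usr/local
sudo ln -sfn hadoop-2.5.0/ hadoop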

Has it worked up to here?

Run this command anywhere:

whereis hadoop

If the output is:
hadoop: /usr/local/hadoop
you may proceed.

Step 3:

Righty, now we’ll be setting up a new user and permissions and all that guff. I’ll steal directly from Michael Noll’s tutorial here and go with:

sudo addgroup hadoop
sudo adduser --ingroup hadoop hduser
sudo adduser hduser sudo
sudo chown -R hduser:hadoop /usr/local/hadoop/

Has it worked up to here?

Type:

ls -l /home/ | grep hadoop

If you see a line then you’re in the money.

Step 4:

SSH is a biggy – possibly not so much for the single node tutorial but when we were setting up our first cluster, SSH problems probably accounted for about 90% of all head-scratching with the remaining 10% being nits.


su - hduser
sudo apt-get install ssh
ssh-keygen -t rsa -P ""
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

So we switch to our newly created user, generate an SSH key and get it added to our authorized keys. Unfortunately, Hadoop and ipv6 don’t play nice so we’ll have to disable it – to do this you’ll need to open up /etc/sysctl.conf and add the following lines to the end:


net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1

Fair warning – you’ll need sudo privileges to modify the file so you might want to open up your file editor like this:

sudo apt-get install gksu
gksu gedit /etc/sysctl.conf

If you’re set on using terminal then this’ll do it:

echo "net.ipv6.conf.all.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
echo "net.ipv6.conf.default.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
echo "net.ipv6.conf.lo.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf

Rumour has it that at this point you can run
sudo service networking restart
and kapeesh – ipv6 is gone. However, Atheros and Ubuntu seem to have a strange sort of ‘not working’ thing going on and so that command doesn’t work with my wireless driver. If the restart fails, just restart the computer and you should be good.

(if you’re terminal only : sudo shutdown -r now )
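Alternatively, you can usually get sysctl to re-read the file without a reboot – worth a try before restarting anything:

# reload /etc/sysctl.conf so the ipv6 settings take effect immediately
sudo sysctl -p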

Has it worked up to here?

If you’re stout of heart, attempt the following:

su - hduser
ssh localhost

If that’s worked you’ll be greeted with a message along the lines of ‘Are you sure you want to continue connecting?’ The answer you’re looking for at this point is ‘yes’.

If it hasn’t worked at this point run the following command:
cat /proc/sys/net/ipv6/conf/all/disable_ipv6

If the value returned is 0 then you’ve still not got ipv6 disabled – have a re-read of that section and see if you’ve missed anything.

Step 5:
I’m going to assume a clean install of Ubuntu on your machine (because that’s what I’ve got) – if this isn’t the case, it’s entirely likely you’ll already have Java installed. If so, find your JAVA_HOME (lots of tutorials on this online) and use that for the upcoming instructions. I’m going to be installing Java from scratch:

sudo apt-get update
sudo apt-get install default-jdk

Given a bit of luck, you’ll now have Java on your computer (I do on mine) and you’ll be able to set your environment variables. Open up your bashrc file:

su - hduser
gksu gedit .bashrc

and add the following lines:

export HADOOP_HOME=/usr/local/hadoop
export JAVA_HOME=/usr

and follow up with this command:
source ~/.bashrc

If you’ve deviated from any of the instructions above, those lines are likely to be different. You can find what your Java home should be by running the following command:
which java | sed 's|/bin/java||'

Your Hadoop home will be wherever you put it in step 2.
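If /usr/bin/java turns out to be a symlink (it usually is on Ubuntu), you can chase it down to the real installation and strip the trailing /bin/java like this:

# resolve the symlink chain to the actual java binary, then drop /bin/java to get a JAVA_HOME candidate
readlink -f $(which java) | sed 's|/bin/java||'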

Has it worked up to here?

So many different ways to test – let’s run our first Hadoop command:

/usr/local/hadoop/bin/hadoop version

If that worked with no error (and gave you your Hadoop version) then you’re laughing.

Step 6:

Configuration of Hadoop (and associated bits and bobs) – we’re going to be editing a bunch of files so pick your favourite file editor and get to work. First things first though, you’re going to want some place for HDFS to save your files. If you’re going to be storing anything big, or have bought external storage for this purpose, now is the time to deviate from this tutorial. Otherwise, this should do it:


su - hduser
mkdir /usr/local/hadoop/data

Now for the file editing:

(some of these settings only really matter for a multi-node cluster, but let’s put them in anyway in case we ever get more nodes to add)
1.) /usr/local/hadoop/etc/hadoop/hadoop-env.sh
Change export JAVA_HOME=${JAVA_HOME} to match the JAVA_HOME you set in your bashrc (for us JAVA_HOME=/usr).
Also, change this line:
export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true
to be

export HADOOP_OPTS="$HADOOP_OPTS -Djava.net.preferIPv4Stack=true -Djava.library.path=$HADOOP_PREFIX/lib"

And finally, add the following line:
export HADOOP_COMMON_LIB_NATIVE_DIR=${HADOOP_PREFIX}/lib/native

2.) /usr/local/hadoop/etc/hadoop/yarn-env.sh
Add the following lines:

export HADOOP_CONF_LIB_NATIVE_DIR=${HADOOP_PREFIX:-"/lib/native"}
export HADOOP_OPTS="-Djava.library.path=$HADOOP_PREFIX/lib"

3.) /usr/local/hadoop/etc/hadoop/core-site.xml
Change the whole file so it looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:9000</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/usr/local/hadoop/data</value>
</property>
</configuration>

4.) /usr/local/hadoop/etc/hadoop/mapred-site.xml
Change the whole file so it looks like this:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

5.) /usr/local/hadoop/etc/hadoop/hdfs-site.xml
Change the whole file so it looks like this (dfs.replication is the number of copies HDFS keeps of each block – on a single-node setup like this one you could reasonably drop it to 1):

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
</configuration>

6.) /usr/local/hadoop/etc/hadoop/yarn-site.xml
Change the whole file so it looks like this:

<?xml version="1.0"?>
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>localhost:8025</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>localhost:8030</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>localhost:8050</value>
    </property>
</configuration>

Annnd we’re done 🙂 Sorry about that – if I could guarantee that you’d be using the same file paths and OS as me then I’d let you wget those files from a Github somewhere but alas, I think that’s likely to cause more headaches than it solves. Don’t worry, we’re nearly there now 🙂

Has it worked up to here?

Run the following command:

/usr/local/hadoop/bin/hadoop namenode -format

If that works, you’re 20% of the way there.

Then, run:

/usr/local/hadoop/sbin/start-dfs.sh

If that seems to work without throwing up a bunch of errors:

/usr/local/hadoop/sbin/start-yarn.sh

If that’s worked, you can safely say you’ve got Hadoop running on your computer 🙂 Get it on the LinkedIn as a strength as soon as possible 😉
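If you want one last sanity check, the JDK’s jps tool lists the running Java processes, and the Hadoop daemons should be among them:

jps
# expect to see something like (process ids will differ):
# NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager, Jps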

Conclusion
Now you’ve got Hadoop up and running on your computer, what can you do? Well, unfortunately with that single node and single hard disk, not much you couldn’t have done without it. However, if you’re just getting started with Linux and Hadoop you’ll have hopefully learnt a bit on the way to setting up your cluster.

Hadoop From Spare Change

A Data Scientist happened upon a load of stuff – junk, at first glance – and wondered, as was his wont, “what can I get out of this?”

Conceded: as an opening line this is less suited to a tech blog than an old-fashioned yarn. In an (arguably) funny way, this isn’t far from the truth. My answer: get some holism down your neck. Make it into a modest, non-production Hadoop cluster and enjoy a large amount of fault-tolerant storage, faster processing of large files than you’d get on a single high-spec machine, the safety of not having placed all your data-eggs in one basket, and an interesting challenge. Squeeze the final, and not inconsiderable, bit of business value out of it.

To explain, when I say “stuff”, what I mean is 6 reasonable but no longer DC-standard rack servers, and more discarded dev desktops than you can shake a duster at. News of the former reached my Data Scientist colleague and me by way of a last call before they went into the skip; I found the latter buried in the boiler room when looking for bonus cabling. As a northerner with a correspondingly traumatic upbringing, instinct won out and, being unable to see it thrown away, I requested to use the hardware.

I’m not gonna lie. They were literally dumped at my feet, “unsupported”. Fortunately, the same qualities of character that refused to see the computers go to waste saw me through the backbreaking physical labour of racking and cabling them up. Having installed Ubuntu Server 13 on each of the boxes, I had soon pinged my desktop upstairs successfully and could flee the freezing server room to administrate from upstairs. Things picked up from here, generally speaking.

The hurdle immediately ahead was the formality of installing and correctly configuring Hadoop on all of the boxes, and this, you may be glad to know, brings me to the point of this blog post. Those making their first tentative steps into the world of Hadoop may be interested to know how exactly this was achieved, and indeed, I defy anyone to point me towards a comprehensive Hadoop-from-scratch quick start which leaves you with a working version of a recent release of Hadoop. Were it not for the fact that Hadoop 2.x has significant configuration differences to Hadoop 1.x, Michael Noll’s excellently put-together page would be ideal. It’s still a superb pointer in itself and was valuable to me during my first youthful fumblings with Hadoop 18 months ago. The inclusion of important lines of bash neatly quashes the sorts of ambiguity that may arise from instructions like “move the file to HDFS” which you sometimes find.

In any case, motivated by the keenness to see cool technology adopted as easily and widely as possible, I propose in this post to briefly explain the configuration steps necessary to get me into a state of reverse cartography. (Acknowledged irony: there will probably be a time when someone reads this and it’s out of date. Apologies in advance.) Having set up a single node, it’s actually more of a hassle to backtrack over your configuration to add more nodes than to just go straight to a multi-node cluster. Here’s how to do the latter.

Setting the Scene

The Hadoop architecture can be summarised by saying that it elegantly facilitates doing two things in a distributed manner: storing files, and processing files. The two poles of the Hadoop world which respectively deal with these are known as the DFS (distributed file system) layer and the MapReduce layer. Each layer knows about the other, but can, and indeed must, be configured, administrated and used fairly independently across a cluster. There’s an interesting history to both of these computing paradigms, and many papers written by the likes of Google and Facebook describing their inner workings. A quick Youtube search yields some equally illuminating talks. My personal favourites on the two layers are this for HDFS and this for MapReduce.

Typically a cluster of computers (nodes) will have 1 master node, which coordinates the distribution of storage and processing, and (n-1) slave nodes which do the actual stuff. The modules (daemons) in Hadoop 2.x which control all of these are as below.

      Master           Slaves
DFS   Namenode         Datanode
MR    Resourcemanager  Nodemanager

Obligatory diagram:

Hadoop Architecture
Illuminating.

Based on your current cluster setup, Hadoop makes a bunch of intelligent decisions about where to put files and which machines to do certain bits of processing on, motivated by maximising redundancy and fault tolerance through clever replication choices, minimising network overhead, optimally leveraging each bit of hardware, and so on. The fact that the architecture makes these decisions in such a way that you, the Hadoop developer, don’t have to worry about them is where the real beauty and power of Hadoop lies. We’ll see later in this blog how, whilst HDFS and MapReduce are breathtakingly complex and scalable under the bonnet, leveraging their power is no more difficult than performing normal, straightforward file system operations and file processing in Linux.

So. At this stage, all you have is a collection of virginal, disparate machines that can see each other on the network, but beyond that share no particular sense of togetherness. Each must undergo the same setup procedure before it’s ready to pull its weight in the cluster. In a production environment, this would be achieved by means of an automated deployment script, so that nodes could be added easily and arbitrarily, but that is both overkill and an unnecessary complication here. Good old-fashioned Bash elbow grease will see us through.

Having said that, one expedient whose virtues I will extol is a little gem of software called SuperPutty, which will send the same command from any single Windows PC to all the Linux boxes simultaneously, in so doing greatly reducing repetitiveness and cutting out chances for human error:

SuperPutty
Using SuperPutty to send commands en masse is just the same as running the same command on each box in sequence.

Connect to all the boxes and make sure you’re at the same bash prompt on all of them. SuperPutty will let you store connection authentication details to save you even more time in swiftly connecting to every  machine in your cluster. (Disclaimer: if you do store passwords, anyone with Linux knowledge who finds your unattended, unlocked PC could connect to your cluster and perform wild-rogue Hadoop operations on your data. Think carefully.)

Masters and Slaves

One of your computers will be the master node, and the rest slaves. The master’s disks are the only ones that need to have an appropriate RAID configuration, since Hadoop itself handles replication in a better way in HDFS: choose JBOD for the slaves. If one of your machines stands above the rest in terms of RAM and/or processing power, choose this as the master.

Since Hadoop juggles data around amongst nodes like there’s no tomorrow, there are a few networking prerequisites to sort, to make sure it can do this unimpeded and all nodes can communicate freely with each other.

Hosts

Working with IPs is a lot like teaching cats to read: it quickly becomes tedious. The file /etc/hosts enables you to specify names for IP addresses, then you can just use the names. Every node needs to know about every other node. You’ll want your hosts file on each of the boxes to look something like this so you can refer to (eg) slave 11 without having to know (or calculate!) slave 11’s IP:
123.1.1.25 master
123.1.1.26 slave001
123.1.1.27 slave002
123.1.1.28 slave003
123.1.1.29 slave004
... etc
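A quick, hedged sanity check from any box (the hostnames are whatever you put in the file above):

sean@node:~$ ping -c 1 slave001   # should resolve to 123.1.1.26 and get a single reply back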

It’s also a good idea to disable IPv6 on the Hadoop boxes to avoid potential confusion regarding localhost addresses… Fire every box the below commands to append the necessary lines to /etc/sysctl.conf…
sean@node:~$ echo "#disable ipv6" | sudo tee -a /etc/sysctl.conf
sean@node:~$ echo "net.ipv6.conf.all.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
sean@node:~$ echo "net.ipv6.conf.default.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf
sean@node:~$ echo "net.ipv6.conf.lo.disable_ipv6 = 1" | sudo tee -a /etc/sysctl.conf

The machines need to be rebooted for the changes to come into effect…
sean@node:~$ sudo shutdown -r now

Once they come back up, run the following to check whether IPv6 has indeed been disabled. A value of 1 would indicate that all is well.
sean@node:~$ cat /proc/sys/net/ipv6/conf/all/disable_ipv6

Setting up the Hadoop User

For uniformity across your cluster, you’ll want to have a dedicated Hadoop user with which to connect and do work…
sean@node:~$ sudo addgroup hadoop
sean@node:~$ sudo adduser --ingroup hadoop hduser
sean@node:~$ sudo adduser hduser sudo

We’ll now switch users and work as the new Hadoop user…
sean@node:~$ su - hduser
hduser@node:~$

SSH Promiscuity

Communication between nodes takes place by way of the secure shell (SSH) protocol. The idea is to enable every box to passwordlessly use an SSH connection to itself, and then copy those authentication details to every other box in the cluster, so that any given box is on familiar terms with any other and Hadoop is unshackled to work its magic!

Firstly, send every box the instruction to make a passwordless SSH key to itself for hduser:
hduser@node:~$ ssh-keygen -t rsa -P ""

ssh-keygen will prompt you for a location in which to store this newly-created key. Just press enter for the default:
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hduser/.ssh/id_rsa): Created directory '/home/hduser/.ssh'.
Your identification has been saved in /home/hduser/.ssh/id_rsa.
Your public key has been saved in /home/hduser/.ssh/id_rsa.pub
The key fingerprint is: 9b:82...................:0e:d2 hduser@ubuntu
The key's randomart image is: [weird ascii image]

Copy this new key into the local list of authorised keys:
hduser@node:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

The final step in enabling local SSH is to connect – this will save the fingerprint of the host to the list of familiar hosts.
hduser@node:~$ ssh hduser@localhost
The authenticity of host 'localhost (::1)' can't be established.
RSA key fingerprint is d7:87...............:36:26
Are you sure you want to continue connecting? yes
Warning: permanently added 'localhost' (RSA) to the list of known hosts.

Now, to allow all the boxes to enjoy the same level of familiarity with each other, fire them all this command:
hduser@node:~$ ssh-copy-id -i $HOME/.ssh/id_rsa.pub hduser@master

This will make every box send its SSH key to the master node. Unfortunately, you have to repeat this to tell every box to send its key to every node…
hduser@node:~$ ssh-copy-id -i $HOME/.ssh/id_rsa.pub hduser@slave001
hduser@node:~$ ssh-copy-id -i $HOME/.ssh/id_rsa.pub hduser@slave002
hduser@node:~$ ssh-copy-id -i $HOME/.ssh/id_rsa.pub hduser@slave003
etc...
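If the tedium gets too much, a small loop fired at every box via SuperPutty does the same job – just substitute in whatever hostnames you put in /etc/hosts:

hduser@node:~$ for host in master slave001 slave002 slave003; do ssh-copy-id -i $HOME/.ssh/id_rsa.pub hduser@$host; done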

Finally, and this is also a bit tedious, via SuperPutty make every box SSH to each box in turn and check that all’s well. Ie, send them all:
hduser@node:~$ ssh master

…check that they all have…
hduser@node:~$ ssh slave001

… check that they all have… etc.

This is a one-time thing; after any box has connected to any other one time, the link between them remains.

Java

The next prerequisite to sort is a Java environment, as the Hadoop core is written in Java (although you can harness the power of MapReduce in any language you please, as we shall see). If you’re fortunate, your machines will have internet access, in which case fire the following command to them all using SuperPutty:
hduser@node:~$ sudo apt-get install openjdk-6-jre
If, however, like mine, your machines were considered ticking chemical time bombs by infrastructure and hence weren’t granted internet access, what you’ll want to do is download a JDK to a computer that does have internet access and can also see your Hadoop boxes on the network, and fire the files over from there. So, on your internet-connected box:
32 bit version:
hduser@node:~$ wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http://www.oracle.com/" http://download.oracle.com/otn-pub/java/jdk/6u34-b04/jre-6u34-linux-i586.bin

64 bit version:
hduser@node:~$ wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http://www.oracle.com/" http://download.oracle.com/otn-pub/java/jdk/6u45-b06/jdk-6u45-linux-x64.bin

Now then! Each of your Hadoop nodes wants to connect to this box and pull over the Java files. Find its IP by typing ifconfig, and then fire this command to all of your Hadoop nodes:
hduser@node:~$ scp user@internetbox:/locationoffile/rightarchitecturefile.bin $HOME

Be careful to get the edition matching the machine, be it 32bit or 64bit.

Now execute the following on the Hadoop machines to install Java…

32 bit machines:
hduser@node:~$ chmod u+x jre-6u34-linux-i586.bin
hduser@node:~$ ./jre-6u34-linux-i586.bin
hduser@node:~$ sudo mkdir -p /usr/lib/jvm
hduser@node:~$ sudo mv jre1.6.0_34 /usr/lib/jvm/
hduser@node:~$ sudo update-alternatives --install "/usr/bin/java" "java" "/usr/lib/jvm/jre1.6.0_34/bin/java" 1
hduser@node:~$ sudo update-alternatives --install "/usr/lib/mozilla/plugins/libjavaplugin.so" "mozilla-javaplugin.so" "/usr/lib/jvm/jre1.6.0_34/lib/i386/libnpjp2.so" 1
hduser@node:~$ sudo update-alternatives --install "/usr/bin/javaws" "javaws" "/usr/lib/jvm/jre1.6.0_34/bin/javaws" 1
hduser@node:~$ sudo update-alternatives --config java
hduser@node:~$ sudo update-alternatives --config javac
hduser@node:~$ export JAVA_HOME=/usr/lib/jvm/jre1.6.0_34/

64 bit machines:
hduser@node:~$ chmod u+x jdk-6u45-linux-x64.bin
hduser@node:~$ ./jdk-6u45-linux-x64.bin
hduser@node:~$ sudo mv jdk1.6.0_45 /opt
hduser@node:~$ sudo update-alternatives --install "/usr/bin/java" "java" "/opt/jdk1.6.0_45/bin/java" 1
hduser@node:~$ sudo update-alternatives --install "/usr/bin/javac" "javac" "/opt/jdk1.6.0_45/bin/javac" 1
hduser@node:~$ sudo update-alternatives --install "/usr/lib/mozilla/plugins/libjavaplugin.so" "mozilla-javaplugin.so" "/opt/jdk1.6.0_45/jre/lib/amd64/libnpjp2.so" 1
hduser@node:~$ sudo update-alternatives --install "/usr/bin/javaws" "javaws" "/opt/jdk1.6.0_45/bin/javaws" 1
hduser@node:~$ sudo update-alternatives --config java
hduser@node:~$ sudo update-alternatives --config javac
hduser@node:~$ export JAVA_HOME=/opt/jdk1.6.0_45/

Finally, test by firing this at all machines:
hduser@node:~$ java -version

You should see something like this:
hduser@node:~$ java -version
java version "1.6.0_45"
Java(TM) SE Runtime Environment (build 1.6.0_45-b06)
Java HotSpot(TM) 64-Bit Server VM (build 20.45-b01, mixed mode)

Installing Hadoop

Download Hadoop 2.2.0 into the directory /usr/local from the best possible source:
hduser@node:~$ cd /usr/local
hduser@node:~$ wget http://mirror.ox.ac.uk/sites/rsync.apache.org/hadoop/core/hadoop-2.2.0/hadoop-2.2.0.tar.gz

If your boxes don’t have internet connectivity, use the same workaround we used above to circuitously get Java.

Unzip, tidy up and make appropriate ownership changes:
hduser@node:~$ sudo tar xzf hadoop-2.2.0.tar.gz
hduser@node:~$ sudo mv hadoop-2.2.0 hadoop
hduser@node:~$ sudo chown -R hduser:hadoop hadoop

Finally, append the appropriate environment variable settings and aliases to the bash configuration file:
hduser@node:~$ echo "" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "export HADOOP_HOME=/usr/local/hadoop" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "" | sudo tee -a $HOME/.bashrc

#32 bit version:
hduser@node:~$ echo "export JAVA_HOME=/usr/lib/jvm/jre1.6.0_34" | sudo tee -a $HOME/.bashrc

#64 bit version:
hduser@node:~$ echo "export JAVA_HOME=/opt/jdk1.6.0_45" | sudo tee -a $HOME/.bashrc

hduser@node:~$ echo "" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "unalias fs &> /dev/null" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "alias fs &>"hadoop fs"" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "unalias hls &> /dev/null" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "alias hls="fs -ls" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "" | sudo tee -a $HOME/.bashrc
hduser@node:~$ echo "export PATH=$PATH:$HADOOP_HOME/bin" | sudo tee -a $HOME/.bashrc

There are a few changes that must be made to the configuration files in /usr/local/hadoop/etc/hadoop which inform the HDFS and MapReduce layers. Editing these on every machine at once via SuperPutty requires skill, especially when, having made the changes, you realise that you can’t send an “escape” character to every machine at once. There’s a solution involving mapping other, sendable, characters to the escape key, but that’s “out of scope” here 😉 Here’s what the files should look like.

core-site.xml

It needs to look like this on all machines, master and slave alike:

[code language="xml"]
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://master:9000</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/local/hadoop/tmp</value>
  </property>
</configuration>
[/code]

hadoop-env.sh

There’s only one change that needs to be made to this mofo; locate the line which specifies JAVA_HOME (helpfully commented with “the Java implementation to use”). Assuming a Java setup like that described above, this should read

32 bit machines:

export JAVA_HOME=/usr/lib/jvm/jre1.6.0_34/

64 bit machines:
export JAVA_HOME=/opt/jdk1.6.0_45/

hdfs-site.xml

This specifies the replication level of file blocks. Note that your physical storage size will be divided by this number to give the storage you’ll have in HDFS.

[code language="xml"]
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
</configuration>
[/code]

Additionally, it’s necessary to create on each box the local directory we pointed hadoop.tmp.dir at in core-site.xml:
hduser@node:~$ sudo mkdir -p /usr/local/hadoop/tmp
hduser@node:~$ sudo chown hduser:hadoop /usr/local/hadoop/tmp

mapred-site.xml

Which MapReduce implementation to use. At the moment we’re on YARN (“Yet Another Resource Negotiator”…………).

[code language="xml"]
<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
</configuration>
[/code]

yarn-site.xml

Controls the actual MapReduce configuration. Without further ado, this is what you want:

[code language="xml"]
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
    <value>org.apache.hadoop.mapred.ShuffleHandler</value>
  </property>
  <property>
    <name>yarn.resourcemanager.resource-tracker.address</name>
    <value>master:8025</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>master:8030</value>
  </property>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>master:8040</value>
  </property>
</configuration>
[/code]

Slaves

In short, the master needs to consider itself and every other node a slave. Each slave needs to consider itself, and itself only, a slave. The entirety of your slaves file ought to look like this:

Master:
master
slave001
slave002
slave003
etc

Slave xyz:
slavexyz

Formatting the Filesystem

Much like manually deleting your data, formatting a HDFS filesystem containing data will delete any data you might have in it, so don’t do that if you don’t want to delete your data. Warnings notwithstanding, execute the following on the master node to format the HDFS namespace:
hduser@master:~$ cd /usr/local/hadoop
hduser@master:~$ bin/hadoop namenode -format

Bringing up the Cluster

This is the moment that the band strikes up. If you’re not already there, switch to the Hadoop directory…
hduser@master:~$ cd /usr/local/hadoop

Fire this shizz to start the DFS layer:
hduser@master:/usr/local/hadoop$ sbin/start-dfs.sh

You should see this kind of thing:
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
starting namenodes on [master]
master: starting namenode, logging to /usr/local/hadoop/logs/hadoop-hduser-namenode-master.out
slave001: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hduser-datanode-slave001.out
slave002: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hduser-datanode-slave002.out
slave003: starting datanode, logging to /usr/local/hadoop/logs/hadoop-hduser-datanode-slave003.out
...etc

Now start the MapReduce layer:
hduser@master:/usr/local/hadoop$ sbin/start-yarn.sh

Expect to be greeted by something like this:
starting yarn daemons
starting resourcemanager, logging to /usr/local/hadoop/logs/yarn-hduser-resourcemanager-master.out
slave001: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hduser-nodemanager-slave001.out
slave002: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hduser-nodemanager-slave002.out
slave003: starting nodemanager, logging to /usr/local/hadoop/logs/yarn-hduser-nodemanager-slave003.out
...

Also start the job history server…
hduser@master:/usr/local/hadoop$ sbin/mr-jobhistory-daemon.sh start historyserver
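At this point a quick jps (the JDK’s list of running Java processes) on the master and on any slave should confirm which daemons ended up where – roughly like this, given that our master also appears in its own slaves file:

hduser@master:/usr/local/hadoop$ jps
# expect: NameNode, SecondaryNameNode, ResourceManager, JobHistoryServer, DataNode, NodeManager, Jps
hduser@slave001:~$ jps
# expect: DataNode, NodeManager, Jps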

Surveying One’s Empire

By this stage your Hadoop cluster is humming like a dynamo. There are several web interfaces which provide a tangible window into the state of the cluster as a whole…

For the DFS layer, have a look at http://master:50070.

DFS Interface

And for a breakdown of the exact condition of each node in your DFS layer,

DFS Interface 2

And for the MapReduce layer, look at http://master:8088,

YARN Interface

The First Distributed MapReduce

MapReduce is nothing more than a certain way to phrase a script to process a file, which is friendly to distributed computing. There’s a mapper, and a reducer. The “mapper” must be able to process any arbitrary fragment of the file (eg, count the number of occurrences of something within that fragment), independently and obliviously of the contents of the rest of the file. This is why it’s so scalable. The “reducer” aggregates the outputs of the mappers to give the final result (eg, sum up the occurrences of something reported by each of the mappers to give the total number of occurrences). Again, the way that you only have to write the mapper and reducer, and Hadoop handles the rest (deploying a copy of the mapper to every worker node, “shuffling” the mapper outputs for the reducer, re-allocating failed maps etc), is why Hadoop is well good. Indeed, a well-maintained cluster is much like American dance/rap duo LMFAO: every day it’s shuffling.
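To give a flavour of just how little you have to write, here’s a hedged sketch using Hadoop Streaming, which lets you plug existing programs in as the mapper and reducer – the jar path below is where the streaming jar normally sits in the 2.2.0 tarball, so check your own distribution:

# Hypothetical example: cat as the mapper, wc as the reducer, giving line/word/byte counts of the input;
# -input and -output are HDFS directories (we'll see how to get data into HDFS below)
hduser@master:/usr/local/hadoop$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar \
    -input /test -output /testout-streaming \
    -mapper /bin/cat \
    -reducer /usr/bin/wc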

Later in this blog we’ll address how to write MapReduces; for now let’s just perform one and let the cluster stretch its legs for the first time.

Make a cheeky text file (example.txt):
Example text file
Contains example text

Make a directory in HDFS, lob the new file in there, and check that it’s there:
hduser@master:/usr/local/hadoop$ bin/hadoop fs -mkdir /test
hduser@master:/usr/local/hadoop$ bin/hadoop fs -put example.txt /test
hduser@master:/usr/local/hadoop$ bin/hadoop fs -ls /test
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rw-r--r-- 3 hduser supergroup 50 2013-12-23 09:09 /test/example.txt

As you can see, the Hadoop file system commands are very similar to the normal Linux ones. Now run the example MapReduce:
hduser@master:/usr/local/hadoop$ bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.2.0.jar wordcount /test /testout

Hadoop will immediately inform you that a load of things are now deprecated – ignore these warnings, it seems that deprecation is the final stage in creating new Hadoop modules – and then more interestingly keep you posted on the progress of the job…
INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1387739551023_0001
INFO impl.YarnClientImpl: Submitted application application_1387739551023_0001 to ResourceManager at master/1.1.1.1:8040
INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1387739551023_0001/
INFO mapreduce.Job: Running job: job_1387739551023_0001
INFO mapreduce.Job: Job job_1387739551023_0001 running in uber mode : false
INFO mapreduce.Job: map 0% reduce 0%
INFO mapreduce.Job: map 100% reduce 0%
INFO mapreduce.Job: map 100% reduce 100%
INFO mapreduce.Job: Job job_1387739551023_0001 completed successfully
INFO mapreduce.Job: Counters: 43
File System Counters
FILE: Number of bytes read=173
FILE: Number of bytes written=158211
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=202
HDFS: Number of bytes written=123
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Rack-local map tasks=1
Total time spent by all maps in occupied slots (ms)=7683
Total time spent by all reduces in occupied slots (ms)=11281
Map-Reduce Framework
Map input records=2
Map output records=11
Map output bytes=145
Map output materialized bytes=173
Input split bytes=101
Combine input records=11
Combine output records=11
Reduce input groups=11
Reduce shuffle bytes=173
Reduce input records=11
Reduce output records=11
Spilled Records=22
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=127
CPU time spent (ms)=2570
Physical memory (bytes) snapshot=291241984
Virtual memory (bytes) snapshot=1030144000
Total committed heap usage (bytes)=181075968
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=101
File Output Format Counters
Bytes Written=123
hduser@master:/usr/local/hadoop$

GLORY. We can examine the output thus:
hduser@master:/usr/local/hadoop$ bin/hadoop fs -ls /testout
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r-- 3 hduser supergroup 50 2013-12-23 09:10 /testout/_SUCCESS
-rw-r--r-- 3 hduser supergroup 50 2013-12-23 09:10 /testout/part-r-00000
hduser@master:/usr/local/hadoop$ bin/hadoop fs -cat /testout/part-r-00000
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Contains 1
Example 1
example 1
file 1
text 2

Business value, delivered. If you want to retrieve the output file from HDFS back to your local filesystem, run
hduser@master:/usr/local/hadoop$ bin/hadoop fs -get /testout
hduser@master:/usr/local/hadoop$ ls | grep testout
testout

And there it is! Now that your Hadoop cluster is essentially a self-aware beacon of supercomputing, stay tuned for further posts on using Hadoop to do interesting/lucrative things! 🙂
