Motivation
Do I need one? Haven't you read the news? It's bigdata, this will make us all rich!
I'm not one of the voices claiming that this is the best invention
since the wheel. There's a lot of hype out there regarding big data, and
vendors desperately seeking business opportunities in it appear every day.
Having said all that, Hadoop is an amazing framework. It's one more
tool at hand, to be chosen for a specific set of problems. It's getting
easier and easier for companies to adopt it, and it really increases the
power to do certain types of ad-hoc analysis that would be impossible otherwise. But there are some things that should be said out loud:
- Building and maintaining a Hadoop cluster is expensive
- You need a team that knows what it's doing: even more than the cost of big data, there's the cost of bad data
- Not everyone needs it
Before someone jumps at me: cost is obviously relative. We're talking about
Hadoop or BigData?
There's a huge difference between Hadoop and Big Data. Simply put,
Hadoop is a framework that provides reliable shared storage, provided by
HDFS, and a processing framework, given by MapReduce. As we dig deeper, other pieces of the puzzle start to appear, but these are the fundamental ones.
This is just engineering talk.
Big Data is what you do with it. And that makes all the difference. Write a check for a cluster,
install Hadoop on it and you'll end up with a bunch of noisy machines
and zero added value. The real challenge starts there: what you do with
the data.
The challenge
At Mozilla, one of the Hadoop clusters the Metrics team uses has about 60 nodes, and it is used to store and process several different data sources, spread across HDFS, HBase, Hive and a bunch of other foreign-sounding words that meant little to me. It was about time.
The initial goal sounded relatively easy.
Analyze a bunch of weblogs stored in HDFS, process them using geolocation and find out how many users per country saw the web pages.
Now... where do I start? I know other engineers on the team who write their own
Java code for the MapReduce jobs. I'm too old for that.
I heard that Pig would also be an option. The last thing I need right now is having
to learn yet another technology, unless absolutely necessary.
I had heard about all the work Pentaho has done with Big Data Analytics but never quite understood what it was all about. But the idea of being able to use an
extremely powerful ETL tool that my team and I have been using for ages, with very good results, is appealing, to say the least.
But the first step has nothing to do with it. For me, it is to answer the
question: what exactly is Hadoop, and how does it work anyway?
Hadoop 101
This link proved to be a great resource to get me up and running. I have access
to the staging, research and, if needed, production clusters at Mozilla, but
using them as an experimentation ground doesn't make me comfortable at
all. So I decided to install Hadoop locally and try to get it working.
Basic concepts: Hadoop
Hadoop was created by Doug Cutting and Michael J. Cafarella and was originally developed to support distribution for the Nutch search engine project.
There are lots of components in Hadoop, but the core is divided into two main subprojects:
- MapReduce - A framework that schedules and assigns jobs and tasks on the cluster
- HDFS - A distributed file system that guarantees scalability and reliability
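To make the map/reduce split a bit more concrete, here is a toy, single-machine analogy built from Unix pipes. It is only a sketch of the programming model, not how Hadoop executes anything: the "map" emits one key per record, sort plays the role of the shuffle that brings identical keys together, and uniq -c is the "reduce". On a real cluster each phase is spread across the Task Trackers and the input is read block by block from HDFS. The function name is hypothetical.

```bash
# Toy MapReduce analogy on one machine (not how Hadoop runs it):
#   map     -> emit one key (word) per line
#   shuffle -> sort groups identical keys together
#   reduce  -> uniq -c aggregates each group
mapreduce_wordcount() {
  tr -s '[:space:]' '\n' |      # map: one word per line
    grep -v '^$' |              # drop empty keys (e.g. from leading whitespace)
    sort |                      # shuffle/sort: identical keys become adjacent
    uniq -c |                   # reduce: count each run of identical keys
    awk '{ print $2 "\t" $1 }'  # emit key<TAB>count
}

printf 'a b a\nc a\n' | mapreduce_wordcount
```

The example input prints a/3, b/1 and c/1, tab-separated, one key per line.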
There are some important services running on the cluster. MapReduce work is managed by the
Job Tracker, which runs on the master and hands the work over to the different
Task Trackers on the nodes.
On the data side, the master runs a
Name Node that keeps a reference to every file and block in the file system, and talks with the different
Data Nodes throughout the slaves in the cluster.
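A quick way to see which of these daemons are actually alive on a node is jps, the JDK tool that lists running JVMs as "<pid> <MainClass>". The sketch below assumes the class names Hadoop 0.20 uses for its daemons (NameNode, SecondaryNameNode, DataNode, JobTracker, TaskTracker); the helper name is hypothetical. Since jps generally only sees JVMs owned by the calling user, and CDH runs the daemons under their own accounts, you will likely need sudo.

```bash
# Daemon main-class names as jps prints them for Hadoop 0.20 / CDH3.
expected_daemons="NameNode SecondaryNameNode DataNode JobTracker TaskTracker"

# stdin: output of `jps`; prints each expected daemon that is not running.
missing_daemons() {
  local running d
  running=$(awk '{ print $2 }')
  for d in $expected_daemons; do
    printf '%s\n' "$running" | grep -qx "$d" || echo "$d"
  done
}

# Usage on the VM:  sudo jps | missing_daemons   (no output = all five running)
```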
One of the big advantages of MapReduce over the generic concept of
grid computing is its ability to process data that is stored locally: the planners try, as much as possible, to reduce bandwidth usage by processing local data.
There's obviously a lot more around, but I'll stop here for the sake of simplicity.
Installing Hadoop
As with Linux, even though the main project is driven by Apache, there are several different distributions that ensure all the independent Hadoop subprojects are correctly configured and ready to talk to each other.
The main providers are Hortonworks, Cloudera and MapR. At Mozilla we use Cloudera's CDH3, so that's what I chose to install.
Downloading the Virtual Machine
I chose to install a pre-configured virtual machine. I'm not very interested in the small details of configuration, so a one-node cluster is more than enough to get me started.
Cloudera provides pre-built virtual machines you can use, in different formats. I use
VirtualBox, so that's the one I chose.
This excellent post provides details on how to install it, plus a great overview of hadoop.
In the end, booting the VM will result in something like this:
Almost ready to start playing with your system. There are only a few extra network changes needed to ensure communication between host and client.
Network configuration changes
There are some configuration changes that will prove to be important. I don't want to do everything from within the VM: I also want to be able to connect to it from my host machine, and run Kettle connected to it.
The first step is to configure the network interfaces in VirtualBox; for VMware or others the instructions may vary. I defined two network adapters: one with
NAT, to allow outside connections, and one
Host-only Adapter. The latter allows a static IP connection between the host and the client.
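If you prefer the command line to the VirtualBox GUI, the same two-adapter setup can be sketched with VBoxManage. The VM name (cloudera-demo-vm) and the host-only interface name (vboxnet0) are assumptions; check yours with VBoxManage list vms and VBoxManage list hostonlyifs, and skip hostonlyif create if the interface already exists. The VM must be powered off for modifyvm to work.

```bash
# Hypothetical VM name and host-only interface; adjust to your setup.
# Set VBOX=echo to print the commands instead of running them.
VBOX=${VBOX:-VBoxManage}

setup_vm_network() {
  local vm=${1:-cloudera-demo-vm} hostif=${2:-vboxnet0}
  # Host side: create a host-only interface and give the host 192.168.56.1
  "$VBOX" hostonlyif create
  "$VBOX" hostonlyif ipconfig "$hostif" --ip 192.168.56.1 --netmask 255.255.255.0
  # Guest side: adapter 1 = NAT for outside access, adapter 2 = host-only
  "$VBOX" modifyvm "$vm" --nic1 nat --nic2 hostonly --hostonlyadapter2 "$hostif"
}
```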
If this is correctly configured, you should see an extra interface on your host with IP
192.168.56.1, and your client should have
192.168.56.101. For convenience, I chose
hadoop-pedro as my machine's hostname (lousy name, but my text, my name!), so I changed the following configuration files:
Client:
- /etc/sysconfig/network - Adding HOSTNAME=hadoop-pedro
- /etc/hosts
$ cat /etc/hosts
127.0.0.1 localhost.localdomain localhost
192.168.56.101 hadoop-pedro hadoop-pedro.local
::1 localhost6.localdomain6 localhost6
Host:
- /etc/hosts - Add the following line:
$ cat /etc/hosts | grep hadoop-pedro
192.168.56.101 hadoop-pedro hadoop-pedro.local
This should ensure proper communication between host and VM. You should be able to
ping hadoop-pedro from the host and get replies.
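Editing /etc/hosts by hand is fine once, but if you script the VM setup it helps to make the change idempotent. A minimal sketch, with a hypothetical helper name: it appends the mapping only if a line with the same IP and hostname is not already there. Run it from a root shell to modify the real /etc/hosts, on both the client and the host.

```bash
# Append "<ip> <name> [aliases...]" to a hosts file unless <ip> already maps to <name>.
add_host_entry() {
  local file=$1 ip=$2 name=$3
  shift 2                      # "$@" is now: name [aliases...]
  if awk -v ip="$ip" -v n="$name" '
       $1 == ip { for (i = 2; i <= NF; i++) if ($i == n) found = 1 }
       END { exit !found }' "$file"; then
    return 0                   # already present: nothing to do
  fi
  printf '%s %s\n' "$ip" "$*" >> "$file"
}

# add_host_entry /etc/hosts 192.168.56.101 hadoop-pedro hadoop-pedro.local
```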
Hadoop configuration changes
CDH's configuration defaults to the local interfaces, and to guarantee that it works flawlessly when called from the host, I got better results by changing Hadoop's configuration files to attach to the new hostname.
Hadoop is installed in /usr/lib/hadoop, and inside there's a conf/ directory that holds the configuration files.
The default configuration files make Hadoop's services listen on 0.0.0.0. I got better results by pointing them to the VM's own address, through the new hostname. Here are the properties I changed:
- core-site.xml: Change fs.default.name to hdfs://hadoop-pedro.local:8020
- mapred-site.xml: Change mapred.job.tracker to hadoop-pedro.local:8021 and jobtracker.thrift.address to hadoop-pedro.local:9290. Also add the following properties: mapred.map.child.java.opts to -Xmx768m and mapred.reduce.child.java.opts to -Xmx1536m
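For reference, this is what those properties look like in Hadoop's XML configuration format. Rather than overwriting the files (CDH ships other settings in them), the sketch prints <property> elements to paste inside the existing <configuration> element of each file. The helper names are hypothetical; the property names and values are exactly the ones listed above.

```bash
# Print one Hadoop <property> element.
hadoop_property() {
  printf '  <property>\n    <name>%s</name>\n    <value>%s</value>\n  </property>\n' "$1" "$2"
}

# Paste inside <configuration> in /usr/lib/hadoop/conf/core-site.xml
core_site_properties() {
  hadoop_property fs.default.name hdfs://hadoop-pedro.local:8020
}

# Paste inside <configuration> in /usr/lib/hadoop/conf/mapred-site.xml
mapred_site_properties() {
  hadoop_property mapred.job.tracker hadoop-pedro.local:8021
  hadoop_property jobtracker.thrift.address hadoop-pedro.local:9290
  hadoop_property mapred.map.child.java.opts -Xmx768m
  hadoop_property mapred.reduce.child.java.opts -Xmx1536m
}

core_site_properties
mapred_site_properties
```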
Reboot the VM for all the changes to take effect. It should then be ready to go.
Knowing our way around it
Services
I really feel more comfortable knowing what happens in the system: what is running, how to restart it, how to know what's happening. If you want to start the services manually, here's what to run:
- $ /etc/init.d/hadoop-0.20-namenode start
- $ /etc/init.d/hadoop-0.20-secondarynamenode start
- $ /etc/init.d/hadoop-0.20-datanode start
- $ /etc/init.d/hadoop-0.20-jobtracker start
- $ /etc/init.d/hadoop-0.20-tasktracker start
You'll probably notice this is entirely consistent with the description of the Hadoop components I gave before. There are a few others, ZooKeeper and HBase for example, that we won't need for now.
If you want to stop the services... just run the same scripts with stop instead of start.
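A small wrapper around those init scripts, assuming the service names listed above. It starts the daemons in that order and stops them in reverse (MapReduce first, HDFS last); the reverse order is a common convention rather than something the scripts enforce. The function name and the DRY_RUN switch are hypothetical additions.

```bash
# Service names from the init scripts above, in start order.
HADOOP_SERVICES=(namenode secondarynamenode datanode jobtracker tasktracker)

# Usage: hadoop_services start|stop   (DRY_RUN=1 prints instead of running)
hadoop_services() {
  local action=$1 i svc
  local -a order=()
  case $action in
    start) order=("${HADOOP_SERVICES[@]}") ;;
    stop)
      # walk the list backwards: MapReduce daemons first, HDFS last
      for (( i = ${#HADOOP_SERVICES[@]} - 1; i >= 0; i-- )); do
        order+=("${HADOOP_SERVICES[i]}")
      done ;;
    *) echo "usage: hadoop_services start|stop" >&2; return 1 ;;
  esac
  for svc in "${order[@]}"; do
    local -a cmd=("/etc/init.d/hadoop-0.20-$svc" "$action")
    if [ -n "${DRY_RUN:-}" ]; then echo "${cmd[*]}"; else sudo "${cmd[@]}"; fi
  done
}
```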
The logs are under /var/log/hadoop/. To know what's going on, simply follow them:
$ tail -F /var/log/hadoop/*
Command line utils
Hadoop comes with a command line executable to interact with the system. You'll find the
hadoop command on the path (or under the
bin/ directory of the Hadoop distribution). Execute it without arguments to see how it works. The ones I use most often are
hadoop fs, to interact with HDFS, and, less frequently,
hadoop job, to query job execution.
Web utility ports
There are some important ports to look for:
Namenode / DFS status: http://hadoop-pedro:50070/
Information about the status of our filesystem cluster
Job Tracker: http://hadoop-pedro:50030/
One of the most useful ones. Displays information about running jobs and it's where we can inspect the output of the individual tasks running on the nodes.
Task Tracker: http://hadoop-pedro:50060/
Displays the status of individual tasks.
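To check from the host that these web interfaces answer (which also exercises the network and hostname changes made earlier), a sketch using curl; the function names are hypothetical. A status of 000 means nothing answered on that port within the timeout.

```bash
# Print the three web UI URLs for a host (default: hadoop-pedro).
hadoop_web_urls() {
  local host=${1:-hadoop-pedro}
  echo "http://$host:50070/"   # NameNode / DFS status
  echo "http://$host:50030/"   # Job Tracker
  echo "http://$host:50060/"   # Task Tracker
}

# Print "<HTTP status> <url>" for each UI; 000 means no answer within 5s.
check_web_uis() {
  local url code
  for url in $(hadoop_web_urls "$@"); do
    code=$(curl -s -o /dev/null -m 5 -w '%{http_code}' "$url")
    echo "$code $url"
  done
}
```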
Hdfs
It's fundamental to know how to interact with HDFS. I use the command line tool
hadoop fs. Once again, run it without arguments to see the different options.
We can either run it locally, without specifying the HDFS server, or remotely, specifying the full VFS path:
$ hadoop fs -ls /
$ hadoop fs -ls hdfs://hadoop-pedro:8020/
If not present yet, I recommend creating a home directory for your user on Hadoop; in my case... surprise... pedro:
$ hadoop fs -mkdir hdfs://hadoop-pedro:8020/user/pedro
The most commonly used commands are:
- $ hadoop fs -ls : List files
- $ hadoop fs -mkdir : Make directory
- $ hadoop fs -put : Put local files into hdfs
- $ hadoop fs -get : Get files from hdfs
- $ hadoop fs -cat : Show the contents of a file in hdfs
- $ hadoop fs -rm : Remove a file
- $ hadoop fs -rmr : Recursively remove a directory
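A quick round trip through those commands, using a throwaway directory. This assumes the /user/pedro home directory created above and the NameNode at hdfs://hadoop-pedro:8020; the scratch directory and function name are hypothetical, and setting HADOOP=echo prints the commands instead of running them.

```bash
HADOOP=${HADOOP:-hadoop}
HDFS=${HDFS:-hdfs://hadoop-pedro:8020}

hdfs_round_trip() {
  local dir="$HDFS/user/pedro/roundtrip-test"   # hypothetical scratch directory
  local tmp
  tmp=$(mktemp)
  echo "hello hdfs" > "$tmp"
  "$HADOOP" fs -mkdir "$dir"                     # make directory
  "$HADOOP" fs -put "$tmp" "$dir/hello.txt"      # local -> HDFS
  "$HADOOP" fs -ls "$dir"                        # list it
  "$HADOOP" fs -cat "$dir/hello.txt"             # should print "hello hdfs"
  "$HADOOP" fs -get "$dir/hello.txt" "$tmp.copy" # HDFS -> local
  "$HADOOP" fs -rmr "$dir"                       # clean up recursively
  rm -f "$tmp" "$tmp.copy"
}
```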
Pentaho Bigdata
Once I got familiar with the Hadoop infrastructure and started to look at Kettle, I was surprised by the level of
documentation of Pentaho's Big Data plugin. This is not an easy subject: it's hard to use, hard to debug, and there's lots of stuff to know. So having a
wiki with a good set of documentation, aimed more at concrete examples, is very good.
My first question was obviously
"How do I start? What do I download?". The
wiki suggests downloading a
stable Kettle version, and you'd be up and running in no time. But that would be too easy, and we wouldn't understand what was happening under the hood.
Compiling kettle
I always compile kettle from source. Everyone does that, right? :)
$ svn co svn://source.pentaho.org/svnkettleroot/Kettle/branches/4.4.1 kettle-4.4.1
$ cd kettle-4.4.1
$ ant clean distrib
Please note that I'm using the 4.4.1 branch. This is always changing. I don't yet feel confident about using 5.0, so pay attention to the one you should be using.
In the end, we'll get a ready-to-run Kettle in the distrib directory. This doesn't include the big data plugin.
Bigdata plugin
Compiling
The next step is to compile the big data plugin. Fortunately this one's already on git.
$ git clone https://github.com/pentaho/big-data-plugin.git
$ cd big-data-plugin
$ ant
There's an important detail that made me lose a lot of time and is not obvious at all. I'll describe the details later, but the big data plugin prepares a zip of a bunch of jars and dependencies to copy to Hadoop, and that's a static bundle. By default, it points to
TRUNK-SNAPSHOT, which means it will download the latest version of Kettle, eventually causing incompatibilities with the Kettle version we chose before.
You can edit the file build.properties and change the following line:
dependency.kettle.revision=4.4.0-stable
I'm not aware of any artifact that points to a continuous build of 4.x, so I chose the closest version available.
If you compile again, you'll get a plugin version ready to use under the dist directory. Extract it into the plugins directory:
$ tar -xzf dist/pentaho-big-data-plugin-TRUNK-SNAPSHOT.tar.gz -C ~/.kettle/plugins
Configuring
After installing the plugin, we need to configure it properly. There's an important file that needs to be changed.
$ vim pentaho-big-data-plugin/plugin.properties
You need to change the following properties:
- active.hadoop.configuration = cdh3u4
- pmr.kettle.dfs.install.dir = /user//pentaho/mapreduce
- pmr.kettle.additional.plugins = steps/maxmind
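If you'd rather script these edits than open an editor, here is a minimal sketch that sets a key in a Java properties file, replacing any existing line for that key (with or without spaces around = or :) and appending it otherwise. The helper names, PLUGIN_DIR (the plugin extracted into ~/.kettle/plugins as above) and HDFS_USER are assumptions; HDFS_USER defaults to pedroalves only because that is the username used later in this post.

```bash
# Set key=value in a .properties file: replace the key's line or append it.
set_prop() {
  local file=$1 key=$2 value=$3 tmp
  tmp=$(mktemp) || return 1
  awk -v k="$key" -v v="$value" '
    {
      line = $0
      sub(/^[ \t]+/, "", line)             # ignore leading indentation
      rest = substr(line, length(k) + 1)   # text right after the key
      if (index(line, k) == 1 && rest ~ /^[ \t]*[=:]/) {
        print k "=" v; done = 1; next
      }
      print
    }
    END { if (!done) print k "=" v }' "$file" > "$tmp" &&
    cat "$tmp" > "$file"                   # rewrite in place, keep permissions
  rm -f "$tmp"
}

# Hypothetical locations; adjust to where you extracted the plugin.
PLUGIN_DIR=${PLUGIN_DIR:-$HOME/.kettle/plugins/pentaho-big-data-plugin}
HDFS_USER=${HDFS_USER:-pedroalves}

configure_big_data_plugin() {
  local f="$PLUGIN_DIR/plugin.properties"
  set_prop "$f" active.hadoop.configuration cdh3u4
  set_prop "$f" pmr.kettle.dfs.install.dir "/user/$HDFS_USER/pentaho/mapreduce"
  set_prop "$f" pmr.kettle.additional.plugins steps/maxmind
}
```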
Like I mentioned before, there are several hadoop distributions and each of them has made the modifications they considered necessary in order to ensure everything works well. This is a good thing. The bad thing is that 3rd party integrators have to comply with all the variants.
Pentaho developers took a great approach to minimize this problem, to a certain extent. They developed a
shim around the common Hadoop code (if you like to mess with source code, you'll find it under the package
org.pentaho.hadoop.shim.common) to comply with the variants.
Like I mentioned, I'm using Cloudera's CDH3u4. Luckily it's one of the supported versions. You can see the possible values by looking at the directory pentaho-big-data-plugin/hadoop-configurations/. Currently the supported versions are:
- cdh3u4
- cdh4
- hadoop-20
- mapr
I'm sure this list will increase with time and relevance.
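Since a typo in active.hadoop.configuration is easy to make, here is a hypothetical sanity check: it reads the value from plugin.properties and verifies that a directory with that name exists under hadoop-configurations/, listing the available ones otherwise. The function name and its plugin-directory argument are assumptions.

```bash
# Usage: check_shim <path to pentaho-big-data-plugin>
check_shim() {
  local plugin_dir=$1 shim
  shim=$(sed -n 's/^[[:space:]]*active\.hadoop\.configuration[[:space:]]*[=:][[:space:]]*//p' \
           "$plugin_dir/plugin.properties" | tail -n1 | tr -d '[:space:]')
  if [ -n "$shim" ] && [ -d "$plugin_dir/hadoop-configurations/$shim" ]; then
    echo "ok: $shim"
  else
    echo "unknown shim '$shim'; available:" >&2
    ls "$plugin_dir/hadoop-configurations/" >&2
    return 1
  fi
}
```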
The second fundamental property is pmr.kettle.dfs.install.dir. This is where Kettle will be copied to in HDFS, so that MapReduce can find all the dependencies of our jobs/transformations. Due to the way permissions are set up on Mozilla's cluster, I have to use my remote username, so I pointed it to my home directory: /user/pedroalves/pentaho/mapreduce.
The third option is a fundamental one in my case. The bundle file that gets copied to HDFS and run on Hadoop has only the core transformations and steps (and the big data plugin, obviously). In my case I wanted to add another one. The format is relative to the Kettle directory, and my GeoIP plugin is under steps/maxmind.
Everything is now ready to use. If we launch Spoon, we should see the big data steps.
Running mapreduce tasks
Setting up the environment
Back to my initial challenge: parse and geolocate weblogs. In my case, I wanted to know how many snippets, and which ones, were seen on a daily basis by country. The Pentaho big data wiki has a very detailed example of how to achieve most of this, but it lacks the geolocation step.
The files are stored in the main cluster, in an HDFS directory. I started by copying a sample of those files to my local VM, simulating the real environment:
$ hadoop fs -get hdfs://:8020/www_weblogs/dir/part-r-00000.gz .
$ hadoop fs -mkdir hdfs://hadoop-pedro:8020/www_weblogs/dir/
$ hadoop fs -put part-r-00000.gz hdfs://hadoop-pedro:8020/www_weblogs/dir/
Preparing the job and the transformation
My job is pretty simple, almost a direct call to the Pentaho MapReduce step.
We need to fill in some information related to this step:
- Cluster information
- Map transformation
- Reduce transformation (if needed)
- Combiner transformation (if needed)
- Information about input and output
As best practices recommend, I used variables as much as possible. Here are the ones I'm using, which should be self-explanatory:
# local
SNIPPET_HDFS_HOST=hadoop-pedro.local
SNIPPET_HDFS_PORT=8020
SNIPPET_JT_HOST=hadoop-pedro.local
SNIPPET_JT_PORT=8021
SNIPPET_HDFS_INPUT_PATH=/www_weblogs/snippets-stats.mozilla.org/dir/
SNIPPET_HDFS_OUTPUT_PATH=/user/pedro/tests/snippets
The input and output formats are org.apache.hadoop.mapred.TextInputFormat and org.apache.hadoop.mapred.TextOutputFormat. The Hadoop documentation lists the possible input formats, always with the possibility of writing your own. The same goes for output formats.
This is my transformation, ready to be executed:
Running the job
When I run the job, I immediately see in the logs the following lines:
INFO 01-02 16:49:52,665 - Spoon - Starting job...
INFO 01-02 16:49:52,666 - test_mapreduce_job - Start of job execution
INFO 01-02 16:49:52,668 - test_mapreduce_job - Starting entry [Pentaho MapReduce]
INFO 01-02 16:49:52,708 - test_mapper_with_geoip - Dispatching started for transformation [test_mapper_with_geoip]
INFO 01-02 16:49:52,817 - test_reducer - Dispatching started for transformation [test_reducer]
INFO 01-02 16:49:52,836 - Pentaho MapReduce - Cleaning output path: hdfs://hadoop-pedro.local:8020/user/pedro/tests/snippets
INFO 01-02 16:49:52,841 - Pentaho MapReduce - Installing Kettle to /user/pedroalves/pentaho/mapreduce/4.4.0-TRUNK-SNAPSHOT-cdh3u4
This looks good. However, after a few moments (or minutes, depending on where the cluster is), I get some less motivating messages:
INFO 01-02 16:50:01,687 - Total input paths to process : 1
INFO 01-02 16:50:01,843 - Pentaho MapReduce - Setup Complete: 0.0 Mapper Completion: 0.0 Reducer Completion: 0.0
INFO 01-02 16:50:06,844 - Pentaho MapReduce - Setup Complete: 0.0 Mapper Completion: 0.0 Reducer Completion: 0.0
INFO 01-02 16:50:11,857 - Pentaho MapReduce - Setup Complete: 100.0 Mapper Completion: 0.0 Reducer Completion: 0.0
INFO 01-02 16:50:16,861 - Pentaho MapReduce - Setup Complete: 100.0 Mapper Completion: 0.0 Reducer Completion: 0.0
INFO 01-02 16:50:21,878 - Pentaho MapReduce - Setup Complete: 100.0 Mapper Completion: 0.0 Reducer Completion: 0.0
ERROR 01-02 16:50:21,920 - Pentaho MapReduce - [FAILED] -- Task: attempt_201301301222_0006_m_000000_0 Attempt: attempt_201301301222_0006_m_000000_0 Event: 1
java.io.IOException: org.pentaho.di.core.exception.KettleException:
We failed to initialize at least one step. Execution can not begin!
at org.pentaho.hadoop.mapreduce.PentahoMapRunnable.run(PentahoMapRunnable.java:467)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
at org.apache.hadoop.mapred.Child.main(Child.java:264)
Caused by: org.pentaho.di.core.exception.KettleException:
We failed to initialize at least one step. Execution can not begin!
at org.pentaho.di.trans.Trans.prepareExecution(Trans.java:932)
at org.pentaho.hadoop.mapreduce.PentahoMapRunnable.run(PentahoMapRunnable.java:354)
... 7 more
And it loops until I stop the job. From the log messages alone, I would have absolutely no idea what was going on.
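The log does at least name the failed attempt. Here is a sketch (hypothetical helper names) that extracts the attempt IDs from a saved Spoon log and derives the job ID from them, relying on Hadoop's attempt_<jobtracker id>_<job number>_<m|r>_<task number>_<attempt number> naming. The job ID can then be passed to hadoop job -status to query the job from the command line.

```bash
# Print the unique failed attempt IDs found in Spoon log file(s) (or stdin).
failed_attempts() {
  grep -o 'FAILED] -- Task: attempt_[0-9a-z_]*' "$@" |
    sed 's/.*Task: //' | sort -u
}

# attempt_<jt>_<job#>_<m|r>_<task#>_<try#>  ->  job_<jt>_<job#>
job_id_of() {
  sed -E 's/^attempt_([0-9]+)_([0-9]+)_.*/job_\1_\2/'
}

# e.g. (spoon.log is a hypothetical saved log):
#   failed_attempts spoon.log | job_id_of | sort -u | xargs -n1 hadoop job -status
```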
Under the hood
I had no option but to go deeper into understanding what happens under the hood. To do that, I had to go to the source of the information...
literally.
The approach is actually pretty simple, and follows the instructions in practically every Hadoop book, but tweaked so that we can execute transformations without the hassle of writing pure Java code. Here's the sequence:
- Detect the shim we chose. This guarantees later on that the specifics of each distribution are respected
- The mapreduce step is processed to get:
- The configuration for the mapper
- Configurations for the combiner
- Configurations for the reducer
- Input and Output formats
- Cluster information
- Input paths
- Output paths
- User defined configurations
- Number of map and reduce tasks
- Our set of Kettle variables is passed to the Hadoop configuration, ensuring the whole environment stays the same
- The output path is deleted, if that was the chosen option
- Bigdata plugin properties are read to determine the kettle installation directory. This depends on the kettle version, so a single cluster supports the usage of different versions at the same time
- Checks whether Kettle is already installed in HDFS. It does that by checking whether the chosen HDFS directory exists (in my specific case it evaluated to /user/pedroalves/pentaho/mapreduce/4.4.0-TRUNK-SNAPSHOT-cdh3u4) and whether it has the subdirectories lib and plugins. This is bound to change in the future, as it is clearly inefficient and unable to detect changes to the content of those directories
- If it is not installed, the Kettle archive (pentaho-big-data-plugin/pentaho-mapreduce-libraries.zip), the big data plugin and the extra plugins we specified are copied there
- Everything is registered in Hadoop's DistributedCache, for local file access and classpath registration
- The job is finally submitted to execution
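Since the installation check only looks for the directory and its lib and plugins subdirectories, a rebuilt plugin or an updated extra step will not be re-uploaded. One workaround, assuming nothing else depends on that installation, is to delete the versioned directory so the next run should install Kettle again. The path below is the one from my log; adjust it to your pmr.kettle.dfs.install.dir. The function name is hypothetical and HADOOP=echo previews the commands.

```bash
HADOOP=${HADOOP:-hadoop}
# From the "Installing Kettle to ..." log line above; adjust to your setup.
KETTLE_HDFS_DIR=${KETTLE_HDFS_DIR:-/user/pedroalves/pentaho/mapreduce/4.4.0-TRUNK-SNAPSHOT-cdh3u4}

force_kettle_reinstall() {
  # "fs -test -d" exits 0 when the path exists and is a directory
  if "$HADOOP" fs -test -d "$KETTLE_HDFS_DIR"; then
    # recursive delete; the next Pentaho MapReduce run should re-install here
    "$HADOOP" fs -rmr "$KETTLE_HDFS_DIR"
  fi
}
```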
Debugging the transformation
Once the job is submitted, we can track its execution in Hadoop's Job Tracker at
http://hadoop-pedro:50030/
If you follow the link to get more details on the running job, you'll see the specifics of the job configuration and its tasks. In my case, following the link on the map task, I'm able to see the exception thrown by the mapper transformation.
If we click on one of the tasks, we can see all the task attempts that have been made, and access each attempt's logs individually. And there is the very familiar Kettle output, with a line that clearly states what's going on:
ERROR 31-01 03:10:07,458 - Lookup Country - Error initializing max mind database file location '/usr/local/share/GeoIP/GeoIPCity.dat'
ERROR 31-01 03:10:07,458 - Lookup Country - org.pentaho.di.core.exception.KettleStepException:
Unable to set up MaxMind database '/usr/local/share/GeoIP/GeoIPCity.dat'
/usr/local/share/GeoIP/GeoIPCity.dat (No such file or directory)
I will spare you all the pain we had to go through to fix this. It seems simple but it's not: it was a very, very hard task to ensure that the
.dat files were available on all the nodes. In the end,
Matt Casters and I completely rewrote the MaxMind plugin step,
which is now also on GitHub, to support VFS.
Pro tip: everything happened to me. Apparent thread locks, the system totally hanging with 100% CPU usage and no log output anywhere, all of which I eventually traced down to memory usage. One trick that proved very useful was to send a
QUIT signal to the task process (with
kill -QUIT ). Despite the scary name, this will cause the
JVM to do a thread dump, allowing us to spy on what it's doing. This tip works for any Java program.
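A sketch of finding the task JVMs to signal: task attempts run in child JVMs whose main class is org.apache.hadoop.mapred.Child (visible in the stack trace above), and jps -l prints each JVM's PID with its fully qualified main class. The helper names are hypothetical. Run it as the user that owns the task processes (likely via sudo). The dump is written to the JVM's standard output, so for a task it should land in that attempt's stdout log, reachable from the Job Tracker UI; alternatively, jstack <pid> from the JDK prints the dump to your terminal.

```bash
# stdin: output of `jps -l`; prints PIDs of Hadoop task child JVMs.
child_pids() {
  awk '$2 == "org.apache.hadoop.mapred.Child" { print $1 }'
}

# SIGQUIT asks the JVM for a thread dump; it does not terminate the process.
dump_task_threads() {
  local pid
  for pid in $(jps -l | child_pids); do
    kill -QUIT "$pid"
  done
}
```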
After several days changing the plugin and debugging the origin of the problem, I finally discovered that by default MapReduce tasks run with a maximum memory of
-Xmx200m. Since I was using city-level geolocation, that value was clearly insufficient to run the transformation, which ran into OOM/GC issues that didn't happen when I used country-level geolocation only. So do yourself a favor: increase the available memory on the cluster.
Pentaho still needs to improve the debugging abilities of the Pentaho big data plugin. As I wrote
in my last post, I ended up developing a change to the WriteToLog step to allow displaying only the top N rows of the dataset. It helps a bit, until they allow us to do
proper debugging from within Spoon, like any regular transformation.
After all the changes to the MaxMind step and increasing cluster memory, I ended up copying the GeoIP files to my HDFS user directory and specifying their location using the following variables (the step also supports variable substitution; thanks Matt!):
maxmind.geoip.path = hdfs://hadoop-pedro:8020/user/pedro/geoip/GeoIP.dat
maxmind.geoipcity.path = hdfs://hadoop-pedro:8020/user/pedro/geoip/GeoIPCity.dat
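The copy itself can be sketched as below, assuming the .dat files are in /usr/local/share/GeoIP/ (the path from the error above) and that /user/pedro already exists. The function name is hypothetical and HADOOP=echo previews the commands.

```bash
HADOOP=${HADOOP:-hadoop}
GEOIP_HDFS_DIR=${GEOIP_HDFS_DIR:-hdfs://hadoop-pedro:8020/user/pedro/geoip}

upload_geoip() {
  local src=${1:-/usr/local/share/GeoIP} f
  "$HADOOP" fs -mkdir "$GEOIP_HDFS_DIR"
  for f in GeoIP.dat GeoIPCity.dat; do
    "$HADOOP" fs -put "$src/$f" "$GEOIP_HDFS_DIR/$f"
  done
  "$HADOOP" fs -ls "$GEOIP_HDFS_DIR"   # confirm both files arrived
}
```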
Running the transformation again results in a successful run of both map and reduce tasks!
And we have access to the output:
$ hadoop fs -cat hdfs://hadoop-pedro:8020/user/pedro/tests/snippets/part-00000 | head -n 40
13/Aug/2012|Afghanistan|1234|31
13/Aug/2012|Afghanistan|2345|7
13/Aug/2012|Aland Islands|3456|16
13/Aug/2012|Aland Islands|4567|1
13/Aug/2012|Albania|5678|7
13/Aug/2012|Albania|7890|5
# get all the files in case there were multiple reducers running
$ hadoop fs -getmerge hdfs://hadoop-pedro:8020/user/pedro/tests/snippets/ result.txt
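Because the output is plain pipe-delimited text, quick roll-ups don't need another MapReduce run. A sketch, with a hypothetical function name, that sums the last column per day and country; it assumes the layout is day|country|snippet id|count, which I'm inferring from the sample above rather than from any documented format.

```bash
# Sum the count column per day and country, across snippets.
# Assumed layout (inferred from the sample): day|country|snippet id|count
country_totals() {
  awk -F'|' '{ total[$1 "|" $2] += $4 }
    END { for (k in total) print k "|" total[k] }' "$@" | LC_ALL=C sort
}

# e.g. on the merged file:  country_totals result.txt | head
```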
Final Remarks and Credits
It was a very tough week, but absolutely fundamental for me to understand how things work. Hadoop is an amazing framework, and being able to take full advantage of Kettle to run our MapReduce analysis is a huge bonus. There's still a lot of room for user experience improvements around these steps, but considering that the alternative is to write Java code manually or learn other new languages, this is a great start.
This blog post was never meant to be a full Pentaho big data tutorial. What I hope is that, from this point on, understanding in detail how things work and what happens when we press the "run" button will let me do further development much faster, since I know exactly where to look.
I also got the chance to understand the very basics of how Hadoop works, and know what each of the components does. The next step will be digging into HBase and Hive.
I was also a bit suspicious of the performance and overhead of executing Kettle transformations on Hadoop, and how it would compare with Pig. Having seen the code and how lightweight the wrapper around Kettle is, I have no doubts that using Kettle, instead of learning new tools or approaches, is indeed an astonishingly efficient way to run MapReduce jobs.
I need to credit a bunch of people who helped me throughout this last week: Mark Reid, Xavier Stevens and Daniel Einspanjer from Mozilla, Doug Moran, Matt Casters and Matt Burgess from Pentaho, and Maria Roldan from webdetails. I'm aware I was a royal PITA the last few days :)