
Introduction
I now have an operational Cloudera Hadoop cluster with 4 nodes, as described in my previous post. My objective is to use interesting data to experiment with MapReduce algorithms.
This article presents how I selected the datasets, imported them into Hadoop, developed and ran the MapReduce Java programs, and graphed and analyzed the results using R.
Dataset Selection
I used 3 criteria for selecting a dataset:
- The size of the data should be large enough to exercise the multi-node cluster architecture, and yet not so big that my little cluster would break. I looked for a dataset bigger than 2GB and smaller than 20GB.
- The dataset should contain multiple columns
- The format should be easy to parse with a MapReduce Java program
Browsing the internet, I found this interesting article that gave me some good pointers.
I chose the US weather station hourly reports from the NCDC NOAA. This dataset met my criteria:
- Size: These reports contain, per weather station, the hourly weather reports of the last 30 years. The size of each report is about 15MB. There are 263 weather stations, and each report is named with the weather station ID. The total size of the data is more than 4GB.
- Multiple Columns: The hourly reports contain temperature, wind, sky conditions, etc.
- Format: a simple fixed-width field format, described here, which is quite simple to parse with a Java program.
Here is a graphical representation of the format:
The zipped file (650MB) can be downloaded here.
There is another file containing the location of all known weather stations in the world. The format is similar, yet there are fewer columns. The format can be found here (refer to section III.E); the US weather station identification format is USW000xxxxx, where xxxxx can be cross-referenced with the filename of the individual ISDLite-Normals files.
Import Datasets in HDFS
Once all the files are unzipped and located in your home directory, it is simple to import them into HDFS using the Hue File Browser tool. Connect to your cluster's Hue interface (http://hadoop1.example.com:8888) and select the File Browser icon:
Select the “upload” button, then select all the isdlite-normals files from the local directory where you unzipped them.
While the files are being uploaded, you can connect to Cloudera Manager and verify the status of the HDFS services. The nodes of my little cluster were quite loaded and some page swapping took place, leading to a performance degradation of the cluster. Cloudera Manager identifies this situation, raises appropriate warnings, and allows you to track the cluster health and status in real time.

In Cloudera Manager, the HDFS monitoring screen allows you to closely track the progress of the import process.
When all the files are uploaded to HDFS, they are ready to be used by MapReduce jobs. You can open and manipulate the files in the Hue File Browser interface:
The files in HDFS can also be accessed from the command line. From the hadoop1 host:
$> hdfs dfs -ls /user/admin
MapReduce
Environment Preparation
The Unix user that executes the MapReduce Java program should be part of the hadoop group in order to write to an HDFS directory. You can create a new user (in this case chris) specifying hadoop as the primary group, or you can add the hadoop group to an existing user.
User creation:
$> useradd -g hadoop chris
Group addition to an existing user:
$> usermod -a -G hadoop chris
MapReduce Java Program
My objective was to generate the mean temperature per weather station over a period of 30 years. I designed the MapReduce as follows:
Map = {key = weather station id, value = temperature}
Reduce = {key = weather station id, value = mean(temperature)}
The following figure represents the high-level design of this MapReduce:

The ISDLite files stored in HDFS are mapped to extract the weather station id and temperature, while the reducer calculates the mean temperature.
Map
As explained above, the isdlite-normals filenames contain the weather station identification number.
To get the filename within a map() function, the FileSplit class is used. This class contains information about the chunk of the file being processed by the map function: its filename, path, starting position and length. We use getPath().getName() to get the filename. The FileSplit can be retrieved from the context object of the map function. The weather station identification number is extracted from positions 5 to 10 of the filename:
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String weatherStationId = fileSplit.getPath().getName().substring(5, 10);
The file contains one hourly measure per line (in text format). We need each map() call to receive a complete line, so the input format is defined as TextInputFormat:
job.setInputFormatClass(TextInputFormat.class);
Every call to the map function map(LongWritable key, Text value, Context context) parses one line, which is passed in the value parameter. The different measures can then be extracted based on their position, using the substring(start, end) function. A trim() is used to remove the leading space characters:
String line = value.toString(); // Read a complete line from the file
year = Integer.parseInt(line.substring(0, 4).trim());
month = Integer.parseInt(line.substring(5, 7).trim());
day = Integer.parseInt(line.substring(8, 10).trim());
hour = Integer.parseInt(line.substring(11, 13).trim());
temperature = Integer.parseInt(line.substring(14, 19).trim());
dew = Integer.parseInt(line.substring(20, 25).trim());
pressure = Integer.parseInt(line.substring(26, 31).trim());
wind_direction = Integer.parseInt(line.substring(32, 37).trim());
wind_speed = Integer.parseInt(line.substring(38, 43).trim());
sky_condition = Integer.parseInt(line.substring(44, 49).trim());
rain_1h = Integer.parseInt(line.substring(50, 55).trim());
rain_6h = Integer.parseInt(line.substring(56, 61).trim());
When a weather station cannot report a correct measure, the number -9999 is used. We will ignore these invalid measures. The code will be:
if (temperature != -9999) { // Filter out invalid temperatures
    context.write(new Text(weatherStationId), new IntWritable(temperature));
}
Resulting in the expected Map = {key = weather station id, value = temperature}
The complete map() function can be found here.
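For reference, here is a minimal sketch of how the fragments above could fit together in a Mapper class. The package and class names are my own illustration and may differ from the actual program:
package chris.hadoop.examples;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Illustrative mapper: emits {weather station id, temperature} for every valid measure
public class WeatherHourlyStatsMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The weather station id is encoded in the ISDLite filename (positions 5 to 10)
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String weatherStationId = fileSplit.getPath().getName().substring(5, 10);

        // Extract the temperature from its fixed-width position in the line
        String line = value.toString();
        int temperature = Integer.parseInt(line.substring(14, 19).trim());

        // -9999 marks an invalid measure and is filtered out
        if (temperature != -9999) {
            context.write(new Text(weatherStationId), new IntWritable(temperature));
        }
    }
}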
Reduce
The reduce(Text key, Iterable<IntWritable> values, Context context) function is invoked once for each key, with the collection of values associated with that key. We calculate the mean temperature simply by summing all the temperature measurements and dividing by their count.
int sum = 0;
int count = 0;
for (IntWritable val : values) {
    sum += val.get();
    count++;
}
result.set(sum / count);
context.write(key, result);
Resulting in the expected Reduce = {key = weather station id, value = mean(temperature)}
The complete reduce() function can be found here.
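Similarly, a minimal sketch of a Reducer class built from the fragment above (again, the class name is illustrative):
package chris.hadoop.examples;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative reducer: computes the mean temperature per weather station id
public class WeatherHourlyStatsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the temperature measures for this station and count them
        int sum = 0;
        int count = 0;
        for (IntWritable val : values) {
            sum += val.get();
            count++;
        }
        // Emit the mean temperature for this weather station
        result.set(sum / count);
        context.write(key, result);
    }
}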
Compiling and Executing MapReduce
To compile the Java MapReduce programs, define the following variables (and add them to the /etc/bashrc configuration file for convenience):
export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CMD=/opt/cloudera/parcels/CDH/bin/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`
The MapReduce job is defined in a Java file and class named WeatherHourlyStats. The commands to compile it and to create the jar file are:
javac -classpath $HADOOP_CLASSPATH -d WeatherHourlyStats WeatherHourlyStats.java
jar -cvf WeatherHourlyStats.jar -C WeatherHourlyStats/ .
To clean up the output directory before executing the MapReduce job:
hdfs dfs -rm -r -f /user/chris/output
The command to launch the Hadoop job:
hadoop jar WeatherHourlyStats.jar chris.hadoop.examples.WeatherHourlyStats /user/admin/input /user/chris/output
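For completeness, the driver class referenced by this command could look roughly like the following sketch. The mapper and reducer class names reuse the illustrative sketches above, and the input and output paths are taken from the command-line arguments:
package chris.hadoop.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Illustrative driver: wires the mapper and reducer together and submits the job
public class WeatherHourlyStats {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Weather hourly stats");
        job.setJarByClass(WeatherHourlyStats.class);

        // One line of the ISDLite files per call to map()
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(WeatherHourlyStatsMapper.class);
        job.setReducerClass(WeatherHourlyStatsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Input and output HDFS directories passed on the command line
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}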
Monitor MapReduce Progress
Cloudera Manager dashboards and monitoring tools provide a lot of information. Once the Hadoop job is launched, the main screen of Cloudera Manager lets you monitor the overall health of the cluster.

From the main Cloudera Manager dashboard, you can verify the number of MapReduce jobs running, the cluster's network and I/O statistics, etc.
Additional information can be found in the MapReduce service dashboard, such as the number of pending map() and reduce() tasks and the number of slots utilized, as well as many additional graphs.
The Activities dashboard, where all the jobs are listed, provides job-specific information, such as the percentage of completion of the map() and reduce() steps, and graphs of the memory, CPU and I/O used. This information can be aggregated at the cluster level or drilled down to a specific job.

The MapReduce Activity dashboard provides very detailed information about the MapReduce jobs, including logs and overview graphs.
The Job Details dashboard provides task information and detailed execution logs, including the running time:

The MapReduce Job Details Dashboard provides information on all map() and reduce() tasks, including status, log file, execution time, etc.
The Tasks Distribution view also provides useful insight into how the MapReduce tasks were distributed in the cluster, based on CPU or memory usage versus duration.

The MapReduce Job Tasks Distribution provides interesting information on how the tasks were executed in the cluster.
All these dashboards provide a lot of useful information and give good insight into how the MapReduce job behaved. This information provides pointers to optimize the Java code or the cluster resource distribution.
Working the Results
Once the MapReduce job completes, the output files can be found in the HDFS directory specified when the job was launched. The output is split into multiple files, named part-r-00000, part-r-00001, etc. The files can be copied locally:
$> hdfs dfs -copyToLocal /user/chris/output/part* .
Append all those parts into a single file (e.g. $> cat part* > final_output). The resulting file contains all the weather stations and their average temperature over a 30-year period. The geo-localization of the weather stations is contained in the ghcnd-stations.txt file. The format is described here.
The next step is to transform these two files from fixed-width format to comma-separated (.csv) format. The transformation can be performed either with command-line tools (e.g. sed and awk) or with a text editor. In the final_output file, prefix the weather station id with USW000.
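As an alternative to sed and awk, a small Java sketch along these lines could perform the transformation of the MapReduce output; the file names and the tab-separated input layout are assumptions:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.PrintWriter;

// Illustrative converter: turns the tab-separated MapReduce output into CSV
// and prefixes the weather station id with USW000 (file names are examples)
public class FinalOutputToCsv {

    public static void main(String[] args) throws Exception {
        try (BufferedReader in = new BufferedReader(new FileReader("final_output"));
             PrintWriter out = new PrintWriter("final_output.csv")) {
            String line;
            while ((line = in.readLine()) != null) {
                // MapReduce text output is "key<TAB>value"
                String[] fields = line.trim().split("\\s+");
                if (fields.length == 2) {
                    out.println("USW000" + fields[0] + "," + fields[1]);
                }
            }
        }
    }
}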
By joining on the weather station identification contained in both files, we determine the longitude, latitude and elevation of each weather station. To simplify this join, import these files into a database (e.g. SQLite) using the following schema.
CREATE TABLE TEMPERATURE (
    ID TEXT,
    MEAN_TEMP REAL
);

CREATE TABLE WEATHER_STATIONS (
    ID TEXT,
    LATITUDE REAL,
    LONGITUDE REAL,
    ELEVATION REAL,
    STATE TEXT
);
In SQLite, import the temperature file using the command .import final_output TEMPERATURE and the weather stations with .import ghcnd-stations.txt WEATHER_STATIONS. To create a file with the concatenation of all these attributes, a simple JOIN suffices:
SELECT id, mean_temp, latitude, longitude, elevation, state
FROM temperature NATURAL JOIN weather_stations;
Note: in SQLite, define the output mode with .mode csv and include headers with .header on.
Only 166 stations had localization information in the ghcnd file (even though the ghcnd file contains 48’622 weather stations). This highlights one of the challenges of working with datasets: the accuracy and completeness of the data. In this case, 166 stations is enough and provides sufficient information to verify whether the localization of the weather stations matches the expectation that the average temperature should be colder in the north than in the south.
R: Visualize the Results
Now that we have an actionable file, the data it contains can be visualized. I used R and RStudio to import the data and place the weather stations at their geographic locations, highlighting the relation between latitude and average temperature.
I recommend using RStudio, as it greatly simplifies the process of writing R code and provides you with a complete IDE:
The R code is as follows:
library(ggplot2)
library(maptools)

# Load the results of the MapReduce
meanTemp <- read.csv("Final Output Consolidated.csv")

# Load the USA map
all_states <- map_data("world", "usa")

# Plot the map, centered on the USA & Alaska
p <- ggplot()
p <- p + geom_polygon(data = all_states,
                      aes(x = long, y = lat, group = group),
                      colour = "grey50", fill = "grey90")
p <- p + scale_x_continuous("Longitude", limits = c(-170, -65))
p <- p + scale_y_continuous("Latitude", limits = c(20, 75))

# Plot the temperatures according to the weather stations location
p <- p + geom_point(data = meanTemp,
                    aes(x = meanTemp$LONGITUDE, y = meanTemp$LATITUDE),
                    colour = "grey20", size = 4, alpha = 0.7)
p <- p + geom_point(aes(x = meanTemp$LONGITUDE, y = meanTemp$LATITUDE,
                        col = meanTemp$MEAN_TEMP / 10),
                    size = 3, alpha = 0.7)
p <- p + scale_colour_gradientn(name = "Temperature\nin °C", colours = rev(rainbow(4)))
p <- p + theme_bw() + ggtitle("Mean temperatures over a 30 years period")
p
This program produces the following graph:

This map shows the data produced by the Hadoop MapReduce job: the average temperature measured by weather stations located in the US over a 30-year period.
The actual measurements over a period of 30 years confirm that the further north we go, the colder it gets.
Another MapReduce
It is very simple to modify the MapReduce Java program to collect different kinds of information. For example, to follow the evolution of the temperature on a per-year basis, the map() output will include the year in the key:
Map = {key = weather station id : Year, value = temperature}
The reduce function remains unchanged.
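As an illustration, the only change needed in the map() function would be along these lines, assuming the year and weather station id have been parsed as shown earlier:
// Include the year in the key so the reducer aggregates per station and per year
if (temperature != -9999) {
    context.write(new Text(weatherStationId + ":" + year), new IntWritable(temperature));
}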
After consolidation, we can draw visualizations such as this one:

A small modification in the map() function allows us to collect a different kind of data, such as the evolution of the temperature over the years per latitude.
In this graph, the weather stations are grouped by latitude, with a linear regression of the evolution of the temperature over the years. In all latitude groups, there is a slight temperature increase, which seems to be more pronounced in the south of the US.
Conclusions
This concludes my first hands-on experience in the Big Data world, and I realized, digging through the available information, that I am barely scratching the surface. The possibilities are truly endless. Once the datasets are in the Hadoop cluster, you can adapt your MapReduce jobs very quickly, in my case in a few minutes, to test new ideas or theories. It is important to visualize your results in a way that allows interpretation, and R is a powerful tool to achieve this.
Comments and remarks are welcome.