Experimenting with Hadoop on Real Datasets

Introduction

I now have an operational 4-node Cloudera Hadoop cluster, as described in my previous post. My objective is to use interesting data to experiment with MapReduce algorithms.

This article presents how I selected the datasets, imported them into Hadoop, developed and ran the MapReduce Java programs, and graphed and analyzed the results using R.

Dataset Selection

I used 3 criteria for selecting a dataset:

  1. The data should be large enough to exercise the multi-node cluster architecture, and yet not so big that my little cluster would break. I looked for a dataset bigger than 2GB and smaller than 20GB.
  2. The dataset should contain multiple columns.
  3. The format should be easy to parse with a MapReduce Java program.

Browsing the internet, I found this interesting article that gave me some good pointers.

I chose the US weather stations hourly reports from the NCDC NOAA. This dataset met my criteria:

  1. Size: These reports contain, per weather station, the hourly weather reports of the last 30 years. Each report is about 15MB and is named after its weather station ID. With 263 weather stations, the total size of the data is more than 4GB.
  2. Multiple Columns: The hourly reports contain temperature, wind, sky conditions, etc.
  3. Format: a simple fixed-width field format, described here, which is easy to parse with a Java program.

Here is a graphical representation of the format:

The ISD-Lite format is based on fixed-width fields.

The zipped file (650MB) can be downloaded here.

There is another file containing the locations of all known weather stations in the world; the format is similar, but it has fewer columns. The format can be found here (refer to section III.E). The US weather station identifier format is USW000xxxxx, where xxxxx can be cross-referenced with the filenames of the individual ISDLite-Normals files.

Import Datasets in HDFS

Once all the files are unzipped in your home directory, it is simple to import them into HDFS using the Hue File Browser tool. Connect to your cluster's Hue interface (http://hadoop1.example.com:8888) and select the File Browser icon:

In the Hue File Browser, click on the upload button.

Select the “upload” button, then select all the files from the local directory where you unzipped the isdlite-normals files.

The files are imported in parallel. Wait until the operation completes.

While the files are being uploaded, you can connect to Cloudera Manager and check the status of the HDFS services. The nodes of my little cluster were quite loaded and some page swapping took place, degrading the performance of the cluster; Cloudera Manager detects this situation and raises the appropriate warnings. Cloudera Manager lets you track the cluster health and status in real time.

In the Cloudera Manager, the HDFS monitoring screen lets you closely track the progress of the import process.

When all the files are uploaded to HDFS, they are ready to be used by MapReduce jobs. You can open and manipulate the files in the Hue File Browser interface:

Once the process is completed, you can manipulate your files within the Hue File Browser.

The files in HDFS can also be accessed from the command line. From the hadoop1 host:

$> hdfs dfs -ls /user/admin

MapReduce

Environment Preparation

The Unix user that executes the MapReduce Java program should be part of the hadoop group in order to write to an HDFS directory. You can create a new user (in this case chris) with hadoop as the primary group, or add the hadoop group to an existing user.

User creation:

$> useradd -g hadoop chris

Group addition to an existing user:

$> usermod -a -G hadoop chris

MapReduce Java Program

My objective was to compute the mean temperature per weather station over a period of 30 years. I designed the MapReduce as follows:

Map = {key = weather station id, value = temperature}
Reduce = {key = weather station id, value = mean(temperature)}

The following figure represents the high-level design of this MapReduce:

The ISDLite files contained in HDFS are mapped to extract the id and temperature, while the reducer calculates the mean temperature.

Map

As explained above, the isdlite-normals filenames contain the weather station identification number.

To get the filename within a map() function, the FileSplit class is used. This class describes the chunk of the file being processed by the map function: you can retrieve its path (and thus the filename), its starting position and its length. We use getPath().getName() to get the filename; the FileSplit itself is retrieved from the context object of the map function. The weather station identification number is extracted from positions 5 to 10 of the filename:

FileSplit fileSplit = (FileSplit) context.getInputSplit();
String weatherStationId = fileSplit.getPath().getName().substring(5, 10);

The file contains one hourly measure per line (in text format). TextInputFormat guarantees that each map() call receives a complete line, even when a file is split across HDFS blocks, so a record is never cut in the middle of a line. The input format is set on the job:

job.setInputFormatClass(TextInputFormat.class); 
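
For context, here is a minimal sketch of a driver that configures the job this way. The body of the class is my assumption, not the author's actual code; WeatherMapper and MeanTemperatureReducer are placeholder names for the mapper and reducer discussed in the next sections, and only the class name WeatherHourlyStats matches the compile and launch commands shown later.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver: wires a mapper and reducer together and sets TextInputFormat.
public class WeatherHourlyStats {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "weather hourly stats");
        job.setJarByClass(WeatherHourlyStats.class);
        job.setMapperClass(WeatherMapper.class);            // placeholder mapper class
        job.setReducerClass(MeanTemperatureReducer.class);  // placeholder reducer class
        job.setInputFormatClass(TextInputFormat.class);     // one complete line per map() call
        job.setOutputKeyClass(Text.class);                  // weather station id
        job.setOutputValueClass(IntWritable.class);         // temperature
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. /user/admin/input
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // e.g. /user/chris/output
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}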

Every call to the map function map(LongWritable key, Text value, Context context) parses one line, which is passed in the value parameter. The individual measures are then extracted by position, using substring(start, end); a trim() removes the leading space characters:

String line = value.toString(); // Read a complete line from the file

// Fixed-width ISD-Lite fields, parsed by position (missing values are reported as -9999)
int year           = Integer.parseInt(line.substring(0, 4).trim());
int month          = Integer.parseInt(line.substring(5, 7).trim());
int day            = Integer.parseInt(line.substring(8, 10).trim());
int hour           = Integer.parseInt(line.substring(11, 13).trim());
int temperature    = Integer.parseInt(line.substring(14, 19).trim());
int dew            = Integer.parseInt(line.substring(20, 25).trim());
int pressure       = Integer.parseInt(line.substring(26, 31).trim());
int wind_direction = Integer.parseInt(line.substring(32, 37).trim());
int wind_speed     = Integer.parseInt(line.substring(38, 43).trim());
int sky_condition  = Integer.parseInt(line.substring(44, 49).trim());
int rain_1h        = Integer.parseInt(line.substring(50, 55).trim());
int rain_6h        = Integer.parseInt(line.substring(56, 61).trim());

When a weather station cannot report a correct measure, the value -9999 is used. We ignore these invalid measures:

if (temperature != -9999) { // Filter out invalid temperatures
    context.write(new Text(weatherStationId), new IntWritable(temperature));
}

Resulting in the expected Map = {key = weather station id, value = temperature}

The complete map() can be found here.
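
Putting the pieces above together, here is a minimal sketch of what the mapper class could look like if it only extracted the temperature; the class name WeatherMapper is a placeholder, and the real map(), linked above, parses all the fields.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical mapper: emits {weather station id, temperature} for each valid hourly line.
public class WeatherMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // The weather station id is carried by the filename (positions 5 to 10).
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String weatherStationId = fileSplit.getPath().getName().substring(5, 10);

        // Parse only the temperature field of the fixed-width line.
        String line = value.toString();
        int temperature = Integer.parseInt(line.substring(14, 19).trim());

        // -9999 marks an invalid measure and is skipped.
        if (temperature != -9999) {
            context.write(new Text(weatherStationId), new IntWritable(temperature));
        }
    }
}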

Reduce

The reduce(Text key, Iterable<IntWritable> values, Context context) function is invoked once per key with the collection of values emitted for that key. We calculate the mean temperature by summing all the temperature measurements and dividing by the number of measurements:

IntWritable result = new IntWritable();

int sum = 0;
int count = 0;
for (IntWritable val : values) {
    sum += val.get();
    count++;
}
result.set(sum / count); // integer mean of the temperature values
context.write(key, result);

Resulting in the expected Reduce = {key = weather station id, value = mean(temperature)}

The complete reduce() can be found here.
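
For completeness, a minimal sketch of how this snippet sits inside a reducer class; the class name MeanTemperatureReducer is a placeholder for the author's actual class.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical reducer: computes the mean temperature per weather station id.
public class MeanTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        int count = 0;
        for (IntWritable val : values) {
            sum += val.get();
            count++;
        }
        result.set(sum / count); // integer mean of the temperature values
        context.write(key, result);
    }
}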

Compiling and Executing MapReduce

To compile the Java MapReduce programs, define the following variables (and add them to the /etc/bashrc configuration file for convenience):

export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
export HADOOP_CMD=/opt/cloudera/parcels/CDH/bin/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

The MapReduce job is defined in a Java file and class named WeatherHourlyStats. The commands to compile it and create the jar file are:

javac -classpath $HADOOP_CLASSPATH -d WeatherHourlyStats WeatherHourlyStats.java
jar -cvf WeatherHourlyStats.jar -C WeatherHourlyStats/ .

To clean up the output directory before executing the MapReduce job:

hdfs dfs -rm -r -f /user/chris/output

The command to launch the Hadoop job:

hadoop jar WeatherHourlyStats.jar chris.hadoop.examples.WeatherHourlyStats /user/admin/input /user/chris/output

Monitor MapReduce Progress

Cloudera Manager dashboards and monitoring tools provide a lot of information. Once the Hadoop job is launched, the main screen of Cloudera Manager lets you monitor the overall health of the cluster.

From the main Cloudera Manager dashboard, you can verify the number of MapReduce jobs running, the I/O and network stats of the cluster, etc.

Additional information can be found in the MapReduce service dashboard, such as the number of pending map() and reduce() tasks and the number of slots utilized, along with many other informational graphs.

From the MapReduce service dashboard, there is additional information about running jobs.

The Activities dashboard, where all the jobs are listed, provides job-specific information, such as the percentage of completion of the map() and reduce() steps and graphs of the memory, CPU and I/O used. This information can be aggregated at the cluster level or drilled down to a specific job.

The MapReduce Activity dashboard provides very detailed information about the MapReduce jobs, including logs and overview graphs.

The Job Details dashboard provides task information and detailed execution logs, including the running time:

The MapReduce Job Details dashboard provides information on all map() and reduce() tasks, including status, log file, execution time, etc.

The Tasks Distribution view also provides useful insight into how the MapReduce tasks were distributed across the cluster, based on CPU or memory usage versus duration.

The MapReduce Job Tasks Distribution provides interesting information on how the tasks were executed in the cluster.

All these dashboards provide a lot of useful information and give good insight into how the MapReduce job behaved, with pointers for optimizing the Java code or the cluster resource distribution.

Working with the Results

Once the MapReduce job completes, the output files can be found in the HDFS directory specified when the job was launched. The output is split into multiple files, named part-r-00000, part-r-00001, etc. The files can be copied locally:

$> hdfs dfs -copyToLocal /user/chris/output/part* .

Append all those parts into a single file (e.g. $> cat part* > final_output). The resulting file contains all the weather stations and their average temperature over a 30-year period. The geolocation of the weather stations is contained in the ghcnd-stations.txt file. The format is described here.

The next step is to transform these two files from fixed-width format to comma-separated (.csv) format. The transformation can be performed either with command-line tools (e.g. sed and awk) or with a text editor; a small Java alternative is sketched below. In the final_output file, prefix the weather station id with USW000.
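
As one possible approach, here is a hedged Java sketch that converts the MapReduce output into CSV and adds the USW000 prefix. It assumes the default TextOutputFormat layout (key and value separated by a tab) and the file names final_output and final_output.csv, which are my own choices; adapt them to your setup.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintWriter;

// Hypothetical converter: reads "stationId<TAB>meanTemp" lines produced by the reducer
// and writes "USW000stationId,meanTemp" lines ready for the SQLite import.
public class FinalOutputToCsv {
    public static void main(String[] args) throws IOException {
        try (BufferedReader in = new BufferedReader(new FileReader("final_output"));
             PrintWriter out = new PrintWriter("final_output.csv")) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] fields = line.split("\t");
                if (fields.length == 2) {
                    out.println("USW000" + fields[0] + "," + fields[1].trim());
                }
            }
        }
    }
}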

By joining on the weather station identification contained in both files, we can determine the longitude, latitude and elevation of each weather station. To simplify this join, import these files into a database (e.g. SQLite) using the following schema.

CREATE TABLE TEMPERATURE (
  ID TEXT,
  MEAN_TEMP REAL
);
CREATE TABLE WEATHER_STATIONS (
  ID TEXT,
  LATITUDE REAL,
  LONGITUDE REAL, 
  ELEVATION REAL, 
  STATE TEXT
);

In SQLite, import the temperature file with the command .import final_output TEMPERATURE and the weather stations with .import ghcnd-stations.txt WEATHER_STATIONS. To create a file with all these attributes combined, a simple JOIN suffices:

SELECT id,
       mean_temp,
       latitude,
       longitude,
       elevation,
       state
FROM   temperature
       NATURAL JOIN weather_stations; 

Note: in SQLite, define the output mode with .mode csv and include headers with .header on

Only 166 stations had location information in the ghcnd file (even though the ghcnd file lists 48’622 weather stations). This highlights one of the challenges of working with datasets: the accuracy and completeness of the data. In this case, 166 stations provide enough information to verify whether the location of a weather station is aligned with the expectation that the average temperature should be colder in the north than in the south.

R: Visualize the Results

Now that we have an actionable file, the data it contains can be visualized. I used R and R-Studio to import the data and place the weather stations at their geographic locations, highlighting the relation between latitude and average temperature.

I recommend using R-Studio, as it greatly simplifies the process of writing R code and provides you with a complete IDE:

The R-Studio IDE provides a complete environment to develop and run your R code interactively.

The R code is as follows:

library(ggplot2)
library(maps)   # map_data() relies on the maps package

# load the results of the MapReduce
meanTemp <- read.csv("Final Output Consolidated.csv")

# Load the USA map
all_states <- map_data("world", "usa") 

# Plot the map, centered on the USA & the Alaska
p <- ggplot()
p <- p + geom_polygon( data = all_states, aes(x = long, y = lat, group = group), 
                       colour = "grey50", fill= "grey90")
p <- p + scale_x_continuous("Longitude", limits = c(-170, -65))
p <- p + scale_y_continuous("Latitude", limits = c(20, 75))

# Plot the temperatures according to the weather stations location
p <- p + geom_point(data = meanTemp, aes(x = LONGITUDE, y = LATITUDE),
                    colour = "grey20", size = 4, alpha = 0.7)
p <- p + geom_point(data = meanTemp, aes(x = LONGITUDE, y = LATITUDE,
                        col = MEAN_TEMP / 10), size = 3, alpha = 0.7)
p <- p + scale_colour_gradientn(name = "Temperature\nin °C", colours = rev(rainbow(4)))
p <- p + theme_bw() + ggtitle("Mean temperatures over a 30 years period")
p

This program produces the following graph:

This map shows the data produced by the Hadoop MapReduce job: the average temperature measured by weather stations located in the US over a 30-year period.

The actual measurements over a period of 30 years confirm that the further north we go, the colder it gets.

Another MapReduce

It is very simple to modify the MapReduce Java program to collect a different kind of information. For example, to follow the evolution of the temperature on a per-year basis, the map() output includes the year in the key:

Map = {key = weather station id : year, value = temperature}

The reduce function remains unchanged.
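
As a sketch, the change in the map() function could look like the following, reusing the weatherStationId, year and temperature variables from the parsing code shown earlier; the exact "stationId:year" key format is my assumption.

// Emit a composite "stationId:year" key so the reducer computes one mean per station and per year.
if (temperature != -9999) {
    context.write(new Text(weatherStationId + ":" + year), new IntWritable(temperature));
}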

After consolidation, we can draw visualizations such as this one:

A small modification in the map() function allows collecting a different kind of data, such as the evolution of the temperature over the years per latitude band.

In this graph the weather stations are grouped by latitude, with a linear regression of the evolution of the temperature over the years. In all latitude groups there is a slight temperature increase, which seems more pronounced in the south of the US.

Conclusions

This concludes my first hands-on experience in the Big Data world, and digging through the available information made me realize that I am barely scratching the surface. The possibilities are truly endless. Once the datasets are in the Hadoop cluster, you can adapt your MapReduce jobs very quickly, in my case in a few minutes, to test new ideas or theories. It is also important to visualize your results in a way that allows interpretation, and R is a powerful tool to achieve this.

Comments and remarks are welcome.
