Lately I’ve been working mainly on parallel computing, especially since I have access to the powerful LONI supercomputers, with hundreds of nodes and plenty of memory. Specifically, I wanted to compare MPI, OpenMP, and Hadoop, both in terms of efficiency and ease of use.
K-means being my favorite easy-to-implement clustering algorithm, I first tried to modify my C++ code and use the Hadoop Pipes library to implement a MapReduce program. I confess it was the first time for me writing Hadoop code, but when you read about it the word “easy” pops up a lot. If your program differs from word count, don’t be fooled. Rethinking an algorithm in M/R is essentially the first thing to do on paper; however, those little chunks of data and their movement quickly become a nightmare to implement, especially since the only documentation is word count. What about having multiple types of input files? A specific format? An algorithm that iterates, taking as input the output of the previous iteration?
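To make the on-paper rethinking concrete, here is a minimal sketch of the K-means M/R decomposition in plain Java. It deliberately does not use the Hadoop API (the class and method names are mine, for illustration only): the “map” step assigns each point to its nearest centroid, and the “reduce” step averages each cluster’s points to get the new centroids; a driver loop iterates.

```java
import java.util.*;

// Sketch of the K-means map/reduce decomposition, in plain Java.
public class KMeansMRSketch {

    // "Map" step: group each point under the id of its nearest centroid
    // (squared Euclidean distance).
    static Map<Integer, List<double[]>> mapPhase(List<double[]> points, double[][] centroids) {
        Map<Integer, List<double[]>> grouped = new HashMap<>();
        for (double[] p : points) {
            int best = 0;
            double bestDist = Double.MAX_VALUE;
            for (int c = 0; c < centroids.length; c++) {
                double d = 0;
                for (int i = 0; i < p.length; i++) {
                    double diff = p[i] - centroids[c][i];
                    d += diff * diff;
                }
                if (d < bestDist) { bestDist = d; best = c; }
            }
            grouped.computeIfAbsent(best, k -> new ArrayList<>()).add(p);
        }
        return grouped;
    }

    // "Reduce" step: average the points assigned to each centroid.
    static double[][] reducePhase(Map<Integer, List<double[]>> grouped, double[][] old) {
        double[][] next = new double[old.length][];
        for (int c = 0; c < old.length; c++) {
            List<double[]> members = grouped.get(c);
            if (members == null) { next[c] = old[c].clone(); continue; } // empty cluster keeps its centroid
            double[] mean = new double[old[c].length];
            for (double[] p : members)
                for (int i = 0; i < p.length; i++) mean[i] += p[i];
            for (int i = 0; i < mean.length; i++) mean[i] /= members.size();
            next[c] = mean;
        }
        return next;
    }

    public static void main(String[] args) {
        List<double[]> points = Arrays.asList(
            new double[]{1, 1}, new double[]{1.5, 2}, new double[]{8, 8}, new double[]{9, 9});
        double[][] centroids = {{0, 0}, {10, 10}};
        for (int iter = 0; iter < 5; iter++) // driver: iterate up to maxIter
            centroids = reducePhase(mapPhase(points, centroids), centroids);
        System.out.printf(Locale.US, "%.2f %.2f%n", centroids[0][0], centroids[0][1]);
    }
}
```

On a real cluster each of these steps becomes a Hadoop job per iteration, which is exactly where the bookkeeping (intermediate files, re-reading the previous centroids) gets painful.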
So I definitely dropped the idea of using the C++ Pipes library and jumped to Mahout, a machine learning library that implements a number of clustering algorithms, including K-means. I cracked open Mahout’s code and started to discover what is really needed to implement M/R code on Hadoop. While I am not going to cover that here, a word of advice: if you are a beginner, stick to Java and get yourself a book.
Using Mahout saves you the time of writing the desired algorithm in M/R, but it still requires that you write some code to transform your input file into something it can understand, namely a SequenceFile; this is the input preparation phase. I cannot elaborate more on that for now because in my case I found a workaround! So basically, if you use the command line, and have a valid transformed SequenceFile as input for both your data points and your initial clusters (which can be self-generated when you specify k):
$ bin/mahout kmeans -i input -c clusters -o output -k num_clusters -dm measure -x maxIter -cd convergence_delta
Mahout provides some examples that run the corresponding implementations on predefined raw datasets. For K-means, there is an example using the syntheticcontrol data, where the input is space-delimited. Now, depending on what you want to run, you can use the provided examples, but make sure the data format they use corresponds to yours!
In the K-means example’s case you should have this kind of input (space-delimited vectors):
35.5351 41.7067 39.1705 48.3964 .. 38.6103
24.2104 41.7679 45.2228 43.7762 .. 48.8175
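If it helps to see what that format amounts to before the conversion step, here is a tiny, hypothetical helper (the class and method names are mine) that parses one such space-delimited line into a `double[]` vector, which is what the conversion step ultimately wraps into Mahout vectors inside a SequenceFile:

```java
import java.util.Arrays;

// Hypothetical helper: parse one space-delimited line of the synthetic
// control data into a double[] vector.
public class VectorLineParser {
    static double[] parseLine(String line) {
        String[] tokens = line.trim().split("\\s+"); // split on any run of whitespace
        double[] v = new double[tokens.length];
        for (int i = 0; i < tokens.length; i++) {
            v[i] = Double.parseDouble(tokens[i]);
        }
        return v;
    }

    public static void main(String[] args) {
        double[] v = parseLine("35.5351 41.7067 39.1705");
        System.out.println(Arrays.toString(v));
    }
}
```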
You still need to push this data to HDFS, under an input directory for example:
$ $MAHOUT_HOME/bin/mahout org.apache.mahout.clustering.syntheticcontrol.kmeans.Job -i input -o output -t1 .. -t2 .. -cd .. -x 10
Let’s examine the options: -cd is the convergence delta, -x is the maximum number of iterations. Like me, you’d legitimately ask, “but where is K-means’ k?” Well, it seems that this example runs the Canopy algorithm, using the t1 and t2 distances, to generate the initial clusters… I don’t want that! What I need is to provide k (an integer) as an input and randomly select k points from my data as the initial clusters.
Remember, in command-line mode you have the k parameter, so there must be a random generator somewhere in the code! After reading the corresponding piece of code I finally understood where it is and how to use it. I figured I could push my code to Mahout so that others can use it directly. Basically, I created my own Job class based on the syntheticcontrol one and called it generic, then made the following modification to the source code:
Path directoryContainingConvertedInput = new Path(output, DIRECTORY_CONTAINING_CONVERTED_INPUT);
InputDriver.runJob(input, directoryContainingConvertedInput, "org.apache.mahout.math.RandomAccessSparseVector");

// Old code, which ran the Canopy driver to get the initial clusters:
// log.info("Running Canopy to get initial clusters");
// CanopyDriver.run(conf, directoryContainingConvertedInput, output, measure, t1, t2, false, false);

// New code: pick k random seeds instead.
log.info("Running random seed to get initial clusters");
Path clusters = new Path(output, Cluster.INITIAL_CLUSTERS_DIR);
clusters = RandomSeedGenerator.buildRandom(directoryContainingConvertedInput, clusters, k, measure);
If you are interested, you can check the code of the InputDriver, which converts the space-separated file into a SequenceFile, and the RandomSeedGenerator, which takes the converted input, extracts k random vectors, and puts them into the clusters directory.
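Conceptually, what the random seed generation boils down to is picking k distinct random points from the data. Here is a plain-Java sketch of that idea (names are mine; the real RandomSeedGenerator reads and writes SequenceFiles on HDFS rather than in-memory lists):

```java
import java.util.*;

// Plain-Java sketch of random seed selection: shuffle the data points
// and take the first k as the initial centroids.
public class RandomSeedSketch {
    static List<double[]> pickSeeds(List<double[]> points, int k, long seed) {
        List<double[]> copy = new ArrayList<>(points);
        Collections.shuffle(copy, new Random(seed)); // deterministic with a fixed seed
        return copy.subList(0, k);
    }

    public static void main(String[] args) {
        List<double[]> data = Arrays.asList(
            new double[]{1, 2}, new double[]{3, 4}, new double[]{5, 6}, new double[]{7, 8});
        List<double[]> seeds = pickSeeds(data, 2, 42L);
        System.out.println(seeds.size());
    }
}
```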
Finally, if you want to run K-means using Mahout and you have a space-separated input file, you can use my example code (while waiting for the merge, here is the patch: patch):
$MAHOUT_HOME/bin/mahout org.apache.mahout.clustering.generic.kmeans.Job -i input -o output -k .. -cd .. -x 10
Bottom line: you really need to read quite a bit before you can write useful Hadoop M/R code, in order to grasp the idea of SequenceFiles, which interfaces your classes should implement, and so on. Good places to start are the example packages and Mahout itself.