Understanding MapReduce

The first time I read about MapReduce was in a paper from Google titled “MapReduce: Simplified Data Processing on Large Clusters”.

The idea captivated me from the beginning, but I had no opportunity to use it until now, when a project led me to MongoDB and I found that MapReduce is part of it.

So, in this first post I’m going to talk about how the MapReduce algorithm works, with a very simple C# implementation as a sample. In a future post I will show you how we can use it in MongoDB.

What is MapReduce?

It is an algorithm that can process a very large amount of data in parallel. It takes three inputs: a source collection, a Map function and a Reduce function. It returns a new data collection.

Collection MapReduce(Collection source, Function map, Function reduce)

The algorithm consists of a few steps. The first step executes the Map function on each item in the source collection. Map returns zero or more Key/Value objects.

ArrayOfKeyValue Map(object itemFromSourceCollection)

So, we can say that Map’s responsibility is to convert an item from the source collection into zero or more Key/Value objects. The picture below shows this.

In the next step, the algorithm sorts all Key/Value instances and creates new objects in which all values are grouped by key.
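
To make the grouping step concrete, here is a minimal sketch that sorts and groups a handful of hand-written Key/Value pairs. The names GroupingSketch and GroupByKey and the sample pairs are made up for illustration, and using a SortedDictionary is just one convenient way to keep the keys in order. It is not necessarily how a real MapReduce engine does it.

```csharp
using System;
using System.Collections.Generic;

static class GroupingSketch
{
    // Hypothetical helper: collects every value under its key.
    // The SortedDictionary keeps the keys in sorted order.
    public static SortedDictionary<TKey, List<TValue>> GroupByKey<TKey, TValue>(
        IEnumerable<KeyValuePair<TKey, TValue>> pairs)
    {
        var groups = new SortedDictionary<TKey, List<TValue>>();
        foreach (var pair in pairs)
        {
            List<TValue> values;
            if (!groups.TryGetValue(pair.Key, out values))
            {
                // First time this key shows up: start a new list for it
                values = new List<TValue>();
                groups.Add(pair.Key, values);
            }
            values.Add(pair.Value);
        }
        return groups;
    }

    static void Main()
    {
        // Pairs as two Map calls might have emitted them (made-up sample data)
        var pairs = new[]
        {
            new KeyValuePair<char, int>('o', 2),
            new KeyValuePair<char, int>('a', 1),
            new KeyValuePair<char, int>('a', 3),
            new KeyValuePair<char, int>('o', 1),
        };

        foreach (var entry in GroupByKey(pairs))
        {
            Console.WriteLine("{0} -> [{1}]", entry.Key, string.Join(", ", entry.Value));
        }
        // Prints:
        // a -> [1, 3]
        // o -> [2, 1]
    }
}
```

Each entry of the result is what the next step receives: one key together with all the values that were emitted for it.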

The last step executes the Reduce function once for each grouped Key/Values instance.

ItemResult Reduce(KeyWithArrayOfValues item)

The Reduce function returns a new item that is added to the result collection.
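
Before adding any parallelism, the three steps can be sketched on a single thread with plain LINQ. This is only an illustration, not the implementation used in this post: the class SequentialSketch, its methods, and the word-count task are my own assumptions, chosen because counting words is about the smallest task that exercises Map, grouping and Reduce.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class SequentialSketch
{
    // Step 1 (Map): emit one (word, 1) pair per word in the line
    public static IEnumerable<KeyValuePair<string, int>> Map(string line)
    {
        return line
            .Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries)
            .Select(word => new KeyValuePair<string, int>(word.ToLowerInvariant(), 1));
    }

    // Step 3 (Reduce): add up all the values collected for one key
    public static KeyValuePair<string, int> Reduce(string key, IEnumerable<int> values)
    {
        return new KeyValuePair<string, int>(key, values.Sum());
    }

    // Runs the three steps one after another, without parallelism
    public static List<KeyValuePair<string, int>> Run(IEnumerable<string> source)
    {
        return source
            .SelectMany(line => Map(line))                       // step 1: map every item
            .OrderBy(pair => pair.Key, StringComparer.Ordinal)   // step 2: sort by key...
            .GroupBy(pair => pair.Key, pair => pair.Value)       // ...and group the values
            .Select(grouped => Reduce(grouped.Key, grouped))     // step 3: reduce each group
            .ToList();
    }

    static void Main()
    {
        var lines = new[] { "to be or", "not to be" };
        foreach (var result in Run(lines))
        {
            Console.WriteLine("{0} = {1}", result.Key, result.Value);
        }
        // Prints:
        // be = 2
        // not = 1
        // or = 1
        // to = 2
    }
}
```

Map never needs to know about other lines, and Reduce never needs to know about other keys. That independence is what lets both steps run in parallel.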

Sample

Let me show you a very simple C# implementation of this algorithm. It counts all the vowels (called “vocals” in the code) in an array of strings.

I created a generic MapReduce function that handles the algorithm’s orchestration, and I also implemented the Map and Reduce functions that MapReduce requires as inputs. The implementations of Map and Reduce are specific to the task we want to accomplish; in this sample, that is “count all vowels within a set of strings”.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

namespace MapReduceSample
{
    // Represents one item of the result collection: a vowel and its total count
    class VocalCount
    {
        public char Vocal;
        public int Count;
    }

    class Program
    {
        static void Main(string[] args)
        {
            // "lines" represents the source collection.
            var lines = new[] { 
                "How many vocals do", 
                "these two lines have?" 
            };

            foreach (var line in lines)
            {
                Console.WriteLine(line);
            }
            Console.WriteLine();

            // Invokes MapReduce
            var results = MapReduce(lines, Map, Reduce); 

            // Displays result
            foreach (var result in results)
            {
                Console.WriteLine("{0} = {1}", result.Vocal, result.Count);
            }

            Console.ReadKey();
        }

        /// <summary>
        /// The map function counts the vowels in a string
        /// </summary>
        /// <param name="sourceItem">A line to be processed</param>
        /// <returns>A collection of Key/Value instances,
        /// where the key is the vowel and the value is its count in the line.</returns>
        static IEnumerable<KeyValuePair<char, int>> Map(string sourceItem)
        {
            return sourceItem
                .ToLower()
                .Where(c => "aeiou".Contains(c))
                .GroupBy(
                    c => c,
                    (c, instances) => new KeyValuePair<char, int>(c, instances.Count()));
        }

        /// <summary>
        /// The reduce function computes the total count for each vowel
        /// </summary>
        /// <param name="reduceItem">A Key/Values instance, where the key is the vowel
        /// and the value is an enumeration of all its counts</param>
        /// <returns>A result instance, VocalCount</returns>
        static VocalCount Reduce(KeyValuePair<char, IEnumerable<int>> reduceItem)
        {
            return new VocalCount
            {
                Vocal = reduceItem.Key,
                Count = reduceItem.Value.Sum()  // Computes total count
            };
        }

        /// <summary>
        /// A generic implementation of MapReduce
        /// </summary>
        /// <typeparam name="TSource">Type of the items at the source collection</typeparam>
        /// <typeparam name="TKey">Key's type used by Map and Reduce functions</typeparam>
        /// <typeparam name="TValue">Value's type used by Map and Reduce functions</typeparam>
        /// <typeparam name="TResult">Type of the items at the returned collection</typeparam>
        /// <param name="source">Source collection</param>
        /// <param name="map">Map function to apply</param>
        /// <param name="reduce">Reduce function to apply</param>
        /// <returns>The collection of items produced by the Reduce function</returns>
        static IEnumerable<TResult> MapReduce<TSource, TKey, TValue, TResult>(
            IEnumerable<TSource> source, 
            Func<TSource, IEnumerable<KeyValuePair<TKey, TValue>>> map, 
            Func<KeyValuePair<TKey, IEnumerable<TValue>>, TResult> reduce)
        {
            // Collection where the Map results will be stored
            var mapResults = new ConcurrentBag<KeyValuePair<TKey, TValue>>();

            // Invokes the Map function, in parallel, for each item in source
            Parallel.ForEach(source, sourceItem =>
            {
                foreach (var mapResult in map(sourceItem))
                {
                    mapResults.Add(mapResult);
                }
            });

            // Groups all values by key.
            var reduceSources = mapResults.GroupBy(
                    item => item.Key, 
                    (key, values) => new KeyValuePair<TKey, IEnumerable<TValue>>(key, values.Select(i => i.Value)));

            var resultCollection = new BlockingCollection<TResult>();

            // Kick off a reduce task
            Task.Factory.StartNew(() =>
            {
                try
                {
                    // Invokes the Reduce function, in parallel, for each item in reduceSources
                    Parallel.ForEach(reduceSources,
                                    (reduceItem) => resultCollection.Add(reduce(reduceItem)));
                }
                finally
                {
                    // Always signal completion, even if Reduce throws; otherwise
                    // GetConsumingEnumerable below would block forever
                    resultCollection.CompleteAdding();
                }
            });

            // Use resultCollection.GetConsumingEnumerable() instead of just resultCollection
            // because the former will block waiting for completion and the latter will
            // simply take a snapshot of the current state of the underlying collection.
            return resultCollection.GetConsumingEnumerable();
        }
    }
}
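
For comparison, the same orchestration can probably be written more compactly with PLINQ. Take this as a sketch and not as the implementation of this post: it assumes that PLINQ (AsParallel) is available and that the order of the results does not matter, and the names PlinqSketch and MapReducePlinq are made up. To keep it short, the Map here emits a (vowel, 1) pair for every vowel it finds and the result items are plain KeyValuePair instances instead of VocalCount.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class PlinqSketch
{
    // Hypothetical PLINQ-based variant of the generic MapReduce function
    public static List<TResult> MapReducePlinq<TSource, TKey, TValue, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, IEnumerable<KeyValuePair<TKey, TValue>>> map,
        Func<KeyValuePair<TKey, IEnumerable<TValue>>, TResult> reduce)
    {
        return source
            .AsParallel()                                    // let PLINQ partition the work
            .SelectMany(item => map(item))                   // Map every source item
            .GroupBy(pair => pair.Key, pair => pair.Value)   // group the values by key
            .Select(grouped => reduce(                       // Reduce each group
                new KeyValuePair<TKey, IEnumerable<TValue>>(grouped.Key, grouped)))
            .ToList();                                       // run the query and keep the results
    }

    static void Main()
    {
        var lines = new[] { "How many vocals do", "these two lines have?" };

        var results = MapReducePlinq<string, char, int, KeyValuePair<char, int>>(
            lines,
            // Map: one (vowel, 1) pair per vowel found in the line
            line => line.ToLowerInvariant()
                        .Where(c => "aeiou".IndexOf(c) >= 0)
                        .Select(c => new KeyValuePair<char, int>(c, 1)),
            // Reduce: total count per vowel
            item => new KeyValuePair<char, int>(item.Key, item.Value.Sum()));

        // The result order is not guaranteed, so sort before printing
        foreach (var result in results.OrderBy(r => r.Key))
        {
            Console.WriteLine("{0} = {1}", result.Key, result.Value);
        }
        // Prints:
        // a = 3
        // e = 4
        // i = 1
        // o = 4
    }
}
```

The trade-off, as far as I can tell, is that this variant only returns once every result is ready, while the BlockingCollection version lets the caller start consuming results while Reduce is still running. In exchange, the returned list can be enumerated more than once.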

About Silvio Massari
I am a Node.js, .Net and C developer, and technology enthusiast. I really like spending time coding an idea.

3 Responses to Understanding MapReduce

  1. machadogj says:

    Finally!! Congrats on your first post, which is very cool by the way.
    Gus

  2. leo W says:

    it’s like a too complicated way of counting vocals, huh?
    for which kind of things do you use this approach in the real life? do you have any examples?
