Hermes: an open source implementation of a Publish/Subscribe engine

I’m glad to announce Hermes, a new open source project developed by TellagoDevLabs. It implements the Publish/Subscribe pattern, exposed through a RESTful API, and uses MongoDB as its storage backend.

Hermes allows us to create Topics; Publishers and Subscribers can then publish and retrieve messages for a specific Topic.

Hermes is message agnostic, which means it doesn’t care what a message contains or what its data type is.

Its RESTful interface allows us to interact with Hermes using plain HTTP requests. The example below shows how to publish the string “Hello Hermes!” to a Topic (in this example the Topic’s ID is ‘4e1aeec5e892e70b044c695c’).

To publish the message we should:

  1. Create an HttpWebRequest.
  2. Set the request’s method to POST.
  3. Set the Content-Type header; in this case it will be ‘text/text’.
  4. Set the request’s body to the message; in this case it will be ‘Hello Hermes!’, but you can use any stream as the message’s body.
  5. Optionally, add custom HTTP headers; in this sample I added a header named ‘Foo’.

// Create the HTTP POST request
var request = (HttpWebRequest)WebRequest.Create("http://localhost:6156/messages/topic/4e1aeec5e892e70b044c695c");
request.Method = "POST";

// Set the Content-Type header and the custom header.
// Headers are set before the body is written so they are sent with the request.
request.ContentType = "text/text";
request.Headers.Add("foo", "some data");

// Write the message into the request's body
using (var stream = request.GetRequestStream())
using (var writer = new StreamWriter(stream))
    writer.Write("Hello Hermes!");

// Publish the message
var response = (HttpWebResponse)request.GetResponse();

// Display the HTTP status code and the message's URL
Console.WriteLine("HttpStatusCode: {0}", response.StatusCode);
Console.WriteLine("Header Location: {0}", response.Headers["Location"]);


When Hermes processes the request, it stores the request’s body and all of the request’s headers, and then returns an HTTP response with status code 201 Created and a Location header containing the URL of the message just published.

We can retrieve the message by sending an HTTP GET request to the URL received in the Location header:

// Create an HTTP GET request to retrieve the message
request = (HttpWebRequest)WebRequest.Create(response.Headers["Location"]);
request.Method = "GET";

// Send the request
response = (HttpWebResponse)request.GetResponse();

Console.WriteLine("Retrieving message");
Console.WriteLine("Foo header: {0}", response.Headers["Foo"]);
Console.WriteLine("Message body: {0}", new StreamReader(response.GetResponseStream()).ReadToEnd());

Alternatively, we can retrieve all the messages for a specific topic by sending an HTTP GET request to the Topic’s URL. The response will contain an array of links, one for each of the topic’s messages:

// Create the HTTP GET request
request = (HttpWebRequest)WebRequest.Create("http://localhost:6156/messages/topic/4e1aeec5e892e70b044c695c");
request.Method = "GET";

// Send the request
response = (HttpWebResponse)request.GetResponse();

Console.WriteLine("Polling topic's messages");

// Read the response body and parse it as XML
var contentAsString = new StreamReader(response.GetResponseStream()).ReadToEnd();
var content = XDocument.Parse(contentAsString);

I ran the sample above against a Topic that contains two messages; below is its output:


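The polling sample stops once the response has been parsed into an XDocument. As a rough sketch of how the links might then be pulled out, the snippet below collects every `href` attribute it finds. The exact XML that Hermes returns is not shown in this post, so the element names, the `href` attribute, the sample URLs, and the `ExtractLinks` helper are all assumptions made for illustration; check them against a real response before relying on this.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Xml.Linq;

static class TopicLinksSketch
{
    // Hypothetical helper: returns the value of every "href" attribute
    // in the document, in document order.
    // ASSUMPTION: links are exposed as href attributes; the real Hermes
    // payload may use a different shape.
    public static List<string> ExtractLinks(XDocument content)
    {
        return content
            .Descendants()
            .Attributes("href")
            .Select(attribute => attribute.Value)
            .ToList();
    }

    static void Main()
    {
        // Made-up payload standing in for contentAsString;
        // this is not actual Hermes output.
        const string contentAsString =
            "<messages>" +
            "<link href=\"http://example.org/messages/1\" />" +
            "<link href=\"http://example.org/messages/2\" />" +
            "</messages>";

        var content = XDocument.Parse(contentAsString);

        foreach (var url in ExtractLinks(content))
            Console.WriteLine(url);
    }
}
```

Each extracted URL could then be fetched with the same GET request shown earlier for a single message.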
In future posts I will write about the Client API that we created in order to simplify consuming Hermes.

You can find the complete source code here and its documentation here. You can also find more information in these blogs:


Understanding MapReduce

The first time I read about MapReduce was in a paper from Google titled “MapReduce: Simplified Data Processing on Large Clusters”.

The idea captivated me from the beginning, but I didn’t have an opportunity to use it until now, when a project led me to MongoDB and I found that MapReduce is part of it.

So, in this first post I’m going to talk about how the MapReduce algorithm works, with a very simple C# implementation as a sample. In a future post I will show how we can use it in MongoDB.

What is MapReduce?

It is an algorithm that can process a very large amount of data in parallel. It takes three inputs: a source collection, a Map function, and a Reduce function. It returns a new data collection.

Collection MapReduce(Collection source, Function map, Function reduce)

The algorithm is composed of a few steps. The first one consists of executing the Map function on each item in the source collection. Map returns zero or many instances of Key/Value objects.

 ArrayOfKeyValue Map(object itemFromSourceCollection)

So, we can say that Map’s responsibility is to convert an item from the source collection into zero or many instances of Key/Value objects. We can see this in the picture below.

In the next step, the algorithm sorts all the Key/Value instances and creates new object instances in which all values are grouped by Key.

The last step executes the Reduce function for each grouped Key/Value instance.

ItemResult Reduce(KeyWithArrayOfValues item)

The Reduce function returns a new item instance that is included in the result collection.
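The three steps just described can be condensed into a single sequential LINQ pipeline. This is only a minimal sketch to make the data flow visible: the `SequentialMapReduceSketch` class, the `Run` method, and the word-count task are names and choices made up for this illustration, and nothing here runs in parallel.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class SequentialMapReduceSketch
{
    // Sequential illustration of the three steps:
    // map every item, group the emitted pairs by key, reduce each group.
    public static IEnumerable<TResult> Run<TSource, TKey, TValue, TResult>(
        IEnumerable<TSource> source,
        Func<TSource, IEnumerable<KeyValuePair<TKey, TValue>>> map,
        Func<TKey, IEnumerable<TValue>, TResult> reduce)
    {
        return source
            .SelectMany(map)                                // step 1: Map
            .GroupBy(pair => pair.Key, pair => pair.Value)  // step 2: group values by key
            .Select(group => reduce(group.Key, group));     // step 3: Reduce
    }

    static void Main()
    {
        var sentences = new[] { "to be or not", "to be" };

        // Map: emit (word, 1) for every word in a sentence.
        Func<string, IEnumerable<KeyValuePair<string, int>>> map =
            sentence => sentence.Split(' ')
                                .Select(word => new KeyValuePair<string, int>(word, 1));

        // Reduce: add up the ones emitted for a word.
        Func<string, IEnumerable<int>, string> reduce =
            (word, ones) => word + " = " + ones.Sum();

        foreach (var line in Run(sentences, map, reduce))
            Console.WriteLine(line);
    }
}
```

Run on the two sample sentences, this prints `to = 2`, `be = 2`, `or = 1` and `not = 1`, one per line. Because each Map call and each Reduce call depends only on its own input, both steps are natural candidates for parallel execution.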


Let me show you a very simple C# implementation of this algorithm; it counts all the vowels in an array of strings.

I created a generic MapReduce function that represents the algorithm’s orchestration, and I also implemented the Map and Reduce functions that are required as inputs to MapReduce. The implementations of the Map and Reduce functions are specific to the task we want to accomplish; in this sample it is “count all the vowels within a set of strings”.

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

namespace MapReduceSample
{
    // This class represents an item of the result collection
    class VocalCount
    {
        public char Vocal;
        public int Count;
    }

    class Program
    {
        static void Main(string[] args)
        {
            // "lines" represents the source collection.
            var lines = new[] {
                "How many vocals do",
                "these two lines have?"
            };

            // Echoes the source lines
            foreach (var line in lines)
                Console.WriteLine(line);

            // Invokes MapReduce
            var results = MapReduce(lines, Map, Reduce);

            // Displays the result
            foreach (var result in results)
                Console.WriteLine("{0} = {1}", result.Vocal, result.Count);
        }

        /// <summary>
        /// The map function counts the vowels in a string
        /// </summary>
        /// <param name="sourceItem">A line to be processed</param>
        /// <returns>A collection of Key/Value instances,
        /// where the key is the vowel and the value is its count.</returns>
        static IEnumerable<KeyValuePair<char, int>> Map(string sourceItem)
        {
            return sourceItem
                .Where(c => "aeiou".Contains(c))
                .GroupBy(c => c, (c, instances) => new KeyValuePair<char, int>(c, instances.Count()));
        }

        /// <summary>
        /// The reduce function computes the total count for each vowel
        /// </summary>
        /// <param name="reduceItem">A Key/Values instance, where the key is the vowel
        /// and the value is an enumeration of all its counts</param>
        /// <returns>A result instance, VocalCount</returns>
        static VocalCount Reduce(KeyValuePair<char, IEnumerable<int>> reduceItem)
        {
            return new VocalCount
            {
                Vocal = reduceItem.Key,
                Count = reduceItem.Value.Sum()  // Computes total count
            };
        }

        /// <summary>
        /// A generic implementation of MapReduce
        /// </summary>
        /// <typeparam name="TSource">Type of the items in the source collection</typeparam>
        /// <typeparam name="TKey">Key's type used by the Map and Reduce functions</typeparam>
        /// <typeparam name="TValue">Value's type used by the Map and Reduce functions</typeparam>
        /// <typeparam name="TResult">Type of the items in the returned collection</typeparam>
        /// <param name="source">Source collection</param>
        /// <param name="map">Map function to apply</param>
        /// <param name="reduce">Reduce function to apply</param>
        /// <returns>The items produced by the reduce function</returns>
        static IEnumerable<TResult> MapReduce<TSource, TKey, TValue, TResult>(
            IEnumerable<TSource> source,
            Func<TSource, IEnumerable<KeyValuePair<TKey, TValue>>> map,
            Func<KeyValuePair<TKey, IEnumerable<TValue>>, TResult> reduce)
        {
            // Collection where map's results will be stored
            var mapResults = new ConcurrentBag<KeyValuePair<TKey, TValue>>();

            // Invokes, in parallel, the Map function for each item in source
            Parallel.ForEach(source, sourceItem =>
            {
                foreach (var mapResult in map(sourceItem))
                    mapResults.Add(mapResult);
            });

            // Groups all values by key.
            var reduceSources = mapResults.GroupBy(
                    item => item.Key,
                    (key, values) => new KeyValuePair<TKey, IEnumerable<TValue>>(key, values.Select(i => i.Value)));

            var resultCollection = new BlockingCollection<TResult>();

            // Kick off a reduce task
            Task.Factory.StartNew(() =>
            {
                try
                {
                    // Invokes, in parallel, the Reduce function for each item in reduceSources
                    Parallel.ForEach(reduceSources,
                        reduceItem => resultCollection.Add(reduce(reduceItem)));
                }
                finally
                {
                    // Need to do this to keep GetConsumingEnumerable below from hanging,
                    // even if a Reduce call throws
                    resultCollection.CompleteAdding();
                }
            });

            // Use resultCollection.GetConsumingEnumerable() instead of just resultCollection
            // because the former will block waiting for completion and the latter will
            // simply take a snapshot of the current state of the underlying collection.
            return resultCollection.GetConsumingEnumerable();
        }
    }
}