Transactional MSMQ

A couple of weeks ago I discovered that Microsoft ships excellent built-in support for solving the producer-consumer problem.

As stated on Wikipedia, the producer-consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue (link).

To avoid re-implementing the queue over and over, one can use MSMQ (Microsoft Message Queuing), which ships with Windows operating systems as an optional feature.

Microsoft Message Queuing or MSMQ is a Message Queue implementation developed by Microsoft and deployed in its Windows Server operating systems since Windows NT 4 and Windows 95. The latest Windows 8 also includes this component. In addition to its mainstream server platform support, MSMQ has been incorporated into Microsoft Embedded platforms since 1999 and the release of Windows CE 3.0 (link).

Among the immediate benefits of MSMQ, I noticed the following:

  • Transactional behavior
  • Persistence
  • Support for security settings on Queues

The following example was built on Windows 7 using Visual Studio 2010 and .NET 4.0.

1. Install MSMQ under Windows 7

  • Go to: Control Panel->Programs->Programs and Features->Turn Windows features on or off
  • Select: Microsoft Message Queue (MSMQ) Server
  • Press OK

[Screenshot: Windows Features dialog]

2. Verify that the MSMQ component has been installed

  • Go to: Computer Management->Services and Applications

[Screenshot: Computer Management]

You should now see a Message Queuing node there, which confirms that the MSMQ component has been installed.

3. Creating queues

In our example we will use a private transactional queue.

One has two options:

a. Create the queue manually

  • Go to: Computer Management -> Services and Applications-> Message Queuing
  • Right Click on Private Queues -> New -> Private Queue
  • Add queue name, Select transactional, Click OK

b. Create the queue programmatically

I will go for creating the queue programmatically:

Using Visual Studio 2010, create a solution called MsmqExample that contains a Console Application, also called MsmqExample:

[Screenshot: MsmqExample solution]

Also make sure you add a reference to the System.Messaging assembly (this is where the MSMQ API resides).

Creating a private transactional queue is straightforward:

using System.Messaging;

namespace MsmqExample
{
    class Program
    {
        static void Main(string[] args)
        {
            const string queuePath = @".\private$\commonqueue";

            if (!MessageQueue.Exists(queuePath))
            {
                MessageQueue.Create(queuePath, true);
            }
        }
    }
}

After executing the above, you should be able to see that a new private transactional queue has been created on your computer (check Computer Management).
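If you prefer to check from code rather than from Computer Management, a minimal sketch could look like the following. The class and method names (QueueCheckSketch, IsTransactionalQueue) are hypothetical and not part of the example project; the sketch only assumes the queue path used above.

```csharp
using System;
using System.Messaging;

namespace MsmqExample
{
    // Hypothetical helper, not part of the original example project.
    internal static class QueueCheckSketch
    {
        // Returns true only if the queue exists and was created as transactional.
        public static bool IsTransactionalQueue(string queuePath)
        {
            if (!MessageQueue.Exists(queuePath))
            {
                return false;
            }

            using (var queue = new MessageQueue(queuePath))
            {
                return queue.Transactional;
            }
        }
    }
}
```

Calling QueueCheckSketch.IsTransactionalQueue(@".\private$\commonqueue") after the code above has run should return true.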

4. Creating a message object

In order to add data to a queue and retrieve data from the queue we should create a custom type that will embed data.

Instances of the custom type are serialized when they are added to the queue and deserialized when they are read from it. By default, System.Messaging formats message bodies with XmlMessageFormatter, which relies on XmlSerializer. XmlSerializer serializes the public read/write properties and public fields of an object, so we will define the following custom type:

namespace MsmqExample
{
    public struct CustomMessage
    {
        public int Id { get; set; }

        public string Description { get; set; }
    }
}
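To get a feeling for what ends up in the message body, here is a small sketch that runs XmlSerializer directly on the type, without involving MSMQ at all. The SerializationSketch class and its ToXml/FromXml methods are assumptions made for illustration; CustomMessage is repeated only so that the sketch compiles on its own.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

namespace MsmqExample
{
    // Repeated from above so that the sketch is self-contained.
    public struct CustomMessage
    {
        public int Id { get; set; }

        public string Description { get; set; }
    }

    // Hypothetical helper, not part of the original example project.
    public static class SerializationSketch
    {
        // Serializes a message to XML text using XmlSerializer.
        public static string ToXml(CustomMessage message)
        {
            var serializer = new XmlSerializer(typeof(CustomMessage));
            using (var writer = new StringWriter())
            {
                serializer.Serialize(writer, message);
                return writer.ToString();
            }
        }

        // Rebuilds a message from the XML text produced by ToXml.
        public static CustomMessage FromXml(string xml)
        {
            var serializer = new XmlSerializer(typeof(CustomMessage));
            using (var reader = new StringReader(xml))
            {
                return (CustomMessage) serializer.Deserialize(reader);
            }
        }

        public static void Main()
        {
            var xml = ToXml(new CustomMessage { Id = 123, Description = "Test Message" });
            Console.WriteLine(xml);

            var copy = FromXml(xml);
            Console.WriteLine("Id : {0} Description : {1}", copy.Id, copy.Description);
        }
    }
}
```

The printed XML contains one element per public property (Id and Description), which is roughly what you will later see in the Body of the queued message.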

5. Adding a message object to the queue

First we will need to create an instance of System.Messaging.MessageQueue that will refer to a certain path (the queue path).

Next, to send a new object, we wrap it inside a System.Messaging.Message and send that message to the System.Messaging.MessageQueue within a System.Messaging.MessageQueueTransaction.

using System;
using System.Messaging;

namespace MsmqExample
{
    internal class Sender<T> : IDisposable
    {
        private MessageQueue m_queue;

        public Sender(string queuePath)
        {
            m_queue = new MessageQueue(queuePath);
        }

        public void Dispose()
        {
            if (m_queue != null)
            {
                m_queue.Dispose();
                m_queue = null;
            }
        }

        public void Send(T message)
        {
            using (var queueMessage = new Message(message))
            using (var transaction = new MessageQueueTransaction())
            {
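                try
                {
                    transaction.Begin();
                    m_queue.Send(queueMessage, transaction);
                    transaction.Commit();
                }
                catch (Exception)
                {
                    // Roll back only if the transaction actually started, then
                    // rethrow so the caller learns that the send failed.
                    if (transaction.Status == MessageQueueTransactionStatus.Pending)
                    {
                        transaction.Abort();
                    }
                    throw;
                }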
            }
        }
    }
}
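As an aside, when only one message is sent per transaction, System.Messaging also lets you skip the explicit MessageQueueTransaction by passing MessageQueueTransactionType.Single. The sketch below shows that variant; the SingleSendSketch class and SendSingle method are hypothetical names, and the sketch assumes the target queue is transactional.

```csharp
using System.Messaging;

namespace MsmqExample
{
    // Hypothetical helper, not part of the original example project.
    internal static class SingleSendSketch
    {
        // Sends one object to a transactional queue. MSMQ wraps the send
        // in its own single-message internal transaction.
        public static void SendSingle<T>(string queuePath, T body)
        {
            using (var queue = new MessageQueue(queuePath))
            {
                queue.Send(body, MessageQueueTransactionType.Single);
            }
        }
    }
}
```

The explicit transaction used by Sender<T> remains the better fit once several sends have to succeed or fail together.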

The Sender<T> class can be used to add a message to a queue:

using System.Messaging;

namespace MsmqExample
{
    class Program
    {
        static void Main(string[] args)
        {
            const string queuePath = @".\private$\commonqueue";

            if (!MessageQueue.Exists(queuePath))
            {
                MessageQueue.Create(queuePath, true);
            }

            var message = new CustomMessage
            {
                Id = 123,
                Description = "Test Message",
            };

            // Sender<T> is IDisposable, so release the queue handle when done.
            using (var sender = new Sender<CustomMessage>(queuePath))
            {
                sender.Send(message);
            }
        }
    }
}

After executing the previous code, the “.\private$\commonqueue” queue should be populated with our custom message. To verify this, go to Computer Management->Services and Applications->Message Queuing->Private Queues->commonqueue->Queue messages. If you double-click the message and go to the Body tab, you should see that it contains the serialized form of the CustomMessage instance:

[Screenshot: body of the queued message]
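The same check can be sketched in code. The helper below takes a snapshot of the queue with GetAllMessages, which does not remove anything, and prints each raw body as text. The QueueInspectionSketch class and DumpQueue method are hypothetical names introduced for illustration.

```csharp
using System;
using System.IO;
using System.Messaging;

namespace MsmqExample
{
    // Hypothetical helper, not part of the original example project.
    internal static class QueueInspectionSketch
    {
        // Prints the raw body of every message currently in the queue
        // and returns how many messages were found. Nothing is removed.
        public static int DumpQueue(string queuePath)
        {
            using (var queue = new MessageQueue(queuePath))
            {
                Message[] messages = queue.GetAllMessages();
                foreach (var message in messages)
                {
                    using (message)
                    using (var reader = new StreamReader(message.BodyStream))
                    {
                        Console.WriteLine(reader.ReadToEnd());
                    }
                }

                return messages.Length;
            }
        }
    }
}
```

Reading BodyStream instead of Body sidesteps the formatter, so the output is the XML text exactly as it was stored.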

6. Reading messages from the queue

To receive a message from the queue, we will create a Listener class that registers itself as a listener on a MessageQueue. The Listener class accepts a delegate (Func<T, bool> messageHandler) as a parameter of its Start method. Whenever a message is detected in the queue, the delegate is invoked; its return value decides whether the receive transaction is committed (true) or aborted (false).

Note: the PeekCompleted handler is invoked on a background thread, so the Listener class never blocks the thread that called Start.

The Listener class:

using System;
using System.Messaging;

namespace MsmqExample
{
    internal class Listener<T> : IDisposable
    {
        private MessageQueue m_queue;
        private bool m_listen;
        private Func<T, bool> m_messageHandler;

        public Listener(string queuePath)
        {
            m_queue = new MessageQueue(queuePath);
            m_queue.Formatter = new XmlMessageFormatter(new [] { typeof(T) });
        }

        public void Dispose()
        {
            // Stop re-arming the peek before releasing the queue.
            m_listen = false;

            if (m_queue != null)
            {
                m_queue.PeekCompleted -= OnPeekCompleted;
                m_queue.Dispose();
                m_queue = null;
            }
        }

        public void Start(Func<T, bool> messageHandler)
        {
            m_listen = true;
            m_queue.PeekCompleted += OnPeekCompleted;
            m_messageHandler = messageHandler;

            StartListening();
        }

        private void StartListening()
        {
            if (!m_listen)
            {
                return;
            }

            m_queue.BeginPeek();
        }

        private void OnPeekCompleted(object sender, PeekCompletedEventArgs e)
        {
            m_queue.EndPeek(e.AsyncResult);
            var messageId = string.Empty;

            using (var transaction = new MessageQueueTransaction())
            {
                try
                {
                    transaction.Begin();
                    using (var message = m_queue.Receive(transaction))
                    {
                        if (m_listen && message != null)
                        {
                            messageId = message.Id;
                            var concreteMessage = (T) message.Body;
                            if (ProcessMessage(concreteMessage))
                            {
                                transaction.Commit();
                            }
                            else
                            {
                                transaction.Abort();
                            }
                        }
                    }
                }
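                catch (Exception exception)
                {
                    Console.WriteLine("Exception for messageId : {0}. Exception : {1}", messageId, exception);

                    // Abort only while the transaction is still pending; calling
                    // Abort on a transaction that never started would throw again.
                    if (transaction.Status == MessageQueueTransactionStatus.Pending)
                    {
                        transaction.Abort();
                    }
                }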
            }

            StartListening();
        }

        private bool ProcessMessage(T body)
        {
            if (m_messageHandler != null)
            {
                return m_messageHandler(body);
            }

            return false;
        }
    }
}
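One thing to keep in mind: Receive(transaction) blocks until a message is available. If several consumers share the queue, another one may take the message between the peek and the receive. A possible hedge is to receive with a timeout, as in the sketch below. The ReceiveSketch class and TryReceive method are hypothetical names; the timeout value is up to the caller.

```csharp
using System;
using System.Messaging;

namespace MsmqExample
{
    // Hypothetical helper, not part of the original example project.
    internal static class ReceiveSketch
    {
        // Receives the next message inside the given transaction, or returns
        // null if no message arrives within the timeout.
        public static Message TryReceive(
            MessageQueue queue, MessageQueueTransaction transaction, TimeSpan timeout)
        {
            try
            {
                return queue.Receive(timeout, transaction);
            }
            catch (MessageQueueException exception)
            {
                // A timeout is expected when the queue is empty; anything
                // else is a real error and is passed on to the caller.
                if (exception.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    return null;
                }
                throw;
            }
        }
    }
}
```

Inside OnPeekCompleted, this helper could stand in for the m_queue.Receive(transaction) call; the existing null check on the message then covers the timeout case.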

To listen to a queue, instantiate the Listener class and invoke its Start method with a delegate that will receive each message. Note that the delegate executes on a background thread, so in order to see the results we add a Sleep at the end of the Main method. Without the Sleep, the program would terminate right away, because background threads do not keep a process alive:

using System;
using System.Messaging;
using System.Threading;

namespace MsmqExample
{
    class Program
    {
        static void Main(string[] args)
        {
            const string queuePath = @".\private$\commonqueue";

            if (!MessageQueue.Exists(queuePath))
            {
                MessageQueue.Create(queuePath, true);
            }

            var message = new CustomMessage
            {
                Id = 123,
                Description = "Test Message",
            };

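            using (var sender = new Sender<CustomMessage>(queuePath))
            {
                sender.Send(message);
            }

            using (var listener = new Listener<CustomMessage>(queuePath))
            {
                listener.Start(
                    m =>
                    {
                        Console.WriteLine("New message Id : {0} Description : {1}", m.Id, m.Description);
                        return true;
                    }
                );

                // Give the background thread time to process the message
                // before the listener is disposed and the program exits.
                Thread.Sleep(2000);
            }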
        }
    }
}
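A fixed Sleep is the simplest option, but it either waits too long or not long enough. One alternative is to let the handler signal a ManualResetEvent and have Main wait on it. The sketch below shows only the signalling pattern: a thread-pool work item stands in for the listener callback, and the WaitSketch class and WaitForHandler method are hypothetical names.

```csharp
using System;
using System.Threading;

namespace MsmqExample
{
    // Hypothetical helper, not part of the original example project.
    internal static class WaitSketch
    {
        // Runs a stand-in handler on a background thread and waits until it
        // signals completion. Returns false if the timeout elapses first.
        public static bool WaitForHandler(TimeSpan timeout)
        {
            using (var handled = new ManualResetEvent(false))
            {
                // Stand-in for listener.Start(...): the real handler would
                // call handled.Set() just before returning true.
                ThreadPool.QueueUserWorkItem(state =>
                {
                    Console.WriteLine("Message handled on a background thread");
                    handled.Set();
                });

                // Choose a timeout comfortably longer than the handler needs,
                // since the event is disposed once this method returns.
                return handled.WaitOne(timeout);
            }
        }

        public static void Main()
        {
            Console.WriteLine("Handled in time : {0}", WaitForHandler(TimeSpan.FromSeconds(5)));
        }
    }
}
```

In the example above, the lambda passed to listener.Start would call Set on the event, and Thread.Sleep(2000) would become a WaitOne call with a timeout.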