Playing with Mass Transit - Publish/Subscribe
Discussions at ALT.NET and with Greg Young made me realize that I need to get more into messaging, so I thought I’d start with a piece of work I’m doing now that is ripe for a bit of messaging. The piece of work relates to domain events: when an event happens in the domain (e.g. Customer becomes active) we generate a message which subscribers can pick up. Generating an MSMQ message and sending it to subscribers seems sensible.
The obvious framework to look at was NServiceBus, but Mass Transit is an alternative which I decided to try. Whilst playing with it I thought I'd write up my discoveries in the hope that someone might find them useful, and I also produced a little sample project which you can download here.
First I should add some caveats:
- Playing Around - The code is just me playing with Mass Transit, I am certainly no expert with it and there will be better ways of doing what I'm doing.
- Quality - I specifically didn't refactor/redesign code or add things like transactions because this was a learning exercise.
- Code Will Change - Any references to specific areas of code within this document could go out of date if they refer to classes in the Mass Transit codebase.
- Existing Examples - There is an existing publish/subscribe example with Mass Transit (MassTransit\Samples\PublishSubscribe) but I thought it was worth working through my own solution.
Note that this sample contains a copy of some of the Mass Transit code. I've done this to make it easier to set up and debug, but the code will quickly go out of date so it is certainly worth downloading the latest copy.
Running The Sample
If you want to run the different parts of the example then you can do so using the binaries in the TestAppBinaries folder. The parts of the solution are:
- SubscriptionManager - Stores subscriptions in memory and will provide information about those subscriptions to interested parties. This needs to be started before the other parts of the solution.
- Receiver - Registers its interest in football result messages and then prints them to the console. You can run multiple instances of the receiver as it allows you to enter a number that is prefixed to the queue name.
- Sender - Publishes football result messages out to interested subscribers. It knows what the interested subscribers are because it keeps track of the latest subscriptions that have been registered with SubscriptionManager.
Debugging wise I found I could only really follow one app at a time, so I might start SubscriptionManager and Receiver, then debug Sender and check that it correctly finds out about and processes the fact that Receiver wants to know about FootballResultMessage instances.
Currently you must start the Receiver before the Sender or the behaviour will not be as expected. I will look into the reasons for this.
Message Delivery Options
The world revolves around the ServiceBus class which has several methods that you can use to send messages:
- Publish – You just specify the message to publish, the ServiceBus will decide where to send the messages based on its own internal subscription cache.
- Send – You specify where to send the message (IEndPoint) and the messages to send.
- Deliver – You pass in an IEnvelope which specifies the message and where to deliver it to.
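To make the difference between the three options concrete, here is a minimal toy model of them. It is only a sketch under my own assumptions: ToyBus, ToyEnvelope, AddSubscription and Outbox are hypothetical names invented for illustration, not the Mass Transit types, and the "queue" is just an in-memory list.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for an envelope: a message plus where it should go.
public sealed class ToyEnvelope
{
    public ToyEnvelope(string destination, object message)
    {
        Destination = destination;
        Message = message;
    }

    public string Destination { get; }
    public object Message { get; }
}

// Hypothetical toy bus; it records outgoing envelopes instead of writing to MSMQ.
public sealed class ToyBus
{
    // Message name -> queue addresses that asked for that kind of message.
    private readonly Dictionary<string, List<string>> _subscriptions =
        new Dictionary<string, List<string>>();

    public List<ToyEnvelope> Outbox { get; } = new List<ToyEnvelope>();

    public void AddSubscription(string messageName, string address)
    {
        if (!_subscriptions.TryGetValue(messageName, out List<string> addresses))
        {
            addresses = new List<string>();
            _subscriptions[messageName] = addresses;
        }
        if (!addresses.Contains(address)) addresses.Add(address);
    }

    // Deliver: the envelope already carries both the message and the destination.
    public void Deliver(ToyEnvelope envelope)
    {
        Outbox.Add(envelope);
    }

    // Send: the caller names the destination explicitly.
    public void Send(string destination, object message)
    {
        Deliver(new ToyEnvelope(destination, message));
    }

    // Publish: the caller names no destination; the bus consults its own cache.
    public void Publish(object message)
    {
        string messageName = message.GetType().Name;
        if (!_subscriptions.TryGetValue(messageName, out List<string> addresses)) return;
        foreach (string address in addresses) Send(address, message);
    }
}
```

In this model a Publish with no matching subscription simply goes nowhere, whereas Send and Deliver always produce exactly one outgoing envelope because the caller has already chosen the destination.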
Send/Deliver are fine and I do use them, but they do not promote truly loose coupling. In many cases you won't want to specify the destination when sending the message, so I’m really more interested in Publish. You can see this if you look at the Sender class (well, it's a static Main method, but hey, this is just sample code) as it contains the following line:
bus.Publish(new FootballResultMessage(message));
To understand the way that the Publish approach works you need to look at how the ServiceBus manages subscriptions...
Subscriptions
For publish/subscribe to work you need to decouple the publisher from the subscribers and unsurprisingly you do this using messaging. The Mass Transit documentation describes one method of managing subscriptions where you register them with a single queue. There are multiple parts to this:
- Adding/Removing Subscriptions - You can manage subscriptions dynamically, for example a subscriber can send an AddSubscription message to register interest in a particular type of message.
- Managing Subscriptions - I'm centralizing the subscriptions for this example.
- Requesting Subscriptions - If the subscriptions are managed centrally then publishers need to be able to ask for the list of endpoints that handle particular types of messages.
I'll explain one potential way of handling these three parts using Mass Transit.
Adding/Removing Subscriptions (Receiver)
A subscriber sends an AddSubscription message to the ServiceBus when it wants to subscribe to a particular kind of message. The AddSubscription message takes two parameters, the message name and the URI (which in our case means MSMQ queue) to send messages of that type to.
To see how this works look at SubscriptionBasedMessageProcessor. In the Subscribe method it registers its interest in the message type that it is set up with. This registration involves the AddSubscription message being sent to the SubscriptionManager. In addition we specify a delegate (callback) that will be run when a message of the specified type arrives.
The code that sends the AddSubscription message is in MsmqUtil:
private static void SendSubscription(ServiceBus bus)
{
    AddSubscription subscriptionMessage = new AddSubscription(MessageName, bus.Endpoint.Uri);
    SendSubscriptionUpdate(bus, subscriptionMessage);
}

private static void SendSubscriptionUpdate(ServiceBus bus, SubscriptionChange subscriptionMessage)
{
    MsmqEndpoint publishersQueue = "msmq://./subscriptions";
    bus.Send(publishersQueue, subscriptionMessage);
}
You can see that in this case I'm sending an AddSubscription message to the central subscription management queue saying that football result messages should be sent to the ServiceBus that is passed in (which is the same queue that the BasicMessageReceiver is listening on).
Managing Subscriptions (SubscriptionManager)
My centralized store needs to maintain the list of subscriptions and also provide a way for interested parties to find out about them:
- Subscription Cache - A subscription cache implements ISubscriptionCache and maintains the list of subscriptions that have been registered. LocalSubscriptionCache keeps the list in memory and NHibernateSubscriptionStorage stores it in the DB (see the SQL script provided with Mass Transit that sets up the table).
- Subscription Service - The SubscriptionService class provides the functionality needed to consume messages related to subscriptions, my centralized subscription service thus uses an instance of this class.
Since all I needed was in-memory support I used LocalSubscriptionCache. Note that I had to make LocalSubscriptionCache implement ISubscriptionRepository so that I could use it with SubscriptionService, however this was a trivial change. The code that registers the cache is:
LocalSubscriptionCache cache = new LocalSubscriptionCache();
ServiceBus bus = new ServiceBus(subscriptionQueue, cache);
SubscriptionService subscriptionService = new SubscriptionService(bus, cache, cache);
subscriptionService.Start();
To see this code at work put a breakpoint in MsmqMessageReceiver.ProcessMessage, start SubscriptionManager in the debugger and then open Sender, which causes a cache update request message to come in for processing. You should end up debugging into SubscriptionService.HandleCacheUpdateRequest, which ensures the appropriate response is sent back to the caller.
One interesting thing to note is that when Sender sends off the CacheUpdateRequest it puts the return address as its own ServiceBus' end point (the MSMQ queue it is working from). The reply message is picked off this queue by MsmqMessageReceiver and is then routed to the SubscriptionClient which has registered its interest in the reply.
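The return address idea can be modelled in a few lines. This is only a sketch under my own assumptions: ToyCacheUpdateRequest, ToyCacheUpdateResponse, ToyQueues and ToySubscriptionService are hypothetical names made up for illustration (the real Mass Transit messages and classes may look different), and in-memory queues stand in for MSMQ.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical request: it carries the address the reply should be sent to.
public sealed class ToyCacheUpdateRequest
{
    public ToyCacheUpdateRequest(string returnAddress) { ReturnAddress = returnAddress; }
    public string ReturnAddress { get; }
}

// Hypothetical reply: a snapshot of the known subscriptions.
public sealed class ToyCacheUpdateResponse
{
    public ToyCacheUpdateResponse(IReadOnlyList<string> subscriptions) { Subscriptions = subscriptions; }
    public IReadOnlyList<string> Subscriptions { get; }
}

// In-memory queues keyed by address, standing in for MSMQ.
public sealed class ToyQueues
{
    private readonly Dictionary<string, Queue<object>> _queues =
        new Dictionary<string, Queue<object>>();

    public void Send(string address, object message)
    {
        if (!_queues.TryGetValue(address, out Queue<object> queue))
        {
            queue = new Queue<object>();
            _queues[address] = queue;
        }
        queue.Enqueue(message);
    }

    // Returns null when the queue does not exist or is empty.
    public object Receive(string address)
    {
        return _queues.TryGetValue(address, out Queue<object> queue) && queue.Count > 0
            ? queue.Dequeue()
            : null;
    }
}

// Hypothetical central service: it never needs to know the requester in advance.
public sealed class ToySubscriptionService
{
    private readonly ToyQueues _queues;
    private readonly List<string> _subscriptions = new List<string>();

    public ToySubscriptionService(ToyQueues queues) { _queues = queues; }

    public void Add(string subscription) { _subscriptions.Add(subscription); }

    // The reply goes to whatever address the requester put on the request.
    public void HandleCacheUpdateRequest(ToyCacheUpdateRequest request)
    {
        _queues.Send(request.ReturnAddress, new ToyCacheUpdateResponse(_subscriptions.ToArray()));
    }
}
```

The point of the design is that the service only ever replies to the address on the request, so any number of senders can ask for the cache without the service being configured with their queues.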
Requesting Subscriptions (Sender)
I've now managed to get my subscriptions registered with a centralized queue (backed up by SubscriptionManager), however I need to make sure that I can get the latest subscriptions when I need them. Luckily this is easily accomplished using the SubscriptionClient:
SubscriptionClient subscriptionClient = new SubscriptionClient(bus, bus.SubscriptionCache, subscriptionServiceEndpoint);
subscriptionClient.Start();
This class ensures that I am kept up to date as subscriptions are updated, for example if an AddSubscription is processed by the centralized service then it will be distributed back to my service which will cause SubscriptionClient.CacheUpdateResponse_Callback to execute (NOTE: I'm not sure I've fully understood this functionality yet and it is not working quite as I expected so this last statement may not be correct).
Internals When Processing Messages
One key thing I learned when working on this was how important the queue that you pass into a ServiceBus on construction is: this queue is the one that the ServiceBus looks for messages on.
To see how key it is you can write a simple code example and follow it through. Create a ServiceBus then call Subscribe and pass in any old delegate before publishing a message of the same type using Publish.
When you run the code you can see a few things:
- When you call Subscribe on the ServiceBus two things happen: firstly the delegate that is to be executed when the message is consumed is saved (_consumers), and secondly a Subscription is created and saved. This subscription basically associates the message type with the queue that the ServiceBus is feeding off (EndPoint).
- When you publish a message using ServiceBus.Publish, a message is put on the queue that the ServiceBus is feeding off (EndPoint).
- The code in MsmqMessageReceiver.MonitorQueue will ensure that the message is read off that queue and will then be processed by the ServiceBus. To see the way that the ServiceBus processes the message look at ServiceBus.Deliver.
This setup makes a lot of sense as the behaviour is the same regardless of the message's origin, so a message coming into the ServiceBus’ queue from outside is treated the same as a message that the ServiceBus adds to its own queue.
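The flow described above can be mimicked with a small in-memory model. Again this is only a sketch under my own assumptions: LoopbackBus, ProcessPending and ReceiveFromOutside are hypothetical names chosen for illustration, a Queue<object> stands in for the MSMQ queue, and ProcessPending stands in for the monitoring loop; it is not how the real ServiceBus is implemented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model of a bus that feeds off a single queue of its own.
public sealed class LoopbackBus
{
    private readonly Queue<object> _ownQueue = new Queue<object>();

    // Message type -> delegates to run when a message of that type is consumed.
    private readonly Dictionary<Type, List<Action<object>>> _consumers =
        new Dictionary<Type, List<Action<object>>>();

    public LoopbackBus(string endpoint) { Endpoint = endpoint; }

    public string Endpoint { get; }

    // Message type -> endpoint that wants messages of that type.
    public Dictionary<Type, string> Subscriptions { get; } = new Dictionary<Type, string>();

    // Subscribe does two things: saves the delegate and records a subscription
    // that ties the message type to this bus's own endpoint.
    public void Subscribe<T>(Action<T> consumer)
    {
        if (!_consumers.TryGetValue(typeof(T), out List<Action<object>> list))
        {
            list = new List<Action<object>>();
            _consumers[typeof(T)] = list;
        }
        list.Add(message => consumer((T)message));
        Subscriptions[typeof(T)] = Endpoint;
    }

    // Publish only queues the message; nothing is consumed yet.
    public void Publish(object message)
    {
        if (Subscriptions.ContainsKey(message.GetType())) _ownQueue.Enqueue(message);
    }

    // A message arriving from another process ends up on the same queue.
    public void ReceiveFromOutside(object message)
    {
        _ownQueue.Enqueue(message);
    }

    // Stand-in for the monitoring loop: drain the queue and deliver each message.
    public int ProcessPending()
    {
        int processed = 0;
        while (_ownQueue.Count > 0)
        {
            Deliver(_ownQueue.Dequeue());
            processed++;
        }
        return processed;
    }

    // Deliver runs every delegate registered for the message's type.
    public void Deliver(object message)
    {
        if (!_consumers.TryGetValue(message.GetType(), out List<Action<object>> list)) return;
        foreach (Action<object> consumer in list) consumer(message);
    }
}
```

In this model a published message and an externally received one take exactly the same path through ProcessPending and Deliver, which is the symmetry described above.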