Feb 18, 2010

Adding Rx Framework to Courier

In a previous series of posts I introduced the Courier framework. Recently I have been doing some refactoring on the project and feature enhancements. The refactoring was fairly straight forward.

I was never quite happy with the way that you unregistered for a message so I changed the model of message registration to return a token and then subsequently changed the signature of unregister to take in this token. That change was pretty straight forward and badly needed.

The big feature in this update is the integration with the Rx Framework. If you don’t understand Rx I suggest diving into these videos. Be forewarned, the Rx guys are incredibly smart and tend to blaze through things very quickly. Much is left “up to the reader” to figure out and play with on their own. This makes it pretty difficult to grok. I am not sure I really understand all the moving parts of Rx.

One of my main goals in adding Rx to the Courier project was to allow you do include or exclude the Rx features with minimal pain. I also wanted to avoid a deep dependency on the Rx extension assembly. So I created a separate assembly for the Rx features that introduces an extension method to the Messenger class. This gives you an additional way to subscribe to a message and get an IObservable back. Since it is in a separate assembly you should be able to omit the assembly (and any dependency on Rx) pretty easily if you wanted to.

So what does this change let you do? Essentially I have added one extension method to the messenger that gives you different method to subscribe for a message. Syntactically it looks like this

IObservable myObservable = MyMessenger.RegisterForMessage("Foo");


This tells the messenger that you want to listen for the “Foo” message and get the result pushed into your myObservable sequence. So why is this cool? LINQ.



Plain and simple IObservable is queryable with LINQ and that makes for very interesting scenarios when you are dealing with a message pump that is very active and you want to filter out messages based on a criteria. I have added an example of how to use the new Rx extension method and tried to model the scenario into a real world situation.



Imagine you are listening to WCF service that is sending messages from a Blood Pressure monitor. Imagine the Messenger is broadcasting messages each time the Blood Pressure monitor sends some data.   This can lead to a flood of messages firing through the Messenger. Using this new extension method you can write a simple LINQ query to filter these messages and only “react” to the ones you care about.



In the scenario I show in the sample project I only care about messages when the blood pressure is outside of the “normal” range. In those cases I want to “react” and show some alert. I have written a quick and dirty simulation that will fire messages every second and randomize the pressure. When the pressure is outside of the predefined range I will get a message “pushed” into my IObservable sequence. When this happens I have a delegate that “reacts” to this “push” and does something (in my case I raise an alert)



So, from the snippet above I can take it one step further and do this:



IObservable<Object> observable = from msg in Mediator.RegisterForMessage("BloodPressureMessage")
where (((BloodPressure)msg).Systolic < 90 || ((BloodPressure)msg).Systolic > 119)
&& (((BloodPressure)msg).Diastolic < 60 || ((BloodPressure)msg).Diastolic > 79)
select msg;

observable.Subscribe(OnAlertReceived);


Notice how I am now writing a LINQ query on the IObservable that the messenger is returning from the RegisterForMessage method. The line after the LINQ query is how I subscribe to the “push” events that happen on the observable sequence. OnAlertReceived is the delegate I want invoked when a new message is “pushed” on the IObservavle sequence.



Cool? I think so. Also, this check in I am going to promote to Beta 2 status. I have added unit tests and done some other small cleanup work. I think it is getting close to being ready for a true release. I would like to buff out the sample application a bit more and get some more API documentation in place and work out and remaining bugs.



one little nit that I want to clean up in the next version is the casting of the “msg” in the LINQ Query. There are some internal issues with how I am implementing Rx under the covers that makes strong typing of the resulting observable challenging, I don’t have a solution for this yet given my current implementation, so if you find one please contribute it back to the project or let me know.

Labels: , ,

2 Comments:

Blogger Unknown said...

Hello,

Thanks for your great work of Courier! Just two question on it.

1) Where can I find a sample of Courier? Seems there is only binary files on codeplex.
2) Can we use Courier in Silverlight 3/4 platform, or it is only can be used for WPF?

March 10, 2011 at 1:16 AM  
Blogger Brad said...

Hi Wang,

You can get the samples and the source code from the Source Code tab on the codeplex site. http://courier.codeplex.com/SourceControl/list/changesets

Currently Silverlight is experimentally supported. Basically I haven't spent enough time testing to know what issues there are.

There is one known issue currently with Silverlight. That is that the Action you pass to the moderator (to be raised on your object when a message is sent) MUST be public in Silverlight. In WPF this isn't the case, the delegate can be private.

This is due to Silverlight's protection model. This is a pretty easy issue to work around, just make the Action public.

I have not released binaries with Silverlight support yet due to this issue and the lack of test coverage. However, you are more than welcome to pull down the latest source and compile the binaries yourself.

Good luck, If you do use it under Silverlight and find any additional issues please feel free to post them on the codeplex site so I can resolve them.

March 10, 2011 at 9:43 AM  

Post a Comment

Subscribe to Post Comments [Atom]

<< Home