Introduction
Recently, I was asked to look into a push-capable .NET network messaging framework at work. We already had an in-process system that helped us monitor progress on long-running processes (using IObservable/IObserver). It just needed to be extended so that outside processes could listen to updates as well.
Naturally, I looked into SignalR. But we wanted to be able to do server-side message filtering (for instance, to have a central messaging server but only receive updates on certain long-running processes in a given listener application). And maybe also have LINQ-capable filtering. Core SignalR does not bring these things to the table. But I found Pushqa which uses SignalR under the hood and adds the functionality we were looking for. Pushqa introduces itself the following way:
Pushqa is a .Net library that allows the filtering of incoming push events from a server to be performed server-side.
It allows the consumer to define queries over event streams so that events that are being emitted server side can be filtered and constrained by client side code. The queries are serialized and executed server side rather than sending all the events to the client for client side filtering.
Pushqa uses Microsoft’s Reactive Extensions (Rx) expressions over an HTTP connection with the queries serialized using the oData URI specification.
At this point, a Pushqa service is designed to run within an ASP.NET MVC web application (see Pushqa website or this blog post for examples). This was not the kind of environment I was looking for. SignalR itself is not limited in this regard, so I went looking for a way to create a self-hosting setup for Pushqa (the full source code for the following demo can be found here: https://dl.dropboxusercontent.com/u/31759146/ProgressUpdate.zip).
Service
I created a simple console application that uses in-process OWIN hosting. It uses the following NuGet packages:
Here is the simple setup:
using (WebApp.Start<Startup>("http://localhost:8080/"))
{
    Console.WriteLine("Server running at http://localhost:8080/");
    Console.ReadLine();
}
The Startup class defines the actual service. Here we map Pushqa’s own PersistentConnection implementation (QueryablePushService) to our endpoint:
public class Startup
{
    public void Configuration(IAppBuilder app)
    {
        app.MapConnection<QueryablePushService<UpdateContext>>(
            "/update",
            new ConnectionConfiguration { EnableCrossDomain = true }
        );
    }
}
The Pushqa connection needs a context in which we define the observable property that clients want to access. For this demo I am simply using a Subject (which implements both IObservable and IObserver) into which my demo service puts messages while it is running:
public class UpdateContext
{
    internal static Subject<Update> InternalUpdates = new Subject<Update>();
    public IQbservable<Update> Updates { get { return InternalUpdates.AsQbservable(); } }
}
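The post does not show the Update type or how the demo service feeds the Subject. The following is a minimal sketch of one way that could look, not the code from the download. Only ProcessId appears in the original; PercentComplete and the ProgressReporter class are hypothetical names chosen for illustration. Because Subject<Update> implements IObserver<Update>, the reporter only needs to depend on that interface.

```csharp
using System;

// Hypothetical message type: only ProcessId appears in the post,
// PercentComplete is an assumed field for illustration.
public class Update
{
    public int ProcessId { get; set; }
    public int PercentComplete { get; set; }
}

// Hypothetical helper that simulates a long-running process.
public static class ProgressReporter
{
    // Pushes one Update per step into the given observer.
    // It never calls OnCompleted, since the observer may be shared
    // by several long-running processes.
    public static void Run(IObserver<Update> sink, int processId, int steps)
    {
        for (var step = 1; step <= steps; step++)
        {
            sink.OnNext(new Update
            {
                ProcessId = processId,
                PercentComplete = step * 100 / steps
            });
        }
    }
}
```

In the demo service, a call such as ProgressReporter.Run(UpdateContext.InternalUpdates, 42, 10) would then publish ten updates for process 42 to every connected listener whose query matches.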
Listener
For the client I also created a simple console application. It mainly uses the Pushqa.Client NuGet package. The client takes a long-running process’ ID and uses it to filter messages by this ID. It is also just interested in the next 10 messages. This filtering is all done server-side! This is great because now you can have a ton of messages going through the service but only the interesting ones actually get sent to clients. For comfortable client-side filtering and processing you can use Reactive Extensions.
var inputProgressId = Int32.Parse(args[0]);
var updateProvider = new UpdateEventProvider();
var subscription = updateProvider.Updates
    .Where(u => u.ProcessId == inputProgressId)
    .Take(10)
    .AsObservable();
using (subscription.Subscribe(WriteUpdateToConsole))
{
    Console.WriteLine("Listening to updates from process {0}", inputProgressId);
    Console.ReadLine();
}
Here is the glue code for connecting the local listener observable property to the server-side one:
public class UpdateEventProvider : EventProvider
{
    public UpdateEventProvider()
        : base(new Uri("http://localhost:8080/update"))
    {
    }

    public EventQuerySource<Update> Updates
    {
        get { return CreateQuery<Update>("Updates"); }
    }
}
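To make the effect of the listener's query concrete, here is a small sketch that applies the same Where/Take combination to an in-memory sequence using plain LINQ to Objects. It is only a stand-in for what the server does with the event stream, not Pushqa code; the FilterSemantics class and the tuple shape are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration of the query semantics, independent of Pushqa.
public static class FilterSemantics
{
    // Returns the percent values of the first `count` messages
    // that belong to the given process, in arrival order.
    public static List<int> FirstMatching(
        IEnumerable<(int ProcessId, int Percent)> stream,
        int processId,
        int count)
    {
        return stream
            .Where(u => u.ProcessId == processId) // same predicate as the listener
            .Take(count)                          // stop after `count` matches
            .Select(u => u.Percent)
            .ToList();
    }
}
```

For a stream containing (1, 10), (2, 5), (1, 20), (1, 30) and (2, 50), calling FirstMatching(stream, 1, 2) yields 10 and 20: the messages of process 2 are skipped and the third match is cut off by Take. With Pushqa, that selection happens on the server, so only the two matching messages would travel over the wire.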
Conclusion
You can now start a service and have multiple listeners receive pushed updates through the comfortable IObservable/IObserver pattern! The service is hosted in a simple console application (which at this point is not officially supported by Pushqa) and the message filtering is done server-side!
I am not totally happy with Pushqa, though… Here is why:
- It uses a LINQ-like interface but it is not LINQ! You have simple functionality like Where and Take but you cannot use the full LINQ spectrum.
- Not many LINQ expressions are supported. For instance, a simple Where(u => true) is not supported!
- I would have expected Pushqa to use some official OData library (ODataLib?) for the OData plumbing, but it relies on its own implementation.
- Passing the context to QueryablePushService is kind of awkward.
But all in all it is a good framework that
- Gets the job done!
- Uses more or less well known technology under the hood
- Is in good shape source-code-wise
Again: the full source code for this demo can be found here: https://dl.dropboxusercontent.com/u/31759146/ProgressUpdate.zip.