Skip to main content

Understanding RefCount in Reactive Extensions

A couple of weeks ago @LordHanson & I ran into an issue converting a stateless async service exposed as an Rx cold observable to a connect-able hot observable. The issue was solved after a couple of hours, what follows is how we how got there.

The service is an Rx wrapper around the Caplin messaging API. The Caplin API is designed around the idea of opening a connection, subscribing on subjects and then receiving messages asynchronously - pretty standard and nothing unusual.

When used in an app where there's only ever one subscriber for a subject then a service designed around Rx cold observables is suitable - we had something similar to what's shown below, in this example I've substituted the Caplin part with an observable timer to spit out messages every second:
So this is a cold observable because every time the Listen method is called a new stream is created with it's own set of messages being published - Lee Campbell has a good explanation of hot & cold observables.

So when we went to a model where there would be multiple subscribers for a subject we needed to change the implementation, we now had to concern our selves with the idea of sharing the stream between multiple subscribers (threads). Under the covers the underlying Caplin connection was being shared between multiple threads this meant we didn't want the connection to be dropped just because one of the thread had disposed of it's observable instance, we only wanted connection to be dropped when ALL of the subscribers had disposed of their observable instances - we needed some kind of reference counting on the under lying stream...

We knew we had to create a hot connect-able observable using the Publish & Connect Rx extension methods - basically you create a connect-able stream by publishing and then explicitly connecting when you are ready to subscribe:
For us we wanted something a little more complicated - we needed our service to have only a single stream per subject, this stream would then be shared between multiple listeners (subscribers) and importantly as I have already said we needed ref counting on the stream. This is where we started to have problems.

We knew our service was going to cache the stream on a per subject basis, so that if a connection & subscription had already been made on a subject we'd return that instance:
We also knew we'd need a composite disposable so that we could remove the subject once it had been disposed:
Our issues were around how to use the Publish, Connect & RefCount Rx extension methods, specifically the following codes doesn't compile because the RefCount method is not exposed on the IDisposable interface, it's only exposed on the IConnectableDisposable interface:
What the documentation didn't make clear was the fact we didn't need the call to the Connect method. It only states:

 'Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.'

It doesn't explicitly state it actually calls Connect for you under the covers, we were in fact very close to the solution:
Below are couple of MSpec fixtures, the first shows the underlying observable is a shared instance:
The second shows once the subscribers are disposed and new one created, the newly created one is a different instance:
With the following test results:

Comments

  1. MSpec looks interesting - will have to check it out sometime :) Hamish

    ReplyDelete
  2. Presumably the "subjects" dictionary requires locking around it because of the different threads connecting and being disposed of asynchronously?

    ReplyDelete
  3. Based on the fact that dictionary is not thread safe by default then yes you would require some kind of synchronisation

    ReplyDelete

Post a Comment

Popular posts from this blog

Implementing a busy indicator using a visual overlay in MVVM

This is a technique we use at work to lock the UI whilst some long running process is happening - preventing the user clicking on stuff whilst it's retrieving or rendering data. Now we could have done this by launching a child dialog window but that feels rather out of date and clumsy, we wanted a more modern pattern similar to the way <div> overlays are done on the web. Imagine we have the following simple WPF app and when 'Click' is pressed a busy waiting overlay is shown for the duration entered into the text box. What I'm interested in here is not the actual UI element of the busy indicator but how I go about getting this to show & hide from when using MVVM. The actual UI elements are the standard Busy Indicator coming from the WPF Toolkit : The XAML behind this window is very simple, the important part is the ViewHost. As you can see the ViewHost uses a ContentPresenter element which is bound to the view model, IMainViewModel, it contains 3 child v...

Showing a message box from a ViewModel in MVVM

I was doing a code review with a client last week for a WPF app using MVVM and they asked ' How can I show a message from the ViewModel? '. What follows is how I would (and have) solved the problem in the past. When I hear the words ' show a message... ' I instantly think you mean show a transient modal message box that requires the user input before continuing ' with something else ' - once the user has interacted with the message box it will disappear. The following solution only applies to this scenario. The first solution is the easiest but is very wrong from a separation perspective. It violates the ideas behind the Model-View-Controller pattern because it places View concerns inside the ViewModel - the ViewModel now knows about the type of the View and specifically it knows how to show a message box window: The second approach addresses this concern by introducing the idea of messaging\events between the ViewModel and the View. In the example ...

WPF tips & tricks: Dispatcher thread performance

Not blogged for an age, and I received an email last week which provoked me back to life. It was a job spec for a WPF contract where they want help sorting out the performance of their app especially around grids and tabular data. I thought I'd shared some tips & tricks I've picked up along the way, these aren't probably going to solve any issues you might be having directly, but they might point you in the right direction when trying to find and resolve performance issues with a WPF app. First off, performance is something you shouldn't try and improve without evidence, and this means having evidence proving you've improved the performance - before & after metrics for example. Without this you're basically pissing into the wind, which can be fun from a developer point of view but bad for a project :) So, what do I mean by ' Dispatcher thread performance '? The 'dispatcher thread' or the 'UI thread' is probably the most ...