
Understanding RefCount in Reactive Extensions

A couple of weeks ago @LordHanson & I ran into an issue converting a stateless async service exposed as an Rx cold observable into a connectable hot observable. We solved it after a couple of hours; what follows is how we got there.

The service is an Rx wrapper around the Caplin messaging API. The Caplin API is designed around the idea of opening a connection, subscribing to subjects and then receiving messages asynchronously - pretty standard and nothing unusual.

In an app where there's only ever one subscriber per subject, a service designed around Rx cold observables is suitable. We had something similar to what's shown below; in this example I've substituted the Caplin part with an observable timer that spits out a message every second:
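
A minimal sketch of a cold service of this shape, assuming a hypothetical `MessageService` class and a made-up message format, with `Observable.Interval` standing in for Caplin:

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical service name; the real Caplin wrapper is not shown here.
public sealed class MessageService
{
    // Every call builds a new timer-backed sequence, and every subscription
    // to it starts its own timer, so no two subscribers share messages.
    public IObservable<string> Listen(string subject)
    {
        return Observable.Interval(TimeSpan.FromSeconds(1))
                         .Select(tick => $"{subject}: message {tick}");
    }
}
```

Two calls to `Listen` with the same subject return two unrelated sequences, which is fine for a single subscriber but means nothing is shared.
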

This is a cold observable because every time the Listen method is called a new stream is created with its own set of messages being published - Lee Campbell has a good explanation of hot & cold observables.

When we moved to a model with multiple subscribers per subject we needed to change the implementation: we now had to concern ourselves with sharing the stream between multiple subscribers (threads). Under the covers the underlying Caplin connection was being shared between multiple threads, which meant we didn't want the connection dropped just because one of the threads had disposed of its observable instance. We only wanted the connection dropped when ALL of the subscribers had disposed of their observable instances - we needed some kind of reference counting on the underlying stream...

We knew we had to create a hot connectable observable using the Publish & Connect Rx extension methods - basically you create a connectable stream by publishing, and then explicitly connect once your subscribers are in place:
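
As a rough illustration of Publish & Connect (not the original listing), the sketch below uses a `Subject<int>` as a stand-in source so the ordering is deterministic; the class name `PublishConnectSketch` is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishConnectSketch
{
    public static List<string> Run()
    {
        var source = new Subject<int>();            // stand-in for the real feed
        IConnectableObservable<int> hot = source.Publish();

        var received = new List<string>();
        hot.Subscribe(x => received.Add("first:" + x));
        hot.Subscribe(x => received.Add("second:" + x));

        source.OnNext(1);                           // dropped: not connected yet

        // Connect subscribes to the source once, on behalf of both subscribers.
        IDisposable connection = hot.Connect();
        source.OnNext(2);                           // delivered to both subscribers

        // Disposing the connection detaches from the source for everyone.
        connection.Dispose();
        source.OnNext(3);                           // dropped: disconnected

        return received;
    }
}
```

Here the caller owns the connection's lifetime, which is exactly the part that gets awkward once subscribers come and go independently.
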

We wanted something a little more complicated: our service needed to hold only a single stream per subject, shared between multiple listeners (subscribers), and, as I have already said, ref counted. This is where we started to have problems.

We knew our service was going to cache the stream on a per-subject basis, so that if a connection & subscription had already been made on a subject we'd return that instance:

We also knew we'd need a composite disposable so that we could remove the subject once it had been disposed:

Our issues were around how to use the Publish, Connect & RefCount Rx extension methods. Specifically, the following code doesn't compile because Connect returns an IDisposable and RefCount is not exposed on the IDisposable interface; it's only exposed on the IConnectableObservable interface:
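
The type distinction can be sketched as follows. This is an illustration rather than the original failing code: the class name `RefCountTypesSketch` is hypothetical and a `Subject<int>` stands in for the source. The non-compiling call is left commented out, and `RefCount` is applied to the connectable observable instead:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RefCountTypesSketch
{
    public static List<int> Run()
    {
        var source = new Subject<int>();
        IConnectableObservable<int> published = source.Publish();

        // Connect returns an IDisposable that only represents the connection:
        // IDisposable connection = published.Connect();
        // connection.RefCount();   // does not compile: IDisposable has no RefCount

        // RefCount is an extension method on IConnectableObservable<T>.
        IObservable<int> shared = published.RefCount();

        var received = new List<int>();
        using (shared.Subscribe(x => received.Add(x)))
        {
            source.OnNext(1);       // delivered: the first subscription connected
        }
        source.OnNext(2);           // dropped: the last subscription was disposed

        return received;
    }
}
```
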
What the documentation didn't make clear was the fact we didn't need the call to the Connect method. It only states:

 'Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.'

It doesn't explicitly state that it calls Connect for you under the covers. We were in fact very close to the solution:
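
Putting the pieces together, a sketch of such a service might look like the following. It is an assumption-laden illustration rather than the original code: `SharedMessageService`, the `_gate` lock and the timer source are all hypothetical, and the lock is there because a plain `Dictionary` is not thread safe:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public sealed class SharedMessageService
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, IObservable<string>> _subjects =
        new Dictionary<string, IObservable<string>>();

    public IObservable<string> Listen(string subject)
    {
        lock (_gate)
        {
            // Return the cached stream if this subject is already being shared.
            if (_subjects.TryGetValue(subject, out var existing))
                return existing;

            IObservable<string> shared = null;
            shared = Observable.Create<string>(observer =>
                {
                    // Stand-in for the Caplin subscription.
                    var subscription = Observable.Interval(TimeSpan.FromSeconds(1))
                        .Select(tick => $"{subject}: message {tick}")
                        .Subscribe(observer);

                    // Runs when RefCount drops the connection, i.e. when the
                    // last subscriber disposes: evict this stream from the cache.
                    var removal = Disposable.Create(() =>
                    {
                        lock (_gate)
                        {
                            if (_subjects.TryGetValue(subject, out var current)
                                && ReferenceEquals(current, shared))
                            {
                                _subjects.Remove(subject);
                            }
                        }
                    });

                    return new CompositeDisposable(subscription, removal);
                })
                .Publish()
                .RefCount();   // connects on first subscribe, disconnects on last dispose

            _subjects[subject] = shared;
            return shared;
        }
    }
}
```

With this shape, two callers asking for the same subject get the same instance, and once both have disposed their subscriptions the next call to `Listen` builds a fresh one. There is no explicit call to Connect anywhere.
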

Below are a couple of MSpec fixtures. The first shows that the underlying observable is a shared instance:

The second shows that once the subscribers are disposed and a new one is created, the newly created one is a different instance:

With the following test results:

Comments

  1. MSpec looks interesting - will have to check it out sometime :) Hamish

  2. Presumably the "subjects" dictionary requires locking around it because of the different threads connecting and being disposed of asynchronously?

  3. Based on the fact that dictionary is not thread safe by default then yes you would require some kind of synchronisation
