A couple of weeks ago @LordHanson & I ran into an issue converting a stateless async service exposed as an Rx cold observable into a connectable hot observable. We solved it after a couple of hours; what follows is how we got there.
The service is an Rx wrapper around the Caplin messaging API. The Caplin API is designed around the idea of opening a connection, subscribing to subjects and then receiving messages asynchronously - pretty standard, nothing unusual.
When an app only ever has one subscriber per subject, a service designed around Rx cold observables is suitable. We had something similar to what's shown below; in this example I've substituted the Caplin part with an observable timer that spits out a message every second:
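A minimal sketch of such a cold service, assuming a hypothetical `ColdMessageService` class and plain string messages (neither name comes from the original code); `Observable.Interval` plays the part of the Caplin subscription:

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical class name; the timer stands in for the Caplin subscription.
public sealed class ColdMessageService
{
    // Every call builds a fresh sequence, and every Subscribe on it starts
    // its own timer, so no two subscribers ever share messages.
    public IObservable<string> Listen(string subject)
    {
        return Observable
            .Interval(TimeSpan.FromSeconds(1))
            .Select(tick => subject + ": message " + tick);
    }
}
```

Calling `Listen("fx")` twice hands back two unrelated sequences, which is fine for a single subscriber but means nothing is shared between callers.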
This is a cold observable because every time the Listen method is called a new stream is created with its own set of messages being published - Lee Campbell has a good explanation of hot & cold observables.
When we moved to a model with multiple subscribers per subject we needed to change the implementation: we now had to concern ourselves with sharing the stream between multiple subscribers (threads). Under the covers the Caplin connection was shared between those threads, so we didn't want the connection to be dropped just because one thread had disposed of its observable instance. We only wanted the connection dropped once ALL of the subscribers had disposed of their observable instances - we needed some kind of reference counting on the underlying stream...
We knew we had to create a hot connectable observable using the Publish & Connect Rx extension methods - basically you create a connectable stream by publishing, and then explicitly connect when you are ready to subscribe to the underlying source:
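As a rough illustration of the Publish & Connect pattern, here is a sketch assuming a hypothetical `PublishConnectDemo` helper that takes the source and a logging callback as parameters (both are illustrative, not from the original service):

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper illustrating Publish + Connect.
public static class PublishConnectDemo
{
    public static IDisposable Run(IObservable<long> source, Action<string> log)
    {
        // Publish() wraps the source; nothing is subscribed to it yet.
        IConnectableObservable<long> hot = source.Publish();

        // Both observers attach to the same published stream.
        IDisposable first = hot.Subscribe(n => log("first: " + n));
        IDisposable second = hot.Subscribe(n => log("second: " + n));

        // Connect() makes the single subscription to the source; disposing
        // the returned handle tears that subscription down for everyone.
        IDisposable connection = hot.Connect();

        return new CompositeDisposable(first, second, connection);
    }
}
```

With the timer from earlier this could be called as `PublishConnectDemo.Run(Observable.Interval(TimeSpan.FromSeconds(1)), Console.WriteLine)`; both observers then see the same ticks rather than a timer each.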
We wanted something a little more complicated - our service needed a single stream per subject, shared between multiple listeners (subscribers), and importantly, as I've already said, we needed ref counting on that stream. This is where we started to have problems.
We knew our service was going to cache the stream on a per-subject basis, so that if a connection & subscription had already been made for a subject we'd return that instance:
We also knew we'd need a composite disposable so that we could remove the subject from the cache once its stream had been disposed:
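One way the disposal side could look, sketched with a hypothetical `SubjectCleanup` helper (the name and signature are assumptions for illustration): the underlying subscription and the cache eviction are bundled into a single `CompositeDisposable`, so disposing one handle does both.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

// Hypothetical helper; not taken from the original service.
public static class SubjectCleanup
{
    public static IDisposable Create<T>(
        IDictionary<string, IObservable<T>> cache,
        string subject,
        IDisposable subscription)
    {
        // Disposing the composite disposes the subscription first and then
        // runs the callback that evicts the subject from the cache.
        return new CompositeDisposable(
            subscription,
            Disposable.Create(() => cache.Remove(subject)));
    }
}
```

Note that a plain `Dictionary` is not thread safe, so a real service would need some synchronisation around the removal; that is left out of this fragment.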
Our issues were around how to combine the Publish, Connect & RefCount Rx extension methods. Specifically, the following code doesn't compile because Connect returns an IDisposable and the RefCount method isn't available on IDisposable; it's only exposed as an extension on the IConnectableObservable<T> interface:
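The type mismatch can be seen in a small sketch like this one (the `ConnectThenRefCount` class is hypothetical, and the offending call is left commented out so the rest compiles):

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical illustration of why chaining RefCount after Connect fails.
public static class ConnectThenRefCount
{
    public static IDisposable Attempt(IObservable<long> source)
    {
        // Publish() returns IConnectableObservable<long>, which is the type
        // the RefCount() extension method is defined for.
        IConnectableObservable<long> published = source.Publish();

        // Connect() hands back only the connection handle, a plain IDisposable.
        IDisposable connection = published.Connect();

        // Uncommenting the next line fails to compile: IDisposable has no RefCount().
        // IObservable<long> shared = connection.RefCount();

        return connection;
    }
}
```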
What the documentation didn't make clear was that we didn't need the call to the Connect method at all. For RefCount it only states:
'Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.'
It doesn't explicitly state that RefCount calls Connect for you under the covers. We were in fact very close to the solution:
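Putting the pieces together, a sketch of how such a service might look. The `MessageService` name, the injected source factory and the lock are all assumptions added for illustration; the parts described in the post are the per-subject cache, the composite disposable and `Publish().RefCount()` with no explicit `Connect()`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical service: one shared, ref-counted stream per subject.
public sealed class MessageService
{
    private readonly object _gate = new object(); // assumption: guards the cache
    private readonly Dictionary<string, IObservable<string>> _subjects =
        new Dictionary<string, IObservable<string>>();
    private readonly Func<string, IObservable<string>> _sourceFactory;

    public MessageService(Func<string, IObservable<string>> sourceFactory)
    {
        _sourceFactory = sourceFactory;
    }

    public IObservable<string> Listen(string subject)
    {
        lock (_gate)
        {
            IObservable<string> existing;
            if (_subjects.TryGetValue(subject, out existing))
                return existing; // already cached: share the same instance

            var shared = Observable.Create<string>(observer =>
            {
                // Runs when RefCount connects, i.e. on the first subscription.
                var subscription = _sourceFactory(subject).Subscribe(observer);
                var removal = Disposable.Create(() =>
                {
                    lock (_gate) { _subjects.Remove(subject); }
                });
                // Disposed when the last subscriber leaves (or the source ends).
                return new CompositeDisposable(subscription, removal);
            })
            .Publish()
            .RefCount(); // connects on first subscriber, disconnects after the last

            _subjects[subject] = shared;
            return shared;
        }
    }
}
```

With the timer stand-in this could be created as `new MessageService(s => Observable.Interval(TimeSpan.FromSeconds(1)).Select(t => s + ": message " + t))`. The sketch does not handle every race: a caller could obtain the cached instance just before the last subscriber disposes it, so treat the locking here as a starting point rather than a complete answer.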
Below are a couple of MSpec fixtures. The first shows that the underlying observable is a shared instance:
The second shows that once the subscribers are disposed and a new one is created, the newly created one is a different instance:
With the following test results:
MSpec looks interesting - will have to check it out sometime :) Hamish
Presumably the "subjects" dictionary requires locking around it because of the different threads connecting and being disposed of asynchronously?
Based on the fact that a dictionary is not thread safe by default, then yes, you would require some kind of synchronisation.