Skip to main content

Trying to be more functional with Rx

I realised this week I'm not being as functional when creating an Rx extension method as I should could be.

This came out of a discussion I was having with @leeoades about a Pausable<T> extension we thought we needed at work. Lee had a solution and I thought I'd try to create one without looking for the answer on the t'internet.

   1:  public static IObservable<T> Pausable<T>(this IObservable<T> stream,
                                                IObservable<bool> paused,
                                                bool initialState = false)
   2:  {
   3:      ...
   4:  }

Hopefully the idea of the method is obvious - have the ability to pause & resume the publishing of instances to the stream.

Before implementing Pausable<T> I thought I'd implement an extension that didn't remember state whilst paused - Suspendable<T>, as you can see the same signature.

   1:  public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                   IObservable<bool> suspend,
                                                   bool initialState = false)
   2:  {
   3:      ...
   4:  }

So my first attempt looked like this:

   1:  public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                   IObservable<bool> suspend,
                                                   bool initialState = false)
   2:  {
   3:      var suspended = new ReplaySubject<bool>(1);
   4:      suspended.OnNext(initialState);
   6:      suspend.Subscribe(suspended.OnNext);
   7:      return stream.Where(t => !suspended.Take(1).Wait());
   8:  }

And to make sure this does exactly what's expected a set of tests covering all the edge cases:
Each of these test follow a common pattern with a defined generator publishing numbers to the Rx stream, 0 - 100. Shown below is the setup and a couple of the tests, one demonstrating a single resume and the other a multiple resume scenario:

   1:  [SetUp]
   2:  public void SetUp()
   3:  {
   4:      _generatorCount = 100;
   5:      _testScheduler = new TestScheduler();
   7:      _generator = Observable.Generate(1,
   8:          x => x <= _generatorCount,
   9:          x => x + 1,
  10:          x => x,
  11:          x => TimeSpan.FromSeconds(1), _testScheduler);
  12:  }
  14:  [Test]
  15:  public void should_recieve_values_after_single_resuming()
  16:  {
  17:      // ARRANGE
  18:      var count = 0;
  19:      var suspend = new Subject<bool>();
  21:      _generator.Suspendable(suspend, true)
  22:          .Subscribe(n => count++);
  24:      // ACT
  25:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100).Ticks);
  26:      suspend.OnNext(false);
  27:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1010).Ticks);
  29:      // ASSERT
  30:      Assert.That(count, Is.EqualTo(1));
  31:  }
  33:  [Test]
  34:  public void should_recieve_values_after_multiple_resuming()
  35:  {
  36:      // ARRANGE
  37:      var count = 0;
  38:      var suspend = new Subject<bool>();
  40:      _generator.Suspendable(suspend, true)
  41:          .Subscribe(n => count++);
  43:      // ACT
  44:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
  45:      suspend.OnNext(false);
  46:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
  47:      suspend.OnNext(true);
  48:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
  49:      suspend.OnNext(false);
  50:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
  52:      // ASSERT
  53:      Assert.That(count, Is.EqualTo(20));
  54:  }

So why is this implementation of Suspendable<T> not an ideal solution even though all the tests pass?

Subjects are mutable by design and from a functional programming perspective the idea of being able to mutate state is not something you want to do. Obviously you can use subjects, one scenario is where the subject is the origin of an Rx stream, e.g. in a service publishing instances asynchronously via the OnNext method. This isn't applicable in an Rx extension method. Ideally one should be using other Rx operators & extension methods.

A quick Google search for 'avoid Subject Rx' leads me to a post on the Rx forum by the Eric Meijer giving his view on the topic.

So how do I make this a more functional implementation?

The easiest way to avoid using a Subject<T> is to use a Observable.Create<T> to return the IObservable<T> instance, this is what I came up with:

   1:  public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                   IObservable<bool> suspend,
                                                   bool initialState = false)
   2:  {
   3:      return Observable.Create<T>(o =>
   4:      {
   5:          var disposable = suspend.StartWith(initialState)
   6:                  .DistinctUntilChanged()
   7:                  .Select(s => s ? Observable.Empty<T>() : stream)
   8:                  .Switch()
   9:                  .Subscribe(o);
  11:          return disposable;
  12:      });
  13:  }

This introduced me to two new Rx operators - StartWith & Switch, StartWith should be obvious in what it does but Switch is a little more subtle, it only produces values from the latest observable stream. In the code above the Select operator returns the original observable stream or an empty observable stream depending on the suspend observable stream most recent value.

Now this implementation has an issue and because I'm a good programmer and already had unit tests it was picked up straight away :)
As you can see from the output the test 'should_complete_when_stream_completes' fails - hopefully the test name is obvious, but if not basically I'm expecting the OnComplete action to be called when the stream completes:
For some reason the completed parameter is never set to true and therefore the assert fails as shown by the red x shown at the side. The stream is setup as follows, you can see it completes when 100 is reached:

   1:  _generatorCount = 100;
   2:  _testScheduler = new TestScheduler();
   4:  _generator = Observable.Generate(1,
   5:      x => x <= _generatorCount,
   6:      x => x + 1,
   7:      x => x,
   8:      x => TimeSpan.FromSeconds(1), _testScheduler);

What confuses me about this is the fact I have another test in which the generator throws an exception when the value of 42 is reached and this test passes:
So why isn't the OnComplete action ever called if the OnError action can be called?

I'm sure there's something missing  wrong with the implementation of my Suspend<T> extension method, I'll continue to investigate...


Popular posts from this blog

Showing a message box from a ViewModel in MVVM

I was doing a code review with a client last week for a WPF app using MVVM and they asked ' How can I show a message from the ViewModel? '. What follows is how I would (and have) solved the problem in the past. When I hear the words ' show a message... ' I instantly think you mean show a transient modal message box that requires the user input before continuing ' with something else ' - once the user has interacted with the message box it will disappear. The following solution only applies to this scenario. The first solution is the easiest but is very wrong from a separation perspective. It violates the ideas behind the Model-View-Controller pattern because it places View concerns inside the ViewModel - the ViewModel now knows about the type of the View and specifically it knows how to show a message box window: The second approach addresses this concern by introducing the idea of messaging\events between the ViewModel and the View. In the example below

Implementing a busy indicator using a visual overlay in MVVM

This is a technique we use at work to lock the UI whilst some long running process is happening - preventing the user clicking on stuff whilst it's retrieving or rendering data. Now we could have done this by launching a child dialog window but that feels rather out of date and clumsy, we wanted a more modern pattern similar to the way <div> overlays are done on the web. Imagine we have the following simple WPF app and when 'Click' is pressed a busy waiting overlay is shown for the duration entered into the text box. What I'm interested in here is not the actual UI element of the busy indicator but how I go about getting this to show & hide from when using MVVM. The actual UI elements are the standard Busy Indicator coming from the WPF Toolkit : The XAML behind this window is very simple, the important part is the ViewHost. As you can see the ViewHost uses a ContentPresenter element which is bound to the view model, IMainViewModel, it contains 3 child v

Custom AuthorizationHandler for SignalR Hubs

How to implement IAuthorizationRequirement for SignalR in Asp.Net Core v5.0 Been battling this for a couple of days, and eventually ended up raising an issue on Asp.Net Core gitHub  to find the answer. Wanting to do some custom authorization on a SignalR Hub when the client makes a connection (Hub is created) and when an endpoint (Hub method) is called:  I was assuming I could use the same Policy for both class & method attributes, but it ain't so - not because you can't, because you need the signatures to be different. Method implementation has a resource type of HubInnovationContext: I assumed class implementation would have a resource type of HubConnectionContext - client connects etc... This isn't the case, it's infact of type DefaultHttpContext . For me I don't even need that, it can be removed completely  from the inheritence signature and override implementation. Only other thing to note, and this could be a biggy, is the ordering of the statements in th