Skip to main content

Trying to be more functional with Rx

I realised this week I'm not being as functional when creating an Rx extension method as I should could be.

This came out of a discussion I was having with @leeoades about a Pausable<T> extension we thought we needed at work. Lee had a solution and I thought I'd try to create one without looking for the answer on the t'internet.

   1:  public static IObservable<T> Pausable<T>(this IObservable<T> stream,
                                                IObservable<bool> paused,
                                                bool initialState = false)
   2:  {
   3:      ...
   4:  }

Hopefully the idea of the method is obvious - have the ability to pause & resume the publishing of instances to the stream.

Before implementing Pausable<T> I thought I'd implement an extension that didn't remember state whilst paused - Suspendable<T>, as you can see the same signature.

   1:  public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                   IObservable<bool> suspend,
                                                   bool initialState = false)
   2:  {
   3:      ...
   4:  }

So my first attempt looked like this:

   1:  public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                   IObservable<bool> suspend,
                                                   bool initialState = false)
   2:  {
   3:      var suspended = new ReplaySubject<bool>(1);
   4:      suspended.OnNext(initialState);
   6:      suspend.Subscribe(suspended.OnNext);
   7:      return stream.Where(t => !suspended.Take(1).Wait());
   8:  }

And to make sure this does exactly what's expected a set of tests covering all the edge cases:
Each of these test follow a common pattern with a defined generator publishing numbers to the Rx stream, 0 - 100. Shown below is the setup and a couple of the tests, one demonstrating a single resume and the other a multiple resume scenario:

   1:  [SetUp]
   2:  public void SetUp()
   3:  {
   4:      _generatorCount = 100;
   5:      _testScheduler = new TestScheduler();
   7:      _generator = Observable.Generate(1,
   8:          x => x <= _generatorCount,
   9:          x => x + 1,
  10:          x => x,
  11:          x => TimeSpan.FromSeconds(1), _testScheduler);
  12:  }
  14:  [Test]
  15:  public void should_recieve_values_after_single_resuming()
  16:  {
  17:      // ARRANGE
  18:      var count = 0;
  19:      var suspend = new Subject<bool>();
  21:      _generator.Suspendable(suspend, true)
  22:          .Subscribe(n => count++);
  24:      // ACT
  25:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100).Ticks);
  26:      suspend.OnNext(false);
  27:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1010).Ticks);
  29:      // ASSERT
  30:      Assert.That(count, Is.EqualTo(1));
  31:  }
  33:  [Test]
  34:  public void should_recieve_values_after_multiple_resuming()
  35:  {
  36:      // ARRANGE
  37:      var count = 0;
  38:      var suspend = new Subject<bool>();
  40:      _generator.Suspendable(suspend, true)
  41:          .Subscribe(n => count++);
  43:      // ACT
  44:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
  45:      suspend.OnNext(false);
  46:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
  47:      suspend.OnNext(true);
  48:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
  49:      suspend.OnNext(false);
  50:      _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
  52:      // ASSERT
  53:      Assert.That(count, Is.EqualTo(20));
  54:  }

So why is this implementation of Suspendable<T> not an ideal solution even though all the tests pass?

Subjects are mutable by design and from a functional programming perspective the idea of being able to mutate state is not something you want to do. Obviously you can use subjects, one scenario is where the subject is the origin of an Rx stream, e.g. in a service publishing instances asynchronously via the OnNext method. This isn't applicable in an Rx extension method. Ideally one should be using other Rx operators & extension methods.

A quick Google search for 'avoid Subject Rx' leads me to a post on the Rx forum by the Eric Meijer giving his view on the topic.

So how do I make this a more functional implementation?

The easiest way to avoid using a Subject<T> is to use a Observable.Create<T> to return the IObservable<T> instance, this is what I came up with:

   1:  public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                   IObservable<bool> suspend,
                                                   bool initialState = false)
   2:  {
   3:      return Observable.Create<T>(o =>
   4:      {
   5:          var disposable = suspend.StartWith(initialState)
   6:                  .DistinctUntilChanged()
   7:                  .Select(s => s ? Observable.Empty<T>() : stream)
   8:                  .Switch()
   9:                  .Subscribe(o);
  11:          return disposable;
  12:      });
  13:  }

This introduced me to two new Rx operators - StartWith & Switch, StartWith should be obvious in what it does but Switch is a little more subtle, it only produces values from the latest observable stream. In the code above the Select operator returns the original observable stream or an empty observable stream depending on the suspend observable stream most recent value.

Now this implementation has an issue and because I'm a good programmer and already had unit tests it was picked up straight away :)
As you can see from the output the test 'should_complete_when_stream_completes' fails - hopefully the test name is obvious, but if not basically I'm expecting the OnComplete action to be called when the stream completes:
For some reason the completed parameter is never set to true and therefore the assert fails as shown by the red x shown at the side. The stream is setup as follows, you can see it completes when 100 is reached:

   1:  _generatorCount = 100;
   2:  _testScheduler = new TestScheduler();
   4:  _generator = Observable.Generate(1,
   5:      x => x <= _generatorCount,
   6:      x => x + 1,
   7:      x => x,
   8:      x => TimeSpan.FromSeconds(1), _testScheduler);

What confuses me about this is the fact I have another test in which the generator throws an exception when the value of 42 is reached and this test passes:
So why isn't the OnComplete action ever called if the OnError action can be called?

I'm sure there's something missing  wrong with the implementation of my Suspend<T> extension method, I'll continue to investigate...


Popular posts from this blog

WPF tips & tricks: Dispatcher thread performance

Not blogged for an age, and I received an email last week which provoked me back to life. It was a job spec for a WPF contract where they want help sorting out the performance of their app especially around grids and tabular data. I thought I'd shared some tips & tricks I've picked up along the way, these aren't probably going to solve any issues you might be having directly, but they might point you in the right direction when trying to find and resolve performance issues with a WPF app. First off, performance is something you shouldn't try and improve without evidence, and this means having evidence proving you've improved the performance - before & after metrics for example. Without this you're basically pissing into the wind, which can be fun from a developer point of view but bad for a project :) So, what do I mean by ' Dispatcher thread performance '? The 'dispatcher thread' or the 'UI thread' is probably the most

Showing a message box from a ViewModel in MVVM

I was doing a code review with a client last week for a WPF app using MVVM and they asked ' How can I show a message from the ViewModel? '. What follows is how I would (and have) solved the problem in the past. When I hear the words ' show a message... ' I instantly think you mean show a transient modal message box that requires the user input before continuing ' with something else ' - once the user has interacted with the message box it will disappear. The following solution only applies to this scenario. The first solution is the easiest but is very wrong from a separation perspective. It violates the ideas behind the Model-View-Controller pattern because it places View concerns inside the ViewModel - the ViewModel now knows about the type of the View and specifically it knows how to show a message box window: The second approach addresses this concern by introducing the idea of messaging\events between the ViewModel and the View. In the example below

Implementing a busy indicator using a visual overlay in MVVM

This is a technique we use at work to lock the UI whilst some long running process is happening - preventing the user clicking on stuff whilst it's retrieving or rendering data. Now we could have done this by launching a child dialog window but that feels rather out of date and clumsy, we wanted a more modern pattern similar to the way <div> overlays are done on the web. Imagine we have the following simple WPF app and when 'Click' is pressed a busy waiting overlay is shown for the duration entered into the text box. What I'm interested in here is not the actual UI element of the busy indicator but how I go about getting this to show & hide from when using MVVM. The actual UI elements are the standard Busy Indicator coming from the WPF Toolkit : The XAML behind this window is very simple, the important part is the ViewHost. As you can see the ViewHost uses a ContentPresenter element which is bound to the view model, IMainViewModel, it contains 3 child v