Trying to be more functional with Rx

I realised this week I'm not being as functional as I could be when creating an Rx extension method.

This came out of a discussion I was having with @leeoades about a Pausable<T> extension we thought we needed at work. Lee had a solution and I thought I'd try to create one without looking for the answer on the t'internet.

    public static IObservable<T> Pausable<T>(this IObservable<T> stream,
                                             IObservable<bool> paused,
                                             bool initialState = false)
    {
        ...
    }

Hopefully the idea of the method is obvious - have the ability to pause & resume the publishing of instances to the stream.

Before implementing Pausable<T> I thought I'd implement an extension that doesn't remember state whilst paused - Suspendable<T>. As you can see, it has the same signature.

    public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                IObservable<bool> suspend,
                                                bool initialState = false)
    {
        ...
    }

So my first attempt looked like this:

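    public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                IObservable<bool> suspend,
                                                bool initialState = false)
    {
        // Remember the most recent suspend value, seeded with the initial state.
        var suspended = new ReplaySubject<bool>(1);
        suspended.OnNext(initialState);

        // Note: this subscription is never disposed.
        suspend.Subscribe(suspended.OnNext);

        // Block on the replayed value and let items through only when not suspended.
        return stream.Where(t => !suspended.Take(1).Wait());
    }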

And to make sure this does exactly what's expected I wrote a set of tests covering all the edge cases.
Each of these tests follows a common pattern, with a defined generator publishing the numbers 1 to 100 to the Rx stream, one per second. Shown below is the setup and a couple of the tests, one demonstrating a single resume and the other a multiple resume scenario:

    // Fields shared by the tests (declarations added for completeness).
    private int _generatorCount;
    private TestScheduler _testScheduler;
    private IObservable<int> _generator;

    [SetUp]
    public void SetUp()
    {
        _generatorCount = 100;
        _testScheduler = new TestScheduler();

        // Publishes 1..100, one value per second of virtual time.
        _generator = Observable.Generate(1,
            x => x <= _generatorCount,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(1), _testScheduler);
    }

    [Test]
    public void should_recieve_values_after_single_resuming()
    {
        // ARRANGE
        var count = 0;
        var suspend = new Subject<bool>();

        _generator.Suspendable(suspend, true)
            .Subscribe(n => count++);

        // ACT
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100).Ticks);
        suspend.OnNext(false);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1010).Ticks);

        // ASSERT
        Assert.That(count, Is.EqualTo(1));
    }

    [Test]
    public void should_recieve_values_after_multiple_resuming()
    {
        // ARRANGE
        var count = 0;
        var suspend = new Subject<bool>();

        _generator.Suspendable(suspend, true)
            .Subscribe(n => count++);

        // ACT
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
        suspend.OnNext(false);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
        suspend.OnNext(true);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
        suspend.OnNext(false);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);

        // ASSERT
        Assert.That(count, Is.EqualTo(20));
    }

So why is this implementation of Suspendable<T> not an ideal solution even though all the tests pass?

Subjects are mutable by design, and from a functional programming perspective mutable state is something you want to avoid. There are legitimate uses for subjects, for example where the subject is the origin of an Rx stream, such as a service publishing instances asynchronously via the OnNext method. That doesn't apply to an Rx extension method, which should ideally be composed from other Rx operators & extension methods.

A quick Google search for 'avoid Subject Rx' leads me to a post on the Rx forum by Erik Meijer giving his view on the topic.

So how do I make this a more functional implementation?

The easiest way to avoid using a Subject<T> is to use Observable.Create<T> to return the IObservable<T> instance. This is what I came up with:

    public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                IObservable<bool> suspend,
                                                bool initialState = false)
    {
        return Observable.Create<T>(o =>
        {
            // Map each suspend value to either an empty stream (suspended)
            // or the source stream (running), and follow the latest one.
            var disposable = suspend.StartWith(initialState)
                    .DistinctUntilChanged()
                    .Select(s => s ? Observable.Empty<T>() : stream)
                    .Switch()
                    .Subscribe(o);

            return disposable;
        });
    }

This introduced me to two new Rx operators - StartWith & Switch. StartWith should be obvious in what it does: it publishes the given value before anything from the underlying stream. Switch is a little more subtle: it takes a stream of streams and only produces values from the latest inner stream, unsubscribing from the previous one each time a new one arrives. In the code above the Select operator returns either the original observable stream or an empty observable stream, depending on the most recent value from the suspend stream.
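To make the StartWith/Select/Switch combination a bit more concrete, here is a minimal sketch. The class name SwitchDemo and the hot Subject<int> standing in for the source stream are assumptions made purely for illustration; they are not part of the extension method above, and the sketch assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class, not part of the Suspendable<T> extension.
public static class SwitchDemo
{
    // Returns the values that made it through the StartWith/Select/Switch gate.
    public static List<int> Run()
    {
        var source = new Subject<int>();    // hot stand-in for 'stream'
        var suspend = new Subject<bool>();  // true means suspended
        var received = new List<int>();

        using (suspend.StartWith(false)
                      .Select(s => s ? Observable.Empty<int>() : source)
                      .Switch()
                      .Subscribe(n => received.Add(n)))
        {
            source.OnNext(1);       // running, so 1 is delivered
            suspend.OnNext(true);   // Switch moves to the empty stream
            source.OnNext(2);       // suspended, so 2 is dropped
            suspend.OnNext(false);  // Switch subscribes to source again
            source.OnNext(3);       // running again, so 3 is delivered
        }

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // prints 1,3
    }
}
```

A Subject is used here only to drive the demo by hand; with a cold source such as Observable.Generate, every resume would be a fresh subscription to the source.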

Now this implementation has an issue and because I'm a good programmer and already had unit tests it was picked up straight away :)
The test 'should_complete_when_stream_completes' fails. Hopefully the test name is obvious, but if not, I'm expecting the OnCompleted action to be called when the stream completes.
For some reason the completed flag is never set to true and therefore the assert fails. The stream is set up as follows; you can see it completes once 100 has been published:

    _generatorCount = 100;
    _testScheduler = new TestScheduler();

    _generator = Observable.Generate(1,
        x => x <= _generatorCount,
        x => x + 1,
        x => x,
        x => TimeSpan.FromSeconds(1), _testScheduler);

What confuses me about this is that I have another test, in which the generator throws an exception when the value 42 is reached, and that test passes.
So why isn't the OnCompleted action ever called if the OnError action can be called?

I'm sure there's something missing or wrong with the implementation of my Suspendable<T> extension method, I'll continue to investigate...
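One likely explanation, offered as an assumption rather than something verified against the tests in this post: Switch only signals completion once the outer stream (here the suspend stream) has completed and the most recent inner stream has completed as well, whereas an error from the inner stream is passed on straight away. If the suspend subject in the test never completes, OnCompleted would never fire even though OnError can. The sketch below uses hypothetical names (SwitchCompletionDemo, outer, inner) to show that behaviour in isolation, again assuming the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical demo class illustrating when Switch completes.
public static class SwitchCompletionDemo
{
    // Reports whether OnCompleted had fired after the inner stream
    // completed, and then after the outer stream completed too.
    public static (bool afterInner, bool afterOuter) Run()
    {
        var outer = new Subject<IObservable<int>>(); // plays the role of the mapped 'suspend' stream
        var inner = new Subject<int>();              // plays the role of 'stream'
        var completed = false;

        outer.Switch().Subscribe(_ => { }, () => completed = true);

        outer.OnNext(inner);
        inner.OnCompleted();         // the inner stream finishes...
        var afterInner = completed;  // ...but Switch is still waiting on outer

        outer.OnCompleted();         // only now can Switch complete
        var afterOuter = completed;

        return (afterInner, afterOuter);
    }

    public static void Main()
    {
        var (afterInner, afterOuter) = Run();
        Console.WriteLine($"{afterInner} {afterOuter}"); // prints False True
    }
}
```

If this is what is happening in the failing test, the implementation would need some way of ending the suspend side once the source stream has finished. That is left as an open design question here.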
