I realised this week I'm not being as functional when creating an Rx extension method as I could be.
This came out of a discussion I was having with @leeoades about a Pausable<T> extension we thought we needed at work. Lee had a solution and I thought I'd try to create one without looking for the answer on the t'internet.
    public static IObservable<T> Pausable<T>(this IObservable<T> stream,
                                             IObservable<bool> paused,
                                             bool initialState = false)
    {
        ...
    }
Hopefully the idea of the method is obvious - have the ability to pause & resume the publishing of instances to the stream.
Before implementing Pausable<T> I thought I'd implement an extension that didn't remember state whilst paused - Suspendable<T>. As you can see, it has the same signature:
    public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                IObservable<bool> suspend,
                                                bool initialState = false)
    {
        ...
    }
So my first attempt looked like this:
    public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                IObservable<bool> suspend,
                                                bool initialState = false)
    {
        var suspended = new ReplaySubject<bool>(1);
        suspended.OnNext(initialState);

        suspend.Subscribe(suspended.OnNext);
        return stream.Where(t => !suspended.Take(1).Wait());
    }
And to make sure this does exactly what's expected, I wrote a set of tests covering all the edge cases.
Each of these tests follows a common pattern with a defined generator publishing numbers to the Rx stream, 1 - 100. Shown below is the setup and a couple of the tests, one demonstrating a single resume and the other a multiple resume scenario:
    [SetUp]
    public void SetUp()
    {
        _generatorCount = 100;
        _testScheduler = new TestScheduler();

        _generator = Observable.Generate(1,
            x => x <= _generatorCount,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(1),
            _testScheduler);
    }

    [Test]
    public void should_recieve_values_after_single_resuming()
    {
        // ARRANGE
        var count = 0;
        var suspend = new Subject<bool>();

        _generator.Suspendable(suspend, true)
            .Subscribe(n => count++);

        // ACT
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1100).Ticks);
        suspend.OnNext(false);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(1010).Ticks);

        // ASSERT
        Assert.That(count, Is.EqualTo(1));
    }

    [Test]
    public void should_recieve_values_after_multiple_resuming()
    {
        // ARRANGE
        var count = 0;
        var suspend = new Subject<bool>();

        _generator.Suspendable(suspend, true)
            .Subscribe(n => count++);

        // ACT
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
        suspend.OnNext(false);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
        suspend.OnNext(true);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);
        suspend.OnNext(false);
        _testScheduler.AdvanceBy(TimeSpan.FromMilliseconds(10010).Ticks);

        // ASSERT
        Assert.That(count, Is.EqualTo(20));
    }
So why is this implementation of Suspendable<T> not an ideal solution even though all the tests pass?
Subjects are mutable by design, and from a functional programming perspective mutating state is something you want to avoid. Subjects do have legitimate uses; one scenario is where the subject is the origin of an Rx stream, e.g. in a service publishing instances asynchronously via the OnNext method. That isn't applicable in an Rx extension method, where ideally one should be composing other Rx operators & extension methods.
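To make the "subject as the origin of a stream" scenario concrete, here is a minimal sketch. The names PriceService, Prices and Publish are made up for illustration and don't come from any real code base; the only Rx calls used are Subject<T>, AsObservable and Subscribe.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical service: the Subject is where the stream starts, and it stays private.
public sealed class PriceService : IDisposable
{
    private readonly Subject<decimal> _prices = new Subject<decimal>();

    // Callers only see IObservable<decimal>, so they can't call OnNext themselves.
    public IObservable<decimal> Prices => _prices.AsObservable();

    // Called by whatever produces the data, e.g. a callback from a socket.
    public void Publish(decimal price) => _prices.OnNext(price);

    public void Dispose()
    {
        // Tell subscribers the stream has ended before releasing the subject.
        _prices.OnCompleted();
        _prices.Dispose();
    }
}
```

Here the mutation is confined to one class that genuinely owns the data source, which is quite different from creating a subject inside an operator just to hold a flag.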
A quick Google search for 'avoid Subject Rx' leads me to a post on the Rx forum by Erik Meijer giving his view on the topic.
So how do I make this a more functional implementation?
The easiest way to avoid using a Subject<T> is to use Observable.Create<T> to return the IObservable<T> instance. This is what I came up with:
    public static IObservable<T> Suspendable<T>(this IObservable<T> stream,
                                                IObservable<bool> suspend,
                                                bool initialState = false)
    {
        return Observable.Create<T>(o =>
        {
            var disposable = suspend.StartWith(initialState)
                .DistinctUntilChanged()
                .Select(s => s ? Observable.Empty<T>() : stream)
                .Switch()
                .Subscribe(o);

            return disposable;
        });
    }
This introduced me to two new Rx operators - StartWith & Switch. StartWith should be obvious in what it does, but Switch is a little more subtle: it only produces values from the latest observable stream. In the code above the Select operator returns the original observable stream or an empty observable stream depending on the most recent value of the suspend observable stream.
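A small sketch of StartWith and Switch in isolation may help. It uses hand-driven subjects instead of the test generator, and the names SwitchDemo, toggle, letters and digits are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchDemo
{
    public static List<string> Run()
    {
        var output = new List<string>();
        var toggle = new Subject<bool>();
        var letters = new Subject<string>();
        var digits = new Subject<string>();

        // StartWith(true) supplies the initial choice before toggle has produced anything.
        // Switch subscribes to the most recently selected stream and drops the previous one.
        using (toggle.StartWith(true)
                     .Select(useLetters => useLetters ? (IObservable<string>)letters : digits)
                     .Switch()
                     .Subscribe(output.Add))
        {
            letters.OnNext("a");   // forwarded: letters is the current stream
            toggle.OnNext(false);  // Switch moves over to digits
            letters.OnNext("b");   // dropped: letters is no longer the current stream
            digits.OnNext("1");    // forwarded
        }

        return output;             // contains "a" and "1"
    }
}
```

The shape is the same as the Suspendable<T> implementation above, with the empty stream swapped for a second subject so the switch is visible.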
Now this implementation has an issue and because I'm a good programmer and already had unit tests it was picked up straight away :)
The test 'should_complete_when_stream_completes' fails - hopefully the test name is obvious, but if not, basically I'm expecting the OnCompleted action to be called when the stream completes.
For some reason the completed parameter is never set to true and therefore the assert fails. The stream is set up as follows; you can see it completes when 100 is reached:
    _generatorCount = 100;
    _testScheduler = new TestScheduler();

    _generator = Observable.Generate(1,
        x => x <= _generatorCount,
        x => x + 1,
        x => x,
        x => TimeSpan.FromSeconds(1),
        _testScheduler);
What confuses me about this is the fact I have another test in which the generator throws an exception when the value of 42 is reached, and this test passes.
So why isn't the OnCompleted action ever called if the OnError action can be called?
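One lead worth checking, offered as a sketch rather than a diagnosis: Switch only signals OnCompleted once both the outer sequence and the current inner sequence have completed, whereas an error from the inner sequence is forwarded straight away. In the tests the outer sequence comes from a Subject<bool> that never completes. The snippet below uses plain subjects in place of the test fixture, and the names SwitchCompletionDemo and Observe are made up for the example.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchCompletionDemo
{
    // Reports which terminal notification the subscriber saw.
    public static (bool Completed, bool Errored) Observe(bool innerFails, bool completeOuter)
    {
        var outer = new Subject<bool>();   // stands in for the suspend stream
        var inner = new Subject<int>();    // stands in for the source stream
        var completed = false;
        var errored = false;

        using (outer.StartWith(false)
                    .Select(s => s ? Observable.Empty<int>() : inner)
                    .Switch()
                    .Subscribe(_ => { }, ex => errored = true, () => completed = true))
        {
            inner.OnNext(1);

            if (innerFails)
                inner.OnError(new InvalidOperationException("boom"));
            else
                inner.OnCompleted();

            // Only when the outer sequence also completes can Switch complete.
            if (completeOuter)
                outer.OnCompleted();
        }

        return (completed, errored);
    }
}
```

With these assumptions, Observe(false, false) reports neither completion nor error, Observe(false, true) reports completion, and Observe(true, false) reports the error.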
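I'm sure there's something wrong with the implementation of my Suspendable<T> extension method, I'll continue to investigate...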