Introduction to Rx
Kindle edition (2012)
Practical Rx Training
London 7-8th July 2014
Presented by the author of IntroToRx.com
Testing software has its roots in debugging and demonstrating code. Having largely matured past manual tests that try to "break the application", modern quality assurance standards demand a level of automation that can help evaluate and prevent bugs. While teams of testing specialists are common, more and more coders are expected to provide quality guarantees via automated test suites.
Up to this point, we have covered a broad scope of Rx, and we have almost enough knowledge to start using Rx in anger! Still, many developers would not dream of coding without first being able to write tests. Tests can be used to prove that code is in fact satisfying requirements, provide a safety net against regression and can even help document the code. This chapter makes the assumption that you are familiar with the concepts of dependency injection and unit testing with test-doubles, such as mocks or stubs.
Rx poses some interesting problems to our Test-Driven community:
- Scheduling, and therefore threading, is generally avoided in test scenarios as it can introduce race conditions which may lead to non-deterministic tests
- Tests should run as fast as possible
- For many, Rx is a new technology/library. Naturally, as we progress on our journey to mastering Rx, we may want to refactor some of our previous Rx code. We want to use tests to ensure that our refactoring has not altered the internal behavior of our code base
- Likewise, tests will ensure nothing breaks when we upgrade versions of Rx.
While we do want to test our code, we don't want to introduce slow or non-deterministic tests; indeed, the later would introduce false-negatives or false-positives. If we look at the Rx library, there are plenty of methods that involve scheduling (implicitly or explicitly), so using Rx effectively makes it hard to avoid scheduling. This LINQ query shows us that there are at least 26 extension methods that accept an IScheduler as a parameter.
Many of these methods also have an overload that does not take an IScheduler and instead uses a default instance. TDD/Test First coders will want to opt for the overload that accepts the IScheduler, so that they can have some control over scheduling in our tests. I will explain why soon.
Consider this example, where we create a sequence that publishes values every second for five seconds.
If we were to write a test that ensured that we received five values and they were each one second apart, it would take five seconds to run. That would be no good; I want hundreds if not thousands of tests to run in five seconds. Another very common requirement is to test a timeout. Here, we try to test a timeout of one minute.
We have two problems here:
- either the Assert runs too soon, and the test is pointless as it always fails, or
- we have to add a delay of one minute to perform an accurate test
For this test to be useful, it would therefore take one minute to run. Unit tests that take one minute to run are not acceptable.
To our rescue comes the TestScheduler; it introduces the concept of a virtual scheduler to allow us to emulate and control time.
A virtual scheduler can be conceptualized as a queue of actions to be executed. Each are assigned a point in time when they should be executed. We use the TestScheduler as a substitute, or test double, for the production IScheduler types. Using this virtual scheduler, we can either execute all queued actions, or only those up to a specified point in time.
In this example, we schedule a task onto the queue to be run immediately by using the simple overload (Schedule(Action)). We then advance the virtual clock forward by one tick. By doing so, we execute everything scheduled up to that point in time. Note that even though we schedule an action to be executed immediately, it will not actually be executed until the clock is manually advanced.
Running and debugging this example may help you to better understand the basics of the TestScheduler.
The TestScheduler implements the IScheduler interface (naturally) and also extends it to allow us to control and monitor virtual time. We are already familiar with the IScheduler.Schedule methods, however the AdvanceBy(long), AdvanceTo(long) and Start() methods unique to the TestScheduler are of most interest. Likewise, the Clock property will also be of interest, as it can help us understand what is happening internally.
The AdvanceTo(long) method will execute all the actions that have been scheduled up to the absolute time specified. The TestScheduler uses ticks as its measurement of time. In this example, we schedule actions to be invoked now, in 10 ticks, and in 20 ticks.
Note that nothing happened when we advanced to 15 ticks. All work scheduled before 15 ticks had been performed and we had not advanced far enough yet to get to the next scheduled action.
The AdvanceBy(long) method allows us to move the clock forward a relative amount of time. Again, the measurements are in ticks. We can take the last example and modify it to use AdvanceBy(long).
The TestScheduler's Start() method is an effective way to execute everything that has been scheduled. We take the same example again and swap out the AdvanceBy(long) calls for a single Start() call.
Note that once all of the scheduled actions have been executed, the virtual clock matches our last scheduled item (20 ticks).
We further extend our example by scheduling a new action to happen after Start() has already been called.
Note that the output is exactly the same; If we want our fourth action to be executed, we will have to call Start() again.
In previous releases of Rx, the Start() method was called Run(). Now there is a Stop() method whose name seems to imply some symmetry with Start(). All it does however, is set the IsEnabled property to false. This property is used as an internal flag to check whether the internal queue of actions should continue being executed. The processing of the queue may indeed be instigated by Start(), however AdvanceTo or AdvanceBy can be used too.
In this example, we show how you could use Stop() to pause processing of scheduled actions.
Note that "C" never gets printed as we stop the clock at 15 ticks. I have been testing Rx successfully for nearly two years now, yet I have not found the need to use the Stop() method. I imagine that there are cases that warrant its use; however I just wanted to make the point that you do not have to be concerned about the lack of use of it in your tests.
When scheduling actions, it is possible and even likely that many actions will be scheduled for the same point in time. This most commonly would occur when scheduling multiple actions for now. It could also happen that there are multiple actions scheduled for the same point in the future. The TestScheduler has a simple way to deal with this. When actions are scheduled, they are marked with the clock time they are scheduled for. If multiple items are scheduled for the same point in time, they are queued in order that they were scheduled; when the clock advances, all items for that point in time are executed in the order that they were scheduled.
Note that the virtual clock is at 10 ticks, the time we advanced to.
Testing Rx code
Now that we have learnt a little bit about the TestScheduler, let's look at how we could use it to test our two initial code snippets that use Interval and Timeout. We want to execute tests as fast as possible but still maintain the semantics of time. In this example we generate our five values one second apart but pass in our TestScheduler to the Interval method to use instead of the default scheduler.
While this is mildly interesting, what I think is more important is how we would test a real piece of code. Imagine, if you will, a ViewModel that subscribes to a stream of prices. As prices are published, it adds them to a collection. Assuming this is a WPF or Silverlight implementation, we take the liberty of enforcing that the subscription be done on the ThreadPool and the observing is executed on the Dispatcher.
Injecting scheduler dependencies
While the snippet of code above may do what we want it to, it will be hard to test as it is accessing the schedulers via static properties. To help my testing, I have created my own interface that exposes the same IScheduler implementations that the Scheduler type does, i suggest you adopt this interface too.
TaskPool property should be included or not depends on
your target platform. If you adopt this concept, feel free to name this type in
accordance with your naming conventions e.g.
The default implementation that we would run in production is implemented as follows:
This now allows me to substitute implementations of ISchedulerProvider to help with testing. I could mock the ISchedulerProvider, but I find it easier to provide a test implementation. My implementation for testing is as follows.
Note that ISchedulerProvider is implemented explicitly. This means that, in our tests, we can access the TestScheduler instances directly, but our system under test (SUT) still just sees the interface implementation. I can now write some tests for my ViewModel. Below, we test a modified version of the MyViewModel class that takes an ISchedulerProvider and uses that instead of the static schedulers from the Scheduler class. We also use the popular Moq framework in order to mock out our model.
These two tests ensure five things:
- That the Price property has prices added to it as the model produces them
- That the sequence is subscribed to on the ThreadPool
- That the Price property is updated on the Dispatcher i.e. the sequence is observed on the Dispatcher
- That a timeout of 10 seconds between prices will set the ViewModel to disconnected.
- The tests run fast. While the time to run the tests is not that impressive, most of that time seems to be spent warming up my test harness. Moreover, increasing the test count to 10 only adds 0.03seconds. In general, on a modern CPU, I expect to see unit tests run at a rate of +1000 tests per second
Usually, I would not have more than one assert/verify per test, but here it does
help illustrate a point. In the first test, we can see that only once both the
ThreadPool and the
Dispatcher schedulers have been run will
we get a result. In the second test, it helps to verify that the timeout is not
less than 10 seconds.
In some scenarios, you are not interested in the scheduler and you want to be focusing your tests on other functionality. If this is the case, then you may want to create another test implementation of the ISchedulerProvider that returns the ImmediateScheduler for all of its members. That can help reduce the noise in your tests.
Advanced features - ITestableObserver
The TestScheduler provides further advanced features. I find that I am able to get by quite well without these methods, but others may find them useful. Perhaps this is because I have found myself accustomed to testing without them from using earlier versions of Rx.
There are three overloads to Start, which are used to start an observable sequence at a given time, record the notifications it makes and dispose of the subscription at a given time. This can be confusing at first, as the parameterless overload of Start is quite unrelated. These three overloads return an ITestableObserver<T> which allows you to record the notifications from an observable sequence, much like the Materialize method we saw in the Transformation chapter.
While there are three overloads, we will look at the most specific one first. This overload takes four parameters:
- an observable sequence factory delegate
- the point in time to invoke the factory
- the point in time to subscribe to the observable sequence returned from the factory
- the point in time to dispose of the subscription
The time for the last three parameters is measured in ticks, as per the rest of the TestScheduler members.
We could use this method to test the Observable.Interval factory method. Here, we create an observable sequence that spawns a value every second for 4 seconds. We use the TestScheduler.Start method to create and subscribe to it immediately (by passing 0 for the second and third parameters). We dispose of our subscription after 5 seconds. Once the Start method has run, we output what we have recorded.
Note that the ITestObserver<T> records
OnCompleted notifications. If the sequence was to terminate in error,
the ITestObserver<T> would record the
We can play with the input variables to see the impact it makes. We know that the Observable.Interval method is a Cold Observable, so the virtual time of the creation is not relevant. Changing the virtual time of the subscription can change our results. If we change it to 2 seconds, we will notice that if we leave the disposal time at 5 seconds, we will miss some messages.
We start the subscription at 2 seconds; the Interval produces values after
each second (i.e. second 3 and 4), and we dispose on second 5. So we miss the other
OnNext messages as well as the
There are two other overloads to this TestScheduler.Start method.
As you can see, these overloads just call through to the variant we have been looking at, but passing some default values. I am not sure why these default values are special; I can not imagine why you would want to use these two methods, unless your specific use case matched that specific configuration exactly.
Just as we can record an observable sequence, we can also use CreateColdObservable
to playback a set of Recorded<Notification<int>>. The signature
for CreateColdObservable simply takes a
params array of recorded
The CreateColdObservable returns an ITestableObservable<T>. This interface extends IObservable<T> by exposing the list of "subscriptions" and the list of messages it will produce.
Using CreateColdObservable, we can emulate the Observable.Interval test we had earlier.
Note that our output is exactly the same as the previous example with Observable.Interval.
We can also create hot test observable sequences using the CreateHotObservable method. It has the same parameters and return value as CreateColdObservable; the difference is that the virtual time specified for each message is now relative to when the observable was created, not when it is subscribed to as per the CreateColdObservable method.
This example is just that last "cold" sample, but creating a Hot observable instead.
Note that the output is almost the same. Scheduling of the creation and subscription do not affect the Hot Observable, therefore the notifications happen 1 tick earlier than their Cold counterparts.
We can see the major difference a Hot Observable bears by changing the virtual create time and virtual subscribe time to be different values. With a Cold Observable, the virtual create time has no real impact, as subscription is what initiates any action. This means we can not miss any early message on a Cold Observable. For Hot Observables, we can miss messages if we subscribe too late. Here, we create the Hot Observable immediately, but only subscribe to it after 1 second (thus missing the first message).
Finally, if you do not want to use the TestScheduler.Start methods, and you need more fine-grained control over your observer, you can use TestScheduler.CreateObserver(). This will return an ITestObserver that you can use to manage the subscriptions to your observable sequences with. Furthermore, you will still be exposed to the recorded messages and any subscribers.
Current industry standards demand broad coverage of automated unit tests to meet quality assurance standards. Concurrent programming, however, is often a difficult area to test well. Rx delivers a well-designed implementation of testing features, allowing deterministic and high-throughput testing. The TestScheduler provides methods to control virtual time and produce observable sequences for testing. This ability to easily and reliably test concurrent systems sets Rx apart from many other libraries.
Additional recommended reading
|<< Back to : PART 4 - Concurrency||Moving on to : Sequences of coincidence>>|