
Introduction to Rx
Kindle edition (2012)
Key types
There are two key types to understand when working with Rx, and a subset of auxiliary types that will help you to learn Rx more effectively. The IObserver<T> and IObservable<T> form the fundamental building blocks for Rx, while implementations of ISubject<TSource, TResult> reduce the learning curve for developers new to Rx.
Many are familiar with LINQ and its many popular forms like LINQ to Objects, LINQ to SQL & LINQ to XML. Each of these common implementations allows you to query data at rest; Rx offers the ability to query data in motion. Essentially Rx is built upon the foundations of the Observer pattern. .NET already exposes some other ways to implement the Observer pattern such as multicast delegates or events (which are usually multicast delegates). Multicast delegates are not ideal, however, as they exhibit the following less desirable features:
- In C#, events have a curious interface. Some find the += and -= operators an unnatural way to register a callback
- Events are difficult to compose
- Events don't offer the ability to be easily queried over time
- Events are a common cause of accidental memory leaks
- Events do not have a standard pattern for signaling completion
- Events provide almost no help for concurrency or multithreaded applications, e.g. to raise an event on a separate thread you have to do all of the plumbing yourself
Rx looks to solve these problems. Here I will introduce you to the building blocks and some basic types that make up Rx.
IObservable<T>
IObservable<T> is one of the two new core interfaces for working with Rx. It is a simple interface with just a Subscribe method. Microsoft is so confident that this interface will be of use to you it has been included in the BCL as of version 4.0 of .NET. You should be able to think of anything that implements IObservable<T> as a streaming sequence of T objects. So if a method returned an IObservable<Price> I could think of it as a stream of Prices.
.NET already has the concept of Streams with the type and sub types of System.IO.Stream. The System.IO.Stream implementations are commonly used to stream data (generally bytes) to or from an I/O device like a file, network or block of memory. System.IO.Stream implementations can have both the ability to read and write, and sometimes the ability to seek (i.e. fast forward through a stream or move backwards). When I refer to an instance of IObservable<T> as a stream, it does not exhibit the seek or write functionality that streams do. This is a fundamental difference preventing Rx being built on top of the System.IO.Stream paradigm. Rx does however have the concept of forward streaming (push), disposing (closing) and completing (eof). Rx also extends the metaphor by introducing concurrency constructs, and query operations like transformation, merging, aggregating and expanding. These features are also not an appropriate fit for the existing System.IO.Stream types.
Some others refer to instances of IObservable<T> as Observable Collections, which I find hard to understand. While the observable part makes sense to me, I do not find them like collections at all. You generally cannot sort, insert or remove items from an IObservable<T> instance like I would expect you can with a collection. Collections generally have some sort of backing store like an internal array. The values from an IObservable<T> source are not usually pre-materialized as you would expect from a normal collection. There is also a type in WPF/Silverlight called an ObservableCollection<T> that does exhibit collection-like behavior, and is very well suited to this description. In fact IObservable<T> integrates very well with ObservableCollection<T> instances.
So to save on any confusion we will refer to instances of IObservable<T> as sequences. While instances of IEnumerable<T> are also sequences, we will adopt the convention that they are sequences of data at rest, and IObservable<T> instances are sequences of data in motion.
IObserver<T>
IObserver<T> is the other one of the two core interfaces for working with Rx. It too has made it into the BCL as of .NET 4.0. Don't worry if you are not on .NET 4.0 yet as the Rx team have included these two interfaces in a separate assembly for .NET 3.5 and Silverlight users. IObservable<T> is meant to be the "functional dual of IEnumerable<T>". If you want to know what that last statement means, then enjoy the hours of videos on Channel9 where they discuss the mathematical purity of the types. For everyone else it means that where an IEnumerable<T> can effectively yield three things (the next value, an exception or the end of the sequence), so too can IObservable<T> via IObserver<T>'s three methods OnNext(T), OnError(Exception) and OnCompleted().
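For reference, the two interfaces have roughly the following shape. This is a sketch that mirrors the BCL declarations inside a made-up namespace (KeyTypesSketch is my own name, chosen so the sketch does not clash with the real types in System).

```csharp
using System;

// Hypothetical namespace: keeps these sketches apart from the real
// System.IObservable<T> and System.IObserver<T>.
namespace KeyTypesSketch
{
    // The producer side: something you can subscribe to.
    public interface IObservable<out T>
    {
        // The returned IDisposable ends the subscription when disposed.
        IDisposable Subscribe(IObserver<T> observer);
    }

    // The consumer side: receives values, an error, or completion.
    public interface IObserver<in T>
    {
        void OnNext(T value);
        void OnError(Exception error);
        void OnCompleted();
    }
}
```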
Rx has an implicit contract that must be followed. An implementation of IObserver<T> may receive zero or more calls to OnNext(T), optionally followed by a single call to either OnError(Exception) or OnCompleted(). This protocol ensures that if a sequence terminates, it is always terminated by either an OnError(Exception) or an OnCompleted(). The protocol does not, however, demand that OnNext(T), OnError(Exception) or OnCompleted() ever be called. This enables the concept of empty and infinite sequences. We will look into this more later.
Interestingly, while you will be exposed to the IObservable<T> interface frequently if you work with Rx, in general you will not need to be concerned with IObserver<T>. This is due to Rx providing anonymous implementations via methods like Subscribe.
Implementing IObserver<T> and IObservable<T>
It is quite easy to implement each interface. If we wanted to create an observer that printed values to the console it would be as easy as this.
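A minimal sketch of such an observer, using only the BCL. The names MyConsoleObserver and ObserverDemo, and the message wording, are my own choices for illustration.

```csharp
using System;

// Writes every notification it receives to the console.
public class MyConsoleObserver<T> : IObserver<T>
{
    public void OnNext(T value) =>
        Console.WriteLine("Received value {0}", value);

    public void OnError(Exception error) =>
        Console.WriteLine("Sequence faulted with {0}", error);

    public void OnCompleted() =>
        Console.WriteLine("Sequence terminated");
}

public static class ObserverDemo
{
    public static void Main()
    {
        // Drive the observer by hand, since there is no sequence yet.
        var observer = new MyConsoleObserver<int>();
        observer.OnNext(1);      // prints "Received value 1"
        observer.OnCompleted();  // prints "Sequence terminated"
    }
}
```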
Implementing an observable sequence is a little bit harder. An overly simplified implementation that returned a sequence of numbers could look like this.
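One way this might be sketched with only the BCL is shown here. MySequenceOfNumbers, NoOpDisposable and PrintingObserver are hypothetical names, and the values 1, 2 and 3 are arbitrary.

```csharp
using System;

// Pushes three numbers to whoever subscribes, then completes.
public class MySequenceOfNumbers : IObservable<int>
{
    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Everything happens synchronously, inside Subscribe.
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnNext(3);
        observer.OnCompleted();
        return new NoOpDisposable();
    }

    // Nothing is left to cancel once Subscribe returns.
    private sealed class NoOpDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Small observer so the sketch can run on its own.
public sealed class PrintingObserver : IObserver<int>
{
    public void OnNext(int value) => Console.WriteLine("Received value {0}", value);
    public void OnError(Exception error) => Console.WriteLine("Sequence faulted with {0}", error);
    public void OnCompleted() => Console.WriteLine("Sequence terminated");
}

public static class ObservableDemo
{
    public static void Main()
    {
        var numbers = new MySequenceOfNumbers();
        numbers.Subscribe(new PrintingObserver());
        // Prints:
        // Received value 1
        // Received value 2
        // Received value 3
        // Sequence terminated
    }
}
```

Because Subscribe does not return until every value has been pushed, the caller is blocked for the whole sequence.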
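We can tie these two implementations together by passing an instance of the observer to the Subscribe method of the sequence. The observer then prints each value as it arrives, followed by a notice that the sequence has terminated.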
The problem we have here is that this is not really reactive at all. This implementation is blocking, so we may as well use an IEnumerable<T> implementation like a List<T> or an array.
This problem of implementing the interfaces should not concern us too much. You will find that when you use Rx, you do not actually need to implement these interfaces; Rx provides all of the implementations you need out of the box. Let's have a look at the simple ones.
Subject<T>
I like to think of the IObserver<T> and the IObservable<T> as the 'reader' and 'writer' or, 'consumer' and 'publisher' interfaces. If you were to create your own implementation of IObservable<T> you may find that while you want to publicly expose the IObservable<T> characteristics you still need to be able to publish items to the subscribers, throw errors and notify when the sequence is complete. Why, that sounds just like the methods defined in IObserver<T>! While it may seem odd to have one type implementing both interfaces, it does make life easy. This is what subjects can do for you. Subject<T> is the most basic of the subjects. Effectively you can expose your Subject<T> behind a method that returns IObservable<T> but internally you can use the OnNext, OnError and OnCompleted methods to control the sequence.
In this very basic example, I create a subject, subscribe to that subject and then publish values to the sequence (by calling subject.OnNext(T)).
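A sketch of how that could look, assuming the System.Reactive package is referenced. The class name SubjectDemo and the values "a", "b" and "c" are illustrative.

```csharp
using System;
using System.Reactive.Subjects;

public static class SubjectDemo
{
    public static void Main()
    {
        var subject = new Subject<string>();
        WriteSequenceToConsole(subject);

        // Each value is pushed to the existing subscription.
        subject.OnNext("a");
        subject.OnNext("b");
        subject.OnNext("c");
        // Prints a, b and c on separate lines.
    }

    // Only needs the read side, so it accepts IObservable<string>.
    private static void WriteSequenceToConsole(IObservable<string> sequence)
    {
        sequence.Subscribe(Console.WriteLine);
    }
}
```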
Note that the WriteSequenceToConsole method takes an IObservable<string> as it only wants access to the Subscribe method. Hang on, doesn't the Subscribe method need an IObserver<string> as an argument? Surely Console.WriteLine does not match that interface. Well it doesn't, but the Rx team supply me with an extension method to IObservable<T> that just takes an Action<T>. The action will be executed every time an item is published. There are other overloads to the Subscribe extension method that allow you to pass combinations of delegates to be invoked for OnNext, OnCompleted and OnError. This effectively means I don't need to implement IObserver<T>. Cool.
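As an illustration of those overloads, here is a sketch that passes all three delegates, again assuming System.Reactive. The class name and message prefixes are made up for this example.

```csharp
using System;
using System.Reactive.Subjects;

public static class SubscribeOverloadsDemo
{
    public static void Main()
    {
        var subject = new Subject<string>();

        // One delegate per notification kind; no IObserver<T> class needed.
        subject.Subscribe(
            value => Console.WriteLine("OnNext: {0}", value),
            error => Console.WriteLine("OnError: {0}", error.Message),
            () => Console.WriteLine("OnCompleted"));

        subject.OnNext("a");    // prints "OnNext: a"
        subject.OnCompleted();  // prints "OnCompleted"
    }
}
```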
As you can see, Subject<T> could be quite useful for getting started in Rx programming. Subject<T> however, is a basic implementation. There are three siblings to Subject<T> that offer subtly different implementations which can drastically change the way your program runs.
ReplaySubject<T>
ReplaySubject<T> provides the feature of caching values and then replaying them for any late subscriptions. Consider this example where we have moved our first publication to occur before our subscription
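The scenario might look like this with a plain Subject<T> (a sketch assuming System.Reactive; LateSubscriptionDemo is a hypothetical name).

```csharp
using System;
using System.Reactive.Subjects;

public static class LateSubscriptionDemo
{
    public static void Main()
    {
        var subject = new Subject<string>();

        subject.OnNext("a");                   // nobody is listening yet
        subject.Subscribe(Console.WriteLine);  // subscription arrives late
        subject.OnNext("b");
        subject.OnNext("c");
    }
}
```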
The result of this would be that 'b' and 'c' would be written to the console, but 'a' ignored. If we were to make the minor change to make subject a ReplaySubject<T> we would see all publications again.
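With that one-line change the sketch becomes the following (assuming System.Reactive; the class name is illustrative).

```csharp
using System;
using System.Reactive.Subjects;

public static class ReplayAllDemo
{
    public static void Main()
    {
        // Only the subject type differs from the Subject<T> version.
        var subject = new ReplaySubject<string>();

        subject.OnNext("a");                   // cached for later subscribers
        subject.Subscribe(Console.WriteLine);  // replays "a" immediately
        subject.OnNext("b");
        subject.OnNext("c");
        // Prints a, b and c on separate lines.
    }
}
```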
This can be very handy for eliminating race conditions. Be warned though, the default constructor of the ReplaySubject<T> will create an instance that caches every value published to it. In many scenarios this could create unnecessary memory pressure on the application. ReplaySubject<T> allows you to specify simple cache expiry settings that can alleviate this memory issue. One option is that you can specify the size of the buffer in the cache. In this example we create the ReplaySubject<T> with a buffer size of 2, and so only get the last two values published prior to our subscription:
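A sketch of the buffer-size option, assuming System.Reactive and its ReplaySubject<T>(int bufferSize) constructor. The class name is my own.

```csharp
using System;
using System.Reactive.Subjects;

public static class ReplayBufferDemo
{
    public static void Main()
    {
        var bufferSize = 2;
        var subject = new ReplaySubject<string>(bufferSize);

        subject.OnNext("a");  // pushed out of the cache once "c" arrives
        subject.OnNext("b");
        subject.OnNext("c");
        subject.Subscribe(Console.WriteLine);  // replays the cached values
        subject.OnNext("d");                   // delivered live
    }
}
```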
Here the output would show that the value 'a' had been dropped from the cache, but values 'b' and 'c' were still valid. The value 'd' was published after we subscribed so it is also written to the console.
Another option for preventing the endless caching of values by the ReplaySubject<T>, is to provide a window for the cache. In this example, instead of creating a ReplaySubject<T> with a buffer size, we specify a window of time that the cached values are valid for.
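A sketch of the time-window option, assuming System.Reactive and its ReplaySubject<T>(TimeSpan window) constructor. The values "w" to "z" are arbitrary, and because the result depends on real elapsed time, a heavily loaded machine could behave differently.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading;

public static class ReplayWindowDemo
{
    public static void Main()
    {
        var window = TimeSpan.FromMilliseconds(150);
        var subject = new ReplaySubject<string>(window);

        subject.OnNext("w");
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        subject.OnNext("x");
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        subject.OnNext("y");

        // "w" is now roughly 200ms old, so it should have expired.
        subject.Subscribe(Console.WriteLine);
        subject.OnNext("z");
    }
}
```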
In the above example the window was specified as 150 milliseconds. Values are published 100 milliseconds apart. Once we have subscribed to the subject, the first value is 200ms old and as such has expired and been removed from the cache.
BehaviorSubject<T>
BehaviorSubject<T> is similar to ReplaySubject<T> except it only remembers the last publication. BehaviorSubject<T> also requires you to provide it a default value of T. This means that all subscribers will receive a value immediately (unless it is already completed).
In this example the value 'a' is written to the console:
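A sketch, assuming System.Reactive (the class name is hypothetical):

```csharp
using System;
using System.Reactive.Subjects;

public static class BehaviorDefaultDemo
{
    public static void Main()
    {
        // "a" is the default value supplied to the constructor.
        var subject = new BehaviorSubject<string>("a");
        subject.Subscribe(Console.WriteLine);  // prints "a"
    }
}
```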
In this example the value 'b' is written to the console, but not 'a'.
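A sketch, assuming System.Reactive (the class name is hypothetical):

```csharp
using System;
using System.Reactive.Subjects;

public static class BehaviorLatestDemo
{
    public static void Main()
    {
        var subject = new BehaviorSubject<string>("a");
        subject.OnNext("b");                   // replaces the default
        subject.Subscribe(Console.WriteLine);  // prints "b"
    }
}
```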
In this example the values 'b', 'c' & 'd' are all written to the console, but again not 'a':
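A sketch, assuming System.Reactive (the class name is hypothetical):

```csharp
using System;
using System.Reactive.Subjects;

public static class BehaviorLiveDemo
{
    public static void Main()
    {
        var subject = new BehaviorSubject<string>("a");
        subject.OnNext("b");
        subject.Subscribe(Console.WriteLine);  // prints "b" straight away
        subject.OnNext("c");                   // then live values follow
        subject.OnNext("d");
    }
}
```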
Finally in this example, no values will be published as the sequence has completed. Nothing is written to the console.
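A sketch, assuming System.Reactive (the class name is hypothetical):

```csharp
using System;
using System.Reactive.Subjects;

public static class BehaviorCompletedDemo
{
    public static void Main()
    {
        var subject = new BehaviorSubject<string>("a");
        subject.OnNext("b");
        subject.OnNext("c");
        subject.OnCompleted();

        // The subscriber is only told about completion, and this
        // overload has no completion handler, so nothing is printed.
        subject.Subscribe(Console.WriteLine);
    }
}
```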
Note that there is a difference between a ReplaySubject<T> with a buffer size of one (commonly called a 'replay one subject') and a BehaviorSubject<T>. A BehaviorSubject<T> requires an initial value. With the assumption that neither subject has completed, you can be sure that the BehaviorSubject<T> will have a value. You cannot be certain with the ReplaySubject<T> however. With this in mind, it is unusual to ever complete a BehaviorSubject<T>. Another difference is that a replay-one-subject will still cache its value once it has been completed. So when subscribing to a completed BehaviorSubject<T> we can be sure to not receive any values, but with a ReplaySubject<T> it is possible.
BehaviorSubject<T>s are often associated with class properties. As they always have a value and can provide change notifications, they could be candidates for backing fields to properties.
AsyncSubject<T>
AsyncSubject<T> is similar to the Replay and Behavior subjects in the way that it caches values, however it will only store the last value, and only publish it when the sequence is completed. The general usage of the AsyncSubject<T> is to only ever publish one value then immediately complete. This means that it becomes quite comparable to Task<T>.
In this example no values will be published as the sequence never completes. No values will be written to the console.
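A sketch, assuming System.Reactive (the class name and values are illustrative):

```csharp
using System;
using System.Reactive.Subjects;

public static class AsyncNeverCompletesDemo
{
    public static void Main()
    {
        var subject = new AsyncSubject<string>();
        subject.OnNext("a");
        subject.Subscribe(Console.WriteLine);
        subject.OnNext("b");
        subject.OnNext("c");
        // OnCompleted is never called, so "c" stays in the cache.
    }
}
```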
In this example we invoke the OnCompleted method so the last value 'c' is written to the console:
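A sketch, assuming System.Reactive (the class name and values are illustrative):

```csharp
using System;
using System.Reactive.Subjects;

public static class AsyncCompletesDemo
{
    public static void Main()
    {
        var subject = new AsyncSubject<string>();
        subject.OnNext("a");
        subject.Subscribe(Console.WriteLine);
        subject.OnNext("b");
        subject.OnNext("c");
        subject.OnCompleted();  // releases the last value: prints "c"
    }
}
```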
Implicit contracts
There are implicit contracts that need to be upheld when working with Rx as mentioned above. The key one is that once a sequence is completed, no more activity can happen on that sequence. A sequence can be completed in one of two ways, either by OnCompleted() or by OnError(Exception).
The four subjects described in this chapter all cater for this implicit contract by ignoring any attempts to publish values, errors or completions once the sequence has already terminated.
Here we see an attempt to publish the value 'c' on a completed sequence. Only values 'a' and 'b' are written to the console.
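A sketch of that attempt, assuming System.Reactive (the class name is hypothetical):

```csharp
using System;
using System.Reactive.Subjects;

public static class PublishAfterCompletionDemo
{
    public static void Main()
    {
        var subject = new Subject<string>();
        subject.Subscribe(Console.WriteLine);

        subject.OnNext("a");
        subject.OnNext("b");
        subject.OnCompleted();
        subject.OnNext("c");  // ignored: the sequence has already terminated
    }
}
```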
ISubject interfaces
While each of the four subjects described in this chapter implement the IObservable<T> and IObserver<T> interfaces, they do so via another set of interfaces:
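The general interface has roughly this shape. The sketch mirrors the declaration in a made-up namespace (KeyTypesSketch) rather than quoting the library source, so treat the exact form as an approximation.

```csharp
using System;

// Hypothetical namespace, so the sketch does not clash with
// System.Reactive.Subjects.ISubject<TSource, TResult>.
namespace KeyTypesSketch
{
    // Accepts TSource values as an observer and exposes
    // TResult values as an observable.
    public interface ISubject<in TSource, out TResult>
        : IObserver<TSource>, IObservable<TResult>
    {
    }
}
```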
As all the subjects mentioned here have the same type for both TSource and TResult, they implement this interface which is the superset of all the previous interfaces:
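The relationship can be checked with a few assignments. This sketch assumes System.Reactive; the variable names are my own, and the declaration in the comment is approximate.

```csharp
using System;
using System.Reactive.Subjects;

public static class SubjectInterfacesDemo
{
    public static void Main()
    {
        // ISubject<T> is declared roughly as:
        //   interface ISubject<T> : ISubject<T, T>, IObserver<T>, IObservable<T> { }
        ISubject<string> subject = new ReplaySubject<string>();

        // The same instance viewed through each narrower interface.
        ISubject<string, string> general = subject;
        IObserver<string> writer = subject;
        IObservable<string> reader = subject;

        reader.Subscribe(Console.WriteLine);
        writer.OnNext("a");   // prints "a"
        general.OnNext("b");  // prints "b"
    }
}
```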
These interfaces are not widely used, but prove useful as the subjects do not share a common base class. We will see the subject interfaces used later when we discover hot and cold observables.
Subject factory
Finally it is worth making you aware that you can also create a subject via a factory method. Considering that a subject combines the IObservable<T> and IObserver<T> interfaces, it seems sensible that there should be a factory that allows you to combine them yourself. The Subject.Create(IObserver<TSource>, IObservable<TResult>) factory method provides just this.
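A sketch of the factory in use, assuming System.Reactive. Observer.Create and Observable.Return are used here only to obtain an observer and an observable to combine; the names sink and source are illustrative.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SubjectCreateDemo
{
    public static void Main()
    {
        // The write side: receives whatever is pushed into the subject.
        IObserver<string> sink = Observer.Create<string>(
            value => Console.WriteLine("sink received {0}", value));

        // The read side: what subscribers of the subject will see.
        IObservable<string> source = Observable.Return("value from source");

        ISubject<string, string> subject =
            Subject.Create<string, string>(sink, source);

        subject.OnNext("a");                   // forwarded to sink
        subject.Subscribe(Console.WriteLine);  // subscribes to source
    }
}
```

Note that the two halves are independent: values pushed with OnNext go to the observer you supplied, and are not seen by subscribers unless the observable you supplied produces them.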
Subjects provide a convenient way to poke around Rx, however they are not recommended for day to day use. An explanation is in the Usage Guidelines in the appendix. Instead of using subjects, favor the factory methods we will look at in Part 2.
The fundamental types IObserver<T> and IObservable<T> and the auxiliary subject types create a base from which to build your Rx knowledge. It is important to understand these simple types and their implicit contracts. In production code you may find that you rarely use the IObserver<T> interface and subject types, but understanding them and how they fit into the Rx eco-system is still important. The IObservable<T> interface is the dominant type that you will be exposed to for representing a sequence of data in motion, and therefore will comprise the core concern for most of your work with Rx and most of this book.