Combining sequences
Data sources are everywhere, and sometimes we need to consume data from more than just a single source. Common examples that have many inputs include: price feeds, sensor networks, news feeds, social media aggregators, file watchers, multi-touch surfaces, heart-beating/polling servers, etc. The way we deal with these multiple stimuli is varied too. We may want to consume it all as a deluge of integrated data, or one sequence at a time as sequential data. We could also get it in an orderly fashion, pairing data values from two sources to be processed together, or perhaps just consume the data from the first source that responds to the request.
Earlier chapters have also shown some examples of the fan out and back in style of data processing, where we partition data, and perform processing on each partition to convert high-volume data into lower-volume higher-value events before recombining. This ability to restructure streams greatly enhances the benefits of operator composition. If Rx only enabled us to apply composition as a simple linear processing chain, it would be a good deal less powerful. Being able to pull streams apart gives us much more flexibility. So even when there is a single source of events, we often still need to combine multiple observable streams as part of our processing. Sequence composition enables you to create complex queries across multiple data sources. This unlocks the possibility to write some very powerful yet succinct code.
We've already used SelectMany
in earlier chapters. This is one of the fundamental operators in Rx. As we saw in the Transformation chapter, it's possible to build several other operators from SelectMany
, and its ability to combine streams is part of what makes it powerful. But there are several more specialized combination operators available, which make it easier to solve certain problems than it would be using SelectMany
. Also, some operators we've seen before (including TakeUntil
and Buffer
) have overloads we've not yet explored that can combine multiple sequences.
Sequential Combination
We'll start with the simplest kind of combining operators, which do not attempt concurrent combination. They deal with one source sequence at a time.
Concat
Concat
is arguably the simplest way to combine sequences. It does the same thing as its namesake in other LINQ providers: it concatenates two sequences. The resulting sequence produces all of the elements from the first sequence, followed by all of the elements from the second sequence. The simplest signature for Concat
is as follows.
public static IObservable<TSource> Concat<TSource>(
this IObservable<TSource> first,
IObservable<TSource> second)
Since Concat
is an extension method, we can invoke it as a method on any sequence, passing the second sequence in as the only argument:
IObservable<int> s1 = Observable.Range(0, 3);
IObservable<int> s2 = Observable.Range(5, 5);
IObservable<int> c = s1.Concat(s2);
IDisposable sub = c.Subscribe(Console.WriteLine, x => Console.WriteLine("Error: " + x));
This marble diagram shows the items emerging from the two sources, s1
and s2
, and how Concat
combines them into the result, c
:
Rx's Concat
does nothing with its sources until something subscribes to the IObservable<T>
it returns. So in this case, when we call Subscribe
on c
(the source returned by Concat
) it will subscribe to its first input, s1
, and each time that produces a value, the c
observable will emit that same value to its subscriber. If we went on to call sub.Dispose()
before s1
completes, Concat
would unsubscribe from the first source, and would never subscribe to s2
. If s1
were to report an error, c
would report that same error to its subscriber, and again, it will never subscribe to s2
. Only if s1
completes will the Concat
operator subscribe to s2
, at which point it will forward any items that second input produces until either the second source completes or fails, or the application unsubscribes from the concatenated observable.
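To make the error case concrete, here is a minimal sketch. It assumes the System.Reactive NuGet package and C# top-level statements; the names failing, second and log are illustrative and not part of the earlier example. The first source emits one value and then fails, and Defer is used purely so that we can record whether Concat ever subscribes to the second source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var log = new List<string>();

// Illustrative first source: emits 1, then reports an error.
IObservable<int> failing = Observable.Return(1)
    .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

// Defer runs this lambda only if something actually subscribes.
IObservable<int> second = Observable.Defer<int>(() =>
{
    log.Add("second subscribed");
    return Observable.Return(2);
});

failing.Concat(second).Subscribe(
    x => log.Add($"OnNext({x})"),
    ex => log.Add($"OnError({ex.Message})"),
    () => log.Add("OnCompleted"));

foreach (string entry in log)
{
    Console.WriteLine(entry);
}
// Prints:
// OnNext(1)
// OnError(boom)
```

The log never contains "second subscribed", because the failure of the first source ends the concatenated sequence.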
Although Rx's Concat
has the same logical behaviour as the LINQ to Objects Concat
, there are some Rx-specific details to be aware of. In particular, timing is often more significant in Rx than with other LINQ implementations. For example, in Rx we distinguish between hot and cold sources. With a cold source it typically doesn't matter exactly when you subscribe, but hot sources are essentially live, so you only get notified of things that happen while you are subscribed. This can mean that hot sources might not be a good fit with Concat
. The following marble diagram illustrates a scenario in which this produces results that have the potential to surprise:
Since Concat
doesn't subscribe to its second input until the first has finished, it won't see the first couple of items that the hot
source would deliver to any subscribers that had been listening from the start. This might not be the behaviour you would expect: it certainly doesn't look like this concatenated all of the items from the first sequence with all of the items from the second one. It looks like it missed out A
and B
from hot
.
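The following sketch reproduces this scenario in code. It assumes the System.Reactive package and uses two Subject<string> instances so that the timing is entirely under our control; the name first and the values "1" and "2" are illustrative, while hot and its items A, B and C mirror the description above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var first = new Subject<string>();
var hot = new Subject<string>();
var received = new List<string>();

first.Concat(hot).Subscribe(received.Add);

first.OnNext("1");
hot.OnNext("A");      // Concat has not subscribed to hot yet, so this is missed
hot.OnNext("B");      // ...and so is this
first.OnNext("2");
first.OnCompleted();  // only now does Concat subscribe to hot
hot.OnNext("C");
hot.OnCompleted();

Console.WriteLine(string.Join(", ", received));
// Prints: 1, 2, C
```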
Marble Diagram Limitations
This last example reveals that marble diagrams gloss over a detail: they show when a source starts, when it produces values, and when it finishes, but they ignore the fact that to be able to produce items at all, an observable source needs a subscriber. If nothing subscribes to an IObservable<T>
, then it doesn't really produce anything. Concat
doesn't subscribe to its second input until the first completes, so arguably instead of the diagram above, it would be more accurate to show this:
This makes it easier to see why Concat
produces the output it does. But since hot
is a hot source here, this diagram fails to convey the fact that hot
is producing items entirely on its own schedule. In a scenario where hot
had multiple subscribers, then the earlier diagram would arguably be better because it correctly reflects every event available from hot
(regardless of however many listeners might be subscribed at any particular moment). But although this convention works for hot sources, it doesn't work for cold ones, which typically start producing items upon subscription. A source returned by Timer
produces items on a regular schedule, but that schedule starts at the instant when subscription occurs. That means that if there are multiple subscriptions, there are multiple schedules. Even if I have just a single IObservable<long>
returned by Observable.Timer
, each distinct subscriber will get items on its own schedule—subscribers receive events at a regular interval starting from whenever they happened to subscribe. So for cold observables, it typically makes sense to use the convention used by this second diagram, in which we're looking at the events received by one particular subscription to a source.
Most of the time we can get away with ignoring this subtlety, quietly using whichever convention suits us. To paraphrase Humpty Dumpty: when I use a marble diagram, it means just what I choose it to mean—neither more nor less. But when you're combining hot and cold sources together, there might not be one obviously best way to represent this in a marble diagram. We could even do something like this, where we describe the events that hot
represents separately from the events seen by a particular subscription to hot
.
We're using a distinct 'lane' in the marble diagram to represent the events seen by a particular subscription to a source. With this technique, we can also show what would happen if you pass the same cold source into Concat
twice:
This highlights the fact that, being a cold source, cold
provides items separately to each subscription. We see the same three values emerging from the same source, but at different times.
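Here is a small sketch of that last case, assuming the System.Reactive package. The cold source is built with Defer around Range purely so that we can count subscriptions; the subscriptionCount variable is an illustrative addition.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

int subscriptionCount = 0;

// A cold source: every subscription gets its own run of 1, 2, 3.
IObservable<int> cold = Observable.Defer<int>(() =>
{
    subscriptionCount++;
    return Observable.Range(1, 3);
});

var results = new List<int>();
cold.Concat(cold).Subscribe(results.Add);

Console.WriteLine(string.Join(", ", results));
Console.WriteLine($"Subscriptions: {subscriptionCount}");
// Prints:
// 1, 2, 3, 1, 2, 3
// Subscriptions: 2
```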
Concatenating Multiple Sources
What if you wanted to concatenate more than two sequences? Concat
has an overload accepting multiple observable sequences as an array. This is annotated with the params
keyword, so you don't need to construct the array explicitly. You can just pass any number of arguments, and the C# compiler will generate the code to create the array for you. There's also an overload taking an IEnumerable<IObservable<T>>
, in case the observables you want to concatenate are already in some collection.
public static IObservable<TSource> Concat<TSource>(
params IObservable<TSource>[] sources)
public static IObservable<TSource> Concat<TSource>(
this IEnumerable<IObservable<TSource>> sources)
The IEnumerable<IObservable<T>>
overload evaluates sources
lazily. It won't start asking that collection for source observables until someone subscribes to the observable that Concat
returns, and it only calls MoveNext
again on the resulting IEnumerator<IObservable<T>>
when the current source completes, meaning it's ready to start on the next. To illustrate this, the following example is an iterator method that returns a sequence of sequences and is sprinkled with logging. It returns three observable sequences each with a single value [1], [2] and [3]. Each sequence returns its value on a timer delay.
public IEnumerable<IObservable<long>> GetSequences()
{
Console.WriteLine("GetSequences() called");
Console.WriteLine("Yield 1st sequence");
yield return Observable.Create<long>(o =>
{
Console.WriteLine("1st subscribed to");
return Observable.Timer(TimeSpan.FromMilliseconds(500))
.Select(i => 1L)
.Finally(() => Console.WriteLine("1st finished"))
.Subscribe(o);
});
Console.WriteLine("Yield 2nd sequence");
yield return Observable.Create<long>(o =>
{
Console.WriteLine("2nd subscribed to");
return Observable.Timer(TimeSpan.FromMilliseconds(300))
.Select(i => 2L)
.Finally(() => Console.WriteLine("2nd finished"))
.Subscribe(o);
});
Thread.Sleep(1000); // Force a delay
Console.WriteLine("Yield 3rd sequence");
yield return Observable.Create<long>(o =>
{
Console.WriteLine("3rd subscribed to");
return Observable.Timer(TimeSpan.FromMilliseconds(100))
.Select(i => 3L)
.Finally(() => Console.WriteLine("3rd finished"))
.Subscribe(o);
});
Console.WriteLine("GetSequences() complete");
}
We can call this GetSequences
method and pass the results to Concat
, and then use our Dump
extension method to watch what happens:
GetSequences().Concat().Dump("Concat");
Here's the output:
GetSequences() called
Yield 1st sequence
1st subscribed to
Concat-->1
1st finished
Yield 2nd sequence
2nd subscribed to
Concat-->2
2nd finished
Yield 3rd sequence
3rd subscribed to
Concat-->3
3rd finished
GetSequences() complete
Concat completed
Below is a marble diagram of the Concat
operator applied to the GetSequences
method. 's1', 's2' and 's3' represent sequences 1, 2 and 3 respectively, and 'rs' represents the result sequence.
You should note that once the iterator has executed its first yield return
to return the first sequence, the iterator does not continue until the first sequence has completed. The iterator calls Console.WriteLine
to display the text Yield 2nd sequence
immediately after that first yield return
, but you can see that message doesn't appear in the output until after we see the Concat-->1
message showing the first output from Concat
, and also the 1st finished
message, produced by the Finally
operator, which runs only after that first sequence has completed. (The code also makes that first source delay for 500ms before producing its value, so that if you run this, you can see that everything stops for a bit until that first source produces its single value then completes.) Once the first source completes, the GetSequences
method continues (because Concat
will ask it for the next item once the first observable source completes). When GetSequences
provides the second sequence with another yield return
, Concat
subscribes to that, and again GetSequences
makes no further progress until that second observable sequence completes. When asked for the third sequence, the iterator itself waits for a second before producing that third and final value, which you can see from the gap between the end of s2
and the start of s3
in the diagram.
Prepend
There's one particular scenario that Concat
supports, but in a slightly cumbersome way. It can sometimes be useful to make a sequence that always emits some initial value immediately. Take the example I've been using a lot in this book, where ships transmit AIS messages to report their location and other information: in some applications you might not want to wait until the ship happens next to transmit a message. You could imagine an application that records the last known location of any vessel. This would make it possible for the application to offer, say, an IObservable<IVesselNavigation>
which instantly reports the last known information upon subscription, and which then goes on to supply any newer messages if the vessel produces any.
How would we implement this? We want initially cold-source-like behaviour, but transitioning into hot. So we could just concatenate two sources. We could use Observable.Return
to create a single-element cold source, and then concatenate that with the live stream:
IVesselNavigation lastKnown = ais.GetLastReportedNavigationForVessel(mmsi);
IObservable<IVesselNavigation> live = ais.GetNavigationMessagesForVessel(mmsi);
IObservable<IVesselNavigation> lastKnownThenLive = Observable.Concat(
Observable.Return(lastKnown), live);
This is a common enough requirement that Rx supplies Prepend
, which has a similar effect. We can replace the final line with:
IObservable<IVesselNavigation> lastKnownThenLive = live.Prepend(lastKnown);
This observable will do exactly the same thing: subscribers will immediately receive the lastKnown
, and then if the vessel should emit further navigation messages, they will receive those too. By the way, for this scenario you'd probably also want to ensure that the look up of the "last known" message happens as late as possible. We can delay this until the point of subscription by using Defer
:
public static IObservable<IVesselNavigation> GetLastKnownAndSubsequenceNavigationForVessel(uint mmsi)
{
return Observable.Defer<IVesselNavigation>(() =>
{
// This lambda will run each time someone subscribes.
IVesselNavigation lastKnown = ais.GetLastReportedNavigationForVessel(mmsi);
IObservable<IVesselNavigation> live = ais.GetNavigationMessagesForVessel(mmsi);
return live.Prepend(lastKnown);
});
}
Prepend
might remind you of BehaviorSubject<T>
, because that also ensures that consumers receive a value as soon as they subscribe. It's not quite the same: BehaviorSubject<T>
caches the last value its own source emits. You might think that would make it a better way to implement this vessel navigation example. However, since this example is able to return a source for any vessel (the mmsi
argument is a Maritime Mobile Service Identity uniquely identifying a vessel) it would need to keep a BehaviorSubject<T>
running for every single vessel you were interested in, which might be impractical.
BehaviorSubject<T>
can hold onto only one value, which is fine for this AIS scenario, and Prepend
shares this limitation. But what if you need a source to begin with some particular sequence?
StartWith
StartWith
is a generalization of Prepend
that enables us to provide any number of values to emit immediately upon subscription. As with Prepend
, it will then go on to forward any further notifications that emerge from the source.
As you can see from its signature, this method takes a params
array of values so you can pass in as many or as few values as you need:
// prefixes a sequence of values to an observable sequence.
public static IObservable<TSource> StartWith<TSource>(
this IObservable<TSource> source,
params TSource[] values)
There's also an overload that accepts an IEnumerable<T>
. Note that Rx will not defer its enumeration of this. StartWith
immediately converts the IEnumerable<T>
into an array before returning.
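As a quick illustration, here is a minimal sketch assuming the System.Reactive package. A Subject<int> stands in for a live source (an illustrative choice), so we can see that the initial values arrive during the call to Subscribe, before the source has produced anything.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var live = new Subject<int>();
var received = new List<int>();

live.StartWith(1, 2, 3).Subscribe(received.Add);

// The three initial values have already been delivered at this point.
Console.WriteLine(string.Join(", ", received));
// Prints: 1, 2, 3

live.OnNext(10);
Console.WriteLine(string.Join(", ", received));
// Prints: 1, 2, 3, 10
```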
StartWith
is not a common LINQ operator, and its existence is peculiar to Rx. If you imagine what StartWith
would look like in LINQ to Objects, it would not be meaningfully different from Concat
. There's a difference in Rx because StartWith
effectively bridges between pull and push worlds. It converts the items we supply into an observable, and it then concatenates the source
argument onto that.
Append
The existence of Prepend
might lead you to wonder whether there is an Append
for adding a single item onto the end of any IObservable<T>
. After all, this is a common LINQ operator; LINQ to Objects has an Append
implementation, for example. And Rx does indeed supply such a thing:
IObservable<string> oneMore = arguments.Append("And another thing...");
There is no corresponding EndWith
. There's no fundamental reason that there couldn't be such a thing; it's just that apparently there's not much demand: the Rx repository has not yet had a feature request. So although the symmetry of Prepend
and Append
does suggest that there could be a similar symmetry between StartWith
and an as-yet-hypothetical EndWith
, the absence of this counterpart doesn't seem to have caused any problems. There's an obvious value to being able to create observable sources that always immediately produce a useful output; it's not clear what EndWith
would be useful for, besides satisfying a craving for symmetry.
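If you did want such a thing, it is not hard to build from operators that do exist. The following is only a sketch of what a hypothetical EndWith might look like: Rx does not provide this method, and the name and signature here are assumptions made for illustration. It assumes the System.Reactive package and uses a local function so that the snippet stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper (not part of Rx): emits everything from source,
// then the supplied values once source completes.
IObservable<T> EndWith<T>(IObservable<T> source, params T[] values) =>
    source.Concat(values.ToObservable());

var results = new List<string>();
EndWith(Observable.Return("body"), "footer 1", "footer 2")
    .Subscribe(results.Add);

Console.WriteLine(string.Join(", ", results));
// Prints: body, footer 1, footer 2
```

Note that, like Append, this can only emit its extra items if the source completes; an error or an endless source means they never appear.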
DefaultIfEmpty
The next operator we'll examine doesn't strictly perform sequential combination. However, it's a very close relative of Append
and Prepend
. Like those operators, it will emit everything its source does. And like those operators, DefaultIfEmpty
takes one additional item. The difference is that it won't always emit that additional item.
Whereas Prepend
emits its additional item at the start, and Append
emits its additional item at the end, DefaultIfEmpty
emits the additional item only if the source completes without producing anything. So this provides a way of guaranteeing that an observable will not be empty.
You don't have to supply DefaultIfEmpty
with a value. If you use the overload in which you supply no such value, it will just use default(T)
. This will be a zero-like value for struct types and null
for reference types.
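Here is a short sketch of all three cases, assuming the System.Reactive package: an empty source with an explicit default, a non-empty source, and an empty source with no default supplied. The variable names and the value 42 are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var fromEmpty = new List<int>();
Observable.Empty<int>().DefaultIfEmpty(42).Subscribe(fromEmpty.Add);

var fromNonEmpty = new List<int>();
Observable.Range(1, 2).DefaultIfEmpty(42).Subscribe(fromNonEmpty.Add);

var fromEmptyNoValue = new List<int>();
Observable.Empty<int>().DefaultIfEmpty().Subscribe(fromEmptyNoValue.Add);

Console.WriteLine(string.Join(", ", fromEmpty));        // Prints: 42
Console.WriteLine(string.Join(", ", fromNonEmpty));     // Prints: 1, 2
Console.WriteLine(string.Join(", ", fromEmptyNoValue)); // Prints: 0
```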
Repeat
The final operator that combines sequences sequentially is Repeat
. It allows you to simply repeat a sequence. It offers overloads where you can specify the number of times to repeat the input, and one that repeats infinitely:
// Repeats the observable sequence a specified number of times.
public static IObservable<TSource> Repeat<TSource>(
this IObservable<TSource> source,
int repeatCount)
// Repeats the observable sequence indefinitely and sequentially.
public static IObservable<TSource> Repeat<TSource>(
this IObservable<TSource> source)
Repeat
resubscribes to the source for each repetition. This means that this will only strictly repeat if the source produces the same items each time you subscribe. Unlike the ReplaySubject<T>
, this doesn't store and replay the items that emerge from the source. This means that you normally won't want to call Repeat
on a hot source. (If you really want repetition of the output of a hot source, a combination of Replay
and Repeat
might fit the bill.)
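To see that resubscription really is what happens, here is a minimal sketch assuming the System.Reactive package. The source is wrapped in Defer (an illustrative device, along with the subscriptionCount variable) so that each subscription produces a different value. If Repeat stored and replayed items, we would see the same value three times.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

int subscriptionCount = 0;

// Each subscription produces a value that reveals which subscription it was.
IObservable<string> source = Observable.Defer<string>(() =>
{
    subscriptionCount++;
    return Observable.Return($"subscription {subscriptionCount}");
});

var results = new List<string>();
source.Repeat(3).Subscribe(results.Add);

Console.WriteLine(string.Join(", ", results));
// Prints: subscription 1, subscription 2, subscription 3
```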
If you use the overload that repeats indefinitely, then the only way the sequence will stop is if there is an error or the subscription is disposed of. The overload that specifies a repeat count will stop on error, unsubscription, or when it reaches that count. This example shows the sequence [0,1,2] being repeated three times.
var source = Observable.Range(0, 3);
var result = source.Repeat(3);
result.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Completed"));
Output:
0
1
2
0
1
2
0
1
2
Completed
Concurrent sequences
We'll now move on to operators for combining observable sequences that might produce values concurrently.
Amb
Amb
is a strangely named operator. It's short for ambiguous, but that doesn't tell us much more than Amb
. If you're curious about the name you can read about the origins of Amb
in Appendix D, but for now, let's look at what it actually does.
Rx's Amb
takes any number of IObservable<T>
sources as inputs, and waits to see which, if any, first produces some sort of output. As soon as this happens, it immediately unsubscribes from all of the other sources, and forwards all notifications from the source that reacted first.
Why is that useful?
A common use case for Amb
is when you want to produce some sort of result as quickly as possible, and you have multiple options for obtaining that result, but you don't know in advance which will be fastest. Perhaps there are multiple servers that could all potentially give you the answer you want, and it's impossible to predict which will have the lowest response time. You could send requests to all of them, and then just use the first to respond. If you model each individual request as its own IObservable<T>
, Amb
can handle this for you. Note that this isn't very efficient: you're asking several servers all to do the same work, and you're going to discard the results from most of them. (Since Amb
unsubscribes from all the sources it's not going to use as soon as the first reacts, it's possible that you might be able to send a message to all the other servers to cancel the request. But this is still somewhat wasteful.) But there may be scenarios in which timeliness is crucial, and for those cases it might be worth tolerating a bit of wasted effort to produce faster results.
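A rough sketch of that idea follows, assuming the System.Reactive package. The QueryServer function, the server names and the latencies are all hypothetical stand-ins: each "request" is simulated with Observable.Timer rather than a real network call, and the result depends on those timers firing in the expected order.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical stand-in for a real request: responds once after the given latency.
IObservable<string> QueryServer(string name, TimeSpan latency) =>
    Observable.Timer(latency).Select(_ => $"response from {name}");

IObservable<string> fastest = Observable.Amb(
    QueryServer("server A", TimeSpan.FromMilliseconds(400)),
    QueryServer("server B", TimeSpan.FromMilliseconds(50)),
    QueryServer("server C", TimeSpan.FromMilliseconds(800)));

// Wait blocks until the sequence completes and returns its last value.
string answer = fastest.Wait();
Console.WriteLine(answer);
// With these simulated latencies we would expect: response from server B
```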
Amb
is broadly similar to Task.WhenAny
, in that it lets you detect when the first of multiple sources does something. However, the analogy is not precise. Amb
automatically unsubscribes from all of the other sources, ensuring that everything is cleaned up. With Task
you should always ensure that you eventually observe all tasks in case any of them faulted.
To illustrate Amb
's behaviour, here's a marble diagram showing three sequences, s1
, s2
, and s3
, each able to produce a sequence of values. The line labelled r
shows the result of passing all three sequences into Amb
. As you can see, r
provides exactly the same notifications as s1
, because in this example, s1
was the first sequence to produce a value.
This code creates exactly the situation described in that marble diagram, to verify that this is indeed how Amb
behaves:
var s1 = new Subject<int>();
var s2 = new Subject<int>();
var s3 = new Subject<int>();
var result = Observable.Amb(s1, s2, s3);
result.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Completed"));
s1.OnNext(1);
s2.OnNext(99);
s3.OnNext(8);
s1.OnNext(2);
s2.OnNext(88);
s3.OnNext(7);
s2.OnCompleted();
s1.OnNext(3);
s3.OnNext(6);
s1.OnNext(4);
s1.OnCompleted();
s3.OnCompleted();
Output:
1
2
3
4
Completed
If we changed the order so that s2.OnNext(99)
came before the call to s1.OnNext(1);
then s2 would produce values first and the marble diagram would look like this.
There are a few overloads of Amb
. The preceding example used the overload that takes a params
array of sequences. There's also an overload that takes exactly two sources, avoiding the array allocation that occurs with params
. Finally, you could pass in an IEnumerable<IObservable<T>>
. (Note that there are no overloads that take an IObservable<IObservable<T>>
. Amb
requires all of the source observables it monitors to be supplied up front.)
// Propagates the observable sequence that reacts first.
public static IObservable<TSource> Amb<TSource>(
this IObservable<TSource> first,
IObservable<TSource> second)
{...}
public static IObservable<TSource> Amb<TSource>(
params IObservable<TSource>[] sources)
{...}
public static IObservable<TSource> Amb<TSource>(
this IEnumerable<IObservable<TSource>> sources)
{...}
Reusing the GetSequences
method from the Concat
section, we see that Amb
evaluates the outer (IEnumerable) sequence completely before subscribing to any of the sequences it returns.
GetSequences().Amb().Dump("Amb");
Output:
GetSequences() called
Yield 1st sequence
Yield 2nd sequence
Yield 3rd sequence
GetSequences() complete
1st subscribed to
2nd subscribed to
3rd subscribed to
Amb-->3
Amb completed
Here is the marble diagram illustrating how this code behaves:
Remember that GetSequences
produces its first two observables as soon as it is asked for them, and then waits for 1 second before producing the third and final one. But unlike Concat
, Amb
won't subscribe to any of its sources until it has retrieved all of them from the iterator, which is why this marble diagram shows the subscriptions to all three sources starting after 1 second. (The first two sources were available earlier—Amb
would have started enumerating the sources as soon as subscription occurred, but it waited until it had all three before subscribing, which is why they all appear over on the right.) The third sequence has the shortest delay between subscription and producing its value, so although it's the last observable returned, it is able to produce its value the fastest even though there are two sequences yielded one second before it (due to the Thread.Sleep
).
Merge
The Merge
extension method takes multiple sequences as its input. Any time any of those input sequences produces a value, the observable returned by Merge
produces that same value. If the input sequences produce values at the same time on different threads, Merge
handles this safely, ensuring that it delivers items one at a time.
Since Merge
returns a single observable sequence that includes all of the values from all of its input sequences, there's a sense in which it is similar to Concat
. But whereas Concat
waits until each input sequence completes before moving onto the next, Merge
supports concurrently active sequences. As soon as you subscribe to the observable returned by Merge
, it immediately subscribes to all of its inputs, forwarding everything any of them produces. This marble diagram shows two sequences, s1
and s2
, running concurrently and r
shows the effect of combining these with Merge
: the values from both source sequences emerge from the merged sequence.
The result of a Merge
will complete only once all input sequences complete. However, the Merge
operator will error if any of the input sequences terminates erroneously (at which point it will unsubscribe from all its other inputs).
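Both rules can be checked with a small sketch, assuming the System.Reactive package. Subjects are used so that we control exactly when each input produces values, completes or fails; all of the variable names are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Completion: the merged sequence completes only after both inputs have.
var s1 = new Subject<int>();
var s2 = new Subject<int>();
var log = new List<string>();
s1.Merge(s2).Subscribe(
    x => log.Add($"OnNext({x})"),
    ex => log.Add($"OnError({ex.Message})"),
    () => log.Add("OnCompleted"));

s1.OnNext(1);
s2.OnNext(2);
s1.OnCompleted();   // s2 is still active, so no completion yet
s2.OnNext(3);
s2.OnCompleted();   // now the merged sequence completes

// Failure: an error from either input terminates the merged sequence.
var e1 = new Subject<int>();
var e2 = new Subject<int>();
var errorLog = new List<string>();
e1.Merge(e2).Subscribe(
    x => errorLog.Add($"OnNext({x})"),
    ex => errorLog.Add($"OnError({ex.Message})"),
    () => errorLog.Add("OnCompleted"));

e1.OnNext(1);
e2.OnError(new InvalidOperationException("boom"));
e1.OnNext(2);       // never delivered: the merged sequence has already failed

Console.WriteLine(string.Join(", ", log));
// Prints: OnNext(1), OnNext(2), OnNext(3), OnCompleted
Console.WriteLine(string.Join(", ", errorLog));
// Prints: OnNext(1), OnError(boom)
```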
If you read the Creating Observables chapter, you've already seen one example of Merge
. I used it to combine the individual sequences representing the various events provided by a FileSystemWatcher
into a single stream at the end of the 'Representing Filesystem Events in Rx' section. As another example, let's look at AIS once again. There is no publicly available single global source that can provide all AIS messages across the entire globe as an IObservable<IAisMessage>
. Any single source is likely to cover just one area, or maybe even just a single AIS receiver. With Merge
, it's straightforward to combine these into a single source:
IObservable<IAisMessage> station1 = aisStations.GetMessagesFromStation("AdurStation");
IObservable<IAisMessage> station2 = aisStations.GetMessagesFromStation("EastbourneStation");
IObservable<IAisMessage> allMessages = station1.Merge(station2);
If you want to combine more than two sources, you have a few options:
- Chain Merge operators together, e.g. s1.Merge(s2).Merge(s3)
- Pass a params array of sequences to the Observable.Merge static method, e.g. Observable.Merge(s1, s2, s3)
- Apply the Merge operator to an IEnumerable<IObservable<T>>.
- Apply the Merge operator to an IObservable<IObservable<T>>.
The overloads look like this:
// Merges two observable sequences into a single observable sequence.
// Returns a sequence that merges the elements of the given sequences.
public static IObservable<TSource> Merge<TSource>(
this IObservable<TSource> first,
IObservable<TSource> second)
{...}
// Merges all the observable sequences into a single observable sequence.
// The observable sequence that merges the elements of the observable sequences.
public static IObservable<TSource> Merge<TSource>(
params IObservable<TSource>[] sources)
{...}
// Merges an enumerable sequence of observable sequences into a single observable sequence.
public static IObservable<TSource> Merge<TSource>(
this IEnumerable<IObservable<TSource>> sources)
{...}
// Merges an observable sequence of observable sequences into an observable sequence.
// Merges all the elements of the inner sequences in to the output sequence.
public static IObservable<TSource> Merge<TSource>(
this IObservable<IObservable<TSource>> sources)
{...}
As the number of sources being merged goes up, the operators that take collections have an advantage over the first overload. (I.e., s1.Merge(s2).Merge(s3)
performs slightly less well than Observable.Merge(new[] { s1, s2, s3 })
, or the equivalent Observable.Merge(s1, s2, s3)
.) However, for just three or four, the differences are small, so in practice you can choose between the first two overloads as a matter of your preferred style. (If you're merging 100 sources or more the differences are more pronounced, but by that stage, you probably wouldn't want to use the chained call style anyway.) The third and fourth overloads allow you to merge sequences that can be evaluated lazily at run time.
That last Merge
overload that takes a sequence of sequences is particularly interesting, because it makes it possible for the set of sources being merged to grow over time. Merge
will remain subscribed to sources
for as long as your code remains subscribed to the IObservable<T>
that Merge
returns. So if sources
emits more and more IObservable<T>
s over time, these will all be included by Merge
.
That might sound familiar. The SelectMany
operator is also able to flatten multiple observable sources back out into a single observable source. This is just another illustration of why I've described SelectMany
as a fundamental operator in Rx: strictly speaking we don't need a lot of the operators that Rx gives us because we could build them using SelectMany
. Here's a simple re-implementation of that last Merge
overload using SelectMany
:
public static IObservable<T> MyMerge<T>(this IObservable<IObservable<T>> sources) =>
sources.SelectMany(source => source);
As well as illustrating that we don't technically need Rx to provide that last Merge
for us, it's also a good illustration of why it's helpful that it does. It's not immediately obvious what this does. Why are we passing a lambda that just returns its argument? Unless you've seen this before, it can take some thought to work out that SelectMany
expects us to pass a callback that it invokes for each incoming item, but that our input items are already nested sequences, so we can just return each item directly, and SelectMany
will then take that and merge everything it produces into its output stream. And even if you have internalized SelectMany
so completely that you know right away that this will just flatten sources
, you'd still probably find Observable.Merge(sources)
a more direct expression of intent. (Also, since Merge
is a more specialized operator, Rx is able to provide a very slightly more efficient implementation of it than the SelectMany
version shown above.)
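The way this overload lets the set of merged sources grow over time can be sketched as follows, assuming the System.Reactive package. A Subject<IObservable<int>> plays the role of sources, and the nested subjects a and b are illustrative inputs that we add one after the other.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var sources = new Subject<IObservable<int>>();
var a = new Subject<int>();
var b = new Subject<int>();
var results = new List<int>();

sources.Merge().Subscribe(results.Add);

sources.OnNext(a);   // Merge subscribes to a as soon as it appears
a.OnNext(1);
sources.OnNext(b);   // a new source arrives later; Merge picks it up too
b.OnNext(2);
a.OnNext(3);         // a is still being merged alongside b

Console.WriteLine(string.Join(", ", results));
// Prints: 1, 2, 3
```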
If we again reuse the GetSequences
method, we can see how the Merge
operator works with a sequence of sequences.
GetSequences().Merge().Dump("Merge");
Output:
GetSequences() called
Yield 1st sequence
1st subscribed to
Yield 2nd sequence
2nd subscribed to
Merge --> 2
Merge --> 1
Yield 3rd sequence
3rd subscribed to
GetSequences() complete
Merge --> 3
Merge completed
As we can see from the marble diagram, s1 and s2 are yielded and subscribed to immediately. s3 is not yielded for one second and then is subscribed to. Once all input sequences have completed, the result sequence completes.
For each of the Merge
overloads that accept variable numbers of sources (either via an array, an IEnumerable<IObservable<T>>
, or an IObservable<IObservable<T>>
) there's an additional overload adding a maxConcurrent
parameter. For example:
public static IObservable<TSource> Merge<TSource>(this IEnumerable<IObservable<TSource>> sources, int maxConcurrent)
This enables you to limit the number of sources that Merge
accepts inputs from at any single time. If the number of sources available exceeds maxConcurrent
(either because you passed in a collection with more sources, or because you used the IObservable<IObservable<T>>
-based overload and the source emitted more nested sources than maxConcurrent
) Merge
will wait for existing sources to complete before moving on to new ones. A maxConcurrent
of 1 makes Merge
behave in the same way as Concat
.
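The following is a minimal sketch of the maxConcurrent behaviour, assuming the System.Reactive NuGet package and C# top-level statements. The Source helper and its delays are hypothetical, invented only so that we can see when Merge subscribes to each input.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper: logs when it is subscribed to, then emits its name
// once after the given delay and completes.
static IObservable<string> Source(string name, int delayMs) =>
    Observable.Defer(() =>
    {
        Console.WriteLine($"{name} subscribed");
        return Observable.Timer(TimeSpan.FromMilliseconds(delayMs)).Select(_ => name);
    });

IObservable<string>[] sources =
{
    Source("A", 300),
    Source("B", 100),
    Source("C", 100),
};

// At most two sources may be active at once, so C is not subscribed to
// until one of A or B has completed.
Observable.Merge(sources, 2)
    .Do(x => Console.WriteLine($"Merge --> {x}"))
    .Wait(); // Blocks until the merged sequence completes.
```

With these delays we would expect A and B to be subscribed to straight away, and C only once B has completed. Because the example relies on timers, the exact interleaving of the later messages may vary slightly from run to run.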
Switch
Rx's Switch
operator takes an IObservable<IObservable<T>>
, and produces notifications from the most recent nested observable. Each time its source produces a new nested IObservable<T>
, Switch
unsubscribes from the previous nested source (unless this is the first source, in which case there won't be a previous one) and subscribes to the latest one.
Switch
can be used in a 'time to leave' type feature for a calendar application. In fact you can see the source code for a modified version of how Bing provides (or at least provided; the implementation may have changed) notifications telling you that it's time to leave for an appointment. Since that's derived from a real example, it's a little complex, so I'll describe just the essence here.
The basic idea with a 'time to leave' notification is that we use map and route-finding services to work out the expected journey time to get to wherever the appointment is, and use the Timer
operator to create an IObservable<T>
that will produce a notification when it's time to leave. (Specifically this code produces an IObservable<TrafficInfo>
which reports the proposed route for the journey, and expected travel time.) However, there are two things that can change, rendering the initial predicted journey time useless. First, traffic conditions can change. When the user creates their appointment, we have to guess the expected journey time based on how traffic normally flows at the time of day in question. However, if there turns out to be really bad traffic on the day, the estimate will need to be revised upwards, and we'll need to notify the user earlier.
The other thing that can change is the user's location. This will also obviously affect the predicted journey time.
To handle this, the system will need observable sources that can report changes in the user's location, and changes in traffic conditions affecting the proposed journey. Every time either of these reports a change, we will need to produce a new estimated journey time, and a new IObservable<TrafficInfo>
that will produce a notification when it's time to leave.
Every time we revise our estimate, we want to abandon the previously created IObservable<TrafficInfo>
. (Otherwise, the user will receive a bewildering number of notifications telling them to leave, one for every time we recalculated the journey time.) We just want to use the latest one. And that's exactly what Switch
does.
You can see the example for that scenario in the Reaqtor repo. Here, I'm going to present a different, simpler scenario: live searches. As you type, the text is sent to a search service and the results are returned to you as an observable sequence. Most implementations have a slight delay before sending the request so that unnecessary work does not happen. Imagine I want to search for "Intro to Rx". I quickly type in "Into to" and realize I have missed the letter 'r'. I stop briefly and change the text to "Intro ". By now, two searches have been sent to the server. The first search will return results that I do not want. Furthermore, if I were to receive data for the first search merged together with results for the second search, it would be a very odd experience for the user. I really only want results corresponding to the latest search text. This scenario fits perfectly with the Switch
method.
In this example, there is an IObservable<string>
source that represents the search text—each new value the user types emerges from this source sequence. We also have a search function that produces a single search result for a given search term:
private IObservable<string> SearchResults(string query)
{
...
}
This returns just a single value, but we model it as an IObservable<string>
partly to deal with the fact that it might take some time to perform the search, and also to be able to use it with Rx. We can take our source of search terms, and then use Select
to pass each new search value to this SearchResults
function. This creates our resulting nested sequence, IObservable<IObservable<string>>
.
Suppose we were to then use Merge
to process the results:
IObservable<string> searchValues = ....;
IObservable<IObservable<string>> search = searchValues.Select(searchText => SearchResults(searchText));
var subscription = search
.Merge()
.Subscribe(Console.WriteLine);
If we were lucky and each search completed before the next element from searchValues
was produced, the output would look sensible. It is much more likely, however, that multiple searches will produce overlapping results. This marble diagram shows what the Merge
function could do in such a situation.
Note how the values from the search results are all mixed together. The fact that some search terms took longer to get a search result than others has also meant that they have come out in the wrong order. This is not what we want. If we use the Switch
extension method we will get much better results. Switch
will subscribe to the outer sequence and as each inner sequence is yielded it will subscribe to the new inner sequence and dispose of the subscription to the previous inner sequence. This will result in the following marble diagram:
Now, each time a new search term arrives, causing a new search to be kicked off, a corresponding new IObservable<string>
for that search's results appears, causing Switch
to unsubscribe from the previous results. This means that any results that arrive too late (i.e., when the result is for a search term that is no longer the one in the search box) will be dropped. As it happens, in this particular example, this means that we only see the result for the final search term. All the intermediate values that we saw as the user was typing didn't hang around for long, because the user kept on pressing the next key before we'd received the previous value's results. Only at the end, when the user stopped typing for long enough that the search results came back before they became out of date, do we finally see a value from Switch
. The net effect is that we've eliminated confusing results that are out of date.
This is another diagram where the ambiguity of marble diagrams causes a slight issue. I've shown each of the single-value observables produced by each of the calls to SearchResults
, but in practice Switch
unsubscribes from all but the last of these before they've had a chance to produce a value. So this diagram is showing the values those sources could potentially produce, and not the values that they actually delivered as part of the subscription, because the subscriptions were cut short.
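To make the effect concrete, here is a small sketch that replaces the real search service with subjects, so that we control exactly when each result arrives. It assumes the System.Reactive NuGet package and C# top-level statements. The pending dictionary and this body for SearchResults are hypothetical stand-ins, not the implementation elided above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var searchValues = new Subject<string>();

// Hypothetical stand-in for the search service: each query gets its own
// subject, which we feed by hand to simulate a result arriving later.
var pending = new Dictionary<string, Subject<string>>();
IObservable<string> SearchResults(string query)
{
    var result = new Subject<string>();
    pending[query] = result;
    return result;
}

IDisposable subscription = searchValues
    .Select(searchText => SearchResults(searchText))
    .Switch()
    .Subscribe(result => Console.WriteLine($"Switch --> {result}"));

searchValues.OnNext("Into");  // First search starts.
searchValues.OnNext("Intro"); // Second search starts; Switch abandons the first.

pending["Into"].OnNext("results for 'Into'");   // Too late: nothing is printed.
pending["Intro"].OnNext("results for 'Intro'"); // Prints: Switch --> results for 'Intro'

subscription.Dispose();
```

Only the result for the most recent search term reaches the subscriber. Swapping Switch for Merge in this sketch would print both results.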
Pairing sequences
The previous methods allowed us to flatten multiple sequences sharing a common type into a result sequence of the same type (with various strategies for deciding what to include and what to discard). The operators in this section still take multiple sequences as an input, but attempt to pair values from each sequence to produce a single value for the output sequence. In some cases, they also allow you to provide sequences of different types.
Zip
Zip
combines pairs of items from two sequences. So its first output is created by combining the first item from one input with the first item from the other. The second output combines the second item from each input. And so on. The name is meant to evoke a zipper on clothing or a bag, which brings the teeth on each half of the zipper together one pair at a time.
Since Zip
combines pairs of items in strict order, it will complete when the first of the sequences completes. If one of the sequences has reached its end, then even if the other continues to emit values, there will be nothing to pair any of these values with, so Zip
just unsubscribes at this point, discards the unpairable values, and reports completion.
If either of the sequences produces an error, the sequence returned by Zip
will report that same error.
If one of the source sequences publishes values faster than the other sequence, the rate of publishing will be dictated by the slower of the two sequences, because it can only emit an item when it has one from each source.
Here's an example:
// Generate values 0,1,2
var nums = Observable.Interval(TimeSpan.FromMilliseconds(250))
.Take(3);
// Generate values a,b,c,d,e,f
var chars = Observable.Interval(TimeSpan.FromMilliseconds(150))
.Take(6)
.Select(i => Char.ConvertFromUtf32((int)i + 97));
// Zip values together
nums.Zip(chars, (lhs, rhs) => (lhs, rhs))
.Dump("Zip");
The effect can be seen in the marble diagram below:
Here's the actual output of the code:
{ Left = 0, Right = a }
{ Left = 1, Right = b }
{ Left = 2, Right = c }
Note that the nums
sequence only produced three values before completing, while the chars
sequence produced six values. The result sequence produced three values, because that was as many pairs as it could make.
It is also worth noting that Zip
has a second overload that takes an IEnumerable<T>
as the second input sequence.
// Merges an observable sequence and an enumerable sequence into one observable sequence
// containing the result of pair-wise combining the elements by using the selector function.
public static IObservable<TResult> Zip<TFirst, TSecond, TResult>(
this IObservable<TFirst> first,
IEnumerable<TSecond> second,
Func<TFirst, TSecond, TResult> resultSelector)
{...}
This allows us to zip sequences from both IEnumerable<T>
and IObservable<T>
paradigms!
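As a brief illustration of this overload, the sketch below zips an observable with an ordinary array. It assumes the System.Reactive NuGet package and C# top-level statements; the labels array is made up for the example.

```csharp
using System;
using System.Reactive.Linq;

// An ordinary in-memory collection: the IEnumerable<T> side of the pairing.
string[] labels = { "first", "second", "third", "fourth" };

// The observable side produces only three values (0, 1, 2).
Observable.Range(0, 3)
    .Zip(labels, (n, label) => $"{n}: {label}")
    .Subscribe(
        pair => Console.WriteLine(pair),
        () => Console.WriteLine("Completed"));

// Output:
// 0: first
// 1: second
// 2: third
// Completed
```

The fourth label is never used, because the observable completes after three values and there is nothing left to pair it with.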
SequenceEqual
There's another operator that processes pairs of items from two sources: SequenceEqual
. But instead of producing an output for each pair of inputs, this compares each pair, and ultimately produces a single value indicating whether every pair of inputs was equal or not.
In the case where the sources produce different values, SequenceEqual
produces a single false
value as soon as it detects this. But if the sources are equal, it can only report this when both have completed because until that happens, it doesn't yet know if there might be a difference coming later. Here's an example illustrating its behaviour:
var subject1 = new Subject<int>();
subject1.Subscribe(
i => Console.WriteLine($"subject1.OnNext({i})"),
() => Console.WriteLine("subject1 completed"));
var subject2 = new Subject<int>();
subject2.Subscribe(
i => Console.WriteLine($"subject2.OnNext({i})"),
() => Console.WriteLine("subject2 completed"));
var areEqual = subject1.SequenceEqual(subject2);
areEqual.Subscribe(
i => Console.WriteLine($"areEqual.OnNext({i})"),
() => Console.WriteLine("areEqual completed"));
subject1.OnNext(1);
subject1.OnNext(2);
subject2.OnNext(1);
subject2.OnNext(2);
subject2.OnNext(3);
subject1.OnNext(3);
subject1.OnCompleted();
subject2.OnCompleted();
Output:
subject1.OnNext(1)
subject1.OnNext(2)
subject2.OnNext(1)
subject2.OnNext(2)
subject2.OnNext(3)
subject1.OnNext(3)
subject1 completed
subject2 completed
areEqual.OnNext(True)
areEqual completed
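The example above covers the case where the sources match. For contrast, this sketch (again assuming the System.Reactive NuGet package and C# top-level statements) shows the other case, in which a mismatch is reported without waiting for either source to complete.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var left = new Subject<int>();
var right = new Subject<int>();

left.SequenceEqual(right).Subscribe(
    equal => Console.WriteLine($"areEqual.OnNext({equal})"),
    () => Console.WriteLine("areEqual completed"));

left.OnNext(1);
right.OnNext(1);  // First pair matches: nothing is reported yet.
left.OnNext(2);
right.OnNext(99); // Second pair differs: false is reported at this point.
left.OnNext(3);   // Ignored: the answer has already been delivered.

// Output:
// areEqual.OnNext(False)
// areEqual completed
```

Neither subject has called OnCompleted here, which shows that a single differing pair is enough for SequenceEqual to deliver its answer.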
CombineLatest
The CombineLatest
operator is similar to Zip
in that it combines pairs of items from its sources. However, instead of pairing the first items, then the second, and so on, CombineLatest
produces an output any time either of its inputs produces a new value. For each new value to emerge from an input, CombineLatest
uses that along with the most recently seen value from the other input. (To be precise, it doesn't produce anything until each input has produced at least one value, so if one input takes longer to get started than the other, there will be a period in which CombineLatest
doesn't in fact produce an output each time one of its inputs does, because it's waiting for the other to produce its first value.) The signature is as follows.
// Composes two observable sequences into one observable sequence by using the selector
// function whenever one of the observable sequences produces an element.
public static IObservable<TResult> CombineLatest<TFirst, TSecond, TResult>(
this IObservable<TFirst> first,
IObservable<TSecond> second,
Func<TFirst, TSecond, TResult> resultSelector)
{...}
The marble diagram below shows off usage of CombineLatest
with one sequence that produces numbers (s1
), and the other letters (s2
). If the resultSelector
function just joins the number and letter together as a pair, this would produce the result shown on the bottom line. I've colour coded each output to indicate which of the two sources caused it to emit that particular result, but as you can see, each output includes a value from each source.
If we slowly walk through the above marble diagram, we first see that s2
produces the letter 'a'. s1
has not produced any value yet so there is nothing to pair, meaning that no value is produced for the result. Next, s1
produces the number '1' so the result sequence can now produce a pair '1,a'. We then receive the number '2' from s1
. The last letter is still 'a' so the next pair is '2,a'. The letter 'b' is then produced creating the pair '2,b', followed by 'c' giving '2,c'. Finally the number 3 is produced and we get the pair '3,c'.
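The walkthrough above can be reproduced with a pair of subjects. This is a sketch assuming the System.Reactive NuGet package and C# top-level statements; the names s1 and s2 follow the marble diagram, and the result selector simply joins the two values with a comma.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var s1 = new Subject<int>();
var s2 = new Subject<char>();

s1.CombineLatest(s2, (number, letter) => $"{number},{letter}")
    .Subscribe(pair => Console.WriteLine(pair));

s2.OnNext('a'); // No output: s1 has not produced a value yet.
s1.OnNext(1);   // 1,a
s1.OnNext(2);   // 2,a
s2.OnNext('b'); // 2,b
s2.OnNext('c'); // 2,c
s1.OnNext(3);   // 3,c
```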
This is great in case you need to evaluate some combination of state which needs to be kept up-to-date when any single component of that state changes. A simple example would be a monitoring system. Each service is represented by a sequence that returns a Boolean indicating the availability of said service. The monitoring status is green if all services are available; we can achieve this by having the result selector perform a logical AND. Here is an example.
IObservable<bool> webServerStatus = GetWebStatus();
IObservable<bool> databaseStatus = GetDBStatus();
// Yields true when both systems are up.
var systemStatus = webServerStatus
.CombineLatest(
databaseStatus,
(webStatus, dbStatus) => webStatus && dbStatus);
You may have noticed that this method could produce a lot of duplicate values. For example, if the web server goes down the result sequence will yield 'false
'. If the database then goes down, another (unnecessary) 'false
' value will be yielded. This would be an appropriate time to use the DistinctUntilChanged
extension method. The corrected code would look like the example below.
// Yields true when both systems are up, and only on change of status
var systemStatus = webServerStatus
.CombineLatest(
databaseStatus,
(webStatus, dbStatus) => webStatus && dbStatus)
.DistinctUntilChanged();
Join
The Join
operator allows you to logically join two sequences. Whereas the Zip
operator would pair values from the two sequences based on their position within the sequence, the Join
operator allows you to join sequences based on when elements are emitted.
Since the production of a value by an observable source is logically an instantaneous event, joins use a model of intersecting windows. Recall that with the Window
operator, you can define the duration of each window using an observable sequence. The Join
operator uses a similar concept: for each source, we can define a time window over which each element is considered to be 'current' and two elements from different sources will be joined if their time windows overlap. As with the Zip
operator, we also need to provide a selector function to produce the result item from each pair of values. Here's the Join
operator:
public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>
(
this IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
Func<TLeft, TRight, TResult> resultSelector
)
This is a complex signature to try and understand in one go, so let's take it one parameter at a time.
IObservable<TLeft> left
is the first source sequence. IObservable<TRight> right
is the second source sequence. Join
is looking to produce pairs of items, with each pair containing one element from left
and one element from right
.
The leftDurationSelector
argument enables us to define the time window for each item from left
. A source item's time window begins when the source emits the item. To determine when the window for an item from left
should close, Join
will invoke the leftDurationSelector
, passing in the value just produced by left
. This selector must return an observable source. (It doesn't matter at all what the element type of this source is, because Join
is only interested in when it does things.) The item's time window ends as soon as the source returned for that item by leftDurationSelector
either produces a value or completes.
The rightDurationSelector
argument defines the time window for each item from right
. It works in exactly the same way as the leftDurationSelector
.
Initially, there are no current items. But as left
and right
produce items, these items' windows will start, so Join
might have multiple items all with their windows currently open. Each time left
produces a new item, Join
looks to see if any items from right
still have their windows open. If they do, left
is now paired with each of them. (So a single item from one source might be joined with multiple items from the other source.) Join
calls the resultSelector
for each such pairing. Likewise, each time right
produces an item, then if there are any currently open windows for items from left
, that new item from right
will be paired with each of these, and again, resultSelector
will be called for each such pairing.
The observable returned by Join
produces the result of each call to resultSelector
.
Let us now imagine a scenario where the left sequence produces values twice as fast as the right sequence. Imagine that in addition we never close the left windows; we could do this by always returning Observable.Never<Unit>()
from the leftDurationSelector
function. And imagine that we make the right windows close as soon as they possibly can, which we can achieve by making rightDurationSelector
return Observable.Empty<Unit>()
. The following marble diagram illustrates this:
Each time a left duration window intersects with a right duration window, we get an output. The right duration windows are all effectively of zero length, but this doesn't stop them from intersecting with the left duration windows, because those all never end. So the first item from right has a (zero-length) window that falls inside two of the windows for the left
items, and so Join
produces two results. I've stacked these vertically on the diagram to show that they happen at virtually the same time. Of course, the rules of IObserver<T>
mean that they can't actually happen at the same time: Join
has to wait until the consumer's OnNext
has finished processing 0,A
before it can go on to produce 1,A
. But it will produce all the pairs as quickly as possible any time a single event from one source overlaps with multiple windows for the other.
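Here is a sketch of that scenario, with subjects standing in for the two sources so that the order of events is explicit. It assumes the System.Reactive NuGet package and C# top-level statements; the values and the comma-joined result are chosen to mirror the labels used in the description above.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var left = new Subject<int>();
var right = new Subject<string>();

left.Join(
        right,
        _ => Observable.Never<Unit>(), // Left windows never close.
        _ => Observable.Empty<Unit>(), // Right windows close immediately.
        (l, r) => $"{l},{r}")
    .Subscribe(pair => Console.WriteLine(pair));

left.OnNext(0);
left.OnNext(1);
right.OnNext("A"); // Pairs with both open left windows: 0,A and 1,A
left.OnNext(2);    // No output: A's window has already closed.
left.OnNext(3);
right.OnNext("B"); // Pairs with all four open left windows: 0,B 1,B 2,B 3,B
```

Each item from right is paired with every left item seen so far, because none of the left windows ever closes. No left item is ever paired with an earlier right item, because each right window has closed by the time the next left item arrives.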
If I also immediately closed the left window by returning Observable.Empty<Unit>
, or perhaps Observable.Return(0)
, the windows would never overlap, so no pairs would ever get produced. (In theory if both left and right produce items at exactly the same time, then perhaps we might get a pair, but since the timing of events is never absolutely precise, it would be a bad idea to design a system that depended on this.)
What if I wanted to ensure that items from right
only ever intersected with a single value from left
? In that case, I'd need to ensure that the left durations did not overlap. One way to do that would be to have my leftDurationSelector
always return the same sequence that I passed as the left
sequence. This will result in Join
making multiple subscriptions to the same source, and for some kinds of sources that might introduce unwanted side effects, but the Publish
and RefCount
operators provide a way to deal with that, so this is in fact a reasonable strategy. If we do that, the results look more like this.
The last example is very similar to CombineLatest
, except that it is only producing a pair when the right sequence changes. We can easily make it work the same way by changing the right durations to work in the same way as the left durations. This code shows how (including the use of Publish
and RefCount
to ensure that we only get a single subscription to the underlying left
and right
sources despite providing them to Join
many times over).
public static IObservable<TResult> MyCombineLatest<TLeft, TRight, TResult>
(
IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, TRight, TResult> resultSelector
)
{
var refcountedLeft = left.Publish().RefCount();
var refcountedRight = right.Publish().RefCount();
return Observable.Join(
refcountedLeft,
refcountedRight,
value => refcountedLeft,
value => refcountedRight,
resultSelector);
}
Obviously there's no need to write this—you can just use the built-in CombineLatest
. (And that will be slightly more efficient because it has a specialized implementation.) But it shows that Join
is a powerful operator.
GroupJoin
When the Join
operator pairs up values whose windows overlap, it will pass the scalar values left and right to the resultSelector
. The GroupJoin
operator is based on the same concept of overlapping windows, but its selector works slightly differently: GroupJoin
still passes a single (scalar) value from the left source, but it passes an IObservable<TRight>
as the second argument. This argument represents all of the values from the right sequence that occur within the window for the particular left value for which it was invoked.
So this lacks the symmetry of Join
, because the left and right sources are handled differently. GroupJoin
will call the resultSelector
exactly once for each item produced by the left
source. When a left value's window overlaps with the windows of multiple right values, Join
would deal with that by calling the selector once for each such pairing, but GroupJoin
deals with this by having the observable passed as the second argument to resultSelector
emit each of the right items that overlap with that left item. (If a left item overlaps with nothing from the right, resultSelector
will still be called with that item, it'll just be passed an IObservable<TRight>
that doesn't produce any items.)
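As a rough sketch of this behaviour, the code below uses subjects for both sources, with left windows that never close and right windows that close immediately. It assumes the System.Reactive NuGet package and C# top-level statements. The tuple returned from the result selector, and its Left and Rights names, are just one possible choice made for this illustration.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var left = new Subject<int>();
var right = new Subject<string>();

left.GroupJoin(
        right,
        _ => Observable.Never<Unit>(), // Left windows never close.
        _ => Observable.Empty<Unit>(), // Right windows close immediately.
        // Called once per left item; 'rights' is an IObservable<string>.
        (l, rights) => (Left: l, Rights: rights))
    .Subscribe(group =>
    {
        Console.WriteLine($"New group for left item {group.Left}");
        // Each group reports the right items that overlap its left item.
        group.Rights.Subscribe(r => Console.WriteLine($"  {group.Left},{r}"));
    });

left.OnNext(0);
left.OnNext(1);
right.OnNext("A"); // Delivered to the groups for 0 and 1.
left.OnNext(2);
left.OnNext(3);
right.OnNext("B"); // Delivered to the groups for 0, 1, 2 and 3.
```

The result selector runs exactly four times here, once per left item, and the individual pairings arrive through the nested observables rather than through separate selector calls.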
The GroupJoin
signature is very similar to Join
, but note the difference in the resultSelector
parameter.
public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>
(
this IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
Func<TLeft, IObservable<TRight>, TResult> resultSelector
)
If we went back to our first Join
example where we had
- the left producing values twice as fast as the right,
- the left never expiring
- the right immediately expiring
This diagram shows those same inputs again, and also shows the observables GroupJoin
would pass to the resultSelector
for each of the items produced by left
:
This produces events corresponding to all of the same events that Join
produced, they're just distributed across six different IObservable<TRight>
sources. It may have occurred to you that with GroupJoin
you could effectively re-create your own Join
method by doing something like this:
public IObservable<TResult> MyJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
IObservable<TLeft> left,
IObservable<TRight> right,
Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
Func<TLeft, TRight, TResult> resultSelector)
{
return Observable.GroupJoin
(
left,
right,
leftDurationSelector,
rightDurationSelector,
(leftValue, rightValues) =>
rightValues.Select(rightValue=>resultSelector(leftValue, rightValue))
)
.Merge();
}
You could even create a crude version of Window
with this code:
public IObservable<IObservable<T>> MyWindow<T>(IObservable<T> source, TimeSpan windowPeriod)
{
return Observable.Create<IObservable<T>>(o =>
{
var sharedSource = source
.Publish()
.RefCount();
var intervals = Observable.Return(0L)
.Concat(Observable.Interval(windowPeriod))
.TakeUntil(sharedSource.TakeLast(1))
.Publish()
.RefCount();
return intervals.GroupJoin(
sharedSource,
_ => intervals,
_ => Observable.Empty<Unit>(),
(left, sourceValues) => sourceValues)
.Subscribe(o);
});
}
Rx delivers yet another way to query data in motion by allowing you to interrogate sequences of coincidence. This enables you to solve the intrinsically complex problem of managing state and concurrency while performing matching from multiple sources. By encapsulating these low level operations, you are able to leverage Rx to design your software in an expressive and testable fashion. Using the Rx operators as building blocks, your code effectively becomes a composition of many simple operators. This allows the complexity of the domain code to be the focus, not the otherwise incidental supporting code.
And-Then-When
Zip
as used above takes just two sequences as input. If you need to combine more than that, then one option is a combination of the three And
/Then
/When
methods. These methods are used slightly differently from most of the other Rx methods. Out of these three, And
is the only extension method to IObservable<T>
. Unlike most Rx operators, it does not return a sequence; instead, it returns the mysterious type Pattern<T1, T2>
. The Pattern<T1, T2>
type is public (obviously), but all of its properties are internal. The only two (useful) things you can do with a Pattern<T1, T2>
are invoking its And
or Then
methods. The And
method called on the Pattern<T1, T2>
returns a Pattern<T1, T2, T3>
. On that type, you will also find the And
and Then
methods. The generic Pattern
types are there to allow you to chain multiple And
methods together, each one extending the generic type parameter list by one. You then bring them all together with the Then
method overloads. The Then
methods return you a Plan
type. Finally, you pass this Plan
to the Observable.When
method in order to create your sequence.
It may sound very complex, but comparing some code samples should make it easier to understand. It will also allow you to see which style you prefer to use.
To Zip
three sequences together, you can either use Zip
methods chained together like this:
IObservable<long> one = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
IObservable<long> two = Observable.Interval(TimeSpan.FromMilliseconds(250)).Take(10);
IObservable<long> three = Observable.Interval(TimeSpan.FromMilliseconds(150)).Take(14);
// lhs represents 'Left Hand Side'
// rhs represents 'Right Hand Side'
IObservable<(long One, long Two, long Three)> zippedSequence = one
.Zip(two, (lhs, rhs) => (One: lhs, Two: rhs))
.Zip(three, (lhs, rhs) => (lhs.One, lhs.Two, Three: rhs));
zippedSequence.Subscribe(
v => Console.WriteLine($"One: {v.One}, Two: {v.Two}, Three: {v.Three}"),
() => Console.WriteLine("Completed"));
Or perhaps use the nicer syntax of the And
/Then
/When
:
Pattern<long, long, long> pattern =
one.And(two).And(three);
Plan<(long One, long Two, long Three)> plan =
pattern.Then((first, second, third) => (One: first, Two: second, Three: third));
IObservable<(long One, long Two, long Three)> zippedSequence = Observable.When(plan);
zippedSequence.Subscribe(
v => Console.WriteLine($"One: {v.One}, Two: {v.Two}, Three: {v.Three}"),
() => Console.WriteLine("Completed"));
This can be further reduced, if you prefer, to:
IObservable<(long One, long Two, long Three)> zippedSequence = Observable.When(
one.And(two).And(three)
.Then((first, second, third) =>
(One: first, Two: second, Three: third))
);
zippedSequence.Subscribe(
v => Console.WriteLine($"One: {v.One}, Two: {v.Two}, Three: {v.Three}"),
() => Console.WriteLine("Completed"));
The And
/Then
/When
trio has more overloads that enable you to group an even greater number of sequences. They also allow you to provide more than one 'plan' (the output of the Then
method). This gives you the Merge
feature but on the collection of 'plans'. I would suggest playing around with them if this functionality is of interest to you. The verbosity of enumerating all of the combinations of these methods would be of low value. You will get far more value out of using them and discovering for yourself.
Summary
This chapter covered a set of methods that allow us to combine observable sequences. This brings us to a close on Part 2. We've looked at the operators that are mostly concerned with defining the computations we want to perform on the data. In Part 3 we will move onto practical concerns such as managing scheduling, side effects, and error handling.