Creating Observable Sequences
In the preceding chapter, we saw the two fundamental Rx interfaces, IObservable<T>
and IObserver<T>
. We also saw how to receive events by implementing IObserver<T>
, and also by using implementations supplied by the System.Reactive
package. In this chapter we'll see how to create IObservable<T>
sources to represent source events of interest in your application.
We will begin by implementing IObservable<T>
directly. In practice, it's relatively unusual to do that, so we'll then look at the various ways you can get System.Reactive
to supply an implementation that does most of the work for you.
A Very Basic IObservable<T>
Implementation
Here's an implementation of an IObservable<int>
that produces a sequence of numbers:
public class MySequenceOfNumbers : IObservable<int>
{
public IDisposable Subscribe(IObserver<int> observer)
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
return System.Reactive.Disposables.Disposable.Empty; // Handy do-nothing IDisposable
}
}
We can test this by constructing an instance of it, and then subscribing to it:
var numbers = new MySequenceOfNumbers();
numbers.Subscribe(
number => Console.WriteLine($"Received value: {number}"),
() => Console.WriteLine("Sequence terminated"));
This produces the following output:
Received value 1
Received value 2
Received value 3
Sequence terminated
Although MySequenceOfNumbers
is technically a correct implementation of IObservable<int>
, it is a little too simple to be useful. For one thing, we typically use Rx when there are events of interest, but this is not really reactive at all—it just produces a fixed set of numbers immediately. Moreover, the implementation is blocking—it doesn't even return from Subscribe
until after it has finished producing all of its values. This example illustrates the basics of how a source provides events to a subscriber, but if we just want to represent a predetermined sequence of numbers, we might as well use an IEnumerable<T>
implementation such as List<T>
or an array.
Representing Filesystem Events in Rx
Let's look at something a little more realistic. This is a wrapper around .NET's FileSystemWatcher
, presenting filesystem change notifications as an IObservable<FileSystemEventArgs>
. (Note: this is not necessarily the best design for an Rx FileSystemWatcher
wrapper. The watcher provides events for several different types of change, and one of them, Renamed
, provides details as a RenamedEventArgs
. This derives from FileSystemEventArgs
so collapsing everything down to a single event stream does work, but this would be inconvenient for applications that wanted access to the details of rename events. A more serious design problem is that this is incapable of reporting more than one event from FileSystemWatcher.Error
. Such errors might be transient and recoverable, in which case an application might want to continue operating, but since this class chooses to represent everything with a single IObservable<T>
, it reports errors by invoking the observer's OnError
, at which point the rules of Rx oblige us to stop. It would be possible to work around this with Rx's Retry
operator, which can automatically resubscribe after an error, but it might be better to offer a separate IObservable<ErrorEventArgs>
so that we can report errors in a non-terminating way. However, the additional complication of that won't always be warranted. The simplicity of this design means it will be a good fit for some applications. As is often the way with software design, there isn't a one-size-fits-all approach.)
// Represents filesystem changes as an Rx observable sequence.
// NOTE: this is an oversimplified example for illustration purposes.
// It does not handle multiple subscribers efficiently, it does not
// use IScheduler, and it stops immediately after the first error.
public class RxFsEvents : IObservable<FileSystemEventArgs>
{
private readonly string folder;
public RxFsEvents(string folder)
{
this.folder = folder;
}
public IDisposable Subscribe(IObserver<FileSystemEventArgs> observer)
{
// Inefficient if we get multiple subscribers.
FileSystemWatcher watcher = new(this.folder);
// FileSystemWatcher's documentation says nothing about which thread
// it raises events on (unless you use its SynchronizationObject,
// which integrates well with Windows Forms, but is inconvenient for
// us to use here) nor does it promise to wait until we've
// finished handling one event before it delivers the next. The Mac,
// Windows, and Linux implementations are all significantly different,
// so it would be unwise to rely on anything not guaranteed by the
// documentation. (As it happens, the Win32 implementation on .NET 7
// does appear to wait until each event handler returns before
// delivering the next event, so we probably would get way with
// ignoring this issue. For now. On Windows. And actually the Linux
// implementation dedicates a single thread to this job, but there's
// a comment in the source code saying that this should probably
// change - another reason to rely only on documented behaviour.)
// So it's our problem to ensure we obey the rules of IObserver<T>.
// First, we need to make sure that we only make one call at a time
// into the observer. A more realistic example would use an Rx
// IScheduler, but since we've not explained what those are yet,
// we're just going to use lock with this object.
object sync = new();
// More subtly, the FileSystemWatcher documentation doesn't make it
// clear whether we might continue to get a few more change events
// after it has reported an error. Since there are no promises about
// threads, it's possible that race conditions exist that would lead to
// us trying to handle an event from a FileSystemWatcher after it has
// reported an error. So we need to remember if we've already called
// OnError to make sure we don't break the IObserver<T> rules in that
// case.
bool onErrorAlreadyCalled = false;
void SendToObserver(object _, FileSystemEventArgs e)
{
lock (sync)
{
if (!onErrorAlreadyCalled)
{
observer.OnNext(e);
}
}
}
watcher.Created += SendToObserver;
watcher.Changed += SendToObserver;
watcher.Renamed += SendToObserver;
watcher.Deleted += SendToObserver;
watcher.Error += (_, e) =>
{
lock (sync)
{
// The FileSystemWatcher might report multiple errors, but
// we're only allowed to report one to IObservable<T>.
if (!onErrorAlreadyCalled)
{
observer.OnError(e.GetException());
onErrorAlreadyCalled = true;
watcher.Dispose();
}
}
};
watcher.EnableRaisingEvents = true;
return watcher;
}
}
That got more complex fast. This illustrates that IObservable<T>
implementations are responsible for obeying the IObserver<T>
rules. This is generally a good thing: it keeps the messy concerns around concurrency contained in a single place. Any IObserver<FileSystemEventArgs>
that I subscribe to this RxFsEvents
doesn't have to worry about concurrency, because it can count on the IObserver<T>
rules, which guarantee that it will only have to handle one thing at a time. If I hadn't been required to enforce these rules in the source, it might have made my RxFsEvents
class simpler, but all of that complexity of dealing with overlapping events would have spread out into the code that handles the events. Concurrency is hard enough to deal with when its effects are contained. Once it starts to spread across multiple types, it can become almost impossible to reason about. Rx's IObserver<T>
rules prevent this from happening.
(Note: this is a significant feature of Rx. The rules keep things simple for observers. This becomes increasingly important as the complexity of your event sources or event process grows.)
There are a couple of issues with this code (aside from the API design issues already mentioned). One is that when IObservable<T>
implementations produce events modelling real-life asynchronous activity (such as filesystem changes) applications will often want some way to take control over which threads notifications arrive on. For example, UI frameworks tend to have thread affinity requirements. You typically need to be on a particular thread to be allowed to update the user interface. Rx provides mechanisms for redirecting notifications onto different schedulers, so we can work around it, but we would normally expect to be able to provide this sort of observer with an IScheduler
, and for it to deliver notifications through that. We'll discuss schedulers in later chapters.
The other issue is that this does not deal with multiple subscribers efficiently. You're allowed to call IObservable<T>.Subscribe
multiple times, and if you do that with this code, it will create a new FileSystemWatcher
each time. That could happen more easily than you might think. Suppose we had an instance of this watcher, and wanted to handle different events in different ways. We might use the Where
operator to define observable sources that split events up in the way we want:
IObservable<FileSystemEventArgs> configChanges =
fs.Where(e => Path.GetExtension(e.Name) == ".config");
IObservable<FileSystemEventArgs> deletions =
fs.Where(e => e.ChangeType == WatcherChangeTypes.Deleted);
When you call Subscribe
on the IObservable<T>
returned by the Where
operator, it will call Subscribe
on its input. So in this case, if we call Subscribe
on both configChanges
and deletions
, that will result in two calls to Subscribe
on fs
. So if fs
is an instance of our RxFsEvents
type above, each one will construct its own FileSystemEventWatcher
, which is inefficient.
Rx offers a few ways to deal with this. It provides operators designed specifically to take an IObservable<T>
that does not tolerate multiple subscribers and wrap it in an adapter that can:
IObservable<FileSystemEventArgs> fs =
new RxFsEvents(@"c:\temp")
.Publish()
.RefCount();
But this is leaping ahead. (These operators are described in the Publishing Operators chapter.) If you want to build a type that is inherently multi-subscriber-friendly, all you really need to do is keep track of all your subscribers and notify each of them in a loop. Here's a modified version of the filesystem watcher:
public class RxFsEventsMultiSubscriber : IObservable<FileSystemEventArgs>
{
private readonly object sync = new();
private readonly List<Subscription> subscribers = new();
private readonly FileSystemWatcher watcher;
public RxFsEventsMultiSubscriber(string folder)
{
this.watcher = new FileSystemWatcher(folder);
watcher.Created += SendEventToObservers;
watcher.Changed += SendEventToObservers;
watcher.Renamed += SendEventToObservers;
watcher.Deleted += SendEventToObservers;
watcher.Error += SendErrorToObservers;
}
public IDisposable Subscribe(IObserver<FileSystemEventArgs> observer)
{
Subscription sub = new(this, observer);
lock (this.sync)
{
this.subscribers.Add(sub);
if (this.subscribers.Count == 1)
{
// We had no subscribers before, but now we've got one so we need
// to start up the FileSystemWatcher.
watcher.EnableRaisingEvents = true;
}
}
return sub;
}
private void Unsubscribe(Subscription sub)
{
lock (this.sync)
{
this.subscribers.Remove(sub);
if (this.subscribers.Count == 0)
{
watcher.EnableRaisingEvents = false;
}
}
}
void SendEventToObservers(object _, FileSystemEventArgs e)
{
lock (this.sync)
{
foreach (var subscription in this.subscribers)
{
subscription.Observer.OnNext(e);
}
}
}
void SendErrorToObservers(object _, ErrorEventArgs e)
{
Exception x = e.GetException();
lock (this.sync)
{
foreach (var subscription in this.subscribers)
{
subscription.Observer.OnError(x);
}
this.subscribers.Clear();
}
}
private class Subscription : IDisposable
{
private RxFsEventsMultiSubscriber? parent;
public Subscription(
RxFsEventsMultiSubscriber rxFsEventsMultiSubscriber,
IObserver<FileSystemEventArgs> observer)
{
this.parent = rxFsEventsMultiSubscriber;
this.Observer = observer;
}
public IObserver<FileSystemEventArgs> Observer { get; }
public void Dispose()
{
this.parent?.Unsubscribe(this);
this.parent = null;
}
}
}
This creates only a single FileSystemWatcher
instance no matter how many times Subscribe
is called. Notice that I've had to introduce a nested class to provide the IDisposable
that Subscribe
returns. I didn't need that with the very first IObservable<T>
implementation in this chapter because it had already completed the sequence before returning, so it was able to return the Disposable.Empty
property conveniently supplied by Rx. (This is handy in cases where you're obliged to supply an IDisposable
, but you don't actually need to do anything when disposed.) And in my first FileSystemWatcher
wrapper, RxFsEvents
, I just returned the FileSystemWatcher
itself from Dispose
. (This works because FileSystemWatcher.Dispose
shuts down the watcher, and each subscriber was given its own FileSystemWatcher
.) But now that a single FileSystemWatcher
supports multiple observers, we need to do a little more work when an observer unsubscribes.
When a Subscription
instance that we returned from Subscribe
gets disposed, it removes itself from the list of subscribers, ensuring that it won't receive any more notifications. It also sets the FileSystemWatcher
's EnableRaisingEvents
to false if there are no more subscribers, ensuring that this source does not do unnecessary work if nothing needs notifications right now.
This is looking more realistic than the first example. This is truly a source of events that could occur at any moment (making this exactly the sort of thing well suited to Rx) and it now handles multiple subscribers intelligently. However, we wouldn't often write things this way. We're doing all the work ourselves here—this code doesn't even require a reference to the System.Reactive
package because the only Rx types it refers to are IObservable<T>
and IObserver<T>
, both of which are built into the .NET runtime libraries. In practice we typically defer to helpers in System.Reactive
because they can do a lot of work for us.
For example, suppose we only cared about Changed
events. We could write just this:
FileSystemWatcher watcher = new (@"c:\temp");
IObservable<FileSystemEventArgs> changes = Observable
.FromEventPattern<FileSystemEventArgs>(watcher, nameof(watcher.Changed))
.Select(ep => ep.EventArgs);
watcher.EnableRaisingEvents = true;
Here we're using the FromEventPattern
helper from the System.Reactive
library's Observable
class, which can be used to build an IObservable<T>
from any .NET event that conforms to the normal pattern (in which event handlers take two arguments: a sender of type object
, and then some EventArgs
-derived type containing information about the event). This is not as flexible as the earlier example. It reports only one of the events, and we have to manually start (and, if necessary stop) the FileSystemWatcher
. But for some applications that will be good enough, and this is a lot less code to write. If we were aiming to write a fully-featured wrapper for FileSystemWatcher
suitable for many different scenarios, it might be worth writing a specialized IObservable<T>
implementation as shown earlier. (We could easily extend this last example to watch all of the events. We'd just use the FromEventPattern
once for each event, and then use Observable.Merge
to combine the four resulting observables into one. The only real benefit we're getting from a full custom implementation is that we can automatically start and stop the FileSystemWatcher
depending on whether there are currently any observers.) But if we just need to represent some events as an IObservable<T>
so that we can work with them in our application, we can just use this simpler approach.
In practice, we almost always get System.Reactive
to implement IObservable<T>
for us. Even if we want to take control of certain aspects (such as automatically starting up and shutting down the FileSystemWatcher
in these examples) we can almost always find a combination of operators that enable this. The following code uses various methods from System.Reactive
to return an IObservable<FileSystemEventArgs>
that has all the same functionality as the fully-featured hand-written RxFsEventsMultiSubscriber
above, but with considerably less code.
IObservable<FileSystemEventArgs> ObserveFileSystem(string folder)
{
return
// Observable.Defer enables us to avoid doing any work
// until we have a subscriber.
Observable.Defer(() =>
{
FileSystemWatcher fsw = new(folder);
fsw.EnableRaisingEvents = true;
return Observable.Return(fsw);
})
// Once the preceding part emits the FileSystemWatcher
// (which will happen when someone first subscribes), we
// want to wrap all the events as IObservable<T>s, for which
// we'll use a projection. To avoid ending up with an
// IObservable<IObservable<FileSystemEventArgs>>, we use
// SelectMany, which effectively flattens it by one level.
.SelectMany(fsw =>
Observable.Merge(new[]
{
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Created += h, h => fsw.Created -= h),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Changed += h, h => fsw.Changed -= h),
Observable.FromEventPattern<RenamedEventHandler, FileSystemEventArgs>(
h => fsw.Renamed += h, h => fsw.Renamed -= h),
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Deleted += h, h => fsw.Deleted -= h)
})
// FromEventPattern supplies both the sender and the event
// args. Extract just the latter.
.Select(ep => ep.EventArgs)
// The Finally here ensures the watcher gets shut down once
// we have no subscribers.
.Finally(() => fsw.Dispose()))
// This combination of Publish and RefCount means that multiple
// subscribers will get to share a single FileSystemWatcher,
// but that it gets shut down if all subscribers unsubscribe.
.Publish()
.RefCount();
}
I've used a lot of methods there, most of which I've not talked about before. For that example to make any sense, I clearly need to start describing the numerous ways in which the System.Reactive
package can implement IObservable<T>
for you.
Simple factory methods
Due to the large number of methods available for creating observable sequences, we will break them down into categories. Our first category of methods create IObservable<T>
sequences that produce at most a single result.
Observable.Return
One of the simplest factory methods is Observable.Return<T>(T value)
, which you've already seen in the Quiescent
example in the preceding chapter. This method takes a value of type T
and returns an IObservable<T>
which will produce this single value and then complete. In a sense, this wraps a value in an IObservable<T>
; it's conceptually similar to writing new T[] { value }
, in that it's a sequence containing just one element. You could also think of it as being the Rx equivalent of Task.FromResult
, which you can use when you have a value of some type T
, and need to pass it to something that wants a Task<T>
.
IObservable<string> singleValue = Observable.Return<string>("Value");
I specified the type parameter for clarity, but this is not necessary as the compiler can infer the type from argument provided:
IObservable<string> singleValue = Observable.Return("Value");
Return
produces a cold observable: each subscriber will receive the value immediately upon subscription. (Hot and cold observables were described in the preceding chapter.)
Observable.Empty
Sometimes it can be useful to have an empty sequence. .NET's Enumerable.Empty<T>()
does this for IEnumerable<T>
, and Rx has a direct equivalent in the form of Observable.Empty<T>()
, which returns an empty IObservable<T>
. We need to provide the type argument because there's no value from which the compiler can infer the type.
IObservable<string> empty = Observable.Empty<string>();
In practice, an empty sequence is one that immediately calls OnCompleted
on any subscriber.
In comparison with IEnumerable<T>
, this is just the Rx equivalent of an empty list, but there's another way to look at it. Rx is a powerful way to model asynchronous processes, so you could think of this as being similar to a task that completes immediately without producing any result—so it has a conceptual resemblance to Task.CompletedTask
. (This is not as close an analogy as that between Observable.Return
and Task.FromResult
, because in that case we're comparing an IObservable<T>
with a Task<T>
, whereas here we're comparing an IObservable<T>
with a Task
—the only way for a task to complete without producing anything is if we use the non-generic version of Task
.)
Observable.Never
The Observable.Never<T>()
method returns a sequence which, like Empty
, does not produce any values, but unlike Empty
, it never ends. In practice, that means that it never invokes any method (neither OnNext
, OnCompleted
, nor OnError
) on subscribers. Whereas Observable.Empty<T>()
completes immediately, Observable.Never<T>
has infinite duration.
IObservable<string> never = Observable.Never<string>();
It might not seem obvious why this could be useful. I gave one possible use in the last chapter: you could use this in a test to simulate a source that wasn't producing any values, perhaps to enable your test to validate timeout logic.
It can also be used in places where we use observables to represent time-based information. Sometimes we don't actually care what emerges from an observable; we might care only when something (anything) happens. (We saw an example of this "observable sequence used purely for timing purposes" concept in the preceding chapter, although Never
wouldn't make sense in that particular scenario. The Quiescent
example used the Buffer
operator, which works over two observable sequences: the first contains the items of interest, and the second is used purely to determine how to cut the first into chunks. Buffer
doesn't do anything with the values produced by the second observable: it pays attention only to when values emerge, completing the previous chunk each time the second observable produces a value. And if we're representing temporal information it can sometimes be useful to have a way to represent the idea that some event never occurs.)
As an example of where you might want to use Never
for timing purposes, suppose you were using some Rx-based library that offered a timeout mechanism, where an operation would be cancelled when some timeout occurs, and the timeout is itself modelled as an observable sequence. If for some reason you didn't want a timeout, and just want to wait indefinitely, you could specify a timeout of Observable.Never
.
Observable.Throw
Observable.Throw<T>(Exception)
returns a sequence that immediately reports an error to any subscriber. As with Empty
and Never
, we don't supply a value to this method (just an exception) so we need to provide a type parameter so that it knows what T
to use in the IObservable<T>
that it returns. (It will never actually a produce a T
, but you can't have an instance of IObservable<T>
without picking some particular type for T
.)
IObservable<string> throws = Observable.Throw<string>(new Exception());
Observable.Create
The Create
factory method is more powerful than the other creation methods because it can be used to create any kind of sequence. You could implement any of the preceding four methods with Observable.Create
.
The method signature itself may seem more complex than necessary at first, but becomes quite natural once you are used to it.
// Creates an observable sequence from a specified Subscribe method implementation.
public static IObservable<TSource> Create<TSource>(
Func<IObserver<TSource>, IDisposable> subscribe)
{...}
public static IObservable<TSource> Create<TSource>(
Func<IObserver<TSource>, Action> subscribe)
{...}
You provide this with a delegate that will be executed each time a subscription is made. Your delegate will be passed an IObserver<T>
. Logically speaking, this represents the observer passed to the Subscribe
method, although in practice Rx puts a wrapper around that for various reasons. You can call the OnNext
/OnError
/OnCompleted
methods as you need. This is one of the few scenarios where you will work directly with the IObserver<T>
interface. Here's a simple example that produces three items:
private IObservable<int> SomeNumbers()
{
return Observable.Create<int>(
(IObserver<int> observer) =>
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
return Disposable.Empty;
});
}
Your delegate must return either an IDisposable
or an Action
to enable unsubscription. When the subscriber disposes their subscription in order to unsubscribe, Rx will invoke Dispose()
on the IDisposable
you returned, or in the case where you returned an Action
, it will invoke that.
This example is reminiscent of the MySequenceOfNumbers
example from the start of this chapter, in that it immediately produces a few fixed values. The main difference in this case is that Rx adds some wrappers that can handle awkward situations such as re-entrancy. Rx will sometimes automatically defer work to prevent deadlocks, so it's possible that code consuming the IObservable<int>
returned by this method will see a call to Subscribe
return before the callback in the code above runs, in which case it would be possible for them to unsubscribe inside their OnNext
handler.
The following sequence diagram shows how this could occur in practice. Suppose the IObservable<int>
returned by SomeNumbers
has been wrapped by Rx in a way that ensures that subscription occurs in some different execution context. We'd typically determine the context by using a suitable scheduler. (The SubscribeOn
operator creates such a wrapper.) We might use the TaskPoolScheduler
in order to ensure that the subscription occurs on some task pool thread. So when our application code calls Subscribe
, the wrapper IObservable<int>
doesn't immediately subscribe to the underlying observable. Instead it queues up a work item with the scheduler to do that, and then immediately returns without waiting for that work to run. This is how our subscriber can be in possession of an IDisposable
representing the subscription before Observable.Create
invokes our callback. The diagram shows the subscriber then making this available to the observer.
The diagram shows the scheduler call Subscribe
on the underlying observable after this, and that will mean the call back we passed to Observable.Create<int>
will now run. Our callback calls OnNext
, but it is not passed the real observer: instead it is passed another Rx-generated wrapper. That wrapper initially forwards calls directly onto the real observer, but our diagram shows that when the real observer (all the way over on the right) receives the its second call (OnNext(2)
) it unsubscribes by calling Dispose
on the IDisposable
that was returned when we subscribed to the Rx IObservable
wrapper. The two wrappers here—the IObservable
and IObserver
wrappers—are connected, so when we unsubscribe from the IObservable
wrapper, it tells the IObserver
wrapper that the subscription is being shut down. This means that when our Observable.Create<int>
callback calls OnNext(3)
on the IObserver
wrapper, that wrapper does not forward it to the real observer, because it knows that that observer has already unsubscribed. (It also doesn't forward the OnCompleted
, for the same reason.)
You might be wondering how the IDisposable
we return to Observable.Create
can ever do anything useful. It's the return value of the callback, so we can only return it to Rx as the last thing our callback does. Won't we always have finished our work by the time we return, meaning there's nothing to cancel? Not necessarily—we might kick off some work that continues to run after we return. This next example does that, meaning that the unsubscription action it returns is able to do something useful: it sets a cancellation token that is being observed by the loop that generates our observable's output. (This returns a callback instead of an IDisposable
—Observable.Create
offers overloads that let you do either. In this case, Rx will invoke our callback when the subscription is terminated early.)
IObservable<char> KeyPresses() =>
Observable.Create<char>(observer =>
{
CancellationTokenSource cts = new();
Task.Run(() =>
{
while (!cts.IsCancellationRequested)
{
ConsoleKeyInfo ki = Console.ReadKey();
observer.OnNext(ki.KeyChar);
}
});
return () => cts.Cancel();
});
This illustrates how cancellation won't necessarily take effect immediately. The Console.ReadKey
API does not offer an overload accepting a CancellationToken
, so this observable won't be able to detect that cancellation is requested until the user next presses a key, causing ReadKey
to return.
Bearing in mind that cancellation might have been requested while we were waiting for ReadKey
to return, you might think we should check for that after ReadKey
returns and before calling OnNext
. In fact it doesn't matter if we don't. Rx has a rule that says an observable source must not call into an observer after a call to Dispose
on that observer's subscription returns. To enforce that rule, if the callback you pass to Observable.Create
continues to call methods on its IObserver<T>
after a request to unsubscribe, Rx just ignores the call. This is one reason why the IObserver<T>
it passes to you is a wrapper: it can intercept the calls before they are passed to the underlying observer. However, that convenience means there are two important things to be aware of
- if you do ignore attempts to unsubscribe and continue to do work to produce items, you are just wasting time because nothing will receive those items
- if you call
OnError
it's possible that nothing is listening and that the error will be completely ignored.
There are overloads of Create
designed to support async
methods. This next method exploits this to be able to use the asynchronous ReadLineAsync
method to present lines of text from a file as an observable source.
IObservable<string> ReadFileLines(string path) =>
Observable.Create<string>(async (observer, cancellationToken) =>
{
using (StreamReader reader = File.OpenText(path))
{
while (cancellationToken.IsCancellationRequested)
{
string? line = await reader.ReadLineAsync(cancellationToken).ConfigureAwait(false);
if (line is null)
{
break;
}
observer.OnNext(line);
}
observer.OnCompleted();
}
});
Reading data from a storage device typically doesn't happen instantaneously (unless it happens to be in the filesystem cache already), so this source will provide data as quickly as it can be read from storage.
Notice that because this is an async
method, it will typically return to its caller before it completes. (The first await
that actually has to wait returns, and the remainder of the method runs via a callback when the work completes.) That means that subscribers will typically be in possession of the IDisposable
representing their subscription before this method finishes, so we're using a different mechanism to handle unsubscription here. This particular overload of Create
passes its callback not just an IObserver<T>
but also a CancellationToken
, with which it will request cancellation when unsubscription occurs.
File IO can encounter errors. The file we're looking for might not exist, or we might be unable to open it due to security restrictions, or because some other application is using it. The file might be on a remote storage server, and we could lose network connectivity. For this reason, we must expect exceptions from such code. This example has done nothing to detect exceptions, and yet the IObservable<string>
that this ReadFileLines
method returns will in fact report any exceptions that occur. This is because the Create
method will catch any exception that emerges from our callback and report it with OnError
. (If our code already called OnComplete
on the observer, Rx won't call OnError
because that would violate the rules. Instead it will silently drop the exception, so it's best not to attempt to do any work after you call OnCompleted
.)
This automatic exception delivery is another example of why the Create
factory method is the preferred way to implement custom observable sequences. It is almost always a better option than creating custom types that implement the IObservable<T>
interface. This is not just because it saves you some time. It's also that Rx tackles the intricacies that you may not think of such as thread safety of notifications and disposal of subscriptions.
The Create
method entails lazy evaluation, which is a very important part of Rx. It opens doors to other powerful features such as scheduling and combination of sequences that we will see later. The delegate will only be invoked when a subscription is made. So in the ReadFileLines
example, it won't attempt to open the file until you subscribe to the IObservable<string>
that is returned. If you subscribe multiple times, it will execute the callback each time. (So if the file has changed, you can retrieve the latest contents by calling Subscribe
again.)
As an exercise, try to build the Empty
, Return
, Never
& Throw
extension methods yourself using the Create
method. If you have Visual Studio or LINQPad available to you right now, code it up as quickly as you can, or if you have Visual Studio Code, you could create a new Polyglot Notebook. (Polyglot Notebooks make Rx available automatically, so you can just write a C# cell with a suitable using
directive, and you're up and running.) If you don't (perhaps you are on the train on the way to work), try to conceptualize how you would solve this problem.
You completed that last step before moving onto this paragraph, right? Because you can now compare your versions with these examples of Empty
, Return
, Never
and Throw
recreated with Observable.Create
:
public static IObservable<T> Empty<T>()
{
return Observable.Create<T>(o =>
{
o.OnCompleted();
return Disposable.Empty;
});
}
public static IObservable<T> Return<T>(T value)
{
return Observable.Create<T>(o =>
{
o.OnNext(value);
o.OnCompleted();
return Disposable.Empty;
});
}
public static IObservable<T> Never<T>()
{
return Observable.Create<T>(o =>
{
return Disposable.Empty;
});
}
public static IObservable<T> Throws<T>(Exception exception)
{
return Observable.Create<T>(o =>
{
o.OnError(exception);
return Disposable.Empty;
});
}
You can see that Observable.Create
provides the power to build our own factory methods if we wish.
Observable.Defer
One very useful aspect of Observable.Create
is that it provides a place to put code that should run only when subscription occurs. Often, libraries will make IObservable<T>
properties available that won't necessarily be used by all applications, so it can be useful to defer the work involved until you know you will really need it. This deferred initialization is inherent to how Observable.Create
works, but what if the nature of our source means that Observable.Create
is not a good fit? How can we perform deferred initialization in that case? Rx providers Observable.Defer
for this purpose.
I've already used Defer
once. The ObserveFileSystem
method returned an IObservable<FileSystemEventArgs>
reporting changes in a folder. It was not a good candidate for Observable.Create
because it provided all the notifications we wanted as .NET events, so it made sense to use Rx's event adaptation features. But we still wanted to defer the creation of the FileSystemWatcher
until the moment of subscription, which is why that example used Observable.Defer
.
Observable.Defer
takes a callback that returns an IObservable<T>
, and Defer
wraps this with an IObservable<T>
that invokes that callback upon subscription. To show the effect, I'm first going to show an example that does not use Defer
:
static IObservable<int> WithoutDeferal()
{
Console.WriteLine("Doing some startup work...");
return Observable.Range(1, 3);
}
Console.WriteLine("Calling factory method");
IObservable<int> s = WithoutDeferal();
Console.WriteLine("First subscription");
s.Subscribe(Console.WriteLine);
Console.WriteLine("Second subscription");
s.Subscribe(Console.WriteLine);
This produces the following output:
Calling factory method
Doing some startup work...
First subscription
1
2
3
Second subscription
1
2
3
As you can see, the "Doing some startup work...
message appears when we call the factory method, and before we've subscribed. So if nothing ever subscribed to the IObservable<int>
that method returns, the work would be done anyway, wasting time and energy. Here's the Defer
version:
static IObservable<int> WithDeferal()
{
return Observable.Defer(() =>
{
Console.WriteLine("Doing some startup work...");
return Observable.Range(1, 3);
});
}
If we were to use this with similar code to the first example, we'd see this output:
Calling factory method
First subscription
Doing some startup work...
1
2
3
Second subscription
Doing some startup work...
1
2
3
There are two important differences. First, the "Doing some startup work..."
message does not appear until we first subscribe, illustrating that Defer
has done what we wanted. However, notice that the message now appears twice: it will do this work each time we subscribe. If you want this deferred initialization but you'd also like once-only execution, you should look at the operators in the Publishing Operators chapter, which provide various ways to enable multiple subscribers to share a single subscription to an underlying source.
Sequence Generators
The creation methods we've looked at so far are straightforward in that they either produce very simple sequences (such as single-element, or empty sequences), or they rely on our code to tell them exactly what to produce. Now we'll look at some methods that can produce longer sequences.
Observable.Range
Observable.Range(int, int)
returns an IObservable<int>
that produces a range of integers. The first integer is the initial value and the second is the number of values to yield. This example will write the values '10' through to '24' and then complete.
IObservable<int> range = Observable.Range(10, 15);
range.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
Observable.Generate
Suppose you wanted to emulate the Range
factory method using Observable.Create
. You might try this:
// Not the best way to do it!
IObservable<int> Range(int start, int count) =>
Observable.Create<int>(observer =>
{
for (int i = 0; i < count; ++i)
{
observer.OnNext(start + i);
}
return Disposable.Empty;
});
This will work, but it does not respect request to unsubscribe. That won't cause direct harm, because Rx detects unsubscription, and will simply ignore any further values we produce. However, it's a waste of CPU time (and therefore energy, with consequent battery lifetime and/or environmental impact) to carry on generating numbers after nobody is listening. How bad that is depends on how long a range was requested. But imagine you wanted an infinite sequence? Perhaps it's useful to you to have an IObservable<BigInteger>
that produces value from the Fibonacci sequence, or prime numbers. How would you write that with Create
? You'd certainly want some means of handling unsubscription in that case. We need our callback to return if we are to be notified of unsubscription (or we could supply an async
method, but that doesn't really seem suitable here).
There's a different approach that can work better here: Observable.Generate
. The simple version of Observable.Generate
takes the following parameters:
- an initial state
- a predicate that defines when the sequence should terminate
- a function to apply to the current state to produce the next state
- a function to transform the state to the desired output
public static IObservable<TResult> Generate<TState, TResult>(
TState initialState,
Func<TState, bool> condition,
Func<TState, TState> iterate,
Func<TState, TResult> resultSelector)
This shows how you could use Observable.Generate
to construct a Range
method:
// Example code only
public static IObservable<int> Range(int start, int count)
{
int max = start + count;
return Observable.Generate(
start,
value => value < max,
value => value + 1,
value => value);
}
The Generate
method calls us back repeatedly until either our condition
callback says we're done, or the observer unsubscribes. We can define an infinite sequence simply by never saying we are done:
IObservable<BigInteger> Fibonacci()
{
return Observable.Generate(
(v1: new BigInteger(1), v2: new BigInteger(1)),
value => true, // It never ends!
value => (value.v2, value.v1 + value.v2),
value => value.v1);
}
Timed Sequence Generators
Most of the methods we've looked at so far have returned sequences that produce all of their values immediately. (The only exception is where we called Observable.Create
and produced values when we were ready to.) However, Rx is able to generate sequences on a schedule.
As we'll see, operators that schedule their work do so through an abstraction called a scheduler. If you don't specify one, they will pick a default scheduler, but sometimes the timer mechanism is significant. For example, there are timers that integrate with UI frameworks, delivering notifications on the same thread that mouse clicks and other input are delivered on, and we might want Rx's time-based operators to use these. For testing purposes it can be useful to virtualize timings, so we can verify what happens in timing-sensitive code without necessarily waiting for tests to execute in real time.
Schedulers are a complex subject that is out of scope for this chapter, but they are covered in detail in the later chapter on Scheduling and threading.
There are three ways of producing timed events.
Observable.Interval
The first is Observable.Interval(TimeSpan)
which will publish incremental values starting from zero, based on a frequency of your choosing.
This example publishes values every 250 milliseconds.
IObservable<long> interval = Observable.Interval(TimeSpan.FromMilliseconds(250));
interval.Subscribe(
Console.WriteLine,
() => Console.WriteLine("completed"));
Output:
0
1
2
3
4
5
Once subscribed, you must dispose of your subscription to stop the sequence, because Interval
returns an infinite sequence. Rx presumes that you might have considerable patience, because the sequences returned by Interval
are of type IObservable<long>
(long
, not int
) meaning you won't hit problems if you produce more than a paltry 2.1475 billion event (i.e. more than int.MaxValue
).
Observable.Timer
The second factory method for producing constant time based sequences is Observable.Timer
. It has several overloads. The most basic one takes just a TimeSpan
as Observable.Interval
does. But unlike Observable.Interval
, Observable.Timer
will publish exactly one value (the number 0) after the period of time has elapsed, and then it will complete.
var timer = Observable.Timer(TimeSpan.FromSeconds(1));
timer.Subscribe(
Console.WriteLine,
() => Console.WriteLine("completed"));
Output:
0
completed
Alternatively, you can provide a DateTimeOffset
for the dueTime
parameter. This will produce the value 0 and complete at the specified time.
A further set of overloads adds a TimeSpan
that indicates the period at which to produce subsequent values. This allows us to produce infinite sequences. It also shows how Observable.Interval
is really just a special case of Observable.Timer
. Interval
could be implemented like this:
public static IObservable<long> Interval(TimeSpan period)
{
return Observable.Timer(period, period);
}
While Observable.Interval
will always wait the given period before producing the first value, this Observable.Timer
overload gives the ability to start the sequence when you choose. With Observable.Timer
you can write the following to have an interval sequence that starts immediately.
Observable.Timer(TimeSpan.Zero, period);
This takes us to our third way and most general way for producing timer related sequences, back to Observable.Generate
.
Timed Observable.Generate
There's a more complex overload of Observable.Generate
that allows you to provide a function that specifies the due time for the next value.
public static IObservable<TResult> Generate<TState, TResult>(
TState initialState,
Func<TState, bool> condition,
Func<TState, TState> iterate,
Func<TState, TResult> resultSelector,
Func<TState, TimeSpan> timeSelector)
The extra timeSelector
argument lets us tell Generate
when to produce the next item. We can use this to write our own implementation of Observable.Timer
(and as you've already seen, this in turn enables us to write our own Observable.Interval
).
public static IObservable<long> Timer(TimeSpan dueTime)
{
return Observable.Generate(
0l,
i => i < 1,
i => i + 1,
i => i,
i => dueTime);
}
public static IObservable<long> Timer(TimeSpan dueTime, TimeSpan period)
{
return Observable.Generate(
0l,
i => true,
i => i + 1,
i => i,
i => i == 0 ? dueTime : period);
}
public static IObservable<long> Interval(TimeSpan period)
{
return Observable.Generate(
0l,
i => true,
i => i + 1,
i => i,
i => period);
}
This shows how you can use Observable.Generate
to produce infinite sequences. I will leave it up to you the reader, as an exercise using Observable.Generate
, to produce values at variable rates.
Observable sequences and state
As Observable.Generate
makes particularly clear, observable sequences may need to maintain state. With that operator it is explicit—we pass in initial state, and we supply a callback to update it on each iteration. Plenty of other operators maintain internal state. The Timer
remembers its tick count, and more subtly, has to somehow keep track of when it last raised an event and when the next one is due. And as you'll see in forthcoming chapters, plenty of other operators need to remember information about what they've already seen.
This raises an interesting question: what happens if a process shuts down? Is there a way to preserve that state, and reconstitute it in a new process.
With ordinary Rx.NET, the answer is no: all such state is held entirely in memory and there is no way to get hold of that state, or to ask running subscriptions to serialize their current state. This means that if you are dealing with particularly long-running operations you need to work out how you would restart and you can't rely on System.Reactive
to help you. However, there is a related Rx-based set of libraries known collectively as the Reaqtive libraries. These provide implementations of most of the same operators as System.Reactive
, but in a form where you can collect the current state, and recreate new subscriptions from previously preserved state. These libraries also include a component called Reaqtor, which is a hosting technology that can manage automatic checkpointing, and post-crash recovery, making it possible to support very long-running Rx logic, by making subscriptions persistent and reliable. Be aware that this is not currently in any productised form, so you will need to do a fair amount of work to use it, but if you need a persistable version of Rx, be aware that it exists.
Adapting Common Types to IObservable<T>
Although we've now seen two very general ways to produce arbitrary sequences—Create
and Generate
—what if you already have an existing source of information in some other form that you'd like to make available as an IObservable<T>
? Rx provides a few adapters for common source types.
From delegates
The Observable.Start
method allows you to turn a long running Func<T>
or Action
into a single value observable sequence. The action is invoked through a scheduler. If you don't pass a scheduler explicitly, this will use the DefaultScheduler
, which invokes the callback via the thread pool. If the overload you use is a Func<T>
then the return type will be IObservable<T>
. When the function returns its value, the IObservable<T>
, will supply that value to subscribers and then complete immediately after supplying the value. (The IObservable<T>
that Start
returns is based on AsyncSubject
, so if you subscribe to it after the callback has completed, it will immediately supply the value and then complete.) If you use the overload that takes an Action
, then the returned sequence will be of type IObservable<Unit>
. The Unit
type represents the absence of information, so it's somewhat analogous to void
, except you can have an instance of the Unit
type. It's particularly useful in Rx because we often care only about when something has happened, and there might not be any information besides timing. In these cases, we often use an IObservable<Unit>
so that it's possible to produce definite events even though there's no meaningful data in them. (The name comes from the world of functional programming, where this kind of construct is used a lot.) In this case, Unit
is used to publish an acknowledgement that the Action
is complete, because an Action
does not return any information. The Unit
type itself has no value; it just serves as an empty payload for the OnNext
notification. Below is an example of using both overloads.
static void StartAction()
{
var start = Observable.Start(() =>
{
Console.Write("Working away");
for (int i = 0; i < 10; i++)
{
Thread.Sleep(100);
Console.Write(".");
}
});
start.Subscribe(
unit => Console.WriteLine("Unit published"),
() => Console.WriteLine("Action completed"));
}
static void StartFunc()
{
var start = Observable.Start(() =>
{
Console.Write("Working away");
for (int i = 0; i < 10; i++)
{
Thread.Sleep(100);
Console.Write(".");
}
return "Published value";
});
start.Subscribe(
Console.WriteLine,
() => Console.WriteLine("Action completed"));
}
Note the difference between Observable.Start
and Observable.Return
. Return
requires us to supply the value up front, whereas Start
returns an observable sequence immediately, without needing the value to be available. (Although Start
doesn't wait for the callback to complete, it does invoke it immediately. So this is not lazy evaluation—if you want to supply a callback that will be invoked only when someone subscribes to the source, use Defer
.)
The observable returned by Start
can be thought of as representing the same basic idea as Task
or Task<T>
(depending on whether you use the Action
or Func<T>
overload). Each represents work that may take some time before eventually completing, perhaps producing a result. So Start
is useful if you want that basic idea, but for it to be represented as an IObservable<T>
instead of a Task
or Task<T>
.
From events
As we discussed early in the book, .NET has a model for events that is baked into its type system. This predates Rx (not least because Rx wasn't feasible until .NET got generics in .NET 2.0) so it's common for types to support events but not Rx. To be able to integrate with the existing event model, Rx provides methods to take an event and turn it into an observable sequence. I showed this briefly in the file system watcher example earlier, but let's examine this in a bit more detail. There are several different varieties you can use. This show the most succinct form:
FileSystemWatcher watcher = new (@"c:\incoming");
IObservable<EventPattern<FileSystemEventArgs>> changeEvents = Observable
.FromEventPattern<FileSystemEventArgs>(watcher, nameof(watcher.Changed));
If you have an object that provides an event, you can use this overload of FromEventPattern
, passing in the object and the name of the event that you'd like to use with Rx. Although this is the simplest way to adapt events into Rx's world, it has a few problems.
Firstly, why do I need to pass the event name as a string? Identifying members with strings is an error-prone technique. The compiler won't notice if there's a mismatch between the first and second argument (e.g., if I passed the arguments (somethingElse, nameof(watcher.Changed))
by mistake). Couldn't I just pass watcher.Changed
itself? Unfortunately not—this is an example of the issue I mentioned in the first chapter: .NET events are not first class citizens. We can't use them in the way we can use other objects or values. For example, we can't pass an event as an argument to a method. In fact the only thing you can do with a .NET event is attach and remove event handlers. If I want to get some other method to attach handlers to the event of my choosing (e.g., here I want Rx to handle the events), then the only way to do that is to specify the event's name so that the method (FromEventPattern
) can then use reflection to attach its own handlers.
This is a problem for some deployment scenarios. It is increasingly common in .NET to do extra work at build time to optimize runtime behaviour, and reliance on reflection can compromise these techniques. For example, instead of relying on Just In Time (JIT) compilation of code, we might use Ahead of Time (AOT) mechanisms. .NET's Ready to Run (R2R) system enables you to include pre-compiled code targeting specific CPU types alongside the normal IL, avoiding having to wait for .NET to compile the IL into runnable code. This can have a significant effect on startup times. In client side applications, it can fix problems where applications are sluggish when they first start up. It can also be important in server-side applications, especially in environments where code may be moved from one compute node to another fairly frequently, making it important to minimize cold start costs. There are also scenarios where JIT compilation is not even an option, in which case AOT compilation isn't merely an optimization: it's the only means by which code can run at all.
The problem with reflection is that it makes it difficult for the build tools to work out what code will execute at runtime. When they inspect this call to FromEventPattern
they will just see arguments of type object
and string
. It's not self-evident that this is going to result in reflection-driven calls to the add
and remove
methods for FileSystemWatcher.Changed
at runtime. There are attributes that can be used to provide hints, but there are limits to how well these can work. Sometimes the build tools will be unable to determine what code would need to be AOT compiled to enable this method to execute without relying on runtime JIT.
There's another, related problem. The .NET build tools support a feature called 'trimming', in which they remove unused code. The System.Reactive.dll
file is about 1.3MB in size, but it would be a very unusual application that used every member of every type in that component. Basic use of Rx might need only a few tens of kilobytes. The idea with trimming is to work out which bits are actually in use, and produce a copy of the DLL that contains only that code. This can dramatically reduce the volume of code that needs to be deployed for an executable to run. This can be especially important in client-side Blazor applications, where .NET components end up being downloaded by the browser. Having to download an entire 1.3MB component might make you think twice about using it. But if trimming means that basic usage requires only a few tens of KB, and that the size would increase only if you were making more extensive use of the component, that can make it reasonable to use a component that would, without trimming, have imposed too large a penalty to justify its inclusion. But as with AOT compilation, trimming can only work if the tools can determine which code is in use. If they can't do that, it's not just a case of falling back to a slower path, waiting while the relevant code gets JIT compiler. If code has been trimmed, it will be unavailable at runtime, and your application might crash with a MissingMethodException
.
So reflection-based APIs can be problematic if you're using any of these techniques. Fortunately, there's an alternative. We can use an overload that takes a couple of delegates, and Rx will invoke these when it wants to add or remove handlers for the event:
IObservable<EventPattern<FileSystemEventArgs>> changeEvents = Observable
.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => watcher.Changed += h,
h => watcher.Changed -= h);
This is code that AOT and trimming tools can understand easily. We've written methods that explicitly add and remove handlers for the FileSystemWatcher.Changed
event, so AOT tools can pre-compile those two methods, and trimming tools know that they cannot remove the add and remove handlers for those events.
The downside is that this is a pretty cumbersome bit of code to write. If you've not already bought into the idea of using Rx, this might well be enough to make you think "I'll just stick with ordinary .NET events, thanks." But the cumbersome nature is a symptom of what is wrong with .NET events. We wouldn't have had to write anything so ugly if events had been first class citizens in the first place.
Not only has that second-class status meant we couldn't just pass the event itself as an argument, it has also meant that we've had to state type arguments explicitly. The relationship between an event's delegate type (FileSystemEventHandler
in this example) and its event argument type (FileSystemEventArgs
here) is, in general, not something that C#'s type inference can determine automatically, which is why we've had to specify both types explicitly. (Events that use the generic EventHandler<T>
type are more amenable to type inference, and can use a slightly less verbose version of FromEventPattern
. Unfortunately, relatively few events actually use that. Some events provide no information besides the fact that something just happened, and use the base EventHandler
type, and for those kinds of events, you can in fact omit the type arguments completely, making the code slightly less ugly. You still need to provide the add and remove callbacks though.)
Notice that the return type of FromEventPattern
in this example is:
IObservable<EventPattern<FileSystemEventArgs>>
.
The EventPattern<T>
type encapsulates the information that the event passes to handlers. Most .NET events follow a common pattern in which handler methods take two arguments: an object sender
, which just tells you which object raised the event (useful if you attach one event handler to multiple objects) and then a second argument of some type derived from EventArgs
that provides information about the event. EventPattern<T>
just packages these two arguments into a single object that offers Sender
and EventArgs
properties. In cases where you don't in fact want to attach one handler to multiple sources, you only really need that EventArgs
property, which is why the earlier FileSystemWatcher
examples went on to extract just that, to get a simpler result of type IObservable<FileSystemEventArgs>
. It did this with the Select
operator, which we'll get to in more detail later:
IObservable<FileSystemEventArgs> changes = changeEvents.Select(ep => ep.EventArgs);
It is very common to want to expose property changed events as observable sequences. The .NET runtime libraries define a .NET-event-based interface for advertising property changes, INotifyPropertyChanged
, and some user interface frameworks have more specialized systems for this, such as WPF's DependencyProperty
. If you are contemplating writing your own wrappers to do this sort of thing, I would strongly suggest looking at the Reactive UI libraries first. It has a set of features for wrapping properties as IObservable<T>
.
From Task
The Task
and Task<T>
types are very widely used in .NET. Mainstream .NET languages have built-in support for working with them (e.g., C#'s async
and await
keywords). There's some conceptual overlap between tasks and IObservable<T>
: both represent some sort of work that might take a while to complete. There is a sense in which an IObservable<T>
is a generalization of a Task<T>
: both represent potentially long-running work, but an IObservable<T>
can produce multiple results whereas Task<T>
can produce just one.
Since IObservable<T>
is the more general abstraction, we should be able to represent a Task<T>
as an IObservable<T>
. Rx defines various extension methods for Task
and Task<T>
to do this. These methods are all called ToObservable()
, and it offers various overloads offering control of the details where required, and simplicity for the most common scenarios.
Although they are conceptually similar, Task<T>
does a few things differently in the details. For example, you can retrieve its Status
property, which might report that it is in a cancelled or faulted state. IObservable<T>
doesn't provide a way to ask a source for its state; it just tells you things. So ToObservable
makes some decisions about how to present status in a way that makes makes sense in an Rx world:
- if the task is Cancelled,
IObservable<T>
invokes a subscriber'sOnError
passing aTaskCanceledException
- if the task is Faulted
IObservable<T>
invokes a subscriber'sOnError
passing the task's inner exception - if the task is not yet in a final state (neither Cancelled, Faulted, or RanToCompletion), the
IObservable<T>
will not produce any notifications until such time as the task does enter one of these final states
It does not matter whether the task is already in a final state at the moment that you call ToObservable
. If it has finished, ToObservable
will just return a sequence representing that state. (In fact, it uses either the Return
or Throw
creation methods you saw earlier.) If the task has not yet finished, ToObservable
will attach a continuation to the task to detect the outcome once it does complete.
Tasks come in two forms: Task<T>
, which produces a result, and Task
, which does not. But in Rx, there is only IObservable<T>
—there isn't a no-result form. We've already seen this problem once before, when the Observable.Start
method needed to be able to adapt a delegate as an IObservable<T>
even when the delegate was an Action
that produced no result. The solution was to return an IObservable<Unit>
, and that's also exactly what you get when you call ToObservable
on a plain Task
.
The extension method is simple to use:
Task<string> t = Task.Run(() =>
{
Console.WriteLine("Task running...");
return "Test";
});
IObservable<string> source = t.ToObservable();
source.Subscribe(
Console.WriteLine,
() => Console.WriteLine("completed"));
source.Subscribe(
Console.WriteLine,
() => Console.WriteLine("completed"));
Here's the output.
Task running...
Test
completed
Test
completed
Notice that even with two subscribers, the task runs only once. That shouldn't be surprising since we only created a single task. If the task has not yet finished, then all subscribers will receive the result when it does. If the task has finished, the IObservable<T>
effectively becomes a single-value cold observable.
One Task per subscription
There's a different way to get an IObservable<T>
for a source. I can replace the first statement in the preceding example with this:
IObservable<string> source = Observable.FromAsync(() => Task.Run(() =>
{
Console.WriteLine("Task running...");
return "Test";
}));
Subscribing twice to this produces slightly different output:
Task running...
Task running...
Test
Test
completed
completed
Notice that this executes the task twice, once for each call to Subscribe
. FromAsync
can do this because instead of passing a Task<T>
we pass a callback that returns a Task<T>
. It calls that when we call Subscribe
, so each subscriber essentially gets their own task.
If I want to use async
and await
to define my task, then I don't need to bother with the Task.Run
because an async
lambda creates a Func<Task<T>>
, which is exactly the type FromAsync
wants:
IObservable<string> source = Observable.FromAsync(async () =>
{
Console.WriteLine("Task running...");
await Task.Delay(50);
return "Test";
});
This produces exactly the same output as before. There is a subtle difference with this though. When I used Task.Run
the lambda ran on a task pool thread from the start. But when I write it this way, the lambda will begin to run on whatever thread calls Subscribe
. It's only when it hits the first await
that it returns (and the call to Subscribe
will then return), with the remainder of the method running on the thread pool.
From IEnumerable<T>
Rx defines another extension method called ToObservable
, this time for IEnumerable<T>
. In earlier chapters I described how IObservable<T>
was designed to represent the same basic abstraction as IEnumerable<T>
, with the only difference being the mechanism we use to obtain the elements in the sequence: with IEnumerable<T>
, we write code that pulls values out of the collection (e.g., a foreach
loop), whereas IObservable<T>
pushes values to us by invoking OnNext
on our IObserver<T>
.
We could write code that bridges from pull to push:
// Example code only - do not use!
public static IObservable<T> ToObservableOversimplified<T>(this IEnumerable<T> source)
{
return Observable.Create<T>(o =>
{
foreach (var item in source)
{
o.OnNext(item);
}
o.OnComplete();
// Incorrectly ignoring unsubscription.
return Disposable.Empty;
});
}
This crude implementation conveys the basic idea, but it is naive. It does not attempt to handle unsubscription, and it's not easy to fix that when using Observable.Create
for this particular scenario. And as we will see later in the book, Rx sources that might try to deliver large numbers of events in quick succession should integrate with Rx's concurrency model. The implementation that Rx supplies does of course cater for all of these tricky details. That makes it rather more complex, but that's Rx's problem; you can think of it as being logically equivalent to the code shown above, but without the shortcomings.
In fact this is a recurring theme throughout Rx.NET. Many of the built-in operators are useful not because they do something particularly complicated, but because they deal with many subtle and tricky issues for you. You should always try to find something built into Rx.NET that does what you need before considering rolling your own solution.
When transitioning from IEnumerable<T>
to IObservable<T>
, you should carefully consider what you are really trying to achieve. Consider that the blocking synchronous (pull) nature of IEnumerable<T>
does always not mix well with the asynchronous (push) nature of IObservable<T>
. As soon as something subscribes to an IObservable<T>
created in this way, it is effectively asking to iterate over the IEnumerable<T>
, immediately producing all of the values. The call to Subscribe
might not return until it has reached the end of the IEnumerable<T>
, making it similar to the very simple example shown at the start of this chapter. (I say "might" because as we'll see when we get to schedulers, the exact behaviour depends on the context.) ToObservable
can't work magic—something somewhere has to execute what amounts to a foreach
loop.
So although this can be a convenient way to bring sequences of data into an Rx world, you should carefully test and measure the performance impact.
From APM
Rx provides support for the ancient .NET Asynchronous Programming Model (APM). Back in .NET 1.0, this was the only pattern for representing asynchronous operations. It was superseded in 2010 when .NET 4.0 introduced the Task-based Asynchronous Pattern (TAP). The old APM offers no benefits over the TAP. Moreover, C#'s async
and await
keywords (and equivalents in other .NET languages) only support the TAP, meaning that the APM is best avoided. However, the TAP was fairly new back in 2011 when Rx 1.0 was released, so it offered adapters for presenting an APM implementation as an IObservable<T>
.
Nobody should be using the APM today, but for completeness (and just in case you have to use an ancient library that only offers the APM) I will provide a very brief explanation of Rx's support for it.
The result of the call to Observable.FromAsyncPattern
does not return an observable sequence. It returns a delegate that returns an observable sequence. (So it is essentially a factory factory.) The signature for this delegate will match the generic arguments of the call to FromAsyncPattern
, except that the return type will be wrapped in an observable sequence. The following example wraps the Stream
class's BeginRead
/EndRead
methods (which are an implementation of the APM).
Note: this is purely to illustrate how to wrap the APM. You would never do this in practice because Stream
has supported the TAP for years.
Stream stream = GetStreamFromSomewhere();
var fileLength = (int) stream.Length;
Func<byte[], int, int, IObservable<int>> read =
Observable.FromAsyncPattern<byte[], int, int, int>(
stream.BeginRead,
stream.EndRead);
var buffer = new byte[fileLength];
IObservable<int> bytesReadStream = read(buffer, 0, fileLength);
bytesReadStream.Subscribe(byteCount =>
{
Console.WriteLine(
"Number of bytes read={0}, buffer should be populated with data now.",
byteCount);
});
Subjects
So far, this chapter has explored various factory methods that return IObservable<T>
implementations. There is another way though: System.Reactive
defines various types that implement IObservable<T>
that we can instantiate directly. But how do we determine what values these types produce? We're able to do that because they also implement IObserver<T>
, enabling us to push values into them, and those very same values we push in will be the ones seen by observers.
Types that implement both IObservable<T>
and IObserver<T>
are called subjects in Rx. There's an ISubject<T>
to represent this. (This is in the System.Reactive
NuGet package, unlike IObservable<T>
and IObserver<T>
, which are both built into the .NET runtime libraries.) ISubject<T>
looks like this:
public interface ISubject<T> : ISubject<T, T>
{
}
So it turns out there's also a two-argument ISubject<TSource, TResult>
to accommodate the fact that something that is both an observer and an observable might transform the data that flows through it in some way, meaning that the input and output types are not necessarily the same. Here's the two-type-argument definition:
public interface ISubject<in TSource, out TResult> : IObserver<TSource>, IObservable<TResult>
{
}
As you can see the ISubject
interfaces don't define any members of their own. They just inherit from IObserver<T>
and IObservable<T>
—these interfaces are nothing more than a direct expression of the fact that a subject is both an observer and an observable.
But what is this for? You can think of IObserver<T>
and the IObservable<T>
as the 'consumer' and 'publisher' interfaces respectively. A subject, then is both a consumer and a publisher. Data flows both into and out of a subject.
Rx offers a few subject implementations that can occasionally be useful in code that wants to make an IObservable<T>
available. Although Observable.Create
is usually the preferred way to do this, there's one important case where a subject might make more sense: if you have some code that discovers events of interest (e.g., by using the client API for some messaging technology) and wants to make them available through an IObservable<T>
, subjects can sometimes provide a more convenient way to to this than with Observable.Create
or a custom implementation.
Rx offers a few subject types. We'll start with the most straightforward one to understand.
Subject<T>
The Subject<T>
type immediately forwards any calls made to its IObserver<T>
methods on to all of the observers currently subscribed to it. This example shows its basic operation:
Subject<int> s = new();
s.Subscribe(x => Console.WriteLine($"Sub1: {x}"));
s.Subscribe(x => Console.WriteLine($"Sub2: {x}"));
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
I've created a Subject<int>
. I've subscribed to it twice, and then called its OnNext
method repeatedly. This produces the following output, illustrating that the Subject<int>
forwards each OnNext
call onto both subscribers:
Sub1: 1
Sub2: 1
Sub1: 2
Sub2: 2
Sub1: 3
Sub2: 3
We could use this as a way to bridge between some API from which we receive data into the world of Rx. You could imagine writing something of this kind:
public class MessageQueueToRx : IDisposable
{
private readonly Subject<string> messages = new();
public IObservable<string> Messages => messages;
public void Run()
{
while (true)
{
// Receive a message from some hypothetical message queuing service
string message = MqLibrary.ReceiveMessage();
messages.OnNext(message);
}
}
public void Dispose()
{
message.Dispose();
}
}
It wouldn't be too hard to modify this to use Observable.Create
instead. But where this approach can become easier is if you need to provide multiple different IObservable<T>
sources. Imagine we distinguish between different message types based on their content, and publish them through different observables. That's hard to arrange with Observable.Create
if we still want a single loop pulling messages off the queue.
Subject<T>
also distributes calls to either OnCompleted
or OnError
to all subscribers. Of course, the rules of Rx require that once you have called either of these methods on an IObserver<T>
(and any ISubject<T>
is an IObserver<T>
, so this rule applies to Subject<T>
) you must not call OnNext
, OnError
, or OnComplete
on that observer ever again. In fact, Subject<T>
will tolerate calls that break this rule—it just ignores them, so even if your code doesn't quite stick to these rules internally, the IObservable<T>
you present to the outside world will behave correctly, because Rx enforces this.
Subject<T>
implements IDisposable
. Disposing a Subject<T>
puts it into a state where it will throw an exception if you call any of its methods. The documentation also describes it as unsubscribing all observers, but since a disposed Subject<T>
isn't capable of producing any further notifications in any case, this doesn't really mean much. (Note that it does not call OnCompleted
on its observers when you Dispose
it.) The one practical effect is that its internal field that keeps track of observers is reset to a special sentinel value indicating that it has been disposed, meaning that the one externally observable effect of "unsubscribing" the observers is that if, for some reason, your code held onto a reference to a Subject<T>
after disposing it, that would no longer keep all the subscribers reachable for GC purposes. If a Subject<T>
remains reachable indefinitely after it is no longer in use, that in itself is effectively a memory leak, but disposal would at least limit the effects: only the Subject<T>
itself would remain reachable, and not all of its subscribers.
Subject<T>
is the most straightforward subject, but there are other, more specialized ones.
ReplaySubject<T>
Subject<T>
does not remember anything: it immediately distributes incoming values to subscribers. If new subscribers come along, they will only see events that occur after they subscribe. ReplaySubject<T>
, on the other hand, can remember every value it has ever seen. If a new subject comes along, it will receive the complete history of events so far.
This is a variation on the first example in the preceding Subject<T>
section. It creates a ReplaySubject<int>
instead of a Subject<int>
. And instead of immediately subscribing twice, it creates an initial subscription, and then a second one only after a couple of values have been emitted.
ReplaySubject<int> s = new();
s.Subscribe(x => Console.WriteLine($"Sub1: {x}"));
s.OnNext(1);
s.OnNext(2);
s.Subscribe(x => Console.WriteLine($"Sub2: {x}"));
s.OnNext(3);
This produces the following output:
Sub1: 1
Sub1: 2
Sub2: 1
Sub2: 2
Sub1: 3
Sub2: 3
As you'd expect, we initially see output only from Sub1
. But when we make the second call to subscribe, we can see that Sub2
also received the first two values. And then when we report the third value, both see it. If this example had used Subject<int>
instead, we would have seen just this output:
Sub1: 1
Sub1: 2
Sub1: 3
Sub2: 3
There's an obvious potential problem here: if ReplaySubject<T>
remembers every value published to it, we mustn't use it with endless event sources, because it will eventually cause us to run out of memory.
ReplaySubject<T>
offers constructors that accept simple cache expiry settings that can limit memory consumption. One option is to specify the maximum number of item to remember. This next example creates a ReplaySubject<T>
with a buffer size of 2:
ReplaySubject<int> s = new(2);
s.Subscribe(x => Console.WriteLine($"Sub1: {x}"));
s.OnNext(1);
s.OnNext(2);
s.OnNext(3);
s.Subscribe(x => Console.WriteLine($"Sub2: {x}"));
s.OnNext(4);
Since the second subscription only comes along after we've already produced 3 values, it no longer sees all of them. It only receives the last two values published prior to subscription (but the first subscription continues to see everything of course):
Sub1: 1
Sub1: 2
Sub1: 3
Sub2: 2
Sub2: 3
Sub1: 4
Sub2: 4
Alternatively, you can specify a time-based limit by passing a TimeSpan
to the ReplaySubject<T>
constructor.
BehaviorSubject<T>
Like ReplaySubject<T>
, BehaviorSubject<T>
also has a memory, but it remembers exactly one value. However, it's not quite the same as a ReplaySubject<T>
with a buffer size of 1. Whereas a ReplaySubject<T>
starts off in a state where it has nothing in its memory, BehaviorSubject<T>
always remembers exactly one item. How can that work before we've made our first call to OnNext
? BehaviorSubject<T>
enforces this by requiring us to supply the initial value when we construct it.
So you can think of BehaviorSubject<T>
as a subject that always has a value available. If you subscribe to a BehaviorSubject<T>
it will instantly produce a single value. (It may then go on to produce more values, but it always produces one right away.) As it happens, it also makes that value available through a property called Value
, so you don't need to subscribe an IObserver<T>
to it just to retrieve the value.
A BehaviorSubject<T>
could be thought of an as observable property. Like a normal property, it can immediately supply a value whenever you ask it. The difference is that it can then go on to notify you every time its value changes. If you're using the ReactiveUI framework (an Rx-based framework for building user interfaces), BehaviourSubject<T>
can make sense as the implementation type for a property in a view model (the type that mediates between your underlying domain model and your user interface). It has property-like behaviour, enabling you to retrieve a value at any time, but it also provides change notifications, which ReactiveUI can handle in order to keep the UI up to date.
This analogy falls down slightly when it comes to completion. If you call OnCompleted
, it immediately calls OnCompleted
on all of its observers, and if any new observers subscribe, they will also immediately be completed—it does not first supply the last value. (So this is another way in which it is different from a ReplaySubject<T>
with a buffer size of 1.)
Similarly, if you call OnError
, all current observers will receive an OnError
call, and any subsequent subscribers will also receive nothing but an OnError
call.
AsyncSubject<T>
AsyncSubject<T>
provides all observers with the final value it receives. Since it can't know which is the final value until OnCompleted
is called, it will not invoke any methods on any of its subscribers until either its OnCompleted
or OnError
method is called. (If OnError
is called, it just forwards that to all current and future subscribers.) You will often use this subject indirectly, because it is the basis of Rx's integration with the await
keyword. (When you await
an observable sequence, the await
returns the final value emitted by the source.)
If no calls were made to OnNext
before OnCompleted
then there was no final value, so it will just complete any observers without providing a value.
In this example no values will be published as the sequence never completes. No values will be written to the console.
AsyncSubject<string> subject = new();
subject.OnNext("a");
subject.Subscribe(x => Console.WriteLine($"Sub1: {x}"));
subject.OnNext("b");
subject.OnNext("c");
In this example we invoke the OnCompleted
method so there will be a final value ('c') for the subject to produce:
AsyncSubject<string> subject = new();
subject.OnNext("a");
subject.Subscribe(x => Console.WriteLine($"Sub1: {x}"));
subject.OnNext("b");
subject.OnNext("c");
subject.OnCompleted();
subject.Subscribe(x => Console.WriteLine($"Sub2: {x}"));
This produces the following output:
Sub1: c
Sub2: c
If you have some potentially slow work that needs to be done when your application starts up, and which needs to be done just once, you might choose an AsyncSubject<T>
to make the results of that work available. Code requiring those results can subscribe to the subject. If the work is not yet complete, they will receive the results as soon as they are available. And if the work has already completed, they will receive it immediately.
Subject factory
Finally it is worth making you aware that you can also create a subject via a factory method. Considering that a subject combines the IObservable<T>
and IObserver<T>
interfaces, it seems sensible that there should be a factory that allows you to combine them yourself. The Subject.Create(IObserver<TSource>, IObservable<TResult>)
factory method provides just this.
// Creates a subject from the specified observer used to publish messages to the
// subject and observable used to subscribe to messages sent from the subject
public static ISubject<TSource, TResult> Create<TSource, TResult>(
IObserver<TSource> observer,
IObservable<TResult> observable)
{...}
Note that unlike all of the other subjects just discussed, this creates a subject where there is no inherent relationship between the input and the output. This just takes whatever IObserver<TSource>
and IObserver<TResult>
implementations you supply and wraps them up in a single object. All calls made to the subject's IObserver<TSource>
methods will be passed directly to the observer you supplied. If you want values to emerge to subscribers to the corresponding IObservable<TResult>
, it's up to you to make that happen. This really combines the two objects you supply with the absolute minimum of glue.
Subjects provide a convenient way to poke around Rx, and are occasionally useful in production scenarios, but they are not recommended for most cases. An explanation is in the Usage Guidelines appendix. Instead of using subjects, favour the factory methods shown earlier in this chapter..
Summary
We have looked at the various eager and lazy ways to create a sequence. We have seen how to produce timer based sequences using the various factory methods. And we've also explored ways to transition from other synchronous and asynchronous representations.
As a quick recap:
Factory Methods
- Observable.Return
- Observable.Empty
- Observable.Never
- Observable.Throw
- Observable.Create
- Observable.Defer
Generative methods
- Observable.Range
- Observable.Generate
- Observable.Interval
- Observable.Timer
Adaptation
- Observable.Start
- Observable.FromEventPattern
- Task.ToObservable
- Task<T>.ToObservable
- IEnumerable<T>.ToObservable
- Observable.FromAsyncPattern
Creating an observable sequence is our first step to practical application of Rx: create the sequence and then expose it for consumption. Now that we have a firm grasp on how to create an observable sequence, we can look in more detail at the operators that allow us to describe processing to be applied, to build up more complex observable sequences.