Time-based sequences

With event sources, timing is often important. In some cases, the only information of interest about some event might be the time at which it occurred. The core IObservable<T> and IObserver<T> interfaces don't mention timing at all in their method signatures, but they don't need to, because a source can decide when it calls an observer's OnNext method. A subscriber knows when an event occurred because it is occurring right now. This isn't always the most convenient way in which to work with timing, so the Rx library provides some timing-related operators. We've already seen a couple of operators that offer optional time-based operation: Buffer and Window. This chapter looks at the various operators that are all about timing.

Timestamp and TimeInterval

Because observable sequences are asynchronous, it can be convenient to know when elements are received. A subscriber can always just use DateTimeOffset.Now, but if you want to refer to the arrival time as part of a larger query, the Timestamp extension method is a convenient way to attach a timestamp to each element. It wraps each element from its source sequence in a lightweight Timestamped<T> struct, which exposes the value of the element it wraps along with a DateTimeOffset indicating when the Timestamp operator received it.

In this example we create a sequence of three values, one second apart, and then transform it into a timestamped sequence.

Observable.Interval(TimeSpan.FromSeconds(1))
          .Take(3)
          .Timestamp()
          .Dump("Timestamp");

As you can see, Timestamped<T>'s implementation of ToString() gives us a readable output.

Timestamp-->0@07/08/2023 10:03:58 +00:00
Timestamp-->1@07/08/2023 10:03:59 +00:00
Timestamp-->2@07/08/2023 10:04:00 +00:00
Timestamp completed

We can see that the values 0, 1, and 2 were each produced one second apart.

Rx also offers TimeInterval. Instead of reporting the time at which items arrived, it reports the interval between items (or, in the case of the first element, how long it took for that to emerge after subscription). Similarly to the Timestamp method, elements are wrapped in a lightweight structure. But whereas Timestamp adorns each item with its arrival time by wrapping it in a Timestamped<T>, TimeInterval wraps each element in a TimeInterval<T>, which adds a TimeSpan. We can modify the previous example to use TimeInterval:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Take(3)
          .TimeInterval()
          .Dump("TimeInterval");

As you can see, the output now reports the time between elements instead of the time of day at which they were received:

TimeInterval-->0@00:00:01.0183771
TimeInterval-->1@00:00:00.9965679
TimeInterval-->2@00:00:00.9908958
TimeInterval completed

As you can see from the output, the timings are not exactly one second but are pretty close. Some of this will be measurement noise in the TimeInterval operator, but most of this variability is likely to arise from Observable.Interval. There will always be a limit to the precision with which a scheduler can honour the timing requests made of it. Some schedulers introduce more variation than others. Schedulers that deliver work via a UI thread are ultimately limited by how quickly that thread's message loop responds. But even in the most favourable conditions, schedulers are limited by the fact that .NET is not built for use in real-time systems (and nor are most of the operating systems Rx can be used on). So with all of the operators in this section, you should be aware that timing is always a best-effort affair in Rx.

In fact, the inherent variations in timing can make Timestamp particularly useful. The problem with simply looking at DateTimeOffset.Now is that it takes a non-zero amount of time to process an event, so you'll likely see a slightly different time each time you try to read the current time during the processing of one event. By attaching a timestamp once, we capture the time at which the event was observed, and then it doesn't matter how much delay downstream processing adds. The event will be annotated with a single, fixed time indicating when it passed through Timestamp.
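As a rough illustration of that point, the following sketch stamps each item once and then compares that fixed timestamp with the clock as read later in the handler. It assumes the System.Reactive NuGet package is referenced, and the 200ms Thread.Sleep is only a hypothetical stand-in for slow downstream processing.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

// Stamp each element once, at the point where it passes through Timestamp.
IObservable<Timestamped<long>> stamped = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(3)
    .Timestamp();

// ForEachAsync returns a Task that completes when the sequence completes,
// which keeps this console program alive until all three items are handled.
await stamped.ForEachAsync(item =>
{
    // Hypothetical slow handler: a stand-in for real downstream work.
    Thread.Sleep(200);

    // item.Timestamp stays fixed; the clock read here has already moved on.
    // UtcNow is used because the sample output above shows UTC timestamps.
    Console.WriteLine(
        $"Item {item.Value}: stamped {item.Timestamp:HH:mm:ss.fff}, " +
        $"clock now {DateTimeOffset.UtcNow:HH:mm:ss.fff}");
});
```

Each line should show a "clock now" reading roughly 200ms later than the stamped time, while the stamped time itself would be the same no matter how many times the handler inspected it.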

Delay

The Delay extension method time-shifts an entire sequence. Delay attempts to preserve the relative time intervals between the values. There is inevitably a limit to the precision with which it can do this: it won't recreate timing down to the nearest nanosecond. The exact precision is determined by the scheduler you use, and will typically get worse under heavy load, but it will usually reproduce timings to within a few milliseconds.

There are overloads of Delay offering various ways to specify the time shift. (With all the options, you can optionally pass a scheduler, but if you call the overloads that don't take one, it defaults to DefaultScheduler.) The most straightforward is to pass a TimeSpan, which will delay the sequence by the specified amount. There are also overloads that accept a DateTimeOffset, which will wait until the specified time occurs, and then start replaying the input. (This second, absolute-time-based approach is essentially equivalent to the TimeSpan overloads. You would get more or less the same effect by subtracting the current time from the target time to get a TimeSpan, except the DateTimeOffset version attempts to deal with changes in the system clock that occur between Delay being called, and the specified time arriving.)
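The overloads described above might be used along the following lines. This is a minimal sketch that assumes the System.Reactive NuGet package; the choice of TaskPoolScheduler.Default for the scheduler overload is purely illustrative.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);

// Relative: shift every notification by two seconds.
await source
    .Delay(TimeSpan.FromSeconds(2))
    .ForEachAsync(x => Console.WriteLine($"TimeSpan overload: {x}"));

// Absolute: hold notifications back until the given time has arrived.
// The target time is computed just before subscribing so that it is
// still in the future when the delayed sequence starts.
await source
    .Delay(DateTimeOffset.UtcNow.AddSeconds(2))
    .ForEachAsync(x => Console.WriteLine($"DateTimeOffset overload: {x}"));

// Either form can take an explicit scheduler instead of the default one.
await source
    .Delay(TimeSpan.FromSeconds(2), TaskPoolScheduler.Default)
    .ForEachAsync(x => Console.WriteLine($"Explicit scheduler: {x}"));
```

ForEachAsync is used here only to keep a console program alive until each sequence completes; in an application with its own lifetime an ordinary Subscribe would do.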

To show the Delay method in action, this example creates a sequence of values one second apart and timestamps them. This will show that it is not the subscription that is being delayed, but the actual forwarding of the notifications to our final subscriber.

IObservable<Timestamped<long>> source = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(5)
    .Timestamp();

IObservable<Timestamped<long>> delay = source.Delay(TimeSpan.FromSeconds(2));

delay.Subscribe(value => 
   Console.WriteLine(
     $"Item {value.Value} with timestamp {value.Timestamp} received at {DateTimeOffset.Now}"),
   () => Console.WriteLine("delay Completed"));

If you look at the timestamps in the output, you can see that the times captured by Timestamp are all two seconds earlier than the time reported by the subscription:

Item 0 with timestamp 09/11/2023 17:32:20 +00:00 received at 09/11/2023 17:32:22 +00:00
Item 1 with timestamp 09/11/2023 17:32:21 +00:00 received at 09/11/2023 17:32:23 +00:00
Item 2 with timestamp 09/11/2023 17:32:22 +00:00 received at 09/11/2023 17:32:24 +00:00
Item 3 with timestamp 09/11/2023 17:32:23 +00:00 received at 09/11/2023 17:32:25 +00:00
Item 4 with timestamp 09/11/2023 17:32:24 +00:00 received at 09/11/2023 17:32:26 +00:00
delay Completed

Note that Delay will not time-shift OnError notifications. These will be propagated immediately.
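One way to observe this is with a source that emits a value and then fails immediately. The sketch below assumes the System.Reactive NuGet package; the exception type and message are arbitrary choices for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Threading;

// A source that emits one value and then fails straight away.
IObservable<int> failing = Observable
    .Return(1)
    .Concat(Observable.Throw<int>(new InvalidOperationException("boom")));

var stopwatch = Stopwatch.StartNew();
using var finished = new ManualResetEventSlim();

using IDisposable subscription = failing
    .Delay(TimeSpan.FromSeconds(2))
    .Subscribe(
        value => Console.WriteLine($"{stopwatch.ElapsedMilliseconds}ms: value {value}"),
        error =>
        {
            Console.WriteLine($"{stopwatch.ElapsedMilliseconds}ms: error {error.Message}");
            finished.Set();
        },
        () => finished.Set());

// Block until the sequence terminates one way or the other.
finished.Wait();
```

Given the behaviour described above, I would expect the error to be reported almost immediately rather than after two seconds. The value 1 is still waiting out its delay at that point, so it should not be delivered at all, because no further notifications can follow an OnError.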

Sample

The Sample method produces items at whatever interval you ask. Each time it produces a value, it reports the last value that emerged from your source. If you have a source that produces data at a higher rate than you need (e.g. suppose you have an accelerometer that reports 100 measurements per second, but you only need to take a reading 10 times a second), Sample provides an easy way to reduce the data rate. This example shows Sample in action.

IObservable<long> interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
interval.Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);

Output:

5
12
18

If you looked at these numbers closely, you might have noticed that the interval between the values is not the same each time. I chose a source interval of 150ms and a sample interval of 1 second to highlight an aspect of sampling that can require careful handling: if the rate at which a source produces items doesn't line up neatly with the sampling rate, this can mean that Sample introduces irregularities that weren't present in the source. If we list the times at which the underlying sequence produces values, and the times at which Sample takes each value, we can see that with these particular timings, the sample intervals only line up with the source timings every 3 seconds.

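Relative time (ms)  Source value  Sampled value
0                   -             -
50                  -             -
100                 -             -
150                 0             -
200                 -             -
250                 -             -
300                 1             -
350                 -             -
400                 -             -
450                 2             -
500                 -             -
550                 -             -
600                 3             -
650                 -             -
700                 -             -
750                 4             -
800                 -             -
850                 -             -
900                 5             -
950                 -             -
1000                -             5
1050                6             -
1100                -             -
1150                -             -
1200                7             -
1250                -             -
1300                -             -
1350                8             -
1400                -             -
1450                -             -
1500                9             -
1550                -             -
1600                -             -
1650                10            -
1700                -             -
1750                -             -
1800                11            -
1850                -             -
1900                -             -
1950                12            -
2000                -             12
2050                -             -
2100                13            -
2150                -             -
2200                -             -
2250                14            -
2300                -             -
2350                -             -
2400                15            -
2450                -             -
2500                -             -
2550                16            -
2600                -             -
2650                -             -
2700                17            -
2750                -             -
2800                -             -
2850                18            -
2900                -             -
2950                -             -
3000                19            19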

Since the first sample is taken after the source emits five, and two thirds of the way into the gap after which it will produce six, there's a sense in which the "right" current value is something like 5.67, but Sample doesn't attempt any such interpolation. It just reports the last value to emerge from the source. A related consequence is that if the sampling interval is short enough that you're asking Sample to report values faster than they are emerging from the source, it has no way to fill the gaps. In Rx.NET, Sample only forwards a value at a sampling tick if the source has produced a new one since the previous tick, so a tick with nothing new produces no output rather than a repeated value.
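The arithmetic behind the table can be reproduced without Rx at all. The following sketch assumes idealised timers, with the two periods taken from the example above, and works out which source value is the most recent one at each sampling tick.

```csharp
using System;

const int sourcePeriodMs = 150;   // Observable.Interval period from the example
const int samplePeriodMs = 1000;  // Sample period from the example

for (int tick = 1; tick <= 3; tick++)
{
    int sampleTimeMs = tick * samplePeriodMs;

    // Interval emits value n at (n + 1) * period, so the number of values
    // emitted by time t is t / period, and the latest one is that minus 1.
    int latestValue = sampleTimeMs / sourcePeriodMs - 1;

    // How long ago the latest source value was emitted.
    int msSinceLatest = sampleTimeMs % sourcePeriodMs;

    Console.WriteLine(
        $"t={sampleTimeMs}ms: latest source value {latestValue}, emitted {msSinceLatest}ms earlier");
}
// Prints:
// t=1000ms: latest source value 5, emitted 100ms earlier
// t=2000ms: latest source value 12, emitted 50ms earlier
// t=3000ms: latest source value 19, emitted 0ms earlier
```

The zero in the last line is the case where the source and the sampler are due at the same instant. In a real run the outcome then depends on which timer is serviced first, which is presumably why the output shown earlier ends with 18 rather than 19.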

Throttle

The Throttle extension method provides a sort of protection against sequences that produce values at variable rates and sometimes too quickly. Like the Sample method, Throttle will return the last sampled value for a period of time. Unlike Sample though, Throttle's period is a sliding window. Each time Throttle receives a value, the window is reset. Only once the period of time has elapsed will the last value be propagated. This means that the Throttle method is only useful for sequences that produce values at a variable rate. Sequences that produce values at a constant rate (like Interval or Timer) would have all of their values suppressed if they produced values faster than the throttle period, whereas all of their values would be propagated if they produced values slower than the throttle period.

// Ignores values from an observable sequence which 
// are followed by another value before dueTime.
public static IObservable<TSource> Throttle<TSource>(
    this IObservable<TSource> source, 
    TimeSpan dueTime)
{...}
public static IObservable<TSource> Throttle<TSource>(
    this IObservable<TSource> source, 
    TimeSpan dueTime, 
    IScheduler scheduler)
{...}

We could apply Throttle to implement a live search feature that makes suggestions as you type. We would typically want to wait until the user has stopped typing for a bit before searching for suggestions, because otherwise, we might end up kicking off several searches in a row, cancelling the last one each time the user presses another key. Only once there is a pause should we execute a search with what they have typed so far. Throttle fits well with this scenario, because it won't allow any events through at all if the source is producing values faster than the specified rate.
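A minimal sketch of that live search scenario follows. It assumes the System.Reactive NuGet package, and uses a Subject<string> as a hypothetical stand-in for a text box's change events, with Thread.Sleep simulating the gaps between keystrokes.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical stand-in for a text box's "text changed" events.
using var textChanges = new Subject<string>();

using IDisposable subscription = textChanges
    .Throttle(TimeSpan.FromMilliseconds(300))
    .Subscribe(text => Console.WriteLine($"Searching for '{text}'"));

// A burst of typing: each keystroke arrives well inside the 300ms window,
// so each one resets the window and nothing is let through yet.
foreach (string text in new[] { "r", "rx", "rx n", "rx ne" })
{
    textChanges.OnNext(text);
    Thread.Sleep(50);
}

// The user pauses for longer than the window, so the latest text gets through.
Thread.Sleep(600);

// A second burst followed by another pause.
textChanges.OnNext("rx net");
Thread.Sleep(50);
textChanges.OnNext("rx net throttle");
Thread.Sleep(600);
```

With these timings this should normally report just two searches, one for 'rx ne' and one for 'rx net throttle', although as with everything in this chapter the exact behaviour under load is best effort.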

Note that the RxJS library decided to make their version of throttle work differently, so if you ever find yourself using both Rx.NET and RxJS, be aware that they don't work the same way. In RxJS, throttle doesn't shut off completely when the source exceeds the specified rate: it just drops enough items that the output never exceeds the specified rate. So RxJS's throttle implementation is a kind of rate limiter, whereas Rx.NET's Throttle is more like a self-resetting circuit breaker that shuts off completely during an overload.

Timeout

The Timeout operator allows us to terminate a sequence with an error if the source does not produce any notifications for a given period. We can either specify the period as a sliding window with a TimeSpan, or supply a DateTimeOffset to specify an absolute time by which the sequence must complete.

// Returns either the observable sequence or a TimeoutException
// if the maximum duration between values elapses.
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source, 
    TimeSpan dueTime)
{...}
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source, 
    TimeSpan dueTime, 
    IScheduler scheduler)
{...}

// Returns either the observable sequence or a  
// TimeoutException if dueTime elapses.
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source, 
    DateTimeOffset dueTime)
{...}
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source, 
    DateTimeOffset dueTime, 
    IScheduler scheduler)
{...}

If we provide a TimeSpan and no values are produced within that time span, then the sequence fails with a TimeoutException.

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
                       .Take(5)
                       .Concat(Observable.Interval(TimeSpan.FromSeconds(2)));

var timeout = source.Timeout(TimeSpan.FromSeconds(1));
timeout.Subscribe(
    Console.WriteLine, 
    Console.WriteLine, 
    () => Console.WriteLine("Completed"));

Initially this produces values frequently enough to satisfy Timeout, so the observable returned by Timeout just forwards items from the source. But once the source stops producing items, we get an OnError:

0
1
2
3
4
System.TimeoutException: The operation has timed out.

Alternatively, we can pass Timeout an absolute time; if the sequence does not complete by that time, it produces an error.

var dueDate = DateTimeOffset.UtcNow.AddSeconds(4);
var source = Observable.Interval(TimeSpan.FromSeconds(1));
var timeout = source.Timeout(dueDate);
timeout.Subscribe(
    Console.WriteLine, 
    Console.WriteLine, 
    () => Console.WriteLine("Completed"));

Output:

0
1
2
System.TimeoutException: The operation has timed out.

There are other Timeout overloads enabling us to substitute an alternative sequence when a timeout occurs.

// Returns the source observable sequence or the other observable 
// sequence if the maximum duration between values elapses.
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source, 
    TimeSpan dueTime, 
    IObservable<TSource> other)
{...}

public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source, 
    TimeSpan dueTime, 
    IObservable<TSource> other, 
    IScheduler scheduler)
{...}

// Returns the source observable sequence or the 
// other observable sequence if dueTime elapses.
public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source, 
    DateTimeOffset dueTime, 
    IObservable<TSource> other)
{...}  

public static IObservable<TSource> Timeout<TSource>(
    this IObservable<TSource> source, 
    DateTimeOffset dueTime, 
    IObservable<TSource> other, 
    IScheduler scheduler)
{...}
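As a sketch of how the fallback overloads might be used, the example below lets a source go quiet and substitutes a single sentinel value when the timeout fires. It assumes the System.Reactive NuGet package, and the fallback sequence with its -1 sentinel is a hypothetical choice for illustration.

```csharp
using System;
using System.Reactive.Linq;

// Three quick values, then silence (Never produces no notifications at all).
IObservable<long> source = Observable
    .Interval(TimeSpan.FromMilliseconds(100))
    .Take(3)
    .Concat(Observable.Never<long>());

// Hypothetical fallback: a single sentinel value, after which it completes.
IObservable<long> fallback = Observable.Return(-1L);

// ForEachAsync returns a Task that completes when the sequence completes.
await source
    .Timeout(TimeSpan.FromSeconds(1), fallback)
    .ForEachAsync(value => Console.WriteLine(value));

Console.WriteLine("Completed");
```

This should print 0, 1, and 2 from the source, then -1 from the fallback about a second after the source goes quiet, and finally Completed. No TimeoutException reaches the subscriber, because the timeout is handled by switching to the other sequence.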

As we've now seen, Rx provides features to manage timing in a reactive paradigm. Data can be timed, throttled, or sampled to meet your needs. Entire sequences can be shifted in time with Delay, and timeliness of data can be asserted with the Timeout operator.

Next we will look at the boundary between Rx and the rest of the world.

Ian Griffiths

Technical Fellow I

Ian has worked in various aspects of computing, including computer networking, embedded real-time systems, broadcast television systems, medical imaging, and all forms of cloud computing. Ian is a Technical Fellow at endjin, and a Microsoft MVP in Developer Technologies. He is the author of O'Reilly's Programming C# 10.0, and has written Pluralsight courses on WPF and the TPL. He's a maintainer of Reactive Extensions for .NET, Reaqtor, and endjin's 50+ open source projects. Technology brings him joy.