
Introduction to Rx
Kindle edition (2012)
Practical Rx Training
London 6-7 October 2015
Presented by the author of IntroToRx.com
Book now!
Time-shifted sequences
When working with observable sequences, the time axis is an unknown quantity: when
will the next notification arrive? When consuming an IEnumerable sequence,
asynchrony is not a concern; when we call MoveNext()
, we are blocked
until the sequence yields. This chapter looks at the various methods we can apply
to an observable sequence when its relationship with time is a concern.
Buffer
Our first subject will be the Buffer method. In some situations, you may not want a deluge of individual notifications to process. Instead, you might prefer to work with batches of data. It may be the case that processing one item at a time is just too expensive, and the trade-off is to deal with messages in batches, at the cost of accepting a delay.
The Buffer operator allows you to store away a range of values and then re-publish them as a list once the buffer is full. You can temporarily withhold a specified number of elements, stash away all the values for a given time span, or use a combination of both count and time. Buffer also offers more advanced overloads that we will look at in a future chapter.
The two overloads of Buffer are straight forward and should make it simple for other developers to understand the intent of the code.
For some use cases, it may not be enough to specify only a buffer size and a maximum delay period. Some systems may have a sweet spot for the size of a batch they can process, but also have a time constraint to ensure that data is not stale. In this case buffering by both time and count would be suitable.
In this example below, we create a sequence that produces the first ten values one second apart, then a further hundred values within another second. We buffer by a maximum period of three seconds and a maximum batch size of fifteen values.
Output:
Note the variations in time and buffer size. We never get a buffer containing more
than fifteen elements, and we never wait more than three seconds. A practical application
of this is when you are loading data from an external source into an ObservableCollection<T>
in a WPF application. It may be the case that adding one item at a time is just
an unnecessary load on the dispatcher (especially if you are expecting over a hundred
items). You may have also measured, for example that processing a batch of fifty
items takes 100ms. You decide that this is the maximum amount of time you want to
block the dispatcher, to keep the application responsive. This could give us two
reasonable values to use: source.Buffer(TimeSpan.FromMilliseconds(100), 50)
.
This means the longest we will block the UI is about 100ms to process a batch of
50 values, and we will never have values waiting for longer than 100ms before they
are processed.
Overlapping buffers
Buffer also offers overloads to manipulate the overlapping of the buffers. The variants we have looked at so far do not overlap and have no gaps between buffers, i.e. all values from the source are propagated through.
There are three interesting things you can do with overlapping buffers:
- Overlapping behavior
- Ensure that current buffer includes some or all values from previous buffer
- Standard behavior
- Ensure that each new buffer only has new data
- Skip behavior
- Ensure that each new buffer not only contains new data exclusively, but also ignores one or more values since the previous buffer
Overlapping buffers by count
If you are specifying a buffer size as a count, then you need to use this overload.
You can apply the above scenarios as follows:
- Overlapping behavior
- skip < count*
- Standard behavior
- skip = count
- Skip behavior
- skip > count
*The skip parameter cannot be less than or equal to zero. If you want to use a value of zero (i.e. each buffer contains all values), then consider using the Scan method instead with an IList<T> as the accumulator.
Let's see each of these in action. In this example, we have a source that produces values every second. We apply each of the variations of the buffer overload.
Output
Note that in each buffer, one value is skipped from the previous batch. If we change the skip parameter from 1 to 3 (same as the buffer size), we see standard buffer behavior.
Output
Finally, if we change the skip parameter to 5 (a value greater than the count of 3), we can see that two values are lost between each buffer.
Output
Overlapping buffers by time
You can, of course, apply the same three behaviors with buffers defined by time instead of count.
To exactly replicate the output from our Overlapping Buffers By Count examples, we only need to provide the following arguments:
As our source produces values consistently every second, we can use the same values from our count example but as seconds.
Delay
The Delay extension method is a purely a way to time-shift an entire sequence. You can provide either a relative time the sequence should be delayed by using a TimeSpan, or an absolute point in time that the sequence should wait for using a DateTimeOffset. The relative time intervals between the values are preserved.
To show the Delay method in action, we create a sequence of values one second apart and timestamp them. This will show that it is not the subscription that is being delayed, but the actual forwarding of the notifications to our final subscriber.
Output:
It is worth noting that Delay will not time-shift OnError notifications. These will be propagated immediately.
Sample
The Sample method simply takes the last value for every specified TimeSpan. This is great for getting timely data from a sequence that produces too much information for your requirements. This example shows sample in action.
Output:
This output is interesting and this is the reason why I choose the value of 150ms. If we plot the underlying sequence of values against the time they are produced, we can see that Sample is taking the last value it received for each period of one second.
Relative time (ms) | Source value | Sampled value |
---|---|---|
0 | ||
50 | ||
100 | ||
150 | 0 | |
200 | ||
250 | ||
300 | 1 | |
350 | ||
400 | ||
450 | 2 | |
500 | ||
550 | ||
600 | 3 | |
650 | ||
700 | ||
750 | 4 | |
800 | ||
850 | ||
900 | 5 | |
950 | ||
1000 | 5 | |
1050 | 6 | |
1100 | ||
1150 | ||
1200 | 7 | |
1250 | ||
1300 | ||
1350 | 8 | |
1400 | ||
1450 | ||
1500 | 9 | |
1550 | ||
1600 | ||
1650 | 10 | |
1700 | ||
1750 | ||
1800 | 11 | |
1850 | ||
1900 | ||
1950 | 12 | |
2000 | 12 | |
2050 | ||
2100 | 13 | |
2150 | ||
2200 | ||
2250 | 14 | |
2300 | ||
2350 | ||
2400 | 15 | |
2450 | ||
2500 | ||
2550 | 16 | |
2600 | ||
2650 | ||
2700 | 17 | |
2750 | ||
2800 | ||
2850 | 18 | |
2900 | ||
2950 | ||
3000 | 19 | 19 |
Throttle
The Throttle extension method provides a sort of protection against sequences that produce values at variable rates and sometimes too quickly. Like the Sample method, Throttle will return the last sampled value for a period of time. Unlike Sample though, Throttle's period is a sliding window. Each time Throttle receives a value, the window is reset. Only once the period of time has elapsed will the last value be propagated. This means that the Throttle method is only useful for sequences that produce values at a variable rate. Sequences that produce values at a constant rate (like Interval or Timer) either would have all of their values suppressed if they produced values faster than the throttle period, or all of their values would be propagated if they produced values slower than the throttle period.
A great application of the Throttle method would be to use it with a live search like "Google Suggest". While the user is still typing we can hold off on the search. Once there is a pause for a given period, we can execute the search with what they have typed. The Rx team has a great example of this scenario in the Rx Hands On Lab
Timeout
We have considered handling timeout exceptions previously in the chapter on Flow control. The Timeout extension method allows us terminate the sequence with an error if we do not receive any notifications for a given period. We can either specify the period as a sliding window with a TimeSpan, or as an absolute time that the sequence must complete by providing a DateTimeOffset.
If we provide a TimeSpan and no values are produced within that time span, then the sequence fails with a TimeoutException.
Output:
Like the Throttle method, this overload is only useful for sequences that produce values at a variable rate.
The alternative use of Timeout is to set an absolute time; the sequence must be completed by then.
Output:
Perhaps an even more interesting usage of the Timeout method is to substitute
in an alternative sequence when a timeout occurs. The Timeout method has
overloads the provide the option of specifying a continuation sequence to use if
a timeout occurs. This functionality behaves much like the
Catch operator. It is easy to imagine that the simple overloads actually
just call through to these over loads and specify an Observable.Throw<TimeoutException>
as the continuation sequence.
Rx provides features to tame the unpredictable element of time in a reactive paradigm. Data can be buffered, throttled, sampled or delayed to meet your needs. Entire sequences can be shifted in time with the delay feature, and timeliness of data can be asserted with the Timeout operator. These simple yet powerful features further extend the developer's tool belt for querying data in motion.
Additional recommended reading
<< Back to : Combining sequences | Moving on to : Hot and Cold observables>> |