Observability
Imagine a collection of values `T`. This collection can be updated by inserting new values, by removing existing ones, or it can be truncated, cleared… This collection acts like the standard `Vec`. However, there is a subtlety: this collection is observable. It is possible for someone to subscribe to this collection and to receive its updates.
This observability pattern is the basis of reactive programming. It applies to any kind of type. Actually, it can be generalized as a single `Observable<T>` type. For collections though, we will see that an `ObservableVector<T>` type is more efficient.
I’ve recently played a lot with this pattern as part of my work on the Matrix Rust SDK, a set of Rust libraries that aim at helping to build robust Matrix clients or bridges. It is notably used by the next-generation Matrix client developed by Element, namely Element X. The Matrix Rust SDK is cross-platform. Element X has two implementations: one for iOS, iPadOS and macOS with Swift, and one for Android with Kotlin. Both use our Rust bindings to Swift and Kotlin. This is a story for another series (how we have automated this, how we support asynchronous flows from Rust to foreign languages, etc.), but for the moment, let’s keep our focus on reactive programming.
Taking the Element X use case, the room list, which is the central piece of the app, is fully dynamic:
- Rooms are sorted by recency, so rooms move to the top when a new interesting message is received,
- The list can be filtered by room properties (one can filter by group or people, favourites, unreads, invites…),
- The list is also searchable by room names.
The rooms exposed by the room list are stored in a unique observable type. Why is it dynamic? Because the app continuously syncs new data that update the internal state: when a room gets an update from the network, the room list is automatically updated. The beauty of it: we have nothing to do. Sorters and filters are run automatically. Why? Spoiler: because everything is a `Stream`.
Thanks to the Rust async model, every part is lazy. The app never needs to ask Rust whether a new update is available. It literally just waits for updates.
I believe this reactive programming approach is pretty interesting to explore. And this is precisely the goal of this series. We are going to play with `Stream` a lot, with higher-order `Stream`s a lot more, and w…
Hold on a second! I believe this first step is a bit steep for someone who's not familiar with asynchronous code in Rust, don't you think?
Before digging into the implementation details you are obviously eager to share, maybe we can start with examples.
Alrighty. Fair. Before digging into the really fun bits, we need some basics.
Baby steps with reactive programming
Everything we are going to share with you has been implemented in a library called `eyeball`. To give you a good idea of what reactive programming in Rust can look like, let's create a Rust program:
// in `src/main.rs`
use eyeball::Observable;
What do we see here? First off, `observable` is an observable value. The proof: it is possible to subscribe to it, see `subscriber`. Both `observable` and `subscriber` see the same initial value: 7. When `observable` receives a new value, 13, both `observable` and `subscriber` see the updated value. Let's take it for a spin:
Tadaa. Fantastic, isn't it?
I… I am… speechless? Is it really reactive programming? Where is the reactivity here? It seems like you've only shared a value between an owner and a watcher. You're calling them observable and subscriber, alright, but how is this thing reactive? I only see synchronous code for the moment.
Hold on. You told me to start slow. You're right though: the `Observable` owns the value. The `Subscriber` is able to read the value from the `Observable`. However, `Subscriber::next` returns a `Future`! Let's add this:
// in `src/main.rs`
// …
Indeed. Almighty `rustc` is correct. The `main` function is not `async`. We need an asynchronous runtime. Let's use the `smol` project; I enjoy it a lot: it's a small, fast and well-written async runtime.
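What does a runtime actually do for us? Here is a hedged, std-only illustration. It is a toy, nothing like `smol`'s real implementation, which also spawns tasks, drives timers and handles I/O. The minimal `block_on` below polls a single future and parks the thread while that future is pending. `ThreadWaker`, `block_on` and `double` are hypothetical names used only for this sketch.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Drives a single future to completion on the current thread.
/// Toy only: no task spawning, no timers, no I/O reactor.
fn block_on<F: Future>(future: F) -> F::Output {
    // Pin the future on the stack so that it can be polled.
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);

    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Sleep until the waker is called; a spurious wake-up just re-polls.
            Poll::Pending => thread::park(),
        }
    }
}

async fn double(value: u32) -> u32 {
    value * 2
}

fn main() {
    // `.await` is allowed here because we are inside an `async` block.
    let result = block_on(async { double(7).await + 1 });
    println!("{result}"); // prints "15"
}
```

The key point is that `.await` only makes sense inside an `async` context. Something outside of it (here `block_on`) has to keep polling the resulting future until it is ready.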
Now let's modify our `main` function a little bit:
// in `src/main.rs`
use eyeball::Observable;
Please `rustc`, be nice…
Hurray!
We can get even better ergonomics by using the `smol-macros` crate, which sets up a default async runtime `Executor` for us. It's useful in our case as we want to play with something else (reactive programming), and don't want to focus on the async runtime itself.
We will take the opportunity to improve our program a little bit. Let's spawn a `Future` that will continuously read new updates from the `subscriber`:
use std::time::Duration;
use eyeball::Observable;
use macro_rules_attribute::apply;
use smol::{Executor, Timer};
use smol_macros::main;
async
The little `Timer::after` calls are here to pretend, for the moment, that the values are coming from random events. Let's run it again to see if we get the same result:
Here we go, perfect! See, ah ha! It's async and nice now.
I believe I'm starting to appreciate it. However, I suspect you might be hiding something behind these `Timer::after` calls. Am I right?
And this `task.await` at the end makes the program never finish. That explains the need to send a `SIGINT` signal to the program to interrupt it, right?
You're slick. Indeed, I wanted to focus on the `observable` and the `subscriber`, because there is a subtlety here. If the `Timer::after` calls are removed, only the last update will be displayed on the output by `dbg!`.
And that's perfectly normal. The async runtime will execute all the `Observable::set(&mut observable, new_value)` calls in a row, and then, once there is an await point, another task will have room to run. In this case, that's `subscriber.next().await`.
The subscriber only receives the last update, and that's pretty important to understand. There is no buffer of all the previous updates here, no memory, no trace: `subscriber` returns the last value when it is called. Note that this is not always the case, as we will see with `ObservableVector` later, but for the moment, that's the case.
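To make the "no buffer" point concrete, here is a synchronous, std-only toy model. `ToyObservable`, `ToySubscriber` and `try_next` are hypothetical names. This is not eyeball's implementation, which is async and more careful, but it mirrors the behaviour described above: there is a single slot that every `set` overwrites.

```rust
use std::sync::{Arc, Mutex};

/// The single shared slot: the current value plus a version bumped on every `set`.
struct Slot<T> {
    value: T,
    version: u64,
}

struct ToyObservable<T> {
    slot: Arc<Mutex<Slot<T>>>,
}

struct ToySubscriber<T> {
    slot: Arc<Mutex<Slot<T>>>,
    seen_version: u64, // last version this subscriber has observed
}

impl<T: Clone> ToyObservable<T> {
    fn new(value: T) -> Self {
        Self { slot: Arc::new(Mutex::new(Slot { value, version: 0 })) }
    }

    fn subscribe(&self) -> ToySubscriber<T> {
        let seen_version = self.slot.lock().unwrap().version;
        ToySubscriber { slot: Arc::clone(&self.slot), seen_version }
    }

    /// Overwrites the value: there is one slot, no history.
    fn set(&mut self, value: T) {
        let mut slot = self.slot.lock().unwrap();
        slot.value = value;
        slot.version += 1;
    }
}

impl<T: Clone> ToySubscriber<T> {
    /// Returns a clone of the current value if it changed since the last call.
    fn try_next(&mut self) -> Option<T> {
        let slot = self.slot.lock().unwrap();
        if slot.version == self.seen_version {
            return None;
        }
        self.seen_version = slot.version;
        Some(slot.value.clone())
    }
}

fn main() {
    let mut observable = ToyObservable::new(7);
    let mut subscriber = observable.subscribe();

    // Three updates happen before the subscriber gets a chance to look.
    observable.set(13);
    observable.set(17);
    observable.set(19);

    // Only the latest value is seen; 13 and 17 are gone.
    println!("{:?}", subscriber.try_next()); // Some(19)
    println!("{:?}", subscriber.try_next()); // None
}
```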
And yes, if we want the `task` to get a chance to consume more updates, we need to tell the executor that we are willing to wait while the other tasks get a chance to run. To do that, we can use the `smol::future::yield_now` function:
// Now, let's update `observable`.
;
set // Eh `executor`: `task` can run now, we will wait!
yield_now().await;
// More updates.
;
set ;
set // Eh `executor`: _bis repetita placent_!
yield_now().await;
drop(observable);
task.await;
}
Let's see what happens:
Eh, see, `new_value = 17` is not displayed, because the `observable` is updated but the `subscriber` is suspended by the executor. But the others are read, good good.
Note that we are dropping the `observable`. Once it's dropped, the `subscriber` won't be able to read any value from it, so it's going to close itself, and the `task` will end. That's why waiting on the task with `task.await` will terminate this time. And thus, the program will finish gracefully.
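The same "drop the producer, and the consumer's loop ends" pattern exists in the standard library's `std::sync::mpsc` channels. Unlike `Observable`, an mpsc channel buffers every value. Here is a hedged, std-only analogy; the function name `run` is hypothetical.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns a consumer thread, sends a few values, then drops the sender.
/// Returns everything the consumer saw once its loop ended.
fn run() -> Vec<u32> {
    let (sender, receiver) = mpsc::channel::<u32>();

    let consumer = thread::spawn(move || {
        let mut seen = Vec::new();
        // `recv` returns `Err` once every `Sender` has been dropped,
        // which ends the loop, much like `subscriber.next().await` returning `None`.
        while let Ok(value) = receiver.recv() {
            seen.push(value);
        }
        seen
    });

    sender.send(13).unwrap();
    sender.send(17).unwrap();
    drop(sender); // the producer goes away: the consumer's loop will end

    // Like `task.await` in the article, this join returns instead of hanging.
    consumer.join().unwrap()
}

fn main() {
    println!("{:?}", run()); // [13, 17]
}
```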
And that's it. That's the basics of reactive programming. Also note that `Subscriber<T>` implements `Send` and `Sync` if `T` implements `Send` and `Sync`, i.e. if the observed type implements these traits. That's pretty useful actually: it is possible to send the subscriber to a different thread, and keep waiting for new updates there.
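Why would a subscriber be `Send + Sync` exactly when `T` is? In Rust, these auto traits propagate from a type's fields. The sketch below uses a hypothetical `ToySubscriber<T>`, not eyeball's type. It holds an `Arc<RwLock<T>>`, which is `Send + Sync` precisely when `T: Send + Sync`. A compile-time helper checks this, and a thread spawn uses it.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Hypothetical subscriber-like type: it shares the observed value behind
/// an `Arc<RwLock<T>>`. Rust derives `Send`/`Sync` automatically from the fields.
struct ToySubscriber<T> {
    shared: Arc<RwLock<T>>,
}

/// Compiles only if `X` is both `Send` and `Sync`.
fn assert_send_sync<X: Send + Sync>() {}

fn main() {
    // `usize` and `String` are `Send + Sync`, so `ToySubscriber<_>` is too.
    assert_send_sync::<ToySubscriber<usize>>();
    assert_send_sync::<ToySubscriber<String>>();

    // `std::cell::Cell<usize>` is `Send` but not `Sync`, so this would not compile:
    // assert_send_sync::<ToySubscriber<std::cell::Cell<usize>>>();

    // Being `Send`, the subscriber can be moved to another thread.
    let subscriber = ToySubscriber { shared: Arc::new(RwLock::new(7_usize)) };
    let handle = thread::spawn(move || {
        // Copy the value out so the read guard is released before returning.
        let value = *subscriber.shared.read().unwrap();
        value
    });
    println!("{}", handle.join().unwrap()); // 7
}
```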
Attack of the Clones
However, at the beginning of this episode, we were talking about a collection. Let's focus on `Vec`.
Why do we focus on `Vec` only? Why not `HashMap`, `HashSet`, `BTreeSet`, `BTreeMap`, `BinaryHeap`, `LinkedList` or even `VecDeque`? It seems a bit non-inclusive if you ask me. Are you aware there isn't only `Vec` in life?
Well, the reason is simple: `Vec` is supported by `eyeball`. Supporting other collections is a matter of time and work. It's definitely not impossible, but you will see that it's not trivial either, for a simple reason: did you notice that `Subscriber` produces an owned `T`? Not a `&T`, but a `T`. That's because `Subscriber::next` requires `T: Clone`. It means that the observed value will be cloned every time it is broadcast to a subscriber.
Cloning a value may be expensive. Here we are manipulating `usize`, which is a primitive type, so it's all fine (it boils down to a `memcpy`). But imagine an `Observable<Vec<BigType>>` where `BigType` is 512 bytes: the memory impact is going to become noticeable quickly. So th…
… Excuse my interruption! You know how I love reading books. I like to define myself as a bibliophile. Anyway. While perusing the `eyeball` documentation, I found `Subscriber::next_ref`. The documentation says:
Wait for an update and get a read lock for the updated value.
and later:
You can use this method to get updates of an `Observable` where the inner type does not implement `Clone`.
Can you stop cutting me off please? It's really unpleasant. And do not forget we are not alone… doing sideways head movement
You're right though. There is `Subscriber::next_ref`. However, if you are such an assiduous reader, you may have read the end of the documentation, haven't you?
However, the `Observable` will be locked (not updateable) while any read guards are alive.
Blocking the `Observable` might be tolerable in some cases, but it cannot be generalized to all use cases. A user is more likely to prefer `next` over `next_ref` by default.
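eyeball's internal locking is not shown here, so the following is an analogy only. `std::sync::RwLock` exhibits the same trade-off as a read guard: the reader avoids a clone, but no writer can get in while the guard is alive.

```rust
use std::sync::RwLock;

fn main() {
    let observed = RwLock::new(vec![1, 2, 3]);

    {
        // A read guard: the reader borrows the value instead of cloning it.
        let guard = observed.read().unwrap();
        // While the guard is alive, the writer is locked out.
        assert!(observed.try_write().is_err());
        println!("read without cloning: {:?}", *guard);
    } // `guard` is dropped here

    // The writer can update the value again.
    observed.write().unwrap().push(4);
    println!("{:?}", *observed.read().unwrap()); // [1, 2, 3, 4]
}
```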
Back to our `Observable<Vec<BigType>>` then. Imagine the collection contains a lot of items: cloning the entire `Vec<_>` for every update to every subscriber is a pretty inefficient way of programming. Remember that, as programmers, we have the responsibility to make our programs use as few resources as possible, so that hardware can be used longer. Hardware is the most polluting segment of our digital world.
So. How can a data structure like `Vec` be cloned cheaply? We could put it inside an `Arc`, right? Cloning an Atomically Reference Counted value is really cheap: it increases the counter by 1 atomically, and the inner value is untouched. Nonetheless, we now have a mutation problem.
If we have `Observable<Arc<Vec<_>>>`, it means that the subscribers will be `Subscriber<Arc<Vec<_>>>`. In this case, every time the observable wants to mutate the data, it is going to… be… impossible, because an `Arc` is nothing less than a shared reference, and shared references in Rust disallow mutation by default. Using `Observable::set` will create a new `Arc`, but we cannot update the value inside the `Arc`, except if we use a lock… Well, we are adding more and more complexity.
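Here is a small std-only sketch of the trade-off. Cloning a `Vec` copies its whole buffer, while cloning an `Arc` only shares it. Mutating through a shared `Arc` is refused by `Arc::get_mut`. `Arc::make_mut` does allow it, but only by deep-cloning the whole `Vec` when the value is shared (copy-on-write), which brings us back to the initial cost. The 1,000 × 512-byte items echo the `BigType` example and are arbitrary.

```rust
use std::sync::Arc;

fn main() {
    // 1,000 items of 512 bytes each, in the spirit of the `BigType` example.
    let items: Vec<[u8; 512]> = vec![[0; 512]; 1_000];

    // Cloning a `Vec` allocates a new buffer and copies every element.
    let deep_copy = items.clone();
    assert_ne!(items.as_ptr(), deep_copy.as_ptr());

    // Cloning an `Arc` only bumps an atomic counter: same allocation.
    let mut shared = Arc::new(items);
    let for_subscriber = Arc::clone(&shared);
    assert!(Arc::ptr_eq(&shared, &for_subscriber));
    assert_eq!(Arc::strong_count(&shared), 2);

    // Mutation is the catch: `get_mut` refuses while the value is shared…
    assert!(Arc::get_mut(&mut shared).is_none());

    // …and `make_mut` only succeeds by cloning the whole `Vec` (copy-on-write).
    Arc::make_mut(&mut shared).push([1; 512]);
    assert!(!Arc::ptr_eq(&shared, &for_subscriber));
    assert_eq!(shared.len(), 1_001);
    assert_eq!(for_subscriber.len(), 1_000);
}
```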
Spes salutis [1]! Fortunately for us, immutable data structures exist in Rust.
An immutable data structure is a data structure which can be copied and modified efficiently without altering the original.
It can be modified. However, once it is copied (or cloned), the copy can still be modified while the original data stays untouched. That's extremely powerful.
Such structures bring many advantages; one of them is structural sharing:
If two data structures are mostly copies of each other, most of the memory they take up will be shared between them. This implies that making copies of an immutable data structure is cheap: it's really only a matter of copying a pointer and increasing a reference counter, where in the case of `Vec` you have to allocate the same amount of memory all over again and make a copy of every element it contains. For immutable data structures, extra memory isn't allocated until you modify either the copy or the original, and then only the memory needed to record the difference.
Well, taking a deep breath, it sounds exactly like what we need to solve our issue, doesn't it? The `Observable<Immutable<_>>` and the `Subscriber<Immutable<_>>`s will share the same value, with the observable being able to mutate its inner value. The subscribers can modify the received value too, efficiently, without conflicting with the value from the observable. Both values will continue to live on their own side, but cloning the value is cheap.
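Here is structural sharing in its simplest textbook form, a persistent singly linked list. This is a hedged illustration only: `imbl::Vector` relies on a far more sophisticated tree structure, and `List` and `prepend` are hypothetical names. Two "copies" that differ only by their head share the same tail in memory, and the original is never modified.

```rust
use std::rc::Rc;

/// A minimal persistent (immutable) singly linked list.
enum List<T> {
    Nil,
    Cons(T, Rc<List<T>>),
}

impl<T: Clone> List<T> {
    /// Returns a new list with `value` in front; `tail` is shared, not copied.
    fn prepend(tail: &Rc<List<T>>, value: T) -> Rc<List<T>> {
        Rc::new(List::Cons(value, Rc::clone(tail)))
    }

    /// Collects the list into a `Vec` (for display and testing only).
    fn to_vec(&self) -> Vec<T> {
        let mut out = Vec::new();
        let mut node = self;
        while let List::Cons(value, next) = node {
            out.push(value.clone());
            node = &**next;
        }
        out
    }
}

fn main() {
    // shared = [2, 3]
    let shared = List::prepend(&List::prepend(&Rc::new(List::Nil), 3), 2);

    // Two "copies" that differ only by their head: [1, 2, 3] and [9, 2, 3].
    let a = List::prepend(&shared, 1);
    let b = List::prepend(&shared, 9);

    // The tail [2, 3] exists once in memory, referenced by `shared`, `a` and `b`.
    assert_eq!(Rc::strong_count(&shared), 3);
    assert_eq!(a.to_vec(), vec![1, 2, 3]);
    assert_eq!(b.to_vec(), vec![9, 2, 3]);
    assert_eq!(shared.to_vec(), vec![2, 3]); // the original is untouched
}
```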
Dare I ask how immutable data structures are implemented? They sound like complex beasts.
I mean… a naive implementation sounds relatively doable, but I am guessing there are a lot of subtleties, possible conflicts, and many memory guarantees that I am not anticipating yet, right?
Oh… beati pauperes in spiritu [2]… it is actually really complex. It may be a topic for another series or article. For the moment, if you are interested, let me redirect you to a research paper that proposes an immutable `Vec`: RRB Vector: A Practical General Purpose Immutable Sequence [3]. Relax though: understanding this part is not necessary at all for what we are talking about now. It's a great tool we are going to use, no matter how it works internally.
Want to know the other good news? We don't have to implement it ourselves, because some nice people already did! Enter the `imbl` crate. This crate provides a `Vector` type. It can be used like a regular `Vec`. (Side note: it's even smarter than a `Vec` because it implements smart head and tail chunking [4], and allocates on the stack or on the heap depending on the size of the collection, similarly to the `smallvec` crate. End of digression.)
Observable (immutable) collection
The `imbl` crate then. It provides a `Vector` type. The `eyeball` project provides a crate for working with immutable data structures (how surprising, huh?): this crate is `eyeball-im`.
Instead of providing an `Observable<T>` type, it provides an `ObservableVector<T>` type, which is a `Vector`, but an observable one! Let's see… what do we have… scroll the documentation, hmm, interesting, scroll more…, okay, that's interesting:
- First off, there are methods like `append`, `pop_back`, `pop_front`, `push_back`, `push_front`, `remove`, `insert`, `set`, `truncate` and `clear`. It seems this collection is pretty flexible. The vocabulary is clear. They all take a `&mut self`, cool.
- Then, there is a `with_capacity` method; this is intriguing, add to notes.
- Finally, we find our not-so-ol' friend `subscribe`, but it returns a `VectorSubscriber<T>`.
Let's explore `VectorSubscriber` a bit more, shall we? Scrolling the documentation: contrary to `Subscriber`, there is no `next` method. How are we supposed to wait for an update?
Over to the assiduous reader! If you carefully read the documentation of the `Subscriber::next` method, you will see:
This method is a convenience so you don't have to import a `Stream` extension trait such as `futures::StreamExt` or `tokio_stream::StreamExt`.
… fair enough. So `Subscriber::next` mimics `StreamExt::next`. Okay. Let's look at `Stream` first; it's from the `futures` crate. `Stream` defines itself as:
A stream of values produced asynchronously.
If `Future<Output = T>` is an asynchronous version of `T`, then `Stream<Item = T>` is an asynchronous version of `Iterator<Item = T>`. A stream represents a sequence of value-producing events that occur asynchronously to the caller.
The trait is modeled after `Future`, but allows `poll_next` to be called even after a value has been produced, yielding `None` once the stream has been fully exhausted.
We aren't going to teach everything about `Stream`: why this design, its pros and cons… However (waving my hand to ask you to come closer), did you notice how `Future::poll` returns `Poll<Self::Output>`, whilst `Stream::poll_next` returns `Poll<Option<Self::Item>>`? It's really similar to `Iterator::next`, which returns `Option<Self::Item>`.
Let's take a look at `Poll<T>`, shall we? It's an enum with 2 variants:
- `Ready(value)` means a `value` is immediately ready,
- `Pending` means no value is ready yet.
Then, what does `Poll<Option<T>>` represent for a `Stream`?
- `Poll::Ready(Some(value))` means this stream has successfully produced a `value`, and may produce more values on subsequent `poll_next` calls,
- `Poll::Ready(None)` means the stream has terminated (and `poll_next` should not be called anymore),
- `Poll::Pending` means no value is ready yet.
It makes perfect sense. A `Future` produces a single value, whilst a `Stream` produces multiple values, and `Poll::Ready(None)` represents the termination of the stream, similarly to `None` representing the termination of an `Iterator`.
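To see the three states of `Poll<Option<T>>` in action without depending on `futures`, here is a hedged sketch. It declares a local `Stream` trait that copies the shape of the `poll_next` signature; the real trait lives in the `futures-core` crate. It also defines a hypothetical `Countdown` stream that answers `Pending` once before each value, and a hand-written polling loop with a do-nothing waker.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local, simplified copy of the shape of `futures::Stream`.
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stream: `Pending` once before each value, then `Ready(None)`.
struct Countdown {
    remaining: u32,
    ready: bool,
}

impl Stream for Countdown {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.remaining == 0 {
            return Poll::Ready(None); // terminated, like `Iterator::next` returning `None`
        }
        if !self.ready {
            self.ready = true;
            cx.waker().wake_by_ref(); // ask to be polled again
            return Poll::Pending; // no value yet
        }
        self.ready = false;
        self.remaining -= 1;
        Poll::Ready(Some(self.remaining + 1)) // a value; more may follow
    }
}

/// A waker that does nothing: we poll by hand below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Polls the stream by hand, recording every result until `Ready(None)`.
fn poll_all(mut stream: Countdown) -> Vec<Poll<Option<u32>>> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut polls = Vec::new();
    loop {
        let poll = Pin::new(&mut stream).poll_next(&mut cx);
        let done = poll == Poll::Ready(None);
        polls.push(poll);
        if done {
            return polls;
        }
    }
}

fn main() {
    let polls = poll_all(Countdown { remaining: 2, ready: false });
    println!("{polls:?}");
    // [Pending, Ready(Some(2)), Pending, Ready(Some(1)), Ready(None)]
}
```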
Ahh, I love consistency.
We have the basics. Now let's see `StreamExt`. It's a trait extending `Stream` with convenient combinator methods. Amongst other things, we find `StreamExt::next`! Ah ha! It returns a `Next` type which implements `Future`, exactly like what `eyeball` does, actually. Remember our:
// from `main.rs`
while let Some(new_value) = subscriber.next().await {
    dbg!(new_value);
}
It is exactly the same pattern with `StreamExt::next`:
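// from the documentation of `StreamExt::next`
use futures::stream::{self, StreamExt};
let mut stream = stream::iter(1..=3);
assert_eq!(stream.next().await, Some(1));
assert_eq!(stream.next().await, Some(2));
assert_eq!(stream.next().await, Some(3));
assert_eq!(stream.next().await, None);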
Pieces start to come together, don't they?
End of the detour. Back to `eyeball_im::VectorSubscriber<T>`. It is possible to transform this type into a `Stream` with its `into_stream` method. It returns a `VectorSubscriberStream`. Naming is hard, but if I had to guess, I would say it implements… a… `Stream`?
// from `eyeball-im`
Dust blown away, the puzzle starts to appear clearly. Let's get back to coding!
// in `src/main.rs`
use eyeball_im::ObservableVector;
use StreamExt;
use macro_rules_attribute::apply;
use ;
use smol_macros::main;
async
Time to show off:
Do you see something new?
Hmm, indeed. With `Observable`, some values may be “missed” because `Observable` and `Subscriber` have no buffer. The subscribers only return the current value when asked. However, with `ObservableVector`, things are different: no missing values. They are all here. As if there… was a buffer!
And the values returned by the subscriber are not the raw `T`: we see `PushBack`. It comes from, let me check the documentation, `VectorDiff::PushBack`!
Good eyes, well done.
First off, that's correct: `PushBack` comes from `VectorDiff`. Let's come back to this piece in a second: it is the cornerstone of the entire series, and it deserves a bit of explanation.
Second, yes, `VectorSubscriber` returns all values! There is actually a buffer. It's a bit annoying to continue with a `task` as we did so far, so let's use `assert_eq!` instead.
// in `src/main.rs`
use eyeball_im::{ObservableVector, VectorDiff};
//                                 ^^^^^^^^^^ new!
// …
async
Beautiful! However… the code is a bit verbose, isn't it? Desperately waiting for an affirmative answer, okay, okay, something you may not know about me: I love macros. There. I said it. Let's quickly craft one:
// in `src/main.rs`
// before the `main` function
This macro does exactly what our `assert_eq!` was doing, except now it's shorter to use, and thus more pleasant. Don't believe me? See for yourself:
// in `src/main.rs`
// at the end of the `main` function
// Push one value.
observable.push_back;
assert_next_eq!;
// Push another value.
observable.push_back;
observable.push_back;
observable.push_back;
assert_next_eq!;
assert_next_eq!;
assert_next_eq!;
There we go.
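The body of the article's `assert_next_eq!` is not reproduced here. As a hypothetical, synchronous analogue, the same idea can be written as below. It works on an `Iterator` rather than a `Stream`, so there is no `.await`. The shape is the same: ask for the next item and compare it with the expected one, wrapped in `Some`.

```rust
/// Hypothetical synchronous counterpart of `assert_next_eq!`:
/// asserts that the next item of an iterator equals the expected value.
macro_rules! assert_next_eq {
    ($iter:expr, $expected:expr $(,)?) => {
        assert_eq!($iter.next(), Some($expected))
    };
}

fn main() {
    let mut updates = vec!['a', 'b', 'c'].into_iter();
    assert_next_eq!(updates, 'a');
    assert_next_eq!(updates, 'b');
    assert_next_eq!(updates, 'c');
    // Nothing left.
    assert_eq!(updates.next(), None);
}
```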
Having a scientific and rigorous approach is important in our domain. We said `ObservableVector` seems to contain a buffer, and `VectorSubscriber` seems to pop values from this buffer. Let's play with that. I see two things to test:
- Modify the `ObservableVector`, and subscribe to it afterwards: does the subscriber receive the update made before it was created?
- How many values can the buffer hold?
let mut observable = ObservableVector::new();
// Push a value before the subscriber exists.
observable.push_back('a');
let mut subscriber = observable.subscribe().into_stream();
// Push another value.
observable.push_back('b');
assert_next_eq!(subscriber, VectorDiff::PushBack { value: 'b' });
If the `subscriber` receives `'a'`, it must fail; otherwise, no error:
Look Ma', no error!
We have learned that a `VectorSubscriber` is aware of the new updates that are made once it exists. A `VectorSubscriber` is not aware of updates that happened before its creation.
In the example, `VectorDiff::PushBack { value: 'a' }` is not received, because it happened before `subscriber` was created. However, `VectorDiff::PushBack { value: 'b' }` is received because it happened after `subscriber` was created. It makes perfect sense.
It suggests that the buffer lives inside `VectorSubscriber`, and not inside `ObservableVector`. Or maybe the buffer is shared between the observable and the subscribers, with the buffer having some specific semantics, like a channel. We would need to look at the implementation to be sure.
Agreed. This is left as an exercise for the reader, wink wink.
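As one possible starting point for that exercise, here is a per-subscriber buffer built on std channels. It is a design sketch of ours, not eyeball-im's code; `ToyBroadcaster` and `broadcast` are hypothetical names. It reproduces both observations: a late subscriber misses earlier updates, while every update made after subscribing is kept. The buffer here is unbounded, so it says nothing about capacity.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical broadcaster: one channel per subscriber, created at `subscribe` time.
struct ToyBroadcaster<T> {
    senders: Vec<Sender<T>>,
}

impl<T: Clone> ToyBroadcaster<T> {
    fn new() -> Self {
        Self { senders: Vec::new() }
    }

    /// The new subscriber's buffer starts empty: past updates are not replayed.
    fn subscribe(&mut self) -> Receiver<T> {
        let (sender, receiver) = channel();
        self.senders.push(sender);
        receiver
    }

    /// Every current subscriber gets its own clone of the update.
    fn broadcast(&mut self, update: T) {
        // Forget subscribers whose receiver has been dropped.
        self.senders.retain(|sender| sender.send(update.clone()).is_ok());
    }
}

fn main() {
    let mut broadcaster = ToyBroadcaster::new();
    broadcaster.broadcast('a'); // nobody is listening yet
    let subscriber = broadcaster.subscribe();
    broadcaster.broadcast('b');

    // Only 'b' was buffered for this subscriber.
    println!("{:?}", subscriber.try_iter().collect::<Vec<_>>()); // ['b']
}
```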
We have an answer to question 1. What about question 2? The size of the buffer.
// in `src/main.rs`
let mut observable = ObservableVector::new();
let mut subscriber = observable.subscribe().into_stream();
// Push ALL THE VALUES!
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
assert_next_eq!;
// no need to assert the others
Hmm, the buffer doesn't seem to be full with 16 values. Let's add a couple more:
// in `src/main.rs`
// [ … snip … ]
observable.push_back;
observable.push_back;
observable.push_back;
observable.push_back;
// ^ new!
observable.push_back;
// ^ new!
assert_next_eq!;
Oh! An error, great! Our `assert_next_eq!` has failed: `subscriber` does not receive a `VectorDiff::PushBack` but a `VectorDiff::Reset`.
Let's play with `ObservableVector::with_capacity` for a moment; maybe it's related to the buffer capacity? Let's change a single line:
let mut observable = with_capacity;
// ^^^^^^^^^^^^^^^^^ new!
We have learned that `ObservableVector::with_capacity` controls the size of the buffer. The name could suggest that it controls the capacity of the observed `Vector`, à la `Vec::with_capacity`, but the two must not be confused.
For a reason we don't know yet, when the buffer is full, we receive a `VectorDiff::Reset`. We need to learn more about this type.
Observable differences
The previous section explained how immutable data structures could save us by cheaply and efficiently cloning the data between the observable and its subscribers. However, we see that `eyeball-im`, despite using `imbl`, does not share an `imbl::Vector` but an `eyeball_im::VectorDiff`. Why such a design?
It looks like a drama. A betrayal. An act of treachery!
Well. Firstly, `eyeball-im` relies on some immutable properties of `Vector`.
And secondly, the reason why `VectorDiff` exists is simple. If a subscriber receives `Vector`s, how is the user able to see what has changed? The user (!) would be responsible for calculating the differences between 2 `Vector`s every time! Not only is this costly, but it is utterly error-prone.
Are you suggesting that `VectorSubscriber` (or `VectorSubscriberStream`) calculates the differences between the `Vector`s itself so that the user doesn't have to?
I still see many problems though. I believe the order of the `VectorDiff`s matters a lot for some use cases. For example, let's consider two consecutive `Vector`s: `['a', 'b', 'c']` and `['a', 'c', 'b']`. Has `'b'` been removed and pushed back, or has `'c'` been popped back and inserted? How can you decide between the two?
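The ambiguity is easy to demonstrate. The sketch below uses a hypothetical local `Diff` enum, modelled on the `VectorDiff` variant names used in this article but not eyeball-im's type, and a small `apply` function. Both histories turn `['a', 'b', 'c']` into `['a', 'c', 'b']`, so comparing the two snapshots alone cannot tell them apart.

```rust
/// Hypothetical local enum, modelled on a few `VectorDiff` variant names.
#[derive(Debug, Clone, PartialEq)]
enum Diff<T> {
    PushBack { value: T },
    PopBack,
    Insert { index: usize, value: T },
    Remove { index: usize },
}

/// Applies diffs, in order, to a plain `Vec`.
fn apply<T>(vector: &mut Vec<T>, diffs: Vec<Diff<T>>) {
    for diff in diffs {
        match diff {
            Diff::PushBack { value } => vector.push(value),
            Diff::PopBack => {
                vector.pop();
            }
            Diff::Insert { index, value } => vector.insert(index, value),
            Diff::Remove { index } => {
                vector.remove(index);
            }
        }
    }
}

fn main() {
    // History 1: 'b' is removed, then pushed back.
    let mut first = vec!['a', 'b', 'c'];
    apply(&mut first, vec![Diff::Remove { index: 1 }, Diff::PushBack { value: 'b' }]);

    // History 2: 'c' is popped back, then inserted at index 1.
    let mut second = vec!['a', 'b', 'c'];
    apply(&mut second, vec![Diff::PopBack, Diff::Insert { index: 1, value: 'c' }]);

    // Same final state, different histories.
    assert_eq!(first, vec!['a', 'c', 'b']);
    assert_eq!(second, first);
}
```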
We can't (it would be implementation-specific anyway), and we don't want to. The user is manipulating the `ObservableVector` in a specific way, and we should ideally not change that.
These `VectorDiff`s actually come from `ObservableVector` itself! Let's look at the implementation of `ObservableVector::push_back`:
Each method adding or removing values on the `ObservableVector` emits its own `VectorDiff` variant. No calculation; it's purely a mapping:
| `ObservableVector::…` | `VectorDiff::…` | Meaning |
|---|---|---|
| `append(values)` | `Append { values }` | Append many values |
| `clear()` | `Clear` | Clear out all the values |
| `insert(index, value)` | `Insert { index, value }` | Insert `value` at `index` |
| `pop_back()` | `PopBack` | Remove the value at the back |
| `pop_front()` | `PopFront` | Remove the value at the front |
| `push_back(value)` | `PushBack { value }` | Add `value` at the back |
| `push_front(value)` | `PushFront { value }` | Add `value` at the front |
| `remove(index)` | `Remove { index }` | Remove the value at `index` |
| `set(index, value)` | `Set { index, value }` | Replace the value at `index` with `value` |
| `truncate(length)` | `Truncate { length }` | Truncate to `length` values |

Mappings of `ObservableVector` methods to `VectorDiff` variants.
See, for each `VectorDiff` variant, there is an `ObservableVector` method triggering it.
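The "purely a mapping" idea can be sketched with a hypothetical `RecordingVector`, our own toy and not eyeball-im's code. Each mutating method updates the values and appends the matching diff to a log, with no diffing computation at all. Only four methods from the table are covered.

```rust
/// Hypothetical local diff enum covering a few rows of the mapping table.
#[derive(Debug, Clone, PartialEq)]
enum Diff<T> {
    PushBack { value: T },
    PushFront { value: T },
    Set { index: usize, value: T },
    Clear,
}

/// A vector wrapper whose mutating methods record the matching diff.
struct RecordingVector<T> {
    values: Vec<T>,
    log: Vec<Diff<T>>, // what subscribers would receive, in order
}

impl<T: Clone> RecordingVector<T> {
    fn new() -> Self {
        Self { values: Vec::new(), log: Vec::new() }
    }

    fn push_back(&mut self, value: T) {
        self.values.push(value.clone());
        self.log.push(Diff::PushBack { value });
    }

    fn push_front(&mut self, value: T) {
        self.values.insert(0, value.clone());
        self.log.push(Diff::PushFront { value });
    }

    fn set(&mut self, index: usize, value: T) {
        self.values[index] = value.clone();
        self.log.push(Diff::Set { index, value });
    }

    fn clear(&mut self) {
        self.values.clear();
        self.log.push(Diff::Clear);
    }
}

fn main() {
    let mut vector = RecordingVector::new();
    vector.push_back(1);
    vector.push_front(0);
    vector.set(1, 10);
    vector.clear();
    println!("{:?}", vector.log);
    // [PushBack { value: 1 }, PushFront { value: 0 }, Set { index: 1, value: 10 }, Clear]
}
```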
And what about `VectorDiff::Reset`? We were receiving it when the buffer was full, apparently. You are not mentioning it, and if I take a close look at `ObservableVector`'s documentation, I don't see any `reset` method. Is it only an internal thing?
You are correct. When the buffer is full, the subscriber will provide a `VectorDiff::Reset { values }` where `values` is the full list of values. The documentation says:
The subscriber lagged too far behind, and the next update that should have been received has already been discarded from the internal buffer.
If the subscriber didn't catch all the updates, the best thing it can do is to say: “Okay, I am late to the party, I've missed several things, so here is the current state!” This is not ideal, but the subscriber is responsible for not lagging, and this design avoids missing values. If a subscriber receives too many `VectorDiff::Reset`s, the user may consider increasing the capacity of the `ObservableVector`.
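Here is a hedged sketch of that fallback, with a hypothetical single-subscriber `ToyObservableVector`. eyeball-im's real buffering strategy may well differ; only the observable outcome is modelled. Updates that fit in the fixed-capacity buffer arrive one by one. Once an update cannot fit, the subscriber's next read is a `Reset` carrying the full current state.

```rust
use std::collections::VecDeque;

/// Hypothetical toy diff type: just enough variants for the example.
#[derive(Debug, Clone, PartialEq)]
enum Diff<T> {
    PushBack { value: T },
    Reset { values: Vec<T> },
}

/// A toy observable vector with ONE subscriber buffer of fixed capacity.
struct ToyObservableVector<T> {
    values: Vec<T>,
    buffer: VecDeque<Diff<T>>,
    capacity: usize,
    lagged: bool, // set when an update could not fit in the buffer
}

impl<T: Clone> ToyObservableVector<T> {
    fn with_capacity(capacity: usize) -> Self {
        Self { values: Vec::new(), buffer: VecDeque::new(), capacity, lagged: false }
    }

    fn push_back(&mut self, value: T) {
        self.values.push(value.clone());
        if self.buffer.len() == self.capacity {
            self.lagged = true; // an update is lost: the subscriber is too late
        } else {
            self.buffer.push_back(Diff::PushBack { value });
        }
    }

    /// What the subscriber reads next.
    fn next_diff(&mut self) -> Option<Diff<T>> {
        if self.lagged {
            // Too far behind: drop stale diffs and hand over the whole current state.
            self.lagged = false;
            self.buffer.clear();
            return Some(Diff::Reset { values: self.values.clone() });
        }
        self.buffer.pop_front()
    }
}

fn main() {
    let mut observable = ToyObservableVector::with_capacity(2);
    observable.push_back('a');
    observable.push_back('b');
    // Within capacity: diffs arrive one by one.
    assert_eq!(observable.next_diff(), Some(Diff::PushBack { value: 'a' }));
    assert_eq!(observable.next_diff(), Some(Diff::PushBack { value: 'b' }));

    // Three updates into a buffer of two: the subscriber lagged.
    observable.push_back('c');
    observable.push_back('d');
    observable.push_back('e');
    assert_eq!(
        observable.next_diff(),
        Some(Diff::Reset { values: vec!['a', 'b', 'c', 'd', 'e'] })
    );
    assert_eq!(observable.next_diff(), None);
}
```

For brevity, the toy stores the subscriber's buffer inside the observable. With several subscribers, each would need its own buffer and its own "lagged" flag.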
Filtering and sorting with higher-order `Stream`s
We are reaching the end of this episode. And you know what? We have set up all the pieces to talk about higher-order `Stream`s. Chant victory and dance at the same time!
At the beginning of this episode, we said that the Matrix Rust SDK is able to filter and sort an `ObservableVector` representing all the rooms. How? `VectorSubscriberStream` is a `Stream`. More specifically, it is a `Stream<Item = VectorDiff<T>>`. Now, some questions:
- What's the difference between an unfiltered `Vector` and a filtered `Vector`?
- What's the difference between an unsorted `Vector` and a sorted `Vector`?
- What's the difference between a filtered `Vector` and a sorted `Vector`?
- And so on.
All of them are strictly `Stream<Item = VectorDiff<T>>`! However, the `VectorDiff`s aren't the same. A simple example. Let's say we build a vector by inserting `1`, `2`, `3` and `4`. We subscribe to it, and we want to filter out all the even numbers. Instead of receiving:
- `VectorDiff::Insert { index: 0, value: 1 }`,
- `VectorDiff::Insert { index: 1, value: 2 }`,
- `VectorDiff::Insert { index: 2, value: 3 }`,
- `VectorDiff::Insert { index: 3, value: 4 }`.

… we want to receive:
- `VectorDiff::Insert { index: 0, value: 1 }`,
- `VectorDiff::Insert { index: 1, value: 3 }`: note the `index`, it is not 2 but 1!
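The real machinery is for the next episodes. As a rough, hypothetical preview, here is a filtering adapter built on our own local `Diff` enum; its naming is ours and it handles only `Insert`. It remembers which source items passed the filter. Its index in the filtered view is the number of visible items before the source index. Running it on the example above yields exactly the two expected diffs.

```rust
/// Hypothetical local subset of diff variants, for illustration only.
#[derive(Debug, Clone, PartialEq)]
enum Diff<T> {
    Insert { index: usize, value: T },
}

/// Sketch of a filtering adapter that translates source indices
/// into indices of the filtered view.
struct Filter<F> {
    predicate: F,
    kept: Vec<bool>, // kept[i] == true if source item i is visible
}

impl<F> Filter<F> {
    fn new(predicate: F) -> Self {
        Self { predicate, kept: Vec::new() }
    }

    fn handle<T>(&mut self, diff: Diff<T>) -> Option<Diff<T>>
    where
        F: Fn(&T) -> bool,
    {
        match diff {
            Diff::Insert { index, value } => {
                let keep = (self.predicate)(&value);
                // Position in the filtered view = visible items before `index`.
                let filtered_index = self.kept[..index].iter().filter(|&&k| k).count();
                self.kept.insert(index, keep);
                keep.then(|| Diff::Insert { index: filtered_index, value })
            }
        }
    }
}

fn main() {
    let mut filter = Filter::new(|value: &i32| value % 2 != 0); // keep odd numbers
    let source = (0..4).map(|i| Diff::Insert { index: i, value: i as i32 + 1 });
    let filtered: Vec<_> = source.filter_map(|diff| filter.handle(diff)).collect();
    println!("{filtered:?}");
    // [Insert { index: 0, value: 1 }, Insert { index: 1, value: 3 }]
}
```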
We will see how all that works in the next episodes, and how powerful this design is, especially when it comes to cross-platform UIs (user interfaces). We are going to learn so much about `Stream` and `Future`; it's going to be fun!
[1] Latin expression meaning “hope of salvation”.
[2] Latin expression meaning “blessed are the poor in spirit”.
[3] RRB Vector: A Practical General Purpose Immutable Sequence, by Stucki N., Rompf T., Ureche V. and Bagwell P. (2015, August), in Proceedings of the 20th ACM SIGPLAN International Conference on Functional Programming (pp. 342-354).
[4] Theory and Practice of Chunked Sequences, by Acar U. A., Charguéraud A. and Rainey M. (2014), in Algorithms - ESA 2014: 22nd Annual European Symposium, Wroclaw, Poland, September 8-10, 2014. Proceedings 21 (pp. 25-36), Springer Berlin Heidelberg.