Graphinder – Async IEnumerable with Rx.Net and IObservable


Observer pattern and Rx.Net

Recently I’ve mentioned that I’d like to cover some of the topics regarding Rx.Net also known as System.Reactive.
Although there is lots of things that can be mentioned on playing around with Rx, I’d like to cover one I especially needed for my project, that is async IEnumerable with Rx.Net. To understand how Rx.Net works with IObservable, at least basic understanding of Observer pattern is required. In a nutshell, whole concept wraps up in one class (let’s call it Observable), that can be observed and can notify on specific action (defined by itself) and several other classes (let’s call them Observers), that can observe notifications and react to them.

Provided we want to react on notifications, we need to let Observable know what action should be invoked.

Generally such actions are called respectively: onNext for next element in observable sequence (sequence of events/notifications/changes etc.) and onCompleted (on sequence completed).

Subscribing to Observable with provided delegates…

…and example of notification

Easy? Easy. But how about asynchronous operations?

Going async?

“Alright, alright. I could easily to that without Rx.Net in synchronous environment with passing over delegates.
Just show me the money already.”

Well, I would find it hard to defend Observer pattern being a new concept (and/or implementation of it), but how Rx.Net does it makes you focus on more important thing: what should notify and what should react on notification. That’s the very heart of Rx.Net – deal with all that and let me design/program my ideas instead.
Now let’s jump to the code!

Imagine we have a method like this:

I won’t pretend that it’s either beautiful, nor elegant solution. But it meets the criteria for mimicking solution finding in Graphinder and that’s the point. Let’s get the values we have there:

Result of such program would look the same, or at least close to this:

Synchronous IEnumerable

Easy to follow, plain, synchronous code, right? Right.
But there’s an usual problem of synchronous programming – we are bound to be locked by operations inside GetStuff() and can’t do much more inside our Main method. What should we do about it then? Let’s see what Rx.Net can offer us.


Async IEnumerable with Rx.Net with minimal modification

Let’s change our code as little as possible to achieve two things – reactive code and asynchronous execution.
That’s the field where Rx.Net shine. We get a tons of ready to use extension methods, classes and schedulers that fit any scenario you could think of when going reactive. If I would put it in a one sentence, I would say that:

Rx is for reactive programming like “Saving Private Ryan” to.. well… Pvt. Ryan. The very heart of it.

Now let’s look what modification is needed to go for async reactive IEnumerable:

The while(!stop) loop will imitate stuff that we could do in main thread.
What we need to do with GetStuff() is call ToObservable() on it to – well – make it observable (thank You cpt. Obvious!) and pass a default Scheduler to make it async.
Then, we Subscribe to any notification from GetStuff(), that is: any incoming yield.
We would like to possibly do something when GetStuff() completes, so let’s break while loop with stop flag change.

The output of new program would look like this now:

Synchronous IEnumerable

Alright, alright. But where’s the modification to GetStuff() you made?
And that’s the beauty of Rx.Net – THERE IS NONE.

Thing to think of

If you look closely, you’ll spot that delegate passed to Observable was invoked on Observable thread.
On a second thought it’s an obvious thing, but should be kept in mind wherever lock is needed on non-atomic operations.

As for the next time, I’ll elaborate a little how I used Rx.Net so far in Graphinder. Stay tuned!

2 thoughts on “Graphinder – Async IEnumerable with Rx.Net and IObservable”

  1. Nice example. For me, when I was getting familiar with Rx, understanding that IObservable/IObserver are kind of mirror images of IEnumerable/IEnumerator was crutial to grasp Rx concept.

Leave a Reply

Your email address will not be published. Required fields are marked *