The main purpose of Reactive Extensions (Rx) is to enable processing event streams. I set out to use Reactive Extensions for receiving data on the serial port and learned some new things.
The event does not actually contain any information on the data received, it only indicates that there is data available. Reading the data is done inside the lambda expression. Reading serial data will return a list of bytes. This list may contain a complete message or just a part of a message or even multiple messages. To handle this, I want the observable to be an
IObservable<byte>, i.e., it will produce a raw stream of bytes without any indication of where a message begins or ends. This is done through the extension method
public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector) that is used to flatten the sequence returned by the lambda.
So I now have a stream of bytes. I want these bytes to be chunked into messages. For my particular protocol, messages are separated by a special byte. Separation can be done in two ways:
Here, a new observable is created using
Observable.Create(). This observable subscribes to the byte stream, collects the data in a local collection and fires
OnNext() whenever a message delimiter is encountered.
This version uses the
Scan() operator to achieve the same thing. The output is an
IObservable<IEnumerable<byte>> that fires an
IEnumerable<byte> for every new message.
This code worked well up until the point I started attaching multiple observers to the message stream, one to process the messages and one to just dump received messages to a debug console. What happened then was that the code in the first code sample was called multiple times: once for each subscriber. This meant that each chunk of serial data was only received by one subscriber, not all subscribers. There are two possible solutions to this: Either introduce a
Subject<IEnumerable<byte>> subscribing to
serialPortSource and have consumers subscribe to the subject or use the
Publish() operator that does the work for you.
Creating a new observable that produces deserialized messages from the observable producing lists of bytes is now trivial using a simple
What remains is the question of how to use the received data in a typical workflow of sending out a message and receiving a response in return. Here is an example:
This example uses the
Replay() operator. Replay will capture all events from the observable that are fired after the call to
Connect(). After calling
Connect() the call is sent to the device at the other end of the serial connection. The second
await filters the incoming messages for the desired message (even using a filter criterion that was not known before the request was sent), adds a timeout, uses
FirstAsync() to return an observable that only returns the first element followed by
OnCompleted(), and waits for that
Replay() is capturing all messages, the following
await call on the observable should consider all answers from the target, whether they are received before or after the second call to
At Developer Week 2013 I gave a talk (in German) on Reactive Extensions. The slides are online on Slideshare. During the talk, I live-coded a part of a console-based jump and run game that uses Kinect as input. The first task was to detect when the player (the volunteer on stage) jumped. Here’s the code from the demo:
This code contains a lot of simplifications for presentation purposes but it can still be used to explain some Reactive Extension concepts. I’ll walk you through it and also spell out the
This retrieves the single Kinect that is attached to the PC (or throws an exception if there is not exactly one Kinect connected).
This creates an
IObservable from the classic .NET event
SkeletonFrameReady by telling Rx how to subscribe and unsubscribe from the event.
Interestingly, the class
SkeletonFrameReadyEventArgs doesn’t contain any properties at all, just one method
public SkeletonFrame OpenSkeletonFrame(); that can be used to access the skeleton data. The
SkeletonFrame instance must then be disposed within 1/30 second.
We now have an
IObservable that publishes a list of skeletons whenever there are people in front of the Kinect sensor.
This code extracts the joints from the single skeleton if it has a tracked state.
This extracts the average vertical position of the left and right foot. This is a simplification as it would be possible to trick the algorithm by just lifting one foot up twice as high as both feet would need to jump up. To fix this, you could
Select() both feet in this statement.
This is the actual jump detection code. The idea is that to have jumped, the player would have to have his feet low, then high and then low again in a short timespan. To analyze this, we’re looking at samples from a time frame of one second. After this, we’ll move forward by 200 milliseconds and analyze again. This magic is provided by the
Buffer() extension method. We’ll look for the maximum height of the feet within the time frame and see if the first and last samples are both lower than the maximum minus a hard-coded magic number (for simplification again). If the algorithm matches, the
IObservable outputs the string “jumped”, else the string “didn’t jump”.
If you add
jumped.Subscribe(Console.WriteLine); at this point you will see a stream of “didn’t jump” interrupted by a few instances of “jumped” whenever the player jumped.
The call to
DistinctUntilChanged() will only output the string if it differs from the previously output string. The next line simply filters to only output “jumped” whenever the player jumps.
This line will output the resulting
IObservable to the console.
This starts the Kinect sensor.
The presentation was a lot of fun. Thanks again to the volunteer! I’ll be back at Developer Week again this year talking about cross-platform mobile using C#.