Categories
Programming

Serial communication using Reactive Extensions

The main purpose of Reactive Extensions (Rx) is to enable processing event streams. I set out to use Reactive Extensions for receiving data on the serial port and learned some new things.

Here is the code that uses Observable.FromEventPattern<T>() to create an IObservable<T> from the .NET event SerialPort.DataReceived:

The event does not actually contain any information on the data received; it only indicates that data is available. Reading the data is done inside the lambda expression. Reading serial data returns a list of bytes. This list may contain a complete message, just a part of a message, or even multiple messages. To handle this, I want the observable to be an IObservable<byte>, i.e., it will produce a raw stream of bytes without any indication of where a message begins or ends. This is done through the extension method public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector), which flattens the sequence returned by the lambda.
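A minimal sketch of this step, assuming the System.Reactive and System.IO.Ports packages are referenced. The class name SerialPortRx and the method name ToByteStream are made up for illustration and are not taken from the original listing.

```csharp
using System;
using System.Collections.Generic;
using System.IO.Ports;
using System.Linq;
using System.Reactive.Linq;

public static class SerialPortRx
{
    // Hypothetical helper: turns DataReceived notifications into a flat byte stream.
    public static IObservable<byte> ToByteStream(this SerialPort port)
    {
        return Observable
            .FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
                handler => port.DataReceived += handler,
                handler => port.DataReceived -= handler)
            .SelectMany(_ =>
            {
                // The event carries no payload, so read whatever is buffered right now.
                var buffer = new byte[port.BytesToRead];
                int read = port.Read(buffer, 0, buffer.Length);
                IEnumerable<byte> chunk = buffer.Take(read);
                return chunk; // SelectMany flattens each chunk into single bytes
            });
    }
}
```

Nothing is read until somebody subscribes, and the port still has to be opened by the caller.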

So I now have a stream of bytes. I want these bytes to be chunked into messages. For my particular protocol, messages are separated by a special byte. Separation can be done in two ways:

Here, a new observable is created using Observable.Create(). This observable subscribes to the byte stream, collects the data in a local collection and fires OnNext() whenever a message delimiter is encountered.
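A sketch of what the Observable.Create() variant might look like, assuming System.Reactive is referenced. The names MessageFraming and SplitOnDelimiter are illustrative, and the delimiter is passed in because the actual protocol byte is not given here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class MessageFraming
{
    // Hypothetical helper: emits one message per delimiter byte seen in the stream.
    public static IObservable<IEnumerable<byte>> SplitOnDelimiter(
        this IObservable<byte> source, byte delimiter)
    {
        return Observable.Create<IEnumerable<byte>>(observer =>
        {
            // One buffer per subscription, so subscribers do not share state.
            var current = new List<byte>();
            return source.Subscribe(
                b =>
                {
                    if (b == delimiter)
                    {
                        observer.OnNext(current.ToArray()); // hand out a copy
                        current.Clear();
                    }
                    else
                    {
                        current.Add(b);
                    }
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }
}
```

Bytes that arrive after the last delimiter stay in the buffer and are dropped when the source completes; whether that is acceptable depends on the protocol.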

This version uses the Scan() operator to achieve the same thing. The output is an IObservable<IEnumerable<byte>> that fires an IEnumerable<byte> for every new message.
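One way the Scan() variant could be written, again assuming System.Reactive. The FrameState class and the method name SplitWithScan are invented for this sketch. The state is kept immutable because the seed object passed to Scan() is shared by every subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class ScanFraming
{
    // Immutable accumulator: bytes collected so far plus the message just finished.
    private sealed class FrameState
    {
        public FrameState(byte[] pending, byte[] completed)
        {
            Pending = pending;
            Completed = completed;
        }

        public byte[] Pending { get; }
        public byte[] Completed { get; } // null unless the last byte was the delimiter
    }

    public static IObservable<IEnumerable<byte>> SplitWithScan(
        this IObservable<byte> source, byte delimiter)
    {
        return source
            .Scan(new FrameState(new byte[0], null), (state, b) =>
                b == delimiter
                    ? new FrameState(new byte[0], state.Pending)
                    : new FrameState(state.Pending.Concat(new[] { b }).ToArray(), null))
            .Where(state => state.Completed != null)
            .Select(state => (IEnumerable<byte>)state.Completed);
    }
}
```

Copying the pending array on every byte is wasteful for long messages; it keeps the sketch short and free of shared mutable state.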

This code worked well up until the point I started attaching multiple observers to the message stream, one to process the messages and one to just dump received messages to a debug console. What happened then was that the code in the first code sample was called multiple times: once for each subscriber. This meant that each chunk of serial data was only received by one subscriber, not by all of them. There are two possible solutions to this: either introduce a Subject<IEnumerable<byte>> subscribing to serialPortSource and have consumers subscribe to the subject, or use the Publish() operator, which does that work for you.
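The effect can be reproduced without a serial port. The following sketch, assuming System.Reactive, counts how often a cold source is subscribed to with and without Publish(). The class and method names are illustrative only.

```csharp
using System;
using System.Reactive.Linq;

public static class SharedStreamDemo
{
    // Returns how many times the underlying source was subscribed to
    // when two observers are attached.
    public static int CountSourceSubscriptions(bool share)
    {
        int subscriptions = 0;

        // Defer runs its factory once per subscription, like the serial port code.
        IObservable<int> cold = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        if (share)
        {
            var published = cold.Publish();   // connectable: nothing happens yet
            published.Subscribe(_ => { });
            published.Subscribe(_ => { });
            published.Connect();              // one shared subscription to the source
        }
        else
        {
            cold.Subscribe(_ => { });
            cold.Subscribe(_ => { });
        }

        return subscriptions;
    }
}
```

With Publish(), observers should be attached before Connect() is called, otherwise they may miss early items.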

Creating a new observable that produces deserialized messages from the observable producing lists of bytes is now trivial using a simple Select().
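As an illustration, assuming System.Reactive and a made-up message layout (first byte is a command, the rest is payload), the projection could look like this. The Message type and its Parse method are hypothetical and stand in for whatever the real protocol requires.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical message type; the real protocol is not described in this post.
public sealed class Message
{
    public Message(byte command, byte[] payload)
    {
        Command = command;
        Payload = payload;
    }

    public byte Command { get; }
    public byte[] Payload { get; }

    public static Message Parse(IEnumerable<byte> raw)
    {
        byte[] bytes = raw.ToArray();
        if (bytes.Length == 0) throw new FormatException("Empty message.");
        return new Message(bytes[0], bytes.Skip(1).ToArray());
    }
}

public static class MessageStream
{
    // Maps each framed byte sequence to a deserialized message.
    public static IObservable<Message> ToMessages(this IObservable<IEnumerable<byte>> frames)
    {
        return frames.Select(raw => Message.Parse(raw));
    }
}
```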

What remains is the question of how to use the received data in a typical workflow of sending out a message and receiving a response in return. Here is an example:

This example uses the Replay() operator. Replay will capture all events from the observable that are fired after the call to Connect(). After calling Connect(), the request is sent to the device at the other end of the serial connection. The second await filters the incoming messages for the desired message (even using a filter criterion that was not known before the request was sent), adds a timeout, uses FirstAsync() to return an observable that only returns the first element followed by OnCompleted(), and waits for that OnCompleted() using await. Since Replay() captures all messages, the second await should consider every answer from the target, whether it is received before or after that await begins.
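A generic sketch of this request/response workflow, assuming System.Reactive. The helper name RequestAsync and its parameters (a send delegate, a response filter and a timeout) are assumptions made for the sketch.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class RequestResponse
{
    // Hypothetical helper: sends a request and waits for the first matching answer.
    public static async Task<TMessage> RequestAsync<TMessage>(
        IObservable<TMessage> incoming,
        Func<Task> sendRequest,
        Func<TMessage, bool> isResponse,
        TimeSpan timeout)
    {
        var replayed = incoming.Replay();   // records everything seen after Connect()
        using (replayed.Connect())
        {
            await sendRequest();            // first await: write the request

            // Second await: answers that already arrived are replayed to this subscription.
            return await replayed
                .Where(isResponse)
                .Timeout(timeout)
                .FirstAsync();
        }
    }
}
```

If no matching message arrives in time, Timeout() surfaces a TimeoutException at the second await.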


async/await part 2: Understanding the await keyword

In part 1 of this series we looked at the async keyword. async without await is possible (though not very useful), but await without async is not. If you want to use await inside a method, it must be marked as async.

The await keyword

Here is an example of a method using await:

Apart from the keywords async and await and the funny return value this looks just as you would write a synchronous method: Download a web page, then apply a regular expression to its contents to see if it matches.
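A method matching this description could look like the following sketch. The parameter names and the use of HttpClient are assumptions; only the method name DoesWebContentMatchPatternAsync() and the call to GetStringAsync() are mentioned in the text.

```csharp
using System.Net.Http;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

public static class WebMatcher
{
    public static async Task<bool> DoesWebContentMatchPatternAsync(string url, string pattern)
    {
        using (var client = new HttpClient())
        {
            // Control returns to the caller here until the download has finished.
            string content = await client.GetStringAsync(url);

            // Runs only after the awaited download has completed.
            return Regex.IsMatch(content, pattern);
        }
    }
}
```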

And that is just the point. Although this method is asynchronous, it looks and feels like a synchronous method. But unlike its counterpart, the async method won’t block the caller until it is finished, won’t stay stuck in your mouse click event handler and, most importantly, won’t freeze the rest of your application.

How does it work?

What happens when this method is called is that it executes just like any other method up to the point where the first await is encountered. At that point, the method returns to the caller with a Task (let’s call it method task) that contains information about the state of its work. As soon as the Task returned by GetStringAsync() completes, the remainder of the method is executed. Note that all local variables are preserved and are available just as in a standard non-async method. When the method has finished the remainder of its work (applying the regular expression), it will magically set the method task’s Result property to the value passed to the return statement and mark the Task as completed.
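The order of events can be made visible with a small log. This is a self-contained sketch that uses Task.Delay() in place of a download; all names are illustrative.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitOrderDemo
{
    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        // WorkAsync runs synchronously up to its first await, then hands back its task.
        Task<int> methodTask = WorkAsync(log);
        log.Add("caller: got the task, completed = " + methodTask.IsCompleted);

        int result = await methodTask;
        log.Add("caller: result = " + result);
        return log;
    }

    private static async Task<int> WorkAsync(List<string> log)
    {
        log.Add("work: before await");
        await Task.Delay(50);            // the method returns to its caller here
        log.Add("work: after await");    // resumes once the delay has completed
        return 42;                       // becomes the Result of the method task
    }
}
```

The log shows the work starting, the caller receiving a not yet completed task, the work resuming, and finally the caller seeing the result.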

Typically, async methods will be called and awaited by other async methods, so the caller of our DoesWebContentMatchPatternAsync() method will not have to do anything special to receive this result. All it takes is writing await in front of the method call.

This will generate a compiler error if the calling method is not itself marked async.

How do I introduce await?

In practice, you’ll see this happen a lot when asyncifying existing code. The procedure is always the same:

  1. Add an await inside a method.
  2. Realize it doesn’t compile anymore because it is not async.
  3. Add async to the method’s declaration.
    1. If the return type was void, change it to Task.
    2. If the return type was of any type T, change it to Task<T>.
  4. Add the Async suffix to your method name using your favorite refactoring tool.
  5. Repeat from step 1 for the calling method.

This typically leads to async/await creeping through your entire codebase after some time but will leave a good feeling of accomplishment afterwards.
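The procedure above can be sketched as a before/after pair. The line-counting methods are made up for illustration; TextReader.ReadToEndAsync() is the asynchronous counterpart of ReadToEnd() in .NET 4.5.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class LineCounter
{
    // Before: synchronous, returns int.
    public static int CountLines(TextReader reader)
    {
        string text = reader.ReadToEnd();
        return text.Split('\n').Length;
    }

    // After: await added, async added, int became Task<int>, Async suffix added.
    public static async Task<int> CountLinesAsync(TextReader reader)
    {
        string text = await reader.ReadToEndAsync();
        return text.Split('\n').Length;
    }

    // The caller goes through the same steps (string became Task<string>).
    public static async Task<string> DescribeAsync(TextReader reader)
    {
        int lines = await CountLinesAsync(reader);
        return lines + " lines";
    }
}
```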

But what about exceptions?

But the magic of await doesn’t stop there. Another tedious task of asynchronous programming is catching and handling errors that occur in code running in the background. async/await handles this problem very elegantly by propagating exceptions through the Task’s IsFaulted and Exception properties. What this means is that you can write exception handling code just as you would with synchronous code.

If you don’t catch the exception, it will propagate to the calling method, just like with synchronous code. As long as your code is async/await all the way, you won’t have to do anything differently.
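A small self-contained sketch of this: the exception is thrown after an await inside one method and caught with an ordinary try/catch in its caller. The method names are illustrative.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncExceptions
{
    public static async Task<string> TryParseAsync(string text)
    {
        try
        {
            int value = await ParseLaterAsync(text);
            return "parsed " + value;
        }
        catch (FormatException ex)
        {
            // await rethrows the exception stored in the faulted task.
            return "failed: " + ex.GetType().Name;
        }
    }

    private static async Task<int> ParseLaterAsync(string text)
    {
        await Task.Delay(10);
        return int.Parse(text); // throws FormatException for non-numeric input
    }
}
```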

Background work and the UI thread

The last piece of magic comes in the form of context synchronization. Another common pitfall in asynchronous programming comes from the fact that most UI technologies rely on a single UI thread to handle all modifications of the UI. Modifying a UI element from a different thread will typically lead to an exception being thrown. Let’s look at another piece of code using our method from above:

The code shows an event handler for a button click in a WPF code-behind (so not the place you would typically implement this functionality). The line assigning a new value to resultBox.Text has to be called from the UI thread in WPF. However, there is no code marshaling the call back to the UI context. This is because the default behavior of async/await is to resume the code following an await in the original context for you.

This behavior is typically only needed for UI code and should be disabled for all other code as it adds overhead and may cause deadlocks. Disabling is achieved by calling ConfigureAwait(false) on the Task to be awaited:

This tells the awaiter not to capture the original context, so the remainder of the method (i.e., everything following the await) may run on whatever thread completes the awaited call, typically a thread-pool thread.
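In library-style code this looks as follows. The method is a made-up example; ConfigureAwait(false) is applied to the task returned by ReadToEndAsync().

```csharp
using System.IO;
using System.Threading.Tasks;

public static class LibraryCode
{
    public static async Task<int> ReadLengthAsync(TextReader reader)
    {
        // No UI work follows, so there is no need to resume on the captured context.
        string text = await reader.ReadToEndAsync().ConfigureAwait(false);
        return text.Length;
    }
}
```

A UI event handler that awaits ReadLengthAsync() without ConfigureAwait(false) still resumes on the UI thread; the setting only affects the await it is applied to.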

Give it a shot!

If you’re already on .NET 4.5 I suggest you give async/await a try. It will change the way you look at asynchronous programming.


async/await part 1: Understanding the async keyword

By far my favorite feature of .NET 4.5 is async/await, or, as Microsoft call it, the Task-based Asynchronous Pattern (TAP). It is something I didn’t know I was missing until I saw a recording of Anders Hejlsberg’s Build talk on the subject. Shortly after this, I had a highly asynchronous C++ embedded project that lasted for over a year where I felt miserable building state machine after state machine to handle the inherent problem of all asynchronous programs: What do you do after your asynchronous operation completes?

This blog series is aimed at C# programmers new to async/await. A general understanding of the Task class introduced with .NET 4.0 is expected. In this first part, I’ll explain the async in async/await.

What is async?

The async keyword can be used to decorate a method or lambda expression.

It is important to note that async is not a part of the method signature, so if you are implementing an interface or overriding a virtual or abstract method you have a choice of whether to use async or not.
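To illustrate, here is a sketch with a hypothetical interface IGreeter that is implemented once with async and once without, followed by an async lambda. All names are invented for the example.

```csharp
using System;
using System.Threading.Tasks;

public interface IGreeter
{
    // No async here: the keyword is not part of the signature.
    Task<string> GreetAsync(string name);
}

public sealed class AsyncGreeter : IGreeter
{
    public async Task<string> GreetAsync(string name)
    {
        await Task.Delay(10);
        return "Hello, " + name;
    }
}

public sealed class PlainGreeter : IGreeter
{
    // Same signature, implemented without async.
    public Task<string> GreetAsync(string name)
    {
        return Task.FromResult("Hello, " + name);
    }
}

public static class LambdaExample
{
    // async on a lambda expression.
    public static readonly Func<int, Task<int>> DoubleLater = async x =>
    {
        await Task.Delay(10);
        return x * 2;
    };
}
```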

Return types

In C# 5 (the version that shipped with .NET 4.5), an async method may only return void, Task or Task<T> for any concrete type T.

void as a return type should be avoided where possible and is generally only needed in event handlers, making the method a fire-and-forget method where the caller has no way of knowing when the call finished or failed.

If you write a method returning Task or Task<T> you should generally follow the convention of using the suffix Async in your method name to indicate that the method can be awaited (regardless of whether your method’s implementation uses async or not).

Task should be returned for methods that would, if they were synchronous methods, return void. Task<T> should be returned for methods that would otherwise return some type T (i.e., anything not void). Think of the Task as the object the caller can use to keep track of whatever happened to that asynchronous method he triggered.

What effect does async have?

Writing async will do two things to your method or lambda:

  1. It will allow you to write await in your method (see my next blog post in this series).
  2. If your return type is not void the compiler will automagically convert your return statement (or the missing return statement at the end of your method) into a Task<T> or Task.

For a method that does not contain any await calls, this means a completed Task will be returned without the need to explicitly specify this. For the above example this means it will behave exactly like this non-async version of it:
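A stand-in pair to illustrate the point (the addition methods are invented for this sketch): an async method without any await next to a non-async method that wraps its result with Task.FromResult().

```csharp
using System.Threading.Tasks;

public static class AddExample
{
#pragma warning disable CS1998 // async method lacks await; intentional for this sketch
    public static async Task<int> AddAsync(int a, int b)
    {
        return a + b; // the compiler wraps this in a completed Task<int>
    }
#pragma warning restore CS1998

    // Non-async version: the completed task is created explicitly.
    public static Task<int> AddWithoutAsync(int a, int b)
    {
        return Task.FromResult(a + b);
    }
}
```

Both methods hand back a task that is already completed when the call returns.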

For a method that does contain an await in the executed path, the method will return a Task object that will turn to the IsCompleted state when the last awaited call has completed and the synchronous code following it (if any) has finished executing. (For more on this, read my next blog post in this series about the await keyword.)

Do I need async?

Methods containing only one await statement as the very last instruction in the method may generally be written without the async keyword. For example, the method

is equivalent to

Even though this yields the same result, the method declared as async seems to be the more readable version but has a slight runtime penalty. The other difference here is that if stream.FlushAsync() throws an exception and await is not used, FlushTheStreamAsync() will not appear in the exception’s call stack (more on exceptions in the next blog post).
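The two variants being compared could look like this sketch. The Stream parameter is an assumption, and the second method carries a different name here only so that both fit into one class.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class StreamHelpers
{
    // With async: the compiler generates a state machine around the single await.
    public static async Task FlushTheStreamAsync(Stream stream)
    {
        await stream.FlushAsync();
    }

    // Without async: the task from FlushAsync() is passed straight through.
    public static Task FlushTheStreamWithoutAsync(Stream stream)
    {
        return stream.FlushAsync();
    }
}
```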

How does that help me?

As mentioned earlier, the returned Task object can be used to examine the state of the asynchronous call (still running? completed? failed? cancelled?). While this is possible using the various methods of the Task class, the easiest way to handle this is through the await keyword handled in the next blog post.
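For comparison, examining a task by hand might look like the following sketch, which uses the IsCompleted, IsCanceled, IsFaulted and Exception properties of Task. The helper name Describe is made up.

```csharp
using System.Threading.Tasks;

public static class TaskInspector
{
    public static string Describe(Task task)
    {
        if (!task.IsCompleted) return "still running";
        if (task.IsCanceled) return "cancelled";
        if (task.IsFaulted)
        {
            // Task.Exception is an AggregateException wrapping the original error.
            return "failed: " + task.Exception.InnerException.Message;
        }
        return "completed";
    }
}
```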