Kategorien
Programmierung

Serielle Kommunikation mit Reactive Extensions

Der Haupteinsatzzweck von Reactive Extensions (Rx) ist die Verarbeitung von Event-Streams. Ich habe Reactive Extensions eingesetzt, um Daten auf der seriellen Schnittstelle zu empfangen und habe dabei einige neue Dinge gelernt.

Hier ist der Code, der Observable.FromEventPattern<T>() verwendet, um aus dem .NET-Event SerialPort.DataReceivedEvent ein IObservable<T> zu erzeugen:

Das Event enthält leider keinerlei Informationen zu den empfangenen Daten, es signalisiert nur, dass neue Daten vorliegen. Das Auslesen der Daten erfolgt im Lambdaausdruck. Das Auslesen der seriellen Schnittstelle liefert eine Menge an Bytes. Diese Menge könnte genau eine Nachricht enthalten oder nur einen Teil einer Nachricht oder mehrere Nachrichten. Aus diesem Grund sollte das Observable ein IObservable<byte> sein, d.h. einen rohen Strom an Bytes erzeugen ohne einen Hinweis darauf, wo eine Nachricht anfängt oder aufhört. Dies wird erreicht über die Extension Method public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector), die verwendet wird, um die vom Lambdaausdruck zurückgegebenen Bytes in ein IObservable<byte> zu transferieren.

Zu diesem Zeitpunkt habe ich einen Strom an Bytes. Diese Bytes sollten idealerweise in Nachrichten gruppiert werden. Bei dem von mir eingesetzten Protokoll werden Nachrichten durch ein besonderes Byte von einander getrennt. Für die Aufteilung gibt es zwei mögliche Ansätze:

In diesem Beispiel wird ein neues Observable mit Hilfe von Observable.Create() erzeugt. Dieses Observable abonniert sich auf den Bytestrom, sammelt die Daten in einer lokalen Liste und feuert OnNext() sobald ein Nachrichtentrenner entdeckt wurde.

Diese Version verwendet den Scan()-Operator, um das gleiche zu erreichen. Die Ausgabe ist ein IObservable<IEnumerable<byte>>, der für jede neue Nachricht einIEnumerable<byte> feuert.

Dieser Code funktionierte sehr gut bis zu dem Zeitpunkt, als ich mehrere Observer an den Stream hängte: Einen zur Verarbeitung der Nachrichten und einen zur Ausgabe der Nachrichten auf der Debugkonsole. Hierdurch wurden die seriellen Daten immer nur von einem Subscriber gelesen, nicht von allen. Hierfür gibt es zwei mögliche Lösungen: Man kann ein Subject<IEnumerable<byte>> einführen, das sich auf serialPortSource abonniert und auf das sich alle Konsumenten abonnieren. Alternativ kann man den Publish()-Operator verwenden, der diese Arbeit übernimmt.

Aus dem neuen Observable, das Listen an Bytes produziert, lässt sich leicht mit Hilfe des Select()-Operators ein Observable erzeugen, das deserialisierte Nachrichten produziert.

Jetzt bleibt noch die Frage, wie man die empfangenen Daten in einem typischen Workflow verwendet: Das Senden einer Nachrichten und Empfangen der Antwort. Hier ist ein Beispiel:

Dieses Beispiel verwendet den Replay()-Operator. Replay sammelt alle Events, die das Observable nach Aufruf von Connect() feuert. Nach dem Aufruf von Connect() wird eine serielle Nachricht an das Gerät am anderen Ende der seriellen Leitung gesendet. Der zweite Aufruf von await filtert die eingehenden Nachrichten nach der erwünschten Nachrichten (und verwendet dabei ein Kriterium, das vor dem Aufruf noch gar nicht bekannt war), fügt ein Timeout hinzu, verwendet FirstAsync(), um ein Observable zu erzeugen, das nur das erste Element gefolgt von OnCompleted() zurückgibt und wartet dann auf das OnCompleted() mit Hilfe von await. Da Replay() alle Nachrichten aufzeichnet, werden beim folgenden await-Aufruf alle Nachrichten vom Gegenüber berücksichtigt, egal ob diese vor oder nach dem zweiten await-Aufruf eintrifft.

Kategorien
Programmierung

async/await Teil 2: Das await-Schlüsselwort verstehen

In Teil 1 dieser Serie haben wir uns mit dem async-Schlüsselwort auseinandergesetzt. async funktioniert auch ohne await (auch wenn das nicht wirklich sinnvoll ist), aber await funktioniert ohne async überhaupt nicht. Um await innerhalb einer Methode zu verwenden, muss sie als async gekennzeichnet sein.

Das await-Schlüsselwort

Hier ist ein Beispiel einer Methode, die await verwendet:

Bis auf die Schlüsselwörter async und await und den seltsamen Rückgabewert sieht das genauso aus, wie eine synchrone Methode aussehen würde: Eine Webseite herunterladen und im Anschluss überprüfen, ob der angegebene reguläre Ausdruck auf den Inhalt der Webseite anzuwenden ist.

Und genau darum geht ist. Obwohl die Methode asynchron ist, sieht sie aus wie eine synchrone Methode. Nur dass sie im Gegensatz zur synchronen Version nicht den Aufrufer blockiert, bis sie fertig ist, nicht im Mouse-Click-Handler hängenbleibt und am wichtigsten: nicht den Rest der Anwendung einfriert.

Wie funktioniert das?

Beim Aufruf der Methode wird die Methode genauso ausgeführt wie jede andere Methode auch, bis der erste await-Aufruf erreicht wird. An dieser Stelle gibt die Methode dem Aufrufer einen Task zurück (nennen wir ihn den Methoden-Task), der Informationen über den weiteren Verlauf enthält. Sobald der von GetStringAsync() zurückgegebene Task abgeschlossen ist, wird der Rest der Methode ausgeführt. Hierbei sind alle lokalen Variablen wie gewohnt in ihrem vorherigen Zustand und verfügbar, genau wie in nicht-async-Methoden. Wenn die Methode den Rest ihrer Arbei erledigt hat (das Anwenden des regulären Ausdrucks), setzt sie auf magische Weise das Result des Methoden-Tasks auf den Wert, der im return-Ausdruck steht und markiert den Task als IsCompleted.

Üblicherwerise werden async-Methoden von anderen async-Methoden via await aufgerufen. Dadurch muss der Aufrufer der DoesWebContentMatchPatternAsync()-Methode nichts ungewöhnliches machen, um an den Rückgabewert zu gelangen. Es reicht, await vor den Aufruf zu schreiben.

Dies hat einen Kompilierungsfehler zur Folge, wenn die aufrufende Methode nicht ebenfalls async ist.

Wie führe ich await ein?

In der Praxis sieht passiert dies öfter beim Einführen von async/await. Das Schema ist immer das gleiche:

  1. Ein erstes await in die Methode schreiben.
  2. Feststellen, dass die Methode nicht mehr kompiliert, weil sie nicht async ist
  3. Die Methode selbst async machen.
    1. Wenn der Rückgabewert void: Rückgabewert in Task ändern.
    2. Wenn der Rückgabewert irgendein Typ T war: Rückgabewert in Task<T> ändern.
  4. Mit dem Lieblings-Refactoring-Tool das Suffix Async zum Methodennamen hinzufügen.
  5. Ab Schritt 1 für den Aufrufer wiederholen.

Hierdurch breitet sich typischerweise async/await schnell im gesamten Projekt aus, was jedoch positiv zu sehen ist.

Aber was passiert mit Exceptions?

Die Magie von await beinhaltet noch mehr. Ein weiterer aufwändiger Teil asynchroner Programmierung ist das Fangen und behandeln von Fehlern, die im Code auftreten, der im Hintergrund läuft. async/await löst dieses Problem auf elegante Weise durch weiterreichen von Exceptions über die Eigenschaften IsFaulted and Exception in der Task-Klasse. Das bedeutet, dass man Exceptions genauso behandeln kann wie in synchronem Code.

Wird eine Exception nicht gefangen, wird sie, wie bei synchronem Code, an die aufrufende Methode weitergereicht. Solange der Code durchgängig async/await verwendet, muss nicht weiter geändert werden.

Das leidige Thema mit dem UI-Thread

Das letzte Stück Magie nennt sich Kontextsynchronisation. Ein beliebter Stolperstein asynchroner Entwicklung rührt aus der Tatsache, dass die meisten UI-Technologien einen einzelnen UI-Thread besitzen, der alle Änderungen am UI durchführen muss. Ändert man ein UI-Element aus einem anderen Thread, führt dies typischerweise zu einer Exception. Hierzu ein weiteres Beispiel, das den obigen Code verwendet:

Dieser Code zeigt einen Event-Handler eines Button-Click-Events im Code-Behind unter WPF (also dort, wo man solche Funktionalität typischerweise nicht umsetzen sollte). Die Zeile, die resultBox.Text einen neuen Wert zuweist, ist eine Zeile, die unter WPF aus dem UI-Thread aufgerufen werden muss. Trotzdem steht hier kein Code, der diesen Aufruf an den UI-Kontext delegiert. Das liegt daran, dass das Standardverhalten von async/await so ist, dass es versucht, den Code hinter dem await wieder im den Ursprungskontext auszuführen.

Dieses Standardverhalten wird typischerweise nur in UI-Code benötigt und sollte für alle andern Fälle deaktiviert werden, da es zusätzlichen Aufwand bedeutet und Deadlocks verursachen kann. Es lässt sich durch einen Aufruf der von ConfigureAwait(false) auf den jeweiligen Task deaktivieren:

Hierdurch wird dem Compiler mitgeteilt, dass es OK ist, den Rest der Methode (also alles hinter dem await) im gleichen Kontext auszuführen, in dem auch der asynchrone Code hinter dem Schlüsselwort await ausgeführt wurde.

Einfach ausprobieren!

Probiere es einfach mal async/await aus, wenn Dein Projekt schon .NET 4.5 unterstützt. Es wird Deine Wahrnehmung asynchroner Entwicklung nachhaltig verändern.

Kategorien
Programmierung

async/await Teil 1: Das async-Schlüsselwort verstehen

Mit Abstand mein Lieblingsfeature in .NET 4.5 ist async/await oder, wie Microsoft es nennt, das Task-based Asynchronous Pattern (TAP). Ich wusste nicht wirklich, dass ich das all die Jahre hätte brauchen können, bis ich eine Aufzeichnung von Anders Hejlsbergs Build-Vortrag zu dem Thema sah. Kurz danach fand ich mich in einem stark asynchronen C++-Embedded-Projekt vor, das über ein Jahr dauerte und in dem ich mich nicht wohl fühlte, eine State Machine nach der anderen zu bauen um das inherente Problem aller asynchroner Anwendungen zu lösen: Was tun, wenn eine asynchrone Operation abgeschlossen ist?

Diese Blogserie wendet sich an C#-Entwickler, die sich für async/await interessieren. Ein Verständnis der mit .NET 4.0 eingeführten Task-Klasse ist von Vorteil. Im ersten Teil erkläre ich das async aus async/await.

Was ist async?

Mit dem async-Schlüsselwort kann eine Method oder ein Lambda dekoriert werden.

Hier sollte erwähnt werden, dass async nicht Teil der Signatur der Methode ist, daher kann man beim Implementieren eines Interfaces oder beim Überschreiben einer virtuellen oder abstrakten Methode entscheiden, ob man async verwendet oder nicht.

Rückgabewerte

Eine async-Method darf nur void, Task oder Task<T> für einen konkreten Typen T zurückgeben.

void sollte als Rückgabewert soweit wie möglich vermieden werden und wird fast ausschließlich in Eventhandlern gebraucht, wodurch die Methode eine Fire-and-Forget-Methode wird, bei der der Aufrufer keine Möglichkeit hat, zu erkennen, wann die Methode fertig oder fehlgeschlagen ist.

Bei einer Methode, die Task oder Task<T> zurückgibt, sollte man der Konvention nach das Suffix Async verwenden, um hervorzuheben, dass die Methode awaitable ist (unabhängig davon, ob die Implementierung async verwendet oder nicht.

Task als Rückgabewert sollte verwendet werden für Methoden, die, wenn sie synchron wären, void zurückgeben würden. Task<T> sollte verwendet werden für Methoden, die sonst einen Typen T zurückgeben würden (d.h. alles außer void). Den Task kann man sich als das Objekt vorstellen, das der Aufrufer verwenden kann, um mitzubekommen, was denn aus der asynchronen Methode geworden ist, die er angestoßen hat.

Welche Auswirkung hat async?

Durch das Schreiben von async passieren zwei Dinge mit der Methode oder dem Lambdaausdruck:

  1. Es erlaubt die Verwendung von await innerhalb der Methode (siehe meinen nächsten Blogbeitrag in dieser Serie).
  2. Wenn der Rückgabewert nicht void ist, übersetzt der Kompiler auf magische Weise die return-Anweisung (oder die fehlende return-Anweisung am Ende der Methode) in einen Task<T> oder Task.

Für eine Methode, die keine await-Aufrufe beinhaltet, bedeutet das, dass eine abgeschlossener Task zurückgegeben wird, ohne dass dies explizit angegeben werden muss. Für Das Beispiel oben heißt das, dass es sich genauso verhält, wie diese nicht-async-Version:

Eine Methode, die ein await durchläuft, gibt ein Task-Objekt zurück, dessen Zustand auf IsCompleted wechselt, sobald der letzte Aufruf, auf den await aufgerufen wurde, abgeschlossen ist und der darauf folgende synchrone Code (falls vorhanden) anschließend ebenfalls abgeschlossen ist. (Mehr hierzu in meinem nächsten Blogbeitrag in dieser Serie zum await-Schlüsselwort.)

Brauche ich async?

Methoden, die nur ein await als allerletzte Anweisung beinhalten, können grundsätzlich auch ohne das async-Schlüsselwort implementiert werden. Die Methode

ist z.B. äquivalent zu

Obwohl diese Methoden das gleiche Ergebnis liefern, wirkt die async-Version besser lesbarer, auch wenn sie leicht langsamer ist. Der andere Unterschied an dieser Stelle ist, dass, sollte die Methode stream.FlushAsync() eine Exception werfen, die Methode FlushTheStreamAsync() nicht im Call Stack der Exception auftaucht (mehr hierzu im nächsten Blogbeitrag).

Wie hilft mir das weiter?

Wie bereits erwähnt, kann das zurückgegebene Task-Objekt verwendet werden, um den Zustand des asynchronen Aufrufs zu analysieren (Läuft er noch? Ist er fertig? Ist er fehlgeschlagen? Wurde er abgebrochen?). Auch wenn man diese Untersuchungen über die diversen Methoden und Eigenschaften der Task-Klasse möglich ist, ist es meistens deutlich einfacher, hierzu das await-Schlüsselwort zu verwenden, das im nächsten Blogbeitrag erläutert wird.