Schlagwort-Archive: Reactive Extensions

Serielle Kommunikation mit Reactive Extensions

Der Haupteinsatzzweck von Reactive Extensions (Rx) ist die Verarbeitung von Event-Streams. Ich habe Reactive Extensions eingesetzt, um Daten auf der seriellen Schnittstelle zu empfangen und habe dabei einige neue Dinge gelernt.

Hier ist der Code, der Observable.FromEventPattern<T>() verwendet, um aus dem .NET-Event SerialPort.DataReceivedEvent ein IObservable<T> zu erzeugen:

Das Event enthält leider keinerlei Informationen zu den empfangenen Daten, es signalisiert nur, dass neue Daten vorliegen. Das Auslesen der Daten erfolgt im Lambdaausdruck. Das Auslesen der seriellen Schnittstelle liefert eine Menge an Bytes. Diese Menge könnte genau eine Nachricht enthalten oder nur einen Teil einer Nachricht oder mehrere Nachrichten. Aus diesem Grund sollte das Observable ein IObservable<byte> sein, d.h. einen rohen Strom an Bytes erzeugen ohne einen Hinweis darauf, wo eine Nachricht anfängt oder aufhört. Dies wird erreicht über die Extension Method public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector), die verwendet wird, um die vom Lambdaausdruck zurückgegebenen Bytes in ein IObservable<byte> zu transferieren.

Zu diesem Zeitpunkt habe ich einen Strom an Bytes. Diese Bytes sollten idealerweise in Nachrichten gruppiert werden. Bei dem von mir eingesetzten Protokoll werden Nachrichten durch ein besonderes Byte von einander getrennt. Für die Aufteilung gibt es zwei mögliche Ansätze:

In diesem Beispiel wir ein neues Observable mit Hilfe von Observable.Create() erzeugt. Dieses Observable abonniert sich auf den Bytestrom, sammelt die Daten in einer lokalen Liste und feuert OnNext() sobald ein Nachrichtentrenner entdeckt wurde.

Diese Version verwendet den Scan()-Operator, um das gleiche zu erreichen. Die Ausgabe ist ein IObservable<IEnumerable<byte>>, der für jede neue Nachricht einIEnumerable<byte> feuert.

Dieser Code funktionierte sehr gut bis zu dem Zeitpunkt, als ich mehrere Observer an den Stream hängte: Einen zur Verarbeitung der Nachrichten und einen zur Ausgabe der Nachrichten auf der Debugkonsole. Hierdurch wurden die seriellen Daten immer nur von einem Subscriber gelesen, nicht von allen. Hierfür gibt es zwei mögliche Lösungen: Man kann ein Subject<IEnumerable<byte>> einführen, das sich auf serialPortSource abonniert und auf das sich alle Konsumenten abonnieren. Alternative kann man den Publish()-Operator verwenden, der diese Arbeit übernimmt.

Aus dem neuen Observable, das Listen an Bytes produziert, lässt sich leicht mit Hilfe des Select()-Operators ein Observable erzeugen, das deserialisierte Nachrichten produziert.

Jetzt bleibt noch die Frage, wie man die empfangenen Daten in einem typischen Workflow verwendet: Das Senden einer Nachrichten und Empfangen der Antwort. Hier ist ein Beispiel:

Dieses Beispiel verwendet den Replay()-Operator. Replay sammelt alle Events, die das Observable nach Aufruf von Connect() feuert. Nach dem Aufruf von Connect() wird eine serielle Nachricht an das Gerät am anderen Ende der seriellen Leitung gesendet. Der zweite Aufruf von await filtert die eingehenden Nachrichten nach der erwünschten Nachrichten (und verwendet dabei ein Kriterium, das vor dem Aufruf noch gar nicht bekannt war), fügt ein Timeout hinzu, verwendet FirstAsync(), um ein Observable zu erzeugen, das nur das erste Element gefolgt von OnCompleted() zurückgibt und wartet dann auf das OnCompleted() mit Hilfe von await. Da Replay() alle Nachrichten aufzeichnet, werden beim folgenden await-Aufruf alle Nachrichten vom Gegenüber berücksichtigt, egal ob diese vor oder nach dem zweiten await-Aufruf eintrifft.

DWX13-Nachtrag: Sprünge mit Kinect und Reactive Extensions erkennen

Bei der Developer Week 2013 habe ich einen Vortrag über Reactive Extensions gehalten. Die Folien sind auf Slideshare zu finden. Im Laufe des Vortrags habe ich einen Teil eines konsolenbasierten Jump-and-Run-Spiels live kodiert, das eine Kinect als Eingabemedium verwendet. Die erste Aufgabe war es, zu erkennen, wenn ein Spieler (der Freiwillige auf der Bühne) in die Luft springt. Hier ist der Code aus der Demo:

Dieser Code beinhaltet viele Vereinfachungen zu Demonstrationszwecken, kann aber trotzdem gut verwendet werden, um einige Konzepte, die hinter Reactive Extensions stecken, zu erklären. Ich werde die einzelnen Teile erklären und die var durch die spezifischen Typen ersetzen:

Diese Zeile greift auf die eine Kinect zu, die an den Rechner angeschlossen ist (und wirft eine Exception, wenn nicht genau eine Kinect angeschlossen ist).

Diese Anweisung erzeugt aus einem klassischen .NET-Event SkeletonFrameReady ein IObservable, indem es Rx beschreibt, wie man sich von dem Event an- und abmeldet.

Interessanterweise beinhaltet die Klasse SkeletonFrameReadyEventArgs überhaupt keine Eigenschaften sondern nur eine Methode public SkeletonFrame OpenSkeletonFrame();, über die man an die Skelettdaten gelangt. Die Instanz von SkeletonFrame muss daraufhin innerhalb von 1/30 Sekunde wieder über Dispose() zerstört werden.

Jetzt haben wir ein IObservable, das eine Liste an Skeletten von sich gibt, sobald sich Personen vor dem Kinect-Sensor befinden.

Dieser Code extrahiert die Gelenke aus dem einen Skelett, wenn es den Zustand SkeletonTrackingState.Tracked hat.

Hierdurch wird die durschnittliche vertikale Position des linken und rechten Fußes ermittelt. Dies ist eine Vereinfachung, da es auch möglich wäre, den Algorithmus zu überlisten, indem man einen Fuß doppelt so hoch hebt als man eigentlich mit zwei Füßen hätte springen sollen. Als Alternative könnte man im Select() beide Füße extrahieren.

Hier wird der eigentliche Sprung detektiert. Das Idee ist die folgende: Um gesprungen zu sein, müsste ein Spieler beide Füße in einer kurzen Zeitspanne erst tief, dann hoch und dann wieder tief haben. Um das zu ermitteln, analysieren wir eine Zeitspanne von einer Sekunde. Nach dieser Analyse bewegen wir und im Zeitstrahl 200 Millisekunden weiter und analysieren wieder. Diese Magie liefert und die Methode Buffer(), die eine Extension Method von IObservable ist. Innerhalb dieser Sekunde ermitteln wir den Maximalwert und bilden die Differenz zum ersten und letzten Wert des Zeitfensters. Zur Vereinfachung wird die Differenz mit einem hartkodierten Wert verglichen. Wenn der Algorithmus zuschlägt, enthält das resultierende IObservable einen Strom an entweder “jumped” oder “didn’t jump”.

Wenn man an dieser Stelle ein jumped.Subscribe(Console.WriteLine); einfügt, sieht man eine lange Folge von “didn’t jump” unterbrochen durch ein paar Vorkommnisse von “didn’t jump” an den Stellen, an denen der Spieler in die Luft gesprungen ist.

Durch den Aufruf von DistinctUntilChanged() gibt das IObservable nur einen Wert aus, wenn er sich vom Vorgängerwert unterscheidet. Die nächste Zeile filter dieses noch auf den Wert “jumped”, d.h. es wird immer einmal “jumped” ausgegeben, wenn der Spieler in die Luft springt.

Diese Zeile gibt das resultierende IObservable auf der Konsole aus.

Hierdurch wird der Kinect-Sensor gestartet.

Die Präsentation hat viel Spaß gemacht. Vielen Dank an den Freiwilligen! Dieses Jahr werde ich wieder auf der Developer Week sprechen, mit einem Vortrag über Cross-Plattform Mobile mit C#.