Categories
Programmierung

Serielle Kommunikation mit Reactive Extensions

Der Haupteinsatzzweck von Reactive Extensions (Rx) ist die Verarbeitung von Event-Streams. Ich habe Reactive Extensions eingesetzt, um Daten auf der seriellen Schnittstelle zu empfangen und habe dabei einige neue Dinge gelernt.

Hier ist der Code, der Observable.FromEventPattern<T>() verwendet, um aus dem .NET-Event SerialPort.DataReceivedEvent ein IObservable<T> zu erzeugen:

Das Event enthält leider keinerlei Informationen zu den empfangenen Daten, es signalisiert nur, dass neue Daten vorliegen. Das Auslesen der Daten erfolgt im Lambdaausdruck. Das Auslesen der seriellen Schnittstelle liefert eine Menge an Bytes. Diese Menge könnte genau eine Nachricht enthalten oder nur einen Teil einer Nachricht oder mehrere Nachrichten. Aus diesem Grund sollte das Observable ein IObservable<byte> sein, d.h. einen rohen Strom an Bytes erzeugen ohne einen Hinweis darauf, wo eine Nachricht anfängt oder aufhört. Dies wird erreicht über die Extension Method public static IObservable<TResult> SelectMany<TSource, TResult>(this IObservable<TSource> source, Func<TSource, IEnumerable<TResult>> selector), die verwendet wird, um die vom Lambdaausdruck zurückgegebenen Bytes in ein IObservable<byte> zu transferieren.

Zu diesem Zeitpunkt habe ich einen Strom an Bytes. Diese Bytes sollten idealerweise in Nachrichten gruppiert werden. Bei dem von mir eingesetzten Protokoll werden Nachrichten durch ein besonderes Byte von einander getrennt. Für die Aufteilung gibt es zwei mögliche Ansätze:

In diesem Beispiel wird ein neues Observable mit Hilfe von Observable.Create() erzeugt. Dieses Observable abonniert sich auf den Bytestrom, sammelt die Daten in einer lokalen Liste und feuert OnNext() sobald ein Nachrichtentrenner entdeckt wurde.

Diese Version verwendet den Scan()-Operator, um das gleiche zu erreichen. Die Ausgabe ist ein IObservable<IEnumerable<byte>>, der für jede neue Nachricht einIEnumerable<byte> feuert.

Dieser Code funktionierte sehr gut bis zu dem Zeitpunkt, als ich mehrere Observer an den Stream hängte: Einen zur Verarbeitung der Nachrichten und einen zur Ausgabe der Nachrichten auf der Debugkonsole. Hierdurch wurden die seriellen Daten immer nur von einem Subscriber gelesen, nicht von allen. Hierfür gibt es zwei mögliche Lösungen: Man kann ein Subject<IEnumerable<byte>> einführen, das sich auf serialPortSource abonniert und auf das sich alle Konsumenten abonnieren. Alternativ kann man den Publish()-Operator verwenden, der diese Arbeit übernimmt.

Aus dem neuen Observable, das Listen an Bytes produziert, lässt sich leicht mit Hilfe des Select()-Operators ein Observable erzeugen, das deserialisierte Nachrichten produziert.

Jetzt bleibt noch die Frage, wie man die empfangenen Daten in einem typischen Workflow verwendet: Das Senden einer Nachrichten und Empfangen der Antwort. Hier ist ein Beispiel:

Dieses Beispiel verwendet den Replay()-Operator. Replay sammelt alle Events, die das Observable nach Aufruf von Connect() feuert. Nach dem Aufruf von Connect() wird eine serielle Nachricht an das Gerät am anderen Ende der seriellen Leitung gesendet. Der zweite Aufruf von await filtert die eingehenden Nachrichten nach der erwünschten Nachrichten (und verwendet dabei ein Kriterium, das vor dem Aufruf noch gar nicht bekannt war), fügt ein Timeout hinzu, verwendet FirstAsync(), um ein Observable zu erzeugen, das nur das erste Element gefolgt von OnCompleted() zurückgibt und wartet dann auf das OnCompleted() mit Hilfe von await. Da Replay() alle Nachrichten aufzeichnet, werden beim folgenden await-Aufruf alle Nachrichten vom Gegenüber berücksichtigt, egal ob diese vor oder nach dem zweiten await-Aufruf eintrifft.

One reply on “Serielle Kommunikation mit Reactive Extensions”

Mir sind bei Ihren Codeauszügen einige Dinge nicht ganz klar bzw. sehe ich die ein oder andere Stelle etwas” anders”. Hätten Sie ein ein laufähiges Beispielprojekt? Vielen Dank für Ihre Rückmeldung.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert