Im vorherigen Beispiel haben wir eine Aufzeichnung eines DCF 77 Signals mittels Python Funktionen demoduliert. Die File basierte Verarbeitung ist dabei ein gutes Hilfsmittel, um die einzelnen Komponenten und mathematischen Methoden zu testen und das Eingangsfile zu prüfen. Der Code ist jedoch nicht echtzeitfähig.
Im Beispiel ist die gesamte Datei im Speicher und kann als Array verarbeitet werden. Die Verarbeitung beginnt erst, wenn scipy.io.wavfile.read() die Datei vollständig geladen hat.
In Echtzeitsystemen verarbeiten wir einen kontinuierlichen Datenstrom, es gibt keinen definierten Anfang und kein definiertes Ende. Die Verarbeitung beginnt irgendwo währen der Aussendung des Signals. Einzelne Teile müssen ggf. parallel verarbeitet werden.
Bei der Verarbeitung des Python Files ergibt sich der Zeitbezug aus dem Index des Datenarrays. Man hat auf jeden Zeitpunkt des gesamten Signals über das Array Zugriff. Wenn man die Samplerate kennt, lässt sich alles berechnen.
In GNU Radio gibt es keine Variable t für die Zeit. Es gibt nur einen Sample Zähler. Ein Block kennt nur die Anzahl der Samples, die ihm aktuell übergeben wurden, er weiß aber nicht um welchen absoluten Zeitpunkt es sich handelt.
Bei der Echtzeitverarbeitung gibt es keine Zukunft. Wenn eine fallende Flanke kommt, musst man sich den Zählerstand merken und warten, bis die steigende Flanke eintrifft. Die Systemzeit spielt bei der Echtzeitverarbeitung keine Rolle, da nicht sichergestellt werden kann, dass ein Decoder die Daten unmittelbar verarbeitet.
Ablauf in GNU Radio

Der Start für das Auslesen der Daten in GnuRadio folgt dem gleichen Konzept, wie das Python Programm aus dem vorherigen Beispiel. Doch Achtung, jetzt arbeiten wir Blockorientiert und mit Echtzeitdaten.
Da wir aber weiterhin eine Konserve analysieren, also keine Hardware haben, die den Takt vorgibt, erfolgt die Verarbeitung mit voller CPU Kapazität des Rechners. Ohne Bremse liest GNU Radio die Datei so schnell ein, wie es die Festplatte und die CPU erlauben. Die nachfolgenden Blöcke werden mit Millionen von Samples pro Sekunde “geflutet”. Das Programm verbraucht in diesem Fall 100% der CPU-Leistung für eine Aufgabe, die eigentlich nur Echtzeitgeschwindigkeit benötigen würde.
Der Throttle-Block fungiert als künstlicher Engpass. Er limitiert den Durchfluss der Samples auf die eingestellte Sample-Rate. Der Throttle-Block sorgt also dafür, dass sich die Simulation so verhält, als würde sie an einer echten Antenne hängen.
Das gibt die Hoffnung, dass wir im nächsten Schritt die WAV Datei durch eine SDR Empfänger ersetzen können.
Der Hilbert Block erzeugt uns aus den WAV Daten IQ-Daten, er ist also derzeit nur ein Mathematisches Hilfsmittel, bis wie echte Daten verwenden können.
Der ComplexToMag Block erzeugt uns aus dem Datenstrom die Hüllkurve, indem er die Beträge der einzelnen Samples bildet.
Das Ergebnis sieht nun so aus:

Mit Hilfe eines Tiefpassfilters muss das Signal nun noch geglättet werden.
Das so erhaltene Signal lässt sich nun gut weiterverarbeiten. Zur Erinnerung, wir haben ein ASK Signal, in der Amplitude der Hüllkurve liegt also die Information.
Jeder Übergang von High auf Low und die Dazer, wie lange das Signal Low ist, ergibt das Bit.
Mit der Tiefpassfilterung haben wir einen Großteil der hohen Frequenzen eliminiert. Wir können also die Samplerate reduzieren. Dazu wird eine Decimation im Tiefpassfilter eingestellt. Die Samplerate aller nachfolgenden Schritte erfolgt also mit weniger Daten und entlastet den Rechner damit.

Das Signal wurde mit einer Samplerate von 12000 Bit/s aufgezeichnet. Nach der Tiefpassfilterung können wir problemlos um den Faktor 10 reduzieren.

Mit Hilfe des Threshold Blocks können wir aus denm Datenstrom nun echte 0 und 1 Signale machen. Im Python Programm hatten wir hierzu Schwellwerte programmiert. Diese können im Threshold-Block eingegeben werden.
Hier liegt einen Baustelle, die wir bei der Arbeit mit echten Daten noch lösen müssen. Die Signalstärke ist in unsere Konserve bekannt. Bei echten Daten kann sie variieren, die Schwellwerte müssen später noch dynamisiert werden.
Verarbeitung und Decodierung der Bits
Was haben wir nun erreicht ? Unser Workflow liefert einen kontinuierlichen Strom von Nullen und Einsen. Noch wissen wir nicht, was ein einzelnen Bit bedeutet, wir müssen den Anfang der Verarbeitungskette ermitteln.
Eine Minute im DCF77 Signal wird dadurch gekennzeichnet, dass ein Signal mit Pegel High 2 Sekunden ausgesendet wird. Eine 0 ist ein Signal, das einen Low Pegel von 100ms besitzt, eine 1 besitzt einen LOW Pegel von 200ms.
Der Nächste Schritt erfordert es also die Dauer der einzelnen Zustände zu ermitteln. Dazu benötigen wir einen eigenen Blok in dem wir das programmieren können.
Wie Embedded Python Blöcke funktionieren, wurde bereits an anderer Stelle beschrieben.
Um die Zeitdauer zu ermitteln, müssen wir also erkennen, wann ein Signalwechsel stattfindet und die Zeitdauer bis zum nächsten Wechsel messen. Da die Verarbeitungsdauer aber nicht zwingend mit der Signaldauer übereinstimmt, heißt es Samples zu zählen und über die Samplerate die Dauer zu berechnen.
Wir arbeiten an dieser Stelle nach der Decimation mit 1200 Samples pro Sekunde.
100ms enthalten also 120 Samples, 200ms 240Samples und 2min 2400Samples.
Das sind die 3 Werte die wir benötigen.
Eine Falle lauert noch, wir arbeiten jetzt mit Blöcken. Wir können also nicht davon ausgehen, dass der Signalwechsel 0/1 Übergang im gleichen Block stattfindet. Wir müssen daher die history Funktion im Block bemühen.
import numpy as np
from gnuradio import gr
import pmt
class blk(gr.sync_block):
def __init__(self, samp_rate=1200, debounce_ms=20,
threshold_low=0.4, threshold_high=0.6):
gr.sync_block.__init__(
self,
name="DCF77 Tagger",
in_sig=[np.float32],
out_sig=[np.float32]
)
self.samp_rate = samp_rate
self.set_history(2)
# Schmitt-Trigger Schwellwerte (Hysterese)
self.threshold_low = threshold_low
self.threshold_high = threshold_high
# Debouncing-Parameter
self.debounce_samples = int((debounce_ms / 1000.0) * samp_rate)
self.stable_high_count = 0
self.stable_low_count = 0
self.confirmed_state = None
# Messzustand
self.state = "IDLE"
self.count = 0
self.pulse_start_abs = None # Absolute Sample-Position der fallenden Flanke
# Timeout-Parameter
self.max_count = int((0.4 * samp_rate))
print(f"Debounce: {self.debounce_samples} samples ({debounce_ms}ms)")
print(f"Hysterese: LOW<{threshold_low} | HIGH>{threshold_high}")
def work(self, input_items, output_items):
in0 = input_items[0]
out = output_items[0]
current_abs_pos = self.nitems_read(0)
for i in range(1, len(in0)):
curr = in0[i]
prev_confirmed = self.confirmed_state
# Schmitt-Trigger mit Hysterese
if curr >= self.threshold_high:
self.stable_high_count += 1
self.stable_low_count = 0
elif curr <= self.threshold_low:
self.stable_low_count += 1
self.stable_high_count = 0
# Bestätige Zustandswechsel
if self.stable_high_count >= self.debounce_samples:
self.confirmed_state = "HIGH"
elif self.stable_low_count >= self.debounce_samples:
self.confirmed_state = "LOW"
# Erkenne Flanken
if prev_confirmed is not None and self.confirmed_state != prev_confirmed:
# FALLENDE FLANKE (HIGH -> LOW)
if self.confirmed_state == "LOW" and prev_confirmed == "HIGH":
if self.state == "MEASURE":
duration_ms = (self.count / self.samp_rate) * 1000
print(f"⚠️ Abbruch bei {self.pulse_start_abs}: {duration_ms:.1f}ms (neue Flanke)")
self.state = "MEASURE"
self.count = 0
# Speichere absolute Position (aktuell + i)
self.pulse_start_abs = current_abs_pos + i
print(f"↓ Fallende Flanke bei Sample {self.pulse_start_abs}")
# STEIGENDE FLANKE (LOW -> HIGH)
elif self.confirmed_state == "HIGH" and prev_confirmed == "LOW":
if self.state == "MEASURE" and self.pulse_start_abs is not None:
duration_ms = (self.count / self.samp_rate) * 1000
# Klassifizierung
bit_key = pmt.intern("dcf77_bit")
if 70 <= duration_ms <= 150:
val = pmt.from_long(0)
bit_str = "0"
elif 160 <= duration_ms <= 300:
val = pmt.from_long(1)
bit_str = "1"
else:
val = pmt.intern(f"X:{int(duration_ms)}")
bit_str = "X"
# Tag auf die AKTUELLE Position setzen (steigende Flanke)
tag_pos = current_abs_pos + i
print(f"✓ Bit '{bit_str}' Puls: {self.pulse_start_abs}→{tag_pos}, Dauer: {duration_ms:.1f}ms")
# Tags auf aktuelle Position setzen (immer gültig!)
self.add_item_tag(0, tag_pos, bit_key, val)
self.add_item_tag(0, tag_pos,
pmt.intern("ms"),
pmt.from_double(duration_ms))
self.add_item_tag(0, tag_pos,
pmt.intern("pulse_start"),
pmt.from_uint64(self.pulse_start_abs))
self.state = "IDLE"
self.pulse_start_abs = None
# Während der Messung: Zähle weiter
if self.state == "MEASURE":
self.count += 1
# TIMEOUT-CHECK
if self.count > self.max_count:
duration_ms = (self.count / self.samp_rate) * 1000
print(f"⚠️ Timeout bei {self.pulse_start_abs}: {duration_ms:.1f}ms")
self.state = "IDLE"
self.pulse_start_abs = None
# Korrektes Output-Slicing
out[:] = in0[1:len(out)+1]
return len(out)Der Python Block verarbeitet das eingehende Signal in 3 Stufen:
1. Signalpegel-Ermittlung (Schmitt-Trigger & Hysterese)
Anstatt nur einen einzelnen Schwellenwert zu nutzen, verwendet der Code zwei Werte, um Rauschen zu unterdrücken:
Debouncing (debounce_samples): Das ist eine digitale Totzeit. Ein Pegel wird erst als confirmed_state übernommen, wenn er für eine bestimmte Anzahl an Samples (stable_high_count bzw. stable_low_count) stabil anliegt. Dies verhindert, dass kurze Störspitzen (Spikes) als Signalflanken fehlinterpretiert werden.
threshold_low (0.4) & threshold_high (0.6): Dies bildet eine Hysterese. Ein Signal gilt erst als “HIGH”, wenn es 0.6 überschreitet, und erst wieder als “LOW”, wenn es unter 0.4 fällt. Werte dazwischen lösen keine Zustandsänderung aus.
2. Zeitliche Erfassung (Dauer-Ermittlung)
Sobald die Pegel stabilisiert sind, wird die Zeit (Dauer) gemessen:
Umrechnung in Millisekunden:
Flankenerkennung: Der Code vergleicht prev_confirmed mit self.confirmed_state.
Fallende Flanke (HIGH → LOW): Markiert beim DCF77 den Beginn einer Absenkung (eines Pulses). Hier wird self.count auf 0 gesetzt und die absolute Position self.pulse_start_abs gespeichert.
Steigende Flanke (LOW → HIGH): Markiert das Ende des Pulses.
self.count: Dieser Zähler wird in jedem Durchlauf der work-Methode inkrementiert, solange sich das System im Zustand MEASURE befindet.
3. Funktionale Logik der DCF77-Bits
Nachdem die Dauer eines Pulses feststeht, erfolgt die Klassifizierung in die logischen Zustände des DCF77-Protokolls:
| Dauer (ms) | Interpretation | Bedeutung im Code |
| 70 – 150 ms | Logische “0” | val = 0, Tag: dcf77_bit |
| 160 – 300 ms | Logische “1” | val = 1, Tag: dcf77_bit |
| Sonstige | Fehler / Unbekannt | val = X, Tag mit Zeitwert |
self.max_count(Timeout): Wenn der Puls länger als 400ms dauert (0.4 * samp_rate), bricht das System die Messung ab. Dies ist ein Schutzmechanismus gegen Signalverlust oder Dauerstörungen.
4. Metadaten-Tagging
Das Ergebnis der Ermittlung wird nicht nur in die Konsole gedruckt, sondern über GNU Radio Tags fest mit dem Datenstrom verknüpft:
add_item_tag: Schreibt die Information direkt an das Sample der steigenden Flanke (tag_pos).- Gespeicherte Werte: Das erkannte Bit (0/1), die exakte Dauer in
msund der Startpunkt des Pulses. Dies ermöglicht es nachfolgenden Blöcken (z.B. einem Decoder), die Zeitinformationen präzise zu verarbeiten.
Ergebnis dieser Verarbeitung ist nun eine saubere Ermittlung der einzelnen Signalpegel.

Die Zusände sin mittels Steam Tags fest. mit dem Signal verbunden. Die Folgeverarbeitung kann sich nun vollständig auf die Verarbeitung der Tags konzentrieren
Ein Hinweis an dieser Stelle:
Das Tag ist mit der steigenden Flanke verbunden. In meiner ersten Version war dies an der Fallenden. Immer dann, wenn der Samplewert dieser Flanke in der History des Blockes war, wurde das Tag nicht gespeichert.
Bitanalyse
Wir kennen nun die einzelnen Bits (Dauer und Wert), wir können uns daher nun an die Auswertung der Daten machen.
Ein neuer Python Block ist dafür vorgesehen.
Der DCF77 Decoder Block
sammelt die Tags aus dem vorherigen Block, erkennt das Ende einer Minute und rechnet die Bit-Positionen in lesbare Daten um.
Die Tags werden mittels der Funktion get_tags_in_window ermittelt.
tags = self.get_tags_in_window(0, 0, len(in0)) liefert demnach alle Tags die im Input Port 0 enthalten sind. Der gesamte Puffer wird analysiert.
Unsere Information ist im Tag mit dem Key “dcf77_bit” als PMT (polymorpher type) gespeichert. Über die To_python Methode können wir auf die Inhalte zugreifen (Wert und offset) .
if pmt.to_python(tag.key) == “dcf77_bit”:
val = pmt.to_python(tag.value)
offset = tag.offset
Im Block kommt zudem ein message port zum Einsatz, um die Daten an für die Darstellung weiterzuleiten.
Folgende Funktionen sind implementiert:
BCD Dekodierung.
DCF77 nutzt den Binary Coded Decimal (BCD) Code. Das bedeutet, eine Zahl (z. B. die Minute) wird nicht als eine große Binärzahl gesendet, sondern in Einer- und Zehnerstellen aufgeteilt. Die Funktion multipliziert jedes empfangene Bit mit seinem “Gewicht” und summiert alles auf.
Beispiel: Die Bits für die “Einer” haben die Gewichte 1, 2, 4, 8. Die Bits für die “Zehner” haben 10, 20, 40.
DCF77 Protokoll dekodieren
Hier steckt das Wissen über den DCF77-Standard. Der Block greift gezielt auf bestimmte Indizes in der Liste bits zu:
- Bits 21–28: Minuten
- Bits 29–35: Stunden
- Bits 17–18: Zeitzone (MESZ Sommerzeit oder MEZ Winterzeit)
- Bits 50–58: Das Jahr (z. B. 26 für 2026)
import numpy as np
from gnuradio import gr
import pmt
class blk(gr.sync_block):
def __init__(self, working_rate=1200):
gr.sync_block.__init__(self, name="DCF77 Decoder", in_sig=[np.float32], out_sig=None)
self.working_rate = working_rate
self.bits = []
self.last_tag_offset = 0
self.first_bit_received = False
self.full_container = "--:--|0|Warte auf Frame|--|--"
self.message_port_register_out(pmt.intern("out"))
def bcd_decode(self, bit_slice, weights):
return sum(bit * weight for bit, weight in zip(bit_slice, weights))
def decode_full_info(self, bits):
if len(bits) < 58:
return f"Sync...|{len(bits)}|Warte auf Frame|--|--"
try:
is_mesz = bits[17] == 1 and bits[18] == 0
is_mez = bits[17] == 0 and bits[18] == 1
tz_str = "MESZ" if is_mesz else ("MEZ" if is_mez else "??")
schalt_warn = "JA" if bits[19] == 1 else "Nein"
minuten = self.bcd_decode(bits[21:28], [1, 2, 4, 8, 10, 20, 40])
stunden = self.bcd_decode(bits[29:35], [1, 2, 4, 8, 10, 20])
tag = self.bcd_decode(bits[36:42], [1, 2, 4, 8, 10, 20])
monat = self.bcd_decode(bits[45:50], [1, 2, 4, 8, 10])
jahr = self.bcd_decode(bits[50:58], [1, 2, 4, 8, 10, 20, 40, 80])
wochentage = ["", "Mo", "Di", "Mi", "Do", "Fr", "Sa", "So"]
w_tag = wochentage[self.bcd_decode(bits[42:45], [1, 2, 4])]
zeit_info = f"{stunden:02d}:{minuten:02d}"
datum_info = f"{w_tag}, {tag:02d}.{monat:02d}.20{jahr:02d}"
return f"{zeit_info}|{len(bits)}|{datum_info}|{tz_str}|{schalt_warn}"
except:
return f"Fehler|{len(bits)}|Datenfehler|--|--"
def work(self, input_items, output_items):
in0 = input_items[0]
tags = self.get_tags_in_window(0, 0, len(in0))
for tag in tags:
if pmt.to_python(tag.key) == "dcf77_bit":
val = pmt.to_python(tag.value)
offset = tag.offset
if self.first_bit_received:
gap = offset - self.last_tag_offset
if gap > (1.5 * self.working_rate):
# 59. Sekunde: Frame dekodieren
self.full_container = self.decode_full_info(self.bits)
self.message_port_pub(pmt.intern("out"), pmt.intern(self.full_container))
self.bits = []
self.first_bit_received = True
if isinstance(val, int):
self.bits.append(val)
# Update-Nachricht: Wir nehmen den alten Container und ersetzen NUR die Bit-Zahl
parts = self.full_container.split('|')
if len(parts) >= 5:
# Ersetze nur das Bit-Feld (Index 1) mit der aktuellen Länge
parts[1] = str(len(self.bits))
update_msg = "|".join(parts)
self.message_port_pub(pmt.intern("out"), pmt.intern(update_msg))
self.last_tag_offset = offset
return len(in0)Daten ausgeben
Wenn man die Uhrzeit nicht nur technisch weiterverarbeiten möchte, muss sie noch in irgendeiner Form an ein Folgesystem weitergegeben werden.
Ich habe hier den Message Port verwendet. Während der normale Stream wie ein konstanter Fluss aus Samples ist, sind Messages eher wie Postkarten, die nur bei Bedarf verschickt werden, also wen nein bestimmtes Ereignis eintritt. In diesem Beispiel wird hier eine Nachricht zu jeder Sekunde übertragen, damit man den Fortschritt sehen kann.
Der Workflow nach dem Tiefpassfilter, also der eigentliche Verarbeitungsteil sieht nun wie folgt aus:

Die erzeugten Messages werden in dem Debugger Block dargestellt.
Zudem kommt eine ZMQ Message Sink zum Einsatz, der die Nachricht jede Sekunde an die eingestellte Adresse 127.0.0.1:5555 sendet.
Auf der Empfängerseite ist ein kleines Python Programm, das die Daten empfängt und als Uhr darstellt.

Nächster Schritt ist es nun die Uhrzeit aus den Daten live zu ermitteln und die Themen wie Signalschwankungen und verschiedene Eingangspegel zu lösen.