From 36f3051d0d920bd1060abbad870d0466c5f321d5 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Tue, 29 Oct 2019 18:01:10 +0800 Subject: [PATCH 01/12] update .gitignore & setup.py --- .gitignore | 3 ++- setup.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index ed32753..11fffae 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,6 @@ /*.egg-info /.eggs -# nMigen +# tests *.vcd +*.gtkw diff --git a/setup.py b/setup.py index f83315c..5a30776 100644 --- a/setup.py +++ b/setup.py @@ -23,10 +23,11 @@ def local_scheme(version): #long_description="""TODO""", license="BSD", setup_requires=["setuptools_scm"], - install_requires=["nmigen"], + install_requires=["nmigen~=0.1.rc1"], packages=find_packages(), project_urls={ "Source Code": "https://github.com/m-labs/nmigen-stdio", "Bug Tracker": "https://github.com/m-labs/nmigen-stdio/issues", }, ) + From aea62923cc9a5aedbf13d836c47bb5de3a6c7793 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Tue, 19 Nov 2019 17:29:16 +0800 Subject: [PATCH 02/12] serial: fix AsyncSerialTX logic --- nmigen_stdio/serial.py | 49 ++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index 4da264f..e477310 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -1,6 +1,6 @@ from nmigen import * from nmigen.lib.cdc import MultiReg -from nmigen.tools import bits_for +from nmigen.utils import bits_for __all__ = ["AsyncSerialRX", "AsyncSerialTX", "AsyncSerial"] @@ -56,12 +56,17 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self._pins = pins + self.timer = Signal.like(self.divisor) + self.shreg = Record(_wire_layout(len(self.data), self._parity)) + self.bitno = Signal(range(len(self.shreg))) + + def elaborate(self, platform): m = Module() - timer = Signal.like(self.divisor) - shreg = Record(_wire_layout(len(self.data), self._parity)) - bitno = Signal.range(len(shreg)) + timer = self.timer + shreg = self.shreg + bitno = self.bitno if self._pins is not None: m.d.submodules += MultiReg(self._pins.rx.i, self.i, reset=1) @@ -71,7 +76,7 @@ def elaborate(self, platform): with m.If(~self.i): m.d.sync += [ bitno.eq(len(shreg) - 1), - timer.eq(self.divisor >> 1), + timer.eq(self.divisor >> 1) ] m.next = "BUSY" @@ -82,7 +87,7 @@ def elaborate(self, platform): m.d.sync += [ shreg.eq(Cat(self.i, shreg)), bitno.eq(bitno - 1), - timer.eq(self.divisor), + timer.eq(self.divisor) ] with m.If(bitno == 0): m.next = "DONE" @@ -93,7 +98,7 @@ def elaborate(self, platform): self.data.eq(shreg.data), self.err.frame .eq(~((shreg.start == 0) & (shreg.stop == 1))), self.err.parity.eq(~(shreg.parity == - _compute_parity_bit(shreg.data, self._parity))), + _compute_parity_bit(shreg.data, self._parity))) ] m.d.sync += self.err.overflow.eq(~self.ack) m.next = "IDLE" @@ -115,43 +120,55 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.rdy = Signal() self.ack = Signal() - self.o = Signal() + self.o = Signal(reset=1) self._pins = pins + + self.timer = Signal.like(self.divisor) + self.shreg = Record(_wire_layout(len(self.data), self._parity)) + self.bitno = Signal(range(len(self.shreg))) + def elaborate(self, platform): m = Module() - timer = Signal.like(self.divisor) - shreg = Record(_wire_layout(len(self.data), self._parity)) - bitno = Signal.range(len(shreg)) + timer = self.timer + shreg = self.shreg + bitno = self.bitno + + shreg_sent = Signal.like(shreg) if self._pins is not None: m.d.comb += self._pins.tx.o.eq(self.o) - with m.FSM(): + with m.FSM() as fsm: with m.State("IDLE"): m.d.comb += self.rdy.eq(1) - m.d.sync += self.o.eq(shreg[0]) with m.If(self.ack): m.d.sync += [ shreg.start .eq(0), shreg.data .eq(self.data), shreg.parity.eq(_compute_parity_bit(self.data, self._parity)), shreg.stop .eq(1), + self.o.eq(shreg.start), bitno.eq(len(shreg) - 1), - timer.eq(self.divisor), + timer.eq(self.divisor) ] m.next = "BUSY" with m.State("BUSY"): + # Copy shreg.data, shreg.parity, shreg.stop to shreg_sent + with m.If((bitno == len(shreg) - 1) & + (timer == self.divisor)): + m.d.sync += shreg_sent.eq(Cat(0, shreg[:-1])) with m.If(timer != 0): m.d.sync += timer.eq(timer - 1) with m.Else(): m.d.sync += [ - Cat(self.o, shreg).eq(shreg), + self.o.eq(shreg_sent[-1]), + shreg_sent.eq(Cat(0, shreg_sent[:-1])), bitno.eq(bitno - 1), - timer.eq(self.divisor), + timer.eq(self.divisor) ] with m.If(bitno == 0): m.next = "IDLE" From c8d7604ce70aebbde849fd4c0917da4683af69e2 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Thu, 21 Nov 2019 14:16:41 +0800 Subject: [PATCH 03/12] serial: change RX state logic, add formal checks, remove FIFO simulation - RX now enters DONE state 1 clock earlier to avoid delay - RX now checks for one more bit (taking (divisor+1)-long time span) while making the received data ready for collection --- nmigen_stdio/serial.py | 69 ++++-- nmigen_stdio/test/test_serial.py | 362 +++++++++++++++++++++++++++++-- 2 files changed, 399 insertions(+), 32 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index e477310..d59082b 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -49,8 +49,11 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi ("frame", 1), ("parity", 1), ]) + # rx.rdy: Internally driven, to indicate if received data is ready to be read self.rdy = Signal() + # rx.ack: Externally driven, to indicate if transfer of received data is enabled self.ack = Signal() + self.busy = Signal() self.i = Signal(reset=1) @@ -59,6 +62,7 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.timer = Signal.like(self.divisor) self.shreg = Record(_wire_layout(len(self.data), self._parity)) self.bitno = Signal(range(len(self.shreg))) + self.done_once = Signal() def elaborate(self, platform): @@ -67,6 +71,7 @@ def elaborate(self, platform): timer = self.timer shreg = self.shreg bitno = self.bitno + done_once = self.done_once if self._pins is not None: m.d.submodules += MultiReg(self._pins.rx.i, self.i, reset=1) @@ -76,32 +81,59 @@ def elaborate(self, platform): with m.If(~self.i): m.d.sync += [ bitno.eq(len(shreg) - 1), - timer.eq(self.divisor >> 1) + timer.eq(self.divisor >> 1), + self.busy.eq(1) ] m.next = "BUSY" with m.State("BUSY"): with m.If(timer != 0): m.d.sync += timer.eq(timer - 1) - with m.Else(): - m.d.sync += [ - shreg.eq(Cat(self.i, shreg)), - bitno.eq(bitno - 1), - timer.eq(self.divisor) - ] - with m.If(bitno == 0): + with m.If(((timer == 0) & (bitno != 0)) | ((timer == 1) & (bitno == 0))): + m.d.sync += shreg.eq(Cat(self.i, shreg)) + with m.If(bitno != 0): + m.d.sync += [ + bitno.eq(bitno - 1), + timer.eq(self.divisor) + ] + with m.Else(): m.next = "DONE" with m.State("DONE"): - with m.If(self.ack): + with m.If(~done_once & (timer == 0)): + with m.If(self.ack): + m.d.sync += [ + self.data.eq(shreg.data), + self.err.frame .eq(~((shreg.start == 0) & (shreg.stop == 1))), + self.err.parity.eq(~(shreg.parity == + _compute_parity_bit(shreg.data, self._parity))) + ] m.d.sync += [ - self.data.eq(shreg.data), - self.err.frame .eq(~((shreg.start == 0) & (shreg.stop == 1))), - self.err.parity.eq(~(shreg.parity == - _compute_parity_bit(shreg.data, self._parity))) + self.err.overflow.eq(~self.ack), + self.busy.eq(0) ] - m.d.sync += self.err.overflow.eq(~self.ack) - m.next = "IDLE" + # At second clock edge for DONE, + # bitno remains -1, while timer is set as divisor + m.d.sync += [ + timer.eq(self.divisor), + done_once.eq(1) + ] + # Continue the timer to receive one more bit + # Set back to IDLE if it is still stop bit + with m.If(timer != 0): + m.d.sync += timer.eq(timer - 1) + with m.Elif(done_once): + m.d.sync += done_once.eq(0) + with m.If(~self.i): + m.d.sync += [ + shreg.eq(Cat(self.i, shreg)), + bitno.eq(len(shreg) - 1), + timer.eq(self.divisor), + self.busy.eq(1) + ] + m.next = "BUSY" + with m.Else(): + m.next = "IDLE" with m.If(self.ack): m.d.sync += self.rdy.eq(fsm.ongoing("DONE")) @@ -117,8 +149,11 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) self.data = Signal(data_bits) + # rx.rdy: Internally driven, to indicate if data transmission can start self.rdy = Signal() + # rx.ack: Externally driven, to indicate if transfer of data to transmit is enabled self.ack = Signal() + self.busy = Signal() self.o = Signal(reset=1) @@ -152,7 +187,8 @@ def elaborate(self, platform): shreg.stop .eq(1), self.o.eq(shreg.start), bitno.eq(len(shreg) - 1), - timer.eq(self.divisor) + timer.eq(self.divisor), + self.busy.eq(1) ] m.next = "BUSY" @@ -171,6 +207,7 @@ def elaborate(self, platform): timer.eq(self.divisor) ] with m.If(bitno == 0): + m.d.sync += self.busy.eq(0) m.next = "IDLE" return m diff --git a/nmigen_stdio/test/test_serial.py b/nmigen_stdio/test/test_serial.py index 1b32c48..7b02445 100644 --- a/nmigen_stdio/test/test_serial.py +++ b/nmigen_stdio/test/test_serial.py @@ -3,6 +3,10 @@ from nmigen.lib.fifo import SyncFIFO from nmigen.back.pysim import * +# Formal Verification +from nmigen.test.utils import * +from nmigen.asserts import * + from ..serial import * @@ -86,23 +90,349 @@ def process(): self.assertTrue((yield self.dut.err.overflow)) simulation_test(self.dut, process) - def test_fifo(self): - self.dut = AsyncSerialRX(divisor=7) - self.fifo = SyncFIFO(width=8, depth=4) + +class AsyncSerialTXTestCase(unittest.TestCase): + def tx_period(self): + for _ in range((yield self.dut.divisor) + 1): + yield + + def tx_dummy_test(self): + self.dut = AsyncSerialTX(divisor=7) m = Module() - m.submodules.rx = self.dut - m.submodules.fifo = self.fifo - m.d.comb += [ - self.dut.ack.eq(self.fifo.w_rdy), - self.fifo.w_en.eq(self.dut.rdy), - self.fifo.w_data.eq(self.dut.data), - ] + m.submodules.tx = self.dut def process(): - yield from self.tx_bits([0, 1,0,1,0,1,0,1,0, 1, - 0, 0,1,0,1,0,1,0,1, 1]) - yield from self.tx_period() - self.assertEqual((yield from self.fifo.read()), 0xAA) - self.assertEqual((yield from self.fifo.read()), 0x55) + yield self.dut.ack.eq(1) + yield self.dut.data.eq(0xAA) + for _ in range(10): + yield from self.tx_period() + yield yield - self.assertFalse((yield self.fifo.r_rdy)) simulation_test(m, process) + + def tx_bits(self, data_bits): + for _ in range(data_bits): + yield from self.tx_period() + yield self.dut.o + + +class AsyncSerialLoopbackSpec(Elaboratable): + def __init__(self, *, divisor, data_bits, parity): + self.rx = AsyncSerialRX(divisor=divisor, data_bits=data_bits, parity=parity) + self.tx = AsyncSerialTX(divisor=divisor, data_bits=data_bits, parity=parity) + self.data_bits = data_bits + self.parity = parity + + def elaborate(self, platform): + m = Module() + m.submodules.rx = rx = self.rx + m.submodules.tx = tx = self.tx + + m.submodules.rx_fifo = rx_fifo = SyncFIFO(width=self.data_bits, depth=1) + m.submodules.tx_fifo = tx_fifo = SyncFIFO(width=self.data_bits, depth=1) + + m.d.comb += [ + # + tx.ack.eq(tx_fifo.r_rdy), + tx_fifo.r_en.eq(tx.rdy), + tx.data.eq(tx_fifo.r_data), + # + rx.i.eq(tx.o), + # + rx.ack.eq(rx_fifo.w_rdy), + rx_fifo.w_en.eq(rx.rdy), + rx_fifo.w_data.eq(rx.data) + ] + + m.domains += ClockDomain("sync") + m.d.comb += ResetSignal("sync").eq(0) + + # Assumptions for TX + fv_tx_data = AnyConst(self.data_bits) + m.d.comb += Assume(Stable(tx.divisor)) + # Set up an FSM for TX such that only 1 data frame is sent per test + with m.FSM() as tx_fsm: + with m.State("TX-1"): + with m.If(tx_fifo.w_rdy): + m.d.comb += [ + tx_fifo.w_en.eq(1), + tx_fifo.w_data.eq(fv_tx_data), + ] + m.next = "TX-2" + with m.State("TX-2"): + with m.If((tx.bitno == 0) & (tx.timer == 0)): + m.next = "DONE" + tx_fsm.state.name = "fv_tx_fsm_state" + + # Assumptions for RX + fv_rx_data = Signal(self.data_bits) + m.d.comb += Assume(Stable(rx.divisor)) + # Set up an FSM for RX such that it expects 1 data frame from TX + with m.FSM() as rx_fsm: + with m.State("RX-1"): + with m.If((rx.bitno == 0) & (rx.timer == 0)): + m.d.sync += rx_fifo.r_en.eq(1) + m.next = "RX-LATCH" + with m.State("RX-LATCH"): + with m.If(rx_fifo.r_rdy): + m.d.sync += fv_rx_data.eq(rx_fifo.r_data) + m.next = "CHECK" + with m.State("CHECK"): + with m.If((fv_rx_data == fv_tx_data) & + (rx.err == 0)): + m.next = "DONE" + rx_fsm.state.name = "fv_rx_fsm_state" + + # Assume initial FSM states + with m.If(Initial()): + m.d.comb += [ + Assume(tx_fsm.ongoing("TX-1")), + Assume(rx_fsm.ongoing("RX-1")) + ] + # Assertions + with m.If(Past(rx_fsm.state) == rx_fsm.encoding["CHECK"]): + m.d.comb += [ + Assert(rx_fsm.ongoing("DONE")), + Assert(tx_fsm.ongoing("DONE")) + ] + + return m + + +class AsyncSerialLoopbackTestCase(FHDLTestCase): + def check_formal(self, *, divisor, data_bits, parity): + depth = (divisor+1) * (data_bits+(3 if parity!="none" else 2)) + 5 + self.assertFormal(AsyncSerialLoopbackSpec(divisor=divisor, data_bits=data_bits, parity=parity), + mode="bmc", depth=depth) + + def check_all(self): + # Commonly-used divisors, from "Embedded Systems Design with 8051 Microcontrollers: Hardware and Software", p5-54, Fig 6.11 + list_divisor = [0x417, 0x30, 0x18, 0xC, 0x6, 0x3, 0x2, 0x1] + list_data_bits = range(5, 9) + list_parity = ["none", "mark", "space", "even", "odd"] + for divisor in list_divisor: + for data_bits in list_data_bits: + for parity in list_parity: + with self.subTest(divisor=divisor, data_bits=data_bits, parity=parity): + self.check_formal(divisor=divisor, data_bits=data_bits, parity=parity) + + def check_all_div6(self): + list_data_bits = range(5, 9) + list_parity = ["none", "mark", "space", "even", "odd"] + for data_bits in list_data_bits: + for parity in list_parity: + with self.subTest(data_bits=data_bits, parity=parity): + self.check_formal(divisor=6, data_bits=data_bits, parity=parity) + + +class AsyncSerialBitstreamSpec(Elaboratable): + def __init__(self, *, divisor, data_bits, parity): + self.rx = AsyncSerialRX(divisor=divisor, data_bits=data_bits, parity=parity) + self.divisor = divisor + self.data_bits = data_bits + self.parity = parity + + def elaborate(self, platform): + m = Module() + m.submodules.rx = rx = self.rx + + len_bitstream = self.data_bits+(3 if self.parity!="none" else 2) + m.submodules.rx_fifo = rx_fifo = SyncFIFO(width=self.data_bits, depth=1) + m.submodules.txbit_fifo = txbit_fifo = SyncFIFO(width=1, depth=len_bitstream+1) + + fv_txfifo_start = Signal() + m.d.txclk += fv_txfifo_start.eq(fv_txfifo_start) + with m.If(fv_txfifo_start): + m.d.comb += rx.i.eq(txbit_fifo.r_data) + with m.Else(): + m.d.comb += rx.i.eq(1) + m.d.comb += [ + # + rx.ack.eq(rx_fifo.w_rdy), + rx_fifo.w_en.eq(rx.rdy), + rx_fifo.w_data.eq(rx.data) + ] + + m.domains += [ + ClockDomain("sync"), + ClockDomain("txclk") + ] + m.d.comb += [ + ResetSignal("sync").eq(0), + ResetSignal("txclk").eq(0) + ] + + # Assumptions for TX + fv_tx_bitstream_val = AnyConst(len_bitstream) + # Assume the bitstream doesn't have 1-bit delay + m.d.comb += Assume(fv_tx_bitstream_val[-1] != 1) + fv_tx_bitstream = Signal(len_bitstream+1) + fv_tx_extra_bit = AnyConst(1) # A const bit representing the extra bit after the bitstream + fv_tx_overflow = AnyConst(1) # A const flag to determine if the rx_fifo is always full + fv_tx_bitno = Signal(range(len(fv_tx_bitstream))) + fv_tx_timer = Signal.like(rx.divisor) + fv_rx_data = Signal(self.data_bits) + # + fv_txfifo_num_bits = Signal(range(len(fv_tx_bitstream))) + with m.FSM(domain="txclk") as txfifo_fsm: + with m.State("WRITE-PREP"): + m.d.txclk += [ + fv_tx_bitstream.eq(Cat(fv_tx_extra_bit, fv_tx_bitstream_val)), + fv_txfifo_num_bits.eq(len(fv_tx_bitstream)), + ] + with m.If(fv_txfifo_num_bits == len(fv_tx_bitstream)): + m.d.comb += txbit_fifo.w_en.eq(1) + m.next = "WRITE-BITSTREAM" + with m.State("WRITE-BITSTREAM"): + m.d.comb += txbit_fifo.w_en.eq(1) + with m.If(fv_txfifo_num_bits != 0): + m.d.comb += txbit_fifo.w_data.eq(fv_tx_bitstream[-1]) + m.d.txclk += [ + fv_tx_bitstream.eq(Cat(0, fv_tx_bitstream[:-1])), + fv_txfifo_num_bits.eq(fv_txfifo_num_bits - 1) + ] + with m.Else(): + m.next = "DONE" + txfifo_fsm.state.name = "fv_txfifo_fsm_state" + # + # Assume a variable timer initial value for bit 0 + fv_tx_bit0_divisor = AnyConst(range(self.divisor)) + m.d.comb += Assume(fv_tx_bit0_divisor > (self.divisor >> 1)) + fv_tx_done = Signal() + m.d.txclk += fv_tx_done.eq(fv_tx_done) + with m.FSM(domain="txclk") as tx_fsm: + with m.State("TX-PREP"): + with m.If(txbit_fifo.r_rdy): + m.d.txclk += txbit_fifo.r_en.eq(1) + with m.If(txbit_fifo.r_en): + m.d.txclk += [ + txbit_fifo.r_en.eq(0), + fv_tx_bitno.eq(len(fv_tx_bitstream) - 1), + fv_tx_timer.eq(fv_tx_bit0_divisor), + fv_txfifo_start.eq(1) + ] + m.next = "TX-SENDBIT" + with m.State("TX-SENDBIT"): + m.d.txclk += txbit_fifo.r_en.eq(0) + with m.If(fv_tx_timer != 0): + m.d.txclk += fv_tx_timer.eq(fv_tx_timer - 1) + with m.If((fv_tx_timer == 1) & (fv_tx_bitno != 0)): + m.d.txclk += txbit_fifo.r_en.eq(1) + with m.Else(): + m.d.txclk += [ + fv_tx_bitno.eq(fv_tx_bitno - 1), + fv_tx_timer.eq(self.divisor), + ] + with m.If(fv_tx_bitno == 0): + m.d.txclk += fv_tx_done.eq(1) + m.next = "DONE" + tx_fsm.state.name = "fv_tx_fsm_state" + + # Assumptions for RX + m.d.comb += Assume(Stable(rx.divisor)) + # + with m.If(fv_tx_overflow): + m.d.comb += Assume(rx_fifo.w_rdy == 0) + with m.Else(): + m.d.comb += [ + rx_fifo.w_en.eq(rx.rdy), + rx_fifo.w_data.eq(rx.data) + ] + # + with m.FSM() as rx_fsm: + with m.State("RX-1"): + with m.If((rx.bitno == 0) & (rx.timer == 0)): + m.d.sync += rx_fifo.r_en.eq(1) + m.next = "RX-LATCH" + with m.State("RX-LATCH"): + with m.If(rx_fifo.r_rdy): + m.d.sync += fv_rx_data.eq(rx_fifo.r_data) + m.next = "CHECK" + with m.State("CHECK"): + with m.If((rx.err != 0)): + m.next = "ERROR" + with m.Else(): + m.next = "DONE" + rx_fsm.state.name = "fv_rx_fsm_state" + + # Assume initial FSM states + with m.If(Initial()): + m.d.comb += [ + Assume(txfifo_fsm.ongoing("WRITE-PREP")), + Assume(tx_fsm.ongoing("TX-PREP")), + Assume(fv_txfifo_start == 0), + Assume(fv_tx_done == 0), + Assume(rx_fsm.ongoing("RX-1")) + ] + + # Assertions + with m.If(Past(rx_fsm.state) == rx_fsm.encoding["CHECK"]): + m.d.comb += Assert(rx_fsm.ongoing("DONE") | rx_fsm.ongoing("ERROR")) + + with m.If(rx_fsm.ongoing("DONE")): + m.d.comb += Assert((fv_tx_bitstream_val[-1] == 0) & (fv_tx_bitstream_val[0] == 1)) + if self.parity == "mark": + m.d.comb += Assert((fv_tx_bitstream_val[1] == 1)) + elif self.parity == "space": + m.d.comb += Assert((fv_tx_bitstream_val[1] == 0)) + elif self.parity == "even": + m.d.comb += Assert((fv_tx_bitstream_val[1] == fv_rx_data.xor())) + elif self.parity == "odd": + m.d.comb += Assert((fv_tx_bitstream_val[1] == ~fv_rx_data.xor())) + m.d.comb += Assert(~fv_tx_overflow) + + with m.Elif(rx_fsm.ongoing("ERROR")): + with m.If(rx.err.frame): + m.d.comb += Assert((fv_tx_bitstream_val[-1] != 0) | (fv_tx_bitstream_val[0] != 1)) + if self.parity == "none": + m.d.comb += Assert(~rx.err.parity) + else: + with m.If(rx.err.parity): + if self.parity == "mark": + m.d.comb += Assert((fv_tx_bitstream_val[1] != 1)) + elif self.parity == "space": + m.d.comb += Assert((fv_tx_bitstream_val[1] != 0)) + elif self.parity == "even": + m.d.comb += Assert((fv_tx_bitstream_val[1] != fv_rx_data.xor())) + elif self.parity == "odd": + m.d.comb += Assert((fv_tx_bitstream_val[1] != ~fv_rx_data.xor())) + with m.If(rx.err.overflow): + m.d.comb += Assert(fv_tx_overflow) + + with m.If(~fv_tx_extra_bit & fv_tx_done): + m.d.comb += [ + Assert(rx.shreg.stop == 0), + Assert(rx.busy == 1) + ] + with m.Elif(fv_tx_extra_bit & fv_tx_done): + m.d.comb += [ + Assert(rx.i == 1), + Assert(rx.busy == 0) + ] + + return m + + +class AsyncSerialBitstreamTestCase(FHDLTestCase): + def check_formal(self, *, divisor, data_bits, parity): + depth = (divisor+2) * (data_bits+(3 if parity!="none" else 2)+1) + 5 + self.assertFormal(AsyncSerialBitstreamSpec(divisor=divisor, data_bits=data_bits, parity=parity), + mode="bmc", depth=depth) + + def check_all(self): + # Commonly-used divisors, from "Embedded Systems Design with 8051 Microcontrollers: Hardware and Software", p5-54, Fig 6.11 + list_divisor = [0x417, 0x30, 0x18, 0xC, 0x6, 0x3, 0x2, 0x1] + list_data_bits = range(5, 9) + list_parity = ["none", "mark", "space", "even", "odd"] + for divisor in list_divisor: + for data_bits in list_data_bits: + for parity in list_parity: + with self.subTest(divisor=divisor, data_bits=data_bits, parity=parity): + self.check_formal(divisor=divisor, data_bits=data_bits, parity=parity) + + def check_all_div6(self): + list_data_bits = range(5, 9) + list_parity = ["none", "mark", "space", "even", "odd"] + for data_bits in list_data_bits: + for parity in list_parity: + with self.subTest(data_bits=data_bits, parity=parity): + self.check_formal(divisor=6, data_bits=data_bits, parity=parity) From 8732bc2c68bf26595eb194df95dacd06ee088c45 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Tue, 3 Dec 2019 16:22:30 +0800 Subject: [PATCH 04/12] =?UTF-8?q?serial:=20MultiReg=20=E2=86=92=20FFSynchr?= =?UTF-8?q?onizer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- nmigen_stdio/serial.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index d59082b..219cff4 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -1,5 +1,5 @@ from nmigen import * -from nmigen.lib.cdc import MultiReg +from nmigen.lib.cdc import FFSynchronizer from nmigen.utils import bits_for @@ -74,7 +74,7 @@ def elaborate(self, platform): done_once = self.done_once if self._pins is not None: - m.d.submodules += MultiReg(self._pins.rx.i, self.i, reset=1) + m.submodules += FFSynchronizer(self._pins.rx.i, self.i, reset=1) with m.FSM() as fsm: with m.State("IDLE"): From b6d930fe29f66107b24fe4a331ebf03e5e2568d9 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Fri, 20 Dec 2019 13:59:51 +0800 Subject: [PATCH 05/12] serial: remove shreg_sent, fix AsyncSerial init params --- nmigen_stdio/serial.py | 21 +++++++-------------- nmigen_stdio/test/test_serial.py | 7 ++++++- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index 219cff4..bd7f6a2 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -161,7 +161,7 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.timer = Signal.like(self.divisor) self.shreg = Record(_wire_layout(len(self.data), self._parity)) - self.bitno = Signal(range(len(self.shreg))) + self.bitno = Signal(range(len(self.shreg)+1)) def elaborate(self, platform): @@ -171,8 +171,6 @@ def elaborate(self, platform): shreg = self.shreg bitno = self.bitno - shreg_sent = Signal.like(shreg) - if self._pins is not None: m.d.comb += self._pins.tx.o.eq(self.o) @@ -185,24 +183,19 @@ def elaborate(self, platform): shreg.data .eq(self.data), shreg.parity.eq(_compute_parity_bit(self.data, self._parity)), shreg.stop .eq(1), - self.o.eq(shreg.start), - bitno.eq(len(shreg) - 1), - timer.eq(self.divisor), + bitno.eq(len(self.shreg)), + timer.eq(0), self.busy.eq(1) ] m.next = "BUSY" with m.State("BUSY"): - # Copy shreg.data, shreg.parity, shreg.stop to shreg_sent - with m.If((bitno == len(shreg) - 1) & - (timer == self.divisor)): - m.d.sync += shreg_sent.eq(Cat(0, shreg[:-1])) with m.If(timer != 0): m.d.sync += timer.eq(timer - 1) with m.Else(): m.d.sync += [ - self.o.eq(shreg_sent[-1]), - shreg_sent.eq(Cat(0, shreg_sent[:-1])), + self.o.eq(shreg[-1]), + shreg.eq(Cat(0, shreg[:-1])), bitno.eq(bitno - 1), timer.eq(self.divisor) ] @@ -217,8 +210,8 @@ class AsyncSerial(Elaboratable): def __init__(self, *, divisor, divisor_bits=None, **kwargs): self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) - self.rx = AsyncSerialRX(**kwargs) - self.tx = AsyncSerialTX(**kwargs) + self.rx = AsyncSerialRX(divisor=divisor, divisor_bits=divisor_bits, **kwargs) + self.tx = AsyncSerialTX(divisor=divisor, divisor_bits=divisor_bits, **kwargs) def elaborate(self, platform): m = Module() diff --git a/nmigen_stdio/test/test_serial.py b/nmigen_stdio/test/test_serial.py index 7b02445..9394522 100644 --- a/nmigen_stdio/test/test_serial.py +++ b/nmigen_stdio/test/test_serial.py @@ -189,7 +189,8 @@ def elaborate(self, platform): Assume(rx_fsm.ongoing("RX-1")) ] # Assertions - with m.If(Past(rx_fsm.state) == rx_fsm.encoding["CHECK"]): + with m.If((Past(rx_fsm.state) == rx_fsm.encoding["CHECK"]) & + ~Stable(rx_fsm.state)): m.d.comb += [ Assert(rx_fsm.ongoing("DONE")), Assert(tx_fsm.ongoing("DONE")) @@ -379,6 +380,10 @@ def elaborate(self, platform): elif self.parity == "odd": m.d.comb += Assert((fv_tx_bitstream_val[1] == ~fv_rx_data.xor())) m.d.comb += Assert(~fv_tx_overflow) + if self.parity == "none": + m.d.comb += Assert(fv_rx_data == fv_tx_bitstream_val[1:-1]) + else: + m.d.comb += Assert(fv_rx_data == fv_tx_bitstream_val[2:-1]) with m.Elif(rx_fsm.ongoing("ERROR")): with m.If(rx.err.frame): From 8e91edc09031889e1033a69ea097f8a9f882f49a Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Mon, 23 Dec 2019 10:30:06 +0800 Subject: [PATCH 06/12] serial: fix TX logic, improve unit tests --- nmigen_stdio/serial.py | 130 ++++++++++++-------- nmigen_stdio/test/test_serial.py | 203 +++++++++++++++++-------------- 2 files changed, 195 insertions(+), 138 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index bd7f6a2..66b5b60 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -29,14 +29,16 @@ def _compute_parity_bit(data, parity): def _wire_layout(data_bits, parity="none"): return [ - ("stop", 1), - ("parity", 0 if parity == "none" else 1), - ("data", data_bits), ("start", 1), + ("data", data_bits), + ("parity", 0 if parity == "none" else 1), + ("stop", 1) ] class AsyncSerialRX(Elaboratable): + """An UART receiver module. Receives the LSB first and MSB last. + """ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None): _check_parity(parity) self._parity = parity @@ -49,11 +51,12 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi ("frame", 1), ("parity", 1), ]) - # rx.rdy: Internally driven, to indicate if received data is ready to be read - self.rdy = Signal() # rx.ack: Externally driven, to indicate if transfer of received data is enabled self.ack = Signal() + # rx.busy: Internally driven, to indicate if there is an ongoing data transfer self.busy = Signal() + # rx.r_rdy: Internally driven, to indicate if received data is ready to be read + self.r_rdy = Signal() self.i = Signal(reset=1) @@ -61,8 +64,8 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.timer = Signal.like(self.divisor) self.shreg = Record(_wire_layout(len(self.data), self._parity)) - self.bitno = Signal(range(len(self.shreg))) - self.done_once = Signal() + self.bits_left = Signal(range(len(self.shreg) + 1)) + self.done = Signal() def elaborate(self, platform): @@ -70,8 +73,8 @@ def elaborate(self, platform): timer = self.timer shreg = self.shreg - bitno = self.bitno - done_once = self.done_once + bits_left = self.bits_left + done = self.done if self._pins is not None: m.submodules += FFSynchronizer(self._pins.rx.i, self.i, reset=1) @@ -80,27 +83,26 @@ def elaborate(self, platform): with m.State("IDLE"): with m.If(~self.i): m.d.sync += [ - bitno.eq(len(shreg) - 1), - timer.eq(self.divisor >> 1), - self.busy.eq(1) + bits_left.eq(len(shreg)), + timer.eq(self.divisor >> 1) ] m.next = "BUSY" with m.State("BUSY"): with m.If(timer != 0): m.d.sync += timer.eq(timer - 1) - with m.If(((timer == 0) & (bitno != 0)) | ((timer == 1) & (bitno == 0))): - m.d.sync += shreg.eq(Cat(self.i, shreg)) - with m.If(bitno != 0): - m.d.sync += [ - bitno.eq(bitno - 1), - timer.eq(self.divisor) - ] + with m.If(((timer == 0) & (bits_left != 1)) | ((timer == 1) & (bits_left == 1))): + m.d.sync += [ + shreg.eq(Cat(shreg[1:], self.i)), + bits_left.eq(bits_left - 1) + ] + with m.If(bits_left != 1): + m.d.sync += timer.eq(self.divisor) with m.Else(): m.next = "DONE" with m.State("DONE"): - with m.If(~done_once & (timer == 0)): + with m.If(~done & (timer == 0)): with m.If(self.ack): m.d.sync += [ self.data.eq(shreg.data), @@ -109,39 +111,41 @@ def elaborate(self, platform): _compute_parity_bit(shreg.data, self._parity))) ] m.d.sync += [ - self.err.overflow.eq(~self.ack), - self.busy.eq(0) + self.err.overflow.eq(~self.ack) ] # At second clock edge for DONE, - # bitno remains -1, while timer is set as divisor + # timer is set as divisor m.d.sync += [ timer.eq(self.divisor), - done_once.eq(1) + done.eq(1) ] # Continue the timer to receive one more bit # Set back to IDLE if it is still stop bit with m.If(timer != 0): m.d.sync += timer.eq(timer - 1) - with m.Elif(done_once): - m.d.sync += done_once.eq(0) + with m.Elif(done): + m.d.sync += done.eq(0) with m.If(~self.i): m.d.sync += [ - shreg.eq(Cat(self.i, shreg)), - bitno.eq(len(shreg) - 1), - timer.eq(self.divisor), - self.busy.eq(1) + shreg.eq(Cat(shreg[1:], self.i)), + bits_left.eq(len(shreg)), + timer.eq(self.divisor) ] m.next = "BUSY" with m.Else(): m.next = "IDLE" - with m.If(self.ack): - m.d.sync += self.rdy.eq(fsm.ongoing("DONE")) + m.d.comb += [ + self.r_rdy.eq(fsm.ongoing("DONE") & done), + self.busy.eq(fsm.ongoing("BUSY")) + ] return m class AsyncSerialTX(Elaboratable): + """An UART transmitter module. Transmits the LSB first and MSB last. + """ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None): _check_parity(parity) self._parity = parity @@ -149,11 +153,12 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) self.data = Signal(data_bits) - # rx.rdy: Internally driven, to indicate if data transmission can start - self.rdy = Signal() - # rx.ack: Externally driven, to indicate if transfer of data to transmit is enabled + # tx.ack: Externally driven, to indicate if transfer of data to transmit is enabled self.ack = Signal() + # tx.busy: Internally driven, to indicate if there is an ongoing data transfer self.busy = Signal() + # tx.w_done: Internally driven, to indicate if data has been transmitted + self.w_done = Signal() self.o = Signal(reset=1) @@ -161,7 +166,7 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.timer = Signal.like(self.divisor) self.shreg = Record(_wire_layout(len(self.data), self._parity)) - self.bitno = Signal(range(len(self.shreg)+1)) + self.bits_left = Signal(range(len(self.shreg) + 1)) def elaborate(self, platform): @@ -169,39 +174,68 @@ def elaborate(self, platform): timer = self.timer shreg = self.shreg - bitno = self.bitno + bits_left = self.bits_left if self._pins is not None: m.d.comb += self._pins.tx.o.eq(self.o) with m.FSM() as fsm: with m.State("IDLE"): - m.d.comb += self.rdy.eq(1) + m.d.sync += self.o.eq(1) with m.If(self.ack): m.d.sync += [ + self.o.eq(0), # Issue start bit ASAP shreg.start .eq(0), shreg.data .eq(self.data), shreg.parity.eq(_compute_parity_bit(self.data, self._parity)), shreg.stop .eq(1), - bitno.eq(len(self.shreg)), - timer.eq(0), - self.busy.eq(1) + bits_left.eq(len(self.shreg)), + timer.eq(self.divisor) ] m.next = "BUSY" with m.State("BUSY"): with m.If(timer != 0): m.d.sync += timer.eq(timer - 1) - with m.Else(): + with m.If(((timer == 0) & (bits_left != 1)) | ((timer == 1) & (bits_left == 1))): + m.d.sync += bits_left.eq(bits_left - 1), + with m.If(bits_left == len(self.shreg)): + m.d.sync += [ + self.o.eq(shreg[1]), # Start bit has already been issued + shreg.eq(Cat(shreg[2:], 0, 0)) # Skip it during transfer + ] + with m.Elif(bits_left != 1): + m.d.sync += [ + self.o.eq(shreg[0]), + shreg.eq(Cat(shreg[1:], 0)), + ] + with m.If(bits_left != 1): + m.d.sync += timer.eq(self.divisor) + with m.Else(): + m.next = "DONE" + + with m.State("DONE"): + # Accept next sequence if ACK is still high + with m.If(self.ack): m.d.sync += [ - self.o.eq(shreg[-1]), - shreg.eq(Cat(0, shreg[:-1])), - bitno.eq(bitno - 1), + self.o.eq(0), # Issue start bit ASAP + shreg.start .eq(0), + shreg.data .eq(self.data), + shreg.parity.eq(_compute_parity_bit(self.data, self._parity)), + shreg.stop .eq(1), + bits_left.eq(len(self.shreg)), timer.eq(self.divisor) ] - with m.If(bitno == 0): - m.d.sync += self.busy.eq(0) - m.next = "IDLE" + m.next = "BUSY" + # Set back to IDLE if ACK is low + with m.Else(): + m.d.sync += self.o.eq(1) + m.next = "IDLE" + + m.d.comb += [ + self.w_done.eq(fsm.ongoing("DONE")), + self.busy.eq(fsm.ongoing("BUSY")) + ] return m diff --git a/nmigen_stdio/test/test_serial.py b/nmigen_stdio/test/test_serial.py index 9394522..8f5e681 100644 --- a/nmigen_stdio/test/test_serial.py +++ b/nmigen_stdio/test/test_serial.py @@ -96,18 +96,45 @@ def tx_period(self): for _ in range((yield self.dut.divisor) + 1): yield - def tx_dummy_test(self): - self.dut = AsyncSerialTX(divisor=7) - m = Module() - m.submodules.tx = self.dut + def tx_test(self, *, data): def process(): yield self.dut.ack.eq(1) - yield self.dut.data.eq(0xAA) + yield self.dut.data.eq(data) + yield + yield self.dut.ack.eq(0) for _ in range(10): yield from self.tx_period() - yield - yield - simulation_test(m, process) + yield from self.tx_period() # Check 1 more period + simulation_test(self.dut, process) + + def test_8n1(self): + self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="none") + self.tx_test(data=0x10101010) + + def test_8n1_100e6_112500(self): + div = round(100e6/112500)-1 + self.dut = AsyncSerialTX(divisor=div, data_bits=8, parity="none") + self.tx_test(data=32) + + def test_16n1(self): + self.dut = AsyncSerialTX(divisor=7, data_bits=16, parity="none") + self.tx_test(data=0x0101011011110100) + + def test_8m1(self): + self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="mark") + self.tx_test(data=0x00111100) + + def test_8s1(self): + self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="space") + self.tx_test(data=0x11011010) + + def test_8e1(self): + self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="even") + self.tx_test(data=0x00101110) + + def test_8o1(self): + self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="odd") + self.tx_test(data=0x01110011) def tx_bits(self, data_bits): for _ in range(data_bits): @@ -133,13 +160,13 @@ def elaborate(self, platform): m.d.comb += [ # tx.ack.eq(tx_fifo.r_rdy), - tx_fifo.r_en.eq(tx.rdy), + tx_fifo.r_en.eq(~tx.busy), tx.data.eq(tx_fifo.r_data), # rx.i.eq(tx.o), # rx.ack.eq(rx_fifo.w_rdy), - rx_fifo.w_en.eq(rx.rdy), + rx_fifo.w_en.eq(rx.r_rdy), rx_fifo.w_data.eq(rx.data) ] @@ -159,7 +186,7 @@ def elaborate(self, platform): ] m.next = "TX-2" with m.State("TX-2"): - with m.If((tx.bitno == 0) & (tx.timer == 0)): + with m.If((tx.bits_left == 0) & (tx.timer == 0)): m.next = "DONE" tx_fsm.state.name = "fv_tx_fsm_state" @@ -169,12 +196,15 @@ def elaborate(self, platform): # Set up an FSM for RX such that it expects 1 data frame from TX with m.FSM() as rx_fsm: with m.State("RX-1"): - with m.If((rx.bitno == 0) & (rx.timer == 0)): + with m.If(~rx_fifo.w_rdy): m.d.sync += rx_fifo.r_en.eq(1) m.next = "RX-LATCH" with m.State("RX-LATCH"): with m.If(rx_fifo.r_rdy): - m.d.sync += fv_rx_data.eq(rx_fifo.r_data) + m.d.sync += [ + fv_rx_data.eq(rx_fifo.r_data), + rx_fifo.r_en.eq(0) + ] m.next = "CHECK" with m.State("CHECK"): with m.If((fv_rx_data == fv_tx_data) & @@ -195,34 +225,34 @@ def elaborate(self, platform): Assert(rx_fsm.ongoing("DONE")), Assert(tx_fsm.ongoing("DONE")) ] + ## RX r_rdy + with m.If(Past(rx.busy, 2) & ~Stable(rx.busy, 1)): + m.d.comb += Assert(rx.r_rdy) + with m.If(Stable(rx.r_rdy) & rx.r_rdy): + m.d.comb += Assert(Stable(rx.data) & + Stable(rx.err.overflow) & + Stable(rx.err.frame) & + Stable(rx.err.parity)) + ## TX w_done + with m.If(Past(tx.busy) & ~Stable(tx.busy)): + m.d.comb += Assert(tx.w_done) return m class AsyncSerialLoopbackTestCase(FHDLTestCase): def check_formal(self, *, divisor, data_bits, parity): - depth = (divisor+1) * (data_bits+(3 if parity!="none" else 2)) + 5 + depth = (divisor+1) * (data_bits+(3 if parity!="none" else 2) + 2) self.assertFormal(AsyncSerialLoopbackSpec(divisor=divisor, data_bits=data_bits, parity=parity), mode="bmc", depth=depth) - def check_all(self): - # Commonly-used divisors, from "Embedded Systems Design with 8051 Microcontrollers: Hardware and Software", p5-54, Fig 6.11 - list_divisor = [0x417, 0x30, 0x18, 0xC, 0x6, 0x3, 0x2, 0x1] - list_data_bits = range(5, 9) - list_parity = ["none", "mark", "space", "even", "odd"] - for divisor in list_divisor: - for data_bits in list_data_bits: - for parity in list_parity: - with self.subTest(divisor=divisor, data_bits=data_bits, parity=parity): - self.check_formal(divisor=divisor, data_bits=data_bits, parity=parity) - - def check_all_div6(self): + def test_all_div7(self): list_data_bits = range(5, 9) list_parity = ["none", "mark", "space", "even", "odd"] for data_bits in list_data_bits: for parity in list_parity: with self.subTest(data_bits=data_bits, parity=parity): - self.check_formal(divisor=6, data_bits=data_bits, parity=parity) + self.check_formal(divisor=7, data_bits=data_bits, parity=parity) class AsyncSerialBitstreamSpec(Elaboratable): @@ -238,18 +268,17 @@ def elaborate(self, platform): len_bitstream = self.data_bits+(3 if self.parity!="none" else 2) m.submodules.rx_fifo = rx_fifo = SyncFIFO(width=self.data_bits, depth=1) - m.submodules.txbit_fifo = txbit_fifo = SyncFIFO(width=1, depth=len_bitstream+1) + m.submodules.txbit_fifo = txbit_fifo = SyncFIFO(width=1, depth=len_bitstream+2) fv_txfifo_start = Signal() - m.d.txclk += fv_txfifo_start.eq(fv_txfifo_start) with m.If(fv_txfifo_start): - m.d.comb += rx.i.eq(txbit_fifo.r_data) + m.d.sync += rx.i.eq(txbit_fifo.r_data) with m.Else(): - m.d.comb += rx.i.eq(1) + m.d.sync += rx.i.eq(1) m.d.comb += [ # rx.ack.eq(rx_fifo.w_rdy), - rx_fifo.w_en.eq(rx.rdy), + rx_fifo.w_en.eq(rx.r_rdy), rx_fifo.w_data.eq(rx.data) ] @@ -265,51 +294,43 @@ def elaborate(self, platform): # Assumptions for TX fv_tx_bitstream_val = AnyConst(len_bitstream) # Assume the bitstream doesn't have 1-bit delay - m.d.comb += Assume(fv_tx_bitstream_val[-1] != 1) + m.d.comb += Assume(fv_tx_bitstream_val[0] != 1) fv_tx_bitstream = Signal(len_bitstream+1) fv_tx_extra_bit = AnyConst(1) # A const bit representing the extra bit after the bitstream fv_tx_overflow = AnyConst(1) # A const flag to determine if the rx_fifo is always full - fv_tx_bitno = Signal(range(len(fv_tx_bitstream))) + fv_tx_bitno = Signal(range(len(fv_tx_bitstream)+1)) fv_tx_timer = Signal.like(rx.divisor) fv_rx_data = Signal(self.data_bits) # - fv_txfifo_num_bits = Signal(range(len(fv_tx_bitstream))) + fv_txfifo_num_bits = Signal(range(len(fv_tx_bitstream)+2)) with m.FSM(domain="txclk") as txfifo_fsm: with m.State("WRITE-PREP"): m.d.txclk += [ - fv_tx_bitstream.eq(Cat(fv_tx_extra_bit, fv_tx_bitstream_val)), - fv_txfifo_num_bits.eq(len(fv_tx_bitstream)), + fv_tx_bitstream.eq(Cat(fv_tx_bitstream_val, fv_tx_extra_bit)), + fv_txfifo_num_bits.eq(0), + txbit_fifo.w_en.eq(1) ] - with m.If(fv_txfifo_num_bits == len(fv_tx_bitstream)): - m.d.comb += txbit_fifo.w_en.eq(1) - m.next = "WRITE-BITSTREAM" + m.next = "WRITE-BITSTREAM" with m.State("WRITE-BITSTREAM"): - m.d.comb += txbit_fifo.w_en.eq(1) - with m.If(fv_txfifo_num_bits != 0): - m.d.comb += txbit_fifo.w_data.eq(fv_tx_bitstream[-1]) + with m.If(fv_txfifo_num_bits != len(fv_tx_bitstream)+1): m.d.txclk += [ - fv_tx_bitstream.eq(Cat(0, fv_tx_bitstream[:-1])), - fv_txfifo_num_bits.eq(fv_txfifo_num_bits - 1) + txbit_fifo.w_data.eq(fv_tx_bitstream[0]), + fv_tx_bitstream.eq(Cat(fv_tx_bitstream[1:], 0)), + fv_txfifo_num_bits.eq(fv_txfifo_num_bits + 1) ] with m.Else(): + m.d.txclk += txbit_fifo.w_en.eq(0) m.next = "DONE" txfifo_fsm.state.name = "fv_txfifo_fsm_state" # - # Assume a variable timer initial value for bit 0 - fv_tx_bit0_divisor = AnyConst(range(self.divisor)) - m.d.comb += Assume(fv_tx_bit0_divisor > (self.divisor >> 1)) - fv_tx_done = Signal() - m.d.txclk += fv_tx_done.eq(fv_tx_done) + fv_tx_extra_done = Signal() with m.FSM(domain="txclk") as tx_fsm: with m.State("TX-PREP"): + m.d.txclk += txbit_fifo.r_en.eq(0) with m.If(txbit_fifo.r_rdy): - m.d.txclk += txbit_fifo.r_en.eq(1) - with m.If(txbit_fifo.r_en): m.d.txclk += [ - txbit_fifo.r_en.eq(0), - fv_tx_bitno.eq(len(fv_tx_bitstream) - 1), - fv_tx_timer.eq(fv_tx_bit0_divisor), - fv_txfifo_start.eq(1) + fv_tx_bitno.eq(len(fv_tx_bitstream)), + fv_tx_timer.eq(1) ] m.next = "TX-SENDBIT" with m.State("TX-SENDBIT"): @@ -318,13 +339,15 @@ def elaborate(self, platform): m.d.txclk += fv_tx_timer.eq(fv_tx_timer - 1) with m.If((fv_tx_timer == 1) & (fv_tx_bitno != 0)): m.d.txclk += txbit_fifo.r_en.eq(1) + with m.If(fv_tx_bitno == len(fv_tx_bitstream)): + m.d.txclk += fv_txfifo_start.eq(1) with m.Else(): m.d.txclk += [ fv_tx_bitno.eq(fv_tx_bitno - 1), fv_tx_timer.eq(self.divisor), ] with m.If(fv_tx_bitno == 0): - m.d.txclk += fv_tx_done.eq(1) + m.d.txclk += fv_tx_extra_done.eq(1) m.next = "DONE" tx_fsm.state.name = "fv_tx_fsm_state" @@ -335,18 +358,21 @@ def elaborate(self, platform): m.d.comb += Assume(rx_fifo.w_rdy == 0) with m.Else(): m.d.comb += [ - rx_fifo.w_en.eq(rx.rdy), + rx_fifo.w_en.eq(rx.r_rdy), rx_fifo.w_data.eq(rx.data) ] # with m.FSM() as rx_fsm: with m.State("RX-1"): - with m.If((rx.bitno == 0) & (rx.timer == 0)): + with m.If(~rx_fifo.w_rdy): m.d.sync += rx_fifo.r_en.eq(1) m.next = "RX-LATCH" with m.State("RX-LATCH"): with m.If(rx_fifo.r_rdy): - m.d.sync += fv_rx_data.eq(rx_fifo.r_data) + m.d.sync += [ + fv_rx_data.eq(rx_fifo.r_data), + rx_fifo.r_en.eq(0) + ] m.next = "CHECK" with m.State("CHECK"): with m.If((rx.err != 0)): @@ -361,54 +387,62 @@ def elaborate(self, platform): Assume(txfifo_fsm.ongoing("WRITE-PREP")), Assume(tx_fsm.ongoing("TX-PREP")), Assume(fv_txfifo_start == 0), - Assume(fv_tx_done == 0), + Assume(fv_tx_extra_done == 0), Assume(rx_fsm.ongoing("RX-1")) ] # Assertions with m.If(Past(rx_fsm.state) == rx_fsm.encoding["CHECK"]): m.d.comb += Assert(rx_fsm.ongoing("DONE") | rx_fsm.ongoing("ERROR")) + ## RX r_rdy + with m.If(Past(rx.busy, 2) & ~Stable(rx.busy, 1)): + m.d.comb += Assert(rx.r_rdy) + with m.If(Stable(rx.r_rdy) & rx.r_rdy): + m.d.comb += Assert(Stable(rx.data) & + Stable(rx.err.overflow) & + Stable(rx.err.frame) & + Stable(rx.err.parity)) with m.If(rx_fsm.ongoing("DONE")): - m.d.comb += Assert((fv_tx_bitstream_val[-1] == 0) & (fv_tx_bitstream_val[0] == 1)) + m.d.comb += Assert((fv_tx_bitstream_val[0] == 0) & (fv_tx_bitstream_val[-1] == 1)) if self.parity == "mark": - m.d.comb += Assert((fv_tx_bitstream_val[1] == 1)) + m.d.comb += Assert((fv_tx_bitstream_val[-2] == 1)) elif self.parity == "space": - m.d.comb += Assert((fv_tx_bitstream_val[1] == 0)) + m.d.comb += Assert((fv_tx_bitstream_val[-2] == 0)) elif self.parity == "even": - m.d.comb += Assert((fv_tx_bitstream_val[1] == fv_rx_data.xor())) + m.d.comb += Assert((fv_tx_bitstream_val[-2] == fv_rx_data.xor())) elif self.parity == "odd": - m.d.comb += Assert((fv_tx_bitstream_val[1] == ~fv_rx_data.xor())) + m.d.comb += Assert((fv_tx_bitstream_val[-2] == ~fv_rx_data.xor())) m.d.comb += Assert(~fv_tx_overflow) if self.parity == "none": m.d.comb += Assert(fv_rx_data == fv_tx_bitstream_val[1:-1]) else: - m.d.comb += Assert(fv_rx_data == fv_tx_bitstream_val[2:-1]) + m.d.comb += Assert(fv_rx_data == fv_tx_bitstream_val[1:-2]) with m.Elif(rx_fsm.ongoing("ERROR")): with m.If(rx.err.frame): - m.d.comb += Assert((fv_tx_bitstream_val[-1] != 0) | (fv_tx_bitstream_val[0] != 1)) + m.d.comb += Assert((fv_tx_bitstream_val[0] != 0) | (fv_tx_bitstream_val[-1] != 1)) if self.parity == "none": m.d.comb += Assert(~rx.err.parity) else: with m.If(rx.err.parity): if self.parity == "mark": - m.d.comb += Assert((fv_tx_bitstream_val[1] != 1)) + m.d.comb += Assert((fv_tx_bitstream_val[-2] != 1)) elif self.parity == "space": - m.d.comb += Assert((fv_tx_bitstream_val[1] != 0)) + m.d.comb += Assert((fv_tx_bitstream_val[-2] != 0)) elif self.parity == "even": - m.d.comb += Assert((fv_tx_bitstream_val[1] != fv_rx_data.xor())) + m.d.comb += Assert((fv_tx_bitstream_val[-2] != fv_rx_data.xor())) elif self.parity == "odd": - m.d.comb += Assert((fv_tx_bitstream_val[1] != ~fv_rx_data.xor())) + m.d.comb += Assert((fv_tx_bitstream_val[-2] != ~fv_rx_data.xor())) with m.If(rx.err.overflow): m.d.comb += Assert(fv_tx_overflow) - with m.If(~fv_tx_extra_bit & fv_tx_done): + with m.If(~fv_tx_extra_bit & fv_tx_extra_done): m.d.comb += [ - Assert(rx.shreg.stop == 0), - Assert(rx.busy == 1) + Assert(rx.i == 0), + Assert(rx.busy == 1) # BUSY to read the next sequence ] - with m.Elif(fv_tx_extra_bit & fv_tx_done): + with m.Elif(fv_tx_extra_bit & fv_tx_extra_done): m.d.comb += [ Assert(rx.i == 1), Assert(rx.busy == 0) @@ -419,25 +453,14 @@ def elaborate(self, platform): class AsyncSerialBitstreamTestCase(FHDLTestCase): def check_formal(self, *, divisor, data_bits, parity): - depth = (divisor+2) * (data_bits+(3 if parity!="none" else 2)+1) + 5 + depth = (divisor+1) * (data_bits+(3 if parity!="none" else 2) + 2) self.assertFormal(AsyncSerialBitstreamSpec(divisor=divisor, data_bits=data_bits, parity=parity), mode="bmc", depth=depth) - def check_all(self): - # Commonly-used divisors, from "Embedded Systems Design with 8051 Microcontrollers: Hardware and Software", p5-54, Fig 6.11 - list_divisor = [0x417, 0x30, 0x18, 0xC, 0x6, 0x3, 0x2, 0x1] - list_data_bits = range(5, 9) - list_parity = ["none", "mark", "space", "even", "odd"] - for divisor in list_divisor: - for data_bits in list_data_bits: - for parity in list_parity: - with self.subTest(divisor=divisor, data_bits=data_bits, parity=parity): - self.check_formal(divisor=divisor, data_bits=data_bits, parity=parity) - - def check_all_div6(self): + def test_all_div7(self): list_data_bits = range(5, 9) list_parity = ["none", "mark", "space", "even", "odd"] for data_bits in list_data_bits: for parity in list_parity: with self.subTest(data_bits=data_bits, parity=parity): - self.check_formal(divisor=6, data_bits=data_bits, parity=parity) + self.check_formal(divisor=7, data_bits=data_bits, parity=parity) From d973f00252c7c019919f69beb29dc1248478000d Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Fri, 3 Jan 2020 14:18:28 +0800 Subject: [PATCH 07/12] serial: fix styling --- nmigen_stdio/serial.py | 8 ++------ nmigen_stdio/test/test_serial.py | 34 ++++++++++++++++++++------------ 2 files changed, 23 insertions(+), 19 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index 66b5b60..f340f91 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -59,7 +59,6 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.r_rdy = Signal() self.i = Signal(reset=1) - self._pins = pins self.timer = Signal.like(self.divisor) @@ -67,7 +66,6 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.bits_left = Signal(range(len(self.shreg) + 1)) self.done = Signal() - def elaborate(self, platform): m = Module() @@ -161,19 +159,17 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.w_done = Signal() self.o = Signal(reset=1) - self._pins = pins - + self.timer = Signal.like(self.divisor) self.shreg = Record(_wire_layout(len(self.data), self._parity)) self.bits_left = Signal(range(len(self.shreg) + 1)) - def elaborate(self, platform): m = Module() timer = self.timer - shreg = self.shreg + shreg = self.shreg bits_left = self.bits_left if self._pins is not None: diff --git a/nmigen_stdio/test/test_serial.py b/nmigen_stdio/test/test_serial.py index 8f5e681..5286dbf 100644 --- a/nmigen_stdio/test/test_serial.py +++ b/nmigen_stdio/test/test_serial.py @@ -82,12 +82,14 @@ def test_err_frame(self): def test_err_overflow(self): self.dut = AsyncSerialRX(divisor=7) + def process(): self.assertFalse((yield self.dut.rdy)) yield from self.tx_bits([0, 0,0,0,0,0,0,0,0, 1]) yield from self.tx_period() self.assertFalse((yield self.dut.rdy)) self.assertTrue((yield self.dut.err.overflow)) + simulation_test(self.dut, process) @@ -104,7 +106,7 @@ def process(): yield self.dut.ack.eq(0) for _ in range(10): yield from self.tx_period() - yield from self.tx_period() # Check 1 more period + yield from self.tx_period() # Check 1 more period simulation_test(self.dut, process) def test_8n1(self): @@ -225,15 +227,15 @@ def elaborate(self, platform): Assert(rx_fsm.ongoing("DONE")), Assert(tx_fsm.ongoing("DONE")) ] - ## RX r_rdy + # RX r_rdy with m.If(Past(rx.busy, 2) & ~Stable(rx.busy, 1)): m.d.comb += Assert(rx.r_rdy) with m.If(Stable(rx.r_rdy) & rx.r_rdy): - m.d.comb += Assert(Stable(rx.data) & + m.d.comb += Assert(Stable(rx.data) & Stable(rx.err.overflow) & Stable(rx.err.frame) & Stable(rx.err.parity)) - ## TX w_done + # TX w_done with m.If(Past(tx.busy) & ~Stable(tx.busy)): m.d.comb += Assert(tx.w_done) @@ -243,7 +245,9 @@ def elaborate(self, platform): class AsyncSerialLoopbackTestCase(FHDLTestCase): def check_formal(self, *, divisor, data_bits, parity): depth = (divisor+1) * (data_bits+(3 if parity!="none" else 2) + 2) - self.assertFormal(AsyncSerialLoopbackSpec(divisor=divisor, data_bits=data_bits, parity=parity), + self.assertFormal(AsyncSerialLoopbackSpec(divisor=divisor, + data_bits=data_bits, + parity=parity), mode="bmc", depth=depth) def test_all_div7(self): @@ -290,18 +294,20 @@ def elaborate(self, platform): ResetSignal("sync").eq(0), ResetSignal("txclk").eq(0) ] - + # Assumptions for TX fv_tx_bitstream_val = AnyConst(len_bitstream) # Assume the bitstream doesn't have 1-bit delay m.d.comb += Assume(fv_tx_bitstream_val[0] != 1) fv_tx_bitstream = Signal(len_bitstream+1) - fv_tx_extra_bit = AnyConst(1) # A const bit representing the extra bit after the bitstream - fv_tx_overflow = AnyConst(1) # A const flag to determine if the rx_fifo is always full + fv_tx_extra_bit = AnyConst(1) # A const bit representing + # the extra bit after the bitstream + fv_tx_overflow = AnyConst(1) # A const flag to determine if + # the rx_fifo is always full fv_tx_bitno = Signal(range(len(fv_tx_bitstream)+1)) fv_tx_timer = Signal.like(rx.divisor) fv_rx_data = Signal(self.data_bits) - # + # fv_txfifo_num_bits = Signal(range(len(fv_tx_bitstream)+2)) with m.FSM(domain="txclk") as txfifo_fsm: with m.State("WRITE-PREP"): @@ -322,7 +328,7 @@ def elaborate(self, platform): m.d.txclk += txbit_fifo.w_en.eq(0) m.next = "DONE" txfifo_fsm.state.name = "fv_txfifo_fsm_state" - # + # fv_tx_extra_done = Signal() with m.FSM(domain="txclk") as tx_fsm: with m.State("TX-PREP"): @@ -394,11 +400,11 @@ def elaborate(self, platform): # Assertions with m.If(Past(rx_fsm.state) == rx_fsm.encoding["CHECK"]): m.d.comb += Assert(rx_fsm.ongoing("DONE") | rx_fsm.ongoing("ERROR")) - ## RX r_rdy + # RX r_rdy with m.If(Past(rx.busy, 2) & ~Stable(rx.busy, 1)): m.d.comb += Assert(rx.r_rdy) with m.If(Stable(rx.r_rdy) & rx.r_rdy): - m.d.comb += Assert(Stable(rx.data) & + m.d.comb += Assert(Stable(rx.data) & Stable(rx.err.overflow) & Stable(rx.err.frame) & Stable(rx.err.parity)) @@ -454,7 +460,9 @@ def elaborate(self, platform): class AsyncSerialBitstreamTestCase(FHDLTestCase): def check_formal(self, *, divisor, data_bits, parity): depth = (divisor+1) * (data_bits+(3 if parity!="none" else 2) + 2) - self.assertFormal(AsyncSerialBitstreamSpec(divisor=divisor, data_bits=data_bits, parity=parity), + self.assertFormal(AsyncSerialBitstreamSpec(divisor=divisor, + data_bits=data_bits, + parity=parity), mode="bmc", depth=depth) def test_all_div7(self): From bdd32d7e3da4058b1d35c47ce3cda048f4a48a30 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Fri, 17 Jan 2020 15:44:28 +0800 Subject: [PATCH 08/12] serial: add `TX.continuous` to control if there is a break between tx'es * `TX.continuous` is an externally driven Signal, resets to 0 (deasserted) * When asserted, TX assumes that `TX.data` is valid AT THE SAME TIME when the start bit starts to be sent out * When deasserted, TX assumes that `TX.data` is valid BEFORE the start bit is sent, and AT THE SAME TIME when `TX.ack` is asserted --- nmigen_stdio/serial.py | 56 +++++++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index f340f91..405e6bd 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -149,6 +149,9 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self._parity = parity self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) + # tx.continuous: Externally driven; no breaks between transmission of "packets" if asserted + # Useful if ACK stays high until the entire transmission has ended + self.continuous = Signal(reset=0) self.data = Signal(data_bits) # tx.ack: Externally driven, to indicate if transfer of data to transmit is enabled @@ -181,14 +184,49 @@ def elaborate(self, platform): with m.If(self.ack): m.d.sync += [ self.o.eq(0), # Issue start bit ASAP + bits_left.eq(len(self.shreg)), + timer.eq(self.divisor) + ] + # In continous mode, data is valid only at beginning of transmission + # and after ACK is asserted + with m.If(self.continuous): + m.next = "LATCH-DATA" + # In normal mode, data is valid at the same time as ACK is asserted + with m.Else(): + m.d.sync += [ + shreg.start .eq(0), + shreg.data .eq(self.data), + shreg.parity.eq(_compute_parity_bit(self.data, self._parity)), + shreg.stop .eq(1), + ] + m.next = "BUSY" + + with m.State("LATCH-DATA"): + # In continuous mode, data is valid only at beginning of transmission; + # Latch the data either before timer reaches 0 on the first bit, + # or, if timer is always 0 (i.e. divisor is 0), + # latch the first data bit to output for the next clock directly + with m.If(self.divisor == 0): + # On the next clock, do: + m.d.sync += [ + self.o.eq(self.data[0]), # Send data[0], and + shreg.eq(Cat( # Store to shreg: + self.data[1:], # data[1:] + _compute_parity_bit(self.data, self._parity), # parity + 1, 0, 0 # stop bit + )) + ] + with m.Else(): + # On the next clock, latch the data to shreg and keep counting down + m.d.sync += [ shreg.start .eq(0), shreg.data .eq(self.data), shreg.parity.eq(_compute_parity_bit(self.data, self._parity)), shreg.stop .eq(1), - bits_left.eq(len(self.shreg)), - timer.eq(self.divisor) + timer.eq(timer - 1) ] - m.next = "BUSY" + # Resume BUSY state + m.next = "BUSY" with m.State("BUSY"): with m.If(timer != 0): @@ -211,18 +249,14 @@ def elaborate(self, platform): m.next = "DONE" with m.State("DONE"): - # Accept next sequence if ACK is still high - with m.If(self.ack): + # In continuous mode, accept next sequence if ACK is still high + with m.If(self.ack & self.continuous): m.d.sync += [ self.o.eq(0), # Issue start bit ASAP - shreg.start .eq(0), - shreg.data .eq(self.data), - shreg.parity.eq(_compute_parity_bit(self.data, self._parity)), - shreg.stop .eq(1), bits_left.eq(len(self.shreg)), timer.eq(self.divisor) ] - m.next = "BUSY" + m.next = "LATCH-DATA" # Set back to IDLE if ACK is low with m.Else(): m.d.sync += self.o.eq(1) @@ -230,7 +264,7 @@ def elaborate(self, platform): m.d.comb += [ self.w_done.eq(fsm.ongoing("DONE")), - self.busy.eq(fsm.ongoing("BUSY")) + self.busy.eq(fsm.ongoing("LATCH-DATA") | fsm.ongoing("BUSY")) ] return m From f64d8d7b66734b5765e912e8ccbf8b7304ffc72d Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Fri, 31 Jan 2020 10:12:20 +0800 Subject: [PATCH 09/12] serial: fix RX logic on continuous bytes; fix reversed data in RX tests --- nmigen_stdio/serial.py | 41 +++++++++++++------------------- nmigen_stdio/test/test_serial.py | 30 ++++++++++++----------- 2 files changed, 33 insertions(+), 38 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index 405e6bd..5b59857 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -63,7 +63,7 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.timer = Signal.like(self.divisor) self.shreg = Record(_wire_layout(len(self.data), self._parity)) - self.bits_left = Signal(range(len(self.shreg) + 1)) + self.bits_left = Signal(range(len(self.shreg))) self.done = Signal() def elaborate(self, platform): @@ -81,7 +81,7 @@ def elaborate(self, platform): with m.State("IDLE"): with m.If(~self.i): m.d.sync += [ - bits_left.eq(len(shreg)), + bits_left.eq(len(shreg) - 1), timer.eq(self.divisor >> 1) ] m.next = "BUSY" @@ -89,18 +89,18 @@ def elaborate(self, platform): with m.State("BUSY"): with m.If(timer != 0): m.d.sync += timer.eq(timer - 1) - with m.If(((timer == 0) & (bits_left != 1)) | ((timer == 1) & (bits_left == 1))): + with m.Else(): m.d.sync += [ shreg.eq(Cat(shreg[1:], self.i)), - bits_left.eq(bits_left - 1) + bits_left.eq(bits_left - 1), + timer.eq(self.divisor) ] - with m.If(bits_left != 1): - m.d.sync += timer.eq(self.divisor) - with m.Else(): + with m.If(((self.divisor != 0) & (bits_left == 0)) | + ((self.divisor == 0) & (bits_left == 1))): m.next = "DONE" with m.State("DONE"): - with m.If(~done & (timer == 0)): + with m.If(timer == self.divisor): with m.If(self.ack): m.d.sync += [ self.data.eq(shreg.data), @@ -109,34 +109,27 @@ def elaborate(self, platform): _compute_parity_bit(shreg.data, self._parity))) ] m.d.sync += [ - self.err.overflow.eq(~self.ack) - ] - # At second clock edge for DONE, - # timer is set as divisor - m.d.sync += [ - timer.eq(self.divisor), - done.eq(1) + self.err.overflow.eq(~self.ack), + self.r_rdy.eq(1) ] - # Continue the timer to receive one more bit - # Set back to IDLE if it is still stop bit with m.If(timer != 0): m.d.sync += timer.eq(timer - 1) - with m.Elif(done): - m.d.sync += done.eq(0) + with m.Else(): + # Check if this next bit is start bit + # (useful for divisor == 0) with m.If(~self.i): m.d.sync += [ shreg.eq(Cat(shreg[1:], self.i)), - bits_left.eq(len(shreg)), + bits_left.eq(len(shreg) - 2), timer.eq(self.divisor) ] m.next = "BUSY" with m.Else(): m.next = "IDLE" - m.d.comb += [ - self.r_rdy.eq(fsm.ongoing("DONE") & done), - self.busy.eq(fsm.ongoing("BUSY")) - ] + m.d.comb += self.busy.eq(fsm.ongoing("BUSY")) + with m.If(self.r_rdy): + m.d.sync += self.r_rdy.eq(0) return m diff --git a/nmigen_stdio/test/test_serial.py b/nmigen_stdio/test/test_serial.py index 5286dbf..4c0d190 100644 --- a/nmigen_stdio/test/test_serial.py +++ b/nmigen_stdio/test/test_serial.py @@ -17,6 +17,7 @@ def simulation_test(dut, process): sim.run() +# TODO: Add test cases for continuous RX class AsyncSerialRXTestCase(unittest.TestCase): def tx_period(self): for _ in range((yield self.dut.divisor) + 1): @@ -29,10 +30,10 @@ def tx_bits(self, bits): def rx_test(self, bits, *, data=None, errors=None): def process(): - self.assertFalse((yield self.dut.rdy)) + self.assertFalse((yield self.dut.r_rdy)) yield self.dut.ack.eq(1) yield from self.tx_bits(bits) - while not (yield self.dut.rdy): + while not (yield self.dut.r_rdy): yield if data is not None: self.assertFalse((yield self.dut.err)) @@ -45,35 +46,35 @@ def process(): def test_8n1(self): self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="none") - self.rx_test([0, 1,0,1,0,1,1,1,0, 1], data=0b10101110) + self.rx_test([0, 1,0,1,0,1,1,1,0, 1], data=0b01110101) def test_16n1(self): self.dut = AsyncSerialRX(divisor=7, data_bits=16, parity="none") self.rx_test([0, 1,0,1,0,1,1,1,0,1,1,1,1,0,0,0,0, 1], - data=0b1010111011110000) + data=0b0000111101110101) def test_8m1(self): self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="mark") - self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], data=0b10101110) - self.rx_test([0, 1,0,1,0,1,1,0,0, 1, 1], data=0b10101100) + self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], data=0b01110101) + self.rx_test([0, 1,0,1,0,1,1,0,0, 1, 1], data=0b00110101) self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], errors={"parity"}) def test_8s1(self): self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="space") - self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], data=0b10101110) - self.rx_test([0, 1,0,1,0,1,1,0,0, 0, 1], data=0b10101100) + self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], data=0b01110101) + self.rx_test([0, 1,0,1,0,1,1,0,0, 0, 1], data=0b00110101) self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], errors={"parity"}) def test_8e1(self): self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="even") - self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], data=0b10101110) - self.rx_test([0, 1,0,1,0,1,1,0,0, 0, 1], data=0b10101100) + self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], data=0b01110101) + self.rx_test([0, 1,0,1,0,1,1,0,0, 0, 1], data=0b00110101) self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], errors={"parity"}) def test_8o1(self): self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="odd") - self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], data=0b10101110) - self.rx_test([0, 1,0,1,0,1,1,0,0, 1, 1], data=0b10101100) + self.rx_test([0, 1,0,1,0,1,1,1,0, 0, 1], data=0b01110101) + self.rx_test([0, 1,0,1,0,1,1,0,0, 1, 1], data=0b00110101) self.rx_test([0, 1,0,1,0,1,1,1,0, 1, 1], errors={"parity"}) def test_err_frame(self): @@ -84,15 +85,16 @@ def test_err_overflow(self): self.dut = AsyncSerialRX(divisor=7) def process(): - self.assertFalse((yield self.dut.rdy)) + self.assertFalse((yield self.dut.r_rdy)) yield from self.tx_bits([0, 0,0,0,0,0,0,0,0, 1]) yield from self.tx_period() - self.assertFalse((yield self.dut.rdy)) + self.assertFalse((yield self.dut.r_rdy)) self.assertTrue((yield self.dut.err.overflow)) simulation_test(self.dut, process) +# TODO: Add test cases for continuous TX class AsyncSerialTXTestCase(unittest.TestCase): def tx_period(self): for _ in range((yield self.dut.divisor) + 1): From 81f63567c218fdd8e6a69dd83fc6e3e1243ac539 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Sun, 1 Mar 2020 11:51:16 +0800 Subject: [PATCH 10/12] serial: add tests to simulate continuous & non-continuous TX/RX --- nmigen_stdio/test/test_serial.py | 91 ++++++++++++++++++++++++++++---- 1 file changed, 80 insertions(+), 11 deletions(-) diff --git a/nmigen_stdio/test/test_serial.py b/nmigen_stdio/test/test_serial.py index 4c0d190..10cb7e7 100644 --- a/nmigen_stdio/test/test_serial.py +++ b/nmigen_stdio/test/test_serial.py @@ -44,10 +44,57 @@ def process(): self.assertTrue((yield getattr(self.dut.err, error))) simulation_test(self.dut, process) + def rx_test_multiple(self, bits, *, data_bits, parity, dataset=None, errorset=None, continuous=False): + def process(): + tx_bitlen = (data_bits+(2 if parity=="none" else 3)) + if len(bits) % tx_bitlen != 0: + raise ValueError("Total number of bits {} is not a multiple of number of bits per transmission {}" + .format(bits, tx_bitlen)) + self.assertFalse((yield self.dut.r_rdy)) + rx_count = 0 + for bit in bits: + yield self.dut.ack.eq(1) + for _ in range((yield self.dut.divisor) + 1): + if (yield self.dut.r_rdy): + if dataset is not None: + data = dataset[rx_count] + self.assertFalse((yield self.dut.err)) + self.assertEqual((yield self.dut.data), data) + if errorset is not None: + errors = errorset[rx_count] + self.assertTrue((yield self.dut.err)) + for error in errors: + self.assertTrue((yield getattr(self.dut.err, error))) + rx_count += 1 + if not continuous: + for __ in range((yield self.dut.divisor) + 1): + yield + yield + yield self.dut.i.eq(bit) + simulation_test(self.dut, process) + def test_8n1(self): self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="none") self.rx_test([0, 1,0,1,0,1,1,1,0, 1], data=0b01110101) + def test_8n1_multiple_continuous(self): + self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="none") + self.rx_test_multiple( + [0, 0,1,0,1,0,1,0,1, 1, + 0, 1,0,1,0,1,0,1,0, 1], data_bits=8, parity="none", + dataset=[0b10101010, 0b01010101], + continuous=True + ) + + def test_8n1_multiple_notcontinuous(self): + self.dut = AsyncSerialRX(divisor=7, data_bits=8, parity="none") + self.rx_test_multiple( + [0, 0,1,0,1,0,1,0,1, 1, + 0, 1,0,1,0,1,0,1,0, 1], data_bits=8, parity="none", + dataset=[0b10101010, 0b01010101], + continuous=False + ) + def test_16n1(self): self.dut = AsyncSerialRX(divisor=7, data_bits=16, parity="none") self.rx_test([0, 1,0,1,0,1,1,1,0,1,1,1,1,0,0,0,0, 1], @@ -111,34 +158,56 @@ def process(): yield from self.tx_period() # Check 1 more period simulation_test(self.dut, process) + def tx_test_multiple(self, *, dataset, continuous=False): + def process(): + if continuous: + yield self.dut.continuous.eq(1) + yield self.dut.ack.eq(1) + yield self.dut.data.eq(dataset[0]) + yield + for _ in range(10): + yield from self.tx_period() + for data in dataset[1:]: + if not continuous: + yield + yield self.dut.data.eq(data) + yield + for _ in range(10): + yield from self.tx_period() + yield from self.tx_period() # Check 1 more period + simulation_test(self.dut, process) + def test_8n1(self): self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="none") - self.tx_test(data=0x10101010) + self.tx_test(data=0b10101010) - def test_8n1_100e6_112500(self): - div = round(100e6/112500)-1 - self.dut = AsyncSerialTX(divisor=div, data_bits=8, parity="none") - self.tx_test(data=32) + def test_8n1_multiple_continuous(self): + self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="none") + self.tx_test_multiple(dataset=[0b10101010, 0b01010101], continuous=True) + + def test_8n1_multiple_notcontinuous(self): + self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="none") + self.tx_test_multiple(dataset=[0b10101010, 0b01010101], continuous=False) def test_16n1(self): self.dut = AsyncSerialTX(divisor=7, data_bits=16, parity="none") - self.tx_test(data=0x0101011011110100) + self.tx_test(data=0b0101011011110100) def test_8m1(self): self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="mark") - self.tx_test(data=0x00111100) + self.tx_test(data=0b00111100) def test_8s1(self): self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="space") - self.tx_test(data=0x11011010) + self.tx_test(data=0b11011010) def test_8e1(self): self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="even") - self.tx_test(data=0x00101110) + self.tx_test(data=0b00101110) def test_8o1(self): self.dut = AsyncSerialTX(divisor=7, data_bits=8, parity="odd") - self.tx_test(data=0x01110011) + self.tx_test(data=0b01110011) def tx_bits(self, data_bits): for _ in range(data_bits): @@ -461,7 +530,7 @@ def elaborate(self, platform): class AsyncSerialBitstreamTestCase(FHDLTestCase): def check_formal(self, *, divisor, data_bits, parity): - depth = (divisor+1) * (data_bits+(3 if parity!="none" else 2) + 2) + depth = (divisor+1) * (data_bits+(3 if parity!="none" else 2) + 2) + 6 self.assertFormal(AsyncSerialBitstreamSpec(divisor=divisor, data_bits=data_bits, parity=parity), From 46dac6a61987c2f3de1f9ba598efab72f754559c Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Thu, 23 Apr 2020 15:56:46 +0800 Subject: [PATCH 11/12] serial: fix bitstream test to add variable delay before the extra bit * This FV case now inserts any delay (0 - divisor+1) between the final bit and the start bit (0) of the next sequence. --- nmigen_stdio/test/test_serial.py | 121 ++++++++++++++++++++++--------- 1 file changed, 87 insertions(+), 34 deletions(-) diff --git a/nmigen_stdio/test/test_serial.py b/nmigen_stdio/test/test_serial.py index 10cb7e7..de937b7 100644 --- a/nmigen_stdio/test/test_serial.py +++ b/nmigen_stdio/test/test_serial.py @@ -17,7 +17,6 @@ def simulation_test(dut, process): sim.run() -# TODO: Add test cases for continuous RX class AsyncSerialRXTestCase(unittest.TestCase): def tx_period(self): for _ in range((yield self.dut.divisor) + 1): @@ -141,7 +140,6 @@ def process(): simulation_test(self.dut, process) -# TODO: Add test cases for continuous TX class AsyncSerialTXTestCase(unittest.TestCase): def tx_period(self): for _ in range((yield self.dut.divisor) + 1): @@ -216,6 +214,16 @@ def tx_bits(self, data_bits): class AsyncSerialLoopbackSpec(Elaboratable): + """This FV model is to verify the behaviours of the transmitter and receiver with data of + variable width. The test case contains the following independent variables: + + * Data width: the number of bits to transmit per unit of transmission + * Divisor: the value of the divisor register + * Parity: the parity mode + + Only when different arguments are passed to the unit test function will these independent + variables change. + """ def __init__(self, *, divisor, data_bits, parity): self.rx = AsyncSerialRX(divisor=divisor, data_bits=data_bits, parity=parity) self.tx = AsyncSerialTX(divisor=divisor, data_bits=data_bits, parity=parity) @@ -227,17 +235,19 @@ def elaborate(self, platform): m.submodules.rx = rx = self.rx m.submodules.tx = tx = self.tx + # RX FIFO: stores data received by RX m.submodules.rx_fifo = rx_fifo = SyncFIFO(width=self.data_bits, depth=1) + # TX FIFO: stores data to be sent by TX m.submodules.tx_fifo = tx_fifo = SyncFIFO(width=self.data_bits, depth=1) m.d.comb += [ - # + # Connect TX to the TX FIFO tx.ack.eq(tx_fifo.r_rdy), tx_fifo.r_en.eq(~tx.busy), tx.data.eq(tx_fifo.r_data), - # + # Connect TX output to RX input rx.i.eq(tx.o), - # + # Connect RX to the RX FIFO rx.ack.eq(rx_fifo.w_rdy), rx_fifo.w_en.eq(rx.r_rdy), rx_fifo.w_data.eq(rx.data) @@ -331,6 +341,16 @@ def test_all_div7(self): class AsyncSerialBitstreamSpec(Elaboratable): + """This FV model is to verify the continuous behaviour of the receiver with data of + variable width. The test case contains the following independent variables: + + * Data width: the number of bits to transmit per unit of transmission + * Divisor: the value of the divisor register + * Parity: the parity mode + + Only when different arguments are passed to the unit test function will these independent + variables change. + """ def __init__(self, *, divisor, data_bits, parity): self.rx = AsyncSerialRX(divisor=divisor, data_bits=data_bits, parity=parity) self.divisor = divisor @@ -342,16 +362,26 @@ def elaborate(self, platform): m.submodules.rx = rx = self.rx len_bitstream = self.data_bits+(3 if self.parity!="none" else 2) + # RX FIFO: stores data (the first sequence) received by RX m.submodules.rx_fifo = rx_fifo = SyncFIFO(width=self.data_bits, depth=1) + # TX bit FIFO: stores the bit sequence generated and sends it to RX bit by bit m.submodules.txbit_fifo = txbit_fifo = SyncFIFO(width=1, depth=len_bitstream+2) - + # A const flag to determine if an extra bit of 0 or 1 should be sent + fv_tx_extra_bit = AnyConst(1) + # A flag to indicate if an extra bit is being sent + fv_tx_extra_send = Signal() + # A flag to indicate if the TX FIFO has started to be read fv_txfifo_start = Signal() + # A flag to indicate if the Tx FIFO has ended + fv_txfifo_end = Signal() with m.If(fv_txfifo_start): - m.d.sync += rx.i.eq(txbit_fifo.r_data) + m.d.sync += rx.i.eq( + Mux(fv_tx_extra_send, fv_tx_extra_bit, txbit_fifo.r_data) + ) with m.Else(): m.d.sync += rx.i.eq(1) m.d.comb += [ - # + # Connect RX to the RX FIFO rx.ack.eq(rx_fifo.w_rdy), rx_fifo.w_en.eq(rx.r_rdy), rx_fifo.w_data.eq(rx.data) @@ -370,26 +400,32 @@ def elaborate(self, platform): fv_tx_bitstream_val = AnyConst(len_bitstream) # Assume the bitstream doesn't have 1-bit delay m.d.comb += Assume(fv_tx_bitstream_val[0] != 1) - fv_tx_bitstream = Signal(len_bitstream+1) - fv_tx_extra_bit = AnyConst(1) # A const bit representing - # the extra bit after the bitstream - fv_tx_overflow = AnyConst(1) # A const flag to determine if - # the rx_fifo is always full - fv_tx_bitno = Signal(range(len(fv_tx_bitstream)+1)) + fv_tx_bitstream = Signal(len_bitstream) + # Delay for issuing an extra bit after the bitstream + fv_tx_end_delay = Signal(range(self.divisor + 2)) + m.d.comb += [ + Assume(Stable(fv_tx_end_delay)), + Assume((fv_tx_end_delay >= 0) & (fv_tx_end_delay <= rx.divisor + 1)) + ] + # A const flag to determine if the rx_fifo is always full + fv_tx_overflow = AnyConst(1) + fv_tx_bitno = Signal(range(len_bitstream+1)) fv_tx_timer = Signal.like(rx.divisor) fv_rx_data = Signal(self.data_bits) - # - fv_txfifo_num_bits = Signal(range(len(fv_tx_bitstream)+2)) + # A flag to indicate if the extra bit (0) has been issued + fv_tx_extra_done = Signal() + # FSM for storing the bit sequence generated + fv_txfifo_num_bits = Signal(range(len_bitstream+2)) with m.FSM(domain="txclk") as txfifo_fsm: with m.State("WRITE-PREP"): m.d.txclk += [ - fv_tx_bitstream.eq(Cat(fv_tx_bitstream_val, fv_tx_extra_bit)), + fv_tx_bitstream.eq(Cat(fv_tx_bitstream_val)), fv_txfifo_num_bits.eq(0), txbit_fifo.w_en.eq(1) ] m.next = "WRITE-BITSTREAM" with m.State("WRITE-BITSTREAM"): - with m.If(fv_txfifo_num_bits != len(fv_tx_bitstream)+1): + with m.If(fv_txfifo_num_bits != len_bitstream+1): m.d.txclk += [ txbit_fifo.w_data.eq(fv_tx_bitstream[0]), fv_tx_bitstream.eq(Cat(fv_tx_bitstream[1:], 0)), @@ -399,14 +435,13 @@ def elaborate(self, platform): m.d.txclk += txbit_fifo.w_en.eq(0) m.next = "DONE" txfifo_fsm.state.name = "fv_txfifo_fsm_state" - # - fv_tx_extra_done = Signal() + # FSM for sending the bit sequence bit by bit with m.FSM(domain="txclk") as tx_fsm: with m.State("TX-PREP"): m.d.txclk += txbit_fifo.r_en.eq(0) with m.If(txbit_fifo.r_rdy): m.d.txclk += [ - fv_tx_bitno.eq(len(fv_tx_bitstream)), + fv_tx_bitno.eq(len_bitstream), fv_tx_timer.eq(1) ] m.next = "TX-SENDBIT" @@ -416,7 +451,7 @@ def elaborate(self, platform): m.d.txclk += fv_tx_timer.eq(fv_tx_timer - 1) with m.If((fv_tx_timer == 1) & (fv_tx_bitno != 0)): m.d.txclk += txbit_fifo.r_en.eq(1) - with m.If(fv_tx_bitno == len(fv_tx_bitstream)): + with m.If(fv_tx_bitno == len_bitstream): m.d.txclk += fv_txfifo_start.eq(1) with m.Else(): m.d.txclk += [ @@ -424,13 +459,36 @@ def elaborate(self, platform): fv_tx_timer.eq(self.divisor), ] with m.If(fv_tx_bitno == 0): - m.d.txclk += fv_tx_extra_done.eq(1) - m.next = "DONE" + m.d.txclk += fv_txfifo_end.eq(1) + with m.If(fv_tx_end_delay == 0): + m.d.txclk += [ + fv_tx_timer.eq(self.divisor), + fv_tx_extra_send.eq(1) + ] + m.next = "TX-SENDEXTRA" + with m.Else(): + m.d.txclk += fv_tx_timer.eq(fv_tx_end_delay) + m.next = "TX-ENDDELAY" + with m.State("TX-ENDDELAY"): + with m.If(fv_tx_timer != 0): + m.d.txclk += fv_tx_timer.eq(fv_tx_timer - 1) + with m.Else(): + m.d.txclk += [ + fv_tx_timer.eq(self.divisor), + fv_tx_extra_send.eq(1) + ] + m.next = "TX-SENDEXTRA" + with m.State("TX-SENDEXTRA"): + with m.If(fv_tx_timer != 0): + m.d.txclk += fv_tx_timer.eq(fv_tx_timer - 1) + with m.Else(): + m.d.txclk += fv_tx_extra_done.eq(1) + m.next = "DONE" tx_fsm.state.name = "fv_tx_fsm_state" # Assumptions for RX m.d.comb += Assume(Stable(rx.divisor)) - # + # Connect RX to the RX FIFO with m.If(fv_tx_overflow): m.d.comb += Assume(rx_fifo.w_rdy == 0) with m.Else(): @@ -438,7 +496,7 @@ def elaborate(self, platform): rx_fifo.w_en.eq(rx.r_rdy), rx_fifo.w_data.eq(rx.data) ] - # + # FSM for storing the received data with m.FSM() as rx_fsm: with m.State("RX-1"): with m.If(~rx_fifo.w_rdy): @@ -514,15 +572,10 @@ def elaborate(self, platform): with m.If(rx.err.overflow): m.d.comb += Assert(fv_tx_overflow) - with m.If(~fv_tx_extra_bit & fv_tx_extra_done): - m.d.comb += [ - Assert(rx.i == 0), - Assert(rx.busy == 1) # BUSY to read the next sequence - ] - with m.Elif(fv_tx_extra_bit & fv_tx_extra_done): + with m.If(fv_tx_extra_done): m.d.comb += [ - Assert(rx.i == 1), - Assert(rx.busy == 0) + Assert(rx.i == fv_tx_extra_bit), + Assert(rx.busy == ~fv_tx_extra_bit) ] return m From b72b54e48beadec525bf03c0287195f7ef6df6bb Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Thu, 23 Apr 2020 20:19:02 +0800 Subject: [PATCH 12/12] serial: add docstring, remove `done` for TX --- nmigen_stdio/serial.py | 110 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 98 insertions(+), 12 deletions(-) diff --git a/nmigen_stdio/serial.py b/nmigen_stdio/serial.py index 5b59857..dd7183e 100644 --- a/nmigen_stdio/serial.py +++ b/nmigen_stdio/serial.py @@ -37,7 +37,43 @@ def _wire_layout(data_bits, parity="none"): class AsyncSerialRX(Elaboratable): - """An UART receiver module. Receives the LSB first and MSB last. + """A UART receiver module. Receives the LSB first and MSB last. + + Parameters + ---------- + divisor : int + Reset value of the divisor register, whose value is used as the number of clock cycles + needed per bit. + divisor_bits : int + Number of bits used to form the divisor value. + If not None, the reset value of the divisor is 2**divisor_bits. + data_bits : int + Number of data bits per transfer. + parity : str + Parity mode. Must be either "none", "mark", "space", "even" or "odd" + pins : Record + Corresponding serial communication pins on board. + + Attributes + ---------- + divisor : Signal + Divisor register. + i : Signal() + Input pin. + data : Signal(data_bits) + Returns the data received when `r_rdy` is asserted. + err.overflow : Signal() + Internally driven, to indicate if the transfer has overflown. + err.frame : Signal() + Internally driven, to indicate if the transfer has framing error. + err.parity : Signal() + Internally driven, to indicate if the transfer has parity error. + ack : Signal() + Externally driven, to indicate if transfer of received data is enabled. + busy : Signal() + Internally driven, to indicate if there is an ongoing data transfer. + r_rdy : Signal() + Internally driven, to indicate if received data is ready to be read. """ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None): _check_parity(parity) @@ -51,11 +87,8 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi ("frame", 1), ("parity", 1), ]) - # rx.ack: Externally driven, to indicate if transfer of received data is enabled self.ack = Signal() - # rx.busy: Internally driven, to indicate if there is an ongoing data transfer self.busy = Signal() - # rx.r_rdy: Internally driven, to indicate if received data is ready to be read self.r_rdy = Signal() self.i = Signal(reset=1) @@ -64,7 +97,6 @@ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pi self.timer = Signal.like(self.divisor) self.shreg = Record(_wire_layout(len(self.data), self._parity)) self.bits_left = Signal(range(len(self.shreg))) - self.done = Signal() def elaborate(self, platform): m = Module() @@ -72,7 +104,6 @@ def elaborate(self, platform): timer = self.timer shreg = self.shreg bits_left = self.bits_left - done = self.done if self._pins is not None: m.submodules += FFSynchronizer(self._pins.rx.i, self.i, reset=1) @@ -135,23 +166,52 @@ def elaborate(self, platform): class AsyncSerialTX(Elaboratable): - """An UART transmitter module. Transmits the LSB first and MSB last. + """A UART transmitter module. Transmits the LSB first and MSB last. + + Parameters + ---------- + divisor : int + Reset value of the divisor register, whose value is used as the number of clock cycles + needed per bit. + divisor_bits : int + Number of bits used to form the divisor value. + If not None, the reset value of the divisor is 2**divisor_bits. + data_bits : int + Number of data bits per transfer. + parity : str + Parity mode. Must be either "none", "mark", "space", "even" or "odd" + pins : Record + Corresponding serial communication pins on board. + + Attributes + ---------- + divisor : Signal + Divisor register. + o : Signal() + Output pin. + data : Signal(data_bits) + Register storing the data to be sent went `ack` is asserted. + continuous : Signal() + Externally driven; no breaks between frames of transfer if asserted. + Useful if ACK stays high until the entire transmission has ended. + Resets to 0 (deasserted). + ack : Signal() + Externally driven, to indicate if transfer of data to transmit is enabled. + busy : Signal() + Internally driven, to indicate if there is an ongoing data transfer. + w_done : Signal() + Internally driven, to indicate if data has been transmitted. """ def __init__(self, *, divisor, divisor_bits=None, data_bits=8, parity="none", pins=None): _check_parity(parity) self._parity = parity self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor) - # tx.continuous: Externally driven; no breaks between transmission of "packets" if asserted - # Useful if ACK stays high until the entire transmission has ended self.continuous = Signal(reset=0) self.data = Signal(data_bits) - # tx.ack: Externally driven, to indicate if transfer of data to transmit is enabled self.ack = Signal() - # tx.busy: Internally driven, to indicate if there is an ongoing data transfer self.busy = Signal() - # tx.w_done: Internally driven, to indicate if data has been transmitted self.w_done = Signal() self.o = Signal(reset=1) @@ -264,6 +324,32 @@ def elaborate(self, platform): class AsyncSerial(Elaboratable): + """A full UART module. Receives and transmits the LSB first and MSB last. + + Parameters + ---------- + divisor : int + Reset value of the divisor register for both the receiver and transmitter, whose value is + used as the number of clock cycles needed per bit. + divisor_bits : int + Number of bits used to form the divisor value. + If not None, the reset value of the divisor is 2**divisor_bits. + data_bits : int + Number of data bits per transfer. + parity : str + Parity mode. Must be either "none", "mark", "space", "even" or "odd" + pins : Record + Corresponding serial communication pins on board. + + Attributes + ---------- + divisor : Signal + Divisor register. + rx : :class:`AsyncSerialRX` + UART receiver module. + tx : :class:`AsyncSerialTX` + UART transmission module. + """ def __init__(self, *, divisor, divisor_bits=None, **kwargs): self.divisor = Signal(divisor_bits or bits_for(divisor), reset=divisor)