Drivers
Drivers are used to stimulate interfaces on the design according to the implementation's signalling protocols. They convert transactions from the representation used by the testbench into the state of the different signals on the boundary of the design.
Defining a Driver
Drivers inherit from BaseDriver and must
implement the drive
method to convert a transaction object into the
implementation's signalling protocol.
Drivers depend on a transaction class being defined, here a simple stream data object is defined:
from forastero import BaseTransaction
@dataclass(kw_only=True)
class StreamTransaction(BaseTransaction):
data: int = 0
The driver's drive
method consumes these StreamTransaction
objects and
converts them into signal state to the DUT:
from cocotb.triggers import RisingEdge
from forastero import BaseDriver
from .transaction import StreamTransaction
class StreamInitiator(BaseDriver):
async def drive(self, obj: StreamTransaction) -> None:
self.io.set("data", obj.data)
self.io.set("valid", 1)
while True:
await RisingEdge(self.clk)
if self.io.get("ready", 1):
break
self.io.set("valid", 0)
The drive
method is called whenever a transaction is queued onto the driver by
the testcase. It should setup the stimulus, wait until it is accepted by the
design, then return the stimulus to a neutral state. In the example above, flow
is controlled by valid
and ready
signals of the interface - valid
is
setup to qualify data
, then at least one cycle must pass before data is
accepted when both valid
and ready
are high together.
Registering Drivers
Drivers must be registered to the testbench, this ensures that each test waits until all stimulus has been fed into the design before the test is allowed to complete.
from forastero import BaseBench, IORole
from .stream import StreamInitiator, StreamIO
class Testbench(BaseBench):
def __init__(self, dut) -> None:
super().__init__(dut, clk=dut.i_clk, rst=dut.i_rst)
stream_io = StreamIO(dut, "stream", IORole.RESPONDER)
self.register("stream_init",
StreamInitiator(self, stream_io, self.clk, self.rst))
To highlight a few important points:
- The stream interface is wrapped up in a
StreamIO
object (this inherits from BaseIO), that assumes signals ofi_stream_data
,i_stream_valid
, ando_stream_ready
(meaning the DUT is taking the role of an interface 'responder'); - An instance of
StreamInitiator
is created, providing a handle to the testbench, theStreamIO
object, and the associated clock and reset signals; - The instance of
StreamInitiator
is registered to the testbench using the namestream_init
.
Driver Events
As transactions progress through a driver, events are emitted to allow observers such as models or stimulus generation to track its progress. There are 3 different events defined:
- ENQUEUE - emitted when a transaction is enqueued into a driver by a test or sequence, this may happen long before the transaction is driven into the DUT;
- PRE_DRIVE - emitted just prior to a queued transaction being driven into the DUT;
- POST_DRIVE - emitted just after a queued transaction is driven into the DUT;
Subscribing to Events
A callback can be registered against any event, this may either be a synchronous or asynchronous method and will be called every time that the given event occurs:
from forastero.driver import DriverEvent
class Testbench(BaseBench):
def __init__(self, dut) -> None:
super().__init__(dut, clk=dut.i_clk, rst=dut.i_rst)
stream_io = StreamIO(dut, "stream", IORole.RESPONDER)
self.register("stream_init",
StreamInitiator(self, stream_io, self.clk, self.rst))
self.stream_init.subscribe(DriverEvent.PRE_DRIVE, self.stream_pre_drive)
self.stream_init.subscribe(DriverEvent.POST_DRIVE, self.stream_post_drive)
def stream_pre_drive(self,
driver: StreamInitiator,
event: DriverEvent,
obj: StreamTransaction):
self.info(f"Driver is about to drive object: {obj}")
async def stream_post_drive(self,
driver: StreamInitiator,
event: DriverEvent,
obj: StreamTransaction):
self.info(f"Driver has just driven object: {obj}")
await ClockCycles(tb.clk, 10)
# ...generate some stimulus...
Waiting for Events
A test can wait for a specific driver event to occur using the wait_for
method,
this will block until the event happens and then return the transaction that
caused the event:
from forastero.driver import DriverEvent
@Testbench.testcase()
async def my_testcase(tb: Testbench, log: SimLog):
for _ in range(10):
# Generate and queue stimulus
tb.stream_init.enqueue(StreamTransaction(data=tb.random.getrandbits(32)))
# Wait for that stimulus to be driven
await tb.stream_init.wait_for(DriverEvent.POST_DRIVE)
Generating Stimulus
Testcases may queue transactions onto a driver using the enqueue
method - for
example:
from cocotb.log import SimLog
from ..stream import StreamTransaction
from ..testbench import Testbench
@Testbench.testcase()
async def random(tb: Testbench, log: SimLog):
for _ in range(100):
tb.stream_init.enqueue(StreamTransaction(data=tb.random.getrandbits(32)))
Note
tb.stream_init
refers to the instance of StreamInitiator
that was
registered onto the testbench in the previous example. The for
loop then
generates a number of StreamTransaction
objects carrying random data.
Drivers can return an event to allow a test or sequence to determine when a
particular transaction reaches a pre or post-drive state. When wait_for
is
provided, the enqueue
function will return a cocotb Event
, and this can be
awaited:
from cocotb.log import SimLog
from forastero.driver import DriverEvent
from ..stream import StreamTransaction
from ..testbench import Testbench
@Testbench.testcase()
async def random(tb: Testbench, log: SimLog):
for _ in range(100):
await tb.stream_init.enqueue(
StreamTransaction(data=tb.random.getrandbits(32)),
wait_for=DriverEvent.POST_DRIVE
).wait()
forastero.driver.BaseDriver
Bases: Component
Component for driving transactions onto an interface matching the implementation's signalling protocol.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
tb
|
Any
|
Handle to the testbench |
required |
io
|
BaseIO
|
Handle to the BaseIO interface |
required |
clk
|
ModifiableObject
|
Clock signal to use when driving/sampling the interface |
required |
rst
|
ModifiableObject
|
Reset signal to use when driving/sampling the interface |
required |
random
|
Random | None
|
Random number generator to use (optional) |
None
|
name
|
str | None
|
Unique name for this component instance (optional) |
None
|
busy
property
Busy when either locked or the queue has outstanding entries
queued
property
Return how many entries are queued up
drive(obj)
async
Placeholder driver, this should be overridden by a child class to match the signalling protocol of the interface's implementation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj
|
BaseTransaction
|
The transaction to drive onto the interface |
required |
enqueue(transaction, wait_for=None)
Queue up a transaction to be driven onto the interface
Parameters:
Name | Type | Description | Default |
---|---|---|---|
transaction
|
BaseTransaction
|
Transaction to queue, must inherit from BaseTransaction |
required |
wait_for
|
DriverEvent | None
|
When defined, this will return an event that can be monitored for a given transaction event occurring |
None
|
forastero.driver.DriverEvent
Bases: Enum
ENQUEUE = auto()
class-attribute
instance-attribute
Emitted when a transaction is enqueued to a driver
POST_DRIVE = auto()
class-attribute
instance-attribute
Emitted just after a queued transaction has been driven into the DUT
PRE_DRIVE = auto()
class-attribute
instance-attribute
Emitted just prior to a queued transaction being driven into the DUT