Sequences
There are two competing objectives when it comes to testing a design:
-
To utilise random stimulus as far as possible in order to avoid implicit assumptions being baked into testcases and hence failing to explore the "unknown-unknowns";
-
To explore the entire state space of the DUT, in some cases reaching very deep sequential pathways that require very specific stimulus.
Forastero offers the concept of 'sequences' to help address both of these objectives. A sequence can act in any way it likes, from providing entirely random stimulus all the way to directing a very specific set of operations. A testcase can schedule any number of sequences to be executed concurrently, allowing different stimulus patterns to be overlayed on each other to create complex interactions.
Defining a Sequence
A sequence is defined using the sequence
decorator, and this must wrap around
an async def
function declaration:
import forastero
from forastero.sequence import SeqContext
@forastero.sequence()
async def very_simple_seq(ctx: SeqContext):
...
A sequence must declare the drivers and monitors that it wishes to interact with,
this is achieved using the requires
decorator:
import forastero
from forastero.sequence import SeqContext, SeqProxy
from ..stream import StreamInitiator, StreamMonitor
@forastero.sequence()
@forastero.requires("stream_drv", StreamInitiator)
@forastero.requires("stream_mon", StreamMonitor)
async def very_simple_seq(ctx: SeqContext,
stream_drv: SeqProxy[StreamInitiator],
stream_mon: SeqProxy[StreamMonitor]):
...
Note
The name of the driver/monitor used in the requires
decorator is internal
to the sequence, it does not relate to the name of the driver/monitor
registered into the testbench. When a sequence is scheduled, the testcase
must identify the driver and monitor that matches each requirement.
A sequence may also accept parameters in order to vary it's behaviour - in the
example below two arguments of repetitions
and data_width
are provided
that the sequence can then use to guide the transactions it produces:
import forastero
from forastero.sequence import SeqContext, SeqProxy
from ..stream import StreamInitiator, StreamMonitor
@forastero.sequence()
@forastero.requires("stream_drv", StreamInitiator)
@forastero.requires("stream_mon", StreamMonitor)
async def very_simple_seq(ctx: SeqContext,
stream_drv: SeqProxy[StreamInitiator],
stream_mon: SeqProxy[StreamMonitor],
repetitions: int = 100,
data_width: int = 64):
...
Locking
As sequences run, they can interact with any number of drivers or monitors to stimulate or observe the design. However, at any one time any number of sequences may be attempting to enqueue stimulus into the DUT and this creates a problem.
For example if sequence A produces entirely random stimulus while sequence B is attempting to configure the DUT in a specific way, then sequence B could be easily disrupted by sequence A and fail to achieve the desired state space.
To resolve this problem Forastero allows sequences to acquire locks that allow them to take sole control of selected drivers and monitors for an arbitrary period of the test.
When a lock is acquired on a driver, it allows the sequence the sole right to
enqueue
transactions to that driver. If the enqueue
function is called by
a sequence that doesn't hold the lock, an exception will always be raised.
When a lock is acquired on a monitor, it allows the sequence the sole right to
observe transactions being captured by that monitor - either via subscribe
or
wait_for
. If no locks are claimed on a monitor, then all active sequences can
observe transactions. If a sequence observes a monitor while the lock is held by
another sequence, then the sequence will not be notified of events while the lock
is held.
import forastero
from forastero.sequence import SeqContext, SeqProxy
from ..stream import StreamInitiator, StreamTransaction
@forastero.sequence()
@forastero.requires("stream_a", StreamInitiator)
@forastero.requires("stream_b", StreamInitiator)
async def weird_traffic(ctx: SeqContext,
stream_a: SeqProxy[StreamInitiator],
stream_b: SeqProxy[StreamInitiator]):
# Send a transaction to just stream A
async with ctx.lock(stream_a):
stream_a.enqueue(StreamTransaction(data=ctx.getrandbits(32)))
# Send a transaction to just stream B
async with ctx.lock(stream_b):
stream_b.enqueue(StreamTransaction(data=ctx.getrandbits(32)))
# Send a transaction to BOTH streams
async with ctx.lock(stream_a, stream_b):
stream_a.enqueue(StreamTransaction(data=ctx.getrandbits(32)))
stream_b.enqueue(StreamTransaction(data=ctx.getrandbits(32)))
Note
This example uses an asynchronous context manager (e.g. async with
) to
claim locks. Any locks claimed by the context are then automatically released
once the program moves beyond the context (i.e. returns to the outer
indentation level).
Sequences can also declare arbitrarily named locks that they can use to
co-ordinate with other sequences using the requires
decorator but without
providing an expected type, for example this may be used to represent some state
of the design that is not directly exposed via the I/Os of the DUT.
import forastero
from forastero.sequence import SeqContext, SeqLock, SeqProxy
from ..apb import ApbInitiator, ApbTransaction
from ..irq import IrqInitiator, IrqTransaction
@forastero.sequence()
@forastero.requires("cfg", ApbInitiator)
@forastero.requires("irq", IrqInitiator)
@forastero.requires("irq_config")
async def trigger_interrupt(ctx: SeqContext,
cfg: SeqProxy[ApbInitiator],
irq: SeqProxy[IrqInitiator],
irq_config: SeqLock):
# Claim drivers and lock to avoid IRQ being re-configured
async with ctx.lock(cfg, irq, irq_config):
# Setup register block to handle IRQs
cfg.enqueue(ApbTransaction(address=0x0, ...))
cfg.enqueue(ApbTransaction(address=0x4, ...))
# Release the APB lock
# NOTE: This relies on other sequences respecting the irq_config lock
ctx.release(cfg)
Warning
Releasing a lock that a sequence doesn't hold will result in an exception being raised.
Sequences may not request more locks until they have released all previously held locks, this means that nesting contexts (as shown below) is illegal and will lead to an exception being raised:
import forastero
from forastero.sequence import SeqContext, SeqLock, SeqProxy
from ..apb import ApbInitiator, ApbTransaction
from ..irq import IrqInitiator, IrqTransaction
@forastero.sequence()
@forastero.requires("cfg", ApbInitiator)
@forastero.requires("irq", IrqInitiator)
@forastero.requires("irq_config")
async def trigger_interrupt(ctx: SeqContext,
cfg: SeqProxy[ApbInitiator],
irq: SeqProxy[IrqInitiator],
irq_config: SeqLock):
async with ctx.lock(cfg):
async with ctx.lock(irq): # ILLEGAL!
async with ctx.lock(irq_config): # ILLEGAL!
...
Auto-Locking Sequences
Some simple sequences may only ever need to claim locks once and then will release
them all once the sequence completes. In such a scenario a sequence can be marked
as 'auto-locking' by providing auto_lock=True
to the sequence
decorator:
import forastero
from forastero.sequence import SeqContext, SeqLock, SeqProxy
from ..apb import ApbInitiator, ApbTransaction
from ..irq import IrqInitiator, IrqTransaction
@forastero.sequence(auto_lock=True)
@forastero.requires("cfg", ApbInitiator)
@forastero.requires("irq", IrqInitiator)
@forastero.requires("irq_config")
async def my_auto_lock_seq(ctx: SeqContext,
cfg: SeqProxy[ApbInitiator],
irq: SeqProxy[IrqInitiator],
irq_config: SeqLock):
# NOTE: The 'async with' is no longer needed!
cfg.enqueue(ApbTransaction(address=0x0, ...))
cfg.enqueue(ApbTransaction(address=0x4, ...))
irq.enqueue(IrqTransaction(...))
Scheduling Sequences
When using sequences the role of the testcase is to select the sequences that
need to be scheduled, provide references to the drivers and monitors, and specify
any parameter values. Sequences are scheduled using the tb.schedule(...)
function:
from cocotb.log import SimLog
import forastero
from forastero.driver import DriverEvent
from forastero.sequence import SeqContext, SeqProxy
from ..stream import StreamInitiator, StreamTransaction
from ..testbench import Testbench
@forastero.sequence()
@forastero.requires("stream_drv", StreamInitiator)
async def random_traffic(ctx: SeqContext,
stream_drv: SeqProxy[StreamInitiator],
length: int = 1,
data_width: int = 32):
for idx in range(length):
async with ctx.lock(stream_drv):
ctx.log.info(f"Driving packet {idx}")
stream_drv.enqueue(StreamTransaction(data=ctx.random.getrandbits(data_width)))
await stream_drv.wait_for(DriverEvent.PRE_DRIVE)
@Testbench.testcase()
async def stress_all_interfaces(tb: Testbench, log: SimLog):
"""Drive lots of random traffic on all stream interfaces"""
for stream in (tb.stream_a, tb.stream_b, tb.stream_c):
tb.schedule(random_traffic(stream_drv=stream, length=1000, data_width=64))
Note
When the testcase schedules the sequence it must provide the right driver or
monitor for each component required by the sequence, in this case the sequence
requires stream_drv
and the testcase is mapping this to stream_a
,
stream_b
, and stream_c
in turn.
Taking a closer look at the schedule
call, there is a difference between the
arguments of the sequence function:
async def random_traffic(ctx: SeqContext,
stream_drv: SeqProxy[StreamInitiator],
length: int = 1,
data_width: int = 32):
...and the schedule
call:
For those struggling to spot the difference, the ctx
argument is missing from
the scheduling call. This is deliberate as the context is filled in by the
testbench as the sequence is scheduled, more details on the context object can
be found below.
Sequence Context
Each sequence scheduled is provided with a unique instance of SeqContext, which provides a number of useful mechanisms:
seq.log
provides a logging context that is unique to the sequence;seq.random
provides an instance of Python's Random class that is seeded in such a way that a given sequence should be invariant run-to-run even if other sequence is altered;seq.lock
andseq.release
allow a testcase to claim and release locks on drivers and monitors as well as arbitrary named locks;seq.ctx
andseq.rst
provide access to the same clock and reset signals that the main testbench uses.
A sequence should always use the context for logging and random number generation rather than accessing the root testbench directly.
Sequence Arbitration
When more than one sequence is scheduled, the arbiter is responsible for deciding which sequence is able to claim locks first - this is especially important when sequences are in contention over shared locks.
The current arbitration implementation is based on random ordering of the queued
sequences (i.e. everything currently waiting on a async with ctx.lock(...)
call).
Future improvements to Forastero will introduce more complex arbitration functions
that allow control over the balancing of different sequences.
Randomised Sequence Arguments
When defining sequences it is good practice to make them reusable and provide arguments to vary the stimulus that the sequence generates. In the sections above it was shown how to declare normal arguments with static default values, but Forastero also offers the possibility to randomise the values of the arguments within simple constraints.
import forastero
from forastero.sequence import SeqContext, SeqProxy
from ..stream import StreamInitiator, StreamMonitor
@forastero.sequence()
@forastero.requires("stream_drv", StreamInitiator)
@forastero.requires("stream_mon", StreamMonitor)
@forastero.randarg("repetitions", range=(100, 300))
@forastero.randarg("data_mode", choices=("random", "zero", "one", "increment"))
async def rand_data_seq(ctx: SeqContext,
stream_drv: SeqProxy[StreamInitiator],
stream_mon: SeqProxy[StreamMonitor],
repetitions: int,
data_mode: str,
data_width: int = 64):
...
The example above defines three arguments to the sequence, of which two are randomised:
repetitions
will take a random value between 100 and 300 (inclusive);data_mode
will select a random string fromrandom
,zero
,one
, orincrement
.
The data_width
argument is not randomised as this is expected to be matched to
a design constant (i.e. the bus width).
The randarg
decorator always requires a variable name and exactly one randomisation
mode, selected from:
range=(X, Y)
selects a random value in the range X to Y inclusive;bit_width=X
selects a random value over a bit width of X bits;choices=(X, Y, Z, ...)
makes a random choice from a restricted list of values.
Randomisation is performed at the point the sequence is scheduled, so code running inside the sequence will see a fixed value. The log contains messages (at debug verbosity) detailing the sequence and variables that was scheduled:
21.00ns DEBUG tb.sequence.rand_data_seq[0] Launching rand_data_seq[0] with variables: {'repetitions': 127, 'data_mode': 'one'}
The values and randomisation behaviour can also be overridden at the point the sequence is scheduled, for example:
@Testbench.testcase()
async def random_traffic(tb: Testbench, log: SimLog):
# Schedule a fixed length burst of entirely random traffic
tb.schedule(rand_data_seq(repetitions=10, data_mode="random"))
# Schedule a burst between 30 and 60 transactions of zero or one traffic
tb.schedule(rand_data_seq(repetitions_range=(30, 60),
data_mode_choices=("one", "zero")))
When providing an override:
<X>=123
will fix the argument to a static value;<X>_range=(Y, Z)
will override the range randomisation betweenY
andZ
;<X>_bit_width=Y
will override the bit-width randomisation toY
bits;<X>_choices=(A, B, C)
will override the choices to be selected from to be one ofA
,B
, orC
.
forastero.sequence.SeqLock
Wraps around cocotb's Lock primitive to also track which sequence currently holds the lock.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Name of the lock |
required |
locked
property
Check if the lock is currently taken
acquire(context)
async
Attempt to acquire a lock, waiting until it becomes available.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context
|
SeqContext
|
Reference to the sequence context claiming the lock |
required |
count_all_locks()
classmethod
Return the total number of locks
get_all_component_locks()
classmethod
Return a list of all known component locks
get_all_locks()
classmethod
Return a list of all known locks
get_all_named_locks()
classmethod
Return a list of all known named locks
get_component_lock(comp)
classmethod
Retrieve the shared lock for a specific component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
comp
|
Component
|
Reference to the component |
required |
Returns:
Type | Description |
---|---|
Self
|
The shared lock instance |
get_named_lock(name)
classmethod
Retrieve a shared named lock (distinct from all component locks).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Name of the shared lock |
required |
Returns:
Type | Description |
---|---|
Self
|
The shared lock instance |
release(context)
Release a held lock only if the context matches the current lock holder.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context
|
SeqContext
|
Reference to the sequence context that previously claimed the lock |
required |
forastero.sequence.SeqProxy
Bases: EventEmitter
, Generic[C]
Wraps around a component to provide locking and masking functionality, this is achieved by mocking functionality of a component including intercepting and filtering events.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context
|
SeqContext
|
The sequence context associated to this proxy |
required |
component
|
C
|
The component to proxy |
required |
lock
|
SeqLock
|
The shared lock for the component |
required |
enqueue(*args, **kwds)
Forward an enqueue request through from the proxy to the wrapped driver.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args
|
Any
|
Arguments to forward |
()
|
*kwds
|
Any
|
Keyword arguments to forward |
{}
|
idle()
Forward idle through to the wrapped component
forastero.sequence.SeqArbiter
Arbitrates being queuing sequences to determine which sequences can start based on the locks they are requesting.
queue_for(context, locks)
async
Queue against the arbiter for a collection of locks. The arbiter will schedule the sequence only once all locks can be atomically acquired.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context
|
SeqContext
|
The sequence context queueing |
required |
locks
|
list[SeqLock]
|
The list of locks required |
required |
forastero.sequence.SeqContextEvent
Bases: Enum
forastero.sequence.SeqContext
A context specific to a given sequence invocation that provides logging, random value generation, and lock management.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sequence
|
BaseSequence
|
The sequence being invoked |
required |
log
|
SimLog
|
The root sequencing log (pre-fork) |
required |
random
|
Random
|
The root random instance (pre-fork) |
required |
lock(*lockables)
async
Atomically acquire one or more named or component locks (i.e. locks will only be claimed if all locks are available, otherwise it will wait until such a condition can be met).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*lockables
|
SeqLock | SeqProxy
|
References to named locks or sequence proxies (which will be resolved to the equivalent component locks) |
()
|
release(*lockables)
Release one or more named or component locks, allowing other sequences to move forward. Note that attempting to release a lock that the sequence doesn't hold will raise an exception.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*lockables
|
SeqLock | SeqProxy
|
References to named locks or sequence proxies (which will be resolved to the equivalent component locks) |
()
|
forastero.sequence.BaseSequence
Wraps around a sequencing function and services lock requests and other integration with the testbench.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn
|
Callable
|
The sequencing function being wrapped |
required |
__call__(**kwds)
Call the underlying sequence with any parameters that it requires, and launch prepare it to be launched within a managed context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**kwds
|
Any keyword arguments |
{}
|
Returns:
Type | Description |
---|---|
The wrapped coroutine |
add_lock(lock_name)
Define an arbitrary lock to support simple cross-sequence co-ordination over resources internal to the design (that are not well captured by a driver or monitor requirement).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lock_name
|
str
|
Name of the lock |
required |
Returns:
Type | Description |
---|---|
Self
|
Self to allow for simple chaining |
add_randarg(name, bit_width=None, range=None, choices=None)
Define an argument that can be randomised in a number of different ways. Only one method of randomisation may be specified.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name
|
str
|
Name of the argument |
required |
bit_width
|
int | None
|
Randomise over a given number of bits |
None
|
range
|
tuple[int | float, int | float] | None
|
Randomise over a given range (int or float) |
None
|
choices
|
tuple[Any] | None
|
Make a random selection from a list of choices |
None
|
Returns:
Type | Description |
---|---|
Self
|
Self to allow for simple chaining |
add_requirement(req_name, req_type)
Add a requirement on a driver/monitor that must be provided by the testbench for the sequence to execute.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
req_name
|
str
|
Name of the driver/monitor |
required |
req_type
|
Any
|
Type of the driver/monitor |
required |
Returns:
Type | Description |
---|---|
Self
|
Self to allow for simple chaining |
register(fn)
classmethod
Uniquely wrap a sequencing function inside a BaseSequence, returning the shared instance on future invocations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn
|
Callable | Self
|
The sequencing function |
required |
Returns:
Type | Description |
---|---|
Self
|
The wrapping BaseSequence instance |