Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,8 @@ __pycache__/
/.pdm-build
/.venv
/pdm.lock
/overrides.txt

# simulation
*.gtkw
*.vcd
2 changes: 1 addition & 1 deletion chipflow_digital_ip/io/_glasgow_i2c.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from amaranth import *
from amaranth import *
from amaranth.lib.cdc import FFSynchronizer


Expand Down
22 changes: 13 additions & 9 deletions chipflow_digital_ip/io/_glasgow_iostream.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from amaranth import *
from amaranth import *
from amaranth.lib import data, wiring, stream, io
from amaranth.lib.wiring import In, Out

from amaranth_types.types import ShapeLike


__all__ = ["IOStreamer", "PortGroup"]


class PortGroup:
"""Group of Amaranth library I/O ports.

Expand Down Expand Up @@ -118,7 +122,7 @@ class IOStreamer(wiring.Component):
"""

@staticmethod
def o_stream_signature(ioshape, /, *, ratio=1, meta_layout=0):
def o_stream_signature(ioshape, /, *, ratio=1, meta_layout: ShapeLike = 0):
return stream.Signature(data.StructLayout({
"port": _map_ioshape("o", ioshape, lambda width: data.StructLayout({
"o": width if ratio == 1 else data.ArrayLayout(width, ratio),
Expand All @@ -129,16 +133,16 @@ def o_stream_signature(ioshape, /, *, ratio=1, meta_layout=0):
}))

@staticmethod
def i_stream_signature(ioshape, /, *, ratio=1, meta_layout=0):
def i_stream_signature(ioshape, /, *, ratio=1, meta_layout: ShapeLike = 0):
return stream.Signature(data.StructLayout({
"port": _map_ioshape("i", ioshape, lambda width: data.StructLayout({
"i": width if ratio == 1 else data.ArrayLayout(width, ratio),
})),
"meta": meta_layout,
}))

def __init__(self, ioshape, ports, /, *, ratio=1, init=None, meta_layout=0):
assert isinstance(ioshape, (int, dict))
def __init__(self, ioshape: dict, ports, /, *, ratio=1, init=None, meta_layout: ShapeLike = 0):
assert isinstance(ioshape, dict)
assert ratio in (1, 2)

self._ioshape = ioshape
Expand All @@ -161,7 +165,7 @@ def elaborate(self, platform):
buffer_cls, latency = SimulatableDDRBuffer, 3

if isinstance(self._ports, io.PortLike):
m.submodules.buffer = buffer = buffer_cls("io", self._ports)
m.submodules.buffer = buffer = buffer_cls(io.Direction.Bidir, self._ports)
if isinstance(self._ports, PortGroup):
buffer = {}
for name, sub_port in self._ports.__dict__.items():
Expand Down Expand Up @@ -231,7 +235,7 @@ def delay(value, name):

class IOClocker(wiring.Component):
@staticmethod
def i_stream_signature(ioshape, /, *, _ratio=1, meta_layout=0):
def i_stream_signature(ioshape, /, *, _ratio=1, meta_layout: ShapeLike = 0):
# Currently the only supported ratio is 1, but this will change in the future for
# interfaces like HyperBus.
return stream.Signature(data.StructLayout({
Expand All @@ -245,10 +249,10 @@ def i_stream_signature(ioshape, /, *, _ratio=1, meta_layout=0):
}))

@staticmethod
def o_stream_signature(ioshape, /, *, ratio=1, meta_layout=0):
def o_stream_signature(ioshape, /, *, ratio=1, meta_layout: ShapeLike = 0):
return IOStreamer.o_stream_signature(ioshape, ratio=ratio, meta_layout=meta_layout)

def __init__(self, ioshape, *, clock, o_ratio=1, meta_layout=0, divisor_width=16):
def __init__(self, ioshape, *, clock, o_ratio=1, meta_layout: ShapeLike = 0, divisor_width=16):
assert isinstance(ioshape, dict)
assert isinstance(clock, str)
assert o_ratio in (1, 2)
Expand Down
1 change: 0 additions & 1 deletion chipflow_digital_ip/io/_gpio.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

from amaranth import Module, unsigned
from amaranth.lib import wiring
from amaranth.lib.wiring import In, Out, flipped, connect
Expand Down
43 changes: 26 additions & 17 deletions chipflow_digital_ip/io/_rfc_uart.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,26 @@
The Amaranth SoC RFC UART from https://github.com/ChipFlow/chipflow-digital-ip
"""

from typing import Generic, TypeVar

from amaranth import *
from amaranth.lib import stream, wiring
from amaranth.lib.wiring import In, Out, flipped, connect
from amaranth.hdl import ValueCastable

from amaranth_types.types import HasElaborate, ShapeLike, ValueLike

from amaranth_soc import csr


__all__ = ["RxPhySignature", "TxPhySignature", "RxPeripheral", "TxPeripheral", "Peripheral"]

_T_ValueOrValueCastable = TypeVar("_T_ValueOrValueCastable", bound=Value | ValueCastable, covariant=True)
_T_ShapeLike = TypeVar("_T_ShapeLike", bound=ShapeLike, covariant=True)
_T_Symbol_ShapeLike = TypeVar("_T_Symbol_ShapeLike", bound=ShapeLike, covariant=True)


class RxPhySignature(wiring.Signature):
class RxPhySignature(wiring.Signature, Generic[_T_ShapeLike, _T_Symbol_ShapeLike]):
"""Receiver PHY signature.

Parameters
Expand All @@ -38,7 +47,7 @@ class RxPhySignature(wiring.Signature):
Receiver error flag. Pulsed for one clock cycle in case of an implementation-specific error
(e.g. wrong parity bit).
"""
def __init__(self, phy_config_shape, symbol_shape):
def __init__(self, phy_config_shape: _T_ShapeLike, symbol_shape: _T_Symbol_ShapeLike):
super().__init__({
"rst": Out(1),
"config": Out(phy_config_shape),
Expand All @@ -48,7 +57,7 @@ def __init__(self, phy_config_shape, symbol_shape):
})


class TxPhySignature(wiring.Signature):
class TxPhySignature(wiring.Signature, Generic[_T_ShapeLike, _T_Symbol_ShapeLike]):
"""Transmitter PHY signature.

Parameters
Expand All @@ -68,7 +77,7 @@ class TxPhySignature(wiring.Signature):
symbols : :py:`Out(stream.Signature(symbol_shape))`
Symbol stream. The shape of its payload is given by the `symbol_shape` parameter.
"""
def __init__(self, phy_config_shape, symbol_shape):
def __init__(self, phy_config_shape: _T_ShapeLike, symbol_shape: _T_Symbol_ShapeLike):
super().__init__({
"rst": Out(1),
"config": Out(phy_config_shape),
Expand Down Expand Up @@ -98,7 +107,7 @@ def elaborate(self, platform):
return m


class RxPeripheral(wiring.Component):
class RxPeripheral(wiring.Component, Generic[_T_ShapeLike, _T_ValueOrValueCastable, _T_Symbol_ShapeLike]):
class Config(csr.Register, access="rw"):
"""Peripheral configuration register.

Expand Down Expand Up @@ -141,7 +150,7 @@ class PhyConfig(csr.Register, access="rw"):
phy_config_init : :class:`int`
Initial value of the PHY configuration word.
"""
def __init__(self, phy_config_shape, phy_config_init):
def __init__(self, phy_config_shape: _T_ShapeLike, phy_config_init: _T_ValueOrValueCastable):
super().__init__(csr.Field(_PhyConfigFieldAction, phy_config_shape,
init=phy_config_init))

Expand Down Expand Up @@ -199,7 +208,7 @@ class Data(csr.Register, access="r"):
symbol_shape : :ref:`shape-like <lang-shapelike>`
Shape of a symbol.
"""
def __init__(self, symbol_shape):
def __init__(self, symbol_shape: _T_Symbol_ShapeLike):
super().__init__(csr.Field(csr.action.R, symbol_shape))

"""UART receiver peripheral.
Expand All @@ -224,8 +233,8 @@ def __init__(self, symbol_shape):
phy : :py:`Out(RxPhySignature(phy_config_shape, symbol_shape))`
Interface between the peripheral and its PHY.
"""
def __init__(self, *, addr_width, data_width, phy_config_shape=unsigned(16),
phy_config_init=0, symbol_shape=unsigned(8)):
def __init__(self, *, addr_width, data_width, phy_config_shape:_T_ShapeLike = unsigned(16),
phy_config_init: _T_ValueOrValueCastable = Value.cast(0), symbol_shape: _T_Symbol_ShapeLike = unsigned(8)):
regs = csr.Builder(addr_width=addr_width, data_width=data_width)

self._config = regs.add("Config", self.Config())
Expand Down Expand Up @@ -298,7 +307,7 @@ def elaborate(self, platform):
return m


class TxPeripheral(wiring.Component):
class TxPeripheral(wiring.Component, Generic[_T_ShapeLike, _T_Symbol_ShapeLike, _T_ValueOrValueCastable]):
class Config(csr.Register, access="rw"):
"""Peripheral configuration register.

Expand Down Expand Up @@ -341,7 +350,7 @@ class PhyConfig(csr.Register, access="rw"):
phy_config_init : :class:`int`
Initial value of the PHY configuration word.
"""
def __init__(self, phy_config_shape, phy_config_init):
def __init__(self, phy_config_shape: _T_ShapeLike, phy_config_init: _T_ValueOrValueCastable):
super().__init__(csr.Field(_PhyConfigFieldAction, phy_config_shape,
init=phy_config_init))

Expand Down Expand Up @@ -391,7 +400,7 @@ class Data(csr.Register, access="w"):
symbol_shape : :ref:`shape-like <lang-shapelike>`
Shape of a symbol.
"""
def __init__(self, symbol_shape):
def __init__(self, symbol_shape: _T_Symbol_ShapeLike):
super().__init__(csr.Field(csr.action.W, symbol_shape))

"""UART transmitter peripheral.
Expand All @@ -416,8 +425,8 @@ def __init__(self, symbol_shape):
phy : :py:`Out(TxPhySignature(phy_config_shape, symbol_shape))`
Interface between the peripheral and its PHY.
"""
def __init__(self, *, addr_width, data_width=8, phy_config_shape=unsigned(16),
phy_config_init=0, symbol_shape=unsigned(8)):
def __init__(self, *, addr_width, data_width=8, phy_config_shape: _T_ShapeLike = unsigned(16),
phy_config_init: _T_ValueOrValueCastable = Value.cast(0), symbol_shape: _T_Symbol_ShapeLike = unsigned(8)):
regs = csr.Builder(addr_width=addr_width, data_width=data_width)

self._config = regs.add("Config", self.Config())
Expand Down Expand Up @@ -487,7 +496,7 @@ def elaborate(self, platform):
return m


class Peripheral(wiring.Component):
class Peripheral(wiring.Component, Generic[_T_ShapeLike, _T_Symbol_ShapeLike, _T_ValueOrValueCastable]):
"""UART transceiver peripheral.

This peripheral is composed of two subordinate peripherals. A :class:`RxPeripheral` occupies
Expand Down Expand Up @@ -522,8 +531,8 @@ class Peripheral(wiring.Component):
:exc:`TypeError`
If ``addr_width`` is not a positive integer.
"""
def __init__(self, *, addr_width, data_width=8, phy_config_shape=unsigned(16),
phy_config_init=0, symbol_shape=unsigned(8)):
def __init__(self, *, addr_width, data_width=8, phy_config_shape: _T_ShapeLike = unsigned(16),
phy_config_init: _T_ValueOrValueCastable = Value.cast(0), symbol_shape: _T_Symbol_ShapeLike = unsigned(8)):
if not isinstance(addr_width, int) or addr_width <= 0:
raise TypeError(f"Address width must be a positive integer, not {addr_width!r}")

Expand Down
2 changes: 1 addition & 1 deletion chipflow_digital_ip/io/_spi.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from amaranth import Module, Signal, Cat, C, unsigned
from amaranth import *
from amaranth.lib import wiring
from amaranth.lib.wiring import In, Out, connect, flipped

Expand Down
124 changes: 124 additions & 0 deletions chipflow_digital_ip/io/_svtest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import os
import sys
import tomli

from enum import StrEnum, auto
from pathlib import Path
from typing import Dict, Optional, List, Literal, Self

from amaranth.lib import wiring

from pydantic import (
BaseModel, ImportString, JsonValue, ValidationError,
model_validator
)

from chipflow import ChipFlowError


class Files(BaseModel):
module: Optional[ImportString] = None
path: Optional[Path] = None

@model_validator(mode="after")
def verify_module_or_path(self) -> Self:
print(self.module)
print(self.path)
if (self.module and self.path) or (not self.module and not self.path):
raise ValueError("You must set `module` or `path`.")
return self


class GenerateSpinalHDL(BaseModel):

scala_class: str
options: List[str] = []

def generate(self, source_path: Path, dest_path: Path, name: str, parameters: Dict[str, JsonValue]):
gen_args = [o.format(**parameters) for o in self.options]
path = source_path / "ext" / "SpinalHDL"
args=" ".join(gen_args + [f'--netlist-directory={dest_path.absolute()}', f'--netlist-name={name}'])
cmd = f'cd {path} && sbt -J--enable-native-access=ALL-UNNAMED -v "lib/runMain {self.scala_class} {args}"'
os.environ["GRADLE_OPTS"] = "--enable-native-access=ALL-UNNAMED"
print("!!! " + cmd)
if os.system(cmd) != 0:
raise OSError('Failed to run sbt')
return [f'{name}.v']


class Generators(StrEnum):
SPINALHDL = auto()
VERILOG = auto()


class Generate(BaseModel):
parameters: Optional[Dict[str, JsonValue]] = None
generator: Generators
spinalhdl: Optional[GenerateSpinalHDL] = None


class Port(BaseModel):
interface: str # ImportString
params: Optional[Dict[str, JsonValue]] = None
vars: Optional[Dict[str, Literal["int"]]] = None
map: str | Dict[str, Dict[str, str] | str]


class ExternalWrap(BaseModel):
name: str
files: Files
generate: Optional[Generate] = None
clocks: Dict[str, str] = {}
resets: Dict[str, str] = {}
ports: Dict[str,Port] = {}
pins: Dict[str, Port] = {}


if __name__ == "__main__":
with open(sys.argv[1], "rb") as f:
wrapper = tomli.load(f)

try:
# Validate with Pydantic
wrap = ExternalWrap.model_validate(wrapper) # Valiate
print(wrap)

source = Path()
if wrap.files.module:
source = Path(wrap.files.module.data_location)
elif wrap.files.path:
source = wrap.files.path
else:
assert True

if wrap.generate:
dest = Path("./build/verilog")
dest.mkdir(parents=True, exist_ok=True)
files = getattr(wrap.generate, wrap.generate.generator.value).generate(source, Path(dest), wrap.name, wrap.generate.parameters)
print(f'Generated files: {files}')

def init(self, **kwargs):
for name, value in kwargs.items():
setattr(self, name, value)

attr = {
'__init__': init
}
_class = type(wrap.name, (wiring.Component,), attr)




except ValidationError as e:
# Format Pydantic validation errors in a user-friendly way
error_messages = []
for error in e.errors():
location = ".".join(str(loc) for loc in error["loc"])
message = error["msg"]
error_messages.append(f"Error at '{location}': {message}")

error_str = "\n".join(error_messages)
raise ChipFlowError(f"Validation error in chipflow.toml:\n{error_str}")



Loading