from collections import deque
from copy import deepcopy
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union
import sinabs
from .dvs_layer import DVSLayer
from .dynapcnn_layer import DynapcnnLayer
from .exceptions import InvalidModel
[docs]
@dataclass
class LayerConstraints:
    """Memory limits that a layer must respect to be placed on a chip layer.

    Each field is an inclusive upper bound for the entry of the same kind in
    the dictionary returned by ``DynapcnnLayer.memory_summary()``.
    """

    # Upper bound for the "kernel" entry of a layer's memory summary.
    kernel_memory: int
    # Upper bound for the "neuron" entry of a layer's memory summary.
    neuron_memory: int
    # Upper bound for the "bias" entry of a layer's memory summary.
    bias_memory: int

    def fits(self, layer: DynapcnnLayer) -> bool:
        """Check whether a layer stays within all three memory limits.

        Args:
            layer: DynapcnnLayer whose ``memory_summary()`` is inspected.

        Returns:
            True if the "kernel", "neuron" and "bias" requirements each lie
            between 0 and the corresponding limit (both inclusive).
        """
        required = layer.memory_summary()
        budgets = (
            ("kernel", self.kernel_memory),
            ("neuron", self.neuron_memory),
            ("bias", self.bias_memory),
        )
        # Evaluated lazily: stops at the first requirement that does not fit.
        return all(0 <= required[kind] <= limit for kind, limit in budgets)
[docs]
def find_chip_layers(
    layer: DynapcnnLayer, constraints: List[LayerConstraints]
) -> List[int]:
    """Collect the indices of all chip layers that can hold a given layer.

    Args:
        layer: DynapcnnLayer to be placed.
        constraints: Constraints to test against. The position of an entry in
            this list is the index that gets reported.

    Returns:
        Indices, in ascending order, of the constraints that `layer` fits.
    """
    matching = []
    for position, limits in enumerate(constraints):
        if limits.fits(layer):
            matching.append(position)
    return matching
[docs]
def get_valid_mapping(
    layers: Dict[int, DynapcnnLayer], constraints: List[LayerConstraints]
) -> Dict[int, int]:
    """Assign every DynapcnnLayer of a model to a distinct, compatible core.

    The assignment is obtained by building a flow graph from the per-layer
    candidate cores and computing its flow with `edmonds`.

    Args:
        layers: Dict with layer indices as keys and DynapcnnLayer (or DVSLayer)
            instances as values. DVSLayer entries are ignored.
        constraints: Constraints of the available cores; list position is the
            core ID.

    Returns:
        Dict mapping from layer index (key) to assigned core ID (value).

    Raises:
        ValueError: If a value in `layers` is neither a DynapcnnLayer nor a
            DVSLayer, or (raised by `recover_mapping`) if not every layer
            could be assigned to a core.
    """
    # Parallel lists: layer index and the cores that layer would fit on.
    layer_indices = []
    candidate_cores = []
    for layer_index, this_layer in layers.items():
        if isinstance(this_layer, DynapcnnLayer):
            fitting = find_chip_layers(this_layer, constraints)
            candidate_cores.append(fitting)
            layer_indices.append(layer_index)
            continue
        # DVSLayers need no core; anything else is not supported.
        if not isinstance(this_layer, DVSLayer):
            raise ValueError(f"Found unexpected layer type: `{type(this_layer)}")

    flow_graph = make_flow_graph(candidate_cores, len(constraints))
    # Source is the first node of the graph, sink the last one.
    source_node = 0
    sink_node = len(flow_graph) - 1
    solved_graph = edmonds(flow_graph, source_node, sink_node)
    assigned_cores = recover_mapping(solved_graph, len(candidate_cores))

    # `assigned_cores` is ordered like `layer_indices`.
    return dict(zip(layer_indices, assigned_cores))
[docs]
@dataclass
class FlowGraphEdge:
    """Directed edge of a flow graph, optionally paired with its reverse edge."""

    # Index of the node the edge starts from.
    s: int
    # Index of the node the edge points to.
    t: int
    # Capacity of the edge.
    cap: int
    # Flow currently routed through the edge.
    flow: int = 0
    # Edge running in the opposite direction (from `t` to `s`), if linked.
    rev: Optional["FlowGraphEdge"] = None

    def __repr__(self):
        # Deliberately leaves out `rev`, which refers back to this edge.
        description = f"FlowGraphEdge from {self.s} to {self.t} with capacity {self.cap} and flow {self.flow}"
        return description
[docs]
def edmonds(
    graph: List[List[FlowGraphEdge]], source: int, sink: int, verbose: bool = False
) -> List[List[FlowGraphEdge]]:
    """Compute the flow of a flow graph via breadth-first augmenting paths.

    Works on a deep copy of `graph`; the graph passed in is left untouched.

    Args:
        graph (List[List[FlowGraphEdge]]): Flow graph representation. Each
            list entry corresponds to a node and holds the outgoing edges of
            that node. Every edge on an augmenting path needs its `rev` set.
        source (int): Index of source node within graph.
        sink (int): Index of sink node within graph.
        verbose (bool): Print a message whenever a tighter bottleneck is found
            along an augmenting path if `True`.

    Returns:
        New flow graph with calculated flow. Type is List[List[FlowGraphEdge]].
    """
    residual = deepcopy(graph)
    node_count = len(residual)

    while True:
        # Breadth-first search. `reached_by[n]` is the edge through which node
        # `n` was first reached; it stays None for the source.
        reached_by = [None] * node_count
        frontier = deque([source])
        while frontier:
            node = frontier.popleft()
            for candidate in residual[node]:
                target = candidate.t
                if reached_by[target] is not None or target == source:
                    continue
                if not candidate.cap > candidate.flow:
                    # Edge is saturated.
                    continue
                reached_by[target] = candidate
                frontier.append(target)

        if reached_by[sink] is None:
            # Sink cannot be reached any more: the flow cannot be increased.
            break

        # Walk back from the sink to the source to collect the path's edges.
        path = []
        step = reached_by[sink]
        while step is not None:
            path.append(step)
            step = reached_by[step.s]

        # The smallest remaining capacity on the path limits the extra flow.
        delta_flow = float("inf")
        for edge in path:
            remaining = edge.cap - edge.flow
            if remaining < delta_flow:
                delta_flow = remaining
                if verbose:
                    print(f"found new min flow {delta_flow} on edge {edge}")

        # Push the flow along the path and cancel it on the reverse edges.
        for edge in path:
            edge.flow += delta_flow
            edge.rev.flow -= delta_flow

    return residual
[docs]
def make_flow_graph(
    layer_mapping: List[List[int]], num_layers: int = 9
) -> List[List[FlowGraphEdge]]:
    """Build a bipartite flow graph from the candidate chip layers of each layer.

    The mapping from DynapCNNLayer instances to chip layers is formulated as a
    bipartite matching problem. Nodes are laid out as
    ``[source, software layers..., chip layers..., sink]``. Flows are not
    computed here; call `edmonds` on the returned graph to populate them.

    Args:
        layer_mapping: List of a list of matching chip core indices for each
            DynapCNNLayer instance. Eg. [[1,3], [4, 6, 1]] for a two layer
            model, where each integer is a core index.
        num_layers (int): Number of layers on the chip.

    Returns:
        Flow graph representation. Each list entry corresponds to a node and
        consists of a list holding the outgoing edges from this node.
        The returned object is of type List[List[FlowGraphEdge]].
    """
    num_software_layers = len(layer_mapping)

    # One source node plus one node per layer to be mapped ...
    graph = [[] for _ in range(1 + num_software_layers)]
    # ... one node per chip layer ...
    graph.extend([] for _ in range(num_layers))
    # ... and one sink node.
    graph.append([])

    source = 0
    sink = len(graph) - 1
    # Index of the node that represents chip layer 0.
    target_offset = num_software_layers + 1

    def connect(start: int, end: int) -> None:
        """Add a capacity-1 edge `start -> end` and its capacity-0 reverse edge."""
        forward = FlowGraphEdge(s=start, t=end, cap=1, flow=0)
        backward = FlowGraphEdge(s=end, t=start, cap=0, flow=0)
        # Both edges must reference each other: `edmonds` updates the flow of
        # `edge.rev` whenever it pushes flow through `edge`.
        forward.rev = backward
        backward.rev = forward
        graph[start].append(forward)
        graph[end].append(backward)

    # first from source to all layers
    for layer_node in range(1, target_offset):
        connect(source, layer_node)

    # then from layers to chip layers
    for layer_node, layer_targets in enumerate(layer_mapping, start=1):
        for target in layer_targets:
            connect(layer_node, target + target_offset)

    # then from chip layers to sink
    for chip_node in range(target_offset, sink):
        connect(chip_node, sink)

    return graph
[docs]
def recover_mapping(graph: List[List[FlowGraphEdge]], num_layers: int) -> List[int]:
    """Read the layer-to-core assignment out of a solved flow graph.

    Args:
        graph (List[List[FlowGraphEdge]]): Flow graph representation with flow
            calculated. Each list entry corresponds to a node and consists of
            a list holding the outgoing edges from this node.
        num_layers (int): Number of software layers.

    Returns:
        Assigned core IDs for each layer in order. Type is List[int].

    Raises:
        ValueError: If the number of recovered assignments differs from
            `num_layers`.
    """
    # Nodes 1..num_layers are the software layers (node 0 is the source), so
    # the node of core 0 sits at index `num_layers + 1`.
    first_core_node = num_layers + 1
    assigned = [
        edge.t - first_core_node
        for layer_node in range(1, first_core_node)
        for edge in graph[layer_node]
        if edge.flow == 1
    ]
    if len(assigned) == num_layers:
        return assigned
    raise ValueError(
        "No valid mapping found. One or more of the DynapcnnLayers could not be mapped to any core."
        "For Speck family you can verify if it is a memory issue by using `utils.validate_memory_mapping_speck()` to get more information."
    )