Source code for qnet.qhdl.qhdl

# coding=utf-8
#This file is part of QNET.
#
#    QNET is free software: you can redistribute it and/or modify
#    it under the terms of the GNU General Public License as published by
#    the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    QNET is distributed in the hope that it will be useful,
#    but WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#    GNU General Public License for more details.
#
#    You should have received a copy of the GNU General Public License
#    along with QNET.  If not, see <http://www.gnu.org/licenses/>.
#
# Copyright (C) 2012-2013, Nikolas Tezak
#
###########################################################################

"""
This module contains the code to convert a circuit specified in QHDL into a Gough-James circuit expression.

The other module in this package :py:mod:`qhdl_parser` implements an actual parser for the qhdl source text, while this file then
converts structured netlist information into a circuit expression.

For more details on the QHDL syntax, see :ref:`qhdl_syntax`.
"""

from collections import OrderedDict
from qnet.circuit_components.library import getCDIM




[docs]def my_debug(msg): pass
# print(msg)
[docs]class QHDLError(Exception): pass
[docs]class QHDLObject(object):
[docs] def to_python(self): return self.__repr__()
[docs] def to_qhdl(self): raise NotImplementedError
def __repr__(self): raise NotImplementedError
PORT_DIRECTIONS = ('in', 'out') SIGNAL_TYPES = ('fieldmode') GENERIC_TYPES = ('int', 'real', 'complex')
[docs]def gtype_compatible(c_t, g_t): return GENERIC_TYPES.index(c_t) >= GENERIC_TYPES.index(g_t)
[docs]class BasicInterface(QHDLObject): def __repr__(self): return "%s('%s', %r, %r)" % (self.__class__.__name__, self.identifier, self.generics, self.ports)
[docs] def to_qhdl(self,tab_level): pass
[docs] def generics_to_qhdl(self, tab_level): if len(self.generics) == 0: return "" default_formatter_by_type = {float: (lambda v: '%g' % v), int: (lambda v: '%d' % v), long: (lambda v: '%d' % v), complex:(lambda v: "(%g, %g)" % (v.real, v.imag))} default_str = lambda default: " := %s" % default_formatter_by_type[type(default)](default) \ if default else "" g_str = "generic (%s);\n" % (";\n ".join(["%s: %s%s" % (identifier, gtype, default_str(default)) \ for identifier,(gtype, default) in self.generics.items()])) return (tab_level*"\t") + g_str
[docs] def ports_to_qhdl(self, tab_level): in_str = "%s: in fieldmode" % ",\n ".join(self.port_identifiers[ : self.cdim/2]) out_str = "%s: out fieldmode" % ",\n ".join(self.port_identifiers[self.cdim/2 : ]) p_str = "port (%s;\n %s);" % (in_str, out_str) return (tab_level*"\t") + p_str
# @property # def in_port_identifiers(self): # return self.in_ports.keys() # # @property # def out_port_identifiers(self): # return self.out_ports.keys() # cid = 0 in_port_identifiers = [] out_port_identifiers = [] inout_port_identifiers = [] def __init__(self, identifier, generics, ports): self.identifier = identifier self.cid = 0 self.generics = OrderedDict() self.in_port_identifiers = [] self.out_port_identifiers = [] self.ports = OrderedDict() self.inout_port_identifiers = [] for (identifier_list, gtype, default_val) in generics: if gtype not in GENERIC_TYPES: raise QHDLError("Generic type of generics %s is %s, but must be one of the following: %r" \ % (", ".join(identifier_list), gtype, GENERIC_TYPES)) for gid in identifier_list: if gid in self.generics: raise QHDLError("Generic identifier non-unique: %s" % identifier) self.generics[gid] = gtype, default_val # # rewrite the port list and replace inout ports by a separate in and out port # for (identifier_list, direction, signal_type) in list(ports): # if direction == "inout": # self.inout_port_names += identifier_list # identifier_list_i = [identifier + "_i" for identifier in identifier_list] # identifier_list_o = [identifier + "_o" for identifier in identifier_list] # ports = [(identifier_list_i, 'in', signal_type), (identifier_list_o, 'out', signal_type)] # for (identifier_list, direction, signal_type) in ports: if direction not in ('in', 'out','inout'): raise QHDLError(str((identifier_list, direction, signal_type))) if signal_type != "fieldmode": raise QHDLError("We currently only support signals of type 'fieldmode': " + str(identifier_list)) for pid in identifier_list: if pid in self.generics: raise QHDLError("There already exists a generic with the same identifier : %s" % pid) if pid in self.ports: raise QHDLError("Port identifier non-unique: %s" % pid) if direction == 'inout': self.inout_port_identifiers.append(pid) elif direction == 'in': self.in_port_identifiers.append(pid) else: # direction == 'out' self.out_port_identifiers.append(pid) self.ports[pid] = direction, signal_type # if not len(self.in_ports) == len(self.out_ports): # raise QHDLError('The numbers of input and output channels do not match.') # self._port_identifiers = tuple(port_identifiers) # self._generic_identifiers = tuple(generic_identifiers) # @property # def cdim(self): # return len(self.in_ports) # @property def port_identifiers(self): """The port_identifiers property.""" return self.inout_port_identifiers + self.in_port_identifiers + self.out_port_identifiers # # pids = port_identifiers @property def generic_identifiers(self): """The generic_identifiers property.""" return self.generics.keys() gids = generic_identifiers
# def __getitem__(self, key): # if key in self.ports: # return self.ports[key] # elif key in self.generics: # return self.generics[key] # else: # raise KeyError(str(key)) # # def get(self, key, default_val = None): # try: # return self[key] # except KeyError: # return default_val # # def has_generic(self, gid): # return gid in self.generics # # def has_port(self, pid): # return pid in self.ports
[docs]class Entity(BasicInterface):
[docs] def to_qhdl(self, tab_level = 0): ret_str = """entity %s is %s%s end entity %s;""" % (self.identifier, self.generics_to_qhdl(tab_level), self.ports_to_qhdl(tab_level), self.identifier) return (tab_level*"\t") + ret_str
[docs]class Component(BasicInterface):
[docs] def to_qhdl(self, tab_level = 0): ret_str = """component %s %s%s end component %s;""" % (self.identifier, self.generics_to_qhdl(tab_level), self.ports_to_qhdl(tab_level), self.identifier) return (tab_level*"\t") + ret_str
[docs]def dict_keys_sorted_by_val(dd): return sorted(dd.keys(), key = dd.get)
[docs]class Architecture(QHDLObject): global_inout = {} global_out = {} global_in = {} inout_to_signal = {} out_to_signal = {} in_to_signal = {} signal_to_global_in = {} signal_to_global_out = {} signals = [] lossy_signals = [] def __init__(self, identifier, entity, components, signals, assignments, global_assignments = {}): self.identifier = identifier self.entity = entity self.components = OrderedDict() self.signals = [] self.lossy_signals = [] self.instance_assignments = OrderedDict() self._circuit_data = False # lookuptables format # (instance_name, instance_port_name) # => ((target_instance_name|'entity'), port_name, port_id, (connecting_signal_name|None)) # mediated_inport_map = {} # mediated_outport_map = {} self.global_inout = {} self.global_out = {} self.global_in = {} self.inout_to_signal = {} self.out_to_signal = {} self.in_to_signal = {} self.signal_to_global_in = {} self.signal_to_global_out = {} #process components for component in components: #check for duplicate component identifiers if component.identifier in self.components: raise QHDLError('Component ID non-unique: %s' % component.identifier) component.cdim = getCDIM(component.identifier) self.components[component.identifier] = component #process signals for signal_ids, signal_type in signals: if signal_type not in ('fieldmode','lossy_fieldmode'): raise QHDLError("Currently only fieldmode and lossy_fieldmode are accepted as signal types: \n %s : %s" % (", ".join(signal_ids), signal_type)) for sid in signal_ids: #check if signal identifier coincides with entity port name if sid in entity.port_identifiers: raise QHDLError('Signal identifier already used as an entity port identifier: %s' % sid) #check for duplicate signal identifiers if (sid in self.signals) or (sid in self.lossy_signals): raise QHDLError('Signal identifier non-unique: %s' % sid) #every signal can only connect two ports of component instances, #one in-port and one out-port if signal_type == 'fieldmode': self.signals.append(sid) else: self.lossy_signals.append(sid) #process instance definitions, assignments for instance_name, component_name, generic_map, port_map in assignments: #check for duplicate instance identifier if instance_name in self.instance_assignments: raise QHDLError('Instance name non-unique: %s' % instance_name) #check if referenced component (interface) exists component = self.components.get(component_name, False) if not component: raise QHDLError('Bad component identifier: %s' % component_name) #convert generic map (a,b,c,...) into explicit generic map (q=>a, r=>b,...) #based on the generics of the component if not isinstance(generic_map, dict): #number of assignments must match number of generics in component if not len(generic_map) == len(component.generic_identifiers): raise QHDLError('All generics of component %s must be assigned' % component.identifier) generic_map = dict(zip(component.generic_identifiers, generic_map)) # if no generic map statement but the component definition has some generics defined elif generic_map == {} and len(component.generics) > 0: # assert that there are default values for all generics for gname, (_, default) in component.generics.items(): if default is None: raise QHDLError(('No default value defined for generic %s of component %s,\n' % (gname, component.identifier)) + 'either define this or add a generic map statement to the instance assigment of %s' % (instance_name,)) #if generic map is already in explicit form, assert that the #assigned q,r,... coincide those defined in the component elif sorted(generic_map.keys()) != sorted(component.generics.keys()): raise QHDLError('All generics of component %s must be assigned' % component.identifier) #make sure referenced a,b,c,... exist as generics in the entity for name_in_c, name_in_e in generic_map.items(): is_number = isinstance(name_in_e, (int, float, complex)) if not is_number: entity_g = entity.generics.get(name_in_e, False) if not entity_g: raise QHDLError('Entity %s does not define a generic of name %s and it cannot be parsed to a numeric value.' % (entity.identifier, name_in_e)) component_g = component.generics.get(name_in_c, False) #probably redundant if not component_g: raise QHDLError('Component %s does not define a generic of name %s' % (component.identifier, name_in_c)) #check that generics have compatible type #e.g. component_generic_type == real, entity_generic_type == int is okay #because an int may be cast to a real, however the other way around does not always work! if not gtype_compatible(component_g[0], entity_g[0]) : raise QHDLError('Mapped generics are of incompatible type.' \ + 'Change the generic\'s type in the entity to %s' % component_g[0]) # TODO rewrite port map based on whether a port is inout #convert port map (a,b,c,...) into port map (q=>a, r=>b,...) #based on the ports of the component if not isinstance(port_map, dict): if len(port_map) != len(component.port_identifiers): raise QHDLError('All ports of instance %s(%s) must be mapped' % (instance_name, component.identifier)) port_map = dict(zip(component.port_identifiers, port_map)) #if port map is already in explicit form, assert that the #assigned q,r,... coincide those defined in the component # elif sorted(port_map.keys()) != sorted(component.ports.keys()): # raise QHDLError('All ports of instance %s(%s) must be mapped' % (instance_name, component.identifier)) #create lookup tables for name_in_c, name_in_e in port_map.items(): #any referenced a,b,c,... must either exist #as a signal in the architecture or a port of the entity entity_p = entity.ports.get(name_in_e, False) try: self.signals.index(name_in_e) signal = name_in_e except ValueError: try: self.lossy_signals.index(name_in_e) signal = name_in_e except ValueError: signal = False if not entity_p and not signal: if name_in_e == 'OPEN': continue else: print(self.signals, self.lossy_signals) raise QHDLError('The entity %s does not define a port\ and the architecture %s \ does not any define any signal of \ name %s ' % (entity.identifier, self.identifier, name_in_e)) if signal and entity_p: raise QHDLError("Duplicate name for a signal and an entity port: %s" % name_in_e) component_p = component.ports.get(name_in_c, False) #probably redundant if not component_p: raise QHDLError('Component %s does not define a port of name %s' % (component.identifier, name_in_c)) c_dir, _ = component_p if entity_p: e_dir, _ = entity_p if c_dir == "inout": raise QHDLError('Component inout port %s.%s must be connected to signal' % (component.identifier, name_in_c)) if not e_dir == c_dir: raise QHDLError('Component port %s.%s must be connected to entity port of same direction' % (component.identifier, name_in_c)) # if c_dir == "inout": # self.global_inout[(instance_name, name_in_c)] = name_in_e # if c_dir == "in": self.global_in[(instance_name, name_in_c)] = name_in_e else: assert c_dir == "out" self.global_out[(instance_name, name_in_c)] = name_in_e else: if c_dir == "inout": self.inout_to_signal[(instance_name, name_in_c)] = name_in_e elif c_dir == "in": self.in_to_signal[(instance_name, name_in_c)] = name_in_e else: assert c_dir == "out" self.out_to_signal[(instance_name, name_in_c)] = name_in_e self.instance_assignments[instance_name] = component, generic_map, port_map for source_id, target_id in global_assignments.items(): # TODO: handle signals to INOUT ports if target_id in self.entity.out_port_identifiers: if not source_id in self.signals: raise QHDLError('Global Out-Ports may only be assigned to signals') self.signal_to_global_out[source_id] = target_id elif target_id in self.signals: if not source_id in self.entity.in_port_identifiers: raise QHDLError('Signals can only be assigned to global In-Ports') self.signal_to_global_in[target_id] = source_id else: raise QHDLError('Global Assignment Error: %s => %s' % (source_id, target_id))
[docs] def to_circuit(self, identifier_postfix = ''): """ Compute a circuit algebra expression from the QHDL code and return the circuit expression, the all_symbols appearing in it and the component instance assignments """ if self._circuit_data: return self._circuit_data from qnet.algebra import circuit_algebra as ca # initialize trivial circuit circuit = ca.cid(0) II = [] OO = [] if len(self.lossy_signals): if not self.components.get('Beamsplitter', False): self.components['Beamsplitter'] = Component('Beamsplitter', [('theta','real')],[(['In1','In2'],'in','fieldmode'),(['Out1','Out2'],'out','fieldmode')]) self.components['Beamsplitter'].cdim = 2 OPEN = object() #create all_symbols for all instances circuit_symbols = {} for (instance_name, (component, _, _)) in self.instance_assignments.items(): QQ = ca.CircuitSymbol(instance_name + identifier_postfix, component.cdim) circuit_symbols[instance_name] = QQ assert component.inout_port_identifiers == [] circuit = circuit + QQ # II = II + [(instance_name, port_name + "_i" ) for port_name in component.inout_port_identifiers] II = II + [(instance_name, port_name) for port_name in component.in_port_identifiers] # OO = OO + [(instance_name, port_name + "_o") for port_name in component.inout_port_identifiers] OO = OO + [(instance_name, port_name) for port_name in component.out_port_identifiers] if len(component.in_port_identifiers) + len(component.inout_port_identifiers) < component.cdim: II = II + [(OPEN,OPEN)] * ( component.cdim - len(component.in_port_identifiers) - len(component.inout_port_identifiers)) if len(component.out_port_identifiers) < component.cdim: OO = OO + [(OPEN,OPEN)] * ( component.cdim - len(component.out_port_identifiers) - len(component.inout_port_identifiers)) # Add loss-beamsplitters for k, s in enumerate(self.lossy_signals): # while "LSS%s_%d%s" % (s, j, identifier_postfix) in circuit_symbols: # j += 1 LBS = ca.CircuitSymbol("LSS_%s%s" % (s,identifier_postfix), 2) circuit = circuit + LBS II = II + [('LSS_%s' % s, 'In1'),(OPEN, OPEN)] OO = OO + [('LSS_%s' % s, 'Out1'),(OPEN, OPEN)] self.signals.append(s+"__from_loss") self.signals.append(s) # modify assignment of original component that leads into signal try: # exploit enforced order of dictionaries jj = list(self.in_to_signal.values()).index(s) ipnames = list(self.in_to_signal.keys())[jj] assert self.in_to_signal[ipnames] == s self.in_to_signal[ipnames] = s + "__from_loss" except ValueError: jj = list(self.global_out.values()).index(s) ipnames = list(self.global_out.keys())[jj] assert self.global_out[ipnames] == s self.global_out[ipnames] = s + "__from_loss" # Update lookup tables self.in_to_signal[('LSS_%s' % s, 'In1')] = s self.out_to_signal[('LSS_%s' % s, 'Out1')] = s + "__from_loss" # Create artificial instance assignment self.instance_assignments['LSS_%s' % s] = self.components['Beamsplitter'], {'theta': 'theta_LS%d' % k},{"In1": s, "Out1": s + "__from_loss"} circuit_symbols['LSS_%s' % s] = LBS self.entity.generics['theta_LS%d' % k] = "real", None assert circuit.cdim == len(OO) == len(II) SS = list(self.signals) # Add signals as passthru lines below rest circuit = circuit + ca.cid(len(SS)) # print(circuit) # Do feedback from instance output to signals SSp = list(SS) OOp = list(OO) M = len(OO) for iname, pname in OO: if iname is OPEN: continue sname = self.out_to_signal.get((iname, pname), False) if sname: k = OOp.index((iname, pname)) l = SSp.index(sname) + M circuit = circuit.feedback(k,l) SSp.remove(sname) OOp.remove((iname, pname)) # print(circuit) # Do feedback from signal output to instance inputs IIp = list(II) SSpp = list(SS) Mf = len(OOp) for iname, pname in II: if iname is OPEN: continue sname = self.in_to_signal.get((iname, pname), False) if sname: k = SSpp.index(sname) + Mf l = IIp.index((iname, pname)) circuit = circuit.feedback(k,l) SSpp.remove(sname) IIp.remove((iname, pname)) SIGNAL = object() OO_effective = OOp + [(SIGNAL, s) for s in SSpp] II_effective = IIp + [(SIGNAL, s) for s in SSp] omapping = {} # construct output permutation for i, (iname, pname) in enumerate(OO_effective): if iname is not SIGNAL: eport = self.global_out.get((iname, pname), False) else: eport = self.signal_to_global_out.get(pname, False) if eport: omapping[i] = list(self.entity.out_port_identifiers).index(eport) imapping = {} # construct output permutation for i, (iname, pname) in enumerate(II_effective): if not (iname is SIGNAL): eport = self.global_in.get((iname, pname), False) else: eport = self.signal_to_global_in.get(pname, False) if eport: k = list(self.entity.in_port_identifiers).index(eport) imapping[k] = i # print(imapping, II_effective,self.signal_to_global_in) circuit = ca.map_signals_circuit(omapping, circuit.cdim) << circuit << ca.map_signals_circuit(imapping, circuit.cdim) self._circuit_data = circuit, circuit_symbols, self.instance_assignments self.entity.cdim = circuit.cdim return self._circuit_data
def __repr__(self): return "%s('%s', '%s', %r, %r, %r)" \ % (self.__class__.__name__, self.identifier, self.entity, self.components, self.signals, self.instance_assignments)
[docs] def to_qhdl(self, tab_level = 0): components_qhdl = "\n".join([c.to_qhdl(tab_level = tab_level + 1) for c in self.components.values()]) signals_qhdl = " signal %s: fieldmode;\n" % ", ".join(self.signals.keys()) format_map = lambda dd: ", ".join(["%s=>%s" % mm for mm in dd.items()]) format_ass = lambda name, cname, generic_map, port_map : \ " %s: %s %s %s;" % (name, cname,\ ("" if not len(generic_map) \ else "generic map(%s);\n" % format_map(generic_map)),\ ("port map(%s)\n" % format_map(generic_map))) ret_str = """ architecture %s of %s is %s %s begin %s end architecture %s; """ % (self.identifier, self.entity.identifier, components_qhdl, signals_qhdl, "\n ".join([format_ass(a, *v) for a,v in self.instance_assignments.items()]), self.identifier) return ("\t"*tab_level) + ret_str.replace('\n', "\n"+ ("\t"*tab_level))
if __name__ == "__main__": from test.test_qhdl import * unittest.main()