Source code for pylablib.aux_libs.devices.Altera

"""
Home-built Altera NIOS soft-core device.
"""

from ...core.devio import backend  #@UnresolvedImport
from ...core.utils import numerical  #@UnresolvedImport

import struct, math, time

import numpy as np

_depends_local=["...core.devio.backend"]


[docs]def float_to_sfixed(val, high, low): l=high-low+1 val=val/(2.**low) ival=int(math.ceil(val)) ival=numerical.limit_to_range(ival,-(1<<(l-1)),(1<<(l-1))-1) ival=ival%(1<<l) return ival
[docs]def float_to_ufixed(val, high, low): l=high-low+1 val=val/(2**low) ival=int(math.ceil(val)) ival=numerical.limit_to_range(ival,0,(1<<l)-1) return ival
[docs]def sfixed_to_float(ival, high, low): l=high-low+1 ival=ival%int(2**l) if ival>=int(2**(l-1)): ival=ival-2**l return ival*(2**low)
[docs]def ufixed_to_float(ival, high, low): l=high-low+1 ival=ival%int(2**l) return ival*(2**low)
[docs]class NIOSDevice(backend.IBackendWrapper): def __init__(self, port_addr, timeout=3.): instr=backend.SerialDeviceBackend((port_addr,57600),timeout=timeout,term_write="",term_read="\n",connect_on_operation=True) backend.IBackendWrapper.__init__(self,instr) _ops={"nop":0x00,"reset":0x01, "read_memreg":0x10,"read_mmapreg":0x11,"write_memreg":0x18, "read_stream":0x20,"write_stream":0x28, "periph_comm":0x40,"debug":0xE0} @staticmethod def _build_comm_msg(op, addr, p1, p2): op=NIOSDevice._ops.get(op,op) return struct.pack("<H",op&0xFFFF)+struct.pack("<H",addr&0xFFFF)+struct.pack("<I",p1&0xFFFFFFFF)+struct.pack("<I",p2&0xFFFFFFFF) @staticmethod def _check_response(resp): if resp.startswith("OK"): return resp if resp.startswith("ERR"): raise RuntimeError("command returned error: {}".format(resp)) raise ValueError("unrecognized response: {}".format(resp)) def _send_read(self, msg, bin_length=0, timeout=None): with self.instr.single_op(): self.instr.flush_read() self.instr.write(msg[:1],flush=False) if len(msg)>1: self.instr.write(msg[1:],flush=False) if bin_length: resp=self.instr.read(bin_length) else: resp="" resp=resp+self.instr.readline(timeout=timeout) self.instr.flush_read() return resp
[docs] def comm(self, op, addr=0, p1=0, p2=0, timeout=1.): c="C"+self._build_comm_msg(op,addr,p1,p2) resp=self._send_read(c,timeout=timeout) return self._check_response(resp)
[docs] def query(self, op, addr=0, p1=0, p2=0, timeout=1.): q="Q"+self._build_comm_msg(op,addr,p1,p2) resp=self._send_read(q,bin_length=4,timeout=timeout) self._check_response(resp[4:]) return struct.unpack("<I",resp[:4])[0]
[docs] def upstream(self, data, timeout=1.): l=len(data)//4 c="U"+struct.pack("<I",l) #resp=self._send_read(s,timeout=timeout) with self.instr.single_op(): self.instr.flush_read() self.instr.write(c[:1],flush=False) self.instr.write(c[1:],flush=False) with self.instr.using_timeout(40.): self.instr.write(data,flush=False) resp=self.instr.readline(timeout=timeout) self.instr.flush_read() return self._check_response(resp)
[docs] def downstream(self, length, timeout=1.): s="D"+struct.pack("<I",length) with self.instr.single_op(): self.instr.flush_read() self.instr.write(s[:1],flush=False) self.instr.write(s[1:],flush=False) with self.instr.using_timeout(40.): data=self.instr.read(length*4) resp=self.instr.readline(timeout=timeout) self.instr.flush_read() self._check_response(resp) return data
[docs] def peripheral_comm(self, dev, write_data="", read_length=0, timeout=1.): write_length=len(write_data) s="P"+struct.pack("<I",dev)+struct.pack("<I",write_length)+struct.pack("<I",read_length) with self.instr.single_op(): self.instr.flush_read() self.instr.write(s[:1],flush=False) self.instr.write(s[1:],flush=False) if write_length: self.instr.write(write_data,flush=False) if read_length: with self.instr.using_timeout(40.): data=self.instr.read(read_length) else: data="" resp=self.instr.readline(timeout=timeout) self.instr.flush_read() self._check_response(resp) return data
[docs] def restart(self, full=False): if full: resp=self._send_read("R\x01") else: resp=self._send_read("R\x00") return self._check_response(resp)
[docs] def write_ram(self, addr, value, mask=0xFFFFFFFF): return self.comm("write_memreg",addr,value,mask)
[docs] def read_ram(self, addr): return self.query("read_memreg",addr)
[docs] def write_ram_sfixed(self, addr, value, high, low, shift=0, mask=None): value=float_to_sfixed(value,high,low) if mask is None: mask=(1<<(high-low+1))-1 self.comm("write_memreg",addr,value<<shift,mask<<shift) return sfixed_to_float(value,high,low)
[docs] def read_ram_sfixed(self, addr, high, low, shift=0, mask=None): value=self.query("read_memreg",addr) if mask is None: mask=(1<<(high-low+1))-1 return sfixed_to_float((value>>shift)&mask,high,low)
[docs] def upstream_ram_bank(self, bank, data, start=0): if isinstance(data,np.ndarray): data=data.astype("<i4").tostring() l=len(data)//4 self.comm("write_stream",bank,start,l) return self.upstream(data)
[docs] def zero_ram_bank(self, bank, size, start=0): data="\x00"*(size*4) return self.upstream_ram_bank(bank,data,start=start)
[docs] def downstream_ram_bank(self, bank, size, start=0): l=size//4 self.comm("read_stream",bank,start,l) return self.downstream(l)
[docs]class FPGAController(NIOSDevice): _reg_map={ "DSP_mode":(0x05,2,8), "adc1_offset":(0x10,(15,0)), "adc1_scale":(0x10,(1,-14),16), "adc2_offset":(0x11,(15,0)), "adc2_scale":(0x11,(1,-14),16), "adc1_jitter":(0x18,(15,0)), "dac_offset":(0x14,(18,0)), "dac_scale":(0x13,(2,-15)), "PID_P":(0x15,(8,-15)), "PID_I":(0x16,(0,-23)), "PID_D":(0x17,(20,-3)), "osc_step":(0x12,), "pulses_period":(0x20,), "pulse1_end":(0x21,), "pulse2_start":(0x22,), "pulse2_end":(0x23,), "LEDR":(0x00,16)} _default_reg=(0x00,32,0) _reg_enums={"DSP_mode":{"gen":0,"PID":1,"transf":2}} @classmethod def _get_reg_desc(cls, reg): if reg in cls._reg_map: r=cls._reg_map[reg] return r+cls._default_reg[len(r):] else: raise ValueError("unrecognized register: {}".format(reg))
[docs] def set_reg(self, reg, value): addr,size,shift=self._get_reg_desc(reg) if reg in self._reg_enums: value=self._reg_enums.get(value,value) if isinstance(size,tuple): self.write_ram_sfixed(addr,value,size[0],size[1],shift=shift) else: mask=(1<<(size+1))-1 value=value<<shift self.write_ram(addr,int(value),mask)
[docs] def get_reg(self, reg, translate_enum=True): addr,size,shift=self._get_reg_desc(reg) if isinstance(size,tuple): return self.read_ram_sfixed(addr,size[0],size[1],shift=shift) else: mask=(1<<(size+1))-1 value=(self.read_ram(addr)>>shift)&mask if translate_enum and reg in self._reg_enums: pass return value
__setitem__=set_reg __getitem__=get_reg
[docs] def set_pulses(self, period, w1, d, w2=None): w2=w1 if w2 is None else w2 self["pulses_period"]=int(period*100E6)-1 self["pulse1_end"]=int(w1*100E6) self["pulse2_start"]=int(d*100E6) self["pulse2_end"]=int(d*100E6)+int(w2*100E6)
[docs] def set_feedback_setpoint(self, setpoint): self["adc2_offset"]=0x8000-(0x10000/5.*setpoint) self["adc2_scale"]=1.
[docs] def enable_feedback(self, enabled=True, base_level=3.): self["dac_offset"]=(1<<17)/20.*base_level self["dac_scale"]=1. if enabled else 0.
[docs] def teach_feedback(self, delay=10., reset=True): if reset: self.write_ram(0x05,0x01,0x01) self.zero_ram_bank(0x10,0x1000) self.write_ram(0x05,0x00,0x01) self["PID_I"]=-5E-3 time.sleep(delay) self["PID_I"]=0
[docs] def W5500_read(self, block, addr, l): comm=struct.pack(">H",addr)+struct.pack("B",(block<<3)&0xFF) return self.peripheral_comm(0x0110,comm,l)
[docs] def W5500_write(self, block, addr, data, readback=True): comm=struct.pack(">H",addr)+struct.pack("B",((block<<3)&0xFF)|0x04)+data self.peripheral_comm(0x0110,comm) return self.W5500_read(block,addr,len(data)) if readback else ""