Source code for pylablib.aux_libs.devices.Attocube

from ...core.devio import backend as backend_mod  #@UnresolvedImport
from ...core.utils import py3, general
from ...core.utils.strpack import pack_uint, unpack_uint, pack_int, unpack_int

import re
import collections
import time

_depends_local=["...core.devio.backend"]


[docs]class AttocubeError(RuntimeError): """Generic Attocube error"""
[docs]class ANC300(backend_mod.IBackendWrapper): """ Attocube ANC300 controller. Args: conn: connection parameters; for Ethernet connection is a tuple ``(addr, port)`` or a string ``"addr:port"`` backend(str): communication backend; by default, try to determine from the communication parameters pwd(str): connection password for Ethernet connection (default is ``"123456"``) """ def __init__(self, conn, backend="auto", pwd="123456"): if backend=="auto": backend=backend_mod.autodetect_backend(conn) if backend=="network": conn=backend_mod.NetworkDeviceBackend.combine_conn(conn,{"port":7230}) instr=backend_mod.new_backend(conn,backend=backend,timeout=3.,term_write="\r\n") self.pwd=pwd backend_mod.IBackendWrapper.__init__(self,instr) self.open() self._correction={} self._add_settings_node("voltages",self.get_all_voltages,self.set_all_voltages) self._add_settings_node("offsets",self.get_all_offsets,self.set_all_offsets) self._add_settings_node("frequencies",self.get_all_frequencies,self.set_all_frequencies) self._add_status_node("voltage_output",self.get_all_outputs) self._add_status_node("capacitance",self.get_all_capacitances)
[docs] def open(self): """Open the connection to the stage""" res=self.instr.open() if self.instr._backend=="network" and self.pwd is not None: self.instr.write(self.pwd) self.instr.write("echo off") self.instr.flush_read() self.update_available_axes() return res
[docs] def query(self, msg): """Send a query to the stage and return the reply""" self.instr.flush_read() self.instr.write(msg) reply=self.instr.read_multichar_term(["ERROR","OK"],remove_term=False) self.instr.flush_read() if reply.upper().endswith(b"ERROR"): raise AttocubeError(reply[:-5].strip()) return reply[:-2].strip()
[docs] def update_available_axes(self): """ Update the list of available axes. Need to call only if the hardware configuration of the ANC module has changed. """ axes=[] for ax in range(1,8): try: self.query("getm {}".format(ax)) axes.append(ax) except AttocubeError: pass self.axes=list(axes) return axes
[docs] def set_mode(self, axis="all", mode="stp"): """ Set axis mode. `axis` is either an axis index (starting from 1), or ``"all"`` (all axes). `mode` is ``"gnd"`` (ground) or ``"stp"`` (step). """ if axis=="all": for ax in self.axes: self.set_mode(ax,mode) return self.query("setm {} {}".format(axis,mode))
[docs] def get_mode(self, axis="all"): """ Get axis mode `axis` is either an axis index (starting from 1), or ``"all"`` (all axes). """ if axis=="all": return [self.get_mode(ax) for ax in self.axes] reply=py3.as_str(self.query("getm {}".format(axis))).strip() if reply.startswith("mode = "): return reply[7:].strip() raise AttocubeError("unexpected reply: {}".format(reply))
[docs] def enable_axis(self, axis, mode="stp"): """Enable specific axis (set to step mode)""" self.set_mode(axis,mode=mode)
[docs] def disable_axis(self, axis): """Disable specific axis (set to ground mode)""" self.set_mode(axis,mode="gnd")
[docs] def enable_all(self, mode="stp"): """Enable all axes (set to step mode)""" self.set_mode("all",mode=mode)
[docs] def disable_all(self): """Disable all axes (set to ground mode)""" self.set_mode("all",mode="gnd")
[docs] def measure_capacitance(self, axis="all", wait=True): """ Measure axis capacitance; finish in the GND mode. If ``wait==True``, wait until the capacitance measurement is finished (takes about a second per axis). """ if axis=="all": for ax in self.axes: self.measure_capacitance(ax,wait=wait) return self.set_mode(axis,mode="gnd") time.sleep(0.05) self.set_mode(axis,mode="cap") if wait: time.sleep(0.05) while self.get_mode(axis)!="gnd": time.sleep(0.1)
def _parse_reply(self, reply, name, units): patt=name+r"\s*=\s*([\d.]+)\s*"+units reply=py3.as_str(reply) m=re.match(patt,reply,re.IGNORECASE) if not m: raise AttocubeError("unexpected reply: {}".format(reply)) return float(m.groups()[0])
[docs] def get_voltage(self, axis): """Get axis step voltage in Volts""" reply=self.query("getv {}".format(axis)) return self._parse_reply(reply,"voltage","V")
[docs] def set_voltage(self, axis, voltage): """Set axis step voltage in Volts""" self.query("setv {} {}".format(axis,voltage)) return self.get_voltage(axis)
[docs] def get_offset(self, axis): """Get axis offset voltage in Volts""" reply=self.query("geta {}".format(axis)) return self._parse_reply(reply,"voltage","V")
[docs] def set_offset(self, axis, voltage): """Set axis offset voltage in Volts""" self.query("seta {} {}".format(axis,voltage)) return self.get_offset(axis)
[docs] def get_output(self, axis): """Get axis current output voltage in Volts""" reply=self.query("geto {}".format(axis)) return self._parse_reply(reply,"voltage","V")
[docs] def get_frequency(self, axis): """Get axis step frequency in Hz""" reply=self.query("getf {}".format(axis)) return self._parse_reply(reply,"frequency","Hz")
[docs] def set_frequency(self, axis, freq): """Set axis step frequency in Hz""" self.query("setf {} {}".format(axis,freq)) return self.get_frequency(axis)
[docs] def get_capacitance(self, axis, measure=False): """ Get capacitance measurement on the axis. If ``measure==True``, re-measure axis capacitance (takes about a second); otherwise, get the last measurement value. """ if measure: self.measure_capacitance(axis,wait=True) reply=self.query("getc {}".format(axis)) return self._parse_reply(reply,"capacitance","nF")*1E-9
def _get_all_axes_data(self, getter): return dict([(a,getter(a)) for a in self.axes])
[docs] def get_all_voltages(self): """Get the list of all axes step voltages""" return self._get_all_axes_data(self.get_voltage)
[docs] def get_all_offsets(self): """Get the list of all axes offset voltages""" return self._get_all_axes_data(self.get_offset)
[docs] def get_all_outputs(self): """Get the list of all axes offset voltages""" return self._get_all_axes_data(self.get_output)
[docs] def get_all_frequencies(self): """Get the list of all axes step frequencies""" return self._get_all_axes_data(self.get_frequency)
[docs] def get_all_capacitances(self, measure=False): """ Get the list of all axes capacitances If ``measure==True``, re-measure axes capacitances (takes about a secon0d per axis); otherwise, get the last measurement values. """ return self._get_all_axes_data(lambda axis: self.get_capacitance(axis,measure=measure))
def _set_all_axes_data(self, setter, values): if isinstance(values,(tuple,list)): values=dict(zip([self.axes,values])) for a,v in values.items(): setter(a,v)
[docs] def set_all_voltages(self, voltages): """ Get all axes step voltages. `voltages` is a list of step voltage, whose length is equal to the number of active (connected) axes. """ self._set_all_axes_data(self.set_voltage,voltages) return self.get_all_voltages()
[docs] def set_all_offsets(self, offsets): """ Get all axes offset voltages `offsets` is a list of offset voltags, whose length is equal to the number of active (connected) axes. """ self._set_all_axes_data(self.set_offset,offsets) return self.get_all_offsets()
[docs] def set_all_frequencies(self, frequencies): """ Get all axes step frequencies `frequencies` is a list of step frequencies, whose length is equal to the number of active (connected) axes. """ self._set_all_axes_data(self.set_frequency,frequencies) return self.get_all_frequencies()
[docs] def set_axis_correction(self, axis, factor=1.): """ Set axis correction factor. The factor is automatically applied when the motion is in the negative direction. """ self._correction[axis]=factor
[docs] def move(self, axis, steps=1): """Move a given axis for a given number of steps""" if steps<0: steps*=self._correction.get(axis,1.) steps=int(steps) if not steps: return comm="stepu" if steps>0 else "stepd" self.query("{} {} {}".format(comm,axis,abs(steps)))
[docs] def wait_for_axis(self, axis, timeout=30.): """ Wait for a given axis to stop moving. If the motion is not finished after `timeout` seconds, raise a backend error. """ with self.instr.using_timeout(timeout): self.query("stepw {}".format(axis))
[docs] def is_moving(self, axis): """Check if a given axis is moving""" return self.get_output(axis)!=0.
[docs] def stop(self, axis): """Stop motion of a given axis""" self.query("stop {}".format(axis))
[docs] def stop_all(self): """Stop motion of all axes""" for ax in self.axes: self.stop(ax)
ANCDevice=ANC300
[docs]class ANC350(backend_mod.IBackendWrapper): """ Attocube ANC350 controller. Args: conn: connection parameters - index of the Attocube ANC350 in the system (for a single controller leave 0) timeout(float): default operation timeout """ def __init__(self, conn=0, timeout=5.): if isinstance(conn,int): conn=(0x16C0,0x055B,conn,0x86,0x02,"libusb0") # default device IDs backend_mod.IBackendWrapper.__init__(self,None) instr=backend_mod.new_backend(conn,backend="pyusb",timeout=timeout,check_read_size=False) self._corr_number=0 self._tell_telegrams={} backend_mod.IBackendWrapper.__init__(self,instr) self.open() self.instr.read(512) self.set_value(0x000A,0,0) self.enable_updates(False) self.axes=[0,1,2] self._add_settings_node("voltages",self.get_all_voltages,self.set_all_voltages) self._add_settings_node("offsets",self.get_all_offsets,self.set_all_offsets) self._add_settings_node("frequencies",self.get_all_frequencies,self.set_all_frequencies) self._add_status_node("positions",self.get_all_positions) self._add_status_node("target_positions",self.get_all_target_positions) def _check_axis(self, axis): if axis not in self.axes: raise AttocubeError("invalid axis: {}".format(axis)) def _make_telegram(self, opcode, address, index=0, data=b"", add_corr=True): data=data[:(len(data)//4)*4] l=16+len(data) if add_corr: self._corr_number=(self._corr_number%0xFFFF)+1 corr_number=self._corr_number else: corr_number=0 return b"".join([pack_uint(v,4,"<") for v in [l,opcode,address,index,corr_number]])+data Telegram=collections.namedtuple("Telegram",["opcode","address","index","data","corr_number"]) def _parse_telegram(self, telegram): if len(telegram)<20: raise ValueError("data is too short: {}".format(len(telegram))) l,opcode,address,index,corr_number=[unpack_uint(telegram[i*4:i*4+4],"<") for i in range(5)] if len(telegram)!=l+4: raise ValueError("wrong telegram length: expected {}, got {}".format(l+4,len(telegram))) return self.Telegram(opcode,address,index,telegram[20:],corr_number) def _read_telegram(self, corr_number=None): ctd=general.Countdown(self.instr.timeout) while True: tg=self._parse_telegram(self.instr.read(512)) if tg.opcode==3: # ACK if corr_number is None or tg.corr_number==corr_number: return tg raise AttocubeError("gut unexpected correlation number: {}, expected {}".format(tg.corr_number,corr_number)) if tg.opcode==4: # TELL if (tg.address>>8)!=0x0F: self._tell_telegrams[(tg.address,tg.index)]=tg if ctd.passed(): raise AttocubeError("timeout while read") Reply=collections.namedtuple("Reply",["address","index","reason","data"]) def _write(self, opcode, address, index=0, data=b""): if isinstance(data, int): data=pack_int(data,4,"<") tg=self._make_telegram(opcode,address,index,data,add_corr=False) self.instr.write(tg) def _query(self, opcode, address, index=0, data=b""): if isinstance(data, int): data=pack_int(data,4,"<") tg=self._make_telegram(opcode,address,index,data) self.instr.write(tg) resp=self._read_telegram(self._corr_number) reason=unpack_uint(resp.data[:4],"<") return self.Reply(resp.address,resp.index,reason,resp.data[4:])
[docs] def check_tell(self, timeout=0.01): """Check for queued TELL (periodic value update) commands""" try: with self.instr.using_timeout(timeout): self._read_telegram() except (AttocubeError,self.instr.Error): pass
[docs] def set_value(self, address, index, value, ack=False, return_reason=False): """ Set device value at the given address and index. If ``ack==True``, request ACK responds and return its value; otherwise, return immediately after set. If ``return_reason==True``, return tuple ``(result, reason)``; otherwise, simply return result. """ if ack: resp=self._query(0,address,index,value) res=resp.data if isinstance(value,int): res=unpack_int(res,"<") return resp.reason,res else: self._write(0,address,index,value)
[docs] def get_value(self, address, index, as_int=True, return_reason=False): """ Get device value at the given address and index. If ``as_int==True``, convert the result into a signed integer; otherwise return raw byte string. If ``return_reason==True``, return tuple ``(result, reason)``; otherwise, simply return result. """ resp=self._query(1,address,index) res=resp.data if as_int: res=unpack_int(res,"<") return (res,resp.reason) if return_reason else res
def _read_register(self, address): idx=0 res=b"" while True: data=self.get_value(address,idx,as_int=False) eoln_pos=data.find(b"\x00") if eoln_pos>=0: res+=data[:eoln_pos] return res res+=data idx+=1
[docs] def enable_updates(self, enabled=True): """Enable or disable periodic TELL updates""" self.set_value(0x0145,0,1 if enabled else 0)
[docs] def get_hardware_id(self): """Return device HWID (by default -1)""" return self.get_value(0x0168,0)
[docs] def is_axis_connected(self, axis): """Check if axis is connected""" self._check_axis(axis) return bool(self.get_value(0x3002,axis))
[docs] def enable_axis(self, axis): """Enable specific axis""" self._check_axis(axis) self.set_value(0x3030,axis,1)
[docs] def disable_axis(self, axis): self._check_axis(axis) """Disable specific axis""" self.set_value(0x3030,axis,0)
[docs] def enable_all(self): """Enable all axes (set to step mode)""" for ax in self.axes: self.enable_axis(ax)
[docs] def disable_all(self): """Disable all axes (set to ground mode)""" for ax in self.axes: self.disable_axis(ax)
[docs] def get_voltage(self, axis): """Get axis step voltage in Volts""" self._check_axis(axis) return self.get_value(0x0400,axis)*1E-3
[docs] def set_voltage(self, axis, voltage): """Set axis step voltage in Volts""" self._check_axis(axis) self.set_value(0x0400,axis,int(voltage*1E3)) return self.get_voltage(axis)
[docs] def get_offset(self, axis): """Get axis offset voltage in Volts""" self._check_axis(axis) return self.get_value(0x0514,axis)*1E-3
[docs] def set_offset(self, axis, voltage): """Set axis offset voltage in Volts""" self._check_axis(axis) self.set_value(0x0514,axis,int(voltage*1E3)) return self.get_offset(axis)
[docs] def get_frequency(self, axis): """Get axis step frequency in Hz""" self._check_axis(axis) return self.get_value(0x0401,axis)
[docs] def set_frequency(self, axis, freq): """Set axis step frequency in Hz""" self._check_axis(axis) self.set_value(0x0401,axis,int(freq)) return self.get_frequency(axis)
def _get_all_axes_data(self, getter): return dict([(a,getter(a)) for a in self.axes])
[docs] def get_all_voltages(self): """Get the list of all axes step voltages""" return self._get_all_axes_data(self.get_voltage)
[docs] def get_all_offsets(self): """Get the list of all axes offset voltages""" return self._get_all_axes_data(self.get_offset)
[docs] def get_all_frequencies(self): """Get the list of all axes step frequencies""" return self._get_all_axes_data(self.get_frequency)
[docs] def get_all_positions(self): """Get the list of all axes positions""" return self._get_all_axes_data(self.get_position)
[docs] def get_all_target_positions(self): """Get the list of all axes target positions""" return self._get_all_axes_data(self.get_target_position)
def _set_all_axes_data(self, setter, values): if isinstance(values,(tuple,list)): values=dict(zip([self.axes,values])) for a,v in values.items(): setter(a,v)
[docs] def set_all_voltages(self, voltages): """ Get all axes step voltages. `voltages` is a list of step voltage, whose length is equal to the number of active (connected) axes. """ self._set_all_axes_data(self.set_voltage,voltages) return self.get_all_voltages()
[docs] def set_all_offsets(self, offsets): """ Get all axes offset voltages `offsets` is a list of offset voltags, whose length is equal to the number of active (connected) axes. """ self._set_all_axes_data(self.set_offset,offsets) return self.get_all_offsets()
[docs] def set_all_frequencies(self, frequencies): """ Get all axes step frequencies `frequencies` is a list of step frequencies, whose length is equal to the number of active (connected) axes. """ self._set_all_axes_data(self.set_frequency,frequencies) return self.get_all_frequencies()
[docs] def get_position(self, axis): """Get axis position (in m)""" self._check_axis(axis) return self.get_value(0x0415,axis)*1E-9
[docs] def move_to(self, axis, position): """Move to target position (in m)""" self._check_axis(axis) self.set_value(0x0408,axis,int(position*1E9)) self.set_value(0x040D,axis,1)
[docs] def move(self, axis, steps=1): """Move a given axis for a given number of steps""" self._check_axis(axis) steps=int(steps) if steps>=0: tg=self._make_telegram(0,0x0410,axis,b"\x01\x00\x00\x00",add_corr=False) else: tg=self._make_telegram(0,0x0411,axis,b"\x01\x00\x00\x00",add_corr=False) steps=-steps for _ in range(steps): self.instr.write(tg)
[docs] def is_moving(self, axis): """Move a given axis for a given number of steps""" self._check_axis(axis) return bool(self.get_value(0x302E,axis))
[docs] def get_target_position(self, axis): """Get the target position for the given axis""" self._check_axis(axis) return self.get_value(0x0408,axis)*1E-9
[docs] def is_target_reached(self, axis, precision=1E-9): """ Check if the target position is reached. Precision sets the final positioning precision (in m). """ self._check_axis(axis) self.set_value(0x3036,axis,int(precision*1E9)) return self.get_value(0x3037,axis)
[docs] def wait_for_axis(self, axis, precision=1E-9, timeout=10., period=0.01): """ Wait for a given axis to stop moving or to reach target position. If the motion is not finished after `timeout` seconds, raise a backend error. Precision sets the final positioning precision (in m). """ self._check_axis(axis) ctd=general.Countdown(timeout) while True: if (not self.is_moving(axis)) or self.is_target_reached(axis): return if ctd.passed(): raise AttocubeError("axis waiting timeout error") time.sleep(period)
[docs] def stop(self, axis): """Stop motion of a given axis""" self._check_axis(axis) self.set_value(0x0410,axis,0)
[docs] def stop_all(self): """Stop motion of all axes""" for axis in self.axes: self.stop(axis)
[docs] def jog(self, axis, direction): """ Jog a given axis in a given direction. `direction` can be either ``"-"`` (negative) or ``"+"`` (positive). The motion continues until it is explicitly stopped, or unitl a limit is hit. """ if not direction: # 0 or False also mean left direction="-" if direction in [1, True]: direction="+" if direction not in ["+","-"]: raise AttocubeError("unrecognized direction: {}".format(direction)) self._check_axis(axis) if direction=="+": self.set_value(0x040E,axis,1) else: self.set_value(0x040F,axis,1)