Source code for pylablib.aux_libs.devices.Arcus
from .misc import default_placing_message, load_lib
from ...core.utils import py3
from ...core.devio.interface import IDevice
import os.path
import ctypes
import time
import sys
import struct
HANDLE=ctypes.c_void_p
BOOL=ctypes.c_int
DWORD=ctypes.c_int
[docs]class GenericPerformaxStage(IDevice):
"""
Generic Arcus Performax translation stage.
Args:
lib_path(str): path to the PerformaxCom.dll (default is to use the library supplied with the package)
idx(int): stage index
"""
_default_operation_cooldown=0.01
def __init__(self, lib_path=None, idx=0):
IDevice.__init__(self)
self._operation_cooldown=self._default_operation_cooldown
self.dll=self._load_dll(lib_path=lib_path)
self.dll.fnPerformaxComOpen.argtypes=[DWORD,ctypes.c_char_p]
self.dll.fnPerformaxComOpen.restype=BOOL
self.dll.fnPerformaxComClose.argtypes=[HANDLE]
self.dll.fnPerformaxComClose.restype=BOOL
self.dll.fnPerformaxComSendRecv.argtypes=[HANDLE,ctypes.c_char_p,DWORD,DWORD,ctypes.c_char_p]
self.dll.fnPerformaxComSendRecv.restype=BOOL
self.dll.fnPerformaxComSetTimeouts.argtypes=[DWORD,DWORD]
self.dll.fnPerformaxComSetTimeouts.restype=BOOL
self.dll.fnPerformaxComFlush.argtypes=[HANDLE]
self.dll.fnPerformaxComFlush.restype=BOOL
self.dll.fnPerformaxComGetProductString.argtypes=[DWORD,ctypes.c_char_p,DWORD]
self.dll.fnPerformaxComGetProductString.restype=BOOL
self.idx=idx
self.handle=None
self.rbuff=ctypes.create_string_buffer(65536)
self.open()
self._add_full_info_node("device_id",self.get_device_id)
@staticmethod
def _load_dll(lib_path=None):
error_message="The library is supplied on on Arcus website;\nAdditional required library: SiUSBXp.dll\n{}".format(default_placing_message)
if lib_path is None:
return load_lib("PerformaxCom.dll",locations=("local","global"),call_conv="stdcall",locally=True,error_message=error_message)
else:
return load_lib(lib_path,call_conv="stdcall",locally=True,error_message=error_message)
[docs] @staticmethod
def get_devices_num(lib_path=None):
"""Get number of connected Arcus devices"""
ndev=DWORD(0)
dll=GenericPerformaxStage._load_dll(lib_path=lib_path)
dll.fnPerformaxComGetNumDevices.argtypes=[ctypes.POINTER(DWORD)]
dll.fnPerformaxComGetNumDevices.restype=BOOL
dll.fnPerformaxComGetNumDevices(ctypes.byref(ndev))
return ndev.value
[docs] def open(self):
"""Open the connection to the stage"""
self.close()
self.dll.fnPerformaxComSetTimeouts(5000,5000)
phandle=ctypes.create_string_buffer(b"\x00"*8)
hlen=ctypes.sizeof(ctypes.c_void_p)
for _ in range(5):
code=self.dll.fnPerformaxComOpen(DWORD(int(self.idx)),phandle)
if code:
self.handle=struct.unpack("P",phandle.raw[:hlen])[0]
self.dll.fnPerformaxComFlush(self.handle)
return
time.sleep(0.3)
raise ArcusError("can't connect to the stage with index {}, return code {}".format(self.idx,code))
[docs] def close(self):
"""Close the connection to the stage"""
if self.handle:
for _ in range(5):
if self.dll.fnPerformaxComClose(self.handle):
self.handle=None
return
time.sleep(0.3)
raise ArcusError("can't disconnect from the stage with index {}".format(self.idx))
def _check_handle(self):
if self.handle is None:
raise ArcusError("device is not opened")
[docs] def get_device_id(self):
"""Get the device ID"""
devs=[]
for n in range(5):
time.sleep(self._operation_cooldown)
if not self.dll.fnPerformaxComGetProductString(self.idx,self.rbuff,n):
raise ArcusError("can't get info for the device with index {}".format(self.idx))
devs.append(py3.as_str(self.rbuff.value))
return devs
[docs] def query(self, comm):
"""Send a query to the stage and return the reply"""
self._check_handle()
time.sleep(self._operation_cooldown)
comm=py3.as_builtin_bytes(comm)+b"\x00"
if self.dll.fnPerformaxComSendRecv(self.handle,comm,len(comm),64,self.rbuff):
return py3.as_str(self.rbuff.value)
else:
raise ArcusError("error sending command {}".format(comm))
[docs]class Performax4EXStage(GenericPerformaxStage):
"""
Arcus Performax 4EX translation stage.
Args:
lib_path(str): path to the PerformaxCom.dll (default is to use the library supplied with the package)
idx(int): stage index
"""
def __init__(self, lib_path=None, idx=0):
GenericPerformaxStage.__init__(self,lib_path=lib_path,idx=idx)
self.set_absolute_mode()
self.enable_all_outputs()
self.ignore_limit_errors()
self._add_settings_node("ignore_limit_errors",self.is_ignoring_limit_errors,self.ignore_limit_errors)
self._add_status_node("positions",self.get_position,mux=("XYZU",))
self._add_settings_node("global_speed",self.get_speed,self.set_speed)
self._add_settings_node("axis_speed",self.get_axis_speed,self.set_axis_speed,mux=("XYZU",0))
self._add_status_node("axis_status",self.get_status,mux=("XYZU",0))
self._add_status_node("moving",self.is_moving,mux=("XYZU",0))
@staticmethod
def _check_axis(axis):
if axis.lower() not in ["x","y","z","u"]:
raise ArcusError("unrecognized axis: {}".format(axis))
return axis.upper()
[docs] def ignore_limit_errors(self, do_ignore=True):
"""
Switch ignoring limit errors on or off.
If on, limit error on one axis doesn't stop other axes.
"""
self.query("IERR={}".format("1" if do_ignore else "0"))
[docs] def is_ignoring_limit_errors(self):
"""
Check if ignoring limit errors is on.
If on, limit error on one axis doesn't stop other axes.
"""
return self.query("IERR")=="1"
[docs] def enable_output(self, axis, enable=True):
"""
Enable axis output.
If the output is disabled, the steps are generated by the controller, but not sent to the motors.
"""
axis=self._check_axis(axis)
axisn="XYZU".index(axis)+1
self.query("EO{}={}".format(axisn,"1" if enable else "0"))
[docs] def enable_all_outputs(self, enable=True):
"""Enable output on all axes"""
self.query("EO={}".format("15" if enable else "0"))
for axis in "xyzu":
self.enable_output(axis,enable=enable)
[docs] def get_position(self, axis):
"""Get the current axis position"""
axis=self._check_axis(axis)
return int(self.query("P"+axis))
[docs] def set_position_reference(self, axis, position=0):
"""
Set the current axis position as a reference.
Re-calibrate the position encoder so that the current position is set as `position` (0 by default).
"""
axis=self._check_axis(axis)
self.query("P{}={:.0f}".format(axis,position))
[docs] def move_to(self, axis, position):
"""Move a given axis to a given position"""
axis=self._check_axis(axis)
self.query("{}{:.0f}".format(axis,position))
[docs] def move(self, axis, steps=1):
"""Move a given axis for a given number of steps"""
self.move_to(axis,self.get_position(axis)+steps)
[docs] def jog(self, axis, direction):
"""
Jog a given axis in a given direction.
`direction` can be either ``"-"`` (negative) or ``"+"`` (positive).
The motion continues until it is explicitly stopped, or unitl a limit is hit.
"""
axis=self._check_axis(axis)
if not direction: # 0 or False also mean left
direction="-"
if direction in [1, True]:
direction="+"
if direction not in ["+","-"]:
raise ArcusError("unrecognized direction: {}".format(direction))
self.query("J{}{}".format(axis,direction))
[docs] def stop(self, axis):
"""Stop motion of a given axis"""
axis=self._check_axis(axis)
self.query("STOP"+axis)
[docs] def stop_all(self):
"""Stop motion of all axes"""
for axis in "XYZU":
self.query("STOP"+axis)
[docs] def get_axis_speed(self, axis):
"""Get the individual axis speed setting (in Hz)"""
axis=self._check_axis(axis)
return int(self.query("HS"+axis))
[docs] def set_speed(self, speed):
"""Set the global speed setting (in Hz)"""
self.query("HS={:.0f}".format(speed))
[docs] def set_axis_speed(self, axis, speed):
"""Set the individual axis speed setting (in Hz)"""
axis=self._check_axis(axis)
self.query("HS{}={:.0f}".format(axis,speed))
_status_bits={ "accel":0x001,"decel":0x002,"moving":0x004,
"alarm":0x008,
"sw_plus_lim":0x010,"sw_minus_lim":0x020,"sw_home":0x040,
"err_plus_lim":0x080,"err_minus_lim":0x100,"err_alarm":0x200,
"TOC_timeout":0x800}
def _get_full_status(self):
stat=self.query("MST")
return [int(x) for x in stat.split(":") if x]
[docs] def get_status_n(self, axis):
"""Get the axis status as an integer"""
axis=self._check_axis(axis)
axis_n="XYZU".index(axis)
return self._get_full_status()[axis_n]
[docs] def get_status(self, axis):
"""Get the axis status as a set of string descriptors"""
statn=self.get_status_n(axis)
return [ k for k in self._status_bits if self._status_bits[k]&statn ]
[docs] def is_moving(self, axis):
"""Check if a given axis is moving"""
return bool(self.get_status_n(axis)&0x007)
[docs] def check_limit_error(self, axis):
"""
Check if the axis hit limit errors.
Return ``""`` (not errors), ``"+"`` (positive limit error) or ``"-"`` (negative limit error).
"""
stat=self.get_status_n(axis)
err=""
if stat&self._status_bits["err_plus_lim"]:
err=err+"+"
if stat&self._status_bits["err_minus_lim"]:
err=err+"-"
return err
[docs] def clear_limit_error(self, axis):
"""Clear axis limit errors"""
axis=self._check_axis(axis)
self.query("CLR"+axis)
PerformaxStage=Performax4EXStage # for backwards compatibility