Source code for pylablib.aux_libs.devices.Trinamic

from ...core.utils import strpack
from ...core.devio import backend, data_format

from builtins import bytes
import collections
import time

_depends_local=["...core.devio.backend"]

[docs]class TrinamicError(RuntimeError): """Generic Trinamic error."""
i4_conv=data_format.DataFormat.from_desc(">i4")
[docs]class TMCM1100(backend.IBackendWrapper): """ Trinamic stepper motor controller. Args: conn: serial connection parameters (usually port or a tuple containing port and baudrate) """ def __init__(self, conn): conn=backend.SerialDeviceBackend.combine_conn(conn,(None,9600)) instr=backend.SerialDeviceBackend(conn,term_write="",term_read="",timeout=3.) backend.IBackendWrapper.__init__(self,instr) self._add_status_node("position",self.get_position) self._add_settings_node("speed",self.get_speed,self.set_speed) self._add_status_node("current_speed",self.get_current_speed) self._add_status_node("moving",self.is_moving) self.open()
[docs] def open(self): backend.IBackendWrapper.open(self) self.instr.flush_read() self.instr._operation_cooldown=0.01
@staticmethod def _build_command(comm, comm_type, value, bank=0, addr=0): val_str=i4_conv.convert_to_str(value) data_str=bytes(strpack.pack_uint(addr,1)+strpack.pack_uint(comm,1)+strpack.pack_uint(comm_type,1)+strpack.pack_uint(bank,1)+val_str) chksum=sum([b for b in data_str])%0x100 return data_str+strpack.pack_uint(chksum,1) ReplyData=collections.namedtuple("ReplyData",["comm","status","value","addr","module"]) @staticmethod def _parse_reply(reply, result_format="u4"): reply=bytes(reply) data_str=reply[:8] chksum=sum([b for b in data_str])%0x100 if chksum!=reply[8]: raise TrinamicError("Communication error: incorrect checksum") addr=strpack.unpack_uint(reply[0:1]) module=strpack.unpack_uint(reply[1:2]) status=strpack.unpack_uint(reply[2:3]) comm=strpack.unpack_uint(reply[3:4]) if result_format=="str": value=reply[4:8] else: value=data_format.DataFormat.from_desc(">"+result_format).convert_from_str(reply[4:8])[-1] return TMCM1100.ReplyData(comm,status,value,addr,module) _status_codes={100:"Success", 101:"Command loaded", 1:"Wrong checksum", 2:"Invalid command", 3:"Wrong type", 4:"Invalid value", 5:"EEPROM locked", 6:"Command not available"} @classmethod def _check_status(cls, status): if status not in cls._status_codes: raise TrinamicError("unrecognized status: {}".format(status)) if status<100: raise TrinamicError("error status: {} ({})".format(status,cls._status_codes[status]))
[docs] def query(self, comm, comm_type, value, result_format="i4", bank=0, addr=0): """ Send a query to the stage and return the reply. For details, see TMCM-1110 firmware manual. """ command=self._build_command(comm,comm_type,value,bank=bank,addr=addr) self.instr.write(command) reply_str=self.instr.read(9) reply=self._parse_reply(reply_str,result_format=result_format) self._check_status(reply.status) return reply
[docs] def get_axis_parameter(self, parameter, result_format="i4", addr=0): """Get a given axis parameter""" return self.query(6,parameter,0,result_format=result_format,addr=addr).value
[docs] def set_axis_parameter(self, parameter, value, addr=0): """Set a given axis parameter (volatile; resets on power cycling)""" return self.query(5,parameter,value,addr=addr)
[docs] def store_axis_parameter(self, parameter, value=None, addr=0): """Store a given axis parameter in EEPROM (by default, value is the current value)""" if value is not None: self.set_axis_parameter(parameter,value,addr=addr) return self.query(7,parameter,value,addr=addr)
[docs] def get_global_parameter(self, parameter, result_format="i4", bank=0, addr=0): """Get a given global parameter""" return self.query(10,parameter,0,result_format=result_format,bank=bank,addr=addr).value
[docs] def set_global_parameter(self, parameter, value, bank=0, addr=0): """Set a given global parameter""" return self.query(9,parameter,value,bank=bank,addr=addr)
[docs] def move_to(self, position, addr=0): """Move to a given position""" return self.query(4,0,position,addr=addr)
[docs] def move(self, steps=1, addr=0): """Move for a given number of steps""" return self.query(4,1,steps,addr=addr)
[docs] def get_position(self, addr=0): """Get the current axis position""" return self.get_axis_parameter(1,addr=addr)
[docs] def jog(self, direction, speed): """ Jog in a given direction with a given speed. `direction` can be either ``"-"`` (negative) or ``"+"`` (positive). The motion continues until it is explicitly stopped, or until a limit is hit. """ if not direction: # 0 or False also mean left direction="-" if direction in [1, True]: direction="+" if direction not in ["+","-"]: raise TrinamicError("unrecognized direction: {}".format(direction)) return self.query(1 if direction=="+" else 2,0,speed)
[docs] def stop(self, addr=0): """Stop motion""" return self.query(3,0,0,addr=addr)
[docs] def get_speed(self, addr=0): """Get the speed setting""" return self.get_axis_parameter(4,addr=addr)
[docs] def set_speed(self, speed, addr=0): """Set the speed setting""" return self.set_axis_parameter(4,speed,addr=addr)
[docs] def get_current_speed(self, addr=0): """Get the current speed""" return self.get_axis_parameter(3,addr=addr)
[docs] def is_moving(self, addr=0): """Check if the motor is moving""" return bool(self.get_current_speed(addr=addr))
[docs] def wait_for_axis(self, addr=0): """Wait for the motor to stop moving""" while self.get_current_speed(addr=addr): time.sleep(0.02)