Source code for pylablib.core.devio.backend

"""
Routines for defining a unified interface across multiple backends.
"""

from ..utils.py3 import anystring
from builtins import range,zip
from . import interface

import time
import re
from ..utils import funcargparse, general, log, net, py3, module
import contextlib

_depends_local=[".interface"]


### Generic backend interface ###

class IDeviceBackend(object):
    """
    Abstract base class for a device communication backend.

    Defines the interface shared by all backends; the I/O methods
    (:meth:`readline`, :meth:`read`, :meth:`write`) raise ``NotImplementedError`` here
    and the remaining methods are no-op defaults.

    Args:
        conn: Connection parameters (depend on the backend).
        timeout (float): Default timeout (in seconds); accepted for interface
            compatibility, but not used by this base class.
        term_write (str): Line terminator for writing operations.
        term_read (str): Line terminator for reading operations.
        datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
            or ``"auto"`` (data is returned unchanged)
    """

    Error = RuntimeError
    """Base class for the errors raised by the backend operations"""

    # Names of the connection parameters (in positional order) and their defaults.
    _conn_params = ["addr"]
    _default_conn = [None]

    def __init__(self, conn, timeout=None, term_write=None, term_read=None, datatype="auto"):
        object.__init__(self)
        funcargparse.check_parameter_range(datatype, "datatype", {"auto", "str", "bytes"})
        self.conn = conn
        self.datatype = datatype
        self.term_read = term_read
        self.term_write = term_write

    @classmethod
    def _conn_to_dict(cls, conn):
        """
        Normalize connection parameters into a dictionary.

        A dict is returned as is (not copied), a tuple/list is matched positionally
        against ``_conn_params``, and any other value is taken as the first parameter.
        """
        if isinstance(conn, dict):
            return conn
        names = cls._conn_params
        if isinstance(conn, (tuple, list)):
            return {name: value for name, value in zip(names, conn)}
        return {names[0]: conn}

    @classmethod
    def combine_conn(cls, conn1, conn2):
        """
        Merge two connection descriptions into a new dictionary.

        Values from `conn1` take precedence over the values from `conn2`.
        """
        merged = cls._conn_to_dict(conn2).copy()
        merged.update(cls._conn_to_dict(conn1))
        return merged

    def _to_datatype(self, data):
        """Convert `data` according to the ``datatype`` attribute (``"auto"`` leaves it unchanged)."""
        kind = self.datatype
        if kind == "auto":
            return data
        if kind == "str":
            return py3.as_str(data)
        return py3.as_bytes(data)

    def open(self):
        """Open the connection (does nothing in the base class)."""
        pass

    def close(self):
        """Close the connection (does nothing in the base class)."""
        pass

    def is_opened(self):
        """Check if the device is connected (always ``True`` in the base class)."""
        return True

    def __bool__(self):
        return self.is_opened()
    __nonzero__ = __bool__  # Python 2 compatibility

    def lock(self, timeout=None):
        """Lock the access to the device from other threads/processes (isn't necessarily implemented)."""
        pass

    def unlock(self):
        """Unlock the access to the device from other threads/processes (isn't necessarily implemented)."""
        pass

    def locking(self, timeout=None):
        """Context manager for lock & unlock (a dummy resource in the base class)."""
        return general.DummyResource()

    def cooldown(self):
        """Cooldown between the operations (usually, some short time delay)."""
        pass

    def set_timeout(self, timeout):
        """Set operations timeout (in seconds)."""
        pass

    def get_timeout(self):
        """Get operations timeout (in seconds); the base class returns ``None``."""
        return None

    @contextlib.contextmanager
    def using_timeout(self, timeout=None):
        """
        Context manager for usage of a different timeout inside a block.

        If `timeout` is ``None``, the timeout is left untouched; otherwise the previous
        timeout is restored on exit, even if the block raises.
        """
        if timeout is None:
            yield
            return
        previous = self.get_timeout()
        self.set_timeout(timeout)
        try:
            yield
        finally:
            self.set_timeout(previous)

    def readline(self, remove_term=True, timeout=None, skip_empty=True):
        """
        Read a single line from the device.

        Args:
            remove_term (bool): If ``True``, remove terminal characters from the result.
            timeout: Operation timeout. If ``None``, use the default device timeout.
            skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
        """
        raise NotImplementedError("IDeviceBackend.readline")

    def readlines(self, lines_num, remove_term=True, timeout=None, skip_empty=True):
        """
        Read `lines_num` lines from the device and return them as a list.

        The remaining parameters are the same as in :func:`readline`.
        """
        lines = []
        for _ in range(lines_num):
            lines.append(self.readline(remove_term=remove_term, timeout=timeout, skip_empty=skip_empty))
        return lines

    def read(self, size=None):
        """
        Read data from the device.

        If `size` is not None, read `size` bytes (the standard timeout applies); otherwise, read all available data (return immediately).
        """
        raise NotImplementedError("IDeviceBackend.read")

    def flush_read(self):
        """Flush the device output (read all the available data; return the number of bytes read)."""
        pending = self.read()
        return len(pending)

    def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
        """
        Write data to the device.

        If ``flush==True``, flush the write buffer.
        If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
        """
        raise NotImplementedError("IDeviceBackend.write")

    def ask(self, query, delay=0., read_all=False):
        """
        Perform a write followed by a read, with `delay` in between.

        If ``read_all==True``, read all the available data; otherwise, read a single line.
        """
        self.write(query)
        if delay:
            time.sleep(delay)
        reader = self.read if read_all else self.readline
        return reader()

    @staticmethod
    def list_resources(desc=False):
        """
        List all available resources for this backend.

        If ``desc==False``, return list of connections (usually strings), which can be used to connect to the device.
        Otherwise, return a list of descriptions, which have more info, but can be backend-dependent.

        Might not be implemented (depending on the backend), in which case returns ``None``.
        """
        return None
### Helper functions ###
def remove_longest_term(msg, terms):
    """
    Remove the longest terminator among `terms` from the end of the message.

    Args:
        msg: Message to strip (compared against terminators converted with ``py3.as_builtin_bytes``).
        terms: Iterable of candidate terminators.

    Returns:
        `msg` without the longest matching terminator; if none of the terminators
        matches (or `terms` is empty), `msg` is returned unchanged.
    """
    longest = 0
    for term in terms:
        term = py3.as_builtin_bytes(term)
        if msg.endswith(term):
            # measure the converted terminator, since that is what was actually matched
            longest = max(longest, len(term))
    # ``msg[:-0]`` is an empty slice, so the no-match case must be handled explicitly
    return msg[:-longest] if longest else msg
### Specific backends ### _backends={}
class IBackendOpenError(IOError):
    """Generic error raised when a backend fails to open a connection."""
try: import visa
class VisaBackendOpenError(IBackendOpenError, visa.VisaIOError):
    """
    Visa backend opening error.

    Wraps an existing VISA error `e`, reusing its ``error_code``.
    """

    def __init__(self, e):
        IBackendOpenError.__init__(self)
        visa.VisaIOError.__init__(self, e.error_code)
class VisaDeviceBackend(IDeviceBackend):
    """
    NIVisa backend (via pyVISA).

    Connection is automatically opened on creation.

    Args:
        conn (str): Connection string.
        timeout (float): Default timeout (in seconds).
        term_write (str): Line terminator for writing operations; appended to the data
        term_read (str): Line terminator for reading operations (specifies when :func:`readline` stops).
        do_lock (bool): If ``True``, employ locking operations; otherwise, locking function does nothing.
            If ``None``, use the class default (``_lock_default``).
        datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
            or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
    """
    _default_operation_cooldown = 0.03
    _backend = "visa"
    Error = visa.VisaIOError
    """Base class for the errors raised by the backend operations"""
    BackendOpenError = VisaBackendOpenError

    if module.cmp_versions(visa.__version__, "1.6") == "<":  # older pyvisa versions have a slightly different interface
        def _set_timeout(self, timeout):
            self.instr.timeout = timeout

        def _get_timeout(self):
            return self.instr.timeout

        def _open_resource(self, conn):
            if not self.term_write.endswith(self.term_read):
                raise NotImplementedError("PyVisa version <1.6 doesn't support different terminators for reading and writing")
            instr = visa.instrument(conn)
            instr.term_chars = self.term_read
            # the read terminator is set on the instrument above; keep only the remaining prefix for manual appending
            self.term_write = self.term_write[:len(self.term_write) - len(self.term_read)]
            return instr

        _lock_default = False

        def _lock(self, timeout=None):
            raise NotImplementedError("PyVisa version <1.6 doesn't support locking")

        def _unlock(self):
            raise NotImplementedError("PyVisa version <1.6 doesn't support locking")

        def _lock_context(self, timeout=None):
            raise NotImplementedError("PyVisa version <1.6 doesn't support locking")

        def _read_term(self):
            return py3.as_builtin_bytes(self.instr.term_chars)
    else:
        def _set_timeout(self, timeout):
            self.instr.timeout = timeout * 1000.  # in newer versions timeout is in ms

        def _get_timeout(self):
            return self.instr.timeout / 1000.  # in newer versions timeout is in ms

        def _open_resource(self, conn):
            instr = visa.ResourceManager().open_resource(conn)
            instr.read_termination = self.term_read
            instr.write_termination = self.term_write
            # terminators are now handled by pyvisa itself, so nothing is appended manually
            self.term_read = self.term_write = ""
            return instr

        _lock_default = False  ## TODO: figure out GPIB locking issue

        def _lock(self, timeout=None):
            self.instr.lock(timeout=timeout * 1000. if timeout is not None else None)

        def _unlock(self):
            self.instr.unlock()

        def _lock_context(self, timeout=None):
            return self.instr.lock_context(timeout=timeout * 1000. if timeout is not None else None)

        def _read_term(self):
            return py3.as_builtin_bytes(self.instr.read_termination)

    @staticmethod
    def list_resources(desc=False):
        """
        List the resources reported by the VISA resource manager.

        If ``desc==True``, return the result of ``list_resources_info()``; otherwise, of ``list_resources()``.
        """
        return visa.ResourceManager().list_resources_info() if desc else visa.ResourceManager().list_resources()

    def __init__(self, conn, timeout=10., term_write=None, term_read=None, do_lock=None, datatype="auto"):
        if term_write is None:
            term_write = b"\r\n"
        if term_read is None:
            term_read = b"\n"
        IDeviceBackend.__init__(self, conn, term_write=term_write, term_read=term_read, datatype=datatype)
        try:
            self.instr = self._open_resource(self.conn)
            self.opened = True
            self._operation_cooldown = self._default_operation_cooldown
            self._do_lock = do_lock if do_lock is not None else self._lock_default
            self.cooldown()
            self.set_timeout(timeout)
        except self.Error as e:
            raise VisaBackendOpenError(e)

    def open(self):
        """Open the connection."""
        self.instr.open()
        self.opened = True
        self.cooldown()

    def close(self):
        """Close the connection."""
        self.instr.close()
        self.opened = False
        self.cooldown()

    def is_opened(self):
        """Check if the connection is marked as opened."""
        return self.opened

    def lock(self, timeout=None):
        """Lock the access to the device from other threads/processes (no-op unless locking is enabled)."""
        if self._do_lock:
            # delegate to the version-specific implementation; calling ``self.lock`` here would recurse forever
            self._lock(timeout=timeout)

    def unlock(self):
        """Unlock the access to the device from other threads/processes (no-op unless locking is enabled)."""
        if self._do_lock:
            # delegate to the version-specific implementation; calling ``self.unlock`` here would recurse forever
            self._unlock()

    def locking(self, timeout=None):
        """Context manager for lock & unlock (a dummy resource if locking is disabled)."""
        if self._do_lock:
            return self._lock_context(timeout=timeout)
        return general.DummyResource()

    def cooldown(self):
        """
        Cooldown between the operations.

        Sleeping for a short time defined by `_operation_cooldown` attribute (30 ms by default).
        Also can be defined class-wide by `_default_operation_cooldown` class attribute.
        """
        if self._operation_cooldown > 0:
            time.sleep(self._operation_cooldown)

    def set_timeout(self, timeout):
        """Set operations timeout (in seconds); ``None`` leaves the timeout unchanged."""
        if timeout is not None:
            self._set_timeout(timeout)
            self.cooldown()

    def get_timeout(self):
        """Get operations timeout (in seconds)."""
        return self._get_timeout()

    def readline(self, remove_term=True, timeout=None, skip_empty=True):
        """
        Read a single line from the device.

        Args:
            remove_term (bool): If ``True``, remove terminal characters from the result.
            timeout: Operation timeout. If ``None``, use the default device timeout.
            skip_empty (bool): If ``True``, keep reading until a non-empty result is obtained.
        """
        with self.using_timeout(timeout):
            while True:
                result = self.instr.read_raw()
                if remove_term:
                    term = self._read_term()
                    if term and result.endswith(term):
                        result = result[:-len(term)]
                if (not skip_empty) or result:
                    break
            self.cooldown()
            return self._to_datatype(result)

    def read(self, size=None):
        """
        Read data from the device.

        If `size` is not None, read `size` bytes (the standard timeout applies); otherwise, read all available data (return immediately).
        The result is converted according to ``datatype`` in both cases.
        """
        if size is None:
            with self.using_timeout(0):
                # convert here as well, so that both branches honor ``datatype``
                return self._to_datatype(self.instr.read_raw(size=size))
        result = self.instr.read_raw(size=size)
        self.cooldown()
        return self._to_datatype(result)

    def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
        """
        Write data to the device.

        If ``read_echo==True``, perform :func:`readline` `read_echo_lines` times after writing;
        a positive `read_echo_delay` adds a wait (in seconds) after the write.
        `flush` is accepted for interface compatibility and is not used by this backend.
        """
        data = py3.as_builtin_bytes(data)
        if self.term_write:
            data = data + py3.as_builtin_bytes(self.term_write)
        self.instr.write_raw(data)
        self.cooldown()
        if read_echo_delay > 0.:
            time.sleep(read_echo_delay)
        if read_echo:
            for _ in range(read_echo_lines):
                self.readline()
                self.cooldown()

    def __repr__(self):
        return "VisaDeviceBackend(" + self.instr.__repr__() + ")"
_backends["visa"]=VisaDeviceBackend except ImportError: pass try: import serial try: import serial.tools.list_ports as serial_list_ports except ImportError: serial_list_ports=None
class SerialBackendOpenError(IBackendOpenError, serial.SerialException):
    """
    Serial backend opening error.

    Wraps an existing serial error `e`, reusing its ``args``.
    """

    def __init__(self, e):
        IBackendOpenError.__init__(self)
        serial.SerialException.__init__(self, *e.args)
class SerialDeviceBackend(IDeviceBackend):
    """
    Serial backend (via pySerial).

    Connection is automatically opened on creation (unless ``connect_on_operation==True``).

    Args:
        conn: Connection parameters. Can be either a string (for a port),
            or a list/tuple ``(port, baudrate, bytesize, parity, stopbits, xonxoff, rtscts, dsrdtr)`` supplied to the serial connection
            (default is ``('COM1',19200,8,'N',1,0,0,0)``),
            or a dict with the same parameters.
        timeout (float): Default timeout (in seconds).
        term_write (str): Line terminator for writing operations; appended to the data
        term_read (str): List of possible terminators for reading operations (specifies when :func:`readline` stops);
            a single string is wrapped into a one-element list.
        connect_on_operation (bool): If ``True``, the connection is normally closed, and is opened only on the operations
            (normally two processes can't be simultaneously connected to the same device).
        open_retry_times (int): Number of times the connection is attempted before giving up.
        no_dtr (bool): If ``True``, turn off DTR status line before opening (e.g., turns off reset-on-connection for Arduino controllers).
        datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
            or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
    """
    _default_operation_cooldown = 0.0
    _backend = "serial"
    Error = serial.SerialException
    """Base class for the errors raised by the backend operations"""
    BackendOpenError = SerialBackendOpenError
    _conn_params = ["port", "baudrate", "bytesize", "parity", "stopbits", "xonxoff", "rtscts", "dsrdtr"]
    _default_conn = ["COM1", 19200, 8, "N", 1, 0, 0, 0]

    def __init__(self, conn, timeout=10., term_write=None, term_read=None, connect_on_operation=False, open_retry_times=3, no_dtr=False, datatype="auto"):
        params = self.combine_conn(conn, self._default_conn)
        term_write = b"\r\n" if term_write is None else term_write
        term_read = b"\n" if term_read is None else term_read
        if isinstance(term_read, anystring):
            term_read = [term_read]
        # the stored ``conn`` keeps the port; ``params`` loses it below
        IDeviceBackend.__init__(self, params.copy(), term_write=term_write, term_read=term_read, datatype=datatype)
        port = params.pop("port")
        try:
            self.instr = serial.serial_for_url(port, do_not_open=True, **params)
            self.opened = True
            if no_dtr:
                try:
                    self.instr.setDTR(0)
                except self.Error:
                    log.default_log.debug("Cannot set DTR for an unconnected device", origin="backends/serial", level="misc")
            if not connect_on_operation:
                self.instr.open()
            self._operation_cooldown = self._default_operation_cooldown
            self._connect_on_operation = connect_on_operation
            self._opened_stack = 0
            self._open_retry_times = open_retry_times
            self.cooldown()
            self.set_timeout(timeout)
        except self.Error as e:
            raise SerialBackendOpenError(e)

    def _do_open(self):
        """Open the port, retrying up to ``_open_retry_times`` times."""
        general.retry_wait(self.instr.open, self._open_retry_times, 0.3)

    def _do_close(self):
        """Close the port, retrying up to ``_open_retry_times`` times."""
        general.retry_wait(self.instr.close, self._open_retry_times, 0.3)

    def open(self):
        """Open the connection (no effect in the connect-on-operation mode or if already opened)."""
        if self._connect_on_operation or self.opened:
            return
        self._do_open()
        self.opened = True

    def close(self):
        """Close the connection (no effect in the connect-on-operation mode or if already closed)."""
        if self._connect_on_operation or not self.opened:
            return
        self._do_close()
        self.opened = False

    def is_opened(self):
        """Check if the connection is marked as opened."""
        return self.opened

    def _op_open(self):
        """Enter an operation: in the connect-on-operation mode, open the port on the outermost entry."""
        if not self._connect_on_operation:
            return
        if not self._opened_stack:
            self._do_open()
        self._opened_stack = self._opened_stack + 1

    def _op_close(self):
        """Leave an operation: in the connect-on-operation mode, close the port on the outermost exit."""
        if not self._connect_on_operation:
            return
        self._opened_stack = self._opened_stack - 1
        if not self._opened_stack:
            self._do_close()

    @contextlib.contextmanager
    def single_op(self):
        """
        Context manager for a single operation.

        If ``connect_on_operation==True`` during creation, wrapping several command in `single_op`
        prevents the connection from being closed and reopened between the operations (only opened in the beginning and closed in the end).
        """
        self._op_open()
        try:
            yield
        finally:
            self._op_close()

    def cooldown(self):
        """
        Cooldown between the operations.

        Sleeping for a short time defined by `_operation_cooldown` attribute (no cooldown by default).
        Also defined class-wide by `_default_operation_cooldown` class attribute.
        """
        delay = self._operation_cooldown
        if delay > 0:
            time.sleep(delay)

    def set_timeout(self, timeout):
        """Set operations timeout (in seconds); ``None`` leaves the timeout unchanged."""
        if timeout is None:
            return
        self.instr.timeout = timeout
        self.cooldown()

    def get_timeout(self):
        """Get operations timeout (in seconds)."""
        return self.instr.timeout

    def _read_terms(self, terms=(), timeout=None, error_on_timeout=True):
        """
        Read until one of `terms` is encountered, or until a read returns no data.

        With terminators the port is read one byte at a time; without them, in chunks of 8 bytes
        until an empty read. An empty read with terminators specified raises ``Error`` unless
        ``error_on_timeout==False``, in which case the data received so far is returned.
        """
        received = b""
        all_single = all(len(t) == 1 for t in terms)
        terms = [py3.as_builtin_bytes(t) for t in terms]
        chunk_size = 1 if terms else 8
        with self.single_op():
            with self.using_timeout(timeout):
                while True:
                    chunk = self.instr.read(chunk_size)
                    received = received + chunk
                    if chunk == b"":
                        if error_on_timeout and terms:
                            raise self.Error("timeout during read")
                        return received
                    if all_single:
                        # single-char terminators: checking the last received byte is enough
                        if chunk in terms:
                            return received
                    elif any(received.endswith(t) for t in terms):
                        return received

    def readline(self, remove_term=True, timeout=None, skip_empty=True, error_on_timeout=True):
        """
        Read a single line from the device.

        Args:
            remove_term (bool): If ``True``, remove terminal characters from the result.
            timeout: Operation timeout. If ``None``, use the default device timeout.
            skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
            error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
        """
        while True:
            line = self._read_terms(self.term_read or [], timeout=timeout, error_on_timeout=error_on_timeout)
            self.cooldown()
            if remove_term and self.term_read:
                line = remove_longest_term(line, self.term_read)
            if line or not (skip_empty and remove_term):
                break
        return self._to_datatype(line)

    def read(self, size=None, error_on_timeout=True):
        """
        Read data from the device.

        If `size` is not None, read `size` bytes (usual timeout applies) and raise ``Error`` if fewer are received;
        otherwise, read all available data (return immediately).
        """
        with self.single_op():
            if size is None:
                data = self._read_terms(timeout=0, error_on_timeout=error_on_timeout)
            else:
                data = self.instr.read(size=size)
                if len(data) != size:
                    raise self.Error("read returned less than expected: {} instead of {}".format(len(data), size))
            self.cooldown()
            return self._to_datatype(data)

    def read_multichar_term(self, term, remove_term=True, timeout=None, error_on_timeout=True):
        """
        Read a single line with multiple possible terminators.

        Args:
            term: Either a string (single multi-char terminator) or a list of strings (multiple terminators).
            remove_term (bool): If ``True``, remove terminal characters from the result.
            timeout: Operation timeout. If ``None``, use the default device timeout.
            error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
        """
        if isinstance(term, anystring):
            term = [term]
        line = self._read_terms(term, timeout=timeout, error_on_timeout=error_on_timeout)
        self.cooldown()
        if remove_term and term:
            line = remove_longest_term(line, term)
        return self._to_datatype(line)

    def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
        """
        Write data to the device.

        If ``flush==True``, flush the write buffer.
        If ``read_echo==True``, perform :func:`readline` `read_echo_lines` times after writing;
        a positive `read_echo_delay` adds a wait (in seconds) after the write.
        """
        with self.single_op():
            payload = py3.as_builtin_bytes(data)
            if self.term_write:
                payload = payload + py3.as_builtin_bytes(self.term_write)
            self.instr.write(payload)
            self.cooldown()
            if flush:
                self.instr.flush()
                self.cooldown()
            if read_echo_delay > 0.:
                time.sleep(read_echo_delay)
            if read_echo:
                for _ in range(read_echo_lines):
                    self.readline()
                    self.cooldown()

    def __repr__(self):
        return "SerialDeviceBackend(" + self.instr.__repr__() + ")"

    @staticmethod
    def list_resources(desc=False):
        """
        List the serial ports reported by ``serial.tools.list_ports``.

        If ``desc==True``, return the full port descriptions; otherwise, only their first elements.
        Returns ``None`` if ``serial.tools.list_ports`` could not be imported.
        """
        if serial_list_ports is None:
            return None
        ports = serial_list_ports.comports()
        return [(p if desc else p[0]) for p in ports]
_backends["serial"]=SerialDeviceBackend except ImportError: pass try: import ft232
class FT232BackendOpenError(IBackendOpenError, ft232.Ft232Exception):
    """
    FT232 backend opening error.

    Wraps an existing FT232 error `e`: its message is looked up in ``Ft232Exception.errors``
    to recover the error code (code 1 is used if the message is not found).
    """

    def __init__(self, e):
        IBackendOpenError.__init__(self)
        known = ft232.Ft232Exception.errors
        if e.msg in known:
            code = known.index(e.msg)
        else:
            code = 1
        ft232.Ft232Exception.__init__(self, code)

    def __str__(self):
        return self.msg
class FT232DeviceBackend(IDeviceBackend):
    """
    FT232 backend (via pyft232).

    Connection is automatically opened on creation.

    Args:
        conn: Connection parameters. Can be either a string (for a port),
            or a list/tuple ``(port, baudrate, bytesize, parity, stopbits, xonxoff, rtscts)`` supplied to the connection
            (default is ``(None,9600,8,'N',1,0,0)``),
            or a dict with the same parameters.
        timeout (float): Default timeout (in seconds).
        term_write (str): Line terminator for writing operations; appended to the data
        term_read (str): List of possible terminators for reading operations (specifies when :func:`readline` stops);
            a single string is wrapped into a one-element list.
        open_retry_times (int): Number of times the connection is attempted before giving up.
        datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
            or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
    """
    _default_operation_cooldown = 0.0
    _backend = "ft232"
    Error = ft232.Ft232Exception
    """Base class for the errors raised by the backend operations"""
    BackendOpenError = FT232BackendOpenError
    _conn_params = ["port", "baudrate", "bytesize", "parity", "stopbits", "xonxoff", "rtscts"]
    _default_conn = [None, 9600, 8, "N", 1, 0, 0]

    def __init__(self, conn, timeout=10., term_write=None, term_read=None, open_retry_times=3, datatype="auto"):
        params = self.combine_conn(conn, self._default_conn)
        term_write = b"\r\n" if term_write is None else term_write
        term_read = b"\n" if term_read is None else term_read
        if isinstance(term_read, anystring):
            term_read = [term_read]
        # the stored ``conn`` keeps the port; ``params`` loses it below
        IDeviceBackend.__init__(self, params.copy(), term_write=term_write, term_read=term_read, datatype=datatype)
        port = params.pop("port")
        self.opened = False
        try:
            self.instr = ft232.Ft232(port, **params)
            self.opened = True
            self._operation_cooldown = self._default_operation_cooldown
            self._open_retry_times = open_retry_times
            self.cooldown()
            self.set_timeout(timeout)
            # NOTE: this instance attribute shadows the class-level ``_conn_params`` name list;
            # it stores the values needed to re-create the device in ``_do_open``
            self._conn_params = (port, params, timeout)
        except self.Error as e:
            raise FT232BackendOpenError(e)

    def _do_open(self):
        """Re-create the device object (with retries) if the connection is not opened."""
        if self.is_opened():
            return

        def reopen():
            port, params, timeout = self._conn_params[0], self._conn_params[1], self._conn_params[2]
            self.instr = ft232.Ft232(port, **params)
            self.set_timeout(timeout)
            self.opened = True

        general.retry_wait(reopen, self._open_retry_times, 0.3)

    def _do_close(self):
        """Close the device (with retries) if the connection is opened."""
        if not self.is_opened():
            return
        general.retry_wait(self.instr.close, self._open_retry_times, 0.3)
        self.opened = False

    def open(self):
        """Open the connection."""
        self._do_open()

    def close(self):
        """Close the connection."""
        self._do_close()

    def is_opened(self):
        """Check if the connection is marked as opened."""
        return self.opened

    @contextlib.contextmanager
    def single_op(self):
        """
        Context manager for a single operation.

        Does nothing.
        """
        yield

    def cooldown(self):
        """
        Cooldown between the operations.

        Sleeping for a short time defined by `_operation_cooldown` attribute (no cooldown by default).
        Also defined class-wide by `_default_operation_cooldown` class attribute.
        """
        delay = self._operation_cooldown
        if delay > 0:
            time.sleep(delay)

    def set_timeout(self, timeout):
        """
        Set operations timeout (in seconds).

        ``None`` leaves the timeout unchanged; values below 1 ms are raised to 1 ms.
        """
        if timeout is None:
            return
        if timeout < 1E-3:
            timeout = 1E-3  # 0 is infinite timeout
        self.instr.timeout = timeout
        self.cooldown()

    def get_timeout(self):
        """Get operations timeout (in seconds)."""
        return self.instr.timeout

    def _read_terms(self, terms=(), timeout=None, error_on_timeout=True):
        """
        Read until one of `terms` is encountered, or until a read returns no data.

        With terminators the device is read one byte at a time; without them, in chunks of 8 bytes
        until an empty read. An empty read with terminators specified raises ``Error(4)`` unless
        ``error_on_timeout==False``, in which case the data received so far is returned.
        """
        received = b""
        all_single = all(len(t) == 1 for t in terms)
        terms = [py3.as_builtin_bytes(t) for t in terms]
        chunk_size = 1 if terms else 8
        with self.single_op():
            with self.using_timeout(timeout):
                while True:
                    chunk = self.instr.read(chunk_size)
                    received = received + chunk
                    if chunk == b"":
                        if error_on_timeout and terms:
                            raise self.Error(4)
                        return received
                    if all_single:
                        # single-char terminators: checking the last received byte is enough
                        if chunk in terms:
                            return received
                    elif any(received.endswith(t) for t in terms):
                        return received

    def readline(self, remove_term=True, timeout=None, skip_empty=True, error_on_timeout=True):
        """
        Read a single line from the device.

        Args:
            remove_term (bool): If ``True``, remove terminal characters from the result.
            timeout: Operation timeout. If ``None``, use the default device timeout.
            skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
            error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
        """
        while True:
            line = self._read_terms(self.term_read or [], timeout=timeout, error_on_timeout=error_on_timeout)
            self.cooldown()
            if remove_term and self.term_read:
                line = remove_longest_term(line, self.term_read)
            if line or not (skip_empty and remove_term):
                break
        return self._to_datatype(line)

    def read(self, size=None, error_on_timeout=True):
        """
        Read data from the device.

        If `size` is not None, read `size` bytes (usual timeout applies) and raise ``Error(4)`` if fewer are received;
        otherwise, read all available data (return immediately).
        """
        with self.single_op():
            if size is None:
                data = self._read_terms(timeout=0, error_on_timeout=error_on_timeout)
            else:
                data = self.instr.read(size=size)
                if len(data) != size:
                    raise self.Error(4)
            self.cooldown()
            return self._to_datatype(data)

    def read_multichar_term(self, term, remove_term=True, timeout=None, error_on_timeout=True):
        """
        Read a single line with multiple possible terminators.

        Args:
            term: Either a string (single multi-char terminator) or a list of strings (multiple terminators).
            remove_term (bool): If ``True``, remove terminal characters from the result.
            timeout: Operation timeout. If ``None``, use the default device timeout.
            error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
        """
        if isinstance(term, anystring):
            term = [term]
        line = self._read_terms(term, timeout=timeout, error_on_timeout=error_on_timeout)
        self.cooldown()
        if remove_term and term:
            line = remove_longest_term(line, term)
        return self._to_datatype(line)

    def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1):
        """
        Write data to the device.

        If ``flush==True``, flush the write buffer.
        If ``read_echo==True``, perform :func:`readline` `read_echo_lines` times after writing;
        a positive `read_echo_delay` adds a wait (in seconds) after the write.
        """
        with self.single_op():
            payload = py3.as_builtin_bytes(data)
            if self.term_write:
                payload = payload + py3.as_builtin_bytes(self.term_write)
            self.instr.write(payload)
            self.cooldown()
            if flush:
                self.instr.flush()
                self.cooldown()
            if read_echo_delay > 0.:
                time.sleep(read_echo_delay)
            if read_echo:
                for _ in range(read_echo_lines):
                    self.readline()
                    self.cooldown()

    def __repr__(self):
        return "FT232DeviceBackend(" + self.instr.__repr__() + ")"

    @staticmethod
    def list_resources(desc=False):
        """
        List the devices reported by ``ft232.list_devices()``.

        If ``desc==True``, return the full device descriptions; otherwise, only their first elements.
        """
        devices = ft232.list_devices()
        return [d if desc else d[0] for d in devices]
_backends["ft232"]=FT232DeviceBackend except (ImportError,NameError,OSError): pass
class NetworkBackendOpenError(IBackendOpenError, net.socket.error):
    """
    Network backend opening error.

    Wraps an existing socket error `e`, reusing its ``args``.
    """

    def __init__(self, e):
        IBackendOpenError.__init__(self)
        net.socket.error.__init__(self, *e.args)
[docs]class NetworkDeviceBackend(IDeviceBackend): """ Serial backend (via pySerial). Connection is automatically opened on creation. Args: conn: Connection parameters. Can be either a string ``"IP:port"`` (e.g., ``"127.0.0.1:80"``), or a tuple ``(IP,port)``, where `IP` is a string and `port` is a number. timeout (float): Default timeout (in seconds). term_write (str): Line terminator for writing operations; appended to the data term_read (str): List of possible single-char terminator for reading operations (specifies when :func:`readline` stops). datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object), or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3) Note: If `term_read` is a string, its behavior is different from the VISA backend: instead of being a multi-char terminator it is assumed to be a set of single-char terminators. If multi-char terminator is required, `term_read` should be a single-element list instead of a string. 
""" _default_operation_cooldown=0.0 _backend="network" Error=net.socket.error """Base class for the errors raised by the backend operations""" BackendOpenError=NetworkBackendOpenError def __init__(self, conn, timeout=10., term_write=None, term_read=None, datatype="auto"): if term_write is None: term_write="\r\n" if term_read is None: term_read="\r\n" if isinstance(term_read,anystring): term_read=[term_read] conn=self._conn_to_dict(conn) self._split_addr(conn) IDeviceBackend.__init__(self,conn,term_write=term_write,term_read=term_read,datatype=datatype) try: self.socket=None self.open() self._operation_cooldown=self._default_operation_cooldown self.cooldown() self.set_timeout(timeout) except self.Error as e: raise NetworkBackendOpenError(e) _conn_params=["addr","port"] _default_conn=["127.0.0.1",80] @classmethod def _split_addr(cls, conn): addr=conn["addr"] addr_split=addr.split(":") if len(addr_split)==2: conn["addr"],conn["port"]=addr_split[0],int(addr_split[1]) elif len(addr_split)>2: raise ValueError("invalid device address: {}".format(conn))
[docs] def open(self): """Open the connection.""" self.close() self.socket=net.ClientSocket(send_method="fixedlen",recv_method="fixedlen") self.socket.connect(self.conn["addr"],self.conn["port"])
[docs] def close(self): """Close the connection.""" if self.socket is not None: self.socket.close() self.socket=None
[docs] def is_opened(self): return bool(self.socket)
[docs] def cooldown(self): """ Cooldown between the operations. Sleeping for a short time defined by `_operation_cooldown` attribute (no cooldown by default). Also defined class-wide by `_default_operation_cooldown` class attribute. """ if self._operation_cooldown>0: time.sleep(self._operation_cooldown)
[docs] def set_timeout(self, timeout): """Set operations timeout (in seconds).""" self.socket.set_timeout(timeout)
[docs] def get_timeout(self): """Get operations timeout (in seconds).""" return self.socket.get_timeout()
[docs] def readline(self, remove_term=True, timeout=None, skip_empty=True, error_on_timeout=True): """ Read a single line from the device. Args: remove_term (bool): If ``True``, remove terminal characters from the result. timeout: Operation timeout. If ``None``, use the default device timeout. skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``). error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout. """ while True: try: with self.using_timeout(timeout): result=self.socket.recv_delimiter(self.term_read,strict=True) except net.SocketTimeout: if error_on_timeout: raise self.cooldown() if remove_term and self.term_read: result=remove_longest_term(result,self.term_read) if not (skip_empty and remove_term and (not result)): break return self._to_datatype(result)
[docs] def read(self, size=None, error_on_timeout=True): """ Read data from the device. If `size` is not None, read `size` bytes (usual timeout applies); otherwise, read all available data (return immediately). """ if size is None: return self.socket.recv_all() else: try: data=self.socket.recv_fixedlen(size) except net.SocketTimeout: if error_on_timeout: raise return self._to_datatype(data)
[docs] def read_multichar_term(self, term, remove_term=True, timeout=None, error_on_timeout=True): """ Read a single line with multiple possible terminators. Args: term: Either a string (single multi-char terminator) or a list of strings (multiple terminators). remove_term (bool): If ``True``, remove terminal characters from the result. timeout: Operation timeout. If ``None``, use the default device timeout. error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout. """ if isinstance(term,anystring): term=[term] result=self.socket.recv_delimiter(term,strict=True) self.cooldown() if remove_term and term: result=remove_longest_term(result,term) return self._to_datatype(result)
[docs] def write(self, data, flush=True, read_echo=False, read_echo_delay=0, read_echo_lines=1): """ Write data to the device. If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times). `flush` parameter is ignored. """ self.socket.send_delimiter(data,self.term_write) self.cooldown() if read_echo_delay>0.: time.sleep(read_echo_delay) if read_echo: for _ in range(read_echo_lines): self.readline() self.cooldown()
def __repr__(self):
    """Return the backend name followed by the representation of the underlying socket in parentheses."""
    socket_repr=self.socket.__repr__()
    return "".join(["NetworkDeviceBackend(",socket_repr,")"])
_backends["network"]=NetworkDeviceBackend




# The PyUSB backend is optional: it is only defined and registered if the ``usb`` package can be imported.
try:
    import usb
    import usb.backend.libusb0
    import usb.backend.libusb1
    import usb.backend.openusb

    class PyUSBBackendOpenError(IBackendOpenError,usb.USBError):
        """
        USB backend opening error.

        Args:
            e: Either the exception which caused the opening failure, or an error message.
        """
        def __init__(self, e):
            IBackendOpenError.__init__(self)
            if isinstance(e,usb.USBError):
                # Re-create the error field by field: ``e.args`` of a USB error is ``(errno, strerror)``,
                # which does not match the ``(strerror, error_code, errno)`` constructor signature.
                usb.USBError.__init__(self,e.strerror,getattr(e,"backend_error_code",None),e.errno)
            else:
                # plain message (as raised in :meth:`PyUSBDeviceBackend.open`) or a non-USB exception
                usb.USBError.__init__(self,str(e))

    class PyUSBDeviceBackend(IDeviceBackend):
        """
        USB backend (via PyUSB package).

        Connection is automatically opened on creation.

        Args:
            conn: Connection parameters. Can be either a string (for a port), or a list/tuple
                ``(vendorID, productID, index, endpoint_read, endpoint_write, backend)`` supplied to the connection
                (default is ``(0x0000,0x0000,0,0x00,0x01,'libusb1')``, which is invalid for most devices),
                or a dict with the same parameters.
            timeout (float): Default timeout (in seconds).
            term_write (str): Line terminator for writing operations; appended to the data
            term_read (str): List of possible single-char terminator for reading operations (specifies when :func:`readline` stops).
            check_read_size (bool): If ``True``, raise an error if a fixed-size :func:`read` returns
                a different number of bytes than requested.
            datatype (str): Type of the returned data; can be ``"bytes"`` (return `bytes` object), ``"str"`` (return `str` object),
                or ``"auto"`` (default Python result: `str` in Python 2 and `bytes` in Python 3)
        """
        _default_operation_cooldown=0.0
        _backend="pyusb"
        Error=usb.USBError
        """Base class for the errors raised by the backend operations"""
        BackendOpenError=PyUSBBackendOpenError

        _conn_params=["vendorID","productID","index","endpoint_read","endpoint_write","backend"]
        _default_conn=[0x0000,0x0000,0,0x00,0x01,"libusb1"]
        _usb_backends={"libusb0":usb.backend.libusb0, "libusb1":usb.backend.libusb1, "openusb":usb.backend.openusb}
        def __init__(self, conn, timeout=10., term_write=None, term_read=None, check_read_size=True, datatype="auto"):
            # fill the missing connection parameters with the defaults
            conn_dict=self.combine_conn(conn,self._default_conn)
            funcargparse.check_parameter_range(conn_dict["backend"],"usb_backend",self._usb_backends)
            if isinstance(term_read,anystring):
                term_read=[term_read]
            self._operation_cooldown=self._default_operation_cooldown
            IDeviceBackend.__init__(self,conn_dict.copy(),term_write=term_write,term_read=term_read,datatype=datatype)
            self.timeout=timeout
            self.check_read_size=check_read_size
            try:
                self.open()
            except self.Error as e:
                raise PyUSBBackendOpenError(e)

        def open(self):
            """
            Open the connection.

            Raises :exc:`PyUSBBackendOpenError` if fewer than ``index+1`` devices with the given vendor and product IDs are found.
            """
            idx=self.conn["index"]
            backend=self._usb_backends[self.conn["backend"]].get_backend()
            all_devs=list(usb.core.find(idVendor=self.conn["vendorID"],idProduct=self.conn["productID"],backend=backend,find_all=True))
            if len(all_devs)<idx+1:
                raise PyUSBBackendOpenError("can't find device with index {}; {} devices found".format(idx,len(all_devs)))
            self.instr=all_devs[idx]
            self.ep_read=self.conn["endpoint_read"]
            self.ep_write=self.conn["endpoint_write"]
            self.cooldown()
            self.opened=True
        def close(self):
            """Close the connection."""
            self.instr.finalize()
            self.opened=False
        def is_opened(self):
            """Check if the device is connected (i.e., :meth:`open` succeeded and :meth:`close` has not been called since)."""
            return self.opened

        def cooldown(self):
            """
            Cooldown between the operations.

            Sleeping for a short time defined by `_operation_cooldown` attribute (no cooldown by default).
            Also defined class-wide by `_default_operation_cooldown` class attribute.
            """
            if self._operation_cooldown>0:
                time.sleep(self._operation_cooldown)

        def set_timeout(self, timeout):
            """Set operations timeout (in seconds). ``None`` leaves the current timeout unchanged."""
            if timeout is not None:
                self.timeout=timeout
        def get_timeout(self):
            """Get operations timeout (in seconds)."""
            return self.timeout
        def _timeout(self):
            """Return the default timeout converted to integer milliseconds (``None`` if no timeout is set)."""
            return None if self.timeout is None else int(self.timeout*1000)


        def _read_terms(self, terms=(), read_block_size=65536, timeout=None, error_on_timeout=True):
            """
            Read from the read endpoint until one of the terminators is encountered.

            If `terms` is empty, perform a single read of up to `read_block_size` bytes and return its result.
            Otherwise, read one byte at a time until the accumulated data ends with one of the terminators.
            An empty read raises an error if ``error_on_timeout==True`` and `terms` is not empty;
            otherwise, the data accumulated so far is returned.

            NOTE: `timeout` is currently not used; every read uses the default device timeout (see :meth:`_timeout`).
            """
            result=b""
            singlechar_terms=all(len(t)==1 for t in terms)
            terms=[py3.as_builtin_bytes(t) for t in terms]
            while True:
                c=self.instr.read(self.ep_read,1 if terms else read_block_size,timeout=self._timeout()).tobytes()
                result=result+c
                if c==b"":
                    if error_on_timeout and terms:
                        raise self.Error("timeout during read")
                    return result
                if not terms:
                    return result
                if singlechar_terms:
                    # only the last received byte needs to be checked
                    if c in terms:
                        return result
                else:
                    for t in terms:
                        if result.endswith(t):
                            return result
        def readline(self, remove_term=True, timeout=None, skip_empty=True, error_on_timeout=True):
            """
            Read a single line from the device.

            Args:
                remove_term (bool): If ``True``, remove terminal characters from the result.
                timeout: Operation timeout. If ``None``, use the default device timeout.
                skip_empty (bool): If ``True``, ignore empty lines (works only for ``remove_term==True``).
                error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
            """
            while True:
                result=self._read_terms(self.term_read or [],timeout=timeout,error_on_timeout=error_on_timeout)
                self.cooldown()
                if remove_term and self.term_read:
                    result=remove_longest_term(result,self.term_read)
                if not (skip_empty and remove_term and (not result)):
                    break
            return self._to_datatype(result)
        def read(self, size=None, max_read_size=65536, error_on_timeout=True):
            """
            Read data from the device.

            If `size` is not None, read `size` bytes (usual timeout applies); otherwise, read all available data (return immediately).
            With ``check_read_size==True`` (set on creation), a fixed-size read returning a different number of bytes raises an error.
            """
            if size is None:
                # NOTE: the zero timeout requested here is not applied, since _read_terms ignores its `timeout` argument
                result=self._read_terms(read_block_size=max_read_size,timeout=0,error_on_timeout=error_on_timeout)
            else:
                result=self.instr.read(self.ep_read,size,timeout=self._timeout()).tobytes()
                if len(result)!=size and self.check_read_size:
                    raise self.Error("read returned less than expected {} instead of {}".format(len(result),size))
            self.cooldown()
            return self._to_datatype(result)
        def read_multichar_term(self, term, remove_term=True, timeout=None, error_on_timeout=True):
            """
            Read a single line with multiple possible terminators.

            Args:
                term: Either a string (single multi-char terminator) or a list of strings (multiple terminators).
                remove_term (bool): If ``True``, remove terminal characters from the result.
                timeout: Operation timeout. If ``None``, use the default device timeout.
                error_on_timeout (bool): If ``False``, return an incomplete line instead of raising the error on timeout.
            """
            if isinstance(term,anystring):
                term=[term]
            result=self._read_terms(term,timeout=timeout,error_on_timeout=error_on_timeout)
            self.cooldown()
            if remove_term and term:
                result=remove_longest_term(result,term)
            return self._to_datatype(result)
        def write(self, data, read_echo=False, read_echo_delay=0, read_echo_lines=1):
            """
            Write data to the device.

            If ``read_echo==True``, wait for `read_echo_delay` seconds and then perform :func:`readline` (`read_echo_lines` times).
            """
            data=py3.as_builtin_bytes(data)
            if self.term_write:
                data=data+py3.as_builtin_bytes(self.term_write)
            self.instr.write(self.ep_write,data,timeout=self._timeout())
            self.cooldown()
            if read_echo:
                if read_echo_delay>0.:
                    time.sleep(read_echo_delay)
                for _ in range(read_echo_lines):
                    self.readline()
                    self.cooldown()

        def __repr__(self):
            return "PyUSBDeviceBackend("+self.instr.__repr__()+")"


    _backends["pyusb"]=PyUSBDeviceBackend
except ImportError:
    pass





# Serial port names start with "COM" followed by digits (case-insensitive)
_serial_re=re.compile(r"^com\d+",re.IGNORECASE)
def _is_serial_addr(addr):
    """Check if `addr` is a string which starts like a serial port name (e.g., ``"COM1"``)."""
    return isinstance(addr,anystring) and bool(_serial_re.match(addr))
# Network addresses start with a dotted quad, optionally followed by a port (e.g., "192.168.0.1:8000")
_network_re=re.compile(r"(\d+\.){3}\d+(:\d+)?",re.IGNORECASE)
def _is_network_addr(addr):
    """Check if `addr` is a string which starts like an IPv4 address with an optional port."""
    return isinstance(addr,anystring) and bool(_network_re.match(addr))
def autodetect_backend(conn):
    """
    Try to determine the backend by the connection.

    For a list or a tuple, only the first element is examined.
    For a dictionary, the ``"addr"`` entry is checked for a network address and the ``"port"`` entry for a serial port name.
    The assumed default backend is ``'visa'``.
    """
    if isinstance(conn, dict):
        if "addr" in conn and _is_network_addr(conn["addr"]):
            return "network"
        if "port" in conn and _is_serial_addr(conn["port"]):
            return "serial"
        return "visa"
    address=conn[0] if isinstance(conn, (tuple,list)) else conn
    # checks are tried in order; the first match determines the backend
    for matches,backend_name in ((_is_network_addr,"network"),(_is_serial_addr,"serial")):
        if matches(address):
            return backend_name
    return "visa"
def new_backend(conn, timeout=None, backend="auto", **kwargs):
    """
    Build new backend with the supplied parameters.

    If `conn` is already a backend object, it is returned as is.

    Args:
        conn: Connection parameters (depend on the backend).
        timeout (float): Default timeout (in seconds).
        backend (str): Backend type. Available backends are ``'auto'`` (try to autodetect),
            ``'visa'``, ``'serial'``, ``'ft232'``, ``'network'``, and ``'pyusb'``
            (only the backends whose packages could be imported are registered).
        **kwargs: parameters sent to the backend.
    """
    if isinstance(conn,IDeviceBackend):
        return conn
    backend_name=autodetect_backend(conn) if backend=="auto" else backend
    funcargparse.check_parameter_range(backend_name,"backend",_backends)
    backend_class=_backends[backend_name]
    return backend_class(conn,timeout=timeout,**kwargs)
### Interface for a generic device class ###
class IBackendWrapper(interface.IDevice):
    """
    A base class for an instrument using a communication backend.

    All of the methods simply forward the calls to the backend stored in the ``instr`` attribute.

    Args:
        instr: Backend (assumed to be already opened).
    """
    def __init__(self, instr):
        interface.IDevice.__init__(self)
        self.instr=instr

    def open(self):
        """Open the backend."""
        return self.instr.open()
    def close(self):
        """Close the backend."""
        return self.instr.close()
    def is_opened(self):
        """Check if the device is connected"""
        # A backend object is truthy regardless of its connection state,
        # so the backend itself has to be asked as well.
        return bool(self.instr) and bool(self.instr.is_opened())

    def lock(self, timeout=None):
        """Lock the access to the device from other threads/processes (isn't necessarily implemented)."""
        return self.instr.lock(timeout=timeout)
    def unlock(self):
        """Unlock the access to the device from other threads/processes (isn't necessarily implemented)."""
        return self.instr.unlock()
    def locking(self, timeout=None):
        """Context manager for lock & unlock."""
        return self.instr.locking(timeout=timeout)