Source code for pylablib.aux_libs.gui.device_thread

from ...core.gui.qt.thread import controller
from ...core.utils import rpyc as rpyc_utils

class DeviceThread(controller.QTaskThread):
    """
    Expansion of :class:`.QTaskThread` equipped to deal with a single device.

    Args:
        name: thread name
        devargs: args supplied to :meth:`setup_task` method
        devkwargs: keyword args supplied to :meth:`setup_task` method
        signal_pool: :class:`.SignalPool` for this thread (by default, use the default common pool)

    Attributes:
        device: managed device. Its opening should be specified in an overloaded :meth:`connect_device` method,
            and it is actually opened by calling :meth:`open_device` method
            (which also handles status updates and duplicate opening issues)
        device_reconnect_tries: number of additional connection attempts allowed after a failed one
            while ``device`` is still ``None``; a negative value removes the limit
        qd: device query accessor, which routes device method call through a command;
            ``ctl.qd.method(*args,**kwargs)`` is equivalent to ``ctl.device.method(*args,**kwargs)``
            called as a query in the device thread
        qdi: device query accessor, ignores and silences any exceptions (including missing /stopped controller);
            similar to ``.qi`` accessor for queries

    Methods to overload:
        setup_task: executed on the thread startup (between synchronization points ``"start"`` and ``"run"``)
        finalize_task: executed on thread cleanup (attempts to execute in any case, including exceptions);
            by default, close the device connection if it is opened
        connect_device: create the device class and assign it to ``.device`` attribute;
            if connection failed, can leave the attribute ``None``
        device_open: re-open currently closed device (by default, call ``.open`` method of the device)
        device_close: close currently opened device (by default, call ``.close`` method of the device)

    Commands:
        open_device: open the device, if not already opened
        close_device: close the device, if opened
        get_settings: get device settings
        get_full_info: get full info of the device
    """
    def __init__(self, name=None, devargs=None, devkwargs=None, signal_pool=None):
        controller.QTaskThread.__init__(self, name=name, signal_pool=signal_pool, setupargs=devargs, setupkwargs=devkwargs)
        self.device = None
        self.add_command("open_device", self.open_device)
        self.add_command("close_device", self.close_device)
        self.add_command("get_settings", self.get_settings)
        self.add_command("get_full_info", self.get_full_info)
        # internal command used by the ``qd`` / ``qdi`` accessors
        self.add_command("_device_method", self._device_method)
        self._full_info_job = False
        self._full_info_nodes = None
        self.device_reconnect_tries = 0
        # number of consecutive failed attempts; reset on a successful opening
        self._tried_device_connect = 0
        self.qd = self.DeviceMethodAccessor(self, ignore_errors=False)
        self.qdi = self.DeviceMethodAccessor(self, ignore_errors=True)

    def finalize_task(self):
        """Close the device (if it is opened) on the thread cleanup."""
        self.close_device()

    def connect_device(self):
        """
        Connect the device and assign it to the ``self.device`` attribute.

        Should be overloaded in subclasses.
        In case of connection error, can leave ``self.device`` as ``None``, which symbolizes connection failure.
        """
        pass

    def device_open(self):
        """
        Open the device which has been previously closed.

        By default, call ``.open`` method of the device.
        """
        self.device.open()

    def device_close(self):
        """
        Close the device which is currently opened.

        By default, call ``.close`` method of the device.
        """
        self.device.close()

    def open_device(self):
        """
        Open the device by calling :meth:`connect_device`.

        Return ``True`` if connection was a success (or the device is already connected) and ``False`` otherwise.
        """
        if self.device is not None and self.device.is_opened():
            return True
        # Give up on creating the device once the number of failed attempts exceeds
        # ``device_reconnect_tries`` (a negative value disables this limit).
        tries_exhausted = self.device_reconnect_tries >= 0 and self._tried_device_connect > self.device_reconnect_tries
        if self.device is None and tries_exhausted:
            return False
        self.update_status("connection", "opening", "Connecting...")
        if self.device is None:
            self.connect_device()
        if self.device is not None:
            if not self.device.is_opened():
                self.device_open()
            if self.device.is_opened():
                self.update_status("connection", "opened", "Connected")
                self._tried_device_connect = 0
                return True
        self._tried_device_connect += 1
        self.update_status("connection", "closed", "Disconnected")
        return False

    def close_device(self):
        """
        Close the device.

        Automatically called on the thread finalization, usually shouldn't be called explicitly.
        """
        if self.device is not None and self.device.is_opened():
            self.update_status("connection", "closing", "Disconnecting...")
            self.device_close()
            self.update_status("connection", "closed", "Disconnected")

    def get_settings(self):
        """Get device settings (an empty dictionary if the device is not connected)"""
        return self.device.get_settings() if self.device is not None else {}

    def setup_full_info_job(self, period=2., nodes=None):
        """
        Setup a job which periodically obtains full information (by calling ``get_full_info`` method) from the device.

        Useful if obtaining settings takes a lot of time, and they might be needed by some other thread on a short notice.
        Only the first call has an effect; subsequent calls are ignored.

        Args:
            period: job period
            nodes: specifies info nodes to be requested (by default, all available nodes)
        """
        if not self._full_info_job:
            self._full_info_nodes = nodes
            self.add_job("update_full_info", self.update_full_info, period)
            self._full_info_job = True

    def update_full_info(self):
        """
        Update full info of the device.

        A function for a job which is setup in :meth:`DeviceThread.setup_full_info_job`.
        Normally doesn't need to be called explicitly.
        Does nothing if the device is not connected (``self.device`` is ``None``).
        """
        # The job keeps running while the device is absent (e.g., after a failed
        # connection), so skip the update instead of failing on ``None``.
        if self.device is not None:
            self["full_info"] = self.device.get_full_info(nodes=self._full_info_nodes)

    def get_full_info(self):
        """
        Get full device info.

        If the full info job is set up using :meth:`DeviceThread.setup_full_info_job`,
        use the last cached version of the full info; otherwise, request a new version from the device.
        Return an empty dictionary if the device is not connected.
        """
        # explicit ``None`` check (same as in ``get_settings``) rather than relying on device truthiness
        if self.device is None:
            return {}
        if self._full_info_job:
            return self["full_info"]
        return self.device.get_full_info(nodes=self._full_info_nodes)

    def _device_method(self, name, args, kwargs):
        """
        Call a device method.

        Try to open the device first; return ``None`` if it could not be opened.
        """
        if self.open_device():
            return getattr(self.device, name)(*args, **kwargs)
        return None

    class DeviceMethodAccessor(object):
        """
        Accessor object designed to simplify calling device commands.

        Automatically created by the thread, so doesn't need to be invoked externally.

        Args:
            parent: device thread whose ``call_query`` method is used to route the calls
            ignore_errors: passed as ``ignore_errors`` argument to ``call_query`` on every call
        """
        def __init__(self, parent, ignore_errors=False):
            object.__init__(self)
            self.parent = parent
            self.ignore_errors = ignore_errors
            # cache of already created call wrappers, indexed by the device method name
            self._calls = {}

        def __getattr__(self, name):
            if name not in self._calls:
                parent = self.parent

                def remcall(*args, **kwargs):
                    return parent.call_query("_device_method", [name, args, kwargs], ignore_errors=self.ignore_errors)
                self._calls[name] = remcall
            return self._calls[name]
class RemoteDeviceThread(DeviceThread):
    """
    Expansion of :class:`DeviceThread` equipped to deal with a remote device (via RPyC library).

    All arguments, attributes and commands are the same as in :class:`DeviceThread`.

    Additional attributes:
        rpyc: ``True`` once :meth:`rpyc_device` has been called, ``False`` otherwise
        rpyc_serv: RPyC service object returned by ``connect_device_service`` (``None`` before connection and after finalization)
    """
    def __init__(self, name=None, devargs=None, devkwargs=None, signal_pool=None):
        DeviceThread.__init__(self, name=name, signal_pool=signal_pool, devargs=devargs, devkwargs=devkwargs)
        self.rpyc = False
        self.rpyc_serv = None

    def rpyc_device(self, remote, module, device, *args, **kwargs):
        """
        Create a remote device on a different PC via RPyC.

        Can replace straightforward device creation for remote devices, i.e., instead of
        ``self.device = DeviceModule.DeviceClass(*args,**kwargs)`` one would call
        ``self.device = self.rpyc_device(host,"DeviceModule","DeviceClass",*args,**kwargs)``.
        Return ``None`` if the connection to the remote service could not be established.

        Args:
            remote: address of the remote host (it should be running RPyC server; see :func:`.rpyc.run_device_service` for details)
            module: device class module name
            device: device class name
            args: arguments supplied to the device constructor.
            kwargs: keyword arguments supplied to the device constructor.
        """
        self.rpyc = True
        self.rpyc_serv = rpyc_utils.connect_device_service(remote)
        if not self.rpyc_serv:
            return None
        return self.rpyc_serv.get_device(module, device, *args, **kwargs)

    def rpyc_obtain(self, obj):
        """
        Obtain (i.e., transfer to the local PC) an object returned by the device.

        If current device is local, return `obj` as is.
        """
        if self.rpyc:
            return rpyc_utils.obtain(obj, serv=self.rpyc_serv)
        return obj

    def finalize_task(self):
        """
        Close the device and then the RPyC connection (if one has been established).

        The connection is closed even if closing the device raises an exception;
        the exception is still propagated afterwards.
        """
        try:
            DeviceThread.finalize_task(self)
        finally:
            # Drop the references first, so that the thread state is clean
            # regardless of how closing the connection goes.
            rpyc_serv = self.rpyc_serv
            self.device = None
            self.rpyc_serv = None
            if rpyc_serv is not None:
                try:
                    rpyc_serv.getconn().close()
                except EOFError:
                    # EOFError while closing is deliberately ignored (best-effort cleanup)
                    pass

    def get_settings(self):
        """Get device settings transferred to the local PC (an empty dictionary if the device is not connected)"""
        return self.rpyc_obtain(self.device.get_settings()) if self.device is not None else {}

    def update_full_info(self):
        """
        Update full info of the device, transferring the result to the local PC.

        A function for a job which is setup in :meth:`DeviceThread.setup_full_info_job`.
        Does nothing if the device is not connected (``self.device`` is ``None``).
        """
        # The job keeps running while the device is absent (e.g., after a failed
        # connection), so skip the update instead of failing on ``None``.
        if self.device is not None:
            self["full_info"] = self.rpyc_obtain(self.device.get_full_info(nodes=self._full_info_nodes))

    def get_full_info(self):
        """
        Get full device info.

        If the full info job is set up, use the last cached version of the full info;
        otherwise, request a new version from the device and transfer it to the local PC.
        Return an empty dictionary if the device is not connected.
        """
        # explicit ``None`` check (same as in ``get_settings``) rather than relying on device truthiness
        if self.device is None:
            return {}
        if self._full_info_job:
            return self["full_info"]
        return self.rpyc_obtain(self.device.get_full_info(nodes=self._full_info_nodes))