import pywinusb.hid as hid #@UnresolvedImport
from ...core.devio import backend #@UnresolvedImport
from ...core.utils import numerical, general #@UnresolvedImport
from ...core.utils import log #@UnresolvedImport
import time
import threading
_depends_local=["...core.devio.backend"]
[docs]class IVaunixDevice(backend.IBackendWrapper):
"""
Generic Vaunix device.
"""
def __init__(self, product_id=None, idx=0):
backend.IBackendWrapper.__init__(self,self._find_device(product_id,idx))
self._responses={}
self._events={}
self._event_lock=threading.Lock()
self._pause_handler=False
self._pause_start_event=threading.Event()
self.instr.open()
self.instr.set_raw_data_handler(self._response_handler)
self._retry_count=5
self._recv_timeout=5.
[docs] def close(self):
self._pause_handler=True
self._pause_start_event.wait()
self.instr.close()
@staticmethod
def _find_device(product_id, idx):
if product_id:
devices=hid.HidDeviceFilter(vendor_id=0x041F,product_id=product_id).get_devices()
else:
devices=hid.HidDeviceFilter(vendor_id=0x041F).get_devices()
if len(devices)<=idx:
raise RuntimeError("can't find device index {}".format(idx))
devices.sort(key=lambda d: str(d))
return devices[idx]
@staticmethod
def _build_query(comm, count, value):
data=[0]+[comm]+[count]
byteblock=[(value>>(i*8))&0xFF for i in range(count)]
data=data+byteblock
return data+[0]*(9-len(data))
@staticmethod
def _parse_response(data):
comm=data[1]
count=data[2]
byteblock=data[3:3+count]
value=sum([ (b<<(8*i)) for i,b in enumerate(byteblock) ])
return comm,value
def _response_handler(self, data):
if self._pause_handler:
self._pause_start_event.set()
time.sleep(.5) # pausing the response thread seems to reduce the possibility of inst.close() crashing (more wait time for closing resources?)
return
with self._event_lock:
comm,value=self._parse_response(data)
self._responses[comm]=value
if comm not in self._events:
self._events[comm]=threading.Event()
self._events[comm].set()
def _try_recv_data(self, comm):
with self._event_lock:
if comm not in self._events:
self._events[comm]=threading.Event()
evt=self._events[comm]
if evt.wait(timeout=self._recv_timeout):
evt.clear()
return self._responses.pop(comm)
else:
raise RuntimeError("no data received with command 0x{:02x}".format(comm))
[docs] def send_data(self, comm, count, value):
query=self._build_query(comm,count,value)
self.instr.send_output_report(query)
time.sleep(0.05)
[docs] def recv_data(self, comm):
for t in general.RetryOnException(self._retry_count,RuntimeError):
with t:
return self._try_recv_data(comm)
error_msg="receiving command 0x{:02x} failed; retrying...".format(comm)
log.default_log.info(error_msg,origin="devices/Vaunix",level="warning")
[docs] def query_data(self, send_comm, recv_comm=None, send_count=0, send_value=0):
comm=send_comm if recv_comm is None else recv_comm
for t in general.RetryOnException(self._retry_count,RuntimeError):
with t:
self.send_data(send_comm,send_count,send_value)
return self._try_recv_data(comm)
error_msg="querying command 0x{:02x} failed; retrying...".format(comm)
log.default_log.info(error_msg,origin="devices/Vaunix",level="warning")
[docs]class LMS(IVaunixDevice):
"""
Vaunix LMS (LabBrick) microwave generator.
"""
def __init__(self, product_id=None, idx=0):
IVaunixDevice.__init__(self,product_id=product_id,idx=idx)
self._max_power=10.
self._min_power=-45.
self._add_settings_node("power_on",self.get_output,None)
self._add_settings_node("power",self.get_output_level,self.set_output_level)
self._add_settings_node("frequency",self.get_frequency,self.set_frequency)
self._add_settings_node("extref",self.get_extref,self.set_extref)
[docs] def get_output(self):
return bool(self.query_data(0x0A))
[docs] def set_output(self, output=True):
self.send_data(0x8A,1,int(output))
return self.get_output()
[docs] def get_extref(self):
return not bool(self.query_data(0x23))
[docs] def set_extref(self, extref=True):
self.send_data(0xA3,1,int(not extref))
return self.get_extref()
[docs] def get_output_level(self):
level=self.query_data(0x0D)
return self._max_power-level*0.25 # weird power encoding
[docs] def set_output_level(self, level):
level=numerical.limit_to_range(level,self._min_power,self._max_power)
level=int((self._max_power-level)/0.25) # weird power encoding
self.send_data(0x8D,1,level)
return self.get_output_level()
[docs] def get_frequency(self):
return self.query_data(0x44)*10. # units of 10's of Hz
[docs] def set_frequency(self, frequency):
frequency=numerical.limit_to_range(frequency,0.5E9,2.3E9)
self.send_data(0xC4,4,int(frequency/10.)) # units of 10's of Hz
return self.get_frequency()
[docs] def save_as_default(self):
self.send_data(0x8C,3,0x315542)
[docs] def apply_settings(self, settings):
if "power_on" in settings and not settings["power_on"]:
self.set_output(False)
IVaunixDevice.apply_settings(self,settings)
if "power_on" in settings and settings["power_on"]:
self.set_output(True)
return self.get_settings()