from builtins import range
from ...core.utils.py3 import bytestring
from ...core.utils import dictionary, funcargparse #@UnresolvedImport
from ...core.dataproc import waveforms, fourier #@UnresolvedImport
try:
import zhinst.ziPython as zp #@UnresolvedImport
except ImportError:
pass
import numpy as np
import time
##### Traces sanitizing #####
[docs]def get_timestamps_data(trace):
if trace is None or len(trace)<2:
return (0.,0.,0.)
ts=trace[:,0]
dt=ts[1:]-ts[:-1]
return dt[0],(dt!=dt[0]).sum()
[docs]def check_timestamps(trace):
return get_timestamps_data(trace)[1]==0
[docs]def check_all_timestamps(data):
return all([check_timestamps(s) for s in data])
[docs]def get_timestamp_chunks(ts):
dt=abs(ts[1:]-ts[:-1])
un,cnts=np.unique(dt,return_counts=True)
ts0=un[cnts.argmax()]
jumps=(dt!=ts0).nonzero()[0]
chunks=[]
j0=0
for j in jumps:
chunks.append((ts[j0],ts[j]))
j0=j+1
if j0<len(dt):
chunks.append((ts[j0],ts[-1]))
return chunks
[docs]def cut_to_largest_chunk(traces):
lch=extract_largest_chunk(traces)
cut_traces=[]
for tr in traces:
if len(tr)==0:
cut_traces.append(tr)
else:
ctr=tr[(tr[:,0]>=lch[0])&(tr[:,0]<=lch[1])]
cut_traces.append(ctr)
return cut_traces
[docs]def align_all_timestamps(data, max_length=None):
traces=[s for s in data if len(s)>0]
start_ts=np.max([tr[0,0] for tr in traces])
start_ts=np.max([waveforms.find_closest_value(tr[:,0],start_ts,approach="top",ordered=True) for tr in traces]) # account for different timestamp steps
start_pts=[waveforms.find_closest_arg(s[:,0],start_ts,ordered=True) if len(s)>0 else 0 for s in data]
finish_ts=np.min([tr[-1,0] for tr in traces])
if max_length:
if finish_ts<start_ts+max_length:
return None
else:
finish_ts=start_ts+max_length
finish_ts=np.min([waveforms.find_closest_value(tr[:,0],finish_ts,approach="bottom",ordered=True) for tr in traces])
finish_pts=[waveforms.find_closest_arg(s[:,0],finish_ts,ordered=True) if len(s)>0 else 0 for s in data]
data=[s[spt:fpt,:] for s,spt,fpt in zip(data,start_pts,finish_pts)]
return data
[docs]def normalize_time(trace, normalization="full",clockbase=210E6):
funcargparse.check_parameter_range(normalization,"normalization",{"none","shift","scale","full"})
if trace.ndim==2:
trace=trace.copy()
if normalization in {"shift","full"}:
trace[:,0]=trace[:,0]-trace[0,0]
if normalization in {"scale","full"}:
trace[:,0]=trace[:,0]/clockbase
else:
if normalization in {"shift","full"}:
trace=trace-trace[0]
if normalization in {"scale","full"}:
trace=trace/clockbase
return trace
[docs]def filter_transfer_function(f, order, timeconst):
return 1./(2.j*np.pi*f*timeconst+1)**order
[docs]def tc_to_bandwidth(tc, order):
return (2**(1./order)-1)**.5/(2*np.pi*tc)
[docs]def bandwidth_to_tc(bandwidth, order):
return (2**(1./order)-1)**.5/(2*np.pi*bandwidth)
[docs]def filter_amplitude_kernel(t, order, timeconst):
return np.exp(-t/timeconst)*(np.sign(t)+1)/2.*t**(order-1)
_filter_p_consts=[[1],[1,1],[3,3,1],[15,15,6,1],[105,105,45,10,1],[945,945,420,105,15,1],[10395,10395,4725,1260,210,21,1],[134134,134134,62370,17325,3150,378,28,1]]
[docs]def filter_power_kernel(t, order, timeconst):
cs=_filter_p_consts[order-1]
tn=abs(t/timeconst)
s=t*0
for p,c in enumerate(cs):
s=s+c*tn**p
return (np.exp(-tn)*s)/cs[0]
[docs]def PSD_filter_compensate(PSD, order, timeconst):
PSD_filtered=PSD.copy()
PSD_filtered[:,1]=PSD_filtered[:,1]/abs(filter_transfer_function(PSD[:,0],order,timeconst))**2
return PSD_filtered
[docs]def FT_filter_compensate(ft, order, timeconst):
ft_filtered=ft.copy()
ft_filtered[:,1]=ft_filtered[:,1]/filter_transfer_function(ft_filtered[:,0],order,timeconst)
return ft_filtered
[docs]def trace_filter_compensate(trace, order, timeconst, truncate=True):
ft=fourier.fourier_transform(trace,truncate)
ft=FT_filter_compensate(ft,order,timeconst)
return fourier.inverse_fourier_transform(ft,False)
[docs]class Demod(object):
def __init__(self, n, dev):
object.__init__(self)
demods_n=len(dev["demods"])
mod_period=demods_n//2
self.osc=int(dev["demods",n,"oscselect"])
self.rel_frequency=dev["oscs",self.osc,"freq"]
self.frequency=self.rel_frequency
self.sideband_side=None
if ("mods",n//mod_period,"enable") in dev:
mod_ena=int(dev["mods",n//mod_period,"enable"])
else:
mod_ena=int(dev["mods",n//mod_period,"mode"])
if mod_ena:
if n%mod_period==0:
self.sideband_side=0
if n%mod_period==1:
self.sideband_side=1
elif n%mod_period==2:
self.sideband_side=-1
if self.sideband_side:
carrier_osc=int(dev["demods",(n//mod_period)*mod_period,"oscselect"])
carrier_freq=dev["oscs",carrier_osc,"freq"]
self.frequency=carrier_freq+self.rel_frequency*self.sideband_side
self.order=int(dev["demods",n,"order"])
self.timeconst=dev["demods",n,"timeconstant"]
self.rate=dev["demods",n,"rate"]
self.phaseshift=dev["demods",n,"phaseshift"]/180.*np.pi
self.input=int(dev["demods",n,"adcselect"])
self.input_range=int(dev["sigins",self.input,"range"])
[docs] def filter_transfer_function(self, f):
return filter_transfer_function(f,self.order,self.timeconst)
[docs] def filter_amplitude_kernel(self, t):
return filter_amplitude_kernel(t,self.order,self.timeconst)
[docs] def filter_power_kernel(self, t):
return filter_power_kernel(t,self.order,self.timeconst)
[docs] def PSD_filter_compensate(self, PSD, relative_frequency=True):
"""
If relative_frequency=True, frequencies are given relative to the demod frequency;
otherwise, they're real frequencies of an input signal
"""
if relative_frequency:
return PSD_filter_compensate(PSD,self.order,self.timeconst)
else:
PSD=PSD_filter_compensate(np.column_stack((PSD[:,0]-self.frequency,PSD[:,1])), self.order, self.timeconst)
return np.column_stack((PSD[:,0]+self.frequency,PSD[:,1]))
[docs] def FT_filter_compensate(self, ft, relative_frequency=True):
if relative_frequency:
return FT_filter_compensate(ft,self.order,self.timeconst)
else:
FT=FT_filter_compensate(np.column_stack((ft[:,0]-self.frequency,ft[:,1])),self.order,self.timeconst)
return np.column_stack((FT[:,0]+self.frequency,FT[:,1]))
[docs] def trace_filter_compensate(self, trace, truncate=True):
return trace_filter_compensate(trace,self.order,self.timeconst,truncate)
[docs]class ZIDevice(object):
"""
Generic Zurich Instruments device.
"""
def __init__(self, dev_id=None, port=8005, api_level=1):
object.__init__(self)
#self.server=ziutils.autoConnect(port,api_level)
for _ in range(5):
try:
self.server=zp.ziDAQServer('localhost',port,api_level)
if self._get_server_devices():
break
finally:
time.sleep(5.)
if dev_id is None:
devs=self._get_server_devices()
if not devs:
raise RuntimeError("No devices were detected")
dev_id=devs[0]
self.dev_id=dev_id
self.ref_timestamp=0
def _get_server_devices(self):
nodes=[node.lower() for node in self.server.listNodes('/',0)]
devs=[node for node in nodes if node.startswith("dev")]
return devs
[docs] def close(self):
self.server.disconnect()
self.server=None
def __enter__(self):
return self
def __exit__(self, *args, **vargs):
self.close()
return False
def _build_path(self, rel_path):
if isinstance(rel_path,list) or isinstance(rel_path,tuple):
rel_path="/".join([str(e) for e in rel_path])
else:
rel_path=str(rel_path)
return "/{0}/{1}".format(self.dev_id,rel_path)
[docs] def get_parameter(self, path, argtype="float"):
try:
path=self._build_path(path)
if argtype=="int":
return self.server.getInt(path)
elif argtype=="float":
return self.server.getDouble(path)
elif argtype=="str":
return self.server.getByte(path)
elif argtype=="sample":
return self.server.getSample(path)
elif argtype=="auxin_sample":
return self.server.getAuxInSample(path)
except RuntimeError as e:
raise RuntimeError(e.message+". The path is {}".format(path))
raise ValueError("unrecognized argtype: {0}".format(argtype))
[docs] def set_parameter(self, path, value):
try:
path=self._build_path(path)
if isinstance(value,float):
self.server.setDouble(path,value)
elif isinstance(value,bytestring):
self.server.setByte(path,value)
else:
self.server.setInt(path,value)
except RuntimeError as e:
raise RuntimeError(e.message+". The path is {}".format(path))
[docs] def get_default(self, path, argtype="float", default=None):
try:
self.get_parameter(path,argtype)
except RuntimeError:
return default
_timestamp_keys={"VALUE","TIMESTAMP"}
[docs] @classmethod
def filter_settings_dict(cls, settings):
def undo_array(x):
try:
if len(x)==1:
return x[0]
else:
return x
except Exception:
return x
def undo_timestamp(d):
if len(d)==2 and all([k.upper() in cls._timestamp_keys for k in d]):
return d["VALUE"]
else:
return d
if not dictionary.is_dictionary(settings):
return undo_array(settings)
else:
settings.map_self(undo_timestamp,to_visit="branches")
settings.map_self(undo_array,to_visit="leafs")
return settings
[docs] def get_branch(self, path="", rel_paths=True, raw=False):
try:
path=self._build_path(path)
branch=dictionary.Dictionary(self.server.get(path),case_normalization="upper",case_sensitive=False)
if rel_paths:
branch=branch.detach_branch(path)
else:
branch=branch.detach_branch(self.dev_id)
if not raw:
branch=self.filter_settings_dict(branch)
return branch
except RuntimeError as e:
raise RuntimeError(e.message+". The path is {}".format(path))
[docs] def set_branch(self, branch, path=""):
try:
settings=dictionary.Dictionary()
settings.merge_branch(branch,path)
for p,v in settings.iternodes(to_visit="leafs",include_path=True):
self.set_parameter(p,v)
except RuntimeError as e:
raise RuntimeError(e.message+". The path is {}".format(path))
get_settings=get_branch
apply_settings=set_branch
def __getitem__(self, path):
try:
return self.get_parameter(path)
except RuntimeError:
return self.get_branch(path)
def __setitem__(self, path, value):
if dictionary.is_dictionary(value):
self.set_branch(value,path)
else:
self.set_parameter(path,value)
def __contains__(self, path):
try:
self.get_parameter(path)
return True
except RuntimeError:
return False
### CONTROL ###
[docs] def sync(self):
self.server.sync()
[docs] def flush(self):
self.server.sync() # flush has been deprecated in the API
### PROPERTIES ###
[docs] def get_devtype(self):
return self.get_parameter("features/devtype","str")
[docs] def get_options(self):
options=self.get_parameter("features/options","str")
options=[o.strip() for o in options.split("\n") if not (o.isspace() or o=="")]
return options
[docs] def get_clockbase(self):
return self["clockbase"]
[docs] def normalize_time(self, trace, normalization="full"):
return normalize_time(trace,normalization=normalization,clockbase=self.get_clockbase())
[docs] def get_timestamp(self, subtract_ref_timestamp=True):
timestamp=self.server.getAuxInSample(self._build_path("auxins/0/sample"))["timestamp"][0]
if subtract_ref_timestamp:
return timestamp-self.ref_timestamp
else:
return timestamp
[docs] def update_ref_timestamp(self):
self.ref_timestamp=self.get_timestamp(subtract_ref_timestamp=False)
### DEMODS ###
[docs] def get_demod_sample(self, demod, fmt="complex"):
funcargparse.check_parameter_range(fmt,"fmt",{"dict","complex"})
res=self.get_parameter(("demods",demod,"sample"),"sample")
if fmt=="dict":
return res
else:
return res["x"][0]+1j*res["y"][0]
[docs] def get_demods_rates(self):
raise NotImplementedError("ZIDevice.get_demods_rates")
[docs] def get_demods_num(self):
return len(self.get_demods_rates())
[docs] def set_demod_rate(self, demod, rate):
raise NotImplementedError("ZIDevice.set_demod_rate")
[docs] def stop_demod(self, demod):
self.set_demod_rate(demod,0)
[docs] def get_demod_frequency(self, demod):
osc=int(self["demods",demod,"oscselect"])
return self["oscs",osc,"freq"]
[docs] def set_demod_frequency(self, demod, freq):
osc=int(self["demods",demod,"oscselect"])
self["oscs",osc,"freq"]=freq
[docs] def get_demod_power(self, demod, sigout, only_enabled=True):
amplitude=self["sigouts",sigout,"amplitudes",demod]
enable=(not only_enabled) or self["sigouts",sigout,"enables",demod]
if enable:
return np.log10(amplitude**2/(2*50.)/1E-3)*10.
else:
return None
[docs] def set_demod_power(self, demod, sigout, power_dBm=None, enable=True):
if power_dBm is not None:
amplitude=(10**(power_dBm*0.1)*1E-3*2*50)**0.5
self["sigouts",sigout,"amplitudes",demod]=amplitude
self["sigouts",sigout,"enables",demod]=enable
[docs] def read_demods(self, trace_length, demods=None, save_frequencies_for=None, buffer_size=None, sanitize_data=True, consistent_length=False):
return self.read_demods_poll(trace_length,demods=demods,save_frequencies_for=save_frequencies_for,sanitize_data=sanitize_data,consistent_length=consistent_length)
[docs] def get_demod(self, demod_idx):
return Demod(demod_idx,self)
[docs] def read_demod_pts(self, trace_pts, demod, poll_time=None, sanitize_data=True):
rate=self.get_demods_rates()[demod]
if not rate:
return []
trace_length=(trace_pts/rate*1.2)+0.05
if poll_time is not None:
trace_length=max(trace_length,poll_time)
data=self.read_demods_poll(trace_length,sanitize_data=sanitize_data)
if data is not None:
data=data[demod]
if poll_time is not None:
chunks=int(len(data)//trace_pts)
if chunks>0:
data=[data[trace_pts*i:trace_pts*(i+1),:] for i in range(chunks)]
else:
data=[]
else:
if len(data)>trace_pts:
data=[data[:trace_pts,:]]
else:
data=[]
return data
[docs] def repeat_read_demod_pts(self, trace_pts, demod, traces_num=1, poll_time=None):
traces=[]
while len(traces)<traces_num:
pts=self.read_demod_pts(trace_pts,demod,poll_time=poll_time)
if pts is not None:
traces=traces+pts
return traces[:traces_num]
[docs] class DemodTraceAccumulator(object):
def __init__(self, rate, length, columns=3):
object.__init__(self)
self.rate=rate
self.length=length
self.columns=columns
self.points_num=int(np.ceil(rate*length))
self.data=np.zeros((self.points_num,columns))
self.position=0
[docs] def add_piece(self, *traces):
add_l=min([len(t) for t in traces]+[self.points_num-self.position])
for t in range(self.columns):
self.data[self.position:self.position+add_l,t]=traces[t][:add_l]
self.position=self.position+add_l
if self.position==self.points_num:
return True
else:
return False
[docs] def full(self):
return self.position==self.points_num
def _parse_demods_data(self, data, demods=None, auxin_demod=None, save_demod_frequency=None):
dnum=self.get_demods_num()
demods=demods or range(dnum)
save_demod_frequency=save_demod_frequency or [False]*dnum
traces=[]
if self.dev_id in data:
data=data[self.dev_id]['demods']
for d in demods:
if str(d) in data:
if len(data[str(d)]["sample"])>0:
sample_data=data[str(d)]['sample']
if isinstance(sample_data,list):
sample_data=sample_data[0]
traces_to_save=[sample_data['timestamp']-self.ref_timestamp,sample_data['x'],sample_data['y']]
if save_demod_frequency[d]:
traces_to_save.append(sample_data['frequency'])
traces.append(traces_to_save)
else:
traces.append(None)
else:
traces.append(None)
if auxin_demod is not None:
auxin_demod=str(auxin_demod)
if auxin_demod in data and len(data[auxin_demod]["sample"])>0:
sample_data=data[auxin_demod]['sample']
if isinstance(sample_data,list):
sample_data=sample_data[0]
traces.append([sample_data['timestamp']-self.ref_timestamp,sample_data['auxin0'],sample_data['auxin1']])
else:
traces.append(None)
else:
traces=[[] for _ in demods]
if auxin_demod is not None:
traces=traces+[None]
data=[]
for d,t in zip(demods,traces):
if t is None:
c=4 if save_demod_frequency[d] else 3
data.append(np.zeros((0,c)))
else:
data.append(np.column_stack(t))
data.append(np.column_stack(traces[-1]))
return data
[docs] def read_demods_poll(self, trace_length, demods=None, save_frequencies_for=None, sanitize_data=True, consistent_length=False):
rates=self.get_demods_rates()
demods=demods or range(len(rates))
length_factor=1.1 if consistent_length else 1.
max_rate=max(rates)
max_rate_demod=rates.index(max_rate)
save_frequencies_for=save_frequencies_for or []
save_demod_frequency=[(d in save_frequencies_for) for d in range(8)]
self.sync()
for d in demods:
self.server.subscribe(self._build_path(("demods",d,"sample")))
data=self.server.poll(trace_length*length_factor,500,2)
for d in demods:
self.server.unsubscribe(self._build_path(("demods",d,"sample")))
data=self._parse_demods_data(data,demods,max_rate_demod,save_demod_frequency)
if sanitize_data or consistent_length:
if not check_all_timestamps(data):
return None
max_length=trace_length*self.get_clockbase() if consistent_length else None
return align_all_timestamps(data,max_length=max_length)
else:
return data
### SCOPE ###
[docs] def get_auxin_sample(self, channel, fmt="float"):
funcargparse.check_parameter_range(fmt,"fmt",{"dict","float"})
res=self.get_parameter(("auxins/0/sample"),"auxin_sample")
if fmt=="dict":
return res
else:
return res["ch{}".format(channel)][0]
[docs] def get_scope_trace_pts(self):
return self["scopes/0/length"]
[docs] def set_scope_trace_pts(self, length):
self["scopes/0/length"]=length
_scope_sources=["sigin0","sigin1","sigout0","sigout1"]
_scope_ranges={"sigin0":"sigins/0/range","sigin1":"sigins/1/range",
"sigout0":"sigouts/0/range","sigout1":"sigouts/1/range"}
[docs] def get_scope_source_idx(self, source):
funcargparse.check_parameter_range(source,"source",self._scope_sources)
return self._scope_sources.index(source)
[docs] def get_scope_source_range(self, source):
if isinstance(source,str):
funcargparse.check_parameter_range(source,"source",self._scope_sources)
else:
source=self._scope_sources[source]
return self[self._scope_ranges[source]]
[docs] def get_scope_timestep_idx(self):
return self["scopes/0/time"]
[docs] def get_scope_timestep(self, timestep_idx=None):
if timestep_idx is None:
timestep_idx=self["scopes/0/time"]
return 2**timestep_idx/self.get_clockbase()
[docs] def get_scope_time_points(self, length=None):
length=length or self.get_scope_trace_pts()
return np.arange(length)*self.get_scope_timestep()
[docs] def setup_trigger(self, channel="continuous", edge="rising", level=0., delay=0., holdoff=0.):
raise NotImplementedError("ZIDevice.setup_trigger")
[docs] def setup_scope(self, timestep_idx=None, source="sigin0", trace_pts=None, bw_limit=True):
raise NotImplementedError("ZIDevice.setup_scope")
[docs] def read_scope_poll(self, traces=1, poll_time=None, add_time=False, timeout="auto", fast_flush=False):
raise NotImplementedError("ZIDevice.read_scope_poll")
[docs] def read_scope(self, timestep_idx=None, source="sigin0", traces=1, trace_pts=None, poll_time=None, add_time=False, bw_limit=True, timeout="auto"):
self.setup_scope(timestep_idx,source,trace_pts,bw_limit)
return self.read_scope_poll(traces,poll_time,timeout=timeout,add_time=add_time)
[docs]class HF2Device(ZIDevice):
"""
Zurich Instruments HF2 device.
"""
def __init__(self, dev_id=None, port=8005):
ZIDevice.__init__(self,dev_id=dev_id,port=port,api_level=1)
self._scope_trigger=-2
### PROPERTIES ###
[docs] def get_clockbase(self):
return 210E6
### DEMODS ###
[docs] def get_demods_rates(self):
return [self.get_parameter(("demods",d,"rate")) for d in range(6)]
[docs] def set_demod_rate(self, demod, rate):
if rate<=0:
rate=0
self["demods",demod,"rate"]=rate
### SCOPE ###
[docs] def get_scope_trace_pts(self):
return 2048
[docs] def set_scope_trace_pts(self, length):
if length!=2048:
raise ValueError("for HF2 the number of scope points is always 2048")
[docs] def get_scope_source(self):
return self._scope_sources[self["scopes/0/channel"]]
def _parse_scope_data(self, data, trace_pts=None):
path="/{}/scopes/0/wave".format(self.dev_id)
if path not in data:
return []
waves=data[path]
traces=[]
rng=self.get_scope_source_range(waves[0]["scopechannel"])
trace_pts=trace_pts or len(waves[0]["wave"])
for w in waves:
if len(w["wave"])==trace_pts:
traces.append(w["wave"]/(2.**15)*rng)
return traces
[docs] def setup_trigger(self, channel="continuous", edge="rising", level=0., delay=0., holdoff=0.):
funcargparse.check_parameter_range(channel,"channel",self._scope_sources+["continuous"])
funcargparse.check_parameter_range(edge,"edge",{"rising","falling"})
self._scope_trigger=self._scope_sources.index(channel) if channel in self._scope_sources else -2
self["scopes/0/trigedge"]=(edge=="rising")
if channel in self._scope_sources:
rng=self.get_scope_source_range(channel)
self["scopes/0/triglevel"]=int(level/rng*32767.)
self["scopes/0/trigholdoff"]=holdoff
[docs] def setup_scope(self, timestep_idx=None, source="sigin0", trace_pts=None, bw_limit=True):
self["scopes/0/channel"]=self.get_scope_source_idx(source)
self["scopes/0/bwlimit"]=bw_limit
if timestep_idx is not None:
if timestep_idx>15 or timestep_idx<0:
raise ValueError("timestep_idx is an integer between 0 and 15")
self["scopes/0/time"]=timestep_idx
if trace_pts is not None:
self.set_scope_trace_pts(trace_pts)
self.server.sync()
[docs] def read_scope_poll(self, traces=1, poll_time=None, add_time=False, timeout="auto", fast_flush=True):
if add_time:
raise ValueError("HF2 doesn't support scope timestamps")
trace_pts=self.get_scope_trace_pts()
trace_length=trace_pts*self.get_scope_timestep()
trace_length=max(1E-3,trace_length)
res=[]
if poll_time is None:
total_length=traces*trace_length*1.2
else:
total_length=poll_time
if timeout=="auto":
timeout=max(total_length*3,1.)
t=time.time()
while len(res)<traces:
if timeout is not None and (time.time()>t+timeout):
raise RuntimeError("execution timed out; trace length changed during execution?")
if fast_flush:
self.server.poll(1E-3,1)
else:
#self.server.flush()
self.server.sync()
self.server.subscribe(self._build_path(["scopes/0/wave"]))
self["scopes/0/trigchannel"]=self._scope_trigger
data=self.server.poll(total_length,500,0,True)
self["scopes/0/trigchannel"]=-1
self.server.unsubscribe(self._build_path(["scopes/0/wave"]))
data=self._parse_scope_data(data,trace_pts=trace_pts)
if poll_time is not None:
return data
res=res+data
if len(data)==0:
total_length=total_length*2
else:
total_length=total_length/len(data)*(traces-len(res))
return res[:traces]
[docs]class UHFDevice(ZIDevice):
"""
Zurich Instruments UHF device.
"""
def __init__(self, dev_id=None, port=8004):
ZIDevice.__init__(self,dev_id=dev_id,port=port,api_level=4)
### DEMODS ###
[docs] def get_demods_rates(self):
return [self.get_parameter(("demods",d,"rate")) if self.get_parameter(("demods",d,"enable")) else 0. for d in range(8)]
[docs] def set_demod_rate(self, demod, rate):
if rate<=0:
self["demods",demod,"enable"]=False
else:
self["demods",demod,"rate"]=rate
self["demods",demod,"enable"]=True
### SCOPE ###
_scope_sources=["sigin0","sigin1","trigin0","trigin1"]+[None]*4+["auxin0","auxin1"]
_scope_ranges={"sigin0":"sigins/0/range","sigin1":"sigins/1/range"}
[docs] def get_scope_source_idx(self, source):
funcargparse.check_parameter_range(source,"source",self._scope_sources)
return self._scope_sources.index(source)
[docs] def get_scope_source_range(self, source):
if isinstance(source,str):
funcargparse.check_parameter_range(source,"source",self._scope_sources)
else:
source=self._scope_sources[source]
if source is None:
raise KeyError("can't access source index {}".format(source))
if source.startswith("auxin"):
return 10.
elif source.startswith("trigin"):
return 1.
else:
return self[self._scope_ranges[source]]
[docs] def get_scope_source(self):
return self._scope_sources[self["scopes/0/channels/0/inputselect"]]
def _parse_scope_data(self, data, trace_pts=None):
path=[self.dev_id,"scopes","0","wave"]
waves=data
for p in path:
if p not in waves:
return [],[]
waves=waves[p]
traces=[]
starts=[]
trace_pts=trace_pts or waves[0]["totalsamples"]
bn=0
blocks=[]
cstart=None
for w in waves:
if w.get("blocknumber",0)!=bn: # missing/extra block
bn=0
blocks=[]
continue
scale=w["channelscaling"]
offset=w["channeloffset"]
blk=((w["wave"]*scale)+offset)[:,0]
blocks.append(blk)
if bn==0: # first block of the traces
cstart=w["timestamp"]-self.ref_timestamp
if w.get("blockmarker",1)==1: # last block
tr=blocks[0] if len(blocks)==1 else np.concatenate(blocks)
if len(tr)==trace_pts:
traces.append(tr)
starts.append(cstart)
bn=0
blocks=[]
else:
bn=bn+1
return traces,starts
[docs] def setup_trigger(self, channel="continuous", edge="rising", level=0., delay=0., holdoff=0.):
funcargparse.check_parameter_range(channel,"channel",self._scope_sources+["continuous"])
funcargparse.check_parameter_range(edge,"edge",{"rising","falling","both"})
if channel=="continuous":
self["scopes/0/trigenable"]=0
else:
self["scopes/0/trigenable"]=1
self["scopes/0/trigchannel"]=self.get_scope_source_idx(channel)
self["scopes/0/trigrising" ]=(edge in {"rising" ,"both"})
self["scopes/0/trigfalling"]=(edge in {"falling","both"})
self["scopes/0/triglevel"]=level
self["scopes/0/trigdelay"]=delay
self["scopes/0/trigholdoff"]=holdoff
if channel!="continuous":
self["scopes/0/trigenable"]=0
self["scopes/0/trigenable"]=1
[docs] def setup_scope(self, timestep_idx=None, source="sigin0", trace_pts=None, bw_limit=True):
self["scopes/0/channels/0/inputselect"]=self.get_scope_source_idx(source)
self["scopes/0/channels/0/bwlimit"]=bw_limit
if timestep_idx is not None:
if timestep_idx>16 or timestep_idx<0:
raise ValueError("timestep_idx is an integer between 0 and 16")
self["scopes/0/time"]=timestep_idx
if trace_pts is not None:
self.set_scope_trace_pts(trace_pts)
self.server.sync()
[docs] def read_scope_poll(self, traces=1, poll_time=None, add_time=False, timeout="auto", fast_flush=False, stop_after=True):
trace_pts=self.get_scope_trace_pts()
trace_length=trace_pts*self.get_scope_timestep()
trace_length=max(0.01,trace_length)
res=[]
res_starts=[]
if poll_time is None:
total_length=traces*trace_length*1.2+trace_pts/10E6
else:
total_length=poll_time
if timeout=="auto":
timeout=max(total_length*3,1.)
poll_to=max(50,min(500,int(total_length*1E3)))
t=time.time()
while len(res)<traces:
if timeout is not None and (time.time()>t+timeout):
raise RuntimeError("execution timed out; trace length changed during execution?")
if fast_flush:
self.server.poll(1E-3,1)
else:
self.server.sync()
self.server.subscribe(self._build_path(["scopes/0/wave"]))
self["scopes/0/enable"]=1
data=self.server.poll(total_length,poll_to,0)
#data=None
if stop_after:
self["scopes/0/enable"]=0
self.server.unsubscribe(self._build_path(["scopes/0/wave"]))
#return data
data,starts=self._parse_scope_data(data,trace_pts=trace_pts)
if add_time:
ts=self.get_scope_timestep()
data=[np.column_stack(( np.arange(len(d))*ts,d )) for d in data]
if poll_time is not None:
return (data,starts) if add_time else data
res=res+data
res_starts=res_starts+starts
if len(data)==0:
total_length=total_length*2
else:
total_length=total_length/len(data)*(traces-len(res))
return (res[:traces],res_starts[:traces]) if add_time else res[:traces]
[docs] def read_demods_with_scope(self, trace_length, demods=None):
rates=self.get_demods_rates()
demods=demods or range(len(rates))
max_rate=max(rates)
max_rate_demod=rates.index(max_rate)
self.sync()
for d in demods:
self.server.subscribe(self._build_path(("demods",d,"sample")))
self.server.subscribe(self._build_path(["scopes/0/wave"]))
self["scopes/0/enable"]=1
self.server.sync()
raw_data=self.server.poll(trace_length,500,2)
self["scopes/0/enable"]=0
self.server.unsubscribe(self._build_path(["scopes/0/wave"]))
for d in demods:
self.server.unsubscribe(self._build_path(("demods",d,"sample")))
demod_data=self._parse_demods_data(raw_data,demods,max_rate_demod)
demod_data=align_all_timestamps(cut_to_largest_chunk(demod_data))
data_extent=(demod_data[-1][0,0],demod_data[-1][-1,0])
scope_data=self._parse_scope_data(raw_data,trace_pts=self.get_scope_trace_pts())
scope_data=[sd for sd in zip(*scope_data) if sd[1]>data_extent[0] and sd[1]<data_extent[1]]
dts=int(self.get_scope_timestep()*self.get_clockbase())
scope_data=[np.column_stack(( np.arange(len(d[0]))*dts+d[1],d[0] )) for d in scope_data]
return demod_data,scope_data
[docs]class ZISweeper(object):
"""
Sweeper interface for Zurich Instruments devices.
"""
def __init__(self, dev, settings=None):
object.__init__(self)
self.settings=dictionary.Dictionary(settings)
self.settings.merge_branch(self._default_settings,overwrite=False)
self.dev=dev
def __getitem__(self, key): return self.settings.__getitem__(key)
def __setitem__(self, key, value): return self.settings.__setitem__(key,value)
_default_settings={"log_sweep":False,
"wait/initial":5,
"wait/settling":5,
"wait/integrating":5,
"wait/averaging_pause":5,
"averaging":1,
"tc":None,
"order":None}
[docs] def is_set(self):
return all([k in self.settings for k in ["range","oscillator","demods"]])
[docs] def setup(self, sweep_range, oscillator, demods, settings=None):
self["range"]=sweep_range
self["oscillator"]=oscillator
self["demods"]=funcargparse.as_sequence(demods)
if settings:
self.settings.merge_branch(settings,overwrite=True)
[docs] def frequency_points(self):
if self["log_sweep"]:
start,stop,steps=self["range"]
logstart,logstop=np.log(start),np.log(stop)
return np.exp(np.linspace(logstart,logstop,steps))
else:
return np.linspace(*self["range"])
_min_tc=1E-3
_sampling_rate_per_tc=100.
_min_sampling_rate=1E3
_min_int_time=1E-1
[docs] def get_max_tc(self):
tcs=[self.dev["demods",d,"timeconstant"] for d in self["demods"]]
return max(max(tcs),self._min_tc)
[docs] def get_tc(self, variable_tc=False):
if self["tc"] is not None:
for d in self["demods"]:
self.dev["demods",d,"timeconstant"]=self["tc"][0] if variable_tc else self["tc"]
if self["order"] is not None:
for d in self["demods"]:
self.dev["demods",d,"order"]=self["order"]
return self.get_max_tc()
[docs] def execute(self, result_format=("c","comb"), method="zi"):
if method=="zi":
return self.execute_zi(result_format)
else:
return self.execute_manual(result_format)
[docs] def execute_manual(self, result_format=("c","comb")):
funcargparse.check_parameter_range(result_format[0],"result_format",{"xy","c"})
funcargparse.check_parameter_range(result_format[1],"result_format",{"sep","comb"})
if not self.is_set():
raise ValueError("setup sweeper first")
dev=self.dev
demods=self["demods"]
variable_tc=funcargparse.is_sequence(self["tc"])
tc=self.get_tc(variable_tc=variable_tc)
sampling_rate=max(self._min_sampling_rate,self._sampling_rate_per_tc/tc)
initial_rates=dev.get_demods_rates()
for d in range(dev.get_demods_num()):
dev.set_demod_rate(d,sampling_rate if (d in demods) else 0)
avgs=self["averaging"]
values=[[] for _ in demods]
freqs=self.frequency_points()
dev.set_parameter(("oscs",self["oscillator"],"freq"),freqs[0])
time.sleep(tc*self["wait/initial"])
for i,f in enumerate(freqs):
dev.set_parameter(("oscs",self["oscillator"],"freq"),f)
if variable_tc:
tc=max(self["tc"][i],self._min_tc)
sampling_rate=max(self._min_sampling_rate,self._sampling_rate_per_tc/tc)
for d in demods:
dev["demods",d,"timeconstant"]=tc
dev.set_demod_rate(d,sampling_rate)
time.sleep(tc*self["wait/settling"])
integrating_time=max(tc*self["wait/integrating"],self._min_int_time)
avg_data=[[] for _ in demods]
for i in range(avgs):
while True:
data=dev.read_demods(integrating_time*1.1)
if data is not None:
break
for d,a in zip(demods,avg_data):
a.append(data[d][:,1:].mean(axis=0))
if i+1<avgs:
time.sleep(tc*self["wait/averaging_pause"])
for v,a in zip(values,avg_data):
sam_xy=np.mean(a,axis=0)
if result_format[0]=="xy":
v.append(sam_xy)
else:
v.append(sam_xy[0]+1j*sam_xy[1])
for d,r in enumerate(initial_rates):
dev.set_demod_rate(d,r)
if result_format[1]=="sep":
return [np.column_stack(( freqs,v )) for v in values]
else:
return np.column_stack([freqs]+values)
[docs] def execute_zi(self, result_format=("c","comb"), poll_time=.1):
funcargparse.check_parameter_range(result_format[0],"result_format",{"xy","c"})
funcargparse.check_parameter_range(result_format[1],"result_format",{"sep","comb"})
dev=self.dev
if not self.is_set():
raise ValueError("setup sweeper first")
tc=self.get_tc()
if self["order"] is None:
order=max([dev["demods",d,"order"] for d in self["demods"]])
else:
order=self["order"]
sweeper=self.dev.server.sweep(500)
sweeper.set("sweep/device",dev.dev_id)
sweeper.set("sweep/start",self["range"][0])
sweeper.set("sweep/stop",self["range"][1])
sweeper.set("sweep/samplecount",self["range"][2])
sweeper.set("sweep/gridnode","oscs/{}/freq".format(self["oscillator"]))
sweeper.set("sweep/xmapping",1 if self["log_sweep"] else 0)
sweeper.set("sweep/scan",0)
#sweeper.set("sweep/settling/time",0)
sweeper.set("sweep/settling/time",self["wait/initial"]*tc)
sweeper.set("sweep/settling/tc",self["wait/settling"])
sweeper.set("sweep/averaging/tc",self["wait/integrating"])
'''sweeper.set("sweep/settling/time",self["wait/settling"]*tc)
sweeper.set("sweep/settling/tc",self["wait/integrating"])
sweeper.set("sweep/averaging/tc",0)'''
sweeper.set("sweep/averaging/sample",self["averaging"])
sweeper.set("sweep/bandwidthcontrol",0)
sweeper.set("sweep/bandwidth",tc_to_bandwidth(tc,order))
sweeper.set("sweep/order",order)
for d in self["demods"]:
sweeper.subscribe(dev._build_path(("demods",d,"sample")))
sampling_rate=max(self._min_sampling_rate,self._sampling_rate_per_tc/tc)
initial_rates=dev.get_demods_rates()
for d in range(dev.get_demods_num()):
dev.set_demod_rate(d,sampling_rate if (d in self["demods"]) else 0)
sweeper.execute()
sweeper.trigger()
while not(sweeper.finished()):
time.sleep(poll_time)
for d,r in enumerate(initial_rates):
dev.set_demod_rate(d,r)
sweeper_data=sweeper.read()
if dev.dev_id not in sweeper_data:
return None
values=[]
freqs=None
for d in self["demods"]:
ds=sweeper_data[dev.dev_id]["demods"][str(d)]["sample"]
if len(ds)==0:
return None
ds=ds[0][0]
if result_format[0]=="xy":
values.append(np.column_stack(( ds["x"],ds["y"] )))
else:
values.append(ds["x"]+1j*ds["y"])
if freqs is None:
freqs=ds["frequency"]
if result_format[1]=="sep":
return [np.column_stack(( freqs,v )) for v in values]
else:
return np.column_stack([freqs]+values)