from builtins import range
from ...core.utils.py3 import textstring
import numpy as np
from ...core.devio import SCPI, data_format #@UnresolvedImport
from ...core.utils import funcargparse, general #@UnresolvedImport
_depends_local=["...core.devio.SCPI"]
[docs]class ITektronixScope(SCPI.SCPIDevice):
"""
Generic Tektronix Oscilloscope.
"""
def __init__(self, addr):
SCPI.SCPIDevice.__init__(self,addr)
self.data_fmt="<i1"
self._wfmpre="WFMP"
self._trig="TRIGGER:MAIN"
[docs] def grab_single(self, wait_type="sync"):
self.write(":ACQ:STOPAFTER SEQ; :ACQ:STATE ON")
self.wait(wait_type)
[docs] def grab_continuous(self, enable=True):
self.write(":ACQ:STOPAFTER RUNSTOP; :ACQ:STATE",enable)
[docs] def is_continuous(self):
return self.ask(":ACQ:STOPAFTER?").strip().upper().startswith("RUN")
[docs] def is_grabbing(self):
return self.ask(":ACQ:STATE?","bool")
[docs] def get_edge_trigger_source(self):
src=self.ask(self._trig+":EDGE:SOURCE?").upper()
if src.startswith("CH"):
return int(src[2:])
raise RuntimeError("unrecognized trigger source: {}".format(src))
[docs] def get_edge_trigger_coupling(self):
return self.ask(self._trig+":EDGE:COUPL?").lower()
[docs] def get_edge_trigger_slope(self):
return self.ask(self._trig+":EDGE:SLOPE?").lower()
[docs] def get_trigger_level(self):
return self.ask(self._trig+":LEVEL?").lower()
[docs] def set_edge_trigger(self, source, level, coupling="dc", slope="fall"):
self.write(self._trig+":EDGE:SOURCE","CH{}".format(source))
self.write(self._trig+":EDGE:COUPL",coupling.upper())
self.write(self._trig+":EDGE:SLOPE",slope.upper())
self.write(self._trig+":LEVEL",level)
self.write(self._trig+":TYPE","EDGE")
return self.get_edge_trigger_source(), self.get_trigger_level(), self.get_edge_trigger_coupling(), self.get_edge_trigger_slope()
[docs] def get_trigger_mode(self):
return self.ask(self._trig+":MODE?")[:4].lower()
[docs] def set_trigger_mode(self, mode="auto"):
self.write(self._trig+":MODE", "AUTO" if mode.lower()=="auto" else "NORM")
return self.get_trigger_mode()
[docs] def get_trigger_state(self):
return self.ask("TRIGGER:STATE?").lower()
[docs] def force_trigger(self):
self.write("TRIGGER FORCE")
[docs] def get_horizontal_span(self):
return self.ask(":HORIZONTAL:SCALE?","float")*10.
[docs] def set_horizontal_span(self, span):
self.write(":HORIZONTAL:SCALE",span/10.,"float")
return self.get_horizontal_span()
[docs] def get_horizontal_offset(self):
return self.ask(":HORIZONTAL:POS?","float")
[docs] def set_horizontal_offset(self, offset=0.):
self.write(":HORIZONTAL:POS",offset,"float")
return self.get_horizontal_offset()
[docs] def channel_enabled(self, channel):
return self.ask(":SELECT:CH{}?".format(channel),"bool")
[docs] def enable_channel(self, channel, enabled=True):
self.write(":SELECT:CH{}".format(channel),enabled,"bool")
return self.channel_enabled(channel)
[docs] def get_coupling(self, channel):
return self.ask(":CH{}:COUPL?".format(channel)).lower()
[docs] def set_coupling(self, channel, coupling="dc"):
self.write(":CH{}:COUPL".format(channel),coupling.upper())
return self.get_coupling(channel)
[docs] def get_vertical_span(self, channel):
return self.ask(":CH{}:SCALE?".format(channel),"float")*10. # scale is per division (10 division per screen)
[docs] def set_vertical_span(self, channel, span):
self.write(":CH{}:SCALE".format(channel),span/10.,"float") # scale is per division (10 division per screen)
return self.get_vertical_span(channel)
[docs] def get_vertical_position(self, channel):
return self.ask(":CH{}:POSITION?".format(channel),"float")*self.get_vertical_span(channel)/10. # offset is in divisions (10 division per screen)
[docs] def set_vertical_position(self, channel, offset):
offset/=self.get_vertical_span(channel)/10. # offset is in divisions (10 division per screen)
self.write(":CH{}:POSITION".format(channel),offset,"float")
return self.get_vertical_position(channel)
[docs] def get_points_number(self, kind="send"):
funcargparse.check_parameter_range(kind,"kind",{"acq","trace","send"})
if kind=="acq":
return self.ask(":HORIZONTAL:RECORDLENGTH?","int")
elif kind=="trace":
return self.ask(":{}:RECORDLENGTH?".format(self._wfmpre),"int")
else:
return self.ask(":{}:NR_PT?".format(self._wfmpre),"int")
def _set_data_resolution(self, resolution):
pass
[docs] def set_points_number(self, pts_num, reset_limts=True):
if pts_num<5E4:
self._set_data_resolution("REDUCED")
else:
self._set_data_resolution("FULL")
self.write(":HORIZONTAL:RECORDLENGTH",pts_num,"int")
if reset_limts:
self.set_data_pts_range()
return self.get_points_number(kind="send")
[docs] def get_data_pts_range(self):
return self.ask(":DATA:START?","int"),min(self.ask(":DATA:STOP?","int"),self.get_points_number())
[docs] def set_data_pts_range(self, rng=None):
if rng is None:
start,stop=1,self.get_points_number(kind="acq")
else:
start,stop=min(rng),max(rng)
self.write(":DATA:START",start,"int")
self.write(":DATA:STOP",stop,"int")
return self.get_data_pts_range()
def _build_data_format(self, dfmt, size):
if dfmt.startswith("ASC"):
return "ascii"
if dfmt[0]=="S":
border=">"
dfmt=dfmt[1:]
else:
border="<"
kind="i" if dfmt[1]=="I" else "u"
return data_format.DataFormat(size,kind,border)
def _build_wfmpre(self, data):
wfmpre={}
wfmpre["fmt"]=data_format.DataFormat.from_desc("ascii") if data[2].startswith("ASC") else self._build_data_format(data[3],data[0])
wfmpre["pts"]=int(data[5])
wfmpre["xzero"]=float(data[10])
wfmpre["xincr"]=float(data[8])
wfmpre["ptoff"]=int(data[9])
wfmpre["ymult"]=float(data[12])
wfmpre["yzero"]=float(data[13])
wfmpre["yoff"]=float(data[14])
return wfmpre
[docs] def get_wfmpre(self):
data=self.ask(":{}?".format(self._wfmpre)).split(";")
data=[d.strip().upper() for d in data]
return self._build_wfmpre(data)
[docs] def request_data(self, fmt=None):
fmt=funcargparse.getdefault(fmt,self.data_fmt)
self.set_data_format(fmt)
data=self.ask(":CURVE?","raw")
return self.parse_trace_data(data,fmt)
def _scale_data(self, data, wfmpre=None):
wfmpre=wfmpre or self.get_wfmpre()
xpts=(np.arange(len(data))-wfmpre["ptoff"])*wfmpre["xincr"]+wfmpre["xzero"]
ypts=(data-wfmpre["yoff"])*wfmpre["ymult"]+wfmpre["yzero"]
return np.column_stack((xpts,ypts))
[docs] def selected_channel(self):
source=self.ask(":DATA:SOURCE?").strip().upper()
if not source.startswith("CH") or len(source)!=3:
raise RuntimeError("source {} is not a channel".format(source))
return int(source[-1])
[docs] def select_channel(self, channel):
self.write(":DATA:SOURCE CH{}".format(channel))
return self.selected_channel()
def _read_sweep_fast(self, channel, wfmpre=None):
self.write(":DATA:SOURCE CH{}".format(channel))
wfmpre=wfmpre or self.get_wfmpre()
data=self.ask(":CURVE?","raw")
trace=self.parse_trace_data(data,wfmpre["fmt"].to_desc())
if len(trace)!=wfmpre["pts"]:
raise RuntimeError("received data length {0} is not equal to the number of points {1}".format(len(trace),wfmpre["pts"]))
return self._scale_data(trace,wfmpre)
[docs] def read_multiple_sweeps(self, channels, wfmpres=None, ensure_fmt=True):
wfmpres=wfmpres or [None]*len(channels)
if ensure_fmt:
fmt=wfmpres[0]["fmt"] if wfmpres[0] else None
self.set_data_format(fmt=fmt)
return [self._read_sweep_fast(ch,wfmpre) for (ch,wfmpre) in zip(channels,wfmpres)]
[docs] def read_sweep(self, channel, wfmpre=None, ensure_fmt=True):
return self.read_multiple_sweeps([channel],[wfmpre],ensure_fmt=ensure_fmt)[0]
[docs]class DPO2014(ITektronixScope):
"""
Tektronix DPO2014 Oscilloscope.
"""
def __init__(self, addr):
ITektronixScope.__init__(self,addr)
self._wfmpre="WFMO"
self._trig="TRIGGER:A"
def _build_wfmpre(self, data):
wfmpre={}
wfmpre["fmt"]=data_format.DataFormat.from_desc("ascii") if data[2].startswith("ASC") else self._build_data_format(data[3],data[0])
wfmpre["pts"]=int(data[6])
wfmpre["xzero"]=float(data[10])
wfmpre["xincr"]=float(data[9])
wfmpre["ptoff"]=int(data[11])
wfmpre["ymult"]=float(data[13])
wfmpre["yzero"]=float(data[15])
wfmpre["yoff"]=float(data[14])
return wfmpre
def _set_data_resolution(self, resolution):
self.write(":DATA:RESOLUTION",resolution)
# def get_vertical_offset(self, channel):
# return self.ask(":CH{}:OFFSET?".format(channel),"float")
# def set_vertical_offset(self, channel, offset):
# self.write(":CH{}:OFFSET".format(channel),offset,"float")
# return self.get_vertical_offset(channel)
[docs] def get_horizontal_offset(self):
return self.ask(":HORIZONTAL:DELAY:TIME?","float") if self.ask(":HORIZONTAL:DELAY:MODE?","bool") else 0
[docs] def set_horizontal_offset(self, offset=0.):
if offset:
self.write(":HORIZONTAL:DELAY:TIME",offset,"float")
else:
self.write(":HORIZONTAL:DELAY:MODE",False,"bool")
return self.get_horizontal_offset()
[docs]class TDS2000(ITektronixScope):
"""
Tektronix TDS2000 Oscilloscope.
"""
def __init__(self, addr):
ITektronixScope.__init__(self,addr)
[docs]class MDO3000(SCPI.SCPIDevice):
"""
Tektronix MDO3000 Oscilloscope with the Spectrum Analyzer add-on.
"""
_default_operation_cooldown=0.02
def __init__(self, addr):
SCPI.SCPIDevice.__init__(self,addr)
self.data_fmt_scope="<i1"
self.data_fmt_spec="<f4"
self._wfmpre="WFMO"
self._rel_sweep_step=5.
self._add_settings_node("avg",self.get_avg,self.set_avg)
self._add_settings_node("frequency_range",self.get_frequency_range,self.set_frequency_range)
self._add_settings_node("bandwidth",self.get_bandwidth,self.set_bandwidth)
self._add_settings_node("rel_sweep_step",self.get_rel_sweep_step,self.set_rel_sweep_step)
self._add_settings_node("ref_level",self.get_ref_level,self.set_ref_level)
self._add_settings_node("rf_avg",self.get_rf_avg,self.set_rf_avg)
self._add_settings_node("rf_mode",self.get_rf_mode,self.set_rf_mode)
self._add_settings_node("channel",self.get_selected_channel,self.select_channel)
### TRIGGER ###
[docs] def get_avg(self):
mode=self.ask(":ACQ:MODE?").upper()
if mode=="AVERAGE":
return self.ask(":ACQ:NUMAVG?","int")
else:
return 1
[docs] def set_avg(self, avg):
if avg is True or avg>1:
self.write(":ACQ:MODE AVERAGE")
if avg is not True:
self.write(":ACQ:NUMAVG",avg)
self.set_rf_avg(avg)
else:
self.write(":ACQ:MODE SAMPLE")
return self.get_avg()
[docs] def grab_single(self, avg=None, wait_type="sync"):
if avg is not None:
self.set_avg(avg)
self.write(":ACQ:STOPAFTER SEQ")
self.write(":ACQ:STATE ON")
self.wait(wait_type)
[docs] def grab_continuous(self, enable=True):
self.write(":ACQ:STOPAFTER RUNSTOP")
self.write(":ACQ:STATE",enable)
[docs] def is_continuous(self):
return self.ask(":ACQ:STOPAFTER?").strip().upper().startswith("RUN")
[docs] def is_grabbing(self):
return self.ask(":ACQ:STATE?")
### SCOPE ###
[docs] def get_horizontal_span(self):
return self.ask(":HORIZONTAL:SCALE?","float")*10.
[docs] def set_horizontal_span(self, span):
self.write(":HORIZONTAL:SCALE",span/10.,"float")
return self.get_horizontal_span()
[docs] def channel_enabled(self, channel):
return self.ask(":SELECT:CH{}?".format(channel),"bool")
[docs] def enable_channel(self, channel, enabled=True):
self.write(":SELECT:CH{}".format(channel),enabled,"bool")
return self.channel_enabled(channel)
[docs] def get_vertical_span(self, channel):
return self.ask(":CH{}:SCALE?".format(channel),"float")*10.
[docs] def set_vertical_span(self, channel, span):
self.write(":CH{}:SCALE".format(channel),span/10.,"float")
return self.get_vertical_span(channel)
### SPECTRUM ANALYZER ##
[docs] def get_frequency_range(self):
center=self.ask(":RF:FREQ?","float")
span=self.ask(":RF:SPAN?","float")
return center-span/2., center+span/2.
[docs] def set_frequency_range(self, frequency):
try:
start,stop=min(frequency),max(frequency)
except TypeError:
start,stop=frequency,frequency
self.write(":RF:FREQ",(start+stop)/2.)
self.write(":RF:SPAN",stop-start)
return self.get_frequency_range()
[docs] def get_bandwidth(self):
return self.ask(":RF:RBW?","float")
[docs] def set_bandwidth(self, bwidth):
self.write(":RF:RBW:MODE MANUAL")
self.write(":RF:RBW",bwidth)
return self.get_bandwidth()
[docs] def get_rel_sweep_step(self):
return self._rel_sweep_step
[docs] def set_rel_sweep_step(self, step):
if step>0:
self._rel_sweep_step=step
return self._rel_sweep_step
[docs] def get_rf_avg(self):
return self.ask(":RF:RF_AVERAGE:NUMAVG?","int")
[docs] def set_rf_avg(self, avg):
self.write(":RF:RF_AVERAGE:NUMAVG", avg)
return self.get_rf_avg()
[docs] def get_ref_level(self):
return self.ask(":RF:REFLEVEL?","float")
[docs] def set_ref_level(self, level):
self.write(":RF:REFLEVEL",level)
return self.get_ref_level()
_rf_trace_kind={"normal":"RF_NORMAL","avg":"RF_AVERAGE","max":"RF_MAXHOLD","min":"RF_MINHOLD"}
_inv_rf_trace_kind=general.invert_dict(_rf_trace_kind)
def _get_rf_trace_kind(self, kind):
try:
return self._rf_trace_kind[kind]
except KeyError:
funcargparse.check_parameter_range(kind,"kind",self._rf_trace_kind)
[docs] def enable_rf_channel(self, kind="normal", enable=True):
self.write(":SELECT:{} {}".format(self._get_rf_trace_kind(kind),1 if enable else 0))
_rf_modes={"min":"MINUSPEAK","max":"PLUSPEAK","sample":"SAMPLE","avg":"AVERAGE"}
_inv_rf_modes=general.invert_dict(_rf_modes)
[docs] def get_rf_mode(self, kind="normal"):
mode=self.ask(":RF:DETECT:{}?".format(self._get_rf_trace_kind(kind))).strip().upper()
return self._inv_rf_modes[mode]
[docs] def set_rf_mode(self, mode="max", kind="norm"):
try:
mode=self._rf_modes[mode]
except KeyError:
funcargparse.check_parameter_range(mode,"mode",self._rf_modes)
self.write(":RF:DETECT:MODE MANUAL")
self.write(":RF:DETECT:{} {}".format(self._get_rf_trace_kind(kind),mode))
return self.get_rf_mode(kind)
### DATA TRANSFER ###
[docs] def get_points_number(self, kind="send"):
funcargparse.check_parameter_range(kind,"kind",{"acq","trace","send"})
if kind=="acq":
return self.ask(":HORIZONTAL:RECORDLENGTH?","int")
elif kind=="trace":
return self.ask(":{}:RECORDLENGTH?".format(self._wfmpre),"int")
else:
return self.ask(":{}:NR_PT?".format(self._wfmpre),"int")
def _set_data_resolution(self, resolution):
pass
[docs] def set_points_number(self, pts_num, reset_limts=True):
if pts_num<5E4:
self._set_data_resolution("REDUCED")
else:
self._set_data_resolution("FULL")
self.write(":HORIZONTAL:RECORDLENGTH",pts_num,"int")
if reset_limts:
self.set_data_pts_range()
return self.get_points_number(kind="send")
[docs] def get_data_pts_range(self):
return self.ask(":DATA:START?","int"),min(self.ask(":DATA:STOP?","int"),self.get_points_number())
[docs] def set_data_pts_range(self, rng=None):
if rng is None:
start,stop=1,self.get_points_number(kind="acq")
else:
start,stop=min(rng),max(rng)
self.write(":DATA:START",start,"int")
self.write(":DATA:STOP",stop,"int")
return self.get_data_pts_range()
def _get_default_data_format(self, fmt):
if fmt is not None:
return fmt
src=self.ask(":DATA:SOURCE?").strip().upper()
return self.data_fmt_scope if src.startswith("CH") else self.data_fmt_spec
[docs] def request_data(self, fmt=None, set_fmt=True):
fmt=self._get_default_data_format(fmt)
if set_fmt:
self.set_data_format(fmt)
data=self.ask(":CURVE?","raw")
return self.parse_trace_data(data,fmt)
def _scale_time_data(self, data):
xzero=self.ask(":{}:XZERO?".format(self._wfmpre),"float")
xincr=self.ask(":{}:XINCR?".format(self._wfmpre),"float")
xpts=np.arange(len(data))*xincr+xzero
ymult=self.ask(":{}:YMULT?".format(self._wfmpre),"float")
yzero=self.ask(":{}:YZERO?".format(self._wfmpre),"float")
yoff=self.ask(":{}:YOFF?".format(self._wfmpre),"float")
ypts=(data-yoff)*ymult+yzero
return np.column_stack((xpts,ypts))
[docs] def read_trace(self, channel):
self.select_channel(channel)
pts=self.get_points_number(kind="send")
trace=self.request_data()
if len(trace)!=pts:
raise RuntimeError("received data length {0} is not equal to the number of points {1}".format(len(trace),pts))
return self._scale_time_data(trace)
def _scale_rf_data(self, data):
center=self.ask(":RF:FREQ?","float")
span=self.ask(":RF:SPAN?","float")
xpts=np.linspace(center-span/2.,center+span/2.,len(data))
return np.column_stack((xpts,data))
[docs] def read_sweep(self, kind="normal"):
self.select_channel(self._get_rf_trace_kind(kind))
pts=self.get_points_number(kind="send")
trace=self.request_data()
if len(trace)!=pts:
raise RuntimeError("received data length {0} is not equal to the number of points {1}".format(len(trace),pts))
return self._scale_rf_data(trace)
def _read_sweep_fast(self, pts, fmt):
trace=self.request_data(fmt, set_fmt=False)
if len(trace)!=pts:
raise RuntimeError("received data length {0} is not equal to the number of points {1}".format(len(trace),pts))
return self._scale_rf_data(trace)
[docs] def grab_rf_sweep(self, kind="avg", avg=None, freq_step=None):
if freq_step is None:
self.grab_single(avg=avg)
return self.read_sweep(kind)
if freq_step=="auto":
freq_step=self._rel_sweep_step*self.get_bandwidth()
freq_range=self.get_frequency_range()
center,span=(freq_range[0]+freq_range[1])/2.,freq_range[1]-freq_range[0]
tr_pts=self.get_points_number()
if tr_pts!=751:
raise RuntimeError("expected {} points per trace; got {}".format(751,tr_pts))
tr_span=(tr_pts-1)*freq_step
pts=int(span/(freq_step*2.))*2+1
if pts<=tr_pts:
self.set_frequency_range((center-tr_span/2.,center+tr_span/2.))
sweep=self.grab_rf_sweep(kind,avg,freq_step=None)
self.set_frequency_range(freq_range)
dpts=(tr_pts-pts)//2
return sweep[dpts:tr_pts-dpts]
core_pts=501
step_span=(core_pts-1)*freq_step
chunks_n=(pts-2)//(core_pts-1)+1
chunks=[]
for ch in range(chunks_n):
tr_center=freq_range[0]+step_span*(ch+0.5)
self.set_frequency_range((tr_center-tr_span/2.,tr_center+tr_span/2.))
self.grab_single(avg=avg)
chunks.append(self.read_sweep(kind))
for c in range(len(chunks)-1):
chunks[c]=chunks[c][(tr_pts-core_pts)//2:-(tr_pts-core_pts)//2-1]
last_pts=(pts-2)%(core_pts-1)+1
chunks[-1]=chunks[-1][(tr_pts-core_pts)//2:(tr_pts-core_pts)//2+last_pts+1]
self.set_frequency_range(freq_range)
return np.concatenate(chunks)
[docs] def get_selected_channel(self):
source=self.ask(":DATA:SOURCE?").strip().upper()
if source.startswith("CH") and len(source)==3:
return int(source[-1])
if source.startswith("RF"):
return self._inv_rf_trace_kind[source]
raise RuntimeError("source {} is not a channel".format(source))
[docs] def select_channel(self, channel):
if not isinstance(channel,textstring):
channel="CH{}".format(channel)
elif channel in self._rf_trace_kind:
channel=self._get_rf_trace_kind(channel)
self.write(":DATA:SOURCE {}".format(channel))
return self.get_selected_channel()