from builtins import range
import numpy as np
from ...core.devio import SCPI, data_format #@UnresolvedImport
from ...core.utils import funcargparse #@UnresolvedImport
from ...core.utils import general as general_utils #@UnresolvedImport
_depends_local=["...core.devio.SCPI"]
[docs]class DSA1030A(SCPI.SCPIDevice):
"""
Rigol DSA1030A Spectrum Analyzer.
"""
def __init__(self, addr):
SCPI.SCPIDevice.__init__(self,addr,term_write="\n")
self.data_fmt="<f4"
self.channel=1
self._add_settings_node("power_on",self.get_output,None)
self._add_settings_node("power",self.get_output_level,self.set_output_level)
self._add_settings_node("frequency_range",self.get_frequency_range,self.set_frequency_range)
self._add_settings_node("bandwidth",self.get_bandwidth,self.set_bandwidth)
self._add_settings_node("bandwidth/video",
lambda: self.get_bandwidth("video"),lambda bw: self.set_bandwidth(bw,"video"))
self._add_settings_node("bandwidth/resolution",
lambda: self.get_bandwidth("resolution"),lambda bw: self.set_bandwidth(bw,"resolution"))
self._add_settings_node("attenuation",self.get_attenuation,self.set_attenuation)
self._add_settings_node("sweep_points",self.get_sweep_points,self.set_sweep_points)
[docs] def select_channel(self, channel):
self.channel=channel
[docs] def current_channel(self):
return self.channel
[docs] def sweep_single(self, wait_type="sync"):
self.write(":ABORT;:INIT:CONT OFF;:INIT")
self.wait(wait_type)
[docs] def sweep_reset(self, wait_type="sync"):
self.write(":ABORT;:INIT")
self.wait(wait_type)
[docs] def sweep_continuous(self, enable=True):
self.write(":INIT:CONT",enable)
if not enable:
self.write(":ABORT")
[docs] def is_continuous(self):
return self.ask(":INIT:CONT?","bool")
[docs] def get_output(self):
return self.ask(":OUTPUT:STATE?","bool")
[docs] def set_output(self, enabled=True):
self.write(":OUTPUT:STATE",enabled)
[docs] def get_output_level(self):
return self.ask(":SOURCE:POWER:LEVEL:IMMEDIATE:AMPLITUDE?","float")
[docs] def set_output_level(self, level):
if level is None:
self.set_output(False)
return None
else:
self.write(":SOURCE:POWER:LEVEL:IMMEDIATE:AMPLITUDE {:.2E}".format(level))
return self.get_output_level()
[docs] def get_frequency_range(self):
start=self.ask(":SENSE:FREQUENCY:START?","float")
stop=self.ask(":SENSE:FREQUENCY:STOP?","float")
return start,stop
[docs] def set_frequency_range(self, frequency):
try:
start,stop=min(frequency),max(frequency)
except TypeError:
start,stop=frequency,frequency
if start-stop>=100.:
self.write(":SENSE:FREQUENCY:START {:.10E}".format(start))
self.write(":SENSE:FREQUENCY:STOP {:.10E}".format(stop))
else: # go to zero span mode
self.write(":SENSE:FREQUENCY:SPAN {:.10E}".format(stop-start))
self.write(":SENSE:FREQUENCY:CENTER {:.10E}".format((start+stop)/2.))
return self.get_frequency_range()
[docs] def get_sweep_points(self):
return self.ask(":SENSE:SWEEP:POINTS?","int")
[docs] def set_sweep_points(self, pts):
self.write(":SENSE:SWEEP:POINTS",int(pts))
return self.get_sweep_points()
[docs] def get_bandwidth(self, bw_type="video"):
if not bw_type in ["video","resolution"]:
raise ValueError("unrecognized bandwidth type: {0}".format(bw_type))
return self.ask(":SENSE:BANDWIDTH:{0}?".format(bw_type),"float")
[docs] def set_bandwidth(self, bwidth, bw_type="both"):
if not bw_type in ["video","resolution","both"]:
raise ValueError("unrecognized bandwidth type: {0}".format(bw_type))
if bw_type=="both":
self.set_bandwidth(bwidth,"video")
return self.set_bandwidth(bwidth,"resolution")
else:
if bwidth is None:
self.write(":SENSE:BANDWIDTH:{0}:AUTO".format(bw_type),True)
else:
self.write(":SENSE:BANDWIDTH:{0}:AUTO".format(bw_type),False)
self.write(":SENSE:BANDWIDTH:{0}".format(bw_type),bwidth)
return self.get_bandwidth(bw_type)
[docs] def get_attenuation(self):
return self.ask(":SENSE:POWER:RF:ATTENUATION?","float")
[docs] def set_attenuation(self, att):
if att is None:
self.write(":SENSE:POWER:RF:ATTENUATION:AUTO 1")
else:
self.write(":SENSE:POWER:RF:ATTENUATION",att)
return self.get_attenuation()
[docs] def autoscale_power(self):
self.write(":SENSE:POWER:ASCALE")
[docs] def request_data(self, fmt=None):
fmt=funcargparse.getdefault(fmt,self.data_fmt)
self.set_data_format(fmt)
data=self.ask(":TRACE:DATA? TRACE{0}".format(self.channel),"raw")
return self.parse_trace_data(data,fmt)
[docs] def read_sweep(self):
pts=self.get_sweep_points()
trace=self.request_data()
if len(trace)!=pts:
raise RuntimeError("received data length {0} is not equal to the number of points {1}".format(len(trace),pts))
freq_range=self.get_frequency_range()
freqs=np.linspace(freq_range[0],freq_range[1],pts)
return np.column_stack(( freqs,trace ))
_chunks_ovlap_pts=300
[docs] def grab_single_sweep(self, chunks=1, autoscale=True, auto_step_fraction=0.1, chunks_ovlap_pts="auto", error_retries=3):
for t in general_utils.RetryOnException(error_retries):
with t:
self.wait()
freq_range=self.get_frequency_range()
cont=self.is_continuous()
if autoscale:
rbw=self.get_bandwidth("resolution")
vbw=self.get_bandwidth("video")
pts=self.get_sweep_points()
att=self.get_attenuation()
self.set_bandwidth(min((freq_range[1]-freq_range[0])/10.,3E6))
self.set_sweep_points(1201)
self.sweep_single()
self.autoscale_power()
self.set_bandwidth(rbw,"resolution")
self.set_bandwidth(vbw,"video")
self.set_attenuation(att)
opt_step=self.get_bandwidth("resolution")*auto_step_fraction
if chunks=="auto":
opt_pts=(freq_range[1]-freq_range[0])/opt_step
if opt_pts<pts:
chunks=1
elif opt_pts<3001:
pts=opt_pts
chunks=1
else:
pts=3001
chunks=int(opt_pts//pts+1)
if chunks_ovlap_pts=="auto":
chunks_ovlap_pts=150
else:
if chunks_ovlap_pts=="auto":
chunks_ovlap_pts=0
self.set_sweep_points(pts)
if chunks==1:
self.sweep_single()
data=self.read_sweep()
else:
rel_ovlap=chunks_ovlap_pts/(pts-1.)
df=float(freq_range[1]-freq_range[0])/chunks
spf=df/(1.-rel_ovlap*2)
f0=freq_range[0]-spf*rel_ovlap
sweeps=[]
for i in range(chunks):
self.set_frequency_range((f0+df*i,f0+df*i+spf))
self.wait()
self.sweep_single()
single_sweep=self.read_sweep()[chunks_ovlap_pts:pts-chunks_ovlap_pts]
if i!=0:
single_sweep=single_sweep[1:,:]
sweeps.append(single_sweep)
self.set_frequency_range(freq_range)
data=np.concatenate(sweeps,axis=0)
adj_freq=np.linspace(freq_range[0],freq_range[1],len(data))
fstep=(freq_range[1]-freq_range[0])/(len(data)-1.)
if abs(data[:,0]-adj_freq).max()>fstep*.1:
raise RuntimeError("frequency adjustment error")
data[:,0]=adj_freq
self.sweep_continuous(cont)
return data
[docs] def apply_settings(self, settings):
if "power_on" in settings and not settings["power_on"]:
self.set_output(False)
SCPI.SCPIDevice.apply_settings(self,settings)
if "power_on" in settings and settings["power_on"]:
self.set_output(True)
self.wait()
return self.get_settings()