Source code for pylablib.aux_libs.devices.AgilentElectronics

from builtins import range

from ...core.devio import SCPI, units, data_format  #@UnresolvedImport
from ...core.utils import general, funcargparse  #@UnresolvedImport

import numpy as np

_depends_local=["...core.devio.SCPI"]





[docs]class AWG33220A(SCPI.SCPIDevice): """ Agilent AWG33220A Arbitrary Wave Generator. Also partially works with compatible AWGs such as Agilent 33500, Rigol D1000, etc. """ def __init__(self, addr): SCPI.SCPIDevice.__init__(self,addr,term_write="\n") self._add_settings_node("output_on",self.get_output,None) self._add_scpi_parameter("output_polarity",":OUTPUT:POLARITY",kind="enum",options={"NORM":"norm","INV":"inv"},add_node=True) self._add_scpi_parameter("output_sync",":OUTPUT:SYNC",kind="bool",add_node=True) self._add_settings_node("range",self.get_range,self.set_range,multiarg=False) self._add_settings_node("load",self.get_load,self.set_load) self._add_settings_node("frequency",self.get_frequency,self.set_frequency) self._add_settings_node("phase",self.get_phase,self.set_phase) functions={"SIN":"sine","SQU":"square","RAMP":"ramp","PULS":"pulse","NOIS":"noise","PRBS":"prbs","DC":"DC","USER":"user","ARB":"arb"} self._add_scpi_parameter("function",":FUNCTION",kind="enum",options=functions,add_node=True) self._add_scpi_parameter("duty_cycle",":FUNCTION:SQUARE:DCYCLE",add_node=True) self._add_scpi_parameter("ramp_symmetry",":FUNCTION:RAMP:SYMMETRY",add_node=True) self._add_scpi_parameter("burst_enabled",":BURST:STATE",kind="bool",add_node=True) self._add_scpi_parameter("burst_mode",":BURST:MODE",kind="enum",options={"TRIG":"trig","GAT":"gate"},add_node=True) self._add_settings_node("burst_ncycles",self.get_burst_ncycles,self.set_burst_ncycles) self._add_scpi_parameter("gate_polarity",":BURST:GATE:POL",kind="enum",options={"NORM":"norm","INV":"inv"},add_node=True) self._add_scpi_parameter("trigger_source",":TRIG:SOURCE",kind="enum",options={"IMM":"imm","EXT":"ext","BUS":"bus"},add_node=True) self._add_scpi_parameter("trigger_slope",":TRIG:SLOPE",kind="enum",options={"POS":"pos","NEG":"neg"},add_node=True) self._add_scpi_parameter("trigger_output",":OUTPUT:TRIG",kind="bool",add_node=True) self._add_scpi_parameter("output_trigger_slope",":OUTPUT:TRIG:SLOPE",kind="enum",options={"POS":"pos","NEG":"neg"},add_node=True)
[docs] def get_output(self): """Check if the output is enabled""" return self.ask(":OUTPUT?","bool")
[docs] def set_output(self, enabled=True): """Turn the output on or off""" self.write(":OUTPUT",enabled) return self.get_output()
[docs] def get_output_polarity(self): """ Get output polarity. Can be either ``"norm"`` or ``"inv"``. """ return self._get_scpi_parameter("output_polarity")
[docs] def set_output_polarity(self, polarity="norm"): """ Set output polarity. Can be either ``"norm"`` or ``"inv"``. """ return self._set_scpi_parameter("output_polarity",polarity)
[docs] def is_sync_output_enabled(self): """Check if SYNC output is enabled""" return self._get_scpi_parameter("sync_output")
[docs] def enable_sync_output(self, enabled=True): """Enable or disable SYNC output""" return self._set_scpi_parameter("sync_output",enabled)
[docs] def get_load(self): """Get the output load""" return self.ask("OUTPUT:LOAD?","float")
[docs] def set_load(self, load=None): """Set the output load (``None`` means High-Z)""" if load is None: self.write("OUTPUT:LOAD INF") else: self.write("OUTPUT:LOAD",load,"float") return self.get_load()
[docs] def get_function(self): """ Get output function. Can be one of the following: ``"sine"``, ``"square"``, ``"ramp"``, ``"pulse"``, ``"noise"``, ``"prbs"``, ``"DC"``, ``"user"``, ``"arb"``. Not all functions can be available, depending on the particular model of the generator. """ return self._get_scpi_parameter("function")
[docs] def set_function(self, func): """ Set output function. Can be one of the following: ``"sine"``, ``"square"``, ``"ramp"``, ``"pulse"``, ``"noise"``, ``"prbs"``, ``"DC"``, ``"user"``, ``"arb"``. Not all functions can be available, depending on the particular model of the generator. """ return self._set_scpi_parameter("function",func)
[docs] def get_amplitude(self): """Get output amplitude""" self.write(":VOLTAGE:UNIT VPP") return self.ask(":VOLTAGE?","float")/2.
[docs] def set_amplitude(self, amplitude): """Set output amplitude""" self.write(":VOLTAGE",amplitude*2.,"float",unit="Vpp") return self.get_amplitude()
[docs] def get_offset(self): """Get output offset""" return self.ask(":VOLTAGE:OFFSET?","float")
[docs] def set_offset(self, offset): """Set output offset""" return self.write(":VOLTAGE:OFFSET",offset,"float")
[docs] def get_range(self): """ Get output voltage range. Return tuple ``(vmin, vmax)`` with the low and high voltage values (i.e., ``offset-amplitude`` and ``offset+amplitude``). """ return self.ask(":VOLTAGE:LOW?","float"),self.ask(":VOLTAGE:HIGH?","float")
[docs] def set_range(self, rng): """ Set output voltage range. If span is less than ``1E-4``, automatically switch to DC mode. """ try: low,high=min(rng),max(rng) except TypeError: low,high=rng,rng if abs(high-low)<1E-4: self.set_function("DC") self.set_amplitude(10E-3) self.set_offset((high+low)/2.) else: curr_rng=self.get_range() if low<curr_rng[1]: self.write("VOLTAGE:LOW",low,"float") self.write("VOLTAGE:HIGH",high,"float") else: self.write("VOLTAGE:HIGH",high,"float") self.write("VOLTAGE:LOW",low,"float") return self.get_range()
[docs] def get_frequency(self): """Get output frequency""" value,unit=self.ask(":FREQUENCY?","value") return units.convert_frequency_units(value,unit or "Hz","Hz")
[docs] def set_frequency(self, frequency): """Set output frequency""" self.write(":FREQUENCY",frequency,"float",unit="Hz") return self.get_frequency()
[docs] def get_phase(self): """Get output phase (in degrees)""" self.write(":UNIT:ANGLE DEG") return self.ask(":PHASE?","float")
[docs] def set_phase(self, phase): """Set output phase (in degrees)""" self.write(":UNIT:ANGLE DEG") self.write(":PHASE",phase,"float") return self.get_phase()
[docs] def get_duty_cycle(self): """ Get output duty cycle (in percent). Only applies to ``"square"`` output function. """ return self._get_scpi_parameter("duty_cycle")
[docs] def set_duty_cycle(self, dcycle): """ Set output duty cycle (in percent). Only applies to ``"square"`` output function. """ return self._set_scpi_parameter("duty_cycle",dcycle)
[docs] def get_ramp_symmetry(self): """ Get output ramp symmetry (in percent). Only applies to ``"ramp"`` output function. """ return self._get_scpi_parameter("ramp_symmetry")
[docs] def set_ramp_symmetry(self, rsymm): """ Set output ramp symmetry (in percent). Only applies to ``"ramp"`` output function. """ return self._set_scpi_parameter("ramp_symmetry",rsymm)
[docs] def is_burst_enabled(self): """Check if the burst mode is enabled""" return self._get_scpi_parameter("burst_enabled")
[docs] def enable_burst(self, enabled=True): """Enable burst mode""" return self._set_scpi_parameter("burst_enabled",enabled)
[docs] def get_burst_mode(self): """ Get burst mode. Can be either ``"trig"`` or ``"gate"``. """ return self._get_scpi_parameter("burst_mode")
[docs] def set_burst_mode(self, mode): """ Set burst mode. Can be either ``"trig"`` or ``"gate"``. """ return self._set_scpi_parameter("burst_mode",mode)
[docs] def get_burst_ncycles(self): """ Get burst mode ncycles. Infinite corresponds to a large value (>1E37). """ return self.ask(":BURST:NCYC?","float")
[docs] def set_burst_ncycles(self, ncycles=1): """ Set burst mode ncycles. Infinite corresponds to ``None`` """ if ncycles is None or ncycles>1E37: self.write(":BURST:NCYC INF") else: self.write(":BURST:NCYC",ncycles) return self.get_burst_ncycles()
[docs] def get_gate_polarity(self): """ Get burst gate polarity. Can be either ``"norm"`` or ``"inv"``. """ return self._get_scpi_parameter("gate_polarity")
[docs] def set_gate_polarity(self, polarity="norm"): """ Set burst gate polarity. Can be either ``"norm"`` or ``"inv"``. """ return self._set_scpi_parameter("gate_polarity",polarity)
[docs] def get_trigger_source(self): """ Get trigger source. Can be either ``"imm"``, ``"ext"``, or ``"bus"``. """ return self._get_scpi_parameter("trigger_source")
[docs] def set_trigger_source(self, src): """ Set trigger source. Can be either ``"imm"``, ``"ext"``, or ``"bus"``. """ return self._set_scpi_parameter("trigger_source",src)
[docs] def get_trigger_slope(self): """ Get trigger slope. Can be either ``"pos"``, or ``"neg"``. """ return self._get_scpi_parameter("trigger_slope")
[docs] def set_trigger_slope(self, slope): """ Set trigger slope. Can be either ``"pos"``, or ``"neg"``. """ return self._set_scpi_parameter("trigger_slope",slope)
[docs] def is_trigger_output_enabled(self): """Check if the trigger output is enabled""" return self._get_scpi_parameter("trigger_output")
[docs] def enable_trigger_output(self, enabled=True): """Enable trigger output""" return self._set_scpi_parameter("trigger_output",enabled)
[docs] def get_output_trigger_slope(self): """ Get output trigger slope. Can be either ``"pos"``, or ``"neg"``. """ return self._get_scpi_parameter("output_trigger_slope")
[docs] def set_output_trigger_slope(self, slope): """ Set output trigger slope. Can be either ``"pos"``, or ``"neg"``. """ return self._set_scpi_parameter("output_trigger_slope",slope)
[docs] def apply_settings(self, settings): if "output_on" in settings and not settings["output_on"]: self.set_output(False) SCPI.SCPIDevice.apply_settings(self,settings) if "output_on" in settings and settings["output_on"]: self.set_output(True) return self.get_settings()
[docs]class AMP33502A(SCPI.SCPIDevice): """ Agilent AMP3350A amplifier. """ def __init__(self, addr): SCPI.SCPIDevice.__init__(self,addr)
[docs] def get_output(self, channel=None): return self.ask("OUTPUT{}:STATE?".format(channel+1),"bool")
[docs] def set_output(self, channel, enabled=True): self.write("OUTPUT{}:STATE".format(channel+1),enabled) return self.get_output(channel)
[docs] def get_path(self, channel): return self.ask("ROUTE{}:PATH?".format(channel+1),"string").lower()
[docs] def set_path(self, channel, path): funcargparse.check_parameter_range(path,"path",{"dir","ampl"}) self.write("ROUTE{}:PATH".format(channel+1),path.upper()) return self.get_path(channel)
[docs] def get_coupling(self, channel): return self.ask("INPUT{}:COUPLING?".format(channel+1),"string").lower()
[docs] def set_coupling(self, channel, coupling): funcargparse.check_parameter_range(coupling,"coupling",{"ac","dc"}) self.write("INPUT{}:COUPLING".format(channel+1),coupling.upper()) return self.get_coupling(channel)
[docs] def get_impedance(self, channel): return self.ask("INPUT{}:IMPEDANCE?".format(channel+1),"float")
[docs] def set_impedance(self, channel, impedance): self.write("INPUT{}:IMPEDANCE".format(channel+1),impedance) return self.get_impedance(channel)
[docs] def get_settings(self): settings=SCPI.SCPIDevice.get_settings(self) settings["output_on"]=[self.get_output(ch) for ch in [0,1]] settings["path"]=[self.get_path(ch) for ch in [0,1]] settings["coupling"]=[self.get_coupling(ch) for ch in [0,1]] settings["impedance"]=[self.get_impedance(ch) for ch in [0,1]] return settings
[docs] def apply_settings(self, settings): for ch in [0,1]: ch_settings={} for k in {"output_on","path","coupling","impedance"}: if k in settings: if isinstance(settings[k],list): ch_settings[k]=settings[k][ch] else: ch_settings[k]=settings[k] if "output_on" in ch_settings and not ch_settings["output_on"]: self.set_output(False) if "path" in ch_settings: self.set_path(ch,ch_settings["path"]) if "coupling" in ch_settings: self.set_coupling(ch,ch_settings["coupling"]) if "impedance" in ch_settings: self.set_impedance(ch,ch_settings["impedance"]) if "output_on" in ch_settings and ch_settings["output_on"]: self.set_output(True) return self.get_settings()
[docs]class N9310A(SCPI.SCPIDevice): """ Agilent N9310A microwave generator. """ def __init__(self, addr): SCPI.SCPIDevice.__init__(self,addr) self._add_settings_node("power_on",self.get_output,None) self._add_settings_node("power",self.get_output_level,self.set_output_level) self._add_settings_node("frequency",self.get_frequency,self.set_frequency)
[docs] def get_output(self): return self.ask(":RFOUTPUT:STATE?","bool")
[docs] def set_output(self, enabled=True): self.write(":RFOUTPUT:STATE",enabled)
[docs] def get_output_level(self): value,unit=self.ask(":AMPLITUDE:CW?","value") return units.convert_power_units(value,unit or "dBm","dBm")
[docs] def set_output_level(self, level): if level is None: self.set_output(False) return None else: self.write(":AMPLITUDE:CW",level,"float",unit="dBm") return self.get_output_level()
[docs] def get_frequency(self): value,unit=self.ask(":FREQUENCY:CW?","value") return units.convert_frequency_units(value,unit or "Hz","Hz")
[docs] def set_frequency(self, frequency): self.write(":FREQUENCY:CW",frequency/1E3,"float",unit="kHz") return self.get_frequency()
[docs] def apply_settings(self, settings): if "power_on" in settings and not settings["power_on"]: self.set_output(False) SCPI.SCPIDevice.apply_settings(self,settings) if "power_on" in settings and settings["power_on"]: self.set_output(True) return self.get_settings()
[docs]class HP8712B(SCPI.SCPIDevice): """ HP8712B Vector Network Analyzer. """ def __init__(self, addr): SCPI.SCPIDevice.__init__(self,addr) self.channel=1 self.data_fmt="<f4" self._add_settings_node("power_on",self.get_output,None) self._add_settings_node("power",self.get_output_level,self.set_output_level) self._add_settings_node("frequency_range",self.get_frequency_range,self.set_frequency_range) self._add_settings_node("bandwidth",self.get_bandwidth,self.set_bandwidth) self._add_settings_node("sweep_points",self.get_sweep_points,self.set_sweep_points) self._add_settings_node("avg",self.get_avg,self.set_avg) self._add_settings_node("channel_format",self.get_channel_format,self.set_channel_format) self._add_settings_node("electrical_delay",self.get_electrical_delay,self.set_electrical_delay) self._add_settings_node("channel_format",self.get_phase_offset,self.set_phase_offset)
[docs] def select_channel(self, channel): self.channel=channel
[docs] def current_channel(self): return self.channel
[docs] def sweep_single(self, wait_type="sync", count=1): if count is None: count=1 self.write(":ABORT;:INIT:CONT OFF;") for _ in range(count): self.write(":INIT") self.wait(wait_type)
[docs] def sweep_reset(self, wait_type="sync"): self.write(":ABORT;:INIT") self.wait(wait_type)
[docs] def sweep_continuous(self, enable=True): self.write(":INIT:CONT",enable) if not enable: self.write(":ABORT")
[docs] def is_continuous(self): return self.ask(":INIT:CONT?","bool")
[docs] def get_output(self): return self.ask(":OUTPUT?","bool")
[docs] def set_output(self, enabled=True): self.write(":OUTPUT",enabled)
[docs] def get_output_level(self): return self.ask(":SOURCE:POWER?","float")
[docs] def set_output_level(self, level): if level is None: self.set_output(False) return None else: self.write("SOURCE:POWER",level) return self.get_output_level()
[docs] def get_avg(self): avg_on=self.ask(":SENSE{0}:AVERAGE?".format(self.channel),"bool") avg_samples=self.ask(":SENSE{0}:AVERAGE:COUNT?".format(self.channel),"int") return (avg_on,avg_samples)
[docs] def restart_avg(self): self.write(":SENSE{0}:AVERAGE:CLEAR".format(self.channel))
[docs] def set_avg(self, avg=None): try: avg_on,avg_samples=avg except TypeError: if (not avg) or avg<=1: avg_on=False avg_samples=None else: avg_on=True avg_samples=None if avg is True else avg self.write(":SENSE{0}:AVERAGE".format(self.channel),avg_on) if avg_samples is not None: self.write(":SENSE{0}:AVERAGE:COUNT".format(self.channel),avg) return self.get_avg()
[docs] def get_frequency_range(self): start=self.ask(":SENSE:FREQ:START?","float") stop=self.ask(":SENSE:FREQ:STOP?","float") return start,stop
[docs] def set_frequency_range(self, frequency): try: start,stop=min(frequency),max(frequency) except TypeError: start,stop=frequency,frequency self.write(":SENSE:FREQ:START {0:E};:SENSE:FREQ:STOP {1:E}".format(start,stop)) return self.get_frequency_range()
[docs] def get_sweep_points(self): return self.ask(":SENSE:SWEEP:POINTS?","int")
[docs] def set_sweep_points(self, pts): self.write(":SENSE:SWEEP:POINTS",int(pts)) return self.get_sweep_points()
[docs] def get_bandwidth(self): return self.ask(":SENSE:BWIDTH?","float")
[docs] def set_bandwidth(self, bwidth): self.write(":SENSE:BWIDTH",bwidth) return self.get_bandwidth()
[docs] def get_channel_format(self): return self.ask(":CALC{0}:FORMAT?".format(self.channel)).lower()
[docs] def set_channel_format(self, chan_fmt): self.write(":CALC{0}:FORMAT {1}".format(self.channel, chan_fmt)) return self.get_channel_format()
[docs] def get_phase_offset(self): return self.ask("SENSE{0}:CORR:OFFS:PHAS?","float")
[docs] def set_phase_offset(self, offset): self.write("SENSE{0}:CORR:OFFS:PHAS",offset) return self.get_phase_offset()
[docs] def get_electrical_delay(self): return self.ask("SENSE{0}:CORR:EDEL:TIME?".format(self.channel),"float")
[docs] def set_electrical_delay(self, delay): self.write("SENSE{0}:CORR:EDEL:TIME".format(self.channel),delay) return self.get_electrical_delay()
[docs] def set_data_format(self, fmt=None): fmt=funcargparse.getdefault(fmt,self.data_fmt) fmt=data_format.DataFormat.from_desc(fmt) if not (fmt.is_ascii() or fmt.to_desc()[1:] in ["f4","f8","i2"]): raise ValueError("Format {0} isn't supported".format(fmt)) self.write(":FORMAT:DATA {0};:FORMAT:BORDER {1}".format(*fmt.to_desc("SCPI")))
[docs] def get_data_format(self): enc=self.ask(":FORMAT:DATA?") border=self.ask(":FORMAT:BORDER?") return data_format.DataFormat.from_desc_SCPI(enc,border).to_desc()
[docs] def request_data(self, source="data", fmt=None): fmt=funcargparse.getdefault(fmt,self.data_fmt) self.set_data_format(fmt) if source=="data": data=self.ask(":TRACE:DATA? CH{0}FDATA".format(self.channel),"raw") elif source=="memory": data=self.ask(":TRACE:DATA? CH{0}FDMEM".format(self.channel),"raw") return self.parse_trace_data(data,fmt)
[docs] def read_sweep(self, transfer_fmt="xy"): original_channel_fmt=self.get_channel_format() if transfer_fmt=="xy": channel_fmts=["real","imag"] elif transfer_fmt=="rp": channel_fmts=["mlin","phase"] else: raise ValueError("unrecognized read format: {0}".format(transfer_fmt)) pts=self.get_sweep_points() data=[] for chf in channel_fmts: self.set_channel_format(chf) trace=self.request_data() if len(trace)!=pts: raise RuntimeError("received data length {0} is not equal to the number of points {1}".format(len(trace),pts)) data.append(trace) freq_range=self.get_frequency_range() freqs=np.linspace(freq_range[0],freq_range[1],pts) data=np.column_stack(( freqs,data[0],data[1] )) self.set_channel_format(original_channel_fmt) return data
[docs] def grab_single_sweep(self, transfer_fmt="xy", count=None): self.wait() if count is None: do_avg,avg_count=self.get_avg() count=avg_count if do_avg else 1 cont=self.is_continuous() self.sweep_single(count=count) data=self.read_sweep(transfer_fmt=transfer_fmt) self.sweep_continuous(cont) return data
[docs] def apply_settings(self, settings): if "power_on" in settings and not settings["power_on"]: self.set_output(False) SCPI.SCPIDevice.apply_settings(self,settings) if "power_on" in settings and settings["power_on"]: self.set_output(True) self.wait() return self.get_settings()
[docs]class HP8722D(SCPI.SCPIDevice): """ HP8722D Vector Network Analyzer. """ def __init__(self, addr): SCPI.SCPIDevice.__init__(self,addr) self.data_fmt="<f4" self._add_settings_node("power_on",self.get_output,None) self._add_settings_node("power",self.get_output_level,self.set_output_level) self._add_settings_node("frequency_range",self.get_frequency_range,self.set_frequency_range) self._add_settings_node("bandwidth",self.get_bandwidth,self.set_bandwidth) self._add_settings_node("sweep_points",self.get_sweep_points,self.set_sweep_points) self._add_settings_node("avg",self.get_avg,self.set_avg) self._add_settings_node("channel_format",self.get_channel_format,self.set_channel_format) self._add_settings_node("measurement",self.get_measurement,self.set_measurement) self._add_settings_node("electrical_delay",self.get_electrical_delay,self.set_electrical_delay) self._add_settings_node("phase_offset",self.get_phase_offset,self.set_phase_offset)
[docs] def select_channel(self, channel): self.write("CHAN{:d}".format(channel)) self.wait_sync()
[docs] def current_channel(self): for ch in range(1,5): if self.ask("CHAN{:d}?".format(ch),bool): return ch return None
_wait_sync_comm="OPC?;NOOP"
[docs] def wait_dev(self): raise NotImplementedError("HP8722D.wait_dev")
[docs] def sweep_single(self, wait_type="sync", count=1): if count is None: count=1 self.write("NUMG",count) self.wait(wait_type)
[docs] def sweep_continuous(self, enable=True): if enable: self.write("CONT") else: self.write("HOLD")
[docs] def is_continuous(self): return self.ask("CONT?","bool")
[docs] def get_output(self): return self.ask("SOUP?","bool")
[docs] def set_output(self, enabled=True): self.write("SOUP",enabled) return self.get_output() if self._setter_echo else None
[docs] def get_output_level(self): return self.ask("POWE?","float")
[docs] def set_output_level(self, level): if level is None: self.set_output(False) return None else: self.write("PWRR ON;POWE",level) return self.get_output_level() if self._setter_echo else None
[docs] def set_measurement(self, meas): self.write(meas) return self.get_measurement() if self._setter_echo else None
[docs] def get_measurement(self): for meas in ["S11","S12","S21","S22"]: if self.ask(meas+"?","bool"): return meas return None
[docs] def get_avg(self): avg_on=self.ask("AVERO?","bool") avg_samples=self.ask("AVERFACT?","int") return (avg_on,avg_samples)
[docs] def restart_avg(self): self.write("AVERREST")
[docs] def set_avg(self, avg=None): try: avg_on,avg_samples=avg except TypeError: if avg is True: avg_on=True avg_samples=None elif (not avg) or avg<=1: avg_on=False avg_samples=None else: avg_on=True avg_samples=avg self.write("AVERO",avg_on) if avg_samples is not None: self.write("AVERFACT",avg) return self.get_avg() if self._setter_echo else None
[docs] def get_frequency_range(self): start=self.ask("STAR?","float") stop=self.ask("STOP?","float") return start,stop
[docs] def set_frequency_range(self, frequency): try: start,stop=min(frequency),max(frequency) except TypeError: start,stop=frequency,frequency self.write("LINFREQ; STAR {0:E};STOP {1:E}".format(start,stop)) return self.get_frequency_range() if self._setter_echo else None
[docs] def get_sweep_points(self): return self.ask("POIN?","int")
[docs] def set_sweep_points(self, pts): self.write("POIN",int(pts)) return self.get_sweep_points() if self._setter_echo else None
[docs] def get_bandwidth(self): return self.ask("IFBW?","float")
[docs] def set_bandwidth(self, bwidth): self.write("IFBW",bwidth) return self.get_bandwidth() if self._setter_echo else None
_channel_formats={"real":"REAL","imag":"IMAG","mlin":"LINM","mlog":"LOGM","phase":"PHAS"}
[docs] def get_channel_format(self): for fmt,comm in self._channel_format.items(): if self.ask(comm+"?","bool"): return fmt return None
[docs] def set_channel_format(self, chan_fmt): self.write(self._channel_formats[chan_fmt]) return self.get_channel_format() if self._setter_echo else None
[docs] def get_phase_offset(self): return self.ask("PHAO?","float")
[docs] def set_phase_offset(self, offset): self.write("PHAO",offset) return self.get_phase_offset() if self._setter_echo else None
[docs] def get_electrical_delay(self): return self.ask("ELED?","float")
[docs] def set_electrical_delay(self, delay): self.write("ELED",delay) return self.get_electrical_delay() if self._setter_echo else None
[docs] def set_data_format(self, fmt=None): fmt=funcargparse.getdefault(fmt,self.data_fmt) fmt=data_format.DataFormat.from_desc(fmt) if fmt.is_ascii(): self.write("FORM4") elif fmt.to_desc()[1:]=="f4": self.write("FORM2") elif fmt.to_desc()[1:]=="f8": self.write("FORM3") else: raise ValueError("Format {0} isn't supported".format(fmt))
[docs] @staticmethod def parse_trace_data(data, fmt): fmt=data_format.DataFormat.from_desc(fmt) if fmt.is_ascii(): return fmt.convert_from_str(data) fmt.byteorder=">" # the only byteorder the device understands if data[:2]!=b"#A": raise ValueError("malformatted data") length=data_format.DataFormat.from_desc(">i2").convert_from_str(data[2:4]) data=data[4:] if len(data)!=length: raise ValueError("data length {0} doesn't agree with declared length {1}".format(len(data),length)) return fmt.convert_from_str(data)
[docs] def request_data(self, fmt=None): fmt=funcargparse.getdefault(fmt,self.data_fmt) self.set_data_format(fmt) data=self.ask("OUTPDATA","raw") return self.parse_trace_data(data,fmt)
[docs] def read_sweep(self): pts=self.get_sweep_points() trace=self.request_data().reshape((-1,2)) freq_range=self.get_frequency_range() freqs=np.linspace(freq_range[0],freq_range[1],pts) ctrace=trace[:,0]+1j*trace[:,1] ctrace=ctrace*np.exp(1j*2*np.pi*(freqs*self.get_electrical_delay()-self.get_phase_offset()/360.)) # manual offset; network analyzer doesn't do it for OUTPDATA data=np.column_stack(( freqs,ctrace.real,ctrace.imag )) return data
[docs] def grab_single_sweep(self, count=None): self.wait() if count is None: do_avg,avg_count=self.get_avg() count=avg_count if do_avg else 1 cont=self.is_continuous() self.sweep_single(count=count) data=self.read_sweep() self.sweep_continuous(cont) return data
[docs] def apply_settings(self, settings): if "power_on" in settings and not settings["power_on"]: self.set_output(False) SCPI.SCPIDevice.apply_settings(self,settings) if "power_on" in settings and settings["power_on"]: self.set_output(True) self.wait()