From dd6fc26d64ee01349e2c991fb6a0aac2cf1572c8 Mon Sep 17 00:00:00 2001 From: Jasper-Harvey0 Date: Fri, 13 Mar 2026 11:02:06 +1100 Subject: [PATCH 1/6] Driver update concept --- src/fixate/drivers/dso/agilent_mso_x.py | 527 +------------ src/fixate/drivers/dso/helper.py | 942 ++++-------------------- 2 files changed, 151 insertions(+), 1318 deletions(-) diff --git a/src/fixate/drivers/dso/agilent_mso_x.py b/src/fixate/drivers/dso/agilent_mso_x.py index 7de7d04e..0d929846 100644 --- a/src/fixate/drivers/dso/agilent_mso_x.py +++ b/src/fixate/drivers/dso/agilent_mso_x.py @@ -1,6 +1,6 @@ import pyvisa from fixate.core.exceptions import InstrumentError -from fixate.drivers.dso.helper import DSO +from fixate.drivers.dso.helper2 import DSO import time @@ -8,9 +8,8 @@ # KEYSIGHT TECHNOLOGIES,DSOX1202G,CN60074190,02.10.2019111333 # KEYSIGHT TECHNOLOGIES,DSO-X 1102G,CN57096441,01.20.2019061038 # AGILENT TECHNOLOGIES,MSO-X 3014A,MY51360314,02.43.2018020635 - - class MSO_X_3000(DSO): + # Regex needs work to detect only the 4 channel scopes we have here: REGEX_ID = "(KEYSIGHT|AGILENT) TECHNOLOGIES,[DM]SO-?X" INSTR_TYPE = "VISA" retrys_on_timeout = 1 @@ -22,523 +21,14 @@ def __init__(self, instrument): self._mode = "STOP" self._wave_acquired = False self._triggers_read = 0 + self.reset() self.instrument.query_delay = 0.2 - self._store = {} - self.api = [ - ("source1.ch1", self.store, {"source1": "CHAN1"}), - ("source1.ch2", self.store, {"source1": "CHAN2"}), - ("source1.ch3", self.store, {"source1": "CHAN3"}), - ("source1.ch4", self.store, {"source1": "CHAN4"}), - ("source1.function", self.store, {"source1": "FUNC"}), - ("source1.math", self.store, {"source1": "MATH"}), - ("source1.wmemory1", self.store, {"source1": "WMEM1"}), - ("source1.wmemory2", self.store, {"source1": "WMEM2"}), - ("source2.ch1", self.store, {"source2": "CHAN1"}), - ("source2.ch2", self.store, {"source2": "CHAN2"}), - ("source2.ch3", self.store, {"source2": "CHAN3"}), - ("source2.ch4", self.store, {"source2": "CHAN4"}), - ("source2.function", self.store, {"source1": "FUNC"}), - ("source2.math", self.store, {"source1": "MATH"}), - ("source2.wmemory1", self.store, {"source1": "WMEM1"}), - ("source2.wmemory2", self.store, {"source1": "WMEM2"}), - ("ch1._call", self.write, "CHAN1:DISP {value:d}"), - ("ch2._call", self.write, "CHAN2:DISP {value:d}"), - ("ch3._call", self.write, "CHAN3:DISP {value:d}"), - ("ch4._call", self.write, "CHAN4:DISP {value:d}"), - ("ch1.scale", self.write, "CHAN1:SCAL {value}"), - ("ch2.scale", self.write, "CHAN2:SCAL {value}"), - ("ch3.scale", self.write, "CHAN3:SCAL {value}"), - ("ch4.scale", self.write, "CHAN4:SCAL {value}"), - ("ch1.offset", self.write, "CHAN1:OFFS {value}"), - ("ch2.offset", self.write, "CHAN2:OFFS {value}"), - ("ch3.offset", self.write, "CHAN3:OFFS {value}"), - ("ch4.offset", self.write, "CHAN4:OFFS {value}"), - ("ch1.coupling.ac", self.write, "CHAN1:COUP AC"), - ("ch2.coupling.ac", self.write, "CHAN2:COUP AC"), - ("ch3.coupling.ac", self.write, "CHAN3:COUP AC"), - ("ch4.coupling.ac", self.write, "CHAN4:COUP AC"), - ("ch1.coupling.dc", self.write, "CHAN1:COUP DC"), - ("ch2.coupling.dc", self.write, "CHAN2:COUP DC"), - ("ch3.coupling.dc", self.write, "CHAN3:COUP DC"), - ("ch4.coupling.dc", self.write, "CHAN4:COUP DC"), - ("ch1.probe.attenuation", self.write, "CHAN1:PROB {value}"), - ("ch2.probe.attenuation", self.write, "CHAN2:PROB {value}"), - ("ch3.probe.attenuation", self.write, "CHAN3:PROB {value}"), - ("ch4.probe.attenuation", self.write, "CHAN4:PROB {value}"), - ("time_base.scale", self.write, "TIM:SCAL {value}"), - ("time_base.position", self.write, "TIM:POS {value}"), - ("trigger.mode.edge._call", self.write, "TRIG:MODE EDGE"), - ("trigger.mode.edge.level", self.write, "TRIG:EDGE:LEVEL {value}"), - ("trigger.mode.edge.source.ch1", self.write, "TRIG:EDGE:SOUR CHAN1"), - ("trigger.mode.edge.source.ch2", self.write, "TRIG:EDGE:SOUR CHAN2"), - ("trigger.mode.edge.source.ch3", self.write, "TRIG:EDGE:SOUR CHAN3"), - ("trigger.mode.edge.source.ch4", self.write, "TRIG:EDGE:SOUR CHAN4"), - ("trigger.mode.edge.slope.rising", self.write, "TRIG:EDGE:SLOPE POS"), - ("trigger.mode.edge.slope.falling", self.write, "TRIG:EDGE:SLOPE NEG"), - ("trigger.mode.edge.slope.either", self.write, "TRIG:EDGE:SLOPE EITH"), - ("trigger.mode.edge.slope.alternating", self.write, "TRIG:EDGE:SLOPE ALT"), - ("trigger.sweep.normal", self.write, "TRIG:SWE NORM"), - ("trigger.sweep.auto", self.write, "TRIG:SWE AUTO"), - ("trigger.coupling.ac", self.write, "TRIG:COUP AC"), - ("trigger.coupling.dc", self.write, "TRIG:COUP DC"), - ("trigger.coupling.lf_reject", self.write, "TRIG:COUP LFR"), - ("trigger.hf_reject", self.write, "TRIG:HFR {value}"), - ("acquire.normal", self.write, "ACQ:TYPE NORM"), - ("acquire.peak_detect", self.write, "ACQ:TYPE PEAK"), - ("acquire.averaging", self.write, "ACQ:TYPE AVER;:ACQ:COUN {value}"), - ("acquire.high_resolution", self.write, "ACQ:TYPE HRES"), - ("events.trigger", self.query_bool, ":TER?"), - # Measure - ( - "measure.delay._call", - self.query_after_acquire, - "MEAS:DEL? {self._store[source1]},{self._store[source2]}", - ), - ( - "measure.define.threshold.percent", - self.write, - "MEAS:DEF THR,PERC,{upper},{middle},{lower}", - ), - ( - "measure.define.threshold.absolute", - self.write, - "MEAS:DEF THR,ABS,{upper},{middle},{lower}", - ), - ("measure.delay.edges.rising.rising", self.write, "MEAS:DEF DEL, +1, +1"), - ("measure.delay.edges.rising.falling", self.write, "MEAS:DEF DEL, +1, -1"), - ("measure.delay.edges.falling.rising", self.write, "MEAS:DEF DEL, -1, +1"), - ("measure.delay.edges.falling.falling", self.write, "MEAS:DEF DEL, -1, -1"), - ( - "measure.phase", - self.query_after_acquire, - "MEAS:PHAS? {self._store[source1]},{self._store[source2]}", - ), - ( - "measure.vratio.cycle", - self.query_after_acquire, - "MEAS:VRAT? CYCL,{self._store[source1]},{self._store[source2]}", - ), - ( - "measure.vratio.display", - self.query_after_acquire, - "MEAS:VRAT? DISP,{self._store[source1]},{self._store[source2]}", - ), - # Ch1 Measure - ("measure.counter.ch1", self.query_after_acquire, "MEAS:COUN? CHAN1"), - ("measure.duty.ch1", self.query_after_acquire, "MEAS:DUTY? CHAN1"), - ("measure.fall_time.ch1", self.query_after_acquire, "MEAS:FALL? CHAN1"), - ("measure.rise_time.ch1", self.query_after_acquire, "MEAS:RIS? CHAN1"), - ("measure.frequency.ch1", self.query_after_acquire, "MEAS:FREQ? CHAN1"), - ( - "measure.cnt_edge_rising.ch1", - self.query_after_acquire, - "MEAS:NEDG? CHAN1", - ), - ( - "measure.cnt_edge_falling.ch1", - self.query_after_acquire, - "MEAS:PEDG? CHAN1", - ), - ( - "measure.cnt_pulse_positive.ch1", - self.query_after_acquire, - "MEAS:NPUL? CHAN1", - ), - ( - "measure.cnt_pulse_negative.ch1", - self.query_after_acquire, - "MEAS:PPUL? CHAN1", - ), - ("measure.period.ch1", self.query_after_acquire, "MEAS:PER? CHAN1"), - ("measure.pulse_width.ch1", self.query_after_acquire, "MEAS:PWID? CHAN1"), - ("measure.vamplitude.ch1", self.query_after_acquire, "MEAS:VAMP? CHAN1"), - ( - "measure.vaverage.cycle.ch1", - self.query_after_acquire, - "MEAS:VAV? CYCL,CHAN1", - ), - ( - "measure.vaverage.display.ch1", - self.query_after_acquire, - "MEAS:VAV? DISP,CHAN1", - ), - ("measure.vbase.ch1", self.query_after_acquire, "MEAS:VBAS? CHAN1"), - ("measure.vtop.ch1", self.query_after_acquire, "MEAS:VTOP? CHAN1"), - ("measure.vmax.ch1", self.query_after_acquire, "MEAS:VMAX? CHAN1"), - ("measure.vmin.ch1", self.query_after_acquire, "MEAS:VMIN? CHAN1"), - ("measure.vpp.ch1", self.query_after_acquire, "MEAS:VPP? CHAN1"), - ( - "measure.vrms.dc.cycle.ch1", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,CHAN1", - ), - ( - "measure.vrms.dc.display.ch1", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,CHAN1", - ), - ( - "measure.vrms.ac.cycle.ch1", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,CHAN1", - ), - ( - "measure.vrms.ac.display.ch1", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,CHAN1", - ), - ("measure.xmax.ch1", self.query_after_acquire, "MEAS:XMAX? CHAN1"), - ("measure.xmin.ch1", self.query_after_acquire, "MEAS:XMIN? CHAN1"), - # Ch2 Measure - ("measure.counter.ch2", self.query_after_acquire, "MEAS:COUN? CHAN2"), - ("measure.duty.ch2", self.query_after_acquire, "MEAS:DUTY? CHAN2"), - ("measure.rise_time.ch2", self.query_after_acquire, "MEAS:RIS? CHAN2"), - ("measure.fall_time.ch2", self.query_after_acquire, "MEAS:FALL? CHAN2"), - ("measure.frequency.ch2", self.query_after_acquire, "MEAS:FREQ? CHAN2"), - ( - "measure.cnt_edge_rising.ch2", - self.query_after_acquire, - "MEAS:NEDG? CHAN2", - ), - ( - "measure.cnt_edge_falling.ch2", - self.query_after_acquire, - "MEAS:PEDG? CHAN2", - ), - ( - "measure.cnt_pulse_positive.ch2", - self.query_after_acquire, - "MEAS:NPUL? CHAN2", - ), - ( - "measure.cnt_pulse_negative.ch2", - self.query_after_acquire, - "MEAS:PPUL? CHAN2", - ), - ("measure.period.ch2", self.query_after_acquire, "MEAS:PER? CHAN2"), - ("measure.pulse_width.ch2", self.query_after_acquire, "MEAS:PWID? CHAN2"), - ("measure.vamplitude.ch2", self.query_after_acquire, "MEAS:VAMP? CHAN2"), - ( - "measure.vaverage.cycle.ch2", - self.query_after_acquire, - "MEAS:VAV? CYCL,CHAN2", - ), - ( - "measure.vaverage.display.ch2", - self.query_after_acquire, - "MEAS:VAV? DISP,CHAN2", - ), - ("measure.vbase.ch2", self.query_after_acquire, "MEAS:VBAS? CHAN2"), - ("measure.vtop.ch2", self.query_after_acquire, "MEAS:VTOP? CHAN2"), - ("measure.vmax.ch2", self.query_after_acquire, "MEAS:VMAX? CHAN2"), - ("measure.vmin.ch2", self.query_after_acquire, "MEAS:VMIN? CHAN2"), - ("measure.vpp.ch2", self.query_after_acquire, "MEAS:VPP? CHAN2"), - ( - "measure.vrms.dc.cycle.ch2", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,CHAN2", - ), - ( - "measure.vrms.dc.display.ch2", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,CHAN2", - ), - ( - "measure.vrms.ac.cycle.ch2", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,CHAN2", - ), - ( - "measure.vrms.ac.display.ch2", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,CHAN2", - ), - ("measure.xmax.ch2", self.query_after_acquire, "MEAS:XMAX? CHAN2"), - ("measure.xmin.ch2", self.query_after_acquire, "MEAS:XMIN? CHAN2"), - # Ch3 Measure - ("measure.counter.ch3", self.query_after_acquire, "MEAS:COUN? CHAN3"), - ("measure.duty.ch3", self.query_after_acquire, "MEAS:DUTY? CHAN3"), - ("measure.fall_time.ch3", self.query_after_acquire, "MEAS:FALL? CHAN3"), - ("measure.rise_time.ch3", self.query_after_acquire, "MEAS:RIS? CHAN3"), - ("measure.frequency.ch3", self.query_after_acquire, "MEAS:FREQ? CHAN3"), - ( - "measure.cnt_edge_rising.ch3", - self.query_after_acquire, - "MEAS:NEDG? CHAN3", - ), - ( - "measure.cnt_edge_falling.ch3", - self.query_after_acquire, - "MEAS:PEDG? CHAN3", - ), - ( - "measure.cnt_pulse_positive.ch3", - self.query_after_acquire, - "MEAS:NPUL? CHAN3", - ), - ( - "measure.cnt_pulse_negative.ch3", - self.query_after_acquire, - "MEAS:PPUL? CHAN3", - ), - ("measure.period.ch3", self.query_after_acquire, "MEAS:PER? CHAN3"), - ("measure.pulse_width.ch3", self.query_after_acquire, "MEAS:PWID? CHAN3"), - ("measure.vamplitude.ch3", self.query_after_acquire, "MEAS:VAMP? CHAN3"), - ( - "measure.vaverage.cycle.ch3", - self.query_after_acquire, - "MEAS:VAV? CYCL,CHAN3", - ), - ( - "measure.vaverage.display.ch3", - self.query_after_acquire, - "MEAS:VAV? DISP,CHAN3", - ), - ("measure.vbase.ch3", self.query_after_acquire, "MEAS:VBAS? CHAN3"), - ("measure.vtop.ch3", self.query_after_acquire, "MEAS:VTOP? CHAN3"), - ("measure.vmax.ch3", self.query_after_acquire, "MEAS:VMAX? CHAN3"), - ("measure.vmin.ch3", self.query_after_acquire, "MEAS:VMIN? CHAN3"), - ("measure.vpp.ch3", self.query_after_acquire, "MEAS:VPP? CHAN3"), - ( - "measure.vrms.dc.cycle.ch3", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,CHAN3", - ), - ( - "measure.vrms.dc.display.ch3", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,CHAN3", - ), - ( - "measure.vrms.ac.cycle.ch3", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,CHAN3", - ), - ( - "measure.vrms.ac.display.ch3", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,CHAN3", - ), - ("measure.xmax.ch3", self.query_after_acquire, "MEAS:XMAX? CHAN3"), - ("measure.xmin.ch3", self.query_after_acquire, "MEAS:XMIN? CHAN3"), - # Ch4 Measure - ("measure.counter.ch4", self.query_after_acquire, "MEAS:COUN? CHAN4"), - ("measure.duty.ch4", self.query_after_acquire, "MEAS:DUTY? CHAN4"), - ("measure.fall_time.ch4", self.query_after_acquire, "MEAS:FALL? CHAN4"), - ("measure.rise_time.ch4", self.query_after_acquire, "MEAS:RIS? CHAN4"), - ("measure.frequency.ch4", self.query_after_acquire, "MEAS:FREQ? CHAN4"), - ( - "measure.cnt_edge_rising.ch4", - self.query_after_acquire, - "MEAS:NEDG? CHAN4", - ), - ( - "measure.cnt_edge_falling.ch4", - self.query_after_acquire, - "MEAS:PEDG? CHAN4", - ), - ( - "measure.cnt_pulse_positive.ch4", - self.query_after_acquire, - "MEAS:NPUL? CHAN4", - ), - ( - "measure.cnt_pulse_negative.ch4", - self.query_after_acquire, - "MEAS:PPUL? CHAN4", - ), - ("measure.period.ch4", self.query_after_acquire, "MEAS:PER? CHAN4"), - ("measure.pulse_width.ch4", self.query_after_acquire, "MEAS:PWID? CHAN4"), - ("measure.vamplitude.ch4", self.query_after_acquire, "MEAS:VAMP? CHAN4"), - ( - "measure.vaverage.cycle.ch4", - self.query_after_acquire, - "MEAS:VAV? CYCL,CHAN4", - ), - ( - "measure.vaverage.display.ch4", - self.query_after_acquire, - "MEAS:VAV? DISP,CHAN4", - ), - ("measure.vbase.ch4", self.query_after_acquire, "MEAS:VBAS? CHAN4"), - ("measure.vtop.ch4", self.query_after_acquire, "MEAS:VTOP? CHAN4"), - ("measure.vmax.ch4", self.query_after_acquire, "MEAS:VMAX? CHAN4"), - ("measure.vmin.ch4", self.query_after_acquire, "MEAS:VMIN? CHAN4"), - ("measure.vpp.ch4", self.query_after_acquire, "MEAS:VPP? CHAN4"), - ( - "measure.vrms.dc.cycle.ch4", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,CHAN4", - ), - ( - "measure.vrms.dc.display.ch4", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,CHAN4", - ), - ( - "measure.vrms.ac.cycle.ch4", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,CHAN4", - ), - ( - "measure.vrms.ac.display.ch4", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,CHAN4", - ), - ("measure.xmax.ch4", self.query_after_acquire, "MEAS:XMAX? CHAN4"), - ("measure.xmin.ch4", self.query_after_acquire, "MEAS:XMIN? CHAN4"), - # Function Measure - ("measure.duty.function", self.query_after_acquire, "MEAS:DUTY? FUNC"), - ("measure.fall_time.function", self.query_after_acquire, "MEAS:FALL? FUNC"), - ("measure.rise_time.function", self.query_after_acquire, "MEAS:RIS? FUNC"), - ("measure.frequency.function", self.query_after_acquire, "MEAS:FREQ? FUNC"), - ( - "measure.cnt_edge_rising.function", - self.query_after_acquire, - "MEAS:NEDG? FUNC", - ), - ( - "measure.cnt_edge_falling.function", - self.query_after_acquire, - "MEAS:PEDG? FUNC", - ), - ( - "measure.cnt_pulse_positive.function", - self.query_after_acquire, - "MEAS:NPUL? FUNC", - ), - ( - "measure.cnt_pulse_negative.function", - self.query_after_acquire, - "MEAS:PPUL? FUNC", - ), - ("measure.period.function", self.query_after_acquire, "MEAS:PER? FUNC"), - ( - "measure.pulse_width.function", - self.query_after_acquire, - "MEAS:PWID? FUNC", - ), - ( - "measure.vamplitude.function", - self.query_after_acquire, - "MEAS:VAMP? FUNC", - ), - ( - "measure.vaverage.cycle.function", - self.query_after_acquire, - "MEAS:VAV? CYCL,FUNC", - ), - ( - "measure.vaverage.display.function", - self.query_after_acquire, - "MEAS:VAV? DISP,FUNC", - ), - ("measure.vbase.function", self.query_after_acquire, "MEAS:VBAS? FUNC"), - ("measure.vtop.function", self.query_after_acquire, "MEAS:VTOP? FUNC"), - ("measure.vmax.function", self.query_after_acquire, "MEAS:VMAX? FUNC"), - ("measure.vmin.function", self.query_after_acquire, "MEAS:VMIN? FUNC"), - ("measure.vpp.function", self.query_after_acquire, "MEAS:VPP? FUNC"), - ( - "measure.vrms.dc.cycle.function", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,FUNC", - ), - ( - "measure.vrms.dc.display.function", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,FUNC", - ), - ( - "measure.vrms.ac.cycle.function", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,FUNC", - ), - ( - "measure.vrms.ac.display.function", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,FUNC", - ), - ("measure.xmax.function", self.query_after_acquire, "MEAS:XMAX? FUNC"), - ("measure.xmin.function", self.query_after_acquire, "MEAS:XMIN? FUNC"), - # MATH,Measure - ("measure.duty.math", self.query_after_acquire, "MEAS:DUTY? MATH"), - ("measure.fall_time.math", self.query_after_acquire, "MEAS:FALL? MATH"), - ("measure.rise_time.math", self.query_after_acquire, "MEAS:RIS? MATH"), - ("measure.frequency.math", self.query_after_acquire, "MEAS:FREQ? MATH"), - ( - "measure.cnt_edge_rising.math", - self.query_after_acquire, - "MEAS:NEDG? MATH", - ), - ( - "measure.cnt_edge_falling.math", - self.query_after_acquire, - "MEAS:PEDG? MATH", - ), - ( - "measure.cnt_pulse_positive.math", - self.query_after_acquire, - "MEAS:NPUL? MATH", - ), - ( - "measure.cnt_pulse_negative.math", - self.query_after_acquire, - "MEAS:PPUL? MATH", - ), - ("measure.period.math", self.query_after_acquire, "MEAS:PER? MATH"), - ("measure.pulse_width.math", self.query_after_acquire, "MEAS:PWID? MATH"), - ("measure.vamplitude.math", self.query_after_acquire, "MEAS:VAMP? MATH"), - ( - "measure.vaverage.cycle.math", - self.query_after_acquire, - "MEAS:VAV? CYCL,MATH", - ), - ( - "measure.vaverage.display.math", - self.query_after_acquire, - "MEAS:VAV? DISP,MATH", - ), - ("measure.vbase.math", self.query_after_acquire, "MEAS:VBAS? MATH"), - ("measure.vtop.math", self.query_after_acquire, "MEAS:VTOP? MATH"), - ("measure.vmax.math", self.query_after_acquire, "MEAS:VMAX? MATH"), - ("measure.vmin.math", self.query_after_acquire, "MEAS:VMIN? MATH"), - ("measure.vpp.math", self.query_after_acquire, "MEAS:VPP? MATH"), - ( - "measure.vrms.dc.cycle.math", - self.query_after_acquire, - "MEAS:VRMS? CYCL,DC,MATH", - ), - ( - "measure.vrms.dc.display.math", - self.query_after_acquire, - "MEAS:VRMS? DISP,DC,MATH", - ), - ( - "measure.vrms.ac.cycle.math", - self.query_after_acquire, - "MEAS:VRMS? CYCL,AC,MATH", - ), - ( - "measure.vrms.ac.display.math", - self.query_after_acquire, - "MEAS:VRMS? DISP,AC,MATH", - ), - ("measure.xmax.math", self.query_after_acquire, "MEAS:XMAX? MATH"), - ("measure.xmin.math", self.query_after_acquire, "MEAS:XMIN? MATH"), - ] - self.init_api() self.instrument.timeout = 1000 - def single(self): + def single(self) -> None: """ - Sets up the oscilliscope for a single shot triggered measurement - Does multiple steps to ensure that at the end of this call that the oscilloscope is primed for a trigger - 1. Sets mode to stop to ensure that only one measurement will occur - 2. Clears the status registers - 3. Sets the mode to single - 4. Monitors the registers until the trigger is armed - :return: + Specific implementation of the single trigger setup for Agilent MSO-X """ self._triggers_read = 0 self._raise_if_error() # Raises if any errors were made during setup @@ -941,13 +431,6 @@ def _raise_if_error(self): ) ) - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.instrument.close() - self.is_connected = False - def write(self, base_str, *args, **kwargs): formatted_string = self._format_string(base_str, **kwargs) self._write(formatted_string) diff --git a/src/fixate/drivers/dso/helper.py b/src/fixate/drivers/dso/helper.py index bda2fb78..71deebe8 100644 --- a/src/fixate/drivers/dso/helper.py +++ b/src/fixate/drivers/dso/helper.py @@ -1,829 +1,179 @@ import inspect from abc import ABCMeta, abstractmethod -from functools import update_wrapper -from fixate.core.exceptions import InstrumentFeatureUnavailable +from typing import Callable, Union -import typing +from fixate.core.exceptions import InstrumentError, InstrumentFeatureUnavailable -number = typing.Union[float, int] +number = Union[float, int] +""" +Callbacks +Write and query callbacks get passed down the tree of class variables. -class CallableNoArgs: - def __call__(self): - return self._call() +They are then used to facilitate a single point of communication between +the scope and the PC. This has the benefit of being able to mock the interface. +ie we can create a mock DSO that inherts the base and redefines functions. +""" +WriteCallback = Callable[[str], None] +QueryCallback = Callable[[str], str] - def _call(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) +class Channel: + """ + DSO Channel implementation + """ + + def __init__( + self, channel_name: str, write: WriteCallback, enabled: bool = True + ) -> None: + self._ch_name = channel_name + self._write = write + # Set enabled = False if the DSO does not have this channel + self._ch_cmd = f"CHAN{self._ch_name}" + self._enabled = enabled + # Maybe some fancy thing here to set all methods to raise an error if not enabled. -class CallableBool: def __call__(self, value: bool): - return self._call(value) - - def _call(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class SourcesCh: - def ch1(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def ch2(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def ch3(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def ch4(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class SourcesSpecial: - def function(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def math(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class SourcesWMem: - def wmemory1(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def wmemory2(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class SourcesExt: - def external(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class SourcesDig: - def d0(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d1(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d2(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d3(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d4(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d5(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d6(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d7(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d8(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d9(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d10(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d11(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d12(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d13(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d14(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def d15(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class MeasureAllSources( - SourcesCh, SourcesSpecial, SourcesWMem, SourcesDig, CallableNoArgs -): - pass - - -class TrigSources(SourcesCh, SourcesExt, SourcesDig): - pass - - -class MultiMeasureSources(MeasureAllSources): - def __init__(self): - self.ch1 = MeasureAllSources() - self.ch1 = MeasureAllSources() - self.ch2 = MeasureAllSources() - self.ch3 = MeasureAllSources() - self.ch4 = MeasureAllSources() - self.function = MeasureAllSources() - self.math = MeasureAllSources() - self.wmemory1 = MeasureAllSources() - self.wmemory2 = MeasureAllSources() - self.external = MeasureAllSources() - self.d0 = MeasureAllSources() - self.d1 = MeasureAllSources() - self.d2 = MeasureAllSources() - self.d3 = MeasureAllSources() - self.d4 = MeasureAllSources() - self.d5 = MeasureAllSources() - self.d6 = MeasureAllSources() - self.d7 = MeasureAllSources() - self.d8 = MeasureAllSources() - self.d9 = MeasureAllSources() - self.d10 = MeasureAllSources() - self.d11 = MeasureAllSources() - self.d12 = MeasureAllSources() - self.d13 = MeasureAllSources() - self.d14 = MeasureAllSources() - self.d15 = MeasureAllSources() + self._write(f"CHAN1:DISP {int(value)}") + + def scale(self, value: number) -> None: + self._write(f"{self._ch_cmd}:SCAL {value}") + + def offset(self, value: number) -> None: + self._write(f"{self._ch_cmd}:OFFS {value}") + + def bandwidth(self, value: number) -> None: + raise InstrumentError("This function is not implemented") + + def bandwidth_limit(self, value: bool) -> None: + raise InstrumentError("This function is not implemented") + + def impedance(self, value: number) -> None: + raise InstrumentError("This function is not implemented") + + def invert(self, value: bool) -> None: + raise InstrumentError("This function is not implemented") class Coupling: - def ac(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def dc(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def lf_reject(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def tv(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class Probe: - def attenuation(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class VerticalUnits: - def volts(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def amps(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class ChannelBase(CallableBool): - def __init__(self, channel_name: str): - self._ch_name = channel_name - # self.waveform = Waveform() - # self.modulate = Modulate() - # self.burst = Burst() - # self.load = Load() - self.coupling = Coupling() - self.probe = Probe() - self.units = VerticalUnits() - - def bandwidth(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def bandwidth_limit(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def impedance(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def invert(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def offset(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def scale(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) + """ + Defines the coupling interface + """ + + def __init__(self, write: WriteCallback, cmd_prefix: str): + self._write = write + self._cmd_prefix = cmd_prefix + + +class TriggerCoupling(Coupling): + """ + Extends the Coupling interface with extra functions that the trigger has + """ + + def extra_implementation_stuff(self): + pass + + +class TriggerSweep: + """ + Define the sweep interface on the trigger + """ + + def __init__(self, write: WriteCallback): + self._write = write + + +class TriggerMode: + """ + Define the trigger mode interface + """ + + def __init__(self, write: WriteCallback): + self._write = write class Trigger: - def __init__(self): - self.mode = TrigMode() - self.delay = None - self.eburst = None - self.coupling = Coupling() - self.sweep = TrigSweep() - - def force(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def hf_reject(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def hold_off(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def n_reject(self, value: bool): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class TrigSweep: - def auto(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def normal(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class TrigLevel: - def high(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def low(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class TrigMode: - def __init__(self): - self.edge = TrigEdge() - - -class TrigEdge(CallableNoArgs): - def __init__(self): - self.source = TrigSources() - self.slope = Slopes() - - def level(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class TrigReject: - def off(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def lf(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def hf(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class Slopes: - def rising(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def falling(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def alternating(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def either(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class Acquire: - def __init__(self): - self.mode = AcquireMode() - - def normal(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def peak_detect(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def averaging(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def high_resolution(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class AcquireMode: - def rtim(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def segm(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class Timebase: - def __init__(self): - self.mode = TimebaseMode() - - def position(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def scale(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class TimebaseMode: - def main(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def window(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def xy(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def roll(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class Events: - def trigger(self): - """ - Indicates if a trigger event has occurred. - Calls to this will clear the existing trigger events - :return: - """ - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class MeasureInterval: - def __init__(self): - self.cycle = MeasureAllSources() - self.display = MeasureAllSources() - - -class MeasureIntervalMultipleSources: - def __init__(self): - self.cycle = MultiMeasureSources() - self.display = MultiMeasureSources() - - -class MeasureRMS: - def __init__(self): - self.dc = MeasureInterval() - self.ac = MeasureInterval() - - -class Threshold: - def percent(self, upper: number, middle: number, lower: number): - """ - :param upper: Upper Threshold - :param middle: Middle Threshold - :param lower: Lower Threshold - :return: - """ - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def absolute(self, upper: number, middle: number, lower: number): - """ - :param upper: Upper Threshold - :param middle: Middle Threshold - :param lower: Lower Threshold - :return: - """ - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - -class Define: - def __init__(self): - super().__init__() - self.threshold = Threshold() - - -class Delay(CallableNoArgs): - def __init__(self): - super().__init__() - self.edges = MultiSlopes() - - -class Measure: - def __init__(self): - self.counter = MeasureAllSources() - self.define = Define() - self.delay = Delay() - self.duty = MeasureAllSources() - self.fall_time = MeasureAllSources() - self.rise_time = MeasureAllSources() - self.frequency = MeasureAllSources() - self.cnt_edge_rising = MeasureAllSources() - self.cnt_edge_falling = MeasureAllSources() - self.cnt_pulse_positive = MeasureAllSources() - self.cnt_pulse_negative = MeasureAllSources() - self.period = MeasureAllSources() - self.phase = MultiMeasureSources() - self.pulse_width = MeasureAllSources() - self.vamplitude = MeasureAllSources() - self.vaverage = MeasureInterval() - self.vbase = MeasureAllSources() - self.vtop = MeasureAllSources() - self.vmax = MeasureAllSources() - self.vmin = MeasureAllSources() - self.vpp = MeasureAllSources() - self.vratio = MeasureIntervalMultipleSources() - self.vrms = MeasureRMS() - self.xmax = MeasureAllSources() - self.xmin = MeasureAllSources() - - -class MultiSlopes(CallableNoArgs): - def __init__(self): - super().__init__() - self.rising = Slopes() - self.falling = Slopes() - self.alternating = Slopes() - self.either = Slopes() + """ + Trigger base class + """ + + def __init__(self, write: WriteCallback): + self._write = write + self.mode = TriggerMode(write) + self.coupling = TriggerCoupling(write, "TRIG") + self.sweep = TriggerSweep(write) class DSO(metaclass=ABCMeta): - REGEX_ID = "DSO" + """ + DSO Base class. + You don't know what version of the scope you will get until run time. + The regex id is going to be the defining factor as to how many channels the DSO has. + + We can assume for the helper class however that all the functionality is available. + On runtime, the correct instrument is then selected and will error if its not implemented. + """ + + REGEX_ID: str = "DSO" + INSTR_TYPE: str = "VISA" def __init__(self, instrument): self.instrument = instrument self.samples = 1 - self.api = [] - self.ch1 = ChannelBase("1") - self.ch2 = ChannelBase("2") - self.ch3 = ChannelBase("3") - self.ch4 = ChannelBase("4") - self.chmath = ChannelBase("math") - self.chfunc = ChannelBase("func") - self.coupling = Coupling() - self.probe = Probe() - self.source1 = MultiMeasureSources() - self.source2 = MultiMeasureSources() - self.trigger = Trigger() - self.time_base = Timebase() - self.acquire = Acquire() - self.measure = Measure() - self.events = Events() - @abstractmethod - def acquire(self, acquire_type, averaging_samples): - pass + # Callbacks: + _write = self._write_cmd + _query_after_acquire = self._query_after_acquire_cmd + _query_bool = self._query_bool_cmd - @abstractmethod - def waveform_values(self, source, filename): - pass + # Retain for compatibility: + self._store: dict = {} - @abstractmethod - def reset(self): - pass + # ------------------------- + # Construct the API: + # ------------------------- + self.ch1 = Channel("1", _write) + self.ch2 = Channel("2", _write) + self.ch3 = Channel("3", _write) + self.ch4 = Channel("4", _write) - @abstractmethod - def auto_scale(self): - pass + self.trigger = Trigger(_write) + # ... More stuff goes here. This will copy the original structure of the api dictionary @abstractmethod - def save_setup(self, file_name): - pass + def wait_for_acquire(self) -> None: + """Block until the current acquisition is complete.""" + + # Query and write functions: + def _write_cmd(self, cmd: str) -> None: + """Write a SCPI command and raise on instrument error.""" + self.instrument.write(cmd) + self._raise_if_error() + + def _query_bool_cmd(self, cmd: str) -> bool: + values = self.instrument.query_ascii_values(cmd) + self._raise_if_error() + return bool(values[0]) + + def _query_after_acquire_cmd(self, cmd: str) -> float: + """Wait for a complete acquisition then query.""" + self.wait_for_acquire() + try: + result = self.instrument.query_ascii_values(cmd)[0] + except Exception: + self.instrument.close() + self.instrument.open() + raise + return result @abstractmethod - def load_setup(self, file_name): - pass + def _raise_if_error(self) -> None: + """Clean error queue and raise InstrumentError if any exist.""" - @abstractmethod - def get_identity(self): - pass + # Context management: + def __enter__(self): + return self - def run(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def single(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def stop(self): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def scale(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def offset(self, value: number): - raise InstrumentFeatureUnavailable( - "{} not available on this device".format( - inspect.currentframe().f_code.co_name - ) - ) - - def init_api(self): - for func_str, handler, base_str in self.api: - *parents, func = func_str.split(".") - parent_obj = self - for parent in parents: - parent_obj = getattr(parent_obj, parent) - func_obc = getattr(parent_obj, func) - setattr(parent_obj, func, self.prepare_string(func_obc, handler, base_str)) - - def prepare_string(self, func, handler, base_str, *args, **kwargs): - def temp_func(*nargs, **nkwargs): - """ - Only formats using **nkwargs - New Temp - :param nargs: - :param nkwargs: - :return: - """ - sig = inspect.signature(func) - keys = [itm[0] for itm in sig.parameters.items()] - for index, param in enumerate(nargs): - nkwargs[keys[index]] = param - # new_str = base_str.format(**nkwargs) - # handler(self, new_str) - return handler(base_str, **nkwargs) - - return update_wrapper(temp_func, func) + def __exit__(self, exc_type, exc_val, exc_tb): + self.instrument.close() + self.is_connected = False From 318852c253d7a8576b3d42053522a769c0842659 Mon Sep 17 00:00:00 2001 From: Jasper-Harvey0 Date: Fri, 13 Mar 2026 11:15:06 +1100 Subject: [PATCH 2/6] What to do if you have a 2 channel scope. --- src/fixate/drivers/dso/agilent_mso_x.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/fixate/drivers/dso/agilent_mso_x.py b/src/fixate/drivers/dso/agilent_mso_x.py index 0d929846..7564c407 100644 --- a/src/fixate/drivers/dso/agilent_mso_x.py +++ b/src/fixate/drivers/dso/agilent_mso_x.py @@ -9,7 +9,7 @@ # KEYSIGHT TECHNOLOGIES,DSO-X 1102G,CN57096441,01.20.2019061038 # AGILENT TECHNOLOGIES,MSO-X 3014A,MY51360314,02.43.2018020635 class MSO_X_3000(DSO): - # Regex needs work to detect only the 4 channel scopes we have here: + # Regex needs work to detect only the 4 channel scopes we have: REGEX_ID = "(KEYSIGHT|AGILENT) TECHNOLOGIES,[DM]SO-?X" INSTR_TYPE = "VISA" retrys_on_timeout = 1 @@ -26,6 +26,11 @@ def __init__(self, instrument): self.instrument.query_delay = 0.2 self.instrument.timeout = 1000 + # If, for example this does not have 4 channels, we can do: + # We would actually call some function like not_availalbe() that would raise a better error + self.ch3 = lambda: 1 / 0 + self.ch4 = lambda: 1 / 0 + def single(self) -> None: """ Specific implementation of the single trigger setup for Agilent MSO-X From 7539e900b6d1c4532cb49512e2bbb8fdb31a13c4 Mon Sep 17 00:00:00 2001 From: Jasper-Harvey0 Date: Fri, 13 Mar 2026 11:16:31 +1100 Subject: [PATCH 3/6] Fix import --- src/fixate/drivers/dso/agilent_mso_x.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fixate/drivers/dso/agilent_mso_x.py b/src/fixate/drivers/dso/agilent_mso_x.py index 7564c407..9e89654f 100644 --- a/src/fixate/drivers/dso/agilent_mso_x.py +++ b/src/fixate/drivers/dso/agilent_mso_x.py @@ -1,6 +1,6 @@ import pyvisa from fixate.core.exceptions import InstrumentError -from fixate.drivers.dso.helper2 import DSO +from fixate.drivers.dso.helper import DSO import time From 6526af64a8f2242021a15d54b388c669bd221b6c Mon Sep 17 00:00:00 2001 From: Jasper-Harvey0 Date: Mon, 16 Mar 2026 13:09:54 +1100 Subject: [PATCH 4/6] Fleshing out the API classes --- src/fixate/drivers/dso/helper.py | 206 +++++++++++++++++++++++++++---- 1 file changed, 184 insertions(+), 22 deletions(-) diff --git a/src/fixate/drivers/dso/helper.py b/src/fixate/drivers/dso/helper.py index 71deebe8..03cb87b4 100644 --- a/src/fixate/drivers/dso/helper.py +++ b/src/fixate/drivers/dso/helper.py @@ -1,3 +1,4 @@ +from _typeshed import SupportsDunderGE import inspect from abc import ABCMeta, abstractmethod from typing import Callable, Union @@ -17,6 +18,48 @@ WriteCallback = Callable[[str], None] QueryCallback = Callable[[str], str] +# Prompt that a featrue is not available: +def unavailable(name: str | None = None): + label = name or inspect.stack()[1].function + return InstrumentFeatureUnavailable(f"{label} not available on this device") + + +class SourceSelector: + """ + Retain compatibility of source selection from the old API + dso.source1.ch1() for example will set the source1.source = "CHAN1" + This is then queried by the two source measurement functions like 'delay' + You must set the sources before calling a multi measurement function. + """ + + def __init__(self): + # Selected source (CHAN1, CHAN2, ... ,FUNC, MATH, WMEM1, WMEM2 + self.source: str | None = None + + def ch1(self) -> None: + self.source = "CHAN1" + + def ch2(self) -> None: + self.source = "CHAN2" + + def ch3(self) -> None: + self.source = "CHAN3" + + def ch4(self) -> None: + self.source = "CHAN4" + + def function(self) -> None: + self.source = "FUNC" + + def math(self) -> None: + self.source = "MATH" + + def wmemory1(self) -> None: + self.source = "WMEM1" + + def wmemory2(self) -> None: + self.source = "WMEM2" + class Channel: """ @@ -26,33 +69,34 @@ class Channel: def __init__( self, channel_name: str, write: WriteCallback, enabled: bool = True ) -> None: + # Set enabled to false to note a device does not have this channel + self.enabled = enabled self._ch_name = channel_name self._write = write - # Set enabled = False if the DSO does not have this channel self._ch_cmd = f"CHAN{self._ch_name}" - self._enabled = enabled - # Maybe some fancy thing here to set all methods to raise an error if not enabled. + + self.coupling = Coupling(write, self._ch_cmd) + self.probe = Probe(write, self._ch_cmd) + + def _check_en(self): + if not self.enabled: + raise unavailable(f"{self._ch_cmd} is not available on this device") def __call__(self, value: bool): + self._check_en() self._write(f"CHAN1:DISP {int(value)}") def scale(self, value: number) -> None: + self._check_en() self._write(f"{self._ch_cmd}:SCAL {value}") def offset(self, value: number) -> None: + self._check_en() self._write(f"{self._ch_cmd}:OFFS {value}") - def bandwidth(self, value: number) -> None: - raise InstrumentError("This function is not implemented") - - def bandwidth_limit(self, value: bool) -> None: - raise InstrumentError("This function is not implemented") - - def impedance(self, value: number) -> None: - raise InstrumentError("This function is not implemented") - def invert(self, value: bool) -> None: - raise InstrumentError("This function is not implemented") + self._check_en() + self._write(f"{self._ch_cmd}:INV {int(value)}") class Coupling: @@ -64,24 +108,54 @@ def __init__(self, write: WriteCallback, cmd_prefix: str): self._write = write self._cmd_prefix = cmd_prefix + def ac(self) -> None: + # AC Coupling + self._write(f"{self._cmd_prefix}:COUP AC") + + def dc(self) -> None: + # DC Coupling + self._write(f"{self._cmd_prefix}:COUP DC") + + +class Probe: + def __init__(self, write: WriteCallback, cmd_prefix: str): + self._write = write + self._cmd_prefix = cmd_prefix + + def attenuation(self, attenuation: number) -> None: + self._write(f"{self._cmd_prefix}:PROB {attenuation}") + + +class TimeBase: + def __init__(self, write: WriteCallback): + self._write = write + + def scale(self, value: number) -> None: + self._write(f"TIM:SCAL {value}") + + def position(self, value: number) -> None: + self._write(f"TIM:POS {value}") + class TriggerCoupling(Coupling): """ Extends the Coupling interface with extra functions that the trigger has """ - def extra_implementation_stuff(self): - pass + def lf_reject(self) -> None: + self._write(f"{self._cmd_prefix}:COUP LFR") class TriggerSweep: - """ - Define the sweep interface on the trigger - """ - - def __init__(self, write: WriteCallback): + def __init__(self, write: WriteCallback) -> None: self._write = write + def auto(self) -> None: + self._write("TRIG:SWE AUTO") + + def normal(self) -> None: + self._write("TRIG:SWE NORM") + class TriggerMode: """ @@ -89,8 +163,55 @@ class TriggerMode: """ def __init__(self, write: WriteCallback): + self.edge = TriggerEdge(write) + + +class TriggerSlopes: + def __init__(self, write: WriteCallback) -> None: self._write = write + def rising(self) -> None: + self._write("TRIG:EDGE:SLOPE POS") + + def falling(self) -> None: + self._write("TRIG:EDGE:SLOPE NEG") + + def either(self) -> None: + self._write("TRIG:EDGE:SLOPE EITH") + + def alternating(self) -> None: + self._write("TRIG:EDGE:SLOPE ALT") + + +class TriggerSources: + def __init__(self, write: WriteCallback) -> None: + self._write = write + + def ch1(self) -> None: + self._write("TRIG:EDGE:SOUR CHAN1") + + def ch2(self) -> None: + self._write("TRIG:EDGE:SOUR CHAN2") + + def ch3(self) -> None: + self._write("TRIG:EDGE:SOUR CHAN3") + + def ch4(self) -> None: + self._write("TRIG:EDGE:SOUR CHAN4") + + +class TriggerEdge: + def __init__(self, write: WriteCallback) -> None: + self._write = write + self.source = TriggerSources(write) + self.slope = TriggerSlopes(write) + + def __call__(self) -> None: + self._write("TRIG:MODE EDGE") + + def level(self, value: number) -> None: + self._write(f"TRIG:EDGE:LEVEL {value}") + class Trigger: """ @@ -99,9 +220,44 @@ class Trigger: def __init__(self, write: WriteCallback): self._write = write + self.mode = TriggerMode(write) - self.coupling = TriggerCoupling(write, "TRIG") self.sweep = TriggerSweep(write) + self.coupling = TriggerCoupling(write, "TRIG") + + def hf_reject(self, value: bool) -> None: + self._write(f"TRIG:HFR {int(value)}") + + +class Acquire: + def __init__(self, write: WriteCallback): + self._write = write + + def normal(self) -> None: + self._write(f"ACQ:TYPE NORM") + + def peak_detect(self) -> None: + self._write(f"ACQ:TYPE PEAK") + + def averaging(self, value) -> None: + self._write(f"ACQ:TYPE AVER;:ACQ:COUN {value}") + + def high_resolution(self) -> None: + self._write(f"ACQ:TYPE HRES") + + +class Measurement: + """ + Single channel measurement class + """ + + +class MultiMeasurement: + """ + Mulitple source measurement class + """ + + # Need a way to mimmic the _store dict. Maybe just easiest to use the same method of a dict class DSO(metaclass=ABCMeta): @@ -132,13 +288,19 @@ def __init__(self, instrument): # ------------------------- # Construct the API: # ------------------------- + self.source1 = SourceSelector() + self.source2 = SourceSelector() + self.ch1 = Channel("1", _write) self.ch2 = Channel("2", _write) self.ch3 = Channel("3", _write) self.ch4 = Channel("4", _write) + self.time_base = TimeBase(_write) + self.trigger = Trigger(_write) - # ... More stuff goes here. This will copy the original structure of the api dictionary + + self.acquire = Acquire(_write) @abstractmethod def wait_for_acquire(self) -> None: From 9953ebb0d9aaae6e8846b6e2abdbe3c21fd9b5c7 Mon Sep 17 00:00:00 2001 From: Jasper-Harvey0 Date: Mon, 16 Mar 2026 13:12:44 +1100 Subject: [PATCH 5/6] Remove stray import --- src/fixate/drivers/dso/helper.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/fixate/drivers/dso/helper.py b/src/fixate/drivers/dso/helper.py index 03cb87b4..a0297015 100644 --- a/src/fixate/drivers/dso/helper.py +++ b/src/fixate/drivers/dso/helper.py @@ -1,4 +1,3 @@ -from _typeshed import SupportsDunderGE import inspect from abc import ABCMeta, abstractmethod from typing import Callable, Union From 75b6958c909c4fe03a8457538da06a731f93e003 Mon Sep 17 00:00:00 2001 From: Jasper-Harvey0 Date: Tue, 17 Mar 2026 15:07:15 +1100 Subject: [PATCH 6/6] More building of the API --- src/fixate/drivers/dso/agilent_mso_x.py | 116 +--------- src/fixate/drivers/dso/helper.py | 283 +++++++++++++++++++----- 2 files changed, 234 insertions(+), 165 deletions(-) diff --git a/src/fixate/drivers/dso/agilent_mso_x.py b/src/fixate/drivers/dso/agilent_mso_x.py index 9e89654f..ea53d09f 100644 --- a/src/fixate/drivers/dso/agilent_mso_x.py +++ b/src/fixate/drivers/dso/agilent_mso_x.py @@ -26,11 +26,6 @@ def __init__(self, instrument): self.instrument.query_delay = 0.2 self.instrument.timeout = 1000 - # If, for example this does not have 4 channels, we can do: - # We would actually call some function like not_availalbe() that would raise a better error - self.ch3 = lambda: 1 / 0 - self.ch4 = lambda: 1 / 0 - def single(self) -> None: """ Specific implementation of the single trigger setup for Agilent MSO-X @@ -77,9 +72,6 @@ def stop(self): self._mode = "STOP" self._wave_acquired = False - def _write(self, value): - self.instrument.write(value) - def acquire(self, acquire_type="normal", averaging_samples=0): """ :param channel @@ -107,7 +99,7 @@ def acquire(self, acquire_type="normal", averaging_samples=0): raise ValueError("Invalid acquire type {}".format(acquire_type)) def waveform_preamble(self): - values = self.query_ascii_values(":WAV:PRE?") + values = self.instrument.query_ascii_values(":WAV:PRE?") wav_form_dict = {"0": "BYTE", "1": "WORD", "4": "ASCii"} acq_type_dict = { "0": "NORMAL", @@ -197,7 +189,7 @@ def waveform_values(self, signal, file_name="", file_type="csv"): self.instrument.write(":WAVeform:POINts:MODE " + points_mode) # Check if there is actually data to acquire: - data_available = int(self.query(":WAVeform:POINTs?")) + data_available = int(self.instrument.query(":WAVeform:POINTs?")) if data_available == 0: # No data is available # Setting a channel to be a waveform source turns it on, so we need to turn it off now: @@ -258,27 +250,6 @@ def waveform_values(self, signal, file_name="", file_type="csv"): raise NotImplementedError("Binary Output not implemented") return x, y - def digitize(self, signals): - signals = [self.validate_signal(sig) for sig in signals] - self.write(":DIG {}".format(",".join(signals))) - return signals - - def validate_signal(self, signal): - """ - :param signal: String ie. "1", "2", "3", "4", "func", "math" - :return: - """ - try: - if not (1 <= int(signal) <= 4): - raise ValueError("Invalid source channel {}".format(signal)) - else: - signal = "CHAN{}".format(int(signal)) - except ValueError: - if signal.lower() not in ["func", "math"]: - raise ValueError("Invalid source channel {}".format(signal)) - signal = signal.lower() - return signal - def reset(self): self.instrument.write("*CLS;*RST;:STOP") time.sleep(0.15) @@ -291,7 +262,7 @@ def save_setup(self, file_name): self.instrument.timeout = 5000 try: with open(file_name, "w") as f: - setup = self.query(":SYSTem:SETup?") + setup = self.instrument.query(":SYSTem:SETup?") f.write(setup) finally: self.instrument.timeout = 1000 @@ -305,43 +276,6 @@ def load_setup(self, file_name): finally: self.instrument.timeout = 1000 - def query(self, value): - try: - response = self.instrument.query(value) - finally: - self._raise_if_error() - return response - - def query_bool(self, value): - return bool(self.query_ascii_value(value)) - - def query_binary_values(self, value): - response = self.instrument.query_binary_values(value) - self._raise_if_error() - return response - - def query_ascii_values(self, value): - response = self.instrument.query_ascii_values(value) - self._raise_if_error() - return response - - def query_ascii_value(self, value): - return self.query_ascii_values(value)[0] - - def query_value(self, base_str, *args, **kwargs): - formatted_string = self._format_string(base_str, **kwargs) - return self.query_ascii_value(formatted_string) - - def query_after_acquire(self, base_str, *args, **kwargs): - self.wait_for_acquire() - try: - formatted_string = self._format_string(base_str, **kwargs) - return self.instrument.query_ascii_values(formatted_string)[0] - except: - self.instrument.close() - self.instrument.open() - raise - def wait_for_trigger(self, timeout): """ Waits for trigger for a set amount of time. @@ -407,12 +341,7 @@ def wait_for_acquire(self): "Cannot acquire waveform in this mode: {}".format(self._mode) ) - def read_raw(self): - data = self.instrument.read_raw() - self._raise_if_error() - return data - - def _check_errors(self): + def _check_errors(self) -> tuple[int, str]: time.sleep(0.1) resp = self.instrument.query("SYST:ERR?") code, msg = resp.strip("\n").split(",") @@ -420,22 +349,6 @@ def _check_errors(self): msg = msg.strip('"') return code, msg - def _raise_if_error(self): - errors = [] - while True: - code, msg = self._check_errors() - if code != 0: - errors.append((code, msg)) - else: - break - if errors: - raise InstrumentError( - "Error(s) Returned from DSO\n" - + "\n".join( - ["Code: {}\nMessage:{}".format(code, msg) for code, msg in errors] - ) - ) - def write(self, base_str, *args, **kwargs): formatted_string = self._format_string(base_str, **kwargs) self._write(formatted_string) @@ -450,27 +363,6 @@ def _format_string(self, base_str, **kwargs): prev_string = cur_string return cur_string - def store(self, store_dict, *args, **kwargs): - """ - Store a dictionary of values in TestClass - :param kwargs: - Dictionary containing the parameters to store - :return: - """ - new_dict = store_dict.copy() - for k, v in store_dict.items(): - # I want the same function from write to set up the string before putting it in new_dict - try: - new_dict[k] = v.format(**kwargs) - except: - pass - self._store.update(new_dict) - - def store_and_write(self, params, *args, **kwargs): - base_str, store_dict = params - self.store(store_dict) - self.write(base_str, *args, **kwargs) - def get_identity(self) -> str: """ :return: AGILENT TECHNOLOGIES,,,X.XX.XX diff --git a/src/fixate/drivers/dso/helper.py b/src/fixate/drivers/dso/helper.py index a0297015..c0a5f71a 100644 --- a/src/fixate/drivers/dso/helper.py +++ b/src/fixate/drivers/dso/helper.py @@ -1,8 +1,8 @@ import inspect from abc import ABCMeta, abstractmethod -from typing import Callable, Union +from typing import Callable, Literal, Union -from fixate.core.exceptions import InstrumentError, InstrumentFeatureUnavailable +from fixate.core.exceptions import InstrumentFeatureUnavailable, InstrumentError number = Union[float, int] @@ -15,7 +15,7 @@ ie we can create a mock DSO that inherts the base and redefines functions. """ WriteCallback = Callable[[str], None] -QueryCallback = Callable[[str], str] +QueryAsciiValuesCallback = Callable[[str], float] # Prompt that a featrue is not available: def unavailable(name: str | None = None): @@ -23,43 +23,6 @@ def unavailable(name: str | None = None): return InstrumentFeatureUnavailable(f"{label} not available on this device") -class SourceSelector: - """ - Retain compatibility of source selection from the old API - dso.source1.ch1() for example will set the source1.source = "CHAN1" - This is then queried by the two source measurement functions like 'delay' - You must set the sources before calling a multi measurement function. - """ - - def __init__(self): - # Selected source (CHAN1, CHAN2, ... ,FUNC, MATH, WMEM1, WMEM2 - self.source: str | None = None - - def ch1(self) -> None: - self.source = "CHAN1" - - def ch2(self) -> None: - self.source = "CHAN2" - - def ch3(self) -> None: - self.source = "CHAN3" - - def ch4(self) -> None: - self.source = "CHAN4" - - def function(self) -> None: - self.source = "FUNC" - - def math(self) -> None: - self.source = "MATH" - - def wmemory1(self) -> None: - self.source = "WMEM1" - - def wmemory2(self) -> None: - self.source = "WMEM2" - - class Channel: """ DSO Channel implementation @@ -245,18 +208,194 @@ def high_resolution(self) -> None: self._write(f"ACQ:TYPE HRES") -class Measurement: - """ - Single channel measurement class - """ +# Yes this sucks... +class MeasurementBase: + def __init__(self, query: QueryAsciiValuesCallback, command: str): + self._query = query + self._command = command + def ch1(self): + return self._query(self._command + " CHAN1") -class MultiMeasurement: + def ch2(self): + return self._query(self._command + " CHAN2") + + def ch3(self): + return self._query(self._command + " CHAN3") + + def ch4(self): + return self._query(self._command + " CHAN4") + + def function(self): + return self._query(self._command + " FUNC") + + def math(self): + return self._query(self._command + " MATH") + + +class VAverageMeasurement: + def __init__(self, query: QueryAsciiValuesCallback) -> None: + self.cycle = MeasurementBase(query, "MEAS:VAV? CYCL,") + self.display = MeasurementBase(query, "MEAS:VAV? DISP,") + + +class VRmsModesMeasurement: + def __init__(self, query: QueryAsciiValuesCallback, mode: str) -> None: + self.cycle = MeasurementBase(query, f"MEAS:VRMS? CYCL,{mode},") + self.display = MeasurementBase(query, f"MEAS:VRMS? DISP,{mode},") + + +class VRmsMeasurement: + def __init__(self, query: QueryAsciiValuesCallback) -> None: + self.dc = VRmsModesMeasurement(query, "DC") + self.ac = VRmsModesMeasurement(query, "AC") + + +class SourceSelector: """ - Mulitple source measurement class + Retain compatibility of source selection from the old API + dso.source1.ch1() for example will set the source1.source = "CHAN1" + This is then queried by the two source measurement functions like 'delay' + You must set the sources before calling a multi measurement function. """ - # Need a way to mimmic the _store dict. Maybe just easiest to use the same method of a dict + def __init__(self, store: dict, source: Literal["source1"] | Literal["source2"]): + self._store = store + # Name of the source: + self.source = source + + def ch1(self) -> None: + self._store[self.source] = "CHAN1" + + def ch2(self) -> None: + self._store[self.source] = "CHAN2" + + def ch3(self) -> None: + self._store[self.source] = "CHAN3" + + def ch4(self) -> None: + self._store[self.source] = "CHAN4" + + def function(self) -> None: + self._store[self.source] = "FUNC" + + def math(self) -> None: + self._store[self.source] = "MATH" + + def wmemory1(self) -> None: + self._store[self.source] = "WMEM1" + + def wmemory2(self) -> None: + self._store[self.source] = "WMEM2" + + +class DefineThreshold: + def __init__(self, write: WriteCallback) -> None: + self._write = write + + def percent(self, upper, middle, lower) -> None: + self._write(f"MEAS:DEF THR,PERC,{upper},{middle},{lower}") + + def absolute(self, upper, middle, lower) -> None: + self._write(f"MEAS:DEF THR,ABS,{upper},{middle},{lower}") + + +class Define: + def __init__(self, write: WriteCallback) -> None: + self.threshold = DefineThreshold(write) + + +class DelayEdges: + def __init__(self, write: WriteCallback) -> None: + self._write = write + + def rising_rising(self) -> None: + self._write("MEAS:DEF DEL, +1, +1") + + def rising_falling(self) -> None: + self._write("MEAS:DEF DEL, +1, -1") + + def falling_rising(self) -> None: + self._write("MEAS:DEF DEL, -1, +1") + + def falling_falling(self) -> None: + self._write("MEAS:DEF DEL, -1, -1") + + +class DelayMeasurement: + def __init__( + self, write: WriteCallback, query: QueryAsciiValuesCallback, store: dict + ) -> None: + self._write = write + self._query = query + self._store = store + self.edges = DelayEdges(write) + + def __call__(self) -> float: + return self._query( + f"MEAS:DEL? {self._store['source1']},{self._store['source2']}" + ) + + +class VRatioMeasurement: + def __init__(self, query: QueryAsciiValuesCallback, store: dict) -> None: + self._query = query + self._store = store + + def cycle(self) -> float: + return self._query( + f"MEAS:VRAT? CYCL,{self._store['source1']},{self._store['source2']}" + ) + + def display(self) -> float: + return self._query( + f"MEAS:VRAT? DISP,{self._store['source1']},{self._store['source2']}" + ) + + +class PhaseMeasurement: + def __init__(self, query: QueryAsciiValuesCallback, store: dict) -> None: + self._query = query + self._store = store + + def __call__(self) -> float: + return self._query( + f"MEAS:PHAS? {self._store['source1']},{self._store['source2']}" + ) + + +class Measure: + def __init__( + self, write: WriteCallback, query: QueryAsciiValuesCallback, store: dict + ) -> None: + self.counter = MeasurementBase(query, "MEAS:COUN?") + self.duty = MeasurementBase(query, "MEAS:DUTY?") + self.fall_time = MeasurementBase(query, "MEAS:FALL?") + self.rise_time = MeasurementBase(query, "MEAS:RIS?") + self.frequency = MeasurementBase(query, "MEAS:FREQ?") + self.cnt_edge_rising = MeasurementBase(query, "MEAS:NEDG?") + self.cnt_edge_falling = MeasurementBase(query, "MEAS:PEDG?") + self.cnt_pulse_positive = MeasurementBase(query, "MEAS:NPUL?") + self.cnt_pulse_negative = MeasurementBase(query, "MEAS:PPUL?") + self.period = MeasurementBase(query, "MEAS:PER?") + self.pulse_width = MeasurementBase(query, "MEAS:PWID?") + self.vamplitude = MeasurementBase(query, "MEAS:VAMP?") + self.vbase = MeasurementBase(query, "MEAS:VBAS?") + self.vtop = MeasurementBase(query, "MEAS:VTOP?") + self.vmax = MeasurementBase(query, "MEAS:VMAX?") + self.vmin = MeasurementBase(query, "MEAS:VMIN?") + self.vpp = MeasurementBase(query, "MEAS:VPP?") + self.xmax = MeasurementBase(query, "MEAS:XMAX?") + self.xmin = MeasurementBase(query, "MEAS:XMIN?") + # These need a little more construction to match the old API: + self.vaverage = VAverageMeasurement(query) + self.vrms = VRmsMeasurement(query) + + # Multi source measurements: + self.define = Define(write) + self.delay = DelayMeasurement(write, query, store) + self.phase = PhaseMeasurement(query, store) + self.vratio = VRatioMeasurement(query, store) class DSO(metaclass=ABCMeta): @@ -287,8 +426,8 @@ def __init__(self, instrument): # ------------------------- # Construct the API: # ------------------------- - self.source1 = SourceSelector() - self.source2 = SourceSelector() + self.source1 = SourceSelector(self._store, "source1") + self.source2 = SourceSelector(self._store, "source2") self.ch1 = Channel("1", _write) self.ch2 = Channel("2", _write) @@ -301,15 +440,41 @@ def __init__(self, instrument): self.acquire = Acquire(_write) + self.measure = Measure(_write, _query_after_acquire, self._store) + + # Mandatory methods to implement per driver: + @abstractmethod + def single(self) -> None: + """Sets oscilliscope to take a single shot measurement""" + + @abstractmethod + def run(self) -> None: + """Sets the oscilliscope to run mode""" + + @abstractmethod + def stop(self) -> None: + """Puts the oscillicope in stop mode""" + + @abstractmethod + def reset(self) -> None: + """Resets the oscilliscope""" + @abstractmethod def wait_for_acquire(self) -> None: """Block until the current acquisition is complete.""" + @abstractmethod + def get_identity(self) -> str: + """Return the IDN string of the device""" + + @abstractmethod + def _check_errors(self) -> tuple[int, str]: + """Check the error buffer for errors""" + # Query and write functions: def _write_cmd(self, cmd: str) -> None: """Write a SCPI command and raise on instrument error.""" self.instrument.write(cmd) - self._raise_if_error() def _query_bool_cmd(self, cmd: str) -> bool: values = self.instrument.query_ascii_values(cmd) @@ -327,9 +492,21 @@ def _query_after_acquire_cmd(self, cmd: str) -> float: raise return result - @abstractmethod - def _raise_if_error(self) -> None: - """Clean error queue and raise InstrumentError if any exist.""" + def _raise_if_error(self): + errors = [] + while True: + code, msg = self._check_errors() + if code != 0: + errors.append((code, msg)) + else: + break + if errors: + raise InstrumentError( + "Error(s) Returned from DSO\n" + + "\n".join( + ["Code: {}\nMessage:{}".format(code, msg) for code, msg in errors] + ) + ) # Context management: def __enter__(self):