diff --git a/labscript_devices/BS_Series/BLACS_tabs.py b/labscript_devices/BS_Series/BLACS_tabs.py new file mode 100644 index 00000000..ab5f47ed --- /dev/null +++ b/labscript_devices/BS_Series/BLACS_tabs.py @@ -0,0 +1,106 @@ +from qtutils.qt.QtWidgets import QPushButton, QSizePolicy, QHBoxLayout, QSpacerItem, QSizePolicy as QSP +from blacs.tab_base_classes import Worker, define_state +from blacs.device_base_class import DeviceTab +from .logger_config import logger +from blacs.tab_base_classes import MODE_MANUAL +from .utils import _create_button + +class BS_Tab(DeviceTab): + def initialise_GUI(self): + # Get properties from connection table + connection_table = self.settings['connection_table'] + properties = connection_table.find_by_name(self.device_name).properties + + logger.info(f"properties: {properties}") + + self.supports_custom_voltages_per_channel = properties['supports_custom_voltages_per_channel'] + self.num_AO = properties['num_AO'] + if self.supports_custom_voltages_per_channel: + self.AO_ranges = properties['AO_ranges'] + else: + self.default_voltage_range = properties['default_voltage_range'] + + # GUI Capabilities + self.base_units = 'V' + self.base_step = 1 + self.base_decimals = 3 + + # Create AO Output objects + ao_prop = {} + for i in range(1, int(self.num_AO) + 1): + if self.supports_custom_voltages_per_channel: + voltage_range = self.AO_ranges[i-1]['voltage_range'] + ao_prop['CH0%d' % i] = { + 'base_unit': self.base_units, + 'min': voltage_range[0], + 'max': voltage_range[1], + 'step': self.base_step, + 'decimals': self.base_decimals, + } + else: + ao_prop['CH0%d' % i] = { + 'base_unit': self.base_units, + 'min': self.default_voltage_range[0], + 'max': self.default_voltage_range[1], + 'step': self.base_step, + 'decimals': self.base_decimals, + } + + # Create and save AO objects + self.create_analog_outputs(ao_prop) + + # Create widgets for AO objects + widgets, ao_widgets,_ = self.auto_create_widgets() + self.auto_place_widgets(("Analog Outputs", ao_widgets)) + + # Create buttons to send-to-device + self.send_button = _create_button("Send to device", self.send_to_BS) + + # Add centered layout to center the button + center_layout = QHBoxLayout() + center_layout.addStretch() + center_layout.addWidget(self.send_button) + center_layout.addStretch() + + # Add center layout on device layout + self.get_tab_layout().addLayout(center_layout) + + self.supports_remote_value_check(False) + self.supports_smart_programming(False) + + + def initialise_workers(self): + # Get properties from connection table. + properties = self.settings['connection_table'].find_by_name(self.device_name).properties + + worker_kwargs = {"name": self.device_name + '_main', + "port": properties['port'], + "baud_rate": properties['baud_rate'], + "num_AO": properties['num_AO'], + "supports_custom_voltages_per_channel": properties['supports_custom_voltages_per_channel'], + "AO_ranges": properties['AO_ranges'], + "default_voltage_range": properties['default_voltage_range'], + } + + # Start a worker process + self.create_worker( + 'main_worker', + 'labscript_devices.BS_Series.BLACS_workers.BS_Worker', + worker_kwargs, + ) + self.primary_worker = "main_worker" + + @define_state(MODE_MANUAL, True) + def send_to_BS(self): + """Queue a manual send-to-device operation from the GUI. + + This function is triggered from the BLACS tab (by pressing a button) + and runs in the main thread. It queues the `send_to_BS()` function to be + executed by the worker. + + Used to reprogram the device based on current front panel values. + """ + try: + yield(self.queue_work(self.primary_worker, 'send_to_BS', [])) + except Exception as e: + logger.debug(f"Error by send work to worker(send_to_BS): \t {e}") diff --git a/labscript_devices/BS_Series/BLACS_workers.py b/labscript_devices/BS_Series/BLACS_workers.py new file mode 100644 index 00000000..a511e084 --- /dev/null +++ b/labscript_devices/BS_Series/BLACS_workers.py @@ -0,0 +1,218 @@ +from blacs.tab_base_classes import Worker +from labscript import LabscriptError +from .logger_config import logger +import time +import h5py +import numpy as np +from labscript_utils import properties +from zprocess import rich_print +from datetime import datetime +import threading +from .utils import _get_channel_num, _ao_to_CH + +class BS_Worker(Worker): + def init(self): + """Initialises communication with the device. When BLACS (re)starts""" + self.final_values = {} # [[channel_nums(ints)],[voltages(floats)]] to update GUI after shot + self.verbose = True + + # for running the buffered experiment in a separate thread: + self.thread = None + self._stop_event = threading.Event() + self._finished_event = threading.Event() + + try: + # Try to establish a serial connection + from .voltage_source import VoltageSource + self.voltage_source = VoltageSource(self.port, self.baud_rate, self.supports_custom_voltages_per_channel, self.default_voltage_range, self.AO_ranges) + + except LabscriptError as e: + raise RuntimeError(f"BS-34-1A identification failed: {e}") + except Exception as e: + raise RuntimeError(f"An error occurred during BS_Worker initialization: {e}") + + + def shutdown(self): + self.connection.close() + + def program_manual(self, front_panel_values): + """Allows for user control of the device via the BLACS_tab, + setting outputs to the values set in the BLACS_tab widgets. + Runs at the end of the shot.""" + + rich_print(f"---------- Manual MODE start: ----------", color=BLUE) + self.front_panel_values = front_panel_values + + if not getattr(self, 'restored_from_final_values', False): + if self.verbose is True: + print("Front panel values (before shot):") + for ch_name, voltage in front_panel_values.items(): + print(f" {ch_name}: {voltage:.2f} V") + + # Restore final values from previous shot, if available + if self.final_values: + for ch_num, value in self.final_values.items(): + front_panel_values[f'CH0{int(ch_num)}'] = value + + if self.verbose is True: + print("\nFront panel values (after shot):") + for ch_num, voltage in self.final_values.items(): + print(f" {ch_num}: {voltage:.2f} V") + + self.final_values = {} # Empty after restoring + self.restored_from_final_values = True + + return front_panel_values + + def check_remote_values(self): + return + + def transition_to_buffered(self, device_name, h5_file, initial_values, fresh): + """transitions the device to buffered shot mode, + reading the shot h5 file and taking the saved instructions from + labscript_device.generate_code and sending the appropriate commands + to the hardware. + Runs at the start of each shot.""" + + rich_print(f"---------- Begin transition to Buffered: ----------", color=BLUE) + self.restored_from_final_values = False # Drop flag + self.initial_values = initial_values # Store the initial values in case we have to abort and restore them + self.final_values = {} # Store the final values to update GUI during transition_to_manual + self.h5file = h5_file # Store path to h5 to write back from front panel + self.device_name = device_name + + with h5py.File(h5_file, 'r') as hdf5_file: + group = hdf5_file['devices'][device_name] + AO_data = group['AO_buffered'][:] + + # Prepare events + events = [] + for row in AO_data: + t = row['time'] + voltages = {ch: row[ch] for ch in row.dtype.names if ch != 'time'} + events.append((t, voltages)) + + # Create and launch thread + self._stop_event.clear() + self._finished_event.clear() + self.thread = threading.Thread(target=self._run_experiment_sequence, args=(events,)) + self.thread.start() + + return + + def _run_experiment_sequence(self, events): + try: + start_time = time.time() + for t, voltages in events: + now = time.time() + wait_time = t - (now - start_time) + if wait_time > 0: + time.sleep(wait_time) + print(f"[Time: {datetime.now()}] \n") + for conn_name, voltage in voltages.items(): + channel_num = _get_channel_num(conn_name) + self.voltage_source.set_voltage(channel_num, voltage) + self.final_values[channel_num] = voltage + if self.verbose: + print(f"[{t:.3f}s] --> Set {conn_name} (#{channel_num}) = {voltage}") + if self._stop_event.is_set(): + return + finally: + self._finished_event.set() + print(f"[Thread] finished all events !") + + def transition_to_manual(self): + """transitions the device from buffered to manual mode to read/save measurements from hardware + to the shot h5 file as results. + Ensure background thread has finished before exiting the shot.""" + #Stop the thread + rich_print(f"---------- Begin transition to Manual: ----------", color=BLUE) + + self.thread.join() + if not self._finished_event.is_set(): + print("WARNING: experiment sequence did not finish properly.") + else: + print("Experiment sequence completed successfully.") + return True + + def abort_transition_to_buffered(self): + return self.transition_to_manual() + + def _program_manual(self, front_panel_values): + """Sends voltage values to the device for all channels using VoltageSource. + Parameters: + - front_panel_values (dict): Dictionary of voltages keyed by channel name (e.g., 'CH01', 'CH02', ...). + """ + if self.verbose is True: + print("\nProgramming the device with the following values:") + logger.info("Programming the device from manual with the following values:") + + for channel_num in range(1, int(self.num_AO) + 1): + channel_name = f'CH0{channel_num}' # 'CH01' + try: + voltage = front_panel_values[channel_name] + except Exception as e: + raise ValueError(f"Error accessing front panel values for channel '{channel_name}': {e}") + if self.verbose: + print(f"→ {channel_name}: {voltage:.2f} V") + logger.info(f"Setting {channel_name} to {voltage:.2f} V (manual mode)") + + self.voltage_source.set_voltage(channel_num, voltage) + + def send_to_BS(self, kwargs): + """Sends manual values from the front panel to the BS-series device. + This function is executed in the worker process. It uses the current + front panel values to reprogram the device in manual mode by clicking the button 'send to device'. + Args: + kwargs (dict): Not used currently. + """ + self._program_manual(self.front_panel_values) + current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + self._append_front_panel_values_to_manual(self.front_panel_values, current_time) + + def _append_front_panel_values_to_manual(self, front_panel_values, current_time): + """ + Append front-panel voltage values to the 'AO_manual' dataset in the HDF5 file. + + This method records the current manual voltage settings (from the front panel) + along with a timestamp into the 'AO_manual' table inside the device's HDF5 group. + It assumes that `self.h5file` and `self.device_name` have been set + (in `transition_to_buffered`). If not, a RuntimeError is raised. + + Args: + front_panel_values (dict): + Dictionary mapping channel names (e.g., 'CH01') to voltage values (float). + current_time (str): + The timestamp (formatted as a string) when the values were recorded + + Raises: + RuntimeError: If `self.h5file` is not set (i.e., manual values are being saved before + the system is in buffered mode). + """ + # Check if h5file is set (transition_to_buffered must be called first) + if not hasattr(self, 'h5file') or self.h5file is None: + raise RuntimeError( + "Cannot save manual front-panel values: " + "`self.h5file` is not set. Make sure `transition_to_buffered()` has been called before sending to the device." + ) + + with h5py.File(self.h5file, 'r+') as hdf5_file: + group = hdf5_file['devices'][self.device_name] + dset = group['AO_manual'] + old_shape = dset.shape[0] + dtype = dset.dtype + connections = [name for name in dset.dtype.names if name != 'time'] #'ao 1' + + # Create new data row + new_row = np.zeros((1,), dtype=dtype) + new_row['time'] = current_time + for conn in connections: + channel_name = _ao_to_CH(conn) # 'CH01' + new_row[conn] = front_panel_values.get(channel_name, 0.0) + + # Add new row to table + dset.resize(old_shape + 1, axis=0) + dset[old_shape] = new_row[0] + +# --------------------contants +BLUE = '#66D9EF' \ No newline at end of file diff --git a/labscript_devices/BS_Series/BS_341A.md b/labscript_devices/BS_Series/BS_341A.md new file mode 100644 index 00000000..31d2e1b6 --- /dev/null +++ b/labscript_devices/BS_Series/BS_341A.md @@ -0,0 +1,110 @@ +# BS Series Precision Voltage Source (BS 34-1A) +It adds initial support for the **Stahl Electronics BS 34-1A** multichannel voltage source +to the labscript suite. + +Commands are sent via a standard serial interface with standard baud rate = 9600. + +--- +## Timing Strategy +Since the BS 34-1A has no internal clock and no buffering, +time-sensitive operations (e.g., updating voltages across a sequence) are +implemented using `time.sleep(t)` in a background thread, opened in `transition_to_buffered`. +After the whole sequence is done, the thread is closed in `transition_to_manual`. + +While this is a naive approach, it currently works to avoid blocking +the main thread. +However, we acknowledge that this is not ideal, and we welcome proposals for +a cleaner, event-driven timing model. + +--- + +## GUI small extension on button +A "Send to Device" button that: + + * Collects all entered voltages + * Queues them into the worker process + * Sends commands serially to the device + +--- + +## BS 34-1A Emulator +This [emulator](testing/emulateSerPort.py) +simulates the behavior of the BS 34-1A multichannel voltage source. +It allows for testing with BLACS. When started, the emulator creates +a **virtual serial port** that behaves like a real BS 34-1A device. +Programs can connect to this port and communicate with it as if +it were the real device. + +To launch the emulator: + +```bash +python3 -m BS_Series.testing.emulateSerPort +``` +You’ll see output like: For BS 34-1A use: /dev/pts/5 + +Use that port when connecting in `connection_table.py`. + +--- + +## Current implementation + +The current implementation consists the standard implementation and tailored implementation +to the specific BS 34-1A unit we own, which has the following configuration: + +| Channel | Voltage Range | +| ------- |---------------| +| CH1 | ±24 V | +| CH2–CH8 | ±34.565 V | + +We are modeling this modular support similar to how `NI_DAQmx` handles multiple device types. + +--- +## Connection table + +```python +from labscript import start, stop, add_time_marker, AnalogOut, DigitalOut +from labscript_devices.DummyPseudoclock.labscript_devices import DummyPseudoclock +from labscript_devices.BS_Series.models.BS_341A_spec import BS_341A_spec +from labscript_devices.BS_Series.models.BS_341A import BS_341A + +DummyPseudoclock(name='pseudoclock') +BS_341A_spec(name='voltage_source_for_ST', parent_device=pseudoclock.clockline, port='/dev/ttyUSB0', baud_rate=9600, num_AO=8) +BS_341A(name='voltage_source', parent_device=pseudoclock.clockline, port='/dev/pts/2', baud_rate=9600) + +AnalogOut(name='ao_BS_1', parent_device=voltage_source_for_ST ,connection='ao 1') +AnalogOut(name='ao_BS_2', parent_device=voltage_source_for_ST ,connection='ao 2') +AnalogOut(name='ao_BS_3', parent_device=voltage_source_for_ST ,connection='ao 3') +AnalogOut(name='ao_BS_4', parent_device=voltage_source_for_ST ,connection='ao 4') +AnalogOut(name='ao_BS_5', parent_device=voltage_source_for_ST ,connection='ao 5') +AnalogOut(name='ao_BS_6', parent_device=voltage_source_for_ST ,connection='ao 6') +AnalogOut(name='ao_BS_7', parent_device=voltage_source_for_ST ,connection='ao 7') + +AnalogOut(name='ao_BS_11', parent_device=voltage_source ,connection='ao 1', default_value=20) +AnalogOut(name='ao_BS_22', parent_device=voltage_source ,connection='ao 2') +AnalogOut(name='ao_BS_33', parent_device=voltage_source ,connection='ao 3') +AnalogOut(name='ao_BS_44', parent_device=voltage_source ,connection='ao 4') +``` +--- + +## Extensions + +This is the first device in the BS series we've integrated. +Others in the same family are expected to have similar interfaces but may differ in: + +* Voltage ranges per channel +* Number of channels +* Supported commands +* etc. + +The plan is to extend current BS_ implementation by using subclassing and +extending the configurations in [capabilities.json](models/capabilities.json). +In details: + +To add a new device, the user should: +- Define the new device model in the capabilities.json file. +- Create a corresponding .py file (by copying an existing model as a template). +- Adjust the class and the model names in .py file to match the new entry in the JSON file. + +Note: The class name must exactly match the model name specified in capabilities.json. + +--- diff --git a/labscript_devices/BS_Series/__init__.py b/labscript_devices/BS_Series/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/labscript_devices/BS_Series/labscript_devices.py b/labscript_devices/BS_Series/labscript_devices.py new file mode 100644 index 00000000..d065cbb4 --- /dev/null +++ b/labscript_devices/BS_Series/labscript_devices.py @@ -0,0 +1,140 @@ +from labscript_devices import register_classes +from labscript import Device, set_passed_properties, IntermediateDevice, AnalogOut, config +from labscript import IntermediateDevice +import h5py +import numpy as np +from labscript_devices.NI_DAQmx.utils import split_conn_DO, split_conn_AO +from .logger_config import logger + +class BS_(IntermediateDevice): + description = 'BS_Series' + + @set_passed_properties( + property_names={ + "connection_table_properties": [ + "static_AO", + "baud_rate", + "port", + "num_AO", + "AO_ranges", + "default_voltage_range", + "supports_custom_voltages_per_channel", + ], + } + ) + def __init__( + self, + name, + port='', + baud_rate=9600, + parent_device=None, + num_AO=0, + static_AO = None, + AO_ranges = [], + default_voltage_range = [], + supports_custom_voltages_per_channel = False, + **kwargs + ): + """Initialize a generic BS-series analog output device. + + This constructor supports both devices that share a global analog output + voltage range, and those that allow custom voltage ranges per channel. + + Args: + name (str): Name to assign to the created labscript device. + port (str): Serial port used to connect to the device (e.g. COM3, /dev/ttyUSB0) + baud_rate (int): + parent_device (clockline): Parent clockline device that will + clock the outputs of this device + num_AO (int): Number of analog output channels. + AO_ranges (list of dict, optional): A list specifying the voltage range for each AO channel, + used only if `supports_custom_voltages_per_channel` is True. + Each item should be a dict of the form: + { + "channel": , # Channel index + "voltage_range": [, ] # Min and max voltage + } + static_AO (int, optional): Number of static analog output channels. + default_voltage_range (iterable): A `[Vmin, Vmax]` pair that sets the analog + output voltage range for all analog outputs. + supports_custom_voltages_per_channel (bool): Whether this device supports specifying + individual voltage ranges for each AO channel. + """ + self.num_AO = num_AO + if supports_custom_voltages_per_channel: + if len(AO_ranges) < num_AO: + raise ValueError( + "AO_ranges must contain at least num_AO entries when custom voltage ranges are enabled.") + else: + self.AO_ranges = AO_ranges + else: + self.default_voltage_range = default_voltage_range + + IntermediateDevice.__init__(self, name, parent_device, **kwargs) + self.BLACS_connection = '%s,%s' % (port, str(baud_rate)) + + def add_device(self, device): + IntermediateDevice.add_device(self, device) + + def generate_code(self, hdf5_file): + """Convert the list of commands into numpy arrays and save them to the shot file.""" + logger.info("generate_code for BS 34-1A is called") + IntermediateDevice.generate_code(self, hdf5_file) + + clockline = self.parent_device + pseudoclock = clockline.parent_device + times = pseudoclock.times[clockline] + + # create dataset + analogs = {} + for child_device in self.child_devices: + if isinstance(child_device, AnalogOut): + analogs[child_device.connection] = child_device + + AO_table = self._make_analog_out_table(analogs, times) + AO_manual_table = self._make_analog_out_table_from_manual(analogs) + logger.info(f"Times in generate_code AO table: {times}") + logger.info(f"AO table for HV-Series is: {AO_table}") + + group = self.init_device_group(hdf5_file) + group.create_dataset("AO_buffered", data=AO_table, compression=config.compression) + group.create_dataset("AO_manual", shape=AO_manual_table.shape, maxshape=(None,), dtype=AO_manual_table.dtype, + compression=config.compression, chunks=True) + + + def _make_analog_out_table(self, analogs, times): + """Create a structured numpy array with first column as 'time', followed by analog channel data. + Args: + analogs (dict): Mapping of connection names to AnalogOut devices. + times (array-like): Array of time points. + Returns: + np.ndarray: Structured array with time and analog outputs. + """ + if not analogs: + return None + + n_timepoints = len(times) + dtypes = [('time', np.float64)] + [(c, np.float32) for c in analogs] # first column = time + + analog_out_table = np.empty(n_timepoints, dtype=dtypes) + + analog_out_table['time'] = times + for connection, output in analogs.items(): + analog_out_table[connection] = output.raw_output + + return analog_out_table + + def _make_analog_out_table_from_manual(self, analogs): + """Create a structured empty numpy array with first column as 'time', followed by analog channel data. + Args: + times (array-like): Array of timestamps. + ... + Returns: + np.ndarray: Structured empty array with time and analog outputs.""" + + str_dtype = h5py.string_dtype(encoding='utf-8', length=19) + + dtypes = [('time', str_dtype)] + [(c, np.float32) for c in analogs] + + analog_out_table = np.empty(0, dtype=dtypes) + return analog_out_table diff --git a/labscript_devices/BS_Series/logger_config.py b/labscript_devices/BS_Series/logger_config.py new file mode 100644 index 00000000..143cabd7 --- /dev/null +++ b/labscript_devices/BS_Series/logger_config.py @@ -0,0 +1,24 @@ +import os +import logging + +# Configure the logger +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +LOG_FILE = os.path.join(BASE_DIR, 'device.log') + +# Create logger +logger = logging.getLogger("BS_34") +logger.setLevel(logging.DEBUG) + +# Create file handler and set level to debug +handler = logging.FileHandler(LOG_FILE) +handler.setLevel(logging.DEBUG) + +# Create formatter and set it for the handler +formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s') +handler.setFormatter(formatter) + +# Add handler to the logger +logger.addHandler(handler) + +# Test the logger in the config file +logger.info("Logger initialized successfully") diff --git a/labscript_devices/BS_Series/models/BS_341A.py b/labscript_devices/BS_Series/models/BS_341A.py new file mode 100644 index 00000000..f5befc01 --- /dev/null +++ b/labscript_devices/BS_Series/models/BS_341A.py @@ -0,0 +1,18 @@ +from labscript_devices.BS_Series.labscript_devices import BS_ +import json +import os + +THIS_FOLDER = os.path.dirname(os.path.abspath(__file__)) +CAPABILITIES_FILE = os.path.join(THIS_FOLDER, 'capabilities.json') +with open(CAPABILITIES_FILE, 'r') as f: + CAPABILITIES = json.load(f).get('BS_341A', {}) + + +class BS_341A(BS_): + description = 'BS_341A' + + def __init__(self, *args, **kwargs): + """Class for BS 34-1A basic configuration""" + combined_kwargs = CAPABILITIES.copy() + combined_kwargs.update(kwargs) + BS_.__init__(self, *args, **combined_kwargs) \ No newline at end of file diff --git a/labscript_devices/BS_Series/models/BS_341A_spec.py b/labscript_devices/BS_Series/models/BS_341A_spec.py new file mode 100644 index 00000000..ea3e9229 --- /dev/null +++ b/labscript_devices/BS_Series/models/BS_341A_spec.py @@ -0,0 +1,18 @@ +from labscript_devices.BS_Series.labscript_devices import BS_ +import json +import os + +THIS_FOLDER = os.path.dirname(os.path.abspath(__file__)) +CAPABILITIES_FILE = os.path.join(THIS_FOLDER, 'capabilities.json') +with open(CAPABILITIES_FILE, 'r') as f: + CAPABILITIES = json.load(f).get('BS_341A_spec', {}) + + +class BS_341A_spec(BS_): + description = 'BS_341A_spec' + + def __init__(self, *args, **kwargs): + """Class for BS 34-1A special configuration""" + combined_kwargs = CAPABILITIES.copy() + combined_kwargs.update(kwargs) + BS_.__init__(self, *args, **combined_kwargs) \ No newline at end of file diff --git a/labscript_devices/BS_Series/models/__init__.py b/labscript_devices/BS_Series/models/__init__.py new file mode 100644 index 00000000..61a72f8a --- /dev/null +++ b/labscript_devices/BS_Series/models/__init__.py @@ -0,0 +1,19 @@ +import os +import json +from labscript_devices import import_class_by_fullname + +THIS_FOLDER = os.path.dirname(os.path.abspath(__file__)) +CAPABILITIES_FILE = os.path.join(THIS_FOLDER, 'capabilities.json') + +capabilities = {} +if os.path.exists(CAPABILITIES_FILE): + with open(CAPABILITIES_FILE) as f: + capabilities = json.load(f) + +__all__ = [] +# Import all subclasses into the global namespace: +for model_name in capabilities: + class_name = model_name + path = f'labscript_devices.BS_Series.models.{model_name}.{class_name}' + globals()[class_name] = import_class_by_fullname(path) + __all__.append(class_name) \ No newline at end of file diff --git a/labscript_devices/BS_Series/models/capabilities.json b/labscript_devices/BS_Series/models/capabilities.json new file mode 100644 index 00000000..12807d63 --- /dev/null +++ b/labscript_devices/BS_Series/models/capabilities.json @@ -0,0 +1,26 @@ +{ + "BS_341A_spec": { + "static_AO": 0, + "num_AO": 8, + "baud_rate": 9600, + "supports_custom_voltages_per_channel": true, + "AO_ranges": [ + {"channel": 1, "voltage_range": [-24.0, 24.0]}, + {"channel": 2, "voltage_range": [-34.565, 34.565]}, + {"channel": 3, "voltage_range": [-34.565, 34.565]}, + {"channel": 4, "voltage_range": [-34.565, 34.565]}, + {"channel": 5, "voltage_range": [-34.565, 34.565]}, + {"channel": 6, "voltage_range": [-34.565, 34.565]}, + {"channel": 7, "voltage_range": [-34.565, 34.565]}, + {"channel": 8, "voltage_range": [-34.565, 34.565]} + ], + "default_voltage_range": [-24, 24] + }, + "BS_341A": { + "static_AO": 0, + "num_AO": 8, + "baud_rate": 9600, + "supports_custom_voltages_per_channel": false, + "default_voltage_range": [-34, 34] + } +} \ No newline at end of file diff --git a/labscript_devices/BS_Series/register_classes.py b/labscript_devices/BS_Series/register_classes.py new file mode 100644 index 00000000..52fc15be --- /dev/null +++ b/labscript_devices/BS_Series/register_classes.py @@ -0,0 +1,30 @@ +from labscript_devices import register_classes +import json +import os +from labscript_devices.BS_Series.logger_config import logger + +THIS_FOLDER = os.path.dirname(os.path.abspath(__file__)) +CAPABILITIES_FILE = os.path.join(THIS_FOLDER, 'models', 'capabilities.json') + +capabilities = {} +if os.path.exists(CAPABILITIES_FILE): + with open(CAPABILITIES_FILE) as f: + capabilities = json.load(f) + +register_classes( + "BS_", + BLACS_tab='labscript_devices.BS_Series.BLACS_tabs.BS_Tab', + runviewer_parser=None, +) + +for model_name in capabilities: + logger.debug(f"Registering model: {model_name}") + + try: + register_classes( + model_name, + BLACS_tab='labscript_devices.BS_Series.BLACS_tabs.BS_Tab', + runviewer_parser=None, + ) + except Exception as e: + logger.error(f"Error registering {model_name}: {e}") \ No newline at end of file diff --git a/labscript_devices/BS_Series/testing/__init__.py b/labscript_devices/BS_Series/testing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/labscript_devices/BS_Series/testing/emulateSerPort.py b/labscript_devices/BS_Series/testing/emulateSerPort.py new file mode 100644 index 00000000..4bbeb740 --- /dev/null +++ b/labscript_devices/BS_Series/testing/emulateSerPort.py @@ -0,0 +1,70 @@ +""" +Emulate the serial port for the BS 34-1A. + +You will create a virtual serial port using this script. This script will act as if it’s the BS 34-1A device. When you run the script, it will open a serial port (for example, /dev/pts/1) and allow other programs (such as your BLACS worker) to communicate with it. + +The virtual serial port should stay open while the simulation is running, so other code that expects to interact with the serial device can do so just as if the actual device were connected. + +Run following command in the corresponding folder. + python3 -m BS_Series.testing.emulateSerPort +""" + +import os, pty, threading, time +import sys + +class BS_341AEmulator: + def __init__(self, verbose=False): + self.verbose = verbose + self.master, self.slave = pty.openpty() + self.running = False + self.port_name = os.ttyname(self.slave) + self.thread = threading.Thread(target=self._run) + + def start(self): + self.running = True + self.thread.start() + if self.verbose: + print("Starting BS 34-1A Emulator on virtual port: " + self.port_name) + + def stop(self): + self.running = False + self.thread.join() + if self.verbose: + print("Stopping BS 34-1A Emulator.") + + def _run(self): + while self.running: + try: + command = self._read_command().decode().strip() + if self.verbose: + print(f"Received: {command}") + if command == "IDN": + self._respond("HV341 34 8 b\r") + elif command.startswith("HV341 CH"): + _, channel, voltage = command.split()[:3] + self._respond(f"{channel} {voltage}\r") + else: + self._respond("err\r") + except Exception as e: + self._respond("err\r") + time.sleep(0.05) + + def _read_command(self): + """ Reads the command until the '\r' character is encountered. """ + return b"".join(iter(lambda: os.read(self.master, 1), b"\r")) + + def _respond(self, message): + os.write(self.master, message.encode()) + if self.verbose: + print(f"Responded: {message.strip()}") + + +if __name__ == "__main__": + emulator = BS_341AEmulator(verbose=True) + emulator.start() + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + emulator.stop() + sys.exit(0) \ No newline at end of file diff --git a/labscript_devices/BS_Series/utils.py b/labscript_devices/BS_Series/utils.py new file mode 100644 index 00000000..b53e286e --- /dev/null +++ b/labscript_devices/BS_Series/utils.py @@ -0,0 +1,76 @@ +from labscript_utils import dedent +from .logger_config import logger +from qtutils.qt.QtWidgets import QPushButton, QSizePolicy, QHBoxLayout, QSpacerItem, QSizePolicy as QSP + +def _ao_to_CH(ao_name: str) -> str: + """Converts a string like 'ao 0' or 'ao2' to a channel string like 'CH01' or 'CH03'. + Args: + ao_name (str): Analog output name (e.g., 'ao 0', 'ao1') + + Returns: + str: Channel name (e.g., 'CH01') + + Raises: + ValueError: If input format is invalid. + """ + ao_name = ao_name.strip().lower().replace(' ', '') # Normalize input: 'ao 0' -> 'ao0' + + if not ao_name.startswith('ao') or len(ao_name) < 3: + raise ValueError(f"Invalid AO name format: '{ao_name}'") + + try: + ao_index = int(ao_name[2:]) + except ValueError: + raise ValueError(f"Unable to extract index from AO name: '{ao_name}'") + + return f'CH{ao_index:02d}' # Always 2 digits, e.g. CH01, CH02 + + +def _get_channel_num(channel: str) -> int: + """Extracts the channel number from strings like 'AO3', 'ao 3', or 'CH03'. + Args: + channel (str): The name of the channel. + + Returns: + int: Channel number, e.g., 1 to 8. + + Raises: + ValueError: If the channel string format is invalid or the number is out of range.""" + ch_lower = channel.lower() + if ch_lower.startswith("ao "): + channel_num = int(ch_lower[3:]) # 'ao 3' -> 3 + elif ch_lower.startswith("ao"): + channel_num = int(ch_lower[2:]) # 'ao3' -> 3 + elif ch_lower.startswith("channel"): + _, channel_num_str = channel.split() # 'channel 1' -> 1 + channel_num = int(channel_num_str) + elif ch_lower.startswith("ch0"): + channel_num = int(channel[3:]) # 'ch03' -> 3 + else: + raise ValueError(f"Unexpected channel name format: '{channel}'") + + return channel_num + +def _create_button(text, on_click_callback): + """Creates a styled QPushButton with consistent appearance and connects it to the given callback.""" + button = QPushButton(text) + button.setSizePolicy(QSP.Fixed, QSP.Fixed) + button.adjustSize() + button.setStyleSheet(""" + QPushButton { + border: 1px solid #B8B8B8; + border-radius: 3px; + background-color: #F0F0F0; + padding: 4px 10px; + font-weight: light; + } + QPushButton:hover { + background-color: #E0E0E0; + } + QPushButton:pressed { + background-color: #D0D0D0; + } + """) + button.clicked.connect(lambda: on_click_callback()) + logger.debug(f"Button {text} is created") + return button \ No newline at end of file diff --git a/labscript_devices/BS_Series/voltage_source.py b/labscript_devices/BS_Series/voltage_source.py new file mode 100644 index 00000000..3d9596df --- /dev/null +++ b/labscript_devices/BS_Series/voltage_source.py @@ -0,0 +1,227 @@ +import serial +import numpy as np +from labscript.labscript import LabscriptError +from .logger_config import logger + +class VoltageSource: + """ Voltage Source class to establish and maintain the communication with the connection. + """ + def __init__(self, + port, + baud_rate, + supports_custom_voltages_per_channel, + default_voltage_range, + AO_ranges, + verbose=False + ): + logger.debug(f"") + self.verbose = verbose + self.port = port + self.baud_rate = baud_rate + self.supports_custom_voltages_per_channel = supports_custom_voltages_per_channel + self.default_voltage_range = default_voltage_range + self.AO_ranges = AO_ranges + + # connecting to connectionice + self.connection = serial.Serial(self.port, self.baud_rate, timeout=1) + device_info = self.identify_query() + self.device_serial = device_info[0] # For example, 'HV023' + self.device_voltage_range = device_info[1] # For example, '50' + self.device_channels = device_info[2] # For example, '10' + self.device_output_type = device_info[3] # For example, 'b' (bipolar, unipolar, quadrupole, steerer supply) + + def identify_query(self): + """Send identification instruction through serial connection, receive response. + Returns: + list[str]: Parsed identity response split by whitespace. + Raises: + LabscriptError: If identity format is incorrect. + """ + self.connection.write("IDN\r".encode()) + raw_response = self.connection.read_until(b'\r').decode() + identity = raw_response.split() + + if len(identity) == 4: + logger.debug(f"Device initialized with identity: {identity}") + return identity + else: + raise LabscriptError( + f"Device identification failed.\n" + f"Raw identity: {raw_response!r}\n" + f"Parsed identity: {identity!r}\n" + f"Expected format: ['HVXXX', 'RRR', 'CC', 'b']\n" + f"Device: BS at port {self.port!r}\n" + ) + + def set_voltage(self, channel_num, value): + """ Send set voltage command to device. + Args: + channel_num (int): Channel number. + value (float): Voltage value to set. + Raises: + LabscriptError: If the response from device is incorrect. + """ + try: + channel = f"CH{int(channel_num):02d}" + if self.supports_custom_voltages_per_channel: + voltage_range = float(self.AO_ranges[channel_num - 1]['voltage_range'][1]) + else: + voltage_range = float(self.default_voltage_range[1]) + scaled_voltage = self._scale_to_normalized(float(value), float(voltage_range)) + send_str = f"{self.device_serial} {channel} {scaled_voltage:.5f}\r" + + self.connection.write(send_str.encode()) + response = self.connection.read_until(b'\r').decode().strip() #'CHXX Y.YYYYY' + logger.debug(f"Sent to BS-34: {send_str!r} with {value} | Received: {response!r}") + + expected_response = f"{channel} {scaled_voltage:.5f}" + if response != expected_response: + raise LabscriptError( + f"Voltage setting failed.\n" + f"Sent command: {send_str.strip()!r}\n" + f"Expected response: {expected_response!r}\n" + f"Actual response: {response!r}\n" + f"Device at port {self.port!r}" + ) + except Exception as e: + raise LabscriptError(f"Error in set_voltage: {e}") + + def read_temperature(self): + """ + Query the device for temperature. + Returns: + float: Temperature in Celsius. + Raises: + LabscriptError: If the response format is invalid or parsing fails. + """ + send_str = f"{self.device_serial} TEMP\r" + self.connection.write(send_str.encode()) + + response = self.connection.read_until(b'\r').decode().strip() #'TEMP XXX.X°C' + + if response.endswith("°C"): + try: + # Remove the degree symbol and parse the number + _, temperature_str_raw = response.split() # 'TEMP' 'XXX.X°C' + temperature_str = temperature_str_raw.replace("°C", "").strip() + temperature = float(temperature_str) + return temperature + except ValueError: + raise LabscriptError(f"Failed to parse temperature from response.\n") + else: + raise LabscriptError( + f"Temperature query failed.\n" + f"Unexpected response format: {response!r}\n" + f"Expected a value ending in '°C'." + ) + + def voltage_query(self, channel_num): + """ + Query voltage on the channel. + Args: + channel_num (str): Channel number. + Returns: + float: voltage in Volts. + Raises: + LabscriptError: If the response format is invalid or parsing fails. + """ + channel = f"{int(channel_num):02d}" # 1 -> '01' + send_str = f"{self.device_serial} U{channel}\r" # 'DDDDD UXX' + self.connection.write(send_str.encode()) + + response = self.connection.read_until(b'\r').decode().strip() # '+/-yy,yyy V' + + if response.endswith("V"): + try: + numeric_part = response[:-1].strip() # remove 'V' and whitespace + numeric_part = numeric_part.replace(',', '.') + voltage = float(numeric_part) + return voltage + except ValueError: + raise LabscriptError(f"Failed to parse voltage from response.\n") + else: + raise LabscriptError( + f"Voltage query failed.\n" + f"Unexpected response format: {response!r}\n" + f"Expected a value ending in 'V'." + ) + + def current_query(self, channel_num): + """ + Query current on the channel. + Args: + channel_num (int): Channel number. + Returns: + float: current in milliAmpere + Raises: + LabscriptError: If the response format is invalid or parsing fails. + """ + channel = f"{int(channel_num):02d}" # 1 -> '01' + send_str = f"{self.device_serial} I{channel}\r" + self.connection.write(send_str.encode()) + + response = self.connection.read_until(b'\r').decode().strip() # '+/-yy,yyy mA' + + if response.endswith("mA"): + try: + numeric_part = response[:-1].strip() # remove 'mA' and whitespace + numeric_part = numeric_part.replace(',', '.') # convert to Python-style float + current = float(numeric_part) + return current + except ValueError: + raise LabscriptError(f"Failed to parse current from response.\n") + else: + raise LabscriptError( + f"Current query failed.\n" + f"Unexpected response format: {response!r}\n" + f"Expected a value ending in 'mA'." + ) + + def vol_curr_query(self, channel_num): + """ + Query voltage and current on the channel. + Args: + channel_num (int): Channel number. + Returns: + float: voltage in Volts. + float: cuurent in milliAmpere + Raises: + LabscriptError: If the response format is invalid or parsing fails. + """ + channel = f"{int(channel_num):02d}" # 1 -> '01' + send_str = f"{self.device_serial} Q{channel}\r" # 'DDDDD QXX' + self.connection.write(send_str.encode()) + + response = self.connection.read_until(b'\r').decode().strip() # '+/-yy,yyy V +/-z,zzz mA' + + if response.endswith("mA"): + try: + parts = response.split("V") + numeric_vol = parts[0].strip() # e.g., '+12,345' + numeric_curr = parts[1].replace("mA", "").strip() # e.g., '-00,123' + numeric_vol = numeric_vol.replace(',', '.') # convert to Python-style float + numeric_curr = numeric_curr.replace(',', '.') + voltage = float(numeric_vol) + current = float(numeric_curr) + return voltage, current + + except (ValueError, IndexError) as e: + raise LabscriptError( + f"Failed to parse voltage and current from response: {response!r}" + ) from e + else: + raise LabscriptError( + f"Voltage and Current query failed.\n" + f"Unexpected response format: {response!r}\n" + f"Expected format like '+12,345 V -00,123 mA'." + ) + + def _scale_to_range(self, normalized_value, max_range): + """Convert a normalized value (0 to 1) to the specified range (-max_range to +max_range)""" + max_range = float(max_range) + return 2 * max_range * normalized_value - max_range + + def _scale_to_normalized(self, actual_value, max_range): + """Convert an actual value (within -max_range to +max_range) to a normalized value (0 to 1)""" + max_range = float(max_range) + return (actual_value + max_range) / (2 * max_range)