Skip to content

Modified for running in Linux and New stage driver #1045

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions src/navigate/controller/sub_controllers/tiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,26 +266,19 @@ def set_table(self):
y_stop = float(self.variables["y_end"].get())
y_tiles = int(self.variables["y_tiles"].get())

# NOTE: Removed shifting by the origin becuase, it was not clear how to set the origin.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should discuss this. I'm hesitant to make the change immediately, but we can consider a better way to communicate the origin.

# shift z by coordinate origin of local z-stack
z_start = float(self.variables["z_start"].get()) - float(
self.stack_acq_widgets["start_position"].get()
)
z_stop = float(self.variables["z_end"].get()) - float(
self.stack_acq_widgets["end_position"].get()
)
z_start = float(self.variables["z_start"].get()) # - float(self.stack_acq_widgets["start_position"].get())
z_stop = float(self.variables["z_end"].get()) # - float(self.stack_acq_widgets["end_position"].get())
z_tiles = int(self.variables["z_tiles"].get())

# Default to fixed theta
r_start = float(self.stage_position_vars["theta"].get())
r_stop = float(self.stage_position_vars["theta"].get())
r_tiles = 1

f_start = float(self.variables["f_start"].get()) - float(
self.stack_acq_widgets["start_focus"].get()
)
f_stop = float(self.variables["f_end"].get()) - float(
self.stack_acq_widgets["end_focus"].get()
)
f_start = float(self.variables["f_start"].get()) #- float(self.stack_acq_widgets["start_focus"].get())
f_stop = float(self.variables["f_end"].get()) #- float(self.stack_acq_widgets["end_focus"].get())
f_tiles = int(self.variables["f_tiles"].get())

# for consistency, always go from low to high
Expand All @@ -308,11 +301,12 @@ def sort_vars(a, b):
return b, a
return a, b

#NOTE: Sorting variables breaks down if the F and Z stage are not both moving in the same direction. On our microscope, the F stage moves in a negative direction for positive z-stack acquisitions. I also have a hard coded (-) in commmon_features.py to allow the focus stage to move in a negative direction.
x_start, x_stop = sort_vars(x_start, x_stop)
y_start, y_stop = sort_vars(y_start, y_stop)
z_start, z_stop = sort_vars(z_start, z_stop)
r_start, r_stop = sort_vars(r_start, r_stop)
f_start, f_stop = sort_vars(f_start, f_stop)
# f_start, f_stop = sort_vars(f_start, f_stop)

overlap = float(self._percent_overlap) / 100
table_values = compute_tiles_from_bounding_box(
Expand Down
28 changes: 18 additions & 10 deletions src/navigate/model/device_startup_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,18 @@ def load_stages(
exception=TLFTDICommunicationError,
)
)

elif stage_type == "KST101":
elif stage_type == "KINESIS" and platform.system() == "Linux":
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now obsolete with the latest PR.

from navigate.model.devices.stages.tl_kinesis_steppermotor import (
build_KINESIS_Stage_connection
)
stage_devices.append(
auto_redial(
build_KINESIS_Stage_connection,
(stage_config["serial_number"],),
exception=Exception
)
)
elif stage_type == "KST101" and platform.system=="Windows":
from navigate.model.devices.stages.tl_kcube_steppermotor import (
build_TLKSTStage_connection,
)
Expand Down Expand Up @@ -562,12 +572,8 @@ def load_stages(
)
)

elif stage_type == "MS2000" and platform.system() == "Windows":
"""Filter wheel can be controlled from the same Controller. If
so, then we will load this as a shared device. If not, we will create the
connection to the Controller.

TODO: Evaluate whether MS2000 should be able to operate as a shared device.
elif stage_type == "MS2000":
"""Stage and filter wheel are independent and should not be a shared device
"""

from navigate.model.devices.stages.asi_MSTwoThousand import (
Expand Down Expand Up @@ -705,7 +711,10 @@ def start_stage(
from navigate.model.devices.stages.tl_kcube_inertial import TLKIMStage

return TLKIMStage(microscope_name, device_connection, configuration, id)

elif device_type == "KINESIS":
Copy link
Collaborator

@AdvancedImagingUTSW AdvancedImagingUTSW Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This device should also be added to the configuration wizard.

from navigate.model.devices.stages.tl_kinesis_steppermotor import TLKINStage

return TLKINStage(microscope_name, device_connection, configuration, id)
elif device_type == "KST101":
from navigate.model.devices.stages.tl_kcube_steppermotor import TLKSTStage

Expand Down Expand Up @@ -1265,7 +1274,6 @@ def start_lasers(
modulation = "analog"
elif digital == "NI":
modulation = "digital"

return LaserNI(
microscope_name=microscope_name,
device_connection=device_connection,
Expand Down
13 changes: 8 additions & 5 deletions src/navigate/model/devices/APIs/asi/asi_MS2000_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import threading
import time
import logging
import platform

# Third Party Imports
from serial import Serial
Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(self, code: str):
":N-6": "Undefined Error (command is incorrect, but the controller does "
"not know exactly why.",
":N-21": "Serial Command halted by the HALT command",
":N-21\r\n": "Serial Command halted by the HALT command",
}
#: str: Error code received from MS2000 Console
self.code = code
Expand Down Expand Up @@ -191,8 +193,9 @@ def connect_to_serial(
self.serial_port.write_timeout = write_timeout
self.serial_port.timeout = read_timeout

# set the size of the rx and tx buffers before calling open
self.serial_port.set_buffer_size(rx_size, tx_size)
if platform.system()=="Windows":
# Only changed the buffer size in windows
self.serial_port.set_buffer_size(rx_size, tx_size)
try:
self.serial_port.open()
except SerialException:
Expand All @@ -216,8 +219,8 @@ def connect_to_serial(
"X",
"Y",
"Z",
] # self.get_default_motor_axis_sequence()

]
def get_default_motor_axis_sequence(self) -> None:
"""Get the default motor axis sequence from the ASI device

Expand Down Expand Up @@ -374,7 +377,7 @@ def read_response(self) -> str:
self.report_to_console(f"Received Response: {response.strip()}")
if response.startswith(":N"):
logger.error(f"Incorrect response received: {response}")
raise MS2000Exception(response)
raise MS2000Exception(response.strip())

return response # in case we want to read the response

Expand Down
149 changes: 149 additions & 0 deletions src/navigate/model/devices/APIs/thorlabs/pykinesis_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""
API for connection to Thorlabs.MotionControl.KCube.StepperMotor.dll.
See Thorlabs.MotionControl.KCube.StepperMotor.h for more functions to implement.
"""

"""
2024/10/23 Sheppard: Initialized to control Kinesis Stepper motor in Linux
"""
from pylablib.devices import Thorlabs
import logging
# Local Imports

# Logger Setup
p = __name__.split(".")[1]
logger = logging.getLogger(p)

class KinesisStage():
def __init__(self, dev_path: str, verbose: bool):
"""_summary_

Args:
connection (_type_): _description_
"""
connection = {"port":dev_path,"baudrate":115200,"rtscts":True}
self.verbose = verbose
self.dev_path = dev_path
self.defualt_axes = ["f"]

self.move_params = {"min_velocity":None,
"max_velocity":None,
"acceleration":None}

self.open(connection)


def __str__(self) -> str:
"""Returns the string representation of the MS2000 Controller class"""
return "KinesisController"

def open(self, connection):
"""
Open the device for communications.

Parmeters
---------
serial_number : str
Serial number of Thorlabs Kinesis Stepper Motor (KST) device.

Returns
-------
int
The error code or 0 if successful.
"""
try:
self.stage = Thorlabs.KinesisMotor(("serial", connection), scale="step")
success = True
except Exception as e:
success = False
raise ConnectionError(f"KST101 stage connection failed! \nError: {e}")

def close(self):
"""
Disconnect and close the device.

Parmeters
---------
serial_number : str
Serial number of Thorlabs Kinesis Stepper Motor (KST) device.

Returns
-------
None
"""
self.stage.stop()
self.stage.close()

def move_to_position(self, position, steps_per_um, wait_till_done):
"""Move to position (um)
"""
self.stage.get_position(channel=1, scale=False)
cur_pos = self.stage.get_position(channel=1, scale=False)
position_um = cur_pos / steps_per_um
# calculate the distance needed to move
distance = position - position_um
# convert total distance to steps
steps = steps_per_um * distance
self.stage.move_by(steps, channel=1, scale=False)
if wait_till_done:
self.stage.wait_move(channel=1)
return 0

def get_current_position(self, steps_per_um):
"""Get the current position

Parmeters
---------
serial_number : str
Serial number of Thorlabs Kinesis Stepper Motor (KST) device.

Returns
-------
int
Current position.
"""
self.stage.get_position(channel=1, scale=False)
position = self.stage.get_position(channel=1, scale="False")
position_um = position / steps_per_um
return round(position_um, 2)

def stop(self):
"""
Halt motion

Parmeters
---------
serial_number : str
Serial number of Thorlabs Kinesis Stepper Motor (KST) device.
channel : int
The device channel. One of SCC_Channels.

Returns
-------
int
The error code or 0 if successful.
"""
self.stage.stop()
return 0

def home_stage(self):
"""Home Device
"""
self.stage.home()
return 0

def set_velocity_params(self,
min_velocity,
max_velocity,
acceleration,
steps_per_um):
"""Set velocity profile required for move
"""
min_velocity *= steps_per_um
max_velocity *= steps_per_um
acceleration *= steps_per_um
self.stage.set_move_params(min_velocity, max_velocity, acceleration)
self.move_params = {"min_velocity":min_velocity,
"max_velocity":max_velocity,
"acceleration":acceleration}
return 0
4 changes: 2 additions & 2 deletions src/navigate/model/devices/camera/photometrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ def build_photometrics_connection(camera_connection):
"""
try:
pvc.init_pvcam()
# camera_names = Camera.get_available_camera_names()
camera_to_open = Camera.select_camera(camera_connection)
camera_names = Camera.get_available_camera_names()
camera_to_open = Camera.select_camera(camera_names[0])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this assume only one camera?

camera_to_open.open()
return camera_to_open
except Exception as e:
Expand Down
14 changes: 8 additions & 6 deletions src/navigate/model/devices/daq/ni.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,14 @@ def set_external_trigger(self, external_trigger=None) -> None:
self.analog_output_tasks[
board_name
].triggers.start_trigger.cfg_dig_edge_start_trig(trigger_source)
try:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it was throwing an error, but handled with a try/except statement, why did you have to remove it?

self.analog_output_tasks[board_name].register_done_event(None)
except Exception:
logger.debug(
f"Error Registering Done Event: {traceback.format_exc()}"
)
# NOTE: this was causing an error for me using PCIe-6343 in Linux. Not sure if it was board or OS related.
# try:
# # print(board_name)
# # self.analog_output_tasks[board_name].register_done_event(None)
# except Exception:
# logger.debug(
# f"Error Registering Done Event: {traceback.format_exc()}"
# )
else:
# close master trigger task
if self.master_trigger_task:
Expand Down
Loading
Loading