Skip to content

Adding support for GeniCam-compatible cameras #106

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
171 changes: 171 additions & 0 deletions labscript_devices/GeniCam/_genicam/_feature_tree.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from genicam.genapi import NodeMap
from genicam.genapi import EInterfaceType, EAccessMode, EVisibility

from ._feature_value_tuple import (FeatureValueTuple, FeatureType,
FeatureAccessMode, FeatureVisibility)

from ._tree import TreeNode

_readable_nodes = [
EInterfaceType.intfIBoolean,
EInterfaceType.intfIEnumeration,
EInterfaceType.intfIFloat,
EInterfaceType.intfIInteger,
EInterfaceType.intfIString,
EInterfaceType.intfIRegister,
]

_readable_access_modes = [EAccessMode.RW, EAccessMode.RO]


def populate_feature_value_tuple(feature):
interface_type = feature.node.principal_interface_type
value = None
if interface_type in [EInterfaceType.intfIBoolean,
EInterfaceType.intfIFloat,
EInterfaceType.intfIInteger]:
value = feature.value
else:
try:
value = str(feature.value)
except AttributeError:
try:
value = feature.to_string()
except AttributeError:
return None

visibility = feature.node.visibility
access_mode = feature.node.get_access_mode()
entries = None
if interface_type == EInterfaceType.intfIEnumeration:
entries = [ item.symbolic for item in feature.entries ]

return FeatureValueTuple(
name=feature.node.display_name,
value=value,
type=FeatureType(int(interface_type)),
entries=entries,
access_mode=FeatureAccessMode(int(access_mode)),
visibility=FeatureVisibility(int(visibility))
)


class GeniCamFeatureTreeNode(TreeNode):
def __init__(self, name, parent_node=None, data=None, child_nodes=None):
super().__init__(name, parent_node, data, child_nodes)

@property
def feature(self):
return self.data

@classmethod
def get_tree_from_genicam_root_node(cls, root_node):
root = cls("Root", child_nodes={})
features = root_node.features

cls.populate_feature_tree(features, root)

return root

@classmethod
def populate_feature_tree(cls, features, parent_item):
for feature in features:
interface_type = feature.node.principal_interface_type

if interface_type == EInterfaceType.intfICategory:
item = cls(feature.node.display_name, parent_node=parent_item, child_nodes={})
cls.populate_feature_tree(feature.features, item)
else:
item = cls(feature.node.display_name, parent_node=parent_item, data=feature)

parent_item.add_child(item)

@staticmethod
def _eval_feature(feature):
value_tuple = populate_feature_value_tuple(feature)

return value_tuple

def __getitem__(self, k):
if self.child_nodes:
return self.child_nodes[k]
else:
raise KeyError(f"Node `{self.name}` is a data node that has no children.")

def set_attributes(self, attr_dict, set_if_changed=False):
return self._set_attributes(attr_dict, [], set_if_changed)

def _set_attributes(self, attr_dict, parent_paths, set_if_changed=False):
for k in attr_dict.keys():
v = attr_dict[k]
if isinstance(v, dict):
attr_dict[k] = self._set_attributes(v, parent_paths + [k])
else:
feature = self.access(parent_paths + [k]).data

interface_type = feature.node.principal_interface_type

try:
if interface_type == EInterfaceType.intfICommand:
if v:
feature.execute()
elif interface_type == EInterfaceType.intfIBoolean:
_v = True if v.lower() == 'true' else False
if not set_if_changed or feature.value != _v: # what's the point of doing this? should I cache?
feature.value = _v
elif interface_type == EInterfaceType.intfIFloat:
_v = float(v)
if not set_if_changed or feature.value != _v:
feature.value = _v
else:
if not set_if_changed or feature.value != v:
feature.value = v

except Exception:
pass

if interface_type == EInterfaceType.intfICommand:
attr_dict[k] = False
else:
attr_dict[k] = feature.value

return attr_dict

def filter_tree_with_visibility(self, visibility=None, must_writable=False):
visibility = EVisibility.Guru if visibility is None else visibility

if isinstance(visibility, str):
_visibility = {
"beginner": EVisibility.Beginner,
"expert": EVisibility.Expert,
"guru": EVisibility.Guru
}[visibility.lower()]
else:
_visibility = visibility

filtered_tree = GeniCamFeatureTreeNode.filter_tree(
self,
lambda n: n if (int(n.node.visibility) <= int(_visibility) and ((n.node.get_access_mode() == EAccessMode.RW) \
if must_writable else (n.node.get_access_mode() in [EAccessMode.RO, EAccessMode.RW]))) else None
)

return filtered_tree

def dump_value_dict(self, visibility=None, writable=False):
filtered_tree = self.filter_tree_with_visibility(visibility, writable)

if filtered_tree:
value_tree = TreeNode.eval_tree(filtered_tree,
lambda node: self._eval_feature(node).value)
return value_tree.dump_value_dict()
else:
return {}

def dump_value_tuple_dict(self, visibility=None, writable=False):
filtered_tree = self.filter_tree_with_visibility(visibility, writable)

if filtered_tree:
value_tree = TreeNode.eval_tree(filtered_tree, self._eval_feature)
return value_tree.dump_value_dict()
else:
return {}
39 changes: 39 additions & 0 deletions labscript_devices/GeniCam/_genicam/_feature_value_tuple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from collections import namedtuple
from enum import Enum


# The purpose of provising these homemade types instead of using Harvester is to
# not forcing the installation of Harvester on BLACS computer simply because I need
# a few Enum types (only the worker computer needs Harvester)

FeatureValueTuple = namedtuple("FeatureValueTuple", ["name", "value", "type", "entries",
"access_mode", "visibility"])


class FeatureType(Enum):
Value = 0
Base = 1
Integer = 2
Boolean = 3
Command = 4
Float = 5
String = 6
Register = 7
Category = 8
Enumeration = 9
EnumEntry = 10
Port = 11

class FeatureAccessMode(Enum):
NI = 0
NA = 1
WO = 2
RO = 3
RW = 4

class FeatureVisibility(Enum):
Beginner = 0
Expert = 1
Guru = 2
Invisible = 3

151 changes: 151 additions & 0 deletions labscript_devices/GeniCam/_genicam/_genicam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from harvesters.core import Harvester
from genicam.genapi import NodeMap
from genicam.genapi import EInterfaceType, EAccessMode, EVisibility
from genicam.gentl import TimeoutException, GenericException

import time
import logging
import numpy as np

from ._feature_tree import GeniCamFeatureTreeNode


def nested_get(dct, keys):
for key in keys:
dct = dct[key]
return dct


class GeniCamException(Exception):
pass


class GeniCam:
_readable_nodes = [
EInterfaceType.intfIBoolean,
EInterfaceType.intfIEnumeration,
EInterfaceType.intfIFloat,
EInterfaceType.intfIInteger,
EInterfaceType.intfIString,
EInterfaceType.intfIRegister,
]

_readable_access_modes = [EAccessMode.RW, EAccessMode.RO]

def __init__(self, serial_number, cti_path, logger=None):
self.logger = logger if logger else logging.getLogger()

self.raise_exception_on_failed_shot = False
self._image_fetch_polling_interval = 0.01 ## c.f. harvesters/core.py, `_timeout_on_client_fetch_call`

self.harvester = Harvester()

self.harvester.add_file(cti_path, check_validity=True)
self.harvester.update()

self.ia = None

try:
self.ia = self.harvester.create({'serial_number': serial_number})
except IndexError:
logging.error(f"Couldn't not find camera with serial number {serial_number}. List of available cameras:")
logging.error(self.harvester.device_info_list)

raise GeniCamException(f"Couldn't not find camera with serial number {serial_number}.")

self.feature_tree = GeniCamFeatureTreeNode.get_tree_from_genicam_root_node(
self.ia.remote_device.node_map.Root)

self._abort_acquisition = False

def snap(self, timeout):
mode_feat = self.feature_tree["Acquisition"]["AcquisitionMode"].data
old_mode = mode_feat.value
mode_feat.value = "SingleFrame"

self.ia.start()
img = self.fetch(timeout=timeout)
self.ia.stop()

mode_feat.value = old_mode

if img is None:
raise Exception("Acqusition timeout.")

return img

def fetch(self, timeout: float =0, raise_when_timeout=False):
try:
with self.ia.fetch(timeout=timeout) as buffer:
component = buffer.payload.components[0]
data = np.copy(component.data)
_2d = data.reshape(component.height, component.width)

return _2d
except TimeoutException as e:
if raise_when_timeout:
raise e
return None

def fetch_n_images(self, n_images, callback=None, timeout=0):
self._abort_acquisition = False

images = []

base = time.time()

poll_timeout = 100e-6 # polling every 100us

self.logger.debug(f"Polling for {n_images} images...")
for i in range(n_images):
while True:
elapsed = time.time() - base
if self._abort_acquisition:
self.logger.info("Received abort signal during acquisition.")
self._abort_acquisition = False
if callback:
callback(images)

return images

if timeout and elapsed > timeout:
raise Exception(f"Acqusition timeout while waiting for the {i+1}/{n_images} image.")

try:
img = self.fetch(poll_timeout, True)
break
except TimeoutException:
continue
except GenericException as e:
if self.raise_exception_on_failed_shot:
raise e
else:
self.logger.error(f"Error when acquiring image {i+1}/{n_images}:")
self.logger.exception(e)

if img is not None:
images.append(img)
self.logger.debug(f"Received image {i+1}/{n_images}.")

self.logger.debug(f"Received all {n_images} images.")

if callback:
callback(images)

return images

def stop_acquisition(self):
self.ia.stop()

def start_acquisition(self):
self._abort_acquisition = False
self.ia.start()

def abort_acquisition(self):
self._abort_acquisition = True
self.ia.stop()

def close(self):
if self.ia:
self.ia.destroy()

Loading