from ...core.utils import dictionary, py3, general
from ...core.devio import data_format, interface
from ...core.dataproc import image as image_utils
import numpy as np
import contextlib
import time
import collections
from . import IMAQdx_lib
# Names re-exported from the low-level wrapper module for use throughout this file.
IMAQdxError = IMAQdx_lib.IMAQdxGenericError
lib = IMAQdx_lib.lib

# Local modules this file depends on.
_depends_local = [".IMAQdx_lib", "...core.devio.interface"]

# Best-effort library initialization: failure is deliberately ignored here,
# so that importing this module still succeeds if initialization raises ImportError or OSError.
try:
    lib.initlib()
except (OSError, ImportError):
    pass
class IMAQdxAttribute(object):
    """
    Object representing an IMAQdx camera parameter.

    Allows to query and set values and get additional information.
    Usually created automatically by an :class:`IMAQdxCamera` instance, but could be created manually.

    Args:
        sid: camera session ID (passed as the first argument to all library calls)
        name: attribute name

    Attributes:
        name: attribute name
        display_name: attribute display name (short description name)
        tooltip: longer attribute description
        description: full attribute description (usually, same as `tooltip`)
        units: attribute units (if applicable)
        readable (bool): whether attribute is readable
        writable (bool): whether attribute is writable
        type: attribute type, as looked up in ``IMAQdx_lib.IMAQdxAttributeType_enum``
        min (float or int): minimal attribute value (if applicable)
        max (float or int): maximal attribute value (if applicable)
        inc (float or int): minimal attribute increment value (if applicable)
        values: list of possible attribute values (if applicable)
    """
    # Type codes of attributes which have minimum / maximum / increment limits.
    # NOTE(review): presumably the U32, I64, F64 and Bool entries of the IMAQdx attribute type enum -- confirm against IMAQdx_lib.
    _numeric_types = (0, 1, 2, 5)
    # Type code of attributes which have an enumerable list of values.
    # NOTE(review): presumably the Enum entry of the IMAQdx attribute type enum -- confirm against IMAQdx_lib.
    _enum_type = 4

    def __init__(self, sid, name):
        object.__init__(self)
        self.sid = sid
        self.name = name
        self.display_name = py3.as_str(lib.IMAQdxGetAttributeDisplayName(sid, name))
        self.tooltip = py3.as_str(lib.IMAQdxGetAttributeTooltip(sid, name))
        self.description = py3.as_str(lib.IMAQdxGetAttributeDescription(sid, name))
        self.units = py3.as_str(lib.IMAQdxGetAttributeUnits(sid, name))
        self.readable = lib.IMAQdxIsAttributeReadable(sid, name)
        self.writable = lib.IMAQdxIsAttributeWritable(sid, name)
        self._attr_type = lib.IMAQdxGetAttributeType(sid, name)
        self.type = IMAQdx_lib.IMAQdxAttributeType_enum[self._attr_type]
        # limits stay None for attribute types which don't have them
        self.min = self.max = self.inc = None
        self.update_minmax()
        if self._attr_type == self._enum_type:
            self.values = lib.IMAQdxEnumerateAttributeValues(sid, name)
        else:
            self.values = None

    def update_minmax(self):
        """
        Update minimal and maximal attribute limits (and the increment).

        Does nothing for attribute types which don't have limits.
        """
        if self._attr_type in self._numeric_types:
            self.min = lib.IMAQdxGetAttributeMinimum(self.sid, self.name, self._attr_type)
            self.max = lib.IMAQdxGetAttributeMaximum(self.sid, self.name, self._attr_type)
            self.inc = lib.IMAQdxGetAttributeIncrement(self.sid, self.name, self._attr_type)

    def truncate_value(self, value):
        """
        Truncate value to lie within attribute limits.

        The limits are re-queried first. Values outside of the range are replaced by the nearest limit;
        values inside the range are rounded down to a whole number of increments above the minimum
        (if the increment is positive).
        Values of attributes which don't have limits are returned unchanged.
        """
        self.update_minmax()
        if self._attr_type in self._numeric_types:
            if value < self.min:
                value = self.min
            elif value > self.max:
                value = self.max
            else:
                inc = self.inc
                if inc > 0:
                    # snap down onto the grid min, min+inc, min+2*inc, ...
                    value = ((value - self.min) // inc) * inc + self.min
        return value

    def get_value(self, enum_as_str=True):
        """
        Get attribute value.

        If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.
        Raise :exc:`IMAQdxError` if the attribute is not readable.
        """
        if not self.readable:
            raise IMAQdxError("Attribute {} is not readable".format(self.name))
        val = lib.IMAQdxGetAttribute(self.sid, self.name, self._attr_type)
        if self._attr_type == self._enum_type and enum_as_str:
            val = val.Name
        return val

    def set_value(self, value, truncate=True):
        """
        Set attribute value.

        If ``truncate==True``, automatically truncate value to lie within allowed range.
        Raise :exc:`IMAQdxError` if the attribute is not writable.
        Return the result of the underlying library call.
        """
        if not self.writable:
            raise IMAQdxError("Attribute {} is not writable".format(self.name))
        if truncate:
            value = self.truncate_value(value)
        return lib.IMAQdxSetAttribute(self.sid, self.name, value, None)

    def __repr__(self):
        return "{}({})".format(self.__class__.__name__, self.name)
def list_cameras(connected=True):
    """
    List all cameras available through IMAQdx interface.

    `connected` is passed unchanged to the library enumeration call
    (presumably it restricts the list to currently connected cameras -- confirm against IMAQdx_lib).
    """
    cameras = lib.IMAQdxEnumerateCameras(connected)
    return cameras
class IMAQdxCamera(interface.IDevice):
    """
    Generic IMAQdx camera interface.

    Args:
        name: interface name (can be learned by :func:`list_cameras`; usually, but not always, starts with ``"cam"``)
        mode: connection mode; can be ``"controller"`` (full control) or ``"listener"`` (only reading)
        default_visibility: default attribute visibility when listing attributes;
            can be ``"simple"``, ``"intermediate"`` or ``"advanced"`` (higher mode exposes more attributes).
    """
    def __init__(self, name="cam0", mode="controller", default_visibility="simple"):
        interface.IDevice.__init__(self)
        self.init_done = False
        self.name = name
        self.mode = mode
        self.default_visibility = default_visibility
        self.sid = None
        self.open()
        self.image_indexing = "rct"
        try:
            attrs = self.list_attributes()
            # attribute tree uses "/" as a separator instead of the library's "::"
            self.attributes = dictionary.Dictionary({a.name.replace("::", "/"): a for a in attrs})
        except Exception:
            # don't leave the session open if the attribute tree could not be built
            self.close()
            raise
        self.init_done = True
        self.acq_params = None
        self.frame_counter = 0
        self.last_wait_frame = -1
        self.buffers_num = 0
        # open() has already called post_open(), but with init_done==False;
        # call it again now that the attributes are available
        self.post_open()
        self.v = dictionary.ItemAccessor(self.get_value, self.set_value)
        self._add_full_info_node("model_data", self.get_model_data)
        self._add_full_info_node("interface_name", lambda: self.name)
        self._add_status_node("attributes", self.get_all_attributes)
        self._add_status_node("buffer_size", lambda: self.buffers_num)
        self._add_status_node("data_dimensions", self.get_data_dimensions)
        self._add_full_info_node("detector_size", self.get_detector_size)
        self._add_settings_node("roi", self.get_roi, self.set_roi)
        self._add_status_node("roi_limits", self.get_roi_limits)
        self._add_status_node("last_frame", self._last_buffer)
        self._add_status_node("read_frames", lambda: self.frame_counter)

    def open(self, mode=None):
        """
        Open connection to the camera.

        `mode` is the connection mode (by default, the mode supplied on creation).
        """
        mode = self.mode if mode is None else mode
        # translate a mode name into the library value; unknown values are passed as is
        mode = IMAQdx_lib.IMAQdxCameraControlMode_enum.get(mode, mode)
        self.sid = lib.IMAQdxOpenCamera(self.name, mode)
        self.post_open()

    def close(self):
        """Close connection to the camera (does nothing if it is already closed)"""
        if self.sid is not None:
            lib.IMAQdxCloseCamera(self.sid)
            self.sid = None

    def reset(self):
        """Reset connection to the camera"""
        # NOTE(review): IMAQdxAttribute objects in self.attributes keep the session ID they were created with;
        # confirm that they remain valid after the camera is re-opened
        self.close()
        lib.IMAQdxResetCamera(self.name, False)
        self.open()

    def is_opened(self):
        """Check if the device is connected"""
        return self.sid is not None

    def post_open(self):
        """Action to automatically call on opening (does nothing by default; can be overridden in subclasses)"""
        pass

    # attributes which are added to the attribute list (if the camera has them) regardless of the visibility setting
    _builtin_attrs = ["OffsetX", "OffsetY", "Width", "Height", "PixelFormat", "PayloadSize",
                      "StatusInformation::LastBufferNumber", "AcquisitionAttributes::BitsPerPixel"]

    def list_attributes(self, root="", visibility=None, add_builtin=True):
        """
        List all attributes at a given root.

        Return list of :class:`IMAQdxAttribute` objects, which allow querying and settings values
        and getting additional information (description, limits, increment).
        `visibility` determines which attributes are listed (by default, use the default visibility supplied on creation).
        If ``add_builtin==True``, also add the built-in attributes which are supported by the camera, but aren't in the list.
        """
        visibility = visibility or self.default_visibility
        visibility = IMAQdx_lib.IMAQdxAttributeVisibility_enum.get(visibility, visibility)
        root = root.replace("/", "::")
        attrs = lib.IMAQdxEnumerateAttributes2(self.sid, root, visibility)
        attr_names = [a.Name for a in attrs]
        if add_builtin:
            builtin_attrs = []
            for a in self._builtin_attrs:
                if a not in attr_names:
                    try:
                        # probe the attribute; the call raises an error if the camera doesn't have it
                        lib.IMAQdxGetAttributeDisplayName(self.sid, a)
                        builtin_attrs.append(a)
                    except IMAQdxError:
                        pass
            attr_names += builtin_attrs
        return [IMAQdxAttribute(self.sid, a) for a in attr_names]

    def get_value(self, name, default=None):
        """
        Get value of the attribute with a given name.

        If `name` is a branch of the attribute tree, return values of all attributes in this branch.
        If the attribute is missing and `default` is not ``None``, return `default`.
        """
        name = name.replace("::", "/")
        if (default is not None) and (name not in self.attributes):
            return default
        if self.attributes.is_dictionary(self.attributes[name]):
            return self.get_all_attributes(root=name)
        v = self.attributes[name].get_value()
        if isinstance(v, py3.new_bytes):
            v = py3.as_str(v)
        return v

    def set_value(self, name, value, ignore_missing=False, truncate=True):
        """
        Set value of the attribute with a given name.

        If `name` is a branch of the attribute tree, `value` is treated as a collection of values for attributes in this branch.
        If ``ignore_missing==True`` and the attribute is missing, do nothing.
        If ``truncate==True``, truncate value to lie within attribute range.
        """
        name = name.replace("::", "/")
        if (name in self.attributes) or (not ignore_missing):
            if self.attributes.is_dictionary(self.attributes[name]):
                self.set_all_attributes(value, root=name)
            else:
                self.attributes[name].set_value(value, truncate=truncate)

    def get_all_attributes(self, root="", as_dict=False):
        """
        Get values of all attributes with the given `root`.

        Only readable attributes are included.
        If ``as_dict==True``, return ``dict`` object; otherwise, return :class:`.Dictionary` object.
        """
        settings = self.attributes[root].copy().filter_self(lambda a: a.readable).map_self(lambda a: a.get_value())
        return settings.as_dict(style="flat") if as_dict else settings

    def set_all_attributes(self, settings, root="", truncate=True):
        """
        Set values of all attributes with the given `root`.

        Entries which don't correspond to existing writable attributes are ignored.
        If ``truncate==True``, truncate value to lie within attribute range.
        """
        settings = dictionary.as_dict(settings, style="flat", copy=False)
        for k in settings:
            if k in self.attributes[root] and self.attributes[root, k].writable:
                self.attributes[root, k].set_value(settings[k], truncate=truncate)

    ModelData = collections.namedtuple("ModelData", ["vendor", "model", "serial_number", "bus_type"])

    def get_model_data(self):
        """
        Get camera model data.

        Return tuple ``(vendor, model, serial_number, bus_type)``,
        where ``serial_number`` is itself a tuple ``(high, low)``.
        """
        cam_info = self.v["CameraInformation"]
        serial_number = (cam_info["SerialNumberHigh"], cam_info["SerialNumberLow"])
        return self.ModelData(cam_info["VendorName"], cam_info["ModelName"], serial_number, cam_info["BusType"])

    def _get_data_dimensions_rc(self):
        """Get the current data dimensions as a tuple ``(height, width)``"""
        return self.v["Height"], self.v["Width"]

    def get_data_dimensions(self):
        """Get the current data dimension (taking ROI and binning into account)"""
        return image_utils.convert_shape_indexing(self._get_data_dimensions_rc(), "rc", self.image_indexing)

    def get_detector_size(self):
        """Get camera detector size (in pixels) as a tuple ``(width, height)``"""
        return self.attributes["Width"].max, self.attributes["Height"].max

    def get_roi(self):
        """
        Get current ROI.

        Return tuple ``(hstart, hend, vstart, vend)``.
        """
        ox = self.v.get("OffsetX", 0)
        oy = self.v.get("OffsetY", 0)
        w = self.v["Width"]
        h = self.v["Height"]
        return ox, ox + w, oy, oy + h

    def set_roi(self, hstart=0, hend=None, vstart=0, vend=None):
        """
        Setup camera ROI.

        By default, all non-supplied parameters take extreme values.
        If any of the ROI attributes is missing or not writable, do nothing and return ``None``;
        otherwise, return the new ROI.
        """
        for a in ["Width", "Height", "OffsetX", "OffsetY"]:
            if a not in self.attributes or not self.attributes[a].writable:
                return
        det_size = self.get_detector_size()
        if hend is None:
            hend = det_size[0]
        if vend is None:
            vend = det_size[1]
        with self.pausing_acquisition():
            # shrink the frame to the minimal size first, so that the offsets can be set without restrictions
            self.v["Width"] = self.attributes["Width"].min
            self.v["Height"] = self.attributes["Height"].min
            self.v["OffsetX"] = hstart
            self.v["OffsetY"] = vstart
            self.v["Width"] = max(self.v["Width"], hend - hstart)
            self.v["Height"] = max(self.v["Height"], vend - vstart)
        return self.get_roi()

    def get_roi_limits(self):
        """
        Get the minimal and maximal ROI parameters.

        Return tuple ``(min_roi, max_roi)``, where each element is in turn 4-tuple describing the ROI.
        """
        params = ["OffsetX", "OffsetY", "Width", "Height"]
        minp = tuple([(self.attributes[p].min if p in self.attributes else 0) for p in params])
        maxp = tuple([(self.attributes[p].max if p in self.attributes else 0) for p in params])
        min_roi = (0, 0) + minp[2:]
        max_roi = maxp
        return (min_roi, max_roi)

    def setup_acquisition(self, continuous, frames):
        """
        Setup acquisition mode.

        `continuous` determines whether acquisition runs continuously, or stops after the given number of frames
        (note that :meth:`.IMAQdxCamera.acquisition_in_progress` would still return ``True`` in this case, even though new frames are no longer acquired).
        `frames` sets up number of frame buffers.
        """
        lib.IMAQdxConfigureAcquisition(self.sid, continuous, frames)
        self.acq_params = (continuous, frames)
        self.buffers_num = frames

    def clear_acquisition(self):
        """Clear the acquisition settings (unconfigure the acquisition and release the frame buffers)"""
        lib.IMAQdxUnconfigureAcquisition(self.sid)
        self.buffers_num = 0
        # forget the stored parameters, so that start_acquisition() knows that it has to set up the acquisition again
        self.acq_params = None

    def start_acquisition(self):
        """
        Start the acquisition.

        If the acquisition hasn't been set up, set it up as continuous with 100 frame buffers.
        Reset the frame counters.
        """
        if self.acq_params is None:
            self.setup_acquisition(True, 100)
        lib.IMAQdxStartAcquisition(self.sid)
        self.frame_counter = 0
        self.last_wait_frame = -1

    def stop_acquisition(self):
        """Stop the acquisition and reset the frame counters"""
        lib.IMAQdxStopAcquisition(self.sid)
        self.frame_counter = 0
        self.last_wait_frame = -1

    def acquisition_in_progress(self):
        """Check if acquisition is in progress"""
        return self.v["StatusInformation/AcqInProgress"]

    def refresh_acquisition(self, delay=0.005):
        """
        Run a short single-buffer acquisition for `delay` seconds.

        Stop and clear the current acquisition before that, and leave the acquisition stopped and cleared afterwards.
        """
        self.stop_acquisition()
        self.clear_acquisition()
        self.setup_acquisition(0, 1)
        self.start_acquisition()
        time.sleep(delay)
        self.stop_acquisition()
        self.clear_acquisition()

    def _last_buffer(self):
        """Get the number of the last acquired buffer (``-1`` if the reported number is ``2**31`` or above)"""
        last_buffer = self.v["StatusInformation/LastBufferNumber"]
        return last_buffer if last_buffer < 2**31 else -1

    @contextlib.contextmanager
    def pausing_acquisition(self):
        """
        Context manager which temporarily pauses acquisition during execution of ``with`` block.

        Useful for applying certain settings which can't be changed during the acquisition.
        """
        # remember the current state before stopping, so that it can be restored afterwards
        acq_params = self.acq_params
        acq_in_progress = self.acquisition_in_progress()
        try:
            self.stop_acquisition()
            self.clear_acquisition()
            yield
        finally:
            if acq_params:
                self.setup_acquisition(*acq_params)
            if acq_in_progress:
                self.start_acquisition()

    def get_new_images_range(self):
        """
        Get the range of the new images.

        Return tuple ``(first, last)`` with images range (inclusive).
        If no images are available, return ``None``.
        """
        newest_img = self._last_buffer()
        if not self.acquisition_in_progress() or newest_img < 0:
            return None
        if self.frame_counter > newest_img:
            return None
        # simplest case: the first unread frame is still in the buffer
        if self.buffer_valid(self.frame_counter):
            return (self.frame_counter, newest_img)
        # next guess: the buffer is completely filled, so the oldest frame is buffers_num-1 frames before the newest
        oldest_guess = newest_img - self.buffers_num + 1
        if self.buffer_valid(oldest_guess) and not self.buffer_valid(oldest_guess - 1):
            return (oldest_guess, newest_img)
        # otherwise, search for the oldest valid frame
        valid_buffer = self._get_oldest_valid_buffer(self.frame_counter, newest_img)
        if valid_buffer is None:
            # not even the newest frame could be read, so there is nothing to return
            return None
        return (valid_buffer, newest_img)

    def wait_for_frame(self, since="lastread", timeout=20., period=1E-3):
        """
        Wait for a new camera frame.

        `since` specifies what constitutes a new frame.
        Can be ``"lastread"`` (wait for a new frame after the last read frame),
        ``"lastwait"`` (wait for a new frame after last :meth:`wait_for_frame` call),
        or ``"now"`` (wait for a new frame acquired after this function call).
        If `timeout` is exceeded, raise :exc:`.IMAQdxGenericError`.
        `period` specifies camera polling period.
        """
        ctd = general.Countdown(timeout)
        last_call_frame = self._last_buffer()
        while not ctd.passed():
            last_frame = self._last_buffer()
            since_last_wait = last_frame - self.last_wait_frame
            self.last_wait_frame = last_frame
            if since == "lastread" and last_frame >= self.frame_counter:
                return
            if since == "now" and last_frame > last_call_frame:
                return
            if since == "lastwait" and since_last_wait > 0:
                return
            tl = ctd.time_left()
            time.sleep(period if tl is None else min(period, tl))
        raise IMAQdxError("timeout while waiting for a new frame")

    def read_data_raw(self, size_bytes, mode, buffer_num=0):
        """
        Return raw bytes string from the given buffer number.

        `size_bytes` is the size of the data to read, and `mode` is the buffer selection mode
        (either a name from ``IMAQdx_lib.IMAQdxBufferNumberMode_enum``, or a raw value, which is passed as is).
        Return the result of the library call (the callers in this file treat it as a tuple ``(data, actual_buffer_num)``).
        """
        mode = IMAQdx_lib.IMAQdxBufferNumberMode_enum.get(mode, mode)
        return lib.IMAQdxGetImageData(self.sid, size_bytes, mode, buffer_num)

    def buffer_valid(self, buffer_num):
        """Check if the buffer with the given number can be read (i.e., the library reports the same buffer number back)"""
        # zero-size read, so no image data is transferred
        # NOTE(review): mode 2 is presumably the "buffer number" selection mode -- confirm against IMAQdx_lib
        return self.read_data_raw(0, 2, buffer_num=buffer_num)[1] == buffer_num

    def _get_oldest_valid_buffer(self, start, stop):
        """
        Find the oldest valid buffer between `start` and `stop` (inclusive) using bisection.

        Return ``None`` if the range is empty or the buffer `stop` is not valid.
        """
        if start > stop or not self.buffer_valid(stop):
            return None
        if self.buffer_valid(start):
            return start
        # invariant: start is invalid, stop is valid
        while start < stop - 1:
            mid = (start + stop) // 2
            start, stop = (start, mid) if self.buffer_valid(mid) else (mid, stop)
        return stop
class IMAQdxPhotonFocusCamera(IMAQdxCamera):
    """
    IMAQdx interface to a PhotonFocus camera.

    Args:
        name: interface name (can be learned by :func:`list_cameras`; usually, but not always, starts with ``"cam"``)
        mode: connection mode; can be ``"controller"`` (full control) or ``"listener"`` (only reading)
        default_visibility: default attribute visibility when listing attributes;
            can be ``"simple"``, ``"intermediate"`` or ``"advanced"`` (higher mode exposes more attributes).
        small_packet_size: if ``True``, automatically set up Ethernet packet size to 1500 bytes.
    """
    def __init__(self, name, mode="controller", default_visibility="simple", small_packet_size=True):
        # needs to be set before the parent constructor, which calls post_open()
        self.small_packet_size = small_packet_size
        IMAQdxCamera.__init__(self, name, mode=mode, default_visibility=default_visibility)
        self._add_settings_node("exposure", self.get_exposure, self.set_exposure)

    def post_open(self):
        """Set up the packet size after opening (only after the initialization is done, and only if ``small_packet_size==True``)"""
        if self.init_done and self.small_packet_size:
            self.set_value("AcquisitionAttributes/PacketSize", 1500, ignore_missing=True)

    def get_exposure(self):
        """Get current exposure (the ``ExposureTime`` attribute value scaled by ``1E-6``)"""
        return self.v["CameraAttributes/AcquisitionControl/ExposureTime"] * 1E-6

    def set_exposure(self, exposure):
        """
        Set current exposure (the ``ExposureTime`` attribute is set to `exposure` scaled by ``1E6``).

        The acquisition is paused while the value is applied. Return the new exposure value.
        """
        with self.pausing_acquisition():
            self.v["CameraAttributes/AcquisitionControl/ExposureTime"] = exposure * 1E6
        return self.get_exposure()

    def setup_acquisition(self, continuous, frames):
        """
        Setup acquisition mode.

        `continuous` determines whether acquisition runs continuously, or stops after the given number of frames
        (note that :meth:`IMAQdxCamera.acquisition_in_progress` would still return ``True`` in this case, even though new frames are no longer acquired).
        `frames` sets up number of frame buffers.
        """
        IMAQdxCamera.setup_acquisition(self, continuous, frames)
        if continuous:
            self.buffers_num = frames // 2  # seems to be the case

    def _get_bpp(self):
        """
        Get number of bytes per pixel for the current pixel format.

        Only unpacked ``"Mono<bits>"`` formats are supported; raise :exc:`IMAQdxError` for anything else.
        """
        pform = self.v["PixelFormat"]
        if pform.startswith("Mono"):
            depth = pform[4:]
            if depth.endswith("Packed"):
                raise IMAQdxError("packed pixel format isn't currently supported: {}".format(pform))
            try:
                # number of whole bytes needed to store the given number of bits
                return (int(depth) - 1) // 8 + 1
            except ValueError:
                pass
        raise IMAQdxError("unrecognized pixel format: {}".format(pform))

    def _bytes_to_frame(self, raw_data):
        """Convert raw frame data into a 2D numpy array with the indexing given by ``image_indexing``"""
        dim = self._get_data_dimensions_rc()
        bpp = self._get_bpp()
        dtype = data_format.DataFormat(bpp, "i", "<")
        # np.fromstring is deprecated for binary data; np.frombuffer returns a view of the data,
        # so copy it to get an independent writable array (same as np.fromstring used to return)
        img = np.frombuffer(raw_data, dtype=dtype.to_desc("numpy")).reshape((dim[0], dim[1])).copy()
        return image_utils.convert_image_indexing(img, "rct", self.image_indexing)

    def peek_frame(self, mode="last", buffer_num=0):
        """
        Read a frame without marking it as read.

        `mode` specifies frame selection mode: can be ``"first"`` (first available), ``"last"`` (last read),
        or ``"number"`` (index determined by `buffer_num`).
        Return tuple ``(frame, buffer_num)``.
        """
        raw_data, buffer_num = self.read_data_raw(self.v["PayloadSize"], mode=mode, buffer_num=buffer_num)
        return self._bytes_to_frame(raw_data), buffer_num

    def read_multiple_images(self, rng=None, peek=False, skip=False, missing_frame="skip"):
        """
        Read multiple images specified by `rng` (by default, all un-read images).

        `rng` can be a tuple ``(first, last)`` (inclusive; ``last`` is limited by the newest available frame),
        or a single number, in which case the range starts that many frames before the newest frame and ends on it.
        If ``peek==True``, return images but not mark them as read.
        If ``skip==True``, mark frames as read but don't read them (i.e., reading with ``peek==True`` and ``skip==True`` does nothing);
        in this case ``None`` is returned.
        `missing_frame` determines what to do with frames which are out of range (missing or lost):
        can be ``"none"`` (replacing them with ``None``), ``"zero"`` (replacing them with zero-filled frame),
        or ``"skip"`` (skipping them).
        Return a list of frames if ``missing_frame=="none"``, and a numpy array otherwise.
        """
        new_range = self.get_new_images_range()
        if rng is None:
            rng = new_range
            missing_frame = "skip"
        elif new_range:
            if isinstance(rng, (tuple, list)):
                rng = rng[0], min(rng[1], new_range[1])
            else:
                rng = new_range[1] - rng, new_range[1]
        else:
            rng = None
        frames = None if skip else []
        if rng is None:
            return frames
        if not skip:
            frame_bytes = self.v["PayloadSize"]
            dim = self.get_data_dimensions()
            for i in range(rng[0], rng[1] + 1):
                raw_data, buffer_num = self.read_data_raw(frame_bytes, mode="number", buffer_num=i)
                if buffer_num == i:
                    frames.append(self._bytes_to_frame(raw_data))
                # a different buffer number is returned if the requested frame is not available
                elif missing_frame == "none":
                    frames.append(None)
                elif missing_frame == "zero":
                    frames.append(np.zeros(dim))
        if not peek:
            self.frame_counter = max(self.frame_counter, rng[1] + 1)
        if frames is not None and missing_frame != "none":
            frames = np.asarray(frames)
        return frames

    def snap(self, timeout=20.):
        """
        Snap a single image (with preset image read mode parameters).

        `timeout` is the timeout for waiting for the frame.
        The acquisition is stopped and cleared afterwards, even if acquiring the frame fails.
        """
        self.refresh_acquisition()
        self.setup_acquisition(False, 1)
        try:
            self.start_acquisition()
            self.wait_for_frame(timeout=timeout)
            frame = self.read_multiple_images()[0]
        finally:
            self.stop_acquisition()
            self.clear_acquisition()
        return frame