from ...core.devio.interface import IDevice
from ...core.utils import py3, funcargparse, dictionary, general
from ...core.dataproc import image as image_utils
_depends_local=[".DCAM_lib","...core.devio.interface"]
import numpy as np
import collections
import ctypes
import contextlib
import time
from future.utils import raise_from
from .DCAM_lib import lib, DCAMLibError
class DCAMError(RuntimeError):
    """Generic Hamamatsu camera error."""
    pass
class DCAMTimeoutError(DCAMError):
    """Timeout while waiting."""
    pass
def get_cameras_number():
    """
    Get number of connected Hamamatsu cameras.

    Loads the library wrapper and initializes the DCAM API, returning whatever the initialization call reports.
    If the initialization raises :exc:`DCAMLibError`, return 0 instead of propagating the error.
    """
    lib.initlib()
    try:
        ncams = lib.dcamapi_init()
    except DCAMLibError:
        ncams = 0
    return ncams
# Module-wide count of currently opened cameras: incremented on open, decremented on close;
# the DCAM API is uninitialized when it drops back to zero.
_open_cameras=0
def restart_lib():
    """
    Uninitialize the DCAM API and reset the module-level counter of opened cameras.

    The counter is only reset if the uninitialization call succeeds.
    Camera objects which are still open are not notified or updated.
    """
    global _open_cameras
    lib.dcamapi_uninit()
    _open_cameras = 0
# When true, methods which build namedtuples (model data, frame info) return plain tuples instead.
# NOTE(review): presumably enabled when the module is used through RPyC -- confirm against the caller.
_rpyc=False
[docs]class DCAMCamera(IDevice):
def __init__(self, idx=0):
    """
    Create the camera object and open the connection.

    `idx` is the camera index handed to the library when opening; :meth:`open` raises :exc:`DCAMError`
    if it is not below the number of detected cameras.
    """
    IDevice.__init__(self)
    lib.initlib()
    self.idx = idx
    # connection state, filled in by open()
    self.handle = None
    self.dcamwait = None
    self.properties = {}
    # ring buffer and acquisition bookkeeping
    self._alloc_nframes = 0
    self._default_nframes = 100
    self._acq_mode = None
    self.open()
    self.image_indexing = "rct"
    self._last_frame = None
    # dictionary-like access to property values through get_value / set_value
    self.v = dictionary.ItemAccessor(self.get_value, self.set_value)
    # nodes are registered in exactly this order
    node_specs = [
        (self._add_full_info_node, ("model_data", self.get_model_data)),
        (self._add_status_node, ("properties", self.get_all_properties)),
        (self._add_settings_node, ("trigger_mode", self.get_trigger_mode, self.set_trigger_mode)),
        (self._add_settings_node, ("ext_trigger", self.get_ext_trigger_parameters, self.setup_ext_trigger)),
        (self._add_settings_node, ("exposure", self.get_exposure, self.set_exposure)),
        (self._add_settings_node, ("readout_speed", self.get_readout_speed, self.set_readout_speed)),
        (self._add_status_node, ("readout_time", self.get_readout_time)),
        (self._add_status_node, ("buffer_size", self.get_buffer_size)),
        (self._add_status_node, ("data_dimensions", self.get_data_dimensions)),
        (self._add_full_info_node, ("detector_size", self.get_detector_size)),
        (self._add_settings_node, ("roi", self.get_roi, self.set_roi)),
        (self._add_status_node, ("roi_limits", self.get_roi_limits)),
        (self._add_status_node, ("acq_status", self.get_status)),
        (self._add_status_node, ("transfer_info", self.get_transfer_info)),
    ]
    for add_node, node_args in node_specs:
        add_node(*node_args)
def open(self):
    """
    Open connection to the camera.

    Does nothing if the camera is already opened.
    Raise :exc:`DCAMError` if the camera index is not below the number of detected cameras.
    If the library raises :exc:`DCAMLibError` while opening, close whatever has been opened so far and re-raise it.
    """
    global _open_cameras
    if self.handle is not None:
        # re-opening would leak the existing handle and count this camera twice
        return
    ncams = get_cameras_number()
    if self.idx >= ncams:
        raise DCAMError("camera index {} is not available ({} cameras exist)".format(self.idx, ncams))
    try:
        self.handle = lib.dcamdev_open(self.idx)
        _open_cameras += 1
        self.dcamwait = lib.dcamwait_open(self.handle)
        self._update_properties_list()
    except DCAMLibError:
        self.close()
        # do not hide the failure: otherwise the caller ends up with a silently unopened camera
        raise
def close(self):
    """
    Close connection to the camera.

    Closes the wait handle (if one was opened) and the device handle, decrements the module-level counter
    of opened cameras, and uninitializes the DCAM API when that counter reaches zero.
    Safe to call on a camera which is not opened or is only partially opened.
    """
    global _open_cameras
    if self.handle:
        # dcamwait is still None if opening failed before the wait handle was created
        if self.dcamwait is not None:
            lib.dcamwait_close(self.dcamwait.hwait)
        lib.dcamdev_close(self.handle)
        _open_cameras -= 1
        if not _open_cameras:
            lib.dcamapi_uninit()
    self.handle = None
    # drop the wait handle as well, so that a closed camera does not keep a stale reference
    self.dcamwait = None
def is_opened(self):
    """Check if the device is connected (i.e., a device handle is currently stored)"""
    opened = self.handle is not None
    return opened
# Result type of get_model_data: vendor, model, serial number, and camera version strings
ModelData=collections.namedtuple("ModelData",["vendor","model","serial_number","camera_version"])
def get_model_data(self):
    """
    Get camera model data.

    Return tuple ``(vendor, model, serial_number, camera_version)``
    (a :class:`ModelData` namedtuple, or a plain tuple if the module-level ``_rpyc`` flag is set).
    """
    # library string IDs, in the order of the ModelData fields
    # NOTE(review): presumably the DCAM_IDSTR_VENDOR/MODEL/CAMERAID/CAMERAVERSION constants -- confirm against the SDK
    string_ids = (0x04000103, 0x04000104, 0x04000102, 0x04000105)
    fields = [py3.as_str(lib.dcamdev_getstring(self.handle, sid)) for sid in string_ids]
    model_data = self.ModelData(*fields)
    if _rpyc:
        return tuple(model_data)
    return model_data
class Property(object):
    """
    Camera property handler.

    Stores the camera handle, the property name and ID, and its limits (`min`, `max`, `step`),
    default value and unit as reported by the library; provides access to the current value.
    """
    def __init__(self, cam_handle, name, id, min, max, step, default, unit):
        object.__init__(self)
        self.cam_handle, self.name, self.id = cam_handle, name, id
        self.min, self.max, self.step = min, max, step
        self.default, self.unit = default, unit

    def as_text(self, value=None):
        """Get property value as text (by default, current value)"""
        shown = self.get_value() if value is None else value
        return lib.dcamprop_getvaluetext(self.cam_handle, self.id, shown)

    def get_value(self):
        """Get current property value"""
        return lib.dcamprop_getvalue(self.cam_handle, self.id)

    def set_value(self, value):
        """Set property value; return the result of the library set-and-get call"""
        return lib.dcamprop_setgetvalue(self.cam_handle, self.id, value)

    def __repr__(self):
        template = "{}(name='{}', id={}, min={}, max={}, unit={})"
        return template.format(self.__class__.__name__, self.name, self.id, self.min, self.max, self.unit)
def list_properties(self):
    """
    Return list of all available properties.

    Each element is a :class:`Property` built from the property ID, name and attributes reported by the library.
    """
    prop_ids = lib.dcamprop_getallids(self.handle, 0)
    # query all names first and all attributes second
    prop_names = [lib.dcamprop_getname(self.handle, pid) for pid in prop_ids]
    prop_attrs = [lib.dcamprop_getattr(self.handle, pid) for pid in prop_ids]
    result = []
    for pid, pname, attr in zip(prop_ids, prop_names, prop_attrs):
        prop = self.Property(self.handle, pname, pid,
                             attr.valuemin, attr.valuemax, attr.valuestep, attr.valuedefault, attr.iUnit)
        result.append(prop)
    return result
def get_all_properties(self):
    """
    Get values of all available properties.

    Return a dictionary ``{name: {"value": value, "text_value": text}}``, where names are lowercased
    with spaces replaced by underscores. ``"text_value"`` is omitted for properties whose text
    representation can not be obtained (the library raises :exc:`DCAMLibError`).
    """
    result = {}
    for prop in self.list_properties():
        key = py3.as_str(prop.name).lower().replace(" ", "_")
        entry = {"value": prop.get_value()}
        try:
            entry["text_value"] = prop.as_text()
        except DCAMLibError:
            # text representation is optional; keep only the numerical value
            pass
        result[key] = entry
    return result
def _update_properties_list(self):
    """Add all currently available properties to ``self.properties``, keyed by their names (existing entries are kept or overwritten)"""
    self.properties.update((py3.as_str(prop.name), prop) for prop in self.list_properties())
def get_value(self, name, error_on_missing=True, default=None):
    """
    Get value of a property with the given name.

    If the value doesn't exist and ``error_on_missing==True``, raise :exc:`DCAMError`; otherwise, return `default`.
    """
    if name in self.properties:
        return self.properties[name].get_value()
    if error_on_missing:
        raise DCAMError("can't find property {}".format(name))
    return default
def set_value(self, name, value, error_on_missing=True):
    """
    Set value of a property with the given name.

    Return the result of the property's ``set_value`` call.
    If the value doesn't exist and ``error_on_missing==True``, raise :exc:`DCAMError`; otherwise, do nothing.
    """
    if name in self.properties:
        return self.properties[name].set_value(value)
    if error_on_missing:
        raise DCAMError("can't find property {}".format(name))
    return None
def set_trigger_mode(self, mode):
    """
    Set trigger mode.

    Can be ``"int"`` (internal), ``"ext"`` (external), or ``"software"`` (software trigger).
    Return the trigger mode read back from the camera.
    """
    source_codes = {"int": 1, "ext": 2, "software": 3}
    funcargparse.check_parameter_range(mode, "mode", source_codes.keys())
    code = source_codes[mode]
    self.set_value("TRIGGER SOURCE", code)
    return self.get_trigger_mode()
def get_trigger_mode(self):
    """
    Get trigger mode.

    Can be ``"int"`` (internal), ``"ext"`` (external), or ``"software"`` (software trigger).
    Raise :exc:`DCAMError` if the camera reports any other trigger source.
    """
    mode_names = {1: "int", 2: "ext", 3: "software"}
    tm = int(self.get_value("TRIGGER SOURCE"))
    if tm not in mode_names:
        raise DCAMError("unknown trigger mode: {}".format(tm))
    return mode_names[tm]
def setup_ext_trigger(self, invert=False, delay=0.):
    """
    Setup external trigger (inversion and delay).

    The delay is silently ignored if the camera has no trigger delay property.
    """
    if invert:
        polarity = 2
    else:
        polarity = 1
    self.set_value("TRIGGER POLARITY", polarity)
    self.set_value("TRIGGER DELAY", delay, error_on_missing=False)
def get_ext_trigger_parameters(self):
    """
    Return external trigger parameters as a tuple ``(invert, delay)``.

    ``delay`` is ``None`` if the camera has no trigger delay property.
    """
    polarity = self.get_value("TRIGGER POLARITY")
    delay = self.get_value("TRIGGER DELAY", error_on_missing=False)
    return (polarity == 2), delay
def send_software_trigger(self):
    """Send software trigger signal (fire the trigger through the library)"""
    cam_handle = self.handle
    lib.dcamcap_firetrigger(cam_handle)
def set_exposure(self, exposure):
    """Set camera exposure and return the exposure read back from the camera"""
    self.set_value("EXPOSURE TIME", exposure)
    actual = self.get_exposure()
    return actual
def get_exposure(self):
    """Get current exposure"""
    exposure = self.get_value("EXPOSURE TIME")
    return exposure
def set_readout_speed(self, speed="fast"):
    """
    Set readout speed (can be ``"fast"`` or ``"slow"``).

    Any value other than ``"slow"`` is treated as ``"fast"``.
    Ignored if the camera has no readout speed property. Return the readout speed read back.
    """
    if speed == "slow":
        code = 1
    else:
        code = 2
    self.set_value("READOUT SPEED", code, error_on_missing=False)
    return self.get_readout_speed()
def get_readout_speed(self):
    """
    Get current readout speed (``"fast"`` or ``"slow"``).

    Cameras without a readout speed property are reported as ``"fast"``.
    """
    code = self.get_value("READOUT SPEED", default=2, error_on_missing=False)
    if code == 2:
        return "fast"
    return "slow"
def get_readout_time(self):
    """Get current readout time"""
    readout_time = self.get_value("TIMING READOUT TIME")
    return readout_time
def get_defect_correct_mode(self):
    """
    Check if the defect pixel correction mode is on.

    Cameras without this property are reported as having it off.
    """
    mode = self.get_value("DEFECT CORRECT MODE", error_on_missing=False, default=1)
    return mode == 2
def set_defect_correct_mode(self, enabled=True):
    """
    Enable or disable the defect pixel correction mode.

    Ignored if the camera has no such property. Return the mode read back from the camera.
    """
    if enabled:
        mode = 2
    else:
        mode = 1
    self.set_value("DEFECT CORRECT MODE", mode, error_on_missing=False)
    return self.get_defect_correct_mode()
def _allocate_buffer(self, nframes):
    """
    Release the current frame buffer and allocate a new one for `nframes` frames.

    If `nframes` is zero (or otherwise false), the buffer is only released.
    """
    self._deallocate_buffer()
    if not nframes:
        self._alloc_nframes = nframes
        return
    lib.dcambuf_alloc(self.handle, nframes)
    self._alloc_nframes = nframes
def _deallocate_buffer(self):
    """Release the frame buffer in the library and reset the buffer size and the last read frame index"""
    lib.dcambuf_release(self.handle, 0)
    self._alloc_nframes, self._last_frame = 0, None
def _read_buffer(self, buffer):
    """Lock the frame at the given buffer index and return the frame structure provided by the library"""
    frame = lib.dcambuf_lockframe(self.handle, buffer)
    return frame
@contextlib.contextmanager
def _reset_buffers(self):
    """
    Context manager which releases the frame buffer for the duration of the ``with`` block.

    On exit (including on error) a buffer of the previous size is allocated again.
    """
    saved_nframes = self._alloc_nframes
    self._deallocate_buffer()
    try:
        yield
    finally:
        self._allocate_buffer(saved_nframes)
def _buffer_to_array(self, buffer):
    """
    Convert a frame structure into a 2D numpy array.

    `buffer` must provide ``bpp`` (bytes per pixel: 1, 2 or 4), ``btot`` (total size in bytes),
    ``buf`` (memory address of the data), ``height`` and ``width``.
    The data is copied out of the buffer memory, reshaped to ``(height, width)`` and converted
    from ``"rct"`` indexing into ``self.image_indexing``.
    Raise :exc:`DCAMError` for an unsupported number of bytes per pixel.
    """
    elem_types = {1: ctypes.c_uint8, 2: ctypes.c_uint16, 4: ctypes.c_uint32}
    bpp = int(buffer.bpp)
    if bpp not in elem_types:
        raise DCAMError("can't convert data with {} BPP into an array".format(bpp))
    ct = elem_types[bpp] * (buffer.btot // bpp)
    data = ct.from_address(buffer.buf)
    # np.array makes a copy, so the result does not alias the buffer memory
    img = np.array(data).reshape((buffer.height, buffer.width))
    return image_utils.convert_image_indexing(img, "rct", self.image_indexing)
def get_buffer_size(self):
    """Get the size of the allocated ring buffer in frames (0 if no buffer is allocated)"""
    nframes = self._alloc_nframes
    return nframes
# Frame metadata returned by get_frame: frame stamp, timestamp in microseconds, camera stamp,
# frame position on the sensor (left, top), and pixel type
FrameInfo=collections.namedtuple("FrameInfo",["framestamp","timestamp_us","camerastamp","left","top","pixeltype"])
def get_frame(self, buffer, return_info=False):
    """
    Get a frame at the given buffer index.

    If ``return_info==True``, return tuple ``(data, info)``, where info is the :class:`FrameInfo` instance
    describing frame index and timestamp, camera stamp, frame location on the sensor, and pixel type
    (a plain tuple if the module-level ``_rpyc`` flag is set).
    Does not advance the read frames counter.
    """
    sframe = self._read_buffer(buffer)
    # the two timestamp parts are combined into a single microsecond value
    timestamp_us = sframe.timestamp[0] * 10**6 + sframe.timestamp[1]
    info = self.FrameInfo(sframe.framestamp, timestamp_us, sframe.camerastamp,
                          sframe.left, sframe.top, sframe.pixeltype)
    if _rpyc:
        info = tuple(info)
    data = self._buffer_to_array(sframe)
    if return_info:
        return (data, info)
    return data
def get_data_dimensions(self):
    """
    Get the current data dimension (taking ROI and binning into account).

    The image width and height are read from the camera and arranged according to ``self.image_indexing``.
    """
    width = int(self.get_value("IMAGE WIDTH"))
    height = int(self.get_value("IMAGE HEIGHT"))
    return image_utils.convert_shape_indexing((width, height), "xy", self.image_indexing)
def get_detector_size(self):
    """
    Get camera detector size (in pixels) as a tuple ``(width, height)``.

    Taken as the maximal allowed horizontal and vertical subarray sizes.
    """
    width = int(self.properties["SUBARRAY HSIZE"].max)
    height = int(self.properties["SUBARRAY VSIZE"].max)
    return (width, height)
def get_roi(self):
    """
    Get current ROI.

    Return tuple ``(hstart, hend, vstart, vend, bin)`` (binning is the same for both axes).
    The end values are calculated as the subarray position plus the subarray size.
    """
    def read_int(name):
        return int(self.get_value(name))
    hstart = read_int("SUBARRAY HPOS")
    hend = hstart + read_int("SUBARRAY HSIZE")
    vstart = read_int("SUBARRAY VPOS")
    vend = vstart + read_int("SUBARRAY VSIZE")
    binning = read_int("BINNING")
    return (hstart, hend, vstart, vend, binning)
def set_roi(self, hstart=0, hend=None, vstart=0, vend=None, bin=1):
    """
    Set current ROI.

    By default, all non-supplied parameters take extreme values. Binning is the same for both axes.
    Start and end positions are rounded down to multiples of the minimal subarray size along the corresponding axis;
    the resulting size is at least that minimal size. Binning of 3 is replaced by 2, and binning is limited by its maximal value.
    The frame buffer is released while the parameters are changed and allocated again afterwards.
    Return the resulting ROI as read back from the camera.
    """
    def round_down(value, step):
        return (value // step) * step
    with self._reset_buffers():
        self.set_value("SUBARRAY MODE", 2)
        if not hend:
            hend = self.properties["SUBARRAY HSIZE"].max
        if not vend:
            vend = self.properties["SUBARRAY VSIZE"].max
        min_roi, max_roi = self.get_roi_limits()
        if bin == 3:
            bin = 2
        hstep, vstep = min_roi[2], min_roi[3]
        # horizontal axis: size is first set to the minimum, then position, then the final size
        # NOTE(review): presumably so that the new position is accepted regardless of the previous size -- confirm
        hstart = round_down(hstart, hstep)
        hend = round_down(hend, hstep)
        self.set_value("SUBARRAY HSIZE", hstep)
        self.set_value("SUBARRAY HPOS", hstart)
        self.set_value("SUBARRAY HSIZE", max(hend - hstart, hstep))
        # vertical axis: same sequence
        vstart = round_down(vstart, vstep)
        vend = round_down(vend, vstep)
        self.set_value("SUBARRAY VSIZE", vstep)
        self.set_value("SUBARRAY VPOS", round_down(vstart, vstep))
        self.set_value("SUBARRAY VSIZE", max(vend - vstart, vstep))
        self.set_value("BINNING", min(bin, max_roi[4]))
    return self.get_roi()
def get_roi_limits(self):
    """
    Get the minimal and maximal ROI parameters.

    Return tuple ``(min_roi, max_roi)``, where each element is in turn 5-tuple
    ``(hpos, vpos, hsize, vsize, bin)`` built from the limits of the corresponding properties.
    The minimal positions are always reported as 0.
    """
    names = ("SUBARRAY HPOS", "SUBARRAY VPOS", "SUBARRAY HSIZE", "SUBARRAY VSIZE", "BINNING")
    lower = tuple(self.properties[name].min for name in names)
    upper = tuple(self.properties[name].max for name in names)
    min_roi = (0, 0) + lower[2:]
    return (min_roi, upper)
def start_acquisition(self, mode="sequence", nframes=None):
    """
    Start acquisition.

    `mode` can be either ``"snap"`` (single frame or sequence acquisition) or ``"sequence"`` (continuous acquisition).
    `nframes` determines number of frames to acquire in ``"snap"`` mode, or size of the ring buffer in the ``"sequence"`` mode (by default, 100).
    If `nframes` is not supplied and a buffer is already allocated, that buffer is reused.
    """
    funcargparse.check_parameter_range(mode, "mode", ["sequence", "snap"])
    if nframes:
        self._allocate_buffer(nframes)
    elif not self._alloc_nframes:
        self._allocate_buffer(self._default_nframes)
    # library capture mode code: 0 for "snap", -1 for "sequence"
    capture_mode = 0 if mode == "snap" else -1
    lib.dcamcap_start(self.handle, capture_mode)
    self._last_frame = -1
    # remembered so that pausing_acquisition can restart with the same arguments
    self._acq_mode = (mode, nframes)
def stop_acquisition(self):
    """Stop acquisition and forget the stored acquisition parameters"""
    cam_handle = self.handle
    lib.dcamcap_stop(cam_handle)
    self._acq_mode = None
@contextlib.contextmanager
def pausing_acquisition(self):
    """
    Context manager which temporarily pauses acquisition during execution of ``with`` block.

    Useful for applying certain settings which can't be changed during the acquisition.
    If an acquisition was running on entry, it is restarted with the same parameters on exit (including on error).
    """
    resume_args = self._acq_mode
    try:
        self.stop_acquisition()
        yield
    finally:
        if resume_args:
            self.start_acquisition(*resume_args)
def get_status(self):
    """
    Get acquisition status.

    Can be ``"busy"`` (capturing in progress), ``"ready"`` (ready for capturing),
    ``"stable"`` (not prepared for capturing), ``"unstable"`` (can't be prepared for capturing), or ``"error"`` (some other error).
    """
    # position in the list corresponds to the numerical status code returned by the library
    status_names = ["error", "busy", "ready", "stable", "unstable"]
    code = lib.dcamcap_status(self.handle)
    return status_names[code]
def get_transfer_info(self):
    """
    Get frame transfer info.

    Return tuple ``(last_buff, frame_count)``, where ``last_buff`` is the index of the last filled buffer,
    and ``frame_count`` is the total number of acquired frames.
    """
    info = lib.dcamcap_transferinfo(self.handle, 0)
    return tuple(info)
def get_new_images_range(self):
    """
    Get the range of the new images.

    Return tuple ``(first, last)`` with images range (inclusive).
    If no images are available (or no acquisition has been started), return ``None``.
    If some images in the buffer were overwritten, exclude them from the range.
    """
    last_read = self._last_frame
    if last_read is None:
        return None
    frame_count = self.get_transfer_info()[1]
    # the oldest frame still available is limited by the ring buffer size
    first_unread = last_read + 1
    first_in_buffer = frame_count - self.get_buffer_size()
    oldest_frame = first_unread if first_unread >= first_in_buffer else first_in_buffer
    if oldest_frame == frame_count:
        return None
    return oldest_frame, frame_count - 1
def read_multiple_images(self, rng=None, return_info=False, peek=False):
    """
    Read multiple images specified by `rng` (by default, all un-read images).

    `rng` is a tuple ``(first, last)`` of frame indices (inclusive).
    If ``return_info==True``, return tuple ``(data, info)``, where info is a tuple of :class:`FrameInfo` instances
    describing frame index and timestamp, camera stamp, frame location on the sensor, and pixel type.
    If ``peek==True``, return images but not mark them as read.
    If there are no images to read, the data is an empty array with shape ``(0, dim[0], dim[1])``
    and (for ``return_info==True``) the info is an empty tuple.
    """
    if rng is None:
        rng = self.get_new_images_range()
    if rng is None:
        # dimensions are only needed (and queried) to shape the empty result
        dim = self.get_data_dimensions()
        images = np.zeros((0, dim[0], dim[1]))
        # keep the return shape consistent, so that callers can always unpack (images, infos)
        return (images, ()) if return_info else images
    # frame indices are mapped onto ring buffer positions
    frames = [self.get_frame(n % self._alloc_nframes, return_info=True) for n in range(rng[0], rng[1] + 1)]
    images, infos = list(zip(*frames))
    images = np.array(images)
    if not peek:
        self._last_frame = max(self._last_frame, rng[1])
    return (images, infos) if return_info else images
def wait_for_frame(self, since="lastread", timeout=20., period=1E-3):
    """
    Wait for a new camera frame.

    `since` specifies what constitutes a new frame.
    Can be ``"lastread"`` (wait for a new frame after the last read frame), ``"lastwait"`` (wait for a new frame after last :meth:`wait_for_frame` call),
    or ``"now"`` (wait for a new frame acquired after this function call).
    `period` is the pause between consecutive library polls.
    If `timeout` is exceeded, raise :exc:`DCAMTimeoutError`.
    """
    funcargparse.check_parameter_range(since, "since", {"lastread", "lastwait", "now"})
    if since == "lastwait":
        countdown = general.Countdown(timeout)
        while True:
            try:
                # default timeout doesn't work, it's either 0 (for timeout=0) or infinite (for timeout>0),
                # so the library is polled with zero timeout and the countdown is handled here
                lib.dcamwait_start(self.dcamwait.hwait, 0x02, 0)
            except DCAMLibError as err:
                if err.text_code != "DCAMERR_TIMEOUT":
                    raise
                if countdown.passed():
                    raise_from(DCAMTimeoutError, None)
            else:
                return
            time.sleep(period)
    if since == "lastread":
        # every individual wait gets the full timeout
        while self.get_new_images_range() is None:
            self.wait_for_frame(since="lastwait", timeout=timeout)
        return
    # since == "now": remember the newest frame available so far and wait until a newer one shows up
    current = self.get_new_images_range()
    last_img = current[1] if current else None
    while True:
        self.wait_for_frame(since="lastwait", timeout=timeout)
        current = self.get_new_images_range()
        if current and (last_img is None or current[1] > last_img):
            return
def snap(self, nframes=None, return_info=False):
    """
    Snap a single image or a sequence of `nframes` images (with preset image read mode parameters).

    If `nframes` is ``None``, acquire one frame and return it on its own (together with its info if ``return_info==True``);
    otherwise, return the result of :meth:`read_multiple_images` for all `nframes` frames.
    """
    readframes = nframes or 1
    self.start_acquisition("snap", nframes=readframes)
    expected_range = (0, readframes - 1)
    while self.get_new_images_range() != expected_range:
        self.wait_for_frame(since="lastwait")
    frames = self.read_multiple_images(return_info=return_info)
    if nframes is not None:
        return frames
    # single-frame request: strip the sequence axis
    if return_info:
        return (frames[0][0], frames[1][0])
    return frames[0]