from ...core.utils import dictionary, py3, general
from ...core.devio import data_format, interface
from ...core.dataproc import image as image_utils
import numpy as np
import contextlib
import time
import collections
import re
from .IMAQdx import IMAQdxPhotonFocusCamera as PhotonFocusIMAQdxCamera
from .IMAQ import IMAQCamera, IMAQError, lib as IMAQ_lib
from . import pfcam_lib
# Module-level shortcuts to the pfcam wrapper library and its error class.
lib = pfcam_lib.lib
PfcamError = pfcam_lib.PfcamLibError

# Try to initialize the pfcam library right away; a failure to import/load it
# is deliberately ignored here, so that the module itself stays importable.
try:
    lib.initlib()
except (ImportError, OSError):
    pass
class PFGenericError(RuntimeError):
    """
    Generic error raised by the PhotonFocus (pfcam) camera code in this module.

    Used for property-related problems: missing, unsupported, private,
    non-readable or non-writable properties, and non-command properties being called.
    """
_depends_local=[".pfcam_lib",".IMAQ",".IMAQdx","...core.devio.interface"]
class PfcamProperty(object):
    """
    Object representing a pfcam camera property.

    Allows to query and set values and get additional information.
    Usually created automatically by an :class:`PhotonFocusIMAQCamera` instance, but could be created manually.

    Args:
        port: pfcam port number of the camera which owns the property
        name: full (dot-separated) property name

    Attributes:
        name: attribute name
        readable (bool): whether property is readable
        writable (bool): whether property is writable
        is_command (bool): whether property is a command
        min (float or int): minimal property value (if applicable)
        max (float or int): maximal property value (if applicable)
        values: list of possible property values (if applicable)

    Raises:
        PFGenericError: if the property does not exist, has an unsupported type, or is private.
    """
    # property types which have ``.Min`` / ``.Max`` sub-properties
    _numeric_types = ("PF_INT", "PF_UINT", "PF_FLOAT")

    def __init__(self, port, name):
        object.__init__(self)
        self.port = port
        self.name = py3.as_str(name)
        self._token = lib.pfProperty_ParseName(port, self.name)
        if self._token == pfcam_lib.PfInvalidToken:
            raise PFGenericError("property {} doesn't exist".format(name))
        self._type = pfcam_lib.lib.get_ptype_dicts(port)[0][lib.pfProperty_GetType(port, self._token)]
        if self._type not in pfcam_lib.ValuePropertyTypes | {"PF_COMMAND"}:
            raise PFGenericError("property type {} not supported".format(self._type))
        self._flags = lib.pfProperty_GetFlags(port, self._token)
        if self._flags & 0x02:  # flag bit marking a private property
            raise PFGenericError("property {} is private".format(self.name))
        self.is_command = self._type == "PF_COMMAND"
        # flag bits 0x20 / 0x10 disable reading / writing; commands are neither readable nor writable
        self.readable = not (self._flags & 0x20 or self.is_command)
        self.writable = not (self._flags & 0x10 or self.is_command)
        # limits stay None for non-numeric properties
        self.min = self.max = None
        self.update_minmax()
        # two independent dictionaries (name -> integer value and back); never share one object between them
        self._values_dict = {}
        self._values_dict_inv = {}
        self.values = None
        if self._type == "PF_MODE":
            nodes = lib.collect_properties(port, self._token, backbone=False)
            for tok, val in nodes:
                val = py3.as_str(val)
                if lib.pfProperty_GetType(port, tok) == 2:  # integer token, means one of possible values
                    ival = lib.pfDevice_GetProperty(port, tok)
                    self._values_dict[val] = ival
                    self._values_dict_inv[ival] = val
            self.values = list(self._values_dict)

    def update_minmax(self):
        """Update minimal and maximal property limits (only affects numeric properties)"""
        if self._type in self._numeric_types:
            self.min = lib.get_property_by_name(self.port, self.name + ".Min")
            self.max = lib.get_property_by_name(self.port, self.name + ".Max")

    def truncate_value(self, value):
        """
        Truncate value to lie within property limits.

        The limits are refreshed first; a limit equal to ``None`` is not applied.
        """
        self.update_minmax()
        if self.min is not None and value < self.min:
            value = self.min
        if self.max is not None and value > self.max:
            value = self.max
        return value

    def get_value(self, enum_as_str=True):
        """
        Get property value.

        If ``enum_as_str==True``, return enum-style values as strings; otherwise, return corresponding integer values.

        Raises:
            PFGenericError: if the property is not readable.
        """
        if not self.readable:
            raise PFGenericError("property {} is not readable".format(self.name))
        val = lib.pfDevice_GetProperty(self.port, self._token)
        if self._type == "PF_MODE" and enum_as_str:
            val = self._values_dict_inv[val]
        return val

    def set_value(self, value, truncate=True):
        """
        Set property value.

        If ``truncate==True``, automatically truncate value to lie within allowed range.
        String values of enum-style (``PF_MODE``) properties are translated into the corresponding integers.
        Return the value read back from the camera, or ``None`` if the property is write-only.

        Raises:
            PFGenericError: if the property is not writable.
        """
        if not self.writable:
            raise PFGenericError("property {} is not writable".format(self.name))
        if truncate:
            value = self.truncate_value(value)
        if isinstance(value, py3.anystring) and self._type == "PF_MODE":
            value = self._values_dict[value]
        lib.pfDevice_SetProperty(self.port, self._token, value)
        # a write-only property can not be read back; don't fail after the write has already succeeded
        return self.get_value() if self.readable else None

    def call_command(self, arg=0):
        """If property is a command, call it with a given argument; otherwise, raise an error."""
        if not self.is_command:
            raise PFGenericError("{} is not a PF_COMMAND property".format(self.name))
        lib.pfDevice_SetProperty(self.port, self._token, arg)

    def __repr__(self):
        return "{}({})".format(self.__class__.__name__, self.name)
def query_camera_name(port):
    """
    Query cameras name at a given port in pfcam interface.

    Open the device at the given port, read the name of its root property, and close the device again.
    Return the name as a string, or ``None`` if the name is unavailable or any pfcam call fails.
    The device is closed on every exit path, including non-pfcam exceptions.
    """
    lib.pfPortInit()
    try:
        lib.pfDeviceOpen(port)
        raw_name = lib.pfProperty_GetName(port, lib.pfDevice_GetRoot(port))
        return py3.as_str(raw_name) if raw_name is not None else None
    except PfcamError:
        return None
    finally:
        # best-effort close: a failure here must not mask the result obtained above
        try:
            lib.pfDeviceClose(port)
        except PfcamError:
            pass
def list_cameras(supported=False):
    """
    List all cameras available through pfcam interface.

    Return a list of tuples ``(port, info)``, where ``info`` is the port description returned by the pfcam library.
    If ``supported==True``, only return cameras which support pfcam protocol
    (i.e., the ones for which :func:`query_camera_name` returns a name).
    """
    candidates = range(lib.pfPortInit())
    if supported:
        # filter first, query port info afterwards (same call order as a two-pass scan)
        candidates = [port for port in candidates if query_camera_name(port) is not None]
    cameras = []
    for port in candidates:
        cameras.append((port, lib.pfPortInfo(port)))
    return cameras
class PhotonFocusIMAQCamera(IMAQCamera):
    """
    IMAQ+PFcam interface to a PhotonFocus camera.

    Frame transfer is handled by the parent :class:`IMAQCamera`, while camera properties
    are accessed through the pfcam library.

    Args:
        imaq_name: IMAQ interface name (can be learned by :func:`.IMAQ.list_cameras`; usually, but not always, starts with ``"img"``)
        pfcam_port: port number for pfcam interface (can be learned by :func:`list_cameras`; port number is the first element of the camera data tuple)
    """
    def __init__(self, imaq_name="img0", pfcam_port=0):
        self.pfcam_port = pfcam_port
        self.pfcam_opened = False
        # item-style accessor: ``cam.v[name]`` reads and ``cam.v[name]=value`` writes a property
        self.v = dictionary.ItemAccessor(self.get_value, self.set_value)
        try:
            IMAQCamera.__init__(self, imaq_name)
        except Exception:
            # make sure that nothing stays half-opened if the parent initialization fails
            self.close()
            raise
        self._add_full_info_node("model_data", self.get_model_data)
        self._add_full_info_node("interface_name", lambda: self.name)
        self._add_full_info_node("pfcam_port", lambda: self.pfcam_port)
        self._add_status_node("properties", self.get_all_properties)
        self._add_settings_node("trigger_interleave", self.get_trigger_interleave, self.set_trigger_interleave)
        self._add_settings_node("cfr", self.is_CFR_enabled, self.enable_CFR)
        self._add_settings_node("status_line", self.is_status_line_enabled, self.enable_status_line)
        self._add_settings_node("bl_offset", self.get_black_level_offset, self.set_black_level_offset)
        self._add_settings_node("exposure", self.get_exposure, self.set_exposure)
        self._add_settings_node("frame_time", self.get_frame_time, self.set_frame_time)

    def setup_max_baudrate(self):
        """
        Select the highest baud rate supported by the pfcam port.

        Rates are tried from the highest to the lowest, and the first supported one is set.
        Any pfcam error raised in the process is ignored, leaving the current rate untouched.
        """
        brs = [115200, 57600, 38400, 19200, 9600, 4800, 2400, 1200]
        try:
            for br in brs:
                if lib.pfIsBaudRateSupported(self.pfcam_port, br):
                    lib.pfSetBaudRate(self.pfcam_port, br)
                    return
        except PfcamError:  # pfSetBaudRate sometimes raises unknown error
            pass

    def open(self):
        """Open connection to the camera"""
        IMAQCamera.open(self)
        # NOTE(review): the whole pfcam setup is assumed to belong to this branch — confirm against the upstream file
        if not self.pfcam_opened:
            lib.pfPortInit()
            lib.pfDeviceOpen(self.pfcam_port)
            self.pfcam_opened = True
            self.setup_max_baudrate()
            # property tree uses "/" as a separator instead of the "." used by pfcam
            self.properties = dictionary.Dictionary({p.name.replace(".", "/"): p for p in self.list_properties()})
            self._update_imaq()

    def close(self):
        """Close connection to the camera"""
        try:
            IMAQCamera.close(self)
        finally:
            # release the pfcam port even if closing the IMAQ side failed
            if self.pfcam_opened:
                lib.pfDeviceClose(self.pfcam_port)
                self.pfcam_opened = False

    def post_open(self):
        """Action to automatically call on opening"""
        pass

    def list_properties(self, root=""):
        """
        List all properties at a given root.

        Return list of :class:`PfcamProperty` objects, which allow querying and settings values
        and getting additional information (limits, values).
        Properties which can not be represented (missing, unsupported type, or private) are skipped.
        """
        root = root.replace("/", ".")
        pfx = root
        if root == "":
            root = lib.pfDevice_GetRoot(self.pfcam_port)
        else:
            root = lib.pfProperty_ParseName(self.pfcam_port, root)
        props = lib.collect_properties(self.pfcam_port, root, pfx=pfx, include_types=pfcam_lib.ValuePropertyTypes | {"PF_COMMAND"})
        pfprops = []
        for (_, name) in props:
            try:
                pfprops.append(PfcamProperty(self.pfcam_port, name))
            except PFGenericError:
                pass
        return pfprops

    def get_value(self, name, default=None):
        """
        Get value of the property with a given name.

        If `name` is missing and `default` is not ``None``, return `default`.
        If `name` points at a branch of the property tree, return values of all properties in this branch.
        """
        name = name.replace(".", "/")
        if (default is not None) and (name not in self.properties):
            return default
        if self.properties.is_dictionary(self.properties[name]):
            return self.get_all_properties(root=name)
        v = self.properties[name].get_value()
        return v

    def _get_value_direct(self, name):
        """Get property value directly from the pfcam library, bypassing the cached property tree"""
        return lib.get_property_by_name(self.pfcam_port, name)

    def set_value(self, name, value, ignore_missing=False, truncate=True):
        """
        Set value of the property with a given name.

        If ``truncate==True``, truncate value to lie within property range.
        If ``ignore_missing==True`` and the property is missing, do nothing.
        If `name` points at a branch of the property tree, `value` is treated as a set of values for this branch.
        """
        name = name.replace(".", "/")
        if (name in self.properties) or (not ignore_missing):
            if self.properties.is_dictionary(self.properties[name]):
                self.set_all_properties(value, root=name)
            else:
                self.properties[name].set_value(value, truncate=truncate)

    def call_command(self, name, arg=0, ignore_missing=False):
        """If property is a command, call it with a given argument; otherwise, raise an error."""
        name = name.replace(".", "/")
        if (name in self.properties) or (not ignore_missing):
            self.properties[name].call_command(arg=arg)

    def get_all_properties(self, root="", as_dict=False):
        """
        Get values of all properties with the given `root`.

        Only readable properties are included.
        If ``as_dict==True``, return ``dict`` object; otherwise, return :class:`.Dictionary` object.
        """
        settings = self.properties[root].copy().filter_self(lambda a: a.readable).map_self(lambda a: a.get_value())
        return settings.as_dict(style="flat") if as_dict else settings

    def set_all_properties(self, settings, root="", truncate=True):
        """
        Set values of all properties with the given `root`.

        Entries which are missing from the property tree or are not writable are silently skipped.
        If ``truncate==True``, truncate value to lie within attribute range.
        """
        settings = dictionary.as_dict(settings, style="flat", copy=False)
        for k in settings:
            if k in self.properties[root] and self.properties[root, k].writable:
                self.properties[root, k].set_value(settings[k], truncate=truncate)

    ModelData = collections.namedtuple("ModelData", ["model", "serial_number"])

    def get_model_data(self):
        """
        Get camera model data.

        Return tuple ``(model, serial_number)``; the serial number is ``0`` if the camera does not report it.
        """
        model = py3.as_str(lib.pfProperty_GetName(self.pfcam_port, lib.pfDevice_GetRoot(self.pfcam_port)))
        serial_number = self.get_value("Header.Serial", 0)
        return self.ModelData(model, serial_number)

    def get_detector_size(self):
        """Get detector size as a tuple ``(width, height)`` (upper limits of the window width and height properties)"""
        return self.properties["Window/W"].max, self.properties["Window/H"].max

    def _get_pf_data_dimensions_rc(self):
        """Get frame dimensions ``(rows, columns)`` as reported by the camera window properties"""
        return self.v["Window/H"], self.v["Window/W"]

    def _update_imaq(self):
        """Make the IMAQ ROI cover exactly the window currently set up in the camera"""
        r, c = self._get_pf_data_dimensions_rc()
        IMAQCamera.set_roi(self, 0, c, 0, r)

    def get_roi(self):
        """
        Get current ROI.

        Return tuple ``(hstart, hend, vstart, vend)``.
        """
        ox = self.v.get("Window/X", 0)
        oy = self.v.get("Window/Y", 0)
        w = self.v["Window/W"]
        h = self.v["Window/H"]
        return ox, ox + w, oy, oy + h

    def set_roi(self, hstart=0, hend=None, vstart=0, vend=None):
        """
        Setup camera ROI.

        By default, all non-supplied parameters take extreme values.
        Return the resulting ROI as returned by :meth:`get_roi`.
        If any of the window properties is missing or not writable, do nothing and return ``None``.
        """
        for a in ["Window/X", "Window/Y", "Window/W", "Window/H"]:
            if a not in self.properties or not self.properties[a].writable:
                return
        det_size = self.get_detector_size()
        imaq_detector_size = IMAQCamera.get_detector_size(self)
        if hend is None:
            hend = det_size[0]
        if vend is None:
            vend = det_size[1]
        # NOTE(review): the size is written before the offset; if the W/H limits depend on the
        # current X/Y, the requested size may get truncated — confirm against camera behavior
        self.v["Window/W"] = min(hend - hstart, imaq_detector_size[0])
        self.v["Window/H"] = min(vend - vstart, imaq_detector_size[1])
        self.v["Window/X"] = hstart
        self.v["Window/Y"] = vstart
        self._update_imaq()
        return self.get_roi()

    def get_roi_limits(self):
        """
        Get the minimal and maximal ROI parameters.

        Return tuple ``(min_roi, max_roi)``, where each element is in turn 4-tuple describing the ROI.
        Limits of the window properties missing from the camera are reported as ``0``.
        """
        params = ["Window/X", "Window/Y", "Window/W", "Window/H"]
        minp, maxp = [], []
        for p in params:
            if p in self.properties:
                prop = self.properties[p]
                prop.update_minmax()
                minp.append(prop.min)
                maxp.append(prop.max)
            else:
                # missing properties are optional here, so they must not be touched at all
                minp.append(0)
                maxp.append(0)
        min_roi = (0, 0) + tuple(minp[2:])
        max_roi = tuple(maxp)
        return (min_roi, max_roi)

    def _get_buffer_bpp(self):
        """
        Get the number of bytes per pixel in the frame buffer.

        If the camera has a ``DataResolution`` property of the form ``"Res<N>Bit"``, the value is
        calculated from ``N`` (rounded up to the whole number of bytes); otherwise, the parent value is used.
        """
        bpp = IMAQCamera._get_buffer_bpp(self)
        if "DataResolution" in self.properties:
            res = self.v["DataResolution"]
            m = re.match(r"Res(\d+)Bit", res)
            if m:
                bpp = (int(m.group(1)) - 1) // 8 + 1
        return bpp

    def _get_data_dimensions_rc(self):
        """Get frame dimensions ``(rows, columns)`` calculated from the current ROI"""
        roi = self.get_roi()
        w, h = roi[1] - roi[0], roi[3] - roi[2]
        return h, w

    def get_exposure(self):
        """Get current exposure (``ExposureTime`` property scaled by ``1E-3``)"""
        return self.v["ExposureTime"] * 1E-3

    def set_exposure(self, exposure):
        """Set current exposure (written to ``ExposureTime`` scaled by ``1E3``); return the resulting exposure"""
        self.v["ExposureTime"] = exposure * 1E3
        return self.get_exposure()

    def get_frame_time(self):
        """
        Get current frame time.

        Use ``FrameTime`` property (scaled by ``1E-3``) if it is available, and inverse of ``FrameRate`` otherwise.
        """
        if "FrameTime" in self.properties:
            return self.v["FrameTime"] * 1E-3
        else:
            return 1. / float(self.v["FrameRate"])

    def set_frame_time(self, frame_time):
        """
        Set current frame time.

        Only has an effect if the camera has ``FrameTime`` property. Return the resulting frame time.
        """
        if "FrameTime" in self.properties:
            self.v["FrameTime"] = frame_time * 1E3
        return self.get_frame_time()

    def is_CFR_enabled(self):
        """Check if the constant frame rate mode is enabled"""
        return self.get_value("Trigger/CFR", False)

    def enable_CFR(self, enabled=True):
        """Enable constant frame rate mode"""
        self.set_value("Trigger/CFR", enabled, ignore_missing=True)
        return self.is_CFR_enabled()

    def get_trigger_interleave(self):
        """Check if the trigger interleave is on"""
        return self.get_value("Trigger/Interleave", False)

    def set_trigger_interleave(self, enabled):
        """Set the trigger interleave option on or off"""
        if self.get_trigger_interleave() != enabled:
            if self.is_CFR_enabled():
                # CFR mode is switched off for the duration of the change, and the frame time is restored afterwards
                ft = self.get_frame_time()
                self.enable_CFR(False)
                self.set_value("Trigger/Interleave", enabled, ignore_missing=True)
                self.enable_CFR(True)
                self.set_frame_time(ft)
            else:
                self.set_value("Trigger/Interleave", enabled, ignore_missing=True)
        return self.get_trigger_interleave()

    def is_status_line_enabled(self):
        """Check if the status line is on"""
        return self.get_value("EnStatusLine", False)

    def enable_status_line(self, enabled=True):
        """Enable or disable status line"""
        self.set_value("EnStatusLine", enabled, ignore_missing=True)
        return self.is_status_line_enabled()

    def get_black_level_offset(self):
        """Get the black level offset"""
        return self.get_value("Voltages/BlackLevelOffset", 0)

    def set_black_level_offset(self, offset):
        """Set the black level offset"""
        self.set_value("Voltages/BlackLevelOffset", offset, ignore_missing=True)
        return self.get_black_level_offset()
##### Dealing with status line #####
_status_line_magic=0x55AA00FF
def _check_magic(line):
"""Check if the status line satisfies the magic 4-byte requirement"""
if line.ndim==1:
return line[0]==_status_line_magic
else:
return np.all(line[:,0]==_status_line_magic)
def _extract_line(frames, preferred_line=True):
lsz=min(frames.shape[-1]//4,6)
if frames.ndim==2:
if (frames.shape[1]>=36) == preferred_line:
return np.frombuffer(frames[-1,:lsz*4].astype("<u1").tobytes(),"<u4") if frames.shape[0]>0 else np.zeros((lsz,1))
else:
return np.frombuffer(frames[-2,:lsz*4].astype("<u1").tobytes(),"<u4") if frames.shape[0]>1 else np.zeros((lsz,1))
else:
if (frames.shape[2]>=36) == preferred_line:
return np.frombuffer((frames[:,-1,:lsz*4].astype("<u1").tobytes()),"<u4").reshape((-1,lsz)) if frames.shape[1]>0 else np.zeros((len(frames),lsz,1))
else:
return np.frombuffer((frames[:,-2,:lsz*4].astype("<u1").tobytes()),"<u4").reshape((-1,lsz)) if frames.shape[1]>1 else np.zeros((len(frames),lsz,1))
def get_status_lines(frames, check_transposed=True):
    """
    Extract status lines from the given frames.

    `frames` can be 2D array (one frame), 3D array (stack of frames, first index is frame number), or list of arrays
    (in which case a list of results, one per element, is returned).
    Automatically check if the status line is present; return ``None`` if it's not.
    If ``check_transposed==True``, check for the case where the image is transposed (i.e., line becomes a column).
    """
    if isinstance(frames, list):
        return [get_status_lines(f, check_transposed=check_transposed) for f in frames]
    if frames.shape[-1] >= 4:
        # try the preferred row first, then the alternative one
        for preferred in (True, False):
            candidate = _extract_line(frames, preferred)
            if _check_magic(candidate):
                return candidate
    if not check_transposed:
        return None
    swapped = frames.T if frames.ndim == 2 else frames.transpose((0, 2, 1))
    return get_status_lines(swapped, check_transposed=False)
def get_status_line_position(frame, check_transposed=True):
    """
    Check whether status line is present in the frame, and return its location.

    Return tuple ``(row, transposed)``, where `row` is the status line row (can be ``-1`` or ``-2``)
    and `transposed` is ``True`` if the line is present in the transposed image.
    If no status line is found, return ``None``.
    If ``check_transposed==True``, check for the case where the image is transposed (i.e., line becomes a column).
    """
    if frame.shape[-1] >= 4:
        # for wide frames the preferred row is the last one, for narrow frames the second to last
        wide = frame.shape[1] >= 36
        candidates = ((True, -1 if wide else -2), (False, -2 if wide else -1))
        for preferred, row in candidates:
            if _check_magic(_extract_line(frame, preferred)):
                return row, False
    if check_transposed:
        found = get_status_line_position(frame.T, check_transposed=False)
        if found:
            return found[0], True
    return None
def remove_status_line(frame, sl_pos="calculate", policy="duplicate", copy=True):
    """
    Remove status line from the frame.

    Args:
        frame: a frame to process (2D or 3D numpy array; if 3D, the first axis is the frame number)
        sl_pos: status line position (returned by :func:`get_status_line_position`); if equal to ``"calculate"``, calculate here;
            for a 3D array, assumed to be the same for all frames
        policy: determines way to deal with the status line;
            can be ``"keep"`` (keep as is), ``"cut"`` (cut off the status line row), ``"zero"`` (set it to zero),
            ``"median"`` (set it to the image median), or ``"duplicate"`` (set it equal to the previous row; default);
            any unrecognized value is treated as ``"duplicate"``
        copy: if ``True``, make copy of the original frames; otherwise, attempt to remove the line in-place

    Returns:
        The processed frame. If there is no status line (``sl_pos`` is ``None``) or ``policy=="keep"``,
        the original `frame` object is returned untouched.
    """
    # compare by value: identity comparison against a string literal is unreliable
    if sl_pos == "calculate":
        sl_pos = get_status_line_position(frame if frame.ndim == 2 else frame[0])
    if not sl_pos or policy == "keep":
        return frame
    row, transposed = sl_pos[0], sl_pos[1]
    if copy:
        frame = frame.copy()
    if transposed:
        # bring the status line into the row orientation; this is a view, so in-place edits propagate
        frame = np.swapaxes(frame, -1, -2)
    # whether there is at least one image row above the status line to take the values from
    has_image_rows = frame.shape[-2] > abs(row)
    if policy == "cut":
        frame = frame[..., :row, :]
    elif policy == "zero":
        frame[..., row:, :] = 0
    elif policy == "median":
        # per-frame median over the image rows only (the status line itself is excluded)
        frame[..., row:, :] = np.median(frame[..., :row, :], axis=(-2, -1), keepdims=True) if has_image_rows else 0
    else:
        frame[..., row:, :] = frame[..., row - 1:row, :] if has_image_rows else 0
    if transposed:
        frame = np.swapaxes(frame, -1, -2)
    return frame
def find_skipped_frames(lines, step=1):
    """
    Check if there are skipped frames based on status line reading.

    `lines` is a 2D array of status lines (one per row) whose second column holds the frame counter.
    `step` specifies expected index step between neighboring frames.
    Return list ``[(idx, diff)]``, where `idx` is the index of the frame after which the counter
    step was irregular, and `diff` is the observed counter step (modulo ``2**24``) at that place.
    """
    counters = lines[:, 1]
    # the internal counter is only 24-bit
    deltas = (counters[1:] - counters[:-1]) % (2**24)
    irregular = np.nonzero(deltas != step)[0]
    return list(zip(irregular, deltas[irregular]))