Source code for pylablib.aux_libs.file_formats.ecam
from ...core.utils import py3, general, funcargparse, files as file_utils
from ...core.devio import data_format
from ...core.fileio import binio
import numpy as np
import numpy.random
import time, collections, os.path, zlib, pickle
[docs]def gen_uid():
uid_arr=numpy.random.randint(0,256,size=8,dtype="u1")
return py3.as_bytes(uid_arr)
[docs]class ECamFrame(object):
"""
A data frame for .ecam format.
Args:
data: frame data (numpy array with between 1 and 4 dimensions)
uid(bytes): 8-byte unique ID of the frame (by default, generate a new random ID).
timetamps(float): frame timestamp (by default, use current time)
**kwargs: additional frame blocks (values and meaning depend on the block type, and can be expanded later)
"""
def __init__(self, data, uid="new", timestamp="new", **kwargs):
object.__init__(self)
self.data=data
self.uid=gen_uid() if uid=="new" else uid
self.timestamp=time.time() if timestamp=="new" else timestamp
self.blocks=kwargs
def __getitem__(self, key):
return self.blocks[key]
def __setitem__(self, key, value):
self.blocks[key]=value
[docs] def update_timestamp(self, timestamp=None):
"""Update the frame timestamp (by default, use current time)"""
self.timestamp=timestamp or time.time()
[docs] def uid_to_int(self):
"""Return UID as an 8-byte integer"""
return int(np.frombuffer(self.uid,"<u8")[0]) if self.uid else None
[docs] def uid_to_hex(self):
"""Return UID as a 16-symbol hex string"""
return "".join(["{:02x}".format(d) for d in self.uid]) if self.uid else None
current_version=0x0001
valid_magic=b"eCAM\x0f64\x0b"
valid_versions=[0x0001]
default_pickle_proto=3
_header_fields=[("header_size",4),("image_bytes",8),("version",2),("magic",8),("shape",16),("dtype",2),("stype",2),
("uid",8),("timestamp",8)]
_hf_sizes=dict(_header_fields)
def _gen_offsets(fields):
offsets={}
off=0
for n,s in fields:
offsets[n]=off
off+=s
offsets["__end__"]=off
return offsets
_hf_offsets=_gen_offsets(_header_fields)
THeader=collections.namedtuple("THeader",["header_size","image_bytes","version","shape","dtype","stype","uid","timestamp","blocks"])
stypes={0x00:"none",0x01:"raw",0x10:"zlib"}
stypes_inv=general.invert_dict(stypes)
dtypes=general.merge_dicts(binio.fdtypes,binio.idtypes)
dtypes_inv=general.invert_dict(dtypes)
TBlock=collections.namedtuple("TBlock",["btype","value"])
btypes={0x00:"none",0x01:"skip",0x10:"cam_params",0x20:"pickle"}
btypes_inv=general.invert_dict(btypes)
cam_params={0x01:("name","sp<u2"),0x02:("model","sp<u2"),0x03:("id","sp<u2"),
0x10:("exposure","<f8"),0x11:("frame_rate","<f8"),
0x20:("roi",("<u4",)*4),0x21:("binning",("<u2",)*2),0x22:("pixel_max","<u8"),0x23:("pixel_min","<i8"),
0x30:("acq_mode","sp<u1"),0x31:("read_mode","sp<u1"),0x32:("pixel_mode","sp<u1"),
0x38:("timing_mode","sp<u1"),0x39:("trigger_mode","sp<u1"),0x3a:("trigger_level","<f8"),
0x40:("buffer_size","<u4"),0x41:("buffer_filled","<u4"),
0x80:("status","sp<u2"),0x81:("acq_status","sp<u2")}
cam_params_inv=general.invert_dict(cam_params,kmap=lambda x: x[0])
[docs]class ECamFormatter(object):
"""
Formatter for .ecam files.
Class responsible for writing and reading arbitrary ECam frames.
Args:
stype(str): storage type for the data. Can be ``"raw"`` (write as raw binary),
``"zlib"`` (raw binary compressed using standard Python zlib module), or ``"none"`` (write zeros instead of data).
Used only for writing; in reading, all storage types are supported.
dtype: default data dtype. If suppled, any written data will be converted to this dtype,
and any read data will have this dtype by default (unless specified explicitly). Otherwise, use supplied data dtype when writing.
shape(tuple): default data shape (tuple of length 2 or 3). If suppled, any written and read data is supposed to have this shape
(also use this as default shape if none is provided in the file).
"""
def __init__(self, stype="raw", dtype=None, shape=(None,None)):
object.__init__(self)
self.stype=stype
self.dtype=dtype
self.shape=shape
def _build_frame(self, header, data):
blocks={}
for btype,bvalue in header.blocks:
if btypes[btype]=="pickle":
name,val=bvalue
blocks.setdefault("pickle",{})[name]=val
elif btypes[btype]=="cam_params":
blocks["cam_params"]=bvalue
return ECamFrame(data,header.uid,header.timestamp,**blocks)
def _read_block(self, f, btype, size):
if btype not in btypes:
raise ECamFormatError("bad file format: unknown block type 0x{:04x}".format(btype))
btype=btypes[btype]
if btype=="skip":
f.seek(size,1)
value=size
elif btype=="pickle":
ipos=f.tell()
parsize=binio.read_num(f,"<u2")
f.seek(parsize,1)
name=binio.read_str(f,"sp<u4")
header_size=(f.tell()-ipos)
if size>=header_size:
svalue=f.read(size-header_size)
value=(name,pickle.loads(svalue))
else:
raise ECamFormatError("error reading block: block size {} is too small".format(size))
elif btype=="cam_params":
value={}
epos=f.tell()+size
while f.tell()<epos:
pid=binio.read_num(f,"u1")
if pid not in cam_params:
raise ECamFormatError("error reading block: unknown camera param id 0x{:02x}".format(pid))
name,dtype=cam_params[pid]
value[name]=binio.read_val(f,dtype)
return value
def _read_header(self, f):
try:
header_size=binio.read_num(f,"<u4")
except IndexError:
raise StopIteration
if header_size in {0,4} or (header_size<_hf_offsets["__end__"] and header_size not in _hf_offsets.values()):
raise ECamFormatError("bad file format: header size is {}".format(header_size))
image_bytes=binio.read_num(f,"<u8")
if header_size>_hf_offsets["version"]:
version=binio.read_num(f,"<u2")
if version not in valid_versions:
raise ECamFormatError("bad file format: unsupported version 0x{:02x}".format(version))
else:
version=None
if header_size>_hf_offsets["magic"]:
magic=f.read(8)
if magic!=valid_magic:
raise ECamFormatError("bad file format: invalid magic {}".format(magic))
if header_size>_hf_offsets["shape"]:
shape=binio.read_val(f,("<u4",)*4)
else:
shape=self.shape
shape=tuple([s for s in shape if s!=0])
if (None not in self.shape) and shape!=self.shape:
raise ValueError("data shape {} doesn't agree with the formatter shape {}".format(shape,self.shape))
dtype=binio.read_num(f,"<u2") if header_size>_hf_offsets["dtype"] else self.dtype
stype=binio.read_num(f,"<u2") if header_size>_hf_offsets["stype"] else self.stype
uid=f.read(8) if header_size>_hf_offsets["uid"] else None
timestamp=binio.read_num(f,"<f8") if header_size>_hf_offsets["timestamp"] else None
read_bytes=_hf_offsets["__end__"]
blocks=[]
while header_size>read_bytes:
if read_bytes+2>header_size:
raise ECamFormatError("bad file format: not enough data for a block type")
btype=binio.read_num(f,"<u2")
if btype==0x00:
f.seek(header_size-(read_bytes+2))
read_bytes=header_size
break
if read_bytes+6>header_size:
raise ECamFormatError("bad file format: not enough data for a block size")
bsize=binio.read_num(f,"<u4")
bvalue=self._read_block(f,btype,bsize)
blocks.append(TBlock(btype,bvalue))
read_bytes+=(6+bsize)
return THeader(header_size,image_bytes,version,shape,dtype,stype,uid,timestamp,blocks)
def _check_data_size(self, df, shape, data_bytes):
nelem=int(np.prod(shape,dtype="u8"))
if df.size*nelem!=data_bytes:
shape_str="x".join([str(s) for s in shape])
raise ECamFormatError("bad file format: mismatched frame byte size: expect {}x{}={}, got {}".format(
shape_str,df.size,nelem*df.size,data_bytes))
def _read_data(self, f, header):
if header.stype is None:
raise ECamFormatError("bad file format: not enough header data to read image")
if header.stype not in stypes:
raise ECamFormatError("bad file format: unknown storage type 0x{:04x}".format(header.stype))
stype=stypes[header.stype]
if stype=="none":
f.seek(header.image_bytes,1)
return None
if (None in header.shape) or header.dtype is None:
raise ECamFormatError("bad file format: not enough header data to read image")
if header.dtype not in dtypes:
raise ECamFormatError("bad file format: unknown data type 0x{:04x}".format(header.dtype))
dtype=dtypes[header.dtype]
df=data_format.DataFormat.from_desc(dtype)
nelem=int(np.prod(header.shape,dtype="u8"))
if stype=="raw":
self._check_data_size(df,header.shape,header.image_bytes)
img=np.fromfile(f,dtype=df.to_desc(),count=nelem)
if len(img)!=nelem:
raise ECamFormatError("bad file format: expected {} elements, found {}".format(nelem,len(img)))
img=img.reshape(header.shape)
elif stype=="zlib":
comp_data=f.read(header.image_bytes)
raw_data=zlib.decompress(comp_data)
self._check_data_size(df,header.shape,len(raw_data))
img=np.fromstring(raw_data,dtype=df.to_desc(),count=nelem).reshape(header.shape)
return img
[docs] def skip_frame(self, f):
"""Skip next frame starting at the current position within the file `f`"""
header=self._read_header(f)
f.seek(header.image_bytes,1)
return header.header_size,header.image_bytes
[docs] def read_frame(self, f, return_format="frame"):
"""
Read next frame starting at the current position within the file `f`.
`return_format` is the format for return data. Can be ``"frame"`` (return :class:`ECamFrame` object with all metadata),
``"image"`` (return only image array), or ``"raw"`` (return tuple ``(header, image)`` with raw data).
"""
funcargparse.check_parameter_range(return_format,"return_format",{"frame","image","raw"})
header=self._read_header(f)
img=self._read_data(f,header)
if return_format=="frame":
return self._build_frame(header,img)
elif return_format=="image":
return img
else:
return header,img
def _format_frame(self, frame):
if isinstance(frame,ECamFrame):
data=np.asarray(frame.data)
uid=frame.uid
timestamp=frame.timestamp
fblocks=frame.blocks
else:
data=np.asarray(frame)
uid,timestamp=None,None
fblocks={}
if self.dtype is not None:
data=data.astype(self.dtype)
if (None not in self.shape) and data.shape!=self.shape:
raise ValueError("data shape {} doesn't agree with the formatter shape {}".format(data.shape,self.shape))
if data.ndim not in [1,2,3,4]:
raise ValueError("can only save 1D, 2D, 3D, and 4D arrays; got {}D".format(data.ndim))
df=data_format.DataFormat.from_desc(str(data.dtype))
dtype=df.to_desc()
shape=data.shape
if self.stype=="none":
dsize=int(np.prod(data.shape,dtype="u8"))*df.size
data=None
elif self.stype=="raw":
data=data.astype(dtype=dtype)
dsize=int(np.prod(data.shape,dtype="u8"))*df.size
elif self.stype=="zlib":
raw_str=data.astype(dtype=dtype).tostring()
data=zlib.compress(raw_str,level=1)
dsize=len(data)
else:
raise ValueError("unrecognized storage type: {}".format(self.stype))
blocks=[]
for k,v in fblocks.items():
if k=="pickle":
btype=btypes_inv["pickle"]
for pn,pv in v.items():
bvalue=(pn,pv)
blocks.append(TBlock(btype,bvalue))
elif k=="cam_params":
btype=btypes_inv["cam_params"]
blocks.append(TBlock(btype,v))
header=THeader(-1,dsize,current_version,shape,dtypes_inv[dtype],stypes_inv[self.stype],uid,timestamp,blocks)
return header,data
def _write_block(self, f, btype, bvalue):
btype=btypes[btype]
if btype=="skip":
f.write(b"\x00"*bvalue)
elif btype=="pickle":
pn,pv=bvalue
binio.write_num(2,f,"<u2")
binio.write_num(default_pickle_proto,f,"<u2")
binio.write_str(pn,f,"sp<u4")
pickle.dump(pv,f,protocol=default_pickle_proto)
elif btype=="cam_params":
for pid,(pn,dtype) in cam_params.items():
if pn in bvalue:
binio.write_num(pid,f,"u1")
v=bvalue[pn]
binio.write_val(v,f,dtype)
def _write_header(self, header, f):
with binio.size_prepend(f,"<u4",4):
binio.write_num(header.image_bytes,f,"<u8")
binio.write_num(header.version,f,"<u2")
f.write(valid_magic)
if None not in header.shape:
shape=header.shape+(0,)*(4-len(header.shape))
np.asarray(shape,dtype="u4").astype("<u4").tofile(f)
else:
return
if header.dtype is not None:
binio.write_num(header.dtype,f,"<u2")
else:
return
if header.stype is not None:
binio.write_num(header.stype,f,"<u2")
else:
return
if header.uid is not None:
f.write(header.uid)
else:
return
if header.timestamp is not None:
binio.write_num(header.timestamp,f,"<f8")
else:
return
for btype,bvalue in header.blocks:
binio.write_num(btype,f,"<u2")
with binio.size_prepend(f,"<u4"):
self._write_block(f,btype,bvalue)
def _write_image(self, header, data, f):
if data is None:
f.write("\x00"*header.image_bytes)
elif isinstance(data,bytes):
f.write(data)
elif isinstance(data,np.ndarray):
data.tofile(f)
else:
raise ValueError("don't know how to write data {}".format(data))
[docs] def write_frame(self, frame, f):
"""
Read the supplied `frame` starting at the current position within the file `f`.
`frame` can be either :class:`ECamFrame` object, or a numpy array (in which case no metadata is saved).
"""
header,data=self._format_frame(frame)
self._write_header(header,f)
self._write_image(header,data,f)
return header.header_size,header.image_bytes
[docs]def save_ecam(frames, path, append=True, formatter=None):
"""
Save `frames` into a .ecam datafile.
If ``append==False``, clear the file before writing the frames.
`formatter` specifies :class:`ECamFormatter` instance for frame saving.
"""
mode="r+b" if (append and os.path.exists(path)) else "wb"
formatter=formatter or ECamFormatter()
with open(path,mode) as f:
f.seek(0,2)
for fr in frames:
formatter.write_frame(fr,f)
[docs]def save_ecam_single(frame, path, append=True, **kwargs):
"""
Save a single `frame` into a .ecam datafile.
If ``append==False``, clear the file before writing the frames.
``**kwargs`` specify parameters passed to the :class:`ECamFormatter` constructor for the saving formatter.
"""
formatter=ECamFormatter(**kwargs)
save_ecam([frame],path,append=append,formatter=formatter)
[docs]class ECamReader(object):
"""
Reader class for .ecam files.
Allows transparent access to frames by reading them from the file on the fly (without loading the whole file).
Supports determining length, indexing (only positive single-element indices) and iteration.
Args:
path(str): path to .ecam file.
same_size(bool): if ``True``, assume that all frames have the same size (including header), which speeds up random access and obtaining number of frames;
otherwise, the first time the length is determined or a large-index frame is accessed can take a long time (all subsequent calls are faster).
return_format(str): format for return data. Can be ``"frame"`` (return :class:`ECamFrame` object with all metadata),
``"image"`` (return only image array), or ``"raw"`` (return tuple ``(header, image)`` with raw data).
formatter(ECamFormatter): formatter for saving
"""
def __init__(self, path, same_size=False, return_format="frame", formatter=None):
object.__init__(self)
self.path=file_utils.normalize_path(path)
self.frame_offsets=[0]
self.frames_num=None
self.same_size=same_size
self.return_format=return_format
self.formatter=formatter or ECamFormatter()
def _read_frame_at(self, offset):
with open(self.path,"rb") as f:
f.seek(offset)
return self.formatter.read_frame(f,return_format=self.return_format)
def _read_next_frame(self, f, skip=False):
if skip:
self.formatter.skip_frame(f)
data=None
else:
data=self.formatter.read_frame(f,return_format=self.return_format)
self.frame_offsets.append(f.tell())
return data
def _read_frame(self, idx):
idx=int(idx)
if self.same_size:
if len(self.frame_offsets)==1:
with open(self.path,"rb") as f:
self._read_next_frame(f,skip=True)
offset=self.frame_offsets[1]*idx
return self._read_frame_at(offset)
else:
if idx<len(self.frame_offsets):
return self._read_frame_at(self.frame_offsets[idx])
next_idx=len(self.frame_offsets)-1
offset=self.frame_offsets[-1]
with open(self.path,"rb") as f:
f.seek(offset)
while next_idx<=idx:
data=self._read_next_frame(f,next_idx<idx)
next_idx+=1
return data
def _fill_offsets(self):
if self.frames_num is not None:
return
if self.same_size:
file_size=os.path.getsize(self.path)
if file_size==0:
self.frames_num=0
else:
with open(self.path,"rb") as f:
self._read_next_frame(f,skip=True)
if file_size%self.frame_offsets[1]:
raise IOError("File size {} is not a multiple of single frame size {}".format(file_size,self.frame_offsets[1]))
self.frames_num=file_size//self.frame_offsets[1]
else:
offset=self.frame_offsets[-1]
try:
with open(self.path,"rb") as f:
f.seek(offset)
while True:
self._read_next_frame(f,skip=True)
except StopIteration:
pass
self.frames_num=len(self.frame_offsets)-1
[docs] def size(self):
"""Get the total number of frames"""
self._fill_offsets()
return self.frames_num
__len__=size
def __getitem__(self, idx):
if isinstance(idx,slice):
return list(self.iterrange(idx.start or 0,idx.stop,idx.step or 1))
try:
return self._read_frame(idx)
except StopIteration:
raise IndexError("index {} is out of range".format(idx))
[docs] def get_data(self, idx):
"""Get a single frame at the given index (only non-negative indices are supported)"""
return self[idx]
def __iter__(self):
return self.iterrange()
[docs] def iterrange(self, *args):
"""
iterrange([start,] stop[, step])
Iterate over frames starting with `start` ending at `stop` (``None`` means until the end of file) with the given `step`.
"""
start,stop,step=0,None,1
if len(args)==1:
stop,=args
elif len(args)==2:
start,stop=args
elif len(args)==3:
start,stop,step=args
if step<0:
raise IndexError("format doesn't support reversed indexing")
try:
n=start
while True:
yield self._read_frame(n)
n+=step
if stop is not None and n>=stop:
break
except StopIteration:
pass
[docs]def load_ecam(path, return_format="image"):
"""
Read .ecam file.
Args:
path(str): path to .ecam file.
return_format(str): format for return data. Can be ``"frame"`` (return :class:`ECamFrame` object with all metadata),
``"image"`` (return only image array), or ``"raw"`` (return tuple ``(header, image)`` with raw data).
"""
return list(ECamReader(path,return_format=return_format).iterrange())