# Source code for pylablib.core.fileio.loadfile
"""
Utilities for reading data files.
"""
from builtins import bytes, range
from . import datafile, location, dict_entry, parse_csv #@UnresolvedImport
from ..utils import dictionary, funcargparse, string #@UnresolvedImport
import datetime
import re
import numpy as np
# NOTE(review): presumably lists sibling modules whose settings this module depends on — confirm against the package's parameter mechanics
_depends_local=[".parse_csv"]
# module-wide settings; "out_type" determines the default output type for loaded CSV tables (see CSVTableInputFileFormat.read_file)
_module_parameters={"fileio/loadfile/csv/out_type":"table"}
##### File type detection #####
def _is_unprintable_character(chn):
return chn<8 or 13<chn<27 or 27<chn<32
def _detect_binary_file(stream):
pos=stream.tell()
chunk=bytes(stream.read(4096))
stream.seek(pos)
for c in chunk:
if _is_unprintable_character(c):
return True
return False
# "soft" dictionary-row pattern: any non-whitespace path (optionally '/'-separated) followed by whitespace
_dict_line_soft=r"^[\S]*(/[\S]*)*\s+"
_dict_line_soft_regexp=re.compile(_dict_line_soft)
# "hard" dictionary-row pattern: a path containing at least one '/' separator followed by whitespace
_dict_line_hard=r"^[\w]*(/[\w]*)+\s+"
_dict_line_hard_regexp=re.compile(_dict_line_hard)
# inline-table boundary marker inside a dictionary file, e.g., "## table start" / "# table end"
_dicttable_line=r"^#+\s*table\s+(start|end)"
_dicttable_line_regexp=re.compile(_dicttable_line)
## single comma with whitespaces, or just whitespaces, but no less than 3 spaces in a row
##_table_delimiters_hard=r"\s*(,| )\s*|[\f\n\r\t\v]+"
##_table_delimiters_hard_regexp=re.compile(_table_delimiters_hard)
def _try_row_type(line):
    """
    Try to determine whether the line is a comment line, a numerical data row, a dictionary row or an unrecognized row.

    Doesn't distinguish with a great accuracy; useful only for trying to guess file format.
    Returns one of ``'empty'``, ``'dict_table'``, ``'comment'``, ``'dict'``, ``'numerical'`` or ``'unrecognized'``.
    """
    line=line.strip().lower()
    if not line:
        return "empty"
    if _dicttable_line_regexp.match(line):
        return "dict_table"
    if line.startswith("#"):
        return "comment"
    if _dict_line_hard_regexp.match(line):
        return "dict"
    tokens=[tok for tok in parse_csv._table_delimiters_regexp.split(line) if tok]
    special_values={"","nan",'inf',"+inf","-inf"}
    try:
        for tok in tokens:
            if tok not in special_values:
                complex(tok.replace("i","j"))  # accept both 'i' and 'j' as imaginary units
    except ValueError:
        return "unrecognized"
    return "numerical"
def _detect_textfile_type(stream):
    """
    Guess whether the text `stream` holds a dictionary file or a table file.

    Samples lines from the current position until 20 data (dict or numerical) lines
    are seen or the file ends; the stream position is restored afterwards.
    Returns ``'dict'``, ``'table'`` or ``'unrecognized'``.
    """
    counts=dict.fromkeys(("empty","dict","dict_table","comment","numerical","unrecognized"),0)
    start=stream.tell()
    data_lines=0
    while data_lines<20:
        sampled=stream.readline()
        if not sampled:  # EOF
            break
        kind=_try_row_type(sampled)
        counts[kind]+=1
        if kind in {"dict","numerical"}:
            data_lines+=1
    stream.seek(start)
    # inline-table markers with enough data strongly suggest a dictionary file
    if counts["dict_table"]>0 and data_lines>2:
        return "dict"
    # too few data lines relative to unrecognized ones: give up
    if data_lines<5 and data_lines<counts["unrecognized"]*2:
        return "unrecognized"
    return "dict" if counts["dict"]>counts["numerical"] else "table"
_time_expr=r"(\d+)\s*/\s*(\d+)\s*/\s*(\d+)\s+(\d+)\s*:\s*(\d+)\s*:\s*(\d+)(.\d+)?"
_time_comment=r"(?:saved|created)\s+(?:on|at)\s*"+_time_expr
_time_comment_regexp=re.compile(_time_comment,re.IGNORECASE)
def _try_time_comment(line):
m=_time_comment_regexp.match(line)
if m is None:
return None
else:
year,month,day,hour,minute,second,usec=m.groups()
usec=usec or 0
return datetime.datetime(int(year),int(month),int(day),int(hour),int(minute),int(second),int(float(usec)*1E6))
def _try_columns_line(line, row_size):
    """
    Check whether `line` looks like a row of exactly `row_size` column names.

    Return the list of names, or ``None`` if the entry count differs from `row_size`
    or if every entry parses as a number (numbers can't be column names).
    """
    entries=string.from_row_string(line,parse_csv._table_delimiters_regexp)
    if len(entries)!=row_size:
        return None
    try:
        for entry in entries:
            complex(entry.replace("i","j"))
    except (ValueError, AttributeError):
        # at least one entry is non-numeric (or a non-string object): treat as column names
        return entries
    return None  # all entries are numerical, so these can't be column names
def _find_columns_lines(corrupted, comments, row_size):
    """
    Look for a column-names line among corrupted lines and comment lines.

    Return ``(columns, comment_index)``: the first corrupted line of kind ``"type"`` takes precedence
    (with ``comment_index=None``); otherwise the first comment recognized by :func:`_try_columns_line`
    is returned together with its index; ``(None, None)`` if nothing is found.
    """
    type_corrupted=corrupted["type"]
    if type_corrupted:
        return type_corrupted[0],None
    for idx,comment in enumerate(comments):
        names=_try_columns_line(comment,row_size)
        if names is not None:
            return names,idx
    return None,None
def _parse_dict_line(line):
    """
    Parse a single dictionary-file row.

    Return ``None`` for an empty line, a 1-tuple ``(key,)`` for a key-only line,
    or ``(key, value)`` with the value converted via :func:`.string.from_string`.
    """
    parts=line.split(None,1)
    if not parts:
        return None
    if len(parts)==1:
        return (parts[0],)
    key,raw_value=parts
    return key,string.from_string(raw_value)
# markers of inline-table boundaries inside a dictionary file:
# "# table start" / "# start table" and "# table end" / "# end table" (case-insensitive)
_dicttable_start=r"^#+\s*(table\s+start|start\s+table)"
_dicttable_start_regexp=re.compile(_dicttable_start,re.IGNORECASE)
_dicttable_end=r"^#+\s*(table\s+end|end\s+table)"
_dicttable_end_regexp=re.compile(_dicttable_end,re.IGNORECASE)
def _load_dict_and_comments(f, case_normalization=None, inline_dtype="generic"):
    """
    Load a dictionary and the list of comment lines from the opened text stream `f`.

    Args:
        f: input text stream (read line by line).
        case_normalization: if ``None``, dictionary keys are case-sensitive;
            otherwise, passed to :class:`.dictionary.Dictionary` as the normalization style.
        inline_dtype (str): dtype passed to :func:`.parse_csv.load_table` for inline tables.

    Returns:
        tuple: ``(data, comment_lines)``, where ``data`` is the resulting dictionary and
        ``comment_lines`` is a list of comment strings with the leading ``#`` stripped.

    Raises:
        IOError: if an inline table appears before any key it could be attached to.
    """
    case_sensitive=case_normalization is None
    data=dictionary.Dictionary(case_sensitive=case_sensitive, case_normalization=case_normalization or "lower")
    comment_lines=[]
    line=f.readline()
    root_keys=[]  # stack of nested root prefixes created by "//key" lines
    prev_key=None  # last seen key; a following inline table gets attached to it
    while line:
        line=line.strip()
        if line!="":
            if line[:1]!='#': #dict row
                parsed=_parse_dict_line(line)
                if parsed is not None:
                    if len(parsed)==1:  # key-only line
                        key=parsed[0]
                        if key.startswith("///"): # root key one level up
                            root_keys=root_keys[:-1]
                        elif key.startswith("//"): # new nested root key
                            root_keys.append(key[2:])
                        else:
                            if root_keys:
                                key="/".join(root_keys)+"/"+key
                            prev_key=(key,) # single-key line possibly means that an inline table follows
                    else:
                        key,value=parsed
                        if root_keys:
                            key="/".join(root_keys)+"/"+key
                        data[key]=value
                        prev_key=key
            else:  # comment row; may also open an inline table
                if _dicttable_start_regexp.match(line[1:]) is not None:
                    # read the inline table up to the "table end" marker
                    table,comments,corrupted=parse_csv.load_table(f,dtype=inline_dtype,stop_comment=_dicttable_end_regexp)
                    columns,comment_idx=_find_columns_lines(corrupted,comments,table.shape[1])
                    if comment_idx is not None:
                        del comments[comment_idx]  # the column-names line is not a regular comment
                    if columns is not None:
                        table.set_column_names(columns)
                    comment_lines=comment_lines+comments
                    if prev_key is not None:
                        data[prev_key]=table
                    else:
                        raise IOError("inline table isn't attributed to any dict node")
                else:
                    comment_lines.append(line.lstrip("# \t"))
        line=f.readline()
    return (data,comment_lines)
##### Data normalization #####
def _extract_savetime_comment(comments):
    """
    Find the first save-time comment in `comments`, remove it from the list, and return the parsed time.

    Return ``None`` and leave `comments` unchanged if no save-time comment is present.
    """
    # fixed: the original deleted the LAST comment even when no save-time comment matched
    # (after an unsuccessful loop, i==len(comments)-1 still satisfied "i<len(comments)");
    # now a comment is removed only when it is the one that actually parsed as a time
    for i,c in enumerate(comments):
        creation_time=_try_time_comment(c)
        if creation_time is not None:
            del comments[i]  # consume the timestamp so it isn't kept as a regular comment
            return creation_time
    return None
def _determine_columns_comment(comment):
    # NOTE(review): unimplemented placeholder (always returns None); not called anywhere in the visible part of the module
    pass
##### File formats #####
class IInputFileFormat(object):
    """
    Generic class for an input file format.

    Based on `file_format` or autodetection, calls one of its subclasses to read the file.
    """
    def __init__(self):
        object.__init__(self)
    @staticmethod
    def read_file(location_file, file_format, **kwargs):
        """
        Read the file at `location_file`.

        `file_format` can be ``'txt'``, ``'csv'``, ``'dict'``, ``'bin'`` or ``'generic'``
        (``None`` is treated as ``'generic'``: binary/text is autodetected from the content).

        Raises:
            ValueError: if `file_format` is not one of the recognized formats
                (the original silently returned ``None`` here, which crashed later at attribute access).
        """
        file_format=file_format or "generic"
        if file_format in {"txt","csv","dict"}:
            return ITextInputFileFormat.read_file(location_file,file_format=file_format,**kwargs)
        if file_format in {"bin"}:
            return BinaryTableInputFileFormatter.read_file(location_file,file_format=file_format,**kwargs)
        if file_format in {"generic"}:
            with location_file.opening(mode="read",data_type="binary"):
                is_binary=_detect_binary_file(location_file.stream)
            if is_binary:
                return BinaryTableInputFileFormatter.read_file(location_file,file_format="bin",**kwargs)
            else:
                return ITextInputFileFormat.read_file(location_file,file_format="txt",**kwargs)
        raise ValueError("unrecognized file format: {0}".format(file_format))
class ITextInputFileFormat(IInputFileFormat):
    """
    Generic class for a text input file format.

    Based on `file_format` or autodetection, calls one of its subclasses to read the file.
    """
    def __init__(self):
        IInputFileFormat.__init__(self)
    @staticmethod
    def read_file(location_file, file_format, **kwargs):
        """
        Read the text file at `location_file`.

        `file_format` can be ``'csv'``, ``'dict'`` or ``'txt'`` (autodetect table vs. dictionary).

        Raises:
            IOError: if ``'txt'`` autodetection can't determine the file type.
            ValueError: if `file_format` is not one of the recognized formats
                (the original silently returned ``None`` here).
        """
        if file_format in {"csv"}:
            return CSVTableInputFileFormat.read_file(location_file,file_format=file_format,**kwargs)
        if file_format in {"dict"}:
            return DictionaryInputFileFormat.read_file(location_file,file_format=file_format,**kwargs)
        if file_format in {"txt"}:
            with location_file.opening(mode="read",data_type="text"):
                txt_type=_detect_textfile_type(location_file.stream)
            if txt_type=="table":
                return CSVTableInputFileFormat.read_file(location_file,file_format="csv",**kwargs)
            elif txt_type=="dict":
                return DictionaryInputFileFormat.read_file(location_file,file_format="dict",**kwargs)
            else:
                raise IOError("can't determine file type")
        raise ValueError("unrecognized file format: {0}".format(file_format))
class CSVTableInputFileFormat(ITextInputFileFormat):
    """
    Class for CSV input file format.
    """
    def __init__(self):
        ITextInputFileFormat.__init__(self)
    @staticmethod
    def read_file(location_file, out_type="default", dtype="numeric", columns=None, delimiters=None, empty_entry_substitute=None, ignore_corrupted_lines=True, skip_lines=0, **kwargs):
        """
        Read CSV file.

        See :func:`.parse_csv.load_table` for more description.

        Args:
            location_file: Location of the data.
            out_type (str): type of the result: ``'array'`` for numpy array, ``'pandas'`` for pandas DataFrame, ``'table'`` for :class:`.DataTable` object,
                or ``'default'`` (determined by the library default; ``'table'`` by default)
            dtype: dtype of entries; can be either a single type, or a list of types (one per column).
                Possible dtypes are: ``'int'``, ``'float'``, ``'complex'``,
                ``'numeric'`` (tries to coerce to minimal possible numeric type, raises error if data can't be converted to `complex`),
                ``'generic'`` (accept arbitrary types, including lists, dictionaries, escaped strings, etc.),
                ``'raw'`` (keep raw string).
            columns: either a number of columns, or a list of columns names.
            delimiters (str): Regex string which recognizes entries delimiters (by default ``r"\\s*,\\s*|\\s+"``, i.e., commas and whitespaces).
            empty_entry_substitute: Substitute for empty table entries. If ``None``, all empty table entries are skipped.
            ignore_corrupted_lines (bool): If ``True``, skip corrupted (e.g., non-numeric for numeric dtype, or with too few entries) lines;
                otherwise, raise :exc:`ValueError`.
            skip_lines (int): Number of lines to skip from the beginning of the file.

        Returns:
            :class:`.datafile.DataFile` wrapping the parsed table, remaining comments and the extracted creation time.
        """
        if out_type=="default":
            out_type=_module_parameters["fileio/loadfile/csv/out_type"]
        if delimiters is None:
            delimiters=parse_csv._table_delimiters
        with location_file.opening(mode="read",data_type="text"):
            for _ in range(skip_lines):
                location_file.stream.readline()
            data,comments,corrupted=parse_csv.load_table(location_file.stream,dtype=dtype,columns=columns,out_type=out_type,
                delimiters=delimiters,empty_entry_substitute=empty_entry_substitute,ignore_corrupted_lines=ignore_corrupted_lines)
        # if column names weren't supplied explicitly, try to recover them from corrupted lines or comments
        if out_type in {"table","pandas"} and not funcargparse.is_sequence(columns,"builtin;nostring") and len(data)>0:
            columns,comment_idx=_find_columns_lines(corrupted,comments,data.shape[1])
            if comment_idx is not None:
                del comments[comment_idx]  # the column-names line is not a regular comment
            if columns is not None:
                if out_type=="table":
                    data.set_column_names(columns)
                else:
                    data.columns=columns
        creation_time=_extract_savetime_comment(comments)
        return datafile.DataFile(data=data,comments=comments,creation_time=creation_time,filetype="csv")
class DictionaryInputFileFormat(ITextInputFileFormat):
    """
    Class for Dictionary input file format.
    """
    def __init__(self):
        ITextInputFileFormat.__init__(self)
    @staticmethod
    def read_file(location_file, case_normalization=None, inline_dtype="generic", entry_format="value", skip_lines=0, **kwargs):
        """
        Read Dictionary file.

        Args:
            location_file: Location of the data.
            case_normalization (str): If ``None``, the dictionary paths are case-sensitive;
                otherwise, defines the way the entries are normalized (``'lower'`` or ``'upper'``).
            inline_dtype (str): dtype for inlined tables.
            entry_format (str): Determines the way for dealing with :class:`.dict_entry.IDictionaryEntry` objects
                (objects transformed into dictionary branches with special recognition rules). Can be
                ``'branch'`` (don't attempt to recognize those object, leave dictionary as in the file),
                ``'dict_entry'`` (recognize and leave as :class:`.dict_entry.IDictionaryEntry` objects) or
                ``'value'`` (recognize and keep the value).
            skip_lines (int): Number of lines to skip from the beginning of the file.

        Raises:
            ValueError: if `entry_format` is not one of the recognized values.
        """
        if not entry_format in {"branch","dict_entry","value"}:
            raise ValueError("unrecognized entry format: {0}".format(entry_format))
        with location_file.opening(mode="read",data_type="text"):
            for _ in range(skip_lines):
                location_file.stream.readline()
            data,comments=_load_dict_and_comments(location_file.stream,inline_dtype=inline_dtype,case_normalization=case_normalization)
        creation_time=_extract_savetime_comment(comments)
        def map_entries(ptr):
            # convert a branch recognized as a special dict entry into the requested representation
            if dict_entry.special_load_rules(ptr):
                entry=dict_entry.from_dict(ptr,location_file.loc)
                if entry_format=="value":
                    entry=entry.data
                return entry
            else:
                return ptr
        if entry_format!="branch":
            data.map_self(map_entries,to_visit="branches",topdown=False)
        if len(data)==1 and list(data.keys())==["__data__"]: # special case of files with preamble
            data=data["__data__"]
        return datafile.DataFile(data=data,comments=comments,creation_time=creation_time,filetype="dict")
class BinaryTableInputFileFormatter(IInputFileFormat):
    """
    Class for binary input file format.
    """
    def __init__(self):
        IInputFileFormat.__init__(self)
    @staticmethod
    def read_file(location_file, out_type="default", dtype=">f8", columns=None, packing="flatten", preamble=None, skip_bytes=0, **kwargs):
        """
        Read binary file.

        Args:
            location_file: Location of the data.
            out_type (str): type of the result: ``'array'`` for numpy array, ``'pandas'`` for pandas DataFrame, ``'table'`` for :class:`.DataTable` object,
                or ``'default'`` (determined by the library default; ``'table'`` by default)
            dtype: :class:`numpy.dtype` describing the data.
            columns: either a number of columns, or a list of columns names.
            packing (str): The way the 2D array is packed. Can be either
                ``'flatten'`` (data is stored row-wise) or
                ``'transposed'`` (data is stored column-wise).
            preamble (dict): If not ``None``, defines binary file parameters that supersede the parameters supplied to the function.
                The defined parameters are ``'dtype'``, ``'packing'``, ``'ncols'`` (number of columns) and ``'nrows'`` (number of rows).
            skip_bytes (int): Number of bytes to skip from the beginning of the file.

        Raises:
            ValueError: if `packing` is unrecognized, or if the column/row counts disagree with the preamble.
        """
        if out_type=="default":
            out_type=_module_parameters["fileio/loadfile/csv/out_type"]
        # preamble parameters take precedence over the explicitly supplied ones
        preamble=preamble or {}
        dtype=preamble.get("dtype",dtype)
        packing=preamble.get("packing",packing)
        preamble_columns_num=preamble.get("ncols",None)
        preamble_rows_num=preamble.get("nrows",None)
        with location_file.opening(mode="read",data_type="binary"):
            if skip_bytes:
                location_file.stream.seek(skip_bytes,1)
            data=np.fromfile(location_file.stream,dtype=dtype)
        try:
            columns_num=len(columns)
        except TypeError:  # `columns` is either None or a plain column count
            columns_num=columns
            columns=None
        if columns_num is None:
            columns_num=preamble_columns_num
        elif preamble_columns_num is not None and preamble_columns_num!=columns_num:
            raise ValueError("supplied columns number {0} disagrees with extracted from preamble {1}".format(columns_num,preamble_columns_num))
        if columns_num is not None:
            if packing=="flatten":
                data=data.reshape((-1,columns_num))
            elif packing=="transposed":
                # fixed: ndarray has no `transposed` method (the original raised AttributeError here);
                # `transpose` yields the intended row-per-record view of column-wise data
                data=data.reshape((columns_num,-1)).transpose()
            else:
                raise ValueError("unrecognized packing method: {0}".format(packing))
        else:
            data=np.column_stack([data])
        if preamble_rows_num is not None and len(data)!=preamble_rows_num:
            raise ValueError("supplied rows number {0} disagrees with extracted from preamble {1}".format(len(data),preamble_rows_num))
        data=parse_csv.columns_to_table([data[:,i] for i in range(data.shape[1])],columns=columns,out_type=out_type)
        return datafile.DataFile(data=data,filetype="bin")
def load(path=None, input_format=None, loc="file", return_file=False, **kwargs):
    """
    Load data from the file.

    Args:
        path (str): Path to the file.
        input_format (str): Input file format. If ``None``, attempt to auto-detect file format (same as ``'generic'``).
        loc (str): Location type.
        return_file (bool): If ``True``, return :class:`.DataFile` object (contains some metainfo);
            otherwise, return just the file data.

    `**kwargs` are passed to the file formatter used to read the data
    (see :meth:`CSVTableInputFileFormat.read_file`, :meth:`DictionaryInputFileFormat.read_file`
    and :meth:`BinaryTableInputFileFormatter.read_file` for the possible arguments).
    The default format names are:

        - ``'generic'``: Generic file format. Attempt to autodetect, raise :exc:`IOError` if unsuccessful;
        - ``'txt'``: Generic text file. Attempt to autodetect, raise :exc:`IOError` if unsuccessful;
        - ``'csv'``: CSV file, corresponds to :class:`CSVTableInputFileFormat`;
        - ``'dict'``: Dictionary file, corresponds to :class:`DictionaryInputFileFormat`;
        - ``'bin'``: Binary file, corresponds to :class:`BinaryTableInputFileFormatter`.
    """
    file_location=location.get_location(loc,path)
    located_file=location.LocationFile(file_location)
    loaded=IInputFileFormat.read_file(located_file,file_format=input_format,**kwargs)
    return loaded if return_file else loaded.data