"""
Utilities for parsing CSV files.
"""
from builtins import range, zip
from ..utils.py3 import textstring
from ..utils import string, funcargparse #@UnresolvedImport
from ..datatable import table as datatable #@UnresolvedImport
import re
import numpy as np
import pandas as pd
# local modules this module depends on (presumably used by the package's dependency/reload tracking — TODO confirm)
_depends_local=["..utils.string"]
# regex pattern string recognizing entry delimiters within a table row (defined in ..utils.string)
_table_delimiters=string._delimiters
# pre-compiled version of the delimiter pattern
_table_delimiters_regexp=re.compile(_table_delimiters)
def _try_convert_element(element, dtype="numeric"):
# if dtype=="complex":
# return [complex(e.lower().replace('i','j')) for e in line]
# else:
if dtype=="raw":
return element
if dtype=="generic":
return string.from_string(element)
elif dtype=="numeric":
element=string.from_string(element)
try:
complex(element)
return element
except (TypeError, ValueError):
raise ValueError("malformed element")
else:
return np.asscalar(np.array(element).astype(dtype))
def _try_convert_row(line, dtype):
    """
    Attempt to parse a single split line using the supplied dtype.

    `dtype` may be a single dtype or a per-entry sequence; it is expanded to one
    dtype per entry (raising an error on a length mismatch). Return the converted
    entries as a numpy array.
    """
    dtypes=funcargparse.as_sequence(dtype,len(line),allowed_type="builtin;nostring",length_conflict_action="error")
    converted=[_try_convert_element(entry,entry_dtype) for entry,entry_dtype in zip(line,dtypes)]
    return np.array(converted)
def _get_row_length(table, dtype):
    """
    Determine the number of columns in `table` for the given `dtype`.

    If `dtype` is a per-column sequence, its length defines the row length directly;
    otherwise, scan the rows until one parses successfully and use its length.
    Return ``None`` if the length can't be determined (no row parses).
    """
    if funcargparse.is_sequence(dtype,"builtin;nostring"):
        return len(dtype)
    for row in table:
        try:
            return len(_try_convert_row(row,dtype))
        except ValueError:
            pass # row doesn't parse with this dtype; keep looking
    return None
def _try_convert_column(column, dtype, min_dtype="int"):
"""
Try and parse a single column with a given dtype.
Return tuple converted_column, actual_dtype.
If dtype=="generic" or dtype=="numeric", min_dtype determines "minimal" (in a sense that int<float<complex<generic) dtype.
If min_dtype!="generic", the routine first tries to convert the whole column into a numpy array, gradually increasing types on fails.
"""
if len(column)>0 and not isinstance(column[0],textstring):
raise ValueError("_try_convert_column only works for string input")
if dtype=="raw":
return column, dtype
elif dtype in {"numeric","generic"}:
if min_dtype!="generic":
dtypes_order=["int","float","complex"]
start_dtype=dtypes_order.index(min_dtype)
column_array=np.array(column)
for dt in dtypes_order[start_dtype:]:
try:
return column_array.astype(dt), dt
except ValueError:
pass
except OverflowError: # need to use the standard Python long integer type, which can't be stored in a numpy array
break
column=[string.from_string(e) for e in column]
if dtype=="numeric":
for e in column:
complex(e) # check numeric nature
min_dtype="generic" if dtype=="numeric" else dtype
return column, min_dtype
else:
return np.array(column,dtype=dtype), dtype # dtype is specified, just convert
class ChunksAccumulator(object):
    """
    Class for accumulating data chunks into a single array.

    Args:
        dtype: dtype of entries; can be either a single type, or a list of types (one per column).
            Possible dtypes are: ``'int'``, ``'float'``, ``'complex'``,
            ``'numeric'`` (tries to coerce to minimal possible numeric type, raises error if data can't be converted to complex),
            ``'generic'`` (accept arbitrary types, including lists, dictionaries, escaped strings, etc.), ``'raw'`` (keep raw string).
        ignore_corrupted_lines: If ``True``, skip corrupted (e.g., non-numeric for numeric dtype, or with too few entries) lines;
            otherwise, raise :exc:`ValueError`.
    """
    def __init__(self, dtype="numeric", ignore_corrupted_lines=True):
        object.__init__(self)
        self.dtype=dtype
        # column count; stays None until it can be deduced (from a dtype list, or from the first parsable row)
        self.row_size=len(dtype) if funcargparse.is_sequence(dtype,"builtin;nostring") else None
        # per-column minimal dtype (int<float<complex<generic) used to speed up later conversions
        self.min_dtype=None if self.row_size is None else ["int"]*self.row_size
        self.ignore_corrupted_lines=ignore_corrupted_lines
        self.corrupted_lines={"size":[],"type":[]}
        self.columns=[]
    def corrupted_number(self):
        """Return the total number of corrupted lines encountered so far."""
        return len(self.corrupted_lines["size"])+len(self.corrupted_lines["type"])
    def convert_columns(self, raw_columns):
        """
        Convert raw columns into appropriate data structure (numpy array for numeric dtypes, list for generic and raw).

        Also update the stored per-column minimal dtypes.
        """
        converted=[]
        updated_min_dtypes=[]
        for raw_col,col_dtype,col_min_dtype in zip(raw_columns,self.dtype,self.min_dtype):
            col,col_min_dtype=_try_convert_column(raw_col,col_dtype,col_min_dtype)
            updated_min_dtypes.append(col_min_dtype)
            converted.append(col)
        self.min_dtype=updated_min_dtypes
        return converted
    def add_columns(self, columns):
        """
        Append columns (lists or numpy arrays) to the existing data.
        """
        if columns==[]:
            return
        if self.columns==[]:
            self.columns=columns
            return
        merged=[]
        for old_col,new_col in zip(self.columns,columns):
            if isinstance(old_col,np.ndarray) and isinstance(new_col,np.ndarray):
                merged.append(np.concatenate((old_col,new_col)))
            elif isinstance(new_col,np.ndarray):
                # mixed kinds: fall back to plain list concatenation
                merged.append(old_col+list(new_col))
            elif isinstance(old_col,np.ndarray):
                merged.append(list(old_col)+new_col)
            else:
                merged.append(old_col+new_col)
        self.columns=merged
    def add_chunk(self, chunk):
        """
        Add a chunk (2D list) to the pre-existing data.
        """
        # determine the row size if it isn't known yet
        if self.row_size is None:
            self.row_size=_get_row_length(chunk,self.dtype)
            if self.row_size is None:
                # no parsable row in the whole chunk; stash it as type-corrupted and wait for more data
                self.corrupted_lines["type"]=self.corrupted_lines["type"]+chunk
                return
            self.min_dtype=["int"]*self.row_size
            self.dtype=[self.dtype]*self.row_size
        row_size=self.row_size
        # drop (or complain about) rows which are too short; trim rows which are too long
        trimmed_chunk=[]
        for row in chunk:
            if len(row)>=row_size:
                trimmed_chunk.append(row[:row_size])
            elif self.ignore_corrupted_lines:
                self.corrupted_lines["size"].append(row)
            else:
                raise ValueError("size of the row doesn't agree with the number of columns")
        # fast path: convert every column of the chunk at once
        try:
            columns=self.convert_columns(zip(*trimmed_chunk))
        except ValueError:
            # slow path: vet rows one by one, filtering out the non-convertible ones
            filtered_chunk=[]
            dtype=self.dtype
            for row in trimmed_chunk:
                try:
                    # check convertability, but otherwise leave entries in their raw state
                    for entry,entry_dtype in zip(row,dtype):
                        _try_convert_element(entry,entry_dtype)
                    filtered_chunk.append(row)
                except ValueError:
                    self.corrupted_lines["type"].append(row)
            columns=self.convert_columns(zip(*filtered_chunk))
        self.add_columns(columns)
_complex_dtypes={"generic","raw"} # dtypes for which simple_entries==False (entries can potentially be strings or lists, so splitting lines is more complicated)
def load_columns(f, dtype, delimiters=_table_delimiters, empty_entry_substitute=None, ignore_corrupted_lines=True, stop_comment=None):
    r"""
    Load columns from the file stream `f`.

    Args:
        dtype: dtype of entries; can be either a single type, or a list of types (one per column).
            Possible dtypes are: ``'int'``, ``'float'``, ``'complex'``,
            ``'numeric'`` (tries to coerce to minimal possible numeric type, raises error if data can't be converted to complex),
            ``'generic'`` (accept arbitrary types, including lists, dictionaries, escaped strings, etc.), ``'raw'`` (keep raw string).
        delimiters (str): Regex string which recognizes delimiters (by default ``r"\s*,\s*|\s+"``, i.e., commas and whitespaces).
        empty_entry_substitute: Substitute for empty table entries. If ``None``, all empty table entries are skipped.
        ignore_corrupted_lines: If ``True``, skip corrupted (e.g., non-numeric for numeric dtype, or with too few entries) lines;
            otherwise, raise :exc:`ValueError`.
        stop_comment (str): Regex string for the stopping comment.
            If not ``None``, the function will stop if a comment satisfying the `stop_comment` regex is encountered.

    Returns:
        tuple: ``(columns, comments, corrupted_lines)``.

        `columns` is a list of columns with data.
        `comments` is a list of comment strings.
        `corrupted_lines` is a dict ``{'size':list, 'type':list}`` of corrupted lines (already split into entries),
        based on the corruption type (``'size'`` means too small size, ``'type'`` means it couldn't be converted using provided dtype).
    """
    original_chunk_size=1000
    chunk_multiplier=1.5 # growth factor for the chunk size while parsing stays clean
    chunk_size=original_chunk_size
    comments=[]
    accum=ChunksAccumulator(dtype,ignore_corrupted_lines=ignore_corrupted_lines)
    # dtypes whose entries can be strings or lists need the slower, more careful line splitting
    if funcargparse.is_sequence(dtype,"builtin;nostring"):
        generic_dtype=any(dt in _complex_dtypes for dt in dtype)
    else:
        generic_dtype=dtype in _complex_dtypes
    finished=False
    while not finished:
        current_corrupted=accum.corrupted_number()
        chunk,chunk_comments,chunk_finished=read_table_and_comments(f,
            delimiters=delimiters,empty_entry_substitute=empty_entry_substitute,stop_comment=stop_comment,chunk_size=chunk_size,simple_entries=not generic_dtype)
        finished=finished or chunk_finished
        comments=comments+chunk_comments
        accum.add_chunk(chunk)
        # adaptive chunk size: grow while chunks are clean, shrink back after corrupted lines
        if accum.corrupted_number()==current_corrupted:
            chunk_size=int(chunk_size*chunk_multiplier)
        else:
            chunk_size=max(int(chunk_size/chunk_multiplier),original_chunk_size)
    return accum.columns,comments,accum.corrupted_lines
def columns_to_table(data, columns=None, out_type="table"):
    """
    Convert `data` (list of columns) into a table.

    Args:
        columns: either number of columns, or a list of column names.
        out_type (str): type of the result: ``'array'`` for numpy array, ``'pandas'`` for pandas DataFrame, ``'table'`` for :class:`.DataTable` object.
    """
    funcargparse.check_parameter_range(out_type,"out_type",{"table","array","pandas"})
    if columns is not None:
        if funcargparse.is_sequence(columns,"builtin;nostring"):
            col_num=len(columns)
        else:
            # only the column count is given; there are no names to assign
            col_num=columns
            columns=None
    else:
        col_num=None
    if out_type=="table":
        if len(data)==0 and col_num is not None:
            data=[np.zeros(0,) for _ in range(col_num)] # to form array columns instead of list ones
        return datatable.DataTable(data,column_names=columns,transposed=True)
    elif out_type=="pandas":
        if columns is None:
            columns=list(range(len(data)))
        return pd.DataFrame(dict(zip(columns,data)),columns=columns)
    else:
        if len(data)==0:
            data=np.zeros((0,col_num or 0))
        else:
            data=np.column_stack(data)
        return data
def load_table(f, dtype="numeric", columns=None, out_type="table", delimiters=_table_delimiters, empty_entry_substitute=None, ignore_corrupted_lines=True, stop_comment=None):
    """
    Load table from the file stream `f`.

    Arguments are the same as in :func:`load_columns` and :func:`columns_to_table`.

    Returns:
        tuple: ``(table, comments, corrupted_lines)``.

        `table` is a table of the format `out_type`.
        `corrupted_lines` is a dict ``{'size':list, 'type':list}`` of corrupted lines (already split into entries),
        based on the corruption type (``'size'`` means too small size, ``'type'`` means it couldn't be converted using provided dtype).
        `comments` is a list of comment strings.
    """
    if columns is not None:
        # columns fixes the column count, so dtype can be expanded to one entry per column up front
        col_num=len(columns) if funcargparse.is_sequence(columns,"builtin;nostring") else columns
        dtype=funcargparse.as_sequence(dtype,col_num,allowed_type="builtin;nostring",length_conflict_action="error")
    data,comments,corrupted_lines=load_columns(f,dtype,delimiters=delimiters,
        empty_entry_substitute=empty_entry_substitute,stop_comment=stop_comment,ignore_corrupted_lines=ignore_corrupted_lines)
    table=columns_to_table(data,columns=columns,out_type=out_type)
    return table,comments,corrupted_lines