Commit c0251df1 authored by iregon
Browse files

Main functions as main in reader modules

parent d510abd4
...@@ -31,9 +31,9 @@ from io import StringIO as StringIO ...@@ -31,9 +31,9 @@ from io import StringIO as StringIO
from .data_models import schemas from .data_models import schemas
from . import properties from . import properties
from .common import pandas_TextParser_hdlr from .common import pandas_TextParser_hdlr
from .reader import import_data #from .reader import import_data
from .reader import get_sections #from .reader import get_sections
from .reader.read_sections import main as read_sections from mdf_reader.reader import import_data, get_sections, read_sections
from .validator import validate from .validator import validate
toolPath = os.path.dirname(os.path.abspath(__file__)) toolPath = os.path.dirname(os.path.abspath(__file__))
...@@ -77,13 +77,13 @@ def ERV(TextParser,read_sections_list, schema, code_tables_path): ...@@ -77,13 +77,13 @@ def ERV(TextParser,read_sections_list, schema, code_tables_path):
# - requested NA sections as NaN columns # - requested NA sections as NaN columns
# - columns(sections) order as in read_sections_list # - columns(sections) order as in read_sections_list
sections_df = get_sections.get_sections(string_df, schema, read_sections_list) sections_df = get_sections.main(string_df, schema, read_sections_list)
# 2. Read elements from sections: along data chunks, resulting data types # 2. Read elements from sections: along data chunks, resulting data types
# may vary if gaps, keep track of data types: add Intxx pandas classes rather than intxx to avoid this! # may vary if gaps, keep track of data types: add Intxx pandas classes rather than intxx to avoid this!
# Sections are parsed in the same order as sections_df.columns # Sections are parsed in the same order as sections_df.columns
[data_df, valid_df, out_dtypesi ] = read_sections(sections_df, schema) [data_df, valid_df, out_dtypesi ] = read_sections.main(sections_df, schema)
if i_chunk == 0: if i_chunk == 0:
out_dtypes = copy.deepcopy(out_dtypesi) out_dtypes = copy.deepcopy(out_dtypesi)
...@@ -271,7 +271,7 @@ def main(source, data_model = None, data_model_path = None, sections = None,chun ...@@ -271,7 +271,7 @@ def main(source, data_model = None, data_model_path = None, sections = None,chun
# 2.2 Homogeneize input data to an iterable with dataframes: # 2.2 Homogeneize input data to an iterable with dataframes:
# a list with a single dataframe or a pd.io.parsers.TextFileReader # a list with a single dataframe or a pd.io.parsers.TextFileReader
logging.info("Getting data string from source...") logging.info("Getting data string from source...")
TextParser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows) TextParser = import_data.main(source, chunksize = chunksize, skiprows = skiprows)
# 2.3. Extract, read and validate data in same loop # 2.3. Extract, read and validate data in same loop
logging.info("Extracting and reading sections") logging.info("Extracting and reading sections")
......
...@@ -199,7 +199,7 @@ def extract_sections(string_df): ...@@ -199,7 +199,7 @@ def extract_sections(string_df):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# MAIN # MAIN
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def get_sections(string_df, schema, read_sections): def main(string_df, schema, read_sections):
global sentinals, section_lens, sentinals_lens global sentinals, section_lens, sentinals_lens
global parsing_order global parsing_order
# Proceed to split sections if more than one # Proceed to split sections if more than one
......
...@@ -41,7 +41,7 @@ import os ...@@ -41,7 +41,7 @@ import os
from .. import properties from .. import properties
def import_data(source,chunksize = None, skiprows = None): def main(source,chunksize = None, skiprows = None):
if os.path.isfile(source): if os.path.isfile(source):
TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize) TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
......
...@@ -7,7 +7,7 @@ Extracts and reads (decodes, scales, etc...) the elements of data sections. ...@@ -7,7 +7,7 @@ Extracts and reads (decodes, scales, etc...) the elements of data sections.
Each column of the input dataframe is a section with all its elements stored Each column of the input dataframe is a section with all its elements stored
as a single string. as a single string.
Working on a section by section basis, this module uses the data model Working on a section by section basis, this module uses the data model
information provided in the schema to split the elements, decode and scale them information provided in the schema to split the elements, decode and scale them
where appropriate and ensure its data type consistency. where appropriate and ensure its data type consistency.
...@@ -24,12 +24,12 @@ DEV NOTES: ...@@ -24,12 +24,12 @@ DEV NOTES:
# how to write that..... # how to write that.....
#https://stackoverflow.com/questions/21147058/pandas-to-csv-output-quoting-issue #https://stackoverflow.com/questions/21147058/pandas-to-csv-output-quoting-issue
# quoting=csv.QUOTE_NONE was failing when a section is empty (or just one record in a section,...) # quoting=csv.QUOTE_NONE was failing when a section is empty (or just one record in a section,...)
sections_df[section].to_csv(section_buffer,header=False, encoding = 'utf-8',index = False,quoting=csv.QUOTE_NONE,escapechar="\\",sep="\t") sections_df[section].to_csv(section_buffer,header=False, encoding = 'utf-8',index = False,quoting=csv.QUOTE_NONE,escapechar="\\",sep="\t")
But we were still experiencing problems when reading fully empty sections, now But we were still experiencing problems when reading fully empty sections, now
we only write to the section buffer reports that are not empty. We afterwards we only write to the section buffer reports that are not empty. We afterwards
recover the indexes.... recover the indexes....
@author: iregon @author: iregon
""" """
...@@ -50,24 +50,25 @@ def extract_fixed_width(section_serie_bf,section_schema): ...@@ -50,24 +50,25 @@ def extract_fixed_width(section_serie_bf,section_schema):
section_elements = pd.read_fwf(section_serie_bf, widths = section_widths, header = None, names = section_names , na_values = section_missing, delimiter="\t", encoding = 'utf-8', dtype = 'object', skip_blank_lines = False ) section_elements = pd.read_fwf(section_serie_bf, widths = section_widths, header = None, names = section_names , na_values = section_missing, delimiter="\t", encoding = 'utf-8', dtype = 'object', skip_blank_lines = False )
return section_elements return section_elements
def extract_delimited(section_serie_bf,section_schema): def extract_delimited(section_serie_bf,section_schema):
delimiter = section_schema['header'].get('delimiter') delimiter = section_schema['header'].get('delimiter')
section_names = section_schema['elements'].keys() section_names = section_schema['elements'].keys()
section_missing = { x:section_schema['elements'][x].get('missing_value') for x in section_names } section_missing = { x:section_schema['elements'][x].get('missing_value') for x in section_names }
section_elements = pd.read_csv(section_serie_bf,header = None, delimiter = delimiter, encoding = 'utf-8', section_elements = pd.read_csv(section_serie_bf,header = None, delimiter = delimiter, encoding = 'utf-8',
dtype = 'object', skip_blank_lines = False, dtype = 'object', skip_blank_lines = False,
names = section_names, na_values = section_missing) names = section_names, na_values = section_missing)
return section_elements return section_elements
def read_data(section_df,section_schema): def read_data(section_df,section_schema):
section_names = section_df.columns section_names = section_df.columns
section_dtypes = { i:section_schema['elements'][i]['column_type'] for i in section_names } section_dtypes = { i:section_schema['elements'][i]['column_type'] for i in section_names }
encoded = [ (x) for x in section_names if 'encoding' in section_schema['elements'][x]] encoded = [ (x) for x in section_names if 'encoding' in section_schema['elements'][x]]
section_encoding = { i:section_schema['elements'][i]['encoding'] for i in encoded } section_encoding = { i:section_schema['elements'][i]['encoding'] for i in encoded }
section_valid = pd.DataFrame(index = section_df.index, columns = section_df.columns) section_valid = pd.DataFrame(index = section_df.index, columns = section_df.columns)
for element in section_dtypes.keys(): for element in section_dtypes.keys():
print(element)
missing = section_df[element].isna() missing = section_df[element].isna()
if element in encoded: if element in encoded:
section_df[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_df[element]) section_df[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_df[element])
...@@ -76,29 +77,29 @@ def read_data(section_df,section_schema): ...@@ -76,29 +77,29 @@ def read_data(section_df,section_schema):
section_df[element] = converters.get(section_dtypes.get(element))(section_df[element], **kwargs) section_df[element] = converters.get(section_dtypes.get(element))(section_df[element], **kwargs)
section_valid[element] = missing | section_df[element].notna() section_valid[element] = missing | section_df[element].notna()
return section_df,section_valid return section_df,section_valid
def read_sections(sections_df, schema): def main(sections_df, schema):
multiindex = True if len(sections_df.columns) > 1 or sections_df.columns[0] != properties.dummy_level else False multiindex = True if len(sections_df.columns) > 1 or sections_df.columns[0] != properties.dummy_level else False
data_df = pd.DataFrame(index = sections_df.index) data_df = pd.DataFrame(index = sections_df.index)
valid_df = pd.DataFrame(index = sections_df.index) valid_df = pd.DataFrame(index = sections_df.index)
out_dtypes = dict() out_dtypes = dict()
for section in sections_df.columns: for section in sections_df.columns:
print('Reading section {}'.format(section)) print('Reading section {}'.format(section))
section_schema = schema['sections'].get(section) section_schema = schema['sections'].get(section)
disable_read = section_schema.get('header').get('disable_read') disable_read = section_schema.get('header').get('disable_read')
if not disable_read: if not disable_read:
field_layout = section_schema.get('header').get('field_layout') field_layout = section_schema.get('header').get('field_layout')
ignore = [ i for i in section_schema['elements'].keys() if section_schema['elements'][i].get('ignore') ] # evals to True if set and true, evals to False if not set or set and false ignore = [ i for i in section_schema['elements'].keys() if section_schema['elements'][i].get('ignore') ] # evals to True if set and true, evals to False if not set or set and false
# Get rid of false delimiters in fixed_width # Get rid of false delimiters in fixed_width
delimiter = section_schema['header'].get('delimiter') delimiter = section_schema['header'].get('delimiter')
if delimiter and field_layout == 'fixed_width': if delimiter and field_layout == 'fixed_width':
sections_df[section] = sections_df[section].str.replace(delimiter,'') sections_df[section] = sections_df[section].str.replace(delimiter,'')
section_buffer = StringIO() section_buffer = StringIO()
# Here indices are lost, have to give the real ones, those in section_strings: # Here indices are lost, have to give the real ones, those in section_strings:
# we'll see if we do that in the caller module or here.... # we'll see if we do that in the caller module or here....
...@@ -112,9 +113,9 @@ def read_sections(sections_df, schema): ...@@ -112,9 +113,9 @@ def read_sections(sections_df, schema):
section_elements_obj = extract_fixed_width(section_buffer,section_schema) section_elements_obj = extract_fixed_width(section_buffer,section_schema)
elif field_layout == 'delimited': elif field_layout == 'delimited':
section_elements_obj = extract_delimited(section_buffer,section_schema) section_elements_obj = extract_delimited(section_buffer,section_schema)
section_elements_obj.drop(ignore, axis = 1, inplace = True) section_elements_obj.drop(ignore, axis = 1, inplace = True)
# Read the objects to their data types and apply decoding, scaling and so on... # Read the objects to their data types and apply decoding, scaling and so on...
# Give them their actual indexes back # Give them their actual indexes back
section_elements, section_valid = read_data(section_elements_obj,section_schema) section_elements, section_valid = read_data(section_elements_obj,section_schema)
...@@ -124,30 +125,30 @@ def read_sections(sections_df, schema): ...@@ -124,30 +125,30 @@ def read_sections(sections_df, schema):
else: else:
section_elements = pd.DataFrame(sections_df[section],columns = [section]) section_elements = pd.DataFrame(sections_df[section],columns = [section])
section_valid = pd.DataFrame(index = section_elements.index,data = True, columns = [section]) section_valid = pd.DataFrame(index = section_elements.index,data = True, columns = [section])
section_elements.columns = [ (section, x) for x in section_elements.columns] if multiindex else section_elements.columns section_elements.columns = [ (section, x) for x in section_elements.columns] if multiindex else section_elements.columns
section_valid.columns = section_elements.columns section_valid.columns = section_elements.columns
data_df = pd.concat([data_df,section_elements],sort = False,axis=1) data_df = pd.concat([data_df,section_elements],sort = False,axis=1)
valid_df = pd.concat([valid_df,section_valid],sort = False,axis=1) valid_df = pd.concat([valid_df,section_valid],sort = False,axis=1)
# We do the actual out_dtypes here: because the full indexing occurs only # We do the actual out_dtypes here: because the full indexing occurs only
# after concat, NaN values may arise only in data_df if a section is # after concat, NaN values may arise only in data_df if a section is
# not existing in a given report! # not existing in a given report!
for section in sections_df.columns: for section in sections_df.columns:
section_schema = schema['sections'].get(section) section_schema = schema['sections'].get(section)
if not section_schema.get('header').get('disable_read'): if not section_schema.get('header').get('disable_read'):
elements = [ x[1] for x in data_df.columns if x[0] == section ] elements = [ x[1] for x in data_df.columns if x[0] == section ]
if multiindex: if multiindex:
out_dtypes.update({ (section,i):properties.pandas_dtypes.get(section_schema['elements'][i].get('column_type')) for i in elements } ) out_dtypes.update({ (section,i):properties.pandas_dtypes.get(section_schema['elements'][i].get('column_type')) for i in elements } )
out_dtypes.update({ (section,i):data_df[(section,i)].dtype.name for i in elements if data_df[(section,i)].dtype.name in properties.numpy_floats}) out_dtypes.update({ (section,i):data_df[(section,i)].dtype.name for i in elements if data_df[(section,i)].dtype.name in properties.numpy_floats})
else: else:
out_dtypes.update({ i:properties.pandas_dtypes.get(section_schema['elements'][i].get('column_type')) for i in elements } ) out_dtypes.update({ i:properties.pandas_dtypes.get(section_schema['elements'][i].get('column_type')) for i in elements } )
out_dtypes.update({ i:data_df[i].dtype.name for i in section_elements if data_df[i].dtype.name in properties.numpy_floats}) out_dtypes.update({ i:data_df[i].dtype.name for i in section_elements if data_df[i].dtype.name in properties.numpy_floats})
else: else:
if multiindex: if multiindex:
out_dtypes.update({ (section,section):'object' } ) out_dtypes.update({ (section,section):'object' } )
else: else:
out_dtypes.update({ section:'object' } ) out_dtypes.update({ section:'object' } )
return data_df, valid_df, out_dtypes return data_df, valid_df, out_dtypes
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment