Commit b6648218 authored by iregon

Validation integrated in main flow

parent 4cf39a3d
@@ -31,6 +31,56 @@ from mdf_reader.reader import get_sections
from mdf_reader.reader import read_sections
from mdf_reader.validate import validate
+toolPath = os.path.dirname(os.path.abspath(__file__))
+schema_lib = os.path.join(toolPath,'schemas','lib')
+
+def ERV(TextParser,read_sections_list, schema, code_tables_path):
+    data_buffer = StringIO()
+    valid_buffer = StringIO()
+    for i_chunk, string_df in enumerate(TextParser):
+        # a. Get a DF with sections separated in columns:
+        #    - one section per column
+        #    - only sections requested, ignore rest
+        #    - requested NA sections as NaN columns
+        #    - columns order as in read_sections_list
+        sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
+        # b. Read elements from sections: along data chunks, resulting data types
+        #    may vary if there are gaps, so keep track of data types!
+        #    Sections are parsed in the same order as sections_df.columns
+        [data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
+        if i_chunk == 0:
+            out_dtypes = copy.deepcopy(out_dtypesi)
+        for k in out_dtypesi:
+            if out_dtypesi.get(k) in properties.numpy_floats:
+                out_dtypes.update({ k:out_dtypesi.get(k) })
+        valid_df = validate.validate(data_df, valid_df, schema, code_tables_path)
+        # Save to buffer
+        data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+    # Create the output
+    # WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
+    data_buffer.seek(0)
+    valid_buffer.seek(0)
+    logging.info("Wrapping output....")
+    # Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
+    # (source is either a pd.io.parsers.TextFileReader or a file with chunksize specified on input):
+    # this way the chunksize property is inherited directly when the input source was a pd.io.parsers.TextFileReader
+    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
+    # 'datetime' is not a valid pandas dtype: only on output (on reading back) are these columns
+    # converted (via parse_dates) to datetime64[ns]; specifying 'datetime' (of any kind) here would fail
+    date_columns = [] # Needs to be the numeric index of the column, as parse_dates does not seem to work with tuples....
+    for i,element in enumerate(list(out_dtypes)):
+        if out_dtypes.get(element) == 'datetime':
+            date_columns.append(i)
+    data = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
+    valid = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)
+    return data, valid
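For context, a minimal standalone sketch (not part of the commit; column names and values invented) of the buffer-and-reread pattern ERV relies on: chunk results are appended to a StringIO as headerless CSV, the output dtype is promoted to float whenever any chunk produced floats because of gaps, and datetime columns are passed to parse_dates by numeric position. Unlike the function above, the sketch filters the 'datetime' placeholder out of the dtype dict before calling read_csv.

import numpy as np
import pandas as pd
from io import StringIO

buffer = StringIO()
out_dtypes = {}
chunks = [pd.DataFrame({'AT': [10, 12], 'TS': ['2010-01-01', '2010-01-02']}),
          pd.DataFrame({'AT': [np.nan, 11], 'TS': ['2010-01-03', '2010-01-04']})]

for i_chunk, chunk in enumerate(chunks):
    chunk_dtypes = {'AT': 'float64' if chunk['AT'].isna().any() else 'int64',
                    'TS': 'datetime'}          # 'datetime' is a placeholder, not a pandas dtype
    if i_chunk == 0:
        out_dtypes = dict(chunk_dtypes)
    # promote to float if any chunk came out as float because of missing values
    for k in chunk_dtypes:
        if chunk_dtypes.get(k) in ('float16', 'float32', 'float64'):
            out_dtypes[k] = chunk_dtypes[k]
    chunk.to_csv(buffer, header=False, index=False, mode='a')

buffer.seek(0)
# parse_dates wants positional indices because real columns can be (section, element) tuples
date_columns = [i for i, k in enumerate(out_dtypes) if out_dtypes[k] == 'datetime']
dtypes = {k: v for k, v in out_dtypes.items() if v != 'datetime'}
data = pd.read_csv(buffer, names=list(out_dtypes), dtype=dtypes, parse_dates=date_columns)
print(data.dtypes)   # AT promoted to float64, TS parsed to datetime64[ns]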
def validate_arg(arg_name,arg_value,arg_type):
    if arg_value and not isinstance(arg_value,arg_type):
@@ -70,12 +70,17 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
    schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
    if not schema:
        return
+    if data_model:
+        model_path = os.path.join(schema_lib,data_model)
+    else:
+        model_path = data_model_path
+    code_tables_path = os.path.join(model_path,'code_tables')
    # For future use: some work already done in schema reading
    if schema['header'].get('multiple_reports_per_line'):
        logging.error('File format not yet supported')
        sys.exit(1)
-    # 2. Read data
+    # 2. Read and validate data
    imodel = data_model if data_model else data_model_path
    logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))
@@ -92,65 +147,18 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
    # a list with a single dataframe or a pd.io.parsers.TextFileReader
    logging.info("Getting data string from source...")
    TextParser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows)
+    print(type(TextParser))
-    # 2.3. Extract and read data in same loop
-    logging.info("Extracting sections...")
-    data_buffer = StringIO()
-    valid_buffer = StringIO()
-    for i,string_df in enumerate(TextParser):
-        # a. Get a DF with sections separated in columns:
-        #    - one section per column
-        #    - only sections requested, ignore rest
-        #    - requested NA sections as NaN columns
-        #    - columns order as in read_sections_list
-        sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
-        # b. Read elements from sections: along data chunks, resulting data types
-        #    may vary if gaps, keep track of data types!
-        #    Sections as parsed in the same order as sections_df.columns
-        [data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
-        if i == 0:
-            out_dtypes = copy.deepcopy(out_dtypesi)
-        for k in out_dtypesi:
-            if out_dtypesi in properties.numpy_floats:
-                out_dtypes.update({ k:out_dtypesi.get(k) })
-        # Save to buffer
-        data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-    # 2.4 Create output data
-    # WE'LL NEED TO POSPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
-    data_buffer.seek(0)
-    valid_buffer.seek(0)
-    logging.info("Wrapping output....")
-    # Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
-    # (source is either pd.io.parsers.TextFileReader or a file with chunksize specified on input):
-    # This way it supports direct chunksize property inheritance if the input source was a pd.io.parsers.TextFileReader
-    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
-    # 'datetime' is not a valid pandas dtype: Only on output (on reading) will be then converted (via parse_dates) to datetime64[ns] type, cannot specify 'datetime' (of any kind) here: will fail
-    date_columns = [] # Needs to be the numeric index of the column, as seems not to be able to work with tupples....
-    for i,element in enumerate(list(out_dtypes)):
-        if out_dtypes.get(element) == 'datetime':
-            date_columns.append(i)
-    data = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
-    valid = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)
+    # 2.3. Extract, read and validate data in same loop
+    logging.info("Extracting and reading sections")
+    data,valid = ERV(TextParser,read_sections_list, schema, code_tables_path)
    # 3. Create out data attributes
    logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL(S)")
    data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
-    out_atts = schemas.df_schema(data_columns, schema, data_model)
+    out_atts = schemas.df_schema(data_columns, schema)
-    # 4. Complete data model validation
-    logging.info("VALIDATING DATA")
-    valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path)
-    if isinstance(data,pd.io.parsers.TextFileReader):
-        logging.info('...RESTORING DATA PARSER')
-        data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
-    # 5. Output to files if requested
+    # 4. Output to files if requested
    if out_path:
        enlisted = False
        if not isinstance(data,pd.io.parsers.TextFileReader):
@@ -182,7 +190,7 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
        with open(os.path.join(out_path,'atts.json'),'w') as fileObj:
            json.dump(out_atts_json,fileObj,indent=4)
-    # 6. Return data
+    # 5. Return data
    return {'data':data,'atts':out_atts,'valid_mask':valid}

if __name__=='__main__':
...
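For orientation (not part of the diff), the reworked read() keeps the same return value, with the validation mask now produced chunk by chunk inside ERV rather than in a separate pass. The file name, model name and chunksize below are placeholders, and the exact import path depends on how the package exposes the module.

from mdf_reader import read as mdf_read   # assuming the read module is importable like this

# placeholders: any supported source file and data model would do
output = mdf_read.read('observations.txt', data_model = 'imma1', chunksize = 10000)

data = output['data']          # pd.DataFrame, or a TextFileReader when chunksize is set
atts = output['atts']          # per-element attributes built by schemas.df_schema
valid = output['valid_mask']   # boolean mask, same columns as data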
@@ -50,6 +50,7 @@ import io
from mdf_reader import properties

def to_iterable_df(source,skiprows = None, chunksize = None):
+    print('chunksize is {}'.format(str(chunksize)))
    TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
    if not chunksize:
        TextParser = [TextParser]
...
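As a side note, a self-contained sketch (toy input, invented width) of why the no-chunksize case is wrapped in a list: the caller can loop over the result the same way whether or not chunking was requested, which is what ERV does with TextParser.

import pandas as pd
from io import StringIO

def to_iterable(source, chunksize=None):
    # mirror of the pattern above: fixed-width read, then normalise to something iterable
    parser = pd.read_fwf(source, widths=[80], header=None, chunksize=chunksize)
    return parser if chunksize else [parser]

for chunk in to_iterable(StringIO("first report\nsecond report\n")):
    print(len(chunk))   # the same loop also works when chunksize is set and parser is a TextFileReader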
@@ -133,15 +133,13 @@ def read_schema(schema_name = None, ext_schema_path = None):
            schema['header']['parsing_order'][0]['s'].append(str(i))

    return schema

-def df_schema(df_columns, schema, data_model, supp_section = None, supp_schema = None, supp_model = None ):
+def df_schema(df_columns, schema):

-    def clean_schema(columns,schema,data_model):
+    def clean_schema(columns,schema):
        # Could optionally add cleaning of element descriptors that only apply
        # to the initial reading of the data model: field_length, etc....
        for element in list(schema):
            if element not in columns:
                schema.pop(element)
-            else:
-                schema[element].update({'data_model': data_model})
        return

    flat_schema = dict()
@@ -154,12 +152,7 @@ def df_schema(df_columns, schema, data_model, supp_section = None, supp_schema =
        else:
            flat_schema.update( { (section, x): schema['sections'].get(section).get('elements').get(x) for x in schema['sections'].get(section).get('elements') })

-    clean_schema(df_columns, flat_schema, data_model)
-    # Here we are assuming supp data has no sections!
-    if supp_section:
-        flat_supp = dict()
-        flat_supp.update( { (supp_section, x): supp_schema['sections'].get(properties.dummy_level).get('elements').get(x) for x in supp_schema['sections'].get(properties.dummy_level).get('elements') })
-        clean_schema(df_columns, flat_supp, supp_model)
-        flat_schema.update(flat_supp)
+    clean_schema(df_columns, flat_schema)

    return flat_schema
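For reference, a toy walk-through (schema contents invented) of what the simplified df_schema/clean_schema now produce for a sectioned model: (section, element) tuples become the keys, elements absent from the dataframe columns are dropped, and no 'data_model' tag is attached any more.

# toy schema with the same nesting as the data model schemas (contents invented)
schema = {'sections': {'core': {'elements': {'AT':  {'column_type': 'float', 'valid_min': -90, 'valid_max': 60},
                                             'SST': {'column_type': 'float', 'valid_min': -10, 'valid_max': 45}}}}}

flat_schema = {}
for section in schema['sections']:
    flat_schema.update({(section, x): schema['sections'][section]['elements'][x]
                        for x in schema['sections'][section]['elements']})

df_columns = [('core', 'AT')]                     # only AT was actually read
for element in list(flat_schema):                 # clean_schema: drop elements not in the dataframe
    if element not in df_columns:
        flat_schema.pop(element)

print(flat_schema)   # {('core', 'AT'): {'column_type': 'float', 'valid_min': -90, 'valid_max': 60}}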
@@ -16,50 +16,31 @@ Validated elements are those with the following column_types:

@author: iregon
"""

-from __future__ import unicode_literals
-from __future__ import print_function
-from __future__ import absolute_import
-# CAREFULL HERE:
-# Note that in Python 3, the io.open function is an alias for the built-in open function.
-# The built-in open function only supports the encoding argument in Python 3, not Python 2.
-# https://docs.python.org/3.4/library/io.html?highlight=io
-from io import StringIO as StringIO
-import sys
import os
import pandas as pd
import numpy as np
import logging

from .. import properties
from ..schemas import code_tables
+from ..schemas import schemas

-if sys.version_info[0] >= 3:
-    py3 = True
-else:
-    py3 = False
-    from io import BytesIO as BytesIO
-
-# Get pandas dtype for time_stamps
-toolPath = os.path.dirname(os.path.abspath(__file__))
-dirname=os.path.dirname
-schema_lib = os.path.join(dirname(toolPath),'schemas','lib')
-
-def validate_numeric(elements,df,schema):
+def validate_numeric(elements,data,schema):
    # Find thresholds in schema. Flag if not available -> warn
-    mask = pd.DataFrame(index = df.index, data = False, columns = elements)
+    mask = pd.DataFrame(index = data.index, data = False, columns = elements)
    lower = { x:schema.get(x).get('valid_min', -np.inf) for x in elements }
    upper = { x:schema.get(x).get('valid_max', np.inf) for x in elements }
    set_elements = [ x for x in lower.keys() if lower.get(x) != -np.inf and upper.get(x) != np.inf ]
    if len([ x for x in elements if x not in set_elements ]) > 0:
        logging.warning('Data numeric elements with missing upper or lower threshold: {}'.format(",".join([ str(x) for x in elements if x not in set_elements ])))
        logging.warning('Corresponding upper and/or lower bounds set to +/-inf for validation')
-    #mask[set_elements] = ((df[set_elements] >= [ lower.get(x) for x in set_elements ] ) & (df[set_elements] <= [ upper.get(x) for x in set_elements ])) | df[set_elements].isna()
-    mask[elements] = ((df[elements] >= [ lower.get(x) for x in elements ] ) & (df[elements] <= [ upper.get(x) for x in elements ])) | df[elements].isna()
+    mask[elements] = ((data[elements] >= [ lower.get(x) for x in elements ] ) & (data[elements] <= [ upper.get(x) for x in elements ])) | data[elements].isna()
    return mask
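A standalone toy run of the bounds check above (element names, values and thresholds invented): out-of-range values come back False, missing values are deliberately kept True, and an element without declared bounds falls back to +/-inf and therefore always passes.

import numpy as np
import pandas as pd

data = pd.DataFrame({'AT': [5.0, 120.0, np.nan], 'W': [10.0, 10.0, 10.0]})
atts = {'AT': {'valid_min': -90, 'valid_max': 60}, 'W': {}}   # 'W' has no bounds declared

elements = ['AT', 'W']
lower = {x: atts.get(x).get('valid_min', -np.inf) for x in elements}
upper = {x: atts.get(x).get('valid_max', np.inf) for x in elements}

mask = ((data[elements] >= [lower[x] for x in elements]) &
        (data[elements] <= [upper[x] for x in elements])) | data[elements].isna()
print(mask)   # AT: True, False, True ; W: all True (unbounded)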
-def validate_codes(elements, df, code_tables_path, schema, supp = False):
+def validate_codes(elements, data, code_tables_path, schema, supp = False):

-    mask = pd.DataFrame(index = df.index, data = False, columns = elements)
+    mask = pd.DataFrame(index = data.index, data = False, columns = elements)
    if os.path.isdir(code_tables_path):
        for element in elements:
@@ -85,8 +66,8 @@ def validate_codes(elements, df, code_tables_path, schema, supp = False):
                dtypes = { x:properties.pandas_dtypes.get(schema.get(x).get('column_type')) for x in key_elements }
                table_keys = code_tables.table_keys(table)
                table_keys_str = [ "∿".join(x) if isinstance(x,list) else x for x in table_keys ]
-                validation_df = df[key_elements]
+                validation_df = data[key_elements]
-                imask = pd.Series(index = df.index, data =True)
+                imask = pd.Series(index = data.index, data =True)
                imask.iloc[np.where(validation_df.notna().all(axis = 1))[0]] = validation_df.iloc[np.where(validation_df.notna().all(axis = 1))[0],:].astype(dtypes).astype('str').apply("∿".join, axis=1).isin(table_keys_str)
                mask[element] = imask
            except Exception as e:
@@ -104,92 +85,57 @@ def validate_codes(elements, df, code_tables_path, schema, supp = False):
    return mask
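The membership test at the heart of validate_codes, reduced to a toy example (table contents invented): the key elements of each row are joined with the '∿' separator and looked up against the joined table keys, so the combination is what gets flagged, and rows with a missing key are left untouched.

import numpy as np
import pandas as pd

# invented two-key code table: valid (YEAR, DCK) combinations
table_keys = [['2010', '992'], ['2011', '992']]
table_keys_str = ["∿".join(x) if isinstance(x, list) else x for x in table_keys]

validation_df = pd.DataFrame({'YEAR': ['2010', '2012', None], 'DCK': ['992', '992', '992']})
imask = pd.Series(index=validation_df.index, data=True)

rows = np.where(validation_df.notna().all(axis=1))[0]   # only fully populated key rows are checked
imask.iloc[rows] = validation_df.iloc[rows, :].astype('str').apply("∿".join, axis=1).isin(table_keys_str)
print(imask.tolist())   # [True, False, True] - rows with missing keys stay valid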
-def validate(data, schema, mask0, data_model = None, data_model_path = None, supp_section = None, supp_model = None, supp_model_path = None ):
-    # schema is the input data schema: collection of attributes for DF elements, not the data model schema
-    # data model schema info is nevertheless needed to access code tables
+def validate(data, mask0, schema, code_tables_path):

    logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s',
                        level=logging.INFO,datefmt='%Y%m%d %H:%M:%S',filename=None)

-    # 0. Check arguments are valid---------------------------------------------
-    if not data_model and not data_model_path:
-        logging.error('A valid data model or data model path must be provided')
+    # Check input
+    if not isinstance(data,pd.DataFrame) or not isinstance(mask0,pd.DataFrame):
+        logging.error('Input data and mask must be a pandas data frame object')
        return
-    if supp_section:
-        if not supp_model and not supp_model_path:
-            logging.error('A valid data model or data model path must be provided for supplemental data')
-            return
-    if not isinstance(data,pd.DataFrame) and not isinstance(data,pd.io.parsers.TextFileReader):
-        logging.error('Input data must be a data frame or a TextFileReader object')
-        return
-    # 1. Get data models' path-------------------------------------------------
-    if data_model:
-        model_path = os.path.join(schema_lib,data_model)
-    else:
-        model_path = data_model_path
-    code_tables_path = os.path.join(model_path,'code_tables')
-    if supp_section:
-        if supp_model:
-            supp_path = os.path.join(schema_lib,supp_model)
-        else:
-            supp_path = supp_model_path
-        supp_code_tables_path = os.path.join(supp_path,'code_tables')
-    # 2. Go--------------------------------------------------------------------
-    TextParserData = [data.copy()] if isinstance(data,pd.DataFrame) else data
-    TextParserMask = [mask0.copy()] if isinstance(mask0,pd.DataFrame) else mask0
-    output_buffer = StringIO() if py3 else BytesIO()
-    for df, mk in zip(TextParserData, TextParserMask):
-        elements = [ x for x in df if x in schema ]
-        # See what elements we need to validate: coded go to different code table paths if supplemental
-        numeric_elements = [ x for x in elements if schema.get(x).get('column_type') in properties.numeric_types ]
-        datetime_elements = [ x for x in elements if schema.get(x).get('column_type') == 'datetime' ]
-        coded_elements = [ x for x in elements if schema.get(x).get('column_type') == 'key' ]
-        if supp_section:
-            supp_coded_elements = [ x for x in coded_elements if x[0] == supp_section ]
-            for x in supp_coded_elements:
-                coded_elements.remove(x)
-        if any([isinstance(x,tuple) for x in numeric_elements + datetime_elements + coded_elements ]):
-            validated_columns = pd.MultiIndex.from_tuples(list(set(numeric_elements + coded_elements + datetime_elements)))
-        else:
-            validated_columns = list(set(numeric_elements + coded_elements + datetime_elements))
-        imask = pd.DataFrame(index = df.index, columns = df.columns)
-        # Validate elements by dtype
-        # Table coded elements can be as well numeric -> initially should not have its bounds defined in schema, but:
-        # Numeric validation will be overriden by code table validation!!!
-        # 1. NUMERIC ELEMENTS
-        imask[numeric_elements] = validate_numeric(numeric_elements, df, schema)
-        # 2. TABLE CODED ELEMENTS
-        # See following: in multiple keys code tables, the non parameter element, won't have a code_table attribute in the schema:
-        # So we need to check the code_table.keys files in addition to the schema
-        # Additionally, a YEAR key can fail in one table, but be compliant with anbother, then, how would we mask this?
-        # also, a YEAR defined as an integer, will undergo its own check.....
-        # So I think we need to check nested keys as a whole, and mask only the actual parameterized element:
-        # Get the full list of keys combinations (tuples, triplets...) and check the column combination against that: if it fails, mark the element!
-        # Need to see how to grab the YEAR part of a datetime when YEAR comes from a datetime element
-        # pd.DatetimeIndex(df['_datetime']).year
-        if len(coded_elements)> 0:
-            imask[coded_elements] = validate_codes(coded_elements, df, code_tables_path, schema)
-        try:
-            if len(supp_coded_elements)>0:
-                imask[supp_coded_elements] = validate_codes(supp_coded_elements, df, supp_code_tables_path, schema, supp = True)
-        except:
-            pass
-        # 3. DATETIME ELEMENTS
-        # only those declared as such in schema, not _datetime
-        # Because of the way they are converted, read into datetime, they should already be NaT if they not validate as a valid datetime;
-        # let's check: hurray! they are!
-        imask[datetime_elements] = df[datetime_elements].notna()
-        imask[validated_columns] = imask[validated_columns].mask(mk[validated_columns] == False, False)
-        imask.to_csv(output_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-    output_buffer.seek(0)
-    chunksize = None if isinstance(data,pd.DataFrame) else data.orig_options['chunksize']
-    mask = pd.read_csv(output_buffer,names = [ x for x in imask ], chunksize = chunksize)
-    return mask
\ No newline at end of file
+    # Get the data elements from the input data: they might be just a subset of the
+    # data model. Flatten the schema to get a simple, sequential list
+    # of the elements included in the input data
+    elements = [ x for x in data ]
+    element_atts = schemas.df_schema(elements, schema)
+    # See what elements we need to validate
+    numeric_elements = [ x for x in elements if element_atts.get(x).get('column_type') in properties.numeric_types ]
+    datetime_elements = [ x for x in elements if element_atts.get(x).get('column_type') == 'datetime' ]
+    coded_elements = [ x for x in elements if element_atts.get(x).get('column_type') == 'key' ]
+    if any([isinstance(x,tuple) for x in numeric_elements + datetime_elements + coded_elements ]):
+        validated_columns = pd.MultiIndex.from_tuples(list(set(numeric_elements + coded_elements + datetime_elements)))
+    else:
+        validated_columns = list(set(numeric_elements + coded_elements + datetime_elements))
+
+    mask = pd.DataFrame(index = data.index, columns = data.columns)
+    # Validate elements by dtype:
+    # 1. Numeric elements
+    mask[numeric_elements] = validate_numeric(numeric_elements, data, element_atts)
+    # 2. Table coded elements
+    # Note: in multiple-key code tables, the non-parameter element won't have
+    # a code_table attribute in element_atts, so we need to check the
+    # code_table.keys files in addition to element_atts.
+    # Additionally, a YEAR key can fail in one table but be compliant with another; how would we mask this?
+    # Also, a YEAR defined as an integer will undergo its own check.....
+    # So we need to check nested keys as a whole, and mask only the actual parameterized element:
+    # get the full list of key combinations (tuples, triplets...) and check the column combination against that: if it fails, mark the element!
+    # Need to see how to grab the YEAR part of a datetime when YEAR comes from a datetime element
+    # pd.DatetimeIndex(df['_datetime']).year
+    if len(coded_elements)> 0:
+        mask[coded_elements] = validate_codes(coded_elements, data, code_tables_path, element_atts)
+    # 3. Datetime elements
+    # Only those declared as such in element_atts.
+    # Because of the way they are converted (read into datetime), they should
+    # already be NaT if they do not validate as a valid datetime;
+    # let's check: hurray! they are!
+    mask[datetime_elements] = data[datetime_elements].notna()
+    mask[validated_columns] = mask[validated_columns].mask(mask0[validated_columns] == False, False)
+    return mask
\ No newline at end of file
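Finally, the masking step that closes the new validate(), shown in isolation with toy frames: wherever the reading-stage mask (mask0) already flagged a value as False, the validation result is forced to False as well, so nothing that failed parsing can resurface as valid.

import pandas as pd

mask0 = pd.DataFrame({'AT': [True, False, True]})   # from reading/parsing the elements
mask = pd.DataFrame({'AT': [True, True, False]})    # from range and code-table validation
validated_columns = ['AT']

mask[validated_columns] = mask[validated_columns].mask(mask0[validated_columns] == False, False)
print(mask['AT'].tolist())   # [True, False, False]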