diff --git a/read.py b/read.py
index 4c39ceb8a46549a8568743c72e8fd3f63203c0e1..7720cd26aabaf953a92bef9dfd8f4142a841df9a 100644
--- a/read.py
+++ b/read.py
@@ -31,6 +31,56 @@ from mdf_reader.reader import get_sections
 from mdf_reader.reader import read_sections
 from mdf_reader.validate import validate
 
+toolPath = os.path.dirname(os.path.abspath(__file__))
+schema_lib = os.path.join(toolPath,'schemas','lib')
+
+def ERV(TextParser,read_sections_list, schema, code_tables_path):
+    data_buffer = StringIO()
+    valid_buffer = StringIO()
+
+    for i_chunk, string_df in enumerate(TextParser):
+        # a. Get a DF with sections separated in columns:
+        #    - one section per column
+        #    - only sections requested, ignore rest
+        #    - requested NA sections as NaN columns
+        #    - columns order as in read_sections_list
+        sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
+
+        # b. Read elements from sections: along data chunks, resulting data types
+        # may vary if gaps, keep track of data types!
+        # Sections as parsed in the same order as sections_df.columns
+        [data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
+        if i_chunk == 0:
+            out_dtypes = copy.deepcopy(out_dtypesi)
+
+        for k in out_dtypesi:
+            if out_dtypesi.get(k) in properties.numpy_floats:
+                out_dtypes.update({ k:out_dtypesi.get(k) })
+
+        valid_df = validate.validate(data_df, valid_df, schema, code_tables_path)
+        # Save to buffer
+        data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+    # Create the output
+    # WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
+    data_buffer.seek(0)
+    valid_buffer.seek(0)
+    logging.info("Wrapping output....")
+    # Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
+    # (source is either pd.io.parsers.TextFileReader or a file with chunksize specified on input):
+    # This way it supports direct chunksize property inheritance if the input source was a pd.io.parsers.TextFileReader
+    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
+    # 'datetime' is not a valid pandas dtype: only on output (on reading) will it be converted (via parse_dates) to datetime64[ns]; cannot specify 'datetime' (of any kind) here: will fail
+    date_columns = [] # Needs to be the numeric index of the column, as parse_dates does not seem to work with tuples....
+    for i,element in enumerate(list(out_dtypes)):
+        if out_dtypes.get(element) == 'datetime':
+            date_columns.append(i)
+
+    data = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
+    valid = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)
+
+    return data, valid
+
 def validate_arg(arg_name,arg_value,arg_type):
 
     if arg_value and not isinstance(arg_value,arg_type):
@@ -70,12 +120,17 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
 
     schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
     if not schema:
         return
+    if data_model:
+        model_path = os.path.join(schema_lib,data_model)
+    else:
+        model_path = data_model_path
+    code_tables_path = os.path.join(model_path,'code_tables')
     # For future use: some work already done in schema reading
    if schema['header'].get('multiple_reports_per_line'):
         logging.error('File format not yet supported')
         sys.exit(1)
 
-    # 2. Read data
+    # 2. Read and validate data
     imodel = data_model if data_model else data_model_path
     logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))
@@ -92,65 +147,18 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
     # a list with a single dataframe or a pd.io.parsers.TextFileReader
     logging.info("Getting data string from source...")
     TextParser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows)
+    print(type(TextParser))
 
-    # 2.3. Extract and read data in same loop
-    logging.info("Extracting sections...")
-    data_buffer = StringIO()
-    valid_buffer = StringIO()
-
-    for i,string_df in enumerate(TextParser):
-        # a. Get a DF with sections separated in columns:
-        #    - one section per column
-        #    - only sections requested, ignore rest
-        #    - requested NA sections as NaN columns
-        #    - columns order as in read_sections_list
-        sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
-
-        # b. Read elements from sections: along data chunks, resulting data types
-        # may vary if gaps, keep track of data types!
-        # Sections as parsed in the same order as sections_df.columns
-        [data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
-        if i == 0:
-            out_dtypes = copy.deepcopy(out_dtypesi)
-
-        for k in out_dtypesi:
-            if out_dtypesi in properties.numpy_floats:
-                out_dtypes.update({ k:out_dtypesi.get(k) })
-        # Save to buffer
-        data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-
-    # 2.4 Create output data
-    # WE'LL NEED TO POSPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
-    data_buffer.seek(0)
-    valid_buffer.seek(0)
-    logging.info("Wrapping output....")
-    # Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
-    # (source is either pd.io.parsers.TextFileReader or a file with chunksize specified on input):
-    # This way it supports direct chunksize property inheritance if the input source was a pd.io.parsers.TextFileReader
-    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
-    # 'datetime' is not a valid pandas dtype: Only on output (on reading) will be then converted (via parse_dates) to datetime64[ns] type, cannot specify 'datetime' (of any kind) here: will fail
-    date_columns = [] # Needs to be the numeric index of the column, as seems not to be able to work with tupples....
-    for i,element in enumerate(list(out_dtypes)):
-        if out_dtypes.get(element) == 'datetime':
-            date_columns.append(i)
-
-    data = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
-    valid = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)
+    # 2.3. Extract, read and validate data in same loop
+    logging.info("Extracting and reading sections")
+    data,valid = ERV(TextParser,read_sections_list, schema, code_tables_path)
 
     # 3. Create out data attributes
     logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL(S)")
     data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
-    out_atts = schemas.df_schema(data_columns, schema, data_model)
-
-    # 4. Complete data model validation
-    logging.info("VALIDATING DATA")
-    valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path)
-    if isinstance(data,pd.io.parsers.TextFileReader):
-        logging.info('...RESTORING DATA PARSER')
-        data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
+    out_atts = schemas.df_schema(data_columns, schema)
 
-    # 5. Output to files if requested
+    # 4. Output to files if requested
     if out_path:
         enlisted = False
         if not isinstance(data,pd.io.parsers.TextFileReader):
@@ -182,7 +190,7 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
         with open(os.path.join(out_path,'atts.json'),'w') as fileObj:
             json.dump(out_atts_json,fileObj,indent=4)
 
-    # 6. Return data
+    # 5. Return data
     return {'data':data,'atts':out_atts,'valid_mask':valid}
 
 if __name__=='__main__':
diff --git a/reader/import_data.py b/reader/import_data.py
index a0a3e3ac4efdbc782f0e5092f1c2295fbeae1611..a167be9b8cafd82126cbbd224326e97486dde878 100644
--- a/reader/import_data.py
+++ b/reader/import_data.py
@@ -50,6 +50,7 @@ import io
 from mdf_reader import properties
 
 def to_iterable_df(source,skiprows = None, chunksize = None):
+    print('chunksize is {}'.format(str(chunksize)))
     TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
     if not chunksize:
         TextParser = [TextParser]
diff --git a/schemas/schemas.py b/schemas/schemas.py
index a5778adeee4940ebfcfc46b1d10118648378fed4..22722a9a1e99da758398675a94c8a19291e9e62f 100644
--- a/schemas/schemas.py
+++ b/schemas/schemas.py
@@ -133,15 +133,13 @@ def read_schema(schema_name = None, ext_schema_path = None):
                 schema['header']['parsing_order'][0]['s'].append(str(i))
 
     return schema
 
-def df_schema(df_columns, schema, data_model, supp_section = None, supp_schema = None, supp_model = None ):
-    def clean_schema(columns,schema,data_model):
+def df_schema(df_columns, schema):
+    def clean_schema(columns,schema):
         # Could optionally add cleaning of element descriptors that only apply
         # to the initial reading of the data model: field_length, etc....
         for element in list(schema):
             if element not in columns:
                 schema.pop(element)
-            else:
-                schema[element].update({'data_model': data_model})
         return
 
     flat_schema = dict()
@@ -154,12 +152,7 @@ def df_schema(df_columns, schema, data_model, supp_section = None, supp_schema =
         else:
             flat_schema.update( { (section, x): schema['sections'].get(section).get('elements').get(x) for x in schema['sections'].get(section).get('elements') })
 
-    clean_schema(df_columns, flat_schema, data_model)
-    # Here we are assuming supp data has no sections!
-    if supp_section:
-        flat_supp = dict()
-        flat_supp.update( { (supp_section, x): supp_schema['sections'].get(properties.dummy_level).get('elements').get(x) for x in supp_schema['sections'].get(properties.dummy_level).get('elements') })
-        clean_schema(df_columns, flat_supp, supp_model)
-        flat_schema.update(flat_supp)
+    clean_schema(df_columns, flat_schema)
+
     return flat_schema
diff --git a/validate/validate.py b/validate/validate.py
index 6b36e6231b16009a1d54202b7c3f55b331dcf4a6..d306d4af6b6d5f0fdcaae099a465ed04695e96be 100644
--- a/validate/validate.py
+++ b/validate/validate.py
@@ -16,50 +16,31 @@ Validated elements are those with the following column_types:
 
 @author: iregon
 """
 
-from __future__ import unicode_literals
-from __future__ import print_function
-from __future__ import absolute_import
-# CAREFULL HERE:
-# Note that in Python 3, the io.open function is an alias for the built-in open function.
-# The built-in open function only supports the encoding argument in Python 3, not Python 2.
-# https://docs.python.org/3.4/library/io.html?highlight=io
-from io import StringIO as StringIO
-
-import sys
 import os
 import pandas as pd
 import numpy as np
 import logging
 from .. import properties
 from ..schemas import code_tables
+from ..schemas import schemas
 
-if sys.version_info[0] >= 3:
-    py3 = True
-else:
-    py3 = False
-    from io import BytesIO as BytesIO
-
-# Get pandas dtype for time_stamps
-toolPath = os.path.dirname(os.path.abspath(__file__))
-dirname=os.path.dirname
-schema_lib = os.path.join(dirname(toolPath),'schemas','lib')
-
-def validate_numeric(elements,df,schema):
+def validate_numeric(elements,data,schema):
     # Find thresholds in schema. Flag if not available -> warn
-    mask = pd.DataFrame(index = df.index, data = False, columns = elements)
+    mask = pd.DataFrame(index = data.index, data = False, columns = elements)
     lower = { x:schema.get(x).get('valid_min', -np.inf) for x in elements }
     upper = { x:schema.get(x).get('valid_max', np.inf) for x in elements }
+
     set_elements = [ x for x in lower.keys() if lower.get(x) != -np.inf and upper.get(x) != np.inf ]
     if len([ x for x in elements if x not in set_elements ]) > 0:
         logging.warning('Data numeric elements with missing upper or lower threshold: {}'.format(",".join([ str(x) for x in elements if x not in set_elements ])))
         logging.warning('Corresponding upper and/or lower bounds set to +/-inf for validation')
-    #mask[set_elements] = ((df[set_elements] >= [ lower.get(x) for x in set_elements ] ) & (df[set_elements] <= [ upper.get(x) for x in set_elements ])) | df[set_elements].isna()
-    mask[elements] = ((df[elements] >= [ lower.get(x) for x in elements ] ) & (df[elements] <= [ upper.get(x) for x in elements ])) | df[elements].isna()
+
+    mask[elements] = ((data[elements] >= [ lower.get(x) for x in elements ] ) & (data[elements] <= [ upper.get(x) for x in elements ])) | data[elements].isna()
 
     return mask
 
-def validate_codes(elements, df, code_tables_path, schema, supp = False):
+def validate_codes(elements, data, code_tables_path, schema, supp = False):
 
-    mask = pd.DataFrame(index = df.index, data = False, columns = elements)
+    mask = pd.DataFrame(index = data.index, data = False, columns = elements)
 
     if os.path.isdir(code_tables_path):
         for element in elements:
@@ -85,8 +66,8 @@ def validate_codes(elements, df, code_tables_path, schema, supp = False):
                     dtypes = { x:properties.pandas_dtypes.get(schema.get(x).get('column_type')) for x in key_elements }
                     table_keys = code_tables.table_keys(table)
                     table_keys_str = [ "∿".join(x) if isinstance(x,list) else x for x in table_keys ]
-                    validation_df = df[key_elements]
-                    imask = pd.Series(index = df.index, data =True)
+                    validation_df = data[key_elements]
+                    imask = pd.Series(index = data.index, data =True)
                     imask.iloc[np.where(validation_df.notna().all(axis = 1))[0]] = validation_df.iloc[np.where(validation_df.notna().all(axis = 1))[0],:].astype(dtypes).astype('str').apply("∿".join, axis=1).isin(table_keys_str)
                     mask[element] = imask
                 except Exception as e:
@@ -104,92 +85,57 @@ def validate_codes(elements, df, code_tables_path, schema, supp = False):
 
     return mask
 
-def validate(data, schema, mask0, data_model = None, data_model_path = None, supp_section = None, supp_model = None, supp_model_path = None ):
-    # schema is the input data schema: collection of attributes for DF elements, not the data model schema
-    # data model schema info is nevertheless needed to access code tables
+
+def validate(data, mask0, schema, code_tables_path):
 
     logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s', level=logging.INFO,datefmt='%Y%m%d %H:%M:%S',filename=None)
 
-    # 0. Check arguments are valid---------------------------------------------
-    if not data_model and not data_model_path:
-        logging.error('A valid data model or data model path must be provided')
+    # Check input
+    if not isinstance(data,pd.DataFrame) or not isinstance(mask0,pd.DataFrame):
+        logging.error('Input data and mask must be a pandas data frame object')
         return
-    if supp_section:
-        if not supp_model and not supp_model_path:
-            logging.error('A valid data model or data model path must be provided for supplemental data')
-            return
-    if not isinstance(data,pd.DataFrame) and not isinstance(data,pd.io.parsers.TextFileReader):
-        logging.error('Input data must be a data frame or a TextFileReader object')
-        return
-    # 1. Get data models' path-------------------------------------------------
-    if data_model:
-        model_path = os.path.join(schema_lib,data_model)
-    else:
-        model_path = data_model_path
-    code_tables_path = os.path.join(model_path,'code_tables')
+
+    # Get the data elements from the input data: might be just a subset of the
+    # data model. Flatten the schema to get a simple and sequential list
+    # of elements included in the input data
+    elements = [ x for x in data ]
+    element_atts = schemas.df_schema(elements, schema)
+    # See what elements we need to validate
+    numeric_elements = [ x for x in elements if element_atts.get(x).get('column_type') in properties.numeric_types ]
+    datetime_elements = [ x for x in elements if element_atts.get(x).get('column_type') == 'datetime' ]
+    coded_elements = [ x for x in elements if element_atts.get(x).get('column_type') == 'key' ]
 
-    if supp_section:
-        if supp_model:
-            supp_path = os.path.join(schema_lib,supp_model)
-        else:
-            supp_path = supp_model_path
-        supp_code_tables_path = os.path.join(supp_path,'code_tables')
-
-    # 2. Go--------------------------------------------------------------------
-    TextParserData = [data.copy()] if isinstance(data,pd.DataFrame) else data
-    TextParserMask = [mask0.copy()] if isinstance(mask0,pd.DataFrame) else mask0
-    output_buffer = StringIO() if py3 else BytesIO()
-    for df, mk in zip(TextParserData, TextParserMask):
-        elements = [ x for x in df if x in schema ]
-        # See what elements we need to validate: coded go to different code table paths if supplemental
-        numeric_elements = [ x for x in elements if schema.get(x).get('column_type') in properties.numeric_types ]
-        datetime_elements = [ x for x in elements if schema.get(x).get('column_type') == 'datetime' ]
-        coded_elements = [ x for x in elements if schema.get(x).get('column_type') == 'key' ]
-        if supp_section:
-            supp_coded_elements = [ x for x in coded_elements if x[0] == supp_section ]
-            for x in supp_coded_elements:
-                coded_elements.remove(x)
-
-        if any([isinstance(x,tuple) for x in numeric_elements + datetime_elements + coded_elements ]):
-            validated_columns = pd.MultiIndex.from_tuples(list(set(numeric_elements + coded_elements + datetime_elements)))
-        else:
-            validated_columns = list(set(numeric_elements + coded_elements + datetime_elements))
-        imask = pd.DataFrame(index = df.index, columns = df.columns)
-
-        # Validate elements by dtype
-        # Table coded elements can be as well numeric -> initially should not have its bounds defined in schema, but:
-        # Numeric validation will be overriden by code table validation!!!
-        # 1. NUMERIC ELEMENTS
-        imask[numeric_elements] = validate_numeric(numeric_elements, df, schema)
+    if any([isinstance(x,tuple) for x in numeric_elements + datetime_elements + coded_elements ]):
+        validated_columns = pd.MultiIndex.from_tuples(list(set(numeric_elements + coded_elements + datetime_elements)))
+    else:
+        validated_columns = list(set(numeric_elements + coded_elements + datetime_elements))
 
-        # 2. TABLE CODED ELEMENTS
-        # See following: in multiple keys code tables, the non parameter element, won't have a code_table attribute in the schema:
-        # So we need to check the code_table.keys files in addition to the schema
-        # Additionally, a YEAR key can fail in one table, but be compliant with anbother, then, how would we mask this?
-        # also, a YEAR defined as an integer, will undergo its own check.....
-        # So I think we need to check nested keys as a whole, and mask only the actual parameterized element:
-        # Get the full list of keys combinations (tuples, triplets...) and check the column combination against that: if it fails, mark the element!
-        # Need to see how to grab the YEAR part of a datetime when YEAR comes from a datetime element
-        # pd.DatetimeIndex(df['_datetime']).year
-        if len(coded_elements)> 0:
-            imask[coded_elements] = validate_codes(coded_elements, df, code_tables_path, schema)
-        try:
-            if len(supp_coded_elements)>0:
-                imask[supp_coded_elements] = validate_codes(supp_coded_elements, df, supp_code_tables_path, schema, supp = True)
-        except:
-            pass
-        # 3. DATETIME ELEMENTS
-        # only those declared as such in schema, not _datetime
-        # Because of the way they are converted, read into datetime, they should already be NaT if they not validate as a valid datetime;
-        # let's check: hurray! they are!
-        imask[datetime_elements] = df[datetime_elements].notna()
-        imask[validated_columns] = imask[validated_columns].mask(mk[validated_columns] == False, False)
+    mask = pd.DataFrame(index = data.index, columns = data.columns)
 
-        imask.to_csv(output_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+    # Validate elements by dtype:
+    # 1. Numeric elements
+    mask[numeric_elements] = validate_numeric(numeric_elements, data, element_atts)
+
+    # 2. Table coded elements
+    # See following: in multiple keys code tables, the non parameter element
+    # won't have a code_table attribute in the element_atts:
+    # So we need to check the code_table.keys files in addition to the element_atts
+    # Additionally, a YEAR key can fail in one table, but be compliant with another, then, how would we mask this?
+    # also, a YEAR defined as an integer, will undergo its own check.....
+    # So I think we need to check nested keys as a whole, and mask only the actual parameterized element:
+    # Get the full list of keys combinations (tuples, triplets...) and check the column combination against that: if it fails, mark the element!
+    # Need to see how to grab the YEAR part of a datetime when YEAR comes from a datetime element
+    # pd.DatetimeIndex(df['_datetime']).year
+    if len(coded_elements)> 0:
+        mask[coded_elements] = validate_codes(coded_elements, data, code_tables_path, element_atts)
 
-    output_buffer.seek(0)
-    chunksize = None if isinstance(data,pd.DataFrame) else data.orig_options['chunksize']
-    mask = pd.read_csv(output_buffer,names = [ x for x in imask ], chunksize = chunksize)
+    # 3. Datetime elements
+    # Those declared as such in element_atts
+    # Because of the way they are converted, read into datetime,
+    # they should already be NaT if they do not validate as a valid datetime;
+    # let's check: hurray! they are!
+    mask[datetime_elements] = data[datetime_elements].notna()
 
-    return mask
-    
\ No newline at end of file
+    mask[validated_columns] = mask[validated_columns].mask(mask0[validated_columns] == False, False)
+
+    return mask
\ No newline at end of file
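
For reference, the round-trip that the new `ERV` function relies on (append each chunk's result to an in-memory CSV buffer, then re-read the buffer once with explicit dtypes, `parse_dates` and the inherited chunksize) can be illustrated with a minimal, self-contained sketch. This is not part of the patch: the element names, values and dtype mapping below are invented for illustration, and only standard pandas calls are used.

```python
from io import StringIO

import pandas as pd

# Hypothetical per-chunk output: names, values and dtypes are made up for this sketch.
out_dtypes = {'ID': 'object', 'AT': 'float64', '_datetime': 'datetime'}
chunks = [
    pd.DataFrame({'ID': ['a1', 'a2'], 'AT': [11.2, 10.8], '_datetime': ['2010-01-01', '2010-01-02']}),
    pd.DataFrame({'ID': ['a3', 'a4'], 'AT': [9.7, 12.3], '_datetime': ['2010-01-03', '2010-01-04']}),
]

# Append every chunk to a single in-memory CSV body (no header, no index),
# as ERV does with data_buffer and valid_buffer.
buffer = StringIO()
for chunk in chunks:
    chunk.to_csv(buffer, header=False, index=False)
buffer.seek(0)

# 'datetime' is not a pandas dtype: such columns are handed to parse_dates by
# positional index and excluded from the dtype mapping, mirroring the
# date_columns logic in ERV.
date_columns = [i for i, k in enumerate(out_dtypes) if out_dtypes[k] == 'datetime']
dtypes = {k: v for k, v in out_dtypes.items() if v != 'datetime'}

# Re-read the whole buffer with the requested chunksize, so the caller gets back
# a TextFileReader (or a plain DataFrame if chunksize is None) with consistent
# dtypes across chunks.
data = pd.read_csv(buffer, names=list(out_dtypes), dtype=dtypes,
                   parse_dates=date_columns, chunksize=2)
for df in data:
    print(df.dtypes)
```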