diff --git a/properties.py b/properties.py
index d00f4f798f84b8722f01ba94d445dea8c4b15c6f..8e6e034b86ae0baae6e626d46a6fae276a639878 100644
--- a/properties.py
+++ b/properties.py
@@ -13,7 +13,7 @@ import pandas as pd
 # Supported formats, sources and internal data models -------------------------
 schema_path = os.path.join(os.path.dirname(__file__),'schemas','lib')
 supported_file_formats = [ os.path.basename(x).split(".")[0] for x in glob.glob(schema_path + '/*/*.json') if os.path.basename(x).split(".")[0] == os.path.dirname(x).split("/")[-1]]
-supported_sources = [pd.DataFrame, pd.io.parsers.TextFileReader, io.StringIO]
+supported_sources = [pd.io.parsers.TextFileReader, io.StringIO]
 
 # Data types ------------------------------------------------------------------
 numpy_integers = ['int8','int16','int32','int64','uint8','uint16','uint32','uint64']
diff --git a/read.py b/read.py
index 1533d0d74f9e13bc5d4eb0446cfb77cc3dd8f3b2..b3a9d6f90441857349570602d1047f2a29d1d5c1 100644
--- a/read.py
+++ b/read.py
@@ -5,7 +5,7 @@ Created on Tue Apr 30 09:38:17 2019
 
 Reads source data (file, pandas DataFrame or pd.io.parsers.TextFileReader) to
 a pandas DataFrame. The source data model needs to be input to the module as
-a named model (included in the module) or as the path to a data model.
+a named model (included in the module) or as the path to a valid data model.
 
 Data is validated against its data model after reading, producing a boolean mask.
 
@@ -17,13 +17,24 @@ read the data and validate it.
 import os
 import sys
 import pandas as pd
+import logging
+import json
+
 from mdf_reader.reader import reader as reader
 from mdf_reader.validate import validate as validate
 import mdf_reader.schemas as schemas
+
 import mdf_reader.properties as properties
 import mdf_reader.common.pandas_TextParser_hdlr as pandas_TextParser_hdlr
-import logging
-import json
+
+
+def validate_arg(arg_name,arg_value,arg_type):
+
+    if arg_value and not isinstance(arg_value,arg_type):
+        logging.error('Argument {0} must be {1}, input type is {2}'.format(arg_name,arg_type,type(arg_value)))
+        return False
+    else:
+        return True
 
 def read(source, data_model = None, data_model_path = None, sections = None,chunksize = None,
          skiprows = None, out_path = None ):
@@ -31,7 +42,7 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
 
     logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s',
                         level=logging.INFO,datefmt='%Y%m%d %H:%M:%S',filename=None)
-    # 0. Make sure min info is available
+    # 0. Validate input
     if not data_model and not data_model_path:
         logging.error('A valid data model name or path to data model must be provided')
         return
@@ -43,8 +54,14 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
         logging.error('Can\'t reach data source {} as a file'.format(source))
         logging.info('Supported in-memory data sources are {}'.format(",".join(properties.supported_sources)))
         return
+    if not validate_arg('sections',sections,list):
+        return
+    if not validate_arg('chunksize',chunksize,int):
+        return
+    if not validate_arg('skiprows',skiprows,int):
+        return
 
-    # 1. Read schema(s) and get file format
+    # 1. Read data model: schema reader will return None if schema does not validate
     logging.info("READING DATA MODEL SCHEMA FILE...")
     schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
     if not schema:
@@ -60,13 +77,14 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
     data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
     out_atts = schemas.df_schema(data_columns, schema, data_model)
 
-    # 5. Complete data validation
+    # 5. Complete data model validation
     logging.info("VALIDATING DATA")
     valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path)
     if isinstance(data,pd.io.parsers.TextFileReader):
         logging.info('...RESTORING DATA PARSER')
         data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
 
+    # 6. Output to files if requested
     if out_path:
         logging.info('WRITING DATA TO FILES IN: {}'.format(out_path))
         cols = [ x for x in data ]
@@ -81,11 +99,11 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
         with open(os.path.join(out_path,'atts.json'),'w') as fileObj:
             json.dump(out_atts_json,fileObj,indent=4)
 
+    # 7. Return data
     return {'data':data,'atts':out_atts,'valid_mask':valid}
 
 if __name__=='__main__':
 
     kwargs = dict(arg.split('=') for arg in sys.argv[2:])
     if 'sections' in kwargs.keys():
         kwargs.update({ 'sections': [ x.strip() for x in kwargs.get('sections').split(",")] })
-    read(sys.argv[1],
-         **kwargs) # kwargs
+    read(sys.argv[1], **kwargs) # kwargs
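For reference, a minimal sketch of how the new validate_arg helper gates the read() keyword arguments (the values below are made up; note that only non-empty values of the wrong type are rejected):

import logging

def validate_arg(arg_name, arg_value, arg_type):
    # Mirrors the helper added above: log and fail only when a value is set
    # and is not an instance of the expected type.
    if arg_value and not isinstance(arg_value, arg_type):
        logging.error('Argument {0} must be {1}, input type is {2}'.format(arg_name, arg_type, type(arg_value)))
        return False
    return True

validate_arg('sections', ['core'], list)    # True
validate_arg('chunksize', 10000, int)       # True
validate_arg('chunksize', '10000', int)     # False: read() logs the error and returns
validate_arg('skiprows', None, int)         # True: unset arguments are accepted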
diff --git a/reader/get_sections.py b/reader/get_sections.py
index 4d8a5698e5988257ec2bce8b6535772d12e90498..c4073d385d82ccf28fff6b18549e3ac1d3f8bd44 100644
--- a/reader/get_sections.py
+++ b/reader/get_sections.py
@@ -3,10 +3,10 @@
 """
 Created on Tue Apr 30 09:38:17 2019
 
-Splits string reports in sections using a data model layout
+Splits string reports into sections using a data model layout.
 
-Input and output are simple pandas dataframes, with the output DF column names
-as the section names
+Input and output are simple pandas dataframes, with the output dataframe
+column names being the section names.
 
 To work with a pandas TextParser, loop through this module.
 
@@ -34,8 +34,6 @@ use, also support to chunking would make converting to series a bit dirty...
 import pandas as pd
 from copy import deepcopy
 import logging
-import mdf_reader.properties as properties
-
 
 # ---------------------------------------------------------------------------
 # FUNCTIONS TO PERFORM INITIAL SEPARATION OF SECTIONS: MAIN IS GET_SECTIONS()
@@ -128,7 +126,7 @@ def add_dynamic_children():
         if (len(threads[thread_id]['modulo'])) > 0:
             add_higher_group_children()
 
-def extract_sections(df_in):
+def extract_sections(string_df):
     # threads elements:
     # 'parsing_order'  What needs to be applied to current parent data
    # 'group_number'   Order in the global parsing order
@@ -154,7 +152,7 @@ def extract_sections(df_in):
     threads[thread_id]['group_number'] = 0
     threads[thread_id]['group_type'] = None
     threads[thread_id]['section'] = None
-    threads[thread_id]['parent_data'] = df_in
+    threads[thread_id]['parent_data'] = string_df
     threads[thread_id]['data'] = None
     threads[thread_id]['modulo'] = threads[thread_id]['parent_data']
     del threads[thread_id]['parent_data']
@@ -198,25 +196,25 @@ def extract_sections(df_in):
 # ---------------------------------------------------------------------------
 # MAIN
 # ---------------------------------------------------------------------------
-def get_sections(StringDf, schema, read_sections):
+def get_sections(string_df, schema, read_sections):
     global sentinals, section_lens, sentinals_lens
     global parsing_order
-
+    # Proceed to split sections if there is more than one
     if len(schema['sections'].keys())> 1:
         section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
         sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
         sentinals_lens = { section: len(sentinals.get(section)) if sentinals.get(section) else 0 for section in sentinals.keys()}
         parsing_order = schema['header']['parsing_order']
         # Get sections separated
-        section_strings = extract_sections(StringDf)
+        section_strings = extract_sections(string_df)
         # Paste in order in a single dataframe with columns named as sections
         # Do not include sections not asked for
         df_out = pd.DataFrame()
         for section in read_sections:
             df_out = pd.concat([df_out,section_strings[section].rename(section)],sort = False,axis=1)
     else:
-        df_out = StringDf
+        # Return the single section in a named column
+        df_out = string_df
         df_out.columns = read_sections
-
     return df_out
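To make the expected shapes concrete, a small sketch of the single-section path above (toy report strings and a made-up section name; the multi-section path additionally splits each report string before naming the columns):

import pandas as pd

# One full report per row, held as a single string column (what import_data produces).
string_df = pd.DataFrame(['AAAA1111222', 'BBBB3333444'])

read_sections = ['core']          # hypothetical single-section data model
df_out = string_df
df_out.columns = read_sections    # the 'else' branch: just name the column

print(df_out)
#           core
# 0  AAAA1111222
# 1  BBBB3333444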
diff --git a/reader/import_data.py b/reader/import_data.py
index a98a184dad6c054a8e7b3ffe93134060a26157b2..26b0a98942ae558033a4ac6e9a78bef4a18c769a 100644
--- a/reader/import_data.py
+++ b/reader/import_data.py
@@ -3,16 +3,32 @@
 """
 Created on Fri Jan 10 13:17:43 2020
 
-FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS, AN ITERABLE:
-EITHER A PD.IO.PARSERS.TEXTFILEREADER OR A LIST, DEPENDING ON
-SOURCE TYPE AND CHUNKSIZE ARGUMENT
-BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
+FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS:
+    AN ITERABLE WITH DATAFRAMES
+
+INPUT IS EITHER:
+    - pd.io.parsers.TextFileReader
+    - io.StringIO
+    - file path
+
+OUTPUT IS AN ITERABLE, DEPENDING ON SOURCE TYPE AND CHUNKSIZE BEING SET:
+    - a single dataframe in a list
+    - a pd.io.parsers.TextFileReader
+
+WITH BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
 
 delimiter="\t" option in pandas.read_fwf avoids white spaces at taild
 to be stripped
 
 @author: iregon
 
+DEV NOTES:
+1) What this module is able to ingest needs to align with properties.supported_sources
+2) Check io.StringIO input: why there, does it actually work as it is?
+3) Check pd.io.parsers.TextFileReader input: why there, does it actually work as it is?
+
 OPTIONS IN OLD DEVELOPMENT:
 1. DLMT: delimiter = ',' default
 names = [ (x,y) for x in schema['sections'].keys() for y in schema['sections'][x]['elements'].keys()]
@@ -29,16 +45,27 @@
 
 import pandas as pd
 import os
+import io
+
 from mdf_reader import properties
 
+def to_iterable_df(source,skiprows = None, chunksize = None):
+    TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
+    if not chunksize:
+        TextParser = [TextParser]
+    return TextParser
+
+
 def import_data(source,chunksize = None, skiprows = None):
 
     if isinstance(source,pd.io.parsers.TextFileReader):
-        TextParser = source
+        return source
+    elif isinstance(source, io.StringIO):
+        TextParser = to_iterable_df(source,skiprows = skiprows, chunksize = chunksize)
+        return TextParser
     elif os.path.isfile(source):
-        TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
-        if not chunksize:
-            TextParser = [TextParser]
+        TextParser = to_iterable_df(source,skiprows = skiprows, chunksize = chunksize)
+        return TextParser
     else:
         print('Error')
-    return TextParser
+        return
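A self-contained sketch of the behaviour to_iterable_df provides, here fed from an io.StringIO source (MAX_FULL_REPORT_WIDTH below is a stand-in for the properties constant):

import io
import pandas as pd

MAX_FULL_REPORT_WIDTH = 100000    # stand-in for properties.MAX_FULL_REPORT_WIDTH

def to_iterable_df(source, skiprows=None, chunksize=None):
    # One report per line, read as a single wide fixed-width column;
    # delimiter="\t" stops pandas treating blanks as filler, so trailing spaces survive.
    TextParser = pd.read_fwf(source, widths=[MAX_FULL_REPORT_WIDTH], header=None,
                             delimiter="\t", skiprows=skiprows, chunksize=chunksize)
    # Without chunksize, wrap the single DataFrame in a list so callers can always iterate.
    return TextParser if chunksize else [TextParser]

source = io.StringIO('report one   \nreport two   \n')
for chunk in to_iterable_df(source, chunksize=1):
    print(repr(chunk.iloc[0, 0]))   # trailing blanks are preserved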
diff --git a/reader/read_sections.py b/reader/read_sections.py
index 9bdd36168402e8ed70ff30a79933d173897ed647..e95631aaeeaf5935e4bf8e6cc34c3c48aa152df5 100644
--- a/reader/read_sections.py
+++ b/reader/read_sections.py
@@ -3,13 +3,26 @@
 """
 Created on Fri Jan 10 13:17:43 2020
 
+Extracts and reads (decodes, scales, etc...) the elements of data sections.
+Each column of the input dataframe is a section with all its elements stored
+as a single string.
+
+Working on a section-by-section basis, this module uses the data model
+information provided in the schema to split the elements, decode and scale them
+where appropriate and ensure their data type consistency.
+
+Output is a dataframe with columns as follows, depending on the data model
+structure:
+    1) Data model with sections (1 or more): [(section0,element0),.......(sectionN,elementN)]
+    2) Data model with no sections: [element0...elementN]
+
+
 @author: iregon
 """
 
 import pandas as pd
 from io import StringIO as StringIO
 import mdf_reader.properties as properties
-import csv # To disable quoting
 from mdf_reader.common.converters import converters
 from mdf_reader.common.decoders import decoders
@@ -38,24 +51,25 @@ def read_data(section_df,section_schema):
     section_dtypes = { i:section_schema['elements'][i]['column_type'] for i in section_names }
     encoded = [ (x) for x in section_names if 'encoding' in section_schema['elements'][x]]
     section_encoding = { i:section_schema['elements'][i]['encoding'] for i in encoded }
+    section_valid = pd.DataFrame(index = section_df.index, columns = section_df.columns)
 
     for element in section_dtypes.keys():
-        #missing = section_elements[element].isna()
+        missing = section_df[element].isna()
         if element in encoded:
             section_df[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_df[element])
 
         kwargs = { converter_arg:section_schema['elements'][element].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(section_dtypes.get(element))  }
         section_df[element] = converters.get(section_dtypes.get(element))(section_df[element], **kwargs)
 
-#        section_valid[element] = missing | section_elements[element].notna()
+        section_valid[element] = missing | section_df[element].notna()
 
-    return section_df
+    return section_df,section_valid
 
 def read_sections(sections_df, schema):
 
     multiindex = True if len(sections_df.columns) > 1 or sections_df.columns[0] != properties.dummy_level else False
     data_df = pd.DataFrame()
-
+    valid_df = pd.DataFrame()
     out_dtypes = dict()
 
     for section in sections_df.columns:
@@ -81,18 +95,21 @@ def read_sections(sections_df, schema):
             # we'll see if we do that in the caller module or here....
             sections_df[section].to_csv(section_buffer,header=False, encoding = 'utf-8',index = False)#,quoting=csv.QUOTE_NONE,escapechar="\\",sep="\t")
             ssshh = section_buffer.seek(0)
-            # Get the individual elements as objects
+            # Get the individual elements as objects
             if field_layout == 'fixed_width':
                 section_elements_obj = extract_fixed_width(section_buffer,section_schema)
             elif field_layout == 'delimited':
                 section_elements_obj = extract_delimited(section_buffer,section_schema)
 
             section_elements_obj.drop(ignore, axis = 1, inplace = True)
+            # Read the objects to their data types and apply decoding, scaling and so on...
-            section_elements = read_data(section_elements_obj,section_schema)
+            section_elements, section_valid = read_data(section_elements_obj,section_schema)
             section_elements.index = sections_df[section].index
+            section_valid.index = sections_df[section].index
 
         else:
             section_elements = pd.DataFrame(sections_df[section],columns = [section])
+            section_valid = pd.DataFrame(index = section_elements.index,data = True, columns = [section])
 
         if not disable_read:
             if multiindex:
@@ -108,6 +125,8 @@ def read_sections(sections_df, schema):
                 out_dtypes.update({ section:'object' } )
 
         section_elements.columns = [ (section, x) for x in section_elements.columns] if multiindex else section_elements.columns
+        section_valid.columns = section_elements.columns
         data_df = pd.concat([data_df,section_elements],sort = False,axis=1)
-
-    return data_df,out_dtypes
+        valid_df = pd.concat([valid_df,section_valid],sort = False,axis=1)
+
+    return data_df, valid_df, out_dtypes
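The per-element valid mask built in read_data follows this pattern (a sketch with pd.to_numeric standing in for the model converters; 'AT' is a made-up element name):

import pandas as pd

raw = pd.DataFrame({'AT': ['102', None, 'xx7']})       # element values as extracted strings

missing = raw['AT'].isna()                             # True only for the original gap
converted = pd.to_numeric(raw['AT'], errors='coerce')  # conversion turns 'xx7' into NaN
valid = missing | converted.notna()

print(valid.tolist())
# [True, True, False]: a value that fails conversion and was not missing is flagged invalid;
# original gaps stay True here and are dealt with in the later validation step.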
diff --git a/reader/reader.py b/reader/reader.py
index db2a381ccc26d19b352002cd75a575d783f845d5..737367b0ee3b57c1105b7f1dcc24499cb99f6689 100644
--- a/reader/reader.py
+++ b/reader/reader.py
@@ -5,18 +5,9 @@ Created on Tue Apr 30 09:38:17 2019
 
 Reads source data from a data model to a pandas DataFrame.
 
-Optionally, it reads supplemental data from the same source (from a different
- data model) and pastes that to the output DataFrame
-
-Uses the meta_formats generic submodules ('delimited' and 'fixed_width') to
-pre-format data source and read either generic type of data model.
-
 @author: iregon
 """
 
-from __future__ import unicode_literals
-from __future__ import print_function
-from __future__ import absolute_import
 # CAREFULL HERE:
 # Note that in Python 3, the io.open function is an alias for the built-in open function.
 # The built-in open function only supports the encoding argument in Python 3, not Python 2.
@@ -35,11 +26,6 @@ from . import read_sections
 
 import copy
 
-if sys.version_info[0] >= 3:
-    py3 = True
-else:
-    py3 = False
-
 from io import BytesIO as BytesIO
 # Get pandas dtype for time_stamps
 pandas_timestamp_dtype = pd.to_datetime(pd.DataFrame(['20000101'])[0],format='%Y%m%d').dtypes
@@ -52,62 +38,59 @@ def read_model(source,schema, sections = None, chunksize = None, skiprows = None
         logging.error('File format not yet supported')
         sys.exit(1)
 
-    # 1. PARSE SCHEMA ---------------------------------------------------------
-
+    # 1. DEFINE OUTPUT --------------------------------------------------------
+    # Subset data model sections to requested sections
     parsing_order = schema['header'].get('parsing_order')
-
-    # 2. DEFINE OUTPUT --------------------------------------------------------
-    # 2.1 Sections to read
+    # 1.1 Sections to read
     if not sections:
         sections = [ x.get(y) for x in parsing_order for y in x ]
         read_sections_list = [y for x in sections for y in x]
     else:
-        read_sections_list = sections
-
+        read_sections_list = sections
 
-    # 3. HOMOGENEIZE INPUT DATA (FILE OR TEXTREADER) TO AN ITERABLE TEXTREADER
+    # 2. HOMOGENIZE INPUT DATA TO AN ITERABLE WITH DATAFRAMES:
+    # a list with a single dataframe or a pd.io.parsers.TextFileReader
     logging.info("Getting data string from source...")
     TextParser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows)
 
-    # 4. EXTRACT SECTIONS IN A PARSER; EXTRACT SECTIONS HERE AND READ DATA IN
-    # SAME LOOP? SHOULD DO....
+    # 3. EXTRACT AND READ DATA IN SAME LOOP -----------------------------------
    logging.info("Extracting sections...")
     data_buffer = StringIO()
+    valid_buffer = StringIO()
-
-#    valid_buffer = ...
     for i,string_df in enumerate(TextParser):
-        # Get sections separated in a dataframe: one per column, only requested
-        # sections, ignore rest.
+        # a. Get sections separated in dataframe columns:
+        # one per column, only requested sections, ignore rest.
         sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
-        # Read elements from sections: along data chunks, resulting data types
-        # may vary if gaps
-        [data_df,out_dtypesi ] = read_sections.read_sections(sections_df, schema)
+        # b. Read elements from sections: along data chunks, resulting data types
+        # may vary if gaps, keep track of data types!
+        [data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
         if i == 0:
             out_dtypes = copy.deepcopy(out_dtypesi)
         for k in out_dtypesi:
             if out_dtypesi in properties.numpy_floats:
                 out_dtypes.update({ k:out_dtypesi.get(k) })
-
+        # Save to buffer
         data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-#    [output_buffer,valid_buffer,dtypes] = reader_function(TextParser, schema, read_sections = read_sections, idx_offset = idx_offset )
-#
-#    # 5. OUTPUT DATA:----------------------------------------------------------
-#    # WE'LL NEED TO POSPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
+        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+
+    # 4. OUTPUT DATA ----------------------------------------------------------
+    # WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
     data_buffer.seek(0)
-#    valid_buffer.seek(0)
-#    logging.info("Wrapping output....")
-#    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
-#    logging.info('Data')
-#    # 'datetime' is not a valid pandas dtype: Only on output (on reading) will be then converted (via parse_dates) to datetime64[ns] type, cannot specify 'datetime' (of any kind) here: will fail
-#    date_columns = [] # Needs to be the numeric index of the column, as seems not to be able to work with tupples....
-#    for i,element in enumerate(list(dtypes)):
-#        if dtypes.get(element) == 'datetime':
-#            date_columns.append(i)
-    data_reader = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes)#, parse_dates = date_columns)
-#    logging.info('Mask')
-#    valid_reader = pd.read_csv(valid_buffer,names = out_names, chunksize = chunksize)
+    valid_buffer.seek(0)
+    logging.info("Wrapping output....")
+    # Chunksize is taken from the imported TextParser if it is a pd.io.parsers.TextFileReader
+    # (source is either a pd.io.parsers.TextFileReader or a file with chunksize specified on input):
+    # this way the chunksize property is inherited directly when the input source was a pd.io.parsers.TextFileReader
+    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
+    # 'datetime' is not a valid pandas dtype: only on output (on reading) will it be converted (via parse_dates) to datetime64[ns]; specifying 'datetime' (of any kind) here would fail
+    date_columns = [] # Needs to be the numeric index of the column, as it seems not to work with tuples....
+    for i,element in enumerate(list(out_dtypes)):
+        if out_dtypes.get(element) == 'datetime':
+            date_columns.append(i)
+
+    data_reader = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
+    valid_reader = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)
 
-#    return data_reader, valid_reader
-    return data_reader
+    return data_reader, valid_reader
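The output wrapping above is a plain CSV round-trip through in-memory buffers; a condensed sketch of the technique with toy columns (the real loop appends one chunk per iteration and also forwards chunksize and parse_dates):

import io
import pandas as pd

out_dtypes = {'YR': 'int64', 'AT': 'float64'}    # toy per-element dtypes
data_buffer = io.StringIO()

# Append each processed chunk as header-less CSV text...
for chunk in (pd.DataFrame({'YR': [2000], 'AT': [11.2]}),
              pd.DataFrame({'YR': [2001], 'AT': [10.8]})):
    chunk.to_csv(data_buffer, header=False, index=False)

# ...then rewind and read the whole buffer back with the collected dtypes.
data_buffer.seek(0)
data = pd.read_csv(data_buffer, names=list(out_dtypes), dtype=out_dtypes)
print(data.dtypes)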
diff --git a/schemas/schemas.py b/schemas/schemas.py
index 8a981a03080c9c90fae1798ddc00fc82aef2f7c7..f5dcf8908c2d5042f090f9b6328a06cb27ca7b5b 100644
--- a/schemas/schemas.py
+++ b/schemas/schemas.py
@@ -9,6 +9,7 @@ Read data file format json schema to dictionary
 
 Add schema validation:
 - check mandatory are not null
 - check fixed options
+...and return None if the schema does not validate
 
 """
@@ -59,22 +60,6 @@ def copy_template(schema, out_dir = None,out_path = None):
         print('\tValid names are: {}'.format(", ".join(schemas)))
         return
 
-def get_field_layout(field_layout_def,field_layout):
-    if not field_layout_def and not field_layout:
-        return None
-    elif not field_layout:
-        return field_layout_def
-    else:
-        return field_layout
-
-def get_delimiter(delimiter_def,delimiter):
-    if not delimiter_def and not delimiter:
-        return None
-    elif not delimiter_def:
-        return delimiter
-    else:
-        return field_layout
-
 def read_schema(schema_name = None, ext_schema_path = None):
 
     if schema_name:
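Taken together, a call against the updated interface would look roughly like this (assuming the package exposes read() at the top level as mdf_reader.read; file name, model name and keyword values are placeholders):

import mdf_reader

output = mdf_reader.read('source_file.imma', data_model='imma1',
                         sections=['core'], chunksize=10000)
if output:
    data = output['data']          # DataFrame, or TextFileReader when chunked
    mask = output['valid_mask']    # boolean mask with the same layout as the data
    atts = output['atts']          # per-element attributes derived from the schema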