From 3647ead34d08ec9291f176c13a222b2990be08f8 Mon Sep 17 00:00:00 2001
From: perezgonzalez-irene <iregon@noc.ac.uk>
Date: Wed, 22 Jan 2020 10:22:13 +0000
Subject: [PATCH] Internal read management now handled here

---
 read.py          |  89 +++++++++++++++++++++++++++++++++++------
 reader/reader.py | 101 ----------------------------------------------
 2 files changed, 78 insertions(+), 112 deletions(-)
 delete mode 100644 reader/reader.py

diff --git a/read.py b/read.py
index 01bc9c4..4c39ceb 100644
--- a/read.py
+++ b/read.py
@@ -19,14 +19,17 @@ import sys
 import pandas as pd
 import logging
 import json
+import copy
+from io import StringIO

-from mdf_reader.reader import reader as reader
-from mdf_reader.validate import validate as validate
 import mdf_reader.schemas as schemas
-
 import mdf_reader.properties as properties
 import mdf_reader.common.pandas_TextParser_hdlr as pandas_TextParser_hdlr
+from mdf_reader.reader import import_data
+from mdf_reader.reader import get_sections
+from mdf_reader.reader import read_sections
+from mdf_reader.validate import validate


 def validate_arg(arg_name,arg_value,arg_type):
@@ -61,29 +64,93 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
     if not validate_arg('skiprows',skiprows,int):
         return

-    # 1. Read data model: schema reader will return None if schema does not validate
+    # 1. Read data model
+    # Schema reader will return None if the schema does not validate
     logging.info("READING DATA MODEL SCHEMA FILE...")
     schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
     if not schema:
         return
-
+    # For future use: some work already done in schema reading
+    if schema['header'].get('multiple_reports_per_line'):
+        logging.error('File format not yet supported')
+        sys.exit(1)
+
     # 2. Read data
     imodel = data_model if data_model else data_model_path
-    logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))
-    data, valid = reader.read_model(source,schema, sections = sections, chunksize = chunksize, skiprows = skiprows)

-    # 4. Create out data attributes
+    logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))
+
+    # 2.1. Define output
+    # Subset data model sections to the requested sections
+    parsing_order = schema['header'].get('parsing_order')
+    if not sections:
+        sections = [ x.get(y) for x in parsing_order for y in x ]
+        read_sections_list = [y for x in sections for y in x]
+    else:
+        read_sections_list = sections
+
+    # 2.2. Homogenize input data to an iterable of dataframes:
+    # a list with a single dataframe or a pd.io.parsers.TextFileReader
+    logging.info("Getting data string from source...")
+    TextParser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows)
+
+    # 2.3. Extract and read data in the same loop
+    logging.info("Extracting sections...")
+    data_buffer = StringIO()
+    valid_buffer = StringIO()
+
+    for i,string_df in enumerate(TextParser):
+        # a. Get a DF with sections separated in columns:
+        # - one section per column
+        # - only the requested sections, ignore the rest
+        # - requested NA sections as NaN columns
+        # - column order as in read_sections_list
+        sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
+
+        # b. Read elements from sections: along data chunks, the resulting data
+        # types may vary if there are gaps, so keep track of data types!
+        # Sections are parsed in the same order as sections_df.columns
+        data_df, valid_df, out_dtypesi = read_sections.read_sections(sections_df, schema)
+        if i == 0:
+            out_dtypes = copy.deepcopy(out_dtypesi)
+
+        for k in out_dtypesi:
+            if out_dtypesi.get(k) in properties.numpy_floats:
+                out_dtypes.update({ k:out_dtypesi.get(k) })
+        # Save to buffer
+        data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+
+    # 2.4. Create output data
+    # WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
+    data_buffer.seek(0)
+    valid_buffer.seek(0)
+    logging.info("Wrapping output...")
+    # Take the chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
+    # (source is either a pd.io.parsers.TextFileReader or a file with chunksize specified on input):
+    # this way the chunksize property is inherited directly from the input source
+    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
+    # 'datetime' is not a valid pandas dtype: such columns are converted on reading to datetime64[ns] via parse_dates; passing 'datetime' in dtype would fail
+    date_columns = [] # Needs the numeric index of the column: parse_dates does not seem to work with tuple labels
+    for i,element in enumerate(list(out_dtypes)):
+        if out_dtypes.get(element) == 'datetime':
+            date_columns.append(i)
+
+    data = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
+    valid = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)
+
+    # 3. Create out data attributes
     logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL(S)")
     data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
     out_atts = schemas.df_schema(data_columns, schema, data_model)

-    # 5. Complete data model validation
+    # 4. Complete data model validation
     logging.info("VALIDATING DATA")
     valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path)
     if isinstance(data,pd.io.parsers.TextFileReader):
         logging.info('...RESTORING DATA PARSER')
         data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)

-    # 6. Output to files if requested
+    # 5. Output to files if requested
     if out_path:
         enlisted = False
         if not isinstance(data,pd.io.parsers.TextFileReader):
@@ -115,7 +182,7 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
         with open(os.path.join(out_path,'atts.json'),'w') as fileObj:
             json.dump(out_atts_json,fileObj,indent=4)

-    # 7. Return data
+    # 6. Return data
     return {'data':data,'atts':out_atts,'valid_mask':valid}

 if __name__=='__main__':
diff --git a/reader/reader.py b/reader/reader.py
deleted file mode 100644
index 796fa2c..0000000
--- a/reader/reader.py
+++ /dev/null
@@ -1,101 +0,0 @@
-#!/usr/bin/env python3
-# -*- coding: utf-8 -*-
-"""
-Created on Tue Apr 30 09:38:17 2019
-
-Reads source data from a data model to a pandas DataFrame.
-
-@author: iregon
-"""
-
-# CAREFULL HERE:
-# Note that in Python 3, the io.open function is an alias for the built-in open function.
-# The built-in open function only supports the encoding argument in Python 3, not Python 2.
-# https://docs.python.org/3.4/library/io.html?highlight=io
-from io import StringIO as StringIO
-import sys
-import pandas as pd
-import numpy as np
-import logging
-from . import meta_formats
-
-from .. import properties
-from . import import_data
-from . import get_sections
-from . import read_sections
-
-import copy
-
-
-# Get pandas dtype for time_stamps
-pandas_timestamp_dtype = pd.to_datetime(pd.DataFrame(['20000101'])[0],format='%Y%m%d').dtypes
-
-def read_model(source,schema, sections = None, chunksize = None, skiprows = None):
-
-    # 0. GET META FORMAT SUBCLASS ---------------------------------------------
-    # For future use: some work already done in schema reading
-    if schema['header'].get('multiple_reports_per_line'):
-        logging.error('File format not yet supported')
-        sys.exit(1)
-
-    # 1. DEFINE OUTPUT --------------------------------------------------------
-    # Subset data model sections to requested sections
-    parsing_order = schema['header'].get('parsing_order')
-    # 1.1 Sections to read
-    if not sections:
-        sections = [ x.get(y) for x in parsing_order for y in x ]
-        read_sections_list = [y for x in sections for y in x]
-    else:
-        read_sections_list = sections
-
-    # 2. HOMOGENEIZE INPUT DATA TO AN ITERABLE WITH DATAFRAMES:
-    # a list with a single dataframe or a pd.io.parsers.TextFileReader
-    logging.info("Getting data string from source...")
-    TextParser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows)
-
-    # 3. EXTRACT AND READ DATA IN SAME LOOP -----------------------------------
-    logging.info("Extracting sections...")
-    data_buffer = StringIO()
-    valid_buffer = StringIO()
-
-    for i,string_df in enumerate(TextParser):
-        # a. Get a DF with sections separated in columns:
-        # - one section per column
-        # - only sections requested, ignore rest
-        # - requested NA sections as NaN columns
-        # - columns order as in read_sections_list
-        sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
-
-        # b. Read elements from sections: along data chunks, resulting data types
-        # may vary if gaps, keep track of data types!
-        # Sections as parsed in the same order as sections_df.columns
-        [data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
-        if i == 0:
-            out_dtypes = copy.deepcopy(out_dtypesi)
-
-        for k in out_dtypesi:
-            if out_dtypesi in properties.numpy_floats:
-                out_dtypes.update({ k:out_dtypesi.get(k) })
-        # Save to buffer
-        data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-
-    # 4. OUTPUT DATA ----------------------------------------------------------
-    # WE'LL NEED TO POSPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
-    data_buffer.seek(0)
-    valid_buffer.seek(0)
-    logging.info("Wrapping output....")
-    # Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
-    # (source is either pd.io.parsers.TextFileReader or a file with chunksize specified on input):
-    # This way it supports direct chunksize property inheritance if the input source was a pd.io.parsers.TextFileReader
-    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
-    # 'datetime' is not a valid pandas dtype: Only on output (on reading) will be then converted (via parse_dates) to datetime64[ns] type, cannot specify 'datetime' (of any kind) here: will fail
-    date_columns = [] # Needs to be the numeric index of the column, as seems not to be able to work with tupples....
-    for i,element in enumerate(list(out_dtypes)):
-        if out_dtypes.get(element) == 'datetime':
-            date_columns.append(i)
-
-    data_reader = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
-    valid_reader = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)
-
-    return data_reader, valid_reader
--
GitLab
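A note on the chunk loop moved into read.py above: each chunk is parsed, appended as CSV text to an in-memory buffer, and the buffer is then re-read in a single pass so that one consistent set of dtypes can be applied; the dtype bookkeeping exists because a chunk with gaps comes back with integer columns widened to float. A minimal standalone sketch of that pattern follows; the names chunks, col_dtypes and FLOATS are illustrative stand-ins for the module's TextParser, out_dtypes and properties.numpy_floats, not part of mdf_reader:

    import copy
    from io import StringIO

    import pandas as pd

    FLOATS = ['float16', 'float32', 'float64']  # stand-in for properties.numpy_floats

    # Hypothetical chunk source: the second chunk has a gap (NaN), so its 'a'
    # column comes back as float64 even though chunk 0 parsed it as int64
    chunks = [pd.DataFrame({'a': [1, 2], 'b': ['x', 'y']}),
              pd.DataFrame({'a': [3.0, float('nan')], 'b': ['z', 'w']})]

    buffer = StringIO()
    col_dtypes = {}
    for i, df in enumerate(chunks):
        dtypes_i = {k: str(v) for k, v in df.dtypes.items()}
        if i == 0:
            col_dtypes = copy.deepcopy(dtypes_i)
        for k in dtypes_i:
            if dtypes_i.get(k) in FLOATS:  # widen the recorded dtype if this chunk was read as float
                col_dtypes[k] = dtypes_i.get(k)
        df.to_csv(buffer, header=False, mode='a', index=False)

    buffer.seek(0)
    # Re-read the accumulated text with the widened dtypes applied consistently
    data = pd.read_csv(buffer, names=list(chunks[0].columns), dtype=col_dtypes)
    print(data.dtypes)  # 'a' is float64 throughout, although chunk 0 parsed it as int64

This is also why the per-key test in the dtype loop matters: testing the dict itself against the float list (as the deleted reader.py did) never matches, so the recorded dtypes were never widened.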
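On the parse_dates detail in section 2.4: pd.read_csv has no 'datetime' dtype, so such columns are left out of the dtype mapping and handed to parse_dates as positional column indices instead. A small illustration, with a made-up dtype mapping:

    from io import StringIO

    import pandas as pd

    # Hypothetical dtype map as read_sections might report it
    out_dtypes = {'id': 'object', 'timestamp': 'datetime', 'value': 'float64'}

    # Swap the pseudo-dtype 'datetime' for positional indices handed to parse_dates
    date_columns = [i for i, name in enumerate(out_dtypes) if out_dtypes[name] == 'datetime']
    dtypes = {k: v for k, v in out_dtypes.items() if v != 'datetime'}

    csv = StringIO("A1,20000101,3.5\nA2,20000102,4.1\n")
    df = pd.read_csv(csv, names=list(out_dtypes), dtype=dtypes, parse_dates=date_columns)
    print(df.dtypes)  # timestamp comes back as datetime64[ns]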
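For reference, the public entry point is unchanged by this move; only its internals differ. A hedged usage sketch, assuming read.py sits at the package root as the mdf_reader.* imports suggest; the source file and model name below are placeholders:

    from mdf_reader import read

    output = read.read('source_file.imma',  # placeholder input file
                       data_model='imma1',  # placeholder data model name
                       chunksize=10000)
    data = output['data']              # pd.DataFrame, or TextFileReader when chunked
    atts = output['atts']              # per-column attributes built from the schema
    valid_mask = output['valid_mask']  # per-element validation flags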