Commit 51d23753 authored by iregon's avatar iregon

Validation mask added back

parent 6b4a048d
......@@ -13,7 +13,7 @@ import pandas as pd
# Supported formats, sources and internal data models -------------------------
schema_path = os.path.join(os.path.dirname(__file__),'schemas','lib')
supported_file_formats = [ os.path.basename(x).split(".")[0] for x in glob.glob(schema_path + '/*/*.json') if os.path.basename(x).split(".")[0] == os.path.dirname(x).split("/")[-1]]
supported_sources = [pd.DataFrame, pd.io.parsers.TextFileReader, io.StringIO]
supported_sources = [pd.io.parsers.TextFileReader, io.StringIO]
# Data types ------------------------------------------------------------------
numpy_integers = ['int8','int16','int32','int64','uint8','uint16','uint32','uint64']
......
......@@ -5,7 +5,7 @@ Created on Tue Apr 30 09:38:17 2019
Reads source data (file, pandas DataFrame or pd.io.parsers.TextFileReader) to
a pandas DataFrame. The source data model needs to be input to the module as
a named model (included in the module) or as the path to a data model.
a named model (included in the module) or as the path to a valid data model.
Data is validated against its data model after reading, producing a boolean mask.
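A minimal usage sketch (the file path is hypothetical; 'imma1' is assumed
to be one of the named models included in the module):

    output = read('my_data.imma', data_model = 'imma1')
    data, mask = output['data'], output['valid_mask']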
......@@ -17,13 +17,24 @@ read the data and validate it.
import os
import sys
import pandas as pd
import logging
import json
from mdf_reader.reader import reader as reader
from mdf_reader.validate import validate as validate
import mdf_reader.schemas as schemas
import mdf_reader.properties as properties
import mdf_reader.common.pandas_TextParser_hdlr as pandas_TextParser_hdlr
import logging
import json
def validate_arg(arg_name,arg_value,arg_type):
if arg_value is not None and not isinstance(arg_value,arg_type):
logging.error('Argument {0} must be {1}, input type is {2}'.format(arg_name,arg_type,type(arg_value)))
return False
else:
return True
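# A hedged usage sketch of validate_arg: it logs and returns False on a
# type mismatch, True otherwise, so callers can bail out early, e.g.:
# if not validate_arg('chunksize', chunksize, int):
#     return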
def read(source, data_model = None, data_model_path = None, sections = None,chunksize = None,
skiprows = None, out_path = None ):
......@@ -31,7 +42,7 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s',
level=logging.INFO,datefmt='%Y%m%d %H:%M:%S',filename=None)
# 0. Make sure min info is available
# 0. Validate input
if not data_model and not data_model_path:
logging.error('A valid data model name or path to data model must be provided')
return
......@@ -43,8 +54,14 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
logging.error('Can\'t reach data source {} as a file'.format(source))
logging.info('Supported in-memory data sources are {}'.format(",".join([str(x) for x in properties.supported_sources])))
return
if not validate_arg('sections',sections,list):
return
if not validate_arg('chunksize',chunksize,int):
return
if not validate_arg('skiprows',skiprows,int):
return
# 1. Read schema(s) and get file format
# 1. Read data model: schema reader will return None if schema does not validate
logging.info("READING DATA MODEL SCHEMA FILE...")
schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
if not schema:
......@@ -60,13 +77,14 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
out_atts = schemas.df_schema(data_columns, schema, data_model)
# 5. Complete data validation
# 5. Complete data model validation
logging.info("VALIDATING DATA")
valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path)
if isinstance(data,pd.io.parsers.TextFileReader):
logging.info('...RESTORING DATA PARSER')
data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
# 6. Output to files if requested
if out_path:
logging.info('WRITING DATA TO FILES IN: {}'.format(out_path))
cols = [ x for x in data ]
......@@ -81,11 +99,11 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
with open(os.path.join(out_path,'atts.json'),'w') as fileObj:
json.dump(out_atts_json,fileObj,indent=4)
# 7. Return data
return {'data':data,'atts':out_atts,'valid_mask':valid}
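# A hedged sketch of consuming the returned mask (assumes no chunksize,
# i.e. 'data' and 'valid_mask' are DataFrames): keep only fully valid reports.
# out = read('my_data.imma', data_model = 'imma1')
# clean = out['data'][out['valid_mask'].all(axis = 1)]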
if __name__=='__main__':
kwargs = dict(arg.split('=') for arg in sys.argv[2:])
if 'sections' in kwargs.keys():
kwargs.update({ 'sections': [ x.strip() for x in kwargs.get('sections').split(",")] })
read(sys.argv[1],
**kwargs) # kwargs
read(sys.argv[1], **kwargs) # kwargs
......@@ -3,10 +3,10 @@
"""
Created on Tue Apr 30 09:38:17 2019
Splits string reports in sections using a data model layout
Splits string reports in sections using a data model layout.
Input and output are simple pandas dataframes, with the output DF column names
as the section names
Input and output are simple pandas dataframes, with the output dataframe
column names being section names
To work with a pandas TextParser, loop through this module.
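A hedged sketch of the TextParser loop mentioned above (schema and
read_sections as passed in by the caller):

    for string_df in TextParser:
        sections_df = get_sections(string_df, schema, read_sections)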
......@@ -34,8 +34,6 @@ use, also support to chunking would make converting to series a bit dirty...
import pandas as pd
from copy import deepcopy
import logging
import mdf_reader.properties as properties
# ---------------------------------------------------------------------------
# FUNCTIONS TO PERFORM INITIAL SEPARATION OF SECTIONS: MAIN IS GET_SECTIONS()
......@@ -128,7 +126,7 @@ def add_dynamic_children():
if (len(threads[thread_id]['modulo'])) > 0:
add_higher_group_children()
def extract_sections(df_in):
def extract_sections(string_df):
# threads elements:
# 'parsing_order' What needs to be applied to current parent data
# 'group_number' Order in the global parsing order
......@@ -154,7 +152,7 @@ def extract_sections(df_in):
threads[thread_id]['group_number'] = 0
threads[thread_id]['group_type'] = None
threads[thread_id]['section'] = None
threads[thread_id]['parent_data'] = df_in
threads[thread_id]['parent_data'] = string_df
threads[thread_id]['data'] = None
threads[thread_id]['modulo'] = threads[thread_id]['parent_data']
del threads[thread_id]['parent_data']
......@@ -198,25 +196,25 @@ def extract_sections(df_in):
# ---------------------------------------------------------------------------
# MAIN
# ---------------------------------------------------------------------------
def get_sections(StringDf, schema, read_sections):
def get_sections(string_df, schema, read_sections):
global sentinals, section_lens, sentinals_lens
global parsing_order
# Proceed to split sections if more than one
if len(schema['sections'].keys())> 1:
section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
sentinals_lens = { section: len(sentinals.get(section)) if sentinals.get(section) else 0 for section in sentinals.keys()}
parsing_order = schema['header']['parsing_order']
# Get sections separated
section_strings = extract_sections(StringDf)
section_strings = extract_sections(string_df)
# Paste in order in a single dataframe with columns named as sections
# Do not include sections not asked for
df_out = pd.DataFrame()
for section in read_sections:
df_out = pd.concat([df_out,section_strings[section].rename(section)],sort = False,axis=1)
else:
df_out = StringDf
# return section in named column
df_out = string_df
df_out.columns = read_sections
return df_out
......@@ -3,16 +3,32 @@
"""
Created on Fri Jan 10 13:17:43 2020
FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS, AN ITERABLE:
EITHER A PD.IO.PARSERS.TEXTFILEREADER OR A LIST, DEPENDING ON
SOURCE TYPE AND CHUNKSIZE ARGUMENT
BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS:
AN ITERABLE WITH DATAFRAMES
INPUT IS EITHER:
- pd.io.parsers.textfilereader
- io.StringIO
- file path
OUTPUT IS AN ITERABLE, DEPENDING ON SOURCE TYPE AND CHUNKSIZE BEING SET:
- a single dataframe in a list
- a pd.io.parsers.textfilereader
WITH BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
delimiter="\t" option in pandas.read_fwf avoids white spaces at taild
to be stripped
@author: iregon
DEV NOTES:
1) What this module is able to ingest needs to align with properties.supported_sources
2) Check io.StringIO input: why is it there, and does it actually work as is?
3) Check pd.io.parsers.textfilereader input: why is it there, and does it actually work as is?
OPTIONS IN OLD DEVELOPMENT:
1. DLMT: delimiter = ',' default
names = [ (x,y) for x in schema['sections'].keys() for y in schema['sections'][x]['elements'].keys()]
......@@ -29,16 +45,27 @@ OPTIONS IN OLD DEVELOPMENT:
import pandas as pd
import os
import io
from mdf_reader import properties
def to_iterable_df(source,skiprows = None, chunksize = None):
TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
if not chunksize:
TextParser = [TextParser]
return TextParser
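# Hedged examples of the two return shapes (hypothetical path):
# to_iterable_df('reports.txt')                   -> [DataFrame]
# to_iterable_df('reports.txt', chunksize = 1000) -> pd.io.parsers.TextFileReader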
def import_data(source,chunksize = None, skiprows = None):
if isinstance(source,pd.io.parsers.TextFileReader):
TextParser = source
return source
elif isinstance(source, io.StringIO):
TextParser = to_iterable_df(source, skiprows = skiprows, chunksize = chunksize)
return TextParser
elif os.path.isfile(source):
TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
if not chunksize:
TextParser = [TextParser]
TextParser = to_iterable_df(source, skiprows = skiprows, chunksize = chunksize)
return TextParser
else:
print('Error: unsupported data source type {}'.format(type(source)))
return TextParser
return
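# A hedged usage sketch: any supported source should yield an iterable of
# dataframes with one full report string per row, e.g.:
# for string_df in import_data('reports.txt', chunksize = 5000):
#     ...  # process one chunk
# for string_df in import_data(io.StringIO(text)):
#     ...  # single-element list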
......@@ -3,13 +3,26 @@
"""
Created on Fri Jan 10 13:17:43 2020
Extracts and reads (decodes, scales, etc...) the elements of data sections.
Each column of the input dataframe is a section with all its elements stored
as a single string.
Working on a section-by-section basis, this module uses the data model
information provided in the schema to split the elements, decode and scale them
where appropriate, and ensure their data type consistency.
Output is a dataframe with columns as follows depending on the data model
structure:
1) Data model with sections (1 or more): [(section0,element0), ..., (sectionN,elementN)]
2) Data model with no sections: [element0, ..., elementN]
@author: iregon
"""
import pandas as pd
from io import StringIO as StringIO
import mdf_reader.properties as properties
import csv # To disable quoting
from mdf_reader.common.converters import converters
from mdf_reader.common.decoders import decoders
......@@ -38,24 +51,25 @@ def read_data(section_df,section_schema):
section_dtypes = { i:section_schema['elements'][i]['column_type'] for i in section_names }
encoded = [ (x) for x in section_names if 'encoding' in section_schema['elements'][x]]
section_encoding = { i:section_schema['elements'][i]['encoding'] for i in encoded }
section_valid = pd.DataFrame(index = section_df.index, columns = section_df.columns)
for element in section_dtypes.keys():
#missing = section_elements[element].isna()
missing = section_df[element].isna()
if element in encoded:
section_df[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_df[element])
kwargs = { converter_arg:section_schema['elements'][element].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(section_dtypes.get(element)) }
section_df[element] = converters.get(section_dtypes.get(element))(section_df[element], **kwargs)
# section_valid[element] = missing | section_elements[element].notna()
section_valid[element] = missing | section_df[element].notna()
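# i.e. an element is flagged invalid only when it was present in the
# original string (not missing) but came out NaN after decoding/conversion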
return section_df
return section_df,section_valid
def read_sections(sections_df, schema):
multiindex = len(sections_df.columns) > 1 or sections_df.columns[0] != properties.dummy_level
data_df = pd.DataFrame()
valid_df = pd.DataFrame()
out_dtypes = dict()
for section in sections_df.columns:
......@@ -81,18 +95,21 @@ def read_sections(sections_df, schema):
# we'll see if we do that in the caller module or here....
sections_df[section].to_csv(section_buffer,header=False, encoding = 'utf-8',index = False)#,quoting=csv.QUOTE_NONE,escapechar="\\",sep="\t")
ssshh = section_buffer.seek(0)
# Get the individual elements as objects
if field_layout == 'fixed_width':
section_elements_obj = extract_fixed_width(section_buffer,section_schema)
elif field_layout == 'delimited':
section_elements_obj = extract_delimited(section_buffer,section_schema)
section_elements_obj.drop(ignore, axis = 1, inplace = True)
# Read the objects to their data types and apply decoding, scaling and so on...
section_elements = read_data(section_elements_obj,section_schema)
section_elements, section_valid = read_data(section_elements_obj,section_schema)
section_elements.index = sections_df[section].index
section_valid.index = sections_df[section].index
else:
section_elements = pd.DataFrame(sections_df[section],columns = [section])
section_valid = pd.DataFrame(index = section_elements.index,data = True, columns = [section])
if not disable_read:
if multiindex:
......@@ -108,6 +125,8 @@ def read_sections(sections_df, schema):
out_dtypes.update({ section:'object' } )
section_elements.columns = [ (section, x) for x in section_elements.columns] if multiindex else section_elements.columns
section_valid.columns = section_elements.columns
data_df = pd.concat([data_df,section_elements],sort = False,axis=1)
return data_df,out_dtypes
valid_df = pd.concat([valid_df,section_valid],sort = False,axis=1)
return data_df, valid_df, out_dtypes
......@@ -5,18 +5,9 @@ Created on Tue Apr 30 09:38:17 2019
Reads source data from a data model to a pandas DataFrame.
Optionally, it reads supplemental data from the same source (from a different
data model) and pastes that to the output DataFrame
Uses the meta_formats generic submodules ('delimited' and 'fixed_width') to
pre-format data source and read either generic type of data model.
@author: iregon
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
# CAREFUL HERE:
# Note that in Python 3, the io.open function is an alias for the built-in open function.
# The built-in open function only supports the encoding argument in Python 3, not Python 2.
......@@ -35,11 +26,6 @@ from . import read_sections
import copy
if sys.version_info[0] >= 3:
py3 = True
else:
py3 = False
from io import BytesIO as BytesIO
# Get pandas dtype for time_stamps
pandas_timestamp_dtype = pd.to_datetime(pd.DataFrame(['20000101'])[0],format='%Y%m%d').dtypes
......@@ -52,62 +38,59 @@ def read_model(source,schema, sections = None, chunksize = None, skiprows = None
logging.error('File format not yet supported')
sys.exit(1)
# 1. PARSE SCHEMA ---------------------------------------------------------
# 1. DEFINE OUTPUT --------------------------------------------------------
# Subset data model sections to requested sections
parsing_order = schema['header'].get('parsing_order')
# 2. DEFINE OUTPUT --------------------------------------------------------
# 2.1 Sections to read
# 1.1 Sections to read
if not sections:
sections = [ x.get(y) for x in parsing_order for y in x ]
read_sections_list = [y for x in sections for y in x]
else:
read_sections_list = sections
read_sections_list = sections
# 3. HOMOGENIZE INPUT DATA (FILE OR TEXTREADER) TO AN ITERABLE TEXTREADER
# 2. HOMOGENIZE INPUT DATA TO AN ITERABLE WITH DATAFRAMES:
# a list with a single dataframe or a pd.io.parsers.TextFileReader
logging.info("Getting data string from source...")
TextParser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows)
# 4. EXTRACT SECTIONS IN A PARSER; EXTRACT SECTIONS HERE AND READ DATA IN
# SAME LOOP? SHOULD DO....
# 3. EXTRACT AND READ DATA IN SAME LOOP -----------------------------------
logging.info("Extracting sections...")
data_buffer = StringIO()
valid_buffer = StringIO()
# valid_buffer = ...
for i,string_df in enumerate(TextParser):
# Get sections separated in a dataframe: one per column, only requested
# sections, ignore rest.
# a. Get sections separated in a dataframe columns:
# one per column, only requested sections, ignore rest.
sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
# Read elements from sections: along data chunks, resulting data types
# may vary if gaps
[data_df,out_dtypesi ] = read_sections.read_sections(sections_df, schema)
# b. Read elements from sections: along data chunks, resulting data types
# may vary if gaps, keep track of data types!
[data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
if i == 0:
out_dtypes = copy.deepcopy(out_dtypesi)
for k in out_dtypesi:
if out_dtypesi.get(k) in properties.numpy_floats:
out_dtypes.update({ k:out_dtypesi.get(k) })
# Save to buffer
data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
# [output_buffer,valid_buffer,dtypes] = reader_function(TextParser, schema, read_sections = read_sections, idx_offset = idx_offset )
#
# # 5. OUTPUT DATA:----------------------------------------------------------
# # WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
# 4. OUTPUT DATA ----------------------------------------------------------
# WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
data_buffer.seek(0)
# valid_buffer.seek(0)
# logging.info("Wrapping output....")
# chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
# logging.info('Data')
# # 'datetime' is not a valid pandas dtype: Only on output (on reading) will be then converted (via parse_dates) to datetime64[ns] type, cannot specify 'datetime' (of any kind) here: will fail
# date_columns = [] # Needs to be the numeric index of the column, as it seems unable to work with tuples....
# for i,element in enumerate(list(dtypes)):
# if dtypes.get(element) == 'datetime':
# date_columns.append(i)
data_reader = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes)#, parse_dates = date_columns)
# logging.info('Mask')
# valid_reader = pd.read_csv(valid_buffer,names = out_names, chunksize = chunksize)
valid_buffer.seek(0)
logging.info("Wrapping output....")
# Inherit the chunksize from the imported TextParser when it is a pd.io.parsers.TextFileReader
# (i.e. the source was a pd.io.parsers.TextFileReader, or a file read with chunksize set on input):
chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
# 'datetime' is not a valid pandas dtype: it cannot be specified here (read_csv would fail); those columns are instead converted on reading to datetime64[ns] via parse_dates
date_columns = [] # Needs to be the numeric index of the column, as it seems unable to work with tuples....
for i,element in enumerate(list(out_dtypes)):
if out_dtypes.get(element) == 'datetime':
date_columns.append(i)
data_reader = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
valid_reader = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)
# return data_reader, valid_reader
return data_reader
return data_reader, valid_reader
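# A hedged consumption sketch when chunksize is set (both read_csv calls
# above share the same chunksize, so the readers yield aligned chunks):
# for data_chunk, valid_chunk in zip(data_reader, valid_reader):
#     ...  # process one data chunk and its mask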
......@@ -9,6 +9,7 @@ Read data file format json schema to dictionary
Add schema validation:
- check mandatory are not null
- check fixed options
..and return None if it does not validate
"""
......@@ -59,22 +60,6 @@ def copy_template(schema, out_dir = None,out_path = None):
print('\tValid names are: {}'.format(", ".join(schemas)))
return
def get_field_layout(field_layout_def,field_layout):
if not field_layout_def and not field_layout:
return None
elif not field_layout:
return field_layout_def
else:
return field_layout
def get_delimiter(delimiter_def,delimiter):
if not delimiter_def and not delimiter:
return None
elif not delimiter_def:
return delimiter
else:
return field_layout
def read_schema(schema_name = None, ext_schema_path = None):
if schema_name:
......