From 51d2375382dbbfe2ee0112e3b5d13cdb49ad622f Mon Sep 17 00:00:00 2001
From: perezgonzalez-irene <iregon@noc.ac.uk>
Date: Mon, 20 Jan 2020 15:34:11 +0000
Subject: [PATCH] Validation mask added back

---
 properties.py           |  2 +-
 read.py                 | 34 +++++++++++++----
 reader/get_sections.py  | 22 +++++------
 reader/import_data.py   | 45 +++++++++++++++++-----
 reader/read_sections.py | 37 +++++++++++++-----
 reader/reader.py        | 83 ++++++++++++++++-------------------------
 schemas/schemas.py      | 17 +--------
 7 files changed, 135 insertions(+), 105 deletions(-)

diff --git a/properties.py b/properties.py
index d00f4f7..8e6e034 100644
--- a/properties.py
+++ b/properties.py
@@ -13,7 +13,7 @@ import pandas as pd
 # Supported formats, sources and internal data models -------------------------
 schema_path = os.path.join(os.path.dirname(__file__),'schemas','lib')
 supported_file_formats = [ os.path.basename(x).split(".")[0] for x in glob.glob(schema_path + '/*/*.json') if os.path.basename(x).split(".")[0] == os.path.dirname(x).split("/")[-1]]
-supported_sources = [pd.DataFrame, pd.io.parsers.TextFileReader, io.StringIO]
+supported_sources = [pd.io.parsers.TextFileReader, io.StringIO]
 
 # Data types ------------------------------------------------------------------
 numpy_integers = ['int8','int16','int32','int64','uint8','uint16','uint32','uint64']
diff --git a/read.py b/read.py
index 1533d0d..b3a9d6f 100644
--- a/read.py
+++ b/read.py
@@ -5,7 +5,7 @@ Created on Tue Apr 30 09:38:17 2019
 
 Reads source data (file, io.StringIO or pd.io.parsers.TextFileReader) to
 a pandas DataFrame. The source data model needs to be input to the module as
-a named model (included in the module) or as the path to a data model.
+a named model (included in the module) or as the path to a valid data model.
 
 Data is validated against its data model after reading, producing a boolean mask.
 
@@ -17,13 +17,24 @@ read the data and validate it.
 import os
 import sys
 import pandas as pd
+import logging
+import json
+
 from mdf_reader.reader import reader as reader
 from mdf_reader.validate import validate as validate
 import mdf_reader.schemas as schemas
+
 import mdf_reader.properties as properties
 import mdf_reader.common.pandas_TextParser_hdlr as pandas_TextParser_hdlr
-import logging
-import json
+
+
+def validate_arg(arg_name,arg_value,arg_type):
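+    # Falsy values (e.g. the None defaults) pass: only a set argument of the wrong type fails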
+    if arg_value and not isinstance(arg_value,arg_type):
+        logging.error('Argument {0} must be {1}, input type is {2}'.format(arg_name,arg_type,type(arg_value)))
+        return False
+    else:
+        return True
 
 def read(source, data_model = None, data_model_path = None, sections = None,chunksize = None,
          skiprows = None, out_path = None ):
@@ -31,7 +42,7 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
     logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s',
                     level=logging.INFO,datefmt='%Y%m%d %H:%M:%S',filename=None)
 
-    # 0. Make sure min info is available
+    # 0. Validate input
     if not data_model and not data_model_path:
         logging.error('A valid data model name or path to data model must be provided')
         return
@@ -43,8 +54,14 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
             logging.error('Can\'t reach data source {} as a file'.format(source))
             logging.info('Supported in-memory data sources are {}'.format(",".join([str(x) for x in properties.supported_sources])))
             return
+    if not validate_arg('sections',sections,list):
+        return
+    if not validate_arg('chunksize',chunksize,int):
+        return
+    if not validate_arg('skiprows',skiprows,int):
+        return
 
-    # 1. Read schema(s) and get file format
+    # 1. Read data model: schema reader will return None if schema does not validate
     logging.info("READING DATA MODEL SCHEMA FILE...")
     schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
     if not schema:
@@ -60,13 +77,14 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
     data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
     out_atts = schemas.df_schema(data_columns, schema, data_model)
 
-    # 5. Complete data validation
+    # 5. Complete data model validation
     logging.info("VALIDATING DATA")
     valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path)
     if isinstance(data,pd.io.parsers.TextFileReader):
             logging.info('...RESTORING DATA PARSER')
             data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
 
+    # 6. Output to files if requested
     if out_path:
         logging.info('WRITING DATA TO FILES IN: {}'.format(out_path))
         cols = [ x for x in data ]
@@ -81,11 +99,11 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
         with open(os.path.join(out_path,'atts.json'),'w') as fileObj:
             json.dump(out_atts_json,fileObj,indent=4)
 
+    # 7. Return data
     return {'data':data,'atts':out_atts,'valid_mask':valid}
 
 if __name__=='__main__':
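+    # Example CLI call (placeholders): python read.py <source_file> data_model=<model> sections=<s1,s2> chunksize=<n>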
     kwargs = dict(arg.split('=') for arg in sys.argv[2:])
     if 'sections' in kwargs.keys():
         kwargs.update({ 'sections': [ x.strip() for x in kwargs.get('sections').split(",")] })
-    read(sys.argv[1],
-         **kwargs) # kwargs
+    read(sys.argv[1], **kwargs) # kwargs
diff --git a/reader/get_sections.py b/reader/get_sections.py
index 4d8a569..c4073d3 100644
--- a/reader/get_sections.py
+++ b/reader/get_sections.py
@@ -3,10 +3,10 @@
 """
 Created on Tue Apr 30 09:38:17 2019
 
-Splits string reports in sections using a data model layout
+Splits string reports in sections using a data model layout.
 
-Input and output are simple pandas dataframes, with the output DF column names
-as the section names
+Input and output are simple pandas dataframes, with the output dataframe
+column names being the section names.
 
 To work with a pandas TextParser, loop through this module.
 
@@ -34,8 +34,6 @@ use, also support to chunking would make converting to series a bit dirty...
 import pandas as pd
 from copy import deepcopy
 import logging
-import mdf_reader.properties as properties
-
 
 #   ---------------------------------------------------------------------------
 #   FUNCTIONS TO PERFORM INITIAL SEPARATION OF SECTIONS: MAIN IS GET_SECTIONS()
@@ -128,7 +126,7 @@ def add_dynamic_children():
     if (len(threads[thread_id]['modulo'])) > 0:
         add_higher_group_children()
 
-def extract_sections(df_in):
+def extract_sections(string_df):
     # threads elements:
     #    'parsing_order'            What needs to be applied to current parent data
     #    'group_number'             Order in the global parsing order
@@ -154,7 +152,7 @@ def extract_sections(df_in):
     threads[thread_id]['group_number'] = 0
     threads[thread_id]['group_type'] = None
     threads[thread_id]['section'] = None
-    threads[thread_id]['parent_data'] = df_in
+    threads[thread_id]['parent_data'] = string_df
     threads[thread_id]['data'] = None
     threads[thread_id]['modulo'] = threads[thread_id]['parent_data']
     del threads[thread_id]['parent_data']
@@ -198,25 +196,25 @@ def extract_sections(df_in):
 #   ---------------------------------------------------------------------------
 #   MAIN
 #   ---------------------------------------------------------------------------
-def get_sections(StringDf, schema, read_sections):
+def get_sections(string_df, schema, read_sections):
     global sentinals, section_lens, sentinals_lens
     global parsing_order
-
+    # Proceed to split the reports into sections only if the schema defines more than one
     if len(schema['sections'].keys())> 1:
         section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
         sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
         sentinals_lens = { section: len(sentinals.get(section)) if sentinals.get(section) else 0 for section in sentinals.keys()}
         parsing_order = schema['header']['parsing_order']
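+        # Section lengths and sentinal strings drive the splitting; parsing_order gives the section sequence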
         # Get sections separated
-        section_strings = extract_sections(StringDf)
+        section_strings = extract_sections(string_df)
         # Paste in order in a single dataframe with columns named as sections
         # Do not include sections not asked for
         df_out = pd.DataFrame()
         for section in read_sections:
             df_out = pd.concat([df_out,section_strings[section].rename(section)],sort = False,axis=1)
     else:
-        df_out = StringDf
+        # Single section: return it in a column named after the section
+        df_out = string_df
         df_out.columns = read_sections
 
-
     return df_out
diff --git a/reader/import_data.py b/reader/import_data.py
index a98a184..26b0a98 100644
--- a/reader/import_data.py
+++ b/reader/import_data.py
@@ -3,16 +3,32 @@
 """
 Created on Fri Jan 10 13:17:43 2020
 
-FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS, AN ITERABLE:
-EITHER A PD.IO.PARSERS.TEXTFILEREADER OR A LIST, DEPENDING ON
-SOURCE TYPE AND CHUNKSIZE ARGUMENT
-BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
+FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS:
+    AN ITERABLE OF DATAFRAMES
+
+INPUT IS EITHER:
+    - pd.io.parsers.TextFileReader
+    - io.StringIO
+    - file path
+
+OUTPUT IS AN ITERABLE, DEPENDING ON SOURCE TYPE AND CHUNKSIZE BEING SET:
+    - a single dataframe in a list
+    - a pd.io.parsers.TextFileReader
+
+EACH DATAFRAME HOLDS 1 RECORD (ONE OR MULTIPLE REPORTS) PER LINE
 
 delimiter="\t" option in pandas.read_fwf avoids white spaces at taild 
 to be stripped
 
 @author: iregon
 
+DEV NOTES:
+1) What this module is able to ingest needs to align with properties.supported_sources
+2) Check io.StringIO input: why there, does it actually work as it is?
+3) Check pd.io.parsers.TextFileReader input: why there, does it actually work as it is?
+
 OPTIONS IN OLD DEVELOPMENT:
     1. DLMT: delimiter = ',' default
         names = [ (x,y) for x in schema['sections'].keys() for y in schema['sections'][x]['elements'].keys()]
@@ -29,16 +45,27 @@ OPTIONS IN OLD DEVELOPMENT:
 
 import pandas as pd
 import os
+import io
+
 from mdf_reader import properties
 
+def to_iterable_df(source,skiprows = None, chunksize = None):
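+    # Read each full-width line as a single string column: one record (report) per row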
+    TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
+    if not chunksize:
+        TextParser = [TextParser]
+    return TextParser
+
+
 def import_data(source,chunksize = None, skiprows = None):
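+    # Dispatch on source type: a TextFileReader passes through; StringIO and file paths are wrapped by to_iterable_df()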
 
     if isinstance(source,pd.io.parsers.TextFileReader):
-        TextParser = source
+        return source
+    elif isinstance(source, io.StringIO):
+        TextParser = to_iterable_df(source, skiprows = skiprows, chunksize = chunksize)
+        return TextParser
     elif os.path.isfile(source):
-        TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
-        if not chunksize:
-            TextParser = [TextParser]
+        TextParser = to_iterable_df(source, skiprows = skiprows, chunksize = chunksize)
+        return TextParser
     else:
         print('Error')
-    return TextParser
+        return
diff --git a/reader/read_sections.py b/reader/read_sections.py
index 9bdd361..e95631a 100644
--- a/reader/read_sections.py
+++ b/reader/read_sections.py
@@ -3,13 +3,26 @@
 """
 Created on Fri Jan 10 13:17:43 2020
 
+Extracts and reads (decodes, scales, etc.) the elements of data sections.
+Each column of the input dataframe is a section with all its elements stored
+as a single string.
+
+Working on a section by section basis, this module uses the data model
+information provided in the schema to split the elements, decode and scale them
+where appropriate and ensure their data type consistency.
+
+Output is a dataframe with columns as follows, depending on the data model
+structure:
+    1) Data model with sections (1 or more): [(section0,element0),...,(sectionN,elementN)]
+    2) Data model with no sections: [element0,...,elementN]
+
+
 @author: iregon
 """
 
 import pandas as pd
 from io import StringIO as StringIO
 import mdf_reader.properties as properties
-import csv # To disable quoting
 from mdf_reader.common.converters import converters
 from mdf_reader.common.decoders import decoders
 
@@ -38,24 +51,25 @@ def read_data(section_df,section_schema):
     section_dtypes = { i:section_schema['elements'][i]['column_type'] for i in section_names }
     encoded = [ (x) for x in section_names if 'encoding' in section_schema['elements'][x]]
     section_encoding = { i:section_schema['elements'][i]['encoding'] for i in encoded }
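+    # Initialise the validation mask with the same shape as the section dataframe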
+    section_valid = pd.DataFrame(index = section_df.index, columns = section_df.columns)
     
     for element in section_dtypes.keys():
-        #missing = section_elements[element].isna()
+        missing = section_df[element].isna()
         if element in encoded:
             section_df[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_df[element])
 
         kwargs = { converter_arg:section_schema['elements'][element].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(section_dtypes.get(element))  }
         section_df[element] = converters.get(section_dtypes.get(element))(section_df[element], **kwargs)
 
-#        section_valid[element] = missing | section_elements[element].notna()
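+        # Valid if conversion produced a value (notna) or the element was already missing in the raw data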
+        section_valid[element] = missing | section_df[element].notna()
                 
-    return section_df
+    return section_df,section_valid
 
 def read_sections(sections_df, schema):
     
     multiindex = True if len(sections_df.columns) > 1 or sections_df.columns[0] != properties.dummy_level else False
     data_df = pd.DataFrame()
-    
+    valid_df = pd.DataFrame()
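+    # valid_df accumulates the per-section validation masks alongside data_df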
     out_dtypes = dict()
     
     for section in sections_df.columns: 
@@ -81,18 +95,21 @@ def read_sections(sections_df, schema):
             # we'll see if we do that in the caller module or here....
             sections_df[section].to_csv(section_buffer,header=False, encoding = 'utf-8',index = False)#,quoting=csv.QUOTE_NONE,escapechar="\\",sep="\t") 
             ssshh = section_buffer.seek(0)
-        # Get the individual elements as objects
+            # Get the individual elements as objects
             if field_layout == 'fixed_width':
                 section_elements_obj = extract_fixed_width(section_buffer,section_schema)
             elif field_layout == 'delimited':
                 section_elements_obj = extract_delimited(section_buffer,section_schema)
                 
             section_elements_obj.drop(ignore, axis = 1, inplace = True)
+            
             # Read the objects to their data types and apply decoding, scaling and so on...
-            section_elements = read_data(section_elements_obj,section_schema)
+            section_elements, section_valid = read_data(section_elements_obj,section_schema)
             section_elements.index = sections_df[section].index
+            section_valid.index = sections_df[section].index
         else:
             section_elements = pd.DataFrame(sections_df[section],columns = [section])
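+            # Reading disabled for this section: keep the raw strings and flag them all as valid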
+            section_valid = pd.DataFrame(index = section_elements.index,data = True, columns = [section])
                
         if not disable_read:
             if multiindex:
@@ -108,6 +125,8 @@ def read_sections(sections_df, schema):
                 out_dtypes.update({ section:'object' } )        
         
         section_elements.columns = [ (section, x) for x in section_elements.columns] if multiindex else section_elements.columns
+        section_valid.columns = section_elements.columns
         data_df = pd.concat([data_df,section_elements],sort = False,axis=1)
-           
-    return data_df,out_dtypes
+        valid_df = pd.concat([valid_df,section_valid],sort = False,axis=1)
+        
+    return data_df, valid_df, out_dtypes
diff --git a/reader/reader.py b/reader/reader.py
index db2a381..737367b 100644
--- a/reader/reader.py
+++ b/reader/reader.py
@@ -5,18 +5,9 @@ Created on Tue Apr 30 09:38:17 2019
 
 Reads source data from a data model to a pandas DataFrame.
 
-Optionally, it reads supplemental data from the same source (from a different
- data model) and pastes that to the output DataFrame
-
-Uses the meta_formats generic submodules ('delimited' and 'fixed_width') to
-pre-format data source and read either generic type of data model.
-
 @author: iregon
 """
 
-from __future__ import unicode_literals
-from __future__ import print_function
-from __future__ import absolute_import
 # CAREFUL HERE:
 # Note that in Python 3, the io.open function is an alias for the built-in open function.
 # The built-in open function only supports the encoding argument in Python 3, not Python 2.
@@ -35,11 +26,6 @@ from . import read_sections
 
 import copy
 
-if sys.version_info[0] >= 3:
-    py3 = True
-else:
-    py3 = False
-    from io import BytesIO as BytesIO
 
 # Get pandas dtype for time_stamps
 pandas_timestamp_dtype = pd.to_datetime(pd.DataFrame(['20000101'])[0],format='%Y%m%d').dtypes
@@ -52,62 +38,59 @@ def read_model(source,schema, sections = None, chunksize = None, skiprows = None
         logging.error('File format not yet supported')
         sys.exit(1)
 
-    # 1. PARSE SCHEMA ---------------------------------------------------------
-
+    # 1. DEFINE OUTPUT --------------------------------------------------------
+    # Subset data model sections to requested sections
     parsing_order = schema['header'].get('parsing_order')
-
-    # 2. DEFINE OUTPUT --------------------------------------------------------
-    # 2.1 Sections to read
+    # 1.1 Sections to read
     if not sections:
         sections = [ x.get(y) for x in parsing_order for y in x ]
         read_sections_list = [y for x in sections for y in x]
     else:
-        read_sections_list = sections
-        
+        read_sections_list = sections 
 
-    # 3. HOMOGENEIZE INPUT DATA (FILE OR TEXTREADER) TO AN ITERABLE TEXTREADER
+    # 2. HOMOGENIZE INPUT DATA TO AN ITERABLE OF DATAFRAMES:
+    # a list with a single dataframe or a pd.io.parsers.TextFileReader
     logging.info("Getting data string from source...")
     TextParser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows)
     
-    # 4. EXTRACT SECTIONS IN A PARSER; EXTRACT SECTIONS HERE AND READ DATA IN 
-    # SAME LOOP? SHOULD DO....
+    # 3. EXTRACT AND READ DATA IN SAME LOOP -----------------------------------
     logging.info("Extracting sections...")
     data_buffer = StringIO()
+    valid_buffer = StringIO()
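+    # The validation mask is buffered to csv alongside the data and re-read with the same chunking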
     
-    
-#    valid_buffer = ...
     for i,string_df in enumerate(TextParser):
-        # Get sections separated in a dataframe: one per column, only requested
-        # sections, ignore rest.
+        # a. Get sections separated into dataframe columns:
+        # one section per column, only requested sections, ignore the rest.
         sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
-        # Read elements from sections: along data chunks, resulting data types
-        # may vary if gaps
-        [data_df,out_dtypesi ] = read_sections.read_sections(sections_df, schema)
+        # b. Read elements from sections: along data chunks, resulting data types
+        # may vary if gaps, keep track of data types!
+        [data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
         if i == 0:
             out_dtypes = copy.deepcopy(out_dtypesi)
             
         for k in out_dtypesi: 
             if out_dtypesi.get(k) in properties.numpy_floats:
                 out_dtypes.update({ k:out_dtypesi.get(k) })
-
+        # Save data and mask chunks to their respective buffers
         data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-#        [output_buffer,valid_buffer,dtypes] = reader_function(TextParser, schema, read_sections = read_sections, idx_offset = idx_offset )
-#        
-#    # 5. OUTPUT DATA:----------------------------------------------------------
-#    # WE'LL NEED TO POSPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
+        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+
+    # 4. OUTPUT DATA ----------------------------------------------------------
+    # WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
     data_buffer.seek(0)
-#    valid_buffer.seek(0)
-#    logging.info("Wrapping output....")
-#    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
-#    logging.info('Data')
-#    # 'datetime' is not a valid pandas dtype: Only on output (on reading) will be then converted (via parse_dates) to datetime64[ns] type, cannot specify 'datetime' (of any kind) here: will fail
-#    date_columns = [] # Needs to be the numeric index of the column, as seems not to be able to work with tupples....
-#    for i,element in enumerate(list(dtypes)):
-#        if dtypes.get(element) == 'datetime':
-#            date_columns.append(i)
-    data_reader = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes)#, parse_dates = date_columns)
-#    logging.info('Mask')
-#    valid_reader = pd.read_csv(valid_buffer,names = out_names, chunksize = chunksize)
+    valid_buffer.seek(0)
+    logging.info("Wrapping output....")
+    # Inherit chunksize from the imported TextParser when it is a pd.io.parsers.TextFileReader
+    # (i.e. the source was a TextFileReader or a file read with chunksize set on input)
+    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
+    # 'datetime' is not a valid pandas dtype: date columns are only converted to
+    # datetime64[ns] on reading, via parse_dates; specifying 'datetime' in dtype would fail
+    date_columns = [] # Needs the numeric index of the column: parse_dates does not seem to work with tuples
+    for i,element in enumerate(list(out_dtypes)):
+        if out_dtypes.get(element) == 'datetime':
+            date_columns.append(i)
+    # Drop the 'datetime' entries from the dtype spec: those columns are handled by parse_dates
+    out_dtypes = { k:v for k,v in out_dtypes.items() if v != 'datetime' }
+
+    data_reader = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
+    valid_reader = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)
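+    # data_reader and valid_reader share column names and chunksize, so data and mask can be iterated in parallel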
 
-#    return data_reader, valid_reader
-    return data_reader
+    return data_reader, valid_reader
diff --git a/schemas/schemas.py b/schemas/schemas.py
index 8a981a0..f5dcf89 100644
--- a/schemas/schemas.py
+++ b/schemas/schemas.py
@@ -9,6 +9,7 @@ Read data file format json schema to dictionary
 Add schema validation:
     - check mandatory are not null
     - check fixed options
+...and returns None if the schema does not validate
 
 """
 
@@ -59,22 +60,6 @@ def copy_template(schema, out_dir = None,out_path = None):
         print('\tValid names are: {}'.format(", ".join(schemas)))
         return
 
-def get_field_layout(field_layout_def,field_layout):
-    if not field_layout_def and not field_layout:
-        return None
-    elif not field_layout:
-        return field_layout_def
-    else:
-        return field_layout
-
-def get_delimiter(delimiter_def,delimiter):
-    if not delimiter_def and not delimiter:
-        return None
-    elif not delimiter_def:
-        return delimiter
-    else:
-        return field_layout
-
 def read_schema(schema_name = None, ext_schema_path = None):
 
     if schema_name:
-- 
GitLab