#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019

Reads source data (file, pandas DataFrame or pd.io.parsers.TextFileReader) to
a pandas DataFrame. The source data model needs to be input to the module as
a named model (included in the module) or as the path to a valid data model.

Data is validated against its data model after reading, producing a boolean mask.

Calls the schemas, reader and validate modules in the tool to access the data models,
read the data and validate it.

@author: iregon
"""
import os
import sys
import pandas as pd
import logging
import json
import copy
from io import StringIO

from . import schemas
from . import properties
from .common import pandas_TextParser_hdlr
from .reader import import_data
from .reader import get_sections
from .reader import read_sections
from .validate import validate

# Location of the data model schema library shipped with the tool
toolPath = os.path.dirname(os.path.abspath(__file__))
schema_lib = os.path.join(toolPath,'schemas','lib')

def ERV(TextParser,read_sections_list, schema, code_tables_path):
    """
    Extract, Read and Validate the data in a single pass over the source.

    Iterates over the chunks yielded by TextParser, splits each chunk into
    its data model sections, decodes the elements in those sections and
    validates them against the schema and the model's code tables.

    Parameters
    ----------
    TextParser : list or pd.io.parsers.TextFileReader
        Iterable yielding dataframes with the raw report strings.
    read_sections_list : list
        Names of the data model sections to extract, in output column order.
    schema : dict
        Data model schema as returned by schemas.read_schema().
    code_tables_path : str
        Path to the data model's code tables directory.

    Returns
    -------
    data : pd.DataFrame or pd.io.parsers.TextFileReader
        Decoded data (a TextFileReader when chunking is active).
    valid : pd.DataFrame or pd.io.parsers.TextFileReader
        Boolean validation mask with the same columns as data.
    """
    data_buffer = StringIO()
    valid_buffer = StringIO()

    for i_chunk, string_df in enumerate(TextParser):
        # a. Get a DF with sections separated in columns:
        # - one section per column
        # - only sections requested, ignore rest
        # - requested NA sections as NaN columns
        # - columns order as in read_sections_list
        sections_df = get_sections.get_sections(string_df, schema, read_sections_list)

        # b. Read elements from sections: along data chunks, resulting data types
        # may vary if gaps, keep track of data types!
        # Sections as parsed in the same order as sections_df.columns
        [data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
        if i_chunk == 0:
            out_dtypes = copy.deepcopy(out_dtypesi)

        for k in out_dtypesi:
            # BUGFIX: the original tested the whole dict against the float
            # dtype list ("out_dtypesi in properties.numpy_floats"), which is
            # always False. The element's dtype must be looked up so that a
            # column promoted to float in a later chunk (because of gaps/NaN)
            # keeps the float dtype in the merged dtype mapping.
            if out_dtypesi.get(k) in properties.numpy_floats:
                out_dtypes.update({ k:out_dtypesi.get(k) })

        valid_df = validate.validate(data_df, valid_df, schema, code_tables_path)

        # Save to buffer
        data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)

    # Create the output
    # WE'LL NEED TO POSPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
    data_buffer.seek(0)
    valid_buffer.seek(0)
    logging.info("Wrapping output....")
    # Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
    # (source is either pd.io.parsers.TextFileReader or a file with chunksize specified on input):
    # This way it supports direct chunksize property inheritance if the input source was a pd.io.parsers.TextFileReader
    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None

    # 'datetime' is not a valid pandas dtype: Only on output (on reading) will be then converted (via parse_dates) to datetime64[ns] type,
    # cannot specify 'datetime' (of any kind) here: would fail
    date_columns = [] # Needs to be the numeric index of the column, as seems not to be able to work with tupples....
    for i,element in enumerate(list(out_dtypes)):
        if out_dtypes.get(element) == 'datetime':
            date_columns.append(i)

    data = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes, parse_dates = date_columns)
    valid = pd.read_csv(valid_buffer,names = data_df.columns, chunksize = chunksize)

    return data, valid

def validate_arg(arg_name,arg_value,arg_type):
    """
    Validate the type of an optional input argument.

    Parameters
    ----------
    arg_name : str
        Name of the argument (used only in the error message).
    arg_value : object
        Value of the argument; None means "not provided" and passes.
    arg_type : type
        Expected type of the argument.

    Returns
    -------
    bool
        True if arg_value is None or an instance of arg_type,
        False otherwise (an error is logged).
    """
    # BUGFIX: test against None explicitly instead of truthiness, so that
    # falsy values of the wrong type (e.g. skiprows='' or chunksize=0.0)
    # no longer slip through the isinstance check unvalidated.
    if arg_value is not None and not isinstance(arg_value,arg_type):
        logging.error('Argument {0} must be {1}, input type is {2}'.format(arg_name,arg_type,type(arg_value)))
        return False
    else:
        return True

def read(source, data_model = None, data_model_path = None, sections = None,chunksize = None,
         skiprows = None, out_path = None ):
    """
    Read source data to a pandas DataFrame and validate it against its data model.

    Parameters
    ----------
    source : str, pd.DataFrame or pd.io.parsers.TextFileReader
        Path to the data file, or an in-memory data source.
    data_model : str, optional
        Name of a data model included in the tool's schema library.
    data_model_path : str, optional
        Path to an external valid data model.
    sections : list, optional
        Subset of data model sections to read (default: all sections).
    chunksize : int, optional
        Number of reports per chunk.
    skiprows : int, optional
        Number of initial rows to skip in the source.
    out_path : str, optional
        Directory where data, validation mask and attributes are written.

    Returns
    -------
    dict
        {'data': ..., 'atts': ..., 'valid_mask': ...}; None on invalid input.
    """
    logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s',
                    level=logging.INFO,datefmt='%Y%m%d %H:%M:%S',filename=None)

    # 0. Validate input
    if not data_model and not data_model_path:
        logging.error('A valid data model name or path to data model must be provided')
        return
    if not isinstance(source,tuple(properties.supported_sources)):
        if not source:
            logging.error('Data source is empty (first argument to read()) ')
            return
        elif not os.path.isfile(source):
            logging.error('Could not open data source file {}'.format(source))
            logging.info('If input source was not a file: supported in-memory data sources are {}'.format(",".join([ str(x) for x in properties.supported_sources])))
            return
    # Optional arguments must carry the expected type; bail out on the first
    # failure, in the same order as the individual checks they replace.
    for check_name, check_value, check_type in (('sections', sections, list),
                                                ('chunksize', chunksize, int),
                                                ('skiprows', skiprows, int)):
        if not validate_arg(check_name, check_value, check_type):
            return

    # 1. Read data model
    # Schema reader will return None if schema does not validate
    logging.info("READING DATA MODEL SCHEMA FILE...")
    schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
    if not schema:
        return
    model_path = os.path.join(schema_lib,data_model) if data_model else data_model_path
    code_tables_path = os.path.join(model_path,'code_tables')

    # For future use: some work already done in schema reading
    if schema['header'].get('multiple_reports_per_line'):
        logging.error('File format not yet supported')
        sys.exit(1)

    # 2. Read and validate data
    if data_model:
        imodel = data_model
    else:
        imodel = data_model_path
    logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))

    # 2.1. Subset data model sections to requested sections
    parsing_order = schema['header'].get('parsing_order')
    if sections:
        read_sections_list = sections
    else:
        # parsing_order is a list of single-key dicts whose values are lists
        # of section names: gather the lists, then flatten them.
        section_groups = [ block.get(key) for block in parsing_order for key in block ]
        read_sections_list = [ name for group in section_groups for name in group ]

    # 2.2 Homogeneize input data to an iterable with dataframes:
    # a list with a single dataframe or a pd.io.parsers.TextFileReader
    logging.info("Getting data string from source...")
    text_parser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows)

    # 2.3. Extract, read and validate data in same loop
    logging.info("Extracting and reading sections")
    data,valid = ERV(text_parser,read_sections_list, schema, code_tables_path)

    # 3. Create out data attributes
    logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL")
    if isinstance(data,pd.DataFrame):
        data_columns = list(data)
    else:
        data_columns = data.orig_options['names']
    out_atts = schemas.df_schema(data_columns, schema)

    # 4. Output to files if requested
    if out_path:
        # Wrap single dataframes in a list so one loop handles both cases
        was_wrapped = not isinstance(data,pd.io.parsers.TextFileReader)
        if was_wrapped:
            data = [data]
            valid = [valid]
        logging.info('WRITING DATA TO FILES IN: {}'.format(out_path))

        for i, (data_df,valid_df) in enumerate(zip(data,valid)):
            if i == 0:
                # First chunk: write mode, build the header (flatten tuple
                # column labels to "section:element") and the atts mapping.
                mode = 'w'
                cols = list(data_df)
                if isinstance(cols[0],tuple):
                    header = [":".join(col) for col in cols]
                    out_atts_json = { ":".join(key):out_atts.get(key) for key in out_atts.keys() }
                else:
                    header = cols
                    out_atts_json = out_atts
            else:
                mode = 'a'
                header = False
            data_df.to_csv(os.path.join(out_path,'data.csv'), header = header, mode = mode, encoding = 'utf-8',index = True, index_label='index')
            valid_df.to_csv(os.path.join(out_path,'valid_mask.csv'), header = header, mode = mode, encoding = 'utf-8',index = True, index_label='index')
        if was_wrapped:
            data = data[0]
            valid = valid[0]
        else:
            # The TextFileReaders were consumed by the loop: restore them
            data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
            valid = pandas_TextParser_hdlr.restore(valid.f,valid.orig_options)
        with open(os.path.join(out_path,'atts.json'),'w') as fh:
            json.dump(out_atts_json,fh,indent=4)

    # 5. Return data
    return {'data':data,'atts':out_atts,'valid_mask':valid}

if __name__=='__main__':
    # Command-line entry point: first argument is the data source, the rest
    # are key=value pairs forwarded to read(). A 'sections' value is split
    # on commas into a list of section names.
    kwargs = dict(arg.split('=') for arg in sys.argv[2:])
    if 'sections' in kwargs:
        kwargs['sections'] = [ name.strip() for name in kwargs['sections'].split(",") ]
    read(sys.argv[1], **kwargs) # kwargs