#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019

Reads source data (file, pandas DataFrame or pd.io.parsers.TextFileReader) to
a pandas DataFrame. The source data model needs to be input to the module as
a named model (included in the module) or as the path to a valid data model.

Data is validated against its data model after reading, producing a boolean mask.

Uses submodules:
- schemas
- reader
- validate

@author: iregon
"""
import os
import sys
import pandas as pd
import logging
import json
import copy
from io import StringIO

from . import schemas
from . import properties
from .common import pandas_TextParser_hdlr
from .reader import import_data
from .reader import get_sections
from .reader import read_sections
from .validate import validate

# Directory of this module; the named data-model schemas shipped with the
# package live in schemas/lib below it.
toolPath = os.path.dirname(os.path.abspath(__file__))
schema_lib = os.path.join(toolPath, 'schemas', 'lib')

def ERV(TextParser, read_sections_list, schema, code_tables_path):
    """Extract, Read and Validate the data served by TextParser.

    Iterates over the chunks of TextParser, splitting each chunk into the
    requested sections, reading the elements in every section and validating
    them against the schema and its code tables. Per-chunk results are
    accumulated in in-memory CSV buffers and wrapped back into pandas objects.

    Parameters
    ----------
    TextParser : iterable of pd.DataFrame (list or pd.io.parsers.TextFileReader)
        Source data, one string-typed DataFrame per chunk.
    read_sections_list : list
        Names of the sections to extract, in output column order.
    schema : dict
        Data model schema as returned by schemas.read_schema().
    code_tables_path : str
        Path to the data model's code tables.

    Returns
    -------
    data, valid : pd.DataFrame or pd.io.parsers.TextFileReader
        Decoded data and its boolean validation mask; chunked readers when
        TextParser itself was chunked.
    """
    data_buffer = StringIO()
    valid_buffer = StringIO()

    for i_chunk, string_df in enumerate(TextParser):
        # a. Get a DF with sections separated in columns:
        # - one section per column
        # - only sections requested, ignore rest
        # - requested NA sections as NaN columns
        # - columns order as in read_sections_list
        sections_df = get_sections.get_sections(string_df, schema, read_sections_list)

        # b. Read elements from sections: along data chunks, resulting data
        # types may vary if gaps, keep track of data types!
        # Sections are parsed in the same order as sections_df.columns
        [data_df, valid_df, out_dtypesi] = read_sections.read_sections(sections_df, schema)
        if i_chunk == 0:
            out_dtypes = copy.deepcopy(out_dtypesi)

        for k in out_dtypesi:
            # If this chunk parsed column k as a float (e.g. an integer
            # column with gaps is promoted to float by pandas), record the
            # wider dtype so the final read_csv can hold every chunk.
            # Bug fix: the original tested the dict object itself
            # ("out_dtypesi in ...") instead of the column's dtype, so the
            # promotion never happened.
            if out_dtypesi.get(k) in properties.numpy_floats:
                out_dtypes.update({k: out_dtypesi.get(k)})

        valid_df = validate.validate(data_df, valid_df, schema, code_tables_path)
        # Save to buffer
        data_df.to_csv(data_buffer, header=False, mode='a', encoding='utf-8', index=False)
        valid_df.to_csv(valid_buffer, header=False, mode='a', encoding='utf-8', index=False)

    # Create the output
    # WE'LL NEED TO POSPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
    data_buffer.seek(0)
    valid_buffer.seek(0)
    logging.info("Wrapping output....")
    # Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
    # (source is either pd.io.parsers.TextFileReader or a file with chunksize specified on input):
    # This way it supports direct chunksize property inheritance if the input
    # source was a pd.io.parsers.TextFileReader
    chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser, pd.io.parsers.TextFileReader) else None

    # 'datetime' is not a valid pandas dtype: only on output (on reading) will
    # it be converted (via parse_dates) to datetime64[ns]; specifying
    # 'datetime' in the dtype argument would fail.
    date_columns = []  # Needs the numeric index of the column: parse_dates cannot work with tuples
    for i, element in enumerate(list(out_dtypes)):
        if out_dtypes.get(element) == 'datetime':
            date_columns.append(i)

    data = pd.read_csv(data_buffer, names=data_df.columns, chunksize=chunksize, dtype=out_dtypes, parse_dates=date_columns)
    valid = pd.read_csv(valid_buffer, names=data_df.columns, chunksize=chunksize)

    return data, valid

def validate_arg(arg_name, arg_value, arg_type):
    """Validate the type of an optional input argument.

    A value of None means "argument not provided" and is always valid.
    Any other value must be an instance of arg_type; otherwise an error is
    logged and False is returned.

    Bug fix: the original used truthiness ("if arg_value and ...") so falsy
    values of the wrong type (e.g. an empty string passed as 'sections')
    silently passed validation.

    Parameters
    ----------
    arg_name : str
        Argument name, used in the error message only.
    arg_value : object
        Value to check (None when not provided).
    arg_type : type
        Expected type of arg_value.

    Returns
    -------
    bool
        True if valid (None or right type), False otherwise.
    """
    if arg_value is not None and not isinstance(arg_value, arg_type):
        logging.error('Argument {0} must be {1}, input type is {2}'.format(arg_name, arg_type, type(arg_value)))
        return False
    return True

def validate_path(arg_name, arg_value):
    """Check that an optional path argument points to an existing directory.

    A falsy value (None, empty string) means "not provided" and is accepted.
    Otherwise the path must be an existing directory; if it is not, an error
    is logged and False is returned.
    """
    if not arg_value:
        return True
    if os.path.isdir(arg_value):
        return True
    logging.error('{0} could not find path {1}'.format(arg_name, arg_value))
    return False

def read(source, data_model=None, data_model_path=None, sections=None, chunksize=None,
         skiprows=None, out_path=None):
    """Read and validate source data against a data model.

    Parameters
    ----------
    source : str, pd.DataFrame or pd.io.parsers.TextFileReader
        Data source: path to a file, or one of the supported in-memory
        sources listed in properties.supported_sources.
    data_model : str, optional
        Name of a data model included in the module (under schemas/lib).
        Either data_model or data_model_path must be given.
    data_model_path : str, optional
        Path to an external, valid data model.
    sections : list, optional
        Subset of data model sections to read; defaults to every section in
        the schema's parsing_order.
    chunksize : int, optional
        Number of reports per chunk when reading from a file.
    skiprows : int, optional
        Number of initial rows to skip from the source.
    out_path : str, optional
        Existing directory where data.csv, valid_mask.csv and atts.json are
        written, if provided.

    Returns
    -------
    dict or None
        {'data': ..., 'atts': ..., 'valid_mask': ...} on success; None (with
        an error logged) when any input is invalid or the schema cannot be
        read.
    """
    logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s',
                    level=logging.INFO, datefmt='%Y%m%d %H:%M:%S', filename=None)

    # 0. Validate input
    if not data_model and not data_model_path:
        logging.error('A valid data model name or path to data model must be provided')
        return
    if not isinstance(source, tuple(properties.supported_sources)):
        if not source:
            logging.error('Data source is empty (first argument to read()) ')
            return
        elif not os.path.isfile(source):
            logging.error('Could not open data source file {}'.format(source))
            logging.info('If input source was not a file: supported in-memory data sources are {}'.format(",".join([str(x) for x in properties.supported_sources])))
            return
    if not validate_arg('sections', sections, list):
        return
    if not validate_arg('chunksize', chunksize, int):
        return
    if not validate_arg('skiprows', skiprows, int):
        return
    if not validate_path('data_model_path', data_model_path):
        return
    if not validate_path('out_path', out_path):
        return

    # 1. Read data model
    # Schema reader will return empty if cannot read schema or is not valid
    # and will log the corresponding error
    # multiple_reports_per_line error also while reading schema
    logging.info("READING DATA MODEL SCHEMA FILE...")
    schema = schemas.read_schema(schema_name=data_model, ext_schema_path=data_model_path)
    if not schema:
        return
    # Code tables live next to the schema: in the library for named models,
    # in the external path otherwise.
    if data_model:
        model_path = os.path.join(schema_lib, data_model)
    else:
        model_path = data_model_path
    code_tables_path = os.path.join(model_path, 'code_tables')

    # 2. Read and validate data
    imodel = data_model if data_model else data_model_path
    logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))

    # 2.1. Subset data model sections to requested sections
    parsing_order = schema['header'].get('parsing_order')
    if not sections:
        # Flatten the parsing_order (a list of single-key dicts of section
        # lists) into a plain list of section names.
        sections = [x.get(y) for x in parsing_order for y in x]
        read_sections_list = [y for x in sections for y in x]
    else:
        read_sections_list = sections

    # 2.2 Homogeneize input data to an iterable with dataframes:
    # a list with a single dataframe or a pd.io.parsers.TextFileReader
    logging.info("Getting data string from source...")
    TextParser = import_data.import_data(source, chunksize=chunksize, skiprows=skiprows)

    # 2.3. Extract, read and validate data in same loop
    logging.info("Extracting and reading sections")
    data, valid = ERV(TextParser, read_sections_list, schema, code_tables_path)

    # 3. Create out data attributes
    logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL")
    data_columns = [x for x in data] if isinstance(data, pd.DataFrame) else data.orig_options['names']
    out_atts = schemas.df_schema(data_columns, schema)

    # 4. Output to files if requested
    if out_path:
        # Wrap a single DataFrame in a list so the writing loop below works
        # for both chunked and unchunked data; remember to unwrap afterwards.
        enlisted = False
        if not isinstance(data, pd.io.parsers.TextFileReader):
            data = [data]
            valid = [valid]
            enlisted = True
        logging.info('WRITING DATA TO FILES IN: {}'.format(out_path))

        for i, (data_df, valid_df) in enumerate(zip(data, valid)):
            # First chunk overwrites and writes the header; the rest append.
            header = False
            mode = 'a'
            if i == 0:
                mode = 'w'
                cols = [x for x in data_df]
                if isinstance(cols[0], tuple):
                    # Multi-section columns are (section, element) tuples:
                    # flatten to "section:element" for CSV/JSON output.
                    header = [":".join(x) for x in cols]
                    out_atts_json = {":".join(x): out_atts.get(x) for x in out_atts.keys()}
                else:
                    header = cols
                    out_atts_json = out_atts
            data_df.to_csv(os.path.join(out_path, 'data.csv'), header=header, mode=mode, encoding='utf-8', index=True, index_label='index')
            valid_df.to_csv(os.path.join(out_path, 'valid_mask.csv'), header=header, mode=mode, encoding='utf-8', index=True, index_label='index')
        if enlisted:
            data = data[0]
            valid = valid[0]
        else:
            # Writing exhausted the TextFileReaders: rebuild them so the
            # caller gets readable parsers back.
            data = pandas_TextParser_hdlr.restore(data.f, data.orig_options)
            valid = pandas_TextParser_hdlr.restore(valid.f, valid.orig_options)
        with open(os.path.join(out_path, 'atts.json'), 'w') as fileObj:
            json.dump(out_atts_json, fileObj, indent=4)

    # 5. Return data
    return {'data': data, 'atts': out_atts, 'valid_mask': valid}

if __name__ == '__main__':
    # CLI usage: read.py <source> [key=value ...]
    # 'sections' may be given as a comma-separated list of section names.
    kwargs = dict(arg.split('=') for arg in sys.argv[2:])
    if 'sections' in kwargs:
        kwargs.update({'sections': [x.strip() for x in kwargs.get('sections').split(",")]})
    read(sys.argv[1], **kwargs)