#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019

Reads source data from a data model with fixed width fields to a pandas
DataFrame.

Uses the data model layout to first find sections in the data and internally
store the data in sections, then reads in, decodes and converts the elements on
a section by section basis and finally merges that together in the output
dataframe.
 
Internally works assuming highest complexity in the input data model: 
multiple non sequential sections

@author: iregon
"""

from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
# CAREFUL HERE:
# Note that in Python 3, the io.open function is an alias for the built-in open function.
# The built-in open function only supports the encoding argument in Python 3, not in Python 2.
# https://docs.python.org/3.4/library/io.html?highlight=io
from io import StringIO

import sys
import pandas as pd
from copy import deepcopy
import logging
import csv # To disable quoting
import mdf_reader.properties as properties
import mdf_reader.common.functions as functions
from mdf_reader.common.converters import converters
from mdf_reader.common.decoders import decoders

if sys.version_info[0] >= 3:
    py3 = True
else:
    py3 = False
    from io import BytesIO
    

FULL_WIDTH = 100000

#   ---------------------------------------------------------------------------
#   FUNCTIONS TO PERFORM INITIAL SEPARATION OF SECTIONS: MAIN IS GET_SECTIONS()
#   ---------------------------------------------------------------------------
def extract_data():
    section_len = section_lens.get(threads[thread_id]['section'])
    if section_len:
        threads[thread_id]['data'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[0:section_len]) # object consistency needed here
        threads[thread_id]['modulo'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[section_len:]) # object consistency needed here
    else:
        threads[thread_id]['data'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[0:])
        # Could treat this like the section_len case (None as section_len reads to the end)
        threads[thread_id]['modulo'] = pd.DataFrame(columns = [0]) # Just for consistency
    del threads[thread_id]['parent_data']
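
# Illustrative sketch of the split performed above (hypothetical data): with
# section_len = 4, a parent report '0001AAAA' is split into data = '0001' and
# modulo = 'AAAA', e.g.:
#
#   parent = pd.DataFrame({0: ['0001AAAA']})
#   data = pd.DataFrame(parent[0].str[0:4])    # column 0: '0001'
#   modulo = pd.DataFrame(parent[0].str[4:])   # column 0: 'AAAA'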

def add_next_children():
    global children_parsing_order, branch_parsing_order, children_group_type, children_group_number
    children_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
    branch_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
    children_group_type = list(children_parsing_order[0])[0]
    children_group_number = threads[thread_id]['children_group_number']
    threads[thread_id]['children_no'] = 0
    threads[thread_id]['children'] = []
    add_children()

def add_higher_group_children():
    global children_parsing_order, branch_parsing_order, children_group_type, children_group_number
    children_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
    children_parsing_order.pop(0) # Move to next group of sections
    if len(children_parsing_order) > 0:
        branch_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
        branch_parsing_order.pop(0)
        children_group_type = list(children_parsing_order[0])[0]
        children_group_number = threads[thread_id]['children_group_number'] + 1
        add_children()

def add_children():
    if children_group_type == 's':
        add_static_children()
    else:
        add_dynamic_children()

def add_static_children():
    threads[thread_id]['children_no'] += 1
    children_thread_id = str(children_group_number) + str(0) + thread_id
    threads[thread_id]['children'].append(children_thread_id)
    # Now build children's thread
    children_section = children_parsing_order[0][children_group_type].pop(0)
    grandchildren_group_number = children_group_number
    if len(children_parsing_order[0][children_group_type]) == 0:
        children_parsing_order.pop(0)
        if len(children_parsing_order) > 0:
            grandchildren_group_number += 1
        else:
            grandchildren_group_number = None
    threads[children_thread_id] = {'parsing_order':children_parsing_order}
    threads[children_thread_id]['group_number'] = children_group_number
    threads[children_thread_id]['group_type'] = children_group_type
    threads[children_thread_id]['section'] = children_section
    threads[children_thread_id]['parent_data'] = threads[thread_id]['modulo']
    threads[thread_id]['modulo'] = threads[thread_id]['modulo'].iloc[0:0] # Remove reports from modulo
    threads[children_thread_id]['children_group_number'] = grandchildren_group_number

def add_dynamic_children():
    for i in range(0,len(children_parsing_order[0][children_group_type])):
        branch_i_parsing_order = deepcopy(branch_parsing_order)
        children_thread_id = str(children_group_number) + str(i+1) + thread_id
        # Now build children's thread
        children_section = children_parsing_order[0][children_group_type].pop(0)
        children_idx = threads[thread_id]['modulo'].loc[threads[thread_id]['modulo'][0].str[0:sentinals_lens.get(children_section)] == sentinals.get(children_section)].index
        if len(children_idx) == 0:
            continue
        threads[thread_id]['children'].append(children_thread_id)
        threads[thread_id]['children_no'] += 1
        branch_i_parsing_order[0][children_group_type].remove(children_section)
        grandchildren_group_number = children_group_number
        if len(branch_i_parsing_order[0][children_group_type]) == 0 or children_group_type == 'e':
            branch_i_parsing_order.pop(0)
            if len(children_parsing_order) > 0:
                grandchildren_group_number += 1
            else:
                grandchildren_group_number = None
        threads[children_thread_id] = {'parsing_order':branch_i_parsing_order}
        threads[children_thread_id]['group_number'] = children_group_number
        threads[children_thread_id]['group_type'] = children_group_type
        threads[children_thread_id]['section'] = children_section
        threads[children_thread_id]['parent_data'] = threads[thread_id]['modulo'].loc[children_idx]
        threads[thread_id]['modulo'].drop(children_idx,inplace = True)
        threads[children_thread_id]['children_group_number'] = grandchildren_group_number
    if (len(threads[thread_id]['modulo'])) > 0:
        add_higher_group_children()
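
# Illustrative sketch of the thread id scheme used above (hypothetical
# sections): ids are built as str(group_number) + str(child_index) + parent_id,
# so with parsing_order = [{'s': ['core']}, {'o': ['c1', 'c5']}] the root
# thread '00' gets the static child '1000' (core, index 0), whose optional
# children become '211000' (c1) and '221000' (c5).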

def get_sections(df_in):
    # threads elements:
    #    'parsing_order'            What needs to be applied to current parent data
    #    'group_number'             Order in the global parsing order
    #    'group_type'               Is it sequential, exclusive or optional
    #    'section'                  Section name to be extracted from parent_data to data
    #    'parent_data'              Initial data from which section must be extracted
    #    'data'                     Section data extracted from parent_data
    #    'modulo'                   Remainder of parent_data after extracting section (data)
    #    'children_no'              Number of children threads to build, based on next parsing order list element. Resets to number of active children
    #    'children'                 Thread id for every child
    #    'children_group_number'    Group number (in the global parsing order) of the children
    global sentinals, section_lens, sentinals_lens, parsing_order
    global children_group_type
    global threads
    global thread_id
    global group_type
    
    # Initial "node': input data
    threads = dict()
    thread_id = '00'
    threads_queue = [thread_id]
    threads[thread_id] = {'parsing_order':parsing_order}
    threads[thread_id]['group_number'] = 0
    threads[thread_id]['group_type'] = None
    threads[thread_id]['section'] = None
    threads[thread_id]['parent_data'] = df_in
    threads[thread_id]['data'] = None
    threads[thread_id]['modulo'] = threads[thread_id]['parent_data']
    del threads[thread_id]['parent_data']
    threads[thread_id]['children_group_number'] = 1
    add_next_children()
    threads_queue.extend(threads[thread_id]['children'])
    threads_queue.remove(thread_id)
    # And now, once initialized, let it grow:
    logging.info('Processing section partitioning threads')
    while threads_queue:
        thread_id = threads_queue[0]
        logging.info('{} ...'.format(thread_id))
        group_type = threads[thread_id]['group_type']
        # get section data
        extract_data()
        # kill thread if nothing there
        if len(threads[thread_id]['data']) == 0:
            del threads[thread_id]
            logging.info('{} deleted: no data'.format(thread_id))
            threads_queue.pop(0)
            continue
        # build children threads
        if len(threads[thread_id]['parsing_order']) > 0  and len(threads[thread_id]['modulo']) > 0:
            add_next_children()
            threads_queue.extend(threads[thread_id]['children'])
            #del threads[thread_id]['modulo'] # not until we control what to do with leftovers...
        threads_queue.pop(0)
        logging.info('done')
    section_dict = dict()
    section_groups = [ d[x] for d in parsing_order for x in d.keys() ]
    sections = [item for sublist in section_groups for item in sublist]
    for section in sections:
        section_dict[section] = pd.DataFrame() # Empty frame: indices come from the thread data and help the final merging
        thread_ids = [ x for x in threads.keys() if threads[x]['section'] == section ]
        for thread_id in thread_ids:
            section_dict[section] = pd.concat([section_dict[section], threads[thread_id]['data']], sort=True)
        section_dict[section].sort_index(inplace=True)
    return section_dict
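
# Illustrative get_sections() output (hypothetical schema): for reports like
# '0001AAAA' with a 4-character section 'sec1' followed by section 'sec2', it
# returns something like
#   {'sec1': DataFrame(column 0: '0001', ...), 'sec2': DataFrame(column 0: 'AAAA', ...)}
# with every section frame indexed as the input reports, so reports missing an
# optional section are simply absent from that section's frame.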

#   ---------------------------------------------------------------------------
#   FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS, AN ITERABLE:
#   EITHER A PD.IO.PARSERS.TEXTFILEREADER OR A LIST, DEPENDING ON 
#   SOURCE TYPE AND CHUNKSIZE ARGUMENT
#   BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
#   ---------------------------------------------------------------------------
def source_11(source,schema, chunksize = None, skiprows = None, delimiter = None ):
    # 11: 1 REPORT PER RECORD IN ONE LINE
    # delimiter = '\t' so that blanks are read as blanks; otherwise they are read as empty: NaN
    # This applies mainly when reading elements from sections, but we keep it here as well
    if isinstance(source,pd.DataFrame):
        TextParser = source
        TextParser.columns = [0]
        TextParser = [TextParser]
    elif isinstance(source,pd.io.parsers.TextFileReader):
        TextParser = source
    else:
        TextParser = pd.read_fwf(source,widths=[FULL_WIDTH],header = None, skiprows = skiprows, delimiter="\t", chunksize = chunksize)
        if not chunksize:
            TextParser = [TextParser]
    return TextParser
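
# Illustrative sketch of the three accepted source types (hypothetical path):
#
#   TextParser = source_11(pd.DataFrame(['0001AAAA']), schema)     # [DataFrame]
#   TextParser = source_11('source.txt', schema)                   # [DataFrame]
#   TextParser = source_11('source.txt', schema, chunksize = 10)   # TextFileReader
#
# Each result is iterable chunk by chunk, with one full record per line in
# column 0.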

def source_1x(source,schema, chunksize = None, skiprows = None, delimiter = None  ):
    # 1X: MULTIPLE REPORTS PER RECORD IN ONE LINE
    return source_11(source,schema, chunksize = chunksize, skiprows = skiprows, delimiter = delimiter )

#   ---------------------------------------------------------------------------
#   MAIN FUNCTIONS
#   ---------------------------------------------------------------------------
def source_to_df(TextParser, schema, read_sections, idx_offset = 0):
    global sentinals, section_lens, sentinals_lens
    global parsing_order
    
    section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
    sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
    sentinals_lens = { section: schema['sections'][section]['header'].get('sentinal_length') for section in schema['sections'].keys()}
    parsing_order = schema['header']['parsing_order']
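    # Illustrative parsing_order (hypothetical sections): a list of single-key
    # dicts, each mapping a group type to its ordered sections, e.g.
    #   [{'s': ['core']}, {'o': ['c1', 'c5']}, {'e': ['c99']}]
    # with 's' sequential, 'o' optional and 'e' exclusive (see get_sections()).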
    chunk_len = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else 0
    multiindex = len(read_sections) > 1 or read_sections[0] != properties.dummy_level
    out_dtypes = dict()
    if multiindex:
        for section in read_sections:
            out_dtypes.update({ (section,i):properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } )
    else:
        for section in read_sections:
            out_dtypes.update({ i:properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } ) 
    I_CHUNK = 0
    output_buffer = StringIO() if py3 else BytesIO()
    valid_buffer = StringIO() if py3 else BytesIO()
    for df in TextParser: # Indices here are kept as those of the full record
        logging.info('Processing chunk {}'.format(I_CHUNK))
        # 1. Remove delimiter from mixed-type reports (like meds-buoys) if it exists
        if schema['header'].get('delimiter'):
            df.loc[:,0] = df[0].str.replace(',','')
        # 2. Separate sections
        logging.info('Accessing sections in chunk {}'.format(I_CHUNK))
        section_strings = get_sections(df)
        # 3. Read section elements
        # Look below: if names are passed to read_csv as tuples, it automatically creates a multiindex.
        # Initially I did not want this and wanted tuples, but they do not seem to be recommended:
        # https://github.com/pandas-dev/pandas/issues/11799
        # The mapping (like in CDM or others) would be a bit harder with a multiindex... or it will get the tuples equally fine
        # 3.0. Prepare df to paste all the sections of current chunk and get initially defined dtypes
        df_out = pd.DataFrame()
        valid_out = pd.DataFrame()
        # We have to concatenate the data, and also the names and properties, to write this to the buffer
        logging.info('Reading and converting section elements')
        # 3.1. Loop through sections
        for section in read_sections:
            logging.info('{} ...'.format(section))
            section_buffer = StringIO() if py3 else BytesIO()
            # Writing options from quoting on to prevent supp buoy data from being quoted:
            # maybe this happened because buoy data has commas, and pandas makes its own decision about
            # how to write that...
            # https://stackoverflow.com/questions/21147058/pandas-to-csv-output-quoting-issue
            section_strings[section].to_csv(section_buffer,header=False, encoding = 'utf-8',index = False,quoting=csv.QUOTE_NONE,escapechar="\\",sep="\t") # Indices are lost here; the real ones are those in section_strings
            section_buffer.seek(0)
            # 3.1.1. Read section elements from schema and read from buffer with pandas as objects
            section_names = list(schema['sections'][section]['elements'].keys())
            ignore = [ i for i in section_names if schema['sections'][section]['elements'][i].get('ignore') ] # evals to True if set and true, evals to False if not set or set and false
            section_widths = list(map(lambda x: x if x else FULL_WIDTH, [ schema['sections'][section]['elements'][i].get('field_length') for i in section_names ]))
            section_dtypes = { i:'object' for i in section_names }
            section_missing = { i:schema['sections'][section]['elements'][i].get('missing_value') if schema['sections'][section]['elements'][i].get('disable_white_strip') == True
                               else [schema['sections'][section]['elements'][i].get('missing_value')," "*schema['sections'][section]['elements'][i].get('field_length', FULL_WIDTH)]
                               for i in section_names }
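            # Illustrative section_missing entry (hypothetical element): with
            # missing_value = 'MSNG' and field_length = 4 it is
            #   {'elem': ['MSNG', '    ']}
            # i.e. all-blank fields also count as missing unless
            # disable_white_strip is True for that element.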
            section_elements = pd.read_fwf(section_buffer, widths = section_widths, header = None, names = section_names , na_values = section_missing, delimiter="\t", encoding = 'utf-8', dtype = section_dtypes, skip_blank_lines = False )
            section_valid = pd.DataFrame(index = section_elements.index, columns = section_elements.columns)
            # 3.1.2. Decode, scale|offset and convert to dtype (ints will be floats if NaNs)
            section_dtypes = { i:schema['sections'][section]['elements'][i]['column_type'] for i in section_names }
            encoded = [ (x) for x in section_names if 'encoding' in schema['sections'][section]['elements'][x]]
            section_encoding = { i:schema['sections'][section]['elements'][i]['encoding'] for i in encoded }

            for element in section_dtypes.keys():
                missing = section_elements[element].isna()
                if element in encoded:
                    section_elements[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_elements[element])
                  
                kwargs = { converter_arg:schema['sections'][section]['elements'][element].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(section_dtypes.get(element))  }
                section_elements[element] = converters.get(section_dtypes.get(element))(section_elements[element], **kwargs)
                
                section_valid[element] = missing | section_elements[element].notna()
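
            # Illustrative sketch of the dispatch above (hypothetical element):
            # an element with encoding 'base36' and column_type 'int16' is first
            # decoded with decoders['base36']['int16'] and then converted with
            # converters['int16'](series, **kwargs), where kwargs collects the
            # schema keys listed in properties.data_type_conversion_args['int16']
            # (scale/offset style arguments).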
                
            # 3.1.3. Format section:
            #   - Put data in its rightful place in the original data (indexing!) and remove section elements not desired
            #   - Name columns: tuples (section, element_name) for multisection, element_name if one section
            #   tuples will turn into a multiindex in the df_reader below
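            # Worked index example (hypothetical numbers): with idx_offset = 0,
            # chunk_len = 10000 and I_CHUNK = 2, a chunk with len(df) = 10000
            # spans the global index range(20000, 30000); section rows are
            # placed at their original indices within that range and reports
            # lacking this section stay as all-NaN rows.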
            section_elements.index = section_strings[section].index
            section_valid.index = section_strings[section].index
            section_frame = pd.DataFrame(data = { x:pd.Series(index=range(idx_offset + I_CHUNK*chunk_len, idx_offset + len(df) + I_CHUNK*chunk_len )) for x in section_names})
            valid_frame = section_frame.copy()
            if len(section_elements) > 0:
                section_frame.loc[section_elements.index] = section_elements
                valid_frame.loc[section_elements.index] = section_valid
            section_frame.drop(ignore, axis = 1, inplace = True)
            valid_frame.drop(ignore, axis = 1, inplace = True)
            section_frame.columns = [ (section, x) for x in section_frame.columns] if multiindex else section_frame.columns
            valid_frame.columns = section_frame.columns
            out_dtypes.update({ i:section_frame[i].dtype.name for i in section_frame if section_frame[i].dtype.name in properties.numpy_floats})
            # 3.1.4. Paste section to rest of chunk
            df_out = pd.concat([df_out,section_frame],sort = False,axis=1)
            valid_out = pd.concat([valid_out,valid_frame],sort = False,axis=1)
        # 4.3. Add _datetime section: watch this if we mean to do multiple reports per record!!!
        # For this to be valid, we would have to assume same-day reports with the date in a common report section...
        # If datetime is derived from within the actual report, then we would have to add _datetime after expansion in the dataframe postprocessing
        if schema['header'].get('date_parser'):
            date_parser = schema['header'].get('date_parser')
            date_name = ('_datetime','_datetime') if multiindex else '_datetime'
            date_elements = [(date_parser['section'],x) for x in date_parser['elements'] ] if date_parser.get('section') else date_parser.get('elements')
            out_dtypes.update({ date_name: 'object' })
            df_out = functions.df_prepend_datetime(df_out, date_elements, date_parser['format'], date_name = date_name )
            valid_out = pd.concat([pd.DataFrame(index = valid_out.index, data = True,columns = [date_name]),valid_out],sort = False,axis=1)
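        # Illustrative date_parser entry (hypothetical schema snippet):
        #   {'section': 'core', 'elements': ['YR', 'MO', 'DY', 'HR'], 'format': '%Y %m %d %H'}
        # would prepend a single '_datetime' column built from those elements.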
        # 4.4. Add chunk data to output
        header = False # No header line is written: column names travel with the returned out_dtypes
        df_out.to_csv(output_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
        valid_out.to_csv(valid_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
        I_CHUNK += 1

    return output_buffer,valid_buffer,out_dtypes
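
# Illustrative downstream consumption sketch (hypothetical, not part of this
# module): the returned buffers can be rewound and read back using the returned
# dtypes, e.g.:
#
#   output_buffer, valid_buffer, out_dtypes = source_to_df(TextParser, schema, read_sections)
#   output_buffer.seek(0)
#   data = pd.read_csv(output_buffer, names = list(out_dtypes.keys()), dtype = out_dtypes)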