Commit 4cf39a3d authored by iregon

Removed all submodules

parent 3647ead3
from . import fixed_width
from . import delimited
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019
Reads source data from a data model with delimited fields to a pandas
DataFrame.
Assumes source data as data model layout and all sections and elements in data.
Reads in full data content, then decodes and converts the elements.
Internally works assuming highest complexity in the input data model:
multiple sequential sections
@author: iregon
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
# CAREFUL HERE:
# Note that in Python 3, io.open is an alias for the built-in open function.
# The built-in open function only supports the encoding argument in Python 3,
# not in Python 2.
# https://docs.python.org/3.4/library/io.html?highlight=io
import io
from io import open # To allow for encoding definition
from io import StringIO
import sys
import pandas as pd
import numpy as np
from copy import deepcopy
import logging
import csv # To disable quoting
import mdf_reader.common.functions as functions
from mdf_reader.common.converters import converters
from mdf_reader.common.decoders import decoders
import mdf_reader.properties as properties
py3 = sys.version_info[0] >= 3
if not py3:
    from io import BytesIO
# ---------------------------------------------------------------------------
# MAIN FUNCTIONS
# ---------------------------------------------------------------------------
def source_to_df(source, schema, read_sections, idx_offset = 0):
column_names = []
for section in schema['sections']:
column_names.extend([ (section,i) for i in schema['sections'][section]['elements'] ])
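    # e.g. for hypothetical sections 'core' and 'c1', column_names holds tuples
    # like [('core', 'YR'), ('core', 'MO'), ('c1', 'QI')], which become the
    # column MultiIndex of each chunk below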
    multiindex = len(read_sections) > 1 or read_sections[0] != properties.dummy_level
    out_dtypes = dict()
    for section in read_sections:
        out_dtypes.update({ (section,i):properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() })
I_CHUNK = 0
output_buffer = StringIO() if py3 else BytesIO()
valid_buffer = StringIO() if py3 else BytesIO()
    for df in source: # Indices here are kept as those of the full record
logging.info('Processing chunk {}'.format(I_CHUNK))
# 0. Name columns
df.columns = pd.MultiIndex.from_tuples(column_names)
# 1. Remove sections not requested
drop_sections = [ x for x in schema['sections'] if x not in read_sections ]
df.drop(drop_sections, axis = 1, level = 0, inplace = True)
# 2. Decode, scale|offset and convert to dtype (ints will be floats if NaNs)
dtypes = dict()
encodings = dict()
valid = pd.DataFrame(index = df.index, columns = df.columns)
for section in read_sections:
dtypes.update({ (section,i):schema['sections'][section]['elements'][i]['column_type'] for i in schema['sections'][section]['elements'] })
            encoded = [ x for x in schema['sections'][section]['elements'] if 'encoding' in schema['sections'][section]['elements'][x] ]
encodings.update({ (section,i):schema['sections'][section]['elements'][i]['encoding'] for i in encoded })
for element in dtypes.keys():
missing = df[element].isna()
            if element in encodings:
df[element] = decoders.get(encodings.get(element)).get(dtypes.get(element))(df[element])
kwargs = { converter_arg:schema['sections'][element[0]]['elements'][element[1]].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(dtypes.get(element)) }
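            # e.g. for a hypothetical element with column_type 'float32' whose
            # conversion args in properties.data_type_conversion_args are
            # ['scale', 'offset'], kwargs becomes {'scale': ..., 'offset': ...}
            # with the values taken from the schema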
df[element] = converters.get(dtypes.get(element))(df[element], **kwargs)
valid[element] = missing | df[element].notna()
        # Add the _datetime section: watch this if we mean to handle multiple reports per record!
        # For this to be valid, we would have to assume same-day reports and the date in a common report section...
        # If the datetime is derived from within the actual report, then we would have to add _datetime after expansion, on dataframe postprocessing
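        # Hypothetical example of the schema header entry driving this step:
        # 'date_parser': { 'section': 'core', 'elements': ['YR', 'MO', 'DY'], 'format': '%Y %m %d' }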
if schema['header'].get('date_parser'):
date_parser = schema['header'].get('date_parser')
date_name = ('_datetime','_datetime') if multiindex else '_datetime'
date_elements = [(date_parser['section'],x) for x in date_parser['elements'] ] if date_parser.get('section') else date_parser.get('elements')
out_dtypes.update({ date_name: 'object' })
df = functions.df_prepend_datetime(df, date_elements, date_parser['format'], date_name = date_name )
valid = pd.concat([pd.DataFrame(index = valid.index, data = True,columns = [date_name]),valid],sort = False,axis=1)
out_dtypes.update({ i:df[i].dtype.name for i in df if df[i].dtype.name in properties.numpy_floats})
if idx_offset > 0:
df.index = df.index + idx_offset
        # If we got into the section: does this really remove only that named element from that section? Have to check:
        #element[section].drop([element],axis=1,level=1,inplace = True)
        # Drop level 0 in multilevel if len(read_sections)==1 or section is dummy
# 3. Add chunk data to output
        header = False # no header rows in the buffer: column names are carried separately by out_dtypes
df.to_csv(output_buffer,header = header, mode = 'a', encoding = 'utf-8',index = False)
valid.to_csv(valid_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
I_CHUNK += 1
return output_buffer, valid_buffer, out_dtypes
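# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the reader itself. The toy schema, the
# section and element names and the 'str' column_type are assumptions: they
# require 'str' to be registered in mdf_reader.properties (pandas_dtypes and
# data_type_conversion_args) and in the converters dictionary.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    toy_schema = { 'header': {},
                   'sections': { 'core': { 'elements': { 'ID': { 'column_type': 'str' } } } } }
    # One delimited column per element, read as object dtype in chunks
    toy_source = pd.read_csv(StringIO('AAA\nBBB\n'), header = None, dtype = 'object', chunksize = 1)
    data_buffer, valid_buffer, dtypes = source_to_df(toy_source, toy_schema, ['core'])
    print(data_buffer.getvalue())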
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019
Reads source data from a data model with fixed width fields to a pandas
DataFrame.
Uses the data model layout to first find sections in the data and internally
store the data in sections, then reads in, decodes and converts the elements on
a section by section basis and finally merges that together in the output
dataframe.
Internally works assuming highest complexity in the input data model:
multiple non sequential sections
@author: iregon
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
# CAREFUL HERE:
# Note that in Python 3, io.open is an alias for the built-in open function.
# The built-in open function only supports the encoding argument in Python 3,
# not in Python 2.
# https://docs.python.org/3.4/library/io.html?highlight=io
from io import StringIO
import sys
import pandas as pd
from copy import deepcopy
import logging
import csv # To disable quoting
import mdf_reader.properties as properties
import mdf_reader.common.functions as functions
from mdf_reader.common.converters import converters
from mdf_reader.common.decoders import decoders
py3 = sys.version_info[0] >= 3
if not py3:
    from io import BytesIO
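# Fallback field width for elements with no field_length defined: effectively reads to the end of the line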
FULL_WIDTH = 100000
# ---------------------------------------------------------------------------
# FUNCTIONS TO PERFORM INITIAL SEPARATION OF SECTIONS: MAIN IS GET_SECTIONS()
# ---------------------------------------------------------------------------
def extract_data():
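    # Splits each report string at the section's declared length, e.g. with
    # section_len = 4, '1234ABCD' goes to 'data' as '1234' and to 'modulo' as 'ABCD'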
section_len = section_lens.get(threads[thread_id]['section'])
if section_len:
threads[thread_id]['data'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[0:section_len]) # object consistency needed here
threads[thread_id]['modulo'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[section_len:]) # object consistency needed here
else:
        threads[thread_id]['data'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[0:]) # equivalent: threads[thread_id]['parent_data'].copy()
        # Could be handled as with section_len above: a None section_len slices to the end of the string
        threads[thread_id]['modulo'] = pd.DataFrame(columns = [0]) # Empty frame, just for consistency
del threads[thread_id]['parent_data']
def add_next_children():
global children_parsing_order, branch_parsing_order, children_group_type, children_group_number
children_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
branch_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
children_group_type = list(children_parsing_order[0])[0]
children_group_number = threads[thread_id]['children_group_number']
threads[thread_id]['children_no'] = 0
threads[thread_id]['children'] = []
add_children()
def add_higher_group_children():
global children_parsing_order, branch_parsing_order, children_group_type, children_group_number
children_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
children_parsing_order.pop(0) # Move to next group of sections
if len(children_parsing_order) > 0:
branch_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
branch_parsing_order.pop(0)
children_group_type = list(children_parsing_order[0])[0]
children_group_number = threads[thread_id]['children_group_number'] + 1
add_children()
def add_children():
if children_group_type == 's':
add_static_children()
else:
add_dynamic_children()
def add_static_children():
threads[thread_id]['children_no'] += 1
children_thread_id = str(children_group_number) + str(0) + thread_id
threads[thread_id]['children'].append(children_thread_id)
# Now build children's thread
children_section = children_parsing_order[0][children_group_type].pop(0)
grandchildren_group_number = children_group_number
if len(children_parsing_order[0][children_group_type]) == 0:
children_parsing_order.pop(0)
if len(children_parsing_order) > 0:
grandchildren_group_number += 1
else:
grandchildren_group_number = None
threads[children_thread_id] = {'parsing_order':children_parsing_order}
threads[children_thread_id]['group_number'] = children_group_number
threads[children_thread_id]['group_type'] = children_group_type
threads[children_thread_id]['section'] = children_section
threads[children_thread_id]['parent_data'] = threads[thread_id]['modulo']
    threads[thread_id]['modulo'] = threads[thread_id]['modulo'].iloc[0:0] # Remove reports from modulo
threads[children_thread_id]['children_group_number'] = grandchildren_group_number
def add_dynamic_children():
for i in range(0,len(children_parsing_order[0][children_group_type])):
branch_i_parsing_order = deepcopy(branch_parsing_order)
children_thread_id = str(children_group_number) + str(i+1) + thread_id
# Now build children's thread
children_section = children_parsing_order[0][children_group_type].pop(0)
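        # Records are routed to this child by sentinal match, e.g. if a hypothetical
        # section 'c1' declares sentinal '65' with sentinal_length 2, reports whose
        # remainder starts with '65' are assigned to this thread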
children_idx = threads[thread_id]['modulo'].loc[threads[thread_id]['modulo'][0].str[0:sentinals_lens.get(children_section)] == sentinals.get(children_section)].index
if len(children_idx) == 0:
continue
threads[thread_id]['children'].append(children_thread_id)
threads[thread_id]['children_no'] += 1
branch_i_parsing_order[0][children_group_type].remove(children_section)
grandchildren_group_number = children_group_number
if len(branch_i_parsing_order[0][children_group_type]) == 0 or children_group_type == 'e':
branch_i_parsing_order.pop(0)
            if len(branch_i_parsing_order) > 0:
grandchildren_group_number += 1
else:
grandchildren_group_number = None
threads[children_thread_id] = {'parsing_order':branch_i_parsing_order}
threads[children_thread_id]['group_number'] = children_group_number
threads[children_thread_id]['group_type'] = children_group_type
threads[children_thread_id]['section'] = children_section
threads[children_thread_id]['parent_data'] = threads[thread_id]['modulo'].loc[children_idx]
threads[thread_id]['modulo'].drop(children_idx,inplace = True)
threads[children_thread_id]['children_group_number'] = grandchildren_group_number
    if len(threads[thread_id]['modulo']) > 0:
add_higher_group_children()
def get_sections(df_in):
    # threads elements:
    # 'parsing_order'          What needs to be applied to the current parent data
    # 'group_number'           Order in the global parsing order
    # 'group_type'             Whether it is sequential, exclusive or optional
    # 'section'                Section name to be extracted from parent_data to data
    # 'parent_data'            Initial data from which the section must be extracted
    # 'data'                   Section data extracted from parent_data
    # 'modulo'                 Remainder of parent_data after extracting the section (data)
    # 'children_no'            Number of children threads to build, based on the next parsing order list element. Resets to the number of active children
    # 'children'               Thread id of every child
    # 'children_group_number'  Group number (in the global parsing order) of the children
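    # Hypothetical example of a parsing_order, as declared in the schema header:
    #   [ {'s': ['core']}, {'o': ['c1', 'c5']}, {'e': ['c99']} ]
    # where 's', 'e' and 'o' mark sequential, exclusive and optional section groups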
    global sentinals, section_lens, sentinals_lens, parsing_order
global children_group_type
global threads
global thread_id
global group_type
# Initial "node': input data
threads = dict()
thread_id = '00'
threads_queue = [thread_id]
threads[thread_id] = {'parsing_order':parsing_order}
threads[thread_id]['group_number'] = 0
threads[thread_id]['group_type'] = None
threads[thread_id]['section'] = None
threads[thread_id]['parent_data'] = df_in
threads[thread_id]['data'] = None
threads[thread_id]['modulo'] = threads[thread_id]['parent_data']
del threads[thread_id]['parent_data']
threads[thread_id]['children_group_number'] = 1
add_next_children()
threads_queue.extend(threads[thread_id]['children'])
threads_queue.remove(thread_id)
# And now, once initialized, let it grow:
logging.info('Processing section partitioning threads')
while threads_queue:
thread_id = threads_queue[0]
logging.info('{} ...'.format(thread_id))
group_type = threads[thread_id]['group_type']
# get section data
extract_data()
# kill thread if nothing there
if len(threads[thread_id]['data']) == 0:
del threads[thread_id]
logging.info('{} deleted: no data'.format(thread_id))
threads_queue.pop(0)
continue
# build children threads
if len(threads[thread_id]['parsing_order']) > 0 and len(threads[thread_id]['modulo']) > 0:
add_next_children()
threads_queue.extend(threads[thread_id]['children'])
        #del threads[thread_id]['modulo'] # not until we control what to do with the leftovers...
threads_queue.pop(0)
logging.info('done')
section_dict = dict()
section_groups = [ d[x] for d in parsing_order for x in d.keys() ]
sections = [item for sublist in section_groups for item in sublist]
for section in sections:
        section_dict[section] = pd.DataFrame() # Empty frame to which each thread's data is appended and then sorted for final merging
thread_ids = [ x for x in threads.keys() if threads[x]['section'] == section ]
for thread_id in thread_ids:
section_dict[section] = section_dict[section].append(threads[thread_id]['data'],ignore_index=False,sort=True)
section_dict[section].sort_index(inplace=True)
return section_dict
# ---------------------------------------------------------------------------
# MAIN FUNCTIONS
# ---------------------------------------------------------------------------
def source_to_df(TextParser, schema, read_sections, idx_offset = 0):
global sentinals, section_lens, sentinals_lens
global parsing_order
section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
sentinals_lens = { section: schema['sections'][section]['header'].get('sentinal_length') for section in schema['sections'].keys()}
parsing_order = schema['header']['parsing_order']
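    # e.g. for a hypothetical two-section model:
    #   section_lens   -> { 'core': 108, 'c1': 65 }
    #   sentinals      -> { 'core': None, 'c1': '65' }
    #   sentinals_lens -> { 'core': None, 'c1': 2 }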
chunk_len = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else 0
    multiindex = len(read_sections) > 1 or read_sections[0] != properties.dummy_level
out_dtypes = dict()
if multiindex:
for section in read_sections:
out_dtypes.update({ (section,i):properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } )
else:
for section in read_sections:
out_dtypes.update({ i:properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } )
I_CHUNK = 0
output_buffer = StringIO() if py3 else BytesIO()
valid_buffer = StringIO() if py3 else BytesIO()
    for df in TextParser: # Indices here are kept as those of the full record
logging.info('Processing chunk {}'.format(I_CHUNK))
        # 1. Remove the delimiter from mixed formats (like meds-buoys), if present
if schema['header'].get('delimiter'):
df.loc[:,0] = df[0].str.replace(',','')
# 2. Separate sections
        logging.info('Accessing sections')
section_strings = get_sections(df)
# 3. Read section elements
        # See below: if names are passed to read_csv as tuples, it automatically creates a multiindex.
        # Initially tuples were preferred over a multiindex, but they do not seem to be recommended:
        # https://github.com/pandas-dev/pandas/issues/11799
        # The mapping (like in the CDM or others) would be a bit harder with a multiindex... or will handle the tuples equally fine
# 3.0. Prepare df to paste all the sections of current chunk and get initially defined dtypes
df_out = pd.DataFrame()
valid_out = pd.DataFrame()
        # We have to concatenate the data, and also the names and properties, to write this to the buffer
        logging.info('Reading and converting section elements')
# 3.1. Loop through sections
for section in read_sections:
logging.info('{} ...'.format(section))
section_buffer = StringIO() if py3 else BytesIO()
            # Write with quoting disabled (QUOTE_NONE) to prevent supplemental buoy data from being quoted:
            # maybe the quoting happened because buoy data has commas, and pandas makes its own decision
            # about how to write that...
            # https://stackoverflow.com/questions/21147058/pandas-to-csv-output-quoting-issue
            section_strings[section].to_csv(section_buffer, header = False, encoding = 'utf-8', index = False, quoting = csv.QUOTE_NONE, escapechar = "\\", sep = "\t") # Indices are lost here; have to use the real ones, those in section_strings
            section_buffer.seek(0)
# 3.1.1. Read section elements from schema and read from buffer with pandas as objects
            section_names = list(schema['sections'][section]['elements'].keys())
            ignore = [ i for i in section_names if schema['sections'][section]['elements'][i].get('ignore') ] # evaluates to True if set and true, False if not set or set and false
section_widths = list(map(lambda x: x if x else FULL_WIDTH, [ schema['sections'][section]['elements'][i].get('field_length') for i in section_names ]))
section_dtypes = { i:'object' for i in section_names }
section_missing = { i:schema['sections'][section]['elements'][i].get('missing_value') if schema['sections'][section]['elements'][i].get('disable_white_strip') == True
else [schema['sections'][section]['elements'][i].get('missing_value')," "*schema['sections'][section]['elements'][i].get('field_length', FULL_WIDTH)]
for i in section_names }
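            # e.g. a hypothetical element with field_length 2 and missing_value '99'
            # gets na_values ['99', '  ']; with disable_white_strip set, only '99'
            # is treated as missing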
section_elements = pd.read_fwf(section_buffer, widths = section_widths, header = None, names = section_names , na_values = section_missing, delimiter="\t", encoding = 'utf-8', dtype = section_dtypes, skip_blank_lines = False )
section_valid = pd.DataFrame(index = section_elements.index, columns = section_elements.columns)
# 3.1.2. Decode, scale|offset and convert to dtype (ints will be floats if NaNs)
section_dtypes = { i:schema['sections'][section]['elements'][i]['column_type'] for i in section_names }
            encoded = [ x for x in section_names if 'encoding' in schema['sections'][section]['elements'][x] ]
section_encoding = { i:schema['sections'][section]['elements'][i]['encoding'] for i in encoded }
for element in section_dtypes.keys():
missing = section_elements[element].isna()
if element in encoded:
section_elements[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_elements[element])
kwargs = { converter_arg:schema['sections'][section]['elements'][element].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(section_dtypes.get(element)) }
section_elements[element] = converters.get(section_dtypes.get(element))(section_elements[element], **kwargs)
section_valid[element] = missing | section_elements[element].notna()
# 3.1.3. Format section:
            # - Put the data in its rightful place within the original data (indexing!) and remove section elements not desired
# - Name columns: tuples (section, element_name) for multisection, element_name if one section
# tuples will turn into a multiindex in the df_reader below
section_elements.index = section_strings[section].index
section_valid.index = section_strings[section].index
section_frame = pd.DataFrame(data = { x:pd.Series(index=range(idx_offset + I_CHUNK*chunk_len, idx_offset + len(df) + I_CHUNK*chunk_len )) for x in section_names})
valid_frame = section_frame.copy()
if len(section_elements) > 0:
section_frame.loc[section_elements.index] = section_elements
valid_frame.loc[section_elements.index] = section_valid
section_frame.drop(ignore, axis = 1, inplace = True)
valid_frame.drop(ignore, axis = 1, inplace = True)
section_frame.columns = [ (section, x) for x in section_frame.columns] if multiindex else section_frame.columns
valid_frame.columns = section_frame.columns
out_dtypes.update({ i:section_frame[i].dtype.name for i in section_frame if section_frame[i].dtype.name in properties.numpy_floats})
# 3.1.4. Paste section to rest of chunk
df_out = pd.concat([df_out,section_frame],sort = False,axis=1)
valid_out = pd.concat([valid_out,valid_frame],sort = False,axis=1)
        # 4. Add the _datetime section: watch this if we mean to handle multiple reports per record!
        # For this to be valid, we would have to assume same-day reports and the date in a common report section...
        # If the datetime is derived from within the actual report, then we would have to add _datetime after expansion, on dataframe postprocessing
if schema['header'].get('date_parser'):
date_parser = schema['header'].get('date_parser')
date_name = ('_datetime','_datetime') if multiindex else '_datetime'
date_elements = [(date_parser['section'],x) for x in date_parser['elements'] ] if date_parser.get('section') else date_parser.get('elements')
out_dtypes.update({ date_name: 'object' })
df_out = functions.df_prepend_datetime(df_out, date_elements, date_parser['format'], date_name = date_name )
valid_out = pd.concat([pd.DataFrame(index = valid_out.index, data = True,columns = [date_name]),valid_out],sort = False,axis=1)
        # 5. Add chunk data to output
        header = False # no header rows in the buffer: column names are carried separately by out_dtypes
df_out.to_csv(output_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
valid_out.to_csv(valid_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
I_CHUNK += 1
return output_buffer,valid_buffer,out_dtypes
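# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the reader itself. The toy schema, the
# section and element names and the 'str' column_type are assumptions: they
# require 'str' to be registered in mdf_reader.properties and in the
# converters dictionary.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    toy_schema = { 'header': { 'parsing_order': [ {'s': ['core']} ] },
                   'sections': { 'core': { 'header': {},
                                           'elements': { 'ID': { 'column_type': 'str', 'field_length': 3 } } } } }
    # Whole reports read as a single object column, in chunks
    toy_parser = pd.read_fwf(StringIO('AAA\nBBB\n'), widths = [FULL_WIDTH], header = None, dtype = 'object', chunksize = 1)
    data_buffer, valid_buffer, dtypes = source_to_df(toy_parser, toy_schema, ['core'])
    print(data_buffer.getvalue())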