Commit 0a2f4c89 authored by Irene Perez Gonzalez

First commit

/**/.DS_Store
/**/*.py[cod]
/**/__pycache__
Tool_validation.py
A. Get code from GitHub
Download zip from https://github.com/perezgonzalez-irene/mdf_reader.git
Clone repo: git clone https://github.com/perezgonzalez-irene/mdf_reader.git
B. Create __init__.py in the code parent directory so Python treats it as containing a package
touch __init__.py
C. Create python3 virtual environment in code directory
(1) cd mdf_reader
(2) Create environment
local:
python3 -m virtualenv --system-site-packages myenv
jasmin: (python3 is not yet the default, but the following works)
1. Set path and activate conda environment
export PATH=/apps/contrib/jaspy/miniconda_envs/jaspy3.7/m3-4.5.11/bin:$PATH
source activate jaspy3.7-m3-4.5.11-r20181219
2. Create your own virtualenv - you only do this once!
virtualenv --system-site-packages myenv
(3) Activate environment:
source ./myenv/bin/activate
(4) Install specific package versions:
pip install -r requirements.txt
To deactivate environment:
deactivate
D. Add module parent directory to python path (PYTHONPATH env variable)
- from terminal:
export PYTHONPATH=$toolParentDirectory:${PYTHONPATH}
- In python:
import sys
sys.path.append(toolParentDirectory)
E. Run a test:
import mdf_reader
import matplotlib.pyplot as plt
data = mdf_reader.tests.imma1_buoys_nosupp()
data = mdf_reader.tests.imma1_buoys_supp()
data = mdf_reader.tests.td11_deck187_nosupp()
ax = data[section_name][field_name].plot(label='x')
data[section_name][field_name].plot(ax = ax ,label='y')
....
plt.show()
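F. Read your own data file (a minimal sketch; the file path and model name below are just placeholders)
import mdf_reader
output = mdf_reader.read('/path/to/datafile', data_model = 'imma1')
data = output['data']
mask = output['valid_mask']
atts = output['atts']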
# The following gives access to the subpackages' main modules (and/or functions) directly, without looping through the full subpackage path
from .schemas import schemas as schemas
from .schemas import code_tables as code_tables
from .tests import tests as tests
from .read import read as read
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from mdf_reader import properties
import numpy as np
import pandas as pd
# 1. dtype must be defined in dtype_properties.data_types
#>>> if not np.dtype('int8'):
#... print('No data type')
#...
#>>> if not np.dtype('int786'):
#... print('No data type')
#...
#Traceback (most recent call last):
# File "<stdin>", line 1, in <module>
#TypeError: data type "int786" not understood
#
# Watch this, for my objects I want to catch both empty and blank strings as missing
# empty_string = ''
# blank_string = ' '
# len(empty_string) == 0
# len(blank_string) != 0
# len(empty_string) == len(blank_string.lstrip()) == 0
# So, we'll eval: len(value.lstrip())
#
# return data.astype(self.dtype, casting = 'safe')
# safe casting specified; otherwise np.nan gets converted to some real number, depending on dtype.
class df_converters():
def __init__(self, dtype):
self.dtype = dtype
self.numeric_scale = 1. if self.dtype in properties.numpy_floats else 1
self.numeric_offset = 0. if self.dtype in properties.numpy_floats else 0
def object_to_numeric(self, data, scale = None, offset = None):
scale = scale if scale else self.numeric_scale
offset = offset if offset else self.numeric_offset
# Convert to numeric, then apply scale and offset, and cast to the actual type
data = pd.to_numeric(data,errors = 'coerce') # astype fails on strings, to_numeric manages errors....!
data = offset + data * scale
try:
return data.astype(self.dtype, casting = 'safe')
except Exception: # unsafe cast (e.g. to int with NaNs present): return the series as it is
return data
def object_to_object(self,data,missing_value = None,disable_white_strip = False):
# Note: with strip(), an element that is empty after stripping stays an empty string; it is not turned into NaN...
if not disable_white_strip:
return data.str.strip()
else:
if disable_white_strip == 'l':
return data.str.rstrip()
elif disable_white_strip == 'r':
return data.str.lstrip()
else:
return data
def object_to_datetime(self,data, datetime_format = "%Y%m%d"):
data = pd.to_datetime(data, format = datetime_format, errors = 'coerce')
return data
converters = dict()
for dtype in properties.numeric_types:
converters[dtype] = df_converters(dtype).object_to_numeric
converters['datetime'] = df_converters('datetime').object_to_datetime
converters['str'] = df_converters('str').object_to_object
converters['object'] = df_converters('object').object_to_object
converters['key'] = df_converters('key').object_to_object
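# Minimal usage sketch of the converters lookup built above (values are illustrative):
#>>> import pandas as pd
#>>> from mdf_reader.common.converters import converters
#>>> raw = pd.Series(['101', ' 23', '', 'x'])
#>>> converters['float32'](raw, scale = 0.1)
# -> 10.1, 2.3, NaN, NaN ('' and 'x' are coerced to NaN, the rest are scaled)
#>>> converters['key'](pd.Series([' 7 ', '  ']))
# -> '7', '' (keys come back as whitespace-stripped strings)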
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import numpy as np
import pandas as pd
import string
import logging
from mdf_reader import properties
#for importer, modname, ispkg in pkgutil.walk_packages(path=package.__path__,prefix=package.__name__+'.',onerror=lambda x: None):
# print(modname.split(".")[-1])
# TO DECODE FROM OBJECT TO INTEGER
#
# Decodes input object type pd.series to a specified data type
#
# On missing data, numeric output dtypes follow integer type promotion to accommodate np.nan:
# Promotion dtype for storing NAs: integer cast to float64
# (https://pandas.pydata.org/pandas-docs/version/0.22/gotchas.html#nan-integer-na-values-and-na-type-promotions)
#
# return base10.astype(self.dtype, casting = 'safe')
# safe casting specified, otherwise converts np.nan to some number depending on dtype.
def signed_overpunch_i(x):
# Blanks and np.nan as missing data
# In TDF-11, mix of overpunch and no overpunch: include integers in dictionary
# Define decoding dictionary: ideally this would be built more cleverly, with None for non-existing keys!!!!
overpunch_number = { string.digits[i]:str(i) for i in range(0,10)}
overpunch_number.update({ string.ascii_uppercase[i]:str(i+1) for i in range(0,9)})
overpunch_number.update({ string.ascii_uppercase[i]:str(i-8) for i in range(9,18)})
overpunch_number.update({'{':str(0)})
overpunch_number.update({'<':str(0)})
overpunch_number.update({'}':str(0)})
overpunch_number.update({'!':str(0)})
overpunch_factor = { string.digits[i]:1 for i in range(0,10)}
overpunch_factor.update({ string.ascii_uppercase[i]:1 for i in range(0,9)})
overpunch_factor.update({ string.ascii_uppercase[i]:-1 for i in range(9,18)})
overpunch_factor.update({'}':-1})
overpunch_factor.update({'!':-1})
overpunch_factor.update({'{':1})
overpunch_factor.update({'<':1})
try:
n = "".join(list(map(lambda x: overpunch_number.get(x,np.nan), list(x) ))) if x==x else np.nan
f = np.prod(list(map(lambda x: overpunch_factor.get(x,np.nan), list(x) ))) if x==x else np.nan
converted = f*int(n) if f and n and n == n and f == f else np.nan
return converted
except Exception as e:
print('ERROR decoding element: {}'.format(x))
print(e)
print('Conversion sequence:')
try:
print('number base conversion: {}'.format(n))
except:
pass
try:
print('factor conversion: {}'.format(f))
except:
pass
return np.nan
class df_decoders():
def __init__(self, dtype):
self.dtype = dtype if dtype in properties.numeric_types else 'object'
def signed_overpunch(self, data ):
decoded_numeric = np.vectorize(signed_overpunch_i,otypes=[float])(data)
try:
return decoded_numeric.astype(self.dtype, casting = 'safe')
except Exception: # unsafe cast: return the decoded floats as they are
return decoded_numeric
def base36(self, data):
# int(str(np.nan),36) ==> 30191
# Had to do the following because the astype() below did not seem to convert
# to object element-wise, but the full thing. As a result, str methods
# in converters fed from objects originating here were failing: the column
# was dtype = 'object', but the elements inside were 'int'....
# Checked manually and it did seem to be happening that way....
if self.dtype == 'object' :
base10 = np.array([str(int(str(i), 36)) if i == i and i else np.nan for i in data ])
else:
base10 = np.array([int(str(i), 36) if i == i and i else np.nan for i in data ])
try:
return base10.astype(self.dtype, casting = 'safe')
except Exception: # unsafe cast: return base10 as it is
return base10
decoders = dict()
decoders['signed_overpunch'] = dict()
for dtype in properties.numeric_types:
decoders['signed_overpunch'][dtype] = df_decoders(dtype).signed_overpunch
decoders['signed_overpunch']['key'] = df_decoders('key').signed_overpunch
decoders['base36'] = dict()
for dtype in properties.numeric_types:
decoders['base36'][dtype] = df_decoders(dtype).base36
decoders['base36']['key'] = df_decoders('key').base36
## Now add the file format specific decoders
#import pkgutil
#import importlib
#from mdf_reader import fs_decoders
#package=fs_decoders
#for importer, modname, ispkg in pkgutil.walk_packages(path=package.__path__,prefix=package.__name__+'.',onerror=lambda x: None):
# file_format = modname.split(".")[-1]
# try:
# file_format_decoders = importlib.import_module(modname, package=None).decoders
# for decoder in file_format_decoders.keys():
# decoders[".".join([file_format,decoder])] = file_format_decoders.get(decoder)
# except Exception as e:
# logging.error("Error loading {0} decoders: {1}".format(modname,e))
#
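# Minimal usage sketch of the decoders lookup built above (values are illustrative):
#>>> import numpy as np
#>>> import pandas as pd
#>>> from mdf_reader.common.decoders import decoders
#>>> decoders['signed_overpunch']['int16'](pd.Series(['123J', '0005', np.nan]))
# -> -1231.0, 5.0, nan ('J' overpunches a trailing 1 with negative sign; floats because of the NaN)
#>>> decoders['base36']['int16'](pd.Series(['Z', '10']))
# -> 35, 36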
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
import pandas as pd
def df_prepend_datetime(df,date_columns,date_format,date_name = "_datetime"):
to_convert = df[date_columns].astype(str).apply("-".join, axis=1)
datetimes = pd.to_datetime(to_convert, format = "-".join(date_format), errors = 'coerce')
return pd.concat([pd.DataFrame(datetimes, columns = [date_name]),df],sort = False,axis=1)
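# Minimal usage sketch (column names and dates are illustrative):
#>>> df = pd.DataFrame({'YR': ['1987', '1987'], 'MO': ['03', '03'], 'DY': ['01', '02']})
#>>> df_prepend_datetime(df, ['YR', 'MO', 'DY'], ['%Y', '%m', '%d'])
# -> same frame with a '_datetime' column prepended, parsed from 'YR-MO-DY' with format '%Y-%m-%d'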
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Apr 3 08:45:03 2019
@author: iregon
"""
import logging
def init_logger(module, level = 'INFO', fn=None):
# !!! this overrides any previous logging configuration
from imp import reload # in python 2.x reload is a builtin and does not need importing
reload(logging)
level = logging.getLevelName(level)
logging_params = {
'level': level,
'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
}
if fn is not None:
logging_params['filename'] = fn
logging.basicConfig(**logging_params)
logging.info('Basic logging configuration initialised')
return logging.getLogger(module)
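# Minimal usage sketch (the log file name is illustrative; leave fn as None to log to the console):
#>>> logger = init_logger(__name__, level = 'DEBUG', fn = 'mdf_reader.log')
#>>> logger.debug('logger initialised')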
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 2 10:34:56 2019
Assumes we are never writing a header!
@author: iregon
"""
import pandas as pd
import mdf_reader.common.logging_hdlr as logging_hdlr
logger = logging_hdlr.init_logger(__name__,level = 'DEBUG')
def restore(TextParser_ref,TextParser_options):
try:
TextParser_ref.seek(0)
TextParser = pd.read_csv( TextParser_ref, names = TextParser_options['names'],chunksize = TextParser_options['chunksize'], dtype = TextParser_options['dtype']) #, skiprows = options['skiprows'])
return TextParser
except Exception:
logger.error('Failed to restore TextParser', exc_info=True)
return None
def is_not_empty(TextParser):
try:
TextParser_ref = TextParser.f
TextParser_options = TextParser.orig_options
except Exception as e:
logger.error('Failed to process input. Input type is {}'.format(type(TextParser)), exc_info=True)
return
try:
first_chunk = TextParser.get_chunk()
TextParser = restore(TextParser_ref,TextParser_options)
if len(first_chunk) > 0:
logger.debug('Is not empty')
return True, TextParser
else:
return False, TextParser
except Exception:
return False, TextParser
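# Minimal usage sketch (data content is illustrative): is_not_empty() consumes the first chunk and
# hands back an equivalent, restored parser, so the caller can keep iterating from the beginning.
#>>> from io import StringIO
#>>> buffer = StringIO('1,a\n2,b\n')
#>>> TextParser = pd.read_csv(buffer, names = ['id', 'code'], chunksize = 1, dtype = 'object')
#>>> not_empty, TextParser = is_not_empty(TextParser)
#>>> not_empty
# -> True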
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Apr 11 13:45:38 2019
Look into validation results
Need to type:
%matplotlib auto
in ipython, to get the interactive plots!
@author: iregon
"""
import mdf_reader
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
#------------------------------------------------------------------------------
def plot_numeric_validation(data,mask,element,valid_max,valid_min, units):
bbox_props = dict(boxstyle="round", fc="w", ec="0.5", alpha=0.9)
plt.figure()
ax = data.plot(label = 'data')
# Careful, something may not be working here in the no-data tag!!!
no_data = len(data.loc[data.notna()]) == 0
data.where(~mask).plot(marker = 'o',color = 'r',ax=ax,label='not valid')
true_value = data.where(mask).median()
if not true_value or np.isnan(true_value):
if valid_max != None and valid_min != None:
true_value = valid_min + (valid_max - valid_min)/2.
else:
true_value = 1
falses = pd.Series(index = data.index, data = true_value)
falses.where(~mask & data.isna()).plot(marker = 'o',color = 'r',ax=ax,label='_nolegend_')
trues = pd.Series(index = data.index,data = true_value )
trues.where(mask).plot(color = 'YellowGreen',ax=ax,label='valid')
if valid_max != None and valid_min != None:
ax.fill_between(data.index, valid_min,valid_max,
facecolor='DarkSlateGrey', alpha=0.25, interpolate=False, label='valid range',zorder=5)
ax.grid(linestyle=':',which='major',color='grey')
if units:
ax.set_ylabel(units, fontsize=10)
ax.set_xlabel('idx')
if no_data:
ax.text((ax.get_xlim()[1] - ax.get_xlim()[0])/2,ax.get_ylim()[0] + (ax.get_ylim()[1] - ax.get_ylim()[0])/2, "no data", ha="center", va="center", size=20,bbox=bbox_props)
plt.legend()
plt.title(element + ' validation')
plt.show()
def plot_categorical_validation(data,mask,element,codetable):
merged = pd.concat([data,mask],axis = 1)
merged.columns = ['data','mask']
counts = pd.DataFrame(index =merged['data'].value_counts(dropna = False).sort_index().index )
counts['Data'] = merged['data'].value_counts(dropna = False).sort_index()
# Watch here, need to convert to str so that NaNs are not removed from the grouping!
# Could be dangerous if we are not just counting, but then, we would not need the NaNs...
counts.index = counts.index.fillna(str(np.nan))
counts['Not valid'] = merged.astype(str).query('mask == "False"').groupby('data').count()
counts['Valid'] = merged.astype(str).query('mask == "True"').groupby('data').count()
fig = plt.figure() # Create matplotlib figure
ax = fig.add_subplot(111) # Create matplotlib axes
ax2 = ax.twinx() # Create another axes that shares the same x-axis as ax.
width = 0.4
counts['Data'].plot(kind='bar', ax=ax, width=width, position=1, color = 'DarkCyan', label = 'data counts')
counts[['Not valid','Valid']].plot(kind='bar', stacked=True, ax=ax2, width=width, position=0,legend = False, color = ['DarkRed','YellowGreen'])
ax.set_yscale("log")
ax.set_ylabel('data counts', fontsize=10)
ax2.set_yscale("log")
ax2.set_ylabel('validation counts', fontsize=10)
ax.legend(loc = 2)
ax2.legend()
ax.grid(linestyle=':',which='major',color='grey')
plt.title(element + " validation \n Codetable: " + codetable)
ax.set_xlim(-1,len(counts))
ax.set_xlabel('code')
ax2.set_xlim(-1,len(counts))
ax.set_ylim(0.5,ax.get_ylim()[1])
ax2.set_ylim(0.5,ax2.get_ylim()[1])
plt.show()
#------------------------------------------------------------------------------
def plot_model_validation(imodel):
for element in imodel['atts'].keys():
title_element = element if not isinstance(element,tuple) else element[1] + " (" + element[0] + ")"
dtype = imodel['atts'].get(element).get('column_type')
if dtype in mdf_reader.properties.numeric_types:
valid_max = imodel['atts'].get(element).get('valid_max')
valid_min = imodel['atts'].get(element).get('valid_min')
units = imodel['atts'].get(element).get('units')
plot_numeric_validation(imodel['data'][element],imodel['valid_mask'][element],title_element, valid_max, valid_min, units)
elif dtype == 'key':
# ...mmm should account for multi-keyed combinations
codetable = imodel['atts'].get(element).get('codetable')
if not codetable:
codetable = 'undefined'
plot_categorical_validation(imodel['data'][element],imodel['valid_mask'][element],title_element,codetable)
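#------------------------------------------------------------------------------
# Minimal usage sketch: imodel is the dict returned by the reader
# ({'data': ..., 'valid_mask': ..., 'atts': ...}); the file path and model name below are placeholders.
#>>> imodel = mdf_reader.read('/path/to/datafile', data_model = 'imma1')
#>>> plot_model_validation(imodel)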
*
*/
!.gitignore
!User_manual.docx
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
"""
import glob
import os
import io
import pandas as pd
# Supported formats, sources and internal data models -------------------------
supported_meta_file_formats = ['fixed_width','delimited']
schema_path = os.path.join(os.path.dirname(__file__),'schemas','lib')
supported_file_formats = [ os.path.basename(x).split(".")[0] for x in glob.glob(schema_path + '/*/*.json') if os.path.basename(x).split(".")[0] == os.path.dirname(x).split("/")[-1]]
supported_sources = [pd.DataFrame, pd.io.parsers.TextFileReader, io.StringIO]
# Data types ------------------------------------------------------------------
numpy_integers = ['int8','int16','int32','int64','uint8','uint16','uint32','uint64']
numpy_floats = ['float16','float32','float64']
numeric_types = numpy_integers.copy()
numeric_types.extend(numpy_floats)
object_types = ['str','object','key','datetime']
data_types = object_types.copy()
data_types.extend(numpy_integers)
data_types.extend(numpy_floats)
pandas_dtypes = {}
for dtype in object_types:
pandas_dtypes[dtype] = 'object'
pandas_dtypes.update({ x:x for x in numeric_types })
# ....and how they are managed
data_type_conversion_args = {}
for dtype in numeric_types:
data_type_conversion_args[dtype] = ['scale','offset']
data_type_conversion_args['str'] = ['disable_white_strip']
data_type_conversion_args['object'] = ['disable_white_strip']
data_type_conversion_args['key'] = ['disable_white_strip']
data_type_conversion_args['datetime'] = ['datetime_format']
# Misc ------------------------------------------------------------------------
tol = 1E-10
dummy_level = '_section'
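# Minimal usage sketch of how these properties are consumed elsewhere in the tool:
#>>> from mdf_reader import properties
#>>> properties.pandas_dtypes['key']
# -> 'object'
#>>> properties.data_type_conversion_args['float32']
# -> ['scale', 'offset']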
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019
Reads source data (file, pandas DataFrame or pd.io.parsers.TextFileReader) to
a pandas DataFrame. The source data model needs to be input to the module as
a named model (included in the module) or as the path to a data model.
Data is validated against its data model after reading, producing a boolean mask.
Calls the schemas, reader and validate modules in the tool to access the data models,
read the data and validate it.
@author: iregon
"""
import os
import sys
import pandas as pd
from mdf_reader.reader import reader as reader
from mdf_reader.validate import validate as validate
import mdf_reader.schemas as schemas
import mdf_reader.properties as properties
import mdf_reader.common.pandas_TextParser_hdlr as pandas_TextParser_hdlr
import logging
import json
def read(source, data_model = None, data_model_path = None, sections = None,chunksize = None,
supp_section = None, supp_model = None, supp_model_path = None,
skiprows = None, out_path = None ):
logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s',
level=logging.INFO,datefmt='%Y%m%d %H:%M:%S',filename=None)
# 0. Make sure min info is available
if not data_model and not data_model_path:
logging.error('A valid data model name or path to data model must be provided')
return
if not isinstance(source,tuple(properties.supported_sources)):
if not source:
logging.error('Data source is empty (first argument to read()) ')
return
elif not os.path.isfile(source):
logging.error('Can\'t reach data source {} as a file'.format(source))
logging.info('Supported in-memory data sources are {}'.format(",".join([str(x) for x in properties.supported_sources])))
return
# 1. Read schema(s) and get file format
logging.info("READING DATA MODEL SCHEMA FILE...")
schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
if not schema:
return
if supp_section:
logging.info("READING SUPPLEMENTAL DATA MODEL SCHEMA FILE...")
supp_schema = schemas.read_schema( schema_name = supp_model, ext_schema_path = supp_model_path)
if not supp_schema:
return
else:
supp_schema = None
# 2. Read data
imodel = data_model if data_model else data_model_path
logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))
data, valid = reader.read_model(source,schema, sections = sections, chunksize = chunksize, skiprows = skiprows)
# 3. Read additional format: on error, return what's been read so far...
# Mmmmm, make sure we can mix meta_file_formats: eg. core('FIXED_WIDTH')-supp("DELIMITED")
if supp_section:
i_suppmodel = supp_model if supp_model else supp_model_path
logging.info("EXTRACTING SUPPLEMENTAL DATA FROM MODEL: {}".format(i_suppmodel))
data, valid = reader.add_supplemental(data, supp_section, supp_schema, valid)
if isinstance(data,pd.io.parsers.TextFileReader):
logging.info('...RESTORING DATA PARSER')
data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
# 4. Create out data attributes
logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL(S)")
data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
out_atts = schemas.df_schema(data_columns, schema, data_model, supp_section = supp_section, supp_schema = supp_schema, supp_model = supp_model )
# 5. Complete data validation
logging.info("VALIDATING DATA")
valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path, supp_section = supp_section, supp_model = supp_model, supp_model_path = supp_model_path)
if isinstance(data,pd.io.parsers.TextFileReader):
logging.info('...RESTORING DATA PARSER')
data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
if out_path: # note: this writing branch assumes the data fits in a single DataFrame (no chunking)
logging.info('WRITING DATA TO FILES IN: {}'.format(out_path))
cols = [ x for x in data ]
if isinstance(cols[0],tuple):
header = [":".join(x) for x in cols]
out_atts_json = { ":".join(x):out_atts.get(x) for x in out_atts.keys() }
else:
header = cols
out_atts_json = out_atts
data.to_csv(os.path.join(out_path,'data.csv'), header = header, encoding = 'utf-8',index = True, index_label='index')
valid.to_csv(os.path.join(out_path,'valid_mask.csv'), header = header, encoding = 'utf-8',index = True, index_label='index')
with open(os.path.join(out_path,'atts.json'),'w') as fileObj:
json.dump(out_atts_json,fileObj,indent=4)
return {'data':data,'atts':out_atts,'valid_mask':valid}
if __name__=='__main__':
kwargs = dict(arg.split('=') for arg in sys.argv[2:])
if 'sections' in kwargs.keys():
kwargs.update({ 'sections': [ x.strip() for x in kwargs.get('sections').split(",")] })
read(sys.argv[1],
**kwargs) # kwargs
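# Illustrative command line call (source path, model name and section names are placeholders):
# python read.py /path/to/datafile data_model=imma1 sections=core,c1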
from . import fixed_width
from . import delimited
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019
Reads source data from a data model with delimited fields to a pandas
DataFrame.
Assumes the source data follows the data model layout, with all sections and elements present in the data.
Reads in full data content, then decodes and converts the elements.
Internally works assuming highest complexity in the input data model:
multiple sequential sections
@author: iregon
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
# CAREFUL HERE:
# Note that in Python 3, the io.open function is an alias for the built-in open function.
# The built-in open function only supports the encoding argument in Python 3, not Python 2.
# https://docs.python.org/3.4/library/io.html?highlight=io
import io
from io import open # To allow for encoding definition
from io import StringIO as StringIO
import sys
import pandas as pd
import numpy as np
from copy import deepcopy
import logging
import csv # To disable quoting
import mdf_reader.common.functions as functions
from mdf_reader.common.converters import converters
from mdf_reader.common.decoders import decoders
import mdf_reader.properties as properties
if sys.version_info[0] >= 3:
py3 = True
else:
py3 = False
from io import BytesIO as BytesIO
# ---------------------------------------------------------------------------
# FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS, AN ITERABLE:
# EITHER A PD.IO.PARSERS.TEXTFILEREADER OR A LIST, DEPENDING ON
# SOURCE TYPE AND CHUNKSIZE ARGUMENT
# BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
# ---------------------------------------------------------------------------
def source_11(source, schema, chunksize = None, skiprows = None, delimiter = ',' ):
# 11: 1 REPORT PER RECORD IN ONE LINE
if isinstance(source,pd.DataFrame):
TextParser = source
TextParser = [TextParser]
elif isinstance(source,pd.io.parsers.TextFileReader):
TextParser = source
else:
names = [ (x,y) for x in schema['sections'].keys() for y in schema['sections'][x]['elements'].keys()]
missing = { x:schema['sections'][x[0]]['elements'][x[1]].get('missing_value') for x in names }
TextParser = pd.read_csv(source,header = None, delimiter = delimiter, encoding = 'utf-8',
dtype = 'object', skip_blank_lines = False, chunksize = chunksize,
skiprows = skiprows, names = names, na_values = missing)
if not chunksize:
TextParser = [TextParser]
return TextParser
def source_1x(source,schema, chunksize = None, skiprows = None, delimiter = ',' ):
# 1X: MULTIPLE REPORTS PER RECORD IN ONE LINE
return source_11(source,schema, chunksize = chunksize, skiprows = skiprows, delimiter = ',' )
# ---------------------------------------------------------------------------
# MAIN FUNCTIONS
# ---------------------------------------------------------------------------
def source_to_df(source, schema, read_sections, idx_offset = 0):
column_names = []
for section in schema['sections']:
column_names.extend([ (section,i) for i in schema['sections'][section]['elements'] ])
multiindex = True if len(read_sections) > 1 or read_sections[0] != properties.dummy_level else False
out_dtypes = dict()
out_dtypes.update({ (section,i):properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } )
I_CHUNK = 0
output_buffer = StringIO() if py3 else BytesIO()
valid_buffer = StringIO() if py3 else BytesIO()
for df in source: # Indices here are kept as those of the full record
logging.info('Processing chunk {}'.format(I_CHUNK))
# 0. Name columns
df.columns = pd.MultiIndex.from_tuples(column_names)
# 1. Remove sections not requested
drop_sections = [ x for x in schema['sections'] if x not in read_sections ]
df.drop(drop_sections, axis = 1, level = 0, inplace = True)
# 2. Decode, scale|offset and convert to dtype (ints will be floats if NaNs)
dtypes = dict()
encodings = dict()
valid = pd.DataFrame(index = df.index, columns = df.columns)
for section in read_sections:
dtypes.update({ (section,i):schema['sections'][section]['elements'][i]['column_type'] for i in schema['sections'][section]['elements'] })
encoded = [ (x) for x in schema['sections'][section]['elements'] if 'encoding' in schema['sections'][section]['elements'][x]]
encodings.update({ (section,i):schema['sections'][section]['elements'][i]['encoding'] for i in encoded })
for element in dtypes.keys():
missing = df[element].isna()
if element in encoded:
df[element] = decoders.get(encodings.get(element)).get(dtypes.get(element))(df[element])
kwargs = { converter_arg:schema['sections'][element[0]]['elements'][element[1]].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(dtypes.get(element)) }
df[element] = converters.get(dtypes.get(element))(df[element], **kwargs)
valid[element] = missing | df[element].notna()
# Add _datetime section: watch this if we mean to do multiple reports per record!!!
# For this to be valid, we would have to assume same-day reports with the date in a common report section....
# If datetime is derived from within the actual report, then we would have to add _datetime after expansion in dataframe postprocessing
if schema['header'].get('date_parser'):
date_parser = schema['header'].get('date_parser')
date_name = ('_datetime','_datetime') if multiindex else '_datetime'
date_elements = [(date_parser['section'],x) for x in date_parser['elements'] ] if date_parser.get('section') else date_parser.get('elements')
out_dtypes.update({ date_name: 'object' })
df = functions.df_prepend_datetime(df, date_elements, date_parser['format'], date_name = date_name )
valid = pd.concat([pd.DataFrame(index = valid.index, data = True,columns = [date_name]),valid],sort = False,axis=1)
out_dtypes.update({ i:df[i].dtype.name for i in df if df[i].dtype.name in properties.numpy_floats})
if idx_offset > 0:
df.index = df.index + idx_offset
# If I get into the section: is it really only removing that named element from that section???? have to check
#element[section].drop([element],axis=1,level=1,inplace = True)
# Drop level 0 in multilevel if len(read_sections)==1 or section is dummy
# 3. Add chunk data to output
header = False # never write the header to the buffer: column names are supplied again when it is read back
df.to_csv(output_buffer,header = header, mode = 'a', encoding = 'utf-8',index = False)
valid.to_csv(valid_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
I_CHUNK += 1
return output_buffer, valid_buffer, out_dtypes
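# For orientation, a minimal schema sketch for the 'delimited' format, restricted to the keys the
# code above actually reads (the authoritative schemas are the json files under schemas/lib;
# section and element names here are made up):
# {
#   "header": {"format": "delimited", "delimiter": ",",
#              "parsing_order": [{"s": ["core"]}],
#              "date_parser": {"section": "core", "elements": ["YR", "MO", "DY"],
#                              "format": ["%Y", "%m", "%d"]}},
#   "sections": {"core": {"header": {},
#                         "elements": {"YR": {"column_type": "uint16"},
#                                      "SST": {"column_type": "float32", "scale": 0.1, "missing_value": "-999"},
#                                      "ID": {"column_type": "str"}}}}
# }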
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019
Reads source data from a data model with fixed width fields to a pandas
DataFrame.
Uses the data model layout to first find sections in the data and internally
store the data in sections, then reads in, decodes and converts the elements on
a section by section basis and finally merges that together in the output
dataframe.
Internally works assuming highest complexity in the input data model:
multiple non sequential sections
@author: iregon
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
# CAREFUL HERE:
# Note that in Python 3, the io.open function is an alias for the built-in open function.
# The built-in open function only supports the encoding argument in Python 3, not Python 2.
# https://docs.python.org/3.4/library/io.html?highlight=io
from io import StringIO as StringIO
import sys
import pandas as pd
from copy import deepcopy
import logging
import csv # To disable quoting
import mdf_reader.properties as properties
import mdf_reader.common.functions as functions
from mdf_reader.common.converters import converters
from mdf_reader.common.decoders import decoders
if sys.version_info[0] >= 3:
py3 = True
else:
py3 = False
from io import BytesIO as BytesIO
FULL_WIDTH = 100000
# ---------------------------------------------------------------------------
# FUNCTIONS TO PERFORM INITIAL SEPARATION OF SECTIONS: MAIN IS GET_SECTIONS()
# ---------------------------------------------------------------------------
def extract_data():
section_len = section_lens.get(threads[thread_id]['section'])
if section_len:
threads[thread_id]['data'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[0:section_len]) # object consistency needed here
threads[thread_id]['modulo'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[section_len:]) # object consistency needed here
else:
threads[thread_id]['data'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[0:]) #threads[thread_id]['parent_data'].copy()
# Could even be like with section_len (None in section_len will read to the end)
threads[thread_id]['modulo'] = pd.DataFrame(columns = [0]) # Just for consistency
del threads[thread_id]['parent_data']
def add_next_children():
global children_parsing_order, branch_parsing_order, children_group_type, children_group_number
children_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
branch_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
children_group_type = list(children_parsing_order[0])[0]
children_group_number = threads[thread_id]['children_group_number']
threads[thread_id]['children_no'] = 0
threads[thread_id]['children'] = []
add_children()
def add_higher_group_children():
global children_parsing_order, branch_parsing_order, children_group_type, children_group_number
children_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
children_parsing_order.pop(0) # Move to next group of sections
if len(children_parsing_order) > 0:
branch_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
branch_parsing_order.pop(0)
children_group_type = list(children_parsing_order[0])[0]
children_group_number = threads[thread_id]['children_group_number'] + 1
add_children()
def add_children():
if children_group_type == 's':
add_static_children()
else:
add_dynamic_children()
def add_static_children():
threads[thread_id]['children_no'] += 1
children_thread_id = str(children_group_number) + str(0) + thread_id
threads[thread_id]['children'].append(children_thread_id)
# Now build children's thread
children_section = children_parsing_order[0][children_group_type].pop(0)
grandchildren_group_number = children_group_number
if len(children_parsing_order[0][children_group_type]) == 0:
children_parsing_order.pop(0)
if len(children_parsing_order) > 0:
grandchildren_group_number += 1
else:
grandchildren_group_number = None
threads[children_thread_id] = {'parsing_order':children_parsing_order}
threads[children_thread_id]['group_number'] = children_group_number
threads[children_thread_id]['group_type'] = children_group_type
threads[children_thread_id]['section'] = children_section
threads[children_thread_id]['parent_data'] = threads[thread_id]['modulo']
threads[thread_id]['modulo'].iloc[0:0] # Remove reports from modulo
threads[children_thread_id]['children_group_number'] = grandchildren_group_number
def add_dynamic_children():
for i in range(0,len(children_parsing_order[0][children_group_type])):
branch_i_parsing_order = deepcopy(branch_parsing_order)
children_thread_id = str(children_group_number) + str(i+1) + thread_id
# Now build children's thread
children_section = children_parsing_order[0][children_group_type].pop(0)
children_idx = threads[thread_id]['modulo'].loc[threads[thread_id]['modulo'][0].str[0:sentinals_lens.get(children_section)] == sentinals.get(children_section)].index
if len(children_idx) == 0:
continue
threads[thread_id]['children'].append(children_thread_id)
threads[thread_id]['children_no'] += 1
branch_i_parsing_order[0][children_group_type].remove(children_section)
grandchildren_group_number = children_group_number
if len(branch_i_parsing_order[0][children_group_type]) == 0 or children_group_type == 'e':
branch_i_parsing_order.pop(0)
if len(children_parsing_order) > 0:
grandchildren_group_number += 1
else:
grandchildren_group_number = None
threads[children_thread_id] = {'parsing_order':branch_i_parsing_order}
threads[children_thread_id]['group_number'] = children_group_number
threads[children_thread_id]['group_type'] = children_group_type
threads[children_thread_id]['section'] = children_section
threads[children_thread_id]['parent_data'] = threads[thread_id]['modulo'].loc[children_idx]
threads[thread_id]['modulo'].drop(children_idx,inplace = True)
threads[children_thread_id]['children_group_number'] = grandchildren_group_number
if (len(threads[thread_id]['modulo'])) > 0:
add_higher_group_children()
def get_sections(df_in):
# threads elements:
# 'parsing_order' What needs to be applied to current parent data
# 'group_number' Order in the global parsing order
# 'group_type' Is it sequential, exclusive or optional
# 'section' Section name to be extracted from parent_data to data
# 'parent_data' Initial data from which section must be extracted
# 'data' Section data extracted from parent_data
# 'modulo' Remainder of parent_data after extracting section (data)
# 'children_no' Number of children threads to build, based on next parsing order list element. Resets to number of active children
# 'children' Thread id for every child
# 'children_group_number' Group number (in the global parsing order, of the children)
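# An illustrative parsing_order, as read from the schema header (section names are made up; the
# group-type letters are inferred from the handling below: 's' groups go through add_static_children(),
# any other letter through add_dynamic_children(), with 'e' treated as exclusive):
# parsing_order = [{'s': ['core']}, {'e': ['c1', 'c99']}]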
global sentinals, section_lens, sentinals_lens, parsing_order
global children_group_type
global threads
global thread_id
global group_type
# Initial "node": input data
threads = dict()
thread_id = '00'
threads_queue = [thread_id]
threads[thread_id] = {'parsing_order':parsing_order}
threads[thread_id]['group_number'] = 0
threads[thread_id]['group_type'] = None
threads[thread_id]['section'] = None
threads[thread_id]['parent_data'] = df_in
threads[thread_id]['data'] = None
threads[thread_id]['modulo'] = threads[thread_id]['parent_data']
del threads[thread_id]['parent_data']
threads[thread_id]['children_group_number'] = 1
add_next_children()
threads_queue.extend(threads[thread_id]['children'])
threads_queue.remove(thread_id)
# And now, once initialized, let it grow:
logging.info('Processing section partitioning threads')
while threads_queue:
thread_id = threads_queue[0]
logging.info('{} ...'.format(thread_id))
group_type = threads[thread_id]['group_type']
# get section data
extract_data()
# kill thread if nothing there
if len(threads[thread_id]['data']) == 0:
del threads[thread_id]
logging.info('{} deleted: no data'.format(thread_id))
threads_queue.pop(0)
continue
# build children threads
if len(threads[thread_id]['parsing_order']) > 0 and len(threads[thread_id]['modulo']) > 0:
add_next_children()
threads_queue.extend(threads[thread_id]['children'])
#del threads[thread_id]['modulo'] # not until we control what to do with leftovers....
threads_queue.pop(0)
logging.info('done')
section_dict = dict()
section_groups = [ d[x] for d in parsing_order for x in d.keys() ]
sections = [item for sublist in section_groups for item in sublist]
for section in sections:
section_dict[section] = pd.DataFrame() # Index as initial size to help final merging
thread_ids = [ x for x in threads.keys() if threads[x]['section'] == section ]
for thread_id in thread_ids:
section_dict[section] = section_dict[section].append(threads[thread_id]['data'],ignore_index=False,sort=True)
section_dict[section].sort_index(inplace=True)
return section_dict
# ---------------------------------------------------------------------------
# FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS, AN ITERABLE:
# EITHER A PD.IO.PARSERS.TEXTFILEREADER OR A LIST, DEPENDING ON
# SOURCE TYPE AND CHUNKSIZE ARGUMENT
# BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
# ---------------------------------------------------------------------------
def source_11(source,schema, chunksize = None, skiprows = None, delimiter = None ):
# 11: 1 REPORT PER RECORD IN ONE LINE
# delimiter = '\t' so that it reads blanks as blanks, otherwise reads as empty: NaN
# this applies mainly when reading elements from sections, but we leave it also here
if isinstance(source,pd.DataFrame):
TextParser = source
TextParser.columns = [0]
TextParser = [TextParser]
elif isinstance(source,pd.io.parsers.TextFileReader):
TextParser = source
else:
TextParser = pd.read_fwf(source,widths=[FULL_WIDTH],header = None, skiprows = skiprows, delimiter="\t", chunksize = chunksize)
if not chunksize:
TextParser = [TextParser]
return TextParser
def source_1x(source,schema, chunksize = None, skiprows = None, delimiter = None ):
# 1X: MULTIPLE REPORTS PER RECORD IN ONE LINE
return source_11(source,schema, chunksize = chunksize, skiprows = skiprows, delimiter = delimiter )
# ---------------------------------------------------------------------------
# MAIN FUNCTIONS
# ---------------------------------------------------------------------------
def source_to_df(TextParser, schema, read_sections, idx_offset = 0):
global sentinals, section_lens, sentinals_lens
global parsing_order
section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
sentinals_lens = { section: schema['sections'][section]['header'].get('sentinal_length') for section in schema['sections'].keys()}
parsing_order = schema['header']['parsing_order']
chunk_len = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else 0
multiindex = True if len(read_sections) > 1 or read_sections[0] != properties.dummy_level else False
out_dtypes = dict()
if multiindex:
for section in read_sections:
out_dtypes.update({ (section,i):properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } )
else:
for section in read_sections:
out_dtypes.update({ i:properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } )
I_CHUNK = 0
output_buffer = StringIO() if py3 else BytesIO()
valid_buffer = StringIO() if py3 else BytesIO()
for df in TextParser: # Indices here are kept as those of the full record
logging.info('Processing chunk {}'.format(I_CHUNK))
# 1. Remove delimiter from mixed type (like meds-buoys) if exists
if schema['header'].get('delimiter'):
df.loc[:,0] = df[0].str.replace(',','')
# 2. Separate sections
logging.info('Accessing sections')
section_strings = get_sections(df)
# 3. Read section elements
# Note: if names are passed to read_csv as tuples, it automatically creates a multiindex. Initially I did not want this, I wanted tuples,
# but they do not seem to be recommended: https://github.com/pandas-dev/pandas/issues/11799
# The mapping (like in CDM or others) would be a bit harder if using a multiindex... or it will get the tuples equally fine
# 3.0. Prepare df to paste all the sections of current chunk and get initially defined dtypes
df_out = pd.DataFrame()
valid_out = pd.DataFrame()
# We have to cat the data, and also the names and properties to write this to the buffer
logging.info('Reading and converting section elements')
# 3.1. Loop through sections
for section in read_sections:
logging.info('{} ...'.format(section))
section_buffer = StringIO() if py3 else BytesIO()
# Writing options from quoting onwards are there to prevent supplemental buoy data from being quoted:
# maybe this happened because buoy data has commas, and pandas makes its own decision about
# how to write that.....
#https://stackoverflow.com/questions/21147058/pandas-to-csv-output-quoting-issue
section_strings[section].to_csv(section_buffer,header=False, encoding = 'utf-8',index = False,quoting=csv.QUOTE_NONE,escapechar="\\",sep="\t") # Here indices are lost, have to give the real ones, those in section_strings
shut_up = section_buffer.seek(0)
# 3.1.1. Read section elements from schema and read from buffer with pandas as objects
section_names = schema['sections'][section]['elements'].keys()
ignore = [ i for i in section_names if schema['sections'][section]['elements'][i].get('ignore') ] # evals to True if set and true, evals to False if not set or set and false
section_widths = list(map(lambda x: x if x else FULL_WIDTH, [ schema['sections'][section]['elements'][i].get('field_length') for i in section_names ]))
section_dtypes = { i:'object' for i in section_names }
section_missing = { i:schema['sections'][section]['elements'][i].get('missing_value') if schema['sections'][section]['elements'][i].get('disable_white_strip') == True
else [schema['sections'][section]['elements'][i].get('missing_value')," "*schema['sections'][section]['elements'][i].get('field_length', FULL_WIDTH)]
for i in section_names }
section_elements = pd.read_fwf(section_buffer, widths = section_widths, header = None, names = section_names , na_values = section_missing, delimiter="\t", encoding = 'utf-8', dtype = section_dtypes, skip_blank_lines = False )
section_valid = pd.DataFrame(index = section_elements.index, columns = section_elements.columns)
# 3.1.2. Decode, scale|offset and convert to dtype (ints will be floats if NaNs)
section_dtypes = { i:schema['sections'][section]['elements'][i]['column_type'] for i in section_names }
encoded = [ (x) for x in section_names if 'encoding' in schema['sections'][section]['elements'][x]]
section_encoding = { i:schema['sections'][section]['elements'][i]['encoding'] for i in encoded }
for element in section_dtypes.keys():
missing = section_elements[element].isna()
if element in encoded:
section_elements[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_elements[element])
kwargs = { converter_arg:schema['sections'][section]['elements'][element].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(section_dtypes.get(element)) }
section_elements[element] = converters.get(section_dtypes.get(element))(section_elements[element], **kwargs)
section_valid[element] = missing | section_elements[element].notna()
# 3.1.3. Format section:
# - Put data in its rightful place in the original data (indexing!) and remove section elements not desired
# - Name columns: tuples (section, element_name) for multisection, element_name if one section
# tuples will turn into a multiindex in the df_reader below
section_elements.index = section_strings[section].index
section_valid.index = section_strings[section].index
section_frame = pd.DataFrame(data = { x:pd.Series(index=range(idx_offset + I_CHUNK*chunk_len, idx_offset + len(df) + I_CHUNK*chunk_len )) for x in section_names})
valid_frame = section_frame.copy()
if len(section_elements) > 0:
section_frame.loc[section_elements.index] = section_elements
valid_frame.loc[section_elements.index] = section_valid
section_frame.drop(ignore, axis = 1, inplace = True)
valid_frame.drop(ignore, axis = 1, inplace = True)
section_frame.columns = [ (section, x) for x in section_frame.columns] if multiindex else section_frame.columns
valid_frame.columns = section_frame.columns
out_dtypes.update({ i:section_frame[i].dtype.name for i in section_frame if section_frame[i].dtype.name in properties.numpy_floats})
# 3.1.4. Paste section to rest of chunk
df_out = pd.concat([df_out,section_frame],sort = False,axis=1)
valid_out = pd.concat([valid_out,valid_frame],sort = False,axis=1)
# 4.3. Add _datetime section: watch this if we mean to do multiple reports per record!!!
# For this to be valid, we would have to assume same-day reports with the date in a common report section....
# If datetime is derived from within the actual report, then we would have to add _datetime after expansion in dataframe postprocessing
if schema['header'].get('date_parser'):
date_parser = schema['header'].get('date_parser')
date_name = ('_datetime','_datetime') if multiindex else '_datetime'
date_elements = [(date_parser['section'],x) for x in date_parser['elements'] ] if date_parser.get('section') else date_parser.get('elements')
out_dtypes.update({ date_name: 'object' })
df_out = functions.df_prepend_datetime(df_out, date_elements, date_parser['format'], date_name = date_name )
valid_out = pd.concat([pd.DataFrame(index = valid_out.index, data = True,columns = [date_name]),valid_out],sort = False,axis=1)
# 4.4. Add chunk data to output
header = False # never write the header to the buffer: column names are supplied again when it is read back
df_out.to_csv(output_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
valid_out.to_csv(valid_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
I_CHUNK += 1
return output_buffer,valid_buffer,out_dtypes
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019
Reads source data from a data model to a pandas DataFrame.
Optionally, it reads supplemental data from the same source (from a different
data model) and pastes that to the output DataFrame
Uses the meta_formats generic submodules ('delimited' and 'fixed_width') to
pre-format data source and read either generic type of data model.
@author: iregon
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
# CAREFUL HERE:
# Note that in Python 3, the io.open function is an alias for the built-in open function.
# The built-in open function only supports the encoding argument in Python 3, not Python 2.
# https://docs.python.org/3.4/library/io.html?highlight=io
from io import StringIO as StringIO
import sys
import pandas as pd
import numpy as np
import logging
from . import meta_formats
from .. import properties
if sys.version_info[0] >= 3:
py3 = True
else:
py3 = False
from io import BytesIO as BytesIO
# Get pandas dtype for time_stamps
pandas_timestamp_dtype = pd.to_datetime(pd.DataFrame(['20000101'])[0],format='%Y%m%d').dtypes
def add_supplemental(data, supp_section, supp_schema, valid):
# Supplemental data needs to have no sectioning: cannot merge dfs with different level depths in the columns...
try:
supp_format = supp_schema['header'].get('format')
if supp_format in properties.supported_meta_file_formats:
TextParser = data if isinstance(data, pd.io.parsers.TextFileReader) else [data]
TextParser_valid = valid if isinstance(valid, pd.io.parsers.TextFileReader) else [valid]
chunksize = data.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
iidx_offset = chunksize if chunksize else 0
output_buffer = StringIO() if py3 else BytesIO()
output_buffer_valid = StringIO() if py3 else BytesIO()
I_CHUNK = 0
for idata,ivalid in zip(TextParser,TextParser_valid):
date_columns = list(np.where(idata.dtypes == pandas_timestamp_dtype)[0])
dtypes = idata.dtypes.to_dict()
supp, supp_valid = read_model(idata[supp_section],supp_schema, idx_offset = I_CHUNK*iidx_offset )
supp_date_columns = list(np.where(supp.dtypes == pandas_timestamp_dtype)[0] + len(idata.columns) - 1 )
date_columns.extend(supp_date_columns)
date_columns = [ int(x) for x in date_columns ] # reader date parser won't take numpy.int64 from np.where as col index
if I_CHUNK == 0:
o_supp_dtypes = supp.dtypes.to_dict()
else:
o_supp_dtypes.update({ i:supp[i].dtype for i in supp if supp[i].dtype in properties.numpy_floats})
supp_elements = supp.columns.to_list()
supp_dtypes = {}
for element in supp_elements:
supp_dtypes[(supp_section,element)] = o_supp_dtypes.get(element)
dtypes.pop((supp_section,idata[supp_section].columns.to_list()[0]), None)
idata.drop(supp_section, axis = 1, inplace = True, level = 0)# OMG: apparently, with multiindex, this does not drop the columns from idata.columns
ivalid.drop(supp_section, axis = 1, inplace = True, level = 0)
supp.columns = [ (supp_section,x) for x in supp.columns ]
supp_valid.columns = [ (supp_section,x) for x in supp_valid.columns ]
dtypes.update(supp_dtypes)
supp.index = idata.index
supp_valid.index = ivalid.index
column_names = [ x for x in idata if x[0] != supp_section ]
column_names.extend([ x for x in supp ])
new_dtypes = { x:dtypes.get(x) for x in column_names }
idata = pd.concat([idata,supp],sort = False,axis=1)
ivalid = pd.concat([ivalid,supp_valid],sort = False,axis=1)
idata.to_csv(output_buffer,header=False, mode = 'a', encoding = 'utf-8',index = False)
ivalid.to_csv(output_buffer_valid,header=False, mode = 'a', encoding = 'utf-8',index = False)
I_CHUNK += 1
output_buffer.seek(0)
output_buffer_valid.seek(0)
for element in list(dtypes):
if new_dtypes.get(element) == pandas_timestamp_dtype:
new_dtypes[element] = 'object' # Only on output (on reading) will be then converted to datetime64[ns] type, cannot specify 'datetime' here: have to go through parser
data = pd.read_csv(output_buffer,names = idata.columns, dtype = new_dtypes, chunksize = chunksize, parse_dates = date_columns )
valid = pd.read_csv(output_buffer_valid,names = ivalid.columns, chunksize = chunksize)
return data, valid
else:
logging.error('Supplemental file format not supported: {}'.format(supp_format))
logging.warning('Supplemental data not extracted from supplemental section')
return data, valid
except Exception as e:
logging.warning('Supplemental data not extracted from supplemental section', exc_info=True)
return data, valid
def read_model(source,schema, sections = None, chunksize = None, skiprows = None, idx_offset = 0):
meta_format = schema['header'].get('format')
if meta_format not in properties.supported_meta_file_formats:
logging.error('File format read from input schema not supported: {}'.format(meta_format))
return
meta_reader = ".".join(['meta_formats',meta_format])
# 0. GET META FORMAT SUBCLASS ---------------------------------------------
if schema['header'].get('multiple_reports_per_line'): # needs to eval to True if set and True and to false if not set or false, without breaking
format_subclass = '1x'
else:
format_subclass = '11'
# 1. PARSE SCHEMA ---------------------------------------------------------
delimiter = schema['header'].get('delimiter')
parsing_order = schema['header'].get('parsing_order')
# 2. DEFINE OUTPUT --------------------------------------------------------
# 2.1 Sections to read
if not sections:
sections = [ x.get(y) for x in parsing_order for y in x ]
read_sections = [y for x in sections for y in x]
else:
read_sections = sections
multiindex = True if len(read_sections) > 1 or read_sections[0] != properties.dummy_level else False
if format_subclass == '1x':
return schema
# 2.2 Element names: same order as declared in the schema, which is the order in which the readers read them...
names = []
if schema['header'].get('date_parser'):
if multiindex:
names.extend([('_datetime','_datetime')])
else:
names.extend(['_datetime'])
for section in read_sections:
if multiindex:
names.extend([ (section,x) for x in schema['sections'][section]['elements'].keys() if not schema['sections'][section]['elements'][x].get('ignore') ])
else:
names.extend([ x for x in schema['sections'][section]['elements'].keys() if not schema['sections'][section]['elements'][x].get('ignore') ])
# 3. GET DATA FROM SOURCE (DF, FILE OR TEXTREADER):------------------------
# SIMPLE STRING PER REPORT/LINE
logging.info("Getting input data from source...")
source_function = eval(meta_reader + "." + "_".join(['source',format_subclass]))
TextParser = source_function(source,schema, chunksize = chunksize, skiprows = skiprows, delimiter = delimiter)
# 4. DO THE ACTUAL READING
reader_function = eval(meta_reader + "." + 'source_to_df')
logging.info("Reading data...")
[output_buffer,valid_buffer,dtypes] = reader_function(TextParser, schema, read_sections = read_sections, idx_offset = idx_offset )
# 5. OUTPUT DATA:----------------------------------------------------------
# WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
output_buffer.seek(0)
valid_buffer.seek(0)
logging.info("Wrapping output....")
chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
logging.info('Data')
# 'datetime' is not a valid pandas dtype: only on output (on reading) will it be converted (via parse_dates) to the datetime64[ns] type; we cannot specify 'datetime' (of any kind) here: it will fail
date_columns = [] # Needs to be the numeric index of the column, as it does not seem to work with tuples....
for i,element in enumerate(list(dtypes)):
if dtypes.get(element) == 'datetime':
date_columns.append(i)
df_reader = pd.read_csv(output_buffer,names = names, chunksize = chunksize, dtype = dtypes, parse_dates = date_columns)
logging.info('Mask')
valid_reader = pd.read_csv(valid_buffer,names = names, chunksize = chunksize)
return df_reader, valid_reader
#Python 3.7.3
matplotlib==3.0.3
numpy==1.16.2
pandas==0.24.2