Commit 8d7cd946 authored by iregon's avatar iregon

First commit new architecture

parent 611a3444
......@@ -11,7 +11,6 @@ import pandas as pd
# Supported formats, sources and internal data models -------------------------
supported_meta_file_formats = ['fixed_width','delimited']
schema_path = os.path.join(os.path.dirname(__file__),'schemas','lib')
supported_file_formats = [ os.path.basename(x).split(".")[0] for x in glob.glob(schema_path + '/*/*.json') if os.path.basename(x).split(".")[0] == os.path.dirname(x).split("/")[-1]]
supported_sources = [pd.DataFrame, pd.io.parsers.TextFileReader, io.StringIO]
......@@ -44,4 +43,6 @@ data_type_conversion_args['datetime'] = ['datetime_format']
# Misc ------------------------------------------------------------------------
tol = 1E-10
dummy_level = '_section'
\ No newline at end of file
dummy_level = 'SECTION__'
# Length of reports in initial read
MAX_FULL_REPORT_WIDTH = 100000
\ No newline at end of file
......@@ -3,8 +3,8 @@
"""
Created on Tue Apr 30 09:38:17 2019
Reads source data (file, pandas DataFrame or pd.io.parsers.TextFileReader) to
a pandas DataFrame. The source data model needs to be input to the module as
Reads source data (file, pandas DataFrame or pd.io.parsers.TextFileReader) to
a pandas DataFrame. The source data model needs to be input to the module as
a named model (included in the module) or as the path to a data model.
Data is validated against its data model after reading, producing a boolean mask.
......@@ -26,7 +26,6 @@ import logging
import json
def read(source, data_model = None, data_model_path = None, sections = None,chunksize = None,
supp_section = None, supp_model = None, supp_model_path = None,
skiprows = None, out_path = None ):
logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s',
......@@ -50,39 +49,24 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
if not schema:
return
if supp_section:
logging.info("READING SUPPLEMENTAL DATA MODEL SCHEMA FILE...")
supp_schema = schemas.read_schema( schema_name = supp_model, ext_schema_path = supp_model_path)
if not supp_schema:
return
else:
supp_schema = None
# 2. Read data
imodel = data_model if data_model else data_model_path
logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))
data, valid = reader.read_model(source,schema, sections = sections, chunksize = chunksize, skiprows = skiprows)
# 3. Read additional format: on error, return what's been read so far...
# Mmmmm, make sure we can mix meta_file_formats: eg. core('FIXED_WIDTH')-supp("DELIMITED")
if supp_section:
i_suppmodel = supp_model if supp_model else supp_model_path
logging.info("EXTRACTING SUPPLEMENTAL DATA FROM MODEL: {}".format(i_suppmodel))
data, valid = reader.add_supplemental(data, supp_section, supp_schema, valid)
if isinstance(data,pd.io.parsers.TextFileReader):
logging.info('...RESTORING DATA PARSER')
data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
# 4. Create out data attributes
logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL(S)")
data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
out_atts = schemas.df_schema(data_columns, schema, data_model, supp_section = supp_section, supp_schema = supp_schema, supp_model = supp_model )
out_atts = schemas.df_schema(data_columns, schema, data_model)
# 5. Complete data validation
logging.info("VALIDATING DATA")
valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path, supp_section = supp_section, supp_model = supp_model, supp_model_path = supp_model_path)
valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path)
if isinstance(data,pd.io.parsers.TextFileReader):
logging.info('...RESTORING DATA PARSER')
data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
if out_path:
logging.info('WRITING DATA TO FILES IN: {}'.format(out_path))
cols = [ x for x in data ]
......@@ -96,12 +80,12 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
valid.to_csv(os.path.join(out_path,'valid_mask.csv'), header = header, encoding = 'utf-8',index = True, index_label='index')
with open(os.path.join(out_path,'atts.json'),'w') as fileObj:
json.dump(out_atts_json,fileObj,indent=4)
return {'data':data,'atts':out_atts,'valid_mask':valid}
if __name__=='__main__':
kwargs = dict(arg.split('=') for arg in sys.argv[2:])
if 'sections' in kwargs.keys():
kwargs.update({ 'sections': [ x.strip() for x in kwargs.get('sections').split(",")] })
read(sys.argv[1],
**kwargs) # kwargs
\ No newline at end of file
read(sys.argv[1],
**kwargs) # kwargs
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019
Reads source data (file, pandas DataFrame or pd.io.parsers.TextFileReader) to
a pandas DataFrame. The source data model needs to be input to the module as
a named model (included in the module) or as the path to a data model.
Data is validated against its data model after reading, producing a boolean mask.
Calls the schemas, reader and validate modules in the tool to access the data models,
read the data and validate it.
@author: iregon
"""
import os
import sys
import pandas as pd
from mdf_reader.reader import reader as reader
from mdf_reader.validate import validate as validate
import mdf_reader.schemas as schemas
import mdf_reader.properties as properties
import mdf_reader.common.pandas_TextParser_hdlr as pandas_TextParser_hdlr
import logging
import json
def read(source, data_model = None, data_model_path = None, sections = None,chunksize = None,
supp_section = None, supp_model = None, supp_model_path = None,
skiprows = None, out_path = None ):
logging.basicConfig(format='%(levelname)s\t[%(asctime)s](%(filename)s)\t%(message)s',
level=logging.INFO,datefmt='%Y%m%d %H:%M:%S',filename=None)
# 0. Make sure min info is available
if not data_model and not data_model_path:
logging.error('A valid data model name or path to data model must be provided')
return
if not isinstance(source,tuple(properties.supported_sources)):
if not source:
logging.error('Data source is empty (first argument to read()) ')
return
elif not os.path.isfile(source):
logging.error('Can\'t reach data source {} as a file'.format(source))
logging.info('Supported in-memory data sources are {}'.format(",".join([str(x) for x in properties.supported_sources])))
return
# 1. Read schema(s) and get file format
logging.info("READING DATA MODEL SCHEMA FILE...")
schema = schemas.read_schema( schema_name = data_model, ext_schema_path = data_model_path)
if not schema:
return
if supp_section:
logging.info("READING SUPPLEMENTAL DATA MODEL SCHEMA FILE...")
supp_schema = schemas.read_schema( schema_name = supp_model, ext_schema_path = supp_model_path)
if not supp_schema:
return
else:
supp_schema = None
# 2. Read data
imodel = data_model if data_model else data_model_path
logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))
data, valid = reader.read_model(source,schema, sections = sections, chunksize = chunksize, skiprows = skiprows)
# 3. Read additional format: on error, return what's been read so far...
# Mmmmm, make sure we can mix meta_file_formats: eg. core('FIXED_WIDTH')-supp("DELIMITED")
if supp_section:
i_suppmodel = supp_model if supp_model else supp_model_path
logging.info("EXTRACTING SUPPLEMENTAL DATA FROM MODEL: {}".format(i_suppmodel))
data, valid = reader.add_supplemental(data, supp_section, supp_schema, valid)
if isinstance(data,pd.io.parsers.TextFileReader):
logging.info('...RESTORING DATA PARSER')
data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
# 4. Create out data attributes
logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL(S)")
data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
out_atts = schemas.df_schema(data_columns, schema, data_model, supp_section = supp_section, supp_schema = supp_schema, supp_model = supp_model )
# 5. Complete data validation
logging.info("VALIDATING DATA")
valid = validate.validate(data, out_atts, valid, data_model = data_model, data_model_path = data_model_path, supp_section = supp_section, supp_model = supp_model, supp_model_path = supp_model_path)
if isinstance(data,pd.io.parsers.TextFileReader):
logging.info('...RESTORING DATA PARSER')
data = pandas_TextParser_hdlr.restore(data.f,data.orig_options)
if out_path:
logging.info('WRITING DATA TO FILES IN: {}'.format(out_path))
cols = [ x for x in data ]
if isinstance(cols[0],tuple):
header = [":".join(x) for x in cols]
out_atts_json = { ":".join(x):out_atts.get(x) for x in out_atts.keys() }
else:
header = cols
out_atts_json = out_atts
data.to_csv(os.path.join(out_path,'data.csv'), header = header, encoding = 'utf-8',index = True, index_label='index')
valid.to_csv(os.path.join(out_path,'valid_mask.csv'), header = header, encoding = 'utf-8',index = True, index_label='index')
with open(os.path.join(out_path,'atts.json'),'w') as fileObj:
json.dump(out_atts_json,fileObj,indent=4)
return {'data':data,'atts':out_atts,'valid_mask':valid}
if __name__=='__main__':
kwargs = dict(arg.split('=') for arg in sys.argv[2:])
if 'sections' in kwargs.keys():
kwargs.update({ 'sections': [ x.strip() for x in kwargs.get('sections').split(",")] })
read(sys.argv[1],
**kwargs) # kwargs
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019
Splits string reports into sections using a data model layout.
Input and output are plain pandas DataFrames, with the output DataFrame column
names set to the section names.
To work with a pandas TextParser, loop through this module.
Internally works assuming highest complexity in the input data model:
multiple non sequential sections
DEV NOTES:
1) make sure we use Series when working with Series, DataFrames otherwise...
like now:
threads[thread_id]['data'] = pd.Series(threads[thread_id]['parent_data'][0].str[0:section_len])
instead of:
threads[thread_id]['data'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[0:section_len])
on data import in import_data.py we use pd.read_fwf because it is more general;
also, chunking support would make converting to a Series a bit messy...
2) Can we extend (do we need to?) this to reading sequential sections with
no sentinals? apparently (see td11) we are already able to do that:
provided the section is in a sequential parsing_order group
@author: iregon
"""
import pandas as pd
from copy import deepcopy
import logging
import mdf_reader.properties as properties
# ---------------------------------------------------------------------------
# FUNCTIONS TO PERFORM INITIAL SEPARATION OF SECTIONS: MAIN IS GET_SECTIONS()
# ---------------------------------------------------------------------------
def extract_data():
section_len = section_lens.get(threads[thread_id]['section'])
if section_len:
threads[thread_id]['data'] = pd.Series(threads[thread_id]['parent_data'][0].str[0:section_len]) # object consistency needed here
threads[thread_id]['modulo'] = pd.DataFrame(threads[thread_id]['parent_data'][0].str[section_len:]) # object consistency needed here
else:
threads[thread_id]['data'] = pd.Series(threads[thread_id]['parent_data'][0].str[0:]) #threads[thread_id]['parent_data'].copy()
# Could even be like with section_len (None in section_len will read to the end)
threads[thread_id]['modulo'] = pd.DataFrame(columns = [0]) # Just for consistency
del threads[thread_id]['parent_data']
def add_next_children():
global children_parsing_order, branch_parsing_order, children_group_type, children_group_number
children_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
branch_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
children_group_type = list(children_parsing_order[0])[0]
children_group_number = threads[thread_id]['children_group_number']
threads[thread_id]['children_no'] = 0
threads[thread_id]['children'] = []
add_children()
def add_higher_group_children():
global children_parsing_order, branch_parsing_order, children_group_type, children_group_number
children_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
children_parsing_order.pop(0) # Move to next group of sections
if len(children_parsing_order) > 0:
branch_parsing_order = deepcopy(threads[thread_id]['parsing_order'])
branch_parsing_order.pop(0)
children_group_type = list(children_parsing_order[0])[0]
children_group_number = threads[thread_id]['children_group_number'] + 1
add_children()
def add_children():
if children_group_type == 's':
add_static_children()
else:
add_dynamic_children()
def add_static_children():
threads[thread_id]['children_no'] += 1
children_thread_id = str(children_group_number) + str(0) + thread_id
threads[thread_id]['children'].append(children_thread_id)
# Now build children's thread
children_section = children_parsing_order[0][children_group_type].pop(0)
grandchildren_group_number = children_group_number
if len(children_parsing_order[0][children_group_type]) == 0:
children_parsing_order.pop(0)
if len(children_parsing_order) > 0:
grandchildren_group_number += 1
else:
grandchildren_group_number = None
threads[children_thread_id] = {'parsing_order':children_parsing_order}
threads[children_thread_id]['group_number'] = children_group_number
threads[children_thread_id]['group_type'] = children_group_type
threads[children_thread_id]['section'] = children_section
threads[children_thread_id]['parent_data'] = threads[thread_id]['modulo']
threads[thread_id]['modulo'].iloc[0:0] # Remove reports from modulo
threads[children_thread_id]['children_group_number'] = grandchildren_group_number
def add_dynamic_children():
for i in range(0,len(children_parsing_order[0][children_group_type])):
branch_i_parsing_order = deepcopy(branch_parsing_order)
children_thread_id = str(children_group_number) + str(i+1) + thread_id
# Now build children's thread
children_section = children_parsing_order[0][children_group_type].pop(0)
children_idx = threads[thread_id]['modulo'].loc[threads[thread_id]['modulo'][0].str[0:sentinals_lens.get(children_section)] == sentinals.get(children_section)].index
if len(children_idx) == 0:
continue
threads[thread_id]['children'].append(children_thread_id)
threads[thread_id]['children_no'] += 1
branch_i_parsing_order[0][children_group_type].remove(children_section)
grandchildren_group_number = children_group_number
if len(branch_i_parsing_order[0][children_group_type]) == 0 or children_group_type == 'e':
branch_i_parsing_order.pop(0)
if len(children_parsing_order) > 0:
grandchildren_group_number += 1
else:
grandchildren_group_number = None
threads[children_thread_id] = {'parsing_order':branch_i_parsing_order}
threads[children_thread_id]['group_number'] = children_group_number
threads[children_thread_id]['group_type'] = children_group_type
threads[children_thread_id]['section'] = children_section
threads[children_thread_id]['parent_data'] = threads[thread_id]['modulo'].loc[children_idx]
threads[thread_id]['modulo'].drop(children_idx,inplace = True)
threads[children_thread_id]['children_group_number'] = grandchildren_group_number
if (len(threads[thread_id]['modulo'])) > 0:
add_higher_group_children()
def extract_sections(df_in):
# threads elements:
# 'parsing_order' What needs to be applied to current parent data
# 'group_number' Order in the global parsing order
# 'group_type' Is it sequential, exclusive or optional
# 'section' Section name to be extracted from parent_data to data
# 'parent_data' Initial data from which section must be extracted
# 'data' Section data extracted from parent_data
# 'modulo' Remainder of parent_data after extracting section (data)
# 'children_no' Number of children threads to build, based on next parsing order list element. Resets to number of active children
# 'children' Thread id for every child
# 'children_group_number' Group number (in the global parsing order, of the children)
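# Assumed shape of 'parsing_order' (illustrative only): a list of single-key
# dicts, each mapping a group type ('s' sequential, 'e' exclusive, 'o' optional)
# to the section names in that group, e.g.
#   [{'s': ['core']}, {'o': ['c1', 'c5']}, {'s': ['supp']}]
# 's' groups are expanded through add_static_children(), any other group type
# through add_dynamic_children().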
global sentinals, section_lens, sentinals_lens, parsing_order
global children_group_type
global threads
global thread_id
global group_type
# Initial "node': input data
threads = dict()
thread_id = '00'
threads_queue = [thread_id]
threads[thread_id] = {'parsing_order':parsing_order}
threads[thread_id]['group_number'] = 0
threads[thread_id]['group_type'] = None
threads[thread_id]['section'] = None
threads[thread_id]['parent_data'] = df_in
threads[thread_id]['data'] = None
threads[thread_id]['modulo'] = threads[thread_id]['parent_data']
del threads[thread_id]['parent_data']
threads[thread_id]['children_group_number'] = 1
add_next_children()
threads_queue.extend(threads[thread_id]['children'])
threads_queue.remove(thread_id)
# And now, once initialized, let it grow:
logging.info('Processing section partitioning threads')
while threads_queue:
thread_id = threads_queue[0]
logging.info('{} ...'.format(thread_id))
group_type = threads[thread_id]['group_type']
# get section data
extract_data()
# kill thread if nothing there
if len(threads[thread_id]['data']) == 0:
del threads[thread_id]
logging.info('{} deleted: no data'.format(thread_id))
threads_queue.pop(0)
continue
# build children threads
if len(threads[thread_id]['parsing_order']) > 0 and len(threads[thread_id]['modulo']) > 0:
add_next_children()
threads_queue.extend(threads[thread_id]['children'])
#del threads[thread_id]['modulo'] # not until we control what to do with leftovers....
threads_queue.pop(0)
logging.info('done')
section_dict = dict()
section_groups = [ d[x] for d in parsing_order for x in d.keys() ]
sections = [item for sublist in section_groups for item in sublist]
for section in sections:
#section_dict[section] = pd.DataFrame() # Index as initial size to help final merging
section_dict[section] = pd.Series()
thread_ids = [ x for x in threads.keys() if threads[x]['section'] == section ]
for thread_id in thread_ids:
section_dict[section] = section_dict[section].append(threads[thread_id]['data'],ignore_index=False)
section_dict[section].sort_index(inplace=True)
return section_dict
# ---------------------------------------------------------------------------
# MAIN
# ---------------------------------------------------------------------------
def get_sections(StringDf, schema, read_sections):
global sentinals, section_lens, sentinals_lens
global parsing_order
if len(schema['sections'].keys())> 1:
section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
sentinals_lens = { section: len(sentinals.get(section)) if sentinals.get(section) else 0 for section in sentinals.keys()}
parsing_order = schema['header']['parsing_order']
# Get sections separated
section_strings = extract_sections(StringDf)
# Paste in order in a single dataframe with columns named as sections
# Do not include sections not asked for
df_out = pd.DataFrame()
for section in read_sections:
df_out = pd.concat([df_out,section_strings[section].rename(section)],sort = False,axis=1)
else:
df_out = StringDf
df_out.columns = read_sections
return df_out
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 10 13:17:43 2020
FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS, AN ITERABLE:
EITHER A PD.IO.PARSERS.TEXTFILEREADER OR A LIST, DEPENDING ON
SOURCE TYPE AND CHUNKSIZE ARGUMENT
BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
delimiter="\t" option in pandas.read_fwf avoids white spaces at taild
to be stripped
@author: iregon
OPTIONS IN OLD DEVELOPMENT:
1. DLMT: delimiter = ',' default
names = [ (x,y) for x in schema['sections'].keys() for y in schema['sections'][x]['elements'].keys()]
missing = { x:schema['sections'][x[0]]['elements'][x[1]].get('missing_value') for x in names }
TextParser = pd.read_csv(source,header = None, delimiter = delimiter, encoding = 'utf-8',
dtype = 'object', skip_blank_lines = False, chunksize = chunksize,
skiprows = skiprows, names = names, na_values = missing)
2. FWF:# delimiter = '\t' so that it reads blanks as blanks, otherwise reads as empty: NaN
this applies mainly when reading elements from sections, but we leave it also here
TextParser = pd.read_fwf(source,widths=[FULL_WIDTH],header = None, skiprows = skiprows, delimiter="\t", chunksize = chunksize)
"""
import pandas as pd
import os
from mdf_reader import properties
def import_data(source,chunksize = None, skiprows = None):
if isinstance(source,pd.io.parsers.TextFileReader):
TextParser = source
elif os.path.isfile(source):
TextParser = pd.read_fwf(source,widths=[properties.MAX_FULL_REPORT_WIDTH],header = None, delimiter="\t", skiprows = skiprows, chunksize = chunksize)
if not chunksize:
TextParser = [TextParser]
else:
print('Error: data source {} is neither a supported in-memory object nor an existing file'.format(source))
return TextParser
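# Hedged usage sketch (file name is hypothetical): whatever the source, the
# result is consumed with the same loop, a one-element list holding a DataFrame
# when no chunksize is given, or a TextFileReader otherwise:
#
#   TextParser = import_data('reports.txt', chunksize=5000)
#   for string_df in TextParser:
#       pass  # each chunk is a single-column DataFrame of raw report strings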
......@@ -8,8 +8,8 @@ DataFrame.
Assumes the source data follows the data model layout, with all sections and elements present in the data.
Reads in full data content, then decodes and converts the elements.
Internally works assuming highest complexity in the input data model:
Internally works assuming highest complexity in the input data model:
multiple sequential sections
@author: iregon
......@@ -44,36 +44,10 @@ else:
from io import BytesIO as BytesIO
# ---------------------------------------------------------------------------
# FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS, AN ITERABLE:
# EITHER A PD.IO.PARSERS.TEXTFILEREADER OR A LIST, DEPENDING ON
# SOURCE TYPE AND CHUNKSIZE ARGUMENT
# BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
# ---------------------------------------------------------------------------
def source_11(source, schema, chunksize = None, skiprows = None, delimiter = ',' ):
# 11: 1 REPORT PER RECORD IN ONE LINE
if isinstance(source,pd.DataFrame):
TextParser = source
TextParser = [TextParser]
elif isinstance(source,pd.io.parsers.TextFileReader):
TextParser = source
else:
names = [ (x,y) for x in schema['sections'].keys() for y in schema['sections'][x]['elements'].keys()]
missing = { x:schema['sections'][x[0]]['elements'][x[1]].get('missing_value') for x in names }
TextParser = pd.read_csv(source,header = None, delimiter = delimiter, encoding = 'utf-8',
dtype = 'object', skip_blank_lines = False, chunksize = chunksize,
skiprows = skiprows, names = names, na_values = missing)
if not chunksize:
TextParser = [TextParser]
return TextParser
def source_1x(source,schema, chunksize = None, skiprows = None, delimiter = ',' ):
# 1X: MULTIPLE REPORTS PER RECORD IN ONE LINE
return source_11(source,schema, chunksize = chunksize, skiprows = skiprows, delimiter = ',' )
# ---------------------------------------------------------------------------
# MAIN FUNCTIONS
# ---------------------------------------------------------------------------
def source_to_df(source, schema, read_sections, idx_offset = 0):
column_names = []
for section in schema['sections']:
column_names.extend([ (section,i) for i in schema['sections'][section]['elements'] ])
......@@ -102,11 +76,11 @@ def source_to_df(source, schema, read_sections, idx_offset = 0):
for element in dtypes.keys():
missing = df[element].isna()
if element in encoded:
df[element] = decoders.get(encodings.get(element)).get(dtypes.get(element))(df[element])
df[element] = decoders.get(encodings.get(element)).get(dtypes.get(element))(df[element])
kwargs = { converter_arg:schema['sections'][element[0]]['elements'][element[1]].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(dtypes.get(element)) }
df[element] = converters.get(dtypes.get(element))(df[element], **kwargs)
df[element] = converters.get(dtypes.get(element))(df[element], **kwargs)
valid[element] = missing | df[element].notna()
# Add _datetime section: watch this if we mean to do multiple reports per record!!!
# for this to be valid, we would have to assume same-day reports and that the date is in a common report section....
......@@ -118,11 +92,11 @@ def source_to_df(source, schema, read_sections, idx_offset = 0):
out_dtypes.update({ date_name: 'object' })
df = functions.df_prepend_datetime(df, date_elements, date_parser['format'], date_name = date_name )
valid = pd.concat([pd.DataFrame(index = valid.index, data = True,columns = [date_name]),valid],sort = False,axis=1)
out_dtypes.update({ i:df[i].dtype.name for i in df if df[i].dtype.name in properties.numpy_floats})
if idx_offset > 0:
out_dtypes.update({ i:df[i].dtype.name for i in df if df[i].dtype.name in properties.numpy_floats})
if idx_offset > 0:
df.index = df.index + idx_offset
# If I get into the section: is it really only removing that named element from that section???? have to check
# If I get into the section: is it really only removing that named element from that section???? have to check
#element[section].drop([element],axis=1,level=1,inplace = True)
# Drop level 0 in multilevel if len(read_sections)==1 or section is dummy
# 3. Add chunk data to output
......@@ -130,5 +104,5 @@ def source_to_df(source, schema, read_sections, idx_offset = 0):
df.to_csv(output_buffer,header = header, mode = 'a', encoding = 'utf-8',index = False)
valid.to_csv(valid_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
I_CHUNK += 1
return output_buffer, valid_buffer, out_dtypes
\ No newline at end of file
return output_buffer, valid_buffer, out_dtypes
......@@ -10,8 +10,8 @@ Uses the data model layout to first find sections in the data and internally
store the data in sections, then reads in, decodes and converts the elements on
a section by section basis and finally merges that together in the output
dataframe.
Internally works assuming highest complexity in the input data model:
Internally works assuming highest complexity in the input data model:
multiple non sequential sections
@author: iregon
......@@ -41,7 +41,7 @@ if sys.version_info[0] >= 3:
else:
py3 = False
from io import BytesIO as BytesIO
FULL_WIDTH = 100000
......@@ -147,13 +147,13 @@ def get_sections(df_in):
# 'modulo' Remainder of parent_data after extracting section (data)
# 'children_no' Number of children threads to build, based on next parsing order list element. Resets to number of active children
# 'children' Thread id for every child
# 'children_group_number' Group number (in the global parsing order, of the children)
# 'children_group_number' Group number (in the global parsing order, of the children)
global sentinals, section_lens, sentinal_lens, parsing_order
global children_group_type
global threads
global thread_id
global group_type
# Initial "node': input data
threads = dict()
thread_id = '00'
......@@ -202,39 +202,13 @@ def get_sections(df_in):
section_dict[section].sort_index(inplace=True)
return section_dict
# ---------------------------------------------------------------------------
# FUNCTION TO PREPARE SOURCE DATA TO WHAT GET_SECTIONS() EXPECTS, AN ITERABLE:
# EITHER A PD.IO.PARSERS.TEXTFILEREADER OR A LIST, DEPENDING ON
# SOURCE TYPE AND CHUNKSIZE ARGUMENT
# BASICALLY 1 RECORD (ONE OR MULTIPLE REPORTS) IN ONE LINE
# ---------------------------------------------------------------------------
def source_11(source,schema, chunksize = None, skiprows = None, delimiter = None ):
# 11: 1 REPORT PER RECORD IN ONE LINE
# delimiter = '\t' so that it reads blanks as blanks, otherwise reads as empty: NaN
# this applies mainly when reading elements from sections, but we leave it also here
if isinstance(source,pd.DataFrame):
TextParser = source
TextParser.columns = [0]
TextParser = [TextParser]
elif isinstance(source,pd.io.parsers.TextFileReader):
TextParser = source
else:
TextParser = pd.read_fwf(source,widths=[FULL_WIDTH],header = None, skiprows = skiprows, delimiter="\t", chunksize = chunksize)
if not chunksize:
TextParser = [TextParser]
return TextParser
def source_1x(source,schema, chunksize = None, skiprows = None, delimiter = None ):
# 1X: MULTIPLE REPORTS PER RECORD IN ONE LINE
return source_11(source,schema, chunksize = chunksize, skiprows = skiprows, delimiter = delimiter )
# ---------------------------------------------------------------------------
# MAIN FUNCTIONS
# ---------------------------------------------------------------------------
def source_to_df(TextParser, schema, read_sections, idx_offset = 0):
global sentinals, section_lens, sentinals_lens
global parsing_order
section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
sentinals_lens = { section: schema['sections'][section]['header'].get('sentinal_length') for section in schema['sections'].keys()}
......@@ -247,7 +221,7 @@ def source_to_df(TextParser, schema, read_sections, idx_offset = 0):
out_dtypes.update({ (section,i):properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } )
else:
for section in read_sections:
out_dtypes.update({ i:properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } )
out_dtypes.update({ i:properties.pandas_dtypes.get(schema['sections'][section]['elements'][i].get('column_type')) for i in schema['sections'][section]['elements'].keys() } )
I_CHUNK = 0
output_buffer = StringIO() if py3 else BytesIO()
valid_buffer = StringIO() if py3 else BytesIO()
......@@ -297,12 +271,12 @@ def source_to_df(TextParser, schema, read_sections, idx_offset = 0):
missing = section_elements[element].isna()
if element in encoded:
section_elements[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_elements[element])
kwargs = { converter_arg:schema['sections'][section]['elements'][element].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(section_dtypes.get(element)) }
section_elements[element] = converters.get(section_dtypes.get(element))(section_elements[element], **kwargs)
section_valid[element] = missing | section_elements[element].notna()
# 3.1.3. Format section:
# - Put data in its rightfull place of the original data (indexing!) and remove section elements not desired
# - Name columns: tuples (section, element_name) for multisection, element_name if one section
......@@ -338,4 +312,4 @@ def source_to_df(TextParser, schema, read_sections, idx_offset = 0):
valid_out.to_csv(valid_buffer,header=header, mode = 'a', encoding = 'utf-8',index = False)
I_CHUNK += 1
return output_buffer,valid_buffer,out_dtypes
\ No newline at end of file
return output_buffer,valid_buffer,out_dtypes
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Jan 10 13:17:43 2020
@author: iregon
"""
import pandas as pd
from io import StringIO as StringIO
import mdf_reader.properties as properties
import csv # To disable quoting
from mdf_reader.common.converters import converters
from mdf_reader.common.decoders import decoders
def extract_fixed_width(section_serie_bf,section_schema):
# Read section elements descriptors
section_names = section_schema['elements'].keys()
section_widths = list(map(lambda x: x if x else properties.MAX_FULL_REPORT_WIDTH, [ section_schema['elements'][i].get('field_length') for i in section_names ]))
section_missing = { i:section_schema['elements'][i].get('missing_value') if section_schema['elements'][i].get('disable_white_strip') == True
else [section_schema['elements'][i].get('missing_value')," "*section_schema['elements'][i].get('field_length', properties.MAX_FULL_REPORT_WIDTH)]
for i in section_names }
section_elements = pd.read_fwf(section_serie_bf, widths = section_widths, header = None, names = section_names , na_values = section_missing, delimiter="\t", encoding = 'utf-8', dtype = 'object', skip_blank_lines = False )
return section_elements
def extract_delimited(section_serie_bf,section_schema):
delimiter = section_schema['header'].get('delimiter')
section_names = section_schema['elements'].keys()
section_missing = { x:section_schema['elements'][x].get('missing_value') for x in section_names }
section_elements = pd.read_csv(section_serie_bf,header = None, delimiter = delimiter, encoding = 'utf-8',
dtype = 'object', skip_blank_lines = False,
names = section_names, na_values = section_missing)
return section_elements
def read_data(section_df,section_schema):
section_names = section_df.columns
section_dtypes = { i:section_schema['elements'][i]['column_type'] for i in section_names }
encoded = [ (x) for x in section_names if 'encoding' in section_schema['elements'][x]]
section_encoding = { i:section_schema['elements'][i]['encoding'] for i in encoded }
for element in section_dtypes.keys():
#missing = section_elements[element].isna()
if element in encoded:
section_df[element] = decoders.get(section_encoding.get(element)).get(section_dtypes.get(element))(section_df[element])
kwargs = { converter_arg:section_schema['elements'][element].get(converter_arg) for converter_arg in properties.data_type_conversion_args.get(section_dtypes.get(element)) }
section_df[element] = converters.get(section_dtypes.get(element))(section_df[element], **kwargs)
# section_valid[element] = missing | section_elements[element].notna()
return section_df
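# Hedged sketch of the per-element chain above, using the only conversion
# argument visible in properties.py (data_type_conversion_args['datetime'] =
# ['datetime_format']); 'date' is an assumed element name, not from a shipped schema:
#
#   kwargs = {'datetime_format': section_schema['elements']['date'].get('datetime_format')}
#   section_df['date'] = converters.get('datetime')(section_df['date'], **kwargs)
#
# Encoded elements are passed through the matching decoder before conversion.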
def read_sections(sections_df, schema):
multiindex = True if len(sections_df.columns) > 1 or sections_df.columns[0] != properties.dummy_level else False
data_df = pd.DataFrame()
out_dtypes = dict()
for section in sections_df.columns:
print('Reading section {}'.format(section))
section_schema = schema['sections'].get(section)
disable_read = section_schema.get('header').get('disable_read')
if not disable_read:
field_layout = section_schema.get('header').get('field_layout')
ignore = [ i for i in section_schema['elements'].keys() if section_schema['elements'][i].get('ignore') ] # evals to True if set and true, evals to False if not set or set and false
# Get rid of false delimiters in fixed_width
delimiter = section_schema['header'].get('delimiter')
if delimiter and field_layout == 'fixed_width':
sections_df[section] = sections_df[section].str.replace(delimiter,'')
section_buffer = StringIO()
# Writing options from quoting on, to prevent supp buoy data from being quoted:
# maybe this happened because buoy data has commas, and pandas makes its own decision about
# how to write that.....
#https://stackoverflow.com/questions/21147058/pandas-to-csv-output-quoting-issue
# quoting=csv.QUOTE_NONE was failing when a section is empty (or just one record in a section,...)
# Here indices are lost, have to give the real ones, those in section_strings:
# we'll see if we do that in the caller module or here....
sections_df[section].to_csv(section_buffer,header=False, encoding = 'utf-8',index = False)#,quoting=csv.QUOTE_NONE,escapechar="\\",sep="\t")
ssshh = section_buffer.seek(0)
# Get the individual elements as objects
if field_layout == 'fixed_width':
section_elements_obj = extract_fixed_width(section_buffer,section_schema)
elif field_layout == 'delimited':
section_elements_obj = extract_delimited(section_buffer,section_schema)
section_elements_obj.drop(ignore, axis = 1, inplace = True)
# Read the objects to their data types and apply decoding, scaling and so on...
section_elements = read_data(section_elements_obj,section_schema)
section_elements.index = sections_df[section].index
else:
section_elements = pd.DataFrame(sections_df[section],columns = [section])
if not disable_read:
if multiindex:
out_dtypes.update({ (section,i):properties.pandas_dtypes.get(section_schema['elements'][i].get('column_type')) for i in section_elements.columns } )
out_dtypes.update({ (section,i):section_elements[i].dtype.name for i in section_elements if section_elements[i].dtype.name in properties.numpy_floats})
else:
out_dtypes.update({ i:properties.pandas_dtypes.get(section_schema['elements'][i].get('column_type')) for i in section_elements.columns } )
out_dtypes.update({ i:section_elements[i].dtype.name for i in section_elements if section_elements[i].dtype.name in properties.numpy_floats})
else:
if multiindex:
out_dtypes.update({ (section,section):'object' } )
else:
out_dtypes.update({ section:'object' } )
section_elements.columns = [ (section, x) for x in section_elements.columns] if multiindex else section_elements.columns
data_df = pd.concat([data_df,section_elements],sort = False,axis=1)
return data_df,out_dtypes
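# Hedged note on the output (element and section names below are assumed, not
# taken from a shipped schema): for a multi-section model the column labels are
# tuples and the dtype map is keyed the same way, roughly
#   data_df.columns -> [('core', 'YR'), ('core', 'MO'), ('supp', 'supp')]
#   out_dtypes      -> {('core', 'YR'): 'float64', ('core', 'MO'): 'float64', ('supp', 'supp'): 'object'}
# while a single dummy-level section keeps plain element names.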
......@@ -27,7 +27,13 @@ import pandas as pd
import numpy as np
import logging
from . import meta_formats
from .. import properties
from . import import_data
from . import get_sections
from . import read_sections
import copy
if sys.version_info[0] >= 3:
py3 = True
......@@ -38,136 +44,70 @@ else:
# Get pandas dtype for time_stamps
pandas_timestamp_dtype = pd.to_datetime(pd.DataFrame(['20000101'])[0],format='%Y%m%d').dtypes
def add_supplemental(data, supp_section, supp_schema, valid):
# Supplemental data needs to have no sectioning: cannot merge dfs with different level depths in the columns...
try:
supp_format = supp_schema['header'].get('format')
if supp_format in properties.supported_meta_file_formats:
TextParser = data if isinstance(data, pd.io.parsers.TextFileReader) else [data]
TextParser_valid = valid if isinstance(valid, pd.io.parsers.TextFileReader) else [valid]
chunksize = data.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
iidx_offset = chunksize if chunksize else 0
output_buffer = StringIO() if py3 else BytesIO()
output_buffer_valid = StringIO() if py3 else BytesIO()
I_CHUNK = 0
for idata,ivalid in zip(TextParser,TextParser_valid):
date_columns = list(np.where(idata.dtypes == pandas_timestamp_dtype)[0])
dtypes = idata.dtypes.to_dict()
supp, supp_valid = read_model(idata[supp_section],supp_schema, idx_offset = I_CHUNK*iidx_offset )
supp_date_columns = list(np.where(supp.dtypes == pandas_timestamp_dtype)[0] + len(idata.columns) - 1 )
date_columns.extend(supp_date_columns)
date_columns = [ int(x) for x in date_columns ] # reader date parser won't take numpy.int64 from np.where as col index
if I_CHUNK == 0:
o_supp_dtypes = supp.dtypes.to_dict()
else:
o_supp_dtypes.update({ i:supp[i].dtype for i in supp if supp[i].dtype in properties.numpy_floats})
supp_elements = supp.columns.to_list()
supp_dtypes = {}
for element in supp_elements:
supp_dtypes[(supp_section,element)] = o_supp_dtypes.get(element)
dtypes.pop((supp_section,idata[supp_section].columns.to_list()[0]), None)
idata.drop(supp_section, axis = 1, inplace = True, level = 0)# OMG: apparently, with multiindex, this does not drop the columns from idata.columns
ivalid.drop(supp_section, axis = 1, inplace = True, level = 0)
supp.columns = [ (supp_section,x) for x in supp.columns ]
supp_valid.columns = [ (supp_section,x) for x in supp_valid.columns ]
dtypes.update(supp_dtypes)
supp.index = idata.index
supp_valid.index = ivalid.index
column_names = [ x for x in idata if x[0] != supp_section ]
column_names.extend([ x for x in supp ])
new_dtypes = { x:dtypes.get(x) for x in column_names }
idata = pd.concat([idata,supp],sort = False,axis=1)
ivalid = pd.concat([ivalid,supp_valid],sort = False,axis=1)
idata.to_csv(output_buffer,header=False, mode = 'a', encoding = 'utf-8',index = False)
ivalid.to_csv(output_buffer_valid,header=False, mode = 'a', encoding = 'utf-8',index = False)
I_CHUNK += 1
output_buffer.seek(0)
output_buffer_valid.seek(0)
for element in list(dtypes):
if new_dtypes.get(element) == pandas_timestamp_dtype:
new_dtypes[element] = 'object' # Only on output (on reading) will it be converted to datetime64[ns]; cannot specify 'datetime' here: have to go through the parser
data = pd.read_csv(output_buffer,names = idata.columns, dtype = new_dtypes, chunksize = chunksize, parse_dates = date_columns )
valid = pd.read_csv(output_buffer_valid,names = ivalid.columns, chunksize = chunksize)
return data, valid
else:
logging.error('Supplemental file format not supported: {}'.format(supp_format))
logging.warning('Supplemental data not extracted from supplemental section')
return data, valid
except Exception as e:
logging.warning('Supplemental data not extracted from supplemental section', exc_info=True)
return data, valid
def read_model(source,schema, sections = None, chunksize = None, skiprows = None, idx_offset = 0):
meta_format = schema['header'].get('format')
if meta_format not in properties.supported_meta_file_formats:
logging.error('File format read from input schema not supported: {}'.format(meta_format))
return
meta_reader = ".".join(['meta_formats',meta_format])
def read_model(source,schema, sections = None, chunksize = None, skiprows = None):
# 0. GET META FORMAT SUBCLASS ---------------------------------------------
if schema['header'].get('multiple_reports_per_line'): # must evaluate to True if set and True, and to False if not set or False, without breaking
format_subclass = '1x'
else:
format_subclass = '11'
# For future use: some work already done in schema reading
if schema['header'].get('multiple_reports_per_line'):
logging.error('File format not yet supported')
sys.exit(1)
# 1. PARSE SCHEMA ---------------------------------------------------------
delimiter = schema['header'].get('delimiter')
parsing_order = schema['header'].get('parsing_order')
# 2. DEFINE OUTPUT --------------------------------------------------------
# 2.1 Sections to read
if not sections:
sections = [ x.get(y) for x in parsing_order for y in x ]
read_sections = [y for x in sections for y in x]
read_sections_list = [y for x in sections for y in x]
else:
read_sections = sections
multiindex = True if len(read_sections) > 1 or read_sections[0] != properties.dummy_level else False
read_sections_list = sections
# 3. HOMOGENIZE INPUT DATA (FILE OR TEXTREADER) TO AN ITERABLE TEXTREADER
logging.info("Getting data string from source...")
TextParser = import_data.import_data(source, chunksize = chunksize, skiprows = skiprows)
# 4. EXTRACT SECTIONS IN A PARSER; EXTRACT SECTIONS HERE AND READ DATA IN
# SAME LOOP? SHOULD DO....
logging.info("Extracting sections...")
data_buffer = StringIO()
if format_subclass == '1x':
return schema
# 2.2 Element names: same order as declared in the schema, which is the order in which the readers read them...
names = []
if schema['header'].get('date_parser'):
if multiindex:
names.extend([('_datetime','_datetime')])
else:
names.extend(['_datetime'])
for section in read_sections:
if multiindex:
names.extend([ (section,x) for x in schema['sections'][section]['elements'].keys() if not schema['sections'][section]['elements'][x].get('ignore') ])
else:
names.extend([ x for x in schema['sections'][section]['elements'].keys() if not schema['sections'][section]['elements'][x].get('ignore') ])
# 3. GET DATA FROM SOURCE (DF, FILE OR TEXTREADER):------------------------
# SIMPLE STRING PER REPORT/LINE
logging.info("Getting input data from source...")
source_function = eval(meta_reader + "." + "_".join(['source',format_subclass]))
TextParser = source_function(source,schema, chunksize = chunksize, skiprows = skiprows, delimiter = delimiter)
# 4. DO THE ACTUAL READING
reader_function = eval(meta_reader + "." + 'source_to_df')
logging.info("Reading data...")
[output_buffer,valid_buffer,dtypes] = reader_function(TextParser, schema, read_sections = read_sections, idx_offset = idx_offset )
# 5. OUTPUT DATA:----------------------------------------------------------
# WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
output_buffer.seek(0)
valid_buffer.seek(0)
logging.info("Wrapping output....")
chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
logging.info('Data')
# 'datetime' is not a valid pandas dtype: only on output (on reading) will it be converted (via parse_dates) to datetime64[ns]; specifying 'datetime' (of any kind) here will fail
date_columns = [] # Needs to be the numeric index of the column, as it seems unable to work with tuples....
for i,element in enumerate(list(dtypes)):
if dtypes.get(element) == 'datetime':
date_columns.append(i)
df_reader = pd.read_csv(output_buffer,names = names, chunksize = chunksize, dtype = dtypes, parse_dates = date_columns)
logging.info('Mask')
valid_reader = pd.read_csv(valid_buffer,names = names, chunksize = chunksize)
return df_reader, valid_reader
# valid_buffer = ...
for i,string_df in enumerate(TextParser):
# Get sections separated in a dataframe: one per column, only requested
# sections, ignore rest.
sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
# Read elements from sections: along data chunks, resulting data types
# may vary if gaps
[data_df,out_dtypesi ] = read_sections.read_sections(sections_df, schema)
if i == 0:
out_dtypes = copy.deepcopy(out_dtypesi)
for k in out_dtypesi:
if out_dtypesi.get(k) in properties.numpy_floats:
out_dtypes.update({ k:out_dtypesi.get(k) })
data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
# [output_buffer,valid_buffer,dtypes] = reader_function(TextParser, schema, read_sections = read_sections, idx_offset = idx_offset )
#
# # 5. OUTPUT DATA:----------------------------------------------------------
# # WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
data_buffer.seek(0)
# valid_buffer.seek(0)
# logging.info("Wrapping output....")
# chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
# logging.info('Data')
# # 'datetime' is not a valid pandas dtype: only on output (on reading) will it be converted (via parse_dates) to datetime64[ns]; specifying 'datetime' (of any kind) here will fail
# date_columns = [] # Needs to be the numeric index of the column, as it seems unable to work with tuples....
# for i,element in enumerate(list(dtypes)):
# if dtypes.get(element) == 'datetime':
# date_columns.append(i)
data_reader = pd.read_csv(data_buffer,names = data_df.columns, chunksize = chunksize, dtype = out_dtypes)#, parse_dates = date_columns)
# logging.info('Mask')
# valid_reader = pd.read_csv(valid_buffer,names = out_names, chunksize = chunksize)
# return data_reader, valid_reader
return data_reader
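# Hedged summary of the new read_model() flow above (all names as defined in
# this commit): import_data() yields an iterable of raw string chunks,
# get_sections() splits each chunk into one column per section, read_sections()
# decodes the elements and reports their dtypes, and the chunks are appended to
# a CSV buffer that is re-read with pd.read_csv so the caller gets a DataFrame
# or a TextFileReader:
#
#   for string_df in import_data.import_data(source, chunksize=chunksize):
#       sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
#       data_df, out_dtypes_i = read_sections.read_sections(sections_df, schema)
#       data_df.to_csv(data_buffer, header=False, mode='a', encoding='utf-8', index=False)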
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Apr 30 09:38:17 2019
Reads source data from a data model to a pandas DataFrame.
Optionally, it reads supplemental data from the same source (from a different
data model) and pastes that to the output DataFrame
Uses the meta_formats generic submodules ('delimited' and 'fixed_width') to
pre-format data source and read either generic type of data model.
@author: iregon
"""
from __future__ import unicode_literals
from __future__ import print_function
from __future__ import absolute_import
# CAREFUL HERE:
# Note that in Python 3, the io.open function is an alias for the built-in open function.
# The built-in open function only supports the encoding argument in Python 3, not Python 2.
# https://docs.python.org/3.4/library/io.html?highlight=io
from io import StringIO as StringIO
import sys
import pandas as pd
import numpy as np
import logging
from . import meta_formats
from .. import properties
if sys.version_info[0] >= 3:
py3 = True
else:
py3 = False
from io import BytesIO as BytesIO
# Get pandas dtype for time_stamps
pandas_timestamp_dtype = pd.to_datetime(pd.DataFrame(['20000101'])[0],format='%Y%m%d').dtypes
def add_supplemental(data, supp_section, supp_schema, valid):
# Supplemental data needs to have no sectioning: cannot merge dfs with different level depths in the columns...
try:
supp_format = supp_schema['header'].get('format')
if supp_format in properties.supported_meta_file_formats:
TextParser = data if isinstance(data, pd.io.parsers.TextFileReader) else [data]
TextParser_valid = valid if isinstance(valid, pd.io.parsers.TextFileReader) else [valid]
chunksize = data.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
iidx_offset = chunksize if chunksize else 0
output_buffer = StringIO() if py3 else BytesIO()
output_buffer_valid = StringIO() if py3 else BytesIO()
I_CHUNK = 0
for idata,ivalid in zip(TextParser,TextParser_valid):
date_columns = list(np.where(idata.dtypes == pandas_timestamp_dtype)[0])
dtypes = idata.dtypes.to_dict()
supp, supp_valid = read_model(idata[supp_section],supp_schema, idx_offset = I_CHUNK*iidx_offset )
supp_date_columns = list(np.where(supp.dtypes == pandas_timestamp_dtype)[0] + len(idata.columns) - 1 )
date_columns.extend(supp_date_columns)
date_columns = [ int(x) for x in date_columns ] # reader date parser won't take numpy.int64 from np.where as col index
if I_CHUNK == 0:
o_supp_dtypes = supp.dtypes.to_dict()
else:
o_supp_dtypes.update({ i:supp[i].dtype for i in supp if supp[i].dtype in properties.numpy_floats})
supp_elements = supp.columns.to_list()
supp_dtypes = {}
for element in supp_elements:
supp_dtypes[(supp_section,element)] = o_supp_dtypes.get(element)
dtypes.pop((supp_section,idata[supp_section].columns.to_list()[0]), None)
idata.drop(supp_section, axis = 1, inplace = True, level = 0)# OMG: apparently, with multiindex, this does not drop the columns from idata.columns
ivalid.drop(supp_section, axis = 1, inplace = True, level = 0)
supp.columns = [ (supp_section,x) for x in supp.columns ]
supp_valid.columns = [ (supp_section,x) for x in supp_valid.columns ]
dtypes.update(supp_dtypes)
supp.index = idata.index
supp_valid.index = ivalid.index
column_names = [ x for x in idata if x[0] != supp_section ]
column_names.extend([ x for x in supp ])
new_dtypes = { x:dtypes.get(x) for x in column_names }
idata = pd.concat([idata,supp],sort = False,axis=1)
ivalid = pd.concat([ivalid,supp_valid],sort = False,axis=1)
idata.to_csv(output_buffer,header=False, mode = 'a', encoding = 'utf-8',index = False)
ivalid.to_csv(output_buffer_valid,header=False, mode = 'a', encoding = 'utf-8',index = False)
I_CHUNK += 1
output_buffer.seek(0)
output_buffer_valid.seek(0)
for element in list(dtypes):
if new_dtypes.get(element) == pandas_timestamp_dtype:
new_dtypes[element] = 'object' # Only on output (on reading) will it be converted to datetime64[ns]; cannot specify 'datetime' here: have to go through the parser
data = pd.read_csv(output_buffer,names = idata.columns, dtype = new_dtypes, chunksize = chunksize, parse_dates = date_columns )
valid = pd.read_csv(output_buffer_valid,names = ivalid.columns, chunksize = chunksize)
return data, valid
else:
logging.error('Supplemental file format not supported: {}'.format(supp_format))
logging.warning('Supplemental data not extracted from supplemental section')
return data, valid
except Exception as e:
logging.warning('Supplemental data not extracted from supplemental section', exc_info=True)
return data, valid
def read_model(source,schema, sections = None, chunksize = None, skiprows = None, idx_offset = 0):
meta_format = schema['header'].get('format')
if meta_format not in properties.supported_meta_file_formats:
logging.error('File format read from input schema not supported: {}'.format(meta_format))
return
meta_reader = ".".join(['meta_formats',meta_format])
# 0. GET META FORMAT SUBCLASS ---------------------------------------------
if schema['header'].get('multiple_reports_per_line'): # must evaluate to True if set and True, and to False if not set or False, without breaking
format_subclass = '1x'
else:
format_subclass = '11'
# 1. PARSE SCHEMA ---------------------------------------------------------
delimiter = schema['header'].get('delimiter')
parsing_order = schema['header'].get('parsing_order')
# 2. DEFINE OUTPUT --------------------------------------------------------
# 2.1 Sections to read
if not sections:
sections = [ x.get(y) for x in parsing_order for y in x ]
read_sections = [y for x in sections for y in x]
else:
read_sections = sections
multiindex = True if len(read_sections) > 1 or read_sections[0] != properties.dummy_level else False
if format_subclass == '1x':
return schema
# 2.2 Element names: same order as declared in the schema, which is the order in which the readers read them...
names = []
if schema['header'].get('date_parser'):
if multiindex:
names.extend([('_datetime','_datetime')])
else:
names.extend(['_datetime'])
for section in read_sections:
if multiindex:
names.extend([ (section,x) for x in schema['sections'][section]['elements'].keys() if not schema['sections'][section]['elements'][x].get('ignore') ])
else:
names.extend([ x for x in schema['sections'][section]['elements'].keys() if not schema['sections'][section]['elements'][x].get('ignore') ])
# 3. GET DATA FROM SOURCE (DF, FILE OR TEXTREADER):------------------------
# SIMPLE STRING PER REPORT/LINE
logging.info("Getting input data from source...")
source_function = eval(meta_reader + "." + "_".join(['source',format_subclass]))
TextParser = source_function(source,schema, chunksize = chunksize, skiprows = skiprows, delimiter = delimiter)
# 4. DO THE ACTUAL READING
reader_function = eval(meta_reader + "." + 'source_to_df')
logging.info("Reading data...")
[output_buffer,valid_buffer,dtypes] = reader_function(TextParser, schema, read_sections = read_sections, idx_offset = idx_offset )
# 5. OUTPUT DATA:----------------------------------------------------------
# WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
output_buffer.seek(0)
valid_buffer.seek(0)
logging.info("Wrapping output....")
chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
logging.info('Data')
# 'datetime' is not a valid pandas dtype: only on output (on reading) will it be converted (via parse_dates) to datetime64[ns]; specifying 'datetime' (of any kind) here will fail
date_columns = [] # Needs to be the numeric index of the column, as it seems unable to work with tuples....
for i,element in enumerate(list(dtypes)):
if dtypes.get(element) == 'datetime':
date_columns.append(i)
df_reader = pd.read_csv(output_buffer,names = names, chunksize = chunksize, dtype = dtypes, parse_dates = date_columns)
logging.info('Mask')
valid_reader = pd.read_csv(valid_buffer,names = names, chunksize = chunksize)
return df_reader, valid_reader
{
"0":"Increasing, then decreasing; atmopsheric pressure the same or higher than three hours ago",
"1":"Increasing, then steady; or increasing, then increasing more slowly - Atmospheric pressure now higher than three hours ago",
"2":"Increasing (steadily or unsteadily) - Atmospheric pressure now higher than three hours ago",
"3":"Decreasing or steady, then increasing; or increasing, then increasing more rapidly - Atmospheric pressure now higher than three hours ago",
"4":"Steady; atmopsheric pressure the same as three hours ago",
"5":"Decreasing, then increasing; atmospheric pressure the same ot lower than three hours ago",
"6":"Decreasing, then steady; or decreasing, then decreasing more slowly - Atmospheric pressure now lower than three hours ago",
"7":"Decreasing (steadily or unsteadily) - Atmospheric pressure now lower than three hours ago",
"8":"Steady or increasing, then decreasing; or decreasing, then decreasing more rapidly - Atmospheric pressure now lower than three hours ago"
}
{
"0":"Netherlands",
"1":"Norway",
"2":"US",
"3":"UK",
"4":"France",
"5":"Denmark",
"6":"Italy",
"7":"India",
"8":"Hong Kong",
"9":"New Zealand",
"00":"Netherlands",
"01":"Norway",
"02":"US",
"03":"UK",
"04":"France",
"05":"Denmark",
"06":"Italy",
"07":"India",
"08":"Hong Kong",
"09":"New Zealand",
"10":"Ireland",
"11":"Philippines",
"12":"Egypt",
"13":"Canada",
"14":"Belgium",
"15":"South Africa",
"16":"Australia",
"17":"Japan",
"18":"Pakistan",
"19":"Argentina",
"20":"Sweden",
"21":"Federal Republic of Germany",
"22":"Iceland",
"23":"Israel",
"24":"Malaysia",
"25":"USSR",
"26":"Finland",
"27":"Rep. of Korea",
"28":"New Caledonia",
"29":"Portugal",
"30":"Spain",
"31":"Thailand",
"32":"Yugoslavia",
"33":"Poland",
"34":"Brazil",
"35":"Singapore",
"36":"Kenya",
"37":"Tanzania",
"38":"Uganda",
"39":"Mexico",
"40":"German Democractic Republic",
"AF":"Afghanistan",
"AL":"Albania",
"DZ":"Algeria",
"AD":"Andorra",
"AO":"Angola",
"AG":"Antigua and Barbuda",
"AR":"Argentina",
"AM":"Armenia",
"AW":"Aruba",
"AU":"Australia",
"AT":"Austria",
"AZ":"Azerbaijan",
"BS":"Bahamas",
"BH":"Bahrain",
"BD":"Bangladesh",
"BB":"Barbados",
"BY":"Belarus",
"BE":"Belgium",
"BZ":"Belize",
"BJ":"Benin",
"BT":"Bhutan",
"BO":"Bolivia",
"BA":"Bosnia and Herzegovina",
"BW":"Botswana",
"BR":"Brazil",
"BN":"Brunei Darussalam",
"BG":"Bulgaria",
"BF":"Burkina Faso",
"BI":"Burundi",
"KH":"Cambodia",
"CM":"Cameroon",
"CA":"Canada",
"CV":"Cape Verde",
"CF":"Central African Republic",
"TD":"Chad",
"CL":"Chile",
"CN":"China",
"CO":"Columbia",
"KM":"Comoros",
"CG":"Congo",
"CD":"The Democratic Republic of the Congo",
"CR":"Costa Rica",
"CI":"Cote d'Ivoire",
"HR":"Croatia",
"CU":"Cuba",
"CY":"Cyprus",
"CZ":"Czech Republic",
"DK":"Denmark",
"DJ":"Djibouti",
"DM":"Dominica",
"DO":"Dominican Republic",
"EC":"Ecuador",
"EG":"Egypt",
"SV":"El Salvador",
"GQ":"Equatorial Guinea",
"ER":"Eritrea",
"EE":"Estonia",
"ET":"Ethiopia",
"FJ":"Fiji",
"FI":"Finland",
"FR":"France",
"GA":"Gabon",
"GM":"Gambia",
"GE":"Georgia",
"DE":"Germany",
"GH":"Ghana",
"GR":"Greece",
"GD":"Grenada",
"GT":"Guatemala",
"GN":"Guinea",
"GW":"Guinea Bissau",
"GY":"Guyana",
"HT":"Haiti",
"HN":"Honduras",
"HK":"Hong Kong",
"HU":"Hungary",
"IS":"Iceland",
"IN":"India",
"ID":"Indonesia",
"IR":"Islamic Republic of Iran",
"IQ":"Iraq",
"IE":"Ireland",
"IL":"Israel",
"IT":"Italy",
"JM":"Jamaica",
"JP":"Japan",
"JO":"Jordan",
"KZ":"Kazakhstan",
"KE":"Kenya",
"KI":"Kiribati",
"KR":"Republic of Korea",
"KW":"Kuwait",
"KG":"Kyrgyzstan",
"LA":"Lao Peoples Democratic Republic",
"LV":"Latvia",
"LB":"Lebanon",
"LS":"Lesotho",
"LR":"Liberia",
"LY":"Libyan Arab Jamahiriya",
"LT":"Lithuania",
"LU":"Luxembourg",
"MK":"The Former Yugoslav Republic of Macedonia",
"MG":"Madagascar",
"MW":"Malawi",
"MY":"Malaysia",
"MV":"Maldives",
"ML":"Mali",
"MT":"Malta",
"MH":"Marshal Islands",
"MR":"Mauritania",
"MU":"Mauritius",
"MX":"Mexico",
"FM":"Federated States of Micronesia",
"MD":"Republic of Moldova",
"MC":"Monaco",
"MN":"Mongolia",
"MA":"Morocco",
"MZ":"Mozambique",
"MM":"Myanmar",
"NA":"Namibia",
"NR":"Nauru",
"NP":"Nepal",
"NL":"Netherlands",
"AN":"Netherlands Antilles",
"NZ":"New Zealand",
"NI":"Nicaragua",
"NE":"Niger",
"NG":"Nigeria",
"KP":"Democratic People's Republic of Korea",
"NO":"Norway",
"OM":"Oman",
"PK":"Pakistan",
"PW":"Palau",
"PS":"Occupied Palestinian Territory",
"PA":"Panama",
"PG":"Papua New Guinea",
"PY":"Paraguay",
"PE":"Peru",
"PH":"Philippines",
"PL":"Poland",
"PT":"Portugal",
"QA":"Qatar",
"RO":"Romania",
"RU":"Russian Federation",
"RW":"Rwanda",
"KN":"Saint Kitts and Nevis",
"LC":"Saint Lucia",
"VC":"Saint Vincent and the Grenadines",
"WS":"Samoa",
"SM":"San Marino",
"ST":"Sao Tome And Principe",
"SA":"Saudi Arabia",
"SN":"Senegal",
"CS":"Serbia and Montenegro",
"SC":"Seychelles",
"SL":"Sierra Leone",
"SG":"Singapore",
"SK":"Slovakia",
"SI":"Slovenia",
"SB":"Solomon Islands",
"SO":"Somalia",
"ZA":"South Africa",
"ES":"Spain",
"LK":"Sri Lanka",
"SD":"Sudan",
"SR":"Surinam",
"SZ":"Swaziland",
"SE":"Sweden",
"CH":"Switzerland",
"SY":"Syrian Arab Republic",
"TJ":"Tajikistan",
"TZ":"United Republic of Tanzania",
"TH":"Thailand",
"TL":"Timor - Leste",
"TG":"Togo",
"TO":"Tonga",
"TT":"Trinidad and Tobago",
"TN":"Tunisia",
"TR":"Turkey",
"TM":"Turkmenistan",
"TV":"Tuvala",
"UG":"Uganda",
"UA":"Ukraine",
"AE":"United Arab Emirates",
"GB":"United Kingdom",
"US":"United States",
"UY":"Uruguay",
"UZ":"Uzbekistan",
"VU":"Vanuatu",
"VA":"Vatican City",
"VE":"Venezuela",
"VN":"Viet Nam",
"YE":"Yemen",
"ZM":"Zambia",
"ZW":"Zimbabwe",
"DD":"East Germany",
"CS":"Serbia and Montenegro",
"RU":"Soviet Union",
"NC":"New Caledonia",
"ZY":"None (self recruited)",
"ZZ":"None (third party support)",
"TW":"Taiwan (Province of China)",
"SU":"Soviet Union",
"YU":"Yugoslavia",
"XX":"Multiple recruitment",
"EU":"European Union"
}
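Note that the country table mixes one-digit keys ("0"-"9"), zero-padded two-digit keys ("00"-"40") and two-letter keys, so coded values have to be read and looked up as strings; parsing them as integers would collapse "00" to 0 and break the lookup. A short illustration with a trimmed copy of the table (the column handling is an assumption, not the module's own logic):

# Keep codes as strings so zero-padded keys such as "00" still match.
import pandas as pd

country = {"0": "Netherlands", "00": "Netherlands", "10": "Ireland", "GB": "United Kingdom"}
codes = pd.Series(["00", "10", "GB"], dtype="object")
print(codes.map(country).tolist())  # ['Netherlands', 'Ireland', 'United Kingdom']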
{
"0":"No Cirrus, Cirrocumulus or Cirrostratus",
"1":"Cirrus in the form of filaments, strands or hooks, not progressively invading the sky",
"2":"Dense Cirrus, in patches or entangled sheaves, which usually do not increase and sometimes seem to be the remains of the upper part of a Cumulonimbus, or Cirrus with sproutings in the form of small turrets or battlements, or Cirrus having the appearance of cumuliform tufts",
"3":"Dense Cirrus, often in the form of an anvil, being the remains of the upper parts of Cumulonimbus",
"4":"Cirrus in the form of hooks or of filaments, or both, progressively invading the sky; they generally become denser as a whole",
"5":"Cirrus (often in bands converging towards one point or two opposite points of the horizon) and Cirrostratus, or Cirrostratus alone; in either case, they are progressively invading the sky, and generally growing denser as a whole, but the continuous veil does not reach 45 degrees above the horizon.",
"6":"Cirrus (often in bands converging towards one point or two opposite points of the horizon) and Cirrostratus, or Cirrostratus alone; in either case, they are progressively invading the sky, and generally growing denser as a whole; the continuous veil extends more than 45 degrees above the horizon, without the sky being totally covered",
"7":"Veil of Cirrostratus covering the celestial dome",
"8":"Cirrostratus not progressively invading the sky and not completely covering the celestial dome",
"9":"Cirrocumulus alone, or Cirrocumulus accompanied by Cirrus or Cirrostratus, or both, but Cirrocumulus is predominant",
"10":"Cirrus, Cirrocumulus and Cirrostratus invisible owing to darkness, fog, blowing dust or sand, or other similar phenomena, or more often because of the presence of a continuous layer of lower clouds"
}
{
"0":"No Cirrus, Cirrocumulus or Cirrostratus",
"1":"Cirrus in the form of filaments, strands or hooks, not progressively invading the sky",
"2":"Dense Cirrus, in patches or entangled sheaves, which usually do not increase and sometimes seem to be the remains of the upper part of a Cumulonimbus, or Cirrus with sproutings in the form of small turrets or battlements, or Cirrus having the appearance of cumuliform tufts",
"3":"Dense Cirrus, often in the form of an anvil, being the remains of the upper parts of Cumulonimbus",
"4":"Cirrus in the form of hooks or of filaments, or both, progressively invading the sky; they generally become denser as a whole",
"5":"Cirrus (often in bands converging towards one point or two opposite points of the horizon) and Cirrostratus, or Cirrostratus alone; in either case, they are progressively invading the sky, and generally growing denser as a whole, but the continuous veil does not reach 45 degrees above the horizon.",
"6":"Cirrus (often in bands converging towards one point or two opposite points of the horizon) and Cirrostratus, or Cirrostratus alone; in either case, they are progressively invading the sky, and generally growing denser as a whole; the continuous veil extends more than 45 degrees above the horizon, without the sky being totally covered",
"7":"Veil of Cirrostratus covering the celestial dome",
"8":"Cirrostratus not progressively invading the sky and not completely covering the celestial dome",
"9":"Cirrocumulus alone, or Cirrocumulus accompanied by Cirrus or Cirrostratus, or both, but Cirrocumulus is predominant",
"10":"Cirrus, Cirrocumulus and Cirrostratus invisible owing to darkness, fog, blowing dust or sand, or other similar phenomena, or more often because of the presence of a continuous layer of lower clouds"
}
{
"0":"No Altocumulus, Altostratus or Nimbostratus",
"1":"Altostratus, the greater part of which is semitransparent; through this part the sun or moon may be weakly visible, as through ground glass",
"2":"Altostratus, the greater part of which is sufficiently dense to hide the sun or moon, or Nimbostratus",
"3":"Altocumulus, the greater part of which is semitransparent; the various elements of the cloud change only slowly and are all at a single level",
"4":"Patches (often in the form of almonds or fish) of Altocumulus, the greater part of which is semi-transparent; the clouds occur at one or more levels and the elements are continually changing in appearance",
"5":"Altocumulus clouds generally thicken as a whole; Semi-transparent Altocumulus in bands, or Altocumulus, in one or more fairly continuous layer (semi-transparent or opaque), progresively invading the sky; these Altocumulus clouds generally thicken as a whole",
"6":"Altocumulus resulting from the spreading out of Cumulus (or Cumulonimbus)",
"7":"Altocumulus in two or more layers, usually opaque in places, and not progressively invading the sky; or opaque layer of Altocumulus, not progressively invading the sky; or Altocumulus together with Altostratus or Nimbostratus",
"8":"Altocumulus with sproutings in the form of small towers or battlements, or Altocumulus having the appearance of cumuliform tufts",
"9":"Altocumulus of a chaotic sky, generally at several levels",
"10":"Altocumulus, Altostratus and Nimbostratus invisible owing to darkness, fog, blowing dust or sand, or other similar phenomena, or more often because of the presence of a continuous layer of lower clouds"
}
{
"0":"36-point compass",
"1":"32-point compass",
"2":"16 of 36-point compass",
"3":"16 of 32-point compass",
"4":"8-point compass",
"5":"360-point compass",
"6":"high resolution data (e.g., tenths of degrees)"
}
{
"0":"measured",
"1":"computed",
"2":"iced measured",
"3":"iced computed"
}
{
"0":"0",
"1":"45",
"2":"90",
"3":"135",
"4":"180",
"5":"225",
"6":"270",
"7":"315",
"8":"360",
"9":"NULL"
}
{
"0":"0",
"1":"50",
"2":"100",
"3":"200",
"4":"300",
"5":"600",
"6":"1000",
"7":"1500",
"8":"2000",
"9":"2500",
"10":"NULL"
}
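The last two tables map single-digit codes to numeric values (direction in degrees and height bins) and use "NULL" as the missing-value sentinel. If the decoded column is to be used numerically, the sentinel needs to become NaN; a small sketch using pd.to_numeric (an illustration, not the module's own post-processing):

# Illustrative numeric conversion of a decoded column, turning the "NULL" sentinel into NaN.
import pandas as pd

height_table = {"0": "0", "1": "50", "2": "100", "9": "2500", "10": "NULL"}
codes = pd.Series(["1", "2", "10"])
heights = pd.to_numeric(codes.map(height_table), errors="coerce")
print(heights.tolist())  # [50.0, 100.0, nan]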