Commit dce32cb0 authored by iregon

Second commit, full validation added back

parent 73bdf289
@@ -51,8 +51,8 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
logging.error('Data source is empty (first argument to read()) ')
return
elif not os.path.isfile(source):
- logging.error('Can\'t reach data source {} as a file'.format(source))
- logging.info('Supported in-memory data sources are {}'.format(",".join(properties.supported_sources)))
+ logging.error('Could not open data source file {}'.format(source))
+ logging.info('Otherwise, supported in-memory data sources are {}'.format(",".join([ str(x) for x in properties.supported_sources])))
return
if not validate_arg('sections',sections,list):
return
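A minimal sketch of the source checks this hunk rewords, under stated assumptions: `check_source` is an illustrative name, and the local `supported_sources` list stands in for `properties.supported_sources`.

```python
import logging
import os

import pandas as pd

# Stand-in for properties.supported_sources (assumption: in the package this
# lists the in-memory types read() accepts).
supported_sources = [pd.io.parsers.TextFileReader]

def check_source(source):
    """Mirror of read()'s checks: reject empty input, accept supported
    in-memory objects, otherwise require a readable file path."""
    if source is None:
        logging.error('Data source is empty (first argument to read())')
        return False
    if isinstance(source, tuple(supported_sources)):
        return True
    if not os.path.isfile(str(source)):
        logging.error('Could not open data source file {}'.format(source))
        logging.info('Otherwise, supported in-memory data sources are {}'.format(
            ','.join(str(x) for x in supported_sources)))
        return False
    return True
```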
@@ -71,7 +71,6 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
imodel = data_model if data_model else data_model_path
logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))
data, valid = reader.read_model(source,schema, sections = sections, chunksize = chunksize, skiprows = skiprows)
- # 4. Create out data attributes
logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL(S)")
data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
......
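The `data_columns` line above covers two input shapes. A short sketch, assuming a pandas version in which `TextFileReader` exposes `orig_options` (the code above relies on the same attribute):

```python
import io

import pandas as pd

# DataFrame input: iterating a DataFrame yields its column names.
df = pd.DataFrame({'a': [1], 'b': [2]})
print([x for x in df])  # ['a', 'b']

# Chunked input: a TextFileReader keeps the declared names in orig_options.
reader = pd.read_csv(io.StringIO('1,2\n3,4\n'), names=['a', 'b'], chunksize=1)
print(reader.orig_options['names'])  # ['a', 'b']
```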
@@ -184,8 +184,8 @@ def extract_sections(string_df):
section_dict = dict()
section_groups = [ d[x] for d in parsing_order for x in d.keys() ]
sections = [item for sublist in section_groups for item in sublist]
for section in sections:
- #section_dict[section] = pd.DataFrame() # Index as initial size to help final merging
+ section_dict[section] = pd.Series()
thread_ids = [ x for x in threads.keys() if threads[x]['section'] == section ]
for thread_id in thread_ids:
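A miniature of the initialization pattern above, with invented section names, showing why seeding every section with an empty `pd.Series` helps the final merge:

```python
import pandas as pd

# Every section in the parsing order gets an empty Series up front, so
# sections absent from the data still concatenate into all-NaN columns.
sections = ['core', 'c1', 'c99']
section_dict = {s: pd.Series(dtype='object') for s in sections}
section_dict['core'] = pd.Series(['AAAA', 'BBBB'])  # only 'core' was found

merged = pd.concat([section_dict[s].rename(s) for s in sections],
                   sort=False, axis=1)
print(merged.columns.tolist())  # ['core', 'c1', 'c99']; 'c1'/'c99' all NaN
```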
@@ -200,20 +200,24 @@ def get_sections(string_df, schema, read_sections):
global sentinals, section_lens, sentinals_lens
global parsing_order
# Proceed to split sections if more than one
# else return section in a named column
if len(schema['sections'].keys())> 1:
section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
sentinals_lens = { section: len(sentinals.get(section)) if sentinals.get(section) else 0 for section in sentinals.keys()}
parsing_order = schema['header']['parsing_order']
- # Get sections separated
- section_strings = extract_sections(string_df)
- # Paste in order in a single dataframe with columns named as sections
- # Do not include sections not asked for
+ # Get sections separated: section dict has a key:value pair for each
+ # section in the data model. If the section does not exist in the data,
+ # the value is an empty pd.Series
+ section_dict = extract_sections(string_df)
+ # Paste in order (as read_sections) in a single dataframe with columns
+ # named as sections:
+ # - Drop unwanted sections
+ # - Keep requested but non-existent sections
df_out = pd.DataFrame()
for section in read_sections:
- df_out = pd.concat([df_out,section_strings[section].rename(section)],sort = False,axis=1)
+ df_out = pd.concat([df_out,section_dict[section].rename(section)],sort = False,axis=1)
else:
# return section in named column
df_out = string_df
df_out.columns = read_sections
......
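A toy illustration of how the `section_lens` / `sentinals` / `sentinals_lens` metadata built above can drive a sentinel check. The two-section model here is invented, and the actual splitting lives in `extract_sections`:

```python
# Invented schema fragment; real lengths and sentinels come from the model.
schema = {'sections': {
    'core': {'header': {'length': 8, 'sentinal': None}},
    'c1':   {'header': {'length': 6, 'sentinal': '98'}},
}}
section_lens = {s: schema['sections'][s]['header'].get('length')
                for s in schema['sections'].keys()}
sentinals = {s: schema['sections'][s]['header'].get('sentinal')
             for s in schema['sections'].keys()}
sentinals_lens = {s: len(sentinals.get(s)) if sentinals.get(s) else 0
                  for s in sentinals.keys()}

# A record is consumed section by section; an optional section is present
# only if the upcoming characters match its sentinel.
record = 'ABCDEFGH98XYZW'
core, rest = record[:section_lens['core']], record[section_lens['core']:]
has_c1 = rest[:sentinals_lens['c1']] == sentinals['c1']
print(core, rest if has_c1 else None)  # ABCDEFGH 98XYZW
```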
@@ -62,7 +62,7 @@ def read_data(section_df,section_schema):
section_df[element] = converters.get(section_dtypes.get(element))(section_df[element], **kwargs)
section_valid[element] = missing | section_df[element].notna()
return section_df,section_valid
def read_sections(sections_df, schema):
......
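A sketch of the validity rule visible in `read_data()` above: an element is valid if it was genuinely missing in the raw text, or if conversion produced a non-null value. `pd.to_numeric` stands in here for the model's converters:

```python
import pandas as pd

raw = pd.Series(['12', '', 'xx'])
missing = raw.str.strip() == ''            # blank in the source: not an error
converted = pd.to_numeric(raw, errors='coerce')
valid = missing | converted.notna()        # same pattern as section_valid[element]
print(valid.tolist())  # [True, True, False] -- 'xx' failed conversion
```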
@@ -59,11 +59,16 @@ def read_model(source,schema, sections = None, chunksize = None, skiprows = None
valid_buffer = StringIO()
for i,string_df in enumerate(TextParser):
- # a. Get sections separated in a dataframe columns:
- # one per column, only requested sections, ignore rest.
+ # a. Get a DF with sections separated in columns:
+ # - one section per column
+ # - only the requested sections, ignore the rest
+ # - requested but missing sections as NaN columns
+ # - columns ordered as in read_sections_list
sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
# b. Read elements from sections: across data chunks, resulting data types
# may vary if there are gaps, so keep track of data types!
+ # Sections are parsed in the same order as sections_df.columns
[data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
if i == 0:
out_dtypes = copy.deepcopy(out_dtypesi)
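A minimal sketch of the chunk loop above, with toy chunks: each processed chunk is appended to a text buffer while a running dtype map is kept, since a gap-filled chunk can report a weaker dtype than a later, fuller one:

```python
import copy
from io import StringIO

import pandas as pd

data_buffer = StringIO()
out_dtypes = {}
chunks = [pd.DataFrame({'a': [1, 2]}), pd.DataFrame({'a': [3, 4]})]
for i, data_df in enumerate(chunks):
    out_dtypesi = {k: str(v) for k, v in data_df.dtypes.items()}
    if i == 0:
        out_dtypes = copy.deepcopy(out_dtypesi)
    else:
        out_dtypes.update(out_dtypesi)  # the real loop updates selectively
    data_df.to_csv(data_buffer, header=False, mode='a',
                   encoding='utf-8', index=False)

data_buffer.seek(0)
print(out_dtypes, data_buffer.read())  # {'a': 'int64'} and the joined rows
```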
@@ -73,16 +78,16 @@ def read_model(source,schema, sections = None, chunksize = None, skiprows = None
out_dtypes.update({ k:out_dtypesi.get(k) })
# Save to buffer
data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
- valid_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+ valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
# 4. OUTPUT DATA ----------------------------------------------------------
# WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
data_buffer.seek(0)
valid_buffer.seek(0)
logging.info("Wrapping output....")
# Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
# (source is either a pd.io.parsers.TextFileReader or a file with chunksize
# specified on input): this way the output directly inherits the chunksize
# property of an input pd.io.parsers.TextFileReader
chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
# 'datetime' is not a valid pandas dtype: only on output (on reading back) are these
# columns converted (via parse_dates) to datetime64[ns]; specifying 'datetime' (of any
# kind) here would fail
date_columns = [] # Needs to be the numeric index of the column, as parse_dates does not seem to work with tuples....
......
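A sketch of the wrap-up step with invented buffer contents: the accumulated text is read back, inheriting `chunksize` when the original source was a `TextFileReader`, and date columns are handed to `parse_dates` by numeric position:

```python
from io import StringIO

import pandas as pd

buffer = StringIO('0,2010-01-01\n1,2010-01-02\n')
chunksize = 1       # would come from TextParser.orig_options['chunksize']
date_columns = [1]  # numeric index, not a (section, element) tuple

out = pd.read_csv(buffer, header=None, chunksize=chunksize,
                  parse_dates=date_columns)
for chunk in out:
    print(chunk.dtypes.tolist())  # [int64, datetime64[ns]]
```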
@@ -149,6 +149,8 @@ def df_schema(df_columns, schema, data_model, supp_section = None, supp_schema =
for section in schema.get('sections'):
if section == properties.dummy_level:
flat_schema.update(schema['sections'].get(section).get('elements'))
+ elif schema['sections'].get(section).get('header').get('disable_read'):
+ flat_schema.update( { (section, section): {'column_type':'object'} })
else:
flat_schema.update( { (section, x): schema['sections'].get(section).get('elements').get(x) for x in schema['sections'].get(section).get('elements') })
......
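A toy version of the flattening above, with a made-up schema: dummy-level sections contribute their elements at top level, `disable_read` sections collapse to a single object column, and the rest are keyed as `(section, element)` tuples:

```python
dummy_level = '_dummy'  # stands in for properties.dummy_level
schema = {'sections': {
    '_dummy': {'header': {}, 'elements': {'year': {'column_type': 'int'}}},
    'c99':    {'header': {'disable_read': True}, 'elements': {}},
    'core':   {'header': {}, 'elements': {'sst': {'column_type': 'float'}}},
}}

flat_schema = {}
for section in schema['sections']:
    if section == dummy_level:
        flat_schema.update(schema['sections'][section]['elements'])
    elif schema['sections'][section]['header'].get('disable_read'):
        flat_schema.update({(section, section): {'column_type': 'object'}})
    else:
        flat_schema.update({(section, x): schema['sections'][section]['elements'][x]
                            for x in schema['sections'][section]['elements']})
print(list(flat_schema))  # ['year', ('c99', 'c99'), ('core', 'sst')]
```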